Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyrecest/distributions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
SineSkewedWrappedCauchyDistribution,
SineSkewedWrappedNormalDistribution,
)
from .circle.piecewise_constant_distribution import PiecewiseConstantDistribution # noqa: F401
from .circle.von_mises_distribution import VonMisesDistribution
from .circle.wrapped_cauchy_distribution import WrappedCauchyDistribution
from .circle.wrapped_laplace_distribution import WrappedLaplaceDistribution
Expand Down
197 changes: 197 additions & 0 deletions pyrecest/distributions/circle/piecewise_constant_distribution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
# pylint: disable=no-name-in-module,no-member,redefined-builtin
from pyrecest.backend import mod, pi, array, mean, floor, zeros, exp, sum, log, random

from .abstract_circular_distribution import AbstractCircularDistribution


class PiecewiseConstantDistribution(AbstractCircularDistribution):
"""Piecewise constant (i.e. discrete) circular distribution, similar to a histogram.

The circle [0, 2*pi) is divided into n equal intervals, each with a constant
probability density weight.

Gerhard Kurz, Florian Pfaff, Uwe D. Hanebeck,
Discrete Recursive Bayesian Filtering on Intervals and the Unit Circle
Proceedings of the 2016 IEEE International Conference on Multisensor Fusion
and Integration for Intelligent Systems (MFI 2016),
Baden-Baden, Germany, September 2016.
"""

def __init__(self, w):
"""Initialize with a weight vector that is automatically normalized.

Parameters
----------
w : array_like, shape (n,)
Weight for each interval (will be normalized to form a valid pdf).
"""
AbstractCircularDistribution.__init__(self)
w = array(w, dtype=float).ravel()
assert w.ndim == 1 and w.size > 0
self.w = w / (mean(w) * 2.0 * pi)

def pdf(self, xs):
"""Evaluate the pdf at each point in xs.

Parameters
----------
xs : array_like, shape (n,)
Points at which to evaluate the pdf.

Returns
-------
p : ndarray, shape (n,)
Pdf values at each point.
"""
assert xs.ndim == 1
n_intervals = len(self.w)
xs_mod = array(mod(xs, 2.0 * pi), dtype=float)
idx = array(
[min(int(floor(x / (2.0 * pi) * n_intervals)), n_intervals - 1) for x in xs_mod]
)
return self.w[idx]

def trigonometric_moment(self, n):
"""Calculate the n-th trigonometric moment analytically.

Parameters
----------
n : int
Moment order.

Returns
-------
m : complex
n-th trigonometric moment.
"""
if n == 0:
return 1.0 + 0j
num = len(self.w)
interv = zeros(num, dtype=complex)
for j in range(1, num + 1):
left = PiecewiseConstantDistribution.left_border(j, num)
r = PiecewiseConstantDistribution.right_border(j, num)
c = PiecewiseConstantDistribution.interval_center(j, num)
w_j = float(self.pdf(array([c]))[0])
interv[j - 1] = w_j * (exp(1j * n * r) - exp(1j * n * left))
return complex(-1j / n * sum(interv))

def entropy(self):
"""Calculate the entropy analytically.

Returns
-------
e : float
Entropy of the distribution.
"""
n = len(self.w)
return float(-2.0 * pi / n * sum(self.w * log(self.w)))

def sample(self, n):
"""Draw n random samples from the distribution.

Parameters
----------
n : int
Number of samples to draw.

Returns
-------
samples : ndarray, shape (n,)
Samples in [0, 2*pi).
"""
num_intervals = len(self.w)
interval_width = 2.0 * pi / num_intervals
# Each interval has probability w[j] * interval_width, which sums to 1 by
# construction. Divide by sum anyway to guard against floating-point drift.
interval_probs = self.w * interval_width
interval_probs /= interval_probs.sum()
interval_indices = random.choice(num_intervals, size=n, p=interval_probs)
return interval_indices * interval_width + random.uniform(
0.0, interval_width, size=n
)

@staticmethod
def left_border(m, n):
"""Left border of the m-th interval (1-indexed) for n total intervals.

Parameters
----------
m : int
Interval index (1-indexed).
n : int
Total number of intervals.

Returns
-------
float
Left border of the m-th interval.
"""
assert 1 <= m <= n
return 2.0 * pi / n * (m - 1)

@staticmethod
def right_border(m, n):
"""Right border of the m-th interval (1-indexed) for n total intervals.

Parameters
----------
m : int
Interval index (1-indexed).
n : int
Total number of intervals.

Returns
-------
float
Right border of the m-th interval.
"""
assert 1 <= m <= n
return 2.0 * pi / n * m

@staticmethod
def interval_center(m, n):
"""Center of the m-th interval (1-indexed) for n total intervals.

Parameters
----------
m : int
Interval index (1-indexed).
n : int
Total number of intervals.

Returns
-------
float
Center of the m-th interval.
"""
assert 1 <= m <= n
return 2.0 * pi / n * (m - 0.5)

@staticmethod
def calculate_parameters_numerically(pdf_func, n):
"""Calculate weights by numerically integrating a given pdf over each interval.

Parameters
----------
pdf_func : callable
Pdf of a circular density; accepts a 1-D array and returns a 1-D array.
n : int
Number of discretization intervals.

Returns
-------
w : ndarray, shape (n,)
Weights of the corresponding PiecewiseConstantDistribution.
"""
from scipy.integrate import quad # pylint: disable=import-outside-toplevel

assert n >= 1
w = zeros(n)
for j in range(1, n + 1):
left = PiecewiseConstantDistribution.left_border(j, n)
r = PiecewiseConstantDistribution.right_border(j, n)
w[j - 1] = quad(
lambda x: float(pdf_func(array([x]))), left, r
)[0]
return w
116 changes: 116 additions & 0 deletions pyrecest/tests/distributions/test_piecewise_constant_distribution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import unittest

import numpy.testing as npt

# pylint: disable=no-name-in-module,no-member,redefined-builtin
from pyrecest.backend import linspace, sum, exp, log, mean, pi, array
from pyrecest.distributions.circle.piecewise_constant_distribution import (
PiecewiseConstantDistribution,
)
from pyrecest.distributions.circle.wrapped_normal_distribution import (
WrappedNormalDistribution,
)


class PiecewiseConstantDistributionTest(unittest.TestCase):
def setUp(self):
self.w = array([1, 2, 3, 4, 5, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1], dtype=float)
self.normal = 1.0 / (2.0 * pi * mean(self.w))
self.dist = PiecewiseConstantDistribution(self.w)

def test_pdf(self):
npt.assert_allclose(
self.dist.pdf(array([0.0])), array([1 * self.normal]), rtol=1e-10
)
npt.assert_allclose(
self.dist.pdf(array([4.2])), array([5 * self.normal]), rtol=1e-10
)
npt.assert_allclose(
self.dist.pdf(array([10.9])), array([4 * self.normal]), rtol=1e-10
)

def test_integral_normalized(self):
"""Verify the distribution integrates to 1 via the exact sum."""
n = len(self.dist.w)
npt.assert_allclose(
sum(self.dist.w) * (2.0 * pi / n), 1.0, rtol=1e-10
)

def test_integral_partial(self):
"""Verify partial integrals sum to 1 using a fine grid."""
M = 1_000_000
xs = linspace(0, 2.0 * pi, M, endpoint=False)
dx = 2.0 * pi / M
pdf_vals = self.dist.pdf(xs)
first_half = sum(pdf_vals[xs < pi]) * dx
second_half = sum(pdf_vals[xs >= pi]) * dx
npt.assert_allclose(first_half + second_half, 1.0, rtol=1e-4)

def test_trigonometric_moment(self):
"""Verify analytical trigonometric moments using a fine-grid reference."""
M = 1_000_000
xs = linspace(0, 2.0 * pi, M, endpoint=False)
pdf_vals = self.dist.pdf(xs)
dx = 2.0 * pi / M
for n_moment in [1, 2, 3]:
expected = sum(pdf_vals * exp(1j * n_moment * xs)) * dx
with self.subTest(n=n_moment):
npt.assert_allclose(
self.dist.trigonometric_moment(n_moment), expected, rtol=1e-4
)

def test_interval_borders(self):
self.assertAlmostEqual(
PiecewiseConstantDistribution.left_border(1, 2), 0.0 * 2.0 * pi
)
self.assertAlmostEqual(
PiecewiseConstantDistribution.interval_center(1, 2),
1.0 / 4.0 * 2.0 * pi,
)
self.assertAlmostEqual(
PiecewiseConstantDistribution.right_border(1, 2),
1.0 / 2.0 * 2.0 * pi,
)
self.assertAlmostEqual(
PiecewiseConstantDistribution.left_border(2, 2),
1.0 / 2.0 * 2.0 * pi,
)
self.assertAlmostEqual(
PiecewiseConstantDistribution.interval_center(2, 2),
3.0 / 4.0 * 2.0 * pi,
)
self.assertAlmostEqual(
PiecewiseConstantDistribution.right_border(2, 2), 1.0 * 2.0 * pi
)

def test_calculate_parameters_numerically(self):
"""More samples should yield better moment matching."""
wn = WrappedNormalDistribution(array(2.0), array(1.3))
w1 = PiecewiseConstantDistribution.calculate_parameters_numerically(wn.pdf, 40)
w2 = PiecewiseConstantDistribution.calculate_parameters_numerically(wn.pdf, 45)
w3 = PiecewiseConstantDistribution.calculate_parameters_numerically(wn.pdf, 50)
p1 = PiecewiseConstantDistribution(w1)
p2 = PiecewiseConstantDistribution(w2)
p3 = PiecewiseConstantDistribution(w3)
delta1 = abs(wn.trigonometric_moment(1) - p1.trigonometric_moment(1))
delta2 = abs(wn.trigonometric_moment(1) - p2.trigonometric_moment(1))
delta3 = abs(wn.trigonometric_moment(1) - p3.trigonometric_moment(1))
self.assertLessEqual(delta2, delta1)
self.assertLessEqual(delta3, delta2)

def test_entropy(self):
"""Verify analytical entropy against the direct formula."""
w = self.dist.w
n = len(w)
expected = -2.0 * pi / n * sum(w * log(w))
npt.assert_allclose(self.dist.entropy(), expected, rtol=1e-10)

def test_sample(self):
samples = self.dist.sample(100)
self.assertEqual(len(samples), 100)
self.assertTrue(all(samples >= 0.0))
self.assertTrue(all(samples < 2.0 * pi))


if __name__ == "__main__":
unittest.main()
Loading