diff --git a/pyrecest/distributions/__init__.py b/pyrecest/distributions/__init__.py index 72efac1ad..771e942f5 100644 --- a/pyrecest/distributions/__init__.py +++ b/pyrecest/distributions/__init__.py @@ -84,6 +84,7 @@ SineSkewedWrappedCauchyDistribution, SineSkewedWrappedNormalDistribution, ) +from .circle.piecewise_constant_distribution import PiecewiseConstantDistribution # noqa: F401 from .circle.von_mises_distribution import VonMisesDistribution from .circle.wrapped_cauchy_distribution import WrappedCauchyDistribution from .circle.wrapped_laplace_distribution import WrappedLaplaceDistribution diff --git a/pyrecest/distributions/circle/piecewise_constant_distribution.py b/pyrecest/distributions/circle/piecewise_constant_distribution.py new file mode 100644 index 000000000..c3b8755e6 --- /dev/null +++ b/pyrecest/distributions/circle/piecewise_constant_distribution.py @@ -0,0 +1,197 @@ +# pylint: disable=no-name-in-module,no-member,redefined-builtin +from pyrecest.backend import mod, pi, array, mean, floor, zeros, exp, sum, log, random + +from .abstract_circular_distribution import AbstractCircularDistribution + + +class PiecewiseConstantDistribution(AbstractCircularDistribution): + """Piecewise constant (i.e. discrete) circular distribution, similar to a histogram. + + The circle [0, 2*pi) is divided into n equal intervals, each with a constant + probability density weight. + + Gerhard Kurz, Florian Pfaff, Uwe D. Hanebeck, + Discrete Recursive Bayesian Filtering on Intervals and the Unit Circle + Proceedings of the 2016 IEEE International Conference on Multisensor Fusion + and Integration for Intelligent Systems (MFI 2016), + Baden-Baden, Germany, September 2016. + """ + + def __init__(self, w): + """Initialize with a weight vector that is automatically normalized. + + Parameters + ---------- + w : array_like, shape (n,) + Weight for each interval (will be normalized to form a valid pdf). + """ + AbstractCircularDistribution.__init__(self) + w = array(w, dtype=float).ravel() + assert w.ndim == 1 and w.shape[0] > 0 + self.w = w / (mean(w) * 2.0 * pi) + + def pdf(self, xs): + """Evaluate the pdf at each point in xs. + + Parameters + ---------- + xs : array_like, shape (n,) + Points at which to evaluate the pdf. + + Returns + ------- + p : ndarray, shape (n,) + Pdf values at each point. + """ + assert xs.ndim == 1 + n_intervals = len(self.w) + xs_mod = array(mod(xs, 2.0 * pi), dtype=float) + idx = array( + [min(int(floor(x / (2.0 * pi) * n_intervals)), n_intervals - 1) for x in xs_mod] + ) + return self.w[idx] + + def trigonometric_moment(self, n): + """Calculate the n-th trigonometric moment analytically. + + Parameters + ---------- + n : int + Moment order. + + Returns + ------- + m : complex + n-th trigonometric moment. + """ + if n == 0: + return 1.0 + 0j + num = len(self.w) + interv = zeros(num, dtype=complex) + for j in range(1, num + 1): + left = PiecewiseConstantDistribution.left_border(j, num) + r = PiecewiseConstantDistribution.right_border(j, num) + c = PiecewiseConstantDistribution.interval_center(j, num) + w_j = float(self.pdf(array([c]))[0]) + interv[j - 1] = w_j * (exp(1j * n * r) - exp(1j * n * left)) + return complex(-1j / n * sum(interv)) + + def entropy(self): + """Calculate the entropy analytically. + + Returns + ------- + e : float + Entropy of the distribution. + """ + n = len(self.w) + return float(-2.0 * pi / n * sum(self.w * log(self.w))) + + def sample(self, n): + """Draw n random samples from the distribution. + + Parameters + ---------- + n : int + Number of samples to draw. + + Returns + ------- + samples : ndarray, shape (n,) + Samples in [0, 2*pi). + """ + num_intervals = len(self.w) + interval_width = 2.0 * pi / num_intervals + # Each interval has probability w[j] * interval_width, which sums to 1 by + # construction. Divide by sum anyway to guard against floating-point drift. + interval_probs = self.w * interval_width + interval_probs /= interval_probs.sum() + interval_indices = random.choice(num_intervals, size=n, p=interval_probs) + return interval_indices * interval_width + random.uniform( + 0.0, interval_width, size=n + ) + + @staticmethod + def left_border(m, n): + """Left border of the m-th interval (1-indexed) for n total intervals. + + Parameters + ---------- + m : int + Interval index (1-indexed). + n : int + Total number of intervals. + + Returns + ------- + float + Left border of the m-th interval. + """ + assert 1 <= m <= n + return 2.0 * pi / n * (m - 1) + + @staticmethod + def right_border(m, n): + """Right border of the m-th interval (1-indexed) for n total intervals. + + Parameters + ---------- + m : int + Interval index (1-indexed). + n : int + Total number of intervals. + + Returns + ------- + float + Right border of the m-th interval. + """ + assert 1 <= m <= n + return 2.0 * pi / n * m + + @staticmethod + def interval_center(m, n): + """Center of the m-th interval (1-indexed) for n total intervals. + + Parameters + ---------- + m : int + Interval index (1-indexed). + n : int + Total number of intervals. + + Returns + ------- + float + Center of the m-th interval. + """ + assert 1 <= m <= n + return 2.0 * pi / n * (m - 0.5) + + @staticmethod + def calculate_parameters_numerically(pdf_func, n): + """Calculate weights by numerically integrating a given pdf over each interval. + + Parameters + ---------- + pdf_func : callable + Pdf of a circular density; accepts a 1-D array and returns a 1-D array. + n : int + Number of discretization intervals. + + Returns + ------- + w : ndarray, shape (n,) + Weights of the corresponding PiecewiseConstantDistribution. + """ + from scipy.integrate import quad # pylint: disable=import-outside-toplevel + + assert n >= 1 + w = zeros(n) + for j in range(1, n + 1): + left = PiecewiseConstantDistribution.left_border(j, n) + r = PiecewiseConstantDistribution.right_border(j, n) + w[j - 1] = quad( + lambda x: float(pdf_func(array([x]))), left, r + )[0] + return w diff --git a/pyrecest/tests/distributions/test_piecewise_constant_distribution.py b/pyrecest/tests/distributions/test_piecewise_constant_distribution.py new file mode 100644 index 000000000..f65e9f61e --- /dev/null +++ b/pyrecest/tests/distributions/test_piecewise_constant_distribution.py @@ -0,0 +1,116 @@ +import unittest + +import numpy.testing as npt + +# pylint: disable=no-name-in-module,no-member,redefined-builtin +from pyrecest.backend import linspace, sum, exp, log, mean, pi, array +from pyrecest.distributions.circle.piecewise_constant_distribution import ( + PiecewiseConstantDistribution, +) +from pyrecest.distributions.circle.wrapped_normal_distribution import ( + WrappedNormalDistribution, +) + + +class PiecewiseConstantDistributionTest(unittest.TestCase): + def setUp(self): + self.w = array([1, 2, 3, 4, 5, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1], dtype=float) + self.normal = 1.0 / (2.0 * pi * mean(self.w)) + self.dist = PiecewiseConstantDistribution(self.w) + + def test_pdf(self): + npt.assert_allclose( + self.dist.pdf(array([0.0])), array([1 * self.normal]), rtol=1e-10 + ) + npt.assert_allclose( + self.dist.pdf(array([4.2])), array([5 * self.normal]), rtol=1e-10 + ) + npt.assert_allclose( + self.dist.pdf(array([10.9])), array([4 * self.normal]), rtol=1e-10 + ) + + def test_integral_normalized(self): + """Verify the distribution integrates to 1 via the exact sum.""" + n = len(self.dist.w) + npt.assert_allclose( + sum(self.dist.w) * (2.0 * pi / n), 1.0, rtol=1e-10 + ) + + def test_integral_partial(self): + """Verify partial integrals sum to 1 using a fine grid.""" + M = 1_000_000 + xs = linspace(0, 2.0 * pi, M, endpoint=False) + dx = 2.0 * pi / M + pdf_vals = self.dist.pdf(xs) + first_half = sum(pdf_vals[xs < pi]) * dx + second_half = sum(pdf_vals[xs >= pi]) * dx + npt.assert_allclose(first_half + second_half, 1.0, rtol=1e-4) + + def test_trigonometric_moment(self): + """Verify analytical trigonometric moments using a fine-grid reference.""" + M = 1_000_000 + xs = linspace(0, 2.0 * pi, M, endpoint=False) + pdf_vals = self.dist.pdf(xs) + dx = 2.0 * pi / M + for n_moment in [1, 2, 3]: + expected = sum(pdf_vals * exp(1j * n_moment * xs)) * dx + with self.subTest(n=n_moment): + npt.assert_allclose( + self.dist.trigonometric_moment(n_moment), expected, rtol=1e-4 + ) + + def test_interval_borders(self): + self.assertAlmostEqual( + PiecewiseConstantDistribution.left_border(1, 2), 0.0 * 2.0 * pi + ) + self.assertAlmostEqual( + PiecewiseConstantDistribution.interval_center(1, 2), + 1.0 / 4.0 * 2.0 * pi, + ) + self.assertAlmostEqual( + PiecewiseConstantDistribution.right_border(1, 2), + 1.0 / 2.0 * 2.0 * pi, + ) + self.assertAlmostEqual( + PiecewiseConstantDistribution.left_border(2, 2), + 1.0 / 2.0 * 2.0 * pi, + ) + self.assertAlmostEqual( + PiecewiseConstantDistribution.interval_center(2, 2), + 3.0 / 4.0 * 2.0 * pi, + ) + self.assertAlmostEqual( + PiecewiseConstantDistribution.right_border(2, 2), 1.0 * 2.0 * pi + ) + + def test_calculate_parameters_numerically(self): + """More samples should yield better moment matching.""" + wn = WrappedNormalDistribution(array(2.0), array(1.3)) + w1 = PiecewiseConstantDistribution.calculate_parameters_numerically(wn.pdf, 40) + w2 = PiecewiseConstantDistribution.calculate_parameters_numerically(wn.pdf, 45) + w3 = PiecewiseConstantDistribution.calculate_parameters_numerically(wn.pdf, 50) + p1 = PiecewiseConstantDistribution(w1) + p2 = PiecewiseConstantDistribution(w2) + p3 = PiecewiseConstantDistribution(w3) + delta1 = abs(wn.trigonometric_moment(1) - p1.trigonometric_moment(1)) + delta2 = abs(wn.trigonometric_moment(1) - p2.trigonometric_moment(1)) + delta3 = abs(wn.trigonometric_moment(1) - p3.trigonometric_moment(1)) + self.assertLessEqual(delta2, delta1) + self.assertLessEqual(delta3, delta2) + + def test_entropy(self): + """Verify analytical entropy against the direct formula.""" + w = self.dist.w + n = len(w) + expected = -2.0 * pi / n * sum(w * log(w)) + npt.assert_allclose(self.dist.entropy(), expected, rtol=1e-10) + + def test_sample(self): + samples = self.dist.sample(100) + self.assertEqual(len(samples), 100) + self.assertTrue(all(samples >= 0.0)) + self.assertTrue(all(samples < 2.0 * pi)) + + +if __name__ == "__main__": + unittest.main()