diff --git a/pyrecest/distributions/__init__.py b/pyrecest/distributions/__init__.py index 72efac1ad..b4ecef0a4 100644 --- a/pyrecest/distributions/__init__.py +++ b/pyrecest/distributions/__init__.py @@ -72,6 +72,7 @@ PartiallyWrappedNormalDistribution, ) from .circle.abstract_circular_distribution import AbstractCircularDistribution +from .circle.generalized_von_mises_distribution import GvMDistribution from .circle.circular_dirac_distribution import CircularDiracDistribution from .circle.circular_fourier_distribution import CircularFourierDistribution from .circle.circular_mixture import CircularMixture @@ -222,6 +223,7 @@ ] __all__ = aliases + [ + "GvMDistribution", "GeneralizedKSineSkewedVonMisesDistribution", "AbstractBoundedDomainDistribution", "AbstractBoundedNonPeriodicDistribution", diff --git a/pyrecest/distributions/circle/generalized_von_mises_distribution.py b/pyrecest/distributions/circle/generalized_von_mises_distribution.py new file mode 100644 index 000000000..d5c672acf --- /dev/null +++ b/pyrecest/distributions/circle/generalized_von_mises_distribution.py @@ -0,0 +1,77 @@ +# pylint: disable=no-name-in-module,no-member,redefined-builtin +from pyrecest.backend import arange, array, cos, exp, sum + +from .abstract_circular_distribution import AbstractCircularDistribution + + +class GvMDistribution(AbstractCircularDistribution): + """ + Generalized von Mises distribution of arbitrary order. + + See Riccardo Gatto, Sreenivasa Rao Jammalamadaka, + "The Generalized von Mises Distribution", + Statistical Methodology, 2007. + + Parameters + ---------- + mu : array_like, shape (k,) + Location parameters. + kappa : array_like, shape (k,) + Concentration parameters, all must be positive. + """ + + def __init__(self, mu, kappa): + mu = array(mu, dtype=float) + kappa = array(kappa, dtype=float) + assert mu.ndim == 1, "mu must be a 1D array" + assert mu.shape == kappa.shape, "mu and kappa must have the same shape" + assert (kappa > 0).all(), "all kappa values must be positive" + AbstractCircularDistribution.__init__(self) + self.mu = mu + self.kappa = kappa + self._norm_const = None + + @property + def norm_const(self): + if self._norm_const is None: + self._norm_const = self.integrate_fun_over_domain( + lambda *args: self.pdf_unnormalized(array(args)), self.dim + ) + return self._norm_const + + def pdf_unnormalized(self, xs): + """ + Evaluate the unnormalized pdf at each point in xs. + + Parameters + ---------- + xs : array_like, shape (n,) + Points where the pdf is evaluated. + + Returns + ------- + p : array, shape (n,) + Unnormalized pdf values. + """ + xs = array(xs) + # j = [1, 2, ..., k], shape (k,) + j = arange(1, self.mu.shape[0] + 1, dtype=float) + # Broadcast: (k, 1) * ((1, n) - (k, 1)) → (k, n) + arg = j[:, None] * (xs[None, :] - self.mu[:, None]) + return exp(sum(self.kappa[:, None] * cos(arg), axis=0)) + + def pdf(self, xs): + """ + Evaluate the pdf at each point in xs. + + Parameters + ---------- + xs : array_like, shape (n,) + Points where the pdf is evaluated. + + Returns + ------- + p : array, shape (n,) + Pdf values. + """ + return self.pdf_unnormalized(xs) / self.norm_const diff --git a/pyrecest/tests/distributions/test_generalized_von_mises_distribution.py b/pyrecest/tests/distributions/test_generalized_von_mises_distribution.py new file mode 100644 index 000000000..c8bf4aac0 --- /dev/null +++ b/pyrecest/tests/distributions/test_generalized_von_mises_distribution.py @@ -0,0 +1,68 @@ +import unittest + +import numpy.testing as npt + +# pylint: disable=no-name-in-module,no-member,redefined-builtin +from pyrecest.backend import all, array, isinf, isnan, linspace, pi +from pyrecest.distributions import GvMDistribution, VonMisesDistribution + + +class TestGvMDistribution(unittest.TestCase): + def test_init(self): + dist = GvMDistribution(array([2.0]), array([1.0])) + self.assertEqual(dist.mu.shape, (1,)) + self.assertEqual(dist.kappa.shape, (1,)) + + def test_init_invalid_kappa(self): + with self.assertRaises(AssertionError): + GvMDistribution(array([2.0]), array([-1.0])) + + def test_init_shape_mismatch(self): + with self.assertRaises(AssertionError): + GvMDistribution(array([1.0, 2.0]), array([1.0])) + + def test_pdf_order1_matches_von_mises(self): + """Order-1 GvM with a single mu and kappa should match the von Mises PDF.""" + mu_val = 2.0 + kappa_val = 1.0 + gvm = GvMDistribution(array([mu_val]), array([kappa_val])) + vm = VonMisesDistribution(mu_val, kappa_val) + + xs = linspace(0.0, 2.0 * pi, 100) + npt.assert_array_almost_equal(gvm.pdf(xs), vm.pdf(xs), decimal=5) + + def test_pdf_non_negative(self): + gvm = GvMDistribution(array([1.0, 0.5]), array([2.0, 1.0])) + xs = linspace(0.0, 2.0 * pi, 50) + self.assertTrue((gvm.pdf(xs) >= 0).all()) + + def test_pdf_integrates_to_one(self): + """PDF should integrate to 1 over [0, 2*pi].""" + from scipy.integrate import quad + + gvm = GvMDistribution(array([1.0, 0.5]), array([2.0, 1.0])) + result, _ = quad(lambda x: float(gvm.pdf(array([x]))[0]), 0.0, 2.0 * float(pi)) + npt.assert_almost_equal(result, 1.0, decimal=5) + + def test_pdf_order2_specific_values(self): + """Test order-2 PDF at specific points, compared to manually computed values.""" + mu = array([2.0, 1.0]) + kappa = array([1.0, 0.5]) + gvm = GvMDistribution(mu, kappa) + + xs = linspace(0.0, 2.0 * pi, 5) + pdf_vals = gvm.pdf(xs) + # All values must be positive and finite + self.assertTrue(all(~isnan(pdf_vals) & ~isinf(pdf_vals))) + self.assertTrue(all(pdf_vals >= 0)) + + def test_norm_const_cached(self): + """Norm constant should be computed once and cached.""" + gvm = GvMDistribution(array([1.0]), array([1.0])) + c1 = gvm.norm_const + c2 = gvm.norm_const + self.assertEqual(c1, c2) + + +if __name__ == "__main__": + unittest.main()