Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyrecest/distributions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
PartiallyWrappedNormalDistribution,
)
from .circle.abstract_circular_distribution import AbstractCircularDistribution
from .circle.generalized_von_mises_distribution import GvMDistribution
from .circle.circular_dirac_distribution import CircularDiracDistribution
from .circle.circular_fourier_distribution import CircularFourierDistribution
from .circle.circular_mixture import CircularMixture
Expand Down Expand Up @@ -222,6 +223,7 @@
]

__all__ = aliases + [
"GvMDistribution",
"GeneralizedKSineSkewedVonMisesDistribution",
"AbstractBoundedDomainDistribution",
"AbstractBoundedNonPeriodicDistribution",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# pylint: disable=no-name-in-module,no-member,redefined-builtin
from pyrecest.backend import arange, array, cos, exp, sum

from .abstract_circular_distribution import AbstractCircularDistribution


class GvMDistribution(AbstractCircularDistribution):
"""
Generalized von Mises distribution of arbitrary order.

See Riccardo Gatto, Sreenivasa Rao Jammalamadaka,
"The Generalized von Mises Distribution",
Statistical Methodology, 2007.

Parameters
----------
mu : array_like, shape (k,)
Location parameters.
kappa : array_like, shape (k,)
Concentration parameters, all must be positive.
"""

def __init__(self, mu, kappa):
mu = array(mu, dtype=float)
kappa = array(kappa, dtype=float)
assert mu.ndim == 1, "mu must be a 1D array"
assert mu.shape == kappa.shape, "mu and kappa must have the same shape"
assert (kappa > 0).all(), "all kappa values must be positive"
AbstractCircularDistribution.__init__(self)
self.mu = mu
self.kappa = kappa
self._norm_const = None

@property
def norm_const(self):
if self._norm_const is None:
self._norm_const = self.integrate_fun_over_domain(
lambda *args: self.pdf_unnormalized(array(args)), self.dim
)
return self._norm_const

def pdf_unnormalized(self, xs):
"""
Evaluate the unnormalized pdf at each point in xs.

Parameters
----------
xs : array_like, shape (n,)
Points where the pdf is evaluated.

Returns
-------
p : array, shape (n,)
Unnormalized pdf values.
"""
xs = array(xs)
# j = [1, 2, ..., k], shape (k,)
j = arange(1, self.mu.shape[0] + 1, dtype=float)
# Broadcast: (k, 1) * ((1, n) - (k, 1)) → (k, n)
arg = j[:, None] * (xs[None, :] - self.mu[:, None])
return exp(sum(self.kappa[:, None] * cos(arg), axis=0))

def pdf(self, xs):
"""
Evaluate the pdf at each point in xs.

Parameters
----------
xs : array_like, shape (n,)
Points where the pdf is evaluated.

Returns
-------
p : array, shape (n,)
Pdf values.
"""
return self.pdf_unnormalized(xs) / self.norm_const
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import unittest

import numpy.testing as npt

# pylint: disable=no-name-in-module,no-member,redefined-builtin
from pyrecest.backend import all, array, isinf, isnan, linspace, pi
from pyrecest.distributions import GvMDistribution, VonMisesDistribution


class TestGvMDistribution(unittest.TestCase):
def test_init(self):
dist = GvMDistribution(array([2.0]), array([1.0]))
self.assertEqual(dist.mu.shape, (1,))
self.assertEqual(dist.kappa.shape, (1,))

def test_init_invalid_kappa(self):
with self.assertRaises(AssertionError):
GvMDistribution(array([2.0]), array([-1.0]))

def test_init_shape_mismatch(self):
with self.assertRaises(AssertionError):
GvMDistribution(array([1.0, 2.0]), array([1.0]))

def test_pdf_order1_matches_von_mises(self):
"""Order-1 GvM with a single mu and kappa should match the von Mises PDF."""
mu_val = 2.0
kappa_val = 1.0
gvm = GvMDistribution(array([mu_val]), array([kappa_val]))
vm = VonMisesDistribution(mu_val, kappa_val)

xs = linspace(0.0, 2.0 * pi, 100)
npt.assert_array_almost_equal(gvm.pdf(xs), vm.pdf(xs), decimal=5)

def test_pdf_non_negative(self):
gvm = GvMDistribution(array([1.0, 0.5]), array([2.0, 1.0]))
xs = linspace(0.0, 2.0 * pi, 50)
self.assertTrue((gvm.pdf(xs) >= 0).all())

def test_pdf_integrates_to_one(self):
"""PDF should integrate to 1 over [0, 2*pi]."""
from scipy.integrate import quad

gvm = GvMDistribution(array([1.0, 0.5]), array([2.0, 1.0]))
result, _ = quad(lambda x: float(gvm.pdf(array([x]))[0]), 0.0, 2.0 * float(pi))
npt.assert_almost_equal(result, 1.0, decimal=5)

def test_pdf_order2_specific_values(self):
"""Test order-2 PDF at specific points, compared to manually computed values."""
mu = array([2.0, 1.0])
kappa = array([1.0, 0.5])
gvm = GvMDistribution(mu, kappa)

xs = linspace(0.0, 2.0 * pi, 5)
pdf_vals = gvm.pdf(xs)
# All values must be positive and finite
self.assertTrue(all(~isnan(pdf_vals) & ~isinf(pdf_vals)))
self.assertTrue(all(pdf_vals >= 0))

def test_norm_const_cached(self):
"""Norm constant should be computed once and cached."""
gvm = GvMDistribution(array([1.0]), array([1.0]))
c1 = gvm.norm_const
c2 = gvm.norm_const
self.assertEqual(c1, c2)


if __name__ == "__main__":
unittest.main()
Loading