Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyrecest/distributions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
from .circle.sine_skewed_distributions import (
AbstractSineSkewedDistribution,
GeneralizedKSineSkewedVonMisesDistribution,
GSSVMDistribution,
SineSkewedVonMisesDistribution,
SineSkewedWrappedCauchyDistribution,
SineSkewedWrappedNormalDistribution,
Expand Down Expand Up @@ -268,6 +269,7 @@
"CustomCircularDistribution",
"AbstractSineSkewedDistribution",
"GeneralizedKSineSkewedVonMisesDistribution",
"GSSVMDistribution",
"SineSkewedVonMisesDistribution",
"SineSkewedWrappedCauchyDistribution",
"SineSkewedWrappedNormalDistribution",
Expand Down
45 changes: 33 additions & 12 deletions pyrecest/distributions/circle/sine_skewed_distributions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
# pylint: disable=no-name-in-module,no-member
from pyrecest.backend import mod, ndim, pi, sin
from scipy.special import ive # pylint: disable=no-name-in-module
from scipy.stats import vonmises

from .abstract_circular_distribution import AbstractCircularDistribution
from .von_mises_distribution import VonMisesDistribution
from .wrapped_cauchy_distribution import WrappedCauchyDistribution
from .wrapped_normal_distribution import WrappedNormalDistribution

Expand Down Expand Up @@ -40,7 +40,7 @@ def validate_parameters(self):
def pdf(self, xs):
# Evaluate the von Mises distribution and multiply by (1 + lambda_ * sin(xa - mu))
assert self.k == 1, "Currently, only k=1 is supported"
vm_pdf = vonmises.pdf(xs, self.kappa, loc=self.mu)
vm_pdf = VonMisesDistribution(self.mu, self.kappa).pdf(xs)
skew_factor = (1 + self.lambda_ * sin(self.k * (xs - self.mu))) ** self.m
if self.m == 1:
norm_const = 1
Expand Down Expand Up @@ -79,26 +79,47 @@ def __init__(self, mu, kappa, lambda_):
super().__init__(mu, kappa, lambda_, k=1, m=1)


class GSSVMDistribution(GeneralizedKSineSkewedVonMisesDistribution):
"""
Generalized Skew-Symmetric Von Mises (GSSVM) distribution.

Special case of GeneralizedKSineSkewedVonMisesDistribution with k=1 fixed.
Corresponds to GSSVMDistribution in libDirectional.

Parameters:
- mu (float): Mean direction of the distribution.
- kappa (float): Concentration parameter (non-negative).
- lambda_ (float): Skewness parameter, must be between -1 and 1 inclusive.
- n (int): Order/power of the sine skewing term, must be a positive integer.
"""

def __init__(self, mu, kappa, lambda_, n):
super().__init__(mu, kappa, lambda_, k=1, m=n)

@property
def n(self):
return self.m

def shift(self, shift_by):
if ndim(shift_by) != 0:
raise ValueError("angle must be a scalar")
return GSSVMDistribution(self.mu + shift_by, self.kappa, self.lambda_, self.n)


def bessel_ratio(p, z):
"""
Computes the ratio I_p(z) / I_p(0) in a numerically stable manner using
Computes the ratio I_p(z) / I_0(z) in a numerically stable manner using
exponentially scaled modified Bessel functions.

Parameters:
- p: Order of the Bessel function.
- z: Argument for the Bessel function.

Returns:
- The ratio of I_p(z) to I_p(0), calculated in a numerically stable way.
- The ratio I_p(z) / I_0(z), calculated in a numerically stable way.
"""
# Compute the scaled Bessel function values for both the numerator and denominator.
scaled_numerator = ive(p, z)
scaled_denominator = ive(p, 0)

# Since ive(p, z) = iv(p, z) * exp(-|z|), and ive(p, 0) = iv(p, 0),
# when we take the ratio, the exp(-|z|) terms cancel out for the ratio calculation.
# Therefore, the ratio of the scaled values directly gives us the ratio of the original Bessel functions.
return scaled_numerator / scaled_denominator
# ive(p, z) = iv(p, z) * exp(-|z|), so ive(p, z) / ive(0, z) = iv(p, z) / iv(0, z).
return ive(p, z) / ive(0, z)


class AbstractSineSkewedDistribution(AbstractCircularDistribution):
Expand Down
79 changes: 79 additions & 0 deletions pyrecest/tests/distributions/test_gssvm_distribution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import unittest

import numpy.testing as npt
from pyrecest.backend import array, pi
from pyrecest.distributions.circle.sine_skewed_distributions import (
GSSVMDistribution,
GeneralizedKSineSkewedVonMisesDistribution,
)

class TestGSSVMDistribution(unittest.TestCase):
def test_initialization(self):
"""Test initialization with valid and invalid parameters."""
dist = GSSVMDistribution(mu=pi, kappa=1.0, lambda_=0.5, n=1)
npt.assert_allclose(dist.mu, pi, rtol=5e-7)
npt.assert_allclose(dist.kappa, 1.0, rtol=1e-7)
npt.assert_allclose(dist.lambda_, 0.5, rtol=1e-7)
self.assertEqual(dist.n, 1)
self.assertEqual(dist.k, 1)

def test_invalid_lambda(self):
"""lambda_ must be in [-1, 1]."""
with self.assertRaises(AssertionError):
GSSVMDistribution(mu=0.0, kappa=1.0, lambda_=1.5, n=1)

def test_invalid_n_zero(self):
"""n must be a positive integer."""
with self.assertRaises(AssertionError):
GSSVMDistribution(mu=0.0, kappa=1.0, lambda_=0.5, n=0)

def test_n_maps_to_m(self):
"""n property must equal the internal m parameter."""
for n in (1, 2, 3, 4):
dist = GSSVMDistribution(mu=0.0, kappa=1.0, lambda_=0.5, n=n)
self.assertEqual(dist.n, n)
self.assertEqual(dist.m, n)

def test_pdf_non_negative(self):
"""PDF values must be non-negative everywhere."""
xs = array([0.0, pi / 4, pi / 2, pi, 3 * pi / 2])
for n in (1, 2, 3, 4):
dist = GSSVMDistribution(mu=pi, kappa=1.0, lambda_=0.5, n=n)
vals = dist.pdf(xs)
self.assertTrue(all(vals >= 0), f"Negative PDF for n={n}")

def test_pdf_matches_generalized_parent(self):
"""GSSVMDistribution PDF must match GeneralizedKSineSkewedVonMisesDistribution with k=1."""
xs = array([0.0, pi / 4, pi / 2, pi, 3 * pi / 2])
mu, kappa, lambda_ = 0.5, 2.0, 0.3
for n in (1, 2, 3, 4):
gssvm = GSSVMDistribution(mu=mu, kappa=kappa, lambda_=lambda_, n=n)
parent = GeneralizedKSineSkewedVonMisesDistribution(
mu=mu, kappa=kappa, lambda_=lambda_, k=1, m=n
)
npt.assert_array_almost_equal(gssvm.pdf(xs), parent.pdf(xs))

def test_shift(self):
"""shift() must return a GSSVMDistribution with updated mu."""
dist = GSSVMDistribution(mu=0.0, kappa=1.0, lambda_=0.5, n=2)
shifted = dist.shift(array(pi / 2))
self.assertIsInstance(shifted, GSSVMDistribution)
npt.assert_allclose(float(shifted.mu), float(pi / 2), rtol=1e-7)
npt.assert_allclose(shifted.kappa, dist.kappa, rtol=1e-7)
npt.assert_allclose(shifted.lambda_, dist.lambda_, rtol=1e-7)
self.assertEqual(shifted.n, dist.n)

def test_mu_wrapped(self):
"""mu should be wrapped to [0, 2*pi)."""
dist = GSSVMDistribution(mu=3 * pi, kappa=1.0, lambda_=0.0, n=1)
npt.assert_allclose(float(dist.mu), float(pi), rtol=5e-7)

def test_n_unsupported_raises(self):
"""n > 4 is not yet implemented."""
dist = GSSVMDistribution(mu=0.0, kappa=1.0, lambda_=0.5, n=5)
with self.assertRaises(NotImplementedError):
dist.pdf(array(0.0))


if __name__ == "__main__":
unittest.main()
Loading