diff --git a/pyrecest/filters/__init__.py b/pyrecest/filters/__init__.py index 914306392..7fa93cb69 100644 --- a/pyrecest/filters/__init__.py +++ b/pyrecest/filters/__init__.py @@ -1,5 +1,6 @@ from .abstract_filter import AbstractFilter from .abstract_particle_filter import AbstractParticleFilter +from .circular_ukf import CircularUKF from .euclidean_particle_filter import EuclideanParticleFilter from .hypertoroidal_particle_filter import HypertoroidalParticleFilter from .kalman_filter import KalmanFilter @@ -7,6 +8,7 @@ __all__ = [ "AbstractFilter", + "CircularUKF", "EuclideanFilterMixin", "HypertoroidalFilterMixin", "AbstractParticleFilter", diff --git a/pyrecest/filters/circular_ukf.py b/pyrecest/filters/circular_ukf.py new file mode 100644 index 000000000..fad9e173d --- /dev/null +++ b/pyrecest/filters/circular_ukf.py @@ -0,0 +1,271 @@ +""" +A modified unscented Kalman filter for circular distributions, +interprets circle as 1D interval [0, 2*pi). + +References: + Gerhard Kurz, Igor Gilitschenski, Uwe D. Hanebeck, + Recursive Bayesian Filtering in Circular State Spaces + arXiv preprint: Systems and Control (cs.SY), January 2015. +""" + +# pylint: disable=no-name-in-module,no-member +import pyrecest.backend +from bayesian_filters.kalman import MerweScaledSigmaPoints, UnscentedKalmanFilter + +# pylint: disable=no-name-in-module,no-member +from pyrecest.backend import array, atleast_1d, mod, pi, sign +from pyrecest.distributions import GaussianDistribution + +from .abstract_filter import AbstractFilter +from .manifold_mixins import CircularFilterMixin + + +def _make_ukf( # pylint: disable=too-many-arguments,too-many-positional-arguments + fx, hx, dim_z, x0, P0, Q, R, alpha=1e-3, beta=2.0, kappa=0.0 +): + """Helper to build a UnscentedKalmanFilter from bayesian_filters. + + Parameters + ---------- + R: + Measurement noise covariance matrix of shape (dim_z, dim_z). + alpha, beta, kappa: + Sigma-point parameters for :class:`MerweScaledSigmaPoints`. + """ + points = MerweScaledSigmaPoints(n=1, alpha=alpha, beta=beta, kappa=kappa) + ukf = UnscentedKalmanFilter( + dim_x=1, + dim_z=dim_z, + dt=1.0, + hx=hx, + fx=fx, + points=points, + ) + ukf.x = array([x0]) + ukf.P = array([[P0]]) + ukf.Q = array([[Q]]) + ukf.R = R # R must already be a (dim_z, dim_z) array + return ukf + + +class CircularUKF(AbstractFilter, CircularFilterMixin): + """ + A modified unscented Kalman filter for circular distributions. + + The state is represented as a 1-D :class:`GaussianDistribution` on the + circle [0, 2*pi). + """ + + def __init__(self, alpha: float = 1e-3, beta: float = 2.0, kappa: float = 0.0): + """Initialise with a standard Gaussian at 0 with unit variance. + + Parameters + ---------- + alpha: + UKF sigma-point spread parameter (default 1e-3). + beta: + UKF prior distribution parameter (default 2.0, optimal for Gaussian). + kappa: + UKF secondary scaling parameter (default 0.0). + """ + self._alpha = alpha + self._beta = beta + self._kappa = kappa + initial_state = GaussianDistribution( + array([0.0]), array([[1.0]]) + ) + CircularFilterMixin.__init__(self) + AbstractFilter.__init__(self, initial_state) + + # ------------------------------------------------------------------ + # filter_state property + # ------------------------------------------------------------------ + + @property + def filter_state(self) -> GaussianDistribution: + return self._filter_state + + @filter_state.setter + def filter_state(self, new_state): + if not isinstance(new_state, GaussianDistribution): + new_state = GaussianDistribution.from_distribution(new_state) + assert new_state.dim == 1, "CircularUKF only supports 1-D state" + self._filter_state = new_state + + # ------------------------------------------------------------------ + # Prediction + # ------------------------------------------------------------------ + + def predict_identity(self, gauss_sys: GaussianDistribution): + """ + Predict assuming identity system model: + x(k+1) = x(k) + w(k) mod 2*pi + where *w(k)* is additive noise described by *gauss_sys*. + + Parameters + ---------- + gauss_sys: + Distribution of additive system noise (converted to + :class:`GaussianDistribution` if necessary). + """ + if not isinstance(gauss_sys, GaussianDistribution): + gauss_sys = GaussianDistribution.from_distribution(gauss_sys) + new_mu = mod(self._filter_state.mu + gauss_sys.mu, 2.0 * pi) + new_C = self._filter_state.C + gauss_sys.C + self._filter_state = GaussianDistribution(new_mu, new_C) + + def predict_nonlinear(self, f, gauss_sys: GaussianDistribution): + """ + Predict assuming a nonlinear system model: + x(k+1) = f(x(k)) + w(k) mod 2*pi + where *w(k)* is additive noise described by *gauss_sys*. + + Parameters + ---------- + f: + Function from [0, 2*pi) to [0, 2*pi). + gauss_sys: + Distribution of additive system noise. + """ + if not isinstance(gauss_sys, GaussianDistribution): + gauss_sys = GaussianDistribution.from_distribution(gauss_sys) + + assert ( + pyrecest.backend.__backend_name__ != "pytorch" + ), "Not supported on this backend" + + mu0 = float(self._filter_state.mu[0]) + C0 = float(self._filter_state.C[0, 0]) + Q_val = float(gauss_sys.C[0, 0]) + noise_mean = float(gauss_sys.mu[0]) + + def fx(x, dt): # pylint: disable=unused-argument + return array([f(x.flatten()[0]) + noise_mean]) + + def hx(x): + return x + + ukf = _make_ukf( + fx, hx, dim_z=1, x0=mu0, P0=C0, Q=Q_val, R=array([[C0]]), + alpha=self._alpha, beta=self._beta, kappa=self._kappa, + ) + ukf.predict() + + new_mu = mod(array([ukf.x.flatten()[0]]), 2.0 * pi) + new_C = array([[ukf.P[0, 0]]]) + self._filter_state = GaussianDistribution(new_mu, new_C) + + # ------------------------------------------------------------------ + # Update + # ------------------------------------------------------------------ + + def update_identity(self, gauss_meas: GaussianDistribution, z): + """ + Update assuming identity measurement model: + z(k) = x(k) + v(k) mod 2*pi + where *v(k)* is additive noise described by *gauss_meas*. + + Parameters + ---------- + gauss_meas: + Distribution of additive measurement noise. + z: + Scalar measurement in [0, 2*pi). + """ + if not isinstance(gauss_meas, GaussianDistribution): + gauss_meas = GaussianDistribution.from_distribution(gauss_meas) + + z_val = float(array(z).flatten()[0]) + # Shift measurement by noise mean + z_val = float(mod(array([z_val - float(gauss_meas.mu[0])]), 2.0 * pi)[0]) + + mu = float(self._filter_state.mu[0]) + # Move measurement if necessary (wrap to be nearest to current mean) + if abs(mu - z_val) > float(pi): + z_val = z_val + 2.0 * float(pi) * sign(mu - z_val) + + C = float(self._filter_state.C[0, 0]) + R = float(gauss_meas.C[0, 0]) + + # Kalman update + K = C / (C + R) + new_mu = mu + K * (z_val - mu) + new_C = (1.0 - K) * C + + new_mu = float(mod(array([new_mu]), 2.0 * pi)[0]) + self._filter_state = GaussianDistribution( + array([new_mu]), array([[new_C]]) + ) + + def update_nonlinear( + self, f, gauss_meas: GaussianDistribution, z, measurement_periodic: bool = False + ): + """ + Update assuming a nonlinear measurement model: + z(k) = f(x(k)) + v(k) (if *measurement_periodic* is False) + z(k) = f(x(k)) + v(k) mod 2*pi (if *measurement_periodic* is True) + where *v(k)* is additive noise described by *gauss_meas*. + + Parameters + ---------- + f: + Measurement function from [0, 2*pi) to R^d. + gauss_meas: + Distribution of additive measurement noise. + z: + Measurement vector of shape (d,). + measurement_periodic: + Whether the measurement is a periodic quantity. + """ + if not isinstance(gauss_meas, GaussianDistribution): + gauss_meas = GaussianDistribution.from_distribution(gauss_meas) + + assert ( + pyrecest.backend.__backend_name__ != "pytorch" + ), "Not supported on this backend" + + z = atleast_1d(array(z, dtype=float)) + dim_z = len(z.flatten()) + + mu0 = float(self._filter_state.mu[0]) + C0 = float(self._filter_state.C[0, 0]) + pi_val = float(pi) + + if measurement_periodic: + for i in range(dim_z): + z_i = float(z[i]) + if abs(mu0 - z_i) > pi_val: + z[i] = z_i + 2.0 * pi_val * float(sign(mu0 - z_i)) + + if dim_z == 1: + R_mat = array([[float(gauss_meas.C.flatten()[0])]]) + else: + R_mat = array(gauss_meas.C, dtype=float).reshape(dim_z, dim_z) + + def fx(x, dt): # pylint: disable=unused-argument + return x + + def hx(x): + return atleast_1d(array([f(x.flatten()[0])], dtype=float)) + + ukf = _make_ukf( + fx, hx, dim_z=dim_z, x0=mu0, P0=C0, Q=0.0, R=R_mat, + alpha=self._alpha, beta=self._beta, kappa=self._kappa, + ) + # predict() with identity fx and Q=0 populates sigmas_f without + # altering the mean or covariance, which is required before update(). + ukf.predict() + ukf.update(z) + + new_mu = float(mod(array([ukf.x.flatten()[0]]), 2.0 * pi)[0]) + self._filter_state = GaussianDistribution( + array([new_mu]), array([[ukf.P[0, 0]]]) + ) + + # ------------------------------------------------------------------ + # Point estimate + # ------------------------------------------------------------------ + + def get_point_estimate(self): + """Return the mean of the current state estimate.""" + return self._filter_state.mu diff --git a/pyrecest/tests/filters/test_circular_ukf.py b/pyrecest/tests/filters/test_circular_ukf.py new file mode 100644 index 000000000..95c258092 --- /dev/null +++ b/pyrecest/tests/filters/test_circular_ukf.py @@ -0,0 +1,128 @@ +import unittest + +import numpy.testing as npt + +# pylint: disable=no-name-in-module,no-member +import pyrecest.backend + +# pylint: disable=no-name-in-module,no-member +from pyrecest.backend import array, pi +from pyrecest.distributions import GaussianDistribution +from pyrecest.filters.circular_ukf import CircularUKF + + +class CircularUKFTest(unittest.TestCase): + def setUp(self): + self.filter = CircularUKF() + self.g = GaussianDistribution(array([0.5]), array([[0.7]])) + + def test_initialization(self): + self.filter.filter_state = self.g + g1 = self.filter.filter_state + self.assertIsInstance(g1, GaussianDistribution) + npt.assert_equal(self.g.mu, g1.mu) + npt.assert_equal(self.g.C, g1.C) + + def test_predict_identity(self): + self.filter.filter_state = self.g + self.filter.predict_identity(self.g) + g_identity = self.filter.filter_state + self.assertIsInstance(g_identity, GaussianDistribution) + npt.assert_almost_equal(g_identity.mu, self.g.mu + self.g.mu) + npt.assert_almost_equal(g_identity.C, self.g.C + self.g.C) + + @unittest.skipIf( + pyrecest.backend.__backend_name__ == "pytorch", + reason="Not supported on this backend", + ) + def test_predict_nonlinear_identity_function(self): + self.filter.filter_state = self.g + self.filter.predict_nonlinear(lambda x: x, self.g) + g_nonlin = self.filter.filter_state + self.assertIsInstance(g_nonlin, GaussianDistribution) + npt.assert_almost_equal(g_nonlin.mu, self.g.mu + self.g.mu, decimal=10) + npt.assert_almost_equal(g_nonlin.C, self.g.C + self.g.C, decimal=10) + + @unittest.skipIf( + pyrecest.backend.__backend_name__ == "pytorch", + reason="Not supported on this backend", + ) + def test_predict_nonlinear_true_nonlinear(self): + g4 = GaussianDistribution(array([0.0]), array([[0.7]])) + self.filter.filter_state = g4 + self.filter.predict_nonlinear(lambda x: x**3, g4) + g_nonlin = self.filter.filter_state + # 0.0 and 2*pi are the same angle; compare via circular distance + mu_diff = (float(g_nonlin.mu[0]) - float(g4.mu[0])) % (2.0 * float(pi)) + if mu_diff > float(pi): + mu_diff -= 2.0 * float(pi) + self.assertAlmostEqual(mu_diff, 0.0, places=10) + self.assertGreater(float(g_nonlin.C[0, 0]), float(g4.C[0, 0])) + + def test_update_identity(self): + self.filter.filter_state = self.g + self.filter.update_identity( + GaussianDistribution(array([0.0]), self.g.C), self.g.mu + ) + g_identity = self.filter.filter_state + self.assertIsInstance(g_identity, GaussianDistribution) + npt.assert_almost_equal(g_identity.mu, self.g.mu) + npt.assert_almost_equal(g_identity.C, self.g.C / 2.0) + + def test_update_identity_nonzero_noise_mean(self): + self.filter.filter_state = self.g + self.filter.update_identity( + GaussianDistribution(array([2.0]), self.g.C), self.g.mu + array([2.0]) + ) + g_identity2 = self.filter.filter_state + self.assertIsInstance(g_identity2, GaussianDistribution) + npt.assert_almost_equal(g_identity2.mu, self.g.mu) + npt.assert_almost_equal(g_identity2.C, self.g.C / 2.0) + + def test_update_identity_different_measurement(self): + self.filter.filter_state = self.g + z = 2.0 * pi - 1.0 + self.filter.update_identity(GaussianDistribution(array([0.0]), self.g.C), z) + g_identity3 = self.filter.filter_state + self.assertIsInstance(g_identity3, GaussianDistribution) + self.assertGreater(float(g_identity3.mu[0]), z) + self.assertLess(float(g_identity3.mu[0]), 2.0 * pi) + self.assertGreater(float(self.g.C[0, 0]), float(g_identity3.C[0, 0])) + + @unittest.skipIf( + pyrecest.backend.__backend_name__ == "pytorch", + reason="Not supported on this backend", + ) + def test_update_nonlinear_identity_function(self): + g7 = GaussianDistribution(array([0.0]), array([[0.7]])) + self.filter.filter_state = g7 + z = array([0.4]) + self.filter.update_nonlinear(lambda x: x, g7, z) + g8 = self.filter.filter_state + self.assertGreater(float(g8.mu[0]), float(g7.mu[0])) + self.assertLess(float(g8.mu[0]), float(z[0])) + self.assertGreater(float(g7.C[0, 0]), float(g8.C[0, 0])) + + @unittest.skipIf( + pyrecest.backend.__backend_name__ == "pytorch", + reason="Not supported on this backend", + ) + def test_update_nonlinear_periodic_measurement(self): + g9 = GaussianDistribution(array([0.1]), array([[0.7]])) + self.filter.filter_state = g9 + z = array([2.0 * pi - 0.4]) + g7 = GaussianDistribution(array([0.0]), array([[0.7]])) + self.filter.update_nonlinear(lambda x: x, g7, z, measurement_periodic=True) + g10 = self.filter.filter_state + self.assertGreater(float(g10.mu[0]), float(z[0])) + self.assertLess(float(g10.mu[0]), 2.0 * pi) + self.assertGreater(float(g7.C[0, 0]), float(g10.C[0, 0])) + + def test_get_point_estimate(self): + self.filter.filter_state = self.g + estimate = self.filter.get_point_estimate() + npt.assert_equal(estimate, self.g.mu) + + +if __name__ == "__main__": + unittest.main()