Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import warnings

import scipy

# pylint: disable=redefined-builtin,no-name-in-module,no-member
Expand All @@ -14,6 +16,7 @@
full,
imag,
isnan,
zeros_like,
linalg,
pi,
real,
Expand All @@ -22,6 +25,10 @@
sin,
sqrt,
zeros,
cos,
sin,
meshgrid,
deg2rad,
)

# pylint: disable=E0611
Expand Down Expand Up @@ -190,3 +197,258 @@ def imag_part(phi, theta, n, m):
coeff_mat[n, m + n] = real_integral + 1j * imag_integral

return SphericalHarmonicsDistributionComplex(coeff_mat, transformation)

# ------------------------------------------------------------------
# pyshtools-based helper methods
# ------------------------------------------------------------------

@staticmethod
def _coeff_mat_to_pysh(coeff_mat, degree):
"""Convert our coeff_mat to a pyshtools SHComplexCoeffs object."""
import pyshtools as pysh # pylint: disable=import-error

clm = pysh.SHCoeffs.from_zeros(
degree, kind="complex", normalization="ortho", csphase=-1
)
for n in range(degree + 1):
for m in range(0, n + 1):
clm.coeffs[0, n, m] = coeff_mat[n, n + m]
for m in range(1, n + 1):
clm.coeffs[1, n, m] = coeff_mat[n, n - m]
return clm

@staticmethod
def _pysh_to_coeff_mat(clm, degree):
"""Convert a pyshtools SHComplexCoeffs object to our coeff_mat."""
coeff_mat = np.zeros((degree + 1, 2 * degree + 1), dtype=complex)
max_n = min(clm.lmax, degree)
for n in range(max_n + 1):
for m in range(0, n + 1):
coeff_mat[n, n + m] = clm.coeffs[0, n, m]
for m in range(1, n + 1):
coeff_mat[n, n - m] = clm.coeffs[1, n, m]
return coeff_mat

@staticmethod
def _get_dh_grid_cartesian(degree):
"""Return (x, y, z) flat arrays and grid_shape for the DH grid at *degree*."""
import pyshtools as pysh # pylint: disable=import-error

dummy = pysh.SHCoeffs.from_zeros(
degree, kind="complex", normalization="ortho", csphase=-1
)
grid = dummy.expand(grid="DH", extend=False)
lats, lons = grid.lats(), grid.lons()
lon_mesh, lat_mesh = meshgrid(lons, lats)
theta = deg2rad(90.0 - lat_mesh) # colatitude in radians
phi = deg2rad(lon_mesh) # azimuth in radians
x_c = sin(theta) * cos(phi)
y_c = sin(theta) * sin(phi)
z_c = cos(theta)
return x_c.ravel(), y_c.ravel(), z_c.ravel(), theta.shape

def _eval_on_grid(self, target_degree=None):
"""Evaluate this SHD on the DH grid at *target_degree* (defaults to own degree).

The DH grid is expanded at *target_degree* so that higher-frequency grids
can be used for intermediate computations (e.g. when squaring a degree-L
function that has degree 2L).
Returns a real 2-D numpy array of shape (nlat, nlon).
"""
import pyshtools as pysh # pylint: disable=import-error

degree = self.coeff_mat.shape[0] - 1
if target_degree is None:
target_degree = degree

# Pad our coefficients into a pysh object at target_degree
clm_full = pysh.SHCoeffs.from_zeros(
target_degree, kind="complex", normalization="ortho", csphase=-1
)
min_deg = min(degree, target_degree)
clm_own = self._coeff_mat_to_pysh(self.coeff_mat, min_deg)
clm_full.coeffs[0, : min_deg + 1, : min_deg + 1] = clm_own.coeffs[
0, : min_deg + 1, : min_deg + 1
]
clm_full.coeffs[1, : min_deg + 1, : min_deg + 1] = clm_own.coeffs[
1, : min_deg + 1, : min_deg + 1
]
grid = clm_full.expand(grid="DH", extend=False)
return grid.data.real

@staticmethod
def _fit_from_grid(grid_vals_real, degree, transformation):
"""Fit SH coefficients to real-valued grid values on a DH grid.

*grid_vals_real* is a 2D numpy array with shape matching the DH grid for
*degree*. Returns a new :class:`SphericalHarmonicsDistributionComplex`.
"""
import pyshtools as pysh # pylint: disable=import-error

grid_obj = pysh.SHGrid.from_array(
grid_vals_real.astype(complex), grid="DH"
)
clm = grid_obj.expand(lmax_calc=degree, normalization="ortho", csphase=-1)
coeff_mat = SphericalHarmonicsDistributionComplex._pysh_to_coeff_mat(clm, degree)
shd = SphericalHarmonicsDistributionComplex(coeff_mat, transformation)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
shd.normalize_in_place()
return shd

# ------------------------------------------------------------------
# Public numerical methods
# ------------------------------------------------------------------

@staticmethod
def from_distribution_numerical_fast(dist, degree, transformation="identity"):
"""Approximate *dist* by a degree-*degree* SHD using a DH grid.

This is faster than :meth:`from_distribution_via_integral` because it
uses the discrete spherical-harmonic transform instead of numerical
quadrature.
"""
if transformation not in ("identity", "sqrt"):
raise ValueError(f"Unsupported transformation: '{transformation}'")

x_c, y_c, z_c, grid_shape = (
SphericalHarmonicsDistributionComplex._get_dh_grid_cartesian(degree)
)
xs = column_stack([x_c, y_c, z_c])
fvals = array(dist.pdf(xs), dtype=float).reshape(grid_shape)

if transformation == "sqrt":
fvals = sqrt(max(fvals, 0.0))

return SphericalHarmonicsDistributionComplex._fit_from_grid(
fvals, degree, transformation
)

def convolve(self, other):
"""Spherical convolution with a *zonal* distribution *other*.

For the ``'identity'`` transformation the standard frequency-domain
formula is used (exact for bandlimited functions). For the ``'sqrt'``
transformation a grid-based approach with a 2× finer intermediate grid
is used so that squaring the sqrt functions introduces no aliasing.
"""
assert isinstance(
other, SphericalHarmonicsDistributionComplex
), "other must be a SphericalHarmonicsDistributionComplex"

degree = self.coeff_mat.shape[0] - 1

if self.transformation == "identity" and other.transformation == "identity":
# Direct frequency-domain formula: h_{l,m} = sqrt(4π/(2l+1)) * f_{l,m} * g_{l,0}
h_lm = zeros_like(self.coeff_mat)
for l in range(degree + 1):
factor = (
sqrt(4.0 * pi / (2 * l + 1))
* other.coeff_mat[l, l]
)
for m in range(-l, l + 1):
h_lm[l, l + m] = factor * self.coeff_mat[l, l + m]
result = SphericalHarmonicsDistributionComplex(h_lm, "identity")
with warnings.catch_warnings():
warnings.simplefilter("ignore")
result.normalize_in_place()
return result

if self.transformation == "sqrt" and other.transformation == "sqrt":
# Use a grid twice as fine to avoid aliasing when squaring.
degree_fine = 2 * degree

# Recover p = f^2 and q = g^2 on the fine grid, then SHT to degree.
f_grid = self._eval_on_grid(target_degree=degree_fine)
p_grid = f_grid**2

g_grid = other._eval_on_grid(target_degree=degree_fine)
q_grid = g_grid**2

import pyshtools as pysh # pylint: disable=import-error

def _grid_to_coeff(grid_vals):
grid_obj = pysh.SHGrid.from_array(
grid_vals.astype(complex), grid="DH"
)
clm = grid_obj.expand(
lmax_calc=degree, normalization="ortho", csphase=-1
)
return self._pysh_to_coeff_mat(clm, degree)

p_lm = _grid_to_coeff(p_grid)
q_lm = _grid_to_coeff(q_grid)

# Convolution formula on the identity coefficients
r_lm = zeros_like(p_lm)
for l in range(degree + 1):
factor = sqrt(4.0 * pi / (2 * l + 1)) * q_lm[l, l]
for m in range(-l, l + 1):
r_lm[l, l + m] = factor * p_lm[l, l + m]

# Evaluate r on the standard DH grid, take sqrt, refit
r_shd_id = SphericalHarmonicsDistributionComplex(r_lm, "identity")
r_grid = r_shd_id._eval_on_grid()
sqrt_r_grid = sqrt(max(r_grid, 0.0))

return SphericalHarmonicsDistributionComplex._fit_from_grid(
sqrt_r_grid, degree, "sqrt"
)

raise ValueError(
"convolve: mixed transformations are not supported. "
"Both self and other must use the same transformation."
)

def multiply(self, other):
"""Pointwise multiplication in physical space (Bayesian update step).

Works for both ``'identity'`` and ``'sqrt'`` transformations.
For ``'sqrt'``: ``sqrt(p) * sqrt(q) = sqrt(p*q)`` which is the correct
sqrt-transformed product density.
"""
assert isinstance(
other, SphericalHarmonicsDistributionComplex
), "other must be a SphericalHarmonicsDistributionComplex"
assert self.transformation == other.transformation, (
"multiply: both distributions must use the same transformation"
)

degree = self.coeff_mat.shape[0] - 1

f_grid = self._eval_on_grid()
g_grid = other._eval_on_grid()

h_grid = f_grid * g_grid

return SphericalHarmonicsDistributionComplex._fit_from_grid(
h_grid, degree, self.transformation
)

def rotate(self, alpha, beta, gamma):
"""Rotate the distribution by ZYZ Euler angles (in radians).

Parameters
----------
alpha : float
First rotation angle around Z (radians).
beta : float
Second rotation angle around Y (radians).
gamma : float
Third rotation angle around Z (radians).
"""
import pyshtools as pysh # pylint: disable=import-error

degree = self.coeff_mat.shape[0] - 1
clm = self._coeff_mat_to_pysh(self.coeff_mat, degree)
clm_rot = clm.rotate(
np.degrees(alpha),
np.degrees(beta),
np.degrees(gamma),
degrees=True,
body=True,
)
coeff_mat_rot = self._pysh_to_coeff_mat(clm_rot, degree)
return SphericalHarmonicsDistributionComplex(
coeff_mat_rot, self.transformation
)
2 changes: 2 additions & 0 deletions pyrecest/filters/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .hypertoroidal_particle_filter import HypertoroidalParticleFilter
from .kalman_filter import KalmanFilter
from .manifold_mixins import EuclideanFilterMixin, HypertoroidalFilterMixin
from .von_mises_fisher_filter import VonMisesFisherFilter

__all__ = [
"AbstractFilter",
Expand All @@ -13,4 +14,5 @@
"HypertoroidalParticleFilter",
"KalmanFilter",
"EuclideanParticleFilter",
"VonMisesFisherFilter",
]
Loading
Loading