"""Module defining halo model components for halo exclusion."""
from __future__ import annotations
import warnings
from functools import cached_property
import numpy as np
from hmf import Component
from hmf._internals import pluggable
from scipy import integrate as intg
from scipy.interpolate import InterpolatedUnivariateSpline as _IUS
try:
from numba import jit
USE_NUMBA = True
except ImportError: # pragma: no cover
USE_NUMBA = False
warnings.warn(
"Warning: Some Halo-Exclusion models have significant speedup when using Numba",
stacklevel=2,
)
# ===============================================================================
# UTILITIES
# ===============================================================================
[docs]
def outer(a, b):
r"""Calculate the outer product of two vectors."""
return np.outer(a, b).reshape(a.shape + b.shape)
[docs]
def dbltrapz(X, dx, dy=None):
"""Double-integral over the last two dimensions of X using trapezoidal rule."""
dy = dy or dx
out = X.copy()
out[..., 1:-1, :] *= 2
out[..., :, 1:-1] *= 2
return dx * dy * np.sum(out, axis=(-2, -1)) / 4.0
[docs]
def makeW(nx, ny):
r"""Return a window matrix for double-intergral."""
W = np.ones((nx, ny))
W[1 : nx - 1 : 2, :] *= 4
W[:, 1 : ny - 1 : 2] *= 4
W[2 : nx - 1 : 2, :] *= 2
W[:, 2 : ny - 1 : 2] *= 2
return W
if USE_NUMBA:
[docs]
@jit(nopython=True)
def dblsimps_(X, dx, dy): # pragma: no cover
"""Double-integral of X **FOR SYMMETRIC FUNCTIONS**."""
nx = X.shape[-2]
ny = X.shape[-1]
W = makeW_(nx, ny) # only upper
tot = np.zeros_like(X[..., 0, 0])
for ix in range(nx):
tot += W[ix, ix] * X[..., ix, ix]
for iy in range(ix + 1, ny):
tot += 2 * W[ix, iy] * X[..., ix, iy]
return dx * dy * tot / 9.0
[docs]
@jit(nopython=True)
def makeW_(nx, ny): # pragma: no cover
r"""Return a window matrix for symmetric double-intergral."""
W = np.ones((nx, ny))
if nx % 2 == 0:
for ix in range(1, nx - 2, 2):
W[ix, -1] *= 4
W[-1, ix] *= 4
for iy in range(ny - 1):
W[ix, iy] *= 4
W[iy, ix] *= 4
for ix in range(2, nx - 2, 2):
W[ix, -1] *= 2
W[-1, ix] *= 2
for iy in range(ny - 1):
W[ix, iy] *= 2
W[iy, ix] *= 2
for ix in range(nx):
W[ix, -2] *= 2.5
W[ix, -1] *= 1.5
W[-2, ix] *= 2.5
W[-1, ix] *= 1.5
else:
for ix in range(1, nx - 1, 2):
for iy in range(ny):
W[ix, iy] *= 4
W[iy, ix] *= 4
for ix in range(2, nx - 1, 2):
for iy in range(ny):
W[ix, iy] *= 2
W[iy, ix] *= 2
return W
[docs]
@jit(nopython=True)
def makeH_(nx, ny): # pragma: no cover
"""Return the window matrix for trapezoidal intergral."""
H = np.ones((nx, ny))
for ix in range(1, nx - 1):
for iy in range(ny):
H[ix, iy] *= 2
H[iy, ix] *= 2
return H
[docs]
@jit(nopython=True)
def dbltrapz_(X, dx, dy): # pragma: no cover
"""Double-integral of X for the trapezoidal method."""
nx = X.shape[-2]
ny = X.shape[-1]
H = makeH_(nx, ny)
tot = np.zeros_like(X[..., 0, 0])
for ix in range(nx):
tot += H[ix, ix] * X[ix, ix]
for iy in range(ix + 1, ny):
tot += 2 * H[ix, iy] * X[ix, iy]
return dx * dy * tot / 4.0
# ===============================================================================
# Halo-Exclusion Models
# ===============================================================================
[docs]
@pluggable
class Exclusion(Component):
"""
Base class for exclusion models.
All models will need to perform single or double integrals over
arrays that may have an extra two dimensions. The maximum possible
size is k*r*m*m, which for normal values of the vectors equates to
~ 1000*50*500*500 = 12,500,000,000 values, which in 64-bit reals is
1e11 bytes = 100GB. We thus limit this to a maximum of either k*r*m
or r*m*m, both of which should be less than a GB of memory.
It is possibly better to limit it to k*r or m*m, which should be quite
memory efficient, but then without accelerators (ie. Numba), these
will be very slow.
Parameters
----------
m : np.ndarray
1D vector of halo masses in Msun/h.
density : np.ndarray
Either the number or mass density of the quantity under consideration (i.e.
mass density for the matter field, number density for galaxy/halo fields).
This quantity should be _normalized_ to integrate to unity over the full
mass range in `m`. Thus, for matter, it should be ``n(m)*m / rhobar``, and
for galaxies, it should be ``n(m)*N(m) / nbar_g``.
power_integrand
An array of shape ``(len(k), len(m))`` defining the integrand of the power
spectrum integral (see Eq. 13 of https://arxiv.org/pdf/2009.14066). For the
matter field, this should be ``n(m) * u(k,m) * m / rhobar``. At large k, it
should integrate to unity over mass.
bias : np.ndarray
Either a 1D vector (length m) or 2D array (shape ``(len(r), len(m))``) defining
the halo bias.
r : np.ndarray
A vector of scales, `r` in Mpc/h.
halo_density : float
The density of the halos whose masses are given by ``m``. This is typically
``halo_overdensity * mean_density0``.
"""
def __init__(
self,
m: np.ndarray,
density: np.ndarray,
power_integrand: np.ndarray,
bias: np.ndarray,
r: np.ndarray,
halo_density: float,
xmin: float | None = None,
):
self.density = density # 1d, (m)
self.m = m # 1d, (m)
self.power_integrand = power_integrand # 2d, (k,m)
self.bias = bias # 1d (m) or 2d (r,m)
self.r = r # 1d (r)
self.halo_density = halo_density
self.dlnx = np.log(m[1] / m[0])
# xmin: smooth lower mass bound for integration (None = integrate over full m)
self.xmin = xmin if (xmin is not None and xmin > m[0]) else None
[docs]
def raw_integrand(self) -> np.ndarray:
"""Compute the full power spectrum integrand.
The output is always a 3D array, with shape ``(r, k, m)``.
"""
if self.bias.ndim == 1:
# *m since integrating in logspace
return outer(np.ones_like(self.r), self.power_integrand * self.bias * self.m)
else:
return np.einsum("ij,kj->kij", self.power_integrand * self.m, self.bias)
def _spline_integrate(self, arr: np.ndarray) -> np.ndarray:
"""Integrate ``arr`` over log-mass (last axis) using a smooth lower bound.
When :attr:`xmin` is set, a cubic spline is fitted and integrated from
``xmin`` to ``m[-1]`` so that the result varies smoothly as ``xmin``
changes, even when ``xmin`` lies between two mass grid points.
Falls back to :func:`scipy.integrate.simpson` over the full grid when
``xmin`` is ``None``.
Parameters
----------
arr:
Array whose *last* axis corresponds to the mass grid ``self.m``.
Returns
-------
np.ndarray
The integral with one fewer dimension (mass axis integrated out).
"""
if self.xmin is None:
return intg.simpson(arr, dx=self.dlnx)
lnm = np.log(self.m)
lnxmin = np.log(self.xmin)
return np.apply_along_axis(
lambda f: _IUS(lnm, f).integral(lnxmin, lnm[-1]),
-1,
arr,
)
[docs]
def integrate(self) -> np.ndarray:
"""
Integrate the :meth:`raw_integrand` over mass.
This should pass back whatever is multiplied by P_m(k) to get the two-halo
term. Often this will be a square of an integral, sometimes a Double-integral.
The result should be a 2D array of shape ``(r, k)``.
"""
[docs]
@cached_property
def density_mod(self):
r"""The modified integrated density with halo exclusion.
Calculated in the matter case by
.. math:: \bar{n}^{-1} \sqrt{\int_0^{m'_1} \int_0^{m'_2} n(m_1) m_1 n(m_2) m_2 dm_1 dm_2},
and in the tracer case by replacing ``n(m).m`` with ``n(m) N(m)``.
See Eq. 47 of https://arxiv.org/pdf/2009.14066.
The array is a vector of length ``r``.
"""
return 1
@property
def r_halo(self):
"""The virial radius of the halo."""
return (3 * self.m / (4 * np.pi * self.halo_density)) ** (1.0 / 3.0)
[docs]
class NoExclusion(Exclusion):
r"""A model where there's no halo exclusion."""
[docs]
def integrate(self) -> np.ndarray:
"""Integrate the :meth:`raw_integrand` over mass.
Returns
-------
np.ndarray
An array of shape ``(1, k)`` when bias is 1D (no r-dependence),
or ``(r, k)`` when bias is 2D, to be multiplied by P_m(k) to obtain
the 2-halo power spectrum.
Notes
-----
When bias is 1D the integrand does not depend on ``r``. In that case this
method avoids allocating the full ``(r, k, m)`` intermediate array (which
can be several gigabytes for typical grid sizes) by integrating directly
over the ``(k, m)`` plane and returning a ``(1, k)`` result.
"""
if self.bias.ndim == 1:
# No r-dependence: integrate (k, m) directly to avoid the O(r*k*m)
# intermediate array that raw_integrand() would otherwise create.
integrand = self.power_integrand * self.bias * self.m # (k, m)
return self._spline_integrate(integrand)[np.newaxis, :] ** 2 # (1, k)
return self._spline_integrate(self.raw_integrand()) ** 2
[docs]
class Sphere(Exclusion):
r"""Spherical halo exclusion model.
Only halo pairs where the virial radius of
either halo is smaller than half of the seperation, i.e.:
.. math:: R_{\rm vir} \le r/2
will be accounted for.
"""
[docs]
@cached_property
def density_mod(self) -> np.ndarray:
"""The modified density after accounting for different integral mass limits.
Returns
-------
np.ndarray
The modified density as a function of the scale, r.
"""
density = np.outer(np.ones_like(self.r), self.density * self.m)
density[self.mask] = 0
return self._spline_integrate(density)
[docs]
@cached_property
def mask(self):
"""Elements that should be set to zero. Shape (r, m)."""
return (np.outer(self.m, np.ones_like(self.r)) > self.mlim).T
@property
def mlim(self):
"""The mass threshold for the mask."""
return 4 * np.pi * (self.r / 2) ** 3 * self.halo_density / 3
[docs]
def integrate(self):
"""Integrate the :meth:`raw_integrand` over mass under new mass limits.
Returns
-------
np.ndarray
An array of shape ``(r, k)`` that should be multiplied by P_m(k) to obtain
the 2-halo power spectrum.
"""
integ = self.raw_integrand() # r,k,m
integ.transpose((1, 0, 2))[:, self.mask] = 0
return self._spline_integrate(integ) ** 2
[docs]
class DblSphere(Sphere):
r"""Double Sphere model of halo exclusion.
Only halo pairs for which the sum of virial radii
is smaller than the separation, i.e.:
.. math:: R_{\rm vir,1}+R_{\rm vir,2} \le r
will be accounted for.
"""
[docs]
@cached_property
def mask(self):
"""Elements that should be set to zero (r,m,m)."""
rvir = self.r_halo
return (outer(np.add.outer(rvir, rvir), np.ones_like(self.r)) > self.r).T
[docs]
@cached_property
def density_mod(self):
"""The modified density, under new limits."""
out = np.zeros_like(self.r)
for i, _ in enumerate(self.r):
integrand = np.outer(self.density * self.m, self.density * self.m)
integrand[self.mask[i]] = 0
inner = self._spline_integrate(integrand)
out[i] = self._spline_integrate(inner)
return np.sqrt(out)
[docs]
def integrate(self):
"""Integrate the :meth:`raw_integrand` over mass."""
integ = self.raw_integrand() # (r,k,m)
return integrate_dblsphere(integ, self.mask, self.dlnx, self.m, self.xmin)
[docs]
def integrate_dblsphere(integ, mask, dx, m=None, xmin=None):
"""Integration function for double sphere model.
When *m* and *xmin* are provided the outer (m1) integral is evaluated via
a cubic spline so that the result varies smoothly as *xmin* changes.
Parameters
----------
integ : np.ndarray, shape (r, k, m)
mask : np.ndarray, shape (r, m, m)
dx : float
Log-mass grid spacing.
m : np.ndarray or None
Halo-mass grid (required when *xmin* is set).
xmin : float or None
Smooth lower bound for the outer (m1) integral. When ``None`` the
standard Simpson rule is used. The caller is responsible for passing
``xmin=None`` when ``xmin <= m[0]``; this function does not re-check
that condition.
"""
out = np.zeros_like(integ[:, :, 0])
integrand = np.zeros_like(mask, dtype=float)
if xmin is not None and m is not None:
lnm = np.log(m)
lnxmin = np.log(xmin)
def _outer(inner_arr):
# Each row of inner_arr corresponds to a different r value and has
# distinct values, so a separate spline must be fitted per row.
return np.apply_along_axis(
lambda g: _IUS(lnm, g).integral(lnxmin, lnm[-1]),
-1,
inner_arr,
)
else:
def _outer(inner_arr):
return intg.simpson(inner_arr, dx=dx)
for ik in range(integ.shape[1]):
for ir in range(mask.shape[0]):
integrand[ir] = np.outer(integ[ir, ik, :], integ[ir, ik, :])
integrand[mask] = 0
# Inner integral (over m2) uses Simpson's rule; outer uses spline if xmin set.
inner = intg.simpson(integrand, dx=dx)
out[:, ik] = _outer(inner)
return out
if USE_NUMBA:
[docs]
@jit(nopython=True)
def integrate_dblsphere_(integ, mask, dx): # pragma: no cover
r"""The same as :func:`integrate_dblsphere`, but uses NUMBA to speed up."""
nr = integ.shape[0]
nk = integ.shape[1]
nm = mask.shape[1]
out = np.zeros((nr, nk))
integrand = np.zeros((nm, nm))
for ir in range(nr):
for ik in range(nk):
for im in range(nm):
for jm in range(im, nm):
if mask[ir, im, jm]:
integrand[im, jm] = 0
else:
integrand[im, jm] = integ[ir, ik, im] * integ[ir, ik, jm]
out[ir, ik] = dblsimps_(integrand, dx, dx)
return out
[docs]
class DblSphere_(DblSphere): # pragma: no cover
r"""The same as :class:`DblSphere`. But uses NUMBA to speed up the integration."""
[docs]
def integrate(self):
"""Integrate the :meth:`raw_integrand` over mass."""
integ = self.raw_integrand() # (r,k,m)
return integrate_dblsphere_(integ, self.mask, self.dlnx)
[docs]
class DblEllipsoid(DblSphere):
r"""
Double Ellipsoid model of halo exclusion.
Assuming a lognormal distribution
of ellipticities for halos, the probability of halo pairs **not** excluded
is:
.. math:: P(y) = 3 y^2 - 2 y^3 ,\; y = (x-0.8)/0.29,\; x = r/(R_{\rm vir,1}+R_{\rm vir,2})
taken from [1]_.
References
----------
.. [1] Tinker, J. et al., " On the Mass-to-Light Ratio of Large-Scale Structure",
https://ui.adsabs.harvard.edu/abs/2005ApJ...631...41T.
"""
[docs]
@cached_property
def mask(self):
"""Unecessary for this approach."""
return None
[docs]
@cached_property
def prob(self):
"""The probablity distribution used in calculating double integral."""
rvir = self.r_halo
x = outer(self.r, 1 / np.add.outer(rvir, rvir))
x = (x - 0.8) / 0.29 # this is y but we re-use the memory
np.clip(x, 0, 1, x)
return 3 * x**2 - 2 * x**3
[docs]
@cached_property
def density_mod(self):
"""The modified density, under new limits."""
integrand = self.prob * outer(
np.ones_like(self.r), np.outer(self.density * self.m, self.density * self.m)
)
if self.xmin is None:
return np.sqrt(dbltrapz(integrand, self.dlnx))
# Spline integration for smooth lower bound: integrate inner (m2), then outer (m1)
inner = self._spline_integrate(integrand) # (r, m1)
return np.sqrt(self._spline_integrate(inner)) # (r,)
[docs]
def integrate(self):
"""Integrate the :meth:`raw_integrand` over mass."""
integ = self.raw_integrand() # (r,k,m)
out = np.zeros_like(integ[:, :, 0])
integrand = np.zeros_like(self.prob)
for ik in range(integ.shape[1]):
for ir in range(len(self.r)):
integrand[ir] = self.prob[ir] * np.outer(integ[ir, ik, :], integ[ir, ik, :])
# Inner integral (m2) uses Simpson's rule; outer (m1) uses spline if xmin set.
inner = intg.simpson(integrand, dx=self.dlnx) # (r, m1)
out[:, ik] = self._spline_integrate(inner) # (r,)
return out
if USE_NUMBA:
[docs]
class DblEllipsoid_(DblEllipsoid): # pragma: no cover
r"""The same as :class:`DblEllipsoid`. But uses NUMBA to speed up the integration."""
[docs]
@cached_property
def density_mod(self): # pragma: no cover
"""The modified density, under new limits."""
return density_mod_(
self.r,
self.r_halo,
np.outer(self.density * self.m, self.density * self.m),
self.dlnx,
)
[docs]
@cached_property
def prob(self): # pragma: no cover
"""The probablity distribution used in calculating double integral."""
return prob_inner_(self.r, self.r_halo)
[docs]
def integrate(self): # pragma: no cover
"""Integrate the :meth:`raw_integrand` over mass."""
return integrate_dblell(self.raw_integrand(), self.prob, self.dlnx)
[docs]
@jit(nopython=True)
def integrate_dblell(integ, prob, dx): # pragma: no cover
r"""Double Integration via the trapezoidal method if using NUMBA."""
nr = integ.shape[0]
nk = integ.shape[1]
nm = prob.shape[1]
out = np.zeros((nr, nk))
integrand = np.zeros((nm, nm))
for ir in range(nr):
for ik in range(nk):
for im in range(nm):
for jm in range(im, nm):
integrand[im, jm] = integ[ir, ik, im] * integ[ir, ik, jm] * prob[ir, im, jm]
out[ir, ik] = dbltrapz_(integrand, dx, dx)
return out
[docs]
@jit(nopython=True)
def density_mod_(r, rvir, densitymat, dx): # pragma: no cover
"""The modified density, under new limits."""
d = np.zeros(len(r))
for ir, rr in enumerate(r):
integrand = prob_inner_r_(rr, rvir) * densitymat
d[ir] = dbltrapz_(integrand, dx, dx)
return np.sqrt(d)
[docs]
@jit(nopython=True)
def prob_inner_(r, rvir): # pragma: no cover
"""Jit-compiled version of calculating prob, taking advantage of symmetry."""
nrv = len(rvir)
out = np.empty((len(r), nrv, nrv))
for ir, rr in enumerate(r):
for irv, rv1 in enumerate(rvir):
for jrv in range(irv, nrv):
rv2 = rvir[jrv]
x = (rr / (rv1 + rv2) - 0.8) / 0.29
if x <= 0:
out[ir, irv, jrv] = 0
elif x >= 1:
out[ir, irv, jrv] = 1
else:
out[ir, irv, jrv] = 3 * x**2 - 2 * x**3
return out
[docs]
@jit(nopython=True)
def prob_inner_r_(r, rvir): # pragma: no cover
"""
Jit-compiled version of calculating prob along one r,
taking advantage of symmetry.
"""
nrv = len(rvir)
out = np.empty((nrv, nrv))
for irv, rv1 in enumerate(rvir):
for jrv in range(irv, nrv):
rv2 = rvir[jrv]
x = (r / (rv1 + rv2) - 0.8) / 0.29
if x <= 0:
out[irv, jrv] = 0
elif x >= 1:
out[irv, jrv] = 1
else:
out[irv, jrv] = 3 * x**2 - 2 * x**3
return out
[docs]
class NgMatched(DblEllipsoid):
r"""
A model for double ellipsoid halo exclusion, where a mask is
defined so that the number density of galaxies is matched.
"""
[docs]
@cached_property
def mask(self):
"""Mask Function for matching density."""
integrand = self.density * self.m
cumint = intg.cumulative_trapezoid(integrand, dx=self.dlnx, initial=0) # len m
cumint = np.outer(np.ones_like(self.r), cumint) # r,m
return np.where(
cumint > 1.0001 * np.outer(self.density_mod, np.ones_like(self.m)),
np.ones_like(cumint, dtype=bool),
np.zeros_like(cumint, dtype=bool),
)
[docs]
def integrate(self):
"""Integrate the :meth:`raw_integrand` over mass."""
integ = self.raw_integrand().transpose((1, 0, 2)) # k, r, m
integ[:, self.mask] = 0
return self._spline_integrate(integ.transpose((1, 0, 2))) ** 2
if USE_NUMBA:
[docs]
class NgMatched_(DblEllipsoid_): # pragma: no cover
r"""The same as :class:`NgMatched`. But uses NUMBA to speed up the integration."""
[docs]
@cached_property
def mask(self):
"""Mask Function for matching density."""
integrand = self.density * self.m
cumint = intg.cumulative_trapezoid(integrand, dx=self.dlnx, initial=0) # len m
cumint = np.outer(np.ones_like(self.r), cumint) # r,m
return np.where(
cumint > 1.0001 * np.outer(self.density_mod, np.ones_like(self.m)),
np.ones_like(cumint, dtype=bool),
np.zeros_like(cumint, dtype=bool),
)
[docs]
def integrate(self):
"""Integrate the :meth:`raw_integrand` over mass."""
integ = self.raw_integrand() # r,k,m
integ.transpose((1, 0, 2))[:, self.mask] = 0
return intg.simpson(integ, dx=self.dlnx) ** 2
[docs]
def cumsimps(func, dx):
"""
A very simplistic cumulative simpsons rule integrator. func is an array,
h is the equal spacing. It is somewhat inaccurate in the first few bins, since
we just truncate the integral, regardless of whether it is odd or even numbered.
Examples
--------
>>> x = np.linspace(0,1,1001)
>>> y = np.sin(x)
>>> print cumsimps(y,0.001)/(1-np.cos(x))
"""
f1 = func.copy()
f1[1:-1] *= 2
f1[1:-1:2] *= 2
rm = func.copy()
rm[1:-1:2] *= 3
cs = np.cumsum(f1)
cs -= rm
return cs * dx / 3