"""
Routines and Frameworks for intelligent integration of the correlation function,
to obtain eg. projected and angular correlations functions.
"""
from __future__ import annotations
import warnings
import numpy as np
from hmf import Cosmology as csm
from hmf import cached_quantity, parameter
from scipy.integrate import simpson
from scipy.interpolate import InterpolatedUnivariateSpline as _spline
from .halo_model import HaloModel
[docs]
class ProjectedCF(HaloModel):
r"""
Class for calculating projected correlation function.
Parameters
----------
rp_min
Minimum projected scale to calculate (Mpc/h)
rp_max
Maximum projected scale to calculate (Mpc/h)
rp_num
Number of bins in projected scale
rp_log
Whether the projected scale vector should be log-spaced.
proj_limit
The largest non-projected scale up to which to integrate.
"""
def __init__(
self,
rp_min: float = 0.01,
rp_max: float = 50.0,
rp_num: float = 30,
rp_log: bool = True,
proj_limit=None,
**kwargs,
):
# Set default rnum
if "rnum" not in kwargs:
kwargs["rnum"] = 5 * rp_num
super().__init__(**kwargs)
self.proj_limit = proj_limit
self.rp_min = rp_min
self.rp_max = rp_max
self.rp_num = rp_num
self.rp_log = rp_log
@parameter("switch")
def rp_min(self, val):
"""Minimum projected radius."""
return val
@parameter("option")
def rp_log(self, val):
"""If projected radius bins are logarithmically distributed."""
return bool(val)
@parameter("res")
def rp_max(self, val):
"""Maximum projected radius."""
return val
@parameter("res")
def rp_num(self, val):
"""Number of projected radius bins."""
if val < 0:
raise ValueError("rp_num must be > 0")
return int(val)
@parameter("switch")
def proj_limit(self, val):
"""Upper limits for projected radius. Can be ``None``."""
return val
@cached_quantity
def rp(self):
"""Array of projected radius bins."""
if isinstance(self.rp_min, (list, np.ndarray)):
rp = np.array(self.rp_min)
else:
if self.rp_log:
rp = np.logspace(np.log10(self.rp_min), np.log10(self.rp_max), self.rp_num)
else:
rp = np.linspace(self.rp_min, self.rp_max, self.rp_num)
return rp
@cached_quantity
def rlim(self):
"""Upper limits for projected radius.
Either manually set or taken to be some large value.
"""
return self.proj_limit or max(80.0, 5 * self.rp.max())
@cached_quantity
def r(self):
"""Logarithmically distributed rp array."""
return np.logspace(np.log10(self.rp.min()), np.log10(self.rlim), self.rnum)
@cached_quantity
def projected_corr_gal(self):
"""
Projected correlation function w(r_p).
From Beutler 2011, eq 6.
To integrate perform a substitution y = x - r_p.
"""
return projected_corr_gal(self.r, self.corr_auto_tracer, self.rlim, self.rp)
[docs]
def projected_corr_gal(
r: np.ndarray, xir: np.ndarray, rlim: np.ndarray, rp_out: [None, np.ndarray] = None
):
"""
Projected correlation function w(r_p).
Equation taken from [1]_.
To integrate, we perform a substitution y = x - r_p.
Parameters
----------
r : float array
Array of scales for the 3D correlation function, in [Mpc/h]
xir : float array
3D correlation function Array of xi(r), unitless.
rlim : float array
Upper limits of scales.
References
----------
.. [1] Davis, M. and Peebles, P. J. E., "A survey of galaxy redshifts.
V. The two-point position and velocity correlations. ",
https://ui.adsabs.harvard.edu/abs/1983ApJ...267..465D.
"""
if rp_out is None:
rp_out = r
lnr = np.log(r)
lnxi = np.log(xir)
p = np.zeros_like(rp_out)
fit = _spline(r, xir, k=3) # [self.corr_gal > 0] maybe?
f_peak = 0.01
a = 0
for i, rp in enumerate(rp_out):
if a != 1.3 and i < len(r) - 1:
# Get log slope at rp
ydiff = (lnxi[i + 1] - lnxi[i]) / (lnr[i + 1] - lnr[i])
# if the slope is flatter than 1.3, it will converge faster, but to make
# sure, we cut at 1.3
a = max(1.3, -ydiff)
theta = _get_theta(a)
min_y = theta * f_peak**2 * rp
# Get the upper limit for this rp
ylim = rlim - rp
# Set the y vector for this rp
y = np.logspace(np.log(min_y), np.log(ylim), 1000, base=np.e)
# Integrate
integ_corr = fit(y + rp)
integrand = (y + rp) * integ_corr / np.sqrt((y + 2 * rp) * y)
p[i] = simpson(integrand, x=y) * 2
return p
def _get_theta(a):
"""Utility Function."""
theta = (
2 ** (1 + 2 * a)
* (
7
- 2 * a**3
+ 3 * np.sqrt(5 - 8 * a + 4 * a**2)
+ a**2 * (9 + np.sqrt(5 - 8 * a + 4 * a**2))
- a * (13 + 3 * np.sqrt(5 - 8 * a + 4 * a**2))
)
* ((1 + np.sqrt(5 - 8 * a + 4 * a**2)) / (a - 1)) ** (-2 * a)
)
theta /= (a - 1) ** 2 * (-1 + 2 * a + np.sqrt(5 - 8 * a + 4 * a**2))
return theta
[docs]
def flat_z_dist(zmin, zmax):
"""Get an array of z weights, assuming equally distributed z bins."""
def ret(z):
z = np.atleast_1d(z)
return np.where(np.logical_and(z >= zmin, z <= zmax), 1.0 / (zmax - zmin), 0)
return ret
[docs]
def dxdz(z, cosmo=csm().cosmo):
"""Derivative of comoving distance with redshift [Mpc/h]."""
dh = cosmo.hubble_distance * cosmo.h
return dh.value / cosmo.efunc(z)
[docs]
class AngularCF(HaloModel):
"""
Framework extension to angular correlation functions.
Parameters
----------
p1 : callable, optional
The redshift distribution of the sample. This needs not
be normalised to 1, as this will occur internally. May be
either a function of radial distance [Mpc/h] or redshift.
If a function of radial distance, :func:`p_of_z` must be set to
False. Default is a flat distribution in redshift.
p2 : callable, optional
See `p1`. This can optionally be a different function against
which to cross-correlate. By default is equivalent to :func:`p1`.
theta_min, theta_max : float, optional
min,max angular separations [Rad]
theta_num : int, optional
Number of steps in angular separation
theta_log : bool, optional
Whether to use logspace for theta values
zmin, zmax : float, optional
The redshift limits of the sample distribution. Note that
this is in redshit, regardless of the value of :func:`p_of_z`.
znum : int, optional
Number of steps in redshift grid.
logu_min, logu_max : float, optional
min,max of the log10 of radial separation grid [Mpc/h]. Must be large
enough to let the integral over the 3D correlation function to converge.
unum : int, optional
Number of steps in the u grid.
check_p_norm : bool, optional
If False, cancels checking the normalisation of :func:`p1` and :func:`p2`.
p_of_z : bool, optional
Whether :func:`p1` and :func:`p2` are functions of redshift.
kwargs : unpacked-dict
Any keyword arguments passed down to :class:`halomod.HaloModel`.
"""
def __init__(
self,
p1=None,
p2=None,
theta_min=1e-3 * np.pi / 180.0,
theta_max=np.pi / 180.0,
theta_num=30,
theta_log=True,
zmin=0.2,
zmax=0.4,
znum=100,
logu_min=-4,
logu_max=2.3,
unum=100,
check_p_norm=True,
p_of_z=True,
**kwargs,
):
super().__init__(**kwargs)
if self.z < zmin or self.z > zmax:
warnings.warn(
f"Your specified redshift (z={self.z}) is not within your selection "
f"function, z=({zmin},{zmax})",
stacklevel=2,
)
if p1 is None:
p1 = flat_z_dist(zmin, zmax)
self.p1 = p1
self.p2 = p2
self.zmin = zmin
self.zmax = zmax
self.znum = znum
self.logu_min = logu_min
self.logu_max = logu_max
self.unum = unum
self.check_p_norm = check_p_norm
self.p_of_z = p_of_z
self.theta_min = theta_min
self.theta_max = theta_max
self.theta_num = theta_num
self.theta_log = theta_log
@parameter("param")
def p1(self, val):
"""The redshift distribution of the sample."""
return val
@parameter("param")
def p2(self, val):
"""The redshift distribution of the second sample for correlation."""
return val
@parameter("model")
def p_of_z(self, val):
"""Whether :func:`p1` and :func:`p2` are functions of redshift."""
return val
@parameter("res")
def theta_min(self, val):
"""Minimum angular separations [Rad]."""
if val < 0:
raise ValueError("theta_min must be > 0")
return val
@parameter("res")
def theta_max(self, val):
"""Maximum angular separations [Rad]."""
if val > 180.0:
raise ValueError("theta_max must be < 180.0")
return val
@parameter("res")
def theta_num(self, val):
"""Number of angular separations [Rad]."""
return val
@parameter("res")
def theta_log(self, val):
"""If angular separations are logarithmically distributed."""
return val
@parameter("param")
def zmin(self, val):
"""Minimum redshift."""
return val
@parameter("param")
def zmax(self, val):
"""Maximum redshift."""
return val
@parameter("res")
def znum(self, val):
"""Number of redshift bins."""
return val
@parameter("res")
def logu_min(self, val):
"""Minimum radial separation grid [Mpc/h]."""
return val
@parameter("res")
def logu_max(self, val):
"""Maximum radial separation grid [Mpc/h]."""
return val
@parameter("res")
def unum(self, val):
"""Number of radial separation grids [Mpc/h]."""
return val
@parameter("option")
def check_p_norm(self, val):
"""If False, cancels checking the normalisation of :func:`p1` and :func:`p2`."""
return val
@cached_quantity
def zvec(self):
"""Redshift distribution grid."""
return np.linspace(self.zmin, self.zmax, self.znum)
@cached_quantity
def uvec(self):
"""Radial separation grid [Mpc/h]."""
return np.logspace(self.logu_min, self.logu_max, self.unum)
@cached_quantity
def xvec(self):
"""Radial distance grid (corresponds to zvec) [Mpc/h]."""
return self.cosmo.comoving_distance(self.zvec).value
@cached_quantity
def theta(self):
"""Angular separations, [Rad]."""
if self.theta_min > self.theta_max:
raise ValueError("theta_min must be less than theta_max")
if self.theta_log:
return np.logspace(np.log10(self.theta_min), np.log10(self.theta_max), self.theta_num)
else:
return np.linspace(self.theta_min, self.theta_max, self.theta_num)
@cached_quantity
def r(self):
"""Physical separation grid [Mpc/h]."""
rmin = np.sqrt((10**self.logu_min) ** 2 + self.theta.min() ** 2 * self.xvec.min() ** 2)
rmax = np.sqrt((10**self.logu_max) ** 2 + self.theta.max() ** 2 * self.xvec.max() ** 2)
return np.logspace(np.log10(rmin), np.log10(rmax), self.rnum)
@cached_quantity
def angular_corr_gal(self):
"""The angular correlation function w(theta) for tracers.
Equation taken from Eq.(33) of [1]_.
References
----------
.. [1] Blake, C. et al., "Halo-model signatures
from 380000 Sloan Digital Sky Survey luminous red galaxies
with photometric redshifts",
https://ui.adsabs.harvard.edu/abs/2008MNRAS.385.1257B.
"""
return angular_corr_gal(
self.theta,
self.corr_auto_tracer_fnc,
self.p1,
self.zmin,
self.zmax,
self.logu_min,
self.logu_max,
znum=self.znum,
unum=self.unum,
p2=self.p2,
check_p_norm=self.check_p_norm,
cosmo=self.cosmo,
p_of_z=self.p_of_z,
)
@cached_quantity
def angular_corr_matter(self):
"""
The angular correlation function w(theta) for matter.
Equation taken from Eq.(33) of [1]_.
References
----------
.. [1] Blake, C. et al., "Halo-model signatures
from 380000 Sloan Digital Sky Survey luminous red galaxies
with photometric redshifts",
https://ui.adsabs.harvard.edu/abs/2008MNRAS.385.1257B.
"""
return angular_corr_gal(
self.theta,
self.corr_auto_matter_fnc,
self.p1,
self.zmin,
self.zmax,
self.logu_min,
self.logu_max,
znum=self.znum,
unum=self.unum,
p2=self.p2,
check_p_norm=self.check_p_norm,
cosmo=self.cosmo,
p_of_z=self.p_of_z,
)
def _check_p(p, z):
"""If False, cancels checking the normalisation of :func:`p1` and :func:`p2`."""
integ = p.integral(z.min(), z.max()) if hasattr(p, "integral") else simpson(p(z), x=z)
if not np.isclose(integ, 1.0, rtol=0.01):
warnings.warn(
f"Filter function p(x) did not integrate to 1 ({integ}). Tentatively re-normalising.",
stacklevel=2,
)
return lambda z: p(z) / integ
else:
return p
[docs]
def angular_corr_gal(
theta,
xi,
p1,
zmin,
zmax,
logu_min,
logu_max,
znum=100,
unum=100,
p2=None,
check_p_norm=True,
cosmo=None,
p_of_z=True,
**xi_kw,
):
"""
Calculate the angular correlation function w(theta).
From [1]_, Eq. 33. That is, this uses the Limber approximation.
This does not hold either for wide angles, or thin radial distributions.
Parameters
----------
theta : array_like
Angles at which to calculate the angular correlation. In radians.
xi : callable
A function of one variable: r [Mpc/h], which returns
the 3D correlation function at the scale r.
p1: callable
The redshift distribution of sources. Should integrate to 1 between
`logz_min` and `logz_max`. A callable function of a single variable, z.
zmin, zmax : float
The redshift limits of the sample distribution. Note that
this is in redshift, regardless of the value of `p_of_z`.
logu_min, logu_max : float
min,max of the log10 of radial separation grid [Mpc/h]. Must be large
enough to let the integral over the 3D correlation function to converge.
znum : int, optional
Number of steps in redshift grid.
unum : int, optional
Number of steps in the u grid.
p2 : callable, optional
The same as `p1`, but for a second, cross-correlating dataset. If not
provided, defaults to `p1` (i.e. auto-correlation).
check_p_norm : bool, optional
If False, cancels checking the normalisation of `p1` and `p2`.
p_of_z : bool, optional
Whether `p1` and `p2` are functions of redshift.
cosmo : `hmf.cosmo.Cosmology` instance, optional
A cosmology, used to generate comoving distance from redshift. Default
is the default cosmology of the `hmf` package.
xi_kw : unpacked-dict
Any arguments to `xi` other than r,z.
Returns
-------
wtheta : array_like
The angular correlation function corresponding to `theta`.
References
----------
.. [1] Blake, C. et al., "Halo-model signatures
from 380000 Sloan Digital Sky Survey luminous red galaxies
with photometric redshifts",
https://ui.adsabs.harvard.edu/abs/2008MNRAS.385.1257B.
"""
if cosmo is None:
cosmo = csm().cosmo
# Arrays
u = np.logspace(logu_min, logu_max, unum)
if p_of_z:
z = np.linspace(zmin, zmax, znum)
x = (cosmo.comoving_distance(z) * cosmo.h).value
else:
xmin = (cosmo.comoving_distance(zmin) * cosmo.h).value
xmax = (cosmo.comoving_distance(zmax) * cosmo.h).value
x = np.linspace(xmin, xmax, znum)
if check_p_norm:
p1 = _check_p(p1, z if p_of_z else x)
if p2 is None:
p2 = p1
elif check_p_norm:
p2 = _check_p(p2, z if p_of_z else x)
p_integ = p1(z) * p2(z) / dxdz(z, cosmo) ** 2 if p_of_z else p1(x) * p2(x)
R = np.sqrt(np.add.outer(np.outer(theta**2, x**2), u**2)).flatten()
integrand = np.einsum(
"kij,i->kij", xi(R, **xi_kw).reshape((len(theta), len(x), len(u))), p_integ
)
return 2 * simpson(simpson(integrand, x=u), x=x)