
Commit

Merge branch 'main' into warnings-fix
jsdillon authored Oct 16, 2024
2 parents f8f650e + dd32406 commit ae8897f
Showing 4 changed files with 515 additions and 10 deletions.
248 changes: 248 additions & 0 deletions hera_cal/frf.py
@@ -4,6 +4,7 @@

import numpy as np
from hera_filters import dspec
from hera_cal.noise import predict_noise_variance_from_autos

from . import utils
from scipy.interpolate import interp1d
@@ -1998,3 +1999,250 @@ def time_average_argparser():
ap.add_argument("--equalize_interleave_ntimes", action="store_true", default=False, help=desc)

return ap

def get_frop_for_noise(times, filter_cent_use, filter_half_wid_use, freqs,
t_avg=300., eigenval_cutoff=1e-9, weights=None,
coherent_avg=True, wgt_tavg_by_nsample=True,
nsamples=None, rephase=True, bl_vec=None,
lat=-30.721526120689507, dlst=None):
"""
Get FRF + coherent time average operator for the purposes of noise
covariance calculation. Composes time averaging and the main lobe FRF into
a single operation.
Parameters:
times (array):
            (Interleaved) Julian dates from the HERAData object, converted to seconds.
filter_cent_use (float):
Filter center for main lobe filter, in Hz.
filter_half_wid_use (float):
Filter half width for main lobe filter, in Hz.
freqs (array):
Frequencies in the data (in Hz).
t_avg (float):
Desired coherent averaging length, in seconds.
eigenval_cutoff (float):
Eigenvalue cutoff for DPSS modes.
weights (array):
Array of weights to use for main lobe filter.
None (default) uses uniform weights.
coherent_avg (bool):
Whether to coherently average on the t_avg cadence or not.
wgt_tavg_by_nsample (bool):
Whether to weight the time average by nsamples.
Otherwise do uniform weighting.
nsamples (array):
Array proportional to how many nights contributed to each sample.
rephase (bool):
Whether to rephase before averaging.
bl_vec (array):
            Baseline vector in meters in the ENU coordinate system.
lat (float):
Array latitude in degrees North.
dlst (float or array_like):
Amount of LST to rephase by, in radians. If float, rephases all
times by the same amount.
Returns:
frop (array):
Filter operator. Shape (Ntimes_coarse, Ntimes_fine, Nfreqs).
"""

# Time handling is a slightly modified port from hera_filters/hera_cal

Ntimes = len(times)
Nfreqs = len(freqs)

dmatr, _ = dspec.dpss_operator(times, np.array([filter_cent_use, ]),
np.array([filter_half_wid_use, ]),
eigenval_cutoff=np.array([eigenval_cutoff, ]))
Nmodes = dmatr.shape[-1]

if weights is None:
weights = np.ones([Ntimes, Nfreqs])
elif weights.shape != (Ntimes, Nfreqs):
raise ValueError(f"weights has wrong shape {weights.shape} "
f"compared to (Ntimes, Nfreqs) = {(Ntimes, Nfreqs)}"
" May need to be sliced along an axis.")

#####Index Legend#####
# a = DPSS mode #
# f = frequency #
# t = fine time #
# T = coarse time #
#####Index Legend#####

ddagW = dmatr.T.conj()[:, np.newaxis] * weights.T # aft
ddagWd = ddagW @ dmatr # afa
    lsq = np.linalg.solve(ddagWd.swapaxes(0, 1), ddagW.swapaxes(0, 1)) # fat

if coherent_avg:
dtime = np.median(np.abs(np.diff(times)))
chunk_size = int(np.round((t_avg / dtime)))
Nchunk = int(np.ceil(Ntimes / chunk_size))
chunk_remainder = Ntimes % chunk_size

        tavg_weights = nsamples if wgt_tavg_by_nsample else np.where(weights, np.ones([Ntimes, Nfreqs]), 0)
        if tavg_weights.shape != (Ntimes, Nfreqs):
            raise ValueError(f"tavg_weights has wrong shape {tavg_weights.shape} "
                             f"compared to (Ntimes, Nfreqs) = {(Ntimes, Nfreqs)}."
                             " May need to be sliced along an axis.")
        if chunk_remainder > 0:  # Stack some zeros that get zero weight so the reshape below works without incident
            dmatr_stack_shape = [chunk_size - chunk_remainder, Nmodes]
            weights_stack_shape = [chunk_size - chunk_remainder, Nfreqs]
            dmatr = np.vstack((dmatr, np.zeros(dmatr_stack_shape, dtype=complex)))
            # Weights are real; stacking complex zeros would upcast them and break the wnorm > 0 comparison below
            tavg_weights = np.vstack((tavg_weights, np.zeros(weights_stack_shape, dtype=float)))
            dlst = np.append(dlst, np.zeros(chunk_size - chunk_remainder, dtype=float))

dres = dmatr.reshape(Nchunk, chunk_size, Nmodes)
wres = tavg_weights.reshape(Nchunk, chunk_size, Nfreqs)
        wnorm = wres.sum(axis=1)[:, np.newaxis]
        # Normalize for an average, guarding against division by zero in fully flagged chunks
        wres = np.divide(wres, wnorm, out=np.zeros_like(wres), where=wnorm > 0)

# Apply the rephase to the weights matrix since it's mathematically equivalent and convenient
if rephase:
wres = utils.lst_rephase(wres.reshape(Nchunk * chunk_size, 1, Nfreqs),
bl_vec, freqs, dlst, lat=lat, inplace=False)
wres = wres.reshape(Nchunk, chunk_size, Nfreqs)

# does "Ttf,Tta->Tfa" much faster than einsum and fancy indexing
dchunk = np.zeros([Nchunk, Nfreqs, Nmodes], dtype=complex)
for coarse_time_ind in range(Nchunk):
dchunk[coarse_time_ind] = np.tensordot(wres[coarse_time_ind].T,
dres[coarse_time_ind],
axes=1)

# does "Tfa,fat->Ttf" faster than einsum and fancy indexing
frop = np.zeros([Nchunk, Ntimes, Nfreqs], dtype=complex)
for freq_ind in range(Nfreqs):
frop[:, :, freq_ind] = np.tensordot(dchunk[:, freq_ind],
lsq[freq_ind],
axes=1)
else:
# ta,fat->ttf
frop = np.tensordot(dmatr, lsq.transpose(1, 2, 0), axes=1)

return frop
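
As a quick sanity check on shapes, here is a minimal sketch of building the operator from synthetic inputs; the cadence, band, filter parameters, and baseline vector below are illustrative assumptions, not values from this changeset:

import numpy as np
from hera_cal import frf

times = np.arange(60) * 10.0           # 60 integrations at an assumed 10 s cadence, in seconds
freqs = np.linspace(100e6, 200e6, 64)  # 64 channels in Hz (assumed band)
bl_vec = np.array([14.6, 0.0, 0.0])    # hypothetical ~15 m EW baseline, ENU meters
frop = frf.get_frop_for_noise(times, filter_cent_use=0.0,
                              filter_half_wid_use=1e-3, freqs=freqs,
                              t_avg=300., rephase=False,
                              wgt_tavg_by_nsample=False, bl_vec=bl_vec)
print(frop.shape)  # (Ntimes_coarse, Ntimes_fine, Nfreqs) = (2, 60, 64)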

def prep_var_for_frop(data, nsamples, weights, cross_antpairpol, freq_slice,
auto_ant=None, default_value=0.):
"""
Wrapper around hera_cal.noise.predict_noise_variance_from_autos that preps
the noise variance calculation for FRF + coherent average.
Parameters:
data: DataContainer
Must contain the cross_antpairpol in question as well as some
corresponding autos.
nsamples: DataContainer
Contains the nsamples associated with the cross_antpairpol in question.
weights: DataContainer
Contains the weights associated with the cross_antpairpol in question.
cross_antpairpol: tuple
Tuple whose first two entries are antennas and last entry is a
polarization.
freq_slice: slice
A slice into the frequency axis of the data that all gets filtered
identically (a PS analysis band).
auto_ant: int
If not None, should be a single integer specifying a single
antenna's autos (this is used because the redundantly averaged data
have a single auto file for all antennas).
        default_value: float
            The default variance to use at locations with zero nsamples, to
            avoid NaNs.
Returns:
var: array
Noise variance (Ntimes, Nfreqs)
"""
var = predict_noise_variance_from_autos(cross_antpairpol, data,
nsamples=nsamples, auto_ant=auto_ant)
var = var[:, freq_slice]

var_isnotfinite = ~np.isfinite(var)
    if np.any(var_isnotfinite):
        # Check that all the infs/nans are weighted to zero, then replace them with the default value
        all_nonfinite_zero = np.all(weights[cross_antpairpol][:, freq_slice][var_isnotfinite] == 0)
        if not all_nonfinite_zero:
            warnings.warn("Not all nonfinite variance locations are of zero weight!")

        print(f"Replacing nonfinite variances with default value: {default_value}")
        var[var_isnotfinite] = default_value

return var
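
A hedged usage sketch follows; the file name, antpairpol, auto antenna, and analysis band are hypothetical, and the weights are built from flags purely for illustration:

from hera_cal import io
from hera_cal.frf import prep_var_for_frop

hd = io.HERAData("zen.red_avg.uvh5")  # hypothetical redundantly averaged file
data, flags, nsamples = hd.read()
weights = {bl: (~flags[bl]).astype(float) for bl in flags}  # illustrative weights from flags
var = prep_var_for_frop(data, nsamples, weights, cross_antpairpol=(0, 1, 'ee'),
                        freq_slice=slice(200, 400), auto_ant=0)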

def get_FRF_cov(frop, var):
"""
Get noise covariance from noise variance and given FRF + coherent average operator.
Parameters:
frop: array
A linear operator that performs the FRF and coherent averaging
operations in one step.
        var: array
            Noise variance for a single antpairpol. Shape (Ntimes, Nfreqs).
    Returns:
        cov: array
            The desired covariance. Shape (Nfreqs, Ntimes_coarse, Ntimes_coarse).
"""
Nfreqs = frop.shape[2]
Ncoarse = frop.shape[0]
cov = np.zeros([Nfreqs, Ncoarse, Ncoarse], dtype=complex)
for freq_ind in range(Nfreqs):
cov[freq_ind] = np.tensordot((frop[:, :, freq_ind] * var[:, freq_ind]),
frop[:, :, freq_ind].T.conj(), axes=1)

return cov
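
Continuing the sketch above, the operator and a per-antpairpol variance combine into a per-frequency time-time covariance; the constant variance here is a stand-in, not a realistic noise level:

var = np.full((60, 64), 2.0)  # (Ntimes, Nfreqs), assumed white-noise variance
cov = frf.get_FRF_cov(frop, var)
print(cov.shape)  # (Nfreqs, Ntimes_coarse, Ntimes_coarse) = (64, 2, 2)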

def get_corr(cov):
"""
Get correlation matrices from a sequence of covariance matrices.
Parameters:
        cov: array
            The covariance matrices, with shape (Nfreqs, Ntimes, Ntimes).
Returns:
corr: array
The corresponding correlation matrices.
"""
Ntimes = cov.shape[1]
diags = cov[:, np.arange(Ntimes), np.arange(Ntimes)]
corr = cov / np.sqrt(diags[:, None] * diags[:, :, None])

return corr

def get_correction_factor_from_cov(cov, tslc=None):
"""
Get a correction factor for PS noise variance prediction in the absence
of propagating the noise covariances all the way to delay space. This is
based on HERA memo #132, where it is shown that one may calculate an
effective number of degrees of freedom based on the variance of a
generalized chi-square random variable.
Parameters:
cov: array
            The time-time covariance matrix at every frequency. Shape
            (Nfreqs, Ntimes, Ntimes).
tslc: slice
A time slice to use in case some times should be excluded from the
calculation of this factor.
Returns:
correction_factor: array
The correction factor.
"""
corr = get_corr(cov)

if tslc is None:
Ncotimes = corr.shape[1]
Neff = Ncotimes**2 / np.sum(np.abs(corr)**2, axis=(1, 2))
else:
Ncotimes = tslc.stop - tslc.start
Neff = Ncotimes**2 / np.sum(np.abs(corr[:, tslc, tslc])**2, axis=(1, 2))
correction_factor = np.mean(Ncotimes / Neff) # Average over frequency since they are independent.

return correction_factor
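
As a small self-check, reusing the numpy and frf imports from the sketch above and assuming completely uncorrelated samples, the correlation matrices reduce to identities and the correction factor comes out to exactly 1:

Nfreqs, Ntimes = 4, 8
diag_cov = np.tile(np.eye(Ntimes), (Nfreqs, 1, 1)).astype(complex)
print(frf.get_corr(diag_cov)[0])                     # identity matrix
print(frf.get_correction_factor_from_cov(diag_cov))  # -> 1.0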

14 changes: 12 additions & 2 deletions hera_cal/noise.py
@@ -68,7 +68,8 @@ def infer_dt(times_by_bl, bl, default_dt=None):
raise ValueError('Cannot infer dt when all len(times_by_bl) == 1 or fewer.')


def predict_noise_variance_from_autos(bl, data, dt=None, df=None, nsamples=None):
def predict_noise_variance_from_autos(bl, data, dt=None, df=None, nsamples=None,
auto_ant=None):
'''Predict the noise variance on a baseline using autocorrelation data
    using the formula sigma^2 = Vii * Vjj / Delta t / Delta nu.
@@ -81,6 +82,11 @@ def predict_noise_variance_from_autos(bl, data, dt=None, df=None, nsamples=None)
from the frequencies stored in the DataContainer
    nsamples: DataContainer mapping bl tuples to numpy arrays of the number
        of integrations for that given baseline. Must include nsamples[bl].
    auto_ant: int
        For some cases, like redundant averaging, the autos corresponding
        to the individual antennas are not available. In that case, use
        this keyword to specify a single antenna whose autos are used to
        derive this statistic.
Returns:
Noise variance predicted on baseline bl in units of data squared.
@@ -92,7 +98,11 @@ def predict_noise_variance_from_autos(bl, data, dt=None, df=None, nsamples=None)
df = np.median(np.ediff1d(data.freqs))

ap1, ap2 = split_pol(bl[2])
auto_bl1, auto_bl2 = (bl[0], bl[0], join_pol(ap1, ap1)), (bl[1], bl[1], join_pol(ap2, ap2))
if auto_ant is None:
auto_bl1, auto_bl2 = (bl[0], bl[0], join_pol(ap1, ap1)), (bl[1], bl[1], join_pol(ap2, ap2))
else:
auto_bl1, auto_bl2 = (auto_ant, auto_ant, join_pol(ap1, ap1)), (auto_ant, auto_ant, join_pol(ap2, ap2))

var = np.abs(data[auto_bl1] * data[auto_bl2] / dt / df)
if nsamples is not None:
return var / nsamples[bl]
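
A brief sketch of the new keyword on redundantly averaged data, where per-antenna autos are unavailable; data and nsamples are assumed DataContainers whose only autos belong to antenna 0:

from hera_cal.noise import predict_noise_variance_from_autos

var = predict_noise_variance_from_autos((1, 2, 'ee'), data,
                                        nsamples=nsamples, auto_ant=0)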
