Skip to content

Commit

Permalink
warnings and formatting fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jsdillon committed Oct 16, 2024
1 parent ae8897f commit c8505a1
Showing 1 changed file with 79 additions and 78 deletions.
157 changes: 79 additions & 78 deletions hera_cal/tests/test_frf.py
Original file line number Diff line number Diff line change
Expand Up @@ -1068,6 +1068,7 @@ def test_tophat_linear_argparser(self):
assert a.time_thresh == 0.05
assert not a.factorize_flags


def test_get_frop_for_noise():
uvh5 = os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.OCR_53x_54x_only.uvh5")
hd = io.HERAData([uvh5])
Expand All @@ -1083,101 +1084,100 @@ def test_get_frop_for_noise():
filt_cent = 0
filt_hw = 0.005
eval_cutoff = 1e-12

# Make random weights for extra fun
rng = np.random.default_rng(seed=1)
weights = rng.exponential(size=data.shape)


frop = frf.get_frop_for_noise(times, filt_cent, filt_hw,
freqs=data.freqs, weights=weights,
coherent_avg=False,

frop = frf.get_frop_for_noise(times, filt_cent, filt_hw,
freqs=data.freqs, weights=weights,
coherent_avg=False,
eigenval_cutoff=eval_cutoff)

frf_dat = (frop * data[bl]).sum(axis=1)
frf_dat_pipeline = copy.deepcopy(data)

# From main_beam_FR_filter in single_baseline_notebook
# TODO: graduate that code into hera_cal and put here
# TODO: graduate that code into hera_cal and put here
d_mdl = np.zeros_like(data[bl])

d_mdl, _, _ = dspec.fourier_filter(times, data[bl],
d_mdl, _, _ = dspec.fourier_filter(times, data[bl],
wgts=weights, filter_centers=[filt_cent],
filter_half_widths=[filt_hw],
mode='dpss_solve',
eigenval_cutoff=[eval_cutoff],
suppression_factors=[eval_cutoff],
max_contiguous_edge_flags=len(data.times),
filter_half_widths=[filt_hw],
mode='dpss_solve',
eigenval_cutoff=[eval_cutoff],
suppression_factors=[eval_cutoff],
max_contiguous_edge_flags=len(data.times),
filter_dims=0)
frf_dat_pipeline[bl] = d_mdl

assert np.allclose(frf_dat_pipeline[bl], frf_dat)

# Now do coherent averaging
# TODO: These lines are non-interleaved versions of some lines in the
# single-baseline PS notebook. They seem useful -- maybe just make them
# TODO: These lines are non-interleaved versions of some lines in the
# single-baseline PS notebook. They seem useful -- maybe just make them
# standard functions?
dt = times[1] - times[0]
Navg = int(np.round(300. / dt))
n_avg_int = int(np.ceil(len(data.lsts) / Navg))
target_lsts = [np.mean(np.unwrap(data.lsts)[i * Navg:(i+1) * Navg])
target_lsts = [np.mean(np.unwrap(data.lsts)[i * Navg:(i + 1) * Navg])
for i in range(n_avg_int)]
dlst = [target_lsts[i] - l for i in range(n_avg_int)
for l in np.unwrap(data.lsts)[i * Navg:(i+1) * Navg]]
bl_vec = data.antpos[bl[0]] - data.antpos[bl[1]]

dlst = [target_lsts[i] - lst for i in range(n_avg_int)
for lst in np.unwrap(data.lsts)[i * Navg:(i + 1) * Navg]]
bl_vec = data.antpos[bl[0]] - data.antpos[bl[1]]

frop = frf.get_frop_for_noise(times, filt_cent, filt_hw,
freqs=data.freqs, weights=weights,
coherent_avg=True,
frop = frf.get_frop_for_noise(times, filt_cent, filt_hw,
freqs=data.freqs, weights=weights,
coherent_avg=True,
eigenval_cutoff=eval_cutoff, dlst=dlst,
nsamples=nsamples[bl], t_avg=300.,
bl_vec=bl_vec)
frf_dat = (frop * data[bl]).sum(axis=1)

utils.lst_rephase(frf_dat_pipeline, {bl: bl_vec}, data.freqs, dlst,
lat=hd.telescope_location_lat_lon_alt_degrees[0],
utils.lst_rephase(frf_dat_pipeline, {bl: bl_vec}, data.freqs, dlst,
lat=hd.telescope.location.lat.deg,
inplace=True)
avg_data = frf.timeavg_waterfall(frf_dat_pipeline[bl], Navg,
flags=np.zeros_like(flags[bl]),
nsamples=nsamples[bl],
nsamples=nsamples[bl],
extra_arrays={'times': data.times},
lsts=data.lsts, freqs=data.freqs,
rephase=False, bl_vec=bl_vec,
rephase=False, bl_vec=bl_vec,
verbose=False)[0]

assert np.allclose(avg_data, frf_dat)

# Test uniform weights
weights = np.ones_like(data[bl])
frop = frf.get_frop_for_noise(times, filt_cent, filt_hw,
freqs=data.freqs, weights=weights,
frop = frf.get_frop_for_noise(times, filt_cent, filt_hw,
freqs=data.freqs, weights=weights,
coherent_avg=False,
eigenval_cutoff=eval_cutoff)
d_mdl, _, _ = dspec.fourier_filter(times, data[bl],
d_mdl, _, _ = dspec.fourier_filter(times, data[bl],
wgts=weights,
filter_centers=[filt_cent],
filter_half_widths=[filt_hw],
mode='dpss_solve',
eigenval_cutoff=[eval_cutoff],
suppression_factors=[eval_cutoff],
max_contiguous_edge_flags=len(data.times),
filter_half_widths=[filt_hw],
mode='dpss_solve',
eigenval_cutoff=[eval_cutoff],
suppression_factors=[eval_cutoff],
max_contiguous_edge_flags=len(data.times),
filter_dims=0)
frf_dat_pipeline[bl] = d_mdl
frf_dat = (frop * data[bl]).sum(axis=1)
assert np.allclose(frf_dat_pipeline[bl], frf_dat)

# Check that setting weights to None does the same as uniform weights
frop_none = frf.get_frop_for_noise(times, filt_cent, filt_hw,
freqs=data.freqs, weights=None,
frop_none = frf.get_frop_for_noise(times, filt_cent, filt_hw,
freqs=data.freqs, weights=None,
coherent_avg=False,
eigenval_cutoff=eval_cutoff)
assert np.array_equal(frop, frop_none)


def test_prep_var_for_frop():
uvh5 = os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.OCR_53x_54x_only.uvh5")
hd = io.HERAData([uvh5])
data, flags, nsamples = hd.read()
data, flags, nsamples = hd.read(fix_autos=True)
# The first 37 freqs give 0 variance so just exclude them to make the rest
# of the test more transparent
freq_slice = slice(37, 1024)
Expand All @@ -1195,24 +1195,25 @@ def test_prep_var_for_frop():
var_for_frop = frf.prep_var_for_frop(data, nsamples, weights,
cross_antpairpol, freq_slice,
auto_ant=53)

assert np.logical_not(np.any(var_for_frop == var[:, freq_slice]))

# Now give it some nsamples == 0 to deal with
# Should replace all of one channel with a peculiarly chosen value of 2.3
nsamples[cross_antpairpol][:, 48] = 0
with pytest.warns(UserWarning,
with pytest.warns(UserWarning,
match="Not all nonfinite variance locations are of zero weight!"):
var_for_frop = frf.prep_var_for_frop(data, nsamples, weights,
cross_antpairpol, freq_slice,
auto_ant=53,
default_value=2.3)
assert np.all(var_for_frop[:, 48-37] == 2.3)
assert np.all(var_for_frop[:, 48 - 37] == 2.3)


def test_get_FRF_cov():
uvh5 = os.path.join(DATA_PATH, "test_input/zen.2458101.46106.xx.HH.OCR_53x_54x_only.uvh5")
hd = io.HERAData([uvh5])
data, flags, nsamples = hd.read()
data, flags, nsamples = hd.read(fix_autos=True)
# The first 37 freqs give 0 variance so just exclude them to make the rest
# of the test more transparent
freq_slice = slice(37, 1024)
Expand All @@ -1224,67 +1225,67 @@ def test_get_FRF_cov():
var_for_frop = frf.prep_var_for_frop(data, nsamples, weights,
cross_antpairpol, freq_slice,
auto_ant=53)

dt = times[1] - times[0]
Navg = int(np.round(300. / dt))
n_avg_int = int(np.ceil(len(data.lsts) / Navg))
target_lsts = [np.mean(np.unwrap(data.lsts)[i * Navg:(i+1) * Navg])
target_lsts = [np.mean(np.unwrap(data.lsts)[i * Navg:(i + 1) * Navg])
for i in range(n_avg_int)]
dlst = [target_lsts[i] - l for i in range(n_avg_int)
for l in np.unwrap(data.lsts)[i * Navg:(i+1) * Navg]]
bl_vec = data.antpos[cross_antpairpol[0]] - data.antpos[cross_antpairpol[1]]
dlst = [target_lsts[i] - lst for i in range(n_avg_int)
for lst in np.unwrap(data.lsts)[i * Navg:(i + 1) * Navg]]
bl_vec = data.antpos[cross_antpairpol[0]] - data.antpos[cross_antpairpol[1]]

# Need to give it some complex structure
filt_cent = 0.005
filt_hw = 0.005
frop = frf.get_frop_for_noise(times, filt_cent, filt_hw,
freqs=data.freqs[freq_slice],
weights=weights[cross_antpairpol][:, freq_slice],
frop = frf.get_frop_for_noise(times, filt_cent, filt_hw,
freqs=data.freqs[freq_slice],
weights=weights[cross_antpairpol][:, freq_slice],
coherent_avg=True,
eigenval_cutoff=eval_cutoff, dlst=dlst,
nsamples=nsamples[cross_antpairpol][:, freq_slice],
nsamples=nsamples[cross_antpairpol][:, freq_slice],
t_avg=300., bl_vec=bl_vec)

cov = frf.get_FRF_cov(frop, var_for_frop)

# Check that at least it's (close to) hermitian at every frequency
# Tests for value sensibility exist in single baseline PS notebook
for freq_ind in range(1024 - 37):
assert np.allclose(cov[freq_ind], cov[freq_ind].T.conj())


# Pretend we forgot to slice things
with pytest.raises(ValueError, match="nsamples has wrong shape"):
frop = frf.get_frop_for_noise(times, filt_cent, filt_hw,
freqs=data.freqs[freq_slice],
weights=weights[cross_antpairpol][:, freq_slice],
coherent_avg=True,
eigenval_cutoff=eval_cutoff,
dlst=dlst,
nsamples=nsamples[cross_antpairpol],
t_avg=300., bl_vec=bl_vec)
frop = frf.get_frop_for_noise(times, filt_cent, filt_hw,
freqs=data.freqs[freq_slice],
weights=weights[cross_antpairpol][:, freq_slice],
coherent_avg=True,
eigenval_cutoff=eval_cutoff,
dlst=dlst,
nsamples=nsamples[cross_antpairpol],
t_avg=300., bl_vec=bl_vec)

with pytest.raises(ValueError, match="weights has wrong shape"):
frop = frf.get_frop_for_noise(times, filt_cent, filt_hw,
freqs=data.freqs[freq_slice],
weights=weights[cross_antpairpol],
coherent_avg=True,
eigenval_cutoff=eval_cutoff,
dlst=dlst,
nsamples=nsamples[cross_antpairpol],
t_avg=300., bl_vec=bl_vec)
frop = frf.get_frop_for_noise(times, filt_cent, filt_hw,
freqs=data.freqs[freq_slice],
weights=weights[cross_antpairpol],
coherent_avg=True,
eigenval_cutoff=eval_cutoff,
dlst=dlst,
nsamples=nsamples[cross_antpairpol],
t_avg=300., bl_vec=bl_vec)


def test_get_corr_and_factor():
# Actually just going to test an easy analytic case here
base = np.array([[2, 1, 0],
[1, 3, 1],
base = np.array([[2, 1, 0],
[1, 3, 1],
[0, 1, 4]])
cov = np.array([base, 2 * base])
corr = frf.get_corr(cov)

answer = np.array([[1, 1/np.sqrt(6), 0],
[1/np.sqrt(6), 1, 1/np.sqrt(12)],
[0, 1/np.sqrt(12), 1]])
answer = np.array([[1, 1 / np.sqrt(6), 0],
[1 / np.sqrt(6), 1, 1 / np.sqrt(12)],
[0, 1 / np.sqrt(12), 1]])

answer = np.array([answer, answer])

Expand All @@ -1308,4 +1309,4 @@ def test_get_corr_and_factor():
Neff = blocklen**2 / sum_sq
factor_slice_answer = blocklen / Neff

assert np.allclose(factor_slice, factor_slice_answer)
assert np.allclose(factor_slice, factor_slice_answer)

0 comments on commit c8505a1

Please sign in to comment.