diff --git a/flint/leakage.py b/flint/leakage.py index 9a414f4..0a22bbe 100644 --- a/flint/leakage.py +++ b/flint/leakage.py @@ -2,7 +2,7 @@ from argparse import ArgumentParser from pathlib import Path -from typing import Dict, NamedTuple, Union, Optional, Tuple +from typing import Dict, NamedTuple, Union, Optional import astropy.units as u import numpy as np @@ -225,12 +225,23 @@ def load_and_filter_components( return comp_table +class PolStatistics(NamedTuple): + """Simple container for statistics around the extraction of leakage polarisation statistics""" + + peak: np.ndarray + """The peak pixel value""" + noise: np.ndarray + """The standard deviation of pixels within a box""" + mean: np.ndarray + """The mean of pixels within a box""" + + def extract_pol_stats_in_box( pol_image: np.ndarray, pixel_coords: PixelCoords, search_box_size: int, noise_box_size: int, -) -> Tuple[np.ndarray, np.ndarray]: +) -> PolStatistics: """Construct two boxes around nominated pixel coordinates to: * extract the peak signal within @@ -243,7 +254,7 @@ def extract_pol_stats_in_box( noise_box_size (int): Size of box to calculate the RMS over Returns: - Tuple[np.ndarray, np.ndarray]: Extracted peak polarised signal and noise + PolStatistics: Extracted statistics, including peak polarised signal, noise and mean """ y_max, x_max = pol_image.shape[-2:] @@ -251,6 +262,7 @@ def extract_pol_stats_in_box( pol_peak = None pol_noise = None + pol_mean = None for idx, box_size in enumerate((search_box_size, noise_box_size)): box_delta = int(np.ceil(box_size / 2)) @@ -291,11 +303,21 @@ def extract_pol_stats_in_box( for data in search_box ] ) + elif idx == 2: + pol_mean = np.array( + [ + np.nanmean(data) if np.any(np.isfinite(data)) else np.nan + for data in search_box + ] + ) assert pol_peak is not None, f"{pol_peak=}, which should not happen" assert pol_noise is not None, f"{pol_noise=}, which should not happen" + assert pol_mean is not None, f"{pol_mean=}, which should not happen" + + pol_stats = PolStatistics(peak=pol_peak, noise=pol_noise, mean=pol_mean) - return pol_peak, pol_noise + return pol_stats def _get_output_catalogue_path( @@ -354,18 +376,19 @@ def create_leakge_component_table( i_values = components[peak_flux_col] pol_pixel_coords = get_xy_pixel_coords(table=components, wcs=pol_fits.wcs) - pol_peak, pol_noise = extract_pol_stats_in_box( + pol_stats = extract_pol_stats_in_box( pol_image=pol_fits.data, pixel_coords=pol_pixel_coords, search_box_size=leakage_filters.search_box_size, noise_box_size=leakage_filters.noise_box_size, ) - frac_values = pol_peak / i_values + frac_values = pol_stats.peak / i_values logger.info(f"{frac_values.shape=}") components[f"{pol}_fraction"] = frac_values - components[f"{pol}_peak"] = pol_peak - components[f"{pol}_noise"] = pol_noise + components[f"{pol}_peak"] = pol_stats.peak + components[f"{pol}_noise"] = pol_stats.noise + components[f"{pol}_mean"] = pol_stats.mean output_path = _get_output_catalogue_path( input_path=catalogue if isinstance(catalogue, Path) else pol_image,