Skip to content

Commit

Permalink
feat: make ds factor a parameter and remove threadpool
Browse files Browse the repository at this point in the history
  • Loading branch information
skhrg committed Dec 19, 2024
1 parent 0345386 commit e5ecdd2
Showing 1 changed file with 20 additions and 26 deletions.
46 changes: 20 additions & 26 deletions sotodlib/tod_ops/jumps.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import concurrent.futures
import os
from typing import Literal, Optional, Tuple, Union, cast, overload

import numpy as np
Expand All @@ -12,7 +10,6 @@
from so3g import (
matched_jumps,
matched_jumps64,
clean_flag,
find_quantized_jumps,
find_quantized_jumps64,
)
Expand All @@ -21,8 +18,6 @@

from ..flag_utils import _merge

NFUTURE = int(os.environ.get("NUM_FUTURES", min(32, int(os.cpu_count() or 0) + 4)))


def std_est(
x: NDArray[np.floating],
Expand Down Expand Up @@ -204,19 +199,16 @@ def _fix(jump_ranges, heights, x_fixed):
jumps = RangesMatrix.from_mask(np.atleast_2d(jumps))
elif isinstance(jumps, Ranges):
jumps = RangesMatrix.from_mask(np.atleast_2d(jumps.mask()))
if not isinstance(jumps, RangesMatrix):
raise TypeError("jumps not RangesMatrix or convertable to RangesMatrix")

if heights is None:
heights = estimate_heights(x_fixed, jumps.mask(), **kwargs)
elif isinstance(heights, csr_array):
heights = heights.toarray()
heights = cast(NDArray[np.floating], heights)

nfuture = min(len(x_fixed), NFUTURE)
slice_size = len(x_fixed) // nfuture
slices = [slice(i * slice_size, (i + 1) * slice_size) for i in range(nfuture)]
slices[-1] = slice(slices[-1].start, len(x_fixed))
with concurrent.futures.ThreadPoolExecutor() as e:
_ = [e.submit(_fix, jumps.ranges[s], heights[s], x_fixed[s]) for s in slices]
_fix(jumps.ranges, heights, x_fixed)

return x_fixed.reshape(orig_shape)

Expand Down Expand Up @@ -337,6 +329,7 @@ def twopi_jumps(
merge=...,
overwrite=...,
name=...,
ds=...,
**filter_pars,
) -> Tuple[RangesMatrix, csr_array, NDArray[np.floating]]:
...
Expand All @@ -355,6 +348,7 @@ def twopi_jumps(
merge=...,
overwrite=...,
name=...,
ds=...,
**filter_pars,
) -> Tuple[RangesMatrix, csr_array]:
...
Expand All @@ -372,6 +366,7 @@ def twopi_jumps(
merge: bool = True,
overwrite: bool = False,
name: str = "jumps_2pi",
ds: int = 10,
**filter_pars,
) -> Union[
Tuple[RangesMatrix, csr_array], Tuple[RangesMatrix, csr_array, NDArray[np.floating]]
Expand Down Expand Up @@ -410,6 +405,8 @@ def twopi_jumps(
name: String used to populate field in flagmanager if merge is True.
ds: Downsample factor used when computing noise level, the actual factor used is `ds*win_size`.
**filter_pars: Parameters to pass to _filter
Returns:
Expand All @@ -428,7 +425,7 @@ def twopi_jumps(
raise TypeError("Signal is not an array")
if atol is None:
atol = nsigma * std_est(
signal.astype(float), ds=win_size * 10, win_size=win_size
signal.astype(float), ds=win_size * ds, win_size=win_size
)
np.clip(atol, 1e-8, max_tol)

Expand Down Expand Up @@ -602,6 +599,7 @@ def find_jumps(
merge=...,
overwrite=...,
name=...,
ds=...,
**filter_pars,
) -> Tuple[RangesMatrix, csr_array]:
...
Expand All @@ -620,6 +618,7 @@ def find_jumps(
merge=...,
overwrite=...,
name=...,
ds=...,
**filter_pars,
) -> Tuple[RangesMatrix, csr_array, NDArray[np.floating]]:
...
Expand All @@ -637,6 +636,7 @@ def find_jumps(
merge: bool = True,
overwrite: bool = False,
name: str = "jumps",
ds: int = 10,
**filter_pars,
) -> Union[
Tuple[RangesMatrix, csr_array], Tuple[RangesMatrix, csr_array, NDArray[np.floating]]
Expand Down Expand Up @@ -678,6 +678,8 @@ def find_jumps(
name: String used to populate field in flagmanager if merge is True.
ds: Downsample factor used when computing noise level, the actual factor used is `ds*win_size`.
**filter_pars: Parameters to pass to _filter
Returns:
Expand All @@ -697,36 +699,28 @@ def find_jumps(
raise TypeError("Signal is not an array")

orig_shape = signal.shape
_signal = _filter(signal, **filter_pars)
_signal = np.atleast_2d(_signal)

if len(orig_shape) > 2:
raise ValueError("Jumpfinder only works on 1D or 2D data")

if min_size is None and min_sigma is not None:
min_size = min_sigma * std_est(
signal, ds=win_size * 10, win_size=win_size, axis=-1
signal, ds=win_size * ds, win_size=win_size, axis=-1
)
if min_size is None:
raise ValueError("min_size is somehow still None")
if isinstance(min_size, np.ndarray) and np.ndim(min_size) > 1: # type: ignore
raise ValueError("min_size must be 1d or a scalar")
elif isinstance(min_size, (float, int)):
min_size = float(min_size) * np.ones(len(signal))
min_size = float(min_size) * np.ones(len(_signal))

_signal = _filter(signal, **filter_pars)
_signal = np.atleast_2d(_signal)
# Mean subtract, if we don't do this then when we cumsum we get floats
# that are too big and lack the precicion to find jumps well
_signal -= np.mean(_signal, axis=-1)[..., None]

nfuture = min(len(_signal), NFUTURE)
slice_size = len(_signal) // nfuture
slices = [slice(i * slice_size, (i + 1) * slice_size) for i in range(nfuture)]
slices[-1] = slice(slices[-1].start, len(_signal))
with concurrent.futures.ThreadPoolExecutor() as e:
jump_futures = [
e.submit(_jumpfinder, _signal[s], min_size[s], win_size, exact)
for s in slices
]
jumps = np.vstack([j.result() for j in jump_futures]).reshape(orig_shape)
jumps = _jumpfinder(_signal, min_size, win_size, exact).reshape(orig_shape)

jump_ranges = RangesMatrix.from_mask(jumps).buffer(int(win_size / 2))
jumps = jump_ranges.mask()
Expand Down

0 comments on commit e5ecdd2

Please sign in to comment.