Merge pull request #201 from lincc-frameworks/light-curve-package

Support of light-curve package

hombit authored Aug 30, 2023
2 parents c1d9db7 + 6120262 commit d3a250b
Showing 7 changed files with 620 additions and 74 deletions.
459 changes: 402 additions & 57 deletions docs/tutorials/working_with_the_ensemble.ipynb

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -27,6 +27,7 @@ dependencies = [
'coverage',
"deprecated",
"ipykernel", # Support for Jupyter notebooks
"light-curve>=0.7.3,<0.8.0",
]

# On a mac, install optional dependencies with `pip install '.[dev]'` (include the single quotes)
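A quick way to confirm the new pin resolves in an installed environment; a minimal sketch using only the standard library (the distribution name `light-curve` matches the pin above):

```
from importlib.metadata import version

# Should print a version satisfying >=0.7.3,<0.8.0 per the pin above.
print(version("light-curve"))
```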
1 change: 1 addition & 0 deletions src/tape/analysis/__init__.py
@@ -1,4 +1,5 @@
from .base import AnalysisFunction # noqa
from .feature_extractor import FeatureExtractor # noqa
from .light_curve import LightCurve # noqa
from .stetsonj import * # noqa
from .structurefunction2 import * # noqa
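With this re-export in place, the extractor becomes importable from the analysis namespace directly; a minimal check:

```
from tape.analysis import FeatureExtractor  # re-exported here
from tape.analysis.feature_extractor import BaseLightCurveFeature  # alias of the light-curve base evaluator
```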
95 changes: 95 additions & 0 deletions src/tape/analysis/feature_extractor.py
@@ -0,0 +1,95 @@
"""
Auxiliary code for time-series feature extraction with "light-curve" package
"""

from typing import List

import numpy as np
import pandas as pd
from light_curve.light_curve_ext import _FeatureEvaluator as BaseLightCurveFeature

from tape.analysis.base import AnalysisFunction


__all__ = ["FeatureExtractor", "BaseLightCurveFeature"]


class FeatureExtractor(AnalysisFunction):
"""Apply a light-curve package feature extractor to a light curve.

Parameters
----------
feature : light_curve.light_curve_ext._FeatureEvaluator
Feature extractor to apply, see the "light-curve" package for more details.

Attributes
----------
feature : light_curve.light_curve_ext._FeatureEvaluator
Feature extractor to apply, see the "light-curve" package for more details.
"""

def __init__(self, feature: BaseLightCurveFeature):
self.feature = feature

def cols(self, ens: "Ensemble") -> List[str]:
return [ens._time_col, ens._flux_col, ens._err_col, ens._band_col]

def meta(self, ens: "Ensemble") -> pd.DataFrame:
"""Return the schema of the analysis function output.

The declared schema is a pandas.DataFrame with the columns of
`self.feature.names` and dtype `np.float64`. Note, however, that if
the input columns are all single-precision floats, the actual output
dtype will be `np.float32`.
"""
return pd.DataFrame(dtype=np.float64, columns=self.feature.names)

def on(self, ens: "Ensemble") -> List[str]:
return [ens._id_col]

def __call__(self, time, flux, err, band, *, band_to_calc: str, **kwargs) -> pd.Series:
"""
Apply the feature extractor to a single passband of a light curve.

Parameters
----------
time : `numpy.ndarray`
Time values.
flux : `numpy.ndarray`
Brightness values, flux or magnitudes.
err : `numpy.ndarray`
Errors for "flux".
band : `numpy.ndarray`
Passband names.
band_to_calc : `str`
Name of the passband to calculate features for.
**kwargs : `dict`
Additional keyword arguments to pass to the feature extractor.

Returns
-------
features : `pandas.Series`
Feature values indexed by feature name; the dtype is a common
type of the input arrays.
"""

# Select passband to calculate
band_mask = band == band_to_calc
time, flux, err = (a[band_mask] for a in (time, flux, err))

# Sort inputs by time if not already sorted
if not kwargs.get("sorted", False):
sort_idx = np.argsort(time)
# `band` was already consumed by the mask above, so only the numerical
# arrays need reordering here.
time, flux, err = (a[sort_idx] for a in (time, flux, err))
# Now we can update the kwargs for better performance
kwargs = kwargs.copy()
kwargs["sorted"] = True

# Convert the numerical arrays to a common dtype
# (np.result_type replaces the deprecated np.find_common_type)
dtype = np.result_type(time, flux, err)
time, flux, err = (a.astype(dtype) for a in (time, flux, err))

values = self.feature(time, flux, err, **kwargs)

series = pd.Series(dict(zip(self.feature.names, values)))
return series
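For readers skimming the diff, here is a minimal sketch of driving the new class directly, outside of an Ensemble; it mirrors the `test_stetsonk` unit test added below (`StetsonK` comes from the `light-curve` package):

```
import light_curve as licu
import numpy as np

from tape.analysis.feature_extractor import FeatureExtractor

# Two passbands; only "g" is used for the calculation.
time = np.array([5.0, 4.0, 3.0, 2.0, 1.0, 0.0] * 2)
flux = 1.0 + time**2.0
err = np.full_like(time, 0.1)
band = np.r_[["g"] * 6, ["r"] * 6]

extractor = FeatureExtractor(licu.StetsonK())
result = extractor(time=time, flux=flux, err=err, band=band, band_to_calc="g")
print(result)  # pandas.Series with a single "stetson_K" entry
```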
68 changes: 55 additions & 13 deletions src/tape/ensemble.py
@@ -12,6 +12,7 @@
from dask.distributed import Client

from .analysis.base import AnalysisFunction
from .analysis.feature_extractor import BaseLightCurveFeature, FeatureExtractor
from .analysis.structure_function import SF_METHODS
from .analysis.structurefunction2 import calc_sf2
from .timeseries import TimeSeries
@@ -442,15 +443,22 @@ def coalesce(self, input_cols, output_col, table="object", drop_inputs=False):
else:
raise ValueError(f"{table} is not one of 'object' or 'source'")

# Create a subset dataframe with the coalesced columns
# Drop index for dask series operations - unfortunate
coal_ddf = table_ddf[input_cols].reset_index()

# Coalesce each column iteratively
i = 0
coalesce_col = coal_ddf[input_cols[0]]
while i < len(input_cols) - 1:
coalesce_col = coalesce_col.combine_first(coal_ddf[input_cols[i + 1]])
i += 1

# Assign the new column to the subset df, and reintroduce index
coal_ddf = coal_ddf.assign(**{output_col: coalesce_col}).set_index(self._id_col)

# assign the result to the desired column name
table_ddf = table_ddf.assign(**{output_col: coal_ddf[output_col]})

# Drop the input columns if wanted
if drop_inputs:
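The coalesce loop above relies on `combine_first`, which fills nulls in the calling series from the argument series, row by row. A toy pandas sketch of those semantics (illustrative values only, not data from this repository):

```
import pandas as pd

a = pd.Series([1.0, None, None])
b = pd.Series([None, 2.0, None])
c = pd.Series([9.0, 9.0, 3.0])

# First non-null value per row wins, scanning a, then b, then c.
print(a.combine_first(b).combine_first(c).tolist())  # [1.0, 2.0, 3.0]
```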
@@ -667,16 +675,24 @@ def batch(self, func, *args, meta=None, use_map=True, compute=True, on=None, **k
Parameters
----------
func : `function`
A function to apply to all objects in the ensemble. The function
may be a TAPE analysis function, an initialized feature extractor
from the `light-curve` package, or a user-defined function. In the
last case, the function must have the following signature:
`func(*cols, **kwargs)`, where the names of the `cols` are
specified in `args`, `kwargs` are keyword arguments passed to the
function, and the return value schema is described by `meta`.
For TAPE and `light-curve` functions, `args`, `meta` and `on` are
populated automatically.
*args:
Denotes the ensemble columns to use as inputs for a function,
order must be correct for the function. If passing a TAPE
or `light-curve` function, these are populated automatically.
meta : `pd.Series`, `pd.DataFrame`, `dict`, or `tuple-like`
Dask's meta parameter, which lays down the expected structure of
the results. Overridden by TAPE for TAPE and `light-curve`
functions. If None, attempts to coerce the result to a
pandas.Series.
use_map : `boolean`
Determines whether `dask.dataframe.DataFrame.map_partitions` is
used (True). Using map_partitions is generally more efficient, but
Expand All @@ -687,24 +703,50 @@ def batch(self, func, *args, meta=None, use_map=True, compute=True, on=None, **k
later compute call.
on: 'str' or 'list'
Designates which column(s) to groupby. Columns may be from the
source or object tables. For TAPE and `light-curve` functions,
this is populated automatically.
**kwargs:
Additional optional parameters passed to the selected function.
Returns
-------
result: `Dask.Series`
Series of function results.

Examples
--------
Run a TAPE function on the ensemble:
```
from tape.analysis.stetsonj import calc_stetson_J

ens = Ensemble().from_dataset('rrlyr82')
ens.batch(calc_stetson_J, band_to_calc='i')
```

Run a light-curve function on the ensemble:
```
from light_curve import EtaE

ens.batch(EtaE(), band_to_calc='g')
```

Run a custom function on the ensemble:
```
def s2n_inter_quartile_range(flux, err):
    first, third = np.quantile(flux / err, [0.25, 0.75])
    return third - first

ens.batch(s2n_inter_quartile_range, ens._flux_col, ens._err_col)
```

Or even a numpy built-in function:
```
amplitudes = ens.batch(np.ptp, ens._flux_col)
```
"""
self._lazy_sync_tables(table="all")

# Convert light-curve package feature into analysis function
if isinstance(func, BaseLightCurveFeature):
func = FeatureExtractor(func)
# Extract function information if TAPE analysis function
if isinstance(func, AnalysisFunction):
args = func.cols(self)
meta = func.meta(self)
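A sketch of what this dispatch buys the caller: any raw `light-curve` evaluator passes the `isinstance` check and is wrapped, so `cols`, `meta`, and `on` are derived automatically (`Amplitude` is just an example feature from the `light-curve` package):

```
import light_curve as licu

from tape.analysis.feature_extractor import BaseLightCurveFeature, FeatureExtractor

feature = licu.Amplitude()
assert isinstance(feature, BaseLightCurveFeature)

# Equivalent to what batch() now does internally before deriving cols/meta/on.
wrapped = FeatureExtractor(feature)
```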
12 changes: 8 additions & 4 deletions tests/tape_tests/test_analysis.py
@@ -10,10 +10,16 @@


@pytest.mark.parametrize("cls", analysis.AnalysisFunction.__subclasses__())
def test_analysis_function(cls, dask_client):
"""
Test AnalysisFunction child classes
"""
# We skip child classes with non-trivial constructors
try:
obj = cls()
except TypeError:
pytest.skip(f"Class {cls} has non-trivial constructor")

rows = {
"id": [8001, 8001, 8001, 8001, 8002, 8002, 8002, 8002, 8002],
"time": [10.1, 10.2, 10.2, 11.1, 11.2, 11.3, 11.4, 15.0, 15.1],
@@ -22,9 +28,7 @@ def test_analysis_function(cls)
"flux": [1.0, 2.0, 5.0, 3.0, 1.0, 2.0, 3.0, 4.0, 5.0],
}
cmap = ColumnMapper(id_col="id", time_col="time", flux_col="flux", err_col="err", band_col="band")
ens = Ensemble(client=dask_client).from_source_dict(rows, column_mapper=cmap)

assert isinstance(obj.cols(ens), list)
assert len(obj.cols(ens)) > 0
58 changes: 58 additions & 0 deletions tests/tape_tests/test_feature_extraction.py
@@ -0,0 +1,58 @@
"""Test feature extraction with light_curve package"""

import light_curve as licu
import numpy as np
from numpy.testing import assert_array_equal, assert_allclose

from tape import Ensemble
from tape.analysis.feature_extractor import FeatureExtractor
from tape.utils import ColumnMapper


def test_stetsonk():
stetson_k = licu.StetsonK()

time = np.array([5.0, 4.0, 3.0, 2.0, 1.0, 0.0] * 2)
flux = 1.0 + time**2.0
err = np.full_like(time, 0.1, dtype=np.float32)
band = np.r_[["g"] * 6, ["r"] * 6]

extract_features = FeatureExtractor(stetson_k)
result = extract_features(time=time, flux=flux, err=err, band=band, band_to_calc="g")
assert result.shape == (1,)
assert_array_equal(result.index, ["stetson_K"])
assert_allclose(result.values, 0.84932, rtol=1e-5)
assert_array_equal(result.dtypes, np.float64)


def test_stetsonk_with_ensemble(dask_client):
n = 5

object1 = {
"id": np.full(n, 1),
"time": np.arange(n, dtype=np.float64),
"flux": np.linspace(1.0, 2.0, n),
"err": np.full(n, 0.1),
"band": np.full(n, "g"),
}
object2 = {
"id": np.full(2 * n, 2),
"time": np.arange(2 * n, dtype=np.float64),
"flux": np.r_[np.linspace(1.0, 2.0, n), np.linspace(1.0, 2.0, n)],
"err": np.full(2 * n, 0.01),
"band": np.r_[np.full(n, "g"), np.full(n, "r")],
}
rows = {column: np.concatenate([object1[column], object2[column]]) for column in object1}

cmap = ColumnMapper(id_col="id", time_col="time", flux_col="flux", err_col="err", band_col="band")
ens = Ensemble(dask_client).from_source_dict(rows, cmap)

stetson_k = licu.Extractor(licu.AndersonDarlingNormal(), licu.InterPercentileRange(0.25), licu.StetsonK())
result = ens.batch(
stetson_k,
band_to_calc="g",
)

assert result.shape == (2, 3)
assert_array_equal(result.columns, ["anderson_darling_normal", "inter_percentile_range_25", "stetson_K"])
assert_allclose(result, [[0.114875, 0.625, 0.848528]] * 2, atol=1e-5)
