From 4df0dd2b7569cd15299c318f7d14dcf11351c31b Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Thu, 21 Dec 2023 13:45:31 -0800 Subject: [PATCH 1/4] adds the select_random_timeseries function --- src/tape/ensemble.py | 30 ++++++++++++++++++++++++++++++ tests/tape_tests/test_ensemble.py | 21 +++++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py index bc31ac7f..29db16eb 100644 --- a/src/tape/ensemble.py +++ b/src/tape/ensemble.py @@ -1847,6 +1847,36 @@ def _sync_tables(self): self.object.set_dirty(False) return self + def select_random_timeseries(self, seed=None): + """Selects a random lightcurve from the Ensemble + + Parameters + ---------- + seed: int, or None + Sets a seed to return the same object id on successive runs. `None` + by default, in which case a seed is not set for the operation. + + Returns + ------- + ts: `TimeSeries` + Timeseries for a single object + + """ + + if seed is not None: + np.random.seed(seed) + + # Avoid a choice from full index space, select a random partition to grab from + if self.object.npartitions > 1: + partition_num = np.random.randint(0, self.object.npartitions - 1) + partition_ids = self.object.get_partition(partition_num).index.values + lcid = np.random.choice(partition_ids) + else: + partition_num = 0 + lcid = np.random.choice(self.object.index.values) + print(f"Selected Object {lcid} from Partition {partition_num}") + return self.to_timeseries(lcid) + def to_timeseries( self, target, diff --git a/tests/tape_tests/test_ensemble.py b/tests/tape_tests/test_ensemble.py index 0765b1db..04e85e78 100644 --- a/tests/tape_tests/test_ensemble.py +++ b/tests/tape_tests/test_ensemble.py @@ -17,6 +17,7 @@ TapeSeries, TapeObjectFrame, TapeSourceFrame, + TimeSeries, ) from tape.analysis.stetsonj import calc_stetson_J from tape.analysis.structure_function.base_argument_container import StructureFunctionArgumentContainer @@ -1829,6 +1830,26 @@ def test_batch_with_custom_frame_meta(parquet_ensemble, custom_meta): assert isinstance(parquet_ensemble.select_frame("sf2_result"), EnsembleFrame) +@pytest.mark.parametrize("repartition", [False, True]) +@pytest.mark.parametrize("seed", [None, 42]) +def test_select_random_timeseries(parquet_ensemble, repartition, seed): + """Test the behavior of ensemble.select_random_timeseries""" + + ens = parquet_ensemble + + if repartition: + ens.object = ens.object.repartition(3) + + ts = ens.select_random_timeseries(seed=seed) + + assert isinstance(ts, TimeSeries) + + if seed == 42 and not repartition: + assert ts.meta["id"] == 88480000587403327 + elif seed == 42 and repartition: + assert ts.meta["id"] == 88480000310609896 + + def test_to_timeseries(parquet_ensemble): """ Test that ensemble.to_timeseries() runs and assigns the correct metadata From 16e1fe93534ad623abf3c38a454757933f13d836 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Fri, 22 Dec 2023 11:38:39 -0800 Subject: [PATCH 2/4] refactor to shuffle approach; add empty partition test --- src/tape/ensemble.py | 45 ++++++++++++++++++++++--------- tests/tape_tests/test_ensemble.py | 41 ++++++++++++++++++++++++++-- 2 files changed, 72 insertions(+), 14 deletions(-) diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py index 29db16eb..27caf8d9 100644 --- a/src/tape/ensemble.py +++ b/src/tape/ensemble.py @@ -1848,7 +1848,7 @@ def _sync_tables(self): return self def select_random_timeseries(self, seed=None): - """Selects a random lightcurve from the Ensemble + """Selects a random lightcurve from a random partition of the Ensemble. Parameters ---------- @@ -1861,20 +1861,41 @@ def select_random_timeseries(self, seed=None): ts: `TimeSeries` Timeseries for a single object - """ + Note + ---- + This is not uniformly sampled. As a random partition is chosen first to + avoid a search in full index space, and partitions may vary in the + number of objects they contain. In other words, objects in smaller + partitions will have a higher probability of being chosen than objects + in larger partitions. + """ if seed is not None: - np.random.seed(seed) - - # Avoid a choice from full index space, select a random partition to grab from - if self.object.npartitions > 1: - partition_num = np.random.randint(0, self.object.npartitions - 1) - partition_ids = self.object.get_partition(partition_num).index.values - lcid = np.random.choice(partition_ids) + rng = np.random.default_rng(seed) else: - partition_num = 0 - lcid = np.random.choice(self.object.index.values) - print(f"Selected Object {lcid} from Partition {partition_num}") + rng = np.random + + # We will select one partition at random to select an object from + partitions = np.array(range(self.object.npartitions)) + rng.shuffle(partitions) # shuffle for empty checking + + object_selected = False + i = 0 + + # Scan through the shuffled partition list until a partition with data is found + while not object_selected: + partition_index = self.object.partitions[partitions[i]].index + # Check for empty partitions + if len(partition_index) > 0: + lcid = rng.choice(partition_index.values) # randomly select lightcurve + print(f"Selected Object {lcid} from Partition {partitions[i]}") + object_selected = True + else: + print(f"skipped empty partition: {partitions[i]}") + i += 1 + if i > len(partitions): + raise IndexError("Found no object IDs in the Object Table.") + return self.to_timeseries(lcid) def to_timeseries( diff --git a/tests/tape_tests/test_ensemble.py b/tests/tape_tests/test_ensemble.py index 04e85e78..1fcbc664 100644 --- a/tests/tape_tests/test_ensemble.py +++ b/tests/tape_tests/test_ensemble.py @@ -1845,9 +1845,46 @@ def test_select_random_timeseries(parquet_ensemble, repartition, seed): assert isinstance(ts, TimeSeries) if seed == 42 and not repartition: - assert ts.meta["id"] == 88480000587403327 + assert ts.meta["id"] == 88472935274829959 elif seed == 42 and repartition: - assert ts.meta["id"] == 88480000310609896 + assert ts.meta["id"] == 88480001333818899 + + +@pytest.mark.parametrize("all_empty", [False, True]) +def test_select_random_timeseries_empty_partitions(dask_client, all_empty): + "Test the edge case where object has empty partitions" + + data_dict = { + "id": [42], + "flux": [1], + "time": [1], + "err": [1], + "band": [1], + } + + colmap = ColumnMapper().assign( + id_col="id", + time_col="time", + flux_col="flux", + err_col="err", + band_col="band", + ) + + ens = Ensemble(client=dask_client) + ens.from_source_dict(data_dict, column_mapper=colmap) + + # The single id will be in the last partition + ens.object = ens.object.repartition(5) + + # Remove the last partition, make sure we get the expected error when the + # Object table has no IDs in any partition + if all_empty: + ens.object = ens.object.partitions[0:-1] + with pytest.raises(IndexError): + ens.select_random_timeseries() + else: + ts = ens.select_random_timeseries() + assert ts.meta["id"] == 42 # Should always find the only object def test_to_timeseries(parquet_ensemble): From f8c584713d1ae27fa0daafadcdd227042bbf7725 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Fri, 22 Dec 2023 11:49:52 -0800 Subject: [PATCH 3/4] fix off by one; remove partition output --- src/tape/ensemble.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py index 27caf8d9..9ef50499 100644 --- a/src/tape/ensemble.py +++ b/src/tape/ensemble.py @@ -1891,9 +1891,8 @@ def select_random_timeseries(self, seed=None): print(f"Selected Object {lcid} from Partition {partitions[i]}") object_selected = True else: - print(f"skipped empty partition: {partitions[i]}") i += 1 - if i > len(partitions): + if i >= len(partitions): raise IndexError("Found no object IDs in the Object Table.") return self.to_timeseries(lcid) From bc5ff2b1771b23c2da89a4cc73b01aef175d1b32 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Fri, 22 Dec 2023 11:51:47 -0800 Subject: [PATCH 4/4] pair down rng logic --- src/tape/ensemble.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py index 9ef50499..e642263a 100644 --- a/src/tape/ensemble.py +++ b/src/tape/ensemble.py @@ -1870,10 +1870,8 @@ def select_random_timeseries(self, seed=None): in larger partitions. """ - if seed is not None: - rng = np.random.default_rng(seed) - else: - rng = np.random + + rng = np.random.default_rng(seed) # We will select one partition at random to select an object from partitions = np.array(range(self.object.npartitions))