From d71db3a7b800b0659ba8cdac7b9ae5df448b95a9 Mon Sep 17 00:00:00 2001
From: Doug Branton
Date: Mon, 23 Oct 2023 15:03:10 -0700
Subject: [PATCH 1/7] add check functions

---
 pyproject.toml                    |  2 +-
 src/tape/ensemble.py              | 74 ++++++++++++++++++++++++++++---
 tests/tape_tests/test_ensemble.py | 54 ++++++++++++++++++++++
 3 files changed, 124 insertions(+), 6 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 51cbc490..3e7021bc 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -17,7 +17,7 @@ dynamic=["version"]
 dependencies = [
     'pandas',
     'numpy<=1.23.5',
-    'dask>=2023.5.0',
+    'dask>=2023.6.1',
     'dask[distributed]',
     'pyarrow',
     'pyvo',
diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py
index bda92361..39930086 100644
--- a/src/tape/ensemble.py
+++ b/src/tape/ensemble.py
@@ -7,6 +7,7 @@
 import pandas as pd
 
 from dask.distributed import Client
+from collections import Counter
 
 from .analysis.base import AnalysisFunction
 from .analysis.feature_extractor import BaseLightCurveFeature, FeatureExtractor
@@ -151,7 +152,7 @@ def insert_sources(
 
         # Create the new row and set the paritioning to match the original dataframe.
         df2 = dd.DataFrame.from_dict(rows, npartitions=1)
-        df2 = df2.set_index(self._id_col, drop=True)
+        df2 = df2.set_index(self._id_col, drop=True, sort=False)
 
         # Save the divisions and number of partitions.
         prev_div = self._source.divisions
@@ -206,6 +207,54 @@ def info(self, verbose=True, memory_usage=True, **kwargs):
         print("Source Table")
         self._source.info(verbose=verbose, memory_usage=memory_usage, **kwargs)
 
+    def check_sorted(self, table="object"):
+        """Checks to see if an Ensemble Dataframe is sorted on the index.
+
+        Parameters
+        ----------
+        table: `str`, optional
+            The table to check.
+
+        Returns
+        -------
+        A boolean value indicating whether the index is sorted (True)
+        or not (False)
+        """
+        if table == "object":
+            idx = self._object.index
+        elif table == "source":
+            idx = self._source.index
+        else:
+            raise ValueError(f"{table} is not one of 'object' or 'source'")
+        return idx.map_partitions(lambda a: np.all(a[:-1] <= a[1:])).compute().all()
+
+    def check_lightcurve_cohesion(ens):
+        """Checks to see if lightcurves are split across multiple partitions.
+
+        With partitioned data, and source information represented by rows, it
+        is possible that when loading data or manipulating it in some way (most
+        likely a repartition) that the sources for a given object will be split
+        among multiple partitions. This function will check to see if all
+        lightcurves are "cohesive", meaning the sources for that object only
+        live in a single partition of the dataset.
+
+        Returns
+        -------
+        A boolean value indicating whether the sources tied to a given object
+        are only found in a single partition (True), or if they are split
+        across multiple partitions (False)
+
+        """
+        idx = ens._source.index
+        counts = idx.map_partitions(lambda a: Counter(a.unique())).compute()
+
+        unq_counter = counts[0]
+        for i in range(len(counts) - 1):
+            unq_counter += counts[i + 1]
+        if any(c >= 2 for c in unq_counter.values()):
+            return False
+        return True
+
     def compute(self, table=None, **kwargs):
         """Wrapper for dask.dataframe.DataFrame.compute()
 
@@ -802,7 +851,9 @@ def batch(self, func, *args, meta=None, use_map=True, compute=True, on=None, **k
             Determines whether `dask.dataframe.DataFrame.map_partitions` is
             used (True). Using map_partitions is generally more efficient, but
             requires the data from each lightcurve is housed in a single
-            partition. If False, a groupby will be performed instead.
+            partition. This can be checked using
+            `Ensemble.check_lightcurve_cohesion`. If False, a groupby will be
+            performed instead.
         compute: `boolean`
             Determines whether to compute the result immediately or hold for a
             later compute call.
@@ -961,6 +1012,7 @@ def from_dask_dataframe(
         sync_tables=True,
         npartitions=None,
         partition_size=None,
+        sort=False,
         **kwargs,
     ):
         """Read in Dask dataframe(s) into an ensemble object
@@ -985,6 +1037,9 @@ def from_dask_dataframe(
         partition_size: `int`, optional
             If specified, attempts to repartition the ensemble to partitions
             of size `partition_size`.
+        sort: `bool`, optional
+            If True, sorts the DataFrame by the id column. Otherwise set the index
+            on the individual existing partitions. Defaults to False.
 
         Returns
         ----------
@@ -994,14 +1049,14 @@ def from_dask_dataframe(
         self._load_column_mapper(column_mapper, **kwargs)
 
         # Set the index of the source frame and save the resulting table
-        self._source = source_frame.set_index(self._id_col, drop=True)
+        self._source = source_frame.set_index(self._id_col, drop=True, sort=sort)
 
         if object_frame is None:  # generate an indexed object table from source
             self._object = self._generate_object_table()
 
         else:
             self._object = object_frame
-            self._object = self._object.set_index(self._id_col)
+            self._object = self._object.set_index(self._id_col, sort=sort)
 
         # Optionally sync the tables, recalculates nobs columns
         if sync_tables:
@@ -1148,6 +1203,7 @@ def from_parquet(
         additional_cols=True,
         npartitions=None,
         partition_size=None,
+        sort=False,
         **kwargs,
     ):
         """Read in parquet file(s) into an ensemble object
@@ -1181,6 +1237,9 @@ def from_parquet(
         partition_size: `int`, optional
             If specified, attempts to repartition the ensemble to partitions
             of size `partition_size`.
+        sort: `bool`, optional
+            If True, sorts the DataFrame by the id column. Otherwise set the index
+            on the individual existing partitions. Defaults to False.
 
         Returns
         ----------
@@ -1218,6 +1277,7 @@ def from_parquet(
             sync_tables=sync_tables,
            npartitions=npartitions,
             partition_size=partition_size,
+            sort=sort,
             **kwargs,
         )
 
@@ -1275,7 +1335,7 @@ def available_datasets(self):
 
         return {key: datasets_file[key]["description"] for key in datasets_file.keys()}
 
-    def from_source_dict(self, source_dict, column_mapper=None, npartitions=1, **kwargs):
+    def from_source_dict(self, source_dict, column_mapper=None, npartitions=1, sort=False, **kwargs):
         """Load the sources into an ensemble from a dictionary.
 
         Parameters
@@ -1288,6 +1348,9 @@ def from_source_dict(self, source_dict, column_mapper=None, npartitions=1, **kwa
         npartitions: `int`, optional
             If specified, attempts to repartition the ensemble to the specified
             number of partitions
+        sort: `bool`, optional
+            If True, sorts the DataFrame by the id column. Otherwise set the index
+            on the individual existing partitions. Defaults to False.
 
         Returns
         ----------
@@ -1304,6 +1367,7 @@ def from_source_dict(self, source_dict, column_mapper=None, npartitions=1, **kwa
             column_mapper=column_mapper,
             sync_tables=True,
             npartitions=npartitions,
+            sort=sort,
             **kwargs,
         )
 
diff --git a/tests/tape_tests/test_ensemble.py b/tests/tape_tests/test_ensemble.py
index 3c30aeb5..f2af2618 100644
--- a/tests/tape_tests/test_ensemble.py
+++ b/tests/tape_tests/test_ensemble.py
@@ -435,6 +435,60 @@ def test_core_wrappers(parquet_ensemble):
     parquet_ensemble.compute()
 
 
+@pytest.mark.parametrize("data_sorted", [True, False])
+def test_check_sorted(dask_client, data_sorted):
+    # Create some fake data.
+
+    if data_sorted:
+        rows = {
+            "id": [8001, 8001, 8001, 8001, 8002, 8002, 8002, 8002, 8002],
+            "time": [10.1, 10.2, 10.2, 11.1, 11.2, 11.3, 11.4, 15.0, 15.1],
+            "band": ["g", "g", "b", "g", "b", "g", "g", "g", "g"],
+            "err": [1.0, 2.0, 1.0, 3.0, 2.0, 3.0, 4.0, 5.0, 6.0],
+            "flux": [1.0, 2.0, 5.0, 3.0, 1.0, 2.0, 3.0, 4.0, 5.0],
+        }
+    else:
+        rows = {
+            "id": [8001, 8002, 8001, 8001, 8002, 8002, 8001, 8002, 8002],
+            "time": [10.1, 10.2, 10.2, 11.1, 11.2, 11.3, 11.4, 15.0, 15.1],
+            "band": ["g", "g", "b", "g", "b", "g", "g", "g", "g"],
+            "err": [1.0, 2.0, 1.0, 3.0, 2.0, 3.0, 4.0, 5.0, 6.0],
+            "flux": [1.0, 2.0, 5.0, 3.0, 1.0, 2.0, 3.0, 4.0, 5.0],
+        }
+    cmap = ColumnMapper(id_col="id", time_col="time", flux_col="flux", err_col="err", band_col="band")
+    ens = Ensemble(client=dask_client)
+    ens.from_source_dict(rows, column_mapper=cmap, sort=False)
+
+    assert ens.check_sorted("source") == data_sorted
+
+
+@pytest.mark.parametrize("data_cohesion", [True, False])
+def test_check_lightcurve_cohesion(dask_client, data_cohesion):
+    # Create some fake data.
+
+    if data_cohesion:
+        rows = {
+            "id": [8001, 8001, 8001, 8001, 8001, 8002, 8002, 8002, 8002],
+            "time": [10.1, 10.2, 10.2, 11.1, 11.2, 11.3, 11.4, 15.0, 15.1],
+            "band": ["g", "g", "b", "g", "b", "g", "g", "g", "g"],
+            "err": [1.0, 2.0, 1.0, 3.0, 2.0, 3.0, 4.0, 5.0, 6.0],
+            "flux": [1.0, 2.0, 5.0, 3.0, 1.0, 2.0, 3.0, 4.0, 5.0],
+        }
+    else:
+        rows = {
+            "id": [8001, 8001, 8001, 8001, 8002, 8002, 8002, 8002, 8001],
+            "time": [10.1, 10.2, 10.2, 11.1, 11.2, 11.3, 11.4, 15.0, 15.1],
+            "band": ["g", "g", "b", "g", "b", "g", "g", "g", "g"],
+            "err": [1.0, 2.0, 1.0, 3.0, 2.0, 3.0, 4.0, 5.0, 6.0],
+            "flux": [1.0, 2.0, 5.0, 3.0, 1.0, 2.0, 3.0, 4.0, 5.0],
+        }
+    cmap = ColumnMapper(id_col="id", time_col="time", flux_col="flux", err_col="err", band_col="band")
+    ens = Ensemble(client=dask_client)
+    ens.from_source_dict(rows, column_mapper=cmap, sort=False, npartitions=2)
+
+    assert ens.check_lightcurve_cohesion() == data_cohesion
+
+
 def test_persist(dask_client):
     # Create some fake data.
     rows = {
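Usage note (not part of the series): the intended workflow is to run the two new checks before choosing a `batch` strategy. A minimal driver sketch follows; the import paths (`tape.Ensemble`, `tape.utils.ColumnMapper`) and the local `Client` setup are assumptions mirrored from the test fixtures above:

    from dask.distributed import Client
    from tape import Ensemble
    from tape.utils import ColumnMapper

    client = Client(n_workers=1, threads_per_worker=1)

    # Toy source table: two lightcurves, intentionally unsorted ids.
    rows = {
        "id": [8002, 8001, 8001, 8002],
        "time": [10.1, 10.2, 10.3, 10.4],
        "flux": [1.0, 2.0, 3.0, 4.0],
        "err": [0.1, 0.1, 0.1, 0.1],
        "band": ["g", "g", "r", "r"],
    }
    cmap = ColumnMapper(id_col="id", time_col="time", flux_col="flux",
                        err_col="err", band_col="band")

    ens = Ensemble(client=client)
    ens.from_source_dict(rows, column_mapper=cmap, npartitions=2, sort=False)

    # Only take the faster map_partitions path in batch() when every
    # lightcurve lives in a single partition.
    print("index sorted:", ens.check_sorted("source"))
    print("lightcurves cohesive:", ens.check_lightcurve_cohesion())
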
From e990da331e42fbca916d9969baa8d80c151aaa85 Mon Sep 17 00:00:00 2001
From: Doug Branton
Date: Mon, 23 Oct 2023 15:53:24 -0700
Subject: [PATCH 2/7] fix insert issue

---
 src/tape/ensemble.py              |  4 +++-
 tests/tape_tests/test_ensemble.py | 11 +++++++++--
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py
index 39930086..36c6930c 100644
--- a/src/tape/ensemble.py
+++ b/src/tape/ensemble.py
@@ -152,7 +152,7 @@ def insert_sources(
 
         # Create the new row and set the paritioning to match the original dataframe.
         df2 = dd.DataFrame.from_dict(rows, npartitions=1)
-        df2 = df2.set_index(self._id_col, drop=True, sort=False)
+        df2 = df2.set_index(self._id_col, drop=True, sort=True)
 
         # Save the divisions and number of partitions.
         prev_div = self._source.divisions
@@ -170,6 +170,8 @@ def insert_sources(
         elif self._source.npartitions != prev_num:
             self._source = self._source.repartition(npartitions=prev_num)
 
+        return self
+
     def client_info(self):
         """Calls the Dask Client, which returns cluster information
 
diff --git a/tests/tape_tests/test_ensemble.py b/tests/tape_tests/test_ensemble.py
index f2af2618..9278a702 100644
--- a/tests/tape_tests/test_ensemble.py
+++ b/tests/tape_tests/test_ensemble.py
@@ -372,8 +372,15 @@ def test_insert_paritioned(dask_client):
         "flux": [0.5 * float(i) for i in range(num_points)],
         "band": [all_bands[i % 4] for i in range(num_points)],
     }
-    cmap = ColumnMapper(id_col="id", time_col="time", flux_col="flux", err_col="err", band_col="band")
-    ens.from_source_dict(rows, column_mapper=cmap, npartitions=4)
+    cmap = ColumnMapper(
+        id_col="id",
+        time_col="time",
+        flux_col="flux",
+        err_col="err",
+        band_col="band",
+        provenance_col="provenance",
+    )
+    ens.from_source_dict(rows, column_mapper=cmap, npartitions=4, sort=True)
 
     # Save the old data for comparison.
     old_data = ens.compute("source")
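Background on the fix: with `sort=False`, the single-partition `df2` keeps whatever row order the caller supplied, so it carries no usable divisions when the new rows are appended and the table is repartitioned; `sort=True` orders the tiny frame first. A standalone sketch of the difference in plain dask.dataframe (behavior assumed for dask >= 2023.6.1):

    import pandas as pd
    import dask.dataframe as dd

    new_rows = pd.DataFrame({"id": [8002, 8001], "flux": [6.0, 7.0]})
    df2 = dd.from_pandas(new_rows, npartitions=1)

    # sort=False leaves the index as (8002, 8001): not monotonic, and the
    # frame ends up without known divisions.
    unsorted = df2.set_index("id", drop=True, sort=False)
    # sort=True (this commit's change) sorts by id before indexing.
    ordered = df2.set_index("id", drop=True, sort=True)

    print(unsorted.known_divisions, ordered.known_divisions)  # False True
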
From c0d86df7c24233491ef32d34a0936bf61e1fcb90 Mon Sep 17 00:00:00 2001
From: Doug Branton
Date: Mon, 23 Oct 2023 16:28:59 -0700
Subject: [PATCH 3/7] add sorted flag

---
 src/tape/ensemble.py | 29 +++++++++++++++++++++--------
 1 file changed, 21 insertions(+), 8 deletions(-)

diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py
index 36c6930c..fd4b12ca 100644
--- a/src/tape/ensemble.py
+++ b/src/tape/ensemble.py
@@ -1014,6 +1014,7 @@ def from_dask_dataframe(
         sync_tables=True,
         npartitions=None,
         partition_size=None,
+        sorted=False,
         sort=False,
         **kwargs,
     ):
@@ -1039,9 +1040,12 @@ def from_dask_dataframe(
         partition_size: `int`, optional
             If specified, attempts to repartition the ensemble to partitions
             of size `partition_size`.
+        sorted: bool, optional
+            If the index column is already sorted in increasing order.
+            Defaults to False
         sort: `bool`, optional
-            If True, sorts the DataFrame by the id column. Otherwise set the index
-            on the individual existing partitions. Defaults to False.
+            If True, sorts the DataFrame by the id column. Otherwise set the
+            index on the individual existing partitions. Defaults to False.
 
         Returns
         ----------
@@ -1051,14 +1055,14 @@ def from_dask_dataframe(
         self._load_column_mapper(column_mapper, **kwargs)
 
         # Set the index of the source frame and save the resulting table
-        self._source = source_frame.set_index(self._id_col, drop=True, sort=sort)
+        self._source = source_frame.set_index(self._id_col, drop=True, sorted=sorted, sort=sort)
 
         if object_frame is None:  # generate an indexed object table from source
             self._object = self._generate_object_table()
 
         else:
             self._object = object_frame
-            self._object = self._object.set_index(self._id_col, sort=sort)
+            self._object = self._object.set_index(self._id_col, sorted=sorted, sort=sort)
 
         # Optionally sync the tables, recalculates nobs columns
         if sync_tables:
@@ -1205,6 +1209,7 @@ def from_parquet(
         additional_cols=True,
         npartitions=None,
         partition_size=None,
+        sorted=False,
         sort=False,
         **kwargs,
     ):
@@ -1239,9 +1244,12 @@ def from_parquet(
         partition_size: `int`, optional
             If specified, attempts to repartition the ensemble to partitions
             of size `partition_size`.
+        sorted: bool, optional
+            If the index column is already sorted in increasing order.
+            Defaults to False
         sort: `bool`, optional
-            If True, sorts the DataFrame by the id column. Otherwise set the index
-            on the individual existing partitions. Defaults to False.
+            If True, sorts the DataFrame by the id column. Otherwise set the
+            index on the individual existing partitions. Defaults to False.
 
         Returns
         ----------
@@ -1279,6 +1287,7 @@ def from_parquet(
             sync_tables=sync_tables,
             npartitions=npartitions,
             partition_size=partition_size,
+            sorted=sorted,
             sort=sort,
             **kwargs,
         )
@@ -1350,9 +1359,12 @@ def from_source_dict(self, source_dict, column_mapper=None, npartitions=1, sort=
         npartitions: `int`, optional
             If specified, attempts to repartition the ensemble to the specified
             number of partitions
+        sorted: bool, optional
+            If the index column is already sorted in increasing order.
+            Defaults to False
         sort: `bool`, optional
-            If True, sorts the DataFrame by the id column. Otherwise set the index
-            on the individual existing partitions. Defaults to False.
+            If True, sorts the DataFrame by the id column. Otherwise set the
+            index on the individual existing partitions. Defaults to False.
 
         Returns
         ----------
@@ -1369,6 +1381,7 @@ def from_source_dict(self, source_dict, column_mapper=None, npartitions=1, sort=
             column_mapper=column_mapper,
             sync_tables=True,
             npartitions=npartitions,
+            sorted=sorted,
             sort=sort,
             **kwargs,
         )
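For orientation, the new `sorted`/`sort` pair forwards to `dask.dataframe.DataFrame.set_index`, which distinguishes three cost levels. A runnable sketch of the assumed semantics (dask >= 2023.6.1):

    import pandas as pd
    import dask.dataframe as dd

    pdf = pd.DataFrame({"id": [8001, 8001, 8002, 8002], "flux": range(4)})
    df = dd.from_pandas(pdf, npartitions=2)

    # sorted=True: caller promises the id column is already globally sorted;
    # dask only derives divisions from partition edges, no shuffle.
    a = df.set_index("id", sorted=True)

    # sort=True: full shuffle/sort on id; always safe, most expensive.
    b = df.set_index("id", sort=True)

    # sorted=False, sort=False: index each partition in place; divisions stay
    # unknown, and a lightcurve may remain split across partitions.
    c = df.set_index("id", sorted=False, sort=False)

    print(a.known_divisions, b.known_divisions, c.known_divisions)  # True True False
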
From 35382238bb74c89a3d3ff906a6a3f575a42d8a7b Mon Sep 17 00:00:00 2001
From: Doug Branton
Date: Mon, 23 Oct 2023 16:33:02 -0700
Subject: [PATCH 4/7] add sorted flag

---
 src/tape/ensemble.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py
index fd4b12ca..66880ed7 100644
--- a/src/tape/ensemble.py
+++ b/src/tape/ensemble.py
@@ -1363,7 +1363,7 @@ def from_source_dict(self, source_dict, column_mapper=None, npartitions=1, sort=
             If the index column is already sorted in increasing order.
             Defaults to False
         sort: `bool`, optional
-            If True, sorts the DataFrame by the id column. Otherwise set the 
+            If True, sorts the DataFrame by the id column. Otherwise set the
             index on the individual existing partitions. Defaults to False.
 
         Returns
From 1900e7a487c7e68333c4e5de3cb4dd102ec52184 Mon Sep 17 00:00:00 2001
From: Doug Branton
Date: Tue, 24 Oct 2023 13:56:33 -0700
Subject: [PATCH 5/7] change sort check

---
 src/tape/ensemble.py              | 11 ++++++++---
 tests/tape_tests/test_ensemble.py |  7 ++++---
 2 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py
index 66880ed7..28ff3683 100644
--- a/src/tape/ensemble.py
+++ b/src/tape/ensemble.py
@@ -210,7 +210,8 @@ def info(self, verbose=True, memory_usage=True, **kwargs):
         self._source.info(verbose=verbose, memory_usage=memory_usage, **kwargs)
 
     def check_sorted(self, table="object"):
-        """Checks to see if an Ensemble Dataframe is sorted on the index.
+        """Checks to see if an Ensemble Dataframe is sorted (increasing) on
+        the index.
 
         Parameters
         ----------
@@ -228,7 +229,9 @@ def check_sorted(self, table="object"):
             idx = self._source.index
         else:
             raise ValueError(f"{table} is not one of 'object' or 'source'")
-        return idx.map_partitions(lambda a: np.all(a[:-1] <= a[1:])).compute().all()
+
+        # Use the existing index function to check if it's sorted (increasing)
+        return idx.is_monotonic_increasing.compute()
 
     def check_lightcurve_cohesion(ens):
         """Checks to see if lightcurves are split across multiple partitions.
@@ -1346,7 +1349,9 @@ def available_datasets(self):
 
         return {key: datasets_file[key]["description"] for key in datasets_file.keys()}
 
-    def from_source_dict(self, source_dict, column_mapper=None, npartitions=1, sort=False, **kwargs):
+    def from_source_dict(
+        self, source_dict, column_mapper=None, npartitions=1, sorted=False, sort=False, **kwargs
+    ):
         """Load the sources into an ensemble from a dictionary.
 
         Parameters
diff --git a/tests/tape_tests/test_ensemble.py b/tests/tape_tests/test_ensemble.py
index 9278a702..1db32e2b 100644
--- a/tests/tape_tests/test_ensemble.py
+++ b/tests/tape_tests/test_ensemble.py
@@ -443,7 +443,8 @@ def test_core_wrappers(parquet_ensemble):
 
 
 @pytest.mark.parametrize("data_sorted", [True, False])
-def test_check_sorted(dask_client, data_sorted):
+@pytest.mark.parametrize("npartitions", [1, 2])
+def test_check_sorted(dask_client, data_sorted, npartitions):
     # Create some fake data.
 
     if data_sorted:
@@ -456,14 +457,14 @@ def test_check_sorted(dask_client, data_sorted, npartitions):
         }
     else:
         rows = {
-            "id": [8001, 8002, 8001, 8001, 8002, 8002, 8001, 8002, 8002],
+            "id": [8002, 8002, 8002, 8002, 8002, 8001, 8001, 8002, 8002],
             "time": [10.1, 10.2, 10.2, 11.1, 11.2, 11.3, 11.4, 15.0, 15.1],
             "band": ["g", "g", "b", "g", "b", "g", "g", "g", "g"],
             "err": [1.0, 2.0, 1.0, 3.0, 2.0, 3.0, 4.0, 5.0, 6.0],
             "flux": [1.0, 2.0, 5.0, 3.0, 1.0, 2.0, 3.0, 4.0, 5.0],
         }
     cmap = ColumnMapper(id_col="id", time_col="time", flux_col="flux", err_col="err", band_col="band")
     ens = Ensemble(client=dask_client)
-    ens.from_source_dict(rows, column_mapper=cmap, sort=False)
+    ens.from_source_dict(rows, column_mapper=cmap, sort=False, npartitions=npartitions)
 
     assert ens.check_sorted("source") == data_sorted
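Why the swap matters: the old per-partition reduction could report True when every partition was internally sorted but the partitions themselves were out of order, which is exactly what the new npartitions=2 test case probes. A self-contained sketch of that failure mode (the constructed frames are hypothetical; dask behavior assumed for >= 2023.6.1):

    import numpy as np
    import pandas as pd
    import dask.dataframe as dd

    # Two partitions, each internally sorted, but partition 2 starts below
    # where partition 1 ends -- globally the index is NOT sorted.
    pdf = pd.DataFrame({"flux": [1.0, 2.0, 3.0, 4.0]},
                       index=[8002, 8002, 8001, 8001])
    ddf = dd.from_pandas(pdf, npartitions=2, sort=False)

    idx = ddf.index
    old_check = idx.map_partitions(lambda a: np.all(a[:-1] <= a[1:])).compute().all()
    new_check = idx.is_monotonic_increasing.compute()
    print(old_check, new_check)  # True False (the old check is fooled)
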
From 53bcd082b9c8ce3e6d458c06f2660cc2d57c8892 Mon Sep 17 00:00:00 2001
From: Doug Branton
Date: Tue, 24 Oct 2023 14:03:58 -0700
Subject: [PATCH 6/7] formatting

---
 src/tape/ensemble.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py
index 28ff3683..5a2e29d1 100644
--- a/src/tape/ensemble.py
+++ b/src/tape/ensemble.py
@@ -254,8 +254,8 @@ def check_lightcurve_cohesion(ens):
         counts = idx.map_partitions(lambda a: Counter(a.unique())).compute()
 
         unq_counter = counts[0]
-        for i in range(len(counts) - 1):
-            unq_counter += counts[i + 1]
+        for i in range(1, len(counts)):
+            unq_counter += counts[i]
         if any(c >= 2 for c in unq_counter.values()):
             return False
         return True

From 66203f9b6f029769b3e37d59d3465984f76a2e53 Mon Sep 17 00:00:00 2001
From: Doug Branton
Date: Tue, 24 Oct 2023 14:30:10 -0700
Subject: [PATCH 7/7] fix function

---
 src/tape/ensemble.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py
index 5a2e29d1..8d0b65dc 100644
--- a/src/tape/ensemble.py
+++ b/src/tape/ensemble.py
@@ -233,7 +233,7 @@ def check_sorted(self, table="object"):
         # Use the existing index function to check if it's sorted (increasing)
         return idx.is_monotonic_increasing.compute()
 
-    def check_lightcurve_cohesion(ens):
+    def check_lightcurve_cohesion(self):
         """Checks to see if lightcurves are split across multiple partitions.
 
         With partitioned data, and source information represented by rows, it
@@ -250,7 +250,7 @@ def check_lightcurve_cohesion(ens):
         across multiple partitions (False)
 
         """
-        idx = ens._source.index
+        idx = self._source.index
         counts = idx.map_partitions(lambda a: Counter(a.unique())).compute()
 
         unq_counter = counts[0]
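
Closing note: the helper repaired above reduces to `collections.Counter` addition over per-partition unique ids; the same logic in a standalone sketch:

    from collections import Counter

    # What idx.map_partitions(lambda a: Counter(a.unique())) yields per
    # partition: each id counted once per partition it appears in.
    per_partition = [Counter({8001: 1, 8002: 1}), Counter({8002: 1, 8003: 1})]

    merged = per_partition[0]
    for counter in per_partition[1:]:
        merged += counter  # Counter addition sums per-id counts

    # An id with a merged count >= 2 was seen in more than one partition,
    # so its lightcurve is split and the ensemble is not cohesive.
    print(any(count >= 2 for count in merged.values()))  # True (8002 is split)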