From 764fb49d7d39c05e1d32e662115b08546ac1d127 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Mon, 8 Jan 2024 12:31:38 -0800 Subject: [PATCH 01/15] WIP: save_ensemble --- src/tape/ensemble.py | 87 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py index 0d5fb912..285ff718 100644 --- a/src/tape/ensemble.py +++ b/src/tape/ensemble.py @@ -8,6 +8,7 @@ from dask.distributed import Client from collections import Counter +from collections.abc import Iterable from .analysis.base import AnalysisFunction from .analysis.feature_extractor import BaseLightCurveFeature, FeatureExtractor @@ -1237,6 +1238,92 @@ def _standardize_batch(self, batch, on, by_band): return batch + def save_ensemble(self, path=".", dirname='ensemble', additional_frames=True, **kwargs): + """Save the current ensemble frames to disk. + + Parameters + ---------- + path: 'str' or path-like, optional + A path to the desired location of the top-level save directory, by + default this is the current working directory. + dirname: 'str', optional + The name of the saved ensemble directory, "ensemble" by default. + additional_frames: bool, or list, optional + Controls whether EnsembleFrames beyond the Object and Source Frames + are saved to disk. If True or False, this specifies whether all or + none of the additional frames are saved. Alternatively, a list of + EnsembleFrame names may be provided to specify which frames should + be saved. Object and Source will always be added and do not need to + be specified in the list. By default, all frames will be saved. + **kwargs: + Additional kwargs passed along to EnsembleFrame.to_parquet() + + Returns + ---------- + None + """ + + # Determine the path + ens_path = os.join(path, dirname) + + # Compile frame list + if additional_frames is True: + frames_to_save = list(self.frames.keys()) # save all frames + elif additional_frames is False: + frames_to_save = ["object", "source"] # save just object and source + elif isinstance(additional_frames, Iterable): + frames_to_save = [frame for frame in additional_frames if + frame in list(self.frames.keys())] + + # Raise an error if any frames were not found in the frame list + if len(frames_to_save) != len(additional_frames): + raise ValueError("One or more frames specified in `additional_frames` was not found in the frame list.") + + # Make sure object and source are in the frame list + if "object" not in frames_to_save: + frames_to_save.append("object") + if "source" not in frames_to_save: + frames_to_save.append("source") + else: + raise ValueError("Invalid input to `additional_frames`, must be boolean or list-like") + + # Save the frame list to disk + for frame_label in frames_to_save: + # grab the dataframe from the frame label + frame = self.frames[frame_label] + + # creates a subdirectory for the frame partition files + frame.to_parquet(os.path.join(ens_path, frame_label), **kwargs) + + # Save a ColumnMapper file + col_map = self.make_column_map() + np.save(os.path.join(ens_path, "column_mapper.npy"), col_map.map) + + return + + def from_ensemble(self, dirpath, additional_frames): + """Load an ensemble from an on-disk ensemble. + + Parameters + ---------- + dirpath: 'str' or path-like, optional + A path to the top-level ensemble directory to load from. + additional_frames: bool, or list, optional + Controls whether EnsembleFrames beyond the Object and Source Frames + are loaded from disk. 
If True or False, this specifies whether all + or none of the additional frames are loaded. Alternatively, a list + of EnsembleFrame names may be provided to specify which frames + should be loaded. Object and Source will always be added and do not + need to be specified in the list. By default, all frames will be + loaded. + + Returns + ---------- + ensemble: `tape.ensemble.Ensemble` + The ensemble object. + """ + pass + def from_pandas( self, source_frame, From 720c33c3b7cd8cb66f4f9d85e3c62424ab8b5a45 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Mon, 8 Jan 2024 15:25:15 -0800 Subject: [PATCH 02/15] WIP: from_ensemble --- src/tape/ensemble.py | 38 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py index 285ff718..6d3a187c 100644 --- a/src/tape/ensemble.py +++ b/src/tape/ensemble.py @@ -1301,7 +1301,7 @@ def save_ensemble(self, path=".", dirname='ensemble', additional_frames=True, ** return - def from_ensemble(self, dirpath, additional_frames): + def from_ensemble(self, dirpath, additional_frames=True, column_mapper=None, **kwargs): """Load an ensemble from an on-disk ensemble. Parameters @@ -1316,13 +1316,47 @@ def from_ensemble(self, dirpath, additional_frames): should be loaded. Object and Source will always be added and do not need to be specified in the list. By default, all frames will be loaded. + column_mapper: Tape.ColumnMapper object, or None, optional + Supplies a ColumnMapper to the Ensemble, if None (default) searches + for a column_mapper.npy file in the directory, which should be + created when the ensemble is saved. Returns ---------- ensemble: `tape.ensemble.Ensemble` The ensemble object. """ - pass + + # First grab the column_mapper if not specified + if column_mapper is None: + column_mapper = np.load(os.path.join(dirpath, 'column_mapper.npy'), allow_pickle='TRUE').item() + + # Load Object and Source + obj_path = os.path.join(dirpath, "object") + src_path = os.path.join(dirpath, "object") + self.from_parquet(src_path, obj_path, column_mapper=column_mapper, **kwargs) + + # Load all remaining frames + if additional_frames is False: + return self # we are all done + else: + if additional_frames is True: + # Grab all subdirectory paths in the top-level folder, filter out any files + frames_to_load = [f for f in os.listdir(dirpath) if not os.path.isfile(os.join(dirpath, f))] + elif isinstance(additional_frames, Iterable): + frames_to_load = [os.path.join(dirpath, frame) for frame in additional_frames] + else: + raise ValueError("Invalid input to `additional_frames`, must be boolean or list-like") + + # Filter out object and source from additional frames + frames_to_load = [frame for frame in frames_to_load if os.path.split(frame)[1] != "object" or os.path.split(frame)[1] != "source"] + + for frame in frames_to_load: + label = os.path.split(frame)[1] + ddf = EnsembleFrame.from_parquet(frame, label=label) + self.add_frame(ddf, label) + + return self def from_pandas( self, From 3c5ccbb2f00be96afac9e20a85ee78bdfeacc875 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Tue, 9 Jan 2024 13:25:42 -0800 Subject: [PATCH 03/15] save and load ensemble --- src/tape/ensemble.py | 45 +++++++++++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py index 6d3a187c..d427d9de 100644 --- a/src/tape/ensemble.py +++ b/src/tape/ensemble.py @@ -802,7 +802,9 @@ def calc_nobs(self, by_band=False, label="nobs", 
temporary=True): band_counts["total"] = band_counts[list(band_counts.columns)].sum(axis=1) bands = band_counts.columns.values - self.object = self.object.assign(**{label + "_" + str(band): band_counts[band] for band in bands}) + self.object.assign( + **{label + "_" + str(band): band_counts[band] for band in bands} + ).update_ensemble() if temporary: self._object_temp.extend(label + "_" + str(band) for band in bands) @@ -825,7 +827,7 @@ def calc_nobs(self, by_band=False, label="nobs", temporary=True): .repartition(npartitions=self.object.npartitions) ) - self.object = self.object.assign(**{label + "_total": counts[self._band_col]}) + self.object.assign(**{label + "_total": counts[self._band_col]}).update_ensemble() if temporary: self._object_temp.extend([label + "_total"]) @@ -1238,7 +1240,7 @@ def _standardize_batch(self, batch, on, by_band): return batch - def save_ensemble(self, path=".", dirname='ensemble', additional_frames=True, **kwargs): + def save_ensemble(self, path=".", dirname="ensemble", additional_frames=True, **kwargs): """Save the current ensemble frames to disk. Parameters @@ -1263,8 +1265,10 @@ def save_ensemble(self, path=".", dirname='ensemble', additional_frames=True, ** None """ + self._lazy_sync_tables("all") + # Determine the path - ens_path = os.join(path, dirname) + ens_path = os.path.join(path, dirname) # Compile frame list if additional_frames is True: @@ -1272,12 +1276,13 @@ def save_ensemble(self, path=".", dirname='ensemble', additional_frames=True, ** elif additional_frames is False: frames_to_save = ["object", "source"] # save just object and source elif isinstance(additional_frames, Iterable): - frames_to_save = [frame for frame in additional_frames if - frame in list(self.frames.keys())] + frames_to_save = [frame for frame in additional_frames if frame in list(self.frames.keys())] # Raise an error if any frames were not found in the frame list if len(frames_to_save) != len(additional_frames): - raise ValueError("One or more frames specified in `additional_frames` was not found in the frame list.") + raise ValueError( + "One or more frames specified in `additional_frames` was not found in the frame list." 
+ ) # Make sure object and source are in the frame list if "object" not in frames_to_save: @@ -1299,6 +1304,8 @@ def save_ensemble(self, path=".", dirname='ensemble', additional_frames=True, ** col_map = self.make_column_map() np.save(os.path.join(ens_path, "column_mapper.npy"), col_map.map) + print(f"Saved to {os.path.join(path, dirname)}") + return def from_ensemble(self, dirpath, additional_frames=True, column_mapper=None, **kwargs): @@ -1329,7 +1336,9 @@ def from_ensemble(self, dirpath, additional_frames=True, column_mapper=None, **k # First grab the column_mapper if not specified if column_mapper is None: - column_mapper = np.load(os.path.join(dirpath, 'column_mapper.npy'), allow_pickle='TRUE').item() + map_dict = np.load(os.path.join(dirpath, "column_mapper.npy"), allow_pickle="TRUE").item() + column_mapper = ColumnMapper() + column_mapper.map = map_dict # Load Object and Source obj_path = os.path.join(dirpath, "object") @@ -1342,19 +1351,25 @@ def from_ensemble(self, dirpath, additional_frames=True, column_mapper=None, **k else: if additional_frames is True: # Grab all subdirectory paths in the top-level folder, filter out any files - frames_to_load = [f for f in os.listdir(dirpath) if not os.path.isfile(os.join(dirpath, f))] + frames_to_load = [ + os.path.join(dirpath, f) + for f in os.listdir(dirpath) + if not os.path.isfile(os.path.join(dirpath, f)) + ] elif isinstance(additional_frames, Iterable): frames_to_load = [os.path.join(dirpath, frame) for frame in additional_frames] else: raise ValueError("Invalid input to `additional_frames`, must be boolean or list-like") # Filter out object and source from additional frames - frames_to_load = [frame for frame in frames_to_load if os.path.split(frame)[1] != "object" or os.path.split(frame)[1] != "source"] - - for frame in frames_to_load: - label = os.path.split(frame)[1] - ddf = EnsembleFrame.from_parquet(frame, label=label) - self.add_frame(ddf, label) + frames_to_load = [ + frame for frame in frames_to_load if os.path.split(frame)[1] not in ["object", "source"] + ] + if len(frames_to_load) > 0: + for frame in frames_to_load: + label = os.path.split(frame)[1] + ddf = EnsembleFrame.from_parquet(frame, label=label) + self.add_frame(ddf, label) return self From eb0f2b7b64f2059c1eacaf33831f3faa435359bf Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Tue, 9 Jan 2024 13:45:31 -0800 Subject: [PATCH 04/15] handle no-column object table; WIP unit test --- src/tape/ensemble.py | 16 ++++++++++++-- tests/tape_tests/test_ensemble.py | 36 +++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py index d427d9de..6d286f66 100644 --- a/src/tape/ensemble.py +++ b/src/tape/ensemble.py @@ -1297,6 +1297,13 @@ def save_ensemble(self, path=".", dirname="ensemble", additional_frames=True, ** # grab the dataframe from the frame label frame = self.frames[frame_label] + # Object can have no columns, which parquet doesn't handle + # In this case, we'll avoid saving to parquet + if frame_label == "object": + if len(frame.columns) == 0: + print("The Object Frame was not saved as no columns were present.") + continue + # creates a subdirectory for the frame partition files frame.to_parquet(os.path.join(ens_path, frame_label), **kwargs) @@ -1342,8 +1349,13 @@ def from_ensemble(self, dirpath, additional_frames=True, column_mapper=None, **k # Load Object and Source obj_path = os.path.join(dirpath, "object") - src_path = os.path.join(dirpath, "object") - self.from_parquet(src_path, 
obj_path, column_mapper=column_mapper, **kwargs) + src_path = os.path.join(dirpath, "source") + + # Check for whether or not object is present, it's not saved when no columns are present + if "object" in os.listdir(dirpath): + self.from_parquet(src_path, obj_path, column_mapper=column_mapper, **kwargs) + else: + self.from_parquet(src_path, column_mapper=column_mapper, **kwargs) # Load all remaining frames if additional_frames is False: diff --git a/tests/tape_tests/test_ensemble.py b/tests/tape_tests/test_ensemble.py index 995b1e82..18604775 100644 --- a/tests/tape_tests/test_ensemble.py +++ b/tests/tape_tests/test_ensemble.py @@ -478,6 +478,42 @@ def test_read_source_dict(dask_client): assert 8002 in obj_table.index +def test_save_and_load_ensemble(dask_client): + # Set a seed for reproducibility + np.random.seed(1) + + # Create some toy data + obj_ids = np.array([]) + mjds = np.array([]) + for i in range(10, 110): + obj_ids = np.append(obj_ids, np.array([i] * 1250)) + mjds = np.append(mjds, np.arange(0.0, 1250.0, 1.0)) + obj_ids = np.array(obj_ids) + flux = 10 * np.random.random(125000) + err = flux / 10 + band = np.random.choice(["g", "r"], 125000) + + # Store it in a dictionary + source_dict = {"id": obj_ids, "mjd": mjds, "flux": flux, "err": err, "band": band} + + # Create an Ensemble + ens = Ensemble() + ens.from_source_dict( + source_dict, + column_mapper=ColumnMapper( + id_col="id", time_col="mjd", flux_col="flux", err_col="err", band_col="band" + ), + ) + + # Make a column for the object table + ens.calc_nobs(temporary=False) + # Add a few result frames + ens.batch(np.mean, "flux", label="mean") + ens.batch(np.max, "flux", label="max") + + ens.save_ensemble("./ensemble") + + def test_insert(parquet_ensemble): num_partitions = parquet_ensemble.source.npartitions (old_object, old_source) = parquet_ensemble.compute() From 7bd32601a0848a118f5ad82174fd857d140339a5 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Tue, 9 Jan 2024 14:40:33 -0800 Subject: [PATCH 05/15] add unit test for save and load --- tests/tape_tests/test_ensemble.py | 76 +++++++++++++++++++++++++++++-- 1 file changed, 71 insertions(+), 5 deletions(-) diff --git a/tests/tape_tests/test_ensemble.py b/tests/tape_tests/test_ensemble.py index 18604775..7f7407b9 100644 --- a/tests/tape_tests/test_ensemble.py +++ b/tests/tape_tests/test_ensemble.py @@ -1,5 +1,6 @@ """Test ensemble manipulations""" import copy +import os import dask.dataframe as dd import numpy as np @@ -478,7 +479,12 @@ def test_read_source_dict(dask_client): assert 8002 in obj_table.index -def test_save_and_load_ensemble(dask_client): +@pytest.mark.parametrize("add_frames", [True, False, ["max"], "42", ["max", "min"]]) +@pytest.mark.parametrize("obj_nocols", [True, False]) +def test_save_and_load_ensemble(dask_client, tmp_path, add_frames, obj_nocols): + # Setup a temporary directory for files + save_path = tmp_path / "." 
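+    # (tmp_path is pytest's built-in fixture for a per-test temporary
+    # directory, so the test does not write into the working directory)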
+ # Set a seed for reproducibility np.random.seed(1) @@ -497,7 +503,7 @@ def test_save_and_load_ensemble(dask_client): source_dict = {"id": obj_ids, "mjd": mjds, "flux": flux, "err": err, "band": band} # Create an Ensemble - ens = Ensemble() + ens = Ensemble(client=dask_client) ens.from_source_dict( source_dict, column_mapper=ColumnMapper( @@ -505,13 +511,73 @@ def test_save_and_load_ensemble(dask_client): ), ) - # Make a column for the object table - ens.calc_nobs(temporary=False) + # object table as defined above has no columns, add a column to test both cases + if not obj_nocols: + # Make a column for the object table + ens.calc_nobs(temporary=False) + # Add a few result frames ens.batch(np.mean, "flux", label="mean") ens.batch(np.max, "flux", label="max") - ens.save_ensemble("./ensemble") + # Save the Ensemble + if add_frames == "42" or add_frames == ["max", "min"]: + with pytest.raises(ValueError): + ens.save_ensemble(save_path, dirname="ensemble", additional_frames=add_frames) + return + else: + ens.save_ensemble(save_path, dirname="ensemble", additional_frames=add_frames) + # Inspect the save directory + dircontents = os.listdir(os.path.join(save_path, "ensemble")) + + assert "source" in dircontents # Source should always be there + assert "column_mapper.npy" in dircontents # should make a column_mapper file + if obj_nocols: # object shouldn't if it was empty + assert "object" not in dircontents + else: # otherwise it should be present + assert "object" in dircontents + if add_frames is True: # if additional_frames is true, mean and max should be there + assert "max" in dircontents + assert "mean" in dircontents + elif add_frames is False: # but they shouldn't be there if additional_frames is false + assert "max" not in dircontents + assert "mean" not in dircontents + elif type(add_frames) == list: # only max should be there if ["max"] is the input + assert "max" in dircontents + assert "mean" not in dircontents + + # Load a new Ensemble + loaded_ens = Ensemble(dask_client) + loaded_ens.from_ensemble(os.path.join(save_path, "ensemble"), additional_frames=add_frames) + + # compare object and source dataframes + assert loaded_ens.source.compute().equals(ens.source.compute()) + assert loaded_ens.object.compute().equals(ens.object.compute()) + + # Check the contents of the loaded ensemble + if add_frames is True: # if additional_frames is true, mean and max should be there + assert "max" in loaded_ens.frames.keys() + assert "mean" in loaded_ens.frames.keys() + + # Make sure the dataframes are identical + assert loaded_ens.select_frame("max").compute().equals(ens.select_frame("max").compute()) + assert loaded_ens.select_frame("mean").compute().equals(ens.select_frame("mean").compute()) + + elif add_frames is False: # but they shouldn't be there if additional_frames is false + assert "max" not in loaded_ens.frames.keys() + assert "mean" not in loaded_ens.frames.keys() + + elif type(add_frames) == list: # only max should be there if ["max"] is the input + assert "max" in loaded_ens.frames.keys() + assert "mean" not in loaded_ens.frames.keys() + + # Make sure the dataframes are identical + assert loaded_ens.select_frame("max").compute().equals(ens.select_frame("max").compute()) + + # Test a bad additional_frames call for the loader + with pytest.raises(ValueError): + bad_ens = Ensemble(dask_client) + loaded_ens.from_ensemble(os.path.join(save_path, "ensemble"), additional_frames=3) def test_insert(parquet_ensemble): From 54a1392ca3c77ca8691a789ed1089ff842dcde28 Mon Sep 17 00:00:00 2001 
From: Doug Branton Date: Tue, 9 Jan 2024 14:53:20 -0800 Subject: [PATCH 06/15] docstring note; test tweak for code cov --- src/tape/ensemble.py | 7 +++++++ tests/tape_tests/test_ensemble.py | 4 ++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py index 6d286f66..d1d23f36 100644 --- a/src/tape/ensemble.py +++ b/src/tape/ensemble.py @@ -1263,6 +1263,13 @@ def save_ensemble(self, path=".", dirname="ensemble", additional_frames=True, ** Returns ---------- None + + Note + ---- + If the object frame is empty, which is often the case when an Ensemble + is constructed using only source files/dictionaries, then an object + subdirectory will not be created. `Ensemble.from_ensemble` will know + how to work with the directory in both cases. """ self._lazy_sync_tables("all") diff --git a/tests/tape_tests/test_ensemble.py b/tests/tape_tests/test_ensemble.py index 7f7407b9..592eada4 100644 --- a/tests/tape_tests/test_ensemble.py +++ b/tests/tape_tests/test_ensemble.py @@ -479,7 +479,7 @@ def test_read_source_dict(dask_client): assert 8002 in obj_table.index -@pytest.mark.parametrize("add_frames", [True, False, ["max"], "42", ["max", "min"]]) +@pytest.mark.parametrize("add_frames", [True, False, ["max"], 42, ["max", "min"]]) @pytest.mark.parametrize("obj_nocols", [True, False]) def test_save_and_load_ensemble(dask_client, tmp_path, add_frames, obj_nocols): # Setup a temporary directory for files @@ -521,7 +521,7 @@ def test_save_and_load_ensemble(dask_client, tmp_path, add_frames, obj_nocols): ens.batch(np.max, "flux", label="max") # Save the Ensemble - if add_frames == "42" or add_frames == ["max", "min"]: + if add_frames == 42 or add_frames == ["max", "min"]: with pytest.raises(ValueError): ens.save_ensemble(save_path, dirname="ensemble", additional_frames=add_frames) return From 3a8cb2197066ee0a732b4d935f869f839464e534 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Wed, 10 Jan 2024 10:17:27 -0800 Subject: [PATCH 07/15] ensemble reader function --- src/tape/ensemble.py | 11 +++++---- src/tape/ensemble_readers.py | 38 +++++++++++++++++++++++++++++++ tests/tape_tests/test_ensemble.py | 12 +++++++--- 3 files changed, 53 insertions(+), 8 deletions(-) diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py index d1d23f36..466cf254 100644 --- a/src/tape/ensemble.py +++ b/src/tape/ensemble.py @@ -1266,10 +1266,11 @@ def save_ensemble(self, path=".", dirname="ensemble", additional_frames=True, ** Note ---- - If the object frame is empty, which is often the case when an Ensemble - is constructed using only source files/dictionaries, then an object - subdirectory will not be created. `Ensemble.from_ensemble` will know - how to work with the directory in both cases. + If the object frame has no columns, which is often the case when an + Ensemble is constructed using only source files/dictionaries, then an + object subdirectory will not be created. `Ensemble.from_ensemble` will + know how to work with the directory whether or not the object + subdirectory is present. 
""" self._lazy_sync_tables("all") @@ -1387,7 +1388,7 @@ def from_ensemble(self, dirpath, additional_frames=True, column_mapper=None, **k if len(frames_to_load) > 0: for frame in frames_to_load: label = os.path.split(frame)[1] - ddf = EnsembleFrame.from_parquet(frame, label=label) + ddf = EnsembleFrame.from_parquet(frame, label=label, **kwargs) self.add_frame(ddf, label) return self diff --git a/src/tape/ensemble_readers.py b/src/tape/ensemble_readers.py index 9b636024..e16efc12 100644 --- a/src/tape/ensemble_readers.py +++ b/src/tape/ensemble_readers.py @@ -10,6 +10,44 @@ from tape.utils import ColumnMapper +def read_ensemble(dirpath, additional_frames=True, column_mapper=None, dask_client=True, **kwargs): + """Load an ensemble from an on-disk ensemble. + + Parameters + ---------- + dirpath: 'str' or path-like, optional + A path to the top-level ensemble directory to load from. + additional_frames: bool, or list, optional + Controls whether EnsembleFrames beyond the Object and Source Frames + are loaded from disk. If True or False, this specifies whether all + or none of the additional frames are loaded. Alternatively, a list + of EnsembleFrame names may be provided to specify which frames + should be loaded. Object and Source will always be added and do not + need to be specified in the list. By default, all frames will be + loaded. + column_mapper: Tape.ColumnMapper object, or None, optional + Supplies a ColumnMapper to the Ensemble, if None (default) searches + for a column_mapper.npy file in the directory, which should be + created when the ensemble is saved. + dask_client: `dask.distributed.client` or `bool`, optional + Accepts an existing `dask.distributed.Client`, or creates one if + `client=True`, passing any additional kwargs to a + dask.distributed.Client constructor call. If `client=False`, the + Ensemble is created without a distributed client. + + Returns + ---------- + ensemble: `tape.ensemble.Ensemble` + An ensemble object. + """ + + new_ens = Ensemble(dask_client, **kwargs) + + new_ens.from_ensemble(dirpath, additional_frames=additional_frames, column_mapper=column_mapper, **kwargs) + + return new_ens + + def read_pandas_dataframe( source_frame, object_frame=None, diff --git a/tests/tape_tests/test_ensemble.py b/tests/tape_tests/test_ensemble.py index 592eada4..206d99df 100644 --- a/tests/tape_tests/test_ensemble.py +++ b/tests/tape_tests/test_ensemble.py @@ -481,7 +481,8 @@ def test_read_source_dict(dask_client): @pytest.mark.parametrize("add_frames", [True, False, ["max"], 42, ["max", "min"]]) @pytest.mark.parametrize("obj_nocols", [True, False]) -def test_save_and_load_ensemble(dask_client, tmp_path, add_frames, obj_nocols): +@pytest.mark.parametrize("use_reader", [False, True]) +def test_save_and_load_ensemble(dask_client, tmp_path, add_frames, obj_nocols, use_reader): # Setup a temporary directory for files save_path = tmp_path / "." 
@@ -547,8 +548,13 @@ def test_save_and_load_ensemble(dask_client, tmp_path, add_frames, obj_nocols): assert "mean" not in dircontents # Load a new Ensemble - loaded_ens = Ensemble(dask_client) - loaded_ens.from_ensemble(os.path.join(save_path, "ensemble"), additional_frames=add_frames) + if not use_reader: + loaded_ens = Ensemble(dask_client) + loaded_ens.from_ensemble(os.path.join(save_path, "ensemble"), additional_frames=add_frames) + else: + loaded_ens = tape.ensemble_readers.read_ensemble( + os.path.join(save_path, "ensemble"), additional_frames=add_frames, dask_client=dask_client + ) # compare object and source dataframes assert loaded_ens.source.compute().equals(ens.source.compute()) From c9e34a43fc6d622e1c390513147c2f4d057f0210 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Wed, 10 Jan 2024 10:54:33 -0800 Subject: [PATCH 08/15] add save docs --- .../tutorials/working_with_the_ensemble.ipynb | 47 ++++++++++++++++--- src/tape/ensemble.py | 6 +++ 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/docs/tutorials/working_with_the_ensemble.ipynb b/docs/tutorials/working_with_the_ensemble.ipynb index 1e43d3de..823fc16b 100644 --- a/docs/tutorials/working_with_the_ensemble.ipynb +++ b/docs/tutorials/working_with_the_ensemble.ipynb @@ -825,6 +825,46 @@ "We see that we now have a `Pandas.series` of `my_average_flux` result by object_id (lightcurve). In many cases, this may not be the ideal output for your function. This output is controlled by the `Dask` `meta` parameter. For more information on this parameter, you can read the `Dask` [documentation](https://blog.dask.org/2022/08/09/understanding-meta-keyword-argument). You may pass the `meta` parameter through `Ensemble.batch`, as shown above." ] }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Saving the Ensemble to Disk\n", + "\n", + "In some situations, you may find yourself running a given workflow many times. Due to the nature of lazy-computation, this will involve repeated execution of data I/O, pre-processing steps, initial analysis, etc. In these situations, it may be effective to instead save the ensemble state to disk after completion of these initial processing steps. To accomplish this, we can use the `Ensemble.save_ensemble` function." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ens.save_ensemble(\".\", \"ensemble\", additional_frames=[\"result_3\"]) # Saves object, source, and result_3 to disk" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The above command creates an \"ensemble\" directory in the current working directory. This directory contains a subdirectory of parquet files for each `EnsembleFrame` object that was included in the `additional_frames` kwarg. Note that if `additional_frames` was set to True or False this would save all or none of the additional `EnsembleFrame` objects respectively, and that the object (unless it has no columns) and source frames are always saved.\n", + "\n", + "From here, we can just load the ensemble from disk." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "new_ens = Ensemble(client = ens.client) # use the same client\n", + "new_ens.from_ensemble(\"./ensemble\", additional_frames=True)\n", + "new_ens.select_frame(\"result_3\").head(5)" + ] + }, { "cell_type": "code", "execution_count": null, @@ -838,13 +878,6 @@ "source": [ "ens.client.close() # Tear down the ensemble client" ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] } ], "metadata": { diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py index 466cf254..61cfb773 100644 --- a/src/tape/ensemble.py +++ b/src/tape/ensemble.py @@ -1271,6 +1271,12 @@ def save_ensemble(self, path=".", dirname="ensemble", additional_frames=True, ** object subdirectory will not be created. `Ensemble.from_ensemble` will know how to work with the directory whether or not the object subdirectory is present. + + Be careful about repeated saves to the same directory name. This will + not be a perfect overwrite, as any products produced by a previous save + may not be deleted by successive saves if they are removed from the + ensemble. For best results, delete the directory between saves or + verify that the contents are what you would expect. """ self._lazy_sync_tables("all") From bd96057d2a1aae1c184c632b066ee7ac83e93c36 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Wed, 10 Jan 2024 11:03:18 -0800 Subject: [PATCH 09/15] add save docs --- docs/tutorials/working_with_the_ensemble.ipynb | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/tutorials/working_with_the_ensemble.ipynb b/docs/tutorials/working_with_the_ensemble.ipynb index 823fc16b..a0f5b2ec 100644 --- a/docs/tutorials/working_with_the_ensemble.ipynb +++ b/docs/tutorials/working_with_the_ensemble.ipynb @@ -841,7 +841,9 @@ "metadata": {}, "outputs": [], "source": [ - "ens.save_ensemble(\".\", \"ensemble\", additional_frames=[\"result_3\"]) # Saves object, source, and result_3 to disk" + "ens.save_ensemble(\n", + " \".\", \"ensemble\", additional_frames=[\"result_3\"],\n", + " ) # Saves object, source, and result_3 to disk" ] }, { @@ -860,7 +862,7 @@ "metadata": {}, "outputs": [], "source": [ - "new_ens = Ensemble(client = ens.client) # use the same client\n", + "new_ens = Ensemble(client=ens.client) # use the same client\n", "new_ens.from_ensemble(\"./ensemble\", additional_frames=True)\n", "new_ens.select_frame(\"result_3\").head(5)" ] From 3c54ff73de18bb5ec2204ba7fea389cdf40227f3 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Wed, 10 Jan 2024 11:07:49 -0800 Subject: [PATCH 10/15] add save docs --- .../tutorials/working_with_the_ensemble.ipynb | 39 +++++++++---------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/docs/tutorials/working_with_the_ensemble.ipynb b/docs/tutorials/working_with_the_ensemble.ipynb index a0f5b2ec..25a4818e 100644 --- a/docs/tutorials/working_with_the_ensemble.ipynb +++ b/docs/tutorials/working_with_the_ensemble.ipynb @@ -26,7 +26,7 @@ "import numpy as np\n", "import pandas as pd\n", "\n", - "np.random.seed(1) \n", + "np.random.seed(1)\n", "\n", "# Generate 10 astronomical objects\n", "n_obj = 10\n", @@ -34,9 +34,9 @@ "names = ids.astype(str)\n", "object_table = pd.DataFrame(\n", " {\n", - " \"id\": ids, \n", + " \"id\": ids,\n", " \"name\": names,\n", - " \"ddf_bool\": np.random.randint(0, 2, n_obj), # 0 if from deep drilling field, 1 otherwise\n", + " \"ddf_bool\": np.random.randint(0, 2, n_obj), # 0 if 
from deep drilling field, 1 otherwise\n", " \"libid_cadence\": np.random.randint(1, 130, n_obj),\n", " }\n", ")\n", @@ -49,7 +49,7 @@ " {\n", " \"id\": 8000 + (np.arange(num_points) % n_obj),\n", " \"time\": np.arange(num_points),\n", - " \"flux\": np.random.random_sample(size=num_points)*10,\n", + " \"flux\": np.random.random_sample(size=num_points) * 10,\n", " \"band\": np.repeat(all_bands, num_points / len(all_bands)),\n", " \"error\": np.random.random_sample(size=num_points),\n", " \"count\": np.arange(num_points),\n", @@ -89,7 +89,8 @@ " flux_col=\"flux\",\n", " err_col=\"error\",\n", " band_col=\"band\",\n", - " npartitions=1)" + " npartitions=1,\n", + ")" ] }, { @@ -124,18 +125,12 @@ "from tape.utils import ColumnMapper\n", "\n", "# columns assigned manually\n", - "col_map = ColumnMapper().assign(id_col=\"id\",\n", - " time_col=\"time\",\n", - " flux_col=\"flux\",\n", - " err_col=\"error\",\n", - " band_col=\"band\")\n", + "col_map = ColumnMapper().assign(\n", + " id_col=\"id\", time_col=\"time\", flux_col=\"flux\", err_col=\"error\", band_col=\"band\"\n", + ")\n", "\n", "# Pass the ColumnMapper along to from_pandas\n", - "ens.from_pandas(\n", - " source_frame=source_table,\n", - " object_frame=object_table,\n", - " column_mapper=col_map,\n", - " npartitions=1)" + "ens.from_pandas(source_frame=source_table, object_frame=object_table, column_mapper=col_map, npartitions=1)" ] }, { @@ -616,8 +611,8 @@ "metadata": {}, "outputs": [], "source": [ - "ens.add_frame(ens.select_frame(\"stetson_j\"), \"stetson_j_result_1\") # Add result under new label\n", - "ens.drop_frame(\"stetson_j\") # Drop original label\n", + "ens.add_frame(ens.select_frame(\"stetson_j\"), \"stetson_j_result_1\") # Add result under new label\n", + "ens.drop_frame(\"stetson_j\") # Drop original label\n", "\n", "ens.select_frame(\"stetson_j_result_1\").compute()" ] @@ -655,7 +650,7 @@ "ens.drop_frame(\"result_1\")\n", "\n", "try:\n", - " ens.select_frame(\"result_1\") # This should result in a KeyError since the frame has been dropped.\n", + " ens.select_frame(\"result_1\") # This should result in a KeyError since the frame has been dropped.\n", "except Exception as e:\n", " print(\"As expected, the frame 'result_1 was dropped.\\n\" + str(e))" ] @@ -842,8 +837,10 @@ "outputs": [], "source": [ "ens.save_ensemble(\n", - " \".\", \"ensemble\", additional_frames=[\"result_3\"],\n", - " ) # Saves object, source, and result_3 to disk" + " \".\",\n", + " \"ensemble\",\n", + " additional_frames=[\"result_3\"],\n", + ") # Saves object, source, and result_3 to disk" ] }, { @@ -862,7 +859,7 @@ "metadata": {}, "outputs": [], "source": [ - "new_ens = Ensemble(client=ens.client) # use the same client\n", + "new_ens = Ensemble(client=ens.client) # use the same client\n", "new_ens.from_ensemble(\"./ensemble\", additional_frames=True)\n", "new_ens.select_frame(\"result_3\").head(5)" ] From ab10b6da16358719ca3e11074184e691f921e11b Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Wed, 10 Jan 2024 14:26:32 -0800 Subject: [PATCH 11/15] better expose from_parquet kwargs --- src/tape/ensemble.py | 54 ++++++++++++++++++++++++++++++------ src/tape/ensemble_readers.py | 34 +++++++++++++++++++++-- 2 files changed, 77 insertions(+), 11 deletions(-) diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py index 61cfb773..9501d68e 100644 --- a/src/tape/ensemble.py +++ b/src/tape/ensemble.py @@ -1329,7 +1329,15 @@ def save_ensemble(self, path=".", dirname="ensemble", additional_frames=True, ** return - def from_ensemble(self, dirpath, 
additional_frames=True, column_mapper=None, **kwargs): + def from_ensemble(self, + dirpath, + additional_frames=True, + column_mapper=None, + additional_cols=True, + partition_size=None, + sorted=False, + sort=False, + **kwargs): """Load an ensemble from an on-disk ensemble. Parameters @@ -1348,6 +1356,19 @@ def from_ensemble(self, dirpath, additional_frames=True, column_mapper=None, **k Supplies a ColumnMapper to the Ensemble, if None (default) searches for a column_mapper.npy file in the directory, which should be created when the ensemble is saved. + additional_cols: 'bool', optional + Boolean to indicate whether to carry in columns beyond the + critical columns, true will, while false will only load the columns + containing the critical quantities (id,time,flux,err,band) + partition_size: `int`, optional + If specified, attempts to repartition the ensemble to partitions + of size `partition_size`. + sorted: bool, optional + If the index column is already sorted in increasing order. + Defaults to False + sort: `bool`, optional + If True, sorts the DataFrame by the id column. Otherwise set the + index on the individual existing partitions. Defaults to False. Returns ---------- @@ -1367,9 +1388,25 @@ def from_ensemble(self, dirpath, additional_frames=True, column_mapper=None, **k # Check for whether or not object is present, it's not saved when no columns are present if "object" in os.listdir(dirpath): - self.from_parquet(src_path, obj_path, column_mapper=column_mapper, **kwargs) + self.from_parquet(src_path, + obj_path, + column_mapper=column_mapper, + additional_cols=additional_cols, + sorted=sorted, sort=sort, + sync_tables=False, # a sync should always be performed just before saving + npartitions=None, # disabled, as this would be applied to all frames + partition_size=partition_size, + **kwargs) else: - self.from_parquet(src_path, column_mapper=column_mapper, **kwargs) + self.from_parquet(src_path, + column_mapper=column_mapper, + additional_cols=additional_cols, + sorted=sorted, + sort=sort, + sync_tables=False, # a sync should always be performed just before saving + npartitions=None, # disabled, as this would be applied to all frames + partition_size=partition_size, + **kwargs) # Load all remaining frames if additional_frames is False: @@ -1499,6 +1536,12 @@ def from_dask_dataframe( self._load_column_mapper(column_mapper, **kwargs) source_frame = SourceFrame.from_dask_dataframe(source_frame, self) + # Repartition before any sorting + if npartitions and npartitions > 1: + source_frame = source_frame.repartition(npartitions=npartitions) + elif partition_size: + source_frame = source_frame.repartition(partition_size=partition_size) + # Set the index of the source frame and save the resulting table self.update_frame(source_frame.set_index(self._id_col, drop=True, sorted=sorted, sort=sort)) @@ -1515,11 +1558,6 @@ def from_dask_dataframe( self.object.set_dirty(True) self._sync_tables() - if npartitions and npartitions > 1: - self.source = self.source.repartition(npartitions=npartitions) - elif partition_size: - self.source = self.source.repartition(partition_size=partition_size) - # Check that Divisions are established, warn if not. 
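         # (known divisions let Dask prune partitions for indexed lookups and
         # joins, so an index without them is worth warning about)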
for name, table in [("object", self.object), ("source", self.source)]: if not table.known_divisions: diff --git a/src/tape/ensemble_readers.py b/src/tape/ensemble_readers.py index e16efc12..8446dbe1 100644 --- a/src/tape/ensemble_readers.py +++ b/src/tape/ensemble_readers.py @@ -10,7 +10,15 @@ from tape.utils import ColumnMapper -def read_ensemble(dirpath, additional_frames=True, column_mapper=None, dask_client=True, **kwargs): +def read_ensemble(dirpath, + additional_frames=True, + column_mapper=None, + dask_client=True, + additional_cols=True, + partition_size=None, + sorted=False, + sort=False, + **kwargs): """Load an ensemble from an on-disk ensemble. Parameters @@ -29,6 +37,19 @@ def read_ensemble(dirpath, additional_frames=True, column_mapper=None, dask_clie Supplies a ColumnMapper to the Ensemble, if None (default) searches for a column_mapper.npy file in the directory, which should be created when the ensemble is saved. + additional_cols: 'bool', optional + Boolean to indicate whether to carry in columns beyond the + critical columns, true will, while false will only load the columns + containing the critical quantities (id,time,flux,err,band) + partition_size: `int`, optional + If specified, attempts to repartition the ensemble to partitions + of size `partition_size`. + sorted: bool, optional + If the index column is already sorted in increasing order. + Defaults to False + sort: `bool`, optional + If True, sorts the DataFrame by the id column. Otherwise set the + index on the individual existing partitions. Defaults to False. dask_client: `dask.distributed.client` or `bool`, optional Accepts an existing `dask.distributed.Client`, or creates one if `client=True`, passing any additional kwargs to a @@ -41,9 +62,16 @@ def read_ensemble(dirpath, additional_frames=True, column_mapper=None, dask_clie An ensemble object. """ - new_ens = Ensemble(dask_client, **kwargs) + new_ens = Ensemble(dask_client) - new_ens.from_ensemble(dirpath, additional_frames=additional_frames, column_mapper=column_mapper, **kwargs) + new_ens.from_ensemble(dirpath, + additional_frames=additional_frames, + column_mapper=column_mapper, + additional_cols=additional_cols, + partition_size=partition_size, + sorted=sorted, + sort=sort, + **kwargs) return new_ens From 6730d47bcc8b5f2ee0d28ccee10e8882f2ca1c92 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Wed, 10 Jan 2024 14:27:03 -0800 Subject: [PATCH 12/15] better expose from_parquet kwargs --- src/tape/ensemble.py | 61 ++++++++++++++++++++---------------- src/tape/ensemble_readers.py | 38 ++++++++++++---------- 2 files changed, 55 insertions(+), 44 deletions(-) diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py index 9501d68e..970369c6 100644 --- a/src/tape/ensemble.py +++ b/src/tape/ensemble.py @@ -1329,15 +1329,17 @@ def save_ensemble(self, path=".", dirname="ensemble", additional_frames=True, ** return - def from_ensemble(self, - dirpath, - additional_frames=True, - column_mapper=None, - additional_cols=True, - partition_size=None, - sorted=False, - sort=False, - **kwargs): + def from_ensemble( + self, + dirpath, + additional_frames=True, + column_mapper=None, + additional_cols=True, + partition_size=None, + sorted=False, + sort=False, + **kwargs, + ): """Load an ensemble from an on-disk ensemble. 
Parameters @@ -1388,25 +1390,30 @@ def from_ensemble(self, # Check for whether or not object is present, it's not saved when no columns are present if "object" in os.listdir(dirpath): - self.from_parquet(src_path, - obj_path, - column_mapper=column_mapper, - additional_cols=additional_cols, - sorted=sorted, sort=sort, - sync_tables=False, # a sync should always be performed just before saving - npartitions=None, # disabled, as this would be applied to all frames - partition_size=partition_size, - **kwargs) + self.from_parquet( + src_path, + obj_path, + column_mapper=column_mapper, + additional_cols=additional_cols, + sorted=sorted, + sort=sort, + sync_tables=False, # a sync should always be performed just before saving + npartitions=None, # disabled, as this would be applied to all frames + partition_size=partition_size, + **kwargs, + ) else: - self.from_parquet(src_path, - column_mapper=column_mapper, - additional_cols=additional_cols, - sorted=sorted, - sort=sort, - sync_tables=False, # a sync should always be performed just before saving - npartitions=None, # disabled, as this would be applied to all frames - partition_size=partition_size, - **kwargs) + self.from_parquet( + src_path, + column_mapper=column_mapper, + additional_cols=additional_cols, + sorted=sorted, + sort=sort, + sync_tables=False, # a sync should always be performed just before saving + npartitions=None, # disabled, as this would be applied to all frames + partition_size=partition_size, + **kwargs, + ) # Load all remaining frames if additional_frames is False: diff --git a/src/tape/ensemble_readers.py b/src/tape/ensemble_readers.py index 8446dbe1..d714e979 100644 --- a/src/tape/ensemble_readers.py +++ b/src/tape/ensemble_readers.py @@ -10,15 +10,17 @@ from tape.utils import ColumnMapper -def read_ensemble(dirpath, - additional_frames=True, - column_mapper=None, - dask_client=True, - additional_cols=True, - partition_size=None, - sorted=False, - sort=False, - **kwargs): +def read_ensemble( + dirpath, + additional_frames=True, + column_mapper=None, + dask_client=True, + additional_cols=True, + partition_size=None, + sorted=False, + sort=False, + **kwargs, +): """Load an ensemble from an on-disk ensemble. 
Parameters @@ -64,14 +66,16 @@ def read_ensemble(dirpath, new_ens = Ensemble(dask_client) - new_ens.from_ensemble(dirpath, - additional_frames=additional_frames, - column_mapper=column_mapper, - additional_cols=additional_cols, - partition_size=partition_size, - sorted=sorted, - sort=sort, - **kwargs) + new_ens.from_ensemble( + dirpath, + additional_frames=additional_frames, + column_mapper=column_mapper, + additional_cols=additional_cols, + partition_size=partition_size, + sorted=sorted, + sort=sort, + **kwargs, + ) return new_ens From aad1978a3005b165e9745492f65b3f53f50238b0 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Thu, 11 Jan 2024 12:27:57 -0800 Subject: [PATCH 13/15] metadata updates, overwrite saves --- src/tape/ensemble.py | 123 +++++++++++++++++------------- src/tape/ensemble_frame.py | 15 +--- src/tape/ensemble_readers.py | 4 - tests/tape_tests/test_ensemble.py | 29 ++++++- 4 files changed, 99 insertions(+), 72 deletions(-) diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py index 970369c6..d7a8581d 100644 --- a/src/tape/ensemble.py +++ b/src/tape/ensemble.py @@ -1,5 +1,7 @@ import glob import os +import json +import shutil import warnings import requests import dask.dataframe as dd @@ -1284,6 +1286,18 @@ def save_ensemble(self, path=".", dirname="ensemble", additional_frames=True, ** # Determine the path ens_path = os.path.join(path, dirname) + # First look for an existing metadata.json file in the path + try: + with open(os.path.join(ens_path, "metadata.json"), "r") as oldfile: + # Reading from json file + old_metadata = json.load(oldfile) + old_subdirs = old_metadata["subdirs"] + # Delete any old subdirectories + for subdir in old_subdirs: + shutil.rmtree(os.path.join(ens_path, subdir)) + except FileNotFoundError: + pass + # Compile frame list if additional_frames is True: frames_to_save = list(self.frames.keys()) # save all frames @@ -1307,23 +1321,37 @@ def save_ensemble(self, path=".", dirname="ensemble", additional_frames=True, ** raise ValueError("Invalid input to `additional_frames`, must be boolean or list-like") # Save the frame list to disk + created_subdirs = [] # track the list of created subdirectories + divisions_known = [] # log whether divisions were known for each frame for frame_label in frames_to_save: # grab the dataframe from the frame label frame = self.frames[frame_label] - # Object can have no columns, which parquet doesn't handle - # In this case, we'll avoid saving to parquet - if frame_label == "object": - if len(frame.columns) == 0: - print("The Object Frame was not saved as no columns were present.") - continue + # When the frame has no columns, avoid the save as parquet doesn't handle it + # Most commonly this applies to the object table when it's built from source + if len(frame.columns) == 0: + print(f"Frame: {frame_label} was not saved as no columns were present.") + continue # creates a subdirectory for the frame partition files - frame.to_parquet(os.path.join(ens_path, frame_label), **kwargs) + frame.to_parquet(os.path.join(ens_path, frame_label), write_metadata_file=True, **kwargs) + created_subdirs.append(frame_label) + divisions_known.append(frame.known_divisions) + + # Save a metadata file + col_map = self.make_column_map() # grab the current column_mapper + + metadata = { + "subdirs": created_subdirs, + "known_divisions": divisions_known, + "column_mapper": col_map.map, + } + json_metadata = json.dumps(metadata, indent=4) - # Save a ColumnMapper file - col_map = self.make_column_map() - np.save(os.path.join(ens_path, 
"column_mapper.npy"), col_map.map) + with open(os.path.join(ens_path, "metadata.json"), "w") as outfile: + outfile.write(json_metadata) + + # np.save(os.path.join(ens_path, "column_mapper.npy"), col_map.map) print(f"Saved to {os.path.join(path, dirname)}") @@ -1334,10 +1362,6 @@ def from_ensemble( dirpath, additional_frames=True, column_mapper=None, - additional_cols=True, - partition_size=None, - sorted=False, - sort=False, **kwargs, ): """Load an ensemble from an on-disk ensemble. @@ -1358,19 +1382,6 @@ def from_ensemble( Supplies a ColumnMapper to the Ensemble, if None (default) searches for a column_mapper.npy file in the directory, which should be created when the ensemble is saved. - additional_cols: 'bool', optional - Boolean to indicate whether to carry in columns beyond the - critical columns, true will, while false will only load the columns - containing the critical quantities (id,time,flux,err,band) - partition_size: `int`, optional - If specified, attempts to repartition the ensemble to partitions - of size `partition_size`. - sorted: bool, optional - If the index column is already sorted in increasing order. - Defaults to False - sort: `bool`, optional - If True, sorts the DataFrame by the id column. Otherwise set the - index on the individual existing partitions. Defaults to False. Returns ---------- @@ -1378,40 +1389,45 @@ def from_ensemble( The ensemble object. """ - # First grab the column_mapper if not specified - if column_mapper is None: - map_dict = np.load(os.path.join(dirpath, "column_mapper.npy"), allow_pickle="TRUE").item() - column_mapper = ColumnMapper() - column_mapper.map = map_dict + # Read in the metadata.json file + with open(os.path.join(dirpath, "metadata.json"), "r") as metadatafile: + # Reading from json file + metadata = json.load(metadatafile) + + # Load in the metadata + subdirs = metadata["subdirs"] + frame_known_divisions = metadata["known_divisions"] + if column_mapper is None: + column_mapper = ColumnMapper() + column_mapper.map = metadata["column_mapper"] # Load Object and Source - obj_path = os.path.join(dirpath, "object") - src_path = os.path.join(dirpath, "source") # Check for whether or not object is present, it's not saved when no columns are present - if "object" in os.listdir(dirpath): + if "object" in subdirs: + # divisions should be known for both tables to use the sorted kwarg + use_sorted = ( + frame_known_divisions[subdirs.index("object")] + and frame_known_divisions[subdirs.index("source")] + ) + self.from_parquet( - src_path, - obj_path, + os.path.join(dirpath, "source"), + os.path.join(dirpath, "object"), column_mapper=column_mapper, - additional_cols=additional_cols, - sorted=sorted, - sort=sort, + sorted=use_sorted, + sort=False, sync_tables=False, # a sync should always be performed just before saving - npartitions=None, # disabled, as this would be applied to all frames - partition_size=partition_size, **kwargs, ) else: + use_sorted = frame_known_divisions[subdirs.index("source")] self.from_parquet( - src_path, + os.path.join(dirpath, "source"), column_mapper=column_mapper, - additional_cols=additional_cols, - sorted=sorted, - sort=sort, + sorted=use_sorted, + sort=False, sync_tables=False, # a sync should always be performed just before saving - npartitions=None, # disabled, as this would be applied to all frames - partition_size=partition_size, **kwargs, ) @@ -1421,11 +1437,7 @@ def from_ensemble( else: if additional_frames is True: # Grab all subdirectory paths in the top-level folder, filter out any files - frames_to_load 
= [ - os.path.join(dirpath, f) - for f in os.listdir(dirpath) - if not os.path.isfile(os.path.join(dirpath, f)) - ] + frames_to_load = [os.path.join(dirpath, f) for f in subdirs] elif isinstance(additional_frames, Iterable): frames_to_load = [os.path.join(dirpath, frame) for frame in additional_frames] else: @@ -1438,7 +1450,10 @@ def from_ensemble( if len(frames_to_load) > 0: for frame in frames_to_load: label = os.path.split(frame)[1] - ddf = EnsembleFrame.from_parquet(frame, label=label, **kwargs) + use_divisions = frame_known_divisions[subdirs.index(label)] + ddf = EnsembleFrame.from_parquet( + frame, label=label, calculate_divisions=use_divisions, **kwargs + ) self.add_frame(ddf, label) return self diff --git a/src/tape/ensemble_frame.py b/src/tape/ensemble_frame.py index 7a910f51..b270932e 100644 --- a/src/tape/ensemble_frame.py +++ b/src/tape/ensemble_frame.py @@ -844,14 +844,7 @@ def convert_flux_to_mag( return result @classmethod - def from_parquet( - cl, - path, - index=None, - columns=None, - label=None, - ensemble=None, - ): + def from_parquet(cl, path, index=None, columns=None, label=None, ensemble=None, **kwargs): """Returns an EnsembleFrame constructed from loading a parquet file. Parameters ---------- @@ -879,11 +872,7 @@ def from_parquet( # Read the parquet file with an engine that will assume the meta is a TapeFrame which Dask will # instantiate as EnsembleFrame via its dispatcher. result = dd.read_parquet( - path, - index=index, - columns=columns, - split_row_groups=True, - engine=TapeArrowEngine, + path, index=index, columns=columns, split_row_groups=True, engine=TapeArrowEngine, **kwargs ) result.label = label result.ensemble = ensemble diff --git a/src/tape/ensemble_readers.py b/src/tape/ensemble_readers.py index d714e979..55b93e26 100644 --- a/src/tape/ensemble_readers.py +++ b/src/tape/ensemble_readers.py @@ -17,8 +17,6 @@ def read_ensemble( dask_client=True, additional_cols=True, partition_size=None, - sorted=False, - sort=False, **kwargs, ): """Load an ensemble from an on-disk ensemble. @@ -72,8 +70,6 @@ def read_ensemble( column_mapper=column_mapper, additional_cols=additional_cols, partition_size=partition_size, - sorted=sorted, - sort=sort, **kwargs, ) diff --git a/tests/tape_tests/test_ensemble.py b/tests/tape_tests/test_ensemble.py index 206d99df..7fc52c2e 100644 --- a/tests/tape_tests/test_ensemble.py +++ b/tests/tape_tests/test_ensemble.py @@ -483,6 +483,7 @@ def test_read_source_dict(dask_client): @pytest.mark.parametrize("obj_nocols", [True, False]) @pytest.mark.parametrize("use_reader", [False, True]) def test_save_and_load_ensemble(dask_client, tmp_path, add_frames, obj_nocols, use_reader): + """Test the save and load ensemble loop""" # Setup a temporary directory for files save_path = tmp_path / "." 
@@ -532,7 +533,7 @@ def test_save_and_load_ensemble(dask_client, tmp_path, add_frames, obj_nocols, u dircontents = os.listdir(os.path.join(save_path, "ensemble")) assert "source" in dircontents # Source should always be there - assert "column_mapper.npy" in dircontents # should make a column_mapper file + assert "metadata.json" in dircontents # should make a metadata file if obj_nocols: # object shouldn't if it was empty assert "object" not in dircontents else: # otherwise it should be present @@ -586,6 +587,32 @@ def test_save_and_load_ensemble(dask_client, tmp_path, add_frames, obj_nocols, u loaded_ens.from_ensemble(os.path.join(save_path, "ensemble"), additional_frames=3) +def test_save_overwrite(parquet_ensemble, tmp_path): + """Test that successive saves produce the correct behavior""" + # Setup a temporary directory for files + save_path = tmp_path / "." + + ens = parquet_ensemble + + # Add a few result frames + ens.batch(np.mean, "psFlux", label="mean") + ens.batch(np.max, "psFlux", label="max") + + # Write first with all frames + ens.save_ensemble(save_path, dirname="ensemble", additional_frames=True) + + # Inspect the save directory + dircontents = os.listdir(os.path.join(save_path, "ensemble")) + assert "max" in dircontents # "max" should have been added + + # Then write again with "max" not included + ens.save_ensemble(save_path, dirname="ensemble", additional_frames=["mean"]) + + # Inspect the save directory + dircontents = os.listdir(os.path.join(save_path, "ensemble")) + assert "max" not in dircontents # "max" should have been removed + + def test_insert(parquet_ensemble): num_partitions = parquet_ensemble.source.npartitions (old_object, old_source) = parquet_ensemble.compute() From 4297e7764b02038eb3e2a132a11fe1b14fe687e4 Mon Sep 17 00:00:00 2001 From: Doug Branton Date: Thu, 11 Jan 2024 13:24:35 -0800 Subject: [PATCH 14/15] include frames in error message --- src/tape/ensemble.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py index d7a8581d..2deb5186 100644 --- a/src/tape/ensemble.py +++ b/src/tape/ensemble.py @@ -1304,13 +1304,14 @@ def save_ensemble(self, path=".", dirname="ensemble", additional_frames=True, ** elif additional_frames is False: frames_to_save = ["object", "source"] # save just object and source elif isinstance(additional_frames, Iterable): - frames_to_save = [frame for frame in additional_frames if frame in list(self.frames.keys())] - + frames_to_save = set(additional_frames) + invalid_frames = frames_to_save.difference(set(self.frames.keys())) # Raise an error if any frames were not found in the frame list - if len(frames_to_save) != len(additional_frames): + if len(invalid_frames) != 0: raise ValueError( - "One or more frames specified in `additional_frames` was not found in the frame list." + f"The frame(s): {invalid_frames} specified in `additional_frames` were not found in the frame list." 
                )
+            frames_to_save = list(frames_to_save)
 
         # Make sure object and source are in the frame list
         if "object" not in frames_to_save:

From fcf67da6cc0a267ecc1b549aa58c84230528c530 Mon Sep 17 00:00:00 2001
From: Doug Branton
Date: Thu, 11 Jan 2024 14:58:56 -0800
Subject: [PATCH 15/15] address review comments

---
 src/tape/ensemble.py              | 53 +++++++++++++++++++------------
 src/tape/ensemble_readers.py      | 17 ---------
 tests/tape_tests/conftest.py      | 57 ++++++++++++++++++++++++++++---
 tests/tape_tests/test_ensemble.py |  4 ++-
 4 files changed, 84 insertions(+), 47 deletions(-)

diff --git a/src/tape/ensemble.py b/src/tape/ensemble.py
index 2deb5186..90bf1a7c 100644
--- a/src/tape/ensemble.py
+++ b/src/tape/ensemble.py
@@ -34,6 +34,8 @@
 
 DEFAULT_FRAME_LABEL = "result"  # A base default label for an Ensemble's result frames.
 
+METADATA_FILENAME = "ensemble_metadata.json"
+
 
 class Ensemble:
     """Ensemble object is a collection of light curve ids"""
@@ -1286,9 +1288,9 @@ def save_ensemble(self, path=".", dirname="ensemble", additional_frames=True, **
         # Determine the path
         ens_path = os.path.join(path, dirname)
 
-        # First look for an existing metadata.json file in the path
+        # First look for an existing metadata file in the path
         try:
-            with open(os.path.join(ens_path, "metadata.json"), "r") as oldfile:
+            with open(os.path.join(ens_path, METADATA_FILENAME), "r") as oldfile:
                 # Reading from json file
                 old_metadata = json.load(oldfile)
                 old_subdirs = old_metadata["subdirs"]
@@ -1302,7 +1304,7 @@ def save_ensemble(self, path=".", dirname="ensemble", additional_frames=True, **
         if additional_frames is True:
             frames_to_save = list(self.frames.keys())  # save all frames
         elif additional_frames is False:
-            frames_to_save = ["object", "source"]  # save just object and source
+            frames_to_save = [OBJECT_FRAME_LABEL, SOURCE_FRAME_LABEL]  # save just object and source
         elif isinstance(additional_frames, Iterable):
             frames_to_save = set(additional_frames)
             invalid_frames = frames_to_save.difference(set(self.frames.keys()))
@@ -1314,14 +1316,14 @@ def save_ensemble(self, path=".", dirname="ensemble", additional_frames=True, **
             frames_to_save = list(frames_to_save)
 
         # Make sure object and source are in the frame list
-        if "object" not in frames_to_save:
-            frames_to_save.append("object")
-        if "source" not in frames_to_save:
-            frames_to_save.append("source")
+        if OBJECT_FRAME_LABEL not in frames_to_save:
+            frames_to_save.append(OBJECT_FRAME_LABEL)
+        if SOURCE_FRAME_LABEL not in frames_to_save:
+            frames_to_save.append(SOURCE_FRAME_LABEL)
         else:
             raise ValueError("Invalid input to `additional_frames`, must be boolean or list-like")
 
-        # Save the frame list to disk
+        # Generate the metadata first
         created_subdirs = []  # track the list of created subdirectories
         divisions_known = []  # log whether divisions were known for each frame
         for frame_label in frames_to_save:
@@ -1331,17 +1333,14 @@ def save_ensemble(self, path=".", dirname="ensemble", additional_frames=True, **
             # When the frame has no columns, avoid the save as parquet doesn't handle it
             # Most commonly this applies to the object table when it's built from source
             if len(frame.columns) == 0:
-                print(f"Frame: {frame_label} was not saved as no columns were present.")
+                print(f"Frame: {frame_label} will not be saved as no columns are present.")
                 continue
 
-            # creates a subdirectory for the frame partition files
-            frame.to_parquet(os.path.join(ens_path, frame_label), write_metadata_file=True, **kwargs)
             created_subdirs.append(frame_label)
             divisions_known.append(frame.known_divisions)
 
         # Save a metadata file
         col_map = self.make_column_map()  # grab the current column_mapper
-
         metadata = {
             "subdirs": created_subdirs,
             "known_divisions": divisions_known,
@@ -1349,10 +1348,14 @@ def save_ensemble(self, path=".", dirname="ensemble", additional_frames=True, **
         }
         json_metadata = json.dumps(metadata, indent=4)
 
-        with open(os.path.join(ens_path, "metadata.json"), "w") as outfile:
+        # Make the directory if it doesn't already exist
+        os.makedirs(ens_path, exist_ok=True)
+        with open(os.path.join(ens_path, METADATA_FILENAME), "w") as outfile:
             outfile.write(json_metadata)
 
-        # np.save(os.path.join(ens_path, "column_mapper.npy"), col_map.map)
+        # Now write out the frames to subdirectories
+        for subdir in created_subdirs:
+            self.frames[subdir].to_parquet(os.path.join(ens_path, subdir), write_metadata_file=True, **kwargs)
 
         print(f"Saved to {os.path.join(path, dirname)}")
 
@@ -1390,8 +1393,8 @@ def from_ensemble(
             The ensemble object.
         """
 
-        # Read in the metadata.json file
-        with open(os.path.join(dirpath, "metadata.json"), "r") as metadatafile:
+        # Read in the metadata file
+        with open(os.path.join(dirpath, METADATA_FILENAME), "r") as metadatafile:
             # Reading from json file
             metadata = json.load(metadatafile)
@@ -1405,16 +1408,16 @@ def from_ensemble(
 
         # Load Object and Source
        # Check for whether or not object is present, it's not saved when no columns are present
-        if "object" in subdirs:
+        if OBJECT_FRAME_LABEL in subdirs:
             # divisions should be known for both tables to use the sorted kwarg
             use_sorted = (
-                frame_known_divisions[subdirs.index("object")]
-                and frame_known_divisions[subdirs.index("source")]
+                frame_known_divisions[subdirs.index(OBJECT_FRAME_LABEL)]
+                and frame_known_divisions[subdirs.index(SOURCE_FRAME_LABEL)]
             )
 
             self.from_parquet(
-                os.path.join(dirpath, "source"),
-                os.path.join(dirpath, "object"),
+                os.path.join(dirpath, SOURCE_FRAME_LABEL),
+                os.path.join(dirpath, OBJECT_FRAME_LABEL),
                 column_mapper=column_mapper,
                 sorted=use_sorted,
                 sort=False,
@@ -1422,9 +1425,9 @@ def from_ensemble(
                 **kwargs,
             )
         else:
-            use_sorted = frame_known_divisions[subdirs.index("source")]
+            use_sorted = frame_known_divisions[subdirs.index(SOURCE_FRAME_LABEL)]
             self.from_parquet(
-                os.path.join(dirpath, "source"),
+                os.path.join(dirpath, SOURCE_FRAME_LABEL),
                 column_mapper=column_mapper,
                 sorted=use_sorted,
                 sort=False,
@@ -1446,7 +1449,9 @@ def from_ensemble(
 
         # Filter out object and source from additional frames
         frames_to_load = [
-            frame for frame in frames_to_load if os.path.split(frame)[1] not in ["object", "source"]
+            frame
+            for frame in frames_to_load
+            if os.path.split(frame)[1] not in [OBJECT_FRAME_LABEL, SOURCE_FRAME_LABEL]
         ]
         if len(frames_to_load) > 0:
             for frame in frames_to_load:
diff --git a/src/tape/ensemble_readers.py b/src/tape/ensemble_readers.py
index 55b93e26..91f9add2 100644
--- a/src/tape/ensemble_readers.py
+++ b/src/tape/ensemble_readers.py
@@ -15,8 +15,6 @@ def read_ensemble(
     additional_frames=True,
     column_mapper=None,
     dask_client=True,
-    additional_cols=True,
-    partition_size=None,
     **kwargs,
 ):
     """Load an ensemble from an on-disk ensemble.
@@ -37,19 +35,6 @@ def read_ensemble(
         Supplies a ColumnMapper to the Ensemble, if None (default) searches
         for a column_mapper.npy file in the directory, which should be
         created when the ensemble is saved.
-    additional_cols: 'bool', optional
-        Boolean to indicate whether to carry in columns beyond the
-        critical columns, true will, while false will only load the columns
-        containing the critical quantities (id,time,flux,err,band)
-    partition_size: `int`, optional
-        If specified, attempts to repartition the ensemble to partitions
-        of size `partition_size`.
-    sorted: bool, optional
-        If the index column is already sorted in increasing order.
-        Defaults to False
-    sort: `bool`, optional
-        If True, sorts the DataFrame by the id column. Otherwise set the
-        index on the individual existing partitions. Defaults to False.
     dask_client: `dask.distributed.client` or `bool`, optional
         Accepts an existing `dask.distributed.Client`, or creates one if
         `client=True`, passing any additional kwargs to a
@@ -68,8 +53,6 @@ def read_ensemble(
         dirpath,
         additional_frames=additional_frames,
         column_mapper=column_mapper,
-        additional_cols=additional_cols,
-        partition_size=partition_size,
         **kwargs,
     )
 
diff --git a/tests/tape_tests/conftest.py b/tests/tape_tests/conftest.py
index c0af84c3..ec6b4521 100644
--- a/tests/tape_tests/conftest.py
+++ b/tests/tape_tests/conftest.py
@@ -233,6 +233,7 @@ def parquet_ensemble_without_client():
 
     return ens
 
+
 @pytest.fixture
 def parquet_files_and_ensemble_without_client():
     """Create an Ensemble from parquet data without a dask client."""
@@ -246,12 +247,10 @@ def parquet_files_and_ensemble_without_client():
         err_col="psFluxErr",
         band_col="filterName",
     )
-    ens = ens.from_parquet(
-        source_file,
-        object_file,
-        column_mapper=colmap)
+    ens = ens.from_parquet(source_file, object_file, column_mapper=colmap)
     return ens, source_file, object_file, colmap
 
+
 # pylint: disable=redefined-outer-name
 @pytest.fixture
 def parquet_ensemble(dask_client):
@@ -270,6 +269,25 @@ def parquet_ensemble(dask_client):
 
     return ens
 
+
+# pylint: disable=redefined-outer-name
+@pytest.fixture
+def parquet_ensemble_partition_size(dask_client):
+    """Create an Ensemble from parquet data."""
+    ens = Ensemble(client=dask_client)
+    ens.from_parquet(
+        "tests/tape_tests/data/source/test_source.parquet",
+        "tests/tape_tests/data/object/test_object.parquet",
+        id_col="ps1_objid",
+        time_col="midPointTai",
+        band_col="filterName",
+        flux_col="psFlux",
+        err_col="psFluxErr",
+        partition_size="1MB",
+    )
+
+    return ens
+
+
 # pylint: disable=redefined-outer-name
 @pytest.fixture
 def parquet_ensemble_with_divisions(dask_client):
@@ -386,6 +404,34 @@ def dask_dataframe_ensemble(dask_client):
 
     return ens
 
+
+# pylint: disable=redefined-outer-name
+@pytest.fixture
+def dask_dataframe_ensemble_partition_size(dask_client):
+    """Create an Ensemble from parquet data."""
+    ens = Ensemble(client=dask_client)
+
+    num_points = 1000
+    all_bands = np.array(["r", "g", "b", "i"])
+    rows = {
+        "id": 8000 + (np.arange(num_points) % 5),
+        "time": np.arange(num_points),
+        "flux": np.arange(num_points) % len(all_bands),
+        "band": np.repeat(all_bands, num_points / len(all_bands)),
+        "err": 0.1 * (np.arange(num_points) % 10),
+        "count": np.arange(num_points),
+        "something_else": np.full(num_points, None),
+    }
+    cmap = ColumnMapper(id_col="id", time_col="time", flux_col="flux", err_col="err", band_col="band")
+
+    ens.from_dask_dataframe(
+        source_frame=dd.from_dict(rows, npartitions=1),
+        column_mapper=cmap,
+        partition_size="1MB",
+    )
+
+    return ens
+
+
 # pylint: disable=redefined-outer-name
 @pytest.fixture
 def dask_dataframe_with_object_ensemble(dask_client):
@@ -490,6 +536,7 @@ def pandas_with_object_ensemble(dask_client):
 
     return ens
 
+
 # pylint: disable=redefined-outer-name
 @pytest.fixture
 def ensemble_from_source_dict(dask_client):
@@ -511,4 +558,4 @@ def ensemble_from_source_dict(dask_client):
     cmap = ColumnMapper(id_col="id", time_col="time", flux_col="flux", err_col="error", band_col="band")
     ens.from_source_dict(source_dict, column_mapper=cmap)
 
-    return ens, source_dict
\ No newline at end of file
+    return ens, source_dict
diff --git a/tests/tape_tests/test_ensemble.py b/tests/tape_tests/test_ensemble.py
index 7fc52c2e..c2a8945c 100644
--- a/tests/tape_tests/test_ensemble.py
+++ b/tests/tape_tests/test_ensemble.py
@@ -50,6 +50,7 @@ def test_with_client():
         "parquet_ensemble_from_hipscat",
         "parquet_ensemble_with_column_mapper",
         "parquet_ensemble_with_known_column_mapper",
+        "parquet_ensemble_partition_size",
         "read_parquet_ensemble",
         "read_parquet_ensemble_without_client",
         "read_parquet_ensemble_from_source",
@@ -102,6 +103,7 @@ def test_parquet_construction(data_fixture, request):
     "data_fixture",
     [
         "dask_dataframe_ensemble",
+        "dask_dataframe_ensemble_partition_size",
        "dask_dataframe_with_object_ensemble",
         "pandas_ensemble",
         "pandas_with_object_ensemble",
@@ -533,7 +535,7 @@ def test_save_and_load_ensemble(dask_client, tmp_path, add_frames, obj_nocols, u
 
     dircontents = os.listdir(os.path.join(save_path, "ensemble"))
     assert "source" in dircontents  # Source should always be there
-    assert "metadata.json" in dircontents  # should make a metadata file
+    assert "ensemble_metadata.json" in dircontents  # should make a metadata file
     if obj_nocols:  # object shouldn't if it was empty
         assert "object" not in dircontents
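
With the final patch applied, all of the on-disk bookkeeping lives in a single
ensemble_metadata.json file written next to the per-frame parquet subdirectories.
For a hypothetical ensemble saved with one extra result frame labeled "mean", and
assuming ColumnMapper.map serializes the same column names used by the fixtures
above, the file would look roughly like:

    {
        "subdirs": ["object", "source", "mean"],
        "known_divisions": [true, true, false],
        "column_mapper": {
            "id_col": "ps1_objid",
            "time_col": "midPointTai",
            "flux_col": "psFlux",
            "err_col": "psFluxErr",
            "band_col": "filterName"
        }
    }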
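
A minimal round-trip sketch of the save/load loop these patches implement,
mirroring what test_save_and_load_ensemble and test_save_overwrite exercise.
It assumes `from tape import Ensemble, ColumnMapper` works at the package level
and that an existing dask.distributed.Client named `client` is available; the
paths and the "mean" label are illustrative, not taken from the patches:

    import numpy as np
    from tape import Ensemble, ColumnMapper

    ens = Ensemble(client=client)
    ens.from_parquet(
        "tests/tape_tests/data/source/test_source.parquet",
        "tests/tape_tests/data/object/test_object.parquet",
        column_mapper=ColumnMapper(
            id_col="ps1_objid",
            time_col="midPointTai",
            flux_col="psFlux",
            err_col="psFluxErr",
            band_col="filterName",
        ),
    )
    ens.batch(np.mean, "psFlux", label="mean")  # adds a result frame beyond object/source

    # Writes object/, source/, mean/, and ensemble_metadata.json under ./ensemble
    ens.save_ensemble(".", dirname="ensemble", additional_frames=True)

    # Rebuild from disk; the ColumnMapper is recovered from the metadata file,
    # so only the directory and the extra frames to load need to be specified
    new_ens = Ensemble(client=client).from_ensemble("./ensemble", additional_frames=["mean"])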