Save to parquet #343

Merged
merged 15 commits on Jan 11, 2024
133 changes: 97 additions & 36 deletions src/tape/ensemble.py
@@ -1,5 +1,7 @@
import glob
import os
import json
import shutil
import warnings
import requests
import dask.dataframe as dd
@@ -1284,19 +1286,32 @@
# Determine the path
ens_path = os.path.join(path, dirname)

# First look for an existing metadata.json file in the path
try:
with open(os.path.join(ens_path, "metadata.json"), "r") as oldfile:
Review comment from @wilsonbb (Collaborator), Jan 11, 2024:

Nit: we could probably move the filename "metadata.json" into a METADATA_FILENAME constant or equivalent.

Also, a more descriptive name like "ensemble_metadata.json" might be less likely to clash with random files in a user's directory.
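
A minimal sketch of the constant suggested here, for illustration only; the constant name and helper function below are hypothetical, not code from this PR:

import json
import os

# Hypothetical module-level constant for the metadata filename
ENSEMBLE_METADATA_FILENAME = "ensemble_metadata.json"


def read_ensemble_metadata(ens_path):
    """Load the saved-ensemble metadata file, returning None if it does not exist."""
    try:
        with open(os.path.join(ens_path, ENSEMBLE_METADATA_FILENAME), "r") as oldfile:
            return json.load(oldfile)
    except FileNotFoundError:
        return None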

# Reading from json file
old_metadata = json.load(oldfile)
old_subdirs = old_metadata["subdirs"]
# Delete any old subdirectories
for subdir in old_subdirs:
shutil.rmtree(os.path.join(ens_path, subdir))
Review comment (Collaborator):

If we're not able to delete a subdirectory, do we want to explicitly handle the exception and provide additional logging?

Reply (Collaborator, Author):

I'm thinking that whatever error shutil raises would probably be as descriptive as anything we'd provide. I'm also not sure how easy this would be to test; setting permissions mid-test seems messy.
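
For reference, a hedged sketch of what the explicit handling discussed above could look like; this is not the approach adopted in the PR, and the function name is illustrative:

import logging
import os
import shutil

logger = logging.getLogger(__name__)


def remove_old_subdirs(ens_path, old_subdirs):
    """Remove previously saved subdirectories, logging which one failed before re-raising."""
    for subdir in old_subdirs:
        target = os.path.join(ens_path, subdir)
        try:
            shutil.rmtree(target)
        except OSError as err:
            logger.error("Failed to remove old ensemble subdirectory %s: %s", target, err)
            raise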

except FileNotFoundError:
pass

# Compile frame list
if additional_frames is True:
frames_to_save = list(self.frames.keys()) # save all frames
elif additional_frames is False:
frames_to_save = ["object", "source"] # save just object and source
elif isinstance(additional_frames, Iterable):
frames_to_save = [frame for frame in additional_frames if frame in list(self.frames.keys())]

frames_to_save = set(additional_frames)
invalid_frames = frames_to_save.difference(set(self.frames.keys()))
# Raise an error if any frames were not found in the frame list
if len(frames_to_save) != len(additional_frames):
if len(invalid_frames) != 0:
raise ValueError(
"One or more frames specified in `additional_frames` was not found in the frame list."
f"The frame(s): {invalid_frames} specified in `additional_frames` were not found in the frame list."
)
frames_to_save = list(frames_to_save)

# Make sure object and source are in the frame list
if "object" not in frames_to_save:
@@ -1307,29 +1322,49 @@
raise ValueError("Invalid input to `additional_frames`, must be boolean or list-like")

# Save the frame list to disk
created_subdirs = [] # track the list of created subdirectories
divisions_known = [] # log whether divisions were known for each frame
for frame_label in frames_to_save:
# grab the dataframe from the frame label
frame = self.frames[frame_label]

# Object can have no columns, which parquet doesn't handle
# In this case, we'll avoid saving to parquet
if frame_label == "object":
if len(frame.columns) == 0:
print("The Object Frame was not saved as no columns were present.")
continue
# When the frame has no columns, avoid the save as parquet doesn't handle it
# Most commonly this applies to the object table when it's built from source
if len(frame.columns) == 0:
print(f"Frame: {frame_label} was not saved as no columns were present.")
continue

# creates a subdirectory for the frame partition files
frame.to_parquet(os.path.join(ens_path, frame_label), **kwargs)
frame.to_parquet(os.path.join(ens_path, frame_label), write_metadata_file=True, **kwargs)
created_subdirs.append(frame_label)
divisions_known.append(frame.known_divisions)

# Save a metadata file
col_map = self.make_column_map() # grab the current column_mapper

metadata = {
"subdirs": created_subdirs,
"known_divisions": divisions_known,
"column_mapper": col_map.map,
}
json_metadata = json.dumps(metadata, indent=4)

with open(os.path.join(ens_path, "metadata.json"), "w") as outfile:
Review comment (Collaborator):

I wonder if we should write the new metadata file before doing any saving to parquet.

My thinking is that in the case where:

  • the user wants to save a large ensemble
  • some dataframes have been saved
  • a crash occurs while saving a subsequent result dataframe
  • the user runs save_ensemble later in the same folder but with differently named result dataframes

the old subdirectories won't be cleaned up, since no metadata file was written. Admittedly a bit of an edge case, and also a minor pain since we would have to loop over the frames/subdirs twice.

Reply (Collaborator, Author):

Moved the parquet saving to the very end of the function in the latest commit.
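
A hedged sketch of the reordered save flow described above (metadata written before any parquet output); the function and variable names are illustrative, not the PR's final code:

import json
import os


def save_ensemble_sketch(ens_path, frames, metadata_filename="metadata.json"):
    """Write the metadata file first, then save each frame to parquet last."""
    # Decide up front which frames will be written (empty frames are skipped)
    frames_to_save = [label for label, frame in frames.items() if len(frame.columns) > 0]

    # Writing metadata before the parquet output means a crash mid-save still
    # leaves a record of every subdirectory a later save should clean up
    metadata = {"subdirs": frames_to_save}
    with open(os.path.join(ens_path, metadata_filename), "w") as outfile:
        json.dump(metadata, outfile, indent=4)

    # Parquet writes happen at the very end
    for label in frames_to_save:
        frames[label].to_parquet(os.path.join(ens_path, label), write_metadata_file=True)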

outfile.write(json_metadata)

# Save a ColumnMapper file
col_map = self.make_column_map()
np.save(os.path.join(ens_path, "column_mapper.npy"), col_map.map)
# np.save(os.path.join(ens_path, "column_mapper.npy"), col_map.map)
Review comment (Collaborator):

Looks like we left this line in :)


print(f"Saved to {os.path.join(path, dirname)}")

return

def from_ensemble(self, dirpath, additional_frames=True, column_mapper=None, **kwargs):
def from_ensemble(
self,
dirpath,
additional_frames=True,
column_mapper=None,
**kwargs,
):
"""Load an ensemble from an on-disk ensemble.

Parameters
Expand All @@ -1355,33 +1390,55 @@
The ensemble object.
"""

# First grab the column_mapper if not specified
if column_mapper is None:
map_dict = np.load(os.path.join(dirpath, "column_mapper.npy"), allow_pickle="TRUE").item()
column_mapper = ColumnMapper()
column_mapper.map = map_dict
# Read in the metadata.json file
with open(os.path.join(dirpath, "metadata.json"), "r") as metadatafile:
# Reading from json file
metadata = json.load(metadatafile)

# Load in the metadata
subdirs = metadata["subdirs"]
frame_known_divisions = metadata["known_divisions"]
if column_mapper is None:
column_mapper = ColumnMapper()
column_mapper.map = metadata["column_mapper"]

# Load Object and Source
obj_path = os.path.join(dirpath, "object")
src_path = os.path.join(dirpath, "source")

# Check for whether or not object is present, it's not saved when no columns are present
if "object" in os.listdir(dirpath):
self.from_parquet(src_path, obj_path, column_mapper=column_mapper, **kwargs)
if "object" in subdirs:
Review comment (Collaborator):

Nit: I forgot to reference this earlier, but we can use SOURCE_FRAME_LABEL and OBJECT_FRAME_LABEL instead of "object" and "source" everywhere.

# divisions should be known for both tables to use the sorted kwarg
use_sorted = (
frame_known_divisions[subdirs.index("object")]
and frame_known_divisions[subdirs.index("source")]
)

self.from_parquet(
os.path.join(dirpath, "source"),
os.path.join(dirpath, "object"),
column_mapper=column_mapper,
sorted=use_sorted,
sort=False,
sync_tables=False, # a sync should always be performed just before saving
**kwargs,
)
else:
self.from_parquet(src_path, column_mapper=column_mapper, **kwargs)
use_sorted = frame_known_divisions[subdirs.index("source")]
self.from_parquet(
os.path.join(dirpath, "source"),
column_mapper=column_mapper,
sorted=use_sorted,
sort=False,
sync_tables=False, # a sync should always be performed just before saving
**kwargs,
)

# Load all remaining frames
if additional_frames is False:
return self # we are all done
else:
if additional_frames is True:
# Grab all subdirectory paths in the top-level folder, filter out any files
frames_to_load = [
os.path.join(dirpath, f)
for f in os.listdir(dirpath)
if not os.path.isfile(os.path.join(dirpath, f))
]
frames_to_load = [os.path.join(dirpath, f) for f in subdirs]
elif isinstance(additional_frames, Iterable):
frames_to_load = [os.path.join(dirpath, frame) for frame in additional_frames]
else:
@@ -1394,7 +1451,10 @@
if len(frames_to_load) > 0:
for frame in frames_to_load:
label = os.path.split(frame)[1]
ddf = EnsembleFrame.from_parquet(frame, label=label, **kwargs)
use_divisions = frame_known_divisions[subdirs.index(label)]
ddf = EnsembleFrame.from_parquet(
frame, label=label, calculate_divisions=use_divisions, **kwargs
)
self.add_frame(ddf, label)

return self
@@ -1499,6 +1559,12 @@
self._load_column_mapper(column_mapper, **kwargs)
source_frame = SourceFrame.from_dask_dataframe(source_frame, self)

# Repartition before any sorting
if npartitions and npartitions > 1:
source_frame = source_frame.repartition(npartitions=npartitions)
elif partition_size:
source_frame = source_frame.repartition(partition_size=partition_size)

Codecov / codecov/patch check warning on src/tape/ensemble.py#L1566: added line #L1566 was not covered by tests.
Review comment (Collaborator):

I think we can just add a partition_size pytest parameter, and then we can get rid of the code coverage warning here.

Reply (Collaborator, Author):

I'm adding two conftest.py functions (one for from_parquet and one for from_dask_dataframes), since I think all the parameters for these loader functions live there.
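
A hedged sketch of the kind of pytest parametrization discussed above; the fixture below and its wiring are illustrative, not the fixtures actually added in the PR:

import pytest


@pytest.fixture(params=[None, "100MB"])
def partition_size(request):
    """Supply a partition_size value to loader fixtures or tests that request it."""
    return request.param

A loader fixture or test that takes this fixture as an argument can then pass partition_size through to the Ensemble loader, exercising the repartition branch flagged by Codecov.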


# Set the index of the source frame and save the resulting table
self.update_frame(source_frame.set_index(self._id_col, drop=True, sorted=sorted, sort=sort))

Expand All @@ -1515,11 +1581,6 @@
self.object.set_dirty(True)
self._sync_tables()

if npartitions and npartitions > 1:
self.source = self.source.repartition(npartitions=npartitions)
elif partition_size:
self.source = self.source.repartition(partition_size=partition_size)

# Check that Divisions are established, warn if not.
for name, table in [("object", self.object), ("source", self.source)]:
if not table.known_divisions:
15 changes: 2 additions & 13 deletions src/tape/ensemble_frame.py
@@ -844,14 +844,7 @@ def convert_flux_to_mag(
return result

@classmethod
def from_parquet(
cl,
path,
index=None,
columns=None,
label=None,
ensemble=None,
):
def from_parquet(cl, path, index=None, columns=None, label=None, ensemble=None, **kwargs):
"""Returns an EnsembleFrame constructed from loading a parquet file.
Parameters
----------
@@ -879,11 +872,7 @@ def from_parquet(
# Read the parquet file with an engine that will assume the meta is a TapeFrame which Dask will
# instantiate as EnsembleFrame via its dispatcher.
result = dd.read_parquet(
path,
index=index,
columns=columns,
split_row_groups=True,
engine=TapeArrowEngine,
path, index=index, columns=columns, split_row_groups=True, engine=TapeArrowEngine, **kwargs
)
result.label = label
result.ensemble = ensemble
34 changes: 31 additions & 3 deletions src/tape/ensemble_readers.py
@@ -10,7 +10,15 @@
from tape.utils import ColumnMapper


def read_ensemble(dirpath, additional_frames=True, column_mapper=None, dask_client=True, **kwargs):
def read_ensemble(
dirpath,
additional_frames=True,
column_mapper=None,
dask_client=True,
Review comment (Collaborator):

Nit: move dask_client to reflect the parameter order in the docstring (or vice versa).

additional_cols=True,
partition_size=None,
**kwargs,
):
"""Load an ensemble from an on-disk ensemble.

Parameters
@@ -29,6 +37,19 @@ def read_ensemble(dirpath, additional_frames=True, column_mapper=None, dask_clie
Supplies a ColumnMapper to the Ensemble, if None (default) searches
for a column_mapper.npy file in the directory, which should be
created when the ensemble is saved.
additional_cols: `bool`, optional
Boolean to indicate whether to carry in columns beyond the
critical columns; True will, while False will only load the columns
containing the critical quantities (id, time, flux, err, band)
partition_size: `int`, optional
If specified, attempts to repartition the ensemble to partitions
of size `partition_size`.
sorted: bool, optional
If the index column is already sorted in increasing order.
Defaults to False
sort: `bool`, optional
If True, sorts the DataFrame by the id column. Otherwise set the
index on the individual existing partitions. Defaults to False.
Review comment (Collaborator):

Do we want sort and sorted to be explicit parameters in the function definition? Or did we mean to remove these from the docstring?

Reply (Collaborator, Author):

Good catch, forgot to remove these from ensemble_readers.py as well!

dask_client: `dask.distributed.client` or `bool`, optional
Accepts an existing `dask.distributed.Client`, or creates one if
`client=True`, passing any additional kwargs to a
@@ -41,9 +62,16 @@ def read_ensemble(
An ensemble object.
"""

new_ens = Ensemble(dask_client, **kwargs)
new_ens = Ensemble(dask_client)

new_ens.from_ensemble(dirpath, additional_frames=additional_frames, column_mapper=column_mapper, **kwargs)
new_ens.from_ensemble(
dirpath,
additional_frames=additional_frames,
column_mapper=column_mapper,
additional_cols=additional_cols,
partition_size=partition_size,
**kwargs,
)

return new_ens
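
A hedged usage sketch of the reader wrapper above; the directory path is a placeholder, and passing dask_client=False assumes the same client-handling convention as the Ensemble constructor:

from tape.ensemble_readers import read_ensemble

# Load a previously saved ensemble from disk, pulling in every saved frame
ens = read_ensemble(
    "/path/to/save_dir/ensemble",   # placeholder path to a saved ensemble directory
    additional_frames=True,         # or a list of frame labels to load selectively
    dask_client=False,
)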

29 changes: 28 additions & 1 deletion tests/tape_tests/test_ensemble.py
@@ -483,6 +483,7 @@ def test_read_source_dict(dask_client):
@pytest.mark.parametrize("obj_nocols", [True, False])
@pytest.mark.parametrize("use_reader", [False, True])
def test_save_and_load_ensemble(dask_client, tmp_path, add_frames, obj_nocols, use_reader):
"""Test the save and load ensemble loop"""
# Setup a temporary directory for files
save_path = tmp_path / "."

@@ -532,7 +533,7 @@ def test_save_and_load_ensemble(dask_client, tmp_path, add_frames, obj_nocols, u
dircontents = os.listdir(os.path.join(save_path, "ensemble"))

assert "source" in dircontents # Source should always be there
assert "column_mapper.npy" in dircontents # should make a column_mapper file
assert "metadata.json" in dircontents # should make a metadata file
if obj_nocols: # object shouldn't if it was empty
assert "object" not in dircontents
else: # otherwise it should be present
@@ -586,6 +587,32 @@ def test_save_and_load_ensemble(dask_client, tmp_path, add_frames, obj_nocols, u
loaded_ens.from_ensemble(os.path.join(save_path, "ensemble"), additional_frames=3)


def test_save_overwrite(parquet_ensemble, tmp_path):
"""Test that successive saves produce the correct behavior"""
# Setup a temporary directory for files
save_path = tmp_path / "."

ens = parquet_ensemble

# Add a few result frames
ens.batch(np.mean, "psFlux", label="mean")
ens.batch(np.max, "psFlux", label="max")

# Write first with all frames
ens.save_ensemble(save_path, dirname="ensemble", additional_frames=True)

# Inspect the save directory
dircontents = os.listdir(os.path.join(save_path, "ensemble"))
assert "max" in dircontents # "max" should have been added

# Then write again with "max" not included
ens.save_ensemble(save_path, dirname="ensemble", additional_frames=["mean"])

# Inspect the save directory
dircontents = os.listdir(os.path.join(save_path, "ensemble"))
assert "max" not in dircontents # "max" should have been removed


def test_insert(parquet_ensemble):
num_partitions = parquet_ensemble.source.npartitions
(old_object, old_source) = parquet_ensemble.compute()