Save to parquet #343

Merged: 15 commits merged into main from save_to_parquet on Jan 11, 2024
Conversation

@dougbrn (Collaborator) commented on Jan 8, 2024

Change Description

Resolves #151.

  • My PR includes a link to the issue that I am addressing

Solution Description

Code Quality

  • My code builds (or compiles) cleanly without any errors or warnings
  • My code contains relevant comments and necessary documentation

Project-Specific Pull Request Checklists

  • I have added a function that requires a sync_tables command, and have added the necessary sync_tables call

Bug Fix Checklist

  • My fix includes a new test that breaks as a result of the bug (if possible)
  • My change includes a breaking change
    • My change includes backwards compatibility and deprecation warnings (if possible)

New Feature Checklist

  • I have added or updated the docstrings associated with my feature using the NumPy docstring format
  • I have updated the tutorial to highlight my new feature (if appropriate)
  • I have added unit/End-to-End (E2E) test cases to cover my new feature
  • My change includes a breaking change
    • My change includes backwards compatibility and deprecation warnings (if possible)

Documentation Change Checklist

Build/CI Change Checklist

  • If required or optional dependencies have changed (including version numbers), I have updated the README to reflect this
  • If this is a new CI setup, I have added the associated badge to the README

Other Change Checklist

  • Any new or updated docstrings use the NumPy docstring format.
  • I have updated the tutorial to highlight my new feature (if appropriate)
  • I have added unit/End-to-End (E2E) test cases to cover any changes
  • My change includes a breaking change
    • My change includes backwards compatibility and deprecation warnings (if possible)

github-actions bot commented on Jan 8, 2024

Before [1644284] <v0.3.1> | After [3cf44d1] | Ratio | Benchmark (Parameter)
38.7±0.5ms                | 39.0±0.2ms      | 1.01  | benchmarks.time_batch
41.3±0.2ms                | 41.5±0.2ms      | 1.00  | benchmarks.time_prune_sync_workflow


codecov bot commented on Jan 8, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Comparison is base (1644284) 94.66% compared to head (fcf67da) 94.99%.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #343      +/-   ##
==========================================
+ Coverage   94.66%   94.99%   +0.32%     
==========================================
  Files          24       24              
  Lines        1557     1638      +81     
==========================================
+ Hits         1474     1556      +82     
+ Misses         83       82       -1     



@dougbrn marked this pull request as ready for review on January 10, 2024 19:16
@dougbrn requested a review from wilsonbb on January 10, 2024 19:27
elif additional_frames is False:
frames_to_save = ["object", "source"] # save just object and source
elif isinstance(additional_frames, Iterable):
frames_to_save = [frame for frame in additional_frames if frame in list(self.frames.keys())]
Collaborator:

Nit: we can use sets here and give the user a more helpful error message

frames_to_save = set(additional_frames)
invalid_frames = frames_to_save.difference(set(self.frames.keys()))

# Raise an error and tell the user which frames were invalid
if len(invalid_frames) != 0:
    raise ValueError(f"Invalid frames specified: {invalid_frames}")

frames_to_save.update(["object", "source"])

Collaborator (Author):

Implemented in latest commit

know how to work with the directory whether or not the object
subdirectory is present.

Be careful about repeated saves to the same directory name. This will
Collaborator:

Given how liberally Jupyter users may hit "run all cells", I think this could lead to some easy mistakes.

What about having an "overwrite" parameter?

If True, we will first try to delete the save directory (possibly printing a message if we did so)

If False, we can throw an exception if the save directory already exists noting they should use a different name or set "overwrite=True"

My instinct is that the default value of the parameter should be False.
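
A minimal sketch of that check, assuming a hypothetical helper name (the real signature and behavior would be up to the implementation):

import os
import shutil

def _prepare_save_dir(path, dirname, overwrite=False):
    """Resolve the save directory, honoring the proposed overwrite flag (hypothetical helper)."""
    ens_path = os.path.join(path, dirname)
    if os.path.exists(ens_path):
        if not overwrite:
            raise FileExistsError(
                f"{ens_path} already exists; use a different dirname or set overwrite=True"
            )
        # Delete the previous save directory and note that we did so
        shutil.rmtree(ens_path)
        print(f"Removed existing save directory: {ens_path}")
    return ens_path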

Collaborator (Author):

Yeah, so we already have access to overwrite via the **kwargs passed along to all the to_parquet calls. The catch is that it will only overwrite subdirectories (ensemble/object, ensemble/source, ensemble/result, etc.) that are present in both saves, so any subdirectories from a previous save are left behind if the new save doesn't include them. I'm wary of doing any blanket delete, since someone could point the save at a non-empty directory and I wouldn't want to remove anything unrelated.

So, 1: maybe the overwrite parameter should graduate out of the **kwargs for visibility? And 2: I'm not sure, but maybe save could write some kind of subdirs metadata file that lets TAPE know which subdirectories it created, so that overwrite can clean those up in subsequent overwrite-enabled saves?

Collaborator (Author):

Wilson and I talked offline a bit on this. The strategy that we agreed on is to generate a metadata file that knows which subdirectories were produced by the save command. Successive save commands will look for this file and use it to locate which subdirectories in the directory should be removed during the save operation. In particular, we'd be looking for any subdirectories created by a previous save command that will not be overwritten by the current save command. Parquet will handle overwrites for conflicting directories, but we will need ensemble.save_ensemble to catch and remove these frame subdirectories that would otherwise not be touched.

Collaborator (Author):

The latest commit implements this, with the tweak that I ended up opting to just clean all the previous subdirectories up first as it was logically easier.
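
A rough sketch of that clean-first flow, assuming the metadata.json layout described above (hypothetical helper; the real logic lives in ensemble.save_ensemble):

import json
import os
import shutil

def _clean_previous_save(ens_path):
    """Remove only the subdirectories a previous save_ensemble recorded in its metadata."""
    metadata_path = os.path.join(ens_path, "metadata.json")
    if os.path.exists(metadata_path):
        with open(metadata_path, "r") as oldfile:
            old_metadata = json.load(oldfile)
        # Restrict deletion to directories the previous save created
        for subdir in old_metadata.get("subdirs", []):
            shutil.rmtree(os.path.join(ens_path, subdir))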


# Check for whether or not object is present, it's not saved when no columns are present
if "object" in os.listdir(dirpath):
self.from_parquet(src_path, obj_path, column_mapper=column_mapper, **kwargs)
Collaborator:

Should we always use sync_tables=False since we synced object and source before saving?

Also as a general question that may be out of scope for this PR, are we preserving divisions here? If so I guess it would be helpful to use sorted=True if we knew that information?

Collaborator (Author):

I believe we can always have sync_tables=False here, good point!

For divisions, these are not being preserved at the moment, which definitely seems like an issue. We can save an ensemble without divisions being known, so the sorted parameter will need to depend on that information. Similar to the above, maybe a metadata file should be generated to indicate whether divisions are known, so it can be read back in?

@dougbrn (Collaborator, Author) commented Jan 10, 2024:

I just pushed another commit to better expose the sort/sorted kwargs from ensemble.from_parquet so they are usable in these cases. Some kind of metadata may be preferable here still so the user doesn't need to specify at all.

Edit: No longer accurate, refer to next comment

Collaborator (Author):

Another update, I now have this working with metadata. ensemble.save_ensemble now stores a boolean per frame in the metadata.json file indicating whether that frame had divisions set. Instead of using the sort/sorted kwargs, I've changed ensemble.from_ensemble to use this to determine whether to calculate divisions or not. In the case of object and source, this is just by setting the sorted flag or not. For the other frames, the sorted flag is only applicable to the set_index call that ensemble.from_parquet folds into. So this PR now has the frames generate a parquet metadata file which populates divisions information if it's available, and the reading of additional frames now uses the parquet metadata file.

I experimented with trying to get our ensemble.from_parquet to use these _metadata files, but it would be a non-trivial change in the logic that felt out of scope for this PR. It's potentially worth logging for the future as using the _metadata files would avoid having to calculate the min/max values per partition to get divisions, but on the other hand these files can get quite large for large datasets so they may have issues at scale.
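
A simplified sketch of that bookkeeping (the keys and helper names here are illustrative; the actual metadata.json schema may differ):

import json
import os

def write_divisions_metadata(ens_path, frames, subdirs):
    """At save time, record which frames had known Dask divisions (illustrative schema)."""
    metadata = {
        "subdirs": subdirs,
        # Dask dataframes expose known_divisions as a boolean
        "known_divisions": {label: frame.known_divisions for label, frame in frames.items()},
    }
    with open(os.path.join(ens_path, "metadata.json"), "w") as outfile:
        outfile.write(json.dumps(metadata, indent=4))

def divisions_known(ens_path, label):
    """At load time, decide whether sorted=True is safe for a given frame."""
    with open(os.path.join(ens_path, "metadata.json"), "r") as infile:
        return json.load(infile).get("known_divisions", {}).get(label, False)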

@dougbrn requested a review from wilsonbb on January 11, 2024 21:50
@wilsonbb (Collaborator) left a comment:

Have some questions and suggestions but overall looks good to me! Thanks Doug!

# Save a ColumnMapper file
col_map = self.make_column_map()
np.save(os.path.join(ens_path, "column_mapper.npy"), col_map.map)
# np.save(os.path.join(ens_path, "column_mapper.npy"), col_map.map)
Collaborator:

Looks like we left this line in :)

@@ -1284,19 +1286,32 @@ def save_ensemble(self, path=".", dirname="ensemble", additional_frames=True, **
# Determine the path
ens_path = os.path.join(path, dirname)

# First look for an existing metadata.json file in the path
try:
with open(os.path.join(ens_path, "metadata.json"), "r") as oldfile:
@wilsonbb (Collaborator) commented Jan 11, 2024:

Nit: we can probably move the filename "metadata.json" into a METADATA_FILENAME constant or equivalent

Also maybe a more descriptive name like "ensemble_metadata.json" might be less likely to clash with random files in some user's directory
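
For example (hypothetical constant name):

# Hypothetical module-level constant, per the suggestion above
METADATA_FILENAME = "ensemble_metadata.json"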


# Check for whether or not object is present, it's not saved when no columns are present
if "object" in os.listdir(dirpath):
self.from_parquet(src_path, obj_path, column_mapper=column_mapper, **kwargs)
if "object" in subdirs:
Collaborator:

Nit: forgot to reference this earlier, but we can use the SOURCE_FRAME_LABEL and OBJECT_FRAME_LABEL instead of "object" and "source" everywhere

old_subdirs = old_metadata["subdirs"]
# Delete any old subdirectories
for subdir in old_subdirs:
shutil.rmtree(os.path.join(ens_path, subdir))
Collaborator:

If we're not able to delete a subdirectory, do we want to explicitly handle the exception and provide additional logging?
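
For illustration, explicit handling might look like the sketch below, reusing ens_path and old_subdirs from the snippet above (as the reply that follows notes, the PR ultimately lets shutil's own error propagate):

import logging
import os
import shutil

for subdir in old_subdirs:
    target = os.path.join(ens_path, subdir)
    try:
        shutil.rmtree(target)
    except OSError as err:
        # Surface which stale subdirectory could not be removed before re-raising
        logging.error("Failed to remove previous save subdirectory %s: %s", target, err)
        raise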

Collaborator (Author):

I'm thinking that whatever error shutil would provide would probably be as descriptive as anything we'd provide? Also not sure how easy it would be to test this; setting permissions mid-test seems messy...

}
json_metadata = json.dumps(metadata, indent=4)

with open(os.path.join(ens_path, "metadata.json"), "w") as outfile:
Collaborator:

I wonder if we should write the new metadata file before doing any saving to parquet.

My thinking is that in the case where:

  • user wants to save a large ensemble
  • some dataframes have been saved
  • a crash occurs while saving a subsequent result dataframe
  • user runs save_ensemble later in the same folder but with differently named result dataframes

old subdirectories won't be cleaned up since no metadata file was written. Admittedly a bit of an edge case and also a minor pain since we would have to loop over the frames/subdirs twice

Collaborator (Author):

Moved the parquet saving to the very end of the function in latest commit

if npartitions and npartitions > 1:
source_frame = source_frame.repartition(npartitions=npartitions)
elif partition_size:
source_frame = source_frame.repartition(partition_size=partition_size)
Collaborator:

I think we can just add a partition_size pytest parameter and we can get rid of the code coverage warning here

Collaborator (Author):

Adding two conftest.py functions (one for from_parquet and one for from_dask_dataframes) as I think all the parameters for these loader functions live there
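
A sketch of the kind of parametrization being discussed (fixture, test, and parameter names here are hypothetical):

import pytest

# Hypothetical parametrization; the actual fixtures live in conftest.py
@pytest.mark.parametrize("partition_size", [None, "100MB"])
def test_loader_repartition(parquet_ensemble, tmp_path, partition_size):
    """Illustrative sketch: round-trip a save with an explicit partition_size."""
    parquet_ensemble.save_ensemble(tmp_path, dirname="ensemble")
    parquet_ensemble.from_ensemble(tmp_path / "ensemble", partition_size=partition_size)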

dirpath,
additional_frames=True,
column_mapper=None,
dask_client=True,
Collaborator:

Nit: move dask_client to reflect the parameter order in the docstring (or vice versa)

Comment on lines 47 to 52
sorted: bool, optional
If the index column is already sorted in increasing order.
Defaults to False
sort: `bool`, optional
If True, sorts the DataFrame by the id column. Otherwise set the
index on the individual existing partitions. Defaults to False.
Collaborator:

Do we want sort and sorted to be explicit parameters in the function definition? Or did we mean to remove these from the docstring?

Collaborator (Author):

Good catch, forgot to remove these from ensemble_readers.py as well!

@wilsonbb (Collaborator) left a comment:

LGTM! :)

@dougbrn merged commit 3c20f13 into main on Jan 11, 2024
13 checks passed
@dougbrn deleted the save_to_parquet branch on February 8, 2024 19:55
Linked issue: Save Ensemble to Parquet (#151)