Handle op outputs in default asset IO manager #8074
if context.has_asset_key:
    path = context.get_asset_output_identifier()
else:
    path = [context.run_id, context.step_key]
nit: should this be context.get_output_identifier() for parity with PickledObject...
maybe even return super()._get_path(context), but I'm not picky on that
Thanks! didn't realize that function existed
one comment, otherwise 🚢
If I understand correctly, this makes the behavior of fs_asset_io_manager identical to the behavior of fs_io_manager, when the output is not an asset. Thoughts on just merging them together? I.e. getting rid of fs_asset_io_manager and adding logic inside fs_io_manager? A small advantage is that we'd now have a name that's not as long. Also, we should merge this after this external contribution lands so that we don't force them to rebase: #8007.
@@ -131,7 +131,7 @@ def asset2(asset1):

     return graph.to_job(
         resource_defs=merge_dicts(
-            {"io_manager": fs_asset_io_manager}, all_resource_defs, {"root_manager": root_manager}
+            {"io_manager": fs_io_manager}, all_resource_defs, {"root_manager": root_manager}
I believe the to_job() function will by default supply fs_io_manager as the default io_manager, so it's possible that we can remove this bit. (not 100% sure)
removed ✅
1887771 to afd6dc3
@sryza I've adjusted this PR to merge functionality of
Nice! Love to see PRs that remove more lines than they add. We'll need to make sure to message this as a breaking change in our CHANGES.md. Also, as followups, we'll need this for the gcs, s3, and azure IO managers as well.
Addresses #7713
This PR adjusts the default fs_asset_io_manager to handle op outputs by storing each op output in a file at directory/run_id/step_key, so that every step gets its own unique directory.
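For readers skimming the thread, here is a minimal sketch of the path selection described above. It does not import Dagster: `FakeOutputContext`, `asset_key_path`, and `get_path` are hypothetical stand-ins made up for illustration, and the assumption that an asset's path is built from its key components is mine, not something this PR confirms.

```python
import os
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeOutputContext:
    """Hypothetical stand-in for the real output context (not Dagster's API)."""

    run_id: str
    step_key: str
    # Assumption: an asset is identified by a list of key components.
    asset_key_path: Optional[List[str]] = None

    @property
    def has_asset_key(self) -> bool:
        return self.asset_key_path is not None


def get_path(base_dir: str, context: FakeOutputContext) -> str:
    """Pick a storage path: asset outputs by asset key, op outputs by run and step."""
    if context.has_asset_key:
        # Asset outputs are stored under their key, independent of the run.
        parts = list(context.asset_key_path)
    else:
        # Op outputs get a directory that is unique per run and per step.
        parts = [context.run_id, context.step_key]
    return os.path.join(base_dir, *parts)


op_path = get_path("storage", FakeOutputContext(run_id="run_1", step_key="my_op"))
asset_path = get_path(
    "storage",
    FakeOutputContext(run_id="run_1", step_key="asset1", asset_key_path=["asset1"]),
)
```

In this sketch, two runs of the same op write to different directories, while an asset always resolves to the same location regardless of the run.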