Handle op outputs in default asset IO manager #8074

Merged May 31, 2022 (7 commits). Changes from 1 commit:
handle op outputs in default asset io manager
clairelin135 committed May 25, 2022
commit 032c347e15ccaa0cc721cd2a4dcd6dd3a70da8ab
@@ -81,4 +81,8 @@ def asset2(asset1):

 class AssetPickledObjectFilesystemIOManager(PickledObjectFilesystemIOManager):
     def _get_path(self, context):
-        return os.path.join(self.base_dir, *context.get_asset_output_identifier())
+        if context.has_asset_key:
+            path = context.get_asset_output_identifier()
+        else:
+            path = [context.run_id, context.step_key]
OwenKephart (Contributor), May 25, 2022:

nit: should this be context.get_output_identifier() for parity with PickledObject...

maybe even return super()._get_path(context), but I'm not picky on that

Contributor Author:

Thanks! didn't realize that function existed

+        return os.path.join(self.base_dir, *path)
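
For readers following the review thread, here is a minimal sketch of the variant the reviewer proposes, where non-asset outputs fall back to the parent's `_get_path`. It does not import Dagster: `StubOutputContext`, `StubPickledIOManager`, and `StubAssetPickledIOManager` are hypothetical stand-ins, and the shape of `get_output_identifier()` (run id, step key, output name) is an assumption rather than something shown in this diff.

```python
import os
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class StubOutputContext:
    """Hypothetical stand-in for the output context; not the real Dagster class."""

    run_id: str
    step_key: str
    name: str = "result"
    asset_path: Optional[List[str]] = None  # set only for asset outputs

    @property
    def has_asset_key(self) -> bool:
        return self.asset_path is not None

    def get_asset_output_identifier(self) -> List[str]:
        if self.asset_path is None:
            raise ValueError("output is not associated with an asset")
        return list(self.asset_path)

    def get_output_identifier(self) -> List[str]:
        # Assumed shape: run id, step key, output name.
        return [self.run_id, self.step_key, self.name]


class StubPickledIOManager:
    """Stand-in for the parent manager: paths are keyed by output identifier."""

    def __init__(self, base_dir: str):
        self.base_dir = base_dir

    def _get_path(self, context: StubOutputContext) -> str:
        return os.path.join(self.base_dir, *context.get_output_identifier())


class StubAssetPickledIOManager(StubPickledIOManager):
    def _get_path(self, context: StubOutputContext) -> str:
        if context.has_asset_key:
            # Asset outputs keep a stable, run-independent path.
            return os.path.join(self.base_dir, *context.get_asset_output_identifier())
        # Plain op outputs reuse the parent's per-run path.
        return super()._get_path(context)
```

Under these assumptions, the fallback path also includes the output name, whereas `[context.run_id, context.step_key]` would give two outputs of the same step the same path.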
@@ -16,7 +16,9 @@
     Output,
     ResourceDefinition,
     StaticPartitionsDefinition,
+    execute_pipeline,
     graph,
+    in_process_executor,
     io_manager,
     multi_asset,
     op,
@@ -1233,3 +1235,41 @@ def my_derived_asset(my_source_asset):

     result = source_asset_job.execute_in_process(asset_selection=[AssetKey("my_derived_asset")])
     assert result.success
+
+
+def test_op_outputs_with_default_asset_io_mgr():
+    @op
+    def return_stuff():
+        return 12
+
+    @op
+    def transform(data):
+        assert data == 12
+        return data * 2
+
+    @op
+    def one_more_transformation(transformed_data):
+        assert transformed_data == 24
+        return transformed_data + 1
+
+    @graph(
+        out={
+            "asset_1": GraphOut(),
+            "asset_2": GraphOut(),
+        },
+    )
+    def complicated_graph():
+        result = return_stuff()
+        return one_more_transformation(transform(result)), transform(result)
+
+    @asset
+    def my_asset(asset_1):
+        assert asset_1 == 25
+        return asset_1
+
+    my_job = AssetGroup(
+        [AssetsDefinition.from_graph(complicated_graph), my_asset],
+    ).build_job("my_job", executor_def=in_process_executor)
+
+    result = execute_pipeline(my_job)
+    assert result.success
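
The values asserted in this test can be traced with the Dagster decorators stripped away. The sketch below is plain Python, not the test itself; it assumes the tuple returned by the graph maps to `asset_1` and `asset_2` in declaration order, which the `asset_1 == 25` assertion in `my_asset` suggests.

```python
def return_stuff():
    return 12


def transform(data):
    return data * 2


def one_more_transformation(transformed_data):
    return transformed_data + 1


def complicated_graph():
    result = return_stuff()
    # First element feeds asset_1, second feeds asset_2 (assumed ordering).
    return one_more_transformation(transform(result)), transform(result)


asset_1, asset_2 = complicated_graph()
```

The intermediate values passed between `return_stuff`, `transform`, and `one_more_transformation` are op outputs with no asset key, which is the case the new `else` branch in `_get_path` handles.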