Commit

remove extraneous files
clairelin135 committed May 26, 2022
1 parent 0f6223a commit afd6dc3
Showing 4 changed files with 134 additions and 132 deletions.
2 changes: 1 addition & 1 deletion docs/sphinx/sections/api/apidocs/assets.rst
@@ -25,5 +25,5 @@ A software-defined asset combines:

.. autoclass:: SourceAsset

-.. autoconfigurable:: fs_asset_io_manager
+.. autoconfigurable:: fs_io_manager
:annotation: IOManagerDefinition
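
For reference, the renamed fs_io_manager is configured with a base_dir and wired in as the "io_manager" resource, exactly as the tests below do. A minimal sketch under those assumptions (the "/tmp/storage" path and the my_asset name are illustrative, not from the commit):

# Minimal sketch: configure fs_io_manager for an asset group.
from dagster import fs_io_manager
from dagster.core.asset_defs import AssetGroup, asset


@asset
def my_asset():  # hypothetical asset, for illustration only
    return [1, 2, 3]


group = AssetGroup(
    [my_asset],
    resource_defs={"io_manager": fs_io_manager.configured({"base_dir": "/tmp/storage"})},
)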
9 changes: 2 additions & 7 deletions python_modules/dagster/dagster/core/storage/fs_io_manager.py
@@ -107,17 +107,12 @@ def __init__(self, base_dir=None):

def _get_path(self, context: Union[InputContext, OutputContext]) -> str:
"""Automatically construct filepath."""
-<<<<<<< HEAD
if context.has_asset_key:
-    path = context.get_asset_output_identifier()
+    path = context.get_asset_identifier()
else:
-    path = context.get_output_identifier()
+    path = context.get_identifier()

return os.path.join(self.base_dir, *path)
-=======
-identifier = context.get_identifier()
-return os.path.join(self.base_dir, *identifier)
->>>>>>> d072b4102d60f2a65a0e6709e55e8226a384ef0f

def has_output(self, context):
filepath = self._get_path(context)
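
The resolved _get_path joins base_dir with the context's identifier: the asset key path for assets (plus the partition key when the asset is partitioned), or the step output identifier otherwise. A small sketch of the resulting layout, consistent with the file path assertions in the tests below (the base_dir value is illustrative):

import os

base_dir = "/tmp/storage"  # illustrative

# Unpartitioned asset with key ["one", "two", "three", "asset1"]:
print(os.path.join(base_dir, *["one", "two", "three", "asset1"]))
# -> /tmp/storage/one/two/three/asset1

# Partitioned asset: the partition key is appended to the identifier:
print(os.path.join(base_dir, *["one", "two", "three", "asset1", "2020-05-03"]))
# -> /tmp/storage/one/two/three/asset1/2020-05-03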

This file was deleted.

@@ -4,7 +4,21 @@

import pytest

-from dagster import MetadataValue, ModeDefinition, execute_pipeline, graph, op, pipeline, solid
+from dagster import (
+    AssetKey,
+    DailyPartitionsDefinition,
+    MetadataValue,
+    ModeDefinition,
+    Out,
+    Output,
+    StaticPartitionsDefinition,
+    execute_pipeline,
+    graph,
+    op,
+    pipeline,
+    solid,
+)
+from dagster.core.asset_defs import AssetGroup, AssetIn, asset, build_assets_job, multi_asset
from dagster.core.definitions.version_strategy import VersionStrategy
from dagster.core.errors import DagsterInvariantViolationError
from dagster.core.execution.api import create_execution_plan
@@ -178,3 +192,119 @@ def recursion_limit_graph():
match=r"Object .* exceeds recursion limit and is not picklable. .*",
):
recursion_job.execute_in_process(instance=instance)


def get_assets_job(io_manager_def, partitions_def=None):
asset1_namespace = ["one", "two", "three"]

@asset(namespace=["one", "two", "three"], partitions_def=partitions_def)
def asset1():
return [1, 2, 3]

@asset(
namespace=["four", "five"],
ins={"asset1": AssetIn(namespace=asset1_namespace)},
partitions_def=partitions_def,
)
def asset2(asset1):
return asset1 + [4]

return build_assets_job(
name="a", assets=[asset1, asset2], resource_defs={"io_manager": io_manager_def}
)


def test_fs_io_manager():
with tempfile.TemporaryDirectory() as tmpdir_path:
io_manager_def = fs_io_manager.configured({"base_dir": tmpdir_path})
job_def = get_assets_job(io_manager_def)

result = job_def.execute_in_process()
assert result.success

handled_output_events = list(
filter(lambda evt: evt.is_handled_output, result.all_node_events)
)
assert len(handled_output_events) == 2

filepath_a = os.path.join(tmpdir_path, "one", "two", "three", "asset1")
assert os.path.isfile(filepath_a)
with open(filepath_a, "rb") as read_obj:
assert pickle.load(read_obj) == [1, 2, 3]

loaded_input_events = list(filter(lambda evt: evt.is_loaded_input, result.all_node_events))
assert len(loaded_input_events) == 1
assert loaded_input_events[0].event_specific_data.upstream_step_key.endswith("asset1")

filepath_b = os.path.join(tmpdir_path, "four", "five", "asset2")
assert os.path.isfile(filepath_b)
with open(filepath_b, "rb") as read_obj:
assert pickle.load(read_obj) == [1, 2, 3, 4]


def test_fs_io_manager_partitioned():
with tempfile.TemporaryDirectory() as tmpdir_path:
io_manager_def = fs_io_manager.configured({"base_dir": tmpdir_path})
job_def = get_assets_job(
io_manager_def, partitions_def=DailyPartitionsDefinition(start_date="2020-02-01")
)

result = job_def.execute_in_process(partition_key="2020-05-03")
assert result.success

handled_output_events = list(
filter(lambda evt: evt.is_handled_output, result.all_node_events)
)
assert len(handled_output_events) == 2

filepath_a = os.path.join(tmpdir_path, "one", "two", "three", "asset1", "2020-05-03")
assert os.path.isfile(filepath_a)
with open(filepath_a, "rb") as read_obj:
assert pickle.load(read_obj) == [1, 2, 3]

loaded_input_events = list(filter(lambda evt: evt.is_loaded_input, result.all_node_events))
assert len(loaded_input_events) == 1
assert loaded_input_events[0].event_specific_data.upstream_step_key.endswith("asset1")

filepath_b = os.path.join(tmpdir_path, "four", "five", "asset2", "2020-05-03")
assert os.path.isfile(filepath_b)
with open(filepath_b, "rb") as read_obj:
assert pickle.load(read_obj) == [1, 2, 3, 4]


def test_fs_io_manager_partitioned_multi_asset():
with tempfile.TemporaryDirectory() as tmpdir_path:
io_manager_def = fs_io_manager.configured({"base_dir": tmpdir_path})

partitions = StaticPartitionsDefinition(["A"])

@multi_asset(
partitions_def=partitions,
outs={
"out_1": Out(asset_key=AssetKey("upstream_asset_1")),
"out_2": Out(asset_key=AssetKey("upstream_asset_2")),
},
)
def upstream_asset():
return (Output(1, output_name="out_1"), Output(2, output_name="out_2"))

@asset(
partitions_def=partitions,
)
def downstream_asset(upstream_asset_1: int) -> int:
del upstream_asset_1
return 2

group = AssetGroup(
[upstream_asset, downstream_asset], resource_defs={"io_manager": io_manager_def}
)

job = group.build_job(name="TheJob")

result = job.execute_in_process(partition_key="A")
assert result.success

handled_output_events = list(
filter(lambda evt: evt.is_handled_output, result.all_node_events)
)
assert len(handled_output_events) == 3
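
The three handled outputs are the multi-asset's two outputs plus the downstream asset's one. The test stops at counting events, so the on-disk paths below are an assumption extrapolated from _get_path above (asset key, then partition key), mirroring the pickle checks in the earlier tests:

import os
import pickle

tmpdir_path = "/tmp/storage"  # illustrative stand-in for the TemporaryDirectory above

# Assumed layout: <base_dir>/<asset key>/<partition key>
filepath = os.path.join(tmpdir_path, "upstream_asset_1", "A")
with open(filepath, "rb") as read_obj:
    assert pickle.load(read_obj) == 1  # value of out_1 from upstream_asset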
