Skip to content

Commit

Permalink
add new statuses for backfill completed success and failure
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Sep 13, 2024
1 parent c3cdb63 commit b768ccf
Show file tree
Hide file tree
Showing 12 changed files with 66 additions and 28 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ class GrapheneBulkActionStatus(graphene.Enum):
FAILED = "FAILED"
CANCELED = "CANCELED"
CANCELING = "CANCELING"
COMPLETED_SUCCESS = "COMPLETED_SUCCESS"
COMPLETED_FAILED = "COMPLETED_FAILED"

class Meta:
name = "BulkActionStatus"
Expand All @@ -130,6 +132,10 @@ def to_dagster_run_status(self) -> GrapheneRunStatus:
return GrapheneRunStatus.STARTED # pyright: ignore[reportReturnType]
if self.args[0] == GrapheneBulkActionStatus.COMPLETED.value: # pyright: ignore[reportAttributeAccessIssue]
return GrapheneRunStatus.SUCCESS # pyright: ignore[reportReturnType]
if self.args[0] == GrapheneBulkActionStatus.COMPLETED_SUCCESS.value: # pyright: ignore[reportAttributeAccessIssue]
return GrapheneRunStatus.SUCCESS # pyright: ignore[reportReturnType]
if self.args[0] == GrapheneBulkActionStatus.COMPLETED_FAILED.value: # pyright: ignore[reportAttributeAccessIssue]
return GrapheneRunStatus.FAILURE # pyright: ignore[reportReturnType]
if self.args[0] == GrapheneBulkActionStatus.FAILED.value: # pyright: ignore[reportAttributeAccessIssue]
return GrapheneRunStatus.FAILURE # pyright: ignore[reportReturnType]
if self.args[0] == GrapheneBulkActionStatus.CANCELED.value: # pyright: ignore[reportAttributeAccessIssue]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,14 +747,16 @@ def test_backfill_run_stats(self, graphql_context):
backfill = graphql_context.instance.get_backfill(backfill_id)

# Artificially mark the backfill as complete - verify run status is INCOMPLETE until the runs all succeed
graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED))
graphql_context.instance.update_backfill(
backfill.with_status(BulkActionStatus.COMPLETED_SUCCESS)
)

result = execute_dagster_graphql(
graphql_context,
PARTITION_PROGRESS_QUERY,
variables={"backfillId": backfill_id},
)
assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED"
assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED_SUCCESS"

def test_asset_job_backfill_run_stats(self, graphql_context):
repository_selector = infer_repository_selector(graphql_context)
Expand Down Expand Up @@ -1004,7 +1006,9 @@ def test_backfill_run_completed(self, graphql_context):

backfill = graphql_context.instance.get_backfill(backfill_id)

graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED))
graphql_context.instance.update_backfill(
backfill.with_status(BulkActionStatus.COMPLETED_SUCCESS)
)

_seed_runs(
graphql_context,
Expand All @@ -1026,7 +1030,7 @@ def test_backfill_run_completed(self, graphql_context):
assert not result.errors
assert result.data
assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill"
assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED"
assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED_SUCCESS"
assert result.data["partitionBackfillOrError"]["numPartitions"] == 4

run_stats = _get_run_stats(
Expand Down Expand Up @@ -1060,7 +1064,9 @@ def test_backfill_run_incomplete(self, graphql_context):

backfill = graphql_context.instance.get_backfill(backfill_id)

graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED))
graphql_context.instance.update_backfill(
backfill.with_status(BulkActionStatus.COMPLETED_FAILED)
)

_seed_runs(
graphql_context,
Expand All @@ -1082,7 +1088,7 @@ def test_backfill_run_incomplete(self, graphql_context):
assert not result.errors
assert result.data
assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill"
assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED"
assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED_FAILED"
assert result.data["partitionBackfillOrError"]["numPartitions"] == 4
assert len(result.data["partitionBackfillOrError"]["cancelableRuns"]) == 1
run_stats = _get_run_stats(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def _create_backfill(graphql_context) -> str:
backfill = PartitionBackfill(
backfill_id=make_new_backfill_id(),
serialized_asset_backfill_data="foo", # the content of the backfill doesn't matter for testing fetching mega runs
status=BulkActionStatus.COMPLETED,
status=BulkActionStatus.COMPLETED_SUCCESS,
reexecution_steps=None,
tags=None,
backfill_timestamp=get_current_timestamp(),
Expand Down
1 change: 1 addition & 0 deletions python_modules/dagster/dagster/_cli/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,7 @@ def _execute_backfill_command_at_location(
if dagster_run:
instance.submit_run(dagster_run.run_id, workspace)

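# TODO - decide which status the CLI should record here: this path still uses the deprecated COMPLETED status rather than COMPLETED_SUCCESS or COMPLETED_FAILED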
instance.add_backfill(backfill_job.with_status(BulkActionStatus.COMPLETED))

print_fn(f"Launched backfill job `{backfill_id}`")
Expand Down
12 changes: 9 additions & 3 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -1032,9 +1032,15 @@ def execute_asset_backfill_iteration(
# failure, or cancellation). Since the AssetBackfillData object stores materialization states
# per asset partition, the daemon continues to update the backfill data until all runs have
# finished in order to display the final partition statuses in the UI.
updated_backfill: PartitionBackfill = updated_backfill.with_status(
BulkActionStatus.COMPLETED
)
if (
updated_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets
> 0
):
updated_backfill = updated_backfill.with_status(BulkActionStatus.COMPLETED_FAILED)
else:
updated_backfill: PartitionBackfill = updated_backfill.with_status(
BulkActionStatus.COMPLETED_SUCCESS
)
instance.update_backfill(updated_backfill)

new_materialized_partitions = (
Expand Down
6 changes: 4 additions & 2 deletions python_modules/dagster/dagster/_core/execution/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
@whitelist_for_serdes
class BulkActionStatus(Enum):
REQUESTED = "REQUESTED"
COMPLETED = "COMPLETED"
FAILED = "FAILED"
COMPLETED = "COMPLETED" # deprecated. Use COMPLETED_SUCCESS or COMPLETED_FAILED instead
FAILED = "FAILED" # denotes when there is a daemon failure, or some other issue processing the backfill
CANCELING = "CANCELING"
CANCELED = "CANCELED"
COMPLETED_SUCCESS = "COMPLETED_SUCCESS"
COMPLETED_FAILED = "COMPLETED_FAILED" # denotes that the backfill daemon completed successfully, but some runs failed

@staticmethod
def from_graphql_input(graphql_str):
Expand Down
15 changes: 14 additions & 1 deletion python_modules/dagster/dagster/_core/execution/job_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,20 @@ def execute_job_backfill_iteration(
f"Backfill completed for {backfill.backfill_id} for"
f" {len(partition_names)} partitions"
)
instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED))
if (
len(
instance.get_run_records(
filters=RunsFilter(
tags=DagsterRun.tags_for_backfill_id(backfill.backfill_id),
statuses=[DagsterRunStatus.FAILURE, DagsterRunStatus.CANCELED],
)
)
)
> 0
):
instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED_FAILED))
else:
instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED_SUCCESS))
yield None


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ def test_job_backfill_status(
assert instance.get_runs_count() == 3
backfill = instance.get_backfill("simple")
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS


@pytest.mark.skipif(IS_WINDOWS, reason="flaky in windows")
Expand Down Expand Up @@ -1250,7 +1250,7 @@ def test_pure_asset_backfill(
list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill("backfill_with_asset_selection")
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS


def test_backfill_from_failure_for_subselection(
Expand Down Expand Up @@ -2044,7 +2044,7 @@ def test_asset_job_backfill_single_run_multiple_iterations(
assert instance.get_runs_count() == 1
backfill = instance.get_backfill("simple")
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS


def test_asset_job_backfill_multi_run(
Expand Down Expand Up @@ -2656,7 +2656,7 @@ def override_backfill_storage_setting(self):
list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill("backfill_with_asset_selection")
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS

# set num_lines high so we know we get all of the remaining logs
os.environ["DAGSTER_CAPTURED_LOG_CHUNK_SIZE"] = "100"
Expand Down Expand Up @@ -2726,7 +2726,7 @@ def test_asset_backfill_from_asset_graph_subset(
list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill("backfill_from_asset_graph_subset")
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS


def test_asset_backfill_from_asset_graph_subset_with_static_and_time_partitions(
Expand Down Expand Up @@ -2792,4 +2792,4 @@ def test_asset_backfill_from_asset_graph_subset_with_static_and_time_partitions(
"backfill_from_asset_graph_subset_with_static_and_time_partitions"
)
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def test_simple(instance: DagsterInstance, external_repo: ExternalRepository):
launch_process.join(timeout=60)
backfill = instance.get_backfill("simple")
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS


@pytest.mark.skipif(
Expand Down Expand Up @@ -115,7 +115,7 @@ def test_before_submit(

backfill = instance.get_backfill("simple")
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS
assert instance.get_runs_count() == 3


Expand Down Expand Up @@ -162,5 +162,5 @@ def test_crash_after_submit(

backfill = instance.get_backfill("simple")
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS
assert instance.get_runs_count() == 3
Original file line number Diff line number Diff line change
Expand Up @@ -1361,7 +1361,7 @@ def test_backfill(self, storage: RunStorage):
backfill = storage.get_backfill(one.backfill_id)
assert backfill == one

storage.update_backfill(one.with_status(status=BulkActionStatus.COMPLETED))
storage.update_backfill(one.with_status(status=BulkActionStatus.COMPLETED_SUCCESS))
assert len(storage.get_backfills()) == 1
assert len(storage.get_backfills(status=BulkActionStatus.REQUESTED)) == 0

Expand Down Expand Up @@ -1391,7 +1391,7 @@ def test_backfill_status_filtering(self, storage: RunStorage):
assert (
len(
storage.get_backfills(
filters=BulkActionsFilter(statuses=[BulkActionStatus.COMPLETED])
filters=BulkActionsFilter(statuses=[BulkActionStatus.COMPLETED_SUCCESS])
)
)
== 0
Expand All @@ -1400,7 +1400,7 @@ def test_backfill_status_filtering(self, storage: RunStorage):
len(
storage.get_backfills(
filters=BulkActionsFilter(
statuses=[BulkActionStatus.COMPLETED, BulkActionStatus.REQUESTED]
statuses=[BulkActionStatus.COMPLETED_SUCCESS, BulkActionStatus.REQUESTED]
)
)
)
Expand All @@ -1411,7 +1411,7 @@ def test_backfill_status_filtering(self, storage: RunStorage):
)
assert backfills[0] == one

storage.update_backfill(one.with_status(status=BulkActionStatus.COMPLETED))
storage.update_backfill(one.with_status(status=BulkActionStatus.COMPLETED_SUCCESS))
assert (
len(
storage.get_backfills(
Expand All @@ -1423,7 +1423,7 @@ def test_backfill_status_filtering(self, storage: RunStorage):
assert (
len(
storage.get_backfills(
filters=BulkActionsFilter(statuses=[BulkActionStatus.COMPLETED])
filters=BulkActionsFilter(statuses=[BulkActionStatus.COMPLETED_SUCCESS])
)
)
== 1
Expand All @@ -1443,7 +1443,7 @@ def test_backfill_status_filtering(self, storage: RunStorage):
len(
storage.get_backfills(
filters=BulkActionsFilter(
statuses=[BulkActionStatus.COMPLETED, BulkActionStatus.REQUESTED]
statuses=[BulkActionStatus.COMPLETED_SUCCESS, BulkActionStatus.REQUESTED]
)
)
)
Expand Down

0 comments on commit b768ccf

Please sign in to comment.