Skip to content

Commit

Permalink
add BulkActionStatuses COMPLETED_SUCCESS and COMPLETED_FAILURE (#24365)
Browse files Browse the repository at this point in the history
## Summary & Motivation
Adds two new BulkAction statuses `COMPLETED_SUCCESS` and
`COMPLETED_FAILURE` to expand on the existing `COMPLETED` status. This
allows us to store the more fine grained information of if the backfill
completed with some run failures or if the backfill completed with all
runs successful.

## How I Tested These Changes

## Changelog

Insert changelog entry or "NOCHANGELOG" here.

- [X] `NEW` Backfill status will now distinguish between a completed
backfill with all runs successful, and completed backfill with some run
failures. Previously, these were displayed as the same `Completed`
status. This change only applies to new backfills. Backfills that have
already completed will still show a `Completed` status.
  • Loading branch information
jamiedemaria authored Sep 16, 2024
1 parent 8f33967 commit 13dbe4f
Show file tree
Hide file tree
Showing 14 changed files with 74 additions and 28 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ const labelForBackfillStatus = (key: BulkActionStatus) => {
return 'Failed';
case BulkActionStatus.REQUESTED:
return 'In progress';
case BulkActionStatus.COMPLETED_SUCCESS:
return 'Success';
case BulkActionStatus.COMPLETED_FAILED:
return 'Failed';
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ export const BackfillStatusTagForPage = ({backfill}: {backfill: BackfillState})
return errorState('Failed');
case BulkActionStatus.COMPLETED:
return <Tag intent="success">Completed</Tag>;
case BulkActionStatus.COMPLETED_SUCCESS:
return <Tag intent="success">Succeeded</Tag>;
case BulkActionStatus.COMPLETED_FAILED:
return errorState('Failed');
default:
return <Tag>{status}</Tag>;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ class GrapheneBulkActionStatus(graphene.Enum):
FAILED = "FAILED"
CANCELED = "CANCELED"
CANCELING = "CANCELING"
COMPLETED_SUCCESS = "COMPLETED_SUCCESS"
COMPLETED_FAILED = "COMPLETED_FAILED"

class Meta:
name = "BulkActionStatus"
Expand All @@ -130,6 +132,10 @@ def to_dagster_run_status(self) -> GrapheneRunStatus:
return GrapheneRunStatus.STARTED # pyright: ignore[reportReturnType]
if self.args[0] == GrapheneBulkActionStatus.COMPLETED.value: # pyright: ignore[reportAttributeAccessIssue]
return GrapheneRunStatus.SUCCESS # pyright: ignore[reportReturnType]
if self.args[0] == GrapheneBulkActionStatus.COMPLETED_SUCCESS.value: # pyright: ignore[reportAttributeAccessIssue]
return GrapheneRunStatus.SUCCESS # pyright: ignore[reportReturnType]
if self.args[0] == GrapheneBulkActionStatus.COMPLETED_FAILED.value: # pyright: ignore[reportAttributeAccessIssue]
return GrapheneRunStatus.FAILURE # pyright: ignore[reportReturnType]
if self.args[0] == GrapheneBulkActionStatus.FAILED.value: # pyright: ignore[reportAttributeAccessIssue]
return GrapheneRunStatus.FAILURE # pyright: ignore[reportReturnType]
if self.args[0] == GrapheneBulkActionStatus.CANCELED.value: # pyright: ignore[reportAttributeAccessIssue]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,14 +747,16 @@ def test_backfill_run_stats(self, graphql_context):
backfill = graphql_context.instance.get_backfill(backfill_id)

# Artificially mark the backfill as complete - verify run status is INCOMPLETE until the runs all succeed
graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED))
graphql_context.instance.update_backfill(
backfill.with_status(BulkActionStatus.COMPLETED_SUCCESS)
)

result = execute_dagster_graphql(
graphql_context,
PARTITION_PROGRESS_QUERY,
variables={"backfillId": backfill_id},
)
assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED"
assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED_SUCCESS"

def test_asset_job_backfill_run_stats(self, graphql_context):
repository_selector = infer_repository_selector(graphql_context)
Expand Down Expand Up @@ -1004,7 +1006,9 @@ def test_backfill_run_completed(self, graphql_context):

backfill = graphql_context.instance.get_backfill(backfill_id)

graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED))
graphql_context.instance.update_backfill(
backfill.with_status(BulkActionStatus.COMPLETED_SUCCESS)
)

_seed_runs(
graphql_context,
Expand All @@ -1026,7 +1030,7 @@ def test_backfill_run_completed(self, graphql_context):
assert not result.errors
assert result.data
assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill"
assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED"
assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED_SUCCESS"
assert result.data["partitionBackfillOrError"]["numPartitions"] == 4

run_stats = _get_run_stats(
Expand Down Expand Up @@ -1060,7 +1064,9 @@ def test_backfill_run_incomplete(self, graphql_context):

backfill = graphql_context.instance.get_backfill(backfill_id)

graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED))
graphql_context.instance.update_backfill(
backfill.with_status(BulkActionStatus.COMPLETED_FAILED)
)

_seed_runs(
graphql_context,
Expand All @@ -1082,7 +1088,7 @@ def test_backfill_run_incomplete(self, graphql_context):
assert not result.errors
assert result.data
assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill"
assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED"
assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED_FAILED"
assert result.data["partitionBackfillOrError"]["numPartitions"] == 4
assert len(result.data["partitionBackfillOrError"]["cancelableRuns"]) == 1
run_stats = _get_run_stats(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def _create_backfill(graphql_context) -> str:
backfill = PartitionBackfill(
backfill_id=make_new_backfill_id(),
serialized_asset_backfill_data="foo", # the content of the backfill doesn't matter for testing fetching mega runs
status=BulkActionStatus.COMPLETED,
status=BulkActionStatus.COMPLETED_SUCCESS,
reexecution_steps=None,
tags=None,
backfill_timestamp=get_current_timestamp(),
Expand Down
1 change: 1 addition & 0 deletions python_modules/dagster/dagster/_cli/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,7 @@ def _execute_backfill_command_at_location(
if dagster_run:
instance.submit_run(dagster_run.run_id, workspace)

# TODO - figure out what to do here
instance.add_backfill(backfill_job.with_status(BulkActionStatus.COMPLETED))

print_fn(f"Launched backfill job `{backfill_id}`")
Expand Down
12 changes: 9 additions & 3 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -1032,9 +1032,15 @@ def execute_asset_backfill_iteration(
# failure, or cancellation). Since the AssetBackfillData object stores materialization states
# per asset partition, the daemon continues to update the backfill data until all runs have
# finished in order to display the final partition statuses in the UI.
updated_backfill: PartitionBackfill = updated_backfill.with_status(
BulkActionStatus.COMPLETED
)
if (
updated_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets
> 0
):
updated_backfill = updated_backfill.with_status(BulkActionStatus.COMPLETED_FAILED)
else:
updated_backfill: PartitionBackfill = updated_backfill.with_status(
BulkActionStatus.COMPLETED_SUCCESS
)
instance.update_backfill(updated_backfill)

new_materialized_partitions = (
Expand Down
6 changes: 4 additions & 2 deletions python_modules/dagster/dagster/_core/execution/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
@whitelist_for_serdes
class BulkActionStatus(Enum):
REQUESTED = "REQUESTED"
COMPLETED = "COMPLETED"
FAILED = "FAILED"
COMPLETED = "COMPLETED" # deprecated. Use COMPLETED_SUCCESS or COMPLETED_FAILED instead
FAILED = "FAILED" # denotes when there is a daemon failure, or some other issue processing the backfill
CANCELING = "CANCELING"
CANCELED = "CANCELED"
COMPLETED_SUCCESS = "COMPLETED_SUCCESS"
COMPLETED_FAILED = "COMPLETED_FAILED" # denotes that the backfill daemon completed successfully, but some runs failed

@staticmethod
def from_graphql_input(graphql_str):
Expand Down
15 changes: 14 additions & 1 deletion python_modules/dagster/dagster/_core/execution/job_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,20 @@ def execute_job_backfill_iteration(
f"Backfill completed for {backfill.backfill_id} for"
f" {len(partition_names)} partitions"
)
instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED))
if (
len(
instance.get_run_ids(
filters=RunsFilter(
tags=DagsterRun.tags_for_backfill_id(backfill.backfill_id),
statuses=[DagsterRunStatus.FAILURE, DagsterRunStatus.CANCELED],
)
)
)
> 0
):
instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED_FAILED))
else:
instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED_SUCCESS))
yield None


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ def test_job_backfill_status(
assert instance.get_runs_count() == 3
backfill = instance.get_backfill("simple")
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS


@pytest.mark.skipif(IS_WINDOWS, reason="flaky in windows")
Expand Down Expand Up @@ -1250,7 +1250,7 @@ def test_pure_asset_backfill(
list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill("backfill_with_asset_selection")
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS


def test_backfill_from_failure_for_subselection(
Expand Down Expand Up @@ -2044,7 +2044,7 @@ def test_asset_job_backfill_single_run_multiple_iterations(
assert instance.get_runs_count() == 1
backfill = instance.get_backfill("simple")
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS


def test_asset_job_backfill_multi_run(
Expand Down Expand Up @@ -2656,7 +2656,7 @@ def override_backfill_storage_setting(self):
list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill("backfill_with_asset_selection")
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS

# set num_lines high so we know we get all of the remaining logs
os.environ["DAGSTER_CAPTURED_LOG_CHUNK_SIZE"] = "100"
Expand Down Expand Up @@ -2726,7 +2726,7 @@ def test_asset_backfill_from_asset_graph_subset(
list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill("backfill_from_asset_graph_subset")
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS


def test_asset_backfill_from_asset_graph_subset_with_static_and_time_partitions(
Expand Down Expand Up @@ -2792,4 +2792,4 @@ def test_asset_backfill_from_asset_graph_subset_with_static_and_time_partitions(
"backfill_from_asset_graph_subset_with_static_and_time_partitions"
)
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def test_simple(instance: DagsterInstance, external_repo: ExternalRepository):
launch_process.join(timeout=60)
backfill = instance.get_backfill("simple")
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS


@pytest.mark.skipif(
Expand Down Expand Up @@ -115,7 +115,7 @@ def test_before_submit(

backfill = instance.get_backfill("simple")
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS
assert instance.get_runs_count() == 3


Expand Down Expand Up @@ -162,5 +162,5 @@ def test_crash_after_submit(

backfill = instance.get_backfill("simple")
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS
assert instance.get_runs_count() == 3
Original file line number Diff line number Diff line change
Expand Up @@ -1361,7 +1361,7 @@ def test_backfill(self, storage: RunStorage):
backfill = storage.get_backfill(one.backfill_id)
assert backfill == one

storage.update_backfill(one.with_status(status=BulkActionStatus.COMPLETED))
storage.update_backfill(one.with_status(status=BulkActionStatus.COMPLETED_SUCCESS))
assert len(storage.get_backfills()) == 1
assert len(storage.get_backfills(status=BulkActionStatus.REQUESTED)) == 0

Expand Down Expand Up @@ -1391,7 +1391,7 @@ def test_backfill_status_filtering(self, storage: RunStorage):
assert (
len(
storage.get_backfills(
filters=BulkActionsFilter(statuses=[BulkActionStatus.COMPLETED])
filters=BulkActionsFilter(statuses=[BulkActionStatus.COMPLETED_SUCCESS])
)
)
== 0
Expand All @@ -1400,7 +1400,7 @@ def test_backfill_status_filtering(self, storage: RunStorage):
len(
storage.get_backfills(
filters=BulkActionsFilter(
statuses=[BulkActionStatus.COMPLETED, BulkActionStatus.REQUESTED]
statuses=[BulkActionStatus.COMPLETED_SUCCESS, BulkActionStatus.REQUESTED]
)
)
)
Expand All @@ -1411,7 +1411,7 @@ def test_backfill_status_filtering(self, storage: RunStorage):
)
assert backfills[0] == one

storage.update_backfill(one.with_status(status=BulkActionStatus.COMPLETED))
storage.update_backfill(one.with_status(status=BulkActionStatus.COMPLETED_SUCCESS))
assert (
len(
storage.get_backfills(
Expand All @@ -1423,7 +1423,7 @@ def test_backfill_status_filtering(self, storage: RunStorage):
assert (
len(
storage.get_backfills(
filters=BulkActionsFilter(statuses=[BulkActionStatus.COMPLETED])
filters=BulkActionsFilter(statuses=[BulkActionStatus.COMPLETED_SUCCESS])
)
)
== 1
Expand All @@ -1443,7 +1443,7 @@ def test_backfill_status_filtering(self, storage: RunStorage):
len(
storage.get_backfills(
filters=BulkActionsFilter(
statuses=[BulkActionStatus.COMPLETED, BulkActionStatus.REQUESTED]
statuses=[BulkActionStatus.COMPLETED_SUCCESS, BulkActionStatus.REQUESTED]
)
)
)
Expand Down

1 comment on commit 13dbe4f

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagit-core-storybook ready!

✅ Preview
https://dagit-core-storybook-7ayn7k69g-elementl.vercel.app

Built with commit 13dbe4f.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.