From 13dbe4f6f82749ba23a23b2808df21e0fc6b7975 Mon Sep 17 00:00:00 2001
From: jamiedemaria
Date: Mon, 16 Sep 2024 11:27:29 -0400
Subject: [PATCH] add BulkActionStatuses COMPLETED_SUCCESS and COMPLETED_FAILED (#24365)

## Summary & Motivation

Adds two new BulkAction statuses `COMPLETED_SUCCESS` and `COMPLETED_FAILED` to expand on the existing `COMPLETED` status. This allows us to store more fine-grained information about whether a backfill completed with all runs successful or with some run failures.

## How I Tested These Changes

## Changelog

- [X] `NEW` Backfill status will now distinguish between a completed backfill with all runs successful and a completed backfill with some run failures. Previously, these were displayed as the same `Completed` status. This change only applies to new backfills; backfills that have already completed will still show a `Completed` status.

---
 .../ui-core/src/graphql/schema.graphql | 2 ++
 .../packages/ui-core/src/graphql/types.ts | 2 ++
 .../ui-core/src/instance/InstanceBackfills.tsx | 4 ++++
 .../backfill/BackfillStatusTagForPage.tsx | 4 ++++
 .../dagster_graphql/schema/backfill.py | 6 ++++++
 .../graphql/test_partition_backfill.py | 18 ++++++++++++------
 .../graphql/test_runs_feed.py | 2 +-
 python_modules/dagster/dagster/_cli/job.py | 1 +
 .../dagster/_core/execution/asset_backfill.py | 12 +++++++++---
 .../dagster/_core/execution/backfill.py | 6 ++++--
 .../dagster/_core/execution/job_backfill.py | 15 ++++++++++++++-
 .../daemon_tests/test_backfill.py | 12 ++++++------
 .../test_backfill_failure_recovery.py | 6 +++---
 .../storage_tests/utils/run_storage.py | 12 ++++++------
 14 files changed, 74 insertions(+), 28 deletions(-)

diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql index 4932d6bcabe05..736f51d719248 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql @@ -2411,6 +2411,8 @@ enum BulkActionStatus { FAILED CANCELED CANCELING + COMPLETED_SUCCESS + COMPLETED_FAILED } type DaemonHealth { diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts index facb53218d158..1cf60ad46ba51 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts @@ -727,6 +727,8 @@ export enum BulkActionStatus { CANCELED = 'CANCELED', CANCELING = 'CANCELING', COMPLETED = 'COMPLETED', + COMPLETED_FAILED = 'COMPLETED_FAILED', + COMPLETED_SUCCESS = 'COMPLETED_SUCCESS', FAILED = 'FAILED', REQUESTED = 'REQUESTED', } diff --git a/js_modules/dagster-ui/packages/ui-core/src/instance/InstanceBackfills.tsx b/js_modules/dagster-ui/packages/ui-core/src/instance/InstanceBackfills.tsx index 22fd5bf46ab96..86fc577c972b2 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/instance/InstanceBackfills.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/instance/InstanceBackfills.tsx @@ -42,6 +42,10 @@ const labelForBackfillStatus = (key: BulkActionStatus) => { return 'Failed'; case BulkActionStatus.REQUESTED: return 'In progress'; + case BulkActionStatus.COMPLETED_SUCCESS: + return 'Success'; + case BulkActionStatus.COMPLETED_FAILED: + return 'Failed'; } }; diff --git a/js_modules/dagster-ui/packages/ui-core/src/instance/backfill/BackfillStatusTagForPage.tsx
b/js_modules/dagster-ui/packages/ui-core/src/instance/backfill/BackfillStatusTagForPage.tsx index 528311d86222c..09e087bee92ed 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/instance/backfill/BackfillStatusTagForPage.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/instance/backfill/BackfillStatusTagForPage.tsx @@ -39,6 +39,10 @@ export const BackfillStatusTagForPage = ({backfill}: {backfill: BackfillState}) return errorState('Failed'); case BulkActionStatus.COMPLETED: return Completed; + case BulkActionStatus.COMPLETED_SUCCESS: + return Succeeded; + case BulkActionStatus.COMPLETED_FAILED: + return errorState('Failed'); default: return {status}; } diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py index 96cfb2f5af7a3..cd07d59546e6f 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py @@ -118,6 +118,8 @@ class GrapheneBulkActionStatus(graphene.Enum): FAILED = "FAILED" CANCELED = "CANCELED" CANCELING = "CANCELING" + COMPLETED_SUCCESS = "COMPLETED_SUCCESS" + COMPLETED_FAILED = "COMPLETED_FAILED" class Meta: name = "BulkActionStatus" @@ -130,6 +132,10 @@ def to_dagster_run_status(self) -> GrapheneRunStatus: return GrapheneRunStatus.STARTED # pyright: ignore[reportReturnType] if self.args[0] == GrapheneBulkActionStatus.COMPLETED.value: # pyright: ignore[reportAttributeAccessIssue] return GrapheneRunStatus.SUCCESS # pyright: ignore[reportReturnType] + if self.args[0] == GrapheneBulkActionStatus.COMPLETED_SUCCESS.value: # pyright: ignore[reportAttributeAccessIssue] + return GrapheneRunStatus.SUCCESS # pyright: ignore[reportReturnType] + if self.args[0] == GrapheneBulkActionStatus.COMPLETED_FAILED.value: # pyright: ignore[reportAttributeAccessIssue] + return GrapheneRunStatus.FAILURE # pyright: ignore[reportReturnType] if self.args[0] == GrapheneBulkActionStatus.FAILED.value: # pyright: ignore[reportAttributeAccessIssue] return GrapheneRunStatus.FAILURE # pyright: ignore[reportReturnType] if self.args[0] == GrapheneBulkActionStatus.CANCELED.value: # pyright: ignore[reportAttributeAccessIssue] diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py index 1c61900563503..903d61c298f7a 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py @@ -747,14 +747,16 @@ def test_backfill_run_stats(self, graphql_context): backfill = graphql_context.instance.get_backfill(backfill_id) # Artificially mark the backfill as complete - verify run status is INCOMPLETE until the runs all succeed - graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED)) + graphql_context.instance.update_backfill( + backfill.with_status(BulkActionStatus.COMPLETED_SUCCESS) + ) result = execute_dagster_graphql( graphql_context, PARTITION_PROGRESS_QUERY, variables={"backfillId": backfill_id}, ) - assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED" + assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED_SUCCESS" def test_asset_job_backfill_run_stats(self, graphql_context): repository_selector = infer_repository_selector(graphql_context) @@ -1004,7 +1006,9 @@ def test_backfill_run_completed(self, 
graphql_context): backfill = graphql_context.instance.get_backfill(backfill_id) - graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED)) + graphql_context.instance.update_backfill( + backfill.with_status(BulkActionStatus.COMPLETED_SUCCESS) + ) _seed_runs( graphql_context, @@ -1026,7 +1030,7 @@ def test_backfill_run_completed(self, graphql_context): assert not result.errors assert result.data assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill" - assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED" + assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED_SUCCESS" assert result.data["partitionBackfillOrError"]["numPartitions"] == 4 run_stats = _get_run_stats( @@ -1060,7 +1064,9 @@ def test_backfill_run_incomplete(self, graphql_context): backfill = graphql_context.instance.get_backfill(backfill_id) - graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED)) + graphql_context.instance.update_backfill( + backfill.with_status(BulkActionStatus.COMPLETED_FAILED) + ) _seed_runs( graphql_context, @@ -1082,7 +1088,7 @@ def test_backfill_run_incomplete(self, graphql_context): assert not result.errors assert result.data assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill" - assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED" + assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED_FAILED" assert result.data["partitionBackfillOrError"]["numPartitions"] == 4 assert len(result.data["partitionBackfillOrError"]["cancelableRuns"]) == 1 run_stats = _get_run_stats( diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs_feed.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs_feed.py index a8e518a9ffcaf..427167fa065f9 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs_feed.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_runs_feed.py @@ -73,7 +73,7 @@ def _create_backfill(graphql_context) -> str: backfill = PartitionBackfill( backfill_id=make_new_backfill_id(), serialized_asset_backfill_data="foo", # the content of the backfill doesn't matter for testing fetching mega runs - status=BulkActionStatus.COMPLETED, + status=BulkActionStatus.COMPLETED_SUCCESS, reexecution_steps=None, tags=None, backfill_timestamp=get_current_timestamp(), diff --git a/python_modules/dagster/dagster/_cli/job.py b/python_modules/dagster/dagster/_cli/job.py index d8f5515db6eb3..884d9f880bfc0 100644 --- a/python_modules/dagster/dagster/_cli/job.py +++ b/python_modules/dagster/dagster/_cli/job.py @@ -722,6 +722,7 @@ def _execute_backfill_command_at_location( if dagster_run: instance.submit_run(dagster_run.run_id, workspace) + # TODO - figure out what to do here instance.add_backfill(backfill_job.with_status(BulkActionStatus.COMPLETED)) print_fn(f"Launched backfill job `{backfill_id}`") diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index cacde7b2ee4e4..e5ca962b22fd9 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -1032,9 +1032,15 @@ def execute_asset_backfill_iteration( # failure, or cancellation). 
Since the AssetBackfillData object stores materialization states # per asset partition, the daemon continues to update the backfill data until all runs have # finished in order to display the final partition statuses in the UI. - updated_backfill: PartitionBackfill = updated_backfill.with_status( - BulkActionStatus.COMPLETED - ) + if ( + updated_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets + > 0 + ): + updated_backfill = updated_backfill.with_status(BulkActionStatus.COMPLETED_FAILED) + else: + updated_backfill: PartitionBackfill = updated_backfill.with_status( + BulkActionStatus.COMPLETED_SUCCESS + ) instance.update_backfill(updated_backfill) new_materialized_partitions = ( diff --git a/python_modules/dagster/dagster/_core/execution/backfill.py b/python_modules/dagster/dagster/_core/execution/backfill.py index e992dc2c50558..87dafdda59b10 100644 --- a/python_modules/dagster/dagster/_core/execution/backfill.py +++ b/python_modules/dagster/dagster/_core/execution/backfill.py @@ -29,10 +29,12 @@ @whitelist_for_serdes class BulkActionStatus(Enum): REQUESTED = "REQUESTED" - COMPLETED = "COMPLETED" - FAILED = "FAILED" + COMPLETED = "COMPLETED" # deprecated. Use COMPLETED_SUCCESS or COMPLETED_FAILED instead + FAILED = "FAILED" # denotes when there is a daemon failure, or some other issue processing the backfill CANCELING = "CANCELING" CANCELED = "CANCELED" + COMPLETED_SUCCESS = "COMPLETED_SUCCESS" + COMPLETED_FAILED = "COMPLETED_FAILED" # denotes that the backfill daemon completed successfully, but some runs failed @staticmethod def from_graphql_input(graphql_str): diff --git a/python_modules/dagster/dagster/_core/execution/job_backfill.py b/python_modules/dagster/dagster/_core/execution/job_backfill.py index 61c06a8573699..b1304a2f6d2c9 100644 --- a/python_modules/dagster/dagster/_core/execution/job_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/job_backfill.py @@ -114,7 +114,20 @@ def execute_job_backfill_iteration( f"Backfill completed for {backfill.backfill_id} for" f" {len(partition_names)} partitions" ) - instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED)) + if ( + len( + instance.get_run_ids( + filters=RunsFilter( + tags=DagsterRun.tags_for_backfill_id(backfill.backfill_id), + statuses=[DagsterRunStatus.FAILURE, DagsterRunStatus.CANCELED], + ) + ) + ) + > 0 + ): + instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED_FAILED)) + else: + instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED_SUCCESS)) yield None diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index e2803fad39348..4bca23f2b9fb3 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -728,7 +728,7 @@ def test_job_backfill_status( assert instance.get_runs_count() == 3 backfill = instance.get_backfill("simple") assert backfill - assert backfill.status == BulkActionStatus.COMPLETED + assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS @pytest.mark.skipif(IS_WINDOWS, reason="flaky in windows") @@ -1250,7 +1250,7 @@ def test_pure_asset_backfill( list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) backfill = instance.get_backfill("backfill_with_asset_selection") assert backfill - assert backfill.status == BulkActionStatus.COMPLETED + assert backfill.status == 
BulkActionStatus.COMPLETED_SUCCESS def test_backfill_from_failure_for_subselection( @@ -2044,7 +2044,7 @@ def test_asset_job_backfill_single_run_multiple_iterations( assert instance.get_runs_count() == 1 backfill = instance.get_backfill("simple") assert backfill - assert backfill.status == BulkActionStatus.COMPLETED + assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS def test_asset_job_backfill_multi_run( @@ -2656,7 +2656,7 @@ def override_backfill_storage_setting(self): list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) backfill = instance.get_backfill("backfill_with_asset_selection") assert backfill - assert backfill.status == BulkActionStatus.COMPLETED + assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS # set num_lines high so we know we get all of the remaining logs os.environ["DAGSTER_CAPTURED_LOG_CHUNK_SIZE"] = "100" @@ -2726,7 +2726,7 @@ def test_asset_backfill_from_asset_graph_subset( list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon"))) backfill = instance.get_backfill("backfill_from_asset_graph_subset") assert backfill - assert backfill.status == BulkActionStatus.COMPLETED + assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS def test_asset_backfill_from_asset_graph_subset_with_static_and_time_partitions( @@ -2792,4 +2792,4 @@ def test_asset_backfill_from_asset_graph_subset_with_static_and_time_partitions( "backfill_from_asset_graph_subset_with_static_and_time_partitions" ) assert backfill - assert backfill.status == BulkActionStatus.COMPLETED + assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill_failure_recovery.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill_failure_recovery.py index 3379dc6ed43e4..0cd6c0e80f73d 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill_failure_recovery.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill_failure_recovery.py @@ -69,7 +69,7 @@ def test_simple(instance: DagsterInstance, external_repo: ExternalRepository): launch_process.join(timeout=60) backfill = instance.get_backfill("simple") assert backfill - assert backfill.status == BulkActionStatus.COMPLETED + assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS @pytest.mark.skipif( @@ -115,7 +115,7 @@ def test_before_submit( backfill = instance.get_backfill("simple") assert backfill - assert backfill.status == BulkActionStatus.COMPLETED + assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS assert instance.get_runs_count() == 3 @@ -162,5 +162,5 @@ def test_crash_after_submit( backfill = instance.get_backfill("simple") assert backfill - assert backfill.status == BulkActionStatus.COMPLETED + assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS assert instance.get_runs_count() == 3 diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py b/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py index 9f98fa076cb6e..0504390c024a3 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py +++ b/python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py @@ -1361,7 +1361,7 @@ def test_backfill(self, storage: RunStorage): backfill = storage.get_backfill(one.backfill_id) assert backfill == one - storage.update_backfill(one.with_status(status=BulkActionStatus.COMPLETED)) + 
storage.update_backfill(one.with_status(status=BulkActionStatus.COMPLETED_SUCCESS)) assert len(storage.get_backfills()) == 1 assert len(storage.get_backfills(status=BulkActionStatus.REQUESTED)) == 0 @@ -1391,7 +1391,7 @@ def test_backfill_status_filtering(self, storage: RunStorage): assert ( len( storage.get_backfills( - filters=BulkActionsFilter(statuses=[BulkActionStatus.COMPLETED]) + filters=BulkActionsFilter(statuses=[BulkActionStatus.COMPLETED_SUCCESS]) ) ) == 0 @@ -1400,7 +1400,7 @@ def test_backfill_status_filtering(self, storage: RunStorage): len( storage.get_backfills( filters=BulkActionsFilter( - statuses=[BulkActionStatus.COMPLETED, BulkActionStatus.REQUESTED] + statuses=[BulkActionStatus.COMPLETED_SUCCESS, BulkActionStatus.REQUESTED] ) ) ) @@ -1411,7 +1411,7 @@ def test_backfill_status_filtering(self, storage: RunStorage): ) assert backfills[0] == one - storage.update_backfill(one.with_status(status=BulkActionStatus.COMPLETED)) + storage.update_backfill(one.with_status(status=BulkActionStatus.COMPLETED_SUCCESS)) assert ( len( storage.get_backfills( @@ -1423,7 +1423,7 @@ def test_backfill_status_filtering(self, storage: RunStorage): assert ( len( storage.get_backfills( - filters=BulkActionsFilter(statuses=[BulkActionStatus.COMPLETED]) + filters=BulkActionsFilter(statuses=[BulkActionStatus.COMPLETED_SUCCESS]) ) ) == 1 @@ -1443,7 +1443,7 @@ def test_backfill_status_filtering(self, storage: RunStorage): len( storage.get_backfills( filters=BulkActionsFilter( - statuses=[BulkActionStatus.COMPLETED, BulkActionStatus.REQUESTED] + statuses=[BulkActionStatus.COMPLETED_SUCCESS, BulkActionStatus.REQUESTED] ) ) )
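
For reviewers who want to see how the new statuses surface to callers, here is a minimal sketch (not part of the patch) of Python code that branches on a backfill's status after this change. It only uses calls that already appear in the diff above: `DagsterInstance.get_backfill`, the backfill's `status` field, and the `BulkActionStatus` enum defined in `dagster/_core/execution/backfill.py`. The backfill id is a placeholder, and `BulkActionStatus` lives in a private module, so treat this as illustrative rather than a supported public API.

```python
from dagster import DagsterInstance
from dagster._core.execution.backfill import BulkActionStatus

# Requires a configured Dagster instance (e.g. DAGSTER_HOME set).
instance = DagsterInstance.get()

# Placeholder backfill id for illustration.
backfill = instance.get_backfill("my_backfill_id")

if backfill is None:
    print("backfill not found")
elif backfill.status == BulkActionStatus.COMPLETED_SUCCESS:
    print("backfill completed and every run succeeded")
elif backfill.status == BulkActionStatus.COMPLETED_FAILED:
    print("backfill completed, but some runs failed or were canceled")
elif backfill.status == BulkActionStatus.COMPLETED:
    # Backfills that finished before this change keep the legacy status.
    print("backfill completed (legacy Completed status)")
else:
    print(f"backfill is in state {backfill.status}")
```

The legacy `COMPLETED` branch is kept deliberately: per the changelog above, backfills that completed before this change retain that status, so downstream code should continue to handle it alongside the two new values.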