From a9049d310e205f35050782aa3d0abb51ddc05185 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 19 Aug 2024 15:00:41 -0400 Subject: [PATCH] tests --- .../dagster_graphql/schema/backfill.py | 39 +++++++++------ .../graphql/test_asset_backfill.py | 12 +++++ .../graphql/test_partition_backfill.py | 47 ++++++++++++++++++- 3 files changed, 82 insertions(+), 16 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py index 686c4879ff63d..996a1df19685c 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/backfill.py @@ -9,7 +9,7 @@ from dagster._core.definitions.partition import PartitionsSubset from dagster._core.definitions.partition_key_range import PartitionKeyRange from dagster._core.definitions.time_window_partitions import BaseTimeWindowPartitionsSubset -from dagster._core.errors import DagsterError +from dagster._core.errors import DagsterError, DagsterInvariantViolationError from dagster._core.execution.asset_backfill import ( AssetBackfillStatus, PartitionedAssetBackfillStatus, @@ -493,33 +493,42 @@ def resolve_tags(self, _graphene_info: ResolveInfo): ] def resolve_runStatus(self, _graphene_info: ResolveInfo) -> GrapheneRunStatus: - if self.status == BulkActionStatus.FAILED: + converted_status = BulkActionStatus[self.status] + if converted_status is BulkActionStatus.FAILED: return GrapheneRunStatus.FAILURE - if self.status == BulkActionStatus.CANCELED: + if converted_status is BulkActionStatus.CANCELED: return GrapheneRunStatus.CANCELED - if self.status == BulkActionStatus.CANCELING: + if converted_status is BulkActionStatus.CANCELING: return GrapheneRunStatus.CANCELING - if self.status == BulkActionStatus.REQUESTED: + if converted_status is BulkActionStatus.REQUESTED: # if no runs have been launched: if len(self._get_records(_graphene_info)) == 0: return GrapheneRunStatus.NOT_STARTED # GrapheneRunStatus.QUEUED? return GrapheneRunStatus.STARTED # BulkActionStatus.COMPLETED sub_runs = self._get_records(_graphene_info) - sub_run_statuses = [record.dagster_run.status for record in sub_runs] - if all(status == GrapheneRunStatus.SUCCESS for status in sub_run_statuses): + sub_run_statuses = [record.dagster_run.status.value for record in sub_runs] + if all(status == "SUCCESS" for status in sub_run_statuses): return GrapheneRunStatus.SUCCESS - if any(status == GrapheneRunStatus.FAILURE for status in sub_run_statuses): + if any(status == "FAILURE" for status in sub_run_statuses): return GrapheneRunStatus.FAILURE - if any(status == GrapheneRunStatus.CANCELED for status in sub_run_statuses): + if any(status == "CANCELED" for status in sub_run_statuses): return GrapheneRunStatus.FAILURE - return GrapheneRunStatus.FAILURE # what should we default to? maybe raise an error - # raise DagsterInvariantViolationError( - # f"Unable to convert BulkActionStatus {self.args[0]} to a RunStatus. {self.args[0]} is an unknown status." - # ) - - return GrapheneBulkActionStatus(self.status).to_dagster_run_status() + # can't import this because two deserializers get registered for PipelineRunStatsSnapshot + # from python_modules.dagster.dagster._core.storage.dagster_run import DagsterRunStatus + # + # sub_run_statuses = [record.dagster_run.status for record in sub_runs] + # if all(status == DagsterRunStatus.SUCCESS for status in sub_run_statuses): + # return GrapheneRunStatus.SUCCESS + # if any(status == DagsterRunStatus.FAILURE for status in sub_run_statuses): + # return GrapheneRunStatus.FAILURE + # if any(status == DagsterRunStatus.CANCELED for status in sub_run_statuses): + # return GrapheneRunStatus.FAILURE + + raise DagsterInvariantViolationError( + f"Unable to convert BulkActionStatus {self.args[0]} to a RunStatus. {self.args[0]} is an unknown status." + ) def resolve_endTimestamp(self, graphene_info: ResolveInfo) -> Optional[float]: if self._backfill_job.status == BulkActionStatus.REQUESTED: diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py index f6435026bed67..f50440c16adb9 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_asset_backfill.py @@ -38,6 +38,7 @@ results { id status + runStatus numPartitions timestamp partitionNames @@ -63,6 +64,7 @@ query SingleBackfillQuery($backfillId: String!) { partitionBackfillOrError(backfillId: $backfillId) { ... on PartitionBackfill { + runStatus partitionStatuses { results { id @@ -495,6 +497,7 @@ def test_launch_asset_backfill(): assert backfill_results[0]["partitionSet"] is None assert backfill_results[0]["partitionSetName"] is None assert set(backfill_results[0]["partitionNames"]) == {"a", "b"} + assert backfill_results[0]["runStatus"] == "NOT_STARTED" # on PartitionBackfill single_backfill_result = execute_dagster_graphql( @@ -505,6 +508,10 @@ def test_launch_asset_backfill(): assert ( single_backfill_result.data["partitionBackfillOrError"]["partitionStatuses"] is None ) + assert ( + single_backfill_result.data["partitionBackfillOrError"]["runStatus"] + == "NOT_STARTED" + ) def test_remove_partitions_defs_after_backfill_backcompat(): @@ -620,6 +627,10 @@ def test_remove_partitions_defs_after_backfill(): assert ( single_backfill_result.data["partitionBackfillOrError"]["partitionStatuses"] is None ) + assert ( + single_backfill_result.data["partitionBackfillOrError"]["runStatus"] + == "NOT_STARTED" + ) def test_launch_asset_backfill_with_non_partitioned_asset(): @@ -712,6 +723,7 @@ def test_launch_asset_backfill_with_upstream_anchor_asset(): assert backfill_results[0]["partitionSet"] is None assert backfill_results[0]["partitionSetName"] is None assert backfill_results[0]["partitionNames"] is None + assert backfill_results[0]["runStatus"] == "NOT_STARTED" def get_daily_two_hourly_repo() -> RepositoryDefinition: diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py index 8cac6941c7dae..44ddbebb89f9f 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_partition_backfill.py @@ -57,6 +57,7 @@ __typename id status + runStatus numCancelable partitionNames numPartitions @@ -87,6 +88,7 @@ partitionBackfillOrError(backfillId: $backfillId) { __typename ... on PartitionBackfill { + runStatus assetBackfillData { assetBackfillStatuses { ... on AssetPartitionsStatusCounts { @@ -392,6 +394,7 @@ def test_resume_backfill_failure(self, graphql_context): class TestDaemonPartitionBackfill(ExecutingGraphQLContextTestMatrix): def test_launch_full_pipeline_backfill(self, graphql_context): + # TestDaemonPartitionBackfill::test_launch_full_pipeline_backfill[sqlite_with_default_run_launcher_managed_grpc_env] repository_selector = infer_repository_selector(graphql_context) result = execute_dagster_graphql( graphql_context, @@ -425,6 +428,7 @@ def test_launch_full_pipeline_backfill(self, graphql_context): assert result.data["partitionBackfillOrError"]["numCancelable"] == 2 assert result.data["partitionBackfillOrError"]["hasCancelPermission"] is True assert result.data["partitionBackfillOrError"]["hasResumePermission"] is True + assert result.data["partitionBackfillOrError"]["runStatus"] == "NOT_STARTED" assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 2 @@ -499,6 +503,7 @@ def test_launch_partial_backfill(self, graphql_context): assert result.data["partitionBackfillOrError"]["numCancelable"] == 2 assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 2 assert result.data["partitionBackfillOrError"]["reexecutionSteps"] == ["after_failure"] + assert result.data["partitionBackfillOrError"]["runStatus"] == "NOT_STARTED" def test_cancel_backfill(self, graphql_context): repository_selector = infer_repository_selector(graphql_context) @@ -533,6 +538,7 @@ def test_cancel_backfill(self, graphql_context): assert result.data["partitionBackfillOrError"]["status"] == "REQUESTED" assert result.data["partitionBackfillOrError"]["numCancelable"] == 2 assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 2 + assert result.data["partitionBackfillOrError"]["runStatus"] == "NOT_STARTED" result = execute_dagster_graphql( graphql_context, @@ -551,6 +557,7 @@ def test_cancel_backfill(self, graphql_context): assert result.data assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill" assert result.data["partitionBackfillOrError"]["status"] == "CANCELED" + assert result.data["partitionBackfillOrError"]["runStatus"] == "CANCELED" def test_cancel_asset_backfill(self, graphql_context): asset_key = AssetKey("hanging_partition_asset") @@ -666,6 +673,17 @@ def test_resume_backfill(self, graphql_context): backfill = graphql_context.instance.get_backfill(backfill_id) graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.FAILED)) + result = execute_dagster_graphql( + graphql_context, + PARTITION_PROGRESS_QUERY, + variables={"backfillId": backfill_id}, + ) + assert not result.errors + assert result.data + assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill" + assert result.data["partitionBackfillOrError"]["status"] == "FAILED" + assert result.data["partitionBackfillOrError"]["runStatus"] == "FAILURE" + result = execute_dagster_graphql( graphql_context, RESUME_BACKFILL_MUTATION, @@ -683,6 +701,7 @@ def test_resume_backfill(self, graphql_context): assert result.data assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill" assert result.data["partitionBackfillOrError"]["status"] == "REQUESTED" + assert result.data["partitionBackfillOrError"]["runStatus"] == "NOT_STARTED" def test_backfill_run_stats(self, graphql_context): repository_selector = infer_repository_selector(graphql_context) @@ -725,12 +744,13 @@ def test_backfill_run_stats(self, graphql_context): PARTITION_PROGRESS_QUERY, variables={"backfillId": backfill_id}, ) - + assert result.data["partitionBackfillOrError"]["runStatus"] == "STARTED" assert not result.errors assert result.data assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill" assert result.data["partitionBackfillOrError"]["status"] == "REQUESTED" assert result.data["partitionBackfillOrError"]["numPartitions"] == 4 + run_stats = _get_run_stats( result.data["partitionBackfillOrError"]["partitionStatuses"]["results"] ) @@ -752,6 +772,7 @@ def test_backfill_run_stats(self, graphql_context): variables={"backfillId": backfill_id}, ) assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED" + assert result.data["partitionBackfillOrError"]["runStatus"] == "FAILURE" def test_asset_job_backfill_run_stats(self, graphql_context): repository_selector = infer_repository_selector(graphql_context) @@ -798,6 +819,7 @@ def test_asset_job_backfill_run_stats(self, graphql_context): ) assert run_stats.get("total") == 4 assert run_stats.get("success") == 4 + assert result.data["partitionBackfillOrError"]["runStatus"] == "STARTED" def test_asset_backfill_stats_in_topological_order(self, graphql_context): asset_key_paths_in_topo_order = [ @@ -885,6 +907,7 @@ def test_asset_backfill_partition_stats(self, graphql_context): assert not result.errors assert result.data backfill_data = result.data["partitionBackfillOrError"]["assetBackfillData"] + assert result.data["partitionBackfillOrError"]["runStatus"] == "STARTED" assert backfill_data["rootTargetedPartitions"]["ranges"] is None assert set(backfill_data["rootTargetedPartitions"]["partitionKeys"]) == set(partitions) @@ -955,6 +978,7 @@ def test_asset_backfill_status_with_upstream_failure(self, graphql_context): assert not result.errors assert result.data backfill_data = result.data["partitionBackfillOrError"]["assetBackfillData"] + assert result.data["partitionBackfillOrError"]["runStatus"] == "STARTED" assert backfill_data["rootTargetedPartitions"]["ranges"] == [ {"start": "2023-01-09", "end": "2023-01-09"} @@ -980,6 +1004,7 @@ def test_asset_backfill_status_with_upstream_failure(self, graphql_context): assert asset_statuses[2]["numPartitionsFailed"] == 1 def test_backfill_run_completed(self, graphql_context): + # TestDaemonPartitionBackfill::test_backfill_run_completed[sqlite_with_default_run_launcher_managed_grpc_env] repository_selector = infer_repository_selector(graphql_context) result = execute_dagster_graphql( graphql_context, @@ -1025,6 +1050,7 @@ def test_backfill_run_completed(self, graphql_context): assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill" assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED" assert result.data["partitionBackfillOrError"]["numPartitions"] == 4 + assert result.data["partitionBackfillOrError"]["runStatus"] == "SUCCESS" run_stats = _get_run_stats( result.data["partitionBackfillOrError"]["partitionStatuses"]["results"] @@ -1081,6 +1107,7 @@ def test_backfill_run_incomplete(self, graphql_context): assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill" assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED" assert result.data["partitionBackfillOrError"]["numPartitions"] == 4 + assert result.data["partitionBackfillOrError"]["runStatus"] == "FAILURE" run_stats = _get_run_stats( result.data["partitionBackfillOrError"]["partitionStatuses"]["results"] ) @@ -1254,6 +1281,15 @@ def test_launch_from_failure(self, graphql_context): assert result.data["partitionBackfillOrError"]["numCancelable"] == 2 assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 2 assert result.data["partitionBackfillOrError"]["fromFailure"] + assert ( + len( + graphql_context.instance.get_run_records( + filters=RunsFilter.for_backfill(backfill_id) + ) + ) + == 0 + ) + assert result.data["partitionBackfillOrError"]["runStatus"] == "NOT_STARTED" def test_launch_backfill_with_all_partitions_flag(self, graphql_context): repository_selector = infer_repository_selector(graphql_context) @@ -1289,3 +1325,12 @@ def test_launch_backfill_with_all_partitions_flag(self, graphql_context): assert result.data["partitionBackfillOrError"]["status"] == "REQUESTED" assert result.data["partitionBackfillOrError"]["numCancelable"] == 10 assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 10 + assert ( + len( + graphql_context.instance.get_run_records( + filters=RunsFilter.for_backfill(backfill_id) + ) + ) + == 0 + ) + assert result.data["partitionBackfillOrError"]["runStatus"] == "NOT_STARTED"