Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Assign backfills a run status based on their sub-run statuses #23702

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 38 additions & 21 deletions python_modules/dagster-graphql/dagster_graphql/schema/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,25 +122,6 @@ class GrapheneBulkActionStatus(graphene.Enum):
class Meta:
name = "BulkActionStatus"

def to_dagster_run_status(self) -> GrapheneRunStatus:
"""Maps bulk action status to a run status for use with the RunsFeedEntry interface."""
# the pyright ignores are required because GrapheneBulkActionStatus.STATUS and GrapheneRunStatus.STATUS
# are interpreted as a Literal string during static analysis, but it is actually an Enum value
if self.args[0] == GrapheneBulkActionStatus.REQUESTED.value: # pyright: ignore[reportAttributeAccessIssue]
return GrapheneRunStatus.STARTED # pyright: ignore[reportReturnType]
if self.args[0] == GrapheneBulkActionStatus.COMPLETED.value: # pyright: ignore[reportAttributeAccessIssue]
return GrapheneRunStatus.SUCCESS # pyright: ignore[reportReturnType]
if self.args[0] == GrapheneBulkActionStatus.FAILED.value: # pyright: ignore[reportAttributeAccessIssue]
return GrapheneRunStatus.FAILURE # pyright: ignore[reportReturnType]
if self.args[0] == GrapheneBulkActionStatus.CANCELED.value: # pyright: ignore[reportAttributeAccessIssue]
return GrapheneRunStatus.CANCELED # pyright: ignore[reportReturnType]
if self.args[0] == GrapheneBulkActionStatus.CANCELING.value: # pyright: ignore[reportAttributeAccessIssue]
return GrapheneRunStatus.CANCELING # pyright: ignore[reportReturnType]

raise DagsterInvariantViolationError(
f"Unable to convert BulkActionStatus {self.args[0]} to a RunStatus. {self.args[0]} is an unknown status."
)


class GrapheneAssetBackfillTargetPartitions(graphene.ObjectType):
class Meta:
Expand Down Expand Up @@ -511,8 +492,44 @@ def resolve_tags(self, _graphene_info: ResolveInfo):
if get_tag_type(key) != TagType.HIDDEN
]

def resolve_runStatus(self, _graphene_info: ResolveInfo) -> GrapheneRunStatus:
return GrapheneBulkActionStatus(self.status).to_dagster_run_status()
def resolve_runStatus(self, _graphene_info: ResolveInfo) -> str:
    """Map this backfill's BulkActionStatus to a RunStatus for the RunsFeedEntry interface.

    FAILED/CANCELED/CANCELING map directly. REQUESTED maps to QUEUED until any
    run has been launched, then STARTED. COMPLETED is derived from the statuses
    of the backfill's sub-runs.

    Raises:
        DagsterInvariantViolationError: if the status cannot be mapped.
    """
    converted_status = BulkActionStatus[self.status]
    if converted_status is BulkActionStatus.FAILED:
        return GrapheneRunStatus.FAILURE
    if converted_status is BulkActionStatus.CANCELED:
        return GrapheneRunStatus.CANCELED
    if converted_status is BulkActionStatus.CANCELING:
        return GrapheneRunStatus.CANCELING
    if converted_status is BulkActionStatus.REQUESTED:
        # No runs launched yet -> surface the backfill as QUEUED; otherwise STARTED.
        if len(self._get_records(_graphene_info)) == 0:
            return GrapheneRunStatus.QUEUED
        return GrapheneRunStatus.STARTED
    if converted_status is BulkActionStatus.COMPLETED:
        # Backfills are only marked as COMPLETED once all runs that are part of the
        # backfill have reached a finished state (SUCCESS, FAILURE, or CANCELED). So we
        # only need to check for those statuses to determine the overall status.
        # NOTE: statuses are compared as raw strings because importing DagsterRunStatus
        # here registers a second deserializer for storage name "PipelineRunStatsSnapshot"
        # (DagsterRunStatsSnapshot uses that storage_name, colliding with
        # GraphenePipelineRunStatsSnapshot) and raises SerdesUsageError.
        sub_runs = self._get_records(_graphene_info)
        sub_run_statuses = [record.dagster_run.status.value for record in sub_runs]
        if all(status == "SUCCESS" for status in sub_run_statuses):
            return GrapheneRunStatus.SUCCESS
        if any(status in ("FAILURE", "CANCELED") for status in sub_run_statuses):
            return GrapheneRunStatus.FAILURE

    # Fixed from self.args[0] (a leftover from the deleted GrapheneBulkActionStatus
    # method): on this object the backfill status lives in self.status.
    raise DagsterInvariantViolationError(
        f"Unable to convert BulkActionStatus {self.status} to a RunStatus."
        f" {self.status} is an unknown status."
    )

def resolve_endTimestamp(self, graphene_info: ResolveInfo) -> Optional[float]:
if self._backfill_job.status == BulkActionStatus.REQUESTED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
results {
id
status
runStatus
numPartitions
timestamp
partitionNames
Expand All @@ -63,6 +64,7 @@
query SingleBackfillQuery($backfillId: String!) {
partitionBackfillOrError(backfillId: $backfillId) {
... on PartitionBackfill {
runStatus
partitionStatuses {
results {
id
Expand Down Expand Up @@ -495,6 +497,7 @@ def test_launch_asset_backfill():
assert backfill_results[0]["partitionSet"] is None
assert backfill_results[0]["partitionSetName"] is None
assert set(backfill_results[0]["partitionNames"]) == {"a", "b"}
assert backfill_results[0]["runStatus"] == "QUEUED"

# on PartitionBackfill
single_backfill_result = execute_dagster_graphql(
Expand All @@ -505,6 +508,7 @@ def test_launch_asset_backfill():
assert (
single_backfill_result.data["partitionBackfillOrError"]["partitionStatuses"] is None
)
assert single_backfill_result.data["partitionBackfillOrError"]["runStatus"] == "QUEUED"


def test_remove_partitions_defs_after_backfill_backcompat():
Expand Down Expand Up @@ -620,6 +624,7 @@ def test_remove_partitions_defs_after_backfill():
assert (
single_backfill_result.data["partitionBackfillOrError"]["partitionStatuses"] is None
)
assert single_backfill_result.data["partitionBackfillOrError"]["runStatus"] == "QUEUED"


def test_launch_asset_backfill_with_non_partitioned_asset():
Expand Down Expand Up @@ -712,6 +717,7 @@ def test_launch_asset_backfill_with_upstream_anchor_asset():
assert backfill_results[0]["partitionSet"] is None
assert backfill_results[0]["partitionSetName"] is None
assert backfill_results[0]["partitionNames"] is None
assert backfill_results[0]["runStatus"] == "QUEUED"


def get_daily_two_hourly_repo() -> RepositoryDefinition:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
__typename
id
status
runStatus
numCancelable
partitionNames
numPartitions
Expand Down Expand Up @@ -87,6 +88,7 @@
partitionBackfillOrError(backfillId: $backfillId) {
__typename
... on PartitionBackfill {
runStatus
assetBackfillData {
assetBackfillStatuses {
... on AssetPartitionsStatusCounts {
Expand Down Expand Up @@ -425,6 +427,7 @@ def test_launch_full_pipeline_backfill(self, graphql_context):
assert result.data["partitionBackfillOrError"]["numCancelable"] == 2
assert result.data["partitionBackfillOrError"]["hasCancelPermission"] is True
assert result.data["partitionBackfillOrError"]["hasResumePermission"] is True
assert result.data["partitionBackfillOrError"]["runStatus"] == "QUEUED"

assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 2

Expand Down Expand Up @@ -499,6 +502,7 @@ def test_launch_partial_backfill(self, graphql_context):
assert result.data["partitionBackfillOrError"]["numCancelable"] == 2
assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 2
assert result.data["partitionBackfillOrError"]["reexecutionSteps"] == ["after_failure"]
assert result.data["partitionBackfillOrError"]["runStatus"] == "QUEUED"

def test_cancel_backfill(self, graphql_context):
repository_selector = infer_repository_selector(graphql_context)
Expand Down Expand Up @@ -533,6 +537,7 @@ def test_cancel_backfill(self, graphql_context):
assert result.data["partitionBackfillOrError"]["status"] == "REQUESTED"
assert result.data["partitionBackfillOrError"]["numCancelable"] == 2
assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 2
assert result.data["partitionBackfillOrError"]["runStatus"] == "QUEUED"

result = execute_dagster_graphql(
graphql_context,
Expand All @@ -551,6 +556,7 @@ def test_cancel_backfill(self, graphql_context):
assert result.data
assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill"
assert result.data["partitionBackfillOrError"]["status"] == "CANCELED"
assert result.data["partitionBackfillOrError"]["runStatus"] == "CANCELED"

def test_cancel_asset_backfill(self, graphql_context):
asset_key = AssetKey("hanging_partition_asset")
Expand Down Expand Up @@ -666,6 +672,17 @@ def test_resume_backfill(self, graphql_context):
backfill = graphql_context.instance.get_backfill(backfill_id)
graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.FAILED))

result = execute_dagster_graphql(
graphql_context,
PARTITION_PROGRESS_QUERY,
variables={"backfillId": backfill_id},
)
assert not result.errors
assert result.data
assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill"
assert result.data["partitionBackfillOrError"]["status"] == "FAILED"
assert result.data["partitionBackfillOrError"]["runStatus"] == "FAILURE"

result = execute_dagster_graphql(
graphql_context,
RESUME_BACKFILL_MUTATION,
Expand All @@ -683,6 +700,7 @@ def test_resume_backfill(self, graphql_context):
assert result.data
assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill"
assert result.data["partitionBackfillOrError"]["status"] == "REQUESTED"
assert result.data["partitionBackfillOrError"]["runStatus"] == "QUEUED"

def test_backfill_run_stats(self, graphql_context):
repository_selector = infer_repository_selector(graphql_context)
Expand Down Expand Up @@ -725,12 +743,13 @@ def test_backfill_run_stats(self, graphql_context):
PARTITION_PROGRESS_QUERY,
variables={"backfillId": backfill_id},
)

assert result.data["partitionBackfillOrError"]["runStatus"] == "STARTED"
assert not result.errors
assert result.data
assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill"
assert result.data["partitionBackfillOrError"]["status"] == "REQUESTED"
assert result.data["partitionBackfillOrError"]["numPartitions"] == 4

run_stats = _get_run_stats(
result.data["partitionBackfillOrError"]["partitionStatuses"]["results"]
)
Expand All @@ -752,6 +771,7 @@ def test_backfill_run_stats(self, graphql_context):
variables={"backfillId": backfill_id},
)
assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED"
assert result.data["partitionBackfillOrError"]["runStatus"] == "FAILURE"

def test_asset_job_backfill_run_stats(self, graphql_context):
repository_selector = infer_repository_selector(graphql_context)
Expand Down Expand Up @@ -798,6 +818,7 @@ def test_asset_job_backfill_run_stats(self, graphql_context):
)
assert run_stats.get("total") == 4
assert run_stats.get("success") == 4
assert result.data["partitionBackfillOrError"]["runStatus"] == "STARTED"

def test_asset_backfill_stats_in_topological_order(self, graphql_context):
asset_key_paths_in_topo_order = [
Expand Down Expand Up @@ -885,6 +906,7 @@ def test_asset_backfill_partition_stats(self, graphql_context):
assert not result.errors
assert result.data
backfill_data = result.data["partitionBackfillOrError"]["assetBackfillData"]
assert result.data["partitionBackfillOrError"]["runStatus"] == "STARTED"

assert backfill_data["rootTargetedPartitions"]["ranges"] is None
assert set(backfill_data["rootTargetedPartitions"]["partitionKeys"]) == set(partitions)
Expand Down Expand Up @@ -955,6 +977,7 @@ def test_asset_backfill_status_with_upstream_failure(self, graphql_context):
assert not result.errors
assert result.data
backfill_data = result.data["partitionBackfillOrError"]["assetBackfillData"]
assert result.data["partitionBackfillOrError"]["runStatus"] == "STARTED"

assert backfill_data["rootTargetedPartitions"]["ranges"] == [
{"start": "2023-01-09", "end": "2023-01-09"}
Expand Down Expand Up @@ -1025,6 +1048,7 @@ def test_backfill_run_completed(self, graphql_context):
assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill"
assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED"
assert result.data["partitionBackfillOrError"]["numPartitions"] == 4
assert result.data["partitionBackfillOrError"]["runStatus"] == "SUCCESS"

run_stats = _get_run_stats(
result.data["partitionBackfillOrError"]["partitionStatuses"]["results"]
Expand Down Expand Up @@ -1081,6 +1105,7 @@ def test_backfill_run_incomplete(self, graphql_context):
assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill"
assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED"
assert result.data["partitionBackfillOrError"]["numPartitions"] == 4
assert result.data["partitionBackfillOrError"]["runStatus"] == "FAILURE"
run_stats = _get_run_stats(
result.data["partitionBackfillOrError"]["partitionStatuses"]["results"]
)
Expand Down Expand Up @@ -1254,6 +1279,15 @@ def test_launch_from_failure(self, graphql_context):
assert result.data["partitionBackfillOrError"]["numCancelable"] == 2
assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 2
assert result.data["partitionBackfillOrError"]["fromFailure"]
assert (
len(
graphql_context.instance.get_run_records(
filters=RunsFilter.for_backfill(backfill_id)
)
)
== 0
)
assert result.data["partitionBackfillOrError"]["runStatus"] == "QUEUED"

def test_launch_backfill_with_all_partitions_flag(self, graphql_context):
repository_selector = infer_repository_selector(graphql_context)
Expand Down Expand Up @@ -1289,3 +1323,12 @@ def test_launch_backfill_with_all_partitions_flag(self, graphql_context):
assert result.data["partitionBackfillOrError"]["status"] == "REQUESTED"
assert result.data["partitionBackfillOrError"]["numCancelable"] == 10
assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 10
assert (
len(
graphql_context.instance.get_run_records(
filters=RunsFilter.for_backfill(backfill_id)
)
)
== 0
)
assert result.data["partitionBackfillOrError"]["runStatus"] == "QUEUED"