Skip to content

Commit

Permalink
tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Aug 19, 2024
1 parent 2d7f32f commit a9049d3
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 16 deletions.
39 changes: 24 additions & 15 deletions python_modules/dagster-graphql/dagster_graphql/schema/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from dagster._core.definitions.partition import PartitionsSubset
from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.definitions.time_window_partitions import BaseTimeWindowPartitionsSubset
from dagster._core.errors import DagsterError
from dagster._core.errors import DagsterError, DagsterInvariantViolationError
from dagster._core.execution.asset_backfill import (
AssetBackfillStatus,
PartitionedAssetBackfillStatus,
Expand Down Expand Up @@ -493,33 +493,42 @@ def resolve_tags(self, _graphene_info: ResolveInfo):
]

def resolve_runStatus(self, _graphene_info: ResolveInfo) -> GrapheneRunStatus:
if self.status == BulkActionStatus.FAILED:
converted_status = BulkActionStatus[self.status]
if converted_status is BulkActionStatus.FAILED:
return GrapheneRunStatus.FAILURE
if self.status == BulkActionStatus.CANCELED:
if converted_status is BulkActionStatus.CANCELED:
return GrapheneRunStatus.CANCELED
if self.status == BulkActionStatus.CANCELING:
if converted_status is BulkActionStatus.CANCELING:
return GrapheneRunStatus.CANCELING
if self.status == BulkActionStatus.REQUESTED:
if converted_status is BulkActionStatus.REQUESTED:
# if no runs have been launched:
if len(self._get_records(_graphene_info)) == 0:
return GrapheneRunStatus.NOT_STARTED # GrapheneRunStatus.QUEUED?
return GrapheneRunStatus.STARTED
# BulkActionStatus.COMPLETED
sub_runs = self._get_records(_graphene_info)
sub_run_statuses = [record.dagster_run.status for record in sub_runs]
if all(status == GrapheneRunStatus.SUCCESS for status in sub_run_statuses):
sub_run_statuses = [record.dagster_run.status.value for record in sub_runs]
if all(status == "SUCCESS" for status in sub_run_statuses):
return GrapheneRunStatus.SUCCESS
if any(status == GrapheneRunStatus.FAILURE for status in sub_run_statuses):
if any(status == "FAILURE" for status in sub_run_statuses):
return GrapheneRunStatus.FAILURE
if any(status == GrapheneRunStatus.CANCELED for status in sub_run_statuses):
if any(status == "CANCELED" for status in sub_run_statuses):
return GrapheneRunStatus.FAILURE

return GrapheneRunStatus.FAILURE # what should we default to? maybe raise an error
# raise DagsterInvariantViolationError(
# f"Unable to convert BulkActionStatus {self.args[0]} to a RunStatus. {self.args[0]} is an unknown status."
# )

return GrapheneBulkActionStatus(self.status).to_dagster_run_status()
# can't import this because two deserializers get registered for PipelineRunStatsSnapshot
# from python_modules.dagster.dagster._core.storage.dagster_run import DagsterRunStatus
#
# sub_run_statuses = [record.dagster_run.status for record in sub_runs]
# if all(status == DagsterRunStatus.SUCCESS for status in sub_run_statuses):
# return GrapheneRunStatus.SUCCESS
# if any(status == DagsterRunStatus.FAILURE for status in sub_run_statuses):
# return GrapheneRunStatus.FAILURE
# if any(status == DagsterRunStatus.CANCELED for status in sub_run_statuses):
# return GrapheneRunStatus.FAILURE

raise DagsterInvariantViolationError(
f"Unable to convert BulkActionStatus {self.args[0]} to a RunStatus. {self.args[0]} is an unknown status."
)

def resolve_endTimestamp(self, graphene_info: ResolveInfo) -> Optional[float]:
if self._backfill_job.status == BulkActionStatus.REQUESTED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
results {
id
status
runStatus
numPartitions
timestamp
partitionNames
Expand All @@ -63,6 +64,7 @@
query SingleBackfillQuery($backfillId: String!) {
partitionBackfillOrError(backfillId: $backfillId) {
... on PartitionBackfill {
runStatus
partitionStatuses {
results {
id
Expand Down Expand Up @@ -495,6 +497,7 @@ def test_launch_asset_backfill():
assert backfill_results[0]["partitionSet"] is None
assert backfill_results[0]["partitionSetName"] is None
assert set(backfill_results[0]["partitionNames"]) == {"a", "b"}
assert backfill_results[0]["runStatus"] == "NOT_STARTED"

# on PartitionBackfill
single_backfill_result = execute_dagster_graphql(
Expand All @@ -505,6 +508,10 @@ def test_launch_asset_backfill():
assert (
single_backfill_result.data["partitionBackfillOrError"]["partitionStatuses"] is None
)
assert (
single_backfill_result.data["partitionBackfillOrError"]["runStatus"]
== "NOT_STARTED"
)


def test_remove_partitions_defs_after_backfill_backcompat():
Expand Down Expand Up @@ -620,6 +627,10 @@ def test_remove_partitions_defs_after_backfill():
assert (
single_backfill_result.data["partitionBackfillOrError"]["partitionStatuses"] is None
)
assert (
single_backfill_result.data["partitionBackfillOrError"]["runStatus"]
== "NOT_STARTED"
)


def test_launch_asset_backfill_with_non_partitioned_asset():
Expand Down Expand Up @@ -712,6 +723,7 @@ def test_launch_asset_backfill_with_upstream_anchor_asset():
assert backfill_results[0]["partitionSet"] is None
assert backfill_results[0]["partitionSetName"] is None
assert backfill_results[0]["partitionNames"] is None
assert backfill_results[0]["runStatus"] == "NOT_STARTED"


def get_daily_two_hourly_repo() -> RepositoryDefinition:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
__typename
id
status
runStatus
numCancelable
partitionNames
numPartitions
Expand Down Expand Up @@ -87,6 +88,7 @@
partitionBackfillOrError(backfillId: $backfillId) {
__typename
... on PartitionBackfill {
runStatus
assetBackfillData {
assetBackfillStatuses {
... on AssetPartitionsStatusCounts {
Expand Down Expand Up @@ -392,6 +394,7 @@ def test_resume_backfill_failure(self, graphql_context):

class TestDaemonPartitionBackfill(ExecutingGraphQLContextTestMatrix):
def test_launch_full_pipeline_backfill(self, graphql_context):
# TestDaemonPartitionBackfill::test_launch_full_pipeline_backfill[sqlite_with_default_run_launcher_managed_grpc_env]
repository_selector = infer_repository_selector(graphql_context)
result = execute_dagster_graphql(
graphql_context,
Expand Down Expand Up @@ -425,6 +428,7 @@ def test_launch_full_pipeline_backfill(self, graphql_context):
assert result.data["partitionBackfillOrError"]["numCancelable"] == 2
assert result.data["partitionBackfillOrError"]["hasCancelPermission"] is True
assert result.data["partitionBackfillOrError"]["hasResumePermission"] is True
assert result.data["partitionBackfillOrError"]["runStatus"] == "NOT_STARTED"

assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 2

Expand Down Expand Up @@ -499,6 +503,7 @@ def test_launch_partial_backfill(self, graphql_context):
assert result.data["partitionBackfillOrError"]["numCancelable"] == 2
assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 2
assert result.data["partitionBackfillOrError"]["reexecutionSteps"] == ["after_failure"]
assert result.data["partitionBackfillOrError"]["runStatus"] == "NOT_STARTED"

def test_cancel_backfill(self, graphql_context):
repository_selector = infer_repository_selector(graphql_context)
Expand Down Expand Up @@ -533,6 +538,7 @@ def test_cancel_backfill(self, graphql_context):
assert result.data["partitionBackfillOrError"]["status"] == "REQUESTED"
assert result.data["partitionBackfillOrError"]["numCancelable"] == 2
assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 2
assert result.data["partitionBackfillOrError"]["runStatus"] == "NOT_STARTED"

result = execute_dagster_graphql(
graphql_context,
Expand All @@ -551,6 +557,7 @@ def test_cancel_backfill(self, graphql_context):
assert result.data
assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill"
assert result.data["partitionBackfillOrError"]["status"] == "CANCELED"
assert result.data["partitionBackfillOrError"]["runStatus"] == "CANCELED"

def test_cancel_asset_backfill(self, graphql_context):
asset_key = AssetKey("hanging_partition_asset")
Expand Down Expand Up @@ -666,6 +673,17 @@ def test_resume_backfill(self, graphql_context):
backfill = graphql_context.instance.get_backfill(backfill_id)
graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.FAILED))

result = execute_dagster_graphql(
graphql_context,
PARTITION_PROGRESS_QUERY,
variables={"backfillId": backfill_id},
)
assert not result.errors
assert result.data
assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill"
assert result.data["partitionBackfillOrError"]["status"] == "FAILED"
assert result.data["partitionBackfillOrError"]["runStatus"] == "FAILURE"

result = execute_dagster_graphql(
graphql_context,
RESUME_BACKFILL_MUTATION,
Expand All @@ -683,6 +701,7 @@ def test_resume_backfill(self, graphql_context):
assert result.data
assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill"
assert result.data["partitionBackfillOrError"]["status"] == "REQUESTED"
assert result.data["partitionBackfillOrError"]["runStatus"] == "NOT_STARTED"

def test_backfill_run_stats(self, graphql_context):
repository_selector = infer_repository_selector(graphql_context)
Expand Down Expand Up @@ -725,12 +744,13 @@ def test_backfill_run_stats(self, graphql_context):
PARTITION_PROGRESS_QUERY,
variables={"backfillId": backfill_id},
)

assert result.data["partitionBackfillOrError"]["runStatus"] == "STARTED"
assert not result.errors
assert result.data
assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill"
assert result.data["partitionBackfillOrError"]["status"] == "REQUESTED"
assert result.data["partitionBackfillOrError"]["numPartitions"] == 4

run_stats = _get_run_stats(
result.data["partitionBackfillOrError"]["partitionStatuses"]["results"]
)
Expand All @@ -752,6 +772,7 @@ def test_backfill_run_stats(self, graphql_context):
variables={"backfillId": backfill_id},
)
assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED"
assert result.data["partitionBackfillOrError"]["runStatus"] == "FAILURE"

def test_asset_job_backfill_run_stats(self, graphql_context):
repository_selector = infer_repository_selector(graphql_context)
Expand Down Expand Up @@ -798,6 +819,7 @@ def test_asset_job_backfill_run_stats(self, graphql_context):
)
assert run_stats.get("total") == 4
assert run_stats.get("success") == 4
assert result.data["partitionBackfillOrError"]["runStatus"] == "STARTED"

def test_asset_backfill_stats_in_topological_order(self, graphql_context):
asset_key_paths_in_topo_order = [
Expand Down Expand Up @@ -885,6 +907,7 @@ def test_asset_backfill_partition_stats(self, graphql_context):
assert not result.errors
assert result.data
backfill_data = result.data["partitionBackfillOrError"]["assetBackfillData"]
assert result.data["partitionBackfillOrError"]["runStatus"] == "STARTED"

assert backfill_data["rootTargetedPartitions"]["ranges"] is None
assert set(backfill_data["rootTargetedPartitions"]["partitionKeys"]) == set(partitions)
Expand Down Expand Up @@ -955,6 +978,7 @@ def test_asset_backfill_status_with_upstream_failure(self, graphql_context):
assert not result.errors
assert result.data
backfill_data = result.data["partitionBackfillOrError"]["assetBackfillData"]
assert result.data["partitionBackfillOrError"]["runStatus"] == "STARTED"

assert backfill_data["rootTargetedPartitions"]["ranges"] == [
{"start": "2023-01-09", "end": "2023-01-09"}
Expand All @@ -980,6 +1004,7 @@ def test_asset_backfill_status_with_upstream_failure(self, graphql_context):
assert asset_statuses[2]["numPartitionsFailed"] == 1

def test_backfill_run_completed(self, graphql_context):
# TestDaemonPartitionBackfill::test_backfill_run_completed[sqlite_with_default_run_launcher_managed_grpc_env]
repository_selector = infer_repository_selector(graphql_context)
result = execute_dagster_graphql(
graphql_context,
Expand Down Expand Up @@ -1025,6 +1050,7 @@ def test_backfill_run_completed(self, graphql_context):
assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill"
assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED"
assert result.data["partitionBackfillOrError"]["numPartitions"] == 4
assert result.data["partitionBackfillOrError"]["runStatus"] == "SUCCESS"

run_stats = _get_run_stats(
result.data["partitionBackfillOrError"]["partitionStatuses"]["results"]
Expand Down Expand Up @@ -1081,6 +1107,7 @@ def test_backfill_run_incomplete(self, graphql_context):
assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill"
assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED"
assert result.data["partitionBackfillOrError"]["numPartitions"] == 4
assert result.data["partitionBackfillOrError"]["runStatus"] == "FAILURE"
run_stats = _get_run_stats(
result.data["partitionBackfillOrError"]["partitionStatuses"]["results"]
)
Expand Down Expand Up @@ -1254,6 +1281,15 @@ def test_launch_from_failure(self, graphql_context):
assert result.data["partitionBackfillOrError"]["numCancelable"] == 2
assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 2
assert result.data["partitionBackfillOrError"]["fromFailure"]
assert (
len(
graphql_context.instance.get_run_records(
filters=RunsFilter.for_backfill(backfill_id)
)
)
== 0
)
assert result.data["partitionBackfillOrError"]["runStatus"] == "NOT_STARTED"

def test_launch_backfill_with_all_partitions_flag(self, graphql_context):
repository_selector = infer_repository_selector(graphql_context)
Expand Down Expand Up @@ -1289,3 +1325,12 @@ def test_launch_backfill_with_all_partitions_flag(self, graphql_context):
assert result.data["partitionBackfillOrError"]["status"] == "REQUESTED"
assert result.data["partitionBackfillOrError"]["numCancelable"] == 10
assert len(result.data["partitionBackfillOrError"]["partitionNames"]) == 10
assert (
len(
graphql_context.instance.get_run_records(
filters=RunsFilter.for_backfill(backfill_id)
)
)
== 0
)
assert result.data["partitionBackfillOrError"]["runStatus"] == "NOT_STARTED"

0 comments on commit a9049d3

Please sign in to comment.