Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add BulkActionStatuses COMPLETED_SUCCESS and COMPLETED_FAILURE #24365

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ const labelForBackfillStatus = (key: BulkActionStatus) => {
return 'Failed';
case BulkActionStatus.REQUESTED:
return 'In progress';
case BulkActionStatus.COMPLETED_SUCCESS:
return 'Success';
case BulkActionStatus.COMPLETED_FAILED:
return 'Failed';
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ export const BackfillStatusTagForPage = ({backfill}: {backfill: BackfillState})
return errorState('Failed');
case BulkActionStatus.COMPLETED:
return <Tag intent="success">Completed</Tag>;
case BulkActionStatus.COMPLETED_SUCCESS:
return <Tag intent="success">Succeeded</Tag>;
case BulkActionStatus.COMPLETED_FAILED:
return errorState('Failed');
default:
return <Tag>{status}</Tag>;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ class GrapheneBulkActionStatus(graphene.Enum):
FAILED = "FAILED"
CANCELED = "CANCELED"
CANCELING = "CANCELING"
COMPLETED_SUCCESS = "COMPLETED_SUCCESS"
COMPLETED_FAILED = "COMPLETED_FAILED"

class Meta:
name = "BulkActionStatus"
Expand All @@ -130,6 +132,10 @@ def to_dagster_run_status(self) -> GrapheneRunStatus:
return GrapheneRunStatus.STARTED # pyright: ignore[reportReturnType]
if self.args[0] == GrapheneBulkActionStatus.COMPLETED.value: # pyright: ignore[reportAttributeAccessIssue]
return GrapheneRunStatus.SUCCESS # pyright: ignore[reportReturnType]
if self.args[0] == GrapheneBulkActionStatus.COMPLETED_SUCCESS.value: # pyright: ignore[reportAttributeAccessIssue]
Copy link
Contributor Author

@jamiedemaria jamiedemaria Sep 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to think of other potential solutions for this conversion (if we maintain the runStatus graphene attr i mentioned in my other comment). It looks like we'll need to convert run statuses to bulk action statuses too for filtering, so i think a consistent solution is in order. Another thing that i can stack on, but isn't strictly required for this PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The two lines added here look fine to me. But wondering more generally what the filtering experience looks like -- if I filter to queued runs, do I see runs in the backfill? Or do I see the whole backfill display as a single run if any run is queued? Or something else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the scheme were going with for filtering is that the row-level object is what gets filtered. So if you are in the combined single runs + backfills view, and filter for queued status, you will only see single-runs that have queued status. and no backfills (even if the backfill has queued runs)

return GrapheneRunStatus.SUCCESS # pyright: ignore[reportReturnType]
if self.args[0] == GrapheneBulkActionStatus.COMPLETED_FAILED.value: # pyright: ignore[reportAttributeAccessIssue]
return GrapheneRunStatus.FAILURE # pyright: ignore[reportReturnType]
if self.args[0] == GrapheneBulkActionStatus.FAILED.value: # pyright: ignore[reportAttributeAccessIssue]
return GrapheneRunStatus.FAILURE # pyright: ignore[reportReturnType]
if self.args[0] == GrapheneBulkActionStatus.CANCELED.value: # pyright: ignore[reportAttributeAccessIssue]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,14 +747,16 @@ def test_backfill_run_stats(self, graphql_context):
backfill = graphql_context.instance.get_backfill(backfill_id)

# Artificially mark the backfill as complete - verify run status is INCOMPLETE until the runs all succeed
graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED))
graphql_context.instance.update_backfill(
backfill.with_status(BulkActionStatus.COMPLETED_SUCCESS)
)

result = execute_dagster_graphql(
graphql_context,
PARTITION_PROGRESS_QUERY,
variables={"backfillId": backfill_id},
)
assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED"
assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED_SUCCESS"

def test_asset_job_backfill_run_stats(self, graphql_context):
repository_selector = infer_repository_selector(graphql_context)
Expand Down Expand Up @@ -1004,7 +1006,9 @@ def test_backfill_run_completed(self, graphql_context):

backfill = graphql_context.instance.get_backfill(backfill_id)

graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED))
graphql_context.instance.update_backfill(
backfill.with_status(BulkActionStatus.COMPLETED_SUCCESS)
)

_seed_runs(
graphql_context,
Expand All @@ -1026,7 +1030,7 @@ def test_backfill_run_completed(self, graphql_context):
assert not result.errors
assert result.data
assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill"
assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED"
assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED_SUCCESS"
assert result.data["partitionBackfillOrError"]["numPartitions"] == 4

run_stats = _get_run_stats(
Expand Down Expand Up @@ -1060,7 +1064,9 @@ def test_backfill_run_incomplete(self, graphql_context):

backfill = graphql_context.instance.get_backfill(backfill_id)

graphql_context.instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED))
graphql_context.instance.update_backfill(
backfill.with_status(BulkActionStatus.COMPLETED_FAILED)
)

_seed_runs(
graphql_context,
Expand All @@ -1082,7 +1088,7 @@ def test_backfill_run_incomplete(self, graphql_context):
assert not result.errors
assert result.data
assert result.data["partitionBackfillOrError"]["__typename"] == "PartitionBackfill"
assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED"
assert result.data["partitionBackfillOrError"]["status"] == "COMPLETED_FAILED"
assert result.data["partitionBackfillOrError"]["numPartitions"] == 4
assert len(result.data["partitionBackfillOrError"]["cancelableRuns"]) == 1
run_stats = _get_run_stats(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def _create_backfill(graphql_context) -> str:
backfill = PartitionBackfill(
backfill_id=make_new_backfill_id(),
serialized_asset_backfill_data="foo", # the content of the backfill doesn't matter for testing fetching mega runs
status=BulkActionStatus.COMPLETED,
status=BulkActionStatus.COMPLETED_SUCCESS,
reexecution_steps=None,
tags=None,
backfill_timestamp=get_current_timestamp(),
Expand Down
1 change: 1 addition & 0 deletions python_modules/dagster/dagster/_cli/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,7 @@ def _execute_backfill_command_at_location(
if dagster_run:
instance.submit_run(dagster_run.run_id, workspace)

# TODO - figure out what to do here
instance.add_backfill(backfill_job.with_status(BulkActionStatus.COMPLETED))

print_fn(f"Launched backfill job `{backfill_id}`")
Expand Down
12 changes: 9 additions & 3 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -1032,9 +1032,15 @@ def execute_asset_backfill_iteration(
# failure, or cancellation). Since the AssetBackfillData object stores materialization states
# per asset partition, the daemon continues to update the backfill data until all runs have
# finished in order to display the final partition statuses in the UI.
updated_backfill: PartitionBackfill = updated_backfill.with_status(
BulkActionStatus.COMPLETED
)
if (
updated_backfill_data.failed_and_downstream_subset.num_partitions_and_non_partitioned_assets
> 0
):
updated_backfill = updated_backfill.with_status(BulkActionStatus.COMPLETED_FAILED)
else:
updated_backfill: PartitionBackfill = updated_backfill.with_status(
BulkActionStatus.COMPLETED_SUCCESS
)
instance.update_backfill(updated_backfill)

new_materialized_partitions = (
Expand Down
6 changes: 4 additions & 2 deletions python_modules/dagster/dagster/_core/execution/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
@whitelist_for_serdes
class BulkActionStatus(Enum):
REQUESTED = "REQUESTED"
COMPLETED = "COMPLETED"
FAILED = "FAILED"
COMPLETED = "COMPLETED" # deprecated. Use COMPLETED_SUCCESS or COMPLETED_FAILED instead
FAILED = "FAILED" # denotes when there is a daemon failure, or some other issue processing the backfill
CANCELING = "CANCELING"
CANCELED = "CANCELED"
COMPLETED_SUCCESS = "COMPLETED_SUCCESS"
COMPLETED_FAILED = "COMPLETED_FAILED" # denotes that the backfill daemon completed successfully, but some runs failed

@staticmethod
def from_graphql_input(graphql_str):
Expand Down
15 changes: 14 additions & 1 deletion python_modules/dagster/dagster/_core/execution/job_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,20 @@ def execute_job_backfill_iteration(
f"Backfill completed for {backfill.backfill_id} for"
f" {len(partition_names)} partitions"
)
instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED))
if (
len(
instance.get_run_ids(
filters=RunsFilter(
tags=DagsterRun.tags_for_backfill_id(backfill.backfill_id),
statuses=[DagsterRunStatus.FAILURE, DagsterRunStatus.CANCELED],
)
)
)
> 0
):
instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED_FAILED))
else:
instance.update_backfill(backfill.with_status(BulkActionStatus.COMPLETED_SUCCESS))
yield None


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ def test_job_backfill_status(
assert instance.get_runs_count() == 3
backfill = instance.get_backfill("simple")
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS


@pytest.mark.skipif(IS_WINDOWS, reason="flaky in windows")
Expand Down Expand Up @@ -1250,7 +1250,7 @@ def test_pure_asset_backfill(
list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill("backfill_with_asset_selection")
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS


def test_backfill_from_failure_for_subselection(
Expand Down Expand Up @@ -2044,7 +2044,7 @@ def test_asset_job_backfill_single_run_multiple_iterations(
assert instance.get_runs_count() == 1
backfill = instance.get_backfill("simple")
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS


def test_asset_job_backfill_multi_run(
Expand Down Expand Up @@ -2656,7 +2656,7 @@ def override_backfill_storage_setting(self):
list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill("backfill_with_asset_selection")
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS

# set num_lines high so we know we get all of the remaining logs
os.environ["DAGSTER_CAPTURED_LOG_CHUNK_SIZE"] = "100"
Expand Down Expand Up @@ -2726,7 +2726,7 @@ def test_asset_backfill_from_asset_graph_subset(
list(execute_backfill_iteration(workspace_context, get_default_daemon_logger("BackfillDaemon")))
backfill = instance.get_backfill("backfill_from_asset_graph_subset")
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS


def test_asset_backfill_from_asset_graph_subset_with_static_and_time_partitions(
Expand Down Expand Up @@ -2792,4 +2792,4 @@ def test_asset_backfill_from_asset_graph_subset_with_static_and_time_partitions(
"backfill_from_asset_graph_subset_with_static_and_time_partitions"
)
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def test_simple(instance: DagsterInstance, external_repo: ExternalRepository):
launch_process.join(timeout=60)
backfill = instance.get_backfill("simple")
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS


@pytest.mark.skipif(
Expand Down Expand Up @@ -115,7 +115,7 @@ def test_before_submit(

backfill = instance.get_backfill("simple")
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS
assert instance.get_runs_count() == 3


Expand Down Expand Up @@ -162,5 +162,5 @@ def test_crash_after_submit(

backfill = instance.get_backfill("simple")
assert backfill
assert backfill.status == BulkActionStatus.COMPLETED
assert backfill.status == BulkActionStatus.COMPLETED_SUCCESS
assert instance.get_runs_count() == 3
Original file line number Diff line number Diff line change
Expand Up @@ -1361,7 +1361,7 @@ def test_backfill(self, storage: RunStorage):
backfill = storage.get_backfill(one.backfill_id)
assert backfill == one

storage.update_backfill(one.with_status(status=BulkActionStatus.COMPLETED))
storage.update_backfill(one.with_status(status=BulkActionStatus.COMPLETED_SUCCESS))
assert len(storage.get_backfills()) == 1
assert len(storage.get_backfills(status=BulkActionStatus.REQUESTED)) == 0

Expand Down Expand Up @@ -1391,7 +1391,7 @@ def test_backfill_status_filtering(self, storage: RunStorage):
assert (
len(
storage.get_backfills(
filters=BulkActionsFilter(statuses=[BulkActionStatus.COMPLETED])
filters=BulkActionsFilter(statuses=[BulkActionStatus.COMPLETED_SUCCESS])
)
)
== 0
Expand All @@ -1400,7 +1400,7 @@ def test_backfill_status_filtering(self, storage: RunStorage):
len(
storage.get_backfills(
filters=BulkActionsFilter(
statuses=[BulkActionStatus.COMPLETED, BulkActionStatus.REQUESTED]
statuses=[BulkActionStatus.COMPLETED_SUCCESS, BulkActionStatus.REQUESTED]
)
)
)
Expand All @@ -1411,7 +1411,7 @@ def test_backfill_status_filtering(self, storage: RunStorage):
)
assert backfills[0] == one

storage.update_backfill(one.with_status(status=BulkActionStatus.COMPLETED))
storage.update_backfill(one.with_status(status=BulkActionStatus.COMPLETED_SUCCESS))
assert (
len(
storage.get_backfills(
Expand All @@ -1423,7 +1423,7 @@ def test_backfill_status_filtering(self, storage: RunStorage):
assert (
len(
storage.get_backfills(
filters=BulkActionsFilter(statuses=[BulkActionStatus.COMPLETED])
filters=BulkActionsFilter(statuses=[BulkActionStatus.COMPLETED_SUCCESS])
)
)
== 1
Expand All @@ -1443,7 +1443,7 @@ def test_backfill_status_filtering(self, storage: RunStorage):
len(
storage.get_backfills(
filters=BulkActionsFilter(
statuses=[BulkActionStatus.COMPLETED, BulkActionStatus.REQUESTED]
statuses=[BulkActionStatus.COMPLETED_SUCCESS, BulkActionStatus.REQUESTED]
)
)
)
Expand Down
Loading