diff --git a/python_modules/dagster/dagster/_core/storage/dagster_run.py b/python_modules/dagster/dagster/_core/storage/dagster_run.py index f549a0cc57667..b3ac8eff869cd 100644 --- a/python_modules/dagster/dagster/_core/storage/dagster_run.py +++ b/python_modules/dagster/dagster/_core/storage/dagster_run.py @@ -35,6 +35,7 @@ TICK_ID_TAG, ) from dagster._core.utils import make_new_run_id +from dagster._record import IHaveNew, LegacyNamedTupleMixin, record_custom from dagster._serdes.serdes import NamedTupleSerializer, whitelist_for_serdes if TYPE_CHECKING: @@ -508,22 +509,8 @@ def tags_for_tick_id(tick_id: str, has_evaluations: bool = False) -> Mapping[str return {TICK_ID_TAG: tick_id, **automation_tags} -class RunsFilter( - NamedTuple( - "_RunsFilter", - [ - ("run_ids", Sequence[str]), - ("job_name", Optional[str]), - ("statuses", Sequence[DagsterRunStatus]), - ("tags", Mapping[str, Union[str, Sequence[str]]]), - ("snapshot_id", Optional[str]), - ("updated_after", Optional[datetime]), - ("updated_before", Optional[datetime]), - ("created_after", Optional[datetime]), - ("created_before", Optional[datetime]), - ], - ) -): +@record_custom +class RunsFilter(IHaveNew, LegacyNamedTupleMixin): """Defines a filter across job runs, for use when querying storage directly. Each field of the RunsFilter represents a logical AND with each other. For @@ -545,6 +532,16 @@ class RunsFilter( """ + run_ids: Sequence[str] + job_name: Optional[str] + statuses: Sequence[DagsterRunStatus] + tags: Mapping[str, Union[str, Sequence[str]]] + snapshot_id: Optional[str] + updated_after: Optional[datetime] + updated_before: Optional[datetime] + created_after: Optional[datetime] + created_before: Optional[datetime] + def __new__( cls, run_ids: Optional[Sequence[str]] = None, @@ -559,17 +556,17 @@ def __new__( ): check.invariant(run_ids != [], "When filtering on run ids, a non-empty list must be used.") - return super(RunsFilter, cls).__new__( + return super().__new__( cls, - run_ids=check.opt_sequence_param(run_ids, "run_ids", of_type=str), - job_name=check.opt_str_param(job_name, "job_name"), - statuses=check.opt_sequence_param(statuses, "statuses", of_type=DagsterRunStatus), - tags=check.opt_mapping_param(tags, "tags", key_type=str), - snapshot_id=check.opt_str_param(snapshot_id, "snapshot_id"), - updated_after=check.opt_inst_param(updated_after, "updated_after", datetime), - updated_before=check.opt_inst_param(updated_before, "updated_before", datetime), - created_after=check.opt_inst_param(created_after, "created_after", datetime), - created_before=check.opt_inst_param(created_before, "created_before", datetime), + run_ids=run_ids or [], + job_name=job_name, + statuses=statuses or [], + tags=tags or {}, + snapshot_id=snapshot_id, + updated_after=updated_after, + updated_before=updated_before, + created_after=created_after, + created_before=created_before, ) @staticmethod