Skip to content

Commit

Permalink
convert RunsFilter to @record (#24457)
Browse files Browse the repository at this point in the history
## Summary & Motivation

## How I Tested These Changes

## Changelog

NOCHANGELOG
  • Loading branch information
jamiedemaria authored Sep 13, 2024
1 parent 11ac087 commit c3cdb63
Showing 1 changed file with 23 additions and 26 deletions.
49 changes: 23 additions & 26 deletions python_modules/dagster/dagster/_core/storage/dagster_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
TICK_ID_TAG,
)
from dagster._core.utils import make_new_run_id
from dagster._record import IHaveNew, LegacyNamedTupleMixin, record_custom
from dagster._serdes.serdes import NamedTupleSerializer, whitelist_for_serdes

if TYPE_CHECKING:
Expand Down Expand Up @@ -508,22 +509,8 @@ def tags_for_tick_id(tick_id: str, has_evaluations: bool = False) -> Mapping[str
return {TICK_ID_TAG: tick_id, **automation_tags}


class RunsFilter(
NamedTuple(
"_RunsFilter",
[
("run_ids", Sequence[str]),
("job_name", Optional[str]),
("statuses", Sequence[DagsterRunStatus]),
("tags", Mapping[str, Union[str, Sequence[str]]]),
("snapshot_id", Optional[str]),
("updated_after", Optional[datetime]),
("updated_before", Optional[datetime]),
("created_after", Optional[datetime]),
("created_before", Optional[datetime]),
],
)
):
@record_custom
class RunsFilter(IHaveNew, LegacyNamedTupleMixin):
"""Defines a filter across job runs, for use when querying storage directly.
Each field of the RunsFilter represents a logical AND with each other. For
Expand All @@ -545,6 +532,16 @@ class RunsFilter(
"""

run_ids: Sequence[str]
job_name: Optional[str]
statuses: Sequence[DagsterRunStatus]
tags: Mapping[str, Union[str, Sequence[str]]]
snapshot_id: Optional[str]
updated_after: Optional[datetime]
updated_before: Optional[datetime]
created_after: Optional[datetime]
created_before: Optional[datetime]

def __new__(
cls,
run_ids: Optional[Sequence[str]] = None,
Expand All @@ -559,17 +556,17 @@ def __new__(
):
check.invariant(run_ids != [], "When filtering on run ids, a non-empty list must be used.")

return super(RunsFilter, cls).__new__(
return super().__new__(
cls,
run_ids=check.opt_sequence_param(run_ids, "run_ids", of_type=str),
job_name=check.opt_str_param(job_name, "job_name"),
statuses=check.opt_sequence_param(statuses, "statuses", of_type=DagsterRunStatus),
tags=check.opt_mapping_param(tags, "tags", key_type=str),
snapshot_id=check.opt_str_param(snapshot_id, "snapshot_id"),
updated_after=check.opt_inst_param(updated_after, "updated_after", datetime),
updated_before=check.opt_inst_param(updated_before, "updated_before", datetime),
created_after=check.opt_inst_param(created_after, "created_after", datetime),
created_before=check.opt_inst_param(created_before, "created_before", datetime),
run_ids=run_ids or [],
job_name=job_name,
statuses=statuses or [],
tags=tags or {},
snapshot_id=snapshot_id,
updated_after=updated_after,
updated_before=updated_before,
created_after=created_after,
created_before=created_before,
)

@staticmethod
Expand Down

0 comments on commit c3cdb63

Please sign in to comment.