diff --git a/python_modules/dagster/dagster/_core/definitions/asset_condition/asset_condition.py b/python_modules/dagster/dagster/_core/definitions/asset_condition/asset_condition.py index 1734901a9c2b3..2dd0f6f9bd94a 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_condition/asset_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_condition/asset_condition.py @@ -1,6 +1,7 @@ import functools import hashlib from abc import ABC, abstractmethod, abstractproperty +from dataclasses import dataclass from typing import ( TYPE_CHECKING, AbstractSet, @@ -19,7 +20,6 @@ import pendulum -import dagster._check as check from dagster._annotations import experimental from dagster._core.definitions.events import AssetKey from dagster._core.definitions.metadata import MetadataMapping, MetadataValue @@ -307,10 +307,8 @@ def num_requested(self) -> int: return self.evaluation.true_subset.size -# Adding the NamedTuple inheritance to avoid bugs with the experimental decorator and subclasses. @experimental -@whitelist_for_serdes -class AssetCondition(NamedTuple("_AssetCondition", []), ABC): +class AssetCondition(ABC): """An AssetCondition represents some state of the world that can influence if an asset partition should be materialized or not. AssetConditions can be combined to create new conditions using the `&` (and), `|` (or), and `~` (not) operators. @@ -345,17 +343,17 @@ def description(self) -> str: def __and__(self, other: "AssetCondition") -> "AssetCondition": # group AndAssetConditions together if isinstance(self, AndAssetCondition): - return AndAssetCondition(children=[*self.children, other]) - return AndAssetCondition(children=[self, other]) + return AndAssetCondition(operands=[*self.operands, other]) + return AndAssetCondition(operands=[self, other]) def __or__(self, other: "AssetCondition") -> "AssetCondition": # group OrAssetConditions together if isinstance(self, OrAssetCondition): - return OrAssetCondition(children=[*self.children, other]) - return OrAssetCondition(children=[self, other]) + return OrAssetCondition(operands=[*self.operands, other]) + return OrAssetCondition(operands=[self, other]) def __invert__(self) -> "AssetCondition": - return NotAssetCondition(children=[self]) + return NotAssetCondition(operand=self) @property def is_legacy(self) -> bool: @@ -447,12 +445,12 @@ def parents_updated_since_cron(cron_schedule: str, timezone: str = "UTC") -> "As @experimental @whitelist_for_serdes -class RuleCondition( # type: ignore # related to AssetCondition being experimental - NamedTuple("_RuleCondition", [("rule", "AutoMaterializeRule")]), - AssetCondition, -): +@dataclass(frozen=True) +class RuleCondition(AssetCondition): """This class represents the condition that a particular AutoMaterializeRule is satisfied.""" + rule: "AutoMaterializeRule" + @property def unique_id(self) -> str: parts = [self.rule.__class__.__name__, self.description] @@ -476,12 +474,16 @@ def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetCondition @experimental @whitelist_for_serdes -class AndAssetCondition( # type: ignore # related to AssetCondition being experimental - NamedTuple("_AndAssetCondition", [("children", Sequence[AssetCondition])]), - AssetCondition, -): +@dataclass(frozen=True) +class AndAssetCondition(AssetCondition): """This class represents the condition that all of its children evaluate to true.""" + operands: Sequence[AssetCondition] + + @property + def children(self) -> Sequence[AssetCondition]: + return self.operands + @property def description(self) -> str: return "All of" @@ -499,12 +501,16 @@ def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetCondition @experimental @whitelist_for_serdes -class OrAssetCondition( # type: ignore # related to AssetCondition being experimental - NamedTuple("_OrAssetCondition", [("children", Sequence[AssetCondition])]), - AssetCondition, -): +@dataclass(frozen=True) +class OrAssetCondition(AssetCondition): """This class represents the condition that any of its children evaluate to true.""" + operands: Sequence[AssetCondition] + + @property + def children(self) -> Sequence[AssetCondition]: + return self.operands + @property def description(self) -> str: return "Any of" @@ -524,29 +530,25 @@ def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetCondition @experimental @whitelist_for_serdes -class NotAssetCondition( # type: ignore # related to AssetCondition being experimental - NamedTuple("_NotAssetCondition", [("children", Sequence[AssetCondition])]), - AssetCondition, -): +@dataclass(frozen=True) +class NotAssetCondition(AssetCondition): """This class represents the condition that none of its children evaluate to true.""" - def __new__(cls, children: Sequence[AssetCondition]): - check.invariant(len(children) == 1) - return super().__new__(cls, children) + operand: AssetCondition @property def description(self) -> str: return "Not" @property - def child(self) -> AssetCondition: - return self.children[0] + def children(self) -> Sequence[AssetCondition]: + return [self.operand] def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetConditionResult: child_context = context.for_child( - condition=self.child, candidate_subset=context.candidate_subset + condition=self.operand, candidate_subset=context.candidate_subset ) - child_result = self.child.evaluate(child_context) + child_result = self.operand.evaluate(child_context) true_subset = context.candidate_subset - child_result.true_subset return AssetConditionResult.create_from_children(context, true_subset, [child_result]) diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py index 17a32d2f9a165..edad2c673eee4 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py @@ -298,26 +298,26 @@ def to_asset_condition(self) -> "AssetCondition": return self.asset_condition materialize_condition = OrAssetCondition( - children=[ + operands=[ rule.to_asset_condition() for rule in sorted(self.materialize_rules, key=lambda rule: rule.description) ] ) skip_condition = OrAssetCondition( - children=[ + operands=[ rule.to_asset_condition() for rule in sorted(self.skip_rules, key=lambda rule: rule.description) ] ) children = [ materialize_condition, - NotAssetCondition([skip_condition]), + NotAssetCondition(skip_condition), ] if self.max_materializations_per_minute: discard_condition = DiscardOnMaxMaterializationsExceededRule( self.max_materializations_per_minute ).to_asset_condition() - children.append(NotAssetCondition([discard_condition])) + children.append(NotAssetCondition(discard_condition)) # results in an expression of the form (m1 | m2 | ... | mn) & ~(s1 | s2 | ... | sn) & ~d return AndAssetCondition(children) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/asset_condition_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/asset_condition_scenario.py index 1650949c0effc..db0fdb4e9bf42 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/asset_condition_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/asset_condition_scenario.py @@ -45,7 +45,7 @@ def evaluate( # ensure that the top level condition never returns any asset partitions, as otherwise the # next evaluation will assume that those asset partitions were requested by the machinery asset_condition = AndAssetCondition( - children=[check.not_none(self.asset_condition), FalseAssetCondition()] + operands=[check.not_none(self.asset_condition), FalseAssetCondition()] ) with pendulum_freeze_time(self.current_time):