From 0c35462406ce15ffaee4679112c19d4b8d9e5e13 Mon Sep 17 00:00:00 2001 From: OwenKephart Date: Thu, 11 Apr 2024 07:21:28 -0700 Subject: [PATCH] Use dataclass instead of NamedTuple for serializing AssetCondition (#21147) ## Summary & Motivation We're about to start serializing these things, so making a diving save to allow us to use the nicer dataclass API instead of NamedTuples. In the process of doing this, I realized that we had a weird setup with a `children` property on the base AssetCondition class that didn't work when using dataclasses. Previously, using NamedTuples meant that the NamedTuple property management stuff would handle the case where you added an explicit `children` field on the child class, but dataclasses don't work in the same way. Basically we want a few things: - To have a property on all AssetConditions that lists all sub-conditions (if any) - To not have to explicitly store a set of sub-conditions in serialized form if there aren't any (mostly a dev ergonomics thing, it's a pain to add a guaranteed-to-be-empty "children" property to all child classes) - To make it easy to set invariants that certain AssetConditions will only have a single child, etc. This new system makes it so that AssetConditions that *do* have sub-conditions just store them in a field with a different name (currently we're just doing boolean expressions, so I've used "operand" / "operands"), and then implement an explicit "children" property on top of that field. AssetConditions without sub-conditions just use the default "children" implementation, which returns an empty list. Overall, this setup works a lot nicer and simplifies the class definitions for all these subclasses. ## How I Tested These Changes --- .../asset_condition/asset_condition.py | 66 ++++++++++--------- .../definitions/auto_materialize_policy.py | 8 +-- .../asset_condition_scenario.py | 2 +- 3 files changed, 39 insertions(+), 37 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_condition/asset_condition.py b/python_modules/dagster/dagster/_core/definitions/asset_condition/asset_condition.py index 1734901a9c2b3..2dd0f6f9bd94a 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_condition/asset_condition.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_condition/asset_condition.py @@ -1,6 +1,7 @@ import functools import hashlib from abc import ABC, abstractmethod, abstractproperty +from dataclasses import dataclass from typing import ( TYPE_CHECKING, AbstractSet, @@ -19,7 +20,6 @@ import pendulum -import dagster._check as check from dagster._annotations import experimental from dagster._core.definitions.events import AssetKey from dagster._core.definitions.metadata import MetadataMapping, MetadataValue @@ -307,10 +307,8 @@ def num_requested(self) -> int: return self.evaluation.true_subset.size -# Adding the NamedTuple inheritance to avoid bugs with the experimental decorator and subclasses. @experimental -@whitelist_for_serdes -class AssetCondition(NamedTuple("_AssetCondition", []), ABC): +class AssetCondition(ABC): """An AssetCondition represents some state of the world that can influence if an asset partition should be materialized or not. AssetConditions can be combined to create new conditions using the `&` (and), `|` (or), and `~` (not) operators. @@ -345,17 +343,17 @@ def description(self) -> str: def __and__(self, other: "AssetCondition") -> "AssetCondition": # group AndAssetConditions together if isinstance(self, AndAssetCondition): - return AndAssetCondition(children=[*self.children, other]) - return AndAssetCondition(children=[self, other]) + return AndAssetCondition(operands=[*self.operands, other]) + return AndAssetCondition(operands=[self, other]) def __or__(self, other: "AssetCondition") -> "AssetCondition": # group OrAssetConditions together if isinstance(self, OrAssetCondition): - return OrAssetCondition(children=[*self.children, other]) - return OrAssetCondition(children=[self, other]) + return OrAssetCondition(operands=[*self.operands, other]) + return OrAssetCondition(operands=[self, other]) def __invert__(self) -> "AssetCondition": - return NotAssetCondition(children=[self]) + return NotAssetCondition(operand=self) @property def is_legacy(self) -> bool: @@ -447,12 +445,12 @@ def parents_updated_since_cron(cron_schedule: str, timezone: str = "UTC") -> "As @experimental @whitelist_for_serdes -class RuleCondition( # type: ignore # related to AssetCondition being experimental - NamedTuple("_RuleCondition", [("rule", "AutoMaterializeRule")]), - AssetCondition, -): +@dataclass(frozen=True) +class RuleCondition(AssetCondition): """This class represents the condition that a particular AutoMaterializeRule is satisfied.""" + rule: "AutoMaterializeRule" + @property def unique_id(self) -> str: parts = [self.rule.__class__.__name__, self.description] @@ -476,12 +474,16 @@ def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetCondition @experimental @whitelist_for_serdes -class AndAssetCondition( # type: ignore # related to AssetCondition being experimental - NamedTuple("_AndAssetCondition", [("children", Sequence[AssetCondition])]), - AssetCondition, -): +@dataclass(frozen=True) +class AndAssetCondition(AssetCondition): """This class represents the condition that all of its children evaluate to true.""" + operands: Sequence[AssetCondition] + + @property + def children(self) -> Sequence[AssetCondition]: + return self.operands + @property def description(self) -> str: return "All of" @@ -499,12 +501,16 @@ def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetCondition @experimental @whitelist_for_serdes -class OrAssetCondition( # type: ignore # related to AssetCondition being experimental - NamedTuple("_OrAssetCondition", [("children", Sequence[AssetCondition])]), - AssetCondition, -): +@dataclass(frozen=True) +class OrAssetCondition(AssetCondition): """This class represents the condition that any of its children evaluate to true.""" + operands: Sequence[AssetCondition] + + @property + def children(self) -> Sequence[AssetCondition]: + return self.operands + @property def description(self) -> str: return "Any of" @@ -524,29 +530,25 @@ def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetCondition @experimental @whitelist_for_serdes -class NotAssetCondition( # type: ignore # related to AssetCondition being experimental - NamedTuple("_NotAssetCondition", [("children", Sequence[AssetCondition])]), - AssetCondition, -): +@dataclass(frozen=True) +class NotAssetCondition(AssetCondition): """This class represents the condition that none of its children evaluate to true.""" - def __new__(cls, children: Sequence[AssetCondition]): - check.invariant(len(children) == 1) - return super().__new__(cls, children) + operand: AssetCondition @property def description(self) -> str: return "Not" @property - def child(self) -> AssetCondition: - return self.children[0] + def children(self) -> Sequence[AssetCondition]: + return [self.operand] def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetConditionResult: child_context = context.for_child( - condition=self.child, candidate_subset=context.candidate_subset + condition=self.operand, candidate_subset=context.candidate_subset ) - child_result = self.child.evaluate(child_context) + child_result = self.operand.evaluate(child_context) true_subset = context.candidate_subset - child_result.true_subset return AssetConditionResult.create_from_children(context, true_subset, [child_result]) diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py index 17a32d2f9a165..edad2c673eee4 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_policy.py @@ -298,26 +298,26 @@ def to_asset_condition(self) -> "AssetCondition": return self.asset_condition materialize_condition = OrAssetCondition( - children=[ + operands=[ rule.to_asset_condition() for rule in sorted(self.materialize_rules, key=lambda rule: rule.description) ] ) skip_condition = OrAssetCondition( - children=[ + operands=[ rule.to_asset_condition() for rule in sorted(self.skip_rules, key=lambda rule: rule.description) ] ) children = [ materialize_condition, - NotAssetCondition([skip_condition]), + NotAssetCondition(skip_condition), ] if self.max_materializations_per_minute: discard_condition = DiscardOnMaxMaterializationsExceededRule( self.max_materializations_per_minute ).to_asset_condition() - children.append(NotAssetCondition([discard_condition])) + children.append(NotAssetCondition(discard_condition)) # results in an expression of the form (m1 | m2 | ... | mn) & ~(s1 | s2 | ... | sn) & ~d return AndAssetCondition(children) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/asset_condition_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/asset_condition_scenario.py index 1650949c0effc..db0fdb4e9bf42 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/asset_condition_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/asset_condition_scenario.py @@ -45,7 +45,7 @@ def evaluate( # ensure that the top level condition never returns any asset partitions, as otherwise the # next evaluation will assume that those asset partitions were requested by the machinery asset_condition = AndAssetCondition( - children=[check.not_none(self.asset_condition), FalseAssetCondition()] + operands=[check.not_none(self.asset_condition), FalseAssetCondition()] ) with pendulum_freeze_time(self.current_time):