Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use dataclass instead of NamedTuple for serializing AssetCondition #21147

Merged
merged 2 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import functools
import hashlib
from abc import ABC, abstractmethod, abstractproperty
from dataclasses import dataclass
from typing import (
TYPE_CHECKING,
AbstractSet,
Expand All @@ -19,7 +20,6 @@

import pendulum

import dagster._check as check
from dagster._annotations import experimental
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.metadata import MetadataMapping, MetadataValue
Expand Down Expand Up @@ -307,10 +307,8 @@ def num_requested(self) -> int:
return self.evaluation.true_subset.size


# Adding the NamedTuple inheritance to avoid bugs with the experimental decorator and subclasses.
@experimental
@whitelist_for_serdes
class AssetCondition(NamedTuple("_AssetCondition", []), ABC):
class AssetCondition(ABC):
"""An AssetCondition represents some state of the world that can influence if an asset
partition should be materialized or not. AssetConditions can be combined to create
new conditions using the `&` (and), `|` (or), and `~` (not) operators.
Expand Down Expand Up @@ -345,17 +343,17 @@ def description(self) -> str:
def __and__(self, other: "AssetCondition") -> "AssetCondition":
# group AndAssetConditions together
if isinstance(self, AndAssetCondition):
return AndAssetCondition(children=[*self.children, other])
return AndAssetCondition(children=[self, other])
return AndAssetCondition(operands=[*self.operands, other])
return AndAssetCondition(operands=[self, other])

def __or__(self, other: "AssetCondition") -> "AssetCondition":
# group OrAssetConditions together
if isinstance(self, OrAssetCondition):
return OrAssetCondition(children=[*self.children, other])
return OrAssetCondition(children=[self, other])
return OrAssetCondition(operands=[*self.operands, other])
return OrAssetCondition(operands=[self, other])

def __invert__(self) -> "AssetCondition":
return NotAssetCondition(children=[self])
return NotAssetCondition(operand=self)

@property
def is_legacy(self) -> bool:
Expand Down Expand Up @@ -447,12 +445,12 @@ def parents_updated_since_cron(cron_schedule: str, timezone: str = "UTC") -> "As

@experimental
@whitelist_for_serdes
class RuleCondition( # type: ignore # related to AssetCondition being experimental
NamedTuple("_RuleCondition", [("rule", "AutoMaterializeRule")]),
AssetCondition,
):
@dataclass(frozen=True)
class RuleCondition(AssetCondition):
"""This class represents the condition that a particular AutoMaterializeRule is satisfied."""

rule: "AutoMaterializeRule"

@property
def unique_id(self) -> str:
parts = [self.rule.__class__.__name__, self.description]
Expand All @@ -476,12 +474,16 @@ def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetCondition

@experimental
@whitelist_for_serdes
class AndAssetCondition( # type: ignore # related to AssetCondition being experimental
NamedTuple("_AndAssetCondition", [("children", Sequence[AssetCondition])]),
AssetCondition,
):
@dataclass(frozen=True)
class AndAssetCondition(AssetCondition):
"""This class represents the condition that all of its children evaluate to true."""

operands: Sequence[AssetCondition]

@property
def children(self) -> Sequence[AssetCondition]:
return self.operands

@property
def description(self) -> str:
return "All of"
Expand All @@ -499,12 +501,16 @@ def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetCondition

@experimental
@whitelist_for_serdes
class OrAssetCondition( # type: ignore # related to AssetCondition being experimental
NamedTuple("_OrAssetCondition", [("children", Sequence[AssetCondition])]),
AssetCondition,
):
@dataclass(frozen=True)
class OrAssetCondition(AssetCondition):
"""This class represents the condition that any of its children evaluate to true."""

operands: Sequence[AssetCondition]

@property
def children(self) -> Sequence[AssetCondition]:
return self.operands

@property
def description(self) -> str:
return "Any of"
Expand All @@ -524,29 +530,25 @@ def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetCondition

@experimental
@whitelist_for_serdes
class NotAssetCondition( # type: ignore # related to AssetCondition being experimental
NamedTuple("_NotAssetCondition", [("children", Sequence[AssetCondition])]),
AssetCondition,
):
@dataclass(frozen=True)
class NotAssetCondition(AssetCondition):
"""This class represents the condition that none of its children evaluate to true."""

def __new__(cls, children: Sequence[AssetCondition]):
check.invariant(len(children) == 1)
return super().__new__(cls, children)
operand: AssetCondition

@property
def description(self) -> str:
return "Not"

@property
def child(self) -> AssetCondition:
return self.children[0]
def children(self) -> Sequence[AssetCondition]:
return [self.operand]

def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetConditionResult:
child_context = context.for_child(
condition=self.child, candidate_subset=context.candidate_subset
condition=self.operand, candidate_subset=context.candidate_subset
)
child_result = self.child.evaluate(child_context)
child_result = self.operand.evaluate(child_context)
true_subset = context.candidate_subset - child_result.true_subset

return AssetConditionResult.create_from_children(context, true_subset, [child_result])
Original file line number Diff line number Diff line change
Expand Up @@ -298,26 +298,26 @@ def to_asset_condition(self) -> "AssetCondition":
return self.asset_condition

materialize_condition = OrAssetCondition(
children=[
operands=[
rule.to_asset_condition()
for rule in sorted(self.materialize_rules, key=lambda rule: rule.description)
]
)
skip_condition = OrAssetCondition(
children=[
operands=[
rule.to_asset_condition()
for rule in sorted(self.skip_rules, key=lambda rule: rule.description)
]
)
children = [
materialize_condition,
NotAssetCondition([skip_condition]),
NotAssetCondition(skip_condition),
]
if self.max_materializations_per_minute:
discard_condition = DiscardOnMaxMaterializationsExceededRule(
self.max_materializations_per_minute
).to_asset_condition()
children.append(NotAssetCondition([discard_condition]))
children.append(NotAssetCondition(discard_condition))

# results in an expression of the form (m1 | m2 | ... | mn) & ~(s1 | s2 | ... | sn) & ~d
return AndAssetCondition(children)
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def evaluate(
# ensure that the top level condition never returns any asset partitions, as otherwise the
# next evaluation will assume that those asset partitions were requested by the machinery
asset_condition = AndAssetCondition(
children=[check.not_none(self.asset_condition), FalseAssetCondition()]
operands=[check.not_none(self.asset_condition), FalseAssetCondition()]
)

with pendulum_freeze_time(self.current_time):
Expand Down