Skip to content

Commit

Permalink
Use dataclass instead of NamedTuple for serializing AssetCondition (#…
Browse files Browse the repository at this point in the history
…21147)

## Summary & Motivation

We're about to start serializing these things, so I'm making a diving save
to let us use the nicer dataclass API instead of NamedTuples.

In the process of doing this, I realized that we had a weird setup with
a `children` property on the base AssetCondition class that didn't work
when using dataclasses. Previously, using NamedTuples meant that the
NamedTuple property management stuff would handle the case where you
added an explicit `children` field on the child class, but dataclasses
don't work in the same way.

Basically we want a few things:
- To have a property on all AssetConditions that lists all
sub-conditions (if any)
- To not have to explicitly store a set of sub-conditions in serialized
form if there aren't any (mostly a dev ergonomics thing, it's a pain to
add a guaranteed-to-be-empty "children" property to all child classes)
- To make it easy to set invariants that certain AssetConditions will
only have a single child, etc.

This new system makes it so that AssetConditions that *do* have
sub-conditions just store them in a field with a different name
(currently we're just doing boolean expressions, so I've used "operand"
/ "operands"), and then implement an explicit "children" property on top
of that field. AssetConditions without sub-conditions just use the
default "children" implementation, which returns an empty list.

Overall, this setup works a lot better and simplifies the class
definitions for all these subclasses.

## How I Tested These Changes
  • Loading branch information
OwenKephart authored and yuhan committed Apr 11, 2024
1 parent f60efc7 commit 0c35462
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import functools
import hashlib
from abc import ABC, abstractmethod, abstractproperty
from dataclasses import dataclass
from typing import (
TYPE_CHECKING,
AbstractSet,
Expand All @@ -19,7 +20,6 @@

import pendulum

import dagster._check as check
from dagster._annotations import experimental
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.metadata import MetadataMapping, MetadataValue
Expand Down Expand Up @@ -307,10 +307,8 @@ def num_requested(self) -> int:
return self.evaluation.true_subset.size


# Adding the NamedTuple inheritance to avoid bugs with the experimental decorator and subclasses.
@experimental
@whitelist_for_serdes
class AssetCondition(NamedTuple("_AssetCondition", []), ABC):
class AssetCondition(ABC):
"""An AssetCondition represents some state of the world that can influence if an asset
partition should be materialized or not. AssetConditions can be combined to create
new conditions using the `&` (and), `|` (or), and `~` (not) operators.
Expand Down Expand Up @@ -345,17 +343,17 @@ def description(self) -> str:
def __and__(self, other: "AssetCondition") -> "AssetCondition":
# group AndAssetConditions together
if isinstance(self, AndAssetCondition):
return AndAssetCondition(children=[*self.children, other])
return AndAssetCondition(children=[self, other])
return AndAssetCondition(operands=[*self.operands, other])
return AndAssetCondition(operands=[self, other])

def __or__(self, other: "AssetCondition") -> "AssetCondition":
# group OrAssetConditions together
if isinstance(self, OrAssetCondition):
return OrAssetCondition(children=[*self.children, other])
return OrAssetCondition(children=[self, other])
return OrAssetCondition(operands=[*self.operands, other])
return OrAssetCondition(operands=[self, other])

def __invert__(self) -> "AssetCondition":
return NotAssetCondition(children=[self])
return NotAssetCondition(operand=self)

@property
def is_legacy(self) -> bool:
Expand Down Expand Up @@ -447,12 +445,12 @@ def parents_updated_since_cron(cron_schedule: str, timezone: str = "UTC") -> "As

@experimental
@whitelist_for_serdes
class RuleCondition( # type: ignore # related to AssetCondition being experimental
NamedTuple("_RuleCondition", [("rule", "AutoMaterializeRule")]),
AssetCondition,
):
@dataclass(frozen=True)
class RuleCondition(AssetCondition):
"""This class represents the condition that a particular AutoMaterializeRule is satisfied."""

rule: "AutoMaterializeRule"

@property
def unique_id(self) -> str:
parts = [self.rule.__class__.__name__, self.description]
Expand All @@ -476,12 +474,16 @@ def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetCondition

@experimental
@whitelist_for_serdes
class AndAssetCondition( # type: ignore # related to AssetCondition being experimental
NamedTuple("_AndAssetCondition", [("children", Sequence[AssetCondition])]),
AssetCondition,
):
@dataclass(frozen=True)
class AndAssetCondition(AssetCondition):
"""This class represents the condition that all of its children evaluate to true."""

operands: Sequence[AssetCondition]

@property
def children(self) -> Sequence[AssetCondition]:
return self.operands

@property
def description(self) -> str:
return "All of"
Expand All @@ -499,12 +501,16 @@ def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetCondition

@experimental
@whitelist_for_serdes
class OrAssetCondition( # type: ignore # related to AssetCondition being experimental
NamedTuple("_OrAssetCondition", [("children", Sequence[AssetCondition])]),
AssetCondition,
):
@dataclass(frozen=True)
class OrAssetCondition(AssetCondition):
"""This class represents the condition that any of its children evaluate to true."""

operands: Sequence[AssetCondition]

@property
def children(self) -> Sequence[AssetCondition]:
return self.operands

@property
def description(self) -> str:
return "Any of"
Expand All @@ -524,29 +530,25 @@ def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetCondition

@experimental
@whitelist_for_serdes
class NotAssetCondition( # type: ignore # related to AssetCondition being experimental
NamedTuple("_NotAssetCondition", [("children", Sequence[AssetCondition])]),
AssetCondition,
):
@dataclass(frozen=True)
class NotAssetCondition(AssetCondition):
"""This class represents the condition that none of its children evaluate to true."""

def __new__(cls, children: Sequence[AssetCondition]):
check.invariant(len(children) == 1)
return super().__new__(cls, children)
operand: AssetCondition

@property
def description(self) -> str:
return "Not"

@property
def child(self) -> AssetCondition:
return self.children[0]
def children(self) -> Sequence[AssetCondition]:
return [self.operand]

def evaluate(self, context: "AssetConditionEvaluationContext") -> AssetConditionResult:
child_context = context.for_child(
condition=self.child, candidate_subset=context.candidate_subset
condition=self.operand, candidate_subset=context.candidate_subset
)
child_result = self.child.evaluate(child_context)
child_result = self.operand.evaluate(child_context)
true_subset = context.candidate_subset - child_result.true_subset

return AssetConditionResult.create_from_children(context, true_subset, [child_result])
Original file line number Diff line number Diff line change
Expand Up @@ -298,26 +298,26 @@ def to_asset_condition(self) -> "AssetCondition":
return self.asset_condition

materialize_condition = OrAssetCondition(
children=[
operands=[
rule.to_asset_condition()
for rule in sorted(self.materialize_rules, key=lambda rule: rule.description)
]
)
skip_condition = OrAssetCondition(
children=[
operands=[
rule.to_asset_condition()
for rule in sorted(self.skip_rules, key=lambda rule: rule.description)
]
)
children = [
materialize_condition,
NotAssetCondition([skip_condition]),
NotAssetCondition(skip_condition),
]
if self.max_materializations_per_minute:
discard_condition = DiscardOnMaxMaterializationsExceededRule(
self.max_materializations_per_minute
).to_asset_condition()
children.append(NotAssetCondition([discard_condition]))
children.append(NotAssetCondition(discard_condition))

# results in an expression of the form (m1 | m2 | ... | mn) & ~(s1 | s2 | ... | sn) & ~d
return AndAssetCondition(children)
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def evaluate(
# ensure that the top level condition never returns any asset partitions, as otherwise the
# next evaluation will assume that those asset partitions were requested by the machinery
asset_condition = AndAssetCondition(
children=[check.not_none(self.asset_condition), FalseAssetCondition()]
operands=[check.not_none(self.asset_condition), FalseAssetCondition()]
)

with pendulum_freeze_time(self.current_time):
Expand Down

0 comments on commit 0c35462

Please sign in to comment.