Skip to content

Commit

Permalink
Exclude AllPartitionMappings from stale status calculation (#21146)
Browse files Browse the repository at this point in the history
Summary:
This adds another early breaking point based on profiling that we have
observed causing unacceptably slow staleness calculations - an
unpartitioned asset that is downstream of an AllPartitionMapping. In
this case, just calculating the list of partition keys can be enough to
slow down the calculations enough here, and the thinking was that in the
majority of cases there will be more than 100 partitions (and even if
there were not, it would be odd and surprising for staleness
calculations to suddenly disappear once the size of the partition set
rises above 100 partitions).

I'm confused because there are comments here (and discussion in
https://github.com/dagster-io/dagster/pull/14265/files) that seem to
indicate we were already doing this, but I can't find any code that
actually does it. Did we take it out at some point?

Test Plan: Will add tests once confusion is resolved

## Summary & Motivation

## How I Tested These Changes
  • Loading branch information
gibsondan authored Apr 11, 2024
1 parent 69fe2ba commit 441d00d
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 10 deletions.
25 changes: 15 additions & 10 deletions python_modules/dagster/dagster/_core/definitions/data_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,8 +533,6 @@ def _get_stale_causes_materialized(self, key: "AssetKeyPartitionKey") -> Iterato
f"has a new dependency on {dep_key.asset_key.to_user_string()}",
dep_key,
)
# Currently we exclude assets downstream of AllPartitionMappings from stale
# status logic due to potentially huge numbers of dependencies.
elif self._is_dep_updated(provenance, dep_key):
report_data_version = (
dep_asset.code_version is not None
Expand Down Expand Up @@ -694,35 +692,42 @@ def _get_latest_data_version_record(
)

# If a partition has greater than or equal to SKIP_PARTITION_DATA_VERSION_DEPENDENCY_THRESHOLD
# of dependencies, it is not included in partition_deps. This is for performance reasons. This
# constraint can be removed when we have thoroughly tested performance for large upstream
# partition counts. At that time, the body of this method can just be replaced with a call to
# `asset_graph.get_parents_partitions`, which the logic here largely replicates.
# of dependencies, or is downstream of a time window partition with an AllPartitionMapping,
# it is not included in partition_deps. This is for performance reasons. Besides this, the
# logic here largely replicates `asset_graph.get_parents_partitions`.
#
# If an asset is self-dependent and has greater than or equal to
# Similarly, if an asset is self-dependent and has greater than or equal to
# SKIP_PARTITION_DATA_VERSION_SELF_DEPENDENCY_THRESHOLD partitions, we don't check the
# self-edge for updated data or propagate other stale causes through the edge. That is because
# the current logic will recurse to the first partition, potentially throwing a recursion error.
# This constraint can be removed when we have thoroughly tested performance for large partition
# counts on self-dependent assets.
@cached_method
def _get_partition_dependencies(
self, *, key: "AssetKeyPartitionKey"
) -> Sequence["AssetKeyPartitionKey"]:
from dagster import AllPartitionMapping
from dagster._core.definitions.events import (
AssetKeyPartitionKey,
)
from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsDefinition

asset_deps = self.asset_graph.get(key.asset_key).parent_keys

deps = []
for dep_asset_key in asset_deps:
if not self.asset_graph.get(dep_asset_key).is_partitioned:
dep_asset = self.asset_graph.get(dep_asset_key)
if not dep_asset.is_partitioned:
deps.append(AssetKeyPartitionKey(dep_asset_key, None))
elif key.asset_key == dep_asset_key and self._exceeds_self_partition_limit(
key.asset_key
):
continue
elif isinstance(
dep_asset.partitions_def, TimeWindowPartitionsDefinition
) and isinstance(
self.asset_graph.get_partition_mapping(key.asset_key, dep_asset_key),
AllPartitionMapping,
):
continue
else:
upstream_partition_keys = list(
self.asset_graph.get_parent_partition_keys_for_child(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey, Output
from dagster._core.definitions.observe import observe
from dagster._core.definitions.partition import StaticPartitionsDefinition
from dagster._core.definitions.partition_mapping import AllPartitionMapping
from dagster._core.definitions.time_window_partition_mapping import TimeWindowPartitionMapping
from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition
from dagster._core.events import DagsterEventType
Expand Down Expand Up @@ -565,6 +566,52 @@ def asset3(asset1): ...
assert status_resolver.get_status(asset3.key) == StaleStatus.STALE


def test_stale_status_downstream_of_all_partitions_mapping() -> None:
    """Check that an asset behind an AllPartitionMapping stays FRESH.

    ``asset1`` is daily-partitioned (two partitions) and ``asset2`` is an
    unpartitioned asset that reads ``asset1`` through an
    ``AllPartitionMapping``. After everything is materialized once, one
    upstream partition is re-materialized; ``asset2`` is still expected to
    report ``StaleStatus.FRESH``.
    """
    start_date = datetime(2020, 1, 1)
    end_date = start_date + timedelta(days=2)
    start_key = start_date.strftime("%Y-%m-%d")

    partitions_def = DailyPartitionsDefinition(start_date=start_date, end_date=end_date)

    @asset(partitions_def=partitions_def)
    def asset1():
        return 1

    @asset(
        ins={"asset1": AssetIn(partition_mapping=AllPartitionMapping())},
    )
    def asset2(asset1):
        return 2

    all_assets = [asset1, asset2]
    # Resolve the key list once; it is needed both to materialize and to check.
    partition_keys = partitions_def.get_partition_keys()

    # Downstream values are not stale even after upstream changed because of the partition mapping
    with instance_for_test() as instance:
        for k in partition_keys:
            materialize_asset(all_assets, asset1, instance, partition_key=k)

        materialize_asset(all_assets, asset2, instance)

        # Baseline: every upstream partition and the downstream asset are fresh.
        status_resolver = get_stale_status_resolver(instance, all_assets)
        for k in partition_keys:
            assert status_resolver.get_status(asset1.key, k) == StaleStatus.FRESH

        assert status_resolver.get_status(asset2.key, None) == StaleStatus.FRESH

        # Re-materialize a single upstream partition to change the upstream data.
        materialize_asset(
            all_assets,
            asset1,
            instance,
            partition_key=start_key,
        )

        # A new resolver is built so the check reflects the latest materialization.
        status_resolver = get_stale_status_resolver(instance, all_assets)

        # Still fresh b/c of the partition mapping
        assert status_resolver.get_status(asset2.key, None) == StaleStatus.FRESH


def test_stale_status_many_to_one_partitions() -> None:
partitions_def = StaticPartitionsDefinition(["alpha", "beta"])

Expand Down

0 comments on commit 441d00d

Please sign in to comment.