Skip to content

Commit

Permalink
Revert "Revert "Fix bootstrapping behavior of MaterializeOnCron (#20020
Browse files Browse the repository at this point in the history
…)" (#2…"

This reverts commit 43592d6.
  • Loading branch information
OwenKephart authored Mar 6, 2024
1 parent 1f44f9a commit 5607969
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ def missed_cron_ticks(
self, context: AssetConditionEvaluationContext
) -> Sequence[datetime.datetime]:
"""Returns the cron ticks which have been missed since the previous cursor was generated."""
if not context.previous_evaluation_timestamp:
# if it's the first time evaluating this rule, then just count the latest tick as missed
if not context.previous_evaluation or not context.previous_evaluation_timestamp:
previous_dt = next(
reverse_cron_string_iterator(
end_timestamp=context.evaluation_time.timestamp(),
Expand All @@ -314,11 +315,9 @@ def missed_cron_ticks(
missed_ticks.append(dt)
return missed_ticks

def get_new_asset_partitions_to_request(
self, context: AssetConditionEvaluationContext
def get_new_candidate_asset_partitions(
self, context: AssetConditionEvaluationContext, missed_ticks: Sequence[datetime.datetime]
) -> AbstractSet[AssetKeyPartitionKey]:
missed_ticks = self.missed_cron_ticks(context)

if not missed_ticks:
return set()

Expand Down Expand Up @@ -382,9 +381,20 @@ def evaluate_for_asset(
) -> "AssetConditionResult":
from .asset_condition.asset_condition import AssetConditionResult

new_asset_partitions_to_request = self.get_new_asset_partitions_to_request(context)
missed_ticks = self.missed_cron_ticks(context)
new_asset_partitions = self.get_new_candidate_asset_partitions(context, missed_ticks)

# if it's the first time evaluating this rule, must query for the actual subset that has
# been materialized since the previous cron tick, as materializations may have happened
# before the previous evaluation, which
# `context.materialized_requested_or_discarded_since_previous_tick_subset` would not capture
if context.previous_evaluation is None:
new_asset_partitions -= context.instance_queryer.get_asset_subset_updated_after_time(
asset_key=context.asset_key, after_time=missed_ticks[-1]
).asset_partitions

asset_subset_to_request = AssetSubset.from_asset_partitions_set(
context.asset_key, context.partitions_def, new_asset_partitions_to_request
context.asset_key, context.partitions_def, new_asset_partitions
) | (
context.previous_true_subset.as_valid(context.partitions_def)
- context.materialized_requested_or_discarded_since_previous_tick_subset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,38 @@ def get_cron_policy(
.evaluate_tick()
.assert_requested_runs(),
),
AssetDaemonScenario(
id="basic_hourly_cron_unpartitioned_rule_added_later",
initial_state=one_asset.with_asset_properties(
# this policy will never materialize the asset
auto_materialize_policy=AutoMaterializePolicy(
rules={AutoMaterializeRule.skip_on_parent_missing()}
)
),
execution_fn=lambda state: state.evaluate_tick()
.assert_requested_runs()
# rule added after the first tick, should capture that this asset was not materialized
# since the previous tick
.with_asset_properties(auto_materialize_policy=get_cron_policy(basic_hourly_cron_schedule))
.evaluate_tick()
.assert_requested_runs(run_request(["A"]))
.with_not_started_runs()
# back to the original policy which never materializes
.with_current_time_advanced(seconds=30)
.with_asset_properties(
auto_materialize_policy=AutoMaterializePolicy(
rules={AutoMaterializeRule.skip_on_parent_missing()}
)
)
.evaluate_tick()
.assert_requested_runs()
# now we add the policy back in, but it's already been materialized since the previous tick
# so it shouldn't execute again
.with_current_time_advanced(seconds=30)
.with_asset_properties(auto_materialize_policy=get_cron_policy(basic_hourly_cron_schedule))
.evaluate_tick()
.assert_requested_runs(),
),
AssetDaemonScenario(
id="basic_hourly_cron_unpartitioned_multi_asset",
initial_spec=three_assets_not_subsettable.with_asset_properties(
Expand Down

0 comments on commit 5607969

Please sign in to comment.