Skip to content

Commit

Permalink
Revert "Fix bootstrapping behavior of MaterializeOnCron (#20020)" (#20285)
Browse files Browse the repository at this point in the history

## Summary & Motivation

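#20020 broke master: collecting `test_asset_daemon.py` fails with a `TypeError` because the scenario added in `cron_scenarios.py` passes an `initial_state` keyword argument that `AssetDaemonScenario` does not accept. This reverts the change.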


## How I Tested These Changes

On master (test collection fails):

```
(dagster-3.11.5-2024-02-25) ➜  dagster git:(master) ✗ pytest python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_asset_daemon.py
WARNING:root:Unable to detect CI environment.  No test analytics will be sent.
====================================================================== test session starts =======================================================================
platform darwin -- Python 3.11.5, pytest-7.4.4, pluggy-1.4.0
rootdir: /Users/schrockn/code/dagster-io/dagster
configfile: pyproject.toml
plugins: syrupy-4.6.1, cases-3.8.2, anyio-4.3.0, time-machine-2.13.0, dependency-0.5.1, typeguard-4.1.5, mock-3.3.1, requests-mock-1.11.0, cov-2.10.1, xdist-3.3.1, buildkite-test-collector-0.1.7, hypothesis-6.98.12, rerunfailures-10.0
collected 0 items / 1 error

============================================================================= ERRORS =============================================================================
______________________ ERROR collecting python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_asset_daemon.py _______________________
python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_asset_daemon.py:62: in <module>
    from .updated_scenarios.cron_scenarios import (
python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/updated_scenarios/cron_scenarios.py:74: in <module>
    AssetDaemonScenario(
E   TypeError: AssetDaemonScenario.__new__() got an unexpected keyword argument 'initial_state'
==================================================================== short test summary info =====================================================================
ERROR python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_asset_daemon.py - TypeError: AssetDaemonScenario.__new__() got an unexpected keyword argument 'initial_state'
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Interrupted: 1 error during collection !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
======================================================================== 1 error in 0.37s =======================================================================
```

On the revert branch (all 69 tests are collected and run):

```
(dagster-3.11.5-2024-02-25) ➜  dagster git:(revert-20020) ✗ pytest python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_asset_daemon.py
WARNING:root:Unable to detect CI environment.  No test analytics will be sent.
====================================================================== test session starts =======================================================================
platform darwin -- Python 3.11.5, pytest-7.4.4, pluggy-1.4.0
rootdir: /Users/schrockn/code/dagster-io/dagster
configfile: pyproject.toml
plugins: syrupy-4.6.1, cases-3.8.2, anyio-4.3.0, time-machine-2.13.0, dependency-0.5.1, typeguard-4.1.5, mock-3.3.1, requests-mock-1.11.0, cov-2.10.1, xdist-3.3.1, buildkite-test-collector-0.1.7, hypothesis-6.98.12, rerunfailures-10.0
collected 69 items

python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_asset_daemon.py ................................
```
  • Loading branch information
schrockn authored Mar 6, 2024
1 parent 7514ea3 commit 43592d6
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,7 @@ def missed_cron_ticks(
self, context: AssetConditionEvaluationContext
) -> Sequence[datetime.datetime]:
"""Returns the cron ticks which have been missed since the previous cursor was generated."""
# if it's the first time evaluating this rule, then just count the latest tick as missed
if not context.previous_evaluation or not context.previous_evaluation_timestamp:
if not context.previous_evaluation_timestamp:
previous_dt = next(
reverse_cron_string_iterator(
end_timestamp=context.evaluation_time.timestamp(),
Expand All @@ -315,9 +314,11 @@ def missed_cron_ticks(
missed_ticks.append(dt)
return missed_ticks

def get_new_candidate_asset_partitions(
self, context: AssetConditionEvaluationContext, missed_ticks: Sequence[datetime.datetime]
def get_new_asset_partitions_to_request(
self, context: AssetConditionEvaluationContext
) -> AbstractSet[AssetKeyPartitionKey]:
missed_ticks = self.missed_cron_ticks(context)

if not missed_ticks:
return set()

Expand Down Expand Up @@ -381,20 +382,9 @@ def evaluate_for_asset(
) -> "AssetConditionResult":
from .asset_condition.asset_condition import AssetConditionResult

missed_ticks = self.missed_cron_ticks(context)
new_asset_partitions = self.get_new_candidate_asset_partitions(context, missed_ticks)

# if it's the first time evaluating this rule, must query for the actual subset that has
# been materialized since the previous cron tick, as materializations may have happened
# before the previous evaluation, which
# `context.materialized_requested_or_discarded_since_previous_tick_subset` would not capture
if context.previous_evaluation is None:
new_asset_partitions -= context.instance_queryer.get_asset_subset_updated_after_time(
asset_key=context.asset_key, after_time=missed_ticks[-1]
).asset_partitions

new_asset_partitions_to_request = self.get_new_asset_partitions_to_request(context)
asset_subset_to_request = AssetSubset.from_asset_partitions_set(
context.asset_key, context.partitions_def, new_asset_partitions
context.asset_key, context.partitions_def, new_asset_partitions_to_request
) | (
context.previous_true_subset.as_valid(context.partitions_def)
- context.materialized_requested_or_discarded_since_previous_tick_subset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,38 +71,6 @@ def get_cron_policy(
.evaluate_tick()
.assert_requested_runs(),
),
AssetDaemonScenario(
id="basic_hourly_cron_unpartitioned_rule_added_later",
initial_state=one_asset.with_asset_properties(
# this policy will never materialize the asset
auto_materialize_policy=AutoMaterializePolicy(
rules={AutoMaterializeRule.skip_on_parent_missing()}
)
),
execution_fn=lambda state: state.evaluate_tick()
.assert_requested_runs()
# rule added after the first tick, should capture that this asset was not materialized
# since the previous tick
.with_asset_properties(auto_materialize_policy=get_cron_policy(basic_hourly_cron_schedule))
.evaluate_tick()
.assert_requested_runs(run_request(["A"]))
.with_not_started_runs()
# back to the original policy which never materializes
.with_current_time_advanced(seconds=30)
.with_asset_properties(
auto_materialize_policy=AutoMaterializePolicy(
rules={AutoMaterializeRule.skip_on_parent_missing()}
)
)
.evaluate_tick()
.assert_requested_runs()
# now we add the policy back in, but it's already been materialized since the previous tick
# so it shouldn't execute again
.with_current_time_advanced(seconds=30)
.with_asset_properties(auto_materialize_policy=get_cron_policy(basic_hourly_cron_schedule))
.evaluate_tick()
.assert_requested_runs(),
),
AssetDaemonScenario(
id="basic_hourly_cron_unpartitioned_multi_asset",
initial_spec=three_assets_not_subsettable.with_asset_properties(
Expand Down

0 comments on commit 43592d6

Please sign in to comment.