Skip to content

Commit

Permalink
Implement dataflow expiration to limit temporal data retention (#29587)
Browse files Browse the repository at this point in the history
Introduces a new feature to limit data retention in temporal filters by
dropped retraction diffs beyond a configured expiration time.

Motivation and logic is explained in more details in the design
doc: doc/developer/design/20240919_dataflow_expiration.md.

Fixes MaterializeInc/database-issues#7757

### Checklist

- [ ] This PR has adequate test coverage / QA involvement has been duly
considered. ([trigger-ci for additional test/nightly
runs](https://trigger-ci.dev.materialize.com/))
- [x] This PR has an associated up-to-date [design
doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md),
is a design doc
([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)),
or is sufficiently small to not require a design.
  <!-- Reference the design in the description. -->
- [x] If this PR evolves [an existing `$T ⇔ Proto$T`
mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md)
(possibly in a backwards-incompatible way), then it is tagged with a
`T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests,
there is a companion cloud PR to account for those changes that is
tagged with the release-blocker label
([example](MaterializeInc/cloud#5021)).
<!-- Ask in #team-cloud on Slack if you need help preparing the cloud
PR. -->
- [ ] If this PR includes major [user-facing behavior
changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note),
I have pinged the relevant PM to schedule a changelog post.

---------

Signed-off-by: Moritz Hoffmann <[email protected]>
Co-authored-by: Siddhartha Sahu <[email protected]>
Co-authored-by: Dennis Felsing <[email protected]>
  • Loading branch information
3 people authored Oct 10, 2024
1 parent 207b9c6 commit 05acf95
Show file tree
Hide file tree
Showing 36 changed files with 1,176 additions and 37 deletions.
133 changes: 133 additions & 0 deletions doc/developer/design/20240919_dataflow_expiration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# Dataflow Expiration

- Associated issues/prs: [#26029](https://github.com/MaterializeInc/materialize/issues/26029) [#29587](https://github.com/MaterializeInc/materialize/pull/29587)

## The Problem

Temporal filters currently require Materialize to maintain all future retractions
of data that is currently visible. For long windows, the retractions could be at
timestamps beyond our next scheduled restart, which is typically our weekly
DB release.

For instance, in the example below, the temporal filter in the `last_30_days`
view causes two diffs to be generated for every row inserted into `events`, the
row itself and a retraction 30 days later. However, if the replica is
restarted in the next few days, the retraction diff is never processed, making
it redundant to keep the diff waiting to be processed.

```sql
-- Create a table of timestamped events.
CREATE TABLE events (
content TEXT,
event_ts TIMESTAMP
);
-- Create a view of events from the last 30 seconds.
CREATE VIEW last_30_days AS
SELECT event_ts, content
FROM events
WHERE mz_now() <= event_ts + INTERVAL '30 days';

INSERT INTO events VALUES ('hello', now());

COPY (SUBSCRIBE (SELECT event_ts, content FROM last_30_days)) TO STDOUT;
1727130590201 1 2023-09-23 22:29:50.201 hello -- now()
1729722590222 -1 2023-10-23 22:29:50.222 hello -- now() + 30 days
```

Dataflows with large temporal windows (e.g. a year) can generate a large number
of retractions that consume memory and CPU but are never used. Instead, all
such retractions can be dropped.

## Success Criteria

When temporal filters are in use, retraction diffs associated with timestamps
beyond a set expiration time can be dropped without affecting correctness,
resulting in lower memory and CPU utilization from halving the number of
processed diffs.

## Solution Proposal

A new LaunchDarkly feature flag is introduced that specifies an _expiration
offset_ (a `Duration`). The _replica expiration_ time is computed as the offset
added to the start time of the replica. Dataflows matching certain
criteria (detailed below) are then configured with a _dataflow expiration_
derived from the replica expiration. Diffs generated in these dataflows beyond
the dataflow expiration are dropped. To ensure correctness, panic checks are
added to these dataflows that ensure that the frontier does not exceed the
dataflow expiration before the replica is restarted.

An overview of the logic used for these features is as follows:
```
# Consider the `upper` for different dataflows
if mv_view_with_constant_values:
upper := []
else if mv_with_refresh_every:
upper := [next_refresh()]
else:
upper := [write_frontier()]
# The `upper` for a dataflow considering all its transitive inputs
inputs_upper := meet(for all inputs i: i_upper)
# Dataflow expiration logic
if compute_replica_expiration_offset is not set:
dataflow_replication := []
else for dataflows of type in [materialized view, index, subscribe]:
replica_expiration := replica_start + compute_replica_expiration_offset
if dataflow_timeline is not EpochMilliseconds:
# Dataflows that do not depend on any source or table are not in the
# EpochMilliseconds timeline
dataflow_expiration := []
else if refresh_interval set in any transitive dependency of dataflow:
dataflow_expiration := []
else if inputs_upper == []:
dataflow_expiration := []
else if inputs_upper > expiration:
dataflow_expiration := inputs_upper
else:
dataflow_expiration := replica_expiration
dataflow_until := dataflow_until.meet(dataflow_expiration)
```

Note that we only consider dataflows representing materialized views, indexes,
and subscribes. These are long-running dataflows that maintain state during
their lifetime. Other dataflows such as peeks are transient and do not need to
explicitly drop retraction diffs.

More concretely, we make the following changes:

* Introduce a new dyncfg `compute_replica_expiration_offset`.
* If the offset is configured with a non-zero value, compute
`replica_expiration = now() + offset`. This value specifies the maximum
time for which the replica is expected to be running. Consequently, diffs
associated with timestamps beyond this limit do not have to be stored and can
be dropped.
* When building a dataflow, compute `dataflow_expiration` as per the logic
described above. If non-empty, the `dataflow_expiration` is added to the
dataflow `until` that ensures that any diff beyond this limit is dropped in
`mfp.evaluate()`.
* To ensure correctness, we attach checks in `Context::export_index` and
`Context::export_sink` that panic if the dataflow frontier exceeds the
configured `dataflow_expiration`. This is to prevent the dataflow from
serving potentially incorrect results due to dropped data.
* On a replica restart, `replica_expiration` and `dataflow_expiration` is
recomputed as the offset to the new start time. Any data whose timestamps
are within the new limit are now not dropped.

## Open Questions

- What is the appropriate default expiration time?
- Given that we currently restart replicas every week as part of the DB release
and leaving some buffer for skipped week, 3 weeks (+1 day margin) seems like
a good limit to start with.

## Out of Scope

Dataflow expiration is disabled for the following cases:

- Dataflows whose timeline type is not `Timeline::EpochMillis`. We rely on the
frontier timestamp being comparable to wall clock time of the replica.
- Dataflows that transitively depend on a materialized view with a non-default
refresh schedule. Handling such materialized views would require additional
logic to track the refresh schedules and ensure that the dataflow expiration
1 change: 1 addition & 0 deletions misc/python/materialize/cli/ci_annotate_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@
| restart-materialized-1\ *|\ thread\ 'coordinator'\ panicked\ at\ 'external\ operation\ .*\ failed\ unrecoverably.*
# Expected in cluster test
| cluster-clusterd[12]-1\ .*\ halting\ process:\ new\ timely\ configuration\ does\ not\ match\ existing\ timely\ configuration
| cluster-clusterd1-1\ .*\ has\ exceeded\ expiration
# Emitted by tests employing explicit mz_panic()
| forced\ panic
# Emitted by broken_statements.slt in order to stop panic propagation, as 'forced panic' will unwantedly panic the `environmentd` thread.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

# Consider increasing the scenario's class #version() if changes are expected to impact results!
SHA256_BY_SCENARIO_FILE: dict[str, str] = {
"benchmark_main.py": "9419ffb7f17de4584fc35e2f531c3e6181ad2b0284bf6aeb2f15ebb966a5e007",
"benchmark_main.py": "0e328994c56b9bae2a9da0db10974b4fa30dd8c825e3290e00f5cecbb32ec338",
"concurrency.py": "2e9c149c136b83b3853abc923a1adbdaf55a998ab4557712f8424c8b16f2adb1",
"customer.py": "d1e72837a342c3ebf1f4a32ec583b1b78a78644cdba495030a6df45ebbffe703",
"optbench.py": "ae411afe1ba595021f2f9d2d21500ba0c1c6941561493eabcae113373f493bfa",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2182,3 +2182,48 @@ def benchmark(self) -> MeasurementSource:
"""
)
)


class ReplicaExpiration(Scenario):
def init(self) -> list[Action]:
return [
TdAction(
"""
> CREATE TABLE events (
content TEXT,
event_ts TIMESTAMP
);
> CREATE VIEW last_30_days AS
SELECT event_ts, content
FROM events
WHERE mz_now() <= event_ts + INTERVAL '30 days';
> CREATE DEFAULT INDEX ON last_30_days
"""
),
]

def benchmark(self) -> MeasurementSource:
return Td(
dedent(
f"""
> DELETE FROM events;
> SELECT COUNT(*) FROM last_30_days
0
> SELECT 1;
/* A */
1
> INSERT INTO events SELECT concat('somelongstringthatdoesntmattermuchatallbutrequiresmemorytostoreXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX', x::text), now() FROM generate_series(1, {self.n()}) AS x
> SELECT COUNT(*) FROM last_30_days
{self.n()}
> SELECT 1;
/* B */
1
"""
)
)
1 change: 1 addition & 0 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def get_default_system_parameters(
"cluster_always_use_disk": "true",
"compute_dataflow_max_inflight_bytes": "134217728", # 128 MiB
"compute_hydration_concurrency": "2",
"compute_replica_expiration_offset": "3d",
"disk_cluster_replicas_default": "true",
"enable_0dt_deployment": "true" if zero_downtime else "false",
"enable_0dt_deployment_panic_after_timeout": "true",
Expand Down
2 changes: 2 additions & 0 deletions misc/python/materialize/mzcompose/services/clusterd.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def __init__(
environment_extra: list[str] = [],
memory: str | None = None,
options: list[str] = [],
restart: str = "no",
) -> None:
environment = [
"CLUSTERD_LOG_FILTER",
Expand Down Expand Up @@ -58,6 +59,7 @@ def __init__(
"ports": [2100, 2101, 6878],
"environment": environment,
"volumes": DEFAULT_MZ_VOLUMES,
"restart": restart,
}
)

Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ pub(crate) mod consistency;
mod migrate;

mod apply;
mod dataflow_expiration;
mod open;
mod state;
mod transact;
Expand Down
28 changes: 28 additions & 0 deletions src/adapter/src/catalog/dataflow_expiration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.

//! Helper function for dataflow expiration checks.
use mz_repr::GlobalId;

use crate::catalog::Catalog;

impl Catalog {
/// Whether the catalog entry `id` or any of its transitive dependencies is a materialized view
/// with a refresh schedule. Used to disable dataflow expiration if found.
pub(crate) fn item_has_transitive_refresh_schedule(&self, id: GlobalId) -> bool {
let test_has_transitive_refresh_schedule = |dep: GlobalId| -> bool {
if let Some(mv) = self.get_entry(&dep).materialized_view() {
return mv.refresh_schedule.is_some();
}
false
};
test_has_transitive_refresh_schedule(id)
|| self
.state()
.transitive_uses(id)
.any(test_has_transitive_refresh_schedule)
}
}
24 changes: 22 additions & 2 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,10 @@ pub struct CreateMaterializedViewOptimize {
/// An optional context set iff the state machine is initiated from
/// sequencing an EXPLAIN for this statement.
explain_ctx: ExplainContext,
/// Whether the timeline is [`mz_storage_types::sources::Timeline::EpochMilliseconds`].
///
/// Used to determine if it is safe to enable dataflow expiration.
is_timeline_epoch_ms: bool,
}

#[derive(Debug)]
Expand Down Expand Up @@ -2572,8 +2576,15 @@ impl Coordinator {
.catalog()
.resolve_full_name(entry.name(), None)
.to_string();
let is_timeline_epoch_ms = self.get_timeline_context(*id).is_timeline_epoch_ms();
let (_optimized_plan, physical_plan, _metainfo) = self
.optimize_create_continual_task(&ct, *id, self.owned_catalog(), debug_name)
.optimize_create_continual_task(
&ct,
*id,
self.owned_catalog(),
debug_name,
is_timeline_epoch_ms,
)
.expect("builtin CT should optimize successfully");

// Determine an as of for the new continual task.
Expand Down Expand Up @@ -2616,6 +2627,7 @@ impl Coordinator {

for entry in ordered_catalog_entries {
let id = entry.id();
let is_timeline_epoch_ms = self.get_timeline_context(id).is_timeline_epoch_ms();
match entry.item() {
CatalogItem::Index(idx) => {
// Collect optimizer parameters.
Expand Down Expand Up @@ -2646,6 +2658,7 @@ impl Coordinator {
entry.name().clone(),
idx.on,
idx.keys.to_vec(),
is_timeline_epoch_ms,
);
let global_mir_plan = optimizer.optimize(index_plan)?;
let optimized_plan = global_mir_plan.df_desc().clone();
Expand Down Expand Up @@ -2700,6 +2713,7 @@ impl Coordinator {
debug_name,
optimizer_config.clone(),
self.optimizer_metrics(),
is_timeline_epoch_ms,
);

// MIR ⇒ MIR optimization (global)
Expand Down Expand Up @@ -2743,7 +2757,13 @@ impl Coordinator {
.resolve_full_name(entry.name(), None)
.to_string();
let (optimized_plan, physical_plan, metainfo) = self
.optimize_create_continual_task(ct, id, self.owned_catalog(), debug_name)?;
.optimize_create_continual_task(
ct,
id,
self.owned_catalog(),
debug_name,
is_timeline_epoch_ms,
)?;

let catalog = self.catalog_mut();
catalog.set_optimized_plan(id, optimized_plan);
Expand Down
6 changes: 5 additions & 1 deletion src/adapter/src/coord/introspection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,11 @@ impl Coordinator {
let read_holds = self.acquire_read_holds(&id_bundle);
let as_of = read_holds.least_valid_read();

let global_mir_plan = global_mir_plan.resolve(as_of);
// Introspection subscribes only read from system collections, which are always in
// the `EpochMilliseconds` timeline.
let is_timeline_epoch_ms = true;

let global_mir_plan = global_mir_plan.resolve(as_of, is_timeline_epoch_ms);

let span = Span::current();
Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ impl Coordinator {
},
};

let is_timeline_epoch_ms = self
.validate_timeline_context(resolved_ids.0.clone())?
.is_timeline_epoch_ms();

// Construct the CatalogItem for this CT and optimize it.
let mut item = crate::continual_task::ct_item_from_plan(plan, sink_id, resolved_ids)?;
let full_name = bootstrap_catalog.resolve_full_name(&name, Some(session.conn_id()));
Expand All @@ -90,6 +94,7 @@ impl Coordinator {
sink_id,
Arc::new(bootstrap_catalog),
full_name.to_string(),
is_timeline_epoch_ms,
)?;

// Timestamp selection
Expand Down Expand Up @@ -156,6 +161,7 @@ impl Coordinator {
output_id: GlobalId,
catalog: Arc<dyn OptimizerCatalog>,
debug_name: String,
is_timeline_epoch_ms: bool,
) -> Result<
(
DataflowDescription<OptimizedMirRelationExpr>,
Expand Down Expand Up @@ -185,6 +191,7 @@ impl Coordinator {
debug_name,
optimizer_config,
self.optimizer_metrics(),
is_timeline_epoch_ms,
);

// HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local and global)
Expand Down
Loading

0 comments on commit 05acf95

Please sign in to comment.