Skip to content

Commit

Permalink
adapter: Implement expression cache
Browse files Browse the repository at this point in the history
This commit adds a module for an expression cache. It adds the
functionality to durably cache optimized expressions. It uses the
durable_cache module in its implementation.

The expression cache is run on its own task, so that callers can insert
new entries without blocking on the insert completing.

The adapter doesn't use the expression cache yet, that is saved for a
later commit.

Works towards resolving #MaterializeInc/database-issues/issues/8384
  • Loading branch information
jkosh44 committed Oct 24, 2024
1 parent 0672b36 commit 36cfe91
Show file tree
Hide file tree
Showing 15 changed files with 705 additions and 29 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 6 additions & 16 deletions doc/developer/design/20241008_expression_cache.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,7 @@ struct Expressions {
physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
notices: SmallVec<[Arc<OptimizerNotice>; 4]>,
optimizer_feature_overrides: OptimizerFeatures,
}

struct NewEntry {
/// `GlobalId` of the new expression.
id: GlobalId,
/// New `Expressions` to cache.
expressions: Expressions,
/// `GlobalId`s to invalidate as a result of the new entry.
invalidate_ids: BTreeSet<GlobalId>,
optimizer_feature: OptimizerFeatures,
}

struct ExpressionCache {
Expand All @@ -100,13 +91,12 @@ impl ExpressionCache {
/// Returns all cached expressions in the current deploy generation, after reconciliation.
fn open(&mut self, current_ids: &BTreeSet<GlobalId>, optimizer_features: &OptimizerFeatures, remove_prior_gens: bool) -> Vec<(GlobalId, Expressions)>;

/// Durably inserts `expressions` into current deploy generation. This may also invalidate
/// entries giving by `expressions`.
///
/// Returns a [`Future`] that completes once the changes have been made durable.
/// Durably removes all entries given by `invalidate_ids` and inserts `new_entries` into
/// current deploy generation.
///
/// Panics if any `GlobalId` already exists in the cache.
fn insert_expressions(&mut self, expressions: Vec<NewEntry>) -> impl Future<Output=()>;
/// If there is a duplicate ID in both `invalidate_ids` and `new_entries`, then the final value
/// will be taken from `new_entries`.
fn insert_expressions(&mut self, new_entries: Vec<(GlobalId, Expressions)>, invalidate_ids: BTreeSet<GlobalId>);

/// Durably remove and return all entries in current deploy generation that depend on an ID in
/// `dropped_ids` .
Expand Down
6 changes: 6 additions & 0 deletions src/adapter/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ rust_library(
"//src/compute-types:mz_compute_types",
"//src/controller:mz_controller",
"//src/controller-types:mz_controller_types",
"//src/durable-cache:mz_durable_cache",
"//src/dyncfg:mz_dyncfg",
"//src/expr:mz_expr",
"//src/kafka-util:mz_kafka_util",
Expand Down Expand Up @@ -104,6 +105,7 @@ rust_test(
"//src/compute-types:mz_compute_types",
"//src/controller:mz_controller",
"//src/controller-types:mz_controller_types",
"//src/durable-cache:mz_durable_cache",
"//src/dyncfg:mz_dyncfg",
"//src/expr:mz_expr",
"//src/kafka-util:mz_kafka_util",
Expand Down Expand Up @@ -153,6 +155,7 @@ rust_doc_test(
"//src/compute-types:mz_compute_types",
"//src/controller:mz_controller",
"//src/controller-types:mz_controller_types",
"//src/durable-cache:mz_durable_cache",
"//src/dyncfg:mz_dyncfg",
"//src/expr:mz_expr",
"//src/kafka-util:mz_kafka_util",
Expand Down Expand Up @@ -222,6 +225,7 @@ rust_test(
"//src/compute-types:mz_compute_types",
"//src/controller:mz_controller",
"//src/controller-types:mz_controller_types",
"//src/durable-cache:mz_durable_cache",
"//src/dyncfg:mz_dyncfg",
"//src/expr:mz_expr",
"//src/kafka-util:mz_kafka_util",
Expand Down Expand Up @@ -291,6 +295,7 @@ rust_test(
"//src/compute-types:mz_compute_types",
"//src/controller:mz_controller",
"//src/controller-types:mz_controller_types",
"//src/durable-cache:mz_durable_cache",
"//src/dyncfg:mz_dyncfg",
"//src/expr:mz_expr",
"//src/kafka-util:mz_kafka_util",
Expand Down Expand Up @@ -360,6 +365,7 @@ rust_test(
"//src/compute-types:mz_compute_types",
"//src/controller:mz_controller",
"//src/controller-types:mz_controller_types",
"//src/durable-cache:mz_durable_cache",
"//src/dyncfg:mz_dyncfg",
"//src/expr:mz_expr",
"//src/kafka-util:mz_kafka_util",
Expand Down
1 change: 1 addition & 0 deletions src/adapter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ mz-compute-client = { path = "../compute-client" }
mz-compute-types = { path = "../compute-types" }
mz-controller = { path = "../controller" }
mz-controller-types = { path = "../controller-types" }
mz-durable-cache = { path = "../durable-cache" }
mz-dyncfg = { path = "../dyncfg" }
mz-expr = { path = "../expr" }
mz-kafka-util = { path = "../kafka-util" }
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ mod migrate;

mod apply;
mod dataflow_expiration;
mod expr_cache;
mod open;
mod state;
mod transact;
Expand Down
Loading

0 comments on commit 36cfe91

Please sign in to comment.