From 36cfe91f1d02cc5e2cd1b0ec02fe574bbe5ac800 Mon Sep 17 00:00:00 2001
From: Joseph Koshakow
Date: Mon, 21 Oct 2024 10:27:27 -0400
Subject: [PATCH] adapter: Implement expression cache

This commit adds a module that durably caches optimized expressions,
built on top of the durable_cache module. The expression cache runs on
its own task so that callers can insert new entries without blocking on
the insert completing. The adapter doesn't use the expression cache
yet; that is saved for a later commit.

Works towards resolving #MaterializeInc/database-issues/issues/8384
---
 Cargo.lock                                    |   4 +
 .../design/20241008_expression_cache.md       |  22 +-
 src/adapter/BUILD.bazel                       |   6 +
 src/adapter/Cargo.toml                        |   1 +
 src/adapter/src/catalog.rs                    |   1 +
 src/adapter/src/catalog/expr_cache.rs         | 590 ++++++++++++++++++
 src/catalog/src/durable.rs                    |   2 +-
 src/catalog/src/durable/persist.rs            |   8 +
 src/durable-cache/src/lib.rs                  |  27 +-
 src/ore/src/serde.rs                          |  48 ++
 src/repr/src/global_id.rs                     |   3 +
 src/repr/src/optimize.rs                      |   3 +-
 src/transform/Cargo.toml                      |   3 +
 src/transform/src/dataflow.rs                 |   6 +-
 src/transform/src/notice.rs                   |  10 +-
 15 files changed, 705 insertions(+), 29 deletions(-)
 create mode 100644 src/adapter/src/catalog/expr_cache.rs

diff --git a/Cargo.lock b/Cargo.lock
index 5b56f836f3b4d..e1c591917a5eb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4157,6 +4157,7 @@ dependencies = [
  "mz-compute-types",
  "mz-controller",
  "mz-controller-types",
+ "mz-durable-cache",
  "mz-dyncfg",
  "mz-expr",
  "mz-kafka-util",
@@ -6977,6 +6978,9 @@ dependencies = [
  "ordered-float 4.2.0",
  "paste",
  "proc-macro2",
+ "proptest",
+ "proptest-derive",
+ "serde",
  "serde_json",
  "tracing",
  "workspace-hack",
diff --git a/doc/developer/design/20241008_expression_cache.md b/doc/developer/design/20241008_expression_cache.md
index b91b0d97b53db..ddafadb32e1f4 100644
--- a/doc/developer/design/20241008_expression_cache.md
+++ b/doc/developer/design/20241008_expression_cache.md
@@ -70,16 +70,7 @@ struct Expressions {
     physical_plan: DataflowDescription<Plan>,
     dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
     notices: SmallVec<[Arc<OptimizerNotice>; 4]>,
-    optimizer_feature_overrides: OptimizerFeatures,
-}
-
-struct NewEntry {
-    /// `GlobalId` of the new expression.
-    id: GlobalId,
-    /// New `Expressions` to cache.
-    expressions: Expressions,
-    /// `GlobalId`s to invalidate as a result of the new entry.
-    invalidate_ids: BTreeSet<GlobalId>,
+    optimizer_features: OptimizerFeatures,
 }
 
 struct ExpressionCache {
@@ -100,13 +91,12 @@ impl ExpressionCache {
     /// Returns all cached expressions in the current deploy generation, after reconciliation.
     fn open(&mut self, current_ids: &BTreeSet<GlobalId>, optimizer_features: &OptimizerFeatures, remove_prior_gens: bool) -> Vec<(GlobalId, Expressions)>;
 
-    /// Durably inserts `expressions` into current deploy generation. This may also invalidate
-    /// entries giving by `expressions`.
-    ///
-    /// Returns a [`Future`] that completes once the changes have been made durable.
+    /// Durably removes all entries given by `invalidate_ids` and inserts `new_entries` into the
+    /// current deploy generation.
     ///
-    /// Panics if any `GlobalId` already exists in the cache.
-    fn insert_expressions(&mut self, expressions: Vec<NewEntry>) -> impl Future;
+    /// If there is a duplicate ID in both `invalidate_ids` and `new_entries`, then the final value
+    /// will be taken from `new_entries`.
+    fn insert_expressions(&mut self, new_entries: Vec<(GlobalId, Expressions)>, invalidate_ids: BTreeSet<GlobalId>);
 
     /// Durably remove and return all entries in current deploy generation that depend on an ID in
     /// `dropped_ids` .
diff --git a/src/adapter/BUILD.bazel b/src/adapter/BUILD.bazel
index b012dcff3c67e..8429ea7b2c0a6 100644
--- a/src/adapter/BUILD.bazel
+++ b/src/adapter/BUILD.bazel
@@ -40,6 +40,7 @@ rust_library(
         "//src/compute-types:mz_compute_types",
         "//src/controller:mz_controller",
         "//src/controller-types:mz_controller_types",
+        "//src/durable-cache:mz_durable_cache",
         "//src/dyncfg:mz_dyncfg",
         "//src/expr:mz_expr",
         "//src/kafka-util:mz_kafka_util",
@@ -104,6 +105,7 @@ rust_test(
         "//src/compute-types:mz_compute_types",
         "//src/controller:mz_controller",
         "//src/controller-types:mz_controller_types",
+        "//src/durable-cache:mz_durable_cache",
         "//src/dyncfg:mz_dyncfg",
         "//src/expr:mz_expr",
         "//src/kafka-util:mz_kafka_util",
@@ -153,6 +155,7 @@ rust_doc_test(
         "//src/compute-types:mz_compute_types",
         "//src/controller:mz_controller",
         "//src/controller-types:mz_controller_types",
+        "//src/durable-cache:mz_durable_cache",
         "//src/dyncfg:mz_dyncfg",
         "//src/expr:mz_expr",
         "//src/kafka-util:mz_kafka_util",
@@ -222,6 +225,7 @@ rust_test(
         "//src/compute-types:mz_compute_types",
         "//src/controller:mz_controller",
         "//src/controller-types:mz_controller_types",
+        "//src/durable-cache:mz_durable_cache",
         "//src/dyncfg:mz_dyncfg",
         "//src/expr:mz_expr",
         "//src/kafka-util:mz_kafka_util",
@@ -291,6 +295,7 @@ rust_test(
         "//src/compute-types:mz_compute_types",
         "//src/controller:mz_controller",
         "//src/controller-types:mz_controller_types",
+        "//src/durable-cache:mz_durable_cache",
         "//src/dyncfg:mz_dyncfg",
         "//src/expr:mz_expr",
         "//src/kafka-util:mz_kafka_util",
@@ -360,6 +365,7 @@ rust_test(
         "//src/compute-types:mz_compute_types",
         "//src/controller:mz_controller",
         "//src/controller-types:mz_controller_types",
+        "//src/durable-cache:mz_durable_cache",
         "//src/dyncfg:mz_dyncfg",
         "//src/expr:mz_expr",
         "//src/kafka-util:mz_kafka_util",
diff --git a/src/adapter/Cargo.toml b/src/adapter/Cargo.toml
index 6e7e153268cf5..9d6511dcb7a46 100644
--- a/src/adapter/Cargo.toml
+++ b/src/adapter/Cargo.toml
@@ -43,6 +43,7 @@ mz-compute-client = { path = "../compute-client" }
 mz-compute-types = { path = "../compute-types" }
 mz-controller = { path = "../controller" }
 mz-controller-types = { path = "../controller-types" }
+mz-durable-cache = { path = "../durable-cache" }
 mz-dyncfg = { path = "../dyncfg" }
 mz-expr = { path = "../expr" }
 mz-kafka-util = { path = "../kafka-util" }
diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs
index 08f8e133fe5ed..f67f7b0a17759 100644
--- a/src/adapter/src/catalog.rs
+++ b/src/adapter/src/catalog.rs
@@ -96,6 +96,7 @@ mod migrate;
 
 mod apply;
 mod dataflow_expiration;
+mod expr_cache;
 mod open;
 mod state;
 mod transact;
diff --git a/src/adapter/src/catalog/expr_cache.rs b/src/adapter/src/catalog/expr_cache.rs
new file mode 100644
index 0000000000000..bebaa595749a5
--- /dev/null
+++ b/src/adapter/src/catalog/expr_cache.rs
@@ -0,0 +1,590 @@
+// Copyright Materialize, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+//! A cache for optimized expressions.
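+//!
+//! Entries are keyed by `(deploy generation, GlobalId)` and store the optimized MIR
+//! and physical plans plus optimizer metadata for a single catalog object. Each entry
+//! is serialized to JSON and stored in the key half of a
+//! [`mz_durable_cache::DurableCache`] as a [`SourceData`] row; the value half is `()`.
+//! See [`ExpressionCodec`] for the exact encoding.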
+
+use std::borrow::Cow;
+use std::collections::{BTreeMap, BTreeSet};
+use std::future::Future;
+use std::sync::Arc;
+
+use mz_catalog::durable::expression_cache_shard_id;
+use mz_compute_types::dataflows::{DataflowDescription, DataflowExpirationDesc};
+use mz_durable_cache::{DurableCache, DurableCacheCodec};
+use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
+use mz_ore::channel::trigger;
+use mz_ore::task::spawn;
+use mz_persist_client::PersistClient;
+use mz_persist_types::codec_impls::UnitSchema;
+use mz_persist_types::Codec;
+use mz_repr::adt::jsonb::Jsonb;
+use mz_repr::optimize::OptimizerFeatures;
+use mz_repr::{GlobalId, RelationDesc, RelationType, ScalarType};
+use mz_storage_types::sources::SourceData;
+use mz_transform::dataflow::DataflowMetainfo;
+use mz_transform::notice::OptimizerNotice;
+use proptest::arbitrary::{any, Arbitrary};
+use proptest::prelude::BoxedStrategy;
+use proptest::strategy::Strategy;
+use proptest_derive::Arbitrary;
+use serde::{Deserialize, Serialize};
+use smallvec::SmallVec;
+use timely::progress::Antichain;
+use timely::Container;
+use tokio::sync::mpsc;
+use tracing::debug;
+use uuid::Uuid;
+
+/// The data that is cached per catalog object.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub(crate) struct Expressions {
+    local_mir: OptimizedMirRelationExpr,
+    global_mir: DataflowDescription<OptimizedMirRelationExpr>,
+    physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
+    dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
+    notices: SmallVec<[Arc<OptimizerNotice>; 4]>,
+    optimizer_features: OptimizerFeatures,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Arbitrary)]
+struct CacheKey {
+    deploy_generation: u64,
+    id: GlobalId,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct CodecKey<'a> {
+    cache_key: Cow<'a, CacheKey>,
+    expressions: Cow<'a, Expressions>,
+}
+
+#[derive(Debug)]
+struct ExpressionCodec;
+
+impl DurableCacheCodec for ExpressionCodec {
+    type Key = CacheKey;
+    type Val = Expressions;
+    type KeyCodec = SourceData;
+    type ValCodec = ();
+
+    fn schemas() -> (
+        <Self::KeyCodec as Codec>::Schema,
+        <Self::ValCodec as Codec>::Schema,
+    ) {
+        (
+            RelationDesc::builder()
+                .with_column("data", ScalarType::Jsonb.nullable(false))
+                .finish(),
+            UnitSchema::default(),
+        )
+    }
+
+    fn encode(key: &Self::Key, val: &Self::Val) -> (Self::KeyCodec, Self::ValCodec) {
+        // Encode: (key, val) -> serde_json::Value -> Jsonb -> Row -> SourceData.
+        // `decode` reverses these steps.
+        let codec_key = CodecKey {
+            cache_key: Cow::Borrowed(key),
+            expressions: Cow::Borrowed(val),
+        };
+        let serde_value = serde_json::to_value(codec_key).expect("valid json");
+        let jsonb =
+            Jsonb::from_serde_json(serde_value).expect("contained integers should fit in f64");
+        let row = jsonb.into_row();
+        let source_data = SourceData(Ok(row));
+        (source_data, ())
+    }
+
+    fn decode(key: Self::KeyCodec, _val: Self::ValCodec) -> (Self::Key, Self::Val) {
+        let row = key.0.expect("only Ok values stored in expression cache");
+        let jsonb = Jsonb::from_row(row);
+        let serde_value = jsonb.as_ref().to_serde_json();
+        let codec_key: CodecKey =
+            serde_json::from_value(serde_value).expect("jsonb should roundtrip");
+        (
+            codec_key.cache_key.into_owned(),
+            codec_key.expressions.into_owned(),
+        )
+    }
+}
+
+/// Configuration needed to initialize an [`ExpressionCache`].
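+///
+/// A sketch of constructing one (the field values here are hypothetical; see the
+/// tests below for real usage):
+///
+/// ```ignore
+/// let config = ExpressionCacheConfig {
+///     deploy_generation: 0,
+///     persist: &persist_client,
+///     organization_id: Uuid::new_v4(),
+///     current_ids: &current_ids,
+///     optimizer_features: &OptimizerFeatures::default(),
+///     remove_prior_gens: false,
+/// };
+/// ```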
+#[derive(Debug, Clone)]
+struct ExpressionCacheConfig<'a> {
+    deploy_generation: u64,
+    persist: &'a PersistClient,
+    organization_id: Uuid,
+    current_ids: &'a BTreeSet<GlobalId>,
+    optimizer_features: &'a OptimizerFeatures,
+    remove_prior_gens: bool,
+}
+
+/// A durable cache of optimized expressions.
+struct ExpressionCache {
+    deploy_generation: u64,
+    durable_cache: DurableCache<ExpressionCodec>,
+}
+
+impl ExpressionCache {
+    /// Creates a new [`ExpressionCache`] and reconciles all entries in the current deploy
+    /// generation with the current objects, `current_ids`, and the current optimizer features,
+    /// `optimizer_features`.
+    ///
+    /// If `remove_prior_gens` is `true`, all previous generations are durably removed from the
+    /// cache.
+    ///
+    /// Returns all cached expressions in the current deploy generation, after reconciliation.
+    async fn open(
+        ExpressionCacheConfig {
+            deploy_generation,
+            persist,
+            organization_id,
+            current_ids,
+            optimizer_features,
+            remove_prior_gens,
+        }: ExpressionCacheConfig<'_>,
+    ) -> (Self, BTreeMap<GlobalId, Expressions>) {
+        let shard_id = expression_cache_shard_id(organization_id);
+        let durable_cache = DurableCache::new(persist, shard_id, "expression cache").await;
+        let mut cache = Self {
+            deploy_generation,
+            durable_cache,
+        };
+
+        while let Err(err) = cache
+            .try_open(current_ids, optimizer_features, remove_prior_gens)
+            .await
+        {
+            debug!("failed to open cache: {err} ... retrying");
+        }
+
+        let entries = cache
+            .durable_cache
+            .entries_local()
+            .filter(|(key, _)| key.deploy_generation == cache.deploy_generation)
+            .map(|(key, expressions)| (key.id.clone(), expressions.clone()))
+            .collect();
+
+        (cache, entries)
+    }
+
+    async fn try_open(
+        &mut self,
+        current_ids: &BTreeSet<GlobalId>,
+        optimizer_features: &OptimizerFeatures,
+        remove_prior_gens: bool,
+    ) -> Result<(), mz_durable_cache::Error> {
+        let mut keys_to_remove = Vec::new();
+
+        for (key, expressions) in self.durable_cache.entries_local() {
+            if key.deploy_generation == self.deploy_generation {
+                // Remove dropped IDs and expressions that were cached with different features.
+                // A single combined check ensures each key is pushed at most once.
+                if !current_ids.contains(&key.id)
+                    || expressions.optimizer_features != *optimizer_features
+                {
+                    keys_to_remove.push((key.clone(), None));
+                }
+            } else if remove_prior_gens {
+                // Remove expressions from previous generations.
+                keys_to_remove.push((key.clone(), None));
+            }
+        }
+
+        let keys_to_remove: Vec<_> = keys_to_remove
+            .iter()
+            .map(|(key, expressions)| (key, expressions.as_ref()))
+            .collect();
+        self.durable_cache.try_set_many(&keys_to_remove).await
+    }
+
+    /// Durably removes all entries given by `invalidate_ids` and inserts `new_entries` into the
+    /// current deploy generation.
+    ///
+    /// If there is a duplicate ID in both `invalidate_ids` and `new_entries`, then the final value
+    /// will be taken from `new_entries`.
+    async fn insert_expressions(
+        &mut self,
+        new_entries: Vec<(GlobalId, Expressions)>,
+        invalidate_ids: BTreeSet<GlobalId>,
+    ) {
+        let mut entries = BTreeMap::new();
+        // Important to do `invalidate_ids` first, so that `new_entries` overwrites duplicate keys.
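+        // (`BTreeMap::insert` replaces any existing value for a key, so tombstones written
+        // by the first loop lose to entries written by the second.)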
+        for id in invalidate_ids {
+            entries.insert(
+                CacheKey {
+                    id,
+                    deploy_generation: self.deploy_generation,
+                },
+                None,
+            );
+        }
+        for (id, expressions) in new_entries {
+            entries.insert(
+                CacheKey {
+                    id,
+                    deploy_generation: self.deploy_generation,
+                },
+                Some(expressions),
+            );
+        }
+        let entries: Vec<_> = entries
+            .iter()
+            .map(|(key, expressions)| (key, expressions.as_ref()))
+            .collect();
+        self.durable_cache.set_many(&entries).await
+    }
+}
+
+/// Operations to perform on the cache.
+enum CacheOperation {
+    /// See [`ExpressionCache::insert_expressions`].
+    Insert {
+        new_entries: Vec<(GlobalId, Expressions)>,
+        invalidate_ids: BTreeSet<GlobalId>,
+        trigger: trigger::Trigger,
+    },
+}
+
+struct ExpressionCacheHandle {
+    tx: mpsc::UnboundedSender<CacheOperation>,
+}
+
+impl ExpressionCacheHandle {
+    /// Spawns a task responsible for managing the expression cache.
+    ///
+    /// Returns a handle to interact with the cache and the initial contents of the cache.
+    async fn spawn_expression_cache(
+        config: ExpressionCacheConfig<'_>,
+    ) -> (Self, BTreeMap<GlobalId, Expressions>) {
+        let (mut cache, entries) = ExpressionCache::open(config).await;
+        let (tx, mut rx) = mpsc::unbounded_channel();
+        spawn(|| "expression-cache-task", async move {
+            // The task exits once all senders are dropped and the channel is drained.
+            while let Some(op) = rx.recv().await {
+                match op {
+                    CacheOperation::Insert {
+                        new_entries,
+                        invalidate_ids,
+                        trigger: _trigger,
+                    } => cache.insert_expressions(new_entries, invalidate_ids).await,
+                }
+            }
+        });
+
+        (Self { tx }, entries)
+    }
+
+    fn insert_expressions(
+        &self,
+        new_entries: Vec<(GlobalId, Expressions)>,
+        invalidate_ids: BTreeSet<GlobalId>,
+    ) -> impl Future<Output = ()> {
+        let (trigger, trigger_rx) = trigger::channel();
+        let op = CacheOperation::Insert {
+            new_entries,
+            invalidate_ids,
+            trigger,
+        };
+        // If the send fails, then we must be shutting down.
+        let _ = self.tx.send(op);
+        trigger_rx
+    }
+}
+
+impl Arbitrary for Expressions {
+    type Parameters = ();
+    fn arbitrary_with((): Self::Parameters) -> Self::Strategy {
+        // It would be better to implement `Arbitrary` for these types, but that would be extremely
+        // painful, so we just manually construct very simple instances.
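+        // Only `dataflow_metainfos`, `notices`, and the optimizer features are generated by
+        // proptest below; the plan fields are fixed, minimal instances.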
+        let local_mir = OptimizedMirRelationExpr(MirRelationExpr::Constant {
+            rows: Ok(Vec::new()),
+            typ: RelationType::empty(),
+        });
+        let global_mir = DataflowDescription::new("gmir".to_string());
+        let physical_plan = DataflowDescription {
+            source_imports: Default::default(),
+            index_imports: Default::default(),
+            objects_to_build: Vec::new(),
+            index_exports: Default::default(),
+            sink_exports: Default::default(),
+            as_of: Default::default(),
+            until: Antichain::new(),
+            initial_storage_as_of: None,
+            refresh_schedule: None,
+            debug_name: "pp".to_string(),
+            dataflow_expiration_desc: DataflowExpirationDesc::default(),
+        };
+
+        let dataflow_metainfos = any::<DataflowMetainfo<Arc<OptimizerNotice>>>();
+        let notices = any::<[Arc<OptimizerNotice>; 4]>();
+        let optimizer_feature = any::<OptimizerFeatures>();
+
+        (dataflow_metainfos, notices, optimizer_feature)
+            .prop_map(move |(dataflow_metainfos, notices, optimizer_feature)| {
+                let local_mir = local_mir.clone();
+                let global_mir = global_mir.clone();
+                let physical_plan = physical_plan.clone();
+                let notices = SmallVec::from_const(notices);
+                Expressions {
+                    local_mir,
+                    global_mir,
+                    physical_plan,
+                    dataflow_metainfos,
+                    notices,
+                    optimizer_features: optimizer_feature,
+                }
+            })
+            .boxed()
+    }
+
+    type Strategy = BoxedStrategy<Self>;
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::{BTreeMap, BTreeSet};
+
+    use mz_durable_cache::DurableCacheCodec;
+    use mz_persist_client::PersistClient;
+    use mz_repr::optimize::OptimizerFeatures;
+    use mz_repr::GlobalId;
+    use proptest::arbitrary::{any, Arbitrary};
+    use proptest::prelude::ProptestConfig;
+    use proptest::proptest;
+    use proptest::strategy::Strategy;
+    use proptest::test_runner::TestRunner;
+    use uuid::Uuid;
+
+    use crate::catalog::expr_cache::{
+        CacheKey, ExpressionCacheConfig, ExpressionCacheHandle, ExpressionCodec, Expressions,
+    };
+
+    fn generate_expressions() -> Expressions {
+        Expressions::arbitrary()
+            .new_tree(&mut TestRunner::default())
+            .unwrap()
+            .current()
+    }
+
+    #[mz_ore::test(tokio::test)]
+    async fn expression_cache() {
+        let first_deploy_generation = 0;
+        let second_deploy_generation = 1;
+        let persist = &PersistClient::new_for_tests().await;
+        let organization_id = Uuid::new_v4();
+
+        let current_ids = &mut BTreeSet::new();
+        let optimizer_features = &mut OptimizerFeatures::default();
+        let mut remove_prior_gens = false;
+
+        let mut next_id = 0;
+
+        let mut exps = {
+            // Open a new empty cache.
+            let (cache, entries) =
+                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
+                    deploy_generation: first_deploy_generation,
+                    persist,
+                    organization_id,
+                    current_ids,
+                    optimizer_features,
+                    remove_prior_gens,
+                })
+                .await;
+            assert_eq!(entries, BTreeMap::new(), "new cache should be empty");
+
+            // Insert some expressions into the cache.
+            let mut exps = BTreeMap::new();
+            for _ in 0..5 {
+                let mut exp = (GlobalId::User(next_id), generate_expressions());
+                next_id += 1;
+                exp.1.optimizer_features = optimizer_features.clone();
+                cache
+                    .insert_expressions(vec![exp.clone()], BTreeSet::new())
+                    .await;
+                current_ids.insert(exp.0);
+                exps.insert(exp.0, exp.1);
+            }
+            exps
+        };
+
+        {
+            // Re-open the cache.
+            let (cache, entries) =
+                ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
+                    deploy_generation: first_deploy_generation,
+                    persist,
+                    organization_id,
+                    current_ids,
+                    optimizer_features,
+                    remove_prior_gens,
+                })
+                .await;
+            assert_eq!(
+                entries, exps,
+                "re-opening the cache should recover the expressions"
+            );
+
+            // Insert an expression with non-matching optimizer features.
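+            // The following re-open should drop this entry during reconciliation, since
+            // its features won't match the cache's `optimizer_features`.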
+ let mut exp = (GlobalId::User(next_id), generate_expressions()); + next_id += 1; + let mut optimizer_features = optimizer_features.clone(); + optimizer_features.enable_eager_delta_joins = + !optimizer_features.enable_eager_delta_joins; + exp.1.optimizer_features = optimizer_features.clone(); + cache + .insert_expressions(vec![exp.clone()], BTreeSet::new()) + .await; + current_ids.insert(exp.0); + } + + { + // Re-open the cache. + let (_cache, entries) = + ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { + deploy_generation: first_deploy_generation, + persist, + organization_id, + current_ids, + optimizer_features, + remove_prior_gens, + }) + .await; + assert_eq!( + entries, exps, + "expression with non-matching optimizer features should be removed during reconciliation" + ); + } + + { + // Simulate dropping an object. + let id_to_remove = exps.keys().next().unwrap().clone(); + current_ids.remove(&id_to_remove); + let _removed_exp = exps.remove(&id_to_remove); + + // Re-open the cache. + let (_cache, entries) = + ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { + deploy_generation: first_deploy_generation, + persist, + organization_id, + current_ids, + optimizer_features, + remove_prior_gens, + }) + .await; + assert_eq!( + entries, exps, + "dropped objects should be removed during reconciliation" + ); + } + + let new_gen_exps = { + // Open the cache at a new generation. + let (cache, entries) = + ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { + deploy_generation: second_deploy_generation, + persist, + organization_id, + current_ids, + optimizer_features, + remove_prior_gens, + }) + .await; + assert_eq!(entries, BTreeMap::new(), "new generation should be empty"); + + // Insert some expressions at the new generation. + let mut exps = BTreeMap::new(); + for _ in 0..5 { + let mut exp = (GlobalId::User(next_id), generate_expressions()); + next_id += 1; + exp.1.optimizer_features = optimizer_features.clone(); + cache + .insert_expressions(vec![exp.clone()], BTreeSet::new()) + .await; + current_ids.insert(exp.0); + exps.insert(exp.0, exp.1); + } + exps + }; + + { + // Re-open the cache at the first generation. + let (_cache, entries) = + ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { + deploy_generation: first_deploy_generation, + persist, + organization_id, + current_ids, + optimizer_features, + remove_prior_gens, + }) + .await; + assert_eq!( + entries, exps, + "Previous generation expressions should still exist" + ); + } + + { + // Open the cache at a new generation and clear previous generations. + remove_prior_gens = true; + let (_cache, entries) = + ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { + deploy_generation: second_deploy_generation, + persist, + organization_id, + current_ids, + optimizer_features, + remove_prior_gens, + }) + .await; + assert_eq!( + entries, new_gen_exps, + "new generation expressions should be persisted" + ); + } + + { + // Re-open the cache at the first generation. + let (_cache, entries) = + ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { + deploy_generation: first_deploy_generation, + persist, + organization_id, + current_ids, + optimizer_features, + remove_prior_gens, + }) + .await; + assert_eq!( + entries, + BTreeMap::new(), + "Previous generation expressions should be cleared" + ); + } + } + + proptest! 
{ + #![proptest_config(ProptestConfig::with_cases(32))] + + #[mz_ore::test] + #[cfg_attr(miri, ignore)] + fn expr_cache_roundtrip((key, val) in any::<(CacheKey, Expressions)>()) { + let (encoded_key, encoded_val) = ExpressionCodec::encode(&key, &val); + let (decoded_key, decoded_val) = ExpressionCodec::decode(encoded_key, encoded_val); + + assert_eq!(key, decoded_key); + assert_eq!(val, decoded_val); + } + } +} diff --git a/src/catalog/src/durable.rs b/src/catalog/src/durable.rs index f3b8eb239b758..8f16a1f9624fe 100644 --- a/src/catalog/src/durable.rs +++ b/src/catalog/src/durable.rs @@ -36,7 +36,7 @@ pub use crate::durable::objects::{ Role, Schema, SourceReference, SourceReferences, StorageCollectionMetadata, SystemConfiguration, SystemObjectDescription, SystemObjectMapping, UnfinalizedShard, }; -pub use crate::durable::persist::builtin_migration_shard_id; +pub use crate::durable::persist::{builtin_migration_shard_id, expression_cache_shard_id}; use crate::durable::persist::{Timestamp, UnopenedPersistCatalogState}; pub use crate::durable::transaction::Transaction; use crate::durable::transaction::TransactionBatch; diff --git a/src/catalog/src/durable/persist.rs b/src/catalog/src/durable/persist.rs index 2c53eb2e055ae..666355ce419a1 100644 --- a/src/catalog/src/durable/persist.rs +++ b/src/catalog/src/durable/persist.rs @@ -117,6 +117,8 @@ const CATALOG_SEED: usize = 1; const UPGRADE_SEED: usize = 2; /// Seed used to generate the persist shard ID for builtin table migrations. const BUILTIN_MIGRATION_SEED: usize = 3; +/// Seed used to generate the persist shard ID for the expression cache. +const EXPRESSION_CACHE_SEED: usize = 4; /// Durable catalog mode that dictates the effect of mutable operations. #[derive(Debug, Copy, Clone, Eq, PartialEq)] @@ -1701,6 +1703,12 @@ pub fn builtin_migration_shard_id(organization_id: Uuid) -> ShardId { shard_id(organization_id, BUILTIN_MIGRATION_SEED) } +/// Deterministically generate an expression cache shard ID for the given +/// `organization_id`. +pub fn expression_cache_shard_id(organization_id: Uuid) -> ShardId { + shard_id(organization_id, EXPRESSION_CACHE_SEED) +} + /// Deterministically generate a shard ID for the given `organization_id` and `seed`. fn shard_id(organization_id: Uuid, seed: usize) -> ShardId { let hash = sha2::Sha256::digest(format!("{organization_id}{seed}")).to_vec(); diff --git a/src/durable-cache/src/lib.rs b/src/durable-cache/src/lib.rs index 34d5360fc8ccb..1294c5854101f 100644 --- a/src/durable-cache/src/lib.rs +++ b/src/durable-cache/src/lib.rs @@ -10,7 +10,7 @@ //! The crate provides a durable key-value cache abstraction implemented by persist. 
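+//!
+//! Writes go through persist compare-and-append. On an upper mismatch the cache
+//! syncs itself to the current upper and surfaces [`Error::WriteConflict`]; `set`
+//! and `set_many` retry their `try_*` counterparts in a loop until a write lands.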
 use std::collections::BTreeMap;
-use std::fmt::Debug;
+use std::fmt::{Debug, Formatter};
 use std::hash::Hash;
 use std::sync::Arc;
 
@@ -22,6 +22,7 @@ use mz_persist_client::write::WriteHandle;
+use mz_persist_client::error::UpperMismatch;
 use mz_persist_client::{Diagnostics, PersistClient};
 use mz_persist_types::{Codec, ShardId};
 use timely::progress::Antichain;
+use tracing::debug;
 
 pub trait DurableCacheCodec {
     type Key: Ord + Hash + Clone + Debug;
@@ -37,9 +38,17 @@ pub trait DurableCacheCodec {
     fn decode(key: Self::KeyCodec, val: Self::ValCodec) -> (Self::Key, Self::Val);
 }
 
-#[derive(Debug, Clone)]
+#[derive(Debug)]
 pub enum Error {
-    WriteConflict,
+    WriteConflict(UpperMismatch<u64>),
+}
+
+impl std::fmt::Display for Error {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Error::WriteConflict(err) => write!(f, "{err}"),
+        }
+    }
 }
 
 #[derive(Debug)]
@@ -186,7 +195,9 @@ impl<C: DurableCacheCodec> DurableCache<C> {
     ///
     /// Failures will update the cache and retry until the cache is written successfully.
     pub async fn set(&mut self, key: &C::Key, value: Option<&C::Val>) {
-        while self.try_set(key, value).await.is_err() {}
+        while let Err(err) = self.try_set(key, value).await {
+            debug!("failed to set entry: {err} ... retrying");
+        }
     }
 
     /// Durably set multiple key-value pairs in `entries`. Values of `None` deletes the
@@ -194,7 +205,9 @@ impl<C: DurableCacheCodec> DurableCache<C> {
     ///
     /// Failures will update the cache and retry until the cache is written successfully.
     pub async fn set_many(&mut self, entries: &[(&C::Key, Option<&C::Val>)]) {
-        while self.try_set_many(entries).await.is_err() {}
+        while let Err(err) = self.try_set_many(entries).await {
+            debug!("failed to set entries: {err} ... retrying");
+        }
     }
 
     /// Tries to durably set `key` to `value`. A `value` of `None` deletes the entry from the cache.
@@ -247,8 +260,8 @@ impl<C: DurableCacheCodec> DurableCache<C> {
                 Ok(())
             }
             Err(err) => {
-                self.sync_to(err.current.into_option()).await;
-                Err(Error::WriteConflict)
+                self.sync_to(err.current.clone().into_option()).await;
+                Err(Error::WriteConflict(err))
             }
         }
     }
diff --git a/src/ore/src/serde.rs b/src/ore/src/serde.rs
index 22ac46d9eea52..2ced501bebda3 100644
--- a/src/ore/src/serde.rs
+++ b/src/ore/src/serde.rs
@@ -32,3 +32,51 @@ where
     }
     s.end()
 }
+
+/// Used to deserialize fields of [`std::collections::BTreeMap`] whose key type is not a native
+/// string. Annotate the field with
+/// `#[serde(deserialize_with = "mz_ore::serde::string_key_to_btree_map")]`.
+pub fn string_key_to_btree_map<'de, K, V, D>(
+    deserializer: D,
+) -> Result<std::collections::BTreeMap<K, V>, D::Error>
+where
+    K: std::str::FromStr + Ord + std::fmt::Debug,
+    K::Err: std::fmt::Display,
+    V: serde::Deserialize<'de>,
+    D: serde::Deserializer<'de>,
+{
+    struct BTreeMapVisitor<K, V> {
+        _phantom: std::marker::PhantomData<(K, V)>,
+    }
+
+    impl<'de, K, V> serde::de::Visitor<'de> for BTreeMapVisitor<K, V>
+    where
+        K: std::str::FromStr + Ord,
+        V: serde::Deserialize<'de>,
+        K::Err: std::fmt::Display,
+    {
+        type Value = std::collections::BTreeMap<K, V>;
+
+        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
+            write!(formatter, "a map with keys that implement FromStr")
+        }
+
+        fn visit_map<A>(self, mut access: A) -> Result<Self::Value, A::Error>
+        where
+            A: serde::de::MapAccess<'de>,
+        {
+            let mut map = std::collections::BTreeMap::new();
+
+            while let Some((key, value)) = access.next_entry::<String, V>()? {
+                let key = K::from_str(&key).map_err(|e| serde::de::Error::custom(e))?;
+                map.insert(key, value);
+            }
+
+            Ok(map)
+        }
+    }
+
+    deserializer.deserialize_map(BTreeMapVisitor {
+        _phantom: Default::default(),
+    })
+}
diff --git a/src/repr/src/global_id.rs b/src/repr/src/global_id.rs
index 12dc3a44b0a4f..0b971eae22c5f 100644
--- a/src/repr/src/global_id.rs
+++ b/src/repr/src/global_id.rs
@@ -84,6 +84,9 @@ impl FromStr for GlobalId {
         if s.len() < 2 {
             return Err(anyhow!("couldn't parse id {}", s));
         }
+        if s == "Explained Query" {
+            return Ok(GlobalId::Explain);
+        }
         let val: u64 = s[1..].parse()?;
         match s.chars().next().unwrap() {
             's' => Ok(GlobalId::System(val)),
diff --git a/src/repr/src/optimize.rs b/src/repr/src/optimize.rs
index ac5b0304a75df..c016aa968900b 100644
--- a/src/repr/src/optimize.rs
+++ b/src/repr/src/optimize.rs
@@ -11,12 +11,13 @@
 
 use std::collections::BTreeMap;
 
+use proptest_derive::Arbitrary;
 use serde::{Deserialize, Serialize};
 
 /// A macro for feature flags managed by the optimizer.
 macro_rules! optimizer_feature_flags {
     ({ $($feature:ident: $type:ty,)* }) => {
-        #[derive(Clone, Debug, Default)]
+        #[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, Arbitrary)]
         pub struct OptimizerFeatures {
             $(pub $feature: $type),*
         }
diff --git a/src/transform/Cargo.toml b/src/transform/Cargo.toml
index f65b42470478d..e6a22422b7218 100644
--- a/src/transform/Cargo.toml
+++ b/src/transform/Cargo.toml
@@ -20,6 +20,9 @@ mz-repr = { path = "../repr", features = ["tracing_"] }
 num-traits = "0.2"
 ordered-float = { version = "4.2.0", features = ["serde"] }
 paste = "1.0.11"
+proptest = { version = "1.0.0", default-features = false, features = ["std"] }
+proptest-derive = { version = "0.3.0" }
+serde = "1.0.152"
 tracing = "0.1.37"
 workspace-hack = { version = "0.0.0", path = "../workspace-hack" }
 
diff --git a/src/transform/src/dataflow.rs b/src/transform/src/dataflow.rs
index c82f290f0d4ea..6544d167616ec 100644
--- a/src/transform/src/dataflow.rs
+++ b/src/transform/src/dataflow.rs
@@ -26,6 +26,8 @@ use mz_ore::stack::{CheckedRecursion, RecursionGuard, RecursionLimitError};
 use mz_ore::{assert_none, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log};
 use mz_repr::explain::{DeltaJoinIndexUsageType, IndexUsageType, UsedIndexes};
 use mz_repr::GlobalId;
+use proptest_derive::Arbitrary;
+use serde::{Deserialize, Serialize};
 
 use crate::monotonic::MonotonicFlag;
 use crate::notice::RawOptimizerNotice;
@@ -1223,13 +1225,15 @@ impl IndexUsageContext {
 
 /// Extra information about the dataflow. This is not going to be shipped, but has to be processed
 /// in other ways, e.g., showing notices to the user, or saving meta-information to the catalog.
-#[derive(Clone, Debug, Eq, PartialEq)]
+#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
 pub struct DataflowMetainfo<Notice = RawOptimizerNotice> {
     /// Notices that the optimizer wants to show to users.
     /// For pushing a new element, use [`Self::push_optimizer_notice_dedup`].
     pub optimizer_notices: Vec<Notice>,
     /// What kind of operation (full scan, lookup, ...) will access each index. Computed by
     /// `prune_and_annotate_dataflow_index_imports`.
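+    /// Keys are serialized as strings because JSON object keys must be strings;
+    /// `GlobalId` round-trips through its `Display`/`FromStr` impls (see
+    /// `mz_ore::serde::map_key_to_string` and `mz_ore::serde::string_key_to_btree_map`).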
+    #[serde(serialize_with = "mz_ore::serde::map_key_to_string")]
+    #[serde(deserialize_with = "mz_ore::serde::string_key_to_btree_map")]
     pub index_usage_types: BTreeMap<GlobalId, Vec<IndexUsageType>>,
 }
 
diff --git a/src/transform/src/notice.rs b/src/transform/src/notice.rs
index 1ddfbdc394c0d..66f2043fdb314 100644
--- a/src/transform/src/notice.rs
+++ b/src/transform/src/notice.rs
@@ -44,8 +44,10 @@ use std::{concat, stringify};
 use enum_kinds::EnumKind;
 use mz_repr::explain::ExprHumanizer;
 use mz_repr::GlobalId;
+use proptest_derive::Arbitrary;
+use serde::{Deserialize, Serialize};
 
-#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Arbitrary)]
 /// A long-lived in-memory representation of a [`RawOptimizerNotice`] that is
 /// meant to be kept as part of the hydrated catalog state.
 pub struct OptimizerNotice {
@@ -121,7 +123,9 @@ impl OptimizerNotice {
     }
 }
 
-#[derive(EnumKind, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+#[derive(
+    EnumKind, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Arbitrary,
+)]
 #[enum_kind(ActionKind)]
 /// An action attached to an [`OptimizerNotice`]
 pub enum Action {
@@ -282,7 +286,7 @@ macro_rules! raw_optimizer_notices {
     paste::paste!{
         /// Notices that the optimizer wants to show to users.
         #[derive(EnumKind, Clone, Debug, Eq, PartialEq, Hash)]
-        #[enum_kind(OptimizerNoticeKind, derive(PartialOrd, Ord, Hash))]
+        #[enum_kind(OptimizerNoticeKind, derive(PartialOrd, Ord, Hash, Serialize, Deserialize, Arbitrary))]
         pub enum RawOptimizerNotice {
             $(
                 #[doc = concat!("See [`", stringify!($ty), "`].")]