Skip to content

Commit

Permalink
adapter: plan insight for peek persist limit
Browse files Browse the repository at this point in the history
Add a plan insight that adds a limit to check for PeekPersist fast paths.

See https://github.com/MaterializeInc/console/issues/2667
  • Loading branch information
maddyblue committed Jul 18, 2024
1 parent b3d67aa commit b722c16
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 6 deletions.
5 changes: 5 additions & 0 deletions src/adapter/src/coord/sequencer/inner/peek.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,10 @@ impl Coordinator {
// situation where optimizing takes a while and there a lots of clusters,
// which would delay peek execution by the product of those.
let opt_limit = mz_adapter_types::dyncfgs::PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION.get(catalog.system_config().dyncfgs());
let target_instance = catalog
.get_cluster(optimizer.cluster_id())
.name
.clone();
let enable_re_optimize =
!(matches!(explain_ctx, ExplainContext::PlanInsightsNotice(_))
&& optimizer.duration() > opt_limit);
Expand All @@ -600,6 +604,7 @@ impl Coordinator {
raw_expr: plan.source.clone(),
catalog,
compute_instances,
target_instance,
metrics: optimizer.metrics().clone(),
finishing: optimizer.finishing().clone(),
optimizer_config: optimizer.config().clone(),
Expand Down
18 changes: 16 additions & 2 deletions src/adapter/src/explain/insights.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use mz_expr::{
AccessStrategy, AggregateExpr, AggregateFunc, Id, MirRelationExpr, OptimizedMirRelationExpr,
RowSetFinishing,
};
use mz_ore::num::NonNeg;
use mz_repr::explain::ExprHumanizer;
use mz_repr::{GlobalId, Timestamp};
use mz_sql::ast::Statement;
Expand Down Expand Up @@ -46,6 +47,7 @@ pub struct PlanInsightsContext {
// TODO: Avoid populating this if not needed. Maybe make this a method that can return a
// ComputeInstanceSnapshot for a given cluster.
pub compute_instances: BTreeMap<String, ComputeInstanceSnapshot>,
pub target_instance: String,
pub metrics: OptimizerMetrics,
pub finishing: RowSetFinishing,
pub optimizer_config: OptimizerConfig,
Expand All @@ -68,6 +70,8 @@ pub struct PlanInsights {
/// this as fast path. That is: if this query were run on the cluster of the key, it would be
/// fast because it would use the index of the value.
pub fast_path_clusters: BTreeMap<String, Option<FastPathCluster>>,
/// For the current cluster, whether adding a LIMIT <= this will result in a fast path.
pub fast_path_limit: Option<usize>,
/// Names of persist sources over which a count(*) is done.
pub persist_count: Vec<Name>,
}
Expand All @@ -89,14 +93,18 @@ impl PlanInsights {
.compute_instances
.into_iter()
.map(|(name, compute_instance)| {
// Optimize in parallel.
let raw_expr = ctx.raw_expr.clone();
let mut finishing = ctx.finishing.clone();
// For the current cluster, try adding a LIMIT to see if it fast paths with PeekPersist.
if name == ctx.target_instance {
finishing.limit = Some(NonNeg::try_from(1).expect("non-negitave"));
}
let session = Arc::clone(&session);
let timestamp_context = ctx.timestamp_context.clone();
let mut optimizer = optimize::peek::Optimizer::new(
Arc::clone(&ctx.catalog),
compute_instance,
ctx.finishing.clone(),
finishing,
ctx.view_id,
ctx.index_id,
ctx.optimizer_config.clone(),
Expand Down Expand Up @@ -126,6 +134,12 @@ impl PlanInsights {
};
let (plan, _, _) = plan.unapply();
if let PeekPlan::FastPath(plan) = plan {
// Same-cluster optimization is the LIMIT check.
if name == ctx.target_instance {
self.fast_path_limit =
Some(ctx.optimizer_config.features.persist_fast_path_limit);
continue;
}
let idx_name = if let FastPathPlan::PeekExisting(_, idx_id, _, _) = plan {
let idx_entry = ctx.catalog.get_entry(&idx_id);
Some(FastPathCluster {
Expand Down
Loading

0 comments on commit b722c16

Please sign in to comment.