Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): distributed execution of recluster #13048

Merged
merged 16 commits into from
Oct 25, 2023
2 changes: 1 addition & 1 deletion src/query/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ pub trait TableExt: Table {
let tid = table_info.ident.table_id;
let catalog = ctx.get_catalog(table_info.catalog()).await?;
let (ident, meta) = catalog.get_table_meta_by_id(tid).await?;
let table_info: TableInfo = TableInfo {
let table_info = TableInfo {
ident,
desc: "".to_owned(),
name,
Expand Down
99 changes: 60 additions & 39 deletions src/query/service/src/interpreters/interpreter_table_optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::time::SystemTime;
use common_base::runtime::GlobalIORuntime;
use common_catalog::plan::Partitions;
use common_catalog::table::CompactTarget;
use common_catalog::table::TableExt;
use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_app::schema::CatalogInfo;
Expand All @@ -33,8 +32,10 @@ use common_sql::executor::PhysicalPlan;
use common_sql::plans::OptimizeTableAction;
use common_sql::plans::OptimizeTablePlan;
use common_storages_factory::NavigationPoint;
use common_storages_fuse::FuseTable;
use storages_common_table_meta::meta::TableSnapshot;

use crate::interpreters::interpreter_table_recluster::build_recluster_physical_plan;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterClusteringHistory;
use crate::pipelines::executor::ExecutorSettings;
Expand Down Expand Up @@ -122,9 +123,10 @@ impl OptimizeTableInterpreter {
target: CompactTarget,
need_purge: bool,
) -> Result<PipelineBuildResult> {
let mut table = self
.ctx
.get_table(&self.plan.catalog, &self.plan.database, &self.plan.table)
let catalog = self.ctx.get_catalog(&self.plan.catalog).await?;
let tenant = self.ctx.get_tenant();
let mut table = catalog
.get_table(tenant.as_str(), &self.plan.database, &self.plan.table)
.await?;

let table_info = table.get_table_info().clone();
Expand Down Expand Up @@ -174,7 +176,6 @@ impl OptimizeTableInterpreter {

let mut build_res = PipelineBuildResult::create();
let settings = self.ctx.get_settings();
let mut reclustered_block_count = 0;
let need_recluster = !table.cluster_keys(self.ctx.clone()).is_empty();
if need_recluster {
if !compact_pipeline.is_empty() {
Expand All @@ -189,50 +190,70 @@ impl OptimizeTableInterpreter {
executor.execute()?;

// refresh table.
table = table.as_ref().refresh(self.ctx.as_ref()).await?;
table = catalog
.get_table(tenant.as_str(), &self.plan.database, &self.plan.table)
.await?;
}

reclustered_block_count = table
.recluster(
self.ctx.clone(),
None,
self.plan.limit,
&mut build_res.main_pipeline,
)
.await?;
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
if let Some(mutator) = fuse_table
.build_recluster_mutator(self.ctx.clone(), None, self.plan.limit)
.await?
{
if !mutator.tasks.is_empty() {
let reclustered_block_count = mutator.recluster_blocks_count;
let physical_plan = build_recluster_physical_plan(
mutator.tasks,
table.get_table_info().clone(),
catalog.info(),
mutator.snapshot,
mutator.remained_blocks,
mutator.removed_segment_indexes,
mutator.removed_segment_summary,
)?;

build_res = build_query_pipeline_without_render_result_set(
&self.ctx,
&physical_plan,
false,
)
.await?;

let ctx = self.ctx.clone();
let plan = self.plan.clone();
let start = SystemTime::now();
build_res
.main_pipeline
.set_on_finished(move |may_error| match may_error {
None => InterpreterClusteringHistory::write_log(
&ctx,
start,
&plan.database,
&plan.table,
reclustered_block_count,
),
Some(error_code) => Err(error_code.clone()),
});
}
}
} else {
build_res.main_pipeline = compact_pipeline;
}

let ctx = self.ctx.clone();
let plan = self.plan.clone();
if build_res.main_pipeline.is_empty() {
if need_purge {
if need_purge {
if build_res.main_pipeline.is_empty() {
purge(ctx, plan, None).await?;
} else {
build_res
.main_pipeline
.set_on_finished(move |may_error| match may_error {
None => GlobalIORuntime::instance()
.block_on(async move { purge(ctx, plan, None).await }),
Some(error_code) => Err(error_code.clone()),
});
}
} else {
let start = SystemTime::now();
build_res
.main_pipeline
.set_on_finished(move |may_error| match may_error {
None => {
if need_recluster {
InterpreterClusteringHistory::write_log(
&ctx,
start,
&plan.database,
&plan.table,
reclustered_block_count,
)?;
}
if need_purge {
GlobalIORuntime::instance()
.block_on(async move { purge(ctx, plan, None).await })?;
}
Ok(())
}
Some(error_code) => Err(error_code.clone()),
});
}

Ok(build_res)
Expand Down
127 changes: 100 additions & 27 deletions src/query/service/src/interpreters/interpreter_table_recluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,28 @@ use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::type_check::check_function;
use common_functions::BUILTIN_FUNCTIONS;
use common_meta_app::schema::CatalogInfo;
use common_meta_app::schema::TableInfo;
use common_sql::executor::Exchange;
use common_sql::executor::FragmentKind;
use common_sql::executor::PhysicalPlan;
use common_sql::executor::ReclusterSink;
use common_sql::executor::ReclusterSource;
use common_sql::executor::ReclusterTask;
use common_storages_fuse::FuseTable;
use log::error;
use log::info;
use log::warn;
use storages_common_table_meta::meta::BlockMeta;
use storages_common_table_meta::meta::Statistics;
use storages_common_table_meta::meta::TableSnapshot;

use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterClusteringHistory;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::Pipeline;
use crate::pipelines::PipelineBuildResult;
use crate::schedulers::build_query_pipeline_without_render_result_set;
use crate::sessions::QueryContext;
use crate::sessions::TableContext;
use crate::sql::executor::cast_expr_to_non_null_boolean;
Expand Down Expand Up @@ -59,17 +71,9 @@ impl Interpreter for ReclusterTableInterpreter {
let plan = &self.plan;
let ctx = self.ctx.clone();
let settings = ctx.get_settings();
let tenant = ctx.get_tenant();
let max_threads = settings.get_max_threads()?;
let max_threads = settings.get_max_threads()? as usize;
let recluster_timeout_secs = settings.get_recluster_timeout_secs()?;

// Status.
{
let status = "recluster: begin to run recluster";
ctx.set_status_info(status);
info!("{}", status);
}

// Build extras via push down scalar
let extras = if let Some(scalar) = &plan.push_downs {
// prepare the filter expression
Expand All @@ -95,6 +99,12 @@ impl Interpreter for ReclusterTableInterpreter {
None
};

let catalog = self.ctx.get_catalog(&self.plan.catalog).await?;
let tenant = self.ctx.get_tenant();
let mut table = catalog
.get_table(tenant.as_str(), &self.plan.database, &self.plan.table)
.await?;

let mut times = 0;
let mut block_count = 0;
let start = SystemTime::now();
Expand All @@ -108,13 +118,6 @@ impl Interpreter for ReclusterTableInterpreter {
return Err(err);
}

let table = self
.ctx
.get_catalog(&plan.catalog)
.await?
.get_table(tenant.as_str(), &plan.database, &plan.table)
.await?;

// check if the table is locked.
let catalog = self.ctx.get_catalog(&self.plan.catalog).await?;
let reply = catalog
Expand All @@ -127,24 +130,52 @@ impl Interpreter for ReclusterTableInterpreter {
)));
}

let mut pipeline = Pipeline::create();
let reclustered_block_count = table
.recluster(ctx.clone(), extras.clone(), plan.limit, &mut pipeline)
// Status.
{
let status = "recluster: begin to run recluster";
ctx.set_status_info(status);
info!("{}", status);
}

let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let mutator = fuse_table
.build_recluster_mutator(ctx.clone(), extras.clone(), plan.limit)
.await?;
if pipeline.is_empty() {
if mutator.is_none() {
break;
};

block_count += reclustered_block_count;
let max_threads = std::cmp::min(max_threads, reclustered_block_count) as usize;
pipeline.set_max_threads(max_threads);
let mutator = mutator.unwrap();
if mutator.tasks.is_empty() {
break;
};
block_count += mutator.recluster_blocks_count;
let physical_plan = build_recluster_physical_plan(
mutator.tasks,
table.get_table_info().clone(),
catalog.info(),
mutator.snapshot,
mutator.remained_blocks,
mutator.removed_segment_indexes,
mutator.removed_segment_summary,
)?;

let mut build_res =
build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan, false)
.await?;
assert!(build_res.main_pipeline.is_complete_pipeline()?);
build_res.set_max_threads(max_threads);

let query_id = ctx.get_id();
let executor_settings = ExecutorSettings::try_create(&settings, query_id)?;
let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?;

ctx.set_executor(executor.get_inner())?;
executor.execute()?;
let mut pipelines = build_res.sources_pipelines;
pipelines.push(build_res.main_pipeline);

let complete_executor =
PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?;
ctx.set_executor(complete_executor.get_inner())?;
complete_executor.execute()?;

let elapsed_time = SystemTime::now().duration_since(start).unwrap();
times += 1;
Expand All @@ -170,6 +201,11 @@ impl Interpreter for ReclusterTableInterpreter {
);
break;
}

// refresh table.
table = catalog
.get_table(tenant.as_str(), &self.plan.database, &self.plan.table)
.await?;
}

if block_count != 0 {
Expand All @@ -185,3 +221,40 @@ impl Interpreter for ReclusterTableInterpreter {
Ok(PipelineBuildResult::create())
}
}

/// Assemble the physical plan for a recluster operation.
///
/// The plan is a `ReclusterSource` (executes the recluster tasks) feeding a
/// `ReclusterSink` (commits the new segments and removes the replaced ones).
/// When more than one task exists, a merge `Exchange` is inserted between the
/// two so the tasks can be distributed across the cluster and their results
/// gathered on a single node before the commit.
pub fn build_recluster_physical_plan(
    tasks: Vec<ReclusterTask>,
    table_info: TableInfo,
    catalog_info: CatalogInfo,
    snapshot: Arc<TableSnapshot>,
    remained_blocks: Vec<Arc<BlockMeta>>,
    removed_segment_indexes: Vec<usize>,
    removed_segment_summary: Statistics,
) -> Result<PhysicalPlan> {
    // Decide on distribution before `tasks` is moved into the source node.
    let distributed = tasks.len() > 1;

    let source = PhysicalPlan::ReclusterSource(Box::new(ReclusterSource {
        tasks,
        table_info: table_info.clone(),
        catalog_info: catalog_info.clone(),
    }));

    // With multiple tasks, funnel all partial results to one node via a
    // merge exchange; a single task runs locally with no exchange.
    let sink_input = if distributed {
        PhysicalPlan::Exchange(Exchange {
            plan_id: 0,
            input: Box::new(source),
            kind: FragmentKind::Merge,
            keys: vec![],
            ignore_exchange: false,
        })
    } else {
        source
    };

    Ok(PhysicalPlan::ReclusterSink(Box::new(ReclusterSink {
        input: Box::new(sink_input),
        table_info,
        catalog_info,
        snapshot,
        remained_blocks,
        removed_segment_indexes,
        removed_segment_summary,
    })))
}
Loading
Loading