feat: distributed copy into table from query (#11943)
* finish copy from query

* fix typo

* add purge

* fix result

* fix typo

* add tests

* remove useless comments

* fix conflict

* fix clippy

* fix clippy

* add more tests

* add more tests

* add query T

* fix test

* update

* update error code

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
JackTan25 and mergify[bot] authored Jul 14, 2023
1 parent 93bd6b8 commit f3cbcd7
Showing 15 changed files with 414 additions and 125 deletions.
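Overview: this commit lets COPY INTO <table> FROM (SELECT ... FROM @stage) run distributed across the cluster, where previously only COPY INTO <table> FROM @stage could. To that end it renames the physical plan DistributedCopyIntoTable to DistributedCopyIntoTableFromStage, adds a new CopyIntoTableFromQuery plan, and changes build_query to return the SelectInterpreter itself (rather than an already-built pipeline) so callers can extract the physical plan for distribution. A minimal sketch of the resulting dispatch, with simplified signatures (illustrative, not compilable standalone):

async fn execute_copy_into_table(&self, plan: &CopyIntoTablePlan) -> Result<PipelineBuildResult> {
    // Returns None when the source has at most one partition; then
    // distributing is not worth it and the local path is taken.
    match self
        .try_transform_copy_plan_from_local_to_distributed(plan)
        .await?
    {
        // CopyPlanType: either DistributedCopyIntoTableFromStage or
        // CopyIntoTableFromQuery, depending on whether plan.query is set.
        Some(distributed_plan) => {
            self.build_cluster_copy_into_table_pipeline(&distributed_plan)
                .await
        }
        None => self.build_local_copy_into_table_pipeline(plan).await,
    }
}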
200 changes: 134 additions & 66 deletions src/query/service/src/interpreters/interpreter_copy.rs
@@ -27,7 +27,8 @@ use common_expression::DataSchemaRefExt;
use common_meta_app::principal::StageInfo;
use common_pipeline_core::Pipeline;
use common_sql::executor::table_read_plan::ToReadDataSourcePlan;
use common_sql::executor::DistributedCopyIntoTable;
use common_sql::executor::CopyIntoTableFromQuery;
use common_sql::executor::DistributedCopyIntoTableFromStage;
use common_sql::executor::Exchange;
use common_sql::executor::FragmentKind;
use common_sql::executor::PhysicalPlan;
@@ -65,7 +66,7 @@ impl CopyInterpreter {
}

#[async_backtrace::framed]
async fn build_query(&self, query: &Plan) -> Result<(PipelineBuildResult, DataSchemaRef)> {
async fn build_query(&self, query: &Plan) -> Result<(SelectInterpreter, DataSchemaRef)> {
let (s_expr, metadata, bind_context, formatted_ast) = match query {
Plan::Query {
s_expr,
@@ -99,9 +100,8 @@ impl CopyInterpreter {
})
.collect();
let data_schema = DataSchemaRefExt::create(fields);
let plan = select_interpreter.build_physical_plan().await?;
let build_res = select_interpreter.build_pipeline(plan).await?;
Ok((build_res, data_schema))

Ok((select_interpreter, data_schema))
}

/// Build a pipeline for local copy into stage.
@@ -112,7 +112,9 @@ impl CopyInterpreter {
path: &str,
query: &Plan,
) -> Result<PipelineBuildResult> {
let (mut build_res, data_schema) = self.build_query(query).await?;
let (select_interpreter, data_schema) = self.build_query(query).await?;
let plan = select_interpreter.build_physical_plan().await?;
let mut build_res = select_interpreter.build_pipeline(plan).await?;
let table_schema = infer_table_schema(&data_schema)?;
let stage_table_info = StageTableInfo {
schema: table_schema,
@@ -147,7 +149,7 @@ impl CopyInterpreter {
async fn try_transform_copy_plan_from_local_to_distributed(
&self,
plan: &CopyIntoTablePlan,
) -> Result<Option<DistributedCopyIntoTable>> {
) -> Result<Option<CopyPlanType>> {
let ctx = self.ctx.clone();
let to_table = ctx
.get_table(&plan.catalog_name, &plan.database_name, &plan.table_name)
@@ -175,26 +177,55 @@ impl CopyInterpreter {
if read_source_plan.parts.len() <= 1 {
return Ok(None);
}

Ok(Some(DistributedCopyIntoTable {
// TODO(leiysky): we reuse the id of exchange here,
// which is not correct. We should generate a new id for insert.
plan_id: 0,
catalog_name: plan.catalog_name.clone(),
database_name: plan.database_name.clone(),
table_name: plan.table_name.clone(),
required_values_schema: plan.required_values_schema.clone(),
values_consts: plan.values_consts.clone(),
required_source_schema: plan.required_source_schema.clone(),
write_mode: plan.write_mode,
validation_mode: plan.validation_mode.clone(),
force: plan.force,
stage_table_info: plan.stage_table_info.clone(),
source: Box::new(read_source_plan),
thresholds: to_table.get_block_thresholds(),
files,
table_info: to_table.get_table_info().clone(),
}))
if plan.query.is_none() {
Ok(Some(CopyPlanType::DistributedCopyIntoTableFromStage(
DistributedCopyIntoTableFromStage {
// TODO(leiysky): we reuse the id of exchange here,
// which is not correct. We should generate a new id for insert.
plan_id: 0,
catalog_name: plan.catalog_name.clone(),
database_name: plan.database_name.clone(),
table_name: plan.table_name.clone(),
required_values_schema: plan.required_values_schema.clone(),
values_consts: plan.values_consts.clone(),
required_source_schema: plan.required_source_schema.clone(),
stage_table_info: plan.stage_table_info.clone(),
source: Box::new(read_source_plan),
files,
table_info: to_table.get_table_info().clone(),
force: plan.force,
write_mode: plan.write_mode,
thresholds: to_table.get_block_thresholds(),
validation_mode: plan.validation_mode.clone(),
},
)))
} else {
// plan.query must exist here, so we can unwrap it directly.
let (select_interpreter, _) = self.build_query(plan.query.as_ref().unwrap()).await?;
let plan_query = select_interpreter.build_physical_plan().await?;
Ok(Some(CopyPlanType::CopyIntoTableFromQuery(
CopyIntoTableFromQuery {
// add an exchange plan node to enable distributed execution
// TODO(leiysky): we reuse the id of exchange here,
// which is not correct. We should generate a new id
plan_id: 0,
catalog_name: plan.catalog_name.clone(),
database_name: plan.database_name.clone(),
table_name: plan.table_name.clone(),
required_source_schema: plan.required_source_schema.clone(),
values_consts: plan.values_consts.clone(),
required_values_schema: plan.required_values_schema.clone(),
write_mode: plan.write_mode,
validation_mode: plan.validation_mode.clone(),
force: plan.force,
stage_table_info: plan.stage_table_info.clone(),
local_node_id: self.ctx.get_cluster().local_id.clone(),
input: Box::new(plan_query),
files: plan.collect_files(&table_ctx).await?,
table_info: to_table.get_table_info().clone(),
},
)))
}
}

#[async_backtrace::framed]
@@ -272,8 +303,9 @@ impl CopyInterpreter {
.clone()
.ok_or(ErrorCode::Internal("files_to_copy should not be None"))?;

let (query_build_res, query_source_schema) = self.build_query(query).await?;
build_res = query_build_res;
let (select_interpreter, query_source_schema) = self.build_query(query).await?;
let plan = select_interpreter.build_physical_plan().await?;
build_res = select_interpreter.build_pipeline(plan).await?;
source_schema = query_source_schema;
}
}
@@ -340,21 +372,80 @@ impl CopyInterpreter {
#[async_backtrace::framed]
async fn build_cluster_copy_into_table_pipeline(
&self,
distributed_plan: &DistributedCopyIntoTable,
distributed_plan: &CopyPlanType,
) -> Result<PipelineBuildResult> {
// add an exchange plan node to enable distributed execution
// TODO(leiysky): we reuse the id of exchange here,
// which is not correct. We should generate a new id for insert.
let exchange_plan = PhysicalPlan::Exchange(Exchange {
plan_id: 0,
input: Box::new(PhysicalPlan::DistributedCopyIntoTable(Box::new(
distributed_plan.clone(),
))),
kind: FragmentKind::Merge,
keys: Vec::new(),
});
let build_res = build_distributed_pipeline(&self.ctx, &exchange_plan, false).await?;
let (
catalog_name,
database_name,
table_name,
stage_info,
files,
force,
purge,
is_overwrite,
);
let mut build_res = match distributed_plan {
CopyPlanType::DistributedCopyIntoTableFromStage(plan) => {
catalog_name = plan.catalog_name.clone();
database_name = plan.database_name.clone();
table_name = plan.table_name.clone();
stage_info = plan.stage_table_info.stage_info.clone();
files = plan.files.clone();
force = plan.force;
purge = plan.stage_table_info.stage_info.copy_options.purge;
is_overwrite = plan.write_mode.is_overwrite();
// add an exchange plan node to enable distributed execution
// TODO(leiysky): we reuse the id of exchange here,
// which is not correct. We should generate a new id for insert.
let exchange_plan = PhysicalPlan::Exchange(Exchange {
plan_id: 0,
input: Box::new(PhysicalPlan::DistributedCopyIntoTableFromStage(Box::new(
plan.clone(),
))),
kind: FragmentKind::Merge,
keys: Vec::new(),
});

build_distributed_pipeline(&self.ctx, &exchange_plan, false).await?
}
CopyPlanType::CopyIntoTableFromQuery(plan) => {
catalog_name = plan.catalog_name.clone();
database_name = plan.database_name.clone();
table_name = plan.table_name.clone();
stage_info = plan.stage_table_info.stage_info.clone();
files = plan.files.clone();
force = plan.force;
purge = plan.stage_table_info.stage_info.copy_options.purge;
is_overwrite = plan.write_mode.is_overwrite();
// add an exchange plan node to enable distributed execution
// TODO(leiysky): we reuse the id of exchange here,
// which is not correct. We should generate a new id
let exchange_plan = PhysicalPlan::Exchange(Exchange {
plan_id: 0,
input: Box::new(PhysicalPlan::CopyIntoTableFromQuery(Box::new(plan.clone()))),
kind: FragmentKind::Merge,
keys: Vec::new(),
});
build_distributed_pipeline(&self.ctx, &exchange_plan, false).await?
}
_ => unreachable!(),
};
let to_table = self
.ctx
.get_table(&catalog_name, &database_name, &table_name)
.await?;

// commit.
build_commit_data_pipeline(
self.ctx.clone(),
&mut build_res.main_pipeline,
stage_info,
to_table,
files,
force,
purge,
is_overwrite,
)?;
Ok(build_res)
}
}
@@ -379,33 +470,10 @@ impl Interpreter for CopyInterpreter {
.try_transform_copy_plan_from_local_to_distributed(plan)
.await?;
if let Some(distributed_plan) = distributed_plan_op {
let mut build_res = self
let build_res = self
.build_cluster_copy_into_table_pipeline(&distributed_plan)
.await?;
let to_table = self
.ctx
.get_table(
&distributed_plan.catalog_name,
&distributed_plan.database_name,
&distributed_plan.table_name,
)
.await?;

// commit.
build_commit_data_pipeline(
self.ctx.clone(),
&mut build_res.main_pipeline,
distributed_plan.stage_table_info.stage_info.clone(),
to_table,
distributed_plan.files.clone(),
distributed_plan.force,
distributed_plan
.stage_table_info
.stage_info
.copy_options
.purge,
distributed_plan.write_mode.is_overwrite(),
)?;
Ok(build_res)
} else {
self.build_local_copy_into_table_pipeline(plan).await
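Both arms of build_cluster_copy_into_table_pipeline wrap their plan in the same Merge exchange and then funnel into a single build_commit_data_pipeline call, which is why the commit logic moved out of execute2 and into this function. A hedged sketch of the shared wrapping (the helper itself is hypothetical, not in the diff; the field names follow it):

fn wrap_in_merge_exchange(input: PhysicalPlan) -> PhysicalPlan {
    PhysicalPlan::Exchange(Exchange {
        // TODO carried over from the diff: the exchange id is reused here
        // and should really be freshly generated for inserts.
        plan_id: 0,
        input: Box::new(input),
        kind: FragmentKind::Merge, // merge every fragment's output into one stream
        keys: Vec::new(),          // a merge exchange needs no shuffle keys
    })
}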
16 changes: 13 additions & 3 deletions src/query/service/src/pipelines/builders/copy.rs
@@ -29,7 +29,8 @@ use common_meta_app::principal::StageInfo;
use common_meta_app::schema::TableCopiedFileInfo;
use common_meta_app::schema::UpsertTableCopiedFileReq;
use common_pipeline_core::Pipeline;
use common_sql::executor::DistributedCopyIntoTable;
use common_sql::executor::CopyIntoTableFromQuery;
use common_sql::executor::DistributedCopyIntoTableFromStage;
use common_sql::plans::CopyIntoTableMode;
use common_sql::plans::CopyIntoTablePlan;
use common_storage::common_metrics::copy::metrics_inc_copy_purge_files_cost_milliseconds;
@@ -48,7 +49,10 @@ use crate::sessions::QueryContext;

pub enum CopyPlanType {
CopyIntoTablePlanOption(CopyIntoTablePlan),
DistributedCopyIntoTable(DistributedCopyIntoTable),
DistributedCopyIntoTableFromStage(DistributedCopyIntoTableFromStage),
// also a distributed plan, but the real distributed part is the query,
// so there is no "distributed" prefix here.
CopyIntoTableFromQuery(CopyIntoTableFromQuery),
}

pub fn build_append_data_pipeline(
@@ -70,7 +74,13 @@ pub fn build_append_data_pipeline(
plan_values_consts = plan.values_consts;
plan_write_mode = plan.write_mode;
}
CopyPlanType::DistributedCopyIntoTable(plan) => {
CopyPlanType::DistributedCopyIntoTableFromStage(plan) => {
plan_required_source_schema = plan.required_source_schema;
plan_required_values_schema = plan.required_values_schema;
plan_values_consts = plan.values_consts;
plan_write_mode = plan.write_mode;
}
CopyPlanType::CopyIntoTableFromQuery(plan) => {
plan_required_source_schema = plan.required_source_schema;
plan_required_values_schema = plan.required_values_schema;
plan_values_consts = plan.values_consts;
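build_append_data_pipeline pulls the same four append-side fields out of whichever CopyPlanType variant it receives. A hypothetical helper making that uniformity explicit (field names come from the diff; the exact types are assumptions about Databend's definitions):

fn append_side_fields(
    plan: CopyPlanType,
) -> (DataSchemaRef, DataSchemaRef, Vec<Scalar>, CopyIntoTableMode) {
    match plan {
        // every variant carries the same append-side fields
        CopyPlanType::CopyIntoTablePlanOption(p) => {
            (p.required_source_schema, p.required_values_schema, p.values_consts, p.write_mode)
        }
        CopyPlanType::DistributedCopyIntoTableFromStage(p) => {
            (p.required_source_schema, p.required_values_schema, p.values_consts, p.write_mode)
        }
        CopyPlanType::CopyIntoTableFromQuery(p) => {
            (p.required_source_schema, p.required_values_schema, p.values_consts, p.write_mode)
        }
    }
}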
33 changes: 27 additions & 6 deletions src/query/service/src/pipelines/pipeline_builder.rs
@@ -54,9 +54,10 @@ use common_sql::executor::AggregateExpand;
use common_sql::executor::AggregateFinal;
use common_sql::executor::AggregateFunctionDesc;
use common_sql::executor::AggregatePartial;
use common_sql::executor::CopyIntoTableFromQuery;
use common_sql::executor::DeleteFinal;
use common_sql::executor::DeletePartial;
use common_sql::executor::DistributedCopyIntoTable;
use common_sql::executor::DistributedCopyIntoTableFromStage;
use common_sql::executor::DistributedInsertSelect;
use common_sql::executor::EvalScalar;
use common_sql::executor::ExchangeSink;
@@ -205,15 +206,35 @@ impl PipelineBuilder {
PhysicalPlan::DeletePartial(delete) => self.build_delete_partial(delete),
PhysicalPlan::DeleteFinal(delete) => self.build_delete_final(delete),
PhysicalPlan::RangeJoin(range_join) => self.build_range_join(range_join),
PhysicalPlan::DistributedCopyIntoTable(distributed_plan) => {
self.build_distributed_copy_into_table(distributed_plan)
PhysicalPlan::DistributedCopyIntoTableFromStage(distributed_plan) => {
self.build_distributed_copy_into_table_from_stage(distributed_plan)
}
PhysicalPlan::CopyIntoTableFromQuery(copy_plan) => {
self.build_copy_into_table_from_query(copy_plan)
}
}
}

fn build_distributed_copy_into_table(
fn build_copy_into_table_from_query(
&mut self,
copy_plan: &CopyIntoTableFromQuery,
) -> Result<()> {
self.build_pipeline(&copy_plan.input)?;
let catalog = self.ctx.get_catalog(&copy_plan.catalog_name)?;
let to_table = catalog.get_table_by_info(&copy_plan.table_info)?;
build_append_data_pipeline(
self.ctx.clone(),
&mut self.main_pipeline,
CopyPlanType::CopyIntoTableFromQuery(copy_plan.clone()),
copy_plan.required_source_schema.clone(),
to_table,
)?;
Ok(())
}

fn build_distributed_copy_into_table_from_stage(
&mut self,
distributed_plan: &DistributedCopyIntoTable,
distributed_plan: &DistributedCopyIntoTableFromStage,
) -> Result<()> {
let catalog = self.ctx.get_catalog(&distributed_plan.catalog_name)?;
let to_table = catalog.get_table_by_info(&distributed_plan.table_info)?;
@@ -229,7 +250,7 @@
build_append_data_pipeline(
ctx,
&mut self.main_pipeline,
CopyPlanType::DistributedCopyIntoTable(distributed_plan.clone()),
CopyPlanType::DistributedCopyIntoTableFromStage(distributed_plan.clone()),
distributed_plan.required_source_schema.clone(),
to_table,
)?;
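Note the symmetry between the two builder paths: the from-stage path creates its source pipeline from the stage table's read plan, while the from-query path first recursively builds the pipeline of its input physical plan (self.build_pipeline(&copy_plan.input)?) and only then appends the write stage. The resulting plan shapes, sketched from the diff:

// Illustrative plan trees before fragmentation:
//
//   COPY INTO t FROM @stage                 COPY INTO t FROM (SELECT ...)
//
//   Exchange (kind: Merge)                  Exchange (kind: Merge)
//   └── DistributedCopyIntoTableFromStage   └── CopyIntoTableFromQuery
//       └── source: ReadDataSourcePlan          └── input: <query physical plan>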
9 changes: 6 additions & 3 deletions src/query/service/src/schedulers/fragments/fragmenter.rs
@@ -16,7 +16,7 @@ use std::sync::Arc;

use common_catalog::table_context::TableContext;
use common_exception::Result;
use common_sql::executor::DistributedCopyIntoTable;
use common_sql::executor::DistributedCopyIntoTableFromStage;
use common_sql::executor::FragmentKind;

use crate::api::BroadcastExchange;
@@ -139,9 +139,12 @@ impl PhysicalPlanReplacer for Fragmenter {
Ok(PhysicalPlan::TableScan(plan.clone()))
}

fn replace_copy_into_table(&mut self, plan: &DistributedCopyIntoTable) -> Result<PhysicalPlan> {
fn replace_copy_into_table(
&mut self,
plan: &DistributedCopyIntoTableFromStage,
) -> Result<PhysicalPlan> {
self.state = State::SelectLeaf;
Ok(PhysicalPlan::DistributedCopyIntoTable(Box::new(
Ok(PhysicalPlan::DistributedCopyIntoTableFromStage(Box::new(
plan.clone(),
)))
}
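Only the from-stage variant gets a hook in the fragmenter: for COPY ... FROM (SELECT ...), the genuinely distributed part is the query subtree, which the regular query fragmentation already handles (hence the comment on CopyPlanType above). The new hook again, with explanatory comments added:

fn replace_copy_into_table(
    &mut self,
    plan: &DistributedCopyIntoTableFromStage,
) -> Result<PhysicalPlan> {
    // Mark this fragment as a source leaf: its partitions (the staged
    // files) will be dealt out across the cluster nodes.
    self.state = State::SelectLeaf;
    // The node itself is kept unchanged; only the fragmenter state moves.
    Ok(PhysicalPlan::DistributedCopyIntoTableFromStage(Box::new(
        plan.clone(),
    )))
}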

1 comment on commit f3cbcd7

@vercel vercel bot commented on f3cbcd7 Jul 14, 2023

Successfully deployed to the following URLs:

databend – ./

databend-git-main-databend.vercel.app
databend.vercel.app
databend-databend.vercel.app
databend.rs