Skip to content

Commit

Permalink
finish copy from query
Browse files Browse the repository at this point in the history
  • Loading branch information
JackTan25 committed Jul 3, 2023
1 parent dcd3e80 commit 592d7b5
Show file tree
Hide file tree
Showing 11 changed files with 373 additions and 90 deletions.
179 changes: 139 additions & 40 deletions src/query/service/src/interpreters/interpreter_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ use common_expression::DataSchemaRefExt;
use common_expression::Scalar;
use common_meta_app::principal::StageInfo;
use common_meta_app::schema::TableCopiedFileInfo;
use common_meta_app::schema::TableInfo;
use common_meta_app::schema::UpsertTableCopiedFileReq;
use common_pipeline_core::Pipeline;
use common_sql::executor::table_read_plan::ToReadDataSourcePlan;
use common_sql::executor::DistributedCopyIntoTable;
use common_sql::executor::CopyIntoTableFromQuery;
use common_sql::executor::DistributedCopyIntoTableFromStage;
use common_sql::executor::Exchange;
use common_sql::executor::FragmentKind;
use common_sql::executor::PhysicalPlan;
Expand Down Expand Up @@ -70,7 +72,8 @@ pub struct CopyInterpreter {

pub enum PlanParam {
CopyIntoTablePlanOption(CopyIntoTablePlan),
DistributedCopyIntoTable(DistributedCopyIntoTable),
DistributedCopyIntoTableFromStage(DistributedCopyIntoTableFromStage),
CopyIntoTableFromQuery(CopyIntoTableFromQuery),
}

impl CopyInterpreter {
Expand All @@ -80,7 +83,7 @@ impl CopyInterpreter {
}

#[async_backtrace::framed]
async fn build_query(&self, query: &Plan) -> Result<(PipelineBuildResult, DataSchemaRef)> {
async fn build_query(&self, query: &Plan) -> Result<(SelectInterpreter, DataSchemaRef)> {
let (s_expr, metadata, bind_context, formatted_ast) = match query {
Plan::Query {
s_expr,
Expand Down Expand Up @@ -114,9 +117,8 @@ impl CopyInterpreter {
})
.collect();
let data_schema = DataSchemaRefExt::create(fields);
let plan = select_interpreter.build_physical_plan().await?;
let build_res = select_interpreter.build_pipeline(plan).await?;
Ok((build_res, data_schema))

Ok((select_interpreter, data_schema))
}

#[async_backtrace::framed]
Expand All @@ -126,7 +128,9 @@ impl CopyInterpreter {
path: &str,
query: &Plan,
) -> Result<PipelineBuildResult> {
let (mut build_res, data_schema) = self.build_query(query).await?;
let (select_interpreter, data_schema) = self.build_query(query).await?;
let plan = select_interpreter.build_physical_plan().await?;
let mut build_res = select_interpreter.build_pipeline(plan).await?;
let table_schema = infer_table_schema(&data_schema)?;
let stage_table_info = StageTableInfo {
schema: table_schema,
Expand Down Expand Up @@ -202,7 +206,7 @@ impl CopyInterpreter {
async fn transform_copy_plan_distributed(
&self,
plan: &CopyIntoTablePlan,
) -> Result<Option<DistributedCopyIntoTable>> {
) -> Result<Option<DistributedCopyIntoTableFromStage>> {
let ctx = self.ctx.clone();
let to_table = ctx
.get_table(&plan.catalog_name, &plan.database_name, &plan.table_name)
Expand Down Expand Up @@ -230,7 +234,7 @@ impl CopyInterpreter {
if read_source_plan.parts.len() <= 1 {
return Ok(None);
}
Ok(Some(DistributedCopyIntoTable {
Ok(Some(DistributedCopyIntoTableFromStage {
// TODO(leiysky): we reuse the id of exchange here,
// which is not correct. We should generate a new id for insert.
plan_id: 0,
Expand Down Expand Up @@ -302,7 +306,67 @@ impl CopyInterpreter {
.await?;

let (mut build_res, source_schema, files) = if let Some(query) = &plan.query {
let (build_res, source_schema) = self.build_query(query).await?;
let mut build_res;
let (select_interpreter, source_schema) = self.build_query(query).await?;
let plan_query = select_interpreter.build_physical_plan().await?;
if plan.enable_distributed {
let to_table = self
.ctx
.get_table(&plan.catalog_name, &plan.database_name, &plan.table_name)
.await?;
let table_ctx: Arc<dyn TableContext> = self.ctx.clone();
let copy_from_query = CopyIntoTableFromQuery {
// add exchange plan node to enable distributed
// TODO(leiysky): we reuse the id of exchange here,
// which is not correct. We should generate a new id
plan_id: 0,
catalog_name: plan.catalog_name.clone(),
database_name: plan.database_name.clone(),
table_name: plan.table_name.clone(),
required_source_schema: plan.required_source_schema.clone(),
values_consts: plan.values_consts.clone(),
required_values_schema: plan.required_values_schema.clone(),
write_mode: plan.write_mode,
validation_mode: plan.validation_mode.clone(),
force: plan.force,
stage_table_info: plan.stage_table_info.clone(),
local_node_id: self.ctx.get_cluster().local_id.clone(),
input: Box::new(plan_query),
files: plan.collect_files(&table_ctx).await?,
table_info: to_table.get_table_info().clone(),
};

// add exchange plan node to enable distributed
// TODO(leiysky): we reuse the id of exchange here,
// which is not correct. We should generate a new id
let exchange_plan = PhysicalPlan::Exchange(Exchange {
plan_id: 0,
input: Box::new(PhysicalPlan::CopyIntoTableFromQuery(Box::new(
copy_from_query,
))),
kind: FragmentKind::Merge,
keys: Vec::new(),
});
build_res = build_distributed_pipeline(&self.ctx, &exchange_plan, false).await?;
let to_table = self
.ctx
.get_table(&plan.catalog_name, &plan.database_name, &plan.table_name)
.await?;
let files = plan.collect_files(&table_ctx).await?;
last_commit(
&self.ctx,
&plan.catalog_name,
to_table.get_table_info(),
&plan.stage_table_info.stage_info,
&files,
plan.force,
plan.write_mode,
&mut build_res,
)?;
} else {
build_res = select_interpreter.build_pipeline(plan_query).await?;
}

(
build_res,
source_schema,
Expand All @@ -327,17 +391,20 @@ impl CopyInterpreter {
.await?;
(build_res, plan.required_source_schema.clone(), files)
};
// copy into table from query/stage (standalone)
if !plan.enable_distributed {
append_data_and_set_finish(
&mut build_res.main_pipeline,
source_schema,
PlanParam::CopyIntoTablePlanOption(plan.clone()),
ctx,
to_table,
files,
start,
true,
)?;
}

append_data_and_set_finish(
&mut build_res.main_pipeline,
source_schema,
PlanParam::CopyIntoTablePlanOption(plan.clone()),
ctx,
to_table,
files,
start,
true,
)?;
Ok(build_res)
}

Expand Down Expand Up @@ -406,7 +473,7 @@ impl Interpreter for CopyInterpreter {

match &self.plan {
CopyPlan::IntoTable(plan) => {
if plan.enable_distributed {
if plan.enable_distributed && plan.query.is_none() {
let distributed_plan_op = self.transform_copy_plan_distributed(plan).await?;
if distributed_plan_op.is_none() {
return self.build_copy_into_table_pipeline(plan).await;
Expand All @@ -418,7 +485,7 @@ impl Interpreter for CopyInterpreter {
// which is not correct. We should generate a new id for insert.
let exchange_plan = PhysicalPlan::Exchange(Exchange {
plan_id: 0,
input: Box::new(PhysicalPlan::DistributedCopyIntoTable(Box::new(
input: Box::new(PhysicalPlan::DistributedCopyIntoTableFromStage(Box::new(
distributed_plan.clone(),
))),
kind: FragmentKind::Merge,
Expand All @@ -427,24 +494,15 @@ impl Interpreter for CopyInterpreter {
let mut build_res =
build_distributed_pipeline(&self.ctx, &exchange_plan, false).await?;

let catalog = self.ctx.get_catalog(&distributed_plan.catalog_name)?;
let to_table = catalog.get_table_by_info(&distributed_plan.table_info)?;
let copied_files = CopyInterpreter::upsert_copied_files_request(
self.ctx.clone(),
to_table.clone(),
distributed_plan.stage_table_info.stage_info.clone(),
distributed_plan.files.clone(),
last_commit(
&self.ctx,
&distributed_plan.catalog_name,
&distributed_plan.table_info,
&distributed_plan.stage_table_info.stage_info,
&distributed_plan.files,
distributed_plan.force,
)?;
let mut overwrite_ = false;
if let CopyIntoTableMode::Insert { overwrite } = plan.write_mode {
overwrite_ = overwrite;
}
to_table.commit_insertion(
self.ctx.clone(),
&mut build_res.main_pipeline,
copied_files,
overwrite_,
distributed_plan.write_mode,
&mut build_res,
)?;

Ok(build_res)
Expand All @@ -459,6 +517,38 @@ impl Interpreter for CopyInterpreter {
}
}
}
/// Attaches the final commit step of a `COPY INTO <table>` to an already built pipeline.
///
/// The target table is resolved from `catalog_name` and `table_info`. A copied-files
/// upsert request is then built for `files` (forwarding `stage_info` and `force`) and
/// handed to the table's `commit_insertion` together with `build_res.main_pipeline`.
///
/// The overwrite flag passed to `commit_insertion` is `true` only when `write_mode`
/// is `CopyIntoTableMode::Insert { overwrite: true }`; every other mode yields `false`.
///
/// # Errors
///
/// Propagates any error returned by the catalog lookup, the table lookup,
/// `upsert_copied_files_request`, or `commit_insertion`.
// The argument list mirrors the plan fields the callers already hold, hence the lint allowance.
#[allow(clippy::too_many_arguments)]
fn last_commit(
    ctx: &Arc<QueryContext>,
    catalog_name: &str,
    table_info: &TableInfo,
    stage_info: &StageInfo,
    files: &[StageFileInfo],
    force: bool,
    write_mode: CopyIntoTableMode,
    build_res: &mut PipelineBuildResult,
) -> Result<()> {
    let target_table = ctx
        .get_catalog(catalog_name)?
        .get_table_by_info(table_info)?;

    let upsert_request = CopyInterpreter::upsert_copied_files_request(
        ctx.clone(),
        target_table.clone(),
        stage_info.clone(),
        files.to_vec(),
        force,
    )?;

    // Only an `Insert` with its flag set overwrites; all other modes append.
    let overwrite = matches!(write_mode, CopyIntoTableMode::Insert { overwrite: true });

    target_table.commit_insertion(
        ctx.clone(),
        &mut build_res.main_pipeline,
        upsert_request,
        overwrite,
    )?;
    Ok(())
}

fn fill_const_columns(
ctx: Arc<QueryContext>,
Expand Down Expand Up @@ -508,7 +598,16 @@ pub fn append_data_and_set_finish(
plan_write_mode = plan.write_mode;
local_id = ctx.get_cluster().local_id.clone();
}
PlanParam::DistributedCopyIntoTable(plan) => {
PlanParam::DistributedCopyIntoTableFromStage(plan) => {
plan_required_source_schema = plan.required_source_schema;
plan_required_values_schema = plan.required_values_schema;
plan_values_consts = plan.values_consts;
plan_stage_table_info = plan.stage_table_info;
plan_force = plan.force;
plan_write_mode = plan.write_mode;
local_id = plan.local_node_id;
}
PlanParam::CopyIntoTableFromQuery(plan) => {
plan_required_source_schema = plan.required_source_schema;
plan_required_values_schema = plan.required_values_schema;
plan_values_consts = plan.values_consts;
Expand Down
37 changes: 31 additions & 6 deletions src/query/service/src/pipelines/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ use common_sql::executor::AggregateExpand;
use common_sql::executor::AggregateFinal;
use common_sql::executor::AggregateFunctionDesc;
use common_sql::executor::AggregatePartial;
use common_sql::executor::CopyIntoTableFromQuery;
use common_sql::executor::DeleteFinal;
use common_sql::executor::DeletePartial;
use common_sql::executor::DistributedCopyIntoTable;
use common_sql::executor::DistributedCopyIntoTableFromStage;
use common_sql::executor::DistributedInsertSelect;
use common_sql::executor::EvalScalar;
use common_sql::executor::ExchangeSink;
Expand Down Expand Up @@ -204,15 +205,39 @@ impl PipelineBuilder {
PhysicalPlan::DeletePartial(delete) => self.build_delete_partial(delete),
PhysicalPlan::DeleteFinal(delete) => self.build_delete_final(delete),
PhysicalPlan::RangeJoin(range_join) => self.build_range_join(range_join),
PhysicalPlan::DistributedCopyIntoTable(distributed_plan) => {
self.build_distributed_copy_into_table(distributed_plan)
PhysicalPlan::DistributedCopyIntoTableFromStage(distributed_plan) => {
self.build_distributed_copy_into_table_from_stage(distributed_plan)
}
PhysicalPlan::CopyIntoTableFromQuery(copy_plan) => {
self.build_copy_into_table_from_query(copy_plan)
}
}
}

fn build_distributed_copy_into_table(
/// Builds the pipeline for a `CopyIntoTableFromQuery` physical plan node.
///
/// The pipeline of the node's `input` plan is built first. The target table is then
/// resolved from the plan's catalog name and table info, and
/// `append_data_and_set_finish` is invoked on `self.main_pipeline` with the plan's
/// required source schema, a clone of the plan wrapped in
/// `PlanParam::CopyIntoTableFromQuery`, the plan's file list, and the instant taken
/// just before the call. The last argument is passed as `false`.
///
/// # Errors
///
/// Propagates any error from building the input pipeline, the catalog or table
/// lookup, or `append_data_and_set_finish`.
fn build_copy_into_table_from_query(
    &mut self,
    copy_plan: &CopyIntoTableFromQuery,
) -> Result<()> {
    // The query side must be in place before the append stage is added on top of it.
    self.build_pipeline(&copy_plan.input)?;

    let target_table = self
        .ctx
        .get_catalog(&copy_plan.catalog_name)?
        .get_table_by_info(&copy_plan.table_info)?;

    let started_at = Instant::now();
    append_data_and_set_finish(
        &mut self.main_pipeline,
        copy_plan.required_source_schema.clone(),
        PlanParam::CopyIntoTableFromQuery(copy_plan.clone()),
        self.ctx.clone(),
        target_table,
        copy_plan.files.clone(),
        started_at,
        false,
    )?;
    Ok(())
}

fn build_distributed_copy_into_table_from_stage(
&mut self,
distributed_plan: &DistributedCopyIntoTable,
distributed_plan: &DistributedCopyIntoTableFromStage,
) -> Result<()> {
let catalog = self.ctx.get_catalog(&distributed_plan.catalog_name)?;
let to_table = catalog.get_table_by_info(&distributed_plan.table_info)?;
Expand All @@ -227,7 +252,7 @@ impl PipelineBuilder {
append_data_and_set_finish(
&mut self.main_pipeline,
distributed_plan.required_source_schema.clone(),
PlanParam::DistributedCopyIntoTable(distributed_plan.clone()),
PlanParam::DistributedCopyIntoTableFromStage(distributed_plan.clone()),
ctx,
to_table,
distributed_plan.files.clone(),
Expand Down
9 changes: 6 additions & 3 deletions src/query/service/src/schedulers/fragments/fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;

use common_catalog::table_context::TableContext;
use common_exception::Result;
use common_sql::executor::DistributedCopyIntoTable;
use common_sql::executor::DistributedCopyIntoTableFromStage;
use common_sql::executor::FragmentKind;

use crate::api::BroadcastExchange;
Expand Down Expand Up @@ -139,9 +139,12 @@ impl PhysicalPlanReplacer for Fragmenter {
Ok(PhysicalPlan::TableScan(plan.clone()))
}

fn replace_copy_into_table(&mut self, plan: &DistributedCopyIntoTable) -> Result<PhysicalPlan> {
fn replace_copy_into_table(
&mut self,
plan: &DistributedCopyIntoTableFromStage,
) -> Result<PhysicalPlan> {
self.state = State::SelectLeaf;
Ok(PhysicalPlan::DistributedCopyIntoTable(Box::new(
Ok(PhysicalPlan::DistributedCopyIntoTableFromStage(Box::new(
plan.clone(),
)))
}
Expand Down
Loading

0 comments on commit 592d7b5

Please sign in to comment.