refactor insert select to stream mode
zealchen committed Jul 17, 2024
1 parent fa5c286 commit c12f071
Showing 1 changed file with 100 additions and 38 deletions.
138 changes: 100 additions & 38 deletions src/interpreters/src/insert.rs
@@ -30,6 +30,7 @@ use common_types::{
column_block::{ColumnBlock, ColumnBlockBuilder},
column_schema::ColumnId,
datum::Datum,
record_batch::RecordBatch as CommonRecordBatch,
row::{Row, RowBuilder, RowGroup},
schema::Schema,
};
@@ -43,9 +44,10 @@ use datafusion::{
},
};
use df_operator::visitor::find_columns_by_expr;
use futures::TryStreamExt;
use futures::StreamExt;
use generic_error::{BoxError, GenericError};
use hash_ext::hash64;
use logger::error;
use macros::define_result;
use query_engine::{executor::ExecutorRef, physical_planner::PhysicalPlannerRef};
use query_frontend::{
@@ -54,12 +56,14 @@ use query_frontend::{
};
use runtime::Priority;
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use table_engine::table::{TableRef, WriteRequest};
use table_engine::{
stream::SendableRecordBatchStream,
table::{TableRef, WriteRequest},
};

use crate::{
context::Context,
interpreter::{Insert, Interpreter, InterpreterPtr, Output, Result as InterpreterResult},
RecordBatchVec,
};

#[derive(Debug, Snafu)]
@@ -115,6 +119,9 @@ pub enum Error {

#[snafu(display("Record columns not enough, len:{}, index:{}", len, index))]
RecordColumnsNotEnough { len: usize, index: usize },

#[snafu(display("Failed to do select, err:{}", source))]
Select { source: table_engine::stream::Error },
}

define_result!(Error);
@@ -152,14 +159,18 @@ impl Interpreter for InsertInterpreter {
default_value_map,
} = self.plan;

let mut rows = match source {
InsertSource::Values { row_group } => row_group,
match source {
InsertSource::Values { row_group: rows } => {
let num_rows =
prepare_and_write_table(table.clone(), rows, &default_value_map).await?;

Ok(Output::AffectedRows(num_rows))
}
InsertSource::Select {
query: query_plan,
column_index_in_insert,
} => {
// TODO: support streaming insert
let record_batches = exec_select_logical_plan(
let mut record_batches_stream = exec_select_logical_plan(
self.ctx,
query_plan,
self.executor,
@@ -168,38 +179,98 @@
.await
.context(Insert)?;

if record_batches.is_empty() {
return Ok(Output::AffectedRows(0));
// Accumulate up to 8 record batches before writing to the table; this
// parameter can be tuned based on performance test results.
let max_batch_size = 8;
let mut result_rows = 0;
let mut record_batches = Vec::new();
while let Some(response) = record_batches_stream.next().await {
match response {
Ok(record_batch) => {
if record_batch.is_empty() {
continue;
}
record_batches.push(record_batch);
}
Err(e) => {
error!("Failed to get record batch, err:{}", e);
return Err(e).context(Select).context(Insert)?;
}
}

if record_batches.len() >= max_batch_size {
let num_rows = write_record_batches(
&mut record_batches,
column_index_in_insert.as_slice(),
table.clone(),
&default_value_map,
)
.await?;
result_rows += num_rows;
}
}

if !record_batches.is_empty() {
let num_rows = write_record_batches(
&mut record_batches,
column_index_in_insert.as_slice(),
table,
&default_value_map,
)
.await?;
result_rows += num_rows;
}

convert_records_to_row_group(record_batches, column_index_in_insert, table.schema())
.context(Insert)?
Ok(Output::AffectedRows(result_rows))
}
};
}
}
}

maybe_generate_tsid(&mut rows).context(Insert)?;
async fn write_record_batches(
record_batches: &mut Vec<CommonRecordBatch>,
column_index_in_insert: &[InsertMode],
table: TableRef,
default_value_map: &BTreeMap<usize, DfLogicalExpr>,
) -> InterpreterResult<usize> {
let row_group = convert_records_to_row_group(
record_batches.as_slice(),
column_index_in_insert,
table.schema(),
)
.context(Insert)?;
record_batches.clear();

prepare_and_write_table(table, row_group, default_value_map).await
}

// Fill default values
fill_default_values(table.clone(), &mut rows, &default_value_map).context(Insert)?;
async fn prepare_and_write_table(
table: TableRef,
mut row_group: RowGroup,
default_value_map: &BTreeMap<usize, DfLogicalExpr>,
) -> InterpreterResult<usize> {
maybe_generate_tsid(&mut row_group).context(Insert)?;

let request = WriteRequest { row_group: rows };
// Fill default values
fill_default_values(table.clone(), &mut row_group, default_value_map).context(Insert)?;

let num_rows = table
.write(request)
.await
.context(WriteTable)
.context(Insert)?;
let request = WriteRequest { row_group };

Ok(Output::AffectedRows(num_rows))
}
let num_rows = table
.write(request)
.await
.context(WriteTable)
.context(Insert)?;

Ok(num_rows)
}

async fn exec_select_logical_plan(
ctx: Context,
query_plan: QueryPlan,
executor: ExecutorRef,
physical_planner: PhysicalPlannerRef,
) -> Result<RecordBatchVec> {
) -> Result<SendableRecordBatchStream> {
let priority = Priority::High;

let query_ctx = ctx
@@ -216,34 +287,25 @@ async fn exec_select_logical_plan(
})?;

// Execute select physical plan.
let record_batch_stream = executor
let record_batch_stream: SendableRecordBatchStream = executor
.execute(&query_ctx, physical_plan)
.await
.box_err()
.context(ExecuteSelectPlan {
msg: "failed to execute select physical plan",
})?;

let record_batches =
record_batch_stream
.try_collect()
.await
.box_err()
.context(ExecuteSelectPlan {
msg: "failed to collect select execution results",
})?;

Ok(record_batches)
Ok(record_batch_stream)
}

fn convert_records_to_row_group(
record_batches: RecordBatchVec,
column_index_in_insert: Vec<InsertMode>,
record_batches: &[CommonRecordBatch],
column_index_in_insert: &[InsertMode],
schema: Schema,
) -> Result<RowGroup> {
let mut data_rows: Vec<Row> = Vec::new();

for record in &record_batches {
for record in record_batches {
let num_cols = record.num_columns();
let num_rows = record.num_rows();
for row_idx in 0..num_rows {
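The diff above replaces the old collect-everything approach (`try_collect` on the whole result set) with incremental consumption: drain the select stream with `StreamExt::next`, buffer batches, and flush whenever the buffer fills. Below is a minimal, self-contained sketch of that batching pattern, assuming only the `futures` crate; `RecordBatch` and `write_batches` are hypothetical stand-ins for the real `table_engine` types and the `write_record_batches` helper, not the actual HoraeDB API.

```rust
use futures::{executor::block_on, stream, StreamExt};

// Stand-in for the real record batch type; assumed for illustration only.
type RecordBatch = Vec<u64>;

// Stand-in for `write_record_batches`: in the commit this converts the
// buffered batches into a `RowGroup` and writes it to the table. Here we
// just count the rows and clear the buffer.
async fn write_batches(batches: &mut Vec<RecordBatch>) -> usize {
    let rows: usize = batches.iter().map(|b| b.len()).sum();
    batches.clear();
    rows
}

fn main() {
    const MAX_BATCH_SIZE: usize = 8;

    block_on(async {
        // Stand-in for the `SendableRecordBatchStream` returned by the
        // select plan: 20 batches of 3 rows each.
        let mut batch_stream = stream::iter((0..20u64).map(|i| vec![i; 3]));

        let mut buffered = Vec::new();
        let mut result_rows = 0;
        while let Some(batch) = batch_stream.next().await {
            if batch.is_empty() {
                continue;
            }
            buffered.push(batch);
            // Flush once enough batches accumulate, bounding memory use
            // instead of materializing the whole result set up front.
            if buffered.len() >= MAX_BATCH_SIZE {
                result_rows += write_batches(&mut buffered).await;
            }
        }
        // Flush whatever remains after the stream is exhausted.
        if !buffered.is_empty() {
            result_rows += write_batches(&mut buffered).await;
        }
        assert_eq!(result_rows, 60);
    });
}
```

The design choice mirrored here is the same one the commit makes: memory stays bounded by the buffer size rather than by the full select result, at the cost of issuing multiple smaller writes.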
