diff --git a/src/interpreters/src/insert.rs b/src/interpreters/src/insert.rs
index d6307bf125..3755b7a8eb 100644
--- a/src/interpreters/src/insert.rs
+++ b/src/interpreters/src/insert.rs
@@ -30,6 +30,7 @@ use common_types::{
     column_block::{ColumnBlock, ColumnBlockBuilder},
     column_schema::ColumnId,
     datum::Datum,
+    record_batch::RecordBatch as CommonRecordBatch,
     row::{Row, RowBuilder, RowGroup},
     schema::Schema,
 };
@@ -43,9 +44,10 @@ use datafusion::{
     },
 };
 use df_operator::visitor::find_columns_by_expr;
-use futures::TryStreamExt;
+use futures::StreamExt;
 use generic_error::{BoxError, GenericError};
 use hash_ext::hash64;
+use logger::error;
 use macros::define_result;
 use query_engine::{executor::ExecutorRef, physical_planner::PhysicalPlannerRef};
 use query_frontend::{
@@ -54,12 +56,14 @@ use query_frontend::{
 };
 use runtime::Priority;
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
-use table_engine::table::{TableRef, WriteRequest};
+use table_engine::{
+    stream::SendableRecordBatchStream,
+    table::{TableRef, WriteRequest},
+};
 
 use crate::{
     context::Context,
     interpreter::{Insert, Interpreter, InterpreterPtr, Output, Result as InterpreterResult},
-    RecordBatchVec,
 };
 
 #[derive(Debug, Snafu)]
@@ -115,6 +119,9 @@ pub enum Error {
 
     #[snafu(display("Record columns not enough, len:{}, index:{}", len, index))]
     RecordColumnsNotEnough { len: usize, index: usize },
+
+    #[snafu(display("Failed to do select, err:{}", source))]
+    Select { source: table_engine::stream::Error },
 }
 
 define_result!(Error);
@@ -152,14 +159,18 @@ impl Interpreter for InsertInterpreter {
             default_value_map,
         } = self.plan;
 
-        let mut rows = match source {
-            InsertSource::Values { row_group } => row_group,
+        match source {
+            InsertSource::Values { row_group: rows } => {
+                let num_rows =
+                    prepare_and_write_table(table.clone(), rows, &default_value_map).await?;
+
+                Ok(Output::AffectedRows(num_rows))
+            }
             InsertSource::Select {
                 query: query_plan,
                 column_index_in_insert,
             } => {
-                // TODO: support streaming insert
-                let record_batches = exec_select_logical_plan(
+                let mut record_batches_stream = exec_select_logical_plan(
                     self.ctx,
                     query_plan,
                     self.executor,
@@ -168,30 +179,90 @@
                 .await
                 .context(Insert)?;
 
-                if record_batches.is_empty() {
-                    return Ok(Output::AffectedRows(0));
+                // Accumulate up to 8 record batches before writing to the table;
+                // this parameter can be tuned based on performance test results.
+                let max_batch_size = 8;
+                let mut result_rows = 0;
+                let mut record_batches = Vec::new();
+                while let Some(response) = record_batches_stream.next().await {
+                    match response {
+                        Ok(record_batch) => {
+                            if record_batch.is_empty() {
+                                continue;
+                            }
+                            record_batches.push(record_batch);
+                        }
+                        Err(e) => {
+                            error!("Failed to get record batch, err:{}", e);
+                            return Err(e).context(Select).context(Insert)?;
+                        }
+                    }
+
+                    if record_batches.len() >= max_batch_size {
+                        let num_rows = write_record_batches(
+                            &mut record_batches,
+                            column_index_in_insert.as_slice(),
+                            table.clone(),
+                            &default_value_map,
+                        )
+                        .await?;
+                        result_rows += num_rows;
+                    }
+                }
+
+                if !record_batches.is_empty() {
+                    let num_rows = write_record_batches(
+                        &mut record_batches,
+                        column_index_in_insert.as_slice(),
+                        table,
+                        &default_value_map,
+                    )
+                    .await?;
+                    result_rows += num_rows;
                 }
 
-                convert_records_to_row_group(record_batches, column_index_in_insert, table.schema())
-                    .context(Insert)?
+                Ok(Output::AffectedRows(result_rows))
             }
-        };
+        }
+    }
+}
 
-        maybe_generate_tsid(&mut rows).context(Insert)?;
+async fn write_record_batches(
+    record_batches: &mut Vec<CommonRecordBatch>,
+    column_index_in_insert: &[InsertMode],
+    table: TableRef,
+    default_value_map: &BTreeMap<usize, DfLogicalExpr>,
+) -> InterpreterResult<usize> {
+    let row_group = convert_records_to_row_group(
+        record_batches.as_slice(),
+        column_index_in_insert,
+        table.schema(),
+    )
+    .context(Insert)?;
+    record_batches.clear();
+
+    prepare_and_write_table(table, row_group, default_value_map).await
+}
 
-        // Fill default values
-        fill_default_values(table.clone(), &mut rows, &default_value_map).context(Insert)?;
+async fn prepare_and_write_table(
+    table: TableRef,
+    mut row_group: RowGroup,
+    default_value_map: &BTreeMap<usize, DfLogicalExpr>,
+) -> InterpreterResult<usize> {
+    maybe_generate_tsid(&mut row_group).context(Insert)?;
 
-        let request = WriteRequest { row_group: rows };
+    // Fill default values
+    fill_default_values(table.clone(), &mut row_group, default_value_map).context(Insert)?;
 
-        let num_rows = table
-            .write(request)
-            .await
-            .context(WriteTable)
-            .context(Insert)?;
+    let request = WriteRequest { row_group };
 
-        Ok(Output::AffectedRows(num_rows))
-    }
+    let num_rows = table
+        .write(request)
+        .await
+        .context(WriteTable)
+        .context(Insert)?;
+
+    Ok(num_rows)
 }
 
 async fn exec_select_logical_plan(
@@ -199,7 +270,7 @@ async fn exec_select_logical_plan(
     query_plan: QueryPlan,
     executor: ExecutorRef,
     physical_planner: PhysicalPlannerRef,
-) -> Result<RecordBatchVec> {
+) -> Result<SendableRecordBatchStream> {
     let priority = Priority::High;
 
     let query_ctx = ctx
@@ -216,7 +287,7 @@
         })?;
 
     // Execute select physical plan.
-    let record_batch_stream = executor
+    let record_batch_stream: SendableRecordBatchStream = executor
         .execute(&query_ctx, physical_plan)
         .await
         .box_err()
@@ -224,26 +295,17 @@
            msg: "failed to execute select physical plan",
        })?;
 
-    let record_batches =
-        record_batch_stream
-            .try_collect()
-            .await
-            .box_err()
-            .context(ExecuteSelectPlan {
-                msg: "failed to collect select execution results",
-            })?;
-
-    Ok(record_batches)
+    Ok(record_batch_stream)
 }
 
 fn convert_records_to_row_group(
-    record_batches: RecordBatchVec,
-    column_index_in_insert: Vec<InsertMode>,
+    record_batches: &[CommonRecordBatch],
+    column_index_in_insert: &[InsertMode],
     schema: Schema,
 ) -> Result<RowGroup> {
     let mut data_rows: Vec<Row> = Vec::new();
 
-    for record in &record_batches {
+    for record in record_batches {
         let num_cols = record.num_columns();
         let num_rows = record.num_rows();
         for row_idx in 0..num_rows {
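The core of this change is the buffering loop in the `InsertSource::Select` arm: instead of collecting the whole select result into memory with `try_collect`, the interpreter now drains the stream, buffers up to `max_batch_size` record batches, flushes them via `write_record_batches`, and flushes the remainder once the stream ends. Below is a minimal, self-contained sketch of that accumulate-and-flush pattern, assuming only the `futures` crate; `Batch`, `flush`, and `MAX_BUFFERED` are illustrative stand-ins, not HoraeDB APIs.

```rust
use futures::{executor::block_on, stream, StreamExt};

/// Stand-in for a record batch; each inner value is one "row".
type Batch = Vec<u64>;

/// Mirrors `max_batch_size` in the diff.
const MAX_BUFFERED: usize = 8;

/// Stand-in for `write_record_batches`: drains the buffer and
/// returns how many rows were "written".
fn flush(buffer: &mut Vec<Batch>) -> usize {
    let rows: usize = buffer.iter().map(|b| b.len()).sum();
    buffer.clear();
    rows
}

fn main() -> Result<(), String> {
    block_on(async {
        // A fallible stream of batches, analogous to `SendableRecordBatchStream`.
        let mut batches = stream::iter((0..20u64).map(|i| Ok::<Batch, String>(vec![i; 3])));

        let mut buffer: Vec<Batch> = Vec::new();
        let mut total_rows = 0;
        while let Some(item) = batches.next().await {
            let batch = item?; // propagate stream errors, as the interpreter does
            if batch.is_empty() {
                continue; // skip empty batches instead of buffering them
            }
            buffer.push(batch);
            // Flush eagerly once enough batches have accumulated.
            if buffer.len() >= MAX_BUFFERED {
                total_rows += flush(&mut buffer);
            }
        }
        // Flush whatever is left after the stream is exhausted.
        total_rows += flush(&mut buffer);

        assert_eq!(total_rows, 60); // 20 batches * 3 rows each
        Ok(())
    })
}
```

Note how the final flush after the loop mirrors the `if !record_batches.is_empty()` block in the diff: without it, up to `max_batch_size - 1` buffered batches would be silently dropped when the stream ends. The trade-off in the threshold is memory footprint versus per-write overhead, which is why the diff's comment calls it a tunable parameter.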