From c5825cc716eca1fc66db8c4faca9a7549a3f119e Mon Sep 17 00:00:00 2001
From: MianChen <283559115@qq.com>
Date: Thu, 18 Jul 2024 01:40:30 -0500
Subject: [PATCH] refactor: insert select to stream mode (#1544)

## Rationale

Closes #1542.

## Detailed Changes

Execute the select-and-insert procedure in a streaming fashion: record
batches produced by the select plan are forwarded through a bounded channel
and written out in row batches, instead of being collected into memory first.

## Test Plan

Covered by CI tests.

---------

Co-authored-by: jiacai2050
---
 .../env/local/dml/insert_into_select.result |  10 +-
 .../env/local/dml/insert_into_select.sql    |   4 +-
 src/interpreters/Cargo.toml                 |   1 +
 src/interpreters/src/insert.rs              | 170 ++++++++++++++----
 4 files changed, 143 insertions(+), 42 deletions(-)

diff --git a/integration_tests/cases/env/local/dml/insert_into_select.result b/integration_tests/cases/env/local/dml/insert_into_select.result
index 77b648a269..93fc82567c 100644
--- a/integration_tests/cases/env/local/dml/insert_into_select.result
+++ b/integration_tests/cases/env/local/dml/insert_into_select.result
@@ -35,9 +35,11 @@ INSERT INTO `insert_into_select_table1`
     (`timestamp`, `value`, `name`)
 VALUES
     (1, 100, "s1"),
     (2, 200, "s2"),
-    (3, 300, "s3");
+    (3, 300, "s3"),
+    (4, 400, "s4"),
+    (5, 500, "s5");
 
-affected_rows: 3
+affected_rows: 5
 
 DROP TABLE IF EXISTS `insert_into_select_table2`;
 
@@ -58,7 +60,7 @@ INSERT INTO `insert_into_select_table2` (`timestamp`, `value`)
 SELECT `timestamp`, `value`
 FROM `insert_into_select_table1`;
 
-affected_rows: 3
+affected_rows: 5
 
 SELECT `timestamp`, `value`, `name` FROM `insert_into_select_table2`;
 
@@ -67,6 +69,8 @@ timestamp,value,name,
 Timestamp(1),Int32(100),String(""),
 Timestamp(2),Int32(200),String(""),
 Timestamp(3),Int32(300),String(""),
+Timestamp(4),Int32(400),String(""),
+Timestamp(5),Int32(500),String(""),
 
 DROP TABLE `insert_into_select_table1`;
 
diff --git a/integration_tests/cases/env/local/dml/insert_into_select.sql b/integration_tests/cases/env/local/dml/insert_into_select.sql
index dbe157609d..1a0d4a1da0 100644
--- a/integration_tests/cases/env/local/dml/insert_into_select.sql
+++ b/integration_tests/cases/env/local/dml/insert_into_select.sql
@@ -32,7 +32,9 @@ INSERT INTO `insert_into_select_table1`
     (`timestamp`, `value`, `name`)
 VALUES
     (1, 100, "s1"),
     (2, 200, "s2"),
-    (3, 300, "s3");
+    (3, 300, "s3"),
+    (4, 400, "s4"),
+    (5, 500, "s5");
 
 DROP TABLE IF EXISTS `insert_into_select_table2`;
 
diff --git a/src/interpreters/Cargo.toml b/src/interpreters/Cargo.toml
index 6f1513fb9b..d237d5b870 100644
--- a/src/interpreters/Cargo.toml
+++ b/src/interpreters/Cargo.toml
@@ -54,6 +54,7 @@ regex = { workspace = true }
 runtime = { workspace = true }
 snafu = { workspace = true }
 table_engine = { workspace = true }
+tokio = { workspace = true }
 
 [dev-dependencies]
 analytic_engine = { workspace = true, features = ["test"] }
diff --git a/src/interpreters/src/insert.rs b/src/interpreters/src/insert.rs
index d6307bf125..5d9e254f1b 100644
--- a/src/interpreters/src/insert.rs
+++ b/src/interpreters/src/insert.rs
@@ -30,6 +30,7 @@ use common_types::{
     column_block::{ColumnBlock, ColumnBlockBuilder},
     column_schema::ColumnId,
     datum::Datum,
+    record_batch::RecordBatch as CommonRecordBatch,
     row::{Row, RowBuilder, RowGroup},
     schema::Schema,
 };
@@ -54,12 +55,15 @@ use query_frontend::{
 };
 use runtime::Priority;
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
-use table_engine::table::{TableRef, WriteRequest};
+use table_engine::{
+    stream::SendableRecordBatchStream,
+    table::{TableRef, WriteRequest},
+};
+use tokio::sync::mpsc;
 
 use crate::{
     context::Context,
     interpreter::{Insert, Interpreter, InterpreterPtr, Output, Result as InterpreterResult},
-    RecordBatchVec,
 };
 
 #[derive(Debug, Snafu)]
@@ -115,10 +119,23 @@ pub enum Error {
 
     #[snafu(display("Record columns not enough, len:{}, index:{}", len, index))]
     RecordColumnsNotEnough { len: usize, index: usize },
+
+    #[snafu(display("Failed to do select, err:{}", source))]
+    Select { source: table_engine::stream::Error },
+
+    #[snafu(display("Failed to send msg in channel, err:{}", msg))]
+    MsgChannel { msg: String },
+
+    #[snafu(display("Failed to join async task, err:{}", msg))]
+    AsyncTask { msg: String },
 }
 
 define_result!(Error);
 
+// TODO: make those configurable
+const INSERT_SELECT_ROW_BATCH_NUM: usize = 1000;
+const INSERT_SELECT_PENDING_BATCH_NUM: usize = 3;
+
 pub struct InsertInterpreter {
     ctx: Context,
     plan: InsertPlan,
@@ -152,14 +169,18 @@ impl Interpreter for InsertInterpreter {
             default_value_map,
         } = self.plan;
 
-        let mut rows = match source {
-            InsertSource::Values { row_group } => row_group,
+        match source {
+            InsertSource::Values { row_group: rows } => {
+                let num_rows =
+                    prepare_and_write_table(table.clone(), rows, &default_value_map).await?;
+
+                Ok(Output::AffectedRows(num_rows))
+            }
             InsertSource::Select {
                 query: query_plan,
                 column_index_in_insert,
             } => {
-                // TODO: support streaming insert
-                let record_batches = exec_select_logical_plan(
+                let mut record_batches_stream = exec_select_logical_plan(
                     self.ctx,
                     query_plan,
                     self.executor,
@@ -168,30 +189,112 @@ impl Interpreter for InsertInterpreter {
                 .await
                 .context(Insert)?;
 
-                if record_batches.is_empty() {
-                    return Ok(Output::AffectedRows(0));
-                }
+                let (tx, rx) = mpsc::channel(INSERT_SELECT_PENDING_BATCH_NUM);
+                let producer = tokio::spawn(async move {
+                    while let Some(record_batch) = record_batches_stream
+                        .try_next()
+                        .await
+                        .context(Select)
+                        .context(Insert)?
+                    {
+                        if record_batch.is_empty() {
+                            continue;
+                        }
+                        if let Err(e) = tx.send(record_batch).await {
+                            return Err(Error::MsgChannel {
+                                msg: format!("{}", e),
+                            })
+                            .context(Insert)?;
+                        }
+                    }
+                    Ok(())
+                });
+
+                let consumer = tokio::spawn(async move {
+                    let mut rx = rx;
+                    let mut result_rows = 0;
+                    let mut pending_rows = 0;
+                    let mut record_batches = Vec::new();
+                    while let Some(record_batch) = rx.recv().await {
+                        pending_rows += record_batch.num_rows();
+                        record_batches.push(record_batch);
+                        if pending_rows >= INSERT_SELECT_ROW_BATCH_NUM {
+                            pending_rows = 0;
+                            let num_rows = write_record_batches(
+                                &mut record_batches,
+                                column_index_in_insert.as_slice(),
+                                table.clone(),
+                                &default_value_map,
+                            )
+                            .await?;
+                            result_rows += num_rows;
+                        }
+                    }
 
-                convert_records_to_row_group(record_batches, column_index_in_insert, table.schema())
-                    .context(Insert)?
+                    if !record_batches.is_empty() {
+                        let num_rows = write_record_batches(
+                            &mut record_batches,
+                            column_index_in_insert.as_slice(),
+                            table,
+                            &default_value_map,
+                        )
+                        .await?;
+                        result_rows += num_rows;
+                    }
+                    Ok(result_rows)
+                });
+
+                match tokio::try_join!(producer, consumer) {
+                    Ok((select_res, write_rows)) => {
+                        select_res?;
+                        Ok(Output::AffectedRows(write_rows?))
+                    }
+                    Err(e) => Err(Error::AsyncTask {
+                        msg: format!("{}", e),
+                    })
+                    .context(Insert)?,
+                }
             }
-        };
+        }
+    }
+}
 
-        maybe_generate_tsid(&mut rows).context(Insert)?;
+async fn write_record_batches(
+    record_batches: &mut Vec<CommonRecordBatch>,
+    column_index_in_insert: &[InsertMode],
+    table: TableRef,
+    default_value_map: &BTreeMap<usize, DfLogicalExpr>,
+) -> InterpreterResult<usize> {
+    let row_group = convert_records_to_row_group(
+        record_batches.as_slice(),
+        column_index_in_insert,
+        table.schema(),
+    )
+    .context(Insert)?;
+    record_batches.clear();
+
+    prepare_and_write_table(table, row_group, default_value_map).await
+}
 
-        // Fill default values
-        fill_default_values(table.clone(), &mut rows, &default_value_map).context(Insert)?;
+async fn prepare_and_write_table(
+    table: TableRef,
+    mut row_group: RowGroup,
+    default_value_map: &BTreeMap<usize, DfLogicalExpr>,
+) -> InterpreterResult<usize> {
+    maybe_generate_tsid(&mut row_group).context(Insert)?;
 
-        let request = WriteRequest { row_group: rows };
+    // Fill default values
+    fill_default_values(table.clone(), &mut row_group, default_value_map).context(Insert)?;
 
-        let num_rows = table
-            .write(request)
-            .await
-            .context(WriteTable)
-            .context(Insert)?;
+    let request = WriteRequest { row_group };
 
-        Ok(Output::AffectedRows(num_rows))
-    }
+    let num_rows = table
+        .write(request)
+        .await
+        .context(WriteTable)
+        .context(Insert)?;
+
+    Ok(num_rows)
 }
 
 async fn exec_select_logical_plan(
@@ -199,7 +302,7 @@ async fn exec_select_logical_plan(
     query_plan: QueryPlan,
     executor: ExecutorRef,
     physical_planner: PhysicalPlannerRef,
-) -> Result<RecordBatchVec> {
+) -> Result<SendableRecordBatchStream> {
     let priority = Priority::High;
 
     let query_ctx = ctx
@@ -216,7 +319,7 @@ async fn exec_select_logical_plan(
         })?;
 
     // Execute select physical plan.
-    let record_batch_stream = executor
+    let record_batch_stream: SendableRecordBatchStream = executor
         .execute(&query_ctx, physical_plan)
         .await
         .box_err()
         .context(ExecuteSelectPlan {
             msg: "failed to execute select physical plan",
         })?;
 
-    let record_batches =
-        record_batch_stream
-            .try_collect()
-            .await
-            .box_err()
-            .context(ExecuteSelectPlan {
-                msg: "failed to collect select execution results",
-            })?;
-
-    Ok(record_batches)
+    Ok(record_batch_stream)
 }
 
 fn convert_records_to_row_group(
-    record_batches: RecordBatchVec,
-    column_index_in_insert: Vec<InsertMode>,
+    record_batches: &[CommonRecordBatch],
+    column_index_in_insert: &[InsertMode],
     schema: Schema,
 ) -> Result<RowGroup> {
     let mut data_rows: Vec<Row> = Vec::new();
 
-    for record in &record_batches {
+    for record in record_batches {
         let num_cols = record.num_columns();
         let num_rows = record.num_rows();
         for row_idx in 0..num_rows {
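---

For reviewers: a minimal, self-contained sketch of the bounded-channel producer/consumer pattern the new `execute` path uses. Only `tokio::sync::mpsc`, `tokio::spawn`, and `tokio::try_join!` correspond to what the patch actually calls; every other name here (`Batch`, `flush`, the fake data, and both constants) is an illustrative stand-in, not HoraeDB API.

```rust
use tokio::sync::mpsc;

// Illustrative stand-ins for INSERT_SELECT_ROW_BATCH_NUM /
// INSERT_SELECT_PENDING_BATCH_NUM from the patch.
const BATCH_ROWS: usize = 1000; // flush once this many rows are pending
const CHANNEL_CAP: usize = 3; // at most this many batches in flight

/// Pretend "record batch": just a vector of rows.
type Batch = Vec<u64>;

/// Stand-in for converting pending batches to a row group and writing it;
/// returns the number of rows written.
async fn flush(batches: &mut Vec<Batch>) -> Result<usize, String> {
    let rows = batches.iter().map(|b| b.len()).sum::<usize>();
    batches.clear();
    Ok(rows)
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let (tx, mut rx) = mpsc::channel::<Batch>(CHANNEL_CAP);

    // Producer: drains the "select" stream and forwards each non-empty batch.
    // The bounded channel applies backpressure: `send` suspends once
    // CHANNEL_CAP batches are queued, so the select side cannot run ahead of
    // the writer by more than a fixed amount of memory.
    let producer = tokio::spawn(async move {
        for i in 0..10u64 {
            let batch: Batch = (0..400).map(|j| i * 1000 + j).collect();
            if batch.is_empty() {
                continue;
            }
            tx.send(batch).await.map_err(|e| e.to_string())?;
        }
        // `tx` drops here, closing the channel and ending the consumer loop.
        Ok::<(), String>(())
    });

    // Consumer: accumulates batches, writes whenever enough rows pile up,
    // then flushes whatever remains after the channel closes.
    let consumer = tokio::spawn(async move {
        let mut total = 0usize;
        let mut pending = 0usize;
        let mut buf: Vec<Batch> = Vec::new();
        while let Some(batch) = rx.recv().await {
            pending += batch.len();
            buf.push(batch);
            if pending >= BATCH_ROWS {
                total += flush(&mut buf).await?;
                pending = 0;
            }
        }
        if !buf.is_empty() {
            total += flush(&mut buf).await?;
        }
        Ok::<usize, String>(total)
    });

    // Join both tasks and surface either side's error, mirroring the
    // tokio::try_join! over the producer and consumer handles in the patch.
    let (produced, written) = tokio::try_join!(producer, consumer).map_err(|e| e.to_string())?;
    produced?;
    println!("rows written: {}", written?);
    Ok(())
}
```

The bounded channel is what makes this streaming rather than collect-then-write: with the old `try_collect`, the whole select result had to fit in memory before a single row was written, whereas here at most `CHANNEL_CAP` batches plus one pending write are ever held at once.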