diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index c0db85bf8d9c..0397848d6a7b 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -73,7 +73,7 @@ impl Instance { .await .context(PlanStatementSnafu)?; self.query_engine - .execute(plan, ctx.clone()) + .execute(plan, ctx) .await .context(ExecuteLogicalPlanSnafu) } diff --git a/src/datanode/src/instance/sql.rs b/src/datanode/src/instance/sql.rs index 436aae411b68..e757a3ff42c4 100644 --- a/src/datanode/src/instance/sql.rs +++ b/src/datanode/src/instance/sql.rs @@ -48,12 +48,9 @@ impl Instance { ) -> Result { match stmt { QueryStatement::Sql(Statement::Insert(insert)) => { - let request = SqlHandler::insert_to_request( - self.catalog_manager.clone(), - *insert, - query_ctx.clone(), - ) - .await?; + let request = + SqlHandler::insert_to_request(self.catalog_manager.clone(), *insert, query_ctx) + .await?; self.sql_handler.insert(request).await } QueryStatement::Sql(Statement::Delete(delete)) => { diff --git a/src/datanode/src/tests/instance_test.rs b/src/datanode/src/tests/instance_test.rs index 378483e396e4..d0627c164626 100644 --- a/src/datanode/src/tests/instance_test.rs +++ b/src/datanode/src/tests/instance_test.rs @@ -22,11 +22,11 @@ use common_telemetry::logging; use datatypes::data_type::ConcreteDataType; use datatypes::vectors::{Int64Vector, StringVector, UInt64Vector, VectorRef}; use query::parser::{QueryLanguageParser, QueryStatement}; -use session::context::QueryContext; +use session::context::{QueryContext, QueryContextRef}; use snafu::ResultExt; use sql::statements::statement::Statement; -use crate::error::{ExecuteLogicalPlanSnafu, PlanStatementSnafu}; +use crate::error::{Error, ExecuteLogicalPlanSnafu, PlanStatementSnafu}; use crate::tests::test_util::{self, check_output_stream, setup_test_instance, MockInstance}; #[tokio::test(flavor = "multi_thread")] @@ -187,6 +187,67 @@ async fn test_execute_insert() { assert!(matches!(output, Output::AffectedRows(2))); } +#[tokio::test(flavor = "multi_thread")] +async fn test_execute_insert_by_select() { + let instance = setup_test_instance("test_execute_insert_by_select").await; + + // create table + execute_sql( + &instance, + "create table demo1(host string, cpu double, memory double, ts timestamp time index);", + ) + .await; + execute_sql( + &instance, + "create table demo2(host string, cpu double, memory double, ts timestamp time index);", + ) + .await; + + let output = execute_sql( + &instance, + r#"insert into demo1(host, cpu, memory, ts) values + ('host1', 66.6, 1024, 1655276557000), + ('host2', 88.8, 333.3, 1655276558000) + "#, + ) + .await; + assert!(matches!(output, Output::AffectedRows(2))); + + assert!(matches!( + try_execute_sql(&instance, "insert into demo2(host) select * from demo1") + .await + .unwrap_err(), + Error::PlanStatement { .. } + )); + assert!(matches!( + try_execute_sql(&instance, "insert into demo2 select cpu,memory from demo1") + .await + .unwrap_err(), + Error::PlanStatement { .. } + )); + + assert!(matches!( + try_execute_sql(&instance, "insert into demo2(ts) select memory from demo1") + .await + .unwrap_err(), + Error::PlanStatement { .. } + )); + + let output = execute_sql(&instance, "insert into demo2 select * from demo1").await; + assert!(matches!(output, Output::AffectedRows(2))); + + let output = execute_sql(&instance, "select * from demo2 order by ts").await; + let expected = "\ ++-------+------+--------+---------------------+ +| host | cpu | memory | ts | ++-------+------+--------+---------------------+ +| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 | +| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 | ++-------+------+--------+---------------------+" + .to_string(); + check_output_stream(output, expected).await; +} + #[tokio::test(flavor = "multi_thread")] async fn test_execute_insert_query_with_i64_timestamp() { let instance = MockInstance::new("insert_query_i64_timestamp").await; @@ -887,6 +948,13 @@ async fn execute_sql(instance: &MockInstance, sql: &str) -> Output { execute_sql_in_db(instance, sql, DEFAULT_SCHEMA_NAME).await } +async fn try_execute_sql( + instance: &MockInstance, + sql: &str, +) -> Result { + try_execute_sql_in_db(instance, sql, DEFAULT_SCHEMA_NAME).await +} + async fn try_execute_sql_in_db( instance: &MockInstance, sql: &str, @@ -894,19 +962,28 @@ async fn try_execute_sql_in_db( ) -> Result { let query_ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, db)); + async fn plan_exec( + instance: &MockInstance, + stmt: QueryStatement, + query_ctx: QueryContextRef, + ) -> Result { + let engine = instance.inner().query_engine(); + let plan = engine + .planner() + .plan(stmt, query_ctx.clone()) + .await + .context(PlanStatementSnafu)?; + engine + .execute(plan, query_ctx) + .await + .context(ExecuteLogicalPlanSnafu) + } + let stmt = QueryLanguageParser::parse_sql(sql).unwrap(); match stmt { - QueryStatement::Sql(Statement::Query(_)) => { - let engine = instance.inner().query_engine(); - let plan = engine - .planner() - .plan(stmt, query_ctx.clone()) - .await - .context(PlanStatementSnafu)?; - engine - .execute(plan, query_ctx) - .await - .context(ExecuteLogicalPlanSnafu) + QueryStatement::Sql(Statement::Query(_)) => plan_exec(instance, stmt, query_ctx).await, + QueryStatement::Sql(Statement::Insert(ref insert)) if insert.is_insert_select() => { + plan_exec(instance, stmt, query_ctx).await } _ => instance.inner().execute_stmt(stmt, query_ctx).await, } diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index f10484b87059..498a104959de 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -46,7 +46,7 @@ pub use crate::datafusion::catalog_adapter::DfCatalogListAdapter; pub use crate::datafusion::planner::DfContextProviderAdapter; use crate::error::{ CatalogNotFoundSnafu, CatalogSnafu, CreateRecordBatchSnafu, DataFusionSnafu, - QueryExecutionSnafu, Result, SchemaNotFoundSnafu, TableNotFoundSnafu, + QueryExecutionSnafu, Result, SchemaNotFoundSnafu, TableNotFoundSnafu, UnsupportedExprSnafu, }; use crate::executor::QueryExecutor; use crate::logical_optimizer::LogicalOptimizer; @@ -165,7 +165,10 @@ impl QueryEngine for DatafusionQueryEngine { match plan { LogicalPlan::DfPlan(DfLogicalPlan::Dml(dml)) => match dml.op { WriteOp::Insert => self.exec_insert_plan(&dml, query_ctx).await, - _ => unimplemented!(), + _ => UnsupportedExprSnafu { + name: format!("DML op {}", dml.op), + } + .fail(), }, _ => self.exec_query_plan(plan).await, }