Skip to content

Commit

Permalink
fix: resolve PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelScofield committed Mar 22, 2023
1 parent dba3a83 commit e1efc8a
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 22 deletions.
2 changes: 1 addition & 1 deletion src/datanode/src/instance/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl Instance {
.await
.context(PlanStatementSnafu)?;
self.query_engine
.execute(plan, ctx.clone())
.execute(plan, ctx)
.await
.context(ExecuteLogicalPlanSnafu)
}
Expand Down
9 changes: 3 additions & 6 deletions src/datanode/src/instance/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,9 @@ impl Instance {
) -> Result<Output> {
match stmt {
QueryStatement::Sql(Statement::Insert(insert)) => {
let request = SqlHandler::insert_to_request(
self.catalog_manager.clone(),
*insert,
query_ctx.clone(),
)
.await?;
let request =
SqlHandler::insert_to_request(self.catalog_manager.clone(), *insert, query_ctx)
.await?;
self.sql_handler.insert(request).await
}
QueryStatement::Sql(Statement::Delete(delete)) => {
Expand Down
103 changes: 90 additions & 13 deletions src/datanode/src/tests/instance_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ use common_telemetry::logging;
use datatypes::data_type::ConcreteDataType;
use datatypes::vectors::{Int64Vector, StringVector, UInt64Vector, VectorRef};
use query::parser::{QueryLanguageParser, QueryStatement};
use session::context::QueryContext;
use session::context::{QueryContext, QueryContextRef};
use snafu::ResultExt;
use sql::statements::statement::Statement;

use crate::error::{ExecuteLogicalPlanSnafu, PlanStatementSnafu};
use crate::error::{Error, ExecuteLogicalPlanSnafu, PlanStatementSnafu};
use crate::tests::test_util::{self, check_output_stream, setup_test_instance, MockInstance};

#[tokio::test(flavor = "multi_thread")]
Expand Down Expand Up @@ -187,6 +187,67 @@ async fn test_execute_insert() {
assert!(matches!(output, Output::AffectedRows(2)));
}

#[tokio::test(flavor = "multi_thread")]
async fn test_execute_insert_by_select() {
let instance = setup_test_instance("test_execute_insert_by_select").await;

// create table
execute_sql(
&instance,
"create table demo1(host string, cpu double, memory double, ts timestamp time index);",
)
.await;
execute_sql(
&instance,
"create table demo2(host string, cpu double, memory double, ts timestamp time index);",
)
.await;

let output = execute_sql(
&instance,
r#"insert into demo1(host, cpu, memory, ts) values
('host1', 66.6, 1024, 1655276557000),
('host2', 88.8, 333.3, 1655276558000)
"#,
)
.await;
assert!(matches!(output, Output::AffectedRows(2)));

assert!(matches!(
try_execute_sql(&instance, "insert into demo2(host) select * from demo1")
.await
.unwrap_err(),
Error::PlanStatement { .. }
));
assert!(matches!(
try_execute_sql(&instance, "insert into demo2 select cpu,memory from demo1")
.await
.unwrap_err(),
Error::PlanStatement { .. }
));

assert!(matches!(
try_execute_sql(&instance, "insert into demo2(ts) select memory from demo1")
.await
.unwrap_err(),
Error::PlanStatement { .. }
));

let output = execute_sql(&instance, "insert into demo2 select * from demo1").await;
assert!(matches!(output, Output::AffectedRows(2)));

let output = execute_sql(&instance, "select * from demo2 order by ts").await;
let expected = "\
+-------+------+--------+---------------------+
| host | cpu | memory | ts |
+-------+------+--------+---------------------+
| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 |
| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 |
+-------+------+--------+---------------------+"
.to_string();
check_output_stream(output, expected).await;
}

#[tokio::test(flavor = "multi_thread")]
async fn test_execute_insert_query_with_i64_timestamp() {
let instance = MockInstance::new("insert_query_i64_timestamp").await;
Expand Down Expand Up @@ -887,26 +948,42 @@ async fn execute_sql(instance: &MockInstance, sql: &str) -> Output {
execute_sql_in_db(instance, sql, DEFAULT_SCHEMA_NAME).await
}

async fn try_execute_sql(
instance: &MockInstance,
sql: &str,
) -> Result<Output, crate::error::Error> {
try_execute_sql_in_db(instance, sql, DEFAULT_SCHEMA_NAME).await
}

async fn try_execute_sql_in_db(
instance: &MockInstance,
sql: &str,
db: &str,
) -> Result<Output, crate::error::Error> {
let query_ctx = Arc::new(QueryContext::with(DEFAULT_CATALOG_NAME, db));

async fn plan_exec(
instance: &MockInstance,
stmt: QueryStatement,
query_ctx: QueryContextRef,
) -> Result<Output, Error> {
let engine = instance.inner().query_engine();
let plan = engine
.planner()
.plan(stmt, query_ctx.clone())
.await
.context(PlanStatementSnafu)?;
engine
.execute(plan, query_ctx)
.await
.context(ExecuteLogicalPlanSnafu)
}

let stmt = QueryLanguageParser::parse_sql(sql).unwrap();
match stmt {
QueryStatement::Sql(Statement::Query(_)) => {
let engine = instance.inner().query_engine();
let plan = engine
.planner()
.plan(stmt, query_ctx.clone())
.await
.context(PlanStatementSnafu)?;
engine
.execute(plan, query_ctx)
.await
.context(ExecuteLogicalPlanSnafu)
QueryStatement::Sql(Statement::Query(_)) => plan_exec(instance, stmt, query_ctx).await,
QueryStatement::Sql(Statement::Insert(ref insert)) if insert.is_insert_select() => {
plan_exec(instance, stmt, query_ctx).await
}
_ => instance.inner().execute_stmt(stmt, query_ctx).await,
}
Expand Down
7 changes: 5 additions & 2 deletions src/query/src/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub use crate::datafusion::catalog_adapter::DfCatalogListAdapter;
pub use crate::datafusion::planner::DfContextProviderAdapter;
use crate::error::{
CatalogNotFoundSnafu, CatalogSnafu, CreateRecordBatchSnafu, DataFusionSnafu,
QueryExecutionSnafu, Result, SchemaNotFoundSnafu, TableNotFoundSnafu,
QueryExecutionSnafu, Result, SchemaNotFoundSnafu, TableNotFoundSnafu, UnsupportedExprSnafu,
};
use crate::executor::QueryExecutor;
use crate::logical_optimizer::LogicalOptimizer;
Expand Down Expand Up @@ -165,7 +165,10 @@ impl QueryEngine for DatafusionQueryEngine {
match plan {
LogicalPlan::DfPlan(DfLogicalPlan::Dml(dml)) => match dml.op {
WriteOp::Insert => self.exec_insert_plan(&dml, query_ctx).await,
_ => unimplemented!(),
_ => UnsupportedExprSnafu {
name: format!("DML op {}", dml.op),
}
.fail(),
},
_ => self.exec_query_plan(plan).await,
}
Expand Down

0 comments on commit e1efc8a

Please sign in to comment.