From f4b89d870ab405a72b94705f56ba6bff8d0adbca Mon Sep 17 00:00:00 2001 From: luofucong Date: Tue, 27 Dec 2022 16:14:21 +0800 Subject: [PATCH] fix: ease the restriction of the original "SelectExpr" since we used to pass SQLs other than selection in the related GRPC interface --- src/api/greptime/v1/database.proto | 14 +++----------- src/client/src/database.rs | 16 +++++++-------- src/datanode/src/instance/flight.rs | 30 ++++++++++------------------- src/datanode/src/instance/grpc.rs | 12 ++++++------ src/frontend/src/instance.rs | 8 ++++---- 5 files changed, 31 insertions(+), 49 deletions(-) diff --git a/src/api/greptime/v1/database.proto b/src/api/greptime/v1/database.proto index 358b01923f37..bd24087264c4 100644 --- a/src/api/greptime/v1/database.proto +++ b/src/api/greptime/v1/database.proto @@ -18,15 +18,12 @@ message ObjectExpr { ExprHeader header = 1; oneof expr { InsertExpr insert = 2; - SelectExpr select = 3; - UpdateExpr update = 4; - DeleteExpr delete = 5; + QueryRequest query = 3; } } -// TODO(fys): Only support sql now, and will support promql etc in the future -message SelectExpr { - oneof expr { +message QueryRequest { + oneof query { string sql = 1; bytes logical_plan = 2; } @@ -48,11 +45,6 @@ message InsertExpr { uint32 region_number = 5; } -// TODO(jiachun) -message UpdateExpr {} -// TODO(jiachun) -message DeleteExpr {} - message ObjectResult { ResultHeader header = 1; oneof result { diff --git a/src/client/src/database.rs b/src/client/src/database.rs index d1608442d8eb..14c1d5dc28f0 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -17,8 +17,8 @@ use std::sync::Arc; use api::v1::codec::SelectResult as GrpcSelectResult; use api::v1::column::SemanticType; use api::v1::{ - object_expr, object_result, select_expr, DatabaseRequest, ExprHeader, InsertExpr, - MutateResult as GrpcMutateResult, ObjectExpr, ObjectResult as GrpcObjectResult, SelectExpr, + object_expr, object_result, query_request, DatabaseRequest, ExprHeader, InsertExpr, + MutateResult as GrpcMutateResult, ObjectExpr, ObjectResult as GrpcObjectResult, QueryRequest, }; use common_error::status_code::StatusCode; use common_grpc::flight::{raw_flight_data_to_message, FlightMessage}; @@ -83,28 +83,28 @@ impl Database { pub async fn select(&self, expr: Select) -> Result { let select_expr = match expr { - Select::Sql(sql) => SelectExpr { - expr: Some(select_expr::Expr::Sql(sql)), + Select::Sql(sql) => QueryRequest { + query: Some(query_request::Query::Sql(sql)), }, }; self.do_select(select_expr).await } pub async fn logical_plan(&self, logical_plan: Vec) -> Result { - let select_expr = SelectExpr { - expr: Some(select_expr::Expr::LogicalPlan(logical_plan)), + let select_expr = QueryRequest { + query: Some(query_request::Query::LogicalPlan(logical_plan)), }; self.do_select(select_expr).await } - async fn do_select(&self, select_expr: SelectExpr) -> Result { + async fn do_select(&self, select_expr: QueryRequest) -> Result { let header = ExprHeader { version: PROTOCOL_VERSION, }; let expr = ObjectExpr { header: Some(header), - expr: Some(object_expr::Expr::Select(select_expr)), + expr: Some(object_expr::Expr::Query(select_expr)), }; let obj_result = self.object(expr).await?; diff --git a/src/datanode/src/instance/flight.rs b/src/datanode/src/instance/flight.rs index 58e01a6d6d56..da22ffd2ac59 100644 --- a/src/datanode/src/instance/flight.rs +++ b/src/datanode/src/instance/flight.rs @@ -17,7 +17,7 @@ mod stream; use std::pin::Pin; use api::v1::object_expr::Expr; -use api::v1::select_expr::Expr as SelectExpr; +use api::v1::query_request::Query; use api::v1::ObjectExpr; use arrow_flight::flight_service_server::FlightService; use arrow_flight::{ @@ -29,8 +29,7 @@ use common_query::Output; use futures::Stream; use prost::Message; use session::context::QueryContext; -use snafu::{ensure, OptionExt, ResultExt}; -use sql::statements::statement::Statement; +use snafu::{OptionExt, ResultExt}; use tonic::{Request, Response, Streaming}; use crate::error::{self, Result}; @@ -81,18 +80,15 @@ impl FlightService for Instance { .expr .context(error::MissingRequiredFieldSnafu { name: "expr" })?; match expr { - Expr::Select(select_expr) => { - let select_expr = select_expr - .expr + Expr::Query(query_request) => { + let query = query_request + .query .context(error::MissingRequiredFieldSnafu { name: "expr" })?; - let stream = self.handle_select_expr(select_expr).await?; + let stream = self.handle_query(query).await?; Ok(Response::new(Box::pin(stream) as TonicStream)) } // TODO(LFC): Implement Insertion Flight interface. Expr::Insert(_) => Err(tonic::Status::unimplemented("Not yet implemented")), - Expr::Update(_) | Expr::Delete(_) => { - Err(tonic::Status::unimplemented("Not yet implemented")) - } } } @@ -134,22 +130,16 @@ impl FlightService for Instance { } impl Instance { - async fn handle_select_expr(&self, select_expr: SelectExpr) -> Result { - let output = match select_expr { - SelectExpr::Sql(sql) => { + async fn handle_query(&self, query: Query) -> Result { + let output = match query { + Query::Sql(sql) => { let stmt = self .query_engine .sql_to_statement(&sql) .context(error::ExecuteSqlSnafu)?; - ensure!( - matches!(stmt, Statement::Query(_)), - error::InvalidSqlSnafu { - msg: format!("expect SQL to be selection, actual: {sql}") - } - ); self.execute_stmt(stmt, QueryContext::arc()).await? } - SelectExpr::LogicalPlan(plan) => self.execute_logical(plan).await?, + Query::LogicalPlan(plan) => self.execute_logical(plan).await?, }; let recordbatch_stream = match output { diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index 28534bc492d4..2a52a07e85e0 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -15,7 +15,7 @@ use api::result::{build_err_result, AdminResultBuilder, ObjectResultBuilder}; use api::v1::{ admin_expr, object_expr, AdminExpr, AdminResult, Column, CreateDatabaseExpr, ObjectExpr, - ObjectResult, SelectExpr, + ObjectResult, QueryRequest, }; use arrow_flight::flight_service_server::FlightService; use arrow_flight::Ticket; @@ -105,11 +105,11 @@ impl Instance { } } - async fn handle_select(&self, select_expr: SelectExpr) -> Result { + async fn handle_query_request(&self, query_request: QueryRequest) -> Result { let ticket = Request::new(Ticket { ticket: ObjectExpr { header: None, - expr: Some(object_expr::Expr::Select(select_expr)), + expr: Some(object_expr::Expr::Query(query_request)), } .encode_to_vec(), }); @@ -169,12 +169,12 @@ impl GrpcQueryHandler for Instance { self.handle_insert(catalog_name, schema_name, table_name, insert_batches) .await } - Some(object_expr::Expr::Select(select_expr)) => self - .handle_select(select_expr.clone()) + Some(object_expr::Expr::Query(query_request)) => self + .handle_query_request(query_request.clone()) .await .map_err(BoxedError::new) .context(servers::error::ExecuteQuerySnafu { - query: format!("{select_expr:?}"), + query: format!("{query_request:?}"), })?, other => { return servers::error::NotSupportedSnafu { diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 5ea5c35aa8c5..7b01890b76de 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -721,9 +721,9 @@ mod tests { use api::v1::column::SemanticType; use api::v1::{ - admin_expr, admin_result, column, object_expr, object_result, select_expr, Column, + admin_expr, admin_result, column, object_expr, object_result, query_request, Column, ColumnDataType, ColumnDef as GrpcColumnDef, ExprHeader, FlightDataRaw, MutateResult, - SelectExpr, + QueryRequest, }; use common_grpc::flight::{raw_flight_data_to_message, FlightMessage}; use common_recordbatch::RecordBatch; @@ -930,8 +930,8 @@ mod tests { // select let object_expr = ObjectExpr { header: Some(ExprHeader::default()), - expr: Some(object_expr::Expr::Select(SelectExpr { - expr: Some(select_expr::Expr::Sql("select * from demo".to_string())), + expr: Some(Expr::Query(QueryRequest { + query: Some(query_request::Query::Sql("select * from demo".to_string())), })), }; let result = GrpcQueryHandler::do_query(&*instance, object_expr)