diff --git a/src/components/fb_util/build.rs b/src/components/fb_util/build.rs index 07ce4a4be8..63fd616af3 100644 --- a/src/components/fb_util/build.rs +++ b/src/components/fb_util/build.rs @@ -42,18 +42,18 @@ fn main() -> Result<()> { tonic_build::manual::Method::builder() .name("write") .route_name("Write") - .input_type("crate::common::FlatBufferBytes") - .output_type("crate::common::FlatBufferBytes") - .codec_path("crate::common::FlatBufferCodec") + .input_type("crate::remote_service::FlatBufferBytes") + .output_type("crate::remote_service::FlatBufferBytes") + .codec_path("crate::remote_service::FlatBufferCodec") .build(), ) .method( tonic_build::manual::Method::builder() .name("write_batch") .route_name("WriteBatch") - .input_type("crate::common::FlatBufferBytes") - .output_type("crate::common::FlatBufferBytes") - .codec_path("crate::common::FlatBufferCodec") + .input_type("crate::remote_service::FlatBufferBytes") + .output_type("crate::remote_service::FlatBufferBytes") + .codec_path("crate::remote_service::FlatBufferCodec") .build(), ) .build(); diff --git a/src/components/fb_util/src/lib.rs b/src/components/fb_util/src/lib.rs index d9a6fc9c2d..39e2c47d71 100644 --- a/src/components/fb_util/src/lib.rs +++ b/src/components/fb_util/src/lib.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -pub mod common; +pub mod remote_service; #[allow(clippy::all)] #[rustfmt::skip] #[allow(non_snake_case)] diff --git a/src/components/fb_util/src/common.rs b/src/components/fb_util/src/remote_service.rs similarity index 97% rename from src/components/fb_util/src/common.rs rename to src/components/fb_util/src/remote_service.rs index 3616b3fd7f..77eece3732 100644 --- a/src/components/fb_util/src/common.rs +++ b/src/components/fb_util/src/remote_service.rs @@ -81,7 +81,7 @@ impl Decoder for FlatBufferDecoder { } } -/// A [`Codec`] that implements `application/grpc+json` via the serde library. +/// A [`Codec`] that implements the FlatBuffers serialization format. #[derive(Debug, Clone, Default)] pub struct FlatBufferCodec(); diff --git a/src/remote_engine_client/src/client.rs b/src/remote_engine_client/src/client.rs index d823a6fa74..f571cdc674 100644 --- a/src/remote_engine_client/src/client.rs +++ b/src/remote_engine_client/src/client.rs @@ -168,7 +168,7 @@ impl Client { endpoint, table_idents: vec![table_ident.clone()], code: header.code(), - msg: header.error().unwrap().to_string(), + msg: header.error().unwrap_or("").to_string(), } .fail() } else { @@ -189,8 +189,7 @@ impl Client { // Find the channels from router firstly. let mut write_batch_contexts_by_endpoint = HashMap::new(); for request in requests { - let route_context: crate::cached_router::RouteContext = - self.cached_router.route(&request.table).await?; + let route_context = self.cached_router.route(&request.table).await?; let write_batch_context = write_batch_contexts_by_endpoint .entry(route_context.endpoint) .or_insert(WriteBatchContext { @@ -255,7 +254,7 @@ impl Client { endpoint, table_idents: table_idents.clone(), code: header.code(), - msg: header.error().unwrap().to_string(), + msg: header.error().unwrap_or("").to_string(), } .fail() .box_err() diff --git a/src/server/src/grpc/mod.rs b/src/server/src/grpc/mod.rs index 1e41899dca..a5da2ec20c 100644 --- a/src/server/src/grpc/mod.rs +++ b/src/server/src/grpc/mod.rs @@ -166,7 +166,7 @@ pub struct RpcServices { rpc_server: InterceptedService, AuthWithFile>, meta_rpc_server: Option>, remote_engine_server: RemoteEngineServiceServer, - remote_engine_server_ext: RemoteEngineFbServiceServer, + remote_engine_fb_server: RemoteEngineFbServiceServer, runtime: Arc, stop_tx: Option>, join_handle: Option>, @@ -177,7 +177,7 @@ impl RpcServices { let rpc_server = self.rpc_server.clone(); let meta_rpc_server = self.meta_rpc_server.clone(); let remote_engine_server = self.remote_engine_server.clone(); - let remote_engine_server_ext = self.remote_engine_server_ext.clone(); + let remote_engine_fb_server = self.remote_engine_fb_server.clone(); let serve_addr = self.serve_addr; let (stop_tx, stop_rx) = oneshot::channel(); let join_handle = self.runtime.spawn(async move { @@ -192,7 +192,7 @@ impl RpcServices { info!("Grpc server serves remote engine rpc service"); router = router.add_service(remote_engine_server); - router = router.add_service(remote_engine_server_ext); + router = router.add_service(remote_engine_fb_server); router .serve_with_shutdown(serve_addr, stop_rx.map(drop)) @@ -343,7 +343,7 @@ impl Builder { hotspot_recorder, }; - let remote_engine_server_ext = RemoteEngineFbServiceServer::new(service_ext); + let remote_engine_fb_server = RemoteEngineFbServiceServer::new(service_ext); let runtime = runtimes.default_runtime.clone(); @@ -361,7 +361,7 @@ impl Builder { rpc_server, meta_rpc_server, remote_engine_server, - remote_engine_server_ext, + remote_engine_fb_server, runtime, stop_tx: None, join_handle: None, diff --git a/src/server/src/grpc/remote_engine_service/mod.rs b/src/server/src/grpc/remote_engine_service/mod.rs index 3533c88b59..68004eaa8d 100644 --- a/src/server/src/grpc/remote_engine_service/mod.rs +++ b/src/server/src/grpc/remote_engine_service/mod.rs @@ -30,13 +30,13 @@ use async_trait::async_trait; use catalog::{manager::ManagerRef, schema::SchemaRef}; use common_types::{record_batch::RecordBatch, request_id::RequestId}; use fb_util::{ - common::FlatBufferBytes, remote_engine_fb_service_server::RemoteEngineFbService, remote_engine_generated::fbprotocol::{ ContiguousRows as FBContiguousRows, ResponseHeader, ResponseHeaderArgs, WriteBatchRequest as FBWriteBatchRequest, WriteRequest as FBWriteRequest, WriteResponse as FBWriteResponse, WriteResponseArgs as FBWriteResponseArgs, }, + remote_service::FlatBufferBytes, }; use futures::{ stream::{self, BoxStream, FuturesUnordered, StreamExt}, @@ -101,6 +101,11 @@ mod metrics; const STREAM_QUERY_CHANNEL_LEN: usize = 200; const DEFAULT_COMPRESS_MIN_LENGTH: usize = 80 * 1024; +enum RequestType { + Protobuf, + Flatbuffer, +} + enum TonicWriteBatchRequestExt { Proto(Request), Flatbuffer(tonic::Request), @@ -608,7 +613,7 @@ impl RemoteEngineServiceImpl { ) -> std::result::Result { let begin_instant = Instant::now(); let ctx = self.handler_ctx(); - let (handle, is_flatbuffer) = match request { + let (handle, request_type) = match request { TonicWriteRequestExt::Proto(v) => ( self.runtimes.write_runtime.spawn(async move { let request = WriteRequestExt::Proto(v.into_inner()); @@ -618,7 +623,7 @@ impl RemoteEngineServiceImpl { e }) }), - false, + RequestType::Protobuf, ), TonicWriteRequestExt::Flatbuffer(v) => ( self.runtimes.write_runtime.spawn(async move { @@ -631,7 +636,7 @@ impl RemoteEngineServiceImpl { e }) }), - true, + RequestType::Flatbuffer, ), }; @@ -655,11 +660,11 @@ impl RemoteEngineServiceImpl { .write .observe(begin_instant.saturating_elapsed().as_secs_f64()); - match is_flatbuffer { - true => Ok(TonicWriteResponseExt::Flatbuffer(Response::new( + match request_type { + RequestType::Flatbuffer => Ok(TonicWriteResponseExt::Flatbuffer(Response::new( build_fb_write_response(resp), ))), - false => Ok(TonicWriteResponseExt::Proto(Response::new(resp))), + RequestType::Protobuf => Ok(TonicWriteResponseExt::Proto(Response::new(resp))), } } @@ -705,7 +710,7 @@ impl RemoteEngineServiceImpl { ) -> std::result::Result { let begin_instant = Instant::now(); - let (write_table_handles, is_flatbuffer) = match request { + let (write_table_handles, request_type) = match request { TonicWriteBatchRequestExt::Proto(v) => { let request = v.into_inner(); let mut write_table_handles = Vec::with_capacity(request.batch.len()); @@ -717,7 +722,7 @@ impl RemoteEngineServiceImpl { .spawn(handle_write(ctx, WriteRequestExt::Proto(one_request))); write_table_handles.push(handle); } - (write_table_handles, false) + (write_table_handles, RequestType::Protobuf) } TonicWriteBatchRequestExt::Flatbuffer(v) => { let request = Arc::new(v.into_inner()); @@ -739,7 +744,7 @@ impl RemoteEngineServiceImpl { }); write_table_handles.push(handle); } - (write_table_handles, true) + (write_table_handles, RequestType::Flatbuffer) } }; @@ -776,11 +781,11 @@ impl RemoteEngineServiceImpl { .write_batch .observe(begin_instant.saturating_elapsed().as_secs_f64()); - match is_flatbuffer { - true => Ok(TonicWriteResponseExt::Flatbuffer(Response::new( + match request_type { + RequestType::Flatbuffer => Ok(TonicWriteResponseExt::Flatbuffer(Response::new( build_fb_write_response(batch_resp), ))), - false => Ok(TonicWriteResponseExt::Proto(Response::new(batch_resp))), + RequestType::Protobuf => Ok(TonicWriteResponseExt::Proto(Response::new(batch_resp))), } } @@ -1023,7 +1028,10 @@ impl RemoteEngineFbService for RemoteEngineServiceImpl { .await?; match result { TonicWriteResponseExt::Flatbuffer(v) => Ok(v), - TonicWriteResponseExt::Proto(_) => Err(Status::new(Code::Internal, "logic error")), + TonicWriteResponseExt::Proto(_) => Err(Status::new( + Code::Internal, + "only support flatbuffer payload", + )), } } @@ -1036,7 +1044,10 @@ impl RemoteEngineFbService for RemoteEngineServiceImpl { .await?; match result { TonicWriteResponseExt::Flatbuffer(v) => Ok(v), - TonicWriteResponseExt::Proto(_) => Err(Status::new(Code::Internal, "logic error")), + TonicWriteResponseExt::Proto(_) => Err(Status::new( + Code::Internal, + "only support flatbuffer payload", + )), } } } @@ -1077,7 +1088,9 @@ impl RemoteEngineService for RemoteEngineServiceImpl { .await?; match result { TonicWriteResponseExt::Proto(v) => Ok(v), - TonicWriteResponseExt::Flatbuffer(_) => Err(Status::new(Code::Internal, "logic error")), + TonicWriteResponseExt::Flatbuffer(_) => { + Err(Status::new(Code::Internal, "only support protobuf payload")) + } } } @@ -1097,7 +1110,9 @@ impl RemoteEngineService for RemoteEngineServiceImpl { .await?; match result { TonicWriteResponseExt::Proto(v) => Ok(v), - TonicWriteResponseExt::Flatbuffer(_) => Err(Status::new(Code::Internal, "logic error")), + TonicWriteResponseExt::Flatbuffer(_) => { + Err(Status::new(Code::Internal, "only support protobuf payload")) + } } } @@ -1600,12 +1615,18 @@ fn build_fb_write_response(resp: WriteResponse) -> FlatBufferBytes { let mut builder = flatbuffers::FlatBufferBuilder::with_capacity(1024); let header = resp.header.unwrap(); - let error_message = builder.create_string(header.error.as_str()); + let error_message = if header.error.is_empty() { + None + } else { + Some(builder.create_string(header.error.as_str())) + }; + + // builder.create_string(header.error.as_str()); let response_header_args = ResponseHeader::create( &mut builder, &ResponseHeaderArgs { code: header.code, - error: Some(error_message), + error: error_message, }, ); diff --git a/src/table_engine/src/remote/model.rs b/src/table_engine/src/remote/model.rs index d04cb24664..8e2eb75386 100644 --- a/src/table_engine/src/remote/model.rs +++ b/src/table_engine/src/remote/model.rs @@ -34,7 +34,6 @@ use common_types::{ schema::{IndexInWriterSchema, RecordSchema, Schema, Version}, }; use fb_util::{ - common::FlatBufferBytes, remote_engine_generated::fbprotocol::{ ColumnDesc as FBColumnDesc, ContiguousRows as FBContiguousRows, ContiguousRowsArgs as FBContiguousRowsArgs, EncodedRow, EncodedRowArgs, @@ -43,6 +42,7 @@ use fb_util::{ WriteBatchRequest as FBWriteBatchRequest, WriteBatchRequestArgs as FBWriteBatchRequestArgs, WriteRequest as FBWriteRequest, WriteRequestArgs as FBWriteRequestArgs, }, + remote_service::FlatBufferBytes, }; use flatbuffers; use generic_error::{BoxError, GenericError, GenericResult};