Skip to content

Commit

Permalink
cr fix
Browse files Browse the repository at this point in the history
  • Loading branch information
zealchen committed Aug 8, 2024
1 parent 6edff22 commit 2edb871
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 37 deletions.
12 changes: 6 additions & 6 deletions src/components/fb_util/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,18 @@ fn main() -> Result<()> {
tonic_build::manual::Method::builder()
.name("write")
.route_name("Write")
.input_type("crate::common::FlatBufferBytes")
.output_type("crate::common::FlatBufferBytes")
.codec_path("crate::common::FlatBufferCodec")
.input_type("crate::remote_service::FlatBufferBytes")
.output_type("crate::remote_service::FlatBufferBytes")
.codec_path("crate::remote_service::FlatBufferCodec")
.build(),
)
.method(
tonic_build::manual::Method::builder()
.name("write_batch")
.route_name("WriteBatch")
.input_type("crate::common::FlatBufferBytes")
.output_type("crate::common::FlatBufferBytes")
.codec_path("crate::common::FlatBufferCodec")
.input_type("crate::remote_service::FlatBufferBytes")
.output_type("crate::remote_service::FlatBufferBytes")
.codec_path("crate::remote_service::FlatBufferCodec")
.build(),
)
.build();
Expand Down
2 changes: 1 addition & 1 deletion src/components/fb_util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

pub mod common;
pub mod remote_service;
#[allow(clippy::all)]
#[rustfmt::skip]
#[allow(non_snake_case)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl Decoder for FlatBufferDecoder {
}
}

/// A [`Codec`] that implements `application/grpc+json` via the serde library.
/// A [`Codec`] that implements the FlatBuffers serialization format.
#[derive(Debug, Clone, Default)]
pub struct FlatBufferCodec();

Expand Down
7 changes: 3 additions & 4 deletions src/remote_engine_client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl Client {
endpoint,
table_idents: vec![table_ident.clone()],
code: header.code(),
msg: header.error().unwrap().to_string(),
msg: header.error().unwrap_or("").to_string(),
}
.fail()
} else {
Expand All @@ -189,8 +189,7 @@ impl Client {
// Find the channels from router firstly.
let mut write_batch_contexts_by_endpoint = HashMap::new();
for request in requests {
let route_context: crate::cached_router::RouteContext =
self.cached_router.route(&request.table).await?;
let route_context = self.cached_router.route(&request.table).await?;
let write_batch_context = write_batch_contexts_by_endpoint
.entry(route_context.endpoint)
.or_insert(WriteBatchContext {
Expand Down Expand Up @@ -255,7 +254,7 @@ impl Client {
endpoint,
table_idents: table_idents.clone(),
code: header.code(),
msg: header.error().unwrap().to_string(),
msg: header.error().unwrap_or("").to_string(),
}
.fail()
.box_err()
Expand Down
10 changes: 5 additions & 5 deletions src/server/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ pub struct RpcServices {
rpc_server: InterceptedService<StorageServiceServer<StorageServiceImpl>, AuthWithFile>,
meta_rpc_server: Option<MetaEventServiceServer<MetaServiceImpl>>,
remote_engine_server: RemoteEngineServiceServer<RemoteEngineServiceImpl>,
remote_engine_server_ext: RemoteEngineFbServiceServer<RemoteEngineServiceImpl>,
remote_engine_fb_server: RemoteEngineFbServiceServer<RemoteEngineServiceImpl>,
runtime: Arc<Runtime>,
stop_tx: Option<Sender<()>>,
join_handle: Option<JoinHandle<()>>,
Expand All @@ -177,7 +177,7 @@ impl RpcServices {
let rpc_server = self.rpc_server.clone();
let meta_rpc_server = self.meta_rpc_server.clone();
let remote_engine_server = self.remote_engine_server.clone();
let remote_engine_server_ext = self.remote_engine_server_ext.clone();
let remote_engine_fb_server = self.remote_engine_fb_server.clone();
let serve_addr = self.serve_addr;
let (stop_tx, stop_rx) = oneshot::channel();
let join_handle = self.runtime.spawn(async move {
Expand All @@ -192,7 +192,7 @@ impl RpcServices {

info!("Grpc server serves remote engine rpc service");
router = router.add_service(remote_engine_server);
router = router.add_service(remote_engine_server_ext);
router = router.add_service(remote_engine_fb_server);

router
.serve_with_shutdown(serve_addr, stop_rx.map(drop))
Expand Down Expand Up @@ -343,7 +343,7 @@ impl Builder {
hotspot_recorder,
};

let remote_engine_server_ext = RemoteEngineFbServiceServer::new(service_ext);
let remote_engine_fb_server = RemoteEngineFbServiceServer::new(service_ext);

let runtime = runtimes.default_runtime.clone();

Expand All @@ -361,7 +361,7 @@ impl Builder {
rpc_server,
meta_rpc_server,
remote_engine_server,
remote_engine_server_ext,
remote_engine_fb_server,
runtime,
stop_tx: None,
join_handle: None,
Expand Down
59 changes: 40 additions & 19 deletions src/server/src/grpc/remote_engine_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ use async_trait::async_trait;
use catalog::{manager::ManagerRef, schema::SchemaRef};
use common_types::{record_batch::RecordBatch, request_id::RequestId};
use fb_util::{
common::FlatBufferBytes,
remote_engine_fb_service_server::RemoteEngineFbService,
remote_engine_generated::fbprotocol::{
ContiguousRows as FBContiguousRows, ResponseHeader, ResponseHeaderArgs,
WriteBatchRequest as FBWriteBatchRequest, WriteRequest as FBWriteRequest,
WriteResponse as FBWriteResponse, WriteResponseArgs as FBWriteResponseArgs,
},
remote_service::FlatBufferBytes,
};
use futures::{
stream::{self, BoxStream, FuturesUnordered, StreamExt},
Expand Down Expand Up @@ -101,6 +101,11 @@ mod metrics;
const STREAM_QUERY_CHANNEL_LEN: usize = 200;
const DEFAULT_COMPRESS_MIN_LENGTH: usize = 80 * 1024;

enum RequestType {
Protobuf,
Flatbuffer,
}

enum TonicWriteBatchRequestExt {
Proto(Request<WriteBatchRequest>),
Flatbuffer(tonic::Request<FlatBufferBytes>),
Expand Down Expand Up @@ -608,7 +613,7 @@ impl RemoteEngineServiceImpl {
) -> std::result::Result<TonicWriteResponseExt, Status> {
let begin_instant = Instant::now();
let ctx = self.handler_ctx();
let (handle, is_flatbuffer) = match request {
let (handle, request_type) = match request {
TonicWriteRequestExt::Proto(v) => (
self.runtimes.write_runtime.spawn(async move {
let request = WriteRequestExt::Proto(v.into_inner());
Expand All @@ -618,7 +623,7 @@ impl RemoteEngineServiceImpl {
e
})
}),
false,
RequestType::Protobuf,
),
TonicWriteRequestExt::Flatbuffer(v) => (
self.runtimes.write_runtime.spawn(async move {
Expand All @@ -631,7 +636,7 @@ impl RemoteEngineServiceImpl {
e
})
}),
true,
RequestType::Flatbuffer,
),
};

Expand All @@ -655,11 +660,11 @@ impl RemoteEngineServiceImpl {
.write
.observe(begin_instant.saturating_elapsed().as_secs_f64());

match is_flatbuffer {
true => Ok(TonicWriteResponseExt::Flatbuffer(Response::new(
match request_type {
RequestType::Flatbuffer => Ok(TonicWriteResponseExt::Flatbuffer(Response::new(
build_fb_write_response(resp),
))),
false => Ok(TonicWriteResponseExt::Proto(Response::new(resp))),
RequestType::Protobuf => Ok(TonicWriteResponseExt::Proto(Response::new(resp))),
}
}

Expand Down Expand Up @@ -705,7 +710,7 @@ impl RemoteEngineServiceImpl {
) -> std::result::Result<TonicWriteResponseExt, Status> {
let begin_instant = Instant::now();

let (write_table_handles, is_flatbuffer) = match request {
let (write_table_handles, request_type) = match request {
TonicWriteBatchRequestExt::Proto(v) => {
let request = v.into_inner();
let mut write_table_handles = Vec::with_capacity(request.batch.len());
Expand All @@ -717,7 +722,7 @@ impl RemoteEngineServiceImpl {
.spawn(handle_write(ctx, WriteRequestExt::Proto(one_request)));
write_table_handles.push(handle);
}
(write_table_handles, false)
(write_table_handles, RequestType::Protobuf)
}
TonicWriteBatchRequestExt::Flatbuffer(v) => {
let request = Arc::new(v.into_inner());
Expand All @@ -739,7 +744,7 @@ impl RemoteEngineServiceImpl {
});
write_table_handles.push(handle);
}
(write_table_handles, true)
(write_table_handles, RequestType::Flatbuffer)
}
};

Expand Down Expand Up @@ -776,11 +781,11 @@ impl RemoteEngineServiceImpl {
.write_batch
.observe(begin_instant.saturating_elapsed().as_secs_f64());

match is_flatbuffer {
true => Ok(TonicWriteResponseExt::Flatbuffer(Response::new(
match request_type {
RequestType::Flatbuffer => Ok(TonicWriteResponseExt::Flatbuffer(Response::new(
build_fb_write_response(batch_resp),
))),
false => Ok(TonicWriteResponseExt::Proto(Response::new(batch_resp))),
RequestType::Protobuf => Ok(TonicWriteResponseExt::Proto(Response::new(batch_resp))),
}
}

Expand Down Expand Up @@ -1023,7 +1028,10 @@ impl RemoteEngineFbService for RemoteEngineServiceImpl {
.await?;
match result {
TonicWriteResponseExt::Flatbuffer(v) => Ok(v),
TonicWriteResponseExt::Proto(_) => Err(Status::new(Code::Internal, "logic error")),
TonicWriteResponseExt::Proto(_) => Err(Status::new(
Code::Internal,
"only support flatbuffer payload",
)),
}
}

Expand All @@ -1036,7 +1044,10 @@ impl RemoteEngineFbService for RemoteEngineServiceImpl {
.await?;
match result {
TonicWriteResponseExt::Flatbuffer(v) => Ok(v),
TonicWriteResponseExt::Proto(_) => Err(Status::new(Code::Internal, "logic error")),
TonicWriteResponseExt::Proto(_) => Err(Status::new(
Code::Internal,
"only support flatbuffer payload",
)),
}
}
}
Expand Down Expand Up @@ -1077,7 +1088,9 @@ impl RemoteEngineService for RemoteEngineServiceImpl {
.await?;
match result {
TonicWriteResponseExt::Proto(v) => Ok(v),
TonicWriteResponseExt::Flatbuffer(_) => Err(Status::new(Code::Internal, "logic error")),
TonicWriteResponseExt::Flatbuffer(_) => {
Err(Status::new(Code::Internal, "only support protobuf payload"))
}
}
}

Expand All @@ -1097,7 +1110,9 @@ impl RemoteEngineService for RemoteEngineServiceImpl {
.await?;
match result {
TonicWriteResponseExt::Proto(v) => Ok(v),
TonicWriteResponseExt::Flatbuffer(_) => Err(Status::new(Code::Internal, "logic error")),
TonicWriteResponseExt::Flatbuffer(_) => {
Err(Status::new(Code::Internal, "only support protobuf payload"))
}
}
}

Expand Down Expand Up @@ -1600,12 +1615,18 @@ fn build_fb_write_response(resp: WriteResponse) -> FlatBufferBytes {
let mut builder = flatbuffers::FlatBufferBuilder::with_capacity(1024);
let header = resp.header.unwrap();

let error_message = builder.create_string(header.error.as_str());
let error_message = if header.error.is_empty() {
None
} else {
Some(builder.create_string(header.error.as_str()))
};

// builder.create_string(header.error.as_str());
let response_header_args = ResponseHeader::create(
&mut builder,
&ResponseHeaderArgs {
code: header.code,
error: Some(error_message),
error: error_message,
},
);

Expand Down
2 changes: 1 addition & 1 deletion src/table_engine/src/remote/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use common_types::{
schema::{IndexInWriterSchema, RecordSchema, Schema, Version},
};
use fb_util::{
common::FlatBufferBytes,
remote_engine_generated::fbprotocol::{
ColumnDesc as FBColumnDesc, ContiguousRows as FBContiguousRows,
ContiguousRowsArgs as FBContiguousRowsArgs, EncodedRow, EncodedRowArgs,
Expand All @@ -43,6 +42,7 @@ use fb_util::{
WriteBatchRequest as FBWriteBatchRequest, WriteBatchRequestArgs as FBWriteBatchRequestArgs,
WriteRequest as FBWriteRequest, WriteRequestArgs as FBWriteRequestArgs,
},
remote_service::FlatBufferBytes,
};
use flatbuffers;
use generic_error::{BoxError, GenericError, GenericResult};
Expand Down

0 comments on commit 2edb871

Please sign in to comment.