From 69672c4c2e782de1057aa87230925e5abff8449e Mon Sep 17 00:00:00 2001 From: zealchen <283559115@qq.com> Date: Thu, 8 Aug 2024 16:33:39 -0500 Subject: [PATCH] refactor client to support both protobuf and flatbuffer protocol --- src/remote_engine_client/Cargo.toml | 2 +- src/remote_engine_client/src/client.rs | 368 +++++++++++++----- src/remote_engine_client/src/lib.rs | 5 + .../src/grpc/remote_engine_service/mod.rs | 6 +- 4 files changed, 274 insertions(+), 107 deletions(-) diff --git a/src/remote_engine_client/Cargo.toml b/src/remote_engine_client/Cargo.toml index 1620be63e1..4a70d656e6 100644 --- a/src/remote_engine_client/Cargo.toml +++ b/src/remote_engine_client/Cargo.toml @@ -47,4 +47,4 @@ snafu = { workspace = true } table_engine = { workspace = true } time_ext = { workspace = true } tokio = { workspace = true } -tonic = { workspace = true } +tonic = { workspace = true } \ No newline at end of file diff --git a/src/remote_engine_client/src/client.rs b/src/remote_engine_client/src/client.rs index f571cdc674..a25b19fa9f 100644 --- a/src/remote_engine_client/src/client.rs +++ b/src/remote_engine_client/src/client.rs @@ -58,7 +58,12 @@ use time_ext::ReadableDuration; use tokio::time::sleep; use tonic::{transport::Channel, Request, Streaming}; -use crate::{cached_router::CachedRouter, config::Config, error::*, status_code}; +use crate::{ + cached_router::{CachedRouter, RouteContext}, + config::Config, + error::*, + status_code, RequestType, +}; struct WriteBatchContext { table_idents: Vec, @@ -144,37 +149,15 @@ impl Client { // Write to remote. let table_ident = request.table.clone(); let endpoint = route_context.endpoint.clone(); - let request_fb = request.convert_into_fb().box_err().context(Convert { - msg: "Failed to convert WriteRequest to fb", - })?; - - let mut rpc_client = RemoteEngineFbServiceClient::::new(route_context.channel); - - let result = rpc_client - .write(Request::new(request_fb)) - .await - .with_context(|| Rpc { - table_idents: vec![table_ident.clone()], - msg: "Failed to write to remote engine", - }); - let result = result.and_then(|response| { - let fb_response = response.into_inner(); - let response = fb_response.deserialize::().unwrap(); - if let Some(header) = &response.header() - && !status_code::is_ok(header.code()) - { - Server { - endpoint, - table_idents: vec![table_ident.clone()], - code: header.code(), - msg: header.error().unwrap_or("").to_string(), - } - .fail() - } else { - Ok(response.affected_rows() as usize) + let result: std::result::Result = match runtime_request_type() { + RequestType::Protobuf => { + write_pb_internal(request, route_context, &table_ident, endpoint).await } - }); + RequestType::Flatbuffer => { + write_fb_internal(request, route_context, &table_ident, endpoint).await + } + }; if result.is_err() { // If occurred error, we simply evict the corresponding channel now. @@ -187,7 +170,8 @@ impl Client { pub async fn write_batch(&self, requests: Vec) -> Result> { // Find the channels from router firstly. - let mut write_batch_contexts_by_endpoint = HashMap::new(); + let mut write_batch_contexts_by_endpoint: HashMap = + HashMap::new(); for request in requests { let route_context = self.cached_router.route(&request.table).await?; let write_batch_context = write_batch_contexts_by_endpoint @@ -202,80 +186,17 @@ impl Client { } // Merge according to endpoint. - let mut write_handles = Vec::with_capacity(write_batch_contexts_by_endpoint.len()); - let mut written_tables = Vec::with_capacity(write_batch_contexts_by_endpoint.len()); - for (endpoint, context) in write_batch_contexts_by_endpoint { - // Write to remote. - let WriteBatchContext { - table_idents, - request, - channel, - } = context; - let batch_request_fb = request.convert_into_fb().box_err().context(Convert { - msg: "failed to convert request to fb", - })?; - let handle = self.io_runtime.spawn(async move { - let mut rpc_client = RemoteEngineFbServiceClient::::new(channel); - rpc_client - .write_batch(Request::new(batch_request_fb)) + match runtime_request_type() { + RequestType::Protobuf => { + self.write_batch_pb_internal(write_batch_contexts_by_endpoint) + .await + } + RequestType::Flatbuffer => { + self.write_batch_fb_internal(write_batch_contexts_by_endpoint) .await - .map(|v| (v, endpoint.clone())) - .box_err() - }); - - write_handles.push(handle); - written_tables.push(table_idents); - } - - let mut results = Vec::with_capacity(write_handles.len()); - for (table_idents, handle) in written_tables.into_iter().zip(write_handles) { - // If it's runtime error, don't evict entires from route cache. - let batch_result = match handle.await.box_err() { - Ok(result) => result, - Err(e) => { - results.push(WriteBatchResult { - table_idents, - result: Err(e), - }); - continue; - } - }; - - // Check remote write result then. - let result = batch_result.and_then(|result| { - let (response, endpoint) = result; - let fb_response = response.into_inner(); - let response = fb_response.deserialize::().unwrap(); - if let Some(header) = response.header() - && !status_code::is_ok(header.code()) - { - Server { - endpoint, - table_idents: table_idents.clone(), - code: header.code(), - msg: header.error().unwrap_or("").to_string(), - } - .fail() - .box_err() - } else { - Ok(response.affected_rows()) - } - }); - - if result.is_err() { - // If occurred error, we simply evict the corresponding channel now. - // TODO: evict according to the type of error. - self.evict_route_from_cache(&table_idents).await; } - - results.push(WriteBatchResult { - table_idents, - result, - }); } - - Ok(results) } pub async fn alter_table_schema(&self, request: AlterTableSchemaRequest) -> Result<()> { @@ -543,6 +464,162 @@ impl Client { ); self.cached_router.evict(table_idents).await; } + + async fn write_batch_pb_internal( + &self, + write_batch_contexts_by_endpoint: HashMap, + ) -> Result> { + let mut write_handles = Vec::with_capacity(write_batch_contexts_by_endpoint.len()); + let mut written_tables = Vec::with_capacity(write_batch_contexts_by_endpoint.len()); + for (endpoint, context) in write_batch_contexts_by_endpoint { + // Write to remote. + let WriteBatchContext { + table_idents, + request, + channel, + } = context; + let batch_request_pb = request.convert_into_pb().box_err().context(Convert { + msg: "failed to convert request to pb", + })?; + let handle = self.io_runtime.spawn(async move { + let mut rpc_client = RemoteEngineServiceClient::::new(channel); + rpc_client + .write_batch(Request::new(batch_request_pb)) + .await + .map(|v| (v, endpoint.clone())) + .box_err() + }); + + write_handles.push(handle); + written_tables.push(table_idents); + } + + let mut results = Vec::with_capacity(write_handles.len()); + for (table_idents, handle) in written_tables.into_iter().zip(write_handles) { + // If it's runtime error, don't evict entires from route cache. + let batch_result = match handle.await.box_err() { + Ok(result) => result, + Err(e) => { + results.push(WriteBatchResult { + table_idents, + result: Err(e), + }); + continue; + } + }; + + // Check remote write result then. + let result = batch_result.and_then(|result| { + let (response, endpoint) = result; + let response = response.into_inner(); + if let Some(header) = &response.header + && !status_code::is_ok(header.code) + { + Server { + endpoint, + table_idents: table_idents.clone(), + code: header.code, + msg: header.error.clone(), + } + .fail() + .box_err() + } else { + Ok(response.affected_rows) + } + }); + + if result.is_err() { + // If occurred error, we simply evict the corresponding channel now. + // TODO: evict according to the type of error. + self.evict_route_from_cache(&table_idents).await; + } + + results.push(WriteBatchResult { + table_idents, + result, + }); + } + Ok(results) + } + + async fn write_batch_fb_internal( + &self, + write_batch_contexts_by_endpoint: HashMap, + ) -> Result> { + let mut write_handles = Vec::with_capacity(write_batch_contexts_by_endpoint.len()); + let mut written_tables = Vec::with_capacity(write_batch_contexts_by_endpoint.len()); + for (endpoint, context) in write_batch_contexts_by_endpoint { + // Write to remote. + let WriteBatchContext { + table_idents, + request, + channel, + } = context; + + let batch_request_fb = request.convert_into_fb().box_err().context(Convert { + msg: "failed to convert request to fb", + })?; + let handle = self.io_runtime.spawn(async move { + let mut rpc_client = RemoteEngineFbServiceClient::::new(channel); + rpc_client + .write_batch(Request::new(batch_request_fb)) + .await + .map(|v| (v, endpoint.clone())) + .box_err() + }); + + write_handles.push(handle); + written_tables.push(table_idents); + } + + let mut results = Vec::with_capacity(write_handles.len()); + for (table_idents, handle) in written_tables.into_iter().zip(write_handles) { + // If it's runtime error, don't evict entires from route cache. + let batch_result = match handle.await.box_err() { + Ok(result) => result, + Err(e) => { + results.push(WriteBatchResult { + table_idents, + result: Err(e), + }); + continue; + } + }; + + // Check remote write result then. + let result = batch_result.and_then(|result| { + let (response, endpoint) = result; + let fb_response = response.into_inner(); + let response = fb_response.deserialize::().unwrap(); + if let Some(header) = response.header() + && !status_code::is_ok(header.code()) + { + Server { + endpoint, + table_idents: table_idents.clone(), + code: header.code(), + msg: header.error().unwrap_or("").to_string(), + } + .fail() + .box_err() + } else { + Ok(response.affected_rows()) + } + }); + + if result.is_err() { + // If occurred error, we simply evict the corresponding channel now. + // TODO: evict according to the type of error. + self.evict_route_from_cache(&table_idents).await; + } + + results.push(WriteBatchResult { + table_idents, + result, + }); + } + Ok(results) + } } pub struct ClientReadRecordBatchStream { @@ -651,3 +728,92 @@ fn convert_arrow_payload(mut v: ArrowPayload) -> Result { }) }) } + +fn runtime_request_type() -> RequestType { + // get request type from environment variable + std::env::var("REQUEST_TYPE").map_or(RequestType::Protobuf, |v| { + if v.to_lowercase() == "flatbuffer" { + RequestType::Flatbuffer + } else { + RequestType::Protobuf + } + }) +} + +async fn write_pb_internal( + request: WriteRequest, + route_context: RouteContext, + table_ident: &TableIdentifier, + endpoint: Endpoint, +) -> Result { + let request = request.convert_into_pb().box_err().context(Convert { + msg: "Failed to convert WriteRequest to pb", + })?; + let mut rpc_client = RemoteEngineServiceClient::::new(route_context.channel); + + let result = rpc_client + .write(Request::new(request)) + .await + .with_context(|| Rpc { + table_idents: vec![table_ident.clone()], + msg: "Failed to write to remote engine", + }); + + let result = result.and_then(|response| { + let response = response.into_inner(); + if let Some(header) = &response.header + && !status_code::is_ok(header.code) + { + Server { + endpoint, + table_idents: vec![table_ident.clone()], + code: header.code, + msg: header.error.clone(), + } + .fail() + } else { + Ok(response.affected_rows as usize) + } + }); + result +} + +async fn write_fb_internal( + request: WriteRequest, + route_context: RouteContext, + table_ident: &TableIdentifier, + endpoint: Endpoint, +) -> Result { + let request = request.convert_into_fb().box_err().context(Convert { + msg: "Failed to convert WriteRequest to fb", + })?; + + let mut rpc_client = RemoteEngineFbServiceClient::::new(route_context.channel); + + let result = rpc_client + .write(Request::new(request)) + .await + .with_context(|| Rpc { + table_idents: vec![table_ident.clone()], + msg: "Failed to write to remote engine", + }); + + let result = result.and_then(|response| { + let fb_response = response.into_inner(); + let response = fb_response.deserialize::().unwrap(); + if let Some(header) = &response.header() + && !status_code::is_ok(header.code()) + { + Server { + endpoint, + table_idents: vec![table_ident.clone()], + code: header.code(), + msg: header.error().unwrap_or("").to_string(), + } + .fail() + } else { + Ok(response.affected_rows() as usize) + } + }); + result +} diff --git a/src/remote_engine_client/src/lib.rs b/src/remote_engine_client/src/lib.rs index 53360bb03c..cc09ff0423 100644 --- a/src/remote_engine_client/src/lib.rs +++ b/src/remote_engine_client/src/lib.rs @@ -240,3 +240,8 @@ impl RecordBatchStream for RemoteReadRecordBatchStream { &self.0.record_schema } } + +pub enum RequestType { + Protobuf, + Flatbuffer, +} diff --git a/src/server/src/grpc/remote_engine_service/mod.rs b/src/server/src/grpc/remote_engine_service/mod.rs index 68004eaa8d..879dd286b0 100644 --- a/src/server/src/grpc/remote_engine_service/mod.rs +++ b/src/server/src/grpc/remote_engine_service/mod.rs @@ -67,6 +67,7 @@ use query_engine::{ physical_planner::PhysicalPlanRef, QueryEngineRef, QueryEngineType, }; +use remote_engine_client::RequestType; use runtime::{Priority, RuntimeRef}; use snafu::{OptionExt, ResultExt}; use table_engine::{ @@ -101,11 +102,6 @@ mod metrics; const STREAM_QUERY_CHANNEL_LEN: usize = 200; const DEFAULT_COMPRESS_MIN_LENGTH: usize = 80 * 1024; -enum RequestType { - Protobuf, - Flatbuffer, -} - enum TonicWriteBatchRequestExt { Proto(Request), Flatbuffer(tonic::Request),