diff --git a/proxy/src/forward.rs b/proxy/src/forward.rs index 8f4c365ded..1d00d56084 100644 --- a/proxy/src/forward.rs +++ b/proxy/src/forward.rs @@ -21,11 +21,11 @@ use std::{ use async_trait::async_trait; use ceresdbproto::storage::{ - storage_service_client::StorageServiceClient, RequestContext, RouteRequest, + storage_service_client::StorageServiceClient, RequestContext, RouteRequest as RouteRequestPb, }; use log::{debug, error, warn}; use macros::define_result; -use router::{endpoint::Endpoint, RouterRef}; +use router::{endpoint::Endpoint, RouteRequest, RouterRef}; use serde::{Deserialize, Serialize}; use snafu::{Backtrace, ResultExt, Snafu}; use time_ext::ReadableDuration; @@ -282,12 +282,13 @@ impl Forwarder { forwarded_from, } = forward_req; - let route_req = RouteRequest { + let req_pb = RouteRequestPb { context: Some(RequestContext { database: schema }), tables: vec![table], }; - let endpoint = match self.router.route(route_req).await { + let request = RouteRequest::new(req_pb, true); + let endpoint = match self.router.route(request).await { Ok(mut routes) => { if routes.len() != 1 || routes[0].endpoint.is_none() { warn!( @@ -420,11 +421,11 @@ mod tests { #[async_trait] impl Router for MockRouter { async fn route(&self, req: RouteRequest) -> router::Result> { - let endpoint = self.routing_tables.get(&req.tables[0]); + let endpoint = self.routing_tables.get(&req.inner.tables[0]); match endpoint { None => Ok(vec![]), Some(v) => Ok(vec![Route { - table: req.tables[0].clone(), + table: req.inner.tables[0].clone(), endpoint: Some(v.clone().into()), }]), } diff --git a/proxy/src/grpc/route.rs b/proxy/src/grpc/route.rs index bc21fd06cd..dcc76de75d 100644 --- a/proxy/src/grpc/route.rs +++ b/proxy/src/grpc/route.rs @@ -12,13 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use ceresdbproto::storage::{RouteRequest, RouteResponse}; +use ceresdbproto::storage::{RouteRequest as RouteRequestPb, RouteResponse}; +use router::RouteRequest; use crate::{error, metrics::GRPC_HANDLER_COUNTER_VEC, Context, Proxy}; impl Proxy { - pub async fn handle_route(&self, _ctx: Context, req: RouteRequest) -> RouteResponse { - let routes = self.route(req).await; + pub async fn handle_route(&self, _ctx: Context, req: RouteRequestPb) -> RouteResponse { + let request = RouteRequest::new(req, true); + let routes = self.route(request).await; let mut resp = RouteResponse::default(); match routes { diff --git a/proxy/src/http/route.rs b/proxy/src/http/route.rs index 3a692cc4eb..8e989d560e 100644 --- a/proxy/src/http/route.rs +++ b/proxy/src/http/route.rs @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use ceresdbproto::storage::RouteRequest; -use router::endpoint::Endpoint; +use ceresdbproto::storage::RouteRequest as RouteRequestPb; +use router::{endpoint::Endpoint, RouteRequest}; use serde::Serialize; use crate::{context::RequestContext, error::Result, Proxy}; @@ -39,14 +39,19 @@ impl Proxy { return Ok(RouteResponse { routes: vec![] }); } - let route_req = RouteRequest { + let req_pb = RouteRequestPb { context: Some(ceresdbproto::storage::RequestContext { database: ctx.schema.clone(), }), tables: vec![table.to_string()], }; - let routes = self.route(route_req).await?; + let request = RouteRequest { + route_with_cache: false, + inner: req_pb, + }; + + let routes = self.route(request).await?; let routes = routes .into_iter() diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index a897392959..6593db42fa 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -53,7 +53,7 @@ use catalog::{ }; use ceresdbproto::storage::{ storage_service_client::StorageServiceClient, PrometheusRemoteQueryRequest, - PrometheusRemoteQueryResponse, Route, RouteRequest, + PrometheusRemoteQueryResponse, Route, }; use common_types::{request_id::RequestId, table::DEFAULT_SHARD_ID, ENABLE_TTL, TTL}; use datafusion::{ @@ -70,7 +70,7 @@ use interpreters::{ }; use log::{error, info, warn}; use query_frontend::plan::Plan; -use router::{endpoint::Endpoint, Router}; +use router::{endpoint::Endpoint, RouteRequest, Router}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use table_engine::{ diff --git a/proxy/src/write.rs b/proxy/src/write.rs index e7d81e17a8..caf78992b5 100644 --- a/proxy/src/write.rs +++ b/proxy/src/write.rs @@ -22,8 +22,8 @@ use std::{ use bytes::Bytes; use ceresdbproto::storage::{ - storage_service_client::StorageServiceClient, value, RouteRequest, Value, WriteRequest, - WriteResponse as WriteResponsePB, WriteSeriesEntry, WriteTableRequest, + storage_service_client::StorageServiceClient, value, RouteRequest as RouteRequestPb, Value, + WriteRequest, WriteResponse as WriteResponsePB, WriteSeriesEntry, WriteTableRequest, }; use cluster::config::SchemaConfig; use common_types::{ @@ -45,7 +45,7 @@ use query_frontend::{ planner::{build_column_schema, try_get_data_type_from_value}, provider::CatalogMetaProvider, }; -use router::endpoint::Endpoint; +use router::{endpoint::Endpoint, RouteRequest}; use snafu::{ensure, OptionExt, ResultExt}; use table_engine::table::TableRef; use tonic::transport::Channel; @@ -310,13 +310,13 @@ impl Proxy { // TODO: Make the router can accept an iterator over the tables to avoid the // memory allocation here. - let route_data = self - .router - .route(RouteRequest { - context: req.context.clone(), - tables, - }) - .await?; + let req_pb = RouteRequestPb { + context: req.context.clone(), + tables, + }; + let request = RouteRequest::new(req_pb, true); + let route_data = self.router.route(request).await?; + let forwarded_table_routes = route_data .into_iter() .filter_map(|router| { diff --git a/remote_engine_client/src/cached_router.rs b/remote_engine_client/src/cached_router.rs index 36b0c0c9a4..94336e228b 100644 --- a/remote_engine_client/src/cached_router.rs +++ b/remote_engine_client/src/cached_router.rs @@ -18,7 +18,7 @@ use std::{collections::HashMap, sync::RwLock}; use ceresdbproto::storage::{self, RequestContext}; use log::debug; -use router::{endpoint::Endpoint, RouterRef}; +use router::{endpoint::Endpoint, RouteRequest, RouterRef}; use snafu::{OptionExt, ResultExt}; use table_engine::remote::model::TableIdentifier; use tonic::transport::Channel as TonicChannel; @@ -105,19 +105,16 @@ impl CachedRouter { async fn do_route(&self, table_ident: &TableIdentifier) -> Result { let schema = &table_ident.schema; let table = table_ident.table.clone(); - let route_request = storage::RouteRequest { + let request_pb = storage::RouteRequest { context: Some(RequestContext { database: schema.to_string(), }), tables: vec![table], }; - let route_infos = self - .router - .route(route_request) - .await - .context(RouteWithCause { - table_ident: table_ident.clone(), - })?; + let request = RouteRequest::new(request_pb, true); + let route_infos = self.router.route(request).await.context(RouteWithCause { + table_ident: table_ident.clone(), + })?; if route_infos.is_empty() { return RouteNoCause { diff --git a/router/src/cluster_based.rs b/router/src/cluster_based.rs index b20d56a136..89d679413a 100644 --- a/router/src/cluster_based.rs +++ b/router/src/cluster_based.rs @@ -15,7 +15,7 @@ //! A router based on the [`cluster::Cluster`]. use async_trait::async_trait; -use ceresdbproto::storage::{Route, RouteRequest}; +use ceresdbproto::storage::Route; use cluster::ClusterRef; use generic_error::BoxError; use log::trace; @@ -24,7 +24,8 @@ use moka::future::Cache; use snafu::ResultExt; use crate::{ - endpoint::Endpoint, OtherWithCause, ParseEndpoint, Result, RouteCacheConfig, Router, TableInfo, + endpoint::Endpoint, OtherWithCause, ParseEndpoint, Result, RouteCacheConfig, RouteRequest, + Router, TableInfo, }; #[derive(Clone, Debug)] @@ -75,22 +76,15 @@ impl ClusterBasedRouter { miss } - async fn route_with_cache( + async fn route_from_meta( &self, - tables: &Vec, + tables: &[String], database: String, - ) -> Result> { - // Firstly route table from local cache. - let mut routes = Vec::with_capacity(tables.len()); - let miss = self.route_from_cache(tables, &mut routes); - trace!("Route from cache, miss:{miss:?}, routes:{routes:?}"); - - if miss.is_empty() { - return Ok(routes); - } + routes: &mut Vec, + ) -> Result<()> { let route_tables_req = RouteTablesRequest { schema_name: database, - table_names: miss, + table_names: tables.to_owned(), }; let route_resp = self @@ -126,6 +120,33 @@ impl ClusterBasedRouter { routes.push(route); } } + + Ok(()) + } + + async fn route_internal( + &self, + tables: &[String], + database: String, + route_with_cache: bool, + ) -> Result> { + // Firstly route table from local cache. + let mut routes = Vec::with_capacity(tables.len()); + let miss = if route_with_cache { + self.route_from_cache(tables, &mut routes) + } else { + tables.to_owned() + }; + + trace!("Route from cache, miss:{miss:?}, routes:{routes:?}"); + + if miss.is_empty() { + return Ok(routes); + } + + // If miss exists, route from meta. + self.route_from_meta(&miss, database, &mut routes).await?; + Ok(routes) } } @@ -145,9 +166,12 @@ fn make_route(table_info: TableInfo, endpoint: Option<&str>) -> Result Result> { - let req_ctx = req.context.unwrap(); - let route_data_vec = self.route_with_cache(&req.tables, req_ctx.database).await?; - Ok(route_data_vec + let req_ctx = req.inner.context.unwrap(); + let route_datas = self + .route_internal(&req.inner.tables, req_ctx.database, req.route_with_cache) + .await?; + + Ok(route_datas .into_iter() .map(|v| Route { table: v.table_info.name, @@ -158,7 +182,7 @@ impl Router for ClusterBasedRouter { async fn fetch_table_info(&self, schema: &str, table: &str) -> Result> { let mut route_data_vec = self - .route_with_cache(&vec![table.to_string()], schema.to_string()) + .route_internal(&[table.to_string()], schema.to_string(), true) .await?; if route_data_vec.is_empty() { return Ok(None); @@ -174,7 +198,7 @@ impl Router for ClusterBasedRouter { mod tests { use std::{collections::HashMap, sync::Arc, thread::sleep, time::Duration}; - use ceresdbproto::storage::RequestContext; + use ceresdbproto::storage::{RequestContext, RouteRequest as RouteRequestPb}; use cluster::{ shard_lock_manager::ShardLockManagerRef, shard_set::ShardRef, Cluster, ClusterNodesResp, }; @@ -275,14 +299,15 @@ mod tests { // first case get two tables, no one miss let tables = vec![table1.to_string(), table2.to_string()]; - let result = router - .route(RouteRequest { - context: Some(RequestContext { - database: String::from("public"), - }), - tables: tables.clone(), - }) - .await; + let request_pb = RouteRequestPb { + context: Some(RequestContext { + database: String::from("public"), + }), + tables: tables.clone(), + }; + let request = RouteRequest::new(request_pb, true); + + let result = router.route(request).await; assert_eq!(result.unwrap().len(), 2); let mut routes = Vec::with_capacity(tables.len()); diff --git a/router/src/lib.rs b/router/src/lib.rs index 4da164884e..62572fa120 100644 --- a/router/src/lib.rs +++ b/router/src/lib.rs @@ -19,7 +19,7 @@ pub mod rule_based; use std::{sync::Arc, time::Duration}; use async_trait::async_trait; -use ceresdbproto::storage::{Route, RouteRequest}; +use ceresdbproto::storage::{Route, RouteRequest as RouteRequestPb}; pub use cluster_based::ClusterBasedRouter; use macros::define_result; use meta_client::types::TableInfo; @@ -79,6 +79,20 @@ pub trait Router { async fn fetch_table_info(&self, schema: &str, table: &str) -> Result>; } +pub struct RouteRequest { + pub route_with_cache: bool, + pub inner: RouteRequestPb, +} + +impl RouteRequest { + pub fn new(request: RouteRequestPb, route_with_cache: bool) -> Self { + Self { + route_with_cache, + inner: request, + } + } +} + #[derive(Clone, Debug, Deserialize, Serialize)] pub struct RouteCacheConfig { /// Enable route cache, default false. diff --git a/router/src/rule_based.rs b/router/src/rule_based.rs index 846a62e252..0e4acec019 100644 --- a/router/src/rule_based.rs +++ b/router/src/rule_based.rs @@ -17,14 +17,16 @@ use std::collections::HashMap; use async_trait::async_trait; -use ceresdbproto::storage::{self, Route, RouteRequest}; +use ceresdbproto::storage::{self, Route}; use cluster::config::SchemaConfig; use log::info; use meta_client::types::ShardId; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt}; -use crate::{endpoint::Endpoint, hash, Result, RouteNotFound, Router, ShardNotFound, TableInfo}; +use crate::{ + endpoint::Endpoint, hash, Result, RouteNotFound, RouteRequest, Router, ShardNotFound, TableInfo, +}; pub type ShardNodes = HashMap; @@ -151,7 +153,7 @@ impl RuleBasedRouter { #[async_trait] impl Router for RuleBasedRouter { async fn route(&self, req: RouteRequest) -> Result> { - let req_ctx = req.context.unwrap(); + let req_ctx = req.inner.context.unwrap(); let schema = &req_ctx.database; if let Some(shard_nodes) = self.cluster_view.schema_shards.get(schema) { ensure!(!shard_nodes.is_empty(), RouteNotFound { schema }); @@ -161,8 +163,8 @@ impl Router for RuleBasedRouter { // TODO(yingwen): Better way to get total shard number let total_shards = shard_nodes.len(); - let mut route_results = Vec::with_capacity(req.tables.len()); - for table in req.tables { + let mut route_results = Vec::with_capacity(req.inner.tables.len()); + for table in req.inner.tables { let shard_id = Self::route_table(&table, rule_list_opt, total_shards); let endpoint = shard_nodes.get(&shard_id).with_context(|| ShardNotFound {