Skip to content

Commit

Permalink
chore: http route directly from meta (#1221)
Browse files Browse the repository at this point in the history
## Rationale
We want to know the real endpoint of table through http api, for
reaching this we need to get route info from meta directly rather than
from cache.

## Detailed Changes
Get route info directy from meta in http api.

## Test Plan
Test manually.
  • Loading branch information
Rachelint authored Sep 19, 2023
1 parent 419a473 commit 0f99e23
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 67 deletions.
13 changes: 7 additions & 6 deletions proxy/src/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ use std::{

use async_trait::async_trait;
use ceresdbproto::storage::{
storage_service_client::StorageServiceClient, RequestContext, RouteRequest,
storage_service_client::StorageServiceClient, RequestContext, RouteRequest as RouteRequestPb,
};
use log::{debug, error, warn};
use macros::define_result;
use router::{endpoint::Endpoint, RouterRef};
use router::{endpoint::Endpoint, RouteRequest, RouterRef};
use serde::{Deserialize, Serialize};
use snafu::{Backtrace, ResultExt, Snafu};
use time_ext::ReadableDuration;
Expand Down Expand Up @@ -282,12 +282,13 @@ impl<B: ClientBuilder> Forwarder<B> {
forwarded_from,
} = forward_req;

let route_req = RouteRequest {
let req_pb = RouteRequestPb {
context: Some(RequestContext { database: schema }),
tables: vec![table],
};

let endpoint = match self.router.route(route_req).await {
let request = RouteRequest::new(req_pb, true);
let endpoint = match self.router.route(request).await {
Ok(mut routes) => {
if routes.len() != 1 || routes[0].endpoint.is_none() {
warn!(
Expand Down Expand Up @@ -420,11 +421,11 @@ mod tests {
#[async_trait]
impl Router for MockRouter {
async fn route(&self, req: RouteRequest) -> router::Result<Vec<Route>> {
let endpoint = self.routing_tables.get(&req.tables[0]);
let endpoint = self.routing_tables.get(&req.inner.tables[0]);
match endpoint {
None => Ok(vec![]),
Some(v) => Ok(vec![Route {
table: req.tables[0].clone(),
table: req.inner.tables[0].clone(),
endpoint: Some(v.clone().into()),
}]),
}
Expand Down
8 changes: 5 additions & 3 deletions proxy/src/grpc/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use ceresdbproto::storage::{RouteRequest, RouteResponse};
use ceresdbproto::storage::{RouteRequest as RouteRequestPb, RouteResponse};
use router::RouteRequest;

use crate::{error, metrics::GRPC_HANDLER_COUNTER_VEC, Context, Proxy};

impl Proxy {
pub async fn handle_route(&self, _ctx: Context, req: RouteRequest) -> RouteResponse {
let routes = self.route(req).await;
pub async fn handle_route(&self, _ctx: Context, req: RouteRequestPb) -> RouteResponse {
let request = RouteRequest::new(req, true);
let routes = self.route(request).await;

let mut resp = RouteResponse::default();
match routes {
Expand Down
13 changes: 9 additions & 4 deletions proxy/src/http/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use ceresdbproto::storage::RouteRequest;
use router::endpoint::Endpoint;
use ceresdbproto::storage::RouteRequest as RouteRequestPb;
use router::{endpoint::Endpoint, RouteRequest};
use serde::Serialize;

use crate::{context::RequestContext, error::Result, Proxy};
Expand All @@ -39,14 +39,19 @@ impl Proxy {
return Ok(RouteResponse { routes: vec![] });
}

let route_req = RouteRequest {
let req_pb = RouteRequestPb {
context: Some(ceresdbproto::storage::RequestContext {
database: ctx.schema.clone(),
}),
tables: vec![table.to_string()],
};

let routes = self.route(route_req).await?;
let request = RouteRequest {
route_with_cache: false,
inner: req_pb,
};

let routes = self.route(request).await?;

let routes = routes
.into_iter()
Expand Down
4 changes: 2 additions & 2 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use catalog::{
};
use ceresdbproto::storage::{
storage_service_client::StorageServiceClient, PrometheusRemoteQueryRequest,
PrometheusRemoteQueryResponse, Route, RouteRequest,
PrometheusRemoteQueryResponse, Route,
};
use common_types::{request_id::RequestId, table::DEFAULT_SHARD_ID, ENABLE_TTL, TTL};
use datafusion::{
Expand All @@ -70,7 +70,7 @@ use interpreters::{
};
use log::{error, info, warn};
use query_frontend::plan::Plan;
use router::{endpoint::Endpoint, Router};
use router::{endpoint::Endpoint, RouteRequest, Router};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use table_engine::{
Expand Down
20 changes: 10 additions & 10 deletions proxy/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use std::{

use bytes::Bytes;
use ceresdbproto::storage::{
storage_service_client::StorageServiceClient, value, RouteRequest, Value, WriteRequest,
WriteResponse as WriteResponsePB, WriteSeriesEntry, WriteTableRequest,
storage_service_client::StorageServiceClient, value, RouteRequest as RouteRequestPb, Value,
WriteRequest, WriteResponse as WriteResponsePB, WriteSeriesEntry, WriteTableRequest,
};
use cluster::config::SchemaConfig;
use common_types::{
Expand All @@ -45,7 +45,7 @@ use query_frontend::{
planner::{build_column_schema, try_get_data_type_from_value},
provider::CatalogMetaProvider,
};
use router::endpoint::Endpoint;
use router::{endpoint::Endpoint, RouteRequest};
use snafu::{ensure, OptionExt, ResultExt};
use table_engine::table::TableRef;
use tonic::transport::Channel;
Expand Down Expand Up @@ -310,13 +310,13 @@ impl Proxy {

// TODO: Make the router can accept an iterator over the tables to avoid the
// memory allocation here.
let route_data = self
.router
.route(RouteRequest {
context: req.context.clone(),
tables,
})
.await?;
let req_pb = RouteRequestPb {
context: req.context.clone(),
tables,
};
let request = RouteRequest::new(req_pb, true);
let route_data = self.router.route(request).await?;

let forwarded_table_routes = route_data
.into_iter()
.filter_map(|router| {
Expand Down
15 changes: 6 additions & 9 deletions remote_engine_client/src/cached_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::{collections::HashMap, sync::RwLock};

use ceresdbproto::storage::{self, RequestContext};
use log::debug;
use router::{endpoint::Endpoint, RouterRef};
use router::{endpoint::Endpoint, RouteRequest, RouterRef};
use snafu::{OptionExt, ResultExt};
use table_engine::remote::model::TableIdentifier;
use tonic::transport::Channel as TonicChannel;
Expand Down Expand Up @@ -105,19 +105,16 @@ impl CachedRouter {
async fn do_route(&self, table_ident: &TableIdentifier) -> Result<RouteContext> {
let schema = &table_ident.schema;
let table = table_ident.table.clone();
let route_request = storage::RouteRequest {
let request_pb = storage::RouteRequest {
context: Some(RequestContext {
database: schema.to_string(),
}),
tables: vec![table],
};
let route_infos = self
.router
.route(route_request)
.await
.context(RouteWithCause {
table_ident: table_ident.clone(),
})?;
let request = RouteRequest::new(request_pb, true);
let route_infos = self.router.route(request).await.context(RouteWithCause {
table_ident: table_ident.clone(),
})?;

if route_infos.is_empty() {
return RouteNoCause {
Expand Down
79 changes: 52 additions & 27 deletions router/src/cluster_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
//! A router based on the [`cluster::Cluster`].
use async_trait::async_trait;
use ceresdbproto::storage::{Route, RouteRequest};
use ceresdbproto::storage::Route;
use cluster::ClusterRef;
use generic_error::BoxError;
use log::trace;
Expand All @@ -24,7 +24,8 @@ use moka::future::Cache;
use snafu::ResultExt;

use crate::{
endpoint::Endpoint, OtherWithCause, ParseEndpoint, Result, RouteCacheConfig, Router, TableInfo,
endpoint::Endpoint, OtherWithCause, ParseEndpoint, Result, RouteCacheConfig, RouteRequest,
Router, TableInfo,
};

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -75,22 +76,15 @@ impl ClusterBasedRouter {
miss
}

async fn route_with_cache(
async fn route_from_meta(
&self,
tables: &Vec<String>,
tables: &[String],
database: String,
) -> Result<Vec<RouteData>> {
// Firstly route table from local cache.
let mut routes = Vec::with_capacity(tables.len());
let miss = self.route_from_cache(tables, &mut routes);
trace!("Route from cache, miss:{miss:?}, routes:{routes:?}");

if miss.is_empty() {
return Ok(routes);
}
routes: &mut Vec<RouteData>,
) -> Result<()> {
let route_tables_req = RouteTablesRequest {
schema_name: database,
table_names: miss,
table_names: tables.to_owned(),
};

let route_resp = self
Expand Down Expand Up @@ -126,6 +120,33 @@ impl ClusterBasedRouter {
routes.push(route);
}
}

Ok(())
}

async fn route_internal(
&self,
tables: &[String],
database: String,
route_with_cache: bool,
) -> Result<Vec<RouteData>> {
// Firstly route table from local cache.
let mut routes = Vec::with_capacity(tables.len());
let miss = if route_with_cache {
self.route_from_cache(tables, &mut routes)
} else {
tables.to_owned()
};

trace!("Route from cache, miss:{miss:?}, routes:{routes:?}");

if miss.is_empty() {
return Ok(routes);
}

// If miss exists, route from meta.
self.route_from_meta(&miss, database, &mut routes).await?;

Ok(routes)
}
}
Expand All @@ -145,9 +166,12 @@ fn make_route(table_info: TableInfo, endpoint: Option<&str>) -> Result<RouteData
#[async_trait]
impl Router for ClusterBasedRouter {
async fn route(&self, req: RouteRequest) -> Result<Vec<Route>> {
let req_ctx = req.context.unwrap();
let route_data_vec = self.route_with_cache(&req.tables, req_ctx.database).await?;
Ok(route_data_vec
let req_ctx = req.inner.context.unwrap();
let route_datas = self
.route_internal(&req.inner.tables, req_ctx.database, req.route_with_cache)
.await?;

Ok(route_datas
.into_iter()
.map(|v| Route {
table: v.table_info.name,
Expand All @@ -158,7 +182,7 @@ impl Router for ClusterBasedRouter {

async fn fetch_table_info(&self, schema: &str, table: &str) -> Result<Option<TableInfo>> {
let mut route_data_vec = self
.route_with_cache(&vec![table.to_string()], schema.to_string())
.route_internal(&[table.to_string()], schema.to_string(), true)
.await?;
if route_data_vec.is_empty() {
return Ok(None);
Expand All @@ -174,7 +198,7 @@ impl Router for ClusterBasedRouter {
mod tests {
use std::{collections::HashMap, sync::Arc, thread::sleep, time::Duration};

use ceresdbproto::storage::RequestContext;
use ceresdbproto::storage::{RequestContext, RouteRequest as RouteRequestPb};
use cluster::{
shard_lock_manager::ShardLockManagerRef, shard_set::ShardRef, Cluster, ClusterNodesResp,
};
Expand Down Expand Up @@ -275,14 +299,15 @@ mod tests {

// first case get two tables, no one miss
let tables = vec![table1.to_string(), table2.to_string()];
let result = router
.route(RouteRequest {
context: Some(RequestContext {
database: String::from("public"),
}),
tables: tables.clone(),
})
.await;
let request_pb = RouteRequestPb {
context: Some(RequestContext {
database: String::from("public"),
}),
tables: tables.clone(),
};
let request = RouteRequest::new(request_pb, true);

let result = router.route(request).await;
assert_eq!(result.unwrap().len(), 2);

let mut routes = Vec::with_capacity(tables.len());
Expand Down
16 changes: 15 additions & 1 deletion router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub mod rule_based;
use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
use ceresdbproto::storage::{Route, RouteRequest};
use ceresdbproto::storage::{Route, RouteRequest as RouteRequestPb};
pub use cluster_based::ClusterBasedRouter;
use macros::define_result;
use meta_client::types::TableInfo;
Expand Down Expand Up @@ -79,6 +79,20 @@ pub trait Router {
async fn fetch_table_info(&self, schema: &str, table: &str) -> Result<Option<TableInfo>>;
}

pub struct RouteRequest {
pub route_with_cache: bool,
pub inner: RouteRequestPb,
}

impl RouteRequest {
pub fn new(request: RouteRequestPb, route_with_cache: bool) -> Self {
Self {
route_with_cache,
inner: request,
}
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RouteCacheConfig {
/// Enable route cache, default false.
Expand Down
12 changes: 7 additions & 5 deletions router/src/rule_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@
use std::collections::HashMap;

use async_trait::async_trait;
use ceresdbproto::storage::{self, Route, RouteRequest};
use ceresdbproto::storage::{self, Route};
use cluster::config::SchemaConfig;
use log::info;
use meta_client::types::ShardId;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt};

use crate::{endpoint::Endpoint, hash, Result, RouteNotFound, Router, ShardNotFound, TableInfo};
use crate::{
endpoint::Endpoint, hash, Result, RouteNotFound, RouteRequest, Router, ShardNotFound, TableInfo,
};

pub type ShardNodes = HashMap<ShardId, Endpoint>;

Expand Down Expand Up @@ -151,7 +153,7 @@ impl RuleBasedRouter {
#[async_trait]
impl Router for RuleBasedRouter {
async fn route(&self, req: RouteRequest) -> Result<Vec<Route>> {
let req_ctx = req.context.unwrap();
let req_ctx = req.inner.context.unwrap();
let schema = &req_ctx.database;
if let Some(shard_nodes) = self.cluster_view.schema_shards.get(schema) {
ensure!(!shard_nodes.is_empty(), RouteNotFound { schema });
Expand All @@ -161,8 +163,8 @@ impl Router for RuleBasedRouter {

// TODO(yingwen): Better way to get total shard number
let total_shards = shard_nodes.len();
let mut route_results = Vec::with_capacity(req.tables.len());
for table in req.tables {
let mut route_results = Vec::with_capacity(req.inner.tables.len());
for table in req.inner.tables {
let shard_id = Self::route_table(&table, rule_list_opt, total_shards);

let endpoint = shard_nodes.get(&shard_id).with_context(|| ShardNotFound {
Expand Down

0 comments on commit 0f99e23

Please sign in to comment.