Skip to content

Commit

Permalink
feat: support request context (apache#636)
Browse files Browse the repository at this point in the history
* feat: support new request context

* fix clippy

* fix message

* fix wong wal location

* remove schema from route trait

* fix clippy

* make CI happy

* refactor by CR

---------

Co-authored-by: chunshao.rcs <[email protected]>
  • Loading branch information
jiacai2050 and chunshao90 authored Feb 13, 2023
1 parent 5b60525 commit 45ebebf
Show file tree
Hide file tree
Showing 15 changed files with 147 additions and 151 deletions.
31 changes: 21 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions docs/minimal.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ http_port = 5440
grpc_port = 8831
log_level = "info"

[analytic]
wal_path = "/tmp/ceresdb"

[analytic.storage]
mem_cache_capacity = '1G'
mem_cache_partition_bits = 0
Expand All @@ -15,6 +12,10 @@ mem_cache_partition_bits = 0
type = "Local"
data_path = "/tmp/ceresdb"

[analytic.wal_storage]
type = "RocksDB"
path = "/tmp/ceresdb"

[[static_route.topology.schema_shards]]
schema = 'public'
auto_create_tables = true
Expand Down
5 changes: 4 additions & 1 deletion integration_tests/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ impl CeresDB {
}

async fn execute(query: String, client: Arc<dyn DbClient>) -> Box<dyn Display> {
let query_ctx = RpcContext::new("public".to_string(), "".to_string());
let query_ctx = RpcContext {
database: Some("public".to_string()),
timeout: None,
};
let query_req = Request {
tables: vec![],
sql: query,
Expand Down
19 changes: 11 additions & 8 deletions remote_engine_client/src/cached_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use std::collections::HashMap;

use ceresdbproto::storage;
use ceresdbproto::storage::{self, RequestContext};
use log::debug;
use router::RouterRef;
use snafu::{OptionExt, ResultExt};
Expand Down Expand Up @@ -89,15 +89,18 @@ impl CachedRouter {
let schema = &table_ident.schema;
let table = table_ident.table.clone();
let route_request = storage::RouteRequest {
context: Some(RequestContext {
database: schema.to_string(),
}),
tables: vec![table],
};
let route_infos =
self.router
.route(schema, route_request)
.await
.context(RouteWithCause {
table_ident: table_ident.clone(),
})?;
let route_infos = self
.router
.route(route_request)
.await
.context(RouteWithCause {
table_ident: table_ident.clone(),
})?;

if route_infos.is_empty() {
return RouteNoCause {
Expand Down
5 changes: 3 additions & 2 deletions router/src/cluster_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ fn make_route(table_name: &str, endpoint: &str) -> Result<Route> {

#[async_trait]
impl Router for ClusterBasedRouter {
async fn route(&self, schema: &str, req: RouteRequest) -> Result<Vec<Route>> {
async fn route(&self, req: RouteRequest) -> Result<Vec<Route>> {
let req_ctx = req.context.unwrap();
let route_tables_req = RouteTablesRequest {
schema_name: schema.to_string(),
schema_name: req_ctx.database,
table_names: req.tables,
};
let route_resp = self
Expand Down
2 changes: 1 addition & 1 deletion router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,5 @@ pub type RouterRef = Arc<dyn Router + Sync + Send>;

#[async_trait]
pub trait Router {
async fn route(&self, schema: &str, req: RouteRequest) -> Result<Vec<Route>>;
async fn route(&self, req: RouteRequest) -> Result<Vec<Route>>;
}
4 changes: 3 additions & 1 deletion router/src/rule_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ impl RuleBasedRouter {

#[async_trait]
impl Router for RuleBasedRouter {
async fn route(&self, schema: &str, req: RouteRequest) -> Result<Vec<Route>> {
async fn route(&self, req: RouteRequest) -> Result<Vec<Route>> {
let req_ctx = req.context.unwrap();
let schema = &req_ctx.database;
if let Some(shard_nodes) = self.cluster_view.schema_shards.get(schema) {
ensure!(!shard_nodes.is_empty(), RouteNotFound { schema });

Expand Down
25 changes: 12 additions & 13 deletions server/src/grpc/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use std::{
};

use async_trait::async_trait;
use ceresdbproto::storage::{storage_service_client::StorageServiceClient, RouteRequest};
use ceresdbproto::storage::{
storage_service_client::StorageServiceClient, RequestContext, RouteRequest,
};
use log::{debug, error, warn};
use router::{endpoint::Endpoint, RouterRef};
use serde_derive::Deserialize;
Expand All @@ -19,8 +21,6 @@ use tonic::{
transport::{self, Channel},
};

use crate::consts::SCHEMA_HEADER;

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(
Expand Down Expand Up @@ -286,10 +286,11 @@ impl<B: ClientBuilder> Forwarder<B> {
} = forward_req;

let route_req = RouteRequest {
context: Some(RequestContext { database: schema }),
tables: vec![table],
};

let endpoint = match self.router.route(&schema, route_req).await {
let endpoint = match self.router.route(route_req).await {
Ok(mut routes) => {
if routes.len() != 1 || routes[0].endpoint.is_none() {
warn!(
Expand All @@ -315,11 +316,6 @@ impl<B: ClientBuilder> Forwarder<B> {
{
// TODO: we should use the timeout from the original request.
req.set_timeout(self.config.forward_timeout);
let metadata = req.metadata_mut();
metadata.insert(
SCHEMA_HEADER,
schema.parse().context(InvalidSchema { schema })?,
);
}

// TODO: add metrics to record the forwarding.
Expand Down Expand Up @@ -364,6 +360,7 @@ impl<B: ClientBuilder> Forwarder<B> {

#[cfg(test)]
mod tests {
use catalog::consts::DEFAULT_SCHEMA;
use ceresdbproto::storage::{Route, SqlQueryRequest, SqlQueryResponse};
use futures::FutureExt;
use router::Router;
Expand Down Expand Up @@ -397,7 +394,7 @@ mod tests {

#[async_trait]
impl Router for MockRouter {
async fn route(&self, _schema: &str, req: RouteRequest) -> router::Result<Vec<Route>> {
async fn route(&self, req: RouteRequest) -> router::Result<Vec<Route>> {
let endpoint = self.routing_tables.get(&req.tables[0]);
match endpoint {
None => Ok(vec![]),
Expand Down Expand Up @@ -463,20 +460,22 @@ mod tests {

let make_forward_req = |table: &str| {
let query_request = SqlQueryRequest {
context: Some(RequestContext {
database: DEFAULT_SCHEMA.to_string(),
}),
tables: vec![table.to_string()],
sql: "".to_string(),
};
ForwardRequest {
schema: "public".to_string(),
schema: DEFAULT_SCHEMA.to_string(),
table: table.to_string(),
req: query_request.into_request(),
}
};

let do_rpc = |_client, req: tonic::Request<SqlQueryRequest>, endpoint: &Endpoint| {
let schema = req.metadata().get(SCHEMA_HEADER).unwrap().to_str().unwrap();
assert_eq!(schema, "public");
let req = req.into_inner();
assert_eq!(req.context.unwrap().database, DEFAULT_SCHEMA);
let expect_endpoint = mock_router.routing_tables.get(&req.tables[0]).unwrap();
assert_eq!(expect_endpoint, endpoint);

Expand Down
Loading

0 comments on commit 45ebebf

Please sign in to comment.