diff --git a/Cargo.lock b/Cargo.lock index dd82669f85..a5a8f63057 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1083,11 +1083,11 @@ dependencies = [ [[package]] name = "ceresdb-client-rs" version = "0.1.0" -source = "git+https://github.com/CeresDB/ceresdb-client-rs.git?rev=a72e673103463c7962e01a097592fc7edbcc0b79#a72e673103463c7962e01a097592fc7edbcc0b79" +source = "git+https://github.com/CeresDB/ceresdb-client-rs.git?rev=5fbd1a1526c3ddd25bb1f38f63f869c892052f7c#5fbd1a1526c3ddd25bb1f38f63f869c892052f7c" dependencies = [ "arrow 23.0.0", "async-trait", - "ceresdbproto", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=43939799b2e65e3fc5795118fc77593f7c4b19d7)", "dashmap 5.4.0", "futures 0.3.25", "paste 1.0.8", @@ -1111,7 +1111,18 @@ dependencies = [ [[package]] name = "ceresdbproto" version = "0.1.0" -source = "git+https://github.com/CeresDB/ceresdbproto.git?rev=55495dd395d12a1f97f6c98e355290671d107c44#55495dd395d12a1f97f6c98e355290671d107c44" +source = "git+https://github.com/CeresDB/ceresdbproto.git?rev=43939799b2e65e3fc5795118fc77593f7c4b19d7#43939799b2e65e3fc5795118fc77593f7c4b19d7" +dependencies = [ + "prost", + "protoc-bin-vendored", + "tonic", + "tonic-build", +] + +[[package]] +name = "ceresdbproto" +version = "0.1.0" +source = "git+https://github.com/CeresDB/ceresdbproto.git?rev=dd6921dbb59bbe9a2fcc27960e76a8c870b44415#dd6921dbb59bbe9a2fcc27960e76a8c870b44415" dependencies = [ "prost", "protoc-bin-vendored", @@ -1250,7 +1261,7 @@ name = "cluster" version = "1.0.0-alpha02" dependencies = [ "async-trait", - "ceresdbproto", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=dd6921dbb59bbe9a2fcc27960e76a8c870b44415)", "common_types", "common_util", "log", @@ -3548,7 +3559,7 @@ name = "meta_client" version = "1.0.0-alpha02" dependencies = [ "async-trait", - "ceresdbproto", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=dd6921dbb59bbe9a2fcc27960e76a8c870b44415)", "common_types", "common_util", "futures 0.3.25", @@ -5218,7 +5229,7 @@ version = "1.0.0-alpha02" dependencies = [ "arrow_ext", "async-trait", - "ceresdbproto", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=dd6921dbb59bbe9a2fcc27960e76a8c870b44415)", "clru", "common_types", "common_util", @@ -5343,7 +5354,7 @@ name = "router" version = "1.0.0-alpha02" dependencies = [ "async-trait", - "ceresdbproto", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=dd6921dbb59bbe9a2fcc27960e76a8c870b44415)", "cluster", "common_types", "common_util", @@ -5683,7 +5694,7 @@ dependencies = [ "async-trait", "bytes 1.2.1", "catalog", - "ceresdbproto", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=dd6921dbb59bbe9a2fcc27960e76a8c870b44415)", "cluster", "common_types", "common_util", @@ -6013,7 +6024,7 @@ dependencies = [ "arrow 31.0.0", "async-trait", "catalog", - "ceresdbproto", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=dd6921dbb59bbe9a2fcc27960e76a8c870b44415)", "common_types", "common_util", "datafusion", @@ -6292,7 +6303,7 @@ version = "1.0.0-alpha02" dependencies = [ "arrow 31.0.0", "async-trait", - "ceresdbproto", + "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=dd6921dbb59bbe9a2fcc27960e76a8c870b44415)", "common_types", "common_util", "datafusion", diff --git a/Cargo.toml b/Cargo.toml index 3f9cc97f49..0d0f85d63b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,7 +120,7 @@ zstd = { version = "0.12", default-features = false } [workspace.dependencies.ceresdbproto] git = "https://github.com/CeresDB/ceresdbproto.git" -rev = "55495dd395d12a1f97f6c98e355290671d107c44" +rev = "dd6921dbb59bbe9a2fcc27960e76a8c870b44415" [dependencies] analytic_engine = { workspace = true } diff --git a/docs/minimal.toml b/docs/minimal.toml index 5532a48889..14e03dc3d0 100644 --- a/docs/minimal.toml +++ b/docs/minimal.toml @@ -4,9 +4,6 @@ http_port = 5440 grpc_port = 8831 log_level = "info" -[analytic] -wal_path = "/tmp/ceresdb" - [analytic.storage] mem_cache_capacity = '1G' mem_cache_partition_bits = 0 @@ -15,6 +12,10 @@ mem_cache_partition_bits = 0 type = "Local" data_path = "/tmp/ceresdb" +[analytic.wal_storage] +type = "RocksDB" +path = "/tmp/ceresdb" + [[static_route.topology.schema_shards]] schema = 'public' auto_create_tables = true diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml index d98a2b2da8..031bd0bdfe 100644 --- a/integration_tests/Cargo.toml +++ b/integration_tests/Cargo.toml @@ -8,6 +8,6 @@ workspace = true [dependencies] anyhow = "1.0.58" async-trait = "0.1" -ceresdb-client-rs = { git = "https://github.com/CeresDB/ceresdb-client-rs.git", rev = "a72e673103463c7962e01a097592fc7edbcc0b79" } +ceresdb-client-rs = { git = "https://github.com/CeresDB/ceresdb-client-rs.git", rev = "5fbd1a1526c3ddd25bb1f38f63f869c892052f7c" } sqlness = "0.3" tokio = { workspace = true } diff --git a/integration_tests/src/database.rs b/integration_tests/src/database.rs index cf075288de..c2e2eb0e52 100644 --- a/integration_tests/src/database.rs +++ b/integration_tests/src/database.rs @@ -74,7 +74,10 @@ impl CeresDB { } async fn execute(query: String, client: Arc) -> Box { - let query_ctx = RpcContext::new("public".to_string(), "".to_string()); + let query_ctx = RpcContext { + database: Some("public".to_string()), + timeout: None, + }; let query_req = Request { tables: vec![], sql: query, diff --git a/remote_engine_client/src/cached_router.rs b/remote_engine_client/src/cached_router.rs index 8b1002ec67..bfb6cab176 100644 --- a/remote_engine_client/src/cached_router.rs +++ b/remote_engine_client/src/cached_router.rs @@ -4,7 +4,7 @@ use std::collections::HashMap; -use ceresdbproto::storage; +use ceresdbproto::storage::{self, RequestContext}; use log::debug; use router::RouterRef; use snafu::{OptionExt, ResultExt}; @@ -89,15 +89,18 @@ impl CachedRouter { let schema = &table_ident.schema; let table = table_ident.table.clone(); let route_request = storage::RouteRequest { + context: Some(RequestContext { + database: schema.to_string(), + }), tables: vec![table], }; - let route_infos = - self.router - .route(schema, route_request) - .await - .context(RouteWithCause { - table_ident: table_ident.clone(), - })?; + let route_infos = self + .router + .route(route_request) + .await + .context(RouteWithCause { + table_ident: table_ident.clone(), + })?; if route_infos.is_empty() { return RouteNoCause { diff --git a/router/src/cluster_based.rs b/router/src/cluster_based.rs index 0ad02ab2da..e437e5cf67 100644 --- a/router/src/cluster_based.rs +++ b/router/src/cluster_based.rs @@ -34,9 +34,10 @@ fn make_route(table_name: &str, endpoint: &str) -> Result { #[async_trait] impl Router for ClusterBasedRouter { - async fn route(&self, schema: &str, req: RouteRequest) -> Result> { + async fn route(&self, req: RouteRequest) -> Result> { + let req_ctx = req.context.unwrap(); let route_tables_req = RouteTablesRequest { - schema_name: schema.to_string(), + schema_name: req_ctx.database, table_names: req.tables, }; let route_resp = self diff --git a/router/src/lib.rs b/router/src/lib.rs index cf47990e22..8153dbf1df 100644 --- a/router/src/lib.rs +++ b/router/src/lib.rs @@ -61,5 +61,5 @@ pub type RouterRef = Arc; #[async_trait] pub trait Router { - async fn route(&self, schema: &str, req: RouteRequest) -> Result>; + async fn route(&self, req: RouteRequest) -> Result>; } diff --git a/router/src/rule_based.rs b/router/src/rule_based.rs index 61c9428dd0..afa6e31b39 100644 --- a/router/src/rule_based.rs +++ b/router/src/rule_based.rs @@ -138,7 +138,9 @@ impl RuleBasedRouter { #[async_trait] impl Router for RuleBasedRouter { - async fn route(&self, schema: &str, req: RouteRequest) -> Result> { + async fn route(&self, req: RouteRequest) -> Result> { + let req_ctx = req.context.unwrap(); + let schema = &req_ctx.database; if let Some(shard_nodes) = self.cluster_view.schema_shards.get(schema) { ensure!(!shard_nodes.is_empty(), RouteNotFound { schema }); diff --git a/server/src/grpc/forward.rs b/server/src/grpc/forward.rs index e09084521f..dd06936def 100644 --- a/server/src/grpc/forward.rs +++ b/server/src/grpc/forward.rs @@ -9,7 +9,9 @@ use std::{ }; use async_trait::async_trait; -use ceresdbproto::storage::{storage_service_client::StorageServiceClient, RouteRequest}; +use ceresdbproto::storage::{ + storage_service_client::StorageServiceClient, RequestContext, RouteRequest, +}; use log::{debug, error, warn}; use router::{endpoint::Endpoint, RouterRef}; use serde_derive::Deserialize; @@ -19,8 +21,6 @@ use tonic::{ transport::{self, Channel}, }; -use crate::consts::SCHEMA_HEADER; - #[derive(Debug, Snafu)] pub enum Error { #[snafu(display( @@ -286,10 +286,11 @@ impl Forwarder { } = forward_req; let route_req = RouteRequest { + context: Some(RequestContext { database: schema }), tables: vec![table], }; - let endpoint = match self.router.route(&schema, route_req).await { + let endpoint = match self.router.route(route_req).await { Ok(mut routes) => { if routes.len() != 1 || routes[0].endpoint.is_none() { warn!( @@ -315,11 +316,6 @@ impl Forwarder { { // TODO: we should use the timeout from the original request. req.set_timeout(self.config.forward_timeout); - let metadata = req.metadata_mut(); - metadata.insert( - SCHEMA_HEADER, - schema.parse().context(InvalidSchema { schema })?, - ); } // TODO: add metrics to record the forwarding. @@ -364,6 +360,7 @@ impl Forwarder { #[cfg(test)] mod tests { + use catalog::consts::DEFAULT_SCHEMA; use ceresdbproto::storage::{Route, SqlQueryRequest, SqlQueryResponse}; use futures::FutureExt; use router::Router; @@ -397,7 +394,7 @@ mod tests { #[async_trait] impl Router for MockRouter { - async fn route(&self, _schema: &str, req: RouteRequest) -> router::Result> { + async fn route(&self, req: RouteRequest) -> router::Result> { let endpoint = self.routing_tables.get(&req.tables[0]); match endpoint { None => Ok(vec![]), @@ -463,20 +460,22 @@ mod tests { let make_forward_req = |table: &str| { let query_request = SqlQueryRequest { + context: Some(RequestContext { + database: DEFAULT_SCHEMA.to_string(), + }), tables: vec![table.to_string()], sql: "".to_string(), }; ForwardRequest { - schema: "public".to_string(), + schema: DEFAULT_SCHEMA.to_string(), table: table.to_string(), req: query_request.into_request(), } }; let do_rpc = |_client, req: tonic::Request, endpoint: &Endpoint| { - let schema = req.metadata().get(SCHEMA_HEADER).unwrap().to_str().unwrap(); - assert_eq!(schema, "public"); let req = req.into_inner(); + assert_eq!(req.context.unwrap().database, DEFAULT_SCHEMA); let expect_endpoint = mock_router.routing_tables.get(&req.tables[0]).unwrap(); assert_eq!(expect_endpoint, endpoint); diff --git a/server/src/grpc/storage_service/mod.rs b/server/src/grpc/storage_service/mod.rs index 3640e99eaf..591267209d 100644 --- a/server/src/grpc/storage_service/mod.rs +++ b/server/src/grpc/storage_service/mod.rs @@ -10,12 +10,10 @@ use std::{ }; use async_trait::async_trait; -use ceresdbproto::{ - prometheus::{PrometheusQueryRequest, PrometheusQueryResponse}, - storage::{ - storage_service_server::StorageService, value::Value, RouteRequest, RouteResponse, - SqlQueryRequest, SqlQueryResponse, WriteRequest, WriteResponse, WriteTableRequest, - }, +use ceresdbproto::storage::{ + storage_service_server::StorageService, value::Value, PrometheusQueryRequest, + PrometheusQueryResponse, RouteRequest, RouteResponse, SqlQueryRequest, SqlQueryResponse, + WriteRequest, WriteResponse, WriteTableRequest, }; use cluster::config::SchemaConfig; use common_types::{ @@ -40,7 +38,6 @@ use tonic::metadata::{KeyAndValueRef, MetadataMap}; use self::sql_query::{QueryResponseBuilder, QueryResponseWriter}; use crate::{ - consts, grpc::{ forward::ForwarderRef, metrics::GRPC_HANDLER_DURATION_HISTOGRAM_VEC, @@ -59,6 +56,8 @@ pub(crate) mod write; const STREAM_QUERY_CHANNEL_LEN: usize = 20; /// Rpc request header +/// Tenant/token will be saved in header in future +#[allow(dead_code)] #[derive(Debug, Default)] pub struct RequestHeader { metas: HashMap>, @@ -88,6 +87,7 @@ impl From<&MetadataMap> for RequestHeader { } impl RequestHeader { + #[allow(dead_code)] pub fn get(&self, key: &str) -> Option<&[u8]> { self.metas.get(key).map(|v| v.as_slice()) } @@ -99,8 +99,7 @@ pub struct HandlerContext<'a, Q> { router: RouterRef, instance: InstanceRef, catalog: String, - schema: String, - schema_config: Option<&'a SchemaConfig>, + schema_config_provider: &'a SchemaConfigProviderRef, forwarder: Option, timeout: Option, resp_compress_min_length: usize, @@ -116,62 +115,26 @@ impl<'a, Q> HandlerContext<'a, Q> { forwarder: Option, timeout: Option, resp_compress_min_length: usize, - ) -> Result { - let default_catalog = instance.catalog_manager.default_catalog_name(); - let default_schema = instance.catalog_manager.default_schema_name(); - - let catalog = header - .get(consts::CATALOG_HEADER) - .map(|v| String::from_utf8(v.to_vec())) - .transpose() - .box_err() - .context(ErrWithCause { - code: StatusCode::BAD_REQUEST, - msg: "fail to parse catalog name", - })? - .unwrap_or_else(|| default_catalog.to_string()); - - let schema = header - .get(consts::SCHEMA_HEADER) - .map(|v| String::from_utf8(v.to_vec())) - .transpose() - .box_err() - .context(ErrWithCause { - code: StatusCode::BAD_REQUEST, - msg: "fail to parse schema name", - })? - .unwrap_or_else(|| default_schema.to_string()); + ) -> Self { + // catalog is not exposed to protocol layer + let catalog = instance.catalog_manager.default_catalog_name().to_string(); - let schema_config = schema_config_provider - .schema_config(&schema) - .box_err() - .with_context(|| ErrWithCause { - code: StatusCode::INTERNAL_SERVER_ERROR, - msg: format!("fail to fetch schema config, schema_name:{}", schema), - })?; - - Ok(Self { + Self { header, router, instance, catalog, - schema, - schema_config, + schema_config_provider, forwarder, timeout, resp_compress_min_length, - }) + } } #[inline] fn catalog(&self) -> &str { &self.catalog } - - #[inline] - fn schema(&self) -> &str { - &self.schema - } } #[derive(Clone)] @@ -212,14 +175,17 @@ macro_rules! handle_request { let schema_config_provider = self.schema_config_provider.clone(); // we need to pass the result via channel let join_handle = runtime.spawn(async move { + let req = request.into_inner(); + if req.context.is_none() { + ErrNoCause { + code: StatusCode::BAD_REQUEST, + msg: "database is not set", + } + .fail()? + } let handler_ctx = - HandlerContext::new(header, router, instance, &schema_config_provider, forwarder, timeout, resp_compress_min_length) - .box_err() - .context(ErrWithCause { - code: StatusCode::BAD_REQUEST, - msg: "invalid header", - })?; - $mod_name::$handle_fn(&handler_ctx, request.into_inner()) + HandlerContext::new(header, router, instance, &schema_config_provider, forwarder, timeout, resp_compress_min_length); + $mod_name::$handle_fn(&handler_ctx, req) .await .map_err(|e| { error!( @@ -260,6 +226,9 @@ macro_rules! handle_request { } impl StorageServiceImpl { + // `RequestContext` is ensured in `handle_request` macro, so handler + // can just use it with unwrap() + handle_request!(route, handle_route, RouteRequest, RouteResponse); handle_request!(write, handle_write, WriteRequest, WriteResponse); @@ -282,7 +251,6 @@ impl StorageServiceImpl { let header = RequestHeader::from(request.metadata()); let instance = self.instance.clone(); let schema_config_provider = self.schema_config_provider.clone(); - let handler_ctx = HandlerContext::new( header, router, @@ -291,12 +259,7 @@ impl StorageServiceImpl { self.forwarder.clone(), self.timeout, self.resp_compress_min_length, - ) - .box_err() - .context(ErrWithCause { - code: StatusCode::BAD_REQUEST, - msg: "invalid header", - })?; + ); let mut total_success = 0; let mut resp = WriteResponse::default(); @@ -355,26 +318,28 @@ impl StorageServiceImpl { let (tx, rx) = mpsc::channel(STREAM_QUERY_CHANNEL_LEN); let _: JoinHandle> = self.runtimes.read_runtime.spawn(async move { - let handler_ctx = HandlerContext::new(header, router, instance, &schema_config_provider, forwarder, timeout, resp_compress_min_length) - .box_err() - .context(ErrWithCause { - code: StatusCode::BAD_REQUEST, - msg: "invalid header", - })?; - + let handler_ctx = HandlerContext::new( + header, + router, + instance, + &schema_config_provider, + forwarder, + timeout, + resp_compress_min_length, + ); let query_req = request.into_inner(); let output = sql_query::fetch_query_output(&handler_ctx, &query_req) - .await - .map_err(|e| { - error!("Failed to handle request, mod:stream_query, handler:handle_stream_query, err:{}", e); - e - })?; + .await + .map_err(|e| { + error!("Failed to handle request, mod:stream_query, handler:handle_stream_query, err:{}", e); + e + })?; match output { Output::AffectedRows(rows) => { - let resp = QueryResponseBuilder::with_ok_header().build_with_affected_rows(rows); - if tx.send(Ok(resp)).await.is_err() { - error!("Failed to send affected rows resp in stream query"); - } + let resp = QueryResponseBuilder::with_ok_header().build_with_affected_rows(rows); + if tx.send(Ok(resp)).await.is_err() { + error!("Failed to send affected rows resp in stream query"); + } } Output::Records(batches) => { for batch in &batches { diff --git a/server/src/grpc/storage_service/prom_query.rs b/server/src/grpc/storage_service/prom_query.rs index 05789c59b4..96f64b67af 100644 --- a/server/src/grpc/storage_service/prom_query.rs +++ b/server/src/grpc/storage_service/prom_query.rs @@ -8,7 +8,8 @@ use std::{ use ceresdbproto::{ common::ResponseHeader, - prometheus::{Label, PrometheusQueryRequest, PrometheusQueryResponse, Sample, TimeSeries}, + prometheus::{Label, Sample, TimeSeries}, + storage::{PrometheusQueryRequest, PrometheusQueryResponse}, }; use common_types::{ datum::DatumKind, @@ -49,13 +50,14 @@ where let request_id = RequestId::next_id(); let begin_instant = Instant::now(); let deadline = ctx.timeout.map(|t| begin_instant + t); + let req_ctx = req.context.unwrap(); + let schema = req_ctx.database; debug!( - "Grpc handle query begin, catalog:{}, schema:{}, request_id:{}, request:{:?}", + "Grpc handle query begin, catalog:{}, schema:{}, request_id:{}", ctx.catalog(), - ctx.schema(), + &schema, request_id, - req, ); let instance = &ctx.instance; @@ -64,14 +66,14 @@ where let provider = CatalogMetaProvider { manager: instance.catalog_manager.clone(), default_catalog: ctx.catalog(), - default_schema: ctx.schema(), + default_schema: &schema, function_registry: &*instance.function_registry, }; let frontend = Frontend::new(provider); let mut sql_ctx = SqlContext::new(request_id, deadline); let expr = frontend - .parse_promql(&mut sql_ctx, req) + .parse_promql(&mut sql_ctx, req.expr) .box_err() .context(ErrWithCause { code: StatusCode::BAD_REQUEST, @@ -105,7 +107,7 @@ where // Execute in interpreter let interpreter_ctx = InterpreterContext::builder(request_id, deadline) // Use current ctx's catalog and schema as default catalog and schema - .default_catalog_and_schema(ctx.catalog().to_string(), ctx.schema().to_string()) + .default_catalog_and_schema(ctx.catalog().to_string(), schema.to_string()) .build(); let interpreter_factory = Factory::new( instance.query_executor.clone(), diff --git a/server/src/grpc/storage_service/route.rs b/server/src/grpc/storage_service/route.rs index a5b57869dd..be34143295 100644 --- a/server/src/grpc/storage_service/route.rs +++ b/server/src/grpc/storage_service/route.rs @@ -13,7 +13,7 @@ pub async fn handle_route( ctx: &HandlerContext<'_, Q>, req: RouteRequest, ) -> Result { - let routes = ctx.router.route(ctx.schema(), req).await?; + let routes = ctx.router.route(req).await?; let resp = RouteResponse { header: Some(error::build_ok_header()), diff --git a/server/src/grpc/storage_service/sql_query.rs b/server/src/grpc/storage_service/sql_query.rs index c3f60214df..143d5346d6 100644 --- a/server/src/grpc/storage_service/sql_query.rs +++ b/server/src/grpc/storage_service/sql_query.rs @@ -92,8 +92,9 @@ async fn maybe_forward_query( return None; } + let req_ctx = req.context.as_ref().unwrap(); let forward_req = ForwardRequest { - schema: ctx.schema.clone(), + schema: req_ctx.database.clone(), table: req.tables[0].clone(), req: req.clone().into_request(), }; @@ -155,20 +156,21 @@ pub async fn fetch_query_output( let deadline = ctx.timeout.map(|t| begin_instant + t); info!( - "Grpc handle query begin, catalog:{}, schema:{}, request_id:{}, request:{:?}", + "Grpc handle query begin, catalog:{}, request_id:{}, request:{:?}", ctx.catalog(), - ctx.schema(), request_id, req, ); + let req_ctx = req.context.as_ref().unwrap(); + let schema = &req_ctx.database; let instance = &ctx.instance; // TODO(yingwen): Privilege check, cannot access data of other tenant // TODO(yingwen): Maybe move MetaProvider to instance let provider = CatalogMetaProvider { manager: instance.catalog_manager.clone(), default_catalog: ctx.catalog(), - default_schema: ctx.schema(), + default_schema: schema, function_registry: &*instance.function_registry, }; let frontend = Frontend::new(provider); @@ -240,7 +242,7 @@ pub async fn fetch_query_output( // Execute in interpreter let interpreter_ctx = InterpreterContext::builder(request_id, deadline) // Use current ctx's catalog and schema as default catalog and schema - .default_catalog_and_schema(ctx.catalog().to_string(), ctx.schema().to_string()) + .default_catalog_and_schema(ctx.catalog().to_string(), schema.to_string()) .build(); let interpreter_factory = Factory::new( instance.query_executor.clone(), @@ -279,7 +281,7 @@ pub async fn fetch_query_output( info!( "Grpc handle query success, catalog:{}, schema:{}, request_id:{}, cost:{}ms, request:{:?}", ctx.catalog(), - ctx.schema(), + schema, request_id, begin_instant.saturating_elapsed().as_millis(), req, diff --git a/server/src/grpc/storage_service/write.rs b/server/src/grpc/storage_service/write.rs index 34c35e0d85..e4850c2574 100644 --- a/server/src/grpc/storage_service/write.rs +++ b/server/src/grpc/storage_service/write.rs @@ -45,7 +45,17 @@ pub(crate) async fn handle_write( let begin_instant = Instant::now(); let deadline = ctx.timeout.map(|t| begin_instant + t); let catalog = ctx.catalog(); - let schema = ctx.schema(); + let req_ctx = req.context.unwrap(); + let schema = req_ctx.database; + let schema_config = ctx + .schema_config_provider + .schema_config(&schema) + .box_err() + .with_context(|| ErrWithCause { + code: StatusCode::INTERNAL_SERVER_ERROR, + msg: format!("fail to fetch schema config, schema_name:{}", schema), + })?; + debug!( "Grpc handle write begin, catalog:{}, schema:{}, request_id:{}, first_table:{:?}, num_tables:{}", catalog, @@ -60,10 +70,10 @@ pub(crate) async fn handle_write( let plan_vec = write_request_to_insert_plan( request_id, catalog, - schema, + &schema, ctx.instance.clone(), - req, - ctx.schema_config, + req.table_requests, + schema_config, deadline, ) .await?; @@ -73,7 +83,7 @@ pub(crate) async fn handle_write( success += execute_plan( request_id, catalog, - schema, + &schema, ctx.instance.clone(), insert_plan, deadline, @@ -160,13 +170,13 @@ pub async fn write_request_to_insert_plan( catalog: &str, schema: &str, instance: InstanceRef, - write_request: WriteRequest, + table_requests: Vec, schema_config: Option<&SchemaConfig>, deadline: Option, ) -> Result> { - let mut plan_vec = Vec::with_capacity(write_request.table_requests.len()); + let mut plan_vec = Vec::with_capacity(table_requests.len()); - for write_table_req in write_request.table_requests { + for write_table_req in table_requests { let table_name = &write_table_req.table; let mut table = try_get_table(catalog, schema, instance.clone(), table_name)?; diff --git a/server/src/handlers/prom.rs b/server/src/handlers/prom.rs index 5ce7fddfee..b6e859abff 100644 --- a/server/src/handlers/prom.rs +++ b/server/src/handlers/prom.rs @@ -8,8 +8,7 @@ use std::{collections::HashMap, time::Instant}; use async_trait::async_trait; use ceresdbproto::storage::{ - value, Field, FieldGroup, Tag, Value, WriteRequest as WriteRequestPb, WriteSeriesEntry, - WriteTableRequest, + value, Field, FieldGroup, Tag, Value, WriteSeriesEntry, WriteTableRequest, }; use common_types::{ datum::DatumKind, @@ -162,7 +161,7 @@ impl CeresDBStorage { Ok((metric, filters)) } - fn convert_write_request(req: WriteRequest) -> Result { + fn convert_write_request(req: WriteRequest) -> Result> { let mut req_by_metric = HashMap::new(); for timeseries in req.timeseries { let (metric, labels) = Self::normalize_labels(timeseries.labels)?; @@ -207,9 +206,7 @@ impl CeresDBStorage { }); } - Ok(WriteRequestPb { - table_requests: req_by_metric.into_values().collect(), - }) + Ok(req_by_metric.into_values().collect()) } fn convert_query_result(metric: String, resp: Output) -> Result { diff --git a/sql/src/frontend.rs b/sql/src/frontend.rs index 1c0a4d9a00..e46fc92da2 100644 --- a/sql/src/frontend.rs +++ b/sql/src/frontend.rs @@ -4,7 +4,7 @@ use std::{sync::Arc, time::Instant}; -use ceresdbproto::prometheus::PrometheusQueryRequest; +use ceresdbproto::prometheus::Expr as PromExpr; use common_types::request_id::RequestId; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::table; @@ -85,8 +85,8 @@ impl

Frontend

{ } /// Parse the request and returns the Expr - pub fn parse_promql(&self, _ctx: &mut Context, req: PrometheusQueryRequest) -> Result { - let expr = req.expr.context(ExprNotFoundInPromRequest)?; + pub fn parse_promql(&self, _ctx: &mut Context, expr: Option) -> Result { + let expr = expr.context(ExprNotFoundInPromRequest)?; Expr::try_from(expr).context(InvalidPromRequest) } }