diff --git a/Cargo.lock b/Cargo.lock index 4113c6f8c7d4..77e4c87ca4af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6323,6 +6323,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "datatypes", + "itertools 0.10.5", "lazy_static", "meta-client", "moka", diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index c9fd60e9ee4e..11cb3df96b71 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -164,6 +164,15 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to find table partitions: #{table}"))] + FindPartitions { + source: partition::error::Error, + table: String, + }, + + #[snafu(display("Failed to find region routes"))] + FindRegionRoutes { source: partition::error::Error }, + #[snafu(display("Failed to read system catalog table records"))] ReadSystemCatalog { location: Location, @@ -254,11 +263,14 @@ impl ErrorExt for Error { match self { Error::InvalidKey { .. } | Error::SchemaNotFound { .. } - | Error::TableNotFound { .. } | Error::CatalogNotFound { .. } + | Error::FindPartitions { .. } + | Error::FindRegionRoutes { .. } | Error::InvalidEntryType { .. } | Error::ParallelOpenTable { .. } => StatusCode::Unexpected, + Error::TableNotFound { .. } => StatusCode::TableNotFound, + Error::SystemCatalog { .. } | Error::EmptyValue { .. } | Error::ValueDeserialize { .. } => StatusCode::StorageUnavailable, diff --git a/src/catalog/src/information_schema.rs b/src/catalog/src/information_schema.rs index 7abc4797816a..d2d83e4d9d8a 100644 --- a/src/catalog/src/information_schema.rs +++ b/src/catalog/src/information_schema.rs @@ -15,7 +15,9 @@ mod columns; mod key_column_usage; mod memory_table; +mod partitions; mod predicate; +mod region_peers; mod runtime_metrics; mod schemata; mod table_names; @@ -47,6 +49,8 @@ use self::columns::InformationSchemaColumns; use crate::error::Result; use crate::information_schema::key_column_usage::InformationSchemaKeyColumnUsage; use crate::information_schema::memory_table::{get_schema_columns, MemoryTable}; +use crate::information_schema::partitions::InformationSchemaPartitions; +use crate::information_schema::region_peers::InformationSchemaRegionPeers; use crate::information_schema::runtime_metrics::InformationSchemaMetrics; use crate::information_schema::schemata::InformationSchemaSchemata; use crate::information_schema::tables::InformationSchemaTables; @@ -74,6 +78,7 @@ lazy_static! 
{ TRIGGERS, GLOBAL_STATUS, SESSION_STATUS, + PARTITIONS, ]; } @@ -156,6 +161,10 @@ impl InformationSchemaProvider { BUILD_INFO.to_string(), self.build_table(BUILD_INFO).unwrap(), ); + tables.insert( + REGION_PEERS.to_string(), + self.build_table(REGION_PEERS).unwrap(), + ); } tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap()); @@ -226,6 +235,14 @@ impl InformationSchemaProvider { self.catalog_manager.clone(), )) as _), RUNTIME_METRICS => Some(Arc::new(InformationSchemaMetrics::new())), + PARTITIONS => Some(Arc::new(InformationSchemaPartitions::new( + self.catalog_name.clone(), + self.catalog_manager.clone(), + )) as _), + REGION_PEERS => Some(Arc::new(InformationSchemaRegionPeers::new( + self.catalog_name.clone(), + self.catalog_manager.clone(), + )) as _), _ => None, } } diff --git a/src/catalog/src/information_schema/columns.rs b/src/catalog/src/information_schema/columns.rs index dbae86538346..f96cba8f901e 100644 --- a/src/catalog/src/information_schema/columns.rs +++ b/src/catalog/src/information_schema/columns.rs @@ -58,6 +58,7 @@ const COLUMN_DEFAULT: &str = "column_default"; const IS_NULLABLE: &str = "is_nullable"; const COLUMN_TYPE: &str = "column_type"; const COLUMN_COMMENT: &str = "column_comment"; +const INIT_CAPACITY: usize = 42; impl InformationSchemaColumns { pub(super) fn new(catalog_name: String, catalog_manager: Weak) -> Self { @@ -154,16 +155,16 @@ impl InformationSchemaColumnsBuilder { schema, catalog_name, catalog_manager, - catalog_names: StringVectorBuilder::with_capacity(42), - schema_names: StringVectorBuilder::with_capacity(42), - table_names: StringVectorBuilder::with_capacity(42), - column_names: StringVectorBuilder::with_capacity(42), - data_types: StringVectorBuilder::with_capacity(42), - semantic_types: StringVectorBuilder::with_capacity(42), - column_defaults: StringVectorBuilder::with_capacity(42), - is_nullables: StringVectorBuilder::with_capacity(42), - column_types: StringVectorBuilder::with_capacity(42), - column_comments: StringVectorBuilder::with_capacity(42), + catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY), + schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY), + table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY), + column_names: StringVectorBuilder::with_capacity(INIT_CAPACITY), + data_types: StringVectorBuilder::with_capacity(INIT_CAPACITY), + semantic_types: StringVectorBuilder::with_capacity(INIT_CAPACITY), + column_defaults: StringVectorBuilder::with_capacity(INIT_CAPACITY), + is_nullables: StringVectorBuilder::with_capacity(INIT_CAPACITY), + column_types: StringVectorBuilder::with_capacity(INIT_CAPACITY), + column_comments: StringVectorBuilder::with_capacity(INIT_CAPACITY), } } @@ -177,13 +178,6 @@ impl InformationSchemaColumnsBuilder { let predicates = Predicates::from_scan_request(&request); for schema_name in catalog_manager.schema_names(&catalog_name).await? { - if !catalog_manager - .schema_exists(&catalog_name, &schema_name) - .await? - { - continue; - } - let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await; while let Some(table) = stream.try_next().await? 
{ diff --git a/src/catalog/src/information_schema/key_column_usage.rs b/src/catalog/src/information_schema/key_column_usage.rs index 28fba3c63ced..f12167ddc4c8 100644 --- a/src/catalog/src/information_schema/key_column_usage.rs +++ b/src/catalog/src/information_schema/key_column_usage.rs @@ -23,10 +23,10 @@ use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter; use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream; use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream; -use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef}; +use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder, VectorRef}; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::value::Value; -use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder}; +use datatypes::vectors::{ConstantVector, StringVector, StringVectorBuilder, UInt32VectorBuilder}; use snafu::{OptionExt, ResultExt}; use store_api::storage::{ScanRequest, TableId}; @@ -44,6 +44,7 @@ const TABLE_SCHEMA: &str = "table_schema"; const TABLE_NAME: &str = "table_name"; const COLUMN_NAME: &str = "column_name"; const ORDINAL_POSITION: &str = "ordinal_position"; +const INIT_CAPACITY: usize = 42; /// The virtual table implementation for `information_schema.KEY_COLUMN_USAGE`. pub(super) struct InformationSchemaKeyColumnUsage { @@ -162,9 +163,6 @@ struct InformationSchemaKeyColumnUsageBuilder { column_name: StringVectorBuilder, ordinal_position: UInt32VectorBuilder, position_in_unique_constraint: UInt32VectorBuilder, - referenced_table_schema: StringVectorBuilder, - referenced_table_name: StringVectorBuilder, - referenced_column_name: StringVectorBuilder, } impl InformationSchemaKeyColumnUsageBuilder { @@ -177,18 +175,15 @@ impl InformationSchemaKeyColumnUsageBuilder { schema, catalog_name, catalog_manager, - constraint_catalog: StringVectorBuilder::with_capacity(42), - constraint_schema: StringVectorBuilder::with_capacity(42), - constraint_name: StringVectorBuilder::with_capacity(42), - table_catalog: StringVectorBuilder::with_capacity(42), - table_schema: StringVectorBuilder::with_capacity(42), - table_name: StringVectorBuilder::with_capacity(42), - column_name: StringVectorBuilder::with_capacity(42), - ordinal_position: UInt32VectorBuilder::with_capacity(42), - position_in_unique_constraint: UInt32VectorBuilder::with_capacity(42), - referenced_table_schema: StringVectorBuilder::with_capacity(42), - referenced_table_name: StringVectorBuilder::with_capacity(42), - referenced_column_name: StringVectorBuilder::with_capacity(42), + constraint_catalog: StringVectorBuilder::with_capacity(INIT_CAPACITY), + constraint_schema: StringVectorBuilder::with_capacity(INIT_CAPACITY), + constraint_name: StringVectorBuilder::with_capacity(INIT_CAPACITY), + table_catalog: StringVectorBuilder::with_capacity(INIT_CAPACITY), + table_schema: StringVectorBuilder::with_capacity(INIT_CAPACITY), + table_name: StringVectorBuilder::with_capacity(INIT_CAPACITY), + column_name: StringVectorBuilder::with_capacity(INIT_CAPACITY), + ordinal_position: UInt32VectorBuilder::with_capacity(INIT_CAPACITY), + position_in_unique_constraint: UInt32VectorBuilder::with_capacity(INIT_CAPACITY), } } @@ -301,12 +296,15 @@ impl InformationSchemaKeyColumnUsageBuilder { self.column_name.push(Some(column_name)); self.ordinal_position.push(Some(ordinal_position)); 
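+        // The `referenced_*` columns are always NULL; `finish()` now emits them as constant null vectors instead of pushing a value per row.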
self.position_in_unique_constraint.push(None); - self.referenced_table_schema.push(None); - self.referenced_table_name.push(None); - self.referenced_column_name.push(None); } fn finish(&mut self) -> Result { + let rows_num = self.table_catalog.len(); + + let null_string_vector = Arc::new(ConstantVector::new( + Arc::new(StringVector::from(vec![None as Option<&str>])), + rows_num, + )); let columns: Vec = vec![ Arc::new(self.constraint_catalog.finish()), Arc::new(self.constraint_schema.finish()), @@ -317,9 +315,9 @@ impl InformationSchemaKeyColumnUsageBuilder { Arc::new(self.column_name.finish()), Arc::new(self.ordinal_position.finish()), Arc::new(self.position_in_unique_constraint.finish()), - Arc::new(self.referenced_table_schema.finish()), - Arc::new(self.referenced_table_name.finish()), - Arc::new(self.referenced_column_name.finish()), + null_string_vector.clone(), + null_string_vector.clone(), + null_string_vector, ]; RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu) } diff --git a/src/catalog/src/information_schema/partitions.rs b/src/catalog/src/information_schema/partitions.rs new file mode 100644 index 000000000000..ecf23f8cc9ce --- /dev/null +++ b/src/catalog/src/information_schema/partitions.rs @@ -0,0 +1,399 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+use std::sync::{Arc, Weak};
+
+use arrow_schema::SchemaRef as ArrowSchemaRef;
+use common_catalog::consts::INFORMATION_SCHEMA_PARTITIONS_TABLE_ID;
+use common_error::ext::BoxedError;
+use common_query::physical_plan::TaskContext;
+use common_recordbatch::adapter::RecordBatchStreamAdapter;
+use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
+use common_time::datetime::DateTime;
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
+use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
+use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
+use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
+use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
+use datatypes::value::Value;
+use datatypes::vectors::{
+    ConstantVector, DateTimeVector, DateTimeVectorBuilder, Int64Vector, Int64VectorBuilder,
+    MutableVector, StringVector, StringVectorBuilder, UInt64VectorBuilder,
+};
+use futures::TryStreamExt;
+use partition::manager::PartitionInfo;
+use partition::partition::PartitionDef;
+use snafu::{OptionExt, ResultExt};
+use store_api::storage::{RegionId, ScanRequest, TableId};
+use table::metadata::{TableInfo, TableType};
+
+use super::PARTITIONS;
+use crate::error::{
+    CreateRecordBatchSnafu, FindPartitionsSnafu, InternalSnafu, Result,
+    UpgradeWeakCatalogManagerRefSnafu,
+};
+use crate::information_schema::{InformationTable, Predicates};
+use crate::kvbackend::KvBackendCatalogManager;
+use crate::CatalogManager;
+
+const TABLE_CATALOG: &str = "table_catalog";
+const TABLE_SCHEMA: &str = "table_schema";
+const TABLE_NAME: &str = "table_name";
+const PARTITION_NAME: &str = "partition_name";
+const PARTITION_EXPRESSION: &str = "partition_expression";
+/// The region id
+const GREPTIME_PARTITION_ID: &str = "greptime_partition_id";
+const INIT_CAPACITY: usize = 42;
+
+/// The `PARTITIONS` table provides information about partitioned tables.
+/// See https://dev.mysql.com/doc/refman/8.0/en/information-schema-partitions-table.html
+/// We provide an extra column `greptime_partition_id` for the GreptimeDB region id.
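+///
+/// A usage sketch (it mirrors the sqlness test added in this change; `my_table` is just an example table name):
+///
+/// ```sql
+/// SELECT table_name, partition_name, partition_expression, greptime_partition_id
+/// FROM information_schema.partitions
+/// WHERE table_name = 'my_table';
+/// ```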
+pub(super) struct InformationSchemaPartitions { + schema: SchemaRef, + catalog_name: String, + catalog_manager: Weak, +} + +impl InformationSchemaPartitions { + pub(super) fn new(catalog_name: String, catalog_manager: Weak) -> Self { + Self { + schema: Self::schema(), + catalog_name, + catalog_manager, + } + } + + pub(crate) fn schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + ColumnSchema::new(TABLE_CATALOG, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false), + ColumnSchema::new(PARTITION_NAME, ConcreteDataType::string_datatype(), false), + ColumnSchema::new( + "subpartition_name", + ConcreteDataType::string_datatype(), + true, + ), + ColumnSchema::new( + "partition_ordinal_position", + ConcreteDataType::int64_datatype(), + true, + ), + ColumnSchema::new( + "subpartition_ordinal_position", + ConcreteDataType::int64_datatype(), + true, + ), + ColumnSchema::new( + "partition_method", + ConcreteDataType::string_datatype(), + true, + ), + ColumnSchema::new( + "subpartition_method", + ConcreteDataType::string_datatype(), + true, + ), + ColumnSchema::new( + PARTITION_EXPRESSION, + ConcreteDataType::string_datatype(), + true, + ), + ColumnSchema::new( + "subpartition_expression", + ConcreteDataType::string_datatype(), + true, + ), + ColumnSchema::new( + "partition_description", + ConcreteDataType::string_datatype(), + true, + ), + ColumnSchema::new("table_rows", ConcreteDataType::int64_datatype(), true), + ColumnSchema::new("avg_row_length", ConcreteDataType::int64_datatype(), true), + ColumnSchema::new("data_length", ConcreteDataType::int64_datatype(), true), + ColumnSchema::new("max_data_length", ConcreteDataType::int64_datatype(), true), + ColumnSchema::new("index_length", ConcreteDataType::int64_datatype(), true), + ColumnSchema::new("data_free", ConcreteDataType::int64_datatype(), true), + ColumnSchema::new("create_time", ConcreteDataType::datetime_datatype(), true), + ColumnSchema::new("update_time", ConcreteDataType::datetime_datatype(), true), + ColumnSchema::new("check_time", ConcreteDataType::datetime_datatype(), true), + ColumnSchema::new("checksum", ConcreteDataType::int64_datatype(), true), + ColumnSchema::new( + "partition_comment", + ConcreteDataType::string_datatype(), + true, + ), + ColumnSchema::new("nodegroup", ConcreteDataType::string_datatype(), true), + ColumnSchema::new("tablespace_name", ConcreteDataType::string_datatype(), true), + ColumnSchema::new( + GREPTIME_PARTITION_ID, + ConcreteDataType::uint64_datatype(), + true, + ), + ])) + } + + fn builder(&self) -> InformationSchemaPartitionsBuilder { + InformationSchemaPartitionsBuilder::new( + self.schema.clone(), + self.catalog_name.clone(), + self.catalog_manager.clone(), + ) + } +} + +impl InformationTable for InformationSchemaPartitions { + fn table_id(&self) -> TableId { + INFORMATION_SCHEMA_PARTITIONS_TABLE_ID + } + + fn table_name(&self) -> &'static str { + PARTITIONS + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn to_stream(&self, request: ScanRequest) -> Result { + let schema = self.schema.arrow_schema().clone(); + let mut builder = self.builder(); + let stream = Box::pin(DfRecordBatchStreamAdapter::new( + schema, + futures::stream::once(async move { + builder + .make_partitions(Some(request)) + .await + .map(|x| x.into_df_record_batch()) + .map_err(Into::into) + }), + )); + Ok(Box::pin( + RecordBatchStreamAdapter::try_new(stream) + 
.map_err(BoxedError::new) + .context(InternalSnafu)?, + )) + } +} + +struct InformationSchemaPartitionsBuilder { + schema: SchemaRef, + catalog_name: String, + catalog_manager: Weak, + + catalog_names: StringVectorBuilder, + schema_names: StringVectorBuilder, + table_names: StringVectorBuilder, + partition_names: StringVectorBuilder, + partition_ordinal_positions: Int64VectorBuilder, + partition_expressions: StringVectorBuilder, + create_times: DateTimeVectorBuilder, + partition_ids: UInt64VectorBuilder, +} + +impl InformationSchemaPartitionsBuilder { + fn new( + schema: SchemaRef, + catalog_name: String, + catalog_manager: Weak, + ) -> Self { + Self { + schema, + catalog_name, + catalog_manager, + catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY), + schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY), + table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY), + partition_names: StringVectorBuilder::with_capacity(INIT_CAPACITY), + partition_ordinal_positions: Int64VectorBuilder::with_capacity(INIT_CAPACITY), + partition_expressions: StringVectorBuilder::with_capacity(INIT_CAPACITY), + create_times: DateTimeVectorBuilder::with_capacity(INIT_CAPACITY), + partition_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY), + } + } + + /// Construct the `information_schema.partitions` virtual table + async fn make_partitions(&mut self, request: Option) -> Result { + let catalog_name = self.catalog_name.clone(); + let catalog_manager = self + .catalog_manager + .upgrade() + .context(UpgradeWeakCatalogManagerRefSnafu)?; + + let partition_manager = catalog_manager + .as_any() + .downcast_ref::() + .map(|catalog_manager| catalog_manager.partition_manager()); + + let predicates = Predicates::from_scan_request(&request); + + for schema_name in catalog_manager.schema_names(&catalog_name).await? { + let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await; + + while let Some(table) = stream.try_next().await? { + let table_info = table.table_info(); + + if table_info.table_type == TableType::Temporary { + continue; + } + + let table_id = table_info.ident.table_id; + let partitions = if let Some(partition_manager) = &partition_manager { + partition_manager + .find_table_partitions(table_id) + .await + .context(FindPartitionsSnafu { + table: &table_info.name, + })? + } else { + // Current node must be a standalone instance, contains only one partition by default. + // TODO(dennis): change it when we support multi-regions for standalone. 
+ vec![PartitionInfo { + id: RegionId::new(table_id, 0), + partition: PartitionDef::new(vec![], vec![]), + }] + }; + + self.add_partitions( + &predicates, + &table_info, + &catalog_name, + &schema_name, + &table_info.name, + &partitions, + ); + } + } + + self.finish() + } + + #[allow(clippy::too_many_arguments)] + fn add_partitions( + &mut self, + predicates: &Predicates, + table_info: &TableInfo, + catalog_name: &str, + schema_name: &str, + table_name: &str, + partitions: &[PartitionInfo], + ) { + let row = [ + (TABLE_CATALOG, &Value::from(catalog_name)), + (TABLE_SCHEMA, &Value::from(schema_name)), + (TABLE_NAME, &Value::from(table_name)), + ]; + + if !predicates.eval(&row) { + return; + } + + for (index, partition) in partitions.iter().enumerate() { + let partition_name = format!("p{index}"); + + self.catalog_names.push(Some(catalog_name)); + self.schema_names.push(Some(schema_name)); + self.table_names.push(Some(table_name)); + self.partition_names.push(Some(&partition_name)); + self.partition_ordinal_positions + .push(Some((index + 1) as i64)); + let expressions = if partition.partition.partition_columns().is_empty() { + None + } else { + Some(partition.partition.to_string()) + }; + + self.partition_expressions.push(expressions.as_deref()); + self.create_times.push(Some(DateTime::from( + table_info.meta.created_on.timestamp_millis(), + ))); + self.partition_ids.push(Some(partition.id.as_u64())); + } + } + + fn finish(&mut self) -> Result { + let rows_num = self.catalog_names.len(); + + let null_string_vector = Arc::new(ConstantVector::new( + Arc::new(StringVector::from(vec![None as Option<&str>])), + rows_num, + )); + let null_i64_vector = Arc::new(ConstantVector::new( + Arc::new(Int64Vector::from(vec![None])), + rows_num, + )); + let null_datetime_vector = Arc::new(ConstantVector::new( + Arc::new(DateTimeVector::from(vec![None])), + rows_num, + )); + let partition_methods = Arc::new(ConstantVector::new( + Arc::new(StringVector::from(vec![Some("RANGE")])), + rows_num, + )); + + let columns: Vec = vec![ + Arc::new(self.catalog_names.finish()), + Arc::new(self.schema_names.finish()), + Arc::new(self.table_names.finish()), + Arc::new(self.partition_names.finish()), + null_string_vector.clone(), + Arc::new(self.partition_ordinal_positions.finish()), + null_i64_vector.clone(), + partition_methods, + null_string_vector.clone(), + Arc::new(self.partition_expressions.finish()), + null_string_vector.clone(), + null_string_vector.clone(), + // TODO(dennis): rows and index statistics info + null_i64_vector.clone(), + null_i64_vector.clone(), + null_i64_vector.clone(), + null_i64_vector.clone(), + null_i64_vector.clone(), + null_i64_vector.clone(), + Arc::new(self.create_times.finish()), + // TODO(dennis): supports update_time + null_datetime_vector.clone(), + null_datetime_vector, + null_i64_vector, + null_string_vector.clone(), + null_string_vector.clone(), + null_string_vector, + Arc::new(self.partition_ids.finish()), + ]; + RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu) + } +} + +impl DfPartitionStream for InformationSchemaPartitions { + fn schema(&self) -> &ArrowSchemaRef { + self.schema.arrow_schema() + } + + fn execute(&self, _: Arc) -> DfSendableRecordBatchStream { + let schema = self.schema.arrow_schema().clone(); + let mut builder = self.builder(); + Box::pin(DfRecordBatchStreamAdapter::new( + schema, + futures::stream::once(async move { + builder + .make_partitions(None) + .await + .map(|x| x.into_df_record_batch()) + .map_err(Into::into) + }), + )) + } 
+} diff --git a/src/catalog/src/information_schema/region_peers.rs b/src/catalog/src/information_schema/region_peers.rs new file mode 100644 index 000000000000..882ad263092c --- /dev/null +++ b/src/catalog/src/information_schema/region_peers.rs @@ -0,0 +1,279 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use core::pin::pin; +use std::sync::{Arc, Weak}; + +use arrow_schema::SchemaRef as ArrowSchemaRef; +use common_catalog::consts::INFORMATION_SCHEMA_REGION_PEERS_TABLE_ID; +use common_error::ext::BoxedError; +use common_meta::rpc::router::RegionRoute; +use common_query::physical_plan::TaskContext; +use common_recordbatch::adapter::RecordBatchStreamAdapter; +use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter; +use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream; +use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream; +use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef}; +use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use datatypes::value::Value; +use datatypes::vectors::{Int64VectorBuilder, StringVectorBuilder, UInt64VectorBuilder}; +use futures::{StreamExt, TryStreamExt}; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::{ScanRequest, TableId}; +use table::metadata::TableType; + +use super::REGION_PEERS; +use crate::error::{ + CreateRecordBatchSnafu, FindRegionRoutesSnafu, InternalSnafu, Result, + UpgradeWeakCatalogManagerRefSnafu, +}; +use crate::information_schema::{InformationTable, Predicates}; +use crate::kvbackend::KvBackendCatalogManager; +use crate::CatalogManager; + +const REGION_ID: &str = "region_id"; +const PEER_ID: &str = "peer_id"; +const PEER_ADDR: &str = "peer_addr"; +const IS_LEADER: &str = "is_leader"; +const STATUS: &str = "status"; +const DOWN_SECONDS: &str = "down_seconds"; +const INIT_CAPACITY: usize = 42; + +/// The `REGION_PEERS` table provides information about the region distribution and routes. Including fields: +/// +/// - `region_id`: the region id +/// - `peer_id`: the region storage datanode peer id +/// - `peer_addr`: the region storage datanode peer address +/// - `is_leader`: whether the peer is the leader +/// - `status`: the region status, `ALIVE` or `DOWNGRADED`. +/// - `down_seconds`: the duration of being offline, in seconds. 
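+///
+/// A usage sketch (it mirrors the sqlness test added in this change; the actual rows depend on the region distribution):
+///
+/// ```sql
+/// SELECT region_id, peer_id, is_leader, status
+/// FROM information_schema.greptime_region_peers
+/// ORDER BY peer_id;
+/// ```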
+/// +pub(super) struct InformationSchemaRegionPeers { + schema: SchemaRef, + catalog_name: String, + catalog_manager: Weak, +} + +impl InformationSchemaRegionPeers { + pub(super) fn new(catalog_name: String, catalog_manager: Weak) -> Self { + Self { + schema: Self::schema(), + catalog_name, + catalog_manager, + } + } + + pub(crate) fn schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + ColumnSchema::new(REGION_ID, ConcreteDataType::uint64_datatype(), false), + ColumnSchema::new(PEER_ID, ConcreteDataType::uint64_datatype(), true), + ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true), + ColumnSchema::new(IS_LEADER, ConcreteDataType::string_datatype(), true), + ColumnSchema::new(STATUS, ConcreteDataType::string_datatype(), true), + ColumnSchema::new(DOWN_SECONDS, ConcreteDataType::int64_datatype(), true), + ])) + } + + fn builder(&self) -> InformationSchemaRegionPeersBuilder { + InformationSchemaRegionPeersBuilder::new( + self.schema.clone(), + self.catalog_name.clone(), + self.catalog_manager.clone(), + ) + } +} + +impl InformationTable for InformationSchemaRegionPeers { + fn table_id(&self) -> TableId { + INFORMATION_SCHEMA_REGION_PEERS_TABLE_ID + } + + fn table_name(&self) -> &'static str { + REGION_PEERS + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn to_stream(&self, request: ScanRequest) -> Result { + let schema = self.schema.arrow_schema().clone(); + let mut builder = self.builder(); + let stream = Box::pin(DfRecordBatchStreamAdapter::new( + schema, + futures::stream::once(async move { + builder + .make_region_peers(Some(request)) + .await + .map(|x| x.into_df_record_batch()) + .map_err(Into::into) + }), + )); + Ok(Box::pin( + RecordBatchStreamAdapter::try_new(stream) + .map_err(BoxedError::new) + .context(InternalSnafu)?, + )) + } +} + +struct InformationSchemaRegionPeersBuilder { + schema: SchemaRef, + catalog_name: String, + catalog_manager: Weak, + + region_ids: UInt64VectorBuilder, + peer_ids: UInt64VectorBuilder, + peer_addrs: StringVectorBuilder, + is_leaders: StringVectorBuilder, + statuses: StringVectorBuilder, + down_seconds: Int64VectorBuilder, +} + +impl InformationSchemaRegionPeersBuilder { + fn new( + schema: SchemaRef, + catalog_name: String, + catalog_manager: Weak, + ) -> Self { + Self { + schema, + catalog_name, + catalog_manager, + region_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY), + peer_ids: UInt64VectorBuilder::with_capacity(INIT_CAPACITY), + peer_addrs: StringVectorBuilder::with_capacity(INIT_CAPACITY), + is_leaders: StringVectorBuilder::with_capacity(INIT_CAPACITY), + statuses: StringVectorBuilder::with_capacity(INIT_CAPACITY), + down_seconds: Int64VectorBuilder::with_capacity(INIT_CAPACITY), + } + } + + /// Construct the `information_schema.region_peers` virtual table + async fn make_region_peers(&mut self, request: Option) -> Result { + let catalog_name = self.catalog_name.clone(); + let catalog_manager = self + .catalog_manager + .upgrade() + .context(UpgradeWeakCatalogManagerRefSnafu)?; + + let partition_manager = catalog_manager + .as_any() + .downcast_ref::() + .map(|catalog_manager| catalog_manager.partition_manager()); + + let predicates = Predicates::from_scan_request(&request); + + for schema_name in catalog_manager.schema_names(&catalog_name).await? 
{ + let table_id_stream = catalog_manager + .tables(&catalog_name, &schema_name) + .await + .try_filter_map(|t| async move { + let table_info = t.table_info(); + if table_info.table_type == TableType::Temporary { + Ok(None) + } else { + Ok(Some(table_info.ident.table_id)) + } + }); + + const BATCH_SIZE: usize = 128; + + // Split table ids into chunks + let mut table_id_chunks = pin!(table_id_stream.ready_chunks(BATCH_SIZE)); + + while let Some(table_ids) = table_id_chunks.next().await { + let table_ids = table_ids.into_iter().collect::>>()?; + + let table_routes = if let Some(partition_manager) = &partition_manager { + partition_manager + .find_region_routes_batch(&table_ids) + .await + .context(FindRegionRoutesSnafu)? + } else { + table_ids.into_iter().map(|id| (id, vec![])).collect() + }; + + for routes in table_routes.values() { + self.add_region_peers(&predicates, routes); + } + } + } + + self.finish() + } + + fn add_region_peers(&mut self, predicates: &Predicates, routes: &[RegionRoute]) { + for route in routes { + let region_id = route.region.id.as_u64(); + let peer_id = route.leader_peer.clone().map(|p| p.id); + let peer_addr = route.leader_peer.clone().map(|p| p.addr); + let status = if let Some(status) = route.leader_status { + Some(status.as_ref().to_string()) + } else { + // Alive by default + Some("ALIVE".to_string()) + }; + + let row = [(REGION_ID, &Value::from(region_id))]; + + if !predicates.eval(&row) { + return; + } + + // TODO(dennis): adds followers. + self.region_ids.push(Some(region_id)); + self.peer_ids.push(peer_id); + self.peer_addrs.push(peer_addr.as_deref()); + self.is_leaders.push(Some("Yes")); + self.statuses.push(status.as_deref()); + self.down_seconds + .push(route.leader_down_millis().map(|m| m / 1000)); + } + } + + fn finish(&mut self) -> Result { + let columns: Vec = vec![ + Arc::new(self.region_ids.finish()), + Arc::new(self.peer_ids.finish()), + Arc::new(self.peer_addrs.finish()), + Arc::new(self.is_leaders.finish()), + Arc::new(self.statuses.finish()), + Arc::new(self.down_seconds.finish()), + ]; + RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu) + } +} + +impl DfPartitionStream for InformationSchemaRegionPeers { + fn schema(&self) -> &ArrowSchemaRef { + self.schema.arrow_schema() + } + + fn execute(&self, _: Arc) -> DfSendableRecordBatchStream { + let schema = self.schema.arrow_schema().clone(); + let mut builder = self.builder(); + Box::pin(DfRecordBatchStreamAdapter::new( + schema, + futures::stream::once(async move { + builder + .make_region_peers(None) + .await + .map(|x| x.into_df_record_batch()) + .map_err(Into::into) + }), + )) + } +} diff --git a/src/catalog/src/information_schema/schemata.rs b/src/catalog/src/information_schema/schemata.rs index eddfb142cc77..9f435878658e 100644 --- a/src/catalog/src/information_schema/schemata.rs +++ b/src/catalog/src/information_schema/schemata.rs @@ -41,6 +41,7 @@ const CATALOG_NAME: &str = "catalog_name"; const SCHEMA_NAME: &str = "schema_name"; const DEFAULT_CHARACTER_SET_NAME: &str = "default_character_set_name"; const DEFAULT_COLLATION_NAME: &str = "default_collation_name"; +const INIT_CAPACITY: usize = 42; /// The `information_schema.schemata` table implementation. 
pub(super) struct InformationSchemaSchemata { @@ -144,11 +145,11 @@ impl InformationSchemaSchemataBuilder { schema, catalog_name, catalog_manager, - catalog_names: StringVectorBuilder::with_capacity(42), - schema_names: StringVectorBuilder::with_capacity(42), - charset_names: StringVectorBuilder::with_capacity(42), - collation_names: StringVectorBuilder::with_capacity(42), - sql_paths: StringVectorBuilder::with_capacity(42), + catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY), + schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY), + charset_names: StringVectorBuilder::with_capacity(INIT_CAPACITY), + collation_names: StringVectorBuilder::with_capacity(INIT_CAPACITY), + sql_paths: StringVectorBuilder::with_capacity(INIT_CAPACITY), } } @@ -162,13 +163,6 @@ impl InformationSchemaSchemataBuilder { let predicates = Predicates::from_scan_request(&request); for schema_name in catalog_manager.schema_names(&catalog_name).await? { - if !catalog_manager - .schema_exists(&catalog_name, &schema_name) - .await? - { - continue; - } - self.add_schema(&predicates, &catalog_name, &schema_name); } diff --git a/src/catalog/src/information_schema/table_names.rs b/src/catalog/src/information_schema/table_names.rs index e47b96d146ec..32faa00e0370 100644 --- a/src/catalog/src/information_schema/table_names.rs +++ b/src/catalog/src/information_schema/table_names.rs @@ -39,3 +39,5 @@ pub const TRIGGERS: &str = "triggers"; pub const GLOBAL_STATUS: &str = "global_status"; pub const SESSION_STATUS: &str = "session_status"; pub const RUNTIME_METRICS: &str = "runtime_metrics"; +pub const PARTITIONS: &str = "partitions"; +pub const REGION_PEERS: &str = "greptime_region_peers"; diff --git a/src/catalog/src/information_schema/tables.rs b/src/catalog/src/information_schema/tables.rs index 454dbf9b8871..f55abce61f1f 100644 --- a/src/catalog/src/information_schema/tables.rs +++ b/src/catalog/src/information_schema/tables.rs @@ -45,6 +45,7 @@ const TABLE_NAME: &str = "table_name"; const TABLE_TYPE: &str = "table_type"; const TABLE_ID: &str = "table_id"; const ENGINE: &str = "engine"; +const INIT_CAPACITY: usize = 42; pub(super) struct InformationSchemaTables { schema: SchemaRef, @@ -141,12 +142,12 @@ impl InformationSchemaTablesBuilder { schema, catalog_name, catalog_manager, - catalog_names: StringVectorBuilder::with_capacity(42), - schema_names: StringVectorBuilder::with_capacity(42), - table_names: StringVectorBuilder::with_capacity(42), - table_types: StringVectorBuilder::with_capacity(42), - table_ids: UInt32VectorBuilder::with_capacity(42), - engines: StringVectorBuilder::with_capacity(42), + catalog_names: StringVectorBuilder::with_capacity(INIT_CAPACITY), + schema_names: StringVectorBuilder::with_capacity(INIT_CAPACITY), + table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY), + table_types: StringVectorBuilder::with_capacity(INIT_CAPACITY), + table_ids: UInt32VectorBuilder::with_capacity(INIT_CAPACITY), + engines: StringVectorBuilder::with_capacity(INIT_CAPACITY), } } @@ -160,13 +161,6 @@ impl InformationSchemaTablesBuilder { let predicates = Predicates::from_scan_request(&request); for schema_name in catalog_manager.schema_names(&catalog_name).await? { - if !catalog_manager - .schema_exists(&catalog_name, &schema_name) - .await? - { - continue; - } - let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await; while let Some(table) = stream.try_next().await? 
{ diff --git a/src/cmd/src/cli/bench.rs b/src/cmd/src/cli/bench.rs index 6cd7d86a1642..60405e812819 100644 --- a/src/cmd/src/cli/bench.rs +++ b/src/cmd/src/cli/bench.rs @@ -156,6 +156,7 @@ fn create_region_routes(regions: Vec) -> Vec { }), follower_peers: vec![], leader_status: None, + leader_down_since: None, }); } diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs index 2fd9aa92a89f..ddf834dbc9cd 100644 --- a/src/common/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -82,6 +82,10 @@ pub const INFORMATION_SCHEMA_GLOBAL_STATUS_TABLE_ID: u32 = 25; pub const INFORMATION_SCHEMA_SESSION_STATUS_TABLE_ID: u32 = 26; /// id for information_schema.RUNTIME_METRICS pub const INFORMATION_SCHEMA_RUNTIME_METRICS_TABLE_ID: u32 = 27; +/// id for information_schema.PARTITIONS +pub const INFORMATION_SCHEMA_PARTITIONS_TABLE_ID: u32 = 28; +/// id for information_schema.REGION_PEERS +pub const INFORMATION_SCHEMA_REGION_PEERS_TABLE_ID: u32 = 29; /// ----- End of information_schema tables ----- pub const MITO_ENGINE: &str = "mito"; diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 702061c0b0a1..cf7ff84e92ee 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -844,6 +844,7 @@ mod tests { use std::sync::Arc; use bytes::Bytes; + use common_time::util::current_time_millis; use futures::TryStreamExt; use table::metadata::{RawTableInfo, TableInfo}; @@ -910,6 +911,7 @@ mod tests { leader_peer: Some(Peer::new(datanode, "a2")), follower_peers: vec![], leader_status: None, + leader_down_since: None, } } @@ -1263,6 +1265,7 @@ mod tests { leader_peer: Some(Peer::new(datanode, "a2")), leader_status: Some(RegionStatus::Downgraded), follower_peers: vec![], + leader_down_since: Some(current_time_millis()), }, RegionRoute { region: Region { @@ -1274,6 +1277,7 @@ mod tests { leader_peer: Some(Peer::new(datanode, "a1")), leader_status: None, follower_peers: vec![], + leader_down_since: None, }, ]; let table_info: RawTableInfo = @@ -1314,10 +1318,18 @@ mod tests { updated_route_value.region_routes().unwrap()[0].leader_status, Some(RegionStatus::Downgraded) ); + + assert!(updated_route_value.region_routes().unwrap()[0] + .leader_down_since + .is_some()); + assert_eq!( updated_route_value.region_routes().unwrap()[1].leader_status, Some(RegionStatus::Downgraded) ); + assert!(updated_route_value.region_routes().unwrap()[1] + .leader_down_since + .is_some()); } async fn assert_datanode_table( diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index 270707d945c2..7886dc799787 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -457,7 +457,7 @@ mod tests { let new_raw_v = format!("{:?}", v); assert_eq!( new_raw_v, - r#"Physical(PhysicalTableRouteValue { region_routes: [RegionRoute { region: Region { id: 1(0, 1), name: "r1", partition: None, attrs: {} }, leader_peer: Some(Peer { id: 2, addr: "a2" }), follower_peers: [], leader_status: None }, RegionRoute { region: Region { id: 1(0, 1), name: "r1", partition: None, attrs: {} }, leader_peer: Some(Peer { id: 2, addr: "a2" }), follower_peers: [], leader_status: None }], version: 0 })"# + r#"Physical(PhysicalTableRouteValue { region_routes: [RegionRoute { region: Region { id: 1(0, 1), name: "r1", partition: None, attrs: {} }, leader_peer: Some(Peer { id: 2, addr: "a2" }), follower_peers: [], leader_status: None, leader_down_since: None }, RegionRoute { region: Region { id: 1(0, 1), name: "r1", partition: None, 
attrs: {} }, leader_peer: Some(Peer { id: 2, addr: "a2" }), follower_peers: [], leader_status: None, leader_down_since: None }], version: 0 })"# ); } } diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs index b5db5014fcec..31be66f64954 100644 --- a/src/common/meta/src/rpc/router.rs +++ b/src/common/meta/src/rpc/router.rs @@ -18,11 +18,13 @@ use api::v1::meta::{ Partition as PbPartition, Peer as PbPeer, Region as PbRegion, Table as PbTable, TableRoute as PbTableRoute, }; +use common_time::util::current_time_millis; use derive_builder::Builder; use serde::ser::SerializeSeq; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use snafu::OptionExt; use store_api::storage::{RegionId, RegionNumber}; +use strum::AsRefStr; use crate::error::{self, Result}; use crate::key::RegionDistribution; @@ -204,6 +206,7 @@ impl TableRoute { leader_peer, follower_peers, leader_status: None, + leader_down_since: None, }); } @@ -258,10 +261,25 @@ pub struct RegionRoute { #[builder(setter(into, strip_option), default)] #[serde(default, skip_serializing_if = "Option::is_none")] pub leader_status: Option, + /// The start time when the leader is in `Downgraded` status. + #[serde(default)] + #[builder(default = "self.default_leader_down_since()")] + pub leader_down_since: Option, +} + +impl RegionRouteBuilder { + fn default_leader_down_since(&self) -> Option { + match self.leader_status { + Some(Some(RegionStatus::Downgraded)) => Some(current_time_millis()), + _ => None, + } + } } /// The Status of the [Region]. -#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq)] +/// TODO(dennis): It's better to add more fine-grained statuses such as `PENDING` etc. +#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, AsRefStr)] +#[strum(serialize_all = "UPPERCASE")] pub enum RegionStatus { /// The following cases in which the [Region] will be downgraded. /// @@ -292,15 +310,34 @@ impl RegionRoute { /// **Notes:** Meta Server will stop renewing the lease for the downgraded [Region]. /// pub fn downgrade_leader(&mut self) { + self.leader_down_since = Some(current_time_millis()); self.leader_status = Some(RegionStatus::Downgraded) } + /// Returns how long since the leader is in `Downgraded` status. + pub fn leader_down_millis(&self) -> Option { + self.leader_down_since + .map(|start| current_time_millis() - start) + } + /// Sets the leader status. /// /// Returns true if updated. pub fn set_leader_status(&mut self, status: Option) -> bool { let updated = self.leader_status != status; + match (status, updated) { + (Some(RegionStatus::Downgraded), true) => { + self.leader_down_since = Some(current_time_millis()); + } + (Some(RegionStatus::Downgraded), false) => { + // Do nothing if leader is still in `Downgraded` status. 
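+                // Keep the original `leader_down_since` so the downgraded duration keeps accumulating.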
+ } + _ => { + self.leader_down_since = None; + } + } + self.leader_status = status; updated } @@ -441,6 +478,7 @@ mod tests { leader_peer: Some(Peer::new(1, "a1")), follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")], leader_status: None, + leader_down_since: None, }; assert!(!region_route.is_leader_downgraded()); @@ -462,6 +500,7 @@ mod tests { leader_peer: Some(Peer::new(1, "a1")), follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")], leader_status: None, + leader_down_since: None, }; let input = r#"{"region":{"id":2,"name":"r2","partition":null,"attrs":{}},"leader_peer":{"id":1,"addr":"a1"},"follower_peers":[{"id":2,"addr":"a2"},{"id":3,"addr":"a3"}]}"#; diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index eb792cf9ecd2..0a8f917bf6f2 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -296,6 +296,7 @@ mod test { leader_peer: Some(peer.clone()), follower_peers: vec![follower_peer.clone()], leader_status: Some(RegionStatus::Downgraded), + leader_down_since: Some(1), }, RegionRoute { region: Region::new_test(another_region_id), diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs index 62d7f92320fe..2c5a5f61d0d6 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs @@ -190,6 +190,7 @@ mod tests { use common_meta::peer::Peer; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::rpc::router::{Region, RegionRoute, RegionStatus}; + use common_time::util::current_time_millis; use store_api::storage::RegionId; use crate::error::Error; @@ -285,6 +286,7 @@ mod tests { leader_peer: Some(Peer::empty(1)), follower_peers: vec![Peer::empty(2), Peer::empty(3)], leader_status: Some(RegionStatus::Downgraded), + leader_down_since: Some(current_time_millis()), }]; env.create_physical_table_metadata(table_info, region_routes) @@ -296,6 +298,7 @@ mod tests { .unwrap(); assert!(!new_region_routes[0].is_leader_downgraded()); + assert!(new_region_routes[0].leader_down_since.is_none()); assert_eq!(new_region_routes[0].follower_peers, vec![Peer::empty(3)]); assert_eq!(new_region_routes[0].leader_peer.as_ref().unwrap().id, 2); } @@ -316,6 +319,7 @@ mod tests { leader_peer: Some(Peer::empty(1)), follower_peers: vec![Peer::empty(5), Peer::empty(3)], leader_status: Some(RegionStatus::Downgraded), + leader_down_since: Some(current_time_millis()), }, RegionRoute { region: Region::new_test(RegionId::new(table_id, 2)), @@ -377,6 +381,7 @@ mod tests { leader_peer: Some(leader_peer), follower_peers: vec![Peer::empty(2), Peer::empty(3)], leader_status: None, + leader_down_since: None, }]; env.create_physical_table_metadata(table_info, region_routes) @@ -400,6 +405,7 @@ mod tests { leader_peer: Some(candidate_peer), follower_peers: vec![Peer::empty(2), Peer::empty(3)], leader_status: None, + leader_down_since: None, }]; env.create_physical_table_metadata(table_info, region_routes) @@ -423,6 +429,7 @@ mod tests { leader_peer: Some(candidate_peer), follower_peers: vec![Peer::empty(2), Peer::empty(3)], leader_status: Some(RegionStatus::Downgraded), + leader_down_since: None, }]; env.create_physical_table_metadata(table_info, region_routes) diff --git a/src/meta-srv/src/test_util.rs 
b/src/meta-srv/src/test_util.rs index 3013ac9ad745..4d021fae97fa 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -50,6 +50,7 @@ pub(crate) fn new_region_route(region_id: u64, peers: &[Peer], leader_node: u64) leader_peer, follower_peers: vec![], leader_status: None, + leader_down_since: None, } } @@ -132,6 +133,7 @@ pub(crate) async fn prepare_table_region_and_info_value( }), follower_peers: vec![], leader_status: None, + leader_down_since: None, }; // Region distribution: diff --git a/src/operator/src/tests/partition_manager.rs b/src/operator/src/tests/partition_manager.rs index dd2a044b51c3..859090a753b8 100644 --- a/src/operator/src/tests/partition_manager.rs +++ b/src/operator/src/tests/partition_manager.rs @@ -133,6 +133,7 @@ pub(crate) async fn create_partition_rule_manager( leader_peer: Some(Peer::new(3, "")), follower_peers: vec![], leader_status: None, + leader_down_since: None, }, RegionRoute { region: Region { @@ -151,6 +152,7 @@ pub(crate) async fn create_partition_rule_manager( leader_peer: Some(Peer::new(2, "")), follower_peers: vec![], leader_status: None, + leader_down_since: None, }, RegionRoute { region: Region { @@ -169,6 +171,7 @@ pub(crate) async fn create_partition_rule_manager( leader_peer: Some(Peer::new(1, "")), follower_peers: vec![], leader_status: None, + leader_down_since: None, }, ]), region_wal_options.clone(), @@ -200,6 +203,7 @@ pub(crate) async fn create_partition_rule_manager( leader_peer: None, follower_peers: vec![], leader_status: None, + leader_down_since: None, }, RegionRoute { region: Region { @@ -221,6 +225,7 @@ pub(crate) async fn create_partition_rule_manager( leader_peer: None, follower_peers: vec![], leader_status: None, + leader_down_since: None, }, RegionRoute { region: Region { @@ -239,6 +244,7 @@ pub(crate) async fn create_partition_rule_manager( leader_peer: None, follower_peers: vec![], leader_status: None, + leader_down_since: None, }, ]), region_wal_options, diff --git a/src/partition/Cargo.toml b/src/partition/Cargo.toml index d6a747db66ba..4fadc02b590c 100644 --- a/src/partition/Cargo.toml +++ b/src/partition/Cargo.toml @@ -17,6 +17,7 @@ datafusion.workspace = true datafusion-common.workspace = true datafusion-expr.workspace = true datatypes.workspace = true +itertools.workspace = true lazy_static.workspace = true meta-client.workspace = true moka = { workspace = true, features = ["future"] } diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index 4e424b595a0e..ab757a3d2edf 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -65,7 +65,7 @@ impl PartitionRuleManager { } } - async fn find_region_routes(&self, table_id: TableId) -> Result> { + pub async fn find_region_routes(&self, table_id: TableId) -> Result> { let (_, route) = self .table_route_manager .get_physical_table_route(table_id) @@ -74,6 +74,29 @@ impl PartitionRuleManager { Ok(route.region_routes) } + pub async fn find_region_routes_batch( + &self, + table_ids: &[TableId], + ) -> Result>> { + let table_routes = self + .table_route_manager + .batch_get(table_ids) + .await + .context(error::TableRouteManagerSnafu)?; + + let mut table_region_routes = HashMap::with_capacity(table_routes.len()); + + for (table_id, table_route) in table_routes { + let region_routes = table_route + .region_routes() + .context(error::TableRouteManagerSnafu)? 
+ .clone(); + table_region_routes.insert(table_id, region_routes); + } + + Ok(table_region_routes) + } + pub async fn find_table_partitions(&self, table_id: TableId) -> Result> { let region_routes = self.find_region_routes(table_id).await?; ensure!( diff --git a/src/partition/src/partition.rs b/src/partition/src/partition.rs index 1d63277a9fd1..f25ca63da2ae 100644 --- a/src/partition/src/partition.rs +++ b/src/partition/src/partition.rs @@ -13,12 +13,13 @@ // limitations under the License. use std::any::Any; -use std::fmt::Debug; +use std::fmt::{Debug, Display, Formatter}; use std::sync::Arc; use common_meta::rpc::router::Partition as MetaPartition; use datafusion_expr::Operator; use datatypes::prelude::Value; +use itertools::Itertools; use serde::{Deserialize, Serialize}; use snafu::ResultExt; use store_api::storage::RegionNumber; @@ -56,6 +57,29 @@ pub struct PartitionDef { partition_bounds: Vec, } +impl Display for PartitionBound { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Self::Value(v) => write!(f, "{}", v), + Self::MaxValue => write!(f, "MAXVALUE"), + } + } +} + +impl Display for PartitionDef { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "({}) VALUES LESS THAN ({})", + self.partition_columns.iter().join(", "), + self.partition_bounds + .iter() + .map(|b| format!("{b}")) + .join(", ") + ) + } +} + impl PartitionDef { pub fn new(partition_columns: Vec, partition_bounds: Vec) -> Self { Self { @@ -162,6 +186,8 @@ mod tests { PartitionBound::Value(1_i32.into()), ], }; + assert_eq!("(a, b) VALUES LESS THAN (MAXVALUE, 1)", def.to_string()); + let partition: MetaPartition = def.try_into().unwrap(); assert_eq!( r#"{"column_list":["a","b"],"value_list":["\"MaxValue\"","{\"Value\":{\"Int32\":1}}"]}"#, diff --git a/tests/cases/distributed/create/partition.result b/tests/cases/distributed/create/partition.result new file mode 100644 index 000000000000..9164096e3ed8 --- /dev/null +++ b/tests/cases/distributed/create/partition.result @@ -0,0 +1,40 @@ +CREATE TABLE my_table ( + a INT PRIMARY KEY, + b STRING, + ts TIMESTAMP TIME INDEX, +) +PARTITION BY RANGE COLUMNS (a) ( + PARTITION p0 VALUES LESS THAN (10), + PARTITION p1 VALUES LESS THAN (20), + PARTITION p2 VALUES LESS THAN (MAXVALUE), +); + +Affected Rows: 0 + +-- SQLNESS REPLACE (\d{13}) ID +SELECT table_catalog, table_schema, table_name, partition_name, partition_expression, greptime_partition_id from information_schema.partitions WHERE table_name = 'my_table' ORDER BY table_catalog, table_schema, table_name, partition_name; + ++---------------+--------------+------------+----------------+---------------------------------+-----------------------+ +| table_catalog | table_schema | table_name | partition_name | partition_expression | greptime_partition_id | ++---------------+--------------+------------+----------------+---------------------------------+-----------------------+ +| greptime | public | my_table | p0 | (a) VALUES LESS THAN (10) | ID | +| greptime | public | my_table | p1 | (a) VALUES LESS THAN (20) | ID | +| greptime | public | my_table | p2 | (a) VALUES LESS THAN (MAXVALUE) | ID | ++---------------+--------------+------------+----------------+---------------------------------+-----------------------+ + +-- SQLNESS REPLACE (\d{13}) REGION_ID +-- SQLNESS REPLACE (\d{1}) PEER_ID +SELECT region_id, peer_id, is_leader, status FROM information_schema.greptime_region_peers ORDER BY peer_id; + ++---------------+---------+-----------+--------+ +| region_id | peer_id | 
is_leader | status |
++---------------+---------+-----------+--------+
+| REGION_ID | PEER_ID | Yes | ALIVE |
+| REGION_ID | PEER_ID | Yes | ALIVE |
+| REGION_ID | PEER_ID | Yes | ALIVE |
++---------------+---------+-----------+--------+
+
+DROP TABLE my_table;
+
+Affected Rows: 0
+
diff --git a/tests/cases/distributed/create/partition.sql b/tests/cases/distributed/create/partition.sql
new file mode 100644
index 000000000000..df5f522f425b
--- /dev/null
+++ b/tests/cases/distributed/create/partition.sql
@@ -0,0 +1,19 @@
+CREATE TABLE my_table (
+ a INT PRIMARY KEY,
+ b STRING,
+ ts TIMESTAMP TIME INDEX,
+)
+PARTITION BY RANGE COLUMNS (a) (
+ PARTITION p0 VALUES LESS THAN (10),
+ PARTITION p1 VALUES LESS THAN (20),
+ PARTITION p2 VALUES LESS THAN (MAXVALUE),
+);
+
+-- SQLNESS REPLACE (\d{13}) ID
+SELECT table_catalog, table_schema, table_name, partition_name, partition_expression, greptime_partition_id from information_schema.partitions WHERE table_name = 'my_table' ORDER BY table_catalog, table_schema, table_name, partition_name;
+
+-- SQLNESS REPLACE (\d{13}) REGION_ID
+-- SQLNESS REPLACE (\d{1}) PEER_ID
+SELECT region_id, peer_id, is_leader, status FROM information_schema.greptime_region_peers ORDER BY peer_id;
+
+DROP TABLE my_table;
diff --git a/tests/cases/standalone/common/show/show_databases_tables.result b/tests/cases/standalone/common/show/show_databases_tables.result
index c61743d193a3..269983b36b33 100644
--- a/tests/cases/standalone/common/show/show_databases_tables.result
+++ b/tests/cases/standalone/common/show/show_databases_tables.result
@@ -34,6 +34,7 @@ show tables;
 | key_column_usage |
 | optimizer_trace |
 | parameters |
+| partitions |
 | profiling |
 | referential_constraints |
 | routines |
diff --git a/tests/cases/standalone/common/system/information_schema.result b/tests/cases/standalone/common/system/information_schema.result
index 1d39d272930a..808b991dbaff 100644
--- a/tests/cases/standalone/common/system/information_schema.result
+++ b/tests/cases/standalone/common/system/information_schema.result
@@ -26,6 +26,7 @@ order by table_schema, table_name;
 | greptime | information_schema | key_column_usage | LOCAL TEMPORARY | 16 | |
 | greptime | information_schema | optimizer_trace | LOCAL TEMPORARY | 17 | |
 | greptime | information_schema | parameters | LOCAL TEMPORARY | 18 | |
+| greptime | information_schema | partitions | LOCAL TEMPORARY | 28 | |
 | greptime | information_schema | profiling | LOCAL TEMPORARY | 19 | |
 | greptime | information_schema | referential_constraints | LOCAL TEMPORARY | 20 | |
 | greptime | information_schema | routines | LOCAL TEMPORARY | 21 | |
@@ -182,6 +183,32 @@ select * from information_schema.columns order by table_schema, table_name, colu
 | greptime | information_schema | parameters | specific_catalog | String | FIELD | | No | String | |
 | greptime | information_schema | parameters | specific_name | String | FIELD | | No | String | |
 | greptime | information_schema | parameters | specific_schema | String | FIELD | | No | String | |
+| greptime | information_schema | partitions | avg_row_length | Int64 | FIELD | | Yes | Int64 | |
+| greptime | information_schema | partitions | check_time | DateTime | FIELD | | Yes | DateTime | |
+| greptime | information_schema | partitions | checksum | Int64 | FIELD | | Yes | Int64 | |
+| greptime | information_schema | partitions | create_time | DateTime | FIELD | | Yes | DateTime | |
+| greptime | information_schema | partitions | data_free | Int64 | FIELD | | Yes | Int64 | |
+| greptime | information_schema | partitions | data_length | Int64 | FIELD | | Yes | Int64 | |
+| greptime | information_schema | partitions | greptime_partition_id | UInt64 | FIELD | | Yes | UInt64 | |
+| greptime | information_schema | partitions | index_length | Int64 | FIELD | | Yes | Int64 | |
+| greptime | information_schema | partitions | max_data_length | Int64 | FIELD | | Yes | Int64 | |
+| greptime | information_schema | partitions | nodegroup | String | FIELD | | Yes | String | |
+| greptime | information_schema | partitions | partition_comment | String | FIELD | | Yes | String | |
+| greptime | information_schema | partitions | partition_description | String | FIELD | | Yes | String | |
+| greptime | information_schema | partitions | partition_expression | String | FIELD | | Yes | String | |
+| greptime | information_schema | partitions | partition_method | String | FIELD | | Yes | String | |
+| greptime | information_schema | partitions | partition_name | String | FIELD | | No | String | |
+| greptime | information_schema | partitions | partition_ordinal_position | Int64 | FIELD | | Yes | Int64 | |
+| greptime | information_schema | partitions | subpartition_expression | String | FIELD | | Yes | String | |
+| greptime | information_schema | partitions | subpartition_method | String | FIELD | | Yes | String | |
+| greptime | information_schema | partitions | subpartition_name | String | FIELD | | Yes | String | |
+| greptime | information_schema | partitions | subpartition_ordinal_position | Int64 | FIELD | | Yes | Int64 | |
+| greptime | information_schema | partitions | table_catalog | String | FIELD | | No | String | |
+| greptime | information_schema | partitions | table_name | String | FIELD | | No | String | |
+| greptime | information_schema | partitions | table_rows | Int64 | FIELD | | Yes | Int64 | |
+| greptime | information_schema | partitions | table_schema | String | FIELD | | No | String | |
+| greptime | information_schema | partitions | tablespace_name | String | FIELD | | Yes | String | |
+| greptime | information_schema | partitions | update_time | DateTime | FIELD | | Yes | DateTime | |
 | greptime | information_schema | profiling | block_ops_in | Int64 | FIELD | | No | Int64 | |
 | greptime | information_schema | profiling | block_ops_out | Int64 | FIELD | | No | Int64 | |
 | greptime | information_schema | profiling | context_involuntary | Int64 | FIELD | | No | Int64 | |
@@ -528,7 +555,7 @@ select * from key_column_usage;
 +--------------------+-------------------+-----------------+---------------+--------------+------------+-------------+------------------+-------------------------------+-------------------------+-----------------------+------------------------+
 
 -- tables not implemented
-desc table COLUMN_PRIVILEGES;
+DESC TABLE COLUMN_PRIVILEGES;
 
 +----------------+--------+-----+------+---------+---------------+
 | Column | Type | Key | Null | Default | Semantic Type |
@@ -542,14 +569,14 @@ desc table COLUMN_PRIVILEGES;
 | is_grantable | String | | NO | | FIELD |
 +----------------+--------+-----+------+---------+---------------+
 
-select * from COLUMN_PRIVILEGES;
+SELECT * FROM COLUMN_PRIVILEGES;
 
 +---------+---------------+--------------+------------+-------------+----------------+--------------+
 | grantee | table_catalog | table_schema | table_name | column_name | privilege_type | is_grantable |
 +---------+---------------+--------------+------------+-------------+----------------+--------------+
 +---------+---------------+--------------+------------+-------------+----------------+--------------+
 
-desc table COLUMN_STATISTICS;
+DESC TABLE COLUMN_STATISTICS;
 
 +-------------+--------+-----+------+---------+---------------+
 | Column | Type | Key | Null | Default | Semantic Type |
@@ -560,14 +587,14 @@ desc table COLUMN_STATISTICS;
 | histogram | String | | NO | | FIELD |
 +-------------+--------+-----+------+---------+---------------+
 
-select * from COLUMN_STATISTICS;
+SELECT * FROM COLUMN_STATISTICS;
 
 +-------------+------------+-------------+-----------+
 | schema_name | table_name | column_name | histogram |
 +-------------+------------+-------------+-----------+
 +-------------+------------+-------------+-----------+
 
-select * from CHARACTER_SETS;
+SELECT * FROM CHARACTER_SETS;
 
 +--------------------+----------------------+---------------+--------+
 | character_set_name | default_collate_name | description | maxlen |
@@ -575,7 +602,7 @@ select * from CHARACTER_SETS;
 | utf8 | utf8_bin | UTF-8 Unicode | 4 |
 +--------------------+----------------------+---------------+--------+
 
-select * from COLLATIONS;
+SELECT * FROM COLLATIONS;
 
 +----------------+--------------------+----+------------+-------------+---------+
 | collation_name | character_set_name | id | is_default | is_compiled | sortlen |
@@ -583,7 +610,7 @@ select * from COLLATIONS;
 | utf8_bin | utf8 | 1 | Yes | Yes | 1 |
 +----------------+--------------------+----+------------+-------------+---------+
 
-select * from COLLATION_CHARACTER_SET_APPLICABILITY;
+SELECT * FROM COLLATION_CHARACTER_SET_APPLICABILITY;
 
 +----------------+--------------------+
 | collation_name | character_set_name |
@@ -591,7 +618,7 @@ select * from COLLATION_CHARACTER_SET_APPLICABILITY;
 | utf8_bin | utf8 |
 +----------------+--------------------+
 
-desc table CHECK_CONSTRAINTS;
+DESC TABLE CHECK_CONSTRAINTS;
 
 +--------------------+--------+-----+------+---------+---------------+
 | Column | Type | Key | Null | Default | Semantic Type |
@@ -602,14 +629,14 @@ desc table CHECK_CONSTRAINTS;
 | check_clause | String | | NO | | FIELD |
 +--------------------+--------+-----+------+---------+---------------+
 
-select * from CHECK_CONSTRAINTS;
+SELECT * FROM CHECK_CONSTRAINTS;
 
 +--------------------+-------------------+-----------------+--------------+
 | constraint_catalog | constraint_schema | constraint_name | check_clause |
 +--------------------+-------------------+-----------------+--------------+
 +--------------------+-------------------+-----------------+--------------+
 
-desc table RUNTIME_METRICS;
+DESC TABLE RUNTIME_METRICS;
 
 +-------------+----------------------+-----+------+---------+---------------+
 | Column | Type | Key | Null | Default | Semantic Type |
@@ -622,6 +649,19 @@ desc table RUNTIME_METRICS;
 | timestamp | TimestampMillisecond | | NO | | FIELD |
 +-------------+----------------------+-----+------+---------+---------------+
 
+DESC TABLE GREPTIME_REGION_PEERS;
+
++--------------+--------+-----+------+---------+---------------+
+| Column | Type | Key | Null | Default | Semantic Type |
++--------------+--------+-----+------+---------+---------------+
+| region_id | UInt64 | | NO | | FIELD |
+| peer_id | UInt64 | | YES | | FIELD |
+| peer_addr | String | | YES | | FIELD |
+| is_leader | String | | YES | | FIELD |
+| status | String | | YES | | FIELD |
+| down_seconds | Int64 | | YES | | FIELD |
++--------------+--------+-----+------+---------+---------------+
+
 drop table my_db.foo;
 
 Affected Rows: 0
diff --git a/tests/cases/standalone/common/system/information_schema.sql b/tests/cases/standalone/common/system/information_schema.sql
index 625391344aa3..76261d1c665b 100644
--- a/tests/cases/standalone/common/system/information_schema.sql
+++ b/tests/cases/standalone/common/system/information_schema.sql
@@ -97,25 +97,27 @@ desc table key_column_usage;
 select * from key_column_usage;
 
 -- tables not implemented
-desc table COLUMN_PRIVILEGES;
+DESC TABLE COLUMN_PRIVILEGES;
 
-select * from COLUMN_PRIVILEGES;
+SELECT * FROM COLUMN_PRIVILEGES;
 
-desc table COLUMN_STATISTICS;
+DESC TABLE COLUMN_STATISTICS;
 
-select * from COLUMN_STATISTICS;
+SELECT * FROM COLUMN_STATISTICS;
 
-select * from CHARACTER_SETS;
+SELECT * FROM CHARACTER_SETS;
 
-select * from COLLATIONS;
+SELECT * FROM COLLATIONS;
 
-select * from COLLATION_CHARACTER_SET_APPLICABILITY;
+SELECT * FROM COLLATION_CHARACTER_SET_APPLICABILITY;
 
-desc table CHECK_CONSTRAINTS;
+DESC TABLE CHECK_CONSTRAINTS;
 
-select * from CHECK_CONSTRAINTS;
+SELECT * FROM CHECK_CONSTRAINTS;
 
-desc table RUNTIME_METRICS;
+DESC TABLE RUNTIME_METRICS;
+
+DESC TABLE GREPTIME_REGION_PEERS;
 
 drop table my_db.foo;
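
Illustrative note (not part of the diff): the tests above query information_schema.partitions and information_schema.greptime_region_peers separately. A minimal sketch of correlating the two by hand is shown below; it assumes that partitions.greptime_partition_id carries the same numeric id that greptime_region_peers.region_id reports, and it reuses the my_table name from partition.sql. The table and column names come from the schemas above, but this join is not exercised by the change itself.

SELECT p.table_name,
       p.partition_name,
       p.greptime_partition_id AS region_id,
       r.peer_id,
       r.is_leader,
       r.status
FROM information_schema.partitions p
JOIN information_schema.greptime_region_peers r
  ON p.greptime_partition_id = r.region_id
WHERE p.table_name = 'my_table'
ORDER BY p.partition_name;

Per the DESC TABLE GREPTIME_REGION_PEERS output, is_leader and status are plain strings (the results above show Yes and ALIVE), so any filtering on them compares string values rather than booleans.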