feat: impl table_constraints table for information_schema (#3698)
* feat: impl table_constraints table for information_schema

* test: update information_schema sqlness test

* test: adds table_constraints sqlness test
killme2008 authored Apr 15, 2024
1 parent db329f6 commit 75d85f9
Showing 10 changed files with 392 additions and 4 deletions.
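For context: information_schema.table_constraints is the MySQL-compatible view that reports one row per table constraint. A hypothetical query against the new table (names invented here; the sqlness tests in this commit hold the authoritative cases):

SELECT * FROM information_schema.table_constraints WHERE table_schema = 'public';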
10 changes: 10 additions & 0 deletions src/catalog/src/information_schema.rs
@@ -20,6 +20,7 @@ mod predicate;
mod region_peers;
mod runtime_metrics;
pub mod schemata;
mod table_constraints;
mod table_names;
pub mod tables;

@@ -52,6 +53,7 @@ use crate::information_schema::partitions::InformationSchemaPartitions;
use crate::information_schema::region_peers::InformationSchemaRegionPeers;
use crate::information_schema::runtime_metrics::InformationSchemaMetrics;
use crate::information_schema::schemata::InformationSchemaSchemata;
use crate::information_schema::table_constraints::InformationSchemaTableConstraints;
use crate::information_schema::tables::InformationSchemaTables;
use crate::CatalogManager;

@@ -173,6 +175,10 @@ impl InformationSchemaProvider {
KEY_COLUMN_USAGE.to_string(),
self.build_table(KEY_COLUMN_USAGE).unwrap(),
);
tables.insert(
TABLE_CONSTRAINTS.to_string(),
self.build_table(TABLE_CONSTRAINTS).unwrap(),
);

// Add memory tables
for name in MEMORY_TABLES.iter() {
@@ -241,6 +247,10 @@ impl InformationSchemaProvider {
self.catalog_name.clone(),
self.catalog_manager.clone(),
)) as _),
TABLE_CONSTRAINTS => Some(Arc::new(InformationSchemaTableConstraints::new(
self.catalog_name.clone(),
self.catalog_manager.clone(),
)) as _),
_ => None,
}
}
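With the provider wiring above, the new virtual table is registered and served like any other information_schema table. A hypothetical smoke check (the exact DESC output format is not shown in this commit):

DESC TABLE information_schema.table_constraints;

-- Seven non-nullable string columns, per the schema() in table_constraints.rs below:
-- constraint_catalog, constraint_schema, constraint_name,
-- table_schema, table_name, constraint_type, enforced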
4 changes: 2 additions & 2 deletions src/catalog/src/information_schema/columns.rs
@@ -274,8 +274,8 @@ impl InformationSchemaColumnsBuilder {
};

self.add_column(
idx,
&predicates,
idx,
&catalog_name,
&schema_name,
&table.table_info().name,
@@ -292,8 +292,8 @@
#[allow(clippy::too_many_arguments)]
fn add_column(
&mut self,
index: usize,
predicates: &Predicates,
index: usize,
catalog_name: &str,
schema_name: &str,
table_name: &str,
9 changes: 7 additions & 2 deletions src/catalog/src/information_schema/key_column_usage.rs
@@ -49,6 +49,11 @@ pub const COLUMN_NAME: &str = "column_name";
pub const ORDINAL_POSITION: &str = "ordinal_position";
const INIT_CAPACITY: usize = 42;

/// Primary key constraint name
pub(crate) const PRI_CONSTRAINT_NAME: &str = "PRIMARY";
/// Time index constraint name
pub(crate) const TIME_INDEX_CONSTRAINT_NAME: &str = "TIME INDEX";

/// The virtual table implementation for `information_schema.KEY_COLUMN_USAGE`.
pub(super) struct InformationSchemaKeyColumnUsage {
schema: SchemaRef,
@@ -232,7 +237,7 @@ impl InformationSchemaKeyColumnUsageBuilder {
self.add_key_column_usage(
&predicates,
&schema_name,
"TIME INDEX",
TIME_INDEX_CONSTRAINT_NAME,
&catalog_name,
&schema_name,
&table_name,
@@ -262,7 +267,7 @@ impl InformationSchemaKeyColumnUsageBuilder {
self.add_key_column_usage(
&predicates,
&schema_name,
"PRIMARY",
PRI_CONSTRAINT_NAME,
&catalog_name,
&schema_name,
&table_name,
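Extracting the PRIMARY and TIME INDEX literals into shared constants keeps constraint_name values identical between key_column_usage and the new table_constraints view, so the two can be joined cleanly. A hypothetical sketch:

SELECT tc.constraint_type, kcu.column_name, kcu.ordinal_position
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
  ON tc.constraint_name = kcu.constraint_name
 AND tc.table_name = kcu.table_name;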
286 changes: 286 additions & 0 deletions src/catalog/src/information_schema/table_constraints.rs
@@ -0,0 +1,286 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::{Arc, Weak};

use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::INFORMATION_SCHEMA_TABLE_CONSTRAINTS_TABLE_ID;
use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, MutableVector};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{ConstantVector, StringVector, StringVectorBuilder, VectorRef};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{ScanRequest, TableId};

use super::{InformationTable, TABLE_CONSTRAINTS};
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::key_column_usage::{
PRI_CONSTRAINT_NAME, TIME_INDEX_CONSTRAINT_NAME,
};
use crate::information_schema::Predicates;
use crate::CatalogManager;

/// The `TABLE_CONSTRAINTS` table describes which tables have constraints.
pub(super) struct InformationSchemaTableConstraints {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
}

const CONSTRAINT_CATALOG: &str = "constraint_catalog";
const CONSTRAINT_SCHEMA: &str = "constraint_schema";
const CONSTRAINT_NAME: &str = "constraint_name";
const TABLE_SCHEMA: &str = "table_schema";
const TABLE_NAME: &str = "table_name";
const CONSTRAINT_TYPE: &str = "constraint_type";
const ENFORCED: &str = "enforced";

const INIT_CAPACITY: usize = 42;

const TIME_INDEX_CONSTRAINT_TYPE: &str = "TIME INDEX";
const PRI_KEY_CONSTRAINT_TYPE: &str = "PRIMARY KEY";

impl InformationSchemaTableConstraints {
pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
Self {
schema: Self::schema(),
catalog_name,
catalog_manager,
}
}

fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
ColumnSchema::new(
CONSTRAINT_CATALOG,
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new(
CONSTRAINT_SCHEMA,
ConcreteDataType::string_datatype(),
false,
),
ColumnSchema::new(CONSTRAINT_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_SCHEMA, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(TABLE_NAME, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(CONSTRAINT_TYPE, ConcreteDataType::string_datatype(), false),
ColumnSchema::new(ENFORCED, ConcreteDataType::string_datatype(), false),
]))
}

fn builder(&self) -> InformationSchemaTableConstraintsBuilder {
InformationSchemaTableConstraintsBuilder::new(
self.schema.clone(),
self.catalog_name.clone(),
self.catalog_manager.clone(),
)
}
}

impl InformationTable for InformationSchemaTableConstraints {
fn table_id(&self) -> TableId {
INFORMATION_SCHEMA_TABLE_CONSTRAINTS_TABLE_ID
}

fn table_name(&self) -> &'static str {
TABLE_CONSTRAINTS
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}

fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_table_constraints(Some(request))
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
));
Ok(Box::pin(
RecordBatchStreamAdapter::try_new(stream)
.map_err(BoxedError::new)
.context(InternalSnafu)?,
))
}
}

struct InformationSchemaTableConstraintsBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,

constraint_schemas: StringVectorBuilder,
constraint_names: StringVectorBuilder,
table_schemas: StringVectorBuilder,
table_names: StringVectorBuilder,
constraint_types: StringVectorBuilder,
}

impl InformationSchemaTableConstraintsBuilder {
fn new(
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
) -> Self {
Self {
schema,
catalog_name,
catalog_manager,
constraint_schemas: StringVectorBuilder::with_capacity(INIT_CAPACITY),
constraint_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_schemas: StringVectorBuilder::with_capacity(INIT_CAPACITY),
table_names: StringVectorBuilder::with_capacity(INIT_CAPACITY),
constraint_types: StringVectorBuilder::with_capacity(INIT_CAPACITY),
}
}

/// Construct the `information_schema.table_constraints` virtual table
async fn make_table_constraints(
&mut self,
request: Option<ScanRequest>,
) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;
let predicates = Predicates::from_scan_request(&request);

for schema_name in catalog_manager.schema_names(&catalog_name).await? {
let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;

while let Some(table) = stream.try_next().await? {
let keys = &table.table_info().meta.primary_key_indices;
let schema = table.schema();

if schema.timestamp_index().is_some() {
self.add_table_constraint(
&predicates,
&schema_name,
TIME_INDEX_CONSTRAINT_NAME,
&schema_name,
&table.table_info().name,
TIME_INDEX_CONSTRAINT_TYPE,
);
}

if !keys.is_empty() {
self.add_table_constraint(
&predicates,
&schema_name,
PRI_CONSTRAINT_NAME,
&schema_name,
&table.table_info().name,
PRI_KEY_CONSTRAINT_TYPE,
);
}
}
}

self.finish()
}

fn add_table_constraint(
&mut self,
predicates: &Predicates,
constraint_schema: &str,
constraint_name: &str,
table_schema: &str,
table_name: &str,
constraint_type: &str,
) {
let row = [
(CONSTRAINT_SCHEMA, &Value::from(constraint_schema)),
(CONSTRAINT_NAME, &Value::from(constraint_name)),
(TABLE_SCHEMA, &Value::from(table_schema)),
(TABLE_NAME, &Value::from(table_name)),
(CONSTRAINT_TYPE, &Value::from(constraint_type)),
];

if !predicates.eval(&row) {
return;
}

self.constraint_schemas.push(Some(constraint_schema));
self.constraint_names.push(Some(constraint_name));
self.table_schemas.push(Some(table_schema));
self.table_names.push(Some(table_name));
self.constraint_types.push(Some(constraint_type));
}

fn finish(&mut self) -> Result<RecordBatch> {
let rows_num = self.constraint_names.len();

let constraint_catalogs = Arc::new(ConstantVector::new(
Arc::new(StringVector::from(vec!["def"])),
rows_num,
));
let enforceds = Arc::new(ConstantVector::new(
Arc::new(StringVector::from(vec!["YES"])),
rows_num,
));

let columns: Vec<VectorRef> = vec![
constraint_catalogs,
Arc::new(self.constraint_schemas.finish()),
Arc::new(self.constraint_names.finish()),
Arc::new(self.table_schemas.finish()),
Arc::new(self.table_names.finish()),
Arc::new(self.constraint_types.finish()),
enforceds,
];

RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
}

impl DfPartitionStream for InformationSchemaTableConstraints {
fn schema(&self) -> &ArrowSchemaRef {
self.schema.arrow_schema()
}

fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_table_constraints(None)
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
))
}
}
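The builder walks every schema and emits at most two rows per table: a TIME INDEX constraint when the schema has a timestamp index, and a PRIMARY constraint when primary_key_indices is non-empty; constraint_catalog is hard-coded to 'def' and enforced to 'YES'. A hypothetical end-to-end sketch (table and column names invented):

CREATE TABLE host_metrics (
  host STRING,
  ts TIMESTAMP TIME INDEX,
  cpu DOUBLE,
  PRIMARY KEY (host)
);

SELECT constraint_name, constraint_type, enforced
FROM information_schema.table_constraints
WHERE table_name = 'host_metrics';

-- Expected shape:
-- TIME INDEX | TIME INDEX  | YES
-- PRIMARY    | PRIMARY KEY | YES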
1 change: 1 addition & 0 deletions src/catalog/src/information_schema/table_names.rs
@@ -41,3 +41,4 @@ pub const SESSION_STATUS: &str = "session_status";
pub const RUNTIME_METRICS: &str = "runtime_metrics";
pub const PARTITIONS: &str = "partitions";
pub const REGION_PEERS: &str = "greptime_region_peers";
pub const TABLE_CONSTRAINTS: &str = "table_constraints";
2 changes: 2 additions & 0 deletions src/common/catalog/src/consts.rs
@@ -86,6 +86,8 @@ pub const INFORMATION_SCHEMA_RUNTIME_METRICS_TABLE_ID: u32 = 27;
pub const INFORMATION_SCHEMA_PARTITIONS_TABLE_ID: u32 = 28;
/// id for information_schema.REGION_PEERS
pub const INFORMATION_SCHEMA_REGION_PEERS_TABLE_ID: u32 = 29;
/// id for information_schema.TABLE_CONSTRAINTS
pub const INFORMATION_SCHEMA_TABLE_CONSTRAINTS_TABLE_ID: u32 = 30;
/// ----- End of information_schema tables -----
pub const MITO_ENGINE: &str = "mito";