Skip to content

Commit

Permalink
fix: resolve PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelScofield committed Dec 6, 2022
1 parent 039dc37 commit 911fafc
Show file tree
Hide file tree
Showing 18 changed files with 59 additions and 52 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 7 additions & 6 deletions src/common/catalog/src/helper.rs → src/catalog/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,19 @@
use std::collections::HashMap;
use std::fmt::{Display, Formatter};

use common_catalog::error::{
DeserializeCatalogEntryValueSnafu, Error, InvalidCatalogSnafu, SerializeCatalogEntryValueSnafu,
};
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize, Serializer};
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::{RawTableInfo, TableId, TableVersion};

use crate::consts::{
CATALOG_KEY_PREFIX, SCHEMA_KEY_PREFIX, TABLE_GLOBAL_KEY_PREFIX, TABLE_REGIONAL_KEY_PREFIX,
};
use crate::error::{
DeserializeCatalogEntryValueSnafu, Error, InvalidCatalogSnafu, SerializeCatalogEntryValueSnafu,
};
const CATALOG_KEY_PREFIX: &str = "__c";
const SCHEMA_KEY_PREFIX: &str = "__s";
const TABLE_GLOBAL_KEY_PREFIX: &str = "__tg";
const TABLE_REGIONAL_KEY_PREFIX: &str = "__tr";

const ALPHANUMERICS_NAME_PATTERN: &str = "[a-zA-Z_][a-zA-Z0-9_]*";

Expand Down
1 change: 1 addition & 0 deletions src/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::error::{CreateTableSnafu, Result};
pub use crate::schema::{SchemaProvider, SchemaProviderRef};

pub mod error;
pub mod helper;
pub mod local;
pub mod remote;
pub mod schema;
Expand Down
8 changes: 4 additions & 4 deletions src/catalog/src/remote/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ use std::sync::Arc;
use arc_swap::ArcSwap;
use async_stream::stream;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use common_catalog::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, CatalogValue,
SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue, TableRegionalKey, TableRegionalValue,
};
use common_telemetry::{debug, info};
use futures::Stream;
use futures_util::StreamExt;
Expand All @@ -39,6 +35,10 @@ use crate::error::{
CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, InvalidTableSchemaSnafu,
OpenTableSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, UnimplementedSnafu,
};
use crate::helper::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, CatalogValue,
SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue, TableRegionalKey, TableRegionalValue,
};
use crate::remote::{Kv, KvBackendRef};
use crate::{
handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef,
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/tests/remote_catalog_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ mod tests {
use std::collections::HashSet;
use std::sync::Arc;

use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use catalog::remote::{
KvBackend, KvBackendRef, RemoteCatalogManager, RemoteCatalogProvider, RemoteSchemaProvider,
};
use catalog::{CatalogList, CatalogManager, RegisterTableRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use datatypes::schema::Schema;
use futures_util::StreamExt;
use table::engine::{EngineContext, TableEngineRef};
Expand Down
1 change: 0 additions & 1 deletion src/common/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ regex = "1.6"
serde = "1.0"
serde_json = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
table = { path = "../../table" }

[dev-dependencies]
chrono = "0.4"
Expand Down
6 changes: 0 additions & 6 deletions src/common/catalog/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,3 @@ pub const MIN_USER_TABLE_ID: u32 = 1024;
pub const SYSTEM_CATALOG_TABLE_ID: u32 = 0;
/// scripts table id
pub const SCRIPTS_TABLE_ID: u32 = 1;

pub(crate) const CATALOG_KEY_PREFIX: &str = "__c";
pub(crate) const SCHEMA_KEY_PREFIX: &str = "__s";
pub(crate) const TABLE_GLOBAL_KEY_PREFIX: &str = "__tg";
pub(crate) const TABLE_REGIONAL_KEY_PREFIX: &str = "__tr";
pub const TABLE_ID_KEY_PREFIX: &str = "__tid";
7 changes: 0 additions & 7 deletions src/common/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,3 @@

pub mod consts;
pub mod error;
mod helper;

pub use helper::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix,
build_table_regional_prefix, CatalogKey, CatalogValue, SchemaKey, SchemaValue, TableGlobalKey,
TableGlobalValue, TableRegionalKey, TableRegionalValue,
};
29 changes: 12 additions & 17 deletions src/common/substrait/src/df_logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ use bytes::{Buf, Bytes, BytesMut};
use catalog::CatalogManagerRef;
use common_error::prelude::BoxedError;
use common_telemetry::debug;
use datafusion::arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datafusion::datasource::TableProvider;
use datafusion::logical_plan::plan::Filter;
use datafusion::logical_plan::{DFSchemaRef, LogicalPlan, TableScan, ToDFSchema};
use datafusion::logical_plan::{LogicalPlan, TableScan, ToDFSchema};
use datafusion::physical_plan::project_schema;
use datatypes::schema::Schema;
use prost::Message;
use snafu::{ensure, OptionExt, ResultExt};
use substrait_proto::protobuf::expression::mask_expression::{StructItem, StructSelect};
Expand All @@ -38,7 +37,7 @@ use table::table::adapter::DfTableProviderAdapter;
use crate::context::ConvertorContext;
use crate::df_expr::{expression_from_df_expr, to_df_expr};
use crate::error::{
DFInternalSnafu, DecodeRelSnafu, EmptyPlanSnafu, EncodeRelSnafu, Error, InternalSnafu,
self, DFInternalSnafu, DecodeRelSnafu, EmptyPlanSnafu, EncodeRelSnafu, Error, InternalSnafu,
InvalidParametersSnafu, MissingFieldSnafu, SchemaNotMatchSnafu, TableNotFoundSnafu,
UnknownPlanSnafu, UnsupportedExprSnafu, UnsupportedPlanSnafu,
};
Expand Down Expand Up @@ -139,7 +138,10 @@ impl DFLogicalSubstraitConvertor {
let schema = ctx.df_schema().context(InvalidParametersSnafu {
reason: "the underlying TableScan plan should have included a table schema",
})?;
let schema = try_convert_df_schema(schema)?;
let schema = schema
.clone()
.try_into()
.context(error::ConvertDfSchemaSnafu)?;
let predicate = to_df_expr(ctx, *condition, &schema)?;

LogicalPlan::Filter(Filter { predicate, input })
Expand Down Expand Up @@ -303,7 +305,11 @@ impl DFLogicalSubstraitConvertor {
self.logical_plan_to_rel(ctx, filter.input.clone())?,
));

let schema = try_convert_df_schema(plan.schema())?;
let schema = plan
.schema()
.clone()
.try_into()
.context(error::ConvertDfSchemaSnafu)?;
let condition = Some(Box::new(expression_from_df_expr(
ctx,
&filter.predicate,
Expand Down Expand Up @@ -483,17 +489,6 @@ fn same_schema_without_metadata(lhs: &ArrowSchemaRef, rhs: &ArrowSchemaRef) -> b
})
}

fn try_convert_df_schema(df_schema: &DFSchemaRef) -> Result<Schema, Error> {
#[allow(clippy::needless_borrow)]
let arrow_schema: ArrowSchema = (&**df_schema).into();

let schema: Schema = arrow_schema
.try_into()
.map_err(BoxedError::new)
.context(InternalSnafu)?;
Ok(schema)
}

#[cfg(test)]
mod test {
use catalog::local::{LocalCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
Expand Down
7 changes: 7 additions & 0 deletions src/common/substrait/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ pub enum Error {
storage_schema: datafusion::arrow::datatypes::SchemaRef,
backtrace: Backtrace,
},

#[snafu(display("Failed to convert DataFusion schema, source: {}", source))]
ConvertDfSchema {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -120,6 +126,7 @@ impl ErrorExt for Error {
| Error::TableNotFound { .. }
| Error::SchemaNotMatch { .. } => StatusCode::InvalidArguments,
Error::DFInternal { .. } | Error::Internal { .. } => StatusCode::Internal,
Error::ConvertDfSchema { source } => source.status_code(),
}
}

Expand Down
10 changes: 10 additions & 0 deletions src/datatypes/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::Arc;

pub use arrow::datatypes::Metadata;
use arrow::datatypes::{Field, Schema as ArrowSchema};
use datafusion_common::DFSchemaRef;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};

Expand Down Expand Up @@ -465,6 +466,15 @@ impl TryFrom<ArrowSchema> for Schema {
}
}

impl TryFrom<DFSchemaRef> for Schema {
type Error = Error;

fn try_from(value: DFSchemaRef) -> Result<Self> {
let s: ArrowSchema = value.as_ref().into();
s.try_into()
}
}

fn try_parse_version(metadata: &Metadata, key: &str) -> Result<u32> {
if let Some(value) = metadata.get(key) {
let version = value
Expand Down
11 changes: 7 additions & 4 deletions src/frontend/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ use std::collections::HashSet;
use std::sync::Arc;

use catalog::error::{self as catalog_err, InvalidCatalogValueSnafu};
use catalog::helper::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, SchemaKey,
TableGlobalKey, TableGlobalValue,
};
use catalog::remote::{Kv, KvBackendRef};
use catalog::{
CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest,
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider,
SchemaProviderRef,
};
use common_catalog::{CatalogKey, SchemaKey, TableGlobalKey, TableGlobalValue};
use futures::StreamExt;
use meta_client::rpc::TableName;
use snafu::prelude::*;
Expand Down Expand Up @@ -130,7 +133,7 @@ impl CatalogList for FrontendCatalogManager {
let backend = self.backend.clone();
let res = std::thread::spawn(|| {
common_runtime::block_on_read(async move {
let key = common_catalog::build_catalog_prefix();
let key = build_catalog_prefix();
let mut iter = backend.range(key.as_bytes());
let mut res = HashSet::new();

Expand Down Expand Up @@ -180,7 +183,7 @@ impl CatalogProvider for FrontendCatalogProvider {
let catalog_name = self.catalog_name.clone();
let res = std::thread::spawn(|| {
common_runtime::block_on_read(async move {
let key = common_catalog::build_schema_prefix(&catalog_name);
let key = build_schema_prefix(&catalog_name);
let mut iter = backend.range(key.as_bytes());
let mut res = HashSet::new();

Expand Down Expand Up @@ -242,7 +245,7 @@ impl SchemaProvider for FrontendSchemaProvider {

std::thread::spawn(|| {
common_runtime::block_on_read(async move {
let key = common_catalog::build_table_global_prefix(catalog_name, schema_name);
let key = build_table_global_prefix(catalog_name, schema_name);
let mut iter = backend.range(key.as_bytes());
let mut res = HashSet::new();

Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/instance/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ use std::sync::Arc;

use api::helper::ColumnDataTypeWrapper;
use api::v1::{AlterExpr, CreateDatabaseExpr, CreateExpr};
use catalog::helper::{SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue};
use catalog::CatalogList;
use chrono::DateTime;
use client::admin::{admin_result_to_output, Admin};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::{SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue};
use common_query::Output;
use common_telemetry::{debug, error, info};
use datatypes::prelude::ConcreteDataType;
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mock = []
[dependencies]
api = { path = "../api" }
async-trait = "0.1"
catalog = { path = "../catalog" }
common-base = { path = "../common/base" }
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
Expand Down
2 changes: 1 addition & 1 deletion src/meta-srv/src/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::str::FromStr;

use api::v1::meta::TableName;
use common_catalog::TableGlobalKey;
use catalog::helper::TableGlobalKey;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
Expand Down
2 changes: 1 addition & 1 deletion src/meta-srv/src/service/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use api::v1::meta::{
router_server, CreateRequest, Error, PeerDict, PutRequest, RangeRequest, Region, RegionRoute,
ResponseHeader, RouteRequest, RouteResponse, Table, TableRoute, TableRouteValue,
};
use common_catalog::{TableGlobalKey, TableGlobalValue};
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use common_telemetry::warn;
use snafu::{OptionExt, ResultExt};
use tonic::{Request, Response};
Expand Down
1 change: 1 addition & 0 deletions src/table/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ license = "Apache-2.0"
[dependencies]
async-trait = "0.1"
chrono = { version = "0.4", features = ["serde"] }
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
Expand Down
5 changes: 3 additions & 2 deletions src/table/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use chrono::{DateTime, Utc};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
pub use datatypes::error::{Error as ConvertError, Result as ConvertResult};
use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRef};
use derive_builder::Builder;
Expand Down Expand Up @@ -333,9 +334,9 @@ pub struct TableInfo {
/// Comment of the table.
#[builder(default, setter(into))]
pub desc: Option<String>,
#[builder(default = "\"greptime\".to_string()", setter(into))]
#[builder(default = "DEFAULT_CATALOG_NAME.to_string()", setter(into))]
pub catalog_name: String,
#[builder(default = "\"public\".to_string()", setter(into))]
#[builder(default = "DEFAULT_SCHEMA_NAME.to_string()", setter(into))]
pub schema_name: String,
pub meta: TableMeta,
#[builder(default = "TableType::Base")]
Expand Down

0 comments on commit 911fafc

Please sign in to comment.