Skip to content

Commit

Permalink
feat: Substrait logical plan (#704)
Browse files Browse the repository at this point in the history
* feat: use Substrait logical plan to query data from Datanode in Frontend in distributed mode

* fix: resolve PR comments

* fix: resolve PR comments

* fix: resolve PR comments

Co-authored-by: luofucong <[email protected]>
  • Loading branch information
MichaelScofield and MichaelScofield authored Dec 6, 2022
1 parent 2034b40 commit 8959dbc
Show file tree
Hide file tree
Showing 27 changed files with 315 additions and 179 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 7 additions & 6 deletions src/common/catalog/src/helper.rs → src/catalog/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,19 @@
use std::collections::HashMap;
use std::fmt::{Display, Formatter};

use common_catalog::error::{
DeserializeCatalogEntryValueSnafu, Error, InvalidCatalogSnafu, SerializeCatalogEntryValueSnafu,
};
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize, Serializer};
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::{RawTableInfo, TableId, TableVersion};

use crate::consts::{
CATALOG_KEY_PREFIX, SCHEMA_KEY_PREFIX, TABLE_GLOBAL_KEY_PREFIX, TABLE_REGIONAL_KEY_PREFIX,
};
use crate::error::{
DeserializeCatalogEntryValueSnafu, Error, InvalidCatalogSnafu, SerializeCatalogEntryValueSnafu,
};
// String prefixes, one per kind of catalog entry key: catalog, schema,
// table-global and table-regional.
// NOTE(review): the key builders/parsers that consume these prefixes are
// outside this view — confirm how they are combined with the name pattern below.
const CATALOG_KEY_PREFIX: &str = "__c";
const SCHEMA_KEY_PREFIX: &str = "__s";
const TABLE_GLOBAL_KEY_PREFIX: &str = "__tg";
const TABLE_REGIONAL_KEY_PREFIX: &str = "__tr";

// Regex fragment for a single name component: one ASCII letter or underscore,
// followed by any number of ASCII letters, digits or underscores.
const ALPHANUMERICS_NAME_PATTERN: &str = "[a-zA-Z_][a-zA-Z0-9_]*";

Expand Down
1 change: 1 addition & 0 deletions src/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::error::{CreateTableSnafu, Result};
pub use crate::schema::{SchemaProvider, SchemaProviderRef};

pub mod error;
pub mod helper;
pub mod local;
pub mod remote;
pub mod schema;
Expand Down
8 changes: 4 additions & 4 deletions src/catalog/src/remote/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ use std::sync::Arc;
use arc_swap::ArcSwap;
use async_stream::stream;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use common_catalog::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, CatalogValue,
SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue, TableRegionalKey, TableRegionalValue,
};
use common_telemetry::{debug, info};
use futures::Stream;
use futures_util::StreamExt;
Expand All @@ -39,6 +35,10 @@ use crate::error::{
CatalogNotFoundSnafu, CreateTableSnafu, InvalidCatalogValueSnafu, InvalidTableSchemaSnafu,
OpenTableSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, UnimplementedSnafu,
};
use crate::helper::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, CatalogValue,
SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue, TableRegionalKey, TableRegionalValue,
};
use crate::remote::{Kv, KvBackendRef};
use crate::{
handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef,
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/tests/remote_catalog_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ mod tests {
use std::collections::HashSet;
use std::sync::Arc;

use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use catalog::remote::{
KvBackend, KvBackendRef, RemoteCatalogManager, RemoteCatalogProvider, RemoteSchemaProvider,
};
use catalog::{CatalogList, CatalogManager, RegisterTableRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use datatypes::schema::Schema;
use futures_util::StreamExt;
use table::engine::{EngineContext, TableEngineRef};
Expand Down
1 change: 0 additions & 1 deletion src/common/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ regex = "1.6"
serde = "1.0"
serde_json = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
table = { path = "../../table" }

[dev-dependencies]
chrono = "0.4"
Expand Down
6 changes: 0 additions & 6 deletions src/common/catalog/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,3 @@ pub const MIN_USER_TABLE_ID: u32 = 1024;
pub const SYSTEM_CATALOG_TABLE_ID: u32 = 0;
/// scripts table id
pub const SCRIPTS_TABLE_ID: u32 = 1;

pub(crate) const CATALOG_KEY_PREFIX: &str = "__c";
pub(crate) const SCHEMA_KEY_PREFIX: &str = "__s";
pub(crate) const TABLE_GLOBAL_KEY_PREFIX: &str = "__tg";
pub(crate) const TABLE_REGIONAL_KEY_PREFIX: &str = "__tr";
pub const TABLE_ID_KEY_PREFIX: &str = "__tid";
7 changes: 0 additions & 7 deletions src/common/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,3 @@

pub mod consts;
pub mod error;
mod helper;

pub use helper::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix,
build_table_regional_prefix, CatalogKey, CatalogValue, SchemaKey, SchemaValue, TableGlobalKey,
TableGlobalValue, TableRegionalKey, TableRegionalValue,
};
11 changes: 11 additions & 0 deletions src/common/substrait/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::collections::HashMap;

use datafusion::logical_plan::DFSchemaRef;
use substrait_proto::protobuf::extensions::simple_extension_declaration::{
ExtensionFunction, MappingType,
};
Expand All @@ -23,6 +24,7 @@ use substrait_proto::protobuf::extensions::SimpleExtensionDeclaration;
/// Context object passed to the Substrait <-> DataFusion conversion routines.
///
/// It keeps two scalar-function lookup tables keyed in opposite directions
/// (name -> `u32` and `u32` -> name) and an optional DataFusion schema.
/// NOTE(review): the `u32` values look like Substrait extension-function
/// anchors — confirm against the registration code, which is outside this view.
pub struct ConvertorContext {
// Scalar function name -> numeric id.
scalar_fn_names: HashMap<String, u32>,
// Numeric id -> scalar function name (presumably the inverse of
// `scalar_fn_names`; verify against the code that populates both maps).
scalar_fn_map: HashMap<u32, String>,
// DataFusion schema recorded via `set_df_schema`; `None` when no schema is stored.
df_schema: Option<DFSchemaRef>,
}

impl ConvertorContext {
Expand Down Expand Up @@ -63,4 +65,13 @@ impl ConvertorContext {
}
result
}

/// Records the DataFusion schema on this context, at most once.
///
/// Calling this while a schema is already stored is treated as a programming
/// error: it trips the `debug_assert!` in debug builds. In builds without
/// debug assertions the previously stored schema is kept and `schema` is
/// dropped.
pub(crate) fn set_df_schema(&mut self, schema: DFSchemaRef) {
    debug_assert!(self.df_schema.is_none());
    // Store only when empty, so an earlier schema is never overwritten.
    if self.df_schema.is_none() {
        self.df_schema = Some(schema);
    }
}

pub(crate) fn df_schema(&self) -> Option<&DFSchemaRef> {
self.df_schema.as_ref()
}
}
34 changes: 27 additions & 7 deletions src/common/substrait/src/df_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ use std::collections::VecDeque;
use std::str::FromStr;

use datafusion::logical_plan::{Column, Expr};
use datafusion_expr::{expr_fn, BuiltinScalarFunction, Operator};
use datafusion_expr::{expr_fn, lit, BuiltinScalarFunction, Operator};
use datatypes::schema::Schema;
use snafu::{ensure, OptionExt};
use substrait_proto::protobuf::expression::field_reference::ReferenceType as FieldReferenceType;
use substrait_proto::protobuf::expression::reference_segment::{
ReferenceType as SegReferenceType, StructField,
};
use substrait_proto::protobuf::expression::{
FieldReference, ReferenceSegment, RexType, ScalarFunction,
FieldReference, Literal, ReferenceSegment, RexType, ScalarFunction,
};
use substrait_proto::protobuf::function_argument::ArgType;
use substrait_proto::protobuf::Expression;
Expand All @@ -33,15 +33,24 @@ use crate::context::ConvertorContext;
use crate::error::{
EmptyExprSnafu, InvalidParametersSnafu, MissingFieldSnafu, Result, UnsupportedExprSnafu,
};
use crate::types::{literal_type_to_scalar_value, scalar_value_as_literal_type};

/// Convert substrait's `Expression` to DataFusion's `Expr`.
pub fn to_df_expr(ctx: &ConvertorContext, expression: Expression, schema: &Schema) -> Result<Expr> {
pub(crate) fn to_df_expr(
ctx: &ConvertorContext,
expression: Expression,
schema: &Schema,
) -> Result<Expr> {
let expr_rex_type = expression.rex_type.context(EmptyExprSnafu)?;
match expr_rex_type {
RexType::Literal(_) => UnsupportedExprSnafu {
name: "substrait Literal expression",
RexType::Literal(l) => {
let t = l.literal_type.context(MissingFieldSnafu {
field: "LiteralType",
plan: "Literal",
})?;
let v = literal_type_to_scalar_value(t)?;
Ok(lit(v))
}
.fail()?,
RexType::Selection(selection) => convert_selection_rex(*selection, schema),
RexType::ScalarFunction(scalar_fn) => convert_scalar_function(ctx, scalar_fn, schema),
RexType::WindowFunction(_)
Expand Down Expand Up @@ -453,10 +462,21 @@ pub fn expression_from_df_expr(
}
}
// Don't merge them with other unsupported expr arms to preserve the ordering.
Expr::ScalarVariable(..) | Expr::Literal(..) => UnsupportedExprSnafu {
Expr::ScalarVariable(..) => UnsupportedExprSnafu {
name: expr.to_string(),
}
.fail()?,
Expr::Literal(v) => {
let t = scalar_value_as_literal_type(v)?;
let l = Literal {
nullable: true,
type_variation_reference: 0,
literal_type: Some(t),
};
Expression {
rex_type: Some(RexType::Literal(l)),
}
}
Expr::BinaryExpr { left, op, right } => {
let left = expression_from_df_expr(ctx, left, schema)?;
let right = expression_from_df_expr(ctx, right, schema)?;
Expand Down
Loading

0 comments on commit 8959dbc

Please sign in to comment.