fix: Fix compile errors in benchmark and substrait #745

Merged · 4 commits · Dec 13, 2022
22 changes: 12 additions & 10 deletions benchmarks/src/bin/nyc-taxi.rs
@@ -20,7 +20,6 @@

use std::collections::HashMap;
use std::path::{Path, PathBuf};
-use std::sync::Arc;
use std::time::Instant;

use arrow::array::{ArrayRef, PrimitiveArray, StringArray, TimestampNanosecondArray};
@@ -32,9 +31,7 @@ use client::api::v1::column::Values;
use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateExpr, InsertExpr};
use client::{Client, Database, Select};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
-use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
-use parquet::file::reader::FileReader;
-use parquet::file::serialized_reader::SerializedFileReader;
+use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use tokio::task::JoinSet;

const DATABASE_NAME: &str = "greptime";
@@ -86,10 +83,14 @@ async fn write_data(
pb_style: ProgressStyle,
) -> u128 {
let file = std::fs::File::open(&path).unwrap();
-let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
-let row_num = file_reader.metadata().file_metadata().num_rows();
-let record_batch_reader = ParquetFileArrowReader::new(file_reader)
-.get_record_reader(batch_size)
+let record_batch_reader_builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+let row_num = record_batch_reader_builder
+.metadata()
+.file_metadata()
+.num_rows();
+let record_batch_reader = record_batch_reader_builder
+.with_batch_size(batch_size)
+.build()
.unwrap();
let progress_bar = mpb.add(ProgressBar::new(row_num as _));
progress_bar.set_style(pb_style);
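Note: the hunk above tracks parquet's reworked Arrow reader API — the `SerializedFileReader` plus `ParquetFileArrowReader` pair is replaced by the single `ParquetRecordBatchReaderBuilder` entry point, which also drops the need for the `Arc` (hence the removed `std::sync::Arc` import). A minimal standalone sketch of the new API, assuming a recent `parquet` crate with the `arrow` feature enabled (the file path handling and printing are illustrative only):

```rust
use std::fs::File;

use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn read_parquet(path: &str, batch_size: usize) -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open(path)?;
    // The builder exposes file metadata up front, so the row count is
    // available before the record batch reader is constructed.
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    let row_num = builder.metadata().file_metadata().num_rows();
    println!("total rows: {row_num}");
    let reader = builder.with_batch_size(batch_size).build()?;
    for batch in reader {
        println!("read a batch with {} rows", batch?.num_rows());
    }
    Ok(())
}
```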
@@ -210,9 +211,10 @@ fn build_values(column: &ArrayRef) -> Values {
| DataType::FixedSizeList(_, _)
| DataType::LargeList(_)
| DataType::Struct(_)
-| DataType::Union(_, _)
+| DataType::Union(_, _, _)
| DataType::Dictionary(_, _)
-| DataType::Decimal(_, _)
+| DataType::Decimal128(_, _)
+| DataType::Decimal256(_, _)
| DataType::Map(_, _) => todo!(),
}
}
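The new match arms follow arrow-rs changes in the same upgrade: `DataType::Union` gained a third element (the union mode), and the old `DataType::Decimal` variant was split into `Decimal128` and `Decimal256`. A hedged sketch of an exhaustive check against that arrow version (payloads wildcarded; the helper name is ours):

```rust
use arrow::datatypes::DataType;

// True for the nested/compound types the benchmark leaves unconverted.
// Assumes an arrow-rs release where Union carries a mode field and Decimal
// is split into Decimal128/Decimal256, as in the diff above.
fn is_unconverted(data_type: &DataType) -> bool {
    matches!(
        data_type,
        DataType::List(_)
            | DataType::FixedSizeList(_, _)
            | DataType::LargeList(_)
            | DataType::Struct(_)
            | DataType::Union(_, _, _)
            | DataType::Dictionary(_, _)
            | DataType::Decimal128(_, _)
            | DataType::Decimal256(_, _)
            | DataType::Map(_, _)
    )
}
```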
2 changes: 1 addition & 1 deletion src/common/substrait/src/context.rs
@@ -14,7 +14,7 @@

use std::collections::HashMap;

-use datafusion::logical_plan::DFSchemaRef;
+use datafusion::common::DFSchemaRef;
use substrait_proto::protobuf::extensions::simple_extension_declaration::{
ExtensionFunction, MappingType,
};
50 changes: 39 additions & 11 deletions src/common/substrait/src/df_expr.rs
@@ -15,8 +15,8 @@
use std::collections::VecDeque;
use std::str::FromStr;

-use datafusion::logical_plan::{Column, Expr};
-use datafusion_expr::{expr_fn, lit, BuiltinScalarFunction, Operator};
+use datafusion::common::Column;
+use datafusion_expr::{expr_fn, lit, Between, BinaryExpr, BuiltinScalarFunction, Expr, Operator};
use datatypes::schema::Schema;
use snafu::{ensure, OptionExt};
use substrait_proto::protobuf::expression::field_reference::ReferenceType as FieldReferenceType;
@@ -311,21 +311,21 @@ pub fn convert_scalar_function(
// skip GetIndexedField, unimplemented.
"between" => {
ensure_arg_len(3)?;
-Expr::Between {
+Expr::Between(Between {
expr: Box::new(inputs.pop_front().unwrap()),
negated: false,
low: Box::new(inputs.pop_front().unwrap()),
high: Box::new(inputs.pop_front().unwrap()),
-}
+})
}
"not_between" => {
ensure_arg_len(3)?;
-Expr::Between {
+Expr::Between(Between {
expr: Box::new(inputs.pop_front().unwrap()),
negated: true,
low: Box::new(inputs.pop_front().unwrap()),
high: Box::new(inputs.pop_front().unwrap()),
-}
+})
}
// skip Case, is covered in substrait::SwitchExpression.
// skip Cast and TryCast, is covered in substrait::Cast.
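These hunks track a DataFusion refactor that turned several `Expr` struct variants into tuple variants wrapping named structs (`Between`, `BinaryExpr`, and friends), with `Expr` itself now imported from `datafusion_expr`. A minimal construction sketch under that API (column and literal values are arbitrary):

```rust
use datafusion_expr::{col, lit, Between, BinaryExpr, Expr, Operator};

// Build `a BETWEEN 1 AND 10` and `a + b` with the struct-wrapping variants.
fn sample_exprs() -> (Expr, Expr) {
    let between = Expr::Between(Between {
        expr: Box::new(col("a")),
        negated: false,
        low: Box::new(lit(1)),
        high: Box::new(lit(10)),
    });
    let sum = Expr::BinaryExpr(BinaryExpr {
        left: Box::new(col("a")),
        op: Operator::Plus,
        right: Box::new(col("b")),
    });
    (between, sum)
}
```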
@@ -477,7 +477,7 @@ pub fn expression_from_df_expr(
rex_type: Some(RexType::Literal(l)),
}
}
-Expr::BinaryExpr { left, op, right } => {
+Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
let left = expression_from_df_expr(ctx, left, schema)?;
let right = expression_from_df_expr(ctx, right, schema)?;
let arguments = utils::expression_to_argument(vec![left, right]);
@@ -518,12 +518,12 @@ pub fn expression_from_df_expr(
name: expr.to_string(),
}
.fail()?,
-Expr::Between {
+Expr::Between(Between {
expr,
negated,
low,
high,
-} => {
+}) => {
let expr = expression_from_df_expr(ctx, expr, schema)?;
let low = expression_from_df_expr(ctx, low, schema)?;
let high = expression_from_df_expr(ctx, high, schema)?;
@@ -564,7 +564,21 @@ pub fn expression_from_df_expr(
| Expr::WindowFunction { .. }
| Expr::AggregateUDF { .. }
| Expr::InList { .. }
-| Expr::Wildcard => UnsupportedExprSnafu {
+| Expr::Wildcard
+| Expr::Like(_)
+| Expr::ILike(_)
+| Expr::SimilarTo(_)
+| Expr::IsTrue(_)
+| Expr::IsFalse(_)
+| Expr::IsUnknown(_)
+| Expr::IsNotTrue(_)
+| Expr::IsNotFalse(_)
+| Expr::IsNotUnknown(_)
+| Expr::Exists { .. }
+| Expr::InSubquery { .. }
+| Expr::ScalarSubquery(..)
+| Expr::QualifiedWildcard { .. } => todo!(),
+Expr::GroupingSet(_) => UnsupportedExprSnafu {
name: expr.to_string(),
}
.fail()?,
@@ -628,6 +642,10 @@ mod utils {
Operator::RegexNotIMatch => "regex_not_i_match",
Operator::BitwiseAnd => "bitwise_and",
Operator::BitwiseOr => "bitwise_or",
+Operator::BitwiseXor => "bitwise_xor",
+Operator::BitwiseShiftRight => "bitwise_shift_right",
+Operator::BitwiseShiftLeft => "bitwise_shift_left",
+Operator::StringConcat => "string_concat",
}
}

@@ -679,7 +697,6 @@ mod utils {
BuiltinScalarFunction::Sqrt => "sqrt",
BuiltinScalarFunction::Tan => "tan",
BuiltinScalarFunction::Trunc => "trunc",
-BuiltinScalarFunction::Array => "make_array",
BuiltinScalarFunction::Ascii => "ascii",
BuiltinScalarFunction::BitLength => "bit_length",
BuiltinScalarFunction::Btrim => "btrim",
@@ -723,6 +740,17 @@ mod utils {
BuiltinScalarFunction::Trim => "trim",
BuiltinScalarFunction::Upper => "upper",
BuiltinScalarFunction::RegexpMatch => "regexp_match",
+BuiltinScalarFunction::Atan2 => "atan2",
+BuiltinScalarFunction::Coalesce => "coalesce",
+BuiltinScalarFunction::Power => "power",
+BuiltinScalarFunction::MakeArray => "make_array",
+BuiltinScalarFunction::DateBin => "date_bin",
+BuiltinScalarFunction::FromUnixtime => "from_unixtime",
+BuiltinScalarFunction::CurrentDate => "current_date",
+BuiltinScalarFunction::CurrentTime => "current_time",
+BuiltinScalarFunction::Uuid => "uuid",
+BuiltinScalarFunction::Struct => "struct",
+BuiltinScalarFunction::ArrowTypeof => "arrow_type_of",
}
}
}
43 changes: 29 additions & 14 deletions src/common/substrait/src/df_logical.rs
@@ -19,10 +19,10 @@ use catalog::CatalogManagerRef;
use common_error::prelude::BoxedError;
use common_telemetry::debug;
use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
-use datafusion::datasource::TableProvider;
-use datafusion::logical_plan::plan::Filter;
-use datafusion::logical_plan::{LogicalPlan, TableScan, ToDFSchema};
+use datafusion::common::ToDFSchema;
+use datafusion::datasource::DefaultTableSource;
use datafusion::physical_plan::project_schema;
+use datafusion_expr::{Filter, LogicalPlan, TableScan, TableSource};
use prost::Message;
use snafu::{ensure, OptionExt, ResultExt};
use substrait_proto::protobuf::expression::mask_expression::{StructItem, StructSelect};
@@ -144,7 +144,7 @@ impl DFLogicalSubstraitConvertor {
.context(error::ConvertDfSchemaSnafu)?;
let predicate = to_df_expr(ctx, *condition, &schema)?;

-LogicalPlan::Filter(Filter { predicate, input })
+LogicalPlan::Filter(Filter::try_new(predicate, input).context(DFInternalSnafu)?)
}
RelType::Fetch(_fetch_rel) => UnsupportedPlanSnafu {
name: "Fetch Relation",
@@ -238,7 +238,9 @@ impl DFLogicalSubstraitConvertor {
.context(TableNotFoundSnafu {
name: format!("{}.{}.{}", catalog_name, schema_name, table_name),
})?;
-let adapter = Arc::new(DfTableProviderAdapter::new(table_ref));
+let adapter = Arc::new(DefaultTableSource::new(Arc::new(
+DfTableProviderAdapter::new(table_ref),
+)));

// Get schema directly from the table, and compare it with the schema retrieved from substrait proto.
let stored_schema = adapter.schema();
@@ -267,14 +269,14 @@

ctx.set_df_schema(projected_schema.clone());

-// TODO(ruihang): Support limit
+// TODO(ruihang): Support limit(fetch)
Ok(LogicalPlan::TableScan(TableScan {
table_name: format!("{}.{}.{}", catalog_name, schema_name, table_name),
source: adapter,
projection,
projected_schema,
filters,
-limit: None,
+fetch: None,
}))
}
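In the same upgrade, logical plans stopped referencing a `TableProvider` directly: they hold a `TableSource`, with `DefaultTableSource` as the adapter, and `TableScan`'s former `limit` field is renamed to `fetch`. A hedged sketch of building a bare scan node by hand, assuming some existing `TableProvider` implementation:

```rust
use std::sync::Arc;

use datafusion::common::{Result, ToDFSchema};
use datafusion::datasource::{DefaultTableSource, TableProvider};
use datafusion_expr::{LogicalPlan, TableScan, TableSource};

// Build an unprojected, unfiltered scan over `provider`. The provider is
// wrapped in DefaultTableSource; `fetch` is the field formerly named `limit`.
fn table_scan(table_name: &str, provider: Arc<dyn TableProvider>) -> Result<LogicalPlan> {
    let source: Arc<dyn TableSource> = Arc::new(DefaultTableSource::new(provider));
    let projected_schema = source.schema().to_dfschema_ref()?;
    Ok(LogicalPlan::TableScan(TableScan {
        table_name: table_name.to_string(),
        source,
        projection: None,
        projected_schema,
        filters: vec![],
        fetch: None,
    }))
}
```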

@@ -302,7 +304,7 @@
.fail()?,
LogicalPlan::Filter(filter) => {
let input = Some(Box::new(
-self.logical_plan_to_rel(ctx, filter.input.clone())?,
+self.logical_plan_to_rel(ctx, filter.input().clone())?,
));

let schema = plan
@@ -312,7 +314,7 @@ impl DFLogicalSubstraitConvertor {
.context(error::ConvertDfSchemaSnafu)?;
let condition = Some(Box::new(expression_from_df_expr(
ctx,
-&filter.predicate,
+filter.predicate(),
&schema,
)?));
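`Filter` changed in the same way: it is now constructed through the fallible `Filter::try_new` (which can reject predicates that don't fit the input schema), and its fields are read back through the `input()`/`predicate()` accessors used above. A small sketch:

```rust
use std::sync::Arc;

use datafusion::common::Result;
use datafusion_expr::{col, lit, Filter, LogicalPlan};

// Wrap an existing plan in `WHERE x > 0`. Direct struct construction is
// replaced by the validating constructor; fields sit behind accessors.
fn add_filter(input: Arc<LogicalPlan>) -> Result<LogicalPlan> {
    let filter = Filter::try_new(col("x").gt(lit(0)), input)?;
    let _predicate = filter.predicate();
    Ok(LogicalPlan::Filter(filter))
}
```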

@@ -368,7 +370,16 @@ impl DFLogicalSubstraitConvertor {
name: "DataFusion Logical Limit",
}
.fail()?,
-LogicalPlan::CreateExternalTable(_)
+
+LogicalPlan::Subquery(_)
+| LogicalPlan::SubqueryAlias(_)
+| LogicalPlan::CreateView(_)
+| LogicalPlan::CreateCatalogSchema(_)
+| LogicalPlan::CreateCatalog(_)
+| LogicalPlan::DropView(_)
+| LogicalPlan::Distinct(_)
+| LogicalPlan::SetVariable(_)
+| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::Values(_)
@@ -485,7 +496,9 @@ impl DFLogicalSubstraitConvertor {
fn same_schema_without_metadata(lhs: &ArrowSchemaRef, rhs: &ArrowSchemaRef) -> bool {
lhs.fields.len() == rhs.fields.len()
&& lhs.fields.iter().zip(rhs.fields.iter()).all(|(x, y)| {
-x.name == y.name && x.data_type == y.data_type && x.is_nullable == y.is_nullable
+x.name() == y.name()
+&& x.data_type() == y.data_type()
+&& x.is_nullable() == y.is_nullable()
})
}

@@ -494,7 +507,7 @@ mod test {
use catalog::local::{LocalCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{CatalogList, CatalogProvider, RegisterTableRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
-use datafusion::logical_plan::DFSchema;
+use datafusion::common::{DFSchema, ToDFSchema};
use datatypes::schema::Schema;
use table::requests::CreateTableRequest;
use table::test_util::{EmptyTable, MockTableEngine};
@@ -564,7 +577,9 @@ mod test {
})
.await
.unwrap();
-let adapter = Arc::new(DfTableProviderAdapter::new(table_ref));
+let adapter = Arc::new(DefaultTableSource::new(Arc::new(
+DfTableProviderAdapter::new(table_ref),
+)));

let projection = vec![1, 3, 5];
let df_schema = adapter.schema().to_dfschema().unwrap();
@@ -584,7 +599,7 @@
projection: Some(projection),
projected_schema,
filters: vec![],
-limit: None,
+fetch: None,
});

logical_plan_round_trip(table_scan_plan, catalog_manager).await;
4 changes: 3 additions & 1 deletion src/mito/src/engine.rs
@@ -524,7 +524,9 @@ mod tests {
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, SchemaBuilder};
use datatypes::value::Value;
-use datatypes::vectors::TimestampMillisecondVector;
+use datatypes::vectors::{
+Float64Vector, Int32Vector, StringVector, TimestampMillisecondVector, VectorRef,
+};
use log_store::fs::noop::NoopLogStore;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
13 changes: 8 additions & 5 deletions src/sql/src/statements.rs
@@ -21,12 +21,12 @@ pub mod insert;
pub mod query;
pub mod show;
pub mod statement;
-
use std::str::FromStr;

use api::helper::ColumnDataTypeWrapper;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_time::Timestamp;
+use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
use datatypes::types::DateTimeType;
@@ -79,7 +79,7 @@ fn parse_string_to_value(
data_type: &ConcreteDataType,
) -> Result<Value> {
ensure!(
-data_type.stringifiable(),
+data_type.is_stringifiable(),
ColumnTypeMismatchSnafu {
column_name,
expect: data_type.clone(),
@@ -112,8 +112,8 @@ fn parse_string_to_value(
ConcreteDataType::Timestamp(t) => {
if let Ok(ts) = Timestamp::from_str(&s) {
Ok(Value::Timestamp(Timestamp::new(
-ts.convert_to(t.unit),
-t.unit,
+ts.convert_to(t.unit()),
+t.unit(),
)))
} else {
ParseSqlValueSnafu {
@@ -301,7 +301,10 @@ pub fn sql_data_type_to_concrete_data_type(data_type: &SqlDataType) -> Result<ConcreteDataType>
SqlDataType::Date => Ok(ConcreteDataType::date_datatype()),
SqlDataType::Custom(obj_name) => match &obj_name.0[..] {
[type_name] => {
-if type_name.value.eq_ignore_ascii_case(DateTimeType::name()) {
+if type_name
+.value
+.eq_ignore_ascii_case(DateTimeType::default().name())
+{
Ok(ConcreteDataType::datetime_datatype())
} else {
error::SqlTypeNotSupportedSnafu {
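The changes in this file follow GreptimeDB's own datatypes crate: `stringifiable` was renamed to `is_stringifiable`, and metadata such as `unit` and `name` moved from fields and associated functions to instance methods on the `DataType` trait (hence the new `datatypes::data_type::DataType` import and the `DateTimeType::default()` receiver). A tiny sketch of the name check, assuming that crate's API exactly as used in the diff; the helper name is ours:

```rust
use datatypes::data_type::DataType; // trait that brings name() into scope
use datatypes::types::DateTimeType;

// True when a custom SQL type identifier spells the datetime type name,
// compared case-insensitively; name() now needs a value as its receiver.
fn is_datetime_ident(ident: &str) -> bool {
    ident.eq_ignore_ascii_case(DateTimeType::default().name())
}
```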
17 changes: 16 additions & 1 deletion tests/runner/src/util.rs
@@ -98,11 +98,26 @@ pub fn values_to_string(data_type: ColumnDataType, values: Values) -> Vec<String>
.into_iter()
.map(|v| v.to_string())
.collect(),
-ColumnDataType::Timestamp => values
+ColumnDataType::TimestampSecond => values
+.ts_second_values
+.into_iter()
+.map(|v| v.to_string())
+.collect(),
+ColumnDataType::TimestampMillisecond => values
.ts_millisecond_values
.into_iter()
.map(|v| v.to_string())
.collect(),
+ColumnDataType::TimestampMicrosecond => values
+.ts_microsecond_values
+.into_iter()
+.map(|v| v.to_string())
+.collect(),
+ColumnDataType::TimestampNanosecond => values
+.ts_nanosecond_values
+.into_iter()
+.map(|v| v.to_string())
+.collect(),
}
}
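The runner change mirrors a GreptimeDB protocol update: the single `ColumnDataType::Timestamp` variant became four unit-specific variants, each backed by its own repeated field on `Values`. A hedged sketch of the producer side, assuming the prost-generated types imported elsewhere in this PR (`client::api::v1`); field and variant names are taken from the diff:

```rust
use client::api::v1::column::Values;
use client::api::v1::ColumnDataType;

// Pack millisecond timestamps into the matching repeated field; the other
// ts_*_values fields stay empty via Default.
fn millisecond_column(ts: Vec<i64>) -> (ColumnDataType, Values) {
    let values = Values {
        ts_millisecond_values: ts,
        ..Default::default()
    };
    (ColumnDataType::TimestampMillisecond, values)
}
```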
