From 0b05aa279004e469567afb3ab076bf0e3c3b1faf Mon Sep 17 00:00:00 2001 From: Brent Gardner Date: Mon, 1 Aug 2022 11:07:05 -0600 Subject: [PATCH] Upgrade arrow fix decimal (#4) Fix human error Patch crates io to fix build (#5) * fix decimal * patch crate versions Patch objectstore Test in CI Undo override? Fix more errors Fix last error? Formatting Clippy Fixes Fix refs Able to get session context, but JDBC driver hung Upgrade to arrow 20 Upgrade to RC2 Formatting Fix some imports Install protoc Try platform agnostic path Debug in CI :( Debug in CI :( Debug in CI :( Not worth it, just separate builds Variables Fixes Fix windows? Fix windows? Hackily fix windows Down to 1 failure Fix protoc All? tests pass Formatting --- .github/workflows/rust.yml | 76 +++++++- benchmarks/Cargo.toml | 2 +- datafusion-cli/Cargo.toml | 4 +- datafusion-examples/Cargo.toml | 8 +- datafusion/common/Cargo.toml | 7 +- datafusion/common/src/scalar.rs | 22 +-- datafusion/core/Cargo.toml | 22 +-- datafusion/core/fuzz-utils/Cargo.toml | 2 +- .../src/avro_to_arrow/arrow_array_reader.rs | 10 +- datafusion/core/src/avro_to_arrow/schema.rs | 4 +- .../core/src/catalog/information_schema.rs | 2 +- .../core/src/datasource/file_format/json.rs | 9 +- .../src/datasource/file_format/parquet.rs | 27 ++- .../core/src/physical_optimizer/pruning.rs | 8 +- .../file_format/chunked_store.rs | 18 +- .../src/physical_plan/file_format/json.rs | 3 +- .../src/physical_plan/file_format/parquet.rs | 25 +-- .../core/src/physical_plan/hash_join.rs | 4 +- .../core/src/physical_plan/hash_utils.rs | 2 +- .../core/src/physical_plan/repartition.rs | 2 +- .../core/src/physical_plan/sort_merge_join.rs | 4 +- datafusion/core/src/scheduler/plan.rs | 2 +- datafusion/core/src/scheduler/task.rs | 2 +- datafusion/core/src/test_util.rs | 5 +- datafusion/core/tests/parquet_pruning.rs | 4 +- datafusion/core/tests/path_partition.rs | 20 ++- datafusion/core/tests/sql/aggregates.rs | 8 +- datafusion/core/tests/sql/decimal.rs | 168 +++++++++--------- datafusion/core/tests/sql/joins.rs | 32 ++-- datafusion/core/tests/sql/mod.rs | 4 +- datafusion/expr/Cargo.toml | 4 +- datafusion/expr/src/aggregate_function.rs | 48 ++--- datafusion/expr/src/binary_rule.rs | 112 ++++++------ datafusion/expr/src/type_coercion.rs | 2 +- datafusion/expr/src/utils.rs | 2 +- datafusion/jit/Cargo.toml | 6 +- datafusion/optimizer/Cargo.toml | 8 +- .../src/decorrelate_scalar_subquery.rs | 6 +- .../optimizer/src/decorrelate_where_exists.rs | 3 +- .../optimizer/src/decorrelate_where_in.rs | 5 +- .../optimizer/src/simplify_expressions.rs | 12 +- datafusion/physical-expr/Cargo.toml | 8 +- .../physical-expr/src/aggregate/average.rs | 14 +- .../physical-expr/src/aggregate/build_in.rs | 37 ++-- .../physical-expr/src/aggregate/min_max.rs | 26 +-- datafusion/physical-expr/src/aggregate/sum.rs | 20 +-- .../src/aggregate/sum_distinct.rs | 4 +- .../physical-expr/src/expressions/binary.rs | 18 +- .../src/expressions/binary/adapter.rs | 2 +- .../src/expressions/binary/kernels_arrow.rs | 8 +- .../physical-expr/src/expressions/cast.rs | 34 ++-- .../physical-expr/src/expressions/in_list.rs | 56 +++++- .../physical-expr/src/expressions/try_cast.rs | 34 ++-- datafusion/physical-expr/src/type_coercion.rs | 2 +- datafusion/proto/Cargo.toml | 12 +- datafusion/proto/src/from_proto.rs | 4 +- datafusion/proto/src/lib.rs | 10 +- datafusion/proto/src/to_proto.rs | 4 +- datafusion/row/Cargo.toml | 6 +- datafusion/row/src/layout.rs | 2 +- datafusion/row/src/lib.rs | 2 +- datafusion/sql/Cargo.toml | 6 +- datafusion/sql/examples/sql.rs | 4 +- datafusion/sql/src/planner.rs | 18 +- datafusion/sql/src/utils.rs | 8 +- 65 files changed, 607 insertions(+), 446 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 3a15759e1eaad..6d08b98a5545d 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -82,6 +82,16 @@ jobs: - uses: actions/checkout@v2 with: submodules: true + - name: Install protobuf compiler + shell: bash + run: | + mkdir -p $HOME/d/protoc + cd $HOME/d/protoc + export PROTO_ZIP="protoc-21.4-linux-x86_64.zip" + curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP + unzip $PROTO_ZIP + export PATH=$PATH:$HOME/d/protoc/bin + protoc --version - name: Cache Cargo uses: actions/cache@v3 with: @@ -94,6 +104,7 @@ jobs: rust-version: ${{ matrix.rust }} - name: Run tests run: | + export PATH=$PATH:$HOME/d/protoc/bin cargo test --features avro,jit,scheduler,json # test datafusion-sql examples cargo run --example sql @@ -159,17 +170,65 @@ jobs: POSTGRES_USER: postgres POSTGRES_PASSWORD: postgres - windows-and-macos: - name: Test on ${{ matrix.os }} Rust ${{ matrix.rust }} + windows: + name: Test on Windows Rust ${{ matrix.rust }} runs-on: ${{ matrix.os }} strategy: matrix: - os: [windows-latest, macos-latest] + os: [windows-latest] rust: [stable] steps: - uses: actions/checkout@v2 with: submodules: true + - name: Install protobuf compiler + shell: bash + run: | + mkdir -p $HOME/d/protoc + cd $HOME/d/protoc + export PROTO_ZIP="protoc-21.4-win64.zip" + curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP + unzip $PROTO_ZIP + export PATH=$PATH:$HOME/d/protoc/bin + protoc.exe --version + # TODO: this won't cache anything, which is expensive. Setup this action + # with a OS-dependent path. + - name: Setup Rust toolchain + run: | + rustup toolchain install ${{ matrix.rust }} + rustup default ${{ matrix.rust }} + rustup component add rustfmt + - name: Run tests + shell: bash + run: | + export PATH=$PATH:$HOME/d/protoc/bin + cargo test + env: + # do not produce debug symbols to keep memory usage down + RUSTFLAGS: "-C debuginfo=0" + + macos: + name: Test on MacOS Rust ${{ matrix.rust }} + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [macos-latest] + rust: [stable] + steps: + - uses: actions/checkout@v2 + with: + submodules: true + - name: Install protobuf compiler + shell: bash + run: | + mkdir -p $HOME/d/protoc + cd $HOME/d/protoc + export PROTO_ZIP="protoc-21.4-osx-x86_64.zip" + curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP + unzip $PROTO_ZIP + echo "$HOME/d/protoc/bin" >> $GITHUB_PATH + export PATH=$PATH:$HOME/d/protoc/bin + protoc --version # TODO: this won't cache anything, which is expensive. Setup this action # with a OS-dependent path. - name: Setup Rust toolchain @@ -250,6 +309,16 @@ jobs: - uses: actions/checkout@v2 with: submodules: true + - name: Install protobuf compiler + shell: bash + run: | + mkdir -p $HOME/d/protoc + cd $HOME/d/protoc + export PROTO_ZIP="protoc-21.4-linux-x86_64.zip" + curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP + unzip $PROTO_ZIP + export PATH=$PATH:$HOME/d/protoc/bin + protoc --version - name: Setup Rust toolchain run: | rustup toolchain install ${{ matrix.rust }} @@ -263,6 +332,7 @@ jobs: key: cargo-coverage-cache3- - name: Run coverage run: | + export PATH=$PATH:$HOME/d/protoc/bin rustup toolchain install stable rustup default stable cargo install --version 0.20.1 cargo-tarpaulin diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 3dd9bcbde04c4..854e1b46b18c1 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -32,7 +32,7 @@ simd = ["datafusion/simd"] snmalloc = ["snmalloc-rs"] [dependencies] -datafusion = { path = "../datafusion/core" } +datafusion = { path = "../datafusion/core", features = [], optional = false } env_logger = "0.9" futures = "0.3" mimalloc = { version = "0.1", optional = true, default-features = false } diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 6ac3a30c8249d..097a7519a1a79 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -29,9 +29,9 @@ rust-version = "1.59" readme = "README.md" [dependencies] -arrow = { version = "19.0.0" } +arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "30c94dbf1c422f81f8520b9956e96ab7b53c3f47", features = [], optional = false } clap = { version = "3", features = ["derive", "cargo"] } -datafusion = { path = "../datafusion/core", version = "10.0.0" } +datafusion = { path = "../datafusion/core", features = [], optional = false } dirs = "4.0.0" env_logger = "0.9" mimalloc = { version = "0.1", default-features = false } diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index ed23512f3e2d3..cccd0130a7075 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -34,13 +34,13 @@ path = "examples/avro_sql.rs" required-features = ["datafusion/avro"] [dev-dependencies] -arrow-flight = { version = "19.0.0" } +arrow-flight = { git = "https://github.com/apache/arrow-rs.git", rev = "30c94dbf1c422f81f8520b9956e96ab7b53c3f47", features = [], optional = false } async-trait = "0.1.41" -datafusion = { path = "../datafusion/core" } +datafusion = { path = "../datafusion/core", features = [], optional = false } futures = "0.3" num_cpus = "1.13.0" -prost = "0.10" +prost = "0.11.0" serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.82" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] } -tonic = "0.7" +tonic = "0.8" diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index 33d6af087d4cb..036267c61833b 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -39,11 +39,12 @@ pyarrow = ["pyo3"] [dependencies] apache-avro = { version = "0.14", features = ["snappy"], optional = true } -arrow = { version = "19.0.0", features = ["prettyprint"] } +arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "30c94dbf1c422f81f8520b9956e96ab7b53c3f47", features = ["prettyprint"], optional = false } +avro-rs = { version = "0.13", features = ["snappy"], optional = true } cranelift-module = { version = "0.86.1", optional = true } -object_store = { version = "0.3", optional = true } +object_store = { git = "https://github.com/apache/arrow-rs.git", rev = "30c94dbf1c422f81f8520b9956e96ab7b53c3f47", features = [], optional = true } ordered-float = "3.0" -parquet = { version = "19.0.0", features = ["arrow"], optional = true } +parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "30c94dbf1c422f81f8520b9956e96ab7b53c3f47", features = ["arrow"], optional = true } pyo3 = { version = "0.16", optional = true } serde_json = "1.0" sqlparser = "0.19" diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index 5b45b4b06967f..3069a54f491f7 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -27,7 +27,7 @@ use arrow::{ IntervalMonthDayNanoType, IntervalUnit, IntervalYearMonthType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, - DECIMAL_MAX_PRECISION, + DECIMAL128_MAX_PRECISION, }, util::decimal::{BasicDecimal, Decimal128}, }; @@ -611,7 +611,7 @@ impl ScalarValue { scale: usize, ) -> Result { // make sure the precision and scale is valid - if precision <= DECIMAL_MAX_PRECISION && scale <= precision { + if precision <= DECIMAL128_MAX_PRECISION && scale <= precision { return Ok(ScalarValue::Decimal128(Some(value), precision, scale)); } Err(DataFusionError::Internal(format!( @@ -654,7 +654,7 @@ impl ScalarValue { ScalarValue::Int32(_) => DataType::Int32, ScalarValue::Int64(_) => DataType::Int64, ScalarValue::Decimal128(_, precision, scale) => { - DataType::Decimal(*precision, *scale) + DataType::Decimal128(*precision, *scale) } ScalarValue::TimestampSecond(_, tz_opt) => { DataType::Timestamp(TimeUnit::Second, tz_opt.clone()) @@ -935,7 +935,7 @@ impl ScalarValue { } let array: ArrayRef = match &data_type { - DataType::Decimal(precision, scale) => { + DataType::Decimal128(precision, scale) => { let decimal_array = ScalarValue::iter_to_decimal_array(scalars, precision, scale)?; Arc::new(decimal_array) @@ -1448,7 +1448,7 @@ impl ScalarValue { Ok(match array.data_type() { DataType::Null => ScalarValue::Null, - DataType::Decimal(precision, scale) => { + DataType::Decimal128(precision, scale) => { ScalarValue::get_decimal_value_from_array(array, index, precision, scale) } DataType::Boolean => typed_cast!(array, index, BooleanArray, Boolean), @@ -1899,7 +1899,7 @@ impl TryFrom<&DataType> for ScalarValue { DataType::UInt16 => ScalarValue::UInt16(None), DataType::UInt32 => ScalarValue::UInt32(None), DataType::UInt64 => ScalarValue::UInt64(None), - DataType::Decimal(precision, scale) => { + DataType::Decimal128(precision, scale) => { ScalarValue::Decimal128(None, *precision, *scale) } DataType::Utf8 => ScalarValue::Utf8(None), @@ -2145,7 +2145,7 @@ mod tests { #[test] fn scalar_decimal_test() { let decimal_value = ScalarValue::Decimal128(Some(123), 10, 1); - assert_eq!(DataType::Decimal(10, 1), decimal_value.get_datatype()); + assert_eq!(DataType::Decimal128(10, 1), decimal_value.get_datatype()); let try_into_value: i128 = decimal_value.clone().try_into().unwrap(); assert_eq!(123_i128, try_into_value); assert!(!decimal_value.is_null()); @@ -2163,14 +2163,14 @@ mod tests { let array = decimal_value.to_array(); let array = array.as_any().downcast_ref::().unwrap(); assert_eq!(1, array.len()); - assert_eq!(DataType::Decimal(10, 1), array.data_type().clone()); + assert_eq!(DataType::Decimal128(10, 1), array.data_type().clone()); assert_eq!(123i128, array.value(0).as_i128()); // decimal scalar to array with size let array = decimal_value.to_array_of_size(10); let array_decimal = array.as_any().downcast_ref::().unwrap(); assert_eq!(10, array.len()); - assert_eq!(DataType::Decimal(10, 1), array.data_type().clone()); + assert_eq!(DataType::Decimal128(10, 1), array.data_type().clone()); assert_eq!(123i128, array_decimal.value(0).as_i128()); assert_eq!(123i128, array_decimal.value(9).as_i128()); // test eq array @@ -2208,7 +2208,7 @@ mod tests { // convert the vec to decimal array and check the result let array = ScalarValue::iter_to_array(decimal_vec.into_iter()).unwrap(); assert_eq!(3, array.len()); - assert_eq!(DataType::Decimal(10, 2), array.data_type().clone()); + assert_eq!(DataType::Decimal128(10, 2), array.data_type().clone()); let decimal_vec = vec![ ScalarValue::Decimal128(Some(1), 10, 2), @@ -2218,7 +2218,7 @@ mod tests { ]; let array = ScalarValue::iter_to_array(decimal_vec.into_iter()).unwrap(); assert_eq!(4, array.len()); - assert_eq!(DataType::Decimal(10, 2), array.data_type().clone()); + assert_eq!(DataType::Decimal128(10, 2), array.data_type().clone()); assert!(ScalarValue::try_new_decimal128(1, 10, 2) .unwrap() diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 073b951ffb82c..6cc13f161a9cf 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -56,17 +56,17 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion [dependencies] ahash = { version = "0.7", default-features = false } apache-avro = { version = "0.14", optional = true } -arrow = { version = "19.0.0", features = ["prettyprint"] } +arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "30c94dbf1c422f81f8520b9956e96ab7b53c3f47", features = ["prettyprint"], optional = false } async-trait = "0.1.41" bytes = "1.1" chrono = { version = "0.4", default-features = false } -datafusion-common = { path = "../common", version = "10.0.0", features = ["parquet", "object_store"] } -datafusion-expr = { path = "../expr", version = "10.0.0" } -datafusion-jit = { path = "../jit", version = "10.0.0", optional = true } -datafusion-optimizer = { path = "../optimizer", version = "10.0.0" } -datafusion-physical-expr = { path = "../physical-expr", version = "10.0.0" } -datafusion-row = { path = "../row", version = "10.0.0" } -datafusion-sql = { path = "../sql", version = "10.0.0" } +datafusion-common = { path = "../common", features = ["parquet", "object_store"], optional = false } +datafusion-expr = { path = "../expr", features = [], optional = false } +datafusion-jit = { path = "../jit", features = [], optional = true } +datafusion-optimizer = { path = "../optimizer", features = [], optional = false } +datafusion-physical-expr = { path = "../physical-expr", features = [], optional = false } +datafusion-row = { path = "../row", features = [], optional = false } +datafusion-sql = { path = "../sql", features = [], optional = false } futures = "0.3" glob = "0.3.0" hashbrown = { version = "0.12", features = ["raw"] } @@ -75,10 +75,10 @@ lazy_static = { version = "^1.4.0" } log = "^0.4" num-traits = { version = "0.2", optional = true } num_cpus = "1.13.0" -object_store = "0.3.0" +object_store = { git = "https://github.com/apache/arrow-rs.git", rev = "30c94dbf1c422f81f8520b9956e96ab7b53c3f47", features = [], optional = false } ordered-float = "3.0" parking_lot = "0.12" -parquet = { version = "19.0.0", features = ["arrow", "async"] } +parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "30c94dbf1c422f81f8520b9956e96ab7b53c3f47", features = ["arrow", "async"], optional = false } paste = "^1.0" pin-project-lite = "^0.2.7" pyo3 = { version = "0.16", optional = true } @@ -98,7 +98,7 @@ csv = "1.1.6" ctor = "0.1.22" doc-comment = "0.3" env_logger = "0.9" -fuzz-utils = { path = "fuzz-utils" } +fuzz-utils = { path = "fuzz-utils", features = [], optional = false } [[bench]] harness = false diff --git a/datafusion/core/fuzz-utils/Cargo.toml b/datafusion/core/fuzz-utils/Cargo.toml index 0d66a69999997..859c1c33fc934 100644 --- a/datafusion/core/fuzz-utils/Cargo.toml +++ b/datafusion/core/fuzz-utils/Cargo.toml @@ -23,6 +23,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -arrow = { version = "19.0.0", features = ["prettyprint"] } +arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "30c94dbf1c422f81f8520b9956e96ab7b53c3f47", features = ["prettyprint"], optional = false } env_logger = "0.9.0" rand = "0.8" diff --git a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs index 864a675ed40f1..2da8066b1c877 100644 --- a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs +++ b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs @@ -101,12 +101,10 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { "Failed to parse avro value: {:?}", e ))), - other => { - return Err(ArrowError::ParseError(format!( - "Row needs to be of type object, got: {:?}", - other - ))) - } + other => Err(ArrowError::ParseError(format!( + "Row needs to be of type object, got: {:?}", + other + ))), }) .collect::>>>()?; if rows.is_empty() { diff --git a/datafusion/core/src/avro_to_arrow/schema.rs b/datafusion/core/src/avro_to_arrow/schema.rs index 5e601504d1f6c..d1964189610ed 100644 --- a/datafusion/core/src/avro_to_arrow/schema.rs +++ b/datafusion/core/src/avro_to_arrow/schema.rs @@ -141,7 +141,7 @@ fn schema_to_field_with_props( AvroSchema::Fixed { size, .. } => DataType::FixedSizeBinary(*size as i32), AvroSchema::Decimal { precision, scale, .. - } => DataType::Decimal(*precision, *scale), + } => DataType::Decimal128(*precision, *scale), AvroSchema::Uuid => DataType::FixedSizeBinary(16), AvroSchema::Date => DataType::Date32, AvroSchema::TimeMillis => DataType::Time32(TimeUnit::Millisecond), @@ -217,7 +217,7 @@ fn default_field_name(dt: &DataType) -> &str { DataType::Union(_, _, _) => "union", DataType::Dictionary(_, _) => "map", DataType::Map(_, _) => unimplemented!("Map support not implemented"), - DataType::Decimal(_, _) => "decimal", + DataType::Decimal128(_, _) => "decimal", DataType::Decimal256(_, _) => "decimal", } } diff --git a/datafusion/core/src/catalog/information_schema.rs b/datafusion/core/src/catalog/information_schema.rs index d4944c2d54440..49b1f9dc7952f 100644 --- a/datafusion/core/src/catalog/information_schema.rs +++ b/datafusion/core/src/catalog/information_schema.rs @@ -508,7 +508,7 @@ impl InformationSchemaColumnsBuilder { Float32 => (Some(24), Some(2), None), // Numbers from postgres `double` type Float64 => (Some(24), Some(2), None), - Decimal(precision, scale) => { + Decimal128(precision, scale) => { (Some(*precision as u64), Some(10), Some(*scale as u64)) } _ => (None, None, None), diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 9a889ab4c0f58..122e80b818937 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -133,6 +133,7 @@ mod tests { use arrow::array::Int64Array; use futures::StreamExt; use object_store::local::LocalFileSystem; + use object_store::path::Path; use super::*; use crate::physical_plan::collect; @@ -231,17 +232,21 @@ mod tests { projection: Option>, limit: Option, ) -> Result> { + let store_root = env!("CARGO_MANIFEST_DIR"); let filename = "tests/jsons/2.json"; let format = JsonFormat::default(); - scan_format(&format, ".", filename, projection, limit).await + scan_format(&format, store_root, filename, projection, limit).await } #[tokio::test] async fn infer_schema_with_limit() { let store = Arc::new(LocalFileSystem::new()) as _; - let filename = "tests/jsons/schema_infer_limit.json"; let format = JsonFormat::default().with_schema_infer_max_rec(Some(3)); + let store_root = std::path::Path::new(env!("CARGO_MANIFEST_DIR")); + let filename = store_root.join("tests/jsons/schema_infer_limit.json"); + let filename = filename.to_str().expect("Unable to get path!"); + let file_schema = format .infer_schema(&store, &[local_unpartitioned_file(filename)]) .await diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index dfd08352dffb8..b5bae663728f0 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -575,7 +575,8 @@ mod tests { use futures::StreamExt; use object_store::local::LocalFileSystem; use object_store::path::Path; - use object_store::{GetResult, ListResult}; + use object_store::{GetResult, ListResult, MultipartId}; + use tokio::io::AsyncWrite; #[tokio::test] async fn read_merged_batches() -> Result<()> { @@ -649,6 +650,22 @@ mod tests { Err(object_store::Error::NotImplemented) } + async fn put_multipart( + &self, + _location: &Path, + ) -> object_store::Result<(MultipartId, Box)> + { + Err(object_store::Error::NotImplemented) + } + + async fn abort_multipart( + &self, + _location: &Path, + _multipart_id: &MultipartId, + ) -> object_store::Result<()> { + Err(object_store::Error::NotImplemented) + } + async fn get(&self, _location: &Path) -> object_store::Result { Err(object_store::Error::NotImplemented) } @@ -1073,7 +1090,7 @@ mod tests { assert_eq!(1, batches.len()); assert_eq!(1, batches[0].num_columns()); let column = batches[0].column(0); - assert_eq!(&DataType::Decimal(4, 2), column.data_type()); + assert_eq!(&DataType::Decimal128(4, 2), column.data_type()); // parquet use the int64 as the physical type to store decimal let exec = get_exec("int64_decimal.parquet", None, None).await?; @@ -1081,7 +1098,7 @@ mod tests { assert_eq!(1, batches.len()); assert_eq!(1, batches[0].num_columns()); let column = batches[0].column(0); - assert_eq!(&DataType::Decimal(10, 2), column.data_type()); + assert_eq!(&DataType::Decimal128(10, 2), column.data_type()); // parquet use the fixed length binary as the physical type to store decimal let exec = get_exec("fixed_length_decimal.parquet", None, None).await?; @@ -1089,14 +1106,14 @@ mod tests { assert_eq!(1, batches.len()); assert_eq!(1, batches[0].num_columns()); let column = batches[0].column(0); - assert_eq!(&DataType::Decimal(25, 2), column.data_type()); + assert_eq!(&DataType::Decimal128(25, 2), column.data_type()); let exec = get_exec("fixed_length_decimal_legacy.parquet", None, None).await?; let batches = collect(exec, task_ctx.clone()).await?; assert_eq!(1, batches.len()); assert_eq!(1, batches[0].num_columns()); let column = batches[0].column(0); - assert_eq!(&DataType::Decimal(13, 2), column.data_type()); + assert_eq!(&DataType::Decimal128(13, 2), column.data_type()); // parquet use the fixed length binary as the physical type to store decimal // TODO: arrow-rs don't support convert the physical type of binary to decimal diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index b1242b15032f9..aa4d88979d78b 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -800,7 +800,7 @@ mod tests { use crate::from_slice::FromSlice; use crate::logical_plan::{col, lit}; use crate::{assert_batches_eq, physical_optimizer::pruning::StatisticsType}; - use arrow::array::Decimal128Array; + use arrow::array::{BasicDecimalArray, Decimal128Array}; use arrow::{ array::{BinaryArray, Int32Array, Int64Array, StringArray}, datatypes::{DataType, TimeUnit}, @@ -1457,7 +1457,7 @@ mod tests { // decimal(9,2) let schema = Arc::new(Schema::new(vec![Field::new( "s1", - DataType::Decimal(9, 2), + DataType::Decimal128(9, 2), true, )])); // s1 > 5 @@ -1479,7 +1479,7 @@ mod tests { // decimal(18,2) let schema = Arc::new(Schema::new(vec![Field::new( "s1", - DataType::Decimal(18, 2), + DataType::Decimal128(18, 2), true, )])); // s1 > 5 @@ -1501,7 +1501,7 @@ mod tests { // decimal(23,2) let schema = Arc::new(Schema::new(vec![Field::new( "s1", - DataType::Decimal(23, 2), + DataType::Decimal128(23, 2), true, )])); // s1 > 5 diff --git a/datafusion/core/src/physical_plan/file_format/chunked_store.rs b/datafusion/core/src/physical_plan/file_format/chunked_store.rs index 216926b067134..1a48804a2c553 100644 --- a/datafusion/core/src/physical_plan/file_format/chunked_store.rs +++ b/datafusion/core/src/physical_plan/file_format/chunked_store.rs @@ -20,11 +20,12 @@ use bytes::Bytes; use futures::stream::BoxStream; use futures::StreamExt; use object_store::path::Path; -use object_store::Result; use object_store::{GetResult, ListResult, ObjectMeta, ObjectStore}; +use object_store::{MultipartId, Result}; use std::fmt::{Debug, Display, Formatter}; use std::ops::Range; use std::sync::Arc; +use tokio::io::AsyncWrite; /// Wraps a [`ObjectStore`] and makes its get response return chunks /// @@ -53,6 +54,21 @@ impl ObjectStore for ChunkedStore { self.inner.put(location, bytes).await } + async fn put_multipart( + &self, + location: &Path, + ) -> Result<(MultipartId, Box)> { + self.inner.put_multipart(location).await + } + + async fn abort_multipart( + &self, + location: &Path, + multipart_id: &MultipartId, + ) -> Result<()> { + self.inner.abort_multipart(location, multipart_id).await + } + async fn get(&self, location: &Path) -> Result { let bytes = self.inner.get(location).await?.bytes().await?; let mut offset = 0; diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs index 6cc864312ded0..fa0a0b4adf8bc 100644 --- a/datafusion/core/src/physical_plan/file_format/json.rs +++ b/datafusion/core/src/physical_plan/file_format/json.rs @@ -257,7 +257,8 @@ mod tests { let store_url = ObjectStoreUrl::local_filesystem(); let store = ctx.runtime_env().object_store(&store_url).unwrap(); - let path = format!("{}/1.json", TEST_DATA_BASE); + let store_root = Path::new(env!("CARGO_MANIFEST_DIR")); + let path = store_root.join(TEST_DATA_BASE).join("1.json"); let meta = local_unpartitioned_file(path); let schema = JsonFormat::default() .infer_schema(&store, &[meta.clone()]) diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 184214d6875d9..ce332b201dfda 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -425,7 +425,7 @@ macro_rules! get_statistic { ParquetStatistics::Int32(s) => { match $target_arrow_type { // int32 to decimal with the precision and scale - Some(DataType::Decimal(precision, scale)) => { + Some(DataType::Decimal128(precision, scale)) => { Some(ScalarValue::Decimal128( Some(*s.$func() as i128), precision, @@ -438,7 +438,7 @@ macro_rules! get_statistic { ParquetStatistics::Int64(s) => { match $target_arrow_type { // int64 to decimal with the precision and scale - Some(DataType::Decimal(precision, scale)) => { + Some(DataType::Decimal128(precision, scale)) => { Some(ScalarValue::Decimal128( Some(*s.$func() as i128), precision, @@ -463,7 +463,7 @@ macro_rules! get_statistic { ParquetStatistics::FixedLenByteArray(s) => { match $target_arrow_type { // just support the decimal data type - Some(DataType::Decimal(precision, scale)) => { + Some(DataType::Decimal128(precision, scale)) => { Some(ScalarValue::Decimal128( Some(from_bytes_to_i128(s.$bytes_func())), precision, @@ -535,10 +535,10 @@ fn parquet_to_arrow_decimal_type(parquet_column: &ColumnDescriptor) -> Option { - Some(DataType::Decimal(precision as usize, scale as usize)) + Some(DataType::Decimal128(precision as usize, scale as usize)) } _ => match type_ptr.get_basic_info().converted_type() { - ConvertedType::DECIMAL => Some(DataType::Decimal( + ConvertedType::DECIMAL => Some(DataType::Decimal128( type_ptr.get_precision() as usize, type_ptr.get_scale() as usize, )), @@ -1239,7 +1239,8 @@ mod tests { async fn parquet_exec_with_error() -> Result<()> { let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); - let location = Path::from_filesystem_path(".") + let store_root = std::path::Path::new(env!("CARGO_MANIFEST_DIR")); + let location = Path::from_filesystem_path(store_root) .unwrap() .child("invalid.parquet"); @@ -1475,7 +1476,8 @@ mod tests { // INT32: c1 > 5, the c1 is decimal(9,2) let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2))); - let schema = Schema::new(vec![Field::new("c1", DataType::Decimal(9, 2), false)]); + let schema = + Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 2), false)]); let schema_descr = get_test_schema_descr(vec![( "c1", PhysicalType::INT32, @@ -1516,7 +1518,8 @@ mod tests { // INT32: c1 > 5, but parquet decimal type has different precision or scale to arrow decimal // The decimal of arrow is decimal(5,2), the decimal of parquet is decimal(9,0) let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 5, 2))); - let schema = Schema::new(vec![Field::new("c1", DataType::Decimal(5, 2), false)]); + let schema = + Schema::new(vec![Field::new("c1", DataType::Decimal128(5, 2), false)]); // The decimal of parquet is decimal(9,0) let schema_descr = get_test_schema_descr(vec![( "c1", @@ -1568,7 +1571,8 @@ mod tests { // INT64: c1 < 5, the c1 is decimal(18,2) let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 2))); - let schema = Schema::new(vec![Field::new("c1", DataType::Decimal(18, 2), false)]); + let schema = + Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]); let schema_descr = get_test_schema_descr(vec![( "c1", PhysicalType::INT64, @@ -1607,7 +1611,8 @@ mod tests { // FIXED_LENGTH_BYTE_ARRAY: c1 = 100, the c1 is decimal(28,2) // the type of parquet is decimal(18,2) let expr = col("c1").eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3))); - let schema = Schema::new(vec![Field::new("c1", DataType::Decimal(18, 3), false)]); + let schema = + Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 3), false)]); let schema_descr = get_test_schema_descr(vec![( "c1", PhysicalType::FIXED_LEN_BYTE_ARRAY, diff --git a/datafusion/core/src/physical_plan/hash_join.rs b/datafusion/core/src/physical_plan/hash_join.rs index 6540cc1301cf0..6111e3ae1dd67 100644 --- a/datafusion/core/src/physical_plan/hash_join.rs +++ b/datafusion/core/src/physical_plan/hash_join.rs @@ -1098,8 +1098,8 @@ fn equal_rows( DataType::LargeUtf8 => { equal_rows_elem!(LargeStringArray, l, r, left, right, null_equals_null) } - DataType::Decimal(_, lscale) => match r.data_type() { - DataType::Decimal(_, rscale) => { + DataType::Decimal128(_, lscale) => match r.data_type() { + DataType::Decimal128(_, rscale) => { if lscale == rscale { equal_rows_elem!( Decimal128Array, diff --git a/datafusion/core/src/physical_plan/hash_utils.rs b/datafusion/core/src/physical_plan/hash_utils.rs index b9c34ec9b401c..a89247d7e2001 100644 --- a/datafusion/core/src/physical_plan/hash_utils.rs +++ b/datafusion/core/src/physical_plan/hash_utils.rs @@ -336,7 +336,7 @@ pub fn create_hashes<'a>( DataType::Null => { hash_null(random_state, hashes_buffer, multi_col); } - DataType::Decimal(_, _) => { + DataType::Decimal128(_, _) => { hash_decimal128(col, random_state, hashes_buffer, multi_col); } DataType::UInt8 => { diff --git a/datafusion/core/src/physical_plan/repartition.rs b/datafusion/core/src/physical_plan/repartition.rs index 552e1820a7390..f9024797b3884 100644 --- a/datafusion/core/src/physical_plan/repartition.rs +++ b/datafusion/core/src/physical_plan/repartition.rs @@ -933,7 +933,7 @@ mod tests { let items_set: HashSet<&str> = items_vec.iter().copied().collect(); assert_eq!(items_vec.len(), items_set.len()); let source_str_set: HashSet<&str> = - (&["foo", "bar", "frob", "baz", "goo", "gar", "grob", "gaz"]) + ["foo", "bar", "frob", "baz", "goo", "gar", "grob", "gaz"] .iter() .copied() .collect(); diff --git a/datafusion/core/src/physical_plan/sort_merge_join.rs b/datafusion/core/src/physical_plan/sort_merge_join.rs index 4eb1616c9a6d7..da5f9c649f0b1 100644 --- a/datafusion/core/src/physical_plan/sort_merge_join.rs +++ b/datafusion/core/src/physical_plan/sort_merge_join.rs @@ -1098,7 +1098,7 @@ fn compare_join_arrays( DataType::Float64 => compare_value!(Float64Array), DataType::Utf8 => compare_value!(StringArray), DataType::LargeUtf8 => compare_value!(LargeStringArray), - DataType::Decimal(..) => compare_value!(Decimal128Array), + DataType::Decimal128(..) => compare_value!(Decimal128Array), DataType::Timestamp(time_unit, None) => match time_unit { TimeUnit::Second => compare_value!(TimestampSecondArray), TimeUnit::Millisecond => compare_value!(TimestampMillisecondArray), @@ -1164,7 +1164,7 @@ fn is_join_arrays_equal( DataType::Float64 => compare_value!(Float64Array), DataType::Utf8 => compare_value!(StringArray), DataType::LargeUtf8 => compare_value!(LargeStringArray), - DataType::Decimal(..) => compare_value!(Decimal128Array), + DataType::Decimal128(..) => compare_value!(Decimal128Array), DataType::Timestamp(time_unit, None) => match time_unit { TimeUnit::Second => compare_value!(TimestampSecondArray), TimeUnit::Millisecond => compare_value!(TimestampMillisecondArray), diff --git a/datafusion/core/src/scheduler/plan.rs b/datafusion/core/src/scheduler/plan.rs index e7d5e1d331769..b5a786a322568 100644 --- a/datafusion/core/src/scheduler/plan.rs +++ b/datafusion/core/src/scheduler/plan.rs @@ -29,7 +29,7 @@ use crate::scheduler::pipeline::{ }; /// Identifies the [`Pipeline`] within the [`PipelinePlan`] to route output to -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct OutputLink { /// The index of the [`Pipeline`] in [`PipelinePlan`] to route output to pub pipeline: usize, diff --git a/datafusion/core/src/scheduler/task.rs b/datafusion/core/src/scheduler/task.rs index e90b2f07a012d..b723a37ce7e86 100644 --- a/datafusion/core/src/scheduler/task.rs +++ b/datafusion/core/src/scheduler/task.rs @@ -137,7 +137,7 @@ impl Task { let partition = self.waker.partition; let waker = futures::task::waker_ref(&self.waker); - let mut cx = Context::from_waker(&*waker); + let mut cx = Context::from_waker(&waker); let pipelines = &self.context.pipelines; let routable = &pipelines[node]; diff --git a/datafusion/core/src/test_util.rs b/datafusion/core/src/test_util.rs index ee53a424c82ed..dd48815d752f8 100644 --- a/datafusion/core/src/test_util.rs +++ b/datafusion/core/src/test_util.rs @@ -223,6 +223,7 @@ fn get_data_dir(udf_env: &str, submodule_data: &str) -> Result) -> RecordBatch { fn make_decimal_batch(v: Vec, precision: usize, scale: usize) -> RecordBatch { let schema = Arc::new(Schema::new(vec![Field::new( "decimal_col", - DataType::Decimal(precision, scale), + DataType::Decimal128(precision, scale), true, )])); let array = Arc::new( diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs index 821d174f2d992..fca9b9a43b1c4 100644 --- a/datafusion/core/tests/path_partition.rs +++ b/datafusion/core/tests/path_partition.rs @@ -40,7 +40,10 @@ use datafusion::{ use datafusion_common::ScalarValue; use futures::stream::BoxStream; use futures::{stream, StreamExt}; -use object_store::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore}; +use object_store::{ + path::Path, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, +}; +use tokio::io::AsyncWrite; #[tokio::test] async fn parquet_distinct_partition_col() -> Result<()> { @@ -516,6 +519,21 @@ impl ObjectStore for MirroringObjectStore { unimplemented!() } + async fn put_multipart( + &self, + _location: &Path, + ) -> object_store::Result<(MultipartId, Box)> { + unimplemented!() + } + + async fn abort_multipart( + &self, + _location: &Path, + _multipart_id: &MultipartId, + ) -> object_store::Result<()> { + unimplemented!() + } + async fn get(&self, location: &Path) -> object_store::Result { self.files.iter().find(|x| *x == location.as_ref()).unwrap(); let path = std::path::PathBuf::from(&self.mirrored_file); diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs index eb0e07f84291e..76918bcb0f30d 100644 --- a/datafusion/core/tests/sql/aggregates.rs +++ b/datafusion/core/tests/sql/aggregates.rs @@ -1472,7 +1472,7 @@ async fn aggregate_decimal_min() -> Result<()> { "+-----------------+", ]; assert_eq!( - &DataType::Decimal(10, 3), + &DataType::Decimal128(10, 3), result[0].schema().field(0).data_type() ); assert_batches_sorted_eq!(expected, &result); @@ -1496,7 +1496,7 @@ async fn aggregate_decimal_max() -> Result<()> { "+-----------------+", ]; assert_eq!( - &DataType::Decimal(10, 3), + &DataType::Decimal128(10, 3), result[0].schema().field(0).data_type() ); assert_batches_sorted_eq!(expected, &result); @@ -1519,7 +1519,7 @@ async fn aggregate_decimal_sum() -> Result<()> { "+-----------------+", ]; assert_eq!( - &DataType::Decimal(20, 3), + &DataType::Decimal128(20, 3), result[0].schema().field(0).data_type() ); assert_batches_sorted_eq!(expected, &result); @@ -1542,7 +1542,7 @@ async fn aggregate_decimal_avg() -> Result<()> { "+-----------------+", ]; assert_eq!( - &DataType::Decimal(14, 7), + &DataType::Decimal128(14, 7), result[0].schema().field(0).data_type() ); assert_batches_sorted_eq!(expected, &result); diff --git a/datafusion/core/tests/sql/decimal.rs b/datafusion/core/tests/sql/decimal.rs index c8c242155257e..9b16ca53462bc 100644 --- a/datafusion/core/tests/sql/decimal.rs +++ b/datafusion/core/tests/sql/decimal.rs @@ -23,45 +23,45 @@ async fn decimal_cast() -> Result<()> { let sql = "select cast(1.23 as decimal(10,4))"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 4), + &DataType::Decimal128(10, 4), actual[0].schema().field(0).data_type() ); let expected = vec![ - "+---------------------------------------+", - "| CAST(Float64(1.23) AS Decimal(10, 4)) |", - "+---------------------------------------+", - "| 1.2300 |", - "+---------------------------------------+", + "+------------------------------------------+", + "| CAST(Float64(1.23) AS Decimal128(10, 4)) |", + "+------------------------------------------+", + "| 1.2300 |", + "+------------------------------------------+", ]; assert_batches_eq!(expected, &actual); let sql = "select cast(cast(1.23 as decimal(10,3)) as decimal(10,4))"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 4), + &DataType::Decimal128(10, 4), actual[0].schema().field(0).data_type() ); let expected = vec![ - "+---------------------------------------------------------------+", - "| CAST(CAST(Float64(1.23) AS Decimal(10, 3)) AS Decimal(10, 4)) |", - "+---------------------------------------------------------------+", - "| 1.2300 |", - "+---------------------------------------------------------------+", + "+---------------------------------------------------------------------+", + "| CAST(CAST(Float64(1.23) AS Decimal128(10, 3)) AS Decimal128(10, 4)) |", + "+---------------------------------------------------------------------+", + "| 1.2300 |", + "+---------------------------------------------------------------------+", ]; assert_batches_eq!(expected, &actual); let sql = "select cast(1.2345 as decimal(24,2))"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(24, 2), + &DataType::Decimal128(24, 2), actual[0].schema().field(0).data_type() ); let expected = vec![ - "+-----------------------------------------+", - "| CAST(Float64(1.2345) AS Decimal(24, 2)) |", - "+-----------------------------------------+", - "| 1.23 |", - "+-----------------------------------------+", + "+--------------------------------------------+", + "| CAST(Float64(1.2345) AS Decimal128(24, 2)) |", + "+--------------------------------------------+", + "| 1.23 |", + "+--------------------------------------------+", ]; assert_batches_eq!(expected, &actual); @@ -75,7 +75,7 @@ async fn decimal_by_sql() -> Result<()> { let sql = "SELECT c1 from decimal_simple"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -110,7 +110,7 @@ async fn decimal_by_filter() -> Result<()> { let sql = "select c1 from decimal_simple where c1 > 0.000030"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -133,11 +133,11 @@ async fn decimal_by_filter() -> Result<()> { let sql = "select * from decimal_simple where c1 > c5"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); assert_eq!( - &DataType::Decimal(12, 7), + &DataType::Decimal128(12, 7), actual[0].schema().field(4).data_type() ); let expected = vec![ @@ -161,7 +161,7 @@ async fn decimal_agg_function() -> Result<()> { let sql = "select min(c1) from decimal_simple where c4=false"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -176,7 +176,7 @@ async fn decimal_agg_function() -> Result<()> { let sql = "select max(c1) from decimal_simple where c4=false"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -193,7 +193,7 @@ async fn decimal_agg_function() -> Result<()> { let actual = execute_to_batches(&ctx, sql).await; // inferred precision is 10+10 assert_eq!( - &DataType::Decimal(20, 6), + &DataType::Decimal128(20, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -211,7 +211,7 @@ async fn decimal_agg_function() -> Result<()> { let sql = "select avg(c1) from decimal_simple"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(14, 10), + &DataType::Decimal128(14, 10), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -234,7 +234,7 @@ async fn decimal_logic_op() -> Result<()> { let sql = "select * from decimal_simple where c1=CAST(0.00002 as Decimal(10,8))"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -274,7 +274,7 @@ async fn decimal_logic_op() -> Result<()> { let sql = "select * from decimal_simple where 0.00002 > c1"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -290,7 +290,7 @@ async fn decimal_logic_op() -> Result<()> { let sql = "select * from decimal_simple where c1 <= 0.00002"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -308,7 +308,7 @@ async fn decimal_logic_op() -> Result<()> { let sql = "select * from decimal_simple where c1 > 0.00002"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -335,7 +335,7 @@ async fn decimal_logic_op() -> Result<()> { let sql = "select * from decimal_simple where c1 >= 0.00002"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -372,7 +372,7 @@ async fn decimal_arithmetic_op() -> Result<()> { let actual = execute_to_batches(&ctx, sql).await; // array decimal(10,6) + scalar decimal(20,0) => decimal(21,6) assert_eq!( - &DataType::Decimal(27, 6), + &DataType::Decimal128(27, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -401,7 +401,7 @@ async fn decimal_arithmetic_op() -> Result<()> { let sql = "select c1+c5 from decimal_simple"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(13, 7), + &DataType::Decimal128(13, 7), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -430,7 +430,7 @@ async fn decimal_arithmetic_op() -> Result<()> { let sql = "select c1-1 from decimal_simple"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(27, 6), + &DataType::Decimal128(27, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -459,7 +459,7 @@ async fn decimal_arithmetic_op() -> Result<()> { let sql = "select c1-c5 from decimal_simple"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(13, 7), + &DataType::Decimal128(13, 7), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -488,7 +488,7 @@ async fn decimal_arithmetic_op() -> Result<()> { let sql = "select c1*20 from decimal_simple"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(31, 6), + &DataType::Decimal128(31, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -517,7 +517,7 @@ async fn decimal_arithmetic_op() -> Result<()> { let sql = "select c1*c5 from decimal_simple"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(23, 13), + &DataType::Decimal128(23, 13), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -546,36 +546,36 @@ async fn decimal_arithmetic_op() -> Result<()> { let sql = "select c1/cast(0.00001 as decimal(5,5)) from decimal_simple"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(21, 12), + &DataType::Decimal128(21, 12), actual[0].schema().field(0).data_type() ); let expected = vec![ - "+-------------------------------------------------------------+", - "| decimal_simple.c1 / CAST(Float64(0.00001) AS Decimal(5, 5)) |", - "+-------------------------------------------------------------+", - "| 1.000000000000 |", - "| 2.000000000000 |", - "| 2.000000000000 |", - "| 3.000000000000 |", - "| 3.000000000000 |", - "| 3.000000000000 |", - "| 4.000000000000 |", - "| 4.000000000000 |", - "| 4.000000000000 |", - "| 4.000000000000 |", - "| 5.000000000000 |", - "| 5.000000000000 |", - "| 5.000000000000 |", - "| 5.000000000000 |", - "| 5.000000000000 |", - "+-------------------------------------------------------------+", + "+----------------------------------------------------------------+", + "| decimal_simple.c1 / CAST(Float64(0.00001) AS Decimal128(5, 5)) |", + "+----------------------------------------------------------------+", + "| 1.000000000000 |", + "| 2.000000000000 |", + "| 2.000000000000 |", + "| 3.000000000000 |", + "| 3.000000000000 |", + "| 3.000000000000 |", + "| 4.000000000000 |", + "| 4.000000000000 |", + "| 4.000000000000 |", + "| 4.000000000000 |", + "| 5.000000000000 |", + "| 5.000000000000 |", + "| 5.000000000000 |", + "| 5.000000000000 |", + "| 5.000000000000 |", + "+----------------------------------------------------------------+", ]; assert_batches_eq!(expected, &actual); let sql = "select c1/c5 from decimal_simple"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(30, 19), + &DataType::Decimal128(30, 19), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -605,36 +605,36 @@ async fn decimal_arithmetic_op() -> Result<()> { let sql = "select c5%cast(0.00001 as decimal(5,5)) from decimal_simple"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(7, 7), + &DataType::Decimal128(7, 7), actual[0].schema().field(0).data_type() ); let expected = vec![ - "+-------------------------------------------------------------+", - "| decimal_simple.c5 % CAST(Float64(0.00001) AS Decimal(5, 5)) |", - "+-------------------------------------------------------------+", - "| 0.0000040 |", - "| 0.0000050 |", - "| 0.0000090 |", - "| 0.0000020 |", - "| 0.0000050 |", - "| 0.0000010 |", - "| 0.0000040 |", - "| 0.0000000 |", - "| 0.0000000 |", - "| 0.0000040 |", - "| 0.0000020 |", - "| 0.0000080 |", - "| 0.0000030 |", - "| 0.0000080 |", - "| 0.0000000 |", - "+-------------------------------------------------------------+", + "+----------------------------------------------------------------+", + "| decimal_simple.c5 % CAST(Float64(0.00001) AS Decimal128(5, 5)) |", + "+----------------------------------------------------------------+", + "| 0.0000040 |", + "| 0.0000050 |", + "| 0.0000090 |", + "| 0.0000020 |", + "| 0.0000050 |", + "| 0.0000010 |", + "| 0.0000040 |", + "| 0.0000000 |", + "| 0.0000000 |", + "| 0.0000040 |", + "| 0.0000020 |", + "| 0.0000080 |", + "| 0.0000030 |", + "| 0.0000080 |", + "| 0.0000000 |", + "+----------------------------------------------------------------+", ]; assert_batches_eq!(expected, &actual); let sql = "select c1%c5 from decimal_simple"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(11, 7), + &DataType::Decimal128(11, 7), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -670,7 +670,7 @@ async fn decimal_sort() -> Result<()> { let sql = "select * from decimal_simple where c1 >= 0.00004 order by c1"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -693,7 +693,7 @@ async fn decimal_sort() -> Result<()> { let sql = "select * from decimal_simple where c1 >= 0.00004 order by c1 desc"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -716,7 +716,7 @@ async fn decimal_sort() -> Result<()> { let sql = "select * from decimal_simple where c1 < 0.00003 order by c1 desc,c4"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -740,7 +740,7 @@ async fn decimal_group_function() -> Result<()> { let sql = "select count(*),c1 from decimal_simple group by c1 order by c1"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(1).data_type() ); let expected = vec![ @@ -759,7 +759,7 @@ async fn decimal_group_function() -> Result<()> { let sql = "select count(*),c1,c4 from decimal_simple group by c1,c4 order by c1,c4"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(1).data_type() ); let expected = vec![ diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs index d1632277a30bf..6a3ea67ab427b 100644 --- a/datafusion/core/tests/sql/joins.rs +++ b/datafusion/core/tests/sql/joins.rs @@ -1230,10 +1230,10 @@ async fn hash_join_with_date32() -> Result<()> { let plan = state.optimize(&plan)?; let expected = vec![ "Explain [plan_type:Utf8, plan:Utf8]", - " Projection: #t1.c1, #t1.c2, #t1.c3, #t1.c4, #t2.c1, #t2.c2, #t2.c3, #t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal(10, 2);N, c4:Dictionary(Int32, Utf8);N]", - " Inner Join: #t1.c1 = #t2.c1 [c1:Date32;N, c2:Date64;N, c3:Decimal(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal(10, 2);N, c4:Dictionary(Int32, Utf8);N]", - " TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal(5, 2);N, c4:Dictionary(Int32, Utf8);N]", - " TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal(10, 2);N, c4:Dictionary(Int32, Utf8);N]", + " Projection: #t1.c1, #t1.c2, #t1.c3, #t1.c4, #t2.c1, #t2.c2, #t2.c3, #t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", + " Inner Join: #t1.c1 = #t2.c1 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", + " TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N]", + " TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", ]; let formatted = plan.display_indent_schema().to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); @@ -1272,10 +1272,10 @@ async fn hash_join_with_date64() -> Result<()> { let plan = state.optimize(&plan)?; let expected = vec![ "Explain [plan_type:Utf8, plan:Utf8]", - " Projection: #t1.c1, #t1.c2, #t1.c3, #t1.c4, #t2.c1, #t2.c2, #t2.c3, #t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal(10, 2);N, c4:Dictionary(Int32, Utf8);N]", - " Left Join: #t1.c2 = #t2.c2 [c1:Date32;N, c2:Date64;N, c3:Decimal(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal(10, 2);N, c4:Dictionary(Int32, Utf8);N]", - " TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal(5, 2);N, c4:Dictionary(Int32, Utf8);N]", - " TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal(10, 2);N, c4:Dictionary(Int32, Utf8);N]", + " Projection: #t1.c1, #t1.c2, #t1.c3, #t1.c4, #t2.c1, #t2.c2, #t2.c3, #t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", + " Left Join: #t1.c2 = #t2.c2 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", + " TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N]", + " TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", ]; let formatted = plan.display_indent_schema().to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); @@ -1316,10 +1316,10 @@ async fn hash_join_with_decimal() -> Result<()> { let plan = state.optimize(&plan)?; let expected = vec![ "Explain [plan_type:Utf8, plan:Utf8]", - " Projection: #t1.c1, #t1.c2, #t1.c3, #t1.c4, #t2.c1, #t2.c2, #t2.c3, #t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal(10, 2);N, c4:Dictionary(Int32, Utf8);N]", - " Right Join: #t1.c3 = #t2.c3 [c1:Date32;N, c2:Date64;N, c3:Decimal(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal(10, 2);N, c4:Dictionary(Int32, Utf8);N]", - " TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal(5, 2);N, c4:Dictionary(Int32, Utf8);N]", - " TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal(10, 2);N, c4:Dictionary(Int32, Utf8);N]", + " Projection: #t1.c1, #t1.c2, #t1.c3, #t1.c4, #t2.c1, #t2.c2, #t2.c3, #t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", + " Right Join: #t1.c3 = #t2.c3 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", + " TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N]", + " TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", ]; let formatted = plan.display_indent_schema().to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); @@ -1360,10 +1360,10 @@ async fn hash_join_with_dictionary() -> Result<()> { let plan = state.optimize(&plan)?; let expected = vec![ "Explain [plan_type:Utf8, plan:Utf8]", - " Projection: #t1.c1, #t1.c2, #t1.c3, #t1.c4, #t2.c1, #t2.c2, #t2.c3, #t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal(10, 2);N, c4:Dictionary(Int32, Utf8);N]", - " Inner Join: #t1.c4 = #t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal(10, 2);N, c4:Dictionary(Int32, Utf8);N]", - " TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal(5, 2);N, c4:Dictionary(Int32, Utf8);N]", - " TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal(10, 2);N, c4:Dictionary(Int32, Utf8);N]", + " Projection: #t1.c1, #t1.c2, #t1.c3, #t1.c4, #t2.c1, #t2.c2, #t2.c3, #t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", + " Inner Join: #t1.c4 = #t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", + " TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N]", + " TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", ]; let formatted = plan.display_indent_schema().to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 5481161d0c273..6c357380a0606 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -279,7 +279,7 @@ fn create_hashjoin_datatype_context() -> Result { let t1_schema = Schema::new(vec![ Field::new("c1", DataType::Date32, true), Field::new("c2", DataType::Date64, true), - Field::new("c3", DataType::Decimal(5, 2), true), + Field::new("c3", DataType::Decimal128(5, 2), true), Field::new( "c4", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), @@ -312,7 +312,7 @@ fn create_hashjoin_datatype_context() -> Result { let t2_schema = Schema::new(vec![ Field::new("c1", DataType::Date32, true), Field::new("c2", DataType::Date64, true), - Field::new("c3", DataType::Decimal(10, 2), true), + Field::new("c3", DataType::Decimal128(10, 2), true), Field::new( "c4", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml index e99a24c320d79..4fc8863c0cb43 100644 --- a/datafusion/expr/Cargo.toml +++ b/datafusion/expr/Cargo.toml @@ -36,6 +36,6 @@ path = "src/lib.rs" [dependencies] ahash = { version = "0.7", default-features = false } -arrow = { version = "19.0.0", features = ["prettyprint"] } -datafusion-common = { path = "../common", version = "10.0.0" } +arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "30c94dbf1c422f81f8520b9956e96ab7b53c3f47", features = ["prettyprint"], optional = false } +datafusion-common = { path = "../common", features = [], optional = false } sqlparser = "0.19" diff --git a/datafusion/expr/src/aggregate_function.rs b/datafusion/expr/src/aggregate_function.rs index 09d759e564666..a4281aa0aa2a3 100644 --- a/datafusion/expr/src/aggregate_function.rs +++ b/datafusion/expr/src/aggregate_function.rs @@ -19,7 +19,7 @@ use crate::{Signature, TypeSignature, Volatility}; use arrow::datatypes::{ - DataType, Field, TimeUnit, DECIMAL_MAX_PRECISION, DECIMAL_MAX_SCALE, + DataType, Field, TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, }; use datafusion_common::{DataFusionError, Result}; use std::ops::Deref; @@ -407,11 +407,11 @@ pub fn sum_return_type(arg_type: &DataType) -> Result { // In the https://www.postgresql.org/docs/current/functions-aggregate.html doc, // the result type of floating-point is FLOAT64 with the double precision. DataType::Float64 | DataType::Float32 => Ok(DataType::Float64), - DataType::Decimal(precision, scale) => { + DataType::Decimal128(precision, scale) => { // in the spark, the result type is DECIMAL(min(38,precision+10), s) // ref: https://github.com/apache/spark/blob/fcf636d9eb8d645c24be3db2d599aba2d7e2955a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala#L66 - let new_precision = DECIMAL_MAX_PRECISION.min(*precision + 10); - Ok(DataType::Decimal(new_precision, *scale)) + let new_precision = DECIMAL128_MAX_PRECISION.min(*precision + 10); + Ok(DataType::Decimal128(new_precision, *scale)) } other => Err(DataFusionError::Plan(format!( "SUM does not support type \"{:?}\"", @@ -503,12 +503,12 @@ pub fn stddev_return_type(arg_type: &DataType) -> Result { /// function return type of an average pub fn avg_return_type(arg_type: &DataType) -> Result { match arg_type { - DataType::Decimal(precision, scale) => { + DataType::Decimal128(precision, scale) => { // in the spark, the result type is DECIMAL(min(38,precision+4), min(38,scale+4)). // ref: https://github.com/apache/spark/blob/fcf636d9eb8d645c24be3db2d599aba2d7e2955a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala#L66 - let new_precision = DECIMAL_MAX_PRECISION.min(*precision + 4); - let new_scale = DECIMAL_MAX_SCALE.min(*scale + 4); - Ok(DataType::Decimal(new_precision, new_scale)) + let new_precision = DECIMAL128_MAX_PRECISION.min(*precision + 4); + let new_scale = DECIMAL128_MAX_SCALE.min(*scale + 4); + Ok(DataType::Decimal128(new_precision, new_scale)) } DataType::Int8 | DataType::Int16 @@ -609,7 +609,7 @@ pub fn is_sum_support_arg_type(arg_type: &DataType) -> bool { | DataType::Int64 | DataType::Float32 | DataType::Float64 - | DataType::Decimal(_, _) + | DataType::Decimal128(_, _) ) } @@ -626,7 +626,7 @@ pub fn is_avg_support_arg_type(arg_type: &DataType) -> bool { | DataType::Int64 | DataType::Float32 | DataType::Float64 - | DataType::Decimal(_, _) + | DataType::Decimal128(_, _) ) } @@ -755,7 +755,7 @@ mod tests { ]; let input_types = vec![ vec![DataType::Int32], - vec![DataType::Decimal(10, 2)], + vec![DataType::Decimal128(10, 2)], vec![DataType::Utf8], ]; for fun in funs { @@ -770,7 +770,7 @@ mod tests { let input_types = vec![ vec![DataType::Int32], vec![DataType::Float32], - vec![DataType::Decimal(20, 3)], + vec![DataType::Decimal128(20, 3)], ]; for fun in funs { for input_type in &input_types { @@ -807,13 +807,13 @@ mod tests { #[test] fn test_avg_return_data_type() -> Result<()> { - let data_type = DataType::Decimal(10, 5); + let data_type = DataType::Decimal128(10, 5); let result_type = avg_return_type(&data_type)?; - assert_eq!(DataType::Decimal(14, 9), result_type); + assert_eq!(DataType::Decimal128(14, 9), result_type); - let data_type = DataType::Decimal(36, 10); + let data_type = DataType::Decimal128(36, 10); let result_type = avg_return_type(&data_type)?; - assert_eq!(DataType::Decimal(38, 14), result_type); + assert_eq!(DataType::Decimal128(38, 14), result_type); Ok(()) } @@ -823,20 +823,20 @@ mod tests { let result_type = variance_return_type(&data_type)?; assert_eq!(DataType::Float64, result_type); - let data_type = DataType::Decimal(36, 10); + let data_type = DataType::Decimal128(36, 10); assert!(variance_return_type(&data_type).is_err()); Ok(()) } #[test] fn test_sum_return_data_type() -> Result<()> { - let data_type = DataType::Decimal(10, 5); + let data_type = DataType::Decimal128(10, 5); let result_type = sum_return_type(&data_type)?; - assert_eq!(DataType::Decimal(20, 5), result_type); + assert_eq!(DataType::Decimal128(20, 5), result_type); - let data_type = DataType::Decimal(36, 10); + let data_type = DataType::Decimal128(36, 10); let result_type = sum_return_type(&data_type)?; - assert_eq!(DataType::Decimal(38, 10), result_type); + assert_eq!(DataType::Decimal128(38, 10), result_type); Ok(()) } @@ -846,7 +846,7 @@ mod tests { let result_type = stddev_return_type(&data_type)?; assert_eq!(DataType::Float64, result_type); - let data_type = DataType::Decimal(36, 10); + let data_type = DataType::Decimal128(36, 10); assert!(stddev_return_type(&data_type).is_err()); Ok(()) } @@ -857,7 +857,7 @@ mod tests { let result_type = covariance_return_type(&data_type)?; assert_eq!(DataType::Float64, result_type); - let data_type = DataType::Decimal(36, 10); + let data_type = DataType::Decimal128(36, 10); assert!(covariance_return_type(&data_type).is_err()); Ok(()) } @@ -868,7 +868,7 @@ mod tests { let result_type = correlation_return_type(&data_type)?; assert_eq!(DataType::Float64, result_type); - let data_type = DataType::Decimal(36, 10); + let data_type = DataType::Decimal128(36, 10); assert!(correlation_return_type(&data_type).is_err()); Ok(()) } diff --git a/datafusion/expr/src/binary_rule.rs b/datafusion/expr/src/binary_rule.rs index 21e62344cc34f..d6994d68847a3 100644 --- a/datafusion/expr/src/binary_rule.rs +++ b/datafusion/expr/src/binary_rule.rs @@ -19,7 +19,7 @@ use crate::Operator; use arrow::compute::can_cast_types; -use arrow::datatypes::{DataType, DECIMAL_MAX_PRECISION, DECIMAL_MAX_SCALE}; +use arrow::datatypes::{DataType, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE}; use datafusion_common::DataFusionError; use datafusion_common::Result; @@ -194,9 +194,9 @@ fn comparison_binary_numeric_coercion( // that the coercion removes the least amount of information match (lhs_type, rhs_type) { // support decimal data type for comparison operation - (d1 @ Decimal(_, _), d2 @ Decimal(_, _)) => get_wider_decimal_type(d1, d2), - (Decimal(_, _), _) => get_comparison_common_decimal_type(lhs_type, rhs_type), - (_, Decimal(_, _)) => get_comparison_common_decimal_type(rhs_type, lhs_type), + (d1 @ Decimal128(_, _), d2 @ Decimal128(_, _)) => get_wider_decimal_type(d1, d2), + (Decimal128(_, _), _) => get_comparison_common_decimal_type(lhs_type, rhs_type), + (_, Decimal128(_, _)) => get_comparison_common_decimal_type(rhs_type, lhs_type), (Float64, _) | (_, Float64) => Some(Float64), (_, Float32) | (Float32, _) => Some(Float32), (Int64, _) | (_, Int64) => Some(Int64), @@ -218,25 +218,25 @@ fn get_comparison_common_decimal_type( let other_decimal_type = &match other_type { // This conversion rule is from spark // https://github.com/apache/spark/blob/1c81ad20296d34f137238dadd67cc6ae405944eb/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala#L127 - DataType::Int8 => DataType::Decimal(3, 0), - DataType::Int16 => DataType::Decimal(5, 0), - DataType::Int32 => DataType::Decimal(10, 0), - DataType::Int64 => DataType::Decimal(20, 0), - DataType::Float32 => DataType::Decimal(14, 7), - DataType::Float64 => DataType::Decimal(30, 15), + DataType::Int8 => DataType::Decimal128(3, 0), + DataType::Int16 => DataType::Decimal128(5, 0), + DataType::Int32 => DataType::Decimal128(10, 0), + DataType::Int64 => DataType::Decimal128(20, 0), + DataType::Float32 => DataType::Decimal128(14, 7), + DataType::Float64 => DataType::Decimal128(30, 15), _ => { return None; } }; match (decimal_type, &other_decimal_type) { - (d1 @ DataType::Decimal(_, _), d2 @ DataType::Decimal(_, _)) => { + (d1 @ DataType::Decimal128(_, _), d2 @ DataType::Decimal128(_, _)) => { get_wider_decimal_type(d1, d2) } _ => None, } } -// Returns a `DataType::Decimal` that can store any value from either +// Returns a `DataType::Decimal128` that can store any value from either // `lhs_decimal_type` and `rhs_decimal_type` // The result decimal type is (max(s1, s2) + max(p1-s1, p2-s2), max(s1, s2)). fn get_wider_decimal_type( @@ -244,7 +244,7 @@ fn get_wider_decimal_type( rhs_type: &DataType, ) -> Option { match (lhs_decimal_type, rhs_type) { - (DataType::Decimal(p1, s1), DataType::Decimal(p2, s2)) => { + (DataType::Decimal128(p1, s1), DataType::Decimal128(p2, s2)) => { // max(s1, s2) + max(p1-s1, p2-s2), max(s1, s2) let s = *s1.max(s2); let range = (p1 - s1).max(p2 - s2); @@ -258,13 +258,13 @@ fn get_wider_decimal_type( // Now, we just support the signed integer type and floating-point type. fn coerce_numeric_type_to_decimal(numeric_type: &DataType) -> Option { match numeric_type { - DataType::Int8 => Some(DataType::Decimal(3, 0)), - DataType::Int16 => Some(DataType::Decimal(5, 0)), - DataType::Int32 => Some(DataType::Decimal(10, 0)), - DataType::Int64 => Some(DataType::Decimal(20, 0)), + DataType::Int8 => Some(DataType::Decimal128(3, 0)), + DataType::Int16 => Some(DataType::Decimal128(5, 0)), + DataType::Int32 => Some(DataType::Decimal128(10, 0)), + DataType::Int64 => Some(DataType::Decimal128(20, 0)), // TODO if we convert the floating-point data to the decimal type, it maybe overflow. - DataType::Float32 => Some(DataType::Decimal(14, 7)), - DataType::Float64 => Some(DataType::Decimal(30, 15)), + DataType::Float32 => Some(DataType::Decimal128(14, 7)), + DataType::Float64 => Some(DataType::Decimal128(30, 15)), _ => None, } } @@ -289,10 +289,10 @@ fn mathematics_numerical_coercion( // these are ordered from most informative to least informative so // that the coercion removes the least amount of information match (lhs_type, rhs_type) { - (Decimal(_, _), Decimal(_, _)) => { + (Decimal128(_, _), Decimal128(_, _)) => { coercion_decimal_mathematics_type(mathematics_op, lhs_type, rhs_type) } - (Decimal(_, _), _) => { + (Decimal128(_, _), _) => { let converted_decimal_type = coerce_numeric_type_to_decimal(rhs_type); match converted_decimal_type { None => None, @@ -303,7 +303,7 @@ fn mathematics_numerical_coercion( ), } } - (_, Decimal(_, _)) => { + (_, Decimal128(_, _)) => { let converted_decimal_type = coerce_numeric_type_to_decimal(lhs_type); match converted_decimal_type { None => None, @@ -329,9 +329,9 @@ fn mathematics_numerical_coercion( } fn create_decimal_type(precision: usize, scale: usize) -> DataType { - DataType::Decimal( - DECIMAL_MAX_PRECISION.min(precision), - DECIMAL_MAX_SCALE.min(scale), + DataType::Decimal128( + DECIMAL128_MAX_PRECISION.min(precision), + DECIMAL128_MAX_SCALE.min(scale), ) } @@ -344,7 +344,7 @@ fn coercion_decimal_mathematics_type( match (left_decimal_type, right_decimal_type) { // The coercion rule from spark // https://github.com/apache/spark/blob/c20af535803a7250fef047c2bf0fe30be242369d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala#L35 - (Decimal(p1, s1), Decimal(p2, s2)) => { + (Decimal128(p1, s1), Decimal128(p2, s2)) => { match mathematics_op { Operator::Plus | Operator::Minus => { // max(s1, s2) @@ -392,19 +392,17 @@ pub fn is_signed_numeric(dt: &DataType) -> bool { | DataType::Float16 | DataType::Float32 | DataType::Float64 - | DataType::Decimal(_, _) + | DataType::Decimal128(_, _) ) } /// Determine if a DataType is numeric or not pub fn is_numeric(dt: &DataType) -> bool { is_signed_numeric(dt) - || match dt { - DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => { - true - } - _ => false, - } + || matches!( + dt, + DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 + ) } /// Determine if at least of one of lhs and rhs is numeric, and the other must be NULL or numeric @@ -653,7 +651,7 @@ mod tests { #[test] fn test_decimal_binary_comparison_coercion() -> Result<()> { - let input_decimal = DataType::Decimal(20, 3); + let input_decimal = DataType::Decimal128(20, 3); let input_types = [ DataType::Int8, DataType::Int16, @@ -661,18 +659,18 @@ mod tests { DataType::Int64, DataType::Float32, DataType::Float64, - DataType::Decimal(38, 10), - DataType::Decimal(20, 8), + DataType::Decimal128(38, 10), + DataType::Decimal128(20, 8), ]; let result_types = [ - DataType::Decimal(20, 3), - DataType::Decimal(20, 3), - DataType::Decimal(20, 3), - DataType::Decimal(23, 3), - DataType::Decimal(24, 7), - DataType::Decimal(32, 15), - DataType::Decimal(38, 10), - DataType::Decimal(25, 8), + DataType::Decimal128(20, 3), + DataType::Decimal128(20, 3), + DataType::Decimal128(20, 3), + DataType::Decimal128(23, 3), + DataType::Decimal128(24, 7), + DataType::Decimal128(32, 15), + DataType::Decimal128(38, 10), + DataType::Decimal128(25, 8), ]; let comparison_op_types = [ Operator::NotEq, @@ -699,66 +697,66 @@ mod tests { fn test_decimal_mathematics_op_type() { assert_eq!( coerce_numeric_type_to_decimal(&DataType::Int8).unwrap(), - DataType::Decimal(3, 0) + DataType::Decimal128(3, 0) ); assert_eq!( coerce_numeric_type_to_decimal(&DataType::Int16).unwrap(), - DataType::Decimal(5, 0) + DataType::Decimal128(5, 0) ); assert_eq!( coerce_numeric_type_to_decimal(&DataType::Int32).unwrap(), - DataType::Decimal(10, 0) + DataType::Decimal128(10, 0) ); assert_eq!( coerce_numeric_type_to_decimal(&DataType::Int64).unwrap(), - DataType::Decimal(20, 0) + DataType::Decimal128(20, 0) ); assert_eq!( coerce_numeric_type_to_decimal(&DataType::Float32).unwrap(), - DataType::Decimal(14, 7) + DataType::Decimal128(14, 7) ); assert_eq!( coerce_numeric_type_to_decimal(&DataType::Float64).unwrap(), - DataType::Decimal(30, 15) + DataType::Decimal128(30, 15) ); let op = Operator::Plus; - let left_decimal_type = DataType::Decimal(10, 3); - let right_decimal_type = DataType::Decimal(20, 4); + let left_decimal_type = DataType::Decimal128(10, 3); + let right_decimal_type = DataType::Decimal128(20, 4); let result = coercion_decimal_mathematics_type( &op, &left_decimal_type, &right_decimal_type, ); - assert_eq!(DataType::Decimal(21, 4), result.unwrap()); + assert_eq!(DataType::Decimal128(21, 4), result.unwrap()); let op = Operator::Minus; let result = coercion_decimal_mathematics_type( &op, &left_decimal_type, &right_decimal_type, ); - assert_eq!(DataType::Decimal(21, 4), result.unwrap()); + assert_eq!(DataType::Decimal128(21, 4), result.unwrap()); let op = Operator::Multiply; let result = coercion_decimal_mathematics_type( &op, &left_decimal_type, &right_decimal_type, ); - assert_eq!(DataType::Decimal(31, 7), result.unwrap()); + assert_eq!(DataType::Decimal128(31, 7), result.unwrap()); let op = Operator::Divide; let result = coercion_decimal_mathematics_type( &op, &left_decimal_type, &right_decimal_type, ); - assert_eq!(DataType::Decimal(35, 24), result.unwrap()); + assert_eq!(DataType::Decimal128(35, 24), result.unwrap()); let op = Operator::Modulo; let result = coercion_decimal_mathematics_type( &op, &left_decimal_type, &right_decimal_type, ); - assert_eq!(DataType::Decimal(11, 4), result.unwrap()); + assert_eq!(DataType::Decimal128(11, 4), result.unwrap()); } #[test] diff --git a/datafusion/expr/src/type_coercion.rs b/datafusion/expr/src/type_coercion.rs index 33a540d6f1ef2..27eee3d300662 100644 --- a/datafusion/expr/src/type_coercion.rs +++ b/datafusion/expr/src/type_coercion.rs @@ -182,7 +182,7 @@ pub fn can_coerce_from(type_into: &DataType, type_from: &DataType) -> bool { | UInt64 | Float32 | Float64 - | Decimal(_, _) + | Decimal128(_, _) ), Timestamp(TimeUnit::Nanosecond, None) => { matches!(type_from, Null | Timestamp(_, None)) diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index f2f5d6002cc87..25ca8d6e36d62 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -689,7 +689,7 @@ pub fn can_hash(data_type: &DataType) -> bool { }, DataType::Utf8 => true, DataType::LargeUtf8 => true, - DataType::Decimal(_, _) => true, + DataType::Decimal128(_, _) => true, DataType::Date32 => true, DataType::Date64 => true, DataType::Dictionary(key_type, value_type) diff --git a/datafusion/jit/Cargo.toml b/datafusion/jit/Cargo.toml index e5b9e9297c096..7bf7f429ef7a8 100644 --- a/datafusion/jit/Cargo.toml +++ b/datafusion/jit/Cargo.toml @@ -36,12 +36,12 @@ path = "src/lib.rs" jit = [] [dependencies] -arrow = { version = "19.0.0" } +arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "30c94dbf1c422f81f8520b9956e96ab7b53c3f47", features = [], optional = false } cranelift = "0.86.1" cranelift-jit = "0.86.1" cranelift-module = "0.86.1" cranelift-native = "0.86.1" -datafusion-common = { path = "../common", version = "10.0.0", features = ["jit"] } -datafusion-expr = { path = "../expr", version = "10.0.0" } +datafusion-common = { path = "../common", features = ["jit"], optional = false } +datafusion-expr = { path = "../expr", features = [], optional = false } parking_lot = "0.12" diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml index 30d943c867ba8..6a766271f9b72 100644 --- a/datafusion/optimizer/Cargo.toml +++ b/datafusion/optimizer/Cargo.toml @@ -37,12 +37,12 @@ default = ["unicode_expressions"] unicode_expressions = [] [dependencies] -arrow = { version = "19.0.0", features = ["prettyprint"] } +arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "30c94dbf1c422f81f8520b9956e96ab7b53c3f47", features = ["prettyprint"], optional = false } async-trait = "0.1.41" chrono = { version = "0.4", default-features = false } -datafusion-common = { path = "../common", version = "10.0.0" } -datafusion-expr = { path = "../expr", version = "10.0.0" } -datafusion-physical-expr = { path = "../physical-expr", version = "10.0.0" } +datafusion-common = { path = "../common", features = [], optional = false } +datafusion-expr = { path = "../expr", features = [], optional = false } +datafusion-physical-expr = { path = "../physical-expr", features = [], optional = false } hashbrown = { version = "0.12", features = ["raw"] } log = "^0.4" diff --git a/datafusion/optimizer/src/decorrelate_scalar_subquery.rs b/datafusion/optimizer/src/decorrelate_scalar_subquery.rs index d4f8372bd326a..561757dc87456 100644 --- a/datafusion/optimizer/src/decorrelate_scalar_subquery.rs +++ b/datafusion/optimizer/src/decorrelate_scalar_subquery.rs @@ -69,7 +69,7 @@ impl DecorrelateScalarSubquery { _ => return Ok(()), }; let subquery = - self.optimize(&*subquery.subquery, optimizer_config)?; + self.optimize(&subquery.subquery, optimizer_config)?; let subquery = Arc::new(subquery); let subquery = Subquery { subquery }; let res = SubqueryInfo::new(subquery, expr, *op, lhs); @@ -163,7 +163,7 @@ fn optimize_scalar( "optimizing:\n{}", query_info.query.subquery.display_indent() ); - let proj = Projection::try_from_plan(&*query_info.query.subquery) + let proj = Projection::try_from_plan(&query_info.query.subquery) .map_err(|e| context!("scalar subqueries must have a projection", e))?; let proj = only_or_err(proj.expr.as_slice()) .map_err(|e| context!("exactly one expression should be projected", e))?; @@ -173,7 +173,7 @@ fn optimize_scalar( .map_err(|e| context!("Exactly one input is expected. Is this a join?", e))?; let aggr = Aggregate::try_from_plan(sub_input) .map_err(|e| context!("scalar subqueries must aggregate a value", e))?; - let filter = Filter::try_from_plan(&*aggr.input).map_err(|e| { + let filter = Filter::try_from_plan(&aggr.input).map_err(|e| { context!("scalar subqueries must have a filter to be correlated", e) })?; diff --git a/datafusion/optimizer/src/decorrelate_where_exists.rs b/datafusion/optimizer/src/decorrelate_where_exists.rs index 2c25bcbb28e7e..90fff3f800543 100644 --- a/datafusion/optimizer/src/decorrelate_where_exists.rs +++ b/datafusion/optimizer/src/decorrelate_where_exists.rs @@ -56,8 +56,7 @@ impl DecorrelateWhereExists { for it in filters.iter() { match it { Expr::Exists { subquery, negated } => { - let subquery = - self.optimize(&*subquery.subquery, optimizer_config)?; + let subquery = self.optimize(&subquery.subquery, optimizer_config)?; let subquery = Arc::new(subquery); let subquery = Subquery { subquery }; let subquery = SubqueryInfo::new(subquery.clone(), *negated); diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs index f90d94d8c16f9..5da7d80c1691d 100644 --- a/datafusion/optimizer/src/decorrelate_where_in.rs +++ b/datafusion/optimizer/src/decorrelate_where_in.rs @@ -60,8 +60,7 @@ impl DecorrelateWhereIn { subquery, negated, } => { - let subquery = - self.optimize(&*subquery.subquery, optimizer_config)?; + let subquery = self.optimize(&subquery.subquery, optimizer_config)?; let subquery = Arc::new(subquery); let subquery = Subquery { subquery }; let subquery = @@ -132,7 +131,7 @@ fn optimize_where_in( outer_other_exprs: &[Expr], optimizer_config: &mut OptimizerConfig, ) -> datafusion_common::Result { - let proj = Projection::try_from_plan(&*query_info.query.subquery) + let proj = Projection::try_from_plan(&query_info.query.subquery) .map_err(|e| context!("a projection is required", e))?; let mut subqry_input = proj.input.clone(); let proj = only_or_err(proj.expr.as_slice()) diff --git a/datafusion/optimizer/src/simplify_expressions.rs b/datafusion/optimizer/src/simplify_expressions.rs index 0b865238f42cb..06fb82427621b 100644 --- a/datafusion/optimizer/src/simplify_expressions.rs +++ b/datafusion/optimizer/src/simplify_expressions.rs @@ -159,15 +159,7 @@ fn is_false(expr: &Expr) -> bool { /// returns true if `haystack` looks like (needle OP X) or (X OP needle) fn is_op_with(target_op: Operator, haystack: &Expr, needle: &Expr) -> bool { - match haystack { - Expr::BinaryExpr { left, op, right } - if op == &target_op - && (needle == left.as_ref() || needle == right.as_ref()) => - { - true - } - _ => false, - } + matches!(haystack, Expr::BinaryExpr { left, op, right } if op == &target_op && (needle == left.as_ref() || needle == right.as_ref())) } /// returns the contained boolean value in `expr` as @@ -1903,7 +1895,7 @@ mod tests { let optimized_plan = rule .optimize(plan, &mut config) .expect("failed to optimize plan"); - return format!("{:?}", optimized_plan); + format!("{:?}", optimized_plan) } #[test] diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index 6199949f0256c..3b4ba073fb52e 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -40,13 +40,13 @@ unicode_expressions = ["unicode-segmentation"] [dependencies] ahash = { version = "0.7", default-features = false } -arrow = { version = "19.0.0", features = ["prettyprint"] } +arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "30c94dbf1c422f81f8520b9956e96ab7b53c3f47", features = ["prettyprint"], optional = false } blake2 = { version = "^0.10.2", optional = true } blake3 = { version = "1.0", optional = true } chrono = { version = "0.4", default-features = false } -datafusion-common = { path = "../common", version = "10.0.0" } -datafusion-expr = { path = "../expr", version = "10.0.0" } -datafusion-row = { path = "../row", version = "10.0.0" } +datafusion-common = { path = "../common", features = [], optional = false } +datafusion-expr = { path = "../expr", features = [], optional = false } +datafusion-row = { path = "../row", features = [], optional = false } hashbrown = { version = "0.12", features = ["raw"] } lazy_static = { version = "^1.4.0" } md-5 = { version = "^0.10.0", optional = true } diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs index a55e0e35278f8..9248a5e6bb3f5 100644 --- a/datafusion/physical-expr/src/aggregate/average.rs +++ b/datafusion/physical-expr/src/aggregate/average.rs @@ -54,7 +54,7 @@ impl Avg { // the result of avg just support FLOAT64 and Decimal data type. assert!(matches!( data_type, - DataType::Float64 | DataType::Decimal(_, _) + DataType::Float64 | DataType::Decimal128(_, _) )); Self { name: name.into(), @@ -301,10 +301,10 @@ mod tests { generic_test_op!( array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Avg, ScalarValue::Decimal128(Some(35000), 14, 4), - DataType::Decimal(14, 4) + DataType::Decimal128(14, 4) ) } @@ -318,10 +318,10 @@ mod tests { ); generic_test_op!( array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Avg, ScalarValue::Decimal128(Some(32500), 14, 4), - DataType::Decimal(14, 4) + DataType::Decimal128(14, 4) ) } @@ -336,10 +336,10 @@ mod tests { ); generic_test_op!( array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Avg, ScalarValue::Decimal128(None, 14, 4), - DataType::Decimal(14, 4) + DataType::Decimal128(14, 4) ) } diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs b/datafusion/physical-expr/src/aggregate/build_in.rs index 8d76e35e4945e..a28c397accaed 100644 --- a/datafusion/physical-expr/src/aggregate/build_in.rs +++ b/datafusion/physical-expr/src/aggregate/build_in.rs @@ -292,7 +292,7 @@ mod tests { DataType::Int32, DataType::Float32, DataType::Float64, - DataType::Decimal(10, 2), + DataType::Decimal128(10, 2), DataType::Utf8, ]; for fun in funcs { @@ -453,7 +453,7 @@ mod tests { DataType::Int32, DataType::Float32, DataType::Float64, - DataType::Decimal(10, 2), + DataType::Decimal128(10, 2), DataType::Utf8, ]; for fun in funcs { @@ -898,7 +898,7 @@ mod tests { let observed = return_type( &AggregateFunction::ApproxMedian, - &[DataType::Decimal(10, 6)], + &[DataType::Decimal128(10, 6)], ); assert!(observed.is_err()); @@ -914,13 +914,14 @@ mod tests { assert_eq!(DataType::Int32, observed); // test decimal for min - let observed = return_type(&AggregateFunction::Min, &[DataType::Decimal(10, 6)])?; - assert_eq!(DataType::Decimal(10, 6), observed); + let observed = + return_type(&AggregateFunction::Min, &[DataType::Decimal128(10, 6)])?; + assert_eq!(DataType::Decimal128(10, 6), observed); // test decimal for max let observed = - return_type(&AggregateFunction::Max, &[DataType::Decimal(28, 13)])?; - assert_eq!(DataType::Decimal(28, 13), observed); + return_type(&AggregateFunction::Max, &[DataType::Decimal128(28, 13)])?; + assert_eq!(DataType::Decimal128(28, 13), observed); Ok(()) } @@ -939,11 +940,13 @@ mod tests { let observed = return_type(&AggregateFunction::Sum, &[DataType::Float64])?; assert_eq!(DataType::Float64, observed); - let observed = return_type(&AggregateFunction::Sum, &[DataType::Decimal(10, 5)])?; - assert_eq!(DataType::Decimal(20, 5), observed); + let observed = + return_type(&AggregateFunction::Sum, &[DataType::Decimal128(10, 5)])?; + assert_eq!(DataType::Decimal128(20, 5), observed); - let observed = return_type(&AggregateFunction::Sum, &[DataType::Decimal(35, 5)])?; - assert_eq!(DataType::Decimal(38, 5), observed); + let observed = + return_type(&AggregateFunction::Sum, &[DataType::Decimal128(35, 5)])?; + assert_eq!(DataType::Decimal128(38, 5), observed); Ok(()) } @@ -970,7 +973,7 @@ mod tests { assert_eq!(DataType::Int64, observed); let observed = - return_type(&AggregateFunction::Count, &[DataType::Decimal(28, 13)])?; + return_type(&AggregateFunction::Count, &[DataType::Decimal128(28, 13)])?; assert_eq!(DataType::Int64, observed); Ok(()) } @@ -986,11 +989,13 @@ mod tests { let observed = return_type(&AggregateFunction::Avg, &[DataType::Int32])?; assert_eq!(DataType::Float64, observed); - let observed = return_type(&AggregateFunction::Avg, &[DataType::Decimal(10, 6)])?; - assert_eq!(DataType::Decimal(14, 10), observed); + let observed = + return_type(&AggregateFunction::Avg, &[DataType::Decimal128(10, 6)])?; + assert_eq!(DataType::Decimal128(14, 10), observed); - let observed = return_type(&AggregateFunction::Avg, &[DataType::Decimal(36, 6)])?; - assert_eq!(DataType::Decimal(38, 10), observed); + let observed = + return_type(&AggregateFunction::Avg, &[DataType::Decimal128(36, 6)])?; + assert_eq!(DataType::Decimal128(38, 10), observed); Ok(()) } diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs index 077f4d725de3b..0391382f91c4d 100644 --- a/datafusion/physical-expr/src/aggregate/min_max.rs +++ b/datafusion/physical-expr/src/aggregate/min_max.rs @@ -207,7 +207,7 @@ macro_rules! typed_min_max_batch_decimal128 { macro_rules! min_max_batch { ($VALUES:expr, $OP:ident) => {{ match $VALUES.data_type() { - DataType::Decimal(precision, scale) => { + DataType::Decimal128(precision, scale) => { typed_min_max_batch_decimal128!($VALUES, precision, scale, $OP) } // all types that have a natural order @@ -803,10 +803,10 @@ mod tests { ); generic_test_op!( array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Min, ScalarValue::Decimal128(Some(1), 10, 0), - DataType::Decimal(10, 0) + DataType::Decimal128(10, 0) ) } @@ -821,10 +821,10 @@ mod tests { ); generic_test_op!( array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Min, ScalarValue::Decimal128(None, 10, 0), - DataType::Decimal(10, 0) + DataType::Decimal128(10, 0) ) } @@ -840,10 +840,10 @@ mod tests { generic_test_op!( array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Min, ScalarValue::Decimal128(Some(1), 10, 0), - DataType::Decimal(10, 0) + DataType::Decimal128(10, 0) ) } @@ -892,10 +892,10 @@ mod tests { ); generic_test_op!( array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Max, ScalarValue::Decimal128(Some(5), 10, 0), - DataType::Decimal(10, 0) + DataType::Decimal128(10, 0) ) } @@ -909,10 +909,10 @@ mod tests { ); generic_test_op!( array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Max, ScalarValue::Decimal128(Some(5), 10, 0), - DataType::Decimal(10, 0) + DataType::Decimal128(10, 0) ) } @@ -926,10 +926,10 @@ mod tests { ); generic_test_op!( array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Min, ScalarValue::Decimal128(None, 10, 0), - DataType::Decimal(10, 0) + DataType::Decimal128(10, 0) ) } diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs index b0a7de6c633c6..634b21c61a331 100644 --- a/datafusion/physical-expr/src/aggregate/sum.rs +++ b/datafusion/physical-expr/src/aggregate/sum.rs @@ -176,7 +176,7 @@ fn sum_decimal_batch( pub(crate) fn sum_batch(values: &ArrayRef, sum_type: &DataType) -> Result { let values = &cast(values, sum_type)?; Ok(match values.data_type() { - DataType::Decimal(precision, scale) => { + DataType::Decimal128(precision, scale) => { sum_decimal_batch(values, precision, scale)? } DataType::Float64 => typed_sum_delta_batch!(values, Float64Array, Float64), @@ -544,7 +544,7 @@ mod tests { .collect::() .with_precision_and_scale(10, 0)?, ); - let result = sum_batch(&array, &DataType::Decimal(10, 0))?; + let result = sum_batch(&array, &DataType::Decimal128(10, 0))?; assert_eq!(ScalarValue::Decimal128(Some(15), 10, 0), result); // test agg @@ -557,10 +557,10 @@ mod tests { generic_test_op!( array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Sum, ScalarValue::Decimal128(Some(15), 20, 0), - DataType::Decimal(20, 0) + DataType::Decimal128(20, 0) ) } @@ -579,7 +579,7 @@ mod tests { .collect::() .with_precision_and_scale(10, 0)?, ); - let result = sum_batch(&array, &DataType::Decimal(10, 0))?; + let result = sum_batch(&array, &DataType::Decimal128(10, 0))?; assert_eq!(ScalarValue::Decimal128(Some(13), 10, 0), result); // test agg @@ -591,10 +591,10 @@ mod tests { ); generic_test_op!( array, - DataType::Decimal(35, 0), + DataType::Decimal128(35, 0), Sum, ScalarValue::Decimal128(Some(13), 38, 0), - DataType::Decimal(38, 0) + DataType::Decimal128(38, 0) ) } @@ -613,16 +613,16 @@ mod tests { .collect::() .with_precision_and_scale(10, 0)?, ); - let result = sum_batch(&array, &DataType::Decimal(10, 0))?; + let result = sum_batch(&array, &DataType::Decimal128(10, 0))?; assert_eq!(ScalarValue::Decimal128(None, 10, 0), result); // test agg generic_test_op!( array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Sum, ScalarValue::Decimal128(None, 20, 0), - DataType::Decimal(20, 0) + DataType::Decimal128(20, 0) ) } diff --git a/datafusion/physical-expr/src/aggregate/sum_distinct.rs b/datafusion/physical-expr/src/aggregate/sum_distinct.rs index d939a033e3689..96ba818349594 100644 --- a/datafusion/physical-expr/src/aggregate/sum_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/sum_distinct.rs @@ -289,9 +289,9 @@ mod tests { ); generic_test_sum_distinct!( array, - DataType::Decimal(35, 0), + DataType::Decimal128(35, 0), ScalarValue::Decimal128(Some(1), 38, 0), - DataType::Decimal(38, 0) + DataType::Decimal128(38, 0) ) } } diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index f199466e9ac93..64e35311625af 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -334,7 +334,7 @@ macro_rules! binary_primitive_array_op { match $LEFT.data_type() { // TODO support decimal type // which is not the primitive type - DataType::Decimal(_,_) => compute_decimal_op!($LEFT, $RIGHT, $OP, Decimal128Array), + DataType::Decimal128(_,_) => compute_decimal_op!($LEFT, $RIGHT, $OP, Decimal128Array), DataType::Int8 => compute_op!($LEFT, $RIGHT, $OP, Int8Array), DataType::Int16 => compute_op!($LEFT, $RIGHT, $OP, Int16Array), DataType::Int32 => compute_op!($LEFT, $RIGHT, $OP, Int32Array), @@ -359,7 +359,7 @@ macro_rules! binary_primitive_array_op { macro_rules! binary_primitive_array_op_scalar { ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{ let result: Result> = match $LEFT.data_type() { - DataType::Decimal(_,_) => compute_decimal_op_scalar!($LEFT, $RIGHT, $OP, Decimal128Array), + DataType::Decimal128(_,_) => compute_decimal_op_scalar!($LEFT, $RIGHT, $OP, Decimal128Array), DataType::Int8 => compute_op_scalar!($LEFT, $RIGHT, $OP, Int8Array), DataType::Int16 => compute_op_scalar!($LEFT, $RIGHT, $OP, Int16Array), DataType::Int32 => compute_op_scalar!($LEFT, $RIGHT, $OP, Int32Array), @@ -386,7 +386,7 @@ macro_rules! binary_array_op { ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{ match $LEFT.data_type() { DataType::Null => compute_null_op!($LEFT, $RIGHT, $OP, NullArray), - DataType::Decimal(_,_) => compute_decimal_op!($LEFT, $RIGHT, $OP, Decimal128Array), + DataType::Decimal128(_,_) => compute_decimal_op!($LEFT, $RIGHT, $OP, Decimal128Array), DataType::Int8 => compute_op!($LEFT, $RIGHT, $OP, Int8Array), DataType::Int16 => compute_op!($LEFT, $RIGHT, $OP, Int16Array), DataType::Int32 => compute_op!($LEFT, $RIGHT, $OP, Int32Array), @@ -2208,7 +2208,7 @@ mod tests { // compare decimal array with other array type let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Decimal(10, 0), true), + Field::new("b", DataType::Decimal128(10, 0), true), ])); let value: i64 = 123; @@ -2252,7 +2252,7 @@ mod tests { let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Float64, true), - Field::new("b", DataType::Decimal(10, 2), true), + Field::new("b", DataType::Decimal128(10, 2), true), ])); let value: i128 = 123; @@ -2353,7 +2353,7 @@ mod tests { fn arithmetic_decimal_expr_test() -> Result<()> { let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Decimal(10, 2), true), + Field::new("b", DataType::Decimal128(10, 2), true), ])); let value: i128 = 123; let decimal_array = Arc::new(create_decimal_array( @@ -2391,7 +2391,7 @@ mod tests { // subtract: decimal array subtract int32 array let schema = Arc::new(Schema::new(vec![ Field::new("b", DataType::Int32, true), - Field::new("a", DataType::Decimal(10, 2), true), + Field::new("a", DataType::Decimal128(10, 2), true), ])); let expect = Arc::new(create_decimal_array( &[Some(-12177), None, Some(-12178), Some(-12276)], @@ -2424,7 +2424,7 @@ mod tests { // divide: int32 array divide decimal array let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Decimal(10, 2), true), + Field::new("b", DataType::Decimal128(10, 2), true), ])); let expect = Arc::new(create_decimal_array( &[ @@ -2447,7 +2447,7 @@ mod tests { // modulus: int32 array modulus decimal array let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Decimal(10, 2), true), + Field::new("b", DataType::Decimal128(10, 2), true), ])); let expect = Arc::new(create_decimal_array( &[Some(000), None, Some(100), Some(000)], diff --git a/datafusion/physical-expr/src/expressions/binary/adapter.rs b/datafusion/physical-expr/src/expressions/binary/adapter.rs index b0293cdf0be6c..12b8fab89d76c 100644 --- a/datafusion/physical-expr/src/expressions/binary/adapter.rs +++ b/datafusion/physical-expr/src/expressions/binary/adapter.rs @@ -38,7 +38,7 @@ macro_rules! make_dyn_comp_op { // Call `op_decimal` (e.g. `eq_decimal) until // arrow has native support // https://github.com/apache/arrow-rs/issues/1200 - (DataType::Decimal(_, _), DataType::Decimal(_, _)) => { + (DataType::Decimal128(_, _), DataType::Decimal128(_, _)) => { [<$OP _decimal>](as_decimal_array(left), as_decimal_array(right)) }, // By default call the arrow kernel diff --git a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs index ba8fff716e081..69b47944d779c 100644 --- a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs +++ b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs @@ -133,7 +133,7 @@ where { Ok(left .iter() - .map(|left| left.map(|left| op(left, right))) + .map(|left| left.map(|left| op(left.as_i128(), right))) .collect()) } @@ -152,7 +152,7 @@ where .zip(right.iter()) .map(|(left, right)| { if let (Some(left), Some(right)) = (left, right) { - Some(op(left, right)) + Some(op(left.as_i128(), right.as_i128())) } else { None } @@ -288,7 +288,7 @@ where .zip(right.iter()) .map(|(left, right)| { if let (Some(left), Some(right)) = (left, right) { - Some(op(left, right)).transpose() + Some(op(left.as_i128(), right.as_i128())).transpose() } else { Ok(None) } @@ -307,7 +307,7 @@ where left.iter() .map(|left| { if let Some(left) = left { - Some(op(left, right)).transpose() + Some(op(left.as_i128(), right)).transpose() } else { Ok(None) } diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs index 72503bbdb4429..176eb9472ce9c 100644 --- a/datafusion/physical-expr/src/expressions/cast.rs +++ b/datafusion/physical-expr/src/expressions/cast.rs @@ -290,9 +290,9 @@ mod tests { generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 3), + DataType::Decimal128(10, 3), Decimal128Array, - DataType::Decimal(20, 6), + DataType::Decimal128(20, 6), vec![ Some(convert(1_234_000)), Some(convert(2_222_000)), @@ -312,9 +312,9 @@ mod tests { let convert = |v: i128| Decimal128::new(10, 2, &v.to_le_bytes()); generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 3), + DataType::Decimal128(10, 3), Decimal128Array, - DataType::Decimal(10, 2), + DataType::Decimal128(10, 2), vec![ Some(convert(123)), Some(convert(222)), @@ -339,7 +339,7 @@ mod tests { .with_precision_and_scale(10, 0)?; generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Int8Array, DataType::Int8, vec![ @@ -360,7 +360,7 @@ mod tests { .with_precision_and_scale(10, 0)?; generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Int16Array, DataType::Int16, vec![ @@ -381,7 +381,7 @@ mod tests { .with_precision_and_scale(10, 0)?; generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Int32Array, DataType::Int32, vec![ @@ -402,7 +402,7 @@ mod tests { .with_precision_and_scale(10, 0)?; generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Int64Array, DataType::Int64, vec![ @@ -431,7 +431,7 @@ mod tests { .with_precision_and_scale(10, 3)?; generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 3), + DataType::Decimal128(10, 3), Float32Array, DataType::Float32, vec![ @@ -452,7 +452,7 @@ mod tests { .with_precision_and_scale(20, 6)?; generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(20, 6), + DataType::Decimal128(20, 6), Float64Array, DataType::Float64, vec![ @@ -477,7 +477,7 @@ mod tests { DataType::Int8, vec![1, 2, 3, 4, 5], Decimal128Array, - DataType::Decimal(3, 0), + DataType::Decimal128(3, 0), vec![ Some(convert(1)), Some(convert(2)), @@ -495,7 +495,7 @@ mod tests { DataType::Int16, vec![1, 2, 3, 4, 5], Decimal128Array, - DataType::Decimal(5, 0), + DataType::Decimal128(5, 0), vec![ Some(convert(1)), Some(convert(2)), @@ -513,7 +513,7 @@ mod tests { DataType::Int32, vec![1, 2, 3, 4, 5], Decimal128Array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), vec![ Some(convert(1)), Some(convert(2)), @@ -531,7 +531,7 @@ mod tests { DataType::Int64, vec![1, 2, 3, 4, 5], Decimal128Array, - DataType::Decimal(20, 0), + DataType::Decimal128(20, 0), vec![ Some(convert(1)), Some(convert(2)), @@ -549,7 +549,7 @@ mod tests { DataType::Int64, vec![1, 2, 3, 4, 5], Decimal128Array, - DataType::Decimal(20, 2), + DataType::Decimal128(20, 2), vec![ Some(convert(100)), Some(convert(200)), @@ -567,7 +567,7 @@ mod tests { DataType::Float32, vec![1.5, 2.5, 3.0, 1.123_456_8, 5.50], Decimal128Array, - DataType::Decimal(10, 2), + DataType::Decimal128(10, 2), vec![ Some(convert(150)), Some(convert(250)), @@ -585,7 +585,7 @@ mod tests { DataType::Float64, vec![1.5, 2.5, 3.0, 1.123_456_8, 5.50], Decimal128Array, - DataType::Decimal(20, 4), + DataType::Decimal128(20, 4), vec![ Some(convert(15000)), Some(convert(25000)), diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index 899a20835ba7e..a391bf51dda18 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -254,6 +254,45 @@ macro_rules! collection_contains_check { }}; } +macro_rules! collection_contains_check_decimal { + ($ARRAY:expr, $VALUES:expr, $NEGATED:expr, $CONTAINS_NULL:expr) => {{ + let bool_array = if $NEGATED { + // Not in + if $CONTAINS_NULL { + $ARRAY + .iter() + .map(|vop| match vop.map(|v| !$VALUES.contains(&v.as_i128())) { + Some(true) => None, + x => x, + }) + .collect::() + } else { + $ARRAY + .iter() + .map(|vop| vop.map(|v| !$VALUES.contains(&v.as_i128()))) + .collect::() + } + } else { + // In + if $CONTAINS_NULL { + $ARRAY + .iter() + .map(|vop| match vop.map(|v| $VALUES.contains(&v.as_i128())) { + Some(false) => None, + x => x, + }) + .collect::() + } else { + $ARRAY + .iter() + .map(|vop| vop.map(|v| $VALUES.contains(&v.as_i128()))) + .collect::() + } + }; + ColumnarValue::Array(Arc::new(bool_array)) + }}; +} + // whether each value on the left (can be null) is contained in the non-null list fn in_list_utf8( array: &GenericStringArray, @@ -315,7 +354,7 @@ fn make_list_contains_decimal( }) .collect::>(); - collection_contains_check!(array, values, negated, contains_null) + collection_contains_check_decimal!(array, values, negated, contains_null) } fn make_set_contains_decimal( @@ -335,7 +374,7 @@ fn make_set_contains_decimal( .collect::>(); let native_set: HashSet = HashSet::from_iter(native_array); - collection_contains_check!(array, native_set, negated, contains_null) + collection_contains_check_decimal!(array, native_set, negated, contains_null) } fn set_contains_utf8( @@ -631,7 +670,7 @@ impl PhysicalExpr for InListExpr { .unwrap(); Ok(set_contains_utf8(array, set, self.negated)) } - DataType::Decimal(_, _) => { + DataType::Decimal128(_, _) => { let array = array.as_any().downcast_ref::().unwrap(); Ok(make_set_contains_decimal(array, set, self.negated)) } @@ -760,7 +799,7 @@ impl PhysicalExpr for InListExpr { let null_array = new_null_array(&DataType::Boolean, array.len()); Ok(ColumnarValue::Array(Arc::new(null_array))) } - DataType::Decimal(_, _) => { + DataType::Decimal128(_, _) => { let decimal_array = array.as_any().downcast_ref::().unwrap(); Ok(make_list_contains_decimal( @@ -1032,7 +1071,8 @@ mod tests { #[test] fn in_list_decimal() -> Result<()> { // Now, we can check the NULL type - let schema = Schema::new(vec![Field::new("a", DataType::Decimal(13, 4), true)]); + let schema = + Schema::new(vec![Field::new("a", DataType::Decimal128(13, 4), true)]); let array = vec![Some(100_0000_i128), None, Some(200_5000_i128)] .into_iter() .collect::(); @@ -1278,7 +1318,8 @@ mod tests { #[test] fn in_list_set_decimal() -> Result<()> { - let schema = Schema::new(vec![Field::new("a", DataType::Decimal(13, 4), true)]); + let schema = + Schema::new(vec![Field::new("a", DataType::Decimal128(13, 4), true)]); let array = vec![Some(100_0000_i128), Some(200_5000_i128), None] .into_iter() .collect::(); @@ -1320,7 +1361,8 @@ mod tests { #[test] fn test_cast_static_filter_to_set() -> Result<()> { // random schema - let schema = Schema::new(vec![Field::new("a", DataType::Decimal(13, 4), true)]); + let schema = + Schema::new(vec![Field::new("a", DataType::Decimal128(13, 4), true)]); // list of phy expr let mut phy_exprs = vec![ lit(1i64), diff --git a/datafusion/physical-expr/src/expressions/try_cast.rs b/datafusion/physical-expr/src/expressions/try_cast.rs index 5e8cc30feed06..0333cb30098aa 100644 --- a/datafusion/physical-expr/src/expressions/try_cast.rs +++ b/datafusion/physical-expr/src/expressions/try_cast.rs @@ -237,9 +237,9 @@ mod tests { let convert = |v: i128| Decimal128::new(20, 6, &v.to_le_bytes()); generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 3), + DataType::Decimal128(10, 3), Decimal128Array, - DataType::Decimal(20, 6), + DataType::Decimal128(20, 6), vec![ Some(convert(1_234_000)), Some(convert(2_222_000)), @@ -254,9 +254,9 @@ mod tests { let convert = |v: i128| Decimal128::new(10, 2, &v.to_le_bytes()); generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 3), + DataType::Decimal128(10, 3), Decimal128Array, - DataType::Decimal(10, 2), + DataType::Decimal128(10, 2), vec![ Some(convert(123)), Some(convert(222)), @@ -279,7 +279,7 @@ mod tests { // decimal to i8 generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Int8Array, DataType::Int8, vec![ @@ -296,7 +296,7 @@ mod tests { let decimal_array = create_decimal_array(&array, 10, 0); generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Int16Array, DataType::Int16, vec![ @@ -313,7 +313,7 @@ mod tests { let decimal_array = create_decimal_array(&array, 10, 0); generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Int32Array, DataType::Int32, vec![ @@ -330,7 +330,7 @@ mod tests { let decimal_array = create_decimal_array(&array, 10, 0); generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Int64Array, DataType::Int64, vec![ @@ -348,7 +348,7 @@ mod tests { let decimal_array = create_decimal_array(&array, 10, 3); generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 3), + DataType::Decimal128(10, 3), Float32Array, DataType::Float32, vec![ @@ -364,7 +364,7 @@ mod tests { let decimal_array = create_decimal_array(&array, 20, 6); generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(20, 6), + DataType::Decimal128(20, 6), Float64Array, DataType::Float64, vec![ @@ -389,7 +389,7 @@ mod tests { DataType::Int8, vec![1, 2, 3, 4, 5], Decimal128Array, - DataType::Decimal(3, 0), + DataType::Decimal128(3, 0), vec![ Some(convert(1)), Some(convert(2)), @@ -406,7 +406,7 @@ mod tests { DataType::Int16, vec![1, 2, 3, 4, 5], Decimal128Array, - DataType::Decimal(5, 0), + DataType::Decimal128(5, 0), vec![ Some(convert(1)), Some(convert(2)), @@ -423,7 +423,7 @@ mod tests { DataType::Int32, vec![1, 2, 3, 4, 5], Decimal128Array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), vec![ Some(convert(1)), Some(convert(2)), @@ -440,7 +440,7 @@ mod tests { DataType::Int64, vec![1, 2, 3, 4, 5], Decimal128Array, - DataType::Decimal(20, 0), + DataType::Decimal128(20, 0), vec![ Some(convert(1)), Some(convert(2)), @@ -457,7 +457,7 @@ mod tests { DataType::Int64, vec![1, 2, 3, 4, 5], Decimal128Array, - DataType::Decimal(20, 2), + DataType::Decimal128(20, 2), vec![ Some(convert(100)), Some(convert(200)), @@ -474,7 +474,7 @@ mod tests { DataType::Float32, vec![1.5, 2.5, 3.0, 1.123_456_8, 5.50], Decimal128Array, - DataType::Decimal(10, 2), + DataType::Decimal128(10, 2), vec![ Some(convert(150)), Some(convert(250)), @@ -491,7 +491,7 @@ mod tests { DataType::Float64, vec![1.5, 2.5, 3.0, 1.123_456_8, 5.50], Decimal128Array, - DataType::Decimal(20, 4), + DataType::Decimal128(20, 4), vec![ Some(convert(15000)), Some(convert(25000)), diff --git a/datafusion/physical-expr/src/type_coercion.rs b/datafusion/physical-expr/src/type_coercion.rs index fb5f59ef376de..c7648cc264d97 100644 --- a/datafusion/physical-expr/src/type_coercion.rs +++ b/datafusion/physical-expr/src/type_coercion.rs @@ -78,7 +78,7 @@ mod tests { Schema::new( t.iter() .enumerate() - .map(|(i, t)| Field::new(&*format!("c{}", i), t.clone(), true)) + .map(|(i, t)| Field::new(&format!("c{}", i), t.clone(), true)) .collect(), ) }; diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml index c2966dcdee86b..90db744356e42 100644 --- a/datafusion/proto/Cargo.toml +++ b/datafusion/proto/Cargo.toml @@ -37,13 +37,13 @@ default = [] json = ["pbjson", "pbjson-build", "serde", "serde_json"] [dependencies] -arrow = { version = "19.0.0" } -datafusion = { path = "../core", version = "10.0.0" } -datafusion-common = { path = "../common", version = "10.0.0" } -datafusion-expr = { path = "../expr", version = "10.0.0" } +arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "30c94dbf1c422f81f8520b9956e96ab7b53c3f47", features = [], optional = false } +datafusion = { path = "../core", features = [], optional = false } +datafusion-common = { path = "../common", features = [], optional = false } +datafusion-expr = { path = "../expr", features = [], optional = false } pbjson = { version = "0.3", optional = true } pbjson-types = { version = "0.3", optional = true } -prost = "0.10" +prost = "0.11.0" serde = { version = "1.0", optional = true } serde_json = { version = "1.0", optional = true } @@ -53,4 +53,4 @@ tokio = "1.18" [build-dependencies] pbjson-build = { version = "0.3", optional = true } -prost-build = { version = "0.10" } +prost-build = { version = "0.11.1" } diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs index 1f3c3955a0f48..96b76b5bd4441 100644 --- a/datafusion/proto/src/from_proto.rs +++ b/datafusion/proto/src/from_proto.rs @@ -226,7 +226,7 @@ impl From for DataType { DataType::Time64(TimeUnit::Nanosecond) } protobuf::PrimitiveScalarType::Null => DataType::Null, - protobuf::PrimitiveScalarType::Decimal128 => DataType::Decimal(0, 0), + protobuf::PrimitiveScalarType::Decimal128 => DataType::Decimal128(0, 0), protobuf::PrimitiveScalarType::Date64 => DataType::Date64, protobuf::PrimitiveScalarType::TimeSecond => { DataType::Timestamp(TimeUnit::Second, None) @@ -309,7 +309,7 @@ impl TryFrom<&protobuf::arrow_type::ArrowTypeEnum> for DataType { arrow_type::ArrowTypeEnum::Decimal(protobuf::Decimal { whole, fractional, - }) => DataType::Decimal(*whole as usize, *fractional as usize), + }) => DataType::Decimal128(*whole as usize, *fractional as usize), arrow_type::ArrowTypeEnum::List(list) => { let list_type = list.as_ref().field_type.as_deref().required("field_type")?; diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs index 88230766d9071..c697234427906 100644 --- a/datafusion/proto/src/lib.rs +++ b/datafusion/proto/src/lib.rs @@ -153,7 +153,7 @@ mod roundtrip_tests { pub expr: ::core::option::Option, } - #[derive(Clone, PartialEq, ::prost::Message)] + #[derive(Clone, PartialEq, Eq, ::prost::Message)] pub struct TopKExecProto { #[prost(uint64, tag = "1")] pub k: u64, @@ -569,7 +569,7 @@ mod roundtrip_tests { DataType::FixedSizeBinary(1234), DataType::FixedSizeBinary(-432), DataType::LargeBinary, - DataType::Decimal(1345, 5431), + DataType::Decimal128(1345, 5431), // Recursive list tests DataType::List(new_box_field("Level1", DataType::Binary, true)), DataType::List(new_box_field( @@ -651,7 +651,7 @@ mod roundtrip_tests { ])), ), DataType::Dictionary( - Box::new(DataType::Decimal(10, 50)), + Box::new(DataType::Decimal128(10, 50)), Box::new(DataType::FixedSizeList( new_box_field("Level1", DataType::Binary, true), 4, @@ -724,7 +724,7 @@ mod roundtrip_tests { DataType::LargeBinary, DataType::Utf8, DataType::LargeUtf8, - DataType::Decimal(1345, 5431), + DataType::Decimal128(1345, 5431), // Recursive list tests DataType::List(new_box_field("Level1", DataType::Binary, true)), DataType::List(new_box_field( @@ -806,7 +806,7 @@ mod roundtrip_tests { ])), ), DataType::Dictionary( - Box::new(DataType::Decimal(10, 50)), + Box::new(DataType::Decimal128(10, 50)), Box::new(DataType::FixedSizeList( new_box_field("Level1", DataType::Binary, true), 4, diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs index b8ca810084530..2f083f6129276 100644 --- a/datafusion/proto/src/to_proto.rs +++ b/datafusion/proto/src/to_proto.rs @@ -219,7 +219,7 @@ impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum { value: Some(Box::new(value_type.as_ref().into())), })) } - DataType::Decimal(whole, fractional) => Self::Decimal(protobuf::Decimal { + DataType::Decimal128(whole, fractional) => Self::Decimal(protobuf::Decimal { whole: *whole as u64, fractional: *fractional as u64, }), @@ -1243,7 +1243,7 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype { | DataType::Union(_, _, _) | DataType::Dictionary(_, _) | DataType::Map(_, _) - | DataType::Decimal(_, _) + | DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => { return Err(Error::invalid_scalar_type(val)); } diff --git a/datafusion/row/Cargo.toml b/datafusion/row/Cargo.toml index 2227103f4b486..123a9172788dc 100644 --- a/datafusion/row/Cargo.toml +++ b/datafusion/row/Cargo.toml @@ -37,8 +37,8 @@ path = "src/lib.rs" jit = ["datafusion-jit"] [dependencies] -arrow = { version = "19.0.0" } -datafusion-common = { path = "../common", version = "10.0.0" } -datafusion-jit = { path = "../jit", version = "10.0.0", optional = true } +arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "30c94dbf1c422f81f8520b9956e96ab7b53c3f47", features = [], optional = false } +datafusion-common = { path = "../common", features = [], optional = false } +datafusion-jit = { path = "../jit", features = [], optional = true } paste = "^1.0" rand = "0.8" diff --git a/datafusion/row/src/layout.rs b/datafusion/row/src/layout.rs index e5214f7c307e3..1518df9bf55a2 100644 --- a/datafusion/row/src/layout.rs +++ b/datafusion/row/src/layout.rs @@ -166,7 +166,7 @@ fn word_aligned_offsets(null_width: usize, schema: &Schema) -> (Vec, usiz let mut offset = null_width; for f in schema.fields() { offsets.push(offset); - assert!(!matches!(f.data_type(), DataType::Decimal(_, _))); + assert!(!matches!(f.data_type(), DataType::Decimal128(_, _))); // All of the current support types can fit into one single 8-bytes word. // When we decide to support Decimal type in the future, its width would be // of two 8-bytes words and should adapt the width calculation below. diff --git a/datafusion/row/src/lib.rs b/datafusion/row/src/lib.rs index 5a76693564ab4..7d715f9da4ea8 100644 --- a/datafusion/row/src/lib.rs +++ b/datafusion/row/src/lib.rs @@ -388,7 +388,7 @@ mod tests { fn test_unsupported_type_read() { let schema = Arc::new(Schema::new(vec![Field::new( "a", - DataType::Decimal(5, 2), + DataType::Decimal128(5, 2), false, )])); let vector = vec![0; 1024]; diff --git a/datafusion/sql/Cargo.toml b/datafusion/sql/Cargo.toml index 760a58587ef6d..3a812e620bd7e 100644 --- a/datafusion/sql/Cargo.toml +++ b/datafusion/sql/Cargo.toml @@ -38,9 +38,9 @@ unicode_expressions = [] [dependencies] ahash = { version = "0.7", default-features = false } -arrow = { version = "19.0.0", features = ["prettyprint"] } -datafusion-common = { path = "../common", version = "10.0.0" } -datafusion-expr = { path = "../expr", version = "10.0.0" } +arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "30c94dbf1c422f81f8520b9956e96ab7b53c3f47", features = ["prettyprint"], optional = false } +datafusion-common = { path = "../common", features = [], optional = false } +datafusion-expr = { path = "../expr", features = [], optional = false } hashbrown = "0.12" sqlparser = "0.19" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] } diff --git a/datafusion/sql/examples/sql.rs b/datafusion/sql/examples/sql.rs index d5af9d5ed287d..f03cad0b6fe35 100644 --- a/datafusion/sql/examples/sql.rs +++ b/datafusion/sql/examples/sql.rs @@ -75,7 +75,7 @@ impl MySchemaProvider { "state".to_string(), create_table_source(vec![ Field::new("id", DataType::Int32, false), - Field::new("sales_tax", DataType::Decimal(10, 2), false), + Field::new("sales_tax", DataType::Decimal128(10, 2), false), ]), ); tables.insert( @@ -85,7 +85,7 @@ impl MySchemaProvider { Field::new("customer_id", DataType::Int32, false), Field::new("item_id", DataType::Int32, false), Field::new("quantity", DataType::Int32, false), - Field::new("price", DataType::Decimal(10, 2), false), + Field::new("price", DataType::Decimal128(10, 2), false), ]), ); Self { tables } diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index d93076fbdd916..ad16d5ea3d9cd 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -368,9 +368,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let table_ref: TableReference = table_name.as_str().into(); // check if table_name exists - if let Err(e) = self.schema_provider.get_table_provider(table_ref) { - return Err(e); - } + let _ = self.schema_provider.get_table_provider(table_ref)?; if self.has_table("information_schema", "tables") { let sql = format!("SELECT column_name, data_type, is_nullable \ @@ -2290,9 +2288,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let table_name = normalize_sql_object_name(sql_table_name); let table_ref: TableReference = table_name.as_str().into(); - if let Err(e) = self.schema_provider.get_table_provider(table_ref) { - return Err(e); - } + let _ = self.schema_provider.get_table_provider(table_ref)?; // Figure out the where clause let columns = vec!["table_name", "table_schema", "table_catalog"].into_iter(); @@ -2337,9 +2333,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let table_name = normalize_sql_object_name(sql_table_name); let table_ref: TableReference = table_name.as_str().into(); - if let Err(e) = self.schema_provider.get_table_provider(table_ref) { - return Err(e); - } + let _ = self.schema_provider.get_table_provider(table_ref)?; // Figure out the where clause let columns = vec!["table_name", "table_schema", "table_catalog"].into_iter(); @@ -2628,7 +2622,7 @@ mod tests { fn test_int_decimal_default() { quick_test( "SELECT CAST(10 AS DECIMAL)", - "Projection: CAST(Int64(10) AS Decimal(38, 10))\ + "Projection: CAST(Int64(10) AS Decimal128(38, 10))\ \n EmptyRelation", ); } @@ -2637,7 +2631,7 @@ mod tests { fn test_int_decimal_no_scale() { quick_test( "SELECT CAST(10 AS DECIMAL(5))", - "Projection: CAST(Int64(10) AS Decimal(5, 0))\ + "Projection: CAST(Int64(10) AS Decimal128(5, 0))\ \n EmptyRelation", ); } @@ -4418,7 +4412,7 @@ mod tests { ])), "test_decimal" => Ok(Schema::new(vec![ Field::new("id", DataType::Int32, false), - Field::new("price", DataType::Decimal(10, 2), false), + Field::new("price", DataType::Decimal128(10, 2), false), ])), "person" => Ok(Schema::new(vec![ Field::new("id", DataType::UInt32, false), diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index 00c28f8234172..81ea34de187b5 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -17,7 +17,7 @@ //! SQL Utility Functions -use arrow::datatypes::{DataType, DECIMAL_DEFAULT_SCALE, DECIMAL_MAX_PRECISION}; +use arrow::datatypes::{DataType, DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE}; use sqlparser::ast::Ident; use datafusion_common::{DataFusionError, Result, ScalarValue}; @@ -454,17 +454,17 @@ pub(crate) fn make_decimal_type( "Cannot specify only scale for decimal data type".to_string(), )) } - (None, None) => (DECIMAL_MAX_PRECISION, DECIMAL_DEFAULT_SCALE), + (None, None) => (DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE), }; // Arrow decimal is i128 meaning 38 maximum decimal digits - if precision > DECIMAL_MAX_PRECISION || scale > precision { + if precision > DECIMAL128_MAX_PRECISION || scale > precision { Err(DataFusionError::Internal(format!( "For decimal(precision, scale) precision must be less than or equal to 38 and scale can't be greater than precision. Got ({}, {})", precision, scale ))) } else { - Ok(DataType::Decimal(precision, scale)) + Ok(DataType::Decimal128(precision, scale)) } }