From 00df64a68d1493cb0cb4137b8f99ec81bd8c2188 Mon Sep 17 00:00:00 2001
From: Yijie Shen
Date: Thu, 16 Sep 2021 09:44:25 +0800
Subject: [PATCH] Make scalar.rs compile (#2)

* wip

* more

* Make scalar.rs compile
---
 datafusion/src/datasource/parquet.rs           |  30 +++-
 datafusion/src/execution/dataframe_impl.rs     |   4 +-
 datafusion/src/physical_plan/analyze.rs        |   7 +-
 .../src/physical_plan/datetime_expressions.rs  |   4 +-
 .../src/physical_plan/expressions/in_list.rs   |   6 +-
 datafusion/src/physical_plan/parquet.rs        |   2 +-
 datafusion/src/scalar.rs                       | 165 +++++++-----------
 7 files changed, 94 insertions(+), 124 deletions(-)

diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs
index b6afd22b245b..7c4e0e1a07c1 100644
--- a/datafusion/src/datasource/parquet.rs
+++ b/datafusion/src/datasource/parquet.rs
@@ -17,7 +17,7 @@
 
 //! Parquet data source
 
-use std::any::{Any, type_name};
+use std::any::{type_name, Any};
 use std::fs::File;
 use std::sync::Arc;
 
@@ -221,7 +221,8 @@ impl ParquetTableDescriptor {
             if let DataType::$DT = fields[i].data_type() {
                 let stats = stats
                     .as_any()
-                    .downcast_ref::<PrimitiveStatistics<$PRIMITIVE_TYPE>>().ok_or_else(|| {
+                    .downcast_ref::<PrimitiveStatistics<$PRIMITIVE_TYPE>>()
+                    .ok_or_else(|| {
                         DataFusionError::Internal(format!(
                             "Failed to cast stats to {} stats",
                             type_name::<$PRIMITIVE_TYPE>()
@@ -254,9 +255,13 @@ impl ParquetTableDescriptor {
             match stats.physical_type() {
                 PhysicalType::Boolean => {
                     if let DataType::Boolean = fields[i].data_type() {
-                        let stats =
-                            stats.as_any().downcast_ref::<BooleanStatistics>().ok_or_else(|| {
-                                DataFusionError::Internal("Failed to cast stats to boolean stats".to_owned())
+                        let stats = stats
+                            .as_any()
+                            .downcast_ref::<BooleanStatistics>()
+                            .ok_or_else(|| {
+                                DataFusionError::Internal(
+                                    "Failed to cast stats to boolean stats".to_owned(),
+                                )
                             })?;
                         if let Some(max_value) = &mut max_values[i] {
                             if let Some(v) = stats.max_value {
@@ -296,9 +301,13 @@ impl ParquetTableDescriptor {
                 }
                 PhysicalType::ByteArray => {
                     if let DataType::Utf8 = fields[i].data_type() {
-                        let stats =
-                            stats.as_any().downcast_ref::<BinaryStatistics>().ok_or_else(|| {
-                                DataFusionError::Internal("Failed to cast stats to binary stats".to_owned())
+                        let stats = stats
+                            .as_any()
+                            .downcast_ref::<BinaryStatistics>()
+                            .ok_or_else(|| {
+                                DataFusionError::Internal(
+                                    "Failed to cast stats to binary stats".to_owned(),
+                                )
                             })?;
                         if let Some(max_value) = &mut max_values[i] {
                             if let Some(v) = stats.max_value {
@@ -395,7 +404,10 @@ impl TableDescriptorBuilder for ParquetTableDescriptor {
         };
 
         Ok(FileAndSchema {
-            file: PartitionedFile { path: path.to_owned(), statistics },
+            file: PartitionedFile {
+                path: path.to_owned(),
+                statistics,
+            },
             schema,
         })
     }
diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index 55a69ed0dc0d..c48b9e5a13de 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -19,8 +19,6 @@
 
 use std::sync::{Arc, Mutex};
 
-use arrow::io::print;
-use arrow::record_batch::RecordBatch;
 use crate::error::Result;
 use crate::execution::context::{ExecutionContext, ExecutionContextState};
 use crate::logical_plan::{
@@ -31,6 +29,8 @@ use crate::{
     dataframe::*,
     physical_plan::{collect, collect_partitioned},
 };
+use arrow::io::print;
+use arrow::record_batch::RecordBatch;
 
 use crate::physical_plan::{
     execute_stream, execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream,
diff --git a/datafusion/src/physical_plan/analyze.rs b/datafusion/src/physical_plan/analyze.rs
index 7ebf0160b128..541aa34f1207 100644
--- a/datafusion/src/physical_plan/analyze.rs
+++ b/datafusion/src/physical_plan/analyze.rs
@@ -29,8 +29,8 @@ use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
 use futures::StreamExt;
 
 use super::{stream::RecordBatchReceiverStream, Distribution, SendableRecordBatchStream};
-use async_trait::async_trait;
 use arrow::array::MutableUtf8Array;
+use async_trait::async_trait;
 
 /// `EXPLAIN ANALYZE` execution plan operator. This operator runs its input,
 /// discards the results, and then prints out an annotated plan with metrics
@@ -182,10 +182,7 @@ impl ExecutionPlan for AnalyzeExec {
 
                     let maybe_batch = RecordBatch::try_new(
                         captured_schema,
-                        vec![
-                            type_builder.into_arc(),
-                            plan_builder.into_arc(),
-                        ],
+                        vec![type_builder.into_arc(), plan_builder.into_arc()],
                     );
                     // again ignore error
                     tx.send(maybe_batch).await.ok();
diff --git a/datafusion/src/physical_plan/datetime_expressions.rs b/datafusion/src/physical_plan/datetime_expressions.rs
index 298ce960fc7a..076d3119d0c0 100644
--- a/datafusion/src/physical_plan/datetime_expressions.rs
+++ b/datafusion/src/physical_plan/datetime_expressions.rs
@@ -26,9 +26,7 @@ use crate::{
 use arrow::{
     array::*,
     compute::cast,
-    datatypes::{
-        DataType, TimeUnit,
-    },
+    datatypes::{DataType, TimeUnit},
     temporal_conversions::utf8_to_timestamp_ns_scalar,
     types::NativeType,
 };
diff --git a/datafusion/src/physical_plan/expressions/in_list.rs b/datafusion/src/physical_plan/expressions/in_list.rs
index f7a18be412e4..31da1238f783 100644
--- a/datafusion/src/physical_plan/expressions/in_list.rs
+++ b/datafusion/src/physical_plan/expressions/in_list.rs
@@ -175,8 +175,7 @@ fn in_list_primitive<T: NativeType>(
     array: &PrimitiveArray<T>,
     values: &[T],
 ) -> Result<BooleanArray> {
-    compare_primitive_op_scalar!(array, values, |x, v| v
-        .contains(&x))
+    compare_primitive_op_scalar!(array, values, |x, v| v.contains(&x))
 }
 
 // whether each value on the left (can be null) is contained in the non-null list
@@ -184,8 +183,7 @@ fn not_in_list_primitive<T: NativeType>(
     array: &PrimitiveArray<T>,
     values: &[T],
 ) -> Result<BooleanArray> {
-    compare_primitive_op_scalar!(array, values, |x, v| !v
-        .contains(&x))
+    compare_primitive_op_scalar!(array, values, |x, v| !v.contains(&x))
 }
 
 // whether each value on the left (can be null) is contained in the non-null list
diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs
index c061c491bec5..839a847136af 100644
--- a/datafusion/src/physical_plan/parquet.rs
+++ b/datafusion/src/physical_plan/parquet.rs
@@ -70,7 +70,7 @@ pub struct ParquetExec {
     pub schema: Arc<Schema>,
     /// Projection for which columns to load
    projection: Vec<usize>,
-    /// Batch size 
+    /// Batch size
     batch_size: usize,
     /// Statistics for the data set (sum of statistics for all partitions)
     statistics: Statistics,
diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs
index 345f99554337..791bb6e9fb56 100644
--- a/datafusion/src/scalar.rs
+++ b/datafusion/src/scalar.rs
@@ -22,10 +22,8 @@ use std::{convert::TryFrom, fmt, iter::repeat, sync::Arc};
 use crate::error::{DataFusionError, Result};
 use arrow::{
     array::*,
-    bitmap::MutableBitmap,
     buffer::MutableBuffer,
     datatypes::{DataType, Field, IntervalUnit, TimeUnit},
-    error::{ArrowError, Result as ArrowResult},
     types::days_ms,
 };
 use ordered_float::OrderedFloat;
@@ -37,6 +35,8 @@ type StringArray = Utf8Array<i32>;
 type LargeStringArray = Utf8Array<i64>;
 type SmallBinaryArray = BinaryArray<i32>;
 type LargeBinaryArray = BinaryArray<i64>;
+type MutableStringArray = MutableUtf8Array<i32>;
+type MutableLargeStringArray = MutableUtf8Array<i64>;
 
 /// Represents a dynamically typed, nullable single value.
 /// This is the single-valued counter-part of arrow’s `Array`.
@@ -323,7 +323,7 @@ macro_rules! build_list {
             // the return on the macro is necessary, to short-circuit and return ArrayRef
             None => {
                 return Arc::from(new_null_array(
-                    &DataType::List(Box::new(Field::new(
+                    DataType::List(Box::new(Field::new(
                         "item",
                         DataType::$SCALAR_TY,
                         true,
@@ -343,38 +343,39 @@ macro_rules! build_timestamp_list {
         match $VALUES {
             // the return on the macro is necessary, to short-circuit and return ArrayRef
             None => {
-                return Arc::new(new_null_array(
-                    &DataType::List(Box::new(Field::new(
+                let null_array: ArrayRef = new_null_array(
+                    DataType::List(Box::new(Field::new(
                         "item",
                         DataType::Timestamp($TIME_UNIT, $TIME_ZONE),
                         true,
                     ))),
                     $SIZE,
-                ));
+                ).into();
+                null_array
             }
             Some(values) => {
                 let values = values.as_ref();
                 match $TIME_UNIT {
                     TimeUnit::Second => build_values_list!(
-                        TimestampSecondBuilder,
+                        Int64Vec,
                         TimestampSecond,
                         values,
                         $SIZE
                     ),
                     TimeUnit::Microsecond => build_values_list!(
-                        TimestampMillisecondBuilder,
+                        Int64Vec,
                         TimestampMillisecond,
                         values,
                         $SIZE
                     ),
                     TimeUnit::Millisecond => build_values_list!(
-                        TimestampMicrosecondBuilder,
+                        Int64Vec,
                         TimestampMicrosecond,
                         values,
                         $SIZE
                     ),
                     TimeUnit::Nanosecond => build_values_list!(
-                        TimestampNanosecondBuilder,
+                        Int64Vec,
                         TimestampNanosecond,
                         values,
                         $SIZE
@@ -386,25 +387,24 @@
 }
 
 macro_rules! build_values_list {
-    ($VALUE_BUILDER_TY:ident, $SCALAR_TY:ident, $VALUES:expr, $SIZE:expr) => {{
-        let mut builder = ListBuilder::new($VALUE_BUILDER_TY::new($VALUES.len()));
+    ($MUTABLE_TY:ty, $SCALAR_TY:ident, $VALUES:expr, $SIZE:expr) => {{
+        let mut array = MutableListArray::<i32, $MUTABLE_TY>::new();
 
         for _ in 0..$SIZE {
+            let mut vec = vec![];
             for scalar_value in $VALUES {
                 match scalar_value {
-                    ScalarValue::$SCALAR_TY(Some(v)) => {
-                        builder.values().append_value(v.clone()).unwrap()
-                    }
-                    ScalarValue::$SCALAR_TY(None) => {
-                        builder.values().append_null().unwrap();
+                    ScalarValue::$SCALAR_TY(v) => {
+                        vec.push(v.clone());
                     }
                     _ => panic!("Incompatible ScalarValue for list"),
                 };
             }
-            builder.append(true).unwrap();
+            array.try_push(Some(vec)).unwrap();
         }
 
-        builder.finish()
+        let array: ListArray<i32> = array.into();
+        Arc::new(array)
     }};
 }
 
@@ -617,42 +617,17 @@ impl ScalarValue {
             }};
         }
 
-        macro_rules! build_array_list_primitive {
-            ($ARRAY_TY:ident, $SCALAR_TY:ident, $NATIVE_TYPE:ident) => {{
-                Arc::new(ListArray::from_iter_primitive::<$ARRAY_TY, _, _>(
-                    scalars.into_iter().map(|x| match x {
-                        ScalarValue::List(xs, _) => xs.map(|x| {
-                            x.iter()
-                                .map(|x| match x {
-                                    ScalarValue::$SCALAR_TY(i) => *i,
-                                    sv => panic!("Inconsistent types in ScalarValue::iter_to_array. \
-                                        Expected {:?}, got {:?}", data_type, sv),
-                                })
-                                .collect::<Vec<Option<$NATIVE_TYPE>>>()
-                        }),
-                        sv => panic!("Inconsistent types in ScalarValue::iter_to_array. \
-                            Expected {:?}, got {:?}", data_type, sv),
-                    }),
-                ))
-            }};
-        }
-
-        macro_rules! build_array_list_string {
-            ($BUILDER:ident, $SCALAR_TY:ident) => {{
-                let mut builder = ListBuilder::new($BUILDER::new(0));
-
+        macro_rules! build_array_list {
+            ($MUTABLE_TY:ty, $SCALAR_TY:ident) => {{
+                let mut array = MutableListArray::<i32, $MUTABLE_TY>::new();
                 for scalar in scalars.into_iter() {
                     match scalar {
                         ScalarValue::List(Some(xs), _) => {
                             let xs = *xs;
+                            let mut vec = vec![];
                             for s in xs {
                                 match s {
-                                    ScalarValue::$SCALAR_TY(Some(val)) => {
-                                        builder.values().append_value(val)?;
-                                    }
-                                    ScalarValue::$SCALAR_TY(None) => {
-                                        builder.values().append_null()?;
-                                    }
+                                    ScalarValue::$SCALAR_TY(o) => { vec.push(o) }
                                     sv => return Err(DataFusionError::Internal(format!(
                                         "Inconsistent types in ScalarValue::iter_to_array. \
                                         Expected Utf8, got {:?}",
@@ -660,10 +635,10 @@ impl ScalarValue {
                                     ))),
                                 }
                             }
-                            builder.append(true)?;
+                            array.try_push(Some(vec))?;
                         }
                         ScalarValue::List(None, _) => {
-                            builder.append(false)?;
+                            array.push_null();
                         }
                         sv => {
                             return Err(DataFusionError::Internal(format!(
@@ -675,8 +650,8 @@ impl ScalarValue {
                     }
                 }
-                Arc::new(builder.finish())
-
+                let array: ListArray<i32> = array.into();
+                Arc::new(array)
             }}
         }
@@ -736,40 +711,40 @@ impl ScalarValue {
                 build_array_primitive!(i32, IntervalYearMonth, data_type)
             }
             DataType::List(fields) if fields.data_type() == &DataType::Int8 => {
-                build_array_list_primitive!(Int8Type, Int8, i8)
+                build_array_list!(Int8Vec, Int8)
             }
             DataType::List(fields) if fields.data_type() == &DataType::Int16 => {
-                build_array_list_primitive!(Int16Type, Int16, i16)
+                build_array_list!(Int16Vec, Int16)
             }
             DataType::List(fields) if fields.data_type() == &DataType::Int32 => {
-                build_array_list_primitive!(Int32Type, Int32, i32)
+                build_array_list!(Int32Vec, Int32)
             }
             DataType::List(fields) if fields.data_type() == &DataType::Int64 => {
-                build_array_list_primitive!(Int64Type, Int64, i64)
+                build_array_list!(Int64Vec, Int64)
             }
             DataType::List(fields) if fields.data_type() == &DataType::UInt8 => {
-                build_array_list_primitive!(UInt8Type, UInt8, u8)
+                build_array_list!(UInt8Vec, UInt8)
             }
             DataType::List(fields) if fields.data_type() == &DataType::UInt16 => {
-                build_array_list_primitive!(UInt16Type, UInt16, u16)
+                build_array_list!(UInt16Vec, UInt16)
             }
             DataType::List(fields) if fields.data_type() == &DataType::UInt32 => {
-                build_array_list_primitive!(UInt32Type, UInt32, u32)
+                build_array_list!(UInt32Vec, UInt32)
             }
             DataType::List(fields) if fields.data_type() == &DataType::UInt64 => {
-                build_array_list_primitive!(UInt64Type, UInt64, u64)
+                build_array_list!(UInt64Vec, UInt64)
             }
             DataType::List(fields) if fields.data_type() == &DataType::Float32 => {
-                build_array_list_primitive!(Float32Type, Float32, f32)
+                build_array_list!(Float32Vec, Float32)
             }
             DataType::List(fields) if fields.data_type() == &DataType::Float64 => {
-                build_array_list_primitive!(Float64Type, Float64, f64)
+                build_array_list!(Float64Vec, Float64)
             }
             DataType::List(fields) if fields.data_type() == &DataType::Utf8 => {
-                build_array_list_string!(StringBuilder, Utf8)
+                build_array_list!(MutableStringArray, Utf8)
             }
             DataType::List(fields) if fields.data_type() == &DataType::LargeUtf8 => {
-                build_array_list_string!(LargeStringBuilder, LargeUtf8)
+                build_array_list!(MutableLargeStringArray, LargeUtf8)
             }
             _ => {
                 return Err(DataFusionError::Internal(format!(
@@ -864,39 +839,25 @@ impl ScalarValue {
                 ),
                 None => new_null_array(self.get_datatype(), size).into(),
             },
-            ScalarValue::List(values, data_type) => Arc::from(match data_type.as_ref() {
-                DataType::Boolean => {
-                    build_list!(BooleanBuilder, Boolean, values, size)
-                }
-                DataType::Int8 => build_list!(Int8Builder, Int8, values, size),
-                DataType::Int16 => build_list!(Int16Builder, Int16, values, size),
-                DataType::Int32 => build_list!(Int32Builder, Int32, values, size),
-                DataType::Int64 => build_list!(Int64Builder, Int64, values, size),
-                DataType::UInt8 => build_list!(UInt8Builder, UInt8, values, size),
-                DataType::UInt16 => {
-                    build_list!(UInt16Builder, UInt16, values, size)
-                }
-                DataType::UInt32 => {
-                    build_list!(UInt32Builder, UInt32, values, size)
-                }
-                DataType::UInt64 => {
-                    build_list!(UInt64Builder, UInt64, values, size)
-                }
-                DataType::Utf8 => build_list!(StringBuilder, Utf8, values, size),
-                DataType::Float32 => {
-                    build_list!(Float32Builder, Float32, values, size)
-                }
-                DataType::Float64 => {
-                    build_list!(Float64Builder, Float64, values, size)
-                }
+            ScalarValue::List(values, data_type) => match data_type.as_ref() {
+                DataType::Boolean => build_list!(MutableBooleanArray, Boolean, values, size),
+                DataType::Int8 => build_list!(Int8Vec, Int8, values, size),
+                DataType::Int16 => build_list!(Int16Vec, Int16, values, size),
+                DataType::Int32 => build_list!(Int32Vec, Int32, values, size),
+                DataType::Int64 => build_list!(Int64Vec, Int64, values, size),
+                DataType::UInt8 => build_list!(UInt8Vec, UInt8, values, size),
+                DataType::UInt16 => build_list!(UInt16Vec, UInt16, values, size),
+                DataType::UInt32 => build_list!(UInt32Vec, UInt32, values, size),
+                DataType::UInt64 => build_list!(UInt64Vec, UInt64, values, size),
+                DataType::Float32 => build_list!(Float32Vec, Float32, values, size),
+                DataType::Float64 => build_list!(Float64Vec, Float64, values, size),
                 DataType::Timestamp(unit, tz) => {
                     build_timestamp_list!(unit.clone(), tz.clone(), values, size)
                 }
-                &DataType::LargeUtf8 => {
-                    build_list!(LargeStringBuilder, LargeUtf8, values, size)
-                }
+                DataType::Utf8 => build_list!(MutableStringArray, Utf8, values, size),
+                DataType::LargeUtf8 => build_list!(MutableLargeStringArray, LargeUtf8, values, size),
                 dt => panic!("Unexpected DataType for list {:?}", dt),
-            }),
+            },
             ScalarValue::Date32(e) => match e {
                 Some(value) => dyn_to_array!(self, value, size, i32),
                 None => new_null_array(self.get_datatype(), size).into(),
             },
@@ -1655,7 +1616,7 @@ mod tests {
        let i16_vals = make_typed_vec!(i8_vals, i16);
        let i32_vals = make_typed_vec!(i8_vals, i32);
        let i64_vals = make_typed_vec!(i8_vals, i64);
-       let days_ms_vals = &[Some(days_ms([1, 2])), None, Some(days_ms([10, 0]))];
+       let days_ms_vals = &[Some(days_ms::new(1, 2)), None, Some(days_ms::new(10, 0))];
 
        let u8_vals = vec![Some(0), None, Some(1)];
        let u16_vals = make_typed_vec!(u8_vals, u16);
@@ -1706,8 +1667,9 @@ mod tests {
        macro_rules! make_temporal_test_case {
            ($INPUT:expr, $ARRAY_TY:ident, $ARROW_TU:ident, $SCALAR_TY:ident) => {{
                TestCase {
-                   array: Arc::new($ARRAY_TY::from($INPUT)
-                       .to(DataType::Interval(IntervalUnit::$ARROW_TU)),
+                   array: Arc::new(
+                       $ARRAY_TY::from($INPUT)
+                           .to(DataType::Interval(IntervalUnit::$ARROW_TU)),
                    ),
                    scalars: $INPUT.iter().map(|v| ScalarValue::$SCALAR_TY(*v)).collect(),
                }
@@ -1744,9 +1706,12 @@ mod tests {
        macro_rules! make_str_dict_test_case {
            ($INPUT:expr, $INDEX_TY:ty, $SCALAR_TY:ident) => {{
                TestCase {
-                   array: Arc::new(
-                       DictionaryArray::<$INDEX_TY>::from($INPUT),
-                   ),
+                   array: {
+                       let mut array = MutableDictionaryArray::<$INDEX_TY, MutableUtf8Array<i32>>::new();
+                       array.try_extend(*($INPUT)).unwrap();
+                       let array: DictionaryArray<$INDEX_TY> = array.into();
+                       Arc::new(array)
+                   },
                    scalars: $INPUT
                        .iter()
                        .map(|v| ScalarValue::$SCALAR_TY(v.map(|v| v.to_string())))