From 428efa86ea04798df9577219e60b88ebc7dfd4d3 Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Wed, 15 Sep 2021 22:30:39 -0700 Subject: [PATCH] fix build error in hash_utils, hash_aggregate, hash_join, parquet and aggregate --- .../src/physical_plan/hash_aggregate.rs | 9 ++- datafusion/src/physical_plan/hash_join.rs | 4 +- datafusion/src/physical_plan/hash_utils.rs | 17 +++-- datafusion/src/physical_plan/parquet.rs | 10 +-- .../src/physical_plan/windows/aggregate.rs | 5 +- datafusion/src/scalar.rs | 76 +++++++++---------- 6 files changed, 60 insertions(+), 61 deletions(-) diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs index 456fd9398684..db65b1cf6cbf 100644 --- a/datafusion/src/physical_plan/hash_aggregate.rs +++ b/datafusion/src/physical_plan/hash_aggregate.rs @@ -409,17 +409,17 @@ fn group_aggregate_batch( } // Collect all indices + offsets based on keys in this vec - let mut batch_indices = MutableBuffer::::new(); + let mut batch_indices = MutableBuffer::::new(); let mut offsets = vec![0]; let mut offset_so_far = 0; for group_idx in groups_with_rows.iter() { let indices = &accumulators.group_states[*group_idx].indices; - batch_indices.append_slice(indices)?; + batch_indices.extend_from_slice(indices); offset_so_far += indices.len(); offsets.push(offset_so_far); } let batch_indices = - Int32Array::from_data(DataType::Int32, batch_indices.into(), None); + UInt32Array::from_data(DataType::UInt32, batch_indices.into(), None); // `Take` all values based on indices into Arrays let values: Vec>> = aggr_input_values @@ -946,7 +946,8 @@ fn create_batch_from_map( .iter() .zip(output_schema.fields().iter()) .map(|(col, desired_field)| { - arrow::compute::cast::cast(col, desired_field.data_type()) + arrow::compute::cast::cast(col.as_ref(), desired_field.data_type()) + .map(|v| Arc::from(v)) }) .collect::>>()?; diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 52f666e56e73..8221e676f074 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -757,8 +757,8 @@ fn build_join_indexes( // If no rows matched left, still must keep the right // with all nulls for left if no_match { - left_indices.push(None)?; - right_indices.push(Some(row as u32))?; + left_indices.push(None); + right_indices.push(Some(row as u32)); } } None => { diff --git a/datafusion/src/physical_plan/hash_utils.rs b/datafusion/src/physical_plan/hash_utils.rs index 924882f7e6af..42620bfecdcf 100644 --- a/datafusion/src/physical_plan/hash_utils.rs +++ b/datafusion/src/physical_plan/hash_utils.rs @@ -594,7 +594,8 @@ pub fn create_hashes<'a>( mod tests { use std::sync::Arc; - use arrow::array::DictionaryArray; + use arrow::array::TryExtend; + use arrow::array::{DictionaryArray, MutableDictionaryArray, MutableUtf8Array}; use super::*; @@ -659,8 +660,8 @@ mod tests { #[test] fn create_hashes_for_float_arrays() -> Result<()> { - let f32_arr = Arc::new(Float32Array::from(vec![0.12, 0.5, 1f32, 444.7])); - let f64_arr = Arc::new(Float64Array::from(vec![0.12, 0.5, 1f64, 444.7])); + let f32_arr = Arc::new(Float32Array::from_slice(&[0.12, 0.5, 1f32, 444.7])); + let f64_arr = Arc::new(Float64Array::from_slice(&[0.12, 0.5, 1f64, 444.7])); let random_state = RandomState::with_seeds(0, 0, 0, 0); let hashes_buff = &mut vec![0; f32_arr.len()]; @@ -680,8 +681,9 @@ mod tests { let strings = vec![Some("foo"), None, Some("bar"), Some("foo"), None]; let string_array = Arc::new(strings.iter().cloned().collect::>()); - let dict_array = - Arc::new(strings.iter().cloned().collect::>()); + let dict_array = MutableDictionaryArray::>::new(); + dict_array.try_extend(strings.iter().cloned()).unwrap(); + let dict_array = dict_array.into_arc(); let random_state = RandomState::with_seeds(0, 0, 0, 0); @@ -721,8 +723,9 @@ mod tests { let strings2 = vec![Some("blarg"), Some("blah"), None]; let string_array = Arc::new(strings1.iter().cloned().collect::>()); - let dict_array = - Arc::new(strings2.iter().cloned().collect::>()); + let dict_array = MutableDictionaryArray::>::new(); + dict_array.try_extend(strings2.iter().cloned()).unwrap(); + let dict_array = dict_array.into_arc(); let random_state = RandomState::with_seeds(0, 0, 0, 0); diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs index 839a847136af..9c91c38e6f0a 100644 --- a/datafusion/src/physical_plan/parquet.rs +++ b/datafusion/src/physical_plan/parquet.rs @@ -514,7 +514,7 @@ macro_rules! get_min_max_values { .collect(); // ignore errors converting to arrays (e.g. different types) - ScalarValue::iter_to_array(scalar_values).ok() + ScalarValue::iter_to_array(scalar_values).ok().map(|v| Arc::from(v)) }} } @@ -536,7 +536,7 @@ fn build_row_group_predicate( predicate_builder: &PruningPredicate, metrics: ParquetFileMetrics, row_group_metadata: &[RowGroupMetaData], -) -> Box bool> { +) -> Box bool> { let parquet_schema = predicate_builder.schema().as_ref(); let pruning_stats = RowGroupPruningStatistics { @@ -550,14 +550,14 @@ fn build_row_group_predicate( // NB: false means don't scan row group let num_pruned = values.iter().filter(|&v| !*v).count(); metrics.row_groups_pruned.add(num_pruned); - Box::new(move |_, i| values[i]) + Box::new(move |i, _| values[i]) } // stats filter array could not be built // return a closure which will not filter out any row groups Err(e) => { debug!("Error evaluating row group predicate values {}", e); metrics.predicate_evaluation_errors.add(1); - Box::new(|_r, _i| true) + Box::new(|_i, _r| true) } } } @@ -591,7 +591,7 @@ fn read_partition( reader.set_groups_filter(Arc::new(build_row_group_predicate( predicate_builder, file_metrics, - reader.metadata().row_groups, + &reader.metadata().row_groups, ))); } diff --git a/datafusion/src/physical_plan/windows/aggregate.rs b/datafusion/src/physical_plan/windows/aggregate.rs index 664d1dc556c1..c709c2061052 100644 --- a/datafusion/src/physical_plan/windows/aggregate.rs +++ b/datafusion/src/physical_plan/windows/aggregate.rs @@ -75,9 +75,8 @@ impl AggregateWindowExpr { let num_rows = batch.num_rows(); let partition_points = self.evaluate_partition_points(num_rows, &self.partition_columns(batch)?)?; - let sort_partition_points = self - .evaluate_partition_points(num_rows, &self.sort_columns(batch)?)? - .wtf; + let sort_partition_points = + self.evaluate_partition_points(num_rows, &self.sort_columns(batch)?)?; let values = self.evaluate_args(batch)?; let results = partition_points .iter() diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs index 791bb6e9fb56..83a6c0df4fee 100644 --- a/datafusion/src/scalar.rs +++ b/datafusion/src/scalar.rs @@ -350,36 +350,25 @@ macro_rules! build_timestamp_list { true, ))), $SIZE, - ).into(); + ) + .into(); null_array } Some(values) => { let values = values.as_ref(); match $TIME_UNIT { - TimeUnit::Second => build_values_list!( - Int64Vec, - TimestampSecond, - values, - $SIZE - ), - TimeUnit::Microsecond => build_values_list!( - Int64Vec, - TimestampMillisecond, - values, - $SIZE - ), - TimeUnit::Millisecond => build_values_list!( - Int64Vec, - TimestampMicrosecond, - values, - $SIZE - ), - TimeUnit::Nanosecond => build_values_list!( - Int64Vec, - TimestampNanosecond, - values, - $SIZE - ), + TimeUnit::Second => { + build_values_list!(Int64Vec, TimestampSecond, values, $SIZE) + } + TimeUnit::Microsecond => { + build_values_list!(Int64Vec, TimestampMillisecond, values, $SIZE) + } + TimeUnit::Millisecond => { + build_values_list!(Int64Vec, TimestampMicrosecond, values, $SIZE) + } + TimeUnit::Nanosecond => { + build_values_list!(Int64Vec, TimestampNanosecond, values, $SIZE) + } } } } @@ -534,7 +523,7 @@ impl ScalarValue { /// Example /// ``` /// use datafusion::scalar::ScalarValue; - /// use arrow::array::{ArrayRef, BooleanArray}; + /// use arrow::array::BooleanArray; /// /// let scalars = vec![ /// ScalarValue::Boolean(Some(true)), @@ -546,7 +535,7 @@ impl ScalarValue { /// let array = ScalarValue::iter_to_array(scalars.into_iter()) /// .unwrap(); /// - /// let expected: ArrayRef = std::sync::Arc::new( + /// let expected: Box = Box::new( /// BooleanArray::from(vec![ /// Some(true), /// None, @@ -558,7 +547,7 @@ impl ScalarValue { /// ``` pub fn iter_to_array( scalars: impl IntoIterator, - ) -> Result { + ) -> Result> { let mut scalars = scalars.into_iter().peekable(); // figure out the type based on the first element @@ -576,7 +565,7 @@ impl ScalarValue { macro_rules! build_array_primitive { ($TY:ty, $SCALAR_TY:ident, $DT:ident) => {{ { - Arc::new(scalars + Box::new(scalars .map(|sv| { if let ScalarValue::$SCALAR_TY(v) = sv { Ok(v) @@ -589,7 +578,7 @@ impl ScalarValue { } }) .collect::>>()?.to($DT) - ) as ArrayRef + ) as Box } }}; } @@ -612,7 +601,7 @@ impl ScalarValue { } }) .collect::>()?; - Arc::new(array) + Box::new(array) } }}; } @@ -651,13 +640,13 @@ impl ScalarValue { } let array: ListArray = array.into(); - Arc::new(array) + Box::new(array) }} } use DataType::*; - let array: ArrayRef = match &data_type { - DataType::Boolean => Arc::new( + let array: Box = match &data_type { + DataType::Boolean => Box::new( scalars .map(|sv| { if let ScalarValue::Boolean(v) = sv { @@ -840,7 +829,9 @@ impl ScalarValue { None => new_null_array(self.get_datatype(), size).into(), }, ScalarValue::List(values, data_type) => match data_type.as_ref() { - DataType::Boolean => build_list!(MutableBooleanArray, Boolean, values, size), + DataType::Boolean => { + build_list!(MutableBooleanArray, Boolean, values, size) + } DataType::Int8 => build_list!(Int8Vec, Int8, values, size), DataType::Int16 => build_list!(Int16Vec, Int16, values, size), DataType::Int32 => build_list!(Int32Vec, Int32, values, size), @@ -855,7 +846,9 @@ impl ScalarValue { build_timestamp_list!(unit.clone(), tz.clone(), values, size) } DataType::Utf8 => build_list!(MutableStringArray, Utf8, values, size), - DataType::LargeUtf8 => build_list!(MutableLargeStringArray, LargeUtf8, values, size), + DataType::LargeUtf8 => { + build_list!(MutableLargeStringArray, LargeUtf8, values, size) + } dt => panic!("Unexpected DataType for list {:?}", dt), }, ScalarValue::Date32(e) => match e { @@ -1459,7 +1452,7 @@ mod tests { let array = ScalarValue::iter_to_array(scalars.into_iter()).unwrap(); - let expected: ArrayRef = Arc::new($ARRAYTYPE::from($INPUT)); + let expected: Box = Box::new($ARRAYTYPE::from($INPUT)); assert_eq!(&array, &expected); }}; @@ -1476,7 +1469,7 @@ mod tests { let array = ScalarValue::iter_to_array(scalars.into_iter()).unwrap(); - let expected: ArrayRef = Arc::new($ARRAYTYPE::from($INPUT)); + let expected: Box = Box::new($ARRAYTYPE::from($INPUT)); assert_eq!(&array, &expected); }}; @@ -1496,7 +1489,7 @@ mod tests { let expected: $ARRAYTYPE = $INPUT.iter().map(|v| v.map(|v| v.to_vec())).collect(); - let expected: ArrayRef = Arc::new(expected); + let expected: Box = Box::new(expected); assert_eq!(&array, &expected); }}; @@ -1707,7 +1700,10 @@ mod tests { ($INPUT:expr, $INDEX_TY:ty, $SCALAR_TY:ident) => {{ TestCase { array: { - let mut array = MutableDictionaryArray::<$INDEX_TY, MutableUtf8Array>::new(); + let mut array = MutableDictionaryArray::< + $INDEX_TY, + MutableUtf8Array, + >::new(); array.try_extend(*($INPUT)).unwrap(); let array: DictionaryArray<$INDEX_TY> = array.into(); Arc::new(array)