From 478f606d3b54b64959b748d12a260f2e4cd96d0a Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Fri, 17 Sep 2021 12:05:47 +0800 Subject: [PATCH] Make DataFusion Core compile (#3) * wip * more * Make scalar.rs compile * Fix various compilation error due to API difference * Make datafusion core compile * fmt * wip --- .../src/execution_plans/shuffle_writer.rs | 43 ++- .../src/serde/physical_plan/from_proto.rs | 2 - ballista/rust/core/src/utils.rs | 1 + datafusion/benches/physical_plan.rs | 2 +- datafusion/src/arrow_temporal_util.rs | 302 ++++++++++++++++++ datafusion/src/datasource/parquet.rs | 8 +- datafusion/src/execution/context.rs | 20 +- datafusion/src/lib.rs | 2 + datafusion/src/optimizer/constant_folding.rs | 4 +- .../src/physical_optimizer/repartition.rs | 2 + .../src/physical_plan/datetime_expressions.rs | 11 +- .../src/physical_plan/distinct_expressions.rs | 84 ++--- datafusion/src/physical_plan/explain.rs | 6 +- .../src/physical_plan/expressions/binary.rs | 2 +- .../src/physical_plan/expressions/in_list.rs | 16 +- .../src/physical_plan/expressions/lead_lag.rs | 20 +- .../physical_plan/expressions/nth_value.rs | 23 +- .../src/physical_plan/expressions/rank.rs | 10 +- .../physical_plan/expressions/row_number.rs | 11 +- datafusion/src/physical_plan/hash_utils.rs | 4 +- datafusion/src/physical_plan/parquet.rs | 4 +- .../physical_plan/sort_preserving_merge.rs | 28 +- datafusion/src/scalar.rs | 28 +- 23 files changed, 467 insertions(+), 166 deletions(-) create mode 100644 datafusion/src/arrow_temporal_util.rs diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs index 36e445bc4ead..31143323cb34 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs @@ -34,14 +34,11 @@ use crate::utils; use crate::serde::protobuf::ShuffleWritePartition; use crate::serde::scheduler::{PartitionLocation, PartitionStats}; use async_trait::async_trait; -use datafusion::arrow::array::{ - Array, ArrayBuilder, ArrayRef, StringBuilder, StructBuilder, UInt32Builder, - UInt64Builder, -}; +use datafusion::arrow::array::*; use datafusion::arrow::compute::take; use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion::arrow::ipc::reader::FileReader; -use datafusion::arrow::ipc::writer::FileWriter; +use datafusion::arrow::io::ipc::read::FileReader; +use datafusion::arrow::io::ipc::write::FileWriter; use datafusion::arrow::record_batch::RecordBatch; use datafusion::error::{DataFusionError, Result}; use datafusion::physical_plan::hash_utils::create_hashes; @@ -244,7 +241,7 @@ impl ShuffleWriterExec { .collect::>>>()?; let output_batch = - RecordBatch::try_new(input_batch.schema(), columns)?; + RecordBatch::try_new(input_batch.schema().clone(), columns)?; // write non-empty batch out @@ -356,18 +353,18 @@ impl ExecutionPlan for ShuffleWriterExec { // build metadata result batch let num_writers = part_loc.len(); - let mut partition_builder = UInt32Builder::new(num_writers); - let mut path_builder = StringBuilder::new(num_writers); - let mut num_rows_builder = UInt64Builder::new(num_writers); - let mut num_batches_builder = UInt64Builder::new(num_writers); - let mut num_bytes_builder = UInt64Builder::new(num_writers); + let mut partition_builder = UInt32Vec::with_capacity(num_writers); + let mut path_builder = MutableUtf8Array::with_capacity(num_writers); + let mut num_rows_builder = UInt64Vec::with_capacity(num_writers); + let mut 
num_batches_builder = UInt64Vec::with_capacity(num_writers); + let mut num_bytes_builder = UInt64Vec::with_capacity(num_writers); for loc in &part_loc { - path_builder.append_value(loc.path.clone())?; - partition_builder.append_value(loc.partition_id as u32)?; - num_rows_builder.append_value(loc.num_rows)?; - num_batches_builder.append_value(loc.num_batches)?; - num_bytes_builder.append_value(loc.num_bytes)?; + path_builder.push(Some(loc.path.clone())); + partition_builder.push(Some(loc.partition_id as u32)); + num_rows_builder.push(Some(loc.num_rows)); + num_batches_builder.push(Some(loc.num_batches)); + num_bytes_builder.push(Some(loc.num_bytes)); } // build arrays @@ -428,17 +425,17 @@ fn result_schema() -> SchemaRef { ])) } -struct ShuffleWriter { +struct ShuffleWriter<'a> { path: String, - writer: FileWriter, + writer: FileWriter<'a, File>, num_batches: u64, num_rows: u64, num_bytes: u64, } -impl ShuffleWriter { +impl<'a> ShuffleWriter<'a> { fn new(path: &str, schema: &Schema) -> Result { - let file = File::create(path) + let mut file = File::create(path) .map_err(|e| { BallistaError::General(format!( "Failed to create partition file at {}: {:?}", @@ -451,7 +448,7 @@ impl ShuffleWriter { num_rows: 0, num_bytes: 0, path: path.to_owned(), - writer: FileWriter::try_new(file, schema)?, + writer: FileWriter::try_new(&mut file, schema)?, }) } @@ -480,7 +477,7 @@ impl ShuffleWriter { #[cfg(test)] mod tests { use super::*; - use datafusion::arrow::array::{StringArray, StructArray, UInt32Array, UInt64Array}; + use datafusion::arrow::array::{Utf8Array, StructArray, UInt32Array, UInt64Array}; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::expressions::Column; use datafusion::physical_plan::limit::GlobalLimitExec; diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index d371fabdf098..8b9544498264 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -61,7 +61,6 @@ use datafusion::physical_plan::{ expressions::{ col, Avg, BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr, Literal, NegativeExpr, NotExpr, PhysicalSortExpr, TryCastExpr, - DEFAULT_DATAFUSION_CAST_OPTIONS, }, filter::FilterExec, functions::{self, BuiltinScalarFunction, ScalarFunctionExpr}, @@ -620,7 +619,6 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc { ExprType::Cast(e) => Arc::new(CastExpr::new( convert_box_required!(e.expr)?, convert_required!(e.arrow_type)?, - DEFAULT_DATAFUSION_CAST_OPTIONS, )), ExprType::TryCast(e) => Arc::new(TryCastExpr::new( convert_box_required!(e.expr)?, diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs index b7e465ccd20a..a1d3a63fb9b8 100644 --- a/ballista/rust/core/src/utils.rs +++ b/ballista/rust/core/src/utils.rs @@ -31,6 +31,7 @@ use crate::serde::scheduler::PartitionStats; use crate::config::BallistaConfig; use datafusion::arrow::datatypes::Schema; +use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::error::Result as ArrowResult; use datafusion::arrow::{ array::*, diff --git a/datafusion/benches/physical_plan.rs b/datafusion/benches/physical_plan.rs index 9222ae131b8f..ce1893b37257 100644 --- a/datafusion/benches/physical_plan.rs +++ b/datafusion/benches/physical_plan.rs @@ -51,7 +51,7 @@ fn sort_preserving_merge_operator(batches: Vec, sort: &[&str]) { let exec = MemoryExec::try_new( 
&batches.into_iter().map(|rb| vec![rb]).collect::<Vec<_>>(),
-        schema,
+        schema.clone(),
         None,
     )
     .unwrap();
diff --git a/datafusion/src/arrow_temporal_util.rs b/datafusion/src/arrow_temporal_util.rs
new file mode 100644
index 000000000000..d8ca4f7ec89f
--- /dev/null
+++ b/datafusion/src/arrow_temporal_util.rs
@@ -0,0 +1,302 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::error::{ArrowError, Result};
+use chrono::{prelude::*, LocalResult};
+
+/// Accepts a string in RFC3339 / ISO8601 standard format and some
+/// variants and converts it to a nanosecond precision timestamp.
+///
+/// Implements the `to_timestamp` function to convert a string to a
+/// timestamp, following the model of Spark SQL's `to_timestamp`.
+///
+/// In addition to RFC3339 / ISO8601 standard timestamps, it also
+/// accepts strings that use a space ` ` to separate the date and time
+/// as well as strings that have no explicit timezone offset.
+///
+/// Examples of accepted inputs:
+/// * `1997-01-31T09:26:56.123Z` # RFC3339
+/// * `1997-01-31T09:26:56.123-05:00` # RFC3339
+/// * `1997-01-31 09:26:56.123-05:00` # close to RFC3339 but with a space rather than T
+/// * `1997-01-31T09:26:56.123` # close to RFC3339 but no timezone offset specified
+/// * `1997-01-31 09:26:56.123` # close to RFC3339 but uses a space and no timezone offset
+/// * `1997-01-31 09:26:56` # close to RFC3339, no fractional seconds
+///
+/// Internally, this function uses the `chrono` library for the
+/// datetime parsing.
+///
+/// We hope to extend this function in the future with a second
+/// parameter specifying the format string.
+///
+/// ## Timestamp Precision
+///
+/// This function uses the maximum precision supported by Arrow
+/// timestamps (nanoseconds stored as a 64-bit integer). This
+/// means the range of dates that timestamps can represent is ~1677 AD
+/// to ~2262 AD.
+///
+/// ## Timezone / Offset Handling
+///
+/// Numerical timestamp values are stored as offsets relative to UTC.
+///
+/// This function interprets strings without an explicit time zone as
+/// timestamps in the local timezone of the machine.
+///
+/// For example, `1997-01-31 09:26:56.123Z` is interpreted as UTC, as
+/// it has an explicit timezone specifier (“Z” for Zulu/UTC).
+///
+/// `1997-01-31T09:26:56.123` is interpreted as a local timestamp in
+/// the timezone of the machine. For example, if
+/// the system timezone is set to America/New_York (UTC-5), the
+/// timestamp will be interpreted as though it were
+/// `1997-01-31T09:26:56.123-05:00`.
+///
+/// TODO: remove this hack and redesign DataFusion's time related API, with regard to timezone.
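+///
+/// ## Example
+///
+/// A minimal sketch of the expected behavior, taken from the tests
+/// below (illustrative only: the function is `pub(crate)`, so this is
+/// not a compiling doctest, and the module path is an assumption):
+///
+/// ```text
+/// use crate::arrow_temporal_util::string_to_timestamp_nanos;
+///
+/// // An RFC3339 string with an explicit offset takes the fast path.
+/// let nanos = string_to_timestamp_nanos("2020-09-08T13:42:29.190855Z").unwrap();
+/// assert_eq!(nanos, 1_599_572_549_190_855_000);
+/// ```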
+#[inline]
+pub(crate) fn string_to_timestamp_nanos(s: &str) -> Result<i64> {
+    // Fast path: RFC3339 timestamp (with a T)
+    // Example: 2020-09-08T13:42:29.190855Z
+    if let Ok(ts) = DateTime::parse_from_rfc3339(s) {
+        return Ok(ts.timestamp_nanos());
+    }
+
+    // Implement quasi-RFC3339 support by trying to parse the
+    // timestamp with various other format specifiers to support
+    // separating the date and time with a space ' ' rather than 'T' to be
+    // (more) compatible with Apache Spark SQL
+
+    // timezone offset, using ' ' as a separator
+    // Example: 2020-09-08 13:42:29.190855-05:00
+    if let Ok(ts) = DateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f%:z") {
+        return Ok(ts.timestamp_nanos());
+    }
+
+    // with an explicit Z, using ' ' as a separator
+    // Example: 2020-09-08 13:42:29Z
+    if let Ok(ts) = Utc.datetime_from_str(s, "%Y-%m-%d %H:%M:%S%.fZ") {
+        return Ok(ts.timestamp_nanos());
+    }
+
+    // Support timestamps without an explicit timezone offset, again
+    // to be compatible with what Apache Spark SQL does.
+
+    // without a timezone specifier as a local time, using T as a separator
+    // Example: 2020-09-08T13:42:29.190855
+    if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S.%f") {
+        return naive_datetime_to_timestamp(s, ts);
+    }
+
+    // without a timezone specifier as a local time, using T as a
+    // separator, no fractional seconds
+    // Example: 2020-09-08T13:42:29
+    if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S") {
+        return naive_datetime_to_timestamp(s, ts);
+    }
+
+    // without a timezone specifier as a local time, using ' ' as a separator
+    // Example: 2020-09-08 13:42:29.190855
+    if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S.%f") {
+        return naive_datetime_to_timestamp(s, ts);
+    }
+
+    // without a timezone specifier as a local time, using ' ' as a
+    // separator, no fractional seconds
+    // Example: 2020-09-08 13:42:29
+    if let Ok(ts) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
+        return naive_datetime_to_timestamp(s, ts);
+    }
+
+    // Note we don't pass along the error message from the underlying
+    // chrono parsing because we tried several different format
+    // strings and we don't know which the user was trying to
+    // match. Thus any of the specific error messages is likely to
+    // be more confusing than helpful
+    Err(ArrowError::Other(format!(
+        "Error parsing '{}' as timestamp",
+        s
+    )))
+}
+
+/// Converts the naive datetime (which has no specific timezone) to a
+/// nanosecond epoch timestamp relative to UTC.
+fn naive_datetime_to_timestamp(s: &str, datetime: NaiveDateTime) -> Result<i64> {
+    let l = Local {};
+
+    match l.from_local_datetime(&datetime) {
+        LocalResult::None => Err(ArrowError::Other(format!(
+            "Error parsing '{}' as timestamp: local time representation is invalid",
+            s
+        ))),
+        LocalResult::Single(local_datetime) => {
+            Ok(local_datetime.with_timezone(&Utc).timestamp_nanos())
+        }
+        // Ambiguous times can happen if the timestamp is exactly when
+        // a daylight savings time transition occurs, for example, and
+        // so the datetime could validly be said to be in two
+        // potential offsets. However, since we are about to convert
+        // to UTC anyways, we can pick one arbitrarily
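+        //
+        // (For instance, when clocks in America/New_York fall back on
+        // 2020-11-01, the local time 01:30:00 occurs twice: once at
+        // UTC-4 and once at UTC-5, so chrono reports it as
+        // `LocalResult::Ambiguous`; the two candidate UTC instants
+        // differ only by the one-hour offset change.)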
+        LocalResult::Ambiguous(local_datetime, _) => {
+            Ok(local_datetime.with_timezone(&Utc).timestamp_nanos())
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn string_to_timestamp_timezone() -> Result<()> {
+        // Explicit timezone
+        assert_eq!(
+            1599572549190855000,
+            parse_timestamp("2020-09-08T13:42:29.190855+00:00")?
+        );
+        assert_eq!(
+            1599572549190855000,
+            parse_timestamp("2020-09-08T13:42:29.190855Z")?
+        );
+        assert_eq!(
+            1599572549000000000,
+            parse_timestamp("2020-09-08T13:42:29Z")?
+        ); // no fractional part
+        assert_eq!(
+            1599590549190855000,
+            parse_timestamp("2020-09-08T13:42:29.190855-05:00")?
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn string_to_timestamp_timezone_space() -> Result<()> {
+        // Ensure space rather than T between time and date is accepted
+        assert_eq!(
+            1599572549190855000,
+            parse_timestamp("2020-09-08 13:42:29.190855+00:00")?
+        );
+        assert_eq!(
+            1599572549190855000,
+            parse_timestamp("2020-09-08 13:42:29.190855Z")?
+        );
+        assert_eq!(
+            1599572549000000000,
+            parse_timestamp("2020-09-08 13:42:29Z")?
+        ); // no fractional part
+        assert_eq!(
+            1599590549190855000,
+            parse_timestamp("2020-09-08 13:42:29.190855-05:00")?
+        );
+        Ok(())
+    }
+
+    /// Interprets a naive_datetime (with no explicit timezone offset)
+    /// using the local timezone and returns the timestamp in UTC (0
+    /// offset)
+    fn naive_datetime_to_timestamp(naive_datetime: &NaiveDateTime) -> i64 {
+        // Note: Use chrono APIs that are different from
+        // naive_datetime_to_timestamp to compute the utc offset, to
+        // try and double-check the logic
+        let utc_offset_secs = match Local.offset_from_local_datetime(&naive_datetime) {
+            LocalResult::Single(local_offset) => {
+                local_offset.fix().local_minus_utc() as i64
+            }
+            _ => panic!("Unexpected failure converting to local datetime"),
+        };
+        let utc_offset_nanos = utc_offset_secs * 1_000_000_000;
+        naive_datetime.timestamp_nanos() - utc_offset_nanos
+    }
+
+    #[test]
+    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function: mktime
+    fn string_to_timestamp_no_timezone() -> Result<()> {
+        // This test is designed to succeed regardless of the local
+        // timezone the test machine is running in. Thus it is still
+        // somewhat susceptible to bugs in the use of chrono
+        let naive_datetime = NaiveDateTime::new(
+            NaiveDate::from_ymd(2020, 9, 8),
+            NaiveTime::from_hms_nano(13, 42, 29, 190855),
+        );
+
+        // Ensure both T and ' ' variants work
+        assert_eq!(
+            naive_datetime_to_timestamp(&naive_datetime),
+            parse_timestamp("2020-09-08T13:42:29.190855")?
+        );
+
+        assert_eq!(
+            naive_datetime_to_timestamp(&naive_datetime),
+            parse_timestamp("2020-09-08 13:42:29.190855")?
+        );
+
+        // Also ensure that parsing timestamps with no fractional
+        // second part works as well
+        let naive_datetime_whole_secs = NaiveDateTime::new(
+            NaiveDate::from_ymd(2020, 9, 8),
+            NaiveTime::from_hms(13, 42, 29),
+        );
+
+        // Ensure both T and ' ' variants work
+        assert_eq!(
+            naive_datetime_to_timestamp(&naive_datetime_whole_secs),
+            parse_timestamp("2020-09-08T13:42:29")?
+        );
+
+        assert_eq!(
+            naive_datetime_to_timestamp(&naive_datetime_whole_secs),
+            parse_timestamp("2020-09-08 13:42:29")?
+ ); + + Ok(()) + } + + #[test] + fn string_to_timestamp_invalid() { + // Test parsing invalid formats + + // It would be nice to make these messages better + expect_timestamp_parse_error("", "Error parsing '' as timestamp"); + expect_timestamp_parse_error("SS", "Error parsing 'SS' as timestamp"); + expect_timestamp_parse_error( + "Wed, 18 Feb 2015 23:16:09 GMT", + "Error parsing 'Wed, 18 Feb 2015 23:16:09 GMT' as timestamp", + ); + } + + // Parse a timestamp to timestamp int with a useful human readable error message + fn parse_timestamp(s: &str) -> Result { + let result = string_to_timestamp_nanos(s); + if let Err(e) = &result { + eprintln!("Error parsing timestamp '{}': {:?}", s, e); + } + result + } + + fn expect_timestamp_parse_error(s: &str, expected_err: &str) { + match string_to_timestamp_nanos(s) { + Ok(v) => panic!( + "Expected error '{}' while parsing '{}', but parsed {} instead", + expected_err, s, v + ), + Err(e) => { + assert!(e.to_string().contains(expected_err), + "Can not find expected error '{}' while parsing '{}'. Actual error '{}'", + expected_err, s, e); + } + } + } +} diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs index 7c4e0e1a07c1..5134ccce3ffd 100644 --- a/datafusion/src/datasource/parquet.rs +++ b/datafusion/src/datasource/parquet.rs @@ -310,7 +310,7 @@ impl ParquetTableDescriptor { ) })?; if let Some(max_value) = &mut max_values[i] { - if let Some(v) = stats.max_value { + if let Some(v) = &stats.max_value { match max_value.update(&[ScalarValue::Utf8( std::str::from_utf8(&*v).map(|s| s.to_string()).ok(), )]) { @@ -322,7 +322,7 @@ impl ParquetTableDescriptor { } } if let Some(min_value) = &mut min_values[i] { - if let Some(v) = stats.min_value { + if let Some(v) = &stats.min_value { match min_value.update(&[ScalarValue::Utf8( std::str::from_utf8(&*v).map(|s| s.to_string()).ok(), )]) { @@ -366,10 +366,10 @@ impl TableDescriptorBuilder for ParquetTableDescriptor { let columns_null_counts = row_group_meta .columns() .iter() - .flat_map(|c| c.statistics().map(|stats| stats.null_count())); + .flat_map(|c| c.statistics().map(|stats| stats.unwrap().null_count())); for (i, cnt) in columns_null_counts.enumerate() { - null_counts[i] += cnt as usize + null_counts[i] += cnt.unwrap_or(0) as usize } for (i, column) in row_group_meta.columns().iter().enumerate() { diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index ec14a15aa35f..ac797b448bbd 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -2504,43 +2504,43 @@ mod tests { let type_values = vec![ ( DataType::Int8, - Arc::new(Int8Array::from(vec![1])) as ArrayRef, + Arc::new(Int8Array::from_values(vec![1])) as ArrayRef, ), ( DataType::Int16, - Arc::new(Int16Array::from(vec![1])) as ArrayRef, + Arc::new(Int16Array::from_values(vec![1])) as ArrayRef, ), ( DataType::Int32, - Arc::new(Int32Array::from(vec![1])) as ArrayRef, + Arc::new(Int32Array::from_values(vec![1])) as ArrayRef, ), ( DataType::Int64, - Arc::new(Int64Array::from(vec![1])) as ArrayRef, + Arc::new(Int64Array::from_values(vec![1])) as ArrayRef, ), ( DataType::UInt8, - Arc::new(UInt8Array::from(vec![1])) as ArrayRef, + Arc::new(UInt8Array::from_values(vec![1])) as ArrayRef, ), ( DataType::UInt16, - Arc::new(UInt16Array::from(vec![1])) as ArrayRef, + Arc::new(UInt16Array::from_values(vec![1])) as ArrayRef, ), ( DataType::UInt32, - Arc::new(UInt32Array::from(vec![1])) as ArrayRef, + Arc::new(UInt32Array::from_values(vec![1])) as ArrayRef, ), ( 
DataType::UInt64, - Arc::new(UInt64Array::from(vec![1])) as ArrayRef, + Arc::new(UInt64Array::from_values(vec![1])) as ArrayRef, ), ( DataType::Float32, - Arc::new(Float32Array::from(vec![1.0_f32])) as ArrayRef, + Arc::new(Float32Array::from_values(vec![1.0_f32])) as ArrayRef, ), ( DataType::Float64, - Arc::new(Float64Array::from(vec![1.0_f64])) as ArrayRef, + Arc::new(Float64Array::from_values(vec![1.0_f64])) as ArrayRef, ), ]; diff --git a/datafusion/src/lib.rs b/datafusion/src/lib.rs index 5841a2a144c8..529809729cf6 100644 --- a/datafusion/src/lib.rs +++ b/datafusion/src/lib.rs @@ -229,6 +229,8 @@ pub mod variable; // re-export dependencies from arrow-rs to minimise version maintenance for crate users pub use arrow; +mod arrow_temporal_util; + #[cfg(test)] pub mod test; pub mod test_util; diff --git a/datafusion/src/optimizer/constant_folding.rs b/datafusion/src/optimizer/constant_folding.rs index 97b6603e08fd..94404148c00d 100644 --- a/datafusion/src/optimizer/constant_folding.rs +++ b/datafusion/src/optimizer/constant_folding.rs @@ -22,8 +22,8 @@ use std::sync::Arc; use arrow::compute::cast; use arrow::datatypes::DataType; -use arrow::temporal_conversions::utf8_to_timestamp_ns_scalar; +use crate::arrow_temporal_util::string_to_timestamp_nanos; use crate::error::Result; use crate::execution::context::ExecutionProps; use crate::logical_plan::{DFSchemaRef, Expr, ExprRewriter, LogicalPlan, Operator}; @@ -227,7 +227,7 @@ impl<'a> ExprRewriter for ConstantRewriter<'a> { if !args.is_empty() { match &args[0] { Expr::Literal(ScalarValue::Utf8(Some(val))) => { - match utf8_to_timestamp_ns_scalar(val) { + match string_to_timestamp_nanos(val) { Ok(timestamp) => Expr::Literal( ScalarValue::TimestampNanosecond(Some(timestamp)), ), diff --git a/datafusion/src/physical_optimizer/repartition.rs b/datafusion/src/physical_optimizer/repartition.rs index 31d10d627261..fd8650411d71 100644 --- a/datafusion/src/physical_optimizer/repartition.rs +++ b/datafusion/src/physical_optimizer/repartition.rs @@ -133,6 +133,7 @@ mod tests { metrics, None, 2048, + None, )), )?; @@ -173,6 +174,7 @@ mod tests { metrics, None, 2048, + None, )), )?), )?; diff --git a/datafusion/src/physical_plan/datetime_expressions.rs b/datafusion/src/physical_plan/datetime_expressions.rs index 076d3119d0c0..638b91f5f8ae 100644 --- a/datafusion/src/physical_plan/datetime_expressions.rs +++ b/datafusion/src/physical_plan/datetime_expressions.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use super::ColumnarValue; +use crate::arrow_temporal_util::string_to_timestamp_nanos; use crate::{ error::{DataFusionError, Result}, scalar::ScalarValue, @@ -27,13 +28,13 @@ use arrow::{ array::*, compute::cast, datatypes::{DataType, TimeUnit}, - temporal_conversions::utf8_to_timestamp_ns_scalar, types::NativeType, }; use arrow::{compute::temporal, temporal_conversions::timestamp_ns_to_datetime}; use chrono::prelude::{DateTime, Utc}; use chrono::Datelike; use chrono::Duration; +use chrono::Timelike; /// given a function `op` that maps a `&str` to a Result of an arrow native type, /// returns a `PrimitiveArray` after the application @@ -132,9 +133,9 @@ where } } -/// Calls cast::utf8_to_timestamp_ns_scalar and converts the error type +/// Calls cast::string_to_timestamp_nanos and converts the error type fn string_to_timestamp_nanos_shim(s: &str) -> Result { - utf8_to_timestamp_ns_scalar(s).map_err(|e| e.into()) + string_to_timestamp_nanos(s).map_err(|e| e.into()) } /// to_timestamp SQL function @@ -399,8 +400,8 @@ mod tests { ]; cases.iter().for_each(|(original, 
granularity, expected)| { - let original = utf8_to_timestamp_ns_scalar(original).unwrap(); - let expected = utf8_to_timestamp_ns_scalar(expected).unwrap(); + let original = string_to_timestamp_nanos(original).unwrap(); + let expected = string_to_timestamp_nanos(expected).unwrap(); let result = date_trunc_single(granularity, original).unwrap(); assert_eq!(result, expected); }); diff --git a/datafusion/src/physical_plan/distinct_expressions.rs b/datafusion/src/physical_plan/distinct_expressions.rs index 25031e54810b..f09481a94400 100644 --- a/datafusion/src/physical_plan/distinct_expressions.rs +++ b/datafusion/src/physical_plan/distinct_expressions.rs @@ -158,8 +158,8 @@ impl Accumulator for DistinctCountAccumulator { (0..col_values[0].len()).try_for_each(|row_index| { let row_values = col_values .iter() - .map(|col| ScalarValue::try_from_array(col, row_index)) - .collect::>>()?; + .map(|col| col[row_index].clone()) + .collect::>(); self.update(&row_values) }) } @@ -212,52 +212,25 @@ mod tests { use arrow::datatypes::DataType; macro_rules! state_to_vec { - ($LIST:expr, $DATA_TYPE:ident, $ARRAY_TY:ty) => {{ + ($LIST:expr, $DATA_TYPE:ident, $PRIM_TY:ty) => {{ match $LIST { - ScalarValue::List(_, data_type) => assert_eq!( - ListArray::::get_child_type(data_type), - &DataType::$DATA_TYPE - ), - _ => panic!("Expected a ScalarValue::List"), - } - - match $LIST { - ScalarValue::List(None, _) => None, - ScalarValue::List(Some(values), _) => { - let vec = values - .as_any() - .downcast_ref::<$ARRAY_TY>() - .unwrap() - .iter() - .map(|x| x.map(|x| *x)) - .collect::>(); - - Some(vec) - } - _ => unreachable!(), - } - }}; - } - - macro_rules! state_to_vec_bool { - ($LIST:expr, $DATA_TYPE:ident, $ARRAY_TY:ty) => {{ - match $LIST { - ScalarValue::List(_, data_type) => assert_eq!( - ListArray::::get_child_type(data_type), - &DataType::$DATA_TYPE - ), + ScalarValue::List(_, data_type) => match data_type.as_ref() { + &DataType::$DATA_TYPE => (), + _ => panic!("Unexpected DataType for list"), + }, _ => panic!("Expected a ScalarValue::List"), } match $LIST { ScalarValue::List(None, _) => None, - ScalarValue::List(Some(values), _) => { - let vec = values - .as_any() - .downcast_ref::<$ARRAY_TY>() - .unwrap() + ScalarValue::List(Some(scalar_values), _) => { + let vec = scalar_values .iter() - .collect::>(); + .map(|scalar_value| match scalar_value { + ScalarValue::$DATA_TYPE(value) => *value, + _ => panic!("Unexpected ScalarValue variant"), + }) + .collect::>>(); Some(vec) } @@ -336,7 +309,7 @@ mod tests { macro_rules! 
test_count_distinct_update_batch_numeric { ($ARRAY_TYPE:ident, $DATA_TYPE:ident, $PRIM_TYPE:ty) => {{ - let values = &[ + let values: Vec> = vec![ Some(1), Some(1), None, @@ -353,7 +326,7 @@ mod tests { let (states, result) = run_update_batch(&arrays)?; let mut state_vec = - state_to_vec!(&states[0], $DATA_TYPE, $ARRAY_TYPE).unwrap(); + state_to_vec!(&states[0], $DATA_TYPE, $PRIM_TYPE).unwrap(); state_vec.sort(); assert_eq!(states.len(), 1); @@ -405,7 +378,7 @@ mod tests { let (states, result) = run_update_batch(&arrays)?; let mut state_vec = - state_to_vec!(&states[0], $DATA_TYPE, $ARRAY_TYPE).unwrap(); + state_to_vec!(&states[0], $DATA_TYPE, $PRIM_TYPE).unwrap(); state_vec.sort_by(|a, b| match (a, b) { (Some(lhs), Some(rhs)) => { OrderedFloat::from(*lhs).cmp(&OrderedFloat::from(*rhs)) @@ -489,8 +462,7 @@ mod tests { let get_count = |data: BooleanArray| -> Result<(Vec>, u64)> { let arrays = vec![Arc::new(data) as ArrayRef]; let (states, result) = run_update_batch(&arrays)?; - let mut state_vec = - state_to_vec_bool!(&states[0], Boolean, BooleanArray).unwrap(); + let mut state_vec = state_to_vec!(&states[0], Boolean, bool).unwrap(); state_vec.sort(); let count = match result { ScalarValue::UInt64(c) => c.ok_or_else(|| { @@ -550,7 +522,7 @@ mod tests { let (states, result) = run_update_batch(&arrays)?; assert_eq!(states.len(), 1); - assert_eq!(state_to_vec!(&states[0], Int32, Int32Array), Some(vec![])); + assert_eq!(state_to_vec!(&states[0], Int32, i32), Some(vec![])); assert_eq!(result, ScalarValue::UInt64(Some(0))); Ok(()) @@ -563,7 +535,7 @@ mod tests { let (states, result) = run_update_batch(&arrays)?; assert_eq!(states.len(), 1); - assert_eq!(state_to_vec!(&states[0], Int32, Int32Array), Some(vec![])); + assert_eq!(state_to_vec!(&states[0], Int32, i32), Some(vec![])); assert_eq!(result, ScalarValue::UInt64(Some(0))); Ok(()) @@ -577,8 +549,8 @@ mod tests { let (states, result) = run_update_batch(&arrays)?; - let state_vec1 = state_to_vec!(&states[0], Int8, Int8Array).unwrap(); - let state_vec2 = state_to_vec!(&states[1], Int16, Int16Array).unwrap(); + let state_vec1 = state_to_vec!(&states[0], Int8, i8).unwrap(); + let state_vec2 = state_to_vec!(&states[1], Int16, i16).unwrap(); let state_pairs = collect_states::(&state_vec1, &state_vec2); assert_eq!(states.len(), 2); @@ -607,8 +579,8 @@ mod tests { ], )?; - let state_vec1 = state_to_vec!(&states[0], Int32, Int32Array).unwrap(); - let state_vec2 = state_to_vec!(&states[1], UInt64, UInt64Array).unwrap(); + let state_vec1 = state_to_vec!(&states[0], Int32, i32).unwrap(); + let state_vec2 = state_to_vec!(&states[1], UInt64, u64).unwrap(); let state_pairs = collect_states::(&state_vec1, &state_vec2); assert_eq!(states.len(), 2); @@ -644,8 +616,8 @@ mod tests { ], )?; - let state_vec1 = state_to_vec!(&states[0], Int32, Int32Array).unwrap(); - let state_vec2 = state_to_vec!(&states[1], UInt64, UInt64Array).unwrap(); + let state_vec1 = state_to_vec!(&states[0], Int32, i32).unwrap(); + let state_vec2 = state_to_vec!(&states[1], UInt64, u64).unwrap(); let state_pairs = collect_states::(&state_vec1, &state_vec2); assert_eq!(states.len(), 2); @@ -680,8 +652,8 @@ mod tests { let (states, result) = run_merge_batch(&[Arc::new(state_in1), Arc::new(state_in2)])?; - let state_out_vec1 = state_to_vec!(&states[0], Int32, Int32Array).unwrap(); - let state_out_vec2 = state_to_vec!(&states[1], UInt64, UInt64Array).unwrap(); + let state_out_vec1 = state_to_vec!(&states[0], Int32, i32).unwrap(); + let state_out_vec2 = state_to_vec!(&states[1], UInt64, 
u64).unwrap(); let state_pairs = collect_states::(&state_out_vec1, &state_out_vec2); assert_eq!( diff --git a/datafusion/src/physical_plan/explain.rs b/datafusion/src/physical_plan/explain.rs index 4fa926eb68a1..8f833c166689 100644 --- a/datafusion/src/physical_plan/explain.rs +++ b/datafusion/src/physical_plan/explain.rs @@ -121,13 +121,13 @@ impl ExecutionPlan for ExplainExec { let mut prev: Option<&StringifiedPlan> = None; for p in plans_to_print { - type_builder.append_value(p.plan_type.to_string())?; + type_builder.push(Some(p.plan_type.to_string())); match prev { Some(prev) if !should_show(prev, p) => { - plan_builder.append_value("SAME TEXT AS ABOVE")?; + plan_builder.push(Some("SAME TEXT AS ABOVE")); } Some(_) | None => { - plan_builder.append_value(&*p.plan)?; + plan_builder.push(Some(p.plan.to_string())); } } prev = Some(p); diff --git a/datafusion/src/physical_plan/expressions/binary.rs b/datafusion/src/physical_plan/expressions/binary.rs index 59bafe134935..3185d036a837 100644 --- a/datafusion/src/physical_plan/expressions/binary.rs +++ b/datafusion/src/physical_plan/expressions/binary.rs @@ -818,7 +818,7 @@ mod tests { apply_arithmetic::( schema, vec![a, b], - Operator::Modulus, + Operator::Modulo, Int32Array::from_slice(&[0, 0, 2, 8, 0]), )?; diff --git a/datafusion/src/physical_plan/expressions/in_list.rs b/datafusion/src/physical_plan/expressions/in_list.rs index 31da1238f783..cc037debdc97 100644 --- a/datafusion/src/physical_plan/expressions/in_list.rs +++ b/datafusion/src/physical_plan/expressions/in_list.rs @@ -37,7 +37,11 @@ macro_rules! compare_op_scalar { let validity = $left.validity(); let values = Bitmap::from_trusted_len_iter($left.values_iter().map(|x| $op(x, $right))); - Ok(BooleanArray::from_data(DataType::Boolean, values, validity)) + Ok(BooleanArray::from_data( + DataType::Boolean, + values, + validity.clone(), + )) }}; } @@ -48,7 +52,11 @@ macro_rules! 
compare_primitive_op_scalar { let validity = $left.validity(); let values = Bitmap::from_trusted_len_iter($left.values().iter().map(|x| $op(x, $right))); - Ok(BooleanArray::from_data(DataType::Boolean, values, validity)) + Ok(BooleanArray::from_data( + DataType::Boolean, + values, + validity.clone(), + )) }}; } @@ -175,7 +183,7 @@ fn in_list_primitive( array: &PrimitiveArray, values: &[T], ) -> Result { - compare_primitive_op_scalar!(array, values, |x, v| v.contains(&x)) + compare_primitive_op_scalar!(array, values, |x, v: &[T]| v.contains(x)) } // whether each value on the left (can be null) is contained in the non-null list @@ -183,7 +191,7 @@ fn not_in_list_primitive( array: &PrimitiveArray, values: &[T], ) -> Result { - compare_primitive_op_scalar!(array, values, |x, v| !v.contains(&x)) + compare_primitive_op_scalar!(array, values, |x, v: &[T]| !v.contains(x)) } // whether each value on the left (can be null) is contained in the non-null list diff --git a/datafusion/src/physical_plan/expressions/lead_lag.rs b/datafusion/src/physical_plan/expressions/lead_lag.rs index f9533e0a61c9..76ba5692f693 100644 --- a/datafusion/src/physical_plan/expressions/lead_lag.rs +++ b/datafusion/src/physical_plan/expressions/lead_lag.rs @@ -27,6 +27,7 @@ use arrow::compute::cast::cast; use arrow::datatypes::{DataType, Field}; use arrow::record_batch::RecordBatch; use std::any::Any; +use std::borrow::Borrow; use std::ops::Neg; use std::ops::Range; use std::sync::Arc; @@ -127,9 +128,11 @@ fn create_empty_array( let array = value .as_ref() .map(|scalar| scalar.to_array_of_size(size)) - .unwrap_or_else(|| new_null_array(data_type, size)); + .unwrap_or_else(|| ArrayRef::from(new_null_array(data_type.clone(), size))); if array.data_type() != data_type { - cast(&array, data_type).map_err(DataFusionError::ArrowError) + cast(array.borrow(), data_type) + .map_err(DataFusionError::ArrowError) + .map(ArrayRef::from) } else { Ok(array) } @@ -145,7 +148,7 @@ fn shift_with_default_value( let value_len = array.len() as i64; if offset == 0 { - Ok(array.as_arc()) + Ok(array.clone()) } else if offset == i64::MIN || offset.abs() >= value_len { create_empty_array(value, array.data_type(), array.len()) } else { @@ -160,9 +163,11 @@ fn shift_with_default_value( if offset > 0 { concat::concatenate(&[default_values.as_ref(), slice.as_ref()]) .map_err(DataFusionError::ArrowError) + .map(ArrayRef::from) } else { concat::concatenate(&[slice.as_ref(), default_values.as_ref()]) .map_err(DataFusionError::ArrowError) + .map(ArrayRef::from) } } } @@ -171,7 +176,11 @@ impl PartitionEvaluator for WindowShiftEvaluator { fn evaluate_partition(&self, partition: Range) -> Result { let value = &self.values[0]; let value = value.slice(partition.start, partition.end - partition.start); - shift_with_default_value(&value, self.shift_offset, &self.default_value) + shift_with_default_value( + ArrayRef::from(value).borrow(), + self.shift_offset, + &self.default_value, + ) } } @@ -184,7 +193,8 @@ mod tests { use arrow::{array::*, datatypes::*}; fn test_i32_result(expr: WindowShift, expected: Int32Array) -> Result<()> { - let arr: ArrayRef = Arc::new(Int32Array::from(vec![1, -2, 3, -4, 5, -6, 7, 8])); + let arr: ArrayRef = + Arc::new(Int32Array::from_slice(&[1, -2, 3, -4, 5, -6, 7, 8])); let values = vec![arr]; let schema = Schema::new(vec![Field::new("arr", DataType::Int32, false)]); let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?; diff --git a/datafusion/src/physical_plan/expressions/nth_value.rs 
b/datafusion/src/physical_plan/expressions/nth_value.rs index 8e88c7eab7ec..b363f9c1606c 100644 --- a/datafusion/src/physical_plan/expressions/nth_value.rs +++ b/datafusion/src/physical_plan/expressions/nth_value.rs @@ -174,12 +174,15 @@ impl PartitionEvaluator for NthValueEvaluator { .collect::>>()? .into_iter() .flatten(); - ScalarValue::iter_to_array(values) + ScalarValue::iter_to_array(values).map(ArrayRef::from) } NthValueKind::Nth(n) => { let index = (n as usize) - 1; if index >= num_rows { - Ok(new_null_array(arr.data_type(), num_rows)) + Ok(ArrayRef::from(new_null_array( + arr.data_type().clone(), + num_rows, + ))) } else { let value = ScalarValue::try_from_array(arr, partition.start + index)?; @@ -187,7 +190,9 @@ impl PartitionEvaluator for NthValueEvaluator { // because the default window frame is between unbounded preceding and current // row, hence the shift because for values with indices < index they should be // null. This changes when window frames other than default is implemented - shift(arr.as_ref(), index as i64).map_err(DataFusionError::ArrowError) + shift(arr.as_ref(), index as i64) + .map_err(DataFusionError::ArrowError) + .map(ArrayRef::from) } } } @@ -202,7 +207,7 @@ mod tests { use arrow::record_batch::RecordBatch; use arrow::{array::*, datatypes::*}; - fn test_i32_result(expr: NthValue, expected: Vec) -> Result<()> { + fn test_i32_result(expr: NthValue, expected: Int32Array) -> Result<()> { let arr: ArrayRef = Arc::new(Int32Array::from_slice(&[1, -2, 3, -4, 5, -6, 7, 8])); let values = vec![arr]; @@ -212,7 +217,7 @@ mod tests { .create_evaluator(&batch)? .evaluate_with_rank(vec![0..8], vec![0..8])?; assert_eq!(1, result.len()); - let result = result.as_any().downcast_ref::().unwrap(); + let result = result[0].as_any().downcast_ref::().unwrap(); assert_eq!(expected, *result); Ok(()) } @@ -224,7 +229,7 @@ mod tests { Arc::new(Column::new("arr", 0)), DataType::Int32, ); - test_i32_result(first_value, Int32Array::from_iter_values(vec![1; 8]))?; + test_i32_result(first_value, Int32Array::from_values(vec![1; 8]))?; Ok(()) } @@ -235,7 +240,7 @@ mod tests { Arc::new(Column::new("arr", 0)), DataType::Int32, ); - test_i32_result(last_value, Int32Array::from_iter_values(vec![8; 8]))?; + test_i32_result(last_value, Int32Array::from_values(vec![8; 8]))?; Ok(()) } @@ -247,7 +252,7 @@ mod tests { DataType::Int32, 1, )?; - test_i32_result(nth_value, Int32Array::from_iter_values(vec![1; 8]))?; + test_i32_result(nth_value, Int32Array::from_values(vec![1; 8]))?; Ok(()) } @@ -261,7 +266,7 @@ mod tests { )?; test_i32_result( nth_value, - Int32Array::from(vec![ + Int32Array::from(&[ None, Some(-2), Some(-2), diff --git a/datafusion/src/physical_plan/expressions/rank.rs b/datafusion/src/physical_plan/expressions/rank.rs index b88dec378c06..e9f10622f2fd 100644 --- a/datafusion/src/physical_plan/expressions/rank.rs +++ b/datafusion/src/physical_plan/expressions/rank.rs @@ -93,14 +93,14 @@ impl PartitionEvaluator for RankEvaluator { ranks_in_partition: &[Range], ) -> Result { let result = if self.dense { - UInt64Array::from_iter_values(ranks_in_partition.iter().zip(1u64..).flat_map( + UInt64Array::from_values(ranks_in_partition.iter().zip(1u64..).flat_map( |(range, rank)| { let len = range.end - range.start; iter::repeat(rank).take(len) }, )) } else { - UInt64Array::from_iter_values( + UInt64Array::from_values( ranks_in_partition .iter() .scan(1_u64, |acc, range| { @@ -140,7 +140,7 @@ mod tests { ranks: Vec>, expected: Vec, ) -> Result<()> { - let arr: ArrayRef = 
Arc::new(Int32Array::from(data)); + let arr: ArrayRef = Arc::new(Int32Array::from_values(data)); let values = vec![arr]; let schema = Schema::new(vec![Field::new("arr", DataType::Int32, false)]); let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?; @@ -149,8 +149,8 @@ mod tests { .evaluate_with_rank(vec![0..8], ranks)?; assert_eq!(1, result.len()); let result = result[0].as_any().downcast_ref::().unwrap(); - let result = result.values(); - assert_eq!(expected, result); + let expected = UInt64Array::from_values(expected); + assert_eq!(expected, *result); Ok(()) } diff --git a/datafusion/src/physical_plan/expressions/row_number.rs b/datafusion/src/physical_plan/expressions/row_number.rs index 08484d2f0efd..abcb2df3b913 100644 --- a/datafusion/src/physical_plan/expressions/row_number.rs +++ b/datafusion/src/physical_plan/expressions/row_number.rs @@ -74,9 +74,7 @@ pub(crate) struct NumRowsEvaluator {} impl PartitionEvaluator for NumRowsEvaluator { fn evaluate_partition(&self, partition: Range) -> Result { let num_rows = partition.end - partition.start; - Ok(Arc::new(UInt64Array::from_iter_values( - 1..(num_rows as u64) + 1, - ))) + Ok(Arc::new(UInt64Array::from_values(1..(num_rows as u64) + 1))) } } @@ -98,7 +96,7 @@ mod tests { let result = row_number.create_evaluator(&batch)?.evaluate(vec![0..8])?; assert_eq!(1, result.len()); let result = result[0].as_any().downcast_ref::().unwrap(); - let result = result.values(); + let result = result.values().as_slice(); assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], result); Ok(()) } @@ -111,8 +109,9 @@ mod tests { let schema = Schema::new(vec![Field::new("arr", DataType::Boolean, false)]); let batch = RecordBatch::try_new(Arc::new(schema), vec![arr])?; let row_number = RowNumber::new("row_number".to_owned()); - let result = row_number.evaluate(batch.num_rows(), &[])?; - let result = result.as_any().downcast_ref::().unwrap(); + let result = row_number.create_evaluator(&batch)?.evaluate(vec![0..8])?; + assert_eq!(1, result.len()); + let result = result[0].as_any().downcast_ref::().unwrap(); let result = result.values().as_slice(); assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], result); Ok(()) diff --git a/datafusion/src/physical_plan/hash_utils.rs b/datafusion/src/physical_plan/hash_utils.rs index 42620bfecdcf..bc7f4f611601 100644 --- a/datafusion/src/physical_plan/hash_utils.rs +++ b/datafusion/src/physical_plan/hash_utils.rs @@ -681,7 +681,7 @@ mod tests { let strings = vec![Some("foo"), None, Some("bar"), Some("foo"), None]; let string_array = Arc::new(strings.iter().cloned().collect::>()); - let dict_array = MutableDictionaryArray::>::new(); + let mut dict_array = MutableDictionaryArray::>::new(); dict_array.try_extend(strings.iter().cloned()).unwrap(); let dict_array = dict_array.into_arc(); @@ -723,7 +723,7 @@ mod tests { let strings2 = vec![Some("blarg"), Some("blah"), None]; let string_array = Arc::new(strings1.iter().cloned().collect::>()); - let dict_array = MutableDictionaryArray::>::new(); + let mut dict_array = MutableDictionaryArray::>::new(); dict_array.try_extend(strings2.iter().cloned()).unwrap(); let dict_array = dict_array.into_arc(); diff --git a/datafusion/src/physical_plan/parquet.rs b/datafusion/src/physical_plan/parquet.rs index 9c91c38e6f0a..aa2221be9f6e 100644 --- a/datafusion/src/physical_plan/parquet.rs +++ b/datafusion/src/physical_plan/parquet.rs @@ -577,8 +577,8 @@ fn read_partition( for partitioned_file in all_files { let file_metrics = ParquetFileMetrics::new(partition_index, &*partitioned_file.path, &metrics); - let 
mut file = File::open(partitioned_file.path.as_str())?; - let reader = read::RecordReader::try_new( + let file = File::open(partitioned_file.path.as_str())?; + let mut reader = read::RecordReader::try_new( std::io::BufReader::new(file), Some(projection.to_vec()), limit, diff --git a/datafusion/src/physical_plan/sort_preserving_merge.rs b/datafusion/src/physical_plan/sort_preserving_merge.rs index 5fd3806e1970..ef668afcb2cf 100644 --- a/datafusion/src/physical_plan/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sort_preserving_merge.rs @@ -197,7 +197,7 @@ impl ExecutionPlan for SortPreservingMergeExec { /// `SortKeyCursor::compare` can then be used to compare the sort key pointed to /// by this row cursor, with that of another `SortKeyCursor`. A cursor stores /// a row comparator for each other cursor that it is compared to. -struct SortKeyCursor<'a> { +struct SortKeyCursor { columns: Vec, cur_row: usize, num_rows: usize, @@ -209,10 +209,10 @@ struct SortKeyCursor<'a> { // A collection of comparators that compare rows in this cursor's batch to // the cursors in other batches. Other batches are uniquely identified by // their batch_idx. - batch_comparators: HashMap>>, + batch_comparators: HashMap>, } -impl<'a, 'b> std::fmt::Debug for SortKeyCursor<'b> { +impl<'a> std::fmt::Debug for SortKeyCursor { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("SortKeyCursor") .field("columns", &self.columns) @@ -225,7 +225,7 @@ impl<'a, 'b> std::fmt::Debug for SortKeyCursor<'b> { } } -impl<'a> SortKeyCursor<'a> { +impl SortKeyCursor { fn new( batch_idx: usize, batch: RecordBatch, @@ -257,9 +257,9 @@ impl<'a> SortKeyCursor<'a> { } /// Compares the sort key pointed to by this instance's row cursor with that of another - fn compare<'b>( + fn compare( &mut self, - other: &SortKeyCursor<'b>, + other: &SortKeyCursor, options: &[SortOptions], ) -> Result { if self.columns.len() != other.columns.len() { @@ -330,7 +330,7 @@ struct RowIndex { } #[derive(Debug)] -struct SortPreservingMergeStream<'a> { +struct SortPreservingMergeStream { /// The schema of the RecordBatches yielded by this stream schema: SchemaRef, /// The sorted input streams to merge together @@ -339,7 +339,7 @@ struct SortPreservingMergeStream<'a> { /// /// Exhausted cursors will be popped off the front once all /// their rows have been yielded to the output - cursors: Vec>>, + cursors: Vec>, /// The accumulated row indexes for the next record batch in_progress: Vec, /// The physical expressions to sort by @@ -357,7 +357,7 @@ struct SortPreservingMergeStream<'a> { next_batch_index: usize, } -impl<'a> SortPreservingMergeStream<'a> { +impl SortPreservingMergeStream { fn new( streams: Vec>>, schema: SchemaRef, @@ -546,7 +546,7 @@ impl<'a> SortPreservingMergeStream<'a> { } } -impl<'a> Stream for SortPreservingMergeStream<'a> { +impl Stream for SortPreservingMergeStream { type Item = ArrowResult; fn poll_next( @@ -558,7 +558,7 @@ impl<'a> Stream for SortPreservingMergeStream<'a> { } } -impl<'a> SortPreservingMergeStream<'a> { +impl SortPreservingMergeStream { #[inline] fn poll_next_inner( self: &mut Pin<&mut Self>, @@ -630,7 +630,7 @@ impl<'a> SortPreservingMergeStream<'a> { } } -impl<'a> RecordBatchStream for SortPreservingMergeStream<'a> { +impl RecordBatchStream for SortPreservingMergeStream { fn schema(&self) -> SchemaRef { self.schema.clone() } @@ -898,7 +898,7 @@ mod tests { options: Default::default(), }, ]; - let exec = MemoryExec::try_new(partitions, schema, None).unwrap(); + let exec = 
MemoryExec::try_new(partitions, schema.clone(), None).unwrap(); let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec), 1024)); let collected = collect(merge).await.unwrap(); @@ -1244,7 +1244,7 @@ mod tests { Arc::new(Utf8Array::::from_iter(vec![Some("b"), Some("d")])); let b2 = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).unwrap(); - let schema = b1.schema(); + let schema = b1.schema().clone(); let sort = vec![PhysicalSortExpr { expr: col("b", &schema).unwrap(), options: Default::default(), diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs index 83a6c0df4fee..bdb3d0053a74 100644 --- a/datafusion/src/scalar.rs +++ b/datafusion/src/scalar.rs @@ -27,6 +27,7 @@ use arrow::{ types::days_ms, }; use ordered_float::OrderedFloat; +use std::borrow::Borrow; use std::cmp::Ordering; use std::convert::{Infallible, TryInto}; use std::str::FromStr; @@ -907,11 +908,9 @@ impl ScalarValue { let value = match list_array.is_null(index) { true => None, false => { - let nested_array = list_array.value(index); + let nested_array = ArrayRef::from(list_array.value(index)); let scalar_vec = (0..nested_array.len()) - .map(|i| { - ScalarValue::try_from_array(&Arc::from(nested_array), i) - }) + .map(|i| ScalarValue::try_from_array(&nested_array, i)) .collect::>>()?; Some(scalar_vec) } @@ -1732,14 +1731,19 @@ mod tests { make_str_test_case!(str_vals, LargeStringArray, LargeUtf8), make_binary_test_case!(str_vals, SmallBinaryArray, Binary), make_binary_test_case!(str_vals, LargeBinaryArray, LargeBinary), - make_date_test_case!(i32_vals, Int32Array, Date32), - make_date_test_case!(i64_vals, Int64Array, Date64), - make_ts_test_case!(i64_vals, Int64Array, Second, TimestampSecond), - make_ts_test_case!(i64_vals, Int64Array, Millisecond, TimestampMillisecond), - make_ts_test_case!(i64_vals, Int64Array, Microsecond, TimestampMicrosecond), - make_ts_test_case!(i64_vals, Int64Array, Nanosecond, TimestampNanosecond), - make_temporal_test_case!(i32_vals, Int32Array, YearMonth, IntervalYearMonth), - make_temporal_test_case!(days_ms_vals, DaysMsArray, DayTime, IntervalDayTime), + make_date_test_case!(&i32_vals, Int32Array, Date32), + make_date_test_case!(&i64_vals, Int64Array, Date64), + make_ts_test_case!(&i64_vals, Int64Array, Second, TimestampSecond), + make_ts_test_case!(&i64_vals, Int64Array, Millisecond, TimestampMillisecond), + make_ts_test_case!(&i64_vals, Int64Array, Microsecond, TimestampMicrosecond), + make_ts_test_case!(&i64_vals, Int64Array, Nanosecond, TimestampNanosecond), + make_temporal_test_case!(&i32_vals, Int32Array, YearMonth, IntervalYearMonth), + make_temporal_test_case!( + &days_ms_vals, + DaysMsArray, + DayTime, + IntervalDayTime + ), make_str_dict_test_case!(str_vals, i8, Utf8), make_str_dict_test_case!(str_vals, i16, Utf8), make_str_dict_test_case!(str_vals, i32, Utf8),