Skip to content

Commit

Permalink
Make scalar.rs compile (#2)
Browse files Browse the repository at this point in the history
* wip

* more

* Make scalar.rs compile
  • Loading branch information
yjshen authored Sep 16, 2021
1 parent b9f12d0 commit 00df64a
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 124 deletions.
30 changes: 21 additions & 9 deletions datafusion/src/datasource/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Parquet data source
use std::any::{Any, type_name};
use std::any::{type_name, Any};
use std::fs::File;
use std::sync::Arc;

Expand Down Expand Up @@ -221,7 +221,8 @@ impl ParquetTableDescriptor {
if let DataType::$DT = fields[i].data_type() {
let stats = stats
.as_any()
.downcast_ref::<ParquetPrimitiveStatistics<$PRIMITIVE_TYPE>>().ok_or_else(|| {
.downcast_ref::<ParquetPrimitiveStatistics<$PRIMITIVE_TYPE>>()
.ok_or_else(|| {
DataFusionError::Internal(format!(
"Failed to cast stats to {} stats",
type_name::<$PRIMITIVE_TYPE>()
Expand Down Expand Up @@ -254,9 +255,13 @@ impl ParquetTableDescriptor {
match stats.physical_type() {
PhysicalType::Boolean => {
if let DataType::Boolean = fields[i].data_type() {
let stats =
stats.as_any().downcast_ref::<ParquetBooleanStatistics>().ok_or_else(|| {
DataFusionError::Internal("Failed to cast stats to boolean stats".to_owned())
let stats = stats
.as_any()
.downcast_ref::<ParquetBooleanStatistics>()
.ok_or_else(|| {
DataFusionError::Internal(
"Failed to cast stats to boolean stats".to_owned(),
)
})?;
if let Some(max_value) = &mut max_values[i] {
if let Some(v) = stats.max_value {
Expand Down Expand Up @@ -296,9 +301,13 @@ impl ParquetTableDescriptor {
}
PhysicalType::ByteArray => {
if let DataType::Utf8 = fields[i].data_type() {
let stats =
stats.as_any().downcast_ref::<ParquetBinaryStatistics>().ok_or_else(|| {
DataFusionError::Internal("Failed to cast stats to binary stats".to_owned())
let stats = stats
.as_any()
.downcast_ref::<ParquetBinaryStatistics>()
.ok_or_else(|| {
DataFusionError::Internal(
"Failed to cast stats to binary stats".to_owned(),
)
})?;
if let Some(max_value) = &mut max_values[i] {
if let Some(v) = stats.max_value {
Expand Down Expand Up @@ -395,7 +404,10 @@ impl TableDescriptorBuilder for ParquetTableDescriptor {
};

Ok(FileAndSchema {
file: PartitionedFile { path: path.to_owned(), statistics },
file: PartitionedFile {
path: path.to_owned(),
statistics,
},
schema,
})
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/execution/dataframe_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
use std::sync::{Arc, Mutex};

use arrow::io::print;
use arrow::record_batch::RecordBatch;
use crate::error::Result;
use crate::execution::context::{ExecutionContext, ExecutionContextState};
use crate::logical_plan::{
Expand All @@ -31,6 +29,8 @@ use crate::{
dataframe::*,
physical_plan::{collect, collect_partitioned},
};
use arrow::io::print;
use arrow::record_batch::RecordBatch;

use crate::physical_plan::{
execute_stream, execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream,
Expand Down
7 changes: 2 additions & 5 deletions datafusion/src/physical_plan/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use futures::StreamExt;

use super::{stream::RecordBatchReceiverStream, Distribution, SendableRecordBatchStream};
use async_trait::async_trait;
use arrow::array::MutableUtf8Array;
use async_trait::async_trait;

/// `EXPLAIN ANALYZE` execution plan operator. This operator runs its input,
/// discards the results, and then prints out an annotated plan with metrics
Expand Down Expand Up @@ -182,10 +182,7 @@ impl ExecutionPlan for AnalyzeExec {

let maybe_batch = RecordBatch::try_new(
captured_schema,
vec![
type_builder.into_arc(),
plan_builder.into_arc(),
],
vec![type_builder.into_arc(), plan_builder.into_arc()],
);
// again ignore error
tx.send(maybe_batch).await.ok();
Expand Down
4 changes: 1 addition & 3 deletions datafusion/src/physical_plan/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ use crate::{
use arrow::{
array::*,
compute::cast,
datatypes::{
DataType, TimeUnit,
},
datatypes::{DataType, TimeUnit},
temporal_conversions::utf8_to_timestamp_ns_scalar,
types::NativeType,
};
Expand Down
6 changes: 2 additions & 4 deletions datafusion/src/physical_plan/expressions/in_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,15 @@ fn in_list_primitive<T: NativeType>(
array: &PrimitiveArray<T>,
values: &[T],
) -> Result<BooleanArray> {
compare_primitive_op_scalar!(array, values, |x, v| v
.contains(&x))
compare_primitive_op_scalar!(array, values, |x, v| v.contains(&x))
}

// whether each value on the left (can be null) is contained in the non-null list
fn not_in_list_primitive<T: NativeType>(
array: &PrimitiveArray<T>,
values: &[T],
) -> Result<BooleanArray> {
compare_primitive_op_scalar!(array, values, |x, v| !v
.contains(&x))
compare_primitive_op_scalar!(array, values, |x, v| !v.contains(&x))
}

// whether each value on the left (can be null) is contained in the non-null list
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub struct ParquetExec {
pub schema: Arc<Schema>,
/// Projection for which columns to load
projection: Vec<usize>,
/// Batch size
/// Batch size
batch_size: usize,
/// Statistics for the data set (sum of statistics for all partitions)
statistics: Statistics,
Expand Down
Loading

0 comments on commit 00df64a

Please sign in to comment.