
chore: Upgrade to datafusion 12 & arrow 23 (apache#269)
* chore: Upgrade to datafusion 12 && Arrow 23

* chore: fix clippy

* chore: fix clippy

* chore: refactor code

* test: fix tests

* refactor by CR

* refactor code
chunshao90 authored Sep 27, 2022
1 parent 9e484e7 commit 68a525c
Showing 33 changed files with 534 additions and 320 deletions.
383 changes: 267 additions & 116 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions analytic_engine/src/sst/parquet/encoding.rs
@@ -10,6 +10,7 @@ use arrow_deps::{
arrow::{
array::{Array, ArrayData, ArrayRef},
buffer::MutableBuffer,
compute,
record_batch::RecordBatch as ArrowRecordBatch,
util::bit_util,
},
@@ -299,7 +300,7 @@ impl RecordEncoder for ColumnarRecordEncoder {
fn encode(&mut self, arrow_record_batch_vec: Vec<ArrowRecordBatch>) -> Result<usize> {
assert!(self.arrow_writer.is_some());

let record_batch = ArrowRecordBatch::concat(&self.arrow_schema, &arrow_record_batch_vec)
let record_batch = compute::concat_batches(&self.arrow_schema, &arrow_record_batch_vec)
.map_err(|e| Box::new(e) as _)
.context(EncodeRecordBatch)?;

@@ -744,7 +745,7 @@ mod tests {
arrow::array::{Int32Array, StringArray, TimestampMillisecondArray, UInt64Array},
parquet::{
arrow::{ArrowReader, ParquetFileArrowReader},
file::serialized_reader::{SerializedFileReader, SliceableCursor},
file::serialized_reader::SerializedFileReader,
},
};
use common_types::{
@@ -949,8 +950,7 @@

// read encoded records back, and then compare with input records
let encoded_bytes = encoder.close().unwrap();
let reader =
SerializedFileReader::new(SliceableCursor::new(Arc::new(encoded_bytes))).unwrap();
let reader = SerializedFileReader::new(Bytes::from(encoded_bytes)).unwrap();
let mut reader = ParquetFileArrowReader::new(Arc::new(reader));
let mut reader = reader.get_record_reader(2048).unwrap();
let hybrid_record_batch = reader.next().unwrap().unwrap();
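For readers following the migration in `encoding.rs` above, here is a minimal, self-contained sketch of the two arrow/parquet 23 calls the file switches to: `compute::concat_batches` in place of `ArrowRecordBatch::concat`, and `SerializedFileReader::new` fed a `Bytes` value instead of the removed `SliceableCursor`. It uses the `arrow` and `parquet` crates directly rather than the repository's `arrow_deps` re-exports, and the schema, column values, and `encoded_parquet` parameter are illustrative only.

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::compute;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;

fn concat_then_reopen(encoded_parquet: Vec<u8>) -> arrow::error::Result<()> {
    // `concat_batches` now lives in the compute module and takes the target
    // schema plus a slice of batches.
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let b1 = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef],
    )?;
    let b2 = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![3])) as ArrayRef],
    )?;
    let merged = compute::concat_batches(&schema, &[b1, b2])?;
    assert_eq!(merged.num_rows(), 3);

    // `Bytes` implements parquet's `ChunkReader`, so an in-memory buffer can be
    // handed to `SerializedFileReader::new` without the removed `SliceableCursor`.
    let reader = SerializedFileReader::new(Bytes::from(encoded_parquet)).unwrap();
    let _row_groups = reader.metadata().num_row_groups();
    Ok(())
}
```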
2 changes: 2 additions & 0 deletions analytic_engine/src/sst/parquet/mod.rs
@@ -3,6 +3,8 @@
//! Sst implementation based on parquet.

pub mod builder;
#[allow(deprecated)]
pub mod encoding;
mod hybrid;
#[allow(deprecated)]
pub mod reader;
21 changes: 9 additions & 12 deletions analytic_engine/src/sst/parquet/reader.rs
@@ -13,12 +13,11 @@ use arrow_deps::{
arrow::{error::Result as ArrowResult, record_batch::RecordBatch},
parquet::{
arrow::{ArrowReader, ParquetFileArrowReader, ProjectionMask},
file::{
metadata::RowGroupMetaData, reader::FileReader, serialized_reader::SliceableCursor,
},
file::{metadata::RowGroupMetaData, reader::FileReader},
},
};
use async_trait::async_trait;
use bytes::Bytes;
use common_types::{
projected_schema::{ProjectedSchema, RowProjector},
record_batch::{ArrowRecordBatchProjector, RecordBatchWithKey},
@@ -29,7 +28,7 @@ use futures::Stream;
use log::{debug, error, trace};
use object_store::{ObjectStoreRef, Path};
use parquet::{
reverse_reader::Builder as ReverseRecordBatchReaderBuilder, CachableSerializedFileReader,
reverse_reader::Builder as ReverseRecordBatchReaderBuilder, CacheableSerializedFileReader,
DataCacheRef, MetaCacheRef,
};
use snafu::{ensure, OptionExt, ResultExt};
@@ -53,7 +52,7 @@ pub async fn read_sst_meta(
path: &Path,
meta_cache: &Option<MetaCacheRef>,
data_cache: &Option<DataCacheRef>,
) -> Result<(CachableSerializedFileReader<SliceableCursor>, SstMetaData)> {
) -> Result<(CacheableSerializedFileReader<Bytes>, SstMetaData)> {
let get_result = storage
.get(path)
.await
@@ -71,12 +70,10 @@
.map_err(|e| Box::new(e) as _)
.context(ReadPersist {
path: path.to_string(),
})?
.to_vec();
let bytes = SliceableCursor::new(Arc::new(bytes));
})?;

// generate the file reader
let file_reader = CachableSerializedFileReader::new(
let file_reader = CacheableSerializedFileReader::new(
path.to_string(),
bytes,
meta_cache.clone(),
@@ -114,7 +111,7 @@ pub struct ParquetSstReader<'a> {
projected_schema: ProjectedSchema,
predicate: PredicateRef,
meta_data: Option<SstMetaData>,
file_reader: Option<CachableSerializedFileReader<SliceableCursor>>,
file_reader: Option<CacheableSerializedFileReader<Bytes>>,
/// The batch of rows in one `record_batch`.
batch_size: usize,
/// Read the rows in reverse order.
@@ -246,7 +243,7 @@ impl<'a> ParquetSstReader<'a> {
/// A reader for projection and filter on the parquet file.
struct ProjectAndFilterReader {
file_path: String,
file_reader: Option<CachableSerializedFileReader<SliceableCursor>>,
file_reader: Option<CacheableSerializedFileReader<Bytes>>,
schema: Schema,
projected_schema: ProjectedSchema,
row_projector: RowProjector,
@@ -300,7 +297,7 @@ impl ProjectAndFilterReader {
arrow_reader.get_record_reader(self.batch_size)
} else {
let proj_mask = ProjectionMask::leaves(
arrow_reader.get_metadata().file_metadata().schema_descr(),
arrow_reader.metadata().file_metadata().schema_descr(),
self.row_projector
.existed_source_projection()
.iter()
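As a hedged illustration of the reader-side APIs touched in `reader.rs` above: `CacheableSerializedFileReader` is a wrapper internal to this repository, so the sketch below uses the plain `SerializedFileReader` from the parquet crate; the column indices and batch size are placeholder values.

```rust
use std::sync::Arc;

use bytes::Bytes;
use parquet::arrow::{ArrowReader, ParquetFileArrowReader, ProjectionMask};
use parquet::file::serialized_reader::SerializedFileReader;

// ParquetFileArrowReader and the ArrowReader trait are deprecated in parquet 23
// (hence the `#[allow(deprecated)]` added in mod.rs), but remain usable.
#[allow(deprecated)]
fn read_projected(data: Bytes) -> parquet::errors::Result<()> {
    // `Bytes` satisfies `ChunkReader`, replacing the removed `SliceableCursor`.
    let file_reader = Arc::new(SerializedFileReader::new(data)?);
    let mut arrow_reader = ParquetFileArrowReader::new(file_reader);

    // parquet 23 renames `get_metadata()` to `metadata()`.
    let mask = ProjectionMask::leaves(
        arrow_reader.metadata().file_metadata().schema_descr(),
        [0usize, 1],
    );
    for batch in arrow_reader.get_record_reader_by_columns(mask, 2048)? {
        println!("rows in batch: {}", batch.unwrap().num_rows());
    }
    Ok(())
}
```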
15 changes: 10 additions & 5 deletions arrow_deps/src/display.rs
@@ -28,8 +28,9 @@ use std::sync::Arc;
use arrow::{
array::{self, Array, DictionaryArray},
datatypes::{
ArrowNativeType, ArrowPrimitiveType, DataType, Int16Type, Int32Type, Int64Type, Int8Type,
IntervalUnit, TimeUnit, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
ArrowNativeType, ArrowPrimitiveType, DataType, Decimal128Type, Decimal256Type, DecimalType,
Int16Type, Int32Type, Int64Type, Int8Type, IntervalUnit, TimeUnit, UInt16Type, UInt32Type,
UInt64Type, UInt8Type,
},
error::{ArrowError, Result},
};
@@ -252,10 +253,13 @@ macro_rules! make_string_from_fixed_size_list {
}

#[inline(always)]
pub fn make_string_from_decimal(column: &Arc<dyn Array>, row: usize) -> Result<String> {
pub fn make_string_from_decimal<T: DecimalType>(
column: &Arc<dyn Array>,
row: usize,
) -> Result<String> {
let array = column
.as_any()
.downcast_ref::<array::DecimalArray>()
.downcast_ref::<array::DecimalArray<T>>()
.unwrap();

let formatted_decimal = array.value_as_string(row);
@@ -318,7 +322,8 @@ pub fn array_value_to_string(column: &array::ArrayRef, row: usize) -> Result<Str
DataType::Float16 => make_string!(array::Float16Array, column, row),
DataType::Float32 => make_string!(array::Float32Array, column, row),
DataType::Float64 => make_string!(array::Float64Array, column, row),
DataType::Decimal(..) => make_string_from_decimal(column, row),
DataType::Decimal128(..) => make_string_from_decimal::<Decimal128Type>(column, row),
DataType::Decimal256(..) => make_string_from_decimal::<Decimal256Type>(column, row),
DataType::Timestamp(unit, _) if *unit == TimeUnit::Second => {
make_string_datetime!(array::TimestampSecondArray, column, row)
}
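A small sketch of how the now-generic decimal formatting in `display.rs` is dispatched under arrow 23, where the old `DataType::Decimal(..)` has been split into `Decimal128` and `Decimal256`. The helper names (`decimal_cell_to_string`, `cell_to_string`) are made up for the example; the downcast and `value_as_string` calls mirror the diff above.

```rust
use arrow::array::{self, Array, ArrayRef};
use arrow::datatypes::{DataType, Decimal128Type, Decimal256Type, DecimalType};
use arrow::error::{ArrowError, Result};

/// Formats one decimal cell, generic over the two widths arrow 23 provides.
fn decimal_cell_to_string<T: DecimalType>(column: &ArrayRef, row: usize) -> Result<String> {
    let decimal_array = column
        .as_any()
        .downcast_ref::<array::DecimalArray<T>>()
        .ok_or_else(|| ArrowError::InvalidArgumentError("not a decimal array".to_string()))?;
    Ok(decimal_array.value_as_string(row))
}

/// Dispatches on the concrete decimal variant, as in the match arms above.
fn cell_to_string(column: &ArrayRef, row: usize) -> Result<String> {
    match column.data_type() {
        DataType::Decimal128(..) => decimal_cell_to_string::<Decimal128Type>(column, row),
        DataType::Decimal256(..) => decimal_cell_to_string::<Decimal256Type>(column, row),
        other => Err(ArrowError::NotYetImplemented(format!(
            "unsupported type {:?}",
            other
        ))),
    }
}
```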
34 changes: 8 additions & 26 deletions benchmarks/src/parquet_bench.rs
@@ -4,10 +4,7 @@

use std::{sync::Arc, time::Instant};

use arrow_deps::parquet::{
arrow::{ArrowReader, ParquetFileArrowReader, ProjectionMask},
file::serialized_reader::{ReadOptionsBuilder, SerializedFileReader, SliceableCursor},
};
use arrow_deps::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use common_types::schema::Schema;
use common_util::runtime::Runtime;
use log::info;
@@ -24,7 +21,6 @@ pub struct ParquetBench {
projection: Vec<usize>,
_schema: Schema,
_predicate: PredicateRef,
batch_size: usize,
runtime: Arc<Runtime>,
}

@@ -52,7 +48,6 @@ impl ParquetBench {
projection: Vec::new(),
_schema: schema,
_predicate: config.predicate.into_predicate(),
batch_size: config.read_batch_row_num,
runtime: Arc::new(runtime),
}
}
@@ -78,34 +73,21 @@ impl ParquetBench {
self.runtime.block_on(async {
let open_instant = Instant::now();
let get_result = self.store.get(&sst_path).await.unwrap();
let cursor = SliceableCursor::new(Arc::new(get_result.bytes().await.unwrap().to_vec()));
// todo: enable predicate filter
let read_options = ReadOptionsBuilder::new()
.with_predicate(Box::new(move |_, _| true))
.build();
let file_reader = SerializedFileReader::new_with_options(cursor, read_options).unwrap();

let open_cost = open_instant.elapsed();

let filter_begin_instant = Instant::now();
let mut arrow_reader = { ParquetFileArrowReader::new(Arc::new(file_reader)) };
let filter_cost = filter_begin_instant.elapsed();

let record_reader = if self.projection.is_empty() {
arrow_reader.get_record_reader(self.batch_size).unwrap()
} else {
let proj_mask = ProjectionMask::leaves(
arrow_reader.get_metadata().file_metadata().schema_descr(),
self.projection.iter().copied(),
);
arrow_reader
.get_record_reader_by_columns(proj_mask, self.batch_size)
let arrow_reader =
ParquetRecordBatchReaderBuilder::try_new(get_result.bytes().await.unwrap())
.unwrap()
};
.build()
.unwrap();
let filter_cost = filter_begin_instant.elapsed();

let iter_begin_instant = Instant::now();
let mut total_rows = 0;
let mut batch_num = 0;
for record_batch in record_reader {
for record_batch in arrow_reader {
let num_rows = record_batch.unwrap().num_rows();
total_rows += num_rows;
batch_num += 1;
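Finally, a rough sketch of the `ParquetRecordBatchReaderBuilder` path the bench now uses, including the optional batch-size and projection settings that the simplified bench code no longer passes. `file_bytes` and the projected column indices are placeholders, and `parquet_schema()` on the builder is assumed from the parquet 23 API.

```rust
use bytes::Bytes;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::ProjectionMask;

fn count_rows(file_bytes: Bytes) -> parquet::errors::Result<usize> {
    // The builder reads the parquet footer up front, so the schema descriptor
    // is available before `build()` is called.
    let builder = ParquetRecordBatchReaderBuilder::try_new(file_bytes)?;
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0usize, 1]);

    let reader = builder
        .with_batch_size(2048)
        .with_projection(mask)
        .build()?;

    let mut total_rows = 0;
    for batch in reader {
        total_rows += batch.unwrap().num_rows();
    }
    Ok(total_rows)
}
```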