diff --git a/datafusion/core/benches/jit.rs b/datafusion/core/benches/jit.rs index 6abebc294185..0c6de319d2ce 100644 --- a/datafusion/core/benches/jit.rs +++ b/datafusion/core/benches/jit.rs @@ -23,7 +23,9 @@ extern crate datafusion; mod data_utils; use crate::criterion::Criterion; use crate::data_utils::{create_record_batches, create_schema}; -use datafusion::row::writer::{bench_write_batch, bench_write_batch_jit}; +use datafusion::row::jit::writer::bench_write_batch_jit; +use datafusion::row::writer::bench_write_batch; +use datafusion::row::RowType; use std::sync::Arc; fn criterion_benchmark(c: &mut Criterion) { @@ -35,15 +37,38 @@ fn criterion_benchmark(c: &mut Criterion) { let batches = create_record_batches(schema.clone(), array_len, partitions_len, batch_size); - c.bench_function("row serializer", |b| { + c.bench_function("compact row serializer", |b| { b.iter(|| { - criterion::black_box(bench_write_batch(&batches, schema.clone()).unwrap()) + criterion::black_box( + bench_write_batch(&batches, schema.clone(), RowType::Compact).unwrap(), + ) }) }); - c.bench_function("row serializer jit", |b| { + c.bench_function("word aligned row serializer", |b| { b.iter(|| { - criterion::black_box(bench_write_batch_jit(&batches, schema.clone()).unwrap()) + criterion::black_box( + bench_write_batch(&batches, schema.clone(), RowType::WordAligned) + .unwrap(), + ) + }) + }); + + c.bench_function("compact row serializer jit", |b| { + b.iter(|| { + criterion::black_box( + bench_write_batch_jit(&batches, schema.clone(), RowType::Compact) + .unwrap(), + ) + }) + }); + + c.bench_function("word aligned row serializer jit", |b| { + b.iter(|| { + criterion::black_box( + bench_write_batch_jit(&batches, schema.clone(), RowType::WordAligned) + .unwrap(), + ) }) }); } diff --git a/datafusion/core/src/row/jit/mod.rs b/datafusion/core/src/row/jit/mod.rs index fbb6efe3c0b2..7ee76a9b4ba8 100644 --- a/datafusion/core/src/row/jit/mod.rs +++ b/datafusion/core/src/row/jit/mod.rs @@ -17,8 +17,8 @@ //! Just-In-Time(JIT) version for row reader and writers -mod reader; -mod writer; +pub mod reader; +pub mod writer; #[macro_export] /// register external functions to the assembler @@ -46,6 +46,7 @@ mod tests { use crate::error::Result; use crate::row::jit::reader::read_as_batch_jit; use crate::row::jit::writer::write_batch_unchecked_jit; + use crate::row::layout::RowType::{Compact, WordAligned}; use arrow::record_batch::RecordBatch; use arrow::{array::*, datatypes::*}; use datafusion_jit::api::Assembler; @@ -53,26 +54,26 @@ mod tests { use DataType::*; macro_rules! fn_test_single_type { - ($ARRAY: ident, $TYPE: expr, $VEC: expr) => { + ($ARRAY: ident, $TYPE: expr, $VEC: expr, $ROWTYPE: expr) => { paste::item! { #[test] #[allow(non_snake_case)] - fn []() -> Result<()> { + fn []() -> Result<()> { let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, true)])); let a = $ARRAY::from($VEC); let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; let mut vector = vec![0; 1024]; let assembler = Assembler::default(); let row_offsets = - { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? }; - let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? }; + { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler, $ROWTYPE)? }; + let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, $ROWTYPE)? }; assert_eq!(batch, output_batch); Ok(()) } #[test] #[allow(non_snake_case)] - fn []() -> Result<()> { + fn []() -> Result<()> { let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)])); let v = $VEC.into_iter().filter(|o| o.is_some()).collect::>(); let a = $ARRAY::from(v); @@ -80,8 +81,8 @@ mod tests { let mut vector = vec![0; 1024]; let assembler = Assembler::default(); let row_offsets = - { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? }; - let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? }; + { write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler, $ROWTYPE)? }; + let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, $ROWTYPE)? }; assert_eq!(batch, output_batch); Ok(()) } @@ -92,85 +93,190 @@ mod tests { fn_test_single_type!( BooleanArray, Boolean, - vec![Some(true), Some(false), None, Some(true), None] + vec![Some(true), Some(false), None, Some(true), None], + Compact + ); + + fn_test_single_type!( + BooleanArray, + Boolean, + vec![Some(true), Some(false), None, Some(true), None], + WordAligned ); fn_test_single_type!( Int8Array, Int8, - vec![Some(5), Some(7), None, Some(0), Some(111)] + vec![Some(5), Some(7), None, Some(0), Some(111)], + Compact + ); + + fn_test_single_type!( + Int8Array, + Int8, + vec![Some(5), Some(7), None, Some(0), Some(111)], + WordAligned + ); + + fn_test_single_type!( + Int16Array, + Int16, + vec![Some(5), Some(7), None, Some(0), Some(111)], + Compact ); fn_test_single_type!( Int16Array, Int16, - vec![Some(5), Some(7), None, Some(0), Some(111)] + vec![Some(5), Some(7), None, Some(0), Some(111)], + WordAligned ); fn_test_single_type!( Int32Array, Int32, - vec![Some(5), Some(7), None, Some(0), Some(111)] + vec![Some(5), Some(7), None, Some(0), Some(111)], + Compact + ); + + fn_test_single_type!( + Int32Array, + Int32, + vec![Some(5), Some(7), None, Some(0), Some(111)], + WordAligned + ); + + fn_test_single_type!( + Int64Array, + Int64, + vec![Some(5), Some(7), None, Some(0), Some(111)], + Compact ); fn_test_single_type!( Int64Array, Int64, - vec![Some(5), Some(7), None, Some(0), Some(111)] + vec![Some(5), Some(7), None, Some(0), Some(111)], + WordAligned ); fn_test_single_type!( UInt8Array, UInt8, - vec![Some(5), Some(7), None, Some(0), Some(111)] + vec![Some(5), Some(7), None, Some(0), Some(111)], + Compact + ); + + fn_test_single_type!( + UInt8Array, + UInt8, + vec![Some(5), Some(7), None, Some(0), Some(111)], + WordAligned ); fn_test_single_type!( UInt16Array, UInt16, - vec![Some(5), Some(7), None, Some(0), Some(111)] + vec![Some(5), Some(7), None, Some(0), Some(111)], + Compact + ); + + fn_test_single_type!( + UInt16Array, + UInt16, + vec![Some(5), Some(7), None, Some(0), Some(111)], + WordAligned + ); + + fn_test_single_type!( + UInt32Array, + UInt32, + vec![Some(5), Some(7), None, Some(0), Some(111)], + Compact ); fn_test_single_type!( UInt32Array, UInt32, - vec![Some(5), Some(7), None, Some(0), Some(111)] + vec![Some(5), Some(7), None, Some(0), Some(111)], + WordAligned + ); + + fn_test_single_type!( + UInt64Array, + UInt64, + vec![Some(5), Some(7), None, Some(0), Some(111)], + Compact ); fn_test_single_type!( UInt64Array, UInt64, - vec![Some(5), Some(7), None, Some(0), Some(111)] + vec![Some(5), Some(7), None, Some(0), Some(111)], + WordAligned + ); + + fn_test_single_type!( + Float32Array, + Float32, + vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)], + Compact ); fn_test_single_type!( Float32Array, Float32, - vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)] + vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)], + WordAligned + ); + + fn_test_single_type!( + Float64Array, + Float64, + vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)], + Compact ); fn_test_single_type!( Float64Array, Float64, - vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)] + vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)], + WordAligned + ); + + fn_test_single_type!( + Date32Array, + Date32, + vec![Some(5), Some(7), None, Some(0), Some(111)], + Compact ); fn_test_single_type!( Date32Array, Date32, - vec![Some(5), Some(7), None, Some(0), Some(111)] + vec![Some(5), Some(7), None, Some(0), Some(111)], + WordAligned + ); + + fn_test_single_type!( + Date64Array, + Date64, + vec![Some(5), Some(7), None, Some(0), Some(111)], + Compact ); fn_test_single_type!( Date64Array, Date64, - vec![Some(5), Some(7), None, Some(0), Some(111)] + vec![Some(5), Some(7), None, Some(0), Some(111)], + WordAligned ); fn_test_single_type!( StringArray, Utf8, - vec![Some("hello"), Some("world"), None, Some(""), Some("")] + vec![Some("hello"), Some("world"), None, Some(""), Some("")], + Compact ); #[test] @@ -190,10 +296,11 @@ mod tests { 0, schema.clone(), &assembler, + Compact, )? }; let output_batch = - { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? }; + { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, Compact)? }; assert_eq!(batch, output_batch); Ok(()) } @@ -214,10 +321,11 @@ mod tests { 0, schema.clone(), &assembler, + Compact, )? }; let output_batch = - { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? }; + { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, Compact)? }; assert_eq!(batch, output_batch); Ok(()) } diff --git a/datafusion/core/src/row/jit/reader.rs b/datafusion/core/src/row/jit/reader.rs index c1018366720c..80e10131fcf8 100644 --- a/datafusion/core/src/row/jit/reader.rs +++ b/datafusion/core/src/row/jit/reader.rs @@ -20,6 +20,7 @@ use crate::error::{DataFusionError, Result}; use crate::reg_fn; use crate::row::jit::fn_name; +use crate::row::layout::RowType; use crate::row::reader::RowReader; use crate::row::reader::*; use crate::row::MutableRecordBatch; @@ -38,10 +39,11 @@ pub fn read_as_batch_jit( schema: Arc, offsets: &[usize], assembler: &Assembler, + row_type: RowType, ) -> Result { let row_num = offsets.len(); let mut output = MutableRecordBatch::new(row_num, schema.clone()); - let mut row = RowReader::new(&schema); + let mut row = RowReader::new(&schema, row_type); register_read_functions(assembler)?; let gen_func = gen_read_row(&schema, assembler)?; let mut jit = assembler.create_jit(); @@ -102,10 +104,7 @@ fn register_read_functions(asm: &Assembler) -> Result<()> { Ok(()) } -fn gen_read_row( - schema: &Arc, - assembler: &Assembler, -) -> Result { +fn gen_read_row(schema: &Schema, assembler: &Assembler) -> Result { use DataType::*; let mut builder = assembler .new_func_builder("read_row") diff --git a/datafusion/core/src/row/jit/writer.rs b/datafusion/core/src/row/jit/writer.rs index 5b077caf6a47..ae9ff1308189 100644 --- a/datafusion/core/src/row/jit/writer.rs +++ b/datafusion/core/src/row/jit/writer.rs @@ -20,6 +20,7 @@ use crate::error::Result; use crate::reg_fn; use crate::row::jit::fn_name; +use crate::row::layout::RowType; use crate::row::schema_null_free; use crate::row::writer::RowWriter; use crate::row::writer::*; @@ -43,8 +44,9 @@ pub fn write_batch_unchecked_jit( row_idx: usize, schema: Arc, assembler: &Assembler, + row_type: RowType, ) -> Result> { - let mut writer = RowWriter::new(&schema); + let mut writer = RowWriter::new(&schema, row_type); let mut current_offset = offset; let mut offsets = vec![]; register_write_functions(assembler)?; @@ -74,9 +76,10 @@ pub fn write_batch_unchecked_jit( pub fn bench_write_batch_jit( batches: &[Vec], schema: Arc, + row_type: RowType, ) -> Result> { let assembler = Assembler::default(); - let mut writer = RowWriter::new(&schema); + let mut writer = RowWriter::new(&schema, row_type); let mut lengths = vec![]; register_write_functions(&assembler)?; let gen_func = gen_write_row(&schema, &assembler)?; @@ -126,10 +129,7 @@ fn register_write_functions(asm: &Assembler) -> Result<()> { Ok(()) } -fn gen_write_row( - schema: &Arc, - assembler: &Assembler, -) -> Result { +fn gen_write_row(schema: &Schema, assembler: &Assembler) -> Result { let mut builder = assembler .new_func_builder("write_row") .param("row", PTR) diff --git a/datafusion/core/src/row/layout.rs b/datafusion/core/src/row/layout.rs index 71699d6eb3b1..c14f9fa6949d 100644 --- a/datafusion/core/src/row/layout.rs +++ b/datafusion/core/src/row/layout.rs @@ -17,26 +17,94 @@ //! Various row layout for different use case -use crate::row::{schema_null_free, var_length}; +use crate::row::schema_null_free; use arrow::datatypes::{DataType, Schema}; use arrow::util::bit_util::{ceil, round_upto_power_of_2}; -use std::sync::Arc; const UTF8_DEFAULT_SIZE: usize = 20; const BINARY_DEFAULT_SIZE: usize = 100; +#[derive(Copy, Clone, Debug)] +/// Type of a RowLayout +pub enum RowType { + /// This type of layout will store each field with minimum bytes for space efficiency. + /// Its typical use case represents a sorting payload that accesses all row fields as a unit. + Compact, + /// This type of layout will store one 8-byte word per field for CPU-friendly, + /// It is mainly used to represent the rows with frequently updated content, + /// for example, grouping state for hash aggregation. + WordAligned, + // RawComparable, +} + +/// Reveals how the fields of a record are stored in the raw-bytes format +#[derive(Debug)] +pub(crate) struct RowLayout { + /// Type of the layout + #[allow(dead_code)] + row_type: RowType, + /// If a row is null free according to its schema + pub(crate) null_free: bool, + /// The number of bytes used to store null bits for each field. + pub(crate) null_width: usize, + /// Length in bytes for `values` part of the current tuple. + pub(crate) values_width: usize, + /// Total number of fields for each tuple. + pub(crate) field_count: usize, + /// Starting offset for each fields in the raw bytes. + pub(crate) field_offsets: Vec, +} + +impl RowLayout { + pub(crate) fn new(schema: &Schema, row_type: RowType) -> Self { + assert!(row_supported(schema, row_type)); + let null_free = schema_null_free(schema); + let field_count = schema.fields().len(); + let null_width = if null_free { + 0 + } else { + match row_type { + RowType::Compact => ceil(field_count, 8), + RowType::WordAligned => round_upto_power_of_2(ceil(field_count, 8), 8), + } + }; + let (field_offsets, values_width) = match row_type { + RowType::Compact => compact_offsets(null_width, schema), + RowType::WordAligned => word_aligned_offsets(null_width, schema), + }; + Self { + row_type, + null_free, + null_width, + values_width, + field_count, + field_offsets, + } + } + + #[inline(always)] + pub(crate) fn fixed_part_width(&self) -> usize { + self.null_width + self.values_width + } +} + /// Get relative offsets for each field and total width for values -pub fn get_offsets(null_width: usize, schema: &Arc) -> (Vec, usize) { +fn compact_offsets(null_width: usize, schema: &Schema) -> (Vec, usize) { let mut offsets = vec![]; let mut offset = null_width; for f in schema.fields() { offsets.push(offset); - offset += type_width(f.data_type()); + offset += compact_type_width(f.data_type()); } (offsets, offset - null_width) } -fn type_width(dt: &DataType) -> usize { +fn var_length(dt: &DataType) -> bool { + use DataType::*; + matches!(dt, Utf8 | Binary) +} + +fn compact_type_width(dt: &DataType) -> usize { use DataType::*; if var_length(dt) { return std::mem::size_of::(); @@ -50,13 +118,27 @@ fn type_width(dt: &DataType) -> usize { } } +fn word_aligned_offsets(null_width: usize, schema: &Schema) -> (Vec, usize) { + let mut offsets = vec![]; + let mut offset = null_width; + for f in schema.fields() { + offsets.push(offset); + assert!(!matches!(f.data_type(), DataType::Decimal(_, _))); + // All of the current support types can fit into one single 8-bytes word. + // When we decide to support Decimal type in the future, its width would be + // of two 8-bytes words and should adapt the width calculation below. + offset += 8; + } + (offsets, offset - null_width) +} + /// Estimate row width based on schema -pub fn estimate_row_width(schema: &Arc) -> usize { - let null_free = schema_null_free(schema); - let field_count = schema.fields().len(); - let mut width = if null_free { 0 } else { ceil(field_count, 8) }; +pub(crate) fn estimate_row_width(schema: &Schema, layout: &RowLayout) -> usize { + let mut width = layout.fixed_part_width(); + if matches!(layout.row_type, RowType::WordAligned) { + return width; + } for f in schema.fields() { - width += type_width(f.data_type()); match f.data_type() { DataType::Utf8 => width += UTF8_DEFAULT_SIZE, DataType::Binary => width += BINARY_DEFAULT_SIZE, @@ -65,3 +147,58 @@ pub fn estimate_row_width(schema: &Arc) -> usize { } round_upto_power_of_2(width, 8) } + +/// Tell if we can create raw-bytes based rows since we currently +/// has limited data type supports in the row format +fn row_supported(schema: &Schema, row_type: RowType) -> bool { + schema + .fields() + .iter() + .all(|f| supported_type(f.data_type(), row_type)) +} + +fn supported_type(dt: &DataType, row_type: RowType) -> bool { + use DataType::*; + + match row_type { + RowType::Compact => { + matches!( + dt, + Boolean + | UInt8 + | UInt16 + | UInt32 + | UInt64 + | Int8 + | Int16 + | Int32 + | Int64 + | Float32 + | Float64 + | Date32 + | Date64 + | Utf8 + | Binary + ) + } + // only fixed length types are supported for fast in-place update. + RowType::WordAligned => { + matches!( + dt, + Boolean + | UInt8 + | UInt16 + | UInt32 + | UInt64 + | Int8 + | Int16 + | Int32 + | Int64 + | Float32 + | Float64 + | Date32 + | Date64 + ) + } + } +} diff --git a/datafusion/core/src/row/mod.rs b/datafusion/core/src/row/mod.rs index 1fbf01275842..f8e9ff273e23 100644 --- a/datafusion/core/src/row/mod.rs +++ b/datafusion/core/src/row/mod.rs @@ -48,61 +48,21 @@ //! use arrow::array::{make_builder, ArrayBuilder}; -use arrow::datatypes::{DataType, Schema}; +use arrow::datatypes::Schema; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; +pub use layout::RowType; use std::sync::Arc; #[cfg(feature = "jit")] -mod jit; +pub mod jit; mod layout; pub mod reader; mod validity; pub mod writer; -fn supported_type(dt: &DataType) -> bool { - use DataType::*; - matches!( - dt, - Boolean - | UInt8 - | UInt16 - | UInt32 - | UInt64 - | Int8 - | Int16 - | Int32 - | Int64 - | Float32 - | Float64 - | Date32 - | Date64 - | Utf8 - | Binary - ) -} - -/// Tell if we can create raw-bytes based rows since we currently -/// has limited data type supports in the row format -pub fn row_supported(schema: &Arc) -> bool { - schema - .fields() - .iter() - .all(|f| supported_type(f.data_type())) -} - -fn var_length(dt: &DataType) -> bool { - use DataType::*; - matches!(dt, Utf8 | Binary) -} - -/// Tell if the row is of fixed size -pub fn fixed_size(schema: &Arc) -> bool { - schema.fields().iter().all(|f| !var_length(f.data_type())) -} - /// Tell if schema contains no nullable field -pub fn schema_null_free(schema: &Arc) -> bool { +pub(crate) fn schema_null_free(schema: &Schema) -> bool { schema.fields().iter().all(|f| !f.is_nullable()) } @@ -126,7 +86,7 @@ impl MutableRecordBatch { } } -fn new_arrays(schema: &Arc, batch_size: usize) -> Vec> { +fn new_arrays(schema: &Schema, batch_size: usize) -> Vec> { schema .fields() .iter() @@ -155,6 +115,7 @@ mod tests { use crate::physical_plan::file_format::FileScanConfig; use crate::physical_plan::{collect, ExecutionPlan}; use crate::prelude::SessionContext; + use crate::row::layout::RowType::{Compact, WordAligned}; use crate::row::reader::read_as_batch; use crate::row::writer::write_batch_unchecked; use arrow::record_batch::RecordBatch; @@ -166,33 +127,33 @@ mod tests { use DataType::*; macro_rules! fn_test_single_type { - ($ARRAY: ident, $TYPE: expr, $VEC: expr) => { + ($ARRAY: ident, $TYPE: expr, $VEC: expr, $ROWTYPE: expr) => { paste::item! { #[test] #[allow(non_snake_case)] - fn []() -> Result<()> { + fn []() -> Result<()> { let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, true)])); let a = $ARRAY::from($VEC); let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; let mut vector = vec![0; 1024]; let row_offsets = - { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) }; - let output_batch = { read_as_batch(&vector, schema, &row_offsets)? }; + { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone(), $ROWTYPE) }; + let output_batch = { read_as_batch(&vector, schema, &row_offsets, $ROWTYPE)? }; assert_eq!(batch, output_batch); Ok(()) } #[test] #[allow(non_snake_case)] - fn []() -> Result<()> { + fn []() -> Result<()> { let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)])); let v = $VEC.into_iter().filter(|o| o.is_some()).collect::>(); let a = $ARRAY::from(v); let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; let mut vector = vec![0; 1024]; let row_offsets = - { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) }; - let output_batch = { read_as_batch(&vector, schema, &row_offsets)? }; + { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone(), $ROWTYPE) }; + let output_batch = { read_as_batch(&vector, schema, &row_offsets, $ROWTYPE)? }; assert_eq!(batch, output_batch); Ok(()) } @@ -203,87 +164,202 @@ mod tests { fn_test_single_type!( BooleanArray, Boolean, - vec![Some(true), Some(false), None, Some(true), None] + vec![Some(true), Some(false), None, Some(true), None], + Compact + ); + + fn_test_single_type!( + BooleanArray, + Boolean, + vec![Some(true), Some(false), None, Some(true), None], + WordAligned ); fn_test_single_type!( Int8Array, Int8, - vec![Some(5), Some(7), None, Some(0), Some(111)] + vec![Some(5), Some(7), None, Some(0), Some(111)], + Compact + ); + + fn_test_single_type!( + Int8Array, + Int8, + vec![Some(5), Some(7), None, Some(0), Some(111)], + WordAligned ); fn_test_single_type!( Int16Array, Int16, - vec![Some(5), Some(7), None, Some(0), Some(111)] + vec![Some(5), Some(7), None, Some(0), Some(111)], + Compact + ); + + fn_test_single_type!( + Int16Array, + Int16, + vec![Some(5), Some(7), None, Some(0), Some(111)], + WordAligned + ); + + fn_test_single_type!( + Int32Array, + Int32, + vec![Some(5), Some(7), None, Some(0), Some(111)], + Compact ); fn_test_single_type!( Int32Array, Int32, - vec![Some(5), Some(7), None, Some(0), Some(111)] + vec![Some(5), Some(7), None, Some(0), Some(111)], + WordAligned ); fn_test_single_type!( Int64Array, Int64, - vec![Some(5), Some(7), None, Some(0), Some(111)] + vec![Some(5), Some(7), None, Some(0), Some(111)], + Compact + ); + + fn_test_single_type!( + Int64Array, + Int64, + vec![Some(5), Some(7), None, Some(0), Some(111)], + WordAligned ); fn_test_single_type!( UInt8Array, UInt8, - vec![Some(5), Some(7), None, Some(0), Some(111)] + vec![Some(5), Some(7), None, Some(0), Some(111)], + Compact + ); + + fn_test_single_type!( + UInt8Array, + UInt8, + vec![Some(5), Some(7), None, Some(0), Some(111)], + WordAligned + ); + + fn_test_single_type!( + UInt16Array, + UInt16, + vec![Some(5), Some(7), None, Some(0), Some(111)], + Compact ); fn_test_single_type!( UInt16Array, UInt16, - vec![Some(5), Some(7), None, Some(0), Some(111)] + vec![Some(5), Some(7), None, Some(0), Some(111)], + WordAligned ); fn_test_single_type!( UInt32Array, UInt32, - vec![Some(5), Some(7), None, Some(0), Some(111)] + vec![Some(5), Some(7), None, Some(0), Some(111)], + Compact + ); + + fn_test_single_type!( + UInt32Array, + UInt32, + vec![Some(5), Some(7), None, Some(0), Some(111)], + WordAligned + ); + + fn_test_single_type!( + UInt64Array, + UInt64, + vec![Some(5), Some(7), None, Some(0), Some(111)], + Compact ); fn_test_single_type!( UInt64Array, UInt64, - vec![Some(5), Some(7), None, Some(0), Some(111)] + vec![Some(5), Some(7), None, Some(0), Some(111)], + WordAligned ); fn_test_single_type!( Float32Array, Float32, - vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)] + vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)], + Compact + ); + + fn_test_single_type!( + Float32Array, + Float32, + vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)], + WordAligned ); fn_test_single_type!( Float64Array, Float64, - vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)] + vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)], + Compact + ); + + fn_test_single_type!( + Float64Array, + Float64, + vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)], + WordAligned + ); + + fn_test_single_type!( + Date32Array, + Date32, + vec![Some(5), Some(7), None, Some(0), Some(111)], + Compact ); fn_test_single_type!( Date32Array, Date32, - vec![Some(5), Some(7), None, Some(0), Some(111)] + vec![Some(5), Some(7), None, Some(0), Some(111)], + WordAligned + ); + + fn_test_single_type!( + Date64Array, + Date64, + vec![Some(5), Some(7), None, Some(0), Some(111)], + Compact ); fn_test_single_type!( Date64Array, Date64, - vec![Some(5), Some(7), None, Some(0), Some(111)] + vec![Some(5), Some(7), None, Some(0), Some(111)], + WordAligned ); fn_test_single_type!( StringArray, Utf8, - vec![Some("hello"), Some("world"), None, Some(""), Some("")] + vec![Some("hello"), Some("world"), None, Some(""), Some("")], + Compact ); + #[test] + #[should_panic(expected = "row_supported(schema, row_type)")] + fn test_unsupported_word_aligned_type() { + let a: ArrayRef = Arc::new(StringArray::from(vec!["hello", "world"])); + let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap(); + let schema = batch.schema(); + let mut vector = vec![0; 1024]; + write_batch_unchecked(&mut vector, 0, &batch, 0, schema, WordAligned); + } + #[test] fn test_single_binary() -> Result<()> { let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, true)])); @@ -293,8 +369,8 @@ mod tests { let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; let mut vector = vec![0; 8192]; let row_offsets = - { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) }; - let output_batch = { read_as_batch(&vector, schema, &row_offsets)? }; + { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone(), Compact) }; + let output_batch = { read_as_batch(&vector, schema, &row_offsets, Compact)? }; assert_eq!(batch, output_batch); Ok(()) } @@ -307,8 +383,8 @@ mod tests { let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; let mut vector = vec![0; 8192]; let row_offsets = - { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) }; - let output_batch = { read_as_batch(&vector, schema, &row_offsets)? }; + { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone(), Compact) }; + let output_batch = { read_as_batch(&vector, schema, &row_offsets, Compact)? }; assert_eq!(batch, output_batch); Ok(()) } @@ -327,25 +403,47 @@ mod tests { let mut vector = vec![0; 20480]; let row_offsets = - { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone()) }; - let output_batch = { read_as_batch(&vector, schema, &row_offsets)? }; + { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone(), Compact) }; + let output_batch = { read_as_batch(&vector, schema, &row_offsets, Compact)? }; + assert_eq!(*batch, output_batch); + + Ok(()) + } + + #[tokio::test] + async fn test_with_parquet_word_aligned() -> Result<()> { + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7]); + let exec = get_exec("alltypes_plain.parquet", &projection, None).await?; + let schema = exec.schema().clone(); + + let batches = collect(exec, task_ctx).await?; + assert_eq!(1, batches.len()); + let batch = &batches[0]; + + let mut vector = vec![0; 20480]; + let row_offsets = { + write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone(), WordAligned) + }; + let output_batch = { read_as_batch(&vector, schema, &row_offsets, WordAligned)? }; assert_eq!(*batch, output_batch); Ok(()) } #[test] - #[should_panic(expected = "supported(schema)")] + #[should_panic(expected = "row_supported(schema, row_type)")] fn test_unsupported_type_write() { let a: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8])); let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap(); let schema = batch.schema(); let mut vector = vec![0; 1024]; - write_batch_unchecked(&mut vector, 0, &batch, 0, schema); + write_batch_unchecked(&mut vector, 0, &batch, 0, schema, Compact); } #[test] - #[should_panic(expected = "supported(schema)")] + #[should_panic(expected = "row_supported(schema, row_type)")] fn test_unsupported_type_read() { let schema = Arc::new(Schema::new(vec![Field::new( "a", @@ -354,7 +452,7 @@ mod tests { )])); let vector = vec![0; 1024]; let row_offsets = vec![0]; - read_as_batch(&vector, schema, &row_offsets).unwrap(); + read_as_batch(&vector, schema, &row_offsets, Compact).unwrap(); } async fn get_exec( diff --git a/datafusion/core/src/row/reader.rs b/datafusion/core/src/row/reader.rs index 4d9fb3136807..abaf57c14f97 100644 --- a/datafusion/core/src/row/reader.rs +++ b/datafusion/core/src/row/reader.rs @@ -18,13 +18,13 @@ //! Accessing row from raw bytes use crate::error::{DataFusionError, Result}; -use crate::row::layout::get_offsets; +use crate::row::layout::{RowLayout, RowType}; use crate::row::validity::{all_valid, NullBitsFormatter}; -use crate::row::{row_supported, schema_null_free, MutableRecordBatch}; +use crate::row::MutableRecordBatch; use arrow::array::*; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; -use arrow::util::bit_util::{ceil, get_bit_raw}; +use arrow::util::bit_util::get_bit_raw; use std::sync::Arc; /// Read `data` of raw-bytes rows starting at `offsets` out to a record batch @@ -32,23 +32,24 @@ pub fn read_as_batch( data: &[u8], schema: Arc, offsets: &[usize], + row_type: RowType, ) -> Result { let row_num = offsets.len(); let mut output = MutableRecordBatch::new(row_num, schema.clone()); - let mut row = RowReader::new(&schema); + let mut row = RowReader::new(&schema, row_type); for offset in offsets.iter().take(row_num) { row.point_to(*offset, data); read_row(&row, &mut output, &schema); } - output.output().map_err(DataFusionError::ArrowError) + Ok(output.output()?) } macro_rules! get_idx { ($NATIVE: ident, $SELF: ident, $IDX: ident, $WIDTH: literal) => {{ $SELF.assert_index_valid($IDX); - let offset = $SELF.field_offsets[$IDX]; + let offset = $SELF.field_offsets()[$IDX]; let start = $SELF.base_offset + offset; let end = start + $WIDTH; $NATIVE::from_le_bytes($SELF.data[start..end].try_into().unwrap()) @@ -60,7 +61,7 @@ macro_rules! fn_get_idx { paste::item! { fn [](&self, idx: usize) -> $NATIVE { self.assert_index_valid(idx); - let offset = self.field_offsets[idx]; + let offset = self.field_offsets()[idx]; let start = self.base_offset + offset; let end = start + $WIDTH; $NATIVE::from_le_bytes(self.data[start..end].try_into().unwrap()) @@ -85,32 +86,24 @@ macro_rules! fn_get_idx_opt { /// Read the tuple `data[base_offset..]` we are currently pointing to pub struct RowReader<'a> { + /// Layout on how to read each field + layout: RowLayout, /// Raw bytes slice where the tuple stores data: &'a [u8], /// Start position for the current tuple in the raw bytes slice. base_offset: usize, - /// Total number of fields for each tuple. - field_count: usize, - /// The number of bytes used to store null bits for each field. - null_width: usize, - /// Starting offset for each fields in the raw bytes. - /// For fixed length fields, it's where the actual data stores. - /// For variable length fields, it's a pack of (offset << 32 | length) if we use u64. - field_offsets: Vec, - /// If a row is null free according to its schema - null_free: bool, } impl<'a> std::fmt::Debug for RowReader<'a> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - if self.null_free { + if self.null_free() { write!(f, "null_free") } else { let null_bits = self.null_bits(); write!( f, "{:?}", - NullBitsFormatter::new(null_bits, self.field_count) + NullBitsFormatter::new(null_bits, self.layout.field_count) ) } } @@ -118,19 +111,11 @@ impl<'a> std::fmt::Debug for RowReader<'a> { impl<'a> RowReader<'a> { /// new - pub fn new(schema: &Arc) -> Self { - assert!(row_supported(schema)); - let null_free = schema_null_free(schema); - let field_count = schema.fields().len(); - let null_width = if null_free { 0 } else { ceil(field_count, 8) }; - let (field_offsets, _) = get_offsets(null_width, schema); + pub fn new(schema: &Schema, row_type: RowType) -> Self { Self { + layout: RowLayout::new(schema, row_type), data: &[], base_offset: 0, - field_count, - null_width, - field_offsets, - null_free, } } @@ -142,26 +127,36 @@ impl<'a> RowReader<'a> { #[inline] fn assert_index_valid(&self, idx: usize) { - assert!(idx < self.field_count); + assert!(idx < self.layout.field_count); + } + + #[inline(always)] + fn field_offsets(&self) -> &[usize] { + &self.layout.field_offsets + } + + #[inline(always)] + fn null_free(&self) -> bool { + self.layout.null_free } #[inline(always)] fn null_bits(&self) -> &[u8] { - if self.null_free { + if self.null_free() { &[] } else { let start = self.base_offset; - &self.data[start..start + self.null_width] + &self.data[start..start + self.layout.null_width] } } #[inline(always)] fn all_valid(&self) -> bool { - if self.null_free { + if self.null_free() { true } else { let null_bits = self.null_bits(); - all_valid(null_bits, self.field_count) + all_valid(null_bits, self.layout.field_count) } } @@ -171,14 +166,14 @@ impl<'a> RowReader<'a> { fn get_bool(&self, idx: usize) -> bool { self.assert_index_valid(idx); - let offset = self.field_offsets[idx]; + let offset = self.field_offsets()[idx]; let value = &self.data[self.base_offset + offset..]; value[0] != 0 } fn get_u8(&self, idx: usize) -> u8 { self.assert_index_valid(idx); - let offset = self.field_offsets[idx]; + let offset = self.field_offsets()[idx]; self.data[self.base_offset + offset] } @@ -257,8 +252,8 @@ impl<'a> RowReader<'a> { } /// Read the row currently pointed by RowWriter to the output columnar batch buffer -pub fn read_row(row: &RowReader, batch: &mut MutableRecordBatch, schema: &Arc) { - if row.null_free || row.all_valid() { +pub fn read_row(row: &RowReader, batch: &mut MutableRecordBatch, schema: &Schema) { + if row.all_valid() { for ((col_idx, to), field) in batch .arrays .iter_mut() diff --git a/datafusion/core/src/row/writer.rs b/datafusion/core/src/row/writer.rs index 9cb208d03a21..920eb9963404 100644 --- a/datafusion/core/src/row/writer.rs +++ b/datafusion/core/src/row/writer.rs @@ -18,12 +18,11 @@ //! Reusable row writer backed by Vec to stitch attributes together use crate::error::Result; -use crate::row::layout::{estimate_row_width, get_offsets}; -use crate::row::{fixed_size, row_supported, schema_null_free}; +use crate::row::layout::{estimate_row_width, RowLayout, RowType}; use arrow::array::*; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; -use arrow::util::bit_util::{ceil, round_upto_power_of_2, set_bit_raw, unset_bit_raw}; +use arrow::util::bit_util::{round_upto_power_of_2, set_bit_raw, unset_bit_raw}; use std::cmp::max; use std::sync::Arc; @@ -37,8 +36,9 @@ pub fn write_batch_unchecked( batch: &RecordBatch, row_idx: usize, schema: Arc, + row_type: RowType, ) -> Vec { - let mut writer = RowWriter::new(&schema); + let mut writer = RowWriter::new(&schema, row_type); let mut current_offset = offset; let mut offsets = vec![]; let columns = batch.columns(); @@ -58,8 +58,9 @@ pub fn write_batch_unchecked( pub fn bench_write_batch( batches: &[Vec], schema: Arc, + row_type: RowType, ) -> Result> { - let mut writer = RowWriter::new(&schema); + let mut writer = RowWriter::new(&schema, row_type); let mut lengths = vec![]; for batch in batches.iter().flatten() { @@ -77,7 +78,7 @@ pub fn bench_write_batch( macro_rules! set_idx { ($WIDTH: literal, $SELF: ident, $IDX: ident, $VALUE: ident) => {{ $SELF.assert_index_valid($IDX); - let offset = $SELF.field_offsets[$IDX]; + let offset = $SELF.field_offsets()[$IDX]; $SELF.data[offset..offset + $WIDTH].copy_from_slice(&$VALUE.to_le_bytes()); }}; } @@ -87,7 +88,7 @@ macro_rules! fn_set_idx { paste::item! { fn [](&mut self, idx: usize, value: $NATIVE) { self.assert_index_valid(idx); - let offset = self.field_offsets[idx]; + let offset = self.field_offsets()[idx]; self.data[offset..offset + $WIDTH].copy_from_slice(&value.to_le_bytes()); } } @@ -96,51 +97,30 @@ macro_rules! fn_set_idx { /// Reusable row writer backed by Vec pub struct RowWriter { + /// Layout on how to write each field + layout: RowLayout, /// buffer for the current tuple been written. data: Vec, - /// Total number of fields for each tuple. - field_count: usize, /// Length in bytes for the current tuple, 8-bytes word aligned. pub(crate) row_width: usize, - /// The number of bytes used to store null bits for each field. - null_width: usize, - /// Length in bytes for `values` part of the current tuple. - values_width: usize, /// Length in bytes for `variable length data` part of the current tuple. varlena_width: usize, /// Current offset for the next variable length field to write to. varlena_offset: usize, - /// Starting offset for each fields in the raw bytes. - /// For fixed length fields, it's where the actual data stores. - /// For variable length fields, it's a pack of (offset << 32 | length) if we use u64. - field_offsets: Vec, - /// If a row is null free according to its schema - null_free: bool, } impl RowWriter { /// new - pub fn new(schema: &Arc) -> Self { - assert!(row_supported(schema)); - let null_free = schema_null_free(schema); - let field_count = schema.fields().len(); - let null_width = if null_free { 0 } else { ceil(field_count, 8) }; - let (field_offsets, values_width) = get_offsets(null_width, schema); - let mut init_capacity = estimate_row_width(schema); - if !fixed_size(schema) { - // double the capacity to avoid repeated resize - init_capacity *= 2; - } + pub fn new(schema: &Schema, row_type: RowType) -> Self { + let layout = RowLayout::new(schema, row_type); + let init_capacity = estimate_row_width(schema, &layout); + let varlena_offset = layout.fixed_part_width(); Self { + layout, data: vec![0; init_capacity], - field_count, row_width: 0, - null_width, - values_width, varlena_width: 0, - varlena_offset: null_width + values_width, - field_offsets, - null_free, + varlena_offset, } } @@ -149,20 +129,30 @@ impl RowWriter { self.data.fill(0); self.row_width = 0; self.varlena_width = 0; - self.varlena_offset = self.null_width + self.values_width; + self.varlena_offset = self.layout.fixed_part_width(); } #[inline] fn assert_index_valid(&self, idx: usize) { - assert!(idx < self.field_count); + assert!(idx < self.layout.field_count); + } + + #[inline(always)] + fn field_offsets(&self) -> &[usize] { + &self.layout.field_offsets + } + + #[inline(always)] + fn null_free(&self) -> bool { + self.layout.null_free } pub(crate) fn set_null_at(&mut self, idx: usize) { assert!( - !self.null_free, + !self.null_free(), "Unexpected call to set_null_at on null-free row writer" ); - let null_bits = &mut self.data[0..self.null_width]; + let null_bits = &mut self.data[0..self.layout.null_width]; unsafe { unset_bit_raw(null_bits.as_mut_ptr(), idx); } @@ -170,10 +160,10 @@ impl RowWriter { pub(crate) fn set_non_null_at(&mut self, idx: usize) { assert!( - !self.null_free, + !self.null_free(), "Unexpected call to set_non_null_at on null-free row writer" ); - let null_bits = &mut self.data[0..self.null_width]; + let null_bits = &mut self.data[0..self.layout.null_width]; unsafe { set_bit_raw(null_bits.as_mut_ptr(), idx); } @@ -181,13 +171,13 @@ impl RowWriter { fn set_bool(&mut self, idx: usize, value: bool) { self.assert_index_valid(idx); - let offset = self.field_offsets[idx]; + let offset = self.field_offsets()[idx]; self.data[offset] = if value { 1 } else { 0 }; } fn set_u8(&mut self, idx: usize, value: u8) { self.assert_index_valid(idx); - let offset = self.field_offsets[idx]; + let offset = self.field_offsets()[idx]; self.data[offset] = value; } @@ -202,7 +192,7 @@ impl RowWriter { fn set_i8(&mut self, idx: usize, value: i8) { self.assert_index_valid(idx); - let offset = self.field_offsets[idx]; + let offset = self.field_offsets()[idx]; self.data[offset] = value.to_le_bytes()[0]; } @@ -241,7 +231,7 @@ impl RowWriter { } fn current_width(&self) -> usize { - self.null_width + self.values_width + self.varlena_width + self.layout.fixed_part_width() + self.varlena_width } /// End each row at 8-byte word boundary. @@ -263,11 +253,11 @@ impl RowWriter { pub fn write_row( row: &mut RowWriter, row_idx: usize, - schema: &Arc, + schema: &Schema, columns: &[ArrayRef], ) -> usize { // Get the row from the batch denoted by row_idx - if row.null_free { + if row.null_free() { for ((i, f), col) in schema.fields().iter().enumerate().zip(columns.iter()) { write_field(i, row_idx, col, f.data_type(), row); } diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index 4bdf6d666d71..6927178281a9 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -216,7 +216,7 @@ pub fn assert_is_pending<'a, T>(fut: &mut Pin + Send } /// Create vector batches -pub fn create_vec_batches(schema: &Arc, n: usize) -> Vec { +pub fn create_vec_batches(schema: &Schema, n: usize) -> Vec { let batch = create_batch(schema); let mut vec = Vec::with_capacity(n); for _ in 0..n { @@ -226,9 +226,9 @@ pub fn create_vec_batches(schema: &Arc, n: usize) -> Vec { } /// Create batch -fn create_batch(schema: &Arc) -> RecordBatch { +fn create_batch(schema: &Schema) -> RecordBatch { RecordBatch::try_new( - schema.clone(), + Arc::new(schema.clone()), vec![Arc::new(UInt32Array::from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]))], ) .unwrap() diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index ef6ea52dd774..a124311aa4ff 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -16,6 +16,7 @@ // under the License. use super::*; +use datafusion::physical_plan::display::DisplayableExecutionPlan; #[tokio::test] async fn explain_analyze_baseline_metrics() { @@ -136,8 +137,16 @@ async fn explain_analyze_baseline_metrics() { } let metrics = plan.metrics().unwrap().aggregate_by_partition(); - assert!(metrics.output_rows().unwrap() > 0); - assert!(metrics.elapsed_compute().unwrap() > 0); + assert!( + metrics.output_rows().unwrap() > 0, + "plan: {}", + DisplayableExecutionPlan::with_metrics(plan).one_line() + ); + assert!( + metrics.elapsed_compute().unwrap() > 0, + "plan: {}", + DisplayableExecutionPlan::with_metrics(plan).one_line() + ); let mut saw_start = false; let mut saw_end = false;