Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce RowLayout to represent rows for different purposes #2261

Merged
merged 7 commits into from
Apr 20, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ path = "src/lib.rs"
# Used to enable the avro format
avro = ["avro-rs", "num-traits", "datafusion-common/avro"]
crypto_expressions = ["datafusion-physical-expr/crypto_expressions"]
default = ["crypto_expressions", "regex_expressions", "unicode_expressions"]
default = ["crypto_expressions", "regex_expressions", "unicode_expressions", "row", "jit"]
# Used for testing ONLY: causes all values to hash to the same value (test for collisions)
force_hash_collisions = []
# Used to enable JIT code generation
Expand Down
32 changes: 26 additions & 6 deletions datafusion/core/benches/jit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ extern crate datafusion;
mod data_utils;
use crate::criterion::Criterion;
use crate::data_utils::{create_record_batches, create_schema};
use datafusion::row::jit::writer::bench_write_compact_batch_jit;
use datafusion::row::writer::bench_write_compact_batch;
use datafusion::row::jit::writer::bench_write_batch_jit;
use datafusion::row::writer::bench_write_batch;
use datafusion::row::RowType;
use std::sync::Arc;

fn criterion_benchmark(c: &mut Criterion) {
Expand All @@ -36,18 +37,37 @@ fn criterion_benchmark(c: &mut Criterion) {
let batches =
create_record_batches(schema.clone(), array_len, partitions_len, batch_size);

c.bench_function("row serializer", |b| {
c.bench_function("compact row serializer", |b| {
b.iter(|| {
criterion::black_box(
bench_write_compact_batch(&batches, schema.clone()).unwrap(),
bench_write_batch(&batches, schema.clone(), RowType::Compact).unwrap(),
)
})
});

c.bench_function("row serializer jit", |b| {
c.bench_function("word aligned row serializer", |b| {
b.iter(|| {
criterion::black_box(
bench_write_compact_batch_jit(&batches, schema.clone()).unwrap(),
bench_write_batch(&batches, schema.clone(), RowType::WordAligned)
.unwrap(),
)
})
});

c.bench_function("compact row serializer jit", |b| {
b.iter(|| {
criterion::black_box(
bench_write_batch_jit(&batches, schema.clone(), RowType::Compact)
.unwrap(),
)
})
});

c.bench_function("word aligned row serializer jit", |b| {
b.iter(|| {
criterion::black_box(
bench_write_batch_jit(&batches, schema.clone(), RowType::WordAligned)
.unwrap(),
)
})
});
Expand Down
168 changes: 137 additions & 31 deletions datafusion/core/src/row/jit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,44 +44,45 @@ fn fn_name<T>(f: T) -> &'static str {
#[cfg(test)]
mod tests {
use crate::error::Result;
use crate::row::jit::reader::read_compact_rows_as_batch_jit;
use crate::row::jit::writer::write_compact_batch_unchecked_jit;
use crate::row::jit::reader::read_as_batch_jit;
use crate::row::jit::writer::write_batch_unchecked_jit;
use crate::row::layout::RowType::{Compact, WordAligned};
use arrow::record_batch::RecordBatch;
use arrow::{array::*, datatypes::*};
use datafusion_jit::api::Assembler;
use std::sync::Arc;
use DataType::*;

macro_rules! fn_test_single_type {
($ARRAY: ident, $TYPE: expr, $VEC: expr) => {
($ARRAY: ident, $TYPE: expr, $VEC: expr, $ROWTYPE: expr) => {
paste::item! {
#[test]
#[allow(non_snake_case)]
fn [<test_single_ $TYPE _jit>]() -> Result<()> {
fn [<test_ $ROWTYPE _single_ $TYPE _jit>]() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, true)]));
let a = $ARRAY::from($VEC);
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
let mut vector = vec![0; 1024];
let assembler = Assembler::default();
let row_offsets =
{ write_compact_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? };
let output_batch = { read_compact_rows_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
{ write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler, $ROWTYPE)? };
let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, $ROWTYPE)? };
assert_eq!(batch, output_batch);
Ok(())
}

#[test]
#[allow(non_snake_case)]
fn [<test_single_ $TYPE _jit_null_free>]() -> Result<()> {
fn [<test_ $ROWTYPE _single_ $TYPE _jit_null_free>]() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)]));
let v = $VEC.into_iter().filter(|o| o.is_some()).collect::<Vec<_>>();
let a = $ARRAY::from(v);
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
let mut vector = vec![0; 1024];
let assembler = Assembler::default();
let row_offsets =
{ write_compact_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? };
let output_batch = { read_compact_rows_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
{ write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler, $ROWTYPE)? };
let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, $ROWTYPE)? };
assert_eq!(batch, output_batch);
Ok(())
}
Expand All @@ -92,85 +93,190 @@ mod tests {
fn_test_single_type!(
BooleanArray,
Boolean,
vec![Some(true), Some(false), None, Some(true), None]
vec![Some(true), Some(false), None, Some(true), None],
Compact
);

fn_test_single_type!(
BooleanArray,
Boolean,
vec![Some(true), Some(false), None, Some(true), None],
WordAligned
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👌 very nice

);

fn_test_single_type!(
Int8Array,
Int8,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
Int8Array,
Int8,
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
Int16Array,
Int16,
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
Int16Array,
Int16,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
Int32Array,
Int32,
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
Int32Array,
Int32,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
Int64Array,
Int64,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
Int64Array,
Int64,
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
UInt8Array,
UInt8,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
UInt8Array,
UInt8,
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
UInt16Array,
UInt16,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
UInt16Array,
UInt16,
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
UInt32Array,
UInt32,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
UInt32Array,
UInt32,
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
UInt64Array,
UInt64,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
UInt64Array,
UInt64,
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
Float32Array,
Float32,
vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
Compact
);

fn_test_single_type!(
Float32Array,
Float32,
vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
WordAligned
);

fn_test_single_type!(
Float64Array,
Float64,
vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
Compact
);

fn_test_single_type!(
Float64Array,
Float64,
vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
WordAligned
);

fn_test_single_type!(
Date32Array,
Date32,
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
Date32Array,
Date32,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
Date64Array,
Date64,
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
Date64Array,
Date64,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
StringArray,
Utf8,
vec![Some("hello"), Some("world"), None, Some(""), Some("")]
vec![Some("hello"), Some("world"), None, Some(""), Some("")],
Compact
);

#[test]
Expand All @@ -183,18 +289,18 @@ mod tests {
let mut vector = vec![0; 8192];
let assembler = Assembler::default();
let row_offsets = {
write_compact_batch_unchecked_jit(
write_batch_unchecked_jit(
&mut vector,
0,
&batch,
0,
schema.clone(),
&assembler,
Compact,
)?
};
let output_batch = {
read_compact_rows_as_batch_jit(&vector, schema, &row_offsets, &assembler)?
};
let output_batch =
{ read_as_batch_jit(&vector, schema, &row_offsets, &assembler, Compact)? };
assert_eq!(batch, output_batch);
Ok(())
}
Expand All @@ -208,18 +314,18 @@ mod tests {
let mut vector = vec![0; 8192];
let assembler = Assembler::default();
let row_offsets = {
write_compact_batch_unchecked_jit(
write_batch_unchecked_jit(
&mut vector,
0,
&batch,
0,
schema.clone(),
&assembler,
Compact,
)?
};
let output_batch = {
read_compact_rows_as_batch_jit(&vector, schema, &row_offsets, &assembler)?
};
let output_batch =
{ read_as_batch_jit(&vector, schema, &row_offsets, &assembler, Compact)? };
assert_eq!(batch, output_batch);
Ok(())
}
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/row/jit/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,16 @@ use std::sync::Arc;

/// Read `data` of raw-bytes rows starting at `offsets` out to a record batch

pub fn read_compact_rows_as_batch_jit(
pub fn read_as_batch_jit(
data: &[u8],
schema: Arc<Schema>,
offsets: &[usize],
assembler: &Assembler,
row_type: RowType,
) -> Result<RecordBatch> {
let row_num = offsets.len();
let mut output = MutableRecordBatch::new(row_num, schema.clone());
let mut row = RowReader::new(&schema, RowType::Compact);
let mut row = RowReader::new(&schema, row_type);
register_read_functions(assembler)?;
let gen_func = gen_read_row(&schema, assembler)?;
let mut jit = assembler.create_jit();
Expand Down
Loading