Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce RowLayout to represent rows for different purposes #2261

Merged
merged 7 commits into from
Apr 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 30 additions & 5 deletions datafusion/core/benches/jit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ extern crate datafusion;
mod data_utils;
use crate::criterion::Criterion;
use crate::data_utils::{create_record_batches, create_schema};
use datafusion::row::writer::{bench_write_batch, bench_write_batch_jit};
use datafusion::row::jit::writer::bench_write_batch_jit;
use datafusion::row::writer::bench_write_batch;
use datafusion::row::RowType;
use std::sync::Arc;

fn criterion_benchmark(c: &mut Criterion) {
Expand All @@ -35,15 +37,38 @@ fn criterion_benchmark(c: &mut Criterion) {
let batches =
create_record_batches(schema.clone(), array_len, partitions_len, batch_size);

c.bench_function("row serializer", |b| {
c.bench_function("compact row serializer", |b| {
b.iter(|| {
criterion::black_box(bench_write_batch(&batches, schema.clone()).unwrap())
criterion::black_box(
bench_write_batch(&batches, schema.clone(), RowType::Compact).unwrap(),
)
})
});

c.bench_function("row serializer jit", |b| {
c.bench_function("word aligned row serializer", |b| {
b.iter(|| {
criterion::black_box(bench_write_batch_jit(&batches, schema.clone()).unwrap())
criterion::black_box(
bench_write_batch(&batches, schema.clone(), RowType::WordAligned)
.unwrap(),
)
})
});

c.bench_function("compact row serializer jit", |b| {
b.iter(|| {
criterion::black_box(
bench_write_batch_jit(&batches, schema.clone(), RowType::Compact)
.unwrap(),
)
})
});

c.bench_function("word aligned row serializer jit", |b| {
b.iter(|| {
criterion::black_box(
bench_write_batch_jit(&batches, schema.clone(), RowType::WordAligned)
.unwrap(),
)
})
});
}
Expand Down
158 changes: 133 additions & 25 deletions datafusion/core/src/row/jit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

//! Just-In-Time (JIT) versions of the row reader and writer

mod reader;
mod writer;
pub mod reader;
pub mod writer;

#[macro_export]
/// register external functions to the assembler
Expand Down Expand Up @@ -46,42 +46,43 @@ mod tests {
use crate::error::Result;
use crate::row::jit::reader::read_as_batch_jit;
use crate::row::jit::writer::write_batch_unchecked_jit;
use crate::row::layout::RowType::{Compact, WordAligned};
use arrow::record_batch::RecordBatch;
use arrow::{array::*, datatypes::*};
use datafusion_jit::api::Assembler;
use std::sync::Arc;
use DataType::*;

macro_rules! fn_test_single_type {
($ARRAY: ident, $TYPE: expr, $VEC: expr) => {
($ARRAY: ident, $TYPE: expr, $VEC: expr, $ROWTYPE: expr) => {
paste::item! {
#[test]
#[allow(non_snake_case)]
fn [<test_single_ $TYPE _jit>]() -> Result<()> {
fn [<test_ $ROWTYPE _single_ $TYPE _jit>]() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, true)]));
let a = $ARRAY::from($VEC);
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
let mut vector = vec![0; 1024];
let assembler = Assembler::default();
let row_offsets =
{ write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? };
let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
{ write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler, $ROWTYPE)? };
let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, $ROWTYPE)? };
assert_eq!(batch, output_batch);
Ok(())
}

#[test]
#[allow(non_snake_case)]
fn [<test_single_ $TYPE _jit_null_free>]() -> Result<()> {
fn [<test_ $ROWTYPE _single_ $TYPE _jit_null_free>]() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)]));
let v = $VEC.into_iter().filter(|o| o.is_some()).collect::<Vec<_>>();
let a = $ARRAY::from(v);
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?;
let mut vector = vec![0; 1024];
let assembler = Assembler::default();
let row_offsets =
{ write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler)? };
let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
{ write_batch_unchecked_jit(&mut vector, 0, &batch, 0, schema.clone(), &assembler, $ROWTYPE)? };
let output_batch = { read_as_batch_jit(&vector, schema, &row_offsets, &assembler, $ROWTYPE)? };
assert_eq!(batch, output_batch);
Ok(())
}
Expand All @@ -92,85 +93,190 @@ mod tests {
fn_test_single_type!(
BooleanArray,
Boolean,
vec![Some(true), Some(false), None, Some(true), None]
vec![Some(true), Some(false), None, Some(true), None],
Compact
);

fn_test_single_type!(
BooleanArray,
Boolean,
vec![Some(true), Some(false), None, Some(true), None],
WordAligned
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👌 very nice

);

fn_test_single_type!(
Int8Array,
Int8,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
Int8Array,
Int8,
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
Int16Array,
Int16,
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
Int16Array,
Int16,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
Int32Array,
Int32,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
Int32Array,
Int32,
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
Int64Array,
Int64,
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
Int64Array,
Int64,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
UInt8Array,
UInt8,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
UInt8Array,
UInt8,
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
UInt16Array,
UInt16,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
UInt16Array,
UInt16,
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
UInt32Array,
UInt32,
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
UInt32Array,
UInt32,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
UInt64Array,
UInt64,
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
UInt64Array,
UInt64,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
Float32Array,
Float32,
vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
Compact
);

fn_test_single_type!(
Float32Array,
Float32,
vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
WordAligned
);

fn_test_single_type!(
Float64Array,
Float64,
vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
Compact
);

fn_test_single_type!(
Float64Array,
Float64,
vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)]
vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)],
WordAligned
);

fn_test_single_type!(
Date32Array,
Date32,
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
Date32Array,
Date32,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
Date64Array,
Date64,
vec![Some(5), Some(7), None, Some(0), Some(111)],
Compact
);

fn_test_single_type!(
Date64Array,
Date64,
vec![Some(5), Some(7), None, Some(0), Some(111)]
vec![Some(5), Some(7), None, Some(0), Some(111)],
WordAligned
);

fn_test_single_type!(
StringArray,
Utf8,
vec![Some("hello"), Some("world"), None, Some(""), Some("")]
vec![Some("hello"), Some("world"), None, Some(""), Some("")],
Compact
);

#[test]
Expand All @@ -190,10 +296,11 @@ mod tests {
0,
schema.clone(),
&assembler,
Compact,
)?
};
let output_batch =
{ read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
{ read_as_batch_jit(&vector, schema, &row_offsets, &assembler, Compact)? };
assert_eq!(batch, output_batch);
Ok(())
}
Expand All @@ -214,10 +321,11 @@ mod tests {
0,
schema.clone(),
&assembler,
Compact,
)?
};
let output_batch =
{ read_as_batch_jit(&vector, schema, &row_offsets, &assembler)? };
{ read_as_batch_jit(&vector, schema, &row_offsets, &assembler, Compact)? };
assert_eq!(batch, output_batch);
Ok(())
}
Expand Down
9 changes: 4 additions & 5 deletions datafusion/core/src/row/jit/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use crate::error::{DataFusionError, Result};
use crate::reg_fn;
use crate::row::jit::fn_name;
use crate::row::layout::RowType;
use crate::row::reader::RowReader;
use crate::row::reader::*;
use crate::row::MutableRecordBatch;
Expand All @@ -38,10 +39,11 @@ pub fn read_as_batch_jit(
schema: Arc<Schema>,
offsets: &[usize],
assembler: &Assembler,
row_type: RowType,
) -> Result<RecordBatch> {
let row_num = offsets.len();
let mut output = MutableRecordBatch::new(row_num, schema.clone());
let mut row = RowReader::new(&schema);
let mut row = RowReader::new(&schema, row_type);
register_read_functions(assembler)?;
let gen_func = gen_read_row(&schema, assembler)?;
let mut jit = assembler.create_jit();
Expand Down Expand Up @@ -102,10 +104,7 @@ fn register_read_functions(asm: &Assembler) -> Result<()> {
Ok(())
}

fn gen_read_row(
schema: &Arc<Schema>,
assembler: &Assembler,
) -> Result<GeneratedFunction> {
fn gen_read_row(schema: &Schema, assembler: &Assembler) -> Result<GeneratedFunction> {
use DataType::*;
let mut builder = assembler
.new_func_builder("read_row")
Expand Down
Loading