diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index f0b674ef956a..dc51fd5c3b7c 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -48,6 +48,8 @@ pyarrow = ["pyo3", "arrow/pyarrow", "datafusion-common/pyarrow"]
 force_hash_collisions = []
 # Used to enable the avro format
 avro = ["avro-rs", "num-traits", "datafusion-common/avro"]
+# Used to enable row format experiment
+row = []
 
 [dependencies]
 datafusion-common = { path = "../datafusion-common", version = "7.0.0" }
diff --git a/datafusion/src/lib.rs b/datafusion/src/lib.rs
index f0e29bb434cc..61afa09791fd 100644
--- a/datafusion/src/lib.rs
+++ b/datafusion/src/lib.rs
@@ -226,6 +226,8 @@ pub use arrow;
 pub use parquet;
 
 pub(crate) mod field_util;
+#[cfg(feature = "row")]
+pub(crate) mod row;
 
 pub mod from_slice;
diff --git a/datafusion/src/row/mod.rs b/datafusion/src/row/mod.rs
new file mode 100644
index 000000000000..9875b84975e2
--- /dev/null
+++ b/datafusion/src/row/mod.rs
@@ -0,0 +1,486 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An implementation of Row backed by raw bytes
+//!
+//! Each tuple consists of up to three parts: [null bit set] [values] [var length data]
+//!
+//! The null bit set is used for null tracking and is aligned to 1-byte. It stores
+//! one bit per field.
+//!
+//! In the region of the values, we store the fields in the order they are defined in the schema.
+//! - For fixed-length, sequential access fields, we store them directly.
+//!   E.g., 4 bytes for int and 1 byte for bool.
+//! - For fixed-length, update often fields, we store one 8-byte word per field.
+//! - For fields of non-primitive or variable-length types,
+//!   we append their actual content to the end of the var length region and
+//!   store their offset relative to row base and their length, packed into an 8-byte word.
+//!
+//! ┌────────────────┬──────────────────────────┬───────────────────────┐        ┌───────────────────────┬────────────┐
+//! │Validity Bitmask│    Fixed Width Field     │ Variable Width Field  │  ...   │     vardata area      │  padding   │
+//! │ (byte aligned) │   (native type width)    │(vardata offset + len) │        │   (variable length)   │   bytes    │
+//! └────────────────┴──────────────────────────┴───────────────────────┘        └───────────────────────┴────────────┘
+//!
+//! For example, given the schema (Int8, Utf8, Float32, Utf8)
+//!
+//! Encoding the tuple (1, "FooBar", NULL, "baz")
+//!
+//! Requires 32 bytes (31 bytes payload and 1 byte padding to make each tuple 8-bytes aligned):
+//!
+//! ┌──────────┬──────────┬──────────────────────┬──────────────┬──────────────────────┬───────────────────────┬──────────┐
+//! │0b00001011│   0x01   │0x00000016  0x00000006│  0x00000000  │0x0000001C  0x00000003│       FooBarbaz       │   0x00   │
+//! 
└──────────┴──────────┴──────────────────────┴──────────────┴──────────────────────┴───────────────────────┴──────────┘ +//! 0 1 2 10 14 22 31 32 +//! + +use arrow::datatypes::{DataType, Schema}; +use arrow::util::bit_util::{get_bit_raw, round_upto_power_of_2}; +use std::fmt::Write; +use std::sync::Arc; + +mod reader; +mod writer; + +const ALL_VALID_MASK: [u8; 8] = [1, 3, 7, 15, 31, 63, 127, 255]; + +const UTF8_DEFAULT_SIZE: usize = 20; +const BINARY_DEFAULT_SIZE: usize = 100; + +/// Returns if all fields are valid +pub fn all_valid(data: &[u8], n: usize) -> bool { + for item in data.iter().take(n / 8) { + if *item != ALL_VALID_MASK[7] { + return false; + } + } + if n % 8 == 0 { + true + } else { + data[n / 8] == ALL_VALID_MASK[n % 8 - 1] + } +} + +/// Show null bit for each field in a tuple, 1 for valid and 0 for null. +/// For a tuple with nine total fields, valid at field 0, 6, 7, 8 shows as `[10000011, 1]`. +pub struct NullBitsFormatter<'a> { + null_bits: &'a [u8], + field_count: usize, +} + +impl<'a> NullBitsFormatter<'a> { + /// new + pub fn new(null_bits: &'a [u8], field_count: usize) -> Self { + Self { + null_bits, + field_count, + } + } +} + +impl<'a> std::fmt::Debug for NullBitsFormatter<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut is_first = true; + let data = self.null_bits; + for i in 0..self.field_count { + if is_first { + f.write_char('[')?; + is_first = false; + } else if i % 8 == 0 { + f.write_str(", ")?; + } + if unsafe { get_bit_raw(data.as_ptr(), i) } { + f.write_char('1')?; + } else { + f.write_char('0')?; + } + } + f.write_char(']')?; + Ok(()) + } +} + +/// Get relative offsets for each field and total width for values +fn get_offsets(null_width: usize, schema: &Arc) -> (Vec, usize) { + let mut offsets = vec![]; + let mut offset = null_width; + for f in schema.fields() { + offsets.push(offset); + offset += type_width(f.data_type()); + } + (offsets, offset - null_width) +} + +fn supported_type(dt: &DataType) -> bool { + use DataType::*; + matches!( + dt, + Boolean + | UInt8 + | UInt16 + | UInt32 + | UInt64 + | Int8 + | Int16 + | Int32 + | Int64 + | Float32 + | Float64 + | Date32 + | Date64 + | Utf8 + | Binary + ) +} + +fn var_length(dt: &DataType) -> bool { + use DataType::*; + matches!(dt, Utf8 | Binary) +} + +fn type_width(dt: &DataType) -> usize { + use DataType::*; + if var_length(dt) { + return std::mem::size_of::(); + } + match dt { + Boolean | UInt8 | Int8 => 1, + UInt16 | Int16 => 2, + UInt32 | Int32 | Float32 | Date32 => 4, + UInt64 | Int64 | Float64 | Date64 => 8, + _ => unreachable!(), + } +} + +fn estimate_row_width(null_width: usize, schema: &Arc) -> usize { + let mut width = null_width; + for f in schema.fields() { + width += type_width(f.data_type()); + match f.data_type() { + DataType::Utf8 => width += UTF8_DEFAULT_SIZE, + DataType::Binary => width += BINARY_DEFAULT_SIZE, + _ => {} + } + } + round_upto_power_of_2(width, 8) +} + +fn fixed_size(schema: &Arc) -> bool { + schema.fields().iter().all(|f| !var_length(f.data_type())) +} + +fn supported(schema: &Arc) -> bool { + schema + .fields() + .iter() + .all(|f| supported_type(f.data_type())) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::datasource::file_format::parquet::ParquetFormat; + use crate::datasource::file_format::FileFormat; + use crate::datasource::object_store::local::{ + local_object_reader, local_object_reader_stream, local_unpartitioned_file, + LocalFileSystem, + }; + use crate::error::Result; + use 
crate::execution::runtime_env::RuntimeEnv; + use crate::physical_plan::file_format::FileScanConfig; + use crate::physical_plan::{collect, ExecutionPlan}; + use crate::row::reader::read_as_batch; + use crate::row::writer::write_batch_unchecked; + use arrow::record_batch::RecordBatch; + use arrow::util::bit_util::{ceil, set_bit_raw, unset_bit_raw}; + use arrow::{array::*, datatypes::*}; + use rand::Rng; + use DataType::*; + + fn test_validity(bs: &[bool]) { + let n = bs.len(); + let mut data = vec![0; ceil(n, 8)]; + for (i, b) in bs.iter().enumerate() { + if *b { + let data_argument = &mut data; + unsafe { + set_bit_raw(data_argument.as_mut_ptr(), i); + }; + } else { + let data_argument = &mut data; + unsafe { + unset_bit_raw(data_argument.as_mut_ptr(), i); + }; + } + } + let expected = bs.iter().all(|f| *f); + assert_eq!(all_valid(&data, bs.len()), expected); + } + + #[test] + fn test_all_valid() { + let sizes = [4, 8, 12, 16, 19, 23, 32, 44]; + for i in sizes { + { + // contains false + let input = { + let mut rng = rand::thread_rng(); + let mut input: Vec = vec![false; i]; + rng.fill(&mut input[..]); + input + }; + test_validity(&input); + } + + { + // all true + let input = vec![true; i]; + test_validity(&input); + } + } + } + + #[test] + fn test_formatter() -> std::fmt::Result { + assert_eq!( + format!("{:?}", NullBitsFormatter::new(&[0b11000001], 8)), + "[10000011]" + ); + assert_eq!( + format!("{:?}", NullBitsFormatter::new(&[0b11000001, 1], 9)), + "[10000011, 1]" + ); + assert_eq!(format!("{:?}", NullBitsFormatter::new(&[1], 2)), "[10]"); + assert_eq!(format!("{:?}", NullBitsFormatter::new(&[1], 3)), "[100]"); + assert_eq!(format!("{:?}", NullBitsFormatter::new(&[1], 4)), "[1000]"); + assert_eq!(format!("{:?}", NullBitsFormatter::new(&[1], 5)), "[10000]"); + assert_eq!(format!("{:?}", NullBitsFormatter::new(&[1], 6)), "[100000]"); + assert_eq!( + format!("{:?}", NullBitsFormatter::new(&[1], 7)), + "[1000000]" + ); + assert_eq!( + format!("{:?}", NullBitsFormatter::new(&[1], 8)), + "[10000000]" + ); + // extra bytes are ignored + assert_eq!( + format!("{:?}", NullBitsFormatter::new(&[0b11000001, 1, 1, 1], 9)), + "[10000011, 1]" + ); + assert_eq!( + format!("{:?}", NullBitsFormatter::new(&[0b11000001, 1, 1], 16)), + "[10000011, 10000000]" + ); + Ok(()) + } + + macro_rules! fn_test_single_type { + ($ARRAY: ident, $TYPE: expr, $VEC: expr) => { + paste::item! { + #[test] + #[allow(non_snake_case)] + fn []() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)])); + let a = $ARRAY::from($VEC); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; + let mut vector = vec![0; 1024]; + let row_offsets = + { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) }; + let output_batch = { read_as_batch(&mut vector, schema, row_offsets)? 
}; + assert_eq!(batch, output_batch); + Ok(()) + } + } + }; + } + + fn_test_single_type!( + BooleanArray, + Boolean, + vec![Some(true), Some(false), None, Some(true), None] + ); + + fn_test_single_type!( + Int8Array, + Int8, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + Int16Array, + Int16, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + Int32Array, + Int32, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + Int64Array, + Int64, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + UInt8Array, + UInt8, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + UInt16Array, + UInt16, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + UInt32Array, + UInt32, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + UInt64Array, + UInt64, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + Float32Array, + Float32, + vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)] + ); + + fn_test_single_type!( + Float64Array, + Float64, + vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)] + ); + + fn_test_single_type!( + Date32Array, + Date32, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + Date64Array, + Date64, + vec![Some(5), Some(7), None, Some(0), Some(111)] + ); + + fn_test_single_type!( + StringArray, + Utf8, + vec![Some("hello"), Some("world"), None, Some(""), Some("")] + ); + + #[test] + fn test_single_binary() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", Binary, false)])); + let values: Vec> = + vec![Some(b"one"), Some(b"two"), None, Some(b""), Some(b"three")]; + let a = BinaryArray::from_opt_vec(values); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; + let mut vector = vec![0; 8192]; + let row_offsets = + { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) }; + let output_batch = { read_as_batch(&mut vector, schema, row_offsets)? }; + assert_eq!(batch, output_batch); + Ok(()) + } + + #[tokio::test] + async fn test_with_parquet() -> Result<()> { + let runtime = Arc::new(RuntimeEnv::default()); + let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + let exec = get_exec("alltypes_plain.parquet", &projection, None).await?; + let schema = exec.schema().clone(); + + let batches = collect(exec, runtime).await?; + assert_eq!(1, batches.len()); + let batch = &batches[0]; + + let mut vector = vec![0; 20480]; + let row_offsets = + { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone()) }; + let output_batch = { read_as_batch(&mut vector, schema, row_offsets)? 
}; + assert_eq!(*batch, output_batch); + + Ok(()) + } + + #[test] + #[should_panic(expected = "supported(schema)")] + fn test_unsupported_type_write() { + let a: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8])); + let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap(); + let schema = batch.schema(); + let mut vector = vec![0; 1024]; + write_batch_unchecked(&mut vector, 0, &batch, 0, schema); + } + + #[test] + #[should_panic(expected = "supported(schema)")] + fn test_unsupported_type_read() { + let schema = Arc::new(Schema::new(vec![Field::new( + "a", + DataType::Decimal(5, 2), + false, + )])); + let mut vector = vec![0; 1024]; + let row_offsets = vec![0]; + read_as_batch(&mut vector, schema, row_offsets).unwrap(); + } + + async fn get_exec( + file_name: &str, + projection: &Option>, + limit: Option, + ) -> Result> { + let testdata = crate::test_util::parquet_test_data(); + let filename = format!("{}/{}", testdata, file_name); + let format = ParquetFormat::default(); + let file_schema = format + .infer_schema(local_object_reader_stream(vec![filename.clone()])) + .await + .expect("Schema inference"); + let statistics = format + .infer_stats(local_object_reader(filename.clone())) + .await + .expect("Stats inference"); + let file_groups = vec![vec![local_unpartitioned_file(filename.clone())]]; + let exec = format + .create_physical_plan( + FileScanConfig { + object_store: Arc::new(LocalFileSystem {}), + file_schema, + file_groups, + statistics, + projection: projection.clone(), + limit, + table_partition_cols: vec![], + }, + &[], + ) + .await?; + Ok(exec) + } +} diff --git a/datafusion/src/row/reader.rs b/datafusion/src/row/reader.rs new file mode 100644 index 000000000000..779c09990ffc --- /dev/null +++ b/datafusion/src/row/reader.rs @@ -0,0 +1,408 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Accessing row from raw bytes + +use crate::error::{DataFusionError, Result}; +use crate::row::{all_valid, get_offsets, supported, NullBitsFormatter}; +use arrow::array::*; +use arrow::datatypes::{DataType, Schema}; +use arrow::error::Result as ArrowResult; +use arrow::record_batch::RecordBatch; +use arrow::util::bit_util::{ceil, get_bit_raw}; +use std::sync::Arc; + +/// Read `data` of raw-bytes rows starting at `offsets` out to a record batch +pub fn read_as_batch( + data: &mut [u8], + schema: Arc, + offsets: Vec, +) -> Result { + let row_num = offsets.len(); + let mut output = MutableRecordBatch::new(row_num, schema.clone()); + let mut row = RowReader::new(&schema, data); + + for offset in offsets.iter().take(row_num) { + row.point_to(*offset); + read_row(&row, &mut output, &schema); + } + + output.output().map_err(DataFusionError::ArrowError) +} + +macro_rules! 
get_idx { + ($NATIVE: ident, $SELF: ident, $IDX: ident, $WIDTH: literal) => {{ + $SELF.assert_index_valid($IDX); + let offset = $SELF.field_offsets[$IDX]; + let start = $SELF.base_offset + offset; + let end = start + $WIDTH; + $NATIVE::from_le_bytes($SELF.data[start..end].try_into().unwrap()) + }}; +} + +macro_rules! fn_get_idx { + ($NATIVE: ident, $WIDTH: literal) => { + paste::item! { + fn [](&self, idx: usize) -> $NATIVE { + self.assert_index_valid(idx); + let offset = self.field_offsets[idx]; + let start = self.base_offset + offset; + let end = start + $WIDTH; + $NATIVE::from_le_bytes(self.data[start..end].try_into().unwrap()) + } + } + }; +} + +macro_rules! fn_get_idx_opt { + ($NATIVE: ident) => { + paste::item! { + fn [](&self, idx: usize) -> Option<$NATIVE> { + if self.is_valid_at(idx) { + Some(self.[](idx)) + } else { + None + } + } + } + }; +} + +/// Read the tuple `data[base_offset..]` we are currently pointing to +pub struct RowReader<'a> { + /// Raw bytes slice where the tuple stores + data: &'a [u8], + /// Start position for the current tuple in the raw bytes slice. + base_offset: usize, + /// Total number of fields for each tuple. + field_count: usize, + /// The number of bytes used to store null bits for each field. + null_width: usize, + /// Starting offset for each fields in the raw bytes. + /// For fixed length fields, it's where the actual data stores. + /// For variable length fields, it's a pack of (offset << 32 | length) if we use u64. + field_offsets: Vec, +} + +impl<'a> std::fmt::Debug for RowReader<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let null_bits = self.null_bits(); + write!( + f, + "{:?}", + NullBitsFormatter::new(null_bits, self.field_count) + ) + } +} + +impl<'a> RowReader<'a> { + /// new + pub fn new(schema: &Arc, data: &'a [u8]) -> Self { + assert!(supported(schema)); + let field_count = schema.fields().len(); + let null_width = ceil(field_count, 8); + let (field_offsets, _) = get_offsets(null_width, schema); + Self { + data, + base_offset: 0, + field_count, + null_width, + field_offsets, + } + } + + /// Update this row to point to position `offset` in `base` + pub fn point_to(&mut self, offset: usize) { + self.base_offset = offset; + } + + #[inline] + fn assert_index_valid(&self, idx: usize) { + assert!(idx < self.field_count); + } + + #[inline(always)] + fn null_bits(&self) -> &[u8] { + let start = self.base_offset; + &self.data[start..start + self.null_width] + } + + #[inline(always)] + fn all_valid(&self) -> bool { + let null_bits = self.null_bits(); + all_valid(null_bits, self.field_count) + } + + fn is_valid_at(&self, idx: usize) -> bool { + unsafe { get_bit_raw(self.null_bits().as_ptr(), idx) } + } + + fn get_bool(&self, idx: usize) -> bool { + self.assert_index_valid(idx); + let offset = self.field_offsets[idx]; + let value = &self.data[self.base_offset + offset..]; + value[0] != 0 + } + + fn get_u8(&self, idx: usize) -> u8 { + self.assert_index_valid(idx); + let offset = self.field_offsets[idx]; + self.data[self.base_offset + offset] + } + + fn_get_idx!(u16, 2); + fn_get_idx!(u32, 4); + fn_get_idx!(u64, 8); + fn_get_idx!(i8, 1); + fn_get_idx!(i16, 2); + fn_get_idx!(i32, 4); + fn_get_idx!(i64, 8); + fn_get_idx!(f32, 4); + fn_get_idx!(f64, 8); + + fn get_date32(&self, idx: usize) -> i32 { + get_idx!(i32, self, idx, 4) + } + + fn get_date64(&self, idx: usize) -> i64 { + get_idx!(i64, self, idx, 8) + } + + fn get_utf8(&self, idx: usize) -> &str { + self.assert_index_valid(idx); + let offset_size = 
self.get_u64(idx); + let offset = (offset_size >> 32) as usize; + let len = (offset_size & 0xffff_ffff) as usize; + let varlena_offset = self.base_offset + offset; + let bytes = &self.data[varlena_offset..varlena_offset + len]; + std::str::from_utf8(bytes).unwrap() + } + + fn get_binary(&self, idx: usize) -> &[u8] { + self.assert_index_valid(idx); + let offset_size = self.get_u64(idx); + let offset = (offset_size >> 32) as usize; + let len = (offset_size & 0xffff_ffff) as usize; + let varlena_offset = self.base_offset + offset; + &self.data[varlena_offset..varlena_offset + len] + } + + fn_get_idx_opt!(bool); + fn_get_idx_opt!(u8); + fn_get_idx_opt!(u16); + fn_get_idx_opt!(u32); + fn_get_idx_opt!(u64); + fn_get_idx_opt!(i8); + fn_get_idx_opt!(i16); + fn_get_idx_opt!(i32); + fn_get_idx_opt!(i64); + fn_get_idx_opt!(f32); + fn_get_idx_opt!(f64); + + fn get_date32_opt(&self, idx: usize) -> Option { + if self.is_valid_at(idx) { + Some(self.get_date32(idx)) + } else { + None + } + } + + fn get_date64_opt(&self, idx: usize) -> Option { + if self.is_valid_at(idx) { + Some(self.get_date64(idx)) + } else { + None + } + } + + fn get_utf8_opt(&self, idx: usize) -> Option<&str> { + if self.is_valid_at(idx) { + Some(self.get_utf8(idx)) + } else { + None + } + } +} + +fn read_row(row: &RowReader, batch: &mut MutableRecordBatch, schema: &Arc) { + if row.all_valid() { + for ((col_idx, to), field) in batch + .arrays + .iter_mut() + .enumerate() + .zip(schema.fields().iter()) + { + read_field_null_free(to, field.data_type(), col_idx, row) + } + } else { + for ((col_idx, to), field) in batch + .arrays + .iter_mut() + .enumerate() + .zip(schema.fields().iter()) + { + read_field(to, field.data_type(), col_idx, row) + } + } +} + +macro_rules! fn_read_field { + ($NATIVE: ident, $ARRAY: ident) => { + paste::item! 
{ + fn [](to: &mut Box, col_idx: usize, row: &RowReader) { + let to = to + .as_any_mut() + .downcast_mut::<$ARRAY>() + .unwrap(); + to.append_option(row.[](col_idx)) + .map_err(DataFusionError::ArrowError) + .unwrap(); + } + + fn [](to: &mut Box, col_idx: usize, row: &RowReader) { + let to = to + .as_any_mut() + .downcast_mut::<$ARRAY>() + .unwrap(); + to.append_value(row.[](col_idx)) + .map_err(DataFusionError::ArrowError) + .unwrap(); + } + } + }; +} + +fn_read_field!(bool, BooleanBuilder); +fn_read_field!(u8, UInt8Builder); +fn_read_field!(u16, UInt16Builder); +fn_read_field!(u32, UInt32Builder); +fn_read_field!(u64, UInt64Builder); +fn_read_field!(i8, Int8Builder); +fn_read_field!(i16, Int16Builder); +fn_read_field!(i32, Int32Builder); +fn_read_field!(i64, Int64Builder); +fn_read_field!(f32, Float32Builder); +fn_read_field!(f64, Float64Builder); +fn_read_field!(date32, Date32Builder); +fn_read_field!(date64, Date64Builder); +fn_read_field!(utf8, StringBuilder); + +fn read_field_binary(to: &mut Box, col_idx: usize, row: &RowReader) { + let to = to.as_any_mut().downcast_mut::().unwrap(); + if row.is_valid_at(col_idx) { + to.append_value(row.get_binary(col_idx)).unwrap(); + } else { + to.append_null().unwrap(); + } +} + +fn read_field_binary_nf(to: &mut Box, col_idx: usize, row: &RowReader) { + let to = to.as_any_mut().downcast_mut::().unwrap(); + to.append_value(row.get_binary(col_idx)) + .map_err(DataFusionError::ArrowError) + .unwrap(); +} + +fn read_field( + to: &mut Box, + dt: &DataType, + col_idx: usize, + row: &RowReader, +) { + use DataType::*; + match dt { + Boolean => read_field_bool(to, col_idx, row), + UInt8 => read_field_u8(to, col_idx, row), + UInt16 => read_field_u16(to, col_idx, row), + UInt32 => read_field_u32(to, col_idx, row), + UInt64 => read_field_u64(to, col_idx, row), + Int8 => read_field_i8(to, col_idx, row), + Int16 => read_field_i16(to, col_idx, row), + Int32 => read_field_i32(to, col_idx, row), + Int64 => read_field_i64(to, col_idx, row), + Float32 => read_field_f32(to, col_idx, row), + Float64 => read_field_f64(to, col_idx, row), + Date32 => read_field_date32(to, col_idx, row), + Date64 => read_field_date64(to, col_idx, row), + Utf8 => read_field_utf8(to, col_idx, row), + Binary => read_field_binary(to, col_idx, row), + _ => unimplemented!(), + } +} + +fn read_field_null_free( + to: &mut Box, + dt: &DataType, + col_idx: usize, + row: &RowReader, +) { + use DataType::*; + match dt { + Boolean => read_field_bool_nf(to, col_idx, row), + UInt8 => read_field_u8_nf(to, col_idx, row), + UInt16 => read_field_u16_nf(to, col_idx, row), + UInt32 => read_field_u32_nf(to, col_idx, row), + UInt64 => read_field_u64_nf(to, col_idx, row), + Int8 => read_field_i8_nf(to, col_idx, row), + Int16 => read_field_i16_nf(to, col_idx, row), + Int32 => read_field_i32_nf(to, col_idx, row), + Int64 => read_field_i64_nf(to, col_idx, row), + Float32 => read_field_f32_nf(to, col_idx, row), + Float64 => read_field_f64_nf(to, col_idx, row), + Date32 => read_field_date32_nf(to, col_idx, row), + Date64 => read_field_date64_nf(to, col_idx, row), + Utf8 => read_field_utf8_nf(to, col_idx, row), + Binary => read_field_binary_nf(to, col_idx, row), + _ => unimplemented!(), + } +} + +struct MutableRecordBatch { + arrays: Vec>, + schema: Arc, +} + +impl MutableRecordBatch { + fn new(target_batch_size: usize, schema: Arc) -> Self { + let arrays = new_arrays(&schema, target_batch_size); + Self { arrays, schema } + } + + fn output(&mut self) -> ArrowResult { + let result = make_batch(self.schema.clone(), 
self.arrays.drain(..).collect()); + result + } +} + +fn new_arrays(schema: &Arc, batch_size: usize) -> Vec> { + schema + .fields() + .iter() + .map(|field| { + let dt = field.data_type(); + make_builder(dt, batch_size) + }) + .collect::>() +} + +fn make_batch( + schema: Arc, + mut arrays: Vec>, +) -> ArrowResult { + let columns = arrays.iter_mut().map(|array| array.finish()).collect(); + RecordBatch::try_new(schema, columns) +} diff --git a/datafusion/src/row/writer.rs b/datafusion/src/row/writer.rs new file mode 100644 index 000000000000..698f7974c10a --- /dev/null +++ b/datafusion/src/row/writer.rs @@ -0,0 +1,332 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Reusable row writer backed by Vec to stitch attributes together + +use crate::row::{estimate_row_width, fixed_size, get_offsets, supported}; +use arrow::array::Array; +use arrow::datatypes::{DataType, Schema}; +use arrow::record_batch::RecordBatch; +use arrow::util::bit_util::{ceil, round_upto_power_of_2, set_bit_raw, unset_bit_raw}; +use std::cmp::max; +use std::sync::Arc; + +/// Append batch from `row_idx` to `output` buffer start from `offset` +/// # Panics +/// +/// This function will panic if the output buffer doesn't have enough space to hold all the rows +pub fn write_batch_unchecked( + output: &mut [u8], + offset: usize, + batch: &RecordBatch, + row_idx: usize, + schema: Arc, +) -> Vec { + let mut writer = RowWriter::new(&schema); + let mut current_offset = offset; + let mut offsets = vec![]; + for cur_row in row_idx..batch.num_rows() { + offsets.push(current_offset); + let row_width = write_row(&mut writer, cur_row, batch); + output[current_offset..current_offset + row_width] + .copy_from_slice(writer.get_row()); + current_offset += row_width; + writer.reset() + } + offsets +} + +macro_rules! set_idx { + ($WIDTH: literal, $SELF: ident, $IDX: ident, $VALUE: ident) => {{ + $SELF.assert_index_valid($IDX); + let offset = $SELF.field_offsets[$IDX]; + $SELF.data[offset..offset + $WIDTH].copy_from_slice(&$VALUE.to_le_bytes()); + }}; +} + +macro_rules! fn_set_idx { + ($NATIVE: ident, $WIDTH: literal) => { + paste::item! { + fn [](&mut self, idx: usize, value: $NATIVE) { + self.assert_index_valid(idx); + let offset = self.field_offsets[idx]; + self.data[offset..offset + $WIDTH].copy_from_slice(&value.to_le_bytes()); + } + } + }; +} + +/// Reusable row writer backed by Vec +pub struct RowWriter { + /// buffer for the current tuple been written. + data: Vec, + /// Total number of fields for each tuple. + field_count: usize, + /// Length in bytes for the current tuple, 8-bytes word aligned. + row_width: usize, + /// The number of bytes used to store null bits for each field. + null_width: usize, + /// Length in bytes for `values` part of the current tuple. 
+ values_width: usize, + /// Length in bytes for `variable length data` part of the current tuple. + varlena_width: usize, + /// Current offset for the next variable length field to write to. + varlena_offset: usize, + /// Starting offset for each fields in the raw bytes. + /// For fixed length fields, it's where the actual data stores. + /// For variable length fields, it's a pack of (offset << 32 | length) if we use u64. + field_offsets: Vec, +} + +impl RowWriter { + /// new + pub fn new(schema: &Arc) -> Self { + assert!(supported(schema)); + let field_count = schema.fields().len(); + let null_width = ceil(field_count, 8); + let (field_offsets, values_width) = get_offsets(null_width, schema); + let mut init_capacity = estimate_row_width(null_width, schema); + if !fixed_size(schema) { + // double the capacity to avoid repeated resize + init_capacity *= 2; + } + Self { + data: vec![0; init_capacity], + field_count, + row_width: 0, + null_width, + values_width, + varlena_width: 0, + varlena_offset: null_width + values_width, + field_offsets, + } + } + + /// Reset the row writer state for new tuple + pub fn reset(&mut self) { + self.data.fill(0); + self.row_width = 0; + self.varlena_width = 0; + self.varlena_offset = self.null_width + self.values_width; + } + + #[inline] + fn assert_index_valid(&self, idx: usize) { + assert!(idx < self.field_count); + } + + fn set_null_at(&mut self, idx: usize) { + let null_bits = &mut self.data[0..self.null_width]; + unsafe { + unset_bit_raw(null_bits.as_mut_ptr(), idx); + } + } + + fn set_non_null_at(&mut self, idx: usize) { + let null_bits = &mut self.data[0..self.null_width]; + unsafe { + set_bit_raw(null_bits.as_mut_ptr(), idx); + } + } + + fn set_bool(&mut self, idx: usize, value: bool) { + self.assert_index_valid(idx); + let offset = self.field_offsets[idx]; + self.data[offset] = if value { 1 } else { 0 }; + } + + fn set_u8(&mut self, idx: usize, value: u8) { + self.assert_index_valid(idx); + let offset = self.field_offsets[idx]; + self.data[offset] = value; + } + + fn_set_idx!(u16, 2); + fn_set_idx!(u32, 4); + fn_set_idx!(u64, 8); + fn_set_idx!(i16, 2); + fn_set_idx!(i32, 4); + fn_set_idx!(i64, 8); + fn_set_idx!(f32, 4); + fn_set_idx!(f64, 8); + + fn set_i8(&mut self, idx: usize, value: i8) { + self.assert_index_valid(idx); + let offset = self.field_offsets[idx]; + self.data[offset] = value.to_le_bytes()[0]; + } + + fn set_date32(&mut self, idx: usize, value: i32) { + set_idx!(4, self, idx, value) + } + + fn set_date64(&mut self, idx: usize, value: i64) { + set_idx!(8, self, idx, value) + } + + fn set_offset_size(&mut self, idx: usize, size: u32) { + let offset_and_size: u64 = (self.varlena_offset as u64) << 32 | (size as u64); + self.set_u64(idx, offset_and_size); + } + + fn set_utf8(&mut self, idx: usize, value: &str) { + self.assert_index_valid(idx); + let bytes = value.as_bytes(); + let size = bytes.len(); + self.set_offset_size(idx, size as u32); + let varlena_offset = self.varlena_offset; + self.data[varlena_offset..varlena_offset + size].copy_from_slice(bytes); + self.varlena_offset += size; + self.varlena_width += size; + } + + fn set_binary(&mut self, idx: usize, value: &[u8]) { + self.assert_index_valid(idx); + let size = value.len(); + self.set_offset_size(idx, size as u32); + let varlena_offset = self.varlena_offset; + self.data[varlena_offset..varlena_offset + size].copy_from_slice(value); + self.varlena_offset += size; + self.varlena_width += size; + } + + fn current_width(&self) -> usize { + self.null_width + self.values_width + 
self.varlena_width + } + + /// End each row at 8-byte word boundary. + fn end_padding(&mut self) { + let payload_width = self.current_width(); + self.row_width = round_upto_power_of_2(payload_width, 8); + if self.data.capacity() < self.row_width { + self.data.resize(self.row_width, 0); + } + } + + fn get_row(&self) -> &[u8] { + &self.data[0..self.row_width] + } +} + +/// Stitch attributes of tuple in `batch` at `row_idx` and returns the tuple width +fn write_row(row: &mut RowWriter, row_idx: usize, batch: &RecordBatch) -> usize { + // Get the row from the batch denoted by row_idx + for ((i, f), col) in batch + .schema() + .fields() + .iter() + .enumerate() + .zip(batch.columns().iter()) + { + if !col.is_null(row_idx) { + row.set_non_null_at(i); + write_field(i, row_idx, col, f.data_type(), row); + } else { + row.set_null_at(i); + } + } + + row.end_padding(); + row.row_width +} + +fn write_field( + col_idx: usize, + row_idx: usize, + col: &Arc, + dt: &DataType, + row: &mut RowWriter, +) { + // TODO: JIT compile this + use arrow::array::*; + use DataType::*; + match dt { + Boolean => { + let c = col.as_any().downcast_ref::().unwrap(); + row.set_bool(col_idx, c.value(row_idx)); + } + UInt8 => { + let c = col.as_any().downcast_ref::().unwrap(); + row.set_u8(col_idx, c.value(row_idx)); + } + UInt16 => { + let c = col.as_any().downcast_ref::().unwrap(); + row.set_u16(col_idx, c.value(row_idx)); + } + UInt32 => { + let c = col.as_any().downcast_ref::().unwrap(); + row.set_u32(col_idx, c.value(row_idx)); + } + UInt64 => { + let c = col.as_any().downcast_ref::().unwrap(); + row.set_u64(col_idx, c.value(row_idx)); + } + Int8 => { + let c = col.as_any().downcast_ref::().unwrap(); + row.set_i8(col_idx, c.value(row_idx)); + } + Int16 => { + let c = col.as_any().downcast_ref::().unwrap(); + row.set_i16(col_idx, c.value(row_idx)); + } + Int32 => { + let c = col.as_any().downcast_ref::().unwrap(); + row.set_i32(col_idx, c.value(row_idx)); + } + Int64 => { + let c = col.as_any().downcast_ref::().unwrap(); + row.set_i64(col_idx, c.value(row_idx)); + } + Float32 => { + let c = col.as_any().downcast_ref::().unwrap(); + row.set_f32(col_idx, c.value(row_idx)); + } + Float64 => { + let c = col.as_any().downcast_ref::().unwrap(); + row.set_f64(col_idx, c.value(row_idx)); + } + Date32 => { + let c = col.as_any().downcast_ref::().unwrap(); + row.set_date32(col_idx, c.value(row_idx)); + } + Date64 => { + let c = col.as_any().downcast_ref::().unwrap(); + row.set_date64(col_idx, c.value(row_idx)); + } + Utf8 => { + let c = col.as_any().downcast_ref::().unwrap(); + let s = c.value(row_idx); + let new_width = row.current_width() + s.as_bytes().len(); + if new_width > row.data.capacity() { + // double the capacity to avoid repeated resize + row.data.resize(max(row.data.capacity() * 2, new_width), 0); + } + row.set_utf8(col_idx, s); + } + Binary => { + let c = col.as_any().downcast_ref::().unwrap(); + let binary = c.value(row_idx); + let new_width = row.current_width() + binary.len(); + if new_width > row.data.capacity() { + // double the capacity to avoid repeated resize + row.data.resize(max(row.data.capacity() * 2, new_width), 0); + } + row.set_binary(col_idx, binary); + } + _ => unimplemented!(), + } +}
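
Note for readers trying out the new API (not part of the patch): the sketch below shows the round trip this change enables, mirroring the `write_batch_unchecked` / `read_as_batch` pattern used in the tests above. Because `row` is declared `pub(crate)` and its `reader`/`writer` submodules are private, a test like this would have to live inside the `tests` module of `datafusion/src/row/mod.rs` and be built with the `row` feature enabled. The column type, values, buffer size, and the test name `roundtrip_int32_example` are illustrative assumptions, not part of the diff.

#[test]
fn roundtrip_int32_example() -> crate::error::Result<()> {
    use std::sync::Arc;
    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    // One nullable Int32 column; any type accepted by `supported_type` works the same way.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    let array = Int32Array::from(vec![Some(1), Some(2), None, Some(4)]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)])?;

    // The caller supplies a buffer large enough for every encoded row; with one null byte
    // plus a 4-byte value padded to an 8-byte boundary, each of the four rows takes 8 bytes,
    // so 1 KiB is ample here.
    let mut buffer = vec![0u8; 1024];

    // Encode every row starting at buffer offset 0, keeping the start offset of each row...
    let row_offsets =
        crate::row::writer::write_batch_unchecked(&mut buffer, 0, &batch, 0, schema.clone());

    // ...then hand those offsets back to the reader to rebuild an equivalent RecordBatch.
    let roundtripped = crate::row::reader::read_as_batch(&mut buffer, schema, row_offsets)?;
    assert_eq!(batch, roundtripped);
    Ok(())
}

The same pattern extends to variable-length columns (Utf8/Binary): the writer grows its per-row buffer as needed and records each value as an (offset, length) word in the fixed-width region, as described in the module documentation above.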