diff --git a/.github/workflows/arrow.yml b/.github/workflows/arrow.yml index 16cd9a9a0a5a..fcc4d2c371b2 100644 --- a/.github/workflows/arrow.yml +++ b/.github/workflows/arrow.yml @@ -51,9 +51,9 @@ jobs: - name: Test run: | cargo test -p arrow - - name: Test --features=force_validate,prettyprint,ffi + - name: Test --features=force_validate,prettyprint,ipc_compression,ffi run: | - cargo test -p arrow --features=force_validate,prettyprint,ffi + cargo test -p arrow --features=force_validate,prettyprint,ipc_compression,ffi - name: Run examples run: | # Test arrow examples @@ -172,4 +172,4 @@ jobs: rustup component add clippy - name: Run clippy run: | - cargo clippy -p arrow --features=prettyprint,csv,ipc,test_utils,ffi --all-targets -- -D warnings + cargo clippy -p arrow --features=prettyprint,csv,ipc,test_utils,ffi,ipc_compression --all-targets -- -D warnings diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index dbc606ad20e3..bebaadcbc69f 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -56,6 +56,7 @@ csv_crate = { version = "1.1", default-features = false, optional = true, packag regex = { version = "1.5.6", default-features = false, features = ["std", "unicode"] } regex-syntax = { version = "0.6.27", default-features = false, features = ["unicode"] } lazy_static = { version = "1.4", default-features = false } +lz4 = { version = "1.23", default-features = false, optional = true } packed_simd = { version = "0.3", default-features = false, optional = true, package = "packed_simd_2" } chrono = { version = "0.4", default-features = false, features = ["clock"] } chrono-tz = { version = "0.6", default-features = false, optional = true } @@ -66,9 +67,11 @@ pyo3 = { version = "0.16", default-features = false, optional = true } lexical-core = { version = "^0.8", default-features = false, features = ["write-integers", "write-floats", "parse-integers", "parse-floats"] } multiversion = { version = "0.6.1", default-features = false } bitflags = { version = "1.2.1", default-features = false } +zstd = { version = "0.11.1", default-features = false, optional = true } [features] default = ["csv", "ipc"] +ipc_compression = ["ipc", "zstd", "lz4"] csv = ["csv_crate"] ipc = ["flatbuffers"] simd = ["packed_simd"] diff --git a/arrow/README.md b/arrow/README.md index f7ccb9696455..5e20a42538d5 100644 --- a/arrow/README.md +++ b/arrow/README.md @@ -42,7 +42,8 @@ However, for historical reasons, this crate uses versions with major numbers gre The `arrow` crate provides the following features which may be enabled in your `Cargo.toml`: - `csv` (default) - support for reading and writing Arrow arrays to/from csv files -- `ipc` (default) - support for the [arrow-flight](https://crates.io/crates/arrow-flight) IPC and wire format +- `ipc` (default) - support for reading [Arrow IPC Format](https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc), also used as the wire protocol in [arrow-flight](https://crates.io/crates/arrow-flight) +- `ipc_compression` - Enables reading and writing compressed IPC streams (also enables `ipc`) - `prettyprint` - support for formatting record batches as textual columns - `js` - support for building arrow for WebAssembly / JavaScript - `simd` - (_Requires Nightly Rust_) Use alternate hand optimized diff --git a/arrow/src/ipc/compression/codec.rs b/arrow/src/ipc/compression/codec.rs new file mode 100644 index 000000000000..9d870fc22241 --- /dev/null +++ b/arrow/src/ipc/compression/codec.rs @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::buffer::Buffer; +use crate::error::{ArrowError, Result}; +use crate::ipc::CompressionType; +use std::io::{Read, Write}; + +const LENGTH_NO_COMPRESSED_DATA: i64 = -1; +const LENGTH_OF_PREFIX_DATA: i64 = 8; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +/// Represents compressing a ipc stream using a particular compression algorithm +pub enum CompressionCodec { + Lz4Frame, + Zstd, +} + +impl TryFrom for CompressionCodec { + type Error = ArrowError; + + fn try_from(compression_type: CompressionType) -> Result { + match compression_type { + CompressionType::ZSTD => Ok(CompressionCodec::Zstd), + CompressionType::LZ4_FRAME => Ok(CompressionCodec::Lz4Frame), + other_type => Err(ArrowError::NotYetImplemented(format!( + "compression type {:?} not supported ", + other_type + ))), + } + } +} + +impl CompressionCodec { + /// Compresses the data in `input` to `output` and appends the + /// data using the specified compression mechanism. + /// + /// returns the number of bytes written to the stream + /// + /// Writes this format to output: + /// ```text + /// [8 bytes]: uncompressed length + /// [remaining bytes]: compressed data stream + /// ``` + pub(crate) fn compress_to_vec( + &self, + input: &[u8], + output: &mut Vec, + ) -> Result { + let uncompressed_data_len = input.len(); + let original_output_len = output.len(); + + if input.is_empty() { + // empty input, nothing to do + } else { + // write compressed data directly into the output buffer + output.extend_from_slice(&uncompressed_data_len.to_le_bytes()); + self.compress(input, output)?; + + let compression_len = output.len(); + if compression_len > uncompressed_data_len { + // length of compressed data was larger than + // uncompressed data, use the uncompressed data with + // length -1 to indicate that we don't compress the + // data + output.truncate(original_output_len); + output.extend_from_slice(&LENGTH_NO_COMPRESSED_DATA.to_le_bytes()); + output.extend_from_slice(input); + } + } + Ok(output.len() - original_output_len) + } + + /// Decompresses the input into a [`Buffer`] + /// + /// The input should look like: + /// ```text + /// [8 bytes]: uncompressed length + /// [remaining bytes]: compressed data stream + /// ``` + pub(crate) fn decompress_to_buffer(&self, input: &[u8]) -> Result { + // read the first 8 bytes to determine if the data is + // compressed + let decompressed_length = read_uncompressed_size(input); + let buffer = if decompressed_length == 0 { + // emtpy + let empty = Vec::::new(); + Buffer::from(empty) + } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA { + // no compression + let data = &input[(LENGTH_OF_PREFIX_DATA as usize)..]; + Buffer::from(data) + } else { + // decompress data using the codec + let mut uncompressed_buffer = + Vec::with_capacity(decompressed_length as usize); + let input_data = &input[(LENGTH_OF_PREFIX_DATA as usize)..]; + self.decompress(input_data, &mut uncompressed_buffer)?; + Buffer::from(uncompressed_buffer) + }; + Ok(buffer) + } + + /// Compress the data in input buffer and write to output buffer + /// using the specified compression + fn compress(&self, input: &[u8], output: &mut Vec) -> Result<()> { + match self { + CompressionCodec::Lz4Frame => { + let mut encoder = lz4::EncoderBuilder::new().build(output)?; + encoder.write_all(input)?; + match encoder.finish().1 { + Ok(_) => Ok(()), + Err(e) => Err(e.into()), + } + } + CompressionCodec::Zstd => { + let mut encoder = zstd::Encoder::new(output, 0)?; + encoder.write_all(input)?; + match encoder.finish() { + Ok(_) => Ok(()), + Err(e) => Err(e.into()), + } + } + } + } + + /// Decompress the data in input buffer and write to output buffer + /// using the specified compression + fn decompress(&self, input: &[u8], output: &mut Vec) -> Result { + let result: Result = match self { + CompressionCodec::Lz4Frame => { + let mut decoder = lz4::Decoder::new(input)?; + match decoder.read_to_end(output) { + Ok(size) => Ok(size), + Err(e) => Err(e.into()), + } + } + CompressionCodec::Zstd => { + let mut decoder = zstd::Decoder::new(input)?; + match decoder.read_to_end(output) { + Ok(size) => Ok(size), + Err(e) => Err(e.into()), + } + } + }; + result + } +} + +/// Get the uncompressed length +/// Notes: +/// LENGTH_NO_COMPRESSED_DATA: indicate that the data that follows is not compressed +/// 0: indicate that there is no data +/// positive number: indicate the uncompressed length for the following data +#[inline] +fn read_uncompressed_size(buffer: &[u8]) -> i64 { + let len_buffer = &buffer[0..8]; + // 64-bit little-endian signed integer + i64::from_le_bytes(len_buffer.try_into().unwrap()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_lz4_compression() { + let input_bytes = "hello lz4".as_bytes(); + let codec: CompressionCodec = CompressionCodec::Lz4Frame; + let mut output_bytes: Vec = Vec::new(); + codec.compress(input_bytes, &mut output_bytes).unwrap(); + let mut result_output_bytes: Vec = Vec::new(); + codec + .decompress(output_bytes.as_slice(), &mut result_output_bytes) + .unwrap(); + assert_eq!(input_bytes, result_output_bytes.as_slice()); + } + + #[test] + fn test_zstd_compression() { + let input_bytes = "hello zstd".as_bytes(); + let codec: CompressionCodec = CompressionCodec::Zstd; + let mut output_bytes: Vec = Vec::new(); + codec.compress(input_bytes, &mut output_bytes).unwrap(); + let mut result_output_bytes: Vec = Vec::new(); + codec + .decompress(output_bytes.as_slice(), &mut result_output_bytes) + .unwrap(); + assert_eq!(input_bytes, result_output_bytes.as_slice()); + } +} diff --git a/arrow/src/ipc/compression/mod.rs b/arrow/src/ipc/compression/mod.rs new file mode 100644 index 000000000000..666fa6d86a27 --- /dev/null +++ b/arrow/src/ipc/compression/mod.rs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#[cfg(feature = "ipc_compression")] +mod codec; +#[cfg(feature = "ipc_compression")] +pub(crate) use codec::CompressionCodec; + +#[cfg(not(feature = "ipc_compression"))] +mod stub; +#[cfg(not(feature = "ipc_compression"))] +pub(crate) use stub::CompressionCodec; diff --git a/arrow/src/ipc/compression/stub.rs b/arrow/src/ipc/compression/stub.rs new file mode 100644 index 000000000000..6240f084be3f --- /dev/null +++ b/arrow/src/ipc/compression/stub.rs @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Stubs that implement the same interface as the ipc_compression +//! codec module, but always errors. + +use crate::buffer::Buffer; +use crate::error::{ArrowError, Result}; +use crate::ipc::CompressionType; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CompressionCodec {} + +impl TryFrom for CompressionType { + type Error = ArrowError; + fn try_from(codec: CompressionCodec) -> Result { + Err(ArrowError::InvalidArgumentError( + format!("codec type {:?} not supported because arrow was not compiled with the ipc_compression feature", codec))) + } +} + +impl TryFrom for CompressionCodec { + type Error = ArrowError; + + fn try_from(compression_type: CompressionType) -> Result { + Err(ArrowError::InvalidArgumentError( + format!("compression type {:?} not supported because arrow was not compiled with the ipc_compression feature", compression_type)) + ) + } +} + +impl CompressionCodec { + #[allow(clippy::ptr_arg)] + pub(crate) fn compress_to_vec( + &self, + _input: &[u8], + _output: &mut Vec, + ) -> Result { + Err(ArrowError::InvalidArgumentError( + "compression not supported because arrow was not compiled with the ipc_compression feature".to_string() + )) + } + + pub(crate) fn decompress_to_buffer(&self, _input: &[u8]) -> Result { + Err(ArrowError::InvalidArgumentError( + "decompression not supported because arrow was not compiled with the ipc_compression feature".to_string() + )) + } +} diff --git a/arrow/src/ipc/mod.rs b/arrow/src/ipc/mod.rs index d5455b454e7f..2b30e72206c3 100644 --- a/arrow/src/ipc/mod.rs +++ b/arrow/src/ipc/mod.rs @@ -22,6 +22,8 @@ pub mod convert; pub mod reader; pub mod writer; +mod compression; + #[allow(clippy::redundant_closure)] #[allow(clippy::needless_lifetimes)] #[allow(clippy::extra_unused_lifetimes)] diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs index ce44d74a1a1b..393128371b19 100644 --- a/arrow/src/ipc/reader.rs +++ b/arrow/src/ipc/reader.rs @@ -21,6 +21,7 @@ //! however the `FileReader` expects a reader that supports `Seek`ing use std::collections::HashMap; +use std::fmt; use std::io::{BufReader, Read, Seek, SeekFrom}; use std::sync::Arc; @@ -32,15 +33,35 @@ use crate::error::{ArrowError, Result}; use crate::ipc; use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader}; +use crate::ipc::compression::CompressionCodec; use ipc::CONTINUATION_MARKER; use DataType::*; /// Read a buffer based on offset and length -fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer { +/// From +/// Each constituent buffer is first compressed with the indicated +/// compressor, and then written with the uncompressed length in the first 8 +/// bytes as a 64-bit little-endian signed integer followed by the compressed +/// buffer bytes (and then padding as required by the protocol). The +/// uncompressed length may be set to -1 to indicate that the data that +/// follows is not compressed, which can be useful for cases where +/// compression does not yield appreciable savings. +fn read_buffer( + buf: &ipc::Buffer, + a_data: &[u8], + compression_codec: &Option, +) -> Result { let start_offset = buf.offset() as usize; let end_offset = start_offset + buf.length() as usize; let buf_data = &a_data[start_offset..end_offset]; - Buffer::from(&buf_data) + // corner case: empty buffer + if buf_data.is_empty() { + return Ok(Buffer::from(buf_data)); + } + match compression_codec { + Some(decompressor) => decompressor.decompress_to_buffer(buf_data), + None => Ok(Buffer::from(buf_data)), + } } /// Coordinates reading arrays based on data types. @@ -61,6 +82,7 @@ fn create_array( dictionaries_by_id: &HashMap, mut node_index: usize, mut buffer_index: usize, + compression_codec: &Option, metadata: &ipc::MetadataVersion, ) -> Result<(ArrayRef, usize, usize)> { use DataType::*; @@ -72,8 +94,8 @@ fn create_array( data_type, buffers[buffer_index..buffer_index + 3] .iter() - .map(|buf| read_buffer(buf, data)) - .collect(), + .map(|buf| read_buffer(buf, data, compression_codec)) + .collect::>()?, ); node_index += 1; buffer_index += 3; @@ -85,8 +107,8 @@ fn create_array( data_type, buffers[buffer_index..buffer_index + 2] .iter() - .map(|buf| read_buffer(buf, data)) - .collect(), + .map(|buf| read_buffer(buf, data, compression_codec)) + .collect::>()?, ); node_index += 1; buffer_index += 2; @@ -96,8 +118,8 @@ fn create_array( let list_node = &nodes[node_index]; let list_buffers: Vec = buffers[buffer_index..buffer_index + 2] .iter() - .map(|buf| read_buffer(buf, data)) - .collect(); + .map(|buf| read_buffer(buf, data, compression_codec)) + .collect::>()?; node_index += 1; buffer_index += 2; let triple = create_array( @@ -108,6 +130,7 @@ fn create_array( dictionaries_by_id, node_index, buffer_index, + compression_codec, metadata, )?; node_index = triple.1; @@ -119,8 +142,8 @@ fn create_array( let list_node = &nodes[node_index]; let list_buffers: Vec = buffers[buffer_index..=buffer_index] .iter() - .map(|buf| read_buffer(buf, data)) - .collect(); + .map(|buf| read_buffer(buf, data, compression_codec)) + .collect::>()?; node_index += 1; buffer_index += 1; let triple = create_array( @@ -131,6 +154,7 @@ fn create_array( dictionaries_by_id, node_index, buffer_index, + compression_codec, metadata, )?; node_index = triple.1; @@ -140,7 +164,8 @@ fn create_array( } Struct(struct_fields) => { let struct_node = &nodes[node_index]; - let null_buffer: Buffer = read_buffer(&buffers[buffer_index], data); + let null_buffer: Buffer = + read_buffer(&buffers[buffer_index], data, compression_codec)?; node_index += 1; buffer_index += 1; @@ -157,6 +182,7 @@ fn create_array( dictionaries_by_id, node_index, buffer_index, + compression_codec, metadata, )?; node_index = triple.1; @@ -177,8 +203,8 @@ fn create_array( let index_node = &nodes[node_index]; let index_buffers: Vec = buffers[buffer_index..buffer_index + 2] .iter() - .map(|buf| read_buffer(buf, data)) - .collect(); + .map(|buf| read_buffer(buf, data, compression_codec)) + .collect::>()?; let dict_id = field.dict_id().ok_or_else(|| { ArrowError::IoError(format!("Field {} does not have dict id", field)) @@ -209,18 +235,20 @@ fn create_array( // In V4, union types has validity bitmap // In V5 and later, union types have no validity bitmap if metadata < &ipc::MetadataVersion::V5 { - read_buffer(&buffers[buffer_index], data); + read_buffer(&buffers[buffer_index], data, compression_codec)?; buffer_index += 1; } let type_ids: Buffer = - read_buffer(&buffers[buffer_index], data)[..len].into(); + read_buffer(&buffers[buffer_index], data, compression_codec)?[..len] + .into(); buffer_index += 1; let value_offsets = match mode { UnionMode::Dense => { - let buffer = read_buffer(&buffers[buffer_index], data); + let buffer = + read_buffer(&buffers[buffer_index], data, compression_codec)?; buffer_index += 1; Some(buffer[..len * 4].into()) } @@ -238,6 +266,7 @@ fn create_array( dictionaries_by_id, node_index, buffer_index, + compression_codec, metadata, )?; @@ -277,8 +306,8 @@ fn create_array( data_type, buffers[buffer_index..buffer_index + 2] .iter() - .map(|buf| read_buffer(buf, data)) - .collect(), + .map(|buf| read_buffer(buf, data, compression_codec)) + .collect::>()?, ); node_index += 1; buffer_index += 2; @@ -603,6 +632,11 @@ pub fn read_record_batch( let field_nodes = batch.nodes().ok_or_else(|| { ArrowError::IoError("Unable to get field nodes from IPC RecordBatch".to_string()) })?; + let batch_compression = batch.compression(); + let compression_codec: Option = batch_compression + .map(|batch_compression| batch_compression.codec().try_into()) + .transpose()?; + // keep track of buffer and node index, the functions that create arrays mutate these let mut buffer_index = 0; let mut node_index = 0; @@ -626,6 +660,7 @@ pub fn read_record_batch( dictionaries_by_id, node_index, buffer_index, + &compression_codec, metadata, )?; node_index = triple.1; @@ -664,6 +699,7 @@ pub fn read_record_batch( dictionaries_by_id, node_index, buffer_index, + &compression_codec, metadata, )?; node_index = triple.1; @@ -761,6 +797,21 @@ pub struct FileReader { projection: Option<(Vec, Schema)>, } +impl fmt::Debug for FileReader { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> { + f.debug_struct("FileReader") + .field("reader", &"BufReader<..>") + .field("schema", &self.schema) + .field("blocks", &self.blocks) + .field("current_block", &self.current_block) + .field("total_blocks", &self.total_blocks) + .field("dictionaries_by_id", &self.dictionaries_by_id) + .field("metadata_version", &self.metadata_version) + .field("projection", &self.projection) + .finish() + } +} + impl FileReader { /// Try to create a new file reader /// @@ -921,7 +972,6 @@ impl FileReader { let mut block_data = vec![0; meta_len as usize]; self.reader.read_exact(&mut block_data)?; - let message = ipc::root_as_message(&block_data[..]).map_err(|err| { ArrowError::IoError(format!("Unable to get root as footer: {:?}", err)) })?; @@ -1013,6 +1063,18 @@ pub struct StreamReader { projection: Option<(Vec, Schema)>, } +impl fmt::Debug for StreamReader { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> { + f.debug_struct("StreamReader") + .field("reader", &"BufReader<..>") + .field("schema", &self.schema) + .field("dictionaries_by_id", &self.dictionaries_by_id) + .field("finished", &self.finished) + .field("projection", &self.projection) + .finish() + } +} + impl StreamReader { /// Try to create a new stream reader /// @@ -1414,6 +1476,105 @@ mod tests { }); } + #[test] + #[cfg(feature = "ipc_compression")] + fn read_generated_streams_200() { + let testdata = crate::util::test_util::arrow_test_data(); + let version = "2.0.0-compression"; + + // the test is repetitive, thus we can read all supported files at once + let paths = vec!["generated_lz4", "generated_zstd"]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.stream", + testdata, version, path + )) + .unwrap(); + + let mut reader = StreamReader::try_new(file, None).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader).unwrap()); + // the next batch must be empty + assert!(reader.next().is_none()); + // the stream must indicate that it's finished + assert!(reader.is_finished()); + }); + } + + #[test] + #[cfg(not(feature = "ipc_compression"))] + fn read_generated_streams_200_negative() { + let testdata = crate::util::test_util::arrow_test_data(); + let version = "2.0.0-compression"; + + // the test is repetitive, thus we can read all supported files at once + let cases = vec![("generated_lz4", "LZ4_FRAME"), ("generated_zstd", "ZSTD")]; + cases.iter().for_each(|(path, compression_name)| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.stream", + testdata, version, path + )) + .unwrap(); + + let mut reader = StreamReader::try_new(file, None).unwrap(); + let err = reader.next().unwrap().unwrap_err(); + let expected_error = format!( + "Invalid argument error: compression type {} not supported because arrow was not compiled with the ipc_compression feature", + compression_name + ); + assert_eq!(err.to_string(), expected_error); + }); + } + + #[test] + #[cfg(feature = "ipc_compression")] + fn read_generated_files_200() { + let testdata = crate::util::test_util::arrow_test_data(); + let version = "2.0.0-compression"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec!["generated_lz4", "generated_zstd"]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, path + )) + .unwrap(); + + let mut reader = FileReader::try_new(file, None).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader).unwrap()); + }); + } + + #[test] + #[cfg(not(feature = "ipc_compression"))] + fn read_generated_files_200_negative() { + let testdata = crate::util::test_util::arrow_test_data(); + let version = "2.0.0-compression"; + // the test is repetitive, thus we can read all supported files at once + let cases = vec![("generated_lz4", "LZ4_FRAME"), ("generated_zstd", "ZSTD")]; + cases.iter().for_each(|(path, compression_name)| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, path + )) + .unwrap(); + + let mut reader = FileReader::try_new(file, None).unwrap(); + + let err = reader.next().unwrap().unwrap_err(); + let expected_error = format!( + "Invalid argument error: compression type {} not supported because arrow was not compiled with the ipc_compression feature", + compression_name + ); + assert_eq!(err.to_string(), expected_error); + }); + } + fn create_test_projection_schema() -> Schema { // define field types let list_data_type = diff --git a/arrow/src/ipc/writer.rs b/arrow/src/ipc/writer.rs index f0942b074cfe..c817cb77c13a 100644 --- a/arrow/src/ipc/writer.rs +++ b/arrow/src/ipc/writer.rs @@ -39,6 +39,7 @@ use crate::ipc; use crate::record_batch::RecordBatch; use crate::util::bit_util; +use crate::ipc::compression::CompressionCodec; use ipc::CONTINUATION_MARKER; /// IPC write options used to control the behaviour of the writer @@ -58,9 +59,30 @@ pub struct IpcWriteOptions { /// version 2.0.0: V4, with legacy format enabled /// version 4.0.0: V5 metadata_version: ipc::MetadataVersion, + /// Compression, if desired. Only supported when `ipc_compression` + /// feature is enabled + batch_compression_type: Option, } impl IpcWriteOptions { + /// Configures compression when writing IPC files. Requires the + /// `ipc_compression` feature of the crate to be activated. + #[cfg(feature = "ipc_compression")] + pub fn try_with_compression( + mut self, + batch_compression_type: Option, + ) -> Result { + self.batch_compression_type = batch_compression_type; + + if self.batch_compression_type.is_some() + && self.metadata_version < ipc::MetadataVersion::V5 + { + return Err(ArrowError::InvalidArgumentError( + "Compression only supported in metadata v5 and above".to_string(), + )); + } + Ok(self) + } /// Try create IpcWriteOptions, checking for incompatible settings pub fn try_new( alignment: usize, @@ -82,6 +104,7 @@ impl IpcWriteOptions { alignment, write_legacy_ipc_format, metadata_version, + batch_compression_type: None, }), ipc::MetadataVersion::V5 => { if write_legacy_ipc_format { @@ -94,10 +117,14 @@ impl IpcWriteOptions { alignment, write_legacy_ipc_format, metadata_version, + batch_compression_type: None, }) } } - z => panic!("Unsupported ipc::MetadataVersion {:?}", z), + z => Err(ArrowError::InvalidArgumentError(format!( + "Unsupported ipc::MetadataVersion {:?}", + z + ))), } } } @@ -108,6 +135,7 @@ impl Default for IpcWriteOptions { alignment: 8, write_legacy_ipc_format: false, metadata_version: ipc::MetadataVersion::V5, + batch_compression_type: None, } } } @@ -278,7 +306,7 @@ impl IpcDataGenerator { dict_id, dict_values, write_options, - )); + )?); } } _ => self._encode_dictionaries( @@ -312,7 +340,7 @@ impl IpcDataGenerator { )?; } - let encoded_message = self.record_batch_to_bytes(batch, write_options); + let encoded_message = self.record_batch_to_bytes(batch, write_options)?; Ok((encoded_dictionaries, encoded_message)) } @@ -322,13 +350,27 @@ impl IpcDataGenerator { &self, batch: &RecordBatch, write_options: &IpcWriteOptions, - ) -> EncodedData { + ) -> Result { let mut fbb = FlatBufferBuilder::new(); let mut nodes: Vec = vec![]; let mut buffers: Vec = vec![]; let mut arrow_data: Vec = vec![]; let mut offset = 0; + + // get the type of compression + let batch_compression_type = write_options.batch_compression_type; + + let compression = batch_compression_type.map(|batch_compression_type| { + let mut c = ipc::BodyCompressionBuilder::new(&mut fbb); + c.add_method(ipc::BodyCompressionMethod::BUFFER); + c.add_codec(batch_compression_type); + c.finish() + }); + + let compression_codec: Option = + batch_compression_type.map(TryInto::try_into).transpose()?; + for array in batch.columns() { let array_data = array.data(); offset = write_array_data( @@ -339,19 +381,26 @@ impl IpcDataGenerator { offset, array.len(), array.null_count(), + &compression_codec, write_options, - ); + )?; } + // pad the tail of body data + let len = arrow_data.len(); + let pad_len = pad_to_8(len as u32); + arrow_data.extend_from_slice(&vec![0u8; pad_len][..]); // write data let buffers = fbb.create_vector(&buffers); let nodes = fbb.create_vector(&nodes); - let root = { let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb); batch_builder.add_length(batch.num_rows() as i64); batch_builder.add_nodes(nodes); batch_builder.add_buffers(buffers); + if let Some(c) = compression { + batch_builder.add_compression(c); + } let b = batch_builder.finish(); b.as_union_value() }; @@ -365,10 +414,10 @@ impl IpcDataGenerator { fbb.finish(root, None); let finished_data = fbb.finished_data(); - EncodedData { + Ok(EncodedData { ipc_message: finished_data.to_vec(), arrow_data, - } + }) } /// Write dictionary values into two sets of bytes, one for the header (ipc::Message) and the @@ -378,13 +427,27 @@ impl IpcDataGenerator { dict_id: i64, array_data: &ArrayData, write_options: &IpcWriteOptions, - ) -> EncodedData { + ) -> Result { let mut fbb = FlatBufferBuilder::new(); let mut nodes: Vec = vec![]; let mut buffers: Vec = vec![]; let mut arrow_data: Vec = vec![]; + // get the type of compression + let batch_compression_type = write_options.batch_compression_type; + + let compression = batch_compression_type.map(|batch_compression_type| { + let mut c = ipc::BodyCompressionBuilder::new(&mut fbb); + c.add_method(ipc::BodyCompressionMethod::BUFFER); + c.add_codec(batch_compression_type); + c.finish() + }); + + let compression_codec: Option = batch_compression_type + .map(|batch_compression_type| batch_compression_type.try_into()) + .transpose()?; + write_array_data( array_data, &mut buffers, @@ -393,8 +456,14 @@ impl IpcDataGenerator { 0, array_data.len(), array_data.null_count(), + &compression_codec, write_options, - ); + )?; + + // pad the tail of body data + let len = arrow_data.len(); + let pad_len = pad_to_8(len as u32); + arrow_data.extend_from_slice(&vec![0u8; pad_len][..]); // write data let buffers = fbb.create_vector(&buffers); @@ -405,6 +474,9 @@ impl IpcDataGenerator { batch_builder.add_length(array_data.len() as i64); batch_builder.add_nodes(nodes); batch_builder.add_buffers(buffers); + if let Some(c) = compression { + batch_builder.add_compression(c); + } batch_builder.finish() }; @@ -427,10 +499,10 @@ impl IpcDataGenerator { fbb.finish(root, None); let finished_data = fbb.finished_data(); - EncodedData { + Ok(EncodedData { ipc_message: finished_data.to_vec(), arrow_data, - } + }) } } @@ -519,9 +591,10 @@ impl FileWriter { ) -> Result { let data_gen = IpcDataGenerator::default(); let mut writer = BufWriter::new(writer); - // write magic to header + // write magic to header aligned on 8 byte boundary + let header_size = super::ARROW_MAGIC.len() + 2; + assert_eq!(header_size, 8); writer.write_all(&super::ARROW_MAGIC[..])?; - // create an 8-byte boundary after the header writer.write_all(&[0, 0])?; // write the schema, set the written bytes to the schema + header let encoded_message = data_gen.schema_to_bytes(schema, &write_options); @@ -530,7 +603,7 @@ impl FileWriter { writer, write_options, schema: schema.clone(), - block_offsets: meta + data + 8, + block_offsets: meta + data + header_size, dictionary_blocks: vec![], record_blocks: vec![], finished: false, @@ -984,8 +1057,9 @@ fn write_array_data( offset: i64, num_rows: usize, null_count: usize, + compression_codec: &Option, write_options: &IpcWriteOptions, -) -> i64 { +) -> Result { let mut offset = offset; if !matches!(array_data.data_type(), DataType::Null) { nodes.push(ipc::FieldNode::new(num_rows as i64, null_count as i64)); @@ -1007,7 +1081,13 @@ fn write_array_data( Some(buffer) => buffer.bit_slice(array_data.offset(), array_data.len()), }; - offset = write_buffer(null_buffer.as_slice(), buffers, arrow_data, offset); + offset = write_buffer( + null_buffer.as_slice(), + buffers, + arrow_data, + offset, + compression_codec, + )?; } let data_type = array_data.data_type(); @@ -1040,18 +1120,36 @@ fn write_array_data( ) }; - offset = write_buffer(new_offsets.as_slice(), buffers, arrow_data, offset); + offset = write_buffer( + new_offsets.as_slice(), + buffers, + arrow_data, + offset, + compression_codec, + )?; let total_bytes = get_binary_buffer_len(array_data); let value_buffer = &array_data.buffers()[1]; let buffer_length = min(total_bytes, value_buffer.len() - byte_offset); let buffer_slice = &value_buffer.as_slice()[byte_offset..(byte_offset + buffer_length)]; - offset = write_buffer(buffer_slice, buffers, arrow_data, offset); + offset = write_buffer( + buffer_slice, + buffers, + arrow_data, + offset, + compression_codec, + )?; } else { - array_data.buffers().iter().for_each(|buffer| { - offset = write_buffer(buffer.as_slice(), buffers, arrow_data, offset); - }); + for buffer in array_data.buffers() { + offset = write_buffer( + buffer.as_slice(), + buffers, + arrow_data, + offset, + compression_codec, + )?; + } } } else if DataType::is_numeric(data_type) || DataType::is_temporal(data_type) @@ -1074,19 +1172,32 @@ fn write_array_data( let buffer_length = min(min_length, buffer.len() - byte_offset); let buffer_slice = &buffer.as_slice()[byte_offset..(byte_offset + buffer_length)]; - offset = write_buffer(buffer_slice, buffers, arrow_data, offset); + offset = write_buffer( + buffer_slice, + buffers, + arrow_data, + offset, + compression_codec, + )?; } else { - offset = write_buffer(buffer.as_slice(), buffers, arrow_data, offset); + offset = write_buffer( + buffer.as_slice(), + buffers, + arrow_data, + offset, + compression_codec, + )?; } } else { - array_data.buffers().iter().for_each(|buffer| { - offset = write_buffer(buffer, buffers, arrow_data, offset); - }); + for buffer in array_data.buffers() { + offset = + write_buffer(buffer, buffers, arrow_data, offset, compression_codec)?; + } } if !matches!(array_data.data_type(), DataType::Dictionary(_, _)) { // recursively write out nested structures - array_data.child_data().iter().for_each(|data_ref| { + for data_ref in array_data.child_data() { // write the nested data (e.g list data) offset = write_array_data( data_ref, @@ -1096,29 +1207,56 @@ fn write_array_data( offset, data_ref.len(), data_ref.null_count(), + compression_codec, write_options, - ); - }); + )?; + } } - offset + Ok(offset) } -/// Write a buffer to a vector of bytes, and add its ipc::Buffer to a vector +/// Write a buffer into `arrow_data`, a vector of bytes, and adds its +/// [`ipc::Buffer`] to `buffers`. Returns the new offset in `arrow_data` +/// +/// +/// From +/// Each constituent buffer is first compressed with the indicated +/// compressor, and then written with the uncompressed length in the first 8 +/// bytes as a 64-bit little-endian signed integer followed by the compressed +/// buffer bytes (and then padding as required by the protocol). The +/// uncompressed length may be set to -1 to indicate that the data that +/// follows is not compressed, which can be useful for cases where +/// compression does not yield appreciable savings. fn write_buffer( - buffer: &[u8], - buffers: &mut Vec, - arrow_data: &mut Vec, - offset: i64, -) -> i64 { - let len = buffer.len(); - let pad_len = pad_to_8(len as u32); - let total_len: i64 = (len + pad_len) as i64; - // assert_eq!(len % 8, 0, "Buffer width not a multiple of 8 bytes"); - buffers.push(ipc::Buffer::new(offset, total_len)); - arrow_data.extend_from_slice(buffer); - arrow_data.extend_from_slice(&vec![0u8; pad_len][..]); - offset + total_len + buffer: &[u8], // input + buffers: &mut Vec, // output buffer descriptors + arrow_data: &mut Vec, // output stream + offset: i64, // current output stream offset + compression_codec: &Option, +) -> Result { + let len: i64 = match compression_codec { + Some(compressor) => compressor.compress_to_vec(buffer, arrow_data)?, + None => { + arrow_data.extend_from_slice(buffer); + buffer.len() + } + } + .try_into() + .map_err(|e| { + ArrowError::InvalidArgumentError(format!( + "Could not convert compressed size to i64: {}", + e + )) + })?; + + // make new index entry + buffers.push(ipc::Buffer::new(offset, len)); + // padding and make offset 8 bytes aligned + let pad_len = pad_to_8(len as u32) as i64; + arrow_data.extend_from_slice(&vec![0u8; pad_len as usize][..]); + + Ok(offset + len + pad_len) } /// Calculate an 8-byte boundary and return the number of bytes needed to pad to 8 bytes @@ -1143,6 +1281,167 @@ mod tests { use crate::ipc::reader::*; use crate::util::integration_util::*; + #[test] + #[cfg(feature = "ipc_compression")] + fn test_write_empty_record_batch_lz4_compression() { + let file_name = "arrow_lz4_empty"; + let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]); + let values: Vec> = vec![]; + let array = Int32Array::from(values); + let record_batch = + RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]) + .unwrap(); + { + let file = + File::create(format!("target/debug/testdata/{}.arrow_file", file_name)) + .unwrap(); + let write_option = + IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5) + .unwrap() + .try_with_compression(Some(ipc::CompressionType::LZ4_FRAME)) + .unwrap(); + + let mut writer = + FileWriter::try_new_with_options(file, &schema, write_option).unwrap(); + writer.write(&record_batch).unwrap(); + writer.finish().unwrap(); + } + { + // read file + let file = + File::open(format!("target/debug/testdata/{}.arrow_file", file_name)) + .unwrap(); + let mut reader = FileReader::try_new(file, None).unwrap(); + loop { + match reader.next() { + Some(Ok(read_batch)) => { + read_batch + .columns() + .iter() + .zip(record_batch.columns()) + .for_each(|(a, b)| { + assert_eq!(a.data_type(), b.data_type()); + assert_eq!(a.len(), b.len()); + assert_eq!(a.null_count(), b.null_count()); + }); + } + Some(Err(e)) => { + panic!("{}", e); + } + None => { + break; + } + } + } + } + } + + #[test] + #[cfg(feature = "ipc_compression")] + fn test_write_file_with_lz4_compression() { + let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]); + let values: Vec> = vec![Some(12), Some(1)]; + let array = Int32Array::from(values); + let record_batch = + RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]) + .unwrap(); + { + let file = + File::create("target/debug/testdata/arrow_lz4.arrow_file").unwrap(); + let write_option = + IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5) + .unwrap() + .try_with_compression(Some(ipc::CompressionType::LZ4_FRAME)) + .unwrap(); + + let mut writer = + FileWriter::try_new_with_options(file, &schema, write_option).unwrap(); + writer.write(&record_batch).unwrap(); + writer.finish().unwrap(); + } + { + // read file + let file = + File::open(format!("target/debug/testdata/{}.arrow_file", "arrow_lz4")) + .unwrap(); + let mut reader = FileReader::try_new(file, None).unwrap(); + loop { + match reader.next() { + Some(Ok(read_batch)) => { + read_batch + .columns() + .iter() + .zip(record_batch.columns()) + .for_each(|(a, b)| { + assert_eq!(a.data_type(), b.data_type()); + assert_eq!(a.len(), b.len()); + assert_eq!(a.null_count(), b.null_count()); + }); + } + Some(Err(e)) => { + panic!("{}", e); + } + None => { + break; + } + } + } + } + } + + #[test] + #[cfg(feature = "ipc_compression")] + fn test_write_file_with_zstd_compression() { + let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]); + let values: Vec> = vec![Some(12), Some(1)]; + let array = Int32Array::from(values); + let record_batch = + RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]) + .unwrap(); + { + let file = + File::create("target/debug/testdata/arrow_zstd.arrow_file").unwrap(); + let write_option = + IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5) + .unwrap() + .try_with_compression(Some(ipc::CompressionType::ZSTD)) + .unwrap(); + + let mut writer = + FileWriter::try_new_with_options(file, &schema, write_option).unwrap(); + writer.write(&record_batch).unwrap(); + writer.finish().unwrap(); + } + { + // read file + let file = + File::open(format!("target/debug/testdata/{}.arrow_file", "arrow_zstd")) + .unwrap(); + let mut reader = FileReader::try_new(file, None).unwrap(); + loop { + match reader.next() { + Some(Ok(read_batch)) => { + read_batch + .columns() + .iter() + .zip(record_batch.columns()) + .for_each(|(a, b)| { + assert_eq!(a.data_type(), b.data_type()); + assert_eq!(a.len(), b.len()); + assert_eq!(a.null_count(), b.null_count()); + }); + } + Some(Err(e)) => { + panic!("{}", e); + } + None => { + break; + } + } + } + } + } + #[test] fn test_write_file() { let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]); @@ -1499,6 +1798,107 @@ mod tests { }); } + #[test] + #[cfg(feature = "ipc_compression")] + fn read_and_rewrite_compression_files_200() { + let testdata = crate::util::test_util::arrow_test_data(); + let version = "2.0.0-compression"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec!["generated_lz4", "generated_zstd"]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, path + )) + .unwrap(); + + let mut reader = FileReader::try_new(file, None).unwrap(); + + // read and rewrite the file to a temp location + { + let file = File::create(format!( + "target/debug/testdata/{}-{}.arrow_file", + version, path + )) + .unwrap(); + // write IPC version 5 + let options = + IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5) + .unwrap() + .try_with_compression(Some(ipc::CompressionType::LZ4_FRAME)) + .unwrap(); + + let mut writer = + FileWriter::try_new_with_options(file, &reader.schema(), options) + .unwrap(); + while let Some(Ok(batch)) = reader.next() { + writer.write(&batch).unwrap(); + } + writer.finish().unwrap(); + } + + let file = File::open(format!( + "target/debug/testdata/{}-{}.arrow_file", + version, path + )) + .unwrap(); + let mut reader = FileReader::try_new(file, None).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader).unwrap()); + }); + } + + #[test] + #[cfg(feature = "ipc_compression")] + fn read_and_rewrite_compression_stream_200() { + let testdata = crate::util::test_util::arrow_test_data(); + let version = "2.0.0-compression"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec!["generated_lz4", "generated_zstd"]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.stream", + testdata, version, path + )) + .unwrap(); + + let reader = StreamReader::try_new(file, None).unwrap(); + + // read and rewrite the stream to a temp location + { + let file = File::create(format!( + "target/debug/testdata/{}-{}.stream", + version, path + )) + .unwrap(); + let options = + IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5) + .unwrap() + .try_with_compression(Some(ipc::CompressionType::ZSTD)) + .unwrap(); + + let mut writer = + StreamWriter::try_new_with_options(file, &reader.schema(), options) + .unwrap(); + reader.for_each(|batch| { + writer.write(&batch.unwrap()).unwrap(); + }); + writer.finish().unwrap(); + } + + let file = + File::open(format!("target/debug/testdata/{}-{}.stream", version, path)) + .unwrap(); + let mut reader = StreamReader::try_new(file, None).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader).unwrap()); + }); + } + /// Read gzipped JSON file fn read_gzip_json(version: &str, path: &str) -> ArrowJson { let testdata = crate::util::test_util::arrow_test_data();