support compression for IPC with revamped feature flags (#2369)
* support compression for IPC

* fix conflict

* edit toml

* fix clippy and format

* format doc

* format code

* add padding for tail of each buffer

* try fix the arrow lz4 and zstd

* add lz4,zstd for compression cfg

* add cfg for ipc_compression

* address comments

* Rework ipc_compression feature flags and plumb through errors

* fixup flags in reader

* Make stub interface

* Compiles without ipc_compression support

* Fix tests

* Clean up writing

* use uniform flag syntax

* fix flags

* Rename for clarity

* fix compilation

* Add ipc_compression tests to CI

* fix: clippy

* merge-conflicts

* Add note in doc

* Remove redundant dev dependencies

* improve variable name

* Apply suggestions from code review

Co-authored-by: Kun Liu <[email protected]>
Co-authored-by: Liang-Chi Hsieh <[email protected]>
Co-authored-by: Raphael Taylor-Davies <[email protected]>

* improve comment in stub.rs

* Fix for new clippy

* Clean up clippy

* Clean up header writing

* fmt

Co-authored-by: liukun4515 <[email protected]>
Co-authored-by: Liang-Chi Hsieh <[email protected]>
Co-authored-by: Raphael Taylor-Davies <[email protected]>
4 people authored Aug 14, 2022
1 parent 1c879ae commit 5e27d93
Showing 9 changed files with 929 additions and 68 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/arrow.yml
@@ -51,9 +51,9 @@ jobs:
 - name: Test
 run: |
 cargo test -p arrow
-- name: Test --features=force_validate,prettyprint,ffi
+- name: Test --features=force_validate,prettyprint,ipc_compression,ffi
 run: |
-cargo test -p arrow --features=force_validate,prettyprint,ffi
+cargo test -p arrow --features=force_validate,prettyprint,ipc_compression,ffi
 - name: Run examples
 run: |
 # Test arrow examples
@@ -172,4 +172,4 @@ jobs:
 rustup component add clippy
 - name: Run clippy
 run: |
-cargo clippy -p arrow --features=prettyprint,csv,ipc,test_utils,ffi --all-targets -- -D warnings
+cargo clippy -p arrow --features=prettyprint,csv,ipc,test_utils,ffi,ipc_compression --all-targets -- -D warnings
3 changes: 3 additions & 0 deletions arrow/Cargo.toml
@@ -56,6 +56,7 @@ csv_crate = { version = "1.1", default-features = false, optional = true, packag
 regex = { version = "1.5.6", default-features = false, features = ["std", "unicode"] }
 regex-syntax = { version = "0.6.27", default-features = false, features = ["unicode"] }
 lazy_static = { version = "1.4", default-features = false }
+lz4 = { version = "1.23", default-features = false, optional = true }
 packed_simd = { version = "0.3", default-features = false, optional = true, package = "packed_simd_2" }
 chrono = { version = "0.4", default-features = false, features = ["clock"] }
 chrono-tz = { version = "0.6", default-features = false, optional = true }
@@ -66,9 +67,11 @@ pyo3 = { version = "0.16", default-features = false, optional = true }
 lexical-core = { version = "^0.8", default-features = false, features = ["write-integers", "write-floats", "parse-integers", "parse-floats"] }
 multiversion = { version = "0.6.1", default-features = false }
 bitflags = { version = "1.2.1", default-features = false }
+zstd = { version = "0.11.1", default-features = false, optional = true }
 
 [features]
 default = ["csv", "ipc"]
+ipc_compression = ["ipc", "zstd", "lz4"]
 csv = ["csv_crate"]
 ipc = ["flatbuffers"]
 simd = ["packed_simd"]
3 changes: 2 additions & 1 deletion arrow/README.md
@@ -42,7 +42,8 @@ However, for historical reasons, this crate uses versions with major numbers gre
 The `arrow` crate provides the following features which may be enabled in your `Cargo.toml`:
 
 - `csv` (default) - support for reading and writing Arrow arrays to/from csv files
-- `ipc` (default) - support for the [arrow-flight](https://crates.io/crates/arrow-flight) IPC and wire format
+- `ipc` (default) - support for reading [Arrow IPC Format](https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc), also used as the wire protocol in [arrow-flight](https://crates.io/crates/arrow-flight)
+- `ipc_compression` - Enables reading and writing compressed IPC streams (also enables `ipc`)
 - `prettyprint` - support for formatting record batches as textual columns
 - `js` - support for building arrow for WebAssembly / JavaScript
 - `simd` - (_Requires Nightly Rust_) Use alternate hand optimized
205 changes: 205 additions & 0 deletions arrow/src/ipc/compression/codec.rs
@@ -0,0 +1,205 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::buffer::Buffer;
use crate::error::{ArrowError, Result};
use crate::ipc::CompressionType;
use std::io::{Read, Write};

const LENGTH_NO_COMPRESSED_DATA: i64 = -1;
const LENGTH_OF_PREFIX_DATA: i64 = 8;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Represents compressing an IPC stream using a particular compression algorithm
pub enum CompressionCodec {
Lz4Frame,
Zstd,
}

impl TryFrom<CompressionType> for CompressionCodec {
type Error = ArrowError;

fn try_from(compression_type: CompressionType) -> Result<Self> {
match compression_type {
CompressionType::ZSTD => Ok(CompressionCodec::Zstd),
CompressionType::LZ4_FRAME => Ok(CompressionCodec::Lz4Frame),
other_type => Err(ArrowError::NotYetImplemented(format!(
"compression type {:?} not supported ",
other_type
))),
}
}
}

impl CompressionCodec {
/// Compresses the data in `input` using the specified compression
/// mechanism and appends the result to `output`.
///
/// Returns the number of bytes written to `output`.
///
/// Writes this format to output:
/// ```text
/// [8 bytes]: uncompressed length
/// [remaining bytes]: compressed data stream
/// ```
pub(crate) fn compress_to_vec(
&self,
input: &[u8],
output: &mut Vec<u8>,
) -> Result<usize> {
let uncompressed_data_len = input.len();
let original_output_len = output.len();

if input.is_empty() {
// empty input, nothing to do
} else {
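// write the 8-byte little-endian uncompressed length, then the
// compressed data, directly into the output buffer
output.extend_from_slice(&(uncompressed_data_len as i64).to_le_bytes());
self.compress(input, output)?;

// count only the bytes appended by this call, since `output`
// may already have held data on entry
let compression_len = output.len() - original_output_len;
if compression_len > uncompressed_data_len {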
// length of compressed data was larger than
// uncompressed data, use the uncompressed data with
// length -1 to indicate that we don't compress the
// data
output.truncate(original_output_len);
output.extend_from_slice(&LENGTH_NO_COMPRESSED_DATA.to_le_bytes());
output.extend_from_slice(input);
}
}
Ok(output.len() - original_output_len)
}

/// Decompresses the input into a [`Buffer`]
///
/// The input should look like:
/// ```text
/// [8 bytes]: uncompressed length
/// [remaining bytes]: compressed data stream
/// ```
pub(crate) fn decompress_to_buffer(&self, input: &[u8]) -> Result<Buffer> {
// read the first 8 bytes to determine if the data is
// compressed
let decompressed_length = read_uncompressed_size(input);
let buffer = if decompressed_length == 0 {
// empty
let empty = Vec::<u8>::new();
Buffer::from(empty)
} else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
// no compression
let data = &input[(LENGTH_OF_PREFIX_DATA as usize)..];
Buffer::from(data)
} else {
// decompress data using the codec
let mut uncompressed_buffer =
Vec::with_capacity(decompressed_length as usize);
let input_data = &input[(LENGTH_OF_PREFIX_DATA as usize)..];
self.decompress(input_data, &mut uncompressed_buffer)?;
Buffer::from(uncompressed_buffer)
};
Ok(buffer)
}

/// Compress the data in input buffer and write to output buffer
/// using the specified compression
fn compress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<()> {
match self {
CompressionCodec::Lz4Frame => {
let mut encoder = lz4::EncoderBuilder::new().build(output)?;
encoder.write_all(input)?;
match encoder.finish().1 {
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
}
}
CompressionCodec::Zstd => {
let mut encoder = zstd::Encoder::new(output, 0)?;
encoder.write_all(input)?;
match encoder.finish() {
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
}
}
}
}

/// Decompress the data in input buffer and write to output buffer
/// using the specified compression
fn decompress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<usize> {
let result: Result<usize> = match self {
CompressionCodec::Lz4Frame => {
let mut decoder = lz4::Decoder::new(input)?;
match decoder.read_to_end(output) {
Ok(size) => Ok(size),
Err(e) => Err(e.into()),
}
}
CompressionCodec::Zstd => {
let mut decoder = zstd::Decoder::new(input)?;
match decoder.read_to_end(output) {
Ok(size) => Ok(size),
Err(e) => Err(e.into()),
}
}
};
result
}
}

/// Get the uncompressed length
/// Notes:
/// LENGTH_NO_COMPRESSED_DATA: indicate that the data that follows is not compressed
/// 0: indicate that there is no data
/// positive number: indicate the uncompressed length for the following data
#[inline]
fn read_uncompressed_size(buffer: &[u8]) -> i64 {
let len_buffer = &buffer[0..8];
// 64-bit little-endian signed integer
i64::from_le_bytes(len_buffer.try_into().unwrap())
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_lz4_compression() {
let input_bytes = "hello lz4".as_bytes();
let codec: CompressionCodec = CompressionCodec::Lz4Frame;
let mut output_bytes: Vec<u8> = Vec::new();
codec.compress(input_bytes, &mut output_bytes).unwrap();
let mut result_output_bytes: Vec<u8> = Vec::new();
codec
.decompress(output_bytes.as_slice(), &mut result_output_bytes)
.unwrap();
assert_eq!(input_bytes, result_output_bytes.as_slice());
}

#[test]
fn test_zstd_compression() {
let input_bytes = "hello zstd".as_bytes();
let codec: CompressionCodec = CompressionCodec::Zstd;
let mut output_bytes: Vec<u8> = Vec::new();
codec.compress(input_bytes, &mut output_bytes).unwrap();
let mut result_output_bytes: Vec<u8> = Vec::new();
codec
.decompress(output_bytes.as_slice(), &mut result_output_bytes)
.unwrap();
assert_eq!(input_bytes, result_output_bytes.as_slice());
}
}
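
The buffer framing used by `compress_to_vec` and `decompress_to_buffer` (an 8-byte little-endian length prefix, with -1 marking a body stored uncompressed) can be sketched without the `lz4` or `zstd` crates. The following is a minimal illustration, not the code from this commit: `frame_buffer`, `unframe_buffer`, and the toy run-length codec are hypothetical names standing in for the real codecs, and the fallback compares the bytes appended by the call against the input length, which is one reasonable reading of the logic above.

```rust
use std::convert::TryInto;

/// Prefix value marking a body that was stored without compression.
const LENGTH_NO_COMPRESSED_DATA: i64 = -1;

/// Appends `[8-byte LE length][body]` to `output` and returns the bytes appended.
/// `compress` is a hypothetical stand-in for the LZ4/ZSTD encoders.
fn frame_buffer(
    input: &[u8],
    output: &mut Vec<u8>,
    compress: impl Fn(&[u8], &mut Vec<u8>),
) -> usize {
    let start = output.len();
    if input.is_empty() {
        return 0; // empty input: nothing is written
    }
    output.extend_from_slice(&(input.len() as i64).to_le_bytes());
    compress(input, output);
    if output.len() - start > input.len() {
        // Compression did not pay off: store the raw bytes behind a -1 prefix.
        output.truncate(start);
        output.extend_from_slice(&LENGTH_NO_COMPRESSED_DATA.to_le_bytes());
        output.extend_from_slice(input);
    }
    output.len() - start
}

/// Reverses `frame_buffer`; returns `None` for truncated or inconsistent input.
fn unframe_buffer(
    input: &[u8],
    decompress: impl Fn(&[u8], &mut Vec<u8>),
) -> Option<Vec<u8>> {
    if input.is_empty() {
        return Some(Vec::new());
    }
    // `get` avoids the panic that slicing `[0..8]` would cause on short input.
    let prefix: [u8; 8] = input.get(..8)?.try_into().ok()?;
    let body = &input[8..];
    match i64::from_le_bytes(prefix) {
        0 => Some(Vec::new()),
        LENGTH_NO_COMPRESSED_DATA => Some(body.to_vec()),
        n if n > 0 => {
            let mut out = Vec::new();
            decompress(body, &mut out);
            if out.len() == n as usize { Some(out) } else { None }
        }
        _ => None, // any other negative length is invalid
    }
}

/// Toy run-length encoder: emits `(run, byte)` pairs. Illustration only.
fn rle_compress(input: &[u8], output: &mut Vec<u8>) {
    let mut i = 0;
    while i < input.len() {
        let byte = input[i];
        let mut run = 1usize;
        while i + run < input.len() && input[i + run] == byte && run < 255 {
            run += 1;
        }
        output.push(run as u8);
        output.push(byte);
        i += run;
    }
}

/// Inverse of `rle_compress`.
fn rle_decompress(input: &[u8], output: &mut Vec<u8>) {
    for pair in input.chunks_exact(2) {
        output.extend(std::iter::repeat(pair[1]).take(pair[0] as usize));
    }
}
```

With this sketch, a highly repetitive buffer keeps its compressed body, while a short buffer that the toy codec would expand is stored raw behind the -1 prefix.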
26 changes: 26 additions & 0 deletions arrow/src/ipc/compression/mod.rs
@@ -0,0 +1,26 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#[cfg(feature = "ipc_compression")]
mod codec;
#[cfg(feature = "ipc_compression")]
pub(crate) use codec::CompressionCodec;

#[cfg(not(feature = "ipc_compression"))]
mod stub;
#[cfg(not(feature = "ipc_compression"))]
pub(crate) use stub::CompressionCodec;
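
The paired `cfg` attributes above ensure that exactly one definition of `CompressionCodec` is compiled, so callers never need their own feature checks. A reduced sketch of the same switch follows, using two functions instead of two modules; `codec_backend` and `describe` are hypothetical names, and the snippet assumes it is compiled without the `ipc_compression` feature set, in which case the second definition is the one selected.

```rust
/// Compiled only when the build enables the `ipc_compression` feature.
#[cfg(feature = "ipc_compression")]
fn codec_backend() -> &'static str {
    "codec"
}

/// Compiled in every other build, mirroring the role of `stub.rs`.
#[cfg(not(feature = "ipc_compression"))]
fn codec_backend() -> &'static str {
    "stub"
}

/// Callers use one name and never mention the feature flag themselves.
fn describe() -> String {
    format!("CompressionCodec resolves to the {} module", codec_backend())
}
```

Because the two conditions are exact complements, the name is always defined once, whichever way the crate is built.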
63 changes: 63 additions & 0 deletions arrow/src/ipc/compression/stub.rs
@@ -0,0 +1,63 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Stubs that implement the same interface as the ipc_compression
//! codec module, but always return errors.
use crate::buffer::Buffer;
use crate::error::{ArrowError, Result};
use crate::ipc::CompressionType;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionCodec {}

impl TryFrom<CompressionCodec> for CompressionType {
type Error = ArrowError;
fn try_from(codec: CompressionCodec) -> Result<Self> {
Err(ArrowError::InvalidArgumentError(
format!("codec type {:?} not supported because arrow was not compiled with the ipc_compression feature", codec)))
}
}

impl TryFrom<CompressionType> for CompressionCodec {
type Error = ArrowError;

fn try_from(compression_type: CompressionType) -> Result<Self> {
Err(ArrowError::InvalidArgumentError(
format!("compression type {:?} not supported because arrow was not compiled with the ipc_compression feature", compression_type))
)
}
}

impl CompressionCodec {
#[allow(clippy::ptr_arg)]
pub(crate) fn compress_to_vec(
&self,
_input: &[u8],
_output: &mut Vec<u8>,
) -> Result<usize> {
Err(ArrowError::InvalidArgumentError(
"compression not supported because arrow was not compiled with the ipc_compression feature".to_string()
))
}

pub(crate) fn decompress_to_buffer(&self, _input: &[u8]) -> Result<Buffer> {
Err(ArrowError::InvalidArgumentError(
"decompression not supported because arrow was not compiled with the ipc_compression feature".to_string()
))
}
}
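
The stub relies on `CompressionCodec` being an enum with no variants: no value of it can ever be constructed, so the only reachable code path is the failing `TryFrom` conversion. A self-contained sketch of that idea follows. The `CompressionType` enum and the `String` error type here are simplified stand-ins (assumptions for illustration) for the flatbuffers-generated type and `ArrowError`, and the empty `match` is an alternative to returning `Err` from the method as the stub above does.

```rust
use std::convert::TryFrom;

/// Simplified stand-in for the generated `CompressionType` (assumption).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompressionType {
    Lz4Frame,
    Zstd,
}

/// An enum without variants is uninhabited: it has no values at all.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompressionCodec {}

impl TryFrom<CompressionType> for CompressionCodec {
    type Error = String;

    /// Every conversion fails, so a codec can never be obtained.
    fn try_from(compression_type: CompressionType) -> Result<Self, Self::Error> {
        Err(format!(
            "compression type {:?} not supported without the ipc_compression feature",
            compression_type
        ))
    }
}

impl CompressionCodec {
    /// Unreachable in practice: with no variants, the empty match is exhaustive.
    fn compress_to_vec(&self, _input: &[u8], _output: &mut Vec<u8>) -> Result<usize, String> {
        match *self {}
    }
}
```

Callers that go through `try_from` therefore see a descriptive error as soon as a compressed stream is encountered, before any codec method could be called.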
2 changes: 2 additions & 0 deletions arrow/src/ipc/mod.rs
@@ -22,6 +22,8 @@ pub mod convert;
 pub mod reader;
 pub mod writer;
 
+mod compression;
+
 #[allow(clippy::redundant_closure)]
 #[allow(clippy::needless_lifetimes)]
 #[allow(clippy::extra_unused_lifetimes)]