Added support for IPC 2.0 (compression).
jorgecarleitao committed Jun 24, 2021
1 parent c93b3eb commit 5b24f33
Showing 10 changed files with 370 additions and 72 deletions.
6 changes: 5 additions & 1 deletion Cargo.toml
@@ -39,6 +39,10 @@ prettytable-rs = { version = "^0.8", optional = true }
 flatbuffers = { version = "=0.8.4", optional = true }
 hex = { version = "^0.4", optional = true }
 
+# for IPC compression
+lz4 = { version = "1.23.1", optional = true }
+zstd = { version = "^0.6", optional = true }
+
 rand = { version = "0.7", optional = true }
 
 itertools = { version = "^0.10", optional = true }
@@ -72,7 +76,7 @@ default = ["io_csv", "io_json", "io_ipc", "io_json_integration", "io_print", "io
 merge_sort = ["itertools"]
 io_csv = ["csv", "lazy_static", "regex"]
 io_json = ["serde", "serde_derive", "serde_json", "indexmap"]
-io_ipc = ["flatbuffers"]
+io_ipc = ["flatbuffers", "lz4", "zstd"]
 io_json_integration = ["io_json", "hex"]
 io_print = ["prettytable-rs"]
 # base64 + io_ipc because arrow schemas are stored as base64-encoded ipc format.
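
The new codecs are folded into the existing io_ipc feature rather than gated behind a new one, so downstream users opt in exactly as before. A minimal sketch of a consumer's Cargo.toml, assuming a published release of this crate (the version number below is a placeholder):

[dependencies]
# io_ipc now pulls in the optional lz4 and zstd dependencies transitively.
arrow2 = { version = "0.1", features = ["io_ipc"] }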
1 change: 1 addition & 0 deletions README.md
@@ -57,6 +57,7 @@ venv/bin/python parquet_integration/write_parquet.py
 * Generalized parsing of CSV based on logical data types
 * conditional compilation based on cargo `features` to reduce dependencies and size
 * faster IPC reader (different design that avoids an extra copy of all data)
+* Support for IPC 2.0 (compression)
 
 ## Features in the original not available in this crate
 
1 change: 1 addition & 0 deletions arrow-flight/src/utils.rs
@@ -169,6 +169,7 @@ pub fn flight_data_to_arrow_batch(
                 &dictionaries_by_field,
                 &mut reader,
                 0,
+                message.compression(),
             )
         })?
 }
@@ -296,6 +296,7 @@ async fn record_batch_from_message(
             &dictionaries_by_field,
             &mut reader,
             0,
+            message.compression(),
         );
 
         arrow_batch_result
13 changes: 13 additions & 0 deletions src/io/ipc/compression.rs
@@ -0,0 +1,13 @@
+use std::io::Read;
+
+use crate::error::Result;
+
+pub fn decompress_lz4(input_buf: &[u8], output_buf: &mut [u8]) -> Result<()> {
+    let mut decoder = lz4::Decoder::new(input_buf)?;
+    decoder.read_exact(output_buf).map_err(|e| e.into())
+}
+
+pub fn decompress_zstd(input_buf: &[u8], output_buf: &mut [u8]) -> Result<()> {
+    let mut decoder = zstd::Decoder::new(input_buf)?;
+    decoder.read_exact(output_buf).map_err(|e| e.into())
+}
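
Both helpers decompress into a pre-sized output buffer. That works because the Arrow IPC format prefixes every compressed buffer with its uncompressed length as a little-endian int64, where -1 signals the body was left uncompressed. A minimal sketch of a hypothetical caller that handles the prefix; decompress_buffer is illustrative and not part of this commit:

use std::convert::TryInto;

use crate::error::Result;

// Split the int64 length prefix off an IPC buffer, then decompress the
// remainder with the LZ4 frame codec above (the ZSTD path is analogous).
pub fn decompress_buffer(data: &[u8]) -> Result<Vec<u8>> {
    let length = i64::from_le_bytes(data[..8].try_into().unwrap());
    if length == -1 {
        // A length of -1 means this buffer was written uncompressed.
        return Ok(data[8..].to_vec());
    }
    let mut output = vec![0u8; length as usize];
    decompress_lz4(&data[8..], &mut output)?;
    Ok(output)
}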
1 change: 1 addition & 0 deletions src/io/ipc/mod.rs
@@ -26,6 +26,7 @@
 pub mod gen;
 
 pub(crate) mod common;
+mod compression;
 mod convert;
 
 pub use convert::fb_to_schema;
9 changes: 2 additions & 7 deletions src/io/ipc/read/common.rs
@@ -40,12 +40,6 @@ pub fn read_record_batch<R: Read + Seek>(
     block_offset: u64,
     compression: Option<BodyCompression>,
 ) -> Result<RecordBatch> {
-    if compression.is_some() {
-        return Err(ArrowError::NotYetImplemented(
-            "IPC format with compression".to_string(),
-        ));
-    }
-
     let buffers = batch
         .buffers()
         .ok_or_else(|| ArrowError::Ipc("Unable to get buffers from IPC RecordBatch".to_string()))?;
@@ -73,9 +67,10 @@ pub fn read_record_batch<R: Read + Seek>(
                 reader,
                 block_offset,
                 is_little_endian,
+                compression,
             )
         })
-        .collect::<std::io::Result<Vec<_>>>()?;
+        .collect::<Result<Vec<_>>>()?;
 
     RecordBatch::try_new(schema.clone(), arrays)
 }
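
With the NotYetImplemented guard removed, compressed record batches decode transparently: each buffer is inflated by the new compression module as it is read. An end-to-end sketch of reading a compressed file, assuming this crate's IPC read API (read_file_metadata / FileReader) and a hypothetical file name; exact signatures may differ between versions:

use std::fs::File;

use arrow2::error::Result;
use arrow2::io::ipc::read::{read_file_metadata, FileReader};

fn main() -> Result<()> {
    // "compressed.arrow" is a placeholder: any IPC 2.0 file written with
    // LZ4-frame or ZSTD body compression.
    let mut file = File::open("compressed.arrow")?;
    let metadata = read_file_metadata(&mut file)?;
    let reader = FileReader::new(&mut file, metadata, None);
    for maybe_batch in reader {
        let batch = maybe_batch?;
        println!("read a batch with {} rows", batch.num_rows());
    }
    Ok(())
}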
