Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added CI for IO IPC
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jan 13, 2022
1 parent 0bbf757 commit 30aca17
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 13 deletions.
21 changes: 21 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,27 @@ jobs:
# --skip io: miri can't handle opening of files, so we skip those
run: cargo miri test --features full -- --skip io::parquet --skip io::ipc

miri-checks-io:
name: MIRI on IO IPC
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
submodules: true # needed to test IPC, which are located in a submodule
- uses: actions-rs/toolchain@v1
with:
toolchain: nightly-2021-12-10
override: true
- uses: Swatinem/rust-cache@v1
with:
key: key1
- name: Install Miri
run: |
rustup component add miri
cargo miri setup
- name: Run
run: MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test --tests --features io_ipc,io_json_integration io::ipc::write::write_sliced_list

coverage:
name: Coverage
runs-on: ubuntu-latest
Expand Down
2 changes: 2 additions & 0 deletions src/io/ipc/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ mod tests {

#[cfg(feature = "io_ipc_compression")]
#[test]
#[cfg_attr(miri, ignore)] // ZSTD uses foreign calls that miri does not support
fn round_trip_zstd() {
let data: Vec<u8> = (0..200u8).map(|x| x % 10).collect();
let mut buffer = vec![];
Expand All @@ -79,6 +80,7 @@ mod tests {

#[cfg(feature = "io_ipc_compression")]
#[test]
#[cfg_attr(miri, ignore)] // LZ4 uses foreign calls that miri does not support
fn round_trip_lz4() {
let data: Vec<u8> = (0..200u8).map(|x| x % 10).collect();
let mut buffer = vec![];
Expand Down
7 changes: 4 additions & 3 deletions src/io/ipc/read/reader.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::convert::TryInto;
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;

Expand All @@ -11,7 +12,7 @@ use super::super::{ARROW_MAGIC, CONTINUATION_MARKER};
use super::common::*;
use super::schema::fb_to_schema;
use super::Dictionaries;
use arrow_format::ipc::planus::{ReadAsRoot, ToOwned, Vector};
use arrow_format::ipc::planus::{ReadAsRoot, Vector};

#[derive(Debug, Clone)]
pub struct FileMetadata {
Expand Down Expand Up @@ -64,7 +65,7 @@ fn read_dictionaries<R: Read + Seek>(
reader: &mut R,
fields: &[Field],
ipc_schema: &IpcSchema,
blocks: Vector<arrow_format::ipc::Block>,
blocks: Vector<arrow_format::ipc::BlockRef>,
) -> Result<Dictionaries> {
let mut dictionaries = Default::default();
let mut data = vec![];
Expand Down Expand Up @@ -158,7 +159,7 @@ pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata
ipc_schema,
blocks: blocks
.iter()
.map(|block| Ok(ToOwned::to_owned(block)?))
.map(|block| Ok(block.try_into()?))
.collect::<Result<Vec<_>>>()?,
dictionaries,
})
Expand Down
2 changes: 2 additions & 0 deletions tests/it/io/ipc/read/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,13 @@ fn read_generated_017_union() -> Result<()> {
}

#[test]
#[cfg_attr(miri, ignore)] // LZ4 uses foreign calls that miri does not support
fn read_generated_200_compression_lz4() -> Result<()> {
test_file("2.0.0-compression", "generated_lz4")
}

#[test]
#[cfg_attr(miri, ignore)] // ZSTD uses foreign calls that miri does not support
fn read_generated_200_compression_zstd() -> Result<()> {
test_file("2.0.0-compression", "generated_zstd")
}
Expand Down
15 changes: 5 additions & 10 deletions tests/it/io/ipc/write/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,11 @@ fn round_trip(
columns: Chunk<Arc<dyn Array>>,
schema: Schema,
ipc_fields: Option<Vec<IpcField>>,
compression: Option<Compression>,
) -> Result<()> {
let (expected_schema, expected_batches) = (schema.clone(), vec![columns]);

let result = write_(
&expected_batches,
&schema,
ipc_fields,
Some(Compression::ZSTD),
)?;
let result = write_(&expected_batches, &schema, ipc_fields, compression)?;
let mut reader = Cursor::new(result);
let metadata = read_file_metadata(&mut reader)?;
let schema = metadata.schema.clone();
Expand Down Expand Up @@ -340,7 +336,7 @@ fn write_boolean() -> Result<()> {
])) as Arc<dyn Array>;
let schema = Schema::from(vec![Field::new("a", array.data_type().clone(), true)]);
let columns = Chunk::try_new(vec![array])?;
round_trip(columns, schema, None)
round_trip(columns, schema, None, Some(Compression::ZSTD))
}

#[test]
Expand All @@ -350,11 +346,10 @@ fn write_sliced_utf8() -> Result<()> {
let array = Arc::new(Utf8Array::<i32>::from_slice(["aa", "bb"]).slice(1, 1)) as Arc<dyn Array>;
let schema = Schema::from(vec![Field::new("a", array.data_type().clone(), true)]);
let columns = Chunk::try_new(vec![array])?;
round_trip(columns, schema, None)
round_trip(columns, schema, None, Some(Compression::ZSTD))
}

#[test]
#[cfg_attr(miri, ignore)] // compression uses FFI, which miri does not support
fn write_sliced_list() -> Result<()> {
let data = vec![
Some(vec![Some(1i32), Some(2), Some(3)]),
Expand All @@ -368,5 +363,5 @@ fn write_sliced_list() -> Result<()> {

let schema = Schema::from(vec![Field::new("a", array.data_type().clone(), true)]);
let columns = Chunk::try_new(vec![array])?;
round_trip(columns, schema, None)
round_trip(columns, schema, None, None)
}

0 comments on commit 30aca17

Please sign in to comment.