Skip to content

Commit

Permalink
Async IPC (#307)
Browse files Browse the repository at this point in the history
Implemented for both pure futures (e.g. Tokio) and MonoIO, the latter for
io_uring support.
  • Loading branch information
gatesn authored May 12, 2024
1 parent fd3a72b commit 57b8581
Show file tree
Hide file tree
Showing 14 changed files with 755 additions and 4 deletions.
102 changes: 101 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ flatbuffers = "23.5.26"
flatc = "0.2.2"
flexbuffers = "2.0.0"
fs_extra = "1.3.0"
futures-util = "0.3.30"
getrandom = "0.2.14"
half = { version = "^2", features = ["std", "num-traits"] }
hashbrown = "0.14.3"
Expand All @@ -71,10 +72,12 @@ itertools = "0.12.1"
lazy_static = "1.4.0"
leb128 = "0.2.5"
log = "0.4.21"
monoio = "0.2.3"
num-traits = "0.2.18"
num_enum = "0.7.2"
parquet = "51.0.0"
paste = "1.0.14"
pin-project = "1.1.5"
prost = "0.12.4"
prost-build = "0.12.4"
prost-types = "0.12.4"
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/src/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl<'v> IntoArray<'v> for ArrayView<'v> {
}
}

/// A context object that holds a list of encodings (`EncodingRef`s).
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ViewContext {
// NOTE(review): presumably the encodings that views built with this context may use — confirm.
encodings: Vec<EncodingRef>,
}
Expand Down
6 changes: 6 additions & 0 deletions vortex-buffer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ impl From<Vec<u8>> for Buffer {
}
}

/// Converts a `bytes::Bytes` into a `Buffer` by wrapping it in the
/// `Buffer::Bytes` variant.
impl From<bytes::Bytes> for Buffer {
    fn from(inner: bytes::Bytes) -> Self {
        Self::Bytes(inner)
    }
}

impl From<ArrowBuffer> for Buffer {
fn from(value: ArrowBuffer) -> Self {
Buffer::Arrow(value)
Expand Down
12 changes: 12 additions & 0 deletions vortex-ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@ rust-version = { workspace = true }

[dependencies]
arrow-buffer = { workspace = true }
bytes = { workspace = true }
fallible-iterator = { workspace = true }
flatbuffers = { workspace = true }
futures-util = { workspace = true, features = ["io"] }
itertools = { workspace = true }
log = { workspace = true }
monoio = { workspace = true, optional = true, features = ["bytes"] }
nougat = "0.2.4"
pin-project = { workspace = true }
tokio = { workspace = true, optional = true }
vortex-array = { path = "../vortex-array" }
vortex-buffer = { path = "../vortex-buffer" }
vortex-error = { path = "../vortex-error" }
Expand All @@ -33,6 +38,7 @@ walkdir = { workspace = true }
criterion = { workspace = true }
rand = { workspace = true }
simplelog = { workspace = true }
tokio = { workspace = true, features = ["full"] }
vortex-alp = { path = "../vortex-alp" }
vortex-fastlanes = { path = "../vortex-fastlanes" }
arrow = { workspace = true }
Expand All @@ -44,6 +50,12 @@ arrow-select = { workspace = true }
[lints]
workspace = true

[features]
default = ["futures", "monoio", "tokio"]
futures = []
monoio = ["dep:monoio"]
tokio = ["dep:tokio"]

[[bench]]
name = "ipc_take"
harness = false
Expand Down
125 changes: 125 additions & 0 deletions vortex-ipc/src/codecs/array_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
use std::pin::Pin;
use std::task::Poll;

use futures_util::Stream;
use pin_project::pin_project;
use vortex::{Array, ArrayView, IntoArray, OwnedArray, ToArray, ToStatic, ViewContext};
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::{VortexError, VortexResult};

use crate::codecs::message_reader::MessageReader;

/// A stream of array chunks along with a DType.
///
/// Can be thought of as equivalent to Arrow's RecordBatchReader.
///
/// Every item yielded by the stream is a `VortexResult<OwnedArray>`.
pub trait ArrayReader: Stream<Item = VortexResult<OwnedArray>> {
/// Returns a reference to the `DType` carried by this reader.
fn dtype(&self) -> &DType;
}

/// An adapter for a stream of array chunks to implement an ArrayReader.
///
/// Pairs a `DType` with an inner stream `S`. The struct is annotated with
/// `#[pin_project]` and `inner` is marked `#[pin]`, so `project()` can be used
/// to reach the inner stream through a pinned `self`.
#[pin_project]
struct ArrayReaderAdapter<S> {
// The dtype returned by `ArrayReader::dtype`.
dtype: DType,
// The wrapped stream that `poll_next` and `size_hint` delegate to.
#[pin]
inner: S,
}

/// `ArrayReader` implementation that reports the dtype stored alongside the
/// wrapped stream.
impl<S: Stream<Item = VortexResult<OwnedArray>>> ArrayReader for ArrayReaderAdapter<S> {
    /// Returns the dtype held by the adapter.
    fn dtype(&self) -> &DType {
        let Self { dtype, .. } = self;
        dtype
    }
}

impl<S> Stream for ArrayReaderAdapter<S>
where
S: Stream<Item = VortexResult<OwnedArray>>,
{
type Item = VortexResult<OwnedArray>;

fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
self.project().inner.poll_next(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.inner.size_hint()
}
}

/// Reads arrays out of chunk messages pulled from a borrowed `MessageReader`.
///
/// Holds the view context and dtype used to construct each `ArrayView`, plus
/// mutable state that is updated on every call to `next`.
pub(crate) struct MessageArrayReader<'a, M: MessageReader> {
// Context passed to `ArrayView::try_new` for every chunk.
ctx: ViewContext,
// DType passed to `ArrayView::try_new` for every chunk.
dtype: DType,
// Mutably borrowed message source; peeked and advanced by `next`.
messages: &'a mut M,

// State
// Buffers read for the most recent chunk; overwritten on each call to `next`.
buffers: Vec<Buffer>,
// Running sum of the lengths of the arrays returned by `next`.
row_offset: usize,
}

impl<'m, M: MessageReader> MessageArrayReader<'m, M> {
    /// Construct an ArrayReader with a message stream containing chunk messages.
    ///
    /// Starts with no buffers and a row offset of zero.
    pub fn new(ctx: ViewContext, dtype: DType, messages: &'m mut M) -> Self {
        let buffers = Vec::new();
        let row_offset = 0;
        Self {
            ctx,
            dtype,
            messages,
            buffers,
            row_offset,
        }
    }

    /// Consumes this reader and exposes it as an `ArrayReader` stream.
    ///
    /// Each poll drives `next()`: a produced array is converted with
    /// `to_static()` and yielded as `Ok`, an error is yielded as `Err` (the
    /// stream is not terminated by it), and `Ok(None)` ends the stream.
    pub fn into_reader(self) -> impl ArrayReader + 'm {
        let dtype = self.dtype.clone();

        let inner = futures_util::stream::unfold(self, |mut reader| async move {
            // `transpose` maps Ok(None) -> None (end of stream), so `?` stops the unfold there.
            let item = reader
                .next()
                .await
                .map(|chunk| chunk.map(|array| array.to_static()))
                .transpose()?;
            Some((item, reader))
        });

        ArrayReaderAdapter { dtype, inner }
    }
}

impl<M: MessageReader> MessageArrayReader<'_, M> {
    /// Reads the next chunk message and returns it as an array.
    ///
    /// Returns `Ok(None)` when there is no peekable message or the peeked
    /// message's header is not a chunk. Otherwise the chunk's buffers are read,
    /// the message is consumed, a view is built and validated, and the
    /// resulting array is returned after adding its length to `row_offset`.
    ///
    /// Panics (via `unwrap`) if the consumed message is not a chunk or the
    /// chunk carries no array.
    pub async fn next(&mut self) -> VortexResult<Option<Array>> {
        let next_is_chunk = self
            .messages
            .peek()
            .and_then(|msg| msg.header_as_chunk())
            .is_some();
        if !next_is_chunk {
            return Ok(None);
        }

        // TODO(ngates): can we reuse our existing buffers?
        self.buffers = self.messages.buffers().await?;

        // After reading the buffers we're now able to load the next message.
        let message = self.messages.next().await?;
        let chunk = message.header_as_chunk().unwrap();
        let col_array = chunk.array().unwrap();

        let view = ArrayView::try_new(&self.ctx, &self.dtype, col_array, self.buffers.as_slice())?;

        // Validate it
        view.to_array().with_dyn(|_| Ok::<(), VortexError>(()))?;

        let array = view.into_array();
        let rows = array.len();
        self.row_offset += rows;
        Ok(Some(array))
    }
}
Loading

0 comments on commit 57b8581

Please sign in to comment.