Skip to content

Commit

Permalink
Use ChunkedArrayReader in random access benchmark (#393)
Browse files Browse the repository at this point in the history
  • Loading branch information
robert3005 authored Jun 18, 2024
1 parent 7332c25 commit 81cb466
Show file tree
Hide file tree
Showing 14 changed files with 131 additions and 118 deletions.
83 changes: 4 additions & 79 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ croaring = "1.0.1"
csv = "1.3.0"
datafusion-common = "39.0.0"
datafusion-expr = "39.0.0"
derive_builder = "0.20.0"
divan = "0.1.14"
duckdb = { version = "0.10.1", features = ["bundled"] }
enum-iterator = "2.0.0"
Expand Down
5 changes: 4 additions & 1 deletion bench-vortex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@ vortex-fastlanes = { path = "../encodings/fastlanes" }
vortex-ipc = { path = "../vortex-ipc" }
vortex-ree = { path = "../encodings/runend" }
vortex-roaring = { path = "../encodings/roaring" }
serde = { workspace = true }
bytes = { workspace = true }
flexbuffers = { workspace = true }

[dev-dependencies]
criterion = { workspace = true }
criterion = { workspace = true, features = ["html_reports", "async_tokio"] }

[[bench]]
name = "compress_benchmark"
Expand Down
4 changes: 3 additions & 1 deletion bench-vortex/benches/random_access.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use bench_vortex::reader::{take_parquet, take_vortex};
use bench_vortex::taxi_data::{taxi_data_parquet, taxi_data_vortex};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use tokio::runtime::Runtime;

fn random_access(c: &mut Criterion) {
let mut group = c.benchmark_group("random access");
Expand All @@ -10,7 +11,8 @@ fn random_access(c: &mut Criterion) {

let taxi_vortex = taxi_data_vortex();
group.bench_function("vortex", |b| {
b.iter(|| black_box(take_vortex(&taxi_vortex, &indices).unwrap()))
b.to_async(Runtime::new().unwrap())
.iter(|| async { black_box(take_vortex(&taxi_vortex, &indices).await.unwrap()) })
});

let taxi_parquet = taxi_data_parquet();
Expand Down
7 changes: 5 additions & 2 deletions bench-vortex/src/bin/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ use bench_vortex::setup_logger;
use bench_vortex::taxi_data::taxi_data_vortex;
use log::LevelFilter;

pub fn main() {
#[tokio::main]
pub async fn main() {
setup_logger(LevelFilter::Error);
let taxi_vortex = taxi_data_vortex();
let rows = take_vortex(&taxi_vortex, &[10, 11, 12, 13, 100_000, 3_000_000]).unwrap();
let rows = take_vortex(&taxi_vortex, &[10, 11, 12, 13, 100_000, 3_000_000])
.await
.unwrap();
println!("TAKE TAXI DATA: {:?}", rows);
}
67 changes: 60 additions & 7 deletions bench-vortex/src/reader.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::fs::File;
use std::io::SeekFrom;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::sync::Arc;
Expand All @@ -11,18 +12,23 @@ use arrow_array::{
};
use arrow_select::concat::concat_batches;
use arrow_select::take::take_record_batch;
use bytes::{Bytes, BytesMut};
use itertools::Itertools;
use log::info;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncReadExt, AsyncSeekExt};
use tokio::runtime::Runtime;
use vortex::array::chunked::ChunkedArray;
use vortex::array::primitive::PrimitiveArray;
use vortex::arrow::FromArrowType;
use vortex::compress::Compressor;
use vortex::compute::take::take;
use vortex::stream::ArrayStreamExt;
use vortex::{Array, IntoArray, ToArrayData, ViewContext};
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::VortexResult;
use vortex_error::{vortex_err, VortexResult};
use vortex_ipc::chunked_reader::ChunkedArrayReader;
use vortex_ipc::io::{TokioAdapter, VortexWrite};
use vortex_ipc::writer::ArrayWriter;
use vortex_ipc::MessageReader;
Expand All @@ -31,6 +37,12 @@ use crate::CTX;

pub const BATCH_SIZE: usize = 65_536;

/// Trailing metadata appended after the IPC array stream in a Vortex file.
///
/// Written by `rewrite_parquet_as_vortex`: the footer is flexbuffer-serialized
/// and appended to the file, followed by its own byte length as a
/// little-endian u64. `take_vortex` reads that length from the last 8 bytes to
/// locate and deserialize the footer.
///
/// NOTE(review): field order and names are part of the serialized layout —
/// do not reorder or rename without rewriting existing files.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VortexFooter {
    // Byte offset of each chunk within the file (from the writer's
    // `array_layouts()[0].chunks.byte_offsets`).
    pub byte_offsets: Vec<u64>,
    // Row offset of each chunk (from the writer's
    // `array_layouts()[0].chunks.row_offsets`) — presumably cumulative row
    // indices; confirm against `ChunkedArrayReader`'s contract.
    pub row_offsets: Vec<u64>,
}

pub fn open_vortex(path: &Path) -> VortexResult<Array> {
Runtime::new()
.unwrap()
Expand Down Expand Up @@ -63,12 +75,25 @@ pub async fn rewrite_parquet_as_vortex<W: VortexWrite>(
) -> VortexResult<()> {
let chunked = compress_parquet_to_vortex(parquet_path.as_path())?;

ArrayWriter::new(write, ViewContext::from(&CTX.clone()))
let written = ArrayWriter::new(write, ViewContext::from(&CTX.clone()))
.write_context()
.await?
.write_array(chunked.into_array())
.write_array_stream(chunked.array_stream())
.await?;

let layout = written.array_layouts()[0].clone();
let mut w = written.into_inner();
let mut s = flexbuffers::FlexbufferSerializer::new();
VortexFooter {
byte_offsets: layout.chunks.byte_offsets,
row_offsets: layout.chunks.row_offsets,
}
.serialize(&mut s)?;
let footer_bytes = Buffer::Bytes(Bytes::from(s.take_buffer()));
let footer_len = footer_bytes.len() as u64;
w.write_all(footer_bytes).await?;
w.write_all(footer_len.to_le_bytes()).await?;

Ok(())
}

Expand Down Expand Up @@ -109,9 +134,37 @@ pub fn write_csv_as_parquet(csv_path: PathBuf, output_path: &Path) -> VortexResu
Ok(())
}

pub fn take_vortex(path: &Path, indices: &[u64]) -> VortexResult<Array> {
let array = open_vortex(path)?;
let taken = take(&array, &indices.to_vec().into_array())?;
/// Random-access `take` of `indices` rows from a Vortex file on disk.
///
/// Expects the file layout produced by `rewrite_parquet_as_vortex`:
/// `[IPC array stream][flexbuffer VortexFooter][footer length: u64 LE]`.
///
/// The footer supplies per-chunk byte/row offsets so the `ChunkedArrayReader`
/// can serve the requested rows without materializing the whole array.
pub async fn take_vortex(path: &Path, indices: &[u64]) -> VortexResult<Array> {
let mut file = tokio::fs::File::open(path).await?;

// The last 8 bytes of the file hold the footer length (little-endian u64).
file.seek(SeekFrom::End(-8)).await?;
let footer_len = file.read_u64_le().await? as usize;

// Seek back over the footer itself and read it in full.
file.seek(SeekFrom::End(-(footer_len as i64 + 8))).await?;
let mut footer_bytes = BytesMut::with_capacity(footer_len);
// SAFETY-adjacent NOTE(review): `set_len` exposes uninitialized memory to
// `read_exact`. On success `read_exact` overwrites every byte, but a zeroed
// buffer (e.g. `BytesMut::zeroed(footer_len)`) would avoid the unsafe block
// entirely — worth cleaning up.
unsafe { footer_bytes.set_len(footer_len) }
file.read_exact(footer_bytes.as_mut()).await?;

let footer: VortexFooter = VortexFooter::deserialize(
flexbuffers::Reader::get_root(footer_bytes.as_ref()).map_err(|e| vortex_err!("{}", e))?,
)?;

// First pass from the start of the stream: consume the view-context and
// dtype messages via a MessageReader on a cloned file handle.
file.seek(SeekFrom::Start(0)).await?;
let mut reader = MessageReader::try_new(TokioAdapter(file.try_clone().await?)).await?;
let view_ctx = reader.read_view_context(&CTX).await?;
let dtype = reader.read_dtype().await?;

// Second pass: rewind and hand the original handle to the chunked reader,
// along with the chunk byte/row offsets recovered from the footer.
file.seek(SeekFrom::Start(0)).await?;
let mut reader = ChunkedArrayReader::try_new(
TokioAdapter(file),
view_ctx,
dtype,
PrimitiveArray::from(footer.byte_offsets).into_array(),
PrimitiveArray::from(footer.row_offsets).into_array(),
)?;

let indices_array = indices.to_vec().into_array();
let taken = reader.take_rows(&indices_array).await?;
// For equivalence.... we flatten to make sure we're not cheating too much.
Ok(taken.flatten()?.into_array())
}
Expand Down
1 change: 1 addition & 0 deletions vortex-array/src/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ pub trait ArrayEncodingExt {
ArrayFlatten::flatten(typed)
}

#[inline]
fn with_dyn<R, F>(array: &Array, mut f: F) -> R
where
F: for<'b> FnMut(&'b (dyn ArrayTrait + 'b)) -> R,
Expand Down
8 changes: 8 additions & 0 deletions vortex-array/src/implementation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,15 @@ macro_rules! impl_encoding {
impl TryFrom<Array> for [<$Name Array>] {
type Error = VortexError;

#[inline]
fn try_from(array: Array) -> Result<Self, Self::Error> {
TypedArray::<$Name>::try_from(array).map(Self::from)
}
}
impl TryFrom<&Array> for [<$Name Array>] {
type Error = VortexError;

#[inline]
fn try_from(array: &Array) -> Result<Self, Self::Error> {
TypedArray::<$Name>::try_from(array).map(Self::from)
}
Expand All @@ -133,14 +135,17 @@ macro_rules! impl_encoding {
#[derive(Debug)]
pub struct [<$Name Encoding>];
impl ArrayEncoding for [<$Name Encoding>] {
#[inline]
fn as_any(&self) -> &dyn Any {
self
}

#[inline]
fn id(&self) -> EncodingId {
$Name::ID
}

#[inline]
fn flatten(&self, array: Array) -> VortexResult<Flattened> {
<Self as ArrayEncodingExt>::flatten(array)
}
Expand All @@ -154,6 +159,7 @@ macro_rules! impl_encoding {
<Self as ArrayEncodingExt>::with_dyn(array, f)
}

#[inline]
fn compression(&self) -> &dyn EncodingCompression {
self
}
Expand All @@ -164,10 +170,12 @@ macro_rules! impl_encoding {

/// Implement ArrayMetadata
impl ArrayMetadata for [<$Name Metadata>] {
#[inline]
fn as_any(&self) -> &dyn Any {
self
}

#[inline]
fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
self
}
Expand Down
1 change: 1 addition & 0 deletions vortex-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ impl ArrayVisitor for NBytesVisitor {
}

impl Array {
#[inline]
pub fn with_dyn<R, F>(&self, mut f: F) -> R
where
F: FnMut(&dyn ArrayTrait) -> R,
Expand Down
1 change: 0 additions & 1 deletion vortex-array/src/stream/take_rows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ pub struct TakeRows<'idx, R: ArrayStream> {
}

impl<'idx, R: ArrayStream> TakeRows<'idx, R> {
#[allow(dead_code)]
pub fn try_new(reader: R, indices: &'idx Array) -> VortexResult<Self> {
if !indices.is_empty() {
if !indices.statistics().compute_is_sorted().unwrap_or(false) {
Expand Down
1 change: 0 additions & 1 deletion vortex-ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ rust-version = { workspace = true }

[dependencies]
bytes = { workspace = true }
derive_builder = { workspace = true }
flatbuffers = { workspace = true }
futures-util = { workspace = true }
itertools = { workspace = true }
Expand Down
Loading

0 comments on commit 81cb466

Please sign in to comment.