Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial version of simple FileReader/Writer #516

Merged
merged 48 commits into develop from rk/metadata
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
48 commits
Select commit — hold Shift + click to select a range
315df41
Add vortex file format
robert3005 Jul 11, 2024
e6c4818
less
robert3005 Jul 23, 2024
b47f95c
bug fix and minimal reader
AdamGS Jul 23, 2024
d449c0f
st
robert3005 Jul 23, 2024
e1a0184
dtype reader
AdamGS Jul 23, 2024
99770c3
.
AdamGS Jul 23, 2024
b1224df
some more things
AdamGS Jul 24, 2024
9f9a519
basic reader that works
AdamGS Jul 24, 2024
655aea6
should be able to read multiple batches
AdamGS Jul 24, 2024
35f7def
chunked arrays
AdamGS Jul 24, 2024
09e7862
Slightly nicer code
AdamGS Jul 24, 2024
fe06109
cosmetic changes
AdamGS Jul 24, 2024
93c02aa
unaligned
robert3005 Jul 23, 2024
2b1f4dd
something
robert3005 Jul 24, 2024
516a30c
less
robert3005 Jul 24, 2024
cd95cab
moves
robert3005 Jul 24, 2024
6a0ba24
less
robert3005 Jul 24, 2024
b48e860
refactor
robert3005 Jul 24, 2024
10f61f5
minor changes
AdamGS Jul 24, 2024
8f45a6b
more
robert3005 Jul 24, 2024
17538df
fixes
robert3005 Jul 24, 2024
d9a560f
less
robert3005 Jul 24, 2024
ad71f73
less
robert3005 Jul 24, 2024
e834679
less
robert3005 Jul 24, 2024
5a30f7a
unwind
robert3005 Jul 24, 2024
4d0ac6b
fix
robert3005 Jul 24, 2024
857c242
fixed
robert3005 Jul 24, 2024
b2ba79e
cr note
AdamGS Jul 25, 2024
bac53d5
fix lint
AdamGS Jul 25, 2024
7fb2a31
minor change
AdamGS Jul 25, 2024
5ec5a44
.
AdamGS Jul 25, 2024
868d069
Merge remote-tracking branch 'origin/develop' into rk/metadata
lwwmanning Jul 25, 2024
9a04327
asserts
robert3005 Jul 29, 2024
8175c2c
move len around
AdamGS Jul 29, 2024
8349398
More things and some refactoring
AdamGS Jul 29, 2024
370a9d0
projection work and basic test
AdamGS Jul 29, 2024
4accbe3
typo
robert3005 Jul 29, 2024
0f4a5bc
mask
robert3005 Jul 29, 2024
3baa4e7
ignore
robert3005 Jul 29, 2024
ced449a
metadatas are first in chunked layouts
robert3005 Jul 29, 2024
476fc48
.
AdamGS Jul 29, 2024
d6daeb8
ignore
robert3005 Jul 29, 2024
c36f6c2
read
robert3005 Jul 29, 2024
7436ab3
Fix metadata layout bug
AdamGS Jul 29, 2024
8b169f6
some CR notes
AdamGS Jul 29, 2024
0c903d4
less
robert3005 Jul 29, 2024
f3db79a
magic
robert3005 Jul 29, 2024
1f34b42
fmt
robert3005 Jul 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 35 additions & 34 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ members = [
"vortex-error",
"vortex-expr",
"vortex-flatbuffers",
"vortex-ipc",
"vortex-serde",
"vortex-sampling-compressor",
]
resolver = "2"
Expand All @@ -32,6 +32,7 @@ rust-version = "1.76"
[workspace.dependencies]
ahash = "0.8.11"
allocator-api2 = "0.2.16"
anyhow = "1.0"
arrayref = "0.3.7"
arrow = { version = "52.0.0", features = ["pyarrow"] }
arrow-arith = "52.0.0"
Expand All @@ -53,7 +54,6 @@ chrono = "0.4.38"
criterion = { version = "0.5.1", features = ["html_reports"] }
croaring = "2.0.0"
csv = "1.3.0"
object_store = "0.10.1"
datafusion = "40.0.0"
datafusion-common = "40.0.0"
datafusion-execution = "40.0.0"
Expand Down Expand Up @@ -81,9 +81,11 @@ itertools = "0.13.0"
lazy_static = "1.4.0"
leb128 = "0.2.5"
log = "0.4.21"
mimalloc = "0.1.42"
monoio = "0.2.3"
num-traits = "0.2.18"
num_enum = "0.7.2"
object_store = "0.10.1"
parquet = "52.0.0"
paste = "1.0.14"
pin-project = "1.1.5"
Expand All @@ -108,8 +110,8 @@ uninit = "0.6.2"
uuid = "1.8.0"
walkdir = "2.5.0"
worker = "0.3.0"
xshell = "0.2.6"
zigzag = "0.1.0"
mimalloc = "0.1.42"

[workspace.lints.rust]
warnings = "deny"
Expand Down
6 changes: 3 additions & 3 deletions bench-vortex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ rust-version = { workspace = true }
workspace = true

[dependencies]
anyhow = "1.0"
anyhow = { workspace = true }
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
Expand Down Expand Up @@ -51,11 +51,11 @@ vortex-dict = { path = "../encodings/dict" }
vortex-dtype = { path = "../vortex-dtype" }
vortex-error = { path = "../vortex-error", features = ["parquet"] }
vortex-fastlanes = { path = "../encodings/fastlanes" }
vortex-ipc = { path = "../vortex-ipc", features = ["object_store"] }
vortex-roaring = { path = "../encodings/roaring" }
vortex-runend = { path = "../encodings/runend" }
vortex-sampling-compressor = { path = "../vortex-sampling-compressor" }
xshell = "0.2.6"
vortex-serde = { path = "../vortex-serde", features = ["object_store"] }
xshell = { workspace = true }

[dev-dependencies]
criterion = { workspace = true, features = ["html_reports", "async_tokio"] }
Expand Down
4 changes: 2 additions & 2 deletions bench-vortex/src/data_downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use vortex::arrow::FromArrowType;
use vortex::{IntoArray, ToArrayData};
use vortex_dtype::DType;
use vortex_error::{VortexError, VortexResult};
use vortex_ipc::io::TokioAdapter;
use vortex_ipc::writer::ArrayWriter;
use vortex_serde::io::TokioAdapter;
use vortex_serde::writer::ArrayWriter;

use crate::idempotent;
use crate::reader::BATCH_SIZE;
Expand Down
37 changes: 26 additions & 11 deletions bench-vortex/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,30 @@ use arrow_array::{
use arrow_select::concat::concat_batches;
use arrow_select::take::take_record_batch;
use bytes::{Bytes, BytesMut};
use futures::stream;
use futures::StreamExt;
use itertools::Itertools;
use log::info;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use serde::{Deserialize, Serialize};
use stream::StreamExt;
use vortex::array::chunked::ChunkedArray;
use vortex::array::primitive::PrimitiveArray;
use vortex::arrow::FromArrowType;
use vortex::compress::CompressionStrategy;
use vortex::compute::take;
use vortex::stream::ArrayStreamExt;
use vortex::{Array, IntoArray, IntoCanonical, ToArrayData};
use vortex::{Array, ArrayDType, IntoArray, IntoCanonical, ToArrayData};
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::{vortex_err, VortexResult};
use vortex_ipc::chunked_reader::ChunkedArrayReader;
use vortex_ipc::io::{ObjectStoreExt, TokioAdapter, VortexReadAt, VortexWrite};
use vortex_ipc::writer::ArrayWriter;
use vortex_ipc::MessageReader;
use vortex_sampling_compressor::SamplingCompressor;
use vortex_serde::chunked_reader::ChunkedArrayReader;
use vortex_serde::file::file_reader::FileReaderBuilder;
use vortex_serde::io::{ObjectStoreExt, TokioAdapter, VortexReadAt, VortexWrite};
use vortex_serde::writer::ArrayWriter;
use vortex_serde::MessageReader;

use crate::{COMPRESSORS, CTX};

Expand Down Expand Up @@ -153,7 +154,7 @@ pub async fn read_vortex_footer_format<R: VortexReadAt>(
ChunkedArrayReader::try_new(
reader,
CTX.clone(),
dtype,
dtype.into(),
PrimitiveArray::from(footer.byte_offsets).into_array(),
PrimitiveArray::from(footer.row_offsets).into_array(),
)
Expand All @@ -177,10 +178,24 @@ pub async fn take_vortex_object_store<O: ObjectStore>(
pub async fn take_vortex_tokio(path: &Path, indices: &[u64]) -> VortexResult<Array> {
let len = File::open(path)?.metadata()?.len();
let indices_array = indices.to_vec().into_array();
let taken = read_vortex_footer_format(TokioAdapter(tokio::fs::File::open(path).await?), len)

let file = TokioAdapter(tokio::fs::File::open(path).await?);
let reader = FileReaderBuilder::new(file).with_length(len).build();

let data = reader
.into_stream()
.await?
.take_rows(&indices_array)
.await?;
.collect::<Vec<_>>()
.await
.into_iter()
.collect::<VortexResult<Vec<_>>>()?;

let dtype = data[0].dtype().clone();

let data = ChunkedArray::try_new(data, dtype)?.into_array();

let taken = take(&data, &indices_array)?;
robert3005 marked this conversation as resolved.

// For equivalence.... we flatten to make sure we're not cheating too much.
Ok(taken.into_canonical()?.into_array())
}
Expand Down
2 changes: 1 addition & 1 deletion bench-vortex/src/taxi_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::path::PathBuf;
use futures::executor::block_on;
use vortex_buffer::io_buf::IoBuf;
use vortex_error::VortexError;
use vortex_ipc::io::VortexWrite;
use vortex_serde::io::VortexWrite;

use crate::data_downloads::{data_vortex_uncompressed, download_data};
use crate::reader::rewrite_parquet_as_vortex;
Expand Down
9 changes: 1 addition & 8 deletions vortex-array/src/array/struct_/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,11 @@ impl StructArray {
self.len(),
))
}
}

impl<'a> StructArray {
pub fn children(&'a self) -> impl Iterator<Item = Array> + '_ {
pub fn children(&self) -> impl Iterator<Item = Array> + '_ {
(0..self.nfields()).map(move |idx| self.field(idx).unwrap())
}
}

impl StructArray {
pub fn try_new(
names: FieldNames,
fields: Vec<Array>,
Expand Down Expand Up @@ -85,11 +81,8 @@ impl StructArray {
Self::try_new(FieldNames::from(names), fields, len, Validity::NonNullable)
.expect("building StructArray with helper")
}
}

impl StructArray {
// TODO(aduffy): Add equivalent function to support field masks for nested column access.

/// Return a new StructArray with the given projection applied.
///
/// Projection does not copy data arrays. Projection is defined by an ordinal array slice
Expand Down
8 changes: 1 addition & 7 deletions vortex-array/src/array/varbin/compute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,7 @@ impl ArrayCompute for VarBinArray {
impl ScalarAtFn for VarBinArray {
fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
if self.is_valid(index) {
Ok(varbin_scalar(
self.bytes_at(index)?
// TODO(ngates): update to use buffer when we refactor scalars.
.into_vec()
.unwrap_or_else(|b| b.as_ref().to_vec()),
self.dtype(),
))
Ok(varbin_scalar(self.bytes_at(index)?, self.dtype()))
} else {
Ok(Scalar::null(self.dtype().clone()))
}
Expand Down
7 changes: 3 additions & 4 deletions vortex-array/src/array/varbin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,11 @@ impl<'a> FromIterator<Option<&'a str>> for VarBinArray {
}
}

pub fn varbin_scalar(value: Vec<u8>, dtype: &DType) -> Scalar {
pub fn varbin_scalar(value: Buffer, dtype: &DType) -> Scalar {
if matches!(dtype, DType::Utf8(_)) {
let str = unsafe { String::from_utf8_unchecked(value) };
Scalar::utf8(str, dtype.nullability())
Scalar::try_utf8(value, dtype.nullability()).unwrap()
} else {
Scalar::binary(value.into(), dtype.nullability())
Scalar::binary(value, dtype.nullability())
}
}

Expand Down
5 changes: 3 additions & 2 deletions vortex-array/src/array/varbin/stats.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::cmp::Ordering;
use std::collections::HashMap;

use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::VortexResult;

Expand Down Expand Up @@ -101,8 +102,8 @@ impl<'a> VarBinAccumulator<'a> {

pub fn finish(&self, dtype: &DType) -> StatsSet {
StatsSet::from(HashMap::from([
(Stat::Min, varbin_scalar(self.min.to_vec(), dtype)),
(Stat::Max, varbin_scalar(self.max.to_vec(), dtype)),
(Stat::Min, varbin_scalar(Buffer::from(self.min), dtype)),
(Stat::Max, varbin_scalar(Buffer::from(self.max), dtype)),
(Stat::RunCount, self.runs.into()),
(Stat::IsSorted, self.is_sorted.into()),
(Stat::IsStrictSorted, self.is_strict_sorted.into()),
Expand Down
3 changes: 2 additions & 1 deletion vortex-array/src/array/varbinview/compute.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use vortex_buffer::Buffer;
use vortex_error::VortexResult;
use vortex_scalar::Scalar;

Expand All @@ -22,7 +23,7 @@ impl ScalarAtFn for VarBinViewArray {
fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
if self.is_valid(index) {
self.bytes_at(index)
.map(|bytes| varbin_scalar(bytes, self.dtype()))
.map(|bytes| varbin_scalar(Buffer::from(bytes), self.dtype()))
} else {
Ok(Scalar::null(self.dtype().clone()))
}
Expand Down
7 changes: 7 additions & 0 deletions vortex-dtype/src/dtype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ impl DType {
pub fn is_boolean(&self) -> bool {
matches!(self, Bool(_))
}

/// Returns a reference to the inner `StructDType` if this `DType` is a
/// `Struct` variant; returns `None` for every other variant.
/// (Named `as_*` per Rust convention: a free, borrowed → borrowed view.)
pub fn as_struct(&self) -> Option<&StructDType> {
match self {
Struct(s, _) => Some(s),
_ => None,
}
}
}

impl Display for DType {
Expand Down
Loading
Loading