Skip to content

Commit

Permalink
Fix vortex compressed benchmarks (#577)
Browse files Browse the repository at this point in the history
  • Loading branch information
robert3005 authored Aug 9, 2024
1 parent a602498 commit a1c1da7
Show file tree
Hide file tree
Showing 22 changed files with 168 additions and 25 deletions.
25 changes: 19 additions & 6 deletions bench-vortex/src/tpch/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::fs;
use std::fs::create_dir_all;
use std::path::Path;
use std::sync::Arc;

Expand All @@ -12,8 +13,9 @@ use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext};
use tokio::fs::OpenOptions;
use vortex::array::{ChunkedArray, StructArray};
use vortex::arrow::FromArrowArray;
use vortex::compress::CompressionStrategy;
use vortex::variants::StructArrayTrait;
use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant};
use vortex::{Array, ArrayDType, Context, IntoArray, IntoArrayVariant};
use vortex_datafusion::memory::VortexMemTableOptions;
use vortex_datafusion::persistent::config::{VortexFile, VortexTableOptions};
use vortex_datafusion::SessionContextExt;
Expand Down Expand Up @@ -194,12 +196,16 @@ async fn register_vortex_file(
schema: &Schema,
enable_compression: bool,
) -> anyhow::Result<()> {
let path = if enable_compression {
file.with_extension("").with_extension("vtxcmp")
let vortex_dir = file.parent().unwrap().join(if enable_compression {
"vortex_compressed"
} else {
file.with_extension("").with_extension("vtxucmp")
};
let vtx_file = idempotent_async(&path, |vtx_file| async move {
"vortex_uncompressed"
});
create_dir_all(&vortex_dir)?;
let output_file = &vortex_dir
.join(file.file_name().unwrap())
.with_extension("vxf");
let vtx_file = idempotent_async(output_file, |vtx_file| async move {
let record_batches = session
.read_csv(
file.to_str().unwrap(),
Expand Down Expand Up @@ -276,6 +282,12 @@ async fn register_vortex_file(
})
.await?;

let ctx = if enable_compression {
Arc::new(Context::default().with_encodings(SamplingCompressor::default().used_encodings()))
} else {
Arc::new(Context::default())
};

let f = OpenOptions::new()
.read(true)
.write(true)
Expand All @@ -294,6 +306,7 @@ async fn register_vortex_file(
vtx_file.to_str().unwrap().to_string(),
file_size,
)],
ctx,
),
)?;

Expand Down
8 changes: 6 additions & 2 deletions encodings/zigzag/src/zigzag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ use vortex::validity::{ArrayValidity, LogicalValidity};
use vortex::variants::{ArrayVariants, PrimitiveArrayTrait};
use vortex::visitor::{AcceptArrayVisitor, ArrayVisitor};
use vortex::{
impl_encoding, Array, ArrayDType, ArrayDef, ArrayTrait, Canonical, IntoArray, IntoCanonical,
impl_encoding, Array, ArrayDType, ArrayDef, ArrayTrait, Canonical, IntoArray, IntoArrayVariant,
IntoCanonical,
};
use vortex_dtype::{DType, PType};
use vortex_error::{vortex_bail, vortex_err, VortexResult};

use crate::compress::zigzag_encode;
use crate::zigzag_decode;

impl_encoding!("vortex.zigzag", 21u16, ZigZag);

Expand Down Expand Up @@ -83,6 +85,8 @@ impl ArrayStatisticsCompute for ZigZagArray {}

impl IntoCanonical for ZigZagArray {
    /// Canonicalize by zigzag-decoding the encoded child back into a
    /// primitive array.
    ///
    /// # Errors
    /// Propagates any error from canonicalizing the encoded child via
    /// `into_primitive`.
    fn into_canonical(self) -> VortexResult<Canonical> {
        // Stray `todo!("ZigZagArray::flatten")` removed: it preceded the
        // real implementation and made the body unreachable/invalid.
        Ok(Canonical::Primitive(zigzag_decode(
            &self.encoded().into_primitive()?,
        )))
    }
}
5 changes: 5 additions & 0 deletions vortex-array/src/compress.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
use std::collections::HashSet;

use vortex_error::VortexResult;

use crate::encoding::EncodingRef;
use crate::Array;

/// A strategy for compressing a Vortex [`Array`].
pub trait CompressionStrategy {
    /// Compress `array`, returning the (possibly re-encoded) result.
    fn compress(&self, array: &Array) -> VortexResult<Array>;

    /// The set of encodings this strategy may produce, so callers can
    /// register them in a decoding context (e.g. via `with_encodings`).
    fn used_encodings(&self) -> HashSet<EncodingRef>;
}

/// Check that compression did not alter the length of the validity array.
Expand Down
3 changes: 2 additions & 1 deletion vortex-datafusion/examples/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::fs::OpenOptions;
use url::Url;
use vortex::array::{ChunkedArray, PrimitiveArray, StructArray, VarBinArray};
use vortex::validity::Validity;
use vortex::IntoArray;
use vortex::{Context, IntoArray};
use vortex_datafusion::persistent::config::{VortexFile, VortexTableOptions};
use vortex_datafusion::persistent::provider::VortexFileTableProvider;
use vortex_serde::layouts::writer::LayoutWriter;
Expand Down Expand Up @@ -66,6 +66,7 @@ async fn main() -> anyhow::Result<()> {
Field::new("numbers", DataType::UInt32, false),
])),
vec![VortexFile::new(p, file_size)],
Arc::new(Context::default()),
);

let provider = Arc::new(VortexFileTableProvider::try_new(url, config)?);
Expand Down
7 changes: 6 additions & 1 deletion vortex-datafusion/src/persistent/config.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::sync::Arc;

use arrow_schema::SchemaRef;
use chrono::TimeZone as _;
use datafusion::datasource::listing::PartitionedFile;
use object_store::path::Path;
use object_store::ObjectMeta;
use vortex::Context;

#[derive(Clone)]
pub struct VortexFile {
Expand Down Expand Up @@ -33,13 +36,15 @@ impl VortexFile {
/// Options describing a Vortex-file-backed DataFusion table.
pub struct VortexTableOptions {
/// The data files that make up the table.
pub(crate) data_files: Vec<VortexFile>,
/// Arrow schema of the table, if known up front.
pub(crate) schema: Option<SchemaRef>,
/// Vortex context handed to the file opener's layout deserializer so the
/// files' encodings can be decoded.
pub(crate) ctx: Arc<Context>,
}

impl VortexTableOptions {
pub fn new(schema: SchemaRef, data_files: Vec<VortexFile>) -> Self {
pub fn new(schema: SchemaRef, data_files: Vec<VortexFile>, ctx: Arc<Context>) -> Self {
Self {
data_files,
schema: Some(schema),
ctx,
}
}
}
5 changes: 5 additions & 0 deletions vortex-datafusion/src/persistent/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
};
use vortex::Context;

use crate::persistent::opener::VortexFileOpener;

Expand All @@ -18,6 +19,7 @@ pub struct VortexExec {
metrics: ExecutionPlanMetricsSet,
predicate: Option<Arc<dyn PhysicalExpr>>,
plan_properties: PlanProperties,
ctx: Arc<Context>,
}

impl VortexExec {
Expand All @@ -26,6 +28,7 @@ impl VortexExec {
metrics: ExecutionPlanMetricsSet,
projection: Option<&Vec<usize>>,
predicate: Option<Arc<dyn PhysicalExpr>>,
ctx: Arc<Context>,
) -> DFResult<Self> {
let projected_schema = project_schema(&file_scan_config.file_schema, projection)?;
let plan_properties = PlanProperties::new(
Expand All @@ -39,6 +42,7 @@ impl VortexExec {
metrics,
predicate,
plan_properties,
ctx,
})
}
pub(crate) fn into_arc(self) -> Arc<dyn ExecutionPlan> {
Expand Down Expand Up @@ -88,6 +92,7 @@ impl ExecutionPlan for VortexExec {
.runtime_env()
.object_store(&self.file_scan_config.object_store_url)?;
let opener = VortexFileOpener {
ctx: self.ctx.clone(),
object_store,
projection: self.file_scan_config.projection.clone(),
batch_size: None,
Expand Down
9 changes: 7 additions & 2 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ use datafusion_common::Result as DFResult;
use datafusion_physical_expr::PhysicalExpr;
use futures::{FutureExt as _, TryStreamExt};
use object_store::ObjectStore;
use vortex::Context;
use vortex_serde::io::ObjectStoreReadAt;
use vortex_serde::layouts::reader::builder::VortexLayoutReaderBuilder;
use vortex_serde::layouts::reader::context::LayoutDeserializer;
use vortex_serde::layouts::reader::context::{LayoutContext, LayoutDeserializer};
use vortex_serde::layouts::reader::projections::Projection;

pub struct VortexFileOpener {
pub ctx: Arc<Context>,
pub object_store: Arc<dyn ObjectStore>,
pub batch_size: Option<usize>,
pub projection: Option<Vec<usize>>,
Expand All @@ -23,7 +25,10 @@ impl FileOpener for VortexFileOpener {
let read_at =
ObjectStoreReadAt::new(self.object_store.clone(), file_meta.location().clone());

let mut builder = VortexLayoutReaderBuilder::new(read_at, LayoutDeserializer::default());
let mut builder = VortexLayoutReaderBuilder::new(
read_at,
LayoutDeserializer::new(self.ctx.clone(), Arc::new(LayoutContext::default())),
);

if let Some(batch_size) = self.batch_size {
builder = builder.with_batch_size(batch_size);
Expand Down
10 changes: 8 additions & 2 deletions vortex-datafusion/src/persistent/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,14 @@ impl TableProvider for VortexFileTableProvider {
)
.with_projection(projection.cloned());

let exec =
VortexExec::try_new(file_scan_config, metrics, projection, predicate)?.into_arc();
let exec = VortexExec::try_new(
file_scan_config,
metrics,
projection,
predicate,
self.config.ctx.clone(),
)?
.into_arc();

Ok(exec)
}
Expand Down
9 changes: 8 additions & 1 deletion vortex-sampling-compressor/src/compressors/alp.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::collections::HashSet;

use vortex::array::PrimitiveArray;
use vortex::encoding::EncodingRef;
use vortex::{Array, ArrayDef, IntoArray};
use vortex_alp::{alp_encode_components, match_each_alp_float_ptype, ALPArray, ALP};
use vortex_alp::{alp_encode_components, match_each_alp_float_ptype, ALPArray, ALPEncoding, ALP};
use vortex_dtype::PType;
use vortex_error::VortexResult;

Expand Down Expand Up @@ -70,4 +73,8 @@ impl EncodingCompressor for ALPCompressor {
)),
))
}

/// The single encoding this compressor emits: ALP.
fn used_encodings(&self) -> HashSet<EncodingRef> {
    [&ALPEncoding as EncodingRef].into_iter().collect()
}
}
8 changes: 8 additions & 0 deletions vortex-sampling-compressor/src/compressors/bitpacked.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::collections::HashSet;

use vortex::array::PrimitiveArray;
use vortex::encoding::EncodingRef;
use vortex::stats::ArrayStatistics;
use vortex::{Array, ArrayDef, IntoArray};
use vortex_error::{vortex_err, VortexResult};
use vortex_fastlanes::{
bitpack, bitpack_patches, count_exceptions, find_best_bit_width, BitPacked, BitPackedArray,
BitPackedEncoding,
};

use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
Expand Down Expand Up @@ -83,4 +87,8 @@ impl EncodingCompressor for BitPackedCompressor {
)),
))
}

/// The single encoding this compressor emits: bit-packing.
fn used_encodings(&self) -> HashSet<EncodingRef> {
    [&BitPackedEncoding as EncodingRef].into_iter().collect()
}
}
9 changes: 8 additions & 1 deletion vortex-sampling-compressor/src/compressors/constant.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use vortex::array::{Constant, ConstantArray};
use std::collections::HashSet;

use vortex::array::{Constant, ConstantArray, ConstantEncoding};
use vortex::compute::unary::scalar_at;
use vortex::encoding::EncodingRef;
use vortex::stats::ArrayStatistics;
use vortex::{Array, ArrayDef, IntoArray};
use vortex_error::VortexResult;
Expand Down Expand Up @@ -31,4 +34,8 @@ impl EncodingCompressor for ConstantCompressor {
Some(CompressionTree::flat(self)),
))
}

/// The single encoding this compressor emits: constant.
fn used_encodings(&self) -> HashSet<EncodingRef> {
    [&ConstantEncoding as EncodingRef].into_iter().collect()
}
}
11 changes: 10 additions & 1 deletion vortex-sampling-compressor/src/compressors/date_time_parts.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use std::collections::HashSet;

use vortex::array::temporal::TemporalMetadata;
use vortex::array::TemporalArray;
use vortex::encoding::EncodingRef;
use vortex::{Array, ArrayDType, ArrayDef, IntoArray};
use vortex_datetime_parts::{compress_temporal, DateTimeParts, DateTimePartsArray};
use vortex_datetime_parts::{
compress_temporal, DateTimeParts, DateTimePartsArray, DateTimePartsEncoding,
};
use vortex_error::VortexResult;

use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
Expand Down Expand Up @@ -58,4 +63,8 @@ impl EncodingCompressor for DateTimePartsCompressor {
)),
))
}

/// The single encoding this compressor emits: date-time parts.
fn used_encodings(&self) -> HashSet<EncodingRef> {
    [&DateTimePartsEncoding as EncodingRef].into_iter().collect()
}
}
9 changes: 8 additions & 1 deletion vortex-sampling-compressor/src/compressors/delta.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::collections::HashSet;

use vortex::array::PrimitiveArray;
use vortex::encoding::EncodingRef;
use vortex::{Array, ArrayDef, IntoArray};
use vortex_error::VortexResult;
use vortex_fastlanes::{delta_compress, Delta, DeltaArray};
use vortex_fastlanes::{delta_compress, Delta, DeltaArray, DeltaEncoding};

use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
use crate::SamplingCompressor;
Expand Down Expand Up @@ -51,4 +54,8 @@ impl EncodingCompressor for DeltaCompressor {
Some(CompressionTree::new(self, vec![bases.path, deltas.path])),
))
}

/// The single encoding this compressor emits: delta.
fn used_encodings(&self) -> HashSet<EncodingRef> {
    [&DeltaEncoding as EncodingRef].into_iter().collect()
}
}
9 changes: 8 additions & 1 deletion vortex-sampling-compressor/src/compressors/dict.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::collections::HashSet;

use vortex::array::{Primitive, PrimitiveArray, VarBin, VarBinArray};
use vortex::encoding::EncodingRef;
use vortex::stats::ArrayStatistics;
use vortex::{Array, ArrayDef, IntoArray};
use vortex_dict::{dict_encode_primitive, dict_encode_varbin, Dict, DictArray};
use vortex_dict::{dict_encode_primitive, dict_encode_varbin, Dict, DictArray, DictEncoding};
use vortex_error::VortexResult;

use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
Expand Down Expand Up @@ -69,4 +72,8 @@ impl EncodingCompressor for DictCompressor {
Some(CompressionTree::new(self, vec![codes.path, values.path])),
))
}

/// The single encoding this compressor emits: dictionary.
fn used_encodings(&self) -> HashSet<EncodingRef> {
    [&DictEncoding as EncodingRef].into_iter().collect()
}
}
9 changes: 8 additions & 1 deletion vortex-sampling-compressor/src/compressors/for.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::collections::HashSet;

use vortex::array::PrimitiveArray;
use vortex::encoding::EncodingRef;
use vortex::stats::{trailing_zeros, ArrayStatistics};
use vortex::validity::ArrayValidity;
use vortex::{Array, ArrayDef, IntoArray};
use vortex_dtype::match_each_integer_ptype;
use vortex_error::VortexResult;
use vortex_fastlanes::{for_compress, FoR, FoRArray};
use vortex_fastlanes::{for_compress, FoR, FoRArray, FoREncoding};

use crate::compressors::{CompressedArray, CompressionTree, EncodingCompressor};
use crate::SamplingCompressor;
Expand Down Expand Up @@ -60,4 +63,8 @@ impl EncodingCompressor for FoRCompressor {
Some(CompressionTree::new(self, vec![compressed_child.path])),
))
}

/// The single encoding this compressor emits: frame-of-reference.
fn used_encodings(&self) -> HashSet<EncodingRef> {
    [&FoREncoding as EncodingRef].into_iter().collect()
}
}
Loading

0 comments on commit a1c1da7

Please sign in to comment.