Commit

Sum statistics
gatesn committed Dec 11, 2024
1 parent a7f6f6b commit de85af3
Showing 24 changed files with 533 additions and 331 deletions.
184 changes: 97 additions & 87 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions Cargo.toml
@@ -67,12 +67,12 @@ clap = "4.5.13"
 compio = "0.13"
 criterion = { version = "0.5.1", features = ["html_reports"] }
 croaring = "2.1.0"
-datafusion = { version = "43.0.0", default-features = false }
-datafusion-common = "43.0.0"
-datafusion-execution = "43.0.0"
-datafusion-expr = "43.0.0"
-datafusion-physical-expr = "43.0.0"
-datafusion-physical-plan = "43.0.0"
+datafusion = { version = "43.0.0", default-features = false, path = "../datafusion/datafusion/core" }
+datafusion-common = { version = "43.0.0", path = "../datafusion/datafusion/common" }
+datafusion-execution = { version = "43.0.0", path = "../datafusion/datafusion/execution" }
+datafusion-expr = { version = "43.0.0", path = "../datafusion/datafusion/expr" }
+datafusion-physical-expr = { version = "43.0.0", path = "../datafusion/datafusion/physical-expr" }
+datafusion-physical-plan = { version = "43.0.0", path = "../datafusion/datafusion/physical-plan" }
 divan = "0.1.14"
 enum-iterator = "2.0.0"
 enum-map = "2.7.3"
2 changes: 1 addition & 1 deletion bench-vortex/benches/clickbench.rs
@@ -45,7 +45,7 @@ fn benchmark(c: &mut Criterion) {
     let context = session_context.clone();
 
     runtime.block_on(async move {
-        clickbench::register_vortex_files(&context, "hits", basepath.as_path(), &HITS_SCHEMA)
+        clickbench::register_vortex_files(context, "hits", basepath.as_path(), &HITS_SCHEMA)
             .await
             .unwrap();
     });
4 changes: 2 additions & 2 deletions bench-vortex/src/bin/clickbench.rs
@@ -62,7 +62,7 @@ fn main() {
     // The clickbench-provided file is missing some higher-level type info, so we reprocess it
     // to add that info, see https://github.com/ClickHouse/ClickBench/issues/7.
     (0_u32..100).into_par_iter().for_each(|idx| {
-        let output_path = basepath.join(format!("hits_{idx}.parquet"));
+        let output_path = basepath.join("parquet").join(format!("hits_{idx}.parquet"));
         idempotent(&output_path, |output_path| {
             eprintln!("Fixing parquet file {idx}");
 
@@ -137,7 +137,7 @@ fn main() {
         } => {
             runtime.block_on(async {
                 clickbench::register_vortex_files(
-                    &context,
+                    context.clone(),
                     "hits",
                     basepath.as_path(),
                     &HITS_SCHEMA,
169 changes: 93 additions & 76 deletions bench-vortex/src/clickbench.rs
@@ -1,4 +1,4 @@
-use std::path::Path;
+use std::path::{Path, PathBuf};
 use std::sync::{Arc, LazyLock};
 
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
@@ -7,6 +7,7 @@ use datafusion::datasource::listing::{
     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
 };
 use datafusion::prelude::{ParquetReadOptions, SessionContext};
+use futures::{stream, StreamExt, TryStreamExt};
 use tokio::fs::{create_dir_all, OpenOptions};
 use vortex::aliases::hash_map::HashMap;
 use vortex::array::{ChunkedArray, StructArray};
@@ -140,99 +141,112 @@ pub static HITS_SCHEMA: LazyLock<Schema> = LazyLock::new(|| {
 });
 
 pub async fn register_vortex_files(
-    session: &SessionContext,
+    session: SessionContext,
     table_name: &str,
     input_path: &Path,
     schema: &Schema,
 ) -> anyhow::Result<()> {
-    let vortex_dir = input_path.parent().unwrap().join("vortex_compressed");
+    let session2 = session.clone();
+    let vortex_dir = input_path.join("vortex");
     create_dir_all(&vortex_dir).await?;
 
-    for idx in 0..100 {
-        let parquet_file_path = input_path.join(format!("hits_{idx}.parquet"));
-        let output_path = vortex_dir.join(format!("hits_{idx}.{VORTEX_FILE_EXTENSION}"));
-        idempotent_async(&output_path, |vtx_file| async move {
-            eprintln!("Processing file {idx}");
-            let record_batches = session
-                .read_parquet(
-                    parquet_file_path.to_str().unwrap(),
-                    ParquetReadOptions::default(),
-                )
-                .await?
-                .collect()
-                .await?;
+    let format = Arc::new(VortexFormat::new(&CTX));
+    let table_path = vortex_dir
+        .to_str()
+        .ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?;
+    let table_path = format!("file://{table_path}/");
+    let table_url = ListingTableUrl::parse(table_path)?;
 
-            // Create a ChunkedArray from the set of chunks.
-            let sts = record_batches
-                .into_iter()
-                .map(ArrayData::try_from)
-                .map(|a| a.unwrap().into_struct().unwrap())
-                .collect::<Vec<_>>();
+    let config = ListingTableConfig::new(table_url)
+        .with_listing_options(ListingOptions::new(format as _))
+        .with_schema(schema.clone().into());
 
-            let mut arrays_map: HashMap<Arc<str>, Vec<ArrayData>> = HashMap::default();
-            let mut types_map: HashMap<Arc<str>, DType> = HashMap::default();
+    let listing_table = Arc::new(ListingTable::try_new(config)?);
+    session2.register_table(table_name, listing_table as _)?;
 
-            for st in sts.into_iter() {
-                let struct_dtype = st.dtype().as_struct().unwrap();
-                let names = struct_dtype.names().iter();
-                let types = struct_dtype.dtypes().iter();
+    let _paths: Vec<PathBuf> = stream::iter(0..100)
+        .map(|idx| {
+            let parquet_file_path = input_path
+                .join("parquet")
+                .join(format!("hits_{idx}.parquet"));
+            let output_path = vortex_dir.join(format!("hits_{idx}.{VORTEX_FILE_EXTENSION}"));
+            let session = session.clone();
+            let schema = schema.clone();
 
-                for (field_name, field_type) in names.zip(types) {
-                    let val = arrays_map.entry(field_name.clone()).or_default();
-                    val.push(st.field_by_name(field_name.as_ref()).unwrap());
+            tokio::spawn(async move {
+                let output_path = output_path.clone();
+                idempotent_async(&output_path, move |vtx_file| async move {
+                    eprintln!("Processing file {idx}");
+                    let record_batches = session
+                        .read_parquet(
+                            parquet_file_path.to_str().unwrap(),
+                            ParquetReadOptions::default(),
+                        )
+                        .await?
+                        .collect()
+                        .await?;
 
-                    types_map.insert(field_name.clone(), field_type.clone());
-                }
-            }
+                    // Create a ChunkedArray from the set of chunks.
+                    let sts = record_batches
+                        .into_iter()
+                        .map(ArrayData::try_from)
+                        .map(|a| a.unwrap().into_struct().unwrap())
+                        .collect::<Vec<_>>();
 
-            let fields = schema
-                .fields()
-                .iter()
-                .map(|field| {
-                    let name: Arc<str> = field.name().as_str().into();
-                    let dtype = types_map[&name].clone();
-                    let chunks = arrays_map.remove(&name).unwrap();
-                    let chunked_child = ChunkedArray::try_new(chunks, dtype).unwrap();
+                    let mut arrays_map: HashMap<Arc<str>, Vec<ArrayData>> = HashMap::default();
+                    let mut types_map: HashMap<Arc<str>, DType> = HashMap::default();
 
-                    (name, chunked_child.into_array())
-                })
-                .collect::<Vec<_>>();
+                    for st in sts.into_iter() {
+                        let struct_dtype = st.dtype().as_struct().unwrap();
+                        let names = struct_dtype.names().iter();
+                        let types = struct_dtype.dtypes().iter();
 
-            let data = StructArray::from_fields(&fields)?.into_array();
+                        for (field_name, field_type) in names.zip(types) {
+                            let val = arrays_map.entry(field_name.clone()).or_default();
+                            val.push(st.field_by_name(field_name.as_ref()).unwrap());
 
-            let compressor = SamplingCompressor::default();
-            let data = compressor.compress(&data, None)?.into_array();
+                            types_map.insert(field_name.clone(), field_type.clone());
+                        }
+                    }
 
-            let f = OpenOptions::new()
-                .write(true)
-                .truncate(true)
-                .create(true)
-                .open(&vtx_file)
-                .await?;
+                    let fields = schema
+                        .fields()
+                        .iter()
+                        .map(|field| {
+                            let name: Arc<str> = field.name().as_str().into();
+                            let dtype = types_map[&name].clone();
+                            let chunks = arrays_map.remove(&name).unwrap();
+                            let chunked_child = ChunkedArray::try_new(chunks, dtype).unwrap();
 
-            let mut writer = VortexFileWriter::new(f);
-            writer = writer.write_array_columns(data).await?;
-            writer.finalize().await?;
+                            (name, chunked_child.into_array())
+                        })
+                        .collect::<Vec<_>>();
 
-            anyhow::Ok(())
-        })
-        .await?;
-    }
+                    let data = StructArray::from_fields(&fields)?.into_array();
 
-    let format = Arc::new(VortexFormat::new(&CTX));
-    let table_path = vortex_dir
-        .to_str()
-        .ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?;
-    let table_path = format!("file://{table_path}/");
-    let table_url = ListingTableUrl::parse(table_path)?;
+                    let compressor = SamplingCompressor::default();
+                    let data = compressor.compress(&data, None)?.into_array();
 
-    let config = ListingTableConfig::new(table_url)
-        .with_listing_options(ListingOptions::new(format as _))
-        .with_schema(schema.clone().into());
+                    let f = OpenOptions::new()
+                        .write(true)
+                        .truncate(true)
+                        .create(true)
+                        .open(&vtx_file)
+                        .await?;
 
-    let listing_table = Arc::new(ListingTable::try_new(config)?);
+                    let mut writer = VortexFileWriter::new(f);
+                    writer = writer.write_array_columns(data).await?;
+                    writer.finalize().await?;
 
-    session.register_table(table_name, listing_table as _)?;
+                    anyhow::Ok(())
+                })
+                .await
+                .expect("Failed to write Vortex file")
+            })
+        })
+        .buffered(16)
+        .try_collect::<Vec<_>>()
+        .await?;
 
     Ok(())
 }
@@ -244,10 +258,13 @@ pub async fn register_parquet_files(
     schema: &Schema,
 ) -> anyhow::Result<()> {
     let format = Arc::new(ParquetFormat::new());
-    let table_path = input_path
-        .to_str()
-        .ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?;
-    let table_path = format!("file://{table_path}/");
+    let table_path = input_path.join("parquet");
+    let table_path = format!(
+        "file://{}/",
+        table_path
+            .to_str()
+            .ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?
+    );
     let table_url = ListingTableUrl::parse(table_path)?;
 
     let config = ListingTableConfig::new(table_url)
32 changes: 31 additions & 1 deletion vortex-array/src/array/primitive/stats.rs
@@ -2,11 +2,12 @@ use core::marker::PhantomData;
 use std::cmp::Ordering;
 use std::mem::size_of;
 
+use arrow_array::ArrowNativeTypeOp;
 use arrow_buffer::buffer::BooleanBuffer;
 use itertools::{Itertools as _, MinMaxResult};
 use num_traits::PrimInt;
 use vortex_dtype::half::f16;
-use vortex_dtype::{match_each_native_ptype, DType, NativePType, Nullability};
+use vortex_dtype::{match_each_native_ptype, DType, NativePType, Nullability, PType};
 use vortex_error::{vortex_panic, VortexResult};
 use vortex_scalar::Scalar;
 
@@ -71,6 +72,9 @@ impl<T: PStatsType> StatisticsVTable<[T]> for PrimitiveEncoding {
                 );
                 stats
             }
+            Stat::Sum => sum_of::<T>(array)
+                .map(|s| StatsSet::of(stat, s))
+                .unwrap_or_else(StatsSet::default),
             Stat::IsConstant => {
                 let first = array[0];
                 let is_constant = array.iter().all(|x| first.is_eq(*x));
@@ -90,6 +94,32 @@
     }
 }
 
+fn sum_of<T: NativePType>(values: &[T]) -> Option<Scalar> {
+    match T::PTYPE {
+        PType::I8 | PType::I16 | PType::I32 | PType::I64 => {
+            let mut sum: i64 = 0;
+            for v in values {
+                sum = sum.checked_add(v.to_i64()?)?;
+            }
+            Some(Scalar::from(sum))
+        }
+        PType::U8 | PType::U16 | PType::U32 | PType::U64 => {
+            let mut sum: u64 = 0;
+            for v in values {
+                sum = sum.checked_add(v.to_u64()?)?;
+            }
+            Some(Scalar::from(sum))
+        }
+        PType::F16 | PType::F32 | PType::F64 => {
+            let mut sum: f64 = 0.0;
+            for v in values {
+                sum = sum.add_checked(v.to_f64()?).ok()?;
+            }
+            Some(Scalar::from(sum))
+        }
+    }
+}
+
 struct NullableValues<'a, T: PStatsType>(&'a [T], &'a BooleanBuffer);
 
 impl<T: PStatsType> StatisticsVTable<NullableValues<'_, T>> for PrimitiveEncoding {
1 change: 1 addition & 0 deletions vortex-array/src/array/varbin/stats.rs
@@ -59,6 +59,7 @@ pub fn compute_varbin_statistics<T: ArrayTrait + ArrayAccessor<[u8]>>(
             }
         }
         Stat::Min | Stat::Max => compute_min_max(array)?,
+        Stat::Sum => StatsSet::default(),
         Stat::IsSorted => {
            let is_sorted = array.with_iterator(|iter| iter.flatten().is_sorted())?;
            let mut stats = StatsSet::of(Stat::IsSorted, is_sorted);
12 changes: 12 additions & 0 deletions vortex-array/src/data/viewed.rs
@@ -135,6 +135,18 @@ impl Statistics for ViewedArrayData {
                 min.and_then(|v| ScalarValue::try_from(v).ok())
                     .map(|v| Scalar::new(self.dtype.clone(), v))
             }
+            Stat::Sum => {
+                let sum = self.flatbuffer().stats()?.sum();
+                sum.and_then(|v| ScalarValue::try_from(v).ok()).map(|v| {
+                    Scalar::new(
+                        DType::Primitive(
+                            PType::try_from(&self.dtype).unwrap().to_widest(),
+                            self.dtype.nullability(),
+                        ),
+                        v,
+                    )
+                })
+            }
             Stat::IsConstant => self.flatbuffer().stats()?.is_constant().map(bool::into),
             Stat::IsSorted => self.flatbuffer().stats()?.is_sorted().map(bool::into),
             Stat::IsStrictSorted => self
5 changes: 5 additions & 0 deletions vortex-array/src/stats/flatbuffers.rs
@@ -29,9 +29,14 @@ impl WriteFlatBuffer for &dyn Statistics {
             .get(Stat::Max)
             .map(|max| max.into_value().write_flatbuffer(fbb));
 
+        let sum = self
+            .get(Stat::Sum)
+            .map(|sum| sum.into_value().write_flatbuffer(fbb));
+
         let stat_args = &crate::flatbuffers::ArrayStatsArgs {
             min,
             max,
+            sum,
             is_sorted: self.get_as::<bool>(Stat::IsSorted),
             is_strict_sorted: self.get_as::<bool>(Stat::IsStrictSorted),
             is_constant: self.get_as::<bool>(Stat::IsConstant),