Sum statistics #1661

Draft · wants to merge 3 commits into develop
205 changes: 118 additions & 87 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions Cargo.toml
@@ -67,12 +67,12 @@ clap = "4.5.13"
compio = "0.13"
criterion = { version = "0.5.1", features = ["html_reports"] }
croaring = "2.1.0"
datafusion = { version = "43.0.0", default-features = false }
datafusion-common = "43.0.0"
datafusion-execution = "43.0.0"
datafusion-expr = "43.0.0"
datafusion-physical-expr = "43.0.0"
datafusion-physical-plan = "43.0.0"
datafusion = { version = "43.0.0", default-features = false, git = "https://github.com/apache/datafusion.git", rev = "refs/pull/13736/head" }
datafusion-common = { version = "43.0.0", git = "https://github.com/apache/datafusion.git", rev = "refs/pull/13736/head" }
datafusion-execution = { version = "43.0.0", git = "https://github.com/apache/datafusion.git", rev = "refs/pull/13736/head" }
datafusion-expr = { version = "43.0.0", git = "https://github.com/apache/datafusion.git", rev = "refs/pull/13736/head" }
datafusion-physical-expr = { version = "43.0.0", git = "https://github.com/apache/datafusion.git", rev = "refs/pull/13736/head" }
datafusion-physical-plan = { version = "43.0.0", git = "https://github.com/apache/datafusion.git", rev = "refs/pull/13736/head" }
divan = "0.1.14"
enum-iterator = "2.0.0"
enum-map = "2.7.3"
2 changes: 1 addition & 1 deletion bench-vortex/benches/clickbench.rs
@@ -45,7 +45,7 @@ fn benchmark(c: &mut Criterion) {
let context = session_context.clone();

runtime.block_on(async move {
clickbench::register_vortex_files(&context, "hits", basepath.as_path(), &HITS_SCHEMA)
clickbench::register_vortex_files(context, "hits", basepath.as_path(), &HITS_SCHEMA)
.await
.unwrap();
});
4 changes: 2 additions & 2 deletions bench-vortex/src/bin/clickbench.rs
@@ -62,7 +62,7 @@ fn main() {
// The clickbench-provided file is missing some higher-level type info, so we reprocess it
// to add that info, see https://github.com/ClickHouse/ClickBench/issues/7.
(0_u32..100).into_par_iter().for_each(|idx| {
let output_path = basepath.join(format!("hits_{idx}.parquet"));
let output_path = basepath.join("parquet").join(format!("hits_{idx}.parquet"));
idempotent(&output_path, |output_path| {
eprintln!("Fixing parquet file {idx}");

@@ -137,7 +137,7 @@ fn main() {
} => {
runtime.block_on(async {
clickbench::register_vortex_files(
&context,
context.clone(),
"hits",
basepath.as_path(),
&HITS_SCHEMA,
169 changes: 93 additions & 76 deletions bench-vortex/src/clickbench.rs
@@ -1,4 +1,4 @@
use std::path::Path;
use std::path::{Path, PathBuf};
use std::sync::{Arc, LazyLock};

use arrow_schema::{DataType, Field, Schema, TimeUnit};
@@ -7,6 +7,7 @@ use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use futures::{stream, StreamExt, TryStreamExt};
use tokio::fs::{create_dir_all, OpenOptions};
use vortex::aliases::hash_map::HashMap;
use vortex::array::{ChunkedArray, StructArray};
@@ -140,99 +141,112 @@ pub static HITS_SCHEMA: LazyLock<Schema> = LazyLock::new(|| {
});

pub async fn register_vortex_files(
session: &SessionContext,
session: SessionContext,
table_name: &str,
input_path: &Path,
schema: &Schema,
) -> anyhow::Result<()> {
let vortex_dir = input_path.parent().unwrap().join("vortex_compressed");
let session2 = session.clone();
let vortex_dir = input_path.join("vortex");
create_dir_all(&vortex_dir).await?;

for idx in 0..100 {
let parquet_file_path = input_path.join(format!("hits_{idx}.parquet"));
let output_path = vortex_dir.join(format!("hits_{idx}.{VORTEX_FILE_EXTENSION}"));
idempotent_async(&output_path, |vtx_file| async move {
eprintln!("Processing file {idx}");
let record_batches = session
.read_parquet(
parquet_file_path.to_str().unwrap(),
ParquetReadOptions::default(),
)
.await?
.collect()
.await?;
let format = Arc::new(VortexFormat::new(&CTX));
let table_path = vortex_dir
.to_str()
.ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?;
let table_path = format!("file://{table_path}/");
let table_url = ListingTableUrl::parse(table_path)?;

// Create a ChunkedArray from the set of chunks.
let sts = record_batches
.into_iter()
.map(ArrayData::try_from)
.map(|a| a.unwrap().into_struct().unwrap())
.collect::<Vec<_>>();
let config = ListingTableConfig::new(table_url)
.with_listing_options(ListingOptions::new(format as _))
.with_schema(schema.clone().into());

let mut arrays_map: HashMap<Arc<str>, Vec<ArrayData>> = HashMap::default();
let mut types_map: HashMap<Arc<str>, DType> = HashMap::default();
let listing_table = Arc::new(ListingTable::try_new(config)?);
session2.register_table(table_name, listing_table as _)?;

for st in sts.into_iter() {
let struct_dtype = st.dtype().as_struct().unwrap();
let names = struct_dtype.names().iter();
let types = struct_dtype.dtypes().iter();
let _paths: Vec<PathBuf> = stream::iter(0..100)
.map(|idx| {
let parquet_file_path = input_path
.join("parquet")
.join(format!("hits_{idx}.parquet"));
let output_path = vortex_dir.join(format!("hits_{idx}.{VORTEX_FILE_EXTENSION}"));
let session = session.clone();
let schema = schema.clone();

for (field_name, field_type) in names.zip(types) {
let val = arrays_map.entry(field_name.clone()).or_default();
val.push(st.field_by_name(field_name.as_ref()).unwrap());
tokio::spawn(async move {
let output_path = output_path.clone();
idempotent_async(&output_path, move |vtx_file| async move {
eprintln!("Processing file {idx}");
let record_batches = session
.read_parquet(
parquet_file_path.to_str().unwrap(),
ParquetReadOptions::default(),
)
.await?
.collect()
.await?;

types_map.insert(field_name.clone(), field_type.clone());
}
}
// Create a ChunkedArray from the set of chunks.
let sts = record_batches
.into_iter()
.map(ArrayData::try_from)
.map(|a| a.unwrap().into_struct().unwrap())
.collect::<Vec<_>>();

let fields = schema
.fields()
.iter()
.map(|field| {
let name: Arc<str> = field.name().as_str().into();
let dtype = types_map[&name].clone();
let chunks = arrays_map.remove(&name).unwrap();
let chunked_child = ChunkedArray::try_new(chunks, dtype).unwrap();
let mut arrays_map: HashMap<Arc<str>, Vec<ArrayData>> = HashMap::default();
let mut types_map: HashMap<Arc<str>, DType> = HashMap::default();

(name, chunked_child.into_array())
})
.collect::<Vec<_>>();
for st in sts.into_iter() {
let struct_dtype = st.dtype().as_struct().unwrap();
let names = struct_dtype.names().iter();
let types = struct_dtype.dtypes().iter();

let data = StructArray::from_fields(&fields)?.into_array();
for (field_name, field_type) in names.zip(types) {
let val = arrays_map.entry(field_name.clone()).or_default();
val.push(st.field_by_name(field_name.as_ref()).unwrap());

let compressor = SamplingCompressor::default();
let data = compressor.compress(&data, None)?.into_array();
types_map.insert(field_name.clone(), field_type.clone());
}
}

let f = OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open(&vtx_file)
.await?;
let fields = schema
.fields()
.iter()
.map(|field| {
let name: Arc<str> = field.name().as_str().into();
let dtype = types_map[&name].clone();
let chunks = arrays_map.remove(&name).unwrap();
let chunked_child = ChunkedArray::try_new(chunks, dtype).unwrap();

let mut writer = VortexFileWriter::new(f);
writer = writer.write_array_columns(data).await?;
writer.finalize().await?;
(name, chunked_child.into_array())
})
.collect::<Vec<_>>();

anyhow::Ok(())
})
.await?;
}
let data = StructArray::from_fields(&fields)?.into_array();

let format = Arc::new(VortexFormat::new(&CTX));
let table_path = vortex_dir
.to_str()
.ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?;
let table_path = format!("file://{table_path}/");
let table_url = ListingTableUrl::parse(table_path)?;
let compressor = SamplingCompressor::default();
let data = compressor.compress(&data, None)?.into_array();

let config = ListingTableConfig::new(table_url)
.with_listing_options(ListingOptions::new(format as _))
.with_schema(schema.clone().into());
let f = OpenOptions::new()
.write(true)
.truncate(true)
.create(true)
.open(&vtx_file)
.await?;

let listing_table = Arc::new(ListingTable::try_new(config)?);
let mut writer = VortexFileWriter::new(f);
writer = writer.write_array_columns(data).await?;
writer.finalize().await?;

session.register_table(table_name, listing_table as _)?;
anyhow::Ok(())
})
.await
.expect("Failed to write Vortex file")
})
})
.buffered(16)
.try_collect::<Vec<_>>()
.await?;

Ok(())
}
@@ -244,10 +258,13 @@ pub async fn register_parquet_files(
schema: &Schema,
) -> anyhow::Result<()> {
let format = Arc::new(ParquetFormat::new());
let table_path = input_path
.to_str()
.ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?;
let table_path = format!("file://{table_path}/");
let table_path = input_path.join("parquet");
let table_path = format!(
"file://{}/",
table_path
.to_str()
.ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?
);
let table_url = ListingTableUrl::parse(table_path)?;

let config = ListingTableConfig::new(table_url)
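The rewritten `register_vortex_files` above registers the listing table up front and then converts the 100 parquet files concurrently: each conversion is moved onto its own task with `tokio::spawn`, and the spawned tasks are driven through a stream with `.buffered(16)` so at most 16 files are processed at once. A minimal sketch of that concurrency pattern, with the Vortex-specific work replaced by a hypothetical `convert_one` helper:

```rust
use std::path::PathBuf;

use futures::{stream, StreamExt, TryStreamExt};

// Hypothetical per-file conversion; stands in for the parquet -> Vortex work above.
async fn convert_one(idx: u32) -> anyhow::Result<PathBuf> {
    // ... read parquet, compress, write the .vortex file ...
    Ok(PathBuf::from(format!("hits_{idx}.vortex")))
}

// Convert all files with at most 16 in flight, mirroring `.buffered(16)` in the diff.
async fn convert_all() -> anyhow::Result<Vec<PathBuf>> {
    stream::iter(0_u32..100)
        // Spawning moves each conversion onto the runtime so files are processed in parallel.
        .map(|idx| tokio::spawn(convert_one(idx)))
        // Polls up to 16 spawned tasks at a time and yields their results in input order.
        .buffered(16)
        // A JoinHandle yields Result<Result<_, _>, JoinError>; treat task panics as fatal here.
        .map(|joined| joined.expect("conversion task panicked"))
        .try_collect()
        .await
}
```

As in the diff, `.buffered(16)` both bounds concurrency and preserves the input order of results, and a panic inside a task surfaces through the `JoinHandle` rather than being silently dropped.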
32 changes: 31 additions & 1 deletion vortex-array/src/array/primitive/stats.rs
@@ -2,11 +2,12 @@ use core::marker::PhantomData;
use std::cmp::Ordering;
use std::mem::size_of;

use arrow_array::ArrowNativeTypeOp;
use arrow_buffer::buffer::BooleanBuffer;
use itertools::{Itertools as _, MinMaxResult};
use num_traits::PrimInt;
use vortex_dtype::half::f16;
use vortex_dtype::{match_each_native_ptype, DType, NativePType, Nullability};
use vortex_dtype::{match_each_native_ptype, DType, NativePType, Nullability, PType};
use vortex_error::{vortex_panic, VortexResult};
use vortex_scalar::Scalar;

@@ -71,6 +72,9 @@ impl<T: PStatsType> StatisticsVTable<[T]> for PrimitiveEncoding {
);
stats
}
Stat::Sum => sum_of::<T>(array)
.map(|s| StatsSet::of(stat, s))
.unwrap_or_else(StatsSet::default),
Stat::IsConstant => {
let first = array[0];
let is_constant = array.iter().all(|x| first.is_eq(*x));
@@ -90,6 +94,32 @@ }
}
}

fn sum_of<T: NativePType>(values: &[T]) -> Option<Scalar> {
match T::PTYPE {
PType::I8 | PType::I16 | PType::I32 | PType::I64 => {
let mut sum: i64 = 0;
for v in values {
sum = sum.checked_add(v.to_i64()?)?;
}
Some(Scalar::from(sum))
}
PType::U8 | PType::U16 | PType::U32 | PType::U64 => {
let mut sum: u64 = 0;
for v in values {
sum = sum.checked_add(v.to_u64()?)?;
}
Some(Scalar::from(sum))
}
PType::F16 | PType::F32 | PType::F64 => {
let mut sum: f64 = 0.0;
for v in values {
sum = sum.add_checked(v.to_f64()?).ok()?;
}
Some(Scalar::from(sum))
}
}
}

struct NullableValues<'a, T: PStatsType>(&'a [T], &'a BooleanBuffer);

impl<T: PStatsType> StatisticsVTable<NullableValues<'_, T>> for PrimitiveEncoding {
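The new `sum_of` helper accumulates into the widest type of each primitive class (`i64` for signed integers, `u64` for unsigned, `f64` for floats) and uses checked addition, so an overflowing or non-representable sum yields `None` and the `Sum` statistic is simply omitted rather than being silently wrong. A small self-contained illustration of that behavior for the signed-integer branch (plain Rust, not the Vortex types):

```rust
// Mirrors the signed-integer branch of `sum_of`: accumulate into i64 with checked adds.
fn checked_sum(values: &[i64]) -> Option<i64> {
    let mut sum: i64 = 0;
    for &v in values {
        sum = sum.checked_add(v)?; // on overflow, return None instead of a wrapped sum
    }
    Some(sum)
}

fn main() {
    assert_eq!(checked_sum(&[1, 2, 3]), Some(6));
    // Overflow: the statistic is simply absent rather than incorrect.
    assert_eq!(checked_sum(&[i64::MAX, 1]), None);
}
```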
1 change: 1 addition & 0 deletions vortex-array/src/array/varbin/stats.rs
@@ -59,6 +59,7 @@ pub fn compute_varbin_statistics<T: ArrayTrait + ArrayAccessor<[u8]>>(
}
}
Stat::Min | Stat::Max => compute_min_max(array)?,
Stat::Sum => StatsSet::default(),
Stat::IsSorted => {
let is_sorted = array.with_iterator(|iter| iter.flatten().is_sorted())?;
let mut stats = StatsSet::of(Stat::IsSorted, is_sorted);
12 changes: 12 additions & 0 deletions vortex-array/src/data/viewed.rs
@@ -135,6 +135,18 @@ impl Statistics for ViewedArrayData {
min.and_then(|v| ScalarValue::try_from(v).ok())
.map(|v| Scalar::new(self.dtype.clone(), v))
}
Stat::Sum => {
let sum = self.flatbuffer().stats()?.sum();
sum.and_then(|v| ScalarValue::try_from(v).ok()).map(|v| {
Scalar::new(
DType::Primitive(
PType::try_from(&self.dtype).unwrap().to_widest(),
self.dtype.nullability(),
),
v,
)
})
}
Stat::IsConstant => self.flatbuffer().stats()?.is_constant().map(bool::into),
Stat::IsSorted => self.flatbuffer().stats()?.is_sorted().map(bool::into),
Stat::IsStrictSorted => self
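On the read side above, the deserialized sum is wrapped in a `Scalar` whose dtype is the widest primitive of the array's ptype (keeping the array's nullability), matching the `i64`/`u64`/`f64` accumulators used by `sum_of`. An illustrative stand-in for what `to_widest()` is assumed to do here, not the actual `vortex-dtype` implementation:

```rust
// Hypothetical stand-in for vortex_dtype::PType and the to_widest() call used above.
#[derive(Clone, Copy, Debug, PartialEq)]
enum PType { I8, I16, I32, I64, U8, U16, U32, U64, F16, F32, F64 }

impl PType {
    /// Sum statistics are assumed to be typed as the widest member of each class,
    /// matching the accumulator types in `sum_of` (i64 / u64 / f64).
    fn to_widest(self) -> PType {
        use PType::*;
        match self {
            I8 | I16 | I32 | I64 => I64,
            U8 | U16 | U32 | U64 => U64,
            F16 | F32 | F64 => F64,
        }
    }
}

fn main() {
    assert_eq!(PType::I8.to_widest(), PType::I64);
    assert_eq!(PType::U32.to_widest(), PType::U64);
    assert_eq!(PType::F16.to_widest(), PType::F64);
}
```

Under this assumption, an `i8` column's sum deserializes as an `i64` scalar, a `u16` column's as `u64`, and an `f32` column's as `f64`.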
5 changes: 5 additions & 0 deletions vortex-array/src/stats/flatbuffers.rs
@@ -29,9 +29,14 @@ impl WriteFlatBuffer for &dyn Statistics {
.get(Stat::Max)
.map(|max| max.into_value().write_flatbuffer(fbb));

let sum = self
.get(Stat::Sum)
.map(|sum| sum.into_value().write_flatbuffer(fbb));

let stat_args = &crate::flatbuffers::ArrayStatsArgs {
min,
max,
sum,
is_sorted: self.get_as::<bool>(Stat::IsSorted),
is_strict_sorted: self.get_as::<bool>(Stat::IsStrictSorted),
is_constant: self.get_as::<bool>(Stat::IsConstant),