Refactoring dyn Column
fulmicoton committed Sep 2, 2022
1 parent 84e0c75 commit e550a98
Showing 30 changed files with 374 additions and 447 deletions.
5 changes: 3 additions & 2 deletions examples/custom_collector.rs
@@ -7,11 +7,12 @@
// Of course, you can have a look at the tantivy's built-in collectors
// such as the `CountCollector` for more examples.

use std::sync::Arc;

use fastfield_codecs::Column;
// ---
// Importing tantivy...
use tantivy::collector::{Collector, SegmentCollector};
use tantivy::fastfield::DynamicFastFieldReader;
use tantivy::query::QueryParser;
use tantivy::schema::{Field, Schema, FAST, INDEXED, TEXT};
use tantivy::{doc, Index, Score, SegmentReader};
@@ -96,7 +97,7 @@ impl Collector for StatsCollector {
}

struct StatsSegmentCollector {
fast_field_reader: DynamicFastFieldReader<u64>,
fast_field_reader: Arc<dyn Column<u64>>,
stats: Stats,
}

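The collector example's only substantive change is that field type: the concrete reader is replaced by a shared trait object. A minimal sketch of the resulting pattern in isolation (the SumCollector name and its collect method are illustrative, not part of this commit):

use std::sync::Arc;

use fastfield_codecs::Column;

// Illustrative sketch: a per-segment collector holding its fast field
// reader as a trait object, as StatsSegmentCollector does after this change.
struct SumCollector {
    fast_field_reader: Arc<dyn Column<u64>>,
    sum: u64,
}

impl SumCollector {
    fn collect(&mut self, doc: u32) {
        // Column::get_val takes the index as u64 in this commit.
        self.sum += self.fast_field_reader.get_val(doc as u64);
    }
}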
1 change: 0 additions & 1 deletion examples/warmer.rs
@@ -2,7 +2,6 @@ use std::cmp::Reverse;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock, Weak};

use fastfield_codecs::Column;
use tantivy::collector::TopDocs;
use tantivy::query::QueryParser;
use tantivy::schema::{Field, Schema, FAST, TEXT};
119 changes: 119 additions & 0 deletions fastfield_codecs/src/column.rs
@@ -1,3 +1,5 @@
use std::marker::PhantomData;

pub trait Column<T = u64> {
/// Return the value associated with the given idx.
///
@@ -42,8 +44,125 @@ pub trait Column<T = u64> {
fn max_value(&self) -> T;

fn num_vals(&self) -> u64;

/// Returns an iterator over the data.
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = T> + 'a> {
Box::new((0..self.num_vals()).map(|idx| self.get_val(idx)))
}
}

pub struct VecColumn<'a, T> {
values: &'a [T],
min_value: T,
max_value: T,
}

impl<'a, T: Copy + PartialOrd> Column<T> for VecColumn<'a, T> {
fn get_val(&self, position: u64) -> T {
self.values[position as usize]
}

fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = T> + 'b> {
Box::new(self.values.iter().copied())
}

fn min_value(&self) -> T {
self.min_value
}

fn max_value(&self) -> T {
self.max_value
}

fn num_vals(&self) -> u64 {
self.values.len() as u64
}
}

impl<'a, T: Copy + Ord + Default> From<&'a [T]> for VecColumn<'a, T> {
fn from(values: &'a [T]) -> Self {
if values.is_empty() {
return VecColumn {
min_value: T::default(),
max_value: T::default(),
values: &[],
};
}
let (min_value, max_value) = values[1..]
.iter()
.copied()
.fold((values[0], values[0]), |(min, max), val| {
(min.min(val), max.max(val))
});
Self {
values,
min_value,
max_value,
}
}
}

struct MonotonicMappingColumn<C, T, Input> {
from_column: C,
monotonic_mapping: T,
_phantom: PhantomData<Input>,
}

/// Creates a view of a column transformed by a monotonic mapping.
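/// The mapping is assumed to be monotonically increasing: the view's
/// `min_value()` and `max_value()` are obtained by mapping the bounds
/// of the underlying column directly.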
pub fn monotonic_map_column<C, T, Input, Output>(
from_column: C,
monotonic_mapping: T,
) -> impl Column<Output>
where
C: Column<Input>,
T: Fn(Input) -> Output,
{
MonotonicMappingColumn {
from_column,
monotonic_mapping,
_phantom: PhantomData,
}
}

impl<C, T, Input, Output> Column<Output> for MonotonicMappingColumn<C, T, Input>
where
C: Column<Input>,
T: Fn(Input) -> Output,
{
fn get_val(&self, idx: u64) -> Output {
let from_val = self.from_column.get_val(idx);
(self.monotonic_mapping)(from_val)
}

fn min_value(&self) -> Output {
let from_min_value = self.from_column.min_value();
(self.monotonic_mapping)(from_min_value)
}

fn max_value(&self) -> Output {
let from_max_value = self.from_column.max_value();
(self.monotonic_mapping)(from_max_value)
}

fn num_vals(&self) -> u64 {
self.from_column.num_vals()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_monotonic_mapping() {
let vals = &[1u64, 3u64][..];
let col = VecColumn::from(vals);
let mapped = monotonic_map_column(col, |el| el + 4);
assert_eq!(mapped.min_value(), 5u64);
assert_eq!(mapped.max_value(), 7u64);
assert_eq!(mapped.num_vals(), 2);
assert_eq!(mapped.get_val(0), 5);
assert_eq!(mapped.get_val(1), 7);
}
}
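Taken together, the items added in this file compose into lazy views. A short usage sketch, standalone, relying only on the public items above (the main wrapper is illustrative):

use fastfield_codecs::{monotonic_map_column, Column, VecColumn};

fn main() {
    let vals: &[u64] = &[10, 20, 30];
    let col = VecColumn::from(vals);
    // Views are lazy: each get_val applies the mapping on the fly.
    let shifted = monotonic_map_column(col, |val| val + 1);
    let doubled = monotonic_map_column(shifted, |val| val * 2);
    assert_eq!(doubled.get_val(2), (30 + 1) * 2);
    // Mapping min/max directly is only valid because the mappings
    // are monotonically increasing.
    assert_eq!(doubled.min_value(), 22);
    assert_eq!(doubled.max_value(), 62);
}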
45 changes: 8 additions & 37 deletions fastfield_codecs/src/lib.rs
@@ -14,7 +14,7 @@ pub mod linear;

mod column;

pub use self::column::Column;
pub use self::column::{monotonic_map_column, Column, VecColumn};

#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy)]
#[repr(u8)]
@@ -56,12 +56,12 @@ impl FastFieldCodecType {

/// The FastFieldCodec trait is required on all variants
/// of fast field compression, to decide which one to choose.
pub trait FastFieldCodec {
pub trait FastFieldCodec: 'static {
/// A codec needs to provide a unique name and id, which is
/// used for debugging and de/serialization.
const CODEC_TYPE: FastFieldCodecType;

type Reader: Column<u64>;
type Reader: Column<u64> + 'static;

/// Reads the metadata and returns the CodecReader
fn open_from_bytes(bytes: OwnedBytes) -> io::Result<Self::Reader>;
@@ -90,35 +90,6 @@ pub struct FastFieldStats {
pub num_vals: u64,
}

struct VecColum<'a>(&'a [u64]);
impl<'a> Column for VecColum<'a> {
fn get_val(&self, position: u64) -> u64 {
self.0[position as usize]
}

fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u64> + 'b> {
Box::new(self.0.iter().cloned())
}

fn min_value(&self) -> u64 {
self.0.iter().min().cloned().unwrap_or(0)
}

fn max_value(&self) -> u64 {
self.0.iter().max().cloned().unwrap_or(0)
}

fn num_vals(&self) -> u64 {
self.0.len() as u64
}
}

impl<'a> From<&'a [u64]> for VecColum<'a> {
fn from(data: &'a [u64]) -> Self {
Self(data)
}
}

#[cfg(test)]
mod tests {
use proptest::prelude::*;
@@ -133,10 +104,10 @@ mod tests {
data: &[u64],
name: &str,
) -> Option<(f32, f32)> {
let estimation = Codec::estimate(&VecColum::from(data))?;
let estimation = Codec::estimate(&VecColumn::from(data))?;

let mut out: Vec<u8> = Vec::new();
Codec::serialize(&mut out, &VecColum::from(data)).unwrap();
Codec::serialize(&mut out, &VecColumn::from(data)).unwrap();

let actual_compression = out.len() as f32 / (data.len() as f32 * 8.0);

@@ -233,7 +204,7 @@ mod tests {
#[test]
fn estimation_good_interpolation_case() {
let data = (10..=20000_u64).collect::<Vec<_>>();
let data: VecColum = data.as_slice().into();
let data: VecColumn = data.as_slice().into();

let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap();
assert_le!(linear_interpol_estimation, 0.01);
@@ -249,7 +220,7 @@
fn estimation_test_bad_interpolation_case() {
let data: &[u64] = &[200, 10, 10, 10, 10, 1000, 20];

let data: VecColum = data.into();
let data: VecColumn = data.into();
let linear_interpol_estimation = LinearCodec::estimate(&data).unwrap();
assert_le!(linear_interpol_estimation, 0.32);

@@ -259,8 +230,8 @@
#[test]
fn estimation_test_bad_interpolation_case_monotonically_increasing() {
let mut data: Vec<u64> = (200..=20000_u64).collect();
data.push(1_000_000);
let data: VecColum = data.as_slice().into();
let data: VecColumn = data.as_slice().into();

// In this case the linear interpolation can't actually be worse than bitpacking,
// but the estimator adds some threshold, which leads to a worse estimate.
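The test helper above covers the estimate/serialize half. Below is a sketch of the full round trip including the read side; OwnedBytes::new is an assumption (from the ownedbytes crate), everything else uses only the signatures visible in this diff:

use fastfield_codecs::{Column, FastFieldCodec, VecColumn};
use ownedbytes::OwnedBytes;

// Sketch: estimate, serialize, then reopen the bytes as a Column<u64>.
fn round_trip<Codec: FastFieldCodec>(data: &[u64]) -> std::io::Result<()> {
    let column = VecColumn::from(data);
    // estimate returns None when the codec cannot encode this data.
    if Codec::estimate(&column).is_none() {
        return Ok(());
    }
    let mut out: Vec<u8> = Vec::new();
    Codec::serialize(&mut out, &column)?;
    let reader = Codec::open_from_bytes(OwnedBytes::new(out))?;
    assert_eq!(reader.num_vals(), data.len() as u64);
    Ok(())
}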
23 changes: 14 additions & 9 deletions src/aggregation/agg_req_with_accessor.rs
@@ -4,14 +4,14 @@ use std::rc::Rc;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;

use fastfield_codecs::Column;

use super::agg_req::{Aggregation, Aggregations, BucketAggregationType, MetricAggregation};
use super::bucket::{HistogramAggregation, RangeAggregation, TermsAggregation};
use super::metric::{AverageAggregation, StatsAggregation};
use super::segment_agg_result::BucketCount;
use super::VecWithNames;
use crate::fastfield::{
type_and_cardinality, DynamicFastFieldReader, FastType, MultiValuedFastFieldReader,
};
use crate::fastfield::{type_and_cardinality, FastType, MultiValuedFastFieldReader};
use crate::schema::{Cardinality, Type};
use crate::{InvertedIndexReader, SegmentReader, TantivyError};

@@ -37,10 +37,16 @@
#[derive(Clone)]
pub(crate) enum FastFieldAccessor {
Multi(MultiValuedFastFieldReader<u64>),
Single(DynamicFastFieldReader<u64>),
Single(Arc<dyn Column<u64>>),
}
impl FastFieldAccessor {
pub fn as_single(&self) -> Option<&DynamicFastFieldReader<u64>> {
pub fn as_single(&self) -> Option<&dyn Column<u64>> {
match self {
FastFieldAccessor::Multi(_) => None,
FastFieldAccessor::Single(reader) => Some(&**reader),
}
}
pub fn into_single(self) -> Option<Arc<dyn Column<u64>>> {
match self {
FastFieldAccessor::Multi(_) => None,
FastFieldAccessor::Single(reader) => Some(reader),
@@ -118,7 +124,7 @@ impl BucketAggregationWithAccessor {
pub struct MetricAggregationWithAccessor {
pub metric: MetricAggregation,
pub field_type: Type,
pub accessor: DynamicFastFieldReader<u64>,
pub accessor: Arc<dyn Column>,
}

impl MetricAggregationWithAccessor {
@@ -134,9 +140,8 @@ impl MetricAggregationWithAccessor {

Ok(MetricAggregationWithAccessor {
accessor: accessor
.as_single()
.expect("unexpected fast field cardinality")
.clone(),
.into_single()
.expect("unexpected fast field cardinality"),
field_type,
metric: metric.clone(),
})
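The &**reader in as_single crosses two indirections: the outer reference and the Arc, leaving a borrow of the unsized trait object. The same pattern in isolation (the local trait is a stand-in for fastfield_codecs::Column):

use std::sync::Arc;

// Stand-in trait mirroring the shape of fastfield_codecs::Column.
trait Column<T = u64> {
    fn get_val(&self, idx: u64) -> T;
}

fn as_single(reader: &Arc<dyn Column<u64>>) -> &dyn Column<u64> {
    // The first * removes the reference, the second goes through the Arc;
    // & then reborrows the unsized dyn Column<u64>.
    &**reader
}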
3 changes: 1 addition & 2 deletions src/aggregation/bucket/histogram/histogram.rs
@@ -15,7 +15,6 @@ use crate::aggregation::intermediate_agg_result::{
IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry,
};
use crate::aggregation::segment_agg_result::SegmentAggregationResultsCollector;
use crate::fastfield::DynamicFastFieldReader;
use crate::schema::Type;
use crate::{DocId, TantivyError};

@@ -264,7 +263,7 @@ impl SegmentHistogramCollector {
req: &HistogramAggregation,
sub_aggregation: &AggregationsWithAccessor,
field_type: Type,
accessor: &DynamicFastFieldReader<u64>,
accessor: &dyn Column<u64>,
) -> crate::Result<Self> {
req.validate()?;
let min = f64_from_fastfield_u64(accessor.min_value(), &field_type);
1 change: 0 additions & 1 deletion src/aggregation/bucket/range.rs
@@ -1,7 +1,6 @@
use std::fmt::Debug;
use std::ops::Range;

use fastfield_codecs::Column;
use fnv::FnvHashMap;
use serde::{Deserialize, Serialize};

3 changes: 1 addition & 2 deletions src/aggregation/metric/average.rs
@@ -4,7 +4,6 @@ use fastfield_codecs::Column;
use serde::{Deserialize, Serialize};

use crate::aggregation::f64_from_fastfield_u64;
use crate::fastfield::DynamicFastFieldReader;
use crate::schema::Type;
use crate::DocId;

@@ -58,7 +57,7 @@ impl SegmentAverageCollector {
data: Default::default(),
}
}
pub(crate) fn collect_block(&mut self, doc: &[DocId], field: &DynamicFastFieldReader<u64>) {
pub(crate) fn collect_block(&mut self, doc: &[DocId], field: &dyn Column<u64>) {
let mut iter = doc.chunks_exact(4);
for docs in iter.by_ref() {
let val1 = field.get_val(docs[0] as u64);
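The collect_block hunk above is cut off right after the first of four loads per iteration. A sketch of the complete unrolled shape, assuming the rest of the body mirrors the first load and the remainder is handled after the loop (sum_block is illustrative, not the commit's code):

use fastfield_codecs::Column;

type DocId = u32;

// Illustrative unrolled loop in the style of collect_block: four
// independent get_val calls per iteration, remainder handled afterwards.
fn sum_block(docs: &[DocId], field: &dyn Column<u64>) -> u64 {
    let mut sum = 0u64;
    let mut chunks = docs.chunks_exact(4);
    for chunk in chunks.by_ref() {
        sum += field.get_val(chunk[0] as u64);
        sum += field.get_val(chunk[1] as u64);
        sum += field.get_val(chunk[2] as u64);
        sum += field.get_val(chunk[3] as u64);
    }
    for &doc in chunks.remainder() {
        sum += field.get_val(doc as u64);
    }
    sum
}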
3 changes: 1 addition & 2 deletions src/aggregation/metric/stats.rs
@@ -2,7 +2,6 @@ use fastfield_codecs::Column;
use serde::{Deserialize, Serialize};

use crate::aggregation::f64_from_fastfield_u64;
use crate::fastfield::DynamicFastFieldReader;
use crate::schema::Type;
use crate::{DocId, TantivyError};

@@ -164,7 +163,7 @@ impl SegmentStatsCollector {
stats: IntermediateStats::default(),
}
}
pub(crate) fn collect_block(&mut self, doc: &[DocId], field: &DynamicFastFieldReader<u64>) {
pub(crate) fn collect_block(&mut self, doc: &[DocId], field: &dyn Column<u64>) {
let mut iter = doc.chunks_exact(4);
for docs in iter.by_ref() {
let val1 = field.get_val(docs[0] as u64);
4 changes: 2 additions & 2 deletions src/aggregation/segment_agg_result.rs
@@ -185,10 +185,10 @@ impl SegmentMetricResultCollector {
pub(crate) fn collect_block(&mut self, doc: &[DocId], metric: &MetricAggregationWithAccessor) {
match self {
SegmentMetricResultCollector::Average(avg_collector) => {
avg_collector.collect_block(doc, &metric.accessor);
avg_collector.collect_block(doc, &*metric.accessor);
}
SegmentMetricResultCollector::Stats(stats_collector) => {
stats_collector.collect_block(doc, &metric.accessor);
stats_collector.collect_block(doc, &*metric.accessor);
}
}
}