merge plan for all segments
jiacai2050 committed Nov 20, 2024
Parent 1ae3b60, commit fa22563
Showing 5 changed files with 177 additions and 91 deletions.
horaedb/metric_engine/src/lib.rs (1 change: 1 addition & 0 deletions)
@@ -19,6 +19,7 @@
#![feature(duration_constructors)]
pub mod error;
mod macros;
mod manifest;
mod read;
mod sst;
horaedb/metric_engine/src/macros.rs (30 changes: 30 additions & 0 deletions)
@@ -0,0 +1,30 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#[macro_export]
macro_rules! compare_primitive_columns {
($lhs_col:expr, $rhs_col:expr, $lhs_idx:expr, $rhs_idx:expr, $($type:ty),+) => {
$(
if let Some(lhs_col) = $lhs_col.as_primitive_opt::<$type>() {
let rhs_col = $rhs_col.as_primitive::<$type>();
if !lhs_col.value($lhs_idx).eq(&rhs_col.value($rhs_idx)) {
return false;
}
}
)+
};
}
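
The new `compare_primitive_columns!` macro expands to one `if let` downcast per listed arrow primitive type and returns `false` from the enclosing function as soon as both columns downcast to the same primitive type but disagree at the given row indices; columns of other types fall through untouched. A minimal sketch of how a caller inside this crate could use it (the `rows_equal` helper, its columns, and `usage` are illustrative assumptions, not part of the commit; read.rs below imports the macro the same way via `use crate::compare_primitive_columns;`):

use std::sync::Arc;

use arrow::array::{ArrayRef, AsArray, Int64Array};
use arrow::datatypes::{Int64Type, UInt64Type};

use crate::compare_primitive_columns;

// Illustrative helper: true when the two rows agree on every primitive type listed.
fn rows_equal(lhs_col: &ArrayRef, rhs_col: &ArrayRef, lhs_idx: usize, rhs_idx: usize) -> bool {
    compare_primitive_columns!(
        lhs_col, rhs_col, lhs_idx, rhs_idx,
        UInt64Type, Int64Type
    );
    // Non-primitive columns (e.g. binary) would need their own comparison here.
    true
}

fn usage() {
    let a: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    let b: ArrayRef = Arc::new(Int64Array::from(vec![1, 9, 3]));
    assert!(rows_equal(&a, &b, 0, 0)); // 1 == 1
    assert!(!rows_equal(&a, &b, 1, 1)); // 2 != 9
}
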
horaedb/metric_engine/src/read.rs (120 changes: 80 additions & 40 deletions)
@@ -23,9 +23,12 @@ use std::{
};

use arrow::{
array::{AsArray, BinaryArray, PrimitiveArray, RecordBatch},
array::{AsArray, RecordBatch},
compute::concat_batches,
datatypes::{GenericBinaryType, Int8Type, UInt64Type, UInt8Type},
datatypes::{
GenericBinaryType, Int32Type, Int64Type, Int8Type, Schema, UInt32Type, UInt64Type,
UInt8Type,
},
};
use arrow_schema::SchemaRef;
use datafusion::{
@@ -39,10 +39,14 @@ use datafusion::{
metrics::ExecutionPlanMetricsSet, DisplayAs, Distribution, ExecutionPlan, PlanProperties,
},
};
use futures::{ready, Stream, StreamExt};
use futures::{Stream, StreamExt};
use itertools::Itertools;
use parquet::arrow::async_reader::ParquetObjectReader;

use crate::types::ObjectStoreRef;
use crate::{
compare_primitive_columns,
types::{ObjectStoreRef, SEQ_COLUMN_NAME},
};

#[derive(Debug, Clone)]
pub struct DefaultParquetFileReaderFactory {
@@ -78,7 +85,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
/// Input record batches are sorted by the primary key columns and seq
/// column.
#[derive(Debug)]
struct MergeExec {
pub(crate) struct MergeExec {
/// Input plan
input: Arc<dyn ExecutionPlan>,
/// (0..num_primary_keys) are primary key columns
@@ -91,7 +98,7 @@ struct MergeExec {
}

impl MergeExec {
fn new(
pub fn new(
input: Arc<dyn ExecutionPlan>,
num_primary_keys: usize,
seq_idx: usize,
@@ -187,6 +194,7 @@ struct MergeStream {
value_op: Arc<dyn AggregateUDFImpl>,

pending_batch: Option<RecordBatch>,
arrow_schema: SchemaRef,
}

impl MergeStream {
@@ -197,17 +205,34 @@ impl MergeStream {
value_idx: usize,
value_op: Arc<dyn AggregateUDFImpl>,
) -> Self {
let fields = stream
.schema()
.fields()
.into_iter()
.filter_map(|f| {
if f.name() == SEQ_COLUMN_NAME {
None
} else {
Some(f.clone())
}
})
.collect_vec();
let arrow_schema = Arc::new(Schema::new_with_metadata(
fields,
stream.schema().metadata.clone(),
));
Self {
stream,
num_primary_keys,
seq_idx,
value_idx,
value_op,
pending_batch: None,
arrow_schema,
}
}

fn primary_key_eq2(
fn primary_key_eq(
&self,
lhs: &RecordBatch,
lhs_idx: usize,
@@ -217,76 +242,91 @@
for k in 0..self.num_primary_keys {
let lhs_col = lhs.column(k);
let rhs_col = rhs.column(k);
if let Some(lhs_col) = lhs_col.as_primitive_opt::<UInt8Type>() {
let rhs_col = rhs_col.as_primitive::<UInt8Type>();
if !lhs_col.value(lhs_idx).eq(&rhs_col.value(rhs_idx)) {
return false;
}
} else if let Some(lhs_col) = lhs_col.as_primitive_opt::<UInt64Type>() {
let rhs_col = rhs_col.as_primitive::<UInt64Type>();
if !lhs_col.value(lhs_idx).eq(&rhs_col.value(rhs_idx)) {
return false;
}
} else if let Some(lhs_col) = lhs_col.as_bytes_opt::<GenericBinaryType<i32>>() {
compare_primitive_columns!(
lhs_col, rhs_col, lhs_idx, rhs_idx, // TODO: Add more types here
UInt8Type, Int8Type, UInt32Type, Int32Type, UInt64Type, Int64Type
);

if let Some(lhs_col) = lhs_col.as_bytes_opt::<GenericBinaryType<i32>>() {
let rhs_col = rhs_col.as_bytes::<GenericBinaryType<i32>>();
if !rhs_col.value(rhs_idx).eq(lhs_col.value(lhs_idx)) {
return false;
}
} else {
unreachable!("unsupported column type: {:?}", lhs_col.data_type())
}
}

true
}

fn primary_key_eq(&self, batch: &RecordBatch, i: usize, j: usize) -> bool {
self.primary_key_eq2(batch, i, batch, j)
}

// TODO: only support deduplication now, merge operation will be added later.
fn merge_batch(&mut self, batch: RecordBatch) -> DfResult<RecordBatch> {
let mut row_idx = 0;
let mut batches = vec![];
while row_idx < batch.num_rows() {
let mut cursor = row_idx + 1;
while self.primary_key_eq(&batch, row_idx, cursor) {
cursor += 1;
let mut start_idx = 0;
while start_idx < batch.num_rows() {
let mut end_idx = start_idx + 1;
while end_idx < batch.num_rows()
&& self.primary_key_eq(&batch, start_idx, &batch, end_idx)
{
end_idx += 1;
}

let same_pk_batch = batch.slice(row_idx, cursor - row_idx);
let rows_with_same_primary_keys = batch.slice(start_idx, end_idx - start_idx);
if let Some(pending) = self.pending_batch.take() {
if !self.primary_key_eq2(&pending, pending.num_rows() - 1, &same_pk_batch, 0) {
if !self.primary_key_eq(
&pending,
pending.num_rows() - 1,
&rows_with_same_primary_keys,
0,
) {
// only keep the last row in this batch
batches.push(pending.slice(pending.num_rows() - 1, 1));
}
}
batches.push(same_pk_batch.slice(same_pk_batch.num_rows() - 1, 1));
batches.push(
rows_with_same_primary_keys.slice(rows_with_same_primary_keys.num_rows() - 1, 1),
);

row_idx = cursor;
start_idx = end_idx;
}

// last batch may have overlapping rows with the next batch, so keep them in
// pending_batch
self.pending_batch = batches.pop();

concat_batches(&self.stream.schema(), batches.iter().map(|v| v))
concat_batches(&self.stream.schema(), batches.iter())
.map_err(|e| DataFusionError::ArrowError(e, None))
.map(|mut batch| {
// Remove seq column
batch.remove_column(self.seq_idx);
batch
})
}
}

impl Stream for MergeStream {
type Item = DfResult<RecordBatch>;

fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
Poll::Ready(ready!(self.stream.poll_next_unpin(ctx)).map(|r| {
r.and_then(|batch| {
match dbg!(self.stream.poll_next_unpin(ctx)) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => {
let value = if let Some(mut pending) = self.pending_batch.take() {
pending.remove_column(self.seq_idx);
Some(Ok(pending))
} else {
None
};
Poll::Ready(value)
}
Poll::Ready(Some(v)) => Poll::Ready(Some(v.and_then(|batch| {
let batch = self.merge_batch(batch)?;
Ok(batch)
})
}))
}))),
}
}
}

impl RecordBatchStream for MergeStream {
fn schema(&self) -> SchemaRef {
todo!()
self.arrow_schema.clone()
}
}
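
Taken together, the changes to merge_batch and poll_next above implement streaming last-wins deduplication over sorted input: inside each batch, consecutive rows with equal primary keys collapse to the run's last row; the final run is parked in pending_batch because the next batch may continue the same key; and when the input stream finishes, the parked row is emitted with the seq column removed. A simplified sketch of that control flow over plain vectors, with no arrow types involved (the `dedup_last` helper and `usage` below are illustrative only, not code from the commit):

/// Illustrative only: each inner Vec is a "batch" of (primary_key, value)
/// rows, already sorted by key, mirroring the sorted input MergeExec expects.
fn dedup_last(batches: Vec<Vec<(u64, &str)>>) -> Vec<(u64, &str)> {
    let mut out = Vec::new();
    // Plays the role of MergeStream::pending_batch.
    let mut pending: Option<(u64, &str)> = None;

    for batch in batches {
        let mut start = 0;
        while start < batch.len() {
            // Find the end of the run of rows sharing one primary key.
            let mut end = start + 1;
            while end < batch.len() && batch[end].0 == batch[start].0 {
                end += 1;
            }
            let last_in_run = batch[end - 1];
            // A parked row survives only if the new run starts a different key.
            if let Some(p) = pending.take() {
                if p.0 != last_in_run.0 {
                    out.push(p);
                }
            }
            pending = Some(last_in_run);
            start = end;
        }
    }
    // Input exhausted (Poll::Ready(None)): flush whatever is still parked.
    out.extend(pending);
    out
}

fn usage() {
    let batches = vec![
        vec![(1, "a"), (1, "b"), (2, "c")],
        vec![(2, "d"), (3, "e")],
    ];
    // Last write wins per key, across batch boundaries.
    let expected: Vec<(u64, &str)> = vec![(1, "b"), (2, "d"), (3, "e")];
    assert_eq!(dedup_last(batches), expected);
}
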