feat: support merge operator
jiacai2050 committed Dec 11, 2024
1 parent 6cdf2e6 commit abd4682
Showing 5 changed files with 210 additions and 38 deletions.
1 change: 1 addition & 0 deletions horaedb/metric_engine/src/lib.rs
@@ -22,6 +22,7 @@ mod compaction;
pub mod error;
mod macros;
mod manifest;
+ pub mod operator;
mod read;
mod sst;
pub mod storage;
154 changes: 154 additions & 0 deletions horaedb/metric_engine/src/operator.rs
@@ -0,0 +1,154 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::{fmt::Debug, sync::Arc};

use anyhow::Context;
use arrow::{
    array::{BinaryArray, RecordBatch},
    buffer::OffsetBuffer,
};
use arrow_schema::DataType;
use macros::ensure;

use crate::Result;

pub trait MergeOperator: Send + Sync + Debug {
    fn merge(&self, _batch: &RecordBatch) -> Result<RecordBatch>;
}

#[derive(Debug)]
pub struct LastValueOperator;

impl MergeOperator for LastValueOperator {
    fn merge(&self, batch: &RecordBatch) -> Result<RecordBatch> {
        let last_row = batch.slice(batch.num_rows() - 1, 1);
        Ok(last_row)
    }
}

#[derive(Debug)]
pub struct BytesMergeOperator {
    /// Indexes of the value columns to append together.
    /// The column type must be `Binary`.
    value_idxes: Vec<usize>,
}

impl BytesMergeOperator {
    pub fn new(value_idxes: Vec<usize>) -> Self {
        Self { value_idxes }
    }
}

impl MergeOperator for BytesMergeOperator {
    fn merge(&self, batch: &RecordBatch) -> Result<RecordBatch> {
        assert!(batch.num_rows() > 0);

        for idx in &self.value_idxes {
            let data_type = batch.column(*idx).data_type();
            ensure!(
                data_type == &DataType::Binary,
                "MergeOperator is only used for binary columns, current:{data_type}"
            );
        }

        let columns = batch
            .columns()
            .iter()
            .enumerate()
            .map(|(idx, column)| {
                if self.value_idxes.contains(&idx) {
                    // For value columns, append all elements together.
                    let binary_array = column.as_any().downcast_ref::<BinaryArray>().unwrap();
                    // The underlying bytes buffer is cheap to clone.
                    let byte_buffer = binary_array.values().clone();
                    let offsets = OffsetBuffer::from_lengths([byte_buffer.len()]);
                    let concated_column = BinaryArray::new(offsets, byte_buffer, None);
                    Arc::new(concated_column)
                } else {
                    // For other columns, just take the first element since the primary key
                    // columns are the same for every row in this batch.
                    column.slice(0, 1)
                }
            })
            .collect();

        let merged_batch = RecordBatch::try_new(batch.schema(), columns)
            .context("failed to construct RecordBatch in BytesMergeOperator.")?;

        Ok(merged_batch)
    }
}

#[cfg(test)]
mod tests {
    use arrow::array::{self as arrow_array};
    use datafusion::common::{create_array, record_batch};

    use super::*;
    use crate::arrow_schema;

    #[test]
    fn test_last_value_operator() {
        let operator = LastValueOperator;
        let batch = record_batch!(
            ("pk1", UInt8, vec![11, 11, 11, 11]),
            ("pk2", UInt8, vec![100, 100, 100, 100]),
            ("value", Int64, vec![2, 7, 4, 1])
        )
        .unwrap();

        let actual = operator.merge(&batch).unwrap();
        let expected = record_batch!(
            ("pk1", UInt8, vec![11]),
            ("pk2", UInt8, vec![100]),
            ("value", Int64, vec![1])
        )
        .unwrap();
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_bytes_merge_operator() {
        let operator = BytesMergeOperator::new(vec![2]);
        let schema = arrow_schema!(("pk1", UInt8), ("pk2", UInt8), ("value", Binary));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                create_array!(UInt8, vec![11, 11, 11, 11]),
                create_array!(UInt8, vec![100, 100, 100, 100]),
                Arc::new(BinaryArray::from_vec(vec![
                    b"one", b"two", b"three", b"four",
                ])),
            ],
        )
        .unwrap();

        let actual = operator.merge(&batch).unwrap();
        let expected = RecordBatch::try_new(
            schema.clone(),
            vec![
                create_array!(UInt8, vec![11]),
                create_array!(UInt8, vec![100]),
                Arc::new(BinaryArray::from_vec(vec![b"onetwothreefour"])),
            ],
        )
        .unwrap();
        assert_eq!(actual, expected);
    }
}
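
The merge strategy is pluggable: anything that is `Send + Sync + Debug` and implements `merge` can be handed to the read path, and the read side only ever calls `merge` on a batch whose rows already share the same primary key, so an operator never has to group rows itself. As a rough sketch (not part of this commit), a hypothetical SumOperator that sums an Int64 value column could look like this; the struct name and its value_idx field are invented for illustration:

// Hypothetical example (not in this commit): sum an Int64 value column across
// the rows that share a primary key. `SumOperator` and `value_idx` are
// invented names for illustration.
use std::sync::Arc;

use anyhow::Context;
use arrow::array::{ArrayRef, Int64Array, RecordBatch};
use arrow::compute;

use crate::{operator::MergeOperator, Result};

#[derive(Debug)]
pub struct SumOperator {
    /// Index of the Int64 column to sum (assumed to exist in the schema).
    value_idx: usize,
}

impl MergeOperator for SumOperator {
    fn merge(&self, batch: &RecordBatch) -> Result<RecordBatch> {
        // Sum the value column over all rows; arrow's aggregate kernel returns
        // None when there are no non-null values, which we map to 0 here.
        let values = batch
            .column(self.value_idx)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        let sum = compute::sum(values).unwrap_or(0);

        let columns = batch
            .columns()
            .iter()
            .enumerate()
            .map(|(idx, column)| {
                if idx == self.value_idx {
                    // Replace the value column with its single-row sum.
                    Arc::new(Int64Array::from(vec![sum])) as ArrayRef
                } else {
                    // Primary key (and other) columns are identical across the
                    // rows of this batch, so keeping the first row is enough.
                    column.slice(0, 1)
                }
            })
            .collect();

        let merged = RecordBatch::try_new(batch.schema(), columns)
            .context("failed to construct RecordBatch in SumOperator")?;
        Ok(merged)
    }
}
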
36 changes: 15 additions & 21 deletions horaedb/metric_engine/src/read.rs
@@ -36,7 +36,6 @@ use datafusion::{
datasource::physical_plan::{FileMeta, ParquetFileReaderFactory},
error::{DataFusionError, Result as DfResult},
execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext},
- logical_expr::AggregateUDFImpl,
parquet::arrow::async_reader::AsyncFileReader,
physical_plan::{
metrics::ExecutionPlanMetricsSet, DisplayAs, Distribution, ExecutionPlan, PlanProperties,
@@ -48,6 +47,7 @@ use parquet::arrow::async_reader::ParquetObjectReader;

use crate::{
compare_primitive_columns,
+ operator::MergeOperator,
types::{ObjectStoreRef, SEQ_COLUMN_NAME},
};

@@ -92,25 +92,22 @@ pub(crate) struct MergeExec {
num_primary_keys: usize,
/// Sequence column index
seq_idx: usize,
- // (idx, merge_op)
- value_idx: usize,
- value_op: Arc<dyn AggregateUDFImpl>,
+ /// Operator to merge values when primary keys are the same
+ value_operator: Arc<dyn MergeOperator>,
}

impl MergeExec {
pub fn new(
input: Arc<dyn ExecutionPlan>,
num_primary_keys: usize,
seq_idx: usize,
- value_idx: usize,
- value_op: Arc<dyn AggregateUDFImpl>,
+ value_operator: Arc<dyn MergeOperator>,
) -> Self {
Self {
input,
num_primary_keys,
seq_idx,
- value_idx,
- value_op,
+ value_operator,
}
}
}
@@ -162,8 +159,7 @@ impl ExecutionPlan for MergeExec {
Arc::clone(&children[0]),
self.num_primary_keys,
self.seq_idx,
- self.value_idx,
- self.value_op.clone(),
+ self.value_operator.clone(),
)))
}

@@ -180,8 +176,7 @@
self.input.execute(partition, context)?,
self.num_primary_keys,
self.seq_idx,
- self.value_idx,
- self.value_op.clone(),
+ self.value_operator.clone(),
)))
}
}
Expand All @@ -190,8 +185,7 @@ struct MergeStream {
stream: SendableRecordBatchStream,
num_primary_keys: usize,
seq_idx: usize,
- value_idx: usize,
- value_op: Arc<dyn AggregateUDFImpl>,
+ value_operator: Arc<dyn MergeOperator>,

pending_batch: Option<RecordBatch>,
arrow_schema: SchemaRef,
Expand All @@ -202,8 +196,7 @@ impl MergeStream {
stream: SendableRecordBatchStream,
num_primary_keys: usize,
seq_idx: usize,
- value_idx: usize,
- value_op: Arc<dyn AggregateUDFImpl>,
+ value_operator: Arc<dyn MergeOperator>,
) -> Self {
let fields = stream
.schema()
@@ -225,8 +218,7 @@
stream,
num_primary_keys,
seq_idx,
- value_idx,
- value_op,
+ value_operator,
pending_batch: None,
arrow_schema,
}
Expand All @@ -242,6 +234,7 @@ impl MergeStream {
for k in 0..self.num_primary_keys {
let lhs_col = lhs.column(k);
let rhs_col = rhs.column(k);

compare_primitive_columns!(
lhs_col, rhs_col, lhs_idx, rhs_idx, // TODO: Add more types here
UInt8Type, Int8Type, UInt32Type, Int32Type, UInt64Type, Int64Type
@@ -277,12 +270,13 @@
&rows_with_same_primary_keys,
0,
) {
- // only keep the last row in this batch
- batches.push(pending.slice(pending.num_rows() - 1, 1));
+ batches.push(self.value_operator.merge(&pending).unwrap());
}
}
batches.push(
- rows_with_same_primary_keys.slice(rows_with_same_primary_keys.num_rows() - 1, 1),
+ self.value_operator
+     .merge(&rows_with_same_primary_keys)
+     .unwrap(),
);

start_idx = end_idx;
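
With the former (value_idx, value_op) pair of AggregateUDFImpl replaced by a single Arc<dyn MergeOperator>, the caller decides per table how rows with duplicate primary keys are collapsed. The actual wiring lives in the remaining changed files of this commit (not shown above); a rough sketch of what such a call site might look like, where build_merge_exec, append_mode, and value_idxes are invented names:

// Hypothetical call site (the real wiring is in the other changed files of
// this commit): pick a merge operator and hand it to MergeExec.
use std::sync::Arc;

use datafusion::physical_plan::ExecutionPlan;

use crate::operator::{BytesMergeOperator, LastValueOperator, MergeOperator};
use crate::read::MergeExec;

fn build_merge_exec(
    scan: Arc<dyn ExecutionPlan>,
    num_primary_keys: usize,
    seq_idx: usize,
    append_mode: bool,       // invented flag: concatenate vs. keep last value
    value_idxes: Vec<usize>, // binary value columns to concatenate in append mode
) -> MergeExec {
    let value_operator: Arc<dyn MergeOperator> = if append_mode {
        // Concatenate the binary payloads of rows sharing a primary key.
        Arc::new(BytesMergeOperator::new(value_idxes))
    } else {
        // Keep only the last row for each primary key.
        Arc::new(LastValueOperator)
    };
    MergeExec::new(scan, num_primary_keys, seq_idx, value_operator)
}
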