diff --git a/horaedb/metric_engine/src/lib.rs b/horaedb/metric_engine/src/lib.rs
index 227d153b56..e0dfdb3cf6 100644
--- a/horaedb/metric_engine/src/lib.rs
+++ b/horaedb/metric_engine/src/lib.rs
@@ -19,6 +19,7 @@
 #![feature(duration_constructors)]
 
 pub mod error;
+mod macros;
 mod manifest;
 mod read;
 mod sst;
diff --git a/horaedb/metric_engine/src/macros.rs b/horaedb/metric_engine/src/macros.rs
new file mode 100644
index 0000000000..71c668dbcc
--- /dev/null
+++ b/horaedb/metric_engine/src/macros.rs
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[macro_export]
+macro_rules! compare_primitive_columns {
+    ($lhs_col:expr, $rhs_col:expr, $lhs_idx:expr, $rhs_idx:expr, $($type:ty),+) => {
+        $(
+            if let Some(lhs_col) = $lhs_col.as_primitive_opt::<$type>() {
+                let rhs_col = $rhs_col.as_primitive::<$type>();
+                if !lhs_col.value($lhs_idx).eq(&rhs_col.value($rhs_idx)) {
+                    return false;
+                }
+            }
+        )+
+    };
+}
diff --git a/horaedb/metric_engine/src/read.rs b/horaedb/metric_engine/src/read.rs
index 2561297db3..6a530a6339 100644
--- a/horaedb/metric_engine/src/read.rs
+++ b/horaedb/metric_engine/src/read.rs
@@ -23,9 +23,12 @@ use std::{
 };
 
 use arrow::{
-    array::{AsArray, BinaryArray, PrimitiveArray, RecordBatch},
+    array::{AsArray, RecordBatch},
     compute::concat_batches,
-    datatypes::{GenericBinaryType, Int8Type, UInt64Type, UInt8Type},
+    datatypes::{
+        GenericBinaryType, Int32Type, Int64Type, Int8Type, Schema, UInt32Type, UInt64Type,
+        UInt8Type,
+    },
 };
 use arrow_schema::SchemaRef;
 use datafusion::{
@@ -39,10 +42,14 @@ use datafusion::{
         metrics::ExecutionPlanMetricsSet, DisplayAs, Distribution, ExecutionPlan, PlanProperties,
     },
 };
-use futures::{ready, Stream, StreamExt};
+use futures::{Stream, StreamExt};
+use itertools::Itertools;
 use parquet::arrow::async_reader::ParquetObjectReader;
 
-use crate::types::ObjectStoreRef;
+use crate::{
+    compare_primitive_columns,
+    types::{ObjectStoreRef, SEQ_COLUMN_NAME},
+};
 
 #[derive(Debug, Clone)]
 pub struct DefaultParquetFileReaderFactory {
@@ -78,7 +85,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
 /// Input record batches are sorted by the primary key columns and seq
 /// column.
 #[derive(Debug)]
-struct MergeExec {
+pub(crate) struct MergeExec {
     /// Input plan
     input: Arc<dyn ExecutionPlan>,
     /// (0..num_primary_keys) are primary key columns
@@ -91,7 +98,7 @@ struct MergeExec {
 }
 
 impl MergeExec {
-    fn new(
+    pub fn new(
         input: Arc<dyn ExecutionPlan>,
         num_primary_keys: usize,
         seq_idx: usize,
@@ -187,6 +194,7 @@ struct MergeStream {
     value_op: Arc,
 
     pending_batch: Option<RecordBatch>,
+    arrow_schema: SchemaRef,
 }
 
 impl MergeStream {
@@ -197,6 +205,22 @@ impl MergeStream {
         value_idx: usize,
         value_op: Arc,
     ) -> Self {
+        let fields = stream
+            .schema()
+            .fields()
+            .into_iter()
+            .filter_map(|f| {
+                if f.name() == SEQ_COLUMN_NAME {
+                    None
+                } else {
+                    Some(f.clone())
+                }
+            })
+            .collect_vec();
+        let arrow_schema = Arc::new(Schema::new_with_metadata(
+            fields,
+            stream.schema().metadata.clone(),
+        ));
         Self {
             stream,
             num_primary_keys,
@@ -204,10 +228,11 @@ impl MergeStream {
             value_idx,
             value_op,
             pending_batch: None,
+            arrow_schema,
         }
     }
 
-    fn primary_key_eq2(
+    fn primary_key_eq(
        &self,
         lhs: &RecordBatch,
         lhs_idx: usize,
@@ -217,58 +242,63 @@ impl MergeStream {
         for k in 0..self.num_primary_keys {
             let lhs_col = lhs.column(k);
             let rhs_col = rhs.column(k);
-            if let Some(lhs_col) = lhs_col.as_primitive_opt::() {
-                let rhs_col = rhs_col.as_primitive::();
-                if !lhs_col.value(lhs_idx).eq(&rhs_col.value(rhs_idx)) {
-                    return false;
-                }
-            } else if let Some(lhs_col) = lhs_col.as_primitive_opt::() {
-                let rhs_col = rhs_col.as_primitive::();
-                if !lhs_col.value(lhs_idx).eq(&rhs_col.value(rhs_idx)) {
-                    return false;
-                }
-            } else if let Some(lhs_col) = lhs_col.as_bytes_opt::<GenericBinaryType<i32>>() {
+            compare_primitive_columns!(
+                lhs_col, rhs_col, lhs_idx, rhs_idx, // TODO: Add more types here
+                UInt8Type, Int8Type, UInt32Type, Int32Type, UInt64Type, Int64Type
+            );
+
+            if let Some(lhs_col) = lhs_col.as_bytes_opt::<GenericBinaryType<i32>>() {
                 let rhs_col = rhs_col.as_bytes::<GenericBinaryType<i32>>();
                 if !rhs_col.value(rhs_idx).eq(lhs_col.value(lhs_idx)) {
                     return false;
                 }
-            } else {
-                unreachable!("unsupported column type: {:?}", lhs_col.data_type())
             }
         }
         true
     }
 
-    fn primary_key_eq(&self, batch: &RecordBatch, i: usize, j: usize) -> bool {
-        self.primary_key_eq2(batch, i, batch, j)
-    }
-
     // TODO: only support deduplication now, merge operation will be added later.
     fn merge_batch(&mut self, batch: RecordBatch) -> DfResult<RecordBatch> {
-        let mut row_idx = 0;
         let mut batches = vec![];
-        while row_idx < batch.num_rows() {
-            let mut cursor = row_idx + 1;
-            while self.primary_key_eq(&batch, row_idx, cursor) {
-                cursor += 1;
+        let mut start_idx = 0;
+        while start_idx < batch.num_rows() {
+            let mut end_idx = start_idx + 1;
+            while end_idx < batch.num_rows()
+                && self.primary_key_eq(&batch, start_idx, &batch, end_idx)
+            {
+                end_idx += 1;
             }
-
-            let same_pk_batch = batch.slice(row_idx, cursor - row_idx);
+            let rows_with_same_primary_keys = batch.slice(start_idx, end_idx - start_idx);
             if let Some(pending) = self.pending_batch.take() {
-                if !self.primary_key_eq2(&pending, pending.num_rows() - 1, &same_pk_batch, 0) {
+                if !self.primary_key_eq(
+                    &pending,
+                    pending.num_rows() - 1,
+                    &rows_with_same_primary_keys,
+                    0,
+                ) {
                     // only keep the last row in this batch
                     batches.push(pending.slice(pending.num_rows() - 1, 1));
                 }
             }
-            batches.push(same_pk_batch.slice(same_pk_batch.num_rows() - 1, 1));
+            batches.push(
+                rows_with_same_primary_keys.slice(rows_with_same_primary_keys.num_rows() - 1, 1),
+            );
 
-            row_idx = cursor;
+            start_idx = end_idx;
         }
+
+        // The last group of rows may overlap with the next batch, so keep it in
+        // pending_batch.
         self.pending_batch = batches.pop();
 
-        concat_batches(&self.stream.schema(), batches.iter().map(|v| v))
+        concat_batches(&self.stream.schema(), batches.iter())
             .map_err(|e| DataFusionError::ArrowError(e, None))
+            .map(|mut batch| {
+                // Remove seq column
+                batch.remove_column(self.seq_idx);
+                batch
+            })
     }
 }
 
@@ -276,17 +306,27 @@ impl Stream for MergeStream {
     type Item = DfResult<RecordBatch>;
 
     fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
-        Poll::Ready(ready!(self.stream.poll_next_unpin(ctx)).map(|r| {
-            r.and_then(|batch| {
+        match self.stream.poll_next_unpin(ctx) {
+            Poll::Pending => Poll::Pending,
+            Poll::Ready(None) => {
+                let value = if let Some(mut pending) = self.pending_batch.take() {
+                    pending.remove_column(self.seq_idx);
+                    Some(Ok(pending))
+                } else {
+                    None
+                };
+                Poll::Ready(value)
+            }
+            Poll::Ready(Some(v)) => Poll::Ready(Some(v.and_then(|batch| {
                 let batch = self.merge_batch(batch)?;
                 Ok(batch)
-            })
-        }))
+            }))),
+        }
     }
 }
 
 impl RecordBatchStream for MergeStream {
     fn schema(&self) -> SchemaRef {
-        todo!()
+        self.arrow_schema.clone()
     }
 }
diff --git a/horaedb/metric_engine/src/storage.rs b/horaedb/metric_engine/src/storage.rs
index a363b9202a..1887e4f674 100644
--- a/horaedb/metric_engine/src/storage.rs
+++ b/horaedb/metric_engine/src/storage.rs
@@ -31,13 +31,15 @@ use datafusion::{
         physical_plan::{FileScanConfig, ParquetExec},
     },
     execution::{context::ExecutionProps, object_store::ObjectStoreUrl, SendableRecordBatchStream},
+    functions_aggregate::first_last::LastValue,
     logical_expr::{utils::conjunction, Expr},
     physical_expr::{create_physical_expr, LexOrdering},
     physical_plan::{
         execute_stream,
         memory::MemoryExec,
         sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec},
-        ExecutionPlan,
+        union::UnionExec,
+        EmptyRecordBatchStream, ExecutionPlan,
     },
     physical_planner::create_physical_sort_exprs,
     prelude::{ident, SessionContext},
@@ -55,9 +57,9 @@ use parquet::{
 
 use crate::{
     manifest::Manifest,
-    read::DefaultParquetFileReaderFactory,
+    read::{DefaultParquetFileReaderFactory, MergeExec},
     sst::{allocate_id, FileId, FileMeta, SstFile},
-    types::{ObjectStoreRef, TimeRange, WriteOptions, WriteResult},
+    types::{ObjectStoreRef, TimeRange, WriteOptions, WriteResult, SEQ_COLUMN_NAME},
     Result,
 };
 
@@ -120,9 +122,6 @@ pub struct CloudObjectStorage {
 /// ```
 /// `root_path` is composed of `path` and `segment_duration`.
 impl CloudObjectStorage {
-    // seq column is appended to the end of schema.
-    const SEQ_COLUMN_NAME: &'static str = "__seq__";
-
     pub async fn try_new(
         path: String,
         segment_duration: Duration,
@@ -136,7 +135,7 @@ impl CloudObjectStorage {
             Manifest::try_new(format!("{path}/{manifest_prefix}"), store.clone()).await?;
         let mut new_fields = arrow_schema.fields.clone().to_vec();
         new_fields.push(Arc::new(Field::new(
-            Self::SEQ_COLUMN_NAME,
+            SEQ_COLUMN_NAME,
             DataType::UInt64,
             true,
         )));
@@ -215,7 +214,7 @@ impl CloudObjectStorage {
             })
             .collect::<Vec<_>>();
         if sort_seq {
-            sort_exprs.push(ident(Self::SEQ_COLUMN_NAME).sort(true, true));
+            sort_exprs.push(ident(SEQ_COLUMN_NAME).sort(true, true));
         }
         let sort_exprs =
             create_physical_sort_exprs(&sort_exprs, &self.df_schema, &ExecutionProps::default())
@@ -320,22 +319,14 @@ impl CloudObjectStorage {
             .with_fetch(Some(1024))
             .with_round_robin_repartition(true);
 
-        // TODO: Add a new plan dedup record batch based on primary keys and sequence
-        // number.
-        Ok(Arc::new(sort_exec))
-    }
-
-    async fn scan_one_segment(
-        &self,
-        ssts: Vec<SstFile>,
-        projections: Option<Vec<usize>>,
-        predicates: Vec<Expr>,
-    ) -> Result<SendableRecordBatchStream> {
-        let plan = self.build_scan_plan(ssts, projections, predicates).await?;
-        let ctx = SessionContext::default();
-        let res = execute_stream(plan, ctx.task_ctx()).context("execute sort physical plan")?;
-
-        Ok(res)
+        let merge_exec = MergeExec::new(
+            Arc::new(sort_exec),
+            self.num_primary_keys,
+            self.schema().fields.len() - 1,
+            0, // TODO: value_idx
+            Arc::new(LastValue::new()),
+        );
+        Ok(Arc::new(merge_exec))
     }
 }
 
@@ -347,10 +338,6 @@ impl TimeMergeStorage for CloudObjectStorage {
 
     async fn write(&self, req: WriteRequest) -> Result<()> {
         if req.enable_check {
-            ensure!(
-                self.arrow_schema.contains(req.batch.schema_ref()),
-                "schema not match"
-            );
             let segment_duration = self.segment_duration.as_millis() as i64;
             ensure!(
                 req.time_range.start.0 / segment_duration
@@ -377,22 +364,37 @@ impl TimeMergeStorage for CloudObjectStorage {
         Ok(())
     }
 
-    #[allow(clippy::never_loop)]
     async fn scan(&self, req: ScanRequest) -> Result<SendableRecordBatchStream> {
         let total_ssts = self.manifest.find_ssts(&req.range).await;
 
+        if total_ssts.is_empty() {
+            return Ok(Box::pin(EmptyRecordBatchStream::new(
+                self.arrow_schema.clone(),
+            )));
+        }
+
         let ssts_by_segment = total_ssts.into_iter().group_by(|file| {
             file.meta.time_range.start.0 / self.segment_duration.as_millis() as i64
         });
 
-        for (_, ssts) in ssts_by_segment {
-            return self
-                .scan_one_segment(ssts, req.projections.clone(), req.predicate.clone())
-                .await;
+        let mut plan_for_all_segments = Vec::new();
+        for (_, ssts) in ssts_by_segment.sorted_by(|a, b| a.0.cmp(&b.0)) {
+            let plan = self
+                .build_scan_plan(ssts, req.projections.clone(), req.predicate.clone())
+                .await?;
+
+            plan_for_all_segments.push(plan);
         }
-        // TODO: currently only return records within one segment, we should merge
-        // them.
-        todo!("Merge stream from different segment")
+        let ctx = SessionContext::default();
+        if plan_for_all_segments.len() == 1 {
+            let res = execute_stream(plan_for_all_segments.remove(0), ctx.task_ctx())
+                .context("execute stream")?;
+            return Ok(res);
+        }
+
+        let union_exec = Arc::new(UnionExec::new(plan_for_all_segments));
+        let res = execute_stream(union_exec, ctx.task_ctx()).context("execute stream")?;
+        return Ok(res);
     }
 
     async fn compact(&self, req: CompactRequest) -> Result<()> {
@@ -410,7 +412,6 @@ mod tests {
     use crate::{arrow_schema, types::Timestamp};
 
     #[tokio::test]
-    #[ignore = "Depend on MergeExec"]
     async fn test_build_scan_plan() {
         let schema = arrow_schema!(("pk1", UInt8));
         let store = Arc::new(LocalFileSystem::new());
@@ -446,15 +447,15 @@ mod tests {
            datafusion::physical_plan::display::DisplayableExecutionPlan::new(plan.as_ref())
                .indent(true);
         assert_eq!(
-            r#"SortPreservingMergeExec: [pk1@0 ASC], fetch=1024
-  ParquetExec: file_groups={3 groups: [[mock/data/100], [mock/data/101], [mock/data/102]]}, projection=[pk1], output_orderings=[[pk1@0 ASC], [pk1@0 ASC], [pk1@0 ASC]]
+            r#"MergeExec: [primary_keys: 1, seq_idx: 1]
+  SortPreservingMergeExec: [pk1@0 ASC, __seq__@1 ASC], fetch=1024
+    ParquetExec: file_groups={3 groups: [[mock/data/100], [mock/data/101], [mock/data/102]]}, projection=[pk1, __seq__], output_orderings=[[pk1@0 ASC, __seq__@1 ASC], [pk1@0 ASC, __seq__@1 ASC], [pk1@0 ASC, __seq__@1 ASC]]
 "#,
             format!("{display_plan}")
         );
     }
 
     #[tokio::test]
-    // #[ignore = "Depend on MergeExec"]
     async fn test_storage_write_and_scan() {
         let schema = arrow_schema!(("pk1", UInt8), ("pk2", UInt8), ("value", Int64));
         let root_dir = temp_dir::TempDir::new().unwrap();
@@ -472,7 +473,7 @@ mod tests {
 
         let batch = record_batch!(
             ("pk1", UInt8, vec![11, 11, 9, 10, 5]),
-            ("pk2", UInt8, vec![100, 99, 1, 2, 3]),
+            ("pk2", UInt8, vec![100, 100, 1, 2, 3]),
             ("value", Int64, vec![2, 7, 4, 6, 1])
         )
         .unwrap();
@@ -486,9 +487,9 @@ mod tests {
             .unwrap();
 
         let batch = record_batch!(
-            ("pk1", UInt8, vec![1, 8, 9]),
-            ("pk2", UInt8, vec![100, 99, 98]),
-            ("value", Int64, vec![2, 7, 4])
+            ("pk1", UInt8, vec![11, 11, 9, 10]),
+            ("pk2", UInt8, vec![100, 99, 1, 2]),
+            ("value", Int64, vec![22, 77, 44, 66])
         )
         .unwrap();
         storage
@@ -508,15 +509,25 @@ mod tests {
             })
             .await
             .unwrap();
-        let expected_batch = record_batch!(
-            ("pk1", UInt8, vec![1, 5, 8, 9, 9, 10, 11, 11]),
-            ("pk2", UInt8, vec![100, 3, 99, 1, 98, 2, 99, 100]),
-            ("value", Int64, vec![2, 1, 7, 4, 4, 6, 7, 2])
-        )
-        .unwrap();
+        let expected_batch = [
+            record_batch!(
+                ("pk1", UInt8, vec![5, 9, 10, 11]),
+                ("pk2", UInt8, vec![3, 1, 2, 99]),
+                ("value", Int64, vec![1, 44, 66, 77])
+            )
+            .unwrap(),
+            record_batch!(
+                ("pk1", UInt8, vec![11]),
+                ("pk2", UInt8, vec![100]),
+                ("value", Int64, vec![22])
+            )
+            .unwrap(),
+        ];
+        let mut idx = 0;
 
         while let Some(batch) = result_stream.next().await {
             let batch = batch.unwrap();
-            assert_eq!(expected_batch, batch);
+            assert_eq!(expected_batch[idx], batch);
+            idx += 1;
         }
     }
diff --git a/horaedb/metric_engine/src/types.rs b/horaedb/metric_engine/src/types.rs
index 022d69cb2b..624b8a2e64 100644
--- a/horaedb/metric_engine/src/types.rs
+++ b/horaedb/metric_engine/src/types.rs
@@ -26,6 +26,10 @@ use parquet::basic::{Compression, Encoding, ZstdLevel};
 
 use crate::sst::FileId;
 
+// Seq column is a builtin column, and it will be appended to the end of
+// user-defined schema.
+pub const SEQ_COLUMN_NAME: &str = "__seq__";
+
 #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
 pub struct Timestamp(pub i64);
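
Note on the new compare_primitive_columns! macro: each type listed after the index arguments expands to an independent `if let` block, essentially the hand-written chain that primary_key_eq used before, minus the `else` links. Because a column has exactly one concrete Arrow type, at most one expanded block performs a comparison and the rest fall through; the embedded `return false;` also means the macro can only be invoked inside a `-> bool` function such as primary_key_eq. A rough sketch of the expansion for two types (the function name primary_keys_eq_at and the type choice are illustrative only, not part of this patch):

    use arrow::array::{ArrayRef, AsArray};
    use arrow::datatypes::{Int64Type, UInt8Type};

    // Approximately what
    //   compare_primitive_columns!(lhs_col, rhs_col, lhs_idx, rhs_idx, UInt8Type, Int64Type)
    // expands to inside a `-> bool` helper.
    fn primary_keys_eq_at(lhs_col: &ArrayRef, rhs_col: &ArrayRef, lhs_idx: usize, rhs_idx: usize) -> bool {
        if let Some(lhs_col) = lhs_col.as_primitive_opt::<UInt8Type>() {
            let rhs_col = rhs_col.as_primitive::<UInt8Type>();
            if !lhs_col.value(lhs_idx).eq(&rhs_col.value(rhs_idx)) {
                return false;
            }
        }
        if let Some(lhs_col) = lhs_col.as_primitive_opt::<Int64Type>() {
            let rhs_col = rhs_col.as_primitive::<Int64Type>();
            if !lhs_col.value(lhs_idx).eq(&rhs_col.value(rhs_idx)) {
                return false;
            }
        }
        true
    }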
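The deduplication rule in MergeStream::merge_batch is: within input already sorted by the primary key columns and then __seq__, keep only the last row of each run of equal primary keys (the one with the newest sequence), and park the final run in pending_batch because it may continue in the next RecordBatch. A minimal standalone sketch of the same keep-last rule over plain tuples, ignoring the cross-batch pending_batch handling (all names here are illustrative, not from the patch):

    /// Rows are (pk, seq, value), already sorted by (pk, seq).
    fn dedup_keep_last(rows: &[(u8, u64, i64)]) -> Vec<(u8, u64, i64)> {
        let mut out = Vec::new();
        let mut start_idx = 0;
        while start_idx < rows.len() {
            // Find the end of the run of rows sharing this primary key.
            let mut end_idx = start_idx + 1;
            while end_idx < rows.len() && rows[end_idx].0 == rows[start_idx].0 {
                end_idx += 1;
            }
            // Keep only the last row of the run; it carries the largest seq.
            out.push(rows[end_idx - 1]);
            start_idx = end_idx;
        }
        out
    }

    fn main() {
        // Loosely mirrors the shape of the data in test_storage_write_and_scan (one key column).
        let rows = [(5, 1, 1), (9, 1, 4), (9, 2, 44), (11, 1, 2), (11, 1, 7), (11, 2, 22)];
        assert_eq!(dedup_keep_last(&rows), vec![(5, 1, 1), (9, 2, 44), (11, 2, 22)]);
    }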
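Why test_storage_write_and_scan now expects two result batches rather than one: after the two writes are merged and sorted by (pk1, pk2, __seq__), the surviving row per primary key is (5,3)->1, (9,1)->44, (10,2)->66, (11,99)->77 (the newer write wins where both exist) and (11,100)->22. The first four are concatenated and emitted by merge_batch; the final run (11,100) is the last group of the batch, so it stays in pending_batch and is only flushed by poll_next, with the __seq__ column removed, once the input stream returns Ready(None). That is why the test indexes expected_batch with idx and receives the single-row batch [11, 100, 22] last.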