diff --git a/Cargo.toml b/Cargo.toml
index 9f5df523d1..9d7dc82590 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,9 +17,12 @@
 [workspace.package]
 version = "2.2.0-alpha"
-authors = ["HoraeDB Authors"]
+authors = ["Apache HoraeDB(incubating) <dev@horaedb.apache.org>"]
 edition = "2021"
 license = "Apache-2.0"
+repository = "https://github.com/apache/horaedb"
+homepage = "https://horaedb.apache.org/"
+description = "A high-performance, distributed, cloud native time-series database."
 
 [workspace]
 resolver = "2"
diff --git a/src/benchmarks/Cargo.toml b/src/benchmarks/Cargo.toml
index 077c42dd62..27c536ac4d 100644
--- a/src/benchmarks/Cargo.toml
+++ b/src/benchmarks/Cargo.toml
@@ -17,18 +17,13 @@
 [package]
 name = "benchmarks"
-
-[package.license]
-workspace = true
-
-[package.version]
-workspace = true
-
-[package.authors]
-workspace = true
-
-[package.edition]
-workspace = true
+version.workspace = true
+authors.workspace = true
+edition.workspace = true
+license.workspace = true
+repository.workspace = true
+homepage.workspace = true
+description.workspace = true
 
 [dependencies]
 bytes = { workspace = true }
diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml
index f8df372e43..dcc79abd81 100644
--- a/src/common/Cargo.toml
+++ b/src/common/Cargo.toml
@@ -17,18 +17,13 @@
 [package]
 name = "common"
-
-[package.license]
-workspace = true
-
-[package.version]
-workspace = true
-
-[package.authors]
-workspace = true
-
-[package.edition]
-workspace = true
+version.workspace = true
+authors.workspace = true
+edition.workspace = true
+license.workspace = true
+repository.workspace = true
+homepage.workspace = true
+description.workspace = true
 
 [dependencies]
 serde = { workspace = true }
diff --git a/src/metric_engine/Cargo.toml b/src/metric_engine/Cargo.toml
index 4286610366..7ee48ddb86 100644
--- a/src/metric_engine/Cargo.toml
+++ b/src/metric_engine/Cargo.toml
@@ -17,18 +17,13 @@
 [package]
 name = "metric_engine"
-
-[package.license]
-workspace = true
-
-[package.version]
-workspace = true
-
-[package.authors]
-workspace = true
-
-[package.edition]
-workspace = true
+version.workspace = true
+authors.workspace = true
+edition.workspace = true
+license.workspace = true
+repository.workspace = true
+homepage.workspace = true
+description.workspace = true
 
 [dependencies]
 anyhow = { workspace = true }
diff --git a/src/metric_engine/src/compaction/executor.rs b/src/metric_engine/src/compaction/executor.rs
index ceca4e8179..736eed237f 100644
--- a/src/metric_engine/src/compaction/executor.rs
+++ b/src/metric_engine/src/compaction/executor.rs
@@ -21,7 +21,6 @@ use std::sync::{
 };
 
 use anyhow::Context;
-use arrow::array::{RecordBatch, UInt64Array};
 use async_scoped::TokioScope;
 use datafusion::{execution::TaskContext, physical_plan::execute_stream};
 use futures::StreamExt;
@@ -38,7 +37,7 @@ use crate::{
     ensure,
     manifest::{ManifestRef, ManifestUpdate},
     read::ParquetReader,
-    sst::{FileMeta, SstFile, SstPathGenerator},
+    sst::{FileId, FileMeta, SstFile, SstPathGenerator},
     types::{ObjectStoreRef, RuntimeRef, StorageSchema},
     Result,
 };
@@ -162,10 +161,12 @@ impl Executor {
         for f in &task.inputs[1..] {
             time_range.merge(&f.meta().time_range);
         }
-        let plan =
-            self.inner
-                .parquet_reader
-                .build_df_plan(task.inputs.clone(), None, Vec::new())?;
+        let plan = self.inner.parquet_reader.build_df_plan(
+            task.inputs.clone(),
+            None,       // projection
+            Vec::new(), // predicate
+            true,       // keep_sequence
+        )?;
         let mut stream = execute_stream(plan, Arc::new(TaskContext::default()))
             .context("execute datafusion plan")?;
@@ -185,16 +186,7 @@ impl Executor {
         while let Some(batch) = stream.next().await {
             let batch = batch.context("execute plan")?;
             num_rows += batch.num_rows();
-            let batch_with_seq = {
-                let mut new_cols = batch.columns().to_vec();
-                // Since file_id in increasing order, we can use it as sequence.
-                let seq_column = Arc::new(UInt64Array::from(vec![file_id; batch.num_rows()]));
-                new_cols.push(seq_column);
-                RecordBatch::try_new(self.inner.schema.arrow_schema.clone(), new_cols)
-                    .context("construct record batch with seq column")?
-            };
-
-            writer.write(&batch_with_seq).await.context("write batch")?;
+            writer.write(&batch).await.context("write batch")?;
         }
         writer.close().await.context("close writer")?;
         let object_meta = self
@@ -225,9 +217,16 @@ impl Executor {
         // From now on, no error should be returned!
         // Because we have already updated manifest.
+        self.delete_ssts(to_deletes.into_iter());
+        Ok(())
+    }
+
+    fn delete_ssts<I>(&self, ids: I)
+    where
+        I: Iterator<Item = FileId>,
+    {
         let (_, results) = TokioScope::scope_and_block(|scope| {
-            for id in to_deletes {
+            for id in ids {
                 let path = Path::from(self.inner.sst_path_gen.generate(id));
                 trace!(id, "Delete sst file");
                 scope.spawn(async move {
@@ -251,8 +250,6 @@ impl Executor {
                 }
             }
         }
-
-        Ok(())
     }
 }
diff --git a/src/metric_engine/src/read.rs b/src/metric_engine/src/read.rs
index 0d27433760..f315991394 100644
--- a/src/metric_engine/src/read.rs
+++ b/src/metric_engine/src/read.rs
@@ -105,6 +105,8 @@ pub(crate) struct MergeExec {
     seq_idx: usize,
     /// Operator to merge values when primary keys are the same
     value_operator: Arc<dyn MergeOperator>,
+    /// Whether to keep the sequence column in the output
+    keep_sequence: bool,
 }
 
 impl MergeExec {
@@ -113,12 +115,14 @@ impl MergeExec {
         num_primary_keys: usize,
         seq_idx: usize,
         value_operator: Arc<dyn MergeOperator>,
+        keep_sequence: bool,
     ) -> Self {
         Self {
             input,
             num_primary_keys,
             seq_idx,
             value_operator,
+            keep_sequence,
         }
     }
 }
@@ -130,8 +134,8 @@ impl DisplayAs for MergeExec {
     ) -> std::fmt::Result {
         write!(
             f,
-            "MergeExec: [primary_keys: {}, seq_idx: {}]",
-            self.num_primary_keys, self.seq_idx
+            "MergeExec: [primary_keys: {}, seq_idx: {}, keep_sequence: {}]",
+            self.num_primary_keys, self.seq_idx, self.keep_sequence
         )?;
         Ok(())
     }
@@ -171,6 +175,7 @@ impl ExecutionPlan for MergeExec {
             self.num_primary_keys,
             self.seq_idx,
             self.value_operator.clone(),
+            self.keep_sequence,
         )))
     }
 
@@ -188,6 +193,7 @@ impl ExecutionPlan for MergeExec {
             self.num_primary_keys,
             self.seq_idx,
             self.value_operator.clone(),
+            self.keep_sequence,
         )))
     }
 }
@@ -197,6 +203,7 @@ struct MergeStream {
     num_primary_keys: usize,
     seq_idx: usize,
     value_operator: MergeOperatorRef,
+    keep_sequence: bool,
 
     pending_batch: Option<RecordBatch>,
     arrow_schema: SchemaRef,
@@ -208,28 +215,37 @@ impl MergeStream {
         num_primary_keys: usize,
         seq_idx: usize,
         value_operator: MergeOperatorRef,
+        keep_sequence: bool,
     ) -> Self {
-        let fields = stream
-            .schema()
-            .fields()
-            .into_iter()
-            .filter_map(|f| {
-                if f.name() == SEQ_COLUMN_NAME {
-                    None
-                } else {
-                    Some(f.clone())
-                }
-            })
-            .collect_vec();
-        let arrow_schema = Arc::new(Schema::new_with_metadata(
-            fields,
-            stream.schema().metadata.clone(),
-        ));
+        let arrow_schema = if keep_sequence {
+            let schema = stream.schema();
+            let found_seq = schema.fields().iter().any(|f| f.name() == SEQ_COLUMN_NAME);
+            assert!(found_seq, "Sequence column not found");
+            schema
+        } else {
+            let fields = stream
+                .schema()
+                .fields()
+                .into_iter()
+                .filter_map(|f| {
+                    if f.name() == SEQ_COLUMN_NAME {
+                        None
+                    } else {
+                        Some(f.clone())
+                    }
+                })
+                .collect_vec();
+            Arc::new(Schema::new_with_metadata(
+                fields,
+                stream.schema().metadata.clone(),
+            ))
+        };
 
         Self {
             stream,
             num_primary_keys,
             seq_idx,
             value_operator,
+            keep_sequence,
             pending_batch: None,
             arrow_schema,
         }
@@ -313,6 +329,10 @@ impl MergeStream {
         let mut output_batches =
             concat_batches(&self.stream.schema(), output_batches.iter()).context("concat batch")?;
 
+        if self.keep_sequence {
+            return Ok(Some(output_batches));
+        }
+
         // Remove seq column
         output_batches.remove_column(self.seq_idx);
         Ok(Some(output_batches))
@@ -331,7 +351,9 @@ impl Stream for MergeStream {
             Poll::Pending => return Poll::Pending,
             Poll::Ready(None) => {
                 let value = if let Some(mut pending) = self.pending_batch.take() {
-                    pending.remove_column(self.seq_idx);
+                    if !self.keep_sequence {
+                        pending.remove_column(self.seq_idx);
+                    }
                     let res = self
                         .value_operator
                         .merge(pending)
@@ -407,6 +429,7 @@ impl ParquetReader {
         ssts: Vec<SstFile>,
         projections: Option<Vec<usize>>,
         predicates: Vec<Expr>,
+        keep_sequence: bool,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // we won't use url for selecting object_store.
         let dummy_url = ObjectStoreUrl::parse("empty://").unwrap();
@@ -464,6 +487,7 @@ impl ParquetReader {
                     Arc::new(BytesMergeOperator::new(self.schema.value_idxes.clone()))
                 }
             },
+            keep_sequence,
         );
         Ok(Arc::new(merge_exec))
     }
@@ -536,7 +560,9 @@ mod tests {
             .unwrap(),
         ]);
 
-        let stream = MergeStream::new(stream, 1, 2, merge_op);
+        let stream = MergeStream::new(
+            stream, 1, 2, merge_op, false, // keep_sequence
+        );
         check_stream(Box::pin(stream), expected).await;
     }
@@ -574,13 +600,14 @@ mod tests {
                 .collect(),
             None,
             vec![expr],
+            false, // keep_sequence
         )
         .unwrap();
         let display_plan =
             datafusion::physical_plan::display::DisplayableExecutionPlan::new(plan.as_ref())
                 .indent(true);
         assert_eq!(
-            r#"MergeExec: [primary_keys: 1, seq_idx: 2]
+            r#"MergeExec: [primary_keys: 1, seq_idx: 2, keep_sequence: false]
 SortPreservingMergeExec: [pk1@0 ASC, __seq__@2 ASC]
   FilterExec: pk1@0 = 0
     ParquetExec: file_groups={3 groups: [[mock/data/100.sst], [mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, value, __seq__], output_orderings=[[pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC]], predicate=pk1@0 = 0, pruning_predicate=CASE WHEN pk1_null_count@2 = pk1_row_count@3 THEN false ELSE pk1_min@0 <= 0 AND 0 <= pk1_max@1 END, required_guarantees=[pk1 in (0)]
diff --git a/src/metric_engine/src/storage.rs b/src/metric_engine/src/storage.rs
index 2d5b3a6b50..6ebf549111 100644
--- a/src/metric_engine/src/storage.rs
+++ b/src/metric_engine/src/storage.rs
@@ -380,6 +380,7 @@ impl TimeMergeStorage for CloudObjectStorage {
                 ssts,
                 req.projections.clone(),
                 req.predicate.clone(),
+                false, // keep_sequence
             )?;
 
             plan_for_all_segments.push(plan);
diff --git a/src/pb_types/Cargo.toml b/src/pb_types/Cargo.toml
index e6929fa018..c3f903aef5 100644
--- a/src/pb_types/Cargo.toml
+++ b/src/pb_types/Cargo.toml
@@ -17,18 +17,13 @@
 [package]
 name = "pb_types"
-
-[package.license]
-workspace = true
-
-[package.version]
-workspace = true
-
-[package.authors]
-workspace = true
-
-[package.edition]
-workspace = true
+version.workspace = true
+authors.workspace = true
+edition.workspace = true
+license.workspace = true
+repository.workspace = true
+homepage.workspace = true
+description.workspace = true
 
 [dependencies]
 prost = { workspace = true }
diff --git a/src/server/Cargo.toml b/src/server/Cargo.toml
index 94617fa98c..8136b7b777 100644
--- a/src/server/Cargo.toml
+++ b/src/server/Cargo.toml
@@ -17,18 +17,13 @@
 [package]
 name = "server"
-
-[package.license]
-workspace = true
-
-[package.version]
-workspace = true
-
-[package.authors]
-workspace = true
-
-[package.edition]
-workspace = true
+version.workspace = true
+authors.workspace = true
+edition.workspace = true
+license.workspace = true
+repository.workspace = true
+homepage.workspace = true
+description.workspace = true
 
 [dependencies]
 actix-web = "4"
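
Note on keep_sequence (an illustrative sketch, not part of the patch above): compaction rewrites SSTs and therefore keeps the internal __seq__ column in its output batches, while the normal query path strips it before batches reach the caller. The snippet below isolates that column-dropping step using arrow's RecordBatch::remove_column, the same call MergeStream makes; the finish_batch helper and the three-column layout are invented here purely for demonstration.

use std::sync::Arc;

use arrow::array::{RecordBatch, StringArray, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema};

// Mirror of MergeStream's output step: drop __seq__ unless the caller
// (e.g. the compaction executor) asked to keep it.
fn finish_batch(mut batch: RecordBatch, seq_idx: usize, keep_sequence: bool) -> RecordBatch {
    if !keep_sequence {
        batch.remove_column(seq_idx);
    }
    batch
}

fn main() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("pk1", DataType::Utf8, false),
        Field::new("value", DataType::UInt64, false),
        Field::new("__seq__", DataType::UInt64, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(StringArray::from(vec!["a", "b"])),
            Arc::new(UInt64Array::from(vec![1u64, 2])),
            Arc::new(UInt64Array::from(vec![100u64, 100])),
        ],
    )
    .unwrap();

    // Query path (keep_sequence = false): __seq__ is stripped.
    assert_eq!(finish_batch(batch.clone(), 2, false).num_columns(), 2);
    // Compaction path (keep_sequence = true): __seq__ survives, so the
    // rewritten SST keeps per-row sequence information.
    assert_eq!(finish_batch(batch, 2, true).num_columns(), 3);
}

Because the merged stream already carries a valid __seq__ column per row, the compaction executor no longer has to rebuild each batch by appending a UInt64Array derived from the file id, which is exactly the batch_with_seq block the patch deletes.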