Skip to content

Commit

Permalink
feat: skip add sequence column when compact (#1618)
Browse files Browse the repository at this point in the history
## Rationale


## Detailed Changes
- Refactor package metadata
- ParquetReader adds a `keep_sequence` argument, which is set to `true` for
compaction and `false` for query.

## Test Plan

CI
```
 cargo publish --dry-run --registry crates-io
```
This command runs successfully.
  • Loading branch information
jiacai2050 authored Dec 24, 2024
1 parent 5d7c6a7 commit 47dd645
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 101 deletions.
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

[workspace.package]
version = "2.2.0-alpha"
authors = ["HoraeDB Authors"]
authors = ["Apache HoraeDB(incubating) <[email protected]>"]
edition = "2021"
license = "Apache-2.0"
repository = "https://github.com/apache/horaedb"
homepage = "https://horaedb.apache.org/"
description = "A high-performance, distributed, cloud native time-series database."

[workspace]
resolver = "2"
Expand Down
19 changes: 7 additions & 12 deletions src/benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,13 @@

[package]
name = "benchmarks"

[package.license]
workspace = true

[package.version]
workspace = true

[package.authors]
workspace = true

[package.edition]
workspace = true
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
homepage.workspace = true
description.workspace = true

[dependencies]
bytes = { workspace = true }
Expand Down
19 changes: 7 additions & 12 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,13 @@

[package]
name = "common"

[package.license]
workspace = true

[package.version]
workspace = true

[package.authors]
workspace = true

[package.edition]
workspace = true
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
homepage.workspace = true
description.workspace = true

[dependencies]
serde = { workspace = true }
Expand Down
19 changes: 7 additions & 12 deletions src/metric_engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,13 @@

[package]
name = "metric_engine"

[package.license]
workspace = true

[package.version]
workspace = true

[package.authors]
workspace = true

[package.edition]
workspace = true
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
homepage.workspace = true
description.workspace = true

[dependencies]
anyhow = { workspace = true }
Expand Down
35 changes: 16 additions & 19 deletions src/metric_engine/src/compaction/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::sync::{
};

use anyhow::Context;
use arrow::array::{RecordBatch, UInt64Array};
use async_scoped::TokioScope;
use datafusion::{execution::TaskContext, physical_plan::execute_stream};
use futures::StreamExt;
Expand All @@ -38,7 +37,7 @@ use crate::{
ensure,
manifest::{ManifestRef, ManifestUpdate},
read::ParquetReader,
sst::{FileMeta, SstFile, SstPathGenerator},
sst::{FileId, FileMeta, SstFile, SstPathGenerator},
types::{ObjectStoreRef, RuntimeRef, StorageSchema},
Result,
};
Expand Down Expand Up @@ -162,10 +161,12 @@ impl Executor {
for f in &task.inputs[1..] {
time_range.merge(&f.meta().time_range);
}
let plan =
self.inner
.parquet_reader
.build_df_plan(task.inputs.clone(), None, Vec::new())?;
let plan = self.inner.parquet_reader.build_df_plan(
task.inputs.clone(),
None, // projection
Vec::new(), // predicate
true, // keep_sequence
)?;
let mut stream = execute_stream(plan, Arc::new(TaskContext::default()))
.context("execute datafusion plan")?;

Expand All @@ -185,16 +186,7 @@ impl Executor {
while let Some(batch) = stream.next().await {
let batch = batch.context("execute plan")?;
num_rows += batch.num_rows();
let batch_with_seq = {
let mut new_cols = batch.columns().to_vec();
// Since file_id in increasing order, we can use it as sequence.
let seq_column = Arc::new(UInt64Array::from(vec![file_id; batch.num_rows()]));
new_cols.push(seq_column);
RecordBatch::try_new(self.inner.schema.arrow_schema.clone(), new_cols)
.context("construct record batch with seq column")?
};

writer.write(&batch_with_seq).await.context("write batch")?;
writer.write(&batch).await.context("write batch")?;
}
writer.close().await.context("close writer")?;
let object_meta = self
Expand Down Expand Up @@ -225,9 +217,16 @@ impl Executor {

// From now on, no error should be returned!
// Because we have already updated manifest.
self.delete_ssts(to_deletes.into_iter());
Ok(())
}

fn delete_ssts<I>(&self, ids: I)
where
I: Iterator<Item = FileId>,
{
let (_, results) = TokioScope::scope_and_block(|scope| {
for id in to_deletes {
for id in ids {
let path = Path::from(self.inner.sst_path_gen.generate(id));
trace!(id, "Delete sst file");
scope.spawn(async move {
Expand All @@ -251,8 +250,6 @@ impl Executor {
}
}
}

Ok(())
}
}

Expand Down
69 changes: 48 additions & 21 deletions src/metric_engine/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ pub(crate) struct MergeExec {
seq_idx: usize,
/// Operator to merge values when primary keys are the same
value_operator: Arc<dyn MergeOperator>,
/// Whether to keep the sequence column in the output
keep_sequence: bool,
}

impl MergeExec {
Expand All @@ -113,12 +115,14 @@ impl MergeExec {
num_primary_keys: usize,
seq_idx: usize,
value_operator: Arc<dyn MergeOperator>,
keep_sequence: bool,
) -> Self {
Self {
input,
num_primary_keys,
seq_idx,
value_operator,
keep_sequence,
}
}
}
Expand All @@ -130,8 +134,8 @@ impl DisplayAs for MergeExec {
) -> std::fmt::Result {
write!(
f,
"MergeExec: [primary_keys: {}, seq_idx: {}]",
self.num_primary_keys, self.seq_idx
"MergeExec: [primary_keys: {}, seq_idx: {}, keep_sequence: {}]",
self.num_primary_keys, self.seq_idx, self.keep_sequence
)?;
Ok(())
}
Expand Down Expand Up @@ -171,6 +175,7 @@ impl ExecutionPlan for MergeExec {
self.num_primary_keys,
self.seq_idx,
self.value_operator.clone(),
self.keep_sequence,
)))
}

Expand All @@ -188,6 +193,7 @@ impl ExecutionPlan for MergeExec {
self.num_primary_keys,
self.seq_idx,
self.value_operator.clone(),
self.keep_sequence,
)))
}
}
Expand All @@ -197,6 +203,7 @@ struct MergeStream {
num_primary_keys: usize,
seq_idx: usize,
value_operator: MergeOperatorRef,
keep_sequence: bool,

pending_batch: Option<RecordBatch>,
arrow_schema: SchemaRef,
Expand All @@ -208,28 +215,37 @@ impl MergeStream {
num_primary_keys: usize,
seq_idx: usize,
value_operator: MergeOperatorRef,
keep_sequence: bool,
) -> Self {
let fields = stream
.schema()
.fields()
.into_iter()
.filter_map(|f| {
if f.name() == SEQ_COLUMN_NAME {
None
} else {
Some(f.clone())
}
})
.collect_vec();
let arrow_schema = Arc::new(Schema::new_with_metadata(
fields,
stream.schema().metadata.clone(),
));
let arrow_schema = if keep_sequence {
let schema = stream.schema();
let found_seq = schema.fields().iter().any(|f| f.name() == SEQ_COLUMN_NAME);
assert!(found_seq, "Sequence column not found");
schema
} else {
let fields = stream
.schema()
.fields()
.into_iter()
.filter_map(|f| {
if f.name() == SEQ_COLUMN_NAME {
None
} else {
Some(f.clone())
}
})
.collect_vec();
Arc::new(Schema::new_with_metadata(
fields,
stream.schema().metadata.clone(),
))
};
Self {
stream,
num_primary_keys,
seq_idx,
value_operator,
keep_sequence,
pending_batch: None,
arrow_schema,
}
Expand Down Expand Up @@ -313,6 +329,10 @@ impl MergeStream {

let mut output_batches =
concat_batches(&self.stream.schema(), output_batches.iter()).context("concat batch")?;
if self.keep_sequence {
return Ok(Some(output_batches));
}

// Remove seq column
output_batches.remove_column(self.seq_idx);
Ok(Some(output_batches))
Expand All @@ -331,7 +351,9 @@ impl Stream for MergeStream {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => {
let value = if let Some(mut pending) = self.pending_batch.take() {
pending.remove_column(self.seq_idx);
if !self.keep_sequence {
pending.remove_column(self.seq_idx);
}
let res = self
.value_operator
.merge(pending)
Expand Down Expand Up @@ -407,6 +429,7 @@ impl ParquetReader {
ssts: Vec<SstFile>,
projections: Option<Vec<usize>>,
predicates: Vec<Expr>,
keep_sequence: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
// we won't use url for selecting object_store.
let dummy_url = ObjectStoreUrl::parse("empty://").unwrap();
Expand Down Expand Up @@ -464,6 +487,7 @@ impl ParquetReader {
Arc::new(BytesMergeOperator::new(self.schema.value_idxes.clone()))
}
},
keep_sequence,
);
Ok(Arc::new(merge_exec))
}
Expand Down Expand Up @@ -536,7 +560,9 @@ mod tests {
.unwrap(),
]);

let stream = MergeStream::new(stream, 1, 2, merge_op);
let stream = MergeStream::new(
stream, 1, 2, merge_op, false, // keep_sequence
);
check_stream(Box::pin(stream), expected).await;
}

Expand Down Expand Up @@ -574,13 +600,14 @@ mod tests {
.collect(),
None,
vec![expr],
false, // keep_sequence
)
.unwrap();
let display_plan =
datafusion::physical_plan::display::DisplayableExecutionPlan::new(plan.as_ref())
.indent(true);
assert_eq!(
r#"MergeExec: [primary_keys: 1, seq_idx: 2]
r#"MergeExec: [primary_keys: 1, seq_idx: 2, keep_sequence: false]
SortPreservingMergeExec: [pk1@0 ASC, __seq__@2 ASC]
FilterExec: pk1@0 = 0
ParquetExec: file_groups={3 groups: [[mock/data/100.sst], [mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, value, __seq__], output_orderings=[[pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC]], predicate=pk1@0 = 0, pruning_predicate=CASE WHEN pk1_null_count@2 = pk1_row_count@3 THEN false ELSE pk1_min@0 <= 0 AND 0 <= pk1_max@1 END, required_guarantees=[pk1 in (0)]
Expand Down
1 change: 1 addition & 0 deletions src/metric_engine/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ impl TimeMergeStorage for CloudObjectStorage {
ssts,
req.projections.clone(),
req.predicate.clone(),
false, // keep_sequence
)?;

plan_for_all_segments.push(plan);
Expand Down
19 changes: 7 additions & 12 deletions src/pb_types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,13 @@

[package]
name = "pb_types"

[package.license]
workspace = true

[package.version]
workspace = true

[package.authors]
workspace = true

[package.edition]
workspace = true
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
homepage.workspace = true
description.workspace = true

[dependencies]
prost = { workspace = true }
Expand Down
Loading

0 comments on commit 47dd645

Please sign in to comment.