Skip to content

Commit

Permalink
trigger compaction sonner
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 committed Dec 22, 2024
1 parent 0e3e4d1 commit 0579c71
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
10 changes: 7 additions & 3 deletions src/metric_engine/src/compaction/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,6 @@ impl Executor {
self.inner
.inused_memory
.fetch_sub(task_size, Ordering::Relaxed);
if let Err(e) = self.inner.trigger_tx.try_send(()) {
debug!("send pick task trigger signal failed, err{e:?}");
}
}

pub fn on_failure(&self, task: &Task) {
Expand All @@ -148,10 +145,17 @@ impl Executor {
runnable.spawn()
}

fn trigger_more_task(&self) {
if let Err(e) = self.inner.trigger_tx.try_send(()) {
debug!("Send pick task trigger signal failed, err{e:?}");
}
}

// TODO: Merge input sst files into one new sst file
// and delete the expired sst files
pub async fn do_compaction(&self, task: &Task) -> Result<()> {
self.pre_check(task)?;
self.trigger_more_task();

debug!(input_len = task.inputs.len(), "Start do compaction");
let mut time_range = task.inputs[0].meta().time_range.clone();
Expand Down
4 changes: 2 additions & 2 deletions src/metric_engine/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ impl ParquetReader {
})
.collect::<Vec<_>>();
let scan_config = FileScanConfig::new(dummy_url, self.schema.arrow_schema.clone())
.with_output_ordering(vec![sort_exprs.clone()])
.with_output_ordering(vec![sort_exprs.clone(); file_groups.len()])
.with_file_groups(file_groups)
.with_projection(projections);

Expand Down Expand Up @@ -569,7 +569,7 @@ mod tests {
.indent(true);
assert_eq!(
r#"MergeExec: [primary_keys: 1, seq_idx: 2]
SortPreservingMergeExec: [pk1@0 ASC, __seq__@2 ASC], fetch=1024
SortPreservingMergeExec: [pk1@0 ASC, __seq__@2 ASC]
ParquetExec: file_groups={3 groups: [[mock/data/100.sst], [mock/data/101.sst], [mock/data/102.sst]]}, projection=[pk1, value, __seq__], output_orderings=[[pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC], [pk1@0 ASC, __seq__@2 ASC]]
"#,
format!("{display_plan}")
Expand Down

0 comments on commit 0579c71

Please sign in to comment.