fix: manifest delta num may overflow (apache#1616)
## Rationale
Fix bugs found in the local write bench.

## Detailed Changes
- When the manifest starts up, the delta num may overflow since it is
initialized to 0.
- When the manifest applies merge_update, the deltas are unsorted, so a delete
may be processed before the corresponding add, i.e. we may try to delete a
non-existing file (see the sketch below).
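
For intuition, here is a minimal, self-contained sketch of the two fixes. All names below (`SstFile`, `ManifestUpdate`, `Snapshot`, `delta_num`) are illustrative stand-ins rather than the actual metric_engine API; the real changes are in the diff below.

```rust
// Toy model only: shows (1) applying adds before deletes, so an unsorted
// batch of deltas can never delete a file that has not been added yet, and
// (2) counting merged deltas with saturating arithmetic, so the counter
// cannot wrap even when it starts from 0 right after the manifest opens.
#[derive(Clone, Debug, PartialEq)]
struct SstFile {
    id: u64,
}

struct ManifestUpdate {
    to_adds: Vec<SstFile>,
    to_deletes: Vec<u64>,
}

#[derive(Default)]
struct Snapshot {
    files: Vec<SstFile>,
    delta_num: u64,
}

impl Snapshot {
    fn merge_update(&mut self, update: ManifestUpdate) {
        // Additions first, deletions second, regardless of how the deltas
        // were ordered when they were written.
        self.files.extend(update.to_adds);
        self.files.retain(|f| !update.to_deletes.contains(&f.id));
        // Saturating add: no wrap-around even if the counter was freshly
        // initialized to 0 on startup.
        self.delta_num = self.delta_num.saturating_add(1);
    }
}

fn main() {
    let mut snapshot = Snapshot::default();
    // A batch in which the delete of file 1 was recorded before its add.
    snapshot.merge_update(ManifestUpdate {
        to_adds: vec![SstFile { id: 1 }, SstFile { id: 2 }],
        to_deletes: vec![1],
    });
    assert_eq!(snapshot.files, vec![SstFile { id: 2 }]);
    assert_eq!(snapshot.delta_num, 1);
}
```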

## Test Plan
CI
jiacai2050 authored Dec 23, 2024
1 parent d2f878f commit f8bb726
Showing 19 changed files with 537 additions and 294 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

13 changes: 10 additions & 3 deletions docs/example.toml
@@ -17,8 +17,15 @@

port = 5000

[metric_engine]
[test]
enable_write = true
write_worker_num = 1
write_interval = "500ms"

[metric_engine.storage]
[metric_engine.threads]
sst_thread_num = 2
manifest_thread_num = 2

[metric_engine.storage.object_store]
type = "Local"
data_dir = "/tmp/horaedb-storage"
data_dir = "/tmp/horaedb-storage"
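
As a side note on the reorganized config above: a hedged sketch of how the new `[metric_engine.threads]` keys might be deserialized. The struct below and the use of the `toml` crate with serde's `derive` feature are assumptions for illustration, not code from this commit.

```rust
// Illustration only: hypothetical struct mirroring the new threads keys shown
// above; assumes the `toml` crate and `serde` with the `derive` feature.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct ThreadsConfig {
    sst_thread_num: usize,
    manifest_thread_num: usize,
}

fn main() {
    let snippet = "sst_thread_num = 2\nmanifest_thread_num = 2\n";
    let cfg: ThreadsConfig = toml::from_str(snippet).expect("parse threads config");
    println!("{cfg:?}");
}
```
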
14 changes: 3 additions & 11 deletions src/benchmarks/src/encoding_bench.rs
@@ -19,7 +19,7 @@
use bytes::Bytes;
use metric_engine::{
manifest::{ManifestUpdate, Snapshot},
manifest::Snapshot,
sst::{FileMeta, SstFile},
};

@@ -43,11 +43,7 @@ impl EncodingBench {
);
let sstfiles = vec![sstfile.clone(); config.record_count];
let mut snapshot = Snapshot::try_from(Bytes::new()).unwrap();
let update = ManifestUpdate {
to_adds: sstfiles,
to_deletes: vec![],
};
let _ = snapshot.merge_update(update);
snapshot.add_records(sstfiles);

EncodingBench {
raw_bytes: snapshot.into_bytes().unwrap(),
@@ -60,11 +56,7 @@ impl EncodingBench {
// first decode snapshot and then append with delta sstfiles, serialize to bytes
// at last
let mut snapshot = Snapshot::try_from(self.raw_bytes.clone()).unwrap();
let update = ManifestUpdate {
to_adds: self.to_append.clone(),
to_deletes: vec![],
};
let _ = snapshot.merge_update(update);
snapshot.add_records(self.to_append.clone());
let _ = snapshot.into_bytes();
}
}
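
The bench above now calls `snapshot.add_records(...)`; its definition is not part of the excerpt shown here, so the toy model below only illustrates the assumed append-only behavior (no deletes involved), not the real implementation.

```rust
// Toy model only: an append-only `add_records`, which is all the encoding
// bench needs since it never deletes files while building a snapshot.
#[derive(Clone, Debug)]
struct SstFile {
    id: u64,
}

#[derive(Default)]
struct Snapshot {
    files: Vec<SstFile>,
}

impl Snapshot {
    fn add_records(&mut self, files: Vec<SstFile>) {
        self.files.extend(files);
    }
}

fn main() {
    let mut snapshot = Snapshot::default();
    snapshot.add_records(vec![SstFile { id: 1 }; 3]);
    assert_eq!(snapshot.files.len(), 3);
}
```
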
1 change: 1 addition & 0 deletions src/metric_engine/Cargo.toml
@@ -48,6 +48,7 @@ object_store = { workspace = true }
parquet = { workspace = true, features = ["object_store"] }
pb_types = { workspace = true }
prost = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
40 changes: 22 additions & 18 deletions src/metric_engine/src/compaction/executor.rs
@@ -30,14 +30,15 @@ use parquet::{
arrow::{async_writer::ParquetObjectWriter, AsyncArrowWriter},
file::properties::WriterProperties,
};
use tracing::error;
use tokio::sync::mpsc::Sender;
use tracing::{debug, error, trace};

use crate::{
compaction::Task,
ensure,
manifest::{ManifestRef, ManifestUpdate},
read::ParquetReader,
sst::{allocate_id, FileMeta, SstFile, SstPathGenerator},
sst::{FileMeta, SstFile, SstPathGenerator},
types::{ObjectStoreRef, RuntimeRef, StorageSchema},
Result,
};
@@ -57,6 +58,7 @@ struct Inner {
write_props: WriterProperties,
inused_memory: AtomicU64,
mem_limit: u64,
trigger_tx: Sender<()>,
}

impl Executor {
@@ -70,6 +72,7 @@ impl Executor {
parquet_reader: Arc<ParquetReader>,
write_props: WriterProperties,
mem_limit: u64,
trigger_tx: Sender<()>,
) -> Self {
let inner = Inner {
runtime,
@@ -81,6 +84,7 @@ impl Executor {
write_props,
mem_limit,
inused_memory: AtomicU64::new(0),
trigger_tx,
};
Self {
inner: Arc::new(inner),
@@ -141,11 +145,19 @@ impl Executor {
runnable.spawn()
}

fn trigger_more_task(&self) {
if let Err(e) = self.inner.trigger_tx.try_send(()) {
debug!("Send pick task trigger signal failed, err{e:?}");
}
}

// TODO: Merge input sst files into one new sst file
// and delete the expired sst files
pub async fn do_compaction(&self, task: &Task) -> Result<()> {
self.pre_check(task)?;
self.trigger_more_task();

debug!(input_len = task.inputs.len(), "Start do compaction");
let mut time_range = task.inputs[0].meta().time_range.clone();
for f in &task.inputs[1..] {
time_range.merge(&f.meta().time_range);
@@ -157,7 +169,7 @@
let mut stream = execute_stream(plan, Arc::new(TaskContext::default()))
.context("execute datafusion plan")?;

let file_id = allocate_id();
let file_id = SstFile::allocate_id();
let file_path = self.inner.sst_path_gen.generate(file_id);
let file_path = Path::from(file_path);
let object_store_writer =
@@ -197,35 +209,27 @@
size: object_meta.size as u32,
time_range: time_range.clone(),
};
debug!(file_meta = ?file_meta, "Compact output new sst");
// First add new sst to manifest, then delete expired/old sst
let to_adds = vec![SstFile::new(file_id, file_meta)];
let to_deletes = task
.expireds
.iter()
.map(|f| f.id())
.chain(task.inputs.iter().map(|f| f.id()))
.collect();
.collect::<Vec<_>>();
self.inner
.manifest
.update(ManifestUpdate::new(to_adds, to_deletes))
.update(ManifestUpdate::new(to_adds, to_deletes.clone()))
.await?;

// From now on, no error should be returned!
// Because we have already updated manifest.

let (_, results) = TokioScope::scope_and_block(|scope| {
for file in &task.expireds {
let path = Path::from(self.inner.sst_path_gen.generate(file.id()));
scope.spawn(async move {
self.inner
.store
.delete(&path)
.await
.with_context(|| format!("failed to delete file, path:{path}"))
});
}
for file in &task.inputs {
let path = Path::from(self.inner.sst_path_gen.generate(file.id()));
for id in to_deletes {
let path = Path::from(self.inner.sst_path_gen.generate(id));
trace!(id, "Delete sst file");
scope.spawn(async move {
self.inner
.store
@@ -262,7 +266,7 @@ impl Runnable {
let rt = self.executor.inner.runtime.clone();
rt.spawn(async move {
if let Err(e) = self.executor.do_compaction(&self.task).await {
error!("Do compaction failed, err:{e}");
error!("Do compaction failed, err:{e:?}");
self.executor.on_failure(&self.task);
} else {
self.executor.on_success(&self.task);
2 changes: 1 addition & 1 deletion src/metric_engine/src/compaction/mod.rs
@@ -19,7 +19,7 @@ mod executor;
mod picker;
mod scheduler;

pub use scheduler::{Scheduler as CompactionScheduler, SchedulerConfig};
pub use scheduler::Scheduler as CompactionScheduler;

use crate::sst::SstFile;

29 changes: 19 additions & 10 deletions src/metric_engine/src/compaction/picker.rs
@@ -18,7 +18,7 @@
use std::{collections::BTreeMap, time::Duration};

use common::now;
use tracing::debug;
use tracing::trace;

use crate::{compaction::Task, manifest::ManifestRef, sst::SstFile, types::Timestamp};

@@ -35,6 +35,7 @@ impl Picker {
segment_duration: Duration,
new_sst_max_size: u64,
input_sst_max_num: usize,
input_sst_min_num: usize,
) -> Self {
Self {
manifest,
@@ -43,11 +44,15 @@
segment_duration,
new_sst_max_size,
input_sst_max_num,
input_sst_min_num,
),
}
}

pub async fn pick_candidate(&self) -> Option<Task> {
/// This function picks a candidate for compaction.
/// Note: It can only execute sequentially, otherwise a SST may be picked by
/// multiple threads(that's why it take a mutable self).
pub async fn pick_candidate(&mut self) -> Option<Task> {
let ssts = self.manifest.all_ssts().await;
let expire_time = self.ttl.map(|ttl| (now() - ttl.as_micros() as i64).into());
self.strategy.pick_candidate(ssts, expire_time)
@@ -58,18 +63,21 @@ pub struct TimeWindowCompactionStrategy {
segment_duration: Duration,
new_sst_max_size: u64,
input_sst_max_num: usize,
input_sst_min_num: usize,
}

impl TimeWindowCompactionStrategy {
pub fn new(
segment_duration: Duration,
new_sst_max_size: u64,
input_sst_max_num: usize,
input_sst_min_num: usize,
) -> Self {
Self {
segment_duration,
new_sst_max_size,
input_sst_max_num,
input_sst_min_num,
}
}

@@ -80,7 +88,7 @@ impl TimeWindowCompactionStrategy {
) -> Option<Task> {
let (uncompacted_files, expired_files) =
Self::find_uncompacted_and_expired_files(ssts, expire_time);
debug!(uncompacted_files = ?uncompacted_files, expired_files = ?expired_files, "Begin pick candidate");
trace!(uncompacted_files = ?uncompacted_files, expired_files = ?expired_files, "Begin pick candidate");

let files_by_segment = self.files_by_segment(uncompacted_files);
let compaction_files = self.pick_compaction_files(files_by_segment)?;
@@ -101,7 +109,7 @@ impl TimeWindowCompactionStrategy {
expireds: expired_files,
};

debug!(task = ?task, "End pick candidate");
trace!(task = ?task, "End pick candidate");

Some(task)
}
@@ -130,14 +138,14 @@ impl TimeWindowCompactionStrategy {
let segment_duration = self.segment_duration;
for file in files {
let segment = file.meta().time_range.start.truncate_by(segment_duration);
debug!(segment = ?segment, file = ?file);
trace!(segment = ?segment, file = ?file);
files_by_segment
.entry(segment)
.or_insert_with(Vec::new)
.push(file);
}

debug!(
trace!(
files = ?files_by_segment,
"Group files of similar timestamp into segment"
);
@@ -149,13 +157,14 @@
files_by_segment: BTreeMap<Timestamp, Vec<SstFile>>,
) -> Option<Vec<SstFile>> {
for (segment, mut files) in files_by_segment.into_iter().rev() {
debug!(segment = ?segment, files = ?files, "Loop segment for pick files");
if files.len() < 2 {
trace!(segment = ?segment, files = ?files.len(), "Loop segment for pick files");
if files.len() < self.input_sst_min_num {
continue;
}

// Prefer to compact smaller files first.
files.sort_unstable_by_key(SstFile::size);
trace!(sorted_files = ?files, "Sort files by size");

let mut input_size = 0;
// Suppose the comaction will reduce the size of files by 10%.
@@ -170,7 +179,7 @@
})
.collect::<Vec<_>>();

if compaction_files.len() >= 2 {
if compaction_files.len() >= self.input_sst_min_num {
return Some(compaction_files);
}
}
@@ -192,7 +201,7 @@
#[test]
fn test_pick_candidate() {
let segment_duration = Duration::from_millis(20);
let strategy = TimeWindowCompactionStrategy::new(segment_duration, 9999, 10);
let strategy = TimeWindowCompactionStrategy::new(segment_duration, 9999, 10, 2);

let ssts = (0_i64..5_i64)
.map(|i| {