feat: init compact scheduler (#1601)
## Rationale
Set up the basic structure for compaction.

## Detailed Changes
- Add a `compaction` module: `Input`/`Task` types, a `TimeWindowCompactionStrategy` picker (candidate picking is still `todo!()`), and a channel-based `Scheduler` with `SchedulerConfig`.
- Manifest: add the `ManifestRef` alias and an `all_ssts()` accessor, nest snapshot/delta files under the `manifest` prefix, and switch callers to the `id()`/`meta()` accessors of `SstFile`.
- Add the `bytesize` dependency for the default memory limit.

## Test Plan
Existing CI; note that the scheduler itself has no tests yet.
jiacai2050 authored Nov 29, 2024
1 parent e2970b1 commit 6cdf2e6
Showing 11 changed files with 398 additions and 61 deletions.
7 changes: 7 additions & 0 deletions horaedb/Cargo.lock

(Generated lockfile; diff not rendered.)

1 change: 1 addition & 0 deletions horaedb/Cargo.toml
@@ -37,6 +37,7 @@ macros = { path = "../src/components/macros" }
pb_types = { path = "pb_types" }
prost = { version = "0.13" }
arrow = { version = "53", features = ["prettyprint"] }
bytesize = "1"
arrow-schema = "53"
tokio = { version = "1", features = ["full"] }
async-trait = "0.1"
1 change: 1 addition & 0 deletions horaedb/metric_engine/Cargo.toml
@@ -37,6 +37,7 @@ arrow-schema = { workspace = true }
async-scoped = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
bytesize = { workspace = true }
datafusion = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
34 changes: 34 additions & 0 deletions horaedb/metric_engine/src/compaction/mod.rs
@@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

mod picker;
mod scheduler;

pub use scheduler::{Scheduler as CompactionScheduler, SchedulerConfig};

use crate::sst::{FileId, SstFile};

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Input {
    files: Vec<SstFile>,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Task {
    pub inputs: Vec<FileId>,
    pub expireds: Vec<FileId>,
}
34 changes: 34 additions & 0 deletions horaedb/metric_engine/src/compaction/picker.rs
@@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::time::Duration;

use crate::{compaction::Task, sst::SstFile};

pub struct TimeWindowCompactionStrategy {
    segment_duration: Duration,
}

impl TimeWindowCompactionStrategy {
    pub fn new(segment_duration: Duration) -> Self {
        Self { segment_duration }
    }

    pub fn pick_candidate(&self, _ssts: Vec<SstFile>) -> Option<Task> {
        todo!()
    }
}
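
`pick_candidate` is left as a `todo!()` for now. For intuition about what a time-window strategy typically does, here is a self-contained sketch (simplified stand-in types, not the engine's `SstFile`/`Task`, and not necessarily the algorithm this PR will eventually land): bucket files by which segment-sized window their data starts in, then propose the fullest bucket for merging.

```rust
use std::{collections::HashMap, time::Duration};

// Simplified stand-ins for illustration; the real types live in crate::sst.
type FileId = u64;

struct SstFile {
    id: FileId,
    start_ms: u64, // assumed start of the file's time range, in milliseconds
}

/// Group files by time window and return the ids of the largest group,
/// but only when it holds more than one file (otherwise merging is pointless).
fn pick_candidate(ssts: &[SstFile], segment_duration: Duration) -> Option<Vec<FileId>> {
    let window_ms = segment_duration.as_millis() as u64;
    let mut buckets: HashMap<u64, Vec<FileId>> = HashMap::new();
    for sst in ssts {
        buckets.entry(sst.start_ms / window_ms).or_default().push(sst.id);
    }
    buckets
        .into_values()
        .filter(|ids| ids.len() > 1)
        .max_by_key(|ids| ids.len())
}

fn main() {
    let ssts = vec![
        SstFile { id: 1, start_ms: 0 },
        SstFile { id: 2, start_ms: 1_000 },
        SstFile { id: 3, start_ms: 7_200_000 },
    ];
    // Files 1 and 2 share the first 2h window, so they form the candidate.
    println!("{:?}", pick_candidate(&ssts, Duration::from_secs(2 * 3600)));
}
```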
166 changes: 166 additions & 0 deletions horaedb/metric_engine/src/compaction/scheduler.rs
@@ -0,0 +1,166 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::{
    sync::{atomic::AtomicU64, Arc},
    time::Duration,
};

use anyhow::Context;
use tokio::{
    sync::mpsc::{self, Receiver, Sender},
    task::JoinHandle,
    time::sleep,
};
use tracing::warn;

use crate::{
    compaction::{picker::TimeWindowCompactionStrategy, Task},
    manifest::ManifestRef,
    sst::SstPathGenerator,
    types::{ObjectStoreRef, RuntimeRef},
    Result,
};

pub struct Scheduler {
    runtime: RuntimeRef,

    task_tx: Sender<Task>,
    inused_memory: AtomicU64,
    task_handle: JoinHandle<()>,
    picker_handle: JoinHandle<()>,
}

impl Scheduler {
    pub fn new(
        runtime: RuntimeRef,
        manifest: ManifestRef,
        store: ObjectStoreRef,
        segment_duration: Duration,
        sst_path_gen: Arc<SstPathGenerator>,
        config: SchedulerConfig,
    ) -> Self {
        let (task_tx, task_rx) = mpsc::channel(config.max_pending_compaction_tasks);
        let task_handle = {
            let rt = runtime.clone();
            let store = store.clone();
            let manifest = manifest.clone();
            runtime.spawn(async move {
                Self::recv_task_loop(
                    rt,
                    task_rx,
                    store,
                    manifest,
                    sst_path_gen,
                    config.memory_limit,
                )
                .await;
            })
        };
        let picker_handle = {
            let task_tx = task_tx.clone();
            let interval = config.schedule_interval;
            runtime.spawn(async move {
                Self::generate_task_loop(manifest, task_tx, interval, segment_duration).await;
            })
        };

        Self {
            runtime,
            task_tx,
            task_handle,
            picker_handle,
            inused_memory: AtomicU64::new(0),
        }
    }

    pub fn try_send(&self, task: Task) -> Result<()> {
        self.task_tx
            .try_send(task)
            .context("failed to send task to scheduler")?;

        Ok(())
    }

    async fn recv_task_loop(
        rt: RuntimeRef,
        mut task_rx: Receiver<Task>,
        store: ObjectStoreRef,
        manifest: ManifestRef,
        _sst_path_gen: Arc<SstPathGenerator>,
        _mem_limit: u64,
    ) {
        while let Some(task) = task_rx.recv().await {
            let store = store.clone();
            let manifest = manifest.clone();
            rt.spawn(async move {
                let runner = Runner { store, manifest };
                if let Err(e) = runner.do_compaction(task).await {
                    warn!("Do compaction failed, err:{e}");
                }
            });
        }
    }

    async fn generate_task_loop(
        manifest: ManifestRef,
        task_tx: Sender<Task>,
        schedule_interval: Duration,
        segment_duration: Duration,
    ) {
        let compactor = TimeWindowCompactionStrategy::new(segment_duration);
        loop {
            let ssts = manifest.all_ssts().await;
            if let Some(task) = compactor.pick_candidate(ssts) {
                if let Err(e) = task_tx.try_send(task) {
                    warn!("Send task failed, err:{e}");
                }
            }

            sleep(schedule_interval).await;
        }
    }
}

pub struct SchedulerConfig {
    pub schedule_interval: Duration,
    pub memory_limit: u64,
    pub max_pending_compaction_tasks: usize,
}

impl Default for SchedulerConfig {
    fn default() -> Self {
        Self {
            schedule_interval: Duration::from_secs(30),
            memory_limit: bytesize::gb(2_u64),
            max_pending_compaction_tasks: 10,
        }
    }
}

pub struct Runner {
    store: ObjectStoreRef,
    manifest: ManifestRef,
}

impl Runner {
    // TODO: Merge input sst files into one new sst file
    // and delete the expired sst files
    async fn do_compaction(&self, _task: Task) -> Result<()> {
        todo!()
    }
}
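
The scheduler itself is just two loops joined by a bounded mpsc channel: a picker loop that periodically asks the strategy for a candidate and `try_send`s it, and a receive loop that spawns one runner per task and only logs failures. A minimal, runnable sketch of that wiring with placeholder types (no manifest, object store, or memory accounting, so this illustrates the pattern rather than the engine's code):

```rust
use std::time::Duration;

use tokio::{sync::mpsc, time::sleep};

#[derive(Debug)]
struct Task(u64); // placeholder for the engine's compaction Task

#[tokio::main]
async fn main() {
    // Bounded channel mirrors max_pending_compaction_tasks: if the receiver
    // falls behind, try_send fails and the task is simply skipped this round.
    let (task_tx, mut task_rx) = mpsc::channel::<Task>(10);

    // Picker loop: periodically produce a candidate task.
    let producer = tokio::spawn(async move {
        for i in 0..3 {
            if let Err(e) = task_tx.try_send(Task(i)) {
                eprintln!("send task failed, err:{e}");
            }
            sleep(Duration::from_millis(100)).await;
        }
        // Dropping task_tx closes the channel and ends the receive loop.
    });

    // Receive loop: spawn one runner per task, log errors instead of bailing.
    while let Some(task) = task_rx.recv().await {
        tokio::spawn(async move {
            // A real runner would merge input SSTs and delete expired ones.
            println!("compacting {task:?}");
        });
    }

    producer.await.unwrap();
    // Give the spawned "runners" a moment to print before exiting.
    sleep(Duration::from_millis(50)).await;
}
```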
1 change: 1 addition & 0 deletions horaedb/metric_engine/src/lib.rs
@@ -18,6 +18,7 @@
//! Storage Engine for metrics.
#![feature(duration_constructors)]
mod compaction;
pub mod error;
mod macros;
mod manifest;
36 changes: 22 additions & 14 deletions horaedb/metric_engine/src/manifest.rs
@@ -41,14 +41,16 @@ use tracing::error;

use crate::{
sst::{FileId, FileMeta, SstFile},
types::{ManifestMergeOptions, ObjectStoreRef, TimeRange},
types::{ManifestMergeOptions, ObjectStoreRef, RuntimeRef, TimeRange},
AnyhowError, Error, Result,
};

pub const PREFIX_PATH: &str = "manifest";
pub const SNAPSHOT_FILENAME: &str = "snapshot";
pub const DELTA_PREFIX: &str = "delta";

pub type ManifestRef = Arc<Manifest>;

pub struct Manifest {
delta_dir: Path,
store: ObjectStoreRef,
@@ -66,7 +68,7 @@ impl Payload {
// efficient
pub fn dedup_files(&mut self) {
let mut seen = HashSet::with_capacity(self.files.len());
self.files.retain(|file| seen.insert(file.id));
self.files.retain(|file| seen.insert(file.id()));
}
}

@@ -103,8 +105,8 @@ impl Manifest {
runtime: Arc<Runtime>,
merge_options: ManifestMergeOptions,
) -> Result<Self> {
let snapshot_path = Path::from(format!("{root_dir}/{SNAPSHOT_FILENAME}"));
let delta_dir = Path::from(format!("{root_dir}/{DELTA_PREFIX}"));
let snapshot_path = Path::from(format!("{root_dir}/{PREFIX_PATH}/{SNAPSHOT_FILENAME}"));
let delta_dir = Path::from(format!("{root_dir}/{PREFIX_PATH}/{DELTA_PREFIX}"));

let merger = ManifestMerger::try_new(
snapshot_path.clone(),
@@ -137,7 +139,7 @@ impl Manifest {
self.merger.maybe_schedule_merge().await?;

let new_sst_path = Path::from(format!("{}/{id}", self.delta_dir));
let new_sst = SstFile { id, meta };
let new_sst = SstFile::new(id, meta);

let new_sst_payload = pb_types::SstFile::from(new_sst.clone());
let mut buf: Vec<u8> = Vec::with_capacity(new_sst_payload.encoded_len());
@@ -164,13 +166,19 @@ impl Manifest {
Ok(())
}

// TODO: avoid clone
pub async fn all_ssts(&self) -> Vec<SstFile> {
let payload = self.payload.read().await;
payload.files.clone()
}

pub async fn find_ssts(&self, time_range: &TimeRange) -> Vec<SstFile> {
let payload = self.payload.read().await;

payload
.files
.iter()
.filter(move |f| f.meta.time_range.overlaps(time_range))
.filter(move |f| f.meta().time_range.overlaps(time_range))
.cloned()
.collect()
}
Expand All @@ -185,7 +193,7 @@ struct ManifestMerger {
snapshot_path: Path,
delta_dir: Path,
store: ObjectStoreRef,
runtime: Arc<Runtime>,
runtime: RuntimeRef,
sender: Sender<MergeType>,
receiver: RwLock<Receiver<MergeType>>,
deltas_num: AtomicUsize,
@@ -442,12 +450,12 @@ mod tests {
size: i as u32,
time_range,
};
SstFile { id, meta }
SstFile::new(id, meta)
})
.collect::<Vec<_>>();

expected_ssts.sort_by(|a, b| a.id.cmp(&b.id));
ssts.sort_by(|a, b| a.id.cmp(&b.id));
expected_ssts.sort_by_key(|a| a.id());
ssts.sort_by_key(|a| a.id());
assert_eq!(expected_ssts, ssts);
}

@@ -458,8 +466,8 @@ mod tests {
.path()
.to_string_lossy()
.to_string();
let snapshot_path = Path::from(format!("{root_dir}/{SNAPSHOT_FILENAME}"));
let delta_dir = Path::from(format!("{root_dir}/{DELTA_PREFIX}"));
let snapshot_path = Path::from(format!("{root_dir}/{PREFIX_PATH}/{SNAPSHOT_FILENAME}"));
let delta_dir = Path::from(format!("{root_dir}/{PREFIX_PATH}/{DELTA_PREFIX}"));
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.enable_all()
@@ -497,8 +505,8 @@ mod tests {
let mut mem_ssts = manifest.payload.read().await.files.clone();
let mut ssts = read_snapshot(&store, &snapshot_path).await.unwrap().files;

mem_ssts.sort_by(|a, b| a.id.cmp(&b.id));
ssts.sort_by(|a, b| a.id.cmp(&b.id));
mem_ssts.sort_by_key(|a| a.id());
ssts.sort_by_key(|a| a.id());
assert_eq!(mem_ssts, ssts);

let delta_paths = list_delta_paths(&store, &delta_dir).await.unwrap();
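
Side note on the manifest change above: snapshot and delta files now live under the `manifest` prefix instead of directly under the table root. A tiny standalone illustration of the resulting layout (plain strings instead of `object_store` paths, and a made-up root directory):

```rust
fn main() {
    // Constants mirror the diff above; the root dir is a hypothetical example.
    const PREFIX_PATH: &str = "manifest";
    const SNAPSHOT_FILENAME: &str = "snapshot";
    const DELTA_PREFIX: &str = "delta";
    let root_dir = "data/my_table";

    let snapshot_path = format!("{root_dir}/{PREFIX_PATH}/{SNAPSHOT_FILENAME}");
    let delta_dir = format!("{root_dir}/{PREFIX_PATH}/{DELTA_PREFIX}");

    // Prints:
    // data/my_table/manifest/snapshot
    // data/my_table/manifest/delta
    println!("{snapshot_path}\n{delta_dir}");
}
```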