feat: manifest support delete (apache#1610)
## Rationale
When a compaction finishes, we need to delete the old input SSTs and the expired SSTs.

## Detailed Changes
- The manifest delta file now uses the `ManifestUpdate` struct (see the sketch below).
- Refactor the compaction scheduler to make it more modular.
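
For reference, a minimal sketch of the shape `ManifestUpdate` could take, inferred from the call site `ManifestUpdate::new(to_adds, to_deletes)` in this diff; the `FileId` alias and the field names are assumptions, not necessarily the committed definition:

```rust
use crate::sst::{FileId, SstFile}; // assumed import paths

/// One atomic manifest delta: additions and deletions applied together.
pub struct ManifestUpdate {
    /// New SSTs produced by the compaction.
    to_adds: Vec<SstFile>,
    /// Ids of the input/expired SSTs to tombstone.
    to_deletes: Vec<FileId>,
}

impl ManifestUpdate {
    pub fn new(to_adds: Vec<SstFile>, to_deletes: Vec<FileId>) -> Self {
        Self {
            to_adds,
            to_deletes,
        }
    }
}
```
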
## Test Plan
CI
jiacai2050 authored Dec 18, 2024
1 parent 1331e0a commit 03b1df9
Showing 13 changed files with 582 additions and 530 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
@@ -67,6 +67,7 @@ jobs:
       - name: Install check binaries
         run: |
           cargo install --git https://github.com/DevinR528/cargo-sort --rev 55ec890 --locked
+      - uses: Swatinem/rust-cache@v2
       - name: Run Style Check
         run: |
           make fmt sort clippy
@@ -80,15 +81,14 @@
     timeout-minutes: 60
     steps:
       - uses: actions/checkout@v4
-        with:
-          submodules: true
       - name: Release Disk Quota
         run: |
           sudo make ensure-disk-quota
       - name: Setup Build Environment
         run: |
           sudo apt update
           sudo apt install --yes protobuf-compiler
+      - uses: Swatinem/rust-cache@v2
       - name: Run Unit Tests
         run: |
           make test
3 changes: 1 addition & 2 deletions Makefile
@@ -47,8 +47,7 @@ udeps:
 	cd $(DIR); cargo udeps --all-targets --all-features --workspace
 
 clippy:
-	cd $(DIR); cargo clippy --all-targets --all-features --workspace -- -D warnings -D clippy::dbg-macro \
-	-A dead_code -A unused_variables -A clippy::unreachable -A clippy::too_many_arguments # Remove these once we have a clean build
+	cd $(DIR); cargo clippy --all-targets --all-features --workspace -- -D warnings -D clippy::dbg-macro -A clippy::too-many-arguments
 
 ensure-disk-quota:
 	bash ./scripts/free-disk-space.sh
src/metric_engine/src/compaction/executor.rs
@@ -15,7 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::sync::Arc;
+use std::sync::{
+    atomic::{AtomicU64, Ordering},
+    Arc,
+};
 
 use anyhow::Context;
 use arrow::array::{RecordBatch, UInt64Array};
@@ -31,70 +34,137 @@ use tracing::error;
 
 use crate::{
     compaction::Task,
-    manifest::ManifestRef,
+    ensure,
+    manifest::{ManifestRef, ManifestUpdate},
     read::ParquetReader,
-    sst::{allocate_id, FileMeta, SstPathGenerator},
-    types::{ObjectStoreRef, StorageSchema},
+    sst::{allocate_id, FileMeta, SstFile, SstPathGenerator},
+    types::{ObjectStoreRef, RuntimeRef, StorageSchema},
     Result,
 };
 
 #[derive(Clone)]
-pub struct Runner {
+pub struct Executor {
+    inner: Arc<Inner>,
+}
+
+struct Inner {
+    runtime: RuntimeRef,
     store: ObjectStoreRef,
     schema: StorageSchema,
     manifest: ManifestRef,
     sst_path_gen: Arc<SstPathGenerator>,
     parquet_reader: Arc<ParquetReader>,
     write_props: WriterProperties,
+    inused_memory: AtomicU64,
+    mem_limit: u64,
 }
 
-impl Runner {
+impl Executor {
     pub fn new(
+        runtime: RuntimeRef,
         store: ObjectStoreRef,
         schema: StorageSchema,
        manifest: ManifestRef,
         sst_path_gen: Arc<SstPathGenerator>,
         parquet_reader: Arc<ParquetReader>,
         write_props: WriterProperties,
+        mem_limit: u64,
     ) -> Self {
-        Self {
+        let inner = Inner {
+            runtime,
             store,
             schema,
             manifest,
             sst_path_gen,
             parquet_reader,
             write_props,
+            mem_limit,
+            inused_memory: AtomicU64::new(0),
+        };
+        Self {
+            inner: Arc::new(inner),
         }
     }
 
-    // TODO: Merge input sst files into one new sst file
-    // and delete the expired sst files
-    pub async fn do_compaction(&self, task: Task) -> Result<()> {
+    fn pre_check(&self, task: &Task) -> Result<()> {
         assert!(!task.inputs.is_empty());
         for f in &task.inputs {
             assert!(f.is_compaction());
         }
+        for f in &task.expireds {
+            assert!(f.is_compaction());
+        }
 
+        let task_size = task.input_size();
+        let inused = self.inner.inused_memory.load(Ordering::Relaxed);
+        let mem_limit = self.inner.mem_limit;
+        ensure!(
+            inused + task_size <= mem_limit,
+            "Compaction memory usage too high, inused:{inused}, task_size:{task_size}, limit:{mem_limit}"
+        );
+
+        self.inner
+            .inused_memory
+            .fetch_add(task.input_size(), Ordering::Relaxed);
+        Ok(())
+    }
+
+    pub fn on_success(&self, task: &Task) {
+        let task_size = task.input_size();
+        self.inner
+            .inused_memory
+            .fetch_sub(task_size, Ordering::Relaxed);
+    }
+
+    pub fn on_failure(&self, task: &Task) {
+        let task_size = task.input_size();
+        self.inner
+            .inused_memory
+            .fetch_sub(task_size, Ordering::Relaxed);
+
+        // When task execution fails, unmark the ssts so they can be
+        // rescheduled.
+        for sst in &task.inputs {
+            sst.unmark_compaction();
+        }
+        for sst in &task.expireds {
+            sst.unmark_compaction();
+        }
+    }
+
+    pub fn submit(&self, task: Task) {
+        let runnable = Runnable {
+            executor: self.clone(),
+            task,
+        };
+        runnable.run()
+    }
+
+    // TODO: Merge input sst files into one new sst file
+    // and delete the expired sst files
+    pub async fn do_compaction(&self, task: &Task) -> Result<()> {
+        self.pre_check(task)?;
+
         let mut time_range = task.inputs[0].meta().time_range.clone();
         for f in &task.inputs[1..] {
             time_range.merge(&f.meta().time_range);
         }
-        let plan = self
-            .parquet_reader
-            .build_df_plan(task.inputs.clone(), None, Vec::new())?;
+        let plan =
+            self.inner
+                .parquet_reader
+                .build_df_plan(task.inputs.clone(), None, Vec::new())?;
         let mut stream = execute_stream(plan, Arc::new(TaskContext::default()))
             .context("execute datafusion plan")?;
 
         let file_id = allocate_id();
-        let file_path = self.sst_path_gen.generate(file_id);
+        let file_path = self.inner.sst_path_gen.generate(file_id);
         let file_path = Path::from(file_path);
-        let object_store_writer = ParquetObjectWriter::new(self.store.clone(), file_path.clone());
+        let object_store_writer =
+            ParquetObjectWriter::new(self.inner.store.clone(), file_path.clone());
         let mut writer = AsyncArrowWriter::try_new(
             object_store_writer,
-            self.schema.arrow_schema.clone(),
-            Some(self.write_props.clone()),
+            self.inner.schema.arrow_schema.clone(),
+            Some(self.inner.write_props.clone()),
         )
         .context("create arrow writer")?;
         let mut num_rows = 0;
@@ -107,14 +177,15 @@ impl Runner {
                 // Since file_id is in increasing order, we can use it as sequence.
                 let seq_column = Arc::new(UInt64Array::from(vec![file_id; batch.num_rows()]));
                 new_cols.push(seq_column);
-                RecordBatch::try_new(self.schema.arrow_schema.clone(), new_cols)
+                RecordBatch::try_new(self.inner.schema.arrow_schema.clone(), new_cols)
                     .context("construct record batch with seq column")?
             };
 
             writer.write(&batch_with_seq).await.context("write batch")?;
         }
         writer.close().await.context("close writer")?;
         let object_meta = self
+            .inner
             .store
             .head(&file_path)
             .await
@@ -126,28 +197,37 @@ impl Runner {
             time_range: time_range.clone(),
         };
         // First add new sst to manifest, then delete expired/old sst
-        self.manifest.add_file(file_id, file_meta).await?;
-        self.manifest
-            .add_tombstone_files(task.expireds.clone())
-            .await?;
-        self.manifest
-            .add_tombstone_files(task.inputs.clone())
+        let to_adds = vec![SstFile::new(file_id, file_meta)];
+        let to_deletes = task
+            .expireds
+            .iter()
+            .map(|f| f.id())
+            .chain(task.inputs.iter().map(|f| f.id()))
+            .collect();
+        self.inner
+            .manifest
+            .update(ManifestUpdate::new(to_adds, to_deletes))
             .await?;
 
+        // From now on, no error should be returned!
+        // Because we have already updated manifest.
+
         let (_, results) = TokioScope::scope_and_block(|scope| {
-            for file in task.expireds {
-                let path = Path::from(self.sst_path_gen.generate(file.id()));
+            for file in &task.expireds {
+                let path = Path::from(self.inner.sst_path_gen.generate(file.id()));
                 scope.spawn(async move {
-                    self.store
+                    self.inner
+                        .store
                         .delete(&path)
                         .await
                         .with_context(|| format!("failed to delete file, path:{path}"))
                 });
             }
-            for file in task.inputs {
-                let path = Path::from(self.sst_path_gen.generate(file.id()));
+            for file in &task.inputs {
+                let path = Path::from(self.inner.sst_path_gen.generate(file.id()));
                 scope.spawn(async move {
-                    self.store
+                    self.inner
+                        .store
                         .delete(&path)
                         .await
                         .with_context(|| format!("failed to delete file, path:{path}"))
@@ -170,3 +250,22 @@ impl Runner {
         Ok(())
     }
 }
+
+pub struct Runnable {
+    executor: Executor,
+    task: Task,
+}
+
+impl Runnable {
+    fn run(self) {
+        let rt = self.executor.inner.runtime.clone();
+        rt.spawn(async move {
+            if let Err(e) = self.executor.do_compaction(&self.task).await {
+                error!("Do compaction failed, err:{e}");
+                self.executor.on_failure(&self.task);
+            } else {
+                self.executor.on_success(&self.task);
+            }
+        });
+    }
+}
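
To illustrate how the pieces fit together: `submit` wraps the task in a `Runnable` and spawns it on the executor's runtime; `pre_check` reserves `task.input_size()` bytes against `mem_limit` before the merge runs, and `on_success`/`on_failure` release the reservation afterwards. A hypothetical caller might look like the following; the driver function and its channel are illustrative assumptions, not part of this commit:

```rust
// Hypothetical scheduler-side driver loop; `compact_rx` and the Task
// channel are assumptions for illustration only.
async fn run_compactions(
    executor: Executor,
    mut compact_rx: tokio::sync::mpsc::Receiver<Task>,
) {
    while let Some(task) = compact_rx.recv().await {
        // Fire-and-forget: do_compaction runs on the executor's runtime;
        // failed tasks get their ssts unmarked so they can be rescheduled.
        executor.submit(task);
    }
}
```
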
13 changes: 7 additions & 6 deletions src/metric_engine/src/compaction/mod.rs
@@ -15,21 +15,22 @@
 // specific language governing permissions and limitations
 // under the License.
 
+mod executor;
 mod picker;
-mod runner;
 mod scheduler;
 
 pub use scheduler::{Scheduler as CompactionScheduler, SchedulerConfig};
 
 use crate::sst::SstFile;
 
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct Input {
-    files: Vec<SstFile>,
-}
-
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct Task {
     pub inputs: Vec<SstFile>,
     pub expireds: Vec<SstFile>,
 }
+
+impl Task {
+    pub fn input_size(&self) -> u64 {
+        self.inputs.iter().map(|f| f.size() as u64).sum()
+    }
+}