Skip to content

Commit

Permalink
fix: avoid file purge when they are used in queries (apache#699)
Browse files Browse the repository at this point in the history
* fix file purge when query same file

* fix CR
  • Loading branch information
jiacai2050 authored Mar 8, 2023
1 parent 1551007 commit 895516d
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 4 deletions.
5 changes: 5 additions & 0 deletions analytic_engine/src/row_iter/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ impl<'a> Builder<'a> {
request_id: self.config.request_id,
schema: self.config.projected_schema.to_record_schema_with_key(),
streams,
ssts: self.ssts,
next_stream_idx: 0,
inited: false,
metrics: Metrics::new(self.memtables.len(), total_sst_streams),
Expand Down Expand Up @@ -224,6 +225,9 @@ pub struct ChainIterator {
request_id: RequestId,
schema: RecordSchemaWithKey,
streams: Vec<SequencedRecordBatchStream>,
/// ssts are kept here to avoid them from being purged.
#[allow(dead_code)]
ssts: Vec<Vec<FileHandle>>,
/// The range of the index is [0, streams.len()] and the iterator is
/// exhausted if it reaches `streams.len()`.
next_stream_idx: usize,
Expand Down Expand Up @@ -322,6 +326,7 @@ mod tests {
request_id: RequestId::next_id(),
schema: schema.to_record_schema_with_key(),
streams,
ssts: Vec::new(),
next_stream_idx: 0,
inited: false,
metrics: Metrics::new(0, 0),
Expand Down
9 changes: 9 additions & 0 deletions analytic_engine/src/row_iter/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ impl<'a> MergeBuilder<'a> {
// Use the schema after projection as the schema of the merge iterator.
self.config.projected_schema.to_record_schema_with_key(),
streams,
self.ssts,
self.config.merge_iter_options,
self.config.reverse,
Metrics::new(self.memtables.len(), sst_streams_num, sst_ids),
Expand Down Expand Up @@ -619,6 +620,9 @@ pub struct MergeIterator {
schema: RecordSchemaWithKey,
record_batch_builder: RecordBatchWithKeyBuilder,
origin_streams: Vec<SequencedRecordBatchStream>,
/// ssts are kept here to avoid them from being purged.
#[allow(dead_code)]
ssts: Vec<Vec<FileHandle>>,
/// Any [BufferedStream] in the hot heap is not empty.
hot: BinaryHeap<HeapBufferedStream>,
/// Any [BufferedStream] in the cold heap is not empty.
Expand All @@ -629,11 +633,13 @@ pub struct MergeIterator {
}

impl MergeIterator {
#[allow(clippy::too_many_arguments)]
pub fn new(
table_id: TableId,
request_id: RequestId,
schema: RecordSchemaWithKey,
streams: Vec<SequencedRecordBatchStream>,
ssts: Vec<Vec<FileHandle>>,
iter_options: IterOptions,
reverse: bool,
metrics: Metrics,
Expand All @@ -646,6 +652,7 @@ impl MergeIterator {
request_id,
inited: false,
schema,
ssts,
record_batch_builder,
origin_streams: streams,
hot: BinaryHeap::with_capacity(heap_cap),
Expand Down Expand Up @@ -914,6 +921,7 @@ mod tests {
RequestId::next_id(),
schema.to_record_schema_with_key(),
streams,
Vec::new(),
IterOptions::default(),
false,
Metrics::new(1, 1, vec![]),
Expand Down Expand Up @@ -966,6 +974,7 @@ mod tests {
RequestId::next_id(),
schema.to_record_schema_with_key(),
streams,
Vec::new(),
IterOptions::default(),
true,
Metrics::new(1, 1, vec![]),
Expand Down
5 changes: 3 additions & 2 deletions analytic_engine/src/sst/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use common_util::{
metric::Meter,
runtime::{JoinHandle, Runtime},
};
use log::{debug, error, info};
use log::{error, info, warn};
use object_store::ObjectStoreRef;
use snafu::{ResultExt, Snafu};
use table_engine::table::TableId;
Expand Down Expand Up @@ -249,7 +249,7 @@ struct FileHandleInner {

impl Drop for FileHandleInner {
fn drop(&mut self) {
debug!("FileHandle is dropped, meta:{:?}", self.meta);
info!("FileHandle is dropped, meta:{:?}", self.meta);

// Push file cannot block or be async because we are in drop().
self.purge_queue.push_file(self.meta.id);
Expand Down Expand Up @@ -426,6 +426,7 @@ impl FilePurgeQueue {

fn push_file(&self, file_id: FileId) {
if self.inner.closed.load(Ordering::SeqCst) {
warn!("Purger closed, ignore file_id:{file_id}");
return;
}

Expand Down
2 changes: 0 additions & 2 deletions analytic_engine/src/tests/compaction_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@ use crate::{
};

#[test]
#[ignore = "https://github.com/CeresDB/ceresdb/issues/427"]
fn test_table_compact_current_segment_rocks() {
let rocksdb_ctx = RocksDBEngineContext::default();
test_table_compact_current_segment(rocksdb_ctx);
}

#[test]
#[ignore = "https://github.com/CeresDB/ceresdb/issues/427"]
fn test_table_compact_current_segment_mem_wal() {
let memory_ctx = MemoryEngineContext::default();
test_table_compact_current_segment(memory_ctx);
Expand Down

0 comments on commit 895516d

Please sign in to comment.