Skip to content

Commit

Permalink
Limit log cleanup queue (#204)
Browse files Browse the repository at this point in the history
* Limit log cleanup queue

* Tweaked error handling

* Wait for total cleanup

Co-authored-by: cheme <[email protected]>

* fmt

* Tweaks

---------

Co-authored-by: cheme <[email protected]>
  • Loading branch information
arkpar and cheme authored Apr 24, 2023
1 parent 6867b6e commit 7bd6afc
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 15 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ jobs:
args: --test loom --features=loom,instrumentation --release --verbose
env:
LOOM_MAX_PREEMPTIONS: 2
LOOM_MAX_BRANCHES: 2000
LOOM_MAX_BRANCHES: 3000

test_aarch64:
runs-on: ubuntu-latest
Expand Down
45 changes: 31 additions & 14 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,13 @@ const MAX_COMMIT_QUEUE_BYTES: usize = 16 * 1024 * 1024;
const MAX_LOG_QUEUE_BYTES: i64 = 128 * 1024 * 1024;
// Minimum size of log file before it is considered full.
const MIN_LOG_SIZE_BYTES: u64 = 64 * 1024 * 1024;
// Number of log files to keep after flush.
// Number of log files to keep after flush when sync mode is disabled. Give the database some chance
// to recover in case of crash.
const KEEP_LOGS: usize = 16;
// Hard limit on the number of log files in sync mode. The number of logs may grow while existing
// logs are waiting on fsync. Commits will be throttled if the total number of log files exceeds
// this number.
const MAX_LOG_FILES: usize = 4;

/// Value is just a vector of bytes. Value sizes up to 4 GB are allowed.
pub type Value = Vec<u8>;
Expand Down Expand Up @@ -141,6 +146,7 @@ struct DbInner {
log_queue_wait: WaitCondvar<i64>,
flush_worker_wait: Arc<WaitCondvar<bool>>,
cleanup_worker_wait: WaitCondvar<bool>,
cleanup_queue_wait: WaitCondvar<bool>,
last_enacted: AtomicU64,
next_reindex: AtomicU64,
bg_err: Mutex<Option<Arc<Error>>>,
Expand Down Expand Up @@ -222,6 +228,7 @@ impl DbInner {
log_queue_wait: WaitCondvar::new(),
flush_worker_wait: Arc::new(WaitCondvar::new()),
cleanup_worker_wait: WaitCondvar::new(),
cleanup_queue_wait: WaitCondvar::new(),
next_reindex: AtomicU64::new(1),
last_enacted: AtomicU64::new(last_enacted),
bg_err: Mutex::new(None),
Expand Down Expand Up @@ -595,8 +602,6 @@ impl DbInner {
Ok(next) => next,
Err(e) => {
log::debug!(target: "parity-db", "Error reading log: {:?}", e);
drop(reader);
self.log.clear_replay_logs();
return Ok(false)
},
};
Expand All @@ -619,7 +624,7 @@ impl DbInner {
)
},
) {
log::warn!(target: "parity-db", "Error replaying log: {:?}. Reverting", e);
log::warn!(target: "parity-db", "Error validating log: {:?}.", e);
drop(reader);
self.log.clear_replay_logs();
return Ok(false)
Expand All @@ -636,7 +641,7 @@ impl DbInner {
)
},
) {
log::warn!(target: "parity-db", "Error replaying log: {:?}. Reverting", e);
log::warn!(target: "parity-db", "Error validating log: {:?}.", e);
drop(reader);
self.log.clear_replay_logs();
return Ok(false)
Expand Down Expand Up @@ -718,6 +723,15 @@ impl DbInner {
}
log::debug!(target: "parity-db", "Log queue size: {} bytes", *queue);
}

let max_logs = if self.options.sync_data { MAX_LOG_FILES } else { KEEP_LOGS };
let dirty_logs = self.log.num_dirty_logs();
if !validation_mode {
while self.log.num_dirty_logs() > max_logs {
log::debug!(target: "parity-db", "Waiting for log cleanup. Queued: {}", dirty_logs);
self.cleanup_queue_wait.wait();
}
}
}
Ok(true)
} else {
Expand All @@ -736,16 +750,18 @@ impl DbInner {
fn clean_logs(&self) -> Result<bool> {
let keep_logs = if self.options.sync_data { 0 } else { KEEP_LOGS };
let num_cleanup = self.log.num_dirty_logs();
if num_cleanup > keep_logs {
let result = if num_cleanup > keep_logs {
if self.options.sync_data {
for c in self.columns.iter() {
c.flush()?;
}
}
self.log.clean_logs(num_cleanup - keep_logs)
self.log.clean_logs(num_cleanup - keep_logs)?
} else {
Ok(false)
}
false
};
self.cleanup_queue_wait.signal();
Ok(result)
}

fn clean_all_logs(&self) -> Result<()> {
Expand All @@ -762,6 +778,7 @@ impl DbInner {
log::debug!(target: "parity-db", "Replaying database log {}", id);
while self.enact_logs(true)? {}
}

// Re-read any cached metadata
for c in self.columns.iter() {
c.refresh_metadata()?;
Expand Down Expand Up @@ -907,10 +924,12 @@ impl Db {
// This needs to be called before the log thread, so that the first reindexing
// runs in the correct state.
if let Err(e) = db.replay_all_logs() {
log::debug!(target: "parity-db", "Error during log replay, doing log cleanup");
db.log.clean_logs(db.log.num_dirty_logs())?;
db.log.kill_logs()?;
log::debug!(target: "parity-db", "Error during log replay.");
return Err(e)
} else {
db.log.clear_replay_logs();
db.clean_all_logs()?;
db.log.kill_logs()?;
}
let db = Arc::new(db);
#[cfg(any(test, feature = "instrumentation"))]
Expand Down Expand Up @@ -2150,11 +2169,9 @@ mod tests {
fdlimit::raise_fd_limit();
for i in 0..100 {
test_random_inner(60, 60, i);
std::thread::sleep(std::time::Duration::from_millis(30));
}
for i in 0..50 {
test_random_inner(20, 60, i);
std::thread::sleep(std::time::Duration::from_millis(30));
}
}
fn test_random_inner(size: usize, key_size: usize, seed: u64) {
Expand Down

0 comments on commit 7bd6afc

Please sign in to comment.