From c395cdd698846597b0368b1d68ac1e683d577527 Mon Sep 17 00:00:00 2001 From: liwenhui-soul <38217397+liwenhui-soul@users.noreply.github.com> Date: Fri, 11 Mar 2022 18:57:15 +0800 Subject: [PATCH] cleanWal by commitLogId --- src/kvstore/Listener.cpp | 5 +++++ src/kvstore/Listener.h | 5 +++++ src/kvstore/NebulaStore.cpp | 6 +++--- src/kvstore/raftex/RaftPart.cpp | 5 +++++ src/kvstore/raftex/RaftPart.h | 5 +++++ src/kvstore/wal/FileBasedWal.cpp | 11 +++++++++-- src/kvstore/wal/test/FileBasedWalTest.cpp | 2 +- 7 files changed, 33 insertions(+), 6 deletions(-) diff --git a/src/kvstore/Listener.cpp b/src/kvstore/Listener.cpp index 17f5c5a7602..a73864bb8ca 100644 --- a/src/kvstore/Listener.cpp +++ b/src/kvstore/Listener.cpp @@ -123,6 +123,11 @@ bool Listener::preProcessLog(LogID logId, return true; } +void Listener::cleanWal() { + std::lock_guard g(raftLock_); + wal()->cleanWAL(lastApplyLogId_); +} + std::tuple Listener::commitLogs( std::unique_ptr iter, bool) { LogID lastId = kNoCommitLogId; diff --git a/src/kvstore/Listener.h b/src/kvstore/Listener.h index cd423bfac8b..ff2a376de0a 100644 --- a/src/kvstore/Listener.h +++ b/src/kvstore/Listener.h @@ -138,6 +138,11 @@ class Listener : public raftex::RaftPart { return lastApplyLogId_; } + /** + * @brief clean wal that before lastApplyLogId_ + */ + void cleanWal() override; + /** * @brief clean up data in listener, called in RaftPart::reset * diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index 957d462f8ad..278d9b9b6c6 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -1151,7 +1151,7 @@ void NebulaStore::cleanWAL() { auto& part = partEntry.second; if (part->needToCleanWal()) { // clean wal by expired time - part->wal()->cleanWAL(); + part->cleanWal(); } } } @@ -1159,8 +1159,8 @@ void NebulaStore::cleanWAL() { for (const auto& partEntry : spaceEntry.second->listeners_) { for (const auto& typeEntry : partEntry.second) { const auto& listener = typeEntry.second; - // clean wal by log id - listener->wal()->cleanWAL(listener->getApplyId()); + // clean wal by commit log id + listener->cleanWal(); } } } diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index d2eb5ac2e51..9c93bb96b09 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -350,6 +350,11 @@ void RaftPart::stop() { VLOG(1) << idStr_ << "Partition has been stopped"; } +void RaftPart::cleanWal() { + std::lock_guard g(raftLock_); + wal()->cleanWAL(committedLogId_); +} + nebula::cpp2::ErrorCode RaftPart::canAppendLogs() { DCHECK(!raftLock_.try_lock()); if (UNLIKELY(status_ != Status::RUNNING)) { diff --git a/src/kvstore/raftex/RaftPart.h b/src/kvstore/raftex/RaftPart.h index 156724ef49c..b3779ab8969 100644 --- a/src/kvstore/raftex/RaftPart.h +++ b/src/kvstore/raftex/RaftPart.h @@ -160,6 +160,11 @@ class RaftPart : public std::enable_shared_from_this { return term_; } + /** + * @brief clean wal that before commitLogId + */ + virtual void cleanWal(); + /** * @brief Return the wal */ diff --git a/src/kvstore/wal/FileBasedWal.cpp b/src/kvstore/wal/FileBasedWal.cpp index a10e5f4f7be..85f7f4b6111 100644 --- a/src/kvstore/wal/FileBasedWal.cpp +++ b/src/kvstore/wal/FileBasedWal.cpp @@ -672,23 +672,30 @@ void FileBasedWal::cleanWAL() { void FileBasedWal::cleanWAL(LogID id) { std::lock_guard g(walFilesMutex_); + auto now = time::WallClock::fastNowInSec(); + size_t index = 0; if (walFiles_.empty()) { return; } + auto size = walFiles_.size(); + if (size < 2) { + return; + } if (walFiles_.rbegin()->second->lastId() < id) { VLOG(3) << "Try to clean wal not existed " << id << ", lastWal is " << walFiles_.rbegin()->second->lastId(); return; } - + int walTTL = FLAGS_wal_ttl; // remove wal file that lastId is less than target auto iter = walFiles_.begin(); while (iter != walFiles_.end()) { - if (iter->second->lastId() < id) { + if (iter->second->lastId() < id && index < size - 2 && (now - iter->second->mtime() > walTTL)) { VLOG(3) << "Clean wals, Remove " << iter->second->path(); unlink(iter->second->path()); iter = walFiles_.erase(iter); + index++; } else { break; } diff --git a/src/kvstore/wal/test/FileBasedWalTest.cpp b/src/kvstore/wal/test/FileBasedWalTest.cpp index a2a6a8a45f5..a51589eecb5 100644 --- a/src/kvstore/wal/test/FileBasedWalTest.cpp +++ b/src/kvstore/wal/test/FileBasedWalTest.cpp @@ -558,7 +558,7 @@ TEST(FileBasedWal, CleanWalBeforeIdTest) { CHECK_LE(wal->firstLogId(), logToKeep); CHECK(!wal->walFiles_.empty()); CHECK_LE(wal->walFiles_.begin()->second->firstId(), logToKeep); - CHECK_GE(wal->walFiles_.begin()->second->lastId(), logToKeep); + CHECK_GE(wal->walFiles_.rbegin()->second->lastId(), logToKeep); logToKeep += folly::Random::rand64(10); } CHECK_EQ(1000, wal->lastLogId());