Skip to content

Commit

Permalink
cleanWal by commitLogId
Browse files Browse the repository at this point in the history
  • Loading branch information
liwenhui-soul committed Mar 14, 2022
1 parent ca378d3 commit d165c59
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 6 deletions.
5 changes: 5 additions & 0 deletions src/kvstore/Listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ bool Listener::preProcessLog(LogID logId,
return true;
}

void Listener::cleanWal() {
std::lock_guard<std::mutex> g(raftLock_);
wal()->cleanWAL(lastApplyLogId_);
}

std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> Listener::commitLogs(
std::unique_ptr<LogIterator> iter, bool) {
LogID lastId = kNoCommitLogId;
Expand Down
2 changes: 2 additions & 0 deletions src/kvstore/Listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ class Listener : public raftex::RaftPart {
return lastApplyLogId_;
}

void cleanWal() override;

/**
* @brief clean up data in listener, called in RaftPart::reset
*
Expand Down
6 changes: 3 additions & 3 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1151,16 +1151,16 @@ void NebulaStore::cleanWAL() {
auto& part = partEntry.second;
if (part->needToCleanWal()) {
// clean wal by expired time
part->wal()->cleanWAL();
part->cleanWal();
}
}
}
for (const auto& spaceEntry : spaceListeners_) {
for (const auto& partEntry : spaceEntry.second->listeners_) {
for (const auto& typeEntry : partEntry.second) {
const auto& listener = typeEntry.second;
// clean wal by log id
listener->wal()->cleanWAL(listener->getApplyId());
// clean wal by commit log id
listener->cleanWal();
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,11 @@ void RaftPart::stop() {
VLOG(1) << idStr_ << "Partition has been stopped";
}

void RaftPart::cleanWal() {
std::lock_guard<std::mutex> g(raftLock_);
wal()->cleanWAL(committedLogId_);
}

nebula::cpp2::ErrorCode RaftPart::canAppendLogs() {
DCHECK(!raftLock_.try_lock());
if (UNLIKELY(status_ != Status::RUNNING)) {
Expand Down
5 changes: 5 additions & 0 deletions src/kvstore/raftex/RaftPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
return term_;
}

/**
* @brief clean wal that before commitLogId
*/
virtual void cleanWal();

/**
* @brief Return the wal
*/
Expand Down
11 changes: 9 additions & 2 deletions src/kvstore/wal/FileBasedWal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -672,23 +672,30 @@ void FileBasedWal::cleanWAL() {

void FileBasedWal::cleanWAL(LogID id) {
std::lock_guard<std::mutex> g(walFilesMutex_);
auto now = time::WallClock::fastNowInSec();
size_t index = 0;
if (walFiles_.empty()) {
return;
}
auto size = walFiles_.size();
if (size < 2) {
return;
}

if (walFiles_.rbegin()->second->lastId() < id) {
VLOG(3) << "Try to clean wal not existed " << id << ", lastWal is "
<< walFiles_.rbegin()->second->lastId();
return;
}

int walTTL = FLAGS_wal_ttl;
// remove wal file that lastId is less than target
auto iter = walFiles_.begin();
while (iter != walFiles_.end()) {
if (iter->second->lastId() < id) {
if (iter->second->lastId() < id && index < size - 2 && (now - iter->second->mtime() > walTTL)) {
VLOG(3) << "Clean wals, Remove " << iter->second->path();
unlink(iter->second->path());
iter = walFiles_.erase(iter);
index++;
} else {
break;
}
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/wal/test/FileBasedWalTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ TEST(FileBasedWal, CleanWalBeforeIdTest) {
CHECK_LE(wal->firstLogId(), logToKeep);
CHECK(!wal->walFiles_.empty());
CHECK_LE(wal->walFiles_.begin()->second->firstId(), logToKeep);
CHECK_GE(wal->walFiles_.begin()->second->lastId(), logToKeep);
CHECK_GE(wal->walFiles_.rbegin()->second->lastId(), logToKeep);
logToKeep += folly::Random::rand64(10);
}
CHECK_EQ(1000, wal->lastLogId());
Expand Down

0 comments on commit d165c59

Please sign in to comment.