Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleanWal by commitLogId #4015

Merged
merged 3 commits into from
Mar 18, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/kvstore/Listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ bool Listener::preProcessLog(LogID logId,
return true;
}

void Listener::cleanWal() {
std::lock_guard<std::mutex> g(raftLock_);
wal()->cleanWAL(lastApplyLogId_);
}

std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> Listener::commitLogs(
std::unique_ptr<LogIterator> iter, bool) {
LogID lastId = kNoCommitLogId;
Expand Down
5 changes: 5 additions & 0 deletions src/kvstore/Listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ class Listener : public raftex::RaftPart {
return lastApplyLogId_;
}

/**
* @brief clean wal that before lastApplyLogId_
*/
void cleanWal() override;

/**
* @brief clean up data in listener, called in RaftPart::reset
*
Expand Down
6 changes: 3 additions & 3 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1151,16 +1151,16 @@ void NebulaStore::cleanWAL() {
auto& part = partEntry.second;
if (part->needToCleanWal()) {
// clean wal by expired time
part->wal()->cleanWAL();
part->cleanWal();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe unify all usage, either implement cleanWAL() in Part and Listener, or just call part->wal()->cleanWAL(id).

}
}
}
for (const auto& spaceEntry : spaceListeners_) {
for (const auto& partEntry : spaceEntry.second->listeners_) {
for (const auto& typeEntry : partEntry.second) {
const auto& listener = typeEntry.second;
// clean wal by log id
listener->wal()->cleanWAL(listener->getApplyId());
// clean wal by commit log id
listener->cleanWal();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Listener need to override it, because it use apply id, not commitId

}
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,11 @@ void RaftPart::stop() {
VLOG(1) << idStr_ << "Partition has been stopped";
}

void RaftPart::cleanWal() {
std::lock_guard<std::mutex> g(raftLock_);
wal()->cleanWAL(committedLogId_);
}

nebula::cpp2::ErrorCode RaftPart::canAppendLogs() {
DCHECK(!raftLock_.try_lock());
if (UNLIKELY(status_ != Status::RUNNING)) {
Expand Down
5 changes: 5 additions & 0 deletions src/kvstore/raftex/RaftPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
return term_;
}

/**
* @brief clean wal that before commitLogId
*/
virtual void cleanWal();

/**
* @brief Return the wal
*/
Expand Down
11 changes: 9 additions & 2 deletions src/kvstore/wal/FileBasedWal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -672,23 +672,30 @@ void FileBasedWal::cleanWAL() {

void FileBasedWal::cleanWAL(LogID id) {
std::lock_guard<std::mutex> g(walFilesMutex_);
auto now = time::WallClock::fastNowInSec();
size_t index = 0;
if (walFiles_.empty()) {
return;
}
auto size = walFiles_.size();
if (size < 2) {
return;
}

if (walFiles_.rbegin()->second->lastId() < id) {
VLOG(3) << "Try to clean wal not existed " << id << ", lastWal is "
<< walFiles_.rbegin()->second->lastId();
return;
}

int walTTL = FLAGS_wal_ttl;
// remove wal file that lastId is less than target
auto iter = walFiles_.begin();
while (iter != walFiles_.end()) {
if (iter->second->lastId() < id) {
if (iter->second->lastId() < id && index < size - 2 && (now - iter->second->mtime() > walTTL)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do not use walTTL to limit the cleanWAL, what will happen?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then no wal would be deleted

VLOG(3) << "Clean wals, Remove " << iter->second->path();
unlink(iter->second->path());
iter = walFiles_.erase(iter);
index++;
} else {
break;
}
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/wal/test/FileBasedWalTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ TEST(FileBasedWal, CleanWalBeforeIdTest) {
CHECK_LE(wal->firstLogId(), logToKeep);
CHECK(!wal->walFiles_.empty());
CHECK_LE(wal->walFiles_.begin()->second->firstId(), logToKeep);
CHECK_GE(wal->walFiles_.begin()->second->lastId(), logToKeep);
CHECK_GE(wal->walFiles_.rbegin()->second->lastId(), logToKeep);
logToKeep += folly::Random::rand64(10);
}
CHECK_EQ(1000, wal->lastLogId());
Expand Down