Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add preProcessor for each log before it being writen into WAL #619

Merged
merged 3 commits into from
Jul 16, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,17 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
return engine_->commitBatchWrite(std::move(batch)) == ResultCode::SUCCEEDED;
}

bool Part::preProcessLog(LogID logId,
TermID termId,
ClusterID clusterId,
const std::string& log) {
VLOG(3) << idStr_ << "logId " << logId
<< ", termId " << termId
<< ", clusterId " << clusterId
<< ", log " << log;
return true;
}

} // namespace kvstore
} // namespace nebula

5 changes: 5 additions & 0 deletions src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ class Part : public raftex::RaftPart {

bool commitLogs(std::unique_ptr<LogIterator> iter) override;

bool preProcessLog(LogID logId,
TermID termId,
ClusterID clusterId,
const std::string& log) override;

protected:
GraphSpaceID spaceId_;
PartitionID partId_;
Expand Down
13 changes: 12 additions & 1 deletion src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,18 @@ RaftPart::RaftPart(ClusterID clusterId,
, ioThreadPool_{pool}
, workers_{workers} {
// TODO Configure the wal policy
wal_ = FileBasedWal::getWal(walRoot, FileBasedWalPolicy(), flusher);
wal_ = FileBasedWal::getWal(walRoot,
FileBasedWalPolicy(),
flusher,
[this] (LogID logId,
TermID logTermId,
ClusterID logClusterId,
const std::string& log) {
return this->preProcessLog(logId,
logTermId,
logClusterId,
log);
});
lastLogId_ = wal_->lastLogId();
term_ = proposedTerm_ = lastLogTerm_ = wal_->lastLogTerm();
}
Expand Down
4 changes: 4 additions & 0 deletions src/kvstore/raftex/RaftPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
// a batch of log messages
virtual bool commitLogs(std::unique_ptr<LogIterator> iter) = 0;

virtual bool preProcessLog(LogID logId,
TermID termId,
ClusterID clusterId,
const std::string& log) = 0;

private:
enum class Status {
Expand Down
7 changes: 7 additions & 0 deletions src/kvstore/raftex/test/TestShard.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ class TestShard : public RaftPart {
std::string compareAndSet(const std::string& log) override;
bool commitLogs(std::unique_ptr<LogIterator> iter) override;

bool preProcessLog(LogID,
TermID,
ClusterID,
const std::string&) override {
return true;
}

size_t getNumLogs() const;
bool getLogMsg(LogID id, folly::StringPiece& msg) const;

Expand Down
15 changes: 11 additions & 4 deletions src/kvstore/wal/FileBasedWal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,23 @@ using nebula::fs::FileUtils;
std::shared_ptr<FileBasedWal> FileBasedWal::getWal(
const folly::StringPiece dir,
FileBasedWalPolicy policy,
BufferFlusher* flusher) {
BufferFlusher* flusher,
PreProcessor preProcessor) {
return std::shared_ptr<FileBasedWal>(
new FileBasedWal(dir, std::move(policy), flusher));
new FileBasedWal(dir, std::move(policy), flusher, std::move(preProcessor)));
}


FileBasedWal::FileBasedWal(const folly::StringPiece dir,
FileBasedWalPolicy policy,
BufferFlusher* flusher)
BufferFlusher* flusher,
PreProcessor preProcessor)
: flusher_(flusher)
, dir_(dir.toString())
, policy_(std::move(policy))
, maxFileSize_(policy_.fileSize * 1024L * 1024L)
, maxBufferSize_(policy_.bufferSize * 1024L * 1024L) {
, maxBufferSize_(policy_.bufferSize * 1024L * 1024L)
, preProcessor_(std::move(preProcessor)) {
// Make sure WAL directory exist
if (FileUtils::fileType(dir_.c_str()) == fs::FileType::NOTEXIST) {
FileUtils::makeDir(dir_);
Expand Down Expand Up @@ -441,6 +444,10 @@ bool FileBasedWal::appendLogInternal(BufferPtr& buffer,
<< ", and the id being appended is " << id;
return false;
}
if (!preProcessor_(id, term, cluster, msg)) {
LOG(ERROR) << "Pre process failed for log " << id;
return false;
}

if (buffer &&
(buffer->size() +
Expand Down
10 changes: 7 additions & 3 deletions src/kvstore/wal/FileBasedWal.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#define WAL_FILEBASEDWAL_H_

#include "base/Base.h"
#include <folly/Function.h>
#include "base/Cord.h"
#include "kvstore/wal/Wal.h"
#include "kvstore/wal/InMemoryLogBuffer.h"
Expand Down Expand Up @@ -37,7 +38,7 @@ struct FileBasedWalPolicy {

class BufferFlusher;


using PreProcessor = folly::Function<bool(LogID, TermID, ClusterID, const std::string& log)>;
class FileBasedWal final
: public Wal
, public std::enable_shared_from_this<FileBasedWal> {
Expand All @@ -46,7 +47,8 @@ class FileBasedWal final
static std::shared_ptr<FileBasedWal> getWal(
const folly::StringPiece dir,
FileBasedWalPolicy policy,
BufferFlusher* flusher);
BufferFlusher* flusher,
PreProcessor preProcessor);

virtual ~FileBasedWal();

Expand Down Expand Up @@ -126,7 +128,8 @@ class FileBasedWal final
// Callers should use static method getWal() instead
FileBasedWal(const folly::StringPiece dir,
FileBasedWalPolicy policy,
BufferFlusher* flusher);
BufferFlusher* flusher,
PreProcessor preProcessor);

// Scan all WAL files
void scanAllWalFiles();
Expand Down Expand Up @@ -195,6 +198,7 @@ class FileBasedWal final
std::condition_variable slotReadyCV_;

mutable std::mutex flushMutex_;
PreProcessor preProcessor_;
};

} // namespace wal
Expand Down
69 changes: 55 additions & 14 deletions src/kvstore/wal/test/FileBasedWalTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ TEST(FileBasedWal, AppendLogs) {
TempDir walDir("/tmp/testWal.XXXXXX");

// Create a new WAL, add one log and close it
auto wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get());
auto wal = FileBasedWal::getWal(walDir.path(),
policy,
flusher.get(),
[](LogID, TermID, ClusterID, const std::string&) {
return true;
});
EXPECT_EQ(0, wal->lastLogId());

for (int i = 1; i <= 10; i++) {
Expand All @@ -55,7 +60,12 @@ TEST(FileBasedWal, AppendLogs) {
wal.reset();

// Now let's open it to read
wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get());
wal = FileBasedWal::getWal(walDir.path(),
policy,
flusher.get(),
[](LogID, TermID, ClusterID, const std::string&) {
return true;
});
EXPECT_EQ(10, wal->lastLogId());

auto it = wal->iterator(1, 10);
Expand All @@ -80,9 +90,13 @@ TEST(FileBasedWal, CacheOverflow) {
policy.numBuffers = 2;

TempDir walDir("/tmp/testWal.XXXXXX");

auto wal = FileBasedWal::getWal(walDir.path(),
policy,
flusher.get(),
[](LogID, TermID, ClusterID, const std::string&) {
return true;
});
// Create a new WAL, add one log and close it
auto wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get());
EXPECT_EQ(0, wal->lastLogId());

// Append > 10MB logs in total
Expand All @@ -105,7 +119,13 @@ TEST(FileBasedWal, CacheOverflow) {
ASSERT_EQ(11, files.size());

// Now let's open it to read
wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get());
wal = FileBasedWal::getWal(walDir.path(),
policy,
flusher.get(),
[](LogID, TermID, ClusterID, const std::string&) {
return true;
});

EXPECT_EQ(10000, wal->lastLogId());

auto it = wal->iterator(1, 10000);
Expand All @@ -129,9 +149,13 @@ TEST(FileBasedWal, Rollback) {
policy.numBuffers = 2;

TempDir walDir("/tmp/testWal.XXXXXX");

// Create a new WAL, add one log and close it
auto wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get());
auto wal = FileBasedWal::getWal(walDir.path(),
policy,
flusher.get(),
[](LogID, TermID, ClusterID, const std::string&) {
return true;
});
EXPECT_EQ(0, wal->lastLogId());

// Append > 10MB logs in total
Expand Down Expand Up @@ -203,9 +227,13 @@ TEST(FileBasedWal, RollbackToFile) {
policy.numBuffers = 2;

TempDir walDir("/tmp/testWal.XXXXXX");

// Create a new WAL, add one log and close it
auto wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get());
auto wal = FileBasedWal::getWal(walDir.path(),
policy,
flusher.get(),
[](LogID, TermID, ClusterID, const std::string&) {
return true;
});
EXPECT_EQ(0, wal->lastLogId());
// Append > 1MB logs in total
for (int i = 1; i <= 1000; i++) {
Expand All @@ -225,7 +253,12 @@ TEST(FileBasedWal, RollbackToFile) {
ASSERT_EQ(2, files.size());

// Now let's open it to read
wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get());
wal = FileBasedWal::getWal(walDir.path(),
policy,
flusher.get(),
[](LogID, TermID, ClusterID, const std::string&) {
return true;
});
EXPECT_EQ(1000, wal->lastLogId());

// Let's verify the logs
Expand Down Expand Up @@ -265,9 +298,13 @@ TEST(FileBasedWal, RollbackToMemory) {
policy.numBuffers = 2;

TempDir walDir("/tmp/testWal.XXXXXX");

// Create a new WAL, add one log and close it
auto wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get());
auto wal = FileBasedWal::getWal(walDir.path(),
policy,
flusher.get(),
[](LogID, TermID, ClusterID, const std::string&) {
return true;
});
EXPECT_EQ(0, wal->lastLogId());

// Append < 1MB logs in total
Expand Down Expand Up @@ -317,9 +354,13 @@ TEST(FileBasedWal, RollbackToZero) {
policy.numBuffers = 2;

TempDir walDir("/tmp/testWal.XXXXXX");

// Create a new WAL, add one log and close it
auto wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get());
auto wal = FileBasedWal::getWal(walDir.path(),
policy,
flusher.get(),
[](LogID, TermID, ClusterID, const std::string&) {
return true;
});
ASSERT_EQ(0, wal->lastLogId());
// Rollback
ASSERT_TRUE(wal->rollbackToLog(0));
Expand Down