Skip to content

Commit

Permalink
Add preProcessor for each log before it is written into WAL (vesoft…
Browse files Browse the repository at this point in the history
  • Loading branch information
dangleptr authored Jul 16, 2019
1 parent fbdc228 commit 8841cda
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 22 deletions.
11 changes: 11 additions & 0 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,17 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
return engine_->commitBatchWrite(std::move(batch)) == ResultCode::SUCCEEDED;
}

/**
 * Pre-processing hook for a single log entry.
 *
 * The current implementation only traces the entry at verbose level 3
 * and accepts it unconditionally.
 *
 * @param logId     id of the log entry
 * @param termId    term the log entry belongs to
 * @param clusterId cluster the log entry originates from
 * @param log       raw log payload
 * @return always true
 */
bool Part::preProcessLog(LogID logId,
                         TermID termId,
                         ClusterID clusterId,
                         const std::string& log) {
    // Trace-only for now; the entry itself is left untouched.
    VLOG(3) << idStr_ << "logId " << logId << ", termId " << termId
            << ", clusterId " << clusterId << ", log " << log;

    const bool accepted = true;
    return accepted;
}

} // namespace kvstore
} // namespace nebula

5 changes: 5 additions & 0 deletions src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ class Part : public raftex::RaftPart {

bool commitLogs(std::unique_ptr<LogIterator> iter) override;

// Overrides the RaftPart hook that receives a log entry together with
// its id, term and cluster id. Returns a bool status for the entry.
bool preProcessLog(LogID logId,
TermID termId,
ClusterID clusterId,
const std::string& log) override;

protected:
GraphSpaceID spaceId_;
PartitionID partId_;
Expand Down
13 changes: 12 additions & 1 deletion src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,18 @@ RaftPart::RaftPart(ClusterID clusterId,
, ioThreadPool_{pool}
, workers_{workers} {
// TODO Configure the wal policy
wal_ = FileBasedWal::getWal(walRoot, FileBasedWalPolicy(), flusher);
wal_ = FileBasedWal::getWal(walRoot,
FileBasedWalPolicy(),
flusher,
[this] (LogID logId,
TermID logTermId,
ClusterID logClusterId,
const std::string& log) {
return this->preProcessLog(logId,
logTermId,
logClusterId,
log);
});
lastLogId_ = wal_->lastLogId();
term_ = proposedTerm_ = lastLogTerm_ = wal_->lastLogTerm();
}
Expand Down
4 changes: 4 additions & 0 deletions src/kvstore/raftex/RaftPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
// a batch of log messages
virtual bool commitLogs(std::unique_ptr<LogIterator> iter) = 0;

// Hook that every concrete RaftPart has to implement. It receives the
// id, term, cluster id and payload of a single log entry and reports
// a bool status for it.
virtual bool preProcessLog(LogID logId,
TermID termId,
ClusterID clusterId,
const std::string& log) = 0;

private:
enum class Status {
Expand Down
7 changes: 7 additions & 0 deletions src/kvstore/raftex/test/TestShard.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ class TestShard : public RaftPart {
std::string compareAndSet(const std::string& log) override;
bool commitLogs(std::unique_ptr<LogIterator> iter) override;

// No-op pre-processing hook for tests: ignores every argument and
// accepts the log entry unconditionally.
bool preProcessLog(LogID /* logId */,
                   TermID /* termId */,
                   ClusterID /* clusterId */,
                   const std::string& /* log */) override {
    const bool accepted = true;
    return accepted;
}

size_t getNumLogs() const;
bool getLogMsg(LogID id, folly::StringPiece& msg) const;

Expand Down
15 changes: 11 additions & 4 deletions src/kvstore/wal/FileBasedWal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,23 @@ using nebula::fs::FileUtils;
// Factory for FileBasedWal instances. Forwards the directory, the policy,
// the flusher and the pre-processor callback to the FileBasedWal
// constructor and returns the new object wrapped in a std::shared_ptr.
// The policy and the pre-processor are moved into the new instance.
std::shared_ptr<FileBasedWal> FileBasedWal::getWal(
const folly::StringPiece dir,
FileBasedWalPolicy policy,
BufferFlusher* flusher) {
BufferFlusher* flusher,
PreProcessor preProcessor) {
return std::shared_ptr<FileBasedWal>(
new FileBasedWal(dir, std::move(policy), flusher));
new FileBasedWal(dir, std::move(policy), flusher, std::move(preProcessor)));
}


FileBasedWal::FileBasedWal(const folly::StringPiece dir,
FileBasedWalPolicy policy,
BufferFlusher* flusher)
BufferFlusher* flusher,
PreProcessor preProcessor)
: flusher_(flusher)
, dir_(dir.toString())
, policy_(std::move(policy))
, maxFileSize_(policy_.fileSize * 1024L * 1024L)
, maxBufferSize_(policy_.bufferSize * 1024L * 1024L) {
, maxBufferSize_(policy_.bufferSize * 1024L * 1024L)
, preProcessor_(std::move(preProcessor)) {
// Make sure WAL directory exist
if (FileUtils::fileType(dir_.c_str()) == fs::FileType::NOTEXIST) {
FileUtils::makeDir(dir_);
Expand Down Expand Up @@ -441,6 +444,10 @@ bool FileBasedWal::appendLogInternal(BufferPtr& buffer,
<< ", and the id being appended is " << id;
return false;
}
if (!preProcessor_(id, term, cluster, msg)) {
LOG(ERROR) << "Pre process failed for log " << id;
return false;
}

if (buffer &&
(buffer->size() +
Expand Down
10 changes: 7 additions & 3 deletions src/kvstore/wal/FileBasedWal.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#define WAL_FILEBASEDWAL_H_

#include "base/Base.h"
#include <folly/Function.h>
#include "base/Cord.h"
#include "kvstore/wal/Wal.h"
#include "kvstore/wal/InMemoryLogBuffer.h"
Expand Down Expand Up @@ -37,7 +38,7 @@ struct FileBasedWalPolicy {

class BufferFlusher;


// Callback type taking a log's id, term, cluster id and payload and
// returning a bool. A FileBasedWal receives one through getWal().
using PreProcessor = folly::Function<bool(LogID, TermID, ClusterID, const std::string& log)>;
class FileBasedWal final
: public Wal
, public std::enable_shared_from_this<FileBasedWal> {
Expand All @@ -46,7 +47,8 @@ class FileBasedWal final
static std::shared_ptr<FileBasedWal> getWal(
const folly::StringPiece dir,
FileBasedWalPolicy policy,
BufferFlusher* flusher);
BufferFlusher* flusher,
PreProcessor preProcessor);

virtual ~FileBasedWal();

Expand Down Expand Up @@ -126,7 +128,8 @@ class FileBasedWal final
// Callers should use static method getWal() instead
FileBasedWal(const folly::StringPiece dir,
FileBasedWalPolicy policy,
BufferFlusher* flusher);
BufferFlusher* flusher,
PreProcessor preProcessor);

// Scan all WAL files
void scanAllWalFiles();
Expand Down Expand Up @@ -195,6 +198,7 @@ class FileBasedWal final
std::condition_variable slotReadyCV_;

mutable std::mutex flushMutex_;
// Pre-processing callback supplied through the constructor.
PreProcessor preProcessor_;
};

} // namespace wal
Expand Down
69 changes: 55 additions & 14 deletions src/kvstore/wal/test/FileBasedWalTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ TEST(FileBasedWal, AppendLogs) {
TempDir walDir("/tmp/testWal.XXXXXX");

// Create a new WAL, add one log and close it
auto wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get());
auto wal = FileBasedWal::getWal(walDir.path(),
policy,
flusher.get(),
[](LogID, TermID, ClusterID, const std::string&) {
return true;
});
EXPECT_EQ(0, wal->lastLogId());

for (int i = 1; i <= 10; i++) {
Expand All @@ -55,7 +60,12 @@ TEST(FileBasedWal, AppendLogs) {
wal.reset();

// Now let's open it to read
wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get());
wal = FileBasedWal::getWal(walDir.path(),
policy,
flusher.get(),
[](LogID, TermID, ClusterID, const std::string&) {
return true;
});
EXPECT_EQ(10, wal->lastLogId());

auto it = wal->iterator(1, 10);
Expand All @@ -80,9 +90,13 @@ TEST(FileBasedWal, CacheOverflow) {
policy.numBuffers = 2;

TempDir walDir("/tmp/testWal.XXXXXX");

auto wal = FileBasedWal::getWal(walDir.path(),
policy,
flusher.get(),
[](LogID, TermID, ClusterID, const std::string&) {
return true;
});
// Create a new WAL, add one log and close it
auto wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get());
EXPECT_EQ(0, wal->lastLogId());

// Append > 10MB logs in total
Expand All @@ -105,7 +119,13 @@ TEST(FileBasedWal, CacheOverflow) {
ASSERT_EQ(11, files.size());

// Now let's open it to read
wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get());
wal = FileBasedWal::getWal(walDir.path(),
policy,
flusher.get(),
[](LogID, TermID, ClusterID, const std::string&) {
return true;
});

EXPECT_EQ(10000, wal->lastLogId());

auto it = wal->iterator(1, 10000);
Expand All @@ -129,9 +149,13 @@ TEST(FileBasedWal, Rollback) {
policy.numBuffers = 2;

TempDir walDir("/tmp/testWal.XXXXXX");

// Create a new WAL, add one log and close it
auto wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get());
auto wal = FileBasedWal::getWal(walDir.path(),
policy,
flusher.get(),
[](LogID, TermID, ClusterID, const std::string&) {
return true;
});
EXPECT_EQ(0, wal->lastLogId());

// Append > 10MB logs in total
Expand Down Expand Up @@ -203,9 +227,13 @@ TEST(FileBasedWal, RollbackToFile) {
policy.numBuffers = 2;

TempDir walDir("/tmp/testWal.XXXXXX");

// Create a new WAL, add one log and close it
auto wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get());
auto wal = FileBasedWal::getWal(walDir.path(),
policy,
flusher.get(),
[](LogID, TermID, ClusterID, const std::string&) {
return true;
});
EXPECT_EQ(0, wal->lastLogId());
// Append > 1MB logs in total
for (int i = 1; i <= 1000; i++) {
Expand All @@ -225,7 +253,12 @@ TEST(FileBasedWal, RollbackToFile) {
ASSERT_EQ(2, files.size());

// Now let's open it to read
wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get());
wal = FileBasedWal::getWal(walDir.path(),
policy,
flusher.get(),
[](LogID, TermID, ClusterID, const std::string&) {
return true;
});
EXPECT_EQ(1000, wal->lastLogId());

// Let's verify the logs
Expand Down Expand Up @@ -265,9 +298,13 @@ TEST(FileBasedWal, RollbackToMemory) {
policy.numBuffers = 2;

TempDir walDir("/tmp/testWal.XXXXXX");

// Create a new WAL, add one log and close it
auto wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get());
auto wal = FileBasedWal::getWal(walDir.path(),
policy,
flusher.get(),
[](LogID, TermID, ClusterID, const std::string&) {
return true;
});
EXPECT_EQ(0, wal->lastLogId());

// Append < 1MB logs in total
Expand Down Expand Up @@ -317,9 +354,13 @@ TEST(FileBasedWal, RollbackToZero) {
policy.numBuffers = 2;

TempDir walDir("/tmp/testWal.XXXXXX");

// Create a new WAL, add one log and close it
auto wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get());
auto wal = FileBasedWal::getWal(walDir.path(),
policy,
flusher.get(),
[](LogID, TermID, ClusterID, const std::string&) {
return true;
});
ASSERT_EQ(0, wal->lastLogId());
// Rollback
ASSERT_TRUE(wal->rollbackToLog(0));
Expand Down

0 comments on commit 8841cda

Please sign in to comment.