diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp index 12436ebddee..586dbaf92cf 100644 --- a/src/kvstore/Part.cpp +++ b/src/kvstore/Part.cpp @@ -235,6 +235,17 @@ bool Part::commitLogs(std::unique_ptr iter) { return engine_->commitBatchWrite(std::move(batch)) == ResultCode::SUCCEEDED; } +bool Part::preProcessLog(LogID logId, + TermID termId, + ClusterID clusterId, + const std::string& log) { + VLOG(3) << idStr_ << "logId " << logId + << ", termId " << termId + << ", clusterId " << clusterId + << ", log " << log; + return true; +} + } // namespace kvstore } // namespace nebula diff --git a/src/kvstore/Part.h b/src/kvstore/Part.h index dec60c60f5f..1a4ee5d7b6e 100644 --- a/src/kvstore/Part.h +++ b/src/kvstore/Part.h @@ -57,6 +57,11 @@ class Part : public raftex::RaftPart { bool commitLogs(std::unique_ptr iter) override; + bool preProcessLog(LogID logId, + TermID termId, + ClusterID clusterId, + const std::string& log) override; + protected: GraphSpaceID spaceId_; PartitionID partId_; diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index d617e185e94..fb7f5954d74 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -208,7 +208,18 @@ RaftPart::RaftPart(ClusterID clusterId, , ioThreadPool_{pool} , workers_{workers} { // TODO Configure the wal policy - wal_ = FileBasedWal::getWal(walRoot, FileBasedWalPolicy(), flusher); + wal_ = FileBasedWal::getWal(walRoot, + FileBasedWalPolicy(), + flusher, + [this] (LogID logId, + TermID logTermId, + ClusterID logClusterId, + const std::string& log) { + return this->preProcessLog(logId, + logTermId, + logClusterId, + log); + }); lastLogId_ = wal_->lastLogId(); term_ = proposedTerm_ = lastLogTerm_ = wal_->lastLogTerm(); } diff --git a/src/kvstore/raftex/RaftPart.h b/src/kvstore/raftex/RaftPart.h index c0fcbf6e697..4d1b0b98e65 100644 --- a/src/kvstore/raftex/RaftPart.h +++ b/src/kvstore/raftex/RaftPart.h @@ -204,6 +204,10 @@ class RaftPart : public std::enable_shared_from_this { // a batch of log messages virtual bool commitLogs(std::unique_ptr iter) = 0; + virtual bool preProcessLog(LogID logId, + TermID termId, + ClusterID clusterId, + const std::string& log) = 0; private: enum class Status { diff --git a/src/kvstore/raftex/test/TestShard.h b/src/kvstore/raftex/test/TestShard.h index c4d0c46b4a8..80715d4d103 100644 --- a/src/kvstore/raftex/test/TestShard.h +++ b/src/kvstore/raftex/test/TestShard.h @@ -51,6 +51,13 @@ class TestShard : public RaftPart { std::string compareAndSet(const std::string& log) override; bool commitLogs(std::unique_ptr iter) override; + bool preProcessLog(LogID, + TermID, + ClusterID, + const std::string&) override { + return true; + } + size_t getNumLogs() const; bool getLogMsg(LogID id, folly::StringPiece& msg) const; diff --git a/src/kvstore/wal/FileBasedWal.cpp b/src/kvstore/wal/FileBasedWal.cpp index f09a5653eda..3af7b5da1bf 100644 --- a/src/kvstore/wal/FileBasedWal.cpp +++ b/src/kvstore/wal/FileBasedWal.cpp @@ -24,20 +24,23 @@ using nebula::fs::FileUtils; std::shared_ptr FileBasedWal::getWal( const folly::StringPiece dir, FileBasedWalPolicy policy, - BufferFlusher* flusher) { + BufferFlusher* flusher, + PreProcessor preProcessor) { return std::shared_ptr( - new FileBasedWal(dir, std::move(policy), flusher)); + new FileBasedWal(dir, std::move(policy), flusher, std::move(preProcessor))); } FileBasedWal::FileBasedWal(const folly::StringPiece dir, FileBasedWalPolicy policy, - BufferFlusher* flusher) + BufferFlusher* flusher, + PreProcessor preProcessor) : flusher_(flusher) , dir_(dir.toString()) , policy_(std::move(policy)) , maxFileSize_(policy_.fileSize * 1024L * 1024L) - , maxBufferSize_(policy_.bufferSize * 1024L * 1024L) { + , maxBufferSize_(policy_.bufferSize * 1024L * 1024L) + , preProcessor_(std::move(preProcessor)) { // Make sure WAL directory exist if (FileUtils::fileType(dir_.c_str()) == fs::FileType::NOTEXIST) { FileUtils::makeDir(dir_); @@ -441,6 +444,10 @@ bool FileBasedWal::appendLogInternal(BufferPtr& buffer, << ", and the id being appended is " << id; return false; } + if (!preProcessor_(id, term, cluster, msg)) { + LOG(ERROR) << "Pre process failed for log " << id; + return false; + } if (buffer && (buffer->size() + diff --git a/src/kvstore/wal/FileBasedWal.h b/src/kvstore/wal/FileBasedWal.h index 9e065d8f0cf..9408ee90262 100644 --- a/src/kvstore/wal/FileBasedWal.h +++ b/src/kvstore/wal/FileBasedWal.h @@ -8,6 +8,7 @@ #define WAL_FILEBASEDWAL_H_ #include "base/Base.h" +#include #include "base/Cord.h" #include "kvstore/wal/Wal.h" #include "kvstore/wal/InMemoryLogBuffer.h" @@ -37,7 +38,7 @@ struct FileBasedWalPolicy { class BufferFlusher; - +using PreProcessor = folly::Function; class FileBasedWal final : public Wal , public std::enable_shared_from_this { @@ -46,7 +47,8 @@ class FileBasedWal final static std::shared_ptr getWal( const folly::StringPiece dir, FileBasedWalPolicy policy, - BufferFlusher* flusher); + BufferFlusher* flusher, + PreProcessor preProcessor); virtual ~FileBasedWal(); @@ -126,7 +128,8 @@ class FileBasedWal final // Callers should use static method getWal() instead FileBasedWal(const folly::StringPiece dir, FileBasedWalPolicy policy, - BufferFlusher* flusher); + BufferFlusher* flusher, + PreProcessor preProcessor); // Scan all WAL files void scanAllWalFiles(); @@ -195,6 +198,7 @@ class FileBasedWal final std::condition_variable slotReadyCV_; mutable std::mutex flushMutex_; + PreProcessor preProcessor_; }; } // namespace wal diff --git a/src/kvstore/wal/test/FileBasedWalTest.cpp b/src/kvstore/wal/test/FileBasedWalTest.cpp index d25d193f662..1504bc27e8d 100644 --- a/src/kvstore/wal/test/FileBasedWalTest.cpp +++ b/src/kvstore/wal/test/FileBasedWalTest.cpp @@ -42,7 +42,12 @@ TEST(FileBasedWal, AppendLogs) { TempDir walDir("/tmp/testWal.XXXXXX"); // Create a new WAL, add one log and close it - auto wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get()); + auto wal = FileBasedWal::getWal(walDir.path(), + policy, + flusher.get(), + [](LogID, TermID, ClusterID, const std::string&) { + return true; + }); EXPECT_EQ(0, wal->lastLogId()); for (int i = 1; i <= 10; i++) { @@ -55,7 +60,12 @@ TEST(FileBasedWal, AppendLogs) { wal.reset(); // Now let's open it to read - wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get()); + wal = FileBasedWal::getWal(walDir.path(), + policy, + flusher.get(), + [](LogID, TermID, ClusterID, const std::string&) { + return true; + }); EXPECT_EQ(10, wal->lastLogId()); auto it = wal->iterator(1, 10); @@ -80,9 +90,13 @@ TEST(FileBasedWal, CacheOverflow) { policy.numBuffers = 2; TempDir walDir("/tmp/testWal.XXXXXX"); - + auto wal = FileBasedWal::getWal(walDir.path(), + policy, + flusher.get(), + [](LogID, TermID, ClusterID, const std::string&) { + return true; + }); // Create a new WAL, add one log and close it - auto wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get()); EXPECT_EQ(0, wal->lastLogId()); // Append > 10MB logs in total @@ -105,7 +119,13 @@ TEST(FileBasedWal, CacheOverflow) { ASSERT_EQ(11, files.size()); // Now let's open it to read - wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get()); + wal = FileBasedWal::getWal(walDir.path(), + policy, + flusher.get(), + [](LogID, TermID, ClusterID, const std::string&) { + return true; + }); + EXPECT_EQ(10000, wal->lastLogId()); auto it = wal->iterator(1, 10000); @@ -129,9 +149,13 @@ TEST(FileBasedWal, Rollback) { policy.numBuffers = 2; TempDir walDir("/tmp/testWal.XXXXXX"); - // Create a new WAL, add one log and close it - auto wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get()); + auto wal = FileBasedWal::getWal(walDir.path(), + policy, + flusher.get(), + [](LogID, TermID, ClusterID, const std::string&) { + return true; + }); EXPECT_EQ(0, wal->lastLogId()); // Append > 10MB logs in total @@ -203,9 +227,13 @@ TEST(FileBasedWal, RollbackToFile) { policy.numBuffers = 2; TempDir walDir("/tmp/testWal.XXXXXX"); - // Create a new WAL, add one log and close it - auto wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get()); + auto wal = FileBasedWal::getWal(walDir.path(), + policy, + flusher.get(), + [](LogID, TermID, ClusterID, const std::string&) { + return true; + }); EXPECT_EQ(0, wal->lastLogId()); // Append > 1MB logs in total for (int i = 1; i <= 1000; i++) { @@ -225,7 +253,12 @@ TEST(FileBasedWal, RollbackToFile) { ASSERT_EQ(2, files.size()); // Now let's open it to read - wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get()); + wal = FileBasedWal::getWal(walDir.path(), + policy, + flusher.get(), + [](LogID, TermID, ClusterID, const std::string&) { + return true; + }); EXPECT_EQ(1000, wal->lastLogId()); // Let's verify the logs @@ -265,9 +298,13 @@ TEST(FileBasedWal, RollbackToMemory) { policy.numBuffers = 2; TempDir walDir("/tmp/testWal.XXXXXX"); - // Create a new WAL, add one log and close it - auto wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get()); + auto wal = FileBasedWal::getWal(walDir.path(), + policy, + flusher.get(), + [](LogID, TermID, ClusterID, const std::string&) { + return true; + }); EXPECT_EQ(0, wal->lastLogId()); // Append < 1MB logs in total @@ -317,9 +354,13 @@ TEST(FileBasedWal, RollbackToZero) { policy.numBuffers = 2; TempDir walDir("/tmp/testWal.XXXXXX"); - // Create a new WAL, add one log and close it - auto wal = FileBasedWal::getWal(walDir.path(), policy, flusher.get()); + auto wal = FileBasedWal::getWal(walDir.path(), + policy, + flusher.get(), + [](LogID, TermID, ClusterID, const std::string&) { + return true; + }); ASSERT_EQ(0, wal->lastLogId()); // Rollback ASSERT_TRUE(wal->rollbackToLog(0));