diff --git a/src/common/base/NebulaKeyUtils.cpp b/src/common/base/NebulaKeyUtils.cpp
index 3c218bd708f..6c3035e299d 100644
--- a/src/common/base/NebulaKeyUtils.cpp
+++ b/src/common/base/NebulaKeyUtils.cpp
@@ -38,6 +38,14 @@ std::string NebulaKeyUtils::edgeKey(PartitionID partId,
     return key;
 }
 
+// static
+std::string NebulaKeyUtils::prefix(PartitionID partId) {
+    std::string key;
+    key.reserve(sizeof(PartitionID));
+    key.append(reinterpret_cast<const char*>(&partId), sizeof(PartitionID));
+    return key;
+}
+
 // static
 std::string NebulaKeyUtils::prefix(PartitionID partId, VertexID srcId, EdgeType type) {
     std::string key;
diff --git a/src/common/base/NebulaKeyUtils.h b/src/common/base/NebulaKeyUtils.h
index 76a2501a8f2..ceda9d3d8d0 100644
--- a/src/common/base/NebulaKeyUtils.h
+++ b/src/common/base/NebulaKeyUtils.h
@@ -52,6 +52,8 @@ class NebulaKeyUtils final {
     static std::string prefix(PartitionID partId, VertexID src, EdgeType type,
                               EdgeRanking ranking, VertexID dst);
 
+    static std::string prefix(PartitionID partId);
+
     static bool isVertex(const folly::StringPiece& rawKey) {
         return rawKey.size() == kVertexLen;
     }
diff --git a/src/interface/raftex.thrift b/src/interface/raftex.thrift
index 987fdd30b27..be20f364250 100644
--- a/src/interface/raftex.thrift
+++ b/src/interface/raftex.thrift
@@ -16,7 +16,7 @@ enum ErrorCode {
    E_LOG_GAP = -1;
    E_LOG_STALE = -2;
    E_MISSING_COMMIT = -3;
-   E_PULLING_SNAPSHOT = -4;    // The follower is pulling a snapshot
+   E_WAITING_SNAPSHOT = -4;    // The follower is waiting for a snapshot

    E_UNKNOWN_PART = -5;
    E_TERM_OUT_OF_DATE = -6;
@@ -31,6 +31,7 @@ enum ErrorCode {
    E_NOT_A_LEADER = -13;
    E_HOST_DISCONNECTED = -14;
    E_TOO_MANY_REQUESTS = -15;
+   E_PERSIST_SNAPSHOT_FAILED = -16;

    E_EXCEPTION = -20;          // A thrift internal exception was thrown
}
@@ -94,8 +95,6 @@ struct AppendLogRequest {
    //
    // Fields 10 to 11 are used for LogAppend.
    //
-   // In the case of heartbeat, the log_str_list will be empty,
-   // and log_term == 0
    //
    // In the case of LogAppend, the id of the first log is the
    //   last_log_id_sent + 1
@@ -106,7 +105,7 @@ struct AppendLogRequest {
    10: TermID log_term;
    11: list<binary> log_str_list;

-   12: optional binary snapshot_uri;   // Snapshot URL
+   12: bool sending_snapshot;
}
@@ -118,13 +117,30 @@ struct AppendLogResponse {
    5: LogID committed_log_id;
    6: LogID last_log_id;
    7: TermID last_log_term;
-   8: bool pulling_snapshot;
}

+struct SendSnapshotRequest {
+   1: GraphSpaceID space;
+   2: PartitionID  part;
+   3: TermID       term;
+   4: LogID        committed_log_id;
+   5: TermID       committed_log_term;
+   6: IPv4         leader_ip;
+   7: Port         leader_port;
+   8: list<binary> rows;
+   9: i64          total_size;
+   10: i64         total_count;
+   11: bool        done;
+}
+
+struct SendSnapshotResponse {
+   1: ErrorCode error_code;
+}

service RaftexService {
    AskForVoteResponse askForVote(1: AskForVoteRequest req);
    AppendLogResponse appendLog(1: AppendLogRequest req);
+   SendSnapshotResponse sendSnapshot(1: SendSnapshotRequest req);
}
diff --git a/src/kvstore/CMakeLists.txt b/src/kvstore/CMakeLists.txt
index bc6453dd77d..f8fdd8e73bc 100644
--- a/src/kvstore/CMakeLists.txt
+++ b/src/kvstore/CMakeLists.txt
@@ -6,6 +6,7 @@ nebula_add_library(
     NebulaStore.cpp
     RocksEngineConfig.cpp
     LogEncoder.cpp
+    SnapshotManagerImpl.cpp
 )
 
 add_subdirectory(raftex)
diff --git a/src/kvstore/LogEncoder.cpp b/src/kvstore/LogEncoder.cpp
index 450f5f55257..9b5b603bf35 100644
--- a/src/kvstore/LogEncoder.cpp
+++ b/src/kvstore/LogEncoder.cpp
@@ -13,6 +13,26 @@ namespace kvstore {
 
 constexpr auto kHeadLen = sizeof(int64_t) + 1 + sizeof(uint32_t);
 
+std::string encodeKV(const folly::StringPiece& key,
+                     const folly::StringPiece& val) {
+    uint32_t ksize = key.size();
+    uint32_t vsize = val.size();
+    std::string str;
+    str.reserve(sizeof(uint32_t) * 2 + ksize + vsize);
+    str.append(reinterpret_cast<const char*>(&ksize), sizeof(ksize));
+    str.append(reinterpret_cast<const char*>(&vsize), sizeof(vsize));
+    str.append(key.data(), ksize);
+    str.append(val.data(), vsize);
+    return str;
+}
+
+std::pair<folly::StringPiece, folly::StringPiece> decodeKV(const std::string& data) {
+    auto ksize = *reinterpret_cast<const uint32_t*>(data.data());
+    auto vsize = *reinterpret_cast<const uint32_t*>(data.data() + sizeof(ksize));
+    auto key = folly::StringPiece(data.data() + sizeof(ksize) + sizeof(vsize), ksize);
+    auto val = folly::StringPiece(data.data() + sizeof(ksize) + sizeof(vsize) + ksize, vsize);
+    return std::make_pair(key, val);
+}
 
 std::string encodeSingleValue(LogType type, folly::StringPiece val) {
     std::string encoded;
diff --git a/src/kvstore/LogEncoder.h b/src/kvstore/LogEncoder.h
index 4d656770b2c..d592e5705f1 100644
--- a/src/kvstore/LogEncoder.h
+++ b/src/kvstore/LogEncoder.h
@@ -22,6 +22,10 @@ enum LogType : char {
     OP_ADD_LEARNER = 0x07,
 };
 
+std::string encodeKV(const folly::StringPiece& key,
+                     const folly::StringPiece& val);
+
+std::pair<folly::StringPiece, folly::StringPiece> decodeKV(const std::string& data);
 
 std::string encodeSingleValue(LogType type, folly::StringPiece val);
 folly::StringPiece decodeSingleValue(folly::StringPiece encoded);
diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp
index fca74882f82..e94c0fa9c2f 100644
--- a/src/kvstore/NebulaStore.cpp
+++ b/src/kvstore/NebulaStore.cpp
@@ -12,6 +12,7 @@
 #include "network/NetworkUtils.h"
 #include "fs/FileUtils.h"
 #include "kvstore/RocksEngine.h"
+#include "kvstore/SnapshotManagerImpl.h"
 
 DEFINE_string(engine_type, "rocksdb", "rocksdb, memory...");
 DEFINE_int32(custom_filter_interval_secs, 24 * 3600, "interval to trigger custom compaction");
@@ -36,7 +37,8 @@
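A note on the snapshot row format: each element of `SendSnapshotRequest.rows` is one length-prefixed key/value record produced by `encodeKV()` above. Below is a minimal, std-only sketch of the round trip, using `std::string_view` as a stand-in for `folly::StringPiece`; the sizes are raw 4-byte host-order integers, which is acceptable here because the sender and receiver run the same binary.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <string_view>
#include <utility>

// Each record is [ksize | vsize | key bytes | value bytes],
// with both sizes stored as raw 4-byte host-order integers.
std::string encodeKV(std::string_view key, std::string_view val) {
    uint32_t ksize = key.size();
    uint32_t vsize = val.size();
    std::string str;
    str.reserve(sizeof(uint32_t) * 2 + ksize + vsize);
    str.append(reinterpret_cast<const char*>(&ksize), sizeof(ksize));
    str.append(reinterpret_cast<const char*>(&vsize), sizeof(vsize));
    str.append(key.data(), ksize);
    str.append(val.data(), vsize);
    return str;
}

std::pair<std::string_view, std::string_view> decodeKV(const std::string& data) {
    uint32_t ksize, vsize;
    std::memcpy(&ksize, data.data(), sizeof(ksize));
    std::memcpy(&vsize, data.data() + sizeof(ksize), sizeof(vsize));
    std::string_view key(data.data() + sizeof(ksize) + sizeof(vsize), ksize);
    std::string_view val(data.data() + sizeof(ksize) + sizeof(vsize) + ksize, vsize);
    return {key, val};
}

int main() {
    auto row = encodeKV("part1_vertex42", "properties...");
    auto [k, v] = decodeKV(row);
    assert(k == "part1_vertex42" && v == "properties...");
}
```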
NebulaStore::~NebulaStore() {
 
 bool NebulaStore::init() {
     LOG(INFO) << "Start the raft service...";
     bgWorkers_ = std::make_shared<thread::GenericThreadPool>();
-    bgWorkers_->start(FLAGS_num_workers);
+    bgWorkers_->start(FLAGS_num_workers, "nebula-bgworkers");
+    snapshot_.reset(new SnapshotManagerImpl(this));
     raftService_ = raftex::RaftexService::createService(ioPool_,
                                                         workers_,
                                                         raftAddr_.second);
@@ -208,7 +210,8 @@ std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,
                                            ioPool_,
                                            bgWorkers_,
                                            flusher_.get(),
-                                           workers_);
+                                           workers_,
+                                           snapshot_);
     auto partMeta = options_.partMan_->partMeta(spaceId, partId);
     std::vector<HostAddr> peers;
     for (auto& h : partMeta.peers_) {
diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h
index 478e7510067..96c0d0517f8 100644
--- a/src/kvstore/NebulaStore.h
+++ b/src/kvstore/NebulaStore.h
@@ -16,6 +16,7 @@
 #include "kvstore/PartManager.h"
 #include "kvstore/Part.h"
 #include "kvstore/KVEngine.h"
+#include "kvstore/raftex/SnapshotManager.h"
 
 namespace nebula {
 namespace kvstore {
@@ -199,6 +200,7 @@ class NebulaStore : public KVStore, public Handler {
     std::shared_ptr<raftex::RaftexService> raftService_;
     std::unique_ptr<wal::BufferFlusher> flusher_;
+    std::shared_ptr<raftex::SnapshotManager> snapshot_;
 };
 
 }  // namespace kvstore
diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp
index 74eb2bb0d9d..3c1e3f2be3f 100644
--- a/src/kvstore/Part.cpp
+++ b/src/kvstore/Part.cpp
@@ -40,7 +40,8 @@ Part::Part(GraphSpaceID spaceId,
            std::shared_ptr<folly::IOThreadPoolExecutor> ioPool,
            std::shared_ptr<thread::GenericThreadPool> workers,
            wal::BufferFlusher* flusher,
-           std::shared_ptr<folly::Executor> handlers)
+           std::shared_ptr<folly::Executor> handlers,
+           std::shared_ptr<raftex::SnapshotManager> snapshotMan)
     : RaftPart(FLAGS_cluster_id,
                spaceId,
                partId,
@@ -49,7 +50,8 @@ Part::Part(GraphSpaceID spaceId,
                flusher,
                ioPool,
                workers,
-               handlers)
+               handlers,
+               snapshotMan)
     , spaceId_(spaceId)
     , partId_(partId)
     , walPath_(walPath)
@@ -253,6 +255,30 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
     return engine_->commitBatchWrite(std::move(batch)) == ResultCode::SUCCEEDED;
 }
 
+std::pair<int64_t, int64_t> Part::commitSnapshot(const std::vector<std::string>& rows,
+                                                 LogID committedLogId,
+                                                 TermID committedLogTerm,
+                                                 bool finished) {
+    auto batch = engine_->startBatchWrite();
+    int64_t count = 0;
+    int64_t size = 0;
+    for (auto& row : rows) {
+        count++;
+        size += row.size();
+        auto kv = decodeKV(row);
+        CHECK_EQ(ResultCode::SUCCEEDED, batch->put(kv.first, kv.second));
+    }
+    if (finished) {
+        std::string commitMsg;
+        commitMsg.reserve(sizeof(LogID) + sizeof(TermID));
+        commitMsg.append(reinterpret_cast<const char*>(&committedLogId), sizeof(LogID));
+        commitMsg.append(reinterpret_cast<const char*>(&committedLogTerm), sizeof(TermID));
+        batch->put(folly::stringPrintf("%s%d", kCommitKeyPrefix, partId_), commitMsg);
+    }
+    CHECK_EQ(ResultCode::SUCCEEDED, engine_->commitBatchWrite(std::move(batch)));
+    return std::make_pair(count, size);
+}
+
 bool Part::preProcessLog(LogID logId,
                          TermID termId,
                          ClusterID clusterId,
diff --git a/src/kvstore/Part.h b/src/kvstore/Part.h
index a577ea2202f..2ae533e961e 100644
--- a/src/kvstore/Part.h
+++ b/src/kvstore/Part.h
@@ -11,11 +11,13 @@
 #include "raftex/RaftPart.h"
 #include "kvstore/Common.h"
 #include "kvstore/KVEngine.h"
+#include "kvstore/raftex/SnapshotManager.h"
 
 namespace nebula {
 namespace kvstore {
 
 class Part : public raftex::RaftPart {
+    friend class SnapshotManager;
 public:
     Part(GraphSpaceID spaceId,
          PartitionID partId,
@@ -25,7 +27,8 @@ class Part : public raftex::RaftPart {
          std::shared_ptr<folly::IOThreadPoolExecutor> pool,
          std::shared_ptr<thread::GenericThreadPool> workers,
          wal::BufferFlusher* flusher,
-         std::shared_ptr<folly::Executor> handlers);
+         std::shared_ptr<folly::Executor> handlers,
+         std::shared_ptr<raftex::SnapshotManager> snapshotMan);
 
     virtual ~Part() {
 
@@ -65,6 +68,15 @@ class Part : public
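For reference, the commit marker that `Part::commitSnapshot()` writes under the part's commit key when `finished` is true is just the two 8-byte values back to back. A std-only sketch of that layout follows; the decode helper is hypothetical, added only to illustrate how a reader would unpack it, and `LogID`/`TermID` stand in for the nebula typedefs.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <utility>

using LogID  = int64_t;   // stand-ins for the nebula typedefs
using TermID = int64_t;

// 16 raw bytes: committedLogId followed by committedLogTerm, host byte order.
std::string encodeCommitMsg(LogID id, TermID term) {
    std::string msg;
    msg.reserve(sizeof(LogID) + sizeof(TermID));
    msg.append(reinterpret_cast<const char*>(&id), sizeof(id));
    msg.append(reinterpret_cast<const char*>(&term), sizeof(term));
    return msg;
}

// Hypothetical inverse, shown only to document the layout.
std::pair<LogID, TermID> decodeCommitMsg(const std::string& msg) {
    LogID id;
    TermID term;
    std::memcpy(&id, msg.data(), sizeof(id));
    std::memcpy(&term, msg.data() + sizeof(id), sizeof(term));
    return {id, term};
}

int main() {
    auto msg = encodeCommitMsg(1000, 7);
    auto [id, term] = decodeCommitMsg(msg);
    assert(id == 1000 && term == 7);
}
```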
raftex::RaftPart {
                               ClusterID clusterId,
                               const std::string& log) override;
 
+    std::pair<int64_t, int64_t> commitSnapshot(const std::vector<std::string>& data,
+                                               LogID committedLogId,
+                                               TermID committedLogTerm,
+                                               bool finished) override;
+
+    void cleanup() override {
+        LOG(INFO) << idStr_ << "Clean up all data, not implemented yet!";
+    }
+
 protected:
     GraphSpaceID spaceId_;
     PartitionID partId_;
diff --git a/src/kvstore/RocksEngine.cpp b/src/kvstore/RocksEngine.cpp
index a843d21ded9..92a7686f051 100644
--- a/src/kvstore/RocksEngine.cpp
+++ b/src/kvstore/RocksEngine.cpp
@@ -32,7 +32,7 @@ class RocksWriteBatch : public WriteBatch {
     rocksdb::DB* db_{nullptr};
 
 public:
-    explicit RocksWriteBatch(rocksdb::DB* db) : db_(db) {}
+    explicit RocksWriteBatch(rocksdb::DB* db) : batch_(4096), db_(db) {}
 
     virtual ~RocksWriteBatch() = default;
diff --git a/src/kvstore/SnapshotManagerImpl.cpp b/src/kvstore/SnapshotManagerImpl.cpp
new file mode 100644
index 00000000000..33d80a1cc22
--- /dev/null
+++ b/src/kvstore/SnapshotManagerImpl.cpp
@@ -0,0 +1,48 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+#include "kvstore/SnapshotManagerImpl.h"
+#include "base/NebulaKeyUtils.h"
+#include "kvstore/LogEncoder.h"
+
+DEFINE_int32(snapshot_batch_size, 1024 * 1024 * 10, "batch size for snapshot");
+
+namespace nebula {
+namespace kvstore {
+
+void SnapshotManagerImpl::accessAllRowsInSnapshot(GraphSpaceID spaceId,
+                                                  PartitionID partId,
+                                                  raftex::SnapshotCallback cb) {
+    CHECK_NOTNULL(store_);
+    std::unique_ptr<KVIterator> iter;
+    auto prefix = NebulaKeyUtils::prefix(partId);
+    store_->prefix(spaceId, partId, prefix, &iter);
+    std::vector<std::string> data;
+    data.reserve(1024);
+    int32_t batchSize = 0;
+    int64_t totalSize = 0;
+    int64_t totalCount = 0;
+    while (iter->valid()) {
+        if (batchSize >= FLAGS_snapshot_batch_size) {
+            cb(std::move(data), totalCount, totalSize, false);
+            data.clear();
+            batchSize = 0;
+        }
+        auto key = iter->key();
+        auto val = iter->val();
+        data.emplace_back(encodeKV(key, val));
+        batchSize += data.back().size();
+        totalSize += data.back().size();
+        totalCount++;
+        iter->next();
+    }
+    if (data.size() > 0) {
+        cb(std::move(data), totalCount, totalSize, true);
+    }
+}
+
+}  // namespace kvstore
+}  // namespace nebula
diff --git a/src/kvstore/SnapshotManagerImpl.h b/src/kvstore/SnapshotManagerImpl.h
new file mode 100644
index 00000000000..177c1c70baa
--- /dev/null
+++ b/src/kvstore/SnapshotManagerImpl.h
@@ -0,0 +1,33 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
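The scan in `accessAllRowsInSnapshot()` above flushes a partial batch whenever the accumulated byte size reaches `FLAGS_snapshot_batch_size`, and only the final callback carries `finished == true`. Here is a simplified, runnable model of that control flow, iterating an in-memory vector instead of a kvstore prefix iterator:

```cpp
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

using SnapshotCallback = std::function<void(std::vector<std::string>&& rows,
                                            int64_t totalCount,
                                            int64_t totalSize,
                                            bool finished)>;

// Same shape as accessAllRowsInSnapshot, minus the kvstore iterator.
void scanRows(const std::vector<std::string>& rows,
              int32_t batchLimit,   // stand-in for FLAGS_snapshot_batch_size
              SnapshotCallback cb) {
    std::vector<std::string> batch;
    int32_t batchSize = 0;
    int64_t totalSize = 0, totalCount = 0;
    for (const auto& row : rows) {
        if (batchSize >= batchLimit) {        // flush before appending more
            cb(std::move(batch), totalCount, totalSize, false);
            batch.clear();
            batchSize = 0;
        }
        batch.emplace_back(row);
        batchSize += batch.back().size();
        totalSize += batch.back().size();
        totalCount++;
    }
    if (!batch.empty()) {                     // last (possibly short) batch
        cb(std::move(batch), totalCount, totalSize, true);
    }
}

int main() {
    std::vector<std::string> rows(10, std::string(100, 'x'));
    scanRows(rows, 300, [](auto&& batch, int64_t cnt, int64_t size, bool done) {
        std::cout << batch.size() << " rows, running total " << cnt
                  << "/" << size << (done ? " (done)\n" : "\n");
    });
}
```

Note that `totalCount`/`totalSize` handed to each callback already include the rows of the batch being delivered; the receiver relies on this in `processSendSnapshotRequest()` when it `CHECK_EQ`s its running totals against the request.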
+ */
+
+#ifndef KVSTORE_SNAPSHOTMANAGERIMPL_H_
+#define KVSTORE_SNAPSHOTMANAGERIMPL_H_
+
+#include "base/Base.h"
+#include "kvstore/raftex/SnapshotManager.h"
+#include "kvstore/KVStore.h"
+
+namespace nebula {
+namespace kvstore {
+
+class SnapshotManagerImpl : public raftex::SnapshotManager {
+public:
+    explicit SnapshotManagerImpl(KVStore* kv) : store_(kv) {}
+
+    void accessAllRowsInSnapshot(GraphSpaceID spaceId,
+                                 PartitionID partId,
+                                 raftex::SnapshotCallback cb) override;
+
+private:
+    KVStore* store_;
+};
+
+}  // namespace kvstore
+}  // namespace nebula
+
+#endif  // KVSTORE_SNAPSHOTMANAGERIMPL_H_
diff --git a/src/kvstore/raftex/CMakeLists.txt b/src/kvstore/raftex/CMakeLists.txt
index 7437ba000ad..5ecd5bfec11 100644
--- a/src/kvstore/raftex/CMakeLists.txt
+++ b/src/kvstore/raftex/CMakeLists.txt
@@ -4,6 +4,7 @@ nebula_add_library(
     RaftPart.cpp
     RaftexService.cpp
     Host.cpp
+    SnapshotManager.cpp
 )
 
 add_subdirectory(test)
diff --git a/src/kvstore/raftex/Host.cpp b/src/kvstore/raftex/Host.cpp
index 249c85021d6..3aa55a421f5 100644
--- a/src/kvstore/raftex/Host.cpp
+++ b/src/kvstore/raftex/Host.cpp
@@ -106,37 +106,22 @@ folly::Future<cpp2::AppendLogResponse> Host::appendLogs(
         std::lock_guard<std::mutex> g(lock_);
 
         auto res = checkStatus();
-
-        if (logId == logIdToSend_) {
-            // This is a re-send or a heartbeat. If there is an
-            // ongoing request, we will just return SUCCEEDED
-            if (requestOnGoing_) {
-                LOG(INFO) << idStr_ << "Another request is onging,"
-                                       "ignore the re-send request";
-                cpp2::AppendLogResponse r;
-                r.set_error_code(cpp2::ErrorCode::SUCCEEDED);
-                return r;
-            }
-        } else {
-            // Otherwise, logId has to be greater
-            if (logId < logIdToSend_) {
-                LOG(INFO) << idStr_ << "The log has been sended";
-                cpp2::AppendLogResponse r;
-                r.set_error_code(cpp2::ErrorCode::SUCCEEDED);
-                return r;
-            }
+        if (logId <= lastLogIdSent_) {
+            LOG(INFO) << idStr_ << "The log " << logId << " has already been sent"
+                      << ", lastLogIdSent " << lastLogIdSent_;
+            cpp2::AppendLogResponse r;
+            r.set_error_code(cpp2::ErrorCode::SUCCEEDED);
+            return r;
         }
 
         if (requestOnGoing_ && res == cpp2::ErrorCode::SUCCEEDED) {
             if (cachingPromise_.size() <= FLAGS_max_outstanding_requests) {
                 pendingReq_ = std::make_tuple(term,
                                               logId,
-                                              committedLogId,
-                                              prevLogTerm,
-                                              prevLogId);
+                                              committedLogId);
                 return cachingPromise_.getFuture();
             } else {
-                LOG(INFO) << idStr_
+                PLOG_EVERY_N(INFO, 200) << idStr_
                           << "Too many requests are waiting, return error";
                 cpp2::AppendLogResponse r;
                 r.set_error_code(cpp2::ErrorCode::E_TOO_MANY_REQUESTS);
@@ -155,14 +140,20 @@ folly::Future<cpp2::AppendLogResponse> Host::appendLogs(
         VLOG(2) << idStr_ << "About to send the AppendLog request";
 
         // No request is ongoing, let's send a new request
-        CHECK_GE(prevLogTerm, lastLogTermSent_);
-        CHECK_GE(prevLogId, lastLogIdSent_);
+        if (UNLIKELY(lastLogIdSent_ == 0 && lastLogTermSent_ == 0)) {
+            LOG(INFO) << idStr_ << "This is the first time to send the logs to this host";
+            lastLogIdSent_ = prevLogId;
+            lastLogTermSent_ = prevLogTerm;
+        }
+        if (prevLogTerm < lastLogTermSent_ || prevLogId < lastLogIdSent_) {
+            LOG(INFO) << idStr_ << "We have already sent this log, so go on from id " << lastLogIdSent_
+                      << ", term " << lastLogTermSent_ << "; current prev log id " << prevLogId
+                      << ", current prev log term " << prevLogTerm;
+        }
         logTermToSend_ = term;
         logIdToSend_ = logId;
-        lastLogTermSent_ = prevLogTerm;
-        lastLogIdSent_ = prevLogId;
         committedLogId_ = committedLogId;
-        pendingReq_ = std::make_tuple(0, 0, 0, 0, 0);
+        pendingReq_ = std::make_tuple(0, 0, 0);
         promise_ = std::move(cachingPromise_);
         cachingPromise_ = folly::SharedPromise<cpp2::AppendLogResponse>();
         ret = promise_.getFuture();
@@ -183,7 +174,7 @@ void Host::setResponse(const cpp2::AppendLogResponse& r) {
     promise_.setValue(r);
     cachingPromise_.setValue(r);
     cachingPromise_ = folly::SharedPromise<cpp2::AppendLogResponse>();
-    pendingReq_ = std::make_tuple(0, 0, 0, 0, 0);
+    pendingReq_ = std::make_tuple(0, 0, 0);
     requestOnGoing_ = false;
 }
 
@@ -193,7 +184,7 @@ void Host::appendLogsInternal(folly::EventBase* eb,
         [eb, self = shared_from_this()] (folly::Try<cpp2::AppendLogResponse>&& t) {
             VLOG(3) << self->idStr_ << "appendLogs() call got response";
             if (t.hasException()) {
-                LOG(ERROR) << self->idStr_ << t.exception().what();
+                VLOG(2) << self->idStr_ << t.exception().what();
                 cpp2::AppendLogResponse r;
                 r.set_error_code(cpp2::ErrorCode::E_EXCEPTION);
                 {
@@ -257,7 +248,7 @@ void Host::appendLogsInternal(folly::EventBase* eb,
 
                     self->promise_ = std::move(self->cachingPromise_);
                     self->cachingPromise_ = folly::SharedPromise<cpp2::AppendLogResponse>();
-                    self->pendingReq_ = std::make_tuple(0, 0, 0, 0, 0);
+                    self->pendingReq_ = std::make_tuple(0, 0, 0);
                 }
             }
@@ -300,6 +291,47 @@ void Host::appendLogsInternal(folly::EventBase* eb,
                     }
                     return r;
                 }
+                case cpp2::ErrorCode::E_WAITING_SNAPSHOT: {
+                    VLOG(2) << self->idStr_
+                            << "The host is waiting for the snapshot, so we need to send logs"
+                               " from the current committedLogId " << self->committedLogId_;
+                    std::shared_ptr<cpp2::AppendLogRequest> newReq;
+                    cpp2::AppendLogResponse r;
+                    {
+                        std::lock_guard<std::mutex> g(self->lock_);
+                        auto res = self->checkStatus();
+                        if (res != cpp2::ErrorCode::SUCCEEDED) {
+                            VLOG(2) << self->idStr_
+                                    << "The host is not in a proper status,"
+                                       " skip waiting for the snapshot";
+                            r.set_error_code(res);
+                            self->setResponse(r);
+                        } else {
+                            self->lastLogIdSent_ = self->committedLogId_;
+                            self->lastLogTermSent_ = self->logTermToSend_;
+                            newReq = self->prepareAppendLogRequest();
+                            r = std::move(resp);
+                        }
+                    }
+                    if (newReq) {
+                        self->appendLogsInternal(eb, newReq);
+                    } else {
+                        self->noMoreRequestCV_.notify_all();
+                    }
+                    return r;
+                }
+                case cpp2::ErrorCode::E_LOG_STALE: {
+                    VLOG(2) << self->idStr_ << "Log stale, reset lastLogIdSent " << self->lastLogIdSent_
+                            << " to the follower's lastLogId " << resp.get_last_log_id();
+                    {
+                        std::lock_guard<std::mutex> g(self->lock_);
+                        self->lastLogIdSent_ = resp.get_last_log_id();
+                        self->lastLogTermSent_ = resp.get_last_log_term();
+                        self->setResponse(resp);
+                    }
+                    self->noMoreRequestCV_.notify_all();
+                    return resp;
+                }
                 default: {
                     PLOG_EVERY_N(ERROR, 100)
                         << self->idStr_
@@ -319,7 +351,7 @@ void Host::appendLogsInternal(folly::EventBase* eb,
 
 std::shared_ptr<cpp2::AppendLogRequest>
-Host::prepareAppendLogRequest() const {
+Host::prepareAppendLogRequest() {
     CHECK(!lock_.try_lock());
     auto req = std::make_shared<cpp2::AppendLogRequest>();
     req->set_space(part_->spaceId());
@@ -353,8 +385,25 @@ Host::prepareAppendLogRequest() const {
         logs.emplace_back(std::move(le));
     }
     req->set_log_str_list(std::move(logs));
+    req->set_sending_snapshot(false);
 } else {
-    LOG(FATAL) << idStr_ << "We have not support snapshot yet";
+    req->set_sending_snapshot(true);
+    if (!sendingSnapshot_) {
+        LOG(INFO) << idStr_ << "Can't find log " << lastLogIdSent_ + 1
+                  << " in wal, send the snapshot";
+        sendingSnapshot_ = true;
+        part_->snapshot_->sendSnapshot(part_, addr_).then([this] (Status&& status) {
+            if (status.ok()) {
+                LOG(INFO) << idStr_ << "Send snapshot succeeded!";
+            } else {
+                LOG(INFO) << idStr_ << "Send snapshot failed!";
+                // TODO(heng): we should tell the follower that we failed.
+            }
+            sendingSnapshot_ = false;
+        });
+    } else {
+        LOG(INFO) << idStr_ << "The snapshot request is in the queue, please wait for a moment";
+    }
     }
 
     return req;
 }
@@ -392,7 +441,7 @@ folly::Future<cpp2::AppendLogResponse> Host::sendAppendLogRequest(
 
 bool Host::noRequest() const {
     CHECK(!lock_.try_lock());
-    static auto emptyTup = std::make_tuple(0, 0, 0, 0, 0);
+    static auto emptyTup = std::make_tuple(0, 0, 0);
     return pendingReq_ == emptyTup;
 }
diff --git a/src/kvstore/raftex/Host.h b/src/kvstore/raftex/Host.h
index 5c77031f284..7a76867db42 100644
--- a/src/kvstore/raftex/Host.h
+++ b/src/kvstore/raftex/Host.h
@@ -85,7 +85,7 @@ class Host final : public std::enable_shared_from_this<Host> {
         folly::EventBase* eb,
         std::shared_ptr<cpp2::AppendLogRequest> req);
 
-    std::shared_ptr<cpp2::AppendLogRequest> prepareAppendLogRequest() const;
+    std::shared_ptr<cpp2::AppendLogRequest> prepareAppendLogRequest();
 
     bool noRequest() const;
 
@@ -97,8 +97,8 @@ class Host final : public std::enable_shared_from_this<Host> {
     }
 
 private:
-    // <term, logId, committedLogId, prevLogTerm, prevLogId>
-    using Request = std::tuple<TermID, LogID, LogID, TermID, LogID>;
+    // <term, logId, committedLogId>
+    using Request = std::tuple<TermID, LogID, LogID>;
 
     std::shared_ptr<RaftPart> part_;
     const HostAddr addr_;
@@ -115,7 +115,7 @@ class Host final : public std::enable_shared_from_this<Host> {
 
     folly::SharedPromise<cpp2::AppendLogResponse> promise_;
     folly::SharedPromise<cpp2::AppendLogResponse> cachingPromise_;
-    Request pendingReq_{0, 0, 0, 0, 0};
+    Request pendingReq_{0, 0, 0};
 
     // These logId and term pointing to the latest log we need to send
     LogID logIdToSend_{0};
@@ -126,6 +126,7 @@ class Host final : public std::enable_shared_from_this<Host> {
     TermID lastLogTermSent_{0};
 
     LogID committedLogId_{0};
+    std::atomic_bool sendingSnapshot_{false};
 };
 
 }  // namespace raftex
diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp
index bb234973b1b..05e20809a53 100644
--- a/src/kvstore/raftex/RaftPart.cpp
+++ b/src/kvstore/raftex/RaftPart.cpp
@@ -24,6 +24,9 @@ DEFINE_bool(accept_log_append_during_pulling, false,
             "Whether to accept new logs during pulling the snapshot");
 DEFINE_uint32(raft_heartbeat_interval_secs, 5,
               "Seconds between each heartbeat");
+
+DEFINE_uint64(raft_snapshot_timeout, 60 * 5, "Max seconds between two snapshot requests");
+
 DEFINE_uint32(max_batch_size, 256, "The max number of logs in a batch");
 
 DEFINE_int32(wal_ttl, 86400, "Default wal ttl");
@@ -201,7 +204,8 @@ RaftPart::RaftPart(ClusterID clusterId,
                    BufferFlusher* flusher,
                    std::shared_ptr<folly::IOThreadPoolExecutor> pool,
                    std::shared_ptr<thread::GenericThreadPool> workers,
-                   std::shared_ptr<folly::Executor> executor)
+                   std::shared_ptr<folly::Executor> executor,
+                   std::shared_ptr<SnapshotManager> snapshotMan)
     : idStr_{folly::stringPrintf("[Port: %d, Space: %d, Part: %d] ",
                                  localAddr.second, spaceId, partId)}
     , clusterId_{clusterId}
@@ -213,13 +217,15 @@ RaftPart::RaftPart(ClusterID clusterId,
     , leader_{0, 0}
     , ioThreadPool_{pool}
     , bgWorkers_{workers}
-    , executor_(executor) {
+    , executor_(executor)
+    , snapshot_(snapshotMan) {
     FileBasedWalPolicy policy;
     policy.ttl = FLAGS_wal_ttl;
     policy.fileSize = FLAGS_wal_file_size;
     policy.bufferSize = FLAGS_wal_buffer_size;
     policy.numBuffers = FLAGS_wal_buffer_num;
     wal_ = FileBasedWal::getWal(walRoot,
+                                idStr_,
                                 policy,
                                 flusher,
                                 [this] (LogID logId,
@@ -787,6 +793,9 @@ bool RaftPart::needToStartElection() {
         role_ == Role::FOLLOWER &&
         (lastMsgRecvDur_.elapsedInSec() >= FLAGS_raft_heartbeat_interval_secs ||
          term_ == 0)) {
+        LOG(INFO) << idStr_ << "Start leader election, reason: lastMsgDur "
+                  << lastMsgRecvDur_.elapsedInSec()
+                  << ", term " << term_;
         role_ = Role::CANDIDATE;
     }
 
@@ -981,10 +990,15 @@ void RaftPart::statusPolling() {
         VLOG(2) << idStr_ << "Need to send heartbeat";
         sendHeartbeat();
     }
-    wal_->cleanWAL();
+    if (needToCleanupSnapshot()) {
+        LOG(INFO) << idStr_ << "Clean up the snapshot";
+        cleanupSnapshot();
+    }
+    wal_->cleanWAL(FLAGS_wal_ttl);
     {
         std::lock_guard<std::mutex> g(raftLock_);
-        if (status_ == Status::RUNNING) {
+        if (status_ == Status::RUNNING || status_ == Status::WAITING_SNAPSHOT) {
+            VLOG(3) << idStr_ << "Schedule new task";
             bgWorkers_->addDelayTask(
                 delay,
                 [self = shared_from_this()] {
@@ -994,6 +1008,22 @@ void RaftPart::statusPolling() {
     }
 }
 
+bool RaftPart::needToCleanupSnapshot() {
+    std::lock_guard<std::mutex> g(raftLock_);
+    return status_ == Status::WAITING_SNAPSHOT &&
+           role_ != Role::LEADER &&
+           lastSnapshotRecvDur_.elapsedInSec() >= FLAGS_raft_snapshot_timeout;
+}
+
+void RaftPart::cleanupSnapshot() {
+    LOG(INFO) << idStr_ << "Clean up the snapshot";
+    std::lock_guard<std::mutex> g(raftLock_);
+    wal_->reset();
+    cleanup();
+    lastLogId_ = committedLogId_ = 0;
+    lastTotalCount_ = lastTotalSize_ = 0;
+    status_ = Status::RUNNING;
+}
 
 void RaftPart::processAskForVoteRequest(
         const cpp2::AskForVoteRequest& req,
@@ -1089,8 +1119,6 @@ void RaftPart::processAskForVoteRequest(
 void RaftPart::processAppendLogRequest(
         const cpp2::AppendLogRequest& req,
         cpp2::AppendLogResponse& resp) {
-    bool hasSnapshot = req.get_snapshot_uri() != nullptr;
-
     VLOG(2) << idStr_
             << "Received logAppend "
             << ": GraphSpaceId = " << req.get_space()
@@ -1106,9 +1134,9 @@ void RaftPart::processAppendLogRequest(
                                     ", num_logs = %ld, logTerm = %ld",
                                     req.get_log_str_list().size(),
                                     req.get_log_term())
-            << (hasSnapshot
-                ? ", SnapshotURI = " + *(req.get_snapshot_uri())
-                : "")
+            << ", sendingSnapshot = " << req.get_sending_snapshot()
+            << ", local lastLogId = " << lastLogId_
+            << ", local committedLogId = " << committedLogId_;
 
     std::lock_guard<std::mutex> g(raftLock_);
@@ -1118,7 +1146,6 @@
     resp.set_committed_log_id(committedLogId_);
     resp.set_last_log_id(lastLogId_);
     resp.set_last_log_term(lastLogTerm_);
-    resp.set_pulling_snapshot(false);
 
     // Check status
     if (UNLIKELY(status_ == Status::STOPPED)) {
@@ -1148,22 +1175,52 @@ void RaftPart::processAppendLogRequest(
     // Reset the timeout timer
     lastMsgRecvDur_.reset();
 
-    // TODO Check snapshot pulling status
-//     if (hasSnapshot && !isPullingSnapshot()) {
-//         // We need to pull the snapshot
-//         startSnapshotPullingThread(std::move(req.get_snapshot_uri()));
-//     }
-//     if (isPullingSnapshot()) {
-//         CHECK_NE(oldRole, Role::LEADER);
-//         resp.set_pulling_snapshot(true);
-//         if (!FLAGS_accept_log_append_during_pulling) {
-//             VLOG(2) << idStr_
-//                     << "Pulling the snapshot and not allowed to accept"
-//                        " the LogAppend Requests";
-//             resp.set_error_code(cpp2::ErrorCode::E_PULLING_SNAPSHOT);
-//             return;
-//         }
-//     }
+    if (req.get_sending_snapshot() && status_ != Status::WAITING_SNAPSHOT) {
+        LOG(INFO) << idStr_ << "Begin to wait for the snapshot";
+        wal_->reset();
+        cleanup();
+        lastLogId_ = committedLogId_ = 0;
+        lastTotalCount_ = lastTotalSize_ = 0;
+        status_ = Status::WAITING_SNAPSHOT;
+        resp.set_error_code(cpp2::ErrorCode::E_WAITING_SNAPSHOT);
+        return;
+    }
+
+    if (UNLIKELY(status_ == Status::WAITING_SNAPSHOT)) {
+        VLOG(2) << idStr_
+                << "The part is receiving a snapshot, so just accept the new wals,"
+                   " but don't commit them. last_log_id_sent " << req.get_last_log_id_sent()
+                << ", total log number " << req.get_log_str_list().size();
+        if (lastLogId_ > 0 && req.get_last_log_id_sent() > lastLogId_) {
+            // There is a gap
+            LOG(INFO) << idStr_ << "Local is missing logs from id "
+                      << lastLogId_ << ". Need to catch up";
+            resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP);
+            return;
+        }
+        // Append new logs
+        size_t numLogs = req.get_log_str_list().size();
+        LogID firstId = req.get_last_log_id_sent() + 1;
+
+        VLOG(2) << idStr_ << "Writing log [" << firstId
+                << ", " << firstId + numLogs - 1 << "] to WAL";
+        LogStrListIterator iter(firstId,
+                                req.get_log_term(),
+                                req.get_log_str_list());
+        if (wal_->appendLogs(iter)) {
+            CHECK_EQ(firstId + numLogs - 1, wal_->lastLogId());
+            lastLogId_ = wal_->lastLogId();
+            lastLogTerm_ = wal_->lastLogTerm();
+            resp.set_last_log_id(lastLogId_);
+            resp.set_last_log_term(lastLogTerm_);
+            resp.set_error_code(cpp2::ErrorCode::SUCCEEDED);
+        } else {
+            LOG(ERROR) << idStr_ << "Failed to append logs to WAL";
+            resp.set_error_code(cpp2::ErrorCode::E_WAL_FAIL);
+        }
+        return;
+    }
 
     if (req.get_last_log_id_sent() < committedLogId_) {
         LOG(INFO) << idStr_ << "The log " << req.get_last_log_id_sent()
@@ -1173,7 +1230,7 @@ void RaftPart::processAppendLogRequest(
         return;
     }
     if (lastLogTerm_ > 0 && req.get_last_log_term_sent() != lastLogTerm_) {
-        VLOG(2) << idStr_ << "The local last log term is " << lastLogTerm_
+        LOG(INFO) << idStr_ << "The local last log term is " << lastLogTerm_
                 << ", which is different from the leader's prevLogTerm "
                 << req.get_last_log_term_sent()
                 << ". So need to rollback to last committedLogId_ " << committedLogId_;
@@ -1187,12 +1244,14 @@ void RaftPart::processAppendLogRequest(
         return;
     } else if (req.get_last_log_id_sent() > lastLogId_) {
         // There is a gap
-        VLOG(2) << idStr_ << "Local is missing logs from id "
+        LOG(INFO) << idStr_ << "Local is missing logs from id "
                 << lastLogId_ << ". Need to catch up";
         resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP);
         return;
     } else if (req.get_last_log_id_sent() < lastLogId_) {
-        // Local has some extra logs, which need to be rolled back
+        LOG(INFO) << idStr_ << "Local " << lastLogId_
+                  << " has some extra logs, which need to be rolled back to "
+                  << req.get_last_log_id_sent();
         wal_->rollbackToLog(req.get_last_log_id_sent());
         lastLogId_ = wal_->lastLogId();
         lastLogTerm_ = wal_->lastLogTerm();
@@ -1227,7 +1286,7 @@ void RaftPart::processAppendLogRequest(
     LogID lastLogIdCanCommit = std::min(lastLogId_, req.get_committed_log_id());
     CHECK(committedLogId_ + 1 <= lastLogIdCanCommit);
     if (commitLogs(wal_->iterator(committedLogId_ + 1, lastLogIdCanCommit))) {
-        VLOG(2) << idStr_ << "Follower succeeded committing log "
+        VLOG(1) << idStr_ << "Follower succeeded committing log "
                 << committedLogId_ + 1 << " to "
                 << lastLogIdCanCommit;
         committedLogId_ = lastLogIdCanCommit;
@@ -1315,6 +1374,54 @@ cpp2::ErrorCode RaftPart::verifyLeader(
     return cpp2::ErrorCode::SUCCEEDED;
 }
 
+void RaftPart::processSendSnapshotRequest(const cpp2::SendSnapshotRequest& req,
+                                          cpp2::SendSnapshotResponse& resp) {
+    LOG(INFO) << idStr_ << "Receive snapshot, total rows " << req.get_rows().size()
+              << ", total count received " << req.get_total_count()
+              << ", total size received " << req.get_total_size()
+              << ", finished " << req.get_done();
+    std::lock_guard<std::mutex> g(raftLock_);
+    // Check status
+    if (UNLIKELY(status_ == Status::STOPPED)) {
+        VLOG(2) << idStr_
+                << "The part has been stopped, skip the request";
+        resp.set_error_code(cpp2::ErrorCode::E_BAD_STATE);
+        return;
+    }
+    if (UNLIKELY(status_ == Status::STARTING)) {
+        VLOG(2) << idStr_ << "The partition is still starting";
+        resp.set_error_code(cpp2::ErrorCode::E_NOT_READY);
+        return;
+    }
+    if (status_ != Status::WAITING_SNAPSHOT) {
+        LOG(INFO) << idStr_ << "Begin to receive the snapshot";
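Taken together, the follower side reduces to a small state machine: the first AppendLog carrying `sending_snapshot == true` wipes local state and answers `E_WAITING_SNAPSHOT`; while in `WAITING_SNAPSHOT`, new logs are appended to the WAL but not committed, and a real gap still yields `E_LOG_GAP`. Below is a condensed, runnable model of those decisions; it is a sketch mirroring the checks above, not the real class.

```cpp
#include <cstdint>
#include <iostream>

enum class Status { RUNNING, WAITING_SNAPSHOT };
enum class Reply  { SUCCEEDED, E_WAITING_SNAPSHOT, E_LOG_GAP, APPEND_WAL_ONLY };

struct Follower {
    Status status = Status::RUNNING;
    int64_t lastLogId = 0;   // stand-in for lastLogId_

    Reply onAppendLog(bool leaderSendingSnapshot, int64_t lastLogIdSent) {
        if (leaderSendingSnapshot && status != Status::WAITING_SNAPSHOT) {
            // Drop local state and start waiting for snapshot rows.
            lastLogId = 0;
            status = Status::WAITING_SNAPSHOT;
            return Reply::E_WAITING_SNAPSHOT;
        }
        if (status == Status::WAITING_SNAPSHOT) {
            if (lastLogId > 0 && lastLogIdSent > lastLogId) {
                return Reply::E_LOG_GAP;      // leader must back up
            }
            return Reply::APPEND_WAL_ONLY;    // buffer in WAL, commit later
        }
        return Reply::SUCCEEDED;              // normal append path
    }
};

int main() {
    Follower f;
    std::cout << int(f.onAppendLog(true, 500)) << "\n";   // enters WAITING_SNAPSHOT
    std::cout << int(f.onAppendLog(false, 501)) << "\n";  // WAL-only append
}
```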
+        wal_->reset();
+        cleanup();
+        lastLogId_ = committedLogId_ = 0;
+        lastTotalCount_ = 0;
+        lastTotalSize_ = 0;
+        status_ = Status::WAITING_SNAPSHOT;
+    }
+    lastSnapshotRecvDur_.reset();
+    // TODO(heng): Maybe we should save them into one sst firstly?
+    // And before committing the snapshot, we should clean up all the data.
+    auto ret = commitSnapshot(req.get_rows(),
+                              req.get_committed_log_id(),
+                              req.get_committed_log_term(),
+                              req.get_done());
+    lastTotalCount_ += ret.first;
+    lastTotalSize_ += ret.second;
+    CHECK_EQ(lastTotalCount_, req.get_total_count());
+    CHECK_EQ(lastTotalSize_, req.get_total_size());
+    if (req.get_done()) {
+        committedLogId_ = req.get_committed_log_id();
+        status_ = Status::RUNNING;
+        LOG(INFO) << idStr_ << "Received the whole snapshot, committedLogId_ = " << committedLogId_
+                  << ", lastLogId " << lastLogId_;
+    }
+    resp.set_error_code(cpp2::ErrorCode::SUCCEEDED);
+    return;
+}
 
 folly::Future RaftPart::sendHeartbeat() {
     VLOG(2) << idStr_ << "Send heartbeat";
diff --git a/src/kvstore/raftex/RaftPart.h b/src/kvstore/raftex/RaftPart.h
index 96f65dd1ffd..69514d624f1 100644
--- a/src/kvstore/raftex/RaftPart.h
+++ b/src/kvstore/raftex/RaftPart.h
@@ -14,6 +14,7 @@
 #include "time/Duration.h"
 #include "thread/GenericThreadPool.h"
 #include "base/LogIterator.h"
+#include "kvstore/raftex/SnapshotManager.h"
 
 namespace folly {
 class IOThreadPoolExecutor;
@@ -68,6 +69,7 @@ using AtomicOp = folly::Function<std::string(void)>;
 class RaftPart : public std::enable_shared_from_this<RaftPart> {
     friend class AppendLogsIterator;
     friend class Host;
+    friend class SnapshotManager;
 
 public:
     virtual ~RaftPart();
@@ -175,6 +177,10 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
             const cpp2::AppendLogRequest& req,
             cpp2::AppendLogResponse& resp);
 
+    // Process the sendSnapshot request
+    void processSendSnapshotRequest(
+            const cpp2::SendSnapshotRequest& req,
+            cpp2::SendSnapshotResponse& resp);
 
 protected:
     // Protected constructor to prevent from instantiating directly
@@ -186,7 +192,8 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
             wal::BufferFlusher* flusher,
             std::shared_ptr<folly::IOThreadPoolExecutor> pool,
             std::shared_ptr<thread::GenericThreadPool> workers,
-            std::shared_ptr<folly::Executor> executor);
+            std::shared_ptr<folly::Executor> executor,
+            std::shared_ptr<SnapshotManager> snapshotMan);
 
     const char* idStr() const {
         return idStr_.c_str();
@@ -216,11 +223,21 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
                               ClusterID clusterId,
                               const std::string& log) = 0;
 
+    // Return the <count, size> committed;
+    virtual std::pair<int64_t, int64_t> commitSnapshot(const std::vector<std::string>& data,
+                                                       LogID committedLogId,
+                                                       TermID committedLogTerm,
+                                                       bool finished) = 0;
+
+    // Clean up all data about the current part in storage.
+    virtual void cleanup() = 0;
+
 private:
     enum class Status {
         STARTING = 0,       // The part is starting, not ready for service
         RUNNING,            // The part is running
-        STOPPED             // The part has been stopped
+        STOPPED,            // The part has been stopped
+        WAITING_SNAPSHOT    // Waiting for the snapshot.
    };
 
     enum class Role {
@@ -275,6 +292,10 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
 
     void statusPolling();
 
+    bool needToCleanupSnapshot();
+
+    void cleanupSnapshot();
+
     // The method sends out AskForVote request
     // It returns true if a leader is elected, otherwise returns false
     bool leaderElection();
@@ -461,6 +482,13 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
     std::shared_ptr<thread::GenericThreadPool> bgWorkers_;
     // Workers pool
     std::shared_ptr<folly::Executor> executor_;
+
+    std::shared_ptr<SnapshotManager> snapshot_;
+
+    // Used for snapshots; records the last total count and total size received from requests
+    int64_t lastTotalCount_ = 0;
+    int64_t lastTotalSize_ = 0;
+    time::Duration lastSnapshotRecvDur_;
 };
 
 }  // namespace raftex
diff --git a/src/kvstore/raftex/RaftexService.cpp b/src/kvstore/raftex/RaftexService.cpp
index cb2ebdd1730..a7d67f62e45 100644
--- a/src/kvstore/raftex/RaftexService.cpp
+++ b/src/kvstore/raftex/RaftexService.cpp
@@ -65,6 +65,7 @@ void RaftexService::initThriftServer(std::shared_ptr<folly::IOThreadPoolExecutor> pool,
     server_->setPort(port);
     server_->setIdleTimeout(std::chrono::seconds(0));
+    server_->setReusePort(true);
     if (pool != nullptr) {
         server_->setIOThreadPool(pool);
     }
@@ -213,6 +214,18 @@ void RaftexService::appendLog(
     part->processAppendLogRequest(req, resp);
 }
 
+void RaftexService::sendSnapshot(
+        cpp2::SendSnapshotResponse& resp,
+        const cpp2::SendSnapshotRequest& req) {
+    auto part = findPart(req.get_space(), req.get_part());
+    if (!part) {
+        // Not found
+        resp.set_error_code(cpp2::ErrorCode::E_UNKNOWN_PART);
+        return;
+    }
+
+    part->processSendSnapshotRequest(req, resp);
+}
 
 }  // namespace raftex
 }  // namespace nebula
diff --git a/src/kvstore/raftex/RaftexService.h b/src/kvstore/raftex/RaftexService.h
index 684ed2357c1..ee4466048c2 100644
--- a/src/kvstore/raftex/RaftexService.h
+++ b/src/kvstore/raftex/RaftexService.h
@@ -46,9 +46,16 @@ class RaftexService : public cpp2::RaftexServiceSvIf {
     void appendLog(cpp2::AppendLogResponse& resp,
                    const cpp2::AppendLogRequest& req) override;
 
+    void sendSnapshot(
+        cpp2::SendSnapshotResponse& resp,
+        const cpp2::SendSnapshotRequest& req) override;
+
     void addPartition(std::shared_ptr<RaftPart> part);
     void removePartition(std::shared_ptr<RaftPart> part);
 
+    std::shared_ptr<RaftPart> findPart(GraphSpaceID spaceId,
+                                       PartitionID partId);
+
 private:
     void initThriftServer(std::shared_ptr<folly::IOThreadPoolExecutor> pool,
                           std::shared_ptr<apache::thrift::concurrency::ThreadManager> workers,
@@ -61,9 +68,6 @@ class RaftexService : public cpp2::RaftexServiceSvIf {
 
     RaftexService() = default;
 
-    std::shared_ptr<RaftPart> findPart(GraphSpaceID spaceId,
-                                       PartitionID partId);
-
 private:
     std::unique_ptr<apache::thrift::ThriftServer> server_;
     std::unique_ptr<std::thread> serverThread_;
diff --git a/src/kvstore/raftex/SnapshotManager.cpp b/src/kvstore/raftex/SnapshotManager.cpp
new file mode 100644
index 00000000000..da8e9a989cf
--- /dev/null
+++ b/src/kvstore/raftex/SnapshotManager.cpp
@@ -0,0 +1,116 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+#include "kvstore/raftex/SnapshotManager.h"
+#include "base/NebulaKeyUtils.h"
+#include "kvstore/raftex/RaftPart.h"
+
+DEFINE_int32(snapshot_worker_threads, 4, "Number of worker threads for sending snapshots");
+DEFINE_int32(snapshot_io_threads, 4, "Number of IO threads for sending snapshots");
+DEFINE_int32(snapshot_send_retry_times, 3, "Retry times if send failed");
+DEFINE_int32(snapshot_send_timeout_ms, 60000, "Rpc timeout for sending snapshot");
+
+namespace nebula {
+namespace raftex {
+
+SnapshotManager::SnapshotManager() {
+    executor_.reset(new folly::IOThreadPoolExecutor(FLAGS_snapshot_worker_threads));
+    ioThreadPool_.reset(new folly::IOThreadPoolExecutor(FLAGS_snapshot_io_threads));
+}
+
+folly::Future<Status> SnapshotManager::sendSnapshot(std::shared_ptr<RaftPart> part,
+                                                    const HostAddr& dst) {
+    folly::Promise<Status> p;
+    auto fut = p.getFuture();
+    executor_->add([this, p = std::move(p), part, dst] () mutable {
+        auto spaceId = part->spaceId_;
+        auto partId = part->partId_;
+        auto termId = part->term_;
+        auto commitLogIdAndTerm = part->lastCommittedLogId();
+        const auto& localhost = part->address();
+        std::vector<folly::Future<cpp2::SendSnapshotResponse>> results;
+        LOG(INFO) << part->idStr_ << "Begin to send the snapshot";
+        accessAllRowsInSnapshot(
+            spaceId,
+            partId,
+            [&, this, p = std::move(p)] (std::vector<std::string>&& data,
+                                         int64_t totalCount,
+                                         int64_t totalSize,
+                                         bool finished) mutable {
+                int retry = FLAGS_snapshot_send_retry_times;
+                while (retry-- > 0) {
+                    auto f = send(spaceId,
+                                  partId,
+                                  termId,
+                                  commitLogIdAndTerm.first,
+                                  commitLogIdAndTerm.second,
+                                  localhost,
+                                  std::move(data),
+                                  totalSize,
+                                  totalCount,
+                                  dst,
+                                  finished);
+                    // TODO(heng): we send requests one by one to avoid occupying too much memory.
+                    try {
+                        auto resp = std::move(f).get();
+                        if (resp.get_error_code() == cpp2::ErrorCode::SUCCEEDED) {
+                            LOG(INFO) << part->idStr_ << "has sent count " << totalCount;
+                            if (finished) {
+                                LOG(INFO) << part->idStr_ << "Finished, totalCount " << totalCount
+                                          << ", totalSize " << totalSize;
+                                p.setValue(Status::OK());
+                            }
+                            return;
+                        }
+                    } catch (const std::exception& e) {
+                        LOG(ERROR) << part->idStr_ << "Send snapshot failed, exception " << e.what();
+                        p.setValue(Status::Error("Send snapshot failed!"));
+                        return;
+                    }
+                }
+                LOG(WARNING) << part->idStr_ << "Send snapshot failed!";
+                p.setValue(Status::Error("Send snapshot failed"));
+                return;
+            });
+    });
+    return fut;
+}
+
+folly::Future<cpp2::SendSnapshotResponse> SnapshotManager::send(
+        GraphSpaceID spaceId,
+        PartitionID partId,
+        TermID termId,
+        LogID committedLogId,
+        TermID committedLogTerm,
+        const HostAddr& localhost,
+        std::vector<std::string>&& data,
+        int64_t totalSize,
+        int64_t totalCount,
+        const HostAddr& addr,
+        bool finished) {
+    VLOG(2) << "Send snapshot request to " << addr;
+    raftex::cpp2::SendSnapshotRequest req;
+    req.set_space(spaceId);
+    req.set_part(partId);
+    req.set_term(termId);
+    req.set_committed_log_id(committedLogId);
+    req.set_committed_log_term(committedLogTerm);
+    req.set_leader_ip(localhost.first);
+    req.set_leader_port(localhost.second);
+    req.set_rows(std::move(data));
+    req.set_total_size(totalSize);
+    req.set_total_count(totalCount);
+    req.set_done(finished);
+    auto* evb = ioThreadPool_->getEventBase();
+    return folly::via(evb, [this, addr, evb, req = std::move(req)] () mutable {
+        auto client = connManager_.client(addr, evb, false, FLAGS_snapshot_send_timeout_ms);
+        return client->future_sendSnapshot(req);
+    });
+}
+
+}  // namespace raftex
+}  // namespace nebula
diff --git a/src/kvstore/raftex/SnapshotManager.h b/src/kvstore/raftex/SnapshotManager.h
new file mode 100644
index 00000000000..8f52da7d59c
--- /dev/null
+++ b/src/kvstore/raftex/SnapshotManager.h
@@ -0,0 +1,65 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+#ifndef RAFTEX_SNAPSHOTMANAGER_H_
+#define RAFTEX_SNAPSHOTMANAGER_H_
+
+#include "base/Base.h"
+#include "base/StatusOr.h"
+#include "gen-cpp2/raftex_types.h"
+#include "gen-cpp2/RaftexServiceAsyncClient.h"
+#include "thrift/ThriftClientManager.h"
+#include <folly/executors/IOThreadPoolExecutor.h>
+#include <folly/futures/Future.h>
+#include <folly/Function.h>
+
+namespace nebula {
+namespace raftex {
+
+using SnapshotCallback = folly::Function<void(std::vector<std::string>&& rows,
+                                              int64_t totalCount,
+                                              int64_t totalSize,
+                                              bool finished)>;
+class RaftPart;
+
+class SnapshotManager {
+public:
+    SnapshotManager();
+    virtual ~SnapshotManager() = default;
+
+    // Send snapshot for spaceId, partId to host dst.
+    folly::Future<Status> sendSnapshot(std::shared_ptr<RaftPart> part,
+                                       const HostAddr& dst);
+
+private:
+    folly::Future<cpp2::SendSnapshotResponse> send(
+        GraphSpaceID spaceId,
+        PartitionID partId,
+        TermID termId,
+        LogID committedLogId,
+        TermID committedLogTerm,
+        const HostAddr& localhost,
+        std::vector<std::string>&& data,
+        int64_t totalSize,
+        int64_t totalCount,
+        const HostAddr& addr,
+        bool finished);
+
+    virtual void accessAllRowsInSnapshot(GraphSpaceID spaceId,
+                                         PartitionID partId,
+                                         SnapshotCallback cb) = 0;
+
+private:
+    std::unique_ptr<folly::IOThreadPoolExecutor> executor_;
+    std::unique_ptr<folly::IOThreadPoolExecutor> ioThreadPool_;
+    thrift::ThriftClientManager<cpp2::RaftexServiceAsyncClient> connManager_;
+};
+
+}  // namespace raftex
+}  // namespace nebula
+
+#endif  // RAFTEX_SNAPSHOTMANAGER_H_
diff --git a/src/kvstore/raftex/test/CMakeLists.txt b/src/kvstore/raftex/test/CMakeLists.txt
index 89401af4844..7436cb3d801 100644
--- a/src/kvstore/raftex/test/CMakeLists.txt
+++ b/src/kvstore/raftex/test/CMakeLists.txt
@@ -57,3 +57,10 @@ nebula_add_test(
     OBJECTS ${RAFTEX_TEST_LIBS}
     LIBRARIES ${THRIFT_LIBRARIES} wangle gtest
 )
+
+nebula_add_test(
+    NAME snapshot_test
+    SOURCES SnapshotTest.cpp RaftexTestBase.cpp TestShard.cpp
+    OBJECTS ${RAFTEX_TEST_LIBS}
+    LIBRARIES ${THRIFT_LIBRARIES} wangle gtest
+)
diff --git a/src/kvstore/raftex/test/LeaderElectionTest.cpp b/src/kvstore/raftex/test/LeaderElectionTest.cpp
index fca3ce44cf1..04040bbcda6 100644
--- a/src/kvstore/raftex/test/LeaderElectionTest.cpp
+++ b/src/kvstore/raftex/test/LeaderElectionTest.cpp
@@ -99,6 +99,7 @@ TEST(LeaderElection, LeaderCrash) {
         services[idx]->getIOThreadPool(),
         workers,
         services[idx]->getThreadManager(),
+        nullptr,
         std::bind(&onLeadershipLost,
                   std::ref(copies),
                   std::ref(leader),
diff --git a/src/kvstore/raftex/test/LogCommandTest.cpp b/src/kvstore/raftex/test/LogCommandTest.cpp
index 4d266088bf2..7fbd76532f7 100644
--- a/src/kvstore/raftex/test/LogCommandTest.cpp
+++ b/src/kvstore/raftex/test/LogCommandTest.cpp
@@ -55,7 +55,7 @@ TEST_F(LogCommandTest, CommandInMiddle) {
     ASSERT_EQ(3, leader_->commitTimes_);
 
     // need to sleep a bit more
-    sleep(1);
+    sleep(FLAGS_raft_heartbeat_interval_secs + 1);
 
     checkConsensus(copies_, 0, 9, msgs);
 }
diff --git a/src/kvstore/raftex/test/RaftexTestBase.cpp b/src/kvstore/raftex/test/RaftexTestBase.cpp
index 63c1f4e9f3f..844a29d8306 100644
--- a/src/kvstore/raftex/test/RaftexTestBase.cpp
+++ b/src/kvstore/raftex/test/RaftexTestBase.cpp
@@ -183,6 +183,7 @@ void setupRaft(
     if (isLearner.empty()) {
         isLearner.resize(allHosts.size(), false);
     }
+    auto sps = snapshots(services);
     // Create one copy of the shard for each service
     for (size_t i = 0; i < services.size(); i++) {
         copies.emplace_back(std::make_shared<test::TestShard>(
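Stepping back to `SnapshotManager::sendSnapshot()` above: each batch is pushed synchronously and retried a bounded number of times, where a non-success response is retried up to `FLAGS_snapshot_send_retry_times` while an exception aborts the whole transfer immediately. A condensed model of that per-batch loop (`sendOnce` is a hypothetical stand-in for the thrift `future_sendSnapshot` round trip):

```cpp
#include <functional>
#include <iostream>
#include <stdexcept>

// Returns true once a batch is accepted; false when retries are exhausted
// or an exception aborts the transfer.
bool sendBatchWithRetry(int retryTimes,  // FLAGS_snapshot_send_retry_times stand-in
                        const std::function<bool()>& sendOnce) {
    while (retryTimes-- > 0) {
        try {
            if (sendOnce()) {
                return true;             // SUCCEEDED: move on to the next batch
            }
            // non-success response: fall through and retry
        } catch (const std::exception& e) {
            std::cerr << "Send snapshot failed, exception " << e.what() << "\n";
            return false;                // abort immediately on exception
        }
    }
    return false;                        // retries exhausted
}

int main() {
    int attempts = 0;
    bool ok = sendBatchWithRetry(3, [&] { return ++attempts == 2; });
    std::cout << "ok=" << ok << " after " << attempts << " attempts\n";
}
```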
            services[i]->getIOThreadPool(),
             workers,
             services[i]->getThreadManager(),
+            sps[i],
             std::bind(&onLeadershipLost,
                       std::ref(copies),
                       std::ref(leader),
@@ -333,6 +335,16 @@ void rebootOneCopy(std::vector<std::shared_ptr<RaftexService>>& services,
     LOG(INFO) << "copies " << index << " reboot";
 }
 
+std::vector<std::shared_ptr<SnapshotManager>> snapshots(
+        const std::vector<std::shared_ptr<RaftexService>>& services) {
+    std::vector<std::shared_ptr<SnapshotManager>> snapshots;
+    for (auto& service : services) {
+        std::shared_ptr<SnapshotManager> snapshot(new test::SnapshotManagerImpl(service.get()));
+        snapshots.emplace_back(std::move(snapshot));
+    }
+    return snapshots;
+}
+
 }  // namespace raftex
 }  // namespace nebula
diff --git a/src/kvstore/raftex/test/RaftexTestBase.h b/src/kvstore/raftex/test/RaftexTestBase.h
index 31a3e6399a7..569ccb5f305 100644
--- a/src/kvstore/raftex/test/RaftexTestBase.h
+++ b/src/kvstore/raftex/test/RaftexTestBase.h
@@ -14,7 +14,7 @@
 #include "fs/FileUtils.h"
 #include "thread/GenericThreadPool.h"
 #include "network/NetworkUtils.h"
-
+#include "kvstore/raftex/SnapshotManager.h"
 
 namespace nebula {
 
@@ -106,6 +106,9 @@ void rebootOneCopy(std::vector<std::shared_ptr<RaftexService>>& services,
                    std::vector<HostAddr> allHosts,
                    size_t index);
 
+std::vector<std::shared_ptr<SnapshotManager>> snapshots(
+        const std::vector<std::shared_ptr<RaftexService>>& services);
+
 class RaftexTestFixture : public ::testing::Test {
 public:
     explicit RaftexTestFixture(const std::string& testName, int32_t size = 3)
@@ -137,6 +140,7 @@ class RaftexTestFixture : public ::testing::Test {
     std::vector<std::shared_ptr<RaftexService>> services_;
     std::vector<std::shared_ptr<test::TestShard>> copies_;
     std::shared_ptr<test::TestShard> leader_;
+    std::vector<std::shared_ptr<SnapshotManager>> snapshots_;
 };
 
 }  // namespace raftex
diff --git a/src/kvstore/raftex/test/SnapshotTest.cpp b/src/kvstore/raftex/test/SnapshotTest.cpp
new file mode 100644
index 00000000000..d4464057c5f
--- /dev/null
+++ b/src/kvstore/raftex/test/SnapshotTest.cpp
@@ -0,0 +1,110 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+#include "base/Base.h"
+#include <gtest/gtest.h>
+#include <folly/String.h>
+#include "fs/TempDir.h"
+#include "fs/FileUtils.h"
+#include "thread/GenericThreadPool.h"
+#include "network/NetworkUtils.h"
+#include "kvstore/wal/BufferFlusher.h"
+#include "kvstore/raftex/RaftexService.h"
+#include "kvstore/raftex/test/RaftexTestBase.h"
+#include "kvstore/raftex/test/TestShard.h"
+
+DECLARE_uint32(raft_heartbeat_interval_secs);
+DECLARE_int32(wal_ttl);
+DECLARE_int64(wal_file_size);
+DECLARE_int32(wal_buffer_size);
+DECLARE_int32(wal_buffer_num);
+DECLARE_int32(raft_rpc_timeout_ms);
+
+namespace nebula {
+namespace raftex {
+
+TEST(SnapshotTest, LearnerCatchUpDataTest) {
+    fs::TempDir walRoot("/tmp/catch_up_data.XXXXXX");
+    FLAGS_wal_file_size = 128;
+    FLAGS_wal_buffer_size = 64;
+    FLAGS_wal_buffer_num = 1000;
+    FLAGS_raft_rpc_timeout_ms = 2000;
+    std::shared_ptr<thread::GenericThreadPool> workers;
+    std::vector<std::string> wals;
+    std::vector<HostAddr> allHosts;
+    std::vector<std::shared_ptr<RaftexService>> services;
+    std::vector<std::shared_ptr<test::TestShard>> copies;
+
+    std::shared_ptr<test::TestShard> leader;
+    std::vector<bool> isLearner = {false, false, false, true};
+    setupRaft(4, walRoot, workers, wals, allHosts, services, copies, leader, isLearner);
+
+    // Check that all hosts agree on the same leader
+    checkLeadership(copies, leader);
+
+    std::vector<std::string> msgs;
+    for (int i = 0; i < 10; i++) {
+        appendLogs(i * 100, i * 100 + 99, leader, msgs, true);
+    }
+    // Sleep a while to make sure the last log has been committed on followers
+    sleep(FLAGS_raft_heartbeat_interval_secs);
+
+    // Check every copy
+    for (int i = 0; i < 3; i++) {
+        ASSERT_EQ(1000, copies[i]->getNumLogs());
+    }
+
+    for (int i = 0; i < 1000; ++i) {
+        for (int j = 0; j < 3; j++) {
+            folly::StringPiece msg;
+            ASSERT_TRUE(copies[j]->getLogMsg(i, msg));
+            ASSERT_EQ(msgs[i], msg.toString());
+        }
+    }
+    // Wait for the wal to be cleaned
+    FLAGS_wal_ttl = 1;
+    sleep(FLAGS_wal_ttl + 3);
+    FLAGS_wal_ttl = 60;
+    LOG(INFO) << "Add learner, we need to catch up data!";
+    auto f = leader->sendCommandAsync(test::encodeLearner(allHosts[3]));
+    f.wait();
+
+    LOG(INFO) << "Let's continue to write some logs";
+    for (int i = 10; i < 20; i++) {
+        appendLogs(i * 100, i * 100 + 99, leader, msgs, true);
+    }
+    sleep(FLAGS_raft_heartbeat_interval_secs);
+
+    auto& learner = copies[3];
+    ASSERT_EQ(2000, learner->getNumLogs());
+    for (int i = 0; i < 2000; ++i) {
+        folly::StringPiece msg;
+        ASSERT_TRUE(learner->getLogMsg(i, msg));
+        ASSERT_EQ(msgs[i], msg.toString());
+    }
+
+    LOG(INFO) << "Finished UT";
+    finishRaft(services, copies, workers, leader);
+    flusher.reset();
+}
+
+}  // namespace raftex
+}  // namespace nebula
+
+
+int main(int argc, char** argv) {
+    testing::InitGoogleTest(&argc, argv);
+    folly::init(&argc, &argv, true);
+    google::SetStderrLogging(google::INFO);
+
+    // `flusher' is extern-declared in RaftexTestBase.h, defined in RaftexTestBase.cpp
+    using nebula::raftex::flusher;
+    flusher = std::make_unique<nebula::wal::BufferFlusher>();
+
+    return RUN_ALL_TESTS();
+}
diff --git a/src/kvstore/raftex/test/TestShard.cpp b/src/kvstore/raftex/test/TestShard.cpp
index e48292b0374..708d8885a0d 100644
--- a/src/kvstore/raftex/test/TestShard.cpp
+++ b/src/kvstore/raftex/test/TestShard.cpp
@@ -39,6 +39,20 @@ std::string compareAndSet(const std::string& log) {
     }
 }
 
+std::string encodeSnapshotRow(LogID logId, const std::string& row) {
+    std::string rawData;
+    rawData.reserve(sizeof(LogID) + row.size());
+    rawData.append(reinterpret_cast<const char*>(&logId), sizeof(logId));
+    rawData.append(row.data(), row.size());
+    return rawData;
+}
+
+std::pair<LogID, std::string> decodeSnapshotRow(const std::string& rawData) {
+    LogID id = *reinterpret_cast<const LogID*>(rawData.data());
+    auto str = rawData.substr(sizeof(LogID));
+    return std::make_pair(id, std::move(str));
+}
+
 TestShard::TestShard(size_t idx,
                      std::shared_ptr<RaftexService> svc,
                      PartitionID partId,
@@ -48,6 +62,7 @@ TestShard::TestShard(size_t idx,
                      std::shared_ptr<folly::IOThreadPoolExecutor> ioPool,
                      std::shared_ptr<thread::GenericThreadPool> workers,
                      std::shared_ptr<folly::Executor> handlersPool,
+                     std::shared_ptr<SnapshotManager> snapshotMan,
                      std::function<void(size_t idx, const char*, TermID)> leadershipLostCB,
                      std::function<void(size_t idx, const char*, TermID)>
@@ -60,7 +75,8 @@ TestShard::TestShard(size_t idx,
                flusher,
                ioPool,
                workers,
-               handlersPool)
+               handlersPool,
+               snapshotMan)
     , idx_(idx)
     , service_(svc)
     , leadershipLostCB_(leadershipLostCB)
@@ -118,6 +134,34 @@ bool TestShard::commitLogs(std::unique_ptr<LogIterator> iter) {
     return true;
 }
 
+std::pair<int64_t, int64_t> TestShard::commitSnapshot(const std::vector<std::string>& data,
+                                                      LogID committedLogId,
+                                                      TermID committedLogTerm,
+                                                      bool finished) {
+    folly::RWSpinLock::WriteHolder wh(&lock_);
+    int64_t count = 0;
+    int64_t size = 0;
+    for (auto& row : data) {
+        count++;
+        size += row.size();
+        auto idData = decodeSnapshotRow(row);
+        VLOG(1) << idStr_ << "Commit row logId " << idData.first << ", log " << idData.second;
+        data_.emplace_back(idData.first, std::move(idData.second));
+    }
+    if (finished) {
+        lastCommittedLogId_ = committedLogId;
+        LOG(INFO) << idStr_ << "Commit the snapshot committedLogId " << committedLogId
+                  << ", term " << committedLogTerm;
+    }
+    return std::make_pair(count, size);
+}
+
+void TestShard::cleanup() {
+    folly::RWSpinLock::WriteHolder wh(&lock_);
+    data_.clear();
+    lastCommittedLogId_ = 0;
+}
+
 size_t TestShard::getNumLogs() const {
     return data_.size();
 }
diff --git a/src/kvstore/raftex/test/TestShard.h b/src/kvstore/raftex/test/TestShard.h
index 9d6d666662a..76f5fe8ba5a 100644
--- a/src/kvstore/raftex/test/TestShard.h
+++ b/src/kvstore/raftex/test/TestShard.h
@@ -9,11 +9,12 @@
 
 #include "base/Base.h"
 #include "kvstore/raftex/RaftPart.h"
+#include "kvstore/raftex/RaftexService.h"
 
 namespace nebula {
 namespace raftex {
 
-class RaftexService;
+// class RaftexService;
 
 namespace test {
 
@@ -27,7 +28,13 @@ HostAddr decodeLearner(const folly::StringPiece& log);
 
 std::string compareAndSet(const std::string& log);
 
+std::string encodeSnapshotRow(LogID logId, const std::string& row);
+
+std::pair<LogID, std::string> decodeSnapshotRow(const std::string& rawData);
+
+
 class TestShard : public RaftPart {
+    friend class SnapshotManagerImpl;
 public:
     TestShard(
         size_t idx,
@@ -39,6 +46,7 @@ class TestShard : public RaftPart {
         std::shared_ptr<folly::IOThreadPoolExecutor> ioPool,
         std::shared_ptr<thread::GenericThreadPool> workers,
         std::shared_ptr<folly::Executor> handlersPool,
+        std::shared_ptr<SnapshotManager> snapshotMan,
         std::function<void(size_t idx, const char*, TermID)> leadershipLostCB,
         std::function<void(size_t idx, const char*, TermID)> becomeLeaderCB);
 
@@ -81,6 +89,13 @@ class TestShard : public RaftPart {
         return true;
     }
 
+    std::pair<int64_t, int64_t> commitSnapshot(const std::vector<std::string>& data,
+                                               LogID committedLogId,
+                                               TermID committedLogTerm,
+                                               bool finished) override;
+
+    void cleanup() override;
+
     size_t getNumLogs() const;
     bool getLogMsg(size_t index, folly::StringPiece& msg);
 
@@ -103,6 +118,52 @@ class TestShard : public RaftPart {
         becomeLeaderCB_;
 };
 
+class SnapshotManagerImpl : public SnapshotManager {
+public:
+    explicit SnapshotManagerImpl(RaftexService* service)
+        : service_(service) {
+        CHECK_NOTNULL(service);
+    }
+
+    ~SnapshotManagerImpl() {
+        LOG(INFO) << "~SnapshotManagerImpl()";
+    }
+
+    void accessAllRowsInSnapshot(GraphSpaceID spaceId,
+                                 PartitionID partId,
+                                 SnapshotCallback cb) override {
+        auto part = std::dynamic_pointer_cast<TestShard>(service_->findPart(spaceId, partId));
+        CHECK(!!part);
+        int64_t totalCount = 0;
+        int64_t totalSize = 0;
+        std::vector<std::string> data;
+        folly::RWSpinLock::ReadHolder rh(&part->lock_);
+        for (auto& row : part->data_) {
+            if (data.size() > 100) {
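The test shard frames snapshot rows more simply than the kvstore's `encodeKV`: an 8-byte `LogID` followed by the raw log payload, as `encodeSnapshotRow()`/`decodeSnapshotRow()` above show. A std-only round trip of the same layout:

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <utility>

using LogID = int64_t;  // stand-in for the nebula typedef

// [8-byte log id | raw log payload]
std::string encodeSnapshotRow(LogID logId, const std::string& row) {
    std::string rawData;
    rawData.reserve(sizeof(LogID) + row.size());
    rawData.append(reinterpret_cast<const char*>(&logId), sizeof(logId));
    rawData.append(row.data(), row.size());
    return rawData;
}

std::pair<LogID, std::string> decodeSnapshotRow(const std::string& rawData) {
    LogID id;
    std::memcpy(&id, rawData.data(), sizeof(id));
    return {id, rawData.substr(sizeof(LogID))};
}

int main() {
    auto row = decodeSnapshotRow(encodeSnapshotRow(42, "hello"));
    assert(row.first == 42 && row.second == "hello");
}
```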
+                LOG(INFO) << part->idStr_ << "Send snapshot total rows " << data.size()
+                          << ", total count sent " << totalCount
+                          << ", total size sent " << totalSize
+                          << ", finished false";
+                cb(std::move(data), totalCount, totalSize, false);
+                data.clear();
+            }
+            auto encoded = encodeSnapshotRow(row.first, row.second);
+            totalSize += encoded.size();
+            totalCount++;
+            data.emplace_back(std::move(encoded));
+        }
+        if (data.size() > 0) {
+            LOG(INFO) << part->idStr_ << "Send snapshot total rows " << data.size()
+                      << ", total count sent " << totalCount
+                      << ", total size sent " << totalSize
+                      << ", finished true";
+            cb(std::move(data), totalCount, totalSize, true);
+        }
+    }
+
+    RaftexService* service_;
+};
+
 }  // namespace test
 }  // namespace raftex
 }  // namespace nebula
diff --git a/src/kvstore/wal/FileBasedWal.cpp b/src/kvstore/wal/FileBasedWal.cpp
index e3eb488ad9a..2551e19888e 100644
--- a/src/kvstore/wal/FileBasedWal.cpp
+++ b/src/kvstore/wal/FileBasedWal.cpp
@@ -24,20 +24,23 @@ using nebula::fs::FileUtils;
 // static
 std::shared_ptr<FileBasedWal> FileBasedWal::getWal(
         const folly::StringPiece dir,
+        const std::string& idStr,
         FileBasedWalPolicy policy,
         BufferFlusher* flusher,
         PreProcessor preProcessor) {
     return std::shared_ptr<FileBasedWal>(
-        new FileBasedWal(dir, std::move(policy), flusher, std::move(preProcessor)));
+        new FileBasedWal(dir, idStr, std::move(policy), flusher, std::move(preProcessor)));
 }
 
 
 FileBasedWal::FileBasedWal(const folly::StringPiece dir,
+                           const std::string& idStr,
                            FileBasedWalPolicy policy,
                            BufferFlusher* flusher,
                            PreProcessor preProcessor)
-    : flusher_(flusher)
-    , dir_(dir.toString())
+    : dir_(dir.toString())
+    , idStr_(idStr)
+    , flusher_(flusher)
     , policy_(std::move(policy))
     , maxFileSize_(policy_.fileSize)
     , maxBufferSize_(policy_.bufferSize)
@@ -64,7 +67,8 @@ FileBasedWal::~FileBasedWal() {
     // FileBasedWal inherits from std::enable_shared_from_this, so at this
     // moment, there should have no other thread holding this WAL object
     while (onGoingBuffersNum_ > 0) {
-        LOG(INFO) << "Waiting for the buffer flushed, remaining " << onGoingBuffersNum_.load();
+        LOG(INFO) << idStr_
+                  << "Waiting for the buffer flushed, remaining " << onGoingBuffersNum_.load();
         usleep(50000);
     }
     if (!buffers_.empty()) {
@@ -77,7 +81,7 @@ FileBasedWal::~FileBasedWal() {
     // Close the last file
     closeCurrFile();
 
-    LOG(INFO) << "~FileBasedWal, dir = " << dir_;
+    LOG(INFO) << idStr_ << "~FileBasedWal, dir = " << dir_;
 }
 
@@ -301,7 +305,7 @@ void FileBasedWal::prepareNewFile(LogID startLogId) {
         FileUtils::joinPath(dir_, folly::stringPrintf("%019ld.wal", startLogId)),
         startLogId);
-    VLOG(1) << "Write new file " << info->path();
+    VLOG(1) << idStr_ << "Write new file " << info->path();
     walFiles_.emplace(std::make_pair(startLogId, info));
 
     // Create the file for write
@@ -337,7 +341,7 @@ void FileBasedWal::dumpCord(Cord& cord,
         do {
             ssize_t res = write(currFd_, start, size);
             if (res < 0) {
-                LOG(ERROR) << "Failed to write wal file (" << errno
+                LOG(ERROR) << idStr_ << "Failed to write wal file (" << errno
                            << "): " << strerror(errno) << ", fd " << currFd_;
                 return false;
             }
@@ -347,7 +351,7 @@ void FileBasedWal::dumpCord(Cord& cord,
         return true;
     };
     if (!cord.applyTo(cb)) {
-        LOG(FATAL) << "Failed to flush the wal file";
+        LOG(FATAL) << idStr_ << "Failed to flush the wal file";
     } else {
         // Succeeded writing all buffered content, adjust the file size
         currInfo_->setSize(currInfo_->size() + cord.size());
@@ -422,8 +426,8 @@ BufferPtr FileBasedWal::createNewBuffer(
         std::unique_lock<std::mutex>& guard) {
     if (buffers_.size() >= policy_.numBuffers) {
         // Log appending is way too fast
-        LOG(WARNING) << "Write buffer is exhausted,"
-                        " need to wait for vacancy";
+        PLOG_EVERY_N(WARNING, 100) << idStr_ << "Write buffer is exhausted,"
+                                                " need to wait for vacancy";
         // TODO: Output a counter here
         // Need to wait for a vacant slot
         slotReadyCV_.wait(guard, [self = shared_from_this()] {
@@ -442,19 +446,19 @@ bool FileBasedWal::appendLogInternal(BufferPtr& buffer,
                                      ClusterID cluster,
                                      std::string msg) {
     if (stopped_) {
-        LOG(ERROR) << "WAL has stopped. Do not accept logs any more";
+        LOG(ERROR) << idStr_ << "WAL has stopped. Do not accept logs any more";
         return false;
     }
 
     if (lastLogId_ != 0 && firstLogId_ != 0 && id != lastLogId_ + 1) {
-        LOG(ERROR) << "There is a gap in the log id. The last log id is "
+        LOG(ERROR) << idStr_ << "There is a gap in the log id. The last log id is "
                    << lastLogId_ << ", and the id being appended is " << id;
         return false;
     }
 
     if (!preProcessor_(id, term, cluster, msg)) {
-        LOG(ERROR) << "Pre process failed for log " << id;
+        LOG(ERROR) << idStr_ << "Pre process failed for log " << id;
         return false;
     }
@@ -504,7 +508,7 @@ bool FileBasedWal::appendLog(LogID id,
     }
 
     if (!appendLogInternal(buffer, id, term, cluster, std::move(msg))) {
-        LOG(ERROR) << "Failed to append log for logId " << id;
+        LOG(ERROR) << idStr_ << "Failed to append log for logId " << id;
         return false;
     }
     return true;
@@ -525,7 +529,7 @@ bool FileBasedWal::appendLogs(LogIterator& iter) {
                                iter.logTerm(),
                                iter.logSource(),
                                iter.logMsg().toString())) {
-            LOG(ERROR) << "Failed to append log for logId "
+            LOG(ERROR) << idStr_ << "Failed to append log for logId "
                        << iter.logId();
             return false;
         }
@@ -548,7 +552,7 @@ bool FileBasedWal::rollbackToLog(LogID id) {
     std::lock_guard<std::mutex> flushGuard(flushMutex_);
     bool foundTarget{false};
 
     if (id < firstLogId_ - 1 || id > lastLogId_) {
-        LOG(ERROR) << "Rollback target id " << id
+        LOG(ERROR) << idStr_ << "Rollback target id " << id
                    << " is not in the range of ["
                    << firstLogId_ << ","
                    << lastLogId_ << "] of WAL";
@@ -596,7 +600,7 @@ bool FileBasedWal::rollbackToLog(LogID id) {
     int fd{-1};
     while (!foundTarget) {
-        LOG(WARNING) << "Need to rollback from files."
+        LOG(WARNING) << idStr_ << "Need to rollback from files."
                         " This is an expensive operation."
                         " Please make sure it is correct and necessary";
@@ -702,7 +706,7 @@ bool FileBasedWal::reset() {
     return true;
 }
 
-void FileBasedWal::cleanWAL() {
+void FileBasedWal::cleanWAL(int32_t ttl) {
     std::lock_guard<std::mutex> g(walFilesMutex_);
     if (walFiles_.empty()) {
         return;
     }
@@ -712,15 +716,21 @@
     size_t index = 0;
     auto it = walFiles_.begin();
     auto size = walFiles_.size();
+    int count = 0;
+    int walTTL = ttl == 0 ? policy_.ttl : ttl;
     while (it != walFiles_.end()) {
-        if (index++ < size - 1 && (now - it->second->mtime() > policy_.ttl)) {
+        if (index++ < size - 1 && (now - it->second->mtime() > walTTL)) {
             VLOG(1) << "Clean wals, Remove " << it->second->path();
             unlink(it->second->path());
             it = walFiles_.erase(it);
+            count++;
         } else {
             ++it;
         }
     }
+    if (count > 0) {
+        LOG(INFO) << idStr_ << "Cleaned " << count << " wal files";
+    }
     firstLogId_ = walFiles_.begin()->second->firstId();
 }
diff --git a/src/kvstore/wal/FileBasedWal.h b/src/kvstore/wal/FileBasedWal.h
index 9f4227267ea..1f9e2f5bdba 100644
--- a/src/kvstore/wal/FileBasedWal.h
+++ b/src/kvstore/wal/FileBasedWal.h
@@ -23,7 +23,6 @@ struct FileBasedWalPolicy {
     // This is only a hint, the FileBasedWal will try to keep all messages
     // newer than ttl, but not guarantee to remove old messages right away
     int32_t ttl = 86400;
-
     // The maximum size of each log message file (in byte). When the existing
     // log file reaches this size, a new file will be created
     size_t fileSize = 128 * 1024L * 1024L;
@@ -44,10 +43,12 @@ class FileBasedWal final : public Wal
                          , public std::enable_shared_from_this<FileBasedWal> {
     FRIEND_TEST(FileBasedWal, TTLTest);
+    friend class FileBasedWalIterator;
 public:
     // A factory method to create a new WAL
     static std::shared_ptr<FileBasedWal> getWal(
         const folly::StringPiece dir,
+        const std::string& idStr,
         FileBasedWalPolicy policy,
         BufferFlusher* flusher,
         PreProcessor preProcessor);
@@ -99,7 +100,7 @@ class FileBasedWal final
 
     bool reset() override;
 
-    void cleanWAL() override;
+    void cleanWAL(int32_t ttl = 0) override;
 
     // Scan [firstLogId, lastLogId]
     // This method IS thread-safe
@@ -133,6 +134,7 @@ class FileBasedWal final
     // Callers **SHOULD NEVER** use this constructor directly
     // Callers should use static method getWal() instead
     FileBasedWal(const folly::StringPiece dir,
+                 const std::string& idStr,
                  FileBasedWalPolicy policy,
                  BufferFlusher* flusher,
                  PreProcessor preProcessor);
@@ -171,11 +173,12 @@ class FileBasedWal final
     /**************************************
      * FileBasedWal Member Fields
      **************************************/
+    const std::string dir_;
+    std::string idStr_;
     BufferFlusher* flusher_;
 
     std::atomic<bool> stopped_{false};
 
-    const std::string dir_;
     const FileBasedWalPolicy policy_;
     const size_t maxFileSize_;
     const size_t maxBufferSize_;
diff --git a/src/kvstore/wal/FileBasedWalIterator.cpp b/src/kvstore/wal/FileBasedWalIterator.cpp
index 7ebbcc67b88..d247c6f55c4 100644
--- a/src/kvstore/wal/FileBasedWalIterator.cpp
+++ b/src/kvstore/wal/FileBasedWalIterator.cpp
@@ -25,12 +25,14 @@ FileBasedWalIterator::FileBasedWalIterator(
     }
 
     if (currId_ > lastId_) {
+        LOG(ERROR) << wal_->idStr_ << "The log " << currId_
+                   << " is out of range, the lastLogId is " << lastId_;
         return;
     }
 
     if (startId < wal_->firstLogId()) {
-        LOG(ERROR) << "The given log id " << startId
-                   << " is out of the range";
+        LOG(ERROR) << wal_->idStr_ << "The given log id " << startId
+                   << " is out of the range, the wal firstLogId is " << wal_->firstLogId();
         currId_ = lastId_ + 1;
         return;
     } else {
diff --git a/src/kvstore/wal/Wal.h b/src/kvstore/wal/Wal.h
index 080bfab2c57..833f24bb6b7 100644
--- a/src/kvstore/wal/Wal.h
+++ b/src/kvstore/wal/Wal.h
@@ -44,7 +44,7 @@ class Wal {
     // Clean all wal files
     virtual bool reset() = 0;
 
-    virtual void cleanWAL() = 0;
+    virtual void cleanWAL(int32_t ttl = 0) = 0;
 
     // Scan [firstLogId, lastLogId]
     virtual std::unique_ptr<LogIterator> iterator(LogID firstLogId,
diff --git a/src/kvstore/wal/test/FileBasedWalTest.cpp b/src/kvstore/wal/test/FileBasedWalTest.cpp
index 608cfa383d1..8598b051c5c 100644
--- a/src/kvstore/wal/test/FileBasedWalTest.cpp
+++ b/src/kvstore/wal/test/FileBasedWalTest.cpp
@@ -42,6 +42,7 @@ TEST(FileBasedWal, AppendLogs) {
     TempDir walDir("/tmp/testWal.XXXXXX");
 
     auto wal = FileBasedWal::getWal(walDir.path(),
+                                    "",
                                     policy,
                                     flusher.get(),
                                     [](LogID, TermID, ClusterID, const std::string&) {
@@ -60,6 +61,7 @@ TEST(FileBasedWal, AppendLogs) {
 
     // Now let's open it to read
     wal = FileBasedWal::getWal(walDir.path(),
+                               "",
                                policy,
                                flusher.get(),
                                [](LogID, TermID, ClusterID, const std::string&) {
@@ -90,6 +92,7 @@ TEST(FileBasedWal, CacheOverflow) {
     TempDir walDir("/tmp/testWal.XXXXXX");
 
     auto wal = FileBasedWal::getWal(walDir.path(),
+                                    "",
                                     policy,
                                     flusher.get(),
                                     [](LogID, TermID, ClusterID, const std::string&) {
@@ -118,6 +121,7 @@ TEST(FileBasedWal, CacheOverflow) {
 
     // Now let's open it to read
     wal = FileBasedWal::getWal(walDir.path(),
+                               "",
                                policy,
                                flusher.get(),
                                [](LogID, TermID, ClusterID, const std::string&) {
@@ -148,6 +152,7 @@ TEST(FileBasedWal, Rollback) {
     TempDir walDir("/tmp/testWal.XXXXXX");
 
     auto wal = FileBasedWal::getWal(walDir.path(),
+                                    "",
                                     policy,
                                     flusher.get(),
                                     [](LogID, TermID, ClusterID, const std::string&) {
@@ -225,6 +230,7 @@ TEST(FileBasedWal, RollbackToFile) {
     TempDir walDir("/tmp/testWal.XXXXXX");
 
     auto wal = FileBasedWal::getWal(walDir.path(),
+                                    "",
                                     policy,
                                     flusher.get(),
                                     [](LogID, TermID, ClusterID, const std::string&) {
@@ -250,6 +256,7 @@ TEST(FileBasedWal, RollbackToFile) {
 
     // Now let's open it to read
     wal = FileBasedWal::getWal(walDir.path(),
+                               "",
                                policy,
                                flusher.get(),
                                [](LogID, TermID, ClusterID, const std::string&) {
@@ -295,6 +302,7 @@ TEST(FileBasedWal, RollbackToMemory) {
     TempDir walDir("/tmp/testWal.XXXXXX");
 
     auto wal = FileBasedWal::getWal(walDir.path(),
+                                    "",
                                     policy,
                                     flusher.get(),
                                     [](LogID, TermID, ClusterID, const std::string&) {
@@ -350,6 +358,7 @@ TEST(FileBasedWal, RollbackToZero) {
     TempDir walDir("/tmp/testWal.XXXXXX");
 
     auto wal = FileBasedWal::getWal(walDir.path(),
+                                    "",
                                     policy,
                                     flusher.get(),
                                     [](LogID, TermID, ClusterID, const std::string&) {
@@ -391,6 +400,7 @@ TEST(FileBasedWal, BackAndForth) {
     TempDir walDir("/tmp/testWal.XXXXXX");
 
     auto wal = FileBasedWal::getWal(walDir.path(),
+                                    "",
                                     policy,
                                     flusher.get(),
                                     [](LogID, TermID, ClusterID, const std::string&) {
@@ -428,6 +438,7 @@ TEST(FileBasedWal, TTLTest) {
     policy.fileSize = 1024;
     policy.numBuffers = 30;
     auto wal = FileBasedWal::getWal(walDir.path(),
+                                    "",
                                     policy,
                                     flu.get(),
                                     [](LogID, TermID, ClusterID, const std::string&) {
@@ -460,6 +471,7 @@ TEST(FileBasedWal, TTLTest) {
     {
         // Now let's open it to read
         wal = FileBasedWal::getWal(walDir.path(),
+                                   "",
                                    policy,
                                    flu.get(),
                                    [](LogID, TermID, ClusterID, const std::string&) {
@@ -491,6 +503,7 @@ TEST(FileBasedWal, TTLTest) {
     {
        // Now let's open it to read
        wal = FileBasedWal::getWal(walDir.path(),
+                                  "",
                                   policy,
                                   flu.get(),
                                   [](LogID, TermID, ClusterID, const std::string&) {