Skip to content

Commit

Permalink
Implement snapshot logic
Browse files Browse the repository at this point in the history
  • Loading branch information
heng committed Aug 22, 2019
1 parent e1f566e commit 2f46e97
Show file tree
Hide file tree
Showing 35 changed files with 939 additions and 113 deletions.
8 changes: 8 additions & 0 deletions src/common/base/NebulaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ std::string NebulaKeyUtils::edgeKey(PartitionID partId,
return key;
}

// static
std::string NebulaKeyUtils::prefix(PartitionID partId) {
std::string key;
key.reserve(sizeof(PartitionID));
key.append(reinterpret_cast<const char*>(&partId), sizeof(PartitionID));
return key;
}

// static
std::string NebulaKeyUtils::prefix(PartitionID partId, VertexID srcId, EdgeType type) {
std::string key;
Expand Down
2 changes: 2 additions & 0 deletions src/common/base/NebulaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class NebulaKeyUtils final {
static std::string prefix(PartitionID partId, VertexID src, EdgeType type,
EdgeRanking ranking, VertexID dst);

static std::string prefix(PartitionID partId);

static bool isVertex(const folly::StringPiece& rawKey) {
return rawKey.size() == kVertexLen;
}
Expand Down
26 changes: 21 additions & 5 deletions src/interface/raftex.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ enum ErrorCode {
E_LOG_GAP = -1;
E_LOG_STALE = -2;
E_MISSING_COMMIT = -3;
E_PULLING_SNAPSHOT = -4; // The follower is pulling a snapshot
E_WAITING_SNAPSHOT = -4; // The follower is waiting a snapshot

E_UNKNOWN_PART = -5;
E_TERM_OUT_OF_DATE = -6;
Expand All @@ -31,6 +31,7 @@ enum ErrorCode {
E_NOT_A_LEADER = -13;
E_HOST_DISCONNECTED = -14;
E_TOO_MANY_REQUESTS = -15;
E_PERSIST_SNAPSHOT_FAILED = -16;

E_EXCEPTION = -20; // An thrift internal exception was thrown
}
Expand Down Expand Up @@ -94,8 +95,6 @@ struct AppendLogRequest {
//
// Fields 10 to 11 are used for LogAppend.
//
// In the case of heartbeat, the log_str_list will be empty,
// and log_term == 0
//
// In the case of LogAppend, the id of the first log is the
// last_log_id_sent + 1
Expand All @@ -106,7 +105,7 @@ struct AppendLogRequest {
10: TermID log_term;
11: list<LogEntry> log_str_list;

12: optional binary snapshot_uri; // Snapshot URL
12: bool sending_snapshot;
}


Expand All @@ -118,13 +117,30 @@ struct AppendLogResponse {
5: LogID committed_log_id;
6: LogID last_log_id;
7: TermID last_log_term;
8: bool pulling_snapshot;
}

struct SendSnapshotRequest {
1: GraphSpaceID space;
2: PartitionID part;
3: TermID term;
4: LogID committed_log_id;
5: TermID committed_log_term;
6: IPv4 leader_ip;
7: Port leader_port;
8: list<binary> rows;
9: i64 total_size;
10: i64 total_count;
11: bool done;
}

struct SendSnapshotResponse {
1: ErrorCode error_code;
}

service RaftexService {
AskForVoteResponse askForVote(1: AskForVoteRequest req);
AppendLogResponse appendLog(1: AppendLogRequest req);
SendSnapshotResponse sendSnapshot(1: SendSnapshotRequest req);
}


1 change: 1 addition & 0 deletions src/kvstore/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ nebula_add_library(
NebulaStore.cpp
RocksEngineConfig.cpp
LogEncoder.cpp
SnapshotManagerImpl.cpp
)

add_subdirectory(raftex)
Expand Down
20 changes: 20 additions & 0 deletions src/kvstore/LogEncoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,26 @@ namespace kvstore {

constexpr auto kHeadLen = sizeof(int64_t) + 1 + sizeof(uint32_t);

std::string encodeKV(const folly::StringPiece& key,
const folly::StringPiece& val) {
uint32_t ksize = key.size();
uint32_t vsize = val.size();
std::string str;
str.reserve(sizeof(uint32_t) * 2 + ksize + vsize);
str.append(reinterpret_cast<const char*>(&ksize), sizeof(ksize));
str.append(reinterpret_cast<const char*>(&vsize), sizeof(vsize));
str.append(key.data(), ksize);
str.append(key.data(), vsize);
return str;
}

std::pair<folly::StringPiece, folly::StringPiece> decodeKV(const std::string& data) {
auto ksize = *reinterpret_cast<const uint32_t*>(data.data());
auto vsize = *reinterpret_cast<const uint32_t*>(data.data() + sizeof(ksize));
auto key = folly::StringPiece(data.data() + sizeof(ksize) + sizeof(vsize), ksize);
auto val = folly::StringPiece(data.data() + sizeof(ksize) + sizeof(vsize) + ksize, vsize);
return std::make_pair(key, val);
}

std::string encodeSingleValue(LogType type, folly::StringPiece val) {
std::string encoded;
Expand Down
4 changes: 4 additions & 0 deletions src/kvstore/LogEncoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ enum LogType : char {
OP_ADD_LEARNER = 0x07,
};

std::string encodeKV(const folly::StringPiece& key,
const folly::StringPiece& val);

std::pair<folly::StringPiece, folly::StringPiece> decodeKV(const std::string& data);

std::string encodeSingleValue(LogType type, folly::StringPiece val);
folly::StringPiece decodeSingleValue(folly::StringPiece encoded);
Expand Down
7 changes: 5 additions & 2 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "network/NetworkUtils.h"
#include "fs/FileUtils.h"
#include "kvstore/RocksEngine.h"
#include "kvstore/SnapshotManagerImpl.h"

DEFINE_string(engine_type, "rocksdb", "rocksdb, memory...");
DEFINE_int32(custom_filter_interval_secs, 24 * 3600, "interval to trigger custom compaction");
Expand All @@ -36,7 +37,8 @@ NebulaStore::~NebulaStore() {
bool NebulaStore::init() {
LOG(INFO) << "Start the raft service...";
bgWorkers_ = std::make_shared<thread::GenericThreadPool>();
bgWorkers_->start(FLAGS_num_workers);
bgWorkers_->start(FLAGS_num_workers, "nebula-bgworkers");
snapshot_.reset(new SnapshotManagerImpl(this));
raftService_ = raftex::RaftexService::createService(ioPool_,
workers_,
raftAddr_.second);
Expand Down Expand Up @@ -208,7 +210,8 @@ std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,
ioPool_,
bgWorkers_,
flusher_.get(),
workers_);
workers_,
snapshot_);
auto partMeta = options_.partMan_->partMeta(spaceId, partId);
std::vector<HostAddr> peers;
for (auto& h : partMeta.peers_) {
Expand Down
2 changes: 2 additions & 0 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "kvstore/PartManager.h"
#include "kvstore/Part.h"
#include "kvstore/KVEngine.h"
#include "kvstore/raftex/SnapshotManager.h"

namespace nebula {
namespace kvstore {
Expand Down Expand Up @@ -199,6 +200,7 @@ class NebulaStore : public KVStore, public Handler {

std::shared_ptr<raftex::RaftexService> raftService_;
std::unique_ptr<wal::BufferFlusher> flusher_;
std::shared_ptr<raftex::SnapshotManager> snapshot_;
};

} // namespace kvstore
Expand Down
30 changes: 28 additions & 2 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ Part::Part(GraphSpaceID spaceId,
std::shared_ptr<folly::IOThreadPoolExecutor> ioPool,
std::shared_ptr<thread::GenericThreadPool> workers,
wal::BufferFlusher* flusher,
std::shared_ptr<folly::Executor> handlers)
std::shared_ptr<folly::Executor> handlers,
std::shared_ptr<raftex::SnapshotManager> snapshotMan)
: RaftPart(FLAGS_cluster_id,
spaceId,
partId,
Expand All @@ -49,7 +50,8 @@ Part::Part(GraphSpaceID spaceId,
flusher,
ioPool,
workers,
handlers)
handlers,
snapshotMan)
, spaceId_(spaceId)
, partId_(partId)
, walPath_(walPath)
Expand Down Expand Up @@ -253,6 +255,30 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
return engine_->commitBatchWrite(std::move(batch)) == ResultCode::SUCCEEDED;
}

std::pair<int64_t, int64_t> Part::commitSnapshot(const std::vector<std::string>& rows,
LogID committedLogId,
TermID committedLogTerm,
bool finished) {
auto batch = engine_->startBatchWrite();
int64_t count = 0;
int64_t size = 0;
for (auto& row : rows) {
count++;
size += row.size();
auto kv = decodeKV(row);
CHECK_EQ(ResultCode::SUCCEEDED, batch->put(kv.first, kv.second));
}
if (finished) {
std::string commitMsg;
commitMsg.reserve(sizeof(LogID) + sizeof(TermID));
commitMsg.append(reinterpret_cast<char*>(&committedLogId), sizeof(LogID));
commitMsg.append(reinterpret_cast<char*>(&committedLogTerm), sizeof(TermID));
batch->put(folly::stringPrintf("%s%d", kCommitKeyPrefix, partId_), commitMsg);
}
CHECK_EQ(ResultCode::SUCCEEDED, engine_->commitBatchWrite(std::move(batch)));
return std::make_pair(count, size);
}

bool Part::preProcessLog(LogID logId,
TermID termId,
ClusterID clusterId,
Expand Down
14 changes: 13 additions & 1 deletion src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
#include "raftex/RaftPart.h"
#include "kvstore/Common.h"
#include "kvstore/KVEngine.h"
#include "kvstore/raftex/SnapshotManager.h"

namespace nebula {
namespace kvstore {

class Part : public raftex::RaftPart {
friend class SnapshotManager;
public:
Part(GraphSpaceID spaceId,
PartitionID partId,
Expand All @@ -25,7 +27,8 @@ class Part : public raftex::RaftPart {
std::shared_ptr<folly::IOThreadPoolExecutor> pool,
std::shared_ptr<thread::GenericThreadPool> workers,
wal::BufferFlusher* flusher,
std::shared_ptr<folly::Executor> handlers);
std::shared_ptr<folly::Executor> handlers,
std::shared_ptr<raftex::SnapshotManager> snapshotMan);


virtual ~Part() {
Expand Down Expand Up @@ -65,6 +68,15 @@ class Part : public raftex::RaftPart {
ClusterID clusterId,
const std::string& log) override;

std::pair<int64_t, int64_t> commitSnapshot(const std::vector<std::string>& data,
LogID committedLogId,
TermID committedLogTerm,
bool finished) override;

void cleanup() override {
LOG(INFO) << idStr_ << "Clean up all data, not implement!";
}

protected:
GraphSpaceID spaceId_;
PartitionID partId_;
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/RocksEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class RocksWriteBatch : public WriteBatch {
rocksdb::DB* db_{nullptr};

public:
explicit RocksWriteBatch(rocksdb::DB* db) : db_(db) {}
explicit RocksWriteBatch(rocksdb::DB* db) : batch_(4096), db_(db) {}

virtual ~RocksWriteBatch() = default;

Expand Down
48 changes: 48 additions & 0 deletions src/kvstore/SnapshotManagerImpl.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#include "kvstore/SnapshotManagerImpl.h"
#include "base/NebulaKeyUtils.h"
#include "kvstore/LogEncoder.h"

DEFINE_int32(snapshot_batch_size, 1024 * 1024 * 10, "batch size for snapshot");

namespace nebula {
namespace kvstore {

void SnapshotManagerImpl::accessAllRowsInSnapshot(GraphSpaceID spaceId,
PartitionID partId,
raftex::SnapshotCallback cb) {
CHECK_NOTNULL(store_);
std::unique_ptr<KVIterator> iter;
auto prefix = NebulaKeyUtils::prefix(partId);
store_->prefix(spaceId, partId, prefix, &iter);
std::vector<std::string> data;
data.reserve(1024);
int32_t batchSize = 0;
int64_t totalSize = 0;
int64_t totalCount = 0;
while (iter->valid()) {
if (batchSize >= FLAGS_snapshot_batch_size) {
cb(std::move(data), totalCount, totalSize, false);
data.clear();
batchSize = 0;
}
auto key = iter->key();
auto val = iter->val();
data.emplace_back(encodeKV(key, val));
batchSize += data.back().size();
totalSize += data.back().size();
totalCount++;
iter->next();
}
if (data.size() > 0) {
cb(std::move(data), totalCount, totalSize, true);
}
}
} // namespace kvstore
} // namespace nebula


33 changes: 33 additions & 0 deletions src/kvstore/SnapshotManagerImpl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#ifndef KVSTORE_SNAPSHOTMANAGERIMPL_H_
#define KVSTORE_SNAPSHOTMANAGERIMPL_H_

#include "base/Base.h"
#include "kvstore/raftex/SnapshotManager.h"
#include "kvstore/KVStore.h"

namespace nebula {
namespace kvstore {

class SnapshotManagerImpl : public raftex::SnapshotManager {
public:
explicit SnapshotManagerImpl(KVStore* kv) : store_(kv) {}

void accessAllRowsInSnapshot(GraphSpaceID spaceId,
PartitionID partId,
raftex::SnapshotCallback cb) override;

private:
KVStore* store_;
};

} // namespace kvstore
} // namespace nebula

#endif // KVSTORE_SNAPSHOTMANAGERIMPL_H_

1 change: 1 addition & 0 deletions src/kvstore/raftex/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ nebula_add_library(
RaftPart.cpp
RaftexService.cpp
Host.cpp
SnapshotManager.cpp
)

add_subdirectory(test)
Loading

0 comments on commit 2f46e97

Please sign in to comment.