Skip to content

Commit

Permalink
address @wenhaocs's comments, rebased (#3846)
Browse files Browse the repository at this point in the history
confirm with critical27
  • Loading branch information
critical27 authored Mar 10, 2022
1 parent 481fef7 commit b88c1e1
Show file tree
Hide file tree
Showing 25 changed files with 165 additions and 207 deletions.
1 change: 0 additions & 1 deletion resources/gflags.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
"v",
"heartbeat_interval_secs",
"meta_client_retry_times",
"slow_op_threshold_ms",
"clean_wal_interval_secs",
"wal_ttl",
"clean_wal_interval_secs",
Expand Down
1 change: 0 additions & 1 deletion src/common/base/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ nebula_add_library(
Status.cpp
SanitizerOptions.cpp
SignalHandler.cpp
SlowOpTracker.cpp
${gdb_debug_script}
)

Expand Down
11 changes: 0 additions & 11 deletions src/common/base/SlowOpTracker.cpp

This file was deleted.

40 changes: 0 additions & 40 deletions src/common/base/SlowOpTracker.h

This file was deleted.

7 changes: 0 additions & 7 deletions src/common/base/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,6 @@ nebula_add_test(
LIBRARIES gtest gtest_main
)

nebula_add_test(
NAME slow_op_tracker_test
SOURCES SlowOpTrackerTest.cpp
OBJECTS $<TARGET_OBJECTS:base_obj> $<TARGET_OBJECTS:time_obj>
LIBRARIES gtest gtest_main
)

nebula_add_test(
NAME lru_test
SOURCES ConcurrentLRUCacheTest.cpp
Expand Down
28 changes: 0 additions & 28 deletions src/common/base/test/SlowOpTrackerTest.cpp

This file was deleted.

1 change: 0 additions & 1 deletion src/common/meta/GflagsManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ std::unordered_map<std::string, std::pair<cpp2::ConfigMode, bool>> GflagsManager
{"heartbeat_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"agent_heartbeat_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"meta_client_retry_times", {cpp2::ConfigMode::MUTABLE, false}},
{"slow_op_threshold_ms", {cpp2::ConfigMode::MUTABLE, false}},
{"wal_ttl", {cpp2::ConfigMode::MUTABLE, false}},
{"clean_wal_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"custom_filter_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
Expand Down
4 changes: 3 additions & 1 deletion src/kvstore/KVEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ class KVEngine {
* @param value Pointer of value
* @return nebula::cpp2::ErrorCode
*/
virtual nebula::cpp2::ErrorCode get(const std::string& key, std::string* value) = 0;
virtual nebula::cpp2::ErrorCode get(const std::string& key,
std::string* value,
const void* snapshot = nullptr) = 0;

/**
* @brief Read a list of keys
Expand Down
4 changes: 3 additions & 1 deletion src/kvstore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class KVStore {
virtual const void* GetSnapshot(GraphSpaceID spaceId,
PartitionID partID,
bool canReadFromFollower = false) = 0;

/**
* @brief Release snapshot.
*
Expand All @@ -131,7 +132,8 @@ class KVStore {
PartitionID partId,
const std::string& key,
std::string* value,
bool canReadFromFollower = false) = 0;
bool canReadFromFollower = false,
const void* snapshot = nullptr) = 0;

/**
* @brief Read a list of keys
Expand Down
64 changes: 50 additions & 14 deletions src/kvstore/NebulaSnapshotManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,40 +29,69 @@ NebulaSnapshotManager::NebulaSnapshotManager(NebulaStore* kv) : store_(kv) {
void NebulaSnapshotManager::accessAllRowsInSnapshot(GraphSpaceID spaceId,
PartitionID partId,
raftex::SnapshotCallback cb) {
auto rateLimiter = std::make_unique<kvstore::RateLimiter>();
CHECK_NOTNULL(store_);
auto tables = NebulaKeyUtils::snapshotPrefix(partId);
static constexpr LogID kInvalidLogId = -1;
static constexpr TermID kInvalidLogTerm = -1;
std::vector<std::string> data;
int64_t totalSize = 0;
int64_t totalCount = 0;
LOG(INFO) << folly::sformat(
"Space {} Part {} start send snapshot, rate limited to {}, batch size is {}",
spaceId,
partId,
FLAGS_snapshot_part_rate_limit,
FLAGS_snapshot_batch_size);

const void* snapshot = store_->GetSnapshot(spaceId, partId);
CHECK_NOTNULL(store_);
auto partRet = store_->part(spaceId, partId);
if (!ok(partRet)) {
LOG(INFO) << folly::sformat("Failed to find space {} part {]", spaceId, partId);
cb(kInvalidLogId, kInvalidLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::FAILED);
return;
}
// Create a rocksdb snapshot
auto snapshot = store_->GetSnapshot(spaceId, partId);
SCOPE_EXIT {
if (snapshot != nullptr) {
store_->ReleaseSnapshot(spaceId, partId, snapshot);
}
};
auto part = nebula::value(partRet);
// Get the commit log id and commit log term of specified partition
std::string val;
auto commitRet = part->engine()->get(NebulaKeyUtils::systemCommitKey(partId), &val, snapshot);
if (commitRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << folly::sformat(
"Cannot fetch the commit log id and term of space {} part {}", spaceId, partId);
cb(kInvalidLogId, kInvalidLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::FAILED);
return;
}
CHECK_EQ(val.size(), sizeof(LogID) + sizeof(TermID));
LogID commitLogId;
TermID commitLogTerm;
memcpy(reinterpret_cast<void*>(&commitLogId), val.data(), sizeof(LogID));
memcpy(reinterpret_cast<void*>(&commitLogTerm), val.data() + sizeof(LogID), sizeof(TermID));

LOG(INFO) << folly::sformat(
"Space {} Part {} start send snapshot of commitLogId {} commitLogTerm {}, rate limited to "
"{}, batch size is {}",
spaceId,
partId,
commitLogId,
commitLogTerm,
FLAGS_snapshot_part_rate_limit,
FLAGS_snapshot_batch_size);

auto rateLimiter = std::make_unique<kvstore::RateLimiter>();
auto tables = NebulaKeyUtils::snapshotPrefix(partId);
for (const auto& prefix : tables) {
if (!accessTable(spaceId,
partId,
snapshot,
prefix,
cb,
commitLogId,
commitLogTerm,
data,
totalCount,
totalSize,
rateLimiter.get())) {
return;
}
}
cb(data, totalCount, totalSize, raftex::SnapshotStatus::DONE);
cb(commitLogId, commitLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::DONE);
}

// Promise is set in callback. Access part of the data, and try to send to
Expand All @@ -72,6 +101,8 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
const void* snapshot,
const std::string& prefix,
raftex::SnapshotCallback& cb,
LogID commitLogId,
TermID commitLogTerm,
std::vector<std::string>& data,
int64_t& totalCount,
int64_t& totalSize,
Expand All @@ -81,7 +112,7 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
VLOG(2) << "[spaceId:" << spaceId << ", partId:" << partId << "] access prefix failed"
<< ", error code:" << static_cast<int32_t>(ret);
cb(data, totalCount, totalSize, raftex::SnapshotStatus::FAILED);
cb(commitLogId, commitLogTerm, data, totalCount, totalSize, raftex::SnapshotStatus::FAILED);
return false;
}
data.reserve(kReserveNum);
Expand All @@ -91,7 +122,12 @@ bool NebulaSnapshotManager::accessTable(GraphSpaceID spaceId,
rateLimiter->consume(static_cast<double>(batchSize), // toConsume
static_cast<double>(FLAGS_snapshot_part_rate_limit), // rate
static_cast<double>(FLAGS_snapshot_part_rate_limit)); // burstSize
if (cb(data, totalCount, totalSize, raftex::SnapshotStatus::IN_PROGRESS)) {
if (cb(commitLogId,
commitLogTerm,
data,
totalCount,
totalSize,
raftex::SnapshotStatus::IN_PROGRESS)) {
data.clear();
batchSize = 0;
} else {
Expand Down
2 changes: 2 additions & 0 deletions src/kvstore/NebulaSnapshotManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class NebulaSnapshotManager : public raftex::SnapshotManager {
const void* snapshot,
const std::string& prefix,
raftex::SnapshotCallback& cb,
LogID commitLogId,
TermID commitLogTerm,
std::vector<std::string>& data,
int64_t& totalCount,
int64_t& totalSize,
Expand Down
5 changes: 3 additions & 2 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,8 @@ nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId,
PartitionID partId,
const std::string& key,
std::string* value,
bool canReadFromFollower) {
bool canReadFromFollower,
const void* snapshot) {
auto ret = part(spaceId, partId);
if (!ok(ret)) {
return error(ret);
Expand All @@ -588,7 +589,7 @@ nebula::cpp2::ErrorCode NebulaStore::get(GraphSpaceID spaceId,
return part->isLeader() ? nebula::cpp2::ErrorCode::E_LEADER_LEASE_FAILED
: nebula::cpp2::ErrorCode::E_LEADER_CHANGED;
}
return part->engine()->get(key, value);
return part->engine()->get(key, value, snapshot);
}

const void* NebulaStore::GetSnapshot(GraphSpaceID spaceId,
Expand Down
3 changes: 2 additions & 1 deletion src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ class NebulaStore : public KVStore, public Handler {
PartitionID partId,
const std::string& key,
std::string* value,
bool canReadFromFollower = false) override;
bool canReadFromFollower = false,
const void* snapshot = nullptr) override;

/**
* @brief Read a list of keys
Expand Down
59 changes: 10 additions & 49 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,26 +308,12 @@ std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> Part::commitLogs(
}
case OP_TRANS_LEADER: {
auto newLeader = decodeHost(OP_TRANS_LEADER, log);
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
commitTransLeader(newLeader);
} else {
VLOG(1) << idStr_ << "Skip commit stale transfer leader " << newLeader
<< ", the part is opened at " << startTimeMs_ << ", but the log timestamp is "
<< ts;
}
commitTransLeader(newLeader);
break;
}
case OP_REMOVE_PEER: {
auto peer = decodeHost(OP_REMOVE_PEER, log);
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
commitRemovePeer(peer);
} else {
VLOG(1) << idStr_ << "Skip commit stale remove peer " << peer
<< ", the part is opened at " << startTimeMs_ << ", but the log timestamp is "
<< ts;
}
commitRemovePeer(peer);
break;
}
default: {
Expand Down Expand Up @@ -405,51 +391,26 @@ bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, const
switch (log[sizeof(int64_t)]) {
case OP_ADD_LEARNER: {
auto learner = decodeHost(OP_ADD_LEARNER, log);
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
VLOG(1) << idStr_ << "preprocess add learner " << learner;
addLearner(learner);
} else {
VLOG(1) << idStr_ << "Skip stale add learner " << learner << ", the part is opened at "
<< startTimeMs_ << ", but the log timestamp is " << ts;
}
LOG(INFO) << idStr_ << "preprocess add learner " << learner;
addLearner(learner);
break;
}
case OP_TRANS_LEADER: {
auto newLeader = decodeHost(OP_TRANS_LEADER, log);
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
VLOG(1) << idStr_ << "preprocess trans leader " << newLeader;
preProcessTransLeader(newLeader);
} else {
VLOG(1) << idStr_ << "Skip stale transfer leader " << newLeader
<< ", the part is opened at " << startTimeMs_ << ", but the log timestamp is "
<< ts;
}
LOG(INFO) << idStr_ << "preprocess trans leader " << newLeader;
preProcessTransLeader(newLeader);
break;
}
case OP_ADD_PEER: {
auto peer = decodeHost(OP_ADD_PEER, log);
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
VLOG(1) << idStr_ << "preprocess add peer " << peer;
addPeer(peer);
} else {
VLOG(1) << idStr_ << "Skip stale add peer " << peer << ", the part is opened at "
<< startTimeMs_ << ", but the log timestamp is " << ts;
}
LOG(INFO) << idStr_ << "preprocess add peer " << peer;
addPeer(peer);
break;
}
case OP_REMOVE_PEER: {
auto peer = decodeHost(OP_REMOVE_PEER, log);
auto ts = getTimestamp(log);
if (ts > startTimeMs_) {
VLOG(1) << idStr_ << "preprocess remove peer " << peer;
preProcessRemovePeer(peer);
} else {
VLOG(1) << idStr_ << "Skip stale remove peer " << peer << ", the part is opened at "
<< startTimeMs_ << ", but the log timestamp is " << ts;
}
LOG(INFO) << idStr_ << "preprocess remove peer " << peer;
preProcessRemovePeer(peer);
break;
}
default: {
Expand Down
Loading

0 comments on commit b88c1e1

Please sign in to comment.