Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add more metrics, rebased again
Browse files Browse the repository at this point in the history
critical27 committed Feb 10, 2022
1 parent 3ed30de commit d276a84
Showing 12 changed files with 53 additions and 119 deletions.
1 change: 0 additions & 1 deletion resources/gflags.json
Original file line number Diff line number Diff line change
@@ -4,7 +4,6 @@
"v",
"heartbeat_interval_secs",
"meta_client_retry_times",
"slow_op_threshold_ms",
"clean_wal_interval_secs",
"wal_ttl",
"clean_wal_interval_secs",
1 change: 0 additions & 1 deletion src/common/base/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -13,7 +13,6 @@ nebula_add_library(
Status.cpp
SanitizerOptions.cpp
SignalHandler.cpp
SlowOpTracker.cpp
${gdb_debug_script}
)

11 changes: 0 additions & 11 deletions src/common/base/SlowOpTracker.cpp

This file was deleted.

40 changes: 0 additions & 40 deletions src/common/base/SlowOpTracker.h

This file was deleted.

7 changes: 0 additions & 7 deletions src/common/base/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -79,13 +79,6 @@ nebula_add_test(
LIBRARIES gtest gtest_main
)

nebula_add_test(
NAME slow_op_tracker_test
SOURCES SlowOpTrackerTest.cpp
OBJECTS $<TARGET_OBJECTS:base_obj> $<TARGET_OBJECTS:time_obj>
LIBRARIES gtest gtest_main
)

nebula_add_test(
NAME lru_test
SOURCES ConcurrentLRUCacheTest.cpp
28 changes: 0 additions & 28 deletions src/common/base/test/SlowOpTrackerTest.cpp

This file was deleted.

1 change: 0 additions & 1 deletion src/common/meta/GflagsManager.cpp
Original file line number Diff line number Diff line change
@@ -55,7 +55,6 @@ std::unordered_map<std::string, std::pair<cpp2::ConfigMode, bool>> GflagsManager
{"heartbeat_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"agent_heartbeat_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"meta_client_retry_times", {cpp2::ConfigMode::MUTABLE, false}},
{"slow_op_threshold_ms", {cpp2::ConfigMode::MUTABLE, false}},
{"wal_ttl", {cpp2::ConfigMode::MUTABLE, false}},
{"clean_wal_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"custom_filter_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
9 changes: 8 additions & 1 deletion src/kvstore/raftex/Host.cpp
Original file line number Diff line number Diff line change
@@ -10,7 +10,10 @@
#include <thrift/lib/cpp/util/EnumUtils.h>

#include "common/network/NetworkUtils.h"
#include "common/stats/StatsManager.h"
#include "common/time/WallClock.h"
#include "kvstore/raftex/RaftPart.h"
#include "kvstore/stats/KVStats.h"
#include "kvstore/wal/FileBasedWal.h"

DEFINE_uint32(max_appendlog_batch_size,
@@ -158,9 +161,12 @@ void Host::setResponse(const cpp2::AppendLogResponse& r) {

void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptr<cpp2::AppendLogRequest> req) {
using TransportException = apache::thrift::transport::TTransportException;
auto beforeRpcUs = time::WallClock::fastNowInMicroSec();
sendAppendLogRequest(eb, req)
.via(eb)
.thenValue([eb, self = shared_from_this()](cpp2::AppendLogResponse&& resp) {
.thenValue([eb, beforeRpcUs, self = shared_from_this()](cpp2::AppendLogResponse&& resp) {
stats::StatsManager::addValue(kAppendLogLatencyUs,
time::WallClock::fastNowInMicroSec() - beforeRpcUs);
VLOG_IF(1, FLAGS_trace_raft)
<< self->idStr_ << "AppendLogResponse "
<< "code " << apache::thrift::util::enumNameSafe(resp.get_error_code()) << ", currTerm "
@@ -333,6 +339,7 @@ nebula::cpp2::ErrorCode Host::startSendSnapshot() {
<< ", firstLogId in wal = " << part_->wal()->firstLogId()
<< ", lastLogId in wal = " << part_->wal()->lastLogId();
sendingSnapshot_ = true;
stats::StatsManager::addValue(kNumSendSnapshot);
part_->snapshot_->sendSnapshot(part_, addr_)
.thenValue([self = shared_from_this()](auto&& status) {
std::lock_guard<std::mutex> g(self->lock_);
48 changes: 24 additions & 24 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
@@ -12,11 +12,11 @@

#include "common/base/Base.h"
#include "common/base/CollectNSucceeded.h"
#include "common/base/SlowOpTracker.h"
#include "common/network/NetworkUtils.h"
#include "common/stats/StatsManager.h"
#include "common/thread/NamedThread.h"
#include "common/thrift/ThriftClientManager.h"
#include "common/time/ScopedTimer.h"
#include "common/time/WallClock.h"
#include "common/utils/LogStrListIterator.h"
#include "interface/gen-cpp2/RaftexServiceAsyncClient.h"
@@ -741,18 +741,18 @@ void RaftPart::appendLogsInternal(AppendLogsIterator iter, TermID termId) {
prevLogTerm = lastLogTerm_;
committed = committedLogId_;
// Step 1: Write WAL
SlowOpTracker tracker;
if (!wal_->appendLogs(iter)) {
VLOG_EVERY_N(2, 1000) << idStr_ << "Failed to write into WAL";
res = nebula::cpp2::ErrorCode::E_RAFT_WAL_FAIL;
lastLogId_ = wal_->lastLogId();
lastLogTerm_ = wal_->lastLogTerm();
break;
{
SCOPED_TIMER(&execTime_);
if (!wal_->appendLogs(iter)) {
VLOG_EVERY_N(2, 1000) << idStr_ << "Failed to write into WAL";
res = nebula::cpp2::ErrorCode::E_RAFT_WAL_FAIL;
lastLogId_ = wal_->lastLogId();
lastLogTerm_ = wal_->lastLogTerm();
break;
}
}
stats::StatsManager::addValue(kAppendWalLatencyUs, execTime_);
lastId = wal_->lastLogId();
if (tracker.slow()) {
tracker.output(idStr_, folly::stringPrintf("Write WAL, total %ld", lastId - prevLogId + 1));
}
VLOG(4) << idStr_ << "Succeeded writing logs [" << iter.firstLogId() << ", " << lastId
<< "] to WAL";
} while (false);
@@ -797,7 +797,7 @@ void RaftPart::replicateLogs(folly::EventBase* eb,
<< iter.firstLogId() << ", " << lastLogId << "] to all peer hosts";

lastMsgSentDur_.reset();
SlowOpTracker tracker;
auto beforeAppendLogUs = time::WallClock::fastNowInMicroSec();
collectNSucceeded(gen::from(hosts) |
gen::map([self = shared_from_this(),
eb,
@@ -830,13 +830,11 @@ void RaftPart::replicateLogs(folly::EventBase* eb,
prevLogId,
prevLogTerm,
pHosts = std::move(hosts),
tracker](folly::Try<AppendLogResponses>&& result) mutable {
beforeAppendLogUs](folly::Try<AppendLogResponses>&& result) mutable {
VLOG(4) << self->idStr_ << "Received enough response";
CHECK(!result.hasException());
if (tracker.slow()) {
tracker.output(self->idStr_,
folly::stringPrintf("Total send logs: %ld", lastLogId - prevLogId + 1));
}
stats::StatsManager::addValue(kReplicateLogLatencyUs,
time::WallClock::fastNowInMicroSec() - beforeAppendLogUs);
self->processAppendLogResponses(*result,
eb,
std::move(it),
@@ -913,7 +911,6 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps,

{
auto walIt = wal_->iterator(committedId + 1, lastLogId);
SlowOpTracker tracker;
// Step 3: Commit the batch
auto [code, lastCommitId, lastCommitTerm] = commitLogs(std::move(walIt), true);
if (code == nebula::cpp2::ErrorCode::SUCCEEDED) {
@@ -933,10 +930,6 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps,
} else {
LOG(FATAL) << idStr_ << "Failed to commit logs";
}
if (tracker.slow()) {
tracker.output(idStr_,
folly::stringPrintf("Total commit: %ld", committedLogId_ - committedId));
}
VLOG(4) << idStr_ << "Leader succeeded in committing the logs " << committedId + 1 << " to "
<< lastLogId;
}
@@ -1185,6 +1178,7 @@ folly::Future<bool> RaftPart::leaderElection(bool isPreVote) {

auto proposedTerm = voteReq.get_term();
auto resps = ElectionResponses();
stats::StatsManager::addValue(kNumStartElect);
if (hosts.empty()) {
auto ret = handleElectionResponses(resps, hosts, proposedTerm, isPreVote);
inElection_ = false;
@@ -1462,7 +1456,7 @@ void RaftPart::processAskForVoteRequest(const cpp2::AskForVoteRequest& req,
// Reset the last message time
lastMsgRecvDur_.reset();
isBlindFollower_ = false;
stats::StatsManager::addValue(kNumRaftVotes);
stats::StatsManager::addValue(kNumGrantVotes);
return;
}

@@ -1585,7 +1579,13 @@ void RaftPart::processAppendLogRequest(const cpp2::AppendLogRequest& req,
std::make_move_iterator(req.get_log_str_list().begin() + diffIndex),
std::make_move_iterator(req.get_log_str_list().end()));
RaftLogIterator logIter(firstId, std::move(logEntries));
if (wal_->appendLogs(logIter)) {
bool result = false;
{
SCOPED_TIMER(&execTime_);
result = wal_->appendLogs(logIter);
}
stats::StatsManager::addValue(kAppendWalLatencyUs, execTime_);
if (result) {
CHECK_EQ(lastId, wal_->lastLogId());
lastLogId_ = wal_->lastLogId();
lastLogTerm_ = wal_->lastLogTerm();
17 changes: 15 additions & 2 deletions src/kvstore/stats/KVStats.cpp
Original file line number Diff line number Diff line change
@@ -12,17 +12,30 @@ namespace nebula {

// Ids of the KV-store metrics. Each id is declared here without an initializer
// and is assigned in initKVStats().

// Latency metrics; initKVStats() registers these through registerHisto().
stats::CounterId kCommitLogLatencyUs;
stats::CounterId kCommitSnapshotLatencyUs;
stats::CounterId kAppendWalLatencyUs;
stats::CounterId kReplicateLogLatencyUs;
stats::CounterId kAppendLogLatencyUs;
stats::CounterId kTransferLeaderLatencyUs;
// Event counters; initKVStats() registers these through registerStats().
stats::CounterId kNumRaftVotes;
stats::CounterId kNumStartElect;
stats::CounterId kNumGrantVotes;
stats::CounterId kNumSendSnapshot;

// Registers every KV-store metric with stats::StatsManager and stores the
// returned id in the matching global CounterId.
//
// All latency metrics use registerHisto() with the same arguments
// (1000, 0, 2000) and the statistic list "avg, p75, p95, p99, p999"; all
// counters use registerStats() with "rate, sum".
// NOTE(review): the numeric arguments are presumably bucket size / min / max
// of the histogram, in microseconds given the "_us" metric names — confirm
// against StatsManager::registerHisto.
void initKVStats() {
// Latency histograms.
kCommitLogLatencyUs = stats::StatsManager::registerHisto(
"commit_log_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999");
kCommitSnapshotLatencyUs = stats::StatsManager::registerHisto(
"commit_snapshot_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999");
kAppendWalLatencyUs = stats::StatsManager::registerHisto(
"append_wal_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999");
kReplicateLogLatencyUs = stats::StatsManager::registerHisto(
"replicate_log_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999");
kAppendLogLatencyUs = stats::StatsManager::registerHisto(
"append_log_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999");
kTransferLeaderLatencyUs = stats::StatsManager::registerHisto(
"transfer_leader_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999");
// Event counters.
kNumRaftVotes = stats::StatsManager::registerStats("num_raft_votes", "rate, sum");
kNumStartElect = stats::StatsManager::registerStats("num_start_elect", "rate, sum");
kNumGrantVotes = stats::StatsManager::registerStats("num_grant_votes", "rate, sum");
kNumSendSnapshot = stats::StatsManager::registerStats("num_send_snapshot", "rate, sum");
}

} // namespace nebula
7 changes: 6 additions & 1 deletion src/kvstore/stats/KVStats.h
Original file line number Diff line number Diff line change
@@ -13,8 +13,13 @@ namespace nebula {

// Declarations of the KV-store metric ids so that other translation units can
// pass them to stats::StatsManager::addValue().

// Latency metrics.
extern stats::CounterId kCommitLogLatencyUs;
extern stats::CounterId kCommitSnapshotLatencyUs;
extern stats::CounterId kAppendWalLatencyUs;
extern stats::CounterId kReplicateLogLatencyUs;
extern stats::CounterId kAppendLogLatencyUs;
extern stats::CounterId kTransferLeaderLatencyUs;
// Event counters.
extern stats::CounterId kNumRaftVotes;
extern stats::CounterId kNumStartElect;
extern stats::CounterId kNumGrantVotes;
extern stats::CounterId kNumSendSnapshot;

// Registers the metrics with stats::StatsManager and assigns the ids declared
// in this header.
void initKVStats();

2 changes: 0 additions & 2 deletions tests/admin/test_configs.py
Original file line number Diff line number Diff line change
@@ -66,7 +66,6 @@ def test_configs(self):
expected_result = [
['GRAPH', 'v', 'int', 'MUTABLE', v],
['GRAPH', 'minloglevel', 'int', 'MUTABLE', 0],
['GRAPH', 'slow_op_threshold_ms', 'int', 'MUTABLE', 100],
['GRAPH', 'heartbeat_interval_secs', 'int', 'MUTABLE', 1],
['GRAPH', 'meta_client_retry_times', 'int', 'MUTABLE', 3],
['GRAPH', 'accept_partial_success', 'bool', 'MUTABLE', False],
@@ -86,7 +85,6 @@ def test_configs(self):
['STORAGE', 'wal_ttl', 'int', 'MUTABLE', 14400],
['STORAGE', 'minloglevel', 'int', 'MUTABLE', 0],
['STORAGE', 'custom_filter_interval_secs', 'int', 'MUTABLE', 86400],
['STORAGE', 'slow_op_threshold_ms', 'int', 'MUTABLE', 100],
['STORAGE', 'heartbeat_interval_secs', 'int', 'MUTABLE', 1],
['STORAGE', 'meta_client_retry_times', 'int', 'MUTABLE', 3],
['STORAGE', 'rocksdb_db_options', 'map', 'MUTABLE', {}],

0 comments on commit d276a84

Please sign in to comment.