Skip to content

Commit

Permalink
add more metrics, rebased again
Browse files Browse the repository at this point in the history
  • Loading branch information
critical27 committed Mar 3, 2022
1 parent 99e54ce commit d82f0f9
Show file tree
Hide file tree
Showing 12 changed files with 53 additions and 119 deletions.
1 change: 0 additions & 1 deletion resources/gflags.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
"v",
"heartbeat_interval_secs",
"meta_client_retry_times",
"slow_op_threshold_ms",
"clean_wal_interval_secs",
"wal_ttl",
"clean_wal_interval_secs",
Expand Down
1 change: 0 additions & 1 deletion src/common/base/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ nebula_add_library(
Status.cpp
SanitizerOptions.cpp
SignalHandler.cpp
SlowOpTracker.cpp
${gdb_debug_script}
)

Expand Down
11 changes: 0 additions & 11 deletions src/common/base/SlowOpTracker.cpp

This file was deleted.

40 changes: 0 additions & 40 deletions src/common/base/SlowOpTracker.h

This file was deleted.

7 changes: 0 additions & 7 deletions src/common/base/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,6 @@ nebula_add_test(
LIBRARIES gtest gtest_main
)

nebula_add_test(
NAME slow_op_tracker_test
SOURCES SlowOpTrackerTest.cpp
OBJECTS $<TARGET_OBJECTS:base_obj> $<TARGET_OBJECTS:time_obj>
LIBRARIES gtest gtest_main
)

nebula_add_test(
NAME lru_test
SOURCES ConcurrentLRUCacheTest.cpp
Expand Down
28 changes: 0 additions & 28 deletions src/common/base/test/SlowOpTrackerTest.cpp

This file was deleted.

1 change: 0 additions & 1 deletion src/common/meta/GflagsManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ std::unordered_map<std::string, std::pair<cpp2::ConfigMode, bool>> GflagsManager
{"heartbeat_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"agent_heartbeat_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"meta_client_retry_times", {cpp2::ConfigMode::MUTABLE, false}},
{"slow_op_threshold_ms", {cpp2::ConfigMode::MUTABLE, false}},
{"wal_ttl", {cpp2::ConfigMode::MUTABLE, false}},
{"clean_wal_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"custom_filter_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
Expand Down
9 changes: 8 additions & 1 deletion src/kvstore/raftex/Host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
#include <thrift/lib/cpp/util/EnumUtils.h>

#include "common/network/NetworkUtils.h"
#include "common/stats/StatsManager.h"
#include "common/time/WallClock.h"
#include "kvstore/raftex/RaftPart.h"
#include "kvstore/stats/KVStats.h"
#include "kvstore/wal/FileBasedWal.h"

DEFINE_uint32(max_appendlog_batch_size,
Expand Down Expand Up @@ -158,9 +161,12 @@ void Host::setResponse(const cpp2::AppendLogResponse& r) {

void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptr<cpp2::AppendLogRequest> req) {
using TransportException = apache::thrift::transport::TTransportException;
auto beforeRpcUs = time::WallClock::fastNowInMicroSec();
sendAppendLogRequest(eb, req)
.via(eb)
.thenValue([eb, self = shared_from_this()](cpp2::AppendLogResponse&& resp) {
.thenValue([eb, beforeRpcUs, self = shared_from_this()](cpp2::AppendLogResponse&& resp) {
stats::StatsManager::addValue(kAppendLogLatencyUs,
time::WallClock::fastNowInMicroSec() - beforeRpcUs);
VLOG_IF(1, FLAGS_trace_raft)
<< self->idStr_ << "AppendLogResponse "
<< "code " << apache::thrift::util::enumNameSafe(resp.get_error_code()) << ", currTerm "
Expand Down Expand Up @@ -333,6 +339,7 @@ nebula::cpp2::ErrorCode Host::startSendSnapshot() {
<< ", firstLogId in wal = " << part_->wal()->firstLogId()
<< ", lastLogId in wal = " << part_->wal()->lastLogId();
sendingSnapshot_ = true;
stats::StatsManager::addValue(kNumSendSnapshot);
part_->snapshot_->sendSnapshot(part_, addr_)
.thenValue([self = shared_from_this()](auto&& status) {
std::lock_guard<std::mutex> g(self->lock_);
Expand Down
48 changes: 24 additions & 24 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@

#include "common/base/Base.h"
#include "common/base/CollectNSucceeded.h"
#include "common/base/SlowOpTracker.h"
#include "common/network/NetworkUtils.h"
#include "common/stats/StatsManager.h"
#include "common/thread/NamedThread.h"
#include "common/thrift/ThriftClientManager.h"
#include "common/time/ScopedTimer.h"
#include "common/time/WallClock.h"
#include "common/utils/LogStrListIterator.h"
#include "interface/gen-cpp2/RaftexServiceAsyncClient.h"
Expand Down Expand Up @@ -741,18 +741,18 @@ void RaftPart::appendLogsInternal(AppendLogsIterator iter, TermID termId) {
prevLogTerm = lastLogTerm_;
committed = committedLogId_;
// Step 1: Write WAL
SlowOpTracker tracker;
if (!wal_->appendLogs(iter)) {
VLOG_EVERY_N(2, 1000) << idStr_ << "Failed to write into WAL";
res = nebula::cpp2::ErrorCode::E_RAFT_WAL_FAIL;
lastLogId_ = wal_->lastLogId();
lastLogTerm_ = wal_->lastLogTerm();
break;
{
SCOPED_TIMER(&execTime_);
if (!wal_->appendLogs(iter)) {
VLOG_EVERY_N(2, 1000) << idStr_ << "Failed to write into WAL";
res = nebula::cpp2::ErrorCode::E_RAFT_WAL_FAIL;
lastLogId_ = wal_->lastLogId();
lastLogTerm_ = wal_->lastLogTerm();
break;
}
}
stats::StatsManager::addValue(kAppendWalLatencyUs, execTime_);
lastId = wal_->lastLogId();
if (tracker.slow()) {
tracker.output(idStr_, folly::stringPrintf("Write WAL, total %ld", lastId - prevLogId + 1));
}
VLOG(4) << idStr_ << "Succeeded writing logs [" << iter.firstLogId() << ", " << lastId
<< "] to WAL";
} while (false);
Expand Down Expand Up @@ -797,7 +797,7 @@ void RaftPart::replicateLogs(folly::EventBase* eb,
<< iter.firstLogId() << ", " << lastLogId << "] to all peer hosts";

lastMsgSentDur_.reset();
SlowOpTracker tracker;
auto beforeAppendLogUs = time::WallClock::fastNowInMicroSec();
collectNSucceeded(gen::from(hosts) |
gen::map([self = shared_from_this(),
eb,
Expand Down Expand Up @@ -830,13 +830,11 @@ void RaftPart::replicateLogs(folly::EventBase* eb,
prevLogId,
prevLogTerm,
pHosts = std::move(hosts),
tracker](folly::Try<AppendLogResponses>&& result) mutable {
beforeAppendLogUs](folly::Try<AppendLogResponses>&& result) mutable {
VLOG(4) << self->idStr_ << "Received enough response";
CHECK(!result.hasException());
if (tracker.slow()) {
tracker.output(self->idStr_,
folly::stringPrintf("Total send logs: %ld", lastLogId - prevLogId + 1));
}
stats::StatsManager::addValue(kReplicateLogLatencyUs,
time::WallClock::fastNowInMicroSec() - beforeAppendLogUs);
self->processAppendLogResponses(*result,
eb,
std::move(it),
Expand Down Expand Up @@ -918,7 +916,6 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps,

{
auto walIt = wal_->iterator(committedId + 1, lastLogId);
SlowOpTracker tracker;
// Step 3: Commit the batch
auto [code, lastCommitId, lastCommitTerm] = commitLogs(std::move(walIt), true);
if (code == nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand All @@ -938,10 +935,6 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps,
} else {
LOG(FATAL) << idStr_ << "Failed to commit logs";
}
if (tracker.slow()) {
tracker.output(idStr_,
folly::stringPrintf("Total commit: %ld", committedLogId_ - committedId));
}
VLOG(4) << idStr_ << "Leader succeeded in committing the logs " << committedId + 1 << " to "
<< lastLogId;
}
Expand Down Expand Up @@ -1190,6 +1183,7 @@ folly::Future<bool> RaftPart::leaderElection(bool isPreVote) {

auto proposedTerm = voteReq.get_term();
auto resps = ElectionResponses();
stats::StatsManager::addValue(kNumStartElect);
if (hosts.empty()) {
auto ret = handleElectionResponses(resps, hosts, proposedTerm, isPreVote);
inElection_ = false;
Expand Down Expand Up @@ -1467,7 +1461,7 @@ void RaftPart::processAskForVoteRequest(const cpp2::AskForVoteRequest& req,
// Reset the last message time
lastMsgRecvDur_.reset();
isBlindFollower_ = false;
stats::StatsManager::addValue(kNumRaftVotes);
stats::StatsManager::addValue(kNumGrantVotes);
return;
}

Expand Down Expand Up @@ -1590,7 +1584,13 @@ void RaftPart::processAppendLogRequest(const cpp2::AppendLogRequest& req,
std::make_move_iterator(req.get_log_str_list().begin() + diffIndex),
std::make_move_iterator(req.get_log_str_list().end()));
RaftLogIterator logIter(firstId, std::move(logEntries));
if (wal_->appendLogs(logIter)) {
bool result = false;
{
SCOPED_TIMER(&execTime_);
result = wal_->appendLogs(logIter);
}
stats::StatsManager::addValue(kAppendWalLatencyUs, execTime_);
if (result) {
CHECK_EQ(lastId, wal_->lastLogId());
lastLogId_ = wal_->lastLogId();
lastLogTerm_ = wal_->lastLogTerm();
Expand Down
17 changes: 15 additions & 2 deletions src/kvstore/stats/KVStats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,30 @@ namespace nebula {

stats::CounterId kCommitLogLatencyUs;
stats::CounterId kCommitSnapshotLatencyUs;
stats::CounterId kAppendWalLatencyUs;
stats::CounterId kReplicateLogLatencyUs;
stats::CounterId kAppendLogLatencyUs;
stats::CounterId kTransferLeaderLatencyUs;
stats::CounterId kNumRaftVotes;
stats::CounterId kNumStartElect;
stats::CounterId kNumGrantVotes;
stats::CounterId kNumSendSnapshot;

void initKVStats() {
kCommitLogLatencyUs = stats::StatsManager::registerHisto(
"commit_log_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999");
kCommitSnapshotLatencyUs = stats::StatsManager::registerHisto(
"commit_snapshot_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999");
kAppendWalLatencyUs = stats::StatsManager::registerHisto(
"append_wal_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999");
kReplicateLogLatencyUs = stats::StatsManager::registerHisto(
"replicate_log_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999");
kAppendLogLatencyUs = stats::StatsManager::registerHisto(
"append_log_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999");
kTransferLeaderLatencyUs = stats::StatsManager::registerHisto(
"transfer_leader_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999");
kNumRaftVotes = stats::StatsManager::registerStats("num_raft_votes", "rate, sum");
kNumStartElect = stats::StatsManager::registerStats("num_start_elect", "rate, sum");
kNumGrantVotes = stats::StatsManager::registerStats("num_grant_votes", "rate, sum");
kNumSendSnapshot = stats::StatsManager::registerStats("num_send_snapshot", "rate, sum");
}

} // namespace nebula
7 changes: 6 additions & 1 deletion src/kvstore/stats/KVStats.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@ namespace nebula {
// Raft related stats
extern stats::CounterId kCommitLogLatencyUs;
extern stats::CounterId kCommitSnapshotLatencyUs;
extern stats::CounterId kAppendWalLatencyUs;
extern stats::CounterId kReplicateLogLatencyUs;
extern stats::CounterId kAppendLogLatencyUs;
extern stats::CounterId kTransferLeaderLatencyUs;
extern stats::CounterId kNumRaftVotes;
extern stats::CounterId kNumStartElect;
extern stats::CounterId kNumGrantVotes;
extern stats::CounterId kNumSendSnapshot;

void initKVStats();

Expand Down
2 changes: 0 additions & 2 deletions tests/admin/test_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ def test_configs(self):
expected_result = [
['GRAPH', 'v', 'int', 'MUTABLE', v],
['GRAPH', 'minloglevel', 'int', 'MUTABLE', 0],
['GRAPH', 'slow_op_threshold_ms', 'int', 'MUTABLE', 100],
['GRAPH', 'heartbeat_interval_secs', 'int', 'MUTABLE', 1],
['GRAPH', 'meta_client_retry_times', 'int', 'MUTABLE', 3],
['GRAPH', 'accept_partial_success', 'bool', 'MUTABLE', False],
Expand All @@ -86,7 +85,6 @@ def test_configs(self):
['STORAGE', 'wal_ttl', 'int', 'MUTABLE', 14400],
['STORAGE', 'minloglevel', 'int', 'MUTABLE', 0],
['STORAGE', 'custom_filter_interval_secs', 'int', 'MUTABLE', 86400],
['STORAGE', 'slow_op_threshold_ms', 'int', 'MUTABLE', 100],
['STORAGE', 'heartbeat_interval_secs', 'int', 'MUTABLE', 1],
['STORAGE', 'meta_client_retry_times', 'int', 'MUTABLE', 3],
['STORAGE', 'rocksdb_db_options', 'map', 'MUTABLE', {}],
Expand Down

0 comments on commit d82f0f9

Please sign in to comment.