Skip to content

Commit

Permalink
Enhance metrics for KVStore write (pingcap#8472)
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinNeo committed Dec 21, 2023
1 parent 546316f commit ddfcc12
Show file tree
Hide file tree
Showing 19 changed files with 839 additions and 63 deletions.
1 change: 1 addition & 0 deletions dbms/src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
M(OpenFileForReadWrite) \
M(MemoryTracking) \
M(MemoryTrackingInBackgroundProcessingPool) \
M(MemoryTrackingKVStore) \
M(LogicalCPUCores) \
M(MemoryCapacity) \
M(PSMVCCNumSnapshots) \
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Common/MemoryTracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace CurrentMetrics
extern const Metric MemoryTrackingQueryStorageTask;
extern const Metric MemoryTrackingFetchPages;
extern const Metric MemoryTrackingSharedColumnData;
extern const Metric MemoryTrackingKVStore;
} // namespace CurrentMetrics

std::atomic<Int64> real_rss{0}, proc_num_threads{1}, baseline_of_query_mem_tracker{0};
Expand Down Expand Up @@ -76,13 +77,17 @@ static String storageMemoryUsageDetail()
{
return fmt::format(
"non-query: peak={}, amount={}; "
"kvstore: peak={}, amount={}; "
"query-storage-task: peak={}, amount={}; "
"fetch-pages: peak={}, amount={}; "
"shared-column-data: peak={}, amount={}.",
root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->getPeak())
: "0",
root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->get())
: "0",
root_of_kvstore_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_kvstore_mem_trackers->getPeak())
: "0",
root_of_kvstore_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_kvstore_mem_trackers->get()) : "0",
sub_root_of_query_storage_task_mem_trackers
? formatReadableSizeWithBinarySuffix(sub_root_of_query_storage_task_mem_trackers->getPeak())
: "0",
Expand Down Expand Up @@ -293,6 +298,7 @@ thread_local MemoryTracker * current_memory_tracker = nullptr;

std::shared_ptr<MemoryTracker> root_of_non_query_mem_trackers = MemoryTracker::createGlobalRoot();
std::shared_ptr<MemoryTracker> root_of_query_mem_trackers = MemoryTracker::createGlobalRoot();
std::shared_ptr<MemoryTracker> root_of_kvstore_mem_trackers = MemoryTracker::createGlobalRoot();

std::shared_ptr<MemoryTracker> sub_root_of_query_storage_task_mem_trackers;
std::shared_ptr<MemoryTracker> fetch_pages_mem_tracker;
Expand Down Expand Up @@ -323,6 +329,8 @@ void initStorageMemoryTracker(Int64 limit, Int64 larger_than_limit)
shared_column_data_mem_tracker
= MemoryTracker::create(0, sub_root_of_query_storage_task_mem_trackers.get(), log_in_destructor);
shared_column_data_mem_tracker->setAmountMetric(CurrentMetrics::MemoryTrackingSharedColumnData);

root_of_kvstore_mem_trackers->setAmountMetric(CurrentMetrics::MemoryTrackingKVStore);
}

namespace CurrentMemoryTracker
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Common/MemoryTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ extern thread_local MemoryTracker * current_memory_tracker;

extern std::shared_ptr<MemoryTracker> root_of_non_query_mem_trackers;
extern std::shared_ptr<MemoryTracker> root_of_query_mem_trackers;
extern std::shared_ptr<MemoryTracker> root_of_kvstore_mem_trackers;

// Initialize in `initStorageMemoryTracker`.
// If a memory tracker of storage tasks is driven by query, it should inherit `sub_root_of_query_storage_task_mem_trackers`.
Expand Down
90 changes: 88 additions & 2 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Common/TiFlashBuildInfo.h>
#include <Common/nocopyable.h>
#include <common/types.h>
#include <Common/Exception.h>
#include <prometheus/counter.h>
#include <prometheus/exposer.h>
#include <prometheus/gateway.h>
Expand All @@ -35,6 +36,9 @@

namespace DB
{
constexpr size_t RAFT_REGION_BIG_WRITE_THRES = 2 * 1024;
constexpr size_t RAFT_REGION_BIG_WRITE_MAX = 4 * 1024 * 1024; // raft-entry-max-size = 8MiB
static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Invalid RAFT_REGION_BIG_WRITE_THRES");
/// Central place to define metrics across all subsystems.
/// Refer to gtest_tiflash_metrics.cpp for more sample defines.
/// Usage:
Expand Down Expand Up @@ -445,12 +449,13 @@ namespace DB
M(tiflash_raft_raft_frequent_events_count, \
"Raft frequent event counter", \
Counter, \
F(type_write_commit, {{"type", "write_commit"}}), \
F(type_write, {{"type", "write"}})) \
M(tiflash_raft_region_flush_bytes, \
"Bucketed histogram of region flushed bytes", \
Histogram, \
F(type_flushed, {{"type", "flushed"}}, ExpBuckets{32, 2, 21}), \
F(type_unflushed, {{"type", "unflushed"}}, ExpBuckets{32, 2, 21})) \
F(type_flushed, {{"type", "flushed"}}, ExpBucketsWithRange{32, 4, 32 * 1024 * 1024}), \
F(type_unflushed, {{"type", "unflushed"}}, ExpBucketsWithRange{32, 4, 32 * 1024 * 1024})) \
M(tiflash_raft_entry_size, \
"Bucketed histogram entry size", \
Histogram, \
Expand All @@ -461,6 +466,20 @@ namespace DB
F(type_raft_snapshot, {{"type", "raft_snapshot"}}), \
F(type_dt_on_disk, {{"type", "dt_on_disk"}}), \
F(type_dt_total, {{"type", "dt_total"}})) \
M(tiflash_raft_throughput_bytes, \
"Raft handled bytes in global", \
Counter, \
F(type_write, {{"type", "write"}}), \
F(type_write_committed, {{"type", "write_committed"}})) \
M(tiflash_raft_write_flow_bytes, \
"Bucketed histogram of bytes for each write", \
Histogram, \
F(type_ingest_uncommitted, {{"type", "ingest_uncommitted"}}, ExpBucketsWithRange{16, 4, 64 * 1024}), \
F(type_snapshot_uncommitted, {{"type", "snapshot_uncommitted"}}, ExpBucketsWithRange{16, 4, 1024 * 1024}), \
F(type_write_committed, {{"type", "write_committed"}}, ExpBucketsWithRange{16, 2, 1024 * 1024}), \
F(type_big_write_to_region, \
{{"type", "big_write_to_region"}}, \
ExpBucketsWithRange{RAFT_REGION_BIG_WRITE_THRES, 4, RAFT_REGION_BIG_WRITE_MAX})) \
M(tiflash_raft_snapshot_total_bytes, \
"Bucketed snapshot total size", \
Histogram, \
Expand Down Expand Up @@ -752,6 +771,25 @@ struct ExpBuckets
const double base;
const size_t size;

constexpr ExpBuckets(const double start_, const double base_, const size_t size_)
: start(start_)
, base(base_)
, size(size_)
{
#ifndef NDEBUG
// Checks under debug mode
// Check the base
RUNTIME_CHECK_MSG(base > 1.0, "incorrect base for ExpBuckets, start={} base={} size={}", start, base, size);
// Too many buckets will bring more network flow by transferring metrics
RUNTIME_CHECK_MSG(
size <= 50,
"too many metrics buckets, reconsider step/unit, start={} base={} size={}",
start,
base,
size);
#endif
}

// NOLINTNEXTLINE(google-explicit-constructor)
inline operator prometheus::Histogram::BucketBoundaries() const &&
{
Expand All @@ -765,6 +803,54 @@ struct ExpBuckets
}
};

/// Buckets with boundaries [start * base^0, start * base^1, ..., start * base^x]
/// such x that start * base^(x-1) < end, and start * base^x >= end.
struct ExpBucketsWithRange
{
static size_t getSize(double l, double r, double b)
{
return static_cast<size_t>(::ceil(::log(r / l) / ::log(b))) + 1;
}

ExpBucketsWithRange(double start_, double base_, double end_)
: start(start_)
, base(base_)
, size(ExpBucketsWithRange::getSize(start_, end_, base_))
{
#ifndef NDEBUG
// Check the base
RUNTIME_CHECK_MSG(
base > 1.0,
"incorrect base for ExpBucketsWithRange, start={} base={} end={}",
start,
base,
end_);
RUNTIME_CHECK_MSG(
start_ < end_,
"incorrect start/end for ExpBucketsWithRange, start={} base={} end={}",
start,
base,
end_);
#endif
}
// NOLINTNEXTLINE(google-explicit-constructor)
inline operator prometheus::Histogram::BucketBoundaries() const &&
{
prometheus::Histogram::BucketBoundaries buckets(size);
double current = start;
std::for_each(buckets.begin(), buckets.end(), [&](auto & e) {
e = current;
current *= base;
});
return buckets;
}

private:
const double start;
const double base;
const size_t size;
};

// Buckets with same width
struct EqualWidthBuckets
{
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Storages/KVStore/Decode/DecodedTiKVKeyValue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Storages/KVStore/Decode/DecodedTiKVKeyValue.h>
#include <Storages/KVStore/TiKVHelpers/TiKVKeyspaceIDImpl.h>
#include <Storages/KVStore/TiKVHelpers/TiKVRecordFormat.h>

namespace DB
{
Expand All @@ -37,4 +38,10 @@ std::string DecodedTiKVKey::makeKeyspacePrefix(KeyspaceID keyspace_id)
{
return TiKVKeyspaceID::makeKeyspacePrefix(keyspace_id);
}

HandleID RawTiDBPK::getHandleID() const
{
const auto & pk = *this;
return RecordKVFormat::decodeInt64(RecordKVFormat::read<UInt64>(pk->data()));
}
} // namespace DB
11 changes: 9 additions & 2 deletions dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,9 +388,16 @@ DM::WriteResult RegionTable::writeBlockByRegion(

reportUpstreamLatency(*data_list_read);
auto write_result = writeRegionDataToStorage(context, region, *data_list_read, log);

auto prev_region_size = region->dataSize();
RemoveRegionCommitCache(region, *data_list_read, lock_region);

auto new_region_size = region->dataSize();
if likely (new_region_size <= prev_region_size)
{
auto committed_bytes = prev_region_size - new_region_size;
GET_METRIC(tiflash_raft_write_flow_bytes, type_write_committed).Observe(committed_bytes);
GET_METRIC(tiflash_raft_throughput_bytes, type_write_committed).Increment(committed_bytes);
GET_METRIC(tiflash_raft_raft_frequent_events_count, type_write_commit).Increment(1);
}
/// Save removed data to outer.
data_list_to_remove = std::move(*data_list_read);
return write_result;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/KVStore/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,7 @@ void KVStore::StoreMeta::update(Base && base_)

KVStore::~KVStore()
{
LOG_INFO(log, "Destroy KVStore");
releaseReadIndexWorkers();
}

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ void KVStore::onSnapshot(
manage_lock.index.add(new_region);
}

GET_METRIC(tiflash_raft_write_flow_bytes, type_snapshot_uncommitted).Observe(new_region->dataSize());
persistRegion(*new_region, &region_lock, PersistRegionReason::ApplySnapshotCurRegion, "");

tmt.getRegionTable().shrinkRegionRange(*new_region);
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/KVStore/MultiRaft/IngestSST.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ void Region::finishIngestSSTByDTFile(RegionPtr && temp_region, UInt64 index, UIn
{
std::unique_lock<std::shared_mutex> lock(mutex);

auto uncommitted_ingest = temp_region->dataSize();
GET_METRIC(tiflash_raft_write_flow_bytes, type_ingest_uncommitted).Observe(uncommitted_ingest);
if (temp_region)
{
// Merge the uncommitted data from `temp_region`.
Expand Down
12 changes: 10 additions & 2 deletions dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,10 @@ std::pair<EngineStoreApplyRes, DM::WriteResult> Region::handleWriteRaftCmd(

size_t put_key_count = 0;
size_t del_key_count = 0;
// How many bytes has been written to KVStore(and maybe then been moved to underlying DeltaTree).
// We don't count DEL because it is only used to delete LOCK, which is small and not count in doInsert.
size_t write_size = 0;
size_t prev_size = dataSize();

SCOPE_EXIT({
GET_METRIC(tiflash_raft_apply_write_command_duration_seconds, type_write).Observe(watch.elapsedSeconds());
Expand All @@ -322,6 +326,10 @@ std::pair<EngineStoreApplyRes, DM::WriteResult> Region::handleWriteRaftCmd(
GET_METRIC(tiflash_raft_raft_frequent_events_count, type_write).Increment(1);
GET_METRIC(tiflash_raft_process_keys, type_write_put).Increment(put_key_count);
GET_METRIC(tiflash_raft_process_keys, type_write_del).Increment(del_key_count);
auto after_size = dataSize();
if (after_size > prev_size + RAFT_REGION_BIG_WRITE_THRES)
GET_METRIC(tiflash_raft_write_flow_bytes, type_big_write_to_region).Observe(after_size - prev_size);
GET_METRIC(tiflash_raft_throughput_bytes, type_write).Increment(write_size);
});

if (cmds.len)
Expand Down Expand Up @@ -349,11 +357,11 @@ std::pair<EngineStoreApplyRes, DM::WriteResult> Region::handleWriteRaftCmd(
if (is_v2)
{
// There may be orphan default key in a snapshot.
doInsert(cf, std::move(tikv_key), std::move(tikv_value), DupCheck::AllowSame);
write_size += doInsert(cf, std::move(tikv_key), std::move(tikv_value), DupCheck::AllowSame);
}
else
{
doInsert(cf, std::move(tikv_key), std::move(tikv_value), DupCheck::Deny);
write_size += doInsert(cf, std::move(tikv_key), std::move(tikv_value), DupCheck::Deny);
}
}
catch (Exception & e)
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ RegionDataRes RegionCFDataBase<Trait>::insert(TiKVKey && key, TiKVValue && value
{
const auto & raw_key = RecordKVFormat::decodeTiKVKey(key);
auto kv_pair = Trait::genKVPair(std::move(key), raw_key, std::move(value));

if (!kv_pair)
return 0;

Expand All @@ -58,6 +57,7 @@ RegionDataRes RegionCFDataBase<RegionLockCFDataTrait>::insert(TiKVKey && key, Ti
Pair kv_pair = RegionLockCFDataTrait::genKVPair(std::move(key), std::move(value));
// according to the process of pessimistic lock, just overwrite.
data.insert_or_assign(std::move(kv_pair.first), std::move(kv_pair.second));
// lock cf is not count into the size of RegionData, always return 0
return 0;
}

Expand Down Expand Up @@ -94,6 +94,8 @@ RegionDataRes RegionCFDataBase<Trait>::insert(std::pair<Key, Value> && kv_pair,
+ " new_val: " + prev_value.toDebugString(),
ErrorCodes::LOGICAL_ERROR);
}
// duplicated key is ignored
return 0;
}
else
{
Expand Down Expand Up @@ -122,7 +124,9 @@ size_t RegionCFDataBase<Trait>::calcTiKVKeyValueSize(const TiKVKey & key, const
return 0;
}
else
{
return key.dataSize() + value.dataSize();
}
}


Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ struct RegionCFDataBase

size_t getSize() const;

RegionCFDataBase() {}
RegionCFDataBase() = default;
RegionCFDataBase(RegionCFDataBase && region);
RegionCFDataBase & operator=(RegionCFDataBase && region);

Expand Down
Loading

0 comments on commit ddfcc12

Please sign in to comment.