Skip to content

Commit

Permalink
Fix metrics when unips is enabled under non-disagg
Browse files Browse the repository at this point in the history
Signed-off-by: JaySon-Huang <[email protected]>

Add flow by different types

Signed-off-by: JaySon-Huang <[email protected]>
  • Loading branch information
JaySon-Huang committed Apr 7, 2023
1 parent 2bc5605 commit 5efe631
Show file tree
Hide file tree
Showing 11 changed files with 354 additions and 56 deletions.
87 changes: 66 additions & 21 deletions dbms/src/Interpreters/AsynchronousMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
#include <Common/Allocator.h>
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>
#include <Common/TiFlashMetrics.h>
#include <Common/setThreadName.h>
#include <Common/typeid_cast.h>
#include <Core/TiFlashDisaggregatedMode.h>
#include <Databases/IDatabase.h>
#include <IO/UncompressedCache.h>
#include <Interpreters/AsynchronousMetrics.h>
Expand Down Expand Up @@ -125,27 +127,62 @@ static void calculateMaxAndSum(Max & max, Sum & sum, T x)

FileUsageStatistics AsynchronousMetrics::getPageStorageFileUsage()
{
RUNTIME_ASSERT(!(context.getSharedContextDisagg()->isDisaggregatedComputeMode() && context.getSharedContextDisagg()->use_autoscaler));
// Get from RegionPersister
auto & tmt = context.getTMTContext();
auto & kvstore = tmt.getKVStore();
FileUsageStatistics usage = kvstore->getFileUsageStatistics();

// Get the blob file status from all PS V3 instances
if (auto global_storage_pool = context.getGlobalStoragePool(); global_storage_pool != nullptr)
FileUsageStatistics usage;
switch (context.getSharedContextDisagg()->disaggregated_mode)
{
const auto log_usage = global_storage_pool->log_storage->getFileUsageStatistics();
const auto meta_usage = global_storage_pool->meta_storage->getFileUsageStatistics();
const auto data_usage = global_storage_pool->data_storage->getFileUsageStatistics();
case DisaggregatedMode::None:
{
if (auto uni_ps = context.tryGetWriteNodePageStorage(); uni_ps != nullptr)
{
/// When format_version=5 is enabled, then all data are stored in the `uni_ps`
usage.merge(uni_ps->getFileUsageStatistics());
}
else
{
/// When format_version < 5, then there are multiple PageStorage instances

usage.merge(log_usage)
.merge(meta_usage)
.merge(data_usage);
}
// Get from RegionPersister
auto & tmt = context.getTMTContext();
auto & kvstore = tmt.getKVStore();
usage = kvstore->getFileUsageStatistics();

// Get the blob file status from all PS V3 instances
if (auto global_storage_pool = context.getGlobalStoragePool(); global_storage_pool != nullptr)
{
const auto log_usage = global_storage_pool->log_storage->getFileUsageStatistics();
const auto meta_usage = global_storage_pool->meta_storage->getFileUsageStatistics();
const auto data_usage = global_storage_pool->data_storage->getFileUsageStatistics();

if (auto ps_cache = context.getSharedContextDisagg()->rn_page_cache_storage; ps_cache != nullptr)
usage.merge(log_usage)
.merge(meta_usage)
.merge(data_usage);
}
}
break;
}
case DisaggregatedMode::Storage:
{
// disagg write node, all data are stored in the `uni_ps`
if (auto uni_ps = context.getWriteNodePageStorage(); uni_ps != nullptr)
{
usage.merge(uni_ps->getFileUsageStatistics());
}
break;
}
case DisaggregatedMode::Compute:
{
usage.merge(ps_cache->getUniversalPageStorage()->getFileUsageStatistics());
// disagg compute node without auto-scaler, the proxy data are stored in the `uni_ps`
if (auto uni_ps = context.getWriteNodePageStorage(); uni_ps != nullptr)
{
usage.merge(uni_ps->getFileUsageStatistics());
}
// disagg compute node, all cache page data are stored in the `ps_cache`
if (auto ps_cache = context.getSharedContextDisagg()->rn_page_cache_storage; ps_cache != nullptr)
{
usage.merge(ps_cache->getUniversalPageStorage()->getFileUsageStatistics());
}
break;
}
}

return usage;
Expand Down Expand Up @@ -206,7 +243,6 @@ void AsynchronousMetrics::update()
set("MaxDTBackgroundTasksLength", max_dt_background_tasks_length);
}

if (!(context.getSharedContextDisagg()->isDisaggregatedComputeMode() && context.getSharedContextDisagg()->use_autoscaler))
{
const FileUsageStatistics usage = getPageStorageFileUsage();
set("BlobFileNums", usage.total_file_num);
Expand All @@ -217,6 +253,15 @@ void AsynchronousMetrics::update()
set("PagesInMem", usage.num_pages);
}

if (context.getSharedContextDisagg()->isDisaggregatedStorageMode())
{
auto & tmt = context.getTMTContext();
if (auto s3_gc_owner = tmt.getS3GCOwnerManager(); s3_gc_owner->isOwner())
{
GET_METRIC(tiflash_storage_s3_gc_status, type_owner).Set(1.0);
}
}

#if USE_MIMALLOC
#define MI_STATS_SET(X) set("mimalloc." #X, X)

Expand Down Expand Up @@ -256,7 +301,7 @@ void AsynchronousMetrics::update()
M("background_thread.num_runs", uint64_t) \
M("background_thread.run_interval", uint64_t)

#define GET_METRIC(NAME, TYPE) \
#define GET_JEMALLOC_METRIC(NAME, TYPE) \
do \
{ \
TYPE value{}; \
Expand All @@ -265,9 +310,9 @@ void AsynchronousMetrics::update()
set("jemalloc." NAME, value); \
} while (0);

FOR_EACH_METRIC(GET_METRIC);
FOR_EACH_METRIC(GET_JEMALLOC_METRIC);

#undef GET_METRIC
#undef GET_JEMALLOC_METRIC
#undef FOR_EACH_METRIC
}
#endif
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Storages/Page/PageConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@ static constexpr UInt64 GB = MB * 1024;

// Tag for the different kinds of storage.
// The numeric values are used as indices into per-type counter arrays and are
// iterated as `0 .. _MAX_STORAGE_TYPE_ - 1`, so they must stay contiguous and
// start from 0.
// NOTE(review): the explicit values suggest they may also be persisted or
// exchanged with other components -- confirm before renumbering.
enum class StorageType
{
Unknown = 0,
Log = 1,
Data = 2,
Meta = 3,
KVStore = 4,
RaftEngine = 5,
KVEngine = 6,

// Sentinel, not a real storage type. Keep it as the last enumerator: its value
// is used as the number of storage types when sizing arrays and bounding loops.
_MAX_STORAGE_TYPE_, // NOLINT(bugprone-reserved-identifier)
};

enum class PageStorageRunMode : UInt8
Expand Down
6 changes: 0 additions & 6 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPDataFileStat.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,6 @@
namespace DB::PS::V3
{

struct CPDataWriteStats
{
bool has_new_data = false;
size_t incremental_data_bytes = 0;
size_t compact_data_bytes = 0;
};

using RemoteFileValidSizes = std::unordered_map<String, size_t>;

Expand Down
78 changes: 78 additions & 0 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPDumpStat.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/TiFlashMetrics.h>
#include <Storages/Page/PageConstants.h>
#include <Storages/Page/V3/CheckpointFile/CPDumpStat.h>
#include <fmt/core.h>

namespace DB::PS::V3
{

void SetMetrics(const CPDataDumpStats & stats)
{
for (size_t i = 0; i < static_cast<size_t>(DB::StorageType::_MAX_STORAGE_TYPE_); ++i)
{
auto type = static_cast<DB::StorageType>(i);
switch (type)
{
case DB::StorageType::Unknown:
{
GET_METRIC(tiflash_storage_checkpoint_keys_by_types, type_unknown).Increment(stats.num_keys[i]);
GET_METRIC(tiflash_storage_checkpoint_flow_by_types, type_unknown).Increment(stats.num_bytes[i]);
break;
}
case DB::StorageType::RaftEngine:
{
GET_METRIC(tiflash_storage_checkpoint_keys_by_types, type_raftengine).Increment(stats.num_keys[i]);
GET_METRIC(tiflash_storage_checkpoint_flow_by_types, type_raftengine).Increment(stats.num_bytes[i]);
break;
}
case DB::StorageType::KVEngine:
{
GET_METRIC(tiflash_storage_checkpoint_keys_by_types, type_kvengine).Increment(stats.num_keys[i]);
GET_METRIC(tiflash_storage_checkpoint_flow_by_types, type_kvengine).Increment(stats.num_bytes[i]);
break;
}
case DB::StorageType::KVStore:
{
GET_METRIC(tiflash_storage_checkpoint_keys_by_types, type_kvstore).Increment(stats.num_keys[i]);
GET_METRIC(tiflash_storage_checkpoint_flow_by_types, type_kvstore).Increment(stats.num_bytes[i]);
break;
}
case DB::StorageType::Data:
{
GET_METRIC(tiflash_storage_checkpoint_keys_by_types, type_data).Increment(stats.num_keys[i]);
GET_METRIC(tiflash_storage_checkpoint_flow_by_types, type_data).Increment(stats.num_bytes[i]);
break;
}
case DB::StorageType::Log:
{
GET_METRIC(tiflash_storage_checkpoint_keys_by_types, type_log).Increment(stats.num_keys[i]);
GET_METRIC(tiflash_storage_checkpoint_flow_by_types, type_log).Increment(stats.num_bytes[i]);
break;
}
case DB::StorageType::Meta:
{
GET_METRIC(tiflash_storage_checkpoint_keys_by_types, type_meta).Increment(stats.num_keys[i]);
GET_METRIC(tiflash_storage_checkpoint_flow_by_types, type_meta).Increment(stats.num_bytes[i]);
break;
}
default:
__builtin_unreachable();
}
}
}

} // namespace DB::PS::V3
102 changes: 102 additions & 0 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPDumpStat.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Storages/Page/PageConstants.h>
#include <fmt/format.h>

#include <magic_enum.hpp>

namespace DB::PS::V3
{

/// Counters describing the outcome of one checkpoint data dump.
struct CPDataDumpStats
{
    bool has_new_data{false};

    size_t incremental_data_bytes{0};
    size_t compact_data_bytes{0};

    // Per-storage-type counters; the slot index is the numeric value of `StorageType`.
    std::array<size_t, static_cast<size_t>(StorageType::_MAX_STORAGE_TYPE_)> num_keys{};
    std::array<size_t, static_cast<size_t>(StorageType::_MAX_STORAGE_TYPE_)> num_bytes{};

    // How many records this checkpoint contains in total
    size_t num_records{0};
    // Pages that were already uploaded to S3 and are left
    // untouched by this checkpoint
    size_t num_pages_unchanged{0};
    // Pages that were already uploaded to S3 but got picked
    // by compaction in this checkpoint
    size_t num_pages_compact{0};
    // Pages that are new since the last checkpoint
    size_t num_pages_incremental{0};
    // ExternalPages
    size_t num_ext_pages{0};
    // RefPages
    size_t num_ref_pages{0};
    // Delete records
    size_t num_delete_records{0};
    // Records that are neither Pages nor ExternalPages
    size_t num_other_records{0};
};

/// Adds the per-storage-type `num_keys` / `num_bytes` of `stats` to the
/// `tiflash_storage_checkpoint_keys_by_types` and
/// `tiflash_storage_checkpoint_flow_by_types` metrics.
void SetMetrics(const CPDataDumpStats & stats);

} // namespace DB::PS::V3

/// `fmt` formatter that renders a `CPDataDumpStats` as a single line:
///   CPDataDumpStats{incremental_data_bytes=.. compact_data_bytes=..
///     n_records{total=.. pages_unchanged=.. ... other=..}
///     types[{type=.. keys=.. bytes=..} ...]}
/// `has_new_data` is not part of the output.
template <>
struct fmt::formatter<DB::PS::V3::CPDataDumpStats>
{
    /// No format specification is interpreted; the parse position is returned unchanged.
    static constexpr auto parse(format_parse_context & ctx) { return ctx.begin(); }

    /// Writes the textual form of `value` to `ctx.out()` and returns the
    /// iterator past the last written character.
    template <typename FormatContext>
    auto format(const DB::PS::V3::CPDataDumpStats & value, FormatContext & ctx) const -> decltype(ctx.out())
    {
        // `fmt::format_to` is called with explicit qualification on purpose:
        // some arguments are `std` types (e.g. the `std::string_view` returned by
        // `magic_enum::enum_name`), so an unqualified call would also consider
        // `std::format_to` through ADL and become ambiguous once `<format>` is visible.
        auto it = fmt::format_to(
            ctx.out(),
            "CPDataDumpStats{{"
            "incremental_data_bytes={} compact_data_bytes={}"
            " n_records{{total={}"
            " pages_unchanged={} pages_compact={} pages_incremental={} ext_pages={} ref_pages={}"
            " delete={} other={}}}",
            value.incremental_data_bytes,
            value.compact_data_bytes,
            value.num_records,
            value.num_pages_unchanged,
            value.num_pages_compact,
            value.num_pages_incremental,
            value.num_ext_pages,
            value.num_ref_pages,
            value.num_delete_records,
            value.num_other_records);

        // One `{type=.. keys=.. bytes=..}` entry per storage type, separated by a space.
        it = fmt::format_to(it, " types[");
        constexpr auto num_storage_types = static_cast<size_t>(DB::StorageType::_MAX_STORAGE_TYPE_);
        for (size_t i = 0; i < num_storage_types; ++i)
        {
            if (i != 0)
                it = fmt::format_to(it, " ");
            it = fmt::format_to(
                it,
                "{{type={} keys={} bytes={}}}",
                magic_enum::enum_name(static_cast<DB::StorageType>(i)),
                value.num_keys[i],
                value.num_bytes[i]);
        }
        return fmt::format_to(
            it,
            "]" // end of "types"
            "}}" // end of "CPDataDumpStats"
        );
    }
};
Loading

0 comments on commit 5efe631

Please sign in to comment.