Skip to content

Commit

Permalink
Storages: Track memory for PageDirectory (#9134)
Browse files Browse the repository at this point in the history
close #8835

Signed-off-by: CalvinNeo <[email protected]>

Co-authored-by: JaySon-Huang <[email protected]>
  • Loading branch information
CalvinNeo and JaySon-Huang authored Jun 13, 2024
1 parent 2fa02c2 commit c1f8bca
Show file tree
Hide file tree
Showing 14 changed files with 311 additions and 99 deletions.
5 changes: 5 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
"The freshness of tiflash data with tikv data", \
Histogram, \
F(type_syncing_data_freshness, {{"type", "data_freshness"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_memory_usage_by_class, \
"TiFlash memory consumes by class", \
Gauge, \
F(type_uni_page_ids, {"type", "uni_page_ids"}), \
F(type_versioned_entries, {"type", "versioned_entries"})) \
M(tiflash_storage_read_tasks_count, "Total number of storage engine read tasks", Counter) \
M(tiflash_storage_command_count, \
"Total number of storage's command, such as delete range / shutdown /startup", \
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Interpreters/AsynchronousMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <Storages/Page/FileUsage.h>
#include <Storages/Page/PageConstants.h>
#include <Storages/Page/PageStorage.h>
#include <Storages/Page/PageStorageMemorySummary.h>
#include <Storages/Page/V3/Universal/UniversalPageStorageService.h>
#include <Storages/StorageDeltaMerge.h>
#include <common/config_common.h>
Expand Down Expand Up @@ -300,6 +301,7 @@ void AsynchronousMetrics::update()
set("LogNums", usage.total_log_file_num);
set("LogDiskBytes", usage.total_log_disk_size);
set("PagesInMem", usage.num_pages);
set("VersionedEntries", DB::PS::PageStorageMemorySummary::versioned_entry_or_delete_count.load());
}

if (context.getSharedContextDisagg()->isDisaggregatedStorageMode())
Expand Down
11 changes: 11 additions & 0 deletions dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Common/setThreadName.h>
#include <Storages/KVStore/FFI/JointThreadAllocInfo.h>
#include <Storages/KVStore/FFI/ProxyFFI.h>
#include <Storages/Page/V3/PageDirectory.h>

#include <magic_enum.hpp>
#include <mutex>
Expand Down Expand Up @@ -47,6 +48,7 @@ void JointThreadInfoJeallocMap::recordThreadAllocInfo()
{
recordThreadAllocInfoForProxy();
recordThreadAllocInfoForStorage();
recordClassdAlloc();
}

JointThreadInfoJeallocMap::~JointThreadInfoJeallocMap()
Expand Down Expand Up @@ -269,4 +271,13 @@ void JointThreadInfoJeallocMap::accessStorageMap(std::function<void(const AllocM
f(storage_map);
}

void JointThreadInfoJeallocMap::recordClassdAlloc()
{
GET_METRIC(tiflash_memory_usage_by_class, type_uni_page_ids)
.Set(PS::PageStorageMemorySummary::uni_page_id_bytes.load());
GET_METRIC(tiflash_memory_usage_by_class, type_versioned_entries)
.Set(PS::PageStorageMemorySummary::versioned_entry_or_delete_bytes.load());
}


} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Storages/KVStore/FFI/JointThreadAllocInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ class JointThreadInfoJeallocMap
uint64_t value,
char aggregate_delimer);

static void recordClassdAlloc();

private:
mutable std::shared_mutex memory_allocation_mut;
AllocMap proxy_map;
Expand Down
46 changes: 27 additions & 19 deletions dbms/src/Storages/KVStore/FFI/SSTReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class MonoSSTReader : public SSTReader
BaseBuffView keyView() const override;
BaseBuffView valueView() const override;
void next() override;
SSTFormatKind sst_format_kind() const { return kind; };
SSTFormatKind sstFormatKind() const { return kind; };
size_t approxSize() const override;
std::vector<std::string> findSplitKeys(uint64_t splits_count) const override;
void seek(BaseBuffView && view) const override;
Expand Down Expand Up @@ -147,20 +147,27 @@ class MultiSSTReader : public SSTReader
}
size_t getSplitId() const override { return split_id; }

// Switch to next mono reader if current is drained,
// Switch to next mono reader if current SST is drained,
// and we have a next sst file to read.
void maybeNextReader() const
void maybeNextReader()
{
if (!mono->remained())
if (likely(mono->remained()))
return;

sst_idx++;
if (sst_idx < args.size())
{
current++;
if (current < args.size())
{
// We don't drop if mono is the last instance for safety,
// and it will be dropped as MultiSSTReader is dropped.
LOG_INFO(log, "Open sst file {}", buffToStrView(args[current].path));
mono = initer(proxy_helper, args[current], range, split_id);
}
// We don't drop if mono is the last instance for safety,
// and it will be dropped as MultiSSTReader is dropped.
LOG_INFO(
log,
"Open sst file {}, range={} sst_idx={} sst_tot={} split_id={}",
buffToStrView(args[sst_idx].path),
range->toDebugString(),
sst_idx,
args.size(),
split_id);
mono = initer(proxy_helper, args[sst_idx], range, split_id);
}
}

Expand All @@ -177,18 +184,19 @@ class MultiSSTReader : public SSTReader
, type(type_)
, initer(initer_)
, args(args_)
, current(0)
, sst_idx(0)
, range(range_)
, split_id(split_id_)
{
assert(args.size() > 0);
LOG_INFO(
log,
"Open sst file first {} range {} split_id={}",
buffToStrView(args[current].path),
"Open sst file first {}, range={} sst_tot={} split_id={}",
buffToStrView(args[sst_idx].path),
range->toDebugString(),
args.size(),
split_id);
mono = initer(proxy_helper, args[current], range, split_id);
mono = initer(proxy_helper, args[sst_idx], range, split_id);
}

~MultiSSTReader() override
Expand All @@ -202,12 +210,12 @@ class MultiSSTReader : public SSTReader
/// The instance is ill-formed if the size of `args` is zero.
mutable std::unique_ptr<R> mono;
const TiFlashRaftProxyHelper * proxy_helper;
ColumnFamilyType type;
const ColumnFamilyType type;
Initer initer;
std::vector<E> args;
mutable size_t current;
size_t sst_idx;
RegionRangeFilter range;
size_t split_id;
const size_t split_id;
};

} // namespace DB
25 changes: 12 additions & 13 deletions dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ void PreHandlingTrace::waitForSubtaskResources(uint64_t region_id, size_t parall
{
LOG_DEBUG(
log,
"Prehandle resource meet, limit={}, current={}, region_id={}",
"Prehandle resource meet, limit={} current={} region_id={}",
parallel_subtask_limit,
ongoing_prehandle_subtask_count.load(),
region_id);
Expand Down Expand Up @@ -398,8 +398,7 @@ static inline std::pair<std::vector<std::string>, size_t> getSplitKey(
LOG_INFO(
log,
"getSplitKey result {}, total_concurrency={} ongoing={} total_split_parts={} split_keys={} "
"region_range={} approx_bytes={} "
"region_id={}",
"region_range={} approx_bytes={} region_id={}",
fmt_buf.toString(),
total_concurrency,
ongoing_count,
Expand Down Expand Up @@ -448,7 +447,7 @@ static void runInParallel(
= executeTransform(log, prehandle_ctx, part_new_region, part_sst_stream);
LOG_INFO(
log,
"Finished extra parallel prehandle task limit {} write_cf={} lock_cf={} default_cf={} dmfiles={} error={}, "
"Finished extra parallel prehandle task limit {} write_cf={} lock_cf={} default_cf={} dmfiles={} error={} "
"split_id={} region_id={}",
limit_tag,
part_prehandle_result.stats.write_cf_keys,
Expand Down Expand Up @@ -476,8 +475,7 @@ static void runInParallel(
LOG_INFO(
log,
"Parallel prehandling error {}"
" write_cf_off={}"
" split_id={} region_id={}",
" write_cf_off={} split_id={} region_id={}",
e.message(),
processed_keys.write_cf,
extra_id,
Expand Down Expand Up @@ -509,10 +507,11 @@ void executeParallelTransform(
split_key_count);
LOG_INFO(
log,
"Parallel prehandling for single big region, range={}, split keys={}, region_id={}",
"Parallel prehandling for single big region, range={} split_keys={} region_id={} snaps={}",
new_region->getRange()->toDebugString(),
split_key_count,
new_region->id());
new_region->id(),
snaps.len);
Stopwatch watch;
// Make sure the queue is bigger than `split_key_count`, otherwise `addTask` may fail.
auto async_tasks = SingleSnapshotAsyncTasks(split_key_count, split_key_count, split_key_count + 5);
Expand Down Expand Up @@ -550,9 +549,8 @@ void executeParallelTransform(
auto [head_result, head_prehandle_result] = executeTransform(log, prehandle_ctx, new_region, sst_stream);
LOG_INFO(
log,
"Finished extra parallel prehandle task limit={} write_cf {} lock_cf={} default_cf={} dmfiles={} "
"error={}, split_id={}, "
"region_id={}",
"Finished extra parallel prehandle task, limit={} write_cf={} lock_cf={} default_cf={} dmfiles={} "
"error={} split_id={} region_id={}",
sst_stream->getSoftLimit()->toDebugString(),
head_prehandle_result.stats.write_cf_keys,
head_prehandle_result.stats.lock_cf_keys,
Expand Down Expand Up @@ -714,9 +712,10 @@ PrehandleResult KVStore::preHandleSSTsToDTFiles(
{
LOG_INFO(
log,
"Single threaded prehandling for single region, range={} region_id={}",
"Single threaded prehandling for single region, range={} region_id={} snaps={}",
new_region->getRange()->toDebugString(),
new_region->id());
new_region->id(),
snaps.len);
std::tie(result, prehandle_result) = executeTransform(log, prehandle_ctx, new_region, sst_stream);
}
else
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Storages/KVStore/tests/gtest_new_kvstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
#include <Storages/KVStore/Region.h>
#include <Storages/KVStore/Utils/AsyncTasks.h>
#include <Storages/KVStore/tests/region_kvstore_test.h>
#include <Storages/Page/V3/PageDefines.h>
#include <Storages/Page/V3/PageDirectory.h>
#include <Storages/Page/V3/PageDirectoryFactory.h>
#include <Storages/Page/V3/PageEntriesEdit.h>
#include <Storages/Page/V3/Universal/UniversalPageIdFormatImpl.h>
#include <Storages/RegionQueryInfo.h>
#include <TiDB/Schema/SchemaSyncService.h>
#include <TiDB/Schema/TiDBSchemaManager.h>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/tests/region_kvstore_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ inline void validateSSTGeneration(
size_t split_id) -> std::unique_ptr<MonoSSTReader> {
auto parsed_kind = MockSSTGenerator::parseSSTViewKind(buffToStrView(snap.path));
auto reader = std::make_unique<MonoSSTReader>(proxy_helper, snap, range, split_id);
assert(reader->sst_format_kind() == parsed_kind);
assert(reader->sstFormatKind() == parsed_kind);
return reader;
};
MultiSSTReader<MonoSSTReader, SSTView> reader{
Expand Down
28 changes: 28 additions & 0 deletions dbms/src/Storages/Page/PageStorageMemorySummary.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <atomic>

namespace DB::PS
{
struct PageStorageMemorySummary
{
static inline std::atomic_int64_t uni_page_id_bytes{0};
static inline std::atomic_int64_t versioned_entry_or_delete_bytes{0};
static inline std::atomic_int64_t versioned_entry_or_delete_count{0};
};

} // namespace DB::PS
48 changes: 32 additions & 16 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,31 +222,47 @@ struct EntryOrDelete
MultiVersionRefCount being_ref_count;
std::optional<PageEntryV3> entry;

static EntryOrDelete newDelete()
EntryOrDelete(const EntryOrDelete & other)
: being_ref_count(other.being_ref_count)
, entry(other.entry)
{
return EntryOrDelete{
.entry = std::nullopt,
};
};
static EntryOrDelete newNormalEntry(const PageEntryV3 & entry)
PageStorageMemorySummary::versioned_entry_or_delete_count.fetch_add(1);
if (entry)
PageStorageMemorySummary::versioned_entry_or_delete_bytes.fetch_add(sizeof(PageEntryV3));
}
EntryOrDelete() { PageStorageMemorySummary::versioned_entry_or_delete_count.fetch_add(1); }
EntryOrDelete(std::optional<PageEntryV3> entry_)
: entry(std::move(entry_))
{
PageStorageMemorySummary::versioned_entry_or_delete_count.fetch_add(1);
if (entry)
PageStorageMemorySummary::versioned_entry_or_delete_bytes.fetch_add(sizeof(PageEntryV3));
}
EntryOrDelete(MultiVersionRefCount being_ref_count_, std::optional<PageEntryV3> entry_)
: being_ref_count(being_ref_count_)
, entry(std::move(entry_))
{
return EntryOrDelete{
.entry = entry,
};
PageStorageMemorySummary::versioned_entry_or_delete_count.fetch_add(1);
if (entry)
PageStorageMemorySummary::versioned_entry_or_delete_bytes.fetch_add(sizeof(PageEntryV3));
}
~EntryOrDelete()
{
PageStorageMemorySummary::versioned_entry_or_delete_count.fetch_sub(1);
if (entry)
PageStorageMemorySummary::versioned_entry_or_delete_bytes.fetch_sub(sizeof(PageEntryV3));
}

static EntryOrDelete newDelete() { return EntryOrDelete(std::nullopt); };
static EntryOrDelete newNormalEntry(const PageEntryV3 & entry) { return EntryOrDelete(entry); }
static EntryOrDelete newReplacingEntry(const EntryOrDelete & ori_entry, const PageEntryV3 & entry)
{
return EntryOrDelete{
.being_ref_count = ori_entry.being_ref_count,
.entry = entry,
};
return EntryOrDelete(ori_entry.being_ref_count, entry);
}

static EntryOrDelete newFromRestored(PageEntryV3 entry, const PageVersion & ver, Int64 being_ref_count)
{
auto result = EntryOrDelete{
.entry = entry,
};
auto result = EntryOrDelete(std::move(entry));
result.being_ref_count.restoreFrom(ver, being_ref_count);
return result;
}
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/V3/PageDirectory/PageIdTrait.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ struct PageIdTrait
static inline PageIdU64 getU64ID(const PageId & page_id) { return page_id.low; }
static inline Prefix getPrefix(const PageId & page_id) { return page_id.high; }
static inline PageIdU64 getPageMapKey(const PageId & page_id) { return page_id.low; }
static inline size_t getPageIDSize(const PageId & page_id) { return sizeof(page_id); }
};
} // namespace u128
namespace universal
Expand All @@ -45,6 +46,8 @@ struct PageIdTrait
static Prefix getPrefix(const PageId & page_id);

static inline PageId getPageMapKey(const PageId & page_id) { return page_id; }

static inline size_t getPageIDSize(const PageId & page_id) { return page_id.size(); }
};
} // namespace universal
} // namespace DB::PS::V3
1 change: 1 addition & 0 deletions dbms/src/Storages/Page/V3/PageEntry.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <Common/Exception.h>
#include <Storages/Page/PageStorageMemorySummary.h>
#include <Storages/Page/V3/PageDefines.h>
#include <Storages/Page/V3/PageEntryCheckpointInfo.h>
#include <fmt/format.h>
Expand Down
Loading

0 comments on commit c1f8bca

Please sign in to comment.