Skip to content

Commit

Permalink
Add metrics about S3 checkpoint
Browse files Browse the repository at this point in the history
Signed-off-by: JaySon-Huang <[email protected]>
  • Loading branch information
JaySon-Huang committed Mar 31, 2023
1 parent 8033e12 commit 4f27f51
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 31 deletions.
5 changes: 5 additions & 0 deletions dbms/src/Storages/Page/PageConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@ static constexpr UInt64 GB = MB * 1024;

enum class StorageType
{
Unknown = 0,
Log = 1,
Data = 2,
Meta = 3,
KVStore = 4,
RaftEngine = 5,
KVEngine = 6,

_MAX_STORAGE_TYPE_, // NOLINT(bugprone-reserved-identifier)
};

enum class PageStorageRunMode : UInt8
Expand Down
6 changes: 0 additions & 6 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPDataFileStat.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,6 @@
namespace DB::PS::V3
{

struct CPDataWriteStats
{
bool has_new_data = false;
size_t incremental_data_bytes = 0;
size_t compact_data_bytes = 0;
};

using RemoteFileValidSizes = std::unordered_map<String, size_t>;

Expand Down
92 changes: 92 additions & 0 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPDumpStat.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Storages/Page/PageConstants.h>
#include <fmt/format.h>

#include <magic_enum.hpp>

namespace DB::PS::V3
{

struct CPDataWriteStats
{
bool has_new_data = false;

size_t incremental_data_bytes = 0;
size_t compact_data_bytes = 0;

std::array<size_t, static_cast<size_t>(StorageType::_MAX_STORAGE_TYPE_)> num_keys{};
std::array<size_t, static_cast<size_t>(StorageType::_MAX_STORAGE_TYPE_)> num_bytes{};

// Total number of records in this checkpoint
size_t num_records = 0;
// Number of Pages that already uploaded to S3
// and is not changed in this checkpoint
size_t num_pages_unchanged = 0;
// Number of Pages that already uploaded to S3
// but picked by compaction in this checkpoint
size_t num_pages_compact = 0;
// Number of incremental Pages since last checkpoint
size_t num_pages_incremental = 0;
// Number of ExternalPages
size_t num_ext_pages = 0;
// Number of other records other than Pages/ExternalPages
size_t num_other_records = 0;
};

} // namespace DB::PS::V3

template <>
struct fmt::formatter<DB::PS::V3::CPDataWriteStats>
{
static constexpr auto parse(format_parse_context & ctx) { return ctx.begin(); }

template <typename FormatContext>
auto format(const DB::PS::V3::CPDataWriteStats & value, FormatContext & ctx) const -> decltype(ctx.out())
{
auto it = format_to(
ctx.out(),
"CPDataWriteStats{{"
"incremental_data_bytes={} compact_data_bytes={}"
" n_records{{total={} pages_unchanged={} pages_compact={} pages_incremental={} ext_pages={} other={}}}",
value.incremental_data_bytes,
value.compact_data_bytes,
value.num_records,
value.num_pages_unchanged,
value.num_pages_compact,
value.num_pages_incremental,
value.num_ext_pages,
value.num_other_records);
it = format_to(it, " types[");
for (size_t i = 0; i < static_cast<size_t>(DB::StorageType::_MAX_STORAGE_TYPE_); ++i)
{
if (i != 0)
it = format_to(it, " ");
it = format_to(
it,
"{{type={} keys={} bytes={}}}",
magic_enum::enum_name(static_cast<DB::StorageType>(i)),
value.num_keys[i],
value.num_bytes[i]);
}
return format_to(
it,
"]" // end of "keys"
"}}" // end of "CPDataWriteStats"
);
}
};
25 changes: 24 additions & 1 deletion dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Storages/Page/V3/CheckpointFile/CPDumpStat.h>
#include <Storages/Page/V3/CheckpointFile/CPFilesWriter.h>
#include <Storages/Page/V3/PageEntryCheckpointInfo.h>
#include <Storages/Page/V3/Universal/UniversalPageIdFormatImpl.h>
#include <fmt/core.h>

#include <unordered_map>
Expand Down Expand Up @@ -73,13 +75,27 @@ CPDataWriteStats CPFilesWriter::writeEditsAndApplyCheckpointInfo(
return {.has_new_data = false};

CPDataWriteStats write_down_stats;
for (size_t i = 0; i < static_cast<size_t>(StorageType::_MAX_STORAGE_TYPE_); ++i)
{
write_down_stats.num_keys[i] = 0;
write_down_stats.num_bytes[i] = 0;
}

std::unordered_map<String, size_t> compact_stats;
bool last_page_is_raft_data = true;

// 1. Iterate all edits, find these entry edits without the checkpoint info
// and collect the lock files from applied entries.
write_down_stats.num_records = records.size();
for (auto & rec_edit : records)
{
StorageType id_storage_type = StorageType::Unknown;
{
id_storage_type = UniversalPageIdFormat::getUniversalPageIdType(rec_edit.page_id);
write_down_stats.num_keys[static_cast<size_t>(id_storage_type)] += 1;
write_down_stats.num_bytes[static_cast<size_t>(id_storage_type)] += rec_edit.entry.size;
}

if (rec_edit.type == EditRecordType::VAR_EXTERNAL)
{
RUNTIME_CHECK_MSG(
Expand All @@ -90,11 +106,15 @@ CPDataWriteStats CPFilesWriter::writeEditsAndApplyCheckpointInfo(
rec_edit);
// for example, the s3 fullpath of external id
locked_files.emplace(*rec_edit.entry.checkpoint_info.data_location.data_file_id);
write_down_stats.num_ext_pages += 1;
continue;
}

if (rec_edit.type != EditRecordType::VAR_ENTRY)
{
write_down_stats.num_other_records += 1;
continue;
}

bool is_compaction = false;
if (rec_edit.entry.checkpoint_info.has_value())
Expand All @@ -104,6 +124,7 @@ CPDataWriteStats CPFilesWriter::writeEditsAndApplyCheckpointInfo(
{
// for example, the s3 fullpath that was written in the previous uploaded CheckpointDataFile
locked_files.emplace(file_id);
write_down_stats.num_pages_unchanged += 1;
continue;
}
// else we rewrite this entry data to the data file generated by this checkpoint, so that
Expand All @@ -113,7 +134,7 @@ CPDataWriteStats CPFilesWriter::writeEditsAndApplyCheckpointInfo(
compact_stats.try_emplace(file_id, 0).first->second += rec_edit.entry.size;
}

bool current_page_is_raft_data = rec_edit.page_id.isRaftData();
bool current_page_is_raft_data = (id_storage_type == StorageType::RaftEngine);
if (current_write_size > 0 // If current_write_size is 0, data_writer is a empty file, not need to create a new one.
&& (current_page_is_raft_data != last_page_is_raft_data // Data type changed
|| (max_data_file_size != 0 && current_write_size >= max_data_file_size))) // or reach size limit.
Expand Down Expand Up @@ -143,10 +164,12 @@ CPDataWriteStats CPFilesWriter::writeEditsAndApplyCheckpointInfo(
if (is_compaction)
{
write_down_stats.compact_data_bytes += rec_edit.entry.size;
write_down_stats.num_pages_compact += 1;
}
else
{
write_down_stats.incremental_data_bytes += rec_edit.entry.size;
write_down_stats.num_pages_incremental += 1;
}
}

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Storages/Page/V3/BlobStore.h>
#include <Storages/Page/V3/CheckpointFile/CPDataFileStat.h>
#include <Storages/Page/V3/CheckpointFile/CPDataFileWriter.h>
#include <Storages/Page/V3/CheckpointFile/CPDumpStat.h>
#include <Storages/Page/V3/CheckpointFile/CPManifestFileWriter.h>
#include <Storages/Page/V3/CheckpointFile/CPWriteDataSource.h>
#include <Storages/Page/V3/CheckpointFile/Proto/common.pb.h>
Expand Down
6 changes: 0 additions & 6 deletions dbms/src/Storages/Page/V3/Universal/UniversalPageId.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,7 @@ class UniversalPageId final

friend bool operator==(const String & lhs, const UniversalPageId & rhs);

bool isRaftData() const
{
return !id.empty() && id[0] == raft_prefix;
}

private:
static constexpr char raft_prefix = 0x01;
String id;
};

Expand Down
54 changes: 48 additions & 6 deletions dbms/src/Storages/Page/V3/Universal/UniversalPageIdFormatImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <Storages/Transaction/TiKVKeyspaceIDImpl.h>
#include <fmt/format.h>

#include "Storages/Page/PageConstants.h"

namespace DB
{
// General UniversalPageId Format: Prefix + PageIdU64.
Expand Down Expand Up @@ -92,12 +94,15 @@ struct UniversalPageIdFormat
return toFullPageId(getSubPrefix(StorageType::KVStore), region_id);
}

static constexpr char RAFT_PREFIX = 0x01;
static constexpr char KV_PREFIX = 0x02;

// data is in kv engine, so it is prepended by KV_PREFIX
// KV_PREFIX LOCAL_PREFIX REGION_RAFT_PREFIX region_id APPLY_STATE_SUFFIX
static UniversalPageId toRaftApplyStateKeyInKVEngine(UInt64 region_id)
{
WriteBufferFromOwnString buff;
writeChar(0x02, buff);
writeChar(KV_PREFIX, buff);
writeChar(0x01, buff);
writeChar(0x02, buff);
encodeUInt64(region_id, buff);
Expand All @@ -110,7 +115,7 @@ struct UniversalPageIdFormat
static UniversalPageId toRegionLocalStateKeyInKVEngine(UInt64 region_id)
{
WriteBufferFromOwnString buff;
writeChar(0x02, buff);
writeChar(KV_PREFIX, buff);
writeChar(0x01, buff);
writeChar(0x03, buff);
encodeUInt64(region_id, buff);
Expand All @@ -122,7 +127,7 @@ struct UniversalPageIdFormat
static String toFullRaftLogPrefix(UInt64 region_id)
{
WriteBufferFromOwnString buff;
writeChar(0x01, buff);
writeChar(RAFT_PREFIX, buff);
writeChar(0x01, buff);
writeChar(0x02, buff);
encodeUInt64(region_id, buff);
Expand All @@ -134,7 +139,7 @@ struct UniversalPageIdFormat
static String toFullRaftLogScanEnd(UInt64 region_id)
{
WriteBufferFromOwnString buff;
writeChar(0x01, buff);
writeChar(RAFT_PREFIX, buff);
writeChar(0x01, buff);
writeChar(0x02, buff);
encodeUInt64(region_id, buff);
Expand All @@ -148,7 +153,7 @@ struct UniversalPageIdFormat
static String getStoreIdentIdInKVEngine()
{
WriteBufferFromOwnString buff;
writeChar(0x02, buff);
writeChar(KV_PREFIX, buff);
writeChar(0x01, buff);
writeChar(0x01, buff);
return buff.releaseStr();
Expand All @@ -158,7 +163,7 @@ struct UniversalPageIdFormat
static String getStoreIdentId()
{
WriteBufferFromOwnString buff;
writeChar(0x01, buff);
writeChar(RAFT_PREFIX, buff);
writeChar(0x01, buff);
writeChar(0x01, buff);
return buff.releaseStr();
Expand Down Expand Up @@ -190,6 +195,43 @@ struct UniversalPageIdFormat
return page_id_without_keyspace.starts_with(getSubPrefix(type));
}

static inline StorageType getUniversalPageIdType(const UniversalPageId & page_id)
{
if (page_id.empty())
return StorageType::Unknown;

const auto & page_id_str = page_id.asStr();
if (page_id_str[0] == RAFT_PREFIX)
{
return StorageType::RaftEngine;
}
else if (page_id_str[0] == KV_PREFIX)
{
return StorageType::KVEngine;
}
else
{
auto page_id_without_keyspace = TiKVKeyspaceID::removeKeyspaceID(std::string_view(page_id_str.data(), page_id_str.size()));
if (page_id_without_keyspace.starts_with(getSubPrefix(StorageType::Log)))
{
return StorageType::Log;
}
if (page_id_without_keyspace.starts_with(getSubPrefix(StorageType::Data)))
{
return StorageType::Data;
}
if (page_id_without_keyspace.starts_with(getSubPrefix(StorageType::Meta)))
{
return StorageType::Meta;
}
if (page_id_without_keyspace.starts_with(getSubPrefix(StorageType::KVStore)))
{
return StorageType::KVStore;
}
}
return StorageType::Unknown;
}

private:
static inline void encodeUInt64(const UInt64 x, WriteBuffer & ss)
{
Expand Down
23 changes: 11 additions & 12 deletions dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -516,18 +516,17 @@ PS::V3::CPDataWriteStats UniversalPageStorage::dumpIncrementalCheckpoint(const U
GET_METRIC(tiflash_storage_checkpoint_seconds, type_dump_checkpoint_data).Observe(dump_data_seconds);
GET_METRIC(tiflash_storage_checkpoint_seconds, type_upload_checkpoint).Observe(upload_seconds);
GET_METRIC(tiflash_storage_checkpoint_seconds, type_copy_checkpoint_info).Observe(copy_checkpoint_info_seconds);
LOG_DEBUG(log,
"Checkpoint result: files={}, dump_snapshot={:.3f}s, dump_data={:.3f}s, upload={:.3f}s, copy_checkpoint_info={:.3f}s, "
"total={:.3f}s, sequence={}, incremental_data_bytes={}, compact_data_bytes={}",
data_file_paths,
dump_snapshot_seconds,
dump_data_seconds,
upload_seconds,
copy_checkpoint_info_seconds,
sw.elapsedSeconds(),
sequence,
write_stats.incremental_data_bytes,
write_stats.compact_data_bytes);
LOG_INFO(log,
"Checkpoint result: files={} dump_snapshot={:.3f}s dump_data={:.3f}s upload={:.3f}s copy_checkpoint_info={:.3f}s "
"total={:.3f}s sequence={} {}",
data_file_paths,
dump_snapshot_seconds,
dump_data_seconds,
upload_seconds,
copy_checkpoint_info_seconds,
sw.elapsedSeconds(),
sequence,
write_stats);
return write_stats;
}

Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <Storages/Page/Snapshot.h>
#include <Storages/Page/V3/BlobStore.h>
#include <Storages/Page/V3/CheckpointFile/CPDataFileStat.h>
#include <Storages/Page/V3/CheckpointFile/CPDumpStat.h>
#include <Storages/Page/V3/CheckpointFile/CheckpointFiles.h>
#include <Storages/Page/V3/GCDefines.h>
#include <Storages/Page/V3/PageDirectory.h>
Expand Down

0 comments on commit 4f27f51

Please sign in to comment.