Skip to content

Commit

Permalink
Split checkpoint data files. (#7149)
Browse files Browse the repository at this point in the history
ref #6827
  • Loading branch information
JinheLin authored Mar 24, 2023
1 parent e4f3bb1 commit 828da8a
Show file tree
Hide file tree
Showing 11 changed files with 388 additions and 186 deletions.
6 changes: 5 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,11 @@ namespace DB
F(type_complete_multi_part_upload, {{"type", "complete_multi_part_upload"}}, ExpBuckets{0.001, 2, 20}), \
F(type_list_objects, {{"type", "list_objects"}}, ExpBuckets{0.001, 2, 20}), \
F(type_delete_object, {{"type", "delete_object"}}, ExpBuckets{0.001, 2, 20}), \
F(type_head_object, {{"type", "head_object"}}, ExpBuckets{0.001, 2, 20}))
F(type_head_object, {{"type", "head_object"}}, ExpBuckets{0.001, 2, 20})) \
M(tiflash_storage_checkpoint_seconds, "PageStorage checkpoint elapsed time", Histogram, \
F(type_dump_checkpoint_snapshot, {{"type", "dump_checkpoint_snapshot"}}, ExpBuckets{0.001, 2, 20}), \
F(type_dump_checkpoint_data, {{"type", "dump_checkpoint_data"}}, ExpBuckets{0.001, 2, 20}), \
F(type_upload_checkpoint, {{"type", "upload_checkpoint"}}, ExpBuckets{0.001, 2, 20}))

// clang-format on

Expand Down
54 changes: 44 additions & 10 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Storages/Page/V3/CheckpointFile/CPFilesWriter.h>
#include <Storages/Page/V3/PageEntryCheckpointInfo.h>
#include <fmt/core.h>

#include <unordered_map>

Expand All @@ -22,10 +23,10 @@ namespace DB::PS::V3

CPFilesWriter::CPFilesWriter(CPFilesWriter::Options options)
: manifest_file_id(options.manifest_file_id)
, data_writer(CPDataFileWriter::create({
.file_path = options.data_file_path,
.file_id = options.data_file_id,
}))
, data_file_id_pattern(options.data_file_id_pattern)
, data_file_path_pattern(options.data_file_path_pattern)
, sequence(options.sequence)
, max_data_file_size(options.max_data_file_size)
, manifest_writer(CPManifestFileWriter::create({
.file_path = options.manifest_file_path,
}))
Expand All @@ -41,14 +42,13 @@ void CPFilesWriter::writePrefix(const CPFilesWriter::PrefixInfo & info)

auto create_at_ms = Poco::Timestamp().epochMicroseconds() / 1000;

CheckpointProto::DataFilePrefix data_prefix;
// Init the common fields of DataPrefix.
data_prefix.set_file_format(1);
data_prefix.set_local_sequence(info.sequence);
data_prefix.set_create_at_ms(create_at_ms);
data_prefix.mutable_writer_info()->CopyFrom(info.writer);
data_prefix.set_manifest_file_id(manifest_file_id);
data_prefix.set_sub_file_index(0);
data_writer->writePrefix(data_prefix);
// Create data_writer.
newDataWriter();

CheckpointProto::ManifestFilePrefix manifest_prefix;
manifest_prefix.set_file_format(1);
Expand All @@ -73,6 +73,7 @@ CPDataWriteStats CPFilesWriter::writeEditsAndApplyCheckpointInfo(

CPDataWriteStats write_down_stats;
std::unordered_map<String, size_t> rewrite_stats;
bool last_page_is_raft_data = true;

// 1. Iterate all edits, find these entry edits without the checkpoint info
// and collect the lock files from applied entries.
Expand Down Expand Up @@ -111,6 +112,15 @@ CPDataWriteStats CPFilesWriter::writeEditsAndApplyCheckpointInfo(
rewrite_stats.try_emplace(file_id, 0).first->second += rec_edit.entry.size;
}

bool current_page_is_raft_data = rec_edit.page_id.isRaftData();
if (current_write_size > 0 // If current_write_size is 0, data_writer is a empty file, not need to create a new one.
&& (current_page_is_raft_data != last_page_is_raft_data // Data type changed
|| (max_data_file_size != 0 && current_write_size >= max_data_file_size))) // or reach size limit.
{
newDataWriter();
}
last_page_is_raft_data = current_page_is_raft_data;

// 2. For entry edits without the checkpoint info, write them to the data file,
// and assign a new checkpoint info.
auto page = data_source->read({rec_edit.page_id, rec_edit.entry});
Expand All @@ -120,6 +130,7 @@ CPDataWriteStats CPFilesWriter::writeEditsAndApplyCheckpointInfo(
rec_edit.version,
page.data.begin(),
page.data.size());
current_write_size += data_location.size_in_file;
RUNTIME_CHECK(page.data.size() == rec_edit.entry.size, page.data.size(), rec_edit.entry.size);
bool is_local_data_reclaimed = rec_edit.entry.checkpoint_info.has_value() && rec_edit.entry.checkpoint_info.is_local_data_reclaimed;
rec_edit.entry.checkpoint_info = OptionalCheckpointInfo{
Expand All @@ -143,11 +154,11 @@ CPDataWriteStats CPFilesWriter::writeEditsAndApplyCheckpointInfo(
// 3. Write down everything to the manifest.
manifest_writer->writeEdits(edits);

write_down_stats.has_new_data = data_writer->writtenRecords() > 0;
write_down_stats.has_new_data = total_written_records + data_writer->writtenRecords() > 0;
return write_down_stats;
}

void CPFilesWriter::writeSuffix()
std::vector<String> CPFilesWriter::writeSuffix()
{
RUNTIME_CHECK_MSG(write_stage == WriteStage::WritingEdits, "unexpected write stage {}", magic_enum::enum_name(write_stage));

Expand All @@ -156,9 +167,32 @@ void CPFilesWriter::writeSuffix()
manifest_writer->writeLocksFinish();

data_writer->writeSuffix();
data_writer->flush();
manifest_writer->writeSuffix();
manifest_writer->flush();

write_stage = WriteStage::WritingFinished;
return data_file_paths;
}

void CPFilesWriter::newDataWriter()
{
if (data_writer != nullptr)
{
total_written_records += data_writer->writtenRecords();
data_writer->writeSuffix();
data_writer->flush();
}
current_write_size = 0;
data_file_paths.push_back(fmt::format(fmt::runtime(data_file_path_pattern), fmt::arg("seq", sequence), fmt::arg("index", data_file_index)));
data_writer = CPDataFileWriter::create({
.file_path = data_file_paths.back(),
.file_id = fmt::format(fmt::runtime(data_file_id_pattern), fmt::arg("seq", sequence), fmt::arg("index", data_file_index)),
});
data_prefix.set_create_at_ms(Poco::Timestamp().epochMicroseconds() / 1000);
data_prefix.set_sub_file_index(data_file_index);
data_writer->writePrefix(data_prefix);
++data_file_index;
}

} // namespace DB::PS::V3
43 changes: 24 additions & 19 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ class CPFilesWriter : private boost::noncopyable
public:
struct Options
{
const std::string & data_file_path;
const std::string & data_file_id;
const std::string & manifest_file_path;
const std::string & manifest_file_id;
const String & data_file_path_pattern;
const String & data_file_id_pattern;
const String & manifest_file_path;
const String & manifest_file_id;
const CPWriteDataSourcePtr data_source;

/**
Expand All @@ -46,6 +46,8 @@ class CPFilesWriter : private boost::noncopyable
* lock files from `writeEditsAndApplyCheckpointInfo`.
*/
const std::unordered_set<String> & must_locked_files = {};
UInt64 sequence;
UInt64 max_data_file_size;
};

static CPFilesWriterPtr create(Options options)
Expand Down Expand Up @@ -84,35 +86,38 @@ class CPFilesWriter : private boost::noncopyable
/**
* This function must be called, and must be called last, after other `writeXxx`.
*/
void writeSuffix();

void flush()
{
data_writer->flush();
manifest_writer->flush();
}

~CPFilesWriter()
{
flush();
}
[[nodiscard]] std::vector<String> writeSuffix();

#ifndef DBMS_PUBLIC_GTEST
private:
#else
public:
#endif
enum class WriteStage
{
WritingPrefix,
WritingEdits,
WritingFinished,
};

const std::string manifest_file_id;
const CPDataFileWriterPtr data_writer;
void newDataWriter();

const String manifest_file_id;
const String data_file_id_pattern;
const String data_file_path_pattern;
const UInt64 sequence;
const UInt64 max_data_file_size;
Int32 data_file_index = 0;
CPDataFileWriterPtr data_writer;
const CPManifestFileWriterPtr manifest_writer;
const CPWriteDataSourcePtr data_source;

std::unordered_set<String> locked_files;
WriteStage write_stage = WriteStage::WritingPrefix;

std::vector<String> data_file_paths;
CheckpointProto::DataFilePrefix data_prefix;
UInt64 current_write_size = 0;
UInt64 total_written_records = 0;
LoggerPtr log;
};

Expand Down
Loading

0 comments on commit 828da8a

Please sign in to comment.