From 9ea9cf7c75f72d737183474ac668553007e2bca0 Mon Sep 17 00:00:00 2001 From: Wenxuan Date: Tue, 28 Feb 2023 18:39:08 +0800 Subject: [PATCH] Add checkpoint utilities (#6900) ref pingcap/tiflash#6827 --- dbms/CMakeLists.txt | 16 +- dbms/src/Storages/Page/CMakeLists.txt | 3 + .../V3/CheckpointFile/CPDataFileWriter.cpp | 77 +++ .../Page/V3/CheckpointFile/CPDataFileWriter.h | 95 +++ .../Page/V3/CheckpointFile/CPFilesWriter.cpp | 131 ++++ .../Page/V3/CheckpointFile/CPFilesWriter.h | 111 +++ .../CheckpointFile/CPManifestFileReader.cpp | 80 +++ .../V3/CheckpointFile/CPManifestFileReader.h | 71 ++ .../CheckpointFile/CPManifestFileWriter.cpp | 124 ++++ .../V3/CheckpointFile/CPManifestFileWriter.h | 84 +++ .../V3/CheckpointFile/CPWriteDataSource.cpp | 39 ++ .../V3/CheckpointFile/CPWriteDataSource.h | 84 +++ .../V3/CheckpointFile/Proto/CMakeLists.txt | 23 + .../Page/V3/CheckpointFile/Proto/common.proto | 30 + .../V3/CheckpointFile/Proto/data_file.proto | 63 ++ .../CheckpointFile/Proto/manifest_file.proto | 88 +++ .../Page/V3/CheckpointFile/ProtoHelper.cpp | 55 ++ .../Page/V3/CheckpointFile/ProtoHelper.h | 28 + .../src/Storages/Page/V3/CheckpointFile/fwd.h | 34 + .../tests/gtest_file_read_write.cpp | 646 ++++++++++++++++++ .../tests/gtest_proto_helper.cpp | 57 ++ dbms/src/Storages/Page/V3/PageEntriesEdit.cpp | 80 +++ dbms/src/Storages/Page/V3/PageEntriesEdit.h | 44 ++ dbms/src/Storages/Page/V3/PageEntry.h | 7 + .../Page/V3/PageEntryCheckpointInfo.cpp | 53 ++ .../Page/V3/PageEntryCheckpointInfo.h | 58 ++ .../Page/V3/Universal/UniversalPageId.h | 14 + .../V3/Universal/UniversalWriteBatchImpl.h | 4 +- dbms/src/Storages/Page/WriteBatchImpl.h | 4 +- 29 files changed, 2192 insertions(+), 11 deletions(-) create mode 100644 dbms/src/Storages/Page/V3/CheckpointFile/CPDataFileWriter.cpp create mode 100644 dbms/src/Storages/Page/V3/CheckpointFile/CPDataFileWriter.h create mode 100644 dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp create mode 100644 dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.h create mode 100644 dbms/src/Storages/Page/V3/CheckpointFile/CPManifestFileReader.cpp create mode 100644 dbms/src/Storages/Page/V3/CheckpointFile/CPManifestFileReader.h create mode 100644 dbms/src/Storages/Page/V3/CheckpointFile/CPManifestFileWriter.cpp create mode 100644 dbms/src/Storages/Page/V3/CheckpointFile/CPManifestFileWriter.h create mode 100644 dbms/src/Storages/Page/V3/CheckpointFile/CPWriteDataSource.cpp create mode 100644 dbms/src/Storages/Page/V3/CheckpointFile/CPWriteDataSource.h create mode 100644 dbms/src/Storages/Page/V3/CheckpointFile/Proto/CMakeLists.txt create mode 100644 dbms/src/Storages/Page/V3/CheckpointFile/Proto/common.proto create mode 100644 dbms/src/Storages/Page/V3/CheckpointFile/Proto/data_file.proto create mode 100644 dbms/src/Storages/Page/V3/CheckpointFile/Proto/manifest_file.proto create mode 100644 dbms/src/Storages/Page/V3/CheckpointFile/ProtoHelper.cpp create mode 100644 dbms/src/Storages/Page/V3/CheckpointFile/ProtoHelper.h create mode 100644 dbms/src/Storages/Page/V3/CheckpointFile/fwd.h create mode 100644 dbms/src/Storages/Page/V3/CheckpointFile/tests/gtest_file_read_write.cpp create mode 100644 dbms/src/Storages/Page/V3/CheckpointFile/tests/gtest_proto_helper.cpp create mode 100644 dbms/src/Storages/Page/V3/PageEntriesEdit.cpp create mode 100644 dbms/src/Storages/Page/V3/PageEntryCheckpointInfo.cpp create mode 100644 dbms/src/Storages/Page/V3/PageEntryCheckpointInfo.h diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index ad321e3d65b..7b2ab3d6a93 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -185,6 +185,14 @@ target_link_libraries (tiflash_common_io target_include_directories (tiflash_common_io BEFORE PRIVATE ${kvClient_SOURCE_DIR}/include) target_compile_definitions(tiflash_common_io PUBLIC -DTIFLASH_SOURCE_PREFIX=\"${TiFlash_SOURCE_DIR}\") target_link_libraries (dbms + ${RE2_LIBRARY} + ${RE2_ST_LIBRARY} + ${OPENSSL_CRYPTO_LIBRARY} + ${BTRIE_LIBRARIES} + absl::synchronization + tiflash_contrib::aws_s3 + + etcdpb tiflash_parsers tiflash_common_config tiflash_common_io @@ -193,13 +201,7 @@ target_link_libraries (dbms kv_client tipb dtpb - etcdpb - ${RE2_LIBRARY} - ${RE2_ST_LIBRARY} - ${OPENSSL_CRYPTO_LIBRARY} - ${BTRIE_LIBRARIES} - absl::synchronization - tiflash_contrib::aws_s3 + PSCheckpointProto ) # always add GmSSL include dir to the include path for static analysis diff --git a/dbms/src/Storages/Page/CMakeLists.txt b/dbms/src/Storages/Page/CMakeLists.txt index 6e8c6a991b0..18c6ae47760 100644 --- a/dbms/src/Storages/Page/CMakeLists.txt +++ b/dbms/src/Storages/Page/CMakeLists.txt @@ -14,6 +14,8 @@ include(${TiFlash_SOURCE_DIR}/cmake/dbms_glob_sources.cmake) +add_subdirectory (./V3/CheckpointFile/Proto) + add_headers_and_sources(page .) add_headers_and_sources(page ./V1) add_headers_and_sources(page ./V1/mvcc) @@ -28,6 +30,7 @@ add_headers_and_sources(page ./V3/LogFile) add_headers_and_sources(page ./V3/PageDirectory) add_headers_and_sources(page ./V3/spacemap) add_headers_and_sources(page ./V3/Universal) +add_headers_and_sources(page ./V3/CheckpointFile) add_headers_and_sources(page ./V3/WAL) add_library(page ${page_headers} ${page_sources}) diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/CPDataFileWriter.cpp b/dbms/src/Storages/Page/V3/CheckpointFile/CPDataFileWriter.cpp new file mode 100644 index 00000000000..c294f0af076 --- /dev/null +++ b/dbms/src/Storages/Page/V3/CheckpointFile/CPDataFileWriter.cpp @@ -0,0 +1,77 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +namespace DB::PS::V3 +{ + +void CPDataFileWriter::writePrefix(const CheckpointProto::DataFilePrefix & prefix) +{ + RUNTIME_CHECK_MSG(write_stage == WriteStage::WritingPrefix, "unexpected write stage {}", magic_enum::enum_name(write_stage)); + + std::string json; + google::protobuf::util::MessageToJsonString(prefix, &json); + writeStringBinary(json, *file_writer); + + write_stage = WriteStage::WritingRecords; +} + +CheckpointLocation CPDataFileWriter::write(UniversalPageId page_id, PageVersion version, const char * data, size_t n) +{ + RUNTIME_CHECK_MSG(write_stage == WriteStage::WritingRecords, "unexpected write stage {}", magic_enum::enum_name(write_stage)); + + // Every record is prefixed with the length, so that this data file can be parsed standalone. + writeIntBinary(n, *file_writer); + + // TODO: getMaterializedBytes only works for FramedChecksumWriteBuffer, but does not work for a normal WriteBufferFromFile. + // There must be something wrong and should be fixed. + // uint64_t file_offset = file_writer->getMaterializedBytes(); + // file_writer->write(data, n); + // uint64_t write_n = file_writer->getMaterializedBytes() - file_offset; + + uint64_t file_offset = file_writer->count(); + file_writer->write(data, n); + uint64_t write_n = file_writer->count() - file_offset; + RUNTIME_CHECK(write_n == n, write_n, n); // Note: When we add compression later, write_n == n may be false. + + auto * suffix_record = file_suffix.add_records(); + suffix_record->set_page_id(page_id.asStr()); + suffix_record->set_version_sequence(version.sequence); + suffix_record->set_version_epoch(version.epoch); + suffix_record->set_offset_in_file(file_offset); + suffix_record->set_size_in_file(write_n); + + return CheckpointLocation{ + .data_file_id = file_id, + .offset_in_file = file_offset, + .size_in_file = write_n, + }; +} + +void CPDataFileWriter::writeSuffix() +{ + if (write_stage == WriteStage::WritingFinished) + return; // writeSuffix can be called multiple times without causing issues. + if (write_stage != WriteStage::WritingRecords) + RUNTIME_CHECK_MSG(false, "unexpected write stage {}", magic_enum::enum_name(write_stage)); + + std::string json; + google::protobuf::util::MessageToJsonString(file_suffix, &json); + writeStringBinary(json, *file_writer); + + write_stage = WriteStage::WritingFinished; +} + +} // namespace DB::PS::V3 diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/CPDataFileWriter.h b/dbms/src/Storages/Page/V3/CheckpointFile/CPDataFileWriter.h new file mode 100644 index 00000000000..9cc5ff1739e --- /dev/null +++ b/dbms/src/Storages/Page/V3/CheckpointFile/CPDataFileWriter.h @@ -0,0 +1,95 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace DB::PS::V3 +{ + +class CPDataFileWriter +{ +public: + struct Options + { + const std::string & file_path; + const std::string & file_id; + }; + + static CPDataFileWriterPtr create(Options options) + { + return std::make_unique(options); + } + + explicit CPDataFileWriter(Options options) + : file_writer(std::make_unique(options.file_path)) + , file_id(std::make_shared(options.file_id)) + { + // TODO: FramedChecksumWriteBuffer does not support random access for arbitrary frame sizes. + // So currently we use checksum = false. + // Need to update FramedChecksumWriteBuffer first. + // TODO: Support compressed data file. + } + + ~CPDataFileWriter() + { + flush(); + } + + void writePrefix(const CheckpointProto::DataFilePrefix & prefix); + + CheckpointLocation write(UniversalPageId page_id, PageVersion version, const char * data, size_t n); + + void writeSuffix(); + + void flush() + { + file_writer->next(); + file_writer->sync(); + } + + size_t writtenRecords() const + { + return file_suffix.records_size(); + } + +private: + enum class WriteStage + { + WritingPrefix, + WritingRecords, + WritingFinished, + }; + + const std::unique_ptr file_writer; + const std::shared_ptr file_id; // Shared in each write result + + CheckpointProto::DataFileSuffix file_suffix; + WriteStage write_stage = WriteStage::WritingPrefix; +}; + +} // namespace DB::PS::V3 diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp b/dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp new file mode 100644 index 00000000000..a36645db871 --- /dev/null +++ b/dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp @@ -0,0 +1,131 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +namespace DB::PS::V3 +{ + +CPFilesWriter::CPFilesWriter(CPFilesWriter::Options options) + : manifest_file_id(options.manifest_file_id) + , data_writer(CPDataFileWriter::create({ + .file_path = options.data_file_path, + .file_id = options.data_file_id, + })) + , manifest_writer(CPManifestFileWriter::create({ + .file_path = options.manifest_file_path, + })) + , data_source(options.data_source) + , locked_files(options.must_locked_files) + , log(Logger::get()) +{ +} + +void CPFilesWriter::writePrefix(const CPFilesWriter::PrefixInfo & info) +{ + RUNTIME_CHECK_MSG(write_stage == WriteStage::WritingPrefix, "unexpected write stage {}", magic_enum::enum_name(write_stage)); + + auto create_at_ms = Poco::Timestamp().epochMicroseconds() / 1000; + + CheckpointProto::DataFilePrefix data_prefix; + data_prefix.set_file_format(1); + data_prefix.set_local_sequence(info.sequence); + data_prefix.set_create_at_ms(create_at_ms); + data_prefix.mutable_writer_info()->CopyFrom(info.writer); + data_prefix.set_manifest_file_id(manifest_file_id); + data_prefix.set_sub_file_index(0); + data_writer->writePrefix(data_prefix); + + CheckpointProto::ManifestFilePrefix manifest_prefix; + manifest_prefix.set_file_format(1); + manifest_prefix.set_local_sequence(info.sequence); + manifest_prefix.set_last_local_sequence(info.last_sequence); + manifest_prefix.set_create_at_ms(create_at_ms); + manifest_prefix.mutable_writer_info()->CopyFrom(info.writer); + manifest_writer->writePrefix(manifest_prefix); + + write_stage = WriteStage::WritingEdits; +} + +bool CPFilesWriter::writeEditsAndApplyRemoteInfo(universal::PageEntriesEdit & edits) +{ + RUNTIME_CHECK_MSG(write_stage == WriteStage::WritingEdits, "unexpected write stage {}", magic_enum::enum_name(write_stage)); + + auto & records = edits.getMutRecords(); + if (records.empty()) + return false; + + // 1. Iterate all edits, find these entry edits without the checkpoint info + // and collect the lock files from applied entries. + for (auto & rec_edit : records) + { + if (rec_edit.type == EditRecordType::VAR_EXTERNAL) + { + RUNTIME_CHECK( + rec_edit.entry.checkpoint_info.has_value() && // + rec_edit.entry.checkpoint_info->data_location.data_file_id && // + !rec_edit.entry.checkpoint_info->data_location.data_file_id->empty()); + // for example, the s3 fullpath of external id + locked_files.emplace(*rec_edit.entry.checkpoint_info->data_location.data_file_id); + continue; + } + + if (rec_edit.type != EditRecordType::VAR_ENTRY) + continue; + + if (rec_edit.entry.checkpoint_info.has_value()) + { + // for example, the s3 fullpath that was written in the previous uploaded CheckpointDataFile + locked_files.emplace(*rec_edit.entry.checkpoint_info->data_location.data_file_id); + continue; + } + + // 2. For entry edits without the checkpoint info, write them to the data file, + // and assign a new checkpoint info. + auto page = data_source->read({rec_edit.page_id, rec_edit.entry}); + RUNTIME_CHECK(page.isValid()); + auto data_location = data_writer->write( + rec_edit.page_id, + rec_edit.version, + page.data.begin(), + page.data.size()); + RUNTIME_CHECK(page.data.size() == rec_edit.entry.size, page.data.size(), rec_edit.entry.size); + rec_edit.entry.checkpoint_info = { + .data_location = data_location, + .is_local_data_reclaimed = false, + }; + locked_files.emplace(*data_location.data_file_id); + } + + // 3. Write down everything to the manifest. + manifest_writer->writeEdits(edits); + + return data_writer->writtenRecords() > 0; +} + +void CPFilesWriter::writeSuffix() +{ + RUNTIME_CHECK_MSG(write_stage == WriteStage::WritingEdits, "unexpected write stage {}", magic_enum::enum_name(write_stage)); + + manifest_writer->writeEditsFinish(); + manifest_writer->writeLocks(locked_files); + manifest_writer->writeLocksFinish(); + + data_writer->writeSuffix(); + manifest_writer->writeSuffix(); + + write_stage = WriteStage::WritingFinished; +} + +} // namespace DB::PS::V3 \ No newline at end of file diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.h b/dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.h new file mode 100644 index 00000000000..e345556908f --- /dev/null +++ b/dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.h @@ -0,0 +1,111 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace DB::PS::V3 +{ + +class CPFilesWriter : private boost::noncopyable +{ +public: + struct Options + { + const std::string & data_file_path; + const std::string & data_file_id; + const std::string & manifest_file_path; + const std::string & manifest_file_id; + const CPWriteDataSourcePtr data_source; + + /** + * The list of lock files that will be always appended to the checkpoint file. + * + * Note: In addition to the specified lock files, the checkpoint file will also contain + * lock files from `writeEditsAndApplyRemoteInfo`. + */ + const std::unordered_set & must_locked_files = {}; + }; + + static CPFilesWriterPtr create(Options options) + { + return std::make_unique(std::move(options)); + } + + explicit CPFilesWriter(Options options); + + struct PrefixInfo + { + const CheckpointProto::WriterInfo & writer; + const uint64_t sequence; + const uint64_t last_sequence; + }; + + /** + * Must be called first, before other `writeXxx`. + */ + void writePrefix(const PrefixInfo & info); + + /** + * This function can be called multiple times if there are too many edits and + * you want to write in a streaming way. You are also allowed to not call this + * function at all, if there is no edit. + * + * You must call `writeSuffix` finally, if you don't plan to write edits anymore. + */ + bool /* has_new_data */ writeEditsAndApplyRemoteInfo(universal::PageEntriesEdit & edit); + + /** + * This function must be called, and must be called last, after other `writeXxx`. + */ + void writeSuffix(); + + void flush() + { + data_writer->flush(); + manifest_writer->flush(); + } + + ~CPFilesWriter() + { + flush(); + } + +private: + enum class WriteStage + { + WritingPrefix, + WritingEdits, + WritingFinished, + }; + + const std::string manifest_file_id; + const CPDataFileWriterPtr data_writer; + const CPManifestFileWriterPtr manifest_writer; + const CPWriteDataSourcePtr data_source; + + std::unordered_set locked_files; + WriteStage write_stage = WriteStage::WritingPrefix; + + LoggerPtr log; +}; + +} // namespace DB::PS::V3 diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/CPManifestFileReader.cpp b/dbms/src/Storages/Page/V3/CheckpointFile/CPManifestFileReader.cpp new file mode 100644 index 00000000000..f4747b9243d --- /dev/null +++ b/dbms/src/Storages/Page/V3/CheckpointFile/CPManifestFileReader.cpp @@ -0,0 +1,80 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include + +namespace DB::PS::V3 +{ + +CheckpointProto::ManifestFilePrefix CPManifestFileReader::readPrefix() +{ + RUNTIME_CHECK_MSG(read_stage == ReadStage::ReadingPrefix, "unexpected read stage {}", magic_enum::enum_name(read_stage)); + + CheckpointProto::ManifestFilePrefix ret; + details::readMessageWithLength(*compressed_reader, ret); + read_stage = ReadStage::ReadingEdits; + return ret; +} + +std::optional CPManifestFileReader::readEdits(CheckpointProto::StringsInternMap & strings_map) +{ + if (read_stage == ReadStage::ReadingEditsFinished) + return std::nullopt; + RUNTIME_CHECK_MSG(read_stage == ReadStage::ReadingEdits, "unexpected read stage {}", magic_enum::enum_name(read_stage)); + + CheckpointProto::ManifestFileEditsPart part; + details::readMessageWithLength(*compressed_reader, part); + + if (!part.has_more()) + { + read_stage = ReadStage::ReadingEditsFinished; + return std::nullopt; + } + + universal::PageEntriesEdit edit; + auto & records = edit.getMutRecords(); + for (const auto & remote_rec : part.edits()) + records.emplace_back(universal::PageEntriesEdit::EditRecord::fromProto(remote_rec, strings_map)); + + return edit; +} + +std::optional> CPManifestFileReader::readLocks() +{ + if (read_stage == ReadStage::ReadingLocksFinished) + return std::nullopt; + if (read_stage == ReadStage::ReadingEditsFinished) + read_stage = ReadStage::ReadingLocks; + RUNTIME_CHECK_MSG(read_stage == ReadStage::ReadingLocks, "unexpected read stage {}", magic_enum::enum_name(read_stage)); + + CheckpointProto::ManifestFileLocksPart part; + details::readMessageWithLength(*compressed_reader, part); + + if (!part.has_more()) + { + read_stage = ReadStage::ReadingLocksFinished; + return std::nullopt; + } + + std::unordered_set locks; + locks.reserve(part.locks_size()); + for (const auto & lock : part.locks()) + locks.emplace(lock.name()); + return locks; +} + +} // namespace DB::PS::V3 diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/CPManifestFileReader.h b/dbms/src/Storages/Page/V3/CheckpointFile/CPManifestFileReader.h new file mode 100644 index 00000000000..b5ac404a851 --- /dev/null +++ b/dbms/src/Storages/Page/V3/CheckpointFile/CPManifestFileReader.h @@ -0,0 +1,71 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include + +#include + +namespace DB::PS::V3 +{ + +class CPManifestFileReader : private boost::noncopyable +{ +public: + struct Options + { + const std::string & file_path; + }; + + static CPManifestFileReaderPtr create(Options options) + { + return std::make_unique(std::move(options)); + } + + explicit CPManifestFileReader(Options options) + : file_reader(std::make_unique(options.file_path)) + , compressed_reader(std::make_unique>(*file_reader)) + {} + + CheckpointProto::ManifestFilePrefix readPrefix(); + + /// You should call this function multiple times to read out all edits, until it returns nullopt. + std::optional readEdits(CheckpointProto::StringsInternMap & strings_map); + + /// You should call this function multiple times to read out all locks, until it returns nullopt. + std::optional> readLocks(); + +private: + enum class ReadStage + { + ReadingPrefix, + ReadingEdits, + ReadingEditsFinished, + ReadingLocks, + ReadingLocksFinished, + }; + + // compressed + const std::unique_ptr file_reader; + const ReadBufferPtr compressed_reader; + + ReadStage read_stage = ReadStage::ReadingPrefix; +}; + +} // namespace DB::PS::V3 diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/CPManifestFileWriter.cpp b/dbms/src/Storages/Page/V3/CheckpointFile/CPManifestFileWriter.cpp new file mode 100644 index 00000000000..7839453c6ec --- /dev/null +++ b/dbms/src/Storages/Page/V3/CheckpointFile/CPManifestFileWriter.cpp @@ -0,0 +1,124 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include + +namespace DB::PS::V3 +{ + +void CPManifestFileWriter::writePrefix(const CheckpointProto::ManifestFilePrefix & prefix) +{ + RUNTIME_CHECK_MSG(write_stage == WriteStage::WritingPrefix, "unexpected write stage {}", magic_enum::enum_name(write_stage)); + + details::writeMessageWithLength(*compressed_writer, prefix); + write_stage = WriteStage::WritingEdits; +} + +void CPManifestFileWriter::flush() +{ + compressed_writer->next(); + file_writer->next(); + file_writer->sync(); +} + +void CPManifestFileWriter::writeEdits(const universal::PageEntriesEdit & edit) +{ + if (write_stage != WriteStage::WritingEdits) + RUNTIME_CHECK_MSG(false, "unexpected write stage {}", magic_enum::enum_name(write_stage)); + + if (edit.empty()) + return; + + CheckpointProto::ManifestFileEditsPart part; + part.set_has_more(true); + for (const auto & edit_record : edit.getRecords()) + { + auto * out_record = part.add_edits(); + *out_record = edit_record.toProto(); + } + details::writeMessageWithLength(*compressed_writer, part); +} + +void CPManifestFileWriter::writeEditsFinish() +{ + if (write_stage == WriteStage::WritingEditsFinished) + return; // Ignore calling finish multiple times. + if (write_stage != WriteStage::WritingEdits) + RUNTIME_CHECK_MSG(false, "unexpected write stage {}", magic_enum::enum_name(write_stage)); + + CheckpointProto::ManifestFileEditsPart part; + part.set_has_more(false); + details::writeMessageWithLength(*compressed_writer, part); + + write_stage = WriteStage::WritingEditsFinished; +} + +void CPManifestFileWriter::writeLocks(const std::unordered_set & lock_files) +{ + if (write_stage < WriteStage::WritingEditsFinished) + writeEditsFinish(); // Trying to fast-forward. There may be exceptions. + if (write_stage > WriteStage::WritingLocks) + RUNTIME_CHECK_MSG(false, "unexpected write stage {}", magic_enum::enum_name(write_stage)); + + if (lock_files.empty()) + return; + + CheckpointProto::ManifestFileLocksPart part; + part.set_has_more(true); + for (const auto & lock_file : lock_files) + part.add_locks()->set_name(lock_file); + // Always sort the lock files in order to write out deterministic results. + std::sort( + part.mutable_locks()->begin(), + part.mutable_locks()->end(), + [](const CheckpointProto::LockFile & a, const CheckpointProto::LockFile & b) { + return a.name() < b.name(); + }); + details::writeMessageWithLength(*compressed_writer, part); + + write_stage = WriteStage::WritingLocks; +} + +void CPManifestFileWriter::writeLocksFinish() +{ + if (write_stage == WriteStage::WritingLocksFinished) + return; // Ignore calling finish multiple times. + if (write_stage < WriteStage::WritingEditsFinished) + writeEditsFinish(); // Trying to fast-forward. There may be exceptions. + if (write_stage > WriteStage::WritingLocksFinished) + RUNTIME_CHECK_MSG(false, "unexpected write stage {}", magic_enum::enum_name(write_stage)); + + CheckpointProto::ManifestFileLocksPart part; + part.set_has_more(false); + details::writeMessageWithLength(*compressed_writer, part); + + write_stage = WriteStage::WritingLocksFinished; +} + +void CPManifestFileWriter::writeSuffix() +{ + if (write_stage == WriteStage::WritingFinished) + return; + if (write_stage < WriteStage::WritingLocksFinished) + writeLocksFinish(); // Trying to fast-forward. There may be exceptions. + + // Currently we do nothing in write suffix. + + write_stage = WriteStage::WritingFinished; +} + +} // namespace DB::PS::V3 diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/CPManifestFileWriter.h b/dbms/src/Storages/Page/V3/CheckpointFile/CPManifestFileWriter.h new file mode 100644 index 00000000000..8bfc5bc7500 --- /dev/null +++ b/dbms/src/Storages/Page/V3/CheckpointFile/CPManifestFileWriter.h @@ -0,0 +1,84 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include + +#include + +namespace DB::PS::V3 +{ + +class CPManifestFileWriter : private boost::noncopyable +{ +public: + struct Options + { + const std::string & file_path; + }; + + static CPManifestFileWriterPtr create(Options options) + { + return std::make_unique(std::move(options)); + } + + explicit CPManifestFileWriter(Options options) + : file_writer(std::make_unique(options.file_path)) + , compressed_writer(std::make_unique>(*file_writer, CompressionSettings())) + {} + + ~CPManifestFileWriter() + { + flush(); + } + + /// Must be called first. + void writePrefix(const CheckpointProto::ManifestFilePrefix & prefix); + + /// You can call this function multiple times. It must be called after `writePrefix`. + void writeEdits(const universal::PageEntriesEdit & edit); + void writeEditsFinish(); + + /// You can call this function multiple times. It must be called after `writeEdits`. + void writeLocks(const std::unordered_set & lock_files); + void writeLocksFinish(); + + void writeSuffix(); + + void flush(); + +private: + enum class WriteStage + { + WritingPrefix = 0, + WritingEdits, + WritingEditsFinished, + WritingLocks, + WritingLocksFinished, + WritingFinished, + }; + + // compressed + const std::unique_ptr file_writer; + const WriteBufferPtr compressed_writer; + + WriteStage write_stage = WriteStage::WritingPrefix; +}; + +} // namespace DB::PS::V3 diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/CPWriteDataSource.cpp b/dbms/src/Storages/Page/V3/CheckpointFile/CPWriteDataSource.cpp new file mode 100644 index 00000000000..7841c0791c1 --- /dev/null +++ b/dbms/src/Storages/Page/V3/CheckpointFile/CPWriteDataSource.cpp @@ -0,0 +1,39 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +namespace DB::PS::V3 +{ + +Page CPWriteDataSourceBlobStore::read(const BlobStore::PageIdAndEntry & page_id_and_entry) +{ + return blob_store.read(page_id_and_entry); +} + +Page CPWriteDataSourceFixture::read(const BlobStore::PageIdAndEntry & id_and_entry) +{ + auto it = data.find(id_and_entry.second.offset); + if (it == data.end()) + return Page::invalidPage(); + + auto & value = it->second; + + Page page(1); + page.mem_holder = nullptr; + page.data = ByteBuffer(value.data(), value.data() + value.size()); + return page; +} + +} // namespace DB::PS::V3 diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/CPWriteDataSource.h b/dbms/src/Storages/Page/V3/CheckpointFile/CPWriteDataSource.h new file mode 100644 index 00000000000..dc4a8974ad5 --- /dev/null +++ b/dbms/src/Storages/Page/V3/CheckpointFile/CPWriteDataSource.h @@ -0,0 +1,84 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace DB::PS::V3 +{ + +/** + * The source of data when writing checkpoint data files. + */ +class CPWriteDataSource : private boost::noncopyable +{ +public: + virtual ~CPWriteDataSource() = default; + + virtual Page read(const BlobStore::PageIdAndEntry &) = 0; +}; + +using CPWriteDataSourcePtr = std::shared_ptr; + +/** + * The source of the data comes from a specified BlobStore when writing checkpoint data files. + * + * You need to ensure the BlobStore reference is alive during the lifetime of this data source. + */ +class CPWriteDataSourceBlobStore : public CPWriteDataSource +{ +public: + /** + * The caller must ensure `blob_store` is valid when using with the CPFilesWriter. + */ + explicit CPWriteDataSourceBlobStore(BlobStore & blob_store_) + : blob_store(blob_store_) + {} + + static CPWriteDataSourcePtr create(BlobStore & blob_store_) + { + return std::make_shared(blob_store_); + } + + Page read(const BlobStore::PageIdAndEntry & page_id_and_entry) override; + +private: + BlobStore & blob_store; +}; + +/** + * Should be only useful in tests. You need to specify the data that can be read out when passing different + * BlobStore offset fields. + */ +class CPWriteDataSourceFixture : public CPWriteDataSource +{ +public: + explicit CPWriteDataSourceFixture(const std::unordered_map & data_) + : data(data_) + { + } + + static CPWriteDataSourcePtr create(const std::unordered_map & data_) + { + return std::make_shared(data_); + } + + Page read(const BlobStore::PageIdAndEntry & id_and_entry) override; + +private: + std::unordered_map data; +}; + +} // namespace DB::PS::V3 diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/Proto/CMakeLists.txt b/dbms/src/Storages/Page/V3/CheckpointFile/Proto/CMakeLists.txt new file mode 100644 index 00000000000..392214f73c8 --- /dev/null +++ b/dbms/src/Storages/Page/V3/CheckpointFile/Proto/CMakeLists.txt @@ -0,0 +1,23 @@ +# Copyright 2022 PingCAP, Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +file(GLOB PROTO_FILES CONFIGURE_DEPENDS *.proto) +protobuf_generate_cpp(PROTO_SRCS PB_HEADERS ${PROTO_FILES}) + +add_library(PSCheckpointProto + ${PROTO_SRCS}) +target_include_directories(PSCheckpointProto + PUBLIC ${Protobuf_INCLUDE_DIR}) +target_compile_options(PSCheckpointProto + PRIVATE -Wno-unused-parameter) diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/Proto/common.proto b/dbms/src/Storages/Page/V3/CheckpointFile/Proto/common.proto new file mode 100644 index 00000000000..9954bd95e91 --- /dev/null +++ b/dbms/src/Storages/Page/V3/CheckpointFile/Proto/common.proto @@ -0,0 +1,30 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.#pragma once + +syntax = "proto3"; + +package DB.PS.V3.CheckpointProto; + +message WriterInfo { + uint64 store_id = 1; + string version = 2; + string version_git = 3; + uint64 start_at_ms = 4; + RemoteInfo remote_info = 5; +} + +message RemoteInfo { + string type_name = 1; // e.g. "S3" / "LocalFS" + string name = 2; // Remote-type specific name for description purpose. +} diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/Proto/data_file.proto b/dbms/src/Storages/Page/V3/CheckpointFile/Proto/data_file.proto new file mode 100644 index 00000000000..c53aa015f4d --- /dev/null +++ b/dbms/src/Storages/Page/V3/CheckpointFile/Proto/data_file.proto @@ -0,0 +1,63 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.#pragma once + +syntax = "proto3"; + +package DB.PS.V3.CheckpointProto; + +import "common.proto"; + +/** + This prefix is only for debugging purpose. When reading the data + file, we will just seek to the specified offset and read desired data. + + The content of this prefix is useful when we want to observe and parse this data file. + */ +message DataFilePrefix { + uint64 file_format = 1; + + uint64 local_sequence = 11; + + // The first Checkpoint Manifest File ID that contains this data file. + // The referred manifest file has identical `local_sequence` and `writer_info` apparently. + string manifest_file_id = 12; + + // When data file is too large, one data file will be separated into multiples. + // This records which one it is. + uint32 sub_file_index = 13; + + // The local timestamp this data file is created. + uint64 create_at_ms = 21; + WriterInfo writer_info = 22; +} + +/** + This suffix is only for debugging purpose. When reading the data + file, we will just seek to the specified offset and read desired data. + + The content of this suffix is useful when we want to observe and parse this data file. + */ +message DataFileSuffix { + // Records contained in this data file. + repeated EntryEditRecord records = 1; +} + +message EntryEditRecord { + string page_id = 1; + uint64 version_sequence = 2; + uint64 version_epoch = 3; + + uint64 offset_in_file = 4; + uint64 size_in_file = 5; +} diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/Proto/manifest_file.proto b/dbms/src/Storages/Page/V3/CheckpointFile/Proto/manifest_file.proto new file mode 100644 index 00000000000..491df9dc858 --- /dev/null +++ b/dbms/src/Storages/Page/V3/CheckpointFile/Proto/manifest_file.proto @@ -0,0 +1,88 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.#pragma once + +syntax = "proto3"; + +package DB.PS.V3.CheckpointProto; + +import "common.proto"; + +/** + +A manifest file format V1 consists of the following parts: + +- Len|Prefix +- Len|EditsPart +- Len|LocksPart +... + + */ + +message ManifestFilePrefix { + uint64 file_format = 1; + + uint64 local_sequence = 11; + uint64 last_local_sequence = 12; + + // The local timestamp this manifest file is created. + uint64 create_at_ms = 21; + WriterInfo writer_info = 22; +} + +message ManifestFileEditsPart { + bool has_more = 1; + repeated EditRecord edits = 2; +} + +message ManifestFileLocksPart { + bool has_more = 1; + repeated LockFile locks = 2; +} + +enum EditType { + EDIT_TYPE_UNSPECIFIED = 0; + EDIT_TYPE_ENTRY = 1; + EDIT_TYPE_REF = 2; + EDIT_TYPE_EXTERNAL = 3; + EDIT_TYPE_DELETE = 4; +} + +message EntryDataLocation { + // There could be heavy duplicates. We rely on compression to reduce file size. + string data_file_id = 1; + + uint64 offset_in_file = 2; + uint64 size_in_file = 3; +} + +message EditRecord { + EditType type = 1; + string page_id = 2; + string ori_page_id = 3; + uint64 version_sequence = 4; + uint64 version_epoch = 5; + + EntryDataLocation entry_location = 6; + uint64 entry_size = 7; // The size of the data (uncompressed) + uint64 entry_tag = 8; + uint64 entry_checksum = 9; + repeated uint64 entry_fields_offset = 10; // The offset to the data (not the offset to the file) for each field. + repeated uint64 entry_fields_checksum = 11; +} + +message LockFile { + // There could be heavy duplicates. We rely on compression to reduce file size. + string name = 1; +} + diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/ProtoHelper.cpp b/dbms/src/Storages/Page/V3/CheckpointFile/ProtoHelper.cpp new file mode 100644 index 00000000000..691fcccccb3 --- /dev/null +++ b/dbms/src/Storages/Page/V3/CheckpointFile/ProtoHelper.cpp @@ -0,0 +1,55 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +namespace DB::PS::V3::details +{ + +constexpr UInt64 MAX_SIZE = 1UL * 1024 * 1024 * 1024; + +void readMessageWithLength(ReadBuffer & reader, google::protobuf::MessageLite & msg_out) +{ + UInt64 prefix_size = 0; + readIntBinary(prefix_size, reader); + + // Avoid corrupted data causing OOMs. + RUNTIME_CHECK_MSG(prefix_size < MAX_SIZE, "Expect total message to be < 1GiB, size={}", prefix_size); + + String buf; + buf.resize(prefix_size); + reader.readBig(buf.data(), prefix_size); + + bool ok = msg_out.ParseFromArray(buf.data(), prefix_size); + RUNTIME_CHECK(ok); +} + +void writeMessageWithLength(WriteBuffer & writer, const google::protobuf::MessageLite & msg) +{ + UInt64 prefix_size = msg.ByteSizeLong(); + writeIntBinary(prefix_size, writer); + + auto sz = writer.count(); + OutputStreamWrapper ostream{writer}; + bool ok = msg.SerializeToOstream(&ostream); + + auto sz2 = writer.count(); + RUNTIME_CHECK(ok); + RUNTIME_CHECK(UInt64(sz2 - sz) == prefix_size, sz2 - sz, prefix_size); +} + +} // namespace DB::PS::V3::details \ No newline at end of file diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/ProtoHelper.h b/dbms/src/Storages/Page/V3/CheckpointFile/ProtoHelper.h new file mode 100644 index 00000000000..2b0c3b859de --- /dev/null +++ b/dbms/src/Storages/Page/V3/CheckpointFile/ProtoHelper.h @@ -0,0 +1,28 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +namespace DB::PS::V3::details +{ + +void readMessageWithLength(ReadBuffer & reader, google::protobuf::MessageLite & msg_out); + +void writeMessageWithLength(WriteBuffer & writer, const google::protobuf::MessageLite & msg); + +} // namespace DB::PS::V3::details \ No newline at end of file diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/fwd.h b/dbms/src/Storages/Page/V3/CheckpointFile/fwd.h new file mode 100644 index 00000000000..ff5915889ea --- /dev/null +++ b/dbms/src/Storages/Page/V3/CheckpointFile/fwd.h @@ -0,0 +1,34 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace DB::PS::V3 +{ + +class CPFilesWriter; +using CPFilesWriterPtr = std::unique_ptr; + +class CPManifestFileWriter; +using CPManifestFileWriterPtr = std::unique_ptr; + +class CPDataFileWriter; +using CPDataFileWriterPtr = std::unique_ptr; + +class CPManifestFileReader; +using CPManifestFileReaderPtr = std::unique_ptr; + +} // namespace DB::PS::V3 diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/tests/gtest_file_read_write.cpp b/dbms/src/Storages/Page/V3/CheckpointFile/tests/gtest_file_read_write.cpp new file mode 100644 index 00000000000..456e1e891a3 --- /dev/null +++ b/dbms/src/Storages/Page/V3/CheckpointFile/tests/gtest_file_read_write.cpp @@ -0,0 +1,646 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include + +namespace DB::PS::V3::tests +{ + +class CheckpointFileTest : public DB::base::TiFlashStorageTestBasic +{ +public: + void SetUp() override + { + dir = getTemporaryPath(); + DB::tests::TiFlashTestEnv::tryRemovePath(dir); + createIfNotExist(dir); + } + + std::string readData(const V3::CheckpointLocation & location) + { + RUNTIME_CHECK(location.offset_in_file > 0); + RUNTIME_CHECK(location.data_file_id != nullptr && !location.data_file_id->empty()); + + std::string ret; + ret.resize(location.size_in_file); + + auto buf = ReadBufferFromFile(dir + "/" + *location.data_file_id); + buf.seek(location.offset_in_file); + auto n = buf.readBig(ret.data(), location.size_in_file); + RUNTIME_CHECK(n == location.size_in_file); + + return ret; + } + +protected: + std::string dir; +}; + +TEST_F(CheckpointFileTest, WritePrefixOnly) +try +{ + auto writer = CPFilesWriter::create({ + .data_file_path = dir + "/data_1", + .data_file_id = "data_1", + .manifest_file_path = dir + "/manifest_foo", + .manifest_file_id = "manifest_foo", + .data_source = CPWriteDataSourceFixture::create({}), + }); + + writer->writePrefix({ + .writer = {}, + .sequence = 5, + .last_sequence = 3, + }); + + writer.reset(); + + ASSERT_TRUE(Poco::File(dir + "/data_1").exists()); + ASSERT_TRUE(Poco::File(dir + "/manifest_foo").exists()); + + auto manifest_reader = CPManifestFileReader::create({ + .file_path = dir + "/manifest_foo", + }); + auto prefix = manifest_reader->readPrefix(); + ASSERT_EQ(5, prefix.local_sequence()); + ASSERT_EQ(3, prefix.last_local_sequence()); +} +CATCH + +TEST_F(CheckpointFileTest, WriteEditsWithoutPrefix) +try +{ + auto writer = CPFilesWriter::create({ + .data_file_path = dir + "/data_1", + .data_file_id = "data_1", + .manifest_file_path = dir + "/manifest_foo", + .manifest_file_id = "manifest_foo", + .data_source = CPWriteDataSourceFixture::create({}), + }); + + auto edits = universal::PageEntriesEdit{}; + edits.appendRecord({.type = EditRecordType::DEL}); + + ASSERT_THROW({ + writer->writeEditsAndApplyRemoteInfo(edits); + }, + DB::Exception); +} +CATCH + +TEST_F(CheckpointFileTest, WriteEdits) +try +{ + auto writer = CPFilesWriter::create({ + .data_file_path = dir + "/data_1", + .data_file_id = "data_1", + .manifest_file_path = dir + "/manifest_foo", + .manifest_file_id = "manifest_foo", + .data_source = CPWriteDataSourceFixture::create({}), + }); + + writer->writePrefix({ + .writer = {}, + .sequence = 5, + .last_sequence = 3, + }); + { + auto edits = universal::PageEntriesEdit{}; + edits.appendRecord({.type = EditRecordType::VAR_DELETE, .page_id = "water"}); + writer->writeEditsAndApplyRemoteInfo(edits); + } + writer->writeSuffix(); + writer.reset(); + + auto manifest_reader = CPManifestFileReader::create({ + .file_path = dir + "/manifest_foo", + }); + auto prefix = manifest_reader->readPrefix(); + CheckpointProto::StringsInternMap im; + { + auto edits_r = manifest_reader->readEdits(im); + ASSERT_TRUE(edits_r.has_value()); + ASSERT_EQ(1, edits_r->size()); + ASSERT_EQ("water", edits_r->getRecords()[0].page_id); + ASSERT_EQ(EditRecordType::VAR_DELETE, edits_r->getRecords()[0].type); + } + { + auto edits_r = manifest_reader->readEdits(im); + ASSERT_FALSE(edits_r.has_value()); + } + { + auto edits_r = manifest_reader->readEdits(im); + ASSERT_FALSE(edits_r.has_value()); + } + { + auto locks = manifest_reader->readLocks(); + ASSERT_FALSE(locks.has_value()); + } + { + auto locks = manifest_reader->readLocks(); + ASSERT_FALSE(locks.has_value()); + } +} +CATCH + +TEST_F(CheckpointFileTest, WriteMultipleEdits) +try +{ + auto writer = CPFilesWriter::create({ + .data_file_path = dir + "/data_1", + .data_file_id = "data_1", + .manifest_file_path = dir + "/manifest_foo", + .manifest_file_id = "manifest_foo", + .data_source = CPWriteDataSourceFixture::create({{5, "Said she just dreamed a dream"}, + {10, "nahida opened her eyes"}}), + }); + + writer->writePrefix({ + .writer = {}, + .sequence = 5, + .last_sequence = 3, + }); + { + auto edits = universal::PageEntriesEdit{}; + edits.appendRecord({.type = EditRecordType::VAR_DELETE, .page_id = "water"}); + writer->writeEditsAndApplyRemoteInfo(edits); + } + { + auto edits = universal::PageEntriesEdit{}; + edits.appendRecord({.type = EditRecordType::VAR_ENTRY, .page_id = "abc", .entry = {.size = 29, .offset = 5}}); + edits.appendRecord({.type = EditRecordType::VAR_REF, .page_id = "foo", .ori_page_id = "abc"}); + edits.appendRecord({.type = EditRecordType::VAR_ENTRY, .page_id = "aaabbb", .entry = {.size = 22, .offset = 10}}); + edits.appendRecord({.type = EditRecordType::VAR_DELETE, .page_id = "rain"}); + writer->writeEditsAndApplyRemoteInfo(edits); + } + writer->writeSuffix(); + writer.reset(); + + auto manifest_reader = CPManifestFileReader::create({ + .file_path = dir + "/manifest_foo", + }); + manifest_reader->readPrefix(); + CheckpointProto::StringsInternMap im; + { + auto edits_r = manifest_reader->readEdits(im); + ASSERT_TRUE(edits_r.has_value()); + ASSERT_EQ(1, edits_r->size()); + ASSERT_EQ(EditRecordType::VAR_DELETE, edits_r->getRecords()[0].type); + ASSERT_EQ("water", edits_r->getRecords()[0].page_id); + } + { + auto edits_r = manifest_reader->readEdits(im); + auto r = edits_r->getRecords(); + ASSERT_EQ(4, r.size()); + + ASSERT_EQ(EditRecordType::VAR_ENTRY, r[0].type); + ASSERT_EQ("abc", r[0].page_id); + ASSERT_EQ(0, r[0].entry.offset); // The deserialized offset is not the same as the original one! + ASSERT_EQ(29, r[0].entry.size); + ASSERT_TRUE(r[0].entry.checkpoint_info->is_local_data_reclaimed); + ASSERT_EQ("data_1", *r[0].entry.checkpoint_info->data_location.data_file_id); + ASSERT_EQ("Said she just dreamed a dream", readData(r[0].entry.checkpoint_info->data_location)); + + ASSERT_EQ(EditRecordType::VAR_REF, r[1].type); + ASSERT_EQ("foo", r[1].page_id); + ASSERT_EQ("abc", r[1].ori_page_id); + + ASSERT_EQ(EditRecordType::VAR_ENTRY, r[2].type); + ASSERT_EQ("aaabbb", r[2].page_id); + ASSERT_EQ(0, r[2].entry.offset); + ASSERT_EQ(22, r[2].entry.size); + ASSERT_TRUE(r[2].entry.checkpoint_info->is_local_data_reclaimed); + ASSERT_EQ("data_1", *r[2].entry.checkpoint_info->data_location.data_file_id); + ASSERT_EQ("nahida opened her eyes", readData(r[2].entry.checkpoint_info->data_location)); + + ASSERT_EQ(EditRecordType::VAR_DELETE, r[3].type); + ASSERT_EQ("rain", r[3].page_id); + + // Check data_file_id is shared. + ASSERT_EQ( + r[0].entry.checkpoint_info->data_location.data_file_id->data(), + r[2].entry.checkpoint_info->data_location.data_file_id->data()); + } + { + auto edits_r = manifest_reader->readEdits(im); + ASSERT_FALSE(edits_r.has_value()); + } + { + auto locks = manifest_reader->readLocks(); + ASSERT_TRUE(locks.has_value()); + ASSERT_EQ(1, locks->size()); + ASSERT_EQ(1, locks->count("data_1")); + } + { + auto locks = manifest_reader->readLocks(); + ASSERT_FALSE(locks.has_value()); + } +} +CATCH + +TEST_F(CheckpointFileTest, WriteEditsWithCheckpointInfo) +try +{ + auto writer = CPFilesWriter::create({ + .data_file_path = dir + "/data_1", + .data_file_id = "data_1", + .manifest_file_path = dir + "/manifest_foo", + .manifest_file_id = "manifest_foo", + .data_source = CPWriteDataSourceFixture::create({{10, "nahida opened her eyes"}}), + }); + + writer->writePrefix({ + .writer = {}, + .sequence = 5, + .last_sequence = 3, + }); + { + auto edits = universal::PageEntriesEdit{}; + edits.appendRecord({ + .type = EditRecordType::VAR_ENTRY, + .page_id = "abc", + .entry = { + .size = 10, + .offset = 5, + .checkpoint_info = CheckpointInfo{ + .data_location = { + .data_file_id = std::make_shared("my_file_id"), + }, + .is_local_data_reclaimed = false, + }, + }, + }); + edits.appendRecord({.type = EditRecordType::VAR_REF, .page_id = "foo", .ori_page_id = "abc"}); + edits.appendRecord({.type = EditRecordType::VAR_ENTRY, .page_id = "aaabbb", .entry = {.size = 22, .offset = 10}}); + edits.appendRecord({.type = EditRecordType::VAR_DELETE, .page_id = "sun"}); + writer->writeEditsAndApplyRemoteInfo(edits); + } + writer->writeSuffix(); + writer.reset(); + + auto manifest_reader = CPManifestFileReader::create({ + .file_path = dir + "/manifest_foo", + }); + manifest_reader->readPrefix(); + CheckpointProto::StringsInternMap im; + { + auto edits_r = manifest_reader->readEdits(im); + auto r = edits_r->getRecords(); + ASSERT_EQ(4, r.size()); + + ASSERT_EQ(EditRecordType::VAR_ENTRY, r[0].type); + ASSERT_EQ("abc", r[0].page_id); + ASSERT_EQ(0, r[0].entry.offset); // The deserialized offset is not the same as the original one! + ASSERT_EQ(10, r[0].entry.size); + ASSERT_TRUE(r[0].entry.checkpoint_info->is_local_data_reclaimed); // After deserialization, this field is always true! + ASSERT_EQ("my_file_id", *r[0].entry.checkpoint_info->data_location.data_file_id); + + ASSERT_EQ(EditRecordType::VAR_REF, r[1].type); + ASSERT_EQ("foo", r[1].page_id); + ASSERT_EQ("abc", r[1].ori_page_id); + + ASSERT_EQ(EditRecordType::VAR_ENTRY, r[2].type); + ASSERT_EQ("aaabbb", r[2].page_id); + ASSERT_EQ(0, r[2].entry.offset); + ASSERT_EQ(22, r[2].entry.size); + ASSERT_TRUE(r[2].entry.checkpoint_info->is_local_data_reclaimed); + ASSERT_EQ("data_1", *r[2].entry.checkpoint_info->data_location.data_file_id); + ASSERT_EQ("nahida opened her eyes", readData(r[2].entry.checkpoint_info->data_location)); + + ASSERT_EQ(EditRecordType::VAR_DELETE, r[3].type); + ASSERT_EQ("sun", r[3].page_id); + } + { + auto edits_r = manifest_reader->readEdits(im); + ASSERT_FALSE(edits_r.has_value()); + } + { + auto locks = manifest_reader->readLocks(); + ASSERT_TRUE(locks.has_value()); + ASSERT_EQ(2, locks->size()); + ASSERT_EQ(1, locks->count("data_1")); + ASSERT_EQ(1, locks->count("my_file_id")); + } + { + auto locks = manifest_reader->readLocks(); + ASSERT_FALSE(locks.has_value()); + } +} +CATCH + +TEST_F(CheckpointFileTest, FromBlobStore) +try +{ + const auto delegator = std::make_shared(std::vector{dir}); + const auto file_provider = DB::tests::TiFlashTestEnv::getContext().getFileProvider(); + auto blob_store = BlobStore(getCurrentTestName(), file_provider, delegator, BlobConfig{}); + + auto edits = universal::PageEntriesEdit{}; + { + UniversalWriteBatch wb; + wb.putPage("page_foo", 0, "The flower carriage rocked", {4, 10, 12}); + wb.delPage("id_bar"); + wb.putPage("page_abc", 0, "Dreamed of the day that she was born"); + auto blob_store_edits = blob_store.write(wb, nullptr); + + ASSERT_EQ(blob_store_edits.size(), 3); + + edits.appendRecord({.type = EditRecordType::VAR_ENTRY, .page_id = "page_foo", .entry = blob_store_edits.getRecords()[0].entry}); + edits.appendRecord({.type = EditRecordType::VAR_DELETE, .page_id = "id_bar"}); + edits.appendRecord({.type = EditRecordType::VAR_ENTRY, .page_id = "page_abc", .entry = blob_store_edits.getRecords()[2].entry}); + } + + auto writer = CPFilesWriter::create({ + .data_file_path = dir + "/data_1", + .data_file_id = "data_1", + .manifest_file_path = dir + "/manifest_foo", + .manifest_file_id = "manifest_foo", + .data_source = CPWriteDataSourceBlobStore::create(blob_store), + }); + writer->writePrefix({ + .writer = {}, + .sequence = 5, + .last_sequence = 3, + }); + writer->writeEditsAndApplyRemoteInfo(edits); + writer->writeSuffix(); + writer.reset(); + + auto manifest_reader = CPManifestFileReader::create({ + .file_path = dir + "/manifest_foo", + }); + manifest_reader->readPrefix(); + CheckpointProto::StringsInternMap im; + { + auto edits_r = manifest_reader->readEdits(im); + auto r = edits_r->getRecords(); + ASSERT_EQ(3, r.size()); + + ASSERT_EQ(EditRecordType::VAR_ENTRY, r[0].type); + ASSERT_EQ("page_foo", r[0].page_id); + ASSERT_EQ(0, r[0].entry.offset); + ASSERT_EQ(26, r[0].entry.size); + ASSERT_TRUE(r[0].entry.checkpoint_info->is_local_data_reclaimed); + ASSERT_EQ("The flower carriage rocked", readData(r[0].entry.checkpoint_info->data_location)); + + int begin, end; + std::tie(begin, end) = r[0].entry.getFieldOffsets(0); + ASSERT_EQ(std::make_pair(0, 4), std::make_pair(begin, end)); + std::tie(begin, end) = r[0].entry.getFieldOffsets(1); + ASSERT_EQ(std::make_pair(4, 14), std::make_pair(begin, end)); + std::tie(begin, end) = r[0].entry.getFieldOffsets(2); + ASSERT_EQ(std::make_pair(14, 26), std::make_pair(begin, end)); + ASSERT_EQ(4, r[0].entry.getFieldSize(0)); + ASSERT_EQ(10, r[0].entry.getFieldSize(1)); + ASSERT_EQ(12, r[0].entry.getFieldSize(2)); + + ASSERT_EQ(EditRecordType::VAR_DELETE, r[1].type); + ASSERT_EQ("id_bar", r[1].page_id); + + ASSERT_EQ(EditRecordType::VAR_ENTRY, r[2].type); + ASSERT_EQ("page_abc", r[2].page_id); + ASSERT_EQ(0, r[2].entry.offset); + ASSERT_EQ(36, r[2].entry.size); + ASSERT_TRUE(r[2].entry.checkpoint_info->is_local_data_reclaimed); + ASSERT_EQ("Dreamed of the day that she was born", readData(r[2].entry.checkpoint_info->data_location)); + } + EXPECT_THROW({ + // Call readLocks without draining readEdits should result in exceptions + manifest_reader->readLocks(); + }, + DB::Exception); + { + auto edits_r = manifest_reader->readEdits(im); + ASSERT_FALSE(edits_r.has_value()); + } + { + auto locks = manifest_reader->readLocks(); + ASSERT_TRUE(locks.has_value()); + ASSERT_EQ(1, locks->size()); + ASSERT_EQ(1, locks->count("data_1")); + } + { + auto locks = manifest_reader->readLocks(); + ASSERT_FALSE(locks.has_value()); + } +} +CATCH + +TEST_F(CheckpointFileTest, WriteEmptyEdits) +try +{ + auto writer = CPFilesWriter::create({ + .data_file_path = dir + "/data_1", + .data_file_id = "data_1", + .manifest_file_path = dir + "/manifest_foo", + .manifest_file_id = "manifest_foo", + .data_source = CPWriteDataSourceFixture::create({}), + }); + + writer->writePrefix({ + .writer = {}, + .sequence = 5, + .last_sequence = 3, + }); + { + auto edits = universal::PageEntriesEdit{}; + writer->writeEditsAndApplyRemoteInfo(edits); + } + { + auto edits = universal::PageEntriesEdit{}; + edits.appendRecord({.type = EditRecordType::VAR_DELETE, .page_id = "snow"}); + writer->writeEditsAndApplyRemoteInfo(edits); + } + writer->writeSuffix(); + writer.reset(); + + auto manifest_reader = CPManifestFileReader::create({ + .file_path = dir + "/manifest_foo", + }); + manifest_reader->readPrefix(); + CheckpointProto::StringsInternMap im; + + { + auto edits_r = manifest_reader->readEdits(im); + auto r = edits_r->getRecords(); + ASSERT_EQ(1, r.size()); + ASSERT_EQ(EditRecordType::VAR_DELETE, r[0].type); + } + { + auto edits_r = manifest_reader->readEdits(im); + ASSERT_FALSE(edits_r.has_value()); + } + { + auto locks = manifest_reader->readLocks(); + ASSERT_FALSE(locks.has_value()); + } +} +CATCH + +TEST_F(CheckpointFileTest, WriteEditsNotCalled) +try +{ + auto writer = CPFilesWriter::create({ + .data_file_path = dir + "/data_1", + .data_file_id = "data_1", + .manifest_file_path = dir + "/manifest_foo", + .manifest_file_id = "manifest_foo", + .data_source = CPWriteDataSourceFixture::create({}), + }); + + writer->writePrefix({ + .writer = {}, + .sequence = 5, + .last_sequence = 3, + }); + writer->writeSuffix(); + writer.reset(); + + auto manifest_reader = CPManifestFileReader::create({ + .file_path = dir + "/manifest_foo", + }); + manifest_reader->readPrefix(); + CheckpointProto::StringsInternMap im; + { + auto edits_r = manifest_reader->readEdits(im); + ASSERT_FALSE(edits_r.has_value()); + } + { + auto locks = manifest_reader->readLocks(); + ASSERT_FALSE(locks.has_value()); + } +} +CATCH + +TEST_F(CheckpointFileTest, PreDefinedLocks) +try +{ + auto writer = CPFilesWriter::create({ + .data_file_path = dir + "/data_1", + .data_file_id = "data_1", + .manifest_file_path = dir + "/manifest_foo", + .manifest_file_id = "manifest_foo", + .data_source = CPWriteDataSourceFixture::create({{10, "nahida opened her eyes"}}), + .must_locked_files = {"f1", "fx"}, + }); + + writer->writePrefix({ + .writer = {}, + .sequence = 5, + .last_sequence = 3, + }); + { + auto edits = universal::PageEntriesEdit{}; + edits.appendRecord({ + .type = EditRecordType::VAR_ENTRY, + .page_id = "abc", + .entry = { + .offset = 5, + .checkpoint_info = CheckpointInfo{ + .data_location = { + .data_file_id = std::make_shared("my_file_id"), + }, + .is_local_data_reclaimed = false, + }, + }, + }); + edits.appendRecord({.type = EditRecordType::VAR_ENTRY, .page_id = "aaabbb", .entry = {.size = 22, .offset = 10}}); + writer->writeEditsAndApplyRemoteInfo(edits); + } + writer->writeSuffix(); + writer.reset(); + + auto manifest_reader = CPManifestFileReader::create({ + .file_path = dir + "/manifest_foo", + }); + manifest_reader->readPrefix(); + CheckpointProto::StringsInternMap im; + { + auto edits_r = manifest_reader->readEdits(im); + auto r = edits_r->getRecords(); + ASSERT_EQ(2, r.size()); + } + { + auto edits_r = manifest_reader->readEdits(im); + ASSERT_FALSE(edits_r.has_value()); + } + { + auto locks = manifest_reader->readLocks(); + ASSERT_TRUE(locks.has_value()); + ASSERT_EQ(4, locks->size()); + ASSERT_EQ(1, locks->count("f1")); + ASSERT_EQ(1, locks->count("fx")); + ASSERT_EQ(1, locks->count("my_file_id")); + ASSERT_EQ(1, locks->count("data_1")); + } + { + auto locks = manifest_reader->readLocks(); + ASSERT_FALSE(locks.has_value()); + } +} +CATCH + +TEST_F(CheckpointFileTest, LotsOfEdits) +try +{ + auto writer = CPFilesWriter::create({ + .data_file_path = dir + "/data_1", + .data_file_id = "this-is-simply-a-very-long-data-file-id-7b768082-db43-4e65-a0fb-d34645200298", + .manifest_file_path = dir + "/manifest_1", + .manifest_file_id = "manifest_1", + .data_source = CPWriteDataSourceFixture::create({{10, "nahida opened her eyes"}}), + }); + + writer->writePrefix({ + .writer = {}, + .sequence = 5, + .last_sequence = 3, + }); + { + auto edits = universal::PageEntriesEdit{}; + for (size_t i = 0; i < 10000; ++i) + edits.appendRecord({ + .type = EditRecordType::VAR_ENTRY, + .page_id = fmt::format("record_{}", i), + .entry = {.size = 22, .offset = 10}, + }); + writer->writeEditsAndApplyRemoteInfo(edits); + } + writer->writeSuffix(); + writer.reset(); + + auto manifest_reader = CPManifestFileReader::create({ + .file_path = dir + "/manifest_1", + }); + manifest_reader->readPrefix(); + CheckpointProto::StringsInternMap im; + { + auto edits_r = manifest_reader->readEdits(im); + auto r = edits_r->getRecords(); + ASSERT_EQ(10000, r.size()); + } + { + auto edits_r = manifest_reader->readEdits(im); + ASSERT_FALSE(edits_r.has_value()); + } +} +CATCH + +} // namespace DB::PS::V3::tests diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/tests/gtest_proto_helper.cpp b/dbms/src/Storages/Page/V3/CheckpointFile/tests/gtest_proto_helper.cpp new file mode 100644 index 00000000000..9b55f4f14f9 --- /dev/null +++ b/dbms/src/Storages/Page/V3/CheckpointFile/tests/gtest_proto_helper.cpp @@ -0,0 +1,57 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include + +namespace DB::PS::V3::tests +{ + +TEST(ProtoHelperTest, WriteAndRead) +try +{ + WriteBufferFromOwnString buf; + auto writer = std::make_unique>(buf, CompressionSettings()); + + CheckpointProto::EditRecord r; + r.set_page_id("foo"); + r.set_version_epoch(4); + details::writeMessageWithLength(*writer, r); + + r.Clear(); + r.set_page_id("bar box"); + details::writeMessageWithLength(*writer, r); + writer->next(); + + ReadBufferFromString read_buf(buf.str()); + auto reader = std::make_unique>(read_buf); + + CheckpointProto::EditRecord r2; + details::readMessageWithLength(*reader, r2); + ASSERT_EQ("foo", r2.page_id()); + ASSERT_EQ(4, r2.version_epoch()); + + r2.Clear(); + details::readMessageWithLength(*reader, r2); + ASSERT_EQ("bar box", r2.page_id()); + ASSERT_EQ(0, r2.version_epoch()); +} +CATCH + +} // namespace DB::PS::V3::tests diff --git a/dbms/src/Storages/Page/V3/PageEntriesEdit.cpp b/dbms/src/Storages/Page/V3/PageEntriesEdit.cpp new file mode 100644 index 00000000000..2dd8ea7b2a5 --- /dev/null +++ b/dbms/src/Storages/Page/V3/PageEntriesEdit.cpp @@ -0,0 +1,80 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +namespace DB::PS::V3 +{ + +template <> +CheckpointProto::EditRecord PageEntriesEdit::EditRecord::toProto() const +{ + CheckpointProto::EditRecord proto_edit; + proto_edit.set_type(typeToProto(type)); + proto_edit.set_page_id(page_id.asStr()); + proto_edit.set_ori_page_id(ori_page_id.asStr()); + proto_edit.set_version_sequence(version.sequence); + proto_edit.set_version_epoch(version.epoch); + if (type == EditRecordType::VAR_ENTRY) + { + RUNTIME_CHECK(entry.checkpoint_info.has_value()); + proto_edit.set_entry_size(entry.size); + proto_edit.set_entry_tag(entry.tag); + proto_edit.set_entry_checksum(entry.checksum); + proto_edit.mutable_entry_location()->CopyFrom(entry.checkpoint_info->data_location.toProto()); + for (const auto & [offset, checksum] : entry.field_offsets) + { + proto_edit.add_entry_fields_offset(offset); + proto_edit.add_entry_fields_checksum(checksum); + } + } + return proto_edit; +} + +template <> +typename PageEntriesEdit::EditRecord PageEntriesEdit::EditRecord::fromProto( + const CheckpointProto::EditRecord & proto_edit, + CheckpointProto::StringsInternMap & strings_map) +{ + EditRecord rec; + rec.type = typeFromProto(proto_edit.type()); + rec.page_id = UniversalPageId(proto_edit.page_id()); + rec.ori_page_id = UniversalPageId(proto_edit.ori_page_id()); + rec.version.sequence = proto_edit.version_sequence(); + rec.version.epoch = proto_edit.version_epoch(); + rec.being_ref_count = 1; + if (rec.type == EditRecordType::VAR_ENTRY) + { + rec.entry.checkpoint_info = CheckpointInfo{ + .data_location = CheckpointLocation::fromProto(proto_edit.entry_location(), strings_map), + .is_local_data_reclaimed = true, + }; + rec.entry.size = proto_edit.entry_size(); + rec.entry.checksum = proto_edit.entry_checksum(); + rec.entry.tag = proto_edit.entry_tag(); + RUNTIME_CHECK(proto_edit.entry_fields_offset_size() == proto_edit.entry_fields_checksum_size()); + auto sz = proto_edit.entry_fields_offset_size(); + for (int i = 0; i < sz; ++i) + { + rec.entry.field_offsets.emplace_back(std::make_pair( + proto_edit.entry_fields_offset(i), + proto_edit.entry_fields_checksum(i))); + } + // Note: rec.entry.* is untouched, leaving zero value. + // We need to take care when restoring the PS instance. + } + return rec; +} + +} // namespace DB::PS::V3 diff --git a/dbms/src/Storages/Page/V3/PageEntriesEdit.h b/dbms/src/Storages/Page/V3/PageEntriesEdit.h index efb6d0ddfd4..3c8a11bf749 100644 --- a/dbms/src/Storages/Page/V3/PageEntriesEdit.h +++ b/dbms/src/Storages/Page/V3/PageEntriesEdit.h @@ -16,14 +16,18 @@ #include #include +#include #include #include #include #include #include +#include + namespace DB::PS::V3 { + // `PageDirectory::apply` with create a version={directory.sequence, epoch=0}. // After data compaction and page entries need to be updated, will create // some entries with a version={old_sequence, epoch=old_epoch+1}. @@ -115,6 +119,40 @@ inline const char * typeToString(EditRecordType t) } } +inline CheckpointProto::EditType typeToProto(EditRecordType t) +{ + switch (t) + { + case EditRecordType::VAR_ENTRY: + return CheckpointProto::EDIT_TYPE_ENTRY; + case EditRecordType::VAR_REF: + return CheckpointProto::EDIT_TYPE_REF; + case EditRecordType::VAR_EXTERNAL: + return CheckpointProto::EDIT_TYPE_EXTERNAL; + case EditRecordType::VAR_DELETE: + return CheckpointProto::EDIT_TYPE_DELETE; + default: + RUNTIME_CHECK_MSG(false, "Unsupported Edit Type {}", magic_enum::enum_name(t)); + } +} + +inline EditRecordType typeFromProto(CheckpointProto::EditType t) +{ + switch (t) + { + case CheckpointProto::EDIT_TYPE_ENTRY: + return EditRecordType::VAR_ENTRY; + case CheckpointProto::EDIT_TYPE_REF: + return EditRecordType::VAR_REF; + case CheckpointProto::EDIT_TYPE_EXTERNAL: + return EditRecordType::VAR_EXTERNAL; + case CheckpointProto::EDIT_TYPE_DELETE: + return EditRecordType::VAR_DELETE; + default: + RUNTIME_CHECK_MSG(false, "Unsupported Proto Edit Type {}", magic_enum::enum_name(t)); + } +} + /// Page entries change to apply to PageDirectory template class PageEntriesEdit @@ -228,6 +266,12 @@ class PageEntriesEdit PageVersion version; PageEntryV3 entry; Int64 being_ref_count{1}; + + CheckpointProto::EditRecord toProto() const; + + static EditRecord fromProto( + const CheckpointProto::EditRecord & edit_rec, + CheckpointProto::StringsInternMap & strings_map); }; using EditRecords = std::vector; diff --git a/dbms/src/Storages/Page/V3/PageEntry.h b/dbms/src/Storages/Page/V3/PageEntry.h index 6259e77182b..609d697c616 100644 --- a/dbms/src/Storages/Page/V3/PageEntry.h +++ b/dbms/src/Storages/Page/V3/PageEntry.h @@ -16,6 +16,7 @@ #include #include +#include #include namespace DB @@ -37,6 +38,12 @@ struct PageEntryV3 BlobFileOffset offset = 0; // The offset of page data in file UInt64 checksum = 0; // The checksum of whole page data + /** + * Whether this page entry's data is stored in a checkpoint and where it is stored. + * If this page entry is not stored in a checkpoint file, this field is nullopt. + */ + std::optional checkpoint_info = std::nullopt; + // The offset to the beginning of specify field. PageFieldOffsetChecksums field_offsets{}; diff --git a/dbms/src/Storages/Page/V3/PageEntryCheckpointInfo.cpp b/dbms/src/Storages/Page/V3/PageEntryCheckpointInfo.cpp new file mode 100644 index 00000000000..84691d02787 --- /dev/null +++ b/dbms/src/Storages/Page/V3/PageEntryCheckpointInfo.cpp @@ -0,0 +1,53 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +namespace DB::PS::V3 +{ + +CheckpointProto::EntryDataLocation CheckpointLocation::toProto() const +{ + CheckpointProto::EntryDataLocation proto_rec; + proto_rec.set_data_file_id(*data_file_id); + proto_rec.set_offset_in_file(offset_in_file); + proto_rec.set_size_in_file(size_in_file); + return proto_rec; +} + +CheckpointLocation CheckpointLocation::fromProto( + const CheckpointProto::EntryDataLocation & proto_rec, + CheckpointProto::StringsInternMap & strings_map) +{ + // Try to reuse the same data_file_id from the string intern map. + // Usually a lot of entries are placed in the same data file. This reduces memory overhead. + std::shared_ptr data_file_id = nullptr; + if (auto it = strings_map.find(proto_rec.data_file_id()); it != strings_map.end()) + { + data_file_id = it->second; + } + else + { + data_file_id = std::make_shared(proto_rec.data_file_id()); + strings_map.try_emplace(*data_file_id, data_file_id); + } + + CheckpointLocation val; + val.data_file_id = data_file_id; + val.offset_in_file = proto_rec.offset_in_file(); + val.size_in_file = proto_rec.size_in_file(); + return val; +} + +} // namespace DB::PS::V3 diff --git a/dbms/src/Storages/Page/V3/PageEntryCheckpointInfo.h b/dbms/src/Storages/Page/V3/PageEntryCheckpointInfo.h new file mode 100644 index 00000000000..c42a3cb49b7 --- /dev/null +++ b/dbms/src/Storages/Page/V3/PageEntryCheckpointInfo.h @@ -0,0 +1,58 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace DB::PS::V3 +{ + +namespace CheckpointProto +{ +using StringsInternMap = std::unordered_map>; +} + +struct CheckpointLocation +{ + // This struct is highly coupled with manifest_file.proto -> EditEntry. + + std::shared_ptr data_file_id; + + uint64_t offset_in_file = 0; + uint64_t size_in_file = 0; + + CheckpointProto::EntryDataLocation toProto() const; + + /** + * @param strings_map A modifyable map. This function will try to reuse strings in the intern map + * or insert new strings into the intern map to share memory for the same string. + */ + static CheckpointLocation fromProto( + const CheckpointProto::EntryDataLocation & proto_rec, + CheckpointProto::StringsInternMap & strings_map); +}; + +struct CheckpointInfo +{ + CheckpointLocation data_location; + + /** + * Whether the PageEntry's local BlobData has been reclaimed. + * If the data is reclaimed, you can only read out its data from the checkpoint. + */ + bool is_local_data_reclaimed = false; +}; + +} // namespace DB::PS::V3 \ No newline at end of file diff --git a/dbms/src/Storages/Page/V3/Universal/UniversalPageId.h b/dbms/src/Storages/Page/V3/Universal/UniversalPageId.h index 2d097b4b7b9..d2247af98bd 100644 --- a/dbms/src/Storages/Page/V3/Universal/UniversalPageId.h +++ b/dbms/src/Storages/Page/V3/Universal/UniversalPageId.h @@ -106,3 +106,17 @@ struct fmt::formatter return format_to(ctx.out(), "{}", DB::details::UniversalPageIdFormatHelper::format(value)); } }; + +namespace std +{ + +template <> +struct hash +{ + std::size_t operator()(const DB::UniversalPageId & k) const + { + return hash()(k.asStr()); + } +}; + +} // namespace std diff --git a/dbms/src/Storages/Page/V3/Universal/UniversalWriteBatchImpl.h b/dbms/src/Storages/Page/V3/Universal/UniversalWriteBatchImpl.h index 0906dc918bc..162968d800f 100644 --- a/dbms/src/Storages/Page/V3/Universal/UniversalWriteBatchImpl.h +++ b/dbms/src/Storages/Page/V3/Universal/UniversalWriteBatchImpl.h @@ -94,10 +94,10 @@ class UniversalWriteBatch : private boost::noncopyable writes.emplace_back(std::move(w)); } - void putPage(const UniversalPageId & page_id, UInt64 tag, std::string_view data) + void putPage(const UniversalPageId & page_id, UInt64 tag, std::string_view data, const PageFieldSizes & data_sizes = {}) { auto buffer_ptr = std::make_shared(data); - putPage(page_id, tag, buffer_ptr, data.size()); + putPage(page_id, tag, buffer_ptr, data.size(), data_sizes); } void putExternal(const UniversalPageId & page_id, UInt64 tag) diff --git a/dbms/src/Storages/Page/WriteBatchImpl.h b/dbms/src/Storages/Page/WriteBatchImpl.h index b09a1922c2d..a7b54924dce 100644 --- a/dbms/src/Storages/Page/WriteBatchImpl.h +++ b/dbms/src/Storages/Page/WriteBatchImpl.h @@ -110,10 +110,10 @@ class WriteBatch : private boost::noncopyable writes.emplace_back(std::move(w)); } - void putPage(PageIdU64 page_id, UInt64 tag, std::string_view data) + void putPage(PageIdU64 page_id, UInt64 tag, std::string_view data, const PageFieldSizes & data_sizes = {}) { auto buffer_ptr = std::make_shared(data); - putPage(page_id, tag, buffer_ptr, data.size()); + putPage(page_id, tag, buffer_ptr, data.size(), data_sizes); } void putExternal(PageIdU64 page_id, UInt64 tag)