From ae1db913b62d27e87067f8900206731b3c0b9ddc Mon Sep 17 00:00:00 2001 From: yibin Date: Tue, 12 Dec 2023 16:05:19 +0800 Subject: [PATCH 1/8] Add one more tsan.suppression Signed-off-by: yibin --- tests/sanitize/tsan.suppression | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/sanitize/tsan.suppression b/tests/sanitize/tsan.suppression index bcdf61be0d4..29ac8020569 100644 --- a/tests/sanitize/tsan.suppression +++ b/tests/sanitize/tsan.suppression @@ -1,5 +1,6 @@ race:dbms/src/Common/TiFlashMetrics.h race:DB::Context::setCancelTest race:DB::getCurrentExceptionMessage +race:google::protobuf::internal::AssignDescriptors race:fiu_fail race:dbms/src/DataStreams/BlockStreamProfileInfo.h From aa1139478411bfad07ab3c110f69249110bad9a6 Mon Sep 17 00:00:00 2001 From: SeaRise Date: Wed, 13 Dec 2023 15:25:51 +0800 Subject: [PATCH 2/8] Refine cancel for read thread stream (#8511) ref pingcap/tiflash#8505 --- .../ReadThread/UnorderedInputStream.h | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h index a887f187587..c69a9f1cb3d 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h @@ -46,11 +46,9 @@ class UnorderedInputStream : public IProfilingBlockInputStream LOG_DEBUG(log, "Created, pool_id={} ref_no={}", task_pool->pool_id, ref_no); } - ~UnorderedInputStream() override - { - task_pool->decreaseUnorderedInputStreamRefCount(); - LOG_DEBUG(log, "Destroy, pool_id={} ref_no={}", task_pool->pool_id, ref_no); - } + void cancel(bool /*kill*/) override { decreaseRefCount(); } + + ~UnorderedInputStream() override { decreaseRefCount(); } String getName() const override { return NAME; } @@ -67,6 +65,16 @@ class UnorderedInputStream : public IProfilingBlockInputStream } protected: + void decreaseRefCount() + { + bool ori = false; + if (is_stopped.compare_exchange_strong(ori, true)) + { + task_pool->decreaseUnorderedInputStreamRefCount(); + LOG_DEBUG(log, "Destroy, pool_id={} ref_no={}", task_pool->pool_id, ref_no); + } + } + Block readImpl() override { FilterPtr filter_ignored; @@ -146,5 +154,7 @@ class UnorderedInputStream : public IProfilingBlockInputStream // runtime filter std::vector runtime_filter_list; int max_wait_time_ms; + + std::atomic_bool is_stopped = false; }; } // namespace DB::DM From fe91038f40f1d8ae626ca6b7937624533767c54f Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com> Date: Wed, 13 Dec 2023 16:21:19 +0800 Subject: [PATCH 3/8] *: add util functions to generate random data (#8481) ref pingcap/tiflash#6233 --- dbms/src/Common/RandomData.cpp | 98 +++++++++++++++++++ dbms/src/Common/RandomData.h | 30 ++++++ .../tests/gtest_encryption_test.cpp | 20 +--- dbms/src/Server/DTTool/DTToolBench.cpp | 18 +--- .../DeltaMerge/workload/DataGenerator.cpp | 71 +------------- dbms/src/TestUtils/ColumnGenerator.cpp | 88 +++-------------- dbms/src/TestUtils/ColumnGenerator.h | 22 +---- 7 files changed, 155 insertions(+), 192 deletions(-) create mode 100644 dbms/src/Common/RandomData.cpp create mode 100644 dbms/src/Common/RandomData.h diff --git a/dbms/src/Common/RandomData.cpp b/dbms/src/Common/RandomData.cpp new file mode 100644 index 00000000000..98dbc77ed52 --- /dev/null +++ b/dbms/src/Common/RandomData.cpp @@ -0,0 +1,98 @@ +// Copyright 2023 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include + +namespace DB::random +{ + +String randomString(UInt64 length) +{ + static const std::string charset{ + "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz!@#$%^&*()|[]{}:;',<.>`~"}; + std::random_device rand_dev; + std::mt19937_64 rand_gen(rand_dev()); + String str(length, '\x00'); + std::generate_n(str.begin(), str.size(), [&]() { return charset[rand_gen() % charset.size()]; }); + return str; +} + +int randomTimeOffset() +{ + std::random_device rand_dev; + std::mt19937_64 rand_gen(rand_dev()); + static constexpr int max_offset = 24 * 3600 * 10000; // 10000 days for test + return (rand_gen() % max_offset) * (rand_gen() % 2 == 0 ? 1 : -1); +} + +time_t randomUTCTimestamp() +{ + using namespace std::chrono; + return duration_cast(system_clock::now().time_since_epoch()).count() + randomTimeOffset(); +} + +struct tm randomLocalTime() +{ + time_t t = randomUTCTimestamp(); + struct tm res + { + }; + if (localtime_r(&t, &res) == nullptr) + { + throw std::invalid_argument(fmt::format("localtime_r({}) ret {}", t, strerror(errno))); + } + return res; +} + +String randomDate() +{ + auto res = randomLocalTime(); + return fmt::format("{}-{}-{}", res.tm_year + 1900, res.tm_mon + 1, res.tm_mday); +} + +String randomDateTime() +{ + auto res = randomLocalTime(); + return fmt::format( + "{}-{}-{} {}:{}:{}", + res.tm_year + 1900, + res.tm_mon + 1, + res.tm_mday, + res.tm_hour, + res.tm_min, + res.tm_sec); +} + +String randomDuration() +{ + auto res = randomLocalTime(); + return fmt::format("{}:{}:{}", res.tm_hour, res.tm_min, res.tm_sec); +} + +String randomDecimal(uint64_t prec, uint64_t scale) +{ + std::random_device rand_dev; + std::mt19937_64 rand_gen(rand_dev()); + auto s = std::to_string(rand_gen()); + if (s.size() < prec) + s += String(prec - s.size(), '0'); + else if (s.size() > prec) + s = s.substr(0, prec); + return s.substr(0, prec - scale) + "." + s.substr(prec - scale); +} + +} // namespace DB::random diff --git a/dbms/src/Common/RandomData.h b/dbms/src/Common/RandomData.h new file mode 100644 index 00000000000..890bccd93a2 --- /dev/null +++ b/dbms/src/Common/RandomData.h @@ -0,0 +1,30 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include + +namespace DB::random +{ + +String randomString(UInt64 length); +time_t randomUTCTimestamp(); +struct tm randomLocalTime(); +String randomDate(); +String randomDateTime(); +String randomDuration(); +String randomDecimal(uint64_t prec, uint64_t scale); + +} // namespace DB::random diff --git a/dbms/src/Encryption/tests/gtest_encryption_test.cpp b/dbms/src/Encryption/tests/gtest_encryption_test.cpp index 087eac2a5b2..51f269b51bc 100644 --- a/dbms/src/Encryption/tests/gtest_encryption_test.cpp +++ b/dbms/src/Encryption/tests/gtest_encryption_test.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -50,19 +51,6 @@ const unsigned char KEY[33] = "\xe4\x3e\x8e\xca\x2a\x83\xe1\x88\xfb\xd8\x02\xdc\ const unsigned char IV_RANDOM[17] = "\x77\x9b\x82\x72\x26\xb5\x76\x50\xf7\x05\xd2\xd6\xb8\xaa\xa9\x2c"; const unsigned char IV_OVERFLOW_LOW[17] = "\x77\x9b\x82\x72\x26\xb5\x76\x50\xff\xff\xff\xff\xff\xff\xff\xff"; const unsigned char IV_OVERFLOW_FULL[17] = "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff"; - -std::string random_string(size_t length) -{ - std::string str("0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"); - while (str.length() < length) - { - str += str; - } - std::random_device rd; - std::mt19937 generator(rd()); - std::shuffle(str.begin(), str.end(), generator); - return str.substr(0, length); -} } // namespace test constexpr size_t MAX_SIZE = 16 * 10; @@ -72,13 +60,13 @@ constexpr size_t MAX_SIZE = 16 * 10; class EncryptionTest : public testing::TestWithParam> { public: - unsigned char plaintext[MAX_SIZE]; + unsigned char plaintext[MAX_SIZE]{}; // Reserve a bit more room to make sure OpenSSL have enough buffer. - unsigned char ciphertext[MAX_SIZE + 16 * 2]; + unsigned char ciphertext[MAX_SIZE + 16 * 2]{}; void generateCiphertext(const unsigned char * iv) { - std::string random_string = test::random_string(MAX_SIZE); + std::string random_string = DB::random::randomString(MAX_SIZE); memcpy(plaintext, random_string.data(), MAX_SIZE); EVP_CIPHER_CTX * ctx; diff --git a/dbms/src/Server/DTTool/DTToolBench.cpp b/dbms/src/Server/DTTool/DTToolBench.cpp index f8c6e8f956f..de965c2b9c6 100644 --- a/dbms/src/Server/DTTool/DTToolBench.cpp +++ b/dbms/src/Server/DTTool/DTToolBench.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -92,21 +93,6 @@ ColumnDefinesPtr createColumnDefines(size_t column_number) return primitive; } -String createRandomString(std::size_t limit, std::mt19937_64 & eng, size_t & acc) -{ - // libc++-15 forbids instantiate `std::uniform_int_distribution`. 
- // see https://github.com/llvm/llvm-project/blob/bfcd536a8ef6b1d6e9dd211925be3b078d06fe77/libcxx/include/__random/is_valid.h#L28 - // and https://github.com/llvm/llvm-project/blob/bfcd536a8ef6b1d6e9dd211925be3b078d06fe77/libcxx/include/__random/uniform_int_distribution.h#L162 - std::uniform_int_distribution dist('a', 'z'); - std::string buffer((eng() % limit) + 1, 0); - for (auto & i : buffer) - { - i = dist(eng); - } - acc += buffer.size(); - return buffer; -} - DB::Block createBlock( size_t column_number, size_t start, @@ -191,7 +177,7 @@ DB::Block createBlock( IColumn::MutablePtr m_col = str_col.type->createColumn(); for (size_t j = 0; j < row_number; j++) { - Field field = createRandomString(limit, eng, acc); + Field field = DB::random::randomString(limit); m_col->insert(field); } str_col.column = std::move(m_col); diff --git a/dbms/src/Storages/DeltaMerge/workload/DataGenerator.cpp b/dbms/src/Storages/DeltaMerge/workload/DataGenerator.cpp index f9e97ea9e9f..6a6ee198044 100644 --- a/dbms/src/Storages/DeltaMerge/workload/DataGenerator.cpp +++ b/dbms/src/Storages/DeltaMerge/workload/DataGenerator.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -146,7 +147,7 @@ class RandomDataGenerator : public DataGenerator } else if (family_name == "String") { - Field f = randomString(); + Field f = DB::random::randomString(128); mut_col->insert(f); } else if (family_name == "Enum8") @@ -165,19 +166,19 @@ class RandomDataGenerator : public DataGenerator } else if (family_name == "MyDateTime") { - Field f = parseMyDateTime(randomDateTime()); + Field f = parseMyDateTime(DB::random::randomDateTime()); mut_col->insert(f); } else if (family_name == "MyDate") { - Field f = parseMyDateTime(randomDate()); + Field f = parseMyDateTime(DB::random::randomDate()); mut_col->insert(f); } else if (family_name == "Decimal") { auto prec = getDecimalPrecision(*data_type, 0); auto scale = getDecimalScale(*data_type, 0); - auto s = randomDecimal(prec, scale); + auto s = DB::random::randomDecimal(prec, scale); bool negative = rand_gen() % 2 == 0; Field f; if (parseDecimal(s.data(), s.size(), negative, f)) @@ -198,68 +199,6 @@ class RandomDataGenerator : public DataGenerator return col; } - std::string randomDecimal(uint64_t prec, uint64_t scale) - { - auto s = std::to_string(rand_gen()); - if (s.size() < prec) - { - s += std::string(prec - s.size(), '0'); - } - else if (s.size() > prec) - { - s = s.substr(0, prec); - } - return s.substr(0, prec - scale) + "." + s.substr(prec - scale); - } - - std::string randomDate() - { - auto res = randomLocalTime(); - return fmt::format("{}-{}-{}", res.tm_year + 1900, res.tm_mon + 1, res.tm_mday); - } - - std::string randomDateTime() - { - auto res = randomLocalTime(); - return fmt::format( - "{}-{}-{} {}:{}:{}", - res.tm_year + 1900, - res.tm_mon + 1, - res.tm_mday, - res.tm_hour, - res.tm_min, - res.tm_sec); - } - - time_t randomUTCTimestamp() { return ::time(nullptr) + randomTimeOffset(); } - - int randomTimeOffset() - { - static constexpr int max_offset = 24 * 3600 * 10000; // 10000 days for test - return (rand_gen() % max_offset) * (rand_gen() % 2 == 0 ? 
1 : -1); - } - - struct tm randomLocalTime() - { - time_t t = randomUTCTimestamp(); - struct tm res - { - }; - if (localtime_r(&t, &res) == nullptr) - { - throw std::invalid_argument(fmt::format("localtime_r({}) ret {}", t, strerror(errno))); - } - return res; - } - - std::string randomString() - { - constexpr int size = 128; - std::string str(size, 0); - std::generate_n(str.begin(), str.size(), [this]() { return charset[rand_gen() % charset.size()]; }); - return str; - } - const TableInfo & table_info; TimestampGenerator & ts_gen; std::mt19937_64 rand_gen; diff --git a/dbms/src/TestUtils/ColumnGenerator.cpp b/dbms/src/TestUtils/ColumnGenerator.cpp index 92563ed3fef..340770cebe6 100644 --- a/dbms/src/TestUtils/ColumnGenerator.cpp +++ b/dbms/src/TestUtils/ColumnGenerator.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. #include +#include #include #include #include @@ -31,7 +32,6 @@ ColumnWithTypeAndName ColumnGenerator::generateNullMapColumn(const ColumnGenerat ColumnWithTypeAndName ColumnGenerator::generate(const ColumnGeneratorOpts & opts) { - int_rand_gen = std::uniform_int_distribution(0, opts.string_max_size); DataTypePtr type; if (opts.type_name == "Decimal") type = createDecimalType(); @@ -97,9 +97,12 @@ ColumnWithTypeAndName ColumnGenerator::generate(const ColumnGeneratorOpts & opts genFloat(col); break; case TypeIndex::String: + { + auto int_rand_gen = std::uniform_int_distribution(0, opts.string_max_size); for (size_t i = 0; i < opts.size; ++i) - genString(col); + genString(col, int_rand_gen(rand_gen)); break; + } case TypeIndex::Decimal32: case TypeIndex::Decimal64: case TypeIndex::Decimal128: @@ -124,8 +127,6 @@ ColumnWithTypeAndName ColumnGenerator::generate(const ColumnGeneratorOpts & opts for (size_t i = 0; i < opts.size; ++i) genEnumValue(col, type); break; - { - } default: throw std::invalid_argument("RandomColumnGenerator invalid type"); } @@ -141,73 +142,6 @@ DataTypePtr ColumnGenerator::createDecimalType() return DB::createDecimal(prec, scale); } -String ColumnGenerator::randomString() -{ - String str(int_rand_gen(rand_gen), 0); - std::generate_n(str.begin(), str.size(), [this]() { return charset[rand_gen() % charset.size()]; }); - return str; -} - -int ColumnGenerator::randomTimeOffset() -{ - static constexpr int max_offset = 24 * 3600 * 10000; // 10000 days for test - return (rand_gen() % max_offset) * (rand_gen() % 2 == 0 ? 
1 : -1); -} - -time_t ColumnGenerator::randomUTCTimestamp() -{ - return ::time(nullptr) + randomTimeOffset(); -} - -struct tm ColumnGenerator::randomLocalTime() -{ - time_t t = randomUTCTimestamp(); - struct tm res - { - }; - - if (localtime_r(&t, &res) == nullptr) - { - throw std::invalid_argument(fmt::format("localtime_r({}) ret {}", t, strerror(errno))); - } - return res; -} - -String ColumnGenerator::randomDate() -{ - auto res = randomLocalTime(); - return fmt::format("{}-{}-{}", res.tm_year + 1900, res.tm_mon + 1, res.tm_mday); -} - -String ColumnGenerator::randomDuration() -{ - auto res = randomLocalTime(); - return fmt::format("{}:{}:{}", res.tm_hour, res.tm_min, res.tm_sec); -} - -String ColumnGenerator::randomDateTime() -{ - auto res = randomLocalTime(); - return fmt::format( - "{}-{}-{} {}:{}:{}", - res.tm_year + 1900, - res.tm_mon + 1, - res.tm_mday, - res.tm_hour, - res.tm_min, - res.tm_sec); -} - -String ColumnGenerator::randomDecimal(uint64_t prec, uint64_t scale) -{ - auto s = std::to_string(rand_gen()); - if (s.size() < prec) - s += String(prec - s.size(), '0'); - else if (s.size() > prec) - s = s.substr(0, prec); - return s.substr(0, prec - scale) + "." + s.substr(prec - scale); -} - template void ColumnGenerator::genInt(MutableColumnPtr & col) { @@ -264,27 +198,27 @@ void ColumnGenerator::genFloat(MutableColumnPtr & col) col->insert(f); } -void ColumnGenerator::genString(MutableColumnPtr & col) +void ColumnGenerator::genString(MutableColumnPtr & col, UInt64 max_size) { - Field f = randomString(); + Field f = DB::random::randomString(max_size); col->insert(f); } void ColumnGenerator::genDate(MutableColumnPtr & col) { - Field f = parseMyDateTime(randomDate()); + Field f = parseMyDateTime(DB::random::randomDate()); col->insert(f); } void ColumnGenerator::genDateTime(MutableColumnPtr & col) { - Field f = parseMyDateTime(randomDateTime()); + Field f = parseMyDateTime(DB::random::randomDateTime()); col->insert(f); } void ColumnGenerator::genDuration(MutableColumnPtr & col) { - Field f = parseMyDuration(randomDuration()); + Field f = parseMyDuration(DB::random::randomDuration()); col->insert(f); } @@ -292,7 +226,7 @@ void ColumnGenerator::genDecimal(MutableColumnPtr & col, DataTypePtr & data_type { auto prec = getDecimalPrecision(*data_type, 0); auto scale = getDecimalScale(*data_type, 0); - auto s = randomDecimal(prec, scale); + auto s = DB::random::randomDecimal(prec, scale); bool negative = rand_gen() % 2 == 0; Field f; if (parseDecimal(s.data(), s.size(), negative, f)) diff --git a/dbms/src/TestUtils/ColumnGenerator.h b/dbms/src/TestUtils/ColumnGenerator.h index f1bf33b73a4..1722dee83fe 100644 --- a/dbms/src/TestUtils/ColumnGenerator.h +++ b/dbms/src/TestUtils/ColumnGenerator.h @@ -33,7 +33,7 @@ struct ColumnGeneratorOpts size_t size; String type_name; DataDistribution distribution; - String name = ""; + String name = ""; // NOLINT size_t string_max_size = 128; }; @@ -45,19 +45,7 @@ class ColumnGenerator : public ext::Singleton private: ColumnWithTypeAndName generateNullMapColumn(const ColumnGeneratorOpts & opts); std::mt19937_64 rand_gen; - std::uniform_int_distribution int_rand_gen = std::uniform_int_distribution(0, 128); std::uniform_real_distribution real_rand_gen; - /// todo support multibyte characters - const std::string charset{"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz!@#$%^&*()|[]{}:;',<.>`~"}; - - String randomString(); - int randomTimeOffset(); - time_t randomUTCTimestamp(); - struct tm randomLocalTime(); - String randomDate(); - String 
randomDateTime(); - String randomDuration(); - String randomDecimal(uint64_t prec, uint64_t scale); DataTypePtr createDecimalType(); @@ -67,10 +55,10 @@ class ColumnGenerator : public ext::Singleton template void genUInt(MutableColumnPtr & col); void genFloat(MutableColumnPtr & col); - void genString(MutableColumnPtr & col); - void genDate(MutableColumnPtr & col); - void genDateTime(MutableColumnPtr & col); - void genDuration(MutableColumnPtr & col); + static void genString(MutableColumnPtr & col, UInt64 max_size); + static void genDate(MutableColumnPtr & col); + static void genDateTime(MutableColumnPtr & col); + static void genDuration(MutableColumnPtr & col); void genDecimal(MutableColumnPtr & col, DataTypePtr & data_type); void genEnumValue(MutableColumnPtr & col, DataTypePtr & enum_type); }; From 929b061b53aae92b445ab92f9c6c4c6eb999ae81 Mon Sep 17 00:00:00 2001 From: jinhelin Date: Thu, 14 Dec 2023 14:22:49 +0800 Subject: [PATCH 4/8] Storages: small refine of MergedTask. (#8512) ref pingcap/tiflash#6834 --- .../Pipeline/Schedule/Tasks/RFWaitTask.h | 7 +- dbms/src/Operators/UnorderedSourceOp.cpp | 4 +- .../DeltaMerge/ReadThread/MergedTask.cpp | 47 ++++---- .../DeltaMerge/ReadThread/MergedTask.h | 100 +++++++----------- .../ReadThread/SegmentReadTaskScheduler.cpp | 14 +-- .../ReadThread/SegmentReadTaskScheduler.h | 3 +- .../DeltaMerge/ReadThread/SegmentReader.cpp | 7 +- .../ReadThread/UnorderedInputStream.h | 2 +- 8 files changed, 74 insertions(+), 110 deletions(-) diff --git a/dbms/src/Flash/Pipeline/Schedule/Tasks/RFWaitTask.h b/dbms/src/Flash/Pipeline/Schedule/Tasks/RFWaitTask.h index 245d45f8e30..03e3db6ff4e 100644 --- a/dbms/src/Flash/Pipeline/Schedule/Tasks/RFWaitTask.h +++ b/dbms/src/Flash/Pipeline/Schedule/Tasks/RFWaitTask.h @@ -64,14 +64,15 @@ class RFWaitTask : public Task static void submitReadyRfsAndSegmentTaskPool( const RuntimeFilteList & ready_rf_list, - const DM::SegmentReadTaskPoolPtr & task_pool) + const DM::SegmentReadTaskPoolPtr & task_pool, + const LoggerPtr & log) { for (const RuntimeFilterPtr & rf : ready_rf_list) { auto rs_operator = rf->parseToRSOperator(task_pool->getColumnToRead()); task_pool->appendRSOperator(rs_operator); } - DM::SegmentReadTaskScheduler::instance().add(task_pool); + DM::SegmentReadTaskScheduler::instance().add(task_pool, log); } private: @@ -82,7 +83,7 @@ class RFWaitTask : public Task filterAndMoveReadyRfs(waiting_rf_list, ready_rf_list); if (waiting_rf_list.empty() || stopwatch.elapsed() >= max_wait_time_ns) { - submitReadyRfsAndSegmentTaskPool(ready_rf_list, task_pool); + submitReadyRfsAndSegmentTaskPool(ready_rf_list, task_pool, log); return ExecTaskStatus::FINISHED; } return ExecTaskStatus::WAITING; diff --git a/dbms/src/Operators/UnorderedSourceOp.cpp b/dbms/src/Operators/UnorderedSourceOp.cpp index da1702ef468..085752a61d4 100644 --- a/dbms/src/Operators/UnorderedSourceOp.cpp +++ b/dbms/src/Operators/UnorderedSourceOp.cpp @@ -64,7 +64,7 @@ void UnorderedSourceOp::operatePrefixImpl() std::call_once(task_pool->addToSchedulerFlag(), [&]() { if (waiting_rf_list.empty()) { - DM::SegmentReadTaskScheduler::instance().add(task_pool); + DM::SegmentReadTaskScheduler::instance().add(task_pool, log); } else { @@ -74,7 +74,7 @@ void UnorderedSourceOp::operatePrefixImpl() if (max_wait_time_ms <= 0 || waiting_rf_list.empty()) { - RFWaitTask::submitReadyRfsAndSegmentTaskPool(ready_rf_list, task_pool); + RFWaitTask::submitReadyRfsAndSegmentTaskPool(ready_rf_list, task_pool, log); } else { diff --git 
a/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.cpp index fd0d477f86e..f039b2e7eb1 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.cpp +++ b/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.cpp @@ -44,7 +44,7 @@ void MergedTask::initOnce() auto & [pool, task, stream] = units[cur_idx]; if (!pool->valid()) { - setStreamFinished(cur_idx); + setUnitFinish(cur_idx); continue; } if (pool->isRUExhausted()) @@ -65,7 +65,7 @@ int MergedTask::readOneBlock() int read_block_count = 0; for (cur_idx = 0; cur_idx < static_cast(units.size()); cur_idx++) { - if (isStreamFinished(cur_idx)) + if (units[cur_idx].isFinished()) { continue; } @@ -74,7 +74,7 @@ int MergedTask::readOneBlock() if (!pool->valid()) { - setStreamFinished(cur_idx); + setUnitFinish(cur_idx); continue; } @@ -94,7 +94,7 @@ int MergedTask::readOneBlock() } else { - setStreamFinished(cur_idx); + setUnitFinish(cur_idx); } } return read_block_count; @@ -102,29 +102,25 @@ int MergedTask::readOneBlock() void MergedTask::setException(const DB::Exception & e) { - for (auto & unit : units) - { - if (unit.pool != nullptr) - { - unit.pool->setException(e); - } - } + std::for_each(units.begin(), units.end(), [&e](auto & u) { + if (u.pool != nullptr) + u.pool->setException(e); + }); } MergedTaskPtr MergedTaskPool::pop(uint64_t pool_id) { std::lock_guard lock(mtx); - MergedTaskPtr target; - for (auto itr = merged_task_pool.begin(); itr != merged_task_pool.end(); ++itr) + auto itr = std::find_if(merged_task_pool.begin(), merged_task_pool.end(), [pool_id](const auto & merged_task) { + return merged_task->containPool(pool_id); + }); + if (itr != merged_task_pool.end()) { - if ((*itr)->containPool(pool_id)) - { - target = *itr; - merged_task_pool.erase(itr); - break; - } + auto target = *itr; + merged_task_pool.erase(itr); + return target; } - return target; + return nullptr; // Not Found. } void MergedTaskPool::push(const MergedTaskPtr & t) @@ -136,13 +132,8 @@ void MergedTaskPool::push(const MergedTaskPtr & t) bool MergedTaskPool::has(UInt64 pool_id) { std::lock_guard lock(mtx); - for (const auto & t : merged_task_pool) - { - if (t->containPool(pool_id)) - { - return true; - } - } - return false; + return std::any_of(merged_task_pool.begin(), merged_task_pool.end(), [pool_id](const auto & merged_task) { + return merged_task->containPool(pool_id); + }); } } // namespace DB::DM \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h b/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h index 5c3b63a7593..abec43dbe38 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h @@ -25,6 +25,29 @@ struct MergedUnit , task(task_) {} + ~MergedUnit() + { + // Calling `setFinish()` for updating memory statistics of `MemoryTracker`. + [[maybe_unused]] auto res = setFinish(); + } + + [[nodiscard]] bool isFinished() const { return pool == nullptr && task == nullptr && stream == nullptr; } + + // If setted return true else return false. + [[nodiscard]] bool setFinish() + { + if (!isFinished()) + { + // For updating memory statistics of `MemoryTracker`. + MemoryTrackerSetter setter(true, pool->mem_tracker.get()); + task = nullptr; + stream = nullptr; + pool = nullptr; + return true; + } + return false; + } + SegmentReadTaskPoolPtr pool; // The information of a read request. SegmentReadTaskPtr task; // The information of a segment that want to read. 
BlockInputStreamPtr stream; // BlockInputStream of a segment, will be created by read threads. @@ -44,7 +67,6 @@ class MergedTask , inited(false) , cur_idx(-1) , finished_count(0) - , log(Logger::get()) { passive_merged_segments.fetch_add(units.size() - 1, std::memory_order_relaxed); GET_METRIC(tiflash_storage_read_thread_gauge, type_merged_task).Increment(); @@ -54,8 +76,6 @@ class MergedTask passive_merged_segments.fetch_sub(units.size() - 1, std::memory_order_relaxed); GET_METRIC(tiflash_storage_read_thread_gauge, type_merged_task).Decrement(); GET_METRIC(tiflash_storage_read_thread_seconds, type_merged_task).Observe(sw.elapsedSeconds()); - // `setAllStreamFinished` must be called to explicitly releasing all streams for updating memory statistics of `MemoryTracker`. - setAllStreamsFinished(); } int readBlock(); @@ -64,36 +84,27 @@ class MergedTask const GlobalSegmentID & getSegmentId() const { return seg_id; } - size_t getPoolCount() const { return units.size(); } - - std::vector getPoolIds() const + bool containPool(uint64_t pool_id) const { - std::vector ids; - ids.reserve(units.size()); - for (const auto & unit : units) - { - if (unit.pool != nullptr) - { - ids.push_back(unit.pool->pool_id); - } - } - return ids; + return std::any_of(units.begin(), units.end(), [pool_id](const auto & u) { + return u.pool != nullptr && u.pool->pool_id == pool_id; + }); } - bool containPool(uint64_t pool_id) const + void setException(const DB::Exception & e); + + String toString() const { - for (const auto & unit : units) - { - if (unit.pool != nullptr && unit.pool->pool_id == pool_id) - { - return true; - } - } - return false; + std::vector ids; + ids.reserve(units.size()); + std::for_each(units.begin(), units.end(), [&ids](const auto & u) { + if (u.pool != nullptr) + ids.push_back(u.pool->pool_id); + }); + return fmt::format("seg_id:{} pool_id:{}", seg_id, ids); } - void setException(const DB::Exception & e); - const LoggerPtr getCurrentLogger() const + LoggerPtr getCurrentLogger() const { // `std::cmp_*` is safety to compare negative signed integers and unsigned integers. if (likely( @@ -115,41 +126,13 @@ class MergedTask private: void initOnce(); int readOneBlock(); + void setUnitFinish(int i) { finished_count += units[i].setFinish(); } - bool isStreamFinished(size_t i) const - { - return units[i].pool == nullptr && units[i].task == nullptr && units[i].stream == nullptr; - } - - void setStreamFinished(size_t i) - { - if (!isStreamFinished(i)) - { - // `MergedUnit.stream` must be released explicitly for updating memory statistics of `MemoryTracker`. 
- auto & [pool, task, stream] = units[i]; - { - MemoryTrackerSetter setter(true, pool->mem_tracker.get()); - task = nullptr; - stream = nullptr; - } - pool = nullptr; - finished_count++; - } - } - - void setAllStreamsFinished() - { - for (size_t i = 0; i < units.size(); ++i) - { - setStreamFinished(i); - } - } GlobalSegmentID seg_id; std::vector units; bool inited; int cur_idx; size_t finished_count; - LoggerPtr log; Stopwatch sw; inline static std::atomic passive_merged_segments{0}; }; @@ -162,10 +145,6 @@ using MergedTaskPtr = std::shared_ptr; class MergedTaskPool { public: - MergedTaskPool() - : log(Logger::get()) - {} - MergedTaskPtr pop(uint64_t pool_id); void push(const MergedTaskPtr & t); bool has(UInt64 pool_id); @@ -173,6 +152,5 @@ class MergedTaskPool private: std::mutex mtx; std::list merged_task_pool GUARDED_BY(mtx); - LoggerPtr log; }; } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp index 1deea8cb498..17fb7582a68 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp +++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp @@ -30,10 +30,12 @@ SegmentReadTaskScheduler::~SegmentReadTaskScheduler() sched_thread.join(); } -void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool) +void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool, const LoggerPtr & req_log) { Stopwatch sw_add; + // `add_lock` is only used in this function to make all threads calling `add` to execute serially. std::lock_guard add_lock(add_mtx); + // `lock` is used to protect data. std::lock_guard lock(mtx); Stopwatch sw_do_add; read_pools.add(pool); @@ -43,12 +45,11 @@ void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool) { merging_segments[seg_id].push_back(pool->pool_id); } - auto block_slots = pool->getFreeBlockSlots(); LOG_DEBUG( - log, + req_log, "Added, pool_id={} block_slots={} segment_count={} pool_count={} cost={:.3f}us do_add_cost={:.3f}us", // pool->pool_id, - block_slots, + pool->getFreeBlockSlots(), tasks.size(), read_pools.size(), sw_add.elapsed() / 1000.0, @@ -235,9 +236,8 @@ bool SegmentReadTaskScheduler::schedule() { LOG_DEBUG( log, - "scheduleMergedTask segment_id={} pool_ids={} cost={}ms pool_count={}", - merged_task->getSegmentId(), - merged_task->getPoolIds(), + "scheduleMergedTask merged_task=<{}> cost={}ms pool_count={}", + merged_task->toString(), elapsed_ms, pool_count); } diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h index bd80f6b9728..a269c0812a0 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h @@ -17,7 +17,6 @@ #include #include -#include namespace DB::DM { using SegmentReadTaskPoolList = CircularScanList; @@ -43,7 +42,7 @@ class SegmentReadTaskScheduler DISALLOW_COPY_AND_MOVE(SegmentReadTaskScheduler); // Add SegmentReadTaskPool to `read_pools` and index segments into merging_segments. 
- void add(const SegmentReadTaskPoolPtr & pool) LOCKS_EXCLUDED(add_mtx, mtx); + void add(const SegmentReadTaskPoolPtr & pool, const LoggerPtr & req_log) LOCKS_EXCLUDED(add_mtx, mtx); void pushMergedTask(const MergedTaskPtr & p) { merged_task_pool.push(p); } diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.cpp index a1e714507a0..570d67bd5a6 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.cpp +++ b/dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.cpp @@ -99,12 +99,7 @@ class SegmentReader } if (read_count <= 0) { - LOG_DEBUG( - log, - "All finished, pool_ids={} segment_id={} read_count={}", - merged_task->getPoolIds(), - merged_task->getSegmentId(), - read_count); + LOG_DEBUG(log, "All finished, merged_task=<{}> read_count={}", merged_task->toString(), read_count); } // If `merged_task` is pushed back to `MergedTaskPool`, it can be accessed by another read thread if it is scheduled. // So do not push back to `MergedTaskPool` when exception happened since current read thread can still access to this `merged_task` object and set exception message to it. diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h index c69a9f1cb3d..2bdee467848 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h @@ -131,7 +131,7 @@ class UnorderedInputStream : public IProfilingBlockInputStream } std::call_once(task_pool->addToSchedulerFlag(), [&]() { prepareRuntimeFilter(); - SegmentReadTaskScheduler::instance().add(task_pool); + SegmentReadTaskScheduler::instance().add(task_pool, log); }); task_pool_added = true; } From 7b85fc913d2ee7a5f61d1b22e5acb38be6bf609f Mon Sep 17 00:00:00 2001 From: Liqi Geng Date: Thu, 14 Dec 2023 17:09:20 +0800 Subject: [PATCH 5/8] Process streams of partition tables one by one in MultiplexInputStream (#8507) close pingcap/tiflash#8505 --- dbms/src/DataStreams/MultiplexInputStream.h | 82 +++++++------------ .../ReadThread/UnorderedInputStream.h | 8 +- 2 files changed, 33 insertions(+), 57 deletions(-) diff --git a/dbms/src/DataStreams/MultiplexInputStream.h b/dbms/src/DataStreams/MultiplexInputStream.h index ceddfa22f90..78c00c8e76c 100644 --- a/dbms/src/DataStreams/MultiplexInputStream.h +++ b/dbms/src/DataStreams/MultiplexInputStream.h @@ -33,43 +33,49 @@ class MultiPartitionStreamPool public: MultiPartitionStreamPool() = default; + void cancel(bool kill) + { + std::deque tmp_streams; + { + std::unique_lock lk(mu); + if (is_cancelled) + return; + + is_cancelled = true; + tmp_streams.swap(added_streams); + } + + for (auto & stream : tmp_streams) + if (auto * p_stream = dynamic_cast(stream.get())) + { + p_stream->cancel(kill); + } + } + void addPartitionStreams(const BlockInputStreams & cur_streams) { if (cur_streams.empty()) return; std::unique_lock lk(mu); - streams_queue_by_partition.push_back(std::make_shared>>()); - for (const auto & stream : cur_streams) - streams_queue_by_partition.back()->push(stream); added_streams.insert(added_streams.end(), cur_streams.begin(), cur_streams.end()); } - std::shared_ptr pickOne() + BlockInputStreamPtr pickOne() { std::unique_lock lk(mu); - if (streams_queue_by_partition.empty()) + if (added_streams.empty()) return nullptr; - if (streams_queue_id >= static_cast(streams_queue_by_partition.size())) - streams_queue_id = 0; - - auto & q = 
*streams_queue_by_partition[streams_queue_id]; - std::shared_ptr ret = nullptr; - assert(!q.empty()); - ret = q.front(); - q.pop(); - if (q.empty()) - streams_queue_id = removeQueue(streams_queue_id); - else - streams_queue_id = nextQueueId(streams_queue_id); + + auto ret = std::move(added_streams.front()); + added_streams.pop_front(); return ret; } - int exportAddedStreams(BlockInputStreams & ret_streams) + void exportAddedStreams(BlockInputStreams & ret_streams) { std::unique_lock lk(mu); for (auto & stream : added_streams) ret_streams.push_back(stream); - return added_streams.size(); } int addedStreamsCnt() @@ -79,40 +85,8 @@ class MultiPartitionStreamPool } private: - int removeQueue(int queue_id) - { - streams_queue_by_partition[queue_id] = nullptr; - if (queue_id != static_cast(streams_queue_by_partition.size()) - 1) - { - swap(streams_queue_by_partition[queue_id], streams_queue_by_partition.back()); - streams_queue_by_partition.pop_back(); - return queue_id; - } - else - { - streams_queue_by_partition.pop_back(); - return 0; - } - } - - int nextQueueId(int queue_id) const - { - if (queue_id + 1 < static_cast(streams_queue_by_partition.size())) - return queue_id + 1; - else - return 0; - } - - static void swap( - std::shared_ptr>> & a, - std::shared_ptr>> & b) - { - a.swap(b); - } - - std::vector>>> streams_queue_by_partition; - std::vector> added_streams; - int streams_queue_id = 0; + std::deque added_streams; + bool is_cancelled; std::mutex mu; }; @@ -170,6 +144,8 @@ class MultiplexInputStream final : public IProfilingBlockInputStream child->cancel(kill); } } + + shared_pool->cancel(kill); } Block getHeader() const override { return children.at(0)->getHeader(); } diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h index 2bdee467848..7e3ce7d4cc8 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h @@ -46,9 +46,9 @@ class UnorderedInputStream : public IProfilingBlockInputStream LOG_DEBUG(log, "Created, pool_id={} ref_no={}", task_pool->pool_id, ref_no); } - void cancel(bool /*kill*/) override { decreaseRefCount(); } + void cancel(bool /*kill*/) override { decreaseRefCount(true); } - ~UnorderedInputStream() override { decreaseRefCount(); } + ~UnorderedInputStream() override { decreaseRefCount(false); } String getName() const override { return NAME; } @@ -65,13 +65,13 @@ class UnorderedInputStream : public IProfilingBlockInputStream } protected: - void decreaseRefCount() + void decreaseRefCount(bool is_cancel) { bool ori = false; if (is_stopped.compare_exchange_strong(ori, true)) { task_pool->decreaseUnorderedInputStreamRefCount(); - LOG_DEBUG(log, "Destroy, pool_id={} ref_no={}", task_pool->pool_id, ref_no); + LOG_DEBUG(log, "{}, pool_id={} ref_no={}", is_cancel ? 
"Cancel" : "Destroy", task_pool->pool_id, ref_no); } } From e926e5395405b144176cdaca01ee4351662df22b Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Thu, 14 Dec 2023 18:58:50 +0800 Subject: [PATCH 6/8] FAP: Persist Segment with meta info rather than segment_id (#8498) close pingcap/tiflash#8382 --- dbms/src/Common/FailPoint.cpp | 1 + dbms/src/Common/TiFlashMetrics.h | 1 + .../DeltaMerge/ColumnFile/ColumnFileBig.cpp | 1 + .../DeltaMerge/ColumnFile/ColumnFileBig.h | 1 + .../ColumnFile/ColumnFilePersisted.cpp | 17 +-- .../ColumnFile/ColumnFilePersisted.h | 2 + .../DeltaMerge/ColumnFile/ColumnFileTiny.cpp | 14 ++- .../DeltaMerge/ColumnFile/ColumnFileTiny.h | 1 + .../DeltaMerge/ColumnFile/ColumnFile_V3.cpp | 5 +- .../Delta/ColumnFilePersistedSet.cpp | 26 ++++- .../DeltaMerge/Delta/ColumnFilePersistedSet.h | 9 ++ .../DeltaMerge/Delta/DeltaValueSpace.cpp | 25 +++- .../DeltaMerge/Delta/DeltaValueSpace.h | 13 +++ .../src/Storages/DeltaMerge/DeltaMergeStore.h | 10 +- .../DeltaMerge/DeltaMergeStore_Ingest.cpp | 27 ++--- dbms/src/Storages/DeltaMerge/File/DMFile.cpp | 1 - dbms/src/Storages/DeltaMerge/Segment.cpp | 35 ++++-- dbms/src/Storages/DeltaMerge/Segment.h | 6 +- .../Storages/DeltaMerge/StableValueSpace.cpp | 29 ++++- .../Storages/DeltaMerge/StableValueSpace.h | 6 + dbms/src/Storages/KVStore/CMakeLists.txt | 4 +- dbms/src/Storages/KVStore/KVStore.cpp | 2 + .../KVStore/MultiRaft/Disagg/CMakeLists.txt | 23 ++++ .../MultiRaft/Disagg/CheckpointIngestInfo.cpp | 108 ++++++++++-------- .../KVStore/MultiRaft/Disagg/FastAddPeer.h | 4 +- .../MultiRaft/Disagg/fast_add_peer.proto | 29 +++++ .../KVStore/MultiRaft/RegionPersister.cpp | 2 +- .../KVStore/MultiRaft/RegionPersister.h | 2 +- dbms/src/Storages/KVStore/Utils/AsyncTasks.h | 3 +- .../tests/gtest_kvstore_fast_add_peer.cpp | 28 +++++ .../CheckpointFile/Proto/manifest_file.proto | 3 +- .../Page/V3/Universal/S3PageReader.cpp | 5 +- .../Storages/Page/V3/Universal/S3PageReader.h | 2 +- dbms/src/Storages/StorageDeltaMerge.cpp | 2 +- dbms/src/Storages/StorageDeltaMerge.h | 2 +- 35 files changed, 338 insertions(+), 111 deletions(-) create mode 100644 dbms/src/Storages/KVStore/MultiRaft/Disagg/CMakeLists.txt create mode 100644 dbms/src/Storages/KVStore/MultiRaft/Disagg/fast_add_peer.proto diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index afee3ff95f2..24d5bb9e053 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -107,6 +107,7 @@ namespace DB M(force_set_parallel_prehandle_threshold) \ M(force_raise_prehandle_exception) \ M(force_agg_on_partial_block) \ + M(force_not_clean_fap_on_destroy) \ M(delta_tree_create_node_fail) \ M(disable_flush_cache) diff --git a/dbms/src/Common/TiFlashMetrics.h b/dbms/src/Common/TiFlashMetrics.h index 3af6272bfe4..4c7e12c2e04 100644 --- a/dbms/src/Common/TiFlashMetrics.h +++ b/dbms/src/Common/TiFlashMetrics.h @@ -396,6 +396,7 @@ namespace DB F(type_failed_timeout, {{"type", "failed_timeout"}}), \ F(type_failed_baddata, {{"type", "failed_baddata"}}), \ F(type_failed_repeated, {{"type", "failed_repeated"}}), \ + F(type_restore, {{"type", "restore"}}), \ F(type_succeed, {{"type", "succeed"}})) \ M(tiflash_fap_task_state, \ "", \ diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp index 1675ceddcfa..973666f6167 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp @@ -132,6 +132,7 @@ ColumnFilePersistedPtr 
ColumnFileBig::deserializeMetadata( } ColumnFilePersistedPtr ColumnFileBig::createFromCheckpoint( + [[maybe_unused]] const LoggerPtr & parent_log, DMContext & dm_context, // const RowKeyRange & target_range, ReadBuffer & buf, diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h index 41a289b19e2..ae6ab8da042 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h @@ -89,6 +89,7 @@ class ColumnFileBig : public ColumnFilePersisted ReadBuffer & buf); static ColumnFilePersistedPtr createFromCheckpoint( + const LoggerPtr & parent_log, DMContext & context, // const RowKeyRange & target_range, ReadBuffer & buf, diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp index 8f38b1b1d64..167f7b47e2a 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp @@ -140,14 +140,16 @@ ColumnFilePersisteds deserializeSavedColumnFiles( break; default: throw Exception( - "Unexpected delta value version: " + DB::toString(version) - + ", latest version: " + DB::toString(DeltaFormat::V3), - ErrorCodes::LOGICAL_ERROR); + ErrorCodes::LOGICAL_ERROR, + "Unexpected delta value version: {}, latest version: {}", + version, + DeltaFormat::V3); } return column_files; } ColumnFilePersisteds createColumnFilesFromCheckpoint( // + const LoggerPtr & parent_log, DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf, @@ -162,13 +164,14 @@ ColumnFilePersisteds createColumnFilesFromCheckpoint( // switch (version) { case DeltaFormat::V3: - column_files = createColumnFilesInV3FormatFromCheckpoint(context, segment_range, buf, temp_ps, wbs); + column_files = createColumnFilesInV3FormatFromCheckpoint(parent_log, context, segment_range, buf, temp_ps, wbs); break; default: throw Exception( - "Unexpected delta value version: " + DB::toString(version) - + ", latest version: " + DB::toString(DeltaFormat::V3), - ErrorCodes::LOGICAL_ERROR); + ErrorCodes::LOGICAL_ERROR, + "Unexpected delta value version: {}, latest version: {}", + version, + DeltaFormat::V3); } return column_files; } diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h index 85cf08bb7dd..6b792bc33bd 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h @@ -62,6 +62,7 @@ ColumnFilePersisteds deserializeSavedColumnFiles( ReadBuffer & buf); ColumnFilePersisteds createColumnFilesFromCheckpoint( // + const LoggerPtr & parent_log, DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf, @@ -78,6 +79,7 @@ ColumnFilePersisteds deserializeSavedColumnFilesInV3Format( ReadBuffer & buf); ColumnFilePersisteds createColumnFilesInV3FormatFromCheckpoint( // + const LoggerPtr & parent_log, DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf, diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp index 9526e78d217..720b973dd21 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp @@ -183,6 +183,7 @@ ColumnFilePersistedPtr ColumnFileTiny::deserializeMetadata( } std::tuple 
ColumnFileTiny::createFromCheckpoint( + const LoggerPtr & parent_log, const DMContext & context, ReadBuffer & buf, UniversalPageStoragePtr temp_ps, @@ -201,6 +202,7 @@ std::tuple ColumnFileTiny::createFromCheckpoin readIntBinary(rows, buf); readIntBinary(bytes, buf); auto new_cf_id = context.storage_pool->newLogPageId(); + /// Generate a new RemotePage with an entry with data location on S3 auto remote_page_id = UniversalPageIdFormat::toFullPageId( UniversalPageIdFormat::toFullPrefix(context.keyspace_id, StorageType::Log, context.physical_table_id), data_page_id); @@ -213,14 +215,16 @@ std::tuple ColumnFileTiny::createFromCheckpoin PS::V3::CheckpointLocation new_remote_data_location{ .data_file_id = std::make_shared(remote_data_file_key), .offset_in_file = remote_data_location->offset_in_file, - .size_in_file = remote_data_location->size_in_file}; + .size_in_file = remote_data_location->size_in_file, + }; + // TODO: merge the `getEntry` and `getCheckpointLocation` auto entry = temp_ps->getEntry(remote_page_id); LOG_DEBUG( - Logger::get(), - "Write remote page[page_id={} remote_location={}] using local page id {}", - remote_page_id, + parent_log, + "Write remote page to local, page_id={} remote_location={} remote_page_id={}", + new_cf_id, new_remote_data_location.toDebugString(), - new_cf_id); + remote_page_id); wbs.log.putRemotePage(new_cf_id, 0, entry.size, new_remote_data_location, std::move(entry.field_offsets)); auto column_file_schema = std::make_shared(*schema); diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h index eebb77134c5..073e42b0ccd 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h @@ -144,6 +144,7 @@ class ColumnFileTiny : public ColumnFilePersisted ColumnFileSchemaPtr & last_schema); static std::tuple createFromCheckpoint( + const LoggerPtr & parent_log, const DMContext & context, ReadBuffer & buf, UniversalPageStoragePtr temp_ps, diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile_V3.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile_V3.cpp index 0df8a3ca380..b3c3514ff4d 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile_V3.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile_V3.cpp @@ -103,6 +103,7 @@ ColumnFilePersisteds deserializeSavedColumnFilesInV3Format( } ColumnFilePersisteds createColumnFilesInV3FormatFromCheckpoint( // + const LoggerPtr & parent_log, DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf, @@ -127,12 +128,12 @@ ColumnFilePersisteds createColumnFilesInV3FormatFromCheckpoint( // case ColumnFile::Type::TINY_FILE: { std::tie(column_file, last_schema) - = ColumnFileTiny::createFromCheckpoint(context, buf, temp_ps, last_schema, wbs); + = ColumnFileTiny::createFromCheckpoint(parent_log, context, buf, temp_ps, last_schema, wbs); break; } case ColumnFile::Type::BIG_FILE: { - column_file = ColumnFileBig::createFromCheckpoint(context, segment_range, buf, temp_ps, wbs); + column_file = ColumnFileBig::createFromCheckpoint(parent_log, context, segment_range, buf, temp_ps, wbs); break; } default: diff --git a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp index 19748f60ab9..5c11c73879e 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp @@ -29,14 +29,19 @@ namespace DB { 
namespace DM { +inline UInt64 serializeColumnFilePersisteds(WriteBuffer & buf, const ColumnFilePersisteds & persisted_files) +{ + serializeSavedColumnFiles(buf, persisted_files); + return buf.count(); +} + inline void serializeColumnFilePersisteds( WriteBatches & wbs, PageIdU64 id, const ColumnFilePersisteds & persisted_files) { MemoryWriteBuffer buf(0, COLUMN_FILE_SERIALIZE_BUFFER_SIZE); - serializeSavedColumnFiles(buf, persisted_files); - auto data_size = buf.count(); + auto data_size = serializeColumnFilePersisteds(buf, persisted_files); wbs.meta.putPage(id, 0, buf.tryGetReadBuffer(), data_size); } @@ -98,11 +103,21 @@ ColumnFilePersistedSetPtr ColumnFilePersistedSet::restore( // { Page page = context.storage_pool->metaReader()->read(id); ReadBufferFromMemory buf(page.data.begin(), page.data.size()); + return ColumnFilePersistedSet::restore(context, segment_range, buf, id); +} + +ColumnFilePersistedSetPtr ColumnFilePersistedSet::restore( // + DMContext & context, + const RowKeyRange & segment_range, + ReadBuffer & buf, + PageIdU64 id) +{ auto column_files = deserializeSavedColumnFiles(context, segment_range, buf); return std::make_shared(id, column_files); } ColumnFilePersistedSetPtr ColumnFilePersistedSet::createFromCheckpoint( // + const LoggerPtr & parent_log, DMContext & context, UniversalPageStoragePtr temp_ps, const RowKeyRange & segment_range, @@ -114,7 +129,7 @@ ColumnFilePersistedSetPtr ColumnFilePersistedSet::createFromCheckpoint( // delta_id); auto meta_page = temp_ps->read(delta_page_id); ReadBufferFromMemory meta_buf(meta_page.data.begin(), meta_page.data.size()); - auto column_files = createColumnFilesFromCheckpoint(context, segment_range, meta_buf, temp_ps, wbs); + auto column_files = createColumnFilesFromCheckpoint(parent_log, context, segment_range, meta_buf, temp_ps, wbs); auto new_persisted_set = std::make_shared(delta_id, column_files); return new_persisted_set; } @@ -124,6 +139,11 @@ void ColumnFilePersistedSet::saveMeta(WriteBatches & wbs) const serializeColumnFilePersisteds(wbs, metadata_id, persisted_files); } +void ColumnFilePersistedSet::saveMeta(WriteBuffer & buf) const +{ + serializeColumnFilePersisteds(buf, persisted_files); +} + void ColumnFilePersistedSet::recordRemoveColumnFilesPages(WriteBatches & wbs) const { for (const auto & file : persisted_files) diff --git a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h index 8d46e9a2982..6fb6b7ffd7a 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h +++ b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h @@ -75,8 +75,16 @@ class ColumnFilePersistedSet /// Restore the metadata of this instance. /// Only called after reboot. static ColumnFilePersistedSetPtr restore(DMContext & context, const RowKeyRange & segment_range, PageIdU64 id); + /// Restore from a checkpoint from other peer. + /// Only used in FAP. 
+ static ColumnFilePersistedSetPtr restore( // + DMContext & context, + const RowKeyRange & segment_range, + ReadBuffer & buf, + PageIdU64 id); static ColumnFilePersistedSetPtr createFromCheckpoint( // + const LoggerPtr & parent_log, DMContext & context, UniversalPageStoragePtr temp_ps, const RowKeyRange & segment_range, @@ -107,6 +115,7 @@ class ColumnFilePersistedSet const ColumnFilePersisteds & getFiles() const { return persisted_files; } + void saveMeta(WriteBuffer & buf) const; void saveMeta(WriteBatches & wbs) const; void recordRemoveColumnFilesPages(WriteBatches & wbs) const; diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp index a72262c2914..26f4598b797 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp @@ -68,7 +68,18 @@ DeltaValueSpacePtr DeltaValueSpace::restore(DMContext & context, const RowKeyRan return std::make_shared(std::move(persisted_file_set)); } +DeltaValueSpacePtr DeltaValueSpace::restore( + DMContext & context, + const RowKeyRange & segment_range, + ReadBuffer & buf, + PageIdU64 id) +{ + auto persisted_file_set = ColumnFilePersistedSet::restore(context, segment_range, buf, id); + return std::make_shared(std::move(persisted_file_set)); +} + DeltaValueSpacePtr DeltaValueSpace::createFromCheckpoint( // + const LoggerPtr & parent_log, DMContext & context, UniversalPageStoragePtr temp_ps, const RowKeyRange & segment_range, @@ -76,15 +87,27 @@ DeltaValueSpacePtr DeltaValueSpace::createFromCheckpoint( // WriteBatches & wbs) { auto persisted_file_set - = ColumnFilePersistedSet::createFromCheckpoint(context, temp_ps, segment_range, delta_id, wbs); + = ColumnFilePersistedSet::createFromCheckpoint(parent_log, context, temp_ps, segment_range, delta_id, wbs); return std::make_shared(std::move(persisted_file_set)); } +void DeltaValueSpace::saveMeta(WriteBuffer & buf) const +{ + persisted_file_set->saveMeta(buf); +} + void DeltaValueSpace::saveMeta(WriteBatches & wbs) const { persisted_file_set->saveMeta(wbs); } +std::string DeltaValueSpace::serializeMeta() const +{ + WriteBufferFromOwnString wb; + saveMeta(wb); + return wb.releaseStr(); +} + template struct CloneColumnFilesHelper { diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h index 357b9c1f1bd..10d817181e1 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h @@ -108,6 +108,9 @@ class DeltaValueSpace LoggerPtr log; +private: + void saveMeta(WriteBuffer & buf) const; + public: explicit DeltaValueSpace( PageIdU64 id_, @@ -119,8 +122,17 @@ class DeltaValueSpace /// Restore the metadata of this instance. /// Only called after reboot. static DeltaValueSpacePtr restore(DMContext & context, const RowKeyRange & segment_range, PageIdU64 id); + /// Restore from a checkpoint from other peer. + /// Only used in FAP. 
+ static DeltaValueSpacePtr restore( + DMContext & context, + const RowKeyRange & segment_range, + ReadBuffer & buf, + PageIdU64 id); + static DeltaValueSpacePtr createFromCheckpoint( // + const LoggerPtr & parent_log, DMContext & context, UniversalPageStoragePtr temp_ps, const RowKeyRange & segment_range, @@ -161,6 +173,7 @@ class DeltaValueSpace bool hasAbandoned() const { return abandoned.load(std::memory_order_relaxed); } void saveMeta(WriteBatches & wbs) const; + std::string serializeMeta() const; void recordRemoveColumnFilesPages(WriteBatches & wbs) const; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index a58ca9d7ad2..bf4250df291 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -335,7 +335,7 @@ class DeltaMergeStore : private boost::noncopyable std::vector ingestSegmentsUsingSplit( const DMContextPtr & dm_context, const RowKeyRange & ingest_range, - const std::vector & target_segments); + const std::vector & segments_to_ingest); bool ingestSegmentDataIntoSegmentUsingSplit( DMContext & dm_context, @@ -346,13 +346,13 @@ class DeltaMergeStore : private boost::noncopyable Segments buildSegmentsFromCheckpointInfo( const DMContextPtr & dm_context, const DM::RowKeyRange & range, - CheckpointInfoPtr checkpoint_info); + const CheckpointInfoPtr & checkpoint_info) const; Segments buildSegmentsFromCheckpointInfo( const Context & db_context, const DB::Settings & db_settings, const DM::RowKeyRange & range, - CheckpointInfoPtr checkpoint_info) + const CheckpointInfoPtr & checkpoint_info) { auto dm_context = newDMContext(db_context, db_settings); return buildSegmentsFromCheckpointInfo(dm_context, range, checkpoint_info); @@ -361,13 +361,13 @@ class DeltaMergeStore : private boost::noncopyable void ingestSegmentsFromCheckpointInfo( const DMContextPtr & dm_context, const DM::RowKeyRange & range, - CheckpointIngestInfoPtr checkpoint_info); + const CheckpointIngestInfoPtr & checkpoint_info); void ingestSegmentsFromCheckpointInfo( const Context & db_context, const DB::Settings & db_settings, const DM::RowKeyRange & range, - CheckpointIngestInfoPtr checkpoint_info) + const CheckpointIngestInfoPtr & checkpoint_info) { auto dm_context = newDMContext(db_context, db_settings); return ingestSegmentsFromCheckpointInfo(dm_context, range, checkpoint_info); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp index fe2c77ab1fb..6895280f4c0 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp @@ -829,7 +829,7 @@ UInt64 DeltaMergeStore::ingestFiles( std::vector DeltaMergeStore::ingestSegmentsUsingSplit( const DMContextPtr & dm_context, const RowKeyRange & ingest_range, - const std::vector & target_segments) + const std::vector & segments_to_ingest) { std::set updated_segments; @@ -914,19 +914,19 @@ std::vector DeltaMergeStore::ingestSegmentsUsingSplit( log, "Table ingest checkpoint using split - split ingest phase - begin, ingest_range={}, files_n={}", ingest_range.toDebugString(), - target_segments.size()); + segments_to_ingest.size()); - for (size_t segment_idx = 0; segment_idx < target_segments.size(); segment_idx++) + for (size_t remote_segment_idx = 0; remote_segment_idx < segments_to_ingest.size(); remote_segment_idx++) { // We may meet empty segment, just ignore it - if (target_segments[segment_idx]->getEstimatedRows() == 0) + if 
(segments_to_ingest[remote_segment_idx]->getEstimatedRows() == 0) { LOG_INFO( log, "Table ingest checkpoint using split - split ingest phase - Meet empty Segment, skipped. " "ingest_range={} segment_idx={}", ingest_range.toDebugString(), - segment_idx); + remote_segment_idx); continue; } @@ -938,7 +938,7 @@ std::vector DeltaMergeStore::ingestSegmentsUsingSplit( * │ │-- Seg --│------- Segment -----│ │ * We will try to ingest it into all overlapped segments. */ - auto file_ingest_range = target_segments[segment_idx]->getRowKeyRange(); + auto file_ingest_range = segments_to_ingest[remote_segment_idx]->getRowKeyRange(); while (!file_ingest_range.none()) // This DMFile has remaining data to ingest { auto [segment, is_empty] = getSegmentByStartKey( @@ -962,9 +962,9 @@ std::vector DeltaMergeStore::ingestSegmentsUsingSplit( LOG_INFO( log, "Table ingest checkpoint using split - split ingest phase - Try to ingest file into segment, " - "segment_idx={} segment_id={} segment_ingest_range={} segment={} segment_ingest_range={}", - segment_idx, - target_segments[segment_idx]->segmentId(), + "remote_segment_idx={} remote_segment_id={} remote_ingest_range={} segment={} segment_ingest_range={}", + remote_segment_idx, + segments_to_ingest[remote_segment_idx]->segmentId(), file_ingest_range.toDebugString(), segment->simpleInfo(), segment_ingest_range.toDebugString()); @@ -973,7 +973,7 @@ std::vector DeltaMergeStore::ingestSegmentsUsingSplit( *dm_context, segment, segment_ingest_range, - target_segments[segment_idx]); + segments_to_ingest[remote_segment_idx]); if (succeeded) { updated_segments.insert(segment); @@ -1126,7 +1126,7 @@ bool DeltaMergeStore::ingestSegmentDataIntoSegmentUsingSplit( Segments DeltaMergeStore::buildSegmentsFromCheckpointInfo( const DMContextPtr & dm_context, const DM::RowKeyRange & range, - CheckpointInfoPtr checkpoint_info) + const CheckpointInfoPtr & checkpoint_info) const { if (unlikely(range.none())) { @@ -1134,7 +1134,7 @@ Segments DeltaMergeStore::buildSegmentsFromCheckpointInfo( } LOG_INFO( log, - "Ingest checkpoint from remote, store_id={} region_id={}", + "Build checkpoint from remote, store_id={} region_id={}", checkpoint_info->remote_store_id, checkpoint_info->region_id); auto segment_meta_infos = Segment::readAllSegmentsMetaInfoInRange(*dm_context, range, checkpoint_info); @@ -1155,13 +1155,14 @@ Segments DeltaMergeStore::buildSegmentsFromCheckpointInfo( return {}; } wbs.writeLogAndData(); + LOG_INFO(log, "Finish write fap checkpoint, region_id={}", checkpoint_info->region_id); return restored_segments; } void DeltaMergeStore::ingestSegmentsFromCheckpointInfo( const DMContextPtr & dm_context, const DM::RowKeyRange & range, - CheckpointIngestInfoPtr checkpoint_info) + const CheckpointIngestInfoPtr & checkpoint_info) { if (unlikely(shutdown_called.load(std::memory_order_relaxed))) { diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp index 66b69c4a126..bddaebc585a 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp @@ -132,7 +132,6 @@ DMFilePtr DMFile::create( fiu_do_on(FailPoints::force_use_dmfile_format_v3, { // some unit test we need mock upload DMFile to S3, which only support DMFileFormat::V3 version = DMFileFormat::V3; - LOG_WARNING(Logger::get(), "!!!force use DMFileFormat::V3!!!"); }); // On create, ref_id is the same as file_id. 
DMFilePtr new_dmfile(new DMFile( diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 6fe24ab2e47..395657dd3df 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -317,7 +317,7 @@ SegmentPtr Segment::newSegment( // context.storage_pool->newMetaPageId()); } -inline void readSegmentMetaInfo(ReadBuffer & buf, Segment::SegmentMetaInfo & segment_info) +void readSegmentMetaInfo(ReadBuffer & buf, Segment::SegmentMetaInfo & segment_info) { readIntBinary(segment_info.version, buf); readIntBinary(segment_info.epoch, buf); @@ -472,9 +472,14 @@ Segments Segment::createTargetSegmentsFromCheckpoint( // segment_info.range.toDebugString(), segment_info.epoch, segment_info.next_segment_id); - auto stable = StableValueSpace::createFromCheckpoint(context, temp_ps, segment_info.stable_id, wbs); - auto delta - = DeltaValueSpace::createFromCheckpoint(context, temp_ps, segment_info.range, segment_info.delta_id, wbs); + auto stable = StableValueSpace::createFromCheckpoint(parent_log, context, temp_ps, segment_info.stable_id, wbs); + auto delta = DeltaValueSpace::createFromCheckpoint( + parent_log, + context, + temp_ps, + segment_info.range, + segment_info.delta_id, + wbs); auto segment = std::make_shared( Logger::get("Checkpoint"), segment_info.epoch, @@ -496,17 +501,33 @@ Segments Segment::createTargetSegmentsFromCheckpoint( // return segments; } -void Segment::serialize(WriteBatchWrapper & wb) +void Segment::serializeToFAPTempSegment(FastAddPeerProto::FAPTempSegmentInfo * segment_info) +{ + { + WriteBufferFromOwnString wb; + storeSegmentMetaInfo(wb); + segment_info->set_segment_meta(wb.releaseStr()); + } + segment_info->set_delta_meta(delta->serializeMeta()); + segment_info->set_stable_meta(stable->serializeMeta()); +} + +UInt64 Segment::storeSegmentMetaInfo(WriteBuffer & buf) const { - MemoryWriteBuffer buf(0, SEGMENT_BUFFER_SIZE); writeIntBinary(STORAGE_FORMAT_CURRENT.segment, buf); writeIntBinary(epoch, buf); rowkey_range.serialize(buf); writeIntBinary(next_segment_id, buf); writeIntBinary(delta->getId(), buf); writeIntBinary(stable->getId(), buf); + return buf.count(); +} - auto data_size = buf.count(); // Must be called before tryGetReadBuffer. +void Segment::serialize(WriteBatchWrapper & wb) const +{ + MemoryWriteBuffer buf(0, SEGMENT_BUFFER_SIZE); + // Must be called before tryGetReadBuffer. + auto data_size = storeSegmentMetaInfo(buf); wb.putPage(segment_id, 0, buf.tryGetReadBuffer(), data_size); } diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index 24e77aa57a4..85bb94886e9 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -27,6 +27,7 @@ #include #include #include +#include #include namespace DB::DM @@ -187,7 +188,9 @@ class Segment UniversalPageStoragePtr temp_ps, WriteBatches & wbs); - void serialize(WriteBatchWrapper & wb); + void serializeToFAPTempSegment(DB::FastAddPeerProto::FAPTempSegmentInfo * segment_info); + UInt64 storeSegmentMetaInfo(WriteBuffer & buf) const; + void serialize(WriteBatchWrapper & wb) const; /// Attach a new ColumnFile into the Segment. The ColumnFile will be added to MemFileSet and flushed to disk later. /// The block data of the passed in ColumnFile should be placed on disk before calling this function. 
@@ -752,4 +755,5 @@ class Segment const LoggerPtr log; }; +void readSegmentMetaInfo(ReadBuffer & buf, Segment::SegmentMetaInfo & segment_info); } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index 2f7321af09d..7a56f9834aa 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -81,23 +81,41 @@ void StableValueSpace::setFiles(const DMFiles & files_, const RowKeyRange & rang void StableValueSpace::saveMeta(WriteBatchWrapper & meta_wb) { MemoryWriteBuffer buf(0, 8192); + // The method must call `buf.count()` to get the last seralized size before `buf.tryGetReadBuffer` + auto data_size = saveMeta(buf); + meta_wb.putPage(id, 0, buf.tryGetReadBuffer(), data_size); +} + +UInt64 StableValueSpace::saveMeta(WriteBuffer & buf) const +{ writeIntBinary(STORAGE_FORMAT_CURRENT.stable, buf); writeIntBinary(valid_rows, buf); writeIntBinary(valid_bytes, buf); writeIntBinary(static_cast(files.size()), buf); - for (auto & f : files) + for (const auto & f : files) writeIntBinary(f->pageId(), buf); - auto data_size = buf.count(); // Must be called before tryGetReadBuffer. - meta_wb.putPage(id, 0, buf.tryGetReadBuffer(), data_size); + return buf.count(); } -StableValueSpacePtr StableValueSpace::restore(DMContext & dm_context, PageIdU64 id) +std::string StableValueSpace::serializeMeta() const { - auto stable = std::make_shared(id); + WriteBufferFromOwnString wb; + saveMeta(wb); + return wb.releaseStr(); +} +StableValueSpacePtr StableValueSpace::restore(DMContext & dm_context, PageIdU64 id) +{ Page page = dm_context.storage_pool->metaReader()->read(id); // not limit restore ReadBufferFromMemory buf(page.data.begin(), page.data.size()); + return StableValueSpace::restore(dm_context, buf, id); +} + +StableValueSpacePtr StableValueSpace::restore(DMContext & dm_context, ReadBuffer & buf, PageIdU64 id) +{ + auto stable = std::make_shared(id); + UInt64 version, valid_rows, valid_bytes, size; readIntBinary(version, buf); if (version != StableFormat::V1) @@ -158,6 +176,7 @@ StableValueSpacePtr StableValueSpace::restore(DMContext & dm_context, PageIdU64 } StableValueSpacePtr StableValueSpace::createFromCheckpoint( // + [[maybe_unused]] const LoggerPtr & parent_log, DMContext & dm_context, UniversalPageStoragePtr temp_ps, PageIdU64 stable_id, diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.h b/dbms/src/Storages/DeltaMerge/StableValueSpace.h index 878924a7587..7e33dcba92f 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.h @@ -45,8 +45,10 @@ class StableValueSpace : public std::enable_shared_from_this {} static StableValueSpacePtr restore(DMContext & context, PageIdU64 id); + static StableValueSpacePtr restore(DMContext & context, ReadBuffer & buf, PageIdU64 id); static StableValueSpacePtr createFromCheckpoint( // + const LoggerPtr & parent_log, DMContext & context, UniversalPageStoragePtr temp_ps, PageIdU64 stable_id, @@ -66,6 +68,7 @@ class StableValueSpace : public std::enable_shared_from_this PageIdU64 getId() const { return id; } void saveMeta(WriteBatchWrapper & meta_wb); + std::string serializeMeta() const; size_t getRows() const; size_t getBytes() const; @@ -258,6 +261,9 @@ class StableValueSpace : public std::enable_shared_from_this size_t avgRowBytes(const ColumnDefines & read_columns); +private: + UInt64 saveMeta(WriteBuffer & buf) const; + private: const PageIdU64 id; diff --git 
a/dbms/src/Storages/KVStore/CMakeLists.txt b/dbms/src/Storages/KVStore/CMakeLists.txt index 4173a539b89..8acf48d5fe2 100644 --- a/dbms/src/Storages/KVStore/CMakeLists.txt +++ b/dbms/src/Storages/KVStore/CMakeLists.txt @@ -21,8 +21,10 @@ add_headers_and_sources(kvstore ./TiKVHelpers) add_headers_and_sources(kvstore ./Decode) add_headers_and_sources(kvstore ./Read) +add_subdirectory (./MultiRaft/Disagg) + add_library(kvstore ${kvstore_headers} ${kvstore_sources}) -target_link_libraries(kvstore PRIVATE dbms page) +target_link_libraries(kvstore PRIVATE dbms page KVStoreFastAddPeerProto) if (ENABLE_TESTS) add_headers_and_sources(kvstore tests) diff --git a/dbms/src/Storages/KVStore/KVStore.cpp b/dbms/src/Storages/KVStore/KVStore.cpp index 07a6fb72c02..3d4f9f7f34d 100644 --- a/dbms/src/Storages/KVStore/KVStore.cpp +++ b/dbms/src/Storages/KVStore/KVStore.cpp @@ -50,6 +50,7 @@ namespace FailPoints { extern const char force_fail_in_flush_region_data[]; extern const char pause_passive_flush_before_persist_region[]; +extern const char force_not_clean_fap_on_destroy[]; } // namespace FailPoints KVStore::KVStore(Context & context) @@ -349,6 +350,7 @@ void KVStore::handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTas if (tmt.getContext().getSharedContextDisagg()->isDisaggregatedStorageMode()) { + fiu_do_on(FailPoints::force_not_clean_fap_on_destroy, { return; }); // Everytime we remove region, we try to clean obsolete fap ingest info. auto fap_ctx = tmt.getContext().getSharedContextDisagg()->fap_context; fap_ctx->cleanCheckpointIngestInfo(tmt, region_id); diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/CMakeLists.txt b/dbms/src/Storages/KVStore/MultiRaft/Disagg/CMakeLists.txt new file mode 100644 index 00000000000..47bdc1a084f --- /dev/null +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/CMakeLists.txt @@ -0,0 +1,23 @@ +# Copyright 2023 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +file(GLOB PROTO_FILES CONFIGURE_DEPENDS *.proto) +protobuf_generate_cpp(PROTO_SRCS PB_HEADERS ${PROTO_FILES}) + +add_library(KVStoreFastAddPeerProto + ${PROTO_SRCS}) +target_include_directories(KVStoreFastAddPeerProto + PUBLIC ${Protobuf_INCLUDE_DIR}) +target_compile_options(KVStoreFastAddPeerProto + PRIVATE -Wno-unused-parameter) diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp b/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp index 21ea19d1ba8..7da4356c89a 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -28,18 +29,17 @@ namespace DB { -static constexpr uint8_t FAP_INGEST_INFO_PERSIST_FMT_VER = 1; - CheckpointIngestInfoPtr CheckpointIngestInfo::restore( TMTContext & tmt, const TiFlashRaftProxyHelper * proxy_helper, UInt64 region_id, UInt64 peer_id) { - StoreID remote_store_id; + GET_METRIC(tiflash_fap_task_result, type_restore).Increment(); RegionPtr region; DM::Segments restored_segments; + auto log = DB::Logger::get("CheckpointIngestInfo"); auto uni_ps = tmt.getContext().getWriteNodePageStorage(); auto snapshot = uni_ps->getSnapshot(fmt::format("read_fap_i_{}", region_id)); auto page_id @@ -54,43 +54,62 @@ CheckpointIngestInfoPtr CheckpointIngestInfo::restore( peer_id, tmt.getKVStore()->getStoreID(std::memory_order_relaxed)); } - ReadBufferFromMemory buf(page.data.begin(), page.data.size()); - RUNTIME_CHECK_MSG(readBinary2(buf) == FAP_INGEST_INFO_PERSIST_FMT_VER, "wrong fap ingest info format"); - std::vector restored_segments_id; + FastAddPeerProto::CheckpointIngestInfoPersisted ingest_info_persisted; + if (!ingest_info_persisted.ParseFromArray(page.data.data(), page.data.size())) { - auto count = readBinary2(buf); - for (size_t i = 0; i < count; ++i) - { - auto segment_id = readBinary2(buf); - restored_segments_id.push_back(segment_id); - } - remote_store_id = readBinary2(buf); + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Can't parse CheckpointIngestInfo, region_id={} peer_id={} store_id={}", + region_id, + peer_id, + tmt.getKVStore()->getStoreID(std::memory_order_relaxed)); + } + + { + ReadBufferFromMemory buf( + ingest_info_persisted.region_info().data(), + ingest_info_persisted.region_info().size()); + region = Region::deserialize(buf, proxy_helper); } - region = Region::deserialize(buf, proxy_helper); + + StoreID remote_store_id = ingest_info_persisted.remote_store_id(); auto & storages = tmt.getStorages(); auto keyspace_id = region->getKeyspaceID(); auto table_id = region->getMappedTableID(); auto storage = storages.get(keyspace_id, table_id); - - auto log = DB::Logger::get("CheckpointIngestInfo"); if (storage && storage->engineType() == TiDB::StorageEngine::DT) { auto dm_storage = std::dynamic_pointer_cast(storage); auto dm_context = dm_storage->getStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); - for (auto segment_id : restored_segments_id) + for (const auto & seg_persisted : ingest_info_persisted.segments()) { - restored_segments.emplace_back(DM::Segment::restoreSegment(log, *dm_context, segment_id)); + ReadBufferFromString buf(seg_persisted.segment_meta()); + DM::Segment::SegmentMetaInfo segment_info; + readSegmentMetaInfo(buf, segment_info); + + ReadBufferFromString buf_delta(seg_persisted.delta_meta()); + auto delta + = DM::DeltaValueSpace::restore(*dm_context, segment_info.range, buf_delta, 
segment_info.delta_id); + ReadBufferFromString buf_stable(seg_persisted.stable_meta()); + auto stable = DM::StableValueSpace::restore(*dm_context, buf_stable, segment_info.stable_id); + + LOG_DEBUG( + log, + "Restore segments for checkpoint, remote_segment_id={} range={} remote_store_id={}", + segment_info.segment_id, + segment_info.range.toDebugString(), + remote_store_id); + restored_segments.push_back(std::make_shared( + log, + segment_info.epoch, + segment_info.range, + segment_info.segment_id, + segment_info.next_segment_id, + delta, + stable)); } - LOG_INFO( - log, - "CheckpointIngestInfo restore success with {} segments, region_id={} table_id={} keyspace_id={} region={}", - restored_segments.size(), - region_id, - table_id, - keyspace_id, - region->getDebugString()); } else { @@ -117,43 +136,34 @@ void CheckpointIngestInfo::persistToLocal() const } auto uni_ps = tmt.getContext().getWriteNodePageStorage(); UniversalWriteBatch wb; - MemoryWriteBuffer wb_buffer; + // Write: // - The region, which is actually data and meta in KVStore. // - The segment ids point to segments which are already persisted but not ingested. - static_assert(sizeof(FAP_INGEST_INFO_PERSIST_FMT_VER) == 1); - auto data_size = writeBinary2(FAP_INGEST_INFO_PERSIST_FMT_VER, wb_buffer); + + FastAddPeerProto::CheckpointIngestInfoPersisted ingest_info_persisted; + { - size_t segment_data_size = 0; - segment_data_size += writeBinary2(restored_segments.size(), wb_buffer); for (const auto & restored_segment : restored_segments) { - data_size += writeBinary2(restored_segment->segmentId(), wb_buffer); + auto * segment_info = ingest_info_persisted.add_segments(); + restored_segment->serializeToFAPTempSegment(segment_info); } - segment_data_size += writeBinary2(remote_store_id, wb_buffer); - data_size += segment_data_size; - RUNTIME_CHECK_MSG( - wb_buffer.count() == data_size, - "buffer {} != data_size {}, segment_data_size={}", - wb_buffer.count(), - data_size, - segment_data_size); } { // Although the region is the first peer of this region in this store, we can't write it to formal KVStore for now. // Otherwise it could be uploaded and then overwritten. 
- auto region_size = RegionPersister::computeRegionWriteBuffer(*region, wb_buffer); - data_size += region_size; - RUNTIME_CHECK_MSG( - wb_buffer.count() == data_size, - "buffer {} != data_size {}, region_size={}", - wb_buffer.count(), - data_size, - region_size); + WriteBufferFromOwnString wb; + RegionPersister::computeRegionWriteBuffer(*region, wb); + ingest_info_persisted.set_region_info(wb.releaseStr()); } + ingest_info_persisted.set_remote_store_id(remote_store_id); + + auto s = ingest_info_persisted.SerializeAsString(); + auto data_size = s.size(); + auto read_buf = std::make_shared(s); auto page_id = UniversalPageIdFormat::toLocalKVPrefix(UniversalPageIdFormat::LocalKVKeyType::FAPIngestInfo, region_id); - auto read_buf = wb_buffer.tryGetReadBuffer(); wb.putPage(UniversalPageId(page_id.data(), page_id.size()), 0, read_buf, data_size); uni_ps->write(std::move(wb), DB::PS::V3::PageType::Local, nullptr); LOG_INFO( diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.h b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.h index e03699ecf3f..5119fce89d3 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.h +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/FastAddPeer.h @@ -62,10 +62,12 @@ class FastAddPeerContext RegionPtr region, DM::Segments && segments, UInt64 start_time); - void debugRemoveCheckpointIngestInfo(UInt64 region_id); std::optional tryGetCheckpointIngestInfo(UInt64 region_id) const; void cleanCheckpointIngestInfo(TMTContext & tmt, UInt64 region_id); + // Remove the checkpoint ingest info from memory. Only for testing. + void debugRemoveCheckpointIngestInfo(UInt64 region_id); + public: std::shared_ptr tasks_trace; diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/fast_add_peer.proto b/dbms/src/Storages/KVStore/MultiRaft/Disagg/fast_add_peer.proto new file mode 100644 index 00000000000..1f976d749de --- /dev/null +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/fast_add_peer.proto @@ -0,0 +1,29 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +syntax = "proto3"; + +package DB.FastAddPeerProto; + +message FAPTempSegmentInfo { + bytes segment_meta = 1; + bytes delta_meta = 2; + bytes stable_meta = 3; +} + +message CheckpointIngestInfoPersisted { + bytes region_info = 1; + repeated FAPTempSegmentInfo segments = 2; + uint64 remote_store_id = 3; +} \ No newline at end of file diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionPersister.cpp b/dbms/src/Storages/KVStore/MultiRaft/RegionPersister.cpp index b22e553cabb..a7d0191ee29 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionPersister.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionPersister.cpp @@ -67,7 +67,7 @@ void RegionPersister::computeRegionWriteBuffer(const Region & region, RegionCach } } -size_t RegionPersister::computeRegionWriteBuffer(const Region & region, MemoryWriteBuffer & buffer) +size_t RegionPersister::computeRegionWriteBuffer(const Region & region, WriteBuffer & buffer) { auto region_size = 0; std::tie(region_size, std::ignore) = region.serialize(buffer); diff --git a/dbms/src/Storages/KVStore/MultiRaft/RegionPersister.h b/dbms/src/Storages/KVStore/MultiRaft/RegionPersister.h index 406c0afeed2..c4cab4b0f8d 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RegionPersister.h +++ b/dbms/src/Storages/KVStore/MultiRaft/RegionPersister.h @@ -52,7 +52,7 @@ class RegionPersister final : private boost::noncopyable using RegionCacheWriteElement = std::tuple; static void computeRegionWriteBuffer(const Region & region, RegionCacheWriteElement & region_write_buffer); - static size_t computeRegionWriteBuffer(const Region & region, MemoryWriteBuffer & buffer); + static size_t computeRegionWriteBuffer(const Region & region, WriteBuffer & buffer); PageStorageConfig getPageStorageSettings() const; diff --git a/dbms/src/Storages/KVStore/Utils/AsyncTasks.h b/dbms/src/Storages/KVStore/Utils/AsyncTasks.h index 6bb94d6688d..e6d8e7e2c00 100644 --- a/dbms/src/Storages/KVStore/Utils/AsyncTasks.h +++ b/dbms/src/Storages/KVStore/Utils/AsyncTasks.h @@ -46,6 +46,7 @@ struct AsyncTasks using P = std::packaged_task; std::shared_ptr

<P> p = std::make_shared<P>
(P(f)); + // TODO(fap) `start_time` may not be set immediately when calling `p`, will be fixed in another PR. auto res = thread_pool->trySchedule([p]() { (*p)(); }, 0, 0); if (res) { @@ -96,7 +97,7 @@ struct AsyncTasks { std::scoped_lock l(mtx); auto it2 = start_time.find(key); - RUNTIME_CHECK_MSG(it2 != start_time.end(), "queryElapsed meets empty key"); + RUNTIME_CHECK_MSG(it2 != start_time.end(), "queryStartTime meets empty key"); return it2->second; } diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp index d35b9abc182..8f766be1bcd 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -48,6 +49,11 @@ FastAddPeerRes FastAddPeerImplWrite( UInt64 start_time); void ApplyFapSnapshotImpl(TMTContext & tmt, TiFlashRaftProxyHelper * proxy_helper, UInt64 region_id, UInt64 peer_id); +namespace FailPoints +{ +extern const char force_not_clean_fap_on_destroy[]; +} // namespace FailPoints + namespace tests { class RegionKVStoreTestFAP : public KVStoreTestBase @@ -313,7 +319,16 @@ try auto fap_context = global_context.getSharedContextDisagg()->fap_context; uint64_t region_id = 1; FastAddPeerImplWrite(global_context.getTMTContext(), region_id, 2333, std::move(mock_data), 0); + + // Remove the checkpoint ingest info and region from memory. + // Testing whether FAP can be handled properly after restart. fap_context->debugRemoveCheckpointIngestInfo(region_id); + // Remove the region so that the snapshot will be accepted. + FailPointHelper::enableFailPoint("force_not_clean_fap_on_destroy"); + SCOPE_EXIT({ FailPointHelper::disableFailPoint("force_not_clean_fap_on_destroy"); }); + kvstore->handleDestroy(region_id, global_context.getTMTContext()); + + // After restart, continue the FAP from persisted checkpoint ingest info. ApplyFapSnapshotImpl(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333); { @@ -323,6 +338,7 @@ try ASSERT_TRUE(storage && storage->engineType() == TiDB::StorageEngine::DT); auto dm_storage = std::dynamic_pointer_cast(storage); auto store = dm_storage->getStore(); + ASSERT_EQ(store->getRowKeyColumnSize(), 1); verifyRows( global_context, store, @@ -370,9 +386,21 @@ try auto & global_context = TiFlashTestEnv::getGlobalContext(); auto fap_context = global_context.getSharedContextDisagg()->fap_context; uint64_t region_id = 1; + + // Will generate and persist some information in local ps, which will not be uploaded. 
FastAddPeerImplWrite(global_context.getTMTContext(), region_id, 2333, std::move(mock_data), 0); dumpCheckpoint(); FastAddPeerImplWrite(global_context.getTMTContext(), region_id, 2333, std::move(mock_data), 0); + auto in_mem_ingest_info = fap_context->getOrRestoreCheckpointIngestInfo( + global_context.getTMTContext(), + proxy_helper.get(), + region_id, + 2333); + auto in_disk_ingest_info + = CheckpointIngestInfo::restore(global_context.getTMTContext(), proxy_helper.get(), region_id, 2333); + ASSERT_EQ(in_mem_ingest_info->getRegion()->getDebugString(), in_disk_ingest_info->getRegion()->getDebugString()); + ASSERT_EQ(in_mem_ingest_info->getRestoredSegments().size(), in_disk_ingest_info->getRestoredSegments().size()); + ASSERT_EQ(in_mem_ingest_info->getRemoteStoreId(), in_disk_ingest_info->getRemoteStoreId()); auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient(); const auto manifests = S3::CheckpointManifestS3Set::getFromS3(*s3_client, kvs.getStoreID()); diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/Proto/manifest_file.proto b/dbms/src/Storages/Page/V3/CheckpointFile/Proto/manifest_file.proto index 26eb086a26a..b222cef4b35 100644 --- a/dbms/src/Storages/Page/V3/CheckpointFile/Proto/manifest_file.proto +++ b/dbms/src/Storages/Page/V3/CheckpointFile/Proto/manifest_file.proto @@ -85,5 +85,4 @@ message EditRecord { message LockFile { // There could be heavy duplicates. We rely on compression to reduce file size. bytes name = 1; -} - +} \ No newline at end of file diff --git a/dbms/src/Storages/Page/V3/Universal/S3PageReader.cpp b/dbms/src/Storages/Page/V3/Universal/S3PageReader.cpp index 3dd7da31270..e28fa13946e 100644 --- a/dbms/src/Storages/Page/V3/Universal/S3PageReader.cpp +++ b/dbms/src/Storages/Page/V3/Universal/S3PageReader.cpp @@ -74,12 +74,13 @@ UniversalPageMap S3PageReader::read(const UniversalPageIdAndEntries & page_id_an return page_map; } -std::pair S3PageReader::read(const FieldReadInfos & to_read) +std::pair S3PageReader::read(FieldReadInfos & to_read) { UniversalPageMap complete_page_map; size_t read_fields_size = 0; - for (const auto & read_info : to_read) + for (auto & read_info : to_read) { + std::sort(read_info.fields.begin(), read_info.fields.end()); const auto & page_entry = read_info.entry; // read the whole page from S3 and save it as `complete_page` complete_page_map.emplace(read_info.page_id, read(std::make_pair(read_info.page_id, page_entry))); diff --git a/dbms/src/Storages/Page/V3/Universal/S3PageReader.h b/dbms/src/Storages/Page/V3/Universal/S3PageReader.h index dcd8f31c03c..26b53a899ff 100644 --- a/dbms/src/Storages/Page/V3/Universal/S3PageReader.h +++ b/dbms/src/Storages/Page/V3/Universal/S3PageReader.h @@ -47,7 +47,7 @@ class S3PageReader : private Allocator using FieldReadInfos = PS::V3::universal::BlobStoreType::FieldReadInfos; // return two page_maps, the first contains the whole page for given page id which is used to update local cache, // the second just contains read fields data. 
- std::pair read(const FieldReadInfos & to_read); + std::pair read(FieldReadInfos & to_read); }; using S3PageReaderPtr = std::unique_ptr; diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 2a4e39ac05f..491119e1288 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -1190,7 +1190,7 @@ DM::Segments StorageDeltaMerge::buildSegmentsFromCheckpointInfo( void StorageDeltaMerge::ingestSegmentsFromCheckpointInfo( const DM::RowKeyRange & range, - CheckpointIngestInfoPtr checkpoint_info, + const CheckpointIngestInfoPtr & checkpoint_info, const Settings & settings) { GET_METRIC(tiflash_storage_command_count, type_ingest_checkpoint).Increment(); diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index e5b210576ee..b854841c84c 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -133,7 +133,7 @@ class StorageDeltaMerge void ingestSegmentsFromCheckpointInfo( const DM::RowKeyRange & range, - CheckpointIngestInfoPtr checkpoint_info, + const CheckpointIngestInfoPtr & checkpoint_info, const Settings & settings); UInt64 onSyncGc(Int64, const DM::GCOptions &) override; From 0aa97ad2bfe7cf9db554f20c54c729e24bf4fecb Mon Sep 17 00:00:00 2001 From: jinhelin Date: Thu, 14 Dec 2023 20:06:19 +0800 Subject: [PATCH 7/8] Storages: fix segmentfault when `fetchPages` failed. (#8523) close pingcap/tiflash#8515 --- .../Storages/DeltaMerge/SegmentReadTask.cpp | 40 +++++++++++++++++-- .../src/Storages/DeltaMerge/SegmentReadTask.h | 1 + 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp index 68a0ec7d832..20f1425a5ec 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp @@ -502,6 +502,18 @@ void SegmentReadTask::checkMemTableSet(const ColumnFileSetSnapshotPtr & mem_tabl } } +void SegmentReadTask::checkMemTableSetReady() const +{ + const auto & mem_table_snap = read_snapshot->delta->getMemTableSetSnapshot(); + for (auto & cf : mem_table_snap->getColumnFiles()) + { + if (auto * in_mem_cf = cf->tryToInMemoryFile(); in_mem_cf) + { + RUNTIME_CHECK_MSG(in_mem_cf->getCache() != nullptr, "Fail to fetch MemTableSet from {}", *this); + } + } +} + bool SegmentReadTask::needFetchMemTableSet() const { // Check if any object of ColumnFileInMemory does not contain data. @@ -543,12 +555,33 @@ void SegmentReadTask::doFetchPages(const disaggregated::FetchDisaggPagesRequest auto stream_resp = rpc.call(&client_context, request); RUNTIME_CHECK(stream_resp != nullptr); SCOPE_EXIT({ - // TODO: Not sure whether we really need this. Maybe RAII is already there? - stream_resp->Finish(); + // Most of the time, it will call `Finish()` and check the status of grpc when `Read()` return false. + // `Finish()` will be called here when exceptions thrown. + if (unlikely(stream_resp != nullptr)) + { + stream_resp->Finish(); + } }); doFetchPagesImpl( - [&stream_resp](disaggregated::PagesPacket & packet) { return stream_resp->Read(&packet); }, + [&stream_resp, this](disaggregated::PagesPacket & packet) { + if (stream_resp->Read(&packet)) + { + return true; + } + else + { + auto status = stream_resp->Finish(); + stream_resp.reset(); // Reset to avoid calling `Finish()` repeatedly. 
+ RUNTIME_CHECK_MSG( + status.ok(), + "Failed to fetch all pages from {}, status={}, message={}", + *this, + static_cast(status.error_code()), + status.error_message()); + return false; + } + }, std::unordered_set(request.page_ids().begin(), request.page_ids().end())); } @@ -671,6 +704,7 @@ void SegmentReadTask::doFetchPagesImpl( wait_write_page_ns += sw_wait_write_page_finished.elapsed(); // Verify all pending pages are now received. + checkMemTableSetReady(); RUNTIME_CHECK_MSG( remaining_pages_to_fetch.empty(), "Failed to fetch all pages (from {}), remaining_pages_to_fetch={}", diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTask.h b/dbms/src/Storages/DeltaMerge/SegmentReadTask.h index 1a55ae1e059..0924c775f33 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTask.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTask.h @@ -133,6 +133,7 @@ struct SegmentReadTask std::unordered_set remaining_pages_to_fetch); void checkMemTableSet(const ColumnFileSetSnapshotPtr & mem_table_snap) const; bool needFetchMemTableSet() const; + void checkMemTableSetReady() const; void initColumnFileDataProvider(const Remote::RNLocalPageCacheGuardPtr & pages_guard); From e7a86ee7b09d317dcd030ade8d23bd7daa95b853 Mon Sep 17 00:00:00 2001 From: yibin Date: Fri, 15 Dec 2023 11:38:24 +0800 Subject: [PATCH 8/8] Update tsan suppression file to ignore false positive warnings Signed-off-by: yibin --- tests/sanitize/tsan.suppression | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/sanitize/tsan.suppression b/tests/sanitize/tsan.suppression index 29ac8020569..2c5b8be7843 100644 --- a/tests/sanitize/tsan.suppression +++ b/tests/sanitize/tsan.suppression @@ -1,6 +1,7 @@ race:dbms/src/Common/TiFlashMetrics.h race:DB::Context::setCancelTest race:DB::getCurrentExceptionMessage -race:google::protobuf::internal::AssignDescriptors +race:google::protobuf::Message::DebugString +race:google::protobuf::Message::ShortDebugString race:fiu_fail race:dbms/src/DataStreams/BlockStreamProfileInfo.h
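A note on the `fetchPages` fix in the SegmentReadTask patch above: the new read callback follows the usual synchronous gRPC client-streaming pattern, that is, call Read() until it returns false, then call Finish() exactly once to obtain the final status of the stream. The sketch below only illustrates that pattern; the stub method, message and helper names are illustrative assumptions, not the actual TiFlash RPC wrapper.

    // Minimal sketch (assumed names) of the sync gRPC client-streaming read loop.
    #include <grpcpp/grpcpp.h>

    void readAllPackets(PagesStub & stub, const FetchPagesRequest & request)
    {
        grpc::ClientContext ctx;
        // The real code goes through TiFlash's rpc.call() wrapper instead of a raw stub.
        std::unique_ptr<grpc::ClientReader<PagesPacket>> reader = stub.FetchPages(&ctx, request);
        PagesPacket packet;
        while (reader->Read(&packet))
            handlePacket(packet); // consume each streamed packet
        // Read() returned false: the stream ended, either normally or with an error.
        // Finish() must be called exactly once to find out which; this is why the patch
        // resets stream_resp after calling it, so the SCOPE_EXIT cannot call it again.
        grpc::Status status = reader->Finish();
        if (!status.ok())
            throw std::runtime_error(status.error_message());
    }

The design choice in the patch is the same: surface a non-OK status as an exception (via RUNTIME_CHECK_MSG) as soon as Read() stops, rather than treating a broken stream as end-of-data, which presumably is what left ColumnFileInMemory objects with a null cache behind and led to the crash that #8515 reports.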