diff --git a/contrib/googletest b/contrib/googletest index d175c8bf823..2fe3bd994b3 160000 --- a/contrib/googletest +++ b/contrib/googletest @@ -1 +1 @@ -Subproject commit d175c8bf823e709d570772b038757fadf63bc632 +Subproject commit 2fe3bd994b3189899d93f1d5a881e725e046fdc2 diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index 718d18c4954..1cb1b8e84b8 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -57,6 +57,7 @@ include(${TiFlash_SOURCE_DIR}/cmake/dbms_glob_sources.cmake) add_headers_and_sources(clickhouse_common_io src/Common) add_headers_and_sources(clickhouse_common_io src/Common/HashTable) +add_headers_and_sources(clickhouse_common_io src/Common/SyncPoint) add_headers_and_sources(clickhouse_common_io src/IO) add_headers_and_sources(dbms src/Analyzers) @@ -262,6 +263,7 @@ if (ENABLE_TESTS) include (${TiFlash_SOURCE_DIR}/cmake/find_gtest.cmake) if (USE_INTERNAL_GTEST_LIBRARY) + set(INSTALL_GTEST OFF) # Google Test from sources add_subdirectory(${TiFlash_SOURCE_DIR}/contrib/googletest/googletest ${CMAKE_CURRENT_BINARY_DIR}/googletest) # avoid problems with diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 2cea1c02562..e321ea19118 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -101,8 +101,7 @@ std::unordered_map> FailPointHelper::f M(pause_when_writing_to_dt_store) \ M(pause_when_ingesting_to_dt_store) \ M(pause_when_altering_dt_store) \ - M(pause_after_copr_streams_acquired) \ - M(pause_before_server_merge_one_delta) + M(pause_after_copr_streams_acquired) namespace FailPoints { diff --git a/dbms/src/Common/SyncPoint/Ctl.cpp b/dbms/src/Common/SyncPoint/Ctl.cpp new file mode 100644 index 00000000000..a13ea936f0f --- /dev/null +++ b/dbms/src/Common/SyncPoint/Ctl.cpp @@ -0,0 +1,119 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +#ifdef FIU_ENABLE + +void SyncPointCtl::enable(const char * name) +{ + { + std::unique_lock lock(mu); + channels.try_emplace(name, + std::make_pair( + std::make_shared(), + std::make_shared())); + } + fiu_enable(name, 1, nullptr, 0); + LOG_FMT_DEBUG(getLogger(), "Enabled: {}", name); +} + +void SyncPointCtl::disable(const char * name) +{ + fiu_disable(name); + { + std::unique_lock lock(mu); + if (auto const & iter = channels.find(name); iter != channels.end()) + { + auto [first_ch, second_ch] = iter->second; + first_ch->close(); + second_ch->close(); + channels.erase(iter); + } + } + LOG_FMT_DEBUG(getLogger(), "Disabled: {}", name); +} + +std::pair SyncPointCtl::mustGetChannel(const char * name) +{ + std::unique_lock lock(mu); + if (auto iter = channels.find(name); iter == channels.end()) + { + throw Exception(fmt::format("SyncPoint {} is not enabled", name)); + } + else + { + return iter->second; + } +} + +void SyncPointCtl::waitAndPause(const char * name) +{ + auto ch = mustGetChannel(name).first; + LOG_FMT_DEBUG(getLogger(), "waitAndPause({}) waiting...", name); + auto result = ch->recv(); + LOG_FMT_DEBUG(getLogger(), "waitAndPause({}) {}", name, result ? "finished" : "cancelled"); +} + +void SyncPointCtl::next(const char * name) +{ + auto ch = mustGetChannel(name).second; + LOG_FMT_DEBUG(getLogger(), "next({}) trying...", name); + auto result = ch->send(); + LOG_FMT_DEBUG(getLogger(), "next({}) {}", name, result ? 
"done" : "cancelled"); +} + +void SyncPointCtl::sync(const char * name) +{ + auto [ch_1, ch_2] = mustGetChannel(name); + // Print a stack, which is helpful to know where undesired SYNC_FOR comes from. + LOG_FMT_DEBUG(getLogger(), "SYNC_FOR({}) trying... \n\n# Current Stack: {}", name, StackTrace().toString()); + auto result = ch_1->send(); + LOG_FMT_DEBUG(getLogger(), "SYNC_FOR({}) {}", name, // + result ? "matched waitAndPause(), paused until calling next()..." : "cancelled"); + if (!result) + return; + result = ch_2->recv(); + LOG_FMT_DEBUG(getLogger(), "SYNC_FOR({}) {}", name, result ? "done" : "cancelled"); +} + +#else + +void SyncPointCtl::enable(const char *) +{} + +void SyncPointCtl::disable(const char *) {} + +void SyncPointCtl::waitAndPause(const char *) {} + +void SyncPointCtl::next(const char *) {} + +void SyncPointCtl::sync(const char *) {} + +#endif + +SyncPointScopeGuard SyncPointCtl::enableInScope(const char * name) +{ + return SyncPointScopeGuard(name); +} + +} // namespace DB diff --git a/dbms/src/Common/SyncPoint/Ctl.h b/dbms/src/Common/SyncPoint/Ctl.h new file mode 100644 index 00000000000..594da3c102b --- /dev/null +++ b/dbms/src/Common/SyncPoint/Ctl.h @@ -0,0 +1,94 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +#include +#include +#include + +namespace DB +{ + +class SyncPointCtl +{ +public: + /** + * Enable the sync point. 
After enabling, when executed to the sync point defined with `SYNC_FOR()`, + * the execution will be suspended, until `waitAndPause()` or `waitAndNext()` is called + * somewhere (e.g. in tests). + */ + static void enable(const char * name); + + /** + * Disable the sync point. Existing suspends will be continued. + */ + static void disable(const char * name); + + /** + * Suspend the execution, until `waitAndPause()`, `next()` or `waitAndNext()` is called somewhere. + * You should not invoke this function directly. Invoke `SYNC_FOR()` instead. + */ + static void sync(const char * name); + + /** + * Wait for the sync point being executed. The code at the sync point will keep + * pausing until you call `next()`. + */ + static void waitAndPause(const char * name); + + /** + * Continue the execution after the specified sync point. + * You must first `waitAndPause()` for it, then `next()` it. + */ + static void next(const char * name); + + /** + * Wait for the sync point being executed. After that, continue the execution after the sync point. + */ + static void waitAndNext(const char * name) + { + waitAndPause(name); + next(name); + } + + /** + * Enable the sync point in the current scope. When scope exits, the sync point will be disabled. + * + * After enabling, when executed to the sync point defined with `SYNC_FOR()`, the execution + * will be suspended, until `waitAndPause()` or `waitAndNext()` is called somewhere (e.g. in tests). 
+ */ + static SyncPointScopeGuard enableInScope(const char * name); + +private: + class SyncChannel; + using SyncChannelPtr = std::shared_ptr; + + static Poco::Logger * getLogger() + { + static Poco::Logger * logger = &Poco::Logger::get("SyncPointCtl"); + return logger; + } + + static std::pair mustGetChannel(const char * name); + + inline static std::unordered_map> + channels{}; + inline static std::mutex mu{}; +}; + +} // namespace DB diff --git a/dbms/src/Common/SyncPoint/ScopeGuard.cpp b/dbms/src/Common/SyncPoint/ScopeGuard.cpp new file mode 100644 index 00000000000..feb174c2c9f --- /dev/null +++ b/dbms/src/Common/SyncPoint/ScopeGuard.cpp @@ -0,0 +1,50 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include + +namespace DB +{ + +SyncPointScopeGuard::SyncPointScopeGuard(const char * name_) + : name(name_) +{ + SyncPointCtl::enable(name_); +} + +void SyncPointScopeGuard::disable() +{ + if (disabled) + return; + SyncPointCtl::disable(name.c_str()); + disabled = true; +} + +void SyncPointScopeGuard::waitAndPause() +{ + SyncPointCtl::waitAndPause(name.c_str()); +} + +void SyncPointScopeGuard::next() +{ + SyncPointCtl::next(name.c_str()); +} + +void SyncPointScopeGuard::waitAndNext() +{ + SyncPointCtl::waitAndNext(name.c_str()); +} + +} // namespace DB diff --git a/dbms/src/Common/SyncPoint/ScopeGuard.h b/dbms/src/Common/SyncPoint/ScopeGuard.h new file mode 100644 index 00000000000..c070365d380 --- /dev/null +++ b/dbms/src/Common/SyncPoint/ScopeGuard.h @@ -0,0 +1,60 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace DB +{ + +class SyncPointScopeGuard +{ +public: + explicit SyncPointScopeGuard(const char * name_); + + ~SyncPointScopeGuard() + { + disable(); + } + + /** + * Disable this sync point beforehand, instead of at the moment when + * this scope guard is destructed. + */ + void disable(); + + /** + * Wait for the sync point being executed. The code at the sync point will keep + * pausing until you call `next()`. + */ + void waitAndPause(); + + /** + * Continue the execution after the specified sync point. + * You must first `waitAndPause()` for it, then `next()` it. 
namespace DB
{

/**
 * Enables a sync point on construction and disables it on destruction
 * (or earlier, when `disable()` is called explicitly).
 */
class SyncPointScopeGuard
{
public:
    explicit SyncPointScopeGuard(const char * name_);

    ~SyncPointScopeGuard()
    {
        disable();
    }

    // The destructor has a side effect (disabling the sync point), so a copy going out of
    // scope would disable the sync point while the original guard is still alive.
    // Forbid copying; this also suppresses the implicit move operations.
    // Returning a guard by value from a factory still works through guaranteed copy elision.
    SyncPointScopeGuard(const SyncPointScopeGuard &) = delete;
    SyncPointScopeGuard & operator=(const SyncPointScopeGuard &) = delete;

    /**
     * Disable this sync point beforehand, instead of at the moment when
     * this scope guard is destructed.
     */
    void disable();

    /**
     * Wait for the sync point being executed. The code at the sync point will keep
     * pausing until you call `next()`.
     */
    void waitAndPause();

    /**
     * Continue the execution after the specified sync point.
     * You must first `waitAndPause()` for it, then `next()` it.
     */
    void next();

    /**
     * Wait for the sync point being executed. After that, continue the execution after the sync point.
     */
    void waitAndNext();

private:
    std::string name;
    /// Set once `disable()` has run, so that the sync point is disabled at most once.
    bool disabled = false;
};

} // namespace DB
+ */ + bool recv() + { + pending_op++; + // wrap a scope for locks to ensure no more access to the member after pending_op-- + auto is_wait_fulfilled = [this]() { + std::unique_lock lock_recv(m_recv); + std::unique_lock lock_cv(m_cv); + has_receiver = true; + cv.notify_all(); + cv.wait(lock_cv, [this] { + return has_data || is_closing; + }); + if (is_closing) + return false; + has_data = false; // consumes one data + has_receiver = false; + return true; + }(); + pending_op--; + return is_wait_fulfilled; + } + + /** + * Blocked until there is a receiver, or channel is closed. + * Queued if multiple send() is called concurrently. + */ + bool send() + { + pending_op++; + auto is_wait_fulfilled = [this]() { + std::unique_lock lock_send(m_send); + std::unique_lock lock_cv(m_cv); + cv.wait(lock_cv, [this] { + return (has_receiver && !has_data) || is_closing; + }); + if (is_closing) + return false; + has_data = true; + cv.notify_all(); + return true; + }(); + pending_op--; + return is_wait_fulfilled; + } + +private: + bool has_receiver = false; + bool has_data = false; + bool is_closing = false; + + std::atomic pending_op = 0; + + std::mutex m_send; + std::mutex m_recv; + std::mutex m_cv; + std::condition_variable cv; +}; + +} // namespace DB diff --git a/dbms/src/Common/SyncPoint/SyncPoint.h b/dbms/src/Common/SyncPoint/SyncPoint.h new file mode 100644 index 00000000000..6ebe3b301e3 --- /dev/null +++ b/dbms/src/Common/SyncPoint/SyncPoint.h @@ -0,0 +1,36 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
namespace DB
{

/**
 * Suspend the execution (when enabled), until `SyncPointCtl::waitAndPause()`,
 * `SyncPointCtl::next()` or `SyncPointCtl::waitAndNext()` is called somewhere
 * (e.g. in tests).
 *
 * Usually this is invoked in actual business logics.
 *
 * `name` is expanded twice (once for the fiu check, once for `sync()`), so pass a plain
 * string literal or constant. `SyncPointCtl` is fully qualified because macros ignore
 * namespaces: the expansion must also compile at call sites outside of `namespace DB`.
 */
#define SYNC_FOR(name) fiu_do_on(name, ::DB::SyncPointCtl::sync(name);)

} // namespace DB
-#include +#include #include #include #include @@ -31,10 +31,6 @@ namespace DB { -namespace FailPoints -{ -extern const char pause_before_server_merge_one_delta[]; -} // namespace FailPoints namespace tests { @@ -341,18 +337,12 @@ CATCH TEST_P(BasicManualCompactTest, DuplicatedLogicalId) try { - using namespace std::chrono_literals; - - FailPointHelper::enableFailPoint(FailPoints::pause_before_server_merge_one_delta); - - auto thread_1_is_ready = std::promise(); - std::thread t_req1([&]() { - // req1 + auto sp_req1_merge_delta = SyncPointCtl::enableInScope("before_DeltaMergeStore::mergeDeltaBySegment"); + auto req1 = std::async([&]() { auto request = ::kvrpcpb::CompactRequest(); request.set_physical_table_id(TABLE_ID); request.set_logical_table_id(2); auto response = ::kvrpcpb::CompactResponse(); - thread_1_is_ready.set_value(); auto status_code = manager->handleRequest(&request, &response); ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK); ASSERT_FALSE(response.has_error()); @@ -361,13 +351,12 @@ try helper->verifyExpectedRowsForAllSegments(); }); - { - // send req1, wait request being processed. - thread_1_is_ready.get_future().wait(); - std::this_thread::sleep_for(500ms); // TODO: Maybe better to use sync_channel to avoid hardcoded wait duration. + sp_req1_merge_delta.waitAndPause(); - // req2: Now let's send another request with the same logical id. - // Although worker pool size is 1, this request will be returned immediately, but with an error. + // req2: Another request with the same logical id. + // Although worker pool size is 1, this request will be returned immediately with an error, + // because there is already same logic id working in progress. 
+ { auto request = ::kvrpcpb::CompactRequest(); request.set_physical_table_id(TABLE_ID); request.set_logical_table_id(2); @@ -379,9 +368,9 @@ try helper->verifyExpectedRowsForAllSegments(); } - // Now let's continue req1's work - FailPointHelper::disableFailPoint(FailPoints::pause_before_server_merge_one_delta); - t_req1.join(); + // Proceed the execution of req1. Everything should work normally. + sp_req1_merge_delta.next(); + req1.wait(); } CATCH diff --git a/dbms/src/Storages/DeltaMerge/Delta/ColumnFileFlushTask.cpp b/dbms/src/Storages/DeltaMerge/Delta/ColumnFileFlushTask.cpp index 53c2e901a5a..373efa10445 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/ColumnFileFlushTask.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/ColumnFileFlushTask.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -61,6 +62,8 @@ DeltaIndex::Updates ColumnFileFlushTask::prepare(WriteBatches & wbs) bool ColumnFileFlushTask::commit(ColumnFilePersistedSetPtr & persisted_file_set, WriteBatches & wbs) { + SYNC_FOR("before_ColumnFileFlushTask::commit"); + if (!persisted_file_set->checkAndIncreaseFlushVersion(flush_version)) return false; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 670de73f551..5473e4ad7c8 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -1046,6 +1047,8 @@ void DeltaMergeStore::mergeDeltaAll(const Context & context) std::optional DeltaMergeStore::mergeDeltaBySegment(const Context & context, const RowKeyValue & start_key, const TaskRunThread run_thread) { + SYNC_FOR("before_DeltaMergeStore::mergeDeltaBySegment"); + updateGCSafePoint(); auto dm_context = newDMContext(context, context.getSettingsRef(), /*tracing_id*/ 
fmt::format("mergeDeltaBySegment_{}", latest_gc_safe_point.load(std::memory_order_relaxed))); @@ -1081,6 +1084,8 @@ std::optional DeltaMergeStore::mergeDeltaBySegment(const Contex } // else: sleep and retry } // else: sleep and retry + SYNC_FOR("before_DeltaMergeStore::mergeDeltaBySegment|retry_segment"); + // Typical cases: // #1. flushCache failed // - The segment is abandoned (due to segment updated) diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 8398fdcee40..1536508ef68 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -865,6 +866,8 @@ std::optional Segment::prepareSplit(DMContext & dm_context, const SegmentSnapshotPtr & segment_snap, WriteBatches & wbs) const { + SYNC_FOR("before_Segment::prepareSplit"); + if (!dm_context.enable_logical_split // || segment_snap->stable->getPacks() <= 3 // || segment_snap->delta->getRows() > segment_snap->stable->getRows()) diff --git a/dbms/src/Storages/DeltaMerge/tests/MultiSegmentTestUtil.h b/dbms/src/Storages/DeltaMerge/tests/MultiSegmentTestUtil.h index bc6b7d5c3e6..9343e78def3 100644 --- a/dbms/src/Storages/DeltaMerge/tests/MultiSegmentTestUtil.h +++ b/dbms/src/Storages/DeltaMerge/tests/MultiSegmentTestUtil.h @@ -71,13 +71,33 @@ class MultiSegmentTestUtil : private boost::noncopyable FailPointHelper::disableFailPoint(FailPoints::skip_check_segment_update); } + void resetExpectedRows() + { + auto * log = &Poco::Logger::get(tracing_id); + + rows_by_segments.clear(); + expected_stable_rows.clear(); + expected_delta_rows.clear(); + + std::shared_lock lock(store->read_write_mutex); + auto segment_idx = 0; + for (auto & [_key, seg] : store->segments) + { + UNUSED(_key); + LOG_FMT_INFO(log, "Segment #{}: Range = {}", segment_idx, 
seg->getRowKeyRange().toDebugString()); + rows_by_segments[segment_idx] = seg->getStable()->getRows(); + expected_stable_rows[segment_idx] = seg->getStable()->getRows(); + expected_delta_rows[segment_idx] = seg->getDelta()->getRows(); + segment_idx++; + } + } + /// Prepare segments * 4. The rows of each segment will be roughly close to n_avg_rows_per_segment. /// The exact rows will be recorded in rows_by_segments. void prepareSegments(DeltaMergeStorePtr store_, size_t n_avg_rows_per_segment, DMTestEnv::PkType pk_type) { store = store_; - auto * log = &Poco::Logger::get(tracing_id); auto dm_context = store->newDMContext(db_context, db_context.getSettingsRef(), /*tracing_id*/ tracing_id); { // Write [0, 4*N) data with tso=2. @@ -105,18 +125,16 @@ class MultiSegmentTestUtil : private boost::noncopyable { std::shared_lock lock(store->read_write_mutex); // Now we have 4 segments. + resetExpectedRows(); + ASSERT_EQ(rows_by_segments.size(), 4); + + // Verify our expectations. auto total_stable_rows = 0; - auto segment_idx = 0; - for (auto & [_key, seg] : store->segments) + for (size_t i = 0; i < rows_by_segments.size(); i++) { - LOG_FMT_INFO(log, "Segment #{}: Range = {}", segment_idx, seg->getRowKeyRange().toDebugString()); - ASSERT_EQ(seg->getDelta()->getRows(), 0); - ASSERT_GT(seg->getStable()->getRows(), 0); // We don't check the exact rows of each segment. - total_stable_rows += seg->getStable()->getRows(); - rows_by_segments[segment_idx] = seg->getStable()->getRows(); - expected_stable_rows[segment_idx] = seg->getStable()->getRows(); - expected_delta_rows[segment_idx] = seg->getDelta()->getRows(); // = 0 - segment_idx++; + ASSERT_EQ(expected_delta_rows[i], 0); + ASSERT_GT(expected_stable_rows[i], 0); // We don't check the exact rows of each segment. 
+ total_stable_rows += expected_stable_rows[i]; } ASSERT_EQ(total_stable_rows, 4 * n_avg_rows_per_segment); } @@ -144,7 +162,7 @@ class MultiSegmentTestUtil : private boost::noncopyable void verifyExpectedRowsForAllSegments() { std::shared_lock lock(store->read_write_mutex); - ASSERT_EQ(store->segments.size(), 4); + ASSERT_EQ(store->segments.size(), expected_delta_rows.size()); auto segment_idx = 0; for (auto & [_key, seg] : store->segments) { @@ -157,4 +175,4 @@ class MultiSegmentTestUtil : private boost::noncopyable } // namespace tests } // namespace DM -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index caa18661a37..d3e2b2352b3 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -33,6 +34,7 @@ #include #include +#include #include #include @@ -3622,6 +3624,123 @@ try CATCH +// There is another flush cache executing for the same segment. +TEST_P(DeltaMergeStoreMergeDeltaBySegmentTest, RetryByFlushCache) +try +{ + { + // Write new data to segment[1] without flush. 
+ auto newly_written_rows = helper->rows_by_segments[1]; + Block block = DMTestEnv::prepareSimpleWriteBlock(helper->rows_by_segments[0], helper->rows_by_segments[0] + newly_written_rows, false, pk_type, 10 /* new tso */); + store->write(*db_context, db_context->getSettingsRef(), block); + helper->expected_delta_rows[1] += helper->rows_by_segments[1]; + helper->verifyExpectedRowsForAllSegments(); + } + + auto sp_flush_commit = SyncPointCtl::enableInScope("before_ColumnFileFlushTask::commit"); + auto sp_merge_delta_retry = SyncPointCtl::enableInScope("before_DeltaMergeStore::mergeDeltaBySegment|retry_segment"); + + // Start a flush and suspend it before flushCommit. + auto th_flush = std::async([&]() { + auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef(), "test"); + auto segment1 = std::next(store->segments.begin())->second; + auto result = segment1->flushCache(*dm_context); + ASSERT_TRUE(result); + ASSERT_EQ(segment1->getDelta()->getUnsavedRows(), 0); + // There should be still rows in the delta layer. + ASSERT_GT(segment1->getDelta()->getRows(), 0); + helper->verifyExpectedRowsForAllSegments(); + }); + sp_flush_commit.waitAndPause(); + + // Start a mergeDelta. It should hit retry immediately due to a flush is in progress. + auto th_merge_delta = std::async([&]() { + auto segment1 = std::next(store->segments.begin())->second; + auto result = store->mergeDeltaBySegment(*db_context, segment1->getRowKeyRange().start, DeltaMergeStore::TaskRunThread::Foreground); + ASSERT_NE(result, std::nullopt); + // All rows in the delta layer should be merged into the stable layer. + helper->expected_stable_rows[1] += helper->expected_delta_rows[1]; + helper->expected_delta_rows[1] = 0; + helper->verifyExpectedRowsForAllSegments(); + }); + sp_merge_delta_retry.waitAndPause(); + + // Let's finish the flush. + sp_flush_commit.next(); + th_flush.wait(); + + // Proceed the mergeDelta retry. Retry should succeed without triggering any new flush. 
+ sp_merge_delta_retry.next(); + th_merge_delta.wait(); +} +CATCH + + +// The segment is splitted during the execution. +TEST_P(DeltaMergeStoreMergeDeltaBySegmentTest, RetryBySplit) +try +{ + auto sp_split_prepare = SyncPointCtl::enableInScope("before_Segment::prepareSplit"); + auto sp_merge_delta_retry = SyncPointCtl::enableInScope("before_DeltaMergeStore::mergeDeltaBySegment|retry_segment"); + + // Start a split and suspend it during prepareSplit to simulate a long-running split. + auto th_split = std::async([&] { + auto old_rows_by_segments = helper->rows_by_segments; + ASSERT_EQ(4, old_rows_by_segments.size()); + + // Split segment1 into 2. + auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef(), "test"); + auto segment1 = std::next(store->segments.begin())->second; + auto result = store->segmentSplit(*dm_context, segment1, /*is_foreground*/ true); + ASSERT_NE(result.second, nullptr); + + helper->resetExpectedRows(); + ASSERT_EQ(5, helper->rows_by_segments.size()); + ASSERT_EQ(old_rows_by_segments[0], helper->rows_by_segments[0]); + ASSERT_EQ(old_rows_by_segments[1], helper->rows_by_segments[1] + helper->rows_by_segments[2]); + ASSERT_EQ(old_rows_by_segments[2], helper->rows_by_segments[3]); + ASSERT_EQ(old_rows_by_segments[3], helper->rows_by_segments[4]); + }); + sp_split_prepare.waitAndPause(); + + // Start a mergeDelta. As there is a split in progress, we would expect several retries. + auto th_merge_delta = std::async([&] { + // mergeDeltaBySegment for segment1 + auto segment1 = std::next(store->segments.begin())->second; + auto result = store->mergeDeltaBySegment(*db_context, segment1->getRowKeyRange().start, DeltaMergeStore::TaskRunThread::Foreground); + ASSERT_NE(result, std::nullopt); + + // Although original segment1 has been split into 2, we still expect only segment1's delta + // was merged. 
/// Closes a `try { ... }` test body and turns any escaping exception into a test failure.
///  - TiFlashTestException / DB::Exception: fail with the message, appending the stack trace
///    of the exception when the message does not embed one already.
///  - testing::AssertionException: rethrown untouched. It is thrown for a fatal assertion
///    that googletest has already recorded (e.g. by a listener that throws on fatal
///    failures); logging it and calling FAIL() again would report the same failure twice.
///  - anything else: logged via tryLogCurrentException, then the test fails.
#define CATCH                                                                          \
    catch (const ::DB::tests::TiFlashTestException & e)                                \
    {                                                                                  \
        std::string text = e.displayText();                                            \
        text += "\n\n";                                                                \
        if (text.find("Stack trace") == std::string::npos)                             \
            text += fmt::format("Stack trace:\n{}\n", e.getStackTrace().toString());   \
        FAIL() << text;                                                                \
    }                                                                                  \
    catch (const ::DB::Exception & e)                                                  \
    {                                                                                  \
        std::string text = fmt::format("Code: {}. {}\n\n", e.code(), e.displayText()); \
        if (text.find("Stack trace") == std::string::npos)                             \
            text += fmt::format("Stack trace:\n{}\n", e.getStackTrace().toString());   \
        FAIL() << text;                                                                \
    }                                                                                  \
    catch (const ::testing::AssertionException &)                                      \
    {                                                                                  \
        throw;                                                                         \
    }                                                                                  \
    catch (...)                                                                        \
    {                                                                                  \
        ::DB::tryLogCurrentException(__PRETTY_FUNCTION__);                             \
        FAIL();                                                                        \
    }
{}\n\n", e.code(), text); \ - auto embedded_stack_trace_pos = text.find("Stack trace"); \ - if (std::string::npos == embedded_stack_trace_pos) \ - fmt::print(stderr, "Stack trace:\n{}\n", e.getStackTrace().toString()); \ - throw; \ +#define CATCH \ + catch (const ::DB::tests::TiFlashTestException & e) \ + { \ + std::string text = e.displayText(); \ + text += "\n\n"; \ + if (text.find("Stack trace") == std::string::npos) \ + text += fmt::format("Stack trace:\n{}\n", e.getStackTrace().toString()); \ + FAIL() << text; \ + } \ + catch (const ::DB::Exception & e) \ + { \ + std::string text = fmt::format("Code: {}. {}\n\n", e.code(), e.displayText()); \ + if (text.find("Stack trace") == std::string::npos) \ + text += fmt::format("Stack trace:\n{}\n", e.getStackTrace().toString()); \ + FAIL() << text; \ + } \ + catch (...) \ + { \ + ::DB::tryLogCurrentException(__PRETTY_FUNCTION__); \ + FAIL(); \ } /// helper functions for comparing DataType diff --git a/dbms/src/TestUtils/gtests_dbms_main.cpp b/dbms/src/TestUtils/gtests_dbms_main.cpp index 26c456b5b31..8f9a618de7d 100644 --- a/dbms/src/TestUtils/gtests_dbms_main.cpp +++ b/dbms/src/TestUtils/gtests_dbms_main.cpp @@ -14,15 +14,47 @@ #include #include +#include +#include namespace DB::FailPoints { extern const char force_set_dtfile_exist_when_acquire_id[]; } // namespace DB::FailPoints +void fault_signal_handler(int signum) +{ + ::signal(signum, SIG_DFL); + std::cerr << "Received signal " << strsignal(signum) << std::endl; + std::cerr << StackTrace().toString() << std::endl; + ::raise(signum); +} + +void install_fault_signal_handlers(std::initializer_list signums) +{ + for (auto signum : signums) + { + ::signal(signum, fault_signal_handler); + } +} + +class ThrowListener : public testing::EmptyTestEventListener +{ + void OnTestPartResult(const testing::TestPartResult & result) override + { + if (result.type() == testing::TestPartResult::kFatalFailure) + { + throw ::testing::AssertionException(result); + } + } +}; + + // 
TODO: Optmize set-up & tear-down process which may cost more than 2s. It's NOT friendly for gtest_parallel. int main(int argc, char ** argv) { + install_fault_signal_handlers({SIGSEGV, SIGILL, SIGFPE, SIGABRT, SIGTERM}); + DB::tests::TiFlashTestEnv::setupLogger(); DB::tests::TiFlashTestEnv::initializeGlobalContext(); @@ -33,6 +65,8 @@ int main(int argc, char ** argv) #endif ::testing::InitGoogleTest(&argc, argv); + ::testing::UnitTest::GetInstance()->listeners().Append(new ThrowListener); + auto ret = RUN_ALL_TESTS(); DB::tests::TiFlashTestEnv::shutdown();