diff --git a/.clang-tidy b/.clang-tidy old mode 100755 new mode 100644 index 99a845d8633..cde27d68c8a --- a/.clang-tidy +++ b/.clang-tidy @@ -18,9 +18,11 @@ Checks: '-*, modernize-redundant-void-arg, modernize-replace-auto-ptr, modernize-replace-random-shuffle, + modernize-use-auto, modernize-use-bool-literals, modernize-use-nullptr, modernize-use-using, + modernize-use-override, modernize-use-equals-default, modernize-use-equals-delete, @@ -160,6 +162,11 @@ Checks: '-*, clang-analyzer-unix.cstring.NullArg, boost-use-to-string, + + cppcoreguidelines-pro-type-cstyle-cast, + cppcoreguidelines-pro-type-member-init, + cppcoreguidelines-no-malloc, + cppcoreguidelines-virtual-class-destructor, ' WarningsAsErrors: '*' diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 602a86642fb..ae6e6308055 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -68,6 +68,7 @@ std::unordered_map> FailPointHelper::f M(exception_after_drop_segment) #define APPLY_FOR_FAILPOINTS(M) \ + M(skip_check_segment_update) \ M(force_set_page_file_write_errno) \ M(force_split_io_size_4k) \ M(minimum_block_size_for_cross_join) \ @@ -99,7 +100,8 @@ std::unordered_map> FailPointHelper::f M(pause_when_writing_to_dt_store) \ M(pause_when_ingesting_to_dt_store) \ M(pause_when_altering_dt_store) \ - M(pause_after_copr_streams_acquired) + M(pause_after_copr_streams_acquired) \ + M(pause_before_server_merge_one_delta) namespace FailPoints { diff --git a/dbms/src/Flash/Management/ManualCompact.cpp b/dbms/src/Flash/Management/ManualCompact.cpp index 64e9080d66b..d6871266e8f 100644 --- a/dbms/src/Flash/Management/ManualCompact.cpp +++ b/dbms/src/Flash/Management/ManualCompact.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -24,6 +25,10 @@ namespace DB { +namespace FailPoints +{ +extern const char pause_before_server_merge_one_delta[]; +} // namespace FailPoints namespace Management { @@ -153,7 +158,9 @@ grpc::Status ManualCompactManager::doWork(const ::kvrpcpb::CompactRequest * requ // Repeatedly merge multiple segments as much as possible. while (true) { + FAIL_POINT_PAUSE(FailPoints::pause_before_server_merge_one_delta); auto compacted_range = dm_storage->mergeDeltaBySegment(db_context, start_key); + if (compacted_range == std::nullopt) { // Segment not found according to current start key diff --git a/dbms/src/Flash/Management/ManualCompact.h b/dbms/src/Flash/Management/ManualCompact.h index 72b6bd02c8d..aae904c4044 100644 --- a/dbms/src/Flash/Management/ManualCompact.h +++ b/dbms/src/Flash/Management/ManualCompact.h @@ -37,7 +37,10 @@ namespace DB namespace Management { -/// Serves manual compact requests. +/// Serves manual compact requests. Notice that the "compact" term here has different meanings compared to +/// the word "compact" in the DeltaMerge Store. The "compact request" here refer to the "delta merge" process +/// (major compact), while the "compact" in delta merge store refers to the "compact delta layer" process +/// (minor compact). /// This class is thread safe. Every public member function can be called without synchronization. class ManualCompactManager : private boost::noncopyable { @@ -74,9 +77,9 @@ class ManualCompactManager : private boost::noncopyable /// When there is a task containing the same logical_table running, /// the task will be rejected. - std::set unsync_active_logical_table_ids; + std::set unsync_active_logical_table_ids = {}; - size_t unsync_running_or_pending_tasks; + size_t unsync_running_or_pending_tasks = 0; // == Accessing members above must be synchronized == diff --git a/dbms/src/Flash/Management/tests/gtest_manual_compact.cpp b/dbms/src/Flash/Management/tests/gtest_manual_compact.cpp index 76f7a894ef9..8c07a849b99 100644 --- a/dbms/src/Flash/Management/tests/gtest_manual_compact.cpp +++ b/dbms/src/Flash/Management/tests/gtest_manual_compact.cpp @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include -#include #include #include #include @@ -26,9 +26,16 @@ #include #include +#include +#include namespace DB { +namespace FailPoints +{ +extern const char pause_before_server_merge_one_delta[]; +} // namespace FailPoints + namespace tests { @@ -50,17 +57,14 @@ class BasicManualCompactTest manager = std::make_unique(*db_context); + setupStorage(); + // In tests let's only compact one segment. db_context->setSetting("manual_compact_more_until_ms", UInt64(0)); - helper = std::make_unique(*db_context); - helper->setSettings(50); - - setupStorage(); - // Split into 4 segments, and prepare some delta data for first 3 segments. + helper = std::make_unique(*db_context); helper->prepareSegments(storage->getAndMaybeInitStore(), 50, pk_type); - prepareDataForFirstThreeSegments(); } CATCH @@ -131,7 +135,7 @@ try { auto request = ::kvrpcpb::CompactRequest(); auto response = ::kvrpcpb::CompactResponse(); - auto status_code = manager->doWork(&request, &response); + auto status_code = manager->handleRequest(&request, &response); ASSERT_EQ(status_code.error_code(), grpc::StatusCode::INVALID_ARGUMENT); } CATCH @@ -143,7 +147,7 @@ try auto request = ::kvrpcpb::CompactRequest(); request.set_physical_table_id(9999); auto response = ::kvrpcpb::CompactResponse(); - auto status_code = manager->doWork(&request, &response); + auto status_code = manager->handleRequest(&request, &response); ASSERT_EQ(status_code.error_code(), grpc::StatusCode::INVALID_ARGUMENT); } CATCH @@ -156,7 +160,7 @@ try request.set_physical_table_id(TABLE_ID); request.set_start_key("abcd"); auto response = ::kvrpcpb::CompactResponse(); - auto status_code = manager->doWork(&request, &response); + auto status_code = manager->handleRequest(&request, &response); ASSERT_EQ(status_code.error_code(), grpc::StatusCode::INVALID_ARGUMENT); } CATCH @@ -169,7 +173,7 @@ try auto request = ::kvrpcpb::CompactRequest(); request.set_physical_table_id(TABLE_ID); auto response = ::kvrpcpb::CompactResponse(); - auto status_code = manager->doWork(&request, &response); + auto status_code = manager->handleRequest(&request, &response); ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK); ASSERT_FALSE(response.has_error()); @@ -188,7 +192,7 @@ try request.set_physical_table_id(TABLE_ID); request.set_start_key(""); auto response = ::kvrpcpb::CompactResponse(); - auto status_code = manager->doWork(&request, &response); + auto status_code = manager->handleRequest(&request, &response); ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK); ASSERT_FALSE(response.has_error()); @@ -216,7 +220,7 @@ try request.set_start_key(wb.releaseStr()); } auto response = ::kvrpcpb::CompactResponse(); - auto status_code = manager->doWork(&request, &response); + auto status_code = manager->handleRequest(&request, &response); ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK); ASSERT_FALSE(response.has_error()); @@ -236,7 +240,7 @@ try auto request = ::kvrpcpb::CompactRequest(); request.set_physical_table_id(TABLE_ID); response = ::kvrpcpb::CompactResponse(); - auto status_code = manager->doWork(&request, &response); + auto status_code = manager->handleRequest(&request, &response); ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK); ASSERT_FALSE(response.has_error()); @@ -250,7 +254,7 @@ try request.set_physical_table_id(TABLE_ID); request.set_start_key(response.compacted_end_key()); response = ::kvrpcpb::CompactResponse(); - auto status_code = manager->doWork(&request, &response); + auto status_code = manager->handleRequest(&request, &response); ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK); ASSERT_FALSE(response.has_error()); @@ -270,7 +274,7 @@ try auto request = ::kvrpcpb::CompactRequest(); request.set_physical_table_id(TABLE_ID); auto response = ::kvrpcpb::CompactResponse(); - auto status_code = manager->doWork(&request, &response); + auto status_code = manager->handleRequest(&request, &response); ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK); ASSERT_FALSE(response.has_error()); @@ -289,6 +293,47 @@ CATCH TEST_P(BasicManualCompactTest, DuplicatedLogicalId) try { + using namespace std::chrono_literals; + + FailPointHelper::enableFailPoint(FailPoints::pause_before_server_merge_one_delta); + + auto thread_1_is_ready = std::promise(); + std::thread t_req1([&]() { + // req1 + auto request = ::kvrpcpb::CompactRequest(); + request.set_physical_table_id(TABLE_ID); + request.set_logical_table_id(2); + auto response = ::kvrpcpb::CompactResponse(); + thread_1_is_ready.set_value(); + auto status_code = manager->handleRequest(&request, &response); + ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK); + ASSERT_FALSE(response.has_error()); + helper->expected_stable_rows[0] += helper->expected_delta_rows[0]; + helper->expected_delta_rows[0] = 0; + helper->verifyExpectedRowsForAllSegments(); + }); + + { + // send req1, wait request being processed. + thread_1_is_ready.get_future().wait(); + std::this_thread::sleep_for(500ms); // TODO: Maybe better to use sync_channel to avoid hardcoded wait duration. + + // req2: Now let's send another request with the same logical id. + // Although worker pool size is 1, this request will be returned immediately, but with an error. + auto request = ::kvrpcpb::CompactRequest(); + request.set_physical_table_id(TABLE_ID); + request.set_logical_table_id(2); + auto response = ::kvrpcpb::CompactResponse(); + auto status_code = manager->handleRequest(&request, &response); + ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK); + ASSERT_TRUE(response.has_error()); + ASSERT_TRUE(response.error().has_err_compact_in_progress()); + helper->verifyExpectedRowsForAllSegments(); + } + + // Now let's continue req1's work + FailPointHelper::disableFailPoint(FailPoints::pause_before_server_merge_one_delta); + t_req1.join(); } CATCH diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 5aaf7b9ee8e..b722d73feae 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -12,13 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include #include #include #include #include #include -#include #include #include #include @@ -31,7 +29,6 @@ #include #include #include -#include #include #include #include @@ -84,6 +81,7 @@ extern const int LOGICAL_ERROR; namespace FailPoints { +extern const char skip_check_segment_update[]; extern const char pause_before_dt_background_delta_merge[]; extern const char pause_until_dt_background_delta_merge[]; extern const char pause_when_writing_to_dt_store[]; @@ -1295,6 +1293,8 @@ void DeltaMergeStore::waitForDeleteRange(const DB::DM::DMContextPtr &, const DB: void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const SegmentPtr & segment, ThreadType thread_type) { + fiu_do_on(FailPoints::skip_check_segment_update, { return; }); + if (segment->hasAbandoned()) return; const auto & delta = segment->getDelta(); diff --git a/dbms/src/Storages/DeltaMerge/tests/MultiSegmentTestUtil.h b/dbms/src/Storages/DeltaMerge/tests/MultiSegmentTestUtil.h index f1f24705d79..bc6b7d5c3e6 100644 --- a/dbms/src/Storages/DeltaMerge/tests/MultiSegmentTestUtil.h +++ b/dbms/src/Storages/DeltaMerge/tests/MultiSegmentTestUtil.h @@ -34,6 +34,11 @@ namespace DB { +namespace FailPoints +{ +extern const char skip_check_segment_update[]; +} // namespace FailPoints + namespace DM { namespace tests @@ -54,18 +59,16 @@ class MultiSegmentTestUtil : private boost::noncopyable std::map expected_stable_rows; std::map expected_delta_rows; - MultiSegmentTestUtil(Context & db_context_) + explicit MultiSegmentTestUtil(Context & db_context_) : tracing_id(DB::base::TiFlashStorageTestBasic::getCurrentFullTestName()) , db_context(db_context_) - {} + { + FailPointHelper::enableFailPoint(FailPoints::skip_check_segment_update); + } - /// Update context settings to keep multiple segments stable. - void setSettings(size_t rows_per_segment) + ~MultiSegmentTestUtil() { - // Avoid bg merge. - // TODO (wenxuan): Seems to be not very stable. - db_context.setSetting("dt_bg_gc_max_segments_to_check_every_round", UInt64(0)); - db_context.setSetting("dt_segment_limit_rows", UInt64(rows_per_segment)); + FailPointHelper::disableFailPoint(FailPoints::skip_check_segment_update); } /// Prepare segments * 4. The rows of each segment will be roughly close to n_avg_rows_per_segment. diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 5b45117ae0f..a6b94c9caa5 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -3376,12 +3376,10 @@ class DeltaMergeStoreMergeDeltaBySegmentTest setStorageFormat(ps_ver); TiFlashStorageTestBasic::SetUp(); - helper = std::make_unique(*db_context); - helper->setSettings(50); - setupDMStore(); // Split into 4 segments. + helper = std::make_unique(*db_context); helper->prepareSegments(store, 50, pk_type); } CATCH diff --git a/dbms/src/Storages/Page/PageUtil.h b/dbms/src/Storages/Page/PageUtil.h index 7edae303be7..9dc2936e22e 100644 --- a/dbms/src/Storages/Page/PageUtil.h +++ b/dbms/src/Storages/Page/PageUtil.h @@ -77,6 +77,7 @@ extern const int FILE_SIZE_NOT_MATCH; namespace FailPoints { +extern const char skip_check_segment_update[]; extern const char force_set_page_file_write_errno[]; extern const char force_split_io_size_4k[]; } // namespace FailPoints