From b503ed00103e3ad15f158cbadde8ec62f50be622 Mon Sep 17 00:00:00 2001 From: Wish Date: Fri, 13 May 2022 21:33:17 +0800 Subject: [PATCH] refactor tests to remove the hack Signed-off-by: Wish --- .../Management/tests/gtest_manual_compact.cpp | 238 ++++++++++++------ ...mentTestUtils.h => MultiSegmentTestUtil.h} | 92 +++---- .../DeltaMerge/tests/dm_basic_include.h | 144 +++++++++-- .../tests/gtest_dm_delta_merge_store.cpp | 127 ++++++---- 4 files changed, 412 insertions(+), 189 deletions(-) rename dbms/src/Storages/DeltaMerge/tests/{MultiSegmentTestUtils.h => MultiSegmentTestUtil.h} (68%) diff --git a/dbms/src/Flash/Management/tests/gtest_manual_compact.cpp b/dbms/src/Flash/Management/tests/gtest_manual_compact.cpp index bcfef2d5720..76f7a894ef9 100644 --- a/dbms/src/Flash/Management/tests/gtest_manual_compact.cpp +++ b/dbms/src/Flash/Management/tests/gtest_manual_compact.cpp @@ -17,10 +17,12 @@ #include #include #include -#include +#include +#include #include #include #include +#include #include #include @@ -31,69 +33,82 @@ namespace tests { -// Test for different kind of handles (Int / Common). class BasicManualCompactTest - : public DB::DM::tests::MultiSegmentTest - , public testing::WithParamInterface + : public DB::base::TiFlashStorageTestBasic + , public testing::WithParamInterface { public: + static constexpr TableID TABLE_ID = 5; + void SetUp() override { try { log = &Poco::Logger::get(DB::base::TiFlashStorageTestBasic::getCurrentFullTestName()); - is_common_handle = GetParam(); + pk_type = GetParam(); TiFlashStorageTestBasic::SetUp(); + manager = std::make_unique(*db_context); + // In tests let's only compact one segment. db_context->setSetting("manual_compact_more_until_ms", UInt64(0)); - prepareSegments(50, is_common_handle); - prepareManagedStorage(); - manager = std::make_unique(*db_context); + helper = std::make_unique(*db_context); + helper->setSettings(50); + + setupStorage(); + + // Split into 4 segments, and prepare some delta data for first 3 segments. + helper->prepareSegments(storage->getAndMaybeInitStore(), 50, pk_type); + + prepareDataForFirstThreeSegments(); } CATCH } - void TearDown() override - { - // TODO: This is more like a hack. Should use storage->drop(); - db_context->getGlobalContext().getTMTContext().getStorages().remove(store->physical_table_id); - } - - void prepareManagedStorage() + void setupStorage() { - // TODO: This is more like a hack. Should construct a Storage using column definitions. - const String table_name = "mytable"; - ASTPtr astptr(new ASTIdentifier(table_name, ASTIdentifier::Kind::Table)); - astptr->children.emplace_back(new ASTIdentifier("col1")); - - NamesAndTypesList columns = {{"col1", std::make_shared()}}; - - TiDB::TableInfo ti; - ti.id = store->physical_table_id; + auto columns = DM::tests::DMTestEnv::getDefaultTableColumns(pk_type); + auto table_info = DM::tests::DMTestEnv::getMinimalTableInfo(TABLE_ID, pk_type); + auto astptr = DM::tests::DMTestEnv::getPrimaryKeyExpr("test_table", pk_type); storage = StorageDeltaMerge::create("TiFlash", - /* db_name= */ "default", - table_name, - std::ref(ti), + "default" /* db_name */, + "test_table" /* table_name */, + std::ref(table_info), ColumnsDescription{columns}, astptr, 0, db_context->getGlobalContext()); - storage->is_common_handle = store->is_common_handle; - storage->rowkey_column_size = store->rowkey_column_size; - storage->_store = store; - storage->store_inited.store(true, std::memory_order_seq_cst); storage->startup(); } -protected: - bool is_common_handle; + void prepareDataForFirstThreeSegments() + { + // Write data to first 3 segments. + auto newly_written_rows = helper->rows_by_segments[0] + helper->rows_by_segments[1] + helper->rows_by_segments[2]; + Block block = DM::tests::DMTestEnv::prepareSimpleWriteBlock(0, newly_written_rows, false, pk_type, 5 /* new tso */); + storage->write(block, db_context->getSettingsRef()); + storage->flushCache(*db_context); + + helper->expected_delta_rows[0] += helper->rows_by_segments[0]; + helper->expected_delta_rows[1] += helper->rows_by_segments[1]; + helper->expected_delta_rows[2] += helper->rows_by_segments[2]; + helper->verifyExpectedRowsForAllSegments(); + } - std::unique_ptr manager; + void TearDown() override + { + storage->drop(); + db_context->getTMTContext().getStorages().remove(TABLE_ID); + } +protected: + std::unique_ptr helper; StorageDeltaMergePtr storage; + std::unique_ptr manager; + + DM::tests::DMTestEnv::PkType pk_type; [[maybe_unused]] Poco::Logger * log; }; @@ -102,7 +117,13 @@ class BasicManualCompactTest INSTANTIATE_TEST_CASE_P( ByCommonHandle, BasicManualCompactTest, - testing::Values(/* is_common_handle */ true, false)); + testing::Values( + DM::tests::DMTestEnv::PkType::HiddenTiDBRowID, + DM::tests::DMTestEnv::PkType::CommonHandle, + DM::tests::DMTestEnv::PkType::PkIsHandleInt64), + [](const testing::TestParamInfo & info) { + return DM::tests::DMTestEnv::PkTypeToString(info.param); + }); TEST_P(BasicManualCompactTest, EmptyRequest) @@ -132,7 +153,7 @@ TEST_P(BasicManualCompactTest, InvalidStartKey) try { auto request = ::kvrpcpb::CompactRequest(); - request.set_physical_table_id(store->physical_table_id); + request.set_physical_table_id(TABLE_ID); request.set_start_key("abcd"); auto response = ::kvrpcpb::CompactResponse(); auto status_code = manager->doWork(&request, &response); @@ -145,64 +166,129 @@ CATCH TEST_P(BasicManualCompactTest, NoStartKey) try { + auto request = ::kvrpcpb::CompactRequest(); + request.set_physical_table_id(TABLE_ID); + auto response = ::kvrpcpb::CompactResponse(); + auto status_code = manager->doWork(&request, &response); + ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK); + ASSERT_FALSE(response.has_error()); + + helper->expected_stable_rows[0] += helper->expected_delta_rows[0]; + helper->expected_delta_rows[0] = 0; + helper->verifyExpectedRowsForAllSegments(); +} +CATCH + + +// Start key is empty. Should compact the first segment. +TEST_P(BasicManualCompactTest, EmptyStartKey) +try +{ + auto request = ::kvrpcpb::CompactRequest(); + request.set_physical_table_id(TABLE_ID); + request.set_start_key(""); + auto response = ::kvrpcpb::CompactResponse(); + auto status_code = manager->doWork(&request, &response); + ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK); + ASSERT_FALSE(response.has_error()); + + helper->expected_stable_rows[0] += helper->expected_delta_rows[0]; + helper->expected_delta_rows[0] = 0; + helper->verifyExpectedRowsForAllSegments(); +} +CATCH + + +// Specify a key in segment[1]. Should compact this segment. +TEST_P(BasicManualCompactTest, SpecifiedStartKey) +try +{ + // TODO: This test may be not appropriate. It highly relies on internal implementation: + // The encoding of the start key should be hidden from the caller. + + auto request = ::kvrpcpb::CompactRequest(); + request.set_physical_table_id(TABLE_ID); { - // Write data to first 3 segments. - auto newly_written_rows = rows_by_segments[0] + rows_by_segments[1] + rows_by_segments[2]; - Block block = DM::tests::DMTestEnv::prepareSimpleWriteBlock(0, newly_written_rows, false, 5 /* new tso */, is_common_handle); - store->write(*db_context, db_context->getSettingsRef(), block); - store->flushCache(dm_context, DM::RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); - - expected_delta_rows[0] += rows_by_segments[0]; - expected_delta_rows[1] += rows_by_segments[1]; - expected_delta_rows[2] += rows_by_segments[2]; - verifyExpectedRowsForAllSegments(); - } - { - auto request = ::kvrpcpb::CompactRequest(); - request.set_physical_table_id(store->physical_table_id); - auto response = ::kvrpcpb::CompactResponse(); - auto status_code = manager->doWork(&request, &response); - ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK); - ASSERT_FALSE(response.has_error()); - } - { - expected_stable_rows[0] += expected_delta_rows[0]; - expected_delta_rows[0] = 0; - verifyExpectedRowsForAllSegments(); + WriteBufferFromOwnString wb; + auto seg0 = storage->getAndMaybeInitStore()->segments.begin()->second; + auto seg1_start_key = seg0->getRowKeyRange().end; + seg1_start_key.toPrefixNext().serialize(wb); + request.set_start_key(wb.releaseStr()); } + auto response = ::kvrpcpb::CompactResponse(); + auto status_code = manager->doWork(&request, &response); + ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK); + ASSERT_FALSE(response.has_error()); + + helper->expected_stable_rows[1] += helper->expected_delta_rows[1]; + helper->expected_delta_rows[1] = 0; + helper->verifyExpectedRowsForAllSegments(); } CATCH -TEST_P(BasicManualCompactTest, EmptyStartKey) +TEST_P(BasicManualCompactTest, StartKeyFromPreviousResponse) try { + ::kvrpcpb::CompactResponse response; { - // Write data to first 3 segments. - auto newly_written_rows = rows_by_segments[0] + rows_by_segments[1] + rows_by_segments[2]; - Block block = DM::tests::DMTestEnv::prepareSimpleWriteBlock(0, newly_written_rows, false, 5 /* new tso */, is_common_handle); - store->write(*db_context, db_context->getSettingsRef(), block); - store->flushCache(dm_context, DM::RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); - - expected_delta_rows[0] += rows_by_segments[0]; - expected_delta_rows[1] += rows_by_segments[1]; - expected_delta_rows[2] += rows_by_segments[2]; - verifyExpectedRowsForAllSegments(); + // Request 1 + auto request = ::kvrpcpb::CompactRequest(); + request.set_physical_table_id(TABLE_ID); + response = ::kvrpcpb::CompactResponse(); + auto status_code = manager->doWork(&request, &response); + ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK); + ASSERT_FALSE(response.has_error()); + + helper->expected_stable_rows[0] += helper->expected_delta_rows[0]; + helper->expected_delta_rows[0] = 0; + helper->verifyExpectedRowsForAllSegments(); } { + // Request 2, use the start key from previous response. We should compact both segment 1 and segment 2. auto request = ::kvrpcpb::CompactRequest(); - request.set_physical_table_id(store->physical_table_id); - request.set_start_key(""); - auto response = ::kvrpcpb::CompactResponse(); + request.set_physical_table_id(TABLE_ID); + request.set_start_key(response.compacted_end_key()); + response = ::kvrpcpb::CompactResponse(); auto status_code = manager->doWork(&request, &response); ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK); ASSERT_FALSE(response.has_error()); + + helper->expected_stable_rows[1] += helper->expected_delta_rows[1]; + helper->expected_delta_rows[1] = 0; + helper->verifyExpectedRowsForAllSegments(); } +} +CATCH + + +TEST_P(BasicManualCompactTest, CompactMultiple) +try +{ + db_context->setSetting("manual_compact_more_until_ms", UInt64(60 * 1000)); // Hope it's long enough! + + auto request = ::kvrpcpb::CompactRequest(); + request.set_physical_table_id(TABLE_ID); + auto response = ::kvrpcpb::CompactResponse(); + auto status_code = manager->doWork(&request, &response); + ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK); + ASSERT_FALSE(response.has_error()); + + // All segments should be compacted. + for (size_t i = 0; i < 4; ++i) { - expected_stable_rows[0] += expected_delta_rows[0]; - expected_delta_rows[0] = 0; - verifyExpectedRowsForAllSegments(); + helper->expected_stable_rows[i] += helper->expected_delta_rows[i]; + helper->expected_delta_rows[i] = 0; } + helper->verifyExpectedRowsForAllSegments(); +} +CATCH + + +// When there are duplicated logical id while processing, the later one should return error immediately. +TEST_P(BasicManualCompactTest, DuplicatedLogicalId) +try +{ } CATCH diff --git a/dbms/src/Storages/DeltaMerge/tests/MultiSegmentTestUtils.h b/dbms/src/Storages/DeltaMerge/tests/MultiSegmentTestUtil.h similarity index 68% rename from dbms/src/Storages/DeltaMerge/tests/MultiSegmentTestUtils.h rename to dbms/src/Storages/DeltaMerge/tests/MultiSegmentTestUtil.h index 6ec470d85d7..f1f24705d79 100644 --- a/dbms/src/Storages/DeltaMerge/tests/MultiSegmentTestUtils.h +++ b/dbms/src/Storages/DeltaMerge/tests/MultiSegmentTestUtil.h @@ -39,61 +39,47 @@ namespace DM namespace tests { -/// This utility helps you set up a DMStore with 4 segments. -class MultiSegmentTest : public DB::base::TiFlashStorageTestBasic +/// Helper class to test with multiple segments. +/// You can call `prepareSegments` to prepare multiple segments. After that, +/// you can use `verifyExpectedRowsForAllSegments` to verify the expectation for each segment. +class MultiSegmentTestUtil : private boost::noncopyable { protected: + String tracing_id; + Context & db_context; DeltaMergeStorePtr store; - DMContextPtr dm_context; // Will be set after calling prepareSegments. - std::map rows_by_segments; // Will be set after calling prepareSegments. - std::map expected_stable_rows; // Will be set after calling prepareSegments. - std::map expected_delta_rows; // Will be set after calling prepareSegments. - /// Retry until a segment at index is successfully split. - void forceForegroundSplit(size_t segment_idx) const - { - while (true) - { - store->read_write_mutex.lock(); - auto seg = std::next(store->segments.begin(), segment_idx)->second; - store->read_write_mutex.unlock(); - auto result = store->segmentSplit(*dm_context, seg, /*is_foreground*/ true); - if (result.first) - { - break; - } - } - } +public: + std::map rows_by_segments; + std::map expected_stable_rows; + std::map expected_delta_rows; - /// Prepare segments * 4. The rows of each segment will be roughly close to n_avg_rows_per_segment. The exact rows will be recorded in rows_by_segments. - void prepareSegments(size_t n_avg_rows_per_segment, bool is_common_handle) - { - auto * log = &Poco::Logger::get(GET_GTEST_FULL_NAME); + MultiSegmentTestUtil(Context & db_context_) + : tracing_id(DB::base::TiFlashStorageTestBasic::getCurrentFullTestName()) + , db_context(db_context_) + {} + /// Update context settings to keep multiple segments stable. + void setSettings(size_t rows_per_segment) + { // Avoid bg merge. // TODO (wenxuan): Seems to be not very stable. - db_context->setSetting("dt_bg_gc_max_segments_to_check_every_round", UInt64(0)); - db_context->setSetting("dt_segment_limit_rows", UInt64(n_avg_rows_per_segment)); + db_context.setSetting("dt_bg_gc_max_segments_to_check_every_round", UInt64(0)); + db_context.setSetting("dt_segment_limit_rows", UInt64(rows_per_segment)); + } - { - auto cols = DMTestEnv::getDefaultColumns(is_common_handle ? DMTestEnv::PkType::CommonHandle : DMTestEnv::PkType::HiddenTiDBRowID); - ColumnDefine handle_column_define = (*cols)[0]; - store = std::make_shared(*db_context, - false, - "test", - GET_GTEST_FULL_NAME, - 101, - *cols, - handle_column_define, - is_common_handle, - 1, - DeltaMergeStore::Settings()); - } - dm_context = store->newDMContext(*db_context, db_context->getSettingsRef(), /*tracing_id*/ GET_GTEST_FULL_NAME); + /// Prepare segments * 4. The rows of each segment will be roughly close to n_avg_rows_per_segment. + /// The exact rows will be recorded in rows_by_segments. + void prepareSegments(DeltaMergeStorePtr store_, size_t n_avg_rows_per_segment, DMTestEnv::PkType pk_type) + { + store = store_; + + auto * log = &Poco::Logger::get(tracing_id); + auto dm_context = store->newDMContext(db_context, db_context.getSettingsRef(), /*tracing_id*/ tracing_id); { // Write [0, 4*N) data with tso=2. - Block block = DMTestEnv::prepareSimpleWriteBlock(0, n_avg_rows_per_segment * 4, false, 2, is_common_handle); - store->write(*db_context, db_context->getSettingsRef(), block); + Block block = DMTestEnv::prepareSimpleWriteBlock(0, n_avg_rows_per_segment * 4, false, pk_type, 2); + store->write(db_context, db_context.getSettingsRef(), block); store->flushCache(dm_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); } { @@ -131,6 +117,24 @@ class MultiSegmentTest : public DB::base::TiFlashStorageTestBasic } ASSERT_EQ(total_stable_rows, 4 * n_avg_rows_per_segment); } + verifyExpectedRowsForAllSegments(); + } + + /// Retry until a segment at index is successfully split. + void forceForegroundSplit(size_t segment_idx) const + { + auto dm_context = store->newDMContext(db_context, db_context.getSettingsRef(), tracing_id); + while (true) + { + store->read_write_mutex.lock(); + auto seg = std::next(store->segments.begin(), segment_idx)->second; + store->read_write_mutex.unlock(); + auto result = store->segmentSplit(*dm_context, seg, /*is_foreground*/ true); + if (result.first) + { + break; + } + } } /// Checks whether current rows in segments meets our expectation. diff --git a/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h b/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h index 20fc53263b6..b35dae0cbe2 100644 --- a/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h +++ b/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -107,6 +108,8 @@ class DMTestEnv static constexpr const char * PK_NAME_PK_IS_HANDLE = "id"; + static constexpr ColId PK_ID_PK_IS_HANDLE = 2; + enum class PkType { // If the primary key is composed of multiple columns and non-clustered-index, @@ -150,10 +153,10 @@ class DMTestEnv columns->emplace_back(getExtraHandleColumnDefine(/*is_common_handle=*/true)); break; case PkType::PkIsHandleInt64: - columns->emplace_back(ColumnDefine{2, PK_NAME_PK_IS_HANDLE, EXTRA_HANDLE_COLUMN_INT_TYPE}); + columns->emplace_back(ColumnDefine{PK_ID_PK_IS_HANDLE, PK_NAME_PK_IS_HANDLE, EXTRA_HANDLE_COLUMN_INT_TYPE}); break; case PkType::PkIsHandleInt32: - columns->emplace_back(ColumnDefine{2, PK_NAME_PK_IS_HANDLE, DataTypeFactory::instance().get("Int32")}); + columns->emplace_back(ColumnDefine{PK_ID_PK_IS_HANDLE, PK_NAME_PK_IS_HANDLE, DataTypeFactory::instance().get("Int32")}); break; default: throw Exception("Unknown pk type for test"); @@ -163,6 +166,96 @@ class DMTestEnv return columns; } + /// Returns a NamesAndTypesList that can be used to construct StorageDeltaMerge. + static NamesAndTypesList getDefaultTableColumns(PkType pk_type = PkType::HiddenTiDBRowID) + { + NamesAndTypesList columns; + switch (pk_type) + { + case PkType::HiddenTiDBRowID: + columns.push_back({EXTRA_HANDLE_COLUMN_NAME, EXTRA_HANDLE_COLUMN_INT_TYPE}); + break; + case PkType::CommonHandle: + columns.push_back({PK_NAME_PK_IS_HANDLE, EXTRA_HANDLE_COLUMN_STRING_TYPE}); // For common handle, there must be a user-given primary key. + columns.push_back({EXTRA_HANDLE_COLUMN_NAME, EXTRA_HANDLE_COLUMN_STRING_TYPE}); // For common handle, a _tidb_rowid is also constructed. + break; + case PkType::PkIsHandleInt64: + columns.emplace_back(PK_NAME_PK_IS_HANDLE, EXTRA_HANDLE_COLUMN_INT_TYPE); + break; + case PkType::PkIsHandleInt32: + throw Exception("PkIsHandleInt32 is unsupported"); + default: + throw Exception("Unknown pk type for test"); + } + return columns; + } + + /// Returns a TableInfo that can be used to construct StorageDeltaMerge. + static TiDB::TableInfo getMinimalTableInfo(TableID table_id, PkType pk_type = PkType::HiddenTiDBRowID) + { + TiDB::TableInfo table_info; + table_info.id = table_id; + switch (pk_type) + { + case PkType::HiddenTiDBRowID: + table_info.is_common_handle = false; + table_info.pk_is_handle = false; + break; + case PkType::CommonHandle: + { + table_info.is_common_handle = true; + table_info.pk_is_handle = false; + ColumnInfo pk_column; // For common handle, there must be a user-given primary key. + pk_column.id = PK_ID_PK_IS_HANDLE; + pk_column.name = PK_NAME_PK_IS_HANDLE; + pk_column.setPriKeyFlag(); + table_info.columns.push_back(pk_column); + break; + } + case PkType::PkIsHandleInt64: + { + table_info.is_common_handle = false; + table_info.pk_is_handle = true; + ColumnInfo pk_column; + pk_column.id = PK_ID_PK_IS_HANDLE; + pk_column.name = PK_NAME_PK_IS_HANDLE; + pk_column.setPriKeyFlag(); + table_info.columns.push_back(pk_column); + break; + } + case PkType::PkIsHandleInt32: + throw Exception("PkIsHandleInt32 is unsupported"); + default: + throw Exception("Unknown pk type for test"); + } + return table_info; + } + + /// Return a ASTPtr that can be used to construct StorageDeltaMerge. + static ASTPtr getPrimaryKeyExpr(const String & table_name, PkType pk_type = PkType::HiddenTiDBRowID) + { + ASTPtr astptr(new ASTIdentifier(table_name, ASTIdentifier::Kind::Table)); + String name; + switch (pk_type) + { + case PkType::HiddenTiDBRowID: + name = EXTRA_HANDLE_COLUMN_NAME; + break; + case PkType::CommonHandle: + name = EXTRA_HANDLE_COLUMN_NAME; + break; + case PkType::PkIsHandleInt64: + name = PK_NAME_PK_IS_HANDLE; + break; + case PkType::PkIsHandleInt32: + throw Exception("PkIsHandleInt32 is unsupported"); + default: + throw Exception("Unknown pk type for test"); + } + astptr->children.emplace_back(new ASTIdentifier(name)); + return astptr; + } + /** * Create a simple block with 3 columns: * * `pk` - Int64 / `version` / `tag` @@ -179,7 +272,8 @@ class DMTestEnv ColumnID pk_col_id = EXTRA_HANDLE_COLUMN_ID, DataTypePtr pk_type = EXTRA_HANDLE_COLUMN_INT_TYPE, bool is_common_handle = false, - size_t rowkey_column_size = 1) + size_t rowkey_column_size = 1, + bool with_internal_columns = true) { Block block; const size_t num_rows = (end - beg); @@ -221,16 +315,19 @@ class DMTestEnv EXTRA_HANDLE_COLUMN_ID}); } } - // version_col - block.insert(DB::tests::createColumn( - std::vector(num_rows, tso), - VERSION_COLUMN_NAME, - VERSION_COLUMN_ID)); - // tag_col - block.insert(DB::tests::createColumn( - std::vector(num_rows, 0), - TAG_COLUMN_NAME, - TAG_COLUMN_ID)); + if (with_internal_columns) + { + // version_col + block.insert(DB::tests::createColumn( + std::vector(num_rows, tso), + VERSION_COLUMN_NAME, + VERSION_COLUMN_ID)); + // tag_col + block.insert(DB::tests::createColumn( + std::vector(num_rows, 0), + TAG_COLUMN_NAME, + TAG_COLUMN_ID)); + } return block; } @@ -245,11 +342,24 @@ class DMTestEnv static Block prepareSimpleWriteBlock(size_t beg, size_t end, bool reversed, - UInt64 tso, - bool is_common_handle, - size_t rowkey_column_size = 1) + PkType pk_type, + UInt64 tso = 2, + bool with_internal_columns = true) { - return prepareSimpleWriteBlock(beg, end, reversed, tso, pk_name, EXTRA_HANDLE_COLUMN_ID, EXTRA_HANDLE_COLUMN_INT_TYPE, is_common_handle, rowkey_column_size); + switch (pk_type) + { + case PkType::HiddenTiDBRowID: + return prepareSimpleWriteBlock(beg, end, reversed, tso, EXTRA_HANDLE_COLUMN_NAME, EXTRA_HANDLE_COLUMN_ID, EXTRA_HANDLE_COLUMN_INT_TYPE, false, 1, with_internal_columns); + case PkType::CommonHandle: + return prepareSimpleWriteBlock(beg, end, reversed, tso, EXTRA_HANDLE_COLUMN_NAME, EXTRA_HANDLE_COLUMN_ID, EXTRA_HANDLE_COLUMN_STRING_TYPE, true, 1, with_internal_columns); + case PkType::PkIsHandleInt64: + return prepareSimpleWriteBlock(beg, end, reversed, tso, PK_NAME_PK_IS_HANDLE, PK_ID_PK_IS_HANDLE, EXTRA_HANDLE_COLUMN_INT_TYPE, false, 1, with_internal_columns); + break; + case PkType::PkIsHandleInt32: + throw Exception("PkIsHandleInt32 is unsupported"); + default: + throw Exception("Unknown pk type for test"); + } } /** diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index 21740e8d948..5b45117ae0f 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -36,8 +36,8 @@ #include #include +#include "MultiSegmentTestUtil.h" #include "dm_basic_include.h" -#include "MultiSegmentTestUtils.h" namespace DB { @@ -3362,45 +3362,68 @@ INSTANTIATE_TEST_CASE_P( testModeToString); -using MergeDeltaBySegmentParam = std::tuple; - -String mergeDeltaBySegmentParamToString(const ::testing::TestParamInfo & info) -{ - const auto [ps_ver, is_common_handle] = info.param; - return fmt::format("PsV{}_{}", ps_ver, is_common_handle ? "CommonHandle" : "IntHandle"); -} - -// Test for different kind of handles (Int / Common). class DeltaMergeStoreMergeDeltaBySegmentTest - : public MultiSegmentTest - , public testing::WithParamInterface + : public DB::base::TiFlashStorageTestBasic + , public testing::WithParamInterface> { public: void SetUp() override { - log = &Poco::Logger::get(GET_GTEST_FULL_NAME); - std::tie(ps_ver, is_common_handle) = GetParam(); - setStorageFormat(ps_ver); - TiFlashStorageTestBasic::SetUp(); - prepareSegments(50, is_common_handle); + try + { + log = &Poco::Logger::get(DB::base::TiFlashStorageTestBasic::getCurrentFullTestName()); + std::tie(ps_ver, pk_type) = GetParam(); + setStorageFormat(ps_ver); + TiFlashStorageTestBasic::SetUp(); + + helper = std::make_unique(*db_context); + helper->setSettings(50); + + setupDMStore(); + + // Split into 4 segments. + helper->prepareSegments(store, 50, pk_type); + } + CATCH + } + + void setupDMStore() + { + auto cols = DMTestEnv::getDefaultColumns(pk_type); + store = std::make_shared(*db_context, + false, + "test", + DB::base::TiFlashStorageTestBasic::getCurrentFullTestName(), + 101, + *cols, + (*cols)[0], + pk_type == DMTestEnv::PkType::CommonHandle, + 1, + DeltaMergeStore::Settings()); + dm_context = store->newDMContext(*db_context, db_context->getSettingsRef(), DB::base::TiFlashStorageTestBasic::getCurrentFullTestName()); } protected: + std::unique_ptr helper; + DeltaMergeStorePtr store; + DMContextPtr dm_context; + UInt64 ps_ver; - bool is_common_handle; + DMTestEnv::PkType pk_type; [[maybe_unused]] Poco::Logger * log; }; INSTANTIATE_TEST_CASE_P( - ByPsVerAndCommonHandle, + ByPsVerAndPkType, DeltaMergeStoreMergeDeltaBySegmentTest, - testing::Values( - std::make_pair(2, true), - std::make_pair(2, false), - std::make_pair(3, true), - std::make_pair(3, false)), - mergeDeltaBySegmentParamToString); + ::testing::Combine( + ::testing::Values(2, 3), + ::testing::Values(DMTestEnv::PkType::HiddenTiDBRowID, DMTestEnv::PkType::CommonHandle, DMTestEnv::PkType::PkIsHandleInt64)), + [](const testing::TestParamInfo> & info) { + const auto [ps_ver, pk_type] = info.param; + return fmt::format("PsV{}_{}", ps_ver, DMTestEnv::PkTypeToString(pk_type)); + }); // The given key is the boundary of the segment. @@ -3409,17 +3432,17 @@ try { { // Write data to first 3 segments. - auto newly_written_rows = rows_by_segments[0] + rows_by_segments[1] + rows_by_segments[2]; - Block block = DMTestEnv::prepareSimpleWriteBlock(0, newly_written_rows, false, 5 /* new tso */, is_common_handle); + auto newly_written_rows = helper->rows_by_segments[0] + helper->rows_by_segments[1] + helper->rows_by_segments[2]; + Block block = DMTestEnv::prepareSimpleWriteBlock(0, newly_written_rows, false, pk_type, 5 /* new tso */); store->write(*db_context, db_context->getSettingsRef(), block); store->flushCache(dm_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); - expected_delta_rows[0] += rows_by_segments[0]; - expected_delta_rows[1] += rows_by_segments[1]; - expected_delta_rows[2] += rows_by_segments[2]; - verifyExpectedRowsForAllSegments(); + helper->expected_delta_rows[0] += helper->rows_by_segments[0]; + helper->expected_delta_rows[1] += helper->rows_by_segments[1]; + helper->expected_delta_rows[2] += helper->rows_by_segments[2]; + helper->verifyExpectedRowsForAllSegments(); } - if (is_common_handle) + if (store->isCommonHandle()) { // For common handle, give int handle key and have a try auto result = store->mergeDeltaBySegment(*db_context, RowKeyValue::INT_HANDLE_MIN_KEY); @@ -3442,7 +3465,7 @@ try std::optional result_1; { // Specifies MIN_KEY. In this case, the first segment should be processed. - if (is_common_handle) + if (store->isCommonHandle()) { result_1 = store->mergeDeltaBySegment(*db_context, RowKeyValue::COMMON_HANDLE_MIN_KEY); } @@ -3454,16 +3477,16 @@ try ASSERT_NE(result_1, std::nullopt); ASSERT_EQ(*result_1, store->segments.begin()->second->getRowKeyRange()); - expected_stable_rows[0] += expected_delta_rows[0]; - expected_delta_rows[0] = 0; - verifyExpectedRowsForAllSegments(); + helper->expected_stable_rows[0] += helper->expected_delta_rows[0]; + helper->expected_delta_rows[0] = 0; + helper->verifyExpectedRowsForAllSegments(); } { // Compact the first segment again, nothing should change. auto result = store->mergeDeltaBySegment(*db_context, result_1->start); ASSERT_EQ(*result, *result_1); - verifyExpectedRowsForAllSegments(); + helper->verifyExpectedRowsForAllSegments(); } std::optional result_2; { @@ -3472,9 +3495,9 @@ try ASSERT_NE(result_2, std::nullopt); ASSERT_EQ(*result_2, std::next(store->segments.begin())->second->getRowKeyRange()); - expected_stable_rows[1] += expected_delta_rows[1]; - expected_delta_rows[1] = 0; - verifyExpectedRowsForAllSegments(); + helper->expected_stable_rows[1] += helper->expected_delta_rows[1]; + helper->expected_delta_rows[1] = 0; + helper->verifyExpectedRowsForAllSegments(); } } CATCH @@ -3492,13 +3515,13 @@ try result = store->mergeDeltaBySegment(*db_context, seg->getRowKeyRange().start); ASSERT_NE(result, std::nullopt); - verifyExpectedRowsForAllSegments(); + helper->verifyExpectedRowsForAllSegments(); } { // As we are the last segment, compact "next segment" should result in failure. A nullopt is returned. auto result2 = store->mergeDeltaBySegment(*db_context, result->end); ASSERT_EQ(result2, std::nullopt); - verifyExpectedRowsForAllSegments(); + helper->verifyExpectedRowsForAllSegments(); } } CATCH @@ -3510,27 +3533,27 @@ try { { // Write data to first 3 segments. - auto newly_written_rows = rows_by_segments[0] + rows_by_segments[1] + rows_by_segments[2]; - Block block = DMTestEnv::prepareSimpleWriteBlock(0, newly_written_rows, false, 5 /* new tso */, is_common_handle); + auto newly_written_rows = helper->rows_by_segments[0] + helper->rows_by_segments[1] + helper->rows_by_segments[2]; + Block block = DMTestEnv::prepareSimpleWriteBlock(0, newly_written_rows, false, pk_type, 5 /* new tso */); store->write(*db_context, db_context->getSettingsRef(), block); store->flushCache(dm_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())); - expected_delta_rows[0] += rows_by_segments[0]; - expected_delta_rows[1] += rows_by_segments[1]; - expected_delta_rows[2] += rows_by_segments[2]; - verifyExpectedRowsForAllSegments(); + helper->expected_delta_rows[0] += helper->rows_by_segments[0]; + helper->expected_delta_rows[1] += helper->rows_by_segments[1]; + helper->expected_delta_rows[2] += helper->rows_by_segments[2]; + helper->verifyExpectedRowsForAllSegments(); } { - // Compact segment[1] + // Compact segment[1] by giving a prefix-next key. auto range = std::next(store->segments.begin())->second->getRowKeyRange(); auto compact_key = range.start.toPrefixNext(); auto result = store->mergeDeltaBySegment(*db_context, compact_key); ASSERT_NE(result, std::nullopt); - expected_stable_rows[1] += expected_delta_rows[1]; - expected_delta_rows[1] = 0; - verifyExpectedRowsForAllSegments(); + helper->expected_stable_rows[1] += helper->expected_delta_rows[1]; + helper->expected_delta_rows[1] = 0; + helper->verifyExpectedRowsForAllSegments(); } } CATCH