Skip to content

Commit

Permalink
refactor tests to remove the hack
Browse files Browse the repository at this point in the history
Signed-off-by: Wish <[email protected]>
  • Loading branch information
breezewish committed May 13, 2022
1 parent 93567f5 commit b503ed0
Show file tree
Hide file tree
Showing 4 changed files with 412 additions and 189 deletions.
238 changes: 162 additions & 76 deletions dbms/src/Flash/Management/tests/gtest_manual_compact.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
#include <Poco/Logger.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/tests/MultiSegmentTestUtils.h>
#include <Storages/DeltaMerge/tests/MultiSegmentTestUtil.h>
#include <Storages/DeltaMerge/tests/dm_basic_include.h>
#include <Storages/StorageDeltaMerge.h>
#include <Storages/tests/TiFlashStorageTestBasic.h>
#include <common/types.h>
#include <fmt/ranges.h>
#include <gtest/gtest.h>

#include <ext/scope_guard.h>
Expand All @@ -31,69 +33,82 @@ namespace tests
{


// Test for different kind of handles (Int / Common).
class BasicManualCompactTest
: public DB::DM::tests::MultiSegmentTest
, public testing::WithParamInterface<bool>
: public DB::base::TiFlashStorageTestBasic
, public testing::WithParamInterface<DM::tests::DMTestEnv::PkType>
{
public:
static constexpr TableID TABLE_ID = 5;

void SetUp() override
{
try
{
log = &Poco::Logger::get(DB::base::TiFlashStorageTestBasic::getCurrentFullTestName());
is_common_handle = GetParam();
pk_type = GetParam();
TiFlashStorageTestBasic::SetUp();

manager = std::make_unique<DB::Management::ManualCompactManager>(*db_context);

// In tests let's only compact one segment.
db_context->setSetting("manual_compact_more_until_ms", UInt64(0));

prepareSegments(50, is_common_handle);
prepareManagedStorage();
manager = std::make_unique<DB::Management::ManualCompactManager>(*db_context);
helper = std::make_unique<DM::tests::MultiSegmentTestUtil>(*db_context);
helper->setSettings(50);

setupStorage();

// Split into 4 segments, and prepare some delta data for first 3 segments.
helper->prepareSegments(storage->getAndMaybeInitStore(), 50, pk_type);

prepareDataForFirstThreeSegments();
}
CATCH
}

void TearDown() override
{
// TODO: This is more like a hack. Should use storage->drop();
db_context->getGlobalContext().getTMTContext().getStorages().remove(store->physical_table_id);
}

void prepareManagedStorage()
void setupStorage()
{
// TODO: This is more like a hack. Should construct a Storage using column definitions.
const String table_name = "mytable";
ASTPtr astptr(new ASTIdentifier(table_name, ASTIdentifier::Kind::Table));
astptr->children.emplace_back(new ASTIdentifier("col1"));

NamesAndTypesList columns = {{"col1", std::make_shared<DataTypeInt64>()}};

TiDB::TableInfo ti;
ti.id = store->physical_table_id;
auto columns = DM::tests::DMTestEnv::getDefaultTableColumns(pk_type);
auto table_info = DM::tests::DMTestEnv::getMinimalTableInfo(TABLE_ID, pk_type);
auto astptr = DM::tests::DMTestEnv::getPrimaryKeyExpr("test_table", pk_type);

storage = StorageDeltaMerge::create("TiFlash",
/* db_name= */ "default",
table_name,
std::ref(ti),
"default" /* db_name */,
"test_table" /* table_name */,
std::ref(table_info),
ColumnsDescription{columns},
astptr,
0,
db_context->getGlobalContext());
storage->is_common_handle = store->is_common_handle;
storage->rowkey_column_size = store->rowkey_column_size;
storage->_store = store;
storage->store_inited.store(true, std::memory_order_seq_cst);
storage->startup();
}

protected:
bool is_common_handle;
void prepareDataForFirstThreeSegments()
{
// Write data to first 3 segments.
auto newly_written_rows = helper->rows_by_segments[0] + helper->rows_by_segments[1] + helper->rows_by_segments[2];
Block block = DM::tests::DMTestEnv::prepareSimpleWriteBlock(0, newly_written_rows, false, pk_type, 5 /* new tso */);
storage->write(block, db_context->getSettingsRef());
storage->flushCache(*db_context);

helper->expected_delta_rows[0] += helper->rows_by_segments[0];
helper->expected_delta_rows[1] += helper->rows_by_segments[1];
helper->expected_delta_rows[2] += helper->rows_by_segments[2];
helper->verifyExpectedRowsForAllSegments();
}

std::unique_ptr<DB::Management::ManualCompactManager> manager;
void TearDown() override
{
storage->drop();
db_context->getTMTContext().getStorages().remove(TABLE_ID);
}

protected:
std::unique_ptr<DM::tests::MultiSegmentTestUtil> helper;
StorageDeltaMergePtr storage;
std::unique_ptr<DB::Management::ManualCompactManager> manager;

DM::tests::DMTestEnv::PkType pk_type;

[[maybe_unused]] Poco::Logger * log;
};
Expand All @@ -102,7 +117,13 @@ class BasicManualCompactTest
INSTANTIATE_TEST_CASE_P(
ByCommonHandle,
BasicManualCompactTest,
testing::Values(/* is_common_handle */ true, false));
testing::Values(
DM::tests::DMTestEnv::PkType::HiddenTiDBRowID,
DM::tests::DMTestEnv::PkType::CommonHandle,
DM::tests::DMTestEnv::PkType::PkIsHandleInt64),
[](const testing::TestParamInfo<DM::tests::DMTestEnv::PkType> & info) {
return DM::tests::DMTestEnv::PkTypeToString(info.param);
});


TEST_P(BasicManualCompactTest, EmptyRequest)
Expand Down Expand Up @@ -132,7 +153,7 @@ TEST_P(BasicManualCompactTest, InvalidStartKey)
try
{
auto request = ::kvrpcpb::CompactRequest();
request.set_physical_table_id(store->physical_table_id);
request.set_physical_table_id(TABLE_ID);
request.set_start_key("abcd");
auto response = ::kvrpcpb::CompactResponse();
auto status_code = manager->doWork(&request, &response);
Expand All @@ -145,64 +166,129 @@ CATCH
TEST_P(BasicManualCompactTest, NoStartKey)
try
{
auto request = ::kvrpcpb::CompactRequest();
request.set_physical_table_id(TABLE_ID);
auto response = ::kvrpcpb::CompactResponse();
auto status_code = manager->doWork(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK);
ASSERT_FALSE(response.has_error());

helper->expected_stable_rows[0] += helper->expected_delta_rows[0];
helper->expected_delta_rows[0] = 0;
helper->verifyExpectedRowsForAllSegments();
}
CATCH


// Start key is empty. Should compact the first segment.
TEST_P(BasicManualCompactTest, EmptyStartKey)
try
{
auto request = ::kvrpcpb::CompactRequest();
request.set_physical_table_id(TABLE_ID);
request.set_start_key("");
auto response = ::kvrpcpb::CompactResponse();
auto status_code = manager->doWork(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK);
ASSERT_FALSE(response.has_error());

helper->expected_stable_rows[0] += helper->expected_delta_rows[0];
helper->expected_delta_rows[0] = 0;
helper->verifyExpectedRowsForAllSegments();
}
CATCH


// Specify a key in segment[1]. Should compact this segment.
TEST_P(BasicManualCompactTest, SpecifiedStartKey)
try
{
// TODO: This test may be not appropriate. It highly relies on internal implementation:
// The encoding of the start key should be hidden from the caller.

auto request = ::kvrpcpb::CompactRequest();
request.set_physical_table_id(TABLE_ID);
{
// Write data to first 3 segments.
auto newly_written_rows = rows_by_segments[0] + rows_by_segments[1] + rows_by_segments[2];
Block block = DM::tests::DMTestEnv::prepareSimpleWriteBlock(0, newly_written_rows, false, 5 /* new tso */, is_common_handle);
store->write(*db_context, db_context->getSettingsRef(), block);
store->flushCache(dm_context, DM::RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()));

expected_delta_rows[0] += rows_by_segments[0];
expected_delta_rows[1] += rows_by_segments[1];
expected_delta_rows[2] += rows_by_segments[2];
verifyExpectedRowsForAllSegments();
}
{
auto request = ::kvrpcpb::CompactRequest();
request.set_physical_table_id(store->physical_table_id);
auto response = ::kvrpcpb::CompactResponse();
auto status_code = manager->doWork(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK);
ASSERT_FALSE(response.has_error());
}
{
expected_stable_rows[0] += expected_delta_rows[0];
expected_delta_rows[0] = 0;
verifyExpectedRowsForAllSegments();
WriteBufferFromOwnString wb;
auto seg0 = storage->getAndMaybeInitStore()->segments.begin()->second;
auto seg1_start_key = seg0->getRowKeyRange().end;
seg1_start_key.toPrefixNext().serialize(wb);
request.set_start_key(wb.releaseStr());
}
auto response = ::kvrpcpb::CompactResponse();
auto status_code = manager->doWork(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK);
ASSERT_FALSE(response.has_error());

helper->expected_stable_rows[1] += helper->expected_delta_rows[1];
helper->expected_delta_rows[1] = 0;
helper->verifyExpectedRowsForAllSegments();
}
CATCH


TEST_P(BasicManualCompactTest, EmptyStartKey)
TEST_P(BasicManualCompactTest, StartKeyFromPreviousResponse)
try
{
::kvrpcpb::CompactResponse response;
{
// Write data to first 3 segments.
auto newly_written_rows = rows_by_segments[0] + rows_by_segments[1] + rows_by_segments[2];
Block block = DM::tests::DMTestEnv::prepareSimpleWriteBlock(0, newly_written_rows, false, 5 /* new tso */, is_common_handle);
store->write(*db_context, db_context->getSettingsRef(), block);
store->flushCache(dm_context, DM::RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()));

expected_delta_rows[0] += rows_by_segments[0];
expected_delta_rows[1] += rows_by_segments[1];
expected_delta_rows[2] += rows_by_segments[2];
verifyExpectedRowsForAllSegments();
// Request 1
auto request = ::kvrpcpb::CompactRequest();
request.set_physical_table_id(TABLE_ID);
response = ::kvrpcpb::CompactResponse();
auto status_code = manager->doWork(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK);
ASSERT_FALSE(response.has_error());

helper->expected_stable_rows[0] += helper->expected_delta_rows[0];
helper->expected_delta_rows[0] = 0;
helper->verifyExpectedRowsForAllSegments();
}
{
// Request 2, use the start key from previous response. We should compact both segment 1 and segment 2.
auto request = ::kvrpcpb::CompactRequest();
request.set_physical_table_id(store->physical_table_id);
request.set_start_key("");
auto response = ::kvrpcpb::CompactResponse();
request.set_physical_table_id(TABLE_ID);
request.set_start_key(response.compacted_end_key());
response = ::kvrpcpb::CompactResponse();
auto status_code = manager->doWork(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK);
ASSERT_FALSE(response.has_error());

helper->expected_stable_rows[1] += helper->expected_delta_rows[1];
helper->expected_delta_rows[1] = 0;
helper->verifyExpectedRowsForAllSegments();
}
}
CATCH


TEST_P(BasicManualCompactTest, CompactMultiple)
try
{
db_context->setSetting("manual_compact_more_until_ms", UInt64(60 * 1000)); // Hope it's long enough!

auto request = ::kvrpcpb::CompactRequest();
request.set_physical_table_id(TABLE_ID);
auto response = ::kvrpcpb::CompactResponse();
auto status_code = manager->doWork(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK);
ASSERT_FALSE(response.has_error());

// All segments should be compacted.
for (size_t i = 0; i < 4; ++i)
{
expected_stable_rows[0] += expected_delta_rows[0];
expected_delta_rows[0] = 0;
verifyExpectedRowsForAllSegments();
helper->expected_stable_rows[i] += helper->expected_delta_rows[i];
helper->expected_delta_rows[i] = 0;
}
helper->verifyExpectedRowsForAllSegments();
}
CATCH


// When there are duplicated logical id while processing, the later one should return error immediately.
TEST_P(BasicManualCompactTest, DuplicatedLogicalId)
try
{
}
CATCH

Expand Down
Loading

0 comments on commit b503ed0

Please sign in to comment.