Skip to content

Commit

Permalink
stabilize tests and add tests for the logical table id
Browse files Browse the repository at this point in the history
Signed-off-by: Wish <[email protected]>
  • Loading branch information
breezewish committed May 14, 2022
1 parent b503ed0 commit 9407b18
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 34 deletions.
7 changes: 7 additions & 0 deletions .clang-tidy
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ Checks: '-*,
modernize-redundant-void-arg,
modernize-replace-auto-ptr,
modernize-replace-random-shuffle,
modernize-use-auto,
modernize-use-bool-literals,
modernize-use-nullptr,
modernize-use-using,
modernize-use-override,
modernize-use-equals-default,
modernize-use-equals-delete,
Expand Down Expand Up @@ -160,6 +162,11 @@ Checks: '-*,
clang-analyzer-unix.cstring.NullArg,
boost-use-to-string,
cppcoreguidelines-pro-type-cstyle-cast,
cppcoreguidelines-pro-type-member-init,
cppcoreguidelines-no-malloc,
cppcoreguidelines-virtual-class-destructor,
'
WarningsAsErrors: '*'

Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(exception_after_drop_segment)

#define APPLY_FOR_FAILPOINTS(M) \
M(skip_check_segment_update) \
M(force_set_page_file_write_errno) \
M(force_split_io_size_4k) \
M(minimum_block_size_for_cross_join) \
Expand Down Expand Up @@ -99,7 +100,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(pause_when_writing_to_dt_store) \
M(pause_when_ingesting_to_dt_store) \
M(pause_when_altering_dt_store) \
M(pause_after_copr_streams_acquired)
M(pause_after_copr_streams_acquired) \
M(pause_before_server_merge_one_delta)

namespace FailPoints
{
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Flash/Management/ManualCompact.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/FailPoint.h>
#include <Common/setThreadName.h>
#include <Flash/Management/ManualCompact.h>
#include <Flash/ServiceUtils.h>
Expand All @@ -24,6 +25,10 @@

namespace DB
{
namespace FailPoints
{
extern const char pause_before_server_merge_one_delta[];
} // namespace FailPoints

namespace Management
{
Expand Down Expand Up @@ -153,7 +158,9 @@ grpc::Status ManualCompactManager::doWork(const ::kvrpcpb::CompactRequest * requ
// Repeatedly merge multiple segments as much as possible.
while (true)
{
FAIL_POINT_PAUSE(FailPoints::pause_before_server_merge_one_delta);
auto compacted_range = dm_storage->mergeDeltaBySegment(db_context, start_key);

if (compacted_range == std::nullopt)
{
// Segment not found according to current start key
Expand Down
9 changes: 6 additions & 3 deletions dbms/src/Flash/Management/ManualCompact.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ namespace DB
namespace Management
{

/// Serves manual compact requests.
/// Serves manual compact requests. Notice that the "compact" term here has different meanings compared to
/// the word "compact" in the DeltaMerge Store. The "compact request" here refer to the "delta merge" process
/// (major compact), while the "compact" in delta merge store refers to the "compact delta layer" process
/// (minor compact).
/// This class is thread safe. Every public member function can be called without synchronization.
class ManualCompactManager : private boost::noncopyable
{
Expand Down Expand Up @@ -74,9 +77,9 @@ class ManualCompactManager : private boost::noncopyable

/// When there is a task containing the same logical_table running,
/// the task will be rejected.
std::set<int64_t> unsync_active_logical_table_ids;
std::set<int64_t> unsync_active_logical_table_ids = {};

size_t unsync_running_or_pending_tasks;
size_t unsync_running_or_pending_tasks = 0;

// == Accessing members above must be synchronized ==

Expand Down
77 changes: 61 additions & 16 deletions dbms/src/Flash/Management/tests/gtest_manual_compact.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/FailPoint.h>
#include <Flash/Management/ManualCompact.h>
#include <Parsers/ASTIdentifier.h>
#include <Poco/Logger.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
Expand All @@ -26,9 +26,16 @@
#include <gtest/gtest.h>

#include <ext/scope_guard.h>
#include <future>
#include <thread>

namespace DB
{
namespace FailPoints
{
extern const char pause_before_server_merge_one_delta[];
} // namespace FailPoints

namespace tests
{

Expand All @@ -50,17 +57,14 @@ class BasicManualCompactTest

manager = std::make_unique<DB::Management::ManualCompactManager>(*db_context);

setupStorage();

// In tests let's only compact one segment.
db_context->setSetting("manual_compact_more_until_ms", UInt64(0));

helper = std::make_unique<DM::tests::MultiSegmentTestUtil>(*db_context);
helper->setSettings(50);

setupStorage();

// Split into 4 segments, and prepare some delta data for first 3 segments.
helper = std::make_unique<DM::tests::MultiSegmentTestUtil>(*db_context);
helper->prepareSegments(storage->getAndMaybeInitStore(), 50, pk_type);

prepareDataForFirstThreeSegments();
}
CATCH
Expand Down Expand Up @@ -131,7 +135,7 @@ try
{
auto request = ::kvrpcpb::CompactRequest();
auto response = ::kvrpcpb::CompactResponse();
auto status_code = manager->doWork(&request, &response);
auto status_code = manager->handleRequest(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::INVALID_ARGUMENT);
}
CATCH
Expand All @@ -143,7 +147,7 @@ try
auto request = ::kvrpcpb::CompactRequest();
request.set_physical_table_id(9999);
auto response = ::kvrpcpb::CompactResponse();
auto status_code = manager->doWork(&request, &response);
auto status_code = manager->handleRequest(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::INVALID_ARGUMENT);
}
CATCH
Expand All @@ -156,7 +160,7 @@ try
request.set_physical_table_id(TABLE_ID);
request.set_start_key("abcd");
auto response = ::kvrpcpb::CompactResponse();
auto status_code = manager->doWork(&request, &response);
auto status_code = manager->handleRequest(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::INVALID_ARGUMENT);
}
CATCH
Expand All @@ -169,7 +173,7 @@ try
auto request = ::kvrpcpb::CompactRequest();
request.set_physical_table_id(TABLE_ID);
auto response = ::kvrpcpb::CompactResponse();
auto status_code = manager->doWork(&request, &response);
auto status_code = manager->handleRequest(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK);
ASSERT_FALSE(response.has_error());

Expand All @@ -188,7 +192,7 @@ try
request.set_physical_table_id(TABLE_ID);
request.set_start_key("");
auto response = ::kvrpcpb::CompactResponse();
auto status_code = manager->doWork(&request, &response);
auto status_code = manager->handleRequest(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK);
ASSERT_FALSE(response.has_error());

Expand Down Expand Up @@ -216,7 +220,7 @@ try
request.set_start_key(wb.releaseStr());
}
auto response = ::kvrpcpb::CompactResponse();
auto status_code = manager->doWork(&request, &response);
auto status_code = manager->handleRequest(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK);
ASSERT_FALSE(response.has_error());

Expand All @@ -236,7 +240,7 @@ try
auto request = ::kvrpcpb::CompactRequest();
request.set_physical_table_id(TABLE_ID);
response = ::kvrpcpb::CompactResponse();
auto status_code = manager->doWork(&request, &response);
auto status_code = manager->handleRequest(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK);
ASSERT_FALSE(response.has_error());

Expand All @@ -250,7 +254,7 @@ try
request.set_physical_table_id(TABLE_ID);
request.set_start_key(response.compacted_end_key());
response = ::kvrpcpb::CompactResponse();
auto status_code = manager->doWork(&request, &response);
auto status_code = manager->handleRequest(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK);
ASSERT_FALSE(response.has_error());

Expand All @@ -270,7 +274,7 @@ try
auto request = ::kvrpcpb::CompactRequest();
request.set_physical_table_id(TABLE_ID);
auto response = ::kvrpcpb::CompactResponse();
auto status_code = manager->doWork(&request, &response);
auto status_code = manager->handleRequest(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK);
ASSERT_FALSE(response.has_error());

Expand All @@ -289,6 +293,47 @@ CATCH
TEST_P(BasicManualCompactTest, DuplicatedLogicalId)
try
{
using namespace std::chrono_literals;

FailPointHelper::enableFailPoint(FailPoints::pause_before_server_merge_one_delta);

auto thread_1_is_ready = std::promise<void>();
std::thread t_req1([&]() {
// req1
auto request = ::kvrpcpb::CompactRequest();
request.set_physical_table_id(TABLE_ID);
request.set_logical_table_id(2);
auto response = ::kvrpcpb::CompactResponse();
thread_1_is_ready.set_value();
auto status_code = manager->handleRequest(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK);
ASSERT_FALSE(response.has_error());
helper->expected_stable_rows[0] += helper->expected_delta_rows[0];
helper->expected_delta_rows[0] = 0;
helper->verifyExpectedRowsForAllSegments();
});

{
// send req1, wait request being processed.
thread_1_is_ready.get_future().wait();
std::this_thread::sleep_for(500ms); // TODO: Maybe better to use sync_channel to avoid hardcoded wait duration.

// req2: Now let's send another request with the same logical id.
// Although worker pool size is 1, this request will be returned immediately, but with an error.
auto request = ::kvrpcpb::CompactRequest();
request.set_physical_table_id(TABLE_ID);
request.set_logical_table_id(2);
auto response = ::kvrpcpb::CompactResponse();
auto status_code = manager->handleRequest(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK);
ASSERT_TRUE(response.has_error());
ASSERT_TRUE(response.error().has_err_compact_in_progress());
helper->verifyExpectedRowsForAllSegments();
}

// Now let's continue req1's work
FailPointHelper::disableFailPoint(FailPoints::pause_before_server_merge_one_delta);
t_req1.join();
}
CATCH

Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Columns/ColumnVector.h>
#include <Common/FailPoint.h>
#include <Common/FmtUtils.h>
#include <Common/Logger.h>
#include <Common/TiFlashMetrics.h>
#include <Common/assert_cast.h>
#include <Common/typeid_cast.h>
#include <Core/SortDescription.h>
#include <Functions/FunctionsConversion.h>
#include <Interpreters/sortBlock.h>
Expand All @@ -31,7 +29,6 @@
#include <Storages/DeltaMerge/SchemaUpdate.h>
#include <Storages/DeltaMerge/Segment.h>
#include <Storages/DeltaMerge/SegmentReadTaskPool.h>
#include <Storages/DeltaMerge/StableValueSpace.h>
#include <Storages/DeltaMerge/WriteBatches.h>
#include <Storages/Page/PageStorage.h>
#include <Storages/Page/V2/VersionSet/PageEntriesVersionSetWithDelta.h>
Expand Down Expand Up @@ -84,6 +81,7 @@ extern const int LOGICAL_ERROR;

namespace FailPoints
{
extern const char skip_check_segment_update[];
extern const char pause_before_dt_background_delta_merge[];
extern const char pause_until_dt_background_delta_merge[];
extern const char pause_when_writing_to_dt_store[];
Expand Down Expand Up @@ -1295,6 +1293,8 @@ void DeltaMergeStore::waitForDeleteRange(const DB::DM::DMContextPtr &, const DB:

void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const SegmentPtr & segment, ThreadType thread_type)
{
fiu_do_on(FailPoints::skip_check_segment_update, { return; });

if (segment->hasAbandoned())
return;
const auto & delta = segment->getDelta();
Expand Down
19 changes: 11 additions & 8 deletions dbms/src/Storages/DeltaMerge/tests/MultiSegmentTestUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@

namespace DB
{
namespace FailPoints
{
extern const char skip_check_segment_update[];
} // namespace FailPoints

namespace DM
{
namespace tests
Expand All @@ -54,18 +59,16 @@ class MultiSegmentTestUtil : private boost::noncopyable
std::map<size_t, size_t> expected_stable_rows;
std::map<size_t, size_t> expected_delta_rows;

MultiSegmentTestUtil(Context & db_context_)
explicit MultiSegmentTestUtil(Context & db_context_)
: tracing_id(DB::base::TiFlashStorageTestBasic::getCurrentFullTestName())
, db_context(db_context_)
{}
{
FailPointHelper::enableFailPoint(FailPoints::skip_check_segment_update);
}

/// Update context settings to keep multiple segments stable.
void setSettings(size_t rows_per_segment)
~MultiSegmentTestUtil()
{
// Avoid bg merge.
// TODO (wenxuan): Seems to be not very stable.
db_context.setSetting("dt_bg_gc_max_segments_to_check_every_round", UInt64(0));
db_context.setSetting("dt_segment_limit_rows", UInt64(rows_per_segment));
FailPointHelper::disableFailPoint(FailPoints::skip_check_segment_update);
}

/// Prepare segments * 4. The rows of each segment will be roughly close to n_avg_rows_per_segment.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3376,12 +3376,10 @@ class DeltaMergeStoreMergeDeltaBySegmentTest
setStorageFormat(ps_ver);
TiFlashStorageTestBasic::SetUp();

helper = std::make_unique<MultiSegmentTestUtil>(*db_context);
helper->setSettings(50);

setupDMStore();

// Split into 4 segments.
helper = std::make_unique<MultiSegmentTestUtil>(*db_context);
helper->prepareSegments(store, 50, pk_type);
}
CATCH
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Page/PageUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ extern const int FILE_SIZE_NOT_MATCH;

namespace FailPoints
{
extern const char skip_check_segment_update[];
extern const char force_set_page_file_write_errno[];
extern const char force_split_io_size_4k[];
} // namespace FailPoints
Expand Down

0 comments on commit 9407b18

Please sign in to comment.