Skip to content

Commit

Permalink
Assign machine readable errors for "invalid key" and "physical table …
Browse files Browse the repository at this point in the history
…not exist"
  • Loading branch information
breezewish committed May 17, 2022
1 parent 8263539 commit b3cfc6c
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 36 deletions.
4 changes: 3 additions & 1 deletion dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ FlashService::FlashService(IServer & server_)
: server(server_)
, security_config(server_.securityConfig())
, log(&Poco::Logger::get("FlashService"))
, manual_compact_manager(std::make_unique<Management::ManualCompactManager>(server_.context().getGlobalContext()))
, manual_compact_manager(std::make_unique<Management::ManualCompactManager>(
server_.context().getGlobalContext(),
server_.context().getGlobalContext().getSettingsRef()))
{
auto settings = server_.context().getSettingsRef();
enable_local_tunnel = settings.enable_local_tunnel;
Expand Down
34 changes: 23 additions & 11 deletions dbms/src/Flash/Management/ManualCompact.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ extern const char pause_before_server_merge_one_delta[];
namespace Management
{

ManualCompactManager::ManualCompactManager(const Context & db_context_)
: db_context(db_context_)
ManualCompactManager::ManualCompactManager(const Context & global_context_, const Settings & settings_)
: global_context(global_context_.getGlobalContext())
, settings(settings_)
, log(&Poco::Logger::get("ManualCompactManager"))
{
const auto & settings = db_context_.getSettingsRef();
worker_pool = std::make_unique<ThreadPool>(static_cast<size_t>(settings.manual_compact_pool_size), [] { setThreadName("m-compact-pool"); });
}

Expand Down Expand Up @@ -104,18 +104,21 @@ grpc::Status ManualCompactManager::doWorkWithCatch(const ::kvrpcpb::CompactReque

grpc::Status ManualCompactManager::doWork(const ::kvrpcpb::CompactRequest * request, ::kvrpcpb::CompactResponse * response)
{
const auto & tmt_context = db_context.getTMTContext();
const auto & tmt_context = global_context.getTMTContext();
auto storage = tmt_context.getStorages().get(request->physical_table_id());

if (storage == nullptr)
{
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, fmt::format("Physical table (id={}) not found", request->physical_table_id()));
response->mutable_error()->mutable_err_physical_table_not_exist();
response->set_has_remaining(false);
return grpc::Status::OK;
}

auto dm_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage);
if (dm_storage == nullptr)
{
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, fmt::format("Physical table (id={}) is not a DeltaTree table", request->physical_table_id()));
response->mutable_error()->mutable_err_physical_table_not_exist();
response->set_has_remaining(false);
return grpc::Status::OK;
}

DM::RowKeyValue start_key;
Expand All @@ -141,10 +144,19 @@ grpc::Status ManualCompactManager::doWork(const ::kvrpcpb::CompactRequest * requ
// This will cause OOM.
// Also it is not a good idea to use Try-Catch for this scenario.
start_key = DM::RowKeyValue::deserialize(buf);

if (start_key.is_common_handle != dm_storage->isCommonHandle())
{
response->mutable_error()->mutable_err_invalid_start_key();
response->set_has_remaining(false);
return grpc::Status::OK;
}
}
catch (...)
{
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Invalid start key");
response->mutable_error()->mutable_err_invalid_start_key();
response->set_has_remaining(false);
return grpc::Status::OK;
}
}

Expand All @@ -159,7 +171,7 @@ grpc::Status ManualCompactManager::doWork(const ::kvrpcpb::CompactRequest * requ
while (true)
{
FAIL_POINT_PAUSE(FailPoints::pause_before_server_merge_one_delta);
auto compacted_range = dm_storage->mergeDeltaBySegment(db_context, start_key);
auto compacted_range = dm_storage->mergeDeltaBySegment(global_context, start_key);

if (compacted_range == std::nullopt)
{
Expand Down Expand Up @@ -212,13 +224,13 @@ grpc::Status ManualCompactManager::doWork(const ::kvrpcpb::CompactRequest * requ

uint64_t ManualCompactManager::getSettingCompactMoreUntilMs() const
{
return db_context.getSettingsRef().manual_compact_more_until_ms.get();
return settings.manual_compact_more_until_ms.get();
}

uint64_t ManualCompactManager::getSettingMaxConcurrency() const
{
auto current_thread_size = worker_pool->size();
auto val = db_context.getSettingsRef().manual_compact_max_concurrency.get();
auto val = settings.manual_compact_max_concurrency.get();
return std::max(val, current_thread_size);
}

Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Flash/Management/ManualCompact.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#pragma once

#include <Interpreters/Context.h>
#include <Interpreters/Settings.h>
#include <common/ThreadPool.h>
#include <common/logger_useful.h>
#pragma GCC diagnostic push
Expand Down Expand Up @@ -45,7 +46,7 @@ namespace Management
class ManualCompactManager : private boost::noncopyable
{
public:
explicit ManualCompactManager(const Context & db_context_);
explicit ManualCompactManager(const Context & global_context_, const Settings & settings_);

~ManualCompactManager() = default;

Expand Down Expand Up @@ -86,7 +87,8 @@ class ManualCompactManager : private boost::noncopyable
#ifndef DBMS_PUBLIC_GTEST
private:
#endif
const Context & db_context;
const Context & global_context;
const Settings & settings;
Poco::Logger * log;

/// Placed last to be destroyed first.
Expand Down
52 changes: 48 additions & 4 deletions dbms/src/Flash/Management/tests/gtest_manual_compact.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class BasicManualCompactTest
{
TiFlashStorageTestBasic::SetUp();

manager = std::make_unique<DB::Management::ManualCompactManager>(*db_context);
manager = std::make_unique<DB::Management::ManualCompactManager>(*db_context, db_context->getSettingsRef());

setupStorage();

Expand Down Expand Up @@ -140,7 +140,9 @@ try
auto request = ::kvrpcpb::CompactRequest();
auto response = ::kvrpcpb::CompactResponse();
auto status_code = manager->handleRequest(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::INVALID_ARGUMENT);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK);
ASSERT_TRUE(response.has_error());
ASSERT_TRUE(response.error().has_err_physical_table_not_exist());
}
CATCH

Expand All @@ -152,7 +154,9 @@ try
request.set_physical_table_id(9999);
auto response = ::kvrpcpb::CompactResponse();
auto status_code = manager->handleRequest(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::INVALID_ARGUMENT);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK);
ASSERT_TRUE(response.has_error());
ASSERT_TRUE(response.error().has_err_physical_table_not_exist());
}
CATCH

Expand All @@ -165,7 +169,41 @@ try
request.set_start_key("abcd");
auto response = ::kvrpcpb::CompactResponse();
auto status_code = manager->handleRequest(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::INVALID_ARGUMENT);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK);
ASSERT_TRUE(response.has_error());
ASSERT_TRUE(response.error().has_err_invalid_start_key());
}
CATCH


TEST_P(BasicManualCompactTest, MalformedStartKey)
try
{
// Specify an int key for common handle table, and vise versa.
DM::RowKeyValue malformed_start_key;
switch (pk_type)
{
case DM::tests::DMTestEnv::PkType::HiddenTiDBRowID:
case DM::tests::DMTestEnv::PkType::PkIsHandleInt64:
malformed_start_key = DM::RowKeyValue::COMMON_HANDLE_MIN_KEY;
break;
case DM::tests::DMTestEnv::PkType::CommonHandle:
malformed_start_key = DM::RowKeyValue::INT_HANDLE_MIN_KEY;
break;
default:
throw Exception("Unknown pk type for test");
}

auto request = ::kvrpcpb::CompactRequest();
request.set_physical_table_id(TABLE_ID);
WriteBufferFromOwnString wb;
malformed_start_key.serialize(wb);
request.set_start_key(wb.releaseStr());
auto response = ::kvrpcpb::CompactResponse();
auto status_code = manager->handleRequest(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK);
ASSERT_TRUE(response.has_error());
ASSERT_TRUE(response.error().has_err_invalid_start_key());
}
CATCH

Expand All @@ -180,6 +218,7 @@ try
auto status_code = manager->handleRequest(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK);
ASSERT_FALSE(response.has_error());
ASSERT_TRUE(response.has_remaining());

helper->expected_stable_rows[0] += helper->expected_delta_rows[0];
helper->expected_delta_rows[0] = 0;
Expand All @@ -199,6 +238,7 @@ try
auto status_code = manager->handleRequest(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK);
ASSERT_FALSE(response.has_error());
ASSERT_TRUE(response.has_remaining());

helper->expected_stable_rows[0] += helper->expected_delta_rows[0];
helper->expected_delta_rows[0] = 0;
Expand Down Expand Up @@ -227,6 +267,7 @@ try
auto status_code = manager->handleRequest(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK);
ASSERT_FALSE(response.has_error());
ASSERT_TRUE(response.has_remaining());

helper->expected_stable_rows[1] += helper->expected_delta_rows[1];
helper->expected_delta_rows[1] = 0;
Expand All @@ -247,6 +288,7 @@ try
auto status_code = manager->handleRequest(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK);
ASSERT_FALSE(response.has_error());
ASSERT_TRUE(response.has_remaining());

helper->expected_stable_rows[0] += helper->expected_delta_rows[0];
helper->expected_delta_rows[0] = 0;
Expand All @@ -261,6 +303,7 @@ try
auto status_code = manager->handleRequest(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK);
ASSERT_FALSE(response.has_error());
ASSERT_TRUE(response.has_remaining());

helper->expected_stable_rows[1] += helper->expected_delta_rows[1];
helper->expected_delta_rows[1] = 0;
Expand All @@ -281,6 +324,7 @@ try
auto status_code = manager->handleRequest(&request, &response);
ASSERT_EQ(status_code.error_code(), grpc::StatusCode::OK);
ASSERT_FALSE(response.has_error());
ASSERT_FALSE(response.has_remaining());

// All segments should be compacted.
for (size_t i = 0; i < 4; ++i)
Expand Down
5 changes: 0 additions & 5 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1028,12 +1028,7 @@ void DeltaMergeStore::mergeDeltaAll(const Context & context)
std::optional<DM::RowKeyRange> DeltaMergeStore::mergeDeltaBySegment(const Context & context, const RowKeyValue & start_key)
{
auto dm_context = newDMContext(context, context.getSettingsRef(), /*tracing_id*/ "mergeDeltaBySegment");
if (start_key.is_common_handle != dm_context->is_common_handle)
{
return std::nullopt;
}

// TODO: Is it a good idea not to retry?
while (true)
{
SegmentPtr segment;
Expand Down
31 changes: 20 additions & 11 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3449,23 +3449,15 @@ try
}
if (store->isCommonHandle())
{
// For common handle, give int handle key and have a try
auto result = store->mergeDeltaBySegment(*db_context, RowKeyValue::INT_HANDLE_MIN_KEY);
// Specifies MAX_KEY. nullopt should be returned.
auto result = store->mergeDeltaBySegment(*db_context, RowKeyValue::COMMON_HANDLE_MAX_KEY);
ASSERT_EQ(result, std::nullopt);
}
else
{
// For int handle, give common handle key and have a try
auto result = store->mergeDeltaBySegment(*db_context, RowKeyValue::COMMON_HANDLE_MIN_KEY);
ASSERT_EQ(result, std::nullopt);
}
{
// Try with max key, should also fail
// Specifies MAX_KEY. nullopt should be returned.
auto result = store->mergeDeltaBySegment(*db_context, RowKeyValue::INT_HANDLE_MAX_KEY);
ASSERT_EQ(result, std::nullopt);

result = store->mergeDeltaBySegment(*db_context, RowKeyValue::COMMON_HANDLE_MAX_KEY);
ASSERT_EQ(result, std::nullopt);
}
std::optional<RowKeyRange> result_1;
{
Expand Down Expand Up @@ -3507,6 +3499,23 @@ try
}
CATCH

TEST_P(DeltaMergeStoreMergeDeltaBySegmentTest, InvalidKey)
{
// Expect exceptions when invalid key is given.
EXPECT_ANY_THROW({
if (store->isCommonHandle())
{
// For common handle, give int handle key and have a try
store->mergeDeltaBySegment(*db_context, RowKeyValue::INT_HANDLE_MIN_KEY);
}
else
{
// For int handle, give common handle key and have a try
store->mergeDeltaBySegment(*db_context, RowKeyValue::COMMON_HANDLE_MIN_KEY);
}
});
}


// Give the last segment key.
TEST_P(DeltaMergeStoreMergeDeltaBySegmentTest, LastSegment)
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/IManageableStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ class IManageableStorage : public IStorage
/// we need to update database name with `new_database_name`, and table name in tidb table info with `new_display_table_name`.
///
/// Called when the table structure is locked for write.
/// TODO: For TiFlash, we can rename without any lock on data?
virtual void rename(
const String & new_path_to_db,
const String & new_database_name,
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/Page/PageUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ extern const int FILE_SIZE_NOT_MATCH;

namespace FailPoints
{
extern const char skip_check_segment_update[];
extern const char force_set_page_file_write_errno[];
extern const char force_split_io_size_4k[];
} // namespace FailPoints
Expand Down

0 comments on commit b3cfc6c

Please sign in to comment.