Skip to content

Commit

Permalink
Fix metadata corruption release-5.3 (#4495)
Browse files Browse the repository at this point in the history
close #2576, close #3435, close #4437
  • Loading branch information
solotzg authored Apr 22, 2022
1 parent ba1f109 commit f3ec98d
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 83 deletions.
1 change: 0 additions & 1 deletion dbms/src/Flash/DiagnosticsService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1181,5 +1181,4 @@ ::grpc::Status DiagnosticsService::search_log(

return ::grpc::Status::OK;
}

} // namespace DB
24 changes: 18 additions & 6 deletions dbms/src/Storages/Transaction/ApplySnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,26 @@ void KVStore::checkAndApplySnapshot(const RegionPtrWrap & new_region, TMTContext

{
const auto & new_range = new_region->getRange();
handleRegionsByRangeOverlap(new_range->comparableKeys(), [&](RegionMap region_map, const KVStoreTaskLock &) {
for (const auto & region : region_map)
handleRegionsByRangeOverlap(new_range->comparableKeys(), [&](RegionMap region_map, const KVStoreTaskLock & task_lock) {
for (const auto & overlapped_region : region_map)
{
if (region.first != region_id)
if (overlapped_region.first != region_id)
{
throw Exception(std::string(__PRETTY_FUNCTION__) + ": range of region " + std::to_string(region_id)
+ " is overlapped with region " + std::to_string(region.first) + ", should not happen",
ErrorCodes::LOGICAL_ERROR);
auto state = getProxyHelper()->getRegionLocalState(overlapped_region.first);
if (state.state() != raft_serverpb::PeerState::Tombstone)
{
throw Exception(fmt::format(
"range of region {} is overlapped with {}, state: {}",
region_id,
overlapped_region.first,
state.ShortDebugString()),
ErrorCodes::LOGICAL_ERROR);
}
else
{
LOG_INFO(log, "range of region " << region_id << " is overlapped with `Tombstone` region " << overlapped_region.first);
handleDestroy(overlapped_region.first, tmt, task_lock);
}
}
}
});
Expand Down
7 changes: 6 additions & 1 deletion dbms/src/Storages/Transaction/KVStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,11 @@ EngineStoreApplyRes KVStore::handleWriteRaftCmd(const WriteCmdsView & cmds, UInt

void KVStore::handleDestroy(UInt64 region_id, TMTContext & tmt)
{
auto task_lock = genTaskLock();
handleDestroy(region_id, tmt, genTaskLock());
}

void KVStore::handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTaskLock & task_lock)
{
const auto region = getRegion(region_id);
if (region == nullptr)
{
Expand Down Expand Up @@ -596,6 +600,7 @@ void WaitCheckRegionReady(const TMTContext & tmt, const std::atomic_size_t & ter
if (!need_retry)
{
// if region is able to get latest commit-index from TiKV, we should make it available only after it has caught up.
assert(resp.read_index() != 0);
regions_to_check.emplace(region_id, resp.read_index());
remain_regions.erase(region_id);
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/Transaction/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ class KVStore final : private boost::noncopyable
TMTContext & tmt);

void persistRegion(const Region & region, const RegionTaskLock & region_task_lock, const char * caller);
void handleDestroy(UInt64 region_id, TMTContext & tmt, const KVStoreTaskLock &);

private:
RegionManager region_manager;
Expand Down
92 changes: 73 additions & 19 deletions dbms/src/Storages/Transaction/ProxyFFI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,18 @@
#include <Storages/Transaction/ProxyFFI.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/TMTContext.h>
#include <diagnosticspb.pb.h>
#include <kvproto/diagnosticspb.pb.h>

#include <ext/scope_guard.h>

#define CHECK_PARSE_PB_BUFF_IMPL(n, a, b, c) \
do \
{ \
[[maybe_unused]] bool parse_res_##n = (a).ParseFromArray(b, static_cast<int>(c)); \
assert(parse_res_##n); \
} while (false)
#define CHECK_PARSE_PB_BUFF_FWD(n, ...) CHECK_PARSE_PB_BUFF_IMPL(n, __VA_ARGS__)
#define CHECK_PARSE_PB_BUFF(...) CHECK_PARSE_PB_BUFF_FWD(__LINE__, __VA_ARGS__)

namespace CurrentMetrics
{
Expand Down Expand Up @@ -83,9 +94,8 @@ EngineStoreApplyRes HandleAdminRaftCmd(
{
raft_cmdpb::AdminRequest request;
raft_cmdpb::AdminResponse response;
request.ParseFromArray(req_buff.data, static_cast<int>(req_buff.len));
response.ParseFromArray(resp_buff.data, static_cast<int>(resp_buff.len));

CHECK_PARSE_PB_BUFF(request, req_buff.data, req_buff.len);
CHECK_PARSE_PB_BUFF(response, resp_buff.data, resp_buff.len);
auto & kvstore = server->tmt->getKVStore();
return kvstore->handleAdminRaftCmd(
std::move(request),
Expand Down Expand Up @@ -227,6 +237,13 @@ kvrpcpb::ReadIndexResponse TiFlashRaftProxyHelper::readIndex(const kvrpcpb::Read
return std::move(res.at(0).first);
}

void InsertBatchReadIndexResp(RawVoidPtr resp, BaseBuffView view, uint64_t region_id)
{
kvrpcpb::ReadIndexResponse res;
CHECK_PARSE_PB_BUFF(res, view.data, view.len);
reinterpret_cast<BatchReadIndexRes *>(resp)->emplace_back(std::move(res), region_id);
}

BatchReadIndexRes TiFlashRaftProxyHelper::batchReadIndex(const std::vector<kvrpcpb::ReadIndexRequest> & req, uint64_t timeout_ms) const
{
std::vector<std::string> req_strs;
Expand All @@ -239,7 +256,7 @@ BatchReadIndexRes TiFlashRaftProxyHelper::batchReadIndex(const std::vector<kvrpc
auto outer_view = data.intoOuterView();
BatchReadIndexRes res;
res.reserve(req.size());
fn_handle_batch_read_index(proxy_ptr, outer_view, &res, timeout_ms);
fn_handle_batch_read_index(proxy_ptr, outer_view, &res, timeout_ms, InsertBatchReadIndexResp);
return res;
}

Expand Down Expand Up @@ -280,10 +297,19 @@ RawCppPtr PreHandleSnapshot(
try
{
metapb::Region region;
region.ParseFromArray(region_buff.data, static_cast<int>(region_buff.len));
CHECK_PARSE_PB_BUFF(region, region_buff.data, region_buff.len);
auto & tmt = *server->tmt;
auto & kvstore = tmt.getKVStore();
auto new_region = kvstore->genRegionPtr(std::move(region), peer_id, index, term);

#ifndef NDEBUG
{
auto & kvstore = server->tmt->getKVStore();
auto state = kvstore->getProxyHelper()->getRegionLocalState(new_region->id());
assert(state.state() == raft_serverpb::PeerState::Applying);
}
#endif

switch (kvstore->applyMethod())
{
case TiDB::SnapshotApplyMethod::Block:
Expand Down Expand Up @@ -394,24 +420,11 @@ const char * IntoEncryptionMethodName(EncryptionMethod method)
return encryption_method_name[static_cast<uint8_t>(method)];
}

void InsertBatchReadIndexResp(RawVoidPtr resp, BaseBuffView view, uint64_t region_id)
{
kvrpcpb::ReadIndexResponse res;
res.ParseFromArray(view.data, view.len);
reinterpret_cast<BatchReadIndexRes *>(resp)->emplace_back(std::move(res), region_id);
}

RawCppPtr GenRawCppPtr(RawVoidPtr ptr_, RawCppPtrTypeImpl type_)
{
return RawCppPtr{ptr_, static_cast<RawCppPtrType>(type_)};
}

void SetServerInfoResp(BaseBuffView view, RawVoidPtr ptr)
{
using diagnosticspb::ServerInfoResponse;
reinterpret_cast<ServerInfoResponse *>(ptr)->ParseFromArray(view.data, view.len);
}

CppStrWithView GetConfig(EngineStoreServerWrap * server, [[maybe_unused]] uint8_t full)
{
std::string config_file_path;
Expand All @@ -438,4 +451,45 @@ CppStrWithView GetConfig(EngineStoreServerWrap * server, [[maybe_unused]] uint8_
}
}

void SetPBMsByBytes(MsgPBType type, RawVoidPtr ptr, BaseBuffView view)
{
switch (type)
{
case MsgPBType::ReadIndexResponse:
CHECK_PARSE_PB_BUFF(*reinterpret_cast<kvrpcpb::ReadIndexResponse *>(ptr), view.data, view.len);
break;
case MsgPBType::RegionLocalState:
CHECK_PARSE_PB_BUFF(*reinterpret_cast<raft_serverpb::RegionLocalState *>(ptr), view.data, view.len);
break;
case MsgPBType::ServerInfoResponse:
CHECK_PARSE_PB_BUFF(*reinterpret_cast<diagnosticspb::ServerInfoResponse *>(ptr), view.data, view.len);
break;
}
}

raft_serverpb::RegionLocalState TiFlashRaftProxyHelper::getRegionLocalState(uint64_t region_id) const
{
assert(this->fn_get_region_local_state);

raft_serverpb::RegionLocalState state;
RawCppStringPtr error_msg_ptr{};
SCOPE_EXIT({
delete error_msg_ptr;
});
auto res = this->fn_get_region_local_state(this->proxy_ptr, region_id, &state, &error_msg_ptr);
switch (res)
{
case KVGetStatus::Ok:
break;
case KVGetStatus::Error:
{
throw Exception(fmt::format("{} meet internal error: {}", __FUNCTION__, *error_msg_ptr), ErrorCodes::LOGICAL_ERROR);
}
case KVGetStatus::NotFound:
// make not found as `Tombstone`
state.set_state(raft_serverpb::PeerState::Tombstone);
break;
}
return state;
}
} // namespace DB
11 changes: 7 additions & 4 deletions dbms/src/Storages/Transaction/ProxyFFI.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ namespace kvrpcpb
class ReadIndexResponse;
class ReadIndexRequest;
} // namespace kvrpcpb
namespace raft_serverpb
{
class RegionLocalState;
}

namespace DB
{
Expand Down Expand Up @@ -53,6 +57,7 @@ struct TiFlashRaftProxyHelper : RaftStoreProxyFFIHelper
FileEncryptionInfo linkFile(const std::string &, const std::string &) const;
kvrpcpb::ReadIndexResponse readIndex(const kvrpcpb::ReadIndexRequest &) const;
BatchReadIndexRes batchReadIndex(const std::vector<kvrpcpb::ReadIndexRequest> &, uint64_t) const;
raft_serverpb::RegionLocalState getRegionLocalState(uint64_t region_id) const;
};

extern "C" {
Expand Down Expand Up @@ -81,10 +86,9 @@ void ApplyPreHandledSnapshot(EngineStoreServerWrap * server, void * res, RawCppP
HttpRequestRes HandleHttpRequest(EngineStoreServerWrap *, BaseBuffView path, BaseBuffView query, BaseBuffView body);
uint8_t CheckHttpUriAvailable(BaseBuffView);
void GcRawCppPtr(void * ptr, RawCppPtrType type);
void InsertBatchReadIndexResp(RawVoidPtr, BaseBuffView, uint64_t);
void SetServerInfoResp(BaseBuffView, RawVoidPtr);
BaseBuffView strIntoView(const std::string * str_ptr);
CppStrWithView GetConfig(EngineStoreServerWrap *, uint8_t full);
void SetPBMsByBytes(MsgPBType type, RawVoidPtr ptr, BaseBuffView view);
}

inline EngineStoreServerHelper GetEngineStoreServerHelper(
Expand All @@ -108,9 +112,8 @@ inline EngineStoreServerHelper GetEngineStoreServerHelper(
.fn_handle_http_request = HandleHttpRequest,
.fn_check_http_uri_available = CheckHttpUriAvailable,
.fn_gc_raw_cpp_ptr = GcRawCppPtr,
.fn_insert_batch_read_index_resp = InsertBatchReadIndexResp,
.fn_set_server_info_resp = SetServerInfoResp,
.fn_get_config = GetConfig,
.fn_set_pb_msg_by_bytes = SetPBMsByBytes,
};
}
} // namespace DB
13 changes: 12 additions & 1 deletion tests/fullstack-test/sample.test
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
mysql> drop table if exists test.t

mysql> create table if not exists test.t(s varchar(256), i int)
mysql> alter table test.t set tiflash replica 1 location labels 'rack', 'host', 'abc';
mysql> delete from test.t
mysql> insert into test.t values('Hello world', 666)

mysql> alter table test.t set tiflash replica 1 location labels 'rack', 'host', 'abc';

func> wait_table test t

mysql> set session tidb_isolation_read_engines='tiflash'; select s, i from test.t
Expand All @@ -14,6 +15,16 @@ mysql> set session tidb_isolation_read_engines='tiflash'; select s, i from test.
| Hello world | 666 |
+-------------+------+

mysql> insert into test.t values('test', -1)

mysql> set session tidb_isolation_read_engines='tiflash'; select s, i from test.t
+-------------+------+
| s | i |
+-------------+------+
| Hello world | 666 |
| test | -1 |
+-------------+------+

mysql> delete from test.t

mysql> drop table if exists test.t
68 changes: 18 additions & 50 deletions tests/run-gtest.sh
Original file line number Diff line number Diff line change
@@ -1,67 +1,35 @@
#!/usr/bin/env bash

set -x

SRC_TESTS_PATH="$(
cd "$(dirname "$0")"
pwd -P
)"

NPROC=${NPROC:-$(nproc || grep -c ^processor /proc/cpuinfo)}
OUTPUT_XML=${OUTPUT_XML:-false}
ENV_VARS_PATH=${ENV_VARS_PATH:-./_env.sh}
SERIALIZE_TEST_CASES=${SERIALIZE_TEST_CASES:-false}
RUN_TESTS_PARALLEL=${RUN_TESTS_PARALLEL:-false}

function run_test() {
function run_test()
{
local name="$1"
local bin_path=$(find . -name "$name")
local args=""
if [[ "$continue_on_error" -eq 1 ]]; then
args="--gtest_catch_exceptions=1"
else
args="--gtest_break_on_failure --gtest_catch_exceptions=0"
fi
if [[ "${OUTPUT_XML}" == "true" ]]; then
args="${args} --gtest_output=xml"
fi
${bin_path} ${args}
if [[ "$continue_on_error" -eq 1 ]]; then
${bin_path} --gtest_catch_exceptions=1
else
${bin_path} --gtest_break_on_failure --gtest_catch_exceptions=0
fi
}

source ${ENV_VARS_PATH}

continue_on_error="${1:-1}" # default 1
set -ex

function run_test_parallel() {
test_bins="$1"
local args=""
if [[ ${SERIALIZE_TEST_CASES} == "true" ]]; then args="${args} --serialize_test_cases"; fi
if [[ "${continue_on_error}" == "1" ]]; then
args="${args} --gtest_catch_exceptions=1"
else
args="--gtest_break_on_failure --gtest_catch_exceptions=0"
fi

python ${SRC_TESTS_PATH}/gtest_parallel.py ${test_bins} --workers=${NPROC} ${args}
}

set -e
cd "$build_dir"

cd "${build_dir}"
tests=(
"gtests_dbms"
"gtests_libcommon"
#"gtests_tmt" # included in gtests_dbms
)

# Set env variable to run some special test cases.
export ALSO_RUN_WITH_TEST_DATA=1

if [[ ${RUN_TESTS_PARALLEL} != "true" ]]; then
tests=(
"gtests_dbms"
"gtests_libcommon"
"gtests_libdaemon"
#"gtests_tmt" # included in gtests_dbms
)
for test in ${tests[@]}; do
run_test "$test"
done
else
run_test_parallel "${build_dir}/gtests_dbms ${build_dir}/gtests_libcommon ${build_dir}/gtests_libdaemon"
fi
for test in ${tests[@]}; do
run_test "$test"
done

0 comments on commit f3ec98d

Please sign in to comment.