Skip to content

Commit

Permalink
Fix check fail when cancel parallel prehandling (#8209)
Browse files Browse the repository at this point in the history
close #8201
  • Loading branch information
CalvinNeo authored Oct 18, 2023
1 parent 81059d6 commit d7badaa
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 14 deletions.
22 changes: 16 additions & 6 deletions dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@ SSTFilesToBlockInputStream::SSTFilesToBlockInputStream( //
const TiFlashRaftProxyHelper * proxy_helper_,
TMTContext & tmt_,
std::optional<SSTScanSoftLimit> && soft_limit_,
std::shared_ptr<PreHandlingTrace::Item> prehandle_task_,
SSTFilesToBlockInputStreamOpts && opts_)
: region(std::move(region_))
, snapshot_index(snapshot_index_)
, snaps(snaps_)
, proxy_helper(proxy_helper_)
, tmt(tmt_)
, soft_limit(std::move(soft_limit_))
, prehandle_task(prehandle_task_)
, opts(std::move(opts_))
{
log = Logger::get(opts.log_prefix);
Expand Down Expand Up @@ -139,23 +141,30 @@ SSTFilesToBlockInputStream::~SSTFilesToBlockInputStream() = default;

void SSTFilesToBlockInputStream::readPrefix() {}

void SSTFilesToBlockInputStream::checkFinishedState(SSTReaderPtr & reader)
void SSTFilesToBlockInputStream::checkFinishedState(SSTReaderPtr & reader, ColumnFamilyType cf)
{
// There must be no data left when we write suffix
if (!reader)
return;
if (!reader->remained())
return;
RUNTIME_CHECK_MSG(soft_limit.has_value(), "soft_limit.has_value()");
if (prehandle_task->abort_flag.load())
return;

// now the stream must be stopped by `soft_limit`, let's check the keys in reader
RUNTIME_CHECK_MSG(soft_limit.has_value(), "soft_limit.has_value(), cf={}", magic_enum::enum_name(cf));
BaseBuffView cur = reader->keyView();
RUNTIME_CHECK_MSG(buffToStrView(cur) > soft_limit.value().raw_end, "cur > raw_end");
RUNTIME_CHECK_MSG(
buffToStrView(cur) > soft_limit.value().raw_end,
"cur > raw_end, cf={}",
magic_enum::enum_name(cf));
}

void SSTFilesToBlockInputStream::readSuffix()
{
checkFinishedState(write_cf_reader);
checkFinishedState(default_cf_reader);
checkFinishedState(lock_cf_reader);
checkFinishedState(write_cf_reader, ColumnFamilyType::Write);
checkFinishedState(default_cf_reader, ColumnFamilyType::Default);
checkFinishedState(lock_cf_reader, ColumnFamilyType::Lock);

// reset all SSTReaders and return without writting blocks any more.
write_cf_reader.reset();
Expand Down Expand Up @@ -209,6 +218,7 @@ Block SSTFilesToBlockInputStream::read()
loadCFDataFromSST(ColumnFamilyType::Lock, &rowkey);

auto block = readCommitedBlock();

if (block.rows() != 0)
return block;
// else continue to decode key-value from write CF.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <RaftStoreProxyFFI/ColumnFamily.h>
#include <Storages/DeltaMerge/DMVersionFilterBlockInputStream.h>
#include <Storages/KVStore/Decode/PartitionStreams.h>
#include <Storages/KVStore/MultiRaft/PreHandlingTrace.h>

#include <memory>
#include <string_view>
Expand Down Expand Up @@ -116,6 +117,7 @@ class SSTFilesToBlockInputStream final : public IBlockInputStream
const TiFlashRaftProxyHelper * proxy_helper_,
TMTContext & tmt_,
std::optional<SSTScanSoftLimit> && soft_limit_,
std::shared_ptr<PreHandlingTrace::Item> prehandle_task_,
SSTFilesToBlockInputStreamOpts && opts_);
~SSTFilesToBlockInputStream() override;

Expand Down Expand Up @@ -163,7 +165,7 @@ class SSTFilesToBlockInputStream final : public IBlockInputStream
// Emits data into block if the transaction to this key is committed.
Block readCommitedBlock();
bool maybeStopBySoftLimit(ColumnFamilyType cf, SSTReaderPtr & reader);
void checkFinishedState(SSTReaderPtr & reader);
void checkFinishedState(SSTReaderPtr & reader, ColumnFamilyType cf);

private:
RegionPtr region;
Expand All @@ -172,6 +174,7 @@ class SSTFilesToBlockInputStream final : public IBlockInputStream
const TiFlashRaftProxyHelper * proxy_helper{nullptr};
TMTContext & tmt;
std::optional<SSTScanSoftLimit> soft_limit;
std::shared_ptr<PreHandlingTrace::Item> prehandle_task;
const SSTFilesToBlockInputStreamOpts opts;
LoggerPtr log;

Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,6 @@ Block GenRegionBlockDataWithSchema(
// No committed data, just return
if (!data_list_read)
return res_block;

{
Stopwatch watch;
{
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ static void runInParallel(
proxy_helper,
tmt,
std::move(part_limit),
prehandle_task,
DM::SSTFilesToBlockInputStreamOpts(opt));
try
{
Expand Down Expand Up @@ -608,6 +609,7 @@ PrehandleResult KVStore::preHandleSSTsToDTFiles(
proxy_helper,
tmt,
std::nullopt,
prehandle_task,
DM::SSTFilesToBlockInputStreamOpts(opt));

// `split_keys` do not begin with 'z'.
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ RegionDataRes RegionCFDataBase<Trait>::insert(TiKVKey && key, TiKVValue && value

if (!kv_pair)
return 0;

return insert(std::move(*kv_pair), mode);
}

Expand Down
1 change: 0 additions & 1 deletion dbms/src/Storages/KVStore/TiKVHelpers/TiKVRecordFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,6 @@ inline DecodedWriteCFValue decodeWriteCfValue(const TiKVValue & value)
auto write_type = RecordKVFormat::readUInt8(data, len); //write type

bool can_ignore = write_type != CFModifyFlag::DelFlag && write_type != CFModifyFlag::PutFlag;

if (can_ignore)
return std::nullopt;

Expand Down
77 changes: 72 additions & 5 deletions dbms/src/Storages/KVStore/tests/gtest_raftstore_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,73 @@ extern const char force_set_sst_to_dtfile_block_size[];
namespace tests
{

// Test if active cancel from proxy.
TEST_F(RegionKVStoreTest, KVStoreSingleSnapCancel)
try
{
auto ctx = TiFlashTestEnv::getGlobalContext();
proxy_instance->cluster_ver = RaftstoreVer::V2;
ASSERT_NE(proxy_helper->sst_reader_interfaces.fn_key, nullptr);
ASSERT_NE(proxy_helper->fn_get_config_json, nullptr);
UInt64 region_id = 1;
TableID table_id;
FailPointHelper::enableFailPoint(FailPoints::force_set_parallel_prehandle_threshold, static_cast<size_t>(0));
FailPointHelper::enableFailPoint(FailPoints::force_set_sst_to_dtfile_block_size, static_cast<size_t>(1));
SCOPE_EXIT({ FailPointHelper::disableFailPoint("force_set_sst_to_dtfile_block_size"); });
SCOPE_EXIT({ FailPointHelper::disableFailPoint("force_set_parallel_prehandle_threshold"); });
{
region_id = 2;
initStorages();
KVStore & kvs = getKVS();
HandleID table_limit = 40;
HandleID sst_limit = 40;
table_id = proxy_instance->bootstrapTable(ctx, kvs, ctx.getTMTContext());
auto start = RecordKVFormat::genKey(table_id, 0);
auto end = RecordKVFormat::genKey(table_id, table_limit);
proxy_instance->bootstrapWithRegion(
kvs,
ctx.getTMTContext(),
region_id,
std::make_pair(start.toString(), end.toString()));
auto r1 = proxy_instance->getRegion(region_id);

auto [value_write, value_default] = proxy_instance->generateTiKVKeyValue(111, 999);
auto kkk = RecordKVFormat::decodeWriteCfValue(TiKVValue::copyFrom(value_write));
{
MockSSTReader::getMockSSTData().clear();
MockRaftStoreProxy::Cf default_cf{region_id, table_id, ColumnFamilyType::Default};
for (HandleID h = 1; h < sst_limit; h++)
{
auto k = RecordKVFormat::genKey(table_id, h, 111);
default_cf.insert_raw(k, value_default);
}
default_cf.finish_file(SSTFormatKind::KIND_TABLET);
default_cf.freeze();
MockRaftStoreProxy::Cf write_cf{region_id, table_id, ColumnFamilyType::Write};
for (HandleID h = 1; h < sst_limit; h++)
{
auto k = RecordKVFormat::genKey(table_id, h, 111);
write_cf.insert_raw(k, value_write);
}
write_cf.finish_file(SSTFormatKind::KIND_TABLET);
write_cf.freeze();

auto sp = SyncPointCtl::enableInScope("before_SSTFilesToDTFilesOutputStream::handle_one");
std::thread t([&]() {
auto [kvr1, res]
= proxy_instance
->snapshot(kvs, ctx.getTMTContext(), region_id, {default_cf, write_cf}, 0, 0, std::nullopt);
});
sp.waitAndPause();
kvs.abortPreHandleSnapshot(region_id, ctx.getTMTContext());
sp.next();
sp.disable();
t.join();
}
}
}
CATCH

// Test several uncommitted keys with only one version.
TEST_F(RegionKVStoreTest, KVStoreSingleSnap1)
try
Expand Down Expand Up @@ -74,7 +141,7 @@ try
auto k = RecordKVFormat::genKey(table_id, h, 111);
if (h == uncommitted)
continue;
write_cf.insert_raw(k, value_default);
write_cf.insert_raw(k, value_write);
}
write_cf.finish_file(SSTFormatKind::KIND_TABLET);
write_cf.freeze();
Expand Down Expand Up @@ -154,7 +221,7 @@ try
{
auto [value_write, value_default] = proxy_instance->generateTiKVKeyValue(tso, 999);
auto k = RecordKVFormat::genKey(table_id, 10, tso);
write_cf.insert_raw(k, value_default);
write_cf.insert_raw(k, value_write);
}
write_cf.finish_file(SSTFormatKind::KIND_TABLET);
write_cf.freeze();
Expand Down Expand Up @@ -216,7 +283,7 @@ try
for (HandleID h = 1; h < sst_limit; h++)
{
auto k = RecordKVFormat::genKey(table_id, h, 111);
write_cf.insert_raw(k, value_default);
write_cf.insert_raw(k, value_write);
}
write_cf.finish_file(SSTFormatKind::KIND_TABLET);
write_cf.freeze();
Expand Down Expand Up @@ -275,7 +342,7 @@ try
for (HandleID h = 1; h < sst_limit; h++)
{
auto k = RecordKVFormat::genKey(table_id, h, 111);
write_cf.insert_raw(k, value_default);
write_cf.insert_raw(k, value_write);
}
write_cf.finish_file(SSTFormatKind::KIND_TABLET);
write_cf.freeze();
Expand Down Expand Up @@ -348,7 +415,7 @@ try
for (HandleID h = table_limit_start + 10; h < table_limit_end - 10; h++)
{
auto k = RecordKVFormat::genKey(table_id, h, 111);
write_cf.insert_raw(k, value_default);
write_cf.insert_raw(k, value_write);
}
write_cf.finish_file(SSTFormatKind::KIND_TABLET);
write_cf.freeze();
Expand Down

0 comments on commit d7badaa

Please sign in to comment.