Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle snapshot in parallel if there is only one big region in raftstore v2 #8108

Merged
merged 45 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
f8b9560
refactor
CalvinNeo Sep 15, 2023
5bbc921
init
CalvinNeo Sep 18, 2023
be8c471
Merge remote-tracking branch 'upstream/master' into try-parallel-sing…
CalvinNeo Sep 18, 2023
50b8f16
z
CalvinNeo Sep 19, 2023
35dd6c2
Merge remote-tracking branch 'upstream/master' into try-parallel-sing…
CalvinNeo Sep 19, 2023
2c3d146
run a test
CalvinNeo Sep 20, 2023
6c13d38
make it run
CalvinNeo Sep 21, 2023
ad4d010
fix
CalvinNeo Sep 21, 2023
2ad5c74
fix tests
CalvinNeo Oct 7, 2023
b844320
it can run
CalvinNeo Oct 7, 2023
7e3a26c
z
CalvinNeo Oct 7, 2023
52c9052
fix runtime check
CalvinNeo Oct 8, 2023
f68147a
add tests
CalvinNeo Oct 8, 2023
633518b
several fixes
CalvinNeo Oct 8, 2023
a0c3189
reorg
CalvinNeo Oct 8, 2023
a09a024
upd proxy
CalvinNeo Oct 8, 2023
ba952b6
Update dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp
CalvinNeo Oct 8, 2023
84940b8
Update dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp
CalvinNeo Oct 8, 2023
46a0d09
Update dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp
CalvinNeo Oct 8, 2023
47b3568
addr cmt
CalvinNeo Oct 8, 2023
62b61d2
f
CalvinNeo Oct 8, 2023
7ae903f
fix
CalvinNeo Oct 8, 2023
d1ffc49
fix
CalvinNeo Oct 8, 2023
e03fd69
clippy
CalvinNeo Oct 8, 2023
2eaf4ac
z
CalvinNeo Oct 8, 2023
b515e42
fix test
CalvinNeo Oct 8, 2023
83e31fa
z
CalvinNeo Oct 9, 2023
4e131ae
Merge branch 'master' into try-parallel-single-snapshot-2
CalvinNeo Oct 9, 2023
195bf05
shortcurt
CalvinNeo Oct 9, 2023
9ec7819
f
CalvinNeo Oct 9, 2023
f326015
move
CalvinNeo Oct 9, 2023
866002a
Update dbms/src/Storages/KVStore/MultiRaft/PrehandleSnapshot.cpp
CalvinNeo Oct 10, 2023
625dc10
addr cmt
CalvinNeo Oct 10, 2023
3d3f4c9
Merge branch 'try-parallel-single-snapshot-2' of github.com:CalvinNeo…
CalvinNeo Oct 10, 2023
78810ce
metrics
CalvinNeo Oct 10, 2023
dbf9c38
addr
CalvinNeo Oct 10, 2023
89dffc8
reorg Region.cpp
CalvinNeo Oct 10, 2023
88b27dd
reorg read index
CalvinNeo Oct 10, 2023
dd6cc6e
final check and remove todo
CalvinNeo Oct 10, 2023
5c2b19e
Merge branch 'master' into try-parallel-single-snapshot-2
CalvinNeo Oct 10, 2023
6502da5
update proxy
CalvinNeo Oct 10, 2023
6ccd95a
Merge branch 'try-parallel-single-snapshot-2' of github.com:CalvinNeo…
CalvinNeo Oct 10, 2023
2a65b93
fmt
CalvinNeo Oct 10, 2023
070888f
reorg kvstore
CalvinNeo Oct 10, 2023
5f86e22
f
CalvinNeo Oct 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ namespace DB
M(exception_after_large_write_exceed) \
M(proactive_flush_force_set_type) \
M(exception_when_fetch_disagg_pages) \
M(force_set_parallel_prehandle_threshold) \
M(force_raise_prehandle_exception) \
M(force_agg_on_partial_block)

#define APPLY_FOR_PAUSEABLE_FAILPOINTS_ONCE(M) \
Expand Down
26 changes: 21 additions & 5 deletions dbms/src/Debug/MockRaftStoreProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ void fn_gc_rust_ptr(RawVoidPtr ptr, RawRustPtrType type_)
case RawObjType::MockAsyncWaker:
delete reinterpret_cast<MockAsyncWaker *>(ptr);
break;
case RawObjType::MockString:
delete reinterpret_cast<std::string *>(ptr);
break;
case RawObjType::MockVecOfString:
auto * inner = reinterpret_cast<RustStrWithViewVecInner *>(ptr);
JinheLin marked this conversation as resolved.
Show resolved Hide resolved
delete inner;
JinheLin marked this conversation as resolved.
Show resolved Hide resolved
break;
}
}

Expand Down Expand Up @@ -167,12 +174,20 @@ void fn_notify_compact_log(
}
}

RaftstoreVer fn_get_cluster_raftstore_version(RaftStoreProxyPtr ptr, uint8_t, int64_t)
/// Mock FFI callback: reports the raftstore version stored on the mock proxy.
/// Reads `cluster_ver` from the object that `as_ref(ptr)` resolves to; the two
/// unnamed trailing parameters are ignored.
static RaftstoreVer fn_get_cluster_raftstore_version(RaftStoreProxyPtr ptr, uint8_t, int64_t)
{
    return as_ref(ptr).cluster_ver;
}

/// Mock FFI callback: returns a fixed JSON configuration document.
/// Both parameters are ignored. The string is heap-allocated here and handed
/// out through `inner`, tagged `RawObjType::MockString`; `fn_gc_rust_ptr`
/// deletes objects carrying that tag.
static RustStrWithView fn_get_config_json(RaftStoreProxyPtr, uint64_t)
{
    auto * config_json = new std::string("{\"raftstore\":{\"snap-handle-pool-size\":4}}");
    const RawRustPtr owner{.ptr = config_json, .type = static_cast<RawRustPtrType>(RawObjType::MockString)};
    return RustStrWithView{.buff = cppStringAsBuff(*config_json), .inner = owner};
}

TiFlashRaftProxyHelper MockRaftStoreProxy::SetRaftStoreProxyFFIHelper(RaftStoreProxyPtr proxy_ptr)
{
TiFlashRaftProxyHelper res{};
Expand All @@ -184,6 +199,7 @@ TiFlashRaftProxyHelper MockRaftStoreProxy::SetRaftStoreProxyFFIHelper(RaftStoreP
res.fn_get_region_local_state = fn_get_region_local_state;
res.fn_notify_compact_log = fn_notify_compact_log;
res.fn_get_cluster_raftstore_version = fn_get_cluster_raftstore_version;
res.fn_get_config_json = fn_get_config_json;
{
// make sure such function pointer will be set at most once.
static std::once_flag flag;
Expand Down Expand Up @@ -941,7 +957,7 @@ void MockRaftStoreProxy::Cf::insert_raw(std::string key, std::string val)
kvs.emplace_back(std::move(key), std::move(val));
}

RegionPtr MockRaftStoreProxy::snapshot(
std::tuple<RegionPtr, PrehandleResult> MockRaftStoreProxy::snapshot(
KVStore & kvs,
TMTContext & tmt,
UInt64 region_id,
Expand Down Expand Up @@ -978,19 +994,19 @@ RegionPtr MockRaftStoreProxy::snapshot(
SSTViewVec snaps{ssts.data(), ssts.size()};
auto prehandle_result = kvs.preHandleSnapshotToFiles(new_kv_region, snaps, index, term, deadline_index, tmt);

auto rg = RegionPtrWithSnapshotFiles{new_kv_region, std::move(prehandle_result.ingest_ids)};
auto rg = RegionPtrWithSnapshotFiles{new_kv_region, std::vector(prehandle_result.ingest_ids)};
if (cancel_after_prehandle)
{
kvs.releasePreHandledSnapshot(rg, tmt);
return kvs.getRegion(region_id);
return std::make_tuple(kvs.getRegion(region_id), prehandle_result);
}
kvs.checkAndApplyPreHandledSnapshot<RegionPtrWithSnapshotFiles>(rg, tmt);
region->updateAppliedIndex(index);
// PreHandledSnapshotWithFiles will do that, however preHandleSnapshotToFiles will not.
new_kv_region->setApplied(index, term);

// Region changes during applying snapshot, must re-get.
return kvs.getRegion(region_id);
return std::make_tuple(kvs.getRegion(region_id), prehandle_result);
}

TableID MockRaftStoreProxy::bootstrapTable(Context & ctx, KVStore & kvs, TMTContext & tmt, bool drop_at_first)
Expand Down
21 changes: 20 additions & 1 deletion dbms/src/Debug/MockRaftStoreProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ struct MockRaftStoreProxy : MutexLockWrap
bool freezed;
};

RegionPtr snapshot(
std::tuple<RegionPtr, PrehandleResult> snapshot(
KVStore & kvs,
TMTContext & tmt,
UInt64 region_id,
Expand Down Expand Up @@ -312,6 +312,8 @@ enum class RawObjType : uint32_t
None,
MockReadIndexTask,
MockAsyncWaker,
MockString,
MockVecOfString
};

struct GCMonitor
Expand Down Expand Up @@ -344,4 +346,21 @@ std::vector<std::pair<std::string, std::string>> regionRangeToEncodeKeys(Types &
return ranges_str;
}

/// Owner of the storage behind a mocked `RustStrWithViewVec`: the vector of
/// strings and the array of views that point into those strings.
/// Destroying the object releases both.
///
/// NOTE: the type is intentionally left an aggregate (no user-declared
/// constructors) because it is created with designated initializers. It must
/// not be copied: a copy would release the same two pointers twice.
struct RustStrWithViewVecInner
{
    std::vector<std::string> * vec;
    BaseBuffView * buffs;

    ~RustStrWithViewVecInner()
    {
        delete vec;
        // `buffs` is raw storage obtained from `operator new[]` in
        // createBaseBuffViewArray(), not from a `new T[n]` expression, so it has
        // to be returned through the matching `operator delete[]`. No element
        // destructors are run; presumably BaseBuffView is a plain view struct
        // that needs none -- confirm if that type ever gains a destructor.
        operator delete[](buffs);
    }
};

/// Allocates raw storage large enough for `len` BaseBuffView objects and
/// returns it typed as `BaseBuffView *`.
/// No element is constructed here: the memory comes straight from
/// `operator new[]`, so callers are expected to placement-new into each slot
/// before reading it.
inline BaseBuffView * createBaseBuffViewArray(size_t len)
{
    return static_cast<BaseBuffView *>(operator new[](len * sizeof(BaseBuffView)));
}
} // namespace DB
37 changes: 37 additions & 0 deletions dbms/src/Debug/MockSSTReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
#include "MockSSTReader.h"

#include <Common/Exception.h>
#include <Debug/MockRaftStoreProxy.h>

#include <ext/scope_guard.h>

namespace DB
{
Expand Down Expand Up @@ -69,6 +72,38 @@ void fn_seek(SSTReaderPtr ptr, ColumnFamilyType ct, EngineIteratorSeekType et, B
auto * reader = reinterpret_cast<MockSSTReader *>(ptr.inner);
reader->ffi_seek(ptr, ct, et, bf);
}
/// Mock FFI callback: forwards to MockSSTReader::ffi_approx_size() on the
/// reader stored in `ptr.inner`. The column family argument is ignored.
static uint64_t fn_approx_size(SSTReaderPtr ptr, ColumnFamilyType)
{
    return reinterpret_cast<MockSSTReader *>(ptr.inner)->ffi_approx_size();
}

/// Mock FFI callback: picks `splits_count - 1` keys that cut the reader's data
/// into `splits_count` parts of roughly equal entry count.
///
/// The i-th split key is the key of the entry at index
/// `(i + 1) * (length / splits_count)`, counted from the reader's begin iterator.
/// The returned views point into strings owned by a heap-allocated
/// RustStrWithViewVecInner, which is handed out through `inner` tagged
/// `RawObjType::MockVecOfString`.
///
/// Requires `splits_count > 1` and a non-empty reader (both enforced with
/// RUNTIME_CHECK).
static RustStrWithViewVec fn_get_split_keys(SSTReaderPtr ptr, uint64_t splits_count)
{
    auto * reader = reinterpret_cast<MockSSTReader *>(ptr.inner);
    RUNTIME_CHECK(splits_count > 1);
    const auto length = reader->length();
    // With no entries the begin iterator equals the end iterator and must not
    // be dereferenced below.
    RUNTIME_CHECK(length > 0);
    const auto size_per_split = length / splits_count;
    const auto split_key_count = splits_count - 1;

    auto * vec = new std::vector<std::string>();
    // Reserve the final size up front. The views created below keep pointers
    // into the stored strings; if the vector reallocated while growing, the
    // strings would be moved and pointers already recorded for earlier keys
    // could be left dangling.
    vec->reserve(split_key_count);
    BaseBuffView * buffs = createBaseBuffViewArray(split_key_count);

    auto it = reader->getBegin();
    for (size_t i = 0; i < split_key_count; ++i)
    {
        // Advance by one split; the total advance is
        // (splits_count - 1) * (length / splits_count) < length, so `it` stays
        // dereferenceable.
        for (size_t j = 0; j < size_per_split; ++j)
            ++it;
        vec->push_back(it->first);
        const auto & key = vec->back();
        // The array is raw storage, so each view is constructed in place.
        new (&buffs[i]) BaseBuffView{.data = key.data(), .len = key.size()};
    }

    return RustStrWithViewVec{
        .buffs = buffs,
        .len = split_key_count,
        .inner = RawRustPtr{
            .ptr = new RustStrWithViewVecInner{.vec = vec, .buffs = buffs},
            .type = static_cast<RawRustPtrType>(RawObjType::MockVecOfString)}};
}

SSTReaderInterfaces make_mock_sst_reader_interface()
{
Expand All @@ -81,6 +116,8 @@ SSTReaderInterfaces make_mock_sst_reader_interface()
.fn_gc = fn_gc,
.fn_kind = fn_kind,
.fn_seek = fn_seek,
.fn_approx_size = fn_approx_size,
.fn_get_split_keys = fn_get_split_keys,
};
}
} // namespace DB
24 changes: 22 additions & 2 deletions dbms/src/Debug/MockSSTReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@ struct MockSSTReader
using Key = std::pair<std::string, ColumnFamilyType>;
struct Data : std::vector<std::pair<std::string, std::string>>
{
DISALLOW_COPY(Data);
Data(Data &&) = default;
Data & operator=(Data &&) = default;
Data() = default;
static Data copyFrom(const Data & other) { return other; }

private:
Data(const Data &) = default;
};

explicit MockSSTReader(const Data & data_, SSTFormatKind kind_)
Expand All @@ -49,7 +52,14 @@ struct MockSSTReader
, end(data_.end())
, remained(iter != end)
, kind(kind_)
{}
, len(data_.size())
{
total_bytes = 0;
for (auto it = data_.begin(); it != data_.end(); it++)
{
total_bytes += it->second.size();
}
}

static SSTReaderPtr ffi_get_cf_file_reader(const Data & data_, SSTFormatKind kind_)
{
Expand Down Expand Up @@ -96,6 +106,14 @@ struct MockSSTReader
}
}

// Returns `total_bytes`, which the constructor accumulates as the sum of the
// value sizes (`second.size()`) of all entries; key sizes are not counted.
size_t ffi_approx_size() { return total_bytes; }

// Returns `len`, the number of key/value entries (`data_.size()` at construction).
size_t length() const { return len; }

// Returns the stored `begin` iterator of the underlying data.
Data::const_iterator getBegin() const { return begin; }

// Returns the stored `end` iterator (`data_.end()` at construction).
Data::const_iterator getEnd() const { return end; }

static std::map<Key, MockSSTReader::Data> & getMockSSTData() { return MockSSTData; }

private:
Expand All @@ -104,6 +122,8 @@ struct MockSSTReader
Data::const_iterator end;
bool remained;
SSTFormatKind kind;
size_t total_bytes;
size_t len;

// (region_id, cf) -> Data
static std::map<Key, MockSSTReader::Data> MockSSTData;
Expand Down
16 changes: 14 additions & 2 deletions dbms/src/Storages/DeltaMerge/BoundedSSTFilesToBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ namespace DM
BoundedSSTFilesToBlockInputStream::BoundedSSTFilesToBlockInputStream( //
SSTFilesToBlockInputStreamPtr child,
const ColId pk_column_id_,
const DecodingStorageSchemaSnapshotConstPtr & schema_snap)
const DecodingStorageSchemaSnapshotConstPtr & schema_snap,
size_t split_id)
: pk_column_id(pk_column_id_)
, _raw_child(std::move(child))
{
Expand All @@ -43,12 +44,17 @@ BoundedSSTFilesToBlockInputStream::BoundedSSTFilesToBlockInputStream( //
auto stream = std::make_shared<PKSquashingBlockInputStream</*need_extra_sort=*/true>>(
_raw_child,
pk_column_id,
is_common_handle);
is_common_handle,
split_id);
mvcc_compact_stream = std::make_unique<DMVersionFilterBlockInputStream<DM_VERSION_FILTER_MODE_COMPACT>>(
stream,
*(schema_snap->column_defines),
_raw_child->opts.gc_safepoint,
is_common_handle);
LOG_INFO(
&Poco::Logger::get("BoundedSSTFilesToBlockInputStream"),
"create bounded sst file stream, split_id={}",
split_id);
}

void BoundedSSTFilesToBlockInputStream::readPrefix()
Expand All @@ -71,6 +77,12 @@ SSTFilesToBlockInputStream::ProcessKeys BoundedSSTFilesToBlockInputStream::getPr
return _raw_child->process_keys;
}


// Returns the split id reported by the wrapped SST input stream (`_raw_child`);
// this class only forwards the call.
size_t BoundedSSTFilesToBlockInputStream::getSplitId() const
{
return _raw_child->getSplitId();
}

RegionPtr BoundedSSTFilesToBlockInputStream::getRegion() const
{
return _raw_child->region;
Expand Down
23 changes: 19 additions & 4 deletions dbms/src/Storages/DeltaMerge/PKSquashingBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Interpreters/sortBlock.h>
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/SSTFilesToBlockInputStream.h>

namespace DB
{
Expand All @@ -32,10 +33,15 @@ template <bool need_extra_sort>
class PKSquashingBlockInputStream final : public IBlockInputStream
{
public:
PKSquashingBlockInputStream(BlockInputStreamPtr child, ColId pk_column_id_, bool is_common_handle_)
PKSquashingBlockInputStream(
BlockInputStreamPtr child,
ColId pk_column_id_,
bool is_common_handle_,
size_t split_id_ = DM::SSTScanSoftLimit::HEAD_SPLIT)
: sorted_input_stream(child)
, pk_column_id(pk_column_id_)
, is_common_handle(is_common_handle_)
, split_id(split_id_)
{
assert(sorted_input_stream != nullptr);
cur_block = {};
Expand Down Expand Up @@ -86,7 +92,8 @@ class PKSquashingBlockInputStream final : public IBlockInputStream
}
#endif

const size_t cut_offset = findCutOffsetInNextBlock(cur_block, next_block, pk_column_id, is_common_handle);
const size_t cut_offset
= findCutOffsetInNextBlock(split_id, cur_block, next_block, pk_column_id, is_common_handle);
if (unlikely(cut_offset == 0))
// There is no pk overlap between `cur_block` and `next_block`, or `next_block` is empty, just return `cur_block`.
return finializeBlock(std::move(cur_block));
Expand Down Expand Up @@ -123,11 +130,13 @@ class PKSquashingBlockInputStream final : public IBlockInputStream

private:
static size_t findCutOffsetInNextBlock(
size_t split_id,
const Block & cur_block,
const Block & next_block,
const ColId pk_column_id,
bool is_common_handle)
{
UNUSED(split_id);
assert(cur_block);
if (!next_block)
return 0;
Expand All @@ -138,6 +147,7 @@ class PKSquashingBlockInputStream final : public IBlockInputStream
auto next_col = getByColumnId(next_block, pk_column_id).column;
RowKeyColumnContainer next_rowkey_column(next_col, is_common_handle);
size_t cut_offset = 0;

for (/* */; cut_offset < next_col->size(); ++cut_offset)
{
const auto next_pk = next_rowkey_column.getRowKeyValue(cut_offset);
Expand All @@ -147,8 +157,12 @@ class PKSquashingBlockInputStream final : public IBlockInputStream
{
if (unlikely(next_pk < last_curr_pk))
throw Exception(
"InputStream is not sorted, pk in next block is smaller than current block: "
+ next_pk.toDebugString() + " < " + last_curr_pk.toDebugString(),
fmt::format(
JinheLin marked this conversation as resolved.
Show resolved Hide resolved
"InputStream is not sorted, pk in next block {} is smaller than current block {}, "
"split_id={}",
next_pk.toDebugString(),
last_curr_pk.toDebugString(),
split_id),
ErrorCodes::LOGICAL_ERROR);
}
break;
Expand Down Expand Up @@ -180,6 +194,7 @@ class PKSquashingBlockInputStream final : public IBlockInputStream

bool first_read = true;
const bool is_common_handle;
size_t split_id;
};

} // namespace DM
Expand Down
Loading