Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KVStore: Refine parallel prehandle snapshot (part-2) #9193

Merged
merged 8 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/RuntimeFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ void RuntimeFilter::setTargetAttr(
target_attr = DM::FilterParser::createAttr(target_expr, scan_column_infos, table_column_defines);
}

DM::RSOperatorPtr RuntimeFilter::parseToRSOperator()
DM::RSOperatorPtr RuntimeFilter::parseToRSOperator() const
{
switch (rf_type)
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/RuntimeFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class RuntimeFilter
bool await(int64_t ms_remaining);

void setTargetAttr(const TiDB::ColumnInfos & scan_column_infos, const DM::ColumnDefines & table_column_defines);
DM::RSOperatorPtr parseToRSOperator();
DM::RSOperatorPtr parseToRSOperator() const;

const int id;

Expand Down
53 changes: 26 additions & 27 deletions dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ SSTFilesToBlockInputStream::SSTFilesToBlockInputStream( //
, prehandle_task(prehandle_task_)
, opts(std::move(opts_))
{
log = Logger::get(opts.log_prefix);
const size_t split_id
= soft_limit.has_value() ? soft_limit.value().split_id : DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT;
log = Logger::get(opts.log_prefix, fmt::format("region_id={} split_id={}", region->id(), split_id));

// We have to initialize sst readers at an earlier stage,
// due to prehandle snapshot of single region feature in raftstore v2.
Expand All @@ -62,9 +64,8 @@ SSTFilesToBlockInputStream::SSTFilesToBlockInputStream( //
auto make_inner_func = [&](const TiFlashRaftProxyHelper * proxy_helper,
SSTView snap,
SSTReader::RegionRangeFilter range,
size_t split_id,
size_t region_id) {
return std::make_unique<MonoSSTReader>(proxy_helper, snap, range, split_id, region_id);
const LoggerPtr & log_) {
return std::make_unique<MonoSSTReader>(proxy_helper, snap, range, log_);
};
for (UInt64 i = 0; i < snaps.len; ++i)
{
Expand Down Expand Up @@ -92,9 +93,7 @@ SSTFilesToBlockInputStream::SSTFilesToBlockInputStream( //
make_inner_func,
ssts_default,
log,
region->getRange(),
soft_limit.has_value() ? soft_limit.value().split_id : DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT,
region->id());
region->getRange());
}
if (!ssts_write.empty())
{
Expand All @@ -104,9 +103,7 @@ SSTFilesToBlockInputStream::SSTFilesToBlockInputStream( //
make_inner_func,
ssts_write,
log,
region->getRange(),
soft_limit.has_value() ? soft_limit.value().split_id : DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT,
region->id());
region->getRange());
}
if (!ssts_lock.empty())
{
Expand All @@ -116,9 +113,7 @@ SSTFilesToBlockInputStream::SSTFilesToBlockInputStream( //
make_inner_func,
ssts_lock,
log,
region->getRange(),
soft_limit.has_value() ? soft_limit.value().split_id : DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT,
region->id());
region->getRange());
}
LOG_INFO(
log,
Expand Down Expand Up @@ -149,8 +144,6 @@ void SSTFilesToBlockInputStream::checkFinishedState(SSTReaderPtr & reader, Colum
return;
if (!reader->remained())
return;
if (prehandle_task->isAbort())
return;

// now the stream must be stopped by `soft_limit`, let's check the keys in reader
RUNTIME_CHECK_MSG(soft_limit.has_value(), "soft_limit.has_value(), cf={}", magic_enum::enum_name(cf));
Expand All @@ -163,9 +156,13 @@ void SSTFilesToBlockInputStream::checkFinishedState(SSTReaderPtr & reader, Colum

void SSTFilesToBlockInputStream::readSuffix()
{
checkFinishedState(write_cf_reader, ColumnFamilyType::Write);
checkFinishedState(default_cf_reader, ColumnFamilyType::Default);
checkFinishedState(lock_cf_reader, ColumnFamilyType::Lock);
// For aborted task, we don't need to check the finish state
if (!prehandle_task->isAbort())
{
checkFinishedState(write_cf_reader, ColumnFamilyType::Write);
checkFinishedState(default_cf_reader, ColumnFamilyType::Default);
checkFinishedState(lock_cf_reader, ColumnFamilyType::Lock);
}

// reset all SSTReaders and return without writting blocks any more.
write_cf_reader.reset();
Expand All @@ -179,7 +176,7 @@ Block SSTFilesToBlockInputStream::read()

while (write_cf_reader && write_cf_reader->remained())
{
bool should_stop_advancing = maybeStopBySoftLimit(ColumnFamilyType::Write, write_cf_reader);
bool should_stop_advancing = maybeStopBySoftLimit(ColumnFamilyType::Write, write_cf_reader.get());
if (should_stop_advancing)
{
// Load the last batch
Expand Down Expand Up @@ -241,22 +238,19 @@ void SSTFilesToBlockInputStream::loadCFDataFromSST(
const DecodedTiKVKey * const rowkey_to_be_included)
{
SSTReader * reader;
SSTReaderPtr * reader_ptr;
size_t * p_process_keys;
size_t * p_process_keys_bytes;
DecodedTiKVKey * last_loaded_rowkey;
if (cf == ColumnFamilyType::Default)
{
reader = default_cf_reader.get();
reader_ptr = &default_cf_reader;
p_process_keys = &process_keys.default_cf;
p_process_keys_bytes = &process_keys.default_cf_bytes;
last_loaded_rowkey = &default_last_loaded_rowkey;
}
else if (cf == ColumnFamilyType::Lock)
{
reader = lock_cf_reader.get();
reader_ptr = &lock_cf_reader;
p_process_keys = &process_keys.lock_cf;
p_process_keys_bytes = &process_keys.lock_cf_bytes;
last_loaded_rowkey = &lock_last_loaded_rowkey;
Expand All @@ -266,7 +260,7 @@ void SSTFilesToBlockInputStream::loadCFDataFromSST(

if (reader && reader->remained())
{
maybeSkipBySoftLimit(cf, *reader_ptr);
maybeSkipBySoftLimit(cf, reader);
}

Stopwatch sw;
Expand All @@ -276,7 +270,7 @@ void SSTFilesToBlockInputStream::loadCFDataFromSST(
{
while (reader && reader->remained())
{
if (maybeStopBySoftLimit(cf, *reader_ptr))
if (maybeStopBySoftLimit(cf, reader))
{
break;
}
Expand Down Expand Up @@ -335,7 +329,7 @@ void SSTFilesToBlockInputStream::loadCFDataFromSST(
// Let's try to load keys until process_keys_offset_end
while (reader && reader->remained() && *p_process_keys < process_keys_offset_end)
{
if (maybeStopBySoftLimit(cf, *reader_ptr))
if (maybeStopBySoftLimit(cf, reader))
{
break;
}
Expand Down Expand Up @@ -420,7 +414,7 @@ std::vector<std::string> SSTFilesToBlockInputStream::findSplitKeys(size_t splits

// Returning false means no skip is performed, the reader is intact.
// Returning true means skip is performed, must read from current value.
bool SSTFilesToBlockInputStream::maybeSkipBySoftLimit(ColumnFamilyType cf, SSTReaderPtr & reader)
bool SSTFilesToBlockInputStream::maybeSkipBySoftLimit(ColumnFamilyType cf, SSTReader * reader)
{
if (!soft_limit.has_value())
return false;
Expand All @@ -429,6 +423,9 @@ bool SSTFilesToBlockInputStream::maybeSkipBySoftLimit(ColumnFamilyType cf, SSTRe
if (!start_limit)
return false;

if (!reader)
return false;

if (reader && reader->remained())
{
auto key = reader->keyView();
Expand Down Expand Up @@ -504,14 +501,16 @@ bool SSTFilesToBlockInputStream::maybeSkipBySoftLimit(ColumnFamilyType cf, SSTRe
return false;
}

bool SSTFilesToBlockInputStream::maybeStopBySoftLimit(ColumnFamilyType cf, SSTReaderPtr & reader)
bool SSTFilesToBlockInputStream::maybeStopBySoftLimit(ColumnFamilyType cf, SSTReader * reader)
{
if (!soft_limit.has_value())
return false;
const SSTScanSoftLimit & sl = soft_limit.value();
const auto & end_limit = soft_limit.value().getEndLimit();
if (!end_limit)
return false;

assert(reader != nullptr);
auto key = reader->keyView();
// TODO the copy could be eliminated, but with many modifications.
auto tikv_key = TiKVKey(key.data, key.len);
Expand Down
57 changes: 6 additions & 51 deletions dbms/src/Storages/DeltaMerge/Decode/SSTFilesToBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <DataStreams/IBlockInputStream.h>
#include <RaftStoreProxyFFI/ColumnFamily.h>
#include <Storages/DeltaMerge/DMVersionFilterBlockInputStream.h>
#include <Storages/DeltaMerge/Decode/SSTScanSoftLimit.h>
#include <Storages/KVStore/Decode/PartitionStreams.h>
#include <Storages/KVStore/MultiRaft/PreHandlingTrace.h>

Expand All @@ -42,52 +43,6 @@ using SSTFilesToBlockInputStreamPtr = std::shared_ptr<SSTFilesToBlockInputStream
class BoundedSSTFilesToBlockInputStream;
using BoundedSSTFilesToBlockInputStreamPtr = std::shared_ptr<BoundedSSTFilesToBlockInputStream>;

struct SSTScanSoftLimit
{
constexpr static size_t HEAD_OR_ONLY_SPLIT = SIZE_MAX;
size_t split_id;
TiKVKey raw_start;
TiKVKey raw_end;
DecodedTiKVKey decoded_start;
DecodedTiKVKey decoded_end;
std::optional<RawTiDBPK> start_limit;
std::optional<RawTiDBPK> end_limit;

SSTScanSoftLimit(size_t split_id_, TiKVKey && raw_start_, TiKVKey && raw_end_)
: split_id(split_id_)
, raw_start(std::move(raw_start_))
, raw_end(std::move(raw_end_))
{
if (!raw_start.empty())
{
decoded_start = RecordKVFormat::decodeTiKVKey(raw_start);
}
if (!raw_end.empty())
{
decoded_end = RecordKVFormat::decodeTiKVKey(raw_end);
}
if (!decoded_start.empty())
{
start_limit = RecordKVFormat::getRawTiDBPK(decoded_start);
}
if (!decoded_end.empty())
{
end_limit = RecordKVFormat::getRawTiDBPK(decoded_end);
}
}

SSTScanSoftLimit clone() const { return SSTScanSoftLimit(split_id, raw_start.toString(), raw_end.toString()); }

const std::optional<RawTiDBPK> & getStartLimit() const { return start_limit; }

const std::optional<RawTiDBPK> & getEndLimit() const { return end_limit; }

std::string toDebugString() const
{
return fmt::format("{}:{}", raw_start.toDebugString(), raw_end.toDebugString());
}
};

struct SSTFilesToBlockInputStreamOpts
{
std::string log_prefix;
Expand Down Expand Up @@ -139,7 +94,7 @@ class SSTFilesToBlockInputStream final : public IBlockInputStream
size_t lock_cf_bytes = 0;

inline size_t total() const { return default_cf + write_cf + lock_cf; }
inline size_t total_bytes() const { return default_cf_bytes + write_cf_bytes + lock_cf_bytes; }
inline size_t totalBytes() const { return default_cf_bytes + write_cf_bytes + lock_cf_bytes; }
};

const ProcessKeys & getProcessKeys() const { return process_keys; }
Expand All @@ -149,15 +104,15 @@ class SSTFilesToBlockInputStream final : public IBlockInputStream
}

using SSTReaderPtr = std::unique_ptr<SSTReader>;
bool maybeSkipBySoftLimit(ColumnFamilyType cf, SSTReaderPtr & reader);
bool maybeSkipBySoftLimit() { return maybeSkipBySoftLimit(ColumnFamilyType::Write, write_cf_reader); }
bool maybeSkipBySoftLimit(ColumnFamilyType cf, SSTReader * reader);
bool maybeSkipBySoftLimit() { return maybeSkipBySoftLimit(ColumnFamilyType::Write, write_cf_reader.get()); }

private:
void loadCFDataFromSST(ColumnFamilyType cf, const DecodedTiKVKey * rowkey_to_be_included);

// Emits data into block if the transaction to this key is committed.
Block readCommitedBlock();
bool maybeStopBySoftLimit(ColumnFamilyType cf, SSTReaderPtr & reader);
bool maybeStopBySoftLimit(ColumnFamilyType cf, SSTReader * reader);
void checkFinishedState(SSTReaderPtr & reader, ColumnFamilyType cf);

private:
Expand Down Expand Up @@ -219,7 +174,7 @@ class BoundedSSTFilesToBlockInputStream final

// Note that we only keep _raw_child for getting ingest info / process key, etc. All block should be
// read from `mvcc_compact_stream`
const SSTFilesToBlockInputStreamPtr _raw_child;
const SSTFilesToBlockInputStreamPtr _raw_child; // NOLINT(readability-identifier-naming)
std::unique_ptr<DMVersionFilterBlockInputStream<DM_VERSION_FILTER_MODE_COMPACT>> mvcc_compact_stream;
};

Expand Down
71 changes: 71 additions & 0 deletions dbms/src/Storages/DeltaMerge/Decode/SSTScanSoftLimit.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <RaftStoreProxyFFI/ProxyFFI.h>
#include <Storages/KVStore/Decode/DecodedTiKVKeyValue.h>
#include <Storages/KVStore/FFI/SSTReader.h>
#include <Storages/KVStore/MultiRaft/RegionState.h>
#include <Storages/KVStore/TiKVHelpers/TiKVKeyValue.h>
#include <Storages/KVStore/TiKVHelpers/TiKVRecordFormat.h>

namespace DB::DM
{

struct SSTScanSoftLimit
{
constexpr static size_t HEAD_OR_ONLY_SPLIT = SIZE_MAX;
size_t split_id;
TiKVKey raw_start;
TiKVKey raw_end;
DecodedTiKVKey decoded_start;
DecodedTiKVKey decoded_end;
std::optional<RawTiDBPK> start_limit;
std::optional<RawTiDBPK> end_limit;

SSTScanSoftLimit(size_t split_id_, TiKVKey && raw_start_, TiKVKey && raw_end_)
: split_id(split_id_)
, raw_start(std::move(raw_start_))
, raw_end(std::move(raw_end_))
{
if (!raw_start.empty())
{
decoded_start = RecordKVFormat::decodeTiKVKey(raw_start);
}
if (!raw_end.empty())
{
decoded_end = RecordKVFormat::decodeTiKVKey(raw_end);
}
if (!decoded_start.empty())
{
start_limit = RecordKVFormat::getRawTiDBPK(decoded_start);
}
if (!decoded_end.empty())
{
end_limit = RecordKVFormat::getRawTiDBPK(decoded_end);
}
}

SSTScanSoftLimit clone() const { return SSTScanSoftLimit(split_id, raw_start.toString(), raw_end.toString()); }

const std::optional<RawTiDBPK> & getStartLimit() const { return start_limit; }

const std::optional<RawTiDBPK> & getEndLimit() const { return end_limit; }

std::string toDebugString() const
{
return fmt::format("{}:{}", raw_start.toDebugString(), raw_end.toDebugString());
}
};

} // namespace DB::DM
6 changes: 4 additions & 2 deletions dbms/src/Storages/DeltaMerge/FilterParser/FilterParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,10 @@ RSOperatorPtr FilterParser::parseRFInExpr(
case tipb::IN:
{
if (!isColumnExpr(target_expr) || !target_attr)
return createUnsupported(
fmt::format("rf target expr is not column expr, expr.tp={}", tipb::ExprType_Name(target_expr.tp())));
return createUnsupported(fmt::format(
"rf target expr is not column expr or attr not found, expr.tp={} target_attr.has_value={}",
tipb::ExprType_Name(target_expr.tp()),
target_attr.has_value()));
const auto & attr = *target_attr;
if (target_expr.field_type().tp() == TiDB::TypeTimestamp && !timezone_info.is_utc_timezone)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ void UnorderedInputStream::prepareRuntimeFilter()
pushDownReadyRFList(ready_rf_list);
}

void UnorderedInputStream::pushDownReadyRFList(std::vector<RuntimeFilterPtr> readyRFList)
void UnorderedInputStream::pushDownReadyRFList(const std::vector<RuntimeFilterPtr> & ready_rf_list)
{
for (const RuntimeFilterPtr & rf : readyRFList)
for (const RuntimeFilterPtr & rf : ready_rf_list)
{
auto rs_operator = rf->parseToRSOperator();
task_pool->appendRSOperator(rs_operator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class UnorderedInputStream : public IProfilingBlockInputStream
private:
void prepareRuntimeFilter();

void pushDownReadyRFList(std::vector<RuntimeFilterPtr> readyRFList);
void pushDownReadyRFList(const std::vector<RuntimeFilterPtr> & ready_rf_list);

SegmentReadTaskPoolPtr task_pool;
Block header;
Expand Down
Loading