Skip to content

Commit

Permalink
KVStore: LargeTxn spill: Read without version col (#8746)
Browse files Browse the repository at this point in the history
ref #8736
  • Loading branch information
CalvinNeo authored Feb 19, 2024
1 parent 63d5348 commit 82a13c4
Show file tree
Hide file tree
Showing 28 changed files with 727 additions and 276 deletions.
4 changes: 2 additions & 2 deletions dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -609,8 +609,8 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio
throw;
}

DecodingStorageSchemaSnapshotConstPtr decoding_schema_snapshot;
std::tie(decoding_schema_snapshot, std::ignore) = storage->getSchemaSnapshotAndBlockForDecoding(lock, false);
DecodingStorageSchemaSnapshotConstPtr decoding_schema_snapshot
= storage->getSchemaSnapshotAndBlockForDecoding(lock, false, true).first;
res_block = createBlockSortByColumnID(decoding_schema_snapshot);
auto reader = RegionBlockReader(decoding_schema_snapshot);
return reader.read(res_block, *data_list_read, force_decode);
Expand Down
20 changes: 10 additions & 10 deletions dbms/src/Storages/DeltaMerge/tests/gtest_sst_files_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ TEST_F(SSTFilesToDTFilesOutputStreamTest, OutputNoDTFile)
try
{
auto table_lock = storage->lockStructureForShare("foo_query_id");
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false);
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false, true);

auto mock_stream = makeMockChild(prepareBlocks(100, 100, /*block_size=*/5));
auto prehandle_task = std::make_shared<PreHandlingTrace::Item>();
Expand Down Expand Up @@ -209,7 +209,7 @@ TEST_F(SSTFilesToDTFilesOutputStreamTest, OutputSingleDTFile)
try
{
auto table_lock = storage->lockStructureForShare("foo_query_id");
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false);
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false, true);

auto mock_stream = makeMockChild(prepareBlocks(50, 100, /*block_size=*/5));
auto prehandle_task = std::make_shared<PreHandlingTrace::Item>();
Expand Down Expand Up @@ -241,7 +241,7 @@ TEST_F(SSTFilesToDTFilesOutputStreamTest, OutputSingleDTFileWithOneBlock)
try
{
auto table_lock = storage->lockStructureForShare("foo_query_id");
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false);
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false, true);

auto mock_stream = makeMockChild(prepareBlocks(50, 100, /*block_size=*/1000));
auto prehandle_task = std::make_shared<PreHandlingTrace::Item>();
Expand Down Expand Up @@ -274,7 +274,7 @@ TEST_F(SSTFilesToDTFilesOutputStreamTest, OutputMultipleDTFile)
try
{
auto table_lock = storage->lockStructureForShare("foo_query_id");
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false);
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false, true);

auto mock_stream = makeMockChild(prepareBlocks(50, 100, /*block_size=*/1));
auto prehandle_task = std::make_shared<PreHandlingTrace::Item>();
Expand Down Expand Up @@ -313,7 +313,7 @@ TEST_F(SSTFilesToDTFilesOutputStreamTest, SplitAtBlockBoundary)
try
{
auto table_lock = storage->lockStructureForShare("foo_query_id");
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false);
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false, true);

auto mock_stream = makeMockChild(prepareBlocks(50, 100, /*block_size=*/20));
auto prehandle_task = std::make_shared<PreHandlingTrace::Item>();
Expand Down Expand Up @@ -348,7 +348,7 @@ TEST_F(SSTFilesToDTFilesOutputStreamTest, VeryLargeSplitThreshold)
try
{
auto table_lock = storage->lockStructureForShare("foo_query_id");
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false);
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false, true);

auto mock_stream = makeMockChild(prepareBlocks(50, 100, /*block_size=*/20));
auto prehandle_task = std::make_shared<PreHandlingTrace::Item>();
Expand Down Expand Up @@ -380,7 +380,7 @@ TEST_F(SSTFilesToDTFilesOutputStreamTest, NonContinuousBlock)
try
{
auto table_lock = storage->lockStructureForShare("foo_query_id");
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false);
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false, true);

auto blocks1 = prepareBlocks(50, 100, /*block_size=*/20);
auto blocks2 = prepareBlocks(130, 150, /*block_size=*/10);
Expand Down Expand Up @@ -422,7 +422,7 @@ TEST_F(SSTFilesToDTFilesOutputStreamTest, BrokenChild)
try
{
auto table_lock = storage->lockStructureForShare("foo_query_id");
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false);
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false, true);

auto blocks1 = prepareBlocks(50, 100, /*block_size=*/20);
auto blocks2 = prepareBlocks(0, 30, /*block_size=*/20);
Expand Down Expand Up @@ -458,7 +458,7 @@ TEST_F(SSTFilesToDTFilesOutputStreamTest, Cancel)
try
{
auto table_lock = storage->lockStructureForShare("foo_query_id");
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false);
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false, true);

auto mock_stream = makeMockChild(prepareBlocks(50, 100, /*block_size=*/1));
auto prehandle_task = std::make_shared<PreHandlingTrace::Item>();
Expand Down Expand Up @@ -507,7 +507,7 @@ TEST_F(SSTFilesToDTFilesOutputStreamTest, UpperLayerCancel)
try
{
auto table_lock = storage->lockStructureForShare("foo_query_id");
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false);
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false, true);

auto mock_stream = makeMockChild(prepareBlocks(50, 100, /*block_size=*/1));
auto prehandle_task = std::make_shared<PreHandlingTrace::Item>();
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/IManageableStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ class IManageableStorage : public IStorage
/// This method must be called under the protection of table structure lock
virtual std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> getSchemaSnapshotAndBlockForDecoding(
const TableStructureLockHolder & /* table_structure_lock */,
bool /* need_block */)
bool /* need_block */,
bool /* has_version_block */)
{
throw Exception(
"Method getDecodingSchemaSnapshot is not supported by storage " + getName(),
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/KVStore/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ add_headers_and_sources(kvstore .)
add_headers_and_sources(kvstore ./FFI)
add_headers_and_sources(kvstore ./MultiRaft)
add_headers_and_sources(kvstore ./MultiRaft/Disagg)
add_headers_and_sources(kvstore ./MultiRaft/Spill)
add_headers_and_sources(kvstore ./Utils)
add_headers_and_sources(kvstore ./TiKVHelpers)
add_headers_and_sources(kvstore ./Decode)
Expand Down
107 changes: 101 additions & 6 deletions dbms/src/Storages/KVStore/Decode/DecodingStorageSchemaSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,94 @@

namespace DB
{
DecodingStorageSchemaSnapshot::DecodingStorageSchemaSnapshot(
DM::ColumnDefinesPtr column_defines_,
const TiDB::TableInfo & table_info_,
const DM::ColumnDefine & original_handle_,
Int64 decoding_schema_epoch_,
bool with_version_column)
: column_defines{std::move(column_defines_)}
, pk_is_handle{table_info_.pk_is_handle}
, is_common_handle{table_info_.is_common_handle}
, decoding_schema_epoch{decoding_schema_epoch_}
{
std::unordered_map<ColumnID, size_t> column_lut(table_info_.columns.size());
// col id -> tidb pos, has no internal cols.
for (size_t i = 0; i < table_info_.columns.size(); i++)
{
const auto & ci = table_info_.columns[i];
column_lut.emplace(ci.id, i);
}
// column_defines has internal cols.
size_t index_in_block = 0;
for (size_t i = 0; i < column_defines->size(); i++)
{
auto & cd = (*column_defines)[i];
if (cd.id != VersionColumnID || with_version_column)
{
col_id_to_block_pos.insert({cd.id, index_in_block++});
}
col_id_to_def_pos.insert({cd.id, i});

if (cd.id != TiDBPkColumnID && cd.id != VersionColumnID && cd.id != DelMarkColumnID)
{
const auto & columns = table_info_.columns;
column_infos.push_back(columns[column_lut.at(cd.id)]);
}
else
{
column_infos.push_back(ColumnInfo());
}
}

// create pk related metadata if needed
if (is_common_handle)
{
const auto & primary_index_cols = table_info_.getPrimaryIndexInfo().idx_cols;
for (const auto & primary_index_col : primary_index_cols)
{
auto pk_column_id = table_info_.columns[primary_index_col.offset].id;
pk_column_ids.emplace_back(pk_column_id);
pk_pos_map.emplace(pk_column_id, reinterpret_cast<size_t>(std::numeric_limits<size_t>::max()));
}
pk_type = TMTPKType::STRING;
rowkey_column_size = pk_column_ids.size();
}
else if (table_info_.pk_is_handle)
{
pk_column_ids.emplace_back(original_handle_.id);
pk_pos_map.emplace(original_handle_.id, reinterpret_cast<size_t>(std::numeric_limits<size_t>::max()));
pk_type = getTMTPKType(*original_handle_.type);
rowkey_column_size = 1;
}
else
{
pk_type = TMTPKType::INT64;
rowkey_column_size = 1;
}

// calculate pk column pos in block
if (!pk_pos_map.empty())
{
auto pk_pos_iter = pk_pos_map.begin();
size_t column_pos_in_block = 0;
for (auto & column_id_with_pos : col_id_to_block_pos)
{
if (pk_pos_iter == pk_pos_map.end())
break;
if (pk_pos_iter->first == column_id_with_pos.first)
{
pk_pos_iter->second = column_pos_in_block;
pk_pos_iter++;
}
column_pos_in_block++;
}
if (unlikely(pk_pos_iter != pk_pos_map.end()))
throw Exception("Cannot find all pk columns in block", ErrorCodes::LOGICAL_ERROR);
}
}


TMTPKType getTMTPKType(const IDataType & rhs)
{
static const DataTypeInt64 & dataTypeInt64 = {}; // NOLINT
Expand All @@ -32,15 +120,22 @@ TMTPKType getTMTPKType(const IDataType & rhs)
return TMTPKType::UNSPECIFIED;
}

Block createBlockSortByColumnID(DecodingStorageSchemaSnapshotConstPtr schema_snapshot)
Block createBlockSortByColumnID(DecodingStorageSchemaSnapshotConstPtr schema_snapshot, bool with_version_column)
{
Block block;
for (auto iter = schema_snapshot->sorted_column_id_with_pos.begin();
iter != schema_snapshot->sorted_column_id_with_pos.end();
iter++)
// # Safety
// Though `col_id_to_block_pos` lacks some fields in `col_id_to_def_pos`,
// it is always a sub-sequence of `col_id_to_def_pos`.
for (const auto & [col_id, def_pos] : schema_snapshot->getColId2DefPosMap())
{
auto col_id = iter->first;
auto & cd = (*(schema_snapshot->column_defines))[iter->second];
// col_id == cd.id
// Including some internal columns:
// - (VersionColumnID, _INTERNAL_VERSION, u64)
// - (DelMarkColumnID, _INTERNAL_DELMARK, u8)
// - (TiDBPkColumnID, _tidb_rowid, i64)
auto & cd = (*(schema_snapshot->column_defines))[def_pos];
if (!with_version_column && cd.id == VersionColumnID)
continue;
block.insert({cd.type->createColumn(), cd.type, cd.name, col_id});
}
return block;
Expand Down
106 changes: 21 additions & 85 deletions dbms/src/Storages/KVStore/Decode/DecodingStorageSchemaSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,26 @@ using ColumnInfo = TiDB::ColumnInfo;
using ColumnInfos = std::vector<ColumnInfo>;
struct DecodingStorageSchemaSnapshot
{
DecodingStorageSchemaSnapshot(
DM::ColumnDefinesPtr column_defines_,
const TiDB::TableInfo & table_info_,
const DM::ColumnDefine & original_handle_,
Int64 decoding_schema_epoch_,
bool with_version_column);

DISALLOW_COPY(DecodingStorageSchemaSnapshot);

DecodingStorageSchemaSnapshot(DecodingStorageSchemaSnapshot &&) = default;

const SortedColumnIDWithPos & getColId2BlockPosMap() const { return col_id_to_block_pos; }
const SortedColumnIDWithPos & getColId2DefPosMap() const { return col_id_to_def_pos; }

// There is a one-to-one correspondence between elements in `column_defines` and elements in `column_infos`
// Note that some columns(EXTRA_HANDLE_COLUMN, VERSION_COLUMN, TAG_COLUMN) may not be a real column in tidb schema,
// so their corresponding elements in `column_infos` are just nullptr and won't be used when decoding.
DM::ColumnDefinesPtr column_defines;
ColumnInfos column_infos;

// column id -> column pos in column_defines/column_infos
SortedColumnIDWithPos sorted_column_id_with_pos;

// 1. when the table doesn't have a common handle,
// 1) if `pk_is_handle` is false, `pk_column_ids` is empty
// 2) if `pk_is_handle` is true, `pk_column_ids` contain a single element which is the column id of the pk column
Expand All @@ -71,92 +82,17 @@ struct DecodingStorageSchemaSnapshot
// an internal increasing version for `DecodingStorageSchemaSnapshot`, has no relation with the table schema version
Int64 decoding_schema_epoch;

DecodingStorageSchemaSnapshot(
DM::ColumnDefinesPtr column_defines_,
const TiDB::TableInfo & table_info_,
const DM::ColumnDefine & original_handle_,
Int64 decoding_schema_epoch_)
: column_defines{std::move(column_defines_)}
, pk_is_handle{table_info_.pk_is_handle}
, is_common_handle{table_info_.is_common_handle}
, decoding_schema_epoch{decoding_schema_epoch_}
{
std::unordered_map<ColumnID, size_t> column_lut;
for (size_t i = 0; i < table_info_.columns.size(); i++)
{
const auto & ci = table_info_.columns[i];
column_lut.emplace(ci.id, i);
}
for (size_t i = 0; i < column_defines->size(); i++)
{
auto & cd = (*column_defines)[i];
sorted_column_id_with_pos.insert({cd.id, i});
if (cd.id != TiDBPkColumnID && cd.id != VersionColumnID && cd.id != DelMarkColumnID)
{
const auto & columns = table_info_.columns;
column_infos.push_back(columns[column_lut.at(cd.id)]);
}
else
{
column_infos.push_back(ColumnInfo());
}
}

// create pk related metadata if needed
if (is_common_handle)
{
const auto & primary_index_cols = table_info_.getPrimaryIndexInfo().idx_cols;
for (const auto & primary_index_col : primary_index_cols)
{
auto pk_column_id = table_info_.columns[primary_index_col.offset].id;
pk_column_ids.emplace_back(pk_column_id);
pk_pos_map.emplace(pk_column_id, reinterpret_cast<size_t>(std::numeric_limits<size_t>::max()));
}
pk_type = TMTPKType::STRING;
rowkey_column_size = pk_column_ids.size();
}
else if (table_info_.pk_is_handle)
{
pk_column_ids.emplace_back(original_handle_.id);
pk_pos_map.emplace(original_handle_.id, reinterpret_cast<size_t>(std::numeric_limits<size_t>::max()));
pk_type = getTMTPKType(*original_handle_.type);
rowkey_column_size = 1;
}
else
{
pk_type = TMTPKType::INT64;
rowkey_column_size = 1;
}

// calculate pk column pos in block
if (!pk_pos_map.empty())
{
auto pk_pos_iter = pk_pos_map.begin();
size_t column_pos_in_block = 0;
for (auto & column_id_with_pos : sorted_column_id_with_pos)
{
if (pk_pos_iter == pk_pos_map.end())
break;
if (pk_pos_iter->first == column_id_with_pos.first)
{
pk_pos_iter->second = column_pos_in_block;
pk_pos_iter++;
}
column_pos_in_block++;
}
if (unlikely(pk_pos_iter != pk_pos_map.end()))
throw Exception("Cannot find all pk columns in block", ErrorCodes::LOGICAL_ERROR);
}
}

DISALLOW_COPY(DecodingStorageSchemaSnapshot);

DecodingStorageSchemaSnapshot(DecodingStorageSchemaSnapshot &&) = default;
private:
// `col_id_to_def_pos` is originally `sorted_column_id_with_pos`.
// We may omit some cols in block, e.g. version col. So `col_id_to_def_pos` may have more items than `col_id_to_block_pos`.
// Both of the maps are sorted in ColumnID order, which makes the internal cols in first.
SortedColumnIDWithPos col_id_to_block_pos;
SortedColumnIDWithPos col_id_to_def_pos;
};
using DecodingStorageSchemaSnapshotPtr = std::shared_ptr<DecodingStorageSchemaSnapshot>;
using DecodingStorageSchemaSnapshotConstPtr = std::shared_ptr<const DecodingStorageSchemaSnapshot>;

Block createBlockSortByColumnID(DecodingStorageSchemaSnapshotConstPtr schema_snapshot);
Block createBlockSortByColumnID(DecodingStorageSchemaSnapshotConstPtr schema_snapshot, bool with_version_column = true);

void clearBlockData(Block & block);

Expand Down
Loading

0 comments on commit 82a13c4

Please sign in to comment.