Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KVStore: LargeTxn spill: Read without version col #8746

Merged
merged 45 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
9575a2b
make RegionDataReadInfo easy to add fields
CalvinNeo Feb 1, 2024
c2c4344
rename writeBlockByRegion to writeCommittedByRegion
CalvinNeo Feb 1, 2024
4503747
template ReadList
CalvinNeo Feb 2, 2024
815b236
some basic spill structure
CalvinNeo Feb 2, 2024
690cf94
try decode without version
CalvinNeo Feb 2, 2024
b67cd19
still need to fix DecodingStorageSchemaSnapshot
CalvinNeo Feb 4, 2024
f56a23d
make it work in a pathetic way
CalvinNeo Feb 5, 2024
0c9e50b
fmt
CalvinNeo Feb 5, 2024
be71bd5
make it kind of readable
CalvinNeo Feb 5, 2024
82d7fe1
remote spill
CalvinNeo Feb 5, 2024
d8da170
tidy
CalvinNeo Feb 5, 2024
f595338
tidy2
CalvinNeo Feb 5, 2024
b6d54c5
introduce AtomicReadWriteCtx to disable decoding uncommitted data
CalvinNeo Feb 5, 2024
b78482d
remove useless codes
CalvinNeo Feb 5, 2024
1c06f70
fix decode
CalvinNeo Feb 5, 2024
99feff8
fix fap test
CalvinNeo Feb 5, 2024
f87956a
log
CalvinNeo Feb 6, 2024
2b4116c
remove spill impl
CalvinNeo Feb 6, 2024
f733fc4
fmt
CalvinNeo Feb 6, 2024
78f14fb
Merge branch 'master' into big-txn-1
CalvinNeo Feb 6, 2024
dcee203
Merge branch 'master' into big-txn-1
CalvinNeo Feb 6, 2024
db58a19
address
CalvinNeo Feb 6, 2024
215aec1
Merge branch 'big-txn-1' of ssh://github.com/CalvinNeo/tics into big-…
CalvinNeo Feb 6, 2024
5b5fc18
address
CalvinNeo Feb 6, 2024
ab48246
Update dbms/src/Storages/KVStore/Decode/RegionDataRead.h
CalvinNeo Feb 6, 2024
57a5b6f
Update dbms/src/Storages/KVStore/Decode/RegionDataRead.h
CalvinNeo Feb 6, 2024
24f1d4d
Update dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
CalvinNeo Feb 6, 2024
8c2625d
Update dbms/src/Storages/KVStore/MultiRaft/Spill/RegionUncommittedDat…
CalvinNeo Feb 6, 2024
b1a89f4
replace has_version_column with with_version_column
CalvinNeo Feb 6, 2024
2c781d1
Merge branch 'big-txn-1' of ssh://github.com/CalvinNeo/tics into big-…
CalvinNeo Feb 6, 2024
8a01e3d
Update dbms/src/Storages/KVStore/MultiRaft/Spill/RegionUncommittedDat…
CalvinNeo Feb 6, 2024
ffeab59
fix
CalvinNeo Feb 6, 2024
b18c4f4
Merge branch 'big-txn-1' of ssh://github.com/CalvinNeo/tics into big-…
CalvinNeo Feb 6, 2024
f9f04c4
Update dbms/src/Storages/KVStore/Decode/DecodingStorageSchemaSnapshot…
CalvinNeo Feb 6, 2024
902239a
address
CalvinNeo Feb 7, 2024
e520200
Merge branch 'big-txn-1' of ssh://github.com/CalvinNeo/tics into big-…
CalvinNeo Feb 7, 2024
9467ea3
Merge branch 'master' into big-txn-1
CalvinNeo Feb 7, 2024
1d3a01b
Merge branch 'master' into big-txn-1
CalvinNeo Feb 7, 2024
0f2b954
f
CalvinNeo Feb 19, 2024
589593a
change back
CalvinNeo Feb 19, 2024
07e6705
add spill
CalvinNeo Feb 19, 2024
ba40684
add spill 2
CalvinNeo Feb 19, 2024
24be10c
resolve some todb
CalvinNeo Feb 19, 2024
c0aff91
fix
CalvinNeo Feb 19, 2024
125c98b
fix
CalvinNeo Feb 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dbms/src/Debug/dbgFuncMockRaftSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -609,8 +609,8 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio
throw;
}

DecodingStorageSchemaSnapshotConstPtr decoding_schema_snapshot;
std::tie(decoding_schema_snapshot, std::ignore) = storage->getSchemaSnapshotAndBlockForDecoding(lock, false);
DecodingStorageSchemaSnapshotConstPtr decoding_schema_snapshot
= storage->getSchemaSnapshotAndBlockForDecoding(lock, false, true).first;
res_block = createBlockSortByColumnID(decoding_schema_snapshot);
auto reader = RegionBlockReader(decoding_schema_snapshot);
return reader.read(res_block, *data_list_read, force_decode);
Expand Down
20 changes: 10 additions & 10 deletions dbms/src/Storages/DeltaMerge/tests/gtest_sst_files_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ TEST_F(SSTFilesToDTFilesOutputStreamTest, OutputNoDTFile)
try
{
auto table_lock = storage->lockStructureForShare("foo_query_id");
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false);
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false, true);

auto mock_stream = makeMockChild(prepareBlocks(100, 100, /*block_size=*/5));
auto prehandle_task = std::make_shared<PreHandlingTrace::Item>();
Expand Down Expand Up @@ -209,7 +209,7 @@ TEST_F(SSTFilesToDTFilesOutputStreamTest, OutputSingleDTFile)
try
{
auto table_lock = storage->lockStructureForShare("foo_query_id");
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false);
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false, true);

auto mock_stream = makeMockChild(prepareBlocks(50, 100, /*block_size=*/5));
auto prehandle_task = std::make_shared<PreHandlingTrace::Item>();
Expand Down Expand Up @@ -241,7 +241,7 @@ TEST_F(SSTFilesToDTFilesOutputStreamTest, OutputSingleDTFileWithOneBlock)
try
{
auto table_lock = storage->lockStructureForShare("foo_query_id");
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false);
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false, true);

auto mock_stream = makeMockChild(prepareBlocks(50, 100, /*block_size=*/1000));
auto prehandle_task = std::make_shared<PreHandlingTrace::Item>();
Expand Down Expand Up @@ -274,7 +274,7 @@ TEST_F(SSTFilesToDTFilesOutputStreamTest, OutputMultipleDTFile)
try
{
auto table_lock = storage->lockStructureForShare("foo_query_id");
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false);
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false, true);

auto mock_stream = makeMockChild(prepareBlocks(50, 100, /*block_size=*/1));
auto prehandle_task = std::make_shared<PreHandlingTrace::Item>();
Expand Down Expand Up @@ -313,7 +313,7 @@ TEST_F(SSTFilesToDTFilesOutputStreamTest, SplitAtBlockBoundary)
try
{
auto table_lock = storage->lockStructureForShare("foo_query_id");
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false);
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false, true);

auto mock_stream = makeMockChild(prepareBlocks(50, 100, /*block_size=*/20));
auto prehandle_task = std::make_shared<PreHandlingTrace::Item>();
Expand Down Expand Up @@ -348,7 +348,7 @@ TEST_F(SSTFilesToDTFilesOutputStreamTest, VeryLargeSplitThreshold)
try
{
auto table_lock = storage->lockStructureForShare("foo_query_id");
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false);
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false, true);

auto mock_stream = makeMockChild(prepareBlocks(50, 100, /*block_size=*/20));
auto prehandle_task = std::make_shared<PreHandlingTrace::Item>();
Expand Down Expand Up @@ -380,7 +380,7 @@ TEST_F(SSTFilesToDTFilesOutputStreamTest, NonContinuousBlock)
try
{
auto table_lock = storage->lockStructureForShare("foo_query_id");
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false);
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false, true);

auto blocks1 = prepareBlocks(50, 100, /*block_size=*/20);
auto blocks2 = prepareBlocks(130, 150, /*block_size=*/10);
Expand Down Expand Up @@ -422,7 +422,7 @@ TEST_F(SSTFilesToDTFilesOutputStreamTest, BrokenChild)
try
{
auto table_lock = storage->lockStructureForShare("foo_query_id");
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false);
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false, true);

auto blocks1 = prepareBlocks(50, 100, /*block_size=*/20);
auto blocks2 = prepareBlocks(0, 30, /*block_size=*/20);
Expand Down Expand Up @@ -458,7 +458,7 @@ TEST_F(SSTFilesToDTFilesOutputStreamTest, Cancel)
try
{
auto table_lock = storage->lockStructureForShare("foo_query_id");
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false);
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false, true);

auto mock_stream = makeMockChild(prepareBlocks(50, 100, /*block_size=*/1));
auto prehandle_task = std::make_shared<PreHandlingTrace::Item>();
Expand Down Expand Up @@ -507,7 +507,7 @@ TEST_F(SSTFilesToDTFilesOutputStreamTest, UpperLayerCancel)
try
{
auto table_lock = storage->lockStructureForShare("foo_query_id");
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false);
auto [schema_snapshot, unused] = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false, true);

auto mock_stream = makeMockChild(prepareBlocks(50, 100, /*block_size=*/1));
auto prehandle_task = std::make_shared<PreHandlingTrace::Item>();
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/IManageableStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ class IManageableStorage : public IStorage
/// This method must be called under the protection of table structure lock
virtual std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> getSchemaSnapshotAndBlockForDecoding(
const TableStructureLockHolder & /* table_structure_lock */,
bool /* need_block */)
bool /* need_block */,
bool /* has_version_block */)
{
throw Exception(
"Method getDecodingSchemaSnapshot is not supported by storage " + getName(),
Expand Down
107 changes: 101 additions & 6 deletions dbms/src/Storages/KVStore/Decode/DecodingStorageSchemaSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,94 @@

namespace DB
{
DecodingStorageSchemaSnapshot::DecodingStorageSchemaSnapshot(
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
DM::ColumnDefinesPtr column_defines_,
const TiDB::TableInfo & table_info_,
const DM::ColumnDefine & original_handle_,
Int64 decoding_schema_epoch_,
bool with_version_column)
: column_defines{std::move(column_defines_)}
, pk_is_handle{table_info_.pk_is_handle}
, is_common_handle{table_info_.is_common_handle}
, decoding_schema_epoch{decoding_schema_epoch_}
{
std::unordered_map<ColumnID, size_t> column_lut(table_info_.columns.size());
// col id -> tidb pos, has no internal cols.
for (size_t i = 0; i < table_info_.columns.size(); i++)
{
const auto & ci = table_info_.columns[i];
column_lut.emplace(ci.id, i);
}
// column_defines has internal cols.
size_t index_in_block = 0;
for (size_t i = 0; i < column_defines->size(); i++)
{
auto & cd = (*column_defines)[i];
if (cd.id != VersionColumnID || with_version_column)
{
col_id_to_block_pos.insert({cd.id, index_in_block++});
}
col_id_to_def_pos.insert({cd.id, i});
Lloyd-Pottiger marked this conversation as resolved.
Show resolved Hide resolved

if (cd.id != TiDBPkColumnID && cd.id != VersionColumnID && cd.id != DelMarkColumnID)
{
const auto & columns = table_info_.columns;
column_infos.push_back(columns[column_lut.at(cd.id)]);
}
else
{
column_infos.push_back(ColumnInfo());
}
}

// create pk related metadata if needed
if (is_common_handle)
{
const auto & primary_index_cols = table_info_.getPrimaryIndexInfo().idx_cols;
for (const auto & primary_index_col : primary_index_cols)
{
auto pk_column_id = table_info_.columns[primary_index_col.offset].id;
pk_column_ids.emplace_back(pk_column_id);
pk_pos_map.emplace(pk_column_id, reinterpret_cast<size_t>(std::numeric_limits<size_t>::max()));
}
pk_type = TMTPKType::STRING;
rowkey_column_size = pk_column_ids.size();
}
else if (table_info_.pk_is_handle)
{
pk_column_ids.emplace_back(original_handle_.id);
pk_pos_map.emplace(original_handle_.id, reinterpret_cast<size_t>(std::numeric_limits<size_t>::max()));
pk_type = getTMTPKType(*original_handle_.type);
rowkey_column_size = 1;
}
else
{
pk_type = TMTPKType::INT64;
rowkey_column_size = 1;
}

// calculate pk column pos in block
if (!pk_pos_map.empty())
{
auto pk_pos_iter = pk_pos_map.begin();
size_t column_pos_in_block = 0;
for (auto & column_id_with_pos : col_id_to_block_pos)
{
if (pk_pos_iter == pk_pos_map.end())
break;
if (pk_pos_iter->first == column_id_with_pos.first)
{
pk_pos_iter->second = column_pos_in_block;
pk_pos_iter++;
}
column_pos_in_block++;
}
if (unlikely(pk_pos_iter != pk_pos_map.end()))
throw Exception("Cannot find all pk columns in block", ErrorCodes::LOGICAL_ERROR);
}
}


TMTPKType getTMTPKType(const IDataType & rhs)
{
static const DataTypeInt64 & dataTypeInt64 = {}; // NOLINT
Expand All @@ -32,15 +120,22 @@ TMTPKType getTMTPKType(const IDataType & rhs)
return TMTPKType::UNSPECIFIED;
}

Block createBlockSortByColumnID(DecodingStorageSchemaSnapshotConstPtr schema_snapshot)
Block createBlockSortByColumnID(DecodingStorageSchemaSnapshotConstPtr schema_snapshot, bool with_version_column)
{
Block block;
for (auto iter = schema_snapshot->sorted_column_id_with_pos.begin();
iter != schema_snapshot->sorted_column_id_with_pos.end();
iter++)
// # Safety
// Though `col_id_to_block_pos` lacks some fields in `col_id_to_def_pos`,
// it is always a sub-sequence of `col_id_to_def_pos`.
for (const auto & [col_id, def_pos] : schema_snapshot->getColId2DefPosMap())
{
auto col_id = iter->first;
auto & cd = (*(schema_snapshot->column_defines))[iter->second];
// col_id == cd.id
// Including some internal columns:
// - (VersionColumnID, _INTERNAL_VERSION, u64)
// - (DelMarkColumnID, _INTERNAL_DELMARK, u8)
// - (TiDBPkColumnID, _tidb_rowid, i64)
auto & cd = (*(schema_snapshot->column_defines))[def_pos];
if (!with_version_column && cd.id == VersionColumnID)
continue;
block.insert({cd.type->createColumn(), cd.type, cd.name, col_id});
}
return block;
Expand Down
106 changes: 21 additions & 85 deletions dbms/src/Storages/KVStore/Decode/DecodingStorageSchemaSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,26 @@ using ColumnInfo = TiDB::ColumnInfo;
using ColumnInfos = std::vector<ColumnInfo>;
struct DecodingStorageSchemaSnapshot
{
DecodingStorageSchemaSnapshot(
DM::ColumnDefinesPtr column_defines_,
const TiDB::TableInfo & table_info_,
const DM::ColumnDefine & original_handle_,
Int64 decoding_schema_epoch_,
bool with_version_column);

DISALLOW_COPY(DecodingStorageSchemaSnapshot);

DecodingStorageSchemaSnapshot(DecodingStorageSchemaSnapshot &&) = default;

const SortedColumnIDWithPos & getColId2BlockPosMap() const { return col_id_to_block_pos; }
const SortedColumnIDWithPos & getColId2DefPosMap() const { return col_id_to_def_pos; }

// There is a one-to-one correspondence between elements in `column_defines` and elements in `column_infos`
// Note that some columns(EXTRA_HANDLE_COLUMN, VERSION_COLUMN, TAG_COLUMN) may not be a real column in tidb schema,
// so their corresponding elements in `column_infos` are just nullptr and won't be used when decoding.
DM::ColumnDefinesPtr column_defines;
ColumnInfos column_infos;

// column id -> column pos in column_defines/column_infos
SortedColumnIDWithPos sorted_column_id_with_pos;

// 1. when the table doesn't have a common handle,
// 1) if `pk_is_handle` is false, `pk_column_ids` is empty
// 2) if `pk_is_handle` is true, `pk_column_ids` contain a single element which is the column id of the pk column
Expand All @@ -71,92 +82,17 @@ struct DecodingStorageSchemaSnapshot
// an internal increasing version for `DecodingStorageSchemaSnapshot`, has no relation with the table schema version
Int64 decoding_schema_epoch;

DecodingStorageSchemaSnapshot(
DM::ColumnDefinesPtr column_defines_,
const TiDB::TableInfo & table_info_,
const DM::ColumnDefine & original_handle_,
Int64 decoding_schema_epoch_)
: column_defines{std::move(column_defines_)}
, pk_is_handle{table_info_.pk_is_handle}
, is_common_handle{table_info_.is_common_handle}
, decoding_schema_epoch{decoding_schema_epoch_}
{
std::unordered_map<ColumnID, size_t> column_lut;
for (size_t i = 0; i < table_info_.columns.size(); i++)
{
const auto & ci = table_info_.columns[i];
column_lut.emplace(ci.id, i);
}
for (size_t i = 0; i < column_defines->size(); i++)
{
auto & cd = (*column_defines)[i];
sorted_column_id_with_pos.insert({cd.id, i});
if (cd.id != TiDBPkColumnID && cd.id != VersionColumnID && cd.id != DelMarkColumnID)
{
const auto & columns = table_info_.columns;
column_infos.push_back(columns[column_lut.at(cd.id)]);
}
else
{
column_infos.push_back(ColumnInfo());
}
}

// create pk related metadata if needed
if (is_common_handle)
{
const auto & primary_index_cols = table_info_.getPrimaryIndexInfo().idx_cols;
for (const auto & primary_index_col : primary_index_cols)
{
auto pk_column_id = table_info_.columns[primary_index_col.offset].id;
pk_column_ids.emplace_back(pk_column_id);
pk_pos_map.emplace(pk_column_id, reinterpret_cast<size_t>(std::numeric_limits<size_t>::max()));
}
pk_type = TMTPKType::STRING;
rowkey_column_size = pk_column_ids.size();
}
else if (table_info_.pk_is_handle)
{
pk_column_ids.emplace_back(original_handle_.id);
pk_pos_map.emplace(original_handle_.id, reinterpret_cast<size_t>(std::numeric_limits<size_t>::max()));
pk_type = getTMTPKType(*original_handle_.type);
rowkey_column_size = 1;
}
else
{
pk_type = TMTPKType::INT64;
rowkey_column_size = 1;
}

// calculate pk column pos in block
if (!pk_pos_map.empty())
{
auto pk_pos_iter = pk_pos_map.begin();
size_t column_pos_in_block = 0;
for (auto & column_id_with_pos : sorted_column_id_with_pos)
{
if (pk_pos_iter == pk_pos_map.end())
break;
if (pk_pos_iter->first == column_id_with_pos.first)
{
pk_pos_iter->second = column_pos_in_block;
pk_pos_iter++;
}
column_pos_in_block++;
}
if (unlikely(pk_pos_iter != pk_pos_map.end()))
throw Exception("Cannot find all pk columns in block", ErrorCodes::LOGICAL_ERROR);
}
}

DISALLOW_COPY(DecodingStorageSchemaSnapshot);

DecodingStorageSchemaSnapshot(DecodingStorageSchemaSnapshot &&) = default;
private:
// `col_id_to_def_pos` is originally `sorted_column_id_with_pos`.
// We may omit some cols in block, e.g. version col. So `col_id_to_def_pos` may have more items than `col_id_to_block_pos`.
// Both of the maps are sorted in ColumnID order, which makes the internal cols in first.
SortedColumnIDWithPos col_id_to_block_pos;
SortedColumnIDWithPos col_id_to_def_pos;
};
using DecodingStorageSchemaSnapshotPtr = std::shared_ptr<DecodingStorageSchemaSnapshot>;
using DecodingStorageSchemaSnapshotConstPtr = std::shared_ptr<const DecodingStorageSchemaSnapshot>;

Block createBlockSortByColumnID(DecodingStorageSchemaSnapshotConstPtr schema_snapshot);
Block createBlockSortByColumnID(DecodingStorageSchemaSnapshotConstPtr schema_snapshot, bool with_version_column = true);

void clearBlockData(Block & block);

Expand Down
Loading