Skip to content

Commit

Permalink
This is a suqashed commit from pingcap#5924
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
JaySon-Huang committed Sep 26, 2022
1 parent 70bbf76 commit b39d2a5
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 74 deletions.
61 changes: 38 additions & 23 deletions dbms/src/Storages/Transaction/RowCodec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,18 @@ struct RowEncoderV2
/// Cache encoded individual columns.
for (size_t i_col = 0, i_val = 0; i_col < table_info.columns.size(); i_col++)
{
if (i_val == fields.size())
break;

const auto & column_info = table_info.columns[i_col];
const auto & field = fields[i_val];
if ((table_info.pk_is_handle || table_info.is_common_handle) && column_info.hasPriKeyFlag())
{
// for common handle/pk is handle table,
// the field with primary key flag is usually encoded to key instead of value
continue;
}

if (column_info.id > std::numeric_limits<typename RowV2::Types<false>::ColumnIDType>::max())
is_big = true;
if (!field.isNull())
Expand All @@ -199,9 +207,6 @@ struct RowEncoderV2
null_column_ids.emplace(column_info.id);
}
i_val++;

if (i_val == fields.size())
break;
}
is_big = is_big || value_length > std::numeric_limits<RowV2::Types<false>::ValueOffsetType>::max();

Expand Down Expand Up @@ -300,7 +305,7 @@ bool appendRowV2ToBlock(
ColumnID pk_handle_id,
bool force_decode)
{
UInt8 row_flag = readLittleEndian<UInt8>(&raw_value[1]);
auto row_flag = readLittleEndian<UInt8>(&raw_value[1]);
bool is_big = row_flag & RowV2::BigRowMask;
return is_big ? appendRowV2ToBlockImpl<true>(raw_value, column_ids_iter, column_ids_iter_end, block, block_column_pos, column_infos, pk_handle_id, force_decode)
: appendRowV2ToBlockImpl<false>(raw_value, column_ids_iter, column_ids_iter_end, block, block_column_pos, column_infos, pk_handle_id, force_decode);
Expand Down Expand Up @@ -346,9 +351,10 @@ bool appendRowV2ToBlockImpl(
decodeUInts<ColumnID, typename RowV2::Types<is_big>::ColumnIDType>(cursor, raw_value, num_null_columns, null_column_ids);
decodeUInts<size_t, typename RowV2::Types<is_big>::ValueOffsetType>(cursor, raw_value, num_not_null_columns, value_offsets);
size_t values_start_pos = cursor;
size_t id_not_null = 0, id_null = 0;
size_t idx_not_null = 0;
size_t idx_null = 0;
// Merge ordered not null/null columns to keep order.
while (id_not_null < not_null_column_ids.size() || id_null < null_column_ids.size())
while (idx_not_null < not_null_column_ids.size() || idx_null < null_column_ids.size())
{
if (column_ids_iter == column_ids_iter_end)
{
Expand All @@ -357,24 +363,32 @@ bool appendRowV2ToBlockImpl(
}

bool is_null;
if (id_not_null < not_null_column_ids.size() && id_null < null_column_ids.size())
is_null = not_null_column_ids[id_not_null] > null_column_ids[id_null];
if (idx_not_null < not_null_column_ids.size() && idx_null < null_column_ids.size())
is_null = not_null_column_ids[idx_not_null] > null_column_ids[idx_null];
else
is_null = id_null < null_column_ids.size();
is_null = idx_null < null_column_ids.size();

auto next_datum_column_id = is_null ? null_column_ids[id_null] : not_null_column_ids[id_not_null];
if (column_ids_iter->first > next_datum_column_id)
auto next_datum_column_id = is_null ? null_column_ids[idx_null] : not_null_column_ids[idx_not_null];
const auto next_column_id = column_ids_iter->first;
if (next_column_id > next_datum_column_id)
{
// extra column
// The next column id to read is bigger than the column id of next datum in encoded row.
// It means this is the datum of extra column. May happen when reading after dropping
// a column.
if (!force_decode)
return false;
// Ignore the extra column and continue to parse other datum
if (is_null)
id_null++;
idx_null++;
else
id_not_null++;
idx_not_null++;
}
else if (column_ids_iter->first < next_datum_column_id)
else if (next_column_id < next_datum_column_id)
{
// The next column id to read is less than the column id of next datum in encoded row.
// It means this is the datum of missing column. May happen when reading after adding
// a column.
// Fill with default value and continue to read data for next column id.
const auto & column_info = column_infos[column_ids_iter->second];
if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, force_decode))
return false;
Expand All @@ -383,23 +397,24 @@ bool appendRowV2ToBlockImpl(
}
else
{
// if pk_handle_id is a valid column id, then it means the table's pk_is_handle is true
// If pk_handle_id is a valid column id, then it means the table's pk_is_handle is true
// we can just ignore the pk value encoded in value part
if (unlikely(column_ids_iter->first == pk_handle_id))
if (unlikely(next_column_id == pk_handle_id))
{
column_ids_iter++;
block_column_pos++;
if (is_null)
{
id_null++;
idx_null++;
}
else
{
id_not_null++;
idx_not_null++;
}
continue;
}

// Parse the datum.
auto * raw_column = const_cast<IColumn *>((block.getByPosition(block_column_pos)).column.get());
const auto & column_info = column_infos[column_ids_iter->second];
if (is_null)
Expand All @@ -418,15 +433,15 @@ bool appendRowV2ToBlockImpl(
}
// ColumnNullable::insertDefault just insert a null value
raw_column->insertDefault();
id_null++;
idx_null++;
}
else
{
size_t start = id_not_null ? value_offsets[id_not_null - 1] : 0;
size_t length = value_offsets[id_not_null] - start;
size_t start = idx_not_null ? value_offsets[idx_not_null - 1] : 0;
size_t length = value_offsets[idx_not_null] - start;
if (!raw_column->decodeTiDBRowV2Datum(values_start_pos + start, raw_value, length, force_decode))
return false;
id_not_null++;
idx_not_null++;
}
column_ids_iter++;
block_column_pos++;
Expand Down
45 changes: 28 additions & 17 deletions dbms/src/Storages/Transaction/tests/RowCodecTestUtils.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#pragma once
#include <Storages/Transaction/DecodingStorageSchemaSnapshot.h>
#include <Storages/Transaction/RowCodec.h>
#include <Storages/Transaction/TiDB.h>
#include <Storages/Transaction/TypeMapping.h>
#include <Storages/Transaction/Types.h>

namespace DB::tests
{
Expand Down Expand Up @@ -132,7 +134,7 @@ struct ColumnIDValue<T, true>
{
static constexpr bool value_is_null = true;
using ValueType = std::decay_t<T>;
ColumnIDValue(ColumnID id_)
explicit ColumnIDValue(ColumnID id_)
: id(id_)
{}
ColumnID id;
Expand Down Expand Up @@ -197,46 +199,55 @@ void getTableInfoFieldsInternal(OrderedColumnInfoFields & column_info_fields, Ty
}

template <typename... Types>
std::pair<TableInfo, std::vector<Field>> getTableInfoAndFields(ColumnIDs handle_ids, bool is_common_handle, Types &&... column_value_ids)
std::pair<TableInfo, std::vector<Field>> getTableInfoAndFields(ColumnIDs pk_col_ids, bool is_common_handle, Types &&... column_value_ids)
{
OrderedColumnInfoFields column_info_fields;
getTableInfoFieldsInternal(column_info_fields, std::forward<Types>(column_value_ids)...);
TableInfo table_info;
std::vector<Field> fields;
bool pk_is_handle = pk_col_ids.size() == 1 && pk_col_ids[0] != ::DB::TiDBPkColumnID;

for (auto & column_info_field : column_info_fields)
{
auto & column = std::get<0>(column_info_field.second);
auto & field = std::get<1>(column_info_field.second);
if (std::find(handle_ids.begin(), handle_ids.end(), column.id) != handle_ids.end())
if (std::find(pk_col_ids.begin(), pk_col_ids.end(), column.id) != pk_col_ids.end())
{
column.setPriKeyFlag();
if (column.tp != TiDB::TypeLong && column.tp != TiDB::TypeTiny && column.tp != TiDB::TypeLongLong && column.tp != TiDB::TypeShort && column.tp != TiDB::TypeInt24)
{
pk_is_handle = false;
}
}
table_info.columns.emplace_back(std::move(column));
fields.emplace_back(std::move(field));
}
if (!is_common_handle)
{
if (handle_ids[0] != EXTRA_HANDLE_COLUMN_ID)
table_info.pk_is_handle = true;
}
else

table_info.pk_is_handle = pk_is_handle;
table_info.is_common_handle = is_common_handle;
if (is_common_handle)
{
table_info.is_common_handle = true;
TiDB::IndexInfo index_info;
for (auto handle_id : handle_ids)
// TiFlash maintains the column name of primary key
// for common handle table
TiDB::IndexInfo pk_index_info;
pk_index_info.is_primary = true;
pk_index_info.idx_name = "PRIMARY";
pk_index_info.is_unique = true;
for (auto pk_col_id : pk_col_ids)
{
TiDB::IndexColumnInfo index_column_info;
for (auto & column : table_info.columns)
{
if (column.id == handle_id)
if (column.id == pk_col_id)
{
index_column_info.name = column.name;
break;
}
}
index_info.idx_cols.emplace_back(index_column_info);
pk_index_info.idx_cols.emplace_back(index_column_info);
}
table_info.index_infos.emplace_back(index_info);
table_info.index_infos.emplace_back(pk_index_info);
}

return std::make_pair(std::move(table_info), std::move(fields));
Expand All @@ -258,7 +269,7 @@ inline DecodingStorageSchemaSnapshotConstPtr getDecodingStorageSchemaSnapshot(co
store_columns.emplace_back(VERSION_COLUMN_ID, VERSION_COLUMN_NAME, VERSION_COLUMN_TYPE);
store_columns.emplace_back(TAG_COLUMN_ID, TAG_COLUMN_NAME, TAG_COLUMN_TYPE);
ColumnID handle_id = EXTRA_HANDLE_COLUMN_ID;
for (auto & column_info : table_info.columns)
for (const auto & column_info : table_info.columns)
{
if (table_info.pk_is_handle)
{
Expand Down Expand Up @@ -287,7 +298,7 @@ size_t valueStartPos(const TableInfo & table_info)

inline Block decodeRowToBlock(const String & row_value, DecodingStorageSchemaSnapshotConstPtr decoding_schema)
{
auto & sorted_column_id_with_pos = decoding_schema->sorted_column_id_with_pos;
const auto & sorted_column_id_with_pos = decoding_schema->sorted_column_id_with_pos;
auto iter = sorted_column_id_with_pos.begin();
const size_t value_column_num = 3;
// skip first three column which is EXTRA_HANDLE_COLUMN, VERSION_COLUMN, TAG_COLUMN
Expand Down Expand Up @@ -333,4 +344,4 @@ T getValueByRowV1(const T & v)
return static_cast<T>(std::move((*block.getByPosition(0).column)[0].template safeGet<NearestType>()));
}

} // namespace DB::tests
} // namespace DB::tests
Loading

0 comments on commit b39d2a5

Please sign in to comment.