From d5c138d4a052d04a7d2f005d39f39a5c4b0ebdbf Mon Sep 17 00:00:00 2001 From: JaySon Date: Fri, 16 Sep 2022 21:42:59 +0800 Subject: [PATCH] This is an automated cherry-pick of #5879 Signed-off-by: ti-chi-bot --- dbms/src/Columns/ColumnAggregateFunction.cpp | 2 +- dbms/src/Columns/ColumnArray.cpp | 9 +- dbms/src/Columns/ColumnDecimal.cpp | 9 +- dbms/src/Columns/ColumnFixedString.cpp | 13 +- dbms/src/Columns/ColumnString.cpp | 9 +- dbms/src/Columns/ColumnVector.cpp | 2 +- .../DeltaMerge/SSTFilesToBlockInputStream.h | 2 +- .../Storages/Transaction/ApplySnapshot.cpp | 45 +- .../Storages/Transaction/PartitionStreams.cpp | 40 +- .../Storages/Transaction/PartitionStreams.h | 4 + .../Transaction/RegionBlockReader.cpp | 39 +- .../Storages/Transaction/RegionBlockReader.h | 23 +- dbms/src/Storages/Transaction/RowCodec.cpp | 91 +- dbms/src/Storages/Transaction/RowCodec.h | 36 +- .../Transaction/tests/RowCodecTestUtils.h | 8 +- .../Transaction/tests/gtest_kvstore.cpp | 1500 +++++++++++++++++ .../tests/gtest_region_block_reader.cpp | 286 +++- tests/fullstack-test2/ddl/alter_pk.test | 53 + 18 files changed, 2033 insertions(+), 138 deletions(-) create mode 100644 dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp diff --git a/dbms/src/Columns/ColumnAggregateFunction.cpp b/dbms/src/Columns/ColumnAggregateFunction.cpp index 5c60a05f630..bc8ce3ed527 100644 --- a/dbms/src/Columns/ColumnAggregateFunction.cpp +++ b/dbms/src/Columns/ColumnAggregateFunction.cpp @@ -87,7 +87,7 @@ void ColumnAggregateFunction::insertRangeFrom(const IColumn & from, size_t start if (start + length > from_concrete.getData().size()) throw Exception( fmt::format( - "Parameters start = {}, length = {} are out of bound in ColumnAggregateFunction::insertRangeFrom method (data.size() = {}).", + "Parameters are out of bound in ColumnAggregateFunction::insertRangeFrom method, start={}, length={}, from.size()={}", start, length, from_concrete.getData().size()), diff --git a/dbms/src/Columns/ColumnArray.cpp b/dbms/src/Columns/ColumnArray.cpp index 9b4fd0e09e0..de1a67d0045 100644 --- a/dbms/src/Columns/ColumnArray.cpp +++ b/dbms/src/Columns/ColumnArray.cpp @@ -414,8 +414,13 @@ void ColumnArray::insertRangeFrom(const IColumn & src, size_t start, size_t leng const ColumnArray & src_concrete = static_cast(src); if (start + length > src_concrete.getOffsets().size()) - throw Exception("Parameter out of bound in ColumnArray::insertRangeFrom method.", - ErrorCodes::PARAMETER_OUT_OF_BOUND); + throw Exception( + fmt::format( + "Parameters are out of bound in ColumnArray::insertRangeFrom method, start={}, length={}, src.size()={}", + start, + length, + src_concrete.getOffsets().size()), + ErrorCodes::PARAMETER_OUT_OF_BOUND); size_t nested_offset = src_concrete.offsetAt(start); size_t nested_length = src_concrete.getOffsets()[start + length - 1] - nested_offset; diff --git a/dbms/src/Columns/ColumnDecimal.cpp b/dbms/src/Columns/ColumnDecimal.cpp index d32d115a068..6ed842d95fd 100644 --- a/dbms/src/Columns/ColumnDecimal.cpp +++ b/dbms/src/Columns/ColumnDecimal.cpp @@ -258,8 +258,13 @@ void ColumnDecimal::insertRangeFrom(const IColumn & src, size_t start, size_t const ColumnDecimal & src_vec = static_cast(src); if (start + length > src_vec.data.size()) - throw Exception("Parameters start = " + toString(start) + ", length = " + toString(length) + " are out of bound in ColumnDecimal::insertRangeFrom method (data.size() = " + toString(src_vec.data.size()) + ").", - ErrorCodes::PARAMETER_OUT_OF_BOUND); + throw Exception( + fmt::format( + "Parameters are out of bound in ColumnDecimal::insertRangeFrom method, start={}, length={}, src.size()={}", + start, + length, + src_vec.data.size()), + ErrorCodes::PARAMETER_OUT_OF_BOUND); size_t old_size = data.size(); data.resize(old_size + length); diff --git a/dbms/src/Columns/ColumnFixedString.cpp b/dbms/src/Columns/ColumnFixedString.cpp index 29c9c67d70c..7b9e50403a9 100644 --- a/dbms/src/Columns/ColumnFixedString.cpp +++ b/dbms/src/Columns/ColumnFixedString.cpp @@ -169,12 +169,13 @@ void ColumnFixedString::insertRangeFrom(const IColumn & src, size_t start, size_ const ColumnFixedString & src_concrete = static_cast(src); if (start + length > src_concrete.size()) - throw Exception("Parameters start = " - + toString(start) + ", length = " - + toString(length) + " are out of bound in ColumnFixedString::insertRangeFrom method" - " (size() = " - + toString(src_concrete.size()) + ").", - ErrorCodes::PARAMETER_OUT_OF_BOUND); + throw Exception( + fmt::format( + "Parameters are out of bound in ColumnFixedString::insertRangeFrom method, start={}, length={}, src.size()={}", + start, + length, + src_concrete.size()), + ErrorCodes::PARAMETER_OUT_OF_BOUND); size_t old_size = chars.size(); chars.resize(old_size + length * n); diff --git a/dbms/src/Columns/ColumnString.cpp b/dbms/src/Columns/ColumnString.cpp index 7cf76a762cc..53e70c680ed 100644 --- a/dbms/src/Columns/ColumnString.cpp +++ b/dbms/src/Columns/ColumnString.cpp @@ -69,8 +69,13 @@ void ColumnString::insertRangeFrom(const IColumn & src, size_t start, size_t len const ColumnString & src_concrete = static_cast(src); if (start + length > src_concrete.offsets.size()) - throw Exception("Parameter out of bound in IColumnString::insertRangeFrom method.", - ErrorCodes::PARAMETER_OUT_OF_BOUND); + throw Exception( + fmt::format( + "Parameters are out of bound in ColumnString::insertRangeFrom method, start={}, length={}, src.size()={}", + start, + length, + src_concrete.size()), + ErrorCodes::PARAMETER_OUT_OF_BOUND); size_t nested_offset = src_concrete.offsetAt(start); size_t nested_length = src_concrete.offsets[start + length - 1] - nested_offset; diff --git a/dbms/src/Columns/ColumnVector.cpp b/dbms/src/Columns/ColumnVector.cpp index ae67b865968..be114997ebb 100644 --- a/dbms/src/Columns/ColumnVector.cpp +++ b/dbms/src/Columns/ColumnVector.cpp @@ -184,7 +184,7 @@ void ColumnVector::insertRangeFrom(const IColumn & src, size_t start, size_t if (start + length > src_vec.data.size()) throw Exception( fmt::format( - "Parameters start = {}, length = {} are out of bound in ColumnVector::insertRangeFrom method (data.size() = {}).", + "Parameters are out of bound in ColumnVector::insertRangeFrom method, start={}, length={}, src.size()={}", start, length, src_vec.data.size()), diff --git a/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h b/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h index 5c2d8d4b260..2ad01bcc1a0 100644 --- a/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h @@ -47,7 +47,7 @@ class SSTFilesToBlockInputStream final : public IBlockInputStream bool force_decode_, TMTContext & tmt_, size_t expected_size_ = DEFAULT_MERGE_BLOCK_SIZE); - ~SSTFilesToBlockInputStream(); + ~SSTFilesToBlockInputStream() override; String getName() const override { return "SSTFilesToBlockInputStream"; } diff --git a/dbms/src/Storages/Transaction/ApplySnapshot.cpp b/dbms/src/Storages/Transaction/ApplySnapshot.cpp index a27feeced59..ea1a4b2b467 100644 --- a/dbms/src/Storages/Transaction/ApplySnapshot.cpp +++ b/dbms/src/Storages/Transaction/ApplySnapshot.cpp @@ -16,6 +16,11 @@ #include #include #include +<<<<<<< HEAD +======= +#include +#include +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) #include @@ -232,6 +237,7 @@ void KVStore::onSnapshot(const RegionPtrWrap & new_region_wrap, RegionPtr old_re } } +<<<<<<< HEAD extern RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr &, Context &); /// `preHandleSnapshotToBlock` read data from SSTFiles and predoced the data as a block @@ -297,6 +303,8 @@ RegionPreDecodeBlockDataPtr KVStore::preHandleSnapshotToBlock( return cache; } +======= +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) std::vector KVStore::preHandleSnapshotToFiles( RegionPtr new_region, const SSTViewVec snaps, @@ -304,7 +312,17 @@ std::vector KVStore::preHandleSnapshotToFiles( uint64_t term, TMTContext & tmt) { - return preHandleSSTsToDTFiles(new_region, snaps, index, term, DM::FileConvertJobType::ApplySnapshot, tmt); + std::vector ingest_ids; + try + { + ingest_ids = preHandleSSTsToDTFiles(new_region, snaps, index, term, DM::FileConvertJobType::ApplySnapshot, tmt); + } + catch (DB::Exception & e) + { + e.addMessage(fmt::format("(while preHandleSnapshot region_id={}, index={}, term={})", new_region->id(), index, term)); + e.rethrow(); + } + return ingest_ids; } /// `preHandleSSTsToDTFiles` read data from SSTFiles and generate DTFile(s) for commited data @@ -324,7 +342,15 @@ std::vector KVStore::preHandleSSTsToDTFiles( // Use failpoint to change the expected_block_size for some test cases fiu_do_on(FailPoints::force_set_sst_to_dtfile_block_size, { expected_block_size = 3; }); +<<<<<<< HEAD PageIds ids; +======= + Stopwatch watch; + SCOPE_EXIT({ GET_METRIC(tiflash_raft_command_duration_seconds, type_apply_snapshot_predecode).Observe(watch.elapsedSeconds()); }); + + PageIds generated_ingest_ids; + TableID physical_table_id = InvalidTableID; +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) while (true) { // If any schema changes is detected during decoding SSTs to DTFiles, we need to cancel and recreate DTFiles with @@ -349,6 +375,7 @@ std::vector KVStore::preHandleSSTsToDTFiles( /* ignore_cache= */ false, context.getSettingsRef().safe_point_update_interval_seconds); } + physical_table_id = storage->getTableInfo().id; // Read from SSTs and refine the boundary of blocks output to DTFiles auto sst_stream = std::make_shared( @@ -372,7 +399,7 @@ std::vector KVStore::preHandleSSTsToDTFiles( stream->writePrefix(); stream->write(); stream->writeSuffix(); - ids = stream->ingestIds(); + generated_ingest_ids = stream->ingestIds(); (void)table_drop_lock; // the table should not be dropped during ingesting file break; @@ -416,12 +443,13 @@ std::vector KVStore::preHandleSSTsToDTFiles( else { // Other unrecoverable error, throw + e.addMessage(fmt::format("physical_table_id={}", physical_table_id)); throw; } } } - return ids; + return generated_ingest_ids; } template @@ -589,7 +617,16 @@ RegionPtr KVStore::handleIngestSSTByDTFile(const RegionPtr & region, const SSTVi } // Decode the KV pairs in ingesting SST into DTFiles - PageIds ingest_ids = preHandleSSTsToDTFiles(tmp_region, snaps, index, term, DM::FileConvertJobType::IngestSST, tmt); + PageIds ingest_ids; + try + { + ingest_ids = preHandleSSTsToDTFiles(tmp_region, snaps, index, term, DM::FileConvertJobType::IngestSST, tmt); + } + catch (DB::Exception & e) + { + e.addMessage(fmt::format("(while handleIngestSST region_id={}, index={}, term={})", tmp_region->id(), index, term)); + e.rethrow(); + } // If `ingest_ids` is empty, ingest SST won't write delete_range for ingest region, it is safe to // ignore the step of calling `ingestFiles` diff --git a/dbms/src/Storages/Transaction/PartitionStreams.cpp b/dbms/src/Storages/Transaction/PartitionStreams.cpp index c708490a6e4..85c40f6f732 100644 --- a/dbms/src/Storages/Transaction/PartitionStreams.cpp +++ b/dbms/src/Storages/Transaction/PartitionStreams.cpp @@ -241,31 +241,26 @@ std::variant ReadRegionCommitCache(const RegionPtr & region, bool lock_region = true) +std::optional ReadRegionCommitCache(const RegionPtr & region, bool lock_region) { auto scanner = region->createCommittedScanner(lock_region); /// Some sanity checks for region meta. - { - if (region->isPendingRemove()) - return std::nullopt; - } + if (region->isPendingRemove()) + return std::nullopt; /// Read raw KVs from region cache. - { - // Shortcut for empty region. - if (!scanner.hasNext()) - return std::nullopt; + // Shortcut for empty region. + if (!scanner.hasNext()) + return std::nullopt; - RegionDataReadInfoList data_list_read; - data_list_read.reserve(scanner.writeMapSize()); - - do - { - data_list_read.emplace_back(scanner.next()); - } while (scanner.hasNext()); - return data_list_read; - } + RegionDataReadInfoList data_list_read; + data_list_read.reserve(scanner.writeMapSize()); + do + { + data_list_read.emplace_back(scanner.next()); + } while (scanner.hasNext()); + return data_list_read; } void RemoveRegionCommitCache(const RegionPtr & region, const RegionDataReadInfoList & data_list_read, bool lock_region = true) @@ -447,7 +442,7 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio std::optional data_list_read = std::nullopt; try { - data_list_read = ReadRegionCommitCache(region); + data_list_read = ReadRegionCommitCache(region, true); if (!data_list_read) return nullptr; } @@ -603,7 +598,7 @@ Block GenRegionBlockDataWithSchema(const RegionPtr & region, // const DecodingStorageSchemaSnapshotConstPtr & schema_snap, Timestamp gc_safepoint, bool force_decode, - TMTContext & tmt) + TMTContext & /* */) { // In 5.0.1, feature `compaction filter` is enabled by default. Under such feature tikv will do gc in write & default cf individually. // If some rows were updated and add tiflash replica, tiflash store may receive region snapshot with unmatched data in write & default cf sst files. @@ -611,16 +606,13 @@ Block GenRegionBlockDataWithSchema(const RegionPtr & region, // { gc_safepoint = 10000000; }); // Mock a GC safepoint for testing compaction filter region->tryCompactionFilter(gc_safepoint); - std::optional data_list_read = std::nullopt; - data_list_read = ReadRegionCommitCache(region); + std::optional data_list_read = ReadRegionCommitCache(region, true); Block res_block; // No committed data, just return if (!data_list_read) return res_block; - auto context = tmt.getContext(); - { Stopwatch watch; { diff --git a/dbms/src/Storages/Transaction/PartitionStreams.h b/dbms/src/Storages/Transaction/PartitionStreams.h index 99670394e74..602098e3b88 100644 --- a/dbms/src/Storages/Transaction/PartitionStreams.h +++ b/dbms/src/Storages/Transaction/PartitionStreams.h @@ -2,7 +2,9 @@ #include #include +#include #include +#include #include namespace DB @@ -11,6 +13,8 @@ class Region; using RegionPtr = std::shared_ptr; class StorageDeltaMerge; +std::optional ReadRegionCommitCache(const RegionPtr & region, bool lock_region); + std::tuple, DecodingStorageSchemaSnapshotConstPtr> // AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt); diff --git a/dbms/src/Storages/Transaction/RegionBlockReader.cpp b/dbms/src/Storages/Transaction/RegionBlockReader.cpp index a5a8e956dd8..8210a7d2164 100644 --- a/dbms/src/Storages/Transaction/RegionBlockReader.cpp +++ b/dbms/src/Storages/Transaction/RegionBlockReader.cpp @@ -1,12 +1,16 @@ #include +#include +#include #include #include #include #include +#include #include #include #include #include +#include namespace DB { @@ -95,6 +99,7 @@ bool RegionBlockReader::readImpl(Block & block, const RegionDataReadInfoList & d raw_column->reserve(expected_rows); } } + size_t index = 0; for (const auto & [pk, write_type, commit_ts, value_ptr] : data_list) { @@ -142,37 +147,37 @@ bool RegionBlockReader::readImpl(Block & block, const RegionDataReadInfoList & d } else { - if (schema_snapshot->pk_is_handle) - { - if (!appendRowToBlock(*value_ptr, column_ids_iter, read_column_ids.end(), block, next_column_pos, schema_snapshot->column_infos, schema_snapshot->pk_column_ids[0], force_decode)) - return false; - } - else - { - if (!appendRowToBlock(*value_ptr, column_ids_iter, read_column_ids.end(), block, next_column_pos, schema_snapshot->column_infos, InvalidColumnID, force_decode)) - return false; - } + // Parse column value from encoded value + if (!appendRowToBlock(*value_ptr, column_ids_iter, read_column_ids.end(), block, next_column_pos, schema_snapshot, force_decode)) + return false; } } - /// set extra handle column and pk columns if need + /// set extra handle column and pk columns from encoded key if need if constexpr (pk_type != TMTPKType::STRING) { - // extra handle column's type is always Int64 + // For non-common handle, extra handle column's type is always Int64. + // We need to copy the handle value from encoded key. + const auto handle_value = static_cast(pk); auto * raw_extra_column = const_cast((block.getByPosition(extra_handle_column_pos)).column.get()); - static_cast(raw_extra_column)->getData().push_back(Int64(pk)); + static_cast(raw_extra_column)->getData().push_back(handle_value); + // For pk_is_handle == true, we need to decode the handle value from encoded key, and insert + // to the specify column if (!pk_column_ids.empty()) { auto * raw_pk_column = const_cast((block.getByPosition(pk_pos_map.at(pk_column_ids[0]))).column.get()); if constexpr (pk_type == TMTPKType::INT64) - static_cast(raw_pk_column)->getData().push_back(Int64(pk)); + static_cast(raw_pk_column)->getData().push_back(handle_value); else if constexpr (pk_type == TMTPKType::UINT64) - static_cast(raw_pk_column)->getData().push_back(UInt64(pk)); + static_cast(raw_pk_column)->getData().push_back(UInt64(handle_value)); else { - // The pk_type must be Int32/Uint32 or more narrow type + // The pk_type must be Int32/UInt32 or more narrow type // so cannot tell its' exact type here, just use `insert(Field)` +<<<<<<< HEAD HandleID handle_value(static_cast(pk)); +======= +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) raw_pk_column->insert(Field(handle_value)); if (unlikely(raw_pk_column->getInt(index) != handle_value)) { @@ -182,7 +187,7 @@ bool RegionBlockReader::readImpl(Block & block, const RegionDataReadInfoList & d } else { - throw Exception("Detected overflow value when decoding pk column of type " + raw_pk_column->getName(), + throw Exception(fmt::format("Detected overflow value when decoding pk column, type={} handle={}", raw_pk_column->getName(), handle_value), ErrorCodes::LOGICAL_ERROR); } } diff --git a/dbms/src/Storages/Transaction/RegionBlockReader.h b/dbms/src/Storages/Transaction/RegionBlockReader.h index 82c3c04e135..f1c18f6ce9d 100644 --- a/dbms/src/Storages/Transaction/RegionBlockReader.h +++ b/dbms/src/Storages/Transaction/RegionBlockReader.h @@ -1,26 +1,10 @@ #pragma once -#include -#include -#include -#include -#include #include -#include #include -#include - -namespace TiDB -{ -struct TableInfo; -}; namespace DB { -class IManageableStorage; -using ManageableStoragePtr = std::shared_ptr; - -struct ColumnsDescription; class Block; class RegionScanFilter @@ -98,12 +82,17 @@ class RegionBlockReader : private boost::noncopyable /// Read `data_list` as a block. /// - /// On decode error, i.e. column number/type mismatch, will do force apply schema, + /// On decode error, i.e. column number/type mismatch, caller should trigger a schema-sync and retry with `force_decode=True`, /// i.e. add/remove/cast unknown/missing/type-mismatch column if force_decode is true, otherwise return empty block and false. /// Moreover, exception will be thrown if we see fatal decode error meanwhile `force_decode` is true. /// +<<<<<<< HEAD /// `RegionBlockReader::read` is the common routine used by both 'flush' and 'read' processes of TXN engine (Delta-Tree, TXN-MergeTree), /// each of which will use carefully adjusted 'start_ts' and 'force_decode' with appropriate error handling/retry to get what they want. +======= + /// `RegionBlockReader::read` is the common routine used by both 'flush' and 'read' processes of Delta-Tree engine, + /// which will use carefully adjusted 'force_decode' with appropriate error handling/retry to get what they want. +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) bool read(Block & block, const RegionDataReadInfoList & data_list, bool force_decode); private: diff --git a/dbms/src/Storages/Transaction/RowCodec.cpp b/dbms/src/Storages/Transaction/RowCodec.cpp index 8e5cfa34f2e..520eaf4f404 100644 --- a/dbms/src/Storages/Transaction/RowCodec.cpp +++ b/dbms/src/Storages/Transaction/RowCodec.cpp @@ -271,7 +271,9 @@ void encodeRowV2(const TiDB::TableInfo & table_info, const std::vector & RowEncoderV2(table_info, fields).encode(ss); } -bool appendRowToBlock( +// pre-declar block +template +bool appendRowV2ToBlockImpl( const TiKVValue::Base & raw_value, SortedColumnIDWithPosConstIter column_ids_iter, SortedColumnIDWithPosConstIter column_ids_iter_end, @@ -279,18 +281,10 @@ bool appendRowToBlock( size_t block_column_pos, const ColumnInfos & column_infos, ColumnID pk_handle_id, - bool force_decode) -{ - switch (static_cast(raw_value[0])) - { - case static_cast(RowCodecVer::ROW_V2): - return appendRowV2ToBlock(raw_value, column_ids_iter, column_ids_iter_end, block, block_column_pos, column_infos, pk_handle_id, force_decode); - default: - return appendRowV1ToBlock(raw_value, column_ids_iter, column_ids_iter_end, block, block_column_pos, column_infos, pk_handle_id, force_decode); - } -} + bool ignore_pk_if_absent, + bool force_decode); -bool appendRowV2ToBlock( +bool appendRowV1ToBlock( const TiKVValue::Base & raw_value, SortedColumnIDWithPosConstIter column_ids_iter, SortedColumnIDWithPosConstIter column_ids_iter_end, @@ -298,26 +292,81 @@ bool appendRowV2ToBlock( size_t block_column_pos, const ColumnInfos & column_infos, ColumnID pk_handle_id, + bool ignore_pk_if_absent, + bool force_decode); +// pre-declar block end + +bool appendRowToBlock( + const TiKVValue::Base & raw_value, + SortedColumnIDWithPosConstIter column_ids_iter, + SortedColumnIDWithPosConstIter column_ids_iter_end, + Block & block, + size_t block_column_pos, + const DecodingStorageSchemaSnapshotConstPtr & schema_snapshot, bool force_decode) { +<<<<<<< HEAD UInt8 row_flag = readLittleEndian(&raw_value[1]); bool is_big = row_flag & RowV2::BigRowMask; return is_big ? appendRowV2ToBlockImpl(raw_value, column_ids_iter, column_ids_iter_end, block, block_column_pos, column_infos, pk_handle_id, force_decode) : appendRowV2ToBlockImpl(raw_value, column_ids_iter, column_ids_iter_end, block, block_column_pos, column_infos, pk_handle_id, force_decode); +======= + const ColumnInfos & column_infos = schema_snapshot->column_infos; + // when pk is handle, we need skip pk column when decoding value + ColumnID pk_handle_id = InvalidColumnID; + if (schema_snapshot->pk_is_handle) + { + pk_handle_id = schema_snapshot->pk_column_ids[0]; + } + + // For pk_is_handle table, the column with primary key flag is decoded from encoded key instead of encoded value. + // For common handle table, the column with primary key flag is (usually) decoded from encoded key. We skip + // filling the columns with primary key flags inside this method. + // For other table (non-clustered, use hidden _tidb_rowid as handle), the column with primary key flags could be + // changed, we need to fill missing column with default value. + const bool ignore_pk_if_absent = schema_snapshot->is_common_handle || schema_snapshot->pk_is_handle; + + switch (static_cast(raw_value[0])) + { + case static_cast(RowCodecVer::ROW_V2): + { + auto row_flag = readLittleEndian(&raw_value[1]); + bool is_big = row_flag & RowV2::BigRowMask; + return is_big ? appendRowV2ToBlockImpl(raw_value, column_ids_iter, column_ids_iter_end, block, block_column_pos, column_infos, pk_handle_id, ignore_pk_if_absent, force_decode) + : appendRowV2ToBlockImpl(raw_value, column_ids_iter, column_ids_iter_end, block, block_column_pos, column_infos, pk_handle_id, ignore_pk_if_absent, force_decode); + } + default: + return appendRowV1ToBlock(raw_value, column_ids_iter, column_ids_iter_end, block, block_column_pos, column_infos, pk_handle_id, ignore_pk_if_absent, force_decode); + } +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) } -inline bool addDefaultValueToColumnIfPossible(const ColumnInfo & column_info, Block & block, size_t block_column_pos, bool force_decode) +inline bool addDefaultValueToColumnIfPossible(const ColumnInfo & column_info, Block & block, size_t block_column_pos, bool ignore_pk_if_absent, bool force_decode) { // We consider a missing column could be safely filled with NULL, unless it has not default value and is NOT NULL. - // This could saves lots of unnecessary schema syncs for old data with a schema that has newly added columns. - // for clustered index, if the pk column does not exists, it can still be decoded from the key + // This could saves lots of unnecessary schema syncs for old data with a newer schema that has newly added columns. + if (column_info.hasPriKeyFlag()) - return true; + { + // For clustered index or pk_is_handle, if the pk column does not exists, it can still be decoded from the key + if (ignore_pk_if_absent) + return true; + + assert(!ignore_pk_if_absent); + if (!force_decode) + return false; + // Else non-clustered index, and not pk_is_handle, it could be a row encoded by older schema, + // we need to fill the column wich has primary key flag with default value. + // fallthrough to fill default value when force_decode + } if (column_info.hasNoDefaultValueFlag() && column_info.hasNotNullFlag()) { if (!force_decode) return false; + // Else the row does not contain this "not null" / "no default value" column, + // it could be a row encoded by older schema. + // fallthrough to fill default value when force_decode } // not null or has no default value, tidb will fill with specific value. auto * raw_column = const_cast((block.getByPosition(block_column_pos)).column.get()); @@ -334,6 +383,7 @@ bool appendRowV2ToBlockImpl( size_t block_column_pos, const ColumnInfos & column_infos, ColumnID pk_handle_id, + bool ignore_pk_if_absent, bool force_decode) { size_t cursor = 2; // Skip the initial codec ver and row flag. @@ -376,7 +426,7 @@ bool appendRowV2ToBlockImpl( else if (column_ids_iter->first < next_datum_column_id) { const auto & column_info = column_infos[column_ids_iter->second]; - if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, force_decode)) + if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, ignore_pk_if_absent, force_decode)) return false; column_ids_iter++; block_column_pos++; @@ -437,7 +487,7 @@ bool appendRowV2ToBlockImpl( if (column_ids_iter->first != pk_handle_id) { const auto & column_info = column_infos[column_ids_iter->second]; - if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, force_decode)) + if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, ignore_pk_if_absent, force_decode)) return false; } column_ids_iter++; @@ -455,6 +505,7 @@ bool appendRowV1ToBlock( size_t block_column_pos, const ColumnInfos & column_infos, ColumnID pk_handle_id, + bool ignore_pk_if_absent, bool force_decode) { size_t cursor = 0; @@ -491,7 +542,7 @@ bool appendRowV1ToBlock( else if (column_ids_iter->first < next_field_column_id) { const auto & column_info = column_infos[column_ids_iter->second]; - if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, force_decode)) + if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, ignore_pk_if_absent, force_decode)) return false; column_ids_iter++; block_column_pos++; @@ -551,7 +602,7 @@ bool appendRowV1ToBlock( if (column_ids_iter->first != pk_handle_id) { const auto & column_info = column_infos[column_ids_iter->second]; - if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, force_decode)) + if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, ignore_pk_if_absent, force_decode)) return false; } column_ids_iter++; diff --git a/dbms/src/Storages/Transaction/RowCodec.h b/dbms/src/Storages/Transaction/RowCodec.h index 78208b31612..f39c0c8f9fe 100644 --- a/dbms/src/Storages/Transaction/RowCodec.h +++ b/dbms/src/Storages/Transaction/RowCodec.h @@ -6,9 +6,6 @@ namespace DB { -using TiDB::ColumnInfo; -using TiDB::TableInfo; - /// The following two encode functions are used for testing. void encodeRowV1(const TiDB::TableInfo & table_info, const std::vector & fields, WriteBuffer & ss); void encodeRowV2(const TiDB::TableInfo & table_info, const std::vector & fields, WriteBuffer & ss); @@ -19,39 +16,8 @@ bool appendRowToBlock( SortedColumnIDWithPosConstIter column_ids_iter_end, Block & block, size_t block_column_pos, - const ColumnInfos & column_infos, - ColumnID pk_handle_id, // when pk is handle, we need skip pk column when decoding value + const DecodingStorageSchemaSnapshotConstPtr & schema_snapshot, bool force_decode); -bool appendRowV2ToBlock( - const TiKVValue::Base & raw_value, - SortedColumnIDWithPosConstIter column_ids_iter, - SortedColumnIDWithPosConstIter column_ids_iter_end, - Block & block, - size_t block_column_pos, - const ColumnInfos & column_infos, - ColumnID pk_handle_id, - bool force_decode); - -template -bool appendRowV2ToBlockImpl( - const TiKVValue::Base & raw_value, - SortedColumnIDWithPosConstIter column_ids_iter, - SortedColumnIDWithPosConstIter column_ids_iter_end, - Block & block, - size_t block_column_pos, - const ColumnInfos & column_infos, - ColumnID pk_handle_id, - bool force_decode); - -bool appendRowV1ToBlock( - const TiKVValue::Base & raw_value, - SortedColumnIDWithPosConstIter column_ids_iter, - SortedColumnIDWithPosConstIter column_ids_iter_end, - Block & block, - size_t block_column_pos, - const ColumnInfos & column_infos, - ColumnID pk_handle_id, - bool force_decode); } // namespace DB diff --git a/dbms/src/Storages/Transaction/tests/RowCodecTestUtils.h b/dbms/src/Storages/Transaction/tests/RowCodecTestUtils.h index 5d1450ab566..8b097b97ab3 100644 --- a/dbms/src/Storages/Transaction/tests/RowCodecTestUtils.h +++ b/dbms/src/Storages/Transaction/tests/RowCodecTestUtils.h @@ -184,7 +184,8 @@ void getTableInfoFieldsInternal(OrderedColumnInfoFields & column_info_fields, Ty else { column_info_fields.emplace(column_id_value.id, - std::make_tuple(getColumnInfo(column_id_value.id), static_cast(std::move(column_id_value.value)))); + std::make_tuple(getColumnInfo(column_id_value.id), + static_cast(std::move(column_id_value.value)))); } } } @@ -295,10 +296,7 @@ inline Block decodeRowToBlock(const String & row_value, DecodingStorageSchemaSna iter++; Block block = createBlockSortByColumnID(decoding_schema); - if (decoding_schema->pk_is_handle) - appendRowToBlock(row_value, iter, sorted_column_id_with_pos.end(), block, value_column_num, decoding_schema->column_infos, decoding_schema->pk_column_ids[0], true); - else - appendRowToBlock(row_value, iter, sorted_column_id_with_pos.end(), block, value_column_num, decoding_schema->column_infos, InvalidColumnID, true); + appendRowToBlock(row_value, iter, sorted_column_id_with_pos.end(), block, value_column_num, decoding_schema, true); // remove first three column for (size_t i = 0; i < value_column_num; i++) diff --git a/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp b/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp new file mode 100644 index 00000000000..7d90ac520f3 --- /dev/null +++ b/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp @@ -0,0 +1,1500 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace DB +{ +namespace RegionBench +{ +extern void setupPutRequest(raft_cmdpb::Request *, const std::string &, const TiKVKey &, const TiKVValue &); +extern void setupDelRequest(raft_cmdpb::Request *, const std::string &, const TiKVKey &); +} // namespace RegionBench + +extern void RemoveRegionCommitCache(const RegionPtr & region, const RegionDataReadInfoList & data_list_read, bool lock_region = true); +extern void CheckRegionForMergeCmd(const raft_cmdpb::AdminResponse & response, const RegionState & region_state); +extern void ChangeRegionStateRange(RegionState & region_state, bool source_at_left, const RegionState & source_region_state); + +namespace tests +{ + +// TODO: Use another way to workaround calling the private methods on KVStore +class RegionKVStoreTest : public ::testing::Test +{ +public: + RegionKVStoreTest() + { + test_path = TiFlashTestEnv::getTemporaryPath("/region_kvs_test"); + } + + static void SetUpTestCase() {} + + void SetUp() override + { + // clean data and create path pool instance + path_pool = createCleanPathPool(test_path); + + reloadKVSFromDisk(); + + proxy_instance = std::make_unique(); + proxy_helper = std::make_unique(MockRaftStoreProxy::SetRaftStoreProxyFFIHelper( + RaftStoreProxyPtr{proxy_instance.get()})); + proxy_instance->init(100); + + kvstore->restore(*path_pool, proxy_helper.get()); + } + + void TearDown() override {} + +protected: + KVStore & getKVS() { return *kvstore; } + KVStore & reloadKVSFromDisk() + { + kvstore.reset(); + auto & global_ctx = TiFlashTestEnv::getGlobalContext(); + kvstore = std::make_unique(global_ctx, TiDB::SnapshotApplyMethod::DTFile_Directory); + // only recreate kvstore and restore data from disk, don't recreate proxy instance + kvstore->restore(*path_pool, proxy_helper.get()); + return *kvstore; + } + +protected: + static void testRaftSplit(KVStore & kvs, TMTContext & tmt); + static void testRaftMerge(KVStore & kvs, TMTContext & tmt); + static void testRaftChangePeer(KVStore & kvs, TMTContext & tmt); + static void testRaftMergeRollback(KVStore & kvs, TMTContext & tmt); + + static std::unique_ptr createCleanPathPool(const String & path) + { + // Drop files on disk + Poco::File file(path); + if (file.exists()) + file.remove(true); + file.createDirectories(); + + auto & global_ctx = TiFlashTestEnv::getGlobalContext(); + auto path_capacity = global_ctx.getPathCapacity(); + auto provider = global_ctx.getFileProvider(); + // Create a PathPool instance on the clean directory + Strings main_data_paths{path}; + return std::make_unique(main_data_paths, main_data_paths, Strings{}, path_capacity, provider); + } + + std::string test_path; + + std::unique_ptr path_pool; + std::unique_ptr kvstore; + + std::unique_ptr proxy_instance; + std::unique_ptr proxy_helper; +}; + +TEST_F(RegionKVStoreTest, NewProxy) +{ + auto ctx = TiFlashTestEnv::getGlobalContext(); + + KVStore & kvs = getKVS(); + { + auto store = metapb::Store{}; + store.set_id(1234); + kvs.setStore(store); + ASSERT_EQ(kvs.getStoreID(), store.id()); + } + { + ASSERT_EQ(kvs.getRegion(0), nullptr); + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + { + auto region = makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10)); + lock.regions.emplace(1, region); + lock.index.add(region); + } + } + { + kvs.tryPersist(1); + kvs.gcRegionPersistedCache(Seconds{0}); + } + { + // test CompactLog + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + auto region = kvs.getRegion(1); + region->markCompactLog(); + kvs.setRegionCompactLogConfig(100000, 1000, 1000); + request.mutable_compact_log(); + request.set_cmd_type(::raft_cmdpb::AdminCmdType::CompactLog); + // CompactLog always returns true now, even if we can't do a flush. + // We use a tryFlushData to pre-filter. + ASSERT_EQ(kvs.handleAdminRaftCmd(std::move(request), std::move(response), 1, 5, 1, ctx.getTMTContext()), EngineStoreApplyRes::Persist); + + // Filter + ASSERT_EQ(kvs.tryFlushRegionData(1, false, ctx.getTMTContext(), 0, 0), false); + } +} + +TEST_F(RegionKVStoreTest, ReadIndex) +{ + auto ctx = TiFlashTestEnv::getGlobalContext(); + + // start mock proxy in other thread + std::atomic_bool over{false}; + auto proxy_runner = std::thread([&]() { + proxy_instance->testRunNormal(over); + }); + KVStore & kvs = getKVS(); + ASSERT_EQ(kvs.getProxyHelper(), proxy_helper.get()); + + { + ASSERT_EQ(kvs.getRegion(0), nullptr); + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + { + auto region = makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10), kvs.getProxyHelper()); + lock.regions.emplace(1, region); + lock.index.add(region); + } + { + auto region = makeRegion(2, RecordKVFormat::genKey(1, 10), RecordKVFormat::genKey(1, 20), kvs.getProxyHelper()); + lock.regions.emplace(2, region); + lock.index.add(region); + } + { + auto region = makeRegion(3, RecordKVFormat::genKey(1, 30), RecordKVFormat::genKey(1, 40), kvs.getProxyHelper()); + lock.regions.emplace(3, region); + lock.index.add(region); + } + } + { + ASSERT_EQ(kvs.read_index_worker_manager, nullptr); + { + auto region = kvs.getRegion(1); + auto req = GenRegionReadIndexReq(*region, 8); + try + { + auto resp = kvs.batchReadIndex({req}, 100); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "`fn_handle_batch_read_index` is deprecated"); + } + } + kvs.initReadIndexWorkers( + []() { + return std::chrono::milliseconds(10); + }, + 1); + ASSERT_NE(kvs.read_index_worker_manager, nullptr); + + { + kvs.asyncRunReadIndexWorkers(); + SCOPE_EXIT({ + kvs.stopReadIndexWorkers(); + }); + + auto tar_region_id = 9; + { + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + + auto region = makeRegion(tar_region_id, RecordKVFormat::genKey(2, 0), RecordKVFormat::genKey(2, 10)); + lock.regions.emplace(region->id(), region); + lock.index.add(region); + } + { + ASSERT_EQ(proxy_instance->regions.at(tar_region_id)->getLatestCommitIndex(), 5); + proxy_instance->regions.at(tar_region_id)->updateCommitIndex(66); + } + + AsyncWaker::Notifier notifier; + const std::atomic_size_t terminate_signals_counter{}; + std::thread t([&]() { + notifier.wake(); + WaitCheckRegionReady(ctx.getTMTContext(), kvs, terminate_signals_counter, 1 / 1000.0, 20, 20 * 60); + }); + SCOPE_EXIT({ + t.join(); + kvs.handleDestroy(tar_region_id, ctx.getTMTContext()); + }); + ASSERT_EQ(notifier.blockedWaitFor(std::chrono::milliseconds(1000 * 3600)), AsyncNotifier::Status::Normal); + std::this_thread::sleep_for(std::chrono::milliseconds(2)); + auto tar = kvs.getRegion(tar_region_id); + ASSERT_EQ( + tar->handleWriteRaftCmd({}, 66, 6, ctx.getTMTContext()), + EngineStoreApplyRes::None); + } + { + kvs.asyncRunReadIndexWorkers(); + SCOPE_EXIT({ + kvs.stopReadIndexWorkers(); + }); + + auto tar_region_id = 9; + { + ASSERT_EQ(proxy_instance->regions.at(tar_region_id)->getLatestCommitIndex(), 66); + proxy_instance->unsafeInvokeForTest([&](MockRaftStoreProxy & p) { + p.region_id_to_error.emplace(tar_region_id); + p.regions.at(2)->updateCommitIndex(6); + }); + } + + AsyncWaker::Notifier notifier; + const std::atomic_size_t terminate_signals_counter{}; + std::thread t([&]() { + notifier.wake(); + WaitCheckRegionReady(ctx.getTMTContext(), kvs, terminate_signals_counter, 1 / 1000.0, 2 / 1000.0, 5 / 1000.0); + }); + SCOPE_EXIT({ + t.join(); + }); + ASSERT_EQ(notifier.blockedWaitFor(std::chrono::milliseconds(1000 * 3600)), AsyncNotifier::Status::Normal); + } + + kvs.asyncRunReadIndexWorkers(); + SCOPE_EXIT({ + kvs.stopReadIndexWorkers(); + }); + + { + // test read index + auto region = kvs.getRegion(1); + auto req = GenRegionReadIndexReq(*region, 8); + auto resp = kvs.batchReadIndex({req}, 100); + ASSERT_EQ(resp[0].first.read_index(), 5); + { + auto r = region->waitIndex(5, 0, []() { return true; }); + ASSERT_EQ(std::get<0>(r), WaitIndexResult::Finished); + } + { + auto r = region->waitIndex(8, 1, []() { return false; }); + ASSERT_EQ(std::get<0>(r), WaitIndexResult::Terminated); + } + } + for (auto & r : proxy_instance->regions) + { + r.second->updateCommitIndex(667); + } + { + auto region = kvs.getRegion(1); + auto req = GenRegionReadIndexReq(*region, 8); + auto resp = kvs.batchReadIndex({req}, 100); + ASSERT_EQ(resp[0].first.read_index(), 5); // history + } + { + auto region = kvs.getRegion(1); + auto req = GenRegionReadIndexReq(*region, 10); + auto resp = kvs.batchReadIndex({req}, 100); + ASSERT_EQ(resp[0].first.read_index(), 667); + } + { + auto region = kvs.getRegion(2); + auto req = GenRegionReadIndexReq(*region, 5); + auto resp = proxy_helper->batchReadIndex({req}, 100); // v2 + ASSERT_EQ(resp[0].first.read_index(), 667); // got latest + { + auto r = region->waitIndex(667 + 1, 2, []() { return true; }); + ASSERT_EQ(std::get<0>(r), WaitIndexResult::Timeout); + } + { + AsyncWaker::Notifier notifier; + std::thread t([&]() { + notifier.wake(); + auto r = region->waitIndex(667 + 1, 100000, []() { return true; }); + ASSERT_EQ(std::get<0>(r), WaitIndexResult::Finished); + }); + SCOPE_EXIT({ + t.join(); + }); + ASSERT_EQ(notifier.blockedWaitFor(std::chrono::milliseconds(1000 * 3600)), AsyncNotifier::Status::Normal); + std::this_thread::sleep_for(std::chrono::milliseconds(2)); + region->handleWriteRaftCmd({}, 667 + 1, 6, ctx.getTMTContext()); + } + } + } + kvs.stopReadIndexWorkers(); + kvs.releaseReadIndexWorkers(); + over = true; + proxy_instance->wake(); + proxy_runner.join(); + ASSERT(GCMonitor::instance().checkClean()); + ASSERT(!GCMonitor::instance().empty()); +} + +void RegionKVStoreTest::testRaftMergeRollback(KVStore & kvs, TMTContext & tmt) +{ + uint64_t region_id = 7; + { + auto source_region = kvs.getRegion(region_id); + auto target_region = kvs.getRegion(1); + + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + { + request.set_cmd_type(raft_cmdpb::AdminCmdType::PrepareMerge); + auto * prepare_merge = request.mutable_prepare_merge(); + { + auto min_index = source_region->appliedIndex(); + prepare_merge->set_min_index(min_index); + + metapb::Region * target = prepare_merge->mutable_target(); + *target = target_region->getMetaRegion(); + } + } + kvs.handleAdminRaftCmd(std::move(request), + std::move(response), + region_id, + 31, + 6, + tmt); + ASSERT_TRUE(source_region->isMerging()); + } + { + auto region = kvs.getRegion(region_id); + + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + { + request.set_cmd_type(raft_cmdpb::AdminCmdType::RollbackMerge); + + auto * rollback_merge = request.mutable_rollback_merge(); + { + auto merge_state = region->getMergeState(); + rollback_merge->set_commit(merge_state.commit()); + } + } + region->setStateApplying(); + + try + { + raft_cmdpb::AdminRequest first_request = request; + raft_cmdpb::AdminResponse first_response = response; + kvs.handleAdminRaftCmd(std::move(first_request), + std::move(first_response), + region_id, + 32, + 6, + tmt); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "execRollbackMerge: region state is Applying, expect Merging"); + } + ASSERT_EQ(region->peerState(), raft_serverpb::PeerState::Applying); + region->setPeerState(raft_serverpb::PeerState::Merging); + + region->meta.region_state.getMutMergeState().set_commit(1234); + try + { + kvs.handleAdminRaftCmd(std::move(request), + std::move(response), + region_id, + 32, + 6, + tmt); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "execRollbackMerge: merge commit index is 1234, expect 31"); + } + region->meta.region_state.getMutMergeState().set_commit(31); + } + { + auto region = kvs.getRegion(region_id); + + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + { + request.set_cmd_type(raft_cmdpb::AdminCmdType::RollbackMerge); + + auto * rollback_merge = request.mutable_rollback_merge(); + { + auto merge_state = region->getMergeState(); + rollback_merge->set_commit(merge_state.commit()); + } + } + kvs.handleAdminRaftCmd(std::move(request), + std::move(response), + region_id, + 32, + 6, + tmt); + ASSERT_EQ(region->peerState(), raft_serverpb::PeerState::Normal); + } +} + +void RegionKVStoreTest::testRaftSplit(KVStore & kvs, TMTContext & tmt) +{ + { + auto region = kvs.getRegion(1); + auto table_id = 1; + region->insert("lock", RecordKVFormat::genKey(table_id, 3), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); + region->insert("default", RecordKVFormat::genKey(table_id, 3, 5), TiKVValue("value1")); + region->insert("write", RecordKVFormat::genKey(table_id, 3, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + region->insert("lock", RecordKVFormat::genKey(table_id, 8), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); + region->insert("default", RecordKVFormat::genKey(table_id, 8, 5), TiKVValue("value1")); + region->insert("write", RecordKVFormat::genKey(table_id, 8, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + + ASSERT_EQ(region->dataInfo(), "[write 2 lock 2 default 2 ]"); + } + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + { + // split region + auto region_id = 1; + RegionID region_id2 = 7; + auto source_region = kvs.getRegion(region_id); + metapb::RegionEpoch new_epoch; + new_epoch.set_version(source_region->version() + 1); + new_epoch.set_conf_ver(source_region->confVer()); + TiKVKey start_key1, start_key2, end_key1, end_key2; + { + start_key1 = RecordKVFormat::genKey(1, 5); + start_key2 = RecordKVFormat::genKey(1, 0); + end_key1 = RecordKVFormat::genKey(1, 10); + end_key2 = RecordKVFormat::genKey(1, 5); + } + { + request.set_cmd_type(raft_cmdpb::AdminCmdType::BatchSplit); + raft_cmdpb::BatchSplitResponse * splits = response.mutable_splits(); + { + auto * region = splits->add_regions(); + region->set_id(region_id); + region->set_start_key(start_key1); + region->set_end_key(end_key1); + region->add_peers(); + *region->mutable_region_epoch() = new_epoch; + } + { + auto * region = splits->add_regions(); + region->set_id(region_id2); + region->set_start_key(start_key2); + region->set_end_key(end_key2); + region->add_peers(); + *region->mutable_region_epoch() = new_epoch; + } + } + } + kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest(request), raft_cmdpb::AdminResponse(response), 1, 20, 5, tmt); + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 5))); + ASSERT_TRUE(mmp.count(7) != 0); + ASSERT_EQ(mmp.size(), 1); + } + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 5), RecordKVFormat::genKey(1, 10))); + ASSERT_TRUE(mmp.count(1) != 0); + ASSERT_EQ(mmp.size(), 1); + } + { + ASSERT_EQ(kvs.getRegion(1)->dataInfo(), "[write 1 lock 1 default 1 ]"); + ASSERT_EQ(kvs.getRegion(7)->dataInfo(), "[lock 1 ]"); + } + // rollback 1 to before split + // 7 is persisted + { + kvs.handleDestroy(1, tmt); + { + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + auto region = makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10)); + lock.regions.emplace(1, region); + lock.index.add(region); + } + auto table_id = 1; + auto region = kvs.getRegion(1); + region->insert("lock", RecordKVFormat::genKey(table_id, 3), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); + region->insert("default", RecordKVFormat::genKey(table_id, 3, 5), TiKVValue("value1")); + region->insert("write", RecordKVFormat::genKey(table_id, 3, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + region->insert("lock", RecordKVFormat::genKey(table_id, 8), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); + region->insert("default", RecordKVFormat::genKey(table_id, 8, 5), TiKVValue("value1")); + region->insert("write", RecordKVFormat::genKey(table_id, 8, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + + ASSERT_EQ(region->dataInfo(), "[write 2 lock 2 default 2 ]"); + } + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 5))); + ASSERT_TRUE(mmp.count(7) != 0); + ASSERT_TRUE(mmp.count(1) != 0); + ASSERT_EQ(mmp.size(), 2); + } + // split again + kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest(request), raft_cmdpb::AdminResponse(response), 1, 20, 5, tmt); + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 5))); + ASSERT_TRUE(mmp.count(7) != 0); + ASSERT_EQ(mmp.size(), 1); + } + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 5), RecordKVFormat::genKey(1, 10))); + ASSERT_TRUE(mmp.count(1) != 0); + ASSERT_EQ(mmp.size(), 1); + } + { + ASSERT_EQ(kvs.getRegion(1)->dataInfo(), "[write 1 lock 1 default 1 ]"); + ASSERT_EQ(kvs.getRegion(7)->dataInfo(), "[lock 1 ]"); + } +} + +void RegionKVStoreTest::testRaftChangePeer(KVStore & kvs, TMTContext & tmt) +{ + { + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + auto region = makeRegion(88, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 100)); + lock.regions.emplace(88, region); + lock.index.add(region); + } + { + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + request.set_cmd_type(raft_cmdpb::AdminCmdType::ChangePeer); + auto meta = kvs.getRegion(88)->getMetaRegion(); + meta.mutable_peers()->Clear(); + meta.add_peers()->set_id(2); + meta.add_peers()->set_id(4); + *response.mutable_change_peer()->mutable_region() = meta; + kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest(request), raft_cmdpb::AdminResponse(response), 88, 6, 5, tmt); + ASSERT_NE(kvs.getRegion(88), nullptr); + } + { + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + request.set_cmd_type(raft_cmdpb::AdminCmdType::ChangePeerV2); + auto meta = kvs.getRegion(88)->getMetaRegion(); + meta.mutable_peers()->Clear(); + meta.add_peers()->set_id(3); + meta.add_peers()->set_id(4); + *response.mutable_change_peer()->mutable_region() = meta; + kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest(request), raft_cmdpb::AdminResponse(response), 88, 7, 5, tmt); + ASSERT_EQ(kvs.getRegion(88), nullptr); + } +} + +void RegionKVStoreTest::testRaftMerge(KVStore & kvs, TMTContext & tmt) +{ + { + kvs.getRegion(1)->clearAllData(); + kvs.getRegion(7)->clearAllData(); + + { + auto region = kvs.getRegion(1); + auto table_id = 1; + region->insert("lock", RecordKVFormat::genKey(table_id, 6), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); + region->insert("default", RecordKVFormat::genKey(table_id, 6, 5), TiKVValue("value1")); + region->insert("write", RecordKVFormat::genKey(table_id, 6, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + ASSERT_EQ(region->dataInfo(), "[write 1 lock 1 default 1 ]"); + } + { + auto region = kvs.getRegion(7); + auto table_id = 1; + region->insert("lock", RecordKVFormat::genKey(table_id, 2), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); + region->insert("default", RecordKVFormat::genKey(table_id, 2, 5), TiKVValue("value1")); + region->insert("write", RecordKVFormat::genKey(table_id, 2, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + ASSERT_EQ(region->dataInfo(), "[write 1 lock 1 default 1 ]"); + } + } + + { + auto region_id = 7; + auto source_region = kvs.getRegion(region_id); + auto target_region = kvs.getRegion(1); + + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + + { + request.set_cmd_type(raft_cmdpb::AdminCmdType::PrepareMerge); + + auto * prepare_merge = request.mutable_prepare_merge(); + { + auto min_index = source_region->appliedIndex(); + prepare_merge->set_min_index(min_index); + + metapb::Region * target = prepare_merge->mutable_target(); + *target = target_region->getMetaRegion(); + } + } + + kvs.handleAdminRaftCmd(std::move(request), + std::move(response), + source_region->id(), + 35, + 6, + tmt); + ASSERT_EQ(source_region->peerState(), raft_serverpb::PeerState::Merging); + } + + { + auto source_id = 7, target_id = 1; + auto source_region = kvs.getRegion(source_id); + raft_cmdpb::AdminRequest request; + { + request.set_cmd_type(raft_cmdpb::AdminCmdType::CommitMerge); + auto * commit_merge = request.mutable_commit_merge(); + { + commit_merge->set_commit(source_region->appliedIndex()); + *commit_merge->mutable_source() = source_region->getMetaRegion(); + } + } + source_region->setStateApplying(); + source_region->makeRaftCommandDelegate(kvs.genTaskLock()); + const auto & source_region_meta_delegate = source_region->meta.makeRaftCommandDelegate(); + try + { + kvs.getRegion(target_id)->meta.makeRaftCommandDelegate().checkBeforeCommitMerge(request, source_region_meta_delegate); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "checkBeforeCommitMerge: unexpected state Applying of source 1"); + } + source_region->setPeerState(raft_serverpb::PeerState::Normal); + { + request.mutable_commit_merge()->mutable_source()->mutable_start_key()->clear(); + } + try + { + kvs.getRegion(target_id)->meta.makeRaftCommandDelegate().checkBeforeCommitMerge(request, source_region_meta_delegate); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "checkBeforeCommitMerge: source region not match exist region meta"); + } + } + + { + auto source_id = 7, target_id = 1; + auto source_region = kvs.getRegion(source_id); + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + + { + request.set_cmd_type(raft_cmdpb::AdminCmdType::CommitMerge); + auto * commit_merge = request.mutable_commit_merge(); + { + commit_merge->set_commit(source_region->appliedIndex()); + *commit_merge->mutable_source() = source_region->getMetaRegion(); + } + } + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(TiKVKey(""), TiKVKey(""))); + ASSERT_TRUE(mmp.count(target_id) != 0); + ASSERT_EQ(mmp.size(), 2); + } + + kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest(request), + raft_cmdpb::AdminResponse(response), + target_id, + 36, + 6, + tmt); + + ASSERT_EQ(kvs.getRegion(source_id), nullptr); + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 5))); + ASSERT_TRUE(mmp.count(1) != 0); + ASSERT_EQ(mmp.size(), 1); + } + { + // add 7 back + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + auto region = makeRegion(7, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 5)); + lock.regions.emplace(7, region); + lock.index.add(region); + } + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 5))); + ASSERT_TRUE(mmp.count(7) != 0); + ASSERT_TRUE(mmp.count(1) != 0); + ASSERT_EQ(mmp.size(), 2); + } + kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest(request), + raft_cmdpb::AdminResponse(response), + target_id, + 36, + 6, + tmt); + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 5))); + ASSERT_TRUE(mmp.count(1) != 0); + ASSERT_EQ(mmp.size(), 1); + } + ASSERT_EQ(kvs.getRegion(1)->dataInfo(), "[lock 2 ]"); + } +} + +TEST_F(RegionKVStoreTest, Region) +{ + TableID table_id = 100; + { + auto meta = RegionMeta(createPeer(2, true), createRegionInfo(666, RecordKVFormat::genKey(0, 0), RecordKVFormat::genKey(0, 1000)), initialApplyState()); + ASSERT_EQ(meta.peerId(), 2); + } + auto region = makeRegion(1, RecordKVFormat::genKey(table_id, 0), RecordKVFormat::genKey(table_id, 1000)); + { + ASSERT_TRUE(region->checkIndex(5)); + } + { + auto start_ts = 199; + auto req = GenRegionReadIndexReq(*region, start_ts); + ASSERT_EQ(req.ranges().size(), 1); + ASSERT_EQ(req.start_ts(), start_ts); + ASSERT_EQ(region->getMetaRegion().region_epoch().DebugString(), + req.context().region_epoch().DebugString()); + ASSERT_EQ(region->getRange()->comparableKeys().first.key, req.ranges()[0].start_key()); + ASSERT_EQ(region->getRange()->comparableKeys().second.key, req.ranges()[0].end_key()); + } + { + region->insert("lock", RecordKVFormat::genKey(table_id, 3), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); + region->insert("default", RecordKVFormat::genKey(table_id, 3, 5), TiKVValue("value1")); + region->insert("write", RecordKVFormat::genKey(table_id, 3, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + ASSERT_EQ(1, region->writeCFCount()); + ASSERT_EQ(region->dataInfo(), "[write 1 lock 1 default 1 ]"); + { + auto iter = region->createCommittedScanner(); + auto lock = iter.getLockInfo({100, nullptr}); + ASSERT_NE(lock, nullptr); + auto k = lock->intoLockInfo(); + ASSERT_EQ(k->lock_version(), 3); + } + { + std::optional data_list_read = ReadRegionCommitCache(region, true); + ASSERT_TRUE(data_list_read); + ASSERT_EQ(1, data_list_read->size()); + RemoveRegionCommitCache(region, *data_list_read); + } + ASSERT_EQ(0, region->writeCFCount()); + { + region->remove("lock", RecordKVFormat::genKey(table_id, 3)); + auto iter = region->createCommittedScanner(); + auto lock = iter.getLockInfo({100, nullptr}); + ASSERT_EQ(lock, nullptr); + } + region->clearAllData(); + } + { + region->insert("write", RecordKVFormat::genKey(table_id, 3, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + ASSERT_EQ(region->dataInfo(), "[write 1 ]"); + + auto ori_size = region->dataSize(); + try + { + // insert duplicate records + region->insert("write", RecordKVFormat::genKey(table_id, 3, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "Found existing key in hex: 7480000000000000FF645F728000000000FF0000030000000000FAFFFFFFFFFFFFFFF7"); + } + ASSERT_EQ(ori_size, region->dataSize()); + + region->tryCompactionFilter(100); + ASSERT_EQ(region->dataInfo(), "[]"); + } + { + region->insert("write", RecordKVFormat::genKey(table_id, 4, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::DelFlag, 5)); + ASSERT_EQ(1, region->writeCFCount()); + region->remove("write", RecordKVFormat::genKey(table_id, 4, 8)); + ASSERT_EQ(1, region->writeCFCount()); + { + std::optional data_list_read = ReadRegionCommitCache(region, true); + ASSERT_TRUE(data_list_read); + ASSERT_EQ(1, data_list_read->size()); + RemoveRegionCommitCache(region, *data_list_read); + } + ASSERT_EQ(0, region->writeCFCount()); + } + { + ASSERT_EQ(0, region->dataSize()); + + region->insert("default", RecordKVFormat::genKey(table_id, 3, 5), TiKVValue("value1")); + ASSERT_LT(0, region->dataSize()); + + region->remove("default", RecordKVFormat::genKey(table_id, 3, 5)); + ASSERT_EQ(0, region->dataSize()); + + // remove duplicate records + region->remove("default", RecordKVFormat::genKey(table_id, 3, 5)); + ASSERT_EQ(0, region->dataSize()); + } +} + +TEST_F(RegionKVStoreTest, KVStore) +{ + auto ctx = TiFlashTestEnv::getGlobalContext(); + + KVStore & kvs = getKVS(); + { + // Run without read-index workers + + kvs.initReadIndexWorkers( + []() { + return std::chrono::milliseconds(10); + }, + 0); + ASSERT_EQ(kvs.read_index_worker_manager, nullptr); + kvs.asyncRunReadIndexWorkers(); + kvs.stopReadIndexWorkers(); + kvs.releaseReadIndexWorkers(); + } + { + auto store = metapb::Store{}; + store.set_id(1234); + kvs.setStore(store); + ASSERT_EQ(kvs.getStoreID(), store.id()); + } + { + ASSERT_EQ(kvs.getRegion(0), nullptr); + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + { + auto region = makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10)); + lock.regions.emplace(1, region); + lock.index.add(region); + } + { + auto region = makeRegion(2, RecordKVFormat::genKey(1, 10), RecordKVFormat::genKey(1, 20)); + lock.regions.emplace(2, region); + lock.index.add(region); + } + { + auto region = makeRegion(3, RecordKVFormat::genKey(1, 30), RecordKVFormat::genKey(1, 40)); + lock.regions.emplace(3, region); + lock.index.add(region); + } + } + { + kvs.tryPersist(1); + kvs.gcRegionPersistedCache(Seconds{0}); + } + { + ASSERT_EQ(kvs.regionSize(), 3); + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 15), TiKVKey(""))); + ASSERT_EQ(mmp.size(), 2); + kvs.handleDestroy(3, ctx.getTMTContext()); + kvs.handleDestroy(3, ctx.getTMTContext()); + } + { + RegionMap mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 15), TiKVKey(""))); + ASSERT_EQ(mmp.size(), 1); + ASSERT_EQ(mmp.at(2)->id(), 2); + } + { + { + raft_cmdpb::RaftCmdRequest request; + { + auto lock_key = RecordKVFormat::genKey(1, 2333); + TiKVValue lock_value = RecordKVFormat::encodeLockCfValue(Region::DelFlag, "pk", 77, 0); + RegionBench::setupPutRequest(request.add_requests(), ColumnFamilyName::Lock, lock_key, lock_value); + auto write_key = RecordKVFormat::genKey(1, 2333, 1); + TiKVValue write_value = RecordKVFormat::encodeWriteCfValue(Region::PutFlag, 2333); + RegionBench::setupPutRequest(request.add_requests(), ColumnFamilyName::Write, write_key, write_value); + } + try + { + ASSERT_EQ(kvs.handleWriteRaftCmd(std::move(request), 1, 6, 6, ctx.getTMTContext()), + EngineStoreApplyRes::None); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "Raw TiDB PK: 800000000000091D, Prewrite ts: 2333 can not found in default cf for key: 7480000000000000FF015F728000000000FF00091D0000000000FAFFFFFFFFFFFFFFFE"); + ASSERT_EQ(kvs.getRegion(1)->dataInfo(), "[write 1 lock 1 ]"); + kvs.getRegion(1)->tryCompactionFilter(1000); + } + try + { + raft_cmdpb::RaftCmdRequest request; + { + auto key = RecordKVFormat::genKey(1, 2333, 1); + RegionBench::setupPutRequest(request.add_requests(), ColumnFamilyName::Default, key, "v1"); + } + { + // duplicate + auto key = RecordKVFormat::genKey(1, 2333, 1); + RegionBench::setupPutRequest(request.add_requests(), ColumnFamilyName::Default, key, "v1"); + } + ASSERT_EQ(kvs.handleWriteRaftCmd(std::move(request), 1, 6, 6, ctx.getTMTContext()), + EngineStoreApplyRes::None); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "Found existing key in hex: 7480000000000000FF015F728000000000FF00091D0000000000FAFFFFFFFFFFFFFFFE"); + } + ASSERT_EQ(kvs.getRegion(1)->dataInfo(), "[lock 1 default 1 ]"); + kvs.getRegion(1)->remove("default", RecordKVFormat::genKey(1, 2333, 1)); + try + { + raft_cmdpb::RaftCmdRequest request; + { + RegionBench::setupPutRequest(request.add_requests(), ColumnFamilyName::Default, std::string("k1"), "v1"); + } + ASSERT_EQ(kvs.handleWriteRaftCmd(std::move(request), 1, 6, 6, ctx.getTMTContext()), + EngineStoreApplyRes::None); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "Unexpected eof"); + } + try + { + raft_cmdpb::RaftCmdRequest request; + request.add_requests()->set_cmd_type(::raft_cmdpb::CmdType::Invalid); + ASSERT_EQ(kvs.handleWriteRaftCmd(std::move(request), 1, 10, 6, ctx.getTMTContext()), + EngineStoreApplyRes::None); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "Unsupport raft cmd Invalid"); + } + } + ASSERT_EQ(kvs.getRegion(1)->dataInfo(), "[lock 1 ]"); + { + raft_cmdpb::RaftCmdRequest request; + { + auto lock_key = RecordKVFormat::genKey(1, 2333); + TiKVValue lock_value = RecordKVFormat::encodeLockCfValue(Region::DelFlag, "pk", 77, 0); + RegionBench::setupDelRequest(request.add_requests(), ColumnFamilyName::Lock, lock_key); + } + raft_cmdpb::RaftCmdRequest first_request = request; + ASSERT_EQ(kvs.handleWriteRaftCmd(std::move(first_request), 1, 7, 6, ctx.getTMTContext()), + EngineStoreApplyRes::None); + + RegionBench::setupDelRequest(request.add_requests(), ColumnFamilyName::Write, TiKVKey("illegal key")); + // index <= appliedIndex(), ignore + raft_cmdpb::RaftCmdRequest second_request; + ASSERT_EQ(kvs.handleWriteRaftCmd(std::move(second_request), 1, 7, 6, ctx.getTMTContext()), + EngineStoreApplyRes::None); + try + { + // + request.clear_requests(); + RegionBench::setupDelRequest(request.add_requests(), ColumnFamilyName::Write, TiKVKey("illegal key")); + ASSERT_EQ(kvs.handleWriteRaftCmd(std::move(request), 1, 9, 6, ctx.getTMTContext()), + EngineStoreApplyRes::None); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "Key padding"); + } + + ASSERT_EQ(kvs.getRegion(1)->appliedIndex(), 7); + } + ASSERT_EQ(kvs.getRegion(1)->dataInfo(), "[]"); + + ASSERT_EQ( + kvs.handleWriteRaftCmd(raft_cmdpb::RaftCmdRequest{}, 8192, 7, 6, ctx.getTMTContext()), + EngineStoreApplyRes::NotFound); + } + { + kvs.handleDestroy(2, ctx.getTMTContext()); + ASSERT_EQ(kvs.regionSize(), 1); + } + { + testRaftSplit(kvs, ctx.getTMTContext()); + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{}, raft_cmdpb::AdminResponse{}, 8192, 5, 6, ctx.getTMTContext()), EngineStoreApplyRes::NotFound); + } + { + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{}, raft_cmdpb::AdminResponse{}, 8192, 5, 6, ctx.getTMTContext()), EngineStoreApplyRes::NotFound); + } + { + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + + request.mutable_compact_log(); + request.set_cmd_type(::raft_cmdpb::AdminCmdType::CompactLog); + raft_cmdpb::AdminResponse first_response = response; + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(first_response), 7, 22, 6, ctx.getTMTContext()), EngineStoreApplyRes::Persist); + + raft_cmdpb::AdminResponse second_response = response; + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(second_response), 7, 23, 6, ctx.getTMTContext()), EngineStoreApplyRes::Persist); + + request.set_cmd_type(::raft_cmdpb::AdminCmdType::ComputeHash); + raft_cmdpb::AdminResponse third_response = response; + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(third_response), 7, 24, 6, ctx.getTMTContext()), EngineStoreApplyRes::None); + + request.set_cmd_type(::raft_cmdpb::AdminCmdType::VerifyHash); + raft_cmdpb::AdminResponse fourth_response = response; + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(fourth_response), 7, 25, 6, ctx.getTMTContext()), EngineStoreApplyRes::None); + + raft_cmdpb::AdminResponse fifth_response = response; + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(fifth_response), 8192, 5, 6, ctx.getTMTContext()), EngineStoreApplyRes::NotFound); + { + kvs.setRegionCompactLogConfig(0, 0, 0); + request.set_cmd_type(::raft_cmdpb::AdminCmdType::CompactLog); + ASSERT_EQ(kvs.handleAdminRaftCmd(std::move(request), std::move(response), 7, 26, 6, ctx.getTMTContext()), EngineStoreApplyRes::Persist); + } + } + { + testRaftMergeRollback(kvs, ctx.getTMTContext()); + testRaftMerge(kvs, ctx.getTMTContext()); + } + { + testRaftChangePeer(kvs, ctx.getTMTContext()); + } + { + auto ori_snapshot_apply_method = kvs.snapshot_apply_method; + kvs.snapshot_apply_method = TiDB::SnapshotApplyMethod::DTFile_Single; + SCOPE_EXIT({ + kvs.snapshot_apply_method = ori_snapshot_apply_method; + }); + + + auto region_id = 19; + auto region = makeRegion(region_id, RecordKVFormat::genKey(1, 50), RecordKVFormat::genKey(1, 60)); + auto region_id_str = std::to_string(19); + auto & mmp = MockSSTReader::getMockSSTData(); + MockSSTReader::getMockSSTData().clear(); + MockSSTReader::Data default_kv_list; + { + default_kv_list.emplace_back(RecordKVFormat::genKey(1, 55, 5).getStr(), TiKVValue("value1").getStr()); + default_kv_list.emplace_back(RecordKVFormat::genKey(1, 58, 5).getStr(), TiKVValue("value2").getStr()); + } + mmp[MockSSTReader::Key{region_id_str, ColumnFamilyType::Default}] = std::move(default_kv_list); + std::vector sst_views; + sst_views.push_back(SSTView{ + ColumnFamilyType::Default, + BaseBuffView{region_id_str.data(), region_id_str.length()}, + }); + { + RegionMockTest mock_test(kvstore.get(), region); + + kvs.handleApplySnapshot( + region->getMetaRegion(), + 2, + SSTViewVec{sst_views.data(), sst_views.size()}, + 8, + 5, + ctx.getTMTContext()); + ASSERT_EQ(kvs.getRegion(19)->checkIndex(8), true); + try + { + kvs.handleApplySnapshot( + region->getMetaRegion(), + 2, + {}, // empty + 6, // smaller index + 5, + ctx.getTMTContext()); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "[region 19] already has newer apply-index 8 than 6, should not happen"); + } + } + + { + { + auto region = makeRegion(22, RecordKVFormat::genKey(55, 50), RecordKVFormat::genKey(55, 100)); + auto ingest_ids = kvs.preHandleSnapshotToFiles( + region, + {}, + 9, + 5, + ctx.getTMTContext()); + kvs.checkAndApplySnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); + } + try + { + auto region = makeRegion(20, RecordKVFormat::genKey(55, 50), RecordKVFormat::genKey(55, 100)); + auto ingest_ids = kvs.preHandleSnapshotToFiles( + region, + {}, + 9, + 5, + ctx.getTMTContext()); + kvs.checkAndApplySnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); // overlap, but not tombstone + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "range of region 20 is overlapped with 22, state: region { id: 22 }"); + } + + { + const auto * ori_ptr = proxy_helper->proxy_ptr.inner; + proxy_helper->proxy_ptr.inner = nullptr; + SCOPE_EXIT({ + proxy_helper->proxy_ptr.inner = ori_ptr; + }); + + try + { + auto region = makeRegion(20, RecordKVFormat::genKey(55, 50), RecordKVFormat::genKey(55, 100)); + auto ingest_ids = kvs.preHandleSnapshotToFiles( + region, + {}, + 10, + 5, + ctx.getTMTContext()); + kvs.checkAndApplySnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "getRegionLocalState meet internal error: RaftStoreProxyPtr is none"); + } + } + + { + proxy_instance->getRegion(22)->setSate(({ + raft_serverpb::RegionLocalState s; + s.set_state(::raft_serverpb::PeerState::Tombstone); + s; + })); + auto region = makeRegion(20, RecordKVFormat::genKey(55, 50), RecordKVFormat::genKey(55, 100)); + auto ingest_ids = kvs.preHandleSnapshotToFiles( + region, + {}, + 10, + 5, + ctx.getTMTContext()); + kvs.checkAndApplySnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); // overlap, tombstone, remove previous one + + auto state = proxy_helper->getRegionLocalState(8192); + ASSERT_EQ(state.state(), raft_serverpb::PeerState::Tombstone); + } + + kvs.handleDestroy(20, ctx.getTMTContext()); + } + } + + { + auto region_id = 19; + auto region_id_str = std::to_string(19); + auto & mmp = MockSSTReader::getMockSSTData(); + MockSSTReader::getMockSSTData().clear(); + MockSSTReader::Data default_kv_list; + { + default_kv_list.emplace_back(RecordKVFormat::genKey(1, 55, 5).getStr(), TiKVValue("value1").getStr()); + default_kv_list.emplace_back(RecordKVFormat::genKey(1, 58, 5).getStr(), TiKVValue("value2").getStr()); + } + mmp[MockSSTReader::Key{region_id_str, ColumnFamilyType::Default}] = std::move(default_kv_list); + + // Mock SST data for handle [star, end) + auto region = kvs.getRegion(region_id); + + RegionMockTest mock_test(kvstore.get(), region); + + { + // Mocking ingest a SST for column family "Write" + std::vector sst_views; + sst_views.push_back(SSTView{ + ColumnFamilyType::Default, + BaseBuffView{region_id_str.data(), region_id_str.length()}, + }); + kvs.handleIngestSST( + region_id, + SSTViewVec{sst_views.data(), sst_views.size()}, + 100, + 1, + ctx.getTMTContext()); + ASSERT_EQ(kvs.getRegion(19)->checkIndex(100), true); + } + } + + { + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + + request.mutable_compact_log(); + request.set_cmd_type(::raft_cmdpb::AdminCmdType::InvalidAdmin); + + try + { + kvs.handleAdminRaftCmd(std::move(request), std::move(response), 1, 110, 6, ctx.getTMTContext()); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "unsupported admin command type InvalidAdmin"); + } + } + { + // There shall be data to flush. + ASSERT_EQ(kvs.needFlushRegionData(19, ctx.getTMTContext()), true); + // Force flush until succeed only for testing. + ASSERT_EQ(kvs.tryFlushRegionData(19, true, ctx.getTMTContext(), 0, 0), true); + // Non existing region. + // Flush and CompactLog will not panic. + ASSERT_EQ(kvs.tryFlushRegionData(1999, true, ctx.getTMTContext(), 0, 0), true); + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + request.mutable_compact_log(); + request.set_cmd_type(::raft_cmdpb::AdminCmdType::CompactLog); + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(response), 1999, 22, 6, ctx.getTMTContext()), EngineStoreApplyRes::NotFound); + } +} + +TEST_F(RegionKVStoreTest, KVStoreRestore) +{ + { + KVStore & kvs = getKVS(); + { + auto store = metapb::Store{}; + store.set_id(1234); + kvs.setStore(store); + ASSERT_EQ(kvs.getStoreID(), store.id()); + } + { + ASSERT_EQ(kvs.getRegion(0), nullptr); + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + { + auto region = makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10)); + lock.regions.emplace(1, region); + lock.index.add(region); + } + { + auto region = makeRegion(2, RecordKVFormat::genKey(1, 10), RecordKVFormat::genKey(1, 20)); + lock.regions.emplace(2, region); + lock.index.add(region); + } + { + auto region = makeRegion(3, RecordKVFormat::genKey(1, 30), RecordKVFormat::genKey(1, 40)); + lock.regions.emplace(3, region); + lock.index.add(region); + } + } + kvs.tryPersist(1); + kvs.tryPersist(2); + kvs.tryPersist(3); + } + { + KVStore & kvs = reloadKVSFromDisk(); + kvs.getRegion(1); + kvs.getRegion(2); + kvs.getRegion(3); + } +} + +void test_mergeresult() +{ + ASSERT_EQ(MetaRaftCommandDelegate::computeRegionMergeResult(createRegionInfo(1, "x", ""), createRegionInfo(1000, "", "x")).source_at_left, false); + ASSERT_EQ(MetaRaftCommandDelegate::computeRegionMergeResult(createRegionInfo(1, "", "x"), createRegionInfo(1000, "x", "")).source_at_left, true); + ASSERT_EQ(MetaRaftCommandDelegate::computeRegionMergeResult(createRegionInfo(1, "x", "y"), createRegionInfo(1000, "y", "z")).source_at_left, true); + ASSERT_EQ(MetaRaftCommandDelegate::computeRegionMergeResult(createRegionInfo(1, "y", "z"), createRegionInfo(1000, "x", "y")).source_at_left, false); + + { + RegionState region_state; + bool source_at_left; + RegionState source_region_state; + + region_state.setStartKey(RecordKVFormat::genKey(1, 0)); + region_state.setEndKey(RecordKVFormat::genKey(1, 10)); + + source_region_state.setStartKey(RecordKVFormat::genKey(1, 10)); + source_region_state.setEndKey(RecordKVFormat::genKey(1, 20)); + + source_at_left = false; + + ChangeRegionStateRange(region_state, source_at_left, source_region_state); + + ASSERT_EQ(region_state.getRange()->comparableKeys().first.key, RecordKVFormat::genKey(1, 0)); + ASSERT_EQ(region_state.getRange()->comparableKeys().second.key, RecordKVFormat::genKey(1, 20)); + } + { + RegionState region_state; + bool source_at_left; + RegionState source_region_state; + + region_state.setStartKey(RecordKVFormat::genKey(2, 5)); + region_state.setEndKey(RecordKVFormat::genKey(2, 10)); + + source_region_state.setStartKey(RecordKVFormat::genKey(2, 0)); + source_region_state.setEndKey(RecordKVFormat::genKey(2, 5)); + + source_at_left = true; + + ChangeRegionStateRange(region_state, source_at_left, source_region_state); + + ASSERT_EQ(region_state.getRange()->comparableKeys().first.key, RecordKVFormat::genKey(2, 0)); + ASSERT_EQ(region_state.getRange()->comparableKeys().second.key, RecordKVFormat::genKey(2, 10)); + } +} + +TEST_F(RegionKVStoreTest, Basic) +{ + { + RegionsRangeIndex region_index; + const auto & root_map = region_index.getRoot(); + ASSERT_EQ(root_map.size(), 2); + + region_index.add(makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10))); + + ASSERT_EQ(root_map.begin()->second.region_map.size(), 0); + + region_index.add(makeRegion(2, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 3))); + region_index.add(makeRegion(3, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 1))); + + auto res = region_index.findByRangeOverlap(RegionRangeKeys::makeComparableKeys(TiKVKey(""), TiKVKey(""))); + ASSERT_EQ(res.size(), 3); + + region_index.add(makeRegion(4, RecordKVFormat::genKey(1, 1), RecordKVFormat::genKey(1, 4))); + + ASSERT_EQ(root_map.size(), 7); + + res = region_index.findByRangeOverlap(RegionRangeKeys::makeComparableKeys(TiKVKey(""), TiKVKey(""))); + ASSERT_EQ(res.size(), 4); + + res = region_index.findByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 1), TiKVKey(""))); + ASSERT_EQ(res.size(), 3); + + res = region_index.findByRangeOverlap( + RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 2), RecordKVFormat::genKey(1, 5))); + ASSERT_EQ(res.size(), 3); + ASSERT_TRUE(res.find(1) != res.end()); + ASSERT_TRUE(res.find(2) != res.end()); + ASSERT_TRUE(res.find(4) != res.end()); + + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 1), RecordKVFormat::genKey(1, 4)), 4); + res = region_index.findByRangeOverlap(RegionRangeKeys::makeComparableKeys(TiKVKey(""), TiKVKey(""))); + ASSERT_EQ(res.size(), 3); + + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 1)), 3); + res = region_index.findByRangeOverlap(RegionRangeKeys::makeComparableKeys(TiKVKey(""), TiKVKey(""))); + ASSERT_EQ(res.size(), 2); + + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 3)), 2); + res = region_index.findByRangeOverlap(RegionRangeKeys::makeComparableKeys(TiKVKey(""), TiKVKey(""))); + ASSERT_EQ(res.size(), 1); + + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10)), 1); + res = region_index.findByRangeOverlap(RegionRangeKeys::makeComparableKeys(TiKVKey(""), TiKVKey(""))); + ASSERT_TRUE(res.empty()); + + ASSERT_EQ(root_map.size(), 2); + } + + { + RegionsRangeIndex region_index; + const auto & root_map = region_index.getRoot(); + try + { + region_index.remove(RegionRangeKeys::makeComparableKeys(TiKVKey(), TiKVKey()), 1); + assert(false); + } + catch (Exception & e) + { + const auto & res = e.message(); + ASSERT_EQ(res, "void DB::RegionsRangeIndex::remove(const DB::RegionRange &, DB::RegionID): not found region 1"); + } + + region_index.add(makeRegion(2, RecordKVFormat::genKey(1, 3), RecordKVFormat::genKey(1, 5))); + try + { + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 4), RecordKVFormat::genKey(1, 5)), 2); + assert(false); + } + catch (Exception & e) + { + const auto & res = e.message(); + ASSERT_EQ(res, "void DB::RegionsRangeIndex::remove(const DB::RegionRange &, DB::RegionID): not found start key"); + } + + try + { + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 3), RecordKVFormat::genKey(1, 4)), 2); + assert(false); + } + catch (Exception & e) + { + const auto & res = e.message(); + ASSERT_EQ(res, "void DB::RegionsRangeIndex::remove(const DB::RegionRange &, DB::RegionID): not found end key"); + } + + try + { + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 3), RecordKVFormat::genKey(1, 3)), 2); + assert(false); + } + catch (Exception & e) + { + const auto & res = e.message(); + ASSERT_EQ(res, "void DB::RegionsRangeIndex::remove(const DB::RegionRange &, DB::RegionID): range of region 2 is empty"); + } + + try + { + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 3), TiKVKey()), 2); + assert(false); + } + catch (Exception & e) + { + const auto & res = e.message(); + ASSERT_EQ(res, "void DB::RegionsRangeIndex::remove(const DB::RegionRange &, DB::RegionID): not found region 2"); + } + + region_index.clear(); + + try + { + region_index.add(makeRegion(6, RecordKVFormat::genKey(6, 6), RecordKVFormat::genKey(6, 6))); + assert(false); + } + catch (Exception & e) + { + const auto & res = e.message(); + std::string tar = "Illegal region range, should not happen"; + ASSERT(res.size() > tar.size()); + ASSERT_EQ(res.substr(0, tar.size()), tar); + } + + region_index.clear(); + + region_index.add(makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 1))); + region_index.add(makeRegion(2, RecordKVFormat::genKey(1, 1), RecordKVFormat::genKey(1, 2))); + region_index.add(makeRegion(3, RecordKVFormat::genKey(1, 2), RecordKVFormat::genKey(1, 3))); + + ASSERT_EQ(root_map.size(), 6); + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 2), RecordKVFormat::genKey(1, 3)), 3); + ASSERT_EQ(root_map.size(), 5); + + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 1)), 1); + ASSERT_EQ(root_map.size(), 4); + + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 1), RecordKVFormat::genKey(1, 2)), 2); + ASSERT_EQ(root_map.size(), 2); + } + { + test_mergeresult(); + } +} + +} // namespace tests +} // namespace DB diff --git a/dbms/src/Storages/Transaction/tests/gtest_region_block_reader.cpp b/dbms/src/Storages/Transaction/tests/gtest_region_block_reader.cpp index 6314142d500..34d9d5fa00d 100644 --- a/dbms/src/Storages/Transaction/tests/gtest_region_block_reader.cpp +++ b/dbms/src/Storages/Transaction/tests/gtest_region_block_reader.cpp @@ -1,8 +1,42 @@ +<<<<<<< HEAD #include #include #include #include "RowCodecTestUtils.h" +======= +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) using TableInfo = TiDB::TableInfo; @@ -49,8 +83,18 @@ class RegionBlockReaderTestFixture : public ::testing::Test std::vector pk_fields; for (size_t i = 0; i < table_info.columns.size(); i++) { +<<<<<<< HEAD if (!table_info.columns[i].hasPriKeyFlag()) value_fields.emplace_back(fields[i]); +======= + if (table_info.is_common_handle || table_info.pk_is_handle) + { + if (table_info.columns[i].hasPriKeyFlag()) + key_encode_fields.emplace_back(fields[i]); + else + value_encode_fields.emplace_back(fields[i]); + } +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) else pk_fields.emplace_back(fields[i]); } @@ -59,7 +103,7 @@ class RegionBlockReaderTestFixture : public ::testing::Test WriteBufferFromOwnString pk_buf; if (table_info.is_common_handle) { - auto & primary_index_info = table_info.getPrimaryIndexInfo(); + const auto & primary_index_info = table_info.getPrimaryIndexInfo(); for (size_t i = 0; i < primary_index_info.idx_cols.size(); i++) { auto idx = column_name_columns_index_map[primary_index_info.idx_cols[i].name]; @@ -123,7 +167,14 @@ class RegionBlockReaderTestFixture : public ::testing::Test } else { +<<<<<<< HEAD ASSERT_EQ((*column_element.column)[row], fields_map.at(column_element.column_id)); +======= + if (fields_map.count(column_element.column_id) > 0) + ASSERT_FIELD_EQ((*column_element.column)[row], fields_map.at(column_element.column_id)) << gen_error_log(); + else + LOG_INFO(logger, "ignore value check for new added column, id={}, name={}", column_element.column_id, column_element.name); +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) } } } @@ -227,7 +278,22 @@ class RegionBlockReaderTestFixture : public ::testing::Test } }; +<<<<<<< HEAD TEST_F(RegionBlockReaderTestFixture, PKIsNotHandle) +======= +String bytesFromHexString(std::string_view hex_str) +{ + assert(hex_str.size() % 2 == 0); + String bytes(hex_str.size() / 2, '\x00'); + for (size_t i = 0; i < bytes.size(); ++i) + { + bytes[i] = unhex2(hex_str.data() + i * 2); + } + return bytes; +} + +TEST_F(RegionBlockReaderTest, PKIsNotHandle) +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) { auto [table_info, fields] = getNormalTableInfoFields({EXTRA_HANDLE_COLUMN_ID}, false); encodeColumns(table_info, fields, RowEncodeVersion::RowV2); @@ -335,4 +401,222 @@ TEST_F(RegionBlockReaderTestFixture, InvalidNULLRowV1) ASSERT_ANY_THROW(decodeAndCheckColumns(new_decoding_schema, true)); } + +TEST_F(RegionBlockReaderTest, MissingPrimaryKeyColumnRowV2) +try +{ + // Mock a table + // `t_case` { + // column3 varchar(32) NOT NULL, + // column4 varchar(20) DEFAULT NULL, + // primary key (`column3`) /*T![clustered_index] NONCLUSTERED */ + // -- _tidb_rowid bigint, // hidden handle + // } + auto [table_info, fields] = getTableInfoAndFields(/*pk_col_ids*/ {3}, false, ColumnIDValue(3, "hello"), ColumnIDValueNull(4)); + ASSERT_EQ(table_info.is_common_handle, false); + ASSERT_EQ(table_info.pk_is_handle, false); + ASSERT_TRUE(table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_FALSE(table_info.getColumnInfo(4).hasPriKeyFlag()); + + // FIXME: actually TiDB won't encode the "NULL" for column4 into value + // but now the `RowEncoderV2` does not support this, we use `RegionBlockReaderTest::ReadFromRegion` + // to test that. + encodeColumns(table_info, fields, RowEncodeVersion::RowV2); + + // Mock re-create the primary key index with "column4" that contains `NULL` value + // `t_case` { + // column3 varchar(32) NOT NULL, + // column4 varchar(20) NOT NULL, + // primary key (`column3`, `column4`) /*T![clustered_index] NONCLUSTERED */ + // -- _tidb_rowid bigint, // hidden handle + // } + TableInfo new_table_info; + std::tie(new_table_info, std::ignore) = getTableInfoAndFields(/*pk_col_ids*/ {3, 4}, false, ColumnIDValueNull(3), ColumnIDValueNull(4)); + ASSERT_EQ(new_table_info.is_common_handle, false); + ASSERT_EQ(new_table_info.pk_is_handle, false); + ASSERT_TRUE(new_table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_TRUE(new_table_info.getColumnInfo(4).hasPriKeyFlag()); + + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + // FIXME: actually we need to decode the block with force_decode=true, see the + // comments before `encodeColumns` + EXPECT_TRUE(decodeAndCheckColumns(new_decoding_schema, true)); + // // force_decode=false can not decode because there are + // // missing value for column with primary key flag. + // EXPECT_FALSE(decodeAndCheckColumns(new_decoding_schema, false)); + // // force_decode=true, decode ok. + // EXPECT_TRUE(decodeAndCheckColumns(new_decoding_schema, true)); +} +CATCH + +TEST_F(RegionBlockReaderTest, MissingPrimaryKeyColumnRowV1) +try +{ + // Mock a table + // `t_case` { + // column3 varchar(32) NOT NULL, + // column4 varchar(20) DEFAULT NULL, + // primary key (`column3`) /*T![clustered_index] NONCLUSTERED */ + // -- _tidb_rowid bigint, // hidden handle + // } + auto [table_info, fields] = getTableInfoAndFields(/*pk_col_ids*/ {3}, false, ColumnIDValue(3, "hello"), ColumnIDValueNull(4)); + ASSERT_EQ(table_info.is_common_handle, false); + ASSERT_EQ(table_info.pk_is_handle, false); + ASSERT_TRUE(table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_FALSE(table_info.getColumnInfo(4).hasPriKeyFlag()); + + encodeColumns(table_info, fields, RowEncodeVersion::RowV1); + + // Mock re-create the primary key index with "column4" that contains `NULL` value + // `t_case` { + // column3 varchar(32) NOT NULL, + // column4 varchar(20) NOT NULL, + // primary key (`column3`, `column4`) /*T![clustered_index] NONCLUSTERED */ + // -- _tidb_rowid bigint, // hidden handle + // } + TableInfo new_table_info; + std::tie(new_table_info, std::ignore) = getTableInfoAndFields(/*pk_col_ids*/ {3, 4}, false, ColumnIDValueNull(3), ColumnIDValueNull(4)); + ASSERT_EQ(new_table_info.is_common_handle, false); + ASSERT_EQ(new_table_info.pk_is_handle, false); + ASSERT_TRUE(new_table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_TRUE(new_table_info.getColumnInfo(4).hasPriKeyFlag()); + + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + EXPECT_TRUE(decodeAndCheckColumns(new_decoding_schema, false)); +} +CATCH + +TEST_F(RegionBlockReaderTest, NewMissingPrimaryKeyColumnRowV2) +try +{ + // Mock a table + // `t_case` { + // column3 varchar(32) NOT NULL, + // primary key (`column3`) /*T![clustered_index] NONCLUSTERED */ + // -- _tidb_rowid bigint, // hidden handle + // } + auto [table_info, fields] = getTableInfoAndFields(/*pk_col_ids*/ {3}, false, ColumnIDValue(3, "hello")); + ASSERT_EQ(table_info.is_common_handle, false); + ASSERT_EQ(table_info.pk_is_handle, false); + ASSERT_TRUE(table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_ANY_THROW(table_info.getColumnInfo(4)); // not exist + + encodeColumns(table_info, fields, RowEncodeVersion::RowV2); + + // Mock re-create the primary key index with new-added "column4" + // `t_case` { + // column3 varchar(32) NOT NULL, + // column4 varchar(20) NOT NULL, + // primary key (`column3`, `column4`) /*T![clustered_index] NONCLUSTERED */ + // -- _tidb_rowid bigint, // hidden handle + // } + TableInfo new_table_info; + std::tie(new_table_info, std::ignore) = getTableInfoAndFields(/*pk_col_ids*/ {3, 4}, false, ColumnIDValueNull(3), ColumnIDValueNull(4)); + ASSERT_EQ(new_table_info.is_common_handle, false); + ASSERT_EQ(new_table_info.pk_is_handle, false); + ASSERT_TRUE(new_table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_TRUE(new_table_info.getColumnInfo(4).hasPriKeyFlag()); + + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + // force_decode=false can not decode because there are + // missing value for column with primary key flag. + EXPECT_FALSE(decodeAndCheckColumns(new_decoding_schema, false)); + // force_decode=true, decode ok. + EXPECT_TRUE(decodeAndCheckColumns(new_decoding_schema, true)); +} +CATCH + +TEST_F(RegionBlockReaderTest, NewMissingPrimaryKeyColumnRowV1) +try +{ + // Mock a table + // `t_case` { + // column3 varchar(32) NOT NULL, + // primary key (`column3`) /*T![clustered_index] NONCLUSTERED */ + // -- _tidb_rowid bigint, // hidden handle + // } + auto [table_info, fields] = getTableInfoAndFields(/*pk_col_ids*/ {3}, false, ColumnIDValue(3, "hello")); + ASSERT_EQ(table_info.is_common_handle, false); + ASSERT_EQ(table_info.pk_is_handle, false); + ASSERT_TRUE(table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_ANY_THROW(table_info.getColumnInfo(4)); // not exist + + encodeColumns(table_info, fields, RowEncodeVersion::RowV1); + + // Mock re-create the primary key index with new-added "column4" + // `t_case` { + // column3 varchar(32) NOT NULL, + // column4 varchar(20) NOT NULL, + // primary key (`column3`, `column4`) /*T![clustered_index] NONCLUSTERED */ + // -- _tidb_rowid bigint, // hidden handle + // } + TableInfo new_table_info; + std::tie(new_table_info, std::ignore) = getTableInfoAndFields(/*pk_col_ids*/ {3, 4}, false, ColumnIDValueNull(3), ColumnIDValueNull(4)); + ASSERT_EQ(new_table_info.is_common_handle, false); + ASSERT_EQ(new_table_info.pk_is_handle, false); + ASSERT_TRUE(new_table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_TRUE(new_table_info.getColumnInfo(4).hasPriKeyFlag()); + + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + // force_decode=false can not decode because there are + // missing value for column with primary key flag. + EXPECT_FALSE(decodeAndCheckColumns(new_decoding_schema, false)); + // force_decode=true, decode ok. + EXPECT_TRUE(decodeAndCheckColumns(new_decoding_schema, true)); +} +CATCH + +TEST_F(RegionBlockReaderTest, ReadFromRegion) +try +{ + TableInfo table_info(R"({"cols":[ + {"comment":"","default":null,"default_bit":null,"id":1,"name":{"L":"case_no","O":"case_no"},"offset":0,"origin_default":null,"state":5,"type":{"Charset":"utf8mb4","Collate":"utf8mb4_bin","Decimal":0,"Elems":null,"Flag":4099,"Flen":32,"Tp":15}}, + {"comment":"","default":null,"default_bit":null,"id":2,"name":{"L":"p","O":"p"},"offset":1,"origin_default":null,"state":5,"type":{"Charset":"utf8mb4","Collate":"utf8mb4_bin","Decimal":0,"Elems":null,"Flag":0,"Flen":12,"Tp":15}}, + {"comment":"","default":null,"default_bit":null,"id":3,"name":{"L":"source","O":"source"},"offset":2,"origin_default":"","state":5,"type":{"Charset":"utf8mb4","Collate":"utf8mb4_bin","Decimal":0,"Elems":null,"Flag":4099,"Flen":20,"Tp":15}} + ],"comment":"","id":77,"index_info":[],"is_common_handle":false,"name":{"L":"t_case","O":"t_case"},"partition":null,"pk_is_handle":false,"schema_version":62,"state":5,"tiflash_replica":{"Count":1},"update_timestamp":435984541435559947})"); + + RegionID region_id = 4; + String region_start_key(bytesFromHexString("7480000000000000FF445F720000000000FA")); + String region_end_key(bytesFromHexString("7480000000000000FF4500000000000000F8")); + auto region = makeRegion(region_id, region_start_key, region_end_key); + // the hex kv dump from SSTFile + std::vector> kvs = { + {"7480000000000000FF4D5F728000000000FF0000010000000000FAF9F3125EFCF3FFFE", "4C8280809290B4BB8606"}, + {"7480000000000000FF4D5F728000000000FF0000010000000000FAF9F3126548ABFFFC", "508180D0BAABB3BB8606760A80000100000001010031"}, + {"7480000000000000FF4D5F728000000000FF0000020000000000FAF9F3125EFCF3FFFE", "4C8280809290B4BB8606"}, + {"7480000000000000FF4D5F728000000000FF0000020000000000FAF9F3126548ABFFFC", "508180D0BAABB3BB8606760A80000100000001010032"}, + {"7480000000000000FF4D5F728000000000FF0000030000000000FAF9F3125EFCF3FFFE", "4C8280809290B4BB8606"}, + {"7480000000000000FF4D5F728000000000FF0000030000000000FAF9F3126548ABFFFC", "508180D0BAABB3BB8606760A80000100000001010033"}, + {"7480000000000000FF4D5F728000000000FF0000040000000000FAF9F3125EFCF3FFFE", "4C8280809290B4BB8606"}, + {"7480000000000000FF4D5F728000000000FF0000040000000000FAF9F3126548ABFFFC", "508180D0BAABB3BB8606760A80000100000001010034"}, + }; + for (const auto & [k, v] : kvs) + { + region->insert(ColumnFamilyType::Write, TiKVKey(bytesFromHexString(k)), TiKVValue(bytesFromHexString(v))); + } + + auto data_list_read = ReadRegionCommitCache(region, true); + ASSERT_TRUE(data_list_read.has_value()); + + auto decoding_schema = getDecodingStorageSchemaSnapshot(table_info); + { + // force_decode=false can not decode because there are + // missing value for column with primary key flag. + auto reader = RegionBlockReader(decoding_schema); + Block res_block = createBlockSortByColumnID(decoding_schema); + EXPECT_FALSE(reader.read(res_block, *data_list_read, false)); + } + { + // force_decode=true can decode the block + auto reader = RegionBlockReader(decoding_schema); + Block res_block = createBlockSortByColumnID(decoding_schema); + EXPECT_TRUE(reader.read(res_block, *data_list_read, true)); + res_block.checkNumberOfRows(); + EXPECT_EQ(res_block.rows(), 4); + ASSERT_COLUMN_EQ(res_block.getByName("case_no"), createColumn({"1", "2", "3", "4"})); + } +} +CATCH + + } // namespace DB::tests diff --git a/tests/fullstack-test2/ddl/alter_pk.test b/tests/fullstack-test2/ddl/alter_pk.test index 3541fcb270a..5859e21cf46 100644 --- a/tests/fullstack-test2/ddl/alter_pk.test +++ b/tests/fullstack-test2/ddl/alter_pk.test @@ -43,3 +43,56 @@ mysql> alter table test.t drop primary key; │ f │ Nullable(Int32) │ │ │ │ _tidb_rowid │ Int64 │ │ │ └─────────────┴─────────────────┴──────────────┴────────────────────┘ + +# issue 5859, case 1 +mysql> drop table if exists test.t_case; +## create table with `source` is nullable +## insert some data and left `source` to be empty +mysql> create table test.t_case (`case_no` varchar(32) not null,`source` varchar(20) default null,`p` varchar(12) DEFAULT NULL,primary key (`case_no`)); +mysql> insert into test.t_case(case_no) values ("1"), ("2"), ("3"), ("4"); + +## drop the primary key, fill the `source` to be non-empty +## add new primary key with case_no and source +mysql> alter table test.t_case drop primary key; +mysql> update test.t_case set `source` = '' where `source` is NULL; +mysql> alter table test.t_case add primary key (`case_no`, `source`); + +## send the snapshot data to tiflash +mysql> alter table test.t_case set tiflash replica 1; +func> wait_table test t_case +mysql> select case_no,p,source from test.t_case; ++---------+------+--------+ +| case_no | p | source | ++---------+------+--------+ +| 1 | NULL | | +| 2 | NULL | | +| 3 | NULL | | +| 4 | NULL | | ++---------+------+--------+ + + +# issue 5859, case 2 +mysql> drop table if exists test.t_case; +## create table with `case_no` +mysql> create table test.t_case (`case_no` varchar(32) not null,`p` varchar(12) DEFAULT NULL,primary key (`case_no`)); +mysql> insert into test.t_case(case_no) values ("1"), ("2"), ("3"), ("4"); + +mysql> alter table test.t_case add column `source` varchar(20) not null; +## drop the primary key, add new primary key with case_no and source +mysql> alter table test.t_case drop primary key; +mysql> alter table test.t_case add primary key (`case_no`, `source`); + +## send the snapshot data to tiflash +mysql> alter table test.t_case set tiflash replica 1; +func> wait_table test t_case +mysql> select case_no,p,source from test.t_case; ++---------+------+--------+ +| case_no | p | source | ++---------+------+--------+ +| 1 | NULL | | +| 2 | NULL | | +| 3 | NULL | | +| 4 | NULL | | ++---------+------+--------+ + +mysql> drop table if exists test.t_case;