From ed6ef008d9644fddcfd79a78e0c0e9ad0e8ebb22 Mon Sep 17 00:00:00 2001 From: JaySon Date: Fri, 16 Sep 2022 21:42:59 +0800 Subject: [PATCH] This is an automated cherry-pick of #5879 Signed-off-by: ti-chi-bot --- dbms/src/Columns/ColumnAggregateFunction.cpp | 9 + dbms/src/Columns/ColumnArray.cpp | 9 + dbms/src/Columns/ColumnDecimal.cpp | 9 + dbms/src/Columns/ColumnFixedString.cpp | 9 + dbms/src/Columns/ColumnString.cpp | 9 + dbms/src/Columns/ColumnVector.cpp | 9 + .../DeltaMerge/SSTFilesToBlockInputStream.h | 12 + .../Storages/Transaction/ApplySnapshot.cpp | 45 +- .../Storages/Transaction/PartitionStreams.cpp | 46 +- .../Storages/Transaction/PartitionStreams.h | 12 + .../Transaction/RegionBlockReader.cpp | 48 + .../Storages/Transaction/RegionBlockReader.h | 20 +- dbms/src/Storages/Transaction/RowCodec.cpp | 346 ++++ dbms/src/Storages/Transaction/RowCodec.h | 16 + .../Transaction/tests/RowCodecTestUtils.h | 359 ++++ .../Transaction/tests/gtest_kvstore.cpp | 1500 +++++++++++++++++ .../tests/gtest_region_block_reader.cpp | 640 +++++++ tests/fullstack-test2/ddl/alter_pk.test | 53 + 18 files changed, 3125 insertions(+), 26 deletions(-) create mode 100644 dbms/src/Storages/Transaction/tests/RowCodecTestUtils.h create mode 100644 dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp create mode 100644 dbms/src/Storages/Transaction/tests/gtest_region_block_reader.cpp diff --git a/dbms/src/Columns/ColumnAggregateFunction.cpp b/dbms/src/Columns/ColumnAggregateFunction.cpp index ea00aee2e5a..a0d066f0d9e 100644 --- a/dbms/src/Columns/ColumnAggregateFunction.cpp +++ b/dbms/src/Columns/ColumnAggregateFunction.cpp @@ -84,11 +84,20 @@ void ColumnAggregateFunction::insertRangeFrom(const IColumn & from, size_t start const ColumnAggregateFunction & from_concrete = static_cast(from); if (start + length > from_concrete.getData().size()) +<<<<<<< HEAD throw Exception("Parameters start = " + toString(start) + ", length = " + toString(length) + " are out of bound in ColumnAggregateFunction::insertRangeFrom method" " (data.size() = " + toString(from_concrete.getData().size()) + ").", +======= + throw Exception( + fmt::format( + "Parameters are out of bound in ColumnAggregateFunction::insertRangeFrom method, start={}, length={}, from.size()={}", + start, + length, + from_concrete.getData().size()), +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) ErrorCodes::PARAMETER_OUT_OF_BOUND); if (!empty() && src.get() != &from_concrete) diff --git a/dbms/src/Columns/ColumnArray.cpp b/dbms/src/Columns/ColumnArray.cpp index 19adac44b62..cac97d52107 100644 --- a/dbms/src/Columns/ColumnArray.cpp +++ b/dbms/src/Columns/ColumnArray.cpp @@ -413,7 +413,16 @@ void ColumnArray::insertRangeFrom(const IColumn & src, size_t start, size_t leng const ColumnArray & src_concrete = static_cast(src); if (start + length > src_concrete.getOffsets().size()) +<<<<<<< HEAD throw Exception("Parameter out of bound in ColumnArray::insertRangeFrom method.", +======= + throw Exception( + fmt::format( + "Parameters are out of bound in ColumnArray::insertRangeFrom method, start={}, length={}, src.size()={}", + start, + length, + src_concrete.getOffsets().size()), +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) ErrorCodes::PARAMETER_OUT_OF_BOUND); size_t nested_offset = src_concrete.offsetAt(start); diff --git a/dbms/src/Columns/ColumnDecimal.cpp b/dbms/src/Columns/ColumnDecimal.cpp index c6c0e880f54..aa9357e414a 100644 --- a/dbms/src/Columns/ColumnDecimal.cpp +++ b/dbms/src/Columns/ColumnDecimal.cpp @@ -222,8 +222,17 @@ void ColumnDecimal::insertRangeFrom(const IColumn & src, size_t start, size_t const ColumnDecimal & src_vec = static_cast(src); if (start + length > src_vec.data.size()) +<<<<<<< HEAD throw Exception("Parameters start = " + toString(start) + ", length = " + toString(length) + " are out of bound in ColumnDecimal::insertRangeFrom method (data.size() = " + toString(src_vec.data.size()) + ").", +======= + throw Exception( + fmt::format( + "Parameters are out of bound in ColumnDecimal::insertRangeFrom method, start={}, length={}, src.size()={}", + start, + length, + src_vec.data.size()), +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) ErrorCodes::PARAMETER_OUT_OF_BOUND); size_t old_size = data.size(); diff --git a/dbms/src/Columns/ColumnFixedString.cpp b/dbms/src/Columns/ColumnFixedString.cpp index 378beff4f0a..c9964b2d685 100644 --- a/dbms/src/Columns/ColumnFixedString.cpp +++ b/dbms/src/Columns/ColumnFixedString.cpp @@ -172,10 +172,19 @@ void ColumnFixedString::insertRangeFrom(const IColumn & src, size_t start, size_ const ColumnFixedString & src_concrete = static_cast(src); if (start + length > src_concrete.size()) +<<<<<<< HEAD throw Exception("Parameters start = " + toString(start) + ", length = " + toString(length) + " are out of bound in ColumnFixedString::insertRangeFrom method" " (size() = " + toString(src_concrete.size()) + ").", +======= + throw Exception( + fmt::format( + "Parameters are out of bound in ColumnFixedString::insertRangeFrom method, start={}, length={}, src.size()={}", + start, + length, + src_concrete.size()), +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) ErrorCodes::PARAMETER_OUT_OF_BOUND); size_t old_size = chars.size(); diff --git a/dbms/src/Columns/ColumnString.cpp b/dbms/src/Columns/ColumnString.cpp index b5ecfea33ec..281f363d649 100644 --- a/dbms/src/Columns/ColumnString.cpp +++ b/dbms/src/Columns/ColumnString.cpp @@ -72,7 +72,16 @@ void ColumnString::insertRangeFrom(const IColumn & src, size_t start, size_t len const ColumnString & src_concrete = static_cast(src); if (start + length > src_concrete.offsets.size()) +<<<<<<< HEAD throw Exception("Parameter out of bound in IColumnString::insertRangeFrom method.", +======= + throw Exception( + fmt::format( + "Parameters are out of bound in ColumnString::insertRangeFrom method, start={}, length={}, src.size()={}", + start, + length, + src_concrete.size()), +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) ErrorCodes::PARAMETER_OUT_OF_BOUND); size_t nested_offset = src_concrete.offsetAt(start); diff --git a/dbms/src/Columns/ColumnVector.cpp b/dbms/src/Columns/ColumnVector.cpp index 821b28e8040..47f5067f372 100644 --- a/dbms/src/Columns/ColumnVector.cpp +++ b/dbms/src/Columns/ColumnVector.cpp @@ -181,10 +181,19 @@ void ColumnVector::insertRangeFrom(const IColumn & src, size_t start, size_t const ColumnVector & src_vec = static_cast(src); if (start + length > src_vec.data.size()) +<<<<<<< HEAD throw Exception("Parameters start = " + toString(start) + ", length = " + toString(length) + " are out of bound in ColumnVector::insertRangeFrom method" " (data.size() = " + toString(src_vec.data.size()) + ").", +======= + throw Exception( + fmt::format( + "Parameters are out of bound in ColumnVector::insertRangeFrom method, start={}, length={}, src.size()={}", + start, + length, + src_vec.data.size()), +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) ErrorCodes::PARAMETER_OUT_OF_BOUND); size_t old_size = data.size(); diff --git a/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h b/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h index af9183c867d..229010f077f 100644 --- a/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h @@ -41,6 +41,7 @@ using BoundedSSTFilesToBlockInputStreamPtr = std::shared_ptr>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) String getName() const override { return "SSTFilesToBlockInputStream"; } diff --git a/dbms/src/Storages/Transaction/ApplySnapshot.cpp b/dbms/src/Storages/Transaction/ApplySnapshot.cpp index 8d0d4ffd2b4..db6be5d9e83 100644 --- a/dbms/src/Storages/Transaction/ApplySnapshot.cpp +++ b/dbms/src/Storages/Transaction/ApplySnapshot.cpp @@ -19,6 +19,11 @@ #include #include #include +<<<<<<< HEAD +======= +#include +#include +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) #include @@ -258,6 +263,7 @@ void KVStore::onSnapshot(const RegionPtrWrap & new_region_wrap, RegionPtr old_re } } +<<<<<<< HEAD extern RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr &, Context &); /// `preHandleSnapshotToBlock` read data from SSTFiles and predoced the data as a block @@ -321,10 +327,22 @@ RegionPreDecodeBlockDataPtr KVStore::preHandleSnapshotToBlock( return cache; } +======= +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) std::vector KVStore::preHandleSnapshotToFiles( RegionPtr new_region, const SSTViewVec snaps, uint64_t index, uint64_t term, TMTContext & tmt) { - return preHandleSSTsToDTFiles(new_region, snaps, index, term, DM::FileConvertJobType::ApplySnapshot, tmt); + std::vector ingest_ids; + try + { + ingest_ids = preHandleSSTsToDTFiles(new_region, snaps, index, term, DM::FileConvertJobType::ApplySnapshot, tmt); + } + catch (DB::Exception & e) + { + e.addMessage(fmt::format("(while preHandleSnapshot region_id={}, index={}, term={})", new_region->id(), index, term)); + e.rethrow(); + } + return ingest_ids; } /// `preHandleSSTsToDTFiles` read data from SSTFiles and generate DTFile(s) for commited data @@ -339,7 +357,15 @@ std::vector KVStore::preHandleSSTsToDTFiles( // Use failpoint to change the expected_block_size for some test cases fiu_do_on(FailPoints::force_set_sst_to_dtfile_block_size, { expected_block_size = 3; }); +<<<<<<< HEAD PageIds ids; +======= + Stopwatch watch; + SCOPE_EXIT({ GET_METRIC(tiflash_raft_command_duration_seconds, type_apply_snapshot_predecode).Observe(watch.elapsedSeconds()); }); + + PageIds generated_ingest_ids; + TableID physical_table_id = InvalidTableID; +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) while (true) { // If any schema changes is detected during decoding SSTs to DTFiles, we need to cancel and recreate DTFiles with @@ -364,6 +390,7 @@ std::vector KVStore::preHandleSSTsToDTFiles( /* ignore_cache= */ false, context.getSettingsRef().safe_point_update_interval_seconds); } + physical_table_id = storage->getTableInfo().id; // Read from SSTs and refine the boundary of blocks output to DTFiles auto sst_stream = std::make_shared( @@ -375,7 +402,7 @@ std::vector KVStore::preHandleSSTsToDTFiles( stream->writePrefix(); stream->write(); stream->writeSuffix(); - ids = stream->ingestIds(); + generated_ingest_ids = stream->ingestIds(); (void)table_drop_lock; // the table should not be dropped during ingesting file break; @@ -420,12 +447,13 @@ std::vector KVStore::preHandleSSTsToDTFiles( else { // Other unrecoverable error, throw + e.addMessage(fmt::format("physical_table_id={}", physical_table_id)); throw; } } } - return ids; + return generated_ingest_ids; } template @@ -594,7 +622,16 @@ RegionPtr KVStore::handleIngestSSTByDTFile(const RegionPtr & region, const SSTVi } // Decode the KV pairs in ingesting SST into DTFiles - PageIds ingest_ids = preHandleSSTsToDTFiles(tmp_region, snaps, index, term, DM::FileConvertJobType::IngestSST, tmt); + PageIds ingest_ids; + try + { + ingest_ids = preHandleSSTsToDTFiles(tmp_region, snaps, index, term, DM::FileConvertJobType::IngestSST, tmt); + } + catch (DB::Exception & e) + { + e.addMessage(fmt::format("(while handleIngestSST region_id={}, index={}, term={})", tmp_region->id(), index, term)); + e.rethrow(); + } // If `ingest_ids` is empty, ingest SST won't write delete_range for ingest region, it is safe to // ignore the step of calling `ingestFiles` diff --git a/dbms/src/Storages/Transaction/PartitionStreams.cpp b/dbms/src/Storages/Transaction/PartitionStreams.cpp index 4af07e0d61d..3f33b78b523 100644 --- a/dbms/src/Storages/Transaction/PartitionStreams.cpp +++ b/dbms/src/Storages/Transaction/PartitionStreams.cpp @@ -238,31 +238,26 @@ std::variant ReadRegionCommitCache(const RegionPtr & region, bool lock_region = true) +std::optional ReadRegionCommitCache(const RegionPtr & region, bool lock_region) { auto scanner = region->createCommittedScanner(lock_region); /// Some sanity checks for region meta. - { - if (region->isPendingRemove()) - return std::nullopt; - } + if (region->isPendingRemove()) + return std::nullopt; /// Read raw KVs from region cache. - { - // Shortcut for empty region. - if (!scanner.hasNext()) - return std::nullopt; + // Shortcut for empty region. + if (!scanner.hasNext()) + return std::nullopt; - RegionDataReadInfoList data_list_read; - data_list_read.reserve(scanner.writeMapSize()); - - do - { - data_list_read.emplace_back(scanner.next()); - } while (scanner.hasNext()); - return data_list_read; - } + RegionDataReadInfoList data_list_read; + data_list_read.reserve(scanner.writeMapSize()); + do + { + data_list_read.emplace_back(scanner.next()); + } while (scanner.hasNext()); + return data_list_read; } void RemoveRegionCommitCache(const RegionPtr & region, const RegionDataReadInfoList & data_list_read, bool lock_region = true) @@ -401,7 +396,7 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio std::optional data_list_read = std::nullopt; try { - data_list_read = ReadRegionCommitCache(region); + data_list_read = ReadRegionCommitCache(region, true); if (!data_list_read) return nullptr; } @@ -567,10 +562,17 @@ static Block sortColumnsBySchemaSnap(Block && ori, const DM::ColumnDefines & sch /// The return value is a block that store the committed data scanned and removed from `region`. /// The columns of returned block is sorted by `schema_snap`. Block GenRegionBlockDataWithSchema(const RegionPtr & region, // +<<<<<<< HEAD const DecodingStorageSchemaSnapshot & schema_snap, Timestamp gc_safepoint, bool force_decode, TMTContext & tmt) +======= + const DecodingStorageSchemaSnapshotConstPtr & schema_snap, + Timestamp gc_safepoint, + bool force_decode, + TMTContext & /* */) +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) { // In 5.0.1, feature `compaction filter` is enabled by default. Under such feature tikv will do gc in write & default cf individually. // If some rows were updated and add tiflash replica, tiflash store may receive region snapshot with unmatched data in write & default cf sst files. @@ -578,17 +580,19 @@ Block GenRegionBlockDataWithSchema(const RegionPtr & region, // { gc_safepoint = 10000000; }); // Mock a GC safepoint for testing compaction filter region->tryCompactionFilter(gc_safepoint); - std::optional data_list_read = std::nullopt; - data_list_read = ReadRegionCommitCache(region); + std::optional data_list_read = ReadRegionCommitCache(region, true); Block res_block; // No committed data, just return if (!data_list_read) return res_block; +<<<<<<< HEAD auto context = tmt.getContext(); auto metrics = context.getTiFlashMetrics(); +======= +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) { Stopwatch watch; // Compare schema_snap with current schema, throw exception if changed. diff --git a/dbms/src/Storages/Transaction/PartitionStreams.h b/dbms/src/Storages/Transaction/PartitionStreams.h index f1406104b6b..1b087ff0d7a 100644 --- a/dbms/src/Storages/Transaction/PartitionStreams.h +++ b/dbms/src/Storages/Transaction/PartitionStreams.h @@ -2,6 +2,12 @@ #include #include +<<<<<<< HEAD +======= +#include +#include +#include +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) #include namespace DB @@ -11,6 +17,7 @@ class Region; using RegionPtr = std::shared_ptr; class StorageDeltaMerge; +<<<<<<< HEAD /** * A snapshot of the table structure of a DeltaTree storage. We use it to decode Raft snapshot * data with a consistent table structure. @@ -38,6 +45,11 @@ struct DecodingStorageSchemaSnapshot }; std::tuple, DecodingStorageSchemaSnapshot> // +======= +std::optional ReadRegionCommitCache(const RegionPtr & region, bool lock_region); + +std::tuple, DecodingStorageSchemaSnapshotConstPtr> // +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt); Block GenRegionBlockDataWithSchema(const RegionPtr & region, // diff --git a/dbms/src/Storages/Transaction/RegionBlockReader.cpp b/dbms/src/Storages/Transaction/RegionBlockReader.cpp index fc407fc7dc3..744d0e7bb15 100644 --- a/dbms/src/Storages/Transaction/RegionBlockReader.cpp +++ b/dbms/src/Storages/Transaction/RegionBlockReader.cpp @@ -1,14 +1,21 @@ #include +<<<<<<< HEAD #include +======= +#include +#include +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) #include #include #include #include #include +#include #include #include #include #include +#include #include @@ -161,6 +168,7 @@ void setPKVersionDel(ColumnUInt8 & delmark_col, else column_map.getMutableColumnPtr(pk_column_ids[0])->insert(Field(static_cast(pk))); } +<<<<<<< HEAD } template @@ -185,6 +193,9 @@ bool setColumnValues(ColumnUInt8 & delmark_col, DecodedRecordData decoded_data(visible_column_to_read_lut.size()); std::unique_ptr tmp_row; // decode row into Field list here for temporary use if necessary. +======= + +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) size_t index = 0; for (const auto & [pk, write_type, commit_ts, value_ptr] : data_list) { @@ -230,6 +241,7 @@ bool setColumnValues(ColumnUInt8 & delmark_col, } else { +<<<<<<< HEAD const TiKVValue & value = *value_ptr; const DecodedRow * row = nullptr; { @@ -262,6 +274,37 @@ bool setColumnValues(ColumnUInt8 & delmark_col, const auto & column_info = table_info.columns[id_to_idx.second]; if (auto it = findByColumnID(id_to_idx.first, unknown_fields); it != unknown_fields.end()) +======= + // Parse column value from encoded value + if (!appendRowToBlock(*value_ptr, column_ids_iter, read_column_ids.end(), block, next_column_pos, schema_snapshot, force_decode)) + return false; + } + } + + /// set extra handle column and pk columns from encoded key if need + if constexpr (pk_type != TMTPKType::STRING) + { + // For non-common handle, extra handle column's type is always Int64. + // We need to copy the handle value from encoded key. + const auto handle_value = static_cast(pk); + auto * raw_extra_column = const_cast((block.getByPosition(extra_handle_column_pos)).column.get()); + static_cast(raw_extra_column)->getData().push_back(handle_value); + // For pk_is_handle == true, we need to decode the handle value from encoded key, and insert + // to the specify column + if (!pk_column_ids.empty()) + { + auto * raw_pk_column = const_cast((block.getByPosition(pk_pos_map.at(pk_column_ids[0]))).column.get()); + if constexpr (pk_type == TMTPKType::INT64) + static_cast(raw_pk_column)->getData().push_back(handle_value); + else if constexpr (pk_type == TMTPKType::UINT64) + static_cast(raw_pk_column)->getData().push_back(UInt64(handle_value)); + else + { + // The pk_type must be Int32/UInt32 or more narrow type + // so cannot tell its' exact type here, just use `insert(Field)` + raw_pk_column->insert(Field(handle_value)); + if (unlikely(raw_pk_column->getInt(index) != handle_value)) +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) { if (!row->unknown_fields.with_codec_flag) { @@ -270,7 +313,12 @@ bool setColumnValues(ColumnUInt8 & delmark_col, } else { +<<<<<<< HEAD decoded_data.push_back(it); +======= + throw Exception(fmt::format("Detected overflow value when decoding pk column, type={} handle={}", raw_pk_column->getName(), handle_value), + ErrorCodes::LOGICAL_ERROR); +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) } continue; } diff --git a/dbms/src/Storages/Transaction/RegionBlockReader.h b/dbms/src/Storages/Transaction/RegionBlockReader.h index 19c03e52d80..3dc2dc2121b 100644 --- a/dbms/src/Storages/Transaction/RegionBlockReader.h +++ b/dbms/src/Storages/Transaction/RegionBlockReader.h @@ -1,5 +1,6 @@ #pragma once +<<<<<<< HEAD #include #include #include @@ -16,6 +17,13 @@ class IManageableStorage; using ManageableStoragePtr = std::shared_ptr; struct ColumnsDescription; +======= +#include +#include + +namespace DB +{ +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) class Block; class RegionScanFilter @@ -110,13 +118,23 @@ class RegionBlockReader : private boost::noncopyable /// Read `data_list` as a block. /// - /// On decode error, i.e. column number/type mismatch, will do force apply schema, + /// On decode error, i.e. column number/type mismatch, caller should trigger a schema-sync and retry with `force_decode=True`, /// i.e. add/remove/cast unknown/missing/type-mismatch column if force_decode is true, otherwise return empty block and false. /// Moreover, exception will be thrown if we see fatal decode error meanwhile `force_decode` is true. /// +<<<<<<< HEAD /// `RegionBlockReader::read` is the common routine used by both 'flush' and 'read' processes of TXN engine (Delta-Tree, TXN-MergeTree), /// each of which will use carefully adjusted 'start_ts' and 'force_decode' with appropriate error handling/retry to get what they want. std::tuple read(const Names & column_names_to_read, RegionDataReadInfoList & data_list, bool force_decode); +======= + /// `RegionBlockReader::read` is the common routine used by both 'flush' and 'read' processes of Delta-Tree engine, + /// which will use carefully adjusted 'force_decode' with appropriate error handling/retry to get what they want. + bool read(Block & block, const RegionDataReadInfoList & data_list, bool force_decode); + +private: + template + bool readImpl(Block & block, const RegionDataReadInfoList & data_list, bool force_decode); +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) /// Read all columns from `data_list` as a block. inline std::tuple read(RegionDataReadInfoList & data_list, bool force_decode) diff --git a/dbms/src/Storages/Transaction/RowCodec.cpp b/dbms/src/Storages/Transaction/RowCodec.cpp index 5b4567d100b..052860d8199 100644 --- a/dbms/src/Storages/Transaction/RowCodec.cpp +++ b/dbms/src/Storages/Transaction/RowCodec.cpp @@ -524,4 +524,350 @@ void encodeRowV2(const TiDB::TableInfo & table_info, const std::vector & RowEncoderV2(table_info, fields).encode(ss); } +<<<<<<< HEAD +======= +// pre-declar block +template +bool appendRowV2ToBlockImpl( + const TiKVValue::Base & raw_value, + SortedColumnIDWithPosConstIter column_ids_iter, + SortedColumnIDWithPosConstIter column_ids_iter_end, + Block & block, + size_t block_column_pos, + const ColumnInfos & column_infos, + ColumnID pk_handle_id, + bool ignore_pk_if_absent, + bool force_decode); + +bool appendRowV1ToBlock( + const TiKVValue::Base & raw_value, + SortedColumnIDWithPosConstIter column_ids_iter, + SortedColumnIDWithPosConstIter column_ids_iter_end, + Block & block, + size_t block_column_pos, + const ColumnInfos & column_infos, + ColumnID pk_handle_id, + bool ignore_pk_if_absent, + bool force_decode); +// pre-declar block end + +bool appendRowToBlock( + const TiKVValue::Base & raw_value, + SortedColumnIDWithPosConstIter column_ids_iter, + SortedColumnIDWithPosConstIter column_ids_iter_end, + Block & block, + size_t block_column_pos, + const DecodingStorageSchemaSnapshotConstPtr & schema_snapshot, + bool force_decode) +{ + const ColumnInfos & column_infos = schema_snapshot->column_infos; + // when pk is handle, we need skip pk column when decoding value + ColumnID pk_handle_id = InvalidColumnID; + if (schema_snapshot->pk_is_handle) + { + pk_handle_id = schema_snapshot->pk_column_ids[0]; + } + + // For pk_is_handle table, the column with primary key flag is decoded from encoded key instead of encoded value. + // For common handle table, the column with primary key flag is (usually) decoded from encoded key. We skip + // filling the columns with primary key flags inside this method. + // For other table (non-clustered, use hidden _tidb_rowid as handle), the column with primary key flags could be + // changed, we need to fill missing column with default value. + const bool ignore_pk_if_absent = schema_snapshot->is_common_handle || schema_snapshot->pk_is_handle; + + switch (static_cast(raw_value[0])) + { + case static_cast(RowCodecVer::ROW_V2): + { + auto row_flag = readLittleEndian(&raw_value[1]); + bool is_big = row_flag & RowV2::BigRowMask; + return is_big ? appendRowV2ToBlockImpl(raw_value, column_ids_iter, column_ids_iter_end, block, block_column_pos, column_infos, pk_handle_id, ignore_pk_if_absent, force_decode) + : appendRowV2ToBlockImpl(raw_value, column_ids_iter, column_ids_iter_end, block, block_column_pos, column_infos, pk_handle_id, ignore_pk_if_absent, force_decode); + } + default: + return appendRowV1ToBlock(raw_value, column_ids_iter, column_ids_iter_end, block, block_column_pos, column_infos, pk_handle_id, ignore_pk_if_absent, force_decode); + } +} + +inline bool addDefaultValueToColumnIfPossible(const ColumnInfo & column_info, Block & block, size_t block_column_pos, bool ignore_pk_if_absent, bool force_decode) +{ + // We consider a missing column could be safely filled with NULL, unless it has not default value and is NOT NULL. + // This could saves lots of unnecessary schema syncs for old data with a newer schema that has newly added columns. + + if (column_info.hasPriKeyFlag()) + { + // For clustered index or pk_is_handle, if the pk column does not exists, it can still be decoded from the key + if (ignore_pk_if_absent) + return true; + + assert(!ignore_pk_if_absent); + if (!force_decode) + return false; + // Else non-clustered index, and not pk_is_handle, it could be a row encoded by older schema, + // we need to fill the column wich has primary key flag with default value. + // fallthrough to fill default value when force_decode + } + + if (column_info.hasNoDefaultValueFlag() && column_info.hasNotNullFlag()) + { + if (!force_decode) + return false; + // Else the row does not contain this "not null" / "no default value" column, + // it could be a row encoded by older schema. + // fallthrough to fill default value when force_decode + } + // not null or has no default value, tidb will fill with specific value. + auto * raw_column = const_cast((block.getByPosition(block_column_pos)).column.get()); + raw_column->insert(column_info.defaultValueToField()); + return true; +} + +template +bool appendRowV2ToBlockImpl( + const TiKVValue::Base & raw_value, + SortedColumnIDWithPosConstIter column_ids_iter, + SortedColumnIDWithPosConstIter column_ids_iter_end, + Block & block, + size_t block_column_pos, + const ColumnInfos & column_infos, + ColumnID pk_handle_id, + bool ignore_pk_if_absent, + bool force_decode) +{ + size_t cursor = 2; // Skip the initial codec ver and row flag. + size_t num_not_null_columns = decodeUInt(cursor, raw_value); + size_t num_null_columns = decodeUInt(cursor, raw_value); + std::vector not_null_column_ids; + std::vector null_column_ids; + std::vector value_offsets; + decodeUInts::ColumnIDType>(cursor, raw_value, num_not_null_columns, not_null_column_ids); + decodeUInts::ColumnIDType>(cursor, raw_value, num_null_columns, null_column_ids); + decodeUInts::ValueOffsetType>(cursor, raw_value, num_not_null_columns, value_offsets); + size_t values_start_pos = cursor; + size_t idx_not_null = 0; + size_t idx_null = 0; + // Merge ordered not null/null columns to keep order. + while (idx_not_null < not_null_column_ids.size() || idx_null < null_column_ids.size()) + { + if (column_ids_iter == column_ids_iter_end) + { + // extra column + return force_decode; + } + + bool is_null; + if (idx_not_null < not_null_column_ids.size() && idx_null < null_column_ids.size()) + is_null = not_null_column_ids[idx_not_null] > null_column_ids[idx_null]; + else + is_null = idx_null < null_column_ids.size(); + + auto next_datum_column_id = is_null ? null_column_ids[idx_null] : not_null_column_ids[idx_not_null]; + const auto next_column_id = column_ids_iter->first; + if (next_column_id > next_datum_column_id) + { + // The next column id to read is bigger than the column id of next datum in encoded row. + // It means this is the datum of extra column. May happen when reading after dropping + // a column. + if (!force_decode) + return false; + // Ignore the extra column and continue to parse other datum + if (is_null) + idx_null++; + else + idx_not_null++; + } + else if (next_column_id < next_datum_column_id) + { + // The next column id to read is less than the column id of next datum in encoded row. + // It means this is the datum of missing column. May happen when reading after adding + // a column. + // Fill with default value and continue to read data for next column id. + const auto & column_info = column_infos[column_ids_iter->second]; + if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, ignore_pk_if_absent, force_decode)) + return false; + column_ids_iter++; + block_column_pos++; + } + else + { + // If pk_handle_id is a valid column id, then it means the table's pk_is_handle is true + // we can just ignore the pk value encoded in value part + if (unlikely(next_column_id == pk_handle_id)) + { + column_ids_iter++; + block_column_pos++; + if (is_null) + { + idx_null++; + } + else + { + idx_not_null++; + } + continue; + } + + // Parse the datum. + auto * raw_column = const_cast((block.getByPosition(block_column_pos)).column.get()); + const auto & column_info = column_infos[column_ids_iter->second]; + if (is_null) + { + if (!raw_column->isColumnNullable()) + { + if (!force_decode) + { + return false; + } + else + { + throw Exception("Detected invalid null when decoding data of column " + column_info.name + " with column type " + raw_column->getName(), + ErrorCodes::LOGICAL_ERROR); + } + } + // ColumnNullable::insertDefault just insert a null value + raw_column->insertDefault(); + idx_null++; + } + else + { + size_t start = idx_not_null ? value_offsets[idx_not_null - 1] : 0; + size_t length = value_offsets[idx_not_null] - start; + if (!raw_column->decodeTiDBRowV2Datum(values_start_pos + start, raw_value, length, force_decode)) + return false; + idx_not_null++; + } + column_ids_iter++; + block_column_pos++; + } + } + while (column_ids_iter != column_ids_iter_end) + { + if (column_ids_iter->first != pk_handle_id) + { + const auto & column_info = column_infos[column_ids_iter->second]; + if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, ignore_pk_if_absent, force_decode)) + return false; + } + column_ids_iter++; + block_column_pos++; + } + return true; +} + +using TiDB::DatumFlat; +bool appendRowV1ToBlock( + const TiKVValue::Base & raw_value, + SortedColumnIDWithPosConstIter column_ids_iter, + SortedColumnIDWithPosConstIter column_ids_iter_end, + Block & block, + size_t block_column_pos, + const ColumnInfos & column_infos, + ColumnID pk_handle_id, + bool ignore_pk_if_absent, + bool force_decode) +{ + size_t cursor = 0; + std::map decoded_fields; + while (cursor < raw_value.size()) + { + Field f = DecodeDatum(cursor, raw_value); + if (f.isNull()) + break; + ColumnID col_id = f.get(); + decoded_fields.emplace(col_id, DecodeDatum(cursor, raw_value)); + } + if (cursor != raw_value.size()) + throw Exception(std::string(__PRETTY_FUNCTION__) + ": cursor is not end, remaining: " + raw_value.substr(cursor), + ErrorCodes::LOGICAL_ERROR); + + auto decoded_field_iter = decoded_fields.begin(); + while (decoded_field_iter != decoded_fields.end()) + { + if (column_ids_iter == column_ids_iter_end) + { + // extra column + return force_decode; + } + + auto next_field_column_id = decoded_field_iter->first; + if (column_ids_iter->first > next_field_column_id) + { + // extra column + if (!force_decode) + return false; + decoded_field_iter++; + } + else if (column_ids_iter->first < next_field_column_id) + { + const auto & column_info = column_infos[column_ids_iter->second]; + if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, ignore_pk_if_absent, force_decode)) + return false; + column_ids_iter++; + block_column_pos++; + } + else + { + // if pk_handle_id is a valid column id, then it means the table's pk_is_handle is true + // we can just ignore the pk value encoded in value part + if (unlikely(column_ids_iter->first == pk_handle_id)) + { + decoded_field_iter++; + column_ids_iter++; + block_column_pos++; + continue; + } + + auto * raw_column = const_cast((block.getByPosition(block_column_pos)).column.get()); + const auto & column_info = column_infos[column_ids_iter->second]; + DatumFlat datum(decoded_field_iter->second, column_info.tp); + const Field & unflattened = datum.field(); + if (datum.overflow(column_info)) + { + // Overflow detected, fatal if force_decode is true, + // as schema being newer and narrow shouldn't happen. + // Otherwise return false to outer, outer should sync schema and try again. + if (force_decode) + { + throw Exception("Detected overflow when decoding data " + std::to_string(unflattened.get()) + " of column " + + column_info.name + " with column " + raw_column->getName(), + ErrorCodes::LOGICAL_ERROR); + } + + return false; + } + if (datum.invalidNull(column_info)) + { + // Null value with non-null type detected, fatal if force_decode is true, + // as schema being newer and with invalid null shouldn't happen. + // Otherwise return false to outer, outer should sync schema and try again. + if (force_decode) + { + throw Exception("Detected invalid null when decoding data " + std::to_string(unflattened.get()) + + " of column " + column_info.name + " with type " + raw_column->getName(), + ErrorCodes::LOGICAL_ERROR); + } + + return false; + } + raw_column->insert(unflattened); + decoded_field_iter++; + column_ids_iter++; + block_column_pos++; + } + } + while (column_ids_iter != column_ids_iter_end) + { + if (column_ids_iter->first != pk_handle_id) + { + const auto & column_info = column_infos[column_ids_iter->second]; + if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, ignore_pk_if_absent, force_decode)) + return false; + } + column_ids_iter++; + block_column_pos++; + } + return true; +} + +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) } // namespace DB diff --git a/dbms/src/Storages/Transaction/RowCodec.h b/dbms/src/Storages/Transaction/RowCodec.h index f5ea640bb2e..07412772e9a 100644 --- a/dbms/src/Storages/Transaction/RowCodec.h +++ b/dbms/src/Storages/Transaction/RowCodec.h @@ -8,6 +8,7 @@ namespace DB { +<<<<<<< HEAD using TiDB::ColumnInfo; using TiDB::TableInfo; @@ -31,5 +32,20 @@ Field decodeUnknownColumnV2(const Field & unknown, const ColumnInfo & column_inf /// The following two encode functions are used for testing. void encodeRowV1(const TiDB::TableInfo & table_info, const std::vector & fields, WriteBuffer & ss); void encodeRowV2(const TiDB::TableInfo & table_info, const std::vector & fields, WriteBuffer & ss); +======= +/// The following two encode functions are used for testing. +void encodeRowV1(const TiDB::TableInfo & table_info, const std::vector & fields, WriteBuffer & ss); +void encodeRowV2(const TiDB::TableInfo & table_info, const std::vector & fields, WriteBuffer & ss); + +bool appendRowToBlock( + const TiKVValue::Base & raw_value, + SortedColumnIDWithPosConstIter column_ids_iter, + SortedColumnIDWithPosConstIter column_ids_iter_end, + Block & block, + size_t block_column_pos, + const DecodingStorageSchemaSnapshotConstPtr & schema_snapshot, + bool force_decode); + +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) } // namespace DB diff --git a/dbms/src/Storages/Transaction/tests/RowCodecTestUtils.h b/dbms/src/Storages/Transaction/tests/RowCodecTestUtils.h new file mode 100644 index 00000000000..8d7034de6b9 --- /dev/null +++ b/dbms/src/Storages/Transaction/tests/RowCodecTestUtils.h @@ -0,0 +1,359 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once +#include +#include +#include +#include +#include + +namespace DB::tests +{ +using DM::ColumnDefine; +using DM::ColumnDefines; +using TiDB::ColumnInfo; +using TiDB::TableInfo; +using ColumnIDs = std::vector; + +template +struct ColumnTP +{ +}; +template <> +struct ColumnTP +{ + static const auto tp = TiDB::TypeTiny; +}; +template <> +struct ColumnTP +{ + static const auto tp = TiDB::TypeShort; +}; +template <> +struct ColumnTP +{ + static const auto tp = TiDB::TypeLong; +}; +template <> +struct ColumnTP +{ + static const auto tp = TiDB::TypeLongLong; +}; +template <> +struct ColumnTP +{ + static const auto tp = TiDB::TypeTiny; +}; +template <> +struct ColumnTP +{ + static const auto tp = TiDB::TypeShort; +}; +template <> +struct ColumnTP +{ + static const auto tp = TiDB::TypeLong; +}; +template <> +struct ColumnTP +{ + static const auto tp = TiDB::TypeLongLong; +}; +template <> +struct ColumnTP +{ + static const auto tp = TiDB::TypeFloat; +}; +template <> +struct ColumnTP +{ + static const auto tp = TiDB::TypeDouble; +}; +template <> +struct ColumnTP +{ + static const auto tp = TiDB::TypeString; +}; +template <> +struct ColumnTP> +{ + static const auto tp = TiDB::TypeNewDecimal; +}; +template <> +struct ColumnTP> +{ + static const auto tp = TiDB::TypeNewDecimal; +}; +template <> +struct ColumnTP> +{ + static const auto tp = TiDB::TypeNewDecimal; +}; +template <> +struct ColumnTP> +{ + static const auto tp = TiDB::TypeNewDecimal; +}; + +inline String getTestColumnName(ColumnID id) +{ + return "column" + std::to_string(id); +} + +template +ColumnInfo getColumnInfo(ColumnID id) +{ + ColumnInfo column_info; + column_info.id = id; + column_info.tp = ColumnTP::tp; + column_info.name = getTestColumnName(id); + if constexpr (std::is_unsigned_v) + column_info.setUnsignedFlag(); + if constexpr (!nullable) + column_info.setNotNullFlag(); + return column_info; +} + +template +struct ColumnIDValue +{ + static constexpr bool value_is_null = is_null; + using ValueType = std::decay_t; + ColumnIDValue(ColumnID id_, const T & value_) + : id(id_) + , value(value_) + {} + ColumnIDValue(ColumnID id_, T && value_) + : id(id_) + , value(std::move(value_)) + {} + ColumnID id; + ValueType value; +}; + +template +struct ColumnIDValue +{ + static constexpr bool value_is_null = true; + using ValueType = std::decay_t; + explicit ColumnIDValue(ColumnID id_) + : id(id_) + {} + ColumnID id; +}; + +template +using ColumnIDValueNull = ColumnIDValue; + +using OrderedColumnInfoFields = std::map>; + +template +constexpr bool IsDecimalFieldType = false; +template <> +inline constexpr bool IsDecimalFieldType> = true; +template <> +inline constexpr bool IsDecimalFieldType> = true; +template <> +inline constexpr bool IsDecimalFieldType> = true; +template <> +inline constexpr bool IsDecimalFieldType> = true; + +template +void getTableInfoFieldsInternal(OrderedColumnInfoFields & column_info_fields, Type && column_id_value) +{ + using DecayType = std::decay_t; + using ValueType = typename DecayType::ValueType; + using NearestType = typename NearestFieldType::Type; + if constexpr (DecayType::value_is_null) + { + ColumnInfo column_info = getColumnInfo(column_id_value.id); + // create non zero flen and decimal to avoid error when creating decimal type + if constexpr (IsDecimalFieldType) + { + column_info.flen = 1; + column_info.decimal = 1; + } + column_info_fields.emplace(column_id_value.id, std::make_tuple(column_info, Field())); + } + else + { + if constexpr (IsDecimalFieldType) + { + ColumnInfo column_info = getColumnInfo(column_id_value.id); + auto field = static_cast(std::move(column_id_value.value)); + column_info.flen = field.getPrec(); + column_info.decimal = field.getScale(); + column_info_fields.emplace(column_id_value.id, std::make_tuple(column_info, field)); + } + else + { + column_info_fields.emplace(column_id_value.id, + std::make_tuple(getColumnInfo(column_id_value.id), + static_cast(std::move(column_id_value.value)))); + } + } +} + +template +void getTableInfoFieldsInternal(OrderedColumnInfoFields & column_info_fields, Type && first, Rest &&... rest) +{ + getTableInfoFieldsInternal(column_info_fields, first); + getTableInfoFieldsInternal(column_info_fields, std::forward(rest)...); +} + +template +std::pair> getTableInfoAndFields(ColumnIDs pk_col_ids, bool is_common_handle, Types &&... column_value_ids) +{ + OrderedColumnInfoFields column_info_fields; + getTableInfoFieldsInternal(column_info_fields, std::forward(column_value_ids)...); + TableInfo table_info; + std::vector fields; + bool pk_is_handle = pk_col_ids.size() == 1 && pk_col_ids[0] != ::DB::TiDBPkColumnID; + + for (auto & column_info_field : column_info_fields) + { + auto & column = std::get<0>(column_info_field.second); + auto & field = std::get<1>(column_info_field.second); + if (std::find(pk_col_ids.begin(), pk_col_ids.end(), column.id) != pk_col_ids.end()) + { + column.setPriKeyFlag(); + if (column.tp != TiDB::TypeLong && column.tp != TiDB::TypeTiny && column.tp != TiDB::TypeLongLong && column.tp != TiDB::TypeShort && column.tp != TiDB::TypeInt24) + { + pk_is_handle = false; + } + } + table_info.columns.emplace_back(std::move(column)); + fields.emplace_back(std::move(field)); + } + + table_info.pk_is_handle = pk_is_handle; + table_info.is_common_handle = is_common_handle; + if (is_common_handle) + { + table_info.is_common_handle = true; + // TiFlash maintains the column name of primary key + // for common handle table + TiDB::IndexInfo pk_index_info; + pk_index_info.is_primary = true; + pk_index_info.idx_name = "PRIMARY"; + pk_index_info.is_unique = true; + for (auto pk_col_id : pk_col_ids) + { + TiDB::IndexColumnInfo index_column_info; + for (auto & column : table_info.columns) + { + if (column.id == pk_col_id) + { + index_column_info.name = column.name; + break; + } + } + pk_index_info.idx_cols.emplace_back(index_column_info); + } + table_info.index_infos.emplace_back(pk_index_info); + } + + return std::make_pair(std::move(table_info), std::move(fields)); +} + +inline DecodingStorageSchemaSnapshotConstPtr getDecodingStorageSchemaSnapshot(const TableInfo & table_info) +{ + ColumnDefines store_columns; + if (table_info.is_common_handle) + { + DM::ColumnDefine extra_handle_column{EXTRA_HANDLE_COLUMN_ID, EXTRA_HANDLE_COLUMN_NAME, EXTRA_HANDLE_COLUMN_STRING_TYPE}; + store_columns.emplace_back(extra_handle_column); + } + else + { + DM::ColumnDefine extra_handle_column{EXTRA_HANDLE_COLUMN_ID, EXTRA_HANDLE_COLUMN_NAME, EXTRA_HANDLE_COLUMN_INT_TYPE}; + store_columns.emplace_back(extra_handle_column); + } + store_columns.emplace_back(VERSION_COLUMN_ID, VERSION_COLUMN_NAME, VERSION_COLUMN_TYPE); + store_columns.emplace_back(TAG_COLUMN_ID, TAG_COLUMN_NAME, TAG_COLUMN_TYPE); + ColumnID handle_id = EXTRA_HANDLE_COLUMN_ID; + for (const auto & column_info : table_info.columns) + { + if (table_info.pk_is_handle) + { + if (column_info.hasPriKeyFlag()) + handle_id = column_info.id; + } + store_columns.emplace_back(column_info.id, column_info.name, DB::getDataTypeByColumnInfo(column_info)); + } + + if (handle_id != EXTRA_HANDLE_COLUMN_ID) + { + auto iter = std::find_if(store_columns.begin(), store_columns.end(), [&](const ColumnDefine & cd) { return cd.id == handle_id; }); + return std::make_shared(std::make_shared(store_columns), table_info, *iter, /* decoding_schema_version_ */ 1); + } + else + { + return std::make_shared(std::make_shared(store_columns), table_info, store_columns[0], /* decoding_schema_version_ */ 1); + } +} + +template +size_t valueStartPos(const TableInfo & table_info) +{ + return 1 + 1 + 2 + 2 + (is_big ? 8 : 3) * table_info.columns.size(); +} + +inline Block decodeRowToBlock(const String & row_value, DecodingStorageSchemaSnapshotConstPtr decoding_schema) +{ + const auto & sorted_column_id_with_pos = decoding_schema->sorted_column_id_with_pos; + auto iter = sorted_column_id_with_pos.begin(); + const size_t value_column_num = 3; + // skip first three column which is EXTRA_HANDLE_COLUMN, VERSION_COLUMN, TAG_COLUMN + for (size_t i = 0; i < value_column_num; i++) + iter++; + + Block block = createBlockSortByColumnID(decoding_schema); + appendRowToBlock(row_value, iter, sorted_column_id_with_pos.end(), block, value_column_num, decoding_schema, true); + + // remove first three column + for (size_t i = 0; i < value_column_num; i++) + block.erase(0); + return block; +} + +template +std::tuple getValueLengthByRowV2(const T & v) +{ + using NearestType = typename NearestFieldType::Type; + auto [table_info, fields] = getTableInfoAndFields({EXTRA_HANDLE_COLUMN_ID}, false, ColumnIDValue(1, v)); + auto decoding_schema = getDecodingStorageSchemaSnapshot(table_info); + WriteBufferFromOwnString ss; + encodeRowV2(table_info, fields, ss); + auto encoded = ss.str(); + Block block = decodeRowToBlock(encoded, decoding_schema); + return std::make_tuple(static_cast(std::move((*block.getByPosition(0).column)[0].template safeGet())), + encoded.size() - valueStartPos(table_info)); +} + +template +T getValueByRowV1(const T & v) +{ + using NearestType = typename NearestFieldType::Type; + auto [table_info, fields] = getTableInfoAndFields({EXTRA_HANDLE_COLUMN_ID}, false, ColumnIDValue(1, v)); + auto decoding_schema = getDecodingStorageSchemaSnapshot(table_info); + WriteBufferFromOwnString ss; + encodeRowV1(table_info, fields, ss); + auto encoded = ss.str(); + Block block = decodeRowToBlock(encoded, decoding_schema); + return static_cast(std::move((*block.getByPosition(0).column)[0].template safeGet())); +} + +} // namespace DB::tests diff --git a/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp b/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp new file mode 100644 index 00000000000..7d90ac520f3 --- /dev/null +++ b/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp @@ -0,0 +1,1500 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace DB +{ +namespace RegionBench +{ +extern void setupPutRequest(raft_cmdpb::Request *, const std::string &, const TiKVKey &, const TiKVValue &); +extern void setupDelRequest(raft_cmdpb::Request *, const std::string &, const TiKVKey &); +} // namespace RegionBench + +extern void RemoveRegionCommitCache(const RegionPtr & region, const RegionDataReadInfoList & data_list_read, bool lock_region = true); +extern void CheckRegionForMergeCmd(const raft_cmdpb::AdminResponse & response, const RegionState & region_state); +extern void ChangeRegionStateRange(RegionState & region_state, bool source_at_left, const RegionState & source_region_state); + +namespace tests +{ + +// TODO: Use another way to workaround calling the private methods on KVStore +class RegionKVStoreTest : public ::testing::Test +{ +public: + RegionKVStoreTest() + { + test_path = TiFlashTestEnv::getTemporaryPath("/region_kvs_test"); + } + + static void SetUpTestCase() {} + + void SetUp() override + { + // clean data and create path pool instance + path_pool = createCleanPathPool(test_path); + + reloadKVSFromDisk(); + + proxy_instance = std::make_unique(); + proxy_helper = std::make_unique(MockRaftStoreProxy::SetRaftStoreProxyFFIHelper( + RaftStoreProxyPtr{proxy_instance.get()})); + proxy_instance->init(100); + + kvstore->restore(*path_pool, proxy_helper.get()); + } + + void TearDown() override {} + +protected: + KVStore & getKVS() { return *kvstore; } + KVStore & reloadKVSFromDisk() + { + kvstore.reset(); + auto & global_ctx = TiFlashTestEnv::getGlobalContext(); + kvstore = std::make_unique(global_ctx, TiDB::SnapshotApplyMethod::DTFile_Directory); + // only recreate kvstore and restore data from disk, don't recreate proxy instance + kvstore->restore(*path_pool, proxy_helper.get()); + return *kvstore; + } + +protected: + static void testRaftSplit(KVStore & kvs, TMTContext & tmt); + static void testRaftMerge(KVStore & kvs, TMTContext & tmt); + static void testRaftChangePeer(KVStore & kvs, TMTContext & tmt); + static void testRaftMergeRollback(KVStore & kvs, TMTContext & tmt); + + static std::unique_ptr createCleanPathPool(const String & path) + { + // Drop files on disk + Poco::File file(path); + if (file.exists()) + file.remove(true); + file.createDirectories(); + + auto & global_ctx = TiFlashTestEnv::getGlobalContext(); + auto path_capacity = global_ctx.getPathCapacity(); + auto provider = global_ctx.getFileProvider(); + // Create a PathPool instance on the clean directory + Strings main_data_paths{path}; + return std::make_unique(main_data_paths, main_data_paths, Strings{}, path_capacity, provider); + } + + std::string test_path; + + std::unique_ptr path_pool; + std::unique_ptr kvstore; + + std::unique_ptr proxy_instance; + std::unique_ptr proxy_helper; +}; + +TEST_F(RegionKVStoreTest, NewProxy) +{ + auto ctx = TiFlashTestEnv::getGlobalContext(); + + KVStore & kvs = getKVS(); + { + auto store = metapb::Store{}; + store.set_id(1234); + kvs.setStore(store); + ASSERT_EQ(kvs.getStoreID(), store.id()); + } + { + ASSERT_EQ(kvs.getRegion(0), nullptr); + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + { + auto region = makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10)); + lock.regions.emplace(1, region); + lock.index.add(region); + } + } + { + kvs.tryPersist(1); + kvs.gcRegionPersistedCache(Seconds{0}); + } + { + // test CompactLog + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + auto region = kvs.getRegion(1); + region->markCompactLog(); + kvs.setRegionCompactLogConfig(100000, 1000, 1000); + request.mutable_compact_log(); + request.set_cmd_type(::raft_cmdpb::AdminCmdType::CompactLog); + // CompactLog always returns true now, even if we can't do a flush. + // We use a tryFlushData to pre-filter. + ASSERT_EQ(kvs.handleAdminRaftCmd(std::move(request), std::move(response), 1, 5, 1, ctx.getTMTContext()), EngineStoreApplyRes::Persist); + + // Filter + ASSERT_EQ(kvs.tryFlushRegionData(1, false, ctx.getTMTContext(), 0, 0), false); + } +} + +TEST_F(RegionKVStoreTest, ReadIndex) +{ + auto ctx = TiFlashTestEnv::getGlobalContext(); + + // start mock proxy in other thread + std::atomic_bool over{false}; + auto proxy_runner = std::thread([&]() { + proxy_instance->testRunNormal(over); + }); + KVStore & kvs = getKVS(); + ASSERT_EQ(kvs.getProxyHelper(), proxy_helper.get()); + + { + ASSERT_EQ(kvs.getRegion(0), nullptr); + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + { + auto region = makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10), kvs.getProxyHelper()); + lock.regions.emplace(1, region); + lock.index.add(region); + } + { + auto region = makeRegion(2, RecordKVFormat::genKey(1, 10), RecordKVFormat::genKey(1, 20), kvs.getProxyHelper()); + lock.regions.emplace(2, region); + lock.index.add(region); + } + { + auto region = makeRegion(3, RecordKVFormat::genKey(1, 30), RecordKVFormat::genKey(1, 40), kvs.getProxyHelper()); + lock.regions.emplace(3, region); + lock.index.add(region); + } + } + { + ASSERT_EQ(kvs.read_index_worker_manager, nullptr); + { + auto region = kvs.getRegion(1); + auto req = GenRegionReadIndexReq(*region, 8); + try + { + auto resp = kvs.batchReadIndex({req}, 100); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "`fn_handle_batch_read_index` is deprecated"); + } + } + kvs.initReadIndexWorkers( + []() { + return std::chrono::milliseconds(10); + }, + 1); + ASSERT_NE(kvs.read_index_worker_manager, nullptr); + + { + kvs.asyncRunReadIndexWorkers(); + SCOPE_EXIT({ + kvs.stopReadIndexWorkers(); + }); + + auto tar_region_id = 9; + { + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + + auto region = makeRegion(tar_region_id, RecordKVFormat::genKey(2, 0), RecordKVFormat::genKey(2, 10)); + lock.regions.emplace(region->id(), region); + lock.index.add(region); + } + { + ASSERT_EQ(proxy_instance->regions.at(tar_region_id)->getLatestCommitIndex(), 5); + proxy_instance->regions.at(tar_region_id)->updateCommitIndex(66); + } + + AsyncWaker::Notifier notifier; + const std::atomic_size_t terminate_signals_counter{}; + std::thread t([&]() { + notifier.wake(); + WaitCheckRegionReady(ctx.getTMTContext(), kvs, terminate_signals_counter, 1 / 1000.0, 20, 20 * 60); + }); + SCOPE_EXIT({ + t.join(); + kvs.handleDestroy(tar_region_id, ctx.getTMTContext()); + }); + ASSERT_EQ(notifier.blockedWaitFor(std::chrono::milliseconds(1000 * 3600)), AsyncNotifier::Status::Normal); + std::this_thread::sleep_for(std::chrono::milliseconds(2)); + auto tar = kvs.getRegion(tar_region_id); + ASSERT_EQ( + tar->handleWriteRaftCmd({}, 66, 6, ctx.getTMTContext()), + EngineStoreApplyRes::None); + } + { + kvs.asyncRunReadIndexWorkers(); + SCOPE_EXIT({ + kvs.stopReadIndexWorkers(); + }); + + auto tar_region_id = 9; + { + ASSERT_EQ(proxy_instance->regions.at(tar_region_id)->getLatestCommitIndex(), 66); + proxy_instance->unsafeInvokeForTest([&](MockRaftStoreProxy & p) { + p.region_id_to_error.emplace(tar_region_id); + p.regions.at(2)->updateCommitIndex(6); + }); + } + + AsyncWaker::Notifier notifier; + const std::atomic_size_t terminate_signals_counter{}; + std::thread t([&]() { + notifier.wake(); + WaitCheckRegionReady(ctx.getTMTContext(), kvs, terminate_signals_counter, 1 / 1000.0, 2 / 1000.0, 5 / 1000.0); + }); + SCOPE_EXIT({ + t.join(); + }); + ASSERT_EQ(notifier.blockedWaitFor(std::chrono::milliseconds(1000 * 3600)), AsyncNotifier::Status::Normal); + } + + kvs.asyncRunReadIndexWorkers(); + SCOPE_EXIT({ + kvs.stopReadIndexWorkers(); + }); + + { + // test read index + auto region = kvs.getRegion(1); + auto req = GenRegionReadIndexReq(*region, 8); + auto resp = kvs.batchReadIndex({req}, 100); + ASSERT_EQ(resp[0].first.read_index(), 5); + { + auto r = region->waitIndex(5, 0, []() { return true; }); + ASSERT_EQ(std::get<0>(r), WaitIndexResult::Finished); + } + { + auto r = region->waitIndex(8, 1, []() { return false; }); + ASSERT_EQ(std::get<0>(r), WaitIndexResult::Terminated); + } + } + for (auto & r : proxy_instance->regions) + { + r.second->updateCommitIndex(667); + } + { + auto region = kvs.getRegion(1); + auto req = GenRegionReadIndexReq(*region, 8); + auto resp = kvs.batchReadIndex({req}, 100); + ASSERT_EQ(resp[0].first.read_index(), 5); // history + } + { + auto region = kvs.getRegion(1); + auto req = GenRegionReadIndexReq(*region, 10); + auto resp = kvs.batchReadIndex({req}, 100); + ASSERT_EQ(resp[0].first.read_index(), 667); + } + { + auto region = kvs.getRegion(2); + auto req = GenRegionReadIndexReq(*region, 5); + auto resp = proxy_helper->batchReadIndex({req}, 100); // v2 + ASSERT_EQ(resp[0].first.read_index(), 667); // got latest + { + auto r = region->waitIndex(667 + 1, 2, []() { return true; }); + ASSERT_EQ(std::get<0>(r), WaitIndexResult::Timeout); + } + { + AsyncWaker::Notifier notifier; + std::thread t([&]() { + notifier.wake(); + auto r = region->waitIndex(667 + 1, 100000, []() { return true; }); + ASSERT_EQ(std::get<0>(r), WaitIndexResult::Finished); + }); + SCOPE_EXIT({ + t.join(); + }); + ASSERT_EQ(notifier.blockedWaitFor(std::chrono::milliseconds(1000 * 3600)), AsyncNotifier::Status::Normal); + std::this_thread::sleep_for(std::chrono::milliseconds(2)); + region->handleWriteRaftCmd({}, 667 + 1, 6, ctx.getTMTContext()); + } + } + } + kvs.stopReadIndexWorkers(); + kvs.releaseReadIndexWorkers(); + over = true; + proxy_instance->wake(); + proxy_runner.join(); + ASSERT(GCMonitor::instance().checkClean()); + ASSERT(!GCMonitor::instance().empty()); +} + +void RegionKVStoreTest::testRaftMergeRollback(KVStore & kvs, TMTContext & tmt) +{ + uint64_t region_id = 7; + { + auto source_region = kvs.getRegion(region_id); + auto target_region = kvs.getRegion(1); + + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + { + request.set_cmd_type(raft_cmdpb::AdminCmdType::PrepareMerge); + auto * prepare_merge = request.mutable_prepare_merge(); + { + auto min_index = source_region->appliedIndex(); + prepare_merge->set_min_index(min_index); + + metapb::Region * target = prepare_merge->mutable_target(); + *target = target_region->getMetaRegion(); + } + } + kvs.handleAdminRaftCmd(std::move(request), + std::move(response), + region_id, + 31, + 6, + tmt); + ASSERT_TRUE(source_region->isMerging()); + } + { + auto region = kvs.getRegion(region_id); + + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + { + request.set_cmd_type(raft_cmdpb::AdminCmdType::RollbackMerge); + + auto * rollback_merge = request.mutable_rollback_merge(); + { + auto merge_state = region->getMergeState(); + rollback_merge->set_commit(merge_state.commit()); + } + } + region->setStateApplying(); + + try + { + raft_cmdpb::AdminRequest first_request = request; + raft_cmdpb::AdminResponse first_response = response; + kvs.handleAdminRaftCmd(std::move(first_request), + std::move(first_response), + region_id, + 32, + 6, + tmt); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "execRollbackMerge: region state is Applying, expect Merging"); + } + ASSERT_EQ(region->peerState(), raft_serverpb::PeerState::Applying); + region->setPeerState(raft_serverpb::PeerState::Merging); + + region->meta.region_state.getMutMergeState().set_commit(1234); + try + { + kvs.handleAdminRaftCmd(std::move(request), + std::move(response), + region_id, + 32, + 6, + tmt); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "execRollbackMerge: merge commit index is 1234, expect 31"); + } + region->meta.region_state.getMutMergeState().set_commit(31); + } + { + auto region = kvs.getRegion(region_id); + + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + { + request.set_cmd_type(raft_cmdpb::AdminCmdType::RollbackMerge); + + auto * rollback_merge = request.mutable_rollback_merge(); + { + auto merge_state = region->getMergeState(); + rollback_merge->set_commit(merge_state.commit()); + } + } + kvs.handleAdminRaftCmd(std::move(request), + std::move(response), + region_id, + 32, + 6, + tmt); + ASSERT_EQ(region->peerState(), raft_serverpb::PeerState::Normal); + } +} + +void RegionKVStoreTest::testRaftSplit(KVStore & kvs, TMTContext & tmt) +{ + { + auto region = kvs.getRegion(1); + auto table_id = 1; + region->insert("lock", RecordKVFormat::genKey(table_id, 3), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); + region->insert("default", RecordKVFormat::genKey(table_id, 3, 5), TiKVValue("value1")); + region->insert("write", RecordKVFormat::genKey(table_id, 3, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + region->insert("lock", RecordKVFormat::genKey(table_id, 8), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); + region->insert("default", RecordKVFormat::genKey(table_id, 8, 5), TiKVValue("value1")); + region->insert("write", RecordKVFormat::genKey(table_id, 8, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + + ASSERT_EQ(region->dataInfo(), "[write 2 lock 2 default 2 ]"); + } + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + { + // split region + auto region_id = 1; + RegionID region_id2 = 7; + auto source_region = kvs.getRegion(region_id); + metapb::RegionEpoch new_epoch; + new_epoch.set_version(source_region->version() + 1); + new_epoch.set_conf_ver(source_region->confVer()); + TiKVKey start_key1, start_key2, end_key1, end_key2; + { + start_key1 = RecordKVFormat::genKey(1, 5); + start_key2 = RecordKVFormat::genKey(1, 0); + end_key1 = RecordKVFormat::genKey(1, 10); + end_key2 = RecordKVFormat::genKey(1, 5); + } + { + request.set_cmd_type(raft_cmdpb::AdminCmdType::BatchSplit); + raft_cmdpb::BatchSplitResponse * splits = response.mutable_splits(); + { + auto * region = splits->add_regions(); + region->set_id(region_id); + region->set_start_key(start_key1); + region->set_end_key(end_key1); + region->add_peers(); + *region->mutable_region_epoch() = new_epoch; + } + { + auto * region = splits->add_regions(); + region->set_id(region_id2); + region->set_start_key(start_key2); + region->set_end_key(end_key2); + region->add_peers(); + *region->mutable_region_epoch() = new_epoch; + } + } + } + kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest(request), raft_cmdpb::AdminResponse(response), 1, 20, 5, tmt); + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 5))); + ASSERT_TRUE(mmp.count(7) != 0); + ASSERT_EQ(mmp.size(), 1); + } + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 5), RecordKVFormat::genKey(1, 10))); + ASSERT_TRUE(mmp.count(1) != 0); + ASSERT_EQ(mmp.size(), 1); + } + { + ASSERT_EQ(kvs.getRegion(1)->dataInfo(), "[write 1 lock 1 default 1 ]"); + ASSERT_EQ(kvs.getRegion(7)->dataInfo(), "[lock 1 ]"); + } + // rollback 1 to before split + // 7 is persisted + { + kvs.handleDestroy(1, tmt); + { + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + auto region = makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10)); + lock.regions.emplace(1, region); + lock.index.add(region); + } + auto table_id = 1; + auto region = kvs.getRegion(1); + region->insert("lock", RecordKVFormat::genKey(table_id, 3), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); + region->insert("default", RecordKVFormat::genKey(table_id, 3, 5), TiKVValue("value1")); + region->insert("write", RecordKVFormat::genKey(table_id, 3, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + region->insert("lock", RecordKVFormat::genKey(table_id, 8), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); + region->insert("default", RecordKVFormat::genKey(table_id, 8, 5), TiKVValue("value1")); + region->insert("write", RecordKVFormat::genKey(table_id, 8, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + + ASSERT_EQ(region->dataInfo(), "[write 2 lock 2 default 2 ]"); + } + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 5))); + ASSERT_TRUE(mmp.count(7) != 0); + ASSERT_TRUE(mmp.count(1) != 0); + ASSERT_EQ(mmp.size(), 2); + } + // split again + kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest(request), raft_cmdpb::AdminResponse(response), 1, 20, 5, tmt); + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 5))); + ASSERT_TRUE(mmp.count(7) != 0); + ASSERT_EQ(mmp.size(), 1); + } + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 5), RecordKVFormat::genKey(1, 10))); + ASSERT_TRUE(mmp.count(1) != 0); + ASSERT_EQ(mmp.size(), 1); + } + { + ASSERT_EQ(kvs.getRegion(1)->dataInfo(), "[write 1 lock 1 default 1 ]"); + ASSERT_EQ(kvs.getRegion(7)->dataInfo(), "[lock 1 ]"); + } +} + +void RegionKVStoreTest::testRaftChangePeer(KVStore & kvs, TMTContext & tmt) +{ + { + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + auto region = makeRegion(88, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 100)); + lock.regions.emplace(88, region); + lock.index.add(region); + } + { + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + request.set_cmd_type(raft_cmdpb::AdminCmdType::ChangePeer); + auto meta = kvs.getRegion(88)->getMetaRegion(); + meta.mutable_peers()->Clear(); + meta.add_peers()->set_id(2); + meta.add_peers()->set_id(4); + *response.mutable_change_peer()->mutable_region() = meta; + kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest(request), raft_cmdpb::AdminResponse(response), 88, 6, 5, tmt); + ASSERT_NE(kvs.getRegion(88), nullptr); + } + { + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + request.set_cmd_type(raft_cmdpb::AdminCmdType::ChangePeerV2); + auto meta = kvs.getRegion(88)->getMetaRegion(); + meta.mutable_peers()->Clear(); + meta.add_peers()->set_id(3); + meta.add_peers()->set_id(4); + *response.mutable_change_peer()->mutable_region() = meta; + kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest(request), raft_cmdpb::AdminResponse(response), 88, 7, 5, tmt); + ASSERT_EQ(kvs.getRegion(88), nullptr); + } +} + +void RegionKVStoreTest::testRaftMerge(KVStore & kvs, TMTContext & tmt) +{ + { + kvs.getRegion(1)->clearAllData(); + kvs.getRegion(7)->clearAllData(); + + { + auto region = kvs.getRegion(1); + auto table_id = 1; + region->insert("lock", RecordKVFormat::genKey(table_id, 6), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); + region->insert("default", RecordKVFormat::genKey(table_id, 6, 5), TiKVValue("value1")); + region->insert("write", RecordKVFormat::genKey(table_id, 6, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + ASSERT_EQ(region->dataInfo(), "[write 1 lock 1 default 1 ]"); + } + { + auto region = kvs.getRegion(7); + auto table_id = 1; + region->insert("lock", RecordKVFormat::genKey(table_id, 2), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); + region->insert("default", RecordKVFormat::genKey(table_id, 2, 5), TiKVValue("value1")); + region->insert("write", RecordKVFormat::genKey(table_id, 2, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + ASSERT_EQ(region->dataInfo(), "[write 1 lock 1 default 1 ]"); + } + } + + { + auto region_id = 7; + auto source_region = kvs.getRegion(region_id); + auto target_region = kvs.getRegion(1); + + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + + { + request.set_cmd_type(raft_cmdpb::AdminCmdType::PrepareMerge); + + auto * prepare_merge = request.mutable_prepare_merge(); + { + auto min_index = source_region->appliedIndex(); + prepare_merge->set_min_index(min_index); + + metapb::Region * target = prepare_merge->mutable_target(); + *target = target_region->getMetaRegion(); + } + } + + kvs.handleAdminRaftCmd(std::move(request), + std::move(response), + source_region->id(), + 35, + 6, + tmt); + ASSERT_EQ(source_region->peerState(), raft_serverpb::PeerState::Merging); + } + + { + auto source_id = 7, target_id = 1; + auto source_region = kvs.getRegion(source_id); + raft_cmdpb::AdminRequest request; + { + request.set_cmd_type(raft_cmdpb::AdminCmdType::CommitMerge); + auto * commit_merge = request.mutable_commit_merge(); + { + commit_merge->set_commit(source_region->appliedIndex()); + *commit_merge->mutable_source() = source_region->getMetaRegion(); + } + } + source_region->setStateApplying(); + source_region->makeRaftCommandDelegate(kvs.genTaskLock()); + const auto & source_region_meta_delegate = source_region->meta.makeRaftCommandDelegate(); + try + { + kvs.getRegion(target_id)->meta.makeRaftCommandDelegate().checkBeforeCommitMerge(request, source_region_meta_delegate); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "checkBeforeCommitMerge: unexpected state Applying of source 1"); + } + source_region->setPeerState(raft_serverpb::PeerState::Normal); + { + request.mutable_commit_merge()->mutable_source()->mutable_start_key()->clear(); + } + try + { + kvs.getRegion(target_id)->meta.makeRaftCommandDelegate().checkBeforeCommitMerge(request, source_region_meta_delegate); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "checkBeforeCommitMerge: source region not match exist region meta"); + } + } + + { + auto source_id = 7, target_id = 1; + auto source_region = kvs.getRegion(source_id); + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + + { + request.set_cmd_type(raft_cmdpb::AdminCmdType::CommitMerge); + auto * commit_merge = request.mutable_commit_merge(); + { + commit_merge->set_commit(source_region->appliedIndex()); + *commit_merge->mutable_source() = source_region->getMetaRegion(); + } + } + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(TiKVKey(""), TiKVKey(""))); + ASSERT_TRUE(mmp.count(target_id) != 0); + ASSERT_EQ(mmp.size(), 2); + } + + kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest(request), + raft_cmdpb::AdminResponse(response), + target_id, + 36, + 6, + tmt); + + ASSERT_EQ(kvs.getRegion(source_id), nullptr); + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 5))); + ASSERT_TRUE(mmp.count(1) != 0); + ASSERT_EQ(mmp.size(), 1); + } + { + // add 7 back + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + auto region = makeRegion(7, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 5)); + lock.regions.emplace(7, region); + lock.index.add(region); + } + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 5))); + ASSERT_TRUE(mmp.count(7) != 0); + ASSERT_TRUE(mmp.count(1) != 0); + ASSERT_EQ(mmp.size(), 2); + } + kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest(request), + raft_cmdpb::AdminResponse(response), + target_id, + 36, + 6, + tmt); + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 5))); + ASSERT_TRUE(mmp.count(1) != 0); + ASSERT_EQ(mmp.size(), 1); + } + ASSERT_EQ(kvs.getRegion(1)->dataInfo(), "[lock 2 ]"); + } +} + +TEST_F(RegionKVStoreTest, Region) +{ + TableID table_id = 100; + { + auto meta = RegionMeta(createPeer(2, true), createRegionInfo(666, RecordKVFormat::genKey(0, 0), RecordKVFormat::genKey(0, 1000)), initialApplyState()); + ASSERT_EQ(meta.peerId(), 2); + } + auto region = makeRegion(1, RecordKVFormat::genKey(table_id, 0), RecordKVFormat::genKey(table_id, 1000)); + { + ASSERT_TRUE(region->checkIndex(5)); + } + { + auto start_ts = 199; + auto req = GenRegionReadIndexReq(*region, start_ts); + ASSERT_EQ(req.ranges().size(), 1); + ASSERT_EQ(req.start_ts(), start_ts); + ASSERT_EQ(region->getMetaRegion().region_epoch().DebugString(), + req.context().region_epoch().DebugString()); + ASSERT_EQ(region->getRange()->comparableKeys().first.key, req.ranges()[0].start_key()); + ASSERT_EQ(region->getRange()->comparableKeys().second.key, req.ranges()[0].end_key()); + } + { + region->insert("lock", RecordKVFormat::genKey(table_id, 3), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); + region->insert("default", RecordKVFormat::genKey(table_id, 3, 5), TiKVValue("value1")); + region->insert("write", RecordKVFormat::genKey(table_id, 3, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + ASSERT_EQ(1, region->writeCFCount()); + ASSERT_EQ(region->dataInfo(), "[write 1 lock 1 default 1 ]"); + { + auto iter = region->createCommittedScanner(); + auto lock = iter.getLockInfo({100, nullptr}); + ASSERT_NE(lock, nullptr); + auto k = lock->intoLockInfo(); + ASSERT_EQ(k->lock_version(), 3); + } + { + std::optional data_list_read = ReadRegionCommitCache(region, true); + ASSERT_TRUE(data_list_read); + ASSERT_EQ(1, data_list_read->size()); + RemoveRegionCommitCache(region, *data_list_read); + } + ASSERT_EQ(0, region->writeCFCount()); + { + region->remove("lock", RecordKVFormat::genKey(table_id, 3)); + auto iter = region->createCommittedScanner(); + auto lock = iter.getLockInfo({100, nullptr}); + ASSERT_EQ(lock, nullptr); + } + region->clearAllData(); + } + { + region->insert("write", RecordKVFormat::genKey(table_id, 3, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + ASSERT_EQ(region->dataInfo(), "[write 1 ]"); + + auto ori_size = region->dataSize(); + try + { + // insert duplicate records + region->insert("write", RecordKVFormat::genKey(table_id, 3, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "Found existing key in hex: 7480000000000000FF645F728000000000FF0000030000000000FAFFFFFFFFFFFFFFF7"); + } + ASSERT_EQ(ori_size, region->dataSize()); + + region->tryCompactionFilter(100); + ASSERT_EQ(region->dataInfo(), "[]"); + } + { + region->insert("write", RecordKVFormat::genKey(table_id, 4, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::DelFlag, 5)); + ASSERT_EQ(1, region->writeCFCount()); + region->remove("write", RecordKVFormat::genKey(table_id, 4, 8)); + ASSERT_EQ(1, region->writeCFCount()); + { + std::optional data_list_read = ReadRegionCommitCache(region, true); + ASSERT_TRUE(data_list_read); + ASSERT_EQ(1, data_list_read->size()); + RemoveRegionCommitCache(region, *data_list_read); + } + ASSERT_EQ(0, region->writeCFCount()); + } + { + ASSERT_EQ(0, region->dataSize()); + + region->insert("default", RecordKVFormat::genKey(table_id, 3, 5), TiKVValue("value1")); + ASSERT_LT(0, region->dataSize()); + + region->remove("default", RecordKVFormat::genKey(table_id, 3, 5)); + ASSERT_EQ(0, region->dataSize()); + + // remove duplicate records + region->remove("default", RecordKVFormat::genKey(table_id, 3, 5)); + ASSERT_EQ(0, region->dataSize()); + } +} + +TEST_F(RegionKVStoreTest, KVStore) +{ + auto ctx = TiFlashTestEnv::getGlobalContext(); + + KVStore & kvs = getKVS(); + { + // Run without read-index workers + + kvs.initReadIndexWorkers( + []() { + return std::chrono::milliseconds(10); + }, + 0); + ASSERT_EQ(kvs.read_index_worker_manager, nullptr); + kvs.asyncRunReadIndexWorkers(); + kvs.stopReadIndexWorkers(); + kvs.releaseReadIndexWorkers(); + } + { + auto store = metapb::Store{}; + store.set_id(1234); + kvs.setStore(store); + ASSERT_EQ(kvs.getStoreID(), store.id()); + } + { + ASSERT_EQ(kvs.getRegion(0), nullptr); + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + { + auto region = makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10)); + lock.regions.emplace(1, region); + lock.index.add(region); + } + { + auto region = makeRegion(2, RecordKVFormat::genKey(1, 10), RecordKVFormat::genKey(1, 20)); + lock.regions.emplace(2, region); + lock.index.add(region); + } + { + auto region = makeRegion(3, RecordKVFormat::genKey(1, 30), RecordKVFormat::genKey(1, 40)); + lock.regions.emplace(3, region); + lock.index.add(region); + } + } + { + kvs.tryPersist(1); + kvs.gcRegionPersistedCache(Seconds{0}); + } + { + ASSERT_EQ(kvs.regionSize(), 3); + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 15), TiKVKey(""))); + ASSERT_EQ(mmp.size(), 2); + kvs.handleDestroy(3, ctx.getTMTContext()); + kvs.handleDestroy(3, ctx.getTMTContext()); + } + { + RegionMap mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 15), TiKVKey(""))); + ASSERT_EQ(mmp.size(), 1); + ASSERT_EQ(mmp.at(2)->id(), 2); + } + { + { + raft_cmdpb::RaftCmdRequest request; + { + auto lock_key = RecordKVFormat::genKey(1, 2333); + TiKVValue lock_value = RecordKVFormat::encodeLockCfValue(Region::DelFlag, "pk", 77, 0); + RegionBench::setupPutRequest(request.add_requests(), ColumnFamilyName::Lock, lock_key, lock_value); + auto write_key = RecordKVFormat::genKey(1, 2333, 1); + TiKVValue write_value = RecordKVFormat::encodeWriteCfValue(Region::PutFlag, 2333); + RegionBench::setupPutRequest(request.add_requests(), ColumnFamilyName::Write, write_key, write_value); + } + try + { + ASSERT_EQ(kvs.handleWriteRaftCmd(std::move(request), 1, 6, 6, ctx.getTMTContext()), + EngineStoreApplyRes::None); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "Raw TiDB PK: 800000000000091D, Prewrite ts: 2333 can not found in default cf for key: 7480000000000000FF015F728000000000FF00091D0000000000FAFFFFFFFFFFFFFFFE"); + ASSERT_EQ(kvs.getRegion(1)->dataInfo(), "[write 1 lock 1 ]"); + kvs.getRegion(1)->tryCompactionFilter(1000); + } + try + { + raft_cmdpb::RaftCmdRequest request; + { + auto key = RecordKVFormat::genKey(1, 2333, 1); + RegionBench::setupPutRequest(request.add_requests(), ColumnFamilyName::Default, key, "v1"); + } + { + // duplicate + auto key = RecordKVFormat::genKey(1, 2333, 1); + RegionBench::setupPutRequest(request.add_requests(), ColumnFamilyName::Default, key, "v1"); + } + ASSERT_EQ(kvs.handleWriteRaftCmd(std::move(request), 1, 6, 6, ctx.getTMTContext()), + EngineStoreApplyRes::None); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "Found existing key in hex: 7480000000000000FF015F728000000000FF00091D0000000000FAFFFFFFFFFFFFFFFE"); + } + ASSERT_EQ(kvs.getRegion(1)->dataInfo(), "[lock 1 default 1 ]"); + kvs.getRegion(1)->remove("default", RecordKVFormat::genKey(1, 2333, 1)); + try + { + raft_cmdpb::RaftCmdRequest request; + { + RegionBench::setupPutRequest(request.add_requests(), ColumnFamilyName::Default, std::string("k1"), "v1"); + } + ASSERT_EQ(kvs.handleWriteRaftCmd(std::move(request), 1, 6, 6, ctx.getTMTContext()), + EngineStoreApplyRes::None); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "Unexpected eof"); + } + try + { + raft_cmdpb::RaftCmdRequest request; + request.add_requests()->set_cmd_type(::raft_cmdpb::CmdType::Invalid); + ASSERT_EQ(kvs.handleWriteRaftCmd(std::move(request), 1, 10, 6, ctx.getTMTContext()), + EngineStoreApplyRes::None); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "Unsupport raft cmd Invalid"); + } + } + ASSERT_EQ(kvs.getRegion(1)->dataInfo(), "[lock 1 ]"); + { + raft_cmdpb::RaftCmdRequest request; + { + auto lock_key = RecordKVFormat::genKey(1, 2333); + TiKVValue lock_value = RecordKVFormat::encodeLockCfValue(Region::DelFlag, "pk", 77, 0); + RegionBench::setupDelRequest(request.add_requests(), ColumnFamilyName::Lock, lock_key); + } + raft_cmdpb::RaftCmdRequest first_request = request; + ASSERT_EQ(kvs.handleWriteRaftCmd(std::move(first_request), 1, 7, 6, ctx.getTMTContext()), + EngineStoreApplyRes::None); + + RegionBench::setupDelRequest(request.add_requests(), ColumnFamilyName::Write, TiKVKey("illegal key")); + // index <= appliedIndex(), ignore + raft_cmdpb::RaftCmdRequest second_request; + ASSERT_EQ(kvs.handleWriteRaftCmd(std::move(second_request), 1, 7, 6, ctx.getTMTContext()), + EngineStoreApplyRes::None); + try + { + // + request.clear_requests(); + RegionBench::setupDelRequest(request.add_requests(), ColumnFamilyName::Write, TiKVKey("illegal key")); + ASSERT_EQ(kvs.handleWriteRaftCmd(std::move(request), 1, 9, 6, ctx.getTMTContext()), + EngineStoreApplyRes::None); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "Key padding"); + } + + ASSERT_EQ(kvs.getRegion(1)->appliedIndex(), 7); + } + ASSERT_EQ(kvs.getRegion(1)->dataInfo(), "[]"); + + ASSERT_EQ( + kvs.handleWriteRaftCmd(raft_cmdpb::RaftCmdRequest{}, 8192, 7, 6, ctx.getTMTContext()), + EngineStoreApplyRes::NotFound); + } + { + kvs.handleDestroy(2, ctx.getTMTContext()); + ASSERT_EQ(kvs.regionSize(), 1); + } + { + testRaftSplit(kvs, ctx.getTMTContext()); + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{}, raft_cmdpb::AdminResponse{}, 8192, 5, 6, ctx.getTMTContext()), EngineStoreApplyRes::NotFound); + } + { + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{}, raft_cmdpb::AdminResponse{}, 8192, 5, 6, ctx.getTMTContext()), EngineStoreApplyRes::NotFound); + } + { + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + + request.mutable_compact_log(); + request.set_cmd_type(::raft_cmdpb::AdminCmdType::CompactLog); + raft_cmdpb::AdminResponse first_response = response; + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(first_response), 7, 22, 6, ctx.getTMTContext()), EngineStoreApplyRes::Persist); + + raft_cmdpb::AdminResponse second_response = response; + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(second_response), 7, 23, 6, ctx.getTMTContext()), EngineStoreApplyRes::Persist); + + request.set_cmd_type(::raft_cmdpb::AdminCmdType::ComputeHash); + raft_cmdpb::AdminResponse third_response = response; + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(third_response), 7, 24, 6, ctx.getTMTContext()), EngineStoreApplyRes::None); + + request.set_cmd_type(::raft_cmdpb::AdminCmdType::VerifyHash); + raft_cmdpb::AdminResponse fourth_response = response; + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(fourth_response), 7, 25, 6, ctx.getTMTContext()), EngineStoreApplyRes::None); + + raft_cmdpb::AdminResponse fifth_response = response; + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(fifth_response), 8192, 5, 6, ctx.getTMTContext()), EngineStoreApplyRes::NotFound); + { + kvs.setRegionCompactLogConfig(0, 0, 0); + request.set_cmd_type(::raft_cmdpb::AdminCmdType::CompactLog); + ASSERT_EQ(kvs.handleAdminRaftCmd(std::move(request), std::move(response), 7, 26, 6, ctx.getTMTContext()), EngineStoreApplyRes::Persist); + } + } + { + testRaftMergeRollback(kvs, ctx.getTMTContext()); + testRaftMerge(kvs, ctx.getTMTContext()); + } + { + testRaftChangePeer(kvs, ctx.getTMTContext()); + } + { + auto ori_snapshot_apply_method = kvs.snapshot_apply_method; + kvs.snapshot_apply_method = TiDB::SnapshotApplyMethod::DTFile_Single; + SCOPE_EXIT({ + kvs.snapshot_apply_method = ori_snapshot_apply_method; + }); + + + auto region_id = 19; + auto region = makeRegion(region_id, RecordKVFormat::genKey(1, 50), RecordKVFormat::genKey(1, 60)); + auto region_id_str = std::to_string(19); + auto & mmp = MockSSTReader::getMockSSTData(); + MockSSTReader::getMockSSTData().clear(); + MockSSTReader::Data default_kv_list; + { + default_kv_list.emplace_back(RecordKVFormat::genKey(1, 55, 5).getStr(), TiKVValue("value1").getStr()); + default_kv_list.emplace_back(RecordKVFormat::genKey(1, 58, 5).getStr(), TiKVValue("value2").getStr()); + } + mmp[MockSSTReader::Key{region_id_str, ColumnFamilyType::Default}] = std::move(default_kv_list); + std::vector sst_views; + sst_views.push_back(SSTView{ + ColumnFamilyType::Default, + BaseBuffView{region_id_str.data(), region_id_str.length()}, + }); + { + RegionMockTest mock_test(kvstore.get(), region); + + kvs.handleApplySnapshot( + region->getMetaRegion(), + 2, + SSTViewVec{sst_views.data(), sst_views.size()}, + 8, + 5, + ctx.getTMTContext()); + ASSERT_EQ(kvs.getRegion(19)->checkIndex(8), true); + try + { + kvs.handleApplySnapshot( + region->getMetaRegion(), + 2, + {}, // empty + 6, // smaller index + 5, + ctx.getTMTContext()); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "[region 19] already has newer apply-index 8 than 6, should not happen"); + } + } + + { + { + auto region = makeRegion(22, RecordKVFormat::genKey(55, 50), RecordKVFormat::genKey(55, 100)); + auto ingest_ids = kvs.preHandleSnapshotToFiles( + region, + {}, + 9, + 5, + ctx.getTMTContext()); + kvs.checkAndApplySnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); + } + try + { + auto region = makeRegion(20, RecordKVFormat::genKey(55, 50), RecordKVFormat::genKey(55, 100)); + auto ingest_ids = kvs.preHandleSnapshotToFiles( + region, + {}, + 9, + 5, + ctx.getTMTContext()); + kvs.checkAndApplySnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); // overlap, but not tombstone + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "range of region 20 is overlapped with 22, state: region { id: 22 }"); + } + + { + const auto * ori_ptr = proxy_helper->proxy_ptr.inner; + proxy_helper->proxy_ptr.inner = nullptr; + SCOPE_EXIT({ + proxy_helper->proxy_ptr.inner = ori_ptr; + }); + + try + { + auto region = makeRegion(20, RecordKVFormat::genKey(55, 50), RecordKVFormat::genKey(55, 100)); + auto ingest_ids = kvs.preHandleSnapshotToFiles( + region, + {}, + 10, + 5, + ctx.getTMTContext()); + kvs.checkAndApplySnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "getRegionLocalState meet internal error: RaftStoreProxyPtr is none"); + } + } + + { + proxy_instance->getRegion(22)->setSate(({ + raft_serverpb::RegionLocalState s; + s.set_state(::raft_serverpb::PeerState::Tombstone); + s; + })); + auto region = makeRegion(20, RecordKVFormat::genKey(55, 50), RecordKVFormat::genKey(55, 100)); + auto ingest_ids = kvs.preHandleSnapshotToFiles( + region, + {}, + 10, + 5, + ctx.getTMTContext()); + kvs.checkAndApplySnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); // overlap, tombstone, remove previous one + + auto state = proxy_helper->getRegionLocalState(8192); + ASSERT_EQ(state.state(), raft_serverpb::PeerState::Tombstone); + } + + kvs.handleDestroy(20, ctx.getTMTContext()); + } + } + + { + auto region_id = 19; + auto region_id_str = std::to_string(19); + auto & mmp = MockSSTReader::getMockSSTData(); + MockSSTReader::getMockSSTData().clear(); + MockSSTReader::Data default_kv_list; + { + default_kv_list.emplace_back(RecordKVFormat::genKey(1, 55, 5).getStr(), TiKVValue("value1").getStr()); + default_kv_list.emplace_back(RecordKVFormat::genKey(1, 58, 5).getStr(), TiKVValue("value2").getStr()); + } + mmp[MockSSTReader::Key{region_id_str, ColumnFamilyType::Default}] = std::move(default_kv_list); + + // Mock SST data for handle [star, end) + auto region = kvs.getRegion(region_id); + + RegionMockTest mock_test(kvstore.get(), region); + + { + // Mocking ingest a SST for column family "Write" + std::vector sst_views; + sst_views.push_back(SSTView{ + ColumnFamilyType::Default, + BaseBuffView{region_id_str.data(), region_id_str.length()}, + }); + kvs.handleIngestSST( + region_id, + SSTViewVec{sst_views.data(), sst_views.size()}, + 100, + 1, + ctx.getTMTContext()); + ASSERT_EQ(kvs.getRegion(19)->checkIndex(100), true); + } + } + + { + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + + request.mutable_compact_log(); + request.set_cmd_type(::raft_cmdpb::AdminCmdType::InvalidAdmin); + + try + { + kvs.handleAdminRaftCmd(std::move(request), std::move(response), 1, 110, 6, ctx.getTMTContext()); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "unsupported admin command type InvalidAdmin"); + } + } + { + // There shall be data to flush. + ASSERT_EQ(kvs.needFlushRegionData(19, ctx.getTMTContext()), true); + // Force flush until succeed only for testing. + ASSERT_EQ(kvs.tryFlushRegionData(19, true, ctx.getTMTContext(), 0, 0), true); + // Non existing region. + // Flush and CompactLog will not panic. + ASSERT_EQ(kvs.tryFlushRegionData(1999, true, ctx.getTMTContext(), 0, 0), true); + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + request.mutable_compact_log(); + request.set_cmd_type(::raft_cmdpb::AdminCmdType::CompactLog); + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(response), 1999, 22, 6, ctx.getTMTContext()), EngineStoreApplyRes::NotFound); + } +} + +TEST_F(RegionKVStoreTest, KVStoreRestore) +{ + { + KVStore & kvs = getKVS(); + { + auto store = metapb::Store{}; + store.set_id(1234); + kvs.setStore(store); + ASSERT_EQ(kvs.getStoreID(), store.id()); + } + { + ASSERT_EQ(kvs.getRegion(0), nullptr); + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + { + auto region = makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10)); + lock.regions.emplace(1, region); + lock.index.add(region); + } + { + auto region = makeRegion(2, RecordKVFormat::genKey(1, 10), RecordKVFormat::genKey(1, 20)); + lock.regions.emplace(2, region); + lock.index.add(region); + } + { + auto region = makeRegion(3, RecordKVFormat::genKey(1, 30), RecordKVFormat::genKey(1, 40)); + lock.regions.emplace(3, region); + lock.index.add(region); + } + } + kvs.tryPersist(1); + kvs.tryPersist(2); + kvs.tryPersist(3); + } + { + KVStore & kvs = reloadKVSFromDisk(); + kvs.getRegion(1); + kvs.getRegion(2); + kvs.getRegion(3); + } +} + +void test_mergeresult() +{ + ASSERT_EQ(MetaRaftCommandDelegate::computeRegionMergeResult(createRegionInfo(1, "x", ""), createRegionInfo(1000, "", "x")).source_at_left, false); + ASSERT_EQ(MetaRaftCommandDelegate::computeRegionMergeResult(createRegionInfo(1, "", "x"), createRegionInfo(1000, "x", "")).source_at_left, true); + ASSERT_EQ(MetaRaftCommandDelegate::computeRegionMergeResult(createRegionInfo(1, "x", "y"), createRegionInfo(1000, "y", "z")).source_at_left, true); + ASSERT_EQ(MetaRaftCommandDelegate::computeRegionMergeResult(createRegionInfo(1, "y", "z"), createRegionInfo(1000, "x", "y")).source_at_left, false); + + { + RegionState region_state; + bool source_at_left; + RegionState source_region_state; + + region_state.setStartKey(RecordKVFormat::genKey(1, 0)); + region_state.setEndKey(RecordKVFormat::genKey(1, 10)); + + source_region_state.setStartKey(RecordKVFormat::genKey(1, 10)); + source_region_state.setEndKey(RecordKVFormat::genKey(1, 20)); + + source_at_left = false; + + ChangeRegionStateRange(region_state, source_at_left, source_region_state); + + ASSERT_EQ(region_state.getRange()->comparableKeys().first.key, RecordKVFormat::genKey(1, 0)); + ASSERT_EQ(region_state.getRange()->comparableKeys().second.key, RecordKVFormat::genKey(1, 20)); + } + { + RegionState region_state; + bool source_at_left; + RegionState source_region_state; + + region_state.setStartKey(RecordKVFormat::genKey(2, 5)); + region_state.setEndKey(RecordKVFormat::genKey(2, 10)); + + source_region_state.setStartKey(RecordKVFormat::genKey(2, 0)); + source_region_state.setEndKey(RecordKVFormat::genKey(2, 5)); + + source_at_left = true; + + ChangeRegionStateRange(region_state, source_at_left, source_region_state); + + ASSERT_EQ(region_state.getRange()->comparableKeys().first.key, RecordKVFormat::genKey(2, 0)); + ASSERT_EQ(region_state.getRange()->comparableKeys().second.key, RecordKVFormat::genKey(2, 10)); + } +} + +TEST_F(RegionKVStoreTest, Basic) +{ + { + RegionsRangeIndex region_index; + const auto & root_map = region_index.getRoot(); + ASSERT_EQ(root_map.size(), 2); + + region_index.add(makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10))); + + ASSERT_EQ(root_map.begin()->second.region_map.size(), 0); + + region_index.add(makeRegion(2, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 3))); + region_index.add(makeRegion(3, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 1))); + + auto res = region_index.findByRangeOverlap(RegionRangeKeys::makeComparableKeys(TiKVKey(""), TiKVKey(""))); + ASSERT_EQ(res.size(), 3); + + region_index.add(makeRegion(4, RecordKVFormat::genKey(1, 1), RecordKVFormat::genKey(1, 4))); + + ASSERT_EQ(root_map.size(), 7); + + res = region_index.findByRangeOverlap(RegionRangeKeys::makeComparableKeys(TiKVKey(""), TiKVKey(""))); + ASSERT_EQ(res.size(), 4); + + res = region_index.findByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 1), TiKVKey(""))); + ASSERT_EQ(res.size(), 3); + + res = region_index.findByRangeOverlap( + RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 2), RecordKVFormat::genKey(1, 5))); + ASSERT_EQ(res.size(), 3); + ASSERT_TRUE(res.find(1) != res.end()); + ASSERT_TRUE(res.find(2) != res.end()); + ASSERT_TRUE(res.find(4) != res.end()); + + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 1), RecordKVFormat::genKey(1, 4)), 4); + res = region_index.findByRangeOverlap(RegionRangeKeys::makeComparableKeys(TiKVKey(""), TiKVKey(""))); + ASSERT_EQ(res.size(), 3); + + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 1)), 3); + res = region_index.findByRangeOverlap(RegionRangeKeys::makeComparableKeys(TiKVKey(""), TiKVKey(""))); + ASSERT_EQ(res.size(), 2); + + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 3)), 2); + res = region_index.findByRangeOverlap(RegionRangeKeys::makeComparableKeys(TiKVKey(""), TiKVKey(""))); + ASSERT_EQ(res.size(), 1); + + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10)), 1); + res = region_index.findByRangeOverlap(RegionRangeKeys::makeComparableKeys(TiKVKey(""), TiKVKey(""))); + ASSERT_TRUE(res.empty()); + + ASSERT_EQ(root_map.size(), 2); + } + + { + RegionsRangeIndex region_index; + const auto & root_map = region_index.getRoot(); + try + { + region_index.remove(RegionRangeKeys::makeComparableKeys(TiKVKey(), TiKVKey()), 1); + assert(false); + } + catch (Exception & e) + { + const auto & res = e.message(); + ASSERT_EQ(res, "void DB::RegionsRangeIndex::remove(const DB::RegionRange &, DB::RegionID): not found region 1"); + } + + region_index.add(makeRegion(2, RecordKVFormat::genKey(1, 3), RecordKVFormat::genKey(1, 5))); + try + { + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 4), RecordKVFormat::genKey(1, 5)), 2); + assert(false); + } + catch (Exception & e) + { + const auto & res = e.message(); + ASSERT_EQ(res, "void DB::RegionsRangeIndex::remove(const DB::RegionRange &, DB::RegionID): not found start key"); + } + + try + { + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 3), RecordKVFormat::genKey(1, 4)), 2); + assert(false); + } + catch (Exception & e) + { + const auto & res = e.message(); + ASSERT_EQ(res, "void DB::RegionsRangeIndex::remove(const DB::RegionRange &, DB::RegionID): not found end key"); + } + + try + { + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 3), RecordKVFormat::genKey(1, 3)), 2); + assert(false); + } + catch (Exception & e) + { + const auto & res = e.message(); + ASSERT_EQ(res, "void DB::RegionsRangeIndex::remove(const DB::RegionRange &, DB::RegionID): range of region 2 is empty"); + } + + try + { + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 3), TiKVKey()), 2); + assert(false); + } + catch (Exception & e) + { + const auto & res = e.message(); + ASSERT_EQ(res, "void DB::RegionsRangeIndex::remove(const DB::RegionRange &, DB::RegionID): not found region 2"); + } + + region_index.clear(); + + try + { + region_index.add(makeRegion(6, RecordKVFormat::genKey(6, 6), RecordKVFormat::genKey(6, 6))); + assert(false); + } + catch (Exception & e) + { + const auto & res = e.message(); + std::string tar = "Illegal region range, should not happen"; + ASSERT(res.size() > tar.size()); + ASSERT_EQ(res.substr(0, tar.size()), tar); + } + + region_index.clear(); + + region_index.add(makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 1))); + region_index.add(makeRegion(2, RecordKVFormat::genKey(1, 1), RecordKVFormat::genKey(1, 2))); + region_index.add(makeRegion(3, RecordKVFormat::genKey(1, 2), RecordKVFormat::genKey(1, 3))); + + ASSERT_EQ(root_map.size(), 6); + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 2), RecordKVFormat::genKey(1, 3)), 3); + ASSERT_EQ(root_map.size(), 5); + + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 1)), 1); + ASSERT_EQ(root_map.size(), 4); + + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 1), RecordKVFormat::genKey(1, 2)), 2); + ASSERT_EQ(root_map.size(), 2); + } + { + test_mergeresult(); + } +} + +} // namespace tests +} // namespace DB diff --git a/dbms/src/Storages/Transaction/tests/gtest_region_block_reader.cpp b/dbms/src/Storages/Transaction/tests/gtest_region_block_reader.cpp new file mode 100644 index 00000000000..f31133656e9 --- /dev/null +++ b/dbms/src/Storages/Transaction/tests/gtest_region_block_reader.cpp @@ -0,0 +1,640 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using TableInfo = TiDB::TableInfo; + +namespace DB::tests +{ +using ColumnIDs = std::vector; +class RegionBlockReaderTest : public ::testing::Test +{ +public: + RegionBlockReaderTest() + : logger(Logger::get("RegionBlockReaderTest")) + {} + +protected: + Int64 handle_value = 100; + UInt8 del_mark_value = 0; + UInt64 version_value = 100; + size_t rows = 3; + + RegionDataReadInfoList data_list_read; + std::unordered_map fields_map; + + LoggerPtr logger; + + enum RowEncodeVersion + { + RowV1, + RowV2 + }; + +protected: + void SetUp() override + { + data_list_read.clear(); + fields_map.clear(); + } + + void TearDown() override {} + + void encodeColumns(const TableInfo & table_info, const std::vector & fields, RowEncodeVersion row_version) + { + // for later check + std::unordered_map column_name_columns_index_map; + for (size_t i = 0; i < table_info.columns.size(); i++) + { + fields_map.emplace(table_info.columns[i].id, fields[i]); + column_name_columns_index_map.emplace(table_info.columns[i].name, i); + } + + std::vector value_encode_fields; + std::vector key_encode_fields; + for (size_t i = 0; i < table_info.columns.size(); i++) + { + if (table_info.is_common_handle || table_info.pk_is_handle) + { + if (table_info.columns[i].hasPriKeyFlag()) + key_encode_fields.emplace_back(fields[i]); + else + value_encode_fields.emplace_back(fields[i]); + } + else + { + value_encode_fields.emplace_back(fields[i]); + } + } + + // create the RawTiDBPK section of encoded key + WriteBufferFromOwnString pk_buf; + if (table_info.is_common_handle) + { + const auto & primary_index_info = table_info.getPrimaryIndexInfo(); + for (size_t i = 0; i < primary_index_info.idx_cols.size(); i++) + { + auto idx = column_name_columns_index_map[primary_index_info.idx_cols[i].name]; + EncodeDatum(key_encode_fields[i], table_info.columns[idx].getCodecFlag(), pk_buf); + } + } + else + { + DB::EncodeInt64(handle_value, pk_buf); + } + RawTiDBPK pk{std::make_shared(pk_buf.releaseStr())}; + + // create encoded value + WriteBufferFromOwnString value_buf; + if (row_version == RowEncodeVersion::RowV1) + { + encodeRowV1(table_info, value_encode_fields, value_buf); + } + else if (row_version == RowEncodeVersion::RowV2) + { + encodeRowV2(table_info, value_encode_fields, value_buf); + } + else + { + throw Exception("Unknown row format " + std::to_string(row_version), ErrorCodes::LOGICAL_ERROR); + } + auto row_value = std::make_shared(value_buf.releaseStr()); + for (size_t i = 0; i < rows; i++) + data_list_read.emplace_back(pk, del_mark_value, version_value, row_value); + } + + void checkBlock(DecodingStorageSchemaSnapshotConstPtr decoding_schema, const Block & block) const + { + ASSERT_EQ(block.columns(), decoding_schema->column_defines->size()); + for (size_t row = 0; row < rows; row++) + { + for (size_t pos = 0; pos < block.columns(); pos++) + { + const auto & column_element = block.getByPosition(pos); + auto gen_error_log = [&]() { + return fmt::format( + " when checking column\n id={}, name={}, nrow={}\n decoded block is:\n{}\n", + column_element.column_id, + column_element.name, + row, + getColumnsContent(block.getColumnsWithTypeAndName())); + }; + if (row == 0) + { + ASSERT_EQ(column_element.column->size(), rows); + } + if (column_element.name == EXTRA_HANDLE_COLUMN_NAME) + { + if (decoding_schema->is_common_handle) + { + ASSERT_FIELD_EQ((*column_element.column)[row], Field(*std::get<0>(data_list_read[row]))) << gen_error_log(); + } + else + { + ASSERT_FIELD_EQ((*column_element.column)[row], Field(handle_value)) << gen_error_log(); + } + } + else if (column_element.name == VERSION_COLUMN_NAME) + { + ASSERT_FIELD_EQ((*column_element.column)[row], Field(version_value)) << gen_error_log(); + } + else if (column_element.name == TAG_COLUMN_NAME) + { + ASSERT_FIELD_EQ((*column_element.column)[row], Field(NearestFieldType::Type(del_mark_value))) << gen_error_log(); + } + else + { + if (fields_map.count(column_element.column_id) > 0) + ASSERT_FIELD_EQ((*column_element.column)[row], fields_map.at(column_element.column_id)) << gen_error_log(); + else + LOG_INFO(logger, "ignore value check for new added column, id={}, name={}", column_element.column_id, column_element.name); + } + } + } + } + + bool decodeAndCheckColumns(DecodingStorageSchemaSnapshotConstPtr decoding_schema, bool force_decode) const + { + RegionBlockReader reader{decoding_schema}; + Block block = createBlockSortByColumnID(decoding_schema); + if (!reader.read(block, data_list_read, force_decode)) + return false; + + checkBlock(decoding_schema, block); + return true; + } + + std::pair> getNormalTableInfoFields(const ColumnIDs & pk_col_ids, bool is_common_handle) const + { + return getTableInfoAndFields( + pk_col_ids, + is_common_handle, + ColumnIDValue(2, handle_value), + ColumnIDValue(3, std::numeric_limits::max()), + ColumnIDValue(4, std::numeric_limits::min()), + ColumnIDValue(9, String("aaa")), + ColumnIDValue(10, DecimalField(ToDecimal(12345678910ULL, 4), 4)), + ColumnIDValueNull(11)); + } + + TableInfo getTableInfoWithMoreColumns(const ColumnIDs & handle_ids, bool is_common_handle) + { + TableInfo table_info; + std::tie(table_info, std::ignore) = getTableInfoAndFields( + handle_ids, + is_common_handle, + ColumnIDValue(1, String("")), + ColumnIDValue(2, handle_value), + ColumnIDValue(3, std::numeric_limits::max()), + ColumnIDValue(4, std::numeric_limits::min()), + ColumnIDValue(8, String("")), + ColumnIDValue(9, String("aaa")), + ColumnIDValue(10, DecimalField(ToDecimal(12345678910ULL, 4), 4)), + ColumnIDValueNull(11), + ColumnIDValue(13, String(""))); + + // add default value for missing column + std::vector missing_column_ids{1, 8, 13}; + String missing_column_default_value = String("default"); + for (auto & column : table_info.columns) + { + if (std::find(missing_column_ids.begin(), missing_column_ids.end(), column.id) != missing_column_ids.end()) + { + column.origin_default_value = missing_column_default_value; + fields_map.emplace(column.id, Field(missing_column_default_value)); + } + } + return table_info; + } + + TableInfo getTableInfoWithLessColumns(const ColumnIDs & handle_ids, bool is_common_handle) const + { + TableInfo table_info; + std::tie(table_info, std::ignore) = getTableInfoAndFields( + handle_ids, + is_common_handle, + ColumnIDValue(2, handle_value), + ColumnIDValue(4, std::numeric_limits::min()), + ColumnIDValue(9, String("aaa")), + ColumnIDValue(10, DecimalField(ToDecimal(12345678910ULL, 4), 4))); + return table_info; + } + + TableInfo getTableInfoWithMoreNarrowIntType(const ColumnIDs & handle_ids, bool is_common_handle) const + { + TableInfo table_info; + std::tie(table_info, std::ignore) = getTableInfoAndFields( + handle_ids, + is_common_handle, + ColumnIDValue(2, handle_value), + ColumnIDValue(3, std::numeric_limits::max()), + ColumnIDValue(4, std::numeric_limits::min()), + ColumnIDValue(9, String("aaa")), + ColumnIDValue(10, DecimalField(ToDecimal(12345678910ULL, 4), 4)), + ColumnIDValueNull(11)); + return table_info; + } + + TableInfo getTableInfoFieldsForInvalidNULLTest(const ColumnIDs & handle_ids, bool is_common_handle) const + { + TableInfo table_info; + std::tie(table_info, std::ignore) = getTableInfoAndFields( + handle_ids, + is_common_handle, + ColumnIDValue(2, handle_value), + ColumnIDValue(3, std::numeric_limits::max()), + ColumnIDValue(4, std::numeric_limits::min()), + ColumnIDValue(9, String("aaa")), + ColumnIDValue(10, DecimalField(ToDecimal(12345678910ULL, 4), 4)), + ColumnIDValue(11, std::numeric_limits::min())); + return table_info; + } +}; + +String bytesFromHexString(std::string_view hex_str) +{ + assert(hex_str.size() % 2 == 0); + String bytes(hex_str.size() / 2, '\x00'); + for (size_t i = 0; i < bytes.size(); ++i) + { + bytes[i] = unhex2(hex_str.data() + i * 2); + } + return bytes; +} + +TEST_F(RegionBlockReaderTest, PKIsNotHandle) +{ + auto [table_info, fields] = getNormalTableInfoFields({EXTRA_HANDLE_COLUMN_ID}, false); + ASSERT_EQ(table_info.is_common_handle, false); + ASSERT_EQ(table_info.pk_is_handle, false); + ASSERT_FALSE(table_info.getColumnInfo(2).hasPriKeyFlag()); + + encodeColumns(table_info, fields, RowEncodeVersion::RowV2); + auto decoding_schema = getDecodingStorageSchemaSnapshot(table_info); + ASSERT_TRUE(decodeAndCheckColumns(decoding_schema, true)); +} + +TEST_F(RegionBlockReaderTest, PKIsHandle) +{ + auto [table_info, fields] = getNormalTableInfoFields({2}, false); + ASSERT_EQ(table_info.is_common_handle, false); + ASSERT_EQ(table_info.pk_is_handle, true); + ASSERT_TRUE(table_info.getColumnInfo(2).hasPriKeyFlag()); + + encodeColumns(table_info, fields, RowEncodeVersion::RowV2); + auto decoding_schema = getDecodingStorageSchemaSnapshot(table_info); + ASSERT_TRUE(decodeAndCheckColumns(decoding_schema, true)); +} + +TEST_F(RegionBlockReaderTest, CommonHandle) +{ + auto [table_info, fields] = getNormalTableInfoFields({2, 3, 4}, true); + ASSERT_EQ(table_info.is_common_handle, true); + ASSERT_EQ(table_info.pk_is_handle, false); + ASSERT_TRUE(table_info.getColumnInfo(2).hasPriKeyFlag()); + ASSERT_TRUE(table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_TRUE(table_info.getColumnInfo(4).hasPriKeyFlag()); + + encodeColumns(table_info, fields, RowEncodeVersion::RowV2); + auto decoding_schema = getDecodingStorageSchemaSnapshot(table_info); + ASSERT_TRUE(decodeAndCheckColumns(decoding_schema, true)); +} + +TEST_F(RegionBlockReaderTest, MissingColumnRowV2) +{ + auto [table_info, fields] = getNormalTableInfoFields({EXTRA_HANDLE_COLUMN_ID}, false); + encodeColumns(table_info, fields, RowEncodeVersion::RowV2); + auto new_table_info = getTableInfoWithMoreColumns({EXTRA_HANDLE_COLUMN_ID}, false); + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + ASSERT_TRUE(decodeAndCheckColumns(new_decoding_schema, false)); +} + +TEST_F(RegionBlockReaderTest, MissingColumnRowV1) +{ + auto [table_info, fields] = getNormalTableInfoFields({EXTRA_HANDLE_COLUMN_ID}, false); + encodeColumns(table_info, fields, RowEncodeVersion::RowV1); + auto new_table_info = getTableInfoWithMoreColumns({EXTRA_HANDLE_COLUMN_ID}, false); + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + ASSERT_TRUE(decodeAndCheckColumns(new_decoding_schema, false)); +} + +TEST_F(RegionBlockReaderTest, ExtraColumnRowV2) +{ + auto [table_info, fields] = getNormalTableInfoFields({EXTRA_HANDLE_COLUMN_ID}, false); + encodeColumns(table_info, fields, RowEncodeVersion::RowV2); + auto new_table_info = getTableInfoWithLessColumns({EXTRA_HANDLE_COLUMN_ID}, false); + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + ASSERT_FALSE(decodeAndCheckColumns(new_decoding_schema, false)); + ASSERT_TRUE(decodeAndCheckColumns(new_decoding_schema, true)); +} + +TEST_F(RegionBlockReaderTest, ExtraColumnRowV1) +{ + auto [table_info, fields] = getNormalTableInfoFields({EXTRA_HANDLE_COLUMN_ID}, false); + encodeColumns(table_info, fields, RowEncodeVersion::RowV1); + auto new_table_info = getTableInfoWithLessColumns({EXTRA_HANDLE_COLUMN_ID}, false); + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + ASSERT_FALSE(decodeAndCheckColumns(new_decoding_schema, false)); + ASSERT_TRUE(decodeAndCheckColumns(new_decoding_schema, true)); +} + +TEST_F(RegionBlockReaderTest, OverflowColumnRowV2) +{ + auto [table_info, fields] = getNormalTableInfoFields({EXTRA_HANDLE_COLUMN_ID}, false); + encodeColumns(table_info, fields, RowEncodeVersion::RowV2); + auto new_table_info = getTableInfoWithMoreNarrowIntType({EXTRA_HANDLE_COLUMN_ID}, false); + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + ASSERT_FALSE(decodeAndCheckColumns(new_decoding_schema, false)); + ASSERT_ANY_THROW(decodeAndCheckColumns(new_decoding_schema, true)); + + auto decoding_schema = getDecodingStorageSchemaSnapshot(table_info); + ASSERT_TRUE(decodeAndCheckColumns(decoding_schema, true)); +} + +TEST_F(RegionBlockReaderTest, OverflowColumnRowV1) +{ + auto [table_info, fields] = getNormalTableInfoFields({EXTRA_HANDLE_COLUMN_ID}, false); + encodeColumns(table_info, fields, RowEncodeVersion::RowV1); + auto new_table_info = getTableInfoWithMoreNarrowIntType({EXTRA_HANDLE_COLUMN_ID}, false); + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + ASSERT_FALSE(decodeAndCheckColumns(new_decoding_schema, false)); + ASSERT_ANY_THROW(decodeAndCheckColumns(new_decoding_schema, true)); + + auto decoding_schema = getDecodingStorageSchemaSnapshot(table_info); + ASSERT_TRUE(decodeAndCheckColumns(decoding_schema, true)); +} + +TEST_F(RegionBlockReaderTest, InvalidNULLRowV2) +try +{ + auto [table_info, fields] = getNormalTableInfoFields({EXTRA_HANDLE_COLUMN_ID}, false); + ASSERT_FALSE(table_info.getColumnInfo(11).hasNotNullFlag()); // col 11 is nullable + + encodeColumns(table_info, fields, RowEncodeVersion::RowV2); + + auto new_table_info = getTableInfoFieldsForInvalidNULLTest({EXTRA_HANDLE_COLUMN_ID}, false); + ASSERT_TRUE(new_table_info.getColumnInfo(11).hasNotNullFlag()); // col 11 is not null + + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + ASSERT_FALSE(decodeAndCheckColumns(new_decoding_schema, false)); + ASSERT_ANY_THROW(decodeAndCheckColumns(new_decoding_schema, true)); +} +CATCH + +TEST_F(RegionBlockReaderTest, InvalidNULLRowV1) +{ + auto [table_info, fields] = getNormalTableInfoFields({EXTRA_HANDLE_COLUMN_ID}, false); + encodeColumns(table_info, fields, RowEncodeVersion::RowV1); + auto new_table_info = getTableInfoFieldsForInvalidNULLTest({EXTRA_HANDLE_COLUMN_ID}, false); + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + ASSERT_FALSE(decodeAndCheckColumns(new_decoding_schema, false)); + ASSERT_ANY_THROW(decodeAndCheckColumns(new_decoding_schema, true)); +} + + +TEST_F(RegionBlockReaderTest, MissingPrimaryKeyColumnRowV2) +try +{ + // Mock a table + // `t_case` { + // column3 varchar(32) NOT NULL, + // column4 varchar(20) DEFAULT NULL, + // primary key (`column3`) /*T![clustered_index] NONCLUSTERED */ + // -- _tidb_rowid bigint, // hidden handle + // } + auto [table_info, fields] = getTableInfoAndFields(/*pk_col_ids*/ {3}, false, ColumnIDValue(3, "hello"), ColumnIDValueNull(4)); + ASSERT_EQ(table_info.is_common_handle, false); + ASSERT_EQ(table_info.pk_is_handle, false); + ASSERT_TRUE(table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_FALSE(table_info.getColumnInfo(4).hasPriKeyFlag()); + + // FIXME: actually TiDB won't encode the "NULL" for column4 into value + // but now the `RowEncoderV2` does not support this, we use `RegionBlockReaderTest::ReadFromRegion` + // to test that. + encodeColumns(table_info, fields, RowEncodeVersion::RowV2); + + // Mock re-create the primary key index with "column4" that contains `NULL` value + // `t_case` { + // column3 varchar(32) NOT NULL, + // column4 varchar(20) NOT NULL, + // primary key (`column3`, `column4`) /*T![clustered_index] NONCLUSTERED */ + // -- _tidb_rowid bigint, // hidden handle + // } + TableInfo new_table_info; + std::tie(new_table_info, std::ignore) = getTableInfoAndFields(/*pk_col_ids*/ {3, 4}, false, ColumnIDValueNull(3), ColumnIDValueNull(4)); + ASSERT_EQ(new_table_info.is_common_handle, false); + ASSERT_EQ(new_table_info.pk_is_handle, false); + ASSERT_TRUE(new_table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_TRUE(new_table_info.getColumnInfo(4).hasPriKeyFlag()); + + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + // FIXME: actually we need to decode the block with force_decode=true, see the + // comments before `encodeColumns` + EXPECT_TRUE(decodeAndCheckColumns(new_decoding_schema, true)); + // // force_decode=false can not decode because there are + // // missing value for column with primary key flag. + // EXPECT_FALSE(decodeAndCheckColumns(new_decoding_schema, false)); + // // force_decode=true, decode ok. + // EXPECT_TRUE(decodeAndCheckColumns(new_decoding_schema, true)); +} +CATCH + +TEST_F(RegionBlockReaderTest, MissingPrimaryKeyColumnRowV1) +try +{ + // Mock a table + // `t_case` { + // column3 varchar(32) NOT NULL, + // column4 varchar(20) DEFAULT NULL, + // primary key (`column3`) /*T![clustered_index] NONCLUSTERED */ + // -- _tidb_rowid bigint, // hidden handle + // } + auto [table_info, fields] = getTableInfoAndFields(/*pk_col_ids*/ {3}, false, ColumnIDValue(3, "hello"), ColumnIDValueNull(4)); + ASSERT_EQ(table_info.is_common_handle, false); + ASSERT_EQ(table_info.pk_is_handle, false); + ASSERT_TRUE(table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_FALSE(table_info.getColumnInfo(4).hasPriKeyFlag()); + + encodeColumns(table_info, fields, RowEncodeVersion::RowV1); + + // Mock re-create the primary key index with "column4" that contains `NULL` value + // `t_case` { + // column3 varchar(32) NOT NULL, + // column4 varchar(20) NOT NULL, + // primary key (`column3`, `column4`) /*T![clustered_index] NONCLUSTERED */ + // -- _tidb_rowid bigint, // hidden handle + // } + TableInfo new_table_info; + std::tie(new_table_info, std::ignore) = getTableInfoAndFields(/*pk_col_ids*/ {3, 4}, false, ColumnIDValueNull(3), ColumnIDValueNull(4)); + ASSERT_EQ(new_table_info.is_common_handle, false); + ASSERT_EQ(new_table_info.pk_is_handle, false); + ASSERT_TRUE(new_table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_TRUE(new_table_info.getColumnInfo(4).hasPriKeyFlag()); + + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + EXPECT_TRUE(decodeAndCheckColumns(new_decoding_schema, false)); +} +CATCH + +TEST_F(RegionBlockReaderTest, NewMissingPrimaryKeyColumnRowV2) +try +{ + // Mock a table + // `t_case` { + // column3 varchar(32) NOT NULL, + // primary key (`column3`) /*T![clustered_index] NONCLUSTERED */ + // -- _tidb_rowid bigint, // hidden handle + // } + auto [table_info, fields] = getTableInfoAndFields(/*pk_col_ids*/ {3}, false, ColumnIDValue(3, "hello")); + ASSERT_EQ(table_info.is_common_handle, false); + ASSERT_EQ(table_info.pk_is_handle, false); + ASSERT_TRUE(table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_ANY_THROW(table_info.getColumnInfo(4)); // not exist + + encodeColumns(table_info, fields, RowEncodeVersion::RowV2); + + // Mock re-create the primary key index with new-added "column4" + // `t_case` { + // column3 varchar(32) NOT NULL, + // column4 varchar(20) NOT NULL, + // primary key (`column3`, `column4`) /*T![clustered_index] NONCLUSTERED */ + // -- _tidb_rowid bigint, // hidden handle + // } + TableInfo new_table_info; + std::tie(new_table_info, std::ignore) = getTableInfoAndFields(/*pk_col_ids*/ {3, 4}, false, ColumnIDValueNull(3), ColumnIDValueNull(4)); + ASSERT_EQ(new_table_info.is_common_handle, false); + ASSERT_EQ(new_table_info.pk_is_handle, false); + ASSERT_TRUE(new_table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_TRUE(new_table_info.getColumnInfo(4).hasPriKeyFlag()); + + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + // force_decode=false can not decode because there are + // missing value for column with primary key flag. + EXPECT_FALSE(decodeAndCheckColumns(new_decoding_schema, false)); + // force_decode=true, decode ok. + EXPECT_TRUE(decodeAndCheckColumns(new_decoding_schema, true)); +} +CATCH + +TEST_F(RegionBlockReaderTest, NewMissingPrimaryKeyColumnRowV1) +try +{ + // Mock a table + // `t_case` { + // column3 varchar(32) NOT NULL, + // primary key (`column3`) /*T![clustered_index] NONCLUSTERED */ + // -- _tidb_rowid bigint, // hidden handle + // } + auto [table_info, fields] = getTableInfoAndFields(/*pk_col_ids*/ {3}, false, ColumnIDValue(3, "hello")); + ASSERT_EQ(table_info.is_common_handle, false); + ASSERT_EQ(table_info.pk_is_handle, false); + ASSERT_TRUE(table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_ANY_THROW(table_info.getColumnInfo(4)); // not exist + + encodeColumns(table_info, fields, RowEncodeVersion::RowV1); + + // Mock re-create the primary key index with new-added "column4" + // `t_case` { + // column3 varchar(32) NOT NULL, + // column4 varchar(20) NOT NULL, + // primary key (`column3`, `column4`) /*T![clustered_index] NONCLUSTERED */ + // -- _tidb_rowid bigint, // hidden handle + // } + TableInfo new_table_info; + std::tie(new_table_info, std::ignore) = getTableInfoAndFields(/*pk_col_ids*/ {3, 4}, false, ColumnIDValueNull(3), ColumnIDValueNull(4)); + ASSERT_EQ(new_table_info.is_common_handle, false); + ASSERT_EQ(new_table_info.pk_is_handle, false); + ASSERT_TRUE(new_table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_TRUE(new_table_info.getColumnInfo(4).hasPriKeyFlag()); + + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + // force_decode=false can not decode because there are + // missing value for column with primary key flag. + EXPECT_FALSE(decodeAndCheckColumns(new_decoding_schema, false)); + // force_decode=true, decode ok. + EXPECT_TRUE(decodeAndCheckColumns(new_decoding_schema, true)); +} +CATCH + +TEST_F(RegionBlockReaderTest, ReadFromRegion) +try +{ + TableInfo table_info(R"({"cols":[ + {"comment":"","default":null,"default_bit":null,"id":1,"name":{"L":"case_no","O":"case_no"},"offset":0,"origin_default":null,"state":5,"type":{"Charset":"utf8mb4","Collate":"utf8mb4_bin","Decimal":0,"Elems":null,"Flag":4099,"Flen":32,"Tp":15}}, + {"comment":"","default":null,"default_bit":null,"id":2,"name":{"L":"p","O":"p"},"offset":1,"origin_default":null,"state":5,"type":{"Charset":"utf8mb4","Collate":"utf8mb4_bin","Decimal":0,"Elems":null,"Flag":0,"Flen":12,"Tp":15}}, + {"comment":"","default":null,"default_bit":null,"id":3,"name":{"L":"source","O":"source"},"offset":2,"origin_default":"","state":5,"type":{"Charset":"utf8mb4","Collate":"utf8mb4_bin","Decimal":0,"Elems":null,"Flag":4099,"Flen":20,"Tp":15}} + ],"comment":"","id":77,"index_info":[],"is_common_handle":false,"name":{"L":"t_case","O":"t_case"},"partition":null,"pk_is_handle":false,"schema_version":62,"state":5,"tiflash_replica":{"Count":1},"update_timestamp":435984541435559947})"); + + RegionID region_id = 4; + String region_start_key(bytesFromHexString("7480000000000000FF445F720000000000FA")); + String region_end_key(bytesFromHexString("7480000000000000FF4500000000000000F8")); + auto region = makeRegion(region_id, region_start_key, region_end_key); + // the hex kv dump from SSTFile + std::vector> kvs = { + {"7480000000000000FF4D5F728000000000FF0000010000000000FAF9F3125EFCF3FFFE", "4C8280809290B4BB8606"}, + {"7480000000000000FF4D5F728000000000FF0000010000000000FAF9F3126548ABFFFC", "508180D0BAABB3BB8606760A80000100000001010031"}, + {"7480000000000000FF4D5F728000000000FF0000020000000000FAF9F3125EFCF3FFFE", "4C8280809290B4BB8606"}, + {"7480000000000000FF4D5F728000000000FF0000020000000000FAF9F3126548ABFFFC", "508180D0BAABB3BB8606760A80000100000001010032"}, + {"7480000000000000FF4D5F728000000000FF0000030000000000FAF9F3125EFCF3FFFE", "4C8280809290B4BB8606"}, + {"7480000000000000FF4D5F728000000000FF0000030000000000FAF9F3126548ABFFFC", "508180D0BAABB3BB8606760A80000100000001010033"}, + {"7480000000000000FF4D5F728000000000FF0000040000000000FAF9F3125EFCF3FFFE", "4C8280809290B4BB8606"}, + {"7480000000000000FF4D5F728000000000FF0000040000000000FAF9F3126548ABFFFC", "508180D0BAABB3BB8606760A80000100000001010034"}, + }; + for (const auto & [k, v] : kvs) + { + region->insert(ColumnFamilyType::Write, TiKVKey(bytesFromHexString(k)), TiKVValue(bytesFromHexString(v))); + } + + auto data_list_read = ReadRegionCommitCache(region, true); + ASSERT_TRUE(data_list_read.has_value()); + + auto decoding_schema = getDecodingStorageSchemaSnapshot(table_info); + { + // force_decode=false can not decode because there are + // missing value for column with primary key flag. + auto reader = RegionBlockReader(decoding_schema); + Block res_block = createBlockSortByColumnID(decoding_schema); + EXPECT_FALSE(reader.read(res_block, *data_list_read, false)); + } + { + // force_decode=true can decode the block + auto reader = RegionBlockReader(decoding_schema); + Block res_block = createBlockSortByColumnID(decoding_schema); + EXPECT_TRUE(reader.read(res_block, *data_list_read, true)); + res_block.checkNumberOfRows(); + EXPECT_EQ(res_block.rows(), 4); + ASSERT_COLUMN_EQ(res_block.getByName("case_no"), createColumn({"1", "2", "3", "4"})); + } +} +CATCH + + +} // namespace DB::tests diff --git a/tests/fullstack-test2/ddl/alter_pk.test b/tests/fullstack-test2/ddl/alter_pk.test index 3541fcb270a..5859e21cf46 100644 --- a/tests/fullstack-test2/ddl/alter_pk.test +++ b/tests/fullstack-test2/ddl/alter_pk.test @@ -43,3 +43,56 @@ mysql> alter table test.t drop primary key; │ f │ Nullable(Int32) │ │ │ │ _tidb_rowid │ Int64 │ │ │ └─────────────┴─────────────────┴──────────────┴────────────────────┘ + +# issue 5859, case 1 +mysql> drop table if exists test.t_case; +## create table with `source` is nullable +## insert some data and left `source` to be empty +mysql> create table test.t_case (`case_no` varchar(32) not null,`source` varchar(20) default null,`p` varchar(12) DEFAULT NULL,primary key (`case_no`)); +mysql> insert into test.t_case(case_no) values ("1"), ("2"), ("3"), ("4"); + +## drop the primary key, fill the `source` to be non-empty +## add new primary key with case_no and source +mysql> alter table test.t_case drop primary key; +mysql> update test.t_case set `source` = '' where `source` is NULL; +mysql> alter table test.t_case add primary key (`case_no`, `source`); + +## send the snapshot data to tiflash +mysql> alter table test.t_case set tiflash replica 1; +func> wait_table test t_case +mysql> select case_no,p,source from test.t_case; ++---------+------+--------+ +| case_no | p | source | ++---------+------+--------+ +| 1 | NULL | | +| 2 | NULL | | +| 3 | NULL | | +| 4 | NULL | | ++---------+------+--------+ + + +# issue 5859, case 2 +mysql> drop table if exists test.t_case; +## create table with `case_no` +mysql> create table test.t_case (`case_no` varchar(32) not null,`p` varchar(12) DEFAULT NULL,primary key (`case_no`)); +mysql> insert into test.t_case(case_no) values ("1"), ("2"), ("3"), ("4"); + +mysql> alter table test.t_case add column `source` varchar(20) not null; +## drop the primary key, add new primary key with case_no and source +mysql> alter table test.t_case drop primary key; +mysql> alter table test.t_case add primary key (`case_no`, `source`); + +## send the snapshot data to tiflash +mysql> alter table test.t_case set tiflash replica 1; +func> wait_table test t_case +mysql> select case_no,p,source from test.t_case; ++---------+------+--------+ +| case_no | p | source | ++---------+------+--------+ +| 1 | NULL | | +| 2 | NULL | | +| 3 | NULL | | +| 4 | NULL | | ++---------+------+--------+ + +mysql> drop table if exists test.t_case;