diff --git a/dbms/src/Columns/ColumnAggregateFunction.cpp b/dbms/src/Columns/ColumnAggregateFunction.cpp index 4d5bbec12fe..c6fbb3d5863 100644 --- a/dbms/src/Columns/ColumnAggregateFunction.cpp +++ b/dbms/src/Columns/ColumnAggregateFunction.cpp @@ -83,11 +83,20 @@ void ColumnAggregateFunction::insertRangeFrom(const IColumn & from, size_t start const ColumnAggregateFunction & from_concrete = static_cast(from); if (start + length > from_concrete.getData().size()) +<<<<<<< HEAD throw Exception("Parameters start = " + toString(start) + ", length = " + toString(length) + " are out of bound in ColumnAggregateFunction::insertRangeFrom method" " (data.size() = " + toString(from_concrete.getData().size()) + ").", +======= + throw Exception( + fmt::format( + "Parameters are out of bound in ColumnAggregateFunction::insertRangeFrom method, start={}, length={}, from.size()={}", + start, + length, + from_concrete.getData().size()), +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) ErrorCodes::PARAMETER_OUT_OF_BOUND); if (!empty() && src.get() != &from_concrete) diff --git a/dbms/src/Columns/ColumnArray.cpp b/dbms/src/Columns/ColumnArray.cpp index 537870b0209..5591ce671b3 100644 --- a/dbms/src/Columns/ColumnArray.cpp +++ b/dbms/src/Columns/ColumnArray.cpp @@ -383,7 +383,16 @@ void ColumnArray::insertRangeFrom(const IColumn & src, size_t start, size_t leng const ColumnArray & src_concrete = static_cast(src); if (start + length > src_concrete.getOffsets().size()) +<<<<<<< HEAD throw Exception("Parameter out of bound in ColumnArray::insertRangeFrom method.", +======= + throw Exception( + fmt::format( + "Parameters are out of bound in ColumnArray::insertRangeFrom method, start={}, length={}, src.size()={}", + start, + length, + src_concrete.getOffsets().size()), +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) ErrorCodes::PARAMETER_OUT_OF_BOUND); size_t nested_offset = 
src_concrete.offsetAt(start); diff --git a/dbms/src/Columns/ColumnDecimal.cpp b/dbms/src/Columns/ColumnDecimal.cpp index 15b22bacb53..9cf7e1e58d9 100644 --- a/dbms/src/Columns/ColumnDecimal.cpp +++ b/dbms/src/Columns/ColumnDecimal.cpp @@ -194,8 +194,17 @@ void ColumnDecimal::insertRangeFrom(const IColumn & src, size_t start, size_t const ColumnDecimal & src_vec = static_cast(src); if (start + length > src_vec.data.size()) +<<<<<<< HEAD throw Exception("Parameters start = " + toString(start) + ", length = " + toString(length) + " are out of bound in ColumnDecimal::insertRangeFrom method (data.size() = " + toString(src_vec.data.size()) + ").", +======= + throw Exception( + fmt::format( + "Parameters are out of bound in ColumnDecimal::insertRangeFrom method, start={}, length={}, src.size()={}", + start, + length, + src_vec.data.size()), +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) ErrorCodes::PARAMETER_OUT_OF_BOUND); size_t old_size = data.size(); diff --git a/dbms/src/Columns/ColumnFixedString.cpp b/dbms/src/Columns/ColumnFixedString.cpp index d95aa3d3f54..208b32dc3aa 100644 --- a/dbms/src/Columns/ColumnFixedString.cpp +++ b/dbms/src/Columns/ColumnFixedString.cpp @@ -150,10 +150,19 @@ void ColumnFixedString::insertRangeFrom(const IColumn & src, size_t start, size_ const ColumnFixedString & src_concrete = static_cast(src); if (start + length > src_concrete.size()) +<<<<<<< HEAD throw Exception("Parameters start = " + toString(start) + ", length = " + toString(length) + " are out of bound in ColumnFixedString::insertRangeFrom method" " (size() = " + toString(src_concrete.size()) + ").", +======= + throw Exception( + fmt::format( + "Parameters are out of bound in ColumnFixedString::insertRangeFrom method, start={}, length={}, src.size()={}", + start, + length, + src_concrete.size()), +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) 
ErrorCodes::PARAMETER_OUT_OF_BOUND); size_t old_size = chars.size(); diff --git a/dbms/src/Columns/ColumnString.cpp b/dbms/src/Columns/ColumnString.cpp index 3d1b80a86d2..b68b7d76f44 100644 --- a/dbms/src/Columns/ColumnString.cpp +++ b/dbms/src/Columns/ColumnString.cpp @@ -71,7 +71,16 @@ void ColumnString::insertRangeFrom(const IColumn & src, size_t start, size_t len const ColumnString & src_concrete = static_cast(src); if (start + length > src_concrete.offsets.size()) +<<<<<<< HEAD throw Exception("Parameter out of bound in IColumnString::insertRangeFrom method.", +======= + throw Exception( + fmt::format( + "Parameters are out of bound in ColumnString::insertRangeFrom method, start={}, length={}, src.size()={}", + start, + length, + src_concrete.size()), +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) ErrorCodes::PARAMETER_OUT_OF_BOUND); size_t nested_offset = src_concrete.offsetAt(start); diff --git a/dbms/src/Columns/ColumnVector.cpp b/dbms/src/Columns/ColumnVector.cpp index 21d7a8ef923..aba57882c66 100644 --- a/dbms/src/Columns/ColumnVector.cpp +++ b/dbms/src/Columns/ColumnVector.cpp @@ -155,10 +155,19 @@ void ColumnVector::insertRangeFrom(const IColumn & src, size_t start, size_t const ColumnVector & src_vec = static_cast(src); if (start + length > src_vec.data.size()) +<<<<<<< HEAD throw Exception("Parameters start = " + toString(start) + ", length = " + toString(length) + " are out of bound in ColumnVector::insertRangeFrom method" " (data.size() = " + toString(src_vec.data.size()) + ").", +======= + throw Exception( + fmt::format( + "Parameters are out of bound in ColumnVector::insertRangeFrom method, start={}, length={}, src.size()={}", + start, + length, + src_vec.data.size()), +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) ErrorCodes::PARAMETER_OUT_OF_BOUND); size_t old_size = data.size(); diff --git 
a/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h b/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h new file mode 100644 index 00000000000..490a646f5fe --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/SSTFilesToBlockInputStream.h @@ -0,0 +1,149 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include + +#include +#include + +namespace Poco +{ +class Logger; +} + +namespace DB +{ +class TMTContext; +class Region; +using RegionPtr = std::shared_ptr; + +struct SSTViewVec; +struct TiFlashRaftProxyHelper; +struct SSTReader; +class StorageDeltaMerge; + +namespace DM +{ +struct ColumnDefine; +using ColumnDefines = std::vector; +using ColumnDefinesPtr = std::shared_ptr; + +// forward declaration +class SSTFilesToBlockInputStream; +using SSTFilesToBlockInputStreamPtr = std::shared_ptr; +class BoundedSSTFilesToBlockInputStream; +using BoundedSSTFilesToBlockInputStreamPtr = std::shared_ptr; + +class SSTFilesToBlockInputStream final : public IBlockInputStream +{ +public: + SSTFilesToBlockInputStream(RegionPtr region_, + const SSTViewVec & snaps_, + const TiFlashRaftProxyHelper * proxy_helper_, + DecodingStorageSchemaSnapshotConstPtr schema_snap_, + Timestamp gc_safepoint_, + bool force_decode_, + TMTContext & tmt_, + size_t expected_size_ = DEFAULT_MERGE_BLOCK_SIZE); + ~SSTFilesToBlockInputStream() override; + + String getName() const override { return 
"SSTFilesToBlockInputStream"; } + + Block getHeader() const override { return toEmptyBlock(*(schema_snap->column_defines)); } + + void readPrefix() override; + void readSuffix() override; + Block read() override; + +public: + struct ProcessKeys + { + size_t default_cf = 0; + size_t write_cf = 0; + size_t lock_cf = 0; + + inline size_t total() const { return default_cf + write_cf + lock_cf; } + }; + +private: + void loadCFDataFromSST(ColumnFamilyType cf, const DecodedTiKVKey * rowkey_to_be_included); + + Block readCommitedBlock(); + +private: + RegionPtr region; + const SSTViewVec & snaps; + const TiFlashRaftProxyHelper * proxy_helper{nullptr}; + DecodingStorageSchemaSnapshotConstPtr schema_snap; + TMTContext & tmt; + const Timestamp gc_safepoint; + size_t expected_size; + Poco::Logger * log; + + using SSTReaderPtr = std::unique_ptr; + SSTReaderPtr write_cf_reader; + SSTReaderPtr default_cf_reader; + SSTReaderPtr lock_cf_reader; + + DecodedTiKVKey default_last_loaded_rowkey; + DecodedTiKVKey lock_last_loaded_rowkey; + + friend class BoundedSSTFilesToBlockInputStream; + + const bool force_decode; + bool is_decode_cancelled = false; + + ProcessKeys process_keys; +}; + +// Bound the blocks read from SSTFilesToBlockInputStream by column `_tidb_rowid` and +// do some calculation for the `DMFileWriter::BlockProperty` of read blocks. 
+class BoundedSSTFilesToBlockInputStream final +{ +public: + BoundedSSTFilesToBlockInputStream(SSTFilesToBlockInputStreamPtr child, + const ColId pk_column_id_, + const DecodingStorageSchemaSnapshotConstPtr & schema_snap); + + String getName() const { return "BoundedSSTFilesToBlockInputStream"; } + + void readPrefix(); + + void readSuffix(); + + Block read(); + + SSTFilesToBlockInputStream::ProcessKeys getProcessKeys() const; + + RegionPtr getRegion() const; + + // Return values: (effective rows, not clean rows, is delete rows, gc hint version) + std::tuple getMvccStatistics() const; + +private: + const ColId pk_column_id; + + // Note that we only keep _raw_child for getting ingest info / process key, etc. All block should be + // read from `mvcc_compact_stream` + const SSTFilesToBlockInputStreamPtr _raw_child; + std::unique_ptr> mvcc_compact_stream; +}; + +} // namespace DM +} // namespace DB diff --git a/dbms/src/Storages/Transaction/ApplySnapshot.cpp b/dbms/src/Storages/Transaction/ApplySnapshot.cpp index 5d4f5edd504..057621d7790 100644 --- a/dbms/src/Storages/Transaction/ApplySnapshot.cpp +++ b/dbms/src/Storages/Transaction/ApplySnapshot.cpp @@ -14,6 +14,11 @@ #include #include #include +<<<<<<< HEAD +======= +#include +#include +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) #include @@ -235,8 +240,30 @@ void KVStore::onSnapshot(const RegionPtrWithBlock & new_region_wrap, RegionPtr o } } +<<<<<<< HEAD extern RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr &, Context &); +======= +std::vector KVStore::preHandleSnapshotToFiles( + RegionPtr new_region, + const SSTViewVec snaps, + uint64_t index, + uint64_t term, + TMTContext & tmt) +{ + std::vector ingest_ids; + try + { + ingest_ids = preHandleSSTsToDTFiles(new_region, snaps, index, term, DM::FileConvertJobType::ApplySnapshot, tmt); + } + catch (DB::Exception & e) + { + e.addMessage(fmt::format("(while preHandleSnapshot region_id={}, 
index={}, term={})", new_region->id(), index, term)); + e.rethrow(); + } + return ingest_ids; +} +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) RegionPreDecodeBlockDataPtr KVStore::preHandleSnapshot(RegionPtr new_region, const SSTViewVec snaps, TMTContext & tmt) { @@ -257,6 +284,12 @@ RegionPreDecodeBlockDataPtr KVStore::preHandleSnapshot(RegionPtr new_region, con SCOPE_EXIT( { GET_METRIC(metrics, tiflash_raft_command_duration_seconds, type_apply_snapshot_predecode).Observe(watch.elapsedSeconds()); }); +<<<<<<< HEAD +======= + PageIds generated_ingest_ids; + TableID physical_table_id = InvalidTableID; + while (true) +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) { LOG_INFO(log, "Pre-handle snapshot " << new_region->toString(false) << " with " << snaps.len << " TiKV sst files"); // Iterator over all SST files and insert key-values into `new_region` @@ -274,12 +307,44 @@ RegionPreDecodeBlockDataPtr KVStore::preHandleSnapshot(RegionPtr new_region, con ++kv_size; sst_reader.next(); } + physical_table_id = storage->getTableInfo().id; +<<<<<<< HEAD LOG_INFO(log, "Decode " << std::string_view(snapshot.path.data, snapshot.path.len) << " got [cf: " << CFToName(snapshot.type) << ", kv size: " << kv_size << "]"); // Note that number of keys in different cf will be aggregated into one metrics GET_METRIC(metrics, tiflash_raft_process_keys, type_apply_snapshot).Increment(kv_size); +======= + // Read from SSTs and refine the boundary of blocks output to DTFiles + auto sst_stream = std::make_shared( + new_region, + snaps, + proxy_helper, + schema_snap, + gc_safepoint, + force_decode, + tmt, + expected_block_size); + auto bounded_stream = std::make_shared(sst_stream, ::DB::TiDBPkColumnID, schema_snap); + stream = std::make_shared>( + bounded_stream, + storage, + schema_snap, + snapshot_apply_method, + job_type, + /* split_after_rows */ 0, + /* split_after_size */ 0, + 
tmt.getContext()); + + stream->writePrefix(); + stream->write(); + stream->writeSuffix(); + generated_ingest_ids = stream->ingestIds(); + + (void)table_drop_lock; // the table should not be dropped during ingesting file + break; +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) } { LOG_INFO(log, "Start to pre-decode " << new_region->toString() << " into block"); @@ -287,14 +352,26 @@ RegionPreDecodeBlockDataPtr KVStore::preHandleSnapshot(RegionPtr new_region, con if (block_cache) LOG_INFO(log, "Got pre-decode block cache"; block_cache->toString(oss_internal_rare)); else +<<<<<<< HEAD LOG_INFO(log, "Got empty pre-decode block cache"); cache = std::move(block_cache); +======= + { + // Other unrecoverable error, throw + e.addMessage(fmt::format("physical_table_id={}", physical_table_id)); + throw; + } +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) } LOG_INFO(log, "Pre-handle snapshot " << new_region->toString(false) << " cost " << watch.elapsedMilliseconds() << "ms"); } +<<<<<<< HEAD return cache; +======= + return generated_ingest_ids; +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) } void KVStore::handlePreApplySnapshot(const RegionPtrWithBlock & new_region, TMTContext & tmt) @@ -402,4 +479,70 @@ EngineStoreApplyRes KVStore::handleIngestSST(UInt64 region_id, const SSTViewVec } } +<<<<<<< HEAD +======= +RegionPtr KVStore::handleIngestSSTByDTFile(const RegionPtr & region, const SSTViewVec snaps, UInt64 index, UInt64 term, TMTContext & tmt) +{ + if (index <= region->appliedIndex()) + return nullptr; + + // Create a tmp region to store uncommitted data + RegionPtr tmp_region; + { + auto meta_region = region->getMetaRegion(); + auto meta_snap = region->dumpRegionMetaSnapshot(); + auto peer_id = meta_snap.peer.id(); + tmp_region = genRegionPtr(std::move(meta_region), peer_id, index, term); + } + + // Decode the 
KV pairs in ingesting SST into DTFiles + PageIds ingest_ids; + try + { + ingest_ids = preHandleSSTsToDTFiles(tmp_region, snaps, index, term, DM::FileConvertJobType::IngestSST, tmt); + } + catch (DB::Exception & e) + { + e.addMessage(fmt::format("(while handleIngestSST region_id={}, index={}, term={})", tmp_region->id(), index, term)); + e.rethrow(); + } + + // If `ingest_ids` is empty, ingest SST won't write delete_range for ingest region, it is safe to + // ignore the step of calling `ingestFiles` + if (!ingest_ids.empty()) + { + auto table_id = region->getMappedTableID(); + if (auto storage = tmt.getStorages().get(table_id); storage) + { + // Ingest DTFiles into DeltaMerge storage + auto & context = tmt.getContext(); + try + { + // Acquire `drop_lock` so that no other threads can drop the storage. `alter_lock` is not required. + auto table_lock = storage->lockForShare(getThreadName()); + auto key_range = DM::RowKeyRange::fromRegionRange( + region->getRange(), + table_id, + storage->isCommonHandle(), + storage->getRowKeyColumnSize()); + // Call `ingestFiles` to ingest external DTFiles. + // Note that ingest sst won't remove the data in the key range + auto dm_storage = std::dynamic_pointer_cast(storage); + dm_storage->ingestFiles(key_range, ingest_ids, /*clear_data_in_range=*/false, context.getSettingsRef()); + } + catch (DB::Exception & e) + { + // We can ignore if storage is dropped. 
+ if (e.code() == ErrorCodes::TABLE_IS_DROPPED) + return nullptr; + else + throw; + } + } + } + + return tmp_region; +} + +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) } // namespace DB diff --git a/dbms/src/Storages/Transaction/PartitionStreams.cpp b/dbms/src/Storages/Transaction/PartitionStreams.cpp index ec8de5b9e25..7a52dd3b729 100644 --- a/dbms/src/Storages/Transaction/PartitionStreams.cpp +++ b/dbms/src/Storages/Transaction/PartitionStreams.cpp @@ -234,22 +234,20 @@ std::variant ReadRegionCommitCache(const RegionPtr & region, bool lock_region = true) +std::optional ReadRegionCommitCache(const RegionPtr & region, bool lock_region) { auto scanner = region->createCommittedScanner(lock_region); /// Some sanity checks for region meta. - { - if (region->isPendingRemove()) - return std::nullopt; - } + if (region->isPendingRemove()) + return std::nullopt; /// Read raw KVs from region cache. - { - // Shortcut for empty region. - if (!scanner.hasNext()) - return std::nullopt; + // Shortcut for empty region. 
+ if (!scanner.hasNext()) + return std::nullopt; +<<<<<<< HEAD RegionDataReadInfoList data_list_read; data_list_read.reserve(scanner.writeMapSize()); @@ -259,6 +257,15 @@ std::optional ReadRegionCommitCache(const RegionPtr & re } while (scanner.hasNext()); return std::move(data_list_read); } +======= + RegionDataReadInfoList data_list_read; + data_list_read.reserve(scanner.writeMapSize()); + do + { + data_list_read.emplace_back(scanner.next()); + } while (scanner.hasNext()); + return data_list_read; +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) } void RemoveRegionCommitCache(const RegionPtr & region, const RegionDataReadInfoList & data_list_read, bool lock_region = true) @@ -397,7 +404,7 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio std::optional data_list_read = std::nullopt; try { - data_list_read = ReadRegionCommitCache(region); + data_list_read = ReadRegionCommitCache(region, true); if (!data_list_read) return nullptr; } @@ -475,4 +482,122 @@ RegionPtrWithBlock::CachePtr GenRegionPreDecodeBlockData(const RegionPtr & regio return std::make_unique(std::move(res_block), schema_version, std::move(*data_list_read)); } +<<<<<<< HEAD +======= +std::tuple, DecodingStorageSchemaSnapshotConstPtr> // +AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt) +{ + TableLockHolder drop_lock = nullptr; + std::shared_ptr dm_storage; + DecodingStorageSchemaSnapshotConstPtr schema_snapshot; + + auto table_id = region->getMappedTableID(); + LOG_FMT_DEBUG(&Poco::Logger::get(__PRETTY_FUNCTION__), "Get schema for table {}", table_id); + auto context = tmt.getContext(); + const auto atomic_get = [&](bool force_decode) -> bool { + auto storage = tmt.getStorages().get(table_id); + if (storage == nullptr) + { + if (!force_decode) + return false; + if (storage == nullptr) // Table must have just been GC-ed + return true; + } + // Get a structure read lock. 
It will throw exception if the table has been dropped, + // the caller should handle this situation. + auto table_lock = storage->lockStructureForShare(getThreadName()); + dm_storage = std::dynamic_pointer_cast(storage); + // only dt storage engine supports `getSchemaSnapshotAndBlockForDecoding`, other engine will throw exception + std::tie(schema_snapshot, std::ignore) = storage->getSchemaSnapshotAndBlockForDecoding(table_lock, false); + std::tie(std::ignore, drop_lock) = std::move(table_lock).release(); + return true; + }; + + if (!atomic_get(false)) + { + GET_METRIC(tiflash_schema_trigger_count, type_raft_decode).Increment(); + tmt.getSchemaSyncer()->syncSchemas(context); + + if (!atomic_get(true)) + throw Exception("Get " + region->toString() + " belonging table " + DB::toString(table_id) + " is_command_handle fail", + ErrorCodes::LOGICAL_ERROR); + } + + return {std::move(drop_lock), std::move(dm_storage), std::move(schema_snapshot)}; +} + +static Block sortColumnsBySchemaSnap(Block && ori, const DM::ColumnDefines & schema) +{ +#ifndef NDEBUG + // Some trivial check to ensure the input is legal + if (ori.columns() != schema.size()) + { + throw Exception("Try to sortColumnsBySchemaSnap with different column size [block_columns=" + DB::toString(ori.columns()) + + "] [schema_columns=" + DB::toString(schema.size()) + "]"); + } +#endif + + std::map index_by_cid; + for (size_t i = 0; i < ori.columns(); ++i) + { + const ColumnWithTypeAndName & c = ori.getByPosition(i); + index_by_cid[c.column_id] = i; + } + + Block res; + for (const auto & cd : schema) + { + res.insert(ori.getByPosition(index_by_cid[cd.id])); + } +#ifndef NDEBUG + assertBlocksHaveEqualStructure(res, DM::toEmptyBlock(schema), "sortColumnsBySchemaSnap"); +#endif + + return res; +} + +/// Decode region data into block and belonging schema snapshot, remove committed data from `region` +/// The return value is a block that stores the committed data scanned and removed from `region`. 
+/// The columns of returned block is sorted by `schema_snap`. +Block GenRegionBlockDataWithSchema(const RegionPtr & region, // + const DecodingStorageSchemaSnapshotConstPtr & schema_snap, + Timestamp gc_safepoint, + bool force_decode, + TMTContext & /* */) +{ + // In 5.0.1, feature `compaction filter` is enabled by default. Under such feature tikv will do gc in write & default cf individually. + // If some rows were updated and add tiflash replica, tiflash store may receive region snapshot with unmatched data in write & default cf sst files. + fiu_do_on(FailPoints::force_set_safepoint_when_decode_block, + { gc_safepoint = 10000000; }); // Mock a GC safepoint for testing compaction filter + region->tryCompactionFilter(gc_safepoint); + + std::optional data_list_read = ReadRegionCommitCache(region, true); + + Block res_block; + // No committed data, just return + if (!data_list_read) + return res_block; + + { + Stopwatch watch; + { + // Compare schema_snap with current schema, throw exception if changed. 
+ auto reader = RegionBlockReader(schema_snap); + res_block = createBlockSortByColumnID(schema_snap); + if (unlikely(!reader.read(res_block, *data_list_read, force_decode))) + throw Exception("RegionBlockReader decode error", ErrorCodes::REGION_DATA_SCHEMA_UPDATED); + } + + GET_METRIC(tiflash_raft_write_data_to_storage_duration_seconds, type_decode).Observe(watch.elapsedSeconds()); + } + + res_block = sortColumnsBySchemaSnap(std::move(res_block), *(schema_snap->column_defines)); + + // Remove committed data + RemoveRegionCommitCache(region, *data_list_read); + + return res_block; +} + +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) } // namespace DB diff --git a/dbms/src/Storages/Transaction/PartitionStreams.h b/dbms/src/Storages/Transaction/PartitionStreams.h new file mode 100644 index 00000000000..1c2c1844f9b --- /dev/null +++ b/dbms/src/Storages/Transaction/PartitionStreams.h @@ -0,0 +1,42 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace DB +{ +class Region; +using RegionPtr = std::shared_ptr; +class StorageDeltaMerge; +class TMTContext; + +std::optional ReadRegionCommitCache(const RegionPtr & region, bool lock_region); + +std::tuple, DecodingStorageSchemaSnapshotConstPtr> // +AtomicGetStorageSchema(const RegionPtr & region, TMTContext & tmt); + +Block GenRegionBlockDataWithSchema(const RegionPtr & region, // + const DecodingStorageSchemaSnapshotConstPtr & schema_snap, + Timestamp gc_safepoint, + bool force_decode, + TMTContext & tmt); + +} // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionBlockReader.cpp b/dbms/src/Storages/Transaction/RegionBlockReader.cpp index fc407fc7dc3..744d0e7bb15 100644 --- a/dbms/src/Storages/Transaction/RegionBlockReader.cpp +++ b/dbms/src/Storages/Transaction/RegionBlockReader.cpp @@ -1,14 +1,21 @@ #include +<<<<<<< HEAD #include +======= +#include +#include +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) #include #include #include #include #include +#include #include #include #include #include +#include #include @@ -161,6 +168,7 @@ void setPKVersionDel(ColumnUInt8 & delmark_col, else column_map.getMutableColumnPtr(pk_column_ids[0])->insert(Field(static_cast(pk))); } +<<<<<<< HEAD } template @@ -185,6 +193,9 @@ bool setColumnValues(ColumnUInt8 & delmark_col, DecodedRecordData decoded_data(visible_column_to_read_lut.size()); std::unique_ptr tmp_row; // decode row into Field list here for temporary use if necessary. 
+======= + +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) size_t index = 0; for (const auto & [pk, write_type, commit_ts, value_ptr] : data_list) { @@ -230,6 +241,7 @@ bool setColumnValues(ColumnUInt8 & delmark_col, } else { +<<<<<<< HEAD const TiKVValue & value = *value_ptr; const DecodedRow * row = nullptr; { @@ -262,6 +274,37 @@ bool setColumnValues(ColumnUInt8 & delmark_col, const auto & column_info = table_info.columns[id_to_idx.second]; if (auto it = findByColumnID(id_to_idx.first, unknown_fields); it != unknown_fields.end()) +======= + // Parse column value from encoded value + if (!appendRowToBlock(*value_ptr, column_ids_iter, read_column_ids.end(), block, next_column_pos, schema_snapshot, force_decode)) + return false; + } + } + + /// set extra handle column and pk columns from encoded key if need + if constexpr (pk_type != TMTPKType::STRING) + { + // For non-common handle, extra handle column's type is always Int64. + // We need to copy the handle value from encoded key. 
+ const auto handle_value = static_cast(pk); + auto * raw_extra_column = const_cast((block.getByPosition(extra_handle_column_pos)).column.get()); + static_cast(raw_extra_column)->getData().push_back(handle_value); + // For pk_is_handle == true, we need to decode the handle value from encoded key, and insert + // to the specified column + if (!pk_column_ids.empty()) + { + auto * raw_pk_column = const_cast((block.getByPosition(pk_pos_map.at(pk_column_ids[0]))).column.get()); + if constexpr (pk_type == TMTPKType::INT64) + static_cast(raw_pk_column)->getData().push_back(handle_value); + else if constexpr (pk_type == TMTPKType::UINT64) + static_cast(raw_pk_column)->getData().push_back(UInt64(handle_value)); + else + { + // The pk_type must be Int32/UInt32 or a narrower type + // so cannot tell its exact type here, just use `insert(Field)` + raw_pk_column->insert(Field(handle_value)); + if (unlikely(raw_pk_column->getInt(index) != handle_value)) +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) { if (!row->unknown_fields.with_codec_flag) { @@ -270,7 +313,12 @@ bool setColumnValues(ColumnUInt8 & delmark_col, } else { +<<<<<<< HEAD decoded_data.push_back(it); +======= + throw Exception(fmt::format("Detected overflow value when decoding pk column, type={} handle={}", raw_pk_column->getName(), handle_value), + ErrorCodes::LOGICAL_ERROR); +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) } continue; } diff --git a/dbms/src/Storages/Transaction/RegionBlockReader.h b/dbms/src/Storages/Transaction/RegionBlockReader.h index 19c03e52d80..3dc2dc2121b 100644 --- a/dbms/src/Storages/Transaction/RegionBlockReader.h +++ b/dbms/src/Storages/Transaction/RegionBlockReader.h @@ -1,5 +1,6 @@ #pragma once +<<<<<<< HEAD #include #include #include @@ -16,6 +17,13 @@ class IManageableStorage; using ManageableStoragePtr = std::shared_ptr; struct ColumnsDescription; +======= +#include 
+#include + +namespace DB +{ +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) class Block; class RegionScanFilter @@ -110,13 +118,23 @@ class RegionBlockReader : private boost::noncopyable /// Read `data_list` as a block. /// - /// On decode error, i.e. column number/type mismatch, will do force apply schema, + /// On decode error, i.e. column number/type mismatch, caller should trigger a schema-sync and retry with `force_decode=True`, /// i.e. add/remove/cast unknown/missing/type-mismatch column if force_decode is true, otherwise return empty block and false. /// Moreover, exception will be thrown if we see fatal decode error meanwhile `force_decode` is true. /// +<<<<<<< HEAD /// `RegionBlockReader::read` is the common routine used by both 'flush' and 'read' processes of TXN engine (Delta-Tree, TXN-MergeTree), /// each of which will use carefully adjusted 'start_ts' and 'force_decode' with appropriate error handling/retry to get what they want. std::tuple read(const Names & column_names_to_read, RegionDataReadInfoList & data_list, bool force_decode); +======= + /// `RegionBlockReader::read` is the common routine used by both 'flush' and 'read' processes of Delta-Tree engine, + /// which will use carefully adjusted 'force_decode' with appropriate error handling/retry to get what they want. + bool read(Block & block, const RegionDataReadInfoList & data_list, bool force_decode); + +private: + template + bool readImpl(Block & block, const RegionDataReadInfoList & data_list, bool force_decode); +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) /// Read all columns from `data_list` as a block. 
inline std::tuple read(RegionDataReadInfoList & data_list, bool force_decode) diff --git a/dbms/src/Storages/Transaction/RowCodec.cpp b/dbms/src/Storages/Transaction/RowCodec.cpp index dceb539e3e7..43638a23bdd 100644 --- a/dbms/src/Storages/Transaction/RowCodec.cpp +++ b/dbms/src/Storages/Transaction/RowCodec.cpp @@ -518,4 +518,350 @@ void encodeRowV2(const TiDB::TableInfo & table_info, const std::vector & RowEncoderV2(table_info, fields).encode(ss); } +<<<<<<< HEAD +======= +// pre-declare block +template +bool appendRowV2ToBlockImpl( + const TiKVValue::Base & raw_value, + SortedColumnIDWithPosConstIter column_ids_iter, + SortedColumnIDWithPosConstIter column_ids_iter_end, + Block & block, + size_t block_column_pos, + const ColumnInfos & column_infos, + ColumnID pk_handle_id, + bool ignore_pk_if_absent, + bool force_decode); + +bool appendRowV1ToBlock( + const TiKVValue::Base & raw_value, + SortedColumnIDWithPosConstIter column_ids_iter, + SortedColumnIDWithPosConstIter column_ids_iter_end, + Block & block, + size_t block_column_pos, + const ColumnInfos & column_infos, + ColumnID pk_handle_id, + bool ignore_pk_if_absent, + bool force_decode); +// pre-declare block end + +bool appendRowToBlock( + const TiKVValue::Base & raw_value, + SortedColumnIDWithPosConstIter column_ids_iter, + SortedColumnIDWithPosConstIter column_ids_iter_end, + Block & block, + size_t block_column_pos, + const DecodingStorageSchemaSnapshotConstPtr & schema_snapshot, + bool force_decode) +{ + const ColumnInfos & column_infos = schema_snapshot->column_infos; + // when pk is handle, we need to skip the pk column when decoding value + ColumnID pk_handle_id = InvalidColumnID; + if (schema_snapshot->pk_is_handle) + { + pk_handle_id = schema_snapshot->pk_column_ids[0]; + } + + // For pk_is_handle table, the column with primary key flag is decoded from encoded key instead of encoded value. + // For common handle table, the column with primary key flag is (usually) decoded from encoded key. 
We skip + // filling the columns with primary key flags inside this method. + // For other tables (non-clustered, use hidden _tidb_rowid as handle), the column with primary key flags could be + // changed, we need to fill missing column with default value. + const bool ignore_pk_if_absent = schema_snapshot->is_common_handle || schema_snapshot->pk_is_handle; + + switch (static_cast(raw_value[0])) + { + case static_cast(RowCodecVer::ROW_V2): + { + auto row_flag = readLittleEndian(&raw_value[1]); + bool is_big = row_flag & RowV2::BigRowMask; + return is_big ? appendRowV2ToBlockImpl(raw_value, column_ids_iter, column_ids_iter_end, block, block_column_pos, column_infos, pk_handle_id, ignore_pk_if_absent, force_decode) + : appendRowV2ToBlockImpl(raw_value, column_ids_iter, column_ids_iter_end, block, block_column_pos, column_infos, pk_handle_id, ignore_pk_if_absent, force_decode); + } + default: + return appendRowV1ToBlock(raw_value, column_ids_iter, column_ids_iter_end, block, block_column_pos, column_infos, pk_handle_id, ignore_pk_if_absent, force_decode); + } +} + +inline bool addDefaultValueToColumnIfPossible(const ColumnInfo & column_info, Block & block, size_t block_column_pos, bool ignore_pk_if_absent, bool force_decode) +{ + // We consider a missing column could be safely filled with NULL, unless it has no default value and is NOT NULL. + // This could save lots of unnecessary schema syncs for old data with a newer schema that has newly added columns. + + if (column_info.hasPriKeyFlag()) + { + // For clustered index or pk_is_handle, if the pk column does not exist, it can still be decoded from the key + if (ignore_pk_if_absent) + return true; + + assert(!ignore_pk_if_absent); + if (!force_decode) + return false; + // Else non-clustered index, and not pk_is_handle, it could be a row encoded by older schema, + // we need to fill the column which has primary key flag with default value.
+ // fallthrough to fill default value when force_decode + } + + if (column_info.hasNoDefaultValueFlag() && column_info.hasNotNullFlag()) + { + if (!force_decode) + return false; + // Else the row does not contain this "not null" / "no default value" column, + // it could be a row encoded by older schema. + // fallthrough to fill default value when force_decode + } + // not null or has no default value, tidb will fill with specific value. + auto * raw_column = const_cast((block.getByPosition(block_column_pos)).column.get()); + raw_column->insert(column_info.defaultValueToField()); + return true; +} + +template +bool appendRowV2ToBlockImpl( + const TiKVValue::Base & raw_value, + SortedColumnIDWithPosConstIter column_ids_iter, + SortedColumnIDWithPosConstIter column_ids_iter_end, + Block & block, + size_t block_column_pos, + const ColumnInfos & column_infos, + ColumnID pk_handle_id, + bool ignore_pk_if_absent, + bool force_decode) +{ + size_t cursor = 2; // Skip the initial codec ver and row flag. + size_t num_not_null_columns = decodeUInt(cursor, raw_value); + size_t num_null_columns = decodeUInt(cursor, raw_value); + std::vector not_null_column_ids; + std::vector null_column_ids; + std::vector value_offsets; + decodeUInts::ColumnIDType>(cursor, raw_value, num_not_null_columns, not_null_column_ids); + decodeUInts::ColumnIDType>(cursor, raw_value, num_null_columns, null_column_ids); + decodeUInts::ValueOffsetType>(cursor, raw_value, num_not_null_columns, value_offsets); + size_t values_start_pos = cursor; + size_t idx_not_null = 0; + size_t idx_null = 0; + // Merge ordered not null/null columns to keep order. 
+ while (idx_not_null < not_null_column_ids.size() || idx_null < null_column_ids.size()) + { + if (column_ids_iter == column_ids_iter_end) + { + // extra column + return force_decode; + } + + bool is_null; + if (idx_not_null < not_null_column_ids.size() && idx_null < null_column_ids.size()) + is_null = not_null_column_ids[idx_not_null] > null_column_ids[idx_null]; + else + is_null = idx_null < null_column_ids.size(); + + auto next_datum_column_id = is_null ? null_column_ids[idx_null] : not_null_column_ids[idx_not_null]; + const auto next_column_id = column_ids_iter->first; + if (next_column_id > next_datum_column_id) + { + // The next column id to read is bigger than the column id of next datum in encoded row. + // It means this is the datum of extra column. May happen when reading after dropping + // a column. + if (!force_decode) + return false; + // Ignore the extra column and continue to parse other datum + if (is_null) + idx_null++; + else + idx_not_null++; + } + else if (next_column_id < next_datum_column_id) + { + // The next column id to read is less than the column id of next datum in encoded row. + // It means this is the datum of missing column. May happen when reading after adding + // a column. + // Fill with default value and continue to read data for next column id. + const auto & column_info = column_infos[column_ids_iter->second]; + if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, ignore_pk_if_absent, force_decode)) + return false; + column_ids_iter++; + block_column_pos++; + } + else + { + // If pk_handle_id is a valid column id, then it means the table's pk_is_handle is true + // we can just ignore the pk value encoded in value part + if (unlikely(next_column_id == pk_handle_id)) + { + column_ids_iter++; + block_column_pos++; + if (is_null) + { + idx_null++; + } + else + { + idx_not_null++; + } + continue; + } + + // Parse the datum. 
+ auto * raw_column = const_cast((block.getByPosition(block_column_pos)).column.get()); + const auto & column_info = column_infos[column_ids_iter->second]; + if (is_null) + { + if (!raw_column->isColumnNullable()) + { + if (!force_decode) + { + return false; + } + else + { + throw Exception("Detected invalid null when decoding data of column " + column_info.name + " with column type " + raw_column->getName(), + ErrorCodes::LOGICAL_ERROR); + } + } + // ColumnNullable::insertDefault just insert a null value + raw_column->insertDefault(); + idx_null++; + } + else + { + size_t start = idx_not_null ? value_offsets[idx_not_null - 1] : 0; + size_t length = value_offsets[idx_not_null] - start; + if (!raw_column->decodeTiDBRowV2Datum(values_start_pos + start, raw_value, length, force_decode)) + return false; + idx_not_null++; + } + column_ids_iter++; + block_column_pos++; + } + } + while (column_ids_iter != column_ids_iter_end) + { + if (column_ids_iter->first != pk_handle_id) + { + const auto & column_info = column_infos[column_ids_iter->second]; + if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, ignore_pk_if_absent, force_decode)) + return false; + } + column_ids_iter++; + block_column_pos++; + } + return true; +} + +using TiDB::DatumFlat; +bool appendRowV1ToBlock( + const TiKVValue::Base & raw_value, + SortedColumnIDWithPosConstIter column_ids_iter, + SortedColumnIDWithPosConstIter column_ids_iter_end, + Block & block, + size_t block_column_pos, + const ColumnInfos & column_infos, + ColumnID pk_handle_id, + bool ignore_pk_if_absent, + bool force_decode) +{ + size_t cursor = 0; + std::map decoded_fields; + while (cursor < raw_value.size()) + { + Field f = DecodeDatum(cursor, raw_value); + if (f.isNull()) + break; + ColumnID col_id = f.get(); + decoded_fields.emplace(col_id, DecodeDatum(cursor, raw_value)); + } + if (cursor != raw_value.size()) + throw Exception(std::string(__PRETTY_FUNCTION__) + ": cursor is not end, remaining: " + 
raw_value.substr(cursor), + ErrorCodes::LOGICAL_ERROR); + + auto decoded_field_iter = decoded_fields.begin(); + while (decoded_field_iter != decoded_fields.end()) + { + if (column_ids_iter == column_ids_iter_end) + { + // extra column + return force_decode; + } + + auto next_field_column_id = decoded_field_iter->first; + if (column_ids_iter->first > next_field_column_id) + { + // extra column + if (!force_decode) + return false; + decoded_field_iter++; + } + else if (column_ids_iter->first < next_field_column_id) + { + const auto & column_info = column_infos[column_ids_iter->second]; + if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, ignore_pk_if_absent, force_decode)) + return false; + column_ids_iter++; + block_column_pos++; + } + else + { + // if pk_handle_id is a valid column id, then it means the table's pk_is_handle is true + // we can just ignore the pk value encoded in value part + if (unlikely(column_ids_iter->first == pk_handle_id)) + { + decoded_field_iter++; + column_ids_iter++; + block_column_pos++; + continue; + } + + auto * raw_column = const_cast((block.getByPosition(block_column_pos)).column.get()); + const auto & column_info = column_infos[column_ids_iter->second]; + DatumFlat datum(decoded_field_iter->second, column_info.tp); + const Field & unflattened = datum.field(); + if (datum.overflow(column_info)) + { + // Overflow detected, fatal if force_decode is true, + // as schema being newer and narrow shouldn't happen. + // Otherwise return false to outer, outer should sync schema and try again. 
+ if (force_decode) + { + throw Exception("Detected overflow when decoding data " + std::to_string(unflattened.get()) + " of column " + + column_info.name + " with column " + raw_column->getName(), + ErrorCodes::LOGICAL_ERROR); + } + + return false; + } + if (datum.invalidNull(column_info)) + { + // Null value with non-null type detected, fatal if force_decode is true, + // as schema being newer and with invalid null shouldn't happen. + // Otherwise return false to outer, outer should sync schema and try again. + if (force_decode) + { + throw Exception("Detected invalid null when decoding data " + std::to_string(unflattened.get()) + + " of column " + column_info.name + " with type " + raw_column->getName(), + ErrorCodes::LOGICAL_ERROR); + } + + return false; + } + raw_column->insert(unflattened); + decoded_field_iter++; + column_ids_iter++; + block_column_pos++; + } + } + while (column_ids_iter != column_ids_iter_end) + { + if (column_ids_iter->first != pk_handle_id) + { + const auto & column_info = column_infos[column_ids_iter->second]; + if (!addDefaultValueToColumnIfPossible(column_info, block, block_column_pos, ignore_pk_if_absent, force_decode)) + return false; + } + column_ids_iter++; + block_column_pos++; + } + return true; +} + +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) } // namespace DB diff --git a/dbms/src/Storages/Transaction/RowCodec.h b/dbms/src/Storages/Transaction/RowCodec.h index 13df72dcd2b..e2f0398c060 100644 --- a/dbms/src/Storages/Transaction/RowCodec.h +++ b/dbms/src/Storages/Transaction/RowCodec.h @@ -8,6 +8,7 @@ namespace DB { +<<<<<<< HEAD using TiDB::ColumnInfo; using TiDB::TableInfo; @@ -31,5 +32,20 @@ Field decodeUnknownColumnV2(const Field & unknown, const ColumnInfo & column_inf /// The following two encode functions are used for testing. 
void encodeRowV1(const TiDB::TableInfo & table_info, const std::vector & fields, std::stringstream & ss); void encodeRowV2(const TiDB::TableInfo & table_info, const std::vector & fields, std::stringstream & ss); +======= +/// The following two encode functions are used for testing. +void encodeRowV1(const TiDB::TableInfo & table_info, const std::vector & fields, WriteBuffer & ss); +void encodeRowV2(const TiDB::TableInfo & table_info, const std::vector & fields, WriteBuffer & ss); + +bool appendRowToBlock( + const TiKVValue::Base & raw_value, + SortedColumnIDWithPosConstIter column_ids_iter, + SortedColumnIDWithPosConstIter column_ids_iter_end, + Block & block, + size_t block_column_pos, + const DecodingStorageSchemaSnapshotConstPtr & schema_snapshot, + bool force_decode); + +>>>>>>> 8e411ae86b (Fix decode error when "NULL" value in the column with "primary key" flag (#5879)) } // namespace DB diff --git a/dbms/src/Storages/Transaction/tests/RowCodecTestUtils.h b/dbms/src/Storages/Transaction/tests/RowCodecTestUtils.h new file mode 100644 index 00000000000..8d7034de6b9 --- /dev/null +++ b/dbms/src/Storages/Transaction/tests/RowCodecTestUtils.h @@ -0,0 +1,359 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once +#include +#include +#include +#include +#include + +namespace DB::tests +{ +using DM::ColumnDefine; +using DM::ColumnDefines; +using TiDB::ColumnInfo; +using TiDB::TableInfo; +using ColumnIDs = std::vector; + +template +struct ColumnTP +{ +}; +template <> +struct ColumnTP +{ + static const auto tp = TiDB::TypeTiny; +}; +template <> +struct ColumnTP +{ + static const auto tp = TiDB::TypeShort; +}; +template <> +struct ColumnTP +{ + static const auto tp = TiDB::TypeLong; +}; +template <> +struct ColumnTP +{ + static const auto tp = TiDB::TypeLongLong; +}; +template <> +struct ColumnTP +{ + static const auto tp = TiDB::TypeTiny; +}; +template <> +struct ColumnTP +{ + static const auto tp = TiDB::TypeShort; +}; +template <> +struct ColumnTP +{ + static const auto tp = TiDB::TypeLong; +}; +template <> +struct ColumnTP +{ + static const auto tp = TiDB::TypeLongLong; +}; +template <> +struct ColumnTP +{ + static const auto tp = TiDB::TypeFloat; +}; +template <> +struct ColumnTP +{ + static const auto tp = TiDB::TypeDouble; +}; +template <> +struct ColumnTP +{ + static const auto tp = TiDB::TypeString; +}; +template <> +struct ColumnTP> +{ + static const auto tp = TiDB::TypeNewDecimal; +}; +template <> +struct ColumnTP> +{ + static const auto tp = TiDB::TypeNewDecimal; +}; +template <> +struct ColumnTP> +{ + static const auto tp = TiDB::TypeNewDecimal; +}; +template <> +struct ColumnTP> +{ + static const auto tp = TiDB::TypeNewDecimal; +}; + +inline String getTestColumnName(ColumnID id) +{ + return "column" + std::to_string(id); +} + +template +ColumnInfo getColumnInfo(ColumnID id) +{ + ColumnInfo column_info; + column_info.id = id; + column_info.tp = ColumnTP::tp; + column_info.name = getTestColumnName(id); + if constexpr (std::is_unsigned_v) + column_info.setUnsignedFlag(); + if constexpr (!nullable) + column_info.setNotNullFlag(); + return column_info; +} + +template +struct ColumnIDValue +{ + static constexpr bool value_is_null = is_null; + using 
ValueType = std::decay_t; + ColumnIDValue(ColumnID id_, const T & value_) + : id(id_) + , value(value_) + {} + ColumnIDValue(ColumnID id_, T && value_) + : id(id_) + , value(std::move(value_)) + {} + ColumnID id; + ValueType value; +}; + +template +struct ColumnIDValue +{ + static constexpr bool value_is_null = true; + using ValueType = std::decay_t; + explicit ColumnIDValue(ColumnID id_) + : id(id_) + {} + ColumnID id; +}; + +template +using ColumnIDValueNull = ColumnIDValue; + +using OrderedColumnInfoFields = std::map>; + +template +constexpr bool IsDecimalFieldType = false; +template <> +inline constexpr bool IsDecimalFieldType> = true; +template <> +inline constexpr bool IsDecimalFieldType> = true; +template <> +inline constexpr bool IsDecimalFieldType> = true; +template <> +inline constexpr bool IsDecimalFieldType> = true; + +template +void getTableInfoFieldsInternal(OrderedColumnInfoFields & column_info_fields, Type && column_id_value) +{ + using DecayType = std::decay_t; + using ValueType = typename DecayType::ValueType; + using NearestType = typename NearestFieldType::Type; + if constexpr (DecayType::value_is_null) + { + ColumnInfo column_info = getColumnInfo(column_id_value.id); + // create non zero flen and decimal to avoid error when creating decimal type + if constexpr (IsDecimalFieldType) + { + column_info.flen = 1; + column_info.decimal = 1; + } + column_info_fields.emplace(column_id_value.id, std::make_tuple(column_info, Field())); + } + else + { + if constexpr (IsDecimalFieldType) + { + ColumnInfo column_info = getColumnInfo(column_id_value.id); + auto field = static_cast(std::move(column_id_value.value)); + column_info.flen = field.getPrec(); + column_info.decimal = field.getScale(); + column_info_fields.emplace(column_id_value.id, std::make_tuple(column_info, field)); + } + else + { + column_info_fields.emplace(column_id_value.id, + std::make_tuple(getColumnInfo(column_id_value.id), + static_cast(std::move(column_id_value.value)))); + } + } +} + 
+template +void getTableInfoFieldsInternal(OrderedColumnInfoFields & column_info_fields, Type && first, Rest &&... rest) +{ + getTableInfoFieldsInternal(column_info_fields, first); + getTableInfoFieldsInternal(column_info_fields, std::forward(rest)...); +} + +template +std::pair> getTableInfoAndFields(ColumnIDs pk_col_ids, bool is_common_handle, Types &&... column_value_ids) +{ + OrderedColumnInfoFields column_info_fields; + getTableInfoFieldsInternal(column_info_fields, std::forward(column_value_ids)...); + TableInfo table_info; + std::vector fields; + bool pk_is_handle = pk_col_ids.size() == 1 && pk_col_ids[0] != ::DB::TiDBPkColumnID; + + for (auto & column_info_field : column_info_fields) + { + auto & column = std::get<0>(column_info_field.second); + auto & field = std::get<1>(column_info_field.second); + if (std::find(pk_col_ids.begin(), pk_col_ids.end(), column.id) != pk_col_ids.end()) + { + column.setPriKeyFlag(); + if (column.tp != TiDB::TypeLong && column.tp != TiDB::TypeTiny && column.tp != TiDB::TypeLongLong && column.tp != TiDB::TypeShort && column.tp != TiDB::TypeInt24) + { + pk_is_handle = false; + } + } + table_info.columns.emplace_back(std::move(column)); + fields.emplace_back(std::move(field)); + } + + table_info.pk_is_handle = pk_is_handle; + table_info.is_common_handle = is_common_handle; + if (is_common_handle) + { + table_info.is_common_handle = true; + // TiFlash maintains the column name of primary key + // for common handle table + TiDB::IndexInfo pk_index_info; + pk_index_info.is_primary = true; + pk_index_info.idx_name = "PRIMARY"; + pk_index_info.is_unique = true; + for (auto pk_col_id : pk_col_ids) + { + TiDB::IndexColumnInfo index_column_info; + for (auto & column : table_info.columns) + { + if (column.id == pk_col_id) + { + index_column_info.name = column.name; + break; + } + } + pk_index_info.idx_cols.emplace_back(index_column_info); + } + table_info.index_infos.emplace_back(pk_index_info); + } + + return 
std::make_pair(std::move(table_info), std::move(fields)); +} + +inline DecodingStorageSchemaSnapshotConstPtr getDecodingStorageSchemaSnapshot(const TableInfo & table_info) +{ + ColumnDefines store_columns; + if (table_info.is_common_handle) + { + DM::ColumnDefine extra_handle_column{EXTRA_HANDLE_COLUMN_ID, EXTRA_HANDLE_COLUMN_NAME, EXTRA_HANDLE_COLUMN_STRING_TYPE}; + store_columns.emplace_back(extra_handle_column); + } + else + { + DM::ColumnDefine extra_handle_column{EXTRA_HANDLE_COLUMN_ID, EXTRA_HANDLE_COLUMN_NAME, EXTRA_HANDLE_COLUMN_INT_TYPE}; + store_columns.emplace_back(extra_handle_column); + } + store_columns.emplace_back(VERSION_COLUMN_ID, VERSION_COLUMN_NAME, VERSION_COLUMN_TYPE); + store_columns.emplace_back(TAG_COLUMN_ID, TAG_COLUMN_NAME, TAG_COLUMN_TYPE); + ColumnID handle_id = EXTRA_HANDLE_COLUMN_ID; + for (const auto & column_info : table_info.columns) + { + if (table_info.pk_is_handle) + { + if (column_info.hasPriKeyFlag()) + handle_id = column_info.id; + } + store_columns.emplace_back(column_info.id, column_info.name, DB::getDataTypeByColumnInfo(column_info)); + } + + if (handle_id != EXTRA_HANDLE_COLUMN_ID) + { + auto iter = std::find_if(store_columns.begin(), store_columns.end(), [&](const ColumnDefine & cd) { return cd.id == handle_id; }); + return std::make_shared(std::make_shared(store_columns), table_info, *iter, /* decoding_schema_version_ */ 1); + } + else + { + return std::make_shared(std::make_shared(store_columns), table_info, store_columns[0], /* decoding_schema_version_ */ 1); + } +} + +template +size_t valueStartPos(const TableInfo & table_info) +{ + return 1 + 1 + 2 + 2 + (is_big ? 
8 : 3) * table_info.columns.size(); +} + +inline Block decodeRowToBlock(const String & row_value, DecodingStorageSchemaSnapshotConstPtr decoding_schema) +{ + const auto & sorted_column_id_with_pos = decoding_schema->sorted_column_id_with_pos; + auto iter = sorted_column_id_with_pos.begin(); + const size_t value_column_num = 3; + // skip the first three columns, which are EXTRA_HANDLE_COLUMN, VERSION_COLUMN, TAG_COLUMN + for (size_t i = 0; i < value_column_num; i++) + iter++; + + Block block = createBlockSortByColumnID(decoding_schema); + appendRowToBlock(row_value, iter, sorted_column_id_with_pos.end(), block, value_column_num, decoding_schema, true); + + // remove the first three columns + for (size_t i = 0; i < value_column_num; i++) + block.erase(0); + return block; +} + +template +std::tuple getValueLengthByRowV2(const T & v) +{ + using NearestType = typename NearestFieldType::Type; + auto [table_info, fields] = getTableInfoAndFields({EXTRA_HANDLE_COLUMN_ID}, false, ColumnIDValue(1, v)); + auto decoding_schema = getDecodingStorageSchemaSnapshot(table_info); + WriteBufferFromOwnString ss; + encodeRowV2(table_info, fields, ss); + auto encoded = ss.str(); + Block block = decodeRowToBlock(encoded, decoding_schema); + return std::make_tuple(static_cast(std::move((*block.getByPosition(0).column)[0].template safeGet())), + encoded.size() - valueStartPos(table_info)); +} + +template +T getValueByRowV1(const T & v) +{ + using NearestType = typename NearestFieldType::Type; + auto [table_info, fields] = getTableInfoAndFields({EXTRA_HANDLE_COLUMN_ID}, false, ColumnIDValue(1, v)); + auto decoding_schema = getDecodingStorageSchemaSnapshot(table_info); + WriteBufferFromOwnString ss; + encodeRowV1(table_info, fields, ss); + auto encoded = ss.str(); + Block block = decodeRowToBlock(encoded, decoding_schema); + return static_cast(std::move((*block.getByPosition(0).column)[0].template safeGet())); +} + +} // namespace DB::tests diff --git
a/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp b/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp new file mode 100644 index 00000000000..7d90ac520f3 --- /dev/null +++ b/dbms/src/Storages/Transaction/tests/gtest_kvstore.cpp @@ -0,0 +1,1500 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace DB +{ +namespace RegionBench +{ +extern void setupPutRequest(raft_cmdpb::Request *, const std::string &, const TiKVKey &, const TiKVValue &); +extern void setupDelRequest(raft_cmdpb::Request *, const std::string &, const TiKVKey &); +} // namespace RegionBench + +extern void RemoveRegionCommitCache(const RegionPtr & region, const RegionDataReadInfoList & data_list_read, bool lock_region = true); +extern void CheckRegionForMergeCmd(const raft_cmdpb::AdminResponse & response, const RegionState & region_state); +extern void ChangeRegionStateRange(RegionState & region_state, bool source_at_left, const RegionState & source_region_state); + +namespace tests +{ + +// TODO: Use another way to workaround calling the private methods on KVStore +class RegionKVStoreTest : public ::testing::Test +{ +public: + RegionKVStoreTest() + { + test_path = TiFlashTestEnv::getTemporaryPath("/region_kvs_test"); + } + + static void SetUpTestCase() {} + + void SetUp() override + { + // clean data 
and create path pool instance + path_pool = createCleanPathPool(test_path); + + reloadKVSFromDisk(); + + proxy_instance = std::make_unique(); + proxy_helper = std::make_unique(MockRaftStoreProxy::SetRaftStoreProxyFFIHelper( + RaftStoreProxyPtr{proxy_instance.get()})); + proxy_instance->init(100); + + kvstore->restore(*path_pool, proxy_helper.get()); + } + + void TearDown() override {} + +protected: + KVStore & getKVS() { return *kvstore; } + KVStore & reloadKVSFromDisk() + { + kvstore.reset(); + auto & global_ctx = TiFlashTestEnv::getGlobalContext(); + kvstore = std::make_unique(global_ctx, TiDB::SnapshotApplyMethod::DTFile_Directory); + // only recreate kvstore and restore data from disk, don't recreate proxy instance + kvstore->restore(*path_pool, proxy_helper.get()); + return *kvstore; + } + +protected: + static void testRaftSplit(KVStore & kvs, TMTContext & tmt); + static void testRaftMerge(KVStore & kvs, TMTContext & tmt); + static void testRaftChangePeer(KVStore & kvs, TMTContext & tmt); + static void testRaftMergeRollback(KVStore & kvs, TMTContext & tmt); + + static std::unique_ptr createCleanPathPool(const String & path) + { + // Drop files on disk + Poco::File file(path); + if (file.exists()) + file.remove(true); + file.createDirectories(); + + auto & global_ctx = TiFlashTestEnv::getGlobalContext(); + auto path_capacity = global_ctx.getPathCapacity(); + auto provider = global_ctx.getFileProvider(); + // Create a PathPool instance on the clean directory + Strings main_data_paths{path}; + return std::make_unique(main_data_paths, main_data_paths, Strings{}, path_capacity, provider); + } + + std::string test_path; + + std::unique_ptr path_pool; + std::unique_ptr kvstore; + + std::unique_ptr proxy_instance; + std::unique_ptr proxy_helper; +}; + +TEST_F(RegionKVStoreTest, NewProxy) +{ + auto ctx = TiFlashTestEnv::getGlobalContext(); + + KVStore & kvs = getKVS(); + { + auto store = metapb::Store{}; + store.set_id(1234); + kvs.setStore(store); + 
ASSERT_EQ(kvs.getStoreID(), store.id()); + } + { + ASSERT_EQ(kvs.getRegion(0), nullptr); + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + { + auto region = makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10)); + lock.regions.emplace(1, region); + lock.index.add(region); + } + } + { + kvs.tryPersist(1); + kvs.gcRegionPersistedCache(Seconds{0}); + } + { + // test CompactLog + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + auto region = kvs.getRegion(1); + region->markCompactLog(); + kvs.setRegionCompactLogConfig(100000, 1000, 1000); + request.mutable_compact_log(); + request.set_cmd_type(::raft_cmdpb::AdminCmdType::CompactLog); + // CompactLog always returns true now, even if we can't do a flush. + // We use a tryFlushData to pre-filter. + ASSERT_EQ(kvs.handleAdminRaftCmd(std::move(request), std::move(response), 1, 5, 1, ctx.getTMTContext()), EngineStoreApplyRes::Persist); + + // Filter + ASSERT_EQ(kvs.tryFlushRegionData(1, false, ctx.getTMTContext(), 0, 0), false); + } +} + +TEST_F(RegionKVStoreTest, ReadIndex) +{ + auto ctx = TiFlashTestEnv::getGlobalContext(); + + // start mock proxy in other thread + std::atomic_bool over{false}; + auto proxy_runner = std::thread([&]() { + proxy_instance->testRunNormal(over); + }); + KVStore & kvs = getKVS(); + ASSERT_EQ(kvs.getProxyHelper(), proxy_helper.get()); + + { + ASSERT_EQ(kvs.getRegion(0), nullptr); + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + { + auto region = makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10), kvs.getProxyHelper()); + lock.regions.emplace(1, region); + lock.index.add(region); + } + { + auto region = makeRegion(2, RecordKVFormat::genKey(1, 10), RecordKVFormat::genKey(1, 20), kvs.getProxyHelper()); + lock.regions.emplace(2, region); + lock.index.add(region); + } + { + auto region = makeRegion(3, RecordKVFormat::genKey(1, 30), RecordKVFormat::genKey(1, 
40), kvs.getProxyHelper()); + lock.regions.emplace(3, region); + lock.index.add(region); + } + } + { + ASSERT_EQ(kvs.read_index_worker_manager, nullptr); + { + auto region = kvs.getRegion(1); + auto req = GenRegionReadIndexReq(*region, 8); + try + { + auto resp = kvs.batchReadIndex({req}, 100); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "`fn_handle_batch_read_index` is deprecated"); + } + } + kvs.initReadIndexWorkers( + []() { + return std::chrono::milliseconds(10); + }, + 1); + ASSERT_NE(kvs.read_index_worker_manager, nullptr); + + { + kvs.asyncRunReadIndexWorkers(); + SCOPE_EXIT({ + kvs.stopReadIndexWorkers(); + }); + + auto tar_region_id = 9; + { + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + + auto region = makeRegion(tar_region_id, RecordKVFormat::genKey(2, 0), RecordKVFormat::genKey(2, 10)); + lock.regions.emplace(region->id(), region); + lock.index.add(region); + } + { + ASSERT_EQ(proxy_instance->regions.at(tar_region_id)->getLatestCommitIndex(), 5); + proxy_instance->regions.at(tar_region_id)->updateCommitIndex(66); + } + + AsyncWaker::Notifier notifier; + const std::atomic_size_t terminate_signals_counter{}; + std::thread t([&]() { + notifier.wake(); + WaitCheckRegionReady(ctx.getTMTContext(), kvs, terminate_signals_counter, 1 / 1000.0, 20, 20 * 60); + }); + SCOPE_EXIT({ + t.join(); + kvs.handleDestroy(tar_region_id, ctx.getTMTContext()); + }); + ASSERT_EQ(notifier.blockedWaitFor(std::chrono::milliseconds(1000 * 3600)), AsyncNotifier::Status::Normal); + std::this_thread::sleep_for(std::chrono::milliseconds(2)); + auto tar = kvs.getRegion(tar_region_id); + ASSERT_EQ( + tar->handleWriteRaftCmd({}, 66, 6, ctx.getTMTContext()), + EngineStoreApplyRes::None); + } + { + kvs.asyncRunReadIndexWorkers(); + SCOPE_EXIT({ + kvs.stopReadIndexWorkers(); + }); + + auto tar_region_id = 9; + { + ASSERT_EQ(proxy_instance->regions.at(tar_region_id)->getLatestCommitIndex(), 66); + 
proxy_instance->unsafeInvokeForTest([&](MockRaftStoreProxy & p) { + p.region_id_to_error.emplace(tar_region_id); + p.regions.at(2)->updateCommitIndex(6); + }); + } + + AsyncWaker::Notifier notifier; + const std::atomic_size_t terminate_signals_counter{}; + std::thread t([&]() { + notifier.wake(); + WaitCheckRegionReady(ctx.getTMTContext(), kvs, terminate_signals_counter, 1 / 1000.0, 2 / 1000.0, 5 / 1000.0); + }); + SCOPE_EXIT({ + t.join(); + }); + ASSERT_EQ(notifier.blockedWaitFor(std::chrono::milliseconds(1000 * 3600)), AsyncNotifier::Status::Normal); + } + + kvs.asyncRunReadIndexWorkers(); + SCOPE_EXIT({ + kvs.stopReadIndexWorkers(); + }); + + { + // test read index + auto region = kvs.getRegion(1); + auto req = GenRegionReadIndexReq(*region, 8); + auto resp = kvs.batchReadIndex({req}, 100); + ASSERT_EQ(resp[0].first.read_index(), 5); + { + auto r = region->waitIndex(5, 0, []() { return true; }); + ASSERT_EQ(std::get<0>(r), WaitIndexResult::Finished); + } + { + auto r = region->waitIndex(8, 1, []() { return false; }); + ASSERT_EQ(std::get<0>(r), WaitIndexResult::Terminated); + } + } + for (auto & r : proxy_instance->regions) + { + r.second->updateCommitIndex(667); + } + { + auto region = kvs.getRegion(1); + auto req = GenRegionReadIndexReq(*region, 8); + auto resp = kvs.batchReadIndex({req}, 100); + ASSERT_EQ(resp[0].first.read_index(), 5); // history + } + { + auto region = kvs.getRegion(1); + auto req = GenRegionReadIndexReq(*region, 10); + auto resp = kvs.batchReadIndex({req}, 100); + ASSERT_EQ(resp[0].first.read_index(), 667); + } + { + auto region = kvs.getRegion(2); + auto req = GenRegionReadIndexReq(*region, 5); + auto resp = proxy_helper->batchReadIndex({req}, 100); // v2 + ASSERT_EQ(resp[0].first.read_index(), 667); // got latest + { + auto r = region->waitIndex(667 + 1, 2, []() { return true; }); + ASSERT_EQ(std::get<0>(r), WaitIndexResult::Timeout); + } + { + AsyncWaker::Notifier notifier; + std::thread t([&]() { + notifier.wake(); + auto r = 
region->waitIndex(667 + 1, 100000, []() { return true; }); + ASSERT_EQ(std::get<0>(r), WaitIndexResult::Finished); + }); + SCOPE_EXIT({ + t.join(); + }); + ASSERT_EQ(notifier.blockedWaitFor(std::chrono::milliseconds(1000 * 3600)), AsyncNotifier::Status::Normal); + std::this_thread::sleep_for(std::chrono::milliseconds(2)); + region->handleWriteRaftCmd({}, 667 + 1, 6, ctx.getTMTContext()); + } + } + } + kvs.stopReadIndexWorkers(); + kvs.releaseReadIndexWorkers(); + over = true; + proxy_instance->wake(); + proxy_runner.join(); + ASSERT(GCMonitor::instance().checkClean()); + ASSERT(!GCMonitor::instance().empty()); +} + +void RegionKVStoreTest::testRaftMergeRollback(KVStore & kvs, TMTContext & tmt) +{ + uint64_t region_id = 7; + { + auto source_region = kvs.getRegion(region_id); + auto target_region = kvs.getRegion(1); + + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + { + request.set_cmd_type(raft_cmdpb::AdminCmdType::PrepareMerge); + auto * prepare_merge = request.mutable_prepare_merge(); + { + auto min_index = source_region->appliedIndex(); + prepare_merge->set_min_index(min_index); + + metapb::Region * target = prepare_merge->mutable_target(); + *target = target_region->getMetaRegion(); + } + } + kvs.handleAdminRaftCmd(std::move(request), + std::move(response), + region_id, + 31, + 6, + tmt); + ASSERT_TRUE(source_region->isMerging()); + } + { + auto region = kvs.getRegion(region_id); + + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + { + request.set_cmd_type(raft_cmdpb::AdminCmdType::RollbackMerge); + + auto * rollback_merge = request.mutable_rollback_merge(); + { + auto merge_state = region->getMergeState(); + rollback_merge->set_commit(merge_state.commit()); + } + } + region->setStateApplying(); + + try + { + raft_cmdpb::AdminRequest first_request = request; + raft_cmdpb::AdminResponse first_response = response; + kvs.handleAdminRaftCmd(std::move(first_request), + std::move(first_response), + region_id, + 32, + 
6, + tmt); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "execRollbackMerge: region state is Applying, expect Merging"); + } + ASSERT_EQ(region->peerState(), raft_serverpb::PeerState::Applying); + region->setPeerState(raft_serverpb::PeerState::Merging); + + region->meta.region_state.getMutMergeState().set_commit(1234); + try + { + kvs.handleAdminRaftCmd(std::move(request), + std::move(response), + region_id, + 32, + 6, + tmt); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "execRollbackMerge: merge commit index is 1234, expect 31"); + } + region->meta.region_state.getMutMergeState().set_commit(31); + } + { + auto region = kvs.getRegion(region_id); + + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + { + request.set_cmd_type(raft_cmdpb::AdminCmdType::RollbackMerge); + + auto * rollback_merge = request.mutable_rollback_merge(); + { + auto merge_state = region->getMergeState(); + rollback_merge->set_commit(merge_state.commit()); + } + } + kvs.handleAdminRaftCmd(std::move(request), + std::move(response), + region_id, + 32, + 6, + tmt); + ASSERT_EQ(region->peerState(), raft_serverpb::PeerState::Normal); + } +} + +void RegionKVStoreTest::testRaftSplit(KVStore & kvs, TMTContext & tmt) +{ + { + auto region = kvs.getRegion(1); + auto table_id = 1; + region->insert("lock", RecordKVFormat::genKey(table_id, 3), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); + region->insert("default", RecordKVFormat::genKey(table_id, 3, 5), TiKVValue("value1")); + region->insert("write", RecordKVFormat::genKey(table_id, 3, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + region->insert("lock", RecordKVFormat::genKey(table_id, 8), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); + region->insert("default", RecordKVFormat::genKey(table_id, 8, 5), TiKVValue("value1")); + region->insert("write", 
RecordKVFormat::genKey(table_id, 8, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + + ASSERT_EQ(region->dataInfo(), "[write 2 lock 2 default 2 ]"); + } + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + { + // split region + auto region_id = 1; + RegionID region_id2 = 7; + auto source_region = kvs.getRegion(region_id); + metapb::RegionEpoch new_epoch; + new_epoch.set_version(source_region->version() + 1); + new_epoch.set_conf_ver(source_region->confVer()); + TiKVKey start_key1, start_key2, end_key1, end_key2; + { + start_key1 = RecordKVFormat::genKey(1, 5); + start_key2 = RecordKVFormat::genKey(1, 0); + end_key1 = RecordKVFormat::genKey(1, 10); + end_key2 = RecordKVFormat::genKey(1, 5); + } + { + request.set_cmd_type(raft_cmdpb::AdminCmdType::BatchSplit); + raft_cmdpb::BatchSplitResponse * splits = response.mutable_splits(); + { + auto * region = splits->add_regions(); + region->set_id(region_id); + region->set_start_key(start_key1); + region->set_end_key(end_key1); + region->add_peers(); + *region->mutable_region_epoch() = new_epoch; + } + { + auto * region = splits->add_regions(); + region->set_id(region_id2); + region->set_start_key(start_key2); + region->set_end_key(end_key2); + region->add_peers(); + *region->mutable_region_epoch() = new_epoch; + } + } + } + kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest(request), raft_cmdpb::AdminResponse(response), 1, 20, 5, tmt); + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 5))); + ASSERT_TRUE(mmp.count(7) != 0); + ASSERT_EQ(mmp.size(), 1); + } + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 5), RecordKVFormat::genKey(1, 10))); + ASSERT_TRUE(mmp.count(1) != 0); + ASSERT_EQ(mmp.size(), 1); + } + { + ASSERT_EQ(kvs.getRegion(1)->dataInfo(), "[write 1 lock 1 default 1 ]"); + ASSERT_EQ(kvs.getRegion(7)->dataInfo(), 
"[lock 1 ]"); + } + // rollback 1 to before split + // 7 is persisted + { + kvs.handleDestroy(1, tmt); + { + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + auto region = makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10)); + lock.regions.emplace(1, region); + lock.index.add(region); + } + auto table_id = 1; + auto region = kvs.getRegion(1); + region->insert("lock", RecordKVFormat::genKey(table_id, 3), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); + region->insert("default", RecordKVFormat::genKey(table_id, 3, 5), TiKVValue("value1")); + region->insert("write", RecordKVFormat::genKey(table_id, 3, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + region->insert("lock", RecordKVFormat::genKey(table_id, 8), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); + region->insert("default", RecordKVFormat::genKey(table_id, 8, 5), TiKVValue("value1")); + region->insert("write", RecordKVFormat::genKey(table_id, 8, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + + ASSERT_EQ(region->dataInfo(), "[write 2 lock 2 default 2 ]"); + } + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 5))); + ASSERT_TRUE(mmp.count(7) != 0); + ASSERT_TRUE(mmp.count(1) != 0); + ASSERT_EQ(mmp.size(), 2); + } + // split again + kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest(request), raft_cmdpb::AdminResponse(response), 1, 20, 5, tmt); + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 5))); + ASSERT_TRUE(mmp.count(7) != 0); + ASSERT_EQ(mmp.size(), 1); + } + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 5), RecordKVFormat::genKey(1, 10))); + 
ASSERT_TRUE(mmp.count(1) != 0); + ASSERT_EQ(mmp.size(), 1); + } + { + ASSERT_EQ(kvs.getRegion(1)->dataInfo(), "[write 1 lock 1 default 1 ]"); + ASSERT_EQ(kvs.getRegion(7)->dataInfo(), "[lock 1 ]"); + } +} + +void RegionKVStoreTest::testRaftChangePeer(KVStore & kvs, TMTContext & tmt) +{ + { + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + auto region = makeRegion(88, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 100)); + lock.regions.emplace(88, region); + lock.index.add(region); + } + { + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + request.set_cmd_type(raft_cmdpb::AdminCmdType::ChangePeer); + auto meta = kvs.getRegion(88)->getMetaRegion(); + meta.mutable_peers()->Clear(); + meta.add_peers()->set_id(2); + meta.add_peers()->set_id(4); + *response.mutable_change_peer()->mutable_region() = meta; + kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest(request), raft_cmdpb::AdminResponse(response), 88, 6, 5, tmt); + ASSERT_NE(kvs.getRegion(88), nullptr); + } + { + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + request.set_cmd_type(raft_cmdpb::AdminCmdType::ChangePeerV2); + auto meta = kvs.getRegion(88)->getMetaRegion(); + meta.mutable_peers()->Clear(); + meta.add_peers()->set_id(3); + meta.add_peers()->set_id(4); + *response.mutable_change_peer()->mutable_region() = meta; + kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest(request), raft_cmdpb::AdminResponse(response), 88, 7, 5, tmt); + ASSERT_EQ(kvs.getRegion(88), nullptr); + } +} + +void RegionKVStoreTest::testRaftMerge(KVStore & kvs, TMTContext & tmt) +{ + { + kvs.getRegion(1)->clearAllData(); + kvs.getRegion(7)->clearAllData(); + + { + auto region = kvs.getRegion(1); + auto table_id = 1; + region->insert("lock", RecordKVFormat::genKey(table_id, 6), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); + region->insert("default", RecordKVFormat::genKey(table_id, 6, 5), 
TiKVValue("value1")); + region->insert("write", RecordKVFormat::genKey(table_id, 6, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + ASSERT_EQ(region->dataInfo(), "[write 1 lock 1 default 1 ]"); + } + { + auto region = kvs.getRegion(7); + auto table_id = 1; + region->insert("lock", RecordKVFormat::genKey(table_id, 2), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); + region->insert("default", RecordKVFormat::genKey(table_id, 2, 5), TiKVValue("value1")); + region->insert("write", RecordKVFormat::genKey(table_id, 2, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + ASSERT_EQ(region->dataInfo(), "[write 1 lock 1 default 1 ]"); + } + } + + { + auto region_id = 7; + auto source_region = kvs.getRegion(region_id); + auto target_region = kvs.getRegion(1); + + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + + { + request.set_cmd_type(raft_cmdpb::AdminCmdType::PrepareMerge); + + auto * prepare_merge = request.mutable_prepare_merge(); + { + auto min_index = source_region->appliedIndex(); + prepare_merge->set_min_index(min_index); + + metapb::Region * target = prepare_merge->mutable_target(); + *target = target_region->getMetaRegion(); + } + } + + kvs.handleAdminRaftCmd(std::move(request), + std::move(response), + source_region->id(), + 35, + 6, + tmt); + ASSERT_EQ(source_region->peerState(), raft_serverpb::PeerState::Merging); + } + + { + auto source_id = 7, target_id = 1; + auto source_region = kvs.getRegion(source_id); + raft_cmdpb::AdminRequest request; + { + request.set_cmd_type(raft_cmdpb::AdminCmdType::CommitMerge); + auto * commit_merge = request.mutable_commit_merge(); + { + commit_merge->set_commit(source_region->appliedIndex()); + *commit_merge->mutable_source() = source_region->getMetaRegion(); + } + } + source_region->setStateApplying(); + source_region->makeRaftCommandDelegate(kvs.genTaskLock()); + const auto & 
source_region_meta_delegate = source_region->meta.makeRaftCommandDelegate(); + try + { + kvs.getRegion(target_id)->meta.makeRaftCommandDelegate().checkBeforeCommitMerge(request, source_region_meta_delegate); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "checkBeforeCommitMerge: unexpected state Applying of source 1"); + } + source_region->setPeerState(raft_serverpb::PeerState::Normal); + { + request.mutable_commit_merge()->mutable_source()->mutable_start_key()->clear(); + } + try + { + kvs.getRegion(target_id)->meta.makeRaftCommandDelegate().checkBeforeCommitMerge(request, source_region_meta_delegate); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "checkBeforeCommitMerge: source region not match exist region meta"); + } + } + + { + auto source_id = 7, target_id = 1; + auto source_region = kvs.getRegion(source_id); + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + + { + request.set_cmd_type(raft_cmdpb::AdminCmdType::CommitMerge); + auto * commit_merge = request.mutable_commit_merge(); + { + commit_merge->set_commit(source_region->appliedIndex()); + *commit_merge->mutable_source() = source_region->getMetaRegion(); + } + } + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(TiKVKey(""), TiKVKey(""))); + ASSERT_TRUE(mmp.count(target_id) != 0); + ASSERT_EQ(mmp.size(), 2); + } + + kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest(request), + raft_cmdpb::AdminResponse(response), + target_id, + 36, + 6, + tmt); + + ASSERT_EQ(kvs.getRegion(source_id), nullptr); + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 5))); + ASSERT_TRUE(mmp.count(1) != 0); + ASSERT_EQ(mmp.size(), 1); + } + { + // add 7 back + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + auto region = makeRegion(7, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 
5)); + lock.regions.emplace(7, region); + lock.index.add(region); + } + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 5))); + ASSERT_TRUE(mmp.count(7) != 0); + ASSERT_TRUE(mmp.count(1) != 0); + ASSERT_EQ(mmp.size(), 2); + } + kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest(request), + raft_cmdpb::AdminResponse(response), + target_id, + 36, + 6, + tmt); + { + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 5))); + ASSERT_TRUE(mmp.count(1) != 0); + ASSERT_EQ(mmp.size(), 1); + } + ASSERT_EQ(kvs.getRegion(1)->dataInfo(), "[lock 2 ]"); + } +} + +TEST_F(RegionKVStoreTest, Region) +{ + TableID table_id = 100; + { + auto meta = RegionMeta(createPeer(2, true), createRegionInfo(666, RecordKVFormat::genKey(0, 0), RecordKVFormat::genKey(0, 1000)), initialApplyState()); + ASSERT_EQ(meta.peerId(), 2); + } + auto region = makeRegion(1, RecordKVFormat::genKey(table_id, 0), RecordKVFormat::genKey(table_id, 1000)); + { + ASSERT_TRUE(region->checkIndex(5)); + } + { + auto start_ts = 199; + auto req = GenRegionReadIndexReq(*region, start_ts); + ASSERT_EQ(req.ranges().size(), 1); + ASSERT_EQ(req.start_ts(), start_ts); + ASSERT_EQ(region->getMetaRegion().region_epoch().DebugString(), + req.context().region_epoch().DebugString()); + ASSERT_EQ(region->getRange()->comparableKeys().first.key, req.ranges()[0].start_key()); + ASSERT_EQ(region->getRange()->comparableKeys().second.key, req.ranges()[0].end_key()); + } + { + region->insert("lock", RecordKVFormat::genKey(table_id, 3), RecordKVFormat::encodeLockCfValue(RecordKVFormat::CFModifyFlag::PutFlag, "PK", 3, 20)); + region->insert("default", RecordKVFormat::genKey(table_id, 3, 5), TiKVValue("value1")); + region->insert("write", RecordKVFormat::genKey(table_id, 3, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + ASSERT_EQ(1, 
region->writeCFCount()); + ASSERT_EQ(region->dataInfo(), "[write 1 lock 1 default 1 ]"); + { + auto iter = region->createCommittedScanner(); + auto lock = iter.getLockInfo({100, nullptr}); + ASSERT_NE(lock, nullptr); + auto k = lock->intoLockInfo(); + ASSERT_EQ(k->lock_version(), 3); + } + { + std::optional data_list_read = ReadRegionCommitCache(region, true); + ASSERT_TRUE(data_list_read); + ASSERT_EQ(1, data_list_read->size()); + RemoveRegionCommitCache(region, *data_list_read); + } + ASSERT_EQ(0, region->writeCFCount()); + { + region->remove("lock", RecordKVFormat::genKey(table_id, 3)); + auto iter = region->createCommittedScanner(); + auto lock = iter.getLockInfo({100, nullptr}); + ASSERT_EQ(lock, nullptr); + } + region->clearAllData(); + } + { + region->insert("write", RecordKVFormat::genKey(table_id, 3, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + ASSERT_EQ(region->dataInfo(), "[write 1 ]"); + + auto ori_size = region->dataSize(); + try + { + // insert duplicate records + region->insert("write", RecordKVFormat::genKey(table_id, 3, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::PutFlag, 5)); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "Found existing key in hex: 7480000000000000FF645F728000000000FF0000030000000000FAFFFFFFFFFFFFFFF7"); + } + ASSERT_EQ(ori_size, region->dataSize()); + + region->tryCompactionFilter(100); + ASSERT_EQ(region->dataInfo(), "[]"); + } + { + region->insert("write", RecordKVFormat::genKey(table_id, 4, 8), RecordKVFormat::encodeWriteCfValue(RecordKVFormat::CFModifyFlag::DelFlag, 5)); + ASSERT_EQ(1, region->writeCFCount()); + region->remove("write", RecordKVFormat::genKey(table_id, 4, 8)); + ASSERT_EQ(1, region->writeCFCount()); + { + std::optional data_list_read = ReadRegionCommitCache(region, true); + ASSERT_TRUE(data_list_read); + ASSERT_EQ(1, data_list_read->size()); + RemoveRegionCommitCache(region, *data_list_read); + } + ASSERT_EQ(0, 
region->writeCFCount()); + } + { + ASSERT_EQ(0, region->dataSize()); + + region->insert("default", RecordKVFormat::genKey(table_id, 3, 5), TiKVValue("value1")); + ASSERT_LT(0, region->dataSize()); + + region->remove("default", RecordKVFormat::genKey(table_id, 3, 5)); + ASSERT_EQ(0, region->dataSize()); + + // remove duplicate records + region->remove("default", RecordKVFormat::genKey(table_id, 3, 5)); + ASSERT_EQ(0, region->dataSize()); + } +} + +TEST_F(RegionKVStoreTest, KVStore) +{ + auto ctx = TiFlashTestEnv::getGlobalContext(); + + KVStore & kvs = getKVS(); + { + // Run without read-index workers + + kvs.initReadIndexWorkers( + []() { + return std::chrono::milliseconds(10); + }, + 0); + ASSERT_EQ(kvs.read_index_worker_manager, nullptr); + kvs.asyncRunReadIndexWorkers(); + kvs.stopReadIndexWorkers(); + kvs.releaseReadIndexWorkers(); + } + { + auto store = metapb::Store{}; + store.set_id(1234); + kvs.setStore(store); + ASSERT_EQ(kvs.getStoreID(), store.id()); + } + { + ASSERT_EQ(kvs.getRegion(0), nullptr); + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + { + auto region = makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10)); + lock.regions.emplace(1, region); + lock.index.add(region); + } + { + auto region = makeRegion(2, RecordKVFormat::genKey(1, 10), RecordKVFormat::genKey(1, 20)); + lock.regions.emplace(2, region); + lock.index.add(region); + } + { + auto region = makeRegion(3, RecordKVFormat::genKey(1, 30), RecordKVFormat::genKey(1, 40)); + lock.regions.emplace(3, region); + lock.index.add(region); + } + } + { + kvs.tryPersist(1); + kvs.gcRegionPersistedCache(Seconds{0}); + } + { + ASSERT_EQ(kvs.regionSize(), 3); + auto mmp = kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 15), TiKVKey(""))); + ASSERT_EQ(mmp.size(), 2); + kvs.handleDestroy(3, ctx.getTMTContext()); + kvs.handleDestroy(3, ctx.getTMTContext()); + } + { + RegionMap mmp = 
kvs.getRegionsByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 15), TiKVKey(""))); + ASSERT_EQ(mmp.size(), 1); + ASSERT_EQ(mmp.at(2)->id(), 2); + } + { + { + raft_cmdpb::RaftCmdRequest request; + { + auto lock_key = RecordKVFormat::genKey(1, 2333); + TiKVValue lock_value = RecordKVFormat::encodeLockCfValue(Region::DelFlag, "pk", 77, 0); + RegionBench::setupPutRequest(request.add_requests(), ColumnFamilyName::Lock, lock_key, lock_value); + auto write_key = RecordKVFormat::genKey(1, 2333, 1); + TiKVValue write_value = RecordKVFormat::encodeWriteCfValue(Region::PutFlag, 2333); + RegionBench::setupPutRequest(request.add_requests(), ColumnFamilyName::Write, write_key, write_value); + } + try + { + ASSERT_EQ(kvs.handleWriteRaftCmd(std::move(request), 1, 6, 6, ctx.getTMTContext()), + EngineStoreApplyRes::None); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "Raw TiDB PK: 800000000000091D, Prewrite ts: 2333 can not found in default cf for key: 7480000000000000FF015F728000000000FF00091D0000000000FAFFFFFFFFFFFFFFFE"); + ASSERT_EQ(kvs.getRegion(1)->dataInfo(), "[write 1 lock 1 ]"); + kvs.getRegion(1)->tryCompactionFilter(1000); + } + try + { + raft_cmdpb::RaftCmdRequest request; + { + auto key = RecordKVFormat::genKey(1, 2333, 1); + RegionBench::setupPutRequest(request.add_requests(), ColumnFamilyName::Default, key, "v1"); + } + { + // duplicate + auto key = RecordKVFormat::genKey(1, 2333, 1); + RegionBench::setupPutRequest(request.add_requests(), ColumnFamilyName::Default, key, "v1"); + } + ASSERT_EQ(kvs.handleWriteRaftCmd(std::move(request), 1, 6, 6, ctx.getTMTContext()), + EngineStoreApplyRes::None); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "Found existing key in hex: 7480000000000000FF015F728000000000FF00091D0000000000FAFFFFFFFFFFFFFFFE"); + } + ASSERT_EQ(kvs.getRegion(1)->dataInfo(), "[lock 1 default 1 ]"); + kvs.getRegion(1)->remove("default", RecordKVFormat::genKey(1, 
2333, 1)); + try + { + raft_cmdpb::RaftCmdRequest request; + { + RegionBench::setupPutRequest(request.add_requests(), ColumnFamilyName::Default, std::string("k1"), "v1"); + } + ASSERT_EQ(kvs.handleWriteRaftCmd(std::move(request), 1, 6, 6, ctx.getTMTContext()), + EngineStoreApplyRes::None); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "Unexpected eof"); + } + try + { + raft_cmdpb::RaftCmdRequest request; + request.add_requests()->set_cmd_type(::raft_cmdpb::CmdType::Invalid); + ASSERT_EQ(kvs.handleWriteRaftCmd(std::move(request), 1, 10, 6, ctx.getTMTContext()), + EngineStoreApplyRes::None); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "Unsupport raft cmd Invalid"); + } + } + ASSERT_EQ(kvs.getRegion(1)->dataInfo(), "[lock 1 ]"); + { + raft_cmdpb::RaftCmdRequest request; + { + auto lock_key = RecordKVFormat::genKey(1, 2333); + TiKVValue lock_value = RecordKVFormat::encodeLockCfValue(Region::DelFlag, "pk", 77, 0); + RegionBench::setupDelRequest(request.add_requests(), ColumnFamilyName::Lock, lock_key); + } + raft_cmdpb::RaftCmdRequest first_request = request; + ASSERT_EQ(kvs.handleWriteRaftCmd(std::move(first_request), 1, 7, 6, ctx.getTMTContext()), + EngineStoreApplyRes::None); + + RegionBench::setupDelRequest(request.add_requests(), ColumnFamilyName::Write, TiKVKey("illegal key")); + // index <= appliedIndex(), ignore + raft_cmdpb::RaftCmdRequest second_request; + ASSERT_EQ(kvs.handleWriteRaftCmd(std::move(second_request), 1, 7, 6, ctx.getTMTContext()), + EngineStoreApplyRes::None); + try + { + // + request.clear_requests(); + RegionBench::setupDelRequest(request.add_requests(), ColumnFamilyName::Write, TiKVKey("illegal key")); + ASSERT_EQ(kvs.handleWriteRaftCmd(std::move(request), 1, 9, 6, ctx.getTMTContext()), + EngineStoreApplyRes::None); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "Key padding"); + } + + ASSERT_EQ(kvs.getRegion(1)->appliedIndex(), 7); + } + 
ASSERT_EQ(kvs.getRegion(1)->dataInfo(), "[]"); + + ASSERT_EQ( + kvs.handleWriteRaftCmd(raft_cmdpb::RaftCmdRequest{}, 8192, 7, 6, ctx.getTMTContext()), + EngineStoreApplyRes::NotFound); + } + { + kvs.handleDestroy(2, ctx.getTMTContext()); + ASSERT_EQ(kvs.regionSize(), 1); + } + { + testRaftSplit(kvs, ctx.getTMTContext()); + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{}, raft_cmdpb::AdminResponse{}, 8192, 5, 6, ctx.getTMTContext()), EngineStoreApplyRes::NotFound); + } + { + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{}, raft_cmdpb::AdminResponse{}, 8192, 5, 6, ctx.getTMTContext()), EngineStoreApplyRes::NotFound); + } + { + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + + request.mutable_compact_log(); + request.set_cmd_type(::raft_cmdpb::AdminCmdType::CompactLog); + raft_cmdpb::AdminResponse first_response = response; + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(first_response), 7, 22, 6, ctx.getTMTContext()), EngineStoreApplyRes::Persist); + + raft_cmdpb::AdminResponse second_response = response; + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(second_response), 7, 23, 6, ctx.getTMTContext()), EngineStoreApplyRes::Persist); + + request.set_cmd_type(::raft_cmdpb::AdminCmdType::ComputeHash); + raft_cmdpb::AdminResponse third_response = response; + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(third_response), 7, 24, 6, ctx.getTMTContext()), EngineStoreApplyRes::None); + + request.set_cmd_type(::raft_cmdpb::AdminCmdType::VerifyHash); + raft_cmdpb::AdminResponse fourth_response = response; + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(fourth_response), 7, 25, 6, ctx.getTMTContext()), EngineStoreApplyRes::None); + + raft_cmdpb::AdminResponse fifth_response = response; + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(fifth_response), 8192, 5, 6, 
ctx.getTMTContext()), EngineStoreApplyRes::NotFound); + { + kvs.setRegionCompactLogConfig(0, 0, 0); + request.set_cmd_type(::raft_cmdpb::AdminCmdType::CompactLog); + ASSERT_EQ(kvs.handleAdminRaftCmd(std::move(request), std::move(response), 7, 26, 6, ctx.getTMTContext()), EngineStoreApplyRes::Persist); + } + } + { + testRaftMergeRollback(kvs, ctx.getTMTContext()); + testRaftMerge(kvs, ctx.getTMTContext()); + } + { + testRaftChangePeer(kvs, ctx.getTMTContext()); + } + { + auto ori_snapshot_apply_method = kvs.snapshot_apply_method; + kvs.snapshot_apply_method = TiDB::SnapshotApplyMethod::DTFile_Single; + SCOPE_EXIT({ + kvs.snapshot_apply_method = ori_snapshot_apply_method; + }); + + + auto region_id = 19; + auto region = makeRegion(region_id, RecordKVFormat::genKey(1, 50), RecordKVFormat::genKey(1, 60)); + auto region_id_str = std::to_string(19); + auto & mmp = MockSSTReader::getMockSSTData(); + MockSSTReader::getMockSSTData().clear(); + MockSSTReader::Data default_kv_list; + { + default_kv_list.emplace_back(RecordKVFormat::genKey(1, 55, 5).getStr(), TiKVValue("value1").getStr()); + default_kv_list.emplace_back(RecordKVFormat::genKey(1, 58, 5).getStr(), TiKVValue("value2").getStr()); + } + mmp[MockSSTReader::Key{region_id_str, ColumnFamilyType::Default}] = std::move(default_kv_list); + std::vector sst_views; + sst_views.push_back(SSTView{ + ColumnFamilyType::Default, + BaseBuffView{region_id_str.data(), region_id_str.length()}, + }); + { + RegionMockTest mock_test(kvstore.get(), region); + + kvs.handleApplySnapshot( + region->getMetaRegion(), + 2, + SSTViewVec{sst_views.data(), sst_views.size()}, + 8, + 5, + ctx.getTMTContext()); + ASSERT_EQ(kvs.getRegion(19)->checkIndex(8), true); + try + { + kvs.handleApplySnapshot( + region->getMetaRegion(), + 2, + {}, // empty + 6, // smaller index + 5, + ctx.getTMTContext()); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "[region 19] already has newer apply-index 8 than 6, should not happen"); + } 
+ } + + { + { + auto region = makeRegion(22, RecordKVFormat::genKey(55, 50), RecordKVFormat::genKey(55, 100)); + auto ingest_ids = kvs.preHandleSnapshotToFiles( + region, + {}, + 9, + 5, + ctx.getTMTContext()); + kvs.checkAndApplySnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); + } + try + { + auto region = makeRegion(20, RecordKVFormat::genKey(55, 50), RecordKVFormat::genKey(55, 100)); + auto ingest_ids = kvs.preHandleSnapshotToFiles( + region, + {}, + 9, + 5, + ctx.getTMTContext()); + kvs.checkAndApplySnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); // overlap, but not tombstone + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "range of region 20 is overlapped with 22, state: region { id: 22 }"); + } + + { + const auto * ori_ptr = proxy_helper->proxy_ptr.inner; + proxy_helper->proxy_ptr.inner = nullptr; + SCOPE_EXIT({ + proxy_helper->proxy_ptr.inner = ori_ptr; + }); + + try + { + auto region = makeRegion(20, RecordKVFormat::genKey(55, 50), RecordKVFormat::genKey(55, 100)); + auto ingest_ids = kvs.preHandleSnapshotToFiles( + region, + {}, + 10, + 5, + ctx.getTMTContext()); + kvs.checkAndApplySnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "getRegionLocalState meet internal error: RaftStoreProxyPtr is none"); + } + } + + { + proxy_instance->getRegion(22)->setSate(({ + raft_serverpb::RegionLocalState s; + s.set_state(::raft_serverpb::PeerState::Tombstone); + s; + })); + auto region = makeRegion(20, RecordKVFormat::genKey(55, 50), RecordKVFormat::genKey(55, 100)); + auto ingest_ids = kvs.preHandleSnapshotToFiles( + region, + {}, + 10, + 5, + ctx.getTMTContext()); + kvs.checkAndApplySnapshot(RegionPtrWithSnapshotFiles{region, std::move(ingest_ids)}, ctx.getTMTContext()); // overlap, tombstone, remove previous one + + auto state = 
proxy_helper->getRegionLocalState(8192); + ASSERT_EQ(state.state(), raft_serverpb::PeerState::Tombstone); + } + + kvs.handleDestroy(20, ctx.getTMTContext()); + } + } + + { + auto region_id = 19; + auto region_id_str = std::to_string(19); + auto & mmp = MockSSTReader::getMockSSTData(); + MockSSTReader::getMockSSTData().clear(); + MockSSTReader::Data default_kv_list; + { + default_kv_list.emplace_back(RecordKVFormat::genKey(1, 55, 5).getStr(), TiKVValue("value1").getStr()); + default_kv_list.emplace_back(RecordKVFormat::genKey(1, 58, 5).getStr(), TiKVValue("value2").getStr()); + } + mmp[MockSSTReader::Key{region_id_str, ColumnFamilyType::Default}] = std::move(default_kv_list); + + // Mock SST data for handle [star, end) + auto region = kvs.getRegion(region_id); + + RegionMockTest mock_test(kvstore.get(), region); + + { + // Mocking ingest a SST for column family "Write" + std::vector sst_views; + sst_views.push_back(SSTView{ + ColumnFamilyType::Default, + BaseBuffView{region_id_str.data(), region_id_str.length()}, + }); + kvs.handleIngestSST( + region_id, + SSTViewVec{sst_views.data(), sst_views.size()}, + 100, + 1, + ctx.getTMTContext()); + ASSERT_EQ(kvs.getRegion(19)->checkIndex(100), true); + } + } + + { + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + + request.mutable_compact_log(); + request.set_cmd_type(::raft_cmdpb::AdminCmdType::InvalidAdmin); + + try + { + kvs.handleAdminRaftCmd(std::move(request), std::move(response), 1, 110, 6, ctx.getTMTContext()); + ASSERT_TRUE(false); + } + catch (Exception & e) + { + ASSERT_EQ(e.message(), "unsupported admin command type InvalidAdmin"); + } + } + { + // There shall be data to flush. + ASSERT_EQ(kvs.needFlushRegionData(19, ctx.getTMTContext()), true); + // Force flush until succeed only for testing. + ASSERT_EQ(kvs.tryFlushRegionData(19, true, ctx.getTMTContext(), 0, 0), true); + // Non existing region. + // Flush and CompactLog will not panic. 
+ ASSERT_EQ(kvs.tryFlushRegionData(1999, true, ctx.getTMTContext(), 0, 0), true); + raft_cmdpb::AdminRequest request; + raft_cmdpb::AdminResponse response; + request.mutable_compact_log(); + request.set_cmd_type(::raft_cmdpb::AdminCmdType::CompactLog); + ASSERT_EQ(kvs.handleAdminRaftCmd(raft_cmdpb::AdminRequest{request}, std::move(response), 1999, 22, 6, ctx.getTMTContext()), EngineStoreApplyRes::NotFound); + } +} + +TEST_F(RegionKVStoreTest, KVStoreRestore) +{ + { + KVStore & kvs = getKVS(); + { + auto store = metapb::Store{}; + store.set_id(1234); + kvs.setStore(store); + ASSERT_EQ(kvs.getStoreID(), store.id()); + } + { + ASSERT_EQ(kvs.getRegion(0), nullptr); + auto task_lock = kvs.genTaskLock(); + auto lock = kvs.genRegionWriteLock(task_lock); + { + auto region = makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10)); + lock.regions.emplace(1, region); + lock.index.add(region); + } + { + auto region = makeRegion(2, RecordKVFormat::genKey(1, 10), RecordKVFormat::genKey(1, 20)); + lock.regions.emplace(2, region); + lock.index.add(region); + } + { + auto region = makeRegion(3, RecordKVFormat::genKey(1, 30), RecordKVFormat::genKey(1, 40)); + lock.regions.emplace(3, region); + lock.index.add(region); + } + } + kvs.tryPersist(1); + kvs.tryPersist(2); + kvs.tryPersist(3); + } + { + KVStore & kvs = reloadKVSFromDisk(); + kvs.getRegion(1); + kvs.getRegion(2); + kvs.getRegion(3); + } +} + +void test_mergeresult() +{ + ASSERT_EQ(MetaRaftCommandDelegate::computeRegionMergeResult(createRegionInfo(1, "x", ""), createRegionInfo(1000, "", "x")).source_at_left, false); + ASSERT_EQ(MetaRaftCommandDelegate::computeRegionMergeResult(createRegionInfo(1, "", "x"), createRegionInfo(1000, "x", "")).source_at_left, true); + ASSERT_EQ(MetaRaftCommandDelegate::computeRegionMergeResult(createRegionInfo(1, "x", "y"), createRegionInfo(1000, "y", "z")).source_at_left, true); + ASSERT_EQ(MetaRaftCommandDelegate::computeRegionMergeResult(createRegionInfo(1, "y", "z"), 
createRegionInfo(1000, "x", "y")).source_at_left, false); + + { + RegionState region_state; + bool source_at_left; + RegionState source_region_state; + + region_state.setStartKey(RecordKVFormat::genKey(1, 0)); + region_state.setEndKey(RecordKVFormat::genKey(1, 10)); + + source_region_state.setStartKey(RecordKVFormat::genKey(1, 10)); + source_region_state.setEndKey(RecordKVFormat::genKey(1, 20)); + + source_at_left = false; + + ChangeRegionStateRange(region_state, source_at_left, source_region_state); + + ASSERT_EQ(region_state.getRange()->comparableKeys().first.key, RecordKVFormat::genKey(1, 0)); + ASSERT_EQ(region_state.getRange()->comparableKeys().second.key, RecordKVFormat::genKey(1, 20)); + } + { + RegionState region_state; + bool source_at_left; + RegionState source_region_state; + + region_state.setStartKey(RecordKVFormat::genKey(2, 5)); + region_state.setEndKey(RecordKVFormat::genKey(2, 10)); + + source_region_state.setStartKey(RecordKVFormat::genKey(2, 0)); + source_region_state.setEndKey(RecordKVFormat::genKey(2, 5)); + + source_at_left = true; + + ChangeRegionStateRange(region_state, source_at_left, source_region_state); + + ASSERT_EQ(region_state.getRange()->comparableKeys().first.key, RecordKVFormat::genKey(2, 0)); + ASSERT_EQ(region_state.getRange()->comparableKeys().second.key, RecordKVFormat::genKey(2, 10)); + } +} + +TEST_F(RegionKVStoreTest, Basic) +{ + { + RegionsRangeIndex region_index; + const auto & root_map = region_index.getRoot(); + ASSERT_EQ(root_map.size(), 2); + + region_index.add(makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10))); + + ASSERT_EQ(root_map.begin()->second.region_map.size(), 0); + + region_index.add(makeRegion(2, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 3))); + region_index.add(makeRegion(3, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 1))); + + auto res = region_index.findByRangeOverlap(RegionRangeKeys::makeComparableKeys(TiKVKey(""), TiKVKey(""))); + ASSERT_EQ(res.size(), 
3); + + region_index.add(makeRegion(4, RecordKVFormat::genKey(1, 1), RecordKVFormat::genKey(1, 4))); + + ASSERT_EQ(root_map.size(), 7); + + res = region_index.findByRangeOverlap(RegionRangeKeys::makeComparableKeys(TiKVKey(""), TiKVKey(""))); + ASSERT_EQ(res.size(), 4); + + res = region_index.findByRangeOverlap(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 1), TiKVKey(""))); + ASSERT_EQ(res.size(), 3); + + res = region_index.findByRangeOverlap( + RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 2), RecordKVFormat::genKey(1, 5))); + ASSERT_EQ(res.size(), 3); + ASSERT_TRUE(res.find(1) != res.end()); + ASSERT_TRUE(res.find(2) != res.end()); + ASSERT_TRUE(res.find(4) != res.end()); + + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 1), RecordKVFormat::genKey(1, 4)), 4); + res = region_index.findByRangeOverlap(RegionRangeKeys::makeComparableKeys(TiKVKey(""), TiKVKey(""))); + ASSERT_EQ(res.size(), 3); + + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 1)), 3); + res = region_index.findByRangeOverlap(RegionRangeKeys::makeComparableKeys(TiKVKey(""), TiKVKey(""))); + ASSERT_EQ(res.size(), 2); + + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 3)), 2); + res = region_index.findByRangeOverlap(RegionRangeKeys::makeComparableKeys(TiKVKey(""), TiKVKey(""))); + ASSERT_EQ(res.size(), 1); + + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 10)), 1); + res = region_index.findByRangeOverlap(RegionRangeKeys::makeComparableKeys(TiKVKey(""), TiKVKey(""))); + ASSERT_TRUE(res.empty()); + + ASSERT_EQ(root_map.size(), 2); + } + + { + RegionsRangeIndex region_index; + const auto & root_map = region_index.getRoot(); + try + { + region_index.remove(RegionRangeKeys::makeComparableKeys(TiKVKey(), TiKVKey()), 1); + assert(false); + } + catch 
(Exception & e) + { + const auto & res = e.message(); + ASSERT_EQ(res, "void DB::RegionsRangeIndex::remove(const DB::RegionRange &, DB::RegionID): not found region 1"); + } + + region_index.add(makeRegion(2, RecordKVFormat::genKey(1, 3), RecordKVFormat::genKey(1, 5))); + try + { + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 4), RecordKVFormat::genKey(1, 5)), 2); + assert(false); + } + catch (Exception & e) + { + const auto & res = e.message(); + ASSERT_EQ(res, "void DB::RegionsRangeIndex::remove(const DB::RegionRange &, DB::RegionID): not found start key"); + } + + try + { + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 3), RecordKVFormat::genKey(1, 4)), 2); + assert(false); + } + catch (Exception & e) + { + const auto & res = e.message(); + ASSERT_EQ(res, "void DB::RegionsRangeIndex::remove(const DB::RegionRange &, DB::RegionID): not found end key"); + } + + try + { + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 3), RecordKVFormat::genKey(1, 3)), 2); + assert(false); + } + catch (Exception & e) + { + const auto & res = e.message(); + ASSERT_EQ(res, "void DB::RegionsRangeIndex::remove(const DB::RegionRange &, DB::RegionID): range of region 2 is empty"); + } + + try + { + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 3), TiKVKey()), 2); + assert(false); + } + catch (Exception & e) + { + const auto & res = e.message(); + ASSERT_EQ(res, "void DB::RegionsRangeIndex::remove(const DB::RegionRange &, DB::RegionID): not found region 2"); + } + + region_index.clear(); + + try + { + region_index.add(makeRegion(6, RecordKVFormat::genKey(6, 6), RecordKVFormat::genKey(6, 6))); + assert(false); + } + catch (Exception & e) + { + const auto & res = e.message(); + std::string tar = "Illegal region range, should not happen"; + ASSERT(res.size() > tar.size()); + ASSERT_EQ(res.substr(0, tar.size()), tar); + } + + region_index.clear(); + + 
region_index.add(makeRegion(1, RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 1))); + region_index.add(makeRegion(2, RecordKVFormat::genKey(1, 1), RecordKVFormat::genKey(1, 2))); + region_index.add(makeRegion(3, RecordKVFormat::genKey(1, 2), RecordKVFormat::genKey(1, 3))); + + ASSERT_EQ(root_map.size(), 6); + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 2), RecordKVFormat::genKey(1, 3)), 3); + ASSERT_EQ(root_map.size(), 5); + + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 0), RecordKVFormat::genKey(1, 1)), 1); + ASSERT_EQ(root_map.size(), 4); + + region_index.remove(RegionRangeKeys::makeComparableKeys(RecordKVFormat::genKey(1, 1), RecordKVFormat::genKey(1, 2)), 2); + ASSERT_EQ(root_map.size(), 2); + } + { + test_mergeresult(); + } +} + +} // namespace tests +} // namespace DB diff --git a/dbms/src/Storages/Transaction/tests/gtest_region_block_reader.cpp b/dbms/src/Storages/Transaction/tests/gtest_region_block_reader.cpp new file mode 100644 index 00000000000..f31133656e9 --- /dev/null +++ b/dbms/src/Storages/Transaction/tests/gtest_region_block_reader.cpp @@ -0,0 +1,640 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using TableInfo = TiDB::TableInfo; + +namespace DB::tests +{ +using ColumnIDs = std::vector; +class RegionBlockReaderTest : public ::testing::Test +{ +public: + RegionBlockReaderTest() + : logger(Logger::get("RegionBlockReaderTest")) + {} + +protected: + Int64 handle_value = 100; + UInt8 del_mark_value = 0; + UInt64 version_value = 100; + size_t rows = 3; + + RegionDataReadInfoList data_list_read; + std::unordered_map fields_map; + + LoggerPtr logger; + + enum RowEncodeVersion + { + RowV1, + RowV2 + }; + +protected: + void SetUp() override + { + data_list_read.clear(); + fields_map.clear(); + } + + void TearDown() override {} + + void encodeColumns(const TableInfo & table_info, const std::vector & fields, RowEncodeVersion row_version) + { + // for later check + std::unordered_map column_name_columns_index_map; + for (size_t i = 0; i < table_info.columns.size(); i++) + { + fields_map.emplace(table_info.columns[i].id, fields[i]); + column_name_columns_index_map.emplace(table_info.columns[i].name, i); + } + + std::vector value_encode_fields; + std::vector key_encode_fields; + for (size_t i = 0; i < table_info.columns.size(); i++) + { + if (table_info.is_common_handle || table_info.pk_is_handle) + { + if (table_info.columns[i].hasPriKeyFlag()) + key_encode_fields.emplace_back(fields[i]); + else + value_encode_fields.emplace_back(fields[i]); + } + else + { + value_encode_fields.emplace_back(fields[i]); + } + } + + // create the RawTiDBPK section of encoded key + WriteBufferFromOwnString pk_buf; + if (table_info.is_common_handle) + { + const auto & primary_index_info = table_info.getPrimaryIndexInfo(); + for (size_t i = 0; i < primary_index_info.idx_cols.size(); i++) + { + auto idx = column_name_columns_index_map[primary_index_info.idx_cols[i].name]; + EncodeDatum(key_encode_fields[i], 
table_info.columns[idx].getCodecFlag(), pk_buf); + } + } + else + { + DB::EncodeInt64(handle_value, pk_buf); + } + RawTiDBPK pk{std::make_shared(pk_buf.releaseStr())}; + + // create encoded value + WriteBufferFromOwnString value_buf; + if (row_version == RowEncodeVersion::RowV1) + { + encodeRowV1(table_info, value_encode_fields, value_buf); + } + else if (row_version == RowEncodeVersion::RowV2) + { + encodeRowV2(table_info, value_encode_fields, value_buf); + } + else + { + throw Exception("Unknown row format " + std::to_string(row_version), ErrorCodes::LOGICAL_ERROR); + } + auto row_value = std::make_shared(value_buf.releaseStr()); + for (size_t i = 0; i < rows; i++) + data_list_read.emplace_back(pk, del_mark_value, version_value, row_value); + } + + void checkBlock(DecodingStorageSchemaSnapshotConstPtr decoding_schema, const Block & block) const + { + ASSERT_EQ(block.columns(), decoding_schema->column_defines->size()); + for (size_t row = 0; row < rows; row++) + { + for (size_t pos = 0; pos < block.columns(); pos++) + { + const auto & column_element = block.getByPosition(pos); + auto gen_error_log = [&]() { + return fmt::format( + " when checking column\n id={}, name={}, nrow={}\n decoded block is:\n{}\n", + column_element.column_id, + column_element.name, + row, + getColumnsContent(block.getColumnsWithTypeAndName())); + }; + if (row == 0) + { + ASSERT_EQ(column_element.column->size(), rows); + } + if (column_element.name == EXTRA_HANDLE_COLUMN_NAME) + { + if (decoding_schema->is_common_handle) + { + ASSERT_FIELD_EQ((*column_element.column)[row], Field(*std::get<0>(data_list_read[row]))) << gen_error_log(); + } + else + { + ASSERT_FIELD_EQ((*column_element.column)[row], Field(handle_value)) << gen_error_log(); + } + } + else if (column_element.name == VERSION_COLUMN_NAME) + { + ASSERT_FIELD_EQ((*column_element.column)[row], Field(version_value)) << gen_error_log(); + } + else if (column_element.name == TAG_COLUMN_NAME) + { + 
ASSERT_FIELD_EQ((*column_element.column)[row], Field(NearestFieldType::Type(del_mark_value))) << gen_error_log(); + } + else + { + if (fields_map.count(column_element.column_id) > 0) + ASSERT_FIELD_EQ((*column_element.column)[row], fields_map.at(column_element.column_id)) << gen_error_log(); + else + LOG_INFO(logger, "ignore value check for new added column, id={}, name={}", column_element.column_id, column_element.name); + } + } + } + } + + bool decodeAndCheckColumns(DecodingStorageSchemaSnapshotConstPtr decoding_schema, bool force_decode) const + { + RegionBlockReader reader{decoding_schema}; + Block block = createBlockSortByColumnID(decoding_schema); + if (!reader.read(block, data_list_read, force_decode)) + return false; + + checkBlock(decoding_schema, block); + return true; + } + + std::pair> getNormalTableInfoFields(const ColumnIDs & pk_col_ids, bool is_common_handle) const + { + return getTableInfoAndFields( + pk_col_ids, + is_common_handle, + ColumnIDValue(2, handle_value), + ColumnIDValue(3, std::numeric_limits::max()), + ColumnIDValue(4, std::numeric_limits::min()), + ColumnIDValue(9, String("aaa")), + ColumnIDValue(10, DecimalField(ToDecimal(12345678910ULL, 4), 4)), + ColumnIDValueNull(11)); + } + + TableInfo getTableInfoWithMoreColumns(const ColumnIDs & handle_ids, bool is_common_handle) + { + TableInfo table_info; + std::tie(table_info, std::ignore) = getTableInfoAndFields( + handle_ids, + is_common_handle, + ColumnIDValue(1, String("")), + ColumnIDValue(2, handle_value), + ColumnIDValue(3, std::numeric_limits::max()), + ColumnIDValue(4, std::numeric_limits::min()), + ColumnIDValue(8, String("")), + ColumnIDValue(9, String("aaa")), + ColumnIDValue(10, DecimalField(ToDecimal(12345678910ULL, 4), 4)), + ColumnIDValueNull(11), + ColumnIDValue(13, String(""))); + + // add default value for missing column + std::vector missing_column_ids{1, 8, 13}; + String missing_column_default_value = String("default"); + for (auto & column : table_info.columns) + { + if 
(std::find(missing_column_ids.begin(), missing_column_ids.end(), column.id) != missing_column_ids.end()) + { + column.origin_default_value = missing_column_default_value; + fields_map.emplace(column.id, Field(missing_column_default_value)); + } + } + return table_info; + } + + TableInfo getTableInfoWithLessColumns(const ColumnIDs & handle_ids, bool is_common_handle) const + { + TableInfo table_info; + std::tie(table_info, std::ignore) = getTableInfoAndFields( + handle_ids, + is_common_handle, + ColumnIDValue(2, handle_value), + ColumnIDValue(4, std::numeric_limits::min()), + ColumnIDValue(9, String("aaa")), + ColumnIDValue(10, DecimalField(ToDecimal(12345678910ULL, 4), 4))); + return table_info; + } + + TableInfo getTableInfoWithMoreNarrowIntType(const ColumnIDs & handle_ids, bool is_common_handle) const + { + TableInfo table_info; + std::tie(table_info, std::ignore) = getTableInfoAndFields( + handle_ids, + is_common_handle, + ColumnIDValue(2, handle_value), + ColumnIDValue(3, std::numeric_limits::max()), + ColumnIDValue(4, std::numeric_limits::min()), + ColumnIDValue(9, String("aaa")), + ColumnIDValue(10, DecimalField(ToDecimal(12345678910ULL, 4), 4)), + ColumnIDValueNull(11)); + return table_info; + } + + TableInfo getTableInfoFieldsForInvalidNULLTest(const ColumnIDs & handle_ids, bool is_common_handle) const + { + TableInfo table_info; + std::tie(table_info, std::ignore) = getTableInfoAndFields( + handle_ids, + is_common_handle, + ColumnIDValue(2, handle_value), + ColumnIDValue(3, std::numeric_limits::max()), + ColumnIDValue(4, std::numeric_limits::min()), + ColumnIDValue(9, String("aaa")), + ColumnIDValue(10, DecimalField(ToDecimal(12345678910ULL, 4), 4)), + ColumnIDValue(11, std::numeric_limits::min())); + return table_info; + } +}; + +String bytesFromHexString(std::string_view hex_str) +{ + assert(hex_str.size() % 2 == 0); + String bytes(hex_str.size() / 2, '\x00'); + for (size_t i = 0; i < bytes.size(); ++i) + { + bytes[i] = unhex2(hex_str.data() + i * 2); + 
} + return bytes; +} + +TEST_F(RegionBlockReaderTest, PKIsNotHandle) +{ + auto [table_info, fields] = getNormalTableInfoFields({EXTRA_HANDLE_COLUMN_ID}, false); + ASSERT_EQ(table_info.is_common_handle, false); + ASSERT_EQ(table_info.pk_is_handle, false); + ASSERT_FALSE(table_info.getColumnInfo(2).hasPriKeyFlag()); + + encodeColumns(table_info, fields, RowEncodeVersion::RowV2); + auto decoding_schema = getDecodingStorageSchemaSnapshot(table_info); + ASSERT_TRUE(decodeAndCheckColumns(decoding_schema, true)); +} + +TEST_F(RegionBlockReaderTest, PKIsHandle) +{ + auto [table_info, fields] = getNormalTableInfoFields({2}, false); + ASSERT_EQ(table_info.is_common_handle, false); + ASSERT_EQ(table_info.pk_is_handle, true); + ASSERT_TRUE(table_info.getColumnInfo(2).hasPriKeyFlag()); + + encodeColumns(table_info, fields, RowEncodeVersion::RowV2); + auto decoding_schema = getDecodingStorageSchemaSnapshot(table_info); + ASSERT_TRUE(decodeAndCheckColumns(decoding_schema, true)); +} + +TEST_F(RegionBlockReaderTest, CommonHandle) +{ + auto [table_info, fields] = getNormalTableInfoFields({2, 3, 4}, true); + ASSERT_EQ(table_info.is_common_handle, true); + ASSERT_EQ(table_info.pk_is_handle, false); + ASSERT_TRUE(table_info.getColumnInfo(2).hasPriKeyFlag()); + ASSERT_TRUE(table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_TRUE(table_info.getColumnInfo(4).hasPriKeyFlag()); + + encodeColumns(table_info, fields, RowEncodeVersion::RowV2); + auto decoding_schema = getDecodingStorageSchemaSnapshot(table_info); + ASSERT_TRUE(decodeAndCheckColumns(decoding_schema, true)); +} + +TEST_F(RegionBlockReaderTest, MissingColumnRowV2) +{ + auto [table_info, fields] = getNormalTableInfoFields({EXTRA_HANDLE_COLUMN_ID}, false); + encodeColumns(table_info, fields, RowEncodeVersion::RowV2); + auto new_table_info = getTableInfoWithMoreColumns({EXTRA_HANDLE_COLUMN_ID}, false); + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + 
ASSERT_TRUE(decodeAndCheckColumns(new_decoding_schema, false)); +} + +TEST_F(RegionBlockReaderTest, MissingColumnRowV1) +{ + auto [table_info, fields] = getNormalTableInfoFields({EXTRA_HANDLE_COLUMN_ID}, false); + encodeColumns(table_info, fields, RowEncodeVersion::RowV1); + auto new_table_info = getTableInfoWithMoreColumns({EXTRA_HANDLE_COLUMN_ID}, false); + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + ASSERT_TRUE(decodeAndCheckColumns(new_decoding_schema, false)); +} + +TEST_F(RegionBlockReaderTest, ExtraColumnRowV2) +{ + auto [table_info, fields] = getNormalTableInfoFields({EXTRA_HANDLE_COLUMN_ID}, false); + encodeColumns(table_info, fields, RowEncodeVersion::RowV2); + auto new_table_info = getTableInfoWithLessColumns({EXTRA_HANDLE_COLUMN_ID}, false); + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + ASSERT_FALSE(decodeAndCheckColumns(new_decoding_schema, false)); + ASSERT_TRUE(decodeAndCheckColumns(new_decoding_schema, true)); +} + +TEST_F(RegionBlockReaderTest, ExtraColumnRowV1) +{ + auto [table_info, fields] = getNormalTableInfoFields({EXTRA_HANDLE_COLUMN_ID}, false); + encodeColumns(table_info, fields, RowEncodeVersion::RowV1); + auto new_table_info = getTableInfoWithLessColumns({EXTRA_HANDLE_COLUMN_ID}, false); + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + ASSERT_FALSE(decodeAndCheckColumns(new_decoding_schema, false)); + ASSERT_TRUE(decodeAndCheckColumns(new_decoding_schema, true)); +} + +TEST_F(RegionBlockReaderTest, OverflowColumnRowV2) +{ + auto [table_info, fields] = getNormalTableInfoFields({EXTRA_HANDLE_COLUMN_ID}, false); + encodeColumns(table_info, fields, RowEncodeVersion::RowV2); + auto new_table_info = getTableInfoWithMoreNarrowIntType({EXTRA_HANDLE_COLUMN_ID}, false); + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + ASSERT_FALSE(decodeAndCheckColumns(new_decoding_schema, false)); + 
ASSERT_ANY_THROW(decodeAndCheckColumns(new_decoding_schema, true)); + + auto decoding_schema = getDecodingStorageSchemaSnapshot(table_info); + ASSERT_TRUE(decodeAndCheckColumns(decoding_schema, true)); +} + +TEST_F(RegionBlockReaderTest, OverflowColumnRowV1) +{ + auto [table_info, fields] = getNormalTableInfoFields({EXTRA_HANDLE_COLUMN_ID}, false); + encodeColumns(table_info, fields, RowEncodeVersion::RowV1); + auto new_table_info = getTableInfoWithMoreNarrowIntType({EXTRA_HANDLE_COLUMN_ID}, false); + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + ASSERT_FALSE(decodeAndCheckColumns(new_decoding_schema, false)); + ASSERT_ANY_THROW(decodeAndCheckColumns(new_decoding_schema, true)); + + auto decoding_schema = getDecodingStorageSchemaSnapshot(table_info); + ASSERT_TRUE(decodeAndCheckColumns(decoding_schema, true)); +} + +TEST_F(RegionBlockReaderTest, InvalidNULLRowV2) +try +{ + auto [table_info, fields] = getNormalTableInfoFields({EXTRA_HANDLE_COLUMN_ID}, false); + ASSERT_FALSE(table_info.getColumnInfo(11).hasNotNullFlag()); // col 11 is nullable + + encodeColumns(table_info, fields, RowEncodeVersion::RowV2); + + auto new_table_info = getTableInfoFieldsForInvalidNULLTest({EXTRA_HANDLE_COLUMN_ID}, false); + ASSERT_TRUE(new_table_info.getColumnInfo(11).hasNotNullFlag()); // col 11 is not null + + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + ASSERT_FALSE(decodeAndCheckColumns(new_decoding_schema, false)); + ASSERT_ANY_THROW(decodeAndCheckColumns(new_decoding_schema, true)); +} +CATCH + +TEST_F(RegionBlockReaderTest, InvalidNULLRowV1) +{ + auto [table_info, fields] = getNormalTableInfoFields({EXTRA_HANDLE_COLUMN_ID}, false); + encodeColumns(table_info, fields, RowEncodeVersion::RowV1); + auto new_table_info = getTableInfoFieldsForInvalidNULLTest({EXTRA_HANDLE_COLUMN_ID}, false); + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + 
ASSERT_FALSE(decodeAndCheckColumns(new_decoding_schema, false)); + ASSERT_ANY_THROW(decodeAndCheckColumns(new_decoding_schema, true)); +} + + +TEST_F(RegionBlockReaderTest, MissingPrimaryKeyColumnRowV2) +try +{ + // Mock a table + // `t_case` { + // column3 varchar(32) NOT NULL, + // column4 varchar(20) DEFAULT NULL, + // primary key (`column3`) /*T![clustered_index] NONCLUSTERED */ + // -- _tidb_rowid bigint, // hidden handle + // } + auto [table_info, fields] = getTableInfoAndFields(/*pk_col_ids*/ {3}, false, ColumnIDValue(3, "hello"), ColumnIDValueNull(4)); + ASSERT_EQ(table_info.is_common_handle, false); + ASSERT_EQ(table_info.pk_is_handle, false); + ASSERT_TRUE(table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_FALSE(table_info.getColumnInfo(4).hasPriKeyFlag()); + + // FIXME: actually TiDB won't encode the "NULL" for column4 into value + // but now the `RowEncoderV2` does not support this, we use `RegionBlockReaderTest::ReadFromRegion` + // to test that. + encodeColumns(table_info, fields, RowEncodeVersion::RowV2); + + // Mock re-create the primary key index with "column4" that contains `NULL` value + // `t_case` { + // column3 varchar(32) NOT NULL, + // column4 varchar(20) NOT NULL, + // primary key (`column3`, `column4`) /*T![clustered_index] NONCLUSTERED */ + // -- _tidb_rowid bigint, // hidden handle + // } + TableInfo new_table_info; + std::tie(new_table_info, std::ignore) = getTableInfoAndFields(/*pk_col_ids*/ {3, 4}, false, ColumnIDValueNull(3), ColumnIDValueNull(4)); + ASSERT_EQ(new_table_info.is_common_handle, false); + ASSERT_EQ(new_table_info.pk_is_handle, false); + ASSERT_TRUE(new_table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_TRUE(new_table_info.getColumnInfo(4).hasPriKeyFlag()); + + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + // FIXME: actually we need to decode the block with force_decode=true, see the + // comments before `encodeColumns` + EXPECT_TRUE(decodeAndCheckColumns(new_decoding_schema, 
true)); + // // force_decode=false can not decode because there are + // // missing value for column with primary key flag. + // EXPECT_FALSE(decodeAndCheckColumns(new_decoding_schema, false)); + // // force_decode=true, decode ok. + // EXPECT_TRUE(decodeAndCheckColumns(new_decoding_schema, true)); +} +CATCH + +TEST_F(RegionBlockReaderTest, MissingPrimaryKeyColumnRowV1) +try +{ + // Mock a table + // `t_case` { + // column3 varchar(32) NOT NULL, + // column4 varchar(20) DEFAULT NULL, + // primary key (`column3`) /*T![clustered_index] NONCLUSTERED */ + // -- _tidb_rowid bigint, // hidden handle + // } + auto [table_info, fields] = getTableInfoAndFields(/*pk_col_ids*/ {3}, false, ColumnIDValue(3, "hello"), ColumnIDValueNull(4)); + ASSERT_EQ(table_info.is_common_handle, false); + ASSERT_EQ(table_info.pk_is_handle, false); + ASSERT_TRUE(table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_FALSE(table_info.getColumnInfo(4).hasPriKeyFlag()); + + encodeColumns(table_info, fields, RowEncodeVersion::RowV1); + + // Mock re-create the primary key index with "column4" that contains `NULL` value + // `t_case` { + // column3 varchar(32) NOT NULL, + // column4 varchar(20) NOT NULL, + // primary key (`column3`, `column4`) /*T![clustered_index] NONCLUSTERED */ + // -- _tidb_rowid bigint, // hidden handle + // } + TableInfo new_table_info; + std::tie(new_table_info, std::ignore) = getTableInfoAndFields(/*pk_col_ids*/ {3, 4}, false, ColumnIDValueNull(3), ColumnIDValueNull(4)); + ASSERT_EQ(new_table_info.is_common_handle, false); + ASSERT_EQ(new_table_info.pk_is_handle, false); + ASSERT_TRUE(new_table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_TRUE(new_table_info.getColumnInfo(4).hasPriKeyFlag()); + + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + EXPECT_TRUE(decodeAndCheckColumns(new_decoding_schema, false)); +} +CATCH + +TEST_F(RegionBlockReaderTest, NewMissingPrimaryKeyColumnRowV2) +try +{ + // Mock a table + // `t_case` { + // column3 
varchar(32) NOT NULL, + // primary key (`column3`) /*T![clustered_index] NONCLUSTERED */ + // -- _tidb_rowid bigint, // hidden handle + // } + auto [table_info, fields] = getTableInfoAndFields(/*pk_col_ids*/ {3}, false, ColumnIDValue(3, "hello")); + ASSERT_EQ(table_info.is_common_handle, false); + ASSERT_EQ(table_info.pk_is_handle, false); + ASSERT_TRUE(table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_ANY_THROW(table_info.getColumnInfo(4)); // not exist + + encodeColumns(table_info, fields, RowEncodeVersion::RowV2); + + // Mock re-create the primary key index with new-added "column4" + // `t_case` { + // column3 varchar(32) NOT NULL, + // column4 varchar(20) NOT NULL, + // primary key (`column3`, `column4`) /*T![clustered_index] NONCLUSTERED */ + // -- _tidb_rowid bigint, // hidden handle + // } + TableInfo new_table_info; + std::tie(new_table_info, std::ignore) = getTableInfoAndFields(/*pk_col_ids*/ {3, 4}, false, ColumnIDValueNull(3), ColumnIDValueNull(4)); + ASSERT_EQ(new_table_info.is_common_handle, false); + ASSERT_EQ(new_table_info.pk_is_handle, false); + ASSERT_TRUE(new_table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_TRUE(new_table_info.getColumnInfo(4).hasPriKeyFlag()); + + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + // force_decode=false can not decode because there are + // missing value for column with primary key flag. + EXPECT_FALSE(decodeAndCheckColumns(new_decoding_schema, false)); + // force_decode=true, decode ok. 
+ EXPECT_TRUE(decodeAndCheckColumns(new_decoding_schema, true)); +} +CATCH + +TEST_F(RegionBlockReaderTest, NewMissingPrimaryKeyColumnRowV1) +try +{ + // Mock a table + // `t_case` { + // column3 varchar(32) NOT NULL, + // primary key (`column3`) /*T![clustered_index] NONCLUSTERED */ + // -- _tidb_rowid bigint, // hidden handle + // } + auto [table_info, fields] = getTableInfoAndFields(/*pk_col_ids*/ {3}, false, ColumnIDValue(3, "hello")); + ASSERT_EQ(table_info.is_common_handle, false); + ASSERT_EQ(table_info.pk_is_handle, false); + ASSERT_TRUE(table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_ANY_THROW(table_info.getColumnInfo(4)); // not exist + + encodeColumns(table_info, fields, RowEncodeVersion::RowV1); + + // Mock re-create the primary key index with new-added "column4" + // `t_case` { + // column3 varchar(32) NOT NULL, + // column4 varchar(20) NOT NULL, + // primary key (`column3`, `column4`) /*T![clustered_index] NONCLUSTERED */ + // -- _tidb_rowid bigint, // hidden handle + // } + TableInfo new_table_info; + std::tie(new_table_info, std::ignore) = getTableInfoAndFields(/*pk_col_ids*/ {3, 4}, false, ColumnIDValueNull(3), ColumnIDValueNull(4)); + ASSERT_EQ(new_table_info.is_common_handle, false); + ASSERT_EQ(new_table_info.pk_is_handle, false); + ASSERT_TRUE(new_table_info.getColumnInfo(3).hasPriKeyFlag()); + ASSERT_TRUE(new_table_info.getColumnInfo(4).hasPriKeyFlag()); + + auto new_decoding_schema = getDecodingStorageSchemaSnapshot(new_table_info); + // force_decode=false can not decode because there are + // missing value for column with primary key flag. + EXPECT_FALSE(decodeAndCheckColumns(new_decoding_schema, false)); + // force_decode=true, decode ok. 
+ EXPECT_TRUE(decodeAndCheckColumns(new_decoding_schema, true)); +} +CATCH + +TEST_F(RegionBlockReaderTest, ReadFromRegion) +try +{ + TableInfo table_info(R"({"cols":[ + {"comment":"","default":null,"default_bit":null,"id":1,"name":{"L":"case_no","O":"case_no"},"offset":0,"origin_default":null,"state":5,"type":{"Charset":"utf8mb4","Collate":"utf8mb4_bin","Decimal":0,"Elems":null,"Flag":4099,"Flen":32,"Tp":15}}, + {"comment":"","default":null,"default_bit":null,"id":2,"name":{"L":"p","O":"p"},"offset":1,"origin_default":null,"state":5,"type":{"Charset":"utf8mb4","Collate":"utf8mb4_bin","Decimal":0,"Elems":null,"Flag":0,"Flen":12,"Tp":15}}, + {"comment":"","default":null,"default_bit":null,"id":3,"name":{"L":"source","O":"source"},"offset":2,"origin_default":"","state":5,"type":{"Charset":"utf8mb4","Collate":"utf8mb4_bin","Decimal":0,"Elems":null,"Flag":4099,"Flen":20,"Tp":15}} + ],"comment":"","id":77,"index_info":[],"is_common_handle":false,"name":{"L":"t_case","O":"t_case"},"partition":null,"pk_is_handle":false,"schema_version":62,"state":5,"tiflash_replica":{"Count":1},"update_timestamp":435984541435559947})"); + + RegionID region_id = 4; + String region_start_key(bytesFromHexString("7480000000000000FF445F720000000000FA")); + String region_end_key(bytesFromHexString("7480000000000000FF4500000000000000F8")); + auto region = makeRegion(region_id, region_start_key, region_end_key); + // the hex kv dump from SSTFile + std::vector> kvs = { + {"7480000000000000FF4D5F728000000000FF0000010000000000FAF9F3125EFCF3FFFE", "4C8280809290B4BB8606"}, + {"7480000000000000FF4D5F728000000000FF0000010000000000FAF9F3126548ABFFFC", "508180D0BAABB3BB8606760A80000100000001010031"}, + {"7480000000000000FF4D5F728000000000FF0000020000000000FAF9F3125EFCF3FFFE", "4C8280809290B4BB8606"}, + {"7480000000000000FF4D5F728000000000FF0000020000000000FAF9F3126548ABFFFC", "508180D0BAABB3BB8606760A80000100000001010032"}, + {"7480000000000000FF4D5F728000000000FF0000030000000000FAF9F3125EFCF3FFFE", 
"4C8280809290B4BB8606"}, + {"7480000000000000FF4D5F728000000000FF0000030000000000FAF9F3126548ABFFFC", "508180D0BAABB3BB8606760A80000100000001010033"}, + {"7480000000000000FF4D5F728000000000FF0000040000000000FAF9F3125EFCF3FFFE", "4C8280809290B4BB8606"}, + {"7480000000000000FF4D5F728000000000FF0000040000000000FAF9F3126548ABFFFC", "508180D0BAABB3BB8606760A80000100000001010034"}, + }; + for (const auto & [k, v] : kvs) + { + region->insert(ColumnFamilyType::Write, TiKVKey(bytesFromHexString(k)), TiKVValue(bytesFromHexString(v))); + } + + auto data_list_read = ReadRegionCommitCache(region, true); + ASSERT_TRUE(data_list_read.has_value()); + + auto decoding_schema = getDecodingStorageSchemaSnapshot(table_info); + { + // force_decode=false can not decode because there are + // missing value for column with primary key flag. + auto reader = RegionBlockReader(decoding_schema); + Block res_block = createBlockSortByColumnID(decoding_schema); + EXPECT_FALSE(reader.read(res_block, *data_list_read, false)); + } + { + // force_decode=true can decode the block + auto reader = RegionBlockReader(decoding_schema); + Block res_block = createBlockSortByColumnID(decoding_schema); + EXPECT_TRUE(reader.read(res_block, *data_list_read, true)); + res_block.checkNumberOfRows(); + EXPECT_EQ(res_block.rows(), 4); + ASSERT_COLUMN_EQ(res_block.getByName("case_no"), createColumn({"1", "2", "3", "4"})); + } +} +CATCH + + +} // namespace DB::tests diff --git a/tests/fullstack-test2/ddl/alter_pk.test b/tests/fullstack-test2/ddl/alter_pk.test index 3541fcb270a..5859e21cf46 100644 --- a/tests/fullstack-test2/ddl/alter_pk.test +++ b/tests/fullstack-test2/ddl/alter_pk.test @@ -43,3 +43,56 @@ mysql> alter table test.t drop primary key; │ f │ Nullable(Int32) │ │ │ │ _tidb_rowid │ Int64 │ │ │ └─────────────┴─────────────────┴──────────────┴────────────────────┘ + +# issue 5859, case 1 +mysql> drop table if exists test.t_case; +## create table with `source` is nullable +## insert some data and left `source` 
to be empty +mysql> create table test.t_case (`case_no` varchar(32) not null,`source` varchar(20) default null,`p` varchar(12) DEFAULT NULL,primary key (`case_no`)); +mysql> insert into test.t_case(case_no) values ("1"), ("2"), ("3"), ("4"); + +## drop the primary key, replace the NULL `source` values with '' so that the column holds no NULL +## add a new primary key on (case_no, source) +mysql> alter table test.t_case drop primary key; +mysql> update test.t_case set `source` = '' where `source` is NULL; +mysql> alter table test.t_case add primary key (`case_no`, `source`); + +## send the snapshot data to tiflash +mysql> alter table test.t_case set tiflash replica 1; +func> wait_table test t_case +mysql> select case_no,p,source from test.t_case; ++---------+------+--------+ +| case_no | p | source | ++---------+------+--------+ +| 1 | NULL | | +| 2 | NULL | | +| 3 | NULL | | +| 4 | NULL | | ++---------+------+--------+ + + +# issue 5859, case 2 +mysql> drop table if exists test.t_case; +## create table with `case_no` +mysql> create table test.t_case (`case_no` varchar(32) not null,`p` varchar(12) DEFAULT NULL,primary key (`case_no`)); +mysql> insert into test.t_case(case_no) values ("1"), ("2"), ("3"), ("4"); + +mysql> alter table test.t_case add column `source` varchar(20) not null; +## drop the primary key, add a new primary key on (case_no, source) +mysql> alter table test.t_case drop primary key; +mysql> alter table test.t_case add primary key (`case_no`, `source`); + +## send the snapshot data to tiflash +mysql> alter table test.t_case set tiflash replica 1; +func> wait_table test t_case +mysql> select case_no,p,source from test.t_case; ++---------+------+--------+ +| case_no | p | source | ++---------+------+--------+ +| 1 | NULL | | +| 2 | NULL | | +| 3 | NULL | | +| 4 | NULL | | ++---------+------+--------+ + +mysql> drop table if exists test.t_case;