diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index bf7b29d8fb8..acb8b011333 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -558,13 +558,29 @@ DM::WriteResult DeltaMergeStore::write( { dedup_ver.insert(v); } + + std::unordered_set dedup_handles; + auto extra_handle_col = tryGetByColumnId(block, EXTRA_HANDLE_COLUMN_ID); + if (extra_handle_col.column) + { + if (extra_handle_col.type->getTypeId() == TypeIndex::Int64) + { + const auto * extra_handles = toColumnVectorDataPtr(extra_handle_col.column); + for (auto h : *extra_handles) + { + dedup_handles.insert(h); + } + } + } + LOG_DEBUG( log, - "region_id={} applied_index={} record_count={} versions={}", + "region_id={} applied_index={} record_count={} versions={} handles={}", applied_status.region_id, applied_status.applied_index, block.rows(), - dedup_ver); + dedup_ver, + dedup_handles); } const auto bytes = block.bytes(); diff --git a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp index c01b8c3fe25..73a97f6924b 100644 --- a/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp +++ b/dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp @@ -355,10 +355,34 @@ std::optional ReadRegionCommitCache(const RegionPtr & re RegionDataReadInfoList data_list_read; data_list_read.reserve(scanner.writeMapSize()); + auto read_tso = region->getLastObservedReadTso(); + Timestamp min_error_commit_tso = std::numeric_limits::max(); + size_t error_prone_count = 0; do { - data_list_read.emplace_back(scanner.next()); + // A read index request with read_tso will stop concurrency manager from committing txns with smaller tso by advancing max_ts to at least read_tso. + auto data_read = scanner.next(); + // It's a natural fallback when there has not been any read_tso on this region. 
+ if (data_read.commit_ts <= read_tso) + { + error_prone_count++; + min_error_commit_tso = std::min(min_error_commit_tso, data_read.commit_ts); + } + data_list_read.emplace_back(std::move(data_read)); } while (scanner.hasNext()); + if unlikely (error_prone_count > 0) + { + LOG_INFO( + DB::Logger::get(), + "Error prone txn commit, tot_count={} error_prone_count={} min_error_commit_tso={} read_tso={} " + "region_id={} applied_index={}", + data_list_read.size(), + error_prone_count, + min_error_commit_tso, + read_tso, + region->id(), + region->appliedIndex()); + } return data_list_read; } diff --git a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp index 186c84f9142..ee2df9cb4f0 100644 --- a/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp +++ b/dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp @@ -365,6 +365,8 @@ void LearnerReadWorker::waitIndex( const auto & region = regions_snapshot.find(region_to_query.region_id)->second; + region->observeLearnerReadEvent(mvcc_query_info.start_ts); + const auto total_wait_index_elapsed_ms = watch.elapsedMilliseconds(); const auto index_to_wait = batch_read_index_result.find(region_to_query.region_id)->second.read_index(); if (timeout_ms != 0 && total_wait_index_elapsed_ms > timeout_ms) diff --git a/dbms/src/Storages/KVStore/Region.cpp b/dbms/src/Storages/KVStore/Region.cpp index 4f29de08e73..5ccbc6326f7 100644 --- a/dbms/src/Storages/KVStore/Region.cpp +++ b/dbms/src/Storages/KVStore/Region.cpp @@ -396,4 +396,20 @@ void Region::mergeDataFrom(const Region & other) this->data.mergeFrom(other.data); this->data.orphan_keys_info.mergeFrom(other.data.orphan_keys_info); } + +void Region::observeLearnerReadEvent(Timestamp read_tso) const +{ + auto ori = last_observed_read_tso.load(); + if (read_tso > ori) + { + // Do not retry on failure: an update may be lost here, but the overall read_tso can still advance. 
+ last_observed_read_tso.compare_exchange_strong(ori, read_tso); + } +} + +Timestamp Region::getLastObservedReadTso() const +{ + return last_observed_read_tso.load(); +} + } // namespace DB diff --git a/dbms/src/Storages/KVStore/Region.h b/dbms/src/Storages/KVStore/Region.h index 5ace3a2723d..f5afabc6d8a 100644 --- a/dbms/src/Storages/KVStore/Region.h +++ b/dbms/src/Storages/KVStore/Region.h @@ -261,6 +261,8 @@ class Region : public std::enable_shared_from_this // Methods to handle orphan keys under raftstore v2. void beforePrehandleSnapshot(uint64_t region_id, std::optional deadline_index); void afterPrehandleSnapshot(int64_t ongoing); + void observeLearnerReadEvent(Timestamp read_tso) const; + Timestamp getLastObservedReadTso() const; private: friend class RegionRaftCommandDelegate; @@ -311,6 +313,7 @@ class Region : public std::enable_shared_from_this UInt64 last_restart_log_applied{0}; mutable std::atomic approx_mem_cache_rows{0}; mutable std::atomic approx_mem_cache_bytes{0}; + mutable std::atomic last_observed_read_tso{0}; }; class RegionRaftCommandDelegate