Skip to content

Commit

Permalink
KVStore: Log when meets any commit_ts < observed max_read_tso (#8991)
Browse files Browse the repository at this point in the history
ref #8845
  • Loading branch information
CalvinNeo authored Apr 25, 2024
1 parent b77839b commit 89d1d73
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 3 deletions.
20 changes: 18 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -558,13 +558,29 @@ DM::WriteResult DeltaMergeStore::write(
{
dedup_ver.insert(v);
}

std::unordered_set<Int64> dedup_handles;
auto extra_handle_col = tryGetByColumnId(block, EXTRA_HANDLE_COLUMN_ID);
if (extra_handle_col.column)
{
if (extra_handle_col.type->getTypeId() == TypeIndex::Int64)
{
const auto * extra_handles = toColumnVectorDataPtr<Int64>(extra_handle_col.column);
for (auto h : *extra_handles)
{
dedup_handles.insert(h);
}
}
}

LOG_DEBUG(
log,
"region_id={} applied_index={} record_count={} versions={}",
"region_id={} applied_index={} record_count={} versions={} handles={}",
applied_status.region_id,
applied_status.applied_index,
block.rows(),
dedup_ver);
dedup_ver,
dedup_handles);
}
const auto bytes = block.bytes();

Expand Down
26 changes: 25 additions & 1 deletion dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,10 +355,34 @@ std::optional<RegionDataReadInfoList> ReadRegionCommitCache(const RegionPtr & re

RegionDataReadInfoList data_list_read;
data_list_read.reserve(scanner.writeMapSize());
auto read_tso = region->getLastObservedReadTso();
Timestamp min_error_commit_tso = std::numeric_limits<Timestamp>::max();
size_t error_prone_count = 0;
do
{
data_list_read.emplace_back(scanner.next());
// A read index request with read_tso will stop concurrency manager from committing txns with smaller tso by advancing max_ts to at least read_tso.
auto data_read = scanner.next();
// It's a natural fallback when there has not been any read_tso on this region.
if (data_read.commit_ts <= read_tso)
{
error_prone_count++;
min_error_commit_tso = std::min(min_error_commit_tso, data_read.commit_ts);
}
data_list_read.emplace_back(std::move(data_read));
} while (scanner.hasNext());
if unlikely (error_prone_count > 0)
{
LOG_INFO(
DB::Logger::get(),
"Error prone txn commit, tot_count={} error_prone_count={} min_error_commit_tso={} read_tso={} "
"region_id={} applied_index={}",
data_list_read.size(),
error_prone_count,
min_error_commit_tso,
read_tso,
region->id(),
region->appliedIndex());
}
return data_list_read;
}

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ void LearnerReadWorker::waitIndex(

const auto & region = regions_snapshot.find(region_to_query.region_id)->second;

region->observeLearnerReadEvent(mvcc_query_info.start_ts);

const auto total_wait_index_elapsed_ms = watch.elapsedMilliseconds();
const auto index_to_wait = batch_read_index_result.find(region_to_query.region_id)->second.read_index();
if (timeout_ms != 0 && total_wait_index_elapsed_ms > timeout_ms)
Expand Down
16 changes: 16 additions & 0 deletions dbms/src/Storages/KVStore/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,4 +396,20 @@ void Region::mergeDataFrom(const Region & other)
this->data.mergeFrom(other.data);
this->data.orphan_keys_info.mergeFrom(other.data.orphan_keys_info);
}

/// Record `read_tso` as the last observed learner-read timestamp when it is
/// greater than the value currently stored in `last_observed_read_tso`.
///
/// The update is a single compare-and-swap attempt: if the stored value has
/// changed between the load and the exchange, this call leaves it untouched.
void Region::observeLearnerReadEvent(Timestamp read_tso) const
{
    auto observed = last_observed_read_tso.load();
    if (read_tso <= observed)
        return;

    // Intentionally a one-shot CAS with no retry. Losing this particular update
    // is acceptable: a failed exchange means the stored value was changed
    // concurrently, so the observed read_tso keeps advancing overall.
    last_observed_read_tso.compare_exchange_strong(observed, read_tso);
}

/// Return the current value of `last_observed_read_tso`, i.e. the timestamp most
/// recently stored by `observeLearnerReadEvent` (0 if nothing has been stored).
Timestamp Region::getLastObservedReadTso() const
{
    // Sequentially consistent load, identical to the default `load()` ordering.
    const Timestamp observed = last_observed_read_tso.load(std::memory_order_seq_cst);
    return observed;
}

} // namespace DB
3 changes: 3 additions & 0 deletions dbms/src/Storages/KVStore/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ class Region : public std::enable_shared_from_this<Region>
// Methods to handle orphan keys under raftstore v2.
void beforePrehandleSnapshot(uint64_t region_id, std::optional<uint64_t> deadline_index);
void afterPrehandleSnapshot(int64_t ongoing);
void observeLearnerReadEvent(Timestamp read_tso) const;
Timestamp getLastObservedReadTso() const;

private:
friend class RegionRaftCommandDelegate;
Expand Down Expand Up @@ -311,6 +313,7 @@ class Region : public std::enable_shared_from_this<Region>
UInt64 last_restart_log_applied{0};
mutable std::atomic<size_t> approx_mem_cache_rows{0};
mutable std::atomic<size_t> approx_mem_cache_bytes{0};
mutable std::atomic<Timestamp> last_observed_read_tso{0};
};

class RegionRaftCommandDelegate
Expand Down

0 comments on commit 89d1d73

Please sign in to comment.