Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KVStore: Log when meets any commit_ts < observed max_read_tso (#8991) #9005

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,33 @@ DM::WriteResult DeltaMergeStore::write(const Context & db_context, const DB::Set
{
dedup_ver.insert(v);
}
<<<<<<< HEAD
LOG_DEBUG(log, "Record count: {}, Versions: {}", block.rows(), dedup_ver);
=======

std::unordered_set<Int64> dedup_handles;
auto extra_handle_col = tryGetByColumnId(block, EXTRA_HANDLE_COLUMN_ID);
if (extra_handle_col.column)
{
if (extra_handle_col.type->getTypeId() == TypeIndex::Int64)
{
const auto * extra_handles = toColumnVectorDataPtr<Int64>(extra_handle_col.column);
for (auto h : *extra_handles)
{
dedup_handles.insert(h);
}
}
}

LOG_DEBUG(
log,
"region_id={} applied_index={} record_count={} versions={} handles={}",
applied_status.region_id,
applied_status.applied_index,
block.rows(),
dedup_ver,
dedup_handles);
>>>>>>> 89d1d73883 (KVStore: Log when meets any commit_ts < observed max_read_tso (#8991))
}
const auto bytes = block.bytes();

Expand Down
26 changes: 25 additions & 1 deletion dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,10 +349,34 @@ std::optional<RegionDataReadInfoList> ReadRegionCommitCache(const RegionPtr & re

RegionDataReadInfoList data_list_read;
data_list_read.reserve(scanner.writeMapSize());
auto read_tso = region->getLastObservedReadTso();
Timestamp min_error_commit_tso = std::numeric_limits<Timestamp>::max();
size_t error_prone_count = 0;
do
{
data_list_read.emplace_back(scanner.next());
// A read index request with read_tso will stop concurrency manager from committing txns with smaller tso by advancing max_ts to at least read_tso.
auto data_read = scanner.next();
// It's a natual fallback when there has not been any read_tso on this region.
if (data_read.commit_ts <= read_tso)
{
error_prone_count++;
min_error_commit_tso = std::min(min_error_commit_tso, data_read.commit_ts);
}
data_list_read.emplace_back(std::move(data_read));
} while (scanner.hasNext());
if unlikely (error_prone_count > 0)
{
LOG_INFO(
DB::Logger::get(),
"Error prone txn commit, tot_count={} error_prone_count={} min_error_commit_tso={} read_tso={} "
"region_id={} applied_index={}",
data_list_read.size(),
error_prone_count,
min_error_commit_tso,
read_tso,
region->id(),
region->appliedIndex());
}
return data_list_read;
}

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/KVStore/Read/LearnerReadWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,8 @@ void LearnerReadWorker::waitIndex(

const auto & region = regions_snapshot.find(region_to_query.region_id)->second;

region->observeLearnerReadEvent(mvcc_query_info.start_ts);

const auto total_wait_index_elapsed_ms = watch.elapsedMilliseconds();
const auto index_to_wait = batch_read_index_result.find(region_to_query.region_id)->second.read_index();
if (timeout_ms != 0 && total_wait_index_elapsed_ms > timeout_ms)
Expand Down
16 changes: 16 additions & 0 deletions dbms/src/Storages/KVStore/Region.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,4 +396,20 @@ void Region::mergeDataFrom(const Region & other)
this->data.mergeFrom(other.data);
this->data.orphan_keys_info.mergeFrom(other.data.orphan_keys_info);
}

void Region::observeLearnerReadEvent(Timestamp read_tso) const
{
auto ori = last_observed_read_tso.load();
if (read_tso > ori)
{
// Do not retry if failed, though may lost some update here, however the total read_tso can advance.
last_observed_read_tso.compare_exchange_strong(ori, read_tso);
}
}

Timestamp Region::getLastObservedReadTso() const
{
return last_observed_read_tso.load();
}

} // namespace DB
3 changes: 3 additions & 0 deletions dbms/src/Storages/KVStore/Region.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ class Region : public std::enable_shared_from_this<Region>
// Methods to handle orphan keys under raftstore v2.
void beforePrehandleSnapshot(uint64_t region_id, std::optional<uint64_t> deadline_index);
void afterPrehandleSnapshot(int64_t ongoing);
void observeLearnerReadEvent(Timestamp read_tso) const;
Timestamp getLastObservedReadTso() const;

private:
friend class RegionRaftCommandDelegate;
Expand Down Expand Up @@ -311,6 +313,7 @@ class Region : public std::enable_shared_from_this<Region>
UInt64 last_restart_log_applied{0};
mutable std::atomic<size_t> approx_mem_cache_rows{0};
mutable std::atomic<size_t> approx_mem_cache_bytes{0};
mutable std::atomic<Timestamp> last_observed_read_tso{0};
};

class RegionRaftCommandDelegate
Expand Down