Skip to content

Commit

Permalink
KVStore: Enhance o11y of large txn (#8833)
Browse files Browse the repository at this point in the history
ref #8736
  • Loading branch information
CalvinNeo authored Mar 11, 2024
1 parent ee471e9 commit 77051ff
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 23 deletions.
6 changes: 6 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,12 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_default_put, {"type", "default_put"}), \
F(type_write_del, {"type", "write_del"}), \
F(type_lock_del, {"type", "lock_del"}), \
F(type_default_del, {"type", "default_del"}), \
F(type_apply_snapshot, {"type", "apply_snapshot"}), \
F(type_apply_snapshot_default, {"type", "apply_snapshot_default"}), \
F(type_apply_snapshot_write, {"type", "apply_snapshot_write"}), \
F(type_large_txn_lock_put, {"type", "large_txn_lock_put"}), \
F(type_large_txn_lock_del, {"type", "large_txn_lock_del"}), \
F(type_ingest_sst, {"type", "ingest_sst"})) \
M(tiflash_raft_apply_write_command_duration_seconds, \
"Bucketed histogram of applying write command Raft logs", \
Expand Down Expand Up @@ -513,6 +518,7 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
"Raft handled bytes in global", \
Counter, \
F(type_write, {{"type", "write"}}), \
F(type_snapshot_committed, {{"type", "snapshot_committed"}}), \
F(type_write_committed, {{"type", "write_committed"}})) \
M(tiflash_raft_write_flow_bytes, \
"Bucketed histogram of bytes for each write", \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ void SSTFilesToDTFilesOutputStream<ChildStream>::writeSuffix()
.Observe(watch.elapsedSeconds());
// Note that number of keys in different cf will be aggregated into one metrics
GET_METRIC(tiflash_raft_process_keys, type_apply_snapshot).Increment(process_keys.total());
GET_METRIC(tiflash_raft_process_keys, type_apply_snapshot_default).Increment(process_keys.default_cf);
GET_METRIC(tiflash_raft_process_keys, type_apply_snapshot_write).Increment(process_keys.write_cf);
break;
}
case FileConvertJobType::IngestSST:
Expand Down
8 changes: 7 additions & 1 deletion dbms/src/Storages/KVStore/Decode/PartitionStreams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -599,9 +599,15 @@ Block GenRegionBlockDataWithSchema(

res_block = sortColumnsBySchemaSnap(std::move(res_block), *(schema_snap->column_defines));

auto prev_region_size = region->dataSize();
// Remove committed data
RemoveRegionCommitCache(region, *data_list_read);

auto new_region_size = region->dataSize();
if likely (new_region_size <= prev_region_size)
{
auto committed_bytes = prev_region_size - new_region_size;
GET_METRIC(tiflash_raft_throughput_bytes, type_snapshot_committed).Increment(committed_bytes);
}
return res_block;
}

Expand Down
9 changes: 8 additions & 1 deletion dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ std::pair<EngineStoreApplyRes, DM::WriteResult> Region::handleWriteRaftCmd(
size_t default_put_key_count = 0;
size_t lock_del_key_count = 0;
size_t write_del_key_count = 0;
// Considering short value embeded in lock cf, it's necessary to record deletion from default cf.
size_t default_del_key_count = 0;
// How many bytes has been written to KVStore(and maybe then been moved to underlying DeltaTree).
// We don't count DEL because it is only used to delete LOCK, which is small and not count in doInsert.
size_t write_size = 0;
Expand All @@ -332,6 +334,7 @@ std::pair<EngineStoreApplyRes, DM::WriteResult> Region::handleWriteRaftCmd(
GET_METRIC(tiflash_raft_process_keys, type_default_put).Increment(default_put_key_count);
GET_METRIC(tiflash_raft_process_keys, type_lock_del).Increment(lock_del_key_count);
GET_METRIC(tiflash_raft_process_keys, type_write_del).Increment(write_del_key_count);
GET_METRIC(tiflash_raft_process_keys, type_default_del).Increment(default_del_key_count);
auto after_size = dataSize();
if (after_size > prev_size + RAFT_REGION_BIG_WRITE_THRES)
GET_METRIC(tiflash_raft_write_flow_bytes, type_big_write_to_region).Observe(after_size - prev_size);
Expand Down Expand Up @@ -368,7 +371,7 @@ std::pair<EngineStoreApplyRes, DM::WriteResult> Region::handleWriteRaftCmd(
}
try
{
if (is_v2)
if unlikely (is_v2)
{
// There may be orphan default key in a snapshot.
write_size += doInsert(cf, std::move(tikv_key), std::move(tikv_value), DupCheck::AllowSame);
Expand Down Expand Up @@ -403,6 +406,10 @@ std::pair<EngineStoreApplyRes, DM::WriteResult> Region::handleWriteRaftCmd(
{
lock_del_key_count++;
}
else if (cf == ColumnFamilyType::Default)
{
default_del_key_count++;
}
try
{
doRemove(cf, tikv_key);
Expand Down
18 changes: 18 additions & 0 deletions dbms/src/Storages/KVStore/MultiRaft/RegionCFDataBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/TiFlashMetrics.h>
#include <Storages/KVStore/MultiRaft/RegionCFDataBase.h>
#include <Storages/KVStore/MultiRaft/RegionCFDataTrait.h>
#include <Storages/KVStore/MultiRaft/RegionData.h>
Expand Down Expand Up @@ -56,13 +57,22 @@ RegionDataRes RegionCFDataBase<RegionLockCFDataTrait>::insert(TiKVKey && key, Ti
UNUSED(mode);
RegionDataRes added_size = calcTiKVKeyValueSize(key, value);
Pair kv_pair = RegionLockCFDataTrait::genKVPair(std::move(key), std::move(value));
const auto & decoded = std::get<2>(kv_pair.second);
bool is_large_txn = decoded->isLargeTxn();
{
auto iter = data.find(kv_pair.first);
if (iter != data.end())
{
added_size -= calcTiKVKeyValueSize(iter->second);
data.erase(iter);
}
else
{
if unlikely (is_large_txn)
{
GET_METRIC(tiflash_raft_process_keys, type_large_txn_lock_put).Increment(1);
}
}
}
// according to the process of pessimistic lock, just overwrite.
data.emplace(std::move(kv_pair.first), std::move(kv_pair.second));
Expand Down Expand Up @@ -162,6 +172,14 @@ size_t RegionCFDataBase<Trait>::remove(const Key & key, bool quiet)
return 0;

size_t size = calcTiKVKeyValueSize(value);

if constexpr (std::is_same<Trait, RegionLockCFDataTrait>::value)
{
if unlikely (std::get<2>(value)->isLargeTxn())
{
GET_METRIC(tiflash_raft_process_keys, type_large_txn_lock_del).Increment(1);
}
}
map.erase(it);
return size;
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/KVStore/MultiRaft/RegionCFDataTrait.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h>
#include <Storages/KVStore/TiKVHelpers/TiKVRecordFormat.h>

#include <map>
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/KVStore/Read/ReadIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ std::tuple<WaitIndexStatus, double> Region::waitIndex(
const LoggerPtr & log)
{
fiu_return_on(FailPoints::force_wait_index_timeout, std::make_tuple(WaitIndexStatus::Timeout, 1.0));
if (proxy_helper == nullptr) // just for debug
if unlikely (proxy_helper == nullptr) // just for debug
return {WaitIndexStatus::Finished, 0};

if (meta.checkIndex(index))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 PingCAP, Inc.
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -14,12 +14,14 @@

#include <Storages/KVStore/MultiRaft/RegionCFDataBase.h>
#include <Storages/KVStore/MultiRaft/RegionCFDataTrait.h>
#include <Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h>
#include <Storages/KVStore/TiKVHelpers/TiKVRecordFormat.h>

namespace DB
{
namespace RecordKVFormat
{

// https://github.com/tikv/tikv/blob/master/components/txn_types/src/lock.rs
inline void decodeLockCfValue(DecodedLockCFValue & res)
{
Expand Down Expand Up @@ -122,6 +124,11 @@ inline void decodeLockCfValue(DecodedLockCFValue & res)
// https://github.com/pingcap/tidb/issues/43540
break;
}
case GENERATION_PREFIX:
{
res.generation = readUInt64(data, len);
break;
}
default:
{
std::string msg = std::string("invalid flag ") + flag + " in lock value " + value.toDebugString();
Expand Down Expand Up @@ -172,5 +179,10 @@ std::unique_ptr<kvrpcpb::LockInfo> DecodedLockCFValue::intoLockInfo() const
return res;
}

bool DecodedLockCFValue::isLargeTxn() const
{
return generation > 0;
}

} // namespace RecordKVFormat
} // namespace DB
46 changes: 46 additions & 0 deletions dbms/src/Storages/KVStore/TiKVHelpers/DecodedLockCFValue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/Exception.h>
#include <Core/Types.h>
#include <Storages/KVStore/Decode/DecodedTiKVKeyValue.h>

namespace DB::RecordKVFormat
{

struct DecodedLockCFValue : boost::noncopyable
{
DecodedLockCFValue(std::shared_ptr<const TiKVKey> key_, std::shared_ptr<const TiKVValue> val_);
std::unique_ptr<kvrpcpb::LockInfo> intoLockInfo() const;
void intoLockInfo(kvrpcpb::LockInfo &) const;
bool isLargeTxn() const;

std::shared_ptr<const TiKVKey> key;
std::shared_ptr<const TiKVValue> val;
UInt64 lock_version{0};
UInt64 lock_ttl{0};
UInt64 txn_size{0};
UInt64 lock_for_update_ts{0};
kvrpcpb::Op lock_type{kvrpcpb::Op_MIN};
bool use_async_commit{0};
UInt64 min_commit_ts{0};
std::string_view secondaries;
std::string_view primary_lock;
// For large txn, generation is not zero.
UInt64 generation{0};
};

} // namespace DB::RecordKVFormat
20 changes: 1 addition & 19 deletions dbms/src/Storages/KVStore/TiKVHelpers/TiKVRecordFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ static const char LAST_CHANGE_PREFIX = 'l';
static const char TXN_SOURCE_PREFIX_FOR_WRITE = 'S';
static const char TXN_SOURCE_PREFIX_FOR_LOCK = 's';
static const char PESSIMISTIC_LOCK_WITH_CONFLICT_PREFIX = 'F';
static const char GENERATION_PREFIX = 'g';

static const size_t SHORT_VALUE_MAX_LEN = 64;

Expand Down Expand Up @@ -300,25 +301,6 @@ inline TiKVValue encodeLockCfValue(
return TiKVValue(res.releaseStr());
}

struct DecodedLockCFValue : boost::noncopyable
{
DecodedLockCFValue(std::shared_ptr<const TiKVKey> key_, std::shared_ptr<const TiKVValue> val_);
std::unique_ptr<kvrpcpb::LockInfo> intoLockInfo() const;
void intoLockInfo(kvrpcpb::LockInfo &) const;

std::shared_ptr<const TiKVKey> key;
std::shared_ptr<const TiKVValue> val;
UInt64 lock_version{0};
UInt64 lock_ttl{0};
UInt64 txn_size{0};
UInt64 lock_for_update_ts{0};
kvrpcpb::Op lock_type{kvrpcpb::Op_MIN};
bool use_async_commit{0};
UInt64 min_commit_ts{0};
std::string_view secondaries;
std::string_view primary_lock;
};

template <typename R = Int64>
inline R readVarInt(const char *& data, size_t & len)
{
Expand Down

0 comments on commit 77051ff

Please sign in to comment.