From 7b5cfbc24bd0bf4cbb812506d48a8a2349385207 Mon Sep 17 00:00:00 2001
From: Ti Chi Robot <ti-community-prow-bot@tidb.io>
Date: Fri, 9 Jun 2023 20:50:46 +0800
Subject: [PATCH] Filter lock flag 'F' (#7569) (#7573)

close pingcap/tiflash#7563
---
 .../Storages/Transaction/RegionCFDataBase.cpp | 152 ---------------
 .../Storages/Transaction/TiKVRecordFormat.cpp | 176 ++++++++++++++++++
 .../Storages/Transaction/TiKVRecordFormat.h   |   1 +
 3 files changed, 177 insertions(+), 152 deletions(-)
 create mode 100644 dbms/src/Storages/Transaction/TiKVRecordFormat.cpp

diff --git a/dbms/src/Storages/Transaction/RegionCFDataBase.cpp b/dbms/src/Storages/Transaction/RegionCFDataBase.cpp
index 73477526ea6..089bc0af2dc 100644
--- a/dbms/src/Storages/Transaction/RegionCFDataBase.cpp
+++ b/dbms/src/Storages/Transaction/RegionCFDataBase.cpp
@@ -290,156 +290,4 @@ typename RegionCFDataBase<Trait>::Data & RegionCFDataBase<Trait>::getDataMut()
 template struct RegionCFDataBase<RegionWriteCFDataTrait>;
 template struct RegionCFDataBase<RegionDefaultCFDataTrait>;
 template struct RegionCFDataBase<RegionLockCFDataTrait>;
-
-
-namespace RecordKVFormat
-{
-// https://github.com/tikv/tikv/blob/master/components/txn_types/src/lock.rs
-inline void decodeLockCfValue(DecodedLockCFValue & res)
-{
-    const TiKVValue & value = *res.val;
-    const char * data = value.data();
-    size_t len = value.dataSize();
-
-    kvrpcpb::Op lock_type = kvrpcpb::Op_MIN;
-    switch (readUInt8(data, len))
-    {
-    case LockType::Put:
-        lock_type = kvrpcpb::Op::Put;
-        break;
-    case LockType::Delete:
-        lock_type = kvrpcpb::Op::Del;
-        break;
-    case LockType::Lock:
-        lock_type = kvrpcpb::Op::Lock;
-        break;
-    case LockType::Pessimistic:
-        lock_type = kvrpcpb::Op::PessimisticLock;
-        break;
-    }
-    res.lock_type = lock_type;
-    res.primary_lock = readVarString<std::string_view>(data, len);
-    res.lock_version = readVarUInt(data, len);
-
-    if (len > 0)
-    {
-        res.lock_ttl = readVarUInt(data, len);
-        while (len > 0)
-        {
-            char flag = readUInt8(data, len);
-            switch (flag)
-            {
-            case SHORT_VALUE_PREFIX:
-            {
-                size_t str_len = readUInt8(data, len);
-                if (len < str_len)
-                    throw Exception("content len shorter than short value len", ErrorCodes::LOGICAL_ERROR);
-                // no need short value
-                readRawString<std::nullptr_t>(data, len, str_len);
-                break;
-            };
-            case MIN_COMMIT_TS_PREFIX:
-            {
-                res.min_commit_ts = readUInt64(data, len);
-                break;
-            }
-            case FOR_UPDATE_TS_PREFIX:
-            {
-                res.lock_for_update_ts = readUInt64(data, len);
-                break;
-            }
-            case TXN_SIZE_PREFIX:
-            {
-                res.txn_size = readUInt64(data, len);
-                break;
-            }
-            case ASYNC_COMMIT_PREFIX:
-            {
-                res.use_async_commit = true;
-                const auto * start = data;
-                UInt64 cnt = readVarUInt(data, len);
-                for (UInt64 i = 0; i < cnt; ++i)
-                {
-                    readVarString<std::nullptr_t>(data, len);
-                }
-                const auto * end = data;
-                res.secondaries = {start, static_cast<size_t>(end - start)};
-                break;
-            }
-            case ROLLBACK_TS_PREFIX:
-            {
-                UInt64 cnt = readVarUInt(data, len);
-                for (UInt64 i = 0; i < cnt; ++i)
-                {
-                    readUInt64(data, len);
-                }
-                break;
-            }
-            case LAST_CHANGE_PREFIX:
-            {
-                // Used to accelerate TiKV MVCC scan, useless for TiFlash.
-                UInt64 last_change_ts = readUInt64(data, len);
-                UInt64 versions_to_last_change = readVarUInt(data, len);
-                UNUSED(last_change_ts);
-                UNUSED(versions_to_last_change);
-                break;
-            }
-            case TXN_SOURCE_PREFIX_FOR_LOCK:
-            {
-                // Used for CDC, useless for TiFlash.
-                UInt64 txn_source_prefic = readVarUInt(data, len);
-                UNUSED(txn_source_prefic);
-                break;
-            }
-            default:
-            {
-                std::string msg = std::string("invalid flag ") + flag + " in lock value " + value.toDebugString();
-                throw Exception(msg, ErrorCodes::LOGICAL_ERROR);
-            }
-            }
-        }
-    }
-    if (len != 0)
-        throw Exception("invalid lock value " + value.toDebugString(), ErrorCodes::LOGICAL_ERROR);
-}
-
-DecodedLockCFValue::DecodedLockCFValue(std::shared_ptr<const TiKVKey> key_, std::shared_ptr<const TiKVValue> val_)
-    : key(std::move(key_))
-    , val(std::move(val_))
-{
-    decodeLockCfValue(*this);
-}
-
-void DecodedLockCFValue::intoLockInfo(kvrpcpb::LockInfo & res) const
-{
-    res.set_lock_type(lock_type);
-    res.set_primary_lock(primary_lock.data(), primary_lock.size());
-    res.set_lock_version(lock_version);
-    res.set_lock_ttl(lock_ttl);
-    res.set_min_commit_ts(min_commit_ts);
-    res.set_lock_for_update_ts(lock_for_update_ts);
-    res.set_txn_size(txn_size);
-    res.set_use_async_commit(use_async_commit);
-    res.set_key(decodeTiKVKey(*key));
-
-    if (use_async_commit)
-    {
-        const auto * data = secondaries.data();
-        auto len = secondaries.size();
-        UInt64 cnt = readVarUInt(data, len);
-        for (UInt64 i = 0; i < cnt; ++i)
-        {
-            res.add_secondaries(readVarString<std::string>(data, len));
-        }
-    }
-}
-
-std::unique_ptr<kvrpcpb::LockInfo> DecodedLockCFValue::intoLockInfo() const
-{
-    auto res = std::make_unique<kvrpcpb::LockInfo>();
-    intoLockInfo(*res);
-    return res;
-}
-
-} // namespace RecordKVFormat
 } // namespace DB
diff --git a/dbms/src/Storages/Transaction/TiKVRecordFormat.cpp b/dbms/src/Storages/Transaction/TiKVRecordFormat.cpp
new file mode 100644
index 00000000000..2d5bbaccdeb
--- /dev/null
+++ b/dbms/src/Storages/Transaction/TiKVRecordFormat.cpp
@@ -0,0 +1,176 @@
+// Copyright 2023 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <Storages/Transaction/RegionCFDataBase.h>
+#include <Storages/Transaction/RegionCFDataTrait.h>
+#include <Storages/Transaction/TiKVRecordFormat.h>
+
+namespace DB
+{
+namespace RecordKVFormat
+{
+// https://github.com/tikv/tikv/blob/master/components/txn_types/src/lock.rs
+inline void decodeLockCfValue(DecodedLockCFValue & res)
+{
+    const TiKVValue & value = *res.val;
+    const char * data = value.data();
+    size_t len = value.dataSize();
+
+    kvrpcpb::Op lock_type = kvrpcpb::Op_MIN;
+    switch (readUInt8(data, len))
+    {
+    case LockType::Put:
+        lock_type = kvrpcpb::Op::Put;
+        break;
+    case LockType::Delete:
+        lock_type = kvrpcpb::Op::Del;
+        break;
+    case LockType::Lock:
+        lock_type = kvrpcpb::Op::Lock;
+        break;
+    case LockType::Pessimistic:
+        lock_type = kvrpcpb::Op::PessimisticLock;
+        break;
+    }
+    res.lock_type = lock_type;
+    res.primary_lock = readVarString<std::string_view>(data, len);
+    res.lock_version = readVarUInt(data, len);
+
+    if (len > 0)
+    {
+        res.lock_ttl = readVarUInt(data, len);
+        while (len > 0)
+        {
+            char flag = readUInt8(data, len);
+            switch (flag)
+            {
+            case SHORT_VALUE_PREFIX:
+            {
+                size_t str_len = readUInt8(data, len);
+                if (len < str_len)
+                    throw Exception("content len shorter than short value len", ErrorCodes::LOGICAL_ERROR);
+                // no need short value
+                readRawString<std::nullptr_t>(data, len, str_len);
+                break;
+            };
+            case MIN_COMMIT_TS_PREFIX:
+            {
+                res.min_commit_ts = readUInt64(data, len);
+                break;
+            }
+            case FOR_UPDATE_TS_PREFIX:
+            {
+                res.lock_for_update_ts = readUInt64(data, len);
+                break;
+            }
+            case TXN_SIZE_PREFIX:
+            {
+                res.txn_size = readUInt64(data, len);
+                break;
+            }
+            case ASYNC_COMMIT_PREFIX:
+            {
+                res.use_async_commit = true;
+                const auto * start = data;
+                UInt64 cnt = readVarUInt(data, len);
+                for (UInt64 i = 0; i < cnt; ++i)
+                {
+                    readVarString<std::nullptr_t>(data, len);
+                }
+                const auto * end = data;
+                res.secondaries = {start, static_cast<size_t>(end - start)};
+                break;
+            }
+            case ROLLBACK_TS_PREFIX:
+            {
+                UInt64 cnt = readVarUInt(data, len);
+                for (UInt64 i = 0; i < cnt; ++i)
+                {
+                    readUInt64(data, len);
+                }
+                break;
+            }
+            case LAST_CHANGE_PREFIX:
+            {
+                // Used to accelerate TiKV MVCC scan, useless for TiFlash.
+                UInt64 last_change_ts = readUInt64(data, len);
+                UInt64 versions_to_last_change = readVarUInt(data, len);
+                UNUSED(last_change_ts);
+                UNUSED(versions_to_last_change);
+                break;
+            }
+            case TXN_SOURCE_PREFIX_FOR_LOCK:
+            {
+                // Used for CDC, useless for TiFlash.
+                UInt64 txn_source_prefic = readVarUInt(data, len);
+                UNUSED(txn_source_prefic);
+                break;
+            }
+            case PESSIMISTIC_LOCK_WITH_CONFLICT_PREFIX:
+            {
+                // https://github.com/pingcap/tidb/issues/43540
+                break;
+            }
+            default:
+            {
+                std::string msg = std::string("invalid flag ") + flag + " in lock value " + value.toDebugString();
+                throw Exception(msg, ErrorCodes::LOGICAL_ERROR);
+            }
+            }
+        }
+    }
+    if (len != 0)
+        throw Exception("invalid lock value " + value.toDebugString(), ErrorCodes::LOGICAL_ERROR);
+}
+
+DecodedLockCFValue::DecodedLockCFValue(std::shared_ptr<const TiKVKey> key_, std::shared_ptr<const TiKVValue> val_)
+    : key(std::move(key_))
+    , val(std::move(val_))
+{
+    decodeLockCfValue(*this);
+}
+
+void DecodedLockCFValue::intoLockInfo(kvrpcpb::LockInfo & res) const
+{
+    res.set_lock_type(lock_type);
+    res.set_primary_lock(primary_lock.data(), primary_lock.size());
+    res.set_lock_version(lock_version);
+    res.set_lock_ttl(lock_ttl);
+    res.set_min_commit_ts(min_commit_ts);
+    res.set_lock_for_update_ts(lock_for_update_ts);
+    res.set_txn_size(txn_size);
+    res.set_use_async_commit(use_async_commit);
+    res.set_key(decodeTiKVKey(*key));
+
+    if (use_async_commit)
+    {
+        const auto * data = secondaries.data();
+        auto len = secondaries.size();
+        UInt64 cnt = readVarUInt(data, len);
+        for (UInt64 i = 0; i < cnt; ++i)
+        {
+            res.add_secondaries(readVarString<std::string>(data, len));
+        }
+    }
+}
+
+std::unique_ptr<kvrpcpb::LockInfo> DecodedLockCFValue::intoLockInfo() const
+{
+    auto res = std::make_unique<kvrpcpb::LockInfo>();
+    intoLockInfo(*res);
+    return res;
+}
+
+} // namespace RecordKVFormat
+} // namespace DB
\ No newline at end of file
diff --git a/dbms/src/Storages/Transaction/TiKVRecordFormat.h b/dbms/src/Storages/Transaction/TiKVRecordFormat.h
index 37dcd77b014..2ef266d5e8d 100644
--- a/dbms/src/Storages/Transaction/TiKVRecordFormat.h
+++ b/dbms/src/Storages/Transaction/TiKVRecordFormat.h
@@ -68,6 +68,7 @@ static const char GC_FENCE_PREFIX = 'F';
 static const char LAST_CHANGE_PREFIX = 'l';
 static const char TXN_SOURCE_PREFIX_FOR_WRITE = 'S';
 static const char TXN_SOURCE_PREFIX_FOR_LOCK = 's';
+static const char PESSIMISTIC_LOCK_WITH_CONFLICT_PREFIX = 'F';
 
 static const size_t SHORT_VALUE_MAX_LEN = 64;