diff --git a/java/yb-pgsql/src/test/java/org/yb/pgsql/BasePgSQLTestWithRpcMetric.java b/java/yb-pgsql/src/test/java/org/yb/pgsql/BasePgSQLTestWithRpcMetric.java index d40e0a947e49..3e5ae83e00f9 100644 --- a/java/yb-pgsql/src/test/java/org/yb/pgsql/BasePgSQLTestWithRpcMetric.java +++ b/java/yb-pgsql/src/test/java/org/yb/pgsql/BasePgSQLTestWithRpcMetric.java @@ -41,11 +41,14 @@ public int value() { protected static class OperationsCounter { public Map tableWrites = new HashMap<>(); + public Map intentdbSeeks = new HashMap<>(); + public Counter rpc = new Counter(); public OperationsCounter(String... tableNames) { for (String table : tableNames) { tableWrites.put(table, new Counter()); + intentdbSeeks.put(table, new Counter()); } } } @@ -65,6 +68,11 @@ protected OperationsCounter updateCounter(OperationsCounter counter) throws Exce if (writes != null) { writes.update(new Metrics(obj).getCounter("intentsdb_rocksdb_write_self").value); } + + Counter seeks = counter.intentdbSeeks.get(tableName); + if (seeks != null) { + seeks.update(new Metrics(obj).getCounter("intentsdb_rocksdb_number_db_seek").value); + } } } return counter; diff --git a/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgFastpathIntentdbSeeks.java b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgFastpathIntentdbSeeks.java new file mode 100644 index 000000000000..f403a65b96ac --- /dev/null +++ b/java/yb-pgsql/src/test/java/org/yb/pgsql/TestPgFastpathIntentdbSeeks.java @@ -0,0 +1,72 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +package org.yb.pgsql; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.yb.YBTestRunner; + +import java.sql.Connection; +import java.sql.Statement; +import java.util.Map; + +import static org.yb.AssertionWrappers.*; + +@RunWith(YBTestRunner.class) +public class TestPgFastpathIntentdbSeeks extends BasePgSQLTestWithRpcMetric { + @Override + protected Map getTServerFlags() { + Map flagMap = super.getTServerFlags(); + flagMap.put("enable_wait_queues", "false"); + flagMap.put("ysql_max_write_restart_attempts", Integer.toString(0)); + // Verbose logging of intentsdb seeks/postgres statements + flagMap.put("vmodule", "docdb=4,conflict_resolution=5"); + flagMap.put("ysql_log_statement", "all"); + return flagMap; + } + + @Test + public void testFastpathIntentdbSeeks() throws Exception { + try (Connection extraConnection = getConnectionBuilder().connect(); + Statement stmt = connection.createStatement(); + Statement extraStmt = extraConnection.createStatement()) { + stmt.execute("SET yb_transaction_priority_lower_bound = 1"); + stmt.execute("CREATE TABLE t(k1 int, k2 int, r1 int, r2 int, v1 int, v2 int, v3 int, v4 int, " + + "PRIMARY KEY((k1,k2)HASH, r1, r2))"); + stmt.execute("INSERT INTO t VALUES (1, 1, 1, 1, 1, 1, 1, 1)"); + stmt.execute("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED"); + stmt.execute("UPDATE t SET v1 = 2, v2 = 2, v3 = 2, v4 = 2 WHERE k1 = 1 AND k2 = 1 AND r1 = 1 " + + "AND r2 = 1"); + OperationsCounter counter = updateCounter(new OperationsCounter("t")); + + // This query will trigger a single tablet operation (fastpath), which will seek in the + // intentsdb for transaction conflicts. It will go through conflict resulution, performing + // one seek per doc key component. + runInvalidQuery(extraStmt, "UPDATE t SET v1 = 3, v2 = 3, v3 = 3, v4 = 3 WHERE k1 = 1 AND " + + "k2 = 1 AND r1 = 1 AND r2 = 1", true, + "could not serialize access due to concurrent update", + "conflicts with higher priority transaction"); + updateCounter(counter); + final int seeks = counter.intentdbSeeks.get("t").value(); + + // - Expect four seeks for the columns being updated (kStrongWrite intents) + // - Expect four seeks for the weak intents + // - Two seeks for the range components + // - One seek for the hash component + // - One seek for the tablet/relation + assertEquals(seeks, 8); + stmt.execute("COMMIT"); + } + } +} diff --git a/src/yb/docdb/conflict_resolution.cc b/src/yb/docdb/conflict_resolution.cc index a6b289903701..5d108050a8bc 100644 --- a/src/yb/docdb/conflict_resolution.cc +++ b/src/yb/docdb/conflict_resolution.cc @@ -653,16 +653,16 @@ using IntentTypesContainer = std::map; class IntentProcessor { public: - IntentProcessor(IntentTypesContainer* container, const IntentTypeSet& intent_types) - : container_(*container), - intent_types_(intent_types), - weak_intent_types_(MakeWeak(intent_types_)) + explicit IntentProcessor(IntentTypesContainer* container) + : container_(*container) {} - void Process(dockv::AncestorDocKey ancestor_doc_key, - dockv::FullDocKey full_doc_key, - KeyBytes* intent_key) { - const auto& intent_type_set = ancestor_doc_key ? weak_intent_types_ : intent_types_; + void Process( + dockv::AncestorDocKey ancestor_doc_key, + dockv::FullDocKey full_doc_key, + const KeyBytes* const intent_key, + IntentTypeSet intent_types) { + const auto& intent_type_set = ancestor_doc_key ? MakeWeak(intent_types) : intent_types; auto i = container_.find(intent_key->data()); if (i == container_.end()) { container_.emplace(intent_key->data(), @@ -702,8 +702,6 @@ class IntentProcessor { private: IntentTypesContainer& container_; - const IntentTypeSet intent_types_; - const IntentTypeSet weak_intent_types_; }; class StrongConflictChecker { @@ -948,9 +946,10 @@ class TransactionConflictResolverContext : public ConflictResolverContextBase { buffer.Reserve(kKeyBufferInitialSize); const auto row_mark = GetRowMarkTypeFromPB(write_batch_); IntentTypesContainer container; - IntentProcessor write_processor( - &container, - GetIntentTypeSet(metadata_.isolation, dockv::OperationKind::kWrite, row_mark)); + IntentTypeSet intent_types = + GetIntentTypeSet(metadata_.isolation, dockv::OperationKind::kWrite, row_mark); + + IntentProcessor intent_processor(&container); for (const auto& doc_op : doc_ops()) { paths.clear(); IsolationLevel ignored_isolation_level; @@ -963,9 +962,10 @@ class TransactionConflictResolverContext : public ConflictResolverContextBase { RETURN_NOT_OK(EnumerateIntents( path.as_slice(), /* intent_value */ Slice(), - [&write_processor]( - auto strength, dockv::FullDocKey full_doc_key, auto, auto intent_key, auto) { - write_processor.Process(strength, full_doc_key, intent_key); + [&intent_processor, intent_types]( + auto ancestor_doc_key, dockv::FullDocKey full_doc_key, auto, auto intent_key, + auto) { + intent_processor.Process(ancestor_doc_key, full_doc_key, intent_key, intent_types); return Status::OK(); }, &buffer, @@ -974,15 +974,13 @@ class TransactionConflictResolverContext : public ConflictResolverContextBase { } const auto& pairs = write_batch_.read_pairs(); + intent_types = GetIntentTypeSet(metadata_.isolation, dockv::OperationKind::kRead, row_mark); if (!pairs.empty()) { - IntentProcessor read_processor( - &container, - GetIntentTypeSet(metadata_.isolation, dockv::OperationKind::kRead, row_mark)); RETURN_NOT_OK(EnumerateIntents( pairs, - [&read_processor] ( - auto strength, dockv::FullDocKey full_doc_key, auto, auto intent_key, auto) { - read_processor.Process(strength, full_doc_key, intent_key); + [&intent_processor, intent_types] ( + auto ancestor_doc_key, dockv::FullDocKey full_doc_key, auto, auto intent_key, auto) { + intent_processor.Process(ancestor_doc_key, full_doc_key, intent_key, intent_types); return Status::OK(); }, resolver->partial_range_key_intents())); @@ -1132,14 +1130,8 @@ class OperationConflictResolverContext : public ConflictResolverContextBase { IntentTypeSet intent_types; - dockv::EnumerateIntentsCallback callback = [&intent_types, resolver]( - dockv::AncestorDocKey ancestor_doc_key, dockv::FullDocKey, Slice, - KeyBytes* encoded_key_buffer, dockv::LastKey) { - return resolver->ReadIntentConflicts( - ancestor_doc_key ? MakeWeak(intent_types) : intent_types, - encoded_key_buffer); - }; - + IntentTypesContainer container; + IntentProcessor intent_processor(&container); for (const auto& doc_op : doc_ops()) { doc_paths.clear(); IsolationLevel isolation; @@ -1152,11 +1144,28 @@ class OperationConflictResolverContext : public ConflictResolverContextBase { VLOG_WITH_PREFIX_AND_FUNC(4) << "Doc path: " << SubDocKey::DebugSliceToString(doc_path.as_slice()); RETURN_NOT_OK(EnumerateIntents( - doc_path.as_slice(), Slice(), callback, &encoded_key_buffer, - PartialRangeKeyIntents::kTrue)); + doc_path.as_slice(), + /* intent_value */ Slice(), + [&intent_processor, intent_types]( + auto ancestor_doc_key, dockv::FullDocKey full_doc_key, auto, auto intent_key, + auto) { + intent_processor.Process(ancestor_doc_key, full_doc_key, intent_key, intent_types); + return Status::OK(); + }, + &encoded_key_buffer, + resolver->partial_range_key_intents())); } } + if (container.empty()) { + return Status::OK(); + } + + for (const auto& [key, intent_data] : container) { + encoded_key_buffer.Reset(key.AsSlice()); + RETURN_NOT_OK(resolver->ReadIntentConflicts(intent_data.types, &encoded_key_buffer)); + } + return Status::OK(); }