diff --git a/src/yb/client/transaction.cc b/src/yb/client/transaction.cc index 3e0166db968c..d2be90469a78 100644 --- a/src/yb/client/transaction.cc +++ b/src/yb/client/transaction.cc @@ -102,6 +102,11 @@ DEFINE_test_flag(int32, old_txn_status_abort_delay_ms, 0, DEFINE_test_flag(uint64, override_transaction_priority, 0, "Override priority of transactions if nonzero."); +DEFINE_RUNTIME_bool(disable_heartbeat_send_involved_tablets, false, + "If disabled, do not send involved tablets on heartbeats for pending " + "transactions. This behavior is needed to support fetching old transactions " + "and their involved tablets in order to support yb_lock_status/pg_locks."); + METRIC_DEFINE_counter(server, transaction_promotions, "Number of transactions being promoted to global transactions", yb::MetricUnit::kTransactions, @@ -256,10 +261,10 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { LOG_IF_WITH_PREFIX(DFATAL, !waiters_.empty()) << "Non empty waiters"; const auto threshold = GetAtomicFlag(&FLAGS_txn_slow_op_threshold_ms); const auto print_trace_every_n = GetAtomicFlag(&FLAGS_txn_print_trace_every_n); - const auto now = CoarseMonoClock::Now(); + const auto now = manager_->clock()->Now().GetPhysicalValueMicros(); // start_ is not set if Init is not called - this happens for transactions that get // aborted without doing anything, so we set time_spent to 0 for these transactions. - const auto time_spent = now - (start_ == CoarseTimePoint() ? now : start_); + const auto time_spent = (start_ == 0 ? 0 : now - start_) * 1us; if ((trace_ && trace_->must_print()) || (threshold > 0 && ToMilliseconds(time_spent) > threshold) || (FLAGS_txn_print_trace_on_error && !status_.ok())) { @@ -475,7 +480,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { const std::string* prev_tablet_id = nullptr; for (const auto& op : ops) { const std::string& tablet_id = op.tablet->tablet_id(); - if (op.yb_op->applied() && op.yb_op->should_add_intents(metadata_.isolation)) { + if (op.yb_op->applied() && op.yb_op->should_apply_intents(metadata_.isolation)) { if (prev_tablet_id == nullptr || tablet_id != *prev_tablet_id) { prev_tablet_id = &tablet_id; tablets_[tablet_id].has_metadata = true; @@ -618,12 +623,12 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { REQUIRES(mutex_) { for (auto& group : groups) { auto& first_op = *group.begin; - const auto should_add_intents = first_op.yb_op->should_add_intents(metadata_.isolation); + const auto should_apply_intents = first_op.yb_op->should_apply_intents(metadata_.isolation); const auto& tablet = first_op.tablet; const auto& tablet_id = tablet->tablet_id(); bool has_metadata; - if (initial && should_add_intents) { + if (initial && should_apply_intents) { auto& tablet_state = tablets_[tablet_id]; // TODO(dtxn) Handle skipped writes, i.e. writes that did not write anything (#3220) first_op.batch_idx = tablet_state.num_batches; @@ -1072,7 +1077,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { } else { metadata_.start_time = read_point_.Now(); } - start_ = CoarseMonoClock::Now(); + start_ = manager_->clock()->Now().GetPhysicalValueMicros(); } void SetReadTimeIfNeeded(bool do_it) { @@ -1114,11 +1119,24 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { }); } + struct TabletState { + size_t num_batches = 0; + size_t num_completed_batches = 0; + bool has_metadata = false; + + std::string ToString() const { + return Format("{ num_batches: $0 num_completed_batches: $1 has_metadata: $2 }", + num_batches, num_completed_batches, has_metadata); + } + }; + + typedef std::unordered_map TabletStates; + rpc::RpcCommandPtr PrepareHeartbeatRPC( CoarseTimePoint deadline, const internal::RemoteTabletPtr& status_tablet, TransactionStatus status, UpdateTransactionCallback callback, - std::optional aborted_set_for_rollback_heartbeat = - std::nullopt) { + std::optional aborted_set_for_rollback_heartbeat = std::nullopt, + const TabletStates& tablets_with_locks = {}) { tserver::UpdateTransactionRequestPB req; req.set_tablet_id(status_tablet->tablet_id()); req.set_propagated_hybrid_time(manager_->Now().ToUint64()); @@ -1127,6 +1145,16 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { state.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size()); state.set_status(status); + if (start_ > 0) { + state.set_start_time(start_); + + if (!PREDICT_FALSE(FLAGS_disable_heartbeat_send_involved_tablets)) { + for (const auto& [tablet_id, _] : tablets_with_locks) { + state.add_tablets(tablet_id); + } + } + } + if (aborted_set_for_rollback_heartbeat) { VLOG_WITH_PREFIX(4) << "Setting aborted_set_for_rollback_heartbeat: " << aborted_set_for_rollback_heartbeat.value().ToString(); @@ -1697,7 +1725,8 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { rpc = PrepareHeartbeatRPC( CoarseMonoClock::now() + timeout, status_tablet, status, std::bind( - &Impl::HeartbeatDone, this, _1, _2, _3, status, transaction, send_to_new_tablet)); + &Impl::HeartbeatDone, this, _1, _2, _3, status, transaction, send_to_new_tablet), + std::nullopt, tablets_); } auto& handle = send_to_new_tablet ? new_heartbeat_handle_ : heartbeat_handle_; @@ -2081,7 +2110,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { // The trace buffer. scoped_refptr trace_; - CoarseTimePoint start_; + MicrosTime start_; // Manager is created once per service. TransactionManager* const manager_; @@ -2133,19 +2162,6 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { std::unordered_map transaction_status_move_handles_ GUARDED_BY(mutex_); - struct TabletState { - size_t num_batches = 0; - size_t num_completed_batches = 0; - bool has_metadata = false; - - std::string ToString() const { - return Format("{ num_batches: $0 num_completed_batches: $1 has_metadata: $2 }", - num_batches, num_completed_batches, has_metadata); - } - }; - - typedef std::unordered_map TabletStates; - std::shared_mutex mutex_; TabletStates tablets_ GUARDED_BY(mutex_); std::vector waiters_ GUARDED_BY(mutex_); diff --git a/src/yb/client/yb_op.cc b/src/yb/client/yb_op.cc index 7f9082fd5dec..3f83903d6fa2 100644 --- a/src/yb/client/yb_op.cc +++ b/src/yb/client/yb_op.cc @@ -1164,7 +1164,7 @@ Status YBNoOp::Execute(YBClient* client, const dockv::YBPartialRow& key) { return Status::OK(); } -bool YBPgsqlReadOp::should_add_intents(IsolationLevel isolation_level) { +bool YBPgsqlReadOp::should_apply_intents(IsolationLevel isolation_level) { return isolation_level == IsolationLevel::SERIALIZABLE_ISOLATION || IsValidRowMarkType(GetRowMarkTypeFromPB(*request_)); } diff --git a/src/yb/client/yb_op.h b/src/yb/client/yb_op.h index a7c2409b8eb0..8da955cd8748 100644 --- a/src/yb/client/yb_op.h +++ b/src/yb/client/yb_op.h @@ -111,7 +111,7 @@ class YBOperation { return succeeded(); } - virtual bool should_add_intents(IsolationLevel isolation_level) { + virtual bool should_apply_intents(IsolationLevel isolation_level) { return !read_only() || isolation_level == IsolationLevel::SERIALIZABLE_ISOLATION; } @@ -557,7 +557,7 @@ class YBPgsqlReadOp : public YBPgsqlOp { static std::vector MakeColumnSchemasFromColDesc( const google::protobuf::RepeatedPtrField& rscol_descs); - bool should_add_intents(IsolationLevel isolation_level) override; + bool should_apply_intents(IsolationLevel isolation_level) override; void SetUsedReadTime(const ReadHybridTime& used_time); const ReadHybridTime& used_read_time() const { return used_read_time_; } diff --git a/src/yb/tablet/operations.proto b/src/yb/tablet/operations.proto index 2f0f3dc46ada..c73386729d66 100644 --- a/src/yb/tablet/operations.proto +++ b/src/yb/tablet/operations.proto @@ -27,6 +27,7 @@ message TransactionStatePB { optional TransactionStatus status = 2; // tablets has different meaning, depending on status: + // PENDING - list of involved tablets, if FLAGS_disable_heartbeat_send_involved_tablets = false. // COMMITTED - list of involved tablets // APPLYING - single entry, status tablet of this transaction // APPLIED - single entry, tablet that applied this transaction @@ -52,6 +53,9 @@ message TransactionStatePB { optional fixed64 external_hybrid_time = 8; // For xcluster writes, the producer side status tablet for the transaction. optional string external_status_tablet_id = 9; + + // The time at which this transaction started at the client. + optional fixed64 start_time = 10; } message TruncatePB { diff --git a/src/yb/tablet/transaction_coordinator.cc b/src/yb/tablet/transaction_coordinator.cc index 27362f8866de..675ab1a3ec7b 100644 --- a/src/yb/tablet/transaction_coordinator.cc +++ b/src/yb/tablet/transaction_coordinator.cc @@ -14,6 +14,7 @@ // #include "yb/tablet/transaction_coordinator.h" +#include #include #include @@ -212,11 +213,13 @@ class TransactionState { public: explicit TransactionState(TransactionStateContext* context, const TransactionId& id, + MicrosTime first_touch, HybridTime last_touch, const std::string& parent_log_prefix) : context_(*context), id_(id), log_prefix_(BuildLogPrefix(parent_log_prefix, id)), + first_touch_(first_touch), last_touch_(last_touch), aborted_subtxn_info_(std::make_shared()) { } @@ -238,6 +241,14 @@ class TransactionState { return last_touch_; } + MicrosTime first_touch() const { + return first_touch_; + } + + const auto& pending_involved_tablets() const { + return pending_involved_tablets_; + } + // Status of transaction. TransactionStatus status() const { return status_; @@ -680,6 +691,14 @@ class TransactionState { "Transaction in wrong state during heartbeat: $0", TransactionStatus_Name(status_)); } + + // Store pending involved tablets and txn start time in memory. Clear the tablets field if the + // transaction is still PENDING to avoid raft-replicating this additional metadata. + for (const auto& tablet_id : request->request()->tablets()) { + pending_involved_tablets_.insert(tablet_id.ToBuffer()); + } + first_touch_ = request->request()->start_time(); + request->mutable_request()->clear_tablets(); } if (!status.ok()) { @@ -930,6 +949,7 @@ class TransactionState { const TransactionId id_; const std::string log_prefix_; TransactionStatus status_ = TransactionStatus::PENDING; + MicrosTime first_touch_; HybridTime last_touch_; // It should match last_touch_, but it is possible that because of some code errors it // would not be so. To add stability we introduce a separate field for it. @@ -955,6 +975,11 @@ class TransactionState { } }; + // Set of tablets at which this txn has written data or acquired locks, while the transaction is + // still pending. Note that involved_tablets_ is only populated on COMMIT and includes additional + // metadata not known or needed before COMMIT time. + std::unordered_set pending_involved_tablets_; + // Tablets participating in this transaction. std::unordered_map< TabletId, InvolvedTabletState, StringHash, std::equal_to> involved_tablets_; @@ -1093,6 +1118,43 @@ class TransactionCoordinator::Impl : public TransactionStateContext, return Status::OK(); } + Status GetOldTransactions(const tserver::GetOldTransactionsRequestPB* req, + tserver::GetOldTransactionsResponsePB* resp, + CoarseTimePoint deadline) { + auto min_age = req->min_txn_age_ms() * 1ms; + auto now = context_.clock().Now(); + + { + std::unique_lock lock(managed_mutex_); + const auto& index = managed_transactions_.get(); + for (auto it = index.begin(); it != index.end(); ++it) { + if (static_cast(resp->txn_size()) >= req->max_num_txns()) { + break; + } + if (it->status() != TransactionStatus::PENDING || !it->first_touch()) { + continue; + } + + auto age = (now.GetPhysicalValueMicros() - it->first_touch()) * 1us; + if (age <= min_age) { + // Since our iterator is sorted by first_touch, if we encounter a transaction which is too + // new, we can discontinue our scan of active transactions. + break; + } + + auto* resp_txn = resp->add_txn(); + const auto& id = it->id(); + resp_txn->set_transaction_id(id.data(), id.size()); + *resp_txn->mutable_aborted_subtxn_set() = it->GetAbortedSubtxnInfo()->pb(); + + for (const auto& tablet_id : it->pending_involved_tablets()) { + resp_txn->add_tablets(tablet_id); + } + } + } + return Status::OK(); + } + Status GetStatus(const google::protobuf::RepeatedPtrField& transaction_ids, CoarseTimePoint deadline, tserver::GetTransactionStatusResponsePB* response) { @@ -1307,7 +1369,7 @@ class TransactionCoordinator::Impl : public TransactionStateContext, { std::lock_guard lock(managed_mutex_); postponed_leader_actions_.leader_term = data.leader_term; - auto it = GetTransaction(*id, data.state.status(), data.hybrid_time); + auto it = GetTransaction(*id, data.state.status(), data.state.start_time(), data.hybrid_time); if (it == managed_transactions_.end()) { return Status::OK(); } @@ -1427,7 +1489,7 @@ class TransactionCoordinator::Impl : public TransactionStateContext, auto status = HandleTransactionNotFound(*id, state); if (status.ok()) { it = managed_transactions_.emplace( - this, *id, context_.clock().Now(), log_prefix_).first; + this, *id, state.start_time(), context_.clock().Now(), log_prefix_).first; } else { lock.unlock(); status = status.CloneAndAddErrorCode(TransactionError(TransactionErrorCode::kAborted)); @@ -1504,6 +1566,7 @@ class TransactionCoordinator::Impl : public TransactionStateContext, private: class LastTouchTag; + class FirstTouchTag; class FirstEntryIndexTag; typedef boost::multi_index_container >, + boost::multi_index::ordered_non_unique < + boost::multi_index::tag, + boost::multi_index::const_mem_fun + >, boost::multi_index::ordered_non_unique < boost::multi_index::tag, boost::multi_index::const_mem_funGetStatus(transaction_ids, deadline, response); } +Status TransactionCoordinator::GetOldTransactions( + const tserver::GetOldTransactionsRequestPB* req, tserver::GetOldTransactionsResponsePB* resp, + CoarseTimePoint deadline) { + return impl_->GetOldTransactions(req, resp, deadline); +} + void TransactionCoordinator::Abort(const TransactionId& transaction_id, int64_t term, TransactionAbortCallback callback) { diff --git a/src/yb/tablet/transaction_coordinator.h b/src/yb/tablet/transaction_coordinator.h index 33c64c99d719..b962e85b66ee 100644 --- a/src/yb/tablet/transaction_coordinator.h +++ b/src/yb/tablet/transaction_coordinator.h @@ -146,6 +146,14 @@ class TransactionCoordinator { bool CancelTransactionIfFound( const TransactionId& transaction_id, int64_t term, TransactionAbortCallback callback); + // Populates old transactions and their metadata (involved tablets, active subtransactions) based + // on the arguments in the request. Used by pg_client_service to determine which transactions to + // display in pg_locks/yb_lock_status. + Status GetOldTransactions( + const tserver::GetOldTransactionsRequestPB* req, + tserver::GetOldTransactionsResponsePB* resp, + CoarseTimePoint deadline); + std::string DumpTransactions(); // Returns count of managed transactions. Used in tests. diff --git a/src/yb/tserver/tablet_service.cc b/src/yb/tserver/tablet_service.cc index 7a05296c4c73..2448ffdd48a4 100644 --- a/src/yb/tserver/tablet_service.cc +++ b/src/yb/tserver/tablet_service.cc @@ -1163,6 +1163,23 @@ void TabletServiceImpl::GetTransactionStatus(const GetTransactionStatusRequestPB }); } +void TabletServiceImpl::GetOldTransactions(const GetOldTransactionsRequestPB* req, + GetOldTransactionsResponsePB* resp, + rpc::RpcContext context) { + TRACE("GetOldTransactions"); + + PerformAtLeader(req, resp, &context, + [req, resp, &context](const LeaderTabletPeer& tablet_peer) { + auto* transaction_coordinator = tablet_peer.tablet->transaction_coordinator(); + if (!transaction_coordinator) { + return STATUS_FORMAT( + InvalidArgument, "No transaction coordinator at tablet $0", + tablet_peer.peer->tablet_id()); + } + return transaction_coordinator->GetOldTransactions(req, resp, context.GetClientDeadline()); + }); +} + void TabletServiceImpl::GetTransactionStatusAtParticipant( const GetTransactionStatusAtParticipantRequestPB* req, GetTransactionStatusAtParticipantResponsePB* resp, diff --git a/src/yb/tserver/tablet_service.h b/src/yb/tserver/tablet_service.h index c39d65ec9d0b..a76c6373f504 100644 --- a/src/yb/tserver/tablet_service.h +++ b/src/yb/tserver/tablet_service.h @@ -119,6 +119,10 @@ class TabletServiceImpl : public TabletServerServiceIf, public ReadTabletProvide GetTransactionStatusResponsePB* resp, rpc::RpcContext context) override; + void GetOldTransactions(const GetOldTransactionsRequestPB* req, + GetOldTransactionsResponsePB* resp, + rpc::RpcContext context) override; + void GetTransactionStatusAtParticipant(const GetTransactionStatusAtParticipantRequestPB* req, GetTransactionStatusAtParticipantResponsePB* resp, rpc::RpcContext context) override; diff --git a/src/yb/tserver/tserver_service.proto b/src/yb/tserver/tserver_service.proto index 0855e9562abd..1feb3c596ee1 100644 --- a/src/yb/tserver/tserver_service.proto +++ b/src/yb/tserver/tserver_service.proto @@ -66,6 +66,8 @@ service TabletServerService { rpc UpdateTransaction(UpdateTransactionRequestPB) returns (UpdateTransactionResponsePB); // Returns transaction status at coordinator, i.e. PENDING, ABORTED, COMMITTED etc. rpc GetTransactionStatus(GetTransactionStatusRequestPB) returns (GetTransactionStatusResponsePB); + // Returns the oldest transactions (older than a specified age) from a specified status tablet. + rpc GetOldTransactions(GetOldTransactionsRequestPB) returns (GetOldTransactionsResponsePB); // Returns transaction status at participant, i.e. number of replicated batches or whether it was // aborted. rpc GetTransactionStatusAtParticipant(GetTransactionStatusAtParticipantRequestPB) @@ -190,6 +192,7 @@ message UpdateTransactionRequestPB { optional fixed64 propagated_hybrid_time = 3; optional bool is_external = 4; + optional fixed64 start_time = 5; } message UpdateTransactionResponsePB { @@ -227,6 +230,27 @@ message GetTransactionStatusResponsePB { repeated SubtxnSetPB aborted_subtxn_set = 7; } +message GetOldTransactionsRequestPB { + optional bytes tablet_id = 1; + optional uint32 min_txn_age_ms = 2; + optional uint32 max_num_txns = 3; + optional fixed64 propagated_hybrid_time = 4; +} + +message GetOldTransactionsResponsePB { + // Error message, if any. + optional TabletServerErrorPB error = 1; + + message OldTransactionMetadataPB { + optional bytes transaction_id = 1; + repeated bytes tablets = 2; + optional SubtxnSetPB aborted_subtxn_set = 3; + } + repeated OldTransactionMetadataPB txn = 2; + + optional fixed64 propagated_hybrid_time = 3; +} + message UpdateTransactionWaitingForStatusRequestPB { message BlockingTransaction { optional bytes transaction_id = 1; diff --git a/src/yb/yql/pgwrapper/CMakeLists.txt b/src/yb/yql/pgwrapper/CMakeLists.txt index 588d83a96623..168f82e37621 100644 --- a/src/yb/yql/pgwrapper/CMakeLists.txt +++ b/src/yb/yql/pgwrapper/CMakeLists.txt @@ -129,6 +129,7 @@ ADD_YB_TEST(pg_load_balancer-test) ADD_YB_TEST(pg_master_failover-test) ADD_YB_TEST(pg_mini-test) ADD_YB_TEST(pg_namespace-test) +ADD_YB_TEST(pg_old_txn-test) ADD_YB_TEST(pg_on_conflict-test) ADD_YB_TEST(pg_op_buffering-test) ADD_YB_TEST(pg_packed_row-test) diff --git a/src/yb/yql/pgwrapper/pg_old_txn-test.cc b/src/yb/yql/pgwrapper/pg_old_txn-test.cc new file mode 100644 index 000000000000..cd41130a390d --- /dev/null +++ b/src/yb/yql/pgwrapper/pg_old_txn-test.cc @@ -0,0 +1,315 @@ +// Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#include +#include + +#include "yb/client/table.h" +#include "yb/client/tablet_server.h" +#include "yb/client/yb_table_name.h" +#include "yb/common/transaction.h" +#include "yb/integration-tests/xcluster_ycql_test_base.h" +#include "yb/master/master_defaults.h" +#include "yb/rpc/rpc_controller.h" + +#include "yb/tserver/mini_tablet_server.h" + +#include "yb/tserver/tablet_server.h" + +#include "yb/tserver/tserver_service.proxy.h" + +#include "yb/util/monotime.h" +#include "yb/yql/pgwrapper/pg_mini_test_base.h" + + +DECLARE_int32(transaction_table_num_tablets); +DECLARE_int32(heartbeat_interval_ms); + +using namespace std::literals; +using std::string; + +namespace yb { +namespace pgwrapper { + +class TransactionInfoValidator { + public: + TransactionInfoValidator(): is_noop_(true) {} + explicit TransactionInfoValidator( + tserver::GetOldTransactionsResponsePB::OldTransactionMetadataPB data): data_(std::move(data)) + {} + + void WithTablets(std::vector tablets = {}) { + if (is_noop_) return; + + std::vector response_tablets; + for (const auto& tablet_id : data_.tablets()) { + response_tablets.push_back(tablet_id); + } + std::sort(tablets.begin(), tablets.end()); + std::sort(response_tablets.begin(), response_tablets.end()); + + auto not_equal = [&]() { + return strings::Substitute( + "Tablets received for txn $0 differ from tablets expected. Received: $1 vs. expected: $2", + data_.transaction_id(), VectorToString(response_tablets), VectorToString(tablets)); + }; + + ASSERT_EQ(tablets.size(), response_tablets.size()) << not_equal(); + + for (size_t i = 0; i < tablets.size(); ++i) { + if (tablets.at(i) != response_tablets.at(i)) { + EXPECT_EQ(tablets.at(i), response_tablets.at(i)) << not_equal(); + } + } + } + + private: + tserver::GetOldTransactionsResponsePB::OldTransactionMetadataPB data_; + bool is_noop_ = false; +}; + +class GetOldTransactionsValidator { + public: + explicit GetOldTransactionsValidator( + tserver::GetOldTransactionsResponsePB resp): resp_(std::move(resp)) {} + + TransactionInfoValidator HasTransaction(const TransactionId& txn_id) const { + TransactionInfoValidator validator; + bool found_txn = false; + for (const auto& txn : resp_.txn()) { + auto resp_txn = CHECK_RESULT(FullyDecodeTransactionId(txn.transaction_id())); + if (resp_txn == txn_id) { + EXPECT_FALSE(found_txn) << "Unexpected duplicate entry for txn"; + found_txn = true; + validator = TransactionInfoValidator(txn); + } + } + + EXPECT_TRUE(found_txn) << "Did not find txn in response"; + return validator; + } + + void DoesNotHaveTransaction(const TransactionId& txn_id) const { + for (const auto& txn : resp_.txn()) { + EXPECT_NE(txn.transaction_id(), txn_id.ToString()); + } + } + + private: + const tserver::GetOldTransactionsResponsePB resp_; +}; + +class PgGetOldTxnsTest : public PgMiniTestBase { + protected: + void SetUp() override { + FLAGS_transaction_table_num_tablets = 1; + PgMiniTestBase::SetUp(); + + ts_proxy_ = std::make_unique( + &client_->proxy_cache(), + HostPort::FromBoundEndpoint(cluster_->mini_tablet_server(0)->bound_rpc_addr())); + } + + size_t NumTabletServers() override { + return 1; + } + + void OverrideMiniClusterOptions(MiniClusterOptions* options) override { + options->transaction_table_num_tablets = 1; + } + + GetOldTransactionsValidator CheckThat(const tserver::GetOldTransactionsResponsePB& resp) { + return GetOldTransactionsValidator(resp); + } + + Result GetSinglularStatusTabletId() { + auto table_name = client::YBTableName( + YQL_DATABASE_CQL, master::kSystemNamespaceName, kGlobalTransactionsTableName); + std::vector tablet_uuids; + RETURN_NOT_OK(client_->GetTablets( + table_name, 1000 /* max_tablets */, &tablet_uuids, nullptr /* ranges */)); + CHECK_EQ(tablet_uuids.size(), 1); + return tablet_uuids.at(0); + } + + Result GetSinglularTabletId(const string& table_name) { + auto tables = VERIFY_RESULT(client_->ListTables(table_name)); + if (tables.size() != 1) { + return STATUS_FORMAT(InvalidArgument, "Unexpected number of tables $0", tables.size()); + } + auto table_id = tables.at(0).table_id(); + if (table_id.empty()) { + return STATUS(InvalidArgument, "Empty table_id"); + } + + google::protobuf::RepeatedPtrField tablets; + RETURN_NOT_OK(client_->GetTabletsFromTableId(table_id, 5000, &tablets)); + if (tablets.size() != 1) { + return STATUS_FORMAT(InvalidArgument, "Unexpected number of tablets $0", tablets.size()); + } + return tablets.begin()->tablet_id(); + } + + Result GetOldTransactions( + uint32_t min_txn_age_ms, uint32_t max_num_txns) { + auto tablet_id = VERIFY_RESULT(GetSinglularStatusTabletId()); + + constexpr int kTimeoutMs = 1000; + tserver::GetOldTransactionsRequestPB req; + tserver::GetOldTransactionsResponsePB resp; + rpc::RpcController rpc; + req.set_tablet_id(tablet_id); + req.set_min_txn_age_ms(min_txn_age_ms); + req.set_max_num_txns(max_num_txns); + rpc.set_timeout(kTimeoutMs * 1ms); + + RETURN_NOT_OK(ts_proxy_->GetOldTransactions(req, &resp, &rpc)); + + return resp; + } + + Result GetSingularTransactionOn(const TabletId& tablet_id) { + auto txns = VERIFY_RESULT(GetOldTransactions(0 /* min_txn_age_ms */, 1000 /* max_num_txns */)); + TransactionId txn_id = TransactionId::Nil(); + for (const auto& txn : txns.txn()) { + for (const auto& tablet : txn.tablets()) { + if (tablet == tablet_id) { + if (!txn_id.IsNil()) { + return STATUS(InvalidArgument, "Found multiple transactions at tablet, expected one."); + } + txn_id = VERIFY_RESULT(FullyDecodeTransactionId(txn.transaction_id())); + } + } + } + if (txn_id.IsNil()) { + return STATUS(NotFound, "Unexpected empty transaction ID"); + } + return txn_id; + } + + struct Session { + PGConn conn; + TabletId first_involved_tablet; + string first_involved_table_name; + TransactionId txn_id = TransactionId::Nil(); + }; + + Result InitSession(const string& table_name) { + auto setup_conn = VERIFY_RESULT(Connect()); + + RETURN_NOT_OK(setup_conn.ExecuteFormat( + "CREATE TABLE $0 (k INT PRIMARY KEY, v INT) SPLIT INTO 1 TABLETS", table_name)); + RETURN_NOT_OK(setup_conn.ExecuteFormat( + "INSERT INTO $0 SELECT generate_series(1, 10), 0", table_name)); + + Session s { + .conn = VERIFY_RESULT(Connect()), + .first_involved_tablet = VERIFY_RESULT(GetSinglularTabletId(table_name)), + .first_involved_table_name = table_name, + }; + RETURN_NOT_OK(s.conn.StartTransaction(IsolationLevel::SNAPSHOT_ISOLATION)); + + RETURN_NOT_OK(s.conn.FetchFormat("SELECT * FROM $0 WHERE k=1 FOR UPDATE", table_name)); + + SleepFor(2 * FLAGS_heartbeat_interval_ms * 1ms); + + s.txn_id = VERIFY_RESULT(GetSingularTransactionOn(s.first_involved_tablet)); + return s; + } + + private: + std::unique_ptr ts_proxy_; +}; + +TEST_F(PgGetOldTxnsTest, DoesNotReturnNewTransactions) { + auto start = CoarseMonoClock::Now(); + + auto s1 = ASSERT_RESULT(InitSession("foo1")); + auto s2 = ASSERT_RESULT(InitSession("foo2")); + auto s3 = ASSERT_RESULT(InitSession("foo3")); + + auto time_since_start_ms = (uint32_t) ((CoarseMonoClock::Now() - start) / 1ms); + auto min_txn_age_ms = time_since_start_ms * 2; + auto res = ASSERT_RESULT(GetOldTransactions(min_txn_age_ms, 5 /* max_num_txns */)); + CheckThat(res).DoesNotHaveTransaction(s1.txn_id); + CheckThat(res).DoesNotHaveTransaction(s2.txn_id); + CheckThat(res).DoesNotHaveTransaction(s3.txn_id); +} + + +TEST_F(PgGetOldTxnsTest, ReturnsNewTransactions) { + auto s1 = ASSERT_RESULT(InitSession("foo1")); + auto s2 = ASSERT_RESULT(InitSession("foo2")); + auto s3 = ASSERT_RESULT(InitSession("foo3")); + + auto res = ASSERT_RESULT(GetOldTransactions(0 /* min_txn_age_ms */, 5 /* max_num_txns */)); + CheckThat(res).HasTransaction(s1.txn_id) + .WithTablets({s1.first_involved_tablet}); + CheckThat(res).HasTransaction(s2.txn_id) + .WithTablets({s2.first_involved_tablet}); + CheckThat(res).HasTransaction(s3.txn_id) + .WithTablets({s3.first_involved_tablet}); +} + +TEST_F(PgGetOldTxnsTest, SessionAtMultipleTablets) { + auto s1 = ASSERT_RESULT(InitSession("foo1")); + auto s2 = ASSERT_RESULT(InitSession("foo2")); + auto s3 = ASSERT_RESULT(InitSession("foo3")); + + ASSERT_OK(s1.conn.ExecuteFormat("UPDATE $0 SET v=30 WHERE k=12", s2.first_involved_table_name)); + + SleepFor(2 * FLAGS_heartbeat_interval_ms * 1ms); + + auto res = ASSERT_RESULT(GetOldTransactions(0 /* min_txn_age_ms */, 5 /* max_num_txns */)); + CheckThat(res).HasTransaction(s1.txn_id) + .WithTablets({s1.first_involved_tablet, s2.first_involved_tablet}); + CheckThat(res).HasTransaction(s2.txn_id) + .WithTablets({s2.first_involved_tablet}); + CheckThat(res).HasTransaction(s3.txn_id) + .WithTablets({s3.first_involved_tablet}); +} + +TEST_F(PgGetOldTxnsTest, ReturnsOnlyOldEnoughTransactions) { + auto s1 = ASSERT_RESULT(InitSession("foo1")); + auto s2 = ASSERT_RESULT(InitSession("foo2")); + auto s3 = ASSERT_RESULT(InitSession("foo3")); + + SleepFor(2 * FLAGS_heartbeat_interval_ms * 1ms); + auto s4 = ASSERT_RESULT(InitSession("foo4")); + ASSERT_OK(s2.conn.ExecuteFormat("UPDATE $0 SET v=30 WHERE k=15", s4.first_involved_table_name)); + SleepFor(2 * FLAGS_heartbeat_interval_ms * 1ms); + + auto min_txn_age_ms = FLAGS_heartbeat_interval_ms * 5; + auto res = ASSERT_RESULT(GetOldTransactions(min_txn_age_ms, 5 /* max_num_txns */)); + CheckThat(res).HasTransaction(s1.txn_id) + .WithTablets({s1.first_involved_tablet}); + CheckThat(res).HasTransaction(s2.txn_id) + .WithTablets({s2.first_involved_tablet, s4.first_involved_tablet}); + CheckThat(res).HasTransaction(s3.txn_id) + .WithTablets({s3.first_involved_tablet}); + CheckThat(res).DoesNotHaveTransaction(s4.txn_id); + + SleepFor(3 * FLAGS_heartbeat_interval_ms * 1ms); + res = ASSERT_RESULT(GetOldTransactions(min_txn_age_ms, 5 /* max_num_txns */)); + CheckThat(res).HasTransaction(s1.txn_id) + .WithTablets({s1.first_involved_tablet}); + CheckThat(res).HasTransaction(s2.txn_id) + .WithTablets({s2.first_involved_tablet, s4.first_involved_tablet}); + CheckThat(res).HasTransaction(s3.txn_id) + .WithTablets({s3.first_involved_tablet}); + CheckThat(res).HasTransaction(s4.txn_id) + .WithTablets({s4.first_involved_tablet}); +} + +} // namespace pgwrapper +} // namespace yb