Skip to content

Commit

Permalink
[#16912] DocDB: Expose GetOldTransactions RPC to fetch long-running t…
Browse files Browse the repository at this point in the history
…ransaction metadata

Summary:
This revision adds a GetOldTransactions endpoint at the tserver. This endpoint allows fetching the oldest transactions at a given status tablet, along with associated metadata such as involved tablets and aborted subtransactions which will be required when fetching lock statuses for these transactions in pg_locks. The RPC returns only those transactions which are older than a specified minimum age, and optionally at most a specified max number of transactions (prioritizing the oldest first).

A few changes needed to be made to support this endpoint:
1. The client needs to report involved tablets to the coordinator during heartbeat even if the transaction is pending. Previously we would only report these tablets on commit. This behavior can be turned off using the new flag `--disable_heartbeat_send_involved_tablets`, which is included in this revision as a precautionary measure
2. We index managed transactions in the coordinator by a new first_touch field, which stores the start time of the transaction reported by the client

Test Plan: `ybd --cxx-test pgwrapper_pg_old_txn-test`

Reviewers: bkolagani, pjain, esheng, sergei

Reviewed By: bkolagani, esheng

Subscribers: bogdan

Differential Revision: https://phorge.dev.yugabyte.com/D25351
  • Loading branch information
robertsami committed Jun 21, 2023
1 parent 3c2b229 commit 320cffb
Show file tree
Hide file tree
Showing 11 changed files with 494 additions and 29 deletions.
62 changes: 39 additions & 23 deletions src/yb/client/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ DEFINE_test_flag(int32, old_txn_status_abort_delay_ms, 0,
DEFINE_test_flag(uint64, override_transaction_priority, 0,
"Override priority of transactions if nonzero.");

DEFINE_RUNTIME_bool(disable_heartbeat_send_involved_tablets, false,
"If disabled, do not send involved tablets on heartbeats for pending "
"transactions. This behavior is needed to support fetching old transactions "
"and their involved tablets in order to support yb_lock_status/pg_locks.");

METRIC_DEFINE_counter(server, transaction_promotions,
"Number of transactions being promoted to global transactions",
yb::MetricUnit::kTransactions,
Expand Down Expand Up @@ -256,10 +261,10 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
LOG_IF_WITH_PREFIX(DFATAL, !waiters_.empty()) << "Non empty waiters";
const auto threshold = GetAtomicFlag(&FLAGS_txn_slow_op_threshold_ms);
const auto print_trace_every_n = GetAtomicFlag(&FLAGS_txn_print_trace_every_n);
const auto now = CoarseMonoClock::Now();
const auto now = manager_->clock()->Now().GetPhysicalValueMicros();
// start_ is not set if Init is not called - this happens for transactions that get
// aborted without doing anything, so we set time_spent to 0 for these transactions.
const auto time_spent = now - (start_ == CoarseTimePoint() ? now : start_);
const auto time_spent = (start_ == 0 ? 0 : now - start_) * 1us;
if ((trace_ && trace_->must_print())
|| (threshold > 0 && ToMilliseconds(time_spent) > threshold)
|| (FLAGS_txn_print_trace_on_error && !status_.ok())) {
Expand Down Expand Up @@ -475,7 +480,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
const std::string* prev_tablet_id = nullptr;
for (const auto& op : ops) {
const std::string& tablet_id = op.tablet->tablet_id();
if (op.yb_op->applied() && op.yb_op->should_add_intents(metadata_.isolation)) {
if (op.yb_op->applied() && op.yb_op->should_apply_intents(metadata_.isolation)) {
if (prev_tablet_id == nullptr || tablet_id != *prev_tablet_id) {
prev_tablet_id = &tablet_id;
tablets_[tablet_id].has_metadata = true;
Expand Down Expand Up @@ -618,12 +623,12 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
REQUIRES(mutex_) {
for (auto& group : groups) {
auto& first_op = *group.begin;
const auto should_add_intents = first_op.yb_op->should_add_intents(metadata_.isolation);
const auto should_apply_intents = first_op.yb_op->should_apply_intents(metadata_.isolation);
const auto& tablet = first_op.tablet;
const auto& tablet_id = tablet->tablet_id();

bool has_metadata;
if (initial && should_add_intents) {
if (initial && should_apply_intents) {
auto& tablet_state = tablets_[tablet_id];
// TODO(dtxn) Handle skipped writes, i.e. writes that did not write anything (#3220)
first_op.batch_idx = tablet_state.num_batches;
Expand Down Expand Up @@ -1072,7 +1077,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
} else {
metadata_.start_time = read_point_.Now();
}
start_ = CoarseMonoClock::Now();
start_ = manager_->clock()->Now().GetPhysicalValueMicros();
}

void SetReadTimeIfNeeded(bool do_it) {
Expand Down Expand Up @@ -1114,11 +1119,24 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
});
}

struct TabletState {
size_t num_batches = 0;
size_t num_completed_batches = 0;
bool has_metadata = false;

std::string ToString() const {
return Format("{ num_batches: $0 num_completed_batches: $1 has_metadata: $2 }",
num_batches, num_completed_batches, has_metadata);
}
};

typedef std::unordered_map<TabletId, TabletState> TabletStates;

rpc::RpcCommandPtr PrepareHeartbeatRPC(
CoarseTimePoint deadline, const internal::RemoteTabletPtr& status_tablet,
TransactionStatus status, UpdateTransactionCallback callback,
std::optional<SubtxnSet> aborted_set_for_rollback_heartbeat =
std::nullopt) {
std::optional<SubtxnSet> aborted_set_for_rollback_heartbeat = std::nullopt,
const TabletStates& tablets_with_locks = {}) {
tserver::UpdateTransactionRequestPB req;
req.set_tablet_id(status_tablet->tablet_id());
req.set_propagated_hybrid_time(manager_->Now().ToUint64());
Expand All @@ -1127,6 +1145,16 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
state.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size());
state.set_status(status);

if (start_ > 0) {
state.set_start_time(start_);

if (!PREDICT_FALSE(FLAGS_disable_heartbeat_send_involved_tablets)) {
for (const auto& [tablet_id, _] : tablets_with_locks) {
state.add_tablets(tablet_id);
}
}
}

if (aborted_set_for_rollback_heartbeat) {
VLOG_WITH_PREFIX(4) << "Setting aborted_set_for_rollback_heartbeat: "
<< aborted_set_for_rollback_heartbeat.value().ToString();
Expand Down Expand Up @@ -1697,7 +1725,8 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
rpc = PrepareHeartbeatRPC(
CoarseMonoClock::now() + timeout, status_tablet, status,
std::bind(
&Impl::HeartbeatDone, this, _1, _2, _3, status, transaction, send_to_new_tablet));
&Impl::HeartbeatDone, this, _1, _2, _3, status, transaction, send_to_new_tablet),
std::nullopt, tablets_);
}

auto& handle = send_to_new_tablet ? new_heartbeat_handle_ : heartbeat_handle_;
Expand Down Expand Up @@ -2081,7 +2110,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
// The trace buffer.
scoped_refptr<Trace> trace_;

CoarseTimePoint start_;
MicrosTime start_;

// Manager is created once per service.
TransactionManager* const manager_;
Expand Down Expand Up @@ -2133,19 +2162,6 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
std::unordered_map<TabletId, rpc::Rpcs::Handle>
transaction_status_move_handles_ GUARDED_BY(mutex_);

struct TabletState {
size_t num_batches = 0;
size_t num_completed_batches = 0;
bool has_metadata = false;

std::string ToString() const {
return Format("{ num_batches: $0 num_completed_batches: $1 has_metadata: $2 }",
num_batches, num_completed_batches, has_metadata);
}
};

typedef std::unordered_map<TabletId, TabletState> TabletStates;

std::shared_mutex mutex_;
TabletStates tablets_ GUARDED_BY(mutex_);
std::vector<Waiter> waiters_ GUARDED_BY(mutex_);
Expand Down
2 changes: 1 addition & 1 deletion src/yb/client/yb_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1164,7 +1164,7 @@ Status YBNoOp::Execute(YBClient* client, const dockv::YBPartialRow& key) {
return Status::OK();
}

bool YBPgsqlReadOp::should_add_intents(IsolationLevel isolation_level) {
bool YBPgsqlReadOp::should_apply_intents(IsolationLevel isolation_level) {
return isolation_level == IsolationLevel::SERIALIZABLE_ISOLATION ||
IsValidRowMarkType(GetRowMarkTypeFromPB(*request_));
}
Expand Down
4 changes: 2 additions & 2 deletions src/yb/client/yb_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class YBOperation {
return succeeded();
}

virtual bool should_add_intents(IsolationLevel isolation_level) {
virtual bool should_apply_intents(IsolationLevel isolation_level) {
return !read_only() || isolation_level == IsolationLevel::SERIALIZABLE_ISOLATION;
}

Expand Down Expand Up @@ -557,7 +557,7 @@ class YBPgsqlReadOp : public YBPgsqlOp {
static std::vector<ColumnSchema> MakeColumnSchemasFromColDesc(
const google::protobuf::RepeatedPtrField<PgsqlRSColDescPB>& rscol_descs);

bool should_add_intents(IsolationLevel isolation_level) override;
bool should_apply_intents(IsolationLevel isolation_level) override;
void SetUsedReadTime(const ReadHybridTime& used_time);
const ReadHybridTime& used_read_time() const { return used_read_time_; }

Expand Down
4 changes: 4 additions & 0 deletions src/yb/tablet/operations.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ message TransactionStatePB {
optional TransactionStatus status = 2;

// tablets has different meaning, depending on status:
// PENDING - list of involved tablets, if FLAGS_disable_heartbeat_send_involved_tablets = false.
// COMMITTED - list of involved tablets
// APPLYING - single entry, status tablet of this transaction
// APPLIED - single entry, tablet that applied this transaction
Expand All @@ -52,6 +53,9 @@ message TransactionStatePB {
optional fixed64 external_hybrid_time = 8;
// For xcluster writes, the producer side status tablet for the transaction.
optional string external_status_tablet_id = 9;

// The time at which this transaction started at the client.
optional fixed64 start_time = 10;
}

message TruncatePB {
Expand Down
82 changes: 79 additions & 3 deletions src/yb/tablet/transaction_coordinator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
//

#include "yb/tablet/transaction_coordinator.h"
#include <time.h>

#include <atomic>
#include <iterator>
Expand Down Expand Up @@ -212,11 +213,13 @@ class TransactionState {
public:
explicit TransactionState(TransactionStateContext* context,
const TransactionId& id,
MicrosTime first_touch,
HybridTime last_touch,
const std::string& parent_log_prefix)
: context_(*context),
id_(id),
log_prefix_(BuildLogPrefix(parent_log_prefix, id)),
first_touch_(first_touch),
last_touch_(last_touch),
aborted_subtxn_info_(std::make_shared<const SubtxnSetAndPB>()) {
}
Expand All @@ -238,6 +241,14 @@ class TransactionState {
return last_touch_;
}

MicrosTime first_touch() const {
return first_touch_;
}

const auto& pending_involved_tablets() const {
return pending_involved_tablets_;
}

// Status of transaction.
TransactionStatus status() const {
return status_;
Expand Down Expand Up @@ -680,6 +691,14 @@ class TransactionState {
"Transaction in wrong state during heartbeat: $0",
TransactionStatus_Name(status_));
}

// Store pending involved tablets and txn start time in memory. Clear the tablets field if the
// transaction is still PENDING to avoid raft-replicating this additional metadata.
for (const auto& tablet_id : request->request()->tablets()) {
pending_involved_tablets_.insert(tablet_id.ToBuffer());
}
first_touch_ = request->request()->start_time();
request->mutable_request()->clear_tablets();
}

if (!status.ok()) {
Expand Down Expand Up @@ -930,6 +949,7 @@ class TransactionState {
const TransactionId id_;
const std::string log_prefix_;
TransactionStatus status_ = TransactionStatus::PENDING;
MicrosTime first_touch_;
HybridTime last_touch_;
// It should match last_touch_, but it is possible that because of some code errors it
// would not be so. To add stability we introduce a separate field for it.
Expand All @@ -955,6 +975,11 @@ class TransactionState {
}
};

// Set of tablets at which this txn has written data or acquired locks, while the transaction is
// still pending. Note that involved_tablets_ is only populated on COMMIT and includes additional
// metadata not known or needed before COMMIT time.
std::unordered_set<TabletId> pending_involved_tablets_;

// Tablets participating in this transaction.
std::unordered_map<
TabletId, InvolvedTabletState, StringHash, std::equal_to<void>> involved_tablets_;
Expand Down Expand Up @@ -1093,6 +1118,43 @@ class TransactionCoordinator::Impl : public TransactionStateContext,
return Status::OK();
}

Status GetOldTransactions(const tserver::GetOldTransactionsRequestPB* req,
tserver::GetOldTransactionsResponsePB* resp,
CoarseTimePoint deadline) {
auto min_age = req->min_txn_age_ms() * 1ms;
auto now = context_.clock().Now();

{
std::unique_lock<std::mutex> lock(managed_mutex_);
const auto& index = managed_transactions_.get<FirstTouchTag>();
for (auto it = index.begin(); it != index.end(); ++it) {
if (static_cast<uint32_t>(resp->txn_size()) >= req->max_num_txns()) {
break;
}
if (it->status() != TransactionStatus::PENDING || !it->first_touch()) {
continue;
}

auto age = (now.GetPhysicalValueMicros() - it->first_touch()) * 1us;
if (age <= min_age) {
// Since our iterator is sorted by first_touch, if we encounter a transaction which is too
// new, we can discontinue our scan of active transactions.
break;
}

auto* resp_txn = resp->add_txn();
const auto& id = it->id();
resp_txn->set_transaction_id(id.data(), id.size());
*resp_txn->mutable_aborted_subtxn_set() = it->GetAbortedSubtxnInfo()->pb();

for (const auto& tablet_id : it->pending_involved_tablets()) {
resp_txn->add_tablets(tablet_id);
}
}
}
return Status::OK();
}

Status GetStatus(const google::protobuf::RepeatedPtrField<std::string>& transaction_ids,
CoarseTimePoint deadline,
tserver::GetTransactionStatusResponsePB* response) {
Expand Down Expand Up @@ -1307,7 +1369,7 @@ class TransactionCoordinator::Impl : public TransactionStateContext,
{
std::lock_guard lock(managed_mutex_);
postponed_leader_actions_.leader_term = data.leader_term;
auto it = GetTransaction(*id, data.state.status(), data.hybrid_time);
auto it = GetTransaction(*id, data.state.status(), data.state.start_time(), data.hybrid_time);
if (it == managed_transactions_.end()) {
return Status::OK();
}
Expand Down Expand Up @@ -1427,7 +1489,7 @@ class TransactionCoordinator::Impl : public TransactionStateContext,
auto status = HandleTransactionNotFound(*id, state);
if (status.ok()) {
it = managed_transactions_.emplace(
this, *id, context_.clock().Now(), log_prefix_).first;
this, *id, state.start_time(), context_.clock().Now(), log_prefix_).first;
} else {
lock.unlock();
status = status.CloneAndAddErrorCode(TransactionError(TransactionErrorCode::kAborted));
Expand Down Expand Up @@ -1504,6 +1566,7 @@ class TransactionCoordinator::Impl : public TransactionStateContext,

private:
class LastTouchTag;
class FirstTouchTag;
class FirstEntryIndexTag;

typedef boost::multi_index_container<TransactionState,
Expand All @@ -1519,6 +1582,12 @@ class TransactionCoordinator::Impl : public TransactionStateContext,
HybridTime,
&TransactionState::last_touch>
>,
boost::multi_index::ordered_non_unique <
boost::multi_index::tag<FirstTouchTag>,
boost::multi_index::const_mem_fun<TransactionState,
MicrosTime,
&TransactionState::first_touch>
>,
boost::multi_index::ordered_non_unique <
boost::multi_index::tag<FirstEntryIndexTag>,
boost::multi_index::const_mem_fun<TransactionState,
Expand Down Expand Up @@ -1659,11 +1728,12 @@ class TransactionCoordinator::Impl : public TransactionStateContext,

ManagedTransactions::iterator GetTransaction(const TransactionId& id,
TransactionStatus status,
MicrosTime start_time,
HybridTime hybrid_time) {
auto it = managed_transactions_.find(id);
if (it == managed_transactions_.end()) {
if (status != TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS) {
it = managed_transactions_.emplace(this, id, hybrid_time, log_prefix_).first;
it = managed_transactions_.emplace(this, id, start_time, hybrid_time, log_prefix_).first;
VLOG_WITH_PREFIX(1) << Format("Added: $0", *it);
}
}
Expand Down Expand Up @@ -1900,6 +1970,12 @@ Status TransactionCoordinator::GetStatus(
return impl_->GetStatus(transaction_ids, deadline, response);
}

Status TransactionCoordinator::GetOldTransactions(
const tserver::GetOldTransactionsRequestPB* req, tserver::GetOldTransactionsResponsePB* resp,
CoarseTimePoint deadline) {
return impl_->GetOldTransactions(req, resp, deadline);
}

void TransactionCoordinator::Abort(const TransactionId& transaction_id,
int64_t term,
TransactionAbortCallback callback) {
Expand Down
8 changes: 8 additions & 0 deletions src/yb/tablet/transaction_coordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ class TransactionCoordinator {
bool CancelTransactionIfFound(
const TransactionId& transaction_id, int64_t term, TransactionAbortCallback callback);

// Populates old transactions and their metadata (involved tablets, active subtransactions) based
// on the arguments in the request. Used by pg_client_service to determine which transactions to
// display in pg_locks/yb_lock_status.
Status GetOldTransactions(
const tserver::GetOldTransactionsRequestPB* req,
tserver::GetOldTransactionsResponsePB* resp,
CoarseTimePoint deadline);

std::string DumpTransactions();

// Returns count of managed transactions. Used in tests.
Expand Down
Loading

0 comments on commit 320cffb

Please sign in to comment.