[BACKPORT 2024.1.3][#21116] docdb: Persist transaction promotion state
Summary:
In the transaction promotion path, we did not properly replicate and persist the promotion: only an empty batch carrying the pre-promotion transaction metadata was replicated and written into WALs. As a result, after leader stepdowns and during tablet bootstrap, changes made on tablets first touched before promotion and changes made on tablets first touched after promotion are treated as separate transactions with the same id but different status tablets, resulting in data loss. For example, a leader stepdown on a participant tablet touched before promotion, occurring after the transaction has been aborted on the old status tablet, causes that tablet's changes to be cleaned up even if the transaction has committed or later commits.

This revision changes the promotion path to send an UpdateTransaction(PROMOTING) request carrying the new status tablet to each participant tablet, where it is replicated and written to the WAL. This entirely replaces the old UpdateTransactionStatusLocation RPC calls.

The empty batch write in the UpdateTransactionStatusLocation path was also removed, as it was effectively useless.

**Upgrade/Rollback safety:**
The change to send UpdateTransaction(PROMOTING) is gated by the auto flag `replicate_transaction_promotion` to ensure that we don't send or write the new value until the upgrade has been finalized; the old UpdateTransactionStatusLocation sending code is used until then. The old UpdateTransactionStatusLocation handling code will be left intact until after 2024.2.
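
For illustration only, below is a minimal sketch of the flag-gated dispatch described above. It is not the actual implementation (that lives in `YBTransaction::Impl::DoSendUpdateTransactionStatusLocationRpcs` in the diff below); the function name and the `txn_id` / `new_status_tablet_id` parameters are placeholders, and it assumes the usual YugabyteDB client/tserver headers:

```cpp
// Sketch of the promotion announcement gated on the auto flag.
// Placeholder names: AnnouncePromotionToParticipant, txn_id, new_status_tablet_id.
void AnnouncePromotionToParticipant(const TransactionId& txn_id,
                                    const TabletId& new_status_tablet_id) {
  if (GetAtomicFlag(&FLAGS_replicate_transaction_promotion)) {
    // New path: UpdateTransaction(PROMOTING) carrying the new status tablet. The participant
    // replicates this record through Raft and writes it to its WAL, so the promotion survives
    // leader stepdowns and tablet bootstrap.
    tserver::UpdateTransactionRequestPB req;
    auto& state = *req.mutable_state();
    state.set_transaction_id(txn_id.data(), txn_id.size());
    state.set_status(TransactionStatus::PROMOTING);
    state.add_tablets(new_status_tablet_id);
    // ... send via the UpdateTransaction RPC to the participant tablet ...
  } else {
    // Old path, used until the auto flag is finalized: UpdateTransactionStatusLocation,
    // which is not replicated across participant peers.
    tserver::UpdateTransactionStatusLocationRequestPB req;
    req.set_transaction_id(txn_id.data(), txn_id.size());
    req.set_new_status_tablet_id(new_status_tablet_id);
    // ... send via the UpdateTransactionStatusLocation RPC to the participant tablet ...
  }
}
```

In the actual change, the chosen request is then routed through the existing tablet-lookup and RPC-preparation helpers, which were templated to handle both request types.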

Jira: DB-10148

Original commit: 6109c23 / D38718

Test Plan:
Jenkins.

Added new test:
- `./yb_build.sh --cxx-test pgwrapper_geo_transactions_promotion-test --gtest_filter GeoTransactionsPromotionTest.TestParticipantLeaderStepDown -n 100`

A Jenkins run with `transaction_promotion_use_update_transaction` turned off (pre-upgrade case) was done on D38793.

Ran the ysql/sz.ol.geo.append Jepsen workload with a 600s timeout 20 times without failures.

Reviewers: sergei

Reviewed By: sergei

Subscribers: svc_phabricator, yql, ybase, rthallam

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D39831
es1024 committed Nov 8, 2024
1 parent 49f2bd4 commit a2a9186
Showing 10 changed files with 358 additions and 138 deletions.
169 changes: 111 additions & 58 deletions src/yb/client/transaction.cc
@@ -87,6 +87,10 @@ DEFINE_RUNTIME_bool(auto_promote_nonlocal_transactions_to_global, true,
DEFINE_RUNTIME_bool(log_failed_txn_metadata, false, "Log metadata about failed transactions.");
TAG_FLAG(log_failed_txn_metadata, advanced);

DEFINE_RUNTIME_AUTO_bool(replicate_transaction_promotion, kLocalPersisted, false, true,
"Use UpdateTransaction(PROMOTING) rpc that is replicated across participant peers to announce "
"transaction status location move to participant tablets.");

DEFINE_test_flag(int32, transaction_inject_flushed_delay_ms, 0,
"Inject delay before processing flushed operations by transaction.");

@@ -1090,6 +1094,11 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
VLOG_WITH_PREFIX(2) << "Log prefix tag changed";
}

bool OldTransactionAborted() const {
auto state = old_status_tablet_state_.load(std::memory_order_acquire);
return state == OldTransactionState::kAborted;
}

private:
void CompleteConstruction() {
LOG_IF(FATAL, !IsAcceptableAtomicImpl(log_prefix_.tag));
@@ -1452,10 +1461,8 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
if (running_requests_ != 0) {
return;
}
commit_callback = std::move(commit_callback_);
} else {
commit_callback = std::move(commit_callback_);
}
commit_callback = std::move(commit_callback_);
}

VLOG_WITH_PREFIX(4) << "Commit done: " << actual_status;
@@ -1939,6 +1946,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
case TransactionStatus::APPLIED_IN_ALL_INVOLVED_TABLETS:
case TransactionStatus::ABORTED:
case TransactionStatus::APPLYING:
case TransactionStatus::PROMOTING:
case TransactionStatus::APPLIED_IN_ONE_OF_INVOLVED_TABLETS:
case TransactionStatus::IMMEDIATE_CLEANUP:
case TransactionStatus::GRACEFUL_CLEANUP:
@@ -1989,63 +1997,82 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
TRACE_TO(trace_, __func__);
VLOG_WITH_PREFIX(2) << "DoSendUpdateTransactionStatusLocationRpcs()";

UniqueLock lock(mutex_);
std::vector<TabletId> participant_tablets;
std::function<void(const TabletId&)> start_location_update;
{
UniqueLock lock(mutex_);

if (state_.load(std::memory_order_acquire) == TransactionState::kPromoting) {
VLOG_WITH_PREFIX(1) << "Initial heartbeat to new status tablet has not completed yet, "
<< "skipping UpdateTransactionStatusLocation rpcs for now";
return;
}
if (state_.load(std::memory_order_acquire) == TransactionState::kPromoting) {
VLOG_WITH_PREFIX(1) << "Initial heartbeat to new status tablet has not completed yet, "
<< "skipping UpdateTransactionStatusLocation rpcs for now";
return;
}

if (transaction_status_move_tablets_.empty()) {
auto old_status_tablet = old_status_tablet_;
lock.unlock();
VLOG_WITH_PREFIX(1) << "No participants to send transaction status location updates to";
SendAbortToOldStatusTabletIfNeeded(
TransactionRpcDeadline(), std::move(transaction), old_status_tablet);
return;
}
if (transaction_status_move_tablets_.empty()) {
auto old_status_tablet = old_status_tablet_;
lock.unlock();
VLOG_WITH_PREFIX(1) << "No participants to send transaction status location updates to";
SendAbortToOldStatusTabletIfNeeded(
TransactionRpcDeadline(), std::move(transaction), old_status_tablet);
return;
}

std::vector<TabletId> participant_tablets;
for (const auto& tablet_id : transaction_status_move_tablets_) {
const auto& tablet_state = tablets_.find(tablet_id);
CHECK(tablet_state != tablets_.end());
if (tablet_state->second.num_completed_batches == 0) {
VLOG_WITH_PREFIX(1) << "Tablet " << tablet_id
<< " has no completed batches, skipping UpdateTransactionStatusLocation"
<< " rpc for now";
continue;
for (const auto& tablet_id : transaction_status_move_tablets_) {
const auto& tablet_state = tablets_.find(tablet_id);
CHECK(tablet_state != tablets_.end());
if (tablet_state->second.num_completed_batches == 0) {
VLOG_WITH_PREFIX(1) << "Tablet " << tablet_id
<< " has no completed batches, skipping"
<< " UpdateTransactionStatusLocation rpc for now";
continue;
}
participant_tablets.push_back(tablet_id);
}
participant_tablets.push_back(tablet_id);
}

if (participant_tablets.empty()) {
VLOG_WITH_PREFIX(1) << "No participants ready for transaction status location update yet";
return;
}
if (participant_tablets.empty()) {
VLOG_WITH_PREFIX(1) << "No participants ready for transaction status location update yet";
return;
}

for (const auto& tablet_id : participant_tablets) {
transaction_status_move_tablets_.erase(tablet_id);
}
for (const auto& tablet_id : participant_tablets) {
transaction_status_move_tablets_.erase(tablet_id);
}

tserver::UpdateTransactionStatusLocationRequestPB req;
req.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size());
req.set_new_status_tablet_id(status_tablet_->tablet_id());
if (PREDICT_TRUE(GetAtomicFlag(&FLAGS_replicate_transaction_promotion))) {
tserver::UpdateTransactionRequestPB req;

lock.unlock();
auto& state = *req.mutable_state();
state.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size());
state.set_status(TransactionStatus::PROMOTING);
state.add_tablets(status_tablet_->tablet_id());
state.set_start_time(start_.load(std::memory_order_acquire));

start_location_update = [&, req](const TabletId& tablet) {
LookupTabletForTransactionStatusLocationUpdate(transaction, id, req, tablet);
};
} else {
tserver::UpdateTransactionStatusLocationRequestPB req;
req.set_transaction_id(metadata_.transaction_id.data(), metadata_.transaction_id.size());
req.set_new_status_tablet_id(status_tablet_->tablet_id());

start_location_update = [&, req](const TabletId& tablet) {
LookupTabletForTransactionStatusLocationUpdate(transaction, id, req, tablet);
};
}
}

for (const auto& participant_tablet : participant_tablets) {
VLOG_WITH_PREFIX(2) << "SendUpdateTransactionStatusLocationRpcs() to tablet "
<< participant_tablet;
LookupTabletForTransactionStatusLocationUpdate(
std::move(transaction), id, req, participant_tablet);
start_location_update(participant_tablet);
}
}

template<typename RequestPB>
void LookupTabletForTransactionStatusLocationUpdate(
std::shared_ptr<YBTransaction> transaction,
const TransactionId& id,
const tserver::UpdateTransactionStatusLocationRequestPB& request_template,
const RequestPB& request_template,
const TabletId& tablet_id) EXCLUDES(mutex_) {
TRACE_TO(trace_, __func__);
manager_->client()->LookupTabletById(
@@ -2056,7 +2083,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
TransactionRpcDeadline(),
[this, transaction = std::move(transaction), id, request_template, tablet_id](
const Result<internal::RemoteTabletPtr>& result) mutable {
LookupTabletForTransactionStatusLocationUpdateDone(
LookupTabletForTransactionStatusLocationUpdateDone<RequestPB>(
result, std::move(transaction), id, request_template, tablet_id);
},
client::UseCache::kTrue);
@@ -2068,35 +2095,58 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
return &handle->second;
}

rpc::RpcCommandPtr PrepareUpdateTransactionStatusLocationRpc(
template<typename RequestPB, typename Rpc>
rpc::RpcCommandPtr DoPrepareUpdateTransactionStatusLocationRpc(
std::shared_ptr<YBTransaction> transaction,
const TransactionId& id,
const TabletId &tablet_id, internal::RemoteTabletPtr participant_tablet,
const tserver::UpdateTransactionStatusLocationRequestPB& request_template) {
tserver::UpdateTransactionStatusLocationRequestPB request = request_template;
const TabletId &tablet_id,
internal::RemoteTabletPtr participant_tablet,
const RequestPB& request_template,
Rpc rpc) {
RequestPB request = request_template;
request.set_propagated_hybrid_time(manager_->Now().ToUint64());
request.set_tablet_id(tablet_id);

return UpdateTransactionStatusLocation(
return rpc(
TransactionRpcDeadline(),
participant_tablet.get(),
manager_->client(),
&request,
[this, transaction = std::move(transaction), id, tablet_id, participant_tablet,
request_template](
const Status& status,
const tserver::UpdateTransactionStatusLocationResponsePB& response) mutable {
request_template](const Status& status, auto&&...) mutable {
UpdateTransactionStatusLocationDone(
std::move(transaction), id, tablet_id, participant_tablet, request_template,
status, response);
std::move(transaction), id, tablet_id, participant_tablet, status);
});
}

rpc::RpcCommandPtr PrepareUpdateTransactionStatusLocationRpc(
std::shared_ptr<YBTransaction> transaction,
const TransactionId& id,
const TabletId &tablet_id,
internal::RemoteTabletPtr participant_tablet,
const tserver::UpdateTransactionStatusLocationRequestPB& request_template) {
return DoPrepareUpdateTransactionStatusLocationRpc(
std::move(transaction), id, tablet_id, std::move(participant_tablet), request_template,
UpdateTransactionStatusLocation);
}

rpc::RpcCommandPtr PrepareUpdateTransactionStatusLocationRpc(
std::shared_ptr<YBTransaction> transaction,
const TransactionId& id,
const TabletId &tablet_id,
internal::RemoteTabletPtr participant_tablet,
const tserver::UpdateTransactionRequestPB& request_template) {
return DoPrepareUpdateTransactionStatusLocationRpc(
std::move(transaction), id, tablet_id, std::move(participant_tablet), request_template,
UpdateTransaction);
}

template<typename RequestPB>
void LookupTabletForTransactionStatusLocationUpdateDone(
const Result<internal::RemoteTabletPtr>& result,
std::shared_ptr<YBTransaction> transaction,
const TransactionId& id,
const tserver::UpdateTransactionStatusLocationRequestPB& request_template,
const RequestPB& request_template,
const TabletId& tablet_id) EXCLUDES(mutex_) {
TRACE_TO(trace_, __func__);
VLOG_WITH_PREFIX(1) << "Lookup tablet done: " << yb::ToString(result);
Expand All @@ -2119,10 +2169,9 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
void UpdateTransactionStatusLocationDone(
std::shared_ptr<YBTransaction> transaction,
const TransactionId& id,
const TabletId &tablet_id, internal::RemoteTabletPtr participant_tablet,
const tserver::UpdateTransactionStatusLocationRequestPB& request_template,
const Status& status,
const tserver::UpdateTransactionStatusLocationResponsePB& response) EXCLUDES(mutex_) {
const TabletId &tablet_id,
internal::RemoteTabletPtr participant_tablet,
const Status& status) EXCLUDES(mutex_) {
TRACE_TO(trace_, __func__);
VLOG_WITH_PREFIX(1) << "Transaction status update for participant tablet "
<< tablet_id << ": " << yb::ToString(status);
@@ -2496,5 +2545,9 @@ void YBTransaction::SetLogPrefixTag(const LogPrefixName& name, uint64_t value) {
return impl_->SetLogPrefixTag(name, value);
}

bool YBTransaction::OldTransactionAborted() const {
return impl_->OldTransactionAborted();
}

} // namespace client
} // namespace yb
2 changes: 2 additions & 0 deletions src/yb/client/transaction.h
@@ -187,6 +187,8 @@ class YBTransaction : public std::enable_shared_from_this<YBTransaction> {
// sub-transactions).
std::unordered_map<TableId, uint64_t> GetTableMutationCounts() const;

bool OldTransactionAborted() const;

private:
class Impl;
std::unique_ptr<Impl> impl_;
5 changes: 3 additions & 2 deletions src/yb/common/transaction.proto
@@ -37,9 +37,10 @@ enum TransactionStatus {

PROMOTED = 9;

// The APPLYING status is used in Raft in transaction participant tablets but not in status
// tablets.
// The APPLYING and PROMOTING statuses are used in Raft in transaction participant tablets but not
// in status tablets.
APPLYING = 20;
PROMOTING = 24;

// All following entries are not used in RAFT, but as events between status tablet and involved
// tablets:
1 change: 1 addition & 0 deletions src/yb/tablet/operations.proto
@@ -31,6 +31,7 @@ message TransactionStatePB {
// PENDING - list of involved tablets, if FLAGS_disable_heartbeat_send_involved_tablets = false.
// COMMITTED - list of involved tablets
// APPLYING - single entry, status tablet of this transaction
// PROMOTING - single entry, new status tablet of this transaction
// APPLIED - single entry, tablet that applied this transaction
// Not used is other cases.
repeated bytes tablets = 3;
3 changes: 1 addition & 2 deletions src/yb/tablet/running_transaction.cc
@@ -13,8 +13,6 @@

#include "yb/tablet/running_transaction.h"

#include <glog/logging.h>

#include "yb/client/transaction_rpc.h"

#include "yb/common/hybrid_time.h"
@@ -635,6 +633,7 @@ const TabletId& RunningTransaction::status_tablet() const {
void RunningTransaction::UpdateTransactionStatusLocation(const TabletId& new_status_tablet) {
metadata_.old_status_tablet = std::move(metadata_.status_tablet);
metadata_.status_tablet = new_status_tablet;
metadata_.locality = TransactionLocality::GLOBAL;
}

void RunningTransaction::UpdateAbortCheckHT(HybridTime now, UpdateAbortCheckHTMode mode) {
11 changes: 7 additions & 4 deletions src/yb/tablet/transaction_coordinator.cc
@@ -358,6 +358,7 @@ class TransactionState {
case TransactionStatus::COMMITTED: FALLTHROUGH_INTENDED;
case TransactionStatus::PROMOTED: FALLTHROUGH_INTENDED;
case TransactionStatus::APPLYING: FALLTHROUGH_INTENDED;
case TransactionStatus::PROMOTING: FALLTHROUGH_INTENDED;
case TransactionStatus::APPLIED_IN_ONE_OF_INVOLVED_TABLETS: FALLTHROUGH_INTENDED;
case TransactionStatus::IMMEDIATE_CLEANUP: FALLTHROUGH_INTENDED;
case TransactionStatus::GRACEFUL_CLEANUP:
@@ -470,6 +471,7 @@ class TransactionState {
case TransactionStatus::CREATED: FALLTHROUGH_INTENDED;
case TransactionStatus::PROMOTED: FALLTHROUGH_INTENDED;
case TransactionStatus::APPLYING: FALLTHROUGH_INTENDED;
case TransactionStatus::PROMOTING: FALLTHROUGH_INTENDED;
case TransactionStatus::APPLIED_IN_ONE_OF_INVOLVED_TABLETS: FALLTHROUGH_INTENDED;
case TransactionStatus::IMMEDIATE_CLEANUP: FALLTHROUGH_INTENDED;
case TransactionStatus::GRACEFUL_CLEANUP:
@@ -672,10 +674,11 @@ class TransactionState {
case TransactionStatus::PROMOTED: FALLTHROUGH_INTENDED;
case TransactionStatus::PENDING:
return PendingReplicationFinished(data);
case TransactionStatus::APPLYING:
// APPLYING is handled separately, because it is received for transactions not managed by
// this tablet as a transaction status tablet, but tablets that are involved in the data
// path (receive write intents) for this transactions
case TransactionStatus::APPLYING: FALLTHROUGH_INTENDED;
case TransactionStatus::PROMOTING:
// APPLYING and PROMOTING are handled separately, because they are received not by the
// status tablet managing the transaction, but by tablets that are involved in the data
// path (receive write intents) for this transaction
FATAL_INVALID_ENUM_VALUE(TransactionStatus, data.state.status());
case TransactionStatus::APPLIED_IN_ONE_OF_INVOLVED_TABLETS:
// APPLIED_IN_ONE_OF_INVOLVED_TABLETS handled w/o use of RAFT log