Skip to content

Commit

Permalink
[yugabyte#15930] DocDB: Fixes data race issue with YBSubTransaction i…
Browse files Browse the repository at this point in the history
…n TableSizeTest_PartitionedTableSize

Summary:
As we allow buffering of operations in YSQL, multiple batches are launched in async mode without completion of the previous batch (only when there is no dependency among these operations). There can be only one outstanding batch executing `PgClientSession::SetupSession`, while there can be many outstanding batches executing `YBTransaction::Impl::Prepare` as part of callback from `LookupByKeyRpc`. All these batches belong to the same subtxn id. This can be confirmed as we wait for result of all previously launched in-flight ops in `PgSession::SetActiveSubTransaction`.

In the current implementation, it leads to data race issues with YBSubTransaction. A previously launched batch is trying to access `highest_subtransaction_id_` during the `Prepare` to populate in-flight ops metadata, while a subsequent batch is trying to set the same field `highest_subtransaction_id_`. Though the writer thread tries to overwrite `highest_subtransaction_id_` to the same old value, this leads to a read-write conflict

To address the data race, we now set subtxn metadata for the batch (batch of ops) by setting it during `Batcher::FlushAsync`. Batcher then launches `YBTransaction::Impl::Prepare` for the underlying transaction, which sets only the transaction metadata.

The diff also addresses an anomaly with `active_sub_transaction_id_` passed from `pg_session`. Postgres assigns subtransaction id(s) starting from 1. but in the existing implementation, we see that `active_sub_transaction_id_` starts from 0 and then bumps up to 2 on savepoint creation (value as seen in the requests at `pg_client_session.cc`). In `client/transaction.cc`, we check if savepoint has been created, and if not, leave the subtxn metadata unpopulated. Down the stream, it is assumed that the subtransaction belonged to id 1 since the subtxn metadata was left empty. To avoid this confusion, we change the default value of `active_sub_transaction_id_` and populate the subtxn metadata pb only when subtxn is not in its default state.

Not enabling the test to run in tsan mode for now, as there are a few more race issues that need to be addressed with the pggate code. For instance, the below stack trace points out an issue. (filed github [[ yugabyte#16390 | issue ]])
```
WARNING: ThreadSanitizer: data race (pid=1415195)
  Read of size 8 at 0x7fb746055e18 by thread T1:
    #0 YBCPgIsYugaByteEnabled /nfusr/dev-server/bkolagani/code/yugabyte-db/build/tsan-clang15-dynamic-ninja/../../src/yb/yql/pggate/ybc_pggate.cc:1368:10 (libyb_pggate.so+0x8f21a)
    #1 IsYugaByteEnabled /nfusr/dev-server/bkolagani/code/yugabyte-db/src/postgres/src/backend/utils/misc/../../../../../../../src/postgres/src/backend/utils/misc/pg_yb_utils.c:177:9 (postgres+0xc77565)
    #2 die /nfusr/dev-server/bkolagani/code/yugabyte-db/src/postgres/src/backend/tcop/../../../../../../src/postgres/src/backend/tcop/postgres.c:2752:6 (postgres+0xa4adc1)
    yugabyte#3 __tsan::CallUserSignalHandler(__tsan::ThreadState*, bool, bool, int, __sanitizer::__sanitizer_siginfo*, void*) /opt/yb-build/llvm/yb-llvm-v15.0.3-yb-1-1667030060-0b8d1183-almalinux8-x86_64-build/src/llvm-project/compiler-rt/lib/tsan/rtl/tsan_interceptors_posix.cpp:2025:5 (postgres+0x415a3a)
...
  Previous write of size 8 at 0x7fb746055e18 by main thread:
    #0 YBCDestroyPgGate /nfusr/dev-server/bkolagani/code/yugabyte-db/build/tsan-clang15-dynamic-ninja/../../src/yb/yql/pggate/ybc_pggate.cc:196:11 (libyb_pggate.so+0x866ae)
    #1 YBOnPostgresBackendShutdown /nfusr/dev-server/bkolagani/code/yugabyte-db/src/postgres/src/backend/utils/misc/../../../../../../../src/postgres/src/backend/utils/misc/pg_yb_utils.c:609:2 (postgres+0xc79003)
    #2 proc_exit /nfusr/dev-server/bkolagani/code/yugabyte-db/src/postgres/src/backend/storage/ipc/../../../../../../../src/postgres/src/backend/storage/ipc/ipc.c:153:3 (postgres+0xa080cc)
```

Test Plan:
Jenkins
```
./yb_build.sh --gtest_filter PgTableSizeTest.PartitionedTableSize
```

Reviewers: esheng, rthallam, pjain, rsami

Reviewed By: rthallam, pjain, rsami

Subscribers: bogdan, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D23412
  • Loading branch information
basavaraj29 committed Apr 7, 2023
1 parent 6d97e62 commit 8bcb9b4
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 32 deletions.
11 changes: 5 additions & 6 deletions src/yb/client/async_rpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,13 @@ void AsyncRpc::SendRpc() {

std::string AsyncRpc::ToString() const {
const auto& transaction = batcher_->in_flight_ops().metadata.transaction;
const auto subtransaction_opt = batcher_->in_flight_ops().metadata.subtransaction;
const auto subtransaction_pb_opt = batcher_->in_flight_ops().metadata.subtransaction_pb;
return Format("$0(tablet: $1, num_ops: $2, num_attempts: $3, txn: $4, subtxn: $5)",
ops_.front().yb_op->read_only() ? "Read" : "Write",
tablet().tablet_id(), ops_.size(), num_attempts(),
transaction.transaction_id,
subtransaction_opt
? Format("$0", subtransaction_opt->subtransaction_id)
subtransaction_pb_opt
? Format("$0", subtransaction_pb_opt->subtransaction_id())
: "[none]");
}

Expand Down Expand Up @@ -352,9 +352,8 @@ void SetMetadata(const InFlightOpsTransactionMetadata& metadata,
metadata.transaction.TransactionIdToPB(transaction);
}
dest->set_deprecated_may_have_metadata(true);

if (metadata.subtransaction && !metadata.subtransaction->IsDefaultState()) {
metadata.subtransaction->ToPB(dest->mutable_subtransaction());
if (metadata.subtransaction_pb) {
*dest->mutable_subtransaction() = *metadata.subtransaction_pb;
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/yb/client/batcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ void Batcher::FlushAsync(
// expected by transaction.
if (transaction && !is_within_transaction_retry) {
transaction->batcher_if().ExpectOperations(operations_count);
// Set subtxn metadata for the current batch of ops.
ops_info_.metadata.subtransaction_pb = transaction->GetSubTransactionMetadataPB();
}

ops_queue_.reserve(ops_.size());
Expand Down
2 changes: 1 addition & 1 deletion src/yb/client/batcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ struct InFlightOpsGroup {

struct InFlightOpsTransactionMetadata {
TransactionMetadata transaction;
boost::optional<SubTransactionMetadata> subtransaction;
boost::optional<SubTransactionMetadataPB> subtransaction_pb;
};

struct InFlightOpsGroupsWithMetadata {
Expand Down
32 changes: 19 additions & 13 deletions src/yb/client/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -418,15 +418,9 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
SetReadTimeIfNeeded(ops_info->groups.size() > 1 || force_consistent_read);
}

{
ops_info->metadata = {
.transaction = metadata_,
.subtransaction = subtransaction_.active()
? boost::make_optional(subtransaction_.get())
: boost::none,
};
}

// Set the transaction's metadata alone. ops_info->metadata.subtransaction_pb has already been
// set upsteam in Batcher::FlushAsync.
ops_info->metadata.transaction = metadata_;
return true;
}

Expand Down Expand Up @@ -896,13 +890,12 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
}

bool HasSubTransaction(SubTransactionId id) EXCLUDES(mutex_) {
SharedLock<std::shared_mutex> lock(mutex_);
return subtransaction_.active() && subtransaction_.HasSubTransaction(id);
return subtransaction_.HasSubTransaction(id);
}

Status RollbackToSubTransaction(SubTransactionId id, CoarseTimePoint deadline) EXCLUDES(mutex_) {
SCHECK(
subtransaction_.active(), InternalError,
subtransaction_.HasSubTransaction(kMinSubTransactionId + 1), InternalError,
"Attempted to rollback to savepoint before creating any savepoints.");

// A heartbeat should be sent (& waited for) to the txn status tablet(s) as part of a rollback.
Expand Down Expand Up @@ -1033,6 +1026,15 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
VLOG_WITH_PREFIX(2) << "Log prefix tag changed";
}

boost::optional<SubTransactionMetadataPB> GetSubTransactionMetadataPB() const {
if (subtransaction_.IsDefaultState()) {
return boost::none;
}
SubTransactionMetadataPB subtxn_metadata_pb;
subtransaction_.get().ToPB(&subtxn_metadata_pb);
return boost::make_optional(subtxn_metadata_pb);
}

private:
void CompleteConstruction() {
LOG_IF(FATAL, !IsAcceptableAtomicImpl(log_prefix_.tag));
Expand Down Expand Up @@ -1185,7 +1187,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf {
return;
}

if (subtransaction_.active()) {
if (!subtransaction_.IsDefaultState()) {
subtransaction_.get().aborted.ToPB(state.mutable_aborted()->mutable_set());
}

Expand Down Expand Up @@ -2314,5 +2316,9 @@ void YBTransaction::SetLogPrefixTag(const LogPrefixName& name, uint64_t value) {
return impl_->SetLogPrefixTag(name, value);
}

boost::optional<SubTransactionMetadataPB> YBTransaction::GetSubTransactionMetadataPB() const {
return impl_->GetSubTransactionMetadataPB();
}

} // namespace client
} // namespace yb
12 changes: 7 additions & 5 deletions src/yb/client/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,23 +185,25 @@ class YBTransaction : public std::enable_shared_from_this<YBTransaction> {
// sub-transactions).
std::unordered_map<TableId, uint64_t> GetTableMutationCounts() const;

boost::optional<SubTransactionMetadataPB> GetSubTransactionMetadataPB() const;

private:
class Impl;
std::unique_ptr<Impl> impl_;
};

class YBSubTransaction {
public:
bool active() const {
return highest_subtransaction_id_ >= kMinSubTransactionId;
}

void SetActiveSubTransaction(SubTransactionId id);

Status RollbackToSubTransaction(SubTransactionId id);

bool HasSubTransaction(SubTransactionId id) const;

bool IsDefaultState() const {
return sub_txn_.IsDefaultState();
}

const SubTransactionMetadata& get() const;

std::string ToString() const;
Expand All @@ -213,7 +215,7 @@ class YBSubTransaction {

// Tracks the highest observed subtransaction_id. Used during "ROLLBACK TO s" to abort from s to
// the highest live subtransaction_id.
SubTransactionId highest_subtransaction_id_ = 0;
SubTransactionId highest_subtransaction_id_ = kMinSubTransactionId;
};

} // namespace client
Expand Down
2 changes: 2 additions & 0 deletions src/yb/tserver/pg_client_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,8 @@ class PgClientServiceImpl::Impl {
}

Status DoPerform(PgPerformRequestPB* req, PgPerformResponsePB* resp, rpc::RpcContext* context) {
// GetSession ensures that there is at most one thread running PgClientSession::Perform, for a
// given session. Refer PgClientSessionLocker for details.
return VERIFY_RESULT(GetSession(*req))->Perform(req, resp, context);
}

Expand Down
10 changes: 6 additions & 4 deletions src/yb/tserver/pg_client_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1098,10 +1098,13 @@ PgClientSession::SetupSession(
}

session->SetDeadline(deadline);

if (transaction) {
DCHECK_GE(options.active_sub_transaction_id(), 0);
transaction->SetActiveSubTransaction(options.active_sub_transaction_id());
const auto active_subtxn_id = options.active_sub_transaction_id();
RSTATUS_DCHECK_GE(
active_subtxn_id, kMinSubTransactionId, InvalidArgument,
Format("Expected active_sub_transaction_id ($0) to be greater than ($1)",
active_subtxn_id, kMinSubTransactionId));
transaction->SetActiveSubTransaction(active_subtxn_id);
}

return std::make_pair(sessions_[to_underlying(kind)], used_read_time);
Expand Down Expand Up @@ -1170,7 +1173,6 @@ Status PgClientSession::BeginTransactionIfNecessary(
}
txn->SetPriority(priority);
session->SetTransaction(txn);

return Status::OK();
}

Expand Down
4 changes: 2 additions & 2 deletions src/yb/yql/pggate/pg_txn_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ void PgTxnManager::ResetTxnAndSession() {
isolation_level_ = IsolationLevel::NON_TRANSACTIONAL;
priority_ = 0;
++txn_serial_no_;
active_sub_transaction_id_ = 0;
active_sub_transaction_id_ = kMinSubTransactionId;
enable_follower_reads_ = false;
read_only_ = false;
Expand Down Expand Up @@ -421,7 +421,7 @@ std::string PgTxnManager::TxnStateDebugStr() const {
uint64_t PgTxnManager::SetupPerformOptions(tserver::PgPerformOptionsPB* options) {
if (!IsDdlMode() && !txn_in_progress_) {
++txn_serial_no_;
active_sub_transaction_id_ = 0;
active_sub_transaction_id_ = kMinSubTransactionId;
}
options->set_isolation(isolation_level_);
options->set_ddl_mode(IsDdlMode());
Expand Down
3 changes: 2 additions & 1 deletion src/yb/yql/pggate/pg_txn_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ class PgTxnManager : public RefCountedThreadSafe<PgTxnManager> {
bool txn_in_progress_ = false;
IsolationLevel isolation_level_ = IsolationLevel::NON_TRANSACTIONAL;
uint64_t txn_serial_no_ = 0;
SubTransactionId active_sub_transaction_id_ = 0;
// Postgres assigns subtransaction id(s) starting from 1.
SubTransactionId active_sub_transaction_id_ = kMinSubTransactionId;
bool need_restart_ = false;
bool need_defer_read_point_ = false;
tserver::ReadTimeManipulation read_time_manipulation_ = tserver::ReadTimeManipulation::NONE;
Expand Down

0 comments on commit 8bcb9b4

Please sign in to comment.