From 8bcb9b4087ac482fe1a7899a7897ab87010a16de Mon Sep 17 00:00:00 2001 From: Basava Date: Thu, 6 Apr 2023 19:21:06 +0000 Subject: [PATCH] [#15930] DocDB: Fixes data race issue with YBSubTransaction in TableSizeTest_PartitionedTableSize Summary: As we allow buffering of operations in YSQL, multiple batches are launched in async mode without completion of the previous batch (only when there is no dependency among these operations). There can be only one outstanding batch executing `PgClientSession::SetupSession`, while there can be many outstanding batches executing `YBTransaction::Impl::Prepare` as part of callback from `LookupByKeyRpc`. All these batches belong to the same subtxn id. This can be confirmed as we wait for result of all previously launched in-flight ops in `PgSession::SetActiveSubTransaction`. In the current implementation, it leads to data race issues with YBSubTransaction. A previously launched batch is trying to access `highest_subtransaction_id_` during the `Prepare` to populate in-flight ops metadata, while a subsequent batch is trying to set the same field `highest_subtransaction_id_`. Though the writer thread tries to overwrite `highest_subtransaction_id_` to the same old value, this leads to a read-write conflict. To address the data race, we now set subtxn metadata for the batch (batch of ops) by setting it during `Batcher::FlushAsync`. Batcher then launches `YBTransaction::Impl::Prepare` for the underlying transaction, which sets only the transaction metadata. The diff also addresses an anomaly with `active_sub_transaction_id_` passed from `pg_session`. Postgres assigns subtransaction id(s) starting from 1, but in the existing implementation, we see that `active_sub_transaction_id_` starts from 0 and then bumps up to 2 on savepoint creation (value as seen in the requests at `pg_client_session.cc`). In `client/transaction.cc`, we check if savepoint has been created, and if not, leave the subtxn metadata unpopulated. 
Down the stream, it is assumed that the subtransaction belonged to id 1 since the subtxn metadata was left empty. To avoid this confusion, we change the default value of `active_sub_transaction_id_` and populate the subtxn metadata pb only when subtxn is not in its default state. Not enabling the test to run in tsan mode for now, as there are a few more race issues that need to be addressed with the pggate code. For instance, the below stack trace points out an issue. (filed github [[ https://github.com/yugabyte/yugabyte-db/issues/16390 | issue ]]) ``` WARNING: ThreadSanitizer: data race (pid=1415195) Read of size 8 at 0x7fb746055e18 by thread T1: #0 YBCPgIsYugaByteEnabled /nfusr/dev-server/bkolagani/code/yugabyte-db/build/tsan-clang15-dynamic-ninja/../../src/yb/yql/pggate/ybc_pggate.cc:1368:10 (libyb_pggate.so+0x8f21a) #1 IsYugaByteEnabled /nfusr/dev-server/bkolagani/code/yugabyte-db/src/postgres/src/backend/utils/misc/../../../../../../../src/postgres/src/backend/utils/misc/pg_yb_utils.c:177:9 (postgres+0xc77565) #2 die /nfusr/dev-server/bkolagani/code/yugabyte-db/src/postgres/src/backend/tcop/../../../../../../src/postgres/src/backend/tcop/postgres.c:2752:6 (postgres+0xa4adc1) #3 __tsan::CallUserSignalHandler(__tsan::ThreadState*, bool, bool, int, __sanitizer::__sanitizer_siginfo*, void*) /opt/yb-build/llvm/yb-llvm-v15.0.3-yb-1-1667030060-0b8d1183-almalinux8-x86_64-build/src/llvm-project/compiler-rt/lib/tsan/rtl/tsan_interceptors_posix.cpp:2025:5 (postgres+0x415a3a) ... 
Previous write of size 8 at 0x7fb746055e18 by main thread: #0 YBCDestroyPgGate /nfusr/dev-server/bkolagani/code/yugabyte-db/build/tsan-clang15-dynamic-ninja/../../src/yb/yql/pggate/ybc_pggate.cc:196:11 (libyb_pggate.so+0x866ae) #1 YBOnPostgresBackendShutdown /nfusr/dev-server/bkolagani/code/yugabyte-db/src/postgres/src/backend/utils/misc/../../../../../../../src/postgres/src/backend/utils/misc/pg_yb_utils.c:609:2 (postgres+0xc79003) #2 proc_exit /nfusr/dev-server/bkolagani/code/yugabyte-db/src/postgres/src/backend/storage/ipc/../../../../../../../src/postgres/src/backend/storage/ipc/ipc.c:153:3 (postgres+0xa080cc) ``` Test Plan: Jenkins ``` ./yb_build.sh --gtest_filter PgTableSizeTest.PartitionedTableSize ``` Reviewers: esheng, rthallam, pjain, rsami Reviewed By: rthallam, pjain, rsami Subscribers: bogdan, ybase Differential Revision: https://phabricator.dev.yugabyte.com/D23412 --- src/yb/client/async_rpc.cc | 11 +++++----- src/yb/client/batcher.cc | 2 ++ src/yb/client/batcher.h | 2 +- src/yb/client/transaction.cc | 32 +++++++++++++++++------------ src/yb/client/transaction.h | 12 ++++++----- src/yb/tserver/pg_client_service.cc | 2 ++ src/yb/tserver/pg_client_session.cc | 10 +++++---- src/yb/yql/pggate/pg_txn_manager.cc | 4 ++-- src/yb/yql/pggate/pg_txn_manager.h | 3 ++- 9 files changed, 46 insertions(+), 32 deletions(-) diff --git a/src/yb/client/async_rpc.cc b/src/yb/client/async_rpc.cc index 588f723c225c..9598ab963556 100644 --- a/src/yb/client/async_rpc.cc +++ b/src/yb/client/async_rpc.cc @@ -227,13 +227,13 @@ void AsyncRpc::SendRpc() { std::string AsyncRpc::ToString() const { const auto& transaction = batcher_->in_flight_ops().metadata.transaction; - const auto subtransaction_opt = batcher_->in_flight_ops().metadata.subtransaction; + const auto subtransaction_pb_opt = batcher_->in_flight_ops().metadata.subtransaction_pb; return Format("$0(tablet: $1, num_ops: $2, num_attempts: $3, txn: $4, subtxn: $5)", ops_.front().yb_op->read_only() ? 
"Read" : "Write", tablet().tablet_id(), ops_.size(), num_attempts(), transaction.transaction_id, - subtransaction_opt - ? Format("$0", subtransaction_opt->subtransaction_id) + subtransaction_pb_opt + ? Format("$0", subtransaction_pb_opt->subtransaction_id()) : "[none]"); } @@ -352,9 +352,8 @@ void SetMetadata(const InFlightOpsTransactionMetadata& metadata, metadata.transaction.TransactionIdToPB(transaction); } dest->set_deprecated_may_have_metadata(true); - - if (metadata.subtransaction && !metadata.subtransaction->IsDefaultState()) { - metadata.subtransaction->ToPB(dest->mutable_subtransaction()); + if (metadata.subtransaction_pb) { + *dest->mutable_subtransaction() = *metadata.subtransaction_pb; } } diff --git a/src/yb/client/batcher.cc b/src/yb/client/batcher.cc index 3e64cc6d0e85..7b8fbaf1b236 100644 --- a/src/yb/client/batcher.cc +++ b/src/yb/client/batcher.cc @@ -237,6 +237,8 @@ void Batcher::FlushAsync( // expected by transaction. if (transaction && !is_within_transaction_retry) { transaction->batcher_if().ExpectOperations(operations_count); + // Set subtxn metadata for the current batch of ops. 
+ ops_info_.metadata.subtransaction_pb = transaction->GetSubTransactionMetadataPB(); } ops_queue_.reserve(ops_.size()); diff --git a/src/yb/client/batcher.h b/src/yb/client/batcher.h index 66ec2f195228..da8e3fa95919 100644 --- a/src/yb/client/batcher.h +++ b/src/yb/client/batcher.h @@ -69,7 +69,7 @@ struct InFlightOpsGroup { struct InFlightOpsTransactionMetadata { TransactionMetadata transaction; - boost::optional subtransaction; + boost::optional subtransaction_pb; }; struct InFlightOpsGroupsWithMetadata { diff --git a/src/yb/client/transaction.cc b/src/yb/client/transaction.cc index 1b6adbbfb66d..68de9e4681eb 100644 --- a/src/yb/client/transaction.cc +++ b/src/yb/client/transaction.cc @@ -418,15 +418,9 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { SetReadTimeIfNeeded(ops_info->groups.size() > 1 || force_consistent_read); } - { - ops_info->metadata = { - .transaction = metadata_, - .subtransaction = subtransaction_.active() - ? boost::make_optional(subtransaction_.get()) - : boost::none, - }; - } - + // Set the transaction's metadata alone. ops_info->metadata.subtransaction_pb has already been + // set upstream in Batcher::FlushAsync. + ops_info->metadata.transaction = metadata_; return true; } @@ -896,13 +890,12 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { } bool HasSubTransaction(SubTransactionId id) EXCLUDES(mutex_) { - SharedLock lock(mutex_); - return subtransaction_.active() && subtransaction_.HasSubTransaction(id); + return subtransaction_.HasSubTransaction(id); } Status RollbackToSubTransaction(SubTransactionId id, CoarseTimePoint deadline) EXCLUDES(mutex_) { SCHECK( - subtransaction_.active(), InternalError, + subtransaction_.HasSubTransaction(kMinSubTransactionId + 1), InternalError, "Attempted to rollback to savepoint before creating any savepoints."); // A heartbeat should be sent (& waited for) to the txn status tablet(s) as part of a rollback. 
@@ -1033,6 +1026,15 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { VLOG_WITH_PREFIX(2) << "Log prefix tag changed"; } + boost::optional GetSubTransactionMetadataPB() const { + if (subtransaction_.IsDefaultState()) { + return boost::none; + } + SubTransactionMetadataPB subtxn_metadata_pb; + subtransaction_.get().ToPB(&subtxn_metadata_pb); + return boost::make_optional(subtxn_metadata_pb); + } + private: void CompleteConstruction() { LOG_IF(FATAL, !IsAcceptableAtomicImpl(log_prefix_.tag)); @@ -1185,7 +1187,7 @@ class YBTransaction::Impl final : public internal::TxnBatcherIf { return; } - if (subtransaction_.active()) { + if (!subtransaction_.IsDefaultState()) { subtransaction_.get().aborted.ToPB(state.mutable_aborted()->mutable_set()); } @@ -2314,5 +2316,9 @@ void YBTransaction::SetLogPrefixTag(const LogPrefixName& name, uint64_t value) { return impl_->SetLogPrefixTag(name, value); } +boost::optional YBTransaction::GetSubTransactionMetadataPB() const { + return impl_->GetSubTransactionMetadataPB(); +} + } // namespace client } // namespace yb diff --git a/src/yb/client/transaction.h b/src/yb/client/transaction.h index 5d2443c44ea9..07a1dbb45649 100644 --- a/src/yb/client/transaction.h +++ b/src/yb/client/transaction.h @@ -185,6 +185,8 @@ class YBTransaction : public std::enable_shared_from_this { // sub-transactions). 
std::unordered_map GetTableMutationCounts() const; + boost::optional GetSubTransactionMetadataPB() const; + private: class Impl; std::unique_ptr impl_; @@ -192,16 +194,16 @@ class YBTransaction : public std::enable_shared_from_this { class YBSubTransaction { public: - bool active() const { - return highest_subtransaction_id_ >= kMinSubTransactionId; - } - void SetActiveSubTransaction(SubTransactionId id); Status RollbackToSubTransaction(SubTransactionId id); bool HasSubTransaction(SubTransactionId id) const; + bool IsDefaultState() const { + return sub_txn_.IsDefaultState(); + } + const SubTransactionMetadata& get() const; std::string ToString() const; @@ -213,7 +215,7 @@ class YBSubTransaction { // Tracks the highest observed subtransaction_id. Used during "ROLLBACK TO s" to abort from s to // the highest live subtransaction_id. - SubTransactionId highest_subtransaction_id_ = 0; + SubTransactionId highest_subtransaction_id_ = kMinSubTransactionId; }; } // namespace client diff --git a/src/yb/tserver/pg_client_service.cc b/src/yb/tserver/pg_client_service.cc index 5978377d64c4..6c8025512c6b 100644 --- a/src/yb/tserver/pg_client_service.cc +++ b/src/yb/tserver/pg_client_service.cc @@ -562,6 +562,8 @@ class PgClientServiceImpl::Impl { } Status DoPerform(PgPerformRequestPB* req, PgPerformResponsePB* resp, rpc::RpcContext* context) { + // GetSession ensures that there is at most one thread running PgClientSession::Perform, for a + // given session. Refer PgClientSessionLocker for details. 
return VERIFY_RESULT(GetSession(*req))->Perform(req, resp, context); } diff --git a/src/yb/tserver/pg_client_session.cc b/src/yb/tserver/pg_client_session.cc index 947780b84e0c..1bbb303caa55 100644 --- a/src/yb/tserver/pg_client_session.cc +++ b/src/yb/tserver/pg_client_session.cc @@ -1098,10 +1098,13 @@ PgClientSession::SetupSession( } session->SetDeadline(deadline); - if (transaction) { - DCHECK_GE(options.active_sub_transaction_id(), 0); - transaction->SetActiveSubTransaction(options.active_sub_transaction_id()); + const auto active_subtxn_id = options.active_sub_transaction_id(); + RSTATUS_DCHECK_GE( + active_subtxn_id, kMinSubTransactionId, InvalidArgument, + Format("Expected active_sub_transaction_id ($0) to be greater than ($1)", + active_subtxn_id, kMinSubTransactionId)); + transaction->SetActiveSubTransaction(active_subtxn_id); } return std::make_pair(sessions_[to_underlying(kind)], used_read_time); @@ -1170,7 +1173,6 @@ Status PgClientSession::BeginTransactionIfNecessary( } txn->SetPriority(priority); session->SetTransaction(txn); - return Status::OK(); } diff --git a/src/yb/yql/pggate/pg_txn_manager.cc b/src/yb/yql/pggate/pg_txn_manager.cc index 06cb30124783..b7917be8967e 100644 --- a/src/yb/yql/pggate/pg_txn_manager.cc +++ b/src/yb/yql/pggate/pg_txn_manager.cc @@ -364,7 +364,7 @@ void PgTxnManager::ResetTxnAndSession() { isolation_level_ = IsolationLevel::NON_TRANSACTIONAL; priority_ = 0; ++txn_serial_no_; - active_sub_transaction_id_ = 0; + active_sub_transaction_id_ = kMinSubTransactionId; enable_follower_reads_ = false; read_only_ = false; @@ -421,7 +421,7 @@ std::string PgTxnManager::TxnStateDebugStr() const { uint64_t PgTxnManager::SetupPerformOptions(tserver::PgPerformOptionsPB* options) { if (!IsDdlMode() && !txn_in_progress_) { ++txn_serial_no_; - active_sub_transaction_id_ = 0; + active_sub_transaction_id_ = kMinSubTransactionId; } options->set_isolation(isolation_level_); options->set_ddl_mode(IsDdlMode()); diff --git 
a/src/yb/yql/pggate/pg_txn_manager.h b/src/yb/yql/pggate/pg_txn_manager.h index 0aa22681919e..5e0f2bdef668 100644 --- a/src/yb/yql/pggate/pg_txn_manager.h +++ b/src/yb/yql/pggate/pg_txn_manager.h @@ -103,7 +103,8 @@ class PgTxnManager : public RefCountedThreadSafe { bool txn_in_progress_ = false; IsolationLevel isolation_level_ = IsolationLevel::NON_TRANSACTIONAL; uint64_t txn_serial_no_ = 0; - SubTransactionId active_sub_transaction_id_ = 0; + // Postgres assigns subtransaction id(s) starting from 1. + SubTransactionId active_sub_transaction_id_ = kMinSubTransactionId; bool need_restart_ = false; bool need_defer_read_point_ = false; tserver::ReadTimeManipulation read_time_manipulation_ = tserver::ReadTimeManipulation::NONE;