From bd4874b0013187c0645b937267456edbfdc05248 Mon Sep 17 00:00:00 2001
From: Minghui Yang <myang@yugabyte.com>
Date: Wed, 7 Aug 2024 01:34:55 +0000
Subject: [PATCH] [#13358] YSQL: Fix DDL atomicity stress test failure in tsan
 build

Summary:
The DDL atomicity stress tests failed more on pg15 branch with an error like:

```
WARNING: ThreadSanitizer: data race (pid=180911)
  Write of size 8 at 0x7b2c000257b8 by thread T17 (mutexes: write M0):
    #0 profile_open_file prof_file.c (libkrb5.so.3+0xf45b3)
    #1 profile_init_flags <null> (libkrb5.so.3+0xfb056)
    #2 k5_os_init_context <null> (libkrb5.so.3+0xe5546)
    #3 krb5_init_context_profile <null> (libkrb5.so.3+0xabc90)
    #4 krb5_init_context <null> (libkrb5.so.3+0xabbd5)
    #5 krb5_gss_init_context init_sec_context.c (libgssapi_krb5.so.2+0x448da)
    #6 acquire_cred_from acquire_cred.c (libgssapi_krb5.so.2+0x39159)
    #7 krb5_gss_acquire_cred_from acquire_cred.c (libgssapi_krb5.so.2+0x39072)
    #8 gss_add_cred_from <null> (libgssapi_krb5.so.2+0x1fcd3)
    #9 gss_acquire_cred_from <null> (libgssapi_krb5.so.2+0x1f69d)
    #10 gss_acquire_cred <null> (libgssapi_krb5.so.2+0x1f431)
    #11 pg_GSS_have_cred_cache ${YB_SRC_ROOT}/src/postgres/src/interfaces/libpq/../../../../../../src/postgres/src/interfaces/libpq/fe-gssapi-common.c:68:10 (libpq.so.5+0x543fe)
    #12 PQconnectPoll ${YB_SRC_ROOT}/src/postgres/src/interfaces/libpq/../../../../../../src/postgres/src/interfaces/libpq/fe-connect.c:2909:22 (libpq.so.5+0x359ca)
    #13 connectDBComplete ${YB_SRC_ROOT}/src/postgres/src/interfaces/libpq/../../../../../../src/postgres/src/interfaces/libpq/fe-connect.c:2241:10 (libpq.so.5+0x30807)
    #14 PQconnectdb ${YB_SRC_ROOT}/src/postgres/src/interfaces/libpq/../../../../../../src/postgres/src/interfaces/libpq/fe-connect.c:719:10 (libpq.so.5+0x30af1)
    #15 yb::pgwrapper::PGConn::Connect(string const&, std::chrono::time_point<yb::CoarseMonoClock, std::chrono::duration<long long, std::ratio<1l, 1000000000l>>>, bool, string const&) ${YB_SRC_ROOT}/src/yb/yql/pgwrapper/libpq_utils.cc:348:24 (libpq_utils.so+0x13c5b)
    #16 yb::pgwrapper::PGConn::Connect(string const&, bool, string const&) ${YB_SRC_ROOT}/src/yb/yql/pgwrapper/libpq_utils.h:254:12 (libpq_utils.so+0x1a77e)
    #17 yb::pgwrapper::PGConnBuilder::Connect(bool) const ${YB_SRC_ROOT}/src/yb/yql/pgwrapper/libpq_utils.cc:743:10 (libpq_utils.so+0x1a77e)
    #18 yb::pgwrapper::LibPqTestBase::ConnectToDBAsUser(string const&, string const&, bool) ${YB_SRC_ROOT}/src/yb/yql/pgwrapper/libpq_test_base.cc:54:6 (libpg_wrapper_test_base.so+0x26f34)
    #19 yb::pgwrapper::LibPqTestBase::ConnectToDB(string const&, bool) ${YB_SRC_ROOT}/src/yb/yql/pgwrapper/libpq_test_base.cc:44:10 (libpg_wrapper_test_base.so+0x26b1e)
    #20 yb::pgwrapper::LibPqTestBase::Connect(bool) ${YB_SRC_ROOT}/src/yb/yql/pgwrapper/libpq_test_base.cc:40:10 (libpg_wrapper_test_base.so+0x26b1e)
    #21 yb::pgwrapper::PgDdlAtomicityStressTest::Connect() ${YB_SRC_ROOT}/src/yb/yql/pgwrapper/pg_ddl_atomicity_stress-test.cc:147:25 (pg_ddl_atomicity_stress-test+0x136d6c)
    #22 yb::pgwrapper::PgDdlAtomicityStressTest::TestDdl(std::vector<string, std::allocator<string>> const&, int) ${YB_SRC_ROOT}/src/yb/yql/pgwrapper/pg_ddl_atomicity_stress-test.cc:165:15 (pg_ddl_atomicity_stress-test+0x136df5)
    #23 yb::pgwrapper::PgDdlAtomicityStressTest_StressTest_Test::TestBody()::$_2::operator()() const ${YB_SRC_ROOT}/src/yb/yql/pgwrapper/pg_ddl_atomicity_stress-test.cc:316:5 (pg_ddl_atomicity_stress-test+0x13d2eb)
```

It appears that the function `yb::pgwrapper::LibPqTestBase::Connect` isn't
thread safe. I restructured the code to make the connections in a single thread
and then pass them to various concurrent threads for testing.
Jira: DB-2996

Test Plan:
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/0 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/1 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/2 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/3 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/4 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/5 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/6 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/7 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/8 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/9 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/10 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/11 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/12 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/13 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/14 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/15 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/16 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/17 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/18 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/19 --clang17

Verified that no more tsan errors.

Reviewers: fizaa

Reviewed By: fizaa

Subscribers: yql

Differential Revision: https://phorge.dev.yugabyte.com/D37111
---
 .../pgwrapper/pg_ddl_atomicity_stress-test.cc | 50 ++++++++++---------
 1 file changed, 26 insertions(+), 24 deletions(-)

diff --git a/src/yb/yql/pgwrapper/pg_ddl_atomicity_stress-test.cc b/src/yb/yql/pgwrapper/pg_ddl_atomicity_stress-test.cc
index 891efe89e851..1b2a3d6afc6b 100644
--- a/src/yb/yql/pgwrapper/pg_ddl_atomicity_stress-test.cc
+++ b/src/yb/yql/pgwrapper/pg_ddl_atomicity_stress-test.cc
@@ -92,11 +92,11 @@ class PgDdlAtomicityStressTest
     return std::get<2>(GetParam());
   }
 
-  Status TestDdl(const std::vector<std::string>& ddl, const int iteration);
+  Status TestDdl(PGConn* conn, const std::vector<std::string>& ddl, const int iteration);
 
-  Status TestConcurrentIndex(const int num_iterations);
+  Status TestConcurrentIndex(PGConn* conn, const int num_iterations);
 
-  Status TestDml(const int num_iterations);
+  Status TestDml(PGConn* conn, const int num_iterations);
 
   template<class... Args>
   Result<bool> ExecuteFormatWithRetry(PGConn* conn, const std::string& format, Args&&... args) {
@@ -161,13 +161,12 @@ std::string PgDdlAtomicityStressTest::database() {
 }
 
 Status PgDdlAtomicityStressTest::TestDdl(
-    const std::vector<std::string>& ddls, const int num_iterations) {
-  auto conn = VERIFY_RESULT(Connect());
+    PGConn* conn, const std::vector<std::string>& ddls, const int num_iterations) {
   for (int i = 0; i < num_iterations; ++i) {
     for (const auto& ddl : ddls) {
       auto stmt = Format(ddl, kTable, i);
       LOG(INFO) << "Executing stmt " << stmt;
-      while (!VERIFY_RESULT(DoExecuteWithRetry(&conn, stmt))) {
+      while (!VERIFY_RESULT(DoExecuteWithRetry(conn, stmt))) {
         LOG(INFO) << "Retry executing stmt " << stmt;
       }
     }
@@ -224,35 +223,33 @@ Result<bool> PgDdlAtomicityStressTest::DoExecuteWithRetry(PGConn* conn, const st
   return s;
 }
 
-Status PgDdlAtomicityStressTest::TestConcurrentIndex(const int num_iterations) {
-  auto conn = VERIFY_RESULT(Connect());
+Status PgDdlAtomicityStressTest::TestConcurrentIndex(PGConn* conn, const int num_iterations) {
   for (int i = 0; i < num_iterations; ++i) {
     bool index_created = false;
     while (!index_created) {
       // If concurrent index creation fails, it does not clean up the invalid index. Thus to
       // make the statement idempotent, drop the index if the create index failed before retrying.
       index_created = VERIFY_RESULT(ExecuteFormatWithRetry(
-          &conn, "CREATE INDEX idx_$0 ON $1(key)", i, kTable));
+          conn, "CREATE INDEX idx_$0 ON $1(key)", i, kTable));
       if (!index_created) {
         auto stmt = Format("DROP INDEX IF EXISTS idx_$0", i);
-        while (!VERIFY_RESULT(ExecuteFormatWithRetry(&conn, stmt))) {
+        while (!VERIFY_RESULT(ExecuteFormatWithRetry(conn, stmt))) {
           LOG(INFO) << "Retry executing stmt " << stmt;
         }
       }
     }
     auto stmt = Format("DROP INDEX idx_$0", i);
-    while (!VERIFY_RESULT(ExecuteFormatWithRetry(&conn, stmt))) {
+    while (!VERIFY_RESULT(ExecuteFormatWithRetry(conn, stmt))) {
       LOG(INFO) << "Retry executing stmt " << stmt;
     }
   }
   return Status::OK();
 }
 
-Status PgDdlAtomicityStressTest::TestDml(const int num_iterations) {
-  auto conn = VERIFY_RESULT(Connect());
+Status PgDdlAtomicityStressTest::TestDml(PGConn* conn, const int num_iterations) {
   for (int i = 1; i <= num_iterations;) {
     if (VERIFY_RESULT(ExecuteFormatWithRetry(
-                      &conn, "UPDATE $0 SET value = 'value_$1' WHERE key = $1", kTable, i))) {
+                      conn, "UPDATE $0 SET value = 'value_$1' WHERE key = $1", kTable, i))) {
       ++i;
     }
   }
@@ -288,45 +285,50 @@ TEST_P(PgDdlAtomicityStressTest, StressTest) {
   // exists when it is executed. Each thread uses its own connection for its entire duration.
 
   // Create a thread to add and drop columns.
-  thread_holder.AddThreadFunctor([this, num_iterations] {
+  auto conn1 = ASSERT_RESULT(Connect());
+  thread_holder.AddThreadFunctor([this, &conn1, num_iterations] {
     std::vector<std::string> ddls = {
       "ALTER TABLE $0 ADD COLUMN col_$1 TEXT",
       "ALTER TABLE $0 DROP COLUMN col_$1"
     };
-    ASSERT_OK(TestDdl(ddls, num_iterations));
+    ASSERT_OK(TestDdl(&conn1, ddls, num_iterations));
     LOG(INFO) << "Thread to add and drop columns has completed";
   });
 
   // Create a thread to add and drop columns with default values.
-  thread_holder.AddThreadFunctor([this, num_iterations] {
+  auto conn2 = ASSERT_RESULT(Connect());
+  thread_holder.AddThreadFunctor([this, &conn2, num_iterations] {
     std::vector<std::string> ddls = {
       "ALTER TABLE $0 ADD COLUMN col_def_$1 TEXT DEFAULT 'def'",
       "ALTER TABLE $0 DROP COLUMN col_def_$1"
     };
-    ASSERT_OK(TestDdl(ddls, num_iterations));
+    ASSERT_OK(TestDdl(&conn2, ddls, num_iterations));
     LOG(INFO) << "Thread to add and drop columns with default values has completed";
   });
 
   // Create a thread to create/drop an index on this table.
-  thread_holder.AddThreadFunctor([this, num_iterations] {
+  auto conn3 = ASSERT_RESULT(Connect());
+  thread_holder.AddThreadFunctor([this, &conn3, num_iterations] {
     std::vector<std::string> ddls = {
       "CREATE INDEX NONCONCURRENTLY non_concurrent_idx_$1 ON $0(key)",
       "DROP INDEX non_concurrent_idx_$1"
     };
-    ASSERT_OK(TestDdl(ddls, num_iterations));
+    ASSERT_OK(TestDdl(&conn3, ddls, num_iterations));
     LOG(INFO) << "Thread to create/drop an index has completed";
   });
 
   // ConcurrentIndex is a very long running operation. Cleaning up a failed ConcurrentIndex is
   // also a DDL, and this can be a very long running test. Reduce the number of iterations.
-  thread_holder.AddThreadFunctor([this, num_iterations] {
-    ASSERT_OK(TestConcurrentIndex(num_iterations / 2));
+  auto conn4 = ASSERT_RESULT(Connect());
+  thread_holder.AddThreadFunctor([this, &conn4, num_iterations] {
+    ASSERT_OK(TestConcurrentIndex(&conn4, num_iterations / 2));
     LOG(INFO) << "Thread to run concurrent index has completed";
   });
 
   // Create a thread to update the rows on this table.
-  thread_holder.AddThreadFunctor([this, num_iterations] {
-    ASSERT_OK(TestDml(num_iterations));
+  auto conn5 = ASSERT_RESULT(Connect());
+  thread_holder.AddThreadFunctor([this, &conn5, num_iterations] {
+    ASSERT_OK(TestDml(&conn5, num_iterations));
     LOG(INFO) << "Thread to update the rows has completed";
   });