Skip to content

Commit

Permalink
[#13358] YSQL: Fix DDL atomicity stress test failure in tsan build
Browse files Browse the repository at this point in the history
Summary:
The DDL atomicity stress tests failed more on pg15 branch with an error like:

```
WARNING: ThreadSanitizer: data race (pid=180911)
  Write of size 8 at 0x7b2c000257b8 by thread T17 (mutexes: write M0):
    #0 profile_open_file prof_file.c (libkrb5.so.3+0xf45b3)
    #1 profile_init_flags <null> (libkrb5.so.3+0xfb056)
    #2 k5_os_init_context <null> (libkrb5.so.3+0xe5546)
    #3 krb5_init_context_profile <null> (libkrb5.so.3+0xabc90)
    #4 krb5_init_context <null> (libkrb5.so.3+0xabbd5)
    #5 krb5_gss_init_context init_sec_context.c (libgssapi_krb5.so.2+0x448da)
    #6 acquire_cred_from acquire_cred.c (libgssapi_krb5.so.2+0x39159)
    #7 krb5_gss_acquire_cred_from acquire_cred.c (libgssapi_krb5.so.2+0x39072)
    #8 gss_add_cred_from <null> (libgssapi_krb5.so.2+0x1fcd3)
    #9 gss_acquire_cred_from <null> (libgssapi_krb5.so.2+0x1f69d)
    #10 gss_acquire_cred <null> (libgssapi_krb5.so.2+0x1f431)
    #11 pg_GSS_have_cred_cache ${YB_SRC_ROOT}/src/postgres/src/interfaces/libpq/../../../../../../src/postgres/src/interfaces/libpq/fe-gssapi-common.c:68:10 (libpq.so.5+0x543fe)
    #12 PQconnectPoll ${YB_SRC_ROOT}/src/postgres/src/interfaces/libpq/../../../../../../src/postgres/src/interfaces/libpq/fe-connect.c:2909:22 (libpq.so.5+0x359ca)
    #13 connectDBComplete ${YB_SRC_ROOT}/src/postgres/src/interfaces/libpq/../../../../../../src/postgres/src/interfaces/libpq/fe-connect.c:2241:10 (libpq.so.5+0x30807)
    #14 PQconnectdb ${YB_SRC_ROOT}/src/postgres/src/interfaces/libpq/../../../../../../src/postgres/src/interfaces/libpq/fe-connect.c:719:10 (libpq.so.5+0x30af1)
    #15 yb::pgwrapper::PGConn::Connect(string const&, std::chrono::time_point<yb::CoarseMonoClock, std::chrono::duration<long long, std::ratio<1l, 1000000000l>>>, bool, string const&) ${YB_SRC_ROOT}/src/yb/yql/pgwrapper/libpq_utils.cc:348:24 (libpq_utils.so+0x13c5b)
    #16 yb::pgwrapper::PGConn::Connect(string const&, bool, string const&) ${YB_SRC_ROOT}/src/yb/yql/pgwrapper/libpq_utils.h:254:12 (libpq_utils.so+0x1a77e)
    #17 yb::pgwrapper::PGConnBuilder::Connect(bool) const ${YB_SRC_ROOT}/src/yb/yql/pgwrapper/libpq_utils.cc:743:10 (libpq_utils.so+0x1a77e)
    #18 yb::pgwrapper::LibPqTestBase::ConnectToDBAsUser(string const&, string const&, bool) ${YB_SRC_ROOT}/src/yb/yql/pgwrapper/libpq_test_base.cc:54:6 (libpg_wrapper_test_base.so+0x26f34)
    #19 yb::pgwrapper::LibPqTestBase::ConnectToDB(string const&, bool) ${YB_SRC_ROOT}/src/yb/yql/pgwrapper/libpq_test_base.cc:44:10 (libpg_wrapper_test_base.so+0x26b1e)
    #20 yb::pgwrapper::LibPqTestBase::Connect(bool) ${YB_SRC_ROOT}/src/yb/yql/pgwrapper/libpq_test_base.cc:40:10 (libpg_wrapper_test_base.so+0x26b1e)
    #21 yb::pgwrapper::PgDdlAtomicityStressTest::Connect() ${YB_SRC_ROOT}/src/yb/yql/pgwrapper/pg_ddl_atomicity_stress-test.cc:147:25 (pg_ddl_atomicity_stress-test+0x136d6c)
    #22 yb::pgwrapper::PgDdlAtomicityStressTest::TestDdl(std::vector<string, std::allocator<string>> const&, int) ${YB_SRC_ROOT}/src/yb/yql/pgwrapper/pg_ddl_atomicity_stress-test.cc:165:15 (pg_ddl_atomicity_stress-test+0x136df5)
    #23 yb::pgwrapper::PgDdlAtomicityStressTest_StressTest_Test::TestBody()::$_2::operator()() const ${YB_SRC_ROOT}/src/yb/yql/pgwrapper/pg_ddl_atomicity_stress-test.cc:316:5 (pg_ddl_atomicity_stress-test+0x13d2eb)
```

It appears that the function `yb::pgwrapper::LibPqTestBase::Connect` isn't
thread safe. I restructured the code to make the connections in a single thread
and then pass them to various concurrent threads for testing.
Jira: DB-2996

Test Plan:
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/0 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/1 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/2 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/3 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/4 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/5 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/6 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/7 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/8 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/9 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/10 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/11 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/12 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/13 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/14 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/15 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/16 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/17 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/18 --clang17
./yb_build.sh tsan --cxx-test pgwrapper_pg_ddl_atomicity_stress-test --gtest_filter PgDdlAtomicityStressTest/PgDdlAtomicityStressTest.StressTest/19 --clang17

Verified that no more tsan errors.

Reviewers: fizaa

Reviewed By: fizaa

Subscribers: yql

Differential Revision: https://phorge.dev.yugabyte.com/D37111
  • Loading branch information
myang2021 committed Aug 8, 2024
1 parent da9b281 commit bd4874b
Showing 1 changed file with 26 additions and 24 deletions.
50 changes: 26 additions & 24 deletions src/yb/yql/pgwrapper/pg_ddl_atomicity_stress-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ class PgDdlAtomicityStressTest
return std::get<2>(GetParam());
}

Status TestDdl(const std::vector<std::string>& ddl, const int iteration);
Status TestDdl(PGConn* conn, const std::vector<std::string>& ddl, const int iteration);

Status TestConcurrentIndex(const int num_iterations);
Status TestConcurrentIndex(PGConn* conn, const int num_iterations);

Status TestDml(const int num_iterations);
Status TestDml(PGConn* conn, const int num_iterations);

template<class... Args>
Result<bool> ExecuteFormatWithRetry(PGConn* conn, const std::string& format, Args&&... args) {
Expand Down Expand Up @@ -161,13 +161,12 @@ std::string PgDdlAtomicityStressTest::database() {
}

Status PgDdlAtomicityStressTest::TestDdl(
const std::vector<std::string>& ddls, const int num_iterations) {
auto conn = VERIFY_RESULT(Connect());
PGConn* conn, const std::vector<std::string>& ddls, const int num_iterations) {
for (int i = 0; i < num_iterations; ++i) {
for (const auto& ddl : ddls) {
auto stmt = Format(ddl, kTable, i);
LOG(INFO) << "Executing stmt " << stmt;
while (!VERIFY_RESULT(DoExecuteWithRetry(&conn, stmt))) {
while (!VERIFY_RESULT(DoExecuteWithRetry(conn, stmt))) {
LOG(INFO) << "Retry executing stmt " << stmt;
}
}
Expand Down Expand Up @@ -224,35 +223,33 @@ Result<bool> PgDdlAtomicityStressTest::DoExecuteWithRetry(PGConn* conn, const st
return s;
}

Status PgDdlAtomicityStressTest::TestConcurrentIndex(const int num_iterations) {
auto conn = VERIFY_RESULT(Connect());
Status PgDdlAtomicityStressTest::TestConcurrentIndex(PGConn* conn, const int num_iterations) {
for (int i = 0; i < num_iterations; ++i) {
bool index_created = false;
while (!index_created) {
// If concurrent index creation fails, it does not clean up the invalid index. Thus to
// make the statement idempotent, drop the index if the create index failed before retrying.
index_created = VERIFY_RESULT(ExecuteFormatWithRetry(
&conn, "CREATE INDEX idx_$0 ON $1(key)", i, kTable));
conn, "CREATE INDEX idx_$0 ON $1(key)", i, kTable));
if (!index_created) {
auto stmt = Format("DROP INDEX IF EXISTS idx_$0", i);
while (!VERIFY_RESULT(ExecuteFormatWithRetry(&conn, stmt))) {
while (!VERIFY_RESULT(ExecuteFormatWithRetry(conn, stmt))) {
LOG(INFO) << "Retry executing stmt " << stmt;
}
}
}
auto stmt = Format("DROP INDEX idx_$0", i);
while (!VERIFY_RESULT(ExecuteFormatWithRetry(&conn, stmt))) {
while (!VERIFY_RESULT(ExecuteFormatWithRetry(conn, stmt))) {
LOG(INFO) << "Retry executing stmt " << stmt;
}
}
return Status::OK();
}

Status PgDdlAtomicityStressTest::TestDml(const int num_iterations) {
auto conn = VERIFY_RESULT(Connect());
Status PgDdlAtomicityStressTest::TestDml(PGConn* conn, const int num_iterations) {
for (int i = 1; i <= num_iterations;) {
if (VERIFY_RESULT(ExecuteFormatWithRetry(
&conn, "UPDATE $0 SET value = 'value_$1' WHERE key = $1", kTable, i))) {
conn, "UPDATE $0 SET value = 'value_$1' WHERE key = $1", kTable, i))) {
++i;
}
}
Expand Down Expand Up @@ -288,45 +285,50 @@ TEST_P(PgDdlAtomicityStressTest, StressTest) {
// exists when it is executed. Each thread uses its own connection for its entire duration.

// Create a thread to add and drop columns.
thread_holder.AddThreadFunctor([this, num_iterations] {
auto conn1 = ASSERT_RESULT(Connect());
thread_holder.AddThreadFunctor([this, &conn1, num_iterations] {
std::vector<std::string> ddls = {
"ALTER TABLE $0 ADD COLUMN col_$1 TEXT",
"ALTER TABLE $0 DROP COLUMN col_$1"
};
ASSERT_OK(TestDdl(ddls, num_iterations));
ASSERT_OK(TestDdl(&conn1, ddls, num_iterations));
LOG(INFO) << "Thread to add and drop columns has completed";
});

// Create a thread to add and drop columns with default values.
thread_holder.AddThreadFunctor([this, num_iterations] {
auto conn2 = ASSERT_RESULT(Connect());
thread_holder.AddThreadFunctor([this, &conn2, num_iterations] {
std::vector<std::string> ddls = {
"ALTER TABLE $0 ADD COLUMN col_def_$1 TEXT DEFAULT 'def'",
"ALTER TABLE $0 DROP COLUMN col_def_$1"
};
ASSERT_OK(TestDdl(ddls, num_iterations));
ASSERT_OK(TestDdl(&conn2, ddls, num_iterations));
LOG(INFO) << "Thread to add and drop columns with default values has completed";
});

// Create a thread to create/drop an index on this table.
thread_holder.AddThreadFunctor([this, num_iterations] {
auto conn3 = ASSERT_RESULT(Connect());
thread_holder.AddThreadFunctor([this, &conn3, num_iterations] {
std::vector<std::string> ddls = {
"CREATE INDEX NONCONCURRENTLY non_concurrent_idx_$1 ON $0(key)",
"DROP INDEX non_concurrent_idx_$1"
};
ASSERT_OK(TestDdl(ddls, num_iterations));
ASSERT_OK(TestDdl(&conn3, ddls, num_iterations));
LOG(INFO) << "Thread to create/drop an index has completed";
});

// ConcurrentIndex is a very long running operation. Cleaning up a failed ConcurrentIndex is
// also a DDL, and this can be a very long running test. Reduce the number of iterations.
thread_holder.AddThreadFunctor([this, num_iterations] {
ASSERT_OK(TestConcurrentIndex(num_iterations / 2));
auto conn4 = ASSERT_RESULT(Connect());
thread_holder.AddThreadFunctor([this, &conn4, num_iterations] {
ASSERT_OK(TestConcurrentIndex(&conn4, num_iterations / 2));
LOG(INFO) << "Thread to run concurrent index has completed";
});

// Create a thread to update the rows on this table.
thread_holder.AddThreadFunctor([this, num_iterations] {
ASSERT_OK(TestDml(num_iterations));
auto conn5 = ASSERT_RESULT(Connect());
thread_holder.AddThreadFunctor([this, &conn5, num_iterations] {
ASSERT_OK(TestDml(&conn5, num_iterations));
LOG(INFO) << "Thread to update the rows has completed";
});

Expand Down

0 comments on commit bd4874b

Please sign in to comment.