diff --git a/src/yb/yql/pgwrapper/pg_ddl_atomicity_stress-test.cc b/src/yb/yql/pgwrapper/pg_ddl_atomicity_stress-test.cc index 891efe89e851..1b2a3d6afc6b 100644 --- a/src/yb/yql/pgwrapper/pg_ddl_atomicity_stress-test.cc +++ b/src/yb/yql/pgwrapper/pg_ddl_atomicity_stress-test.cc @@ -92,11 +92,11 @@ class PgDdlAtomicityStressTest return std::get<2>(GetParam()); } - Status TestDdl(const std::vector& ddl, const int iteration); + Status TestDdl(PGConn* conn, const std::vector& ddl, const int iteration); - Status TestConcurrentIndex(const int num_iterations); + Status TestConcurrentIndex(PGConn* conn, const int num_iterations); - Status TestDml(const int num_iterations); + Status TestDml(PGConn* conn, const int num_iterations); template Result ExecuteFormatWithRetry(PGConn* conn, const std::string& format, Args&&... args) { @@ -161,13 +161,12 @@ std::string PgDdlAtomicityStressTest::database() { } Status PgDdlAtomicityStressTest::TestDdl( - const std::vector& ddls, const int num_iterations) { - auto conn = VERIFY_RESULT(Connect()); + PGConn* conn, const std::vector& ddls, const int num_iterations) { for (int i = 0; i < num_iterations; ++i) { for (const auto& ddl : ddls) { auto stmt = Format(ddl, kTable, i); LOG(INFO) << "Executing stmt " << stmt; - while (!VERIFY_RESULT(DoExecuteWithRetry(&conn, stmt))) { + while (!VERIFY_RESULT(DoExecuteWithRetry(conn, stmt))) { LOG(INFO) << "Retry executing stmt " << stmt; } } @@ -224,35 +223,33 @@ Result PgDdlAtomicityStressTest::DoExecuteWithRetry(PGConn* conn, const st return s; } -Status PgDdlAtomicityStressTest::TestConcurrentIndex(const int num_iterations) { - auto conn = VERIFY_RESULT(Connect()); +Status PgDdlAtomicityStressTest::TestConcurrentIndex(PGConn* conn, const int num_iterations) { for (int i = 0; i < num_iterations; ++i) { bool index_created = false; while (!index_created) { // If concurrent index creation fails, it does not clean up the invalid index. Thus to // make the statement idempotent, drop the index if the create index failed before retrying. index_created = VERIFY_RESULT(ExecuteFormatWithRetry( - &conn, "CREATE INDEX idx_$0 ON $1(key)", i, kTable)); + conn, "CREATE INDEX idx_$0 ON $1(key)", i, kTable)); if (!index_created) { auto stmt = Format("DROP INDEX IF EXISTS idx_$0", i); - while (!VERIFY_RESULT(ExecuteFormatWithRetry(&conn, stmt))) { + while (!VERIFY_RESULT(ExecuteFormatWithRetry(conn, stmt))) { LOG(INFO) << "Retry executing stmt " << stmt; } } } auto stmt = Format("DROP INDEX idx_$0", i); - while (!VERIFY_RESULT(ExecuteFormatWithRetry(&conn, stmt))) { + while (!VERIFY_RESULT(ExecuteFormatWithRetry(conn, stmt))) { LOG(INFO) << "Retry executing stmt " << stmt; } } return Status::OK(); } -Status PgDdlAtomicityStressTest::TestDml(const int num_iterations) { - auto conn = VERIFY_RESULT(Connect()); +Status PgDdlAtomicityStressTest::TestDml(PGConn* conn, const int num_iterations) { for (int i = 1; i <= num_iterations;) { if (VERIFY_RESULT(ExecuteFormatWithRetry( - &conn, "UPDATE $0 SET value = 'value_$1' WHERE key = $1", kTable, i))) { + conn, "UPDATE $0 SET value = 'value_$1' WHERE key = $1", kTable, i))) { ++i; } } @@ -288,45 +285,50 @@ TEST_P(PgDdlAtomicityStressTest, StressTest) { // exists when it is executed. Each thread uses its own connection for its entire duration. // Create a thread to add and drop columns. - thread_holder.AddThreadFunctor([this, num_iterations] { + auto conn1 = ASSERT_RESULT(Connect()); + thread_holder.AddThreadFunctor([this, &conn1, num_iterations] { std::vector ddls = { "ALTER TABLE $0 ADD COLUMN col_$1 TEXT", "ALTER TABLE $0 DROP COLUMN col_$1" }; - ASSERT_OK(TestDdl(ddls, num_iterations)); + ASSERT_OK(TestDdl(&conn1, ddls, num_iterations)); LOG(INFO) << "Thread to add and drop columns has completed"; }); // Create a thread to add and drop columns with default values. - thread_holder.AddThreadFunctor([this, num_iterations] { + auto conn2 = ASSERT_RESULT(Connect()); + thread_holder.AddThreadFunctor([this, &conn2, num_iterations] { std::vector ddls = { "ALTER TABLE $0 ADD COLUMN col_def_$1 TEXT DEFAULT 'def'", "ALTER TABLE $0 DROP COLUMN col_def_$1" }; - ASSERT_OK(TestDdl(ddls, num_iterations)); + ASSERT_OK(TestDdl(&conn2, ddls, num_iterations)); LOG(INFO) << "Thread to add and drop columns with default values has completed"; }); // Create a thread to create/drop an index on this table. - thread_holder.AddThreadFunctor([this, num_iterations] { + auto conn3 = ASSERT_RESULT(Connect()); + thread_holder.AddThreadFunctor([this, &conn3, num_iterations] { std::vector ddls = { "CREATE INDEX NONCONCURRENTLY non_concurrent_idx_$1 ON $0(key)", "DROP INDEX non_concurrent_idx_$1" }; - ASSERT_OK(TestDdl(ddls, num_iterations)); + ASSERT_OK(TestDdl(&conn3, ddls, num_iterations)); LOG(INFO) << "Thread to create/drop an index has completed"; }); // ConcurrentIndex is a very long running operation. Cleaning up a failed ConcurrentIndex is // also a DDL, and this can be a very long running test. Reduce the number of iterations. - thread_holder.AddThreadFunctor([this, num_iterations] { - ASSERT_OK(TestConcurrentIndex(num_iterations / 2)); + auto conn4 = ASSERT_RESULT(Connect()); + thread_holder.AddThreadFunctor([this, &conn4, num_iterations] { + ASSERT_OK(TestConcurrentIndex(&conn4, num_iterations / 2)); LOG(INFO) << "Thread to run concurrent index has completed"; }); // Create a thread to update the rows on this table. - thread_holder.AddThreadFunctor([this, num_iterations] { - ASSERT_OK(TestDml(num_iterations)); + auto conn5 = ASSERT_RESULT(Connect()); + thread_holder.AddThreadFunctor([this, &conn5, num_iterations] { + ASSERT_OK(TestDml(&conn5, num_iterations)); LOG(INFO) << "Thread to update the rows has completed"; });