Skip to content

Commit

Permalink
[#23251] YCQL, ASH: Fix CQL test failures with ASH
Browse files Browse the repository at this point in the history
Summary:
There seem to be some race conditions in CQL tests due
to D3169. It is not clear from the summary what that diff
was supposed to fix, so this diff reverts the code to
reading and writing `request_` non-atomically.

Running a mini cluster with both YSQL and YCQL seems
to be slow in TSAN mode. The CQL driver seems to time out
most of the time while connecting to the cluster. This
diff fixes this by running each test with either YSQL or
YCQL, but not both.

This diff also enables concurrent updates in some CQL
wait states, which fixes a few non-ASH tests that would
otherwise fail once ASH is enabled by default.
Jira: DB-12182

Test Plan: ./yb_build.sh --cxx-test wait_states-itest

Reviewers: amitanand, jason, hbhanawat

Reviewed By: amitanand

Subscribers: yql

Differential Revision: https://phorge.dev.yugabyte.com/D37312
  • Loading branch information
abhinab-yb committed Sep 2, 2024
1 parent aa31795 commit f77dd6a
Show file tree
Hide file tree
Showing 10 changed files with 153 additions and 60 deletions.
8 changes: 4 additions & 4 deletions src/yb/ash/wait_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,14 +270,14 @@ WaitStateInfo::WaitStateInfo()
: metadata_(AshMetadata{}) {}

void WaitStateInfo::set_code(WaitStateCode code, const char* location) {
if (FLAGS_TEST_trace_ash_wait_code_updates) {
auto prev_code = code_.exchange(code, std::memory_order_release);
if (FLAGS_TEST_trace_ash_wait_code_updates && prev_code != code) {
if (FLAGS_tracing_level >= 1) {
VTrace(1, yb::Format("$0 at $1", ash::ToString(code), location));
} else {
VTrace(0, yb::Format("$0", ash::ToString(code)));
}
}
code_ = code;
MaybeSleepForTests(this, code);
}

Expand Down Expand Up @@ -351,8 +351,8 @@ const WaitStateInfoPtr& WaitStateInfo::CurrentWaitState() {
}

void WaitStateInfo::EnableConcurrentUpdates() {
concurrent_updates_allowed_ = true;
if (FLAGS_TEST_trace_ash_wait_code_updates) {
auto old_value = concurrent_updates_allowed_.exchange(true, std::memory_order_release);
if (FLAGS_TEST_trace_ash_wait_code_updates && !old_value) {
VTrace(0, yb::Format("Enabling concurrent updates"));
}
}
Expand Down
142 changes: 110 additions & 32 deletions src/yb/integration-tests/wait_states-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ DECLARE_int32(rpc_workers_limit);
DECLARE_int64(transaction_abort_check_interval_ms);
DECLARE_uint64(transaction_manager_workers_limit);
DECLARE_bool(transactions_poll_check_aborted);
DECLARE_bool(enable_ysql);
DECLARE_bool(master_auto_run_initdb);

DECLARE_bool(TEST_ash_fetch_wait_states_for_raft_log);
DECLARE_bool(TEST_ash_fetch_wait_states_for_rocksdb_flush_and_compaction);
Expand Down Expand Up @@ -95,9 +97,36 @@ DEFINE_test_flag(bool, verify_pull, false,

namespace yb {

YB_DEFINE_ENUM(TestMode, (kYSQL)(kYCQL));

namespace {

// Maps a wait state code to the test mode used when looking for that code.
// The codes enumerated below select TestMode::kYCQL; every other code
// selects TestMode::kYSQL.
TestMode GetTestMode(ash::WaitStateCode code) {
  using ash::WaitStateCode;
  switch (code) {
    case WaitStateCode::kCreatingNewTablet:
    case WaitStateCode::kConsensusMeta_Flush:
    case WaitStateCode::kSaveRaftGroupMetadataToDisk:
    case WaitStateCode::kBackfillIndex_WaitForAFreeSlot:
    case WaitStateCode::kYCQL_Parse:
    case WaitStateCode::kYCQL_Read:
    case WaitStateCode::kYCQL_Write:
    case WaitStateCode::kYCQL_Analyze:
    case WaitStateCode::kYCQL_Execute:
      return TestMode::kYCQL;
    default:
      return TestMode::kYSQL;
  }
}

} // anonymous namespace

class WaitStateITest : public pgwrapper::PgMiniTestBase {
public:
WaitStateITest() = default;
WaitStateITest() : WaitStateITest(TestMode::kYSQL) {}
explicit WaitStateITest(TestMode test_mode)
: test_mode_(test_mode) {}

virtual ~WaitStateITest() = default;

size_t NumTabletServers() override { return 1; }
Expand All @@ -113,14 +142,19 @@ class WaitStateITest : public pgwrapper::PgMiniTestBase {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_collect_end_to_end_traces) = true;
pgwrapper::PgMiniTestBase::SetUp();

ASSERT_OK(EnsureClientCreated());
ASSERT_OK(StartCQLServer());
cql_driver_ = std::make_unique<CppCassandraDriver>(
std::vector<std::string>{cql_host_}, cql_port_, UsePartitionAwareRouting::kTrue);
if (test_mode_ == TestMode::kYCQL) {
ASSERT_OK(SetUpCQL());
}
};

std::unique_ptr<CppCassandraDriver> cql_driver_;
std::unique_ptr<cqlserver::CQLServer> cql_server_;
// Stops the query layer that matches this fixture's test mode, if it exists:
// the PG supervisor in YSQL mode, the CQL server in YCQL mode. Afterwards the
// YBMiniClusterTestBase teardown runs.
void DoTearDown() override {
  switch (test_mode_) {
    case TestMode::kYSQL:
      if (pg_supervisor_) {
        pg_supervisor_->Stop();
      }
      break;
    case TestMode::kYCQL:
      if (cql_server_) {
        cql_server_->Shutdown();
      }
      break;
  }
  YBMiniClusterTestBase::DoTearDown();
}

protected:
// Cassandra has an internal timeout of 12s for the control connection.
Expand All @@ -142,19 +176,57 @@ class WaitStateITest : public pgwrapper::PgMiniTestBase {
return result;
}

// Returns true when this fixture's test mode is TestMode::kYSQL.
bool IsSQLEnabled() {
  const bool ysql_mode = (test_mode_ == TestMode::kYSQL);
  return ysql_mode;
}

// Returns true when this fixture's test mode is TestMode::kYCQL.
bool IsCQLEnabled() {
  const bool ycql_mode = (test_mode_ == TestMode::kYCQL);
  return ycql_mode;
}

std::unique_ptr<CppCassandraDriver> cql_driver_;

private:
// Builds a CQL server for the first tablet server of the mini cluster and
// starts it. &cql_host_ and &cql_port_ are handed to the factory, presumably
// so it can report the address the server listens on (confirm against
// MakeCQLServerForTServer). Returns the status of starting the server.
Status StartCQLServer() {
  constexpr int kTServerIdx = 0;
  cql_server_ = CqlTestBase<MiniCluster>::MakeCQLServerForTServer(
      cluster_.get(), kTServerIdx, client_.get(), &cql_host_, &cql_port_);
  return cql_server_->Start();
}

// Starts the PG supervisor only when the fixture runs in YSQL mode; in any
// other mode this override does nothing.
void StartPgSupervisor(uint16_t pg_port, const int pg_ts_idx) override {
  if (test_mode_ != TestMode::kYSQL) {
    return;
  }
  pgwrapper::PgMiniTestBase::StartPgSupervisor(pg_port, pg_ts_idx);
}

// In YCQL mode, turns off the enable_ysql and master_auto_run_initdb flags.
// In any other mode, defers to the PgMiniTestBase implementation.
void EnableYSQLFlags() override {
  if (test_mode_ != TestMode::kYCQL) {
    pgwrapper::PgMiniTestBase::EnableYSQLFlags();
    return;
  }
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_ysql) = false;
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_master_auto_run_initdb) = false;
}

// Prepares the CQL side of the fixture: ensures the client is created, starts
// the CQL server, and then constructs the Cassandra driver against
// cql_host_ / cql_port_ with partition-aware routing enabled. Returns the
// first failing status, or OK when every step succeeds.
Status SetUpCQL() {
  RETURN_NOT_OK(EnsureClientCreated());
  RETURN_NOT_OK(StartCQLServer());
  std::vector<std::string> hosts{cql_host_};
  cql_driver_ = std::make_unique<CppCassandraDriver>(
      std::move(hosts), cql_port_, UsePartitionAwareRouting::kTrue);
  return Status::OK();
}

std::unique_ptr<cqlserver::CQLServer> cql_server_;
std::string cql_host_;
uint16_t cql_port_ = 0;
const TestMode test_mode_;
};

class WaitStateTestCheckMethodCounts : public WaitStateITest {
protected:
explicit WaitStateTestCheckMethodCounts(TestMode test_mode)
: WaitStateITest(test_mode) {}

void CreateTables();

virtual void LaunchWorkers(TestThreadHolder* thread_holder);
Expand Down Expand Up @@ -198,7 +270,7 @@ class WaitStateTestCheckMethodCounts : public WaitStateITest {
};

void WaitStateTestCheckMethodCounts::VerifyRowsFromASH() {
if (NumTabletServers() != 1) {
if (NumTabletServers() != 1 || !IsSQLEnabled()) {
return;
}

Expand Down Expand Up @@ -227,30 +299,30 @@ void WaitStateTestCheckMethodCounts::VerifyRowsFromASH() {

void WaitStateTestCheckMethodCounts::CreateCqlTables() {
std::atomic_bool is_done{false};
auto session = ASSERT_RESULT(CqlSessionWithRetries(is_done, __PRETTY_FUNCTION__));
auto session = ASSERT_RESULT(CqlSessionWithRetries(is_done, __PRETTY_FUNCTION__));;
ASSERT_OK(
session.ExecuteQuery("CREATE TABLE IF NOT EXISTS t (key INT PRIMARY KEY, value TEXT) WITH "
"transactions = { 'enabled' : true }"));
}

void WaitStateTestCheckMethodCounts::DoCqlWritesUntilStopped(std::atomic<bool>& stop) {
auto session_result = CqlSessionWithRetries(stop, __PRETTY_FUNCTION__);
auto session = ASSERT_RESULT(CqlSessionWithRetries(stop, __PRETTY_FUNCTION__));;
const auto kNumKeys = NumKeys();
for (int i = 0; !stop; i++) {
WARN_NOT_OK(
session_result->ExecuteQuery(yb::Format(
session.ExecuteQuery(yb::Format(
"INSERT INTO t (key, value) VALUES ($0, 'v-$1')", i % kNumKeys,
std::string(1000, 'A' + i % 26))),
"Insert failed");
}
}

void WaitStateTestCheckMethodCounts::DoCqlReadsUntilStopped(std::atomic<bool>& stop) {
auto session_result = CqlSessionWithRetries(stop, __PRETTY_FUNCTION__);
auto session = ASSERT_RESULT(CqlSessionWithRetries(stop, __PRETTY_FUNCTION__));;
const auto kNumKeys = NumKeys();
for (int i = 0; !stop; i++) {
WARN_NOT_OK(
session_result->ExecuteWithResult(
session.ExecuteWithResult(
yb::Format("SELECT value FROM t WHERE key = $0", i % kNumKeys)),
"Select failed");
}
Expand Down Expand Up @@ -301,7 +373,7 @@ size_t WaitStateTestCheckMethodCounts::GetMethodCount(const std::string& method)
}

void WaitStateTestCheckMethodCounts::LaunchWorkers(TestThreadHolder* thread_holder) {
if (enable_cql_) {
if (IsCQLEnabled()) {
for (int i = 0; i < NumWriterThreads(); i++) {
thread_holder->AddThreadFunctor(
[this, &stop = thread_holder->stop_flag()] { DoCqlWritesUntilStopped(stop); });
Expand All @@ -310,7 +382,7 @@ void WaitStateTestCheckMethodCounts::LaunchWorkers(TestThreadHolder* thread_hold
[this, &stop = thread_holder->stop_flag()] { DoCqlReadsUntilStopped(stop); });
}

if (enable_sql_) {
if (IsSQLEnabled()) {
for (int i = 0; i < NumWriterThreads(); i++) {
thread_holder->AddThreadFunctor(
[this, &stop = thread_holder->stop_flag()] { DoPgWritesUntilStopped(stop); });
Expand Down Expand Up @@ -378,8 +450,12 @@ void WaitStateTestCheckMethodCounts::DoAshCalls(std::atomic<bool>& stop) {
}

void WaitStateTestCheckMethodCounts::CreateTables() {
CreateCqlTables();
CreatePgTables();
if (IsCQLEnabled()) {
CreateCqlTables();
}
if (IsSQLEnabled()) {
CreatePgTables();
}
}

void WaitStateTestCheckMethodCounts::RunTestsAndFetchAshMethodCounts() {
Expand All @@ -402,6 +478,10 @@ void WaitStateTestCheckMethodCounts::RunTestsAndFetchAshMethodCounts() {
}

void WaitStateTestCheckMethodCounts::PrintRowsFromASH() {
if (!IsSQLEnabled()) {
return;
}

auto conn = ASSERT_RESULT(Connect());
std::vector<std::string> queries;
queries.push_back("SELECT count(*) FROM yb_active_session_history;");
Expand Down Expand Up @@ -476,10 +556,7 @@ bool WaitStateTestCheckMethodCounts::IsDone() {

class AshTestPg : public WaitStateTestCheckMethodCounts {
public:
AshTestPg() {
enable_cql_ = false;
enable_sql_ = true;
}
AshTestPg() : WaitStateTestCheckMethodCounts(TestMode::kYSQL) {}

protected:
void SetUp() override {
Expand All @@ -505,10 +582,7 @@ TEST_F_EX(WaitStateITest, AshPg, AshTestPg) {

class AshTestCql : public WaitStateTestCheckMethodCounts {
public:
AshTestCql() {
enable_cql_ = true;
enable_sql_ = false;
}
AshTestCql() : WaitStateTestCheckMethodCounts(TestMode::kYCQL) {}

protected:
void VerifyCountsUnlocked() REQUIRES(mutex_) override {
Expand All @@ -528,12 +602,17 @@ class AshTestCql : public WaitStateTestCheckMethodCounts {
}
};

TEST_F_EX(WaitStateITest, YB_DISABLE_TEST_IN_TSAN(AshCql), AshTestCql) {
TEST_F_EX(WaitStateITest, AshCql, AshTestCql) {
RunTestsAndFetchAshMethodCounts();
}

class AshTestWithCompactions : public WaitStateTestCheckMethodCounts {
public:
AshTestWithCompactions()
: AshTestWithCompactions(TestMode::kYSQL) {}
explicit AshTestWithCompactions(TestMode test_mode)
: WaitStateTestCheckMethodCounts(test_mode) {}

void LaunchWorkers(TestThreadHolder* thread_holder) override {
if (do_compactions_) {
thread_holder->AddThreadFunctor(
Expand Down Expand Up @@ -596,7 +675,8 @@ TEST_F_EX(WaitStateITest, AshFlushAndCompactions, AshTestWithCompactions) {
class AshTestVerifyOccurrenceBase : public AshTestWithCompactions {
public:
explicit AshTestVerifyOccurrenceBase(ash::WaitStateCode code)
: code_to_look_for_(code), verify_code_was_pulled_(GetAtomicFlag(&FLAGS_TEST_verify_pull)) {}
: AshTestWithCompactions(GetTestMode(code)),
code_to_look_for_(code), verify_code_was_pulled_(GetAtomicFlag(&FLAGS_TEST_verify_pull)) {}

void SetUp() override {
if (verify_code_was_pulled_ && ShouldSleepAtWaitCode()) {
Expand All @@ -611,7 +691,6 @@ class AshTestVerifyOccurrenceBase : public AshTestWithCompactions {
if (code_to_look_for_ == ash::WaitStateCode::kBackfillIndex_WaitForAFreeSlot) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_num_concurrent_backfills_allowed) = 1;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_slowdown_backfill_by_ms) = 20;
enable_sql_ = false;
}
if (code_to_look_for_ == ash::WaitStateCode::kRetryableRequests_SaveToDisk) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_flush_retryable_requests) = true;
Expand All @@ -624,7 +703,6 @@ class AshTestVerifyOccurrenceBase : public AshTestWithCompactions {

if (code_to_look_for_ == ash::WaitStateCode::kConflictResolution_WaitOnConflictingTxns) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_wait_queues) = true;
enable_cql_ = false;
}

const auto code_class = ash::Class(to_underlying(code_to_look_for_) >> YB_ASH_CLASS_POSITION);
Expand Down Expand Up @@ -723,7 +801,7 @@ class AshTestVerifyOccurrenceBase : public AshTestWithCompactions {
};

void AshTestVerifyOccurrenceBase::CreateIndexesUntilStopped(std::atomic<bool>& stop) {
auto session = ASSERT_RESULT(CqlSessionWithRetries(stop, __PRETTY_FUNCTION__));
auto session = ASSERT_RESULT(EstablishSession(cql_driver_.get()));
constexpr auto kNamespace = "test";
const client::YBTableName table_name(YQL_DATABASE_CQL, kNamespace, "t");
for (int i = 1; !stop; i++) {
Expand Down Expand Up @@ -819,7 +897,7 @@ INSTANTIATE_TEST_SUITE_P(
ash::WaitStateCode::kYBClient_LookingUpTablet
), WaitStateCodeToString);

TEST_P(AshTestVerifyOccurrence, YB_DISABLE_TEST_IN_TSAN(VerifyWaitStateEntered)) {
TEST_P(AshTestVerifyOccurrence, VerifyWaitStateEntered) {
RunTestsAndFetchAshMethodCounts();
}

Expand Down Expand Up @@ -872,7 +950,7 @@ INSTANTIATE_TEST_SUITE_P(
::testing::Bool()),
WaitStateCodeAndBoolToString);

TEST_P(AshTestWithPriorityQueue, YB_DISABLE_TEST_IN_TSAN(VerifyWaitStateEntered)) {
TEST_P(AshTestWithPriorityQueue, VerifyWaitStateEntered) {
RunTestsAndFetchAshMethodCounts();
}

Expand Down
6 changes: 6 additions & 0 deletions src/yb/yql/cql/cqlserver/cql_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ class CQLProcessor : public ql::QLProcessor {
public:
// Attaches this task to `processor` and captures the wait state that is
// current on the calling thread, so that Run() can pick it up later.
// Returns *this so the call can be chained.
ProcessRequestTask& Bind(CQLProcessor* processor) {
processor_ = processor;
// Remember the wait state that is current at bind time.
wait_state_ = ash::WaitStateInfo::CurrentWaitState();
// NOTE(review): presumably marks the current wait state as safe to update
// from more than one thread, since the task may run elsewhere -- confirm
// against the macro definition.
ASH_ENABLE_CONCURRENT_UPDATES();
return *this;
}

Expand All @@ -178,6 +180,9 @@ class CQLProcessor : public ql::QLProcessor {
void Run() override {
auto processor = processor_;
processor_ = nullptr;
auto wait_state = wait_state_;
wait_state_ = nullptr;
ADOPT_WAIT_STATE(wait_state);
std::unique_ptr<ql::CQLResponse> response(processor->ProcessRequest(*processor->request_));
if (response != nullptr) {
processor->SendResponse(*response);
Expand All @@ -187,6 +192,7 @@ class CQLProcessor : public ql::QLProcessor {
void Done(const Status& status) override {}

CQLProcessor* processor_ = nullptr;
ash::WaitStateInfoPtr wait_state_{nullptr};
};

friend class ProcessRequestTask;
Expand Down
4 changes: 0 additions & 4 deletions src/yb/yql/cql/cqlserver/cql_rpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,7 @@ void CQLInboundCall::RespondSuccess(const RefCntBuffer& buffer) {

void CQLInboundCall::GetCallDetails(rpc::RpcCallInProgressPB *call_in_progress_pb) const {
std::shared_ptr<const CQLRequest> request =
#ifdef THREAD_SANITIZER
request_;
#else
std::atomic_load_explicit(&request_, std::memory_order_acquire);
#endif
if (request == nullptr) {
return;
}
Expand Down
4 changes: 0 additions & 4 deletions src/yb/yql/cql/cqlserver/cql_rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,7 @@ class CQLInboundCall : public rpc::InboundCall {
void GetCallDetails(rpc::RpcCallInProgressPB *call_in_progress_pb) const;
void SetRequest(std::shared_ptr<const ql::CQLRequest> request, CQLServiceImpl* service_impl) {
service_impl_ = service_impl;
#ifdef THREAD_SANITIZER
request_ = request;
#else
std::atomic_store_explicit(&request_, request, std::memory_order_release);
#endif
}

size_t ObjectSize() const override { return sizeof(*this); }
Expand Down
Loading

0 comments on commit f77dd6a

Please sign in to comment.