Skip to content

Commit

Permalink
[#20778] CDCSDK: Add retry logic over FlushTables calls in test
Browse files Browse the repository at this point in the history
Summary:
There were several instances of test runs failures with FlushTables client call timing out
caused by https://yugabyte.atlassian.net/browse/DB-9925. The fix for the root issue has been
landed as part of https://phorge.dev.yugabyte.com/D32215.

This diff will add retry function as an wrapper over FlushTables call to retry on specific error.
Jira: DB-9776

Test Plan: Jenkins: test regex: .*CDCSDK.*

Reviewers: skumar, stiwary, siddharth.shah

Reviewed By: skumar

Subscribers: ycdcxcluster

Differential Revision: https://phorge.dev.yugabyte.com/D32781
  • Loading branch information
Devansh Saxena committed Mar 7, 2024
1 parent fd3cfae commit 1f64b6e
Show file tree
Hide file tree
Showing 9 changed files with 204 additions and 177 deletions.
6 changes: 2 additions & 4 deletions src/yb/integration-tests/cdcsdk_before_image-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,7 @@ TEST_F(CDCSDKBeforeImageTest, YB_DISABLE_TEST_IN_TSAN(TestBeforeImageRetention))
LOG(INFO) << "Sleeping to expire files according to TTL (history retention prevents deletion)";
SleepFor(MonoDelta::FromSeconds(2));

ASSERT_OK(test_client()->FlushTables(
{table.table_id()}, /* add_indexes = */ false,
/* timeout_secs = */ kCompactionTimeoutSec, /* is_compaction = */ true));
ASSERT_OK(WaitForFlushTables({table.table_id()}, false, kCompactionTimeoutSec, true));

// The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE in that order.
const uint32_t expected_count[] = {1, 1, 2, 0, 0, 0};
Expand Down Expand Up @@ -1504,7 +1502,7 @@ TEST_F(CDCSDKBeforeImageTest, YB_DISABLE_TEST_IN_TSAN(TestMultipleTableAlterWith
ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_level0_file_num_compaction_trigger) = 0;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_tablet_enable_ttl_file_filter) = false;
constexpr int kCompactionTimeoutSec = 60;
ASSERT_OK(test_client()->FlushTables(
ASSERT_OK(WaitForFlushTables(
{table.table_id()}, /* add_indexes = */ false,
/* timeout_secs = */ kCompactionTimeoutSec, /* is_compaction = */ true));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1461,7 +1461,7 @@ TEST_F(CDCSDKConsistentSnapshotTest, TestReleaseResourcesOnUnpolledSplitTablets)
ASSERT_EQ(tablets.size(), num_tablets);

ASSERT_OK(WriteRowsHelper(100, 200, &test_cluster_, true));
ASSERT_OK(test_client()->FlushTables(
ASSERT_OK(WaitForFlushTables(
{table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30,
/* is_compaction = */ false));

Expand Down
28 changes: 14 additions & 14 deletions src/yb/integration-tests/cdcsdk_consistent_stream-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ TEST_F(CDCSDKConsistentStreamTest,
t1.join();
t2.join();

ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, false));
ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, false));

// The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE, BEGIN, COMMIT in
// that order.
Expand Down Expand Up @@ -132,8 +132,8 @@ TEST_F(CDCSDKConsistentStreamTest,
t3.join();
t4.join();

ASSERT_OK(test_client()->FlushTables({table1.table_id()}, false, 1000, false));
ASSERT_OK(test_client()->FlushTables({table2.table_id()}, false, 1000, false));
ASSERT_OK(WaitForFlushTables({table1.table_id()}, false, 1000, false));
ASSERT_OK(WaitForFlushTables({table2.table_id()}, false, 1000, false));

// The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE, BEGIN, COMMIT in
// that order.
Expand Down Expand Up @@ -266,7 +266,7 @@ TEST_F(CDCSDKConsistentStreamTest,
t3.join();
t4.join();

ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, false));
ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, false));

// The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE, BEGIN, COMMIT in
// that order.
Expand Down Expand Up @@ -368,7 +368,7 @@ TEST_F(CDCSDKConsistentStreamTest,
t3.join();
t4.join();

ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, false));
ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, false));

// The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE, BEGIN, COMMIT in
// that order.
Expand Down Expand Up @@ -451,7 +451,7 @@ TEST_F(CDCSDKConsistentStreamTest,
t3.join();
t4.join();

ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, false));
ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, false));

// The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE, BEGIN, COMMIT in
// that order.
Expand Down Expand Up @@ -557,8 +557,8 @@ TEST_F(CDCSDKConsistentStreamTest,
t3.join();
t4.join();

ASSERT_OK(test_client()->FlushTables({table1.table_id()}, false, 1000, false));
ASSERT_OK(test_client()->FlushTables({table2.table_id()}, false, 1000, false));
ASSERT_OK(WaitForFlushTables({table1.table_id()}, false, 1000, false));
ASSERT_OK(WaitForFlushTables({table2.table_id()}, false, 1000, false));

// The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE, BEGIN, COMMIT in
// that order.
Expand Down Expand Up @@ -619,7 +619,7 @@ void CDCSDKConsistentStreamTest::TestCDCSDKConsistentStreamWithTabletSplit(
t1.join();
t2.join();

ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, true));
ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, true));
ASSERT_OK(test_cluster_.mini_cluster_->CompactTablets());
WaitUntilSplitIsSuccesful(tablets.Get(0).tablet_id(), table);

Expand All @@ -635,7 +635,7 @@ void CDCSDKConsistentStreamTest::TestCDCSDKConsistentStreamWithTabletSplit(
t3.join();
t4.join();

ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, true));
ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, true));
ASSERT_OK(test_cluster_.mini_cluster_->CompactTablets());

google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets_after_first_split;
Expand Down Expand Up @@ -767,7 +767,7 @@ TEST_F(CDCSDKConsistentStreamTest,

// Committed transactions should change max_op_id.
ASSERT_OK(WriteRowsHelper(0, 100, &test_cluster_, true));
ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, true));
ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, true));

OpId historical_max_op_id;
ASSERT_OK(WaitFor(
Expand Down Expand Up @@ -815,7 +815,7 @@ TEST_F(CDCSDKConsistentStreamTest,
MonoDelta::FromSeconds(5),
"historical_max_op_id should change"));

ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, true));
ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, true));
ASSERT_OK(test_cluster_.mini_cluster_->CompactTablets());
WaitUntilSplitIsSuccesful(tablets.Get(0).tablet_id(), table);

Expand Down Expand Up @@ -946,7 +946,7 @@ TEST_F(CDCSDKConsistentStreamTest,
t1.join();
t2.join();

ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, false));
ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, false));

auto get_changes_resp = GetAllPendingChangesWithRandomReqSafeTimeChanges(stream_id, tablets);
std::unordered_set<int32_t> seen_unique_pk_values;
Expand Down Expand Up @@ -984,7 +984,7 @@ TEST_F(CDCSDKConsistentStreamTest,

// Commit another transaction while we still have the previous one open.
ASSERT_OK(WriteRowsHelper(100, 200, &test_cluster_, true));
ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, false));
ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, false));

uint32 seen_insert_records = 0;
auto update_insert_count = [&](const GetChangesResponsePB& change_resp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1029,7 +1029,7 @@ TEST_F(CDCSDKConsumptionConsistentChangesTest, TestCDCSDKConsistentStreamWithTab
t1.join();
t2.join();

ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, true));
ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, true));
ASSERT_OK(test_cluster_.mini_cluster_->CompactTablets());
// Split two tablets.
WaitUntilSplitIsSuccesful(tablets.Get(0).tablet_id(), table, 4);
Expand Down
10 changes: 5 additions & 5 deletions src/yb/integration-tests/cdcsdk_snapshot-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ TEST_F(CDCSDKYsqlTest, InsertSingleRowSnapshot) {
ASSERT_FALSE(set_resp.has_error());

ASSERT_OK(WriteRowsHelper(1 /* start */, 2 /* end */, &test_cluster_, true));
ASSERT_OK(test_client()->FlushTables(
ASSERT_OK(WaitForFlushTables(
{table.table_id()}, /* add_indexes = */ false, kFlushTimeoutSecs,
/* is_compaction = */ false));

Expand Down Expand Up @@ -115,7 +115,7 @@ TEST_F(CDCSDKYsqlTest, UpdateInsertedRowSnapshot) {
ASSERT_FALSE(set_resp.has_error());

ASSERT_OK(WriteRowsHelper(1 /* start */, 2 /* end */, &test_cluster_, true));
ASSERT_OK(test_client()->FlushTables(
ASSERT_OK(WaitForFlushTables(
{table.table_id()}, /* add_indexes = */ false, kFlushTimeoutSecs,
/* is_compaction = */ false));
ASSERT_OK(UpdateRows(1 /* key */, 1 /* value */, &test_cluster_));
Expand Down Expand Up @@ -152,7 +152,7 @@ TEST_F(CDCSDKYsqlTest, DeleteInsertedRowSnapshot) {
ASSERT_FALSE(set_resp.has_error());

ASSERT_OK(WriteRowsHelper(1 /* start */, 2 /* end */, &test_cluster_, true));
ASSERT_OK(test_client()->FlushTables(
ASSERT_OK(WaitForFlushTables(
{table.table_id()}, /* add_indexes = */ false, kFlushTimeoutSecs,
/* is_compaction = */ false));
ASSERT_OK(DeleteRows(1 /* key */, &test_cluster_));
Expand Down Expand Up @@ -749,7 +749,7 @@ TEST_F(CDCSDKYsqlTest, TestLeadershipChangeAndSnapshotAffectsCheckpoint) {

ASSERT_OK(WriteRowsHelper(0 /* start */, 200 /* end */, &test_cluster_, true));

ASSERT_OK(test_client()->FlushTables(
ASSERT_OK(WaitForFlushTables(
{table.table_id()}, /* add_indexes = */ false, kFlushTimeoutSecs,
/* is_compaction = */ true));
std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_aborted_intent_cleanup_ms));
Expand Down Expand Up @@ -896,7 +896,7 @@ TEST_F(CDCSDKYsqlTest, TestSnapshotNoData) {
ASSERT_EQ(change_resp.cdc_sdk_checkpoint().key(), "");

ASSERT_OK(WriteRows(1 /* start */, 1001 /* end */, &test_cluster_));
ASSERT_OK(test_client()->FlushTables(
ASSERT_OK(WaitForFlushTables(
{table.table_id()}, /* add_indexes = */ false, kFlushTimeoutSecs,
/* is_compaction = */ false));

Expand Down
Loading

0 comments on commit 1f64b6e

Please sign in to comment.