From 1f64b6ed7f3c93f69416d75ddffaec403dd3f76d Mon Sep 17 00:00:00 2001 From: Devansh Saxena Date: Wed, 6 Mar 2024 22:59:37 +0530 Subject: [PATCH] [#20778] CDCSDK: Add retry logic over FlushTables calls in test Summary: There were several instances of test runs failures with FlushTables client call timing out caused by https://yugabyte.atlassian.net/browse/DB-9925. The fix for the root issue has been landed as part of https://phorge.dev.yugabyte.com/D32215. This diff will add retry function as an wrapper over FlushTables call to retry on specific error. Jira: DB-9776 Test Plan: Jenkins: test regex: .*CDCSDK.* Reviewers: skumar, stiwary, siddharth.shah Reviewed By: skumar Subscribers: ycdcxcluster Differential Revision: https://phorge.dev.yugabyte.com/D32781 --- .../cdcsdk_before_image-test.cc | 6 +- .../cdcsdk_consistent_snapshot-test.cc | 2 +- .../cdcsdk_consistent_stream-test.cc | 28 +-- ...sdk_consumption_consistent_changes-test.cc | 2 +- .../integration-tests/cdcsdk_snapshot-test.cc | 10 +- .../cdcsdk_tablet_split-test.cc | 98 +++++----- src/yb/integration-tests/cdcsdk_ysql-test.cc | 184 +++++++++--------- .../cdcsdk_ysql_test_base.cc | 47 +++-- .../integration-tests/cdcsdk_ysql_test_base.h | 4 + 9 files changed, 204 insertions(+), 177 deletions(-) diff --git a/src/yb/integration-tests/cdcsdk_before_image-test.cc b/src/yb/integration-tests/cdcsdk_before_image-test.cc index 3dcf958df7c3..f255d9a8e7de 100644 --- a/src/yb/integration-tests/cdcsdk_before_image-test.cc +++ b/src/yb/integration-tests/cdcsdk_before_image-test.cc @@ -94,9 +94,7 @@ TEST_F(CDCSDKBeforeImageTest, YB_DISABLE_TEST_IN_TSAN(TestBeforeImageRetention)) LOG(INFO) << "Sleeping to expire files according to TTL (history retention prevents deletion)"; SleepFor(MonoDelta::FromSeconds(2)); - ASSERT_OK(test_client()->FlushTables( - {table.table_id()}, /* add_indexes = */ false, - /* timeout_secs = */ kCompactionTimeoutSec, /* is_compaction = */ true)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, kCompactionTimeoutSec, true)); // The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE in that order. const uint32_t expected_count[] = {1, 1, 2, 0, 0, 0}; @@ -1504,7 +1502,7 @@ TEST_F(CDCSDKBeforeImageTest, YB_DISABLE_TEST_IN_TSAN(TestMultipleTableAlterWith ANNOTATE_UNPROTECTED_WRITE(FLAGS_rocksdb_level0_file_num_compaction_trigger) = 0; ANNOTATE_UNPROTECTED_WRITE(FLAGS_tablet_enable_ttl_file_filter) = false; constexpr int kCompactionTimeoutSec = 60; - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ kCompactionTimeoutSec, /* is_compaction = */ true)); diff --git a/src/yb/integration-tests/cdcsdk_consistent_snapshot-test.cc b/src/yb/integration-tests/cdcsdk_consistent_snapshot-test.cc index 49474eb7df7c..a71d36d9d1cd 100644 --- a/src/yb/integration-tests/cdcsdk_consistent_snapshot-test.cc +++ b/src/yb/integration-tests/cdcsdk_consistent_snapshot-test.cc @@ -1461,7 +1461,7 @@ TEST_F(CDCSDKConsistentSnapshotTest, TestReleaseResourcesOnUnpolledSplitTablets) ASSERT_EQ(tablets.size(), num_tablets); ASSERT_OK(WriteRowsHelper(100, 200, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); diff --git a/src/yb/integration-tests/cdcsdk_consistent_stream-test.cc b/src/yb/integration-tests/cdcsdk_consistent_stream-test.cc index b6fbd9429f94..738fa6f59463 100644 --- a/src/yb/integration-tests/cdcsdk_consistent_stream-test.cc +++ b/src/yb/integration-tests/cdcsdk_consistent_stream-test.cc @@ -50,7 +50,7 @@ TEST_F(CDCSDKConsistentStreamTest, t1.join(); t2.join(); - ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, false)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, false)); // The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE, BEGIN, COMMIT in // that order. @@ -132,8 +132,8 @@ TEST_F(CDCSDKConsistentStreamTest, t3.join(); t4.join(); - ASSERT_OK(test_client()->FlushTables({table1.table_id()}, false, 1000, false)); - ASSERT_OK(test_client()->FlushTables({table2.table_id()}, false, 1000, false)); + ASSERT_OK(WaitForFlushTables({table1.table_id()}, false, 1000, false)); + ASSERT_OK(WaitForFlushTables({table2.table_id()}, false, 1000, false)); // The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE, BEGIN, COMMIT in // that order. @@ -266,7 +266,7 @@ TEST_F(CDCSDKConsistentStreamTest, t3.join(); t4.join(); - ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, false)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, false)); // The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE, BEGIN, COMMIT in // that order. @@ -368,7 +368,7 @@ TEST_F(CDCSDKConsistentStreamTest, t3.join(); t4.join(); - ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, false)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, false)); // The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE, BEGIN, COMMIT in // that order. @@ -451,7 +451,7 @@ TEST_F(CDCSDKConsistentStreamTest, t3.join(); t4.join(); - ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, false)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, false)); // The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE, BEGIN, COMMIT in // that order. @@ -557,8 +557,8 @@ TEST_F(CDCSDKConsistentStreamTest, t3.join(); t4.join(); - ASSERT_OK(test_client()->FlushTables({table1.table_id()}, false, 1000, false)); - ASSERT_OK(test_client()->FlushTables({table2.table_id()}, false, 1000, false)); + ASSERT_OK(WaitForFlushTables({table1.table_id()}, false, 1000, false)); + ASSERT_OK(WaitForFlushTables({table2.table_id()}, false, 1000, false)); // The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE, BEGIN, COMMIT in // that order. @@ -619,7 +619,7 @@ void CDCSDKConsistentStreamTest::TestCDCSDKConsistentStreamWithTabletSplit( t1.join(); t2.join(); - ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, true)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, true)); ASSERT_OK(test_cluster_.mini_cluster_->CompactTablets()); WaitUntilSplitIsSuccesful(tablets.Get(0).tablet_id(), table); @@ -635,7 +635,7 @@ void CDCSDKConsistentStreamTest::TestCDCSDKConsistentStreamWithTabletSplit( t3.join(); t4.join(); - ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, true)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, true)); ASSERT_OK(test_cluster_.mini_cluster_->CompactTablets()); google::protobuf::RepeatedPtrField tablets_after_first_split; @@ -767,7 +767,7 @@ TEST_F(CDCSDKConsistentStreamTest, // Committed transactions should change max_op_id. ASSERT_OK(WriteRowsHelper(0, 100, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, true)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, true)); OpId historical_max_op_id; ASSERT_OK(WaitFor( @@ -815,7 +815,7 @@ TEST_F(CDCSDKConsistentStreamTest, MonoDelta::FromSeconds(5), "historical_max_op_id should change")); - ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, true)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, true)); ASSERT_OK(test_cluster_.mini_cluster_->CompactTablets()); WaitUntilSplitIsSuccesful(tablets.Get(0).tablet_id(), table); @@ -946,7 +946,7 @@ TEST_F(CDCSDKConsistentStreamTest, t1.join(); t2.join(); - ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, false)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, false)); auto get_changes_resp = GetAllPendingChangesWithRandomReqSafeTimeChanges(stream_id, tablets); std::unordered_set seen_unique_pk_values; @@ -984,7 +984,7 @@ TEST_F(CDCSDKConsistentStreamTest, // Commit another transaction while we still have the previous one open. ASSERT_OK(WriteRowsHelper(100, 200, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, false)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, false)); uint32 seen_insert_records = 0; auto update_insert_count = [&](const GetChangesResponsePB& change_resp) { diff --git a/src/yb/integration-tests/cdcsdk_consumption_consistent_changes-test.cc b/src/yb/integration-tests/cdcsdk_consumption_consistent_changes-test.cc index c8d2bf660842..f7c537255dac 100644 --- a/src/yb/integration-tests/cdcsdk_consumption_consistent_changes-test.cc +++ b/src/yb/integration-tests/cdcsdk_consumption_consistent_changes-test.cc @@ -1029,7 +1029,7 @@ TEST_F(CDCSDKConsumptionConsistentChangesTest, TestCDCSDKConsistentStreamWithTab t1.join(); t2.join(); - ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, true)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, true)); ASSERT_OK(test_cluster_.mini_cluster_->CompactTablets()); // Split two tablets. WaitUntilSplitIsSuccesful(tablets.Get(0).tablet_id(), table, 4); diff --git a/src/yb/integration-tests/cdcsdk_snapshot-test.cc b/src/yb/integration-tests/cdcsdk_snapshot-test.cc index 6686c95ab72c..fa6952bf6e96 100644 --- a/src/yb/integration-tests/cdcsdk_snapshot-test.cc +++ b/src/yb/integration-tests/cdcsdk_snapshot-test.cc @@ -79,7 +79,7 @@ TEST_F(CDCSDKYsqlTest, InsertSingleRowSnapshot) { ASSERT_FALSE(set_resp.has_error()); ASSERT_OK(WriteRowsHelper(1 /* start */, 2 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, kFlushTimeoutSecs, /* is_compaction = */ false)); @@ -115,7 +115,7 @@ TEST_F(CDCSDKYsqlTest, UpdateInsertedRowSnapshot) { ASSERT_FALSE(set_resp.has_error()); ASSERT_OK(WriteRowsHelper(1 /* start */, 2 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, kFlushTimeoutSecs, /* is_compaction = */ false)); ASSERT_OK(UpdateRows(1 /* key */, 1 /* value */, &test_cluster_)); @@ -152,7 +152,7 @@ TEST_F(CDCSDKYsqlTest, DeleteInsertedRowSnapshot) { ASSERT_FALSE(set_resp.has_error()); ASSERT_OK(WriteRowsHelper(1 /* start */, 2 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, kFlushTimeoutSecs, /* is_compaction = */ false)); ASSERT_OK(DeleteRows(1 /* key */, &test_cluster_)); @@ -749,7 +749,7 @@ TEST_F(CDCSDKYsqlTest, TestLeadershipChangeAndSnapshotAffectsCheckpoint) { ASSERT_OK(WriteRowsHelper(0 /* start */, 200 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, kFlushTimeoutSecs, /* is_compaction = */ true)); std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_aborted_intent_cleanup_ms)); @@ -896,7 +896,7 @@ TEST_F(CDCSDKYsqlTest, TestSnapshotNoData) { ASSERT_EQ(change_resp.cdc_sdk_checkpoint().key(), ""); ASSERT_OK(WriteRows(1 /* start */, 1001 /* end */, &test_cluster_)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, kFlushTimeoutSecs, /* is_compaction = */ false)); diff --git a/src/yb/integration-tests/cdcsdk_tablet_split-test.cc b/src/yb/integration-tests/cdcsdk_tablet_split-test.cc index 1e4844e6fa85..0d5f1c1a270e 100644 --- a/src/yb/integration-tests/cdcsdk_tablet_split-test.cc +++ b/src/yb/integration-tests/cdcsdk_tablet_split-test.cc @@ -152,7 +152,7 @@ TEST_F(CDCSDKTabletSplitTest, YB_DISABLE_TEST_IN_TSAN(TestTabletSplitWithBeforeI ASSERT_OK(conn.ExecuteFormat( "UPDATE $0 SET $1 = 4 WHERE $2 = 1", kTableName, kValue2ColumnName, kKeyColumnName)); SleepFor(MonoDelta::FromSeconds(2)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -300,11 +300,11 @@ void CDCSDKTabletSplitTest::TestCheckpointPersistencyAfterTabletSplit( ASSERT_FALSE(resp.has_error()); ASSERT_OK(WriteRowsHelper(100, 200, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); ASSERT_OK(WriteRowsHelper(200, 300, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -364,7 +364,7 @@ void CDCSDKTabletSplitTest::TestTransactionInsertAfterTabletSplit( ASSERT_FALSE(resp.has_error()); ASSERT_OK(WriteRowsHelper(0, 200, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_aborted_intent_cleanup_ms)); @@ -395,7 +395,7 @@ void CDCSDKTabletSplitTest::TestTransactionInsertAfterTabletSplit( MonoDelta::FromSeconds(90), "GetChanges did not report error for tablet split")); ASSERT_OK(WriteRowsHelper(200, 300, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -432,7 +432,7 @@ void CDCSDKTabletSplitTest::TestGetChangesReportsTabletSplitErrorOnRetries( for (int i = 1; i <= 50; i++) { ASSERT_OK(WriteRowsHelper(i * 100, (i + 1) * 100, &test_cluster_, true)); } - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); @@ -496,7 +496,7 @@ void CDCSDKTabletSplitTest::TestGetChangesAfterTabletSplitWithMasterShutdown( TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); ASSERT_OK(WriteRowsHelper(0, 200, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_aborted_intent_cleanup_ms)); @@ -553,7 +553,7 @@ void CDCSDKTabletSplitTest::TestGetChangesOnChildrenOnSplit(CDCCheckpointType ch TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); ASSERT_OK(WriteRowsHelper(1, 200, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_aborted_intent_cleanup_ms)); @@ -603,7 +603,7 @@ void CDCSDKTabletSplitTest::TestGetChangesOnParentTabletAfterTabletSplit( TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); ASSERT_OK(WriteRowsHelper(0, 200, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_aborted_intent_cleanup_ms)); @@ -664,7 +664,7 @@ void CDCSDKTabletSplitTest::TestGetChangesMultipleStreamsTabletSplit( TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); ASSERT_OK(WriteRowsHelper(0, 100, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); @@ -673,7 +673,7 @@ void CDCSDKTabletSplitTest::TestGetChangesMultipleStreamsTabletSplit( ASSERT_RESULT(GetChangesFromCDC(stream_id_1, tablets, &change_resp_1.cdc_sdk_checkpoint())); ASSERT_OK(WriteRowsHelper(100, 200, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); @@ -731,7 +731,7 @@ void CDCSDKTabletSplitTest::TestSetCDCCheckpointAfterTabletSplit( xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStreamBasedOnCheckpointType(checkpoint_type)); ASSERT_OK(WriteRowsHelper(0, 1000, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_aborted_intent_cleanup_ms)); @@ -773,7 +773,7 @@ TEST_F(CDCSDKTabletSplitTest, YB_DISABLE_TEST_IN_TSAN(TestTabletSplitBeforeBoots TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); ASSERT_OK(WriteRowsHelper(1, 200, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_aborted_intent_cleanup_ms)); @@ -844,7 +844,7 @@ void CDCSDKTabletSplitTest::TestCDCStateTableAfterTabletSplit(CDCCheckpointType TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); ASSERT_OK(WriteRowsHelper(1, 200, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_aborted_intent_cleanup_ms)); @@ -909,7 +909,7 @@ TEST_F(CDCSDKTabletSplitTest, YB_DISABLE_TEST_IN_TSAN(TestCDCStateTableAfterTabl TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); ASSERT_OK(WriteRowsHelper(0, 200, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_aborted_intent_cleanup_ms)); @@ -996,7 +996,7 @@ void CDCSDKTabletSplitTest::TestGetTabletListToPollForCDCAfterTabletSplitReporte TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); ASSERT_OK(WriteRowsHelper(0, 200, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_aborted_intent_cleanup_ms)); @@ -1078,7 +1078,7 @@ void CDCSDKTabletSplitTest::TestGetTabletListToPollForCDCBeforeTabletSplitReport TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); ASSERT_OK(WriteRowsHelper(1, 200, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_aborted_intent_cleanup_ms)); @@ -1120,7 +1120,7 @@ void CDCSDKTabletSplitTest::TestGetTabletListToPollForCDCBootstrapWithTabletSpli TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); ASSERT_OK(WriteRowsHelper(1, 200, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); ASSERT_OK(test_cluster_.mini_cluster_->CompactTablets()); @@ -1170,7 +1170,7 @@ void CDCSDKTabletSplitTest::TestGetTabletListToPollForCDCBootstrapWithTwoTabletS TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); ASSERT_OK(WriteRowsHelper(1, 200, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); ASSERT_OK(test_cluster_.mini_cluster_->CompactTablets()); @@ -1179,7 +1179,7 @@ void CDCSDKTabletSplitTest::TestGetTabletListToPollForCDCBootstrapWithTwoTabletS LOG(INFO) << "First tablet split succeded on tablet: " << tablets[0].tablet_id(); ASSERT_OK(WriteRowsHelper(200, 400, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); ASSERT_OK(test_cluster_.mini_cluster_->CompactTablets()); @@ -1231,7 +1231,7 @@ void CDCSDKTabletSplitTest::TestGetTabletListToPollForCDCWithTwoTabletSplits( TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); ASSERT_OK(WriteRowsHelper(0, 200, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); ASSERT_OK(test_cluster_.mini_cluster_->CompactTablets()); @@ -1239,7 +1239,7 @@ void CDCSDKTabletSplitTest::TestGetTabletListToPollForCDCWithTwoTabletSplits( WaitUntilSplitIsSuccesful(tablets.Get(0).tablet_id(), table); ASSERT_OK(WriteRowsHelper(200, 400, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); ASSERT_OK(test_cluster_.mini_cluster_->CompactTablets()); @@ -1350,7 +1350,7 @@ void CDCSDKTabletSplitTest::TestTabletSplitOnAddedTableForCDC(CDCCheckpointType ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets_2, &change_resp.cdc_sdk_checkpoint())); ASSERT_OK(WriteRowsHelper(0, 200, &test_cluster_, true, 2, "test_table_1")); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table_2_id}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); ASSERT_OK(test_cluster_.mini_cluster_->CompactTablets()); @@ -1416,7 +1416,7 @@ void CDCSDKTabletSplitTest::TestTabletSplitOnAddedTableForCDCWithMasterRestart( ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets_2, &change_resp.cdc_sdk_checkpoint())); ASSERT_OK(WriteRowsHelper(1, 200, &test_cluster_, true, 2, "test_table_1")); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table_2_id}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); ASSERT_OK(test_cluster_.mini_cluster_->CompactTablets()); @@ -1519,8 +1519,8 @@ void CDCSDKTabletSplitTest::TestTransactionCommitAfterTabletSplit( ASSERT_OK(conn.Execute("BEGIN")); ASSERT_OK(WriteRowsHelper(0, 200, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( - {table}, /* add_indexes = */ false, /* timeout_secs = */ 30, + ASSERT_OK(WaitForFlushTables( + {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); ASSERT_OK(test_cluster_.mini_cluster_->CompactTablets()); @@ -1543,7 +1543,7 @@ void CDCSDKTabletSplitTest::TestTransactionCommitAfterTabletSplit( // Commit the trasaction after the tablet split. ASSERT_OK(conn.Execute("COMMIT")); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -1604,7 +1604,7 @@ void CDCSDKTabletSplitTest::TestTabletSplitBeforeBootstrapGetCheckpoint( TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); ASSERT_OK(WriteRowsHelper(1, 200, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_aborted_intent_cleanup_ms)); @@ -1646,7 +1646,7 @@ void CDCSDKTabletSplitTest::TestGetTabletListToPollForCDCWithOnlyOnePolledChild( TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); ASSERT_OK(WriteRowsHelper(1, 200, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_aborted_intent_cleanup_ms)); @@ -1711,12 +1711,12 @@ void CDCSDKTabletSplitTest::TestRecordCountsAfterMultipleTabletSplits( TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); ASSERT_OK(WriteRows(0, 200, &test_cluster_)); - ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 100, false)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 100, false)); WaitUntilSplitIsSuccesful(tablets.Get(0).tablet_id(), table); ASSERT_OK(WriteRows(200, 400, &test_cluster_)); - ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 100, false)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 100, false)); google::protobuf::RepeatedPtrField tablets_after_first_split; ASSERT_OK(test_client()->GetTablets(table, 0, &tablets_after_first_split, nullptr)); @@ -1726,7 +1726,7 @@ void CDCSDKTabletSplitTest::TestRecordCountsAfterMultipleTabletSplits( WaitUntilSplitIsSuccesful(tablets_after_first_split.Get(1).tablet_id(), table, 4); ASSERT_OK(WriteRows(400, 600, &test_cluster_)); - ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 100, false)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 100, false)); google::protobuf::RepeatedPtrField tablets_after_third_split; ASSERT_OK(test_client()->GetTablets(table, 0, &tablets_after_third_split, nullptr)); @@ -1735,7 +1735,7 @@ void CDCSDKTabletSplitTest::TestRecordCountsAfterMultipleTabletSplits( WaitUntilSplitIsSuccesful(tablets_after_third_split.Get(1).tablet_id(), table, 5); ASSERT_OK(WriteRows(600, 1000, &test_cluster_)); - ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 100, false)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 100, false)); const int expected_total_records = 1000; std::map tablet_to_checkpoint; @@ -1767,12 +1767,12 @@ TEST_F(CDCSDKTabletSplitTest, YB_DISABLE_TEST_IN_TSAN(TestSplitAfterSplit)) { TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); ASSERT_OK(WriteRows(0, 200, &test_cluster_)); - ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 100, false)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 100, false)); WaitUntilSplitIsSuccesful(tablets.Get(0).tablet_id(), table); ASSERT_OK(WriteRows(200, 400, &test_cluster_)); - ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 100, false)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 100, false)); google::protobuf::RepeatedPtrField tablets_after_first_split; ASSERT_OK(test_client()->GetTablets(table, 0, &tablets_after_first_split, nullptr)); @@ -1782,7 +1782,7 @@ TEST_F(CDCSDKTabletSplitTest, YB_DISABLE_TEST_IN_TSAN(TestSplitAfterSplit)) { WaitUntilSplitIsSuccesful(tablets_after_first_split.Get(1).tablet_id(), table, 4); // Don't insert anything, just split the tablets further - ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 100, false)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 100, false)); google::protobuf::RepeatedPtrField tablets_after_third_split; ASSERT_OK(test_client()->GetTablets(table, 0, &tablets_after_third_split, nullptr)); @@ -1854,12 +1854,12 @@ void CDCSDKTabletSplitTest::TestRecordCountAfterMultipleTabletSplitsInMultiNodeC TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); ASSERT_OK(WriteRows(0, 200, &test_cluster_)); - ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 100, false)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 100, false)); WaitUntilSplitIsSuccesful(tablets.Get(0).tablet_id(), table); ASSERT_OK(WriteRows(200, 400, &test_cluster_)); - ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 100, false)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 100, false)); google::protobuf::RepeatedPtrField tablets_after_first_split; ASSERT_OK(test_client()->GetTablets(table, 0, &tablets_after_first_split, nullptr)); @@ -1887,6 +1887,7 @@ void CDCSDKTabletSplitTest::TestStreamMetaDataCleanupDropTableAfterTabletSplit( const vector table_list_suffix = {"_1", "_2", "_3"}; const int kNumTables = 3; vector table(kNumTables); + vector table_ids(kNumTables); int idx = 0; vector> tablets(kNumTables); vector tablet_ids_before_split; @@ -1900,6 +1901,7 @@ void CDCSDKTabletSplitTest::TestStreamMetaDataCleanupDropTableAfterTabletSplit( ASSERT_OK(WriteEnumsRows( 0 /* start */, 100 /* end */, &test_cluster_, table_suffix, kNamespaceName, kTableName)); + table_ids[idx] = table[idx].table_id(); idx += 1; } auto stream_id = ASSERT_RESULT(CreateDBStreamBasedOnCheckpointType(checkpoint_type)); @@ -1915,8 +1917,8 @@ void CDCSDKTabletSplitTest::TestStreamMetaDataCleanupDropTableAfterTabletSplit( 100 /* start */, 200 /* end */, &test_cluster_, table_list_suffix[0], kNamespaceName, kTableName)); - ASSERT_OK(test_client()->FlushTables( - table, /* add_indexes = */ false, /* timeout_secs = */ 30, + ASSERT_OK(WaitForFlushTables( + table_ids, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_aborted_intent_cleanup_ms)); ASSERT_OK(test_cluster_.mini_cluster_->CompactTablets()); @@ -2007,7 +2009,7 @@ void CDCSDKTabletSplitTest::TestGetTabletListToPollForCDCWithTabletId( TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); ASSERT_OK(WriteRowsHelper(0, 200, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_aborted_intent_cleanup_ms)); @@ -2068,6 +2070,7 @@ void CDCSDKTabletSplitTest::TestCleanUpCDCStreamsMetadataDuringTabletSplit( const vector table_list_suffix = {"_1", "_2"}; const int kNumTables = 2; vector table(kNumTables); + vector table_ids(kNumTables); int idx = 0; vector> tablets(kNumTables); @@ -2078,6 +2081,7 @@ void CDCSDKTabletSplitTest::TestCleanUpCDCStreamsMetadataDuringTabletSplit( table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr)); ASSERT_OK(WriteEnumsRows( 0 /* start */, 100 /* end */, &test_cluster_, table_suffix, kNamespaceName, kTableName)); + table_ids[idx] = table[idx].table_id(); idx += 1; } auto stream_id = ASSERT_RESULT(CreateDBStreamBasedOnCheckpointType(checkpoint_type)); @@ -2086,8 +2090,8 @@ void CDCSDKTabletSplitTest::TestCleanUpCDCStreamsMetadataDuringTabletSplit( // remains in the cdc_state table after the split is complete. auto resp = ASSERT_RESULT(SetCDCCheckpoint(stream_id, tablets[0])); ASSERT_FALSE(resp.has_error()); - ASSERT_OK(test_client()->FlushTables( - table, /* add_indexes = */ false, /* timeout_secs = */ 30, + ASSERT_OK(WaitForFlushTables( + table_ids, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); std::this_thread::sleep_for(std::chrono::milliseconds(FLAGS_aborted_intent_cleanup_ms)); ASSERT_OK(test_cluster_.mini_cluster_->CompactTablets()); @@ -2198,8 +2202,8 @@ TEST_F(CDCSDKTabletSplitTest, TestTabletSplitDuringConsistentSnapshot) { if (do_tablet_split) { // ASSERT_OK(test_cluster_.mini_cluster_->CompactTablets()); - ASSERT_OK(test_client()->FlushTables( - {table}, /* add_indexes = */ false, /* timeout_secs = */ 30, + ASSERT_OK(WaitForFlushTables( + {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); WaitUntilSplitIsSuccesful(tablets.Get(0).tablet_id(), table); LOG(INFO) << "Tablet split succeded"; @@ -2225,8 +2229,8 @@ TEST_F(CDCSDKTabletSplitTest, TestTabletSplitAfterConsistentSnapshotStreamCreati xrepl::StreamId stream_id = ASSERT_RESULT(CreateConsistentSnapshotStream()); - ASSERT_OK(test_client()->FlushTables( - {table}, /* add_indexes = */ false, /* timeout_secs = */ 30, + ASSERT_OK(WaitForFlushTables( + {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); WaitUntilSplitIsSuccesful(tablets.Get(0).tablet_id(), table); LOG(INFO) << "Tablet split succeded"; diff --git a/src/yb/integration-tests/cdcsdk_ysql-test.cc b/src/yb/integration-tests/cdcsdk_ysql-test.cc index 3901e884c08c..4bc9c745db21 100644 --- a/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -178,7 +178,7 @@ void CDCSDKYsqlTest::TestCDCLagMetric(CDCCheckpointType checkpoint_type) { // Insert test rows, one at a time so they have different hybrid times. ASSERT_OK(WriteRowsHelper(0, 1, &test_cluster_, true)); ASSERT_OK(WriteRowsHelper(1, 2, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -203,7 +203,7 @@ void CDCSDKYsqlTest::TestCDCLagMetric(CDCCheckpointType checkpoint_type) { SleepFor(MonoDelta::FromSeconds(5)); ASSERT_OK(WriteRowsHelper(3, 4, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -621,7 +621,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(MultiColumnUpdateFollowedByUpdate ASSERT_OK(UpdateRowsHelper( 1 /* start */, 2 /* end */, &test_cluster_, true, 1, col_val_map1, col_val_map2, num_cols)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -677,7 +677,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(MultiColumnUpdateFollowedByDelete ASSERT_OK(UpdateDeleteRowsHelper( 1 /* start */, 2 /* end */, &test_cluster_, true, 1, col_val_map, num_cols)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -728,7 +728,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(MultiColumnUpdateFollowedByUpdate ASSERT_OK(UpdateRowsHelper( 1 /* start */, 2 /* end */, &test_cluster_, true, 1, col_val_map1, col_val_map2, num_cols)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -1243,7 +1243,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCheckPointPersistencyNodeRest // insert some records in transaction. ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -1255,7 +1255,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCheckPointPersistencyNodeRest LOG(INFO) << "Total records read by get change call: " << record_size; ASSERT_OK(WriteRowsHelper(100 /* start */, 200 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); // Greater than 100 check because we got records for BEGIN, COMMIT also. @@ -1310,7 +1310,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCleanupSingleStreamSingleTser // Insert some records in transaction. ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); ASSERT_EQ(DeleteCDCStream(stream_id), true); @@ -1338,7 +1338,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCleanupSingleStreamMultiTserv // insert some records in transaction. ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); ASSERT_EQ(DeleteCDCStream(stream_id), true); @@ -1371,7 +1371,7 @@ TEST_F( // insert some records in transaction. ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); ASSERT_EQ(DeleteCDCStream(stream_id_1), true); @@ -1404,7 +1404,7 @@ TEST_F( // Insert some records in transaction. ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); ASSERT_EQ(DeleteCDCStream(stream_id_1), true); @@ -1437,7 +1437,7 @@ TEST_F( // insert some records in transaction. ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); ASSERT_EQ(DeleteCDCStream(stream_id_1), true); @@ -1472,7 +1472,7 @@ TEST_F( // insert some records in transaction. ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); ASSERT_EQ(DeleteCDCStream(stream_id_1), true); @@ -1568,7 +1568,7 @@ void CDCSDKYsqlTest::TestMultipleActiveStreamOnSameTablet(CDCCheckpointType chec uint32_t end = 100; for (uint32_t insert_idx = 0; insert_idx < 3; insert_idx++) { ASSERT_OK(WriteRowsHelper(start /* start */, end /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -1788,7 +1788,7 @@ void CDCSDKYsqlTest::TestCheckpointPersistencyAllNodesRestart(CDCCheckpointType // Insert some records in transaction. ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -1804,7 +1804,7 @@ void CDCSDKYsqlTest::TestCheckpointPersistencyAllNodesRestart(CDCCheckpointType ASSERT_GT(record_size, 100); ASSERT_OK(WriteRowsHelper(100 /* start */, 200 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -1875,7 +1875,7 @@ void CDCSDKYsqlTest::TestIntentCountPersistencyAllNodesRestart(CDCCheckpointType // Insert some records in transaction. ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); GetChangesResponsePB change_resp_1; @@ -1885,12 +1885,12 @@ void CDCSDKYsqlTest::TestIntentCountPersistencyAllNodesRestart(CDCCheckpointType << change_resp_1.cdc_sdk_proto_records_size(); ASSERT_OK(WriteRowsHelper(100 /* start */, 200 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); ASSERT_OK(WriteRowsHelper(200 /* start */, 300 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); SleepFor(MonoDelta::FromSeconds(10)); @@ -1967,12 +1967,12 @@ void CDCSDKYsqlTest::TestHighIntentCountPersistencyAllNodesRestart( // Insert some records in transaction. ASSERT_OK(WriteRowsHelper(0 /* start */, 1 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); ASSERT_OK(WriteRowsHelper(1, 75, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -2022,7 +2022,7 @@ void CDCSDKYsqlTest::TestIntentCountPersistencyBootstrap(CDCCheckpointType check // Insert some records in transaction. ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); GetChangesResponsePB change_resp_1; @@ -2042,7 +2042,7 @@ void CDCSDKYsqlTest::TestIntentCountPersistencyBootstrap(CDCCheckpointType check } ASSERT_OK(WriteRowsHelper(100 /* start */, 200 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -2114,7 +2114,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestEnum)) { int insert_count = 10; // Insert some records in transaction. ASSERT_OK(WriteEnumsRows(0, insert_count, &test_cluster_)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -2163,7 +2163,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestEnumOnRestart)) { int insert_count = 20; // Insert some records in transaction. ASSERT_OK(WriteEnumsRows(0, insert_count / 2, &test_cluster_)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -2174,7 +2174,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestEnumOnRestart)) { // Insert some more records in transaction. ASSERT_OK(WriteEnumsRows(insert_count / 2, insert_count, &test_cluster_)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -2237,7 +2237,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestEnumMultipleStreams)) { // Insert some records in transaction. ASSERT_OK(WriteEnumsRows(0, insert_count, &test_cluster_, "1")); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table1.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -2283,7 +2283,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCompositeType)) { int insert_count = 10; // Insert some records in transaction. ASSERT_OK(WriteCompositeRows(0, insert_count, &test_cluster_)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -2327,7 +2327,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCompositeTypeWithRestart)) { int insert_count = 20; // Insert some records in transaction. ASSERT_OK(WriteCompositeRows(0, insert_count / 2, &test_cluster_)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -2338,7 +2338,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCompositeTypeWithRestart)) { // Insert some more records in transaction. ASSERT_OK(WriteCompositeRows(insert_count / 2, insert_count, &test_cluster_)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -2380,7 +2380,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestNestedCompositeType)) { int insert_count = 10; // Insert some records in transaction. ASSERT_OK(WriteNestedCompositeRows(0, insert_count, &test_cluster_)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -2424,7 +2424,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestArrayCompositeType)) { int insert_count = 10; // Insert some records in transaction. ASSERT_OK(WriteArrayCompositeRows(0, insert_count, &test_cluster_)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -2471,7 +2471,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestRangeCompositeType)) { int insert_count = 10; // Insert some records in transaction. ASSERT_OK(WriteRangeCompositeRows(0, insert_count, &test_cluster_)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -2520,7 +2520,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestRangeArrayCompositeType)) { int insert_count = 10; // Insert some records in transaction. ASSERT_OK(WriteRangeArrayCompositeRows(0, insert_count, &test_cluster_)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -2568,7 +2568,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestTransactionWithLargeBatchSize // Insert some records in transaction. ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); GetChangesResponsePB change_resp_1; @@ -2577,7 +2577,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestTransactionWithLargeBatchSize // Insert some records in transaction. ASSERT_OK(WriteRowsHelper(100, 500, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -2625,7 +2625,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestIntentCountPersistencyAfterCo // Insert some records in transaction. ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); GetChangesResponsePB change_resp_1; @@ -2633,12 +2633,12 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestIntentCountPersistencyAfterCo LOG(INFO) << "Number of records after first transaction: " << change_resp_1.records().size(); ASSERT_OK(WriteRowsHelper(100 /* start */, 200 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); ASSERT_OK(WriteRowsHelper(200 /* start */, 300 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); SleepFor(MonoDelta::FromSeconds(10)); @@ -2726,7 +2726,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestLogGCForNewTablesAddedAfterCr ASSERT_OK(WriteRows(100 /* start */, 200 /* end */, &test_cluster_)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 100, /* is_compaction = */ false)); @@ -2791,7 +2791,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestLogGCedWithTabletBootStrap)) ASSERT_OK(WriteRows(100 /* start */, 200 /* end */, &test_cluster_)); // SleepFor(MonoDelta::FromSeconds(FLAGS_cdc_min_replicated_index_considered_stale_secs * 2)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 100, /* is_compaction = */ false)); @@ -2861,7 +2861,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestXClusterLogGCedWithTabletBoot ASSERT_FALSE(change_resp_1.has_error()); ASSERT_OK(WriteRows(100 /* start */, 200 /* end */, &test_cluster_)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 100, /* is_compaction = */ false)); @@ -2932,7 +2932,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestEnumWithMultipleTablets)) { } ASSERT_OK(WriteEnumsRows(0, 100, &test_cluster_, tablePrefix[idx], kNamespaceName, kTableName)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -3202,14 +3202,14 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestDeletedStreamRowRemovedEvenAf // Insert some records in transaction. ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); GetChangesResponsePB change_resp_1; ASSERT_OK(WaitForGetChangesToFetchRecords(&change_resp_1, stream_id, tablets, 100)); ASSERT_OK(WriteRowsHelper(100 /* start */, 200 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -3363,7 +3363,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCreateStreamAfterSetCheckpoin // Insert some records in transaction. ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -3484,7 +3484,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKCacheWithLeaderReElect) // Insert some records in transaction. ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -3524,7 +3524,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKCacheWithLeaderReElect) // Insert some records in transaction after first leader stepdown. ASSERT_OK(WriteRowsHelper(100 /* start */, 200 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -3585,7 +3585,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKCacheWithLeaderRestart) // Insert some records in transaction. ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -3638,7 +3638,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKCacheWithLeaderRestart) // Insert some records in transaction after leader shutdown. ASSERT_OK(WriteRowsHelper(100 /* start */, 200 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -3666,7 +3666,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKCacheWithLeaderRestart) ASSERT_OK(StepDownLeader(first_leader_index, tablets[0].tablet_id())); ASSERT_OK(WriteRowsHelper(200 /* start */, 300 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -3708,7 +3708,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKActiveTimeCacheInSyncWi // Insert some records in transaction. ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -3748,7 +3748,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKActiveTimeCacheInSyncWi // Insert some records in transaction after first leader stepdown. ASSERT_OK(WriteRowsHelper(100 /* start */, 200 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -3798,7 +3798,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKCacheWhenAFollowerIsUna // Insert some records in transaction. ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -3811,7 +3811,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKCacheWhenAFollowerIsUna // Insert some records in transaction after leader shutdown. ASSERT_OK(WriteRowsHelper(100 /* start */, 200 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -3856,7 +3856,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestColocation)) { int insert_count = 30; ASSERT_OK(PopulateColocatedData(&test_cluster_, insert_count)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -3914,7 +3914,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestIntentsInColocation)) { int insert_count = 30; ASSERT_OK(PopulateColocatedData(&test_cluster_, insert_count, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -3978,7 +3978,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKLagMetrics)) { // Insert test rows, one at a time so they have different hybrid times. ASSERT_OK(WriteRowsHelper(0, 1, &test_cluster_, true)); ASSERT_OK(WriteRowsHelper(1, 2, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -4028,7 +4028,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKLastSentTimeMetric)) { tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); ASSERT_OK(WriteRowsHelper(0, 1, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -4040,7 +4040,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKLastSentTimeMetric)) { uint64_t last_sent_time = metrics->cdcsdk_last_sent_physicaltime->value(); ASSERT_OK(WriteRowsHelper(1, 2, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -4075,7 +4075,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKExpiryMetric)) { auto cdc_service = dynamic_cast( tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); ASSERT_OK(WriteRowsHelper(1, 100, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -4118,7 +4118,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKTrafficSentMetric)) { auto cdc_service = dynamic_cast( tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); ASSERT_OK(WriteRowsHelper(1, 100, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -4134,7 +4134,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKTrafficSentMetric)) { // Isnert few more records ASSERT_OK(WriteRowsHelper(101, 200, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -4178,7 +4178,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKChangeEventCountMetric) auto cdc_service = dynamic_cast( tserver->rpc_server()->TEST_service_pool("yb.cdc.CDCService")->TEST_get_service().get()); ASSERT_OK(WriteRowsHelper(1, 100, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -4243,7 +4243,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKMetricsTwoTablesSingleS for (uint32_t idx = 0; idx < num_tables; idx++) { ASSERT_OK( WriteRowsHelper(1, 50, &test_cluster_, true, 2, (kTableName + table_suffix[idx]).c_str())); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table[idx].table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -4312,7 +4312,7 @@ TEST_F( int64_t current_traffic_sent_bytes = 0; ASSERT_OK(WriteRowsHelper( 1, 100, &test_cluster_, true, 2, (kTableName + underscore + std::to_string(idx)).c_str())); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table[idx].table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); GetChangesResponsePB change_resp; @@ -4381,7 +4381,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKMetricsTwoTablesTwoStre int64_t current_traffic_sent_bytes = 0; ASSERT_OK(WriteRowsHelper( 1, 100, &test_cluster_, true, 2, (kTableName + underscore + std::to_string(idx)).c_str())); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table[idx].table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -4433,7 +4433,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKMetricsWithAddStream)) int64_t current_traffic_sent_bytes = 0; ASSERT_OK(WriteRowsHelper(1, 100, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -4464,7 +4464,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKMetricsWithAddStream)) current_traffic_sent_bytes = metrics->cdcsdk_traffic_sent->value(); ASSERT_OK(WriteRowsHelper(101, 200, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -5189,7 +5189,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestBackwardCompatibillitySupport // Insert some records in transaction. ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -5231,7 +5231,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestBackwardCompatibillitySupport // Insert some records in transaction. ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -5291,7 +5291,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestDDLRecordValidationWithColoca int insert_count = 30; ASSERT_OK(PopulateColocatedData(&test_cluster_, insert_count, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -5356,7 +5356,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestBeginCommitRecordValidationWi int insert_count = 30; ASSERT_OK(PopulateColocatedData(&test_cluster_, insert_count, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -5454,7 +5454,7 @@ TEST_F( // Commit the trasaction. ASSERT_OK(conn.Execute("COMMIT")); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -5529,7 +5529,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKLagMetricUnchangedOnEmp // Commit the trasaction. ASSERT_OK(conn.Execute("COMMIT")); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -5653,7 +5653,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCommitTimeOfTransactionRecord // Insert some records in transaction. ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); GetChangesResponsePB change_resp_1; @@ -5690,7 +5690,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCommitTimeIncreasesForTransac // Insert some records in transaction. ASSERT_OK(WriteRowsHelper(0 /* start */, 100 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); GetChangesResponsePB change_resp_1; @@ -5707,7 +5707,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCommitTimeIncreasesForTransac // Insert some records in transaction. ASSERT_OK(WriteRowsHelper(100 /* start */, 200 /* end */, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); ASSERT_OK(WaitForGetChangesToFetchRecords( @@ -5761,10 +5761,10 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCommitTimeOrderAcrossMultiTab ASSERT_OK(WriteRowsToTwoTables(0, 2, &test_cluster_, true, kTableName, second_table_name)); ASSERT_OK(WriteRowsToTwoTables(2, 4, &test_cluster_, true, kTableName, second_table_name)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {second_table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); LOG(INFO) << "inserted two transactions"; @@ -6291,7 +6291,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestColocationWithMultipleAlterAn ASSERT_OK(conn.ExecuteFormat("INSERT INTO test2 VALUES ($0, $1)", i, i + 1)); } ASSERT_OK(conn.Execute("COMMIT")); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -6360,7 +6360,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestColocationWithMultipleAlterAn ASSERT_OK(AddColumn(&test_cluster_, kNamespaceName, "test1", kValue3ColumnName)); ASSERT_OK(DropColumn(&test_cluster_, kNamespaceName, "test2", kValue3ColumnName)); ASSERT_OK(DropColumn(&test_cluster_, kNamespaceName, "test2", kValue2ColumnName)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -6415,7 +6415,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestColocationWithMultipleAlterAn ASSERT_OK(conn.ExecuteFormat("INSERT INTO test2 VALUES ($0, $1)", i, i + 1)); } ASSERT_OK(conn.Execute("COMMIT")); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -6485,7 +6485,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestColocationWithRepeatedRequest ASSERT_OK(DropColumn(&test_cluster_, kNamespaceName, "test2", kValue3ColumnName)); ASSERT_OK(DropColumn(&test_cluster_, kNamespaceName, "test2", kValue2ColumnName)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -6528,7 +6528,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestColocationWithRepeatedRequest ASSERT_OK(conn.ExecuteFormat("INSERT INTO test2 VALUES ($0, $1)", i, i + 1)); } ASSERT_OK(conn.Execute("COMMIT")); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -6645,10 +6645,10 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestTransactionWithZeroIntents)) } ASSERT_OK(conn.Execute("COMMIT")); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {parent_table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {fk_table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -7242,7 +7242,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestFromOpIdInGetChangesResponse) ASSERT_OK(WriteRowsHelper(30, 80, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, false)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, false)); const int expected_count[] = {3, 80, 0, 0, 0, 0, 2, 2}; int count[] = {0, 0, 0, 0, 0, 0, 0, 0}; @@ -7309,7 +7309,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestAtomicDDLRollback)) { ASSERT_OK(conn.Execute("DELETE FROM test_table where key = 1;")); ASSERT_OK(conn.Execute("COMMIT;")); - ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, false)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, false)); // Sleep to ensure that rollback has taken place SleepFor(MonoDelta::FromSeconds(30)); @@ -7365,7 +7365,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestInsertAndUpdateIntentsWithInc ASSERT_OK(conn.Execute("INSERT INTO test_table values (2,2,2,2);")); ASSERT_OK(conn.Execute("COMMIT;")); - ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, false)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, false)); // Call getChanges to consume the records auto get_changes_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets)); @@ -7387,7 +7387,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestInsertAndUpdateIntentsWithInc ASSERT_OK(conn.Execute("UPDATE test_table set col2 = 4, col3 = 4 where col1 = 1;")); ASSERT_OK(conn.Execute("COMMIT;")); - ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, false)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, false)); // Call getChanges to consume the records prev_checkpoint = get_changes_resp.cdc_sdk_checkpoint(); @@ -7427,7 +7427,7 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestLargeTxnWithExplicitStream)) // cdc_max_stream_intent_records. const int row_count = 40; ASSERT_OK(WriteRowsHelper(0, row_count, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 1000, false)); + ASSERT_OK(WaitForFlushTables({table.table_id()}, false, 1000, false)); const int expected_count[] = {1, 40, 0, 0, 0, 0, 1, 1}; int count[] = {0, 0, 0, 0, 0, 0, 0, 0}; diff --git a/src/yb/integration-tests/cdcsdk_ysql_test_base.cc b/src/yb/integration-tests/cdcsdk_ysql_test_base.cc index 8505179d58bd..86157328db70 100644 --- a/src/yb/integration-tests/cdcsdk_ysql_test_base.cc +++ b/src/yb/integration-tests/cdcsdk_ysql_test_base.cc @@ -334,7 +334,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { "INSERT INTO $0($1, $2) VALUES ($3, $4)", kTableName, kKeyColumnName, kValueColumnName, i, i + 1)); } - RETURN_NOT_OK(test_client()->FlushTables( + RETURN_NOT_OK(WaitForFlushTables( {table_id}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -346,7 +346,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { "UPDATE $0 SET $1 = $2 WHERE $3 = $4", kTableName, kValueColumnName, col_value_pair.second, kKeyColumnName, col_value_pair.first)); } - RETURN_NOT_OK(test_client()->FlushTables( + RETURN_NOT_OK(WaitForFlushTables( {table_id}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -2173,7 +2173,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { // After time expired insert few more records if (set_flag_to_a_smaller_value && extend_expiration) { ASSERT_OK(WriteRowsHelper(10, 20, &test_cluster_, true)); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -2722,7 +2722,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { ASSERT_OK(WriteRowsHelper( 0 /* start */, 11 /* end */, &test_cluster_, true, 4, kTableName, {kValue2ColumnName, kValue3ColumnName})); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); @@ -2748,7 +2748,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { ASSERT_OK(WriteRowsHelper( 11 /* start */, 21 /* end */, &test_cluster_, true, 5, kTableName, {kValue2ColumnName, kValue3ColumnName, kValue4ColumnName})); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); change_resp = @@ -2910,7 +2910,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { ASSERT_OK(WriteRowsHelper( 1 /* start */, 11 /* end */, &test_cluster_, true, 4, kTableName, {kValue2ColumnName, kValue3ColumnName})); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); ASSERT_OK(DropColumn(&test_cluster_, kNamespaceName, kTableName, kValue2ColumnName)); @@ -2935,7 +2935,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { ASSERT_EQ(tablets.size(), num_tablets); ASSERT_OK(WriteRowsHelper( 11 /* start */, 21 /* end */, &test_cluster_, true, 3, kTableName, {kValue3ColumnName})); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); change_resp = @@ -3045,7 +3045,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { // Insert some records in transaction. ASSERT_OK(WriteRowsHelper( 1 /* start */, 10 /* end */, &test_cluster_, true, 3, kTableName, {kValue2ColumnName})); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); ASSERT_OK(RenameColumn( @@ -3071,7 +3071,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { ASSERT_EQ(tablets.size(), num_tablets); ASSERT_OK(WriteRowsHelper( 11 /* start */, 21 /* end */, &test_cluster_, true, 3, kTableName, {kValue3ColumnName})); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); change_resp = @@ -3206,7 +3206,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { ASSERT_OK(WriteRowsHelper(1 /* start */, 11 /* end */, &test_cluster_, true)); // Call Getchanges - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); GetChangesResponsePB change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets)); @@ -3238,7 +3238,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { ASSERT_OK(AddColumn(&test_cluster_, kNamespaceName, kTableName, kValue2ColumnName)); ASSERT_OK(WriteRowsHelper( 21 /* start */, 31 /* end */, &test_cluster_, true, 3, kTableName, {kValue2ColumnName})); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); change_resp = @@ -3365,7 +3365,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { ASSERT_OK(WriteRowsHelper(1 /* start */, 101 /* end */, &test_cluster_, true)); // Call Getchanges - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); GetChangesResponsePB change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets)); @@ -3375,7 +3375,7 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { ASSERT_OK(AddColumn(&test_cluster_, kNamespaceName, kTableName, kValue2ColumnName)); ASSERT_OK(WriteRowsHelper( 101 /* start */, 201 /* end */, &test_cluster_, true, 3, kTableName, {kValue2ColumnName})); - ASSERT_OK(test_client()->FlushTables( + ASSERT_OK(WaitForFlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ false)); change_resp = @@ -3925,6 +3925,27 @@ Result CDCSDKYsqlTest::GetUniverseId(PostgresMiniCluster* cluster) { "Waiting for GetChanges to fetch: " + std::to_string(expected_count) + " records"); } + Status CDCSDKYsqlTest::WaitForFlushTables( + const std::vector& table_ids, bool add_indexes, int timeout_secs, + bool is_compaction) { + RETURN_NOT_OK(WaitFor( + [&]() -> Result { + auto status = test_client()->FlushTables( + table_ids, /* add_indexes = */ add_indexes, + /* timeout_secs = */ timeout_secs, /* is_compaction = */ is_compaction); + if (!status.ok()) { + if (status.IsInternalError()) { + return false; + } else { + RETURN_NOT_OK(status); + } + } + return true; + }, + MonoDelta::FromSeconds(timeout_secs), "Waiting for flush operation to complete")); + return Status::OK(); + } + Status CDCSDKYsqlTest::XreplValidateSplitCandidateTable(const TableId& table_id) { auto& cm = test_cluster_.mini_cluster_->mini_master()->catalog_manager_impl(); auto table = cm.GetTableInfo(table_id); diff --git a/src/yb/integration-tests/cdcsdk_ysql_test_base.h b/src/yb/integration-tests/cdcsdk_ysql_test_base.h index dcabc238a4f7..a283c2f02d15 100644 --- a/src/yb/integration-tests/cdcsdk_ysql_test_base.h +++ b/src/yb/integration-tests/cdcsdk_ysql_test_base.h @@ -653,6 +653,10 @@ class CDCSDKYsqlTest : public CDCSDKTestBase { const CDCSDKCheckpointPB* cp = nullptr, const int64& safe_hybrid_time = -1, const int& wal_segment_index = 0, const double& timeout_secs = 5); + Status WaitForFlushTables( + const std::vector& table_ids, bool add_indexes, int timeout_secs, + bool is_compaction); + Status XreplValidateSplitCandidateTable(const TableId& table); void LogRetentionBarrierAndRelatedDetails(const GetCheckpointResponsePB& checkpoint_result,