From 184182cd56e74ee28c1b776f38b0bb40ab486b86 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Thu, 11 Jul 2024 15:50:31 +0800 Subject: [PATCH] [test](be-ut) fix load stream test after segment num check --- be/test/runtime/load_stream_test.cpp | 152 +++++++++++++++++++-------- 1 file changed, 111 insertions(+), 41 deletions(-) diff --git a/be/test/runtime/load_stream_test.cpp b/be/test/runtime/load_stream_test.cpp index b9f22a4e7511ba..b5066ccf1691b0 100644 --- a/be/test/runtime/load_stream_test.cpp +++ b/be/test/runtime/load_stream_test.cpp @@ -495,19 +495,17 @@ class LoadStreamMgrTest : public testing::Test { : _heavy_work_pool(4, 32, "load_stream_test_heavy"), _light_work_pool(4, 32, "load_stream_test_light") {} - void close_load(MockSinkClient& client, uint32_t sender_id = NORMAL_SENDER_ID) { + void close_load(MockSinkClient& client, const std::vector& tablets_to_commit = {}, + uint32_t sender_id = NORMAL_SENDER_ID) { butil::IOBuf append_buf; PStreamHeader header; header.mutable_load_id()->set_hi(1); header.mutable_load_id()->set_lo(1); header.set_opcode(PStreamHeader::CLOSE_LOAD); header.set_src_id(sender_id); - /* TODO: fix test with tablets_to_commit - PTabletID* tablets_to_commit = header.add_tablets(); - tablets_to_commit->set_partition_id(NORMAL_PARTITION_ID); - tablets_to_commit->set_index_id(NORMAL_INDEX_ID); - tablets_to_commit->set_tablet_id(NORMAL_TABLET_ID); - */ + for (const auto& tablet : tablets_to_commit) { + *header.add_tablets() = tablet; + } size_t hdr_len = header.ByteSizeLong(); append_buf.append((char*)&hdr_len, sizeof(size_t)); append_buf.append(header.SerializeAsString()); @@ -680,14 +678,19 @@ TEST_F(LoadStreamMgrTest, one_client_normal) { write_normal(client); reset_response_stat(); - close_load(client, ABNORMAL_SENDER_ID); + PTabletID tablet; + tablet.set_partition_id(NORMAL_PARTITION_ID); + tablet.set_index_id(NORMAL_INDEX_ID); + tablet.set_tablet_id(NORMAL_TABLET_ID); + tablet.set_num_segments(1); + close_load(client, {tablet}, ABNORMAL_SENDER_ID); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1); - close_load(client); + close_load(client, {tablet}); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1); @@ -738,14 +741,19 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_index) { EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID); - close_load(client, 1); + PTabletID tablet; + tablet.set_partition_id(NORMAL_PARTITION_ID); + tablet.set_index_id(ABNORMAL_INDEX_ID); + tablet.set_tablet_id(NORMAL_TABLET_ID); + tablet.set_num_segments(1); + close_load(client, {tablet}, 1); wait_for_ack(2); EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); - close_load(client, 0); + close_load(client, {tablet}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); @@ -769,17 +777,23 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_sender) { EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID); - close_load(client, 1); + PTabletID tablet; + tablet.set_partition_id(NORMAL_PARTITION_ID); + tablet.set_index_id(NORMAL_INDEX_ID); + tablet.set_tablet_id(NORMAL_TABLET_ID); + tablet.set_num_segments(1); + close_load(client, {tablet}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); - close_load(client, 0); + // on the final close_load, segment num check will fail + close_load(client, {tablet}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); - EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); + EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 2); // server will close stream on CLOSE_LOAD wait_for_close(); @@ -799,13 +813,18 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_tablet) { EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); EXPECT_EQ(g_response_stat.failed_tablet_ids[0], ABNORMAL_TABLET_ID); - close_load(client, 1); + PTabletID tablet; + tablet.set_partition_id(NORMAL_PARTITION_ID); + tablet.set_index_id(NORMAL_INDEX_ID); + tablet.set_tablet_id(ABNORMAL_TABLET_ID); + tablet.set_num_segments(1); + close_load(client, {tablet}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); - close_load(client, 0); + close_load(client, {tablet}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); @@ -832,21 +851,26 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0_zero_b 0, data, true); EXPECT_EQ(g_response_stat.num, 0); + PTabletID tablet; + tablet.set_partition_id(NORMAL_PARTITION_ID); + tablet.set_index_id(NORMAL_INDEX_ID); + tablet.set_tablet_id(NORMAL_TABLET_ID); + tablet.set_num_segments(1); // CLOSE_LOAD - close_load(client, 1); + close_load(client, {tablet}, 1); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); // duplicated close - close_load(client, 1); + close_load(client, {tablet}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); - close_load(client, 0); + close_load(client, {tablet}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); @@ -873,8 +897,13 @@ TEST_F(LoadStreamMgrTest, close_load_before_recv_eos) { data.length(), data, false); EXPECT_EQ(g_response_stat.num, 0); + PTabletID tablet; + tablet.set_partition_id(NORMAL_PARTITION_ID); + tablet.set_index_id(NORMAL_INDEX_ID); + tablet.set_tablet_id(NORMAL_TABLET_ID); + tablet.set_num_segments(1); // CLOSE_LOAD before EOS - close_load(client); + close_load(client, {tablet}); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); @@ -888,7 +917,7 @@ TEST_F(LoadStreamMgrTest, close_load_before_recv_eos) { data.length(), data, true); // duplicated close, will not be handled - close_load(client); + close_load(client, {tablet}); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); @@ -915,21 +944,26 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0) { data.length(), data, true); EXPECT_EQ(g_response_stat.num, 0); + PTabletID tablet; + tablet.set_partition_id(NORMAL_PARTITION_ID); + tablet.set_index_id(NORMAL_INDEX_ID); + tablet.set_tablet_id(NORMAL_TABLET_ID); + tablet.set_num_segments(1); // CLOSE_LOAD - close_load(client, 1); + close_load(client, {tablet}, 1); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); // duplicated close - close_load(client, 1); + close_load(client, {tablet}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); - close_load(client, 0); + close_load(client, {tablet}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1); @@ -959,21 +993,26 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment_without 0, data, false); EXPECT_EQ(g_response_stat.num, 0); + PTabletID tablet; + tablet.set_partition_id(NORMAL_PARTITION_ID); + tablet.set_index_id(NORMAL_INDEX_ID); + tablet.set_tablet_id(NORMAL_TABLET_ID); + tablet.set_num_segments(1); // CLOSE_LOAD - close_load(client, 1); + close_load(client, {tablet}, 1); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); // duplicated close - close_load(client, 1); + close_load(client, {tablet}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); - close_load(client, 0); + close_load(client, {tablet}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); @@ -1002,21 +1041,26 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment1) { data.length(), data, true); EXPECT_EQ(g_response_stat.num, 0); + PTabletID tablet; + tablet.set_partition_id(NORMAL_PARTITION_ID); + tablet.set_index_id(NORMAL_INDEX_ID); + tablet.set_tablet_id(NORMAL_TABLET_ID); + tablet.set_num_segments(1); // CLOSE_LOAD - close_load(client, 1); + close_load(client, {tablet}, 1); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); // duplicated close - close_load(client, 1); + close_load(client, {tablet}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); - close_load(client, 0); + close_load(client, {tablet}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); @@ -1049,21 +1093,26 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_two_segment) { 0, data2, true); EXPECT_EQ(g_response_stat.num, 0); + PTabletID tablet; + tablet.set_partition_id(NORMAL_PARTITION_ID); + tablet.set_index_id(NORMAL_INDEX_ID); + tablet.set_tablet_id(NORMAL_TABLET_ID); + tablet.set_num_segments(2); // CLOSE_LOAD - close_load(client, 1); + close_load(client, {tablet}, 1); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); // duplicated close - close_load(client, 1); + close_load(client, {tablet}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); - close_load(client, 0); + close_load(client, {tablet}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1); @@ -1101,21 +1150,32 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_three_tablet) { NORMAL_TABLET_ID + 2, 0, 0, data2, true); EXPECT_EQ(g_response_stat.num, 0); + + PTabletID tablet1; + tablet1.set_partition_id(NORMAL_PARTITION_ID); + tablet1.set_index_id(NORMAL_INDEX_ID); + tablet1.set_tablet_id(NORMAL_TABLET_ID); + tablet1.set_num_segments(1); + PTabletID tablet2 {tablet1}; + tablet2.set_tablet_id(NORMAL_TABLET_ID + 1); + PTabletID tablet3 {tablet1}; + tablet3.set_tablet_id(NORMAL_TABLET_ID + 2); + // CLOSE_LOAD - close_load(client, 1); + close_load(client, {tablet1, tablet2, tablet3}, 1); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); // duplicated close - close_load(client, 1); + close_load(client, {tablet1, tablet2, tablet3}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); - close_load(client, 0); + close_load(client, {tablet1, tablet2, tablet3}, 0); wait_for_ack(3); EXPECT_EQ(g_response_stat.num, 3); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 3); @@ -1169,22 +1229,27 @@ TEST_F(LoadStreamMgrTest, two_client_one_index_one_tablet_three_segment) { } EXPECT_EQ(g_response_stat.num, 0); + PTabletID tablet; + tablet.set_partition_id(NORMAL_PARTITION_ID); + tablet.set_index_id(NORMAL_INDEX_ID); + tablet.set_tablet_id(NORMAL_TABLET_ID); + tablet.set_num_segments(3); // CLOSE_LOAD - close_load(clients[1], 1); + close_load(clients[1], {tablet}, 1); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); // duplicated close - close_load(clients[1], 1); + close_load(clients[1], {tablet}, 1); wait_for_ack(2); // stream closed, no response will be sent EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); - close_load(clients[0], 0); + close_load(clients[0], {tablet}, 0); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1); @@ -1239,8 +1304,13 @@ TEST_F(LoadStreamMgrTest, two_client_one_close_before_the_other_open) { } EXPECT_EQ(g_response_stat.num, 0); + PTabletID tablet; + tablet.set_partition_id(NORMAL_PARTITION_ID); + tablet.set_index_id(NORMAL_INDEX_ID); + tablet.set_tablet_id(NORMAL_TABLET_ID); + tablet.set_num_segments(3); // CLOSE_LOAD - close_load(clients[0], 0); + close_load(clients[0], {tablet}, 0); wait_for_ack(1); EXPECT_EQ(g_response_stat.num, 1); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); @@ -1257,7 +1327,7 @@ TEST_F(LoadStreamMgrTest, two_client_one_close_before_the_other_open) { NORMAL_TABLET_ID, segid, 0, segment_data[i * 3 + segid], true); } - close_load(clients[1], 1); + close_load(clients[1], {tablet}, 1); wait_for_ack(2); EXPECT_EQ(g_response_stat.num, 2); EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);