Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[test](be-ut) fix load stream test after segment num check #37660

Merged
merged 1 commit into from
Jul 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 111 additions & 41 deletions be/test/runtime/load_stream_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -495,19 +495,17 @@ class LoadStreamMgrTest : public testing::Test {
: _heavy_work_pool(4, 32, "load_stream_test_heavy"),
_light_work_pool(4, 32, "load_stream_test_light") {}

void close_load(MockSinkClient& client, uint32_t sender_id = NORMAL_SENDER_ID) {
void close_load(MockSinkClient& client, const std::vector<PTabletID>& tablets_to_commit = {},
uint32_t sender_id = NORMAL_SENDER_ID) {
butil::IOBuf append_buf;
PStreamHeader header;
header.mutable_load_id()->set_hi(1);
header.mutable_load_id()->set_lo(1);
header.set_opcode(PStreamHeader::CLOSE_LOAD);
header.set_src_id(sender_id);
/* TODO: fix test with tablets_to_commit
PTabletID* tablets_to_commit = header.add_tablets();
tablets_to_commit->set_partition_id(NORMAL_PARTITION_ID);
tablets_to_commit->set_index_id(NORMAL_INDEX_ID);
tablets_to_commit->set_tablet_id(NORMAL_TABLET_ID);
*/
for (const auto& tablet : tablets_to_commit) {
*header.add_tablets() = tablet;
}
size_t hdr_len = header.ByteSizeLong();
append_buf.append((char*)&hdr_len, sizeof(size_t));
append_buf.append(header.SerializeAsString());
Expand Down Expand Up @@ -680,14 +678,19 @@ TEST_F(LoadStreamMgrTest, one_client_normal) {
write_normal(client);

reset_response_stat();
close_load(client, ABNORMAL_SENDER_ID);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(1);
close_load(client, {tablet}, ABNORMAL_SENDER_ID);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1);

close_load(client);
close_load(client, {tablet});
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
Expand Down Expand Up @@ -738,14 +741,19 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_index) {
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);

close_load(client, 1);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(ABNORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(1);
close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);

close_load(client, 0);
close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
Expand All @@ -769,17 +777,23 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_sender) {
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);

close_load(client, 1);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(1);
close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);

close_load(client, 0);
// on the final close_load, segment num check will fail
close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 2);

// server will close stream on CLOSE_LOAD
wait_for_close();
Expand All @@ -799,13 +813,18 @@ TEST_F(LoadStreamMgrTest, one_client_abnormal_tablet) {
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids[0], ABNORMAL_TABLET_ID);

close_load(client, 1);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(ABNORMAL_TABLET_ID);
tablet.set_num_segments(1);
close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);

close_load(client, 0);
close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
Expand All @@ -832,21 +851,26 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0_zero_b
0, data, true);

EXPECT_EQ(g_response_stat.num, 0);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(1);
// CLOSE_LOAD
close_load(client, 1);
close_load(client, {tablet}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

// duplicated close
close_load(client, 1);
close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

close_load(client, 0);
close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
Expand All @@ -873,8 +897,13 @@ TEST_F(LoadStreamMgrTest, close_load_before_recv_eos) {
data.length(), data, false);

EXPECT_EQ(g_response_stat.num, 0);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(1);
// CLOSE_LOAD before EOS
close_load(client);
close_load(client, {tablet});
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
Expand All @@ -888,7 +917,7 @@ TEST_F(LoadStreamMgrTest, close_load_before_recv_eos) {
data.length(), data, true);

// duplicated close, will not be handled
close_load(client);
close_load(client, {tablet});
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
Expand All @@ -915,21 +944,26 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0) {
data.length(), data, true);

EXPECT_EQ(g_response_stat.num, 0);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(1);
// CLOSE_LOAD
close_load(client, 1);
close_load(client, {tablet}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

// duplicated close
close_load(client, 1);
close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

close_load(client, 0);
close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
Expand Down Expand Up @@ -959,21 +993,26 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment_without
0, data, false);

EXPECT_EQ(g_response_stat.num, 0);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(1);
// CLOSE_LOAD
close_load(client, 1);
close_load(client, {tablet}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

// duplicated close
close_load(client, 1);
close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

close_load(client, 0);
close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
Expand Down Expand Up @@ -1002,21 +1041,26 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment1) {
data.length(), data, true);

EXPECT_EQ(g_response_stat.num, 0);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(1);
// CLOSE_LOAD
close_load(client, 1);
close_load(client, {tablet}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

// duplicated close
close_load(client, 1);
close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

close_load(client, 0);
close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
Expand Down Expand Up @@ -1049,21 +1093,26 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_two_segment) {
0, data2, true);

EXPECT_EQ(g_response_stat.num, 0);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(2);
// CLOSE_LOAD
close_load(client, 1);
close_load(client, {tablet}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

// duplicated close
close_load(client, 1);
close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

close_load(client, 0);
close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
Expand Down Expand Up @@ -1101,21 +1150,32 @@ TEST_F(LoadStreamMgrTest, one_client_one_index_three_tablet) {
NORMAL_TABLET_ID + 2, 0, 0, data2, true);

EXPECT_EQ(g_response_stat.num, 0);

PTabletID tablet1;
tablet1.set_partition_id(NORMAL_PARTITION_ID);
tablet1.set_index_id(NORMAL_INDEX_ID);
tablet1.set_tablet_id(NORMAL_TABLET_ID);
tablet1.set_num_segments(1);
PTabletID tablet2 {tablet1};
tablet2.set_tablet_id(NORMAL_TABLET_ID + 1);
PTabletID tablet3 {tablet1};
tablet3.set_tablet_id(NORMAL_TABLET_ID + 2);

// CLOSE_LOAD
close_load(client, 1);
close_load(client, {tablet1, tablet2, tablet3}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

// duplicated close
close_load(client, 1);
close_load(client, {tablet1, tablet2, tablet3}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

close_load(client, 0);
close_load(client, {tablet1, tablet2, tablet3}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 3);
Expand Down Expand Up @@ -1169,22 +1229,27 @@ TEST_F(LoadStreamMgrTest, two_client_one_index_one_tablet_three_segment) {
}

EXPECT_EQ(g_response_stat.num, 0);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(3);
// CLOSE_LOAD
close_load(clients[1], 1);
close_load(clients[1], {tablet}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

// duplicated close
close_load(clients[1], 1);
close_load(clients[1], {tablet}, 1);
wait_for_ack(2);
// stream closed, no response will be sent
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);

close_load(clients[0], 0);
close_load(clients[0], {tablet}, 0);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
Expand Down Expand Up @@ -1239,8 +1304,13 @@ TEST_F(LoadStreamMgrTest, two_client_one_close_before_the_other_open) {
}

EXPECT_EQ(g_response_stat.num, 0);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(3);
// CLOSE_LOAD
close_load(clients[0], 0);
close_load(clients[0], {tablet}, 0);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
Expand All @@ -1257,7 +1327,7 @@ TEST_F(LoadStreamMgrTest, two_client_one_close_before_the_other_open) {
NORMAL_TABLET_ID, segid, 0, segment_data[i * 3 + segid], true);
}

close_load(clients[1], 1);
close_load(clients[1], {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
Expand Down
Loading