Skip to content

Commit

Permalink
fix: message body size unset after parsed which leads to large io thr…
Browse files Browse the repository at this point in the history
…oughputs (#1008)
  • Loading branch information
padmejin authored Jan 10, 2022
1 parent b54fb9d commit 3d1c988
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 84 deletions.
9 changes: 7 additions & 2 deletions src/runtime/rpc/thrift_message_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <dsn/cpp/serialization_helper/thrift_helper.h>
#include <dsn/cpp/serialization_helper/dsn.layer2_types.h>
#include <dsn/cpp/message_utils.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/utility/ports.h>
#include <dsn/utility/crc.h>
#include <dsn/utility/endians.h>
Expand Down Expand Up @@ -199,19 +200,21 @@ message_ex *thrift_message_parser::parse_request_body_v0(message_reader *reader,
return nullptr;
}

buf = buf.range(0, _meta_v0->body_length);
reader->consume_buffer(_meta_v0->body_length);
message_ex *msg = create_message_from_request_blob(buf);
if (msg == nullptr) {
read_next = -1;
reset();
return nullptr;
}

reader->consume_buffer(_meta_v0->body_length);
read_next = (reader->_buffer_occupied >= HEADER_LENGTH_V0
? 0
: HEADER_LENGTH_V0 - reader->_buffer_occupied);

msg->header->body_length = _meta_v0->body_length;
dcheck_eq(msg->header->body_length, msg->buffers[1].size());
msg->header->gpid.set_app_id(_meta_v0->app_id);
msg->header->gpid.set_partition_index(_meta_v0->partition_index);
msg->header->client.timeout_ms = _meta_v0->client_timeout;
Expand Down Expand Up @@ -246,19 +249,21 @@ message_ex *thrift_message_parser::parse_request_body_v1(message_reader *reader,
read_next = _v1_specific_vars->_body_length - buf.size();
return nullptr;
}
buf = buf.range(0, _v1_specific_vars->_body_length);
reader->consume_buffer(_v1_specific_vars->_meta_length + _v1_specific_vars->_body_length);
message_ex *msg = create_message_from_request_blob(buf);
if (msg == nullptr) {
read_next = -1;
reset();
return nullptr;
}

reader->consume_buffer(_v1_specific_vars->_meta_length + _v1_specific_vars->_body_length);
read_next = (reader->_buffer_occupied >= HEADER_LENGTH_V1
? 0
: HEADER_LENGTH_V1 - reader->_buffer_occupied);

msg->header->body_length = _v1_specific_vars->_body_length;
dcheck_eq(msg->header->body_length, msg->buffers[1].size());
msg->header->gpid.set_app_id(_v1_specific_vars->_meta_v1->app_id);
msg->header->gpid.set_partition_index(_v1_specific_vars->_meta_v1->partition_index);
msg->header->client.timeout_ms = _v1_specific_vars->_meta_v1->client_timeout;
Expand Down
187 changes: 105 additions & 82 deletions src/runtime/test/thrift_message_parser_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,20 @@ DEFINE_TASK_CODE_RPC(RPC_TEST_THRIFT_MESSAGE_PARSER, TASK_PRIORITY_COMMON, THREA
class thrift_message_parser_test : public testing::Test
{
public:
void mock_reader_read_data(message_reader &reader, const std::string &data)
void
mock_reader_read_data(message_reader &reader, const std::string &data, int message_count = 1)
{
char *buf = reader.read_buffer_ptr(data.length());
memcpy(buf, data.c_str(), data.size());
reader.mark_read(data.length());
char *buf = reader.read_buffer_ptr(data.length() * message_count);
for (int i = 0; i < message_count; i++) {
memcpy(buf + i * data.length(), data.c_str(), data.length());
reader.mark_read(data.length());
}
}

void test_get_message_on_receive_v0_data(message_reader &reader,
apache::thrift::protocol::TMessageType messageType,
bool is_request)
bool is_request,
int message_count = 1)
{
/// write rpc message
size_t body_length = 0;
Expand Down Expand Up @@ -72,50 +76,54 @@ class thrift_message_parser_test : public testing::Test
ASSERT_EQ(stream.get_buffer().size(), body_length);
memcpy(&data[48], stream.get_buffer().data(), stream.get_buffer().size());

mock_reader_read_data(reader, data);
msg = parser.get_message_on_receive(&reader, read_next);

if (is_request) {
ASSERT_NE(msg, nullptr);
ASSERT_EQ(msg->hdr_format, NET_HDR_THRIFT);

ASSERT_EQ(msg->header->body_length, body_length);
ASSERT_EQ(msg->header->gpid, gpid(1, 28));
ASSERT_EQ(msg->header->hdr_type, THRIFT_HDR_SIG);
ASSERT_EQ(msg->header->hdr_length, sizeof(message_header));
ASSERT_EQ(msg->header->hdr_crc32, CRC_INVALID);
ASSERT_EQ(msg->header->body_crc32, CRC_INVALID);
ASSERT_EQ(msg->header->id, 999);

ASSERT_EQ(msg->header->client.timeout_ms, 1000);
ASSERT_EQ(msg->header->client.thread_hash, 64);
ASSERT_EQ(msg->header->client.partition_hash, 5000000000);

ASSERT_EQ(msg->header->context.u.is_request, true);
ASSERT_EQ(msg->header->context.u.serialize_format, DSF_THRIFT_BINARY);

// v0 Thrift network format doesn't support message context.
ASSERT_EQ(msg->header->context.u.is_backup_request, false);
ASSERT_EQ(msg->header->context.u.is_forwarded, false);
ASSERT_EQ(msg->header->context.u.is_forward_supported, false);

ASSERT_EQ(reader.buffer().size(), 0);

// must be reset
ASSERT_EQ(parser._header_version, -1);
ASSERT_EQ(parser._v1_specific_vars->_meta_parsed, false);
ASSERT_EQ(parser._v1_specific_vars->_meta_length, 0);
ASSERT_EQ(parser._v1_specific_vars->_body_length, 0);
} else {
ASSERT_EQ(msg, nullptr);
ASSERT_EQ(read_next, -1);
mock_reader_read_data(reader, data, message_count);

for (int i = 0; i < message_count; i++) {
msg = parser.get_message_on_receive(&reader, read_next);

if (is_request) {
ASSERT_NE(msg, nullptr);
ASSERT_EQ(msg->hdr_format, NET_HDR_THRIFT);

ASSERT_EQ(msg->header->body_length, body_length);
ASSERT_EQ(msg->header->gpid, gpid(1, 28));
ASSERT_EQ(msg->header->hdr_type, THRIFT_HDR_SIG);
ASSERT_EQ(msg->header->hdr_length, sizeof(message_header));
ASSERT_EQ(msg->header->hdr_crc32, CRC_INVALID);
ASSERT_EQ(msg->header->body_crc32, CRC_INVALID);
ASSERT_EQ(msg->header->id, 999);

ASSERT_EQ(msg->header->client.timeout_ms, 1000);
ASSERT_EQ(msg->header->client.thread_hash, 64);
ASSERT_EQ(msg->header->client.partition_hash, 5000000000);

ASSERT_EQ(msg->header->context.u.is_request, true);
ASSERT_EQ(msg->header->context.u.serialize_format, DSF_THRIFT_BINARY);

// v0 Thrift network format doesn't support message context.
ASSERT_EQ(msg->header->context.u.is_backup_request, false);
ASSERT_EQ(msg->header->context.u.is_forwarded, false);
ASSERT_EQ(msg->header->context.u.is_forward_supported, false);

ASSERT_EQ(msg->buffers[1].size(), body_length);

// must be reset
ASSERT_EQ(parser._header_version, -1);
ASSERT_EQ(parser._v1_specific_vars->_meta_parsed, false);
ASSERT_EQ(parser._v1_specific_vars->_meta_length, 0);
ASSERT_EQ(parser._v1_specific_vars->_body_length, 0);
} else {
ASSERT_EQ(msg, nullptr);
ASSERT_EQ(read_next, -1);
}
}
}

void test_get_message_on_receive_v1_data(message_reader &reader,
apache::thrift::protocol::TMessageType messageType,
bool is_request,
bool is_backup_request)
bool is_backup_request,
int message_count = 1)
{
/// write rpc message
size_t body_length = 0;
Expand Down Expand Up @@ -165,43 +173,43 @@ class thrift_message_parser_test : public testing::Test
memcpy(&data[16 + meta_length],
body_stream.get_buffer().data(),
body_stream.get_buffer().size());

mock_reader_read_data(reader, data);
ASSERT_EQ(reader.buffer().size(), data.size());
ASSERT_EQ(reader.buffer().size(), 16 + meta_length + body_length);

msg = parser.get_message_on_receive(&reader, read_next);

if (is_request) {
ASSERT_NE(msg, nullptr);
ASSERT_EQ(msg->hdr_format, NET_HDR_THRIFT);

ASSERT_EQ(msg->header->body_length, body_length);
ASSERT_EQ(msg->header->gpid, gpid(1, 28));
ASSERT_EQ(msg->header->hdr_type, THRIFT_HDR_SIG);
ASSERT_EQ(msg->header->hdr_length, sizeof(message_header));
ASSERT_EQ(msg->header->hdr_crc32, CRC_INVALID);
ASSERT_EQ(msg->header->body_crc32, CRC_INVALID);
ASSERT_EQ(msg->header->id, 999);

ASSERT_EQ(msg->header->client.timeout_ms, 1000);
ASSERT_EQ(msg->header->client.thread_hash, 7947);
ASSERT_EQ(msg->header->client.partition_hash, 5000000000);

ASSERT_EQ(msg->header->context.u.is_request, true);
ASSERT_EQ(msg->header->context.u.serialize_format, DSF_THRIFT_BINARY);
ASSERT_EQ(msg->header->context.u.is_backup_request, is_backup_request);
ASSERT_EQ(msg->header->context.u.is_forwarded, false);
ASSERT_EQ(msg->header->context.u.is_forward_supported, false);

ASSERT_EQ(reader.buffer().size(), 0);

// must be reset
ASSERT_EQ(parser._header_version, -1);
ASSERT_EQ(parser._v1_specific_vars->_meta_parsed, false);
} else {
ASSERT_EQ(msg, nullptr);
ASSERT_EQ(read_next, -1);
ASSERT_EQ(16 + meta_length + body_length, data.size());
mock_reader_read_data(reader, data, message_count);
ASSERT_EQ(reader.buffer().size(), data.size() * message_count);

for (int i = 0; i != message_count; ++i) {
msg = parser.get_message_on_receive(&reader, read_next);

if (is_request) {
ASSERT_NE(msg, nullptr);
ASSERT_EQ(msg->hdr_format, NET_HDR_THRIFT);

ASSERT_EQ(msg->header->body_length, body_length);
ASSERT_EQ(msg->header->gpid, gpid(1, 28));
ASSERT_EQ(msg->header->hdr_type, THRIFT_HDR_SIG);
ASSERT_EQ(msg->header->hdr_length, sizeof(message_header));
ASSERT_EQ(msg->header->hdr_crc32, CRC_INVALID);
ASSERT_EQ(msg->header->body_crc32, CRC_INVALID);
ASSERT_EQ(msg->header->id, 999);

ASSERT_EQ(msg->header->client.timeout_ms, 1000);
ASSERT_EQ(msg->header->client.thread_hash, 7947);
ASSERT_EQ(msg->header->client.partition_hash, 5000000000);

ASSERT_EQ(msg->header->context.u.is_request, true);
ASSERT_EQ(msg->header->context.u.serialize_format, DSF_THRIFT_BINARY);
ASSERT_EQ(msg->header->context.u.is_backup_request, is_backup_request);
ASSERT_EQ(msg->header->context.u.is_forwarded, false);
ASSERT_EQ(msg->header->context.u.is_forward_supported, false);

// must be reset
ASSERT_EQ(parser._header_version, -1);
ASSERT_EQ(parser._v1_specific_vars->_meta_parsed, false);
ASSERT_EQ(msg->buffers[1].size(), body_length);
} else {
ASSERT_EQ(msg, nullptr);
ASSERT_EQ(read_next, -1);
}
}
}
};
Expand Down Expand Up @@ -333,10 +341,11 @@ TEST_F(thrift_message_parser_test, get_message_on_receive_v0_not_request)
// ensure server won't corrupt when it receives a non-request.
ASSERT_NO_FATAL_FAILURE(
test_get_message_on_receive_v0_data(reader, apache::thrift::protocol::T_REPLY, false));
reader.truncate_read();
// bad message should be consumed and discarded
ASSERT_EQ(reader.buffer().size(), 0);
ASSERT_NO_FATAL_FAILURE(test_get_message_on_receive_v0_data(
reader, apache::thrift::protocol::TMessageType(65), false));
reader.truncate_read();
ASSERT_EQ(reader.buffer().size(), 0);
}

TEST_F(thrift_message_parser_test, get_message_on_receive_incomplete_v1_hdr)
Expand Down Expand Up @@ -406,4 +415,18 @@ TEST_F(thrift_message_parser_test, get_message_on_receive_v1_data)
reader.truncate_read();
}

TEST_F(thrift_message_parser_test, get_message_on_large_writes_pileup_v0)
{
message_reader reader(4096);
ASSERT_NO_FATAL_FAILURE(
test_get_message_on_receive_v0_data(reader, apache::thrift::protocol::T_CALL, true, 10));
}

TEST_F(thrift_message_parser_test, get_message_on_large_writes_pileup_v1)
{
message_reader reader(4096);
ASSERT_NO_FATAL_FAILURE(test_get_message_on_receive_v1_data(
reader, apache::thrift::protocol::T_CALL, true, true, 10));
}

} // namespace dsn

0 comments on commit 3d1c988

Please sign in to comment.