diff --git a/src/yb/rpc/lightweight_message.cc b/src/yb/rpc/lightweight_message.cc index dfb9e8261e45..954347990f75 100644 --- a/src/yb/rpc/lightweight_message.cc +++ b/src/yb/rpc/lightweight_message.cc @@ -31,6 +31,8 @@ DEFINE_UNKNOWN_uint64(rpc_max_message_size, 255_MB, "The maximum size of a message of any RPC that the server will accept. The sum of " "consensus_max_batch_size_bytes and 1KB should be less than rpc_max_message_size"); +DECLARE_int32(protobuf_message_total_bytes_limit); + using google::protobuf::internal::WireFormatLite; using google::protobuf::io::CodedOutputStream; @@ -378,8 +380,7 @@ Status ParseFailed(const char* field_name) { } void SetupLimit(google::protobuf::io::CodedInputStream* in) { - in->SetTotalBytesLimit(narrow_cast(FLAGS_rpc_max_message_size), - narrow_cast(FLAGS_rpc_max_message_size * 3 / 4)); + in->SetTotalBytesLimit(FLAGS_protobuf_message_total_bytes_limit, 0 /* unused */); } ThreadSafeArena& empty_arena() { diff --git a/src/yb/rpc/lwproto-test.cc b/src/yb/rpc/lwproto-test.cc index 45634f7f2d7f..a814d920007d 100644 --- a/src/yb/rpc/lwproto-test.cc +++ b/src/yb/rpc/lwproto-test.cc @@ -19,11 +19,33 @@ #include "yb/util/faststring.h" #include "yb/util/logging.h" +#include "yb/util/random_util.h" +#include "yb/util/size_literals.h" #include "yb/util/test_macros.h" +DECLARE_int32(protobuf_message_total_bytes_limit); +DECLARE_uint64(rpc_max_message_size); + namespace yb { namespace rpc { +namespace { + +template +Status SerializePB(PB& pb, faststring& buf) { + LOG(INFO) << "Source proto: " << pb.ShortDebugString(); + + AnyMessageConstPtr ptr(&pb); + buf.resize(ptr.SerializedSize()); + RETURN_NOT_OK(ptr.SerializeToArray(buf.data())); + + LOG(INFO) << "Binary dump: " << Slice(buf).ToDebugHexString(); + + return Status::OK(); +} + +} // namespace + // Make sure LW protobuf skips unknown fields. TEST(LWProtoTest, SkipsUnknownFields) { rpc_test::TestObjectPB pb; @@ -37,13 +59,7 @@ TEST(LWProtoTest, SkipsUnknownFields) { pb.set_int32_2(15); pb.mutable_record2()->set_text("record2"); - LOG(INFO) << "Source proto: " << pb.ShortDebugString(); - - AnyMessageConstPtr ptr(&pb); - buf.resize(ptr.SerializedSize()); - ASSERT_OK(ptr.SerializeToArray(buf.data())); - - LOG(INFO) << "Binary dump: " << Slice(buf).ToDebugHexString(); + ASSERT_OK(SerializePB(pb, buf)); } { @@ -82,5 +98,29 @@ TEST(LWProtoTest, SkipsUnknownFields) { } } +// Test a very large proto (rpc_max_message_size < proto size < protobuf_message_total_bytes_limit). +TEST(LWProtoTest, BigMessage) { + faststring buf; + rpc_test::TestObjectPB pb; + + ANNOTATE_UNPROTECTED_WRITE(FLAGS_rpc_max_message_size) = 4_MB; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_protobuf_message_total_bytes_limit) = 8_MB; + + constexpr auto kPBSize = 6_MB; + + pb.set_string1(RandomHumanReadableString(kPBSize)); + ASSERT_OK(SerializePB(pb, buf)); + + ThreadSafeArena arena; + rpc_test::LWTestObjectPBv2 lwpb2(&arena); + AnyMessagePtr ptr(&lwpb2); + + ASSERT_OK(ptr.ParseFromSlice(Slice(buf))); + LOG(INFO) << "Read lightweight proto: " << lwpb2.ShortDebugString(); + + ASSERT_TRUE(lwpb2.has_string1()); + ASSERT_EQ(pb.string1(), lwpb2.string1()); +} + } // namespace rpc } // namespace yb