From 788434a01e2e547a94b0667d58c77ee322f1820a Mon Sep 17 00:00:00 2001 From: Eric Sheng Date: Mon, 11 Mar 2024 11:17:19 -0700 Subject: [PATCH] [#18771, #21352] docdb: Fix LightweightMessage max size when parsing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: LightweightMessage currently sets a maximum size for reading to `rpc_max_message_size` (default 255 MB), but Preparer batches based on `protobuf_message_total_bytes_limit` (default 511 MB). This results in cases where under default settings, we may have protobufs between 255 MB and 511 MB, which `LightweightMessage::ParseFromSlice` is unable to read. This diff changes the limit to use `protobuf_message_total_bytes_limit`, so that protobufs between `rpc_max_message_size` and `protobuf_message_total_bytes_limit` can be parsed properly. This addresses errors such as: ``` Found a corruption in a closed log segment: OK Error: Corruption (yb/consensus/log_util.cc:965): Log file corruption detected.: Failed to parse PB at offset: 26013423, length: 303149529. Cause: Corruption (yb/rpc/lightweight_message.cc:376): Failed to parse ‘entry’: Failed trying to read batch #5 at offset 26013423 for log segment /mnt/d0/yb-data/tserver/wals/table-12345678901234567890123456789012/tablet-12345678901234567890123456789012/wal-000003000: ... ``` (length larger than 255 MB) when such protobufs are written to WALs. This also fixes cause of flakiness for TabletPeerTest.MaxRaftBatchProtobufLimit in TSAN builds. Jira: DB-7654, DB-10251 Test Plan: Jenkins. Added test: ``` yb_build.sh --cxx-test rpc_lwproto-test --gtest_filter LWProtoTest.BigMessage ``` Also ran TabletPeerTest.MaxRaftBatchProtobufLimit 100x on Jenkins. Reviewers: sergei, qhu Reviewed By: qhu Subscribers: yyan, bogdan, rthallam, ybase Differential Revision: https://phorge.dev.yugabyte.com/D33041 --- src/yb/rpc/lightweight_message.cc | 5 +-- src/yb/rpc/lwproto-test.cc | 54 +++++++++++++++++++++++++++---- 2 files changed, 50 insertions(+), 9 deletions(-) diff --git a/src/yb/rpc/lightweight_message.cc b/src/yb/rpc/lightweight_message.cc index dfb9e8261e45..954347990f75 100644 --- a/src/yb/rpc/lightweight_message.cc +++ b/src/yb/rpc/lightweight_message.cc @@ -31,6 +31,8 @@ DEFINE_UNKNOWN_uint64(rpc_max_message_size, 255_MB, "The maximum size of a message of any RPC that the server will accept. The sum of " "consensus_max_batch_size_bytes and 1KB should be less than rpc_max_message_size"); +DECLARE_int32(protobuf_message_total_bytes_limit); + using google::protobuf::internal::WireFormatLite; using google::protobuf::io::CodedOutputStream; @@ -378,8 +380,7 @@ Status ParseFailed(const char* field_name) { } void SetupLimit(google::protobuf::io::CodedInputStream* in) { - in->SetTotalBytesLimit(narrow_cast(FLAGS_rpc_max_message_size), - narrow_cast(FLAGS_rpc_max_message_size * 3 / 4)); + in->SetTotalBytesLimit(FLAGS_protobuf_message_total_bytes_limit, 0 /* unused */); } ThreadSafeArena& empty_arena() { diff --git a/src/yb/rpc/lwproto-test.cc b/src/yb/rpc/lwproto-test.cc index 45634f7f2d7f..a814d920007d 100644 --- a/src/yb/rpc/lwproto-test.cc +++ b/src/yb/rpc/lwproto-test.cc @@ -19,11 +19,33 @@ #include "yb/util/faststring.h" #include "yb/util/logging.h" +#include "yb/util/random_util.h" +#include "yb/util/size_literals.h" #include "yb/util/test_macros.h" +DECLARE_int32(protobuf_message_total_bytes_limit); +DECLARE_uint64(rpc_max_message_size); + namespace yb { namespace rpc { +namespace { + +template +Status SerializePB(PB& pb, faststring& buf) { + LOG(INFO) << "Source proto: " << pb.ShortDebugString(); + + AnyMessageConstPtr ptr(&pb); + buf.resize(ptr.SerializedSize()); + RETURN_NOT_OK(ptr.SerializeToArray(buf.data())); + + LOG(INFO) << "Binary dump: " << Slice(buf).ToDebugHexString(); + + return Status::OK(); +} + +} // namespace + // Make sure LW protobuf skips unknown fields. TEST(LWProtoTest, SkipsUnknownFields) { rpc_test::TestObjectPB pb; @@ -37,13 +59,7 @@ TEST(LWProtoTest, SkipsUnknownFields) { pb.set_int32_2(15); pb.mutable_record2()->set_text("record2"); - LOG(INFO) << "Source proto: " << pb.ShortDebugString(); - - AnyMessageConstPtr ptr(&pb); - buf.resize(ptr.SerializedSize()); - ASSERT_OK(ptr.SerializeToArray(buf.data())); - - LOG(INFO) << "Binary dump: " << Slice(buf).ToDebugHexString(); + ASSERT_OK(SerializePB(pb, buf)); } { @@ -82,5 +98,29 @@ TEST(LWProtoTest, SkipsUnknownFields) { } } +// Test a very large proto (rpc_max_message_size < proto size < protobuf_message_total_bytes_limit). +TEST(LWProtoTest, BigMessage) { + faststring buf; + rpc_test::TestObjectPB pb; + + ANNOTATE_UNPROTECTED_WRITE(FLAGS_rpc_max_message_size) = 4_MB; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_protobuf_message_total_bytes_limit) = 8_MB; + + constexpr auto kPBSize = 6_MB; + + pb.set_string1(RandomHumanReadableString(kPBSize)); + ASSERT_OK(SerializePB(pb, buf)); + + ThreadSafeArena arena; + rpc_test::LWTestObjectPBv2 lwpb2(&arena); + AnyMessagePtr ptr(&lwpb2); + + ASSERT_OK(ptr.ParseFromSlice(Slice(buf))); + LOG(INFO) << "Read lightweight proto: " << lwpb2.ShortDebugString(); + + ASSERT_TRUE(lwpb2.has_string1()); + ASSERT_EQ(pb.string1(), lwpb2.string1()); +} + } // namespace rpc } // namespace yb