From 2be2fe365e6a55b68ac3024d311e9d6207ba20ad Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Tue, 8 Feb 2022 16:31:55 -0500 Subject: [PATCH] changefeedccl: Increase message size limits for kafka sink. Sarama library, used by kafka sink, limits the maximum message sizes locally. When those limits are exceeded, sarama library returns a confusing error message which seems to imply that the remote kafka server rejected the message, even though this rejection happened locally: `kafka server: Message was too large, server rejected it to avoid allocation error.` This PR addresses the problem by increasing sarama limits to 2GB (max int32). An alternative approach was to extend `kafka_sink_config` to specify maximum message size. However, this alternative is less desirable. For one, the user-supplied configuration can run afoul of other limits imposed by sarama library (e.g. `MaxRequestSize`), so more configuration options must be added. In addition, this really exposes very low level implementation details in the sarama library -- something that we probably should not do. Fixes #76258 Release Notes (enterprise change): Kafka sink supports larger messages, up to 2GB in size. --- pkg/ccl/changefeedccl/sink.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index beb086821bd9..9e2029e3a6f8 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -18,6 +18,7 @@ import ( "fmt" "hash" "hash/fnv" + "math" "net/url" "strconv" "strings" @@ -323,6 +324,12 @@ func init() { ctx := context.Background() ctx = logtags.AddTag(ctx, "kafka-producer", nil) sarama.Logger = &kafkaLogAdapter{ctx: ctx} + + // Sarama should not be rejecting messages based on some arbitrary limits. + // This sink already manages its resource usage. Sarama should attempt to deliver + // messages, no matter their size. 
Of course, the downstream kafka may reject + those messages, but this rejection should not be done locally. + sarama.MaxRequestSize = math.MaxInt32 } type kafkaSinkConfig struct { @@ -408,6 +415,12 @@ type saramaConfig struct { // Configure configures provided kafka configuration struct based // on this config. func (c *saramaConfig) Apply(kafka *sarama.Config) error { + // Sarama limits the size of each message to be MaxMessageSize (1MB) bytes. + // This is silly; this sink already manages its memory, and therefore, if we + // had enough resources to ingest and process this message, then sarama shouldn't + // get in the way. Set this limit to be just a bit under maximum request size. + kafka.Producer.MaxMessageBytes = int(sarama.MaxRequestSize - 1) + kafka.Producer.Flush.Bytes = c.Flush.Bytes kafka.Producer.Flush.Messages = c.Flush.Messages kafka.Producer.Flush.Frequency = time.Duration(c.Flush.Frequency)