From 352931ac145c4d4aa0f67111b198b34211643c2d Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Tue, 8 Feb 2022 21:31:55 +0000
Subject: [PATCH] changefeedccl: Increase message size limits for kafka sink.

Sarama library, used by the kafka sink, limits the maximum message sizes
locally. When those limits are exceeded, the sarama library returns a
confusing error message which seems to imply that the remote kafka server
rejected the message, even though this rejection happened locally:
`kafka server: Message was too large, server rejected it to avoid
allocation error.`

This PR addresses the problem by increasing sarama limits to 2GB
(max int32).

An alternative approach was to extend `kafka_sink_config` to specify the
maximum message size. However, this alternative is less desirable. For
one, the user-supplied configuration can run afoul of other limits
imposed by the sarama library (e.g. `MaxRequestSize`), so more
configuration options must be added. In addition, this really exposes
very low level implementation details in the sarama library -- something
that we probably should not do.

Fixes #76258

Release note (enterprise change): Kafka sink supports larger messages,
up to 2GB in size.
---
 pkg/ccl/changefeedccl/sink_kafka.go | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go
index 3a88dda9b997..734c09efeac4 100644
--- a/pkg/ccl/changefeedccl/sink_kafka.go
+++ b/pkg/ccl/changefeedccl/sink_kafka.go
@@ -14,6 +14,7 @@ import (
 	"crypto/x509"
 	"encoding/json"
 	"fmt"
+	"math"
 	"strings"
 	"sync"
 	"time"
@@ -56,6 +57,12 @@ func init() {
 	ctx := context.Background()
 	ctx = logtags.AddTag(ctx, "kafka-producer", nil)
 	sarama.Logger = &kafkaLogAdapter{ctx: ctx}
+
+	// Sarama should not be rejecting messages based on some arbitrary limits.
+	// This sink already manages its resource usage. Sarama should attempt to deliver
+	// messages, no matter their size. Of course, the downstream kafka may reject
+	// those messages, but this rejection should not be done locally.
+	sarama.MaxRequestSize = math.MaxInt32
 }
 
 // kafkaClient is a small interface restricting the functionality in sarama.Client
@@ -434,6 +441,12 @@ func (j *jsonDuration) UnmarshalJSON(b []byte) error {
 
 // Apply configures provided kafka configuration struct based on this config.
 func (c *saramaConfig) Apply(kafka *sarama.Config) error {
+	// Sarama limits the size of each message to be MaxMessageSize (1MB) bytes.
+	// This is silly; this sink already manages its memory, and therefore, if we
+	// had enough resources to ingest and process this message, then sarama shouldn't
+	// get in the way. Set this limit to be just a bit under the maximum request size.
+	kafka.Producer.MaxMessageBytes = int(sarama.MaxRequestSize - 1)
+
 	kafka.Producer.Flush.Bytes = c.Flush.Bytes
 	kafka.Producer.Flush.Messages = c.Flush.Messages
 	kafka.Producer.Flush.Frequency = time.Duration(c.Flush.Frequency)