diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go index 3fb7ff1d3858..0e346e5595fe 100644 --- a/pkg/ccl/changefeedccl/sink_kafka.go +++ b/pkg/ccl/changefeedccl/sink_kafka.go @@ -14,6 +14,7 @@ import ( "crypto/x509" "encoding/json" "fmt" + "math" "strings" "sync" "time" @@ -56,6 +57,12 @@ func init() { ctx := context.Background() ctx = logtags.AddTag(ctx, "kafka-producer", nil) sarama.Logger = &kafkaLogAdapter{ctx: ctx} + + // Sarama should not be rejecting messages based on some arbitrary limits. + // This sink already manages its resource usage. Sarama should attempt to deliver + // messages, no matter their size. Of course, the downstream kafka may reject + // those messages, but this rejection should not be done locally. + sarama.MaxRequestSize = math.MaxInt32 } // kafkaClient is a small interface restricting the functionality in sarama.Client @@ -442,6 +449,12 @@ func (j *jsonDuration) UnmarshalJSON(b []byte) error { // Apply configures provided kafka configuration struct based on this config. func (c *saramaConfig) Apply(kafka *sarama.Config) error { + // Sarama limits the size of each message to be MaxMessageSize (1MB) bytes. + // This is silly; This sink already manages its memory, and therefore, if we + // had enough resources to ingest and process this message, then sarama shouldn't + // get in a way. Set this limit to be just a bit under maximum request size. + kafka.Producer.MaxMessageBytes = int(sarama.MaxRequestSize - 1) + kafka.Producer.Flush.Bytes = c.Flush.Bytes kafka.Producer.Flush.Messages = c.Flush.Messages kafka.Producer.Flush.Frequency = time.Duration(c.Flush.Frequency)