diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go
index beb086821bd9..9e2029e3a6f8 100644
--- a/pkg/ccl/changefeedccl/sink.go
+++ b/pkg/ccl/changefeedccl/sink.go
@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"hash"
 	"hash/fnv"
+	"math"
 	"net/url"
 	"strconv"
 	"strings"
@@ -323,6 +324,12 @@ func init() {
 	ctx := context.Background()
 	ctx = logtags.AddTag(ctx, "kafka-producer", nil)
 	sarama.Logger = &kafkaLogAdapter{ctx: ctx}
+
+	// Sarama should not reject messages based on arbitrary local limits.
+	// This sink already manages its resource usage; sarama should attempt to
+	// deliver messages regardless of their size. The downstream Kafka cluster may
+	// still reject those messages, but that rejection should not happen locally.
+	sarama.MaxRequestSize = math.MaxInt32
 }
 
 type kafkaSinkConfig struct {
@@ -408,6 +415,12 @@ type saramaConfig struct {
 // Configure configures provided kafka configuration struct based
 // on this config.
 func (c *saramaConfig) Apply(kafka *sarama.Config) error {
+	// Sarama caps each message at Producer.MaxMessageBytes (1 MB by default).
+	// This sink already manages its memory, so if there were enough resources to
+	// ingest and process a message, sarama should not get in the way. Set this
+	// limit to just under the maximum request size.
+	kafka.Producer.MaxMessageBytes = int(sarama.MaxRequestSize - 1)
+
 	kafka.Producer.Flush.Bytes = c.Flush.Bytes
 	kafka.Producer.Flush.Messages = c.Flush.Messages
 	kafka.Producer.Flush.Frequency = time.Duration(c.Flush.Frequency)
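
For reference, a minimal standalone sketch of how the two limits touched above relate. This is not part of the patch; it assumes the github.com/Shopify/sarama import path and a throwaway main package. sarama.MaxRequestSize is a package-level cap on the size of a whole produce request, while Producer.MaxMessageBytes caps a single message within a producer config, so the change raises the former and pins the latter just below it.

package main

import (
	"fmt"
	"math"

	"github.com/Shopify/sarama"
)

func main() {
	// Package-level cap on the size of a single produce request; raising it
	// defers request-size enforcement to the Kafka broker instead of failing
	// locally, mirroring the init() change in the diff above.
	sarama.MaxRequestSize = math.MaxInt32

	cfg := sarama.NewConfig()
	// Per-producer cap on an individual message; kept just under the request
	// size so a single message always fits in one request.
	cfg.Producer.MaxMessageBytes = int(sarama.MaxRequestSize - 1)

	fmt.Println(sarama.MaxRequestSize, cfg.Producer.MaxMessageBytes)
}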