Skip to content

Commit

Permalink
Merge pull request #76322 from miretskiy/backport21.1-76265
Browse files Browse the repository at this point in the history
release-21.1: changefeedccl: Increase message size limits for kafka sink.
  • Loading branch information
miretskiy authored Feb 9, 2022
2 parents 0490498 + 2be2fe3 commit 8ca6359
Showing 1 changed file with 13 additions and 0 deletions.
13 changes: 13 additions & 0 deletions pkg/ccl/changefeedccl/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"hash"
"hash/fnv"
"math"
"net/url"
"strconv"
"strings"
Expand Down Expand Up @@ -323,6 +324,12 @@ func init() {
ctx := context.Background()
ctx = logtags.AddTag(ctx, "kafka-producer", nil)
sarama.Logger = &kafkaLogAdapter{ctx: ctx}

// Sarama should not be rejecting messages based on arbitrary limits.
// This sink already manages its resource usage. Sarama should attempt to deliver
// messages, no matter their size. Of course, the downstream Kafka cluster may reject
// those messages, but this rejection should not be done locally.
sarama.MaxRequestSize = math.MaxInt32
}

type kafkaSinkConfig struct {
Expand Down Expand Up @@ -408,6 +415,12 @@ type saramaConfig struct {
// Configure configures provided kafka configuration struct based
// on this config.
func (c *saramaConfig) Apply(kafka *sarama.Config) error {
// Sarama limits the size of each message to be MaxMessageSize (1MB) bytes.
// This is silly; this sink already manages its memory, and therefore, if we
// had enough resources to ingest and process this message, then sarama shouldn't
// get in the way. Set this limit to be just a bit under the maximum request size.
kafka.Producer.MaxMessageBytes = int(sarama.MaxRequestSize - 1)

kafka.Producer.Flush.Bytes = c.Flush.Bytes
kafka.Producer.Flush.Messages = c.Flush.Messages
kafka.Producer.Flush.Frequency = time.Duration(c.Flush.Frequency)
Expand Down

0 comments on commit 8ca6359

Please sign in to comment.