Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-21.2: changefeedccl: Increase message size limits for kafka sink. #76316

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions pkg/ccl/changefeedccl/sink_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"crypto/x509"
"encoding/json"
"fmt"
"math"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -56,6 +57,12 @@ func init() {
ctx := context.Background()
ctx = logtags.AddTag(ctx, "kafka-producer", nil)
sarama.Logger = &kafkaLogAdapter{ctx: ctx}

// Sarama should not be rejecting messages based on some arbitrary limits.
// This sink already manages its resource usage. Sarama should attempt to deliver
// messages, no matter their size. Of course, the downstream kafka may reject
// those messages, but this rejection should not be done locally.
sarama.MaxRequestSize = math.MaxInt32
}

// kafkaClient is a small interface restricting the functionality in sarama.Client
Expand Down Expand Up @@ -434,6 +441,12 @@ func (j *jsonDuration) UnmarshalJSON(b []byte) error {

// Apply configures provided kafka configuration struct based on this config.
func (c *saramaConfig) Apply(kafka *sarama.Config) error {
// Sarama limits the size of each message to be MaxMessageSize (1MB) bytes.
// This is silly; This sink already manages its memory, and therefore, if we
// had enough resources to ingest and process this message, then sarama shouldn't
// get in a way. Set this limit to be just a bit under maximum request size.
kafka.Producer.MaxMessageBytes = int(sarama.MaxRequestSize - 1)

kafka.Producer.Flush.Bytes = c.Flush.Bytes
kafka.Producer.Flush.Messages = c.Flush.Messages
kafka.Producer.Flush.Frequency = time.Duration(c.Flush.Frequency)
Expand Down