changefeedccl: Increase message size limits for kafka sink.
The Sarama library, which backs the Kafka sink, enforces maximum message
sizes locally. When those limits are exceeded, Sarama returns a confusing
error message that seems to imply the remote Kafka server rejected the
message, even though the rejection happened locally:
   `kafka server: Message was too large, server rejected it to avoid allocation error.`

This PR addresses the problem by increasing Sarama's limits to 2GB
(max int32), as sketched below.
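
In essence, the change boils down to the following condensed sketch (the package and helper names here are illustrative; the real change lives in `pkg/ccl/changefeedccl/sink_kafka.go`, shown in the diff below):

```go
package kafkasink

import (
	"math"

	"github.com/Shopify/sarama"
)

// newProducerConfig mirrors the two limit changes made by this commit:
// raise Sarama's global cap on a single produce request, then cap each
// message just under that so one maximally sized message still fits in
// a single request.
func newProducerConfig() *sarama.Config {
	// Global, package-level cap on the total size of a produce request.
	sarama.MaxRequestSize = math.MaxInt32

	cfg := sarama.NewConfig()
	// Per-message cap, kept one byte under the request-size cap.
	cfg.Producer.MaxMessageBytes = int(sarama.MaxRequestSize - 1)
	return cfg
}
```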

An alternative approach would have been to extend `kafka_sink_config` to let
users specify a maximum message size. However, this alternative is less
desirable. For one, a user-supplied value can run afoul of other limits
imposed by the Sarama library (e.g. `MaxRequestSize`), so even more
configuration options would have to be added. In addition, it would expose
very low-level implementation details of the Sarama library -- something we
probably should not do. A hypothetical sketch of this rejected alternative
follows.
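
For illustration only, the rejected alternative might have looked something like this (the type, field, and method names are hypothetical and are NOT part of this commit):

```go
package kafkasink

import (
	"fmt"

	"github.com/Shopify/sarama"
)

// userSinkConfig is a hypothetical extension of the kafka_sink_config
// options: a user-facing knob for the per-message size limit.
type userSinkConfig struct {
	// MaxMessageBytes would be parsed from the kafka_sink_config JSON.
	MaxMessageBytes int `json:",omitempty"`
}

func (c *userSinkConfig) apply(kafka *sarama.Config) error {
	// The user-supplied value immediately runs afoul of another Sarama
	// limit, so MaxRequestSize would need to become configurable as well.
	if c.MaxMessageBytes >= int(sarama.MaxRequestSize) {
		return fmt.Errorf("MaxMessageBytes %d exceeds sarama.MaxRequestSize %d",
			c.MaxMessageBytes, sarama.MaxRequestSize)
	}
	kafka.Producer.MaxMessageBytes = c.MaxMessageBytes
	return nil
}
```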

Fixes cockroachdb#76258

Release note (enterprise change): The Kafka sink now supports larger
messages, up to 2GB in size.
Yevgeniy Miretskiy authored and RajivTS committed Mar 6, 2022
1 parent 918d80a commit 487db96
Showing 1 changed file with 13 additions and 0 deletions.
13 changes: 13 additions & 0 deletions pkg/ccl/changefeedccl/sink_kafka.go
@@ -14,6 +14,7 @@ import (
 	"crypto/x509"
 	"encoding/json"
 	"fmt"
+	"math"
 	"strings"
 	"sync"
 	"time"
@@ -56,6 +57,12 @@ func init() {
 	ctx := context.Background()
 	ctx = logtags.AddTag(ctx, "kafka-producer", nil)
 	sarama.Logger = &kafkaLogAdapter{ctx: ctx}
+
+	// Sarama should not reject messages based on its own arbitrary limits.
+	// This sink already manages its resource usage, so Sarama should attempt
+	// to deliver messages no matter their size. The downstream Kafka server
+	// may reject those messages, but that rejection should not happen locally.
+	sarama.MaxRequestSize = math.MaxInt32
 }
 
 // kafkaClient is a small interface restricting the functionality in sarama.Client
@@ -442,6 +449,12 @@ func (j *jsonDuration) UnmarshalJSON(b []byte) error {
 
 // Apply configures provided kafka configuration struct based on this config.
 func (c *saramaConfig) Apply(kafka *sarama.Config) error {
+	// Sarama limits each message to Producer.MaxMessageBytes (1MB by default).
+	// This sink already manages its memory; if we had enough resources to
+	// ingest and process a message, Sarama should not get in the way. Set
+	// this limit to just under the maximum request size.
+	kafka.Producer.MaxMessageBytes = int(sarama.MaxRequestSize - 1)
+
 	kafka.Producer.Flush.Bytes = c.Flush.Bytes
 	kafka.Producer.Flush.Messages = c.Flush.Messages
 	kafka.Producer.Flush.Frequency = time.Duration(c.Flush.Frequency)
