
Commit

86138: changefeedccl: add non-batching retries to kafka sink r=samiskin a=samiskin

Resolves cockroachdb#80313                                  
                                    
A recurring pain point with changefeeds is the Kafka "message too large"
error: because individual message sizes vary, a batch capped at a fixed
message count could still be too many bytes in total, and with no
mechanism to reduce the batch size dynamically, the changefeed would
simply fail.
                                                                         
This PR adds an internal retry mechanism to the kafka sink: upon
observing a "message too large" error, the sink pauses all new incoming
messages, retries the buffered messages with a progressively smaller
batch size, and, once a size that succeeds is found, resumes operation
with the original configuration (a minimal illustrative sketch follows
below).
                                                                         
Release note (bug fix): When a changefeed emitting to Kafka receives a
"message too large" error, it now halves its batch size and retries
until the send either succeeds or a batch size of 1 still fails.

Release justification: high-priority bug fix; the new code mostly runs
only in cases where an unrecoverable error would previously have
occurred, and it can be toggled with a cluster setting.
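
For illustration, below is a minimal, self-contained sketch of the halving loop described above. It is not the actual kafka sink code: the sendBatch stand-in and all names are assumptions made for the example.

// Illustrative sketch only: batch-halving retry in the style described above.
package main

import (
	"errors"
	"fmt"
)

var errMessageSizeTooLarge = errors.New("kafka server: Message was too large")

// sendBatch stands in for a producer flush; this fake broker rejects any
// flush configured to allow more than 2 messages per batch.
func sendBatch(msgs []string, maxMessages int) error {
	if maxMessages == 0 || maxMessages > 2 {
		return errMessageSizeTooLarge
	}
	return nil
}

// retryWithSmallerBatches halves the batch size on every "message too large"
// error until a flush succeeds or a batch size of 1 still fails.
func retryWithSmallerBatches(msgs []string, maxMessages int) error {
	if maxMessages == 0 {
		maxMessages = len(msgs) // MaxMessages of 0 means unlimited.
	}
	for {
		err := sendBatch(msgs, maxMessages)
		if err == nil || !errors.Is(err, errMessageSizeTooLarge) {
			return err
		}
		if maxMessages == 1 {
			return err // Even single-message batches are rejected; give up.
		}
		maxMessages /= 2
	}
}

func main() {
	msgs := []string{"a", "b", "c", "d", "e", "f", "g", "h"}
	fmt.Println(retryWithSmallerBatches(msgs, 0)) // Prints <nil> after shrinking 8 -> 4 -> 2.
}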


Co-authored-by: Shiranka Miskin <[email protected]>
craig[bot] and samiskin committed Aug 29, 2022
2 parents 8fc4fc1 + 4a1f880 commit 3b16435
Showing 9 changed files with 629 additions and 94 deletions.
143 changes: 143 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -7158,3 +7158,146 @@ func TestSchemachangeDoesNotBreakSinklessFeed(t *testing.T) {

	cdcTest(t, testFn, feedTestForceSink("sinkless"))
}

func TestChangefeedKafkaMessageTooLarge(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	defer utilccl.TestingEnableEnterprise()()

	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
		knobs := f.(*kafkaFeedFactory).knobs
		sqlDB := sqlutils.MakeSQLRunner(s.DB)
		sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
		sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`)
		sqlDB.Exec(t, `INSERT INTO foo VALUES (2)`)

		t.Run(`succeed eventually if batches are rejected by the server for being too large`, func(t *testing.T) {
			// MaxMessages of 0 means unlimited
			foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH kafka_sink_config='{"Flush": {"MaxMessages": 0}}'`)
			defer closeFeed(t, foo)
			assertPayloads(t, foo, []string{
				`foo: [1]->{"after": {"a": 1}}`,
				`foo: [2]->{"after": {"a": 2}}`,
			})

			// Messages should be sent by a smaller and smaller MaxMessages config
			// only until ErrMessageSizeTooLarge is no longer returned.
			knobs.kafkaInterceptor = func(m *sarama.ProducerMessage, client kafkaClient) error {
				maxMessages := client.Config().Producer.Flush.MaxMessages
				if maxMessages == 0 || maxMessages >= 250 {
					return sarama.ErrMessageSizeTooLarge
				}
				require.Greater(t, maxMessages, 100)
				return nil
			}

			sqlDB.Exec(t, `INSERT INTO foo VALUES (3)`)
			sqlDB.Exec(t, `INSERT INTO foo VALUES (4)`)
			assertPayloads(t, foo, []string{
				`foo: [3]->{"after": {"a": 3}}`,
				`foo: [4]->{"after": {"a": 4}}`,
			})
			sqlDB.Exec(t, `INSERT INTO foo VALUES (5)`)
			sqlDB.Exec(t, `INSERT INTO foo VALUES (6)`)
			assertPayloads(t, foo, []string{
				`foo: [5]->{"after": {"a": 5}}`,
				`foo: [6]->{"after": {"a": 6}}`,
			})
		})

		t.Run(`succeed against a large backfill`, func(t *testing.T) {
			sqlDB.Exec(t, `CREATE TABLE large (a INT PRIMARY KEY)`)
			sqlDB.Exec(t, `INSERT INTO large (a) SELECT * FROM generate_series(1, 2000);`)

			foo := feed(t, f, `CREATE CHANGEFEED FOR large WITH kafka_sink_config='{"Flush": {"MaxMessages": 1000}}'`)
			defer closeFeed(t, foo)

			rnd, _ := randutil.NewPseudoRand()

			knobs.kafkaInterceptor = func(m *sarama.ProducerMessage, client kafkaClient) error {
				if client.Config().Producer.Flush.MaxMessages > 1 && rnd.Int()%5 == 0 {
					return sarama.ErrMessageSizeTooLarge
				}
				return nil
			}

			var expected []string
			for i := 1; i <= 2000; i++ {
				expected = append(expected, fmt.Sprintf(
					`large: [%d]->{"after": {"a": %d}}`, i, i,
				))
			}
			assertPayloads(t, foo, expected)
		})

		// Validate that different failure scenarios result in a full changefeed retry
		sqlDB.Exec(t, `CREATE TABLE errors (a INT PRIMARY KEY);`)
		sqlDB.Exec(t, `INSERT INTO errors (a) SELECT * FROM generate_series(1, 1000);`)
		for _, failTest := range []struct {
			failInterceptor func(m *sarama.ProducerMessage, client kafkaClient) error
			errMsg          string
		}{
			{
				func(m *sarama.ProducerMessage, client kafkaClient) error {
					return sarama.ErrMessageSizeTooLarge
				},
				"kafka server: Message was too large, server rejected it to avoid allocation error",
			},
			{
				func(m *sarama.ProducerMessage, client kafkaClient) error {
					return errors.Errorf("unrelated error")
				},
				"unrelated error",
			},
			{
				func(m *sarama.ProducerMessage, client kafkaClient) error {
					maxMessages := client.Config().Producer.Flush.MaxMessages
					if maxMessages == 0 || maxMessages > 250 {
						return sarama.ErrMessageSizeTooLarge
					}
					return errors.Errorf("unrelated error mid-retry")
				},
				"unrelated error mid-retry",
			},
			{
				func() func(m *sarama.ProducerMessage, client kafkaClient) error {
					// Trigger an internal retry for the first message but have successive
					// messages throw a non-retryable error. This can happen in practice
					// when the second message is on a different topic to the first.
					startedBuffering := false
					return func(m *sarama.ProducerMessage, client kafkaClient) error {
						if !startedBuffering {
							startedBuffering = true
							return sarama.ErrMessageSizeTooLarge
						}
						return errors.Errorf("unrelated error mid-buffering")
					}
				}(),
				"unrelated error mid-buffering",
			},
		} {
			t.Run(fmt.Sprintf(`eventually surface error for retry: %s`, failTest.errMsg), func(t *testing.T) {
				knobs.kafkaInterceptor = failTest.failInterceptor
				foo := feed(t, f, `CREATE CHANGEFEED FOR errors WITH kafka_sink_config='{"Flush": {"MaxMessages": 0}}'`)
				defer closeFeed(t, foo)

				feedJob := foo.(cdctest.EnterpriseTestFeed)

				// check that running status correctly updates with retryable error
				testutils.SucceedsSoon(t, func() error {
					status, err := feedJob.FetchRunningStatus()
					if err != nil {
						return err
					}

					if !strings.Contains(status, failTest.errMsg) {
						return errors.Errorf("expected error to contain '%s', got: %v", failTest.errMsg, status)
					}
					return nil
				})
			})
		}
	}

	cdcTest(t, testFn, feedTestForceSink(`kafka`))
}
8 changes: 8 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -215,6 +215,14 @@ var ActiveProtectedTimestampsEnabled = settings.RegisterBoolSetting(
	true,
)

// BatchReductionRetryEnabled enables the temporary reduction of batch sizes upon kafka message too large errors
var BatchReductionRetryEnabled = settings.RegisterBoolSetting(
	settings.TenantWritable,
	"changefeed.batch_reduction_retry_enabled",
	"if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes",
	true,
)

// UseMuxRangeFeed enables the use of MuxRangeFeed RPC.
var UseMuxRangeFeed = settings.RegisterBoolSetting(
	settings.TenantWritable,
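For reference, a hedged sketch of how the new setting would typically be exercised. The SET CLUSTER SETTING statement follows directly from the name registered above; the helper functions, the sqlutils runner, and the Get call site are assumptions for illustration, not code from this PR.

// Illustrative only; assumes the testing, sqlutils, cluster, and
// changefeedbase imports used elsewhere in this package.
func disableBatchReductionRetry(t *testing.T, sqlDB *sqlutils.SQLRunner) {
	// Operators (or tests) can toggle the behavior at runtime.
	sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.batch_reduction_retry_enabled = false`)
}

func batchReductionRetryEnabled(st *cluster.Settings) bool {
	// The sink side would consult the registered BoolSetting before
	// attempting a batch-size reduction (assumed call site).
	return changefeedbase.BatchReductionRetryEnabled.Get(&st.SV)
}
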
121 changes: 79 additions & 42 deletions pkg/ccl/changefeedccl/metrics.go
@@ -44,19 +44,21 @@ const defaultSLIScope = "default"
// AggMetrics are aggregated metrics keeping track of aggregated changefeed performance
// indicators, combined with a limited number of per-changefeed indicators.
type AggMetrics struct {
EmittedMessages *aggmetric.AggCounter
MessageSize *aggmetric.AggHistogram
EmittedBytes *aggmetric.AggCounter
FlushedBytes *aggmetric.AggCounter
BatchHistNanos *aggmetric.AggHistogram
Flushes *aggmetric.AggCounter
FlushHistNanos *aggmetric.AggHistogram
CommitLatency *aggmetric.AggHistogram
BackfillCount *aggmetric.AggGauge
BackfillPendingRanges *aggmetric.AggGauge
ErrorRetries *aggmetric.AggCounter
AdmitLatency *aggmetric.AggHistogram
RunningCount *aggmetric.AggGauge
EmittedMessages *aggmetric.AggCounter
MessageSize *aggmetric.AggHistogram
EmittedBytes *aggmetric.AggCounter
FlushedBytes *aggmetric.AggCounter
BatchHistNanos *aggmetric.AggHistogram
Flushes *aggmetric.AggCounter
FlushHistNanos *aggmetric.AggHistogram
CommitLatency *aggmetric.AggHistogram
BackfillCount *aggmetric.AggGauge
BackfillPendingRanges *aggmetric.AggGauge
ErrorRetries *aggmetric.AggCounter
AdmitLatency *aggmetric.AggHistogram
RunningCount *aggmetric.AggGauge
BatchReductionCount *aggmetric.AggGauge
InternalRetryMessageCount *aggmetric.AggGauge

// There is always at least 1 sliMetrics created for defaultSLI scope.
mu struct {
@@ -76,6 +78,7 @@ var nilMetricsRecorderBuilder metricsRecorderBuilder = func(_ bool) metricsRecor

type metricsRecorder interface {
recordMessageSize(int64)
recordInternalRetry(int64, bool)
recordOneMessage() recordOneMessageCallback
recordEmittedBatch(startTime time.Time, numMessages int, mvcc hlc.Timestamp, bytes int, compressedBytes int)
recordResolvedCallback() func()
@@ -92,19 +95,21 @@ func (a *AggMetrics) MetricStruct() {}

// sliMetrics holds all SLI related metrics aggregated into AggMetrics.
type sliMetrics struct {
EmittedMessages *aggmetric.Counter
MessageSize *aggmetric.Histogram
EmittedBytes *aggmetric.Counter
FlushedBytes *aggmetric.Counter
BatchHistNanos *aggmetric.Histogram
Flushes *aggmetric.Counter
FlushHistNanos *aggmetric.Histogram
CommitLatency *aggmetric.Histogram
ErrorRetries *aggmetric.Counter
AdmitLatency *aggmetric.Histogram
BackfillCount *aggmetric.Gauge
BackfillPendingRanges *aggmetric.Gauge
RunningCount *aggmetric.Gauge
EmittedMessages *aggmetric.Counter
MessageSize *aggmetric.Histogram
EmittedBytes *aggmetric.Counter
FlushedBytes *aggmetric.Counter
BatchHistNanos *aggmetric.Histogram
Flushes *aggmetric.Counter
FlushHistNanos *aggmetric.Histogram
CommitLatency *aggmetric.Histogram
ErrorRetries *aggmetric.Counter
AdmitLatency *aggmetric.Histogram
BackfillCount *aggmetric.Gauge
BackfillPendingRanges *aggmetric.Gauge
RunningCount *aggmetric.Gauge
BatchReductionCount *aggmetric.Gauge
InternalRetryMessageCount *aggmetric.Gauge
}

// sinkDoesNotCompress is a sentinel value indicating the sink
@@ -131,6 +136,18 @@ func (m *sliMetrics) recordMessageSize(sz int64) {
}
}

func (m *sliMetrics) recordInternalRetry(numMessages int64, reducedBatchSize bool) {
	if m == nil {
		return
	}

	if reducedBatchSize {
		m.BatchReductionCount.Inc(1)
	}

	m.InternalRetryMessageCount.Inc(numMessages)
}

func (m *sliMetrics) recordEmittedBatch(
startTime time.Time, numMessages int, mvcc hlc.Timestamp, bytes int, compressedBytes int,
) {
@@ -245,6 +262,10 @@ func (w *wrappingCostController) recordMessageSize(sz int64) {
w.inner.recordMessageSize(sz)
}

func (w *wrappingCostController) recordInternalRetry(numMessages int64, reducedBatchSize bool) {
	w.inner.recordInternalRetry(numMessages, reducedBatchSize)
}

func (w *wrappingCostController) recordResolvedCallback() func() {
// TODO(ssd): We don't count resolved messages currently. These messages should be relatively
// small and the error here is further in the favor of the user.
@@ -411,6 +432,18 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaBatchReductionCount := metric.Metadata{
Name: "changefeed.batch_reduction_count",
Help: "Number of times a changefeed aggregator node attempted to reduce the size of message batches it emitted to the sink",
Measurement: "Batch Size Reductions",
Unit: metric.Unit_COUNT,
}
metaInternalRetryMessageCount := metric.Metadata{
Name: "changefeed.internal_retry_message_count",
Help: "Number of messages for which an attempt to retry them within an aggregator node was made",
Measurement: "Messages",
Unit: metric.Unit_COUNT,
}
// NB: When adding new histograms, use sigFigs = 1. Older histograms
// retain significant figures of 2.
b := aggmetric.MakeBuilder("scope")
@@ -431,9 +464,11 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
histogramWindow, commitLatencyMaxValue.Nanoseconds(), 1),
AdmitLatency: b.Histogram(metaAdmitLatency, histogramWindow,
admitLatencyMaxValue.Nanoseconds(), 1),
BackfillCount: b.Gauge(metaChangefeedBackfillCount),
BackfillPendingRanges: b.Gauge(metaChangefeedBackfillPendingRanges),
RunningCount: b.Gauge(metaChangefeedRunning),
BackfillCount: b.Gauge(metaChangefeedBackfillCount),
BackfillPendingRanges: b.Gauge(metaChangefeedBackfillPendingRanges),
RunningCount: b.Gauge(metaChangefeedRunning),
BatchReductionCount: b.Gauge(metaBatchReductionCount),
InternalRetryMessageCount: b.Gauge(metaInternalRetryMessageCount),
}
a.mu.sliMetrics = make(map[string]*sliMetrics)
_, err := a.getOrCreateScope(defaultSLIScope)
@@ -478,19 +513,21 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
}

sm := &sliMetrics{
EmittedMessages: a.EmittedMessages.AddChild(scope),
MessageSize: a.MessageSize.AddChild(scope),
EmittedBytes: a.EmittedBytes.AddChild(scope),
FlushedBytes: a.FlushedBytes.AddChild(scope),
BatchHistNanos: a.BatchHistNanos.AddChild(scope),
Flushes: a.Flushes.AddChild(scope),
FlushHistNanos: a.FlushHistNanos.AddChild(scope),
CommitLatency: a.CommitLatency.AddChild(scope),
ErrorRetries: a.ErrorRetries.AddChild(scope),
AdmitLatency: a.AdmitLatency.AddChild(scope),
BackfillCount: a.BackfillCount.AddChild(scope),
BackfillPendingRanges: a.BackfillPendingRanges.AddChild(scope),
RunningCount: a.RunningCount.AddChild(scope),
EmittedMessages: a.EmittedMessages.AddChild(scope),
MessageSize: a.MessageSize.AddChild(scope),
EmittedBytes: a.EmittedBytes.AddChild(scope),
FlushedBytes: a.FlushedBytes.AddChild(scope),
BatchHistNanos: a.BatchHistNanos.AddChild(scope),
Flushes: a.Flushes.AddChild(scope),
FlushHistNanos: a.FlushHistNanos.AddChild(scope),
CommitLatency: a.CommitLatency.AddChild(scope),
ErrorRetries: a.ErrorRetries.AddChild(scope),
AdmitLatency: a.AdmitLatency.AddChild(scope),
BackfillCount: a.BackfillCount.AddChild(scope),
BackfillPendingRanges: a.BackfillPendingRanges.AddChild(scope),
RunningCount: a.RunningCount.AddChild(scope),
BatchReductionCount: a.BatchReductionCount.AddChild(scope),
InternalRetryMessageCount: a.InternalRetryMessageCount.AddChild(scope),
}

a.mu.sliMetrics[scope] = sm
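To show how the new recorder hook is meant to be used, here is a hypothetical caller sketch; only recordInternalRetry and the metric names come from this diff, while the surrounding function is an assumption for illustration.

// Hypothetical illustration: a sink holding a metricsRecorder reports an
// internal retry after deciding to resend retriedMessages buffered messages.
func reportInternalRetry(recorder metricsRecorder, retriedMessages int64, didReduceBatchSize bool) {
	// Bumps changefeed.internal_retry_message_count by retriedMessages and,
	// if the batch size was shrunk, changefeed.batch_reduction_count by one.
	recorder.recordInternalRetry(retriedMessages, didReduceBatchSize)
}
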
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/sink.go
@@ -165,7 +165,7 @@ func getSink(
return makeNullSink(sinkURL{URL: u}, metricsBuilder(nullIsAccounted))
case u.Scheme == changefeedbase.SinkSchemeKafka:
return validateOptionsAndMakeSink(changefeedbase.KafkaValidOptions, func() (Sink, error) {
return makeKafkaSink(ctx, sinkURL{URL: u}, AllTargets(feedCfg), opts.GetKafkaConfigJSON(), metricsBuilder)
return makeKafkaSink(ctx, sinkURL{URL: u}, AllTargets(feedCfg), opts.GetKafkaConfigJSON(), serverCfg.Settings, metricsBuilder)
})
case isWebhookSink(u):
webhookOpts, err := opts.GetWebhookSinkOptions()
