From 4a1f8807ee5c7dbdf54821511ad64bd4a58e2eb5 Mon Sep 17 00:00:00 2001
From: Shiranka Miskin
Date: Tue, 16 Aug 2022 18:35:05 +0000
Subject: [PATCH] changefeedccl: add non-batching retries to kafka sink

A recurring pain point of changefeeds is the "message too large" kafka
issue, where a fixed-size batch may sometimes end up being too many
bytes due to varying message sizes, but there was no mechanism to
reduce the batch size dynamically, so the changefeed would simply fail.

This PR adds an internal retry mechanism to the kafka sink: upon
observing an ErrMessageSizeTooLarge error, the sink pauses all new
incoming messages, retries the errored messages with a progressively
smaller batch size until one succeeds, and then resumes operation with
the original configuration.

Release note (bug fix): Changefeeds emitting to Kafka that receive a
"message too large" error will now halve the size of their batches
until either the send succeeds or a batch size of 1 fails.

Release justification: high priority bug fix; the new code mostly runs
only in cases where an unrecoverable error would previously have
occurred, and it can be toggled with a cluster setting.
---
 pkg/ccl/changefeedccl/changefeed_test.go      | 143 ++++++++
 .../changefeedccl/changefeedbase/settings.go  |   8 +
 pkg/ccl/changefeedccl/metrics.go              | 121 ++++---
 pkg/ccl/changefeedccl/sink.go                 |   2 +-
 pkg/ccl/changefeedccl/sink_kafka.go           | 310 ++++++++++++++++--
 .../changefeedccl/sink_kafka_connection.go    |   2 +-
 pkg/ccl/changefeedccl/sink_test.go            |  35 +-
 pkg/ccl/changefeedccl/testfeed_test.go        |  90 ++++-
 pkg/ts/catalog/chart_catalog.go               |  12 +
 9 files changed, 629 insertions(+), 94 deletions(-)

diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index 262aff97ea8b..3ee88fb62ad9 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -7158,3 +7158,146 @@ func TestSchemachangeDoesNotBreakSinklessFeed(t *testing.T) {
 
 	cdcTest(t, testFn, feedTestForceSink("sinkless"))
 }
+
+func TestChangefeedKafkaMessageTooLarge(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	defer utilccl.TestingEnableEnterprise()()
+
+	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+		knobs := f.(*kafkaFeedFactory).knobs
+		sqlDB := sqlutils.MakeSQLRunner(s.DB)
+		sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
+		sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`)
+		sqlDB.Exec(t, `INSERT INTO foo VALUES (2)`)
+
+		t.Run(`succeed eventually if batches are rejected by the server for being too large`, func(t *testing.T) {
+			// MaxMessages of 0 means unlimited
+			foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH kafka_sink_config='{"Flush": {"MaxMessages": 0}}'`)
+			defer closeFeed(t, foo)
+			assertPayloads(t, foo, []string{
+				`foo: [1]->{"after": {"a": 1}}`,
+				`foo: [2]->{"after": {"a": 2}}`,
+			})
+
+			// Messages should be sent by a smaller and smaller MaxMessages config
+			// only until ErrMessageSizeTooLarge is no longer returned.
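+			// The interceptor below stands in for a kafka server with a size limit:
+			// it rejects every batch while MaxMessages is 0 (unlimited) or at least
+			// 250, and fails the test if the sink ever retries with a MaxMessages of
+			// 100 or fewer. Starting from the unlimited config, the retry path first
+			// caps MaxMessages at 250 (rejected here) and then halves it to 125,
+			// which is accepted.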
+ knobs.kafkaInterceptor = func(m *sarama.ProducerMessage, client kafkaClient) error { + maxMessages := client.Config().Producer.Flush.MaxMessages + if maxMessages == 0 || maxMessages >= 250 { + return sarama.ErrMessageSizeTooLarge + } + require.Greater(t, maxMessages, 100) + return nil + } + + sqlDB.Exec(t, `INSERT INTO foo VALUES (3)`) + sqlDB.Exec(t, `INSERT INTO foo VALUES (4)`) + assertPayloads(t, foo, []string{ + `foo: [3]->{"after": {"a": 3}}`, + `foo: [4]->{"after": {"a": 4}}`, + }) + sqlDB.Exec(t, `INSERT INTO foo VALUES (5)`) + sqlDB.Exec(t, `INSERT INTO foo VALUES (6)`) + assertPayloads(t, foo, []string{ + `foo: [5]->{"after": {"a": 5}}`, + `foo: [6]->{"after": {"a": 6}}`, + }) + }) + + t.Run(`succeed against a large backfill`, func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE large (a INT PRIMARY KEY)`) + sqlDB.Exec(t, `INSERT INTO large (a) SELECT * FROM generate_series(1, 2000);`) + + foo := feed(t, f, `CREATE CHANGEFEED FOR large WITH kafka_sink_config='{"Flush": {"MaxMessages": 1000}}'`) + defer closeFeed(t, foo) + + rnd, _ := randutil.NewPseudoRand() + + knobs.kafkaInterceptor = func(m *sarama.ProducerMessage, client kafkaClient) error { + if client.Config().Producer.Flush.MaxMessages > 1 && rnd.Int()%5 == 0 { + return sarama.ErrMessageSizeTooLarge + } + return nil + } + + var expected []string + for i := 1; i <= 2000; i++ { + expected = append(expected, fmt.Sprintf( + `large: [%d]->{"after": {"a": %d}}`, i, i, + )) + } + assertPayloads(t, foo, expected) + }) + + // Validate that different failure scenarios result in a full changefeed retry + sqlDB.Exec(t, `CREATE TABLE errors (a INT PRIMARY KEY);`) + sqlDB.Exec(t, `INSERT INTO errors (a) SELECT * FROM generate_series(1, 1000);`) + for _, failTest := range []struct { + failInterceptor func(m *sarama.ProducerMessage, client kafkaClient) error + errMsg string + }{ + { + func(m *sarama.ProducerMessage, client kafkaClient) error { + return sarama.ErrMessageSizeTooLarge + }, + "kafka server: Message was too large, server rejected it to avoid allocation error", + }, + { + func(m *sarama.ProducerMessage, client kafkaClient) error { + return errors.Errorf("unrelated error") + }, + "unrelated error", + }, + { + func(m *sarama.ProducerMessage, client kafkaClient) error { + maxMessages := client.Config().Producer.Flush.MaxMessages + if maxMessages == 0 || maxMessages > 250 { + return sarama.ErrMessageSizeTooLarge + } + return errors.Errorf("unrelated error mid-retry") + }, + "unrelated error mid-retry", + }, + { + func() func(m *sarama.ProducerMessage, client kafkaClient) error { + // Trigger an internal retry for the first message but have successive + // messages throw a non-retryable error. This can happen in practice + // when the second message is on a different topic to the first. 
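+				// Only the first intercepted message returns ErrMessageSizeTooLarge;
+				// every message buffered behind it then hits a non-retryable error,
+				// which must surface as a changefeed-level retry rather than being
+				// swallowed by the in-sink retry.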
+ startedBuffering := false + return func(m *sarama.ProducerMessage, client kafkaClient) error { + if !startedBuffering { + startedBuffering = true + return sarama.ErrMessageSizeTooLarge + } + return errors.Errorf("unrelated error mid-buffering") + } + }(), + "unrelated error mid-buffering", + }, + } { + t.Run(fmt.Sprintf(`eventually surface error for retry: %s`, failTest.errMsg), func(t *testing.T) { + knobs.kafkaInterceptor = failTest.failInterceptor + foo := feed(t, f, `CREATE CHANGEFEED FOR errors WITH kafka_sink_config='{"Flush": {"MaxMessages": 0}}'`) + defer closeFeed(t, foo) + + feedJob := foo.(cdctest.EnterpriseTestFeed) + + // check that running status correctly updates with retryable error + testutils.SucceedsSoon(t, func() error { + status, err := feedJob.FetchRunningStatus() + if err != nil { + return err + } + + if !strings.Contains(status, failTest.errMsg) { + return errors.Errorf("expected error to contain '%s', got: %v", failTest.errMsg, status) + } + return nil + }) + }) + } + } + + cdcTest(t, testFn, feedTestForceSink(`kafka`)) +} diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index f4b5f8b57482..61f851c0c5b6 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -215,6 +215,14 @@ var ActiveProtectedTimestampsEnabled = settings.RegisterBoolSetting( true, ) +// BatchReductionRetryEnabled enables the temporary reduction of batch sizes upon kafka message too large errors +var BatchReductionRetryEnabled = settings.RegisterBoolSetting( + settings.TenantWritable, + "changefeed.batch_reduction_retry_enabled", + "if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes", + true, +) + // UseMuxRangeFeed enables the use of MuxRangeFeed RPC. var UseMuxRangeFeed = settings.RegisterBoolSetting( settings.TenantWritable, diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index 302babbde4ab..a97908f13efc 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -44,19 +44,21 @@ const defaultSLIScope = "default" // AggMetrics are aggregated metrics keeping track of aggregated changefeed performance // indicators, combined with a limited number of per-changefeed indicators. 
type AggMetrics struct { - EmittedMessages *aggmetric.AggCounter - MessageSize *aggmetric.AggHistogram - EmittedBytes *aggmetric.AggCounter - FlushedBytes *aggmetric.AggCounter - BatchHistNanos *aggmetric.AggHistogram - Flushes *aggmetric.AggCounter - FlushHistNanos *aggmetric.AggHistogram - CommitLatency *aggmetric.AggHistogram - BackfillCount *aggmetric.AggGauge - BackfillPendingRanges *aggmetric.AggGauge - ErrorRetries *aggmetric.AggCounter - AdmitLatency *aggmetric.AggHistogram - RunningCount *aggmetric.AggGauge + EmittedMessages *aggmetric.AggCounter + MessageSize *aggmetric.AggHistogram + EmittedBytes *aggmetric.AggCounter + FlushedBytes *aggmetric.AggCounter + BatchHistNanos *aggmetric.AggHistogram + Flushes *aggmetric.AggCounter + FlushHistNanos *aggmetric.AggHistogram + CommitLatency *aggmetric.AggHistogram + BackfillCount *aggmetric.AggGauge + BackfillPendingRanges *aggmetric.AggGauge + ErrorRetries *aggmetric.AggCounter + AdmitLatency *aggmetric.AggHistogram + RunningCount *aggmetric.AggGauge + BatchReductionCount *aggmetric.AggGauge + InternalRetryMessageCount *aggmetric.AggGauge // There is always at least 1 sliMetrics created for defaultSLI scope. mu struct { @@ -76,6 +78,7 @@ var nilMetricsRecorderBuilder metricsRecorderBuilder = func(_ bool) metricsRecor type metricsRecorder interface { recordMessageSize(int64) + recordInternalRetry(int64, bool) recordOneMessage() recordOneMessageCallback recordEmittedBatch(startTime time.Time, numMessages int, mvcc hlc.Timestamp, bytes int, compressedBytes int) recordResolvedCallback() func() @@ -92,19 +95,21 @@ func (a *AggMetrics) MetricStruct() {} // sliMetrics holds all SLI related metrics aggregated into AggMetrics. type sliMetrics struct { - EmittedMessages *aggmetric.Counter - MessageSize *aggmetric.Histogram - EmittedBytes *aggmetric.Counter - FlushedBytes *aggmetric.Counter - BatchHistNanos *aggmetric.Histogram - Flushes *aggmetric.Counter - FlushHistNanos *aggmetric.Histogram - CommitLatency *aggmetric.Histogram - ErrorRetries *aggmetric.Counter - AdmitLatency *aggmetric.Histogram - BackfillCount *aggmetric.Gauge - BackfillPendingRanges *aggmetric.Gauge - RunningCount *aggmetric.Gauge + EmittedMessages *aggmetric.Counter + MessageSize *aggmetric.Histogram + EmittedBytes *aggmetric.Counter + FlushedBytes *aggmetric.Counter + BatchHistNanos *aggmetric.Histogram + Flushes *aggmetric.Counter + FlushHistNanos *aggmetric.Histogram + CommitLatency *aggmetric.Histogram + ErrorRetries *aggmetric.Counter + AdmitLatency *aggmetric.Histogram + BackfillCount *aggmetric.Gauge + BackfillPendingRanges *aggmetric.Gauge + RunningCount *aggmetric.Gauge + BatchReductionCount *aggmetric.Gauge + InternalRetryMessageCount *aggmetric.Gauge } // sinkDoesNotCompress is a sentinel value indicating the sink @@ -131,6 +136,18 @@ func (m *sliMetrics) recordMessageSize(sz int64) { } } +func (m *sliMetrics) recordInternalRetry(numMessages int64, reducedBatchSize bool) { + if m == nil { + return + } + + if reducedBatchSize { + m.BatchReductionCount.Inc(1) + } + + m.InternalRetryMessageCount.Inc(numMessages) +} + func (m *sliMetrics) recordEmittedBatch( startTime time.Time, numMessages int, mvcc hlc.Timestamp, bytes int, compressedBytes int, ) { @@ -245,6 +262,10 @@ func (w *wrappingCostController) recordMessageSize(sz int64) { w.inner.recordMessageSize(sz) } +func (w *wrappingCostController) recordInternalRetry(numMessages int64, reducedBatchSize bool) { + w.inner.recordInternalRetry(numMessages, reducedBatchSize) +} + func (w *wrappingCostController) 
recordResolvedCallback() func() { // TODO(ssd): We don't count resolved messages currently. These messages should be relatively // small and the error here is further in the favor of the user. @@ -411,6 +432,18 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics { Measurement: "Bytes", Unit: metric.Unit_BYTES, } + metaBatchReductionCount := metric.Metadata{ + Name: "changefeed.batch_reduction_count", + Help: "Number of times a changefeed aggregator node attempted to reduce the size of message batches it emitted to the sink", + Measurement: "Batch Size Reductions", + Unit: metric.Unit_COUNT, + } + metaInternalRetryMessageCount := metric.Metadata{ + Name: "changefeed.internal_retry_message_count", + Help: "Number of messages for which an attempt to retry them within an aggregator node was made", + Measurement: "Messages", + Unit: metric.Unit_COUNT, + } // NB: When adding new histograms, use sigFigs = 1. Older histograms // retain significant figures of 2. b := aggmetric.MakeBuilder("scope") @@ -431,9 +464,11 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics { histogramWindow, commitLatencyMaxValue.Nanoseconds(), 1), AdmitLatency: b.Histogram(metaAdmitLatency, histogramWindow, admitLatencyMaxValue.Nanoseconds(), 1), - BackfillCount: b.Gauge(metaChangefeedBackfillCount), - BackfillPendingRanges: b.Gauge(metaChangefeedBackfillPendingRanges), - RunningCount: b.Gauge(metaChangefeedRunning), + BackfillCount: b.Gauge(metaChangefeedBackfillCount), + BackfillPendingRanges: b.Gauge(metaChangefeedBackfillPendingRanges), + RunningCount: b.Gauge(metaChangefeedRunning), + BatchReductionCount: b.Gauge(metaBatchReductionCount), + InternalRetryMessageCount: b.Gauge(metaInternalRetryMessageCount), } a.mu.sliMetrics = make(map[string]*sliMetrics) _, err := a.getOrCreateScope(defaultSLIScope) @@ -478,19 +513,21 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) { } sm := &sliMetrics{ - EmittedMessages: a.EmittedMessages.AddChild(scope), - MessageSize: a.MessageSize.AddChild(scope), - EmittedBytes: a.EmittedBytes.AddChild(scope), - FlushedBytes: a.FlushedBytes.AddChild(scope), - BatchHistNanos: a.BatchHistNanos.AddChild(scope), - Flushes: a.Flushes.AddChild(scope), - FlushHistNanos: a.FlushHistNanos.AddChild(scope), - CommitLatency: a.CommitLatency.AddChild(scope), - ErrorRetries: a.ErrorRetries.AddChild(scope), - AdmitLatency: a.AdmitLatency.AddChild(scope), - BackfillCount: a.BackfillCount.AddChild(scope), - BackfillPendingRanges: a.BackfillPendingRanges.AddChild(scope), - RunningCount: a.RunningCount.AddChild(scope), + EmittedMessages: a.EmittedMessages.AddChild(scope), + MessageSize: a.MessageSize.AddChild(scope), + EmittedBytes: a.EmittedBytes.AddChild(scope), + FlushedBytes: a.FlushedBytes.AddChild(scope), + BatchHistNanos: a.BatchHistNanos.AddChild(scope), + Flushes: a.Flushes.AddChild(scope), + FlushHistNanos: a.FlushHistNanos.AddChild(scope), + CommitLatency: a.CommitLatency.AddChild(scope), + ErrorRetries: a.ErrorRetries.AddChild(scope), + AdmitLatency: a.AdmitLatency.AddChild(scope), + BackfillCount: a.BackfillCount.AddChild(scope), + BackfillPendingRanges: a.BackfillPendingRanges.AddChild(scope), + RunningCount: a.RunningCount.AddChild(scope), + BatchReductionCount: a.BatchReductionCount.AddChild(scope), + InternalRetryMessageCount: a.InternalRetryMessageCount.AddChild(scope), } a.mu.sliMetrics[scope] = sm diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index 95f794173e6f..758daaa5eeab 100644 --- 
a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -165,7 +165,7 @@ func getSink( return makeNullSink(sinkURL{URL: u}, metricsBuilder(nullIsAccounted)) case u.Scheme == changefeedbase.SinkSchemeKafka: return validateOptionsAndMakeSink(changefeedbase.KafkaValidOptions, func() (Sink, error) { - return makeKafkaSink(ctx, sinkURL{URL: u}, AllTargets(feedCfg), opts.GetKafkaConfigJSON(), metricsBuilder) + return makeKafkaSink(ctx, sinkURL{URL: u}, AllTargets(feedCfg), opts.GetKafkaConfigJSON(), serverCfg.Settings, metricsBuilder) }) case isWebhookSink(u): webhookOpts, err := opts.GetWebhookSinkOptions() diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go index 3cc2555567f2..ec98631a70bf 100644 --- a/pkg/ccl/changefeedccl/sink_kafka.go +++ b/pkg/ccl/changefeedccl/sink_kafka.go @@ -23,6 +23,7 @@ import ( "github.com/Shopify/sarama" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/util/bufalloc" @@ -34,10 +35,33 @@ import ( "github.com/cockroachdb/logtags" ) +// maybeLocker is a wrapper around a Locker that allows for successive Unlocks +type maybeLocker struct { + wrapped sync.Locker + locked bool +} + +func (l *maybeLocker) Lock() { + l.wrapped.Lock() + l.locked = true +} +func (l *maybeLocker) Unlock() { + if l.locked { + l.wrapped.Unlock() + l.locked = false + } +} + type kafkaLogAdapter struct { ctx context.Context } +type kafkaSinkKnobs struct { + OverrideClientInit func(config *sarama.Config) (kafkaClient, error) + OverrideAsyncProducerFromClient func(kafkaClient) (sarama.AsyncProducer, error) + OverrideSyncProducerFromClient func(kafkaClient) (sarama.SyncProducer, error) +} + var _ sarama.StdLogger = (*kafkaLogAdapter)(nil) func (l *kafkaLogAdapter) Print(v ...interface{}) { @@ -72,6 +96,8 @@ type kafkaClient interface { // available metadata for those topics. If no topics are provided, it will refresh // metadata for all topics. RefreshMetadata(topics ...string) error + // Config returns the sarama config used on the client + Config() *sarama.Config // Close closes kafka connection. Close() error } @@ -93,6 +119,8 @@ type kafkaSink struct { scratch bufalloc.ByteAllocator metrics metricsRecorder + knobs kafkaSinkKnobs + stats kafkaStats // Only synchronized between the client goroutine and the worker goroutine. @@ -102,6 +130,8 @@ type kafkaSink struct { flushErr error flushCh chan struct{} } + + disableInternalRetry bool } type saramaConfig struct { @@ -167,36 +197,83 @@ func defaultSaramaConfig() *saramaConfig { return config } -func (s *kafkaSink) start() { +// Dial implements the Sink interface. +func (s *kafkaSink) Dial() error { + client, err := s.newClient(s.kafkaCfg) + if err != nil { + return err + } + + producer, err := s.newAsyncProducer(client) + if err != nil { + return err + } + + s.client = client + s.producer = producer + + // Start the worker s.stopWorkerCh = make(chan struct{}) s.worker.Add(1) go s.workerLoop() + return nil } -// Dial implements the Sink interface. 
-func (s *kafkaSink) Dial() error { - client, err := sarama.NewClient(strings.Split(s.bootstrapAddrs, `,`), s.kafkaCfg) +func (s *kafkaSink) newClient(config *sarama.Config) (kafkaClient, error) { + // Initialize client and producer + if s.knobs.OverrideClientInit != nil { + client, err := s.knobs.OverrideClientInit(config) + return client, err + } + + client, err := sarama.NewClient(strings.Split(s.bootstrapAddrs, `,`), config) if err != nil { - return pgerror.Wrapf(err, pgcode.CannotConnectNow, + return nil, pgerror.Wrapf(err, pgcode.CannotConnectNow, `connecting to kafka: %s`, s.bootstrapAddrs) } - s.producer, err = sarama.NewAsyncProducerFromClient(client) + return client, err +} + +func (s *kafkaSink) newAsyncProducer(client kafkaClient) (sarama.AsyncProducer, error) { + var producer sarama.AsyncProducer + var err error + if s.knobs.OverrideAsyncProducerFromClient != nil { + producer, err = s.knobs.OverrideAsyncProducerFromClient(client) + } else { + producer, err = sarama.NewAsyncProducerFromClient(client.(sarama.Client)) + } if err != nil { - return pgerror.Wrapf(err, pgcode.CannotConnectNow, + return nil, pgerror.Wrapf(err, pgcode.CannotConnectNow, `connecting to kafka: %s`, s.bootstrapAddrs) } - s.client = client - s.start() - return nil + return producer, nil +} + +func (s *kafkaSink) newSyncProducer(client kafkaClient) (sarama.SyncProducer, error) { + var producer sarama.SyncProducer + var err error + if s.knobs.OverrideSyncProducerFromClient != nil { + producer, err = s.knobs.OverrideSyncProducerFromClient(client) + } else { + producer, err = sarama.NewSyncProducerFromClient(client.(sarama.Client)) + } + if err != nil { + return nil, pgerror.Wrapf(err, pgcode.CannotConnectNow, + `connecting to kafka: %s`, s.bootstrapAddrs) + } + return producer, nil } // Close implements the Sink interface. func (s *kafkaSink) Close() error { close(s.stopWorkerCh) s.worker.Wait() - // If we're shutting down, we don't care what happens to the outstanding - // messages, so ignore this error. - _ = s.producer.Close() + + if s.producer != nil { + // Ignore errors related to outstanding messages since we're either shutting + // down or beginning to retry regardless + _ = s.producer.Close() + } // s.client is only nil in tests. if s.client != nil { return s.client.Close() @@ -218,7 +295,6 @@ func (s *kafkaSink) EmitRow( updated, mvcc hlc.Timestamp, alloc kvevent.Alloc, ) error { - topic, err := s.topics.Name(topicDescr) if err != nil { return err @@ -345,9 +421,55 @@ func (s *kafkaSink) emitMessage(ctx context.Context, msg *sarama.ProducerMessage return nil } +// isInternallyRetryable returns true if the sink should attempt to re-emit the +// messages with a non-batching config first rather than surfacing the error to +// the overarching feed. 
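+// Only sarama's ErrMessageSizeTooLarge qualifies: the error is unwrapped with
+// errors.As, and the whole mechanism can be switched off by setting the
+// changefeed.batch_reduction_retry_enabled cluster setting to false, which
+// flips disableInternalRetry on the sink.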
+func (s *kafkaSink) isInternalRetryable(err error) bool { + if s.disableInternalRetry || err == nil { // Avoid allocating a KError if we don't need to + return false + } + var kError sarama.KError + return errors.As(err, &kError) && kError == sarama.ErrMessageSizeTooLarge +} + func (s *kafkaSink) workerLoop() { defer s.worker.Done() + // Locking/Unlocking of s.mu during the retry process must be done + // through a maybeLocker with a deferred Unlock to allow for mu to + // always be unlocked upon worker completion even if it is mid-internal-retry + muLocker := &maybeLocker{&s.mu, false} + defer muLocker.Unlock() + + // For an error like ErrMessageSizeTooLarge, we freeze incoming messages and + // retry internally with a more finely grained client until one is found that + // can successfully send the errored messages. + var retryBuf []*sarama.ProducerMessage + var retryErr error + + startInternalRetry := func(err error) { + s.mu.AssertHeld() + log.Infof( + s.ctx, + "kafka sink with flush config (%+v) beginning internal retry with %d inflight messages due to error: %s", + s.kafkaCfg.Producer.Flush, + s.mu.inflight, + err.Error(), + ) + retryErr = err + // Note that even if we reserve mu.inflight space, it may not be filled as + // some inflight messages won't error (ex: they're for a different topic + // than the one with the original message that errored). + retryBuf = make([]*sarama.ProducerMessage, 0, s.mu.inflight) + } + endInternalRetry := func() { + retryErr = nil + retryBuf = nil + } + isRetrying := func() bool { + return retryErr != nil + } + for { var ackMsg *sarama.ProducerMessage var ackError error @@ -371,29 +493,149 @@ func (s *kafkaSink) workerLoop() { } } - if m, ok := ackMsg.Metadata.(messageMetadata); ok { - if ackError == nil { - sz := ackMsg.Key.Length() + ackMsg.Value.Length() - s.stats.finishMessage(int64(sz)) - m.updateMetrics(m.mvcc, sz, sinkDoesNotCompress) - } - m.alloc.Release(s.ctx) + // If we're in a retry we already had the lock. + if !isRetrying() { + muLocker.Lock() } - - s.mu.Lock() s.mu.inflight-- - if s.mu.flushErr == nil && ackError != nil { - s.mu.flushErr = ackError + + if !isRetrying() && s.isInternalRetryable(ackError) { + startInternalRetry(ackError) } - if s.mu.inflight == 0 && s.mu.flushCh != nil { + // If we're retrying and its a valid but errored message, buffer it to be retried. + isValidMessage := ackMsg != nil && ackMsg.Key != nil && ackMsg.Value != nil + if isRetrying() && isValidMessage { + retryBuf = append(retryBuf, ackMsg) + } else { + s.finishProducerMessage(ackMsg, ackError) + } + + // Once inflight messages to retry are done buffering, find a new client + // that successfully resends and continue on with it. + if isRetrying() && s.mu.inflight == 0 { + if err := s.handleBufferedRetries(retryBuf, retryErr); err != nil { + s.mu.flushErr = err + } + endInternalRetry() + } + + // If we're in a retry inflight can be 0 but messages in retryBuf are yet to + // be resent. + if !isRetrying() && s.mu.inflight == 0 && s.mu.flushCh != nil { s.mu.flushCh <- struct{}{} s.mu.flushCh = nil } - s.mu.Unlock() + + // If we're in a retry we keep hold of the lock to stop all other operations + // until the retry has completed. 
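+		// The maybeLocker with its deferred Unlock at the top of workerLoop
+		// guarantees the mutex is still released if the worker exits while a
+		// retry is holding it.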
+ if !isRetrying() { + muLocker.Unlock() + } + } +} + +func (s *kafkaSink) finishProducerMessage(ackMsg *sarama.ProducerMessage, ackError error) { + s.mu.AssertHeld() + if m, ok := ackMsg.Metadata.(messageMetadata); ok { + if ackError == nil { + sz := ackMsg.Key.Length() + ackMsg.Value.Length() + s.stats.finishMessage(int64(sz)) + m.updateMetrics(m.mvcc, sz, sinkDoesNotCompress) + } + m.alloc.Release(s.ctx) + } + if s.mu.flushErr == nil && ackError != nil { + s.mu.flushErr = ackError } } +func (s *kafkaSink) handleBufferedRetries(msgs []*sarama.ProducerMessage, retryErr error) error { + lastSendErr := retryErr + activeConfig := s.kafkaCfg + + // Ensure memory for messages are always cleaned up + defer func() { + for _, msg := range msgs { + s.finishProducerMessage(msg, lastSendErr) + } + }() + + for { + select { + case <-s.stopWorkerCh: + log.Infof(s.ctx, "kafka sink ending retries due to worker close") + return lastSendErr + default: + } + + newConfig, wasReduced := reduceBatchingConfig(activeConfig) + + // Surface the error if its not retryable or we weren't able to reduce the + // batching config any further + if !s.isInternalRetryable(lastSendErr) { + log.Infof(s.ctx, "kafka sink abandoning internal retry due to error: %s", lastSendErr.Error()) + return lastSendErr + } else if !wasReduced { + log.Infof(s.ctx, "kafka sink abandoning internal retry due to being unable to reduce batching size") + return lastSendErr + } + + log.Infof(s.ctx, "kafka sink retrying %d messages with reduced flush config: (%+v)", len(msgs), newConfig.Producer.Flush) + activeConfig = newConfig + + newClient, err := s.newClient(newConfig) + if err != nil { + return err + } + newProducer, err := s.newSyncProducer(newClient) + if err != nil { + return err + } + + s.metrics.recordInternalRetry(int64(len(msgs)), true /* reducedBatchSize */) + + // SendMessages will attempt to send all messages into an AsyncProducer with + // the client's config and then block until the results come in. + lastSendErr = newProducer.SendMessages(msgs) + + if err := newProducer.Close(); err != nil { + log.Errorf(s.ctx, "closing of previous sarama producer for retry failed with: %s", err.Error()) + } + if err := newClient.Close(); err != nil { + log.Errorf(s.ctx, "closing of previous sarama client for retry failed with: %s", err.Error()) + } + + if lastSendErr == nil { + log.Infof(s.ctx, "kafka sink internal retry succeeded") + return nil + } + } +} + +func reduceBatchingConfig(c *sarama.Config) (*sarama.Config, bool) { + flooredHalve := func(num int) int { + if num < 2 { + return num + } + return num / 2 + } + + newConfig := *c + + newConfig.Producer.Flush.Messages = flooredHalve(c.Producer.Flush.Messages) + // MaxMessages of 0 means unlimited, so treat "halving" it as reducing it to + // 250 (an arbitrary number) + if c.Producer.Flush.MaxMessages == 0 { + newConfig.Producer.Flush.MaxMessages = 250 + } else { + newConfig.Producer.Flush.MaxMessages = flooredHalve(c.Producer.Flush.MaxMessages) + } + + wasReduced := newConfig.Producer.Flush != c.Producer.Flush + return &newConfig, wasReduced +} + // Topics gives the names of all topics that have been initialized // and will receive resolved timestamps. 
func (s *kafkaSink) Topics() []string { @@ -646,6 +888,7 @@ func makeKafkaSink( u sinkURL, targets changefeedbase.Targets, jsonStr changefeedbase.SinkSpecificJSONConfig, + settings *cluster.Settings, mb metricsRecorderBuilder, ) (Sink, error) { kafkaTopicPrefix := u.consumeParam(changefeedbase.SinkParamTopicPrefix) @@ -667,12 +910,15 @@ func makeKafkaSink( return nil, err } + internalRetryEnabled := settings != nil && changefeedbase.BatchReductionRetryEnabled.Get(&settings.SV) + sink := &kafkaSink{ - ctx: ctx, - kafkaCfg: config, - bootstrapAddrs: u.Host, - metrics: mb(requiresResourceAccounting), - topics: topics, + ctx: ctx, + kafkaCfg: config, + bootstrapAddrs: u.Host, + metrics: mb(requiresResourceAccounting), + topics: topics, + disableInternalRetry: !internalRetryEnabled, } if unknownParams := u.remainingQueryParams(); len(unknownParams) > 0 { diff --git a/pkg/ccl/changefeedccl/sink_kafka_connection.go b/pkg/ccl/changefeedccl/sink_kafka_connection.go index 00b7ca6b2880..7d80e1fc2942 100644 --- a/pkg/ccl/changefeedccl/sink_kafka_connection.go +++ b/pkg/ccl/changefeedccl/sink_kafka_connection.go @@ -27,7 +27,7 @@ func parseAndValidateKafkaSinkURI( // TODO(adityamaru): When we add `CREATE EXTERNAL CONNECTION ... WITH` support // to accept JSONConfig we should validate that here too. _, err := makeKafkaSink(ctx, sinkURL{URL: uri}, changefeedbase.Targets{}, "", - nilMetricsRecorderBuilder) + nil, nilMetricsRecorderBuilder) if err != nil { return nil, errors.Wrap(err, "invalid Kafka URI") } diff --git a/pkg/ccl/changefeedccl/sink_test.go b/pkg/ccl/changefeedccl/sink_test.go index 6b331e3baffb..6d4651ce9ea8 100644 --- a/pkg/ccl/changefeedccl/sink_test.go +++ b/pkg/ccl/changefeedccl/sink_test.go @@ -69,6 +69,28 @@ func (p *asyncProducerMock) Close() error { return nil } +type syncProducerMock struct { + overrideSend func(*sarama.ProducerMessage) error +} + +func (p *syncProducerMock) SendMessage( + msg *sarama.ProducerMessage, +) (partition int32, offset int64, err error) { + if p.overrideSend != nil { + return 0, 0, p.overrideSend(msg) + } + return 0, 0, nil +} +func (p *syncProducerMock) SendMessages(msgs []*sarama.ProducerMessage) error { + for _, msg := range msgs { + _, _, err := p.SendMessage(msg) + if err != nil { + return err + } + } + return nil +} + // consumeAndSucceed consumes input messages and sends them to successes channel. // Returns function that must be called to stop this consumer // to clean up. The cleanup function must be called before closing asyncProducerMock. 
@@ -203,10 +225,19 @@ func makeTestKafkaSink( s = &kafkaSink{ ctx: context.Background(), topics: topics, - producer: p, + kafkaCfg: &sarama.Config{}, metrics: (*sliMetrics)(nil), + knobs: kafkaSinkKnobs{ + OverrideAsyncProducerFromClient: func(client kafkaClient) (sarama.AsyncProducer, error) { + return p, nil + }, + OverrideClientInit: func(config *sarama.Config) (kafkaClient, error) { + return nil, nil + }, + }, } - s.start() + err = s.Dial() + require.NoError(t, err) return s, func() { require.NoError(t, s.Close()) diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index 09a66ff38750..3dd88e3be07e 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -312,7 +312,6 @@ func (f *jobFeed) WaitForStatus(statusPred func(status jobs.Status) bool) error } if statusPred(jobs.Status(status)) { return nil - } return errors.Newf("still waiting for job status; current %s", status) }) @@ -1086,13 +1085,20 @@ func newTeeGroup() *teeGroup { } // tee reads incoming messages from input channel and sends them out to one or more output channels. -func (tg *teeGroup) tee(in <-chan *sarama.ProducerMessage, out ...chan<- *sarama.ProducerMessage) { +func (tg *teeGroup) tee( + interceptor func(*sarama.ProducerMessage) bool, + in <-chan *sarama.ProducerMessage, + out ...chan<- *sarama.ProducerMessage, +) { tg.g.Go(func() error { for { select { case <-tg.done: return nil case m := <-in: + if interceptor != nil && interceptor(m) { + continue + } for i := range out { select { case <-tg.done: @@ -1111,7 +1117,9 @@ func (tg *teeGroup) wait() error { return tg.g.Wait() } -type fakeKafkaClient struct{} +type fakeKafkaClient struct { + config *sarama.Config +} func (c *fakeKafkaClient) Partitions(topic string) ([]int32, error) { return []int32{0}, nil @@ -1125,22 +1133,36 @@ func (c *fakeKafkaClient) Close() error { return nil } +func (c *fakeKafkaClient) Config() *sarama.Config { + return c.config +} + var _ kafkaClient = (*fakeKafkaClient)(nil) type ignoreCloseProducer struct { *asyncProducerMock + *syncProducerMock } +var _ sarama.AsyncProducer = &ignoreCloseProducer{} +var _ sarama.SyncProducer = &ignoreCloseProducer{} + func (p *ignoreCloseProducer) Close() error { return nil } +// sinkKnobs override behavior for the simulated sink. +type sinkKnobs struct { + kafkaInterceptor func(m *sarama.ProducerMessage, client kafkaClient) error +} + // fakeKafkaSink is a sink that arranges for fake kafka client and producer // to be used. type fakeKafkaSink struct { Sink tg *teeGroup feedCh chan *sarama.ProducerMessage + knobs *sinkKnobs } var _ Sink = (*fakeKafkaSink)(nil) @@ -1148,19 +1170,52 @@ var _ Sink = (*fakeKafkaSink)(nil) // Dial implements Sink interface func (s *fakeKafkaSink) Dial() error { kafka := s.Sink.(*kafkaSink) - kafka.client = &fakeKafkaClient{} - // The producer we give to kafka sink ignores close call. - // This is because normally, kafka sinks owns the producer and so it closes it. - // But in this case, if we let the sink close this producer, the test will panic - // because we will attempt to send acknowledgements on a closed channel. - producer := &ignoreCloseProducer{newAsyncProducerMock(unbuffered)} - - // TODO(yevgeniy): Support error injection either by acknowledging on the "errors" - // channel, or by injecting error message into sarama.ProducerMessage.Metadata. 
- s.tg.tee(producer.inputCh, s.feedCh, producer.successesCh) - kafka.producer = producer - kafka.start() - return nil + kafka.knobs.OverrideClientInit = func(config *sarama.Config) (kafkaClient, error) { + client := &fakeKafkaClient{config} + return client, nil + } + + kafka.knobs.OverrideAsyncProducerFromClient = func(client kafkaClient) (sarama.AsyncProducer, error) { + // The producer we give to kafka sink ignores close call. + // This is because normally, kafka sinks owns the producer and so it closes it. + // But in this case, if we let the sink close this producer, the test will panic + // because we will attempt to send acknowledgements on a closed channel. + producer := &ignoreCloseProducer{newAsyncProducerMock(100), nil} + + interceptor := func(m *sarama.ProducerMessage) bool { + if s.knobs != nil && s.knobs.kafkaInterceptor != nil { + err := s.knobs.kafkaInterceptor(m, client) + if err != nil { + select { + case producer.errorsCh <- &sarama.ProducerError{Msg: m, Err: err}: + case <-s.tg.done: + } + return true + } + } + return false + } + + s.tg.tee(interceptor, producer.inputCh, s.feedCh, producer.successesCh) + return producer, nil + } + + kafka.knobs.OverrideSyncProducerFromClient = func(client kafkaClient) (sarama.SyncProducer, error) { + return &ignoreCloseProducer{nil, &syncProducerMock{ + overrideSend: func(m *sarama.ProducerMessage) error { + if s.knobs != nil && s.knobs.kafkaInterceptor != nil { + err := s.knobs.kafkaInterceptor(m, client) + if err != nil { + return err + } + } + s.feedCh <- m + return nil + }, + }}, nil + } + + return kafka.Dial() } func (s *fakeKafkaSink) Topics() []string { @@ -1172,6 +1227,7 @@ func (s *fakeKafkaSink) Topics() []string { type kafkaFeedFactory struct { enterpriseFeedFactory + knobs *sinkKnobs } var _ cdctest.TestFeedFactory = (*kafkaFeedFactory)(nil) @@ -1181,6 +1237,7 @@ func makeKafkaFeedFactory( srv serverutils.TestTenantInterface, db *gosql.DB, ) cdctest.TestFeedFactory { return &kafkaFeedFactory{ + knobs: &sinkKnobs{}, enterpriseFeedFactory: enterpriseFeedFactory{ s: srv, db: db, @@ -1268,6 +1325,7 @@ func (k *kafkaFeedFactory) Feed(create string, args ...interface{}) (cdctest.Tes Sink: s, tg: tg, feedCh: feedCh, + knobs: k.knobs, } } diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 4963b4adf005..fec688d44325 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -1477,6 +1477,18 @@ var charts = []sectionDescription{ "changefeed.flush.messages_pushback_nanos", }, }, + { + Title: "Batching", + Metrics: []string{ + "changefeed.batch_reduction_count", + }, + }, + { + Title: "Internal Retries", + Metrics: []string{ + "changefeed.internal_retry_message_count", + }, + }, }, }, {