From a6daf1620f7cc0d3fd99533141853cf66ce616b7 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Sun, 16 Oct 2022 00:22:26 +0000 Subject: [PATCH] changefeedccl: Disable kafka batching retries Kafka batch retry appears to be severely broken. Disable the logic by default, and ensure that errors are propagated immediately upon the next attempt to emit the message. Informs #90029 Release note (enterprise change): changefeed kafka sink no longer automatically retries when emitting message batch that gets rejected by the server. This is a temporary rollback of the functionality. --- pkg/ccl/changefeedccl/changefeed_test.go | 1 + pkg/ccl/changefeedccl/changefeedbase/settings.go | 4 ++-- pkg/ccl/changefeedccl/sink_kafka.go | 4 ++++ 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 9c3ab8f23642..eeee34a8ddcb 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -7235,6 +7235,7 @@ func TestChangefeedKafkaMessageTooLarge(t *testing.T) { defer log.Scope(t).Close(t) defer utilccl.TestingEnableEnterprise()() + skip.WithIssue(t, 90029) testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { knobs := f.(*kafkaFeedFactory).knobs sqlDB := sqlutils.MakeSQLRunner(s.DB) diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index 8d022374119a..fb9ae3a8bba2 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -220,8 +220,8 @@ var ActiveProtectedTimestampsEnabled = settings.RegisterBoolSetting( var BatchReductionRetryEnabled = settings.RegisterBoolSetting( settings.TenantWritable, "changefeed.batch_reduction_retry_enabled", - "if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes", - true, + "*** DO NOT ENABLE ***; if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes", + false, ) // UseMuxRangeFeed enables the use of MuxRangeFeed RPC. diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go index f2ef6832d900..50527e3f68c1 100644 --- a/pkg/ccl/changefeedccl/sink_kafka.go +++ b/pkg/ccl/changefeedccl/sink_kafka.go @@ -400,6 +400,10 @@ func (s *kafkaSink) startInflightMessage(ctx context.Context) error { s.mu.Lock() defer s.mu.Unlock() + if s.mu.flushErr != nil { + return s.mu.flushErr + } + s.mu.inflight++ if log.V(2) { log.Infof(ctx, "emitting %d inflight records to kafka", s.mu.inflight)