diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 9c3ab8f23642..eeee34a8ddcb 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -7235,6 +7235,7 @@ func TestChangefeedKafkaMessageTooLarge(t *testing.T) { defer log.Scope(t).Close(t) defer utilccl.TestingEnableEnterprise()() + skip.WithIssue(t, 90029) testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { knobs := f.(*kafkaFeedFactory).knobs sqlDB := sqlutils.MakeSQLRunner(s.DB) diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index 8d022374119a..fb9ae3a8bba2 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -220,8 +220,8 @@ var ActiveProtectedTimestampsEnabled = settings.RegisterBoolSetting( var BatchReductionRetryEnabled = settings.RegisterBoolSetting( settings.TenantWritable, "changefeed.batch_reduction_retry_enabled", - "if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes", - true, + "*** DO NOT ENABLE ***; if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes", + false, ) // UseMuxRangeFeed enables the use of MuxRangeFeed RPC. diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go index f2ef6832d900..50527e3f68c1 100644 --- a/pkg/ccl/changefeedccl/sink_kafka.go +++ b/pkg/ccl/changefeedccl/sink_kafka.go @@ -400,6 +400,10 @@ func (s *kafkaSink) startInflightMessage(ctx context.Context) error { s.mu.Lock() defer s.mu.Unlock() + if s.mu.flushErr != nil { + return s.mu.flushErr + } + s.mu.inflight++ if log.V(2) { log.Infof(ctx, "emitting %d inflight records to kafka", s.mu.inflight)