Skip to content

Commit

Permalink
Merge pull request #90037 from cockroachdb/blathers/backport-release-…
Browse files Browse the repository at this point in the history
…22.2.0-90030

release-22.2.0: changefeedccl: Disable kafka batching retries
  • Loading branch information
miretskiy authored Oct 16, 2022
2 parents 05ac355 + a6daf16 commit 856b492
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 2 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7235,6 +7235,7 @@ func TestChangefeedKafkaMessageTooLarge(t *testing.T) {
defer log.Scope(t).Close(t)
defer utilccl.TestingEnableEnterprise()()

skip.WithIssue(t, 90029)
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
knobs := f.(*kafkaFeedFactory).knobs
sqlDB := sqlutils.MakeSQLRunner(s.DB)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ var ActiveProtectedTimestampsEnabled = settings.RegisterBoolSetting(
var BatchReductionRetryEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
"changefeed.batch_reduction_retry_enabled",
"if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes",
true,
"*** DO NOT ENABLE ***; if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes",
false,
)

// UseMuxRangeFeed enables the use of MuxRangeFeed RPC.
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/sink_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,10 @@ func (s *kafkaSink) startInflightMessage(ctx context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()

if s.mu.flushErr != nil {
return s.mu.flushErr
}

s.mu.inflight++
if log.V(2) {
log.Infof(ctx, "emitting %d inflight records to kafka", s.mu.inflight)
Expand Down

0 comments on commit 856b492

Please sign in to comment.