Skip to content

Commit

Permalink
changefeedccl: Disable kafka batching retries
Browse files Browse the repository at this point in the history
Kafka batch retry appears to be severely broken.
Disable the logic by default, and ensure that
errors are propagated immediately upon the next
attempt to emit the message.

Informs #90029

Release note (enterprise change): changefeed kafka sink
no longer automatically retries when emitting message
batch that gets rejected by the server. This is a temporary
rollback of the functionality.
  • Loading branch information
Yevgeniy Miretskiy committed Oct 16, 2022
1 parent 05ac355 commit a6daf16
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 2 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7235,6 +7235,7 @@ func TestChangefeedKafkaMessageTooLarge(t *testing.T) {
defer log.Scope(t).Close(t)
defer utilccl.TestingEnableEnterprise()()

skip.WithIssue(t, 90029)
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
knobs := f.(*kafkaFeedFactory).knobs
sqlDB := sqlutils.MakeSQLRunner(s.DB)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ var ActiveProtectedTimestampsEnabled = settings.RegisterBoolSetting(
var BatchReductionRetryEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
"changefeed.batch_reduction_retry_enabled",
"if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes",
true,
"*** DO NOT ENABLE ***; if true, kafka changefeeds upon erroring on an oversized batch will attempt to resend the messages with progressively lower batch sizes",
false,
)

// UseMuxRangeFeed enables the use of MuxRangeFeed RPC.
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/sink_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,10 @@ func (s *kafkaSink) startInflightMessage(ctx context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()

if s.mu.flushErr != nil {
return s.mu.flushErr
}

s.mu.inflight++
if log.V(2) {
log.Infof(ctx, "emitting %d inflight records to kafka", s.mu.inflight)
Expand Down

0 comments on commit a6daf16

Please sign in to comment.