From a1d10774d652b1dbca2dd8bb46eda812ddc1cc02 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 18 Oct 2022 16:53:14 -0600 Subject: [PATCH] kgo testing: fix Kraft sometimes returns UNKNOWN_TOPIC_OR_PARTITION for a while even after it replies to a create topics request with success. We now retry these errors forever. Also, something about Kraft sometimes hangs when transitioning transaction states. We need RequireStableFetchOffsets to ensure our tests do not fail. --- pkg/kgo/group_test.go | 2 ++ pkg/kgo/txn_test.go | 14 ++++++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/pkg/kgo/group_test.go b/pkg/kgo/group_test.go index 27cd0abd..2605b6a2 100644 --- a/pkg/kgo/group_test.go +++ b/pkg/kgo/group_test.go @@ -39,6 +39,7 @@ func TestGroupETL(t *testing.T) { cl, _ := NewClient( WithLogger(BasicLogger(os.Stderr, testLogLevel, nil)), MaxBufferedRecords(10000), + UnknownTopicRetries(-1), // see txn_test comment ) defer cl.Close() @@ -116,6 +117,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) { netls := 0 // for if etlsBeforeQuit is non-negative opts := []Opt{ + UnknownTopicRetries(-1), // see txn_test comment WithLogger(testLogger()), ConsumerGroup(c.group), ConsumeTopics(c.consumeFrom), diff --git a/pkg/kgo/txn_test.go b/pkg/kgo/txn_test.go index 6fe2564f..7a840eb5 100644 --- a/pkg/kgo/txn_test.go +++ b/pkg/kgo/txn_test.go @@ -32,6 +32,7 @@ func TestTxnEtl(t *testing.T) { TransactionalID("p"+randsha()), TransactionTimeout(2*time.Minute), MaxBufferedRecords(10000), + UnknownTopicRetries(-1), // see comment below ) if err != nil { panic(err) @@ -135,8 +136,17 @@ func (c *testConsumer) goTransact(txnsBeforeQuit int) { func (c *testConsumer) transact(txnsBeforeQuit int) { defer c.wg.Done() txnSess, _ := NewGroupTransactSession( + // Kraft sometimes has massive hangs internally when completing + // transactions. Against zk Kafka, we could rely on our + // internal mitigations to never have KIP-447 problems. + // Not true against Kraft, see #223. + RequireStableFetchOffsets(), + // Kraft sometimes returns success from topic creation, and + // then returns UnknownTopicXyz for a while in metadata loads. + // It also returns NotLeaderXyz; we handle both problems. + UnknownTopicRetries(-1), TransactionalID(randsha()), - TransactionTimeout(2*time.Minute), + TransactionTimeout(10*time.Second), WithLogger(testLogger()), // Control records have their own unique offset, so for testing, // we keep the record to ensure we do not doubly consume control @@ -235,7 +245,7 @@ func (c *testConsumer) transact(txnsBeforeQuit int) { for _, rec := range recs { po := partOffset{part, rec.offset} if _, exists := c.partOffsets[po]; exists { - c.errCh <- fmt.Errorf("saw double offset p%do%d", po.part, po.offset) + c.errCh <- fmt.Errorf("saw double offset t %s p%do%d", c.consumeFrom, po.part, po.offset) } c.partOffsets[po] = struct{}{}