Skip to content

Commit

Permalink
kgo testing: fix
Browse files Browse the repository at this point in the history
Kraft sometimes returns UNKNOWN_TOPIC_OR_PARTITION for a while even
after it replies to a create topics request with success. We now retry
these errors forever.

Also, something about Kraft sometimes hangs when transitioning
transaction states. We need RequireStableFetchOffsets to ensure our
tests do not fail.
  • Loading branch information
twmb committed Oct 18, 2022
1 parent 2b4d7ea commit 0a0076f
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
2 changes: 2 additions & 0 deletions pkg/kgo/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestGroupETL(t *testing.T) {
cl, _ := NewClient(
WithLogger(BasicLogger(os.Stderr, testLogLevel, nil)),
MaxBufferedRecords(10000),
UnknownTopicRetries(-1), // see txn_test comment
)
defer cl.Close()

Expand Down Expand Up @@ -116,6 +117,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) {
netls := 0 // for if etlsBeforeQuit is non-negative

opts := []Opt{
UnknownTopicRetries(-1), // see txn_test comment
WithLogger(testLogger()),
ConsumerGroup(c.group),
ConsumeTopics(c.consumeFrom),
Expand Down
14 changes: 12 additions & 2 deletions pkg/kgo/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func TestTxnEtl(t *testing.T) {
TransactionalID("p"+randsha()),
TransactionTimeout(2*time.Minute),
MaxBufferedRecords(10000),
UnknownTopicRetries(-1), // see comment below
)
if err != nil {
panic(err)
Expand Down Expand Up @@ -135,8 +136,17 @@ func (c *testConsumer) goTransact(txnsBeforeQuit int) {
func (c *testConsumer) transact(txnsBeforeQuit int) {
defer c.wg.Done()
txnSess, _ := NewGroupTransactSession(
// Kraft sometimes has massive hangs internally when completing
// transactions. Against zk Kafka, we could rely on our
// internal mitigations to never have KIP-447 problems.
// Not true against Kraft, see #223.
RequireStableFetchOffsets(),
// Kraft sometimes returns success from topic creation, and
// then returns UnknownTopicXyz for a while in metadata loads.
// It also returns NotLeaderXyz; we handle both problems.
UnknownTopicRetries(-1),
TransactionalID(randsha()),
TransactionTimeout(2*time.Minute),
TransactionTimeout(10*time.Second),
WithLogger(testLogger()),
// Control records have their own unique offset, so for testing,
// we keep the record to ensure we do not doubly consume control
Expand Down Expand Up @@ -235,7 +245,7 @@ func (c *testConsumer) transact(txnsBeforeQuit int) {
for _, rec := range recs {
po := partOffset{part, rec.offset}
if _, exists := c.partOffsets[po]; exists {
c.errCh <- fmt.Errorf("saw double offset p%do%d", po.part, po.offset)
c.errCh <- fmt.Errorf("saw double offset t %s p%do%d", c.consumeFrom, po.part, po.offset)
}
c.partOffsets[po] = struct{}{}

Expand Down

0 comments on commit 0a0076f

Please sign in to comment.