From 55670189da1b7570f2c3731ca35fdbb402e5585d Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Tue, 18 Oct 2022 16:21:47 -0600
Subject: [PATCH 1/5] kgo sink: only allow more than one in flight if we have
 an ok response

See the large comment. For #223.

---
 pkg/kgo/sink.go | 22 +++++++++++++++++++---
 1 file changed, 19 insertions(+), 3 deletions(-)

diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go
index 6be05b2b..69b02eda 100644
--- a/pkg/kgo/sink.go
+++ b/pkg/kgo/sink.go
@@ -107,7 +107,7 @@ func (s *sink) createReq(id int64, epoch int16) (*produceRequest, *kmsg.AddParti
 		recBufsIdx = (recBufsIdx + 1) % len(s.recBufs)
 
 		recBuf.mu.Lock()
-		if recBuf.failing || len(recBuf.batches) == recBuf.batchDrainIdx || recBuf.inflightOnSink != nil && recBuf.inflightOnSink != s {
+		if recBuf.failing || len(recBuf.batches) == recBuf.batchDrainIdx || recBuf.inflightOnSink != nil && recBuf.inflightOnSink != s || recBuf.inflight != 0 && !recBuf.okOnSink {
 			recBuf.mu.Unlock()
 			continue
 		}
@@ -828,6 +828,9 @@ func (s *sink) handleReqRespBatch(
 			"err_is_retriable", kerr.IsRetriable(err),
 			"max_retries_reached", !failUnknown && batch.tries >= s.cl.cfg.recordRetries,
 		)
+		batch.owner.okOnSink = false
+	} else {
+		batch.owner.okOnSink = true
 	}
 	s.cl.finishBatch(batch.recBatch, producerID, producerEpoch, partition, baseOffset, err)
 	didProduce = err == nil
@@ -1045,6 +1048,20 @@ type recBuf struct {
 	// finishing, we would allow requests to finish out of order:
 	// handleSeqResps works per sink, not across sinks.
 	inflightOnSink *sink
+	// We only want to allow more than one inflight on a sink *if* we
+	// are currently receiving successful responses. Unimportantly, this
+	// allows us to save resources if the broker is having a problem or
+	// just recovered from one. Importantly, we work around an edge case
+	// in Kafka: Kafka will accept the first produce request for a
+	// pid/epoch with *any* sequence number. Say we send two requests
+	// inflight. Kafka replies to the first with
+	// NOT_LEADER_FOR_PARTITION; by the time the second arrives, the
+	// broker has finished setting up and accepts it. The broker now has
+	// the second request but not the first, so when we retry both
+	// requests we receive OOOSN, and the broker has logs out of order.
+	// By only allowing more than one inflight once we have seen an ok
+	// response, we largely eliminate the risk of this problem. See #223.
+	okOnSink bool
 	// Inflight tracks the number of requests inflight using batches from
 	// this recBuf. Every time this hits zero, if the batchDrainIdx is not
 	// at the end, we clear inflightOnSink and trigger the *current* sink
@@ -1463,9 +1480,8 @@ func (b *recBatch) decInflight() {
 	if recBuf.inflight != 0 {
 		return
 	}
-	oldSink := recBuf.inflightOnSink
 	recBuf.inflightOnSink = nil
-	if oldSink != recBuf.sink && recBuf.batchDrainIdx != len(recBuf.batches) {
+	if recBuf.batchDrainIdx != len(recBuf.batches) {
 		recBuf.sink.maybeDrain()
 	}
 }

From f4a508cad09941b6fe06a1e381964448e110eaad Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Tue, 18 Oct 2022 16:43:56 -0600
Subject: [PATCH 2/5] kgo: handle CONCURRENT_TRANSACTIONS more widely

Single-broker localhost kraft testing shows this error being returned
**much** more frequently. Rather than having a 5s timeout, we now
continue until the input context is canceled. This largely affects
ending a transaction.

We also improve the locking situation in one area. It's a bit ugly, but
the solution is better than unlocking directly at every return site.

For #223.
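To make the gating rule from patch 1 above concrete, here is a minimal standalone sketch. The field names mirror the recBuf fields in the diff, but the type and driver program are simplified illustrations, not the library's internal API:

```go
package main

import "fmt"

// recBufState condenses the recBuf fields that patch 1 consults when
// deciding whether a partition may join a new produce request.
type recBufState struct {
	failing       bool // the partition is failing; skip it
	batchesLeft   int  // batches not yet drained
	inflight      int  // produce requests currently in flight
	okOnSink      bool // set once a response succeeds on this sink
	onAnotherSink bool // in flight on a different sink (broker)
}

// canDrain mirrors the updated createReq condition: skip the partition
// if it is failing, empty, in flight elsewhere, or -- the new clause --
// already in flight without having seen an ok response yet.
func (r recBufState) canDrain() bool {
	if r.failing || r.batchesLeft == 0 || r.onAnotherSink {
		return false
	}
	return r.inflight == 0 || r.okOnSink
}

func main() {
	// One request in flight, no success seen yet: do not pipeline a
	// second request (avoids the OOOSN edge case described above).
	fmt.Println(recBufState{batchesLeft: 2, inflight: 1}.canDrain()) // false
	// After an ok response, pipelining resumes.
	fmt.Println(recBufState{batchesLeft: 2, inflight: 1, okOnSink: true}.canDrain()) // true
}
```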
---
 pkg/kgo/config.go   |  3 ++
 pkg/kgo/producer.go |  4 ++-
 pkg/kgo/sink.go     |  2 +-
 pkg/kgo/txn.go      | 88 +++++++++++++++++++++++++++++++++------------
 4 files changed, 73 insertions(+), 24 deletions(-)

diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go
index 818cfa91..3844de39 100644
--- a/pkg/kgo/config.go
+++ b/pkg/kgo/config.go
@@ -801,6 +801,9 @@ func WithHooks(hooks ...Hook) Opt {
 // are expected to backoff slightly and retry the operation. Lower backoffs may
 // increase load on the brokers, while higher backoffs may increase transaction
 // latency in clients.
+//
+// Note that if brokers are hanging in this concurrent transactions state for
+// too long, the client progressively increases the backoff.
 func ConcurrentTransactionsBackoff(backoff time.Duration) Opt {
 	return clientOpt{func(cfg *cfg) { cfg.txnBackoff = backoff }}
 }
diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go
index 38719a02..340ce45b 100644
--- a/pkg/kgo/producer.go
+++ b/pkg/kgo/producer.go
@@ -738,7 +738,9 @@ func (cl *Client) doInitProducerID(lastID int64, lastEpoch int16) (*producerID,
 	}
 
 	if err = kerr.ErrorForCode(resp.ErrorCode); err != nil {
-		if kerr.IsRetriable(err) { // this could return ConcurrentTransactions, but this is rare; ignore until a user report
+		// We could receive concurrent transactions; this is ignorable
+		// and we just want to re-init.
+		if kerr.IsRetriable(err) || errors.Is(err, kerr.ConcurrentTransactions) {
 			cl.cfg.logger.Log(LogLevelInfo, "producer id initialization resulted in retriable error, discarding initialization attempt", "err", err)
 			return &producerID{lastID, lastEpoch, err}, false
 		}
diff --git a/pkg/kgo/sink.go b/pkg/kgo/sink.go
index 69b02eda..c08a3d35 100644
--- a/pkg/kgo/sink.go
+++ b/pkg/kgo/sink.go
@@ -427,7 +427,7 @@ func (s *sink) doTxnReq(
 			req.batches.eachOwnerLocked(seqRecBatch.removeFromTxn)
 		}
 	}()
-	err = s.cl.doWithConcurrentTransactions("AddPartitionsToTxn", func() error {
+	err = s.cl.doWithConcurrentTransactions(s.cl.ctx, "AddPartitionsToTxn", func() error {
 		stripped, err = s.issueTxnReq(req, txnReq)
 		return err
 	})
diff --git a/pkg/kgo/txn.go b/pkg/kgo/txn.go
index aed72ea9..9512acb7 100644
--- a/pkg/kgo/txn.go
+++ b/pkg/kgo/txn.go
@@ -260,6 +260,19 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
 	kip447 := false
 
 	if wantCommit && !failed {
+		isAbortableCommitErr := func(err error) bool {
+			switch {
+			case errors.Is(err, kerr.IllegalGeneration), // rebalance begun & completed before we committed
+				errors.Is(err, kerr.RebalanceInProgress), // in rebalance, abort & retry later
+				errors.Is(err, kerr.CoordinatorNotAvailable), // req failed too many times (same for next two)
+				errors.Is(err, kerr.CoordinatorLoadInProgress),
+				errors.Is(err, kerr.NotCoordinator),
+				errors.Is(err, kerr.ConcurrentTransactions): // kafka not harmonized, we can just abort or retry / eventually abort
+				return true
+			}
+			return false
+		}
+
 		var commitErrs []string
 
 		committed := make(chan struct{})
@@ -267,6 +280,10 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
 			func(_ *kmsg.TxnOffsetCommitRequest, resp *kmsg.TxnOffsetCommitResponse, err error) {
 				defer close(committed)
 				if err != nil {
+					if isAbortableCommitErr(err) {
+						hasAbortableCommitErr = true
+						return
+					}
 					commitErrs = append(commitErrs, err.Error())
 					return
 				}
@@ -274,16 +291,12 @@ func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry
 
 				for _, t := range resp.Topics {
 					for _, p := range t.Partitions {
-						switch err := kerr.ErrorForCode(p.ErrorCode); err {
-						case nil:
-						case kerr.IllegalGeneration, // rebalance begun & completed before we committed
-							kerr.RebalanceInProgress, // in rebalance, abort & retry later
-							kerr.CoordinatorNotAvailable, // req failed too many times (same for next two)
-							kerr.CoordinatorLoadInProgress,
-							kerr.NotCoordinator:
-							hasAbortableCommitErr = true
-						default:
-							commitErrs = append(commitErrs, fmt.Sprintf("topic %s partition %d: %v", t.Topic, p.Partition, err))
+						if err := kerr.ErrorForCode(p.ErrorCode); err != nil {
+							if isAbortableCommitErr(err) {
+								hasAbortableCommitErr = true
+							} else {
+								commitErrs = append(commitErrs, fmt.Sprintf("topic %s partition %d: %v", t.Topic, p.Partition, err))
+							}
 						}
 					}
 				}
@@ -578,7 +591,7 @@ func (cl *Client) EndAndBeginTransaction(
 		"commit", commit,
 	)
 	cl.producer.readded = false
-	err = cl.doWithConcurrentTransactions("EndTxn", func() error {
+	err = cl.doWithConcurrentTransactions(ctx, "EndTxn", func() error {
 		req := kmsg.NewPtrEndTxnRequest()
 		req.TransactionalID = *cl.cfg.txnID
 		req.ProducerID = id
@@ -643,7 +656,7 @@ func (cl *Client) EndAndBeginTransaction(
 	// there could be a stranded txn within Kafka's ProducerStateManager,
 	// but ideally the user will reconnect with the same txnal id.
 	cl.producer.readded = true
-	return cl.doWithConcurrentTransactions("AddPartitionsToTxn", func() error {
+	return cl.doWithConcurrentTransactions(ctx, "AddPartitionsToTxn", func() error {
 		req := kmsg.NewPtrAddPartitionsToTxnRequest()
 		req.TransactionalID = *cl.cfg.txnID
 		req.ProducerID = id
@@ -824,7 +837,7 @@ func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry)
 	)
 
 	cl.producer.readded = false
-	err = cl.doWithConcurrentTransactions("EndTxn", func() error {
+	err = cl.doWithConcurrentTransactions(ctx, "EndTxn", func() error {
 		req := kmsg.NewPtrEndTxnRequest()
 		req.TransactionalID = *cl.cfg.txnID
 		req.ProducerID = id
@@ -838,7 +851,8 @@ func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry)
 	})
 
 	// If the returned error is still a Kafka error, this is fatal and we
-	// need to fail our producer ID we loaded above.
+	// need to fail our producer ID we loaded above. ConcurrentTransactions
+	// could be returned if our retries were cut short; being retriable, it is not fatal here.
 	var ke *kerr.Error
 	if errors.As(err, &ke) && !ke.Retriable {
 		cl.failProducerID(id, epoch, err)
@@ -886,13 +900,32 @@ func (cl *Client) maybeRecoverProducerID() (necessary, did bool, err error) {
 // If a transaction is begun too quickly after finishing an old transaction,
 // Kafka may still be finalizing its commit / abort and will return a
 // concurrent transactions error. We handle that by retrying for a bit.
-func (cl *Client) doWithConcurrentTransactions(name string, fn func() error) error {
+func (cl *Client) doWithConcurrentTransactions(ctx context.Context, name string, fn func() error) error {
 	start := time.Now()
 	tries := 0
 	backoff := cl.cfg.txnBackoff
+
 start:
 	err := fn()
-	if errors.Is(err, kerr.ConcurrentTransactions) && time.Since(start) < 5*time.Second {
+	if errors.Is(err, kerr.ConcurrentTransactions) {
+		// The longer we are stalled, the more we enforce a minimum
+		// backoff.
+		since := time.Since(start)
+		switch {
+		case since > 5*time.Second:
+			if backoff < time.Second {
+				backoff = time.Second
+			}
+		case since > 5*time.Second/2:
+			if backoff < 500*time.Millisecond {
+				backoff = 500 * time.Millisecond
+			}
+		case since > time.Second:
+			if backoff < 200*time.Millisecond {
+				backoff = 200 * time.Millisecond
+			}
+		}
+
 		tries++
 		cl.cfg.logger.Log(LogLevelDebug, fmt.Sprintf("%s failed with CONCURRENT_TRANSACTIONS, which may be because we ended a txn and began producing in a new txn too quickly; backing off and retrying", name),
 			"backoff", backoff,
@@ -901,6 +934,9 @@ start:
 		)
 		select {
 		case <-time.After(backoff):
+		case <-ctx.Done():
+			cl.cfg.logger.Log(LogLevelError, fmt.Sprintf("abandoning %s retry due to request ctx quitting", name))
+			return err
 		case <-cl.ctx.Done():
 			cl.cfg.logger.Log(LogLevelError, fmt.Sprintf("abandoning %s retry due to client ctx quitting", name))
 			return err
@@ -941,12 +977,18 @@ func (cl *Client) commitTransactionOffsets(
 	// unlock the producer txnMu before committing to allow EndTransaction
 	// to go through, even though that could cut off our commit.
 	cl.producer.txnMu.Lock()
+	var unlockedTxn bool
+	unlockTxn := func() {
+		if !unlockedTxn {
+			cl.producer.txnMu.Unlock()
+		}
+		unlockedTxn = true
+	}
+	defer unlockTxn()
 	if !cl.producer.inTxn {
 		onDone(nil, nil, errNotInTransaction)
-		cl.producer.txnMu.Unlock()
 		return nil
 	}
-	cl.producer.txnMu.Unlock()
 
 	g := cl.consumer.g
 	if g == nil {
@@ -958,9 +1000,6 @@ func (cl *Client) commitTransactionOffsets(
 		return g
 	}
 
-	g.mu.Lock()
-	defer g.mu.Unlock()
-
 	if !g.offsetsAddedToTxn {
 		if err := cl.addOffsetsToTxn(g.ctx, g.cfg.group); err != nil {
 			if onDone != nil {
@@ -971,6 +1010,11 @@ func (cl *Client) commitTransactionOffsets(
 		g.offsetsAddedToTxn = true
 	}
 
+	unlockTxn()
+
+	g.mu.Lock()
+	defer g.mu.Unlock()
+
 	g.commitTxn(ctx, uncommitted, onDone)
 	return g
 }
@@ -984,7 +1028,7 @@ func (cl *Client) addOffsetsToTxn(ctx context.Context, group string) error {
 		return err
 	}
 
-	err = cl.doWithConcurrentTransactions("AddOffsetsToTxn", func() error { // committing offsets without producing causes a transaction to begin within Kafka
+	err = cl.doWithConcurrentTransactions(ctx, "AddOffsetsToTxn", func() error { // committing offsets without producing causes a transaction to begin within Kafka
 		cl.cfg.logger.Log(LogLevelInfo, "issuing AddOffsetsToTxn",
 			"txn", *cl.cfg.txnID,
 			"producerID", id,

From 2b4d7ea86a6eb0828260ec2df74cbe989f2e8f82 Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Tue, 18 Oct 2022 16:51:00 -0600
Subject: [PATCH 3/5] kgo source: avoid spinloop if topic is deleted

If a topic is deleted while being consumed, brokers will suddenly
return UNKNOWN_TOPIC_OR_PARTITION. This error is retriable and the
partition is internally stripped. This would cause a fetch to finish
immediately and re-fetch, which would spin. We now back off if all
partitions are stripped.

---
 pkg/kgo/source.go | 68 ++++++++++++++++++++++++++++++++---------------
 1 file changed, 47 insertions(+), 21 deletions(-)

diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go
index 4416e80c..4ef007d3 100644
--- a/pkg/kgo/source.go
+++ b/pkg/kgo/source.go
@@ -594,16 +594,15 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct
 		return
 	}
 
-	// If we had an error, we backoff. Killing a fetch quits the backoff,
-	// but that is fine; we may just re-request too early and fall into
-	// another backoff.
- if err != nil { + var didBackoff bool + backoff := func() { // We preemptively allow more fetches (since we are not buffering) // and reset our session because of the error (who knows if kafka // processed the request but the client failed to receive it). doneFetch <- struct{}{} alreadySentToDoneFetch = true s.session.reset() + didBackoff = true s.cl.triggerUpdateMetadata(false, "opportunistic load during source backoff") // as good a time as any s.consecutiveFailures++ @@ -613,19 +612,31 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct case <-after.C: case <-ctx.Done(): } + } + defer func() { + if !didBackoff { + s.consecutiveFailures = 0 + } + }() + + // If we had an error, we backoff. Killing a fetch quits the backoff, + // but that is fine; we may just re-request too early and fall into + // another backoff. + if err != nil { + backoff() return } - s.consecutiveFailures = 0 resp := kresp.(*kmsg.FetchResponse) var ( - fetch Fetch - reloadOffsets listOrEpochLoads - preferreds cursorPreferreds - updateMeta bool - updateWhy string - handled = make(chan struct{}) + fetch Fetch + reloadOffsets listOrEpochLoads + preferreds cursorPreferreds + allErrsStripped bool + updateMeta bool + updateWhy string + handled = make(chan struct{}) ) // Theoretically, handleReqResp could take a bit of CPU time due to @@ -635,7 +646,7 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct // Processing the response only needs the source's nodeID and client. go func() { defer close(handled) - fetch, reloadOffsets, preferreds, updateMeta, updateWhy = s.handleReqResp(br, req, resp) + fetch, reloadOffsets, preferreds, allErrsStripped, updateMeta, updateWhy = s.handleReqResp(br, req, resp) }() select { @@ -729,6 +740,12 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct s.sem = make(chan struct{}) s.hook(&fetch, true, false) // buffered, not polled s.cl.consumer.addSourceReadyForDraining(s) + } else if allErrsStripped { + // If we stripped all errors from the response, we are likely + // fetching from topics that were deleted. We want to back off + // a bit rather than spin-loop immediately re-requesting + // deleted topics. + backoff() } return } @@ -740,15 +757,20 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct // the source mutex. // // This function, and everything it calls, is side effect free. -func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchResponse) (Fetch, listOrEpochLoads, cursorPreferreds, bool, string) { +func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchResponse) ( + f Fetch, + reloadOffsets listOrEpochLoads, + preferreds cursorPreferreds, + allErrsStripped bool, + updateMeta bool, + why string, +) { + f = Fetch{Topics: make([]FetchTopic, 0, len(resp.Topics))} var ( - f = Fetch{ - Topics: make([]FetchTopic, 0, len(resp.Topics)), - } - reloadOffsets listOrEpochLoads - preferreds []cursorOffsetPreferred - updateMeta bool - updateWhy multiUpdateWhy + updateWhy multiUpdateWhy + + numParts int + numErrsStripped int kip320 = s.cl.supportsOffsetForLeaderEpoch() ) @@ -791,6 +813,8 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe continue } + numParts++ + // If we are fetching from the replica already, Kafka replies with a -1 // preferred read replica. If Kafka replies with a preferred replica, // it sends no records. 
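Before the remaining hunks, a small self-contained sketch of what the new allErrsStripped value computes. The error values here are hypothetical stand-ins; the real code tallies numParts and numErrsStripped while walking the fetch response:

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the retriable partition errors that the
// client strips out of a fetch response.
var (
	errUnknownTopicOrPartition = errors.New("UNKNOWN_TOPIC_OR_PARTITION")
	errNotLeaderForPartition   = errors.New("NOT_LEADER_FOR_PARTITION")
)

func stripped(err error) bool {
	return errors.Is(err, errUnknownTopicOrPartition) ||
		errors.Is(err, errNotLeaderForPartition)
}

// allErrsStripped models the patch's numParts == numErrsStripped check:
// if every fetched partition had its retriable error stripped, the
// response is effectively empty and an immediate re-fetch would spin,
// so the caller backs off instead.
func allErrsStripped(partErrs []error) bool {
	numParts, numErrsStripped := 0, 0
	for _, err := range partErrs {
		numParts++
		if err != nil && stripped(err) {
			numErrsStripped++
		}
	}
	return numParts > 0 && numParts == numErrsStripped
}

func main() {
	// Every partition belonged to a deleted topic: back off.
	fmt.Println(allErrsStripped([]error{errUnknownTopicOrPartition, errUnknownTopicOrPartition})) // true
	// One partition still returned data: fetch again normally.
	fmt.Println(allErrsStripped([]error{errUnknownTopicOrPartition, nil})) // false
}
```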
@@ -827,6 +851,8 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 				kerr.UnknownLeaderEpoch, // our meta is newer than broker we fetched from
 				kerr.OffsetNotAvailable: // fetched from out of sync replica or a behind in-sync one (KIP-392: case 1 and case 2)
 
+				numErrsStripped++
+
 			case kerr.OffsetOutOfRange:
 				// If we are out of range, we reset to what we can.
 				// With Kafka >= 2.1.0, we should only get offset out
@@ -919,7 +945,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 		}
 	}
 
-	return f, reloadOffsets, preferreds, updateMeta, updateWhy.reason("fetch had inner topic errors")
+	return f, reloadOffsets, preferreds, numParts == numErrsStripped, updateMeta, updateWhy.reason("fetch had inner topic errors")
 }
 
 // processRespPartition processes all records in all potentially compressed

From 0a0076fa016b6a4846fa0ec749a3e60d6d55eab0 Mon Sep 17 00:00:00 2001
From: Travis Bischel
Date: Tue, 18 Oct 2022 16:53:14 -0600
Subject: [PATCH 4/5] kgo testing: fix flaky tests against Kraft

Kraft sometimes returns UNKNOWN_TOPIC_OR_PARTITION for a while even
after it replies to a create topics request with success. We now retry
these errors forever.

Also, something in Kraft sometimes hangs when transitioning transaction
states. We need RequireStableFetchOffsets to ensure our tests do not
fail.

---
 pkg/kgo/group_test.go |  2 ++
 pkg/kgo/txn_test.go   | 14 ++++++++++++--
 2 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/pkg/kgo/group_test.go b/pkg/kgo/group_test.go
index 27cd0abd..2605b6a2 100644
--- a/pkg/kgo/group_test.go
+++ b/pkg/kgo/group_test.go
@@ -39,6 +39,7 @@ func TestGroupETL(t *testing.T) {
 		cl, _ := NewClient(
 			WithLogger(BasicLogger(os.Stderr, testLogLevel, nil)),
 			MaxBufferedRecords(10000),
+			UnknownTopicRetries(-1), // see txn_test comment
 		)
 		defer cl.Close()
@@ -116,6 +117,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) {
 	netls := 0 // for if etlsBeforeQuit is non-negative
 
 	opts := []Opt{
+		UnknownTopicRetries(-1), // see txn_test comment
 		WithLogger(testLogger()),
 		ConsumerGroup(c.group),
 		ConsumeTopics(c.consumeFrom),
diff --git a/pkg/kgo/txn_test.go b/pkg/kgo/txn_test.go
index 6fe2564f..7a840eb5 100644
--- a/pkg/kgo/txn_test.go
+++ b/pkg/kgo/txn_test.go
@@ -32,6 +32,7 @@ func TestTxnEtl(t *testing.T) {
 			TransactionalID("p"+randsha()),
 			TransactionTimeout(2*time.Minute),
 			MaxBufferedRecords(10000),
+			UnknownTopicRetries(-1), // see comment below
 		)
 		if err != nil {
 			panic(err)
@@ -135,8 +136,17 @@ func (c *testConsumer) goTransact(txnsBeforeQuit int) {
 func (c *testConsumer) transact(txnsBeforeQuit int) {
 	defer c.wg.Done()
 	txnSess, _ := NewGroupTransactSession(
+		// Kraft sometimes has massive hangs internally when completing
+		// transactions. Against zk Kafka, we could rely on our
+		// internal mitigations to never have KIP-447 problems.
+		// Not true against Kraft, see #223.
+		RequireStableFetchOffsets(),
+		// Kraft sometimes returns success from topic creation, and
+		// then returns UnknownTopicXyz for a while in metadata loads.
+		// It also returns NotLeaderXyz; we handle both problems.
+ UnknownTopicRetries(-1), TransactionalID(randsha()), - TransactionTimeout(2*time.Minute), + TransactionTimeout(10*time.Second), WithLogger(testLogger()), // Control records have their own unique offset, so for testing, // we keep the record to ensure we do not doubly consume control @@ -235,7 +245,7 @@ func (c *testConsumer) transact(txnsBeforeQuit int) { for _, rec := range recs { po := partOffset{part, rec.offset} if _, exists := c.partOffsets[po]; exists { - c.errCh <- fmt.Errorf("saw double offset p%do%d", po.part, po.offset) + c.errCh <- fmt.Errorf("saw double offset t %s p%do%d", c.consumeFrom, po.part, po.offset) } c.partOffsets[po] = struct{}{} From b4c041b4c917b1c41573fc5655731c065f5fe813 Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Sat, 15 Oct 2022 23:54:11 +0100 Subject: [PATCH 5/5] fix integration test action --- .github/workflows/lint-and-test.yml | 63 +++++++++++++++-------------- pkg/kgo/group_test.go | 2 + pkg/kgo/helpers_test.go | 14 ++++--- pkg/kgo/txn_test.go | 2 + 4 files changed, 46 insertions(+), 35 deletions(-) diff --git a/.github/workflows/lint-and-test.yml b/.github/workflows/lint-and-test.yml index 2f5f3d4b..d3b38ebf 100644 --- a/.github/workflows/lint-and-test.yml +++ b/.github/workflows/lint-and-test.yml @@ -14,7 +14,7 @@ jobs: golangci: if: github.repository == 'twmb/franz-go' runs-on: ubuntu-latest - name: 'golangci-lint on amd64' + name: "golangci-lint on amd64" steps: - uses: actions/checkout@v3 - uses: actions/setup-go@v3 @@ -28,7 +28,7 @@ jobs: if: github.repository == 'twmb/franz-go' needs: golangci runs-on: ubuntu-latest - name: 'vet on arm' + name: "vet on arm" steps: - uses: actions/checkout@v3 with: @@ -58,31 +58,34 @@ jobs: echo "staticcheck ./..." staticcheck -checks 'all,-ST1003,-SA1012,-ST1016,-SA1019,-SA2001' ./... # actually contains atomicalign check -# TODO: fix -# integration-test: -# if: github.repository == 'twmb/franz-go' -# needs: golangci -# runs-on: ubuntu-latest -# name: 'integration test kafka' -# container: golang:1.19.2 -# services: -# zk: -# image: bitnami/zookeeper:latest -# ports: -# - 2181:2181 -# env: -# ALLOW_ANONYMOUS_LOGIN: yes -# kafka: -# image: bitnami/kafka:latest -# ports: -# - 9092:9092 -# env: -# ALLOW_PLAINTEXT_LISTENER: yes -# KAFKA_CFG_ZOOKEEPER_CONNECT: zk:2181 -# steps: -# - uses: actions/checkout@v3 -# - run: go test ./... -# env: -# KGO_TEST_RF: 1 -# KGO_SEEDS: kafka:9092 -# KGO_TEST_RECORDS: 50000 + integration-test: + if: github.repository == 'twmb/franz-go' + needs: golangci + runs-on: ubuntu-latest + name: "integration test kafka" + container: golang:1.19.2 + services: + kafka: + image: bitnami/kafka:latest + ports: + - 9092:9092 + env: + KAFKA_ENABLE_KRAFT: yes + KAFKA_CFG_PROCESS_ROLES: controller,broker + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:9093 + # Set this to "PLAINTEXT://127.0.0.1:9092" if you want to run this container on localhost via Docker + KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + KAFKA_CFG_BROKER_ID: 1 + ALLOW_PLAINTEXT_LISTENER: yes + KAFKA_KRAFT_CLUSTER_ID: XkpGZQ27R3eTl3OdTm2LYA # 16 byte base64-encoded UUID + # BITNAMI_DEBUG: true # Enable this to get more info on startup failures + steps: + - uses: actions/checkout@v3 + - run: go test ./... 
+ env: + KGO_TEST_RF: 1 + KGO_SEEDS: kafka:9092 + KGO_TEST_RECORDS: 50000 diff --git a/pkg/kgo/group_test.go b/pkg/kgo/group_test.go index 2605b6a2..39069dce 100644 --- a/pkg/kgo/group_test.go +++ b/pkg/kgo/group_test.go @@ -37,6 +37,7 @@ func TestGroupETL(t *testing.T) { go func() { cl, _ := NewClient( + getSeedBrokers(), WithLogger(BasicLogger(os.Stderr, testLogLevel, nil)), MaxBufferedRecords(10000), UnknownTopicRetries(-1), // see txn_test comment @@ -117,6 +118,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) { netls := 0 // for if etlsBeforeQuit is non-negative opts := []Opt{ + getSeedBrokers(), UnknownTopicRetries(-1), // see txn_test comment WithLogger(testLogger()), ConsumerGroup(c.group), diff --git a/pkg/kgo/helpers_test.go b/pkg/kgo/helpers_test.go index 6e441126..42278119 100644 --- a/pkg/kgo/helpers_test.go +++ b/pkg/kgo/helpers_test.go @@ -25,12 +25,8 @@ var ( ) func init() { - seeds := os.Getenv("KGO_SEEDS") - if seeds == "" { - seeds = "127.0.0.1:9092" - } var err error - adm, err = NewClient(SeedBrokers(strings.Split(seeds, ",")...)) + adm, err = NewClient(getSeedBrokers()) if err != nil { panic(fmt.Sprintf("unable to create admin client: %v", err)) } @@ -43,6 +39,14 @@ func init() { } } +func getSeedBrokers() Opt { + seeds := os.Getenv("KGO_SEEDS") + if seeds == "" { + seeds = "127.0.0.1:9092" + } + return SeedBrokers(strings.Split(seeds, ",")...) +} + var loggerNum int64 var testLogLevel = func() LogLevel { diff --git a/pkg/kgo/txn_test.go b/pkg/kgo/txn_test.go index 7a840eb5..c28a714b 100644 --- a/pkg/kgo/txn_test.go +++ b/pkg/kgo/txn_test.go @@ -28,6 +28,7 @@ func TestTxnEtl(t *testing.T) { go func() { cl, err := NewClient( + getSeedBrokers(), WithLogger(BasicLogger(os.Stderr, testLogLevel, nil)), TransactionalID("p"+randsha()), TransactionTimeout(2*time.Minute), @@ -136,6 +137,7 @@ func (c *testConsumer) goTransact(txnsBeforeQuit int) { func (c *testConsumer) transact(txnsBeforeQuit int) { defer c.wg.Done() txnSess, _ := NewGroupTransactSession( + getSeedBrokers(), // Kraft sometimes has massive hangs internally when completing // transactions. Against zk Kafka, we could rely on our // internal mitigations to never have KIP-447 problems.
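Finally, a usage note: the getSeedBrokers helper above lets the whole test suite target any cluster via KGO_SEEDS. A minimal sketch of the same convention in a standalone program (assuming franz-go's public import path; the defaults mirror the helper):

```go
package main

import (
	"fmt"
	"os"
	"strings"

	"github.com/twmb/franz-go/pkg/kgo"
)

// seedBrokers mirrors the test helper: honor KGO_SEEDS when set,
// otherwise fall back to a localhost broker.
func seedBrokers() kgo.Opt {
	seeds := os.Getenv("KGO_SEEDS")
	if seeds == "" {
		seeds = "127.0.0.1:9092"
	}
	return kgo.SeedBrokers(strings.Split(seeds, ",")...)
}

func main() {
	cl, err := kgo.NewClient(seedBrokers())
	if err != nil {
		fmt.Println("unable to create client:", err)
		return
	}
	defer cl.Close()
	// Run the suite itself with KGO_SEEDS, KGO_TEST_RF, and
	// KGO_TEST_RECORDS set as in the workflow above.
	fmt.Println("connected with seeds from KGO_SEEDS (or the localhost default)")
}
```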