From 31e6e37a6697b002784fad8cb8a7bb3f0bed4507 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 18 Oct 2022 22:39:40 -0600 Subject: [PATCH 1/3] franz-go: test against redpanda Turns out, redpanda does not like the EndBeginTxnUnsafe option. That's valid. --- .github/workflows/lint-and-test.yml | 24 +++++++++++++++++++++--- pkg/kgo/helpers_test.go | 17 +++++++++++++++++ pkg/kgo/txn.go | 5 +++++ pkg/kgo/txn_test.go | 19 ++++++++++--------- 4 files changed, 53 insertions(+), 12 deletions(-) diff --git a/.github/workflows/lint-and-test.yml b/.github/workflows/lint-and-test.yml index e3eeeb2c..1ee1a727 100644 --- a/.github/workflows/lint-and-test.yml +++ b/.github/workflows/lint-and-test.yml @@ -57,7 +57,7 @@ jobs: echo "staticcheck ./..." staticcheck -checks 'all,-ST1003,-SA1012,-ST1016,-SA1019,-SA2001' ./... # actually contains atomicalign check - integration-test: + integration-test-kafka: if: github.repository == 'twmb/franz-go' needs: golangci runs-on: ubuntu-latest @@ -80,11 +80,29 @@ jobs: KAFKA_CFG_BROKER_ID: 1 ALLOW_PLAINTEXT_LISTENER: yes KAFKA_KRAFT_CLUSTER_ID: XkpGZQ27R3eTl3OdTm2LYA # 16 byte base64-encoded UUID - # BITNAMI_DEBUG: true # Enable this to get more info on startup failures steps: - uses: actions/checkout@v3 - run: go test ./... env: KGO_TEST_RF: 1 KGO_SEEDS: kafka:9092 - KGO_TEST_RECORDS: 50000 + KGO_TEST_UNSAFE: true + KGO_TEST_STABLE_FETCH: true + + integration-test-redpanda: + if: github.repository == 'twmb/franz-go' + needs: golangci + runs-on: ubuntu-latest + name: "integration test redpanda" + container: golang:1.19.2 + services: + redpanda: + image: vectorized/redpanda-nightly:latest + ports: + - 9092:9092 + steps: + - uses: actions/checkout@v3 + - run: go test ./... + env: + KGO_TEST_RF: 1 + KGO_SEEDS: redpanda:9092 diff --git a/pkg/kgo/helpers_test.go b/pkg/kgo/helpers_test.go index 42278119..2bdf49de 100644 --- a/pkg/kgo/helpers_test.go +++ b/pkg/kgo/helpers_test.go @@ -22,6 +22,17 @@ var ( adm *Client testrf = 3 testRecordLimit = 500000 + + // Kraft sometimes has massive hangs internally when completing + // transactions. Against zk Kafka as well as Redpanda, we could rely on + // our internal mitigations to never have KIP-447 problems. Not true + // against Kraft, see #223. + requireStableFetch = false + + // Redpanda is a bit more strict with transactions: we must wait for + // EndTxn to return successfully before beginning a new transaction. We + // cannot use EndAndBeginTransaction with EndBeginTxnUnsafe. + allowUnsafe = false ) func init() { @@ -37,6 +48,12 @@ func init() { if n, _ := strconv.Atoi(os.Getenv("KGO_TEST_RECORDS")); n > 0 { testRecordLimit = n } + if _, exists := os.LookupEnv("KGO_TEST_STABLE_FETCH"); exists { + requireStableFetch = true + } + if _, exists := os.LookupEnv("KGO_TEST_UNSAFE"); exists { + allowUnsafe = true + } } func getSeedBrokers() Opt { diff --git a/pkg/kgo/txn.go b/pkg/kgo/txn.go index 9512acb7..aa2edb51 100644 --- a/pkg/kgo/txn.go +++ b/pkg/kgo/txn.go @@ -474,6 +474,11 @@ const ( // application with the SAME transactional ID and produce to all the // same partitions to ensure to resume the transaction and unstick the // partitions. + // + // Also note: this option does not work on all broker implementations. + // This relies on Kafka internals. Some brokers (notably Redpanda) are + // more strict with enforcing transaction correctness and this option + // cannot be used and will cause errors. EndBeginTxnUnsafe ) diff --git a/pkg/kgo/txn_test.go b/pkg/kgo/txn_test.go index c28a714b..0fcc17bd 100644 --- a/pkg/kgo/txn_test.go +++ b/pkg/kgo/txn_test.go @@ -61,7 +61,7 @@ func TestTxnEtl(t *testing.T) { // we commit and begin a new one. if i > 0 && i%10000 == 0 { how := EndBeginTxnSafe - if safeUnsafe { + if safeUnsafe && allowUnsafe { how = EndBeginTxnUnsafe } safeUnsafe = !safeUnsafe @@ -136,19 +136,15 @@ func (c *testConsumer) goTransact(txnsBeforeQuit int) { func (c *testConsumer) transact(txnsBeforeQuit int) { defer c.wg.Done() - txnSess, _ := NewGroupTransactSession( + + opts := []Opt{ getSeedBrokers(), - // Kraft sometimes has massive hangs internally when completing - // transactions. Against zk Kafka, we could rely on our - // internal mitigations to never have KIP-447 problems. - // Not true against Kraft, see #223. - RequireStableFetchOffsets(), // Kraft sometimes returns success from topic creation, and // then returns UnknownTopicXyz for a while in metadata loads. // It also returns NotLeaderXyz; we handle both problems. UnknownTopicRetries(-1), TransactionalID(randsha()), - TransactionTimeout(10*time.Second), + TransactionTimeout(10 * time.Second), WithLogger(testLogger()), // Control records have their own unique offset, so for testing, // we keep the record to ensure we do not doubly consume control @@ -159,7 +155,12 @@ func (c *testConsumer) transact(txnsBeforeQuit int) { FetchIsolationLevel(ReadCommitted()), Balancers(c.balancer), MaxBufferedRecords(10000), - ) + } + if requireStableFetch { + opts = append(opts, RequireStableFetchOffsets()) + } + + txnSess, _ := NewGroupTransactSession(opts...) defer txnSess.Close() ntxns := 0 // for if txnsBeforeQuit is non-negative From 152bad86b5449fe939a7e3fb52fc3ccfc44a781e Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Tue, 18 Oct 2022 23:05:00 -0600 Subject: [PATCH 2/3] testing: retry on dial errors for a bit --- pkg/kgo/helpers_test.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/kgo/helpers_test.go b/pkg/kgo/helpers_test.go index 2bdf49de..5d36194a 100644 --- a/pkg/kgo/helpers_test.go +++ b/pkg/kgo/helpers_test.go @@ -4,7 +4,9 @@ import ( "context" "crypto/sha256" "encoding/hex" + "errors" "fmt" + "net" "os" "sort" "strconv" @@ -116,7 +118,19 @@ func tmpTopic(tb testing.TB) (string, func()) { reqTopic.ReplicationFactor = int16(testrf) req.Topics = append(req.Topics, reqTopic) + start := time.Now() +issue: resp, err := req.RequestWith(context.Background(), adm) + + // If we run tests in a container _immediately_ after the container + // starts, we can receive dial errors for a bit if the container is not + // fully initialized. Handle this by retrying specifically dial errors. + if ne := (*net.OpError)(nil); errors.As(err, &ne) && ne.Op == "dial" && time.Since(start) < 5*time.Second { + tb.Log("topic creation failed with dial error, sleeping 100ms and trying again") + time.Sleep(100 * time.Millisecond) + goto issue + } + if err == nil { err = kerr.ErrorForCode(resp.Topics[0].ErrorCode) } From 7134032af2e87b97888a3b6027adc76e53115373 Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Fri, 21 Oct 2022 02:42:46 +0100 Subject: [PATCH 3/3] Fix Redpanda tests --- .github/workflows/lint-and-test.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/lint-and-test.yml b/.github/workflows/lint-and-test.yml index 1ee1a727..f33afb73 100644 --- a/.github/workflows/lint-and-test.yml +++ b/.github/workflows/lint-and-test.yml @@ -100,6 +100,8 @@ jobs: image: vectorized/redpanda-nightly:latest ports: - 9092:9092 + env: + REDPANDA_ADVERTISE_KAFKA_ADDRESS: redpanda:9092 steps: - uses: actions/checkout@v3 - run: go test ./...