Skip to content

Commit

Permalink
Merge pull request #225 from twmb/redpanda
Browse files Browse the repository at this point in the history
franz-go: test against redpanda
  • Loading branch information
twmb authored Oct 21, 2022
2 parents d20a322 + 0f20096 commit b605610
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 12 deletions.
26 changes: 23 additions & 3 deletions .github/workflows/lint-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ jobs:
echo "staticcheck ./..."
staticcheck -checks 'all,-ST1003,-SA1012,-ST1016,-SA1019,-SA2001' ./... # actually contains atomicalign check
integration-test:
integration-test-kafka:
if: github.repository == 'twmb/franz-go'
needs: golangci
runs-on: ubuntu-latest
Expand All @@ -80,11 +80,31 @@ jobs:
KAFKA_CFG_BROKER_ID: 1
ALLOW_PLAINTEXT_LISTENER: yes
KAFKA_KRAFT_CLUSTER_ID: XkpGZQ27R3eTl3OdTm2LYA # 16 byte base64-encoded UUID
# BITNAMI_DEBUG: true # Enable this to get more info on startup failures
steps:
- uses: actions/checkout@v3
- run: go test ./...
env:
KGO_TEST_RF: 1
KGO_SEEDS: kafka:9092
KGO_TEST_RECORDS: 50000
KGO_TEST_UNSAFE: true
KGO_TEST_STABLE_FETCH: true

integration-test-redpanda:
if: github.repository == 'twmb/franz-go'
needs: golangci
runs-on: ubuntu-latest
name: "integration test redpanda"
container: golang:1.19.2
services:
redpanda:
image: vectorized/redpanda-nightly:latest
ports:
- 9092:9092
env:
REDPANDA_ADVERTISE_KAFKA_ADDRESS: redpanda:9092
steps:
- uses: actions/checkout@v3
- run: go test ./...
env:
KGO_TEST_RF: 1
KGO_SEEDS: redpanda:9092
31 changes: 31 additions & 0 deletions pkg/kgo/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"net"
"os"
"sort"
"strconv"
Expand All @@ -22,6 +24,17 @@ var (
adm *Client
testrf = 3
testRecordLimit = 500000

// Kraft sometimes has massive hangs internally when completing
// transactions. Against zk Kafka as well as Redpanda, we could rely on
// our internal mitigations to never have KIP-447 problems. Not true
// against Kraft, see #223.
requireStableFetch = false

// Redpanda is a bit more strict with transactions: we must wait for
// EndTxn to return successfully before beginning a new transaction. We
// cannot use EndAndBeginTransaction with EndBeginTxnUnsafe.
allowUnsafe = false
)

func init() {
Expand All @@ -37,6 +50,12 @@ func init() {
if n, _ := strconv.Atoi(os.Getenv("KGO_TEST_RECORDS")); n > 0 {
testRecordLimit = n
}
if _, exists := os.LookupEnv("KGO_TEST_STABLE_FETCH"); exists {
requireStableFetch = true
}
if _, exists := os.LookupEnv("KGO_TEST_UNSAFE"); exists {
allowUnsafe = true
}
}

func getSeedBrokers() Opt {
Expand Down Expand Up @@ -99,7 +118,19 @@ func tmpTopic(tb testing.TB) (string, func()) {
reqTopic.ReplicationFactor = int16(testrf)
req.Topics = append(req.Topics, reqTopic)

start := time.Now()
issue:
resp, err := req.RequestWith(context.Background(), adm)

// If we run tests in a container _immediately_ after the container
// starts, we can receive dial errors for a bit if the container is not
// fully initialized. Handle this by retrying specifically dial errors.
if ne := (*net.OpError)(nil); errors.As(err, &ne) && ne.Op == "dial" && time.Since(start) < 5*time.Second {
tb.Log("topic creation failed with dial error, sleeping 100ms and trying again")
time.Sleep(100 * time.Millisecond)
goto issue
}

if err == nil {
err = kerr.ErrorForCode(resp.Topics[0].ErrorCode)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/kgo/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,11 @@ const (
// application with the SAME transactional ID and produce to all the
// same partitions to ensure to resume the transaction and unstick the
// partitions.
//
// Also note: this option does not work on all broker implementations.
// This relies on Kafka internals. Some brokers (notably Redpanda) are
// more strict with enforcing transaction correctness and this option
// cannot be used and will cause errors.
EndBeginTxnUnsafe
)

Expand Down
19 changes: 10 additions & 9 deletions pkg/kgo/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestTxnEtl(t *testing.T) {
// we commit and begin a new one.
if i > 0 && i%10000 == 0 {
how := EndBeginTxnSafe
if safeUnsafe {
if safeUnsafe && allowUnsafe {
how = EndBeginTxnUnsafe
}
safeUnsafe = !safeUnsafe
Expand Down Expand Up @@ -136,19 +136,15 @@ func (c *testConsumer) goTransact(txnsBeforeQuit int) {

func (c *testConsumer) transact(txnsBeforeQuit int) {
defer c.wg.Done()
txnSess, _ := NewGroupTransactSession(

opts := []Opt{
getSeedBrokers(),
// Kraft sometimes has massive hangs internally when completing
// transactions. Against zk Kafka, we could rely on our
// internal mitigations to never have KIP-447 problems.
// Not true against Kraft, see #223.
RequireStableFetchOffsets(),
// Kraft sometimes returns success from topic creation, and
// then returns UnknownTopicXyz for a while in metadata loads.
// It also returns NotLeaderXyz; we handle both problems.
UnknownTopicRetries(-1),
TransactionalID(randsha()),
TransactionTimeout(10*time.Second),
TransactionTimeout(10 * time.Second),
WithLogger(testLogger()),
// Control records have their own unique offset, so for testing,
// we keep the record to ensure we do not doubly consume control
Expand All @@ -159,7 +155,12 @@ func (c *testConsumer) transact(txnsBeforeQuit int) {
FetchIsolationLevel(ReadCommitted()),
Balancers(c.balancer),
MaxBufferedRecords(10000),
)
}
if requireStableFetch {
opts = append(opts, RequireStableFetchOffsets())
}

txnSess, _ := NewGroupTransactSession(opts...)
defer txnSess.Close()

ntxns := 0 // for if txnsBeforeQuit is non-negative
Expand Down

0 comments on commit b605610

Please sign in to comment.