diff --git a/.github/workflows/lint-and-test.yml b/.github/workflows/lint-and-test.yml index 2f5f3d4b..d3b38ebf 100644 --- a/.github/workflows/lint-and-test.yml +++ b/.github/workflows/lint-and-test.yml @@ -14,7 +14,7 @@ jobs: golangci: if: github.repository == 'twmb/franz-go' runs-on: ubuntu-latest - name: 'golangci-lint on amd64' + name: "golangci-lint on amd64" steps: - uses: actions/checkout@v3 - uses: actions/setup-go@v3 @@ -28,7 +28,7 @@ jobs: if: github.repository == 'twmb/franz-go' needs: golangci runs-on: ubuntu-latest - name: 'vet on arm' + name: "vet on arm" steps: - uses: actions/checkout@v3 with: @@ -58,31 +58,34 @@ jobs: echo "staticcheck ./..." staticcheck -checks 'all,-ST1003,-SA1012,-ST1016,-SA1019,-SA2001' ./... # actually contains atomicalign check -# TODO: fix -# integration-test: -# if: github.repository == 'twmb/franz-go' -# needs: golangci -# runs-on: ubuntu-latest -# name: 'integration test kafka' -# container: golang:1.19.2 -# services: -# zk: -# image: bitnami/zookeeper:latest -# ports: -# - 2181:2181 -# env: -# ALLOW_ANONYMOUS_LOGIN: yes -# kafka: -# image: bitnami/kafka:latest -# ports: -# - 9092:9092 -# env: -# ALLOW_PLAINTEXT_LISTENER: yes -# KAFKA_CFG_ZOOKEEPER_CONNECT: zk:2181 -# steps: -# - uses: actions/checkout@v3 -# - run: go test ./... -# env: -# KGO_TEST_RF: 1 -# KGO_SEEDS: kafka:9092 -# KGO_TEST_RECORDS: 50000 + integration-test: + if: github.repository == 'twmb/franz-go' + needs: golangci + runs-on: ubuntu-latest + name: "integration test kafka" + container: golang:1.19.2 + services: + kafka: + image: bitnami/kafka:latest + ports: + - 9092:9092 + env: + KAFKA_ENABLE_KRAFT: yes + KAFKA_CFG_PROCESS_ROLES: controller,broker + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@127.0.0.1:9093 + # Set this to "PLAINTEXT://127.0.0.1:9092" if you want to run this container on localhost via Docker + KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + KAFKA_CFG_BROKER_ID: 1 + ALLOW_PLAINTEXT_LISTENER: yes + KAFKA_KRAFT_CLUSTER_ID: XkpGZQ27R3eTl3OdTm2LYA # 16 byte base64-encoded UUID + # BITNAMI_DEBUG: true # Enable this to get more info on startup failures + steps: + - uses: actions/checkout@v3 + - run: go test ./... + env: + KGO_TEST_RF: 1 + KGO_SEEDS: kafka:9092 + KGO_TEST_RECORDS: 50000 diff --git a/pkg/kgo/group_test.go b/pkg/kgo/group_test.go index 2605b6a2..39069dce 100644 --- a/pkg/kgo/group_test.go +++ b/pkg/kgo/group_test.go @@ -37,6 +37,7 @@ func TestGroupETL(t *testing.T) { go func() { cl, _ := NewClient( + getSeedBrokers(), WithLogger(BasicLogger(os.Stderr, testLogLevel, nil)), MaxBufferedRecords(10000), UnknownTopicRetries(-1), // see txn_test comment @@ -117,6 +118,7 @@ func (c *testConsumer) etl(etlsBeforeQuit int) { netls := 0 // for if etlsBeforeQuit is non-negative opts := []Opt{ + getSeedBrokers(), UnknownTopicRetries(-1), // see txn_test comment WithLogger(testLogger()), ConsumerGroup(c.group), diff --git a/pkg/kgo/helpers_test.go b/pkg/kgo/helpers_test.go index 6e441126..42278119 100644 --- a/pkg/kgo/helpers_test.go +++ b/pkg/kgo/helpers_test.go @@ -25,12 +25,8 @@ var ( ) func init() { - seeds := os.Getenv("KGO_SEEDS") - if seeds == "" { - seeds = "127.0.0.1:9092" - } var err error - adm, err = NewClient(SeedBrokers(strings.Split(seeds, ",")...)) + adm, err = NewClient(getSeedBrokers()) if err != nil { panic(fmt.Sprintf("unable to create admin client: %v", err)) } @@ -43,6 +39,14 @@ func init() { } } +func getSeedBrokers() Opt { + seeds := os.Getenv("KGO_SEEDS") + if seeds == "" { + seeds = "127.0.0.1:9092" + } + return SeedBrokers(strings.Split(seeds, ",")...) +} + var loggerNum int64 var testLogLevel = func() LogLevel { diff --git a/pkg/kgo/txn_test.go b/pkg/kgo/txn_test.go index 7a840eb5..c28a714b 100644 --- a/pkg/kgo/txn_test.go +++ b/pkg/kgo/txn_test.go @@ -28,6 +28,7 @@ func TestTxnEtl(t *testing.T) { go func() { cl, err := NewClient( + getSeedBrokers(), WithLogger(BasicLogger(os.Stderr, testLogLevel, nil)), TransactionalID("p"+randsha()), TransactionTimeout(2*time.Minute), @@ -136,6 +137,7 @@ func (c *testConsumer) goTransact(txnsBeforeQuit int) { func (c *testConsumer) transact(txnsBeforeQuit int) { defer c.wg.Done() txnSess, _ := NewGroupTransactSession( + getSeedBrokers(), // Kraft sometimes has massive hangs internally when completing // transactions. Against zk Kafka, we could rely on our // internal mitigations to never have KIP-447 problems.