Skip to content

Commit

Permalink
Merge pull request #10 from imperiuse/feature/refactor-kafka-confluen…
Browse files Browse the repository at this point in the history
…t-lib

New version of kafka lib wrapper
  • Loading branch information
imperiuse authored Jan 17, 2023
2 parents 2b1f95e + cc8a272 commit 315d38a
Show file tree
Hide file tree
Showing 12 changed files with 1,146 additions and 896 deletions.
16 changes: 9 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ module github.com/imperiuse/golib

go 1.19

replace github.com/docker/docker => github.com/docker/docker v20.10.3-0.20221013203545-33ab36d6b304+incompatible // 22.06 branch // see more here https://golang.testcontainers.org/quickstart/.

require (
github.com/Masterminds/squirrel v1.5.3
github.com/confluentinc/confluent-kafka-go v1.9.2
github.com/docker/docker v20.10.21+incompatible
github.com/google/uuid v1.3.0
github.com/jackc/pgx/v4 v4.17.2
github.com/jinzhu/copier v0.3.5
github.com/jmoiron/sqlx v1.3.5
Expand All @@ -22,41 +25,39 @@ require (
require (
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/Microsoft/go-winio v0.6.0 // indirect
github.com/Microsoft/hcsshim v0.9.6 // indirect
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
github.com/containerd/cgroups v1.0.4 // indirect
github.com/containerd/containerd v1.6.14 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/distribution v2.8.1+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.13.0 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.1 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/pgtype v1.13.0 // indirect
github.com/klauspost/compress v1.11.13 // indirect
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/moby/sys/mount v0.3.3 // indirect
github.com/moby/sys/mountinfo v0.6.2 // indirect
github.com/moby/patternmatcher v0.5.0 // indirect
github.com/moby/sys/sequential v0.5.0 // indirect
github.com/moby/term v0.0.0-20221120202655-abb19827d345 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc2 // indirect
github.com/opencontainers/runc v1.1.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/goleak v1.1.12 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/mod v0.7.0 // indirect
golang.org/x/net v0.4.0 // indirect
Expand All @@ -66,5 +67,6 @@ require (
google.golang.org/genproto v0.0.0-20221207170731-23e4bf6bdc37 // indirect
google.golang.org/grpc v1.51.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
805 changes: 9 additions & 796 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion kafka/confluent/admin_client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package kafka
package confluent

import (
"context"
Expand Down
49 changes: 10 additions & 39 deletions kafka/confluent/confluent_kafka.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package kafka
package confluent

import (
"context"
Expand All @@ -15,23 +15,24 @@ const PartitionAny = kafka.PartitionAny

type (
Consumer interface {
Start(context.Context, []TopicSpecification, []string, ConsumeEventFuncGoroutine) error
Stop() error
Start(ctx context.Context, createTopics []TopicSpecification, consumeTopics []string, maxWaitReadTimeout time.Duration, f ProcessEventFunc) (error, ErrChan)
Stop()
}

Producer interface {
Start(context.Context, func(Event)) error
Stop(int)
Publish(topic *string, partition int32, key []byte, value []byte, deliveryChan chan Event) error
Start(ctx context.Context, finishTimeoutFlushInMs int) (chan Event, error)
Stop()
Publish(msg *Message) error
Flush(timeoutFlushInMs int)
}

Adminer interface {
CreateTopics(ctx context.Context, topics []TopicSpecification, opt ...CreateTopicsAdminOption) error
Stop()
}

ConsumeEventFuncGoroutine = func(context.Context, *kafka.Consumer)
ProcessEventFunc = func(context.Context, *kafka.Consumer, *Message)
ProcessEventFunc = func(*kafka.Consumer, *Message)
ErrChan = chan error // read only channel. non block. store error of kafka if not full.

TopicPartition = kafka.TopicPartition
TopicSpecification = kafka.TopicSpecification
Expand All @@ -40,41 +41,11 @@ type (
Message = kafka.Message
)

var GenDefaultConsumeEventFunc = func(
maxWaitReadTimeout time.Duration,
processEventFunc ProcessEventFunc,
) ConsumeEventFuncGoroutine {
return func(ctx context.Context, kafkaConsumer *kafka.Consumer) {
defer func() {
err := kafkaConsumer.Close()
if err != nil {
fmt.Printf("Err while close consumer: %v\n", err)
}
}()

for {
select {
case <-ctx.Done():
return

default:
msg, err := kafkaConsumer.ReadMessage(maxWaitReadTimeout)
if err != nil {
// Errors are informational and automatically handled by the consumer
continue
}

processEventFunc(ctx, kafkaConsumer, msg)
}
}
}
}

func GenDefaultProcessMsgWithZapLogger[T any](
log *zap.Logger,
businessLogicFunc func(string, any) bool,
) ProcessEventFunc {
return func(ctx context.Context, kafkaConsumer *kafka.Consumer, msg *Message) {
return func(kafkaConsumer *kafka.Consumer, msg *Message) {
key, data, err := UnmarshalKafkaValueToStruct[T](msg)
if err != nil {
log.Error("UnmarshalKafkaValueToStruct problem", zap.Error(err))
Expand Down
78 changes: 54 additions & 24 deletions kafka/confluent/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,58 +12,88 @@ import (

type (
consumer struct {
*kafka.Consumer
configMap *kafka.ConfigMap
adminClient confluent_kafka.Adminer
configMap *kafka.ConfigMap

ctx context.Context
cancel context.CancelFunc
}
)

func New(configMap *kafka.ConfigMap) confluent_kafka.Consumer {
ctx, cancel := context.WithCancel(context.Background())
return &consumer{
ctx: ctx,
cancel: cancel,
configMap: configMap,
}
}

func (c *consumer) Stop() error {
c.adminClient.Stop()
return c.Consumer.Close()
func (c *consumer) Stop() {
c.cancel()
}

func (c *consumer) Start(
ctx context.Context,
createTopics []kafka.TopicSpecification,
subTopics []string,
consFunc confluent_kafka.ConsumeEventFuncGoroutine,
) error {
maxWaitReadTimeout time.Duration,
consumeEventFunc confluent_kafka.ProcessEventFunc,
) (error, confluent_kafka.ErrChan) {
c.ctx, c.cancel = context.WithCancel(ctx)

errChan := make(confluent_kafka.ErrChan)

if len(createTopics) > 0 {
var err error
c.adminClient, err = confluent_kafka.NewAdminKafkaClient(c.configMap)
adminClient, err := confluent_kafka.NewAdminKafkaClient(c.configMap)
if err != nil {
return fmt.Errorf("could not create kafka admin client. err: %w", err)
return fmt.Errorf("could not create kafka admin client. err: %w", err), errChan
}

if err = c.adminClient.CreateTopics(ctx, createTopics); err != nil {
return fmt.Errorf("could not CreateTopics via kafka admin client. err: %w", err)
defer adminClient.Stop()

if err = adminClient.CreateTopics(c.ctx, createTopics); err != nil {
return fmt.Errorf("could not CreateTopics via kafka admin client. err: %w", err), errChan
}
}

kafkaConsumer, err := kafka.NewConsumer(c.configMap)
if err != nil {
return fmt.Errorf("could not create new kafka consumer. err: %w", err)
if err != nil || kafkaConsumer == nil {
return fmt.Errorf("could not create new kafka consumer. err: %w", err), errChan
}

if err = kafkaConsumer.SubscribeTopics(subTopics, nil); err != nil {
return fmt.Errorf("could not subscribe to topics: %s. err: %w", subTopics, err)
return fmt.Errorf("could not subscribe to topics: %s. err: %w", subTopics, err), errChan
}

go consFunc(ctx, kafkaConsumer)
go func() {
defer func() {
if err = kafkaConsumer.Close(); err != nil {
fmt.Printf("c.kafkaConsumer.Close(): %v\n", err)
}

return nil
}
close(errChan)
}()

func (c *consumer) ReadMessage(timeout time.Duration) (*confluent_kafka.Message, error) {
return c.Consumer.ReadMessage(timeout)
}
func (c *consumer) CommitMessage(msg *confluent_kafka.Message) ([]confluent_kafka.TopicPartition, error) {
return c.Consumer.CommitMessage(msg)
for {
select {
case <-ctx.Done():
return

default:
msg, err := kafkaConsumer.ReadMessage(maxWaitReadTimeout)
if err != nil {
// non-blocking write to the chan, if chan full - drop error.
select {
case errChan <- err:
default:
}
continue
}

consumeEventFunc(kafkaConsumer, msg)
}
}
}()

return nil, errChan
}
Loading

0 comments on commit 315d38a

Please sign in to comment.