Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Rate limit for Kafka and initial offset support and refactor config #829

Merged
merged 12 commits into from
Aug 13, 2020
32 changes: 29 additions & 3 deletions api/event-source.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 52 additions & 3 deletions api/event-source.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 13 additions & 6 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 42 additions & 27 deletions eventsources/sources/kafka/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"strconv"
"sync"
"time"

"github.com/Shopify/sarama"
"github.com/argoproj/argo-events/common"
Expand Down Expand Up @@ -81,14 +82,10 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
}

func (listener *EventListener) consumerGroupConsumer(ctx context.Context, log *zap.SugaredLogger, kafkaEventSource *v1alpha1.KafkaEventSource, dispatch func([]byte) error) error {
config := sarama.NewConfig()

version, err := sarama.ParseKafkaVersion(kafkaEventSource.ConsumerGroup.KafkaVersion)
config, err := getSaramaConfig(kafkaEventSource, log)
if err != nil {
log.Errorf("Error parsing Kafka version: %v", err)
return err
}
config.Version = version

switch kafkaEventSource.ConsumerGroup.RebalanceStrategy {
case "sticky":
Expand All @@ -102,15 +99,6 @@ func (listener *EventListener) consumerGroupConsumer(ctx context.Context, log *z
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
}

if kafkaEventSource.TLS != nil {
tlsConfig, err := common.GetTLSConfig(kafkaEventSource.TLS.CACertPath, kafkaEventSource.TLS.ClientCertPath, kafkaEventSource.TLS.ClientKeyPath)
if err != nil {
return errors.Wrap(err, "failed to get the tls configuration")
}
config.Net.TLS.Config = tlsConfig
config.Net.TLS.Enable = true
}

consumer := Consumer{
ready: make(chan bool),
dispatch: dispatch,
Expand Down Expand Up @@ -169,20 +157,10 @@ func (el *EventListener) partitionConsumer(ctx context.Context, log *zap.Sugared
log.Info("connecting to Kafka cluster...")
if err := sources.Connect(common.GetConnectionBackoff(kafkaEventSource.ConnectionBackoff), func() error {
var err error
config := sarama.NewConfig()

if kafkaEventSource.TLS != nil {
tlsConfig, err := common.GetTLSConfig(kafkaEventSource.TLS.CACertPath, kafkaEventSource.TLS.ClientCertPath, kafkaEventSource.TLS.ClientKeyPath)
if err != nil {
return errors.Wrap(err, "failed to get the tls configuration")
}
config.Net.TLS.Config = tlsConfig
config.Net.TLS.Enable = true
} else {
consumer, err = sarama.NewConsumer([]string{kafkaEventSource.URL}, nil)
if err != nil {
return err
}
config, err := getSaramaConfig(kafkaEventSource, log)
if err != nil {
return err
}

consumer, err = sarama.NewConsumer([]string{kafkaEventSource.URL}, config)
Expand Down Expand Up @@ -259,6 +237,37 @@ func (el *EventListener) partitionConsumer(ctx context.Context, log *zap.Sugared
}
}

func getSaramaConfig(kafkaEventSource *v1alpha1.KafkaEventSource, log *zap.SugaredLogger) (*sarama.Config, error) { //nolint:interfacer
config := sarama.NewConfig()

if kafkaEventSource.Version == "" {
config.Version = sarama.V1_0_0_0
} else {
version, err := sarama.ParseKafkaVersion(kafkaEventSource.Version)
if err != nil {
log.Errorf("Error parsing Kafka version: %v", err)
return nil, err
}
config.Version = version
}

if kafkaEventSource.TLS != nil {
tlsConfig, err := common.GetTLSConfig(kafkaEventSource.TLS.CACertPath, kafkaEventSource.TLS.ClientCertPath, kafkaEventSource.TLS.ClientKeyPath)
if err != nil {
return nil, errors.Wrap(err, "failed to get the tls configuration")
}
config.Net.TLS.Config = tlsConfig
config.Net.TLS.Enable = true
}

if kafkaEventSource.ConsumerGroup != nil {
if kafkaEventSource.ConsumerGroup.Oldest {
config.Consumer.Offsets.Initial = sarama.OffsetOldest
}
}
return config, nil
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
ready chan bool
Expand Down Expand Up @@ -310,6 +319,12 @@ func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, clai
} else {
session.MarkMessage(message, "")
}
if consumer.kafkaEventSource.LimitEventsPerSecond > 0 {
//1000000000 is 1 second in nanoseconds
d := (1000000000 / time.Duration(consumer.kafkaEventSource.LimitEventsPerSecond) * time.Nanosecond) * time.Nanosecond
consumer.logger.Infof("Sleeping for: %v.", d)
time.Sleep(d)
}
}

return nil
Expand Down
4 changes: 3 additions & 1 deletion examples/event-sources/kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ spec:
# Use a consumer group, if this is used you do not need to specify a "partition: <id>"
# consumerGroup:
# groupName: test-group
# kafkaVersion: "2.5.0"
# oldest: false
# rebalanceStrategy: range
# limitEventsPerSecond: 1
# version: "2.5.0"

# example-tls:
# url: "kafka.argo-events:9092"
Expand Down
Loading