Skip to content

Commit

Permalink
[chore] move kafkareceiver to generated lifecycle tests (#29946)
Browse files Browse the repository at this point in the history
Related to
#27850
  • Loading branch information
atoulme authored Dec 18, 2023
1 parent 91fa118 commit dc832e0
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 115 deletions.
12 changes: 6 additions & 6 deletions receiver/kafkareceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ func TestCreateTracesReceiver(t *testing.T) {
cfg.ProtocolVersion = "2.0.0"
f := kafkaReceiverFactory{tracesUnmarshalers: defaultTracesUnmarshalers()}
r, err := f.createTracesReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil)
require.NoError(t, err)
// no available broker
require.Error(t, err)
assert.Nil(t, r)
require.Error(t, r.Start(context.Background(), componenttest.NewNopHost()))
}

func TestCreateTracesReceiver_error(t *testing.T) {
Expand Down Expand Up @@ -79,9 +79,9 @@ func TestCreateMetricsReceiver(t *testing.T) {
cfg.ProtocolVersion = "2.0.0"
f := kafkaReceiverFactory{metricsUnmarshalers: defaultMetricsUnmarshalers()}
r, err := f.createMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil)
require.NoError(t, err)
// no available broker
require.Error(t, err)
assert.Nil(t, r)
require.Error(t, r.Start(context.Background(), componenttest.NewNopHost()))
}

func TestCreateMetricsReceiver_error(t *testing.T) {
Expand Down Expand Up @@ -123,9 +123,9 @@ func TestCreateLogsReceiver(t *testing.T) {
cfg.ProtocolVersion = "2.0.0"
f := kafkaReceiverFactory{logsUnmarshalers: defaultLogsUnmarshalers("Test Version", zap.NewNop())}
r, err := f.createLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil)
require.NoError(t, err)
// no available broker
require.Error(t, err)
assert.Nil(t, r)
require.Error(t, r.Start(context.Background(), componenttest.NewNopHost()))
}

func TestCreateLogsReceiver_error(t *testing.T) {
Expand Down
103 changes: 103 additions & 0 deletions receiver/kafkareceiver/generated_component_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

146 changes: 60 additions & 86 deletions receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var errInvalidInitialOffset = fmt.Errorf("invalid initial offset")

// kafkaTracesConsumer uses sarama to consume and handle messages from kafka.
type kafkaTracesConsumer struct {
config Config
consumerGroup sarama.ConsumerGroup
nextConsumer consumer.Traces
topics []string
Expand All @@ -44,6 +45,7 @@ type kafkaTracesConsumer struct {

// kafkaMetricsConsumer uses sarama to consume and handle messages from kafka.
type kafkaMetricsConsumer struct {
config Config
consumerGroup sarama.ConsumerGroup
nextConsumer consumer.Metrics
topics []string
Expand All @@ -60,6 +62,7 @@ type kafkaMetricsConsumer struct {

// kafkaLogsConsumer uses sarama to consume and handle messages from kafka.
type kafkaLogsConsumer struct {
config Config
consumerGroup sarama.ConsumerGroup
nextConsumer consumer.Logs
topics []string
Expand All @@ -83,37 +86,8 @@ func newTracesReceiver(config Config, set receiver.CreateSettings, unmarshaler T
return nil, errUnrecognizedEncoding
}

c := sarama.NewConfig()
c.ClientID = config.ClientID
c.Metadata.Full = config.Metadata.Full
c.Metadata.Retry.Max = config.Metadata.Retry.Max
c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
c.Consumer.Offsets.AutoCommit.Enable = config.AutoCommit.Enable
c.Consumer.Offsets.AutoCommit.Interval = config.AutoCommit.Interval
if initialOffset, err := toSaramaInitialOffset(config.InitialOffset); err == nil {
c.Consumer.Offsets.Initial = initialOffset
} else {
return nil, err
}
if config.ResolveCanonicalBootstrapServersOnly {
c.Net.ResolveCanonicalBootstrapServers = true
}
if config.ProtocolVersion != "" {
version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
if err != nil {
return nil, err
}
c.Version = version
}
if err := kafka.ConfigureAuthentication(config.Authentication, c); err != nil {
return nil, err
}
client, err := sarama.NewConsumerGroup(config.Brokers, config.GroupID, c)
if err != nil {
return nil, err
}
return &kafkaTracesConsumer{
consumerGroup: client,
config: config,
topics: []string{config.Topic},
nextConsumer: nextConsumer,
unmarshaler: unmarshaler,
Expand All @@ -125,6 +99,32 @@ func newTracesReceiver(config Config, set receiver.CreateSettings, unmarshaler T
}, nil
}

// createKafkaClient builds a sarama consumer group from the receiver
// configuration: client ID, metadata retry settings, auto-commit
// behavior, initial offset, optional canonical bootstrap-server
// resolution, protocol version, and authentication.
func createKafkaClient(config Config) (sarama.ConsumerGroup, error) {
	sc := sarama.NewConfig()
	sc.ClientID = config.ClientID
	sc.Metadata.Full = config.Metadata.Full
	sc.Metadata.Retry.Max = config.Metadata.Retry.Max
	sc.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
	sc.Consumer.Offsets.AutoCommit.Enable = config.AutoCommit.Enable
	sc.Consumer.Offsets.AutoCommit.Interval = config.AutoCommit.Interval

	initialOffset, err := toSaramaInitialOffset(config.InitialOffset)
	if err != nil {
		return nil, err
	}
	sc.Consumer.Offsets.Initial = initialOffset

	if config.ResolveCanonicalBootstrapServersOnly {
		sc.Net.ResolveCanonicalBootstrapServers = true
	}

	if config.ProtocolVersion != "" {
		version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
		if err != nil {
			return nil, err
		}
		sc.Version = version
	}

	if err := kafka.ConfigureAuthentication(config.Authentication, sc); err != nil {
		return nil, err
	}
	return sarama.NewConsumerGroup(config.Brokers, config.GroupID, sc)
}

func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) error {
ctx, cancel := context.WithCancel(context.Background())
c.cancelConsumeLoop = cancel
Expand All @@ -136,6 +136,12 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro
if err != nil {
return err
}
// consumerGroup may be set in tests to inject fake implementation.
if c.consumerGroup == nil {
if c.consumerGroup, err = createKafkaClient(c.config); err != nil {
return err
}
}
consumerGroup := &tracesConsumerGroupHandler{
logger: c.settings.Logger,
unmarshaler: c.unmarshaler,
Expand Down Expand Up @@ -178,6 +184,9 @@ func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.Co
}

// Shutdown stops the consume loop and closes the consumer group.
// A nil cancel func indicates Start was never run, so there is nothing
// to tear down.
func (c *kafkaTracesConsumer) Shutdown(context.Context) error {
	if cancel := c.cancelConsumeLoop; cancel != nil {
		cancel()
		return c.consumerGroup.Close()
	}
	return nil
}
Expand All @@ -187,34 +196,8 @@ func newMetricsReceiver(config Config, set receiver.CreateSettings, unmarshaler
return nil, errUnrecognizedEncoding
}

c := sarama.NewConfig()
c.ClientID = config.ClientID
c.Metadata.Full = config.Metadata.Full
c.Metadata.Retry.Max = config.Metadata.Retry.Max
c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
c.Consumer.Offsets.AutoCommit.Enable = config.AutoCommit.Enable
c.Consumer.Offsets.AutoCommit.Interval = config.AutoCommit.Interval
if initialOffset, err := toSaramaInitialOffset(config.InitialOffset); err == nil {
c.Consumer.Offsets.Initial = initialOffset
} else {
return nil, err
}
if config.ProtocolVersion != "" {
version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
if err != nil {
return nil, err
}
c.Version = version
}
if err := kafka.ConfigureAuthentication(config.Authentication, c); err != nil {
return nil, err
}
client, err := sarama.NewConsumerGroup(config.Brokers, config.GroupID, c)
if err != nil {
return nil, err
}
return &kafkaMetricsConsumer{
consumerGroup: client,
config: config,
topics: []string{config.Topic},
nextConsumer: nextConsumer,
unmarshaler: unmarshaler,
Expand All @@ -237,6 +220,12 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err
if err != nil {
return err
}
// consumerGroup may be set in tests to inject fake implementation.
if c.consumerGroup == nil {
if c.consumerGroup, err = createKafkaClient(c.config); err != nil {
return err
}
}
metricsConsumerGroup := &metricsConsumerGroupHandler{
logger: c.settings.Logger,
unmarshaler: c.unmarshaler,
Expand Down Expand Up @@ -279,6 +268,9 @@ func (c *kafkaMetricsConsumer) consumeLoop(ctx context.Context, handler sarama.C
}

// Shutdown stops the consume loop and closes the consumer group.
// A nil cancel func indicates Start was never run, so there is nothing
// to tear down.
func (c *kafkaMetricsConsumer) Shutdown(context.Context) error {
	if cancel := c.cancelConsumeLoop; cancel != nil {
		cancel()
		return c.consumerGroup.Close()
	}
	return nil
}
Expand All @@ -287,35 +279,9 @@ func newLogsReceiver(config Config, set receiver.CreateSettings, unmarshaler Log
if unmarshaler == nil {
return nil, errUnrecognizedEncoding
}
c := sarama.NewConfig()
c.ClientID = config.ClientID
c.Metadata.Full = config.Metadata.Full
c.Metadata.Retry.Max = config.Metadata.Retry.Max
c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
c.Consumer.Offsets.AutoCommit.Enable = config.AutoCommit.Enable
c.Consumer.Offsets.AutoCommit.Interval = config.AutoCommit.Interval
if initialOffset, err := toSaramaInitialOffset(config.InitialOffset); err == nil {
c.Consumer.Offsets.Initial = initialOffset
} else {
return nil, err
}
if config.ProtocolVersion != "" {
var version sarama.KafkaVersion
version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
if err != nil {
return nil, err
}
c.Version = version
}
if err := kafka.ConfigureAuthentication(config.Authentication, c); err != nil {
return nil, err
}
client, err := sarama.NewConsumerGroup(config.Brokers, config.GroupID, c)
if err != nil {
return nil, err
}

return &kafkaLogsConsumer{
consumerGroup: client,
config: config,
topics: []string{config.Topic},
nextConsumer: nextConsumer,
unmarshaler: unmarshaler,
Expand All @@ -338,7 +304,12 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error
if err != nil {
return err
}

// consumerGroup may be set in tests to inject fake implementation.
if c.consumerGroup == nil {
if c.consumerGroup, err = createKafkaClient(c.config); err != nil {
return err
}
}
logsConsumerGroup := &logsConsumerGroupHandler{
logger: c.settings.Logger,
unmarshaler: c.unmarshaler,
Expand Down Expand Up @@ -381,6 +352,9 @@ func (c *kafkaLogsConsumer) consumeLoop(ctx context.Context, handler sarama.Cons
}

// Shutdown stops the consume loop and closes the consumer group.
// A nil cancel func indicates Start was never run, so there is nothing
// to tear down.
func (c *kafkaLogsConsumer) Shutdown(context.Context) error {
	if cancel := c.cancelConsumeLoop; cancel != nil {
		cancel()
		return c.consumerGroup.Close()
	}
	return nil
}
Expand Down
Loading

0 comments on commit dc832e0

Please sign in to comment.