From dc832e0df273a882f06348f42ad50fe9c7ffc7b9 Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Mon, 18 Dec 2023 12:46:12 -0800 Subject: [PATCH] [chore] move kafkareceiver to generated lifecycle tests (#29946) Related to https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/27850 --- receiver/kafkareceiver/factory_test.go | 12 +- .../kafkareceiver/generated_component_test.go | 103 ++++++++++++ receiver/kafkareceiver/kafka_receiver.go | 146 +++++++----------- receiver/kafkareceiver/kafka_receiver_test.go | 43 +++--- receiver/kafkareceiver/metadata.yaml | 4 + 5 files changed, 193 insertions(+), 115 deletions(-) create mode 100644 receiver/kafkareceiver/generated_component_test.go diff --git a/receiver/kafkareceiver/factory_test.go b/receiver/kafkareceiver/factory_test.go index 25d999886b90..45386eca9657 100644 --- a/receiver/kafkareceiver/factory_test.go +++ b/receiver/kafkareceiver/factory_test.go @@ -35,9 +35,9 @@ func TestCreateTracesReceiver(t *testing.T) { cfg.ProtocolVersion = "2.0.0" f := kafkaReceiverFactory{tracesUnmarshalers: defaultTracesUnmarshalers()} r, err := f.createTracesReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) + require.NoError(t, err) // no available broker - require.Error(t, err) - assert.Nil(t, r) + require.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) } func TestCreateTracesReceiver_error(t *testing.T) { @@ -79,9 +79,9 @@ func TestCreateMetricsReceiver(t *testing.T) { cfg.ProtocolVersion = "2.0.0" f := kafkaReceiverFactory{metricsUnmarshalers: defaultMetricsUnmarshalers()} r, err := f.createMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) + require.NoError(t, err) // no available broker - require.Error(t, err) - assert.Nil(t, r) + require.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) } func TestCreateMetricsReceiver_error(t *testing.T) { @@ -123,9 +123,9 @@ func TestCreateLogsReceiver(t *testing.T) { cfg.ProtocolVersion = "2.0.0" f := kafkaReceiverFactory{logsUnmarshalers: defaultLogsUnmarshalers("Test Version", zap.NewNop())} r, err := f.createLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) + require.NoError(t, err) // no available broker - require.Error(t, err) - assert.Nil(t, r) + require.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) } func TestCreateLogsReceiver_error(t *testing.T) { diff --git a/receiver/kafkareceiver/generated_component_test.go b/receiver/kafkareceiver/generated_component_test.go new file mode 100644 index 000000000000..c7cce24e803f --- /dev/null +++ b/receiver/kafkareceiver/generated_component_test.go @@ -0,0 +1,103 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package kafkareceiver + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receivertest" + + "go.opentelemetry.io/collector/confmap/confmaptest" +) + +// assertNoErrorHost implements a component.Host that asserts that there were no errors. +type assertNoErrorHost struct { + component.Host + *testing.T +} + +var _ component.Host = (*assertNoErrorHost)(nil) + +// newAssertNoErrorHost returns a new instance of assertNoErrorHost. +func newAssertNoErrorHost(t *testing.T) component.Host { + return &assertNoErrorHost{ + componenttest.NewNopHost(), + t, + } +} + +func (aneh *assertNoErrorHost) ReportFatalError(err error) { + assert.NoError(aneh, err) +} + +func Test_ComponentLifecycle(t *testing.T) { + factory := NewFactory() + + tests := []struct { + name string + createFn func(ctx context.Context, set receiver.CreateSettings, cfg component.Config) (component.Component, error) + }{ + + { + name: "logs", + createFn: func(ctx context.Context, set receiver.CreateSettings, cfg component.Config) (component.Component, error) { + return factory.CreateLogsReceiver(ctx, set, cfg, consumertest.NewNop()) + }, + }, + + { + name: "metrics", + createFn: func(ctx context.Context, set receiver.CreateSettings, cfg component.Config) (component.Component, error) { + return factory.CreateMetricsReceiver(ctx, set, cfg, consumertest.NewNop()) + }, + }, + + { + name: "traces", + createFn: func(ctx context.Context, set receiver.CreateSettings, cfg component.Config) (component.Component, error) { + return factory.CreateTracesReceiver(ctx, set, cfg, consumertest.NewNop()) + }, + }, + } + + cm, err := confmaptest.LoadConf("metadata.yaml") + require.NoError(t, err) + cfg := factory.CreateDefaultConfig() + sub, err := cm.Sub("tests::config") + require.NoError(t, err) + require.NoError(t, component.UnmarshalConfig(sub, cfg)) + + for _, test := range tests { + t.Run(test.name+"-shutdown", func(t *testing.T) { + c, err := test.createFn(context.Background(), receivertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + err = c.Shutdown(context.Background()) + require.NoError(t, err) + }) + + t.Run(test.name+"-lifecycle", func(t *testing.T) { + + // TODO support lifecycle + t.SkipNow() + + firstRcvr, err := test.createFn(context.Background(), receivertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + host := newAssertNoErrorHost(t) + require.NoError(t, err) + require.NoError(t, firstRcvr.Start(context.Background(), host)) + require.NoError(t, firstRcvr.Shutdown(context.Background())) + secondRcvr, err := test.createFn(context.Background(), receivertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + require.NoError(t, secondRcvr.Start(context.Background(), host)) + require.NoError(t, secondRcvr.Shutdown(context.Background())) + }) + } +} diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index 8ae63f8c9e26..0e40c6ad6f08 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -28,6 +28,7 @@ var errInvalidInitialOffset = fmt.Errorf("invalid initial offset") // kafkaTracesConsumer uses sarama to consume and handle messages from kafka. type kafkaTracesConsumer struct { + config Config consumerGroup sarama.ConsumerGroup nextConsumer consumer.Traces topics []string @@ -44,6 +45,7 @@ type kafkaTracesConsumer struct { // kafkaMetricsConsumer uses sarama to consume and handle messages from kafka. type kafkaMetricsConsumer struct { + config Config consumerGroup sarama.ConsumerGroup nextConsumer consumer.Metrics topics []string @@ -60,6 +62,7 @@ type kafkaMetricsConsumer struct { // kafkaLogsConsumer uses sarama to consume and handle messages from kafka. type kafkaLogsConsumer struct { + config Config consumerGroup sarama.ConsumerGroup nextConsumer consumer.Logs topics []string @@ -83,37 +86,8 @@ func newTracesReceiver(config Config, set receiver.CreateSettings, unmarshaler T return nil, errUnrecognizedEncoding } - c := sarama.NewConfig() - c.ClientID = config.ClientID - c.Metadata.Full = config.Metadata.Full - c.Metadata.Retry.Max = config.Metadata.Retry.Max - c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff - c.Consumer.Offsets.AutoCommit.Enable = config.AutoCommit.Enable - c.Consumer.Offsets.AutoCommit.Interval = config.AutoCommit.Interval - if initialOffset, err := toSaramaInitialOffset(config.InitialOffset); err == nil { - c.Consumer.Offsets.Initial = initialOffset - } else { - return nil, err - } - if config.ResolveCanonicalBootstrapServersOnly { - c.Net.ResolveCanonicalBootstrapServers = true - } - if config.ProtocolVersion != "" { - version, err := sarama.ParseKafkaVersion(config.ProtocolVersion) - if err != nil { - return nil, err - } - c.Version = version - } - if err := kafka.ConfigureAuthentication(config.Authentication, c); err != nil { - return nil, err - } - client, err := sarama.NewConsumerGroup(config.Brokers, config.GroupID, c) - if err != nil { - return nil, err - } return &kafkaTracesConsumer{ - consumerGroup: client, + config: config, topics: []string{config.Topic}, nextConsumer: nextConsumer, unmarshaler: unmarshaler, @@ -125,6 +99,32 @@ func newTracesReceiver(config Config, set receiver.CreateSettings, unmarshaler T }, nil } +func createKafkaClient(config Config) (sarama.ConsumerGroup, error) { + saramaConfig := sarama.NewConfig() + saramaConfig.ClientID = config.ClientID + saramaConfig.Metadata.Full = config.Metadata.Full + saramaConfig.Metadata.Retry.Max = config.Metadata.Retry.Max + saramaConfig.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff + saramaConfig.Consumer.Offsets.AutoCommit.Enable = config.AutoCommit.Enable + saramaConfig.Consumer.Offsets.AutoCommit.Interval = config.AutoCommit.Interval + var err error + if saramaConfig.Consumer.Offsets.Initial, err = toSaramaInitialOffset(config.InitialOffset); err != nil { + return nil, err + } + if config.ResolveCanonicalBootstrapServersOnly { + saramaConfig.Net.ResolveCanonicalBootstrapServers = true + } + if config.ProtocolVersion != "" { + if saramaConfig.Version, err = sarama.ParseKafkaVersion(config.ProtocolVersion); err != nil { + return nil, err + } + } + if err := kafka.ConfigureAuthentication(config.Authentication, saramaConfig); err != nil { + return nil, err + } + return sarama.NewConsumerGroup(config.Brokers, config.GroupID, saramaConfig) +} + func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) error { ctx, cancel := context.WithCancel(context.Background()) c.cancelConsumeLoop = cancel @@ -136,6 +136,12 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro if err != nil { return err } + // consumerGroup may be set in tests to inject fake implementation. + if c.consumerGroup == nil { + if c.consumerGroup, err = createKafkaClient(c.config); err != nil { + return err + } + } consumerGroup := &tracesConsumerGroupHandler{ logger: c.settings.Logger, unmarshaler: c.unmarshaler, @@ -178,6 +184,9 @@ func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.Co } func (c *kafkaTracesConsumer) Shutdown(context.Context) error { + if c.cancelConsumeLoop == nil { + return nil + } c.cancelConsumeLoop() return c.consumerGroup.Close() } @@ -187,34 +196,8 @@ func newMetricsReceiver(config Config, set receiver.CreateSettings, unmarshaler return nil, errUnrecognizedEncoding } - c := sarama.NewConfig() - c.ClientID = config.ClientID - c.Metadata.Full = config.Metadata.Full - c.Metadata.Retry.Max = config.Metadata.Retry.Max - c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff - c.Consumer.Offsets.AutoCommit.Enable = config.AutoCommit.Enable - c.Consumer.Offsets.AutoCommit.Interval = config.AutoCommit.Interval - if initialOffset, err := toSaramaInitialOffset(config.InitialOffset); err == nil { - c.Consumer.Offsets.Initial = initialOffset - } else { - return nil, err - } - if config.ProtocolVersion != "" { - version, err := sarama.ParseKafkaVersion(config.ProtocolVersion) - if err != nil { - return nil, err - } - c.Version = version - } - if err := kafka.ConfigureAuthentication(config.Authentication, c); err != nil { - return nil, err - } - client, err := sarama.NewConsumerGroup(config.Brokers, config.GroupID, c) - if err != nil { - return nil, err - } return &kafkaMetricsConsumer{ - consumerGroup: client, + config: config, topics: []string{config.Topic}, nextConsumer: nextConsumer, unmarshaler: unmarshaler, @@ -237,6 +220,12 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err if err != nil { return err } + // consumerGroup may be set in tests to inject fake implementation. + if c.consumerGroup == nil { + if c.consumerGroup, err = createKafkaClient(c.config); err != nil { + return err + } + } metricsConsumerGroup := &metricsConsumerGroupHandler{ logger: c.settings.Logger, unmarshaler: c.unmarshaler, @@ -279,6 +268,9 @@ func (c *kafkaMetricsConsumer) consumeLoop(ctx context.Context, handler sarama.C } func (c *kafkaMetricsConsumer) Shutdown(context.Context) error { + if c.cancelConsumeLoop == nil { + return nil + } c.cancelConsumeLoop() return c.consumerGroup.Close() } @@ -287,35 +279,9 @@ func newLogsReceiver(config Config, set receiver.CreateSettings, unmarshaler Log if unmarshaler == nil { return nil, errUnrecognizedEncoding } - c := sarama.NewConfig() - c.ClientID = config.ClientID - c.Metadata.Full = config.Metadata.Full - c.Metadata.Retry.Max = config.Metadata.Retry.Max - c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff - c.Consumer.Offsets.AutoCommit.Enable = config.AutoCommit.Enable - c.Consumer.Offsets.AutoCommit.Interval = config.AutoCommit.Interval - if initialOffset, err := toSaramaInitialOffset(config.InitialOffset); err == nil { - c.Consumer.Offsets.Initial = initialOffset - } else { - return nil, err - } - if config.ProtocolVersion != "" { - var version sarama.KafkaVersion - version, err := sarama.ParseKafkaVersion(config.ProtocolVersion) - if err != nil { - return nil, err - } - c.Version = version - } - if err := kafka.ConfigureAuthentication(config.Authentication, c); err != nil { - return nil, err - } - client, err := sarama.NewConsumerGroup(config.Brokers, config.GroupID, c) - if err != nil { - return nil, err - } + return &kafkaLogsConsumer{ - consumerGroup: client, + config: config, topics: []string{config.Topic}, nextConsumer: nextConsumer, unmarshaler: unmarshaler, @@ -338,7 +304,12 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error if err != nil { return err } - + // consumerGroup may be set in tests to inject fake implementation. + if c.consumerGroup == nil { + if c.consumerGroup, err = createKafkaClient(c.config); err != nil { + return err + } + } logsConsumerGroup := &logsConsumerGroupHandler{ logger: c.settings.Logger, unmarshaler: c.unmarshaler, @@ -381,6 +352,9 @@ func (c *kafkaLogsConsumer) consumeLoop(ctx context.Context, handler sarama.Cons } func (c *kafkaLogsConsumer) Shutdown(context.Context) error { + if c.cancelConsumeLoop == nil { + return nil + } c.cancelConsumeLoop() return c.consumerGroup.Close() } diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go index be46036d4ff7..ca4b5f544dc1 100644 --- a/receiver/kafkareceiver/kafka_receiver_test.go +++ b/receiver/kafkareceiver/kafka_receiver_test.go @@ -39,8 +39,9 @@ func TestNewTracesReceiver_version_err(t *testing.T) { } unmarshaler := defaultTracesUnmarshalers()[c.Encoding] r, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), unmarshaler, consumertest.NewNop()) + require.NoError(t, err) + err = r.Start(context.Background(), componenttest.NewNopHost()) assert.Error(t, err) - assert.Nil(t, r) } func TestNewTracesReceiver_encoding_err(t *testing.T) { @@ -71,9 +72,9 @@ func TestNewTracesReceiver_err_auth_type(t *testing.T) { } unmarshaler := defaultTracesUnmarshalers()[c.Encoding] r, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), unmarshaler, consumertest.NewNop()) - assert.Error(t, err) + require.NoError(t, err) + err = r.Start(context.Background(), componenttest.NewNopHost()) assert.Contains(t, err.Error(), "failed to load TLS config") - assert.Nil(t, r) } func TestNewTracesReceiver_initial_offset_err(t *testing.T) { @@ -83,8 +84,9 @@ func TestNewTracesReceiver_initial_offset_err(t *testing.T) { } unmarshaler := defaultTracesUnmarshalers()[c.Encoding] r, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), unmarshaler, consumertest.NewNop()) + require.NoError(t, err) + err = r.Start(context.Background(), componenttest.NewNopHost()) require.Error(t, err) - assert.Nil(t, r) assert.EqualError(t, err, errInvalidInitialOffset.Error()) } @@ -304,8 +306,9 @@ func TestNewMetricsReceiver_version_err(t *testing.T) { } unmarshaler := defaultMetricsUnmarshalers()[c.Encoding] r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), unmarshaler, consumertest.NewNop()) + require.NoError(t, err) + err = r.Start(context.Background(), componenttest.NewNopHost()) assert.Error(t, err) - assert.Nil(t, r) } func TestNewMetricsReceiver_encoding_err(t *testing.T) { @@ -313,9 +316,8 @@ func TestNewMetricsReceiver_encoding_err(t *testing.T) { Encoding: "foo", } unmarshaler := defaultMetricsUnmarshalers()[c.Encoding] - r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), unmarshaler, consumertest.NewNop()) + _, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), unmarshaler, consumertest.NewNop()) require.Error(t, err) - assert.Nil(t, r) assert.EqualError(t, err, errUnrecognizedEncoding.Error()) } @@ -336,9 +338,10 @@ func TestNewMetricsExporter_err_auth_type(t *testing.T) { } unmarshaler := defaultMetricsUnmarshalers()[c.Encoding] r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), unmarshaler, consumertest.NewNop()) + require.NoError(t, err) + err = r.Start(context.Background(), componenttest.NewNopHost()) assert.Error(t, err) assert.Contains(t, err.Error(), "failed to load TLS config") - assert.Nil(t, r) } func TestNewMetricsReceiver_initial_offset_err(t *testing.T) { @@ -348,22 +351,12 @@ func TestNewMetricsReceiver_initial_offset_err(t *testing.T) { } unmarshaler := defaultMetricsUnmarshalers()[c.Encoding] r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), unmarshaler, consumertest.NewNop()) + require.NoError(t, err) + err = r.Start(context.Background(), componenttest.NewNopHost()) require.Error(t, err) - assert.Nil(t, r) assert.EqualError(t, err, errInvalidInitialOffset.Error()) } -func TestMetricsReceiverStart(t *testing.T) { - c := kafkaMetricsConsumer{ - nextConsumer: consumertest.NewNop(), - settings: receivertest.NewNopCreateSettings(), - consumerGroup: &testConsumerGroup{}, - } - - require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost())) - require.NoError(t, c.Shutdown(context.Background())) -} - func TestMetricsReceiverStartConsume(t *testing.T) { c := kafkaMetricsConsumer{ nextConsumer: consumertest.NewNop(), @@ -567,8 +560,9 @@ func TestNewLogsReceiver_version_err(t *testing.T) { } unmarshaler := defaultLogsUnmarshalers("Test Version", zap.NewNop())[c.Encoding] r, err := newLogsReceiver(c, receivertest.NewNopCreateSettings(), unmarshaler, consumertest.NewNop()) + require.NoError(t, err) + err = r.Start(context.Background(), componenttest.NewNopHost()) assert.Error(t, err) - assert.Nil(t, r) } func TestNewLogsReceiver_encoding_err(t *testing.T) { @@ -599,9 +593,10 @@ func TestNewLogsExporter_err_auth_type(t *testing.T) { } unmarshaler := defaultLogsUnmarshalers("Test Version", zap.NewNop())[c.Encoding] r, err := newLogsReceiver(c, receivertest.NewNopCreateSettings(), unmarshaler, consumertest.NewNop()) + require.NoError(t, err) + err = r.Start(context.Background(), componenttest.NewNopHost()) assert.Error(t, err) assert.Contains(t, err.Error(), "failed to load TLS config") - assert.Nil(t, r) } func TestNewLogsReceiver_initial_offset_err(t *testing.T) { @@ -611,8 +606,9 @@ func TestNewLogsReceiver_initial_offset_err(t *testing.T) { } unmarshaler := defaultLogsUnmarshalers("Test Version", zap.NewNop())[c.Encoding] r, err := newLogsReceiver(c, receivertest.NewNopCreateSettings(), unmarshaler, consumertest.NewNop()) + require.NoError(t, err) + err = r.Start(context.Background(), componenttest.NewNopHost()) require.Error(t, err) - assert.Nil(t, r) assert.EqualError(t, err, errInvalidInitialOffset.Error()) } @@ -653,6 +649,7 @@ func TestLogsReceiver_error(t *testing.T) { nextConsumer: consumertest.NewNop(), settings: settings, consumerGroup: &testConsumerGroup{err: expectedErr}, + config: *createDefaultConfig().(*Config), } require.NoError(t, c.Start(context.Background(), componenttest.NewNopHost())) diff --git a/receiver/kafkareceiver/metadata.yaml b/receiver/kafkareceiver/metadata.yaml index 58c399f2171a..f8886ef0b32f 100644 --- a/receiver/kafkareceiver/metadata.yaml +++ b/receiver/kafkareceiver/metadata.yaml @@ -13,3 +13,7 @@ status: - sumo codeowners: active: [pavolloffay, MovieStoreGuy] + +tests: + config: + skip_lifecycle: true