diff --git a/CHANGELOG.md b/CHANGELOG.md
index 18e590b5c37a..0ac2f44ec227 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,11 @@
 
 ## Unreleased
 
+## 🚀 New components 🚀
+
+- `kafkametricsreceiver`: new receiver component for collecting metrics about a Kafka cluster, primarily consumer lag and offset. See the [configuration instructions](receiver/kafkametricsreceiver/README.md).
+
+
 ## v0.24.0
 
 # 🎉 OpenTelemetry Collector Contrib v0.24.0 (Beta) 🎉
diff --git a/receiver/kafkametricsreceiver/broker_scraper.go b/receiver/kafkametricsreceiver/broker_scraper.go
index fcbbc0e47eef..4a7cfd6b1f6a 100644
--- a/receiver/kafkametricsreceiver/broker_scraper.go
+++ b/receiver/kafkametricsreceiver/broker_scraper.go
@@ -20,6 +20,7 @@ import (
     "time"
 
     "github.com/Shopify/sarama"
+    "go.opentelemetry.io/collector/component"
     "go.opentelemetry.io/collector/consumer/pdata"
     "go.opentelemetry.io/collector/consumer/simple"
     "go.opentelemetry.io/collector/receiver/scraperhelper"
@@ -29,15 +30,25 @@ import (
 )
 
 type brokerScraper struct {
-    client sarama.Client
-    logger *zap.Logger
-    config Config
+    client       sarama.Client
+    logger       *zap.Logger
+    config       Config
+    saramaConfig *sarama.Config
 }
 
 func (s *brokerScraper) Name() string {
     return brokersScraperName
 }
 
+func (s *brokerScraper) start(context.Context, component.Host) error {
+    client, err := newSaramaClient(s.config.Brokers, s.saramaConfig)
+    if err != nil {
+        return fmt.Errorf("failed to create client while starting brokers scraper: %w", err)
+    }
+    s.client = client
+    return nil
+}
+
 func (s *brokerScraper) shutdown(context.Context) error {
     if !s.client.Closed() {
         return s.client.Close()
@@ -59,19 +70,16 @@
 }
 
 func createBrokerScraper(_ context.Context, config Config, saramaConfig *sarama.Config, logger *zap.Logger) (scraperhelper.ResourceMetricsScraper, error) {
-    client, err := newSaramaClient(config.Brokers, saramaConfig)
-    if err != nil {
-        return nil, fmt.Errorf("failed to create sarama client: %w", err)
-    }
     s := brokerScraper{
-        client: client,
-        logger: logger,
-        config: config,
+        logger:       logger,
+        config:       config,
+        saramaConfig: saramaConfig,
     }
     ms := scraperhelper.NewResourceMetricsScraper(
         s.Name(),
         s.scrape,
         scraperhelper.WithShutdown(s.shutdown),
+        scraperhelper.WithStart(s.start),
     )
     return ms, nil
 }
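The change above is the heart of this PR: the broker scraper no longer dials Kafka when the scraper is created, only when the component is started. A minimal, self-contained sketch of that lifecycle pattern follows; every name in it (`newClient`, `scraper`, `stubClient`) is illustrative, not the receiver's actual API:

```go
// Sketch of the lifecycle pattern applied in this PR: construct cheap state in
// the factory, connect in start, clean up in shutdown. Names are illustrative.
package main

import (
	"context"
	"errors"
	"fmt"
)

type client interface{ Close() error }

type stubClient struct{}

func (stubClient) Close() error { return nil }

// Package-level constructor variable, mirroring newSaramaClient, so tests can
// swap in a mock without a real broker.
var newClient = func(brokers []string) (client, error) {
	if len(brokers) == 0 {
		return nil, errors.New("no brokers configured")
	}
	return stubClient{}, nil
}

type scraper struct {
	brokers []string
	client  client
}

// start performs the expensive connection; a bad broker list now fails the
// component's Start call instead of failing receiver creation.
func (s *scraper) start(context.Context) error {
	c, err := newClient(s.brokers)
	if err != nil {
		return fmt.Errorf("failed to create client on start: %w", err)
	}
	s.client = c
	return nil
}

func (s *scraper) shutdown(context.Context) error {
	if s.client != nil {
		return s.client.Close()
	}
	return nil
}

func main() {
	s := &scraper{brokers: []string{"localhost:9092"}}
	if err := s.start(context.Background()); err != nil {
		fmt.Println("start failed:", err)
		return
	}
	defer s.shutdown(context.Background())
	fmt.Println("scraper started")
}
```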
diff --git a/receiver/kafkametricsreceiver/broker_scraper_test.go b/receiver/kafkametricsreceiver/broker_scraper_test.go
index 581b41ccb708..b8515f3c099f 100644
--- a/receiver/kafkametricsreceiver/broker_scraper_test.go
+++ b/receiver/kafkametricsreceiver/broker_scraper_test.go
@@ -63,18 +63,30 @@ func TestBrokerScraper_createBrokerScraper(t *testing.T) {
     sc := sarama.NewConfig()
     newSaramaClient = mockNewSaramaClient
     ms, err := createBrokerScraper(context.Background(), Config{}, sc, zap.NewNop())
-    assert.Nil(t, err)
+    assert.NoError(t, err)
     assert.NotNil(t, ms)
 }
 
-func TestBrokerScraper_createBrokerScraper_handles_client_error(t *testing.T) {
+func TestBrokerScraperStart(t *testing.T) {
+    newSaramaClient = mockNewSaramaClient
+    sc := sarama.NewConfig()
+    ms, err := createBrokerScraper(context.Background(), Config{}, sc, zap.NewNop())
+    assert.NotNil(t, ms)
+    assert.Nil(t, err)
+    err = ms.Start(context.Background(), nil)
+    assert.NoError(t, err)
+}
+
+func TestBrokerScraper_startBrokerScraper_handles_client_error(t *testing.T) {
     newSaramaClient = func(addrs []string, conf *sarama.Config) (sarama.Client, error) {
         return nil, fmt.Errorf("new client failed")
     }
     sc := sarama.NewConfig()
     ms, err := createBrokerScraper(context.Background(), Config{}, sc, zap.NewNop())
-    assert.NotNil(t, err)
-    assert.Nil(t, ms)
+    assert.NotNil(t, ms)
+    assert.Nil(t, err)
+    err = ms.Start(context.Background(), nil)
+    assert.Error(t, err)
 }
 
 func TestBrokerScraper_scrape(t *testing.T) {
@@ -98,6 +110,6 @@ func TestBrokersScraper_createBrokerScraper(t *testing.T) {
     sc := sarama.NewConfig()
     newSaramaClient = mockNewSaramaClient
     ms, err := createBrokerScraper(context.Background(), Config{}, sc, zap.NewNop())
-    assert.Nil(t, err)
+    assert.NoError(t, err)
     assert.NotNil(t, ms)
 }
diff --git a/receiver/kafkametricsreceiver/consumer_scraper.go b/receiver/kafkametricsreceiver/consumer_scraper.go
index e4657632976c..b03aa6798356 100644
--- a/receiver/kafkametricsreceiver/consumer_scraper.go
+++ b/receiver/kafkametricsreceiver/consumer_scraper.go
@@ -21,6 +21,7 @@ import (
     "time"
 
     "github.com/Shopify/sarama"
+    "go.opentelemetry.io/collector/component"
     "go.opentelemetry.io/collector/consumer/pdata"
     "go.opentelemetry.io/collector/consumer/simple"
     "go.opentelemetry.io/collector/receiver/scrapererror"
@@ -36,6 +37,7 @@ type consumerScraper struct {
     groupFilter  *regexp.Regexp
     topicFilter  *regexp.Regexp
     clusterAdmin sarama.ClusterAdmin
+    saramaConfig *sarama.Config
     config       Config
 }
 
@@ -43,6 +45,23 @@ func (s *consumerScraper) Name() string {
     return consumersScraperName
 }
 
+func (s *consumerScraper) start(context.Context, component.Host) error {
+    client, err := newSaramaClient(s.config.Brokers, s.saramaConfig)
+    if err != nil {
+        return fmt.Errorf("failed to create client while starting consumer scraper: %w", err)
+    }
+    clusterAdmin, err := newClusterAdmin(s.config.Brokers, s.saramaConfig)
+    if err != nil {
+        if client != nil {
+            _ = client.Close()
+        }
+        return fmt.Errorf("failed to create cluster admin while starting consumer scraper: %w", err)
+    }
+    s.client = client
+    s.clusterAdmin = clusterAdmin
+    return nil
+}
+
 func (s *consumerScraper) shutdown(_ context.Context) error {
     if !s.client.Closed() {
         return s.client.Close()
@@ -163,25 +182,17 @@
     if err != nil {
         return nil, fmt.Errorf("failed to compile topic filter: %w", err)
     }
-    client, err := newSaramaClient(config.Brokers, saramaConfig)
-    if err != nil {
-        return nil, fmt.Errorf("failed to create sarama client: %w", err)
-    }
-    clusterAdmin, err := newClusterAdmin(config.Brokers, saramaConfig)
-    if err != nil {
-        return nil, fmt.Errorf("failed to create sarama cluster admin: %w", err)
-    }
     s := consumerScraper{
-        client:       client,
         logger:       logger,
         groupFilter:  groupFilter,
         topicFilter:  topicFilter,
-        clusterAdmin: clusterAdmin,
         config:       config,
+        saramaConfig: saramaConfig,
     }
     return scraperhelper.NewResourceMetricsScraper(
         s.Name(),
         s.scrape,
         scraperhelper.WithShutdown(s.shutdown),
+        scraperhelper.WithStart(s.start),
     ), nil
 }
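The tests below lean on the package-level constructor variables (`newSaramaClient`, `newClusterAdmin`) as a seam for injecting failures into `Start`. A hedged sketch of that testing pattern, using a hypothetical `newClient` variable and stub types in place of the real sarama constructors:

```go
// Sketch of the test seam used throughout this PR's tests: a package-level
// constructor variable is overridden with a failing stub, so Start can be
// exercised without a live Kafka broker. All names are illustrative.
package scraper

import (
	"errors"
	"testing"
)

type client interface{ Close() error }

type stubClient struct{}

func (stubClient) Close() error { return nil }

// newClient is a variable, not a function declaration, precisely so tests can
// reassign it.
var newClient = func(brokers []string) (client, error) { return stubClient{}, nil }

func start(brokers []string) (client, error) { return newClient(brokers) }

func TestStartHandlesClientError(t *testing.T) {
	orig := newClient
	defer func() { newClient = orig }() // restore the real constructor

	newClient = func([]string) (client, error) {
		return nil, errors.New("new client failed")
	}
	if _, err := start([]string{"localhost:9092"}); err == nil {
		t.Fatal("expected start to fail when the client cannot be created")
	}
}
```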
diff --git a/receiver/kafkametricsreceiver/consumer_scraper_test.go b/receiver/kafkametricsreceiver/consumer_scraper_test.go
index 875e99551ebf..1c0d7be25ca0 100644
--- a/receiver/kafkametricsreceiver/consumer_scraper_test.go
+++ b/receiver/kafkametricsreceiver/consumer_scraper_test.go
@@ -61,29 +61,49 @@ func TestConsumerScraper_createConsumerScraper(t *testing.T) {
     newSaramaClient = mockNewSaramaClient
     newClusterAdmin = mockNewClusterAdmin
     ms, err := createConsumerScraper(context.Background(), Config{}, sc, zap.NewNop())
-    assert.Nil(t, err)
+    assert.NoError(t, err)
     assert.NotNil(t, ms)
 }
 
-func TestConsumerScraper_createScraper_handles_client_error(t *testing.T) {
+func TestConsumerScraper_startScraper_handles_client_error(t *testing.T) {
     newSaramaClient = func(addrs []string, conf *sarama.Config) (sarama.Client, error) {
         return nil, fmt.Errorf("new client failed")
     }
     sc := sarama.NewConfig()
     ms, err := createConsumerScraper(context.Background(), Config{}, sc, zap.NewNop())
-    assert.NotNil(t, err)
-    assert.Nil(t, ms)
+    assert.NotNil(t, ms)
+    assert.Nil(t, err)
+    err = ms.Start(context.Background(), nil)
+    assert.Error(t, err)
 }
 
-func TestConsumerScraper_createScraper_handles_clusterAdmin_error(t *testing.T) {
-    newSaramaClient = mockNewSaramaClient
+func TestConsumerScraper_startScraper_handles_clusterAdmin_error(t *testing.T) {
+    newSaramaClient = func(addrs []string, conf *sarama.Config) (sarama.Client, error) {
+        client := newMockClient()
+        client.Mock.
+            On("Close").Return(nil)
+        return client, nil
+    }
     newClusterAdmin = func(addrs []string, conf *sarama.Config) (sarama.ClusterAdmin, error) {
         return nil, fmt.Errorf("new cluster admin failed")
     }
     sc := sarama.NewConfig()
     ms, err := createConsumerScraper(context.Background(), Config{}, sc, zap.NewNop())
-    assert.NotNil(t, err)
-    assert.Nil(t, ms)
+    assert.Nil(t, err)
+    assert.NotNil(t, ms)
+    err = ms.Start(context.Background(), nil)
+    assert.Error(t, err)
+}
+
+func TestConsumerScraperStart(t *testing.T) {
+    newSaramaClient = mockNewSaramaClient
+    newClusterAdmin = mockNewClusterAdmin
+    sc := sarama.NewConfig()
+    ms, err := createConsumerScraper(context.Background(), Config{}, sc, zap.NewNop())
+    assert.Nil(t, err)
+    assert.NotNil(t, ms)
+    err = ms.Start(context.Background(), nil)
+    assert.NoError(t, err)
 }
 
 func TestConsumerScraper_createScraper_handles_invalid_topic_match(t *testing.T) {
@@ -93,7 +113,7 @@
     ms, err := createConsumerScraper(context.Background(), Config{
         TopicMatch: "[",
     }, sc, zap.NewNop())
-    assert.NotNil(t, err)
+    assert.Error(t, err)
     assert.Nil(t, ms)
 }
 
@@ -104,7 +124,7 @@
     ms, err := createConsumerScraper(context.Background(), Config{
         GroupMatch: "[",
     }, sc, zap.NewNop())
-    assert.NotNil(t, err)
+    assert.Error(t, err)
     assert.Nil(t, ms)
 }
 
@@ -118,7 +138,7 @@
         groupFilter: filter,
     }
     ms, err := cs.scrape(context.Background())
-    assert.Nil(t, err)
+    assert.NoError(t, err)
     assert.NotNil(t, ms)
 }
 
@@ -135,7 +155,7 @@
         groupFilter: filter,
     }
     _, err := cs.scrape(context.Background())
-    assert.NotNil(t, err)
+    assert.Error(t, err)
 }
 
 func TestConsumerScraper_scrape_handlesListConsumerGroupError(t *testing.T) {
@@ -150,7 +170,7 @@
         groupFilter: filter,
     }
     _, err := cs.scrape(context.Background())
-    assert.NotNil(t, err)
+    assert.Error(t, err)
 }
 
 func TestConsumerScraper_scrape_handlesDescribeConsumerError(t *testing.T) {
@@ -165,7 +185,7 @@
         groupFilter: filter,
     }
     _, err := cs.scrape(context.Background())
-    assert.NotNil(t, err)
+    assert.Error(t, err)
 }
 
 func TestConsumerScraper_scrape_handlesOffsetPartialError(t *testing.T) {
@@ -183,7 +203,7 @@
     }
     s, err := cs.scrape(context.Background())
     assert.NotNil(t, s)
-    assert.NotNil(t, err)
+    assert.Error(t, err)
 }
 
 func TestConsumerScraper_scrape_handlesPartitionPartialError(t *testing.T) {
@@ -201,5 +221,5 @@
     }
     s, err := cs.scrape(context.Background())
     assert.NotNil(t, s)
-    assert.NotNil(t, err)
+    assert.Error(t, err)
 }
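One detail the consumer-scraper tests above pin down: when the cluster admin fails to construct during start, the already-created client must be closed, which is why the mock expects a `Close` call. An illustrative sketch of that cleanup-on-partial-failure pattern (all names here are hypothetical):

```go
// Sketch of start-time cleanup when constructing two dependencies: if the
// second constructor fails, the already-created first dependency is closed
// so nothing leaks. Names are illustrative, not the receiver's API.
package main

import (
	"errors"
	"fmt"
)

type closer interface{ Close() error }

type conn struct{ name string }

func (c *conn) Close() error {
	fmt.Println("closed", c.name)
	return nil
}

var (
	newClient = func() (closer, error) { return &conn{name: "client"}, nil }
	newAdmin  = func() (closer, error) { return nil, errors.New("new cluster admin failed") }
)

type deps struct{ client, admin closer }

func start() (*deps, error) {
	client, err := newClient()
	if err != nil {
		return nil, fmt.Errorf("failed to create client: %w", err)
	}
	admin, err := newAdmin()
	if err != nil {
		// Close the client we already opened before reporting the error.
		if client != nil {
			_ = client.Close()
		}
		return nil, fmt.Errorf("failed to create cluster admin: %w", err)
	}
	return &deps{client: client, admin: admin}, nil
}

func main() {
	if _, err := start(); err != nil {
		fmt.Println("start failed:", err) // printed after "closed client"
	}
}
```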
diff --git a/receiver/kafkametricsreceiver/kafkametrics_e2e_test.go b/receiver/kafkametricsreceiver/kafkametrics_e2e_test.go
index d69b7d9ff954..6d16464a0fc6 100644
--- a/receiver/kafkametricsreceiver/kafkametrics_e2e_test.go
+++ b/receiver/kafkametricsreceiver/kafkametrics_e2e_test.go
@@ -56,8 +56,8 @@
     f := NewFactory()
     cfg := f.CreateDefaultConfig().(*Config)
     cfg.Scrapers = []string{
-        "consumers",
         "brokers",
+        "consumers",
         "topics",
     }
     cfg.Brokers = []string{kafkaAddress}
@@ -67,16 +67,12 @@
     var receiver component.MetricsReceiver
     var err error
 
-    t.Logf("waiting to connect to kafka...")
-    require.Eventuallyf(t,
-        func() bool {
-            receiver, err = f.CreateMetricsReceiver(context.Background(), params, cfg, consumer)
-            return err == nil
-        }, 30*time.Second, 5*time.Second,
-        fmt.Sprintf("failed to create metrics receiver. %v", err),
-    )
-    t.Logf("connected to kafka")
-    require.NoError(t, receiver.Start(context.Background(), &testHost{t: t}))
+    receiver, err = f.CreateMetricsReceiver(context.Background(), params, cfg, consumer)
+    require.NoError(t, err, "failed to create receiver")
+    require.Eventuallyf(t, func() bool {
+        err = receiver.Start(context.Background(), &testHost{t: t})
+        return err == nil
+    }, 30*time.Second, 5*time.Second, fmt.Sprintf("failed to start metrics receiver. %v", err))
 
     t.Logf("waiting for metrics...")
     require.Eventuallyf(t, func() bool {
diff --git a/receiver/kafkametricsreceiver/receiver.go b/receiver/kafkametricsreceiver/receiver.go
index 44ba3cbadb87..182afee3e92e 100644
--- a/receiver/kafkametricsreceiver/receiver.go
+++ b/receiver/kafkametricsreceiver/receiver.go
@@ -33,8 +33,10 @@
     consumersScraperName = "consumers"
 )
 
+type createKafkaScraper func(context.Context, Config, *sarama.Config, *zap.Logger) (scraperhelper.ResourceMetricsScraper, error)
+
 var (
-    allScrapers = map[string]func(context.Context, Config, *sarama.Config, *zap.Logger) (scraperhelper.ResourceMetricsScraper, error){
+    allScrapers = map[string]createKafkaScraper{
         brokersScraperName:   createBrokerScraper,
         topicsScraperName:    createTopicsScraper,
         consumersScraperName: createConsumerScraper,
diff --git a/receiver/kafkametricsreceiver/topic_scraper.go b/receiver/kafkametricsreceiver/topic_scraper.go
index eebd44d7bffa..f233efa710b6 100644
--- a/receiver/kafkametricsreceiver/topic_scraper.go
+++ b/receiver/kafkametricsreceiver/topic_scraper.go
@@ -21,6 +21,7 @@ import (
     "time"
 
     "github.com/Shopify/sarama"
+    "go.opentelemetry.io/collector/component"
     "go.opentelemetry.io/collector/consumer/pdata"
     "go.opentelemetry.io/collector/consumer/simple"
     "go.opentelemetry.io/collector/receiver/scrapererror"
@@ -42,6 +43,15 @@ func (s *topicScraper) Name() string {
     return topicsScraperName
 }
 
+func (s *topicScraper) start(context.Context, component.Host) error {
+    client, err := newSaramaClient(s.config.Brokers, s.saramaConfig)
+    if err != nil {
+        return fmt.Errorf("failed to create client while starting topics scraper: %w", err)
+    }
+    s.client = client
+    return nil
+}
+
 func (s *topicScraper) shutdown(context.Context) error {
     if !s.client.Closed() {
         return s.client.Close()
@@ -118,12 +128,7 @@ func createTopicsScraper(_ context.Context, config Config, saramaConfig *sarama.
     if err != nil {
         return nil, fmt.Errorf("failed to compile topic filter: %w", err)
     }
-    client, err := newSaramaClient(config.Brokers, saramaConfig)
-    if err != nil {
-        return nil, fmt.Errorf("failed to create sarama client: %w", err)
-    }
     s := topicScraper{
-        client:       client,
         logger:       logger,
         topicFilter:  topicFilter,
         saramaConfig: saramaConfig,
@@ -133,5 +138,6 @@ func createTopicsScraper(_ context.Context, config Config, saramaConfig *sarama.
         s.Name(),
         s.scrape,
         scraperhelper.WithShutdown(s.shutdown),
+        scraperhelper.WithStart(s.start),
     ), nil
 }
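The receiver.go hunk above introduces `createKafkaScraper` as a named type for the shared constructor signature, which keeps the `allScrapers` map declaration readable. A small self-contained sketch of that named-function-type registry pattern, with illustrative names standing in for the real constructors:

```go
// Sketch of the registry pattern from receiver.go: one named function type
// for every scraper constructor, keyed by scraper name. Names other than the
// pattern itself are illustrative.
package main

import "fmt"

type scraper struct{ name string }

// createScraper plays the role of createKafkaScraper: a single named type
// shared by every constructor in the registry.
type createScraper func(name string) (*scraper, error)

var allScrapers = map[string]createScraper{
	"brokers":   func(string) (*scraper, error) { return &scraper{name: "brokers"}, nil },
	"topics":    func(string) (*scraper, error) { return &scraper{name: "topics"}, nil },
	"consumers": func(string) (*scraper, error) { return &scraper{name: "consumers"}, nil },
}

func main() {
	for _, key := range []string{"brokers", "topics", "consumers"} {
		create := allScrapers[key]
		s, err := create(key)
		if err != nil {
			fmt.Println("failed:", err)
			continue
		}
		fmt.Println("created scraper:", s.name)
	}
}
```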
diff --git a/receiver/kafkametricsreceiver/topic_scraper_test.go b/receiver/kafkametricsreceiver/topic_scraper_test.go
index 485ad39930af..ab9a3b5d18df 100644
--- a/receiver/kafkametricsreceiver/topic_scraper_test.go
+++ b/receiver/kafkametricsreceiver/topic_scraper_test.go
@@ -66,18 +66,30 @@ func TestTopicScraper_createsScraper(t *testing.T) {
     sc := sarama.NewConfig()
     newSaramaClient = mockNewSaramaClient
     ms, err := createTopicsScraper(context.Background(), Config{}, sc, zap.NewNop())
-    assert.Nil(t, err)
+    assert.NoError(t, err)
     assert.NotNil(t, ms)
 }
 
-func TestTopicScraper_createScraperHandlesError(t *testing.T) {
+func TestTopicScraper_startScraperHandlesError(t *testing.T) {
     newSaramaClient = func(addrs []string, conf *sarama.Config) (sarama.Client, error) {
         return nil, fmt.Errorf("no scraper here")
     }
     sc := sarama.NewConfig()
     ms, err := createTopicsScraper(context.Background(), Config{}, sc, zap.NewNop())
-    assert.NotNil(t, err)
-    assert.Nil(t, ms)
+    assert.NotNil(t, ms)
+    assert.Nil(t, err)
+    err = ms.Start(context.Background(), nil)
+    assert.Error(t, err)
+}
+
+func TestTopicScraper_startScraperCreatesClient(t *testing.T) {
+    newSaramaClient = mockNewSaramaClient
+    sc := sarama.NewConfig()
+    ms, err := createTopicsScraper(context.Background(), Config{}, sc, zap.NewNop())
+    assert.NotNil(t, ms)
+    assert.NoError(t, err)
+    err = ms.Start(context.Background(), nil)
+    assert.NoError(t, err)
 }
 
 func TestTopicScraper_createScraperHandles_invalid_topicMatch(t *testing.T) {
@@ -86,7 +98,7 @@
     ms, err := createTopicsScraper(context.Background(), Config{
         TopicMatch: "[",
     }, sc, zap.NewNop())
-    assert.NotNil(t, err)
+    assert.Error(t, err)
     assert.Nil(t, ms)
 }
 
@@ -134,7 +146,7 @@
         topicFilter: match,
     }
     _, err := scraper.scrape(context.Background())
-    assert.NotNil(t, err)
+    assert.Error(t, err)
 }
 
 func TestTopicScraper_scrape_handlesPartitionError(t *testing.T) {
@@ -148,7 +160,7 @@
         topicFilter: match,
     }
     _, err := scraper.scrape(context.Background())
-    assert.NotNil(t, err)
+    assert.Error(t, err)
 }
 
 func TestTopicScraper_scrape_handlesPartialScrapeErrors(t *testing.T) {
@@ -165,5 +177,5 @@
         topicFilter: match,
     }
     _, err := scraper.scrape(context.Background())
-    assert.NotNil(t, err)
+    assert.Error(t, err)
 }
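With creation now cheap, the e2e test retries the component's `Start` instead of retrying receiver creation. A hedged sketch of that polling pattern using testify's `require.Eventuallyf`; the `startable` interface and `flaky` type are stand-ins, not the test's real types:

```go
// Sketch of the e2e pattern: creation is asserted once, while Start (which
// now performs the network connection) is polled until the broker comes up.
package kafka_test

import (
	"context"
	"errors"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

type startable interface {
	Start(ctx context.Context) error
}

// waitForStart polls Start until it succeeds, mirroring the e2e change.
func waitForStart(t *testing.T, r startable, waitFor, tick time.Duration) {
	require.Eventuallyf(t, func() bool {
		return r.Start(context.Background()) == nil
	}, waitFor, tick, "receiver did not start within %v", waitFor)
}

// flaky fails its first two Start calls, like a broker that is still booting.
type flaky struct{ calls int }

func (f *flaky) Start(context.Context) error {
	f.calls++
	if f.calls < 3 {
		return errors.New("broker not reachable yet")
	}
	return nil
}

func TestWaitForStart(t *testing.T) {
	waitForStart(t, &flaky{}, time.Second, 10*time.Millisecond)
}
```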