Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move sarama initialization to scraper start method #3104

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions receiver/kafkametricsreceiver/broker_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/Shopify/sarama"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/simple"
"go.opentelemetry.io/collector/receiver/scraperhelper"
Expand All @@ -29,15 +30,25 @@ import (
)

type brokerScraper struct {
client sarama.Client
logger *zap.Logger
config Config
client sarama.Client
logger *zap.Logger
config Config
saramaConfig *sarama.Config
}

func (s *brokerScraper) Name() string {
return brokersScraperName
}

func (s *brokerScraper) start(context.Context, component.Host) error {
client, err := newSaramaClient(s.config.Brokers, s.saramaConfig)
if err != nil {
return fmt.Errorf("failed to create client while starting brokers scraper:%w", err)
dshomoye marked this conversation as resolved.
Show resolved Hide resolved
}
s.client = client
return nil
}

func (s *brokerScraper) shutdown(context.Context) error {
if !s.client.Closed() {
return s.client.Close()
Expand All @@ -59,19 +70,16 @@ func (s *brokerScraper) scrape(context.Context) (pdata.ResourceMetricsSlice, err
}

func createBrokerScraper(_ context.Context, config Config, saramaConfig *sarama.Config, logger *zap.Logger) (scraperhelper.ResourceMetricsScraper, error) {
client, err := newSaramaClient(config.Brokers, saramaConfig)
if err != nil {
return nil, fmt.Errorf("failed to create sarama client: %w", err)
}
s := brokerScraper{
client: client,
logger: logger,
config: config,
logger: logger,
config: config,
saramaConfig: saramaConfig,
}
ms := scraperhelper.NewResourceMetricsScraper(
s.Name(),
s.scrape,
scraperhelper.WithShutdown(s.shutdown),
scraperhelper.WithStart(s.start),
)
return ms, nil
}
16 changes: 14 additions & 2 deletions receiver/kafkametricsreceiver/broker_scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,26 @@ func TestBrokerScraper_createBrokerScraper(t *testing.T) {
assert.NotNil(t, ms)
}

func TestBrokerScraper_createBrokerScraper_handles_client_error(t *testing.T) {
func TestBrokerScraperStart(t *testing.T) {
newSaramaClient = mockNewSaramaClient
sc := sarama.NewConfig()
ms, err := createBrokerScraper(context.Background(), Config{}, sc, zap.NewNop())
assert.NotNil(t, ms)
assert.Nil(t, err)
err = ms.Start(context.Background(), nil)
assert.Nil(t, err)
}

func TestBrokerScraper_startBrokerScraper_handles_client_error(t *testing.T) {
newSaramaClient = func(addrs []string, conf *sarama.Config) (sarama.Client, error) {
return nil, fmt.Errorf("new client failed")
}
sc := sarama.NewConfig()
ms, err := createBrokerScraper(context.Background(), Config{}, sc, zap.NewNop())
assert.NotNil(t, ms)
assert.Nil(t, err)
err = ms.Start(context.Background(), nil)
assert.NotNil(t, err)
assert.Nil(t, ms)
}

func TestBrokerScraper_scrape(t *testing.T) {
Expand Down
31 changes: 21 additions & 10 deletions receiver/kafkametricsreceiver/consumer_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/Shopify/sarama"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/simple"
"go.opentelemetry.io/collector/receiver/scrapererror"
Expand All @@ -36,13 +37,31 @@ type consumerScraper struct {
groupFilter *regexp.Regexp
topicFilter *regexp.Regexp
clusterAdmin sarama.ClusterAdmin
saramaConfig *sarama.Config
config Config
}

func (s *consumerScraper) Name() string {
return consumersScraperName
}

func (s *consumerScraper) start(context.Context, component.Host) error {
client, err := newSaramaClient(s.config.Brokers, s.saramaConfig)
if err != nil {
return fmt.Errorf("failed to create client while starting consumer scraper: %w", err)
}
clusterAdmin, err := newClusterAdmin(s.config.Brokers, s.saramaConfig)
if err != nil {
if client != nil {
_ = client.Close()
}
return fmt.Errorf("failed to create cluster admin while starting consumer scraper: %w", err)
dshomoye marked this conversation as resolved.
Show resolved Hide resolved
}
s.client = client
s.clusterAdmin = clusterAdmin
return nil
}

func (s *consumerScraper) shutdown(_ context.Context) error {
if !s.client.Closed() {
return s.client.Close()
Expand Down Expand Up @@ -163,25 +182,17 @@ func createConsumerScraper(_ context.Context, config Config, saramaConfig *saram
if err != nil {
return nil, fmt.Errorf("failed to compile topic filter: %w", err)
}
client, err := newSaramaClient(config.Brokers, saramaConfig)
if err != nil {
return nil, fmt.Errorf("failed to create sarama client: %w", err)
}
clusterAdmin, err := newClusterAdmin(config.Brokers, saramaConfig)
if err != nil {
return nil, fmt.Errorf("failed to create sarama cluster admin: %w", err)
}
s := consumerScraper{
client: client,
logger: logger,
groupFilter: groupFilter,
topicFilter: topicFilter,
clusterAdmin: clusterAdmin,
config: config,
saramaConfig: saramaConfig,
}
return scraperhelper.NewResourceMetricsScraper(
s.Name(),
s.scrape,
scraperhelper.WithShutdown(s.shutdown),
scraperhelper.WithStart(s.start),
), nil
}
30 changes: 25 additions & 5 deletions receiver/kafkametricsreceiver/consumer_scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,45 @@ func TestConsumerScraper_createConsumerScraper(t *testing.T) {
assert.NotNil(t, ms)
}

func TestConsumerScraper_createScraper_handles_client_error(t *testing.T) {
func TestConsumerScraper_startScraper_handles_client_error(t *testing.T) {
newSaramaClient = func(addrs []string, conf *sarama.Config) (sarama.Client, error) {
return nil, fmt.Errorf("new client failed")
}
sc := sarama.NewConfig()
ms, err := createConsumerScraper(context.Background(), Config{}, sc, zap.NewNop())
assert.NotNil(t, ms)
assert.Nil(t, err)
err = ms.Start(context.Background(), nil)
assert.NotNil(t, err)
assert.Nil(t, ms)
}

func TestConsumerScraper_createScraper_handles_clusterAdmin_error(t *testing.T) {
newSaramaClient = mockNewSaramaClient
func TestConsumerScraper_startScraper_handles_clusterAdmin_error(t *testing.T) {
newSaramaClient = func(addrs []string, conf *sarama.Config) (sarama.Client, error) {
client := newMockClient()
client.Mock.
On("Close").Return(nil)
return client, nil
}
newClusterAdmin = func(addrs []string, conf *sarama.Config) (sarama.ClusterAdmin, error) {
return nil, fmt.Errorf("new cluster admin failed")
}
sc := sarama.NewConfig()
ms, err := createConsumerScraper(context.Background(), Config{}, sc, zap.NewNop())
assert.Nil(t, err)
assert.NotNil(t, ms)
err = ms.Start(context.Background(), nil)
assert.NotNil(t, err)
assert.Nil(t, ms)
}

func TestConsumerScraperStart(t *testing.T) {
newSaramaClient = mockNewSaramaClient
newClusterAdmin = mockNewClusterAdmin
sc := sarama.NewConfig()
ms, err := createConsumerScraper(context.Background(), Config{}, sc, zap.NewNop())
assert.Nil(t, err)
assert.NotNil(t, ms)
err = ms.Start(context.Background(), nil)
assert.Nil(t, err)
}

func TestConsumerScraper_createScraper_handles_invalid_topic_match(t *testing.T) {
Expand Down
18 changes: 7 additions & 11 deletions receiver/kafkametricsreceiver/kafkametrics_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func TestIntegrationSingleNode(t *testing.T) {
f := NewFactory()
cfg := f.CreateDefaultConfig().(*Config)
cfg.Scrapers = []string{
"consumers",
"brokers",
"consumers",
"topics",
}
cfg.Brokers = []string{kafkaAddress}
Expand All @@ -67,16 +67,12 @@ func TestIntegrationSingleNode(t *testing.T) {

var receiver component.MetricsReceiver
var err error
t.Logf("waiting to connect to kafka...")
require.Eventuallyf(t,
func() bool {
receiver, err = f.CreateMetricsReceiver(context.Background(), params, cfg, consumer)
return err == nil
}, 30*time.Second, 5*time.Second,
fmt.Sprintf("failed to create metrics receiver. %v", err),
)
t.Logf("connected to kafka")
require.NoError(t, receiver.Start(context.Background(), &testHost{t: t}))
receiver, err = f.CreateMetricsReceiver(context.Background(), params, cfg, consumer)
require.NoError(t, err, "failed to create receiver")
require.Eventuallyf(t, func() bool {
err = receiver.Start(context.Background(), &testHost{t: t})
return err == nil
}, 30*time.Second, 5*time.Second, fmt.Sprintf("failed to start metrics receiver. %v", err))
t.Logf("waiting for metrics...")
require.Eventuallyf(t,
func() bool {
Expand Down
4 changes: 3 additions & 1 deletion receiver/kafkametricsreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ const (
consumersScraperName = "consumers"
)

type createKafkaScraper = func(context.Context, Config, *sarama.Config, *zap.Logger) (scraperhelper.ResourceMetricsScraper, error)
dshomoye marked this conversation as resolved.
Show resolved Hide resolved

var (
allScrapers = map[string]func(context.Context, Config, *sarama.Config, *zap.Logger) (scraperhelper.ResourceMetricsScraper, error){
allScrapers = map[string]createKafkaScraper{
brokersScraperName: createBrokerScraper,
topicsScraperName: createTopicsScraper,
consumersScraperName: createConsumerScraper,
Expand Down
16 changes: 11 additions & 5 deletions receiver/kafkametricsreceiver/topic_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/Shopify/sarama"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/simple"
"go.opentelemetry.io/collector/receiver/scrapererror"
Expand All @@ -42,6 +43,15 @@ func (s *topicScraper) Name() string {
return topicsScraperName
}

func (s *topicScraper) start(context.Context, component.Host) error {
client, err := newSaramaClient(s.config.Brokers, s.saramaConfig)
if err != nil {
return fmt.Errorf("failed to create client while starting topics scraper:%w", err)
dshomoye marked this conversation as resolved.
Show resolved Hide resolved
}
s.client = client
return nil
}

func (s *topicScraper) shutdown(context.Context) error {
if !s.client.Closed() {
return s.client.Close()
Expand Down Expand Up @@ -118,12 +128,7 @@ func createTopicsScraper(_ context.Context, config Config, saramaConfig *sarama.
if err != nil {
return nil, fmt.Errorf("failed to compile topic filter: %w", err)
}
client, err := newSaramaClient(config.Brokers, saramaConfig)
if err != nil {
return nil, fmt.Errorf("failed to create sarama client: %w", err)
}
s := topicScraper{
client: client,
logger: logger,
topicFilter: topicFilter,
saramaConfig: saramaConfig,
Expand All @@ -133,5 +138,6 @@ func createTopicsScraper(_ context.Context, config Config, saramaConfig *sarama.
s.Name(),
s.scrape,
scraperhelper.WithShutdown(s.shutdown),
scraperhelper.WithStart(s.start),
), nil
}
16 changes: 14 additions & 2 deletions receiver/kafkametricsreceiver/topic_scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,26 @@ func TestTopicScraper_createsScraper(t *testing.T) {
assert.NotNil(t, ms)
}

func TestTopicScraper_createScraperHandlesError(t *testing.T) {
func TestTopicScraper_startScraperHandlesError(t *testing.T) {
newSaramaClient = func(addrs []string, conf *sarama.Config) (sarama.Client, error) {
return nil, fmt.Errorf("no scraper here")
}
sc := sarama.NewConfig()
ms, err := createTopicsScraper(context.Background(), Config{}, sc, zap.NewNop())
assert.NotNil(t, ms)
assert.Nil(t, err)
err = ms.Start(context.Background(), nil)
assert.NotNil(t, err)
assert.Nil(t, ms)
}

func TestTopicScraper_startScraperCreatesClient(t *testing.T) {
newSaramaClient = mockNewSaramaClient
sc := sarama.NewConfig()
ms, err := createTopicsScraper(context.Background(), Config{}, sc, zap.NewNop())
assert.NotNil(t, ms)
assert.Nil(t, err)
dshomoye marked this conversation as resolved.
Show resolved Hide resolved
err = ms.Start(context.Background(), nil)
assert.Nil(t, err)
}

func TestTopicScraper_createScraperHandles_invalid_topicMatch(t *testing.T) {
Expand Down