Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kafkametricsreceiver initial structure #2550

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions receiver/kafkametricsreceiver/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
60 changes: 60 additions & 0 deletions receiver/kafkametricsreceiver/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Kafka Metrics Receiver

Kafka metrics receiver collects kafka metrics (brokers, topics, partitions, consumer groups) from kafka server,
converting into otlp.

## Getting Started

Required settings (no defaults):

- `protocol_version`: Kafka protocol version
- `scrapers`: any combination of the following scrapers can be enabled.
- `topics`
- `consumers`
- `brokers`

Optional Settings (with defaults):

- `brokers` (default = localhost:9092): the list of brokers to read from.
- `topic_match` (default = *): regex pattern of topics to filter for metrics collection.
- `group_match` (default = *): regex pattern of consumer groups to filter on for metrics.
- `client_id` (default = otel-metrics-receiver): consumer client id
- `collection_interval` (default = 1m): frequency of metric collection/scraping.
- `auth` (default none)
- `plain_text`
- `username`: The username to use.
- `password`: The password to use
- `tls`
- `ca_file`: path to the CA cert. For a client this verifies the server certificate. Should only be used
if `insecure` is set to true.
- `cert_file`: path to the TLS cert to use for TLS required connections. Should only be used if `insecure` is
set to true.
- `key_file`: path to the TLS key to use for TLS required connections. Should only be used if `insecure` is set
to true.
- `insecure` (default = false): Disable verifying the server's certificate chain and host
name (`InsecureSkipVerify` in the tls config)
- `server_name_override`: ServerName indicates the name of the server requested by the client in order to
support virtual hosting.
- `kerberos`
- `service_name`: Kerberos service name
- `realm`: Kerberos realm
- `use_keytab`: Use of keytab instead of password, if this is true, keytab file will be used instead of
password
- `username`: The Kerberos username used for authenticate with KDC
- `password`: The Kerberos password used for authenticate with KDC
- `config_file`: Path to Kerberos configuration. i.e /etc/krb5.conf
- `keytab_file`: Path to keytab file. i.e /etc/security/kafka.keytab

## Examples:

Basic configuration with all scrapers:

```yaml
receivers:
kafkametrics:
protocol_version: 2.0.0
scrapers:
- brokers
- topics
- consumers
```
46 changes: 46 additions & 0 deletions receiver/kafkametricsreceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafkametricsreceiver

import (
"go.opentelemetry.io/collector/exporter/kafkaexporter"
"go.opentelemetry.io/collector/receiver/scraperhelper"
)

// Config represents user settings for kafkametrics receiver
type Config struct {
scraperhelper.ScraperControllerSettings `mapstructure:",squash"`

// The list of kafka brokers (default localhost:9092)
Brokers []string `mapstructure:"brokers"`

// ProtocolVersion Kafka protocol version
ProtocolVersion string `mapstructure:"protocol_version"`

// TopicMatch topics to collect metrics on
TopicMatch string `mapstructure:"topic_match"`

// GroupMatch consumer groups to collect on
GroupMatch string `mapstructure:"group_match"`

// Authentication data
Authentication kafkaexporter.Authentication `mapstructure:"auth"`

// Scrapers defines which metric data points to be captured from kafka
Scrapers []string `mapstructure:"scrapers"`

// ClientID is the id associated with the consumer that reads from topics in kafka.
ClientID string `mapstructure:"client_id"`
}
59 changes: 59 additions & 0 deletions receiver/kafkametricsreceiver/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafkametricsreceiver

import (
"path"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtest"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/exporter/kafkaexporter"
"go.opentelemetry.io/collector/receiver/scraperhelper"
)

func TestLoadConfig(t *testing.T) {
factories, err := componenttest.ExampleComponents()
assert.NoError(t, err)

factory := NewFactory()
factories.Receivers[typeStr] = factory
cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.NoError(t, err)
require.Equal(t, 1, len(cfg.Receivers))

r := cfg.Receivers[typeStr].(*Config)
assert.Equal(t, &Config{
ScraperControllerSettings: scraperhelper.DefaultScraperControllerSettings(typeStr),
Brokers: []string{"10.10.10.10:9092"},
ProtocolVersion: "2.0.0",
TopicMatch: "test_*",
GroupMatch: "test_*",
Authentication: kafkaexporter.Authentication{
TLS: &configtls.TLSClientSetting{
TLSSetting: configtls.TLSSetting{
CAFile: "ca.pem",
CertFile: "cert.pem",
KeyFile: "key.pem",
},
},
},
ClientID: defaultClientID,
Scrapers: []string{"brokers", "topics", "consumers"},
}, r)
}
64 changes: 64 additions & 0 deletions receiver/kafkametricsreceiver/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafkametricsreceiver

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.opentelemetry.io/collector/receiver/scraperhelper"
)

const (
typeStr = "kafkametrics"
defaultBroker = "localhost:9092"
defaultGroupMatch = ".*"
defaultTopicMatch = ".*"
defaultClientID = "otel-metrics-receiver"
)

// NewFactory creates kafkametrics receiver factory.
func NewFactory() component.ReceiverFactory {
return receiverhelper.NewFactory(
typeStr,
createDefaultConfig,
receiverhelper.WithMetrics(createMetricsReceiver))
}

func createDefaultConfig() configmodels.Receiver {
return &Config{
ScraperControllerSettings: scraperhelper.DefaultScraperControllerSettings(typeStr),
Brokers: []string{defaultBroker},
GroupMatch: defaultGroupMatch,
TopicMatch: defaultTopicMatch,
ClientID: defaultClientID,
}
}

func createMetricsReceiver(
ctx context.Context,
params component.ReceiverCreateParams,
cfg configmodels.Receiver,
nextConsumer consumer.MetricsConsumer) (component.MetricsReceiver, error) {
c := cfg.(*Config)
r, err := newMetricsReceiver(ctx, *c, params, nextConsumer)
if err != nil {
return nil, err
}
return r, nil
}
58 changes: 58 additions & 0 deletions receiver/kafkametricsreceiver/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafkametricsreceiver

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/consumer"
)

func TestCreateDefaultConfig(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
assert.NotNil(t, cfg, "default config not created")
assert.NoError(t, configcheck.ValidateConfig(cfg))
}

func TestCreateMetricsReceiver_errors(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
cfg.Scrapers = []string{"topics"}
r, err := createMetricsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
assert.Error(t, err)
assert.Nil(t, r)
}

func TestCreateMetricsReceiver(t *testing.T) {
prev := newMetricsReceiver
newMetricsReceiver = func(ctx context.Context, config Config, params component.ReceiverCreateParams, consumer consumer.MetricsConsumer) (component.MetricsReceiver, error) {
return nil, nil
}
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
cfg.Scrapers = []string{"topics"}
_, err := createMetricsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
newMetricsReceiver = prev
assert.Nil(t, err)
}
8 changes: 8 additions & 0 deletions receiver/kafkametricsreceiver/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver

go 1.14

require (
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/collector v0.21.0
)
Loading