forked from open-telemetry/opentelemetry-collector-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add kafkametricsreceiver initial structure (#2550)
Adds the factory, config and receiver for new (kafkametricsreceiver) component. The new kafkametricsreceiver will primarily report metrics on consumer_group lag from kafka (using the internal __consumer_offsets topic). It will also include metrics on number of brokers, topics, and offsets of topics. Testing: The current code is a scaffolding that cannot be e2e tested, appropriate unit tests were added. Documentation: README explaining usage of new receiver component is included.
- Loading branch information
1 parent
675cd8a
commit ee24ad3
Showing
11 changed files
with
1,897 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
include ../../Makefile.Common |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
# Kafka Metrics Receiver | ||
|
||
Kafka metrics receiver collects kafka metrics (brokers, topics, partitions, consumer groups) from kafka server, | ||
converting into otlp. | ||
|
||
## Getting Started | ||
|
||
Required settings (no defaults): | ||
|
||
- `protocol_version`: Kafka protocol version | ||
- `scrapers`: any combination of the following scrapers can be enabled. | ||
- `topics` | ||
- `consumers` | ||
- `brokers` | ||
|
||
Optional Settings (with defaults): | ||
|
||
- `brokers` (default = localhost:9092): the list of brokers to read from. | ||
- `topic_match` (default = *): regex pattern of topics to filter for metrics collection. | ||
- `group_match` (default = *): regex pattern of consumer groups to filter on for metrics. | ||
- `client_id` (default = otel-metrics-receiver): consumer client id | ||
- `collection_interval` (default = 1m): frequency of metric collection/scraping. | ||
- `auth` (default none) | ||
- `plain_text` | ||
- `username`: The username to use. | ||
- `password`: The password to use | ||
- `tls` | ||
- `ca_file`: path to the CA cert. For a client this verifies the server certificate. Should only be used | ||
if `insecure` is set to true. | ||
- `cert_file`: path to the TLS cert to use for TLS required connections. Should only be used if `insecure` is | ||
set to true. | ||
- `key_file`: path to the TLS key to use for TLS required connections. Should only be used if `insecure` is set | ||
to true. | ||
- `insecure` (default = false): Disable verifying the server's certificate chain and host | ||
name (`InsecureSkipVerify` in the tls config) | ||
- `server_name_override`: ServerName indicates the name of the server requested by the client in order to | ||
support virtual hosting. | ||
- `kerberos` | ||
- `service_name`: Kerberos service name | ||
- `realm`: Kerberos realm | ||
- `use_keytab`: Use of keytab instead of password, if this is true, keytab file will be used instead of | ||
password | ||
- `username`: The Kerberos username used for authenticate with KDC | ||
- `password`: The Kerberos password used for authenticate with KDC | ||
- `config_file`: Path to Kerberos configuration. i.e /etc/krb5.conf | ||
- `keytab_file`: Path to keytab file. i.e /etc/security/kafka.keytab | ||
|
||
## Examples: | ||
|
||
Basic configuration with all scrapers: | ||
|
||
```yaml | ||
receivers: | ||
kafkametrics: | ||
protocol_version: 2.0.0 | ||
scrapers: | ||
- brokers | ||
- topics | ||
- consumers | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package kafkametricsreceiver | ||
|
||
import ( | ||
"go.opentelemetry.io/collector/exporter/kafkaexporter" | ||
"go.opentelemetry.io/collector/receiver/scraperhelper" | ||
) | ||
|
||
// Config represents user settings for kafkametrics receiver | ||
type Config struct { | ||
scraperhelper.ScraperControllerSettings `mapstructure:",squash"` | ||
|
||
// The list of kafka brokers (default localhost:9092) | ||
Brokers []string `mapstructure:"brokers"` | ||
|
||
// ProtocolVersion Kafka protocol version | ||
ProtocolVersion string `mapstructure:"protocol_version"` | ||
|
||
// TopicMatch topics to collect metrics on | ||
TopicMatch string `mapstructure:"topic_match"` | ||
|
||
// GroupMatch consumer groups to collect on | ||
GroupMatch string `mapstructure:"group_match"` | ||
|
||
// Authentication data | ||
Authentication kafkaexporter.Authentication `mapstructure:"auth"` | ||
|
||
// Scrapers defines which metric data points to be captured from kafka | ||
Scrapers []string `mapstructure:"scrapers"` | ||
|
||
// ClientID is the id associated with the consumer that reads from topics in kafka. | ||
ClientID string `mapstructure:"client_id"` | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package kafkametricsreceiver | ||
|
||
import ( | ||
"path" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"go.opentelemetry.io/collector/component/componenttest" | ||
"go.opentelemetry.io/collector/config/configtest" | ||
"go.opentelemetry.io/collector/config/configtls" | ||
"go.opentelemetry.io/collector/exporter/kafkaexporter" | ||
"go.opentelemetry.io/collector/receiver/scraperhelper" | ||
) | ||
|
||
func TestLoadConfig(t *testing.T) { | ||
factories, err := componenttest.ExampleComponents() | ||
assert.NoError(t, err) | ||
|
||
factory := NewFactory() | ||
factories.Receivers[typeStr] = factory | ||
cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories) | ||
require.NoError(t, err) | ||
require.Equal(t, 1, len(cfg.Receivers)) | ||
|
||
r := cfg.Receivers[typeStr].(*Config) | ||
assert.Equal(t, &Config{ | ||
ScraperControllerSettings: scraperhelper.DefaultScraperControllerSettings(typeStr), | ||
Brokers: []string{"10.10.10.10:9092"}, | ||
ProtocolVersion: "2.0.0", | ||
TopicMatch: "test_*", | ||
GroupMatch: "test_*", | ||
Authentication: kafkaexporter.Authentication{ | ||
TLS: &configtls.TLSClientSetting{ | ||
TLSSetting: configtls.TLSSetting{ | ||
CAFile: "ca.pem", | ||
CertFile: "cert.pem", | ||
KeyFile: "key.pem", | ||
}, | ||
}, | ||
}, | ||
ClientID: defaultClientID, | ||
Scrapers: []string{"brokers", "topics", "consumers"}, | ||
}, r) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package kafkametricsreceiver | ||
|
||
import ( | ||
"context" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/config/configmodels" | ||
"go.opentelemetry.io/collector/consumer" | ||
"go.opentelemetry.io/collector/receiver/receiverhelper" | ||
"go.opentelemetry.io/collector/receiver/scraperhelper" | ||
) | ||
|
||
const ( | ||
typeStr = "kafkametrics" | ||
defaultBroker = "localhost:9092" | ||
defaultGroupMatch = ".*" | ||
defaultTopicMatch = ".*" | ||
defaultClientID = "otel-metrics-receiver" | ||
) | ||
|
||
// NewFactory creates kafkametrics receiver factory. | ||
func NewFactory() component.ReceiverFactory { | ||
return receiverhelper.NewFactory( | ||
typeStr, | ||
createDefaultConfig, | ||
receiverhelper.WithMetrics(createMetricsReceiver)) | ||
} | ||
|
||
func createDefaultConfig() configmodels.Receiver { | ||
return &Config{ | ||
ScraperControllerSettings: scraperhelper.DefaultScraperControllerSettings(typeStr), | ||
Brokers: []string{defaultBroker}, | ||
GroupMatch: defaultGroupMatch, | ||
TopicMatch: defaultTopicMatch, | ||
ClientID: defaultClientID, | ||
} | ||
} | ||
|
||
func createMetricsReceiver( | ||
ctx context.Context, | ||
params component.ReceiverCreateParams, | ||
cfg configmodels.Receiver, | ||
nextConsumer consumer.MetricsConsumer) (component.MetricsReceiver, error) { | ||
c := cfg.(*Config) | ||
r, err := newMetricsReceiver(ctx, *c, params, nextConsumer) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return r, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package kafkametricsreceiver | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/config/configcheck" | ||
"go.opentelemetry.io/collector/consumer" | ||
) | ||
|
||
func TestCreateDefaultConfig(t *testing.T) { | ||
factory := NewFactory() | ||
cfg := factory.CreateDefaultConfig() | ||
assert.NotNil(t, cfg, "default config not created") | ||
assert.NoError(t, configcheck.ValidateConfig(cfg)) | ||
} | ||
|
||
func TestCreateMetricsReceiver_errors(t *testing.T) { | ||
factory := NewFactory() | ||
cfg := factory.CreateDefaultConfig().(*Config) | ||
cfg.Brokers = []string{"invalid:9092"} | ||
cfg.ProtocolVersion = "2.0.0" | ||
cfg.Scrapers = []string{"topics"} | ||
r, err := createMetricsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil) | ||
assert.Error(t, err) | ||
assert.Nil(t, r) | ||
} | ||
|
||
func TestCreateMetricsReceiver(t *testing.T) { | ||
prev := newMetricsReceiver | ||
newMetricsReceiver = func(ctx context.Context, config Config, params component.ReceiverCreateParams, consumer consumer.MetricsConsumer) (component.MetricsReceiver, error) { | ||
return nil, nil | ||
} | ||
factory := NewFactory() | ||
cfg := factory.CreateDefaultConfig().(*Config) | ||
cfg.Brokers = []string{"invalid:9092"} | ||
cfg.ProtocolVersion = "2.0.0" | ||
cfg.Scrapers = []string{"topics"} | ||
_, err := createMetricsReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil) | ||
newMetricsReceiver = prev | ||
assert.Nil(t, err) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver | ||
|
||
go 1.14 | ||
|
||
require ( | ||
github.com/stretchr/testify v1.7.0 | ||
go.opentelemetry.io/collector v0.21.0 | ||
) |
Oops, something went wrong.