From 2fda038b4743389c48d99f1d584216aa3bc306d3 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Tue, 5 May 2020 18:01:49 +0200 Subject: [PATCH 1/3] Add Kafka OTEL receiver/ingester Signed-off-by: Pavol Loffay --- cmd/ingester/app/flags.go | 8 +- .../app/defaults/defaults.go | 14 ++- .../app/defaults/defaults_test.go | 16 ++-- .../app/receiver/kafka/config.go | 27 ++++++ .../app/receiver/kafka/config_test.go | 90 ++++++++++++++++++ .../app/receiver/kafka/factory.go | 91 ++++++++++++++++++ .../app/receiver/kafka/factory_test.go | 70 ++++++++++++++ .../app/receiver/kafka/kafka_receiver.go | 93 +++++++++++++++++++ .../app/receiver/kafka/testdata/config.yaml | 33 +++++++ .../kafka/testdata/jaeger-config.yaml | 3 + cmd/opentelemetry-collector/cmd/agent/main.go | 4 +- cmd/opentelemetry-collector/go.sum | 1 + pkg/kafka/consumer/config.go | 12 +-- 13 files changed, 438 insertions(+), 24 deletions(-) create mode 100644 cmd/opentelemetry-collector/app/receiver/kafka/config.go create mode 100644 cmd/opentelemetry-collector/app/receiver/kafka/config_test.go create mode 100644 cmd/opentelemetry-collector/app/receiver/kafka/factory.go create mode 100644 cmd/opentelemetry-collector/app/receiver/kafka/factory_test.go create mode 100644 cmd/opentelemetry-collector/app/receiver/kafka/kafka_receiver.go create mode 100644 cmd/opentelemetry-collector/app/receiver/kafka/testdata/config.yaml create mode 100644 cmd/opentelemetry-collector/app/receiver/kafka/testdata/jaeger-config.yaml diff --git a/cmd/ingester/app/flags.go b/cmd/ingester/app/flags.go index 5aca213bf2a..e4b5f523026 100644 --- a/cmd/ingester/app/flags.go +++ b/cmd/ingester/app/flags.go @@ -69,10 +69,10 @@ const ( // Options stores the configuration options for the Ingester type Options struct { - kafkaConsumer.Configuration - Parallelism int - Encoding string - DeadlockInterval time.Duration + kafkaConsumer.Configuration `mapstructure:",squash"` + Parallelism int `mapstructure:"parallelism"` + Encoding string `mapstructure:"encoding"` + DeadlockInterval time.Duration `mapstructure:"deadlock_interval"` } // AddFlags adds flags for Builder diff --git a/cmd/opentelemetry-collector/app/defaults/defaults.go b/cmd/opentelemetry-collector/app/defaults/defaults.go index 15041c57259..8cb909c66d6 100644 --- a/cmd/opentelemetry-collector/app/defaults/defaults.go +++ b/cmd/opentelemetry-collector/app/defaults/defaults.go @@ -25,12 +25,14 @@ import ( "github.com/spf13/pflag" "github.com/spf13/viper" + "github.com/jaegertracing/jaeger/cmd/ingester/app" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/jaegerexporter" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/processor/resourceprocessor" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/receiver/jaegerreceiver" + kafkaRec "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/receiver/kafka" storageCassandra "github.com/jaegertracing/jaeger/plugin/storage/cassandra" storageEs "github.com/jaegertracing/jaeger/plugin/storage/es" storageKafka "github.com/jaegertracing/jaeger/plugin/storage/kafka" @@ -42,26 +44,32 @@ func Components(v *viper.Viper) config.Factories { // We have to add all storage flags to viper because any exporter can be specified in the OTEL config file. // OTEL collector creates default configurations for all factories to verify they can be created. addDefaultValuesToViper(v) - kafkaExp := kafka.Factory{OptionsFactory: func() *storageKafka.Options { + kafkaExp := &kafka.Factory{OptionsFactory: func() *storageKafka.Options { opts := kafka.DefaultOptions() opts.InitFromViper(v) return opts }} - cassandraExp := cassandra.Factory{OptionsFactory: func() *storageCassandra.Options { + cassandraExp := &cassandra.Factory{OptionsFactory: func() *storageCassandra.Options { opts := cassandra.DefaultOptions() opts.InitFromViper(v) return opts }} - esExp := elasticsearch.Factory{OptionsFactory: func() *storageEs.Options { + esExp := &elasticsearch.Factory{OptionsFactory: func() *storageEs.Options { opts := elasticsearch.DefaultOptions() opts.InitFromViper(v) return opts }} + kafkaRec := &kafkaRec.Factory{OptionsFactory: func() *app.Options { + opts := kafkaRec.DefaultOptions() + opts.InitFromViper(v) + return opts + }} factories, _ := defaultcomponents.Components() factories.Exporters[kafkaExp.Type()] = kafkaExp factories.Exporters[cassandraExp.Type()] = cassandraExp factories.Exporters[esExp.Type()] = esExp + factories.Receivers[kafkaRec.Type()] = kafkaRec jaegerRec := factories.Receivers["jaeger"].(*otelJaegerReceiver.Factory) factories.Receivers["jaeger"] = &jaegerreceiver.Factory{ diff --git a/cmd/opentelemetry-collector/app/defaults/defaults_test.go b/cmd/opentelemetry-collector/app/defaults/defaults_test.go index 30bcef41294..eb5f4a835d2 100644 --- a/cmd/opentelemetry-collector/app/defaults/defaults_test.go +++ b/cmd/opentelemetry-collector/app/defaults/defaults_test.go @@ -17,15 +17,14 @@ package defaults import ( "testing" - "github.com/open-telemetry/opentelemetry-collector/config/configmodels" "github.com/stretchr/testify/assert" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/jaegerexporter" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka" - "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/processor/resourceprocessor" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/receiver/jaegerreceiver" + kafkaRec "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/receiver/kafka" jConfig "github.com/jaegertracing/jaeger/pkg/config" ) @@ -36,21 +35,20 @@ func TestComponents(t *testing.T) { elasticsearch.DefaultOptions().AddFlags, ) factories := Components(v) - assert.Equal(t, configmodels.Type("jaeger_kafka"), factories.Exporters[kafka.TypeStr].Type()) - assert.Equal(t, configmodels.Type("jaeger_cassandra"), factories.Exporters[cassandra.TypeStr].Type()) - assert.Equal(t, configmodels.Type("jaeger_elasticsearch"), factories.Exporters[elasticsearch.TypeStr].Type()) + assert.IsType(t, &kafka.Factory{}, factories.Exporters[kafka.TypeStr]) + assert.IsType(t, &cassandra.Factory{}, factories.Exporters[cassandra.TypeStr]) + assert.IsType(t, &elasticsearch.Factory{}, factories.Exporters[elasticsearch.TypeStr]) + assert.IsType(t, &jaegerreceiver.Factory{}, factories.Receivers["jaeger"]) + assert.IsType(t, &jaegerexporter.Factory{}, factories.Exporters["jaeger"]) + assert.IsType(t, &kafkaRec.Factory{}, factories.Receivers[kafkaRec.TypeStr]) kafkaFactory := factories.Exporters[kafka.TypeStr] kc := kafkaFactory.CreateDefaultConfig().(*kafka.Config) assert.Equal(t, []string{"127.0.0.1:9092"}, kc.Config.Brokers) - cassandraFactory := factories.Exporters[cassandra.TypeStr] cc := cassandraFactory.CreateDefaultConfig().(*cassandra.Config) assert.Equal(t, []string{"127.0.0.1"}, cc.Options.GetPrimary().Servers) esFactory := factories.Exporters[elasticsearch.TypeStr] ec := esFactory.CreateDefaultConfig().(*elasticsearch.Config) assert.Equal(t, []string{"http://127.0.0.1:9200"}, ec.GetPrimary().Servers) - assert.IsType(t, &jaegerreceiver.Factory{}, factories.Receivers["jaeger"]) - assert.IsType(t, &jaegerexporter.Factory{}, factories.Exporters["jaeger"]) - assert.IsType(t, &resourceprocessor.Factory{}, factories.Processors["resource"]) } diff --git a/cmd/opentelemetry-collector/app/receiver/kafka/config.go b/cmd/opentelemetry-collector/app/receiver/kafka/config.go new file mode 100644 index 00000000000..5b7b6f8d19d --- /dev/null +++ b/cmd/opentelemetry-collector/app/receiver/kafka/config.go @@ -0,0 +1,27 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "github.com/open-telemetry/opentelemetry-collector/config/configmodels" + + "github.com/jaegertracing/jaeger/cmd/ingester/app" +) + +// Config hold configuration for Jaeger kafka receiver/ingester. +type Config struct { + configmodels.ReceiverSettings `mapstructure:",squash"` + app.Options `mapstructure:",squash"` +} diff --git a/cmd/opentelemetry-collector/app/receiver/kafka/config_test.go b/cmd/opentelemetry-collector/app/receiver/kafka/config_test.go new file mode 100644 index 00000000000..b477f5a659b --- /dev/null +++ b/cmd/opentelemetry-collector/app/receiver/kafka/config_test.go @@ -0,0 +1,90 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "path" + "testing" + + "github.com/open-telemetry/opentelemetry-collector/config" + "github.com/open-telemetry/opentelemetry-collector/config/configcheck" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/jaegertracing/jaeger/cmd/flags" + "github.com/jaegertracing/jaeger/cmd/ingester/app" + jConfig "github.com/jaegertracing/jaeger/pkg/config" +) + +func TestDefaultConfig(t *testing.T) { + v, c := jConfig.Viperize(app.AddFlags) + err := c.ParseFlags([]string{""}) + require.NoError(t, err) + factory := &Factory{OptionsFactory: func() *app.Options { + opts := DefaultOptions() + opts.InitFromViper(v) + return opts + }} + defaultCfg := factory.CreateDefaultConfig().(*Config) + assert.NoError(t, configcheck.ValidateConfig(defaultCfg)) + assert.Equal(t, "jaeger-spans", defaultCfg.Topic) + assert.Equal(t, "protobuf", defaultCfg.Encoding) + assert.Equal(t, []string{"127.0.0.1:9092"}, defaultCfg.Brokers) + assert.Equal(t, "none", defaultCfg.Authentication) + assert.Equal(t, "/etc/krb5.conf", defaultCfg.Kerberos.ConfigPath) + assert.Equal(t, "kafka", defaultCfg.Kerberos.ServiceName) + assert.Equal(t, false, defaultCfg.TLS.Enabled) +} + +func TestLoadConfigAndFlags(t *testing.T) { + factories, err := config.ExampleComponents() + require.NoError(t, err) + + v, c := jConfig.Viperize(app.AddFlags, flags.AddConfigFileFlag) + err = c.ParseFlags([]string{"--config-file=./testdata/jaeger-config.yaml", "--kafka.consumer.topic=jaeger-test", "--kafka.consumer.brokers=host1,host2"}) + require.NoError(t, err) + + err = flags.TryLoadConfigFile(v) + require.NoError(t, err) + + factory := &Factory{OptionsFactory: func() *app.Options { + opts := DefaultOptions() + opts.InitFromViper(v) + assert.Equal(t, "jaeger-test", opts.Topic) + assert.Equal(t, []string{"host1", "host2"}, opts.Brokers) + return opts + }} + + factories.Receivers[TypeStr] = factory + cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories) + require.NoError(t, err) + require.NotNil(t, cfg) + + kafkaCfg := cfg.Receivers[TypeStr].(*Config) + assert.Equal(t, TypeStr, kafkaCfg.Name()) + assert.Equal(t, "jaeger-prod", kafkaCfg.Topic) + assert.Equal(t, "emojis", kafkaCfg.Encoding) + assert.Equal(t, []string{"foo", "bar"}, kafkaCfg.Options.Brokers) + assert.Equal(t, "tls", kafkaCfg.Options.Authentication) + assert.Equal(t, "user", kafkaCfg.Options.PlainText.UserName) + assert.Equal(t, "123", kafkaCfg.Options.PlainText.Password) + assert.Equal(t, true, kafkaCfg.Options.TLS.Enabled) + assert.Equal(t, "key.crt", kafkaCfg.Options.TLS.KeyPath) + assert.Equal(t, "cert.crt", kafkaCfg.Options.TLS.CertPath) + assert.Equal(t, true, kafkaCfg.Options.TLS.SkipHostVerify) + assert.Equal(t, "jaeger", kafkaCfg.Options.Kerberos.Realm) + assert.Equal(t, "/etc/foo", kafkaCfg.Options.Kerberos.ConfigPath) + assert.Equal(t, "from-jaeger-config", kafkaCfg.Options.Kerberos.Username) +} diff --git a/cmd/opentelemetry-collector/app/receiver/kafka/factory.go b/cmd/opentelemetry-collector/app/receiver/kafka/factory.go new file mode 100644 index 00000000000..ddf091e920e --- /dev/null +++ b/cmd/opentelemetry-collector/app/receiver/kafka/factory.go @@ -0,0 +1,91 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "context" + "fmt" + + "github.com/open-telemetry/opentelemetry-collector/component" + "github.com/open-telemetry/opentelemetry-collector/config/configerror" + "github.com/open-telemetry/opentelemetry-collector/config/configmodels" + "github.com/open-telemetry/opentelemetry-collector/consumer" + + "github.com/jaegertracing/jaeger/cmd/ingester/app" +) + +const ( + TypeStr = "jaeger_kafka" +) + +// OptionsFactory returns initialized ingester app.Options structure. +type OptionsFactory func() *app.Options + +// DefaultOptions creates Kafka options supported by this receiver. +func DefaultOptions() *app.Options { + return &app.Options{} +} + +type Factory struct { + OptionsFactory OptionsFactory +} + +var _ component.ReceiverFactory = (*Factory)(nil) + +// Type returns the receiver type. +func (f Factory) Type() configmodels.Type { + return TypeStr +} + +// CreateDefaultConfig creates default config. +// This function implements OTEL component.ReceiverFactoryBase interface. +func (f Factory) CreateDefaultConfig() configmodels.Receiver { + opts := f.OptionsFactory() + return &Config{ + Options: *opts, + } +} + +// CustomUnmarshaler returns custom marshaller. +// This function implements OTEL component.ReceiverFactoryBase interface. +func (f Factory) CustomUnmarshaler() component.CustomUnmarshaler { + return nil +} + +// CreateTraceReceiver returns Kafka receiver. +// This function implements OTEL component.ReceiverFactory. +func (f Factory) CreateTraceReceiver( + _ context.Context, + params component.ReceiverCreateParams, + cfg configmodels.Receiver, + nextConsumer consumer.TraceConsumer, +) (component.TraceReceiver, error) { + kafkaCfg, ok := cfg.(*Config) + if !ok { + return nil, fmt.Errorf("could not cast configuration to %s", TypeStr) + } + return new(kafkaCfg, nextConsumer, params) +} + +// CreateMetricsReceiver returns metrics receiver. +// This function implements OTEL component.ReceiverFactory. +func (f Factory) CreateMetricsReceiver( + _ context.Context, + _ component.ReceiverCreateParams, + _ configmodels.Receiver, + _ consumer.MetricsConsumer, +) (component.MetricsReceiver, error) { + return nil, configerror.ErrDataTypeIsNotSupported +} diff --git a/cmd/opentelemetry-collector/app/receiver/kafka/factory_test.go b/cmd/opentelemetry-collector/app/receiver/kafka/factory_test.go new file mode 100644 index 00000000000..7110e3307bc --- /dev/null +++ b/cmd/opentelemetry-collector/app/receiver/kafka/factory_test.go @@ -0,0 +1,70 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "context" + "testing" + + "github.com/open-telemetry/opentelemetry-collector/component" + "github.com/open-telemetry/opentelemetry-collector/config/configcheck" + "github.com/open-telemetry/opentelemetry-collector/config/configerror" + "github.com/open-telemetry/opentelemetry-collector/config/configmodels" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/ingester/app" + jConfig "github.com/jaegertracing/jaeger/pkg/config" +) + +// TODO failing +func TestCreateTraceReceiver(t *testing.T) { + v, _ := jConfig.Viperize(app.AddFlags) + opts := DefaultOptions() + opts.InitFromViper(v) + factory := &Factory{OptionsFactory: func() *app.Options { + return opts + }} + exporter, err := factory.CreateTraceReceiver(context.Background(), component.ReceiverCreateParams{Logger: zap.NewNop()}, factory.CreateDefaultConfig(), nil) + require.Nil(t, exporter) + assert.EqualError(t, err, "kafka: client has run out of available brokers to talk to (Is your cluster reachable?)") +} + +func TestCreateTraceExporter_nilConfig(t *testing.T) { + factory := &Factory{} + exporter, err := factory.CreateTraceReceiver(context.Background(), component.ReceiverCreateParams{}, nil, nil) + require.Nil(t, exporter) + assert.EqualError(t, err, "could not cast configuration to jaeger_kafka") +} + +func TestCreateMetricsExporter(t *testing.T) { + f := Factory{OptionsFactory: DefaultOptions} + mReceiver, err := f.CreateMetricsReceiver(context.Background(), component.ReceiverCreateParams{}, f.CreateDefaultConfig(), nil) + assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported) + assert.Nil(t, mReceiver) +} + +func TestCreateDefaultConfig(t *testing.T) { + factory := Factory{OptionsFactory: DefaultOptions} + cfg := factory.CreateDefaultConfig() + assert.NotNil(t, cfg, "failed to create default config") + assert.NoError(t, configcheck.ValidateConfig(cfg)) +} + +func TestType(t *testing.T) { + factory := Factory{OptionsFactory: DefaultOptions} + assert.Equal(t, configmodels.Type(TypeStr), factory.Type()) +} diff --git a/cmd/opentelemetry-collector/app/receiver/kafka/kafka_receiver.go b/cmd/opentelemetry-collector/app/receiver/kafka/kafka_receiver.go new file mode 100644 index 00000000000..2594657c0e1 --- /dev/null +++ b/cmd/opentelemetry-collector/app/receiver/kafka/kafka_receiver.go @@ -0,0 +1,93 @@ +// Copyright (c) 2020 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "context" + + "github.com/open-telemetry/opentelemetry-collector/component" + "github.com/open-telemetry/opentelemetry-collector/consumer" + "github.com/open-telemetry/opentelemetry-collector/obsreport" + jaegertranslator "github.com/open-telemetry/opentelemetry-collector/translator/trace/jaeger" + "github.com/uber/jaeger-lib/metrics" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/ingester/app/builder" + ingester "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer" + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +func new( + config *Config, + nextConsumer consumer.TraceConsumer, + params component.ReceiverCreateParams, +) (component.TraceReceiver, error) { + ctx := obsreport.ReceiverContext( + context.Background(), config.Name(), "kafka", "kafka") + ctx = obsreport.StartTraceDataReceiveOp( + ctx, TypeStr, "kafka") + w := &writer{ + ctx: ctx, + nextConsumer: nextConsumer, + } + consumer, err := builder.CreateConsumer( + params.Logger, + metrics.NullFactory, + w, + config.Options) + if err != nil { + return nil, err + } + return &kafkaReceiver{ + consumer: consumer, + logger: params.Logger, + }, nil +} + +type kafkaReceiver struct { + logger *zap.Logger + consumer *ingester.Consumer +} + +var _ component.Receiver = (*kafkaReceiver)(nil) + +// Start starts the receiver. +func (r kafkaReceiver) Start(_ context.Context, _ component.Host) error { + r.consumer.Start() + return nil +} + +// Shutdown shutdowns the receiver. +func (r kafkaReceiver) Shutdown(_ context.Context) error { + return r.consumer.Close() +} + +type writer struct { + nextConsumer consumer.TraceConsumer + ctx context.Context +} + +var _ spanstore.Writer = (*writer)(nil) + +// WriteSpan writes a span to the next consumer. +func (w writer) WriteSpan(span *model.Span) error { + batch := model.Batch{ + Spans: []*model.Span{span}, + Process: span.Process, + } + traces := jaegertranslator.ProtoBatchToInternalTraces(batch) + return w.nextConsumer.ConsumeTraces(w.ctx, traces) +} diff --git a/cmd/opentelemetry-collector/app/receiver/kafka/testdata/config.yaml b/cmd/opentelemetry-collector/app/receiver/kafka/testdata/config.yaml new file mode 100644 index 00000000000..e141606e2f4 --- /dev/null +++ b/cmd/opentelemetry-collector/app/receiver/kafka/testdata/config.yaml @@ -0,0 +1,33 @@ +receivers: + jaeger_kafka: + brokers: foo,bar + topic: jaeger-prod + encoding: emojis + authentication: + type: tls + plaintext: + username: user + password: 123 + tls: + enabled: true + ca: ca.crt + key: key.crt + cert: cert.crt + skip_host_verify: true + kerberos: + realm: jaeger + config_file: /etc/foo + + +processors: + exampleprocessor: + +exporters: + exampleexporter: + +service: + pipelines: + traces: + receivers: [jaeger_kafka] + processors: [exampleprocessor] + exporters: [exampleexporter] diff --git a/cmd/opentelemetry-collector/app/receiver/kafka/testdata/jaeger-config.yaml b/cmd/opentelemetry-collector/app/receiver/kafka/testdata/jaeger-config.yaml new file mode 100644 index 00000000000..6a9fb59fba4 --- /dev/null +++ b/cmd/opentelemetry-collector/app/receiver/kafka/testdata/jaeger-config.yaml @@ -0,0 +1,3 @@ +kafka.consumer: + kerberos: + username: from-jaeger-config diff --git a/cmd/opentelemetry-collector/cmd/agent/main.go b/cmd/opentelemetry-collector/cmd/agent/main.go index 5dba71bbc7d..ac281bcebf8 100644 --- a/cmd/opentelemetry-collector/cmd/agent/main.go +++ b/cmd/opentelemetry-collector/cmd/agent/main.go @@ -41,8 +41,8 @@ func main() { } info := service.ApplicationStartInfo{ - ExeName: "jaeger-opentelemetry-collector", - LongName: "Jaeger OpenTelemetry Collector", + ExeName: "jaeger-opentelemetry-agent", + LongName: "Jaeger OpenTelemetry Agent", // TODO //Version: version.Version, //GitHash: version.GitHash, diff --git a/cmd/opentelemetry-collector/go.sum b/cmd/opentelemetry-collector/go.sum index 8ac885aaeae..9b24761d417 100644 --- a/cmd/opentelemetry-collector/go.sum +++ b/cmd/opentelemetry-collector/go.sum @@ -104,6 +104,7 @@ github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b h1:AP/Y7sqYicnjGDf github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q= github.com/bombsimon/wsl/v2 v2.0.0 h1:+Vjcn+/T5lSrO8Bjzhk4v14Un/2UyCA1E3V5j9nwTkQ= github.com/bombsimon/wsl/v2 v2.0.0/go.mod h1:mf25kr/SqFEPhhcxW1+7pxzGlW+hIl/hYTKY95VwV8U= +github.com/bsm/sarama-cluster v2.1.13+incompatible h1:bqU3gMJbWZVxLZ9PGWVKP05yOmFXUlfw61RBwuE3PYU= github.com/bsm/sarama-cluster v2.1.13+incompatible/go.mod h1:r7ao+4tTNXvWm+VRpRJchr2kQhqxgmAp2iEX5W96gMM= github.com/cenkalti/backoff v0.0.0-20181003080854-62661b46c409/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff v2.1.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= diff --git a/pkg/kafka/consumer/config.go b/pkg/kafka/consumer/config.go index eba0e847858..f6b08f31d20 100644 --- a/pkg/kafka/consumer/config.go +++ b/pkg/kafka/consumer/config.go @@ -37,14 +37,14 @@ type Builder interface { // Configuration describes the configuration properties needed to create a Kafka consumer type Configuration struct { - auth.AuthenticationConfig + auth.AuthenticationConfig `mapstructure:"authentication"` Consumer - Brokers []string - Topic string - GroupID string - ClientID string - ProtocolVersion string + Brokers []string `mapstructure:"brokers"` + Topic string `mapstructure:"topic"` + GroupID string `mapstructure:"group_id"` + ClientID string `mapstructure:"client_id"` + ProtocolVersion string `mapstructure:"protocol_version"` } // NewConsumer creates a new kafka consumer From 86fb9ac42c65ed9b4ac613d285e3fa10ebc3b0b6 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Wed, 6 May 2020 14:01:59 +0200 Subject: [PATCH 2/3] Fix review Signed-off-by: Pavol Loffay --- cmd/opentelemetry-collector/app/defaults/defaults.go | 4 ++-- cmd/opentelemetry-collector/app/receiver/kafka/config.go | 4 ++-- .../app/receiver/kafka/config_test.go | 1 + cmd/opentelemetry-collector/app/receiver/kafka/factory.go | 8 ++++---- .../app/receiver/kafka/factory_test.go | 6 +++--- 5 files changed, 12 insertions(+), 11 deletions(-) diff --git a/cmd/opentelemetry-collector/app/defaults/defaults.go b/cmd/opentelemetry-collector/app/defaults/defaults.go index 8cb909c66d6..651cac04c32 100644 --- a/cmd/opentelemetry-collector/app/defaults/defaults.go +++ b/cmd/opentelemetry-collector/app/defaults/defaults.go @@ -25,7 +25,7 @@ import ( "github.com/spf13/pflag" "github.com/spf13/viper" - "github.com/jaegertracing/jaeger/cmd/ingester/app" + ingestrApp "github.com/jaegertracing/jaeger/cmd/ingester/app" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/jaegerexporter" @@ -59,7 +59,7 @@ func Components(v *viper.Viper) config.Factories { opts.InitFromViper(v) return opts }} - kafkaRec := &kafkaRec.Factory{OptionsFactory: func() *app.Options { + kafkaRec := &kafkaRec.Factory{OptionsFactory: func() *ingestrApp.Options { opts := kafkaRec.DefaultOptions() opts.InitFromViper(v) return opts diff --git a/cmd/opentelemetry-collector/app/receiver/kafka/config.go b/cmd/opentelemetry-collector/app/receiver/kafka/config.go index 5b7b6f8d19d..9fefde55c74 100644 --- a/cmd/opentelemetry-collector/app/receiver/kafka/config.go +++ b/cmd/opentelemetry-collector/app/receiver/kafka/config.go @@ -17,11 +17,11 @@ package kafka import ( "github.com/open-telemetry/opentelemetry-collector/config/configmodels" - "github.com/jaegertracing/jaeger/cmd/ingester/app" + ingesterApp "github.com/jaegertracing/jaeger/cmd/ingester/app" ) // Config hold configuration for Jaeger kafka receiver/ingester. type Config struct { configmodels.ReceiverSettings `mapstructure:",squash"` - app.Options `mapstructure:",squash"` + ingesterApp.Options `mapstructure:",squash"` } diff --git a/cmd/opentelemetry-collector/app/receiver/kafka/config_test.go b/cmd/opentelemetry-collector/app/receiver/kafka/config_test.go index b477f5a659b..147902c220b 100644 --- a/cmd/opentelemetry-collector/app/receiver/kafka/config_test.go +++ b/cmd/opentelemetry-collector/app/receiver/kafka/config_test.go @@ -81,6 +81,7 @@ func TestLoadConfigAndFlags(t *testing.T) { assert.Equal(t, "user", kafkaCfg.Options.PlainText.UserName) assert.Equal(t, "123", kafkaCfg.Options.PlainText.Password) assert.Equal(t, true, kafkaCfg.Options.TLS.Enabled) + assert.Equal(t, "ca.crt", kafkaCfg.Options.TLS.CAPath) assert.Equal(t, "key.crt", kafkaCfg.Options.TLS.KeyPath) assert.Equal(t, "cert.crt", kafkaCfg.Options.TLS.CertPath) assert.Equal(t, true, kafkaCfg.Options.TLS.SkipHostVerify) diff --git a/cmd/opentelemetry-collector/app/receiver/kafka/factory.go b/cmd/opentelemetry-collector/app/receiver/kafka/factory.go index ddf091e920e..ae1695590f2 100644 --- a/cmd/opentelemetry-collector/app/receiver/kafka/factory.go +++ b/cmd/opentelemetry-collector/app/receiver/kafka/factory.go @@ -23,7 +23,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector/config/configmodels" "github.com/open-telemetry/opentelemetry-collector/consumer" - "github.com/jaegertracing/jaeger/cmd/ingester/app" + ingesterApp "github.com/jaegertracing/jaeger/cmd/ingester/app" ) const ( @@ -31,11 +31,11 @@ const ( ) // OptionsFactory returns initialized ingester app.Options structure. -type OptionsFactory func() *app.Options +type OptionsFactory func() *ingesterApp.Options // DefaultOptions creates Kafka options supported by this receiver. -func DefaultOptions() *app.Options { - return &app.Options{} +func DefaultOptions() *ingesterApp.Options { + return &ingesterApp.Options{} } type Factory struct { diff --git a/cmd/opentelemetry-collector/app/receiver/kafka/factory_test.go b/cmd/opentelemetry-collector/app/receiver/kafka/factory_test.go index 7110e3307bc..1a8e5237680 100644 --- a/cmd/opentelemetry-collector/app/receiver/kafka/factory_test.go +++ b/cmd/opentelemetry-collector/app/receiver/kafka/factory_test.go @@ -26,16 +26,16 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" - "github.com/jaegertracing/jaeger/cmd/ingester/app" + ingesterApp "github.com/jaegertracing/jaeger/cmd/ingester/app" jConfig "github.com/jaegertracing/jaeger/pkg/config" ) // TODO failing func TestCreateTraceReceiver(t *testing.T) { - v, _ := jConfig.Viperize(app.AddFlags) + v, _ := jConfig.Viperize(ingesterApp.AddFlags) opts := DefaultOptions() opts.InitFromViper(v) - factory := &Factory{OptionsFactory: func() *app.Options { + factory := &Factory{OptionsFactory: func() *ingesterApp.Options { return opts }} exporter, err := factory.CreateTraceReceiver(context.Background(), component.ReceiverCreateParams{Logger: zap.NewNop()}, factory.CreateDefaultConfig(), nil) From 69ec7d68af4e6d465561f289ff5831e509409767 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Wed, 6 May 2020 15:07:27 +0200 Subject: [PATCH 3/3] Use tls.cert from flags Signed-off-by: Pavol Loffay --- cmd/opentelemetry-collector/app/defaults/defaults.go | 4 ++-- cmd/opentelemetry-collector/app/receiver/kafka/config_test.go | 4 ++-- .../app/receiver/kafka/testdata/config.yaml | 1 - 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/cmd/opentelemetry-collector/app/defaults/defaults.go b/cmd/opentelemetry-collector/app/defaults/defaults.go index 651cac04c32..93522c00e62 100644 --- a/cmd/opentelemetry-collector/app/defaults/defaults.go +++ b/cmd/opentelemetry-collector/app/defaults/defaults.go @@ -25,7 +25,7 @@ import ( "github.com/spf13/pflag" "github.com/spf13/viper" - ingestrApp "github.com/jaegertracing/jaeger/cmd/ingester/app" + ingesterApp "github.com/jaegertracing/jaeger/cmd/ingester/app" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/cassandra" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch" "github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/jaegerexporter" @@ -59,7 +59,7 @@ func Components(v *viper.Viper) config.Factories { opts.InitFromViper(v) return opts }} - kafkaRec := &kafkaRec.Factory{OptionsFactory: func() *ingestrApp.Options { + kafkaRec := &kafkaRec.Factory{OptionsFactory: func() *ingesterApp.Options { opts := kafkaRec.DefaultOptions() opts.InitFromViper(v) return opts diff --git a/cmd/opentelemetry-collector/app/receiver/kafka/config_test.go b/cmd/opentelemetry-collector/app/receiver/kafka/config_test.go index 147902c220b..38e92a1c94d 100644 --- a/cmd/opentelemetry-collector/app/receiver/kafka/config_test.go +++ b/cmd/opentelemetry-collector/app/receiver/kafka/config_test.go @@ -53,7 +53,7 @@ func TestLoadConfigAndFlags(t *testing.T) { require.NoError(t, err) v, c := jConfig.Viperize(app.AddFlags, flags.AddConfigFileFlag) - err = c.ParseFlags([]string{"--config-file=./testdata/jaeger-config.yaml", "--kafka.consumer.topic=jaeger-test", "--kafka.consumer.brokers=host1,host2"}) + err = c.ParseFlags([]string{"--config-file=./testdata/jaeger-config.yaml", "--kafka.consumer.topic=jaeger-test", "--kafka.consumer.brokers=host1,host2", "--kafka.consumer.tls.cert=from-flag"}) require.NoError(t, err) err = flags.TryLoadConfigFile(v) @@ -83,7 +83,7 @@ func TestLoadConfigAndFlags(t *testing.T) { assert.Equal(t, true, kafkaCfg.Options.TLS.Enabled) assert.Equal(t, "ca.crt", kafkaCfg.Options.TLS.CAPath) assert.Equal(t, "key.crt", kafkaCfg.Options.TLS.KeyPath) - assert.Equal(t, "cert.crt", kafkaCfg.Options.TLS.CertPath) + assert.Equal(t, "from-flag", kafkaCfg.Options.TLS.CertPath) assert.Equal(t, true, kafkaCfg.Options.TLS.SkipHostVerify) assert.Equal(t, "jaeger", kafkaCfg.Options.Kerberos.Realm) assert.Equal(t, "/etc/foo", kafkaCfg.Options.Kerberos.ConfigPath) diff --git a/cmd/opentelemetry-collector/app/receiver/kafka/testdata/config.yaml b/cmd/opentelemetry-collector/app/receiver/kafka/testdata/config.yaml index e141606e2f4..19c8694bde2 100644 --- a/cmd/opentelemetry-collector/app/receiver/kafka/testdata/config.yaml +++ b/cmd/opentelemetry-collector/app/receiver/kafka/testdata/config.yaml @@ -12,7 +12,6 @@ receivers: enabled: true ca: ca.crt key: key.crt - cert: cert.crt skip_host_verify: true kerberos: realm: jaeger