Use OTEL Kafka Exporter/Receiver Instead of Jaeger Core #2494

Merged (18 commits) on Sep 28, 2020
23 changes: 14 additions & 9 deletions cmd/ingester/app/flags.go
@@ -77,6 +77,20 @@ type Options struct {

// AddFlags adds flags for Builder
func AddFlags(flagSet *flag.FlagSet) {
Member:

Could you please paste here the output of the help command from the ingester and collector with kafka storage?

Member Author:

Collector

$ SPAN_STORAGE_TYPE=kafka go run ./cmd/collector --help
2020/09/25 14:00:43 maxprocs: Leaving GOMAXPROCS=8: CPU quota undefined
Jaeger collector receives traces from Jaeger agents and runs them through a processing pipeline.

Usage:
  jaeger-collector [flags]
  jaeger-collector [command]

Available Commands:
  docs        Generates documentation
  env         Help about environment variables.
  help        Help about any command
  version     Print the version.

Flags:
      --admin-http-port int                            (deprecated, will be removed after 2020-06-30 or in release v1.20.0, whichever is later) see --admin.http.host-port
      --admin.http.host-port string                    The host:port (e.g. 127.0.0.1:5555 or :5555) for the admin server, including health check, /metrics, etc. (default ":14269")
      --collector.grpc-port int                        (deprecated, will be removed after 2020-06-30 or in release v1.20.0, whichever is later) see --collector.grpc-server.host-port
      --collector.grpc-server.host-port string         The host:port (e.g. 127.0.0.1:14250 or :14250) of the collector's GRPC server (default ":14250")
      --collector.grpc.tls                             (deprecated) see --collector.grpc.tls.enabled
      --collector.grpc.tls.cert string                 Path to a TLS Certificate file, used to identify this server to clients
      --collector.grpc.tls.client-ca string            Path to a TLS CA (Certification Authority) file used to verify certificates presented by clients (if unset, all clients are permitted)
      --collector.grpc.tls.client.ca string            (deprecated) see --collector.grpc.tls.client-ca
      --collector.grpc.tls.enabled                     Enable TLS on the server
      --collector.grpc.tls.key string                  Path to a TLS Private Key file, used to identify this server to clients
      --collector.http-port int                        (deprecated, will be removed after 2020-06-30 or in release v1.20.0, whichever is later) see --collector.http-server.host-port
      --collector.http-server.host-port string         The host:port (e.g. 127.0.0.1:9411 or :9411) of the collector's HTTP server (default ":14268")
      --collector.num-workers int                      The number of workers pulling items from the queue (default 50)
      --collector.queue-size int                       The queue size of the collector (default 2000)
      --collector.queue-size-memory uint               (experimental) The max memory size in MiB to use for the dynamic queue.
      --collector.tags string                          One or more tags to be added to the Process tags of all spans passing through this collector. Ex: key1=value1,key2=${envVar:defaultValue}
      --collector.zipkin.allowed-headers string        Comma separated list of allowed headers for the Zipkin collector service, default content-type (default "content-type")
      --collector.zipkin.allowed-origins string        Comma separated list of allowed origins for the Zipkin collector service, default accepts all (default "*")
      --collector.zipkin.host-port string              The host:port (e.g. 127.0.0.1:5555 or :5555) of the collector's Zipkin server (default ":0")
      --collector.zipkin.http-port int                 (deprecated, will be removed after 2020-06-30 or in release v1.20.0, whichever is later) see --collector.zipkin.host-port
      --config-file string                             Configuration file in JSON, TOML, YAML, HCL, or Java properties formats (default none). See spf13/viper for precedence.
      --downsampling.hashsalt string                   Salt used when hashing trace id for downsampling.
      --downsampling.ratio float                       Ratio of spans passed to storage after downsampling (between 0 and 1), e.g ratio = 0.3 means we are keeping 30% of spans and dropping 70% of spans; ratio = 1.0 disables downsampling. (default 1)
      --health-check-http-port int                     (deprecated, will be removed after 2020-03-15 or in release v1.19.0, whichever is later) see --admin.http.host-port
  -h, --help                                           help for jaeger-collector
      --kafka.producer.authentication string           Authentication type used to authenticate with kafka cluster. e.g. none, kerberos, tls (default "none")
      --kafka.producer.batch-linger duration           (experimental) Time interval to wait before sending records to Kafka. Higher value reduce request to Kafka but increase latency and the possibility of data loss in case of process restart. See https://kafka.apache.org/documentation/ (default 0s)
      --kafka.producer.batch-max-messages int          (experimental) Maximum number of message to batch before sending records to Kafka
      --kafka.producer.batch-min-messages int          (experimental) The best-effort minimum number of messages needed to send a batch of records to Kafka. Higher value reduce request to Kafka but increase latency and the possibility of data loss in case of process restart. See https://kafka.apache.org/documentation/
      --kafka.producer.batch-size int                  (experimental) Number of bytes to batch before sending records to Kafka. Higher value reduce request to Kafka but increase latency and the possibility of data loss in case of process restart. See https://kafka.apache.org/documentation/
      --kafka.producer.brokers string                  The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234' (default "127.0.0.1:9092")
      --kafka.producer.compression string              (experimental) Type of compression (none, gzip, snappy, lz4, zstd) to use on messages (default "none")
      --kafka.producer.compression-level int           (experimental) compression level to use on messages. gzip = 1-9 (default = 6), snappy = none, lz4 = 1-17 (default = 9), zstd = -131072 - 22 (default = 3)
      --kafka.producer.encoding string                 Encoding of spans ("json" or "protobuf") sent to kafka. (default "protobuf")
      --kafka.producer.kerberos.config-file string     Path to Kerberos configuration. i.e /etc/krb5.conf (default "/etc/krb5.conf")
      --kafka.producer.kerberos.keytab-file string     Path to keytab file. i.e /etc/security/kafka.keytab (default "/etc/security/kafka.keytab")
      --kafka.producer.kerberos.password string        The Kerberos password used for authenticate with KDC
      --kafka.producer.kerberos.realm string           Kerberos realm
      --kafka.producer.kerberos.service-name string    Kerberos service name (default "kafka")
      --kafka.producer.kerberos.use-keytab             Use of keytab instead of password, if this is true, keytab file will be used instead of password
      --kafka.producer.kerberos.username string        The Kerberos username used for authenticate with KDC
      --kafka.producer.plaintext.password string       The plaintext Password for SASL/PLAIN authentication
      --kafka.producer.plaintext.username string       The plaintext Username for SASL/PLAIN authentication
      --kafka.producer.protocol-version string         Kafka protocol version - must be supported by kafka server
      --kafka.producer.required-acks string            (experimental) Required kafka broker acknowledgement. i.e. noack, local, all (default "local")
      --kafka.producer.tls                             (deprecated) see --kafka.producer.tls.enabled
      --kafka.producer.tls.ca string                   Path to a TLS CA (Certification Authority) file used to verify the remote server(s) (by default will use the system truststore)
      --kafka.producer.tls.cert string                 Path to a TLS Certificate file, used to identify this process to the remote server(s)
      --kafka.producer.tls.enabled                     Enable TLS when talking to the remote server(s)
      --kafka.producer.tls.key string                  Path to a TLS Private Key file, used to identify this process to the remote server(s)
      --kafka.producer.tls.server-name string          Override the TLS server name we expect in the certificate of the remote server(s)
      --kafka.producer.tls.skip-host-verify            (insecure) Skip server's certificate chain and host name verification
      --kafka.producer.topic string                    The name of the kafka topic (default "jaeger-spans")
      --log-level string                               Minimal allowed log Level. For more levels see https://github.com/uber-go/zap (default "info")
      --metrics-backend string                         Defines which metrics backend to use for metrics reporting: expvar, prometheus, none (default "prometheus")
      --metrics-http-route string                      Defines the route of HTTP endpoint for metrics backends that support scraping (default "/metrics")
      --sampling.strategies-file string                The path for the sampling strategies file in JSON format. See sampling documentation to see format of the file
      --sampling.strategies-reload-interval duration   Reload interval to check and reload sampling strategies file. Zero value means no reloading (default 0s)
      --span-storage.type string                       (deprecated) please use SPAN_STORAGE_TYPE environment variable. Run this binary with the 'env' command for help.

Use "jaeger-collector [command] --help" for more information about a command.

Member Author:

Ingester

$ go run ./cmd/ingester --help
2020/09/25 14:01:36 maxprocs: Leaving GOMAXPROCS=8: CPU quota undefined
Jaeger ingester consumes spans from a particular Kafka topic and writes them to a configured storage.

Usage:
  jaeger-ingester [flags]
  jaeger-ingester [command]

Available Commands:
  docs        Generates documentation
  env         Help about environment variables.
  help        Help about any command
  version     Print the version.

Flags:
      --admin-http-port int                             (deprecated, will be removed after 2020-06-30 or in release v1.20.0, whichever is later) see --admin.http.host-port
      --admin.http.host-port string                     The host:port (e.g. 127.0.0.1:5555 or :5555) for the admin server, including health check, /metrics, etc. (default ":14270")
      --cassandra-archive.connect-timeout duration      Timeout used for connections to Cassandra Servers (default 0s)
      --cassandra-archive.connections-per-host int      The number of Cassandra connections from a single backend instance
      --cassandra-archive.consistency string            The Cassandra consistency level, e.g. ANY, ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE (default LOCAL_ONE)
      --cassandra-archive.disable-compression           Disables the use of the default Snappy Compression while connecting to the Cassandra Cluster if set to true. This is useful for connecting to Cassandra Clusters(like Azure Cosmos Db with Cassandra API) that do not support SnappyCompression
      --cassandra-archive.enable-dependencies-v2        (deprecated) Jaeger will automatically detect the version of the dependencies table
      --cassandra-archive.enabled                       Enable extra storage
      --cassandra-archive.keyspace string               The Cassandra keyspace for Jaeger data
      --cassandra-archive.local-dc string               The name of the Cassandra local data center for DC Aware host selection
      --cassandra-archive.max-retry-attempts int        The number of attempts when reading from Cassandra
      --cassandra-archive.password string               Password for password authentication for Cassandra
      --cassandra-archive.port int                      The port for cassandra
      --cassandra-archive.proto-version int             The Cassandra protocol version
      --cassandra-archive.reconnect-interval duration   Reconnect interval to retry connecting to downed hosts (default 0s)
      --cassandra-archive.servers string                The comma-separated list of Cassandra servers
      --cassandra-archive.socket-keep-alive duration    Cassandra's keepalive period to use, enabled if > 0 (default 0s)
      --cassandra-archive.timeout duration              Timeout used for queries. A Timeout of zero means no timeout (default 0s)
      --cassandra-archive.tls                           (deprecated) see --cassandra-archive.tls.enabled
      --cassandra-archive.tls.ca string                 Path to a TLS CA (Certification Authority) file used to verify the remote server(s) (by default will use the system truststore)
      --cassandra-archive.tls.cert string               Path to a TLS Certificate file, used to identify this process to the remote server(s)
      --cassandra-archive.tls.enabled                   Enable TLS when talking to the remote server(s)
      --cassandra-archive.tls.key string                Path to a TLS Private Key file, used to identify this process to the remote server(s)
      --cassandra-archive.tls.server-name string        Override the TLS server name we expect in the certificate of the remote server(s)
      --cassandra-archive.tls.skip-host-verify          (insecure) Skip server's certificate chain and host name verification
      --cassandra-archive.tls.verify-host               (deprecated) Enable (or disable) host key verification. Use cassandra-archive.tls.skip-host-verify instead
      --cassandra-archive.username string               Username for password authentication for Cassandra
      --cassandra.connect-timeout duration              Timeout used for connections to Cassandra Servers (default 0s)
      --cassandra.connections-per-host int              The number of Cassandra connections from a single backend instance (default 2)
      --cassandra.consistency string                    The Cassandra consistency level, e.g. ANY, ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, LOCAL_ONE (default LOCAL_ONE)
      --cassandra.disable-compression                   Disables the use of the default Snappy Compression while connecting to the Cassandra Cluster if set to true. This is useful for connecting to Cassandra Clusters(like Azure Cosmos Db with Cassandra API) that do not support SnappyCompression
      --cassandra.enable-dependencies-v2                (deprecated) Jaeger will automatically detect the version of the dependencies table
      --cassandra.index.logs                            Controls log field indexing. Set to false to disable. (default true)
      --cassandra.index.process-tags                    Controls process tag indexing. Set to false to disable. (default true)
      --cassandra.index.tag-blacklist string            The comma-separated list of span tags to blacklist from being indexed. All other tags will be indexed. Mutually exclusive with the whitelist option.
      --cassandra.index.tag-whitelist string            The comma-separated list of span tags to whitelist for being indexed. All other tags will not be indexed. Mutually exclusive with the blacklist option.
      --cassandra.index.tags                            Controls tag indexing. Set to false to disable. (default true)
      --cassandra.keyspace string                       The Cassandra keyspace for Jaeger data (default "jaeger_v1_test")
      --cassandra.local-dc string                       The name of the Cassandra local data center for DC Aware host selection
      --cassandra.max-retry-attempts int                The number of attempts when reading from Cassandra (default 3)
      --cassandra.password string                       Password for password authentication for Cassandra
      --cassandra.port int                              The port for cassandra
      --cassandra.proto-version int                     The Cassandra protocol version (default 4)
      --cassandra.reconnect-interval duration           Reconnect interval to retry connecting to downed hosts (default 1m0s)
      --cassandra.servers string                        The comma-separated list of Cassandra servers (default "127.0.0.1")
      --cassandra.socket-keep-alive duration            Cassandra's keepalive period to use, enabled if > 0 (default 0s)
      --cassandra.span-store-write-cache-ttl duration   The duration to wait before rewriting an existing service or operation name (default 12h0m0s)
      --cassandra.timeout duration                      Timeout used for queries. A Timeout of zero means no timeout (default 0s)
      --cassandra.tls                                   (deprecated) see --cassandra.tls.enabled
      --cassandra.tls.ca string                         Path to a TLS CA (Certification Authority) file used to verify the remote server(s) (by default will use the system truststore)
      --cassandra.tls.cert string                       Path to a TLS Certificate file, used to identify this process to the remote server(s)
      --cassandra.tls.enabled                           Enable TLS when talking to the remote server(s)
      --cassandra.tls.key string                        Path to a TLS Private Key file, used to identify this process to the remote server(s)
      --cassandra.tls.server-name string                Override the TLS server name we expect in the certificate of the remote server(s)
      --cassandra.tls.skip-host-verify                  (insecure) Skip server's certificate chain and host name verification
      --cassandra.tls.verify-host                       (deprecated) Enable (or disable) host key verification. Use cassandra.tls.skip-host-verify instead
      --cassandra.username string                       Username for password authentication for Cassandra
      --config-file string                              Configuration file in JSON, TOML, YAML, HCL, or Java properties formats (default none). See spf13/viper for precedence.
      --downsampling.hashsalt string                    Salt used when hashing trace id for downsampling.
      --downsampling.ratio float                        Ratio of spans passed to storage after downsampling (between 0 and 1), e.g ratio = 0.3 means we are keeping 30% of spans and dropping 70% of spans; ratio = 1.0 disables downsampling. (default 1)
      --health-check-http-port int                      (deprecated, will be removed after 2020-03-15 or in release v1.19.0, whichever is later) see --admin.http.host-port
  -h, --help                                            help for jaeger-ingester
      --ingester.deadlockInterval duration              Interval to check for deadlocks. If no messages gets processed in given time, ingester app will exit. Value of 0 disables deadlock check. (default 0s)
      --ingester.parallelism string                     The number of messages to process in parallel (default "1000")
      --kafka.consumer.authentication string            Authentication type used to authenticate with kafka cluster. e.g. none, kerberos, tls (default "none")
      --kafka.consumer.brokers string                   The comma-separated list of kafka brokers. i.e. '127.0.0.1:9092,0.0.0:1234' (default "127.0.0.1:9092")
      --kafka.consumer.client-id string                 The Consumer Client ID that ingester will use (default "jaeger-ingester")
      --kafka.consumer.encoding string                  The encoding of spans ("json", "protobuf", "zipkin-thrift") consumed from kafka (default "protobuf")
      --kafka.consumer.group-id string                  The Consumer Group that ingester will be consuming on behalf of (default "jaeger-ingester")
      --kafka.consumer.kerberos.config-file string      Path to Kerberos configuration. i.e /etc/krb5.conf (default "/etc/krb5.conf")
      --kafka.consumer.kerberos.keytab-file string      Path to keytab file. i.e /etc/security/kafka.keytab (default "/etc/security/kafka.keytab")
      --kafka.consumer.kerberos.password string         The Kerberos password used for authenticate with KDC
      --kafka.consumer.kerberos.realm string            Kerberos realm
      --kafka.consumer.kerberos.service-name string     Kerberos service name (default "kafka")
      --kafka.consumer.kerberos.use-keytab              Use of keytab instead of password, if this is true, keytab file will be used instead of password
      --kafka.consumer.kerberos.username string         The Kerberos username used for authenticate with KDC
      --kafka.consumer.plaintext.password string        The plaintext Password for SASL/PLAIN authentication
      --kafka.consumer.plaintext.username string        The plaintext Username for SASL/PLAIN authentication
      --kafka.consumer.protocol-version string          Kafka protocol version - must be supported by kafka server
      --kafka.consumer.tls                              (deprecated) see --kafka.consumer.tls.enabled
      --kafka.consumer.tls.ca string                    Path to a TLS CA (Certification Authority) file used to verify the remote server(s) (by default will use the system truststore)
      --kafka.consumer.tls.cert string                  Path to a TLS Certificate file, used to identify this process to the remote server(s)
      --kafka.consumer.tls.enabled                      Enable TLS when talking to the remote server(s)
      --kafka.consumer.tls.key string                   Path to a TLS Private Key file, used to identify this process to the remote server(s)
      --kafka.consumer.tls.server-name string           Override the TLS server name we expect in the certificate of the remote server(s)
      --kafka.consumer.tls.skip-host-verify             (insecure) Skip server's certificate chain and host name verification
      --kafka.consumer.topic string                     The name of the kafka topic to consume from (default "jaeger-spans")
      --log-level string                                Minimal allowed log Level. For more levels see https://github.com/uber-go/zap (default "info")
      --metrics-backend string                          Defines which metrics backend to use for metrics reporting: expvar, prometheus, none (default "prometheus")
      --metrics-http-route string                       Defines the route of HTTP endpoint for metrics backends that support scraping (default "/metrics")
      --span-storage.type string                        (deprecated) please use SPAN_STORAGE_TYPE environment variable. Run this binary with the 'env' command for help.

Use "jaeger-ingester [command] --help" for more information about a command.

flagSet.String(
ConfigPrefix+SuffixParallelism,
strconv.Itoa(DefaultParallelism),
"The number of messages to process in parallel")
flagSet.Duration(
ConfigPrefix+SuffixDeadlockInterval,
DefaultDeadlockInterval,
"Interval to check for deadlocks. If no messages gets processed in given time, ingester app will exit. Value of 0 disables deadlock check.")
AddOTELFlags(flagSet)
}

// AddOTELFlags adds only OTEL flags
func AddOTELFlags(flagSet *flag.FlagSet) {
// Authentication flags
flagSet.String(
KafkaConsumerConfigPrefix+SuffixBrokers,
DefaultBroker,
@@ -101,15 +115,6 @@ func AddFlags(flagSet *flag.FlagSet) {
KafkaConsumerConfigPrefix+SuffixEncoding,
DefaultEncoding,
fmt.Sprintf(`The encoding of spans ("%s") consumed from kafka`, strings.Join(kafka.AllEncodings, "\", \"")))
flagSet.String(
ConfigPrefix+SuffixParallelism,
strconv.Itoa(DefaultParallelism),
"The number of messages to process in parallel")
flagSet.Duration(
ConfigPrefix+SuffixDeadlockInterval,
DefaultDeadlockInterval,
"Interval to check for deadlocks. If no messages gets processed in given time, ingester app will exit. Value of 0 disables deadlock check.")
// Authentication flags
auth.AddFlags(KafkaConsumerConfigPrefix, flagSet)
}

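Note for readers of the hunk above: the Kafka consumer and authentication flags are now registered by AddOTELFlags, so the OTEL-based binaries can pick up only those flags, while the classic ingester keeps its parallelism and deadlock flags through AddFlags. A minimal sketch of how a caller might consume the split follows; only AddOTELFlags comes from this diff, the main function is illustrative, and the flag key is taken from the help output pasted earlier.

package main

import (
	"flag"
	"fmt"

	"github.com/spf13/pflag"
	"github.com/spf13/viper"

	ingesterApp "github.com/jaegertracing/jaeger/cmd/ingester/app"
)

func main() {
	// Register only the Kafka consumer and authentication flags, without the
	// classic ingester's parallelism/deadlock flags.
	goFlags := &flag.FlagSet{}
	ingesterApp.AddOTELFlags(goFlags)

	// Bridge the standard flag set into pflag and bind it to viper so the
	// registered defaults become visible to viper-based factories.
	pflags := &pflag.FlagSet{}
	pflags.AddGoFlagSet(goFlags)

	v := viper.New()
	if err := v.BindPFlags(pflags); err != nil {
		panic(err)
	}

	// Prints "127.0.0.1:9092" unless the flag value is changed.
	fmt.Println(v.GetString("kafka.consumer.brokers"))
}
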
27 changes: 12 additions & 15 deletions cmd/opentelemetry/app/defaultcomponents/defaults.go
@@ -21,12 +21,13 @@ import (
"github.com/spf13/viper"
"go.opentelemetry.io/collector/component"
otelJaegerExporter "go.opentelemetry.io/collector/exporter/jaegerexporter"
otelKafkaExporter "go.opentelemetry.io/collector/exporter/kafkaexporter"
otelResourceProcessor "go.opentelemetry.io/collector/processor/resourceprocessor"
otelJaegerReceiver "go.opentelemetry.io/collector/receiver/jaegerreceiver"
otelKafkaReceiver "go.opentelemetry.io/collector/receiver/kafkareceiver"
otelZipkinReceiver "go.opentelemetry.io/collector/receiver/zipkinreceiver"
"go.opentelemetry.io/collector/service/defaultcomponents"

ingesterApp "github.com/jaegertracing/jaeger/cmd/ingester/app"
"github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter/badgerexporter"
"github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter/cassandraexporter"
"github.com/jaegertracing/jaeger/cmd/opentelemetry/app/exporter/elasticsearchexporter"
@@ -42,7 +43,6 @@ import (
cassandraStorage "github.com/jaegertracing/jaeger/plugin/storage/cassandra"
esStorage "github.com/jaegertracing/jaeger/plugin/storage/es"
grpcStorage "github.com/jaegertracing/jaeger/plugin/storage/grpc"
kafkaStorage "github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

// Components creates default and Jaeger factories
@@ -51,11 +51,6 @@ func Components(v *viper.Viper) component.Factories {
// We have to add all storage flags to viper because any exporter can be specified in the OTEL config file.
// OTEL collector creates default configurations for all factories to verify they can be created.
addDefaultValuesToViper(v)
kafkaExp := &kafkaexporter.Factory{OptionsFactory: func() *kafkaStorage.Options {
opts := kafkaexporter.DefaultOptions()
opts.InitFromViper(v)
return opts
}}
cassandraExp := &cassandraexporter.Factory{OptionsFactory: func() *cassandraStorage.Options {
opts := cassandraexporter.DefaultOptions()
opts.InitFromViper(v)
@@ -77,21 +72,22 @@
opts.InitFromViper(v)
return opts
})
kafkaRec := &kafkareceiver.Factory{OptionsFactory: func() *ingesterApp.Options {
opts := kafkareceiver.DefaultOptions()
opts.InitFromViper(v)
return opts
}}

factories, _ := defaultcomponents.Components()
factories.Exporters[kafkaExp.Type()] = kafkaExp
factories.Exporters[cassandraExp.Type()] = cassandraExp
factories.Exporters[esExp.Type()] = esExp
factories.Exporters[grpcExp.Type()] = grpcExp
factories.Exporters[memoryExp.Type()] = memoryExp
factories.Exporters[badgerExp.Type()] = badgerExp
factories.Receivers[kafkaRec.Type()] = kafkaRec

factories.Receivers[kafkareceiver.TypeStr] = &kafkareceiver.Factory{
Wrapped: otelKafkaReceiver.NewFactory(),
Viper: v,
}
factories.Exporters[kafkaexporter.TypeStr] = &kafkaexporter.Factory{
Wrapped: otelKafkaExporter.NewFactory(),
Viper: v,
}
factories.Receivers["jaeger"] = &jaegerreceiver.Factory{
Wrapped: otelJaegerReceiver.NewFactory(),
Viper: v,
@@ -115,7 +111,8 @@
// addDefaultValuesToViper adds Jaeger storage flags to viper to make the default values available.
func addDefaultValuesToViper(v *viper.Viper) {
flagSet := &flag.FlagSet{}
kafkaexporter.DefaultOptions().AddFlags(flagSet)
kafkareceiver.AddFlags(flagSet)
kafkaexporter.AddFlags(flagSet)
elasticsearchexporter.DefaultOptions().AddFlags(flagSet)
cassandraexporter.DefaultOptions().AddFlags(flagSet)
pflagSet := &pflag.FlagSet{}
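Context for the hunk above: the Jaeger-specific Kafka factories are replaced with thin wrappers around the upstream OTEL kafkareceiver/kafkaexporter factories, carrying a viper instance so Jaeger's existing --kafka.* flags can still shape the defaults. The wrapper types live elsewhere in this PR; the self-contained sketch below only illustrates the delegate-then-overlay idea, with kafkaConfig and upstreamFactory as made-up stand-ins rather than the real collector types.

package main

import (
	"fmt"

	"github.com/spf13/viper"
)

// kafkaConfig is a made-up stand-in for the upstream OTEL Kafka config type.
type kafkaConfig struct {
	Brokers []string
	Topic   string
}

// upstreamFactory stands in for the wrapped OTEL Kafka receiver/exporter factory.
type upstreamFactory struct{}

func (upstreamFactory) CreateDefaultConfig() *kafkaConfig {
	return &kafkaConfig{Brokers: []string{"localhost:9092"}, Topic: "otlp_spans"}
}

// Factory mirrors the Wrapped/Viper pattern registered above: delegate to the
// upstream factory for defaults, then overlay values taken from Jaeger's flags.
type Factory struct {
	Wrapped upstreamFactory
	Viper   *viper.Viper
}

func (f Factory) CreateDefaultConfig() *kafkaConfig {
	cfg := f.Wrapped.CreateDefaultConfig()
	if b := f.Viper.GetString("kafka.consumer.brokers"); b != "" {
		cfg.Brokers = []string{b}
	}
	if t := f.Viper.GetString("kafka.consumer.topic"); t != "" {
		cfg.Topic = t
	}
	return cfg
}

func main() {
	v := viper.New()
	v.Set("kafka.consumer.brokers", "127.0.0.1:9092")
	v.Set("kafka.consumer.topic", "jaeger-spans")

	f := Factory{Viper: v}
	fmt.Printf("%+v\n", *f.CreateDefaultConfig())
}
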
4 changes: 0 additions & 4 deletions cmd/opentelemetry/app/defaultcomponents/defaults_test.go
@@ -34,7 +34,6 @@ import (

func TestComponents(t *testing.T) {
v, _ := jConfig.Viperize(
kafkaexporter.DefaultOptions().AddFlags,
cassandraexporter.DefaultOptions().AddFlags,
elasticsearchexporter.DefaultOptions().AddFlags,
)
@@ -50,9 +49,6 @@ func TestComponents(t *testing.T) {
assert.IsType(t, &kafkareceiver.Factory{}, factories.Receivers[kafkareceiver.TypeStr])
assert.IsType(t, &zipkinreceiver.Factory{}, factories.Receivers["zipkin"])

kafkaFactory := factories.Exporters[kafkaexporter.TypeStr]
kc := kafkaFactory.CreateDefaultConfig().(*kafkaexporter.Config)
assert.Equal(t, []string{"127.0.0.1:9092"}, kc.Config.Brokers)
cassandraFactory := factories.Exporters[cassandraexporter.TypeStr]
cc := cassandraFactory.CreateDefaultConfig().(*cassandraexporter.Config)
assert.Equal(t, []string{"127.0.0.1"}, cc.Options.GetPrimary().Servers)
4 changes: 2 additions & 2 deletions cmd/opentelemetry/app/defaultconfig/default_config.go
@@ -135,7 +135,7 @@ func createProcessors(factories component.Factories) (configmodels.Processors, [

func createReceivers(component ComponentType, factories component.Factories) configmodels.Receivers {
if component == Ingester {
kafkaReceiver := factories.Receivers[kafkareceiver.TypeStr].CreateDefaultConfig().(*kafkareceiver.Config)
kafkaReceiver := factories.Receivers[kafkareceiver.TypeStr].CreateDefaultConfig()
return configmodels.Receivers{
kafkaReceiver.Name(): kafkaReceiver,
}
@@ -194,7 +194,7 @@ func createExporters(component ComponentType, storageTypes string, factories com
exporters[elasticsearchexporter.TypeStr] = es
case "kafka":
kaf := factories.Exporters[kafkaexporter.TypeStr].CreateDefaultConfig()
exporters[kafkaexporter.TypeStr] = kaf
exporters["kafka"] = kaf
case "grpc-plugin":
grpcEx := factories.Exporters[grpcpluginexporter.TypeStr].CreateDefaultConfig()
exporters[grpcpluginexporter.TypeStr] = grpcEx
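For the second hunk above: the kafka storage type now takes the upstream exporter's default config as-is and registers it under the plain "kafka" key. A rough, self-contained sketch of that storage-type dispatch follows; exporterConfig and the defaultConfigs map are placeholders, not the real configmodels or factories types.

package main

import (
	"fmt"
	"strings"
)

// exporterConfig is an illustrative placeholder for an exporter's default config.
type exporterConfig struct{ Type string }

// defaultConfigs stands in for factories.Exporters[...].CreateDefaultConfig().
var defaultConfigs = map[string]func() exporterConfig{
	"kafka":         func() exporterConfig { return exporterConfig{Type: "kafka"} },
	"elasticsearch": func() exporterConfig { return exporterConfig{Type: "elasticsearch"} },
	"cassandra":     func() exporterConfig { return exporterConfig{Type: "cassandra"} },
}

// createExporters mirrors the dispatch above: every storage type listed in
// SPAN_STORAGE_TYPE gets the matching exporter's default config, and the Kafka
// entry is keyed simply as "kafka" (the upstream OTEL exporter type).
func createExporters(storageTypes string) map[string]exporterConfig {
	exporters := map[string]exporterConfig{}
	for _, s := range strings.Split(storageTypes, ",") {
		if factory, ok := defaultConfigs[s]; ok {
			exporters[s] = factory()
		}
	}
	return exporters
}

func main() {
	fmt.Println(createExporters("kafka,elasticsearch"))
}
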
27 changes: 0 additions & 27 deletions cmd/opentelemetry/app/exporter/kafkaexporter/config.go

This file was deleted.

94 changes: 0 additions & 94 deletions cmd/opentelemetry/app/exporter/kafkaexporter/config_test.go

This file was deleted.

34 changes: 0 additions & 34 deletions cmd/opentelemetry/app/exporter/kafkaexporter/exporter.go

This file was deleted.
