Skip to content

Commit

Permalink
Configure Jaeger receiver and exporter by flags
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <[email protected]>
  • Loading branch information
pavolloffay committed May 12, 2020
1 parent cd19b64 commit 460c611
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 25 deletions.
16 changes: 8 additions & 8 deletions cmd/collector/app/builder_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ const (
collectorNumWorkers = "collector.num-workers"
collectorHTTPPort = "collector.http-port"
collectorGRPCPort = "collector.grpc-port"
collectorHTTPHostPort = "collector.http-server.host-port"
collectorGRPCHostPort = "collector.grpc-server.host-port"
CollectorHTTPHostPort = "collector.http-server.host-port"
CollectorGRPCHostPort = "collector.grpc-server.host-port"
collectorZipkinHTTPPort = "collector.zipkin.http-port"
collectorZipkinHTTPHostPort = "collector.zipkin.host-port"
collectorTags = "collector.tags"
Expand Down Expand Up @@ -79,11 +79,11 @@ type CollectorOptions struct {
func AddFlags(flags *flag.FlagSet) {
flags.Int(collectorQueueSize, DefaultQueueSize, "The queue size of the collector")
flags.Int(collectorNumWorkers, DefaultNumWorkers, "The number of workers pulling items from the queue")
flags.Int(collectorHTTPPort, 0, collectorHTTPPortWarning+" see --"+collectorHTTPHostPort)
flags.Int(collectorGRPCPort, 0, collectorGRPCPortWarning+" see --"+collectorGRPCHostPort)
flags.Int(collectorHTTPPort, 0, collectorHTTPPortWarning+" see --"+CollectorHTTPHostPort)
flags.Int(collectorGRPCPort, 0, collectorGRPCPortWarning+" see --"+CollectorGRPCHostPort)
flags.Int(collectorZipkinHTTPPort, 0, collectorZipkinHTTPPortWarning+" see --"+collectorZipkinHTTPHostPort)
flags.String(collectorHTTPHostPort, ports.PortToHostPort(ports.CollectorHTTP), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the collector's HTTP server")
flags.String(collectorGRPCHostPort, ports.PortToHostPort(ports.CollectorGRPC), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the collector's GRPC server")
flags.String(CollectorHTTPHostPort, ports.PortToHostPort(ports.CollectorHTTP), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the collector's HTTP server")
flags.String(CollectorGRPCHostPort, ports.PortToHostPort(ports.CollectorGRPC), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the collector's GRPC server")
flags.String(collectorZipkinHTTPHostPort, ports.PortToHostPort(0), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the collector's Zipkin server")
flags.Uint(collectorDynQueueSizeMemory, 0, "(experimental) The max memory size in MiB to use for the dynamic queue.")
flags.String(collectorTags, "", "One or more tags to be added to the Process tags of all spans passing through this collector. Ex: key1=value1,key2=${envVar:defaultValue}")
Expand All @@ -97,8 +97,8 @@ func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper) *CollectorOptions {
cOpts.DynQueueSizeMemory = v.GetUint(collectorDynQueueSizeMemory) * 1024 * 1024 // we receive in MiB and store in bytes
cOpts.QueueSize = v.GetInt(collectorQueueSize)
cOpts.NumWorkers = v.GetInt(collectorNumWorkers)
cOpts.CollectorHTTPHostPort = getAddressFromCLIOptions(v.GetInt(collectorHTTPPort), v.GetString(collectorHTTPHostPort))
cOpts.CollectorGRPCHostPort = getAddressFromCLIOptions(v.GetInt(collectorGRPCPort), v.GetString(collectorGRPCHostPort))
cOpts.CollectorHTTPHostPort = getAddressFromCLIOptions(v.GetInt(collectorHTTPPort), v.GetString(CollectorHTTPHostPort))
cOpts.CollectorGRPCHostPort = getAddressFromCLIOptions(v.GetInt(collectorGRPCPort), v.GetString(CollectorGRPCHostPort))
cOpts.CollectorZipkinHTTPHostPort = getAddressFromCLIOptions(v.GetInt(collectorZipkinHTTPPort), v.GetString(collectorZipkinHTTPHostPort))
cOpts.CollectorTags = flags.ParseJaegerTags(v.GetString(collectorTags))
cOpts.CollectorZipkinAllowedOrigins = v.GetString(collectorZipkinAllowedOrigins)
Expand Down
31 changes: 19 additions & 12 deletions cmd/opentelemetry-collector/app/defaults/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,24 @@ func CollectorConfig(storageType string, zipkinHostPort string, factories config

func createCollectorReceivers(zipkinHostPort string, factories config.Factories) configmodels.Receivers {
jaeger := factories.Receivers["jaeger"].CreateDefaultConfig().(*jaegerreceiver.Config)
// TODO load and serve sampling strategies
// TODO bind sampling strategies file
jaeger.Protocols = map[string]*receiver.SecureReceiverSettings{
"grpc": {
if jaeger.Protocols == nil {
jaeger.Protocols = map[string]*receiver.SecureReceiverSettings{}
}
// The CreateDefaultConfig is enabling protocols from flags
// we do not want to override it here
if _, ok := jaeger.Protocols["grpc"]; !ok {
jaeger.Protocols["grpc"] = &receiver.SecureReceiverSettings{
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: gRPCEndpoint,
},
},
"thrift_http": {
}
}
if _, ok := jaeger.Protocols["thrift_http"]; !ok {
jaeger.Protocols["thrift_http"] = &receiver.SecureReceiverSettings{
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: httpThriftBinaryEndpoint,
},
},
}
}
recvs := map[string]configmodels.Receiver{
"jaeger": jaeger,
Expand Down Expand Up @@ -159,17 +164,19 @@ func AgentConfig(factories config.Factories) *configmodels.Config {

func createAgentReceivers(factories config.Factories) configmodels.Receivers {
jaeger := factories.Receivers["jaeger"].CreateDefaultConfig().(*jaegerreceiver.Config)
jaeger.Protocols = map[string]*receiver.SecureReceiverSettings{
"thrift_compact": {
if _, ok := jaeger.Protocols["thrift_compact"]; !ok {
jaeger.Protocols["thrift_compact"] = &receiver.SecureReceiverSettings{
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: udpThriftCompactEndpoint,
},
},
"thrift_binary": {
}
}
if _, ok := jaeger.Protocols["thrift_binary"]; !ok {
jaeger.Protocols["thrift_binary"] = &receiver.SecureReceiverSettings{
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: udpThriftBinaryEndpoint,
},
},
}
}
recvs := configmodels.Receivers{
"jaeger": jaeger,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ func (f Factory) CreateDefaultConfig() configmodels.Exporter {
if len(repCfg.CollectorHostPorts) > 0 {
cfg.Endpoint = repCfg.CollectorHostPorts[0]
}
cfg.GRPCSettings.UseSecure = repCfg.TLS.Enabled
cfg.GRPCSettings.CertPemFile = repCfg.TLS.CertPath
cfg.GRPCSettings.ServerNameOverride = repCfg.TLS.ServerName
cfg.GRPCSettings.TLSConfig.UseSecure = repCfg.TLS.Enabled
cfg.GRPCSettings.TLSConfig.CaCert = repCfg.TLS.CAPath
cfg.GRPCSettings.TLSConfig.ClientCert = repCfg.TLS.CertPath
cfg.GRPCSettings.TLSConfig.ClientKey = repCfg.TLS.KeyPath
cfg.GRPCSettings.TLSConfig.ServerNameOverride = repCfg.TLS.ServerName
return cfg
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,21 @@ import (
"github.com/open-telemetry/opentelemetry-collector/component"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/receiver"
"github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver"
"github.com/spf13/viper"

agentApp "github.com/jaegertracing/jaeger/cmd/agent/app"
"github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc"
collectorApp "github.com/jaegertracing/jaeger/cmd/collector/app"
"github.com/jaegertracing/jaeger/plugin/sampling/strategystore/static"
)

const (
thriftBinaryHostPort = "processor.jaeger-binary.server-host-port"
thriftCompactHostPort = "processor.jaeger-compact.server-host-port"
)

// Factory wraps jaegerreceiver.Factory and makes the default config configurable via viper.
// For instance this enables using flags as default values in the config object.
type Factory struct {
Expand All @@ -48,9 +56,55 @@ func (f *Factory) Type() configmodels.Type {
func (f *Factory) CreateDefaultConfig() configmodels.Receiver {
cfg := f.Wrapped.CreateDefaultConfig().(*jaegerreceiver.Config)
cfg.RemoteSampling = createDefaultSamplingConfig(f.Viper)
configureAgent(f.Viper, cfg)
configureCollector(f.Viper, cfg)
return cfg
}

func configureAgent(v *viper.Viper, cfg *jaegerreceiver.Config) {
aOpts := agentApp.Builder{}
aOpts.InitFromViper(v)
if v.IsSet(thriftBinaryHostPort) {
cfg.Protocols["thrift_binary"] = &receiver.SecureReceiverSettings{
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: v.GetString(thriftBinaryHostPort),
},
}
}
if v.IsSet(thriftCompactHostPort) {
cfg.Protocols["thrift_compact"] = &receiver.SecureReceiverSettings{
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: v.GetString(thriftCompactHostPort),
},
}
}
}

func configureCollector(v *viper.Viper, cfg *jaegerreceiver.Config) {
cOpts := collectorApp.CollectorOptions{}
cOpts.InitFromViper(v)
if v.IsSet(collectorApp.CollectorGRPCHostPort) {
cfg.Protocols["grpc"] = &receiver.SecureReceiverSettings{
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: cOpts.CollectorGRPCHostPort,
},
}
if cOpts.TLS.ClientCAPath != "" && cOpts.TLS.KeyPath != "" {
cfg.Protocols["grpc"].TLSCredentials = &receiver.TLSCredentials{
KeyFile: cOpts.TLS.KeyPath,
CertFile: cOpts.TLS.CertPath,
}
}
}
if v.IsSet(collectorApp.CollectorHTTPHostPort) {
cfg.Protocols["thrift_http"] = &receiver.SecureReceiverSettings{
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: cOpts.CollectorHTTPHostPort,
},
}
}
}

func createDefaultSamplingConfig(v *viper.Viper) *jaegerreceiver.RemoteSamplingConfig {
var samplingConf *jaegerreceiver.RemoteSamplingConfig
strategyFile := v.GetString(static.SamplingStrategiesFile)
Expand All @@ -68,7 +122,12 @@ func createDefaultSamplingConfig(v *viper.Viper) *jaegerreceiver.RemoteSamplingC
if samplingConf == nil {
samplingConf = &jaegerreceiver.RemoteSamplingConfig{}
}
samplingConf.FetchEndpoint = repCfg.CollectorHostPorts[0]
samplingConf.GRPCSettings.Endpoint = repCfg.CollectorHostPorts[0]
samplingConf.GRPCSettings.TLSConfig.UseSecure = repCfg.TLS.Enabled
samplingConf.GRPCSettings.TLSConfig.CaCert = repCfg.TLS.CAPath
samplingConf.GRPCSettings.TLSConfig.ClientCert = repCfg.TLS.CertPath
samplingConf.GRPCSettings.TLSConfig.ClientKey = repCfg.TLS.KeyPath
samplingConf.GRPCSettings.TLSConfig.ServerNameOverride = repCfg.TLS.ServerName
}
return samplingConf
}
Expand Down
1 change: 1 addition & 0 deletions cmd/opentelemetry-collector/cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func main() {
handleErr(err)

cmd := svc.Command()
// TODO add collector HTTP thrift and gRPC server host port flags
jconfig.AddFlags(v,
cmd,
jflags.AddConfigFileFlag,
Expand Down
1 change: 1 addition & 0 deletions cmd/opentelemetry-collector/cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func main() {
handleErr(err)
}
cmd := svc.Command()
// TODO add agent UDP processors flags
jConfig.AddFlags(v,
cmd,
collectorApp.AddFlags,
Expand Down
2 changes: 1 addition & 1 deletion cmd/opentelemetry-collector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/Shopify/sarama v1.22.2-0.20190604114437-cd910a683f9f
github.com/imdario/mergo v0.3.9
github.com/jaegertracing/jaeger v1.17.0
github.com/open-telemetry/opentelemetry-collector v0.3.1-0.20200505202444-021607d68586
github.com/open-telemetry/opentelemetry-collector v0.3.1-0.20200512031848-f588c89b4778
github.com/open-telemetry/opentelemetry-proto v0.3.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.6.2
Expand Down
4 changes: 4 additions & 0 deletions cmd/opentelemetry-collector/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -954,6 +954,8 @@ github.com/onsi/gomega v1.9.0 h1:R1uwffexN6Pr340GtYRIdZmAiN4J+iw6WG4wog1DUXg=
github.com/onsi/gomega v1.9.0/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA=
github.com/open-telemetry/opentelemetry-collector v0.3.1-0.20200505202444-021607d68586 h1:HD0/Opa/WiWaG+B6LP+5XT9lYqpErWd6DjH3ah9mQrU=
github.com/open-telemetry/opentelemetry-collector v0.3.1-0.20200505202444-021607d68586/go.mod h1:ht/uOm+HLoXWjhq6seX/BoTwlR0dtCKFe2Y23YRk2a4=
github.com/open-telemetry/opentelemetry-collector v0.3.1-0.20200512031848-f588c89b4778 h1:wcE5bWypMq95vlZGikUklYB6VPyg3pke/32nei5/Sww=
github.com/open-telemetry/opentelemetry-collector v0.3.1-0.20200512031848-f588c89b4778/go.mod h1:+q6GyqO1FHatlq93uplydIVjRhBrv04oQ23AbOqcn6E=
github.com/open-telemetry/opentelemetry-proto v0.3.0 h1:+ASAtcayvoELyCF40+rdCMlBOhZIn5TPDez85zSYc30=
github.com/open-telemetry/opentelemetry-proto v0.3.0/go.mod h1:PMR5GI0F7BSpio+rBGFxNm6SLzg3FypDTcFuQZnO+F8=
github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
Expand All @@ -968,6 +970,8 @@ github.com/openzipkin/zipkin-go v0.2.1 h1:noL5/5Uf1HpVl3wNsfkZhIKbSWCVi5jgqkONNx
github.com/openzipkin/zipkin-go v0.2.1/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4=
github.com/orijtech/prometheus-go-metrics-exporter v0.0.4 h1:AAHKuTu2lX4eMPKV+CRcTug9KdMZ/Ckez7KK+ddEQfM=
github.com/orijtech/prometheus-go-metrics-exporter v0.0.4/go.mod h1:BiTx/ugZex8LheBk3j53tktWaRdFjV5FCfT2o0P7msE=
github.com/orijtech/prometheus-go-metrics-exporter v0.0.5 h1:76JFgRIgNDA3pW1fUhmqinU2u5ndHv1gvapDfGG+7/c=
github.com/orijtech/prometheus-go-metrics-exporter v0.0.5/go.mod h1:BiTx/ugZex8LheBk3j53tktWaRdFjV5FCfT2o0P7msE=
github.com/ory/dockertest v3.3.5+incompatible/go.mod h1:1vX4m9wsvi00u5bseYwXaSnhNrne+V0E6LAcBILJdPs=
github.com/ory/dockertest/v3 v3.5.4/go.mod h1:J8ZUbNB2FOhm1cFZW9xBpDsODqsSWcyYgtJYVPcnF70=
github.com/ory/fosite v0.29.0/go.mod h1:0atSZmXO7CAcs6NPMI/Qtot8tmZYj04Nddoold4S2h0=
Expand Down

0 comments on commit 460c611

Please sign in to comment.