Skip to content

Commit

Permalink
Refactor jaeger receiver config to allow different configs for differ…
Browse files Browse the repository at this point in the history
…ent protocols (#1208)

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Jun 30, 2020
1 parent 2200590 commit 992cdc5
Show file tree
Hide file tree
Showing 10 changed files with 180 additions and 186 deletions.
14 changes: 11 additions & 3 deletions receiver/jaegerreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package jaegerreceiver

import (
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configprotocol"
)
Expand All @@ -30,9 +31,16 @@ type RemoteSamplingConfig struct {
configgrpc.GRPCClientSettings `mapstructure:",squash"`
}

type Protocols struct {
GRPC *configgrpc.GRPCServerSettings `mapstructure:"grpc"`
ThriftHTTP *confighttp.HTTPServerSettings `mapstructure:"thrift_http"`
ThriftBinary *configprotocol.ProtocolServerSettings `mapstructure:"thrift_binary"`
ThriftCompact *configprotocol.ProtocolServerSettings `mapstructure:"thrift_compact"`
}

// Config defines configuration for Jaeger receiver.
type Config struct {
configmodels.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
Protocols map[string]*configprotocol.ProtocolServerSettings `mapstructure:"protocols"`
RemoteSampling *RemoteSamplingConfig `mapstructure:"remote_sampling"`
configmodels.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
Protocols `mapstructure:"protocols"`
RemoteSampling *RemoteSamplingConfig `mapstructure:"remote_sampling"`
}
42 changes: 23 additions & 19 deletions receiver/jaegerreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configprotocol"
"go.opentelemetry.io/collector/config/configtls"
Expand All @@ -48,17 +49,17 @@ func TestLoadConfig(t *testing.T) {
TypeVal: typeStr,
NameVal: "jaeger/customname",
},
Protocols: map[string]*configprotocol.ProtocolServerSettings{
"grpc": {
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
Endpoint: "localhost:9876",
},
"thrift_http": {
ThriftHTTP: &confighttp.HTTPServerSettings{
Endpoint: ":3456",
},
"thrift_compact": {
ThriftCompact: &configprotocol.ProtocolServerSettings{
Endpoint: "0.0.0.0:456",
},
"thrift_binary": {
ThriftBinary: &configprotocol.ProtocolServerSettings{
Endpoint: "0.0.0.0:789",
},
},
Expand All @@ -78,17 +79,17 @@ func TestLoadConfig(t *testing.T) {
TypeVal: typeStr,
NameVal: "jaeger/defaults",
},
Protocols: map[string]*configprotocol.ProtocolServerSettings{
"grpc": {
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
Endpoint: defaultGRPCBindEndpoint,
},
"thrift_http": {
ThriftHTTP: &confighttp.HTTPServerSettings{
Endpoint: defaultHTTPBindEndpoint,
},
"thrift_compact": {
ThriftCompact: &configprotocol.ProtocolServerSettings{
Endpoint: defaultThriftCompactBindEndpoint,
},
"thrift_binary": {
ThriftBinary: &configprotocol.ProtocolServerSettings{
Endpoint: defaultThriftBinaryBindEndpoint,
},
},
Expand All @@ -101,11 +102,11 @@ func TestLoadConfig(t *testing.T) {
TypeVal: typeStr,
NameVal: "jaeger/mixed",
},
Protocols: map[string]*configprotocol.ProtocolServerSettings{
"grpc": {
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
Endpoint: "localhost:9876",
},
"thrift_compact": {
ThriftCompact: &configprotocol.ProtocolServerSettings{
Endpoint: defaultThriftCompactBindEndpoint,
},
},
Expand All @@ -119,8 +120,8 @@ func TestLoadConfig(t *testing.T) {
TypeVal: typeStr,
NameVal: "jaeger/tls",
},
Protocols: map[string]*configprotocol.ProtocolServerSettings{
"grpc": {
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
Endpoint: "localhost:9876",
TLSCredentials: &configtls.TLSServerSetting{
TLSSetting: configtls.TLSSetting{
Expand All @@ -129,7 +130,7 @@ func TestLoadConfig(t *testing.T) {
},
},
},
"thrift_http": {
ThriftHTTP: &confighttp.HTTPServerSettings{
Endpoint: ":3456",
},
},
Expand All @@ -142,12 +143,15 @@ func TestFailedLoadConfig(t *testing.T) {

factory := &Factory{}
factories.Receivers[typeStr] = factory
_, err = config.LoadConfigFile(t, path.Join(".", "testdata", "bad_typo_default_proto_config.yaml"), factories)
assert.EqualError(t, err, "error reading settings for receiver type \"jaeger\": unknown protocols in the Jaeger receiver")

_, err = config.LoadConfigFile(t, path.Join(".", "testdata", "bad_proto_config.yaml"), factories)
assert.EqualError(t, err, `error reading settings for receiver type "jaeger": unknown Jaeger protocol badproto`)
assert.EqualError(t, err, "error reading settings for receiver type \"jaeger\": 1 error(s) decoding:\n\n* 'protocols' has invalid keys: thrift_htttp")

_, err = config.LoadConfigFile(t, path.Join(".", "testdata", "bad_no_proto_config.yaml"), factories)
assert.EqualError(t, err, `error reading settings for receiver type "jaeger": must specify at least one protocol when using the Jaeger receiver`)
assert.EqualError(t, err, "error reading settings for receiver type \"jaeger\": must specify at least one protocol when using the Jaeger receiver")

_, err = config.LoadConfigFile(t, path.Join(".", "testdata", "bad_empty_config.yaml"), factories)
assert.EqualError(t, err, `error reading settings for receiver type "jaeger": empty config for Jaeger receiver`)
assert.EqualError(t, err, "error reading settings for receiver type \"jaeger\": empty config for Jaeger receiver")
}
129 changes: 65 additions & 64 deletions receiver/jaegerreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import (
"strconv"

"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configerror"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configprotocol"
"go.opentelemetry.io/collector/consumer"
Expand All @@ -44,9 +44,8 @@ const (
protoThriftCompact = "thrift_compact"

// Default endpoints to bind to.
defaultGRPCBindEndpoint = "0.0.0.0:14250"
defaultHTTPBindEndpoint = "0.0.0.0:14268"

defaultGRPCBindEndpoint = "0.0.0.0:14250"
defaultHTTPBindEndpoint = "0.0.0.0:14268"
defaultThriftCompactBindEndpoint = "0.0.0.0:6831"
defaultThriftBinaryBindEndpoint = "0.0.0.0:6832"
defaultAgentRemoteSamplingHTTPPort = 5778
Expand All @@ -68,7 +67,10 @@ func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler {
return fmt.Errorf("empty config for Jaeger receiver")
}

// first load the config normally
componentViperSection.SetConfigType("yaml")

// UnmarshalExact will not set struct properties to nil even if no key is provided,
// so set the protocol structs to nil where the keys were omitted.
err := componentViperSection.UnmarshalExact(intoCfg)
if err != nil {
return err
Expand All @@ -79,20 +81,38 @@ func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler {
return fmt.Errorf("config type not *jaegerreceiver.Config")
}

// next manually search for protocols in viper that do not appear in the normally loaded config
// these protocols were excluded during normal loading and we need to add defaults for them
protocols := componentViperSection.GetStringMap(protocolsFieldName)
if len(protocols) == 0 {
return fmt.Errorf("must specify at least one protocol when using the Jaeger receiver")
}
for k := range protocols {
if _, ok := receiverCfg.Protocols[k]; !ok {
if receiverCfg.Protocols[k], err = defaultsForProtocol(k); err != nil {
return err
}
}
}

knownProtocols := 0
if _, ok := protocols[protoGRPC]; !ok {
receiverCfg.GRPC = nil
} else {
knownProtocols++
}
if _, ok := protocols[protoThriftHTTP]; !ok {
receiverCfg.ThriftHTTP = nil
} else {
knownProtocols++
}
if _, ok := protocols[protoThriftBinary]; !ok {
receiverCfg.ThriftBinary = nil
} else {
knownProtocols++
}
if _, ok := protocols[protoThriftCompact]; !ok {
receiverCfg.ThriftCompact = nil
} else {
knownProtocols++
}
// UnmarshalExact will ignore empty entries like a protocol with no values, so if a typo happened
// in the protocol that is intended to be enabled will not be enabled. So check if the protocols
// include only known protocols.
if len(protocols) != knownProtocols {
return fmt.Errorf("unknown protocols in the Jaeger receiver")
}
return nil
}
}
Expand All @@ -104,7 +124,20 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver {
TypeVal: typeStr,
NameVal: typeStr,
},
Protocols: map[string]*configprotocol.ProtocolServerSettings{},
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
Endpoint: defaultGRPCBindEndpoint,
},
ThriftHTTP: &confighttp.HTTPServerSettings{
Endpoint: defaultHTTPBindEndpoint,
},
ThriftBinary: &configprotocol.ProtocolServerSettings{
Endpoint: defaultThriftBinaryBindEndpoint,
},
ThriftCompact: &configprotocol.ProtocolServerSettings{
Endpoint: defaultThriftCompactBindEndpoint,
},
},
}
}

Expand All @@ -120,53 +153,43 @@ func (f *Factory) CreateTraceReceiver(
// that Jaeger receiver understands.

rCfg := cfg.(*Config)

protoGRPC := rCfg.Protocols[protoGRPC]
protoHTTP := rCfg.Protocols[protoThriftHTTP]
protoThriftCompact := rCfg.Protocols[protoThriftCompact]
protoThriftBinary := rCfg.Protocols[protoThriftBinary]
remoteSamplingConfig := rCfg.RemoteSampling

config := Configuration{}
var grpcServerOptions []grpc.ServerOption

// Set ports
if protoGRPC != nil {
if rCfg.Protocols.GRPC != nil {
var err error
config.CollectorGRPCPort, err = extractPortFromEndpoint(protoGRPC.Endpoint)
config.CollectorGRPCPort, err = extractPortFromEndpoint(rCfg.Protocols.GRPC.Endpoint)
if err != nil {
return nil, err
}

if protoGRPC.TLSCredentials != nil {
tlsCfg, err := protoGRPC.TLSCredentials.LoadTLSConfig()
if err != nil {
return nil, fmt.Errorf("failed to configure TLS: %v", err)
}
grpcServerOptions = append(grpcServerOptions, grpc.Creds(credentials.NewTLS(tlsCfg)))
config.CollectorGRPCOptions, err = rCfg.Protocols.GRPC.ToServerOption()
if err != nil {
return nil, err
}
config.CollectorGRPCOptions = grpcServerOptions
}

if protoHTTP != nil {
if rCfg.Protocols.ThriftHTTP != nil {
var err error
config.CollectorHTTPPort, err = extractPortFromEndpoint(protoHTTP.Endpoint)
config.CollectorHTTPPort, err = extractPortFromEndpoint(rCfg.Protocols.ThriftHTTP.Endpoint)
if err != nil {
return nil, err
}
}

if protoThriftBinary != nil {
if rCfg.Protocols.ThriftBinary != nil {
var err error
config.AgentBinaryThriftPort, err = extractPortFromEndpoint(protoThriftBinary.Endpoint)
config.AgentBinaryThriftPort, err = extractPortFromEndpoint(rCfg.Protocols.ThriftBinary.Endpoint)
if err != nil {
return nil, err
}
}

if protoThriftCompact != nil {
if rCfg.Protocols.ThriftCompact != nil {
var err error
config.AgentCompactThriftPort, err = extractPortFromEndpoint(protoThriftCompact.Endpoint)
config.AgentCompactThriftPort, err = extractPortFromEndpoint(rCfg.Protocols.ThriftCompact.Endpoint)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -198,13 +221,13 @@ func (f *Factory) CreateTraceReceiver(
}
}

if (protoGRPC == nil && protoHTTP == nil && protoThriftBinary == nil && protoThriftCompact == nil) ||
if (rCfg.Protocols.GRPC == nil && rCfg.Protocols.ThriftHTTP == nil && rCfg.Protocols.ThriftBinary == nil && rCfg.Protocols.ThriftCompact == nil) ||
(config.CollectorGRPCPort == 0 && config.CollectorHTTPPort == 0 && config.CollectorThriftPort == 0 && config.AgentBinaryThriftPort == 0 && config.AgentCompactThriftPort == 0) {
err := fmt.Errorf("either %v, %v, %v, or %v protocol endpoint with non-zero port must be enabled for %s receiver",
protoGRPC,
protoThriftHTTP,
protoThriftCompact,
protoThriftBinary,
rCfg.Protocols.GRPC,
rCfg.Protocols.ThriftHTTP,
rCfg.Protocols.ThriftCompact,
rCfg.Protocols.ThriftBinary,
typeStr,
)
return nil, err
Expand Down Expand Up @@ -240,25 +263,3 @@ func extractPortFromEndpoint(endpoint string) (int, error) {
}
return int(port), nil
}

// returns a default value for a protocol name. this really just boils down to the endpoint
func defaultsForProtocol(proto string) (*configprotocol.ProtocolServerSettings, error) {
var defaultEndpoint string

switch proto {
case protoGRPC:
defaultEndpoint = defaultGRPCBindEndpoint
case protoThriftHTTP:
defaultEndpoint = defaultHTTPBindEndpoint
case protoThriftBinary:
defaultEndpoint = defaultThriftBinaryBindEndpoint
case protoThriftCompact:
defaultEndpoint = defaultThriftCompactBindEndpoint
default:
return nil, fmt.Errorf("unknown Jaeger protocol %s", proto)
}

return &configprotocol.ProtocolServerSettings{
Endpoint: defaultEndpoint,
}, nil
}
Loading

0 comments on commit 992cdc5

Please sign in to comment.