From 992cdc57871e26186511670ca001f26d215ce143 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Tue, 30 Jun 2020 08:36:23 -0700 Subject: [PATCH] Refactor jaeger receiver config to allow different configs for different protocols (#1208) Signed-off-by: Bogdan Drutu --- receiver/jaegerreceiver/config.go | 14 +- receiver/jaegerreceiver/config_test.go | 42 +++--- receiver/jaegerreceiver/factory.go | 129 +++++++++--------- receiver/jaegerreceiver/factory_test.go | 88 ++++++------ .../testdata/bad_proto_config.yaml | 3 +- .../bad_typo_default_proto_config.yaml | 20 +++ receiver/jaegerreceiver/trace_receiver.go | 39 ++---- .../jaegerreceiver/trace_receiver_test.go | 8 +- testbed/testbed/receivers.go | 4 +- testbed/testbed/senders.go | 19 +-- 10 files changed, 180 insertions(+), 186 deletions(-) create mode 100644 receiver/jaegerreceiver/testdata/bad_typo_default_proto_config.yaml diff --git a/receiver/jaegerreceiver/config.go b/receiver/jaegerreceiver/config.go index dc93ab3f41e8..406cc8d4a6ab 100644 --- a/receiver/jaegerreceiver/config.go +++ b/receiver/jaegerreceiver/config.go @@ -16,6 +16,7 @@ package jaegerreceiver import ( "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/config/configprotocol" ) @@ -30,9 +31,16 @@ type RemoteSamplingConfig struct { configgrpc.GRPCClientSettings `mapstructure:",squash"` } +type Protocols struct { + GRPC *configgrpc.GRPCServerSettings `mapstructure:"grpc"` + ThriftHTTP *confighttp.HTTPServerSettings `mapstructure:"thrift_http"` + ThriftBinary *configprotocol.ProtocolServerSettings `mapstructure:"thrift_binary"` + ThriftCompact *configprotocol.ProtocolServerSettings `mapstructure:"thrift_compact"` +} + // Config defines configuration for Jaeger receiver. type Config struct { - configmodels.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct - Protocols map[string]*configprotocol.ProtocolServerSettings `mapstructure:"protocols"` - RemoteSampling *RemoteSamplingConfig `mapstructure:"remote_sampling"` + configmodels.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct + Protocols `mapstructure:"protocols"` + RemoteSampling *RemoteSamplingConfig `mapstructure:"remote_sampling"` } diff --git a/receiver/jaegerreceiver/config_test.go b/receiver/jaegerreceiver/config_test.go index ada5c87cfabb..1b87b12d39cf 100644 --- a/receiver/jaegerreceiver/config_test.go +++ b/receiver/jaegerreceiver/config_test.go @@ -23,6 +23,7 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/config/configprotocol" "go.opentelemetry.io/collector/config/configtls" @@ -48,17 +49,17 @@ func TestLoadConfig(t *testing.T) { TypeVal: typeStr, NameVal: "jaeger/customname", }, - Protocols: map[string]*configprotocol.ProtocolServerSettings{ - "grpc": { + Protocols: Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ Endpoint: "localhost:9876", }, - "thrift_http": { + ThriftHTTP: &confighttp.HTTPServerSettings{ Endpoint: ":3456", }, - "thrift_compact": { + ThriftCompact: &configprotocol.ProtocolServerSettings{ Endpoint: "0.0.0.0:456", }, - "thrift_binary": { + ThriftBinary: &configprotocol.ProtocolServerSettings{ Endpoint: "0.0.0.0:789", }, }, @@ -78,17 +79,17 @@ func TestLoadConfig(t *testing.T) { TypeVal: typeStr, NameVal: "jaeger/defaults", }, - Protocols: map[string]*configprotocol.ProtocolServerSettings{ - "grpc": { + Protocols: Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ Endpoint: defaultGRPCBindEndpoint, }, - "thrift_http": { + ThriftHTTP: &confighttp.HTTPServerSettings{ Endpoint: defaultHTTPBindEndpoint, }, - "thrift_compact": { + ThriftCompact: &configprotocol.ProtocolServerSettings{ Endpoint: defaultThriftCompactBindEndpoint, }, - "thrift_binary": { + ThriftBinary: &configprotocol.ProtocolServerSettings{ Endpoint: defaultThriftBinaryBindEndpoint, }, }, @@ -101,11 +102,11 @@ func TestLoadConfig(t *testing.T) { TypeVal: typeStr, NameVal: "jaeger/mixed", }, - Protocols: map[string]*configprotocol.ProtocolServerSettings{ - "grpc": { + Protocols: Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ Endpoint: "localhost:9876", }, - "thrift_compact": { + ThriftCompact: &configprotocol.ProtocolServerSettings{ Endpoint: defaultThriftCompactBindEndpoint, }, }, @@ -119,8 +120,8 @@ func TestLoadConfig(t *testing.T) { TypeVal: typeStr, NameVal: "jaeger/tls", }, - Protocols: map[string]*configprotocol.ProtocolServerSettings{ - "grpc": { + Protocols: Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ Endpoint: "localhost:9876", TLSCredentials: &configtls.TLSServerSetting{ TLSSetting: configtls.TLSSetting{ @@ -129,7 +130,7 @@ func TestLoadConfig(t *testing.T) { }, }, }, - "thrift_http": { + ThriftHTTP: &confighttp.HTTPServerSettings{ Endpoint: ":3456", }, }, @@ -142,12 +143,15 @@ func TestFailedLoadConfig(t *testing.T) { factory := &Factory{} factories.Receivers[typeStr] = factory + _, err = config.LoadConfigFile(t, path.Join(".", "testdata", "bad_typo_default_proto_config.yaml"), factories) + assert.EqualError(t, err, "error reading settings for receiver type \"jaeger\": unknown protocols in the Jaeger receiver") + _, err = config.LoadConfigFile(t, path.Join(".", "testdata", "bad_proto_config.yaml"), factories) - assert.EqualError(t, err, `error reading settings for receiver type "jaeger": unknown Jaeger protocol badproto`) + assert.EqualError(t, err, "error reading settings for receiver type \"jaeger\": 1 error(s) decoding:\n\n* 'protocols' has invalid keys: thrift_htttp") _, err = config.LoadConfigFile(t, path.Join(".", "testdata", "bad_no_proto_config.yaml"), factories) - assert.EqualError(t, err, `error reading settings for receiver type "jaeger": must specify at least one protocol when using the Jaeger receiver`) + assert.EqualError(t, err, "error reading settings for receiver type \"jaeger\": must specify at least one protocol when using the Jaeger receiver") _, err = config.LoadConfigFile(t, path.Join(".", "testdata", "bad_empty_config.yaml"), factories) - assert.EqualError(t, err, `error reading settings for receiver type "jaeger": empty config for Jaeger receiver`) + assert.EqualError(t, err, "error reading settings for receiver type \"jaeger\": empty config for Jaeger receiver") } diff --git a/receiver/jaegerreceiver/factory.go b/receiver/jaegerreceiver/factory.go index 3dce5667c8eb..58aec8d0a027 100644 --- a/receiver/jaegerreceiver/factory.go +++ b/receiver/jaegerreceiver/factory.go @@ -23,11 +23,11 @@ import ( "strconv" "github.com/spf13/viper" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configerror" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/config/configprotocol" "go.opentelemetry.io/collector/consumer" @@ -44,9 +44,8 @@ const ( protoThriftCompact = "thrift_compact" // Default endpoints to bind to. - defaultGRPCBindEndpoint = "0.0.0.0:14250" - defaultHTTPBindEndpoint = "0.0.0.0:14268" - + defaultGRPCBindEndpoint = "0.0.0.0:14250" + defaultHTTPBindEndpoint = "0.0.0.0:14268" defaultThriftCompactBindEndpoint = "0.0.0.0:6831" defaultThriftBinaryBindEndpoint = "0.0.0.0:6832" defaultAgentRemoteSamplingHTTPPort = 5778 @@ -68,7 +67,10 @@ func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler { return fmt.Errorf("empty config for Jaeger receiver") } - // first load the config normally + componentViperSection.SetConfigType("yaml") + + // UnmarshalExact will not set struct properties to nil even if no key is provided, + // so set the protocol structs to nil where the keys were omitted. err := componentViperSection.UnmarshalExact(intoCfg) if err != nil { return err @@ -79,20 +81,38 @@ func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler { return fmt.Errorf("config type not *jaegerreceiver.Config") } - // next manually search for protocols in viper that do not appear in the normally loaded config - // these protocols were excluded during normal loading and we need to add defaults for them protocols := componentViperSection.GetStringMap(protocolsFieldName) if len(protocols) == 0 { return fmt.Errorf("must specify at least one protocol when using the Jaeger receiver") } - for k := range protocols { - if _, ok := receiverCfg.Protocols[k]; !ok { - if receiverCfg.Protocols[k], err = defaultsForProtocol(k); err != nil { - return err - } - } - } + knownProtocols := 0 + if _, ok := protocols[protoGRPC]; !ok { + receiverCfg.GRPC = nil + } else { + knownProtocols++ + } + if _, ok := protocols[protoThriftHTTP]; !ok { + receiverCfg.ThriftHTTP = nil + } else { + knownProtocols++ + } + if _, ok := protocols[protoThriftBinary]; !ok { + receiverCfg.ThriftBinary = nil + } else { + knownProtocols++ + } + if _, ok := protocols[protoThriftCompact]; !ok { + receiverCfg.ThriftCompact = nil + } else { + knownProtocols++ + } + // UnmarshalExact will ignore empty entries like a protocol with no values, so if a typo happened + // in the protocol that is intended to be enabled will not be enabled. So check if the protocols + // include only known protocols. + if len(protocols) != knownProtocols { + return fmt.Errorf("unknown protocols in the Jaeger receiver") + } return nil } } @@ -104,7 +124,20 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver { TypeVal: typeStr, NameVal: typeStr, }, - Protocols: map[string]*configprotocol.ProtocolServerSettings{}, + Protocols: Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ + Endpoint: defaultGRPCBindEndpoint, + }, + ThriftHTTP: &confighttp.HTTPServerSettings{ + Endpoint: defaultHTTPBindEndpoint, + }, + ThriftBinary: &configprotocol.ProtocolServerSettings{ + Endpoint: defaultThriftBinaryBindEndpoint, + }, + ThriftCompact: &configprotocol.ProtocolServerSettings{ + Endpoint: defaultThriftCompactBindEndpoint, + }, + }, } } @@ -120,53 +153,43 @@ func (f *Factory) CreateTraceReceiver( // that Jaeger receiver understands. rCfg := cfg.(*Config) - - protoGRPC := rCfg.Protocols[protoGRPC] - protoHTTP := rCfg.Protocols[protoThriftHTTP] - protoThriftCompact := rCfg.Protocols[protoThriftCompact] - protoThriftBinary := rCfg.Protocols[protoThriftBinary] remoteSamplingConfig := rCfg.RemoteSampling config := Configuration{} - var grpcServerOptions []grpc.ServerOption // Set ports - if protoGRPC != nil { + if rCfg.Protocols.GRPC != nil { var err error - config.CollectorGRPCPort, err = extractPortFromEndpoint(protoGRPC.Endpoint) + config.CollectorGRPCPort, err = extractPortFromEndpoint(rCfg.Protocols.GRPC.Endpoint) if err != nil { return nil, err } - if protoGRPC.TLSCredentials != nil { - tlsCfg, err := protoGRPC.TLSCredentials.LoadTLSConfig() - if err != nil { - return nil, fmt.Errorf("failed to configure TLS: %v", err) - } - grpcServerOptions = append(grpcServerOptions, grpc.Creds(credentials.NewTLS(tlsCfg))) + config.CollectorGRPCOptions, err = rCfg.Protocols.GRPC.ToServerOption() + if err != nil { + return nil, err } - config.CollectorGRPCOptions = grpcServerOptions } - if protoHTTP != nil { + if rCfg.Protocols.ThriftHTTP != nil { var err error - config.CollectorHTTPPort, err = extractPortFromEndpoint(protoHTTP.Endpoint) + config.CollectorHTTPPort, err = extractPortFromEndpoint(rCfg.Protocols.ThriftHTTP.Endpoint) if err != nil { return nil, err } } - if protoThriftBinary != nil { + if rCfg.Protocols.ThriftBinary != nil { var err error - config.AgentBinaryThriftPort, err = extractPortFromEndpoint(protoThriftBinary.Endpoint) + config.AgentBinaryThriftPort, err = extractPortFromEndpoint(rCfg.Protocols.ThriftBinary.Endpoint) if err != nil { return nil, err } } - if protoThriftCompact != nil { + if rCfg.Protocols.ThriftCompact != nil { var err error - config.AgentCompactThriftPort, err = extractPortFromEndpoint(protoThriftCompact.Endpoint) + config.AgentCompactThriftPort, err = extractPortFromEndpoint(rCfg.Protocols.ThriftCompact.Endpoint) if err != nil { return nil, err } @@ -198,13 +221,13 @@ func (f *Factory) CreateTraceReceiver( } } - if (protoGRPC == nil && protoHTTP == nil && protoThriftBinary == nil && protoThriftCompact == nil) || + if (rCfg.Protocols.GRPC == nil && rCfg.Protocols.ThriftHTTP == nil && rCfg.Protocols.ThriftBinary == nil && rCfg.Protocols.ThriftCompact == nil) || (config.CollectorGRPCPort == 0 && config.CollectorHTTPPort == 0 && config.CollectorThriftPort == 0 && config.AgentBinaryThriftPort == 0 && config.AgentCompactThriftPort == 0) { err := fmt.Errorf("either %v, %v, %v, or %v protocol endpoint with non-zero port must be enabled for %s receiver", - protoGRPC, - protoThriftHTTP, - protoThriftCompact, - protoThriftBinary, + rCfg.Protocols.GRPC, + rCfg.Protocols.ThriftHTTP, + rCfg.Protocols.ThriftCompact, + rCfg.Protocols.ThriftBinary, typeStr, ) return nil, err @@ -240,25 +263,3 @@ func extractPortFromEndpoint(endpoint string) (int, error) { } return int(port), nil } - -// returns a default value for a protocol name. this really just boils down to the endpoint -func defaultsForProtocol(proto string) (*configprotocol.ProtocolServerSettings, error) { - var defaultEndpoint string - - switch proto { - case protoGRPC: - defaultEndpoint = defaultGRPCBindEndpoint - case protoThriftHTTP: - defaultEndpoint = defaultHTTPBindEndpoint - case protoThriftBinary: - defaultEndpoint = defaultThriftBinaryBindEndpoint - case protoThriftCompact: - defaultEndpoint = defaultThriftCompactBindEndpoint - default: - return nil, fmt.Errorf("unknown Jaeger protocol %s", proto) - } - - return &configprotocol.ProtocolServerSettings{ - Endpoint: defaultEndpoint, - }, nil -} diff --git a/receiver/jaegerreceiver/factory_test.go b/receiver/jaegerreceiver/factory_test.go index 81b3d07f9720..bad249831752 100644 --- a/receiver/jaegerreceiver/factory_test.go +++ b/receiver/jaegerreceiver/factory_test.go @@ -27,6 +27,7 @@ import ( "go.opentelemetry.io/collector/config/configcheck" "go.opentelemetry.io/collector/config/configerror" "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configprotocol" "go.opentelemetry.io/collector/config/configtls" ) @@ -48,8 +49,9 @@ func TestCreateReceiver(t *testing.T) { factory := Factory{} cfg := factory.CreateDefaultConfig() // have to enable at least one protocol for the jaeger receiver to be created - cfg.(*Config).Protocols[protoGRPC], _ = defaultsForProtocol(protoGRPC) - + cfg.(*Config).Protocols.GRPC = &configgrpc.GRPCServerSettings{ + Endpoint: defaultGRPCBindEndpoint, + } params := component.ReceiverCreateParams{Logger: zap.NewNop()} tReceiver, err := factory.CreateTraceReceiver(context.Background(), params, cfg, nil) assert.NoError(t, err, "receiver creation failed") @@ -64,9 +66,10 @@ func TestCreateReceiver(t *testing.T) { func TestCreateDefaultGRPCEndpoint(t *testing.T) { factory := Factory{} cfg := factory.CreateDefaultConfig() - rCfg := cfg.(*Config) - rCfg.Protocols[protoGRPC], _ = defaultsForProtocol(protoGRPC) + cfg.(*Config).Protocols.GRPC = &configgrpc.GRPCServerSettings{ + Endpoint: defaultGRPCBindEndpoint, + } params := component.ReceiverCreateParams{Logger: zap.NewNop()} r, err := factory.CreateTraceReceiver(context.Background(), params, cfg, nil) @@ -77,13 +80,14 @@ func TestCreateDefaultGRPCEndpoint(t *testing.T) { func TestCreateTLSGPRCEndpoint(t *testing.T) { factory := Factory{} cfg := factory.CreateDefaultConfig() - rCfg := cfg.(*Config) - rCfg.Protocols[protoGRPC], _ = defaultsForProtocol(protoGRPC) - rCfg.Protocols[protoGRPC].TLSCredentials = &configtls.TLSServerSetting{ - TLSSetting: configtls.TLSSetting{ - CertFile: "./testdata/certificate.pem", - KeyFile: "./testdata/key.pem", + cfg.(*Config).Protocols.GRPC = &configgrpc.GRPCServerSettings{ + Endpoint: defaultGRPCBindEndpoint, + TLSCredentials: &configtls.TLSServerSetting{ + TLSSetting: configtls.TLSSetting{ + CertFile: "./testdata/certificate.pem", + KeyFile: "./testdata/key.pem", + }, }, } params := component.ReceiverCreateParams{Logger: zap.NewNop()} @@ -95,9 +99,10 @@ func TestCreateTLSGPRCEndpoint(t *testing.T) { func TestCreateInvalidHTTPEndpoint(t *testing.T) { factory := Factory{} cfg := factory.CreateDefaultConfig() - rCfg := cfg.(*Config) - rCfg.Protocols[protoThriftHTTP], _ = defaultsForProtocol(protoThriftHTTP) + cfg.(*Config).Protocols.ThriftHTTP = &confighttp.HTTPServerSettings{ + Endpoint: defaultHTTPBindEndpoint, + } params := component.ReceiverCreateParams{Logger: zap.NewNop()} r, err := factory.CreateTraceReceiver(context.Background(), params, cfg, nil) @@ -108,9 +113,10 @@ func TestCreateInvalidHTTPEndpoint(t *testing.T) { func TestCreateInvalidThriftBinaryEndpoint(t *testing.T) { factory := Factory{} cfg := factory.CreateDefaultConfig() - rCfg := cfg.(*Config) - rCfg.Protocols[protoThriftBinary], _ = defaultsForProtocol(protoThriftBinary) + cfg.(*Config).Protocols.ThriftBinary = &configprotocol.ProtocolServerSettings{ + Endpoint: defaultThriftBinaryBindEndpoint, + } params := component.ReceiverCreateParams{Logger: zap.NewNop()} r, err := factory.CreateTraceReceiver(context.Background(), params, cfg, nil) @@ -121,9 +127,10 @@ func TestCreateInvalidThriftBinaryEndpoint(t *testing.T) { func TestCreateInvalidThriftCompactEndpoint(t *testing.T) { factory := Factory{} cfg := factory.CreateDefaultConfig() - rCfg := cfg.(*Config) - rCfg.Protocols[protoThriftCompact], _ = defaultsForProtocol(protoThriftCompact) + cfg.(*Config).Protocols.ThriftCompact = &configprotocol.ProtocolServerSettings{ + Endpoint: defaultThriftCompactBindEndpoint, + } params := component.ReceiverCreateParams{Logger: zap.NewNop()} r, err := factory.CreateTraceReceiver(context.Background(), params, cfg, nil) @@ -136,7 +143,9 @@ func TestDefaultAgentRemoteSamplingEndpointAndPort(t *testing.T) { cfg := factory.CreateDefaultConfig() rCfg := cfg.(*Config) - rCfg.Protocols[protoThriftCompact], _ = defaultsForProtocol(protoThriftCompact) + rCfg.Protocols.ThriftCompact = &configprotocol.ProtocolServerSettings{ + Endpoint: defaultThriftCompactBindEndpoint, + } rCfg.RemoteSampling = &RemoteSamplingConfig{} params := component.ReceiverCreateParams{Logger: zap.NewNop()} r, err := factory.CreateTraceReceiver(context.Background(), params, cfg, nil) @@ -152,7 +161,9 @@ func TestAgentRemoteSamplingEndpoint(t *testing.T) { rCfg := cfg.(*Config) endpoint := "localhost:1234" - rCfg.Protocols[protoThriftCompact], _ = defaultsForProtocol(protoThriftCompact) + rCfg.Protocols.ThriftCompact = &configprotocol.ProtocolServerSettings{ + Endpoint: defaultThriftCompactBindEndpoint, + } rCfg.RemoteSampling = &RemoteSamplingConfig{ GRPCClientSettings: configgrpc.GRPCClientSettings{ Endpoint: endpoint, @@ -169,9 +180,8 @@ func TestAgentRemoteSamplingEndpoint(t *testing.T) { func TestCreateNoPort(t *testing.T) { factory := Factory{} cfg := factory.CreateDefaultConfig() - rCfg := cfg.(*Config) - rCfg.Protocols[protoThriftHTTP] = &configprotocol.ProtocolServerSettings{ + cfg.(*Config).Protocols.ThriftHTTP = &confighttp.HTTPServerSettings{ Endpoint: "localhost:", } params := component.ReceiverCreateParams{Logger: zap.NewNop()} @@ -182,9 +192,8 @@ func TestCreateNoPort(t *testing.T) { func TestCreateLargePort(t *testing.T) { factory := Factory{} cfg := factory.CreateDefaultConfig() - rCfg := cfg.(*Config) - rCfg.Protocols[protoThriftHTTP] = &configprotocol.ProtocolServerSettings{ + cfg.(*Config).Protocols.ThriftHTTP = &confighttp.HTTPServerSettings{ Endpoint: "localhost:65536", } params := component.ReceiverCreateParams{Logger: zap.NewNop()} @@ -195,11 +204,11 @@ func TestCreateLargePort(t *testing.T) { func TestCreateInvalidHost(t *testing.T) { factory := Factory{} cfg := factory.CreateDefaultConfig() - rCfg := cfg.(*Config) - rCfg.Protocols[protoGRPC] = &configprotocol.ProtocolServerSettings{ + cfg.(*Config).Protocols.GRPC = &configgrpc.GRPCServerSettings{ Endpoint: "1234", } + params := component.ReceiverCreateParams{Logger: zap.NewNop()} _, err := factory.CreateTraceReceiver(context.Background(), params, cfg, nil) assert.Error(t, err, "receiver creation with bad hostname must fail") @@ -208,10 +217,8 @@ func TestCreateInvalidHost(t *testing.T) { func TestCreateNoProtocols(t *testing.T) { factory := Factory{} cfg := factory.CreateDefaultConfig() - rCfg := cfg.(*Config) - - rCfg.Protocols = make(map[string]*configprotocol.ProtocolServerSettings) + cfg.(*Config).Protocols = Protocols{} params := component.ReceiverCreateParams{Logger: zap.NewNop()} _, err := factory.CreateTraceReceiver(context.Background(), params, cfg, nil) assert.Error(t, err, "receiver creation with no protocols must fail") @@ -220,12 +227,10 @@ func TestCreateNoProtocols(t *testing.T) { func TestThriftBinaryBadPort(t *testing.T) { factory := Factory{} cfg := factory.CreateDefaultConfig() - rCfg := cfg.(*Config) - rCfg.Protocols[protoThriftBinary] = &configprotocol.ProtocolServerSettings{ + cfg.(*Config).Protocols.ThriftBinary = &configprotocol.ProtocolServerSettings{ Endpoint: "localhost:65536", } - params := component.ReceiverCreateParams{Logger: zap.NewNop()} _, err := factory.CreateTraceReceiver(context.Background(), params, cfg, nil) assert.Error(t, err, "receiver creation with a bad thrift binary port must fail") @@ -234,9 +239,8 @@ func TestThriftBinaryBadPort(t *testing.T) { func TestThriftCompactBadPort(t *testing.T) { factory := Factory{} cfg := factory.CreateDefaultConfig() - rCfg := cfg.(*Config) - rCfg.Protocols[protoThriftCompact] = &configprotocol.ProtocolServerSettings{ + cfg.(*Config).Protocols.ThriftCompact = &configprotocol.ProtocolServerSettings{ Endpoint: "localhost:65536", } @@ -253,7 +257,9 @@ func TestRemoteSamplingConfigPropagation(t *testing.T) { hostPort := 5778 endpoint := "localhost:1234" strategyFile := "strategies.json" - rCfg.Protocols[protoGRPC], _ = defaultsForProtocol(protoGRPC) + rCfg.Protocols.GRPC = &configgrpc.GRPCServerSettings{ + Endpoint: defaultGRPCBindEndpoint, + } rCfg.RemoteSampling = &RemoteSamplingConfig{ GRPCClientSettings: configgrpc.GRPCClientSettings{ Endpoint: endpoint, @@ -275,10 +281,13 @@ func TestRemoteSamplingFileRequiresGRPC(t *testing.T) { cfg := factory.CreateDefaultConfig() rCfg := cfg.(*Config) - strategyFile := "strategies.json" - rCfg.Protocols[protoThriftCompact], _ = defaultsForProtocol(protoThriftCompact) + // Remove all default protocols + rCfg.Protocols = Protocols{} + rCfg.Protocols.ThriftCompact = &configprotocol.ProtocolServerSettings{ + Endpoint: defaultThriftCompactBindEndpoint, + } rCfg.RemoteSampling = &RemoteSamplingConfig{ - StrategyFile: strategyFile, + StrategyFile: "strategies.json", } params := component.ReceiverCreateParams{Logger: zap.NewNop()} _, err := factory.CreateTraceReceiver(context.Background(), params, cfg, nil) @@ -298,10 +307,3 @@ func TestCustomUnmarshalErrors(t *testing.T) { err = f(config.NewViper(), &RemoteSamplingConfig{}) assert.Error(t, err, "should not have been able to marshal to a non-jaegerreceiver config") } - -func TestDefaultsForProtocolError(t *testing.T) { - d, err := defaultsForProtocol("badproto") - - assert.Nil(t, d, "defaultsForProtocol should have returned nil") - assert.Error(t, err, "defaultsForProtocol should have errored") -} diff --git a/receiver/jaegerreceiver/testdata/bad_proto_config.yaml b/receiver/jaegerreceiver/testdata/bad_proto_config.yaml index b9593fcf2339..a87240a9303b 100644 --- a/receiver/jaegerreceiver/testdata/bad_proto_config.yaml +++ b/receiver/jaegerreceiver/testdata/bad_proto_config.yaml @@ -2,7 +2,8 @@ receivers: # The following demonstrates how to enable protocols with defaults jaeger: protocols: - badproto: + thrift_htttp: + endpoint: "127.0.0.1:123" processors: exampleprocessor: diff --git a/receiver/jaegerreceiver/testdata/bad_typo_default_proto_config.yaml b/receiver/jaegerreceiver/testdata/bad_typo_default_proto_config.yaml new file mode 100644 index 000000000000..753510fd62dd --- /dev/null +++ b/receiver/jaegerreceiver/testdata/bad_typo_default_proto_config.yaml @@ -0,0 +1,20 @@ +receivers: + # The following demonstrates how to enable protocols with defaults + jaeger: + protocols: + grpc: + endpoint: "127.0.0.1:123" + thrift_htttp: + +processors: + exampleprocessor: + +exporters: + exampleexporter: + +service: + pipelines: + traces: + receivers: [jaeger] + processors: [exampleprocessor] + exporters: [exampleexporter] diff --git a/receiver/jaegerreceiver/trace_receiver.go b/receiver/jaegerreceiver/trace_receiver.go index 09dafc2ba558..93d262d9f00c 100644 --- a/receiver/jaegerreceiver/trace_receiver.go +++ b/receiver/jaegerreceiver/trace_receiver.go @@ -15,9 +15,7 @@ package jaegerreceiver import ( - "bytes" "context" - "errors" "fmt" "io/ioutil" "mime" @@ -53,11 +51,6 @@ import ( jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger" ) -var ( - batchSubmitNotOkResponse = &jaeger.BatchSubmitResponse{} - batchSubmitOkResponse = &jaeger.BatchSubmitResponse{Ok: true} -) - // Configuration defines the behavior and the ports that // the Jaeger receiver will use. type Configuration struct { @@ -256,42 +249,26 @@ func (jr *jReceiver) stopTraceReceptionLocked() error { return } // Otherwise combine all these errors - buf := new(bytes.Buffer) - for _, err := range errs { - fmt.Fprintf(buf, "%s\n", err.Error()) - } - err = errors.New(buf.String()) + err = componenterror.CombineErrors(errs) }) return err } -func consumeTraces( - ctx context.Context, - batches []*jaeger.Batch, - consumer consumer.TraceConsumer, -) ([]*jaeger.BatchSubmitResponse, int, error) { - - jbsr := make([]*jaeger.BatchSubmitResponse, 0, len(batches)) - var consumerError error +func consumeTraces(ctx context.Context, batches []*jaeger.Batch, consumer consumer.TraceConsumer) (int, error) { + var consumerErrors []error numSpans := 0 for _, batch := range batches { numSpans += len(batch.Spans) - if consumerError != nil { - jbsr = append(jbsr, batchSubmitNotOkResponse) - continue - } td := jaegertranslator.ThriftBatchToInternalTraces(batch) - consumerError = consumer.ConsumeTraces(ctx, td) - jsr := batchSubmitOkResponse - if consumerError != nil { - jsr = batchSubmitNotOkResponse + err := consumer.ConsumeTraces(ctx, td) + if err != nil { + consumerErrors = append(consumerErrors, err) } - jbsr = append(jbsr, jsr) } - return jbsr, numSpans, consumerError + return numSpans, componenterror.CombineErrors(consumerErrors) } var _ jaeger.Agent = (*agentHandler)(nil) @@ -493,7 +470,7 @@ func (jr *jReceiver) HandleThriftHTTPBatch(w http.ResponseWriter, r *http.Reques return } - _, numSpans, err := consumeTraces(ctx, []*jaeger.Batch{batch}, jr.nextConsumer) + numSpans, err := consumeTraces(ctx, []*jaeger.Batch{batch}, jr.nextConsumer) if err != nil { http.Error(w, fmt.Sprintf("Cannot submit Jaeger batch: %v", err), http.StatusInternalServerError) } else { diff --git a/receiver/jaegerreceiver/trace_receiver_test.go b/receiver/jaegerreceiver/trace_receiver_test.go index e29b276995d9..5de0adc7cffe 100644 --- a/receiver/jaegerreceiver/trace_receiver_test.go +++ b/receiver/jaegerreceiver/trace_receiver_test.go @@ -45,7 +45,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configgrpc" - "go.opentelemetry.io/collector/config/configprotocol" + "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/exporter/exportertest" @@ -594,10 +594,8 @@ func TestSamplingStrategiesMutualTLS(t *testing.T) { // at least one protocol has to be enabled thriftHTTPPort, err := randomAvailablePort() require.NoError(t, err) - cfg.Protocols = map[string]*configprotocol.ProtocolServerSettings{ - "thrift_http": { - Endpoint: fmt.Sprintf("localhost:%d", thriftHTTPPort), - }, + cfg.Protocols.ThriftHTTP = &confighttp.HTTPServerSettings{ + Endpoint: fmt.Sprintf("localhost:%d", thriftHTTPPort), } exp, err := factory.CreateTraceReceiver(context.Background(), component.ReceiverCreateParams{Logger: zap.NewNop()}, cfg, exportertest.NewNopTraceExporter()) require.NoError(t, err) diff --git a/testbed/testbed/receivers.go b/testbed/testbed/receivers.go index debb726016bc..6b30e785dfd9 100644 --- a/testbed/testbed/receivers.go +++ b/testbed/testbed/receivers.go @@ -22,8 +22,8 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configmodels" - "go.opentelemetry.io/collector/config/configprotocol" "go.opentelemetry.io/collector/receiver/jaegerreceiver" "go.opentelemetry.io/collector/receiver/opencensusreceiver" "go.opentelemetry.io/collector/receiver/otlpreceiver" @@ -147,7 +147,7 @@ func (jr *JaegerDataReceiver) Start(tc *MockTraceConsumer, _ *MockMetricConsumer factory := jaegerreceiver.Factory{} cfg := factory.CreateDefaultConfig().(*jaegerreceiver.Config) cfg.SetName(jr.ProtocolName()) - cfg.Protocols["grpc"] = &configprotocol.ProtocolServerSettings{ + cfg.Protocols.GRPC = &configgrpc.GRPCServerSettings{ Endpoint: fmt.Sprintf("localhost:%d", jr.Port), } var err error diff --git a/testbed/testbed/senders.go b/testbed/testbed/senders.go index 1994c8cd96de..33b5818231f5 100644 --- a/testbed/testbed/senders.go +++ b/testbed/testbed/senders.go @@ -157,28 +157,11 @@ func (je *JaegerGRPCDataSender) Start() error { } func (je *JaegerGRPCDataSender) GenConfigYAMLStr() string { - // Note that this generates a receiver config for agent. - // We only need to enable gRPC protocol because that's what we use in tests. - // Due to bug in Jaeger receiver (https://go.opentelemetry.io/collector/issues/445) - // which makes it impossible to disable protocols that we don't need to receive on we - // have to use fake ports for all endpoints except gRPC, otherwise it is - // impossible to start the Collector because the standard ports for those protocols - // are already listened by mock Jaeger backend that is part of the tests. - // As soon as the bug is fixed remove the endpoints and use "disabled: true" setting - // instead. return fmt.Sprintf(` jaeger: protocols: grpc: - endpoint: "%s:%d" - thrift_tchannel: - endpoint: "localhost:8372" - thrift_compact: - endpoint: "localhost:8373" - thrift_binary: - endpoint: "localhost:8374" - thrift_http: - endpoint: "localhost:8375"`, je.Host, je.Port) + endpoint: "%s:%d"`, je.Host, je.Port) } func (je *JaegerGRPCDataSender) ProtocolName() string {