From b732ef40026edaca97956fcf311b97f901c83d92 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Mon, 29 Jun 2020 13:39:49 -0700 Subject: [PATCH] Split OTLP receiver by protocols, allow mTLS support (#1223) Signed-off-by: Bogdan Drutu --- receiver/otlpreceiver/README.md | 35 +- receiver/otlpreceiver/config.go | 36 +- receiver/otlpreceiver/config_test.go | 180 ++++---- receiver/otlpreceiver/factory.go | 102 +++-- receiver/otlpreceiver/factory_test.go | 132 +++--- receiver/otlpreceiver/metrics/otlp.go | 13 +- receiver/otlpreceiver/metrics/otlp_test.go | 82 +++- receiver/otlpreceiver/options.go | 56 --- receiver/otlpreceiver/otlp.go | 265 +++-------- receiver/otlpreceiver/otlp_test.go | 420 +++++------------- .../testdata/bad_empty_config.yaml | 15 + .../testdata/bad_no_proto_config.yaml | 16 + .../testdata/bad_proto_config.yaml | 18 + receiver/otlpreceiver/testdata/config.yaml | 85 ++-- .../testdata/typo_default_proto_config.yaml | 18 + receiver/otlpreceiver/trace/otlp.go | 31 +- receiver/otlpreceiver/trace/otlp_test.go | 90 ++-- service/service_test.go | 2 + service/testdata/otelcol-config-minimal.yaml | 2 + testbed/testbed/receivers.go | 3 +- testbed/testbed/senders.go | 8 +- 21 files changed, 740 insertions(+), 869 deletions(-) delete mode 100644 receiver/otlpreceiver/options.go create mode 100644 receiver/otlpreceiver/testdata/bad_empty_config.yaml create mode 100644 receiver/otlpreceiver/testdata/bad_no_proto_config.yaml create mode 100644 receiver/otlpreceiver/testdata/bad_proto_config.yaml create mode 100644 receiver/otlpreceiver/testdata/typo_default_proto_config.yaml diff --git a/receiver/otlpreceiver/README.md b/receiver/otlpreceiver/README.md index cfba2efbe48..a2073583404 100644 --- a/receiver/otlpreceiver/README.md +++ b/receiver/otlpreceiver/README.md @@ -3,17 +3,28 @@ This is the default receiver for the OpenTelemetry project. To get started, all that is required to enable the OpenTelemetry receiver is to -include it in the receiver definitions. This will enable the default values as -specified [here](./factory.go). +include it in the receiver definitions and defined the enabled protocols. This will +enable the default values as specified [here](./factory.go). The following is an example: ```yaml receivers: otlp: + protocols: + grpc: + http: ``` The full list of settings exposed for this receiver are documented [here](./config.go) with detailed sample configurations [here](./testdata/config.yaml). +A protocol can be disabled by simply not specifying it in the list of protocols: +```yaml +receivers: + otlp/only_grpc: + protocols: + grpc: +``` + ## Communicating over TLS This receiver supports communication using Transport Layer Security (TLS). TLS can be configured by specifying a `tls_credentials` object in the receiver @@ -21,9 +32,11 @@ configuration for receivers that support it. ```yaml receivers: otlp: - tls_credentials: - key_file: /key.pem # path to private key - cert_file: /cert.pem # path to certificate + protocols: + grpc: + tls_credentials: + key_file: /key.pem # path to private key + cert_file: /cert.pem # path to certificate ``` ## Writing with HTTP/JSON @@ -45,9 +58,11 @@ specifying a list of allowed CORS origins in the `cors_allowed_origins` field: ```yaml receivers: otlp: - endpoint: "localhost:55680" - cors_allowed_origins: - - http://test.com - # Origins can have wildcards with *, use * by itself to match any origin. - - https://*.example.com + protocols: + http: + endpoint: "localhost:55681" + cors_allowed_origins: + - http://test.com + # Origins can have wildcards with *, use * by itself to match any origin. + - https://*.example.com ``` diff --git a/receiver/otlpreceiver/config.go b/receiver/otlpreceiver/config.go index 878656c92af..f6dd9f86154 100644 --- a/receiver/otlpreceiver/config.go +++ b/receiver/otlpreceiver/config.go @@ -16,39 +16,19 @@ package otlpreceiver import ( "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configmodels" ) +type Protocols struct { + GRPC *configgrpc.GRPCServerSettings `mapstructure:"grpc"` + HTTP *confighttp.HTTPServerSettings `mapstructure:"http"` +} + // Config defines configuration for OTLP receiver. type Config struct { configmodels.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct - // Configures the receiver server protocol. - configgrpc.GRPCServerSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct - - // Transport to use: one of tcp or unix, defaults to tcp - Transport string `mapstructure:"transport"` - - // CorsOrigins are the allowed CORS origins for HTTP/JSON requests to grpc-gateway adapter - // for the OTLP receiver. See github.com/rs/cors - // An empty list means that CORS is not enabled at all. A wildcard (*) can be - // used to match any origin or one or more characters of an origin. - CorsOrigins []string `mapstructure:"cors_allowed_origins"` -} - -func (rOpts *Config) buildOptions() ([]Option, error) { - var opts []Option - if len(rOpts.CorsOrigins) > 0 { - opts = append(opts, WithCorsOrigins(rOpts.CorsOrigins)) - } - - grpcServerOptions, err := rOpts.GRPCServerSettings.ToServerOption() - if err != nil { - return nil, err - } - if len(grpcServerOptions) > 0 { - opts = append(opts, WithGRPCServerOptions(grpcServerOptions...)) - } - - return opts, nil + // Protocols is the configuration for the supported protocols, currently gRPC and HTTP (Proto and JSON). + Protocols `mapstructure:"protocols"` } diff --git a/receiver/otlpreceiver/config_test.go b/receiver/otlpreceiver/config_test.go index d8fab515087..ece446d55fb 100644 --- a/receiver/otlpreceiver/config_test.go +++ b/receiver/otlpreceiver/config_test.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/config/configtls" ) @@ -39,144 +40,163 @@ func TestLoadConfig(t *testing.T) { require.NoError(t, err) require.NotNil(t, cfg) - assert.Equal(t, len(cfg.Receivers), 7) + assert.Equal(t, len(cfg.Receivers), 9) - r0 := cfg.Receivers["otlp"] - assert.Equal(t, r0, factory.CreateDefaultConfig()) + assert.Equal(t, cfg.Receivers["otlp"], factory.CreateDefaultConfig()) - r1 := cfg.Receivers["otlp/customname"].(*Config) - assert.Equal(t, r1, + defaultOnlyGRPC := factory.CreateDefaultConfig().(*Config) + defaultOnlyGRPC.SetName("otlp/only_grpc") + defaultOnlyGRPC.HTTP = nil + assert.Equal(t, cfg.Receivers["otlp/only_grpc"], defaultOnlyGRPC) + + defaultOnlyHTTP := factory.CreateDefaultConfig().(*Config) + defaultOnlyHTTP.SetName("otlp/only_http") + defaultOnlyHTTP.GRPC = nil + assert.Equal(t, cfg.Receivers["otlp/only_http"], defaultOnlyHTTP) + + assert.Equal(t, cfg.Receivers["otlp/customname"], &Config{ ReceiverSettings: configmodels.ReceiverSettings{ TypeVal: typeStr, NameVal: "otlp/customname", }, - GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "localhost:9090", - ReadBufferSize: 512 * 1024, + Protocols: Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ + Endpoint: "localhost:9090", + ReadBufferSize: 512 * 1024, + }, }, - Transport: "tcp", }) - r2 := cfg.Receivers["otlp/keepalive"].(*Config) - assert.Equal(t, r2, + assert.Equal(t, cfg.Receivers["otlp/keepalive"], &Config{ ReceiverSettings: configmodels.ReceiverSettings{ TypeVal: typeStr, NameVal: "otlp/keepalive", }, - GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "0.0.0.0:55680", - ReadBufferSize: 512 * 1024, - Keepalive: &configgrpc.KeepaliveServerConfig{ - ServerParameters: &configgrpc.KeepaliveServerParameters{ - MaxConnectionIdle: 11 * time.Second, - MaxConnectionAge: 12 * time.Second, - MaxConnectionAgeGrace: 13 * time.Second, - Time: 30 * time.Second, - Timeout: 5 * time.Second, - }, - EnforcementPolicy: &configgrpc.KeepaliveEnforcementPolicy{ - MinTime: 10 * time.Second, - PermitWithoutStream: true, + Protocols: Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ + Endpoint: "0.0.0.0:55680", + ReadBufferSize: 512 * 1024, + Keepalive: &configgrpc.KeepaliveServerConfig{ + ServerParameters: &configgrpc.KeepaliveServerParameters{ + MaxConnectionIdle: 11 * time.Second, + MaxConnectionAge: 12 * time.Second, + MaxConnectionAgeGrace: 13 * time.Second, + Time: 30 * time.Second, + Timeout: 5 * time.Second, + }, + EnforcementPolicy: &configgrpc.KeepaliveEnforcementPolicy{ + MinTime: 10 * time.Second, + PermitWithoutStream: true, + }, }, }, }, - Transport: "tcp", }) - r3 := cfg.Receivers["otlp/msg-size-conc-connect-max-idle"].(*Config) - assert.Equal(t, r3, + assert.Equal(t, cfg.Receivers["otlp/msg-size-conc-connect-max-idle"], &Config{ ReceiverSettings: configmodels.ReceiverSettings{ TypeVal: typeStr, NameVal: "otlp/msg-size-conc-connect-max-idle", }, - GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "0.0.0.0:55680", - MaxRecvMsgSizeMiB: 32, - MaxConcurrentStreams: 16, - ReadBufferSize: 1024, - WriteBufferSize: 1024, - Keepalive: &configgrpc.KeepaliveServerConfig{ - ServerParameters: &configgrpc.KeepaliveServerParameters{ - MaxConnectionIdle: 10 * time.Second, + Protocols: Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ + Endpoint: "0.0.0.0:55680", + MaxRecvMsgSizeMiB: 32, + MaxConcurrentStreams: 16, + ReadBufferSize: 1024, + WriteBufferSize: 1024, + Keepalive: &configgrpc.KeepaliveServerConfig{ + ServerParameters: &configgrpc.KeepaliveServerParameters{ + MaxConnectionIdle: 10 * time.Second, + }, }, }, }, - Transport: "tcp", }) // NOTE: Once the config loader checks for the files existence, this test may fail and require // use of fake cert/key for test purposes. - r4 := cfg.Receivers["otlp/tlscredentials"].(*Config) - assert.Equal(t, r4, + assert.Equal(t, cfg.Receivers["otlp/tlscredentials"], &Config{ ReceiverSettings: configmodels.ReceiverSettings{ TypeVal: typeStr, NameVal: "otlp/tlscredentials", }, - GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "0.0.0.0:55680", - ReadBufferSize: 512 * 1024, - TLSCredentials: &configtls.TLSServerSetting{ - TLSSetting: configtls.TLSSetting{ - CertFile: "test.crt", - KeyFile: "test.key", + Protocols: Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ + Endpoint: "0.0.0.0:55680", + TLSCredentials: &configtls.TLSServerSetting{ + TLSSetting: configtls.TLSSetting{ + CertFile: "test.crt", + KeyFile: "test.key", + }, + }, + ReadBufferSize: 512 * 1024, + }, + HTTP: &confighttp.HTTPServerSettings{ + Endpoint: "0.0.0.0:55681", + TLSSetting: &configtls.TLSServerSetting{ + TLSSetting: configtls.TLSSetting{ + CertFile: "test.crt", + KeyFile: "test.key", + }, }, }, }, - Transport: "tcp", }) - r5 := cfg.Receivers["otlp/cors"].(*Config) - assert.Equal(t, r5, + assert.Equal(t, cfg.Receivers["otlp/cors"], &Config{ ReceiverSettings: configmodels.ReceiverSettings{ TypeVal: typeStr, NameVal: "otlp/cors", }, - GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "0.0.0.0:55680", - ReadBufferSize: 512 * 1024, + Protocols: Protocols{ + HTTP: &confighttp.HTTPServerSettings{ + Endpoint: "0.0.0.0:55681", + CorsOrigins: []string{"https://*.test.com", "https://test.com"}, + }, }, - Transport: "tcp", - CorsOrigins: []string{"https://*.test.com", "https://test.com"}, }) - r6 := cfg.Receivers["otlp/uds"].(*Config) - assert.Equal(t, r6, + assert.Equal(t, cfg.Receivers["otlp/uds"], &Config{ ReceiverSettings: configmodels.ReceiverSettings{ TypeVal: typeStr, NameVal: "otlp/uds", }, - GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "/tmp/otlp.sock", - ReadBufferSize: 512 * 1024, + Protocols: Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ + Endpoint: "/tmp/grpc_otlp.sock", + // Transport: "unix", + ReadBufferSize: 512 * 1024, + }, + HTTP: &confighttp.HTTPServerSettings{ + Endpoint: "/tmp/http_otlp.sock", + // Transport: "unix", + }, }, - Transport: "unix", }) } -func TestBuildOptions_TLSCredentials(t *testing.T) { - cfg := Config{ - ReceiverSettings: configmodels.ReceiverSettings{ - NameVal: "IncorrectTLS", - }, - GRPCServerSettings: configgrpc.GRPCServerSettings{ - TLSCredentials: &configtls.TLSServerSetting{ - TLSSetting: configtls.TLSSetting{ - CertFile: "willfail", - }, - }, - }, - } - _, err := cfg.buildOptions() - assert.EqualError(t, err, `failed to load TLS config: for auth via TLS, either both certificate and key must be supplied, or neither`) - - cfg.TLSCredentials = &configtls.TLSServerSetting{} - opt, err := cfg.buildOptions() +func TestFailedLoadConfig(t *testing.T) { + factories, err := config.ExampleComponents() assert.NoError(t, err) - assert.NotNil(t, opt) + + factory := &Factory{} + factories.Receivers[typeStr] = factory + _, err = config.LoadConfigFile(t, path.Join(".", "testdata", "typo_default_proto_config.yaml"), factories) + assert.EqualError(t, err, `error reading settings for receiver type "otlp": unknown protocols in the OTLP receiver`) + + _, err = config.LoadConfigFile(t, path.Join(".", "testdata", "bad_proto_config.yaml"), factories) + assert.EqualError(t, err, "error reading settings for receiver type \"otlp\": 1 error(s) decoding:\n\n* 'protocols' has invalid keys: thrift") + + _, err = config.LoadConfigFile(t, path.Join(".", "testdata", "bad_no_proto_config.yaml"), factories) + assert.EqualError(t, err, "error reading settings for receiver type \"otlp\": must specify at least one protocol when using the OTLP receiver") + + _, err = config.LoadConfigFile(t, path.Join(".", "testdata", "bad_empty_config.yaml"), factories) + assert.EqualError(t, err, "error reading settings for receiver type \"otlp\": empty config for OTLP receiver") } diff --git a/receiver/otlpreceiver/factory.go b/receiver/otlpreceiver/factory.go index ed321ef95c7..618269f8b1f 100644 --- a/receiver/otlpreceiver/factory.go +++ b/receiver/otlpreceiver/factory.go @@ -16,9 +16,13 @@ package otlpreceiver import ( "context" + "fmt" + + "github.com/spf13/viper" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer" ) @@ -26,6 +30,11 @@ import ( const ( // The value of "type" key in configuration. typeStr = "otlp" + + // Protocol values. + protoGRPC = "grpc" + protoHTTP = "http" + protocolsFieldName = "protocols" ) // Factory is the Factory for receiver. @@ -37,11 +46,6 @@ func (f *Factory) Type() configmodels.Type { return typeStr } -// CustomUnmarshaler returns nil because we don't need custom unmarshaling for this config. -func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler { - return nil -} - // CreateDefaultConfig creates the default configuration for receiver. func (f *Factory) CreateDefaultConfig() configmodels.Receiver { return &Config{ @@ -49,18 +53,69 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver { TypeVal: typeStr, NameVal: typeStr, }, - GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "0.0.0.0:55680", - // We almost write 0 bytes, so no need to tune WriteBufferSize. - ReadBufferSize: 512 * 1024, + Protocols: Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ + Endpoint: "0.0.0.0:55680", + // We almost write 0 bytes, so no need to tune WriteBufferSize. + ReadBufferSize: 512 * 1024, + }, + HTTP: &confighttp.HTTPServerSettings{ + Endpoint: "0.0.0.0:55681", + }, }, - Transport: "tcp", + } +} + +// CustomUnmarshaler is used to add defaults for named but empty protocols +func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler { + return func(componentViperSection *viper.Viper, intoCfg interface{}) error { + if componentViperSection == nil || len(componentViperSection.AllKeys()) == 0 { + return fmt.Errorf("empty config for OTLP receiver") + } + // first load the config normally + err := componentViperSection.UnmarshalExact(intoCfg) + if err != nil { + return err + } + receiverCfg, ok := intoCfg.(*Config) + if !ok { + return fmt.Errorf("config type not *otlpreceiver.Config") + } + + // next manually search for protocols in viper, if a protocol is not present it means it is disable. + protocols := componentViperSection.GetStringMap(protocolsFieldName) + + // UnmarshalExact will ignore empty entries like a protocol with no values, so if a typo happened + // in the protocol that is intended to be enabled will not be enabled. So check if the protocols + // include only known protocols. + knownProtocols := 0 + if _, ok := protocols[protoGRPC]; !ok { + receiverCfg.GRPC = nil + } else { + knownProtocols++ + } + + if _, ok := protocols[protoHTTP]; !ok { + receiverCfg.HTTP = nil + } else { + knownProtocols++ + } + + if len(protocols) != knownProtocols { + return fmt.Errorf("unknown protocols in the OTLP receiver") + } + + if receiverCfg.GRPC == nil && receiverCfg.HTTP == nil { + return fmt.Errorf("must specify at least one protocol when using the OTLP receiver") + } + + return nil } } // CreateTraceReceiver creates a trace receiver based on provided config. func (f *Factory) CreateTraceReceiver( - _ context.Context, + ctx context.Context, _ component.ReceiverCreateParams, cfg configmodels.Receiver, nextConsumer consumer.TraceConsumer, @@ -69,27 +124,26 @@ func (f *Factory) CreateTraceReceiver( if err != nil { return nil, err } - - r.traceConsumer = nextConsumer - + if err = r.registerTraceConsumer(ctx, nextConsumer); err != nil { + return nil, err + } return r, nil } // CreateMetricsReceiver creates a metrics receiver based on provided config. func (f *Factory) CreateMetricsReceiver( - _ context.Context, + ctx context.Context, _ component.ReceiverCreateParams, cfg configmodels.Receiver, consumer consumer.MetricsConsumer, ) (component.MetricsReceiver, error) { - r, err := f.createReceiver(cfg) if err != nil { return nil, err } - - r.metricsConsumer = consumer - + if err = r.registerMetricsConsumer(ctx, consumer); err != nil { + return nil, err + } return r, nil } @@ -102,15 +156,9 @@ func (f *Factory) createReceiver(cfg configmodels.Receiver) (*Receiver, error) { // Check to see if there is already a receiver for this config. receiver, ok := receivers[rCfg] if !ok { - // Build the configuration options. - opts, err := rCfg.buildOptions() - if err != nil { - return nil, err - } - + var err error // We don't have a receiver, so create one. - receiver, err = New( - rCfg.Name(), rCfg.Transport, rCfg.Endpoint, nil, nil, opts...) + receiver, err = New(rCfg) if err != nil { return nil, err } diff --git a/receiver/otlpreceiver/factory_test.go b/receiver/otlpreceiver/factory_test.go index e13c2ced119..6713c22ce7b 100644 --- a/receiver/otlpreceiver/factory_test.go +++ b/receiver/otlpreceiver/factory_test.go @@ -17,7 +17,6 @@ package otlpreceiver import ( "context" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -27,6 +26,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configcheck" "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/testutil" @@ -44,27 +44,30 @@ func TestCreateReceiver(t *testing.T) { cfg := factory.CreateDefaultConfig() config := cfg.(*Config) - config.Endpoint = testutil.GetAvailableLocalAddress(t) + config.GRPC.Endpoint = testutil.GetAvailableLocalAddress(t) + config.HTTP.Endpoint = testutil.GetAvailableLocalAddress(t) creationParams := component.ReceiverCreateParams{Logger: zap.NewNop()} - tReceiver, err := factory.CreateTraceReceiver(context.Background(), creationParams, cfg, nil) + tReceiver, err := factory.CreateTraceReceiver(context.Background(), creationParams, cfg, new(exportertest.SinkTraceExporter)) assert.NotNil(t, tReceiver) assert.NoError(t, err) - mReceiver, err := factory.CreateMetricsReceiver(context.Background(), creationParams, cfg, nil) + mReceiver, err := factory.CreateMetricsReceiver(context.Background(), creationParams, cfg, new(exportertest.SinkMetricsExporter)) assert.NotNil(t, mReceiver) assert.NoError(t, err) } func TestCreateTraceReceiver(t *testing.T) { factory := Factory{} - endpoint := testutil.GetAvailableLocalAddress(t) defaultReceiverSettings := configmodels.ReceiverSettings{ TypeVal: typeStr, NameVal: typeStr, } - defaultGRPCSettings := configgrpc.GRPCServerSettings{ - Endpoint: endpoint, + defaultGRPCSettings := &configgrpc.GRPCServerSettings{ + Endpoint: testutil.GetAvailableLocalAddress(t), + } + defaultHTTPSettings := &confighttp.HTTPServerSettings{ + Endpoint: testutil.GetAvailableLocalAddress(t), } tests := []struct { @@ -75,36 +78,44 @@ func TestCreateTraceReceiver(t *testing.T) { { name: "default", cfg: &Config{ - ReceiverSettings: defaultReceiverSettings, - GRPCServerSettings: defaultGRPCSettings, - Transport: "tcp", + ReceiverSettings: defaultReceiverSettings, + Protocols: Protocols{ + GRPC: defaultGRPCSettings, + HTTP: defaultHTTPSettings, + }, }, }, { - name: "invalid_port", + name: "invalid_grpc_port", cfg: &Config{ ReceiverSettings: configmodels.ReceiverSettings{ TypeVal: typeStr, NameVal: typeStr, }, - GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "localhost:112233", + Protocols: Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ + Endpoint: "localhost:112233", + }, + HTTP: defaultHTTPSettings, }, - Transport: "tcp", }, wantErr: true, }, { - name: "max-msg-size-and-concurrent-connections", + name: "invalid_http_port", cfg: &Config{ - ReceiverSettings: defaultReceiverSettings, - GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: endpoint, - MaxRecvMsgSizeMiB: 32, - MaxConcurrentStreams: 16, + ReceiverSettings: configmodels.ReceiverSettings{ + TypeVal: typeStr, + NameVal: typeStr, + }, + Protocols: Protocols{ + GRPC: defaultGRPCSettings, + HTTP: &confighttp.HTTPServerSettings{ + Endpoint: "localhost:112233", + }, }, - Transport: "tcp", }, + wantErr: true, }, } ctx := context.Background() @@ -113,13 +124,13 @@ func TestCreateTraceReceiver(t *testing.T) { t.Run(tt.name, func(t *testing.T) { sink := new(exportertest.SinkTraceExporter) tr, err := factory.CreateTraceReceiver(ctx, creationParams, tt.cfg, sink) - if (err != nil) != tt.wantErr { - t.Errorf("factory.CreateTraceReceiver() error = %v, wantErr %v", err, tt.wantErr) - return - } - if tr != nil { - require.NoError(t, tr.Start(context.Background(), componenttest.NewNopHost()), "Start() error = %v", err) - tr.Shutdown(context.Background()) + assert.NoError(t, err) + require.NotNil(t, tr) + if tt.wantErr { + assert.Error(t, tr.Start(context.Background(), componenttest.NewNopHost())) + } else { + assert.NoError(t, tr.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, tr.Shutdown(context.Background())) } }) } @@ -127,13 +138,15 @@ func TestCreateTraceReceiver(t *testing.T) { func TestCreateMetricReceiver(t *testing.T) { factory := Factory{} - endpoint := testutil.GetAvailableLocalAddress(t) defaultReceiverSettings := configmodels.ReceiverSettings{ TypeVal: typeStr, NameVal: typeStr, } - defaultGRPCSettings := configgrpc.GRPCServerSettings{ - Endpoint: endpoint, + defaultGRPCSettings := &configgrpc.GRPCServerSettings{ + Endpoint: testutil.GetAvailableLocalAddress(t), + } + defaultHTTPSettings := &confighttp.HTTPServerSettings{ + Endpoint: testutil.GetAvailableLocalAddress(t), } tests := []struct { @@ -144,43 +157,44 @@ func TestCreateMetricReceiver(t *testing.T) { { name: "default", cfg: &Config{ - ReceiverSettings: defaultReceiverSettings, - GRPCServerSettings: defaultGRPCSettings, - Transport: "tcp", + ReceiverSettings: defaultReceiverSettings, + Protocols: Protocols{ + GRPC: defaultGRPCSettings, + HTTP: defaultHTTPSettings, + }, }, }, { - name: "invalid_address", + name: "invalid_grpc_address", cfg: &Config{ ReceiverSettings: configmodels.ReceiverSettings{ TypeVal: typeStr, NameVal: typeStr, }, - GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "327.0.0.1:1122", + Protocols: Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ + Endpoint: "327.0.0.1:1122", + }, + HTTP: defaultHTTPSettings, }, - Transport: "tcp", }, wantErr: true, }, { - name: "keepalive", + name: "invalid_http_address", cfg: &Config{ - ReceiverSettings: defaultReceiverSettings, - GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: endpoint, - Keepalive: &configgrpc.KeepaliveServerConfig{ - ServerParameters: &configgrpc.KeepaliveServerParameters{ - MaxConnectionAge: 60 * time.Second, - }, - EnforcementPolicy: &configgrpc.KeepaliveEnforcementPolicy{ - MinTime: 30 * time.Second, - PermitWithoutStream: true, - }, + ReceiverSettings: configmodels.ReceiverSettings{ + TypeVal: typeStr, + NameVal: typeStr, + }, + Protocols: Protocols{ + GRPC: defaultGRPCSettings, + HTTP: &confighttp.HTTPServerSettings{ + Endpoint: "327.0.0.1:1122", }, }, - Transport: "tcp", }, + wantErr: true, }, } ctx := context.Background() @@ -188,14 +202,14 @@ func TestCreateMetricReceiver(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { sink := new(exportertest.SinkMetricsExporter) - tc, err := factory.CreateMetricsReceiver(ctx, creationParams, tt.cfg, sink) - if (err != nil) != tt.wantErr { - t.Errorf("factory.CreateMetricsReceiver() error = %v, wantErr %v", err, tt.wantErr) - return - } - if tc != nil { - require.NoError(t, tc.Start(context.Background(), componenttest.NewNopHost()), "Start() error = %v", err) - tc.Shutdown(context.Background()) + mr, err := factory.CreateMetricsReceiver(ctx, creationParams, tt.cfg, sink) + assert.NoError(t, err) + require.NotNil(t, mr) + if tt.wantErr { + assert.Error(t, mr.Start(context.Background(), componenttest.NewNopHost())) + } else { + require.NoError(t, mr.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, mr.Shutdown(context.Background())) } }) } diff --git a/receiver/otlpreceiver/metrics/otlp.go b/receiver/otlpreceiver/metrics/otlp.go index 375dd97a6f7..7023a8467bb 100644 --- a/receiver/otlpreceiver/metrics/otlp.go +++ b/receiver/otlpreceiver/metrics/otlp.go @@ -17,7 +17,7 @@ package metrics import ( "context" - "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdatautil" "go.opentelemetry.io/collector/internal/data" @@ -36,15 +36,12 @@ type Receiver struct { } // New creates a new Receiver reference. -func New(instanceName string, nextConsumer consumer.MetricsConsumer) (*Receiver, error) { - if nextConsumer == nil { - return nil, componenterror.ErrNilNextConsumer - } +func New(instanceName string, nextConsumer consumer.MetricsConsumer) *Receiver { r := &Receiver{ instanceName: instanceName, nextConsumer: nextConsumer, } - return r, nil + return r } const ( @@ -71,6 +68,10 @@ func (r *Receiver) sendToNextConsumer(ctx context.Context, md data.MetricData) e return nil } + if c, ok := client.FromGRPC(ctx); ok { + ctx = client.NewContext(ctx, c) + } + ctx = obsreport.StartMetricsReceiveOp(ctx, r.instanceName, receiverTransport) err := r.nextConsumer.ConsumeMetrics(ctx, pdatautil.MetricsFromInternalMetrics(md)) obsreport.EndMetricsReceiveOp(ctx, dataFormatProtobuf, dataPointCount, metricCount, err) diff --git a/receiver/otlpreceiver/metrics/otlp_test.go b/receiver/otlpreceiver/metrics/otlp_test.go index 1196771455b..3aa735a9e8a 100644 --- a/receiver/otlpreceiver/metrics/otlp_test.go +++ b/receiver/otlpreceiver/metrics/otlp_test.go @@ -31,7 +31,6 @@ import ( collectormetrics "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/metrics/v1" otlpcommon "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1" otlpmetrics "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/metrics/v1" - otlpresource "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/resource/v1" "go.opentelemetry.io/collector/observability" "go.opentelemetry.io/collector/testutil" ) @@ -57,20 +56,8 @@ func TestExport(t *testing.T) { resourceMetrics := []*otlpmetrics.ResourceMetrics{ { - Resource: &otlpresource.Resource{ - Attributes: []*otlpcommon.KeyValue{ - { - Key: "key1", - Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "value1"}}, - }, - }, - }, InstrumentationLibraryMetrics: []*otlpmetrics.InstrumentationLibraryMetrics{ { - InstrumentationLibrary: &otlpcommon.InstrumentationLibrary{ - Name: "name1", - Version: "version1", - }, Metrics: []*otlpmetrics.Metric{ { MetricDescriptor: &otlpmetrics.MetricDescriptor{ @@ -130,6 +117,67 @@ func TestExport(t *testing.T) { assert.EqualValues(t, metricData, pdatautil.MetricsToInternalMetrics(metricSink.AllMetrics()[0])) } +func TestExport_EmptyRequest(t *testing.T) { + // given + + metricSink := new(exportertest.SinkMetricsExporter) + + _, port, doneFn := otlpReceiverOnGRPCServer(t, metricSink) + defer doneFn() + + metricsClient, metricsClientDoneFn, err := makeMetricsServiceClient(port) + require.NoError(t, err, "Failed to create the MetricsServiceClient: %v", err) + defer metricsClientDoneFn() + + resp, err := metricsClient.Export(context.Background(), &collectormetrics.ExportMetricsServiceRequest{}) + require.NoError(t, err) + require.NotNil(t, resp) +} + +func TestExport_ErrorConsumer(t *testing.T) { + // given + + metricSink := new(exportertest.SinkMetricsExporter) + metricSink.SetConsumeMetricsError(fmt.Errorf("error")) + + _, port, doneFn := otlpReceiverOnGRPCServer(t, metricSink) + defer doneFn() + + metricsClient, metricsClientDoneFn, err := makeMetricsServiceClient(port) + require.NoError(t, err, "Failed to create the MetricsServiceClient: %v", err) + defer metricsClientDoneFn() + + req := &collectormetrics.ExportMetricsServiceRequest{ResourceMetrics: []*otlpmetrics.ResourceMetrics{ + { + InstrumentationLibraryMetrics: []*otlpmetrics.InstrumentationLibraryMetrics{ + { + Metrics: []*otlpmetrics.Metric{ + { + MetricDescriptor: &otlpmetrics.MetricDescriptor{ + Name: "mymetric", + Description: "My metric", + Unit: "ms", + Type: otlpmetrics.MetricDescriptor_MONOTONIC_INT64, + }, + Int64DataPoints: []*otlpmetrics.Int64DataPoint{ + { + Value: 123, + }, + { + Value: 456, + }, + }, + }, + }, + }, + }, + }, + }} + resp, err := metricsClient.Export(context.Background(), req) + assert.EqualError(t, err, "rpc error: code = Unknown desc = error") + assert.Nil(t, resp) +} + func makeMetricsServiceClient(port int) (collectormetrics.MetricsServiceClient, func(), error) { addr := fmt.Sprintf(":%d", port) cc, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBlock()) @@ -155,13 +203,9 @@ func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.MetricsConsumer) (r *Rec } _, port, err = testutil.HostPortFromAddr(ln.Addr()) - if err != nil { - done() - t.Fatalf("Failed to parse host:port from listener address: %s error: %v", ln.Addr(), err) - } + require.NoError(t, err) - r, err = New(receiverTagValue, mc) - require.NoError(t, err, "Failed to create the Receiver: %v", err) + r = New(receiverTagValue, mc) // Now run it as a gRPC server srv := observability.GRPCServerWithObservabilityEnabled() diff --git a/receiver/otlpreceiver/options.go b/receiver/otlpreceiver/options.go deleted file mode 100644 index 66e608e89c9..00000000000 --- a/receiver/otlpreceiver/options.go +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package otlpreceiver - -import ( - "google.golang.org/grpc" -) - -// Option interface defines for configuration settings to be applied to receivers. -// -// withReceiver applies the configuration to the given receiver. -type Option interface { - withReceiver(*Receiver) -} - -type corsOrigins struct { - origins []string -} - -var _ Option = (*corsOrigins)(nil) - -func (co *corsOrigins) withReceiver(r *Receiver) { - r.corsOrigins = co.origins -} - -// WithCorsOrigins is an option to specify the allowed origins to enable writing -// HTTP/JSON requests to the grpc-gateway adapter using CORS. -func WithCorsOrigins(origins []string) Option { - return &corsOrigins{origins: origins} -} - -var _ Option = (grpcServerOptions)(nil) - -type grpcServerOptions []grpc.ServerOption - -func (gsvo grpcServerOptions) withReceiver(r *Receiver) { - r.grpcServerOptions = gsvo -} - -// WithGRPCServerOptions allows one to specify the options for starting a gRPC server. -func WithGRPCServerOptions(gsOpts ...grpc.ServerOption) Option { - gsvOpts := grpcServerOptions(gsOpts) - return gsvOpts -} diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index d6174430345..a2829384572 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -17,14 +17,11 @@ package otlpreceiver import ( "context" "errors" - "fmt" "net" "net/http" "sync" gatewayruntime "github.com/grpc-ecosystem/grpc-gateway/runtime" - "github.com/rs/cors" - "github.com/soheilhy/cmux" "google.golang.org/grpc" "go.opentelemetry.io/collector/component" @@ -32,66 +29,43 @@ import ( "go.opentelemetry.io/collector/consumer" collectormetrics "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/metrics/v1" collectortrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/trace/v1" - "go.opentelemetry.io/collector/observability" "go.opentelemetry.io/collector/receiver/otlpreceiver/metrics" "go.opentelemetry.io/collector/receiver/otlpreceiver/trace" ) // Receiver is the type that exposes Trace and Metrics reception. type Receiver struct { - mu sync.Mutex - ln net.Listener - serverGRPC *grpc.Server - serverHTTP *http.Server - gatewayMux *gatewayruntime.ServeMux - corsOrigins []string - grpcServerOptions []grpc.ServerOption + cfg *Config + serverGRPC *grpc.Server + gatewayMux *gatewayruntime.ServeMux + serverHTTP *http.Server traceReceiver *trace.Receiver metricsReceiver *metrics.Receiver - traceConsumer consumer.TraceConsumer - metricsConsumer consumer.MetricsConsumer - - stopOnce sync.Once - startServerOnce sync.Once - startTraceReceiverOnce sync.Once - startMetricsReceiverOnce sync.Once - - instanceName string + stopOnce sync.Once + startServerOnce sync.Once } // New just creates the OpenTelemetry receiver services. It is the caller's // responsibility to invoke the respective Start*Reception methods as well // as the various Stop*Reception methods to end it. -func New( - instanceName string, - transport string, - addr string, - tc consumer.TraceConsumer, - mc consumer.MetricsConsumer, - opts ...Option, -) (*Receiver, error) { - ln, err := net.Listen(transport, addr) - if err != nil { - return nil, fmt.Errorf("failed to bind to address %q: %v", addr, err) - } - +func New(cfg *Config) (*Receiver, error) { r := &Receiver{ - ln: ln, - corsOrigins: []string{}, // Disable CORS by default. - gatewayMux: gatewayruntime.NewServeMux( - gatewayruntime.WithMarshalerOption("application/x-protobuf", &xProtobufMarshaler{}), - ), + cfg: cfg, } - - for _, opt := range opts { - opt.withReceiver(r) + if cfg.GRPC != nil { + opts, err := cfg.GRPC.ToServerOption() + if err != nil { + return nil, err + } + r.serverGRPC = grpc.NewServer(opts...) + } + if cfg.HTTP != nil { + r.gatewayMux = gatewayruntime.NewServeMux( + gatewayruntime.WithMarshalerOption("application/x-protobuf", &xProtobufMarshaler{}), + ) } - - r.instanceName = instanceName - r.traceConsumer = tc - r.metricsConsumer = mc return r, nil } @@ -99,173 +73,82 @@ func New( // Start runs the trace receiver on the gRPC server. Currently // it also enables the metrics receiver too. func (r *Receiver) Start(ctx context.Context, host component.Host) error { - return r.start(host) -} - -func (r *Receiver) registerTraceConsumer() error { - var err = componenterror.ErrAlreadyStarted + if r.traceReceiver == nil && r.metricsReceiver == nil { + return errors.New("cannot start receiver: no consumers were specified") + } - r.startTraceReceiverOnce.Do(func() { - r.traceReceiver, err = trace.New(r.instanceName, r.traceConsumer) - if err != nil { - return + var err error + r.startServerOnce.Do(func() { + if r.cfg.GRPC != nil { + var gln net.Listener + gln, err = net.Listen("tcp", r.cfg.GRPC.Endpoint) + if err != nil { + return + } + go func() { + if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil { + host.ReportFatalError(errGrpc) + } + }() } - srv := r.grpcServer() - collectortrace.RegisterTraceServiceServer(srv, r.traceReceiver) - }) - - return err -} - -func (r *Receiver) registerMetricsConsumer() error { - var err = componenterror.ErrAlreadyStarted - - r.startMetricsReceiverOnce.Do(func() { - r.metricsReceiver, err = metrics.New(r.instanceName, r.metricsConsumer) - if err != nil { - return + if r.cfg.HTTP != nil { + r.serverHTTP = r.cfg.HTTP.ToServer(r.gatewayMux) + var hln net.Listener + hln, err = r.cfg.HTTP.ToListener() + if err != nil { + return + } + go func() { + if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil { + host.ReportFatalError(errHTTP) + } + }() } - srv := r.grpcServer() - collectormetrics.RegisterMetricsServiceServer(srv, r.metricsReceiver) }) - return err } -func (r *Receiver) grpcServer() *grpc.Server { - r.mu.Lock() - defer r.mu.Unlock() - - if r.serverGRPC == nil { - r.serverGRPC = observability.GRPCServerWithObservabilityEnabled(r.grpcServerOptions...) - } - - return r.serverGRPC -} - // Shutdown is a method to turn off receiving. func (r *Receiver) Shutdown(context.Context) error { - if err := r.stop(); err != componenterror.ErrAlreadyStopped { - return err - } - return nil -} - -// start runs all the receivers/services namely, Trace and Metrics services. -func (r *Receiver) start(host component.Host) error { - hasConsumer := false - if r.traceConsumer != nil { - hasConsumer = true - if err := r.registerTraceConsumer(); err != nil && err != componenterror.ErrAlreadyStarted { - return err - } - } - - if r.metricsConsumer != nil { - hasConsumer = true - if err := r.registerMetricsConsumer(); err != nil && err != componenterror.ErrAlreadyStarted { - return err - } - } - - if !hasConsumer { - return errors.New("cannot start receiver: no consumers were specified") - } - - if err := r.startServer(host); err != nil && err != componenterror.ErrAlreadyStarted { - return err - } - - // At this point we've successfully started all the services/receivers. - // Add other start routines here. - return nil -} - -// stop stops the underlying gRPC server and all the services running on it. -func (r *Receiver) stop() error { - r.mu.Lock() - defer r.mu.Unlock() - - var err = componenterror.ErrAlreadyStopped + var err error r.stopOnce.Do(func() { err = nil if r.serverHTTP != nil { - _ = r.serverHTTP.Close() + err = r.serverHTTP.Close() } - if r.ln != nil { - _ = r.ln.Close() + if r.serverGRPC != nil { + r.serverGRPC.Stop() } - - // TODO(nilebox): investigate, takes too long - // r.serverGRPC.Stop() }) return err } -func (r *Receiver) httpServer() *http.Server { - r.mu.Lock() - defer r.mu.Unlock() - - if r.serverHTTP == nil { - var mux http.Handler = r.gatewayMux - if len(r.corsOrigins) > 0 { - co := cors.Options{AllowedOrigins: r.corsOrigins} - mux = cors.New(co).Handler(mux) - } - r.serverHTTP = &http.Server{Handler: mux} +func (r *Receiver) registerTraceConsumer(ctx context.Context, tc consumer.TraceConsumer) error { + if tc == nil { + return componenterror.ErrNilNextConsumer } - - return r.serverHTTP + r.traceReceiver = trace.New(r.cfg.Name(), tc) + if r.serverGRPC != nil { + collectortrace.RegisterTraceServiceServer(r.serverGRPC, r.traceReceiver) + } + if r.gatewayMux != nil { + return collectortrace.RegisterTraceServiceHandlerServer(ctx, r.gatewayMux, r.traceReceiver) + } + return nil } -func (r *Receiver) startServer(host component.Host) error { - err := componenterror.ErrAlreadyStarted - r.startServerOnce.Do(func() { - err = nil - // Register the grpc-gateway on the HTTP server mux - c := context.Background() - opts := []grpc.DialOption{grpc.WithInsecure()} - endpoint := r.ln.Addr().String() - - _, ok := r.ln.(*net.UnixListener) - if ok { - endpoint = "unix:" + endpoint - } - - err = collectortrace.RegisterTraceServiceHandlerFromEndpoint(c, r.gatewayMux, endpoint, opts) - if err != nil { - return - } - - err = collectormetrics.RegisterMetricsServiceHandlerFromEndpoint(c, r.gatewayMux, endpoint, opts) - if err != nil { - return - } - - // Start the gRPC and HTTP/JSON (grpc-gateway) servers on the same port. - m := cmux.New(r.ln) - grpcL := m.MatchWithWriters( - cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"), - cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto")) - - httpL := m.Match(cmux.Any()) - go func() { - if errGrpc := r.serverGRPC.Serve(grpcL); errGrpc != nil { - host.ReportFatalError(errGrpc) - } - }() - go func() { - if errHTTP := r.httpServer().Serve(httpL); errHTTP != nil { - host.ReportFatalError(errHTTP) - } - }() - go func() { - if errServe := m.Serve(); errServe != nil { - host.ReportFatalError(errServe) - } - }() - }) - return err +func (r *Receiver) registerMetricsConsumer(ctx context.Context, mc consumer.MetricsConsumer) error { + if mc == nil { + return componenterror.ErrNilNextConsumer + } + r.metricsReceiver = metrics.New(r.cfg.Name(), mc) + if r.serverGRPC != nil { + collectormetrics.RegisterMetricsServiceServer(r.serverGRPC, r.metricsReceiver) + } + if r.gatewayMux != nil { + return collectormetrics.RegisterMetricsServiceHandlerServer(ctx, r.gatewayMux, r.metricsReceiver) + } + return nil } diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index ea1d39915bb..a48fb007521 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -23,8 +23,6 @@ import ( "io/ioutil" "net" "net/http" - "os" - "strings" "testing" "time" @@ -33,10 +31,15 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/exporter/exportertest" collectortrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/trace/v1" @@ -56,10 +59,9 @@ func TestGrpcGateway_endToEnd(t *testing.T) { // Set the buffer count to 1 to make it flush the test span immediately. sink := new(exportertest.SinkTraceExporter) - ocr, err := New(otlpReceiver, "tcp", addr, sink, nil) - require.NoError(t, err, "Failed to create trace receiver: %v", err) + ocr := newHTTPReceiver(t, otlpReceiver, addr, sink, nil) - require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver: %v", err) + require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver") defer ocr.Shutdown(context.Background()) // TODO(nilebox): make starting server deterministic @@ -68,10 +70,6 @@ func TestGrpcGateway_endToEnd(t *testing.T) { url := fmt.Sprintf("http://%s/v1/trace", addr) - // Verify that CORS is not enabled by default, but that it gives an 405 - // method not allowed error. - verifyCorsResp(t, url, "origin.com", 405, false) - traceJSON := []byte(` { "resource_spans": [ @@ -174,11 +172,11 @@ func TestProtoHttp(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) // Set the buffer count to 1 to make it flush the test span immediately. - sink := new(exportertest.SinkTraceExporter) - ocr, err := New(otlpReceiver, "tcp", addr, sink, nil) - require.NoError(t, err, "Failed to create trace receiver: %v", err) + tSink := new(exportertest.SinkTraceExporter) + mSink := new(exportertest.SinkMetricsExporter) + ocr := newHTTPReceiver(t, otlpReceiver, addr, tSink, mSink) - require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver: %v", err) + require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver") defer ocr.Shutdown(context.Background()) // TODO(nilebox): make starting server deterministic @@ -188,7 +186,6 @@ func TestProtoHttp(t *testing.T) { url := fmt.Sprintf("http://%s/v1/trace", addr) wantOtlp := pdata.TracesToOtlp(testdata.GenerateTraceDataOneSpan()) - traceProto := collectortrace.ExportTraceServiceRequest{ ResourceSpans: wantOtlp, } @@ -198,34 +195,21 @@ func TestProtoHttp(t *testing.T) { } buf := bytes.NewBuffer(traceBytes) - - req, err := http.NewRequest("POST", url, buf) - require.NoError(t, err, "Error creating trace POST request: %v", err) - req.Header.Set("Content-Type", "application/x-protobuf") - - client := &http.Client{} - resp, err := client.Do(req) + resp, err := http.Post(url, "application/x-protobuf", buf) require.NoError(t, err, "Error posting trace to grpc-gateway server: %v", err) respBytes, err := ioutil.ReadAll(resp.Body) require.NoError(t, err, "Error reading response from trace grpc-gateway") + require.NoError(t, resp.Body.Close(), "Error closing response body") - err = resp.Body.Close() - require.NoError(t, err, "Error closing response body") - - if resp.StatusCode != 200 { - t.Errorf("Unexpected status from trace grpc-gateway: %v", resp.StatusCode) - } - - if resType := resp.Header.Get("Content-Type"); resType != "application/x-protobuf" { - t.Errorf("response Content-Type got: %s, want: %s", resType, "application/x-protobuf") - } + require.Equal(t, 200, resp.StatusCode, "Unexpected return status") + require.Equal(t, "application/x-protobuf", resp.Header.Get("Content-Type"), "Unexpected response Content-Type") tmp := collectortrace.ExportTraceServiceResponse{} err = proto.Unmarshal(respBytes, &tmp) require.NoError(t, err, "Unable to unmarshal response to ExportTraceServiceResponse proto") - gotOtlp := pdata.TracesToOtlp(sink.AllTraces()[0]) + gotOtlp := pdata.TracesToOtlp(tSink.AllTraces()[0]) if len(gotOtlp) != len(wantOtlp) { t.Fatalf("len(traces):\nGot: %d\nWant: %d\n", len(gotOtlp), len(wantOtlp)) @@ -244,279 +228,42 @@ func TestProtoHttp(t *testing.T) { } -func TestTraceGrpcGatewayCors_endToEnd(t *testing.T) { +func TestGRPCNewPortAlreadyUsed(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) - corsOrigins := []string{"allowed-*.com"} - - sink := new(exportertest.SinkTraceExporter) - ocr, err := New(otlpReceiver, "tcp", addr, sink, nil, WithCorsOrigins(corsOrigins)) - require.NoError(t, err, "Failed to create trace receiver: %v", err) - defer ocr.Shutdown(context.Background()) - - require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver: %v", err) - - // TODO(nilebox): make starting server deterministic - // Wait for the servers to start - <-time.After(10 * time.Millisecond) - - url := fmt.Sprintf("http://%s/v1/trace", addr) - - // Verify allowed domain gets responses that allow CORS. - verifyCorsResp(t, url, "allowed-origin.com", 200, true) - - // Verify disallowed domain gets responses that disallow CORS. - verifyCorsResp(t, url, "disallowed-origin.com", 200, false) -} - -func TestMetricsGrpcGatewayCors_endToEnd(t *testing.T) { - addr := testutil.GetAvailableLocalAddress(t) - corsOrigins := []string{"allowed-*.com"} - - sink := new(exportertest.SinkMetricsExporter) - ocr, err := New(otlpReceiver, "tcp", addr, nil, sink, WithCorsOrigins(corsOrigins)) - require.NoError(t, err, "Failed to create metrics receiver: %v", err) - defer ocr.Shutdown(context.Background()) - - require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start metrics receiver: %v", err) - - // TODO(nilebox): make starting server deterministic - // Wait for the servers to start - <-time.After(10 * time.Millisecond) - - url := fmt.Sprintf("http://%s/v1/metrics", addr) - - // Verify allowed domain gets responses that allow CORS. - verifyCorsResp(t, url, "allowed-origin.com", 200, true) - - // Verify disallowed domain gets responses that disallow CORS. - verifyCorsResp(t, url, "disallowed-origin.com", 200, false) -} - -// As per Issue https://github.com/census-instrumentation/opencensus-service/issues/366 -// the agent's mux should be able to accept all Proto affiliated content-types and not -// redirect them to the web-grpc-gateway endpoint. -func TestAcceptAllGRPCProtoAffiliatedContentTypes(t *testing.T) { - t.Skip("Currently a flaky test as we need a way to flush all written traces") - - addr := testutil.GetAvailableLocalAddress(t) - cbts := new(exportertest.SinkTraceExporter) - ocr, err := New(otlpReceiver, "tcp", addr, cbts, nil) - require.NoError(t, err, "Failed to create trace receiver: %v", err) - - require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start the trace receiver: %v", err) - defer ocr.Shutdown(context.Background()) - - // Now start the client with the various Proto affiliated gRPC Content-SubTypes as per: - // https://godoc.org/google.golang.org/grpc#CallContentSubtype - protoAffiliatedContentSubTypes := []string{"", "proto"} - for _, subContentType := range protoAffiliatedContentSubTypes { - if err := runContentTypeTests(addr, asSubContentType, subContentType); err != nil { - t.Errorf("%q subContentType failed to send proto: %v", subContentType, err) - } - } - - // Now start the client with the various Proto affiliated gRPC Content-Types, - // as we encountered in https://github.com/census-instrumentation/opencensus-service/issues/366 - protoAffiliatedContentTypes := []string{"application/grpc", "application/grpc+proto"} - for _, contentType := range protoAffiliatedContentTypes { - if err := runContentTypeTests(addr, asContentType, contentType); err != nil { - t.Errorf("%q Content-type failed to send proto: %v", contentType, err) - } - } - - // Before we exit we have to verify that we got exactly 4 TraceService requests. - wantLen := len(protoAffiliatedContentSubTypes) + len(protoAffiliatedContentTypes) - gotReqs := cbts.AllTraces() - if len(gotReqs) != wantLen { - t.Errorf("Receiver ExportTraceServiceRequest length mismatch:: Got %d Want %d", len(gotReqs), wantLen) - } -} - -const ( - asSubContentType = true - asContentType = false -) - -func runContentTypeTests(addr string, contentTypeDesignation bool, contentType string) error { - opts := []grpc.DialOption{ - grpc.WithInsecure(), - grpc.WithBlock(), - grpc.WithDisableRetry(), - } - - if contentTypeDesignation == asContentType { - opts = append(opts, grpc.WithDefaultCallOptions( - grpc.Header(&metadata.MD{"Content-Type": []string{contentType}}))) - } else { - opts = append(opts, grpc.WithDefaultCallOptions(grpc.CallContentSubtype(contentType))) - } - - cc, err := grpc.Dial(addr, opts...) - if err != nil { - return fmt.Errorf("Creating grpc.ClientConn: %v", err) - } - defer cc.Close() - - acc := collectortrace.NewTraceServiceClient(cc) - - req := &collectortrace.ExportTraceServiceRequest{ - ResourceSpans: []*otlptrace.ResourceSpans{ - { - Resource: &otlpresource.Resource{ - Attributes: []*otlpcommon.KeyValue{ - { - Key: "sub-type", - Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: contentType}}, - }, - }, - }, - InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ - { - Spans: []*otlptrace.Span{ - { - TraceId: []byte{ - 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, - 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, - }, - }, - }, - }, - }, - }, - }, - } - - _, err = acc.Export(context.Background(), req) - return err -} - -func verifyCorsResp(t *testing.T, url string, origin string, wantStatus int, wantAllowed bool) { - req, err := http.NewRequest("OPTIONS", url, nil) - require.NoError(t, err, "Error creating trace OPTIONS request: %v", err) - req.Header.Set("Origin", origin) - req.Header.Set("Access-Control-Request-Method", "POST") - - resp, err := http.DefaultClient.Do(req) - require.NoError(t, err, "Error sending OPTIONS to http server: %v", err) - - err = resp.Body.Close() - if err != nil { - t.Errorf("Error closing OPTIONS response body, %v", err) - } - - assert.Equal(t, wantStatus, resp.StatusCode) - - gotAllowOrigin := resp.Header.Get("Access-Control-Allow-Origin") - gotAllowMethods := resp.Header.Get("Access-Control-Allow-Methods") + ln, err := net.Listen("tcp", addr) + require.NoError(t, err, "failed to listen on %q: %v", addr, err) + defer ln.Close() - wantAllowOrigin := "" - wantAllowMethods := "" - if wantAllowed { - wantAllowOrigin = origin - wantAllowMethods = "POST" - } - assert.Equal(t, wantAllowOrigin, gotAllowOrigin) - assert.Equal(t, wantAllowMethods, gotAllowMethods) -} + r := newGRPCReceiver(t, otlpReceiver, addr, new(exportertest.SinkTraceExporter), new(exportertest.SinkMetricsExporter)) + require.NotNil(t, r) -func TestStopWithoutStartNeverCrashes(t *testing.T) { - addr := testutil.GetAvailableLocalAddress(t) - ocr, err := New(otlpReceiver, "tcp", addr, nil, nil) - require.NoError(t, err, "Failed to create an OpenCensus receiver: %v", err) - // Stop it before ever invoking Start*. - ocr.stop() + require.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) } -func TestNewPortAlreadyUsed(t *testing.T) { +func TestHTTPNewPortAlreadyUsed(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) ln, err := net.Listen("tcp", addr) require.NoError(t, err, "failed to listen on %q: %v", addr, err) defer ln.Close() - r, err := New(otlpReceiver, "tcp", addr, nil, nil) - require.Error(t, err) - require.Nil(t, r) -} - -func TestMultipleStopReceptionShouldNotError(t *testing.T) { - addr := testutil.GetAvailableLocalAddress(t) - r, err := New(otlpReceiver, "tcp", addr, new(exportertest.SinkTraceExporter), new(exportertest.SinkMetricsExporter)) - require.NoError(t, err) + r := newHTTPReceiver(t, otlpReceiver, addr, new(exportertest.SinkTraceExporter), new(exportertest.SinkMetricsExporter)) require.NotNil(t, r) - require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) - require.NoError(t, r.Shutdown(context.Background())) + require.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) } -func TestStartWithoutConsumersShouldFail(t *testing.T) { +func TestGRPCStartWithoutConsumers(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) - r, err := New(otlpReceiver, "tcp", addr, nil, nil) - require.NoError(t, err) + r := newGRPCReceiver(t, otlpReceiver, addr, nil, nil) require.NotNil(t, r) - require.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) } -func tempSocketName(t *testing.T) string { - tmpfile, err := ioutil.TempFile("", "sock") - require.NoError(t, err) - require.NoError(t, tmpfile.Close()) - socket := tmpfile.Name() - require.NoError(t, os.Remove(socket)) - return socket -} - -func TestReceiveOnUnixDomainSocket_endToEnd(t *testing.T) { - socketName := tempSocketName(t) - cbts := new(exportertest.SinkTraceExporter) - r, err := New(otlpReceiver, "unix", socketName, cbts, nil) - require.NoError(t, err) +func TestHTTPStartWithoutConsumers(t *testing.T) { + addr := testutil.GetAvailableLocalAddress(t) + r := newHTTPReceiver(t, otlpReceiver, addr, nil, nil) require.NotNil(t, r) - require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) - defer r.Shutdown(context.Background()) - - // Wait for the servers to start - <-time.After(10 * time.Millisecond) - - span := ` - { - "resource_spans": [ - { - "instrumentation_library_spans": [ - { - "spans": [ - { - "trace_id": "YpsR8/le4OgjwSSxhjlrEg==", - "span_id": "2CogcbJh7Ko=", - "name": "testSpan", - "start_time_unix_nano": 1544712660000000000, - "end_time_unix_nano": 1544712661000000000 - } - ] - } - ] - } - ] - }` - - c := http.Client{ - Transport: &http.Transport{ - DialContext: func(ctx context.Context, network, addr string) (conn net.Conn, err error) { - return net.Dial("unix", socketName) - }, - }, - } - - response, err := c.Post("http://unix/v1/trace", "application/json", strings.NewReader(span)) - require.NoError(t, err) - defer response.Body.Close() - - bodyBytes, err := ioutil.ReadAll(response.Body) - require.NoError(t, err) - bodyString := string(bodyBytes) - fmt.Println(bodyString) - - require.Equal(t, 200, response.StatusCode) + require.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) } // TestOTLPReceiverTrace_HandleNextConsumerResponse checks if the trace receiver @@ -561,14 +308,6 @@ func TestOTLPReceiverTrace_HandleNextConsumerResponse(t *testing.T) { req := &collectortrace.ExportTraceServiceRequest{ ResourceSpans: []*otlptrace.ResourceSpans{ { - Resource: &otlpresource.Resource{ - Attributes: []*otlpcommon.KeyValue{ - { - Key: conventions.AttributeServiceName, - Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "test-svc"}}, - }, - }, - }, InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ { Spans: []*otlptrace.Span{ @@ -616,19 +355,13 @@ func TestOTLPReceiverTrace_HandleNextConsumerResponse(t *testing.T) { sink := new(exportertest.SinkTraceExporter) - var opts []Option - ocr, err := New(otlpReceiver, "tcp", addr, nil, nil, opts...) - require.Nil(t, err) + ocr := newGRPCReceiver(t, otlpReceiver, addr, sink, nil) require.NotNil(t, ocr) - - ocr.traceConsumer = sink - require.Nil(t, ocr.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost())) defer ocr.Shutdown(context.Background()) cc, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBlock()) - if err != nil { - t.Errorf("grpc.Dial: %v", err) - } + require.NoError(t, err) defer cc.Close() for _, ingestionState := range tt.ingestionStates { @@ -656,3 +389,84 @@ func TestOTLPReceiverTrace_HandleNextConsumerResponse(t *testing.T) { } } } + +func TestGRPCInvalidTLSCredentials(t *testing.T) { + cfg := &Config{ + ReceiverSettings: configmodels.ReceiverSettings{ + NameVal: "IncorrectTLS", + }, + Protocols: Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ + Endpoint: "localhost:50000", + TLSCredentials: &configtls.TLSServerSetting{ + TLSSetting: configtls.TLSSetting{ + CertFile: "willfail", + }, + }, + }, + }, + } + + // TLS is resolved during Creation of the receiver for GRPC. + factory := &Factory{} + _, err := factory.createReceiver(cfg) + assert.EqualError(t, err, + `failed to load TLS config: for auth via TLS, either both certificate and key must be supplied, or neither`) +} + +func TestHTTPInvalidTLSCredentials(t *testing.T) { + cfg := &Config{ + ReceiverSettings: configmodels.ReceiverSettings{ + NameVal: "IncorrectTLS", + }, + Protocols: Protocols{ + HTTP: &confighttp.HTTPServerSettings{ + Endpoint: "localhost:50000", + TLSSetting: &configtls.TLSServerSetting{ + TLSSetting: configtls.TLSSetting{ + CertFile: "willfail", + }, + }, + }, + }, + } + + // TLS is resolved during Start for HTTP. + r := newReceiver(t, &Factory{}, cfg, new(exportertest.SinkTraceExporter), new(exportertest.SinkMetricsExporter)) + assert.EqualError(t, r.Start(context.Background(), componenttest.NewNopHost()), + `failed to load TLS config: for auth via TLS, either both certificate and key must be supplied, or neither`) +} + +func newGRPCReceiver(t *testing.T, name string, endpoint string, tc consumer.TraceConsumer, mc consumer.MetricsConsumer) *Receiver { + factory := &Factory{} + cfg := factory.CreateDefaultConfig().(*Config) + cfg.SetName(name) + cfg.GRPC.Endpoint = endpoint + cfg.HTTP = nil + return newReceiver(t, factory, cfg, tc, mc) +} + +func newHTTPReceiver(t *testing.T, name string, endpoint string, tc consumer.TraceConsumer, mc consumer.MetricsConsumer) *Receiver { + factory := &Factory{} + cfg := factory.CreateDefaultConfig().(*Config) + cfg.SetName(name) + cfg.HTTP.Endpoint = endpoint + cfg.GRPC = nil + return newReceiver(t, factory, cfg, tc, mc) +} + +func newReceiver(t *testing.T, factory *Factory, cfg *Config, tc consumer.TraceConsumer, mc consumer.MetricsConsumer) *Receiver { + r, err := factory.createReceiver(cfg) + require.NoError(t, err) + if tc != nil { + params := component.ReceiverCreateParams{} + _, err = factory.CreateTraceReceiver(context.Background(), params, cfg, tc) + require.NoError(t, err) + } + if mc != nil { + params := component.ReceiverCreateParams{} + _, err = factory.CreateMetricsReceiver(context.Background(), params, cfg, mc) + require.NoError(t, err) + } + return r +} diff --git a/receiver/otlpreceiver/testdata/bad_empty_config.yaml b/receiver/otlpreceiver/testdata/bad_empty_config.yaml new file mode 100644 index 00000000000..db0b165a615 --- /dev/null +++ b/receiver/otlpreceiver/testdata/bad_empty_config.yaml @@ -0,0 +1,15 @@ +receivers: + otlp: + +processors: + exampleprocessor: + +exporters: + exampleexporter: + +service: + pipelines: + traces: + receivers: [otlp] + processors: [exampleprocessor] + exporters: [exampleexporter] diff --git a/receiver/otlpreceiver/testdata/bad_no_proto_config.yaml b/receiver/otlpreceiver/testdata/bad_no_proto_config.yaml new file mode 100644 index 00000000000..09731f05939 --- /dev/null +++ b/receiver/otlpreceiver/testdata/bad_no_proto_config.yaml @@ -0,0 +1,16 @@ +receivers: + otlp: + protocols: + +processors: + exampleprocessor: + +exporters: + exampleexporter: + +service: + pipelines: + traces: + receivers: [otlp] + processors: [exampleprocessor] + exporters: [exampleexporter] diff --git a/receiver/otlpreceiver/testdata/bad_proto_config.yaml b/receiver/otlpreceiver/testdata/bad_proto_config.yaml new file mode 100644 index 00000000000..3d79ae12a8c --- /dev/null +++ b/receiver/otlpreceiver/testdata/bad_proto_config.yaml @@ -0,0 +1,18 @@ +receivers: + otlp: + protocols: + thrift: + endpoint: "127.0.0.1:1234" + +processors: + exampleprocessor: + +exporters: + exampleexporter: + +service: + pipelines: + traces: + receivers: [otlp] + processors: [exampleprocessor] + exporters: [exampleexporter] diff --git a/receiver/otlpreceiver/testdata/config.yaml b/receiver/otlpreceiver/testdata/config.yaml index 5bc45237b1b..30aeded350e 100644 --- a/receiver/otlpreceiver/testdata/config.yaml +++ b/receiver/otlpreceiver/testdata/config.yaml @@ -2,53 +2,82 @@ receivers: # The following entry initializes the default OTLP receiver. # The full name of this receiver is `otlp` and can be referenced in pipelines by 'otlp'. otlp: + protocols: + grpc: + http: + # The following entry initializes the default OTLP receiver with only gRPC support. + otlp/only_grpc: + protocols: + grpc: + # The following entry initializes the default OTLP receiver with only http support. + otlp/only_http: + protocols: + http: # The following entry demonstrates configuring the common receiver settings: # - endpoint # This configuration is of type 'otlp' and has the name 'customname' with a full name of 'otlp/customname' # ('/'. To reference this configuration in a pipeline, use the full name `otlp/customname`. otlp/customname: - # The receiver will listen on endpoint: "localhost:9090". - endpoint: localhost:9090 + protocols: + grpc: + # The receiver will listen on endpoint: "localhost:9090". + endpoint: localhost:9090 # The following entry configures all of the keep alive settings. These settings are used to configure the receiver. otlp/keepalive: - keepalive: - server_parameters: - max_connection_idle: 11s - max_connection_age: 12s - max_connection_age_grace: 13s - time: 30s - timeout: 5s - enforcement_policy: - min_time: 10s - permit_without_stream: true + protocols: + grpc: + keepalive: + server_parameters: + max_connection_idle: 11s + max_connection_age: 12s + max_connection_age_grace: 13s + time: 30s + timeout: 5s + enforcement_policy: + min_time: 10s + permit_without_stream: true # The following demonstrates how to set maximum limits on stream, message size and connection idle time. # Note: The test yaml has demonstrated configuration on a grouped by their structure; however, all of the settings can # be mix and matched like adding the maximum connection idle setting in this example. otlp/msg-size-conc-connect-max-idle: - max_recv_msg_size_mib: 32 - max_concurrent_streams: 16 - read_buffer_size: 1024 - write_buffer_size: 1024 - keepalive: - server_parameters: - max_connection_idle: 10s + protocols: + grpc: + max_recv_msg_size_mib: 32 + max_concurrent_streams: 16 + read_buffer_size: 1024 + write_buffer_size: 1024 + keepalive: + server_parameters: + max_connection_idle: 10s # The following entry demonstrates how to specify TLS credentials for the server. # Note: These files do not exist. If the receiver is started with this configuration, it will fail. otlp/tlscredentials: - tls_credentials: - cert_file: test.crt - key_file: test.key + protocols: + grpc: + tls_credentials: + cert_file: test.crt + key_file: test.key + http: + tls_settings: + cert_file: test.crt + key_file: test.key # The following entry demonstrates how to specify a Unix Domain Socket for the server. otlp/uds: - transport: unix - endpoint: /tmp/otlp.sock - + protocols: + grpc: + # transport: unix + endpoint: /tmp/grpc_otlp.sock + http: + # transport: unix + endpoint: /tmp/http_otlp.sock # The following entry demonstrates how to configure the OTLP receiver to allow Cross-Origin Resource Sharing (CORS). # Both fully qualified domain names and the use of wildcards are supported. otlp/cors: - cors_allowed_origins: - - https://*.test.com # Wildcard subdomain. Allows domains like https://www.test.com and https://foo.test.com but not https://wwwtest.com. - - https://test.com # Fully qualified domain name. Allows https://test.com only. + protocols: + http: + cors_allowed_origins: + - https://*.test.com # Wildcard subdomain. Allows domains like https://www.test.com and https://foo.test.com but not https://wwwtest.com. + - https://test.com # Fully qualified domain name. Allows https://test.com only. processors: exampleprocessor: diff --git a/receiver/otlpreceiver/testdata/typo_default_proto_config.yaml b/receiver/otlpreceiver/testdata/typo_default_proto_config.yaml new file mode 100644 index 00000000000..15cf1f28599 --- /dev/null +++ b/receiver/otlpreceiver/testdata/typo_default_proto_config.yaml @@ -0,0 +1,18 @@ +receivers: + otlp: + protocols: + grpc: + htttp: + +processors: + exampleprocessor: + +exporters: + exampleexporter: + +service: + pipelines: + traces: + receivers: [otlp] + processors: [exampleprocessor] + exporters: [exampleexporter] diff --git a/receiver/otlpreceiver/trace/otlp.go b/receiver/otlpreceiver/trace/otlp.go index d6b2050f8b4..d8a0a7997cd 100644 --- a/receiver/otlpreceiver/trace/otlp.go +++ b/receiver/otlpreceiver/trace/otlp.go @@ -18,7 +18,6 @@ import ( "context" "go.opentelemetry.io/collector/client" - "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdata" collectortrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/trace/v1" @@ -36,17 +35,13 @@ type Receiver struct { } // New creates a new Receiver reference. -func New(instanceName string, nextConsumer consumer.TraceConsumer) (*Receiver, error) { - if nextConsumer == nil { - return nil, componenterror.ErrNilNextConsumer - } - +func New(instanceName string, nextConsumer consumer.TraceConsumer) *Receiver { r := &Receiver{ instanceName: instanceName, nextConsumer: nextConsumer, } - return r, nil + return r } const ( @@ -68,22 +63,18 @@ func (r *Receiver) Export(ctx context.Context, req *collectortrace.ExportTraceSe } func (r *Receiver) sendToNextConsumer(ctx context.Context, td pdata.Traces) error { - if c, ok := client.FromGRPC(ctx); ok { - ctx = client.NewContext(ctx, c) + numSpans := td.SpanCount() + if numSpans == 0 { + return nil } - ctx = obsreport.StartTraceDataReceiveOp( - ctx, - r.instanceName, - receiverTransport) - - var consumerErr error - numSpans := td.SpanCount() - if numSpans != 0 { - consumerErr = r.nextConsumer.ConsumeTraces(ctx, td) + if c, ok := client.FromGRPC(ctx); ok { + ctx = client.NewContext(ctx, c) } - obsreport.EndTraceDataReceiveOp(ctx, dataFormatProtobuf, numSpans, consumerErr) + ctx = obsreport.StartTraceDataReceiveOp(ctx, r.instanceName, receiverTransport) + err := r.nextConsumer.ConsumeTraces(ctx, td) + obsreport.EndTraceDataReceiveOp(ctx, dataFormatProtobuf, numSpans, err) - return consumerErr + return err } diff --git a/receiver/otlpreceiver/trace/otlp_test.go b/receiver/otlpreceiver/trace/otlp_test.go index 15c48ddbb4a..c4a6e9cbcb6 100644 --- a/receiver/otlpreceiver/trace/otlp_test.go +++ b/receiver/otlpreceiver/trace/otlp_test.go @@ -29,8 +29,6 @@ import ( "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/exporter/exportertest" collectortrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/trace/v1" - otlpcommon "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/common/v1" - otlpresource "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/resource/v1" otlptrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/trace/v1" "go.opentelemetry.io/collector/observability" "go.opentelemetry.io/collector/testutil" @@ -62,20 +60,8 @@ func TestExport(t *testing.T) { resourceSpans := []*otlptrace.ResourceSpans{ { - Resource: &otlpresource.Resource{ - Attributes: []*otlpcommon.KeyValue{ - { - Key: "key1", - Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "value1"}}, - }, - }, - }, InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ { - InstrumentationLibrary: &otlpcommon.InstrumentationLibrary{ - Name: "name1", - Version: "version1", - }, Spans: []*otlptrace.Span{ { TraceId: traceID, @@ -84,29 +70,8 @@ func TestExport(t *testing.T) { Kind: otlptrace.Span_SERVER, StartTimeUnixNano: unixnanos, EndTimeUnixNano: unixnanos, - Events: []*otlptrace.Span_Event{ - { - TimeUnixNano: unixnanos, - Name: "event1", - Attributes: []*otlpcommon.KeyValue{ - { - Key: "eventattr1", - Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "eventattrval1"}}, - }, - }, - DroppedAttributesCount: 4, - }, - }, - Links: []*otlptrace.Span_Link{ - { - TraceId: traceID, - SpanId: spanID, - }, - }, - DroppedAttributesCount: 1, - DroppedEventsCount: 2, - Status: &otlptrace.Status{Message: "status-cancelled", Code: otlptrace.Status_Cancelled}, - TraceState: "a=text,b=123", + Status: &otlptrace.Status{Message: "status-cancelled", Code: otlptrace.Status_Cancelled}, + TraceState: "a=text,b=123", }, }, }, @@ -133,6 +98,53 @@ func TestExport(t *testing.T) { assert.EqualValues(t, traceData, traceSink.AllTraces()[0]) } +func TestExport_EmptyRequest(t *testing.T) { + traceSink := new(exportertest.SinkTraceExporter) + + _, port, doneFn := otlpReceiverOnGRPCServer(t, traceSink) + defer doneFn() + + traceClient, traceClientDoneFn, err := makeTraceServiceClient(port) + require.NoError(t, err, "Failed to create the TraceServiceClient: %v", err) + defer traceClientDoneFn() + + resp, err := traceClient.Export(context.Background(), &collectortrace.ExportTraceServiceRequest{}) + assert.NoError(t, err, "Failed to export trace: %v", err) + assert.NotNil(t, resp, "The response is missing") +} + +func TestExport_ErrorConsumer(t *testing.T) { + traceSink := new(exportertest.SinkTraceExporter) + traceSink.SetConsumeTraceError(fmt.Errorf("error")) + + _, port, doneFn := otlpReceiverOnGRPCServer(t, traceSink) + defer doneFn() + + traceClient, traceClientDoneFn, err := makeTraceServiceClient(port) + require.NoError(t, err, "Failed to create the TraceServiceClient: %v", err) + defer traceClientDoneFn() + + req := &collectortrace.ExportTraceServiceRequest{ + ResourceSpans: []*otlptrace.ResourceSpans{ + { + InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ + { + Spans: []*otlptrace.Span{ + { + Name: "operationB", + }, + }, + }, + }, + }, + }, + } + + resp, err := traceClient.Export(context.Background(), req) + assert.EqualError(t, err, "rpc error: code = Unknown desc = error") + assert.Nil(t, resp) +} + func makeTraceServiceClient(port int) (collectortrace.TraceServiceClient, func(), error) { addr := fmt.Sprintf(":%d", port) cc, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBlock()) @@ -163,8 +175,8 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.TraceConsumer) (r *Recei t.Fatalf("Failed to parse host:port from listener address: %s error: %v", ln.Addr(), err) } - r, err = New(receiverTagValue, tc) - require.NoError(t, err, "Failed to create the Receiver: %v", err) + r = New(receiverTagValue, tc) + require.NoError(t, err) // Now run it as a gRPC server srv := observability.GRPCServerWithObservabilityEnabled() diff --git a/service/service_test.go b/service/service_test.go index c6a85053914..2b93fdc9083 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -520,6 +520,8 @@ func constructMimumalOpConfig(t *testing.T, factories config.Factories) *configm configStr := ` receivers: otlp: + protocols: + grpc: exporters: logging: processors: diff --git a/service/testdata/otelcol-config-minimal.yaml b/service/testdata/otelcol-config-minimal.yaml index 448973ae172..372bd08d5cb 100644 --- a/service/testdata/otelcol-config-minimal.yaml +++ b/service/testdata/otelcol-config-minimal.yaml @@ -1,5 +1,7 @@ receivers: otlp: + protocols: + grpc: exporters: otlp: diff --git a/testbed/testbed/receivers.go b/testbed/testbed/receivers.go index 94704d6c152..08be5fda0fb 100644 --- a/testbed/testbed/receivers.go +++ b/testbed/testbed/receivers.go @@ -198,7 +198,8 @@ func (or *OTLPDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsumer) factory := otlpreceiver.Factory{} cfg := factory.CreateDefaultConfig().(*otlpreceiver.Config) cfg.SetName(or.ProtocolName()) - cfg.Endpoint = fmt.Sprintf("localhost:%d", or.Port) + cfg.GRPC.Endpoint = fmt.Sprintf("localhost:%d", or.Port) + cfg.HTTP = nil var err error params := component.ReceiverCreateParams{Logger: zap.NewNop()} if or.traceReceiver, err = factory.CreateTraceReceiver(context.Background(), params, cfg, tc); err != nil { diff --git a/testbed/testbed/senders.go b/testbed/testbed/senders.go index 748ac539609..1994c8cd96d 100644 --- a/testbed/testbed/senders.go +++ b/testbed/testbed/senders.go @@ -327,7 +327,9 @@ func (ote *OTLPTraceDataSender) GenConfigYAMLStr() string { // Note that this generates a receiver config for agent. return fmt.Sprintf(` otlp: - endpoint: "%s:%d"`, ote.Host, ote.Port) + protocols: + grpc: + endpoint: "%s:%d"`, ote.Host, ote.Port) } func (ote *OTLPTraceDataSender) ProtocolName() string { @@ -382,7 +384,9 @@ func (ome *OTLPMetricsDataSender) GenConfigYAMLStr() string { // Note that this generates a receiver config for agent. return fmt.Sprintf(` otlp: - endpoint: "%s:%d"`, ome.host, ome.port) + protocols: + grpc: + endpoint: "%s:%d"`, ome.host, ome.port) } func (ome *OTLPMetricsDataSender) GetCollectorPort() int {