diff --git a/receiver/otlpreceiver/config.go b/receiver/otlpreceiver/config.go index 878656c92af..f6dd9f86154 100644 --- a/receiver/otlpreceiver/config.go +++ b/receiver/otlpreceiver/config.go @@ -16,39 +16,19 @@ package otlpreceiver import ( "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configmodels" ) +type Protocols struct { + GRPC *configgrpc.GRPCServerSettings `mapstructure:"grpc"` + HTTP *confighttp.HTTPServerSettings `mapstructure:"http"` +} + // Config defines configuration for OTLP receiver. type Config struct { configmodels.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct - // Configures the receiver server protocol. - configgrpc.GRPCServerSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct - - // Transport to use: one of tcp or unix, defaults to tcp - Transport string `mapstructure:"transport"` - - // CorsOrigins are the allowed CORS origins for HTTP/JSON requests to grpc-gateway adapter - // for the OTLP receiver. See github.com/rs/cors - // An empty list means that CORS is not enabled at all. A wildcard (*) can be - // used to match any origin or one or more characters of an origin. - CorsOrigins []string `mapstructure:"cors_allowed_origins"` -} - -func (rOpts *Config) buildOptions() ([]Option, error) { - var opts []Option - if len(rOpts.CorsOrigins) > 0 { - opts = append(opts, WithCorsOrigins(rOpts.CorsOrigins)) - } - - grpcServerOptions, err := rOpts.GRPCServerSettings.ToServerOption() - if err != nil { - return nil, err - } - if len(grpcServerOptions) > 0 { - opts = append(opts, WithGRPCServerOptions(grpcServerOptions...)) - } - - return opts, nil + // Protocols is the configuration for the supported protocols, currently gRPC and HTTP (Proto and JSON). + Protocols `mapstructure:"protocols"` } diff --git a/receiver/otlpreceiver/config_test.go b/receiver/otlpreceiver/config_test.go index 95003e27531..2eb12b12bbc 100644 --- a/receiver/otlpreceiver/config_test.go +++ b/receiver/otlpreceiver/config_test.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/config/configtls" ) @@ -51,10 +52,11 @@ func TestLoadConfig(t *testing.T) { TypeVal: typeStr, NameVal: "otlp/customname", }, - GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "localhost:9090", + Protocols: Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ + Endpoint: "localhost:9090", + }, }, - Transport: "tcp", }) r2 := cfg.Receivers["otlp/keepalive"].(*Config) @@ -64,23 +66,24 @@ func TestLoadConfig(t *testing.T) { TypeVal: typeStr, NameVal: "otlp/keepalive", }, - GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "0.0.0.0:55680", - Keepalive: &configgrpc.KeepaliveServerConfig{ - ServerParameters: &configgrpc.KeepaliveServerParameters{ - MaxConnectionIdle: 11 * time.Second, - MaxConnectionAge: 12 * time.Second, - MaxConnectionAgeGrace: 13 * time.Second, - Time: 30 * time.Second, - Timeout: 5 * time.Second, - }, - EnforcementPolicy: &configgrpc.KeepaliveEnforcementPolicy{ - MinTime: 10 * time.Second, - PermitWithoutStream: true, + Protocols: Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ + Endpoint: "0.0.0.0:55680", + Keepalive: &configgrpc.KeepaliveServerConfig{ + ServerParameters: &configgrpc.KeepaliveServerParameters{ + MaxConnectionIdle: 11 * time.Second, + MaxConnectionAge: 12 * time.Second, + MaxConnectionAgeGrace: 13 * time.Second, + Time: 30 * time.Second, + Timeout: 5 * time.Second, + }, + EnforcementPolicy: &configgrpc.KeepaliveEnforcementPolicy{ + MinTime: 10 * time.Second, + PermitWithoutStream: true, + }, }, }, }, - Transport: "tcp", }) r3 := cfg.Receivers["otlp/msg-size-conc-connect-max-idle"].(*Config) @@ -90,17 +93,18 @@ func TestLoadConfig(t *testing.T) { TypeVal: typeStr, NameVal: "otlp/msg-size-conc-connect-max-idle", }, - GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "0.0.0.0:55680", - MaxRecvMsgSizeMiB: 32, - MaxConcurrentStreams: 16, - Keepalive: &configgrpc.KeepaliveServerConfig{ - ServerParameters: &configgrpc.KeepaliveServerParameters{ - MaxConnectionIdle: 10 * time.Second, + Protocols: Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ + Endpoint: "0.0.0.0:55680", + MaxRecvMsgSizeMiB: 32, + MaxConcurrentStreams: 16, + Keepalive: &configgrpc.KeepaliveServerConfig{ + ServerParameters: &configgrpc.KeepaliveServerParameters{ + MaxConnectionIdle: 10 * time.Second, + }, }, }, }, - Transport: "tcp", }) // NOTE: Once the config loader checks for the files existence, this test may fail and require @@ -112,16 +116,26 @@ func TestLoadConfig(t *testing.T) { TypeVal: typeStr, NameVal: "otlp/tlscredentials", }, - GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "0.0.0.0:55680", - TLSCredentials: &configtls.TLSServerSetting{ - TLSSetting: configtls.TLSSetting{ - CertFile: "test.crt", - KeyFile: "test.key", + Protocols: Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ + Endpoint: "0.0.0.0:55680", + TLSCredentials: &configtls.TLSServerSetting{ + TLSSetting: configtls.TLSSetting{ + CertFile: "test.crt", + KeyFile: "test.key", + }, + }, + }, + HTTP: &confighttp.HTTPServerSettings{ + Endpoint: "0.0.0.0:55681", + TLSSetting: &configtls.TLSServerSetting{ + TLSSetting: configtls.TLSSetting{ + CertFile: "test.crt", + KeyFile: "test.key", + }, }, }, }, - Transport: "tcp", }) r5 := cfg.Receivers["otlp/cors"].(*Config) @@ -131,12 +145,12 @@ func TestLoadConfig(t *testing.T) { TypeVal: typeStr, NameVal: "otlp/cors", }, - GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "0.0.0.0:55680", - TLSCredentials: nil, + Protocols: Protocols{ + HTTP: &confighttp.HTTPServerSettings{ + Endpoint: "0.0.0.0:55681", + CorsOrigins: []string{"https://*.test.com", "https://test.com"}, + }, }, - Transport: "tcp", - CorsOrigins: []string{"https://*.test.com", "https://test.com"}, }) r6 := cfg.Receivers["otlp/uds"].(*Config) @@ -146,32 +160,15 @@ func TestLoadConfig(t *testing.T) { TypeVal: typeStr, NameVal: "otlp/uds", }, - GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "/tmp/otlp.sock", - TLSCredentials: nil, - }, - Transport: "unix", - }) -} - -func TestBuildOptions_TLSCredentials(t *testing.T) { - cfg := Config{ - ReceiverSettings: configmodels.ReceiverSettings{ - NameVal: "IncorrectTLS", - }, - GRPCServerSettings: configgrpc.GRPCServerSettings{ - TLSCredentials: &configtls.TLSServerSetting{ - TLSSetting: configtls.TLSSetting{ - CertFile: "willfail", + Protocols: Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ + Endpoint: "/tmp/grpc_otlp.sock", + // Transport: "unix", + }, + HTTP: &confighttp.HTTPServerSettings{ + Endpoint: "/tmp/http_otlp.sock", + // Transport: "unix", }, }, - }, - } - _, err := cfg.buildOptions() - assert.EqualError(t, err, `failed to load TLS config: for auth via TLS, either both certificate and key must be supplied, or neither`) - - cfg.TLSCredentials = &configtls.TLSServerSetting{} - opt, err := cfg.buildOptions() - assert.NoError(t, err) - assert.NotNil(t, opt) + }) } diff --git a/receiver/otlpreceiver/factory.go b/receiver/otlpreceiver/factory.go index 50419e5d3cc..a86930574f4 100644 --- a/receiver/otlpreceiver/factory.go +++ b/receiver/otlpreceiver/factory.go @@ -16,9 +16,13 @@ package otlpreceiver import ( "context" + "fmt" + + "github.com/spf13/viper" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer" ) @@ -26,6 +30,11 @@ import ( const ( // The value of "type" key in configuration. typeStr = "otlp" + + // Protocol values. + protoGRPC = "grpc" + protoHTTP = "http" + protocolsFieldName = "protocols" ) // Factory is the Factory for receiver. @@ -37,11 +46,6 @@ func (f *Factory) Type() configmodels.Type { return typeStr } -// CustomUnmarshaler returns nil because we don't need custom unmarshaling for this config. -func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler { - return nil -} - // CreateDefaultConfig creates the default configuration for receiver. func (f *Factory) CreateDefaultConfig() configmodels.Receiver { return &Config{ @@ -49,16 +53,54 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver { TypeVal: typeStr, NameVal: typeStr, }, - GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "0.0.0.0:55680", + Protocols: Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ + Endpoint: "0.0.0.0:55680", + }, + HTTP: &confighttp.HTTPServerSettings{ + Endpoint: "0.0.0.0:55681", + }, }, - Transport: "tcp", + } +} + +// CustomUnmarshaler is used to add defaults for named but empty protocols +func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler { + return func(componentViperSection *viper.Viper, intoCfg interface{}) error { + if componentViperSection == nil || len(componentViperSection.AllKeys()) == 0 { + return fmt.Errorf("empty config for Jaeger receiver") + } + // first load the config normally + err := componentViperSection.UnmarshalExact(intoCfg) + if err != nil { + return err + } + receiverCfg, ok := intoCfg.(*Config) + if !ok { + return fmt.Errorf("config type not *otlpreceiver.Config") + } + + // next manually search for protocols in viper, if a protocol is not present it means it is disable. + protocols := componentViperSection.GetStringMap(protocolsFieldName) + if _, ok := protocols[protoGRPC]; !ok { + receiverCfg.GRPC = nil + } + + if _, ok := protocols[protoHTTP]; !ok { + receiverCfg.HTTP = nil + } + + if receiverCfg.GRPC == nil && receiverCfg.HTTP == nil { + return fmt.Errorf("must specify at least one protocol when using the OTLP receiver") + } + + return nil } } // CreateTraceReceiver creates a trace receiver based on provided config. func (f *Factory) CreateTraceReceiver( - _ context.Context, + ctx context.Context, _ component.ReceiverCreateParams, cfg configmodels.Receiver, nextConsumer consumer.TraceConsumer, @@ -67,27 +109,27 @@ func (f *Factory) CreateTraceReceiver( if err != nil { return nil, err } - - r.traceConsumer = nextConsumer - + if err = r.registerTraceConsumer(ctx, nextConsumer); err != nil { + return nil, err + } return r, nil } // CreateMetricsReceiver creates a metrics receiver based on provided config. func (f *Factory) CreateMetricsReceiver( - _ context.Context, + ctx context.Context, _ component.ReceiverCreateParams, cfg configmodels.Receiver, consumer consumer.MetricsConsumer, ) (component.MetricsReceiver, error) { - r, err := f.createReceiver(cfg) if err != nil { return nil, err } - r.metricsConsumer = consumer - + if err = r.registerMetricsConsumer(ctx, consumer); err != nil { + return nil, err + } return r, nil } @@ -100,15 +142,9 @@ func (f *Factory) createReceiver(cfg configmodels.Receiver) (*Receiver, error) { // Check to see if there is already a receiver for this config. receiver, ok := receivers[rCfg] if !ok { - // Build the configuration options. - opts, err := rCfg.buildOptions() - if err != nil { - return nil, err - } - + var err error // We don't have a receiver, so create one. - receiver, err = New( - rCfg.Name(), rCfg.Transport, rCfg.Endpoint, nil, nil, opts...) + receiver, err = New(rCfg) if err != nil { return nil, err } diff --git a/receiver/otlpreceiver/factory_test.go b/receiver/otlpreceiver/factory_test.go index e13c2ced119..6713c22ce7b 100644 --- a/receiver/otlpreceiver/factory_test.go +++ b/receiver/otlpreceiver/factory_test.go @@ -17,7 +17,6 @@ package otlpreceiver import ( "context" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -27,6 +26,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configcheck" "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/testutil" @@ -44,27 +44,30 @@ func TestCreateReceiver(t *testing.T) { cfg := factory.CreateDefaultConfig() config := cfg.(*Config) - config.Endpoint = testutil.GetAvailableLocalAddress(t) + config.GRPC.Endpoint = testutil.GetAvailableLocalAddress(t) + config.HTTP.Endpoint = testutil.GetAvailableLocalAddress(t) creationParams := component.ReceiverCreateParams{Logger: zap.NewNop()} - tReceiver, err := factory.CreateTraceReceiver(context.Background(), creationParams, cfg, nil) + tReceiver, err := factory.CreateTraceReceiver(context.Background(), creationParams, cfg, new(exportertest.SinkTraceExporter)) assert.NotNil(t, tReceiver) assert.NoError(t, err) - mReceiver, err := factory.CreateMetricsReceiver(context.Background(), creationParams, cfg, nil) + mReceiver, err := factory.CreateMetricsReceiver(context.Background(), creationParams, cfg, new(exportertest.SinkMetricsExporter)) assert.NotNil(t, mReceiver) assert.NoError(t, err) } func TestCreateTraceReceiver(t *testing.T) { factory := Factory{} - endpoint := testutil.GetAvailableLocalAddress(t) defaultReceiverSettings := configmodels.ReceiverSettings{ TypeVal: typeStr, NameVal: typeStr, } - defaultGRPCSettings := configgrpc.GRPCServerSettings{ - Endpoint: endpoint, + defaultGRPCSettings := &configgrpc.GRPCServerSettings{ + Endpoint: testutil.GetAvailableLocalAddress(t), + } + defaultHTTPSettings := &confighttp.HTTPServerSettings{ + Endpoint: testutil.GetAvailableLocalAddress(t), } tests := []struct { @@ -75,36 +78,44 @@ func TestCreateTraceReceiver(t *testing.T) { { name: "default", cfg: &Config{ - ReceiverSettings: defaultReceiverSettings, - GRPCServerSettings: defaultGRPCSettings, - Transport: "tcp", + ReceiverSettings: defaultReceiverSettings, + Protocols: Protocols{ + GRPC: defaultGRPCSettings, + HTTP: defaultHTTPSettings, + }, }, }, { - name: "invalid_port", + name: "invalid_grpc_port", cfg: &Config{ ReceiverSettings: configmodels.ReceiverSettings{ TypeVal: typeStr, NameVal: typeStr, }, - GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "localhost:112233", + Protocols: Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ + Endpoint: "localhost:112233", + }, + HTTP: defaultHTTPSettings, }, - Transport: "tcp", }, wantErr: true, }, { - name: "max-msg-size-and-concurrent-connections", + name: "invalid_http_port", cfg: &Config{ - ReceiverSettings: defaultReceiverSettings, - GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: endpoint, - MaxRecvMsgSizeMiB: 32, - MaxConcurrentStreams: 16, + ReceiverSettings: configmodels.ReceiverSettings{ + TypeVal: typeStr, + NameVal: typeStr, + }, + Protocols: Protocols{ + GRPC: defaultGRPCSettings, + HTTP: &confighttp.HTTPServerSettings{ + Endpoint: "localhost:112233", + }, }, - Transport: "tcp", }, + wantErr: true, }, } ctx := context.Background() @@ -113,13 +124,13 @@ func TestCreateTraceReceiver(t *testing.T) { t.Run(tt.name, func(t *testing.T) { sink := new(exportertest.SinkTraceExporter) tr, err := factory.CreateTraceReceiver(ctx, creationParams, tt.cfg, sink) - if (err != nil) != tt.wantErr { - t.Errorf("factory.CreateTraceReceiver() error = %v, wantErr %v", err, tt.wantErr) - return - } - if tr != nil { - require.NoError(t, tr.Start(context.Background(), componenttest.NewNopHost()), "Start() error = %v", err) - tr.Shutdown(context.Background()) + assert.NoError(t, err) + require.NotNil(t, tr) + if tt.wantErr { + assert.Error(t, tr.Start(context.Background(), componenttest.NewNopHost())) + } else { + assert.NoError(t, tr.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, tr.Shutdown(context.Background())) } }) } @@ -127,13 +138,15 @@ func TestCreateTraceReceiver(t *testing.T) { func TestCreateMetricReceiver(t *testing.T) { factory := Factory{} - endpoint := testutil.GetAvailableLocalAddress(t) defaultReceiverSettings := configmodels.ReceiverSettings{ TypeVal: typeStr, NameVal: typeStr, } - defaultGRPCSettings := configgrpc.GRPCServerSettings{ - Endpoint: endpoint, + defaultGRPCSettings := &configgrpc.GRPCServerSettings{ + Endpoint: testutil.GetAvailableLocalAddress(t), + } + defaultHTTPSettings := &confighttp.HTTPServerSettings{ + Endpoint: testutil.GetAvailableLocalAddress(t), } tests := []struct { @@ -144,43 +157,44 @@ func TestCreateMetricReceiver(t *testing.T) { { name: "default", cfg: &Config{ - ReceiverSettings: defaultReceiverSettings, - GRPCServerSettings: defaultGRPCSettings, - Transport: "tcp", + ReceiverSettings: defaultReceiverSettings, + Protocols: Protocols{ + GRPC: defaultGRPCSettings, + HTTP: defaultHTTPSettings, + }, }, }, { - name: "invalid_address", + name: "invalid_grpc_address", cfg: &Config{ ReceiverSettings: configmodels.ReceiverSettings{ TypeVal: typeStr, NameVal: typeStr, }, - GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "327.0.0.1:1122", + Protocols: Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ + Endpoint: "327.0.0.1:1122", + }, + HTTP: defaultHTTPSettings, }, - Transport: "tcp", }, wantErr: true, }, { - name: "keepalive", + name: "invalid_http_address", cfg: &Config{ - ReceiverSettings: defaultReceiverSettings, - GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: endpoint, - Keepalive: &configgrpc.KeepaliveServerConfig{ - ServerParameters: &configgrpc.KeepaliveServerParameters{ - MaxConnectionAge: 60 * time.Second, - }, - EnforcementPolicy: &configgrpc.KeepaliveEnforcementPolicy{ - MinTime: 30 * time.Second, - PermitWithoutStream: true, - }, + ReceiverSettings: configmodels.ReceiverSettings{ + TypeVal: typeStr, + NameVal: typeStr, + }, + Protocols: Protocols{ + GRPC: defaultGRPCSettings, + HTTP: &confighttp.HTTPServerSettings{ + Endpoint: "327.0.0.1:1122", }, }, - Transport: "tcp", }, + wantErr: true, }, } ctx := context.Background() @@ -188,14 +202,14 @@ func TestCreateMetricReceiver(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { sink := new(exportertest.SinkMetricsExporter) - tc, err := factory.CreateMetricsReceiver(ctx, creationParams, tt.cfg, sink) - if (err != nil) != tt.wantErr { - t.Errorf("factory.CreateMetricsReceiver() error = %v, wantErr %v", err, tt.wantErr) - return - } - if tc != nil { - require.NoError(t, tc.Start(context.Background(), componenttest.NewNopHost()), "Start() error = %v", err) - tc.Shutdown(context.Background()) + mr, err := factory.CreateMetricsReceiver(ctx, creationParams, tt.cfg, sink) + assert.NoError(t, err) + require.NotNil(t, mr) + if tt.wantErr { + assert.Error(t, mr.Start(context.Background(), componenttest.NewNopHost())) + } else { + require.NoError(t, mr.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, mr.Shutdown(context.Background())) } }) } diff --git a/receiver/otlpreceiver/options.go b/receiver/otlpreceiver/options.go deleted file mode 100644 index 66e608e89c9..00000000000 --- a/receiver/otlpreceiver/options.go +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package otlpreceiver - -import ( - "google.golang.org/grpc" -) - -// Option interface defines for configuration settings to be applied to receivers. -// -// withReceiver applies the configuration to the given receiver. -type Option interface { - withReceiver(*Receiver) -} - -type corsOrigins struct { - origins []string -} - -var _ Option = (*corsOrigins)(nil) - -func (co *corsOrigins) withReceiver(r *Receiver) { - r.corsOrigins = co.origins -} - -// WithCorsOrigins is an option to specify the allowed origins to enable writing -// HTTP/JSON requests to the grpc-gateway adapter using CORS. -func WithCorsOrigins(origins []string) Option { - return &corsOrigins{origins: origins} -} - -var _ Option = (grpcServerOptions)(nil) - -type grpcServerOptions []grpc.ServerOption - -func (gsvo grpcServerOptions) withReceiver(r *Receiver) { - r.grpcServerOptions = gsvo -} - -// WithGRPCServerOptions allows one to specify the options for starting a gRPC server. -func WithGRPCServerOptions(gsOpts ...grpc.ServerOption) Option { - gsvOpts := grpcServerOptions(gsOpts) - return gsvOpts -} diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index d6174430345..be1103022c2 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -17,81 +17,55 @@ package otlpreceiver import ( "context" "errors" - "fmt" "net" "net/http" "sync" gatewayruntime "github.com/grpc-ecosystem/grpc-gateway/runtime" - "github.com/rs/cors" - "github.com/soheilhy/cmux" "google.golang.org/grpc" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenterror" "go.opentelemetry.io/collector/consumer" collectormetrics "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/metrics/v1" collectortrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/trace/v1" - "go.opentelemetry.io/collector/observability" "go.opentelemetry.io/collector/receiver/otlpreceiver/metrics" "go.opentelemetry.io/collector/receiver/otlpreceiver/trace" ) // Receiver is the type that exposes Trace and Metrics reception. type Receiver struct { - mu sync.Mutex - ln net.Listener - serverGRPC *grpc.Server - serverHTTP *http.Server - gatewayMux *gatewayruntime.ServeMux - corsOrigins []string - grpcServerOptions []grpc.ServerOption + cfg *Config + serverGRPC *grpc.Server + gatewayMux *gatewayruntime.ServeMux + serverHTTP *http.Server traceReceiver *trace.Receiver metricsReceiver *metrics.Receiver - traceConsumer consumer.TraceConsumer - metricsConsumer consumer.MetricsConsumer - - stopOnce sync.Once - startServerOnce sync.Once - startTraceReceiverOnce sync.Once - startMetricsReceiverOnce sync.Once - - instanceName string + stopOnce sync.Once + startServerOnce sync.Once } // New just creates the OpenTelemetry receiver services. It is the caller's // responsibility to invoke the respective Start*Reception methods as well // as the various Stop*Reception methods to end it. -func New( - instanceName string, - transport string, - addr string, - tc consumer.TraceConsumer, - mc consumer.MetricsConsumer, - opts ...Option, -) (*Receiver, error) { - ln, err := net.Listen(transport, addr) - if err != nil { - return nil, fmt.Errorf("failed to bind to address %q: %v", addr, err) - } - +func New(cfg *Config) (*Receiver, error) { r := &Receiver{ - ln: ln, - corsOrigins: []string{}, // Disable CORS by default. - gatewayMux: gatewayruntime.NewServeMux( - gatewayruntime.WithMarshalerOption("application/x-protobuf", &xProtobufMarshaler{}), - ), + cfg: cfg, } - - for _, opt := range opts { - opt.withReceiver(r) + if cfg.GRPC != nil { + opts, err := cfg.GRPC.ToServerOption() + if err != nil { + return nil, err + } + opts = append(opts, grpc.ReadBufferSize(512*1024)) + r.serverGRPC = grpc.NewServer(opts...) + } + if cfg.HTTP != nil { + r.gatewayMux = gatewayruntime.NewServeMux( + gatewayruntime.WithMarshalerOption("application/x-protobuf", &xProtobufMarshaler{}), + ) } - - r.instanceName = instanceName - r.traceConsumer = tc - r.metricsConsumer = mc return r, nil } @@ -99,173 +73,84 @@ func New( // Start runs the trace receiver on the gRPC server. Currently // it also enables the metrics receiver too. func (r *Receiver) Start(ctx context.Context, host component.Host) error { - return r.start(host) -} - -func (r *Receiver) registerTraceConsumer() error { - var err = componenterror.ErrAlreadyStarted + if r.traceReceiver == nil && r.metricsReceiver == nil { + return errors.New("cannot start receiver: no consumers were specified") + } - r.startTraceReceiverOnce.Do(func() { - r.traceReceiver, err = trace.New(r.instanceName, r.traceConsumer) - if err != nil { - return + var err error + r.startServerOnce.Do(func() { + if r.cfg.GRPC != nil { + var gln net.Listener + gln, err = net.Listen("tcp", r.cfg.GRPC.Endpoint) + if err != nil { + return + } + go func() { + if errGrpc := r.serverGRPC.Serve(gln); errGrpc != nil { + host.ReportFatalError(errGrpc) + } + }() } - srv := r.grpcServer() - collectortrace.RegisterTraceServiceServer(srv, r.traceReceiver) - }) - - return err -} - -func (r *Receiver) registerMetricsConsumer() error { - var err = componenterror.ErrAlreadyStarted - - r.startMetricsReceiverOnce.Do(func() { - r.metricsReceiver, err = metrics.New(r.instanceName, r.metricsConsumer) - if err != nil { - return + if r.cfg.HTTP != nil { + r.serverHTTP = r.cfg.HTTP.ToServer(r.gatewayMux) + var hln net.Listener + hln, err = r.cfg.HTTP.ToListener() + if err != nil { + return + } + go func() { + if errHTTP := r.serverHTTP.Serve(hln); errHTTP != nil { + host.ReportFatalError(errHTTP) + } + }() } - srv := r.grpcServer() - collectormetrics.RegisterMetricsServiceServer(srv, r.metricsReceiver) }) - return err } -func (r *Receiver) grpcServer() *grpc.Server { - r.mu.Lock() - defer r.mu.Unlock() - - if r.serverGRPC == nil { - r.serverGRPC = observability.GRPCServerWithObservabilityEnabled(r.grpcServerOptions...) - } - - return r.serverGRPC -} - // Shutdown is a method to turn off receiving. func (r *Receiver) Shutdown(context.Context) error { - if err := r.stop(); err != componenterror.ErrAlreadyStopped { - return err - } - return nil -} - -// start runs all the receivers/services namely, Trace and Metrics services. -func (r *Receiver) start(host component.Host) error { - hasConsumer := false - if r.traceConsumer != nil { - hasConsumer = true - if err := r.registerTraceConsumer(); err != nil && err != componenterror.ErrAlreadyStarted { - return err - } - } - - if r.metricsConsumer != nil { - hasConsumer = true - if err := r.registerMetricsConsumer(); err != nil && err != componenterror.ErrAlreadyStarted { - return err - } - } - - if !hasConsumer { - return errors.New("cannot start receiver: no consumers were specified") - } - - if err := r.startServer(host); err != nil && err != componenterror.ErrAlreadyStarted { - return err - } - - // At this point we've successfully started all the services/receivers. - // Add other start routines here. - return nil -} - -// stop stops the underlying gRPC server and all the services running on it. -func (r *Receiver) stop() error { - r.mu.Lock() - defer r.mu.Unlock() - - var err = componenterror.ErrAlreadyStopped + var err error r.stopOnce.Do(func() { err = nil if r.serverHTTP != nil { - _ = r.serverHTTP.Close() + err = r.serverHTTP.Close() } - if r.ln != nil { - _ = r.ln.Close() + if r.serverGRPC != nil { + r.serverGRPC.Stop() } - - // TODO(nilebox): investigate, takes too long - // r.serverGRPC.Stop() }) return err } -func (r *Receiver) httpServer() *http.Server { - r.mu.Lock() - defer r.mu.Unlock() - - if r.serverHTTP == nil { - var mux http.Handler = r.gatewayMux - if len(r.corsOrigins) > 0 { - co := cors.Options{AllowedOrigins: r.corsOrigins} - mux = cors.New(co).Handler(mux) - } - r.serverHTTP = &http.Server{Handler: mux} +func (r *Receiver) registerTraceConsumer(ctx context.Context, tc consumer.TraceConsumer) error { + var err error + r.traceReceiver, err = trace.New(r.cfg.Name(), tc) + if err != nil { + return err } - - return r.serverHTTP + if r.serverGRPC != nil { + collectortrace.RegisterTraceServiceServer(r.serverGRPC, r.traceReceiver) + } + if r.gatewayMux != nil { + return collectortrace.RegisterTraceServiceHandlerServer(ctx, r.gatewayMux, r.traceReceiver) + } + return nil } -func (r *Receiver) startServer(host component.Host) error { - err := componenterror.ErrAlreadyStarted - r.startServerOnce.Do(func() { - err = nil - // Register the grpc-gateway on the HTTP server mux - c := context.Background() - opts := []grpc.DialOption{grpc.WithInsecure()} - endpoint := r.ln.Addr().String() - - _, ok := r.ln.(*net.UnixListener) - if ok { - endpoint = "unix:" + endpoint - } - - err = collectortrace.RegisterTraceServiceHandlerFromEndpoint(c, r.gatewayMux, endpoint, opts) - if err != nil { - return - } - - err = collectormetrics.RegisterMetricsServiceHandlerFromEndpoint(c, r.gatewayMux, endpoint, opts) - if err != nil { - return - } - - // Start the gRPC and HTTP/JSON (grpc-gateway) servers on the same port. - m := cmux.New(r.ln) - grpcL := m.MatchWithWriters( - cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"), - cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto")) - - httpL := m.Match(cmux.Any()) - go func() { - if errGrpc := r.serverGRPC.Serve(grpcL); errGrpc != nil { - host.ReportFatalError(errGrpc) - } - }() - go func() { - if errHTTP := r.httpServer().Serve(httpL); errHTTP != nil { - host.ReportFatalError(errHTTP) - } - }() - go func() { - if errServe := m.Serve(); errServe != nil { - host.ReportFatalError(errServe) - } - }() - }) - return err +func (r *Receiver) registerMetricsConsumer(ctx context.Context, mc consumer.MetricsConsumer) error { + var err error + r.metricsReceiver, err = metrics.New(r.cfg.Name(), mc) + if err != nil { + return err + } + if r.serverGRPC != nil { + collectormetrics.RegisterMetricsServiceServer(r.serverGRPC, r.metricsReceiver) + } + if r.gatewayMux != nil { + return collectormetrics.RegisterMetricsServiceHandlerServer(ctx, r.gatewayMux, r.metricsReceiver) + } + return nil } diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index ea1d39915bb..a48fb007521 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -23,8 +23,6 @@ import ( "io/ioutil" "net" "net/http" - "os" - "strings" "testing" "time" @@ -33,10 +31,15 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/exporter/exportertest" collectortrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/trace/v1" @@ -56,10 +59,9 @@ func TestGrpcGateway_endToEnd(t *testing.T) { // Set the buffer count to 1 to make it flush the test span immediately. sink := new(exportertest.SinkTraceExporter) - ocr, err := New(otlpReceiver, "tcp", addr, sink, nil) - require.NoError(t, err, "Failed to create trace receiver: %v", err) + ocr := newHTTPReceiver(t, otlpReceiver, addr, sink, nil) - require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver: %v", err) + require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver") defer ocr.Shutdown(context.Background()) // TODO(nilebox): make starting server deterministic @@ -68,10 +70,6 @@ func TestGrpcGateway_endToEnd(t *testing.T) { url := fmt.Sprintf("http://%s/v1/trace", addr) - // Verify that CORS is not enabled by default, but that it gives an 405 - // method not allowed error. - verifyCorsResp(t, url, "origin.com", 405, false) - traceJSON := []byte(` { "resource_spans": [ @@ -174,11 +172,11 @@ func TestProtoHttp(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) // Set the buffer count to 1 to make it flush the test span immediately. - sink := new(exportertest.SinkTraceExporter) - ocr, err := New(otlpReceiver, "tcp", addr, sink, nil) - require.NoError(t, err, "Failed to create trace receiver: %v", err) + tSink := new(exportertest.SinkTraceExporter) + mSink := new(exportertest.SinkMetricsExporter) + ocr := newHTTPReceiver(t, otlpReceiver, addr, tSink, mSink) - require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver: %v", err) + require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver") defer ocr.Shutdown(context.Background()) // TODO(nilebox): make starting server deterministic @@ -188,7 +186,6 @@ func TestProtoHttp(t *testing.T) { url := fmt.Sprintf("http://%s/v1/trace", addr) wantOtlp := pdata.TracesToOtlp(testdata.GenerateTraceDataOneSpan()) - traceProto := collectortrace.ExportTraceServiceRequest{ ResourceSpans: wantOtlp, } @@ -198,34 +195,21 @@ func TestProtoHttp(t *testing.T) { } buf := bytes.NewBuffer(traceBytes) - - req, err := http.NewRequest("POST", url, buf) - require.NoError(t, err, "Error creating trace POST request: %v", err) - req.Header.Set("Content-Type", "application/x-protobuf") - - client := &http.Client{} - resp, err := client.Do(req) + resp, err := http.Post(url, "application/x-protobuf", buf) require.NoError(t, err, "Error posting trace to grpc-gateway server: %v", err) respBytes, err := ioutil.ReadAll(resp.Body) require.NoError(t, err, "Error reading response from trace grpc-gateway") + require.NoError(t, resp.Body.Close(), "Error closing response body") - err = resp.Body.Close() - require.NoError(t, err, "Error closing response body") - - if resp.StatusCode != 200 { - t.Errorf("Unexpected status from trace grpc-gateway: %v", resp.StatusCode) - } - - if resType := resp.Header.Get("Content-Type"); resType != "application/x-protobuf" { - t.Errorf("response Content-Type got: %s, want: %s", resType, "application/x-protobuf") - } + require.Equal(t, 200, resp.StatusCode, "Unexpected return status") + require.Equal(t, "application/x-protobuf", resp.Header.Get("Content-Type"), "Unexpected response Content-Type") tmp := collectortrace.ExportTraceServiceResponse{} err = proto.Unmarshal(respBytes, &tmp) require.NoError(t, err, "Unable to unmarshal response to ExportTraceServiceResponse proto") - gotOtlp := pdata.TracesToOtlp(sink.AllTraces()[0]) + gotOtlp := pdata.TracesToOtlp(tSink.AllTraces()[0]) if len(gotOtlp) != len(wantOtlp) { t.Fatalf("len(traces):\nGot: %d\nWant: %d\n", len(gotOtlp), len(wantOtlp)) @@ -244,279 +228,42 @@ func TestProtoHttp(t *testing.T) { } -func TestTraceGrpcGatewayCors_endToEnd(t *testing.T) { +func TestGRPCNewPortAlreadyUsed(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) - corsOrigins := []string{"allowed-*.com"} - - sink := new(exportertest.SinkTraceExporter) - ocr, err := New(otlpReceiver, "tcp", addr, sink, nil, WithCorsOrigins(corsOrigins)) - require.NoError(t, err, "Failed to create trace receiver: %v", err) - defer ocr.Shutdown(context.Background()) - - require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start trace receiver: %v", err) - - // TODO(nilebox): make starting server deterministic - // Wait for the servers to start - <-time.After(10 * time.Millisecond) - - url := fmt.Sprintf("http://%s/v1/trace", addr) - - // Verify allowed domain gets responses that allow CORS. - verifyCorsResp(t, url, "allowed-origin.com", 200, true) - - // Verify disallowed domain gets responses that disallow CORS. - verifyCorsResp(t, url, "disallowed-origin.com", 200, false) -} - -func TestMetricsGrpcGatewayCors_endToEnd(t *testing.T) { - addr := testutil.GetAvailableLocalAddress(t) - corsOrigins := []string{"allowed-*.com"} - - sink := new(exportertest.SinkMetricsExporter) - ocr, err := New(otlpReceiver, "tcp", addr, nil, sink, WithCorsOrigins(corsOrigins)) - require.NoError(t, err, "Failed to create metrics receiver: %v", err) - defer ocr.Shutdown(context.Background()) - - require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start metrics receiver: %v", err) - - // TODO(nilebox): make starting server deterministic - // Wait for the servers to start - <-time.After(10 * time.Millisecond) - - url := fmt.Sprintf("http://%s/v1/metrics", addr) - - // Verify allowed domain gets responses that allow CORS. - verifyCorsResp(t, url, "allowed-origin.com", 200, true) - - // Verify disallowed domain gets responses that disallow CORS. - verifyCorsResp(t, url, "disallowed-origin.com", 200, false) -} - -// As per Issue https://github.com/census-instrumentation/opencensus-service/issues/366 -// the agent's mux should be able to accept all Proto affiliated content-types and not -// redirect them to the web-grpc-gateway endpoint. -func TestAcceptAllGRPCProtoAffiliatedContentTypes(t *testing.T) { - t.Skip("Currently a flaky test as we need a way to flush all written traces") - - addr := testutil.GetAvailableLocalAddress(t) - cbts := new(exportertest.SinkTraceExporter) - ocr, err := New(otlpReceiver, "tcp", addr, cbts, nil) - require.NoError(t, err, "Failed to create trace receiver: %v", err) - - require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost()), "Failed to start the trace receiver: %v", err) - defer ocr.Shutdown(context.Background()) - - // Now start the client with the various Proto affiliated gRPC Content-SubTypes as per: - // https://godoc.org/google.golang.org/grpc#CallContentSubtype - protoAffiliatedContentSubTypes := []string{"", "proto"} - for _, subContentType := range protoAffiliatedContentSubTypes { - if err := runContentTypeTests(addr, asSubContentType, subContentType); err != nil { - t.Errorf("%q subContentType failed to send proto: %v", subContentType, err) - } - } - - // Now start the client with the various Proto affiliated gRPC Content-Types, - // as we encountered in https://github.com/census-instrumentation/opencensus-service/issues/366 - protoAffiliatedContentTypes := []string{"application/grpc", "application/grpc+proto"} - for _, contentType := range protoAffiliatedContentTypes { - if err := runContentTypeTests(addr, asContentType, contentType); err != nil { - t.Errorf("%q Content-type failed to send proto: %v", contentType, err) - } - } - - // Before we exit we have to verify that we got exactly 4 TraceService requests. - wantLen := len(protoAffiliatedContentSubTypes) + len(protoAffiliatedContentTypes) - gotReqs := cbts.AllTraces() - if len(gotReqs) != wantLen { - t.Errorf("Receiver ExportTraceServiceRequest length mismatch:: Got %d Want %d", len(gotReqs), wantLen) - } -} - -const ( - asSubContentType = true - asContentType = false -) - -func runContentTypeTests(addr string, contentTypeDesignation bool, contentType string) error { - opts := []grpc.DialOption{ - grpc.WithInsecure(), - grpc.WithBlock(), - grpc.WithDisableRetry(), - } - - if contentTypeDesignation == asContentType { - opts = append(opts, grpc.WithDefaultCallOptions( - grpc.Header(&metadata.MD{"Content-Type": []string{contentType}}))) - } else { - opts = append(opts, grpc.WithDefaultCallOptions(grpc.CallContentSubtype(contentType))) - } - - cc, err := grpc.Dial(addr, opts...) - if err != nil { - return fmt.Errorf("Creating grpc.ClientConn: %v", err) - } - defer cc.Close() - - acc := collectortrace.NewTraceServiceClient(cc) - - req := &collectortrace.ExportTraceServiceRequest{ - ResourceSpans: []*otlptrace.ResourceSpans{ - { - Resource: &otlpresource.Resource{ - Attributes: []*otlpcommon.KeyValue{ - { - Key: "sub-type", - Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: contentType}}, - }, - }, - }, - InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ - { - Spans: []*otlptrace.Span{ - { - TraceId: []byte{ - 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, - 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, - }, - }, - }, - }, - }, - }, - }, - } - - _, err = acc.Export(context.Background(), req) - return err -} - -func verifyCorsResp(t *testing.T, url string, origin string, wantStatus int, wantAllowed bool) { - req, err := http.NewRequest("OPTIONS", url, nil) - require.NoError(t, err, "Error creating trace OPTIONS request: %v", err) - req.Header.Set("Origin", origin) - req.Header.Set("Access-Control-Request-Method", "POST") - - resp, err := http.DefaultClient.Do(req) - require.NoError(t, err, "Error sending OPTIONS to http server: %v", err) - - err = resp.Body.Close() - if err != nil { - t.Errorf("Error closing OPTIONS response body, %v", err) - } - - assert.Equal(t, wantStatus, resp.StatusCode) - - gotAllowOrigin := resp.Header.Get("Access-Control-Allow-Origin") - gotAllowMethods := resp.Header.Get("Access-Control-Allow-Methods") + ln, err := net.Listen("tcp", addr) + require.NoError(t, err, "failed to listen on %q: %v", addr, err) + defer ln.Close() - wantAllowOrigin := "" - wantAllowMethods := "" - if wantAllowed { - wantAllowOrigin = origin - wantAllowMethods = "POST" - } - assert.Equal(t, wantAllowOrigin, gotAllowOrigin) - assert.Equal(t, wantAllowMethods, gotAllowMethods) -} + r := newGRPCReceiver(t, otlpReceiver, addr, new(exportertest.SinkTraceExporter), new(exportertest.SinkMetricsExporter)) + require.NotNil(t, r) -func TestStopWithoutStartNeverCrashes(t *testing.T) { - addr := testutil.GetAvailableLocalAddress(t) - ocr, err := New(otlpReceiver, "tcp", addr, nil, nil) - require.NoError(t, err, "Failed to create an OpenCensus receiver: %v", err) - // Stop it before ever invoking Start*. - ocr.stop() + require.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) } -func TestNewPortAlreadyUsed(t *testing.T) { +func TestHTTPNewPortAlreadyUsed(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) ln, err := net.Listen("tcp", addr) require.NoError(t, err, "failed to listen on %q: %v", addr, err) defer ln.Close() - r, err := New(otlpReceiver, "tcp", addr, nil, nil) - require.Error(t, err) - require.Nil(t, r) -} - -func TestMultipleStopReceptionShouldNotError(t *testing.T) { - addr := testutil.GetAvailableLocalAddress(t) - r, err := New(otlpReceiver, "tcp", addr, new(exportertest.SinkTraceExporter), new(exportertest.SinkMetricsExporter)) - require.NoError(t, err) + r := newHTTPReceiver(t, otlpReceiver, addr, new(exportertest.SinkTraceExporter), new(exportertest.SinkMetricsExporter)) require.NotNil(t, r) - require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) - require.NoError(t, r.Shutdown(context.Background())) + require.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) } -func TestStartWithoutConsumersShouldFail(t *testing.T) { +func TestGRPCStartWithoutConsumers(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) - r, err := New(otlpReceiver, "tcp", addr, nil, nil) - require.NoError(t, err) + r := newGRPCReceiver(t, otlpReceiver, addr, nil, nil) require.NotNil(t, r) - require.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) } -func tempSocketName(t *testing.T) string { - tmpfile, err := ioutil.TempFile("", "sock") - require.NoError(t, err) - require.NoError(t, tmpfile.Close()) - socket := tmpfile.Name() - require.NoError(t, os.Remove(socket)) - return socket -} - -func TestReceiveOnUnixDomainSocket_endToEnd(t *testing.T) { - socketName := tempSocketName(t) - cbts := new(exportertest.SinkTraceExporter) - r, err := New(otlpReceiver, "unix", socketName, cbts, nil) - require.NoError(t, err) +func TestHTTPStartWithoutConsumers(t *testing.T) { + addr := testutil.GetAvailableLocalAddress(t) + r := newHTTPReceiver(t, otlpReceiver, addr, nil, nil) require.NotNil(t, r) - require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) - defer r.Shutdown(context.Background()) - - // Wait for the servers to start - <-time.After(10 * time.Millisecond) - - span := ` - { - "resource_spans": [ - { - "instrumentation_library_spans": [ - { - "spans": [ - { - "trace_id": "YpsR8/le4OgjwSSxhjlrEg==", - "span_id": "2CogcbJh7Ko=", - "name": "testSpan", - "start_time_unix_nano": 1544712660000000000, - "end_time_unix_nano": 1544712661000000000 - } - ] - } - ] - } - ] - }` - - c := http.Client{ - Transport: &http.Transport{ - DialContext: func(ctx context.Context, network, addr string) (conn net.Conn, err error) { - return net.Dial("unix", socketName) - }, - }, - } - - response, err := c.Post("http://unix/v1/trace", "application/json", strings.NewReader(span)) - require.NoError(t, err) - defer response.Body.Close() - - bodyBytes, err := ioutil.ReadAll(response.Body) - require.NoError(t, err) - bodyString := string(bodyBytes) - fmt.Println(bodyString) - - require.Equal(t, 200, response.StatusCode) + require.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) } // TestOTLPReceiverTrace_HandleNextConsumerResponse checks if the trace receiver @@ -561,14 +308,6 @@ func TestOTLPReceiverTrace_HandleNextConsumerResponse(t *testing.T) { req := &collectortrace.ExportTraceServiceRequest{ ResourceSpans: []*otlptrace.ResourceSpans{ { - Resource: &otlpresource.Resource{ - Attributes: []*otlpcommon.KeyValue{ - { - Key: conventions.AttributeServiceName, - Value: &otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "test-svc"}}, - }, - }, - }, InstrumentationLibrarySpans: []*otlptrace.InstrumentationLibrarySpans{ { Spans: []*otlptrace.Span{ @@ -616,19 +355,13 @@ func TestOTLPReceiverTrace_HandleNextConsumerResponse(t *testing.T) { sink := new(exportertest.SinkTraceExporter) - var opts []Option - ocr, err := New(otlpReceiver, "tcp", addr, nil, nil, opts...) - require.Nil(t, err) + ocr := newGRPCReceiver(t, otlpReceiver, addr, sink, nil) require.NotNil(t, ocr) - - ocr.traceConsumer = sink - require.Nil(t, ocr.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, ocr.Start(context.Background(), componenttest.NewNopHost())) defer ocr.Shutdown(context.Background()) cc, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBlock()) - if err != nil { - t.Errorf("grpc.Dial: %v", err) - } + require.NoError(t, err) defer cc.Close() for _, ingestionState := range tt.ingestionStates { @@ -656,3 +389,84 @@ func TestOTLPReceiverTrace_HandleNextConsumerResponse(t *testing.T) { } } } + +func TestGRPCInvalidTLSCredentials(t *testing.T) { + cfg := &Config{ + ReceiverSettings: configmodels.ReceiverSettings{ + NameVal: "IncorrectTLS", + }, + Protocols: Protocols{ + GRPC: &configgrpc.GRPCServerSettings{ + Endpoint: "localhost:50000", + TLSCredentials: &configtls.TLSServerSetting{ + TLSSetting: configtls.TLSSetting{ + CertFile: "willfail", + }, + }, + }, + }, + } + + // TLS is resolved during Creation of the receiver for GRPC. + factory := &Factory{} + _, err := factory.createReceiver(cfg) + assert.EqualError(t, err, + `failed to load TLS config: for auth via TLS, either both certificate and key must be supplied, or neither`) +} + +func TestHTTPInvalidTLSCredentials(t *testing.T) { + cfg := &Config{ + ReceiverSettings: configmodels.ReceiverSettings{ + NameVal: "IncorrectTLS", + }, + Protocols: Protocols{ + HTTP: &confighttp.HTTPServerSettings{ + Endpoint: "localhost:50000", + TLSSetting: &configtls.TLSServerSetting{ + TLSSetting: configtls.TLSSetting{ + CertFile: "willfail", + }, + }, + }, + }, + } + + // TLS is resolved during Start for HTTP. + r := newReceiver(t, &Factory{}, cfg, new(exportertest.SinkTraceExporter), new(exportertest.SinkMetricsExporter)) + assert.EqualError(t, r.Start(context.Background(), componenttest.NewNopHost()), + `failed to load TLS config: for auth via TLS, either both certificate and key must be supplied, or neither`) +} + +func newGRPCReceiver(t *testing.T, name string, endpoint string, tc consumer.TraceConsumer, mc consumer.MetricsConsumer) *Receiver { + factory := &Factory{} + cfg := factory.CreateDefaultConfig().(*Config) + cfg.SetName(name) + cfg.GRPC.Endpoint = endpoint + cfg.HTTP = nil + return newReceiver(t, factory, cfg, tc, mc) +} + +func newHTTPReceiver(t *testing.T, name string, endpoint string, tc consumer.TraceConsumer, mc consumer.MetricsConsumer) *Receiver { + factory := &Factory{} + cfg := factory.CreateDefaultConfig().(*Config) + cfg.SetName(name) + cfg.HTTP.Endpoint = endpoint + cfg.GRPC = nil + return newReceiver(t, factory, cfg, tc, mc) +} + +func newReceiver(t *testing.T, factory *Factory, cfg *Config, tc consumer.TraceConsumer, mc consumer.MetricsConsumer) *Receiver { + r, err := factory.createReceiver(cfg) + require.NoError(t, err) + if tc != nil { + params := component.ReceiverCreateParams{} + _, err = factory.CreateTraceReceiver(context.Background(), params, cfg, tc) + require.NoError(t, err) + } + if mc != nil { + params := component.ReceiverCreateParams{} + _, err = factory.CreateMetricsReceiver(context.Background(), params, cfg, mc) + require.NoError(t, err) + } + return r +} diff --git a/receiver/otlpreceiver/testdata/config.yaml b/receiver/otlpreceiver/testdata/config.yaml index c3af95b3994..f80fe2c1589 100644 --- a/receiver/otlpreceiver/testdata/config.yaml +++ b/receiver/otlpreceiver/testdata/config.yaml @@ -2,51 +2,72 @@ receivers: # The following entry initializes the default OTLP receiver. # The full name of this receiver is `otlp` and can be referenced in pipelines by 'otlp'. otlp: + protocols: + grpc: + http: # The following entry demonstrates configuring the common receiver settings: # - endpoint # This configuration is of type 'otlp' and has the name 'customname' with a full name of 'otlp/customname' # ('/'. To reference this configuration in a pipeline, use the full name `otlp/customname`. otlp/customname: - # The receiver will listen on endpoint: "localhost:9090". - endpoint: localhost:9090 + protocols: + grpc: + # The receiver will listen on endpoint: "localhost:9090". + endpoint: localhost:9090 # The following entry configures all of the keep alive settings. These settings are used to configure the receiver. otlp/keepalive: - keepalive: - server_parameters: - max_connection_idle: 11s - max_connection_age: 12s - max_connection_age_grace: 13s - time: 30s - timeout: 5s - enforcement_policy: - min_time: 10s - permit_without_stream: true + protocols: + grpc: + keepalive: + server_parameters: + max_connection_idle: 11s + max_connection_age: 12s + max_connection_age_grace: 13s + time: 30s + timeout: 5s + enforcement_policy: + min_time: 10s + permit_without_stream: true # The following demonstrates how to set maximum limits on stream, message size and connection idle time. # Note: The test yaml has demonstrated configuration on a grouped by their structure; however, all of the settings can # be mix and matched like adding the maximum connection idle setting in this example. otlp/msg-size-conc-connect-max-idle: - max_recv_msg_size_mib: 32 - max_concurrent_streams: 16 - keepalive: - server_parameters: - max_connection_idle: 10s + protocols: + grpc: + max_recv_msg_size_mib: 32 + max_concurrent_streams: 16 + keepalive: + server_parameters: + max_connection_idle: 10s # The following entry demonstrates how to specify TLS credentials for the server. # Note: These files do not exist. If the receiver is started with this configuration, it will fail. otlp/tlscredentials: - tls_credentials: - cert_file: test.crt - key_file: test.key + protocols: + grpc: + tls_credentials: + cert_file: test.crt + key_file: test.key + http: + tls_settings: + cert_file: test.crt + key_file: test.key # The following entry demonstrates how to specify a Unix Domain Socket for the server. otlp/uds: - transport: unix - endpoint: /tmp/otlp.sock - + protocols: + grpc: + # transport: unix + endpoint: /tmp/grpc_otlp.sock + http: + # transport: unix + endpoint: /tmp/http_otlp.sock # The following entry demonstrates how to configure the OTLP receiver to allow Cross-Origin Resource Sharing (CORS). # Both fully qualified domain names and the use of wildcards are supported. otlp/cors: - cors_allowed_origins: - - https://*.test.com # Wildcard subdomain. Allows domains like https://www.test.com and https://foo.test.com but not https://wwwtest.com. - - https://test.com # Fully qualified domain name. Allows https://test.com only. + protocols: + http: + cors_allowed_origins: + - https://*.test.com # Wildcard subdomain. Allows domains like https://www.test.com and https://foo.test.com but not https://wwwtest.com. + - https://test.com # Fully qualified domain name. Allows https://test.com only. processors: exampleprocessor: diff --git a/service/testdata/otelcol-config-minimal.yaml b/service/testdata/otelcol-config-minimal.yaml index 448973ae172..372bd08d5cb 100644 --- a/service/testdata/otelcol-config-minimal.yaml +++ b/service/testdata/otelcol-config-minimal.yaml @@ -1,5 +1,7 @@ receivers: otlp: + protocols: + grpc: exporters: otlp: diff --git a/testbed/testbed/receivers.go b/testbed/testbed/receivers.go index 94704d6c152..08be5fda0fb 100644 --- a/testbed/testbed/receivers.go +++ b/testbed/testbed/receivers.go @@ -198,7 +198,8 @@ func (or *OTLPDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsumer) factory := otlpreceiver.Factory{} cfg := factory.CreateDefaultConfig().(*otlpreceiver.Config) cfg.SetName(or.ProtocolName()) - cfg.Endpoint = fmt.Sprintf("localhost:%d", or.Port) + cfg.GRPC.Endpoint = fmt.Sprintf("localhost:%d", or.Port) + cfg.HTTP = nil var err error params := component.ReceiverCreateParams{Logger: zap.NewNop()} if or.traceReceiver, err = factory.CreateTraceReceiver(context.Background(), params, cfg, tc); err != nil { diff --git a/testbed/testbed/senders.go b/testbed/testbed/senders.go index 748ac539609..fa96b2e4cb1 100644 --- a/testbed/testbed/senders.go +++ b/testbed/testbed/senders.go @@ -327,7 +327,9 @@ func (ote *OTLPTraceDataSender) GenConfigYAMLStr() string { // Note that this generates a receiver config for agent. return fmt.Sprintf(` otlp: - endpoint: "%s:%d"`, ote.Host, ote.Port) + protocols: + grpc: + endpoint: "%s:%d"`, ote.Host, ote.Port) } func (ote *OTLPTraceDataSender) ProtocolName() string {