Skip to content

Commit

Permalink
Split OTLP receiver by protocols, allow mTLS support
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Jun 28, 2020
1 parent 12b3d9c commit 4334566
Show file tree
Hide file tree
Showing 11 changed files with 449 additions and 753 deletions.
36 changes: 8 additions & 28 deletions receiver/otlpreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,19 @@ package otlpreceiver

import (
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configmodels"
)

type Protocols struct {
GRPC *configgrpc.GRPCServerSettings `mapstructure:"grpc"`
HTTP *confighttp.HTTPServerSettings `mapstructure:"http"`
}

// Config defines configuration for OTLP receiver.
type Config struct {
configmodels.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct

// Configures the receiver server protocol.
configgrpc.GRPCServerSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct

// Transport to use: one of tcp or unix, defaults to tcp
Transport string `mapstructure:"transport"`

// CorsOrigins are the allowed CORS origins for HTTP/JSON requests to grpc-gateway adapter
// for the OTLP receiver. See github.com/rs/cors
// An empty list means that CORS is not enabled at all. A wildcard (*) can be
// used to match any origin or one or more characters of an origin.
CorsOrigins []string `mapstructure:"cors_allowed_origins"`
}

func (rOpts *Config) buildOptions() ([]Option, error) {
var opts []Option
if len(rOpts.CorsOrigins) > 0 {
opts = append(opts, WithCorsOrigins(rOpts.CorsOrigins))
}

grpcServerOptions, err := rOpts.GRPCServerSettings.ToServerOption()
if err != nil {
return nil, err
}
if len(grpcServerOptions) > 0 {
opts = append(opts, WithGRPCServerOptions(grpcServerOptions...))
}

return opts, nil
// Protocols is the configuration for the supported protocols, currently gRPC and HTTP (Proto and JSON).
Protocols `mapstructure:"protocols"`
}
123 changes: 60 additions & 63 deletions receiver/otlpreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configtls"
)
Expand Down Expand Up @@ -51,10 +52,11 @@ func TestLoadConfig(t *testing.T) {
TypeVal: typeStr,
NameVal: "otlp/customname",
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
Endpoint: "localhost:9090",
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
Endpoint: "localhost:9090",
},
},
Transport: "tcp",
})

r2 := cfg.Receivers["otlp/keepalive"].(*Config)
Expand All @@ -64,23 +66,24 @@ func TestLoadConfig(t *testing.T) {
TypeVal: typeStr,
NameVal: "otlp/keepalive",
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
Endpoint: "0.0.0.0:55680",
Keepalive: &configgrpc.KeepaliveServerConfig{
ServerParameters: &configgrpc.KeepaliveServerParameters{
MaxConnectionIdle: 11 * time.Second,
MaxConnectionAge: 12 * time.Second,
MaxConnectionAgeGrace: 13 * time.Second,
Time: 30 * time.Second,
Timeout: 5 * time.Second,
},
EnforcementPolicy: &configgrpc.KeepaliveEnforcementPolicy{
MinTime: 10 * time.Second,
PermitWithoutStream: true,
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
Endpoint: "0.0.0.0:55680",
Keepalive: &configgrpc.KeepaliveServerConfig{
ServerParameters: &configgrpc.KeepaliveServerParameters{
MaxConnectionIdle: 11 * time.Second,
MaxConnectionAge: 12 * time.Second,
MaxConnectionAgeGrace: 13 * time.Second,
Time: 30 * time.Second,
Timeout: 5 * time.Second,
},
EnforcementPolicy: &configgrpc.KeepaliveEnforcementPolicy{
MinTime: 10 * time.Second,
PermitWithoutStream: true,
},
},
},
},
Transport: "tcp",
})

r3 := cfg.Receivers["otlp/msg-size-conc-connect-max-idle"].(*Config)
Expand All @@ -90,17 +93,18 @@ func TestLoadConfig(t *testing.T) {
TypeVal: typeStr,
NameVal: "otlp/msg-size-conc-connect-max-idle",
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
Endpoint: "0.0.0.0:55680",
MaxRecvMsgSizeMiB: 32,
MaxConcurrentStreams: 16,
Keepalive: &configgrpc.KeepaliveServerConfig{
ServerParameters: &configgrpc.KeepaliveServerParameters{
MaxConnectionIdle: 10 * time.Second,
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
Endpoint: "0.0.0.0:55680",
MaxRecvMsgSizeMiB: 32,
MaxConcurrentStreams: 16,
Keepalive: &configgrpc.KeepaliveServerConfig{
ServerParameters: &configgrpc.KeepaliveServerParameters{
MaxConnectionIdle: 10 * time.Second,
},
},
},
},
Transport: "tcp",
})

// NOTE: Once the config loader checks for the files existence, this test may fail and require
Expand All @@ -112,16 +116,26 @@ func TestLoadConfig(t *testing.T) {
TypeVal: typeStr,
NameVal: "otlp/tlscredentials",
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
Endpoint: "0.0.0.0:55680",
TLSCredentials: &configtls.TLSServerSetting{
TLSSetting: configtls.TLSSetting{
CertFile: "test.crt",
KeyFile: "test.key",
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
Endpoint: "0.0.0.0:55680",
TLSCredentials: &configtls.TLSServerSetting{
TLSSetting: configtls.TLSSetting{
CertFile: "test.crt",
KeyFile: "test.key",
},
},
},
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "0.0.0.0:55681",
TLSSetting: &configtls.TLSServerSetting{
TLSSetting: configtls.TLSSetting{
CertFile: "test.crt",
KeyFile: "test.key",
},
},
},
},
Transport: "tcp",
})

r5 := cfg.Receivers["otlp/cors"].(*Config)
Expand All @@ -131,12 +145,12 @@ func TestLoadConfig(t *testing.T) {
TypeVal: typeStr,
NameVal: "otlp/cors",
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
Endpoint: "0.0.0.0:55680",
TLSCredentials: nil,
Protocols: Protocols{
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "0.0.0.0:55681",
CorsOrigins: []string{"https://*.test.com", "https://test.com"},
},
},
Transport: "tcp",
CorsOrigins: []string{"https://*.test.com", "https://test.com"},
})

r6 := cfg.Receivers["otlp/uds"].(*Config)
Expand All @@ -146,32 +160,15 @@ func TestLoadConfig(t *testing.T) {
TypeVal: typeStr,
NameVal: "otlp/uds",
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
Endpoint: "/tmp/otlp.sock",
TLSCredentials: nil,
},
Transport: "unix",
})
}

func TestBuildOptions_TLSCredentials(t *testing.T) {
cfg := Config{
ReceiverSettings: configmodels.ReceiverSettings{
NameVal: "IncorrectTLS",
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
TLSCredentials: &configtls.TLSServerSetting{
TLSSetting: configtls.TLSSetting{
CertFile: "willfail",
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
Endpoint: "/tmp/grpc_otlp.sock",
// Transport: "unix",
},
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "/tmp/http_otlp.sock",
// Transport: "unix",
},
},
},
}
_, err := cfg.buildOptions()
assert.EqualError(t, err, `failed to load TLS config: for auth via TLS, either both certificate and key must be supplied, or neither`)

cfg.TLSCredentials = &configtls.TLSServerSetting{}
opt, err := cfg.buildOptions()
assert.NoError(t, err)
assert.NotNil(t, opt)
})
}
84 changes: 60 additions & 24 deletions receiver/otlpreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,25 @@ package otlpreceiver

import (
"context"
"fmt"

"github.com/spf13/viper"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
)

const (
// The value of "type" key in configuration.
typeStr = "otlp"

// Protocol values.
protoGRPC = "grpc"
protoHTTP = "http"
protocolsFieldName = "protocols"
)

// Factory is the Factory for receiver.
Expand All @@ -37,28 +46,61 @@ func (f *Factory) Type() configmodels.Type {
return typeStr
}

// CustomUnmarshaler returns nil because we don't need custom unmarshaling for this config.
func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler {
return nil
}

// CreateDefaultConfig creates the default configuration for receiver.
func (f *Factory) CreateDefaultConfig() configmodels.Receiver {
return &Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
NameVal: typeStr,
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
Endpoint: "0.0.0.0:55680",
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
Endpoint: "0.0.0.0:55680",
},
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "0.0.0.0:55681",
},
},
Transport: "tcp",
}
}

// CustomUnmarshaler is used to add defaults for named but empty protocols
func (f *Factory) CustomUnmarshaler() component.CustomUnmarshaler {
return func(componentViperSection *viper.Viper, intoCfg interface{}) error {
if componentViperSection == nil || len(componentViperSection.AllKeys()) == 0 {
return fmt.Errorf("empty config for Jaeger receiver")
}
// first load the config normally
err := componentViperSection.UnmarshalExact(intoCfg)
if err != nil {
return err
}
receiverCfg, ok := intoCfg.(*Config)
if !ok {
return fmt.Errorf("config type not *otlpreceiver.Config")
}

// next manually search for protocols in viper, if a protocol is not present it means it is disable.
protocols := componentViperSection.GetStringMap(protocolsFieldName)
if _, ok := protocols[protoGRPC]; !ok {
receiverCfg.GRPC = nil
}

if _, ok := protocols[protoHTTP]; !ok {
receiverCfg.HTTP = nil
}

if receiverCfg.GRPC == nil && receiverCfg.HTTP == nil {
return fmt.Errorf("must specify at least one protocol when using the OTLP receiver")
}

return nil
}
}

// CreateTraceReceiver creates a trace receiver based on provided config.
func (f *Factory) CreateTraceReceiver(
_ context.Context,
ctx context.Context,
_ component.ReceiverCreateParams,
cfg configmodels.Receiver,
nextConsumer consumer.TraceConsumer,
Expand All @@ -67,27 +109,27 @@ func (f *Factory) CreateTraceReceiver(
if err != nil {
return nil, err
}

r.traceConsumer = nextConsumer

if err = r.registerTraceConsumer(ctx, nextConsumer); err != nil {
return nil, err
}
return r, nil
}

// CreateMetricsReceiver creates a metrics receiver based on provided config.
func (f *Factory) CreateMetricsReceiver(
_ context.Context,
ctx context.Context,
_ component.ReceiverCreateParams,
cfg configmodels.Receiver,
consumer consumer.MetricsConsumer,
) (component.MetricsReceiver, error) {

r, err := f.createReceiver(cfg)
if err != nil {
return nil, err
}

r.metricsConsumer = consumer

if err = r.registerMetricsConsumer(ctx, consumer); err != nil {
return nil, err
}
return r, nil
}

Expand All @@ -100,15 +142,9 @@ func (f *Factory) createReceiver(cfg configmodels.Receiver) (*Receiver, error) {
// Check to see if there is already a receiver for this config.
receiver, ok := receivers[rCfg]
if !ok {
// Build the configuration options.
opts, err := rCfg.buildOptions()
if err != nil {
return nil, err
}

var err error
// We don't have a receiver, so create one.
receiver, err = New(
rCfg.Name(), rCfg.Transport, rCfg.Endpoint, nil, nil, opts...)
receiver, err = New(rCfg)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 4334566

Please sign in to comment.