Skip to content

Commit

Permalink
Split OTLP receiver by protocols, allow mTLS support (open-telemetry#…
Browse files Browse the repository at this point in the history
…1223)

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored and wyTrivail committed Jul 13, 2020
1 parent c1645d3 commit b732ef4
Show file tree
Hide file tree
Showing 21 changed files with 740 additions and 869 deletions.
35 changes: 25 additions & 10 deletions receiver/otlpreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,40 @@
This is the default receiver for the OpenTelemetry project.

To get started, all that is required to enable the OpenTelemetry receiver is to
include it in the receiver definitions. This will enable the default values as
specified [here](./factory.go).
include it in the receiver definitions and defined the enabled protocols. This will
enable the default values as specified [here](./factory.go).
The following is an example:
```yaml
receivers:
otlp:
protocols:
grpc:
http:
```
The full list of settings exposed for this receiver are documented [here](./config.go)
with detailed sample configurations [here](./testdata/config.yaml).
A protocol can be disabled by simply not specifying it in the list of protocols:
```yaml
receivers:
otlp/only_grpc:
protocols:
grpc:
```
## Communicating over TLS
This receiver supports communication using Transport Layer Security (TLS). TLS
can be configured by specifying a `tls_credentials` object in the receiver
configuration for receivers that support it.
```yaml
receivers:
otlp:
tls_credentials:
key_file: /key.pem # path to private key
cert_file: /cert.pem # path to certificate
protocols:
grpc:
tls_credentials:
key_file: /key.pem # path to private key
cert_file: /cert.pem # path to certificate
```

## Writing with HTTP/JSON
Expand All @@ -45,9 +58,11 @@ specifying a list of allowed CORS origins in the `cors_allowed_origins` field:
```yaml
receivers:
otlp:
endpoint: "localhost:55680"
cors_allowed_origins:
- http://test.com
# Origins can have wildcards with *, use * by itself to match any origin.
- https://*.example.com
protocols:
http:
endpoint: "localhost:55681"
cors_allowed_origins:
- http://test.com
# Origins can have wildcards with *, use * by itself to match any origin.
- https://*.example.com
```
36 changes: 8 additions & 28 deletions receiver/otlpreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,19 @@ package otlpreceiver

import (
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configmodels"
)

type Protocols struct {
GRPC *configgrpc.GRPCServerSettings `mapstructure:"grpc"`
HTTP *confighttp.HTTPServerSettings `mapstructure:"http"`
}

// Config defines configuration for OTLP receiver.
type Config struct {
configmodels.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct

// Configures the receiver server protocol.
configgrpc.GRPCServerSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct

// Transport to use: one of tcp or unix, defaults to tcp
Transport string `mapstructure:"transport"`

// CorsOrigins are the allowed CORS origins for HTTP/JSON requests to grpc-gateway adapter
// for the OTLP receiver. See github.com/rs/cors
// An empty list means that CORS is not enabled at all. A wildcard (*) can be
// used to match any origin or one or more characters of an origin.
CorsOrigins []string `mapstructure:"cors_allowed_origins"`
}

func (rOpts *Config) buildOptions() ([]Option, error) {
var opts []Option
if len(rOpts.CorsOrigins) > 0 {
opts = append(opts, WithCorsOrigins(rOpts.CorsOrigins))
}

grpcServerOptions, err := rOpts.GRPCServerSettings.ToServerOption()
if err != nil {
return nil, err
}
if len(grpcServerOptions) > 0 {
opts = append(opts, WithGRPCServerOptions(grpcServerOptions...))
}

return opts, nil
// Protocols is the configuration for the supported protocols, currently gRPC and HTTP (Proto and JSON).
Protocols `mapstructure:"protocols"`
}
180 changes: 100 additions & 80 deletions receiver/otlpreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configtls"
)
Expand All @@ -39,144 +40,163 @@ func TestLoadConfig(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, cfg)

assert.Equal(t, len(cfg.Receivers), 7)
assert.Equal(t, len(cfg.Receivers), 9)

r0 := cfg.Receivers["otlp"]
assert.Equal(t, r0, factory.CreateDefaultConfig())
assert.Equal(t, cfg.Receivers["otlp"], factory.CreateDefaultConfig())

r1 := cfg.Receivers["otlp/customname"].(*Config)
assert.Equal(t, r1,
defaultOnlyGRPC := factory.CreateDefaultConfig().(*Config)
defaultOnlyGRPC.SetName("otlp/only_grpc")
defaultOnlyGRPC.HTTP = nil
assert.Equal(t, cfg.Receivers["otlp/only_grpc"], defaultOnlyGRPC)

defaultOnlyHTTP := factory.CreateDefaultConfig().(*Config)
defaultOnlyHTTP.SetName("otlp/only_http")
defaultOnlyHTTP.GRPC = nil
assert.Equal(t, cfg.Receivers["otlp/only_http"], defaultOnlyHTTP)

assert.Equal(t, cfg.Receivers["otlp/customname"],
&Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
NameVal: "otlp/customname",
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
Endpoint: "localhost:9090",
ReadBufferSize: 512 * 1024,
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
Endpoint: "localhost:9090",
ReadBufferSize: 512 * 1024,
},
},
Transport: "tcp",
})

r2 := cfg.Receivers["otlp/keepalive"].(*Config)
assert.Equal(t, r2,
assert.Equal(t, cfg.Receivers["otlp/keepalive"],
&Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
NameVal: "otlp/keepalive",
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
Endpoint: "0.0.0.0:55680",
ReadBufferSize: 512 * 1024,
Keepalive: &configgrpc.KeepaliveServerConfig{
ServerParameters: &configgrpc.KeepaliveServerParameters{
MaxConnectionIdle: 11 * time.Second,
MaxConnectionAge: 12 * time.Second,
MaxConnectionAgeGrace: 13 * time.Second,
Time: 30 * time.Second,
Timeout: 5 * time.Second,
},
EnforcementPolicy: &configgrpc.KeepaliveEnforcementPolicy{
MinTime: 10 * time.Second,
PermitWithoutStream: true,
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
Endpoint: "0.0.0.0:55680",
ReadBufferSize: 512 * 1024,
Keepalive: &configgrpc.KeepaliveServerConfig{
ServerParameters: &configgrpc.KeepaliveServerParameters{
MaxConnectionIdle: 11 * time.Second,
MaxConnectionAge: 12 * time.Second,
MaxConnectionAgeGrace: 13 * time.Second,
Time: 30 * time.Second,
Timeout: 5 * time.Second,
},
EnforcementPolicy: &configgrpc.KeepaliveEnforcementPolicy{
MinTime: 10 * time.Second,
PermitWithoutStream: true,
},
},
},
},
Transport: "tcp",
})

r3 := cfg.Receivers["otlp/msg-size-conc-connect-max-idle"].(*Config)
assert.Equal(t, r3,
assert.Equal(t, cfg.Receivers["otlp/msg-size-conc-connect-max-idle"],
&Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
NameVal: "otlp/msg-size-conc-connect-max-idle",
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
Endpoint: "0.0.0.0:55680",
MaxRecvMsgSizeMiB: 32,
MaxConcurrentStreams: 16,
ReadBufferSize: 1024,
WriteBufferSize: 1024,
Keepalive: &configgrpc.KeepaliveServerConfig{
ServerParameters: &configgrpc.KeepaliveServerParameters{
MaxConnectionIdle: 10 * time.Second,
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
Endpoint: "0.0.0.0:55680",
MaxRecvMsgSizeMiB: 32,
MaxConcurrentStreams: 16,
ReadBufferSize: 1024,
WriteBufferSize: 1024,
Keepalive: &configgrpc.KeepaliveServerConfig{
ServerParameters: &configgrpc.KeepaliveServerParameters{
MaxConnectionIdle: 10 * time.Second,
},
},
},
},
Transport: "tcp",
})

// NOTE: Once the config loader checks for the files existence, this test may fail and require
// use of fake cert/key for test purposes.
r4 := cfg.Receivers["otlp/tlscredentials"].(*Config)
assert.Equal(t, r4,
assert.Equal(t, cfg.Receivers["otlp/tlscredentials"],
&Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
NameVal: "otlp/tlscredentials",
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
Endpoint: "0.0.0.0:55680",
ReadBufferSize: 512 * 1024,
TLSCredentials: &configtls.TLSServerSetting{
TLSSetting: configtls.TLSSetting{
CertFile: "test.crt",
KeyFile: "test.key",
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
Endpoint: "0.0.0.0:55680",
TLSCredentials: &configtls.TLSServerSetting{
TLSSetting: configtls.TLSSetting{
CertFile: "test.crt",
KeyFile: "test.key",
},
},
ReadBufferSize: 512 * 1024,
},
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "0.0.0.0:55681",
TLSSetting: &configtls.TLSServerSetting{
TLSSetting: configtls.TLSSetting{
CertFile: "test.crt",
KeyFile: "test.key",
},
},
},
},
Transport: "tcp",
})

r5 := cfg.Receivers["otlp/cors"].(*Config)
assert.Equal(t, r5,
assert.Equal(t, cfg.Receivers["otlp/cors"],
&Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
NameVal: "otlp/cors",
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
Endpoint: "0.0.0.0:55680",
ReadBufferSize: 512 * 1024,
Protocols: Protocols{
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "0.0.0.0:55681",
CorsOrigins: []string{"https://*.test.com", "https://test.com"},
},
},
Transport: "tcp",
CorsOrigins: []string{"https://*.test.com", "https://test.com"},
})

r6 := cfg.Receivers["otlp/uds"].(*Config)
assert.Equal(t, r6,
assert.Equal(t, cfg.Receivers["otlp/uds"],
&Config{
ReceiverSettings: configmodels.ReceiverSettings{
TypeVal: typeStr,
NameVal: "otlp/uds",
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
Endpoint: "/tmp/otlp.sock",
ReadBufferSize: 512 * 1024,
Protocols: Protocols{
GRPC: &configgrpc.GRPCServerSettings{
Endpoint: "/tmp/grpc_otlp.sock",
// Transport: "unix",
ReadBufferSize: 512 * 1024,
},
HTTP: &confighttp.HTTPServerSettings{
Endpoint: "/tmp/http_otlp.sock",
// Transport: "unix",
},
},
Transport: "unix",
})
}

func TestBuildOptions_TLSCredentials(t *testing.T) {
cfg := Config{
ReceiverSettings: configmodels.ReceiverSettings{
NameVal: "IncorrectTLS",
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
TLSCredentials: &configtls.TLSServerSetting{
TLSSetting: configtls.TLSSetting{
CertFile: "willfail",
},
},
},
}
_, err := cfg.buildOptions()
assert.EqualError(t, err, `failed to load TLS config: for auth via TLS, either both certificate and key must be supplied, or neither`)

cfg.TLSCredentials = &configtls.TLSServerSetting{}
opt, err := cfg.buildOptions()
func TestFailedLoadConfig(t *testing.T) {
factories, err := config.ExampleComponents()
assert.NoError(t, err)
assert.NotNil(t, opt)

factory := &Factory{}
factories.Receivers[typeStr] = factory
_, err = config.LoadConfigFile(t, path.Join(".", "testdata", "typo_default_proto_config.yaml"), factories)
assert.EqualError(t, err, `error reading settings for receiver type "otlp": unknown protocols in the OTLP receiver`)

_, err = config.LoadConfigFile(t, path.Join(".", "testdata", "bad_proto_config.yaml"), factories)
assert.EqualError(t, err, "error reading settings for receiver type \"otlp\": 1 error(s) decoding:\n\n* 'protocols' has invalid keys: thrift")

_, err = config.LoadConfigFile(t, path.Join(".", "testdata", "bad_no_proto_config.yaml"), factories)
assert.EqualError(t, err, "error reading settings for receiver type \"otlp\": must specify at least one protocol when using the OTLP receiver")

_, err = config.LoadConfigFile(t, path.Join(".", "testdata", "bad_empty_config.yaml"), factories)
assert.EqualError(t, err, "error reading settings for receiver type \"otlp\": empty config for OTLP receiver")
}
Loading

0 comments on commit b732ef4

Please sign in to comment.