Skip to content

Commit

Permalink
Allow to tune the read/write buffers for gRPC server (#1218)
Browse files Browse the repository at this point in the history
  • Loading branch information
bogdandrutu authored Jun 29, 2020
1 parent 10ce76d commit 7ef6c53
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 19 deletions.
22 changes: 19 additions & 3 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,22 @@ type GRPCServerSettings struct {

// Configures the protocol to use TLS.
// The default value is nil, which will cause the protocol to not use TLS.
TLSCredentials *configtls.TLSServerSetting `mapstructure:"tls_credentials, omitempty"`
TLSCredentials *configtls.TLSServerSetting `mapstructure:"tls_credentials,omitempty"`

// MaxRecvMsgSizeMiB sets the maximum size (in MiB) of messages accepted by the server.
MaxRecvMsgSizeMiB uint64 `mapstructure:"max_recv_msg_size_mib,omitempty"`
MaxRecvMsgSizeMiB uint64 `mapstructure:"max_recv_msg_size_mib"`

// MaxConcurrentStreams sets the limit on the number of concurrent streams to each ServerTransport.
// It has effect only for streaming RPCs.
MaxConcurrentStreams uint32 `mapstructure:"max_concurrent_streams,omitempty"`
MaxConcurrentStreams uint32 `mapstructure:"max_concurrent_streams"`

// The WriteBufferSize for client gRPC. See grpc.ReadBufferSize
// (https://godoc.org/google.golang.org/grpc#ReadBufferSize).
ReadBufferSize int `mapstructure:"read_buffer_size"`

// The WriteBufferSize for client gRPC. See grpc.WriteBufferSize
// (https://godoc.org/google.golang.org/grpc#WriteBufferSize).
WriteBufferSize int `mapstructure:"write_buffer_size"`

// Keepalive anchor for all the settings related to keepalive.
Keepalive *KeepaliveServerConfig `mapstructure:"keepalive,omitempty"`
Expand Down Expand Up @@ -191,6 +199,14 @@ func (gss *GRPCServerSettings) ToServerOption() ([]grpc.ServerOption, error) {
opts = append(opts, grpc.MaxConcurrentStreams(gss.MaxConcurrentStreams))
}

if gss.ReadBufferSize > 0 {
opts = append(opts, grpc.ReadBufferSize(gss.ReadBufferSize))
}

if gss.WriteBufferSize > 0 {
opts = append(opts, grpc.WriteBufferSize(gss.WriteBufferSize))
}

// The default values referenced in the GRPC docs are set within the server, so this code doesn't need
// to apply them over zero/nil values before passing these as grpc.ServerOptions.
// The following shows the server code for applying default grpc.ServerOptions.
Expand Down
73 changes: 67 additions & 6 deletions config/configgrpc/configgrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,83 @@ package configgrpc

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/collector/config/configtls"
)

func TestBasicGrpcSettings(t *testing.T) {
func TestDefaultGrpcClientSettings(t *testing.T) {
gcs := &GRPCClientSettings{
Headers: nil,
Endpoint: "",
Compression: "",
Keepalive: nil,
TLSSetting: configtls.TLSClientSetting{
Insecure: true,
},
}
_, err := gcs.ToDialOptions()
opts, err := gcs.ToDialOptions()
assert.NoError(t, err)
assert.Len(t, opts, 1)
}

func TestAllGrpcClientSettings(t *testing.T) {
gcs := &GRPCClientSettings{
Headers: map[string]string{
"test": "test",
},
Endpoint: "localhost:1234",
Compression: "gzip",
TLSSetting: configtls.TLSClientSetting{
Insecure: false,
},
Keepalive: &KeepaliveClientConfig{
Time: time.Second,
Timeout: time.Second,
PermitWithoutStream: true,
},
ReadBufferSize: 1024,
WriteBufferSize: 1024,
WaitForReady: true,
}
opts, err := gcs.ToDialOptions()
assert.NoError(t, err)
assert.Len(t, opts, 5)
}

func TestDefaultGrpcServerSettings(t *testing.T) {
gss := &GRPCServerSettings{}
opts, err := gss.ToServerOption()
assert.NoError(t, err)
assert.Len(t, opts, 0)
}

func TestAllGrpcServerSettings(t *testing.T) {
gss := &GRPCServerSettings{
Endpoint: "localhost:1234",
TLSCredentials: &configtls.TLSServerSetting{
TLSSetting: configtls.TLSSetting{},
ClientCAFile: "",
},
MaxRecvMsgSizeMiB: 1,
MaxConcurrentStreams: 1024,
ReadBufferSize: 1024,
WriteBufferSize: 1024,
Keepalive: &KeepaliveServerConfig{
ServerParameters: &KeepaliveServerParameters{
MaxConnectionIdle: time.Second,
MaxConnectionAge: time.Second,
MaxConnectionAgeGrace: time.Second,
Time: time.Second,
Timeout: time.Second,
},
EnforcementPolicy: &KeepaliveEnforcementPolicy{
MinTime: time.Second,
PermitWithoutStream: true,
},
},
}
opts, err := gss.ToServerOption()
assert.NoError(t, err)
assert.Len(t, opts, 7)
}

func TestGRPCClientSettingsError(t *testing.T) {
Expand Down
17 changes: 12 additions & 5 deletions receiver/opencensusreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func TestLoadConfig(t *testing.T) {
NameVal: "opencensus/customname",
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
Endpoint: "0.0.0.0:9090",
Endpoint: "0.0.0.0:9090",
ReadBufferSize: 512 * 1024,
},
Transport: "tcp",
})
Expand All @@ -65,7 +66,8 @@ func TestLoadConfig(t *testing.T) {
NameVal: "opencensus/keepalive",
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
Endpoint: "0.0.0.0:55678",
Endpoint: "0.0.0.0:55678",
ReadBufferSize: 512 * 1024,
Keepalive: &configgrpc.KeepaliveServerConfig{
ServerParameters: &configgrpc.KeepaliveServerParameters{
MaxConnectionIdle: 11 * time.Second,
Expand Down Expand Up @@ -94,6 +96,8 @@ func TestLoadConfig(t *testing.T) {
Endpoint: "0.0.0.0:55678",
MaxRecvMsgSizeMiB: 32,
MaxConcurrentStreams: 16,
ReadBufferSize: 1024,
WriteBufferSize: 1024,
Keepalive: &configgrpc.KeepaliveServerConfig{
ServerParameters: &configgrpc.KeepaliveServerParameters{
MaxConnectionIdle: 10 * time.Second,
Expand All @@ -113,7 +117,8 @@ func TestLoadConfig(t *testing.T) {
NameVal: "opencensus/tlscredentials",
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
Endpoint: "0.0.0.0:55678",
Endpoint: "0.0.0.0:55678",
ReadBufferSize: 512 * 1024,
TLSCredentials: &configtls.TLSServerSetting{
TLSSetting: configtls.TLSSetting{
CertFile: "test.crt",
Expand All @@ -132,7 +137,8 @@ func TestLoadConfig(t *testing.T) {
NameVal: "opencensus/cors",
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
Endpoint: "0.0.0.0:55678",
Endpoint: "0.0.0.0:55678",
ReadBufferSize: 512 * 1024,
},
Transport: "tcp",
CorsOrigins: []string{"https://*.test.com", "https://test.com"},
Expand All @@ -146,7 +152,8 @@ func TestLoadConfig(t *testing.T) {
NameVal: "opencensus/uds",
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
Endpoint: "/tmp/opencensus.sock",
Endpoint: "/tmp/opencensus.sock",
ReadBufferSize: 512 * 1024,
},
Transport: "unix",
})
Expand Down
2 changes: 2 additions & 0 deletions receiver/opencensusreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver {
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
Endpoint: "0.0.0.0:55678",
// We almost write 0 bytes, so no need to tune WriteBufferSize.
ReadBufferSize: 512 * 1024,
},
Transport: "tcp",
}
Expand Down
2 changes: 2 additions & 0 deletions receiver/opencensusreceiver/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ receivers:
opencensus/msg-size-conc-connect-max-idle:
max_recv_msg_size_mib: 32
max_concurrent_streams: 16
read_buffer_size: 1024
write_buffer_size: 1024
keepalive:
server_parameters:
max_connection_idle: 10s
Expand Down
15 changes: 10 additions & 5 deletions receiver/otlpreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func TestLoadConfig(t *testing.T) {
NameVal: "otlp/customname",
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
Endpoint: "localhost:9090",
Endpoint: "localhost:9090",
ReadBufferSize: 512 * 1024,
},
Transport: "tcp",
})
Expand All @@ -65,7 +66,8 @@ func TestLoadConfig(t *testing.T) {
NameVal: "otlp/keepalive",
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
Endpoint: "0.0.0.0:55680",
Endpoint: "0.0.0.0:55680",
ReadBufferSize: 512 * 1024,
Keepalive: &configgrpc.KeepaliveServerConfig{
ServerParameters: &configgrpc.KeepaliveServerParameters{
MaxConnectionIdle: 11 * time.Second,
Expand Down Expand Up @@ -94,6 +96,8 @@ func TestLoadConfig(t *testing.T) {
Endpoint: "0.0.0.0:55680",
MaxRecvMsgSizeMiB: 32,
MaxConcurrentStreams: 16,
ReadBufferSize: 1024,
WriteBufferSize: 1024,
Keepalive: &configgrpc.KeepaliveServerConfig{
ServerParameters: &configgrpc.KeepaliveServerParameters{
MaxConnectionIdle: 10 * time.Second,
Expand All @@ -113,7 +117,8 @@ func TestLoadConfig(t *testing.T) {
NameVal: "otlp/tlscredentials",
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
Endpoint: "0.0.0.0:55680",
Endpoint: "0.0.0.0:55680",
ReadBufferSize: 512 * 1024,
TLSCredentials: &configtls.TLSServerSetting{
TLSSetting: configtls.TLSSetting{
CertFile: "test.crt",
Expand All @@ -133,7 +138,7 @@ func TestLoadConfig(t *testing.T) {
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
Endpoint: "0.0.0.0:55680",
TLSCredentials: nil,
ReadBufferSize: 512 * 1024,
},
Transport: "tcp",
CorsOrigins: []string{"https://*.test.com", "https://test.com"},
Expand All @@ -148,7 +153,7 @@ func TestLoadConfig(t *testing.T) {
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
Endpoint: "/tmp/otlp.sock",
TLSCredentials: nil,
ReadBufferSize: 512 * 1024,
},
Transport: "unix",
})
Expand Down
2 changes: 2 additions & 0 deletions receiver/otlpreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver {
},
GRPCServerSettings: configgrpc.GRPCServerSettings{
Endpoint: "0.0.0.0:55680",
// We almost write 0 bytes, so no need to tune WriteBufferSize.
ReadBufferSize: 512 * 1024,
},
Transport: "tcp",
}
Expand Down
2 changes: 2 additions & 0 deletions receiver/otlpreceiver/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ receivers:
otlp/msg-size-conc-connect-max-idle:
max_recv_msg_size_mib: 32
max_concurrent_streams: 16
read_buffer_size: 1024
write_buffer_size: 1024
keepalive:
server_parameters:
max_connection_idle: 10s
Expand Down

0 comments on commit 7ef6c53

Please sign in to comment.