diff --git a/config/configgrpc/configgrpc.go b/config/configgrpc/configgrpc.go index 12a4252710c..4ed941f8cf7 100644 --- a/config/configgrpc/configgrpc.go +++ b/config/configgrpc/configgrpc.go @@ -116,14 +116,22 @@ type GRPCServerSettings struct { // Configures the protocol to use TLS. // The default value is nil, which will cause the protocol to not use TLS. - TLSCredentials *configtls.TLSServerSetting `mapstructure:"tls_credentials, omitempty"` + TLSCredentials *configtls.TLSServerSetting `mapstructure:"tls_credentials,omitempty"` // MaxRecvMsgSizeMiB sets the maximum size (in MiB) of messages accepted by the server. - MaxRecvMsgSizeMiB uint64 `mapstructure:"max_recv_msg_size_mib,omitempty"` + MaxRecvMsgSizeMiB uint64 `mapstructure:"max_recv_msg_size_mib"` // MaxConcurrentStreams sets the limit on the number of concurrent streams to each ServerTransport. // It has effect only for streaming RPCs. - MaxConcurrentStreams uint32 `mapstructure:"max_concurrent_streams,omitempty"` + MaxConcurrentStreams uint32 `mapstructure:"max_concurrent_streams"` + + // The WriteBufferSize for client gRPC. See grpc.ReadBufferSize + // (https://godoc.org/google.golang.org/grpc#ReadBufferSize). + ReadBufferSize int `mapstructure:"read_buffer_size"` + + // The WriteBufferSize for client gRPC. See grpc.WriteBufferSize + // (https://godoc.org/google.golang.org/grpc#WriteBufferSize). + WriteBufferSize int `mapstructure:"write_buffer_size"` // Keepalive anchor for all the settings related to keepalive. Keepalive *KeepaliveServerConfig `mapstructure:"keepalive,omitempty"` @@ -191,6 +199,14 @@ func (gss *GRPCServerSettings) ToServerOption() ([]grpc.ServerOption, error) { opts = append(opts, grpc.MaxConcurrentStreams(gss.MaxConcurrentStreams)) } + if gss.ReadBufferSize > 0 { + opts = append(opts, grpc.ReadBufferSize(gss.ReadBufferSize)) + } + + if gss.WriteBufferSize > 0 { + opts = append(opts, grpc.WriteBufferSize(gss.WriteBufferSize)) + } + // The default values referenced in the GRPC docs are set within the server, so this code doesn't need // to apply them over zero/nil values before passing these as grpc.ServerOptions. // The following shows the server code for applying default grpc.ServerOptions. diff --git a/config/configgrpc/configgrpc_test.go b/config/configgrpc/configgrpc_test.go index e027a6693bd..960939ac7e4 100644 --- a/config/configgrpc/configgrpc_test.go +++ b/config/configgrpc/configgrpc_test.go @@ -16,22 +16,83 @@ package configgrpc import ( "testing" + "time" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/config/configtls" ) -func TestBasicGrpcSettings(t *testing.T) { +func TestDefaultGrpcClientSettings(t *testing.T) { gcs := &GRPCClientSettings{ - Headers: nil, - Endpoint: "", - Compression: "", - Keepalive: nil, + TLSSetting: configtls.TLSClientSetting{ + Insecure: true, + }, } - _, err := gcs.ToDialOptions() + opts, err := gcs.ToDialOptions() + assert.NoError(t, err) + assert.Len(t, opts, 1) +} +func TestAllGrpcClientSettings(t *testing.T) { + gcs := &GRPCClientSettings{ + Headers: map[string]string{ + "test": "test", + }, + Endpoint: "localhost:1234", + Compression: "gzip", + TLSSetting: configtls.TLSClientSetting{ + Insecure: false, + }, + Keepalive: &KeepaliveClientConfig{ + Time: time.Second, + Timeout: time.Second, + PermitWithoutStream: true, + }, + ReadBufferSize: 1024, + WriteBufferSize: 1024, + WaitForReady: true, + } + opts, err := gcs.ToDialOptions() + assert.NoError(t, err) + assert.Len(t, opts, 5) +} + +func TestDefaultGrpcServerSettings(t *testing.T) { + gss := &GRPCServerSettings{} + opts, err := gss.ToServerOption() + assert.NoError(t, err) + assert.Len(t, opts, 0) +} + +func TestAllGrpcServerSettings(t *testing.T) { + gss := &GRPCServerSettings{ + Endpoint: "localhost:1234", + TLSCredentials: &configtls.TLSServerSetting{ + TLSSetting: configtls.TLSSetting{}, + ClientCAFile: "", + }, + MaxRecvMsgSizeMiB: 1, + MaxConcurrentStreams: 1024, + ReadBufferSize: 1024, + WriteBufferSize: 1024, + Keepalive: &KeepaliveServerConfig{ + ServerParameters: &KeepaliveServerParameters{ + MaxConnectionIdle: time.Second, + MaxConnectionAge: time.Second, + MaxConnectionAgeGrace: time.Second, + Time: time.Second, + Timeout: time.Second, + }, + EnforcementPolicy: &KeepaliveEnforcementPolicy{ + MinTime: time.Second, + PermitWithoutStream: true, + }, + }, + } + opts, err := gss.ToServerOption() assert.NoError(t, err) + assert.Len(t, opts, 7) } func TestGRPCClientSettingsError(t *testing.T) { diff --git a/receiver/opencensusreceiver/config_test.go b/receiver/opencensusreceiver/config_test.go index be893cb1fbd..a6829dfee5a 100644 --- a/receiver/opencensusreceiver/config_test.go +++ b/receiver/opencensusreceiver/config_test.go @@ -52,7 +52,8 @@ func TestLoadConfig(t *testing.T) { NameVal: "opencensus/customname", }, GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "0.0.0.0:9090", + Endpoint: "0.0.0.0:9090", + ReadBufferSize: 512 * 1024, }, Transport: "tcp", }) @@ -65,7 +66,8 @@ func TestLoadConfig(t *testing.T) { NameVal: "opencensus/keepalive", }, GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "0.0.0.0:55678", + Endpoint: "0.0.0.0:55678", + ReadBufferSize: 512 * 1024, Keepalive: &configgrpc.KeepaliveServerConfig{ ServerParameters: &configgrpc.KeepaliveServerParameters{ MaxConnectionIdle: 11 * time.Second, @@ -94,6 +96,8 @@ func TestLoadConfig(t *testing.T) { Endpoint: "0.0.0.0:55678", MaxRecvMsgSizeMiB: 32, MaxConcurrentStreams: 16, + ReadBufferSize: 1024, + WriteBufferSize: 1024, Keepalive: &configgrpc.KeepaliveServerConfig{ ServerParameters: &configgrpc.KeepaliveServerParameters{ MaxConnectionIdle: 10 * time.Second, @@ -113,7 +117,8 @@ func TestLoadConfig(t *testing.T) { NameVal: "opencensus/tlscredentials", }, GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "0.0.0.0:55678", + Endpoint: "0.0.0.0:55678", + ReadBufferSize: 512 * 1024, TLSCredentials: &configtls.TLSServerSetting{ TLSSetting: configtls.TLSSetting{ CertFile: "test.crt", @@ -132,7 +137,8 @@ func TestLoadConfig(t *testing.T) { NameVal: "opencensus/cors", }, GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "0.0.0.0:55678", + Endpoint: "0.0.0.0:55678", + ReadBufferSize: 512 * 1024, }, Transport: "tcp", CorsOrigins: []string{"https://*.test.com", "https://test.com"}, @@ -146,7 +152,8 @@ func TestLoadConfig(t *testing.T) { NameVal: "opencensus/uds", }, GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "/tmp/opencensus.sock", + Endpoint: "/tmp/opencensus.sock", + ReadBufferSize: 512 * 1024, }, Transport: "unix", }) diff --git a/receiver/opencensusreceiver/factory.go b/receiver/opencensusreceiver/factory.go index 390d1a66a3a..20c8b2394b2 100644 --- a/receiver/opencensusreceiver/factory.go +++ b/receiver/opencensusreceiver/factory.go @@ -53,6 +53,8 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver { }, GRPCServerSettings: configgrpc.GRPCServerSettings{ Endpoint: "0.0.0.0:55678", + // We almost write 0 bytes, so no need to tune WriteBufferSize. + ReadBufferSize: 512 * 1024, }, Transport: "tcp", } diff --git a/receiver/opencensusreceiver/testdata/config.yaml b/receiver/opencensusreceiver/testdata/config.yaml index c9c1d74c74c..d271521af44 100644 --- a/receiver/opencensusreceiver/testdata/config.yaml +++ b/receiver/opencensusreceiver/testdata/config.yaml @@ -31,6 +31,8 @@ receivers: opencensus/msg-size-conc-connect-max-idle: max_recv_msg_size_mib: 32 max_concurrent_streams: 16 + read_buffer_size: 1024 + write_buffer_size: 1024 keepalive: server_parameters: max_connection_idle: 10s diff --git a/receiver/otlpreceiver/config_test.go b/receiver/otlpreceiver/config_test.go index 95003e27531..d8fab515087 100644 --- a/receiver/otlpreceiver/config_test.go +++ b/receiver/otlpreceiver/config_test.go @@ -52,7 +52,8 @@ func TestLoadConfig(t *testing.T) { NameVal: "otlp/customname", }, GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "localhost:9090", + Endpoint: "localhost:9090", + ReadBufferSize: 512 * 1024, }, Transport: "tcp", }) @@ -65,7 +66,8 @@ func TestLoadConfig(t *testing.T) { NameVal: "otlp/keepalive", }, GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "0.0.0.0:55680", + Endpoint: "0.0.0.0:55680", + ReadBufferSize: 512 * 1024, Keepalive: &configgrpc.KeepaliveServerConfig{ ServerParameters: &configgrpc.KeepaliveServerParameters{ MaxConnectionIdle: 11 * time.Second, @@ -94,6 +96,8 @@ func TestLoadConfig(t *testing.T) { Endpoint: "0.0.0.0:55680", MaxRecvMsgSizeMiB: 32, MaxConcurrentStreams: 16, + ReadBufferSize: 1024, + WriteBufferSize: 1024, Keepalive: &configgrpc.KeepaliveServerConfig{ ServerParameters: &configgrpc.KeepaliveServerParameters{ MaxConnectionIdle: 10 * time.Second, @@ -113,7 +117,8 @@ func TestLoadConfig(t *testing.T) { NameVal: "otlp/tlscredentials", }, GRPCServerSettings: configgrpc.GRPCServerSettings{ - Endpoint: "0.0.0.0:55680", + Endpoint: "0.0.0.0:55680", + ReadBufferSize: 512 * 1024, TLSCredentials: &configtls.TLSServerSetting{ TLSSetting: configtls.TLSSetting{ CertFile: "test.crt", @@ -133,7 +138,7 @@ func TestLoadConfig(t *testing.T) { }, GRPCServerSettings: configgrpc.GRPCServerSettings{ Endpoint: "0.0.0.0:55680", - TLSCredentials: nil, + ReadBufferSize: 512 * 1024, }, Transport: "tcp", CorsOrigins: []string{"https://*.test.com", "https://test.com"}, @@ -148,7 +153,7 @@ func TestLoadConfig(t *testing.T) { }, GRPCServerSettings: configgrpc.GRPCServerSettings{ Endpoint: "/tmp/otlp.sock", - TLSCredentials: nil, + ReadBufferSize: 512 * 1024, }, Transport: "unix", }) diff --git a/receiver/otlpreceiver/factory.go b/receiver/otlpreceiver/factory.go index 50419e5d3cc..ed321ef95c7 100644 --- a/receiver/otlpreceiver/factory.go +++ b/receiver/otlpreceiver/factory.go @@ -51,6 +51,8 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver { }, GRPCServerSettings: configgrpc.GRPCServerSettings{ Endpoint: "0.0.0.0:55680", + // We almost write 0 bytes, so no need to tune WriteBufferSize. + ReadBufferSize: 512 * 1024, }, Transport: "tcp", } diff --git a/receiver/otlpreceiver/testdata/config.yaml b/receiver/otlpreceiver/testdata/config.yaml index c3af95b3994..5bc45237b1b 100644 --- a/receiver/otlpreceiver/testdata/config.yaml +++ b/receiver/otlpreceiver/testdata/config.yaml @@ -27,6 +27,8 @@ receivers: otlp/msg-size-conc-connect-max-idle: max_recv_msg_size_mib: 32 max_concurrent_streams: 16 + read_buffer_size: 1024 + write_buffer_size: 1024 keepalive: server_parameters: max_connection_idle: 10s