diff --git a/config/configgrpc/configgrpc.go b/config/configgrpc/configgrpc.go index fd819126346..12a4252710c 100644 --- a/config/configgrpc/configgrpc.go +++ b/config/configgrpc/configgrpc.go @@ -71,6 +71,14 @@ type GRPCClientSettings struct { // (https://godoc.org/google.golang.org/grpc#WithKeepaliveParams). Keepalive *KeepaliveClientConfig `mapstructure:"keepalive"` + // The WriteBufferSize for client gRPC. See grpc.WithReadBufferSize + // (https://godoc.org/google.golang.org/grpc#WithReadBufferSize). + ReadBufferSize int `mapstructure:"read_buffer_size"` + + // The WriteBufferSize for client gRPC. See grpc.WithWriteBufferSize + // (https://godoc.org/google.golang.org/grpc#WithWriteBufferSize). + WriteBufferSize int `mapstructure:"write_buffer_size"` + // WaitForReady parameter configures client to wait for ready state before sending data. // (https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md) WaitForReady bool `mapstructure:"wait_for_ready"` @@ -122,18 +130,18 @@ type GRPCServerSettings struct { } // ToServerOption maps configgrpc.GRPCClientSettings to a slice of dial options for gRPC -func (settings *GRPCClientSettings) ToDialOptions() ([]grpc.DialOption, error) { +func (gcs *GRPCClientSettings) ToDialOptions() ([]grpc.DialOption, error) { opts := []grpc.DialOption{} - if settings.Compression != "" { - if compressionKey := GetGRPCCompressionKey(settings.Compression); compressionKey != CompressionUnsupported { + if gcs.Compression != "" { + if compressionKey := GetGRPCCompressionKey(gcs.Compression); compressionKey != CompressionUnsupported { opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor(compressionKey))) } else { - return nil, fmt.Errorf("unsupported compression type %q", settings.Compression) + return nil, fmt.Errorf("unsupported compression type %q", gcs.Compression) } } - tlsCfg, err := settings.TLSSetting.LoadTLSConfig() + tlsCfg, err := gcs.TLSSetting.LoadTLSConfig() if err != nil { return nil, err } @@ -143,11 +151,19 @@ func (settings *GRPCClientSettings) ToDialOptions() ([]grpc.DialOption, error) { } opts = append(opts, tlsDialOption) - if settings.Keepalive != nil { + if gcs.ReadBufferSize > 0 { + opts = append(opts, grpc.WithReadBufferSize(gcs.ReadBufferSize)) + } + + if gcs.WriteBufferSize > 0 { + opts = append(opts, grpc.WithWriteBufferSize(gcs.WriteBufferSize)) + } + + if gcs.Keepalive != nil { keepAliveOption := grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: settings.Keepalive.Time, - Timeout: settings.Keepalive.Timeout, - PermitWithoutStream: settings.Keepalive.PermitWithoutStream, + Time: gcs.Keepalive.Time, + Timeout: gcs.Keepalive.Timeout, + PermitWithoutStream: gcs.Keepalive.PermitWithoutStream, }) opts = append(opts, keepAliveOption) } diff --git a/exporter/jaegerexporter/factory.go b/exporter/jaegerexporter/factory.go index eba31407298..145fda5d405 100644 --- a/exporter/jaegerexporter/factory.go +++ b/exporter/jaegerexporter/factory.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configerror" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configmodels" ) @@ -44,6 +45,10 @@ func (f *Factory) CreateDefaultConfig() configmodels.Exporter { TypeVal: typeStr, NameVal: typeStr, }, + GRPCClientSettings: configgrpc.GRPCClientSettings{ + // We almost read 0 bytes, so no need to tune ReadBufferSize. + WriteBufferSize: 512 * 1024, + }, } } diff --git a/exporter/opencensusexporter/config_test.go b/exporter/opencensusexporter/config_test.go index f8bd024a0ce..60387852359 100644 --- a/exporter/opencensusexporter/config_test.go +++ b/exporter/opencensusexporter/config_test.go @@ -67,6 +67,7 @@ func TestLoadConfig(t *testing.T) { PermitWithoutStream: true, Timeout: 30, }, + WriteBufferSize: 512 * 1024, }, NumWorkers: 123, ReconnectionDelay: 15, diff --git a/exporter/opencensusexporter/factory.go b/exporter/opencensusexporter/factory.go index f1132cd7159..f450d18fb50 100644 --- a/exporter/opencensusexporter/factory.go +++ b/exporter/opencensusexporter/factory.go @@ -51,6 +51,8 @@ func (f *Factory) CreateDefaultConfig() configmodels.Exporter { }, GRPCClientSettings: configgrpc.GRPCClientSettings{ Headers: map[string]string{}, + // We almost read 0 bytes, so no need to tune ReadBufferSize. + WriteBufferSize: 512 * 1024, }, } } diff --git a/exporter/otlpexporter/config_test.go b/exporter/otlpexporter/config_test.go index 9df0859cd19..8371c0c3d45 100644 --- a/exporter/otlpexporter/config_test.go +++ b/exporter/otlpexporter/config_test.go @@ -68,6 +68,7 @@ func TestLoadConfig(t *testing.T) { PermitWithoutStream: true, Timeout: 30 * time.Second, }, + WriteBufferSize: 512 * 1024, }, NumWorkers: 8, }) diff --git a/exporter/otlpexporter/factory.go b/exporter/otlpexporter/factory.go index c5e40ba42ee..12bf45a6064 100644 --- a/exporter/otlpexporter/factory.go +++ b/exporter/otlpexporter/factory.go @@ -45,6 +45,8 @@ func (f *Factory) CreateDefaultConfig() configmodels.Exporter { }, GRPCClientSettings: configgrpc.GRPCClientSettings{ Headers: map[string]string{}, + // We almost read 0 bytes, so no need to tune ReadBufferSize. + WriteBufferSize: 512 * 1024, }, } } diff --git a/testbed/tests/trace_test.go b/testbed/tests/trace_test.go index 33d1906e7b8..f3eb42b9625 100644 --- a/testbed/tests/trace_test.go +++ b/testbed/tests/trace_test.go @@ -54,7 +54,7 @@ func TestTrace10kSPS(t *testing.T) { testbed.NewJaegerDataReceiver(testbed.GetAvailablePort(t)), testbed.ResourceSpec{ ExpectedMaxCPU: 40, - ExpectedMaxRAM: 60, + ExpectedMaxRAM: 70, }, }, { @@ -63,7 +63,7 @@ func TestTrace10kSPS(t *testing.T) { testbed.NewOCDataReceiver(testbed.GetAvailablePort(t)), testbed.ResourceSpec{ ExpectedMaxCPU: 39, - ExpectedMaxRAM: 60, + ExpectedMaxRAM: 70, }, }, { @@ -72,7 +72,7 @@ func TestTrace10kSPS(t *testing.T) { testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)), testbed.ResourceSpec{ ExpectedMaxCPU: 20, - ExpectedMaxRAM: 60, + ExpectedMaxRAM: 70, }, }, { @@ -81,7 +81,7 @@ func TestTrace10kSPS(t *testing.T) { testbed.NewZipkinDataReceiver(testbed.GetAvailablePort(t)), testbed.ResourceSpec{ ExpectedMaxCPU: 80, - ExpectedMaxRAM: 60, + ExpectedMaxRAM: 70, }, }, }