diff --git a/.chloggen/admission-control-for-otlp.yaml b/.chloggen/admission-control-for-otlp.yaml new file mode 100644 index 000000000000..92f27e26b56c --- /dev/null +++ b/.chloggen/admission-control-for-otlp.yaml @@ -0,0 +1,31 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: otelarrowreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: | + Add admission control in the otelarrow receiver's standard otlp data path. + Also moves admission control config options to a separate block. + arrow.admission_limit_mib -> admission.request_limit_mib + arrow.waiter_limit -> admission.waiter_limit + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35021] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] \ No newline at end of file diff --git a/internal/otelarrow/go.mod b/internal/otelarrow/go.mod index 6ab36d7dda67..813d801e8350 100644 --- a/internal/otelarrow/go.mod +++ b/internal/otelarrow/go.mod @@ -44,13 +44,19 @@ require ( github.com/fxamacker/cbor/v2 v2.4.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-viper/mapstructure/v2 v2.1.0 // indirect github.com/goccy/go-json v0.10.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect github.com/google/flatbuffers v24.3.25+incompatible // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect + github.com/knadh/koanf/maps v0.1.1 // indirect + github.com/knadh/koanf/providers/confmap v0.1.0 // indirect + github.com/knadh/koanf/v2 v2.1.1 // indirect github.com/mailru/easyjson v0.7.7 // indirect + github.com/mitchellh/copystructure v1.2.0 // indirect + github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mostynb/go-grpc-compression v1.2.3 // indirect @@ -69,6 +75,7 @@ require ( go.opentelemetry.io/collector/config/configretry v1.16.0 // indirect go.opentelemetry.io/collector/config/configtls v1.16.0 // indirect go.opentelemetry.io/collector/config/internal v0.110.0 // indirect + go.opentelemetry.io/collector/confmap v1.16.0 // indirect go.opentelemetry.io/collector/consumer/consumerprofiles v0.110.0 // indirect go.opentelemetry.io/collector/extension v0.110.0 // indirect go.opentelemetry.io/collector/extension/auth v0.110.0 // indirect diff --git a/receiver/otelarrowreceiver/README.md b/receiver/otelarrowreceiver/README.md index b3451502c968..2714b0cae795 100644 --- a/receiver/otelarrowreceiver/README.md +++ b/receiver/otelarrowreceiver/README.md @@ -77,6 +77,16 @@ Several common configuration structures provide additional capabilities automati - [gRPC settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configgrpc/README.md) - [TLS and mTLS settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md) +### Admission Control Configuration + +In the `admission` configuration block the following settings are available: + +- `request_limit_mib` (default: 128): limits the number of requests that are received by the stream in terms of *uncompressed request size*. This should not be confused with `arrow.memory_limit_mib` which limits allocations made by the consumer when translating arrow records into pdata objects. i.e. request size is used to control how much traffic we admit, but does not control how much memory is used during request processing. + +- `waiter_limit` (default: 1000): limits the number of requests waiting on admission once `admission_limit_mib` is reached. This is another dimension of memory limiting that ensures waiters are not holding onto a significant amount of memory while waiting to be processed. + +`request_limit_mib` and `waiter_limit` are arguments supplied to [admission.BoundedQueue](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/internal/otelarrow/admission). This custom semaphore is meant to be used within receivers to help limit memory within the collector pipeline. + ### Arrow-specific Configuration In the `arrow` configuration block, the following settings are available: @@ -87,13 +97,6 @@ When the limit is reached, the receiver will return RESOURCE_EXHAUSTED error codes to the receiver, which are [conditionally retryable, see exporter retry configuration](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md). -- `admission_limit_mib` (default: 64): limits the number of requests that are received by the stream based on request size information available. This should not be confused with `memory_limit_mib` which limits allocations made by the consumer when translating arrow records into pdata objects. i.e. request size is used to control how much traffic we admit, but does not control how much memory is used during request processing. - -- `waiter_limit` (default: 1000): limits the number of requests waiting on admission once `admission_limit_mib` is reached. This is another dimension of memory limiting that ensures waiters are not holding onto a significant amount of memory while waiting to be processed. - -`admission_limit_mib` and `waiter_limit` are arguments supplied to [admission.BoundedQueue](https://github.com/open-telemetry/otel-arrow/tree/main/collector/admission). This custom semaphore is meant to be used within receivers to help limit memory within the collector pipeline. - - ### Compression Configuration In the `arrow` configuration block, `zstd` sub-section applies to all diff --git a/receiver/otelarrowreceiver/config.go b/receiver/otelarrowreceiver/config.go index cc2d47c929cb..220b0fc45124 100644 --- a/receiver/otelarrowreceiver/config.go +++ b/receiver/otelarrowreceiver/config.go @@ -8,6 +8,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/confmap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/compression/zstd" ) @@ -18,6 +19,18 @@ type Protocols struct { Arrow ArrowConfig `mapstructure:"arrow"` } +type AdmissionConfig struct { + // RequestLimitMiB limits the number of requests that are received by the stream based on + // uncompressed request size. Request size is used to control how much traffic we admit + // for processing. + RequestLimitMiB uint64 `mapstructure:"request_limit_mib"` + + // WaiterLimit is the limit on the number of waiters waiting to be processed and consumed. + // This is a dimension of memory limiting to ensure waiters are not consuming an + // unexpectedly large amount of memory in the arrow receiver. + WaiterLimit int64 `mapstructure:"waiter_limit"` +} + // ArrowConfig support configuring the Arrow receiver. type ArrowConfig struct { // MemoryLimitMiB is the size of a shared memory region used @@ -25,15 +38,11 @@ type ArrowConfig struct { // passing through, they will see ResourceExhausted errors. MemoryLimitMiB uint64 `mapstructure:"memory_limit_mib"` - // AdmissionLimitMiB limits the number of requests that are received by the stream based on - // request size information available. Request size is used to control how much traffic we admit - // for processing, but does not control how much memory is used during request processing. - AdmissionLimitMiB uint64 `mapstructure:"admission_limit_mib"` + // Deprecated: This field is no longer supported, use cfg.Admission.RequestLimitMiB instead. + DeprecatedAdmissionLimitMiB uint64 `mapstructure:"admission_limit_mib"` - // WaiterLimit is the limit on the number of waiters waiting to be processed and consumed. - // This is a dimension of memory limiting to ensure waiters are not consuming an - // unexpectedly large amount of memory in the arrow receiver. - WaiterLimit int64 `mapstructure:"waiter_limit"` + // Deprecated: This field is no longer supported, use cfg.Admission.WaiterLimit instead. + DeprecatedWaiterLimit int64 `mapstructure:"waiter_limit"` // Zstd settings apply to OTel-Arrow use of gRPC specifically. Zstd zstd.DecoderConfig `mapstructure:"zstd"` @@ -43,6 +52,8 @@ type ArrowConfig struct { type Config struct { // Protocols is the configuration for gRPC and Arrow. Protocols `mapstructure:"protocols"` + // Admission is the configuration for controlling amount of request memory entering the receiver. + Admission AdmissionConfig `mapstructure:"admission"` } var _ component.Config = (*Config)(nil) @@ -54,3 +65,27 @@ func (cfg *ArrowConfig) Validate() error { } return nil } + +func (cfg *Config) Validate() error { + if err := cfg.GRPC.Validate(); err != nil { + return err + } + if err := cfg.Arrow.Validate(); err != nil { + return err + } + return nil +} + +// Unmarshal will apply deprecated field values to assist the user with migration. +func (cfg *Config) Unmarshal(conf *confmap.Conf) error { + if err := conf.Unmarshal(cfg); err != nil { + return err + } + if cfg.Admission.RequestLimitMiB == 0 && cfg.Arrow.DeprecatedAdmissionLimitMiB != 0 { + cfg.Admission.RequestLimitMiB = cfg.Arrow.DeprecatedAdmissionLimitMiB + } + if cfg.Admission.WaiterLimit == 0 && cfg.Arrow.DeprecatedWaiterLimit != 0 { + cfg.Admission.WaiterLimit = cfg.Arrow.DeprecatedWaiterLimit + } + return nil +} diff --git a/receiver/otelarrowreceiver/config_test.go b/receiver/otelarrowreceiver/config_test.go index 0f373c65df51..60edaf00cf61 100644 --- a/receiver/otelarrowreceiver/config_test.go +++ b/receiver/otelarrowreceiver/config_test.go @@ -77,15 +77,41 @@ func TestUnmarshalConfig(t *testing.T) { }, }, Arrow: ArrowConfig{ - MemoryLimitMiB: 123, - AdmissionLimitMiB: 80, - WaiterLimit: 100, + MemoryLimitMiB: 123, }, }, + Admission: AdmissionConfig{ + RequestLimitMiB: 80, + WaiterLimit: 100, + }, }, cfg) } +// Tests that a deprecated config validation sets RequestLimitMiB and WaiterLimit in the correct config block. +func TestValidateDeprecatedConfig(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "deprecated.yaml")) + require.NoError(t, err) + cfg := &Config{} + assert.NoError(t, cm.Unmarshal(cfg)) + assert.NoError(t, cfg.Validate()) + assert.Equal(t, + &Config{ + Protocols: Protocols{ + Arrow: ArrowConfig{ + MemoryLimitMiB: 123, + DeprecatedAdmissionLimitMiB: 80, + DeprecatedWaiterLimit: 100, + }, + }, + Admission: AdmissionConfig{ + // cfg.Validate should now set these fields. + RequestLimitMiB: 80, + WaiterLimit: 100, + }, + }, cfg) +} + func TestUnmarshalConfigUnix(t *testing.T) { cm, err := confmaptest.LoadConf(filepath.Join("testdata", "uds.yaml")) require.NoError(t, err) @@ -103,11 +129,13 @@ func TestUnmarshalConfigUnix(t *testing.T) { ReadBufferSize: 512 * 1024, }, Arrow: ArrowConfig{ - MemoryLimitMiB: defaultMemoryLimitMiB, - AdmissionLimitMiB: defaultAdmissionLimitMiB, - WaiterLimit: defaultWaiterLimit, + MemoryLimitMiB: defaultMemoryLimitMiB, }, }, + Admission: AdmissionConfig{ + RequestLimitMiB: defaultRequestLimitMiB, + WaiterLimit: defaultWaiterLimit, + }, }, cfg) } diff --git a/receiver/otelarrowreceiver/factory.go b/receiver/otelarrowreceiver/factory.go index 9c4c57dcab60..92d154060d86 100644 --- a/receiver/otelarrowreceiver/factory.go +++ b/receiver/otelarrowreceiver/factory.go @@ -19,9 +19,9 @@ import ( const ( defaultGRPCEndpoint = "0.0.0.0:4317" - defaultMemoryLimitMiB = 128 - defaultAdmissionLimitMiB = defaultMemoryLimitMiB / 2 - defaultWaiterLimit = 1000 + defaultMemoryLimitMiB = 128 + defaultRequestLimitMiB = 128 + defaultWaiterLimit = 1000 ) // NewFactory creates a new OTel-Arrow receiver factory. @@ -47,11 +47,13 @@ func createDefaultConfig() component.Config { ReadBufferSize: 512 * 1024, }, Arrow: ArrowConfig{ - MemoryLimitMiB: defaultMemoryLimitMiB, - AdmissionLimitMiB: defaultAdmissionLimitMiB, - WaiterLimit: defaultWaiterLimit, + MemoryLimitMiB: defaultMemoryLimitMiB, }, }, + Admission: AdmissionConfig{ + RequestLimitMiB: defaultRequestLimitMiB, + WaiterLimit: defaultWaiterLimit, + }, } } diff --git a/receiver/otelarrowreceiver/internal/logs/otlp.go b/receiver/otelarrowreceiver/internal/logs/otlp.go index 72f62dc9d28d..23ec4c96fbbc 100644 --- a/receiver/otelarrowreceiver/internal/logs/otlp.go +++ b/receiver/otelarrowreceiver/internal/logs/otlp.go @@ -7,8 +7,12 @@ import ( "context" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/plog/plogotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" ) const dataFormatProtobuf = "protobuf" @@ -18,13 +22,19 @@ type Receiver struct { plogotlp.UnimplementedGRPCServer nextConsumer consumer.Logs obsrecv *receiverhelper.ObsReport + boundedQueue *admission.BoundedQueue + sizer *plog.ProtoMarshaler + logger *zap.Logger } // New creates a new Receiver reference. -func New(nextConsumer consumer.Logs, obsrecv *receiverhelper.ObsReport) *Receiver { +func New(logger *zap.Logger, nextConsumer consumer.Logs, obsrecv *receiverhelper.ObsReport, bq *admission.BoundedQueue) *Receiver { return &Receiver{ nextConsumer: nextConsumer, obsrecv: obsrecv, + boundedQueue: bq, + sizer: &plog.ProtoMarshaler{}, + logger: logger, } } @@ -37,7 +47,19 @@ func (r *Receiver) Export(ctx context.Context, req plogotlp.ExportRequest) (plog } ctx = r.obsrecv.StartLogsOp(ctx) - err := r.nextConsumer.ConsumeLogs(ctx, ld) + + sizeBytes := int64(r.sizer.LogsSize(req.Logs())) + err := r.boundedQueue.Acquire(ctx, sizeBytes) + if err != nil { + return plogotlp.NewExportResponse(), err + } + defer func() { + if releaseErr := r.boundedQueue.Release(sizeBytes); releaseErr != nil { + r.logger.Error("Error releasing bytes from semaphore", zap.Error(releaseErr)) + } + }() + + err = r.nextConsumer.ConsumeLogs(ctx, ld) r.obsrecv.EndLogsOp(ctx, dataFormatProtobuf, numSpans, err) return plogotlp.NewExportResponse(), err diff --git a/receiver/otelarrowreceiver/internal/logs/otlp_test.go b/receiver/otelarrowreceiver/internal/logs/otlp_test.go index 8c00b1c78f17..bf58f738c34e 100644 --- a/receiver/otelarrowreceiver/internal/logs/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/logs/otlp_test.go @@ -7,7 +7,9 @@ import ( "context" "errors" "net" + "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -17,10 +19,20 @@ import ( "go.opentelemetry.io/collector/pdata/plog/plogotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/otel/trace/noop" + "go.uber.org/multierr" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/testconsumer" +) + +const ( + maxWaiters = 10 + maxBytes = int64(250) ) func TestExport(t *testing.T) { @@ -57,6 +69,61 @@ func TestExport_ErrorConsumer(t *testing.T) { assert.Equal(t, plogotlp.ExportResponse{}, resp) } +func TestExport_AdmissionLimitBytesExceeded(t *testing.T) { + ld := testdata.GenerateLogs(10) + logSink := new(consumertest.LogsSink) + req := plogotlp.NewExportRequestFromLogs(ld) + + logClient := makeLogsServiceClient(t, logSink) + resp, err := logClient.Export(context.Background(), req) + assert.EqualError(t, err, "rpc error: code = Unknown desc = rejecting request, request size larger than configured limit") + assert.Equal(t, plogotlp.ExportResponse{}, resp) +} + +func TestExport_TooManyWaiters(t *testing.T) { + bc := testconsumer.NewBlockingConsumer() + + logsClient := makeLogsServiceClient(t, bc) + bg := context.Background() + var errs, err error + ld := testdata.GenerateLogs(1) + req := plogotlp.NewExportRequestFromLogs(ld) + var mtx sync.Mutex + numResponses := 0 + // Send request that will acquire all of the semaphores bytes and block. + go func() { + _, err = logsClient.Export(bg, req) + mtx.Lock() + errs = multierr.Append(errs, err) + numResponses++ + mtx.Unlock() + }() + + for i := 0; i < maxWaiters+1; i++ { + go func() { + _, err := logsClient.Export(bg, req) + mtx.Lock() + errs = multierr.Append(errs, err) + numResponses++ + mtx.Unlock() + }() + } + + // sleep so all async requests are blocked on semaphore Acquire. + time.Sleep(1 * time.Second) + + // unblock and wait for errors to be returned and written. + bc.Unblock() + assert.Eventually(t, func() bool { + mtx.Lock() + defer mtx.Unlock() + errSlice := multierr.Errors(errs) + return numResponses == maxWaiters+2 && len(errSlice) == 1 + }, 3*time.Second, 10*time.Millisecond) + + assert.ErrorContains(t, errs, "too many waiters") +} + func makeLogsServiceClient(t *testing.T, lc consumer.Logs) plogotlp.GRPCClient { addr := otlpReceiverOnGRPCServer(t, lc) cc, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -84,7 +151,9 @@ func otlpReceiverOnGRPCServer(t *testing.T, lc consumer.Logs) net.Addr { ReceiverCreateSettings: set, }) require.NoError(t, err) - r := New(lc, obsrecv) + + bq := admission.NewBoundedQueue(noop.NewTracerProvider(), maxBytes, maxWaiters) + r := New(zap.NewNop(), lc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() plogotlp.RegisterGRPCServer(srv, r) diff --git a/receiver/otelarrowreceiver/internal/metrics/otlp.go b/receiver/otelarrowreceiver/internal/metrics/otlp.go index 77e12a86ce14..d038d63bef3d 100644 --- a/receiver/otelarrowreceiver/internal/metrics/otlp.go +++ b/receiver/otelarrowreceiver/internal/metrics/otlp.go @@ -7,8 +7,12 @@ import ( "context" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" ) const dataFormatProtobuf = "protobuf" @@ -18,13 +22,19 @@ type Receiver struct { pmetricotlp.UnimplementedGRPCServer nextConsumer consumer.Metrics obsrecv *receiverhelper.ObsReport + boundedQueue *admission.BoundedQueue + sizer *pmetric.ProtoMarshaler + logger *zap.Logger } // New creates a new Receiver reference. -func New(nextConsumer consumer.Metrics, obsrecv *receiverhelper.ObsReport) *Receiver { +func New(logger *zap.Logger, nextConsumer consumer.Metrics, obsrecv *receiverhelper.ObsReport, bq *admission.BoundedQueue) *Receiver { return &Receiver{ nextConsumer: nextConsumer, obsrecv: obsrecv, + boundedQueue: bq, + sizer: &pmetric.ProtoMarshaler{}, + logger: logger, } } @@ -37,7 +47,19 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p } ctx = r.obsrecv.StartMetricsOp(ctx) - err := r.nextConsumer.ConsumeMetrics(ctx, md) + + sizeBytes := int64(r.sizer.MetricsSize(req.Metrics())) + err := r.boundedQueue.Acquire(ctx, sizeBytes) + if err != nil { + return pmetricotlp.NewExportResponse(), err + } + defer func() { + if releaseErr := r.boundedQueue.Release(sizeBytes); releaseErr != nil { + r.logger.Error("Error releasing bytes from semaphore", zap.Error(releaseErr)) + } + }() + + err = r.nextConsumer.ConsumeMetrics(ctx, md) r.obsrecv.EndMetricsOp(ctx, dataFormatProtobuf, dataPointCount, err) return pmetricotlp.NewExportResponse(), err diff --git a/receiver/otelarrowreceiver/internal/metrics/otlp_test.go b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go index 71c4f939d813..9bd0b9911e57 100644 --- a/receiver/otelarrowreceiver/internal/metrics/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go @@ -7,7 +7,9 @@ import ( "context" "errors" "net" + "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -17,10 +19,20 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/otel/trace/noop" + "go.uber.org/multierr" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/testconsumer" +) + +const ( + maxWaiters = 10 + maxBytes = int64(250) ) func TestExport(t *testing.T) { @@ -57,6 +69,61 @@ func TestExport_ErrorConsumer(t *testing.T) { assert.Equal(t, pmetricotlp.ExportResponse{}, resp) } +func TestExport_AdmissionLimitBytesExceeded(t *testing.T) { + md := testdata.GenerateMetrics(10) + metricSink := new(consumertest.MetricsSink) + req := pmetricotlp.NewExportRequestFromMetrics(md) + + metricsClient := makeMetricsServiceClient(t, metricSink) + resp, err := metricsClient.Export(context.Background(), req) + assert.EqualError(t, err, "rpc error: code = Unknown desc = rejecting request, request size larger than configured limit") + assert.Equal(t, pmetricotlp.ExportResponse{}, resp) +} + +func TestExport_TooManyWaiters(t *testing.T) { + bc := testconsumer.NewBlockingConsumer() + + metricsClient := makeMetricsServiceClient(t, bc) + bg := context.Background() + var errs, err error + md := testdata.GenerateMetrics(1) + req := pmetricotlp.NewExportRequestFromMetrics(md) + var mtx sync.Mutex + numResponses := 0 + // Send request that will acquire all of the semaphores bytes and block. + go func() { + _, err = metricsClient.Export(bg, req) + mtx.Lock() + errs = multierr.Append(errs, err) + numResponses++ + mtx.Unlock() + }() + + for i := 0; i < maxWaiters+1; i++ { + go func() { + _, err := metricsClient.Export(bg, req) + mtx.Lock() + errs = multierr.Append(errs, err) + numResponses++ + mtx.Unlock() + }() + } + + // sleep so all async requests are blocked on semaphore Acquire. + time.Sleep(1 * time.Second) + + // unblock and wait for errors to be returned and written. + bc.Unblock() + assert.Eventually(t, func() bool { + mtx.Lock() + defer mtx.Unlock() + errSlice := multierr.Errors(errs) + return numResponses == maxWaiters+2 && len(errSlice) == 1 + }, 3*time.Second, 10*time.Millisecond) + + assert.ErrorContains(t, errs, "too many waiters") +} + func makeMetricsServiceClient(t *testing.T, mc consumer.Metrics) pmetricotlp.GRPCClient { addr := otlpReceiverOnGRPCServer(t, mc) @@ -85,7 +152,9 @@ func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) net.Addr { ReceiverCreateSettings: set, }) require.NoError(t, err) - r := New(mc, obsrecv) + + bq := admission.NewBoundedQueue(noop.NewTracerProvider(), maxBytes, maxWaiters) + r := New(zap.NewNop(), mc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() pmetricotlp.RegisterGRPCServer(srv, r) diff --git a/receiver/otelarrowreceiver/internal/testconsumer/blocking_consumer.go b/receiver/otelarrowreceiver/internal/testconsumer/blocking_consumer.go new file mode 100644 index 000000000000..b132048dbb8b --- /dev/null +++ b/receiver/otelarrowreceiver/internal/testconsumer/blocking_consumer.go @@ -0,0 +1,46 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package testconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/testconsumer" + +import ( + "context" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +type BlockingConsumer struct { + block chan struct{} +} + +func NewBlockingConsumer() *BlockingConsumer { + return &BlockingConsumer{ + block: make(chan struct{}), + } +} + +func (bc *BlockingConsumer) ConsumeTraces(_ context.Context, _ ptrace.Traces) error { + <-bc.block + return nil +} + +func (bc *BlockingConsumer) ConsumeMetrics(_ context.Context, _ pmetric.Metrics) error { + <-bc.block + return nil +} + +func (bc *BlockingConsumer) ConsumeLogs(_ context.Context, _ plog.Logs) error { + <-bc.block + return nil +} + +func (bc *BlockingConsumer) Unblock() { + close(bc.block) +} + +func (bc *BlockingConsumer) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} diff --git a/receiver/otelarrowreceiver/internal/trace/otlp.go b/receiver/otelarrowreceiver/internal/trace/otlp.go index 82d836ed8b7e..af9bc335ea19 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp.go @@ -7,8 +7,12 @@ import ( "context" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" ) const dataFormatProtobuf = "protobuf" @@ -18,13 +22,19 @@ type Receiver struct { ptraceotlp.UnimplementedGRPCServer nextConsumer consumer.Traces obsrecv *receiverhelper.ObsReport + boundedQueue *admission.BoundedQueue + sizer *ptrace.ProtoMarshaler + logger *zap.Logger } // New creates a new Receiver reference. -func New(nextConsumer consumer.Traces, obsrecv *receiverhelper.ObsReport) *Receiver { +func New(logger *zap.Logger, nextConsumer consumer.Traces, obsrecv *receiverhelper.ObsReport, bq *admission.BoundedQueue) *Receiver { return &Receiver{ nextConsumer: nextConsumer, obsrecv: obsrecv, + boundedQueue: bq, + sizer: &ptrace.ProtoMarshaler{}, + logger: logger, } } @@ -36,9 +46,20 @@ func (r *Receiver) Export(ctx context.Context, req ptraceotlp.ExportRequest) (pt if numSpans == 0 { return ptraceotlp.NewExportResponse(), nil } - ctx = r.obsrecv.StartTracesOp(ctx) - err := r.nextConsumer.ConsumeTraces(ctx, td) + + sizeBytes := int64(r.sizer.TracesSize(req.Traces())) + err := r.boundedQueue.Acquire(ctx, sizeBytes) + if err != nil { + return ptraceotlp.NewExportResponse(), err + } + defer func() { + if releaseErr := r.boundedQueue.Release(sizeBytes); releaseErr != nil { + r.logger.Error("Error releasing bytes from semaphore", zap.Error(releaseErr)) + } + }() + + err = r.nextConsumer.ConsumeTraces(ctx, td) r.obsrecv.EndTracesOp(ctx, dataFormatProtobuf, numSpans, err) return ptraceotlp.NewExportResponse(), err diff --git a/receiver/otelarrowreceiver/internal/trace/otlp_test.go b/receiver/otelarrowreceiver/internal/trace/otlp_test.go index 2ce1929e875a..b968b79d20d8 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp_test.go @@ -7,7 +7,9 @@ import ( "context" "errors" "net" + "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -17,10 +19,20 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/otel/trace/noop" + "go.uber.org/multierr" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/testconsumer" +) + +const ( + maxWaiters = 10 + maxBytes = int64(250) ) func TestExport(t *testing.T) { @@ -55,6 +67,62 @@ func TestExport_ErrorConsumer(t *testing.T) { assert.Equal(t, ptraceotlp.ExportResponse{}, resp) } +func TestExport_AdmissionLimitBytesExceeded(t *testing.T) { + td := testdata.GenerateTraces(10) + traceSink := new(consumertest.TracesSink) + req := ptraceotlp.NewExportRequestFromTraces(td) + + traceClient := makeTraceServiceClient(t, traceSink) + + resp, err := traceClient.Export(context.Background(), req) + assert.EqualError(t, err, "rpc error: code = Unknown desc = rejecting request, request size larger than configured limit") + assert.Equal(t, ptraceotlp.ExportResponse{}, resp) +} + +func TestExport_TooManyWaiters(t *testing.T) { + bc := testconsumer.NewBlockingConsumer() + + traceClient := makeTraceServiceClient(t, bc) + bg := context.Background() + var errs, err error + td := testdata.GenerateTraces(1) + req := ptraceotlp.NewExportRequestFromTraces(td) + var mtx sync.Mutex + numResponses := 0 + // Send request that will acquire all of the semaphores bytes and block. + go func() { + _, err = traceClient.Export(bg, req) + mtx.Lock() + errs = multierr.Append(errs, err) + numResponses++ + mtx.Unlock() + }() + + for i := 0; i < maxWaiters+1; i++ { + go func() { + _, err := traceClient.Export(bg, req) + mtx.Lock() + errs = multierr.Append(errs, err) + numResponses++ + mtx.Unlock() + }() + } + + // sleep so all async requests are blocked on semaphore Acquire. + time.Sleep(1 * time.Second) + + // unblock and wait for errors to be returned and written. + bc.Unblock() + assert.Eventually(t, func() bool { + mtx.Lock() + defer mtx.Unlock() + errSlice := multierr.Errors(errs) + return numResponses == maxWaiters+2 && len(errSlice) == 1 + }, 3*time.Second, 10*time.Millisecond) + + assert.ErrorContains(t, errs, "too many waiters") +} + func makeTraceServiceClient(t *testing.T, tc consumer.Traces) ptraceotlp.GRPCClient { addr := otlpReceiverOnGRPCServer(t, tc) cc, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -82,7 +150,8 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) net.Addr { ReceiverCreateSettings: set, }) require.NoError(t, err) - r := New(tc, obsrecv) + bq := admission.NewBoundedQueue(noop.NewTracerProvider(), maxBytes, maxWaiters) + r := New(zap.NewNop(), tc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() ptraceotlp.RegisterGRPCServer(srv, r) diff --git a/receiver/otelarrowreceiver/otelarrow.go b/receiver/otelarrowreceiver/otelarrow.go index 1c0dde45c067..1f0a16f0d13a 100644 --- a/receiver/otelarrowreceiver/otelarrow.go +++ b/receiver/otelarrowreceiver/otelarrow.go @@ -43,8 +43,9 @@ type otelArrowReceiver struct { arrowReceiver *arrow.Receiver shutdownWG sync.WaitGroup - obsrepGRPC *receiverhelper.ObsReport - netReporter *netstats.NetworkReporter + obsrepGRPC *receiverhelper.ObsReport + netReporter *netstats.NetworkReporter + boundedQueue *admission.BoundedQueue settings receiver.Settings } @@ -53,14 +54,24 @@ type otelArrowReceiver struct { // responsibility to invoke the respective Start*Reception methods as well // as the various Stop*Reception methods to end it. func newOTelArrowReceiver(cfg *Config, set receiver.Settings) (*otelArrowReceiver, error) { + if cfg.Arrow.DeprecatedAdmissionLimitMiB != 0 { + set.Logger.Warn("arrow.admission_limit_mib is deprecated, using admission.request_limit_mib instead.") + } + + if cfg.Arrow.DeprecatedWaiterLimit != 0 { + set.Logger.Warn("arrow.waiter_limit is deprecated, using admission.waiter_limit instead.") + } + netReporter, err := netstats.NewReceiverNetworkReporter(set) if err != nil { return nil, err } + bq := admission.NewBoundedQueue(set.TracerProvider, int64(cfg.Admission.RequestLimitMiB<<20), cfg.Admission.WaiterLimit) r := &otelArrowReceiver{ - cfg: cfg, - settings: set, - netReporter: netReporter, + cfg: cfg, + settings: set, + netReporter: netReporter, + boundedQueue: bq, } if err = zstd.SetDecoderConfig(cfg.Arrow.Zstd); err != nil { return nil, err @@ -115,7 +126,6 @@ func (r *otelArrowReceiver) startProtocolServers(ctx context.Context, host compo return err } } - bq := admission.NewBoundedQueue(r.settings.TracerProvider, int64(r.cfg.Arrow.AdmissionLimitMiB<<20), r.cfg.Arrow.WaiterLimit) r.arrowReceiver, err = arrow.New(arrow.Consumers(r), r.settings, r.obsrepGRPC, r.cfg.GRPC, authServer, func() arrowRecord.ConsumerAPI { var opts []arrowRecord.Option @@ -127,7 +137,7 @@ func (r *otelArrowReceiver) startProtocolServers(ctx context.Context, host compo opts = append(opts, arrowRecord.WithMeterProvider(r.settings.TelemetrySettings.MeterProvider, r.settings.TelemetrySettings.MetricsLevel)) } return arrowRecord.NewConsumer(opts...) - }, bq, r.netReporter) + }, r.boundedQueue, r.netReporter) if err != nil { return err @@ -178,15 +188,15 @@ func (r *otelArrowReceiver) Shutdown(_ context.Context) error { } func (r *otelArrowReceiver) registerTraceConsumer(tc consumer.Traces) { - r.tracesReceiver = trace.New(tc, r.obsrepGRPC) + r.tracesReceiver = trace.New(r.settings.Logger, tc, r.obsrepGRPC, r.boundedQueue) } func (r *otelArrowReceiver) registerMetricsConsumer(mc consumer.Metrics) { - r.metricsReceiver = metrics.New(mc, r.obsrepGRPC) + r.metricsReceiver = metrics.New(r.settings.Logger, mc, r.obsrepGRPC, r.boundedQueue) } func (r *otelArrowReceiver) registerLogsConsumer(lc consumer.Logs) { - r.logsReceiver = logs.New(lc, r.obsrepGRPC) + r.logsReceiver = logs.New(r.settings.Logger, lc, r.obsrepGRPC, r.boundedQueue) } var _ arrow.Consumers = &otelArrowReceiver{} diff --git a/receiver/otelarrowreceiver/testdata/config.yaml b/receiver/otelarrowreceiver/testdata/config.yaml index 726263f82f9f..e911cafdd0c5 100644 --- a/receiver/otelarrowreceiver/testdata/config.yaml +++ b/receiver/otelarrowreceiver/testdata/config.yaml @@ -27,5 +27,6 @@ protocols: permit_without_stream: true arrow: memory_limit_mib: 123 - admission_limit_mib: 80 - waiter_limit: 100 +admission: + request_limit_mib: 80 + waiter_limit: 100 \ No newline at end of file diff --git a/receiver/otelarrowreceiver/testdata/deprecated.yaml b/receiver/otelarrowreceiver/testdata/deprecated.yaml new file mode 100644 index 000000000000..5675a753cb2d --- /dev/null +++ b/receiver/otelarrowreceiver/testdata/deprecated.yaml @@ -0,0 +1,6 @@ +protocols: + arrow: + memory_limit_mib: 123 + # these fields are deprecated and should populate cfg.Admission.AdmissionLimitMiB and cfg.Admission.WaiterLimit instead. + admission_limit_mib: 80 + waiter_limit: 100 \ No newline at end of file