diff --git a/internal/otelarrow/admission/boundedqueue.go b/internal/otelarrow/admission/boundedqueue.go index 388cd93285bb..ea3f255db551 100644 --- a/internal/otelarrow/admission/boundedqueue.go +++ b/internal/otelarrow/admission/boundedqueue.go @@ -10,6 +10,9 @@ import ( "github.com/google/uuid" orderedmap "github.com/wk8/go-ordered-map/v2" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" ) var ErrTooManyWaiters = fmt.Errorf("rejecting request, too many waiters") @@ -21,6 +24,7 @@ type BoundedQueue struct { currentWaiters int64 lock sync.Mutex waiters *orderedmap.OrderedMap[uuid.UUID, waiter] + tracer trace.Tracer } type waiter struct { @@ -29,11 +33,12 @@ type waiter struct { ID uuid.UUID } -func NewBoundedQueue(maxLimitBytes, maxLimitWaiters int64) *BoundedQueue { +func NewBoundedQueue(tp trace.TracerProvider, maxLimitBytes, maxLimitWaiters int64) *BoundedQueue { return &BoundedQueue{ maxLimitBytes: maxLimitBytes, maxLimitWaiters: maxLimitWaiters, waiters: orderedmap.New[uuid.UUID, waiter](), + tracer: tp.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow"), } } @@ -87,7 +92,9 @@ func (bq *BoundedQueue) Acquire(ctx context.Context, pendingBytes int64) error { } bq.lock.Unlock() - // @@@ instrument this code path + ctx, span := bq.tracer.Start(ctx, "admission_blocked", + trace.WithAttributes(attribute.Int64("pending", pendingBytes))) + defer span.End() select { case <-curWaiter.readyCh: @@ -97,6 +104,7 @@ func (bq *BoundedQueue) Acquire(ctx context.Context, pendingBytes int64) error { bq.lock.Lock() defer bq.lock.Unlock() err = fmt.Errorf("context canceled: %w ", ctx.Err()) + span.SetStatus(codes.Error, "context canceled") _, found := bq.waiters.Delete(curWaiter.ID) if !found { diff --git a/internal/otelarrow/admission/boundedqueue_test.go b/internal/otelarrow/admission/boundedqueue_test.go index a56ea86e7461..e0c4ac471f10 100644 --- a/internal/otelarrow/admission/boundedqueue_test.go +++ b/internal/otelarrow/admission/boundedqueue_test.go @@ -11,6 +11,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace/noop" "go.uber.org/multierr" ) @@ -34,13 +38,16 @@ func abs(x int64) int64 { } return x } + +var noopTraces = noop.NewTracerProvider() + func TestAcquireSimpleNoWaiters(t *testing.T) { maxLimitBytes := 1000 maxLimitWaiters := 10 numRequests := 40 requestSize := 21 - bq := NewBoundedQueue(int64(maxLimitBytes), int64(maxLimitWaiters)) + bq := NewBoundedQueue(noopTraces, int64(maxLimitBytes), int64(maxLimitWaiters)) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -92,7 +99,7 @@ func TestAcquireBoundedWithWaiters(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - bq := NewBoundedQueue(tt.maxLimitBytes, tt.maxLimitWaiters) + bq := NewBoundedQueue(noopTraces, tt.maxLimitBytes, tt.maxLimitWaiters) var blockedRequests int64 numReqsUntilBlocked := tt.maxLimitBytes / tt.requestSize requestsAboveLimit := abs(tt.numRequests - numReqsUntilBlocked) @@ -151,7 +158,10 @@ func TestAcquireContextCanceled(t *testing.T) { blockedRequests := min(int64(maxLimitWaiters), requestsAboveLimit) - bq := NewBoundedQueue(int64(maxLimitBytes), int64(maxLimitWaiters)) + exp := tracetest.NewInMemoryExporter() + tp := trace.NewTracerProvider(trace.WithSyncer(exp)) + + bq := NewBoundedQueue(tp, int64(maxLimitBytes), int64(maxLimitWaiters)) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) var errs error @@ -178,6 +188,16 @@ func TestAcquireContextCanceled(t *testing.T) { wg.Wait() assert.ErrorContains(t, errs, "context canceled") + // Expect spans named admission_blocked w/ context canceled. + spans := exp.GetSpans() + exp.Reset() + assert.NotEmpty(t, spans) + for _, span := range spans { + assert.Equal(t, "admission_blocked", span.Name) + assert.Equal(t, codes.Error, span.Status.Code) + assert.Equal(t, "context canceled", span.Status.Description) + } + // Now all waiters should have returned and been removed. assert.Equal(t, 0, bq.waiters.Len()) @@ -186,4 +206,8 @@ func TestAcquireContextCanceled(t *testing.T) { assert.Equal(t, int64(0), bq.currentWaiters) } assert.True(t, bq.TryAcquire(int64(maxLimitBytes))) + + // Expect no more spans, because admission was not blocked. + spans = exp.GetSpans() + require.Empty(t, spans) } diff --git a/receiver/otelarrowreceiver/internal/arrow/arrow_test.go b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go index 70c31e9e7320..7a136e918731 100644 --- a/receiver/otelarrowreceiver/internal/arrow/arrow_test.go +++ b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go @@ -36,6 +36,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace/noop" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" "golang.org/x/net/http2/hpack" @@ -49,8 +50,10 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/arrow/mock" ) +var noopTraces = noop.NewTracerProvider() + func defaultBQ() *admission.BoundedQueue { - return admission.NewBoundedQueue(int64(100000), int64(10)) + return admission.NewBoundedQueue(noopTraces, int64(100000), int64(10)) } type compareJSONTraces struct{ ptrace.Traces } @@ -464,10 +467,10 @@ func TestBoundedQueueWithPdataHeaders(t *testing.T) { var bq *admission.BoundedQueue if tt.rejected { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(0) - bq = admission.NewBoundedQueue(int64(sizer.TracesSize(td)-100), 10) + bq = admission.NewBoundedQueue(noopTraces, int64(sizer.TracesSize(td)-100), 10) } else { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil) - bq = admission.NewBoundedQueue(defaultBoundedQueueLimit, 10) + bq = admission.NewBoundedQueue(noopTraces, defaultBoundedQueueLimit, 10) } ctc.start(ctc.newRealConsumer, bq) diff --git a/receiver/otelarrowreceiver/otelarrow.go b/receiver/otelarrowreceiver/otelarrow.go index 0381e84771bf..b90c9c36a615 100644 --- a/receiver/otelarrowreceiver/otelarrow.go +++ b/receiver/otelarrowreceiver/otelarrow.go @@ -115,7 +115,7 @@ func (r *otelArrowReceiver) startProtocolServers(ctx context.Context, host compo return err } } - bq := admission.NewBoundedQueue(int64(r.cfg.Arrow.AdmissionLimitMiB<<20), r.cfg.Arrow.WaiterLimit) + bq := admission.NewBoundedQueue(r.settings.TracerProvider, int64(r.cfg.Arrow.AdmissionLimitMiB<<20), r.cfg.Arrow.WaiterLimit) r.arrowReceiver, err = arrow.New(arrow.Consumers(r), r.settings, r.obsrepGRPC, r.cfg.GRPC, authServer, func() arrowRecord.ConsumerAPI { var opts []arrowRecord.Option