diff --git a/cmd/collector/app/span_processor_test.go b/cmd/collector/app/span_processor_test.go index d66383919a0..ff077713d93 100644 --- a/cmd/collector/app/span_processor_test.go +++ b/cmd/collector/app/span_processor_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/atomic" "go.uber.org/zap" @@ -291,11 +292,14 @@ func TestSpanProcessorErrors(t *testing.T) { type blockingWriter struct { sync.Mutex + inWriteSpan atomic.Int32 } func (w *blockingWriter) WriteSpan(ctx context.Context, span *model.Span) error { + w.inWriteSpan.Inc() w.Lock() defer w.Unlock() + w.inWriteSpan.Dec() return nil } @@ -666,18 +670,31 @@ func TestSpanProcessorWithOnDroppedSpanOption(t *testing.T) { Options.NumWorkers(1), Options.QueueSize(1), Options.OnDroppedSpan(customOnDroppedSpan), + Options.ReportBusy(true), ).(*spanProcessor) defer p.Close() - // block the writer so that the first span is read from the queue and blocks the processor, and followings are dropped. + + // Acquire the lock externally to force the writer to block. w.Lock() defer w.Unlock() + opts := processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat} _, err := p.ProcessSpans([]*model.Span{ {OperationName: "op1"}, + }, opts) + require.NoError(t, err) + + // Wait for the sole worker to pick the item from the queue and block + assert.Eventually(t, + func() bool { return w.inWriteSpan.Load() == 1 }, + time.Second, time.Microsecond) + + // Now the queue is empty again and can accept one more item, but no workers available. + // If we send two items, the last one will have to be dropped. + _, err = p.ProcessSpans([]*model.Span{ {OperationName: "op2"}, {OperationName: "op3"}, - }, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat}) - - assert.NoError(t, err) - assert.Equal(t, []string{"op2", "op3"}, droppedOperations) + }, opts) + assert.EqualError(t, err, processor.ErrBusy.Error()) + assert.Equal(t, []string{"op3"}, droppedOperations) }