diff --git a/exporter/exporterhelper/internal/bounded_memory_queue.go b/exporter/exporterhelper/internal/bounded_memory_queue.go index 18c64a9d4b6..92b16875d04 100644 --- a/exporter/exporterhelper/internal/bounded_memory_queue.go +++ b/exporter/exporterhelper/internal/bounded_memory_queue.go @@ -52,16 +52,16 @@ func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set Queu } // Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow. -func (q *boundedMemoryQueue) Produce(ctx context.Context, req any) bool { +func (q *boundedMemoryQueue) Offer(ctx context.Context, req any) error { if q.stopped.Load() { - return false + return ErrQueueIsStopped } select { case q.items <- newQueueRequest(ctx, req): - return true + return nil default: - return false + return ErrQueueIsFull } } diff --git a/exporter/exporterhelper/internal/bounded_memory_queue_test.go b/exporter/exporterhelper/internal/bounded_memory_queue_test.go index dfc76a077fd..2837a505974 100644 --- a/exporter/exporterhelper/internal/bounded_memory_queue_test.go +++ b/exporter/exporterhelper/internal/bounded_memory_queue_test.go @@ -56,7 +56,7 @@ func TestBoundedQueue(t *testing.T) { startLock.Unlock() }))) - assert.True(t, q.Produce(context.Background(), newStringRequest("a"))) + assert.NoError(t, q.Offer(context.Background(), newStringRequest("a"))) // at this point "a" may or may not have been received by the consumer go-routine // so let's make sure it has been @@ -69,10 +69,10 @@ func TestBoundedQueue(t *testing.T) { }) // produce two more items. The first one should be accepted, but not consumed. 
- assert.True(t, q.Produce(context.Background(), newStringRequest("b"))) + assert.NoError(t, q.Offer(context.Background(), newStringRequest("b"))) assert.Equal(t, 1, q.Size()) // the second should be rejected since the queue is full - assert.False(t, q.Produce(context.Background(), newStringRequest("c"))) + assert.ErrorIs(t, q.Offer(context.Background(), newStringRequest("c")), ErrQueueIsFull) assert.Equal(t, 1, q.Size()) startLock.Unlock() // unblock consumer @@ -88,13 +88,13 @@ func TestBoundedQueue(t *testing.T) { "b": true, } for _, item := range []string{"d", "e", "f"} { - assert.True(t, q.Produce(context.Background(), newStringRequest(item))) + assert.NoError(t, q.Offer(context.Background(), newStringRequest(item))) expected[item] = true consumerState.assertConsumed(expected) } assert.NoError(t, q.Shutdown(context.Background())) - assert.False(t, q.Produce(context.Background(), newStringRequest("x")), "cannot push to closed queue") + assert.ErrorIs(t, q.Offer(context.Background(), newStringRequest("x")), ErrQueueIsStopped) } // In this test we run a queue with many items and a slow consumer. 
@@ -113,20 +113,20 @@ func TestShutdownWhileNotEmpty(t *testing.T) { time.Sleep(1 * time.Second) }))) - q.Produce(context.Background(), newStringRequest("a")) - q.Produce(context.Background(), newStringRequest("b")) - q.Produce(context.Background(), newStringRequest("c")) - q.Produce(context.Background(), newStringRequest("d")) - q.Produce(context.Background(), newStringRequest("e")) - q.Produce(context.Background(), newStringRequest("f")) - q.Produce(context.Background(), newStringRequest("g")) - q.Produce(context.Background(), newStringRequest("h")) - q.Produce(context.Background(), newStringRequest("i")) - q.Produce(context.Background(), newStringRequest("j")) + assert.NoError(t, q.Offer(context.Background(), newStringRequest("a"))) + assert.NoError(t, q.Offer(context.Background(), newStringRequest("b"))) + assert.NoError(t, q.Offer(context.Background(), newStringRequest("c"))) + assert.NoError(t, q.Offer(context.Background(), newStringRequest("d"))) + assert.NoError(t, q.Offer(context.Background(), newStringRequest("e"))) + assert.NoError(t, q.Offer(context.Background(), newStringRequest("f"))) + assert.NoError(t, q.Offer(context.Background(), newStringRequest("g"))) + assert.NoError(t, q.Offer(context.Background(), newStringRequest("h"))) + assert.NoError(t, q.Offer(context.Background(), newStringRequest("i"))) + assert.NoError(t, q.Offer(context.Background(), newStringRequest("j"))) assert.NoError(t, q.Shutdown(context.Background())) - assert.False(t, q.Produce(context.Background(), newStringRequest("x")), "cannot push to closed queue") + assert.ErrorIs(t, q.Offer(context.Background(), newStringRequest("x")), ErrQueueIsStopped) consumerState.assertConsumed(map[string]bool{ "a": true, "b": true, @@ -193,7 +193,7 @@ func queueUsage(b *testing.B, capacity int, numConsumers int, numberOfItems int) })) require.NoError(b, err) for j := 0; j < numberOfItems; j++ { - q.Produce(context.Background(), newStringRequest(fmt.Sprintf("%d", j))) + _ = 
q.Offer(context.Background(), newStringRequest(fmt.Sprintf("%d", j))) } assert.NoError(b, q.Shutdown(context.Background())) } @@ -250,7 +250,7 @@ func TestZeroSizeWithConsumers(t *testing.T) { err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item any) {})) assert.NoError(t, err) - assert.True(t, q.Produce(context.Background(), newStringRequest("a"))) // in process + assert.NoError(t, q.Offer(context.Background(), newStringRequest("a"))) // in process assert.NoError(t, q.Shutdown(context.Background())) } @@ -261,7 +261,7 @@ func TestZeroSizeNoConsumers(t *testing.T) { err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item any) {})) assert.NoError(t, err) - assert.False(t, q.Produce(context.Background(), newStringRequest("a"))) // in process + assert.ErrorIs(t, q.Offer(context.Background(), newStringRequest("a")), ErrQueueIsFull) // in process assert.NoError(t, q.Shutdown(context.Background())) } diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/exporterhelper/internal/persistent_queue.go index ae7803a6759..cd96ec908d1 100644 --- a/exporter/exporterhelper/internal/persistent_queue.go +++ b/exporter/exporterhelper/internal/persistent_queue.go @@ -73,11 +73,11 @@ func (pq *persistentQueue) Start(ctx context.Context, host component.Host, set Q return nil } -// Produce adds an item to the queue and returns true if it was accepted -// Request context is currently ignored by the persistent queue. -func (pq *persistentQueue) Produce(_ context.Context, item any) bool { - err := pq.storage.put(item) - return err == nil +// Offer inserts the specified element into this queue if it is possible to do so immediately +// without violating capacity restrictions. On success it returns nil. +// It returns ErrQueueIsFull if no space is currently available. 
+func (pq *persistentQueue) Offer(_ context.Context, item any) error { + return pq.storage.put(item) } // Shutdown stops accepting items, shuts down the queue and closes the persistent queue diff --git a/exporter/exporterhelper/internal/persistent_queue_test.go b/exporter/exporterhelper/internal/persistent_queue_test.go index c3f572c874d..fb17297db81 100644 --- a/exporter/exporterhelper/internal/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/persistent_queue_test.go @@ -62,11 +62,11 @@ func TestPersistentQueue_Capacity(t *testing.T) { req := newFakeTracesRequest(newTraces(1, 10)) for i := 0; i < 10; i++ { - result := pq.Produce(context.Background(), req) + result := pq.Offer(context.Background(), req) if i < 5 { - assert.True(t, result) + assert.NoError(t, result) } else { - assert.False(t, result) + assert.ErrorIs(t, result, ErrQueueIsFull) } } assert.Equal(t, 5, pq.Size()) @@ -78,7 +78,7 @@ func TestPersistentQueueShutdown(t *testing.T) { req := newFakeTracesRequest(newTraces(1, 10)) for i := 0; i < 1000; i++ { - pq.Produce(context.Background(), req) + assert.NoError(t, pq.Offer(context.Background(), req)) } assert.NoError(t, pq.Shutdown(context.Background())) } @@ -101,7 +101,7 @@ func TestPersistentQueue_Close_StorageCloseAfterConsumers(t *testing.T) { } for i := 0; i < 1000; i++ { - pq.Produce(context.Background(), req) + assert.NoError(t, pq.Offer(context.Background(), req)) } assert.NoError(t, pq.Shutdown(context.Background())) assert.True(t, stopStorageTime.After(lastRequestProcessedTime), "storage stop time should be after last request processed time") @@ -147,7 +147,7 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) { }) for i := 0; i < c.numMessagesProduced; i++ { - pq.Produce(context.Background(), req) + assert.NoError(t, pq.Offer(context.Background(), req)) } assert.Eventually(t, func() bool { diff --git a/exporter/exporterhelper/internal/persistent_storage.go b/exporter/exporterhelper/internal/persistent_storage.go index 
c237a95b204..c95b45c1f8d 100644 --- a/exporter/exporterhelper/internal/persistent_storage.go +++ b/exporter/exporterhelper/internal/persistent_storage.go @@ -71,8 +71,7 @@ const ( ) var ( - errMaxCapacityReached = errors.New("max capacity reached") - errValueNotSet = errors.New("value not set") + errValueNotSet = errors.New("value not set") ) // newPersistentContiguousStorage creates a new file-storage extension backed queue; @@ -187,7 +186,7 @@ func (pcs *persistentContiguousStorage) put(req any) error { if pcs.size() >= pcs.capacity { pcs.logger.Warn("Maximum queue capacity reached") - return errMaxCapacityReached + return ErrQueueIsFull } itemKey := getItemKey(pcs.writeIndex) diff --git a/exporter/exporterhelper/internal/queue.go b/exporter/exporterhelper/internal/queue.go index 63c125ac0c0..343a2f3350e 100644 --- a/exporter/exporterhelper/internal/queue.go +++ b/exporter/exporterhelper/internal/queue.go @@ -7,10 +7,18 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe import ( "context" + "errors" "go.opentelemetry.io/collector/component" ) +var ( + // ErrQueueIsFull is the error returned when an item is offered to the Queue and the queue is full. + ErrQueueIsFull = errors.New("sending queue is full") + // ErrQueueIsStopped is the error returned when an item is offered to the Queue and the queue is stopped. + ErrQueueIsStopped = errors.New("sending queue is stopped") +) + type QueueSettings struct { DataType component.DataType Callback func(QueueRequest) @@ -22,9 +30,10 @@ type Queue interface { // Start starts the queue with a given number of goroutines consuming items from the queue // and passing them into the consumer callback. Start(ctx context.Context, host component.Host, set QueueSettings) error - // Produce is used by the producer to submit new item to the queue. Returns false if the item wasn't added - // to the queue due to queue overflow. 
- Produce(ctx context.Context, item any) bool + // Offer inserts the specified element into this queue if it is possible to do so immediately + // without violating capacity restrictions. On success it returns nil. + // It returns ErrQueueIsFull if no space is currently available. + Offer(ctx context.Context, item any) error // Size returns the current Size of the queue Size() int // Shutdown stops accepting items, and stops all consumers. It blocks until all consumers have stopped. diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index 26958a50080..17c0ff47682 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/pdata/plog" ) @@ -95,7 +96,7 @@ func NewLogsExporter( lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { req := newLogsRequest(ld, pusher) serr := be.send(ctx, req) - if errors.Is(serr, errSendingQueueIsFull) { + if errors.Is(serr, internal.ErrQueueIsFull) { be.obsrep.recordEnqueueFailure(ctx, component.DataTypeLogs, int64(req.ItemsCount())) } return serr @@ -146,7 +147,7 @@ func NewLogsRequestExporter( return consumererror.NewPermanent(cErr) } sErr := be.send(ctx, req) - if errors.Is(sErr, errSendingQueueIsFull) { + if errors.Is(sErr, internal.ErrQueueIsFull) { be.obsrep.recordEnqueueFailure(ctx, component.DataTypeLogs, int64(req.ItemsCount())) } return sErr diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 30ca548a2c3..f0f1f7d63ea 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" 
"go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/pdata/pmetric" ) @@ -95,7 +96,7 @@ func NewMetricsExporter( mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error { req := newMetricsRequest(md, pusher) serr := be.send(ctx, req) - if errors.Is(serr, errSendingQueueIsFull) { + if errors.Is(serr, internal.ErrQueueIsFull) { be.obsrep.recordEnqueueFailure(ctx, component.DataTypeMetrics, int64(req.ItemsCount())) } return serr @@ -146,7 +147,7 @@ func NewMetricsRequestExporter( return consumererror.NewPermanent(cErr) } sErr := be.send(ctx, req) - if errors.Is(sErr, errSendingQueueIsFull) { + if errors.Is(sErr, internal.ErrQueueIsFull) { be.obsrep.recordEnqueueFailure(ctx, component.DataTypeMetrics, int64(req.ItemsCount())) } return sErr diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go index e0f3f3422ae..985aa6fb140 100644 --- a/exporter/exporterhelper/queue_sender.go +++ b/exporter/exporterhelper/queue_sender.go @@ -26,8 +26,7 @@ import ( const defaultQueueSize = 1000 var ( - errSendingQueueIsFull = errors.New("sending_queue is full") - scopeName = "go.opentelemetry.io/collector/exporterhelper" + scopeName = "go.opentelemetry.io/collector/exporterhelper" ) // QueueSettings defines configuration for queueing batches before sending to the consumerSender. @@ -119,7 +118,7 @@ func (qs *queueSender) onTemporaryFailure(ctx context.Context, req Request, err return err } - if qs.queue.Produce(ctx, req) { + if qs.queue.Offer(ctx, req) == nil { logger.Error( "Exporting failed. 
Putting back to the end of the queue.", zap.Error(err), @@ -220,13 +219,13 @@ func (qs *queueSender) send(ctx context.Context, req Request) error { c := noCancellationContext{Context: ctx} span := trace.SpanFromContext(c) - if !qs.queue.Produce(c, req) { + if err := qs.queue.Offer(c, req); err != nil { qs.logger.Error( "Dropping data because sending_queue is full. Try increasing queue_size.", zap.Int("dropped_items", req.ItemsCount()), ) span.AddEvent("Dropped item, sending_queue is full.", trace.WithAttributes(qs.traceAttribute)) - return errSendingQueueIsFull + return err } span.AddEvent("Enqueued item.", trace.WithAttributes(qs.traceAttribute)) diff --git a/exporter/exporterhelper/retry_sender_test.go b/exporter/exporterhelper/retry_sender_test.go index ecf294dac9c..cbe13e49362 100644 --- a/exporter/exporterhelper/retry_sender_test.go +++ b/exporter/exporterhelper/retry_sender_test.go @@ -411,7 +411,7 @@ type producerConsumerQueueWithCounter struct { produceCounter *atomic.Uint32 } -func (pcq *producerConsumerQueueWithCounter) Produce(ctx context.Context, item any) bool { +func (pcq *producerConsumerQueueWithCounter) Offer(ctx context.Context, item any) error { pcq.produceCounter.Add(1) - return pcq.Queue.Produce(ctx, item) + return pcq.Queue.Offer(ctx, item) } diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index d0d909b9976..84966b03e1d 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -95,7 +96,7 @@ func NewTracesExporter( tc, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error { req := newTracesRequest(td, pusher) serr := be.send(ctx, req) - if errors.Is(serr, errSendingQueueIsFull) { + 
if errors.Is(serr, internal.ErrQueueIsFull) { be.obsrep.recordEnqueueFailure(ctx, component.DataTypeTraces, int64(req.ItemsCount())) } return serr @@ -146,7 +147,7 @@ func NewTracesRequestExporter( return consumererror.NewPermanent(cErr) } sErr := be.send(ctx, req) - if errors.Is(sErr, errSendingQueueIsFull) { + if errors.Is(sErr, internal.ErrQueueIsFull) { be.obsrep.recordEnqueueFailure(ctx, component.DataTypeTraces, int64(req.ItemsCount())) } return sErr