Skip to content

Commit

Permalink
[chore] rename internal Queue func to Offer; return error
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Nov 14, 2023
1 parent d0f9a78 commit 31f8454
Show file tree
Hide file tree
Showing 11 changed files with 63 additions and 53 deletions.
8 changes: 4 additions & 4 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,16 @@ func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set Queu
}

// Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow.
func (q *boundedMemoryQueue) Produce(ctx context.Context, req any) bool {
func (q *boundedMemoryQueue) Offer(ctx context.Context, req any) error {
if q.stopped.Load() {
return false
return ErrQueueIsStopped
}

select {
case q.items <- newQueueRequest(ctx, req):
return true
return nil
default:
return false
return ErrQueueIsFull
}
}

Expand Down
38 changes: 19 additions & 19 deletions exporter/exporterhelper/internal/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestBoundedQueue(t *testing.T) {
startLock.Unlock()
})))

assert.True(t, q.Produce(context.Background(), newStringRequest("a")))
assert.NoError(t, q.Offer(context.Background(), newStringRequest("a")))

// at this point "a" may or may not have been received by the consumer go-routine
// so let's make sure it has been
Expand All @@ -69,10 +69,10 @@ func TestBoundedQueue(t *testing.T) {
})

// produce two more items. The first one should be accepted, but not consumed.
assert.True(t, q.Produce(context.Background(), newStringRequest("b")))
assert.NoError(t, q.Offer(context.Background(), newStringRequest("b")))
assert.Equal(t, 1, q.Size())
// the second should be rejected since the queue is full
assert.False(t, q.Produce(context.Background(), newStringRequest("c")))
assert.ErrorIs(t, q.Offer(context.Background(), newStringRequest("c")), ErrQueueIsFull)
assert.Equal(t, 1, q.Size())

startLock.Unlock() // unblock consumer
Expand All @@ -88,13 +88,13 @@ func TestBoundedQueue(t *testing.T) {
"b": true,
}
for _, item := range []string{"d", "e", "f"} {
assert.True(t, q.Produce(context.Background(), newStringRequest(item)))
assert.NoError(t, q.Offer(context.Background(), newStringRequest(item)))
expected[item] = true
consumerState.assertConsumed(expected)
}

assert.NoError(t, q.Shutdown(context.Background()))
assert.False(t, q.Produce(context.Background(), newStringRequest("x")), "cannot push to closed queue")
assert.ErrorIs(t, q.Offer(context.Background(), newStringRequest("x")), ErrQueueIsStopped)
}

// In this test we run a queue with many items and a slow consumer.
Expand All @@ -113,20 +113,20 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
time.Sleep(1 * time.Second)
})))

q.Produce(context.Background(), newStringRequest("a"))
q.Produce(context.Background(), newStringRequest("b"))
q.Produce(context.Background(), newStringRequest("c"))
q.Produce(context.Background(), newStringRequest("d"))
q.Produce(context.Background(), newStringRequest("e"))
q.Produce(context.Background(), newStringRequest("f"))
q.Produce(context.Background(), newStringRequest("g"))
q.Produce(context.Background(), newStringRequest("h"))
q.Produce(context.Background(), newStringRequest("i"))
q.Produce(context.Background(), newStringRequest("j"))
assert.NoError(t, q.Offer(context.Background(), newStringRequest("a")))
assert.NoError(t, q.Offer(context.Background(), newStringRequest("b")))
assert.NoError(t, q.Offer(context.Background(), newStringRequest("c")))
assert.NoError(t, q.Offer(context.Background(), newStringRequest("d")))
assert.NoError(t, q.Offer(context.Background(), newStringRequest("e")))
assert.NoError(t, q.Offer(context.Background(), newStringRequest("f")))
assert.NoError(t, q.Offer(context.Background(), newStringRequest("g")))
assert.NoError(t, q.Offer(context.Background(), newStringRequest("h")))
assert.NoError(t, q.Offer(context.Background(), newStringRequest("i")))
assert.NoError(t, q.Offer(context.Background(), newStringRequest("j")))

assert.NoError(t, q.Shutdown(context.Background()))

assert.False(t, q.Produce(context.Background(), newStringRequest("x")), "cannot push to closed queue")
assert.ErrorIs(t, q.Offer(context.Background(), newStringRequest("x")), ErrQueueIsStopped)
consumerState.assertConsumed(map[string]bool{
"a": true,
"b": true,
Expand Down Expand Up @@ -193,7 +193,7 @@ func queueUsage(b *testing.B, capacity int, numConsumers int, numberOfItems int)
}))
require.NoError(b, err)
for j := 0; j < numberOfItems; j++ {
q.Produce(context.Background(), newStringRequest(fmt.Sprintf("%d", j)))
_ = q.Offer(context.Background(), newStringRequest(fmt.Sprintf("%d", j)))
}
assert.NoError(b, q.Shutdown(context.Background()))
}
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestZeroSizeWithConsumers(t *testing.T) {
err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item any) {}))
assert.NoError(t, err)

assert.True(t, q.Produce(context.Background(), newStringRequest("a"))) // in process
assert.NoError(t, q.Offer(context.Background(), newStringRequest("a"))) // in process

assert.NoError(t, q.Shutdown(context.Background()))
}
Expand All @@ -261,7 +261,7 @@ func TestZeroSizeNoConsumers(t *testing.T) {
err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item any) {}))
assert.NoError(t, err)

assert.False(t, q.Produce(context.Background(), newStringRequest("a"))) // in process
assert.ErrorIs(t, q.Offer(context.Background(), newStringRequest("a")), ErrQueueIsFull) // in process

assert.NoError(t, q.Shutdown(context.Background()))
}
10 changes: 5 additions & 5 deletions exporter/exporterhelper/internal/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ func (pq *persistentQueue) Start(ctx context.Context, host component.Host, set Q
return nil
}

// Produce adds an item to the queue and returns true if it was accepted
// Request context is currently ignored by the persistent queue.
func (pq *persistentQueue) Produce(_ context.Context, item any) bool {
err := pq.storage.put(item)
return err == nil
// Offer inserts the specified element into this queue if it is possible to do so immediately
// without violating capacity restrictions. If success returns no error.
// It returns ErrQueueIsFull if no space is currently available.
func (pq *persistentQueue) Offer(_ context.Context, item any) error {
return pq.storage.put(item)
}

// Shutdown stops accepting items, shuts down the queue and closes the persistent queue
Expand Down
12 changes: 6 additions & 6 deletions exporter/exporterhelper/internal/persistent_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ func TestPersistentQueue_Capacity(t *testing.T) {
req := newFakeTracesRequest(newTraces(1, 10))

for i := 0; i < 10; i++ {
result := pq.Produce(context.Background(), req)
result := pq.Offer(context.Background(), req)
if i < 5 {
assert.True(t, result)
assert.NoError(t, result)
} else {
assert.False(t, result)
assert.ErrorIs(t, result, ErrQueueIsFull)
}
}
assert.Equal(t, 5, pq.Size())
Expand All @@ -78,7 +78,7 @@ func TestPersistentQueueShutdown(t *testing.T) {
req := newFakeTracesRequest(newTraces(1, 10))

for i := 0; i < 1000; i++ {
pq.Produce(context.Background(), req)
assert.NoError(t, pq.Offer(context.Background(), req))
}
assert.NoError(t, pq.Shutdown(context.Background()))
}
Expand All @@ -101,7 +101,7 @@ func TestPersistentQueue_Close_StorageCloseAfterConsumers(t *testing.T) {
}

for i := 0; i < 1000; i++ {
pq.Produce(context.Background(), req)
assert.NoError(t, pq.Offer(context.Background(), req))
}
assert.NoError(t, pq.Shutdown(context.Background()))
assert.True(t, stopStorageTime.After(lastRequestProcessedTime), "storage stop time should be after last request processed time")
Expand Down Expand Up @@ -147,7 +147,7 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) {
})

for i := 0; i < c.numMessagesProduced; i++ {
pq.Produce(context.Background(), req)
assert.NoError(t, pq.Offer(context.Background(), req))
}

assert.Eventually(t, func() bool {
Expand Down
5 changes: 2 additions & 3 deletions exporter/exporterhelper/internal/persistent_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ const (
)

var (
errMaxCapacityReached = errors.New("max capacity reached")
errValueNotSet = errors.New("value not set")
errValueNotSet = errors.New("value not set")
)

// newPersistentContiguousStorage creates a new file-storage extension backed queue;
Expand Down Expand Up @@ -187,7 +186,7 @@ func (pcs *persistentContiguousStorage) put(req any) error {

if pcs.size() >= pcs.capacity {
pcs.logger.Warn("Maximum queue capacity reached")
return errMaxCapacityReached
return ErrQueueIsFull
}

itemKey := getItemKey(pcs.writeIndex)
Expand Down
15 changes: 12 additions & 3 deletions exporter/exporterhelper/internal/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,18 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe

import (
"context"
"errors"

"go.opentelemetry.io/collector/component"
)

var (
// ErrQueueIsFull is the error returned when an item is offered to the Queue and the queue is full.
ErrQueueIsFull = errors.New("sending queue is full")
// ErrQueueIsStopped is the error returned when an item is offered to the Queue and the queue is stopped.
ErrQueueIsStopped = errors.New("sending queue is stopped")
)

type QueueSettings struct {
DataType component.DataType
Callback func(QueueRequest)
Expand All @@ -22,9 +30,10 @@ type Queue interface {
// Start starts the queue with a given number of goroutines consuming items from the queue
// and passing them into the consumer callback.
Start(ctx context.Context, host component.Host, set QueueSettings) error
// Produce is used by the producer to submit new item to the queue. Returns false if the item wasn't added
// to the queue due to queue overflow.
Produce(ctx context.Context, item any) bool
// Offer inserts the specified element into this queue if it is possible to do so immediately
// without violating capacity restrictions. If success returns no error.
// It returns ErrQueueIsFull if no space is currently available.
Offer(ctx context.Context, item any) error
// Size returns the current Size of the queue
Size() int
// Shutdown stops accepting items, and stops all consumers. It blocks until all consumers have stopped.
Expand Down
5 changes: 3 additions & 2 deletions exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/pdata/plog"
)

Expand Down Expand Up @@ -95,7 +96,7 @@ func NewLogsExporter(
lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
req := newLogsRequest(ld, pusher)
serr := be.send(ctx, req)
if errors.Is(serr, errSendingQueueIsFull) {
if errors.Is(serr, internal.ErrQueueIsFull) {
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeLogs, int64(req.ItemsCount()))
}
return serr
Expand Down Expand Up @@ -146,7 +147,7 @@ func NewLogsRequestExporter(
return consumererror.NewPermanent(cErr)
}
sErr := be.send(ctx, req)
if errors.Is(sErr, errSendingQueueIsFull) {
if errors.Is(sErr, internal.ErrQueueIsFull) {
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeLogs, int64(req.ItemsCount()))
}
return sErr
Expand Down
5 changes: 3 additions & 2 deletions exporter/exporterhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/pdata/pmetric"
)

Expand Down Expand Up @@ -95,7 +96,7 @@ func NewMetricsExporter(
mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error {
req := newMetricsRequest(md, pusher)
serr := be.send(ctx, req)
if errors.Is(serr, errSendingQueueIsFull) {
if errors.Is(serr, internal.ErrQueueIsFull) {
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeMetrics, int64(req.ItemsCount()))
}
return serr
Expand Down Expand Up @@ -146,7 +147,7 @@ func NewMetricsRequestExporter(
return consumererror.NewPermanent(cErr)
}
sErr := be.send(ctx, req)
if errors.Is(sErr, errSendingQueueIsFull) {
if errors.Is(sErr, internal.ErrQueueIsFull) {
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeMetrics, int64(req.ItemsCount()))
}
return sErr
Expand Down
9 changes: 4 additions & 5 deletions exporter/exporterhelper/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ import (
const defaultQueueSize = 1000

var (
errSendingQueueIsFull = errors.New("sending_queue is full")
scopeName = "go.opentelemetry.io/collector/exporterhelper"
scopeName = "go.opentelemetry.io/collector/exporterhelper"
)

// QueueSettings defines configuration for queueing batches before sending to the consumerSender.
Expand Down Expand Up @@ -119,7 +118,7 @@ func (qs *queueSender) onTemporaryFailure(ctx context.Context, req Request, err
return err
}

if qs.queue.Produce(ctx, req) {
if qs.queue.Offer(ctx, req) == nil {
logger.Error(
"Exporting failed. Putting back to the end of the queue.",
zap.Error(err),
Expand Down Expand Up @@ -220,13 +219,13 @@ func (qs *queueSender) send(ctx context.Context, req Request) error {
c := noCancellationContext{Context: ctx}

span := trace.SpanFromContext(c)
if !qs.queue.Produce(c, req) {
if err := qs.queue.Offer(c, req); err != nil {
qs.logger.Error(
"Dropping data because sending_queue is full. Try increasing queue_size.",
zap.Int("dropped_items", req.ItemsCount()),
)
span.AddEvent("Dropped item, sending_queue is full.", trace.WithAttributes(qs.traceAttribute))
return errSendingQueueIsFull
return err
}

span.AddEvent("Enqueued item.", trace.WithAttributes(qs.traceAttribute))
Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/retry_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ type producerConsumerQueueWithCounter struct {
produceCounter *atomic.Uint32
}

func (pcq *producerConsumerQueueWithCounter) Produce(ctx context.Context, item any) bool {
func (pcq *producerConsumerQueueWithCounter) Offer(ctx context.Context, item any) error {
pcq.produceCounter.Add(1)
return pcq.Queue.Produce(ctx, item)
return pcq.Queue.Offer(ctx, item)
}
5 changes: 3 additions & 2 deletions exporter/exporterhelper/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/pdata/ptrace"
)

Expand Down Expand Up @@ -95,7 +96,7 @@ func NewTracesExporter(
tc, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error {
req := newTracesRequest(td, pusher)
serr := be.send(ctx, req)
if errors.Is(serr, errSendingQueueIsFull) {
if errors.Is(serr, internal.ErrQueueIsFull) {
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeTraces, int64(req.ItemsCount()))
}
return serr
Expand Down Expand Up @@ -146,7 +147,7 @@ func NewTracesRequestExporter(
return consumererror.NewPermanent(cErr)
}
sErr := be.send(ctx, req)
if errors.Is(sErr, errSendingQueueIsFull) {
if errors.Is(sErr, internal.ErrQueueIsFull) {
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeTraces, int64(req.ItemsCount()))
}
return sErr
Expand Down

0 comments on commit 31f8454

Please sign in to comment.