Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] rename internal Queue func to Offer; return error #8884

Merged
merged 1 commit into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,16 @@ func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set Queu
}

// Offer inserts the specified element into this queue if it is possible to do so immediately
// without violating capacity restrictions. On success it returns no error.
// It returns ErrQueueIsFull if no space is currently available.
func (q *boundedMemoryQueue) Produce(ctx context.Context, req any) bool {
func (q *boundedMemoryQueue) Offer(ctx context.Context, req any) error {
if q.stopped.Load() {
return false
return ErrQueueIsStopped
}

select {
case q.items <- newQueueRequest(ctx, req):
return true
return nil
default:
return false
return ErrQueueIsFull
}
}

Expand Down
38 changes: 19 additions & 19 deletions exporter/exporterhelper/internal/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestBoundedQueue(t *testing.T) {
startLock.Unlock()
})))

assert.True(t, q.Produce(context.Background(), newStringRequest("a")))
assert.NoError(t, q.Offer(context.Background(), newStringRequest("a")))

// at this point "a" may or may not have been received by the consumer go-routine
// so let's make sure it has been
Expand All @@ -69,10 +69,10 @@ func TestBoundedQueue(t *testing.T) {
})

// produce two more items. The first one should be accepted, but not consumed.
assert.True(t, q.Produce(context.Background(), newStringRequest("b")))
assert.NoError(t, q.Offer(context.Background(), newStringRequest("b")))
assert.Equal(t, 1, q.Size())
// the second should be rejected since the queue is full
assert.False(t, q.Produce(context.Background(), newStringRequest("c")))
assert.ErrorIs(t, q.Offer(context.Background(), newStringRequest("c")), ErrQueueIsFull)
assert.Equal(t, 1, q.Size())

startLock.Unlock() // unblock consumer
Expand All @@ -88,13 +88,13 @@ func TestBoundedQueue(t *testing.T) {
"b": true,
}
for _, item := range []string{"d", "e", "f"} {
assert.True(t, q.Produce(context.Background(), newStringRequest(item)))
assert.NoError(t, q.Offer(context.Background(), newStringRequest(item)))
expected[item] = true
consumerState.assertConsumed(expected)
}

assert.NoError(t, q.Shutdown(context.Background()))
assert.False(t, q.Produce(context.Background(), newStringRequest("x")), "cannot push to closed queue")
assert.ErrorIs(t, q.Offer(context.Background(), newStringRequest("x")), ErrQueueIsStopped)
}

// In this test we run a queue with many items and a slow consumer.
Expand All @@ -113,20 +113,20 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
time.Sleep(1 * time.Second)
})))

q.Produce(context.Background(), newStringRequest("a"))
q.Produce(context.Background(), newStringRequest("b"))
q.Produce(context.Background(), newStringRequest("c"))
q.Produce(context.Background(), newStringRequest("d"))
q.Produce(context.Background(), newStringRequest("e"))
q.Produce(context.Background(), newStringRequest("f"))
q.Produce(context.Background(), newStringRequest("g"))
q.Produce(context.Background(), newStringRequest("h"))
q.Produce(context.Background(), newStringRequest("i"))
q.Produce(context.Background(), newStringRequest("j"))
assert.NoError(t, q.Offer(context.Background(), newStringRequest("a")))
assert.NoError(t, q.Offer(context.Background(), newStringRequest("b")))
assert.NoError(t, q.Offer(context.Background(), newStringRequest("c")))
assert.NoError(t, q.Offer(context.Background(), newStringRequest("d")))
assert.NoError(t, q.Offer(context.Background(), newStringRequest("e")))
assert.NoError(t, q.Offer(context.Background(), newStringRequest("f")))
assert.NoError(t, q.Offer(context.Background(), newStringRequest("g")))
assert.NoError(t, q.Offer(context.Background(), newStringRequest("h")))
assert.NoError(t, q.Offer(context.Background(), newStringRequest("i")))
assert.NoError(t, q.Offer(context.Background(), newStringRequest("j")))

assert.NoError(t, q.Shutdown(context.Background()))

assert.False(t, q.Produce(context.Background(), newStringRequest("x")), "cannot push to closed queue")
assert.ErrorIs(t, q.Offer(context.Background(), newStringRequest("x")), ErrQueueIsStopped)
consumerState.assertConsumed(map[string]bool{
"a": true,
"b": true,
Expand Down Expand Up @@ -193,7 +193,7 @@ func queueUsage(b *testing.B, capacity int, numConsumers int, numberOfItems int)
}))
require.NoError(b, err)
for j := 0; j < numberOfItems; j++ {
q.Produce(context.Background(), newStringRequest(fmt.Sprintf("%d", j)))
_ = q.Offer(context.Background(), newStringRequest(fmt.Sprintf("%d", j)))
}
assert.NoError(b, q.Shutdown(context.Background()))
}
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestZeroSizeWithConsumers(t *testing.T) {
err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item any) {}))
assert.NoError(t, err)

assert.True(t, q.Produce(context.Background(), newStringRequest("a"))) // in process
assert.NoError(t, q.Offer(context.Background(), newStringRequest("a"))) // in process

assert.NoError(t, q.Shutdown(context.Background()))
}
Expand All @@ -261,7 +261,7 @@ func TestZeroSizeNoConsumers(t *testing.T) {
err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item any) {}))
assert.NoError(t, err)

assert.False(t, q.Produce(context.Background(), newStringRequest("a"))) // in process
assert.ErrorIs(t, q.Offer(context.Background(), newStringRequest("a")), ErrQueueIsFull) // in process

assert.NoError(t, q.Shutdown(context.Background()))
}
10 changes: 5 additions & 5 deletions exporter/exporterhelper/internal/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ func (pq *persistentQueue) Start(ctx context.Context, host component.Host, set Q
return nil
}

// Produce adds an item to the queue and returns true if it was accepted
// Request context is currently ignored by the persistent queue.
func (pq *persistentQueue) Produce(_ context.Context, item any) bool {
err := pq.storage.put(item)
return err == nil
// Offer inserts the specified element into this queue if it is possible to do so immediately
// without violating capacity restrictions. On success it returns no error.
// It returns ErrQueueIsFull if no space is currently available.
func (pq *persistentQueue) Offer(_ context.Context, item any) error {
return pq.storage.put(item)
}

// Shutdown stops accepting items, shuts down the queue and closes the persistent queue
Expand Down
12 changes: 6 additions & 6 deletions exporter/exporterhelper/internal/persistent_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ func TestPersistentQueue_Capacity(t *testing.T) {
req := newFakeTracesRequest(newTraces(1, 10))

for i := 0; i < 10; i++ {
result := pq.Produce(context.Background(), req)
result := pq.Offer(context.Background(), req)
if i < 5 {
assert.True(t, result)
assert.NoError(t, result)
} else {
assert.False(t, result)
assert.ErrorIs(t, result, ErrQueueIsFull)
}
}
assert.Equal(t, 5, pq.Size())
Expand All @@ -78,7 +78,7 @@ func TestPersistentQueueShutdown(t *testing.T) {
req := newFakeTracesRequest(newTraces(1, 10))

for i := 0; i < 1000; i++ {
pq.Produce(context.Background(), req)
assert.NoError(t, pq.Offer(context.Background(), req))
}
assert.NoError(t, pq.Shutdown(context.Background()))
}
Expand All @@ -101,7 +101,7 @@ func TestPersistentQueue_Close_StorageCloseAfterConsumers(t *testing.T) {
}

for i := 0; i < 1000; i++ {
pq.Produce(context.Background(), req)
assert.NoError(t, pq.Offer(context.Background(), req))
}
assert.NoError(t, pq.Shutdown(context.Background()))
assert.True(t, stopStorageTime.After(lastRequestProcessedTime), "storage stop time should be after last request processed time")
Expand Down Expand Up @@ -147,7 +147,7 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) {
})

for i := 0; i < c.numMessagesProduced; i++ {
pq.Produce(context.Background(), req)
assert.NoError(t, pq.Offer(context.Background(), req))
}

assert.Eventually(t, func() bool {
Expand Down
5 changes: 2 additions & 3 deletions exporter/exporterhelper/internal/persistent_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ const (
)

var (
errMaxCapacityReached = errors.New("max capacity reached")
errValueNotSet = errors.New("value not set")
errValueNotSet = errors.New("value not set")
)

// newPersistentContiguousStorage creates a new file-storage extension backed queue;
Expand Down Expand Up @@ -187,7 +186,7 @@ func (pcs *persistentContiguousStorage) put(req any) error {

if pcs.size() >= pcs.capacity {
pcs.logger.Warn("Maximum queue capacity reached")
return errMaxCapacityReached
return ErrQueueIsFull
}

itemKey := getItemKey(pcs.writeIndex)
Expand Down
15 changes: 12 additions & 3 deletions exporter/exporterhelper/internal/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,18 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe

import (
"context"
"errors"

"go.opentelemetry.io/collector/component"
)

var (
// ErrQueueIsFull is the error returned when an item is offered to the Queue and the queue is full.
ErrQueueIsFull = errors.New("sending queue is full")
// ErrQueueIsStopped is the error returned when an item is offered to the Queue and the queue is stopped.
ErrQueueIsStopped = errors.New("sending queue is stopped")
)

type QueueSettings struct {
DataType component.DataType
Callback func(QueueRequest)
Expand All @@ -22,9 +30,10 @@ type Queue interface {
// Start starts the queue with a given number of goroutines consuming items from the queue
// and passing them into the consumer callback.
Start(ctx context.Context, host component.Host, set QueueSettings) error
// Produce is used by the producer to submit new item to the queue. Returns false if the item wasn't added
// to the queue due to queue overflow.
Produce(ctx context.Context, item any) bool
	// Offer inserts the specified element into this queue if it is possible to do so immediately
	// without violating capacity restrictions. On success it returns no error.
	// It returns ErrQueueIsFull if no space is currently available.
Offer(ctx context.Context, item any) error
// Size returns the current Size of the queue
Size() int
// Shutdown stops accepting items, and stops all consumers. It blocks until all consumers have stopped.
Expand Down
5 changes: 3 additions & 2 deletions exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/pdata/plog"
)

Expand Down Expand Up @@ -95,7 +96,7 @@ func NewLogsExporter(
lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
req := newLogsRequest(ld, pusher)
serr := be.send(ctx, req)
if errors.Is(serr, errSendingQueueIsFull) {
if errors.Is(serr, internal.ErrQueueIsFull) {
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeLogs, int64(req.ItemsCount()))
}
return serr
Expand Down Expand Up @@ -146,7 +147,7 @@ func NewLogsRequestExporter(
return consumererror.NewPermanent(cErr)
}
sErr := be.send(ctx, req)
if errors.Is(sErr, errSendingQueueIsFull) {
if errors.Is(sErr, internal.ErrQueueIsFull) {
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeLogs, int64(req.ItemsCount()))
}
return sErr
Expand Down
5 changes: 3 additions & 2 deletions exporter/exporterhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/pdata/pmetric"
)

Expand Down Expand Up @@ -95,7 +96,7 @@ func NewMetricsExporter(
mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error {
req := newMetricsRequest(md, pusher)
serr := be.send(ctx, req)
if errors.Is(serr, errSendingQueueIsFull) {
if errors.Is(serr, internal.ErrQueueIsFull) {
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeMetrics, int64(req.ItemsCount()))
}
return serr
Expand Down Expand Up @@ -146,7 +147,7 @@ func NewMetricsRequestExporter(
return consumererror.NewPermanent(cErr)
}
sErr := be.send(ctx, req)
if errors.Is(sErr, errSendingQueueIsFull) {
if errors.Is(sErr, internal.ErrQueueIsFull) {
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeMetrics, int64(req.ItemsCount()))
}
return sErr
Expand Down
9 changes: 4 additions & 5 deletions exporter/exporterhelper/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ import (
const defaultQueueSize = 1000

var (
errSendingQueueIsFull = errors.New("sending_queue is full")
scopeName = "go.opentelemetry.io/collector/exporterhelper"
scopeName = "go.opentelemetry.io/collector/exporterhelper"
)

// QueueSettings defines configuration for queueing batches before sending to the consumerSender.
Expand Down Expand Up @@ -119,7 +118,7 @@ func (qs *queueSender) onTemporaryFailure(ctx context.Context, req Request, err
return err
}

if qs.queue.Produce(ctx, req) {
if qs.queue.Offer(ctx, req) == nil {
logger.Error(
"Exporting failed. Putting back to the end of the queue.",
zap.Error(err),
Expand Down Expand Up @@ -220,13 +219,13 @@ func (qs *queueSender) send(ctx context.Context, req Request) error {
c := noCancellationContext{Context: ctx}

span := trace.SpanFromContext(c)
if !qs.queue.Produce(c, req) {
if err := qs.queue.Offer(c, req); err != nil {
qs.logger.Error(
"Dropping data because sending_queue is full. Try increasing queue_size.",
zap.Int("dropped_items", req.ItemsCount()),
)
span.AddEvent("Dropped item, sending_queue is full.", trace.WithAttributes(qs.traceAttribute))
return errSendingQueueIsFull
return err
}

span.AddEvent("Enqueued item.", trace.WithAttributes(qs.traceAttribute))
Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/retry_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ type producerConsumerQueueWithCounter struct {
produceCounter *atomic.Uint32
}

func (pcq *producerConsumerQueueWithCounter) Produce(ctx context.Context, item any) bool {
func (pcq *producerConsumerQueueWithCounter) Offer(ctx context.Context, item any) error {
pcq.produceCounter.Add(1)
return pcq.Queue.Produce(ctx, item)
return pcq.Queue.Offer(ctx, item)
}
5 changes: 3 additions & 2 deletions exporter/exporterhelper/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/pdata/ptrace"
)

Expand Down Expand Up @@ -95,7 +96,7 @@ func NewTracesExporter(
tc, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error {
req := newTracesRequest(td, pusher)
serr := be.send(ctx, req)
if errors.Is(serr, errSendingQueueIsFull) {
if errors.Is(serr, internal.ErrQueueIsFull) {
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeTraces, int64(req.ItemsCount()))
}
return serr
Expand Down Expand Up @@ -146,7 +147,7 @@ func NewTracesRequestExporter(
return consumererror.NewPermanent(cErr)
}
sErr := be.send(ctx, req)
if errors.Is(sErr, errSendingQueueIsFull) {
if errors.Is(sErr, internal.ErrQueueIsFull) {
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeTraces, int64(req.ItemsCount()))
}
return sErr
Expand Down
Loading