Skip to content

Commit

Permalink
[exporterhelper] Do not re-enqueue failed requests
Browse files Browse the repository at this point in the history
The current re-enqueuing behavior is not obvious and cannot be configured. It takes place only for persistent queue and only if `retry_on_failure::enabled=true` even if `retry_on_failure` is a setting for a different backoff retry strategy. This change removes the re-enqueuing behavior. Consider increasing `retry_on_failure::max_elapsed_time` to reduce chances of data loss.
  • Loading branch information
dmitryax committed Dec 14, 2023
1 parent 9d2e43d commit 46a11f8
Show file tree
Hide file tree
Showing 12 changed files with 97 additions and 209 deletions.
29 changes: 29 additions & 0 deletions .chloggen/disable-requeuing.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporters/sending_queue

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Do not re-enqueue failed batches, rely on the retry_on_failure strategy instead.

# One or more tracking issues or pull requests related to the change
issues: [8382]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The current re-enqueuing behavior is not obvious and cannot be configured. It takes place only for persistent queue
and only if `retry_on_failure::enabled=true` even if `retry_on_failure` is a setting for a different backoff retry
strategy. This change removes the re-enqueuing behavior. Consider increasing `retry_on_failure::max_elapsed_time`
to reduce chances of data loss or set it to 0 to keep retrying until requests succeed.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
11 changes: 1 addition & 10 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,6 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
}
be.connectSenders()

// If retry sender is disabled then disable requeuing in the queue sender.
// TODO: Make re-enqueuing configurable on queue sender instead of relying on retry sender.
if qs, ok := be.queueSender.(*queueSender); ok {
// if it's not retrySender, then it is disabled.
if _, ok = be.retrySender.(*retrySender); !ok {
qs.requeuingEnabled = false
}
}

return be, nil
}

Expand Down Expand Up @@ -212,7 +203,7 @@ func (be *baseExporter) Start(ctx context.Context, host component.Host) error {

func (be *baseExporter) Shutdown(ctx context.Context) error {
return multierr.Combine(
// First shutdown the retry sender, so it can push any pending requests to back the queue.
// First shutdown the retry sender, so the queue sender can flush the queue without retries.
be.retrySender.Shutdown(ctx),
// Then shutdown the queue sender.
be.queueSender.Shutdown(ctx),
Expand Down
5 changes: 3 additions & 2 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@ func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped and emptied.
func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) bool) bool {
func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
item, ok := <-q.items
if !ok {
return false
}
consumeFunc(item.ctx, item.req)
// the memory queue doesn't handle consume errors
_ = consumeFunc(item.ctx, item.req)
return true
}

Expand Down
12 changes: 6 additions & 6 deletions exporter/exporterhelper/internal/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ func TestBoundedQueue(t *testing.T) {

consumerState := newConsumerState(t)

consumers := NewQueueConsumers(q, 1, func(_ context.Context, item string) bool {
consumers := NewQueueConsumers(q, 1, func(_ context.Context, item string) error {
consumerState.record(item)
<-waitCh
return true
return nil
})
assert.NoError(t, consumers.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -89,10 +89,10 @@ func TestShutdownWhileNotEmpty(t *testing.T) {
consumerState := newConsumerState(t)

waitChan := make(chan struct{})
consumers := NewQueueConsumers(q, 5, func(_ context.Context, item string) bool {
consumers := NewQueueConsumers(q, 5, func(_ context.Context, item string) error {
<-waitChan
consumerState.record(item)
return true
return nil
})
assert.NoError(t, consumers.Start(context.Background(), componenttest.NewNopHost()))

Expand Down Expand Up @@ -176,9 +176,9 @@ func queueUsage(b *testing.B, capacity int, numConsumers int, numberOfItems int)
b.ReportAllocs()
for i := 0; i < b.N; i++ {
q := NewBoundedMemoryQueue[string](capacity)
consumers := NewQueueConsumers(q, numConsumers, func(context.Context, string) bool {
consumers := NewQueueConsumers(q, numConsumers, func(context.Context, string) error {
time.Sleep(1 * time.Millisecond)
return true
return nil
})
require.NoError(b, consumers.Start(context.Background(), componenttest.NewNopHost()))
for j := 0; j < numberOfItems; j++ {
Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/internal/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import (
type QueueConsumers[T any] struct {
queue Queue[T]
numConsumers int
consumeFunc func(context.Context, T) bool
consumeFunc func(context.Context, T) error
stopWG sync.WaitGroup
}

func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T) bool) *QueueConsumers[T] {
func NewQueueConsumers[T any](q Queue[T], numConsumers int, consumeFunc func(context.Context, T) error) *QueueConsumers[T] {
return &QueueConsumers[T]{
queue: q,
numConsumers: numConsumers,
Expand Down
20 changes: 20 additions & 0 deletions exporter/exporterhelper/internal/err.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal"

type shutdownErr struct {
err error
}

func NewShutdownErr(err error) error {
return shutdownErr{err: err}
}

func (s shutdownErr) Error() string {
return "interrupted due to shutdown: " + s.err.Error()

Check warning on line 15 in exporter/exporterhelper/internal/err.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/err.go#L14-L15

Added lines #L14 - L15 were not covered by tests
}

func (s shutdownErr) Unwrap() error {
return s.err
}
18 changes: 11 additions & 7 deletions exporter/exporterhelper/internal/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex
// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped.
func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) bool) bool {
func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
var (
req T
onProcessingFinished func()
onProcessingFinished func(error)
consumed bool
)

Expand All @@ -157,9 +157,7 @@ func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) bool)
}

if consumed {
if ok := consumeFunc(context.Background(), req); ok {
onProcessingFinished()
}
onProcessingFinished(consumeFunc(context.Background(), req))
return true
}
}
Expand Down Expand Up @@ -241,7 +239,7 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {

// getNextItem pulls the next available item from the persistent storage along with a callback function that should be
// called after the item is processed to clean up the storage. If no new item is available, returns false.
func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(), bool) {
func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error), bool) {
pq.mu.Lock()
defer pq.mu.Unlock()

Expand Down Expand Up @@ -282,7 +280,13 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(), bool)

// Increase the reference count, so the client is not closed while the request is being processed.
pq.refClient++
return request, func() {
return request, func(consumeErr error) {
if errors.As(consumeErr, &shutdownErr{}) {
// The queue is shutting down, don't mark the item as dispatched, so it's picked up again after restart.
// TODO: Handle partially delivered requests by updating their values in the storage.
return
}

// Delete the item from the persistent storage after it was processed.
pq.mu.Lock()
defer pq.mu.Unlock()
Expand Down
22 changes: 11 additions & 11 deletions exporter/exporterhelper/internal/persistent_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (nh *mockHost) GetExtensions() map[component.ID]component.Component {
}

// createAndStartTestPersistentQueue creates and starts a fake queue with the given capacity and number of consumers.
func createAndStartTestPersistentQueue(t *testing.T, capacity, numConsumers int, consumeFunc func(_ context.Context, item ptrace.Traces) bool) Queue[ptrace.Traces] {
func createAndStartTestPersistentQueue(t *testing.T, capacity, numConsumers int, consumeFunc func(_ context.Context, item ptrace.Traces) error) Queue[ptrace.Traces] {
pq := NewPersistentQueue[ptrace.Traces](capacity, component.DataTypeTraces, component.ID{}, marshaler.MarshalTraces,
unmarshaler.UnmarshalTraces, exportertest.NewNopCreateSettings())
host := &mockHost{ext: map[component.ID]component.Component{
Expand Down Expand Up @@ -73,10 +73,10 @@ func createTestPersistentQueue(client storage.Client) *persistentQueue[ptrace.Tr
func TestPersistentQueue_FullCapacity(t *testing.T) {
start := make(chan struct{})
done := make(chan struct{})
pq := createAndStartTestPersistentQueue(t, 5, 1, func(context.Context, ptrace.Traces) bool {
pq := createAndStartTestPersistentQueue(t, 5, 1, func(context.Context, ptrace.Traces) error {
<-start
<-done
return true
return nil
})
assert.Equal(t, 0, pq.Size())

Expand All @@ -100,7 +100,7 @@ func TestPersistentQueue_FullCapacity(t *testing.T) {
}

func TestPersistentQueue_Shutdown(t *testing.T) {
pq := createAndStartTestPersistentQueue(t, 1001, 100, func(context.Context, ptrace.Traces) bool { return true })
pq := createAndStartTestPersistentQueue(t, 1001, 100, func(context.Context, ptrace.Traces) error { return nil })
req := newTraces(1, 10)

for i := 0; i < 1000; i++ {
Expand Down Expand Up @@ -140,9 +140,9 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) {
req := newTraces(1, 10)

numMessagesConsumed := &atomic.Int32{}
pq := createAndStartTestPersistentQueue(t, 1000, c.numConsumers, func(context.Context, ptrace.Traces) bool {
pq := createAndStartTestPersistentQueue(t, 1000, c.numConsumers, func(context.Context, ptrace.Traces) error {
numMessagesConsumed.Add(int32(1))
return true
return nil
})

for i := 0; i < c.numMessagesProduced; i++ {
Expand Down Expand Up @@ -401,7 +401,7 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) {
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0, 1})

// Lets mark item 1 as finished, it will remove it from the currently dispatched items list.
onProcessingFinished()
onProcessingFinished(nil)
requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0})

// Reload the storage. Since items 0 was not finished, this should be re-enqueued at the end.
Expand All @@ -415,7 +415,7 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) {
r, onProcessingFinished, found := newPs.getNextItem(context.Background())
require.True(t, found)
assert.Equal(t, req, r)
onProcessingFinished()
onProcessingFinished(nil)
}

// The queue should be now empty
Expand Down Expand Up @@ -524,7 +524,7 @@ func BenchmarkPersistentQueue_TraceSpans(b *testing.B) {
}

for i := 0; i < bb.N; i++ {
require.True(bb, ps.Consume(func(context.Context, ptrace.Traces) bool { return true }))
require.True(bb, ps.Consume(func(context.Context, ptrace.Traces) error { return nil }))
}
require.NoError(b, ext.Shutdown(context.Background()))
})
Expand Down Expand Up @@ -603,7 +603,7 @@ func TestPersistentQueue_ShutdownWhileConsuming(t *testing.T) {
assert.False(t, client.(*mockStorageClient).isClosed())
assert.NoError(t, ps.Shutdown(context.Background()))
assert.False(t, client.(*mockStorageClient).isClosed())
onProcessingFinished()
onProcessingFinished(nil)
assert.True(t, client.(*mockStorageClient).isClosed())
}

Expand Down Expand Up @@ -643,7 +643,7 @@ func TestPersistentQueue_StorageFull(t *testing.T) {
// Subsequent items succeed, as deleting the first item frees enough space for the state update
reqCount--
for i := reqCount; i > 0; i-- {
require.True(t, ps.Consume(func(context.Context, ptrace.Traces) bool { return true }))
require.True(t, ps.Consume(func(context.Context, ptrace.Traces) error { return nil }))
}

// We should be able to put a new item in
Expand Down
3 changes: 1 addition & 2 deletions exporter/exporterhelper/internal/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ type Queue[T any] interface {
// Consume applies the provided function on the head of queue.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped.
// The provided callback function returns true if the item was consumed or false if the consumer is stopped.
Consume(func(ctx context.Context, item T) bool) bool
Consume(func(ctx context.Context, item T) error) bool
// Size returns the current Size of the queue
Size() int
// Capacity returns the capacity of the queue.
Expand Down
51 changes: 9 additions & 42 deletions exporter/exporterhelper/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"errors"
"fmt"
"sync/atomic"
"time"

"go.opencensus.io/metric/metricdata"
Expand Down Expand Up @@ -75,14 +74,12 @@ func (qCfg *QueueSettings) Validate() error {

type queueSender struct {
baseRequestSender
fullName string
queue internal.Queue[Request]
traceAttribute attribute.KeyValue
logger *zap.Logger
meter otelmetric.Meter
consumers *internal.QueueConsumers[Request]
requeuingEnabled bool
stopped *atomic.Bool
fullName string
queue internal.Queue[Request]
traceAttribute attribute.KeyValue
logger *zap.Logger
meter otelmetric.Meter
consumers *internal.QueueConsumers[Request]

metricCapacity otelmetric.Int64ObservableGauge
metricSize otelmetric.Int64ObservableGauge
Expand All @@ -105,50 +102,22 @@ func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal co
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
logger: set.TelemetrySettings.Logger,
meter: set.TelemetrySettings.MeterProvider.Meter(scopeName),
// TODO: this can be further exposed as a config param rather than relying on a type of queue
requeuingEnabled: isPersistent,
stopped: &atomic.Bool{},
}
qs.consumers = internal.NewQueueConsumers(queue, config.NumConsumers, qs.consume)
return qs
}

// consume is the function that is executed by the queue consumers to send the data to the next consumerSender.
func (qs *queueSender) consume(ctx context.Context, req Request) bool {
func (qs *queueSender) consume(ctx context.Context, req Request) error {
err := qs.nextSender.send(ctx, req)

// Nothing to do if the error is nil or permanent. Permanent errors are already logged by retrySender.
if err == nil || consumererror.IsPermanent(err) {
return true
}

// Do not requeue if the queue sender is stopped.
if qs.stopped.Load() {
return false
}

if !qs.requeuingEnabled {
if err != nil && !consumererror.IsPermanent(err) {
qs.logger.Error(
"Exporting failed. No more retries left. Dropping data.",
zap.Error(err),
zap.Int("dropped_items", req.ItemsCount()),
)
return true
}

if qs.queue.Offer(ctx, extractPartialRequest(req, err)) == nil {
qs.logger.Error(
"Exporting failed. Putting back to the end of the queue.",
zap.Error(err),
)
} else {
qs.logger.Error(
"Exporting failed. Queue did not accept requeuing request. Dropping data.",
zap.Error(err),
zap.Int("dropped_items", req.ItemsCount()),
)
}
return true
return err
}

// Start is invoked during service startup.
Expand Down Expand Up @@ -212,8 +181,6 @@ func (qs *queueSender) recordWithOC() error {

// Shutdown is invoked during service shutdown.
func (qs *queueSender) Shutdown(ctx context.Context) error {
qs.stopped.Store(true)

// Cleanup queue metrics reporting
_ = globalInstruments.queueSize.UpsertEntry(func() int64 {
return int64(0)
Expand Down
Loading

0 comments on commit 46a11f8

Please sign in to comment.