Commit
Cleanup qretry, do not backoff if no retries configured or failed to add to the queue (open-telemetry#1311)

Signed-off-by: Bogdan Drutu <[email protected]>
bogdandrutu authored and wyTrivail committed Jul 13, 2020
1 parent 5ec327b commit a0d363e
Showing 2 changed files with 44 additions and 51 deletions.
8 changes: 6 additions & 2 deletions consumer/consumererror/consumererror.go
@@ -20,13 +20,17 @@ package consumererror
// permanent is an error that will be always returned if its source
// receives the same inputs.
type permanent struct {
error
err error
}

// Permanent wraps an error to indicate that it is a permanent error, i.e.: an
// error that will be always returned if its source receives the same inputs.
func Permanent(err error) error {
return permanent{err}
return permanent{err: err}
}

func (p permanent) Error() string {
return "Permanent error: " + p.err.Error()
}

// IsPermanent checks if an error was wrapped with the Permanent function, that
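For context, a minimal usage sketch of the reworked permanent-error wrapper; the import path shown is an assumption and may differ by collector version:

```go
package main

import (
	"errors"
	"fmt"

	// Assumed import path for the collector's consumererror package.
	"go.opentelemetry.io/collector/consumer/consumererror"
)

func main() {
	// Wrap a decode failure as permanent: retrying the same input cannot succeed.
	err := consumererror.Permanent(errors.New("malformed span payload"))

	// The new Error() method prefixes the wrapped message.
	fmt.Println(err) // Permanent error: malformed span payload

	// Callers such as the queued processor drop the data instead of retrying.
	if consumererror.IsPermanent(err) {
		fmt.Println("dropping batch, not retrying")
	}
}
```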
87 changes: 38 additions & 49 deletions processor/queuedprocessor/queued_processor.go
@@ -16,6 +16,7 @@ package queuedprocessor

import (
"context"
"fmt"
"sync"
"time"

@@ -42,30 +43,32 @@ type queuedSpanProcessor struct {
numWorkers int
retryOnProcessingFailure bool
backoffDelay time.Duration
statsTags []tag.Mutator
stopCh chan struct{}
stopOnce sync.Once
}

var _ consumer.TraceConsumer = (*queuedSpanProcessor)(nil)

type queueItem struct {
queuedTime time.Time
td pdata.Traces
ctx context.Context
queuedTime time.Time
td pdata.Traces
spanCountStats *processor.SpanCountStats
ctx context.Context
}

func newQueuedSpanProcessor(
params component.ProcessorCreateParams,
sender consumer.TraceConsumer,
cfg *Config,
) *queuedSpanProcessor {
boundedQueue := queue.NewBoundedQueue(cfg.QueueSize, func(item interface{}) {})
return &queuedSpanProcessor{
name: cfg.Name(),
queue: boundedQueue,
queue: queue.NewBoundedQueue(cfg.QueueSize, func(item interface{}) {}),
logger: params.Logger,
numWorkers: cfg.NumWorkers,
sender: sender,
statsTags: []tag.Mutator{tag.Insert(processor.TagProcessorNameKey, cfg.Name())},
retryOnProcessingFailure: cfg.RetryOnFailure,
backoffDelay: cfg.BackoffDelay,
stopCh: make(chan struct{}),
@@ -116,23 +119,23 @@ func (sp *queuedSpanProcessor) Stop() {
func (sp *queuedSpanProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
ctx = obsreport.ProcessorContext(ctx, sp.name)
item := &queueItem{
queuedTime: time.Now(),
td: td,
ctx: ctx,
queuedTime: time.Now(),
td: td,
spanCountStats: processor.NewSpanCountStats(td, sp.name),
ctx: ctx,
}

spanCountStats := processor.NewSpanCountStats(td, sp.name)
processor.RecordsSpanCountMetrics(ctx, spanCountStats, processor.StatReceivedSpanCount)
processor.RecordsSpanCountMetrics(ctx, item.spanCountStats, processor.StatReceivedSpanCount)

addedToQueue := sp.queue.Produce(item)
if !addedToQueue {
// TODO: in principle this may not end in data loss because this can be
// in the same call stack as the receiver, ie.: the call from the receiver
// to here is synchronous. This means that actually it could be proper to
// record this as "refused" instead of "dropped".
sp.onItemDropped(item, spanCountStats)
sp.onItemDropped(item, fmt.Errorf("failed to add to the queue: %w", error(nil)))
} else {
obsreport.ProcessorTraceDataAccepted(ctx, spanCountStats.GetAllSpansCount())
obsreport.ProcessorTraceDataAccepted(ctx, item.spanCountStats.GetAllSpansCount())
}
return nil
}
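The new drop paths build their reasons with fmt.Errorf and the %w verb; a small stdlib-only sketch of what that wrapping preserves for callers:

```go
package main

import (
	"errors"
	"fmt"
)

func main() {
	cause := errors.New("connection reset")

	// Wrap with %w so the original cause stays inspectable up the stack.
	err := fmt.Errorf("failed to re-enqueue: %w", cause)

	fmt.Println(err)                         // failed to re-enqueue: connection reset
	fmt.Println(errors.Is(err, cause))       // true
	fmt.Println(errors.Unwrap(err) == cause) // true
}
```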
@@ -150,59 +153,45 @@ func (sp *queuedSpanProcessor) Shutdown(context.Context) error {
func (sp *queuedSpanProcessor) processItemFromQueue(item *queueItem) {
startTime := time.Now()
err := sp.sender.ConsumeTraces(item.ctx, item.td)
processorStatsTags := []tag.Mutator{tag.Insert(processor.TagProcessorNameKey, sp.name)}
if err == nil {
// Record latency metrics and return
sendLatencyMs := int64(time.Since(startTime) / time.Millisecond)
inQueueLatencyMs := int64(time.Since(item.queuedTime) / time.Millisecond)
stats.RecordWithTags(context.Background(),
processorStatsTags,
_ = stats.RecordWithTags(item.ctx,
sp.statsTags,
statSuccessSendOps.M(1),
statSendLatencyMs.M(sendLatencyMs),
statInQueueLatencyMs.M(inQueueLatencyMs))

return
}

spanCountStats := processor.NewSpanCountStats(item.td, sp.name)
allSpansCount := spanCountStats.GetAllSpansCount()

// There was an error
_ = stats.RecordWithTags(item.ctx, sp.statsTags, statFailedSendOps.M(1))

// Immediately drop data on permanent errors. In this context permanent
// errors indicate some kind of bad pdata.
// Immediately drop data on permanent errors.
if consumererror.IsPermanent(err) {
sp.logger.Warn(
"Unrecoverable bad data error",
zap.Int("#spans", spanCountStats.GetAllSpansCount()),
zap.Error(err))

processor.RecordsSpanCountMetrics(
context.Background(),
spanCountStats,
processor.StatBadBatchDroppedSpanCount)

// throw away the batch
sp.onItemDropped(item, err)
return
}

stats.RecordWithTags(context.Background(), processorStatsTags, statFailedSendOps.M(1))

sp.logger.Warn("Sender failed", zap.Error(err))
// Immediately drop data if no retries are configured.
if !sp.retryOnProcessingFailure {
// throw away the batch
sp.logger.Error("Failed to process batch, discarding", zap.Int("batch-size", allSpansCount))
sp.onItemDropped(item, spanCountStats)
} else {
// TODO: (@pjanotti) do not put it back on the end of the queue, retry with it directly.
// This will have the benefit of keeping the batch closer to related ones in time.
if !sp.queue.Produce(item) {
sp.logger.Error("Failed to process batch and failed to re-enqueue", zap.Int("batch-size", allSpansCount))
sp.onItemDropped(item, spanCountStats)
} else {
sp.logger.Warn("Failed to process batch, re-enqueued", zap.Int("batch-size", allSpansCount))
}
sp.onItemDropped(item, fmt.Errorf("no retry processing %w", err))
return
}

// TODO: (@pjanotti) do not put it back on the end of the queue, retry with it directly.
// This will have the benefit of keeping the batch closer to related ones in time.
if !sp.queue.Produce(item) {
sp.onItemDropped(item, fmt.Errorf("failed to re-enqueue: %w", err))
return
}

sp.logger.Warn("Failed to process batch, re-enqueued", zap.Int("batch-size", item.spanCountStats.GetAllSpansCount()))

// back-off for configured delay, but get interrupted when shutting down
if sp.backoffDelay > 0 {
sp.logger.Warn("Backing off before next attempt", zap.Duration("backoff_delay", sp.backoffDelay))
@@ -217,15 +206,15 @@ func (sp *queuedSpanProcessor) processItemFromQueue(item *queueItem) {
}
}

func (sp *queuedSpanProcessor) onItemDropped(item *queueItem, spanCountStats *processor.SpanCountStats) {
stats.RecordWithTags(item.ctx,
func (sp *queuedSpanProcessor) onItemDropped(item *queueItem, err error) {
_ = stats.RecordWithTags(item.ctx,
[]tag.Mutator{tag.Insert(processor.TagProcessorNameKey, sp.name)},
processor.StatTraceBatchesDroppedCount.M(int64(1)))
processor.RecordsSpanCountMetrics(item.ctx, spanCountStats, processor.StatDroppedSpanCount)
processor.RecordsSpanCountMetrics(item.ctx, item.spanCountStats, processor.StatDroppedSpanCount)

obsreport.ProcessorTraceDataDropped(item.ctx, spanCountStats.GetAllSpansCount())
obsreport.ProcessorTraceDataDropped(item.ctx, item.spanCountStats.GetAllSpansCount())

sp.logger.Warn("Span batch dropped", zap.Int("#spans", spanCountStats.GetAllSpansCount()))
sp.logger.Error("Failed to process batch, discarding", zap.Int("batch-size", item.spanCountStats.GetAllSpansCount()), zap.Error(err))
}

// Variables related to metrics specific to queued processor.
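The actual wait between retries is collapsed in this view; the idea is a select between the configured backoff delay and the processor's stop channel, so shutdown is never blocked by a sleeping worker. A simplified sketch of that pattern, with illustrative names rather than the upstream code:

```go
package main

import (
	"fmt"
	"time"
)

// backoff waits for the configured delay, but returns early if stopCh is
// closed during shutdown. With no delay configured it does not sleep at all.
func backoff(delay time.Duration, stopCh <-chan struct{}) {
	if delay <= 0 {
		return
	}
	t := time.NewTimer(delay)
	defer t.Stop()
	select {
	case <-t.C:
		// Delay elapsed; the re-enqueued item will be retried by a worker.
	case <-stopCh:
		// Shutdown requested; abort the wait immediately.
	}
}

func main() {
	stopCh := make(chan struct{})
	go func() {
		time.Sleep(100 * time.Millisecond)
		close(stopCh) // simulate Shutdown interrupting a long backoff
	}()

	start := time.Now()
	backoff(5*time.Second, stopCh)
	fmt.Println("waited", time.Since(start).Round(10*time.Millisecond))
}
```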
