Cleanup qretry, do not backoff if no retries configured or failed to add to the queue #1311

Merged: 1 commit, Jul 9, 2020

8 changes: 6 additions & 2 deletions consumer/consumererror/consumererror.go
@@ -20,13 +20,17 @@ package consumererror
 // permanent is an error that will be always returned if its source
 // receives the same inputs.
 type permanent struct {
-	error
+	err error
 }

 // Permanent wraps an error to indicate that it is a permanent error, i.e.: an
 // error that will be always returned if its source receives the same inputs.
 func Permanent(err error) error {
-	return permanent{err}
+	return permanent{err: err}
 }

+func (p permanent) Error() string {
+	return "Permanent error: " + p.err.Error()
+}
+
 // IsPermanent checks if an error was wrapped with the Permanent function, that
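
For context, here is a minimal usage sketch of the wrapper this file defines. It assumes the collector's module path go.opentelemetry.io/collector and that IsPermanent has the signature func(error) bool; the error text comes from the new Error() method above.

package main

import (
	"errors"
	"fmt"

	"go.opentelemetry.io/collector/consumer/consumererror"
)

func main() {
	// A downstream consumer wraps an unrecoverable error (for example, bad data)
	// so that upstream components know a retry cannot succeed.
	err := consumererror.Permanent(errors.New("malformed span batch"))

	// The queued processor checks the wrapper and drops the batch instead of
	// re-enqueueing it.
	if consumererror.IsPermanent(err) {
		fmt.Println(err) // prints: Permanent error: malformed span batch
	}
}
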
87 changes: 38 additions & 49 deletions processor/queuedprocessor/queued_processor.go
@@ -16,6 +16,7 @@ package queuedprocessor

 import (
 	"context"
+	"fmt"
 	"sync"
 	"time"

@@ -42,30 +43,32 @@ type queuedSpanProcessor struct {
 	numWorkers               int
 	retryOnProcessingFailure bool
 	backoffDelay             time.Duration
+	statsTags                []tag.Mutator
 	stopCh                   chan struct{}
 	stopOnce                 sync.Once
 }

 var _ consumer.TraceConsumer = (*queuedSpanProcessor)(nil)

 type queueItem struct {
-	queuedTime time.Time
-	td         pdata.Traces
-	ctx        context.Context
+	queuedTime     time.Time
+	td             pdata.Traces
+	spanCountStats *processor.SpanCountStats
+	ctx            context.Context
 }

 func newQueuedSpanProcessor(
 	params component.ProcessorCreateParams,
 	sender consumer.TraceConsumer,
 	cfg *Config,
 ) *queuedSpanProcessor {
-	boundedQueue := queue.NewBoundedQueue(cfg.QueueSize, func(item interface{}) {})
 	return &queuedSpanProcessor{
 		name:                     cfg.Name(),
-		queue:                    boundedQueue,
+		queue:                    queue.NewBoundedQueue(cfg.QueueSize, func(item interface{}) {}),
 		logger:                   params.Logger,
 		numWorkers:               cfg.NumWorkers,
 		sender:                   sender,
+		statsTags:                []tag.Mutator{tag.Insert(processor.TagProcessorNameKey, cfg.Name())},
 		retryOnProcessingFailure: cfg.RetryOnFailure,
 		backoffDelay:             cfg.BackoffDelay,
 		stopCh:                   make(chan struct{}),
@@ -116,23 +119,23 @@ func (sp *queuedSpanProcessor) Stop() {
 func (sp *queuedSpanProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
 	ctx = obsreport.ProcessorContext(ctx, sp.name)
 	item := &queueItem{
-		queuedTime: time.Now(),
-		td:         td,
-		ctx:        ctx,
+		queuedTime:     time.Now(),
+		td:             td,
+		spanCountStats: processor.NewSpanCountStats(td, sp.name),
+		ctx:            ctx,
 	}

-	spanCountStats := processor.NewSpanCountStats(td, sp.name)
-	processor.RecordsSpanCountMetrics(ctx, spanCountStats, processor.StatReceivedSpanCount)
+	processor.RecordsSpanCountMetrics(ctx, item.spanCountStats, processor.StatReceivedSpanCount)

 	addedToQueue := sp.queue.Produce(item)
 	if !addedToQueue {
 		// TODO: in principle this may not end in data loss because this can be
 		// in the same call stack as the receiver, ie.: the call from the receiver
 		// to here is synchronous. This means that actually it could be proper to
 		// record this as "refused" instead of "dropped".
-		sp.onItemDropped(item, spanCountStats)
+		sp.onItemDropped(item, fmt.Errorf("failed to add to the queue"))
 	} else {
-		obsreport.ProcessorTraceDataAccepted(ctx, spanCountStats.GetAllSpansCount())
+		obsreport.ProcessorTraceDataAccepted(ctx, item.spanCountStats.GetAllSpansCount())
 	}
 	return nil
 }
@@ -150,59 +153,45 @@ func (sp *queuedSpanProcessor) Shutdown(context.Context) error {
 func (sp *queuedSpanProcessor) processItemFromQueue(item *queueItem) {
 	startTime := time.Now()
 	err := sp.sender.ConsumeTraces(item.ctx, item.td)
-	processorStatsTags := []tag.Mutator{tag.Insert(processor.TagProcessorNameKey, sp.name)}
 	if err == nil {
 		// Record latency metrics and return
 		sendLatencyMs := int64(time.Since(startTime) / time.Millisecond)
 		inQueueLatencyMs := int64(time.Since(item.queuedTime) / time.Millisecond)
-		stats.RecordWithTags(context.Background(),
-			processorStatsTags,
+		_ = stats.RecordWithTags(item.ctx,
+			sp.statsTags,
 			statSuccessSendOps.M(1),
 			statSendLatencyMs.M(sendLatencyMs),
 			statInQueueLatencyMs.M(inQueueLatencyMs))

 		return
 	}

-	spanCountStats := processor.NewSpanCountStats(item.td, sp.name)
-	allSpansCount := spanCountStats.GetAllSpansCount()
-
+	// There was an error
+	_ = stats.RecordWithTags(item.ctx, sp.statsTags, statFailedSendOps.M(1))

-	// Immediately drop data on permanent errors. In this context permanent
-	// errors indicate some kind of bad pdata.
+	// Immediately drop data on permanent errors.
 	if consumererror.IsPermanent(err) {
-		sp.logger.Warn(
-			"Unrecoverable bad data error",
-			zap.Int("#spans", spanCountStats.GetAllSpansCount()),
-			zap.Error(err))
-
-		processor.RecordsSpanCountMetrics(
-			context.Background(),
-			spanCountStats,
-			processor.StatBadBatchDroppedSpanCount)
-
 		// throw away the batch
+		sp.onItemDropped(item, err)
 		return
 	}

-	stats.RecordWithTags(context.Background(), processorStatsTags, statFailedSendOps.M(1))
-
 	sp.logger.Warn("Sender failed", zap.Error(err))
+	// Immediately drop data on no retries configured.
 	if !sp.retryOnProcessingFailure {
-		// throw away the batch
-		sp.logger.Error("Failed to process batch, discarding", zap.Int("batch-size", allSpansCount))
-		sp.onItemDropped(item, spanCountStats)
-	} else {
-		// TODO: (@pjanotti) do not put it back on the end of the queue, retry with it directly.
-		// This will have the benefit of keeping the batch closer to related ones in time.
-		if !sp.queue.Produce(item) {
-			sp.logger.Error("Failed to process batch and failed to re-enqueue", zap.Int("batch-size", allSpansCount))
-			sp.onItemDropped(item, spanCountStats)
-		} else {
-			sp.logger.Warn("Failed to process batch, re-enqueued", zap.Int("batch-size", allSpansCount))
-		}
+		sp.onItemDropped(item, fmt.Errorf("no retry processing %w", err))
+		return
 	}

+	// TODO: (@pjanotti) do not put it back on the end of the queue, retry with it directly.
+	// This will have the benefit of keeping the batch closer to related ones in time.
+	if !sp.queue.Produce(item) {
+		sp.onItemDropped(item, fmt.Errorf("failed to re-enqueue: %w", err))
+		return
+	}
+
+	sp.logger.Warn("Failed to process batch, re-enqueued", zap.Int("batch-size", item.spanCountStats.GetAllSpansCount()))
+
 	// back-off for configured delay, but get interrupted when shutting down
 	if sp.backoffDelay > 0 {
 		sp.logger.Warn("Backing off before next attempt", zap.Duration("backoff_delay", sp.backoffDelay))
@@ -217,15 +206,15 @@ func (sp *queuedSpanProcessor) processItemFromQueue(item *queueItem) {
 	}
 }

-func (sp *queuedSpanProcessor) onItemDropped(item *queueItem, spanCountStats *processor.SpanCountStats) {
-	stats.RecordWithTags(item.ctx,
+func (sp *queuedSpanProcessor) onItemDropped(item *queueItem, err error) {
+	_ = stats.RecordWithTags(item.ctx,
 		[]tag.Mutator{tag.Insert(processor.TagProcessorNameKey, sp.name)},
 		processor.StatTraceBatchesDroppedCount.M(int64(1)))
-	processor.RecordsSpanCountMetrics(item.ctx, spanCountStats, processor.StatDroppedSpanCount)
+	processor.RecordsSpanCountMetrics(item.ctx, item.spanCountStats, processor.StatDroppedSpanCount)

-	obsreport.ProcessorTraceDataDropped(item.ctx, spanCountStats.GetAllSpansCount())
+	obsreport.ProcessorTraceDataDropped(item.ctx, item.spanCountStats.GetAllSpansCount())

-	sp.logger.Warn("Span batch dropped", zap.Int("#spans", spanCountStats.GetAllSpansCount()))
+	sp.logger.Error("Failed to process batch, discarding", zap.Int("batch-size", item.spanCountStats.GetAllSpansCount()), zap.Error(err))
 }

 // Variables related to metrics specific to queued processor.
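
To make the behavior change in the title concrete, the sketch below condenses the new failure-handling order of processItemFromQueue. The function and parameter names (handleSendFailure, retryEnabled, reenqueue) are illustrative placeholders, not part of the processor's API.

package main

import (
	"errors"
	"fmt"
	"time"
)

// errPermanent stands in for an error wrapped with consumererror.Permanent.
var errPermanent = errors.New("permanent: bad data")

// handleSendFailure mirrors the order of checks in the reworked
// processItemFromQueue: a permanent error or disabled retries drops the batch
// immediately, a failed re-enqueue drops it as well, and only a successful
// re-enqueue is followed by the configured backoff.
func handleSendFailure(err error, retryEnabled bool, reenqueue func() bool, backoffDelay time.Duration) {
	if errors.Is(err, errPermanent) { // consumererror.IsPermanent(err) in the real code
		fmt.Println("dropped (permanent):", err)
		return
	}
	if !retryEnabled {
		fmt.Println("dropped (retries disabled):", err)
		return
	}
	if !reenqueue() {
		fmt.Println("dropped (failed to re-enqueue):", err)
		return
	}
	fmt.Println("re-enqueued, backing off for", backoffDelay)
	time.Sleep(backoffDelay)
}

func main() {
	// With retries disabled the worker no longer backs off before dropping.
	handleSendFailure(errors.New("connection refused"), false, func() bool { return true }, time.Second)
	// With retries enabled and a successful re-enqueue, it backs off as before.
	handleSendFailure(errors.New("connection refused"), true, func() bool { return true }, 10*time.Millisecond)
}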