From a0d363e8fdbf98db735c2ff991814e9a06f2f824 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Thu, 9 Jul 2020 15:14:30 -0700 Subject: [PATCH] Cleanup qretry, do not backoff if no retries configured or failed to add to the queue (#1311) Signed-off-by: Bogdan Drutu --- consumer/consumererror/consumererror.go | 8 +- processor/queuedprocessor/queued_processor.go | 87 ++++++++----------- 2 files changed, 44 insertions(+), 51 deletions(-) diff --git a/consumer/consumererror/consumererror.go b/consumer/consumererror/consumererror.go index a99629f36d7..af909aad8e1 100644 --- a/consumer/consumererror/consumererror.go +++ b/consumer/consumererror/consumererror.go @@ -20,13 +20,17 @@ package consumererror // permanent is an error that will be always returned if its source // receives the same inputs. type permanent struct { - error + err error } // Permanent wraps an error to indicate that it is a permanent error, i.e.: an // error that will be always returned if its source receives the same inputs. func Permanent(err error) error { - return permanent{err} + return permanent{err: err} +} + +func (p permanent) Error() string { + return "Permanent error: " + p.err.Error() } // IsPermanent checks if an error was wrapped with the Permanent function, that diff --git a/processor/queuedprocessor/queued_processor.go b/processor/queuedprocessor/queued_processor.go index e847d8d0816..608372b43b2 100644 --- a/processor/queuedprocessor/queued_processor.go +++ b/processor/queuedprocessor/queued_processor.go @@ -16,6 +16,7 @@ package queuedprocessor import ( "context" + "fmt" "sync" "time" @@ -42,6 +43,7 @@ type queuedSpanProcessor struct { numWorkers int retryOnProcessingFailure bool backoffDelay time.Duration + statsTags []tag.Mutator stopCh chan struct{} stopOnce sync.Once } @@ -49,9 +51,10 @@ type queuedSpanProcessor struct { var _ consumer.TraceConsumer = (*queuedSpanProcessor)(nil) type queueItem struct { - queuedTime time.Time - td pdata.Traces - ctx context.Context + queuedTime time.Time + td pdata.Traces + spanCountStats *processor.SpanCountStats + ctx context.Context } func newQueuedSpanProcessor( @@ -59,13 +62,13 @@ func newQueuedSpanProcessor( sender consumer.TraceConsumer, cfg *Config, ) *queuedSpanProcessor { - boundedQueue := queue.NewBoundedQueue(cfg.QueueSize, func(item interface{}) {}) return &queuedSpanProcessor{ name: cfg.Name(), - queue: boundedQueue, + queue: queue.NewBoundedQueue(cfg.QueueSize, func(item interface{}) {}), logger: params.Logger, numWorkers: cfg.NumWorkers, sender: sender, + statsTags: []tag.Mutator{tag.Insert(processor.TagProcessorNameKey, cfg.Name())}, retryOnProcessingFailure: cfg.RetryOnFailure, backoffDelay: cfg.BackoffDelay, stopCh: make(chan struct{}), @@ -116,13 +119,13 @@ func (sp *queuedSpanProcessor) Stop() { func (sp *queuedSpanProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error { ctx = obsreport.ProcessorContext(ctx, sp.name) item := &queueItem{ - queuedTime: time.Now(), - td: td, - ctx: ctx, + queuedTime: time.Now(), + td: td, + spanCountStats: processor.NewSpanCountStats(td, sp.name), + ctx: ctx, } - spanCountStats := processor.NewSpanCountStats(td, sp.name) - processor.RecordsSpanCountMetrics(ctx, spanCountStats, processor.StatReceivedSpanCount) + processor.RecordsSpanCountMetrics(ctx, item.spanCountStats, processor.StatReceivedSpanCount) addedToQueue := sp.queue.Produce(item) if !addedToQueue { @@ -130,9 +133,9 @@ func (sp *queuedSpanProcessor) ConsumeTraces(ctx context.Context, td pdata.Trace // in the same call stack as the receiver, ie.: the call from the receiver // to here is synchronous. This means that actually it could be proper to // record this as "refused" instead of "dropped". - sp.onItemDropped(item, spanCountStats) + sp.onItemDropped(item, fmt.Errorf("failed to add to the queue: %w", error(nil))) } else { - obsreport.ProcessorTraceDataAccepted(ctx, spanCountStats.GetAllSpansCount()) + obsreport.ProcessorTraceDataAccepted(ctx, item.spanCountStats.GetAllSpansCount()) } return nil } @@ -150,13 +153,12 @@ func (sp *queuedSpanProcessor) Shutdown(context.Context) error { func (sp *queuedSpanProcessor) processItemFromQueue(item *queueItem) { startTime := time.Now() err := sp.sender.ConsumeTraces(item.ctx, item.td) - processorStatsTags := []tag.Mutator{tag.Insert(processor.TagProcessorNameKey, sp.name)} if err == nil { // Record latency metrics and return sendLatencyMs := int64(time.Since(startTime) / time.Millisecond) inQueueLatencyMs := int64(time.Since(item.queuedTime) / time.Millisecond) - stats.RecordWithTags(context.Background(), - processorStatsTags, + _ = stats.RecordWithTags(item.ctx, + sp.statsTags, statSuccessSendOps.M(1), statSendLatencyMs.M(sendLatencyMs), statInQueueLatencyMs.M(inQueueLatencyMs)) @@ -164,45 +166,32 @@ func (sp *queuedSpanProcessor) processItemFromQueue(item *queueItem) { return } - spanCountStats := processor.NewSpanCountStats(item.td, sp.name) - allSpansCount := spanCountStats.GetAllSpansCount() - // There was an error + _ = stats.RecordWithTags(item.ctx, sp.statsTags, statFailedSendOps.M(1)) - // Immediately drop data on permanent errors. In this context permanent - // errors indicate some kind of bad pdata. + // Immediately drop data on permanent errors. if consumererror.IsPermanent(err) { - sp.logger.Warn( - "Unrecoverable bad data error", - zap.Int("#spans", spanCountStats.GetAllSpansCount()), - zap.Error(err)) - - processor.RecordsSpanCountMetrics( - context.Background(), - spanCountStats, - processor.StatBadBatchDroppedSpanCount) - + // throw away the batch + sp.onItemDropped(item, err) return } - stats.RecordWithTags(context.Background(), processorStatsTags, statFailedSendOps.M(1)) - - sp.logger.Warn("Sender failed", zap.Error(err)) + // Immediately drop data on no retires configured. if !sp.retryOnProcessingFailure { // throw away the batch - sp.logger.Error("Failed to process batch, discarding", zap.Int("batch-size", allSpansCount)) - sp.onItemDropped(item, spanCountStats) - } else { - // TODO: (@pjanotti) do not put it back on the end of the queue, retry with it directly. - // This will have the benefit of keeping the batch closer to related ones in time. - if !sp.queue.Produce(item) { - sp.logger.Error("Failed to process batch and failed to re-enqueue", zap.Int("batch-size", allSpansCount)) - sp.onItemDropped(item, spanCountStats) - } else { - sp.logger.Warn("Failed to process batch, re-enqueued", zap.Int("batch-size", allSpansCount)) - } + sp.onItemDropped(item, fmt.Errorf("no retry processing %w", err)) + return + } + + // TODO: (@pjanotti) do not put it back on the end of the queue, retry with it directly. + // This will have the benefit of keeping the batch closer to related ones in time. + if !sp.queue.Produce(item) { + sp.onItemDropped(item, fmt.Errorf("failed to re-enqueue: %w", err)) + return } + sp.logger.Warn("Failed to process batch, re-enqueued", zap.Int("batch-size", item.spanCountStats.GetAllSpansCount())) + // back-off for configured delay, but get interrupted when shutting down if sp.backoffDelay > 0 { sp.logger.Warn("Backing off before next attempt", zap.Duration("backoff_delay", sp.backoffDelay)) @@ -217,15 +206,15 @@ func (sp *queuedSpanProcessor) processItemFromQueue(item *queueItem) { } } -func (sp *queuedSpanProcessor) onItemDropped(item *queueItem, spanCountStats *processor.SpanCountStats) { - stats.RecordWithTags(item.ctx, +func (sp *queuedSpanProcessor) onItemDropped(item *queueItem, err error) { + _ = stats.RecordWithTags(item.ctx, []tag.Mutator{tag.Insert(processor.TagProcessorNameKey, sp.name)}, processor.StatTraceBatchesDroppedCount.M(int64(1))) - processor.RecordsSpanCountMetrics(item.ctx, spanCountStats, processor.StatDroppedSpanCount) + processor.RecordsSpanCountMetrics(item.ctx, item.spanCountStats, processor.StatDroppedSpanCount) - obsreport.ProcessorTraceDataDropped(item.ctx, spanCountStats.GetAllSpansCount()) + obsreport.ProcessorTraceDataDropped(item.ctx, item.spanCountStats.GetAllSpansCount()) - sp.logger.Warn("Span batch dropped", zap.Int("#spans", spanCountStats.GetAllSpansCount())) + sp.logger.Error("Failed to process batch, discarding", zap.Int("batch-size", item.spanCountStats.GetAllSpansCount()), zap.Error(err)) } // Variables related to metrics specific to queued processor.