diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/exporterhelper/internal/persistent_queue.go index e766d703e03..821ac8f9d45 100644 --- a/exporter/exporterhelper/internal/persistent_queue.go +++ b/exporter/exporterhelper/internal/persistent_queue.go @@ -246,10 +246,10 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error), if pq.readIndex == pq.writeIndex { return request, nil, false } + index := pq.readIndex // Increase here, so even if errors happen below, it always iterates pq.readIndex++ - pq.currentlyDispatchedItems = append(pq.currentlyDispatchedItems, index) getOp := storage.GetOperation(getItemKey(index)) err := pq.client.Batch(ctx, @@ -274,21 +274,26 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error), // Increase the reference count, so the client is not closed while the request is being processed. pq.refClient++ return request, func(consumeErr error) { + // Delete the item from the persistent storage after it was processed. + pq.mu.Lock() + // Always unref client even if the consumer is shutdown because we always ref it for every valid request. + defer func() { + if err = pq.unrefClient(ctx); err != nil { + pq.set.Logger.Error("Error closing the storage client", zap.Error(err)) + } + pq.mu.Unlock() + }() + if errors.As(consumeErr, &shutdownErr{}) { // The queue is shutting down, don't mark the item as dispatched, so it's picked up again after restart. // TODO: Handle partially delivered requests by updating their values in the storage. return } - // Delete the item from the persistent storage after it was processed. - pq.mu.Lock() - defer pq.mu.Unlock() if err = pq.itemDispatchingFinish(ctx, index); err != nil { pq.set.Logger.Error("Error deleting item from queue", zap.Error(err)) } - if err = pq.unrefClient(ctx); err != nil { - pq.set.Logger.Error("Error closing the storage client", zap.Error(err)) - } + }, true }