diff --git a/internal/pkg/service/stream/storage/metacleanup/metacleanup.go b/internal/pkg/service/stream/storage/metacleanup/metacleanup.go index 35180e25ee..e6fbba2909 100644 --- a/internal/pkg/service/stream/storage/metacleanup/metacleanup.go +++ b/internal/pkg/service/stream/storage/metacleanup/metacleanup.go @@ -180,7 +180,7 @@ func (n *Node) cleanMetadata(ctx context.Context) (err error) { // Log/trace job details attrs := job.Telemetry() - ctx = ctxattr.ContextWith(ctx, attrs...) + ctx := ctxattr.ContextWith(ctx, attrs...) // Trace each job ctx, span := n.telemetry.Tracer().Start(ctx, "keboola.go.stream.model.cleanup.metadata.cleanJob") diff --git a/internal/pkg/service/stream/storage/node/coordinator/filerotation/operator.go b/internal/pkg/service/stream/storage/node/coordinator/filerotation/operator.go index 81261a27bd..5c74e29bf5 100644 --- a/internal/pkg/service/stream/storage/node/coordinator/filerotation/operator.go +++ b/internal/pkg/service/stream/storage/node/coordinator/filerotation/operator.go @@ -497,7 +497,7 @@ func (o *operator) waitForFileClosing(ctx context.Context, file *fileData) (stat // Make sure the statistics cache is up-to-date if err := o.statisticsCache.WaitForRevision(ctx, file.ModRevision); err != nil { - return statistics.Aggregated{}, errors.PrefixError(err, "error when waiting for statistics cache revision") + return statistics.Aggregated{}, errors.PrefixErrorf(err, "error when waiting for statistics cache revision, actual: %v, expected: %v", o.statisticsCache.Revision(), file.ModRevision) } // Get file statistics