From 7515974b48cd05f5143a8968013ee389e4de4ed1 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Fri, 21 Feb 2020 10:56:58 +0000 Subject: [PATCH] compactor: Put important compaction logs as info; init object store metrics. (#2162) Signed-off-by: Bartlomiej Plotka --- pkg/compact/compact.go | 4 +- pkg/objstore/objstore.go | 90 +++++++++++++++++++++------------------- 2 files changed, 49 insertions(+), 45 deletions(-) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 3a963783f8..b268724e10 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -690,7 +690,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( if overlappingBlocks { cg.verticalCompactions.Inc() } - level.Debug(cg.logger).Log("msg", "compacted blocks", + level.Info(cg.logger).Log("msg", "compacted blocks", "blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin), "overlapping_blocks", overlappingBlocks) bdir := filepath.Join(dir, compID.String()) @@ -732,7 +732,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( if err := block.Upload(ctx, cg.logger, cg.bkt, bdir); err != nil { return false, ulid.ULID{}, retry(errors.Wrapf(err, "upload of %s failed", compID)) } - level.Debug(cg.logger).Log("msg", "uploaded block", "result_block", compID, "duration", time.Since(begin)) + level.Info(cg.logger).Log("msg", "uploaded block", "result_block", compID, "duration", time.Since(begin)) // Delete the blocks we just compacted from the group and bucket so they do not get included // into the next planning cycle. diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go index e187be3c62..ef8fc4ca4a 100644 --- a/pkg/objstore/objstore.go +++ b/pkg/objstore/objstore.go @@ -188,6 +188,16 @@ func Exists(ctx context.Context, bkt Bucket, src string) (bool, error) { return true, nil } +const ( + iterOp = "iter" + sizeOp = "objectsize" + getOp = "get" + getRangeOp = "get_range" + existsOp = "exists" + uploadOp = "upload" + deleteOp = "delete" +) + // BucketWithMetrics takes a bucket and registers metrics with the given registry for // operations run against the bucket. func BucketWithMetrics(name string, b Bucket, r prometheus.Registerer) Bucket { @@ -212,13 +222,19 @@ func BucketWithMetrics(name string, b Bucket, r prometheus.Registerer) Bucket { ConstLabels: prometheus.Labels{"bucket": name}, Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, }, []string{"operation"}), - lastSuccessfullUploadTime: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + lastSuccessfulUploadTime: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "thanos_objstore_bucket_last_successful_upload_time", Help: "Second timestamp of the last successful upload to the bucket.", }, []string{"bucket"}), } if r != nil { - r.MustRegister(bkt.ops, bkt.opsFailures, bkt.opsDuration, bkt.lastSuccessfullUploadTime) + r.MustRegister(bkt.ops, bkt.opsFailures, bkt.opsDuration, bkt.lastSuccessfulUploadTime) + for _, op := range []string{iterOp, sizeOp, getOp, getRangeOp, existsOp, uploadOp, deleteOp} { + bkt.ops.WithLabelValues(op) + bkt.opsFailures.WithLabelValues(op) + bkt.opsDuration.WithLabelValues(op) + } + bkt.lastSuccessfulUploadTime.WithLabelValues(b.Name()) } return bkt } @@ -226,117 +242,105 @@ func BucketWithMetrics(name string, b Bucket, r prometheus.Registerer) Bucket { type metricBucket struct { bkt Bucket - ops *prometheus.CounterVec - opsFailures *prometheus.CounterVec - opsDuration *prometheus.HistogramVec - lastSuccessfullUploadTime *prometheus.GaugeVec + ops *prometheus.CounterVec + opsFailures *prometheus.CounterVec + opsDuration *prometheus.HistogramVec + lastSuccessfulUploadTime *prometheus.GaugeVec } func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string) error) error { - const op = "iter" - err := b.bkt.Iter(ctx, dir, f) if err != nil { - b.opsFailures.WithLabelValues(op).Inc() + b.opsFailures.WithLabelValues(iterOp).Inc() } - b.ops.WithLabelValues(op).Inc() + b.ops.WithLabelValues(iterOp).Inc() return err } // ObjectSize returns the size of the specified object. func (b *metricBucket) ObjectSize(ctx context.Context, name string) (uint64, error) { - const op = "objectsize" - b.ops.WithLabelValues(op).Inc() + b.ops.WithLabelValues(sizeOp).Inc() start := time.Now() rc, err := b.bkt.ObjectSize(ctx, name) if err != nil { - b.opsFailures.WithLabelValues(op).Inc() + b.opsFailures.WithLabelValues(sizeOp).Inc() return 0, err } - b.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds()) + b.opsDuration.WithLabelValues(sizeOp).Observe(time.Since(start).Seconds()) return rc, nil } func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { - const op = "get" - b.ops.WithLabelValues(op).Inc() + b.ops.WithLabelValues(getOp).Inc() rc, err := b.bkt.Get(ctx, name) if err != nil { - b.opsFailures.WithLabelValues(op).Inc() + b.opsFailures.WithLabelValues(getOp).Inc() return nil, err } - rc = newTimingReadCloser( + return newTimingReadCloser( rc, - op, + getOp, b.opsDuration, b.opsFailures, - ) - - return rc, nil + ), nil } func (b *metricBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { - const op = "get_range" - b.ops.WithLabelValues(op).Inc() + b.ops.WithLabelValues(getRangeOp).Inc() rc, err := b.bkt.GetRange(ctx, name, off, length) if err != nil { - b.opsFailures.WithLabelValues(op).Inc() + b.opsFailures.WithLabelValues(getRangeOp).Inc() return nil, err } - rc = newTimingReadCloser( + return newTimingReadCloser( rc, - op, + getRangeOp, b.opsDuration, b.opsFailures, - ) - - return rc, nil + ), nil } func (b *metricBucket) Exists(ctx context.Context, name string) (bool, error) { - const op = "exists" start := time.Now() ok, err := b.bkt.Exists(ctx, name) if err != nil { - b.opsFailures.WithLabelValues(op).Inc() + b.opsFailures.WithLabelValues(existsOp).Inc() } - b.ops.WithLabelValues(op).Inc() - b.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds()) + b.ops.WithLabelValues(existsOp).Inc() + b.opsDuration.WithLabelValues(existsOp).Observe(time.Since(start).Seconds()) return ok, err } func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) error { - const op = "upload" start := time.Now() err := b.bkt.Upload(ctx, name, r) if err != nil { - b.opsFailures.WithLabelValues(op).Inc() + b.opsFailures.WithLabelValues(uploadOp).Inc() } else { - b.lastSuccessfullUploadTime.WithLabelValues(b.bkt.Name()).SetToCurrentTime() + b.lastSuccessfulUploadTime.WithLabelValues(b.bkt.Name()).SetToCurrentTime() } - b.ops.WithLabelValues(op).Inc() - b.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds()) + b.ops.WithLabelValues(uploadOp).Inc() + b.opsDuration.WithLabelValues(uploadOp).Observe(time.Since(start).Seconds()) return err } func (b *metricBucket) Delete(ctx context.Context, name string) error { - const op = "delete" start := time.Now() err := b.bkt.Delete(ctx, name) if err != nil { - b.opsFailures.WithLabelValues(op).Inc() + b.opsFailures.WithLabelValues(deleteOp).Inc() } - b.ops.WithLabelValues(op).Inc() - b.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds()) + b.ops.WithLabelValues(deleteOp).Inc() + b.opsDuration.WithLabelValues(deleteOp).Observe(time.Since(start).Seconds()) return err }