Skip to content

Commit

Permalink
compactor: Put important compaction logs as info; init object store m…
Browse files Browse the repository at this point in the history
…etrics. (#2162)

Signed-off-by: Bartlomiej Plotka <[email protected]>
  • Loading branch information
bwplotka authored Feb 21, 2020
1 parent f278950 commit 7515974
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 45 deletions.
4 changes: 2 additions & 2 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
if overlappingBlocks {
cg.verticalCompactions.Inc()
}
level.Debug(cg.logger).Log("msg", "compacted blocks",
level.Info(cg.logger).Log("msg", "compacted blocks",
"blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin), "overlapping_blocks", overlappingBlocks)

bdir := filepath.Join(dir, compID.String())
Expand Down Expand Up @@ -732,7 +732,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (
if err := block.Upload(ctx, cg.logger, cg.bkt, bdir); err != nil {
return false, ulid.ULID{}, retry(errors.Wrapf(err, "upload of %s failed", compID))
}
level.Debug(cg.logger).Log("msg", "uploaded block", "result_block", compID, "duration", time.Since(begin))
level.Info(cg.logger).Log("msg", "uploaded block", "result_block", compID, "duration", time.Since(begin))

// Delete the blocks we just compacted from the group and bucket so they do not get included
// into the next planning cycle.
Expand Down
90 changes: 47 additions & 43 deletions pkg/objstore/objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,16 @@ func Exists(ctx context.Context, bkt Bucket, src string) (bool, error) {
return true, nil
}

const (
iterOp = "iter"
sizeOp = "objectsize"
getOp = "get"
getRangeOp = "get_range"
existsOp = "exists"
uploadOp = "upload"
deleteOp = "delete"
)

// BucketWithMetrics takes a bucket and registers metrics with the given registry for
// operations run against the bucket.
func BucketWithMetrics(name string, b Bucket, r prometheus.Registerer) Bucket {
Expand All @@ -212,131 +222,125 @@ func BucketWithMetrics(name string, b Bucket, r prometheus.Registerer) Bucket {
ConstLabels: prometheus.Labels{"bucket": name},
Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120},
}, []string{"operation"}),
lastSuccessfullUploadTime: prometheus.NewGaugeVec(prometheus.GaugeOpts{
lastSuccessfulUploadTime: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "thanos_objstore_bucket_last_successful_upload_time",
Help: "Second timestamp of the last successful upload to the bucket.",
}, []string{"bucket"}),
}
if r != nil {
r.MustRegister(bkt.ops, bkt.opsFailures, bkt.opsDuration, bkt.lastSuccessfullUploadTime)
r.MustRegister(bkt.ops, bkt.opsFailures, bkt.opsDuration, bkt.lastSuccessfulUploadTime)
for _, op := range []string{iterOp, sizeOp, getOp, getRangeOp, existsOp, uploadOp, deleteOp} {
bkt.ops.WithLabelValues(op)
bkt.opsFailures.WithLabelValues(op)
bkt.opsDuration.WithLabelValues(op)
}
bkt.lastSuccessfulUploadTime.WithLabelValues(b.Name())
}
return bkt
}

type metricBucket struct {
bkt Bucket

ops *prometheus.CounterVec
opsFailures *prometheus.CounterVec
opsDuration *prometheus.HistogramVec
lastSuccessfullUploadTime *prometheus.GaugeVec
ops *prometheus.CounterVec
opsFailures *prometheus.CounterVec
opsDuration *prometheus.HistogramVec
lastSuccessfulUploadTime *prometheus.GaugeVec
}

func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string) error) error {
const op = "iter"

err := b.bkt.Iter(ctx, dir, f)
if err != nil {
b.opsFailures.WithLabelValues(op).Inc()
b.opsFailures.WithLabelValues(iterOp).Inc()
}
b.ops.WithLabelValues(op).Inc()
b.ops.WithLabelValues(iterOp).Inc()

return err
}

// ObjectSize returns the size of the specified object.
func (b *metricBucket) ObjectSize(ctx context.Context, name string) (uint64, error) {
const op = "objectsize"
b.ops.WithLabelValues(op).Inc()
b.ops.WithLabelValues(sizeOp).Inc()
start := time.Now()

rc, err := b.bkt.ObjectSize(ctx, name)
if err != nil {
b.opsFailures.WithLabelValues(op).Inc()
b.opsFailures.WithLabelValues(sizeOp).Inc()
return 0, err
}
b.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds())
b.opsDuration.WithLabelValues(sizeOp).Observe(time.Since(start).Seconds())
return rc, nil
}

func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
const op = "get"
b.ops.WithLabelValues(op).Inc()
b.ops.WithLabelValues(getOp).Inc()

rc, err := b.bkt.Get(ctx, name)
if err != nil {
b.opsFailures.WithLabelValues(op).Inc()
b.opsFailures.WithLabelValues(getOp).Inc()
return nil, err
}
rc = newTimingReadCloser(
return newTimingReadCloser(
rc,
op,
getOp,
b.opsDuration,
b.opsFailures,
)

return rc, nil
), nil
}

func (b *metricBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
const op = "get_range"
b.ops.WithLabelValues(op).Inc()
b.ops.WithLabelValues(getRangeOp).Inc()

rc, err := b.bkt.GetRange(ctx, name, off, length)
if err != nil {
b.opsFailures.WithLabelValues(op).Inc()
b.opsFailures.WithLabelValues(getRangeOp).Inc()
return nil, err
}
rc = newTimingReadCloser(
return newTimingReadCloser(
rc,
op,
getRangeOp,
b.opsDuration,
b.opsFailures,
)

return rc, nil
), nil
}

func (b *metricBucket) Exists(ctx context.Context, name string) (bool, error) {
const op = "exists"
start := time.Now()

ok, err := b.bkt.Exists(ctx, name)
if err != nil {
b.opsFailures.WithLabelValues(op).Inc()
b.opsFailures.WithLabelValues(existsOp).Inc()
}
b.ops.WithLabelValues(op).Inc()
b.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds())
b.ops.WithLabelValues(existsOp).Inc()
b.opsDuration.WithLabelValues(existsOp).Observe(time.Since(start).Seconds())

return ok, err
}

func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) error {
const op = "upload"
start := time.Now()

err := b.bkt.Upload(ctx, name, r)
if err != nil {
b.opsFailures.WithLabelValues(op).Inc()
b.opsFailures.WithLabelValues(uploadOp).Inc()
} else {
b.lastSuccessfullUploadTime.WithLabelValues(b.bkt.Name()).SetToCurrentTime()
b.lastSuccessfulUploadTime.WithLabelValues(b.bkt.Name()).SetToCurrentTime()
}
b.ops.WithLabelValues(op).Inc()
b.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds())
b.ops.WithLabelValues(uploadOp).Inc()
b.opsDuration.WithLabelValues(uploadOp).Observe(time.Since(start).Seconds())

return err
}

func (b *metricBucket) Delete(ctx context.Context, name string) error {
const op = "delete"
start := time.Now()

err := b.bkt.Delete(ctx, name)
if err != nil {
b.opsFailures.WithLabelValues(op).Inc()
b.opsFailures.WithLabelValues(deleteOp).Inc()
}
b.ops.WithLabelValues(op).Inc()
b.opsDuration.WithLabelValues(op).Observe(time.Since(start).Seconds())
b.ops.WithLabelValues(deleteOp).Inc()
b.opsDuration.WithLabelValues(deleteOp).Observe(time.Since(start).Seconds())

return err
}
Expand Down

0 comments on commit 7515974

Please sign in to comment.