Skip to content

Commit

Permalink
2020 05 tracing bucket (#2676)
Browse files Browse the repository at this point in the history
* Added tracing bucket and cache.

Signed-off-by: Peter Štibraný <[email protected]>

* Don't finish span before calling function.

Signed-off-by: Peter Štibraný <[email protected]>

* Added extra details.

Signed-off-by: Peter Štibraný <[email protected]>

* Configure store to use tracing bucket and cache.

Signed-off-by: Peter Štibraný <[email protected]>

* Don't use nameless embeds.

Signed-off-by: Peter Štibraný <[email protected]>

* Comments, headers.

Signed-off-by: Peter Štibraný <[email protected]>

* Fix compilation error after recent changes.

Signed-off-by: Peter Štibraný <[email protected]>
  • Loading branch information
pstibrany authored May 29, 2020
1 parent 63ef382 commit c5dd131
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 11 deletions.
41 changes: 41 additions & 0 deletions pkg/cache/tracing_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package cache

import (
"context"
"time"

"github.com/opentracing/opentracing-go"

"github.com/thanos-io/thanos/pkg/tracing"
)

// TracingCache includes Fetch operation in the traces.
type TracingCache struct {
c Cache
}

func NewTracingCache(cache Cache) Cache {
return TracingCache{c: cache}
}

func (t TracingCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) {
t.c.Store(ctx, data, ttl)
}

func (t TracingCache) Fetch(ctx context.Context, keys []string) (result map[string][]byte) {
tracing.DoWithSpan(ctx, "cache_fetch", func(spanCtx context.Context, span opentracing.Span) {
span.LogKV("requested keys", len(keys))

result = t.c.Fetch(spanCtx, keys)

bytes := 0
for _, v := range result {
bytes += len(v)
}
span.LogKV("returned keys", len(result), "returned bytes", bytes)
})
return
}
2 changes: 1 addition & 1 deletion pkg/objstore/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,5 @@ func NewBucket(logger log.Logger, confContentYaml []byte, reg prometheus.Registe
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("create %s client", bucketConf.Type))
}
return objstore.BucketWithMetrics(bucket.Name(), bucket, reg), nil
return objstore.NewTracingBucket(objstore.BucketWithMetrics(bucket.Name(), bucket, reg)), nil
}
143 changes: 143 additions & 0 deletions pkg/objstore/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package objstore

import (
"context"
"io"

"github.com/opentracing/opentracing-go"

"github.com/thanos-io/thanos/pkg/tracing"
)

// TracingBucket includes bucket operations in the traces.
type TracingBucket struct {
bkt Bucket
}

func NewTracingBucket(bkt Bucket) InstrumentedBucket {
return TracingBucket{bkt: bkt}
}

func (t TracingBucket) Iter(ctx context.Context, dir string, f func(string) error) (err error) {
tracing.DoWithSpan(ctx, "bucket_iter", func(spanCtx context.Context, span opentracing.Span) {
span.LogKV("dir", dir)
err = t.bkt.Iter(spanCtx, dir, f)
})
return
}

func (t TracingBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
span, spanCtx := tracing.StartSpan(ctx, "bucket_get")
span.LogKV("name", name)

r, err := t.bkt.Get(spanCtx, name)
if err != nil {
span.LogKV("err", err)
span.Finish()
return nil, err
}

return &tracingReadCloser{r: r, s: span}, nil
}

func (t TracingBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
span, spanCtx := tracing.StartSpan(ctx, "bucket_getrange")
span.LogKV("name", name, "offset", off, "length", length)

r, err := t.bkt.GetRange(spanCtx, name, off, length)
if err != nil {
span.LogKV("err", err)
span.Finish()
return nil, err
}

return &tracingReadCloser{r: r, s: span}, nil
}

func (t TracingBucket) Exists(ctx context.Context, name string) (exists bool, err error) {
tracing.DoWithSpan(ctx, "bucket_exists", func(spanCtx context.Context, span opentracing.Span) {
span.LogKV("name", name)
exists, err = t.bkt.Exists(spanCtx, name)
})
return
}

func (t TracingBucket) Attributes(ctx context.Context, name string) (attrs ObjectAttributes, err error) {
tracing.DoWithSpan(ctx, "bucket_attributes", func(spanCtx context.Context, span opentracing.Span) {
span.LogKV("name", name)
attrs, err = t.bkt.Attributes(spanCtx, name)
})
return
}

func (t TracingBucket) Upload(ctx context.Context, name string, r io.Reader) (err error) {
tracing.DoWithSpan(ctx, "bucket_upload", func(spanCtx context.Context, span opentracing.Span) {
span.LogKV("name", name)
err = t.bkt.Upload(spanCtx, name, r)
})
return
}

func (t TracingBucket) Delete(ctx context.Context, name string) (err error) {
tracing.DoWithSpan(ctx, "bucket_delete", func(spanCtx context.Context, span opentracing.Span) {
span.LogKV("name", name)
err = t.bkt.Delete(spanCtx, name)
})
return
}

func (t TracingBucket) Name() string {
return "tracing: " + t.bkt.Name()
}

func (t TracingBucket) Close() error {
return t.bkt.Close()
}

func (t TracingBucket) IsObjNotFoundErr(err error) bool {
return t.bkt.IsObjNotFoundErr(err)
}

func (t TracingBucket) WithExpectedErrs(expectedFunc IsOpFailureExpectedFunc) Bucket {
if ib, ok := t.bkt.(InstrumentedBucket); ok {
return TracingBucket{bkt: ib.WithExpectedErrs(expectedFunc)}
}
return t
}

func (t TracingBucket) ReaderWithExpectedErrs(expectedFunc IsOpFailureExpectedFunc) BucketReader {
return t.WithExpectedErrs(expectedFunc)
}

type tracingReadCloser struct {
r io.ReadCloser
s opentracing.Span
read int
}

func (t *tracingReadCloser) Read(p []byte) (int, error) {
n, err := t.r.Read(p)
if n > 0 {
t.read += n
}
if err != nil && err != io.EOF && t.s != nil {
t.s.LogKV("err", err)
}
return n, err
}

func (t *tracingReadCloser) Close() error {
err := t.r.Close()
if t.s != nil {
t.s.LogKV("read", t.read)
if err != nil {
t.s.LogKV("close err", err)
}
t.s.Finish()
t.s = nil
}
return err
}
10 changes: 1 addition & 9 deletions pkg/store/cache/caching_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/thanos-io/thanos/pkg/cache"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/tracing"
)

const (
Expand Down Expand Up @@ -280,14 +279,7 @@ func (cb *CachingBucket) GetRange(ctx context.Context, name string, off, length
return cb.Bucket.GetRange(ctx, name, off, length)
}

var (
r io.ReadCloser
err error
)
tracing.DoInSpan(ctx, "cachingbucket_getrange", func(ctx context.Context) {
r, err = cb.cachedGetRange(ctx, name, off, length, cfgName, cfg)
})
return r, err
return cb.cachedGetRange(ctx, name, off, length, cfgName, cfg)
}

func (cb *CachingBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/store/cache/caching_bucket_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ func NewCachingBucketFromYaml(yamlContent []byte, bucket objstore.Bucket, logger
return nil, errors.Errorf("unsupported cache type: %s", config.Type)
}

// Include interactions with cache in the traces.
c = cache.NewTracingCache(c)
cfg := NewCachingBucketConfig()

// Configure cache.
Expand Down
10 changes: 9 additions & 1 deletion pkg/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ func StartSpan(ctx context.Context, operationName string, opts ...opentracing.St
// It uses opentracing.Tracer propagated in context. If no found, it uses noop tracer notification.
func DoInSpan(ctx context.Context, operationName string, doFn func(context.Context), opts ...opentracing.StartSpanOption) {
span, newCtx := StartSpan(ctx, operationName, opts...)
defer doFn(newCtx)
defer span.Finish()
doFn(newCtx)
}

// DoWithSpan executes function doFn inside new span with `operationName` name and hooking as child to a span found within given context if any.
// It uses opentracing.Tracer propagated in context. If no found, it uses noop tracer notification.
func DoWithSpan(ctx context.Context, operationName string, doFn func(context.Context, opentracing.Span), opts ...opentracing.StartSpanOption) {
span, newCtx := StartSpan(ctx, operationName, opts...)
defer span.Finish()
doFn(newCtx, span)
}

0 comments on commit c5dd131

Please sign in to comment.