Skip to content

Commit

Permalink
Reuse operation name constants for consistency
Browse files Browse the repository at this point in the history
Signed-off-by: Kemal Akkoyun <[email protected]>
  • Loading branch information
kakkoyun committed Aug 5, 2020
1 parent 040b69b commit e468a1a
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 84 deletions.
49 changes: 25 additions & 24 deletions pkg/objstore/objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,20 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/thanos-io/thanos/pkg/runutil"
)

const (
OpIter = "iter"
OpGet = "get"
OpGetRange = "get_range"
OpExists = "exists"
OpUpload = "upload"
OpDelete = "delete"
OpAttributes = "attributes"
)

// Bucket provides read and write access to an object storage bucket.
// NOTE: We assume strong consistency for write-read flow.
type Bucket interface {
Expand Down Expand Up @@ -218,16 +229,6 @@ func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, src,
return nil
}

const (
iterOp = "iter"
getOp = "get"
getRangeOp = "get_range"
existsOp = "exists"
uploadOp = "upload"
deleteOp = "delete"
attributesOp = "attributes"
)

// IsOpFailureExpectedFunc allows to mark certain errors as expected, so they will not increment thanos_objstore_bucket_operation_failures_total metric.
type IsOpFailureExpectedFunc func(error) bool

Expand Down Expand Up @@ -263,13 +264,13 @@ func BucketWithMetrics(name string, b Bucket, reg prometheus.Registerer) *metric
}, []string{"bucket"}),
}
for _, op := range []string{
iterOp,
getOp,
getRangeOp,
existsOp,
uploadOp,
deleteOp,
attributesOp,
OpIter,
OpGet,
OpGetRange,
OpExists,
OpUpload,
OpDelete,
OpAttributes,
} {
bkt.ops.WithLabelValues(op)
bkt.opsFailures.WithLabelValues(op)
Expand Down Expand Up @@ -306,7 +307,7 @@ func (b *metricBucket) ReaderWithExpectedErrs(fn IsOpFailureExpectedFunc) Bucket
}

func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string) error) error {
const op = iterOp
const op = OpIter
b.ops.WithLabelValues(op).Inc()

err := b.bkt.Iter(ctx, dir, f)
Expand All @@ -317,7 +318,7 @@ func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string)
}

func (b *metricBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) {
const op = attributesOp
const op = OpAttributes
b.ops.WithLabelValues(op).Inc()

start := time.Now()
Expand All @@ -333,7 +334,7 @@ func (b *metricBucket) Attributes(ctx context.Context, name string) (ObjectAttri
}

func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
const op = getOp
const op = OpGet
b.ops.WithLabelValues(op).Inc()

rc, err := b.bkt.Get(ctx, name)
Expand All @@ -353,7 +354,7 @@ func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, err
}

func (b *metricBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
const op = getRangeOp
const op = OpGetRange
b.ops.WithLabelValues(op).Inc()

rc, err := b.bkt.GetRange(ctx, name, off, length)
Expand All @@ -373,7 +374,7 @@ func (b *metricBucket) GetRange(ctx context.Context, name string, off, length in
}

func (b *metricBucket) Exists(ctx context.Context, name string) (bool, error) {
const op = existsOp
const op = OpExists
b.ops.WithLabelValues(op).Inc()

start := time.Now()
Expand All @@ -389,7 +390,7 @@ func (b *metricBucket) Exists(ctx context.Context, name string) (bool, error) {
}

func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) error {
const op = uploadOp
const op = OpUpload
b.ops.WithLabelValues(op).Inc()

start := time.Now()
Expand All @@ -405,7 +406,7 @@ func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) err
}

func (b *metricBucket) Delete(ctx context.Context, name string) error {
const op = deleteOp
const op = OpDelete
b.ops.WithLabelValues(op).Inc()

start := time.Now()
Expand Down
57 changes: 29 additions & 28 deletions pkg/objstore/objstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

promtest "github.com/prometheus/client_golang/prometheus/testutil"

"github.com/thanos-io/thanos/pkg/testutil"
)

Expand All @@ -18,21 +19,21 @@ func TestMetricBucket_Close(t *testing.T) {
testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsDuration))

AcceptanceTest(t, bkt.WithExpectedErrs(bkt.IsObjNotFoundErr))
testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(iterOp)))
testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(attributesOp)))
testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.ops.WithLabelValues(getOp)))
testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.ops.WithLabelValues(getRangeOp)))
testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(existsOp)))
testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(uploadOp)))
testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(deleteOp)))
testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(OpIter)))
testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(OpAttributes)))
testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.ops.WithLabelValues(OpGet)))
testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.ops.WithLabelValues(OpGetRange)))
testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(OpExists)))
testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(OpUpload)))
testutil.Equals(t, float64(2), promtest.ToFloat64(bkt.ops.WithLabelValues(OpDelete)))
testutil.Equals(t, 7, promtest.CollectAndCount(bkt.ops))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(iterOp)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(attributesOp)))
testutil.Equals(t, float64(1), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(getOp)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(getRangeOp)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(existsOp)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(uploadOp)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(deleteOp)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpIter)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpAttributes)))
testutil.Equals(t, float64(1), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpGet)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpGetRange)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpExists)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpUpload)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpDelete)))
testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsFailures))
testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsDuration))
lastUpload := promtest.ToFloat64(bkt.lastSuccessfulUploadTime)
Expand All @@ -41,23 +42,23 @@ func TestMetricBucket_Close(t *testing.T) {
// Clear bucket, but don't clear metrics to ensure we use same.
bkt.bkt = NewInMemBucket()
AcceptanceTest(t, bkt)
testutil.Equals(t, float64(12), promtest.ToFloat64(bkt.ops.WithLabelValues(iterOp)))
testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(attributesOp)))
testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(getOp)))
testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(getRangeOp)))
testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(existsOp)))
testutil.Equals(t, float64(12), promtest.ToFloat64(bkt.ops.WithLabelValues(uploadOp)))
testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(deleteOp)))
testutil.Equals(t, float64(12), promtest.ToFloat64(bkt.ops.WithLabelValues(OpIter)))
testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(OpAttributes)))
testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(OpGet)))
testutil.Equals(t, float64(6), promtest.ToFloat64(bkt.ops.WithLabelValues(OpGetRange)))
testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(OpExists)))
testutil.Equals(t, float64(12), promtest.ToFloat64(bkt.ops.WithLabelValues(OpUpload)))
testutil.Equals(t, float64(4), promtest.ToFloat64(bkt.ops.WithLabelValues(OpDelete)))
testutil.Equals(t, 7, promtest.CollectAndCount(bkt.ops))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(iterOp)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpIter)))
// Not expected not found error here.
testutil.Equals(t, float64(1), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(attributesOp)))
testutil.Equals(t, float64(1), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpAttributes)))
// Not expected not found errors, this should increment failure metric on get for not found as well, so +2.
testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(getOp)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(getRangeOp)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(existsOp)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(uploadOp)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(deleteOp)))
testutil.Equals(t, float64(3), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpGet)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpGetRange)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpExists)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpUpload)))
testutil.Equals(t, float64(0), promtest.ToFloat64(bkt.opsFailures.WithLabelValues(OpDelete)))
testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsFailures))
testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsDuration))
testutil.Assert(t, promtest.ToFloat64(bkt.lastSuccessfulUploadTime) > lastUpload)
Expand Down
30 changes: 12 additions & 18 deletions pkg/store/cache/caching_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,6 @@ import (
const (
originCache = "cache"
originBucket = "bucket"

opGet = "get"
opGetRange = "getrange"
opIter = "iter"
opExists = "exists"
opAttributes = "attributes"
)

var errObjNotFound = errors.Errorf("object not found")
Expand Down Expand Up @@ -97,7 +91,7 @@ func NewCachingBucket(b objstore.Bucket, cfg *CachingBucketConfig, logger log.Lo
cb.operationRequests.WithLabelValues(op, n)
cb.operationHits.WithLabelValues(op, n)

if op == opGetRange {
if op == objstore.OpGetRange {
cb.requestedGetRangeBytes.WithLabelValues(n)
cb.fetchedGetRangeBytes.WithLabelValues(originCache, n)
cb.fetchedGetRangeBytes.WithLabelValues(originBucket, n)
Expand Down Expand Up @@ -135,14 +129,14 @@ func (cb *CachingBucket) Iter(ctx context.Context, dir string, f func(string) er
return cb.Bucket.Iter(ctx, dir, f)
}

cb.operationRequests.WithLabelValues(opIter, cfgName).Inc()
cb.operationRequests.WithLabelValues(objstore.OpIter, cfgName).Inc()

key := cachingKeyIter(dir)
data := cfg.cache.Fetch(ctx, []string{key})
if data[key] != nil {
list, err := cfg.codec.Decode(data[key])
if err == nil {
cb.operationHits.WithLabelValues(opIter, cfgName).Inc()
cb.operationHits.WithLabelValues(objstore.OpIter, cfgName).Inc()
for _, n := range list {
if err := f(n); err != nil {
return err
Expand Down Expand Up @@ -180,15 +174,15 @@ func (cb *CachingBucket) Exists(ctx context.Context, name string) (bool, error)
return cb.Bucket.Exists(ctx, name)
}

cb.operationRequests.WithLabelValues(opExists, cfgName).Inc()
cb.operationRequests.WithLabelValues(objstore.OpExists, cfgName).Inc()

key := cachingKeyExists(name)
hits := cfg.cache.Fetch(ctx, []string{key})

if ex := hits[key]; ex != nil {
exists, err := strconv.ParseBool(string(ex))
if err == nil {
cb.operationHits.WithLabelValues(opExists, cfgName).Inc()
cb.operationHits.WithLabelValues(objstore.OpExists, cfgName).Inc()
return exists, nil
}
level.Warn(cb.logger).Log("msg", "unexpected cached 'exists' value", "key", key, "val", string(ex))
Expand Down Expand Up @@ -222,21 +216,21 @@ func (cb *CachingBucket) Get(ctx context.Context, name string) (io.ReadCloser, e
return cb.Bucket.Get(ctx, name)
}

cb.operationRequests.WithLabelValues(opGet, cfgName).Inc()
cb.operationRequests.WithLabelValues(objstore.OpGet, cfgName).Inc()

contentKey := cachingKeyContent(name)
existsKey := cachingKeyExists(name)

hits := cfg.cache.Fetch(ctx, []string{contentKey, existsKey})
if hits[contentKey] != nil {
cb.operationHits.WithLabelValues(opGet, cfgName).Inc()
cb.operationHits.WithLabelValues(objstore.OpGet, cfgName).Inc()
return ioutil.NopCloser(bytes.NewReader(hits[contentKey])), nil
}

// If we know that file doesn't exist, we can return that. Useful for deletion marks.
if ex := hits[existsKey]; ex != nil {
if exists, err := strconv.ParseBool(string(ex)); err == nil && !exists {
cb.operationHits.WithLabelValues(opGet, cfgName).Inc()
cb.operationHits.WithLabelValues(objstore.OpGet, cfgName).Inc()
return nil, errObjNotFound
}
}
Expand Down Expand Up @@ -294,14 +288,14 @@ func (cb *CachingBucket) Attributes(ctx context.Context, name string) (objstore.
func (cb *CachingBucket) cachedAttributes(ctx context.Context, name string, cfgName string, cache cache.Cache, ttl time.Duration) (objstore.ObjectAttributes, error) {
key := cachingKeyAttributes(name)

cb.operationRequests.WithLabelValues(opAttributes, cfgName).Inc()
cb.operationRequests.WithLabelValues(objstore.OpAttributes, cfgName).Inc()

hits := cache.Fetch(ctx, []string{key})
if raw, ok := hits[key]; ok {
var attrs objstore.ObjectAttributes
err := json.Unmarshal(raw, &attrs)
if err == nil {
cb.operationHits.WithLabelValues(opAttributes, cfgName).Inc()
cb.operationHits.WithLabelValues(objstore.OpAttributes, cfgName).Inc()
return attrs, nil
}

Expand All @@ -323,7 +317,7 @@ func (cb *CachingBucket) cachedAttributes(ctx context.Context, name string, cfgN
}

func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset, length int64, cfgName string, cfg *getRangeConfig) (io.ReadCloser, error) {
cb.operationRequests.WithLabelValues(opGetRange, cfgName).Inc()
cb.operationRequests.WithLabelValues(objstore.OpGetRange, cfgName).Inc()
cb.requestedGetRangeBytes.WithLabelValues(cfgName).Add(float64(length))

attrs, err := cb.cachedAttributes(ctx, name, cfgName, cfg.cache, cfg.attributesTTL)
Expand Down Expand Up @@ -376,7 +370,7 @@ func (cb *CachingBucket) cachedGetRange(ctx context.Context, name string, offset
totalCachedBytes += int64(len(b))
}
cb.fetchedGetRangeBytes.WithLabelValues(originCache, cfgName).Add(float64(totalCachedBytes))
cb.operationHits.WithLabelValues(opGetRange, cfgName).Add(float64(len(hits)) / float64(len(keys)))
cb.operationHits.WithLabelValues(objstore.OpGetRange, cfgName).Add(float64(len(hits)) / float64(len(keys)))

if len(hits) < len(keys) {
if hits == nil {
Expand Down
11 changes: 6 additions & 5 deletions pkg/store/cache/caching_bucket_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/thanos-io/thanos/pkg/cache"
"github.com/thanos-io/thanos/pkg/objstore"
)

// Codec for encoding and decoding results of Iter call.
Expand Down Expand Up @@ -145,19 +146,19 @@ func (cfg *CachingBucketConfig) CacheAttributes(configName string, cache cache.C
func (cfg *CachingBucketConfig) allConfigNames() map[string][]string {
result := map[string][]string{}
for n := range cfg.get {
result[opGet] = append(result[opGet], n)
result[objstore.OpGet] = append(result[objstore.OpGet], n)
}
for n := range cfg.iter {
result[opIter] = append(result[opIter], n)
result[objstore.OpIter] = append(result[objstore.OpIter], n)
}
for n := range cfg.exists {
result[opExists] = append(result[opExists], n)
result[objstore.OpExists] = append(result[objstore.OpExists], n)
}
for n := range cfg.getRange {
result[opGetRange] = append(result[opGetRange], n)
result[objstore.OpGetRange] = append(result[objstore.OpGetRange], n)
}
for n := range cfg.attributes {
result[opAttributes] = append(result[opAttributes], n)
result[objstore.OpAttributes] = append(result[objstore.OpAttributes], n)
}
return result
}
Expand Down
Loading

0 comments on commit e468a1a

Please sign in to comment.