Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Extract out data store metrics for re-usability #138

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions storage/cached_rawstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ import (

"github.com/flyteorg/flytestdlib/errors"

"github.com/coocood/freecache"
"github.com/prometheus/client_golang/prometheus"

"github.com/flyteorg/flytestdlib/promutils"

"github.com/coocood/freecache"
"github.com/flyteorg/flytestdlib/ioutils"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
)

const neverExpire = 0
Expand All @@ -31,7 +30,6 @@ type cacheMetrics struct {
type cachedRawStore struct {
RawStore
cache *freecache.Cache
scope promutils.Scope
metrics *cacheMetrics
}

Expand Down Expand Up @@ -102,24 +100,26 @@ func (s *cachedRawStore) WriteRaw(ctx context.Context, reference DataReference,
return err
}

func newCacheMetrics(scope promutils.Scope) *cacheMetrics {
return &cacheMetrics{
FetchLatency: scope.MustNewStopWatch("remote_fetch", "Total Time to read from remote metastore", time.Millisecond),
CacheHit: scope.MustNewCounter("cache_hit", "Number of times metadata was found in cache"),
CacheMiss: scope.MustNewCounter("cache_miss", "Number of times metadata was not found in cache and remote fetch was required"),
CacheWriteError: scope.MustNewCounter("cache_write_err", "Failed to write to cache"),
}
}

// Creates a CachedStore if Caching is enabled, otherwise returns a RawStore
func newCachedRawStore(cfg *Config, store RawStore, scope promutils.Scope) RawStore {
func newCachedRawStore(cfg *Config, store RawStore, metrics *cacheMetrics) RawStore {
if cfg.Cache.MaxSizeMegabytes > 0 {
c := &cachedRawStore{
RawStore: store,
cache: freecache.NewCache(cfg.Cache.MaxSizeMegabytes * 1024 * 1024),
scope: scope,
metrics: &cacheMetrics{
FetchLatency: scope.MustNewStopWatch("remote_fetch", "Total Time to read from remote metastore", time.Millisecond),
CacheHit: scope.MustNewCounter("cache_hit", "Number of times metadata was found in cache"),
CacheMiss: scope.MustNewCounter("cache_miss", "Number of times metadata was not found in cache and remote fetch was required"),
CacheWriteError: scope.MustNewCounter("cache_write_err", "Failed to write to cache"),
},
}
if cfg.Cache.TargetGCPercent > 0 {
debug.SetGCPercent(cfg.Cache.TargetGCPercent)
}
return c
return &cachedRawStore{
RawStore: store,
cache: freecache.NewCache(cfg.Cache.MaxSizeMegabytes * 1024 * 1024),
metrics: metrics,
}
}
return store
}
35 changes: 10 additions & 25 deletions storage/cached_rawstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,57 +10,44 @@ import (
"runtime/debug"
"testing"

"github.com/flyteorg/flytestdlib/contextutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"

"github.com/flyteorg/flytestdlib/promutils"
"github.com/stretchr/testify/assert"

"github.com/flyteorg/flytestdlib/ioutils"
"github.com/stretchr/testify/assert"
)

func TestNewCachedStore(t *testing.T) {
resetMetricKeys()

t.Run("CachingDisabled", func(t *testing.T) {
testScope := promutils.NewTestScope()
cfg := &Config{}
assert.Nil(t, newCachedRawStore(cfg, nil, testScope))
store, err := NewInMemoryRawStore(cfg, testScope)
assert.Nil(t, newCachedRawStore(cfg, nil, metrics.cacheMetrics))
store, err := NewInMemoryRawStore(cfg, metrics)
assert.NoError(t, err)
assert.Equal(t, store, newCachedRawStore(cfg, store, testScope))
assert.Equal(t, store, newCachedRawStore(cfg, store, metrics.cacheMetrics))
})

t.Run("CachingEnabled", func(t *testing.T) {
testScope := promutils.NewTestScope()
cfg := &Config{
Cache: CachingConfig{
MaxSizeMegabytes: 1,
TargetGCPercent: 20,
},
}
store, err := NewInMemoryRawStore(cfg, testScope)
store, err := NewInMemoryRawStore(cfg, metrics)
assert.NoError(t, err)
cStore := newCachedRawStore(cfg, store, testScope)
cStore := newCachedRawStore(cfg, store, metrics.cacheMetrics)
assert.Equal(t, 20, debug.SetGCPercent(100))
assert.NotNil(t, cStore)
assert.NotNil(t, cStore.(*cachedRawStore).cache)
})
}

func resetMetricKeys() {
labeled.UnsetMetricKeys()
labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, contextutils.TaskIDKey)
}

func dummyCacheStore(t *testing.T, store RawStore, scope promutils.Scope) *cachedRawStore {
func dummyCacheStore(t *testing.T, store RawStore, metrics *cacheMetrics) *cachedRawStore {
cfg := &Config{
Cache: CachingConfig{
MaxSizeMegabytes: 1,
TargetGCPercent: 20,
},
}
cStore := newCachedRawStore(cfg, store, scope)
cStore := newCachedRawStore(cfg, store, metrics)
assert.NotNil(t, cStore)
return cStore.(*cachedRawStore)
}
Expand Down Expand Up @@ -94,7 +81,6 @@ func (d *dummyStore) WriteRaw(ctx context.Context, reference DataReference, size
}

func TestCachedRawStore(t *testing.T) {
resetMetricKeys()
ctx := context.TODO()
k1 := DataReference("k1")
k2 := DataReference("k2")
Expand Down Expand Up @@ -145,11 +131,10 @@ func TestCachedRawStore(t *testing.T) {
return nil, fmt.Errorf("err")
},
}
testScope := promutils.NewTestScope()

store.copyImpl = newCopyImpl(store, testScope.NewSubScope("copy"))
store.copyImpl = newCopyImpl(store, metrics.copyMetrics)

cStore := dummyCacheStore(t, store, testScope.NewSubScope("x"))
cStore := dummyCacheStore(t, store, metrics.cacheMetrics)

t.Run("HeadExists", func(t *testing.T) {
m, err := cStore.Head(ctx, k1)
Expand Down
14 changes: 7 additions & 7 deletions storage/copy_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ import (
"io"
"time"

"github.com/flyteorg/flytestdlib/logger"
errs "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/flyteorg/flytestdlib/ioutils"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"
errs "github.com/pkg/errors"
)

type copyImpl struct {
rawStore RawStore
metrics copyMetrics
metrics *copyMetrics
}

type copyMetrics struct {
Expand Down Expand Up @@ -64,18 +64,18 @@ func (c copyImpl) CopyRaw(ctx context.Context, source, destination DataReference
return nil
}

func newCopyMetrics(scope promutils.Scope) copyMetrics {
return copyMetrics{
func newCopyMetrics(scope promutils.Scope) *copyMetrics {
return &copyMetrics{
CopyLatency: labeled.NewStopWatch("overall", "Overall copy latency", time.Millisecond, scope, labeled.EmitUnlabeledMetric),
ComputeLengthLatency: labeled.NewStopWatch("length", "Latency involved in computing length of content before writing.", time.Millisecond, scope, labeled.EmitUnlabeledMetric),
WriteFailureUnrelatedToCache: scope.MustNewCounter("write_failure_unrelated_to_cache", "Raw store write failures that are not caused by ErrFailedToWriteCache"),
ReadFailureUnrelatedToCache: scope.MustNewCounter("read_failure_unrelated_to_cache", "Raw store read failures that are not caused by ErrFailedToWriteCache"),
}
}

func newCopyImpl(store RawStore, metricsScope promutils.Scope) copyImpl {
iaroslav-ciupin marked this conversation as resolved.
Show resolved Hide resolved
func newCopyImpl(store RawStore, metrics *copyMetrics) copyImpl {
return copyImpl{
rawStore: store,
metrics: newCopyMetrics(metricsScope.NewSubScope("copy")),
metrics: metrics,
}
}
14 changes: 6 additions & 8 deletions storage/copy_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (

"github.com/flyteorg/flytestdlib/errors"

"github.com/flyteorg/flytestdlib/ioutils"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/stretchr/testify/assert"

"github.com/flyteorg/flytestdlib/ioutils"
)

type notSeekerReader struct {
Expand Down Expand Up @@ -44,7 +44,6 @@ func newNotSeekerReader(bytesCount int) *notSeekerReader {
}

func TestCopyRaw(t *testing.T) {
resetMetricKeys()
t.Run("Called", func(t *testing.T) {
readerCalled := false
writerCalled := false
Expand All @@ -59,7 +58,7 @@ func TestCopyRaw(t *testing.T) {
},
}

copier := newCopyImpl(&store, promutils.NewTestScope())
copier := newCopyImpl(&store, metrics.copyMetrics)
assert.NoError(t, copier.CopyRaw(context.Background(), DataReference("source.pb"), DataReference("dest.pb"), Options{}))
assert.True(t, readerCalled)
assert.True(t, writerCalled)
Expand All @@ -79,15 +78,14 @@ func TestCopyRaw(t *testing.T) {
},
}

copier := newCopyImpl(&store, promutils.NewTestScope())
copier := newCopyImpl(&store, metrics.copyMetrics)
assert.NoError(t, copier.CopyRaw(context.Background(), DataReference("source.pb"), DataReference("dest.pb"), Options{}))
assert.True(t, readerCalled)
assert.True(t, writerCalled)
})
}

func TestCopyRaw_CachingErrorHandling(t *testing.T) {
resetMetricKeys()
t.Run("CopyRaw with Caching Error", func(t *testing.T) {
readerCalled := false
writerCalled := false
Expand All @@ -108,7 +106,7 @@ func TestCopyRaw_CachingErrorHandling(t *testing.T) {
},
}

copier := newCopyImpl(&store, promutils.NewTestScope())
copier := newCopyImpl(&store, metrics.copyMetrics)
assert.NoError(t, copier.CopyRaw(context.Background(), DataReference("source.pb"), DataReference("dest.pb"), Options{}))
assert.True(t, readerCalled)
assert.True(t, writerCalled)
Expand All @@ -134,7 +132,7 @@ func TestCopyRaw_CachingErrorHandling(t *testing.T) {
},
}

copier := newCopyImpl(&store, promutils.NewTestScope())
copier := newCopyImpl(&store, metrics.copyMetrics)
err = copier.CopyRaw(context.Background(), DataReference("source.pb"), DataReference("dest.pb"), Options{})
assert.Error(t, err)
assert.True(t, readerCalled)
Expand Down
15 changes: 15 additions & 0 deletions storage/init_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package storage

import (
"github.com/flyteorg/flytestdlib/contextutils"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"
)

var metrics *dataStoreMetrics

func init() {
labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, contextutils.TaskIDKey)
scope := promutils.NewTestScope()
metrics = newDataStoreMetrics(scope)
}
6 changes: 2 additions & 4 deletions storage/mem_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"io"
"io/ioutil"
"os"

"github.com/flyteorg/flytestdlib/promutils"
)

type rawFile = []byte
Expand Down Expand Up @@ -70,11 +68,11 @@ func (s *InMemoryStore) CreateSignedURL(ctx context.Context, reference DataRefer
return SignedURLResponse{}, fmt.Errorf("unsupported")
}

func NewInMemoryRawStore(_ *Config, scope promutils.Scope) (RawStore, error) {
func NewInMemoryRawStore(_ *Config, metrics *dataStoreMetrics) (RawStore, error) {
self := &InMemoryStore{
cache: map[DataReference]rawFile{},
}

self.copyImpl = newCopyImpl(self, scope)
self.copyImpl = newCopyImpl(self, metrics.copyMetrics)
return self, nil
}
17 changes: 5 additions & 12 deletions storage/mem_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,20 @@ import (
"context"
"testing"

"github.com/flyteorg/flytestdlib/promutils"

"github.com/stretchr/testify/assert"
)

func TestInMemoryStore_Head(t *testing.T) {
t.Run("Empty store", func(t *testing.T) {
testScope := promutils.NewTestScope()
s, err := NewInMemoryRawStore(&Config{}, testScope)
s, err := NewInMemoryRawStore(&Config{}, metrics)
assert.NoError(t, err)
metadata, err := s.Head(context.TODO(), DataReference("hello"))
assert.NoError(t, err)
assert.False(t, metadata.Exists())
})

t.Run("Existing Item", func(t *testing.T) {
testScope := promutils.NewTestScope()
s, err := NewInMemoryRawStore(&Config{}, testScope)
s, err := NewInMemoryRawStore(&Config{}, metrics)
assert.NoError(t, err)
err = s.WriteRaw(context.TODO(), DataReference("hello"), 0, Options{}, bytes.NewReader([]byte{}))
assert.NoError(t, err)
Expand All @@ -35,8 +31,7 @@ func TestInMemoryStore_Head(t *testing.T) {

func TestInMemoryStore_ReadRaw(t *testing.T) {
t.Run("Empty store", func(t *testing.T) {
testScope := promutils.NewTestScope()
s, err := NewInMemoryRawStore(&Config{}, testScope)
s, err := NewInMemoryRawStore(&Config{}, metrics)
assert.NoError(t, err)

raw, err := s.ReadRaw(context.TODO(), DataReference("hello"))
Expand All @@ -45,8 +40,7 @@ func TestInMemoryStore_ReadRaw(t *testing.T) {
})

t.Run("Existing Item", func(t *testing.T) {
testScope := promutils.NewTestScope()
s, err := NewInMemoryRawStore(&Config{}, testScope)
s, err := NewInMemoryRawStore(&Config{}, metrics)
assert.NoError(t, err)

err = s.WriteRaw(context.TODO(), DataReference("hello"), 0, Options{}, bytes.NewReader([]byte{}))
Expand All @@ -58,8 +52,7 @@ func TestInMemoryStore_ReadRaw(t *testing.T) {
}

func TestInMemoryStore_Clear(t *testing.T) {
testScope := promutils.NewTestScope()
m, err := NewInMemoryRawStore(&Config{}, testScope)
m, err := NewInMemoryRawStore(&Config{}, metrics)
assert.NoError(t, err)

mStore := m.(*InMemoryStore)
Expand Down
Loading