Skip to content

Commit

Permalink
Extract out data store metrics for re-usability (flyteorg#138)
Browse files Browse the repository at this point in the history
* Extract out data store metrics for re-usability and add RefreshConfig method to DataStore
* Add some tests to improve coverage
  • Loading branch information
iaroslav-ciupin authored Aug 16, 2022
1 parent c9998d3 commit bcfa537
Show file tree
Hide file tree
Showing 13 changed files with 194 additions and 181 deletions.
34 changes: 17 additions & 17 deletions storage/cached_rawstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ import (

"github.com/flyteorg/flytestdlib/errors"

"github.com/coocood/freecache"
"github.com/prometheus/client_golang/prometheus"

"github.com/flyteorg/flytestdlib/promutils"

"github.com/coocood/freecache"
"github.com/flyteorg/flytestdlib/ioutils"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
)

const neverExpire = 0
Expand All @@ -31,7 +30,6 @@ type cacheMetrics struct {
type cachedRawStore struct {
RawStore
cache *freecache.Cache
scope promutils.Scope
metrics *cacheMetrics
}

Expand Down Expand Up @@ -102,24 +100,26 @@ func (s *cachedRawStore) WriteRaw(ctx context.Context, reference DataReference,
return err
}

func newCacheMetrics(scope promutils.Scope) *cacheMetrics {
return &cacheMetrics{
FetchLatency: scope.MustNewStopWatch("remote_fetch", "Total Time to read from remote metastore", time.Millisecond),
CacheHit: scope.MustNewCounter("cache_hit", "Number of times metadata was found in cache"),
CacheMiss: scope.MustNewCounter("cache_miss", "Number of times metadata was not found in cache and remote fetch was required"),
CacheWriteError: scope.MustNewCounter("cache_write_err", "Failed to write to cache"),
}
}

// Creates a CachedStore if Caching is enabled, otherwise returns a RawStore
func newCachedRawStore(cfg *Config, store RawStore, scope promutils.Scope) RawStore {
func newCachedRawStore(cfg *Config, store RawStore, metrics *cacheMetrics) RawStore {
if cfg.Cache.MaxSizeMegabytes > 0 {
c := &cachedRawStore{
RawStore: store,
cache: freecache.NewCache(cfg.Cache.MaxSizeMegabytes * 1024 * 1024),
scope: scope,
metrics: &cacheMetrics{
FetchLatency: scope.MustNewStopWatch("remote_fetch", "Total Time to read from remote metastore", time.Millisecond),
CacheHit: scope.MustNewCounter("cache_hit", "Number of times metadata was found in cache"),
CacheMiss: scope.MustNewCounter("cache_miss", "Number of times metadata was not found in cache and remote fetch was required"),
CacheWriteError: scope.MustNewCounter("cache_write_err", "Failed to write to cache"),
},
}
if cfg.Cache.TargetGCPercent > 0 {
debug.SetGCPercent(cfg.Cache.TargetGCPercent)
}
return c
return &cachedRawStore{
RawStore: store,
cache: freecache.NewCache(cfg.Cache.MaxSizeMegabytes * 1024 * 1024),
metrics: metrics,
}
}
return store
}
35 changes: 10 additions & 25 deletions storage/cached_rawstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,57 +10,44 @@ import (
"runtime/debug"
"testing"

"github.com/flyteorg/flytestdlib/contextutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"

"github.com/flyteorg/flytestdlib/promutils"
"github.com/stretchr/testify/assert"

"github.com/flyteorg/flytestdlib/ioutils"
"github.com/stretchr/testify/assert"
)

func TestNewCachedStore(t *testing.T) {
resetMetricKeys()

t.Run("CachingDisabled", func(t *testing.T) {
testScope := promutils.NewTestScope()
cfg := &Config{}
assert.Nil(t, newCachedRawStore(cfg, nil, testScope))
store, err := NewInMemoryRawStore(cfg, testScope)
assert.Nil(t, newCachedRawStore(cfg, nil, metrics.cacheMetrics))
store, err := NewInMemoryRawStore(cfg, metrics)
assert.NoError(t, err)
assert.Equal(t, store, newCachedRawStore(cfg, store, testScope))
assert.Equal(t, store, newCachedRawStore(cfg, store, metrics.cacheMetrics))
})

t.Run("CachingEnabled", func(t *testing.T) {
testScope := promutils.NewTestScope()
cfg := &Config{
Cache: CachingConfig{
MaxSizeMegabytes: 1,
TargetGCPercent: 20,
},
}
store, err := NewInMemoryRawStore(cfg, testScope)
store, err := NewInMemoryRawStore(cfg, metrics)
assert.NoError(t, err)
cStore := newCachedRawStore(cfg, store, testScope)
cStore := newCachedRawStore(cfg, store, metrics.cacheMetrics)
assert.Equal(t, 20, debug.SetGCPercent(100))
assert.NotNil(t, cStore)
assert.NotNil(t, cStore.(*cachedRawStore).cache)
})
}

func resetMetricKeys() {
labeled.UnsetMetricKeys()
labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, contextutils.TaskIDKey)
}

func dummyCacheStore(t *testing.T, store RawStore, scope promutils.Scope) *cachedRawStore {
func dummyCacheStore(t *testing.T, store RawStore, metrics *cacheMetrics) *cachedRawStore {
cfg := &Config{
Cache: CachingConfig{
MaxSizeMegabytes: 1,
TargetGCPercent: 20,
},
}
cStore := newCachedRawStore(cfg, store, scope)
cStore := newCachedRawStore(cfg, store, metrics)
assert.NotNil(t, cStore)
return cStore.(*cachedRawStore)
}
Expand Down Expand Up @@ -94,7 +81,6 @@ func (d *dummyStore) WriteRaw(ctx context.Context, reference DataReference, size
}

func TestCachedRawStore(t *testing.T) {
resetMetricKeys()
ctx := context.TODO()
k1 := DataReference("k1")
k2 := DataReference("k2")
Expand Down Expand Up @@ -145,11 +131,10 @@ func TestCachedRawStore(t *testing.T) {
return nil, fmt.Errorf("err")
},
}
testScope := promutils.NewTestScope()

store.copyImpl = newCopyImpl(store, testScope.NewSubScope("copy"))
store.copyImpl = newCopyImpl(store, metrics.copyMetrics)

cStore := dummyCacheStore(t, store, testScope.NewSubScope("x"))
cStore := dummyCacheStore(t, store, metrics.cacheMetrics)

t.Run("HeadExists", func(t *testing.T) {
m, err := cStore.Head(ctx, k1)
Expand Down
14 changes: 7 additions & 7 deletions storage/copy_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ import (
"io"
"time"

"github.com/flyteorg/flytestdlib/logger"
errs "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/flyteorg/flytestdlib/ioutils"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"
errs "github.com/pkg/errors"
)

type copyImpl struct {
rawStore RawStore
metrics copyMetrics
metrics *copyMetrics
}

type copyMetrics struct {
Expand Down Expand Up @@ -64,18 +64,18 @@ func (c copyImpl) CopyRaw(ctx context.Context, source, destination DataReference
return nil
}

func newCopyMetrics(scope promutils.Scope) copyMetrics {
return copyMetrics{
func newCopyMetrics(scope promutils.Scope) *copyMetrics {
return &copyMetrics{
CopyLatency: labeled.NewStopWatch("overall", "Overall copy latency", time.Millisecond, scope, labeled.EmitUnlabeledMetric),
ComputeLengthLatency: labeled.NewStopWatch("length", "Latency involved in computing length of content before writing.", time.Millisecond, scope, labeled.EmitUnlabeledMetric),
WriteFailureUnrelatedToCache: scope.MustNewCounter("write_failure_unrelated_to_cache", "Raw store write failures that are not caused by ErrFailedToWriteCache"),
ReadFailureUnrelatedToCache: scope.MustNewCounter("read_failure_unrelated_to_cache", "Raw store read failures that are not caused by ErrFailedToWriteCache"),
}
}

func newCopyImpl(store RawStore, metricsScope promutils.Scope) copyImpl {
func newCopyImpl(store RawStore, metrics *copyMetrics) copyImpl {
return copyImpl{
rawStore: store,
metrics: newCopyMetrics(metricsScope.NewSubScope("copy")),
metrics: metrics,
}
}
14 changes: 6 additions & 8 deletions storage/copy_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (

"github.com/flyteorg/flytestdlib/errors"

"github.com/flyteorg/flytestdlib/ioutils"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/stretchr/testify/assert"

"github.com/flyteorg/flytestdlib/ioutils"
)

type notSeekerReader struct {
Expand Down Expand Up @@ -44,7 +44,6 @@ func newNotSeekerReader(bytesCount int) *notSeekerReader {
}

func TestCopyRaw(t *testing.T) {
resetMetricKeys()
t.Run("Called", func(t *testing.T) {
readerCalled := false
writerCalled := false
Expand All @@ -59,7 +58,7 @@ func TestCopyRaw(t *testing.T) {
},
}

copier := newCopyImpl(&store, promutils.NewTestScope())
copier := newCopyImpl(&store, metrics.copyMetrics)
assert.NoError(t, copier.CopyRaw(context.Background(), DataReference("source.pb"), DataReference("dest.pb"), Options{}))
assert.True(t, readerCalled)
assert.True(t, writerCalled)
Expand All @@ -79,15 +78,14 @@ func TestCopyRaw(t *testing.T) {
},
}

copier := newCopyImpl(&store, promutils.NewTestScope())
copier := newCopyImpl(&store, metrics.copyMetrics)
assert.NoError(t, copier.CopyRaw(context.Background(), DataReference("source.pb"), DataReference("dest.pb"), Options{}))
assert.True(t, readerCalled)
assert.True(t, writerCalled)
})
}

func TestCopyRaw_CachingErrorHandling(t *testing.T) {
resetMetricKeys()
t.Run("CopyRaw with Caching Error", func(t *testing.T) {
readerCalled := false
writerCalled := false
Expand All @@ -108,7 +106,7 @@ func TestCopyRaw_CachingErrorHandling(t *testing.T) {
},
}

copier := newCopyImpl(&store, promutils.NewTestScope())
copier := newCopyImpl(&store, metrics.copyMetrics)
assert.NoError(t, copier.CopyRaw(context.Background(), DataReference("source.pb"), DataReference("dest.pb"), Options{}))
assert.True(t, readerCalled)
assert.True(t, writerCalled)
Expand All @@ -134,7 +132,7 @@ func TestCopyRaw_CachingErrorHandling(t *testing.T) {
},
}

copier := newCopyImpl(&store, promutils.NewTestScope())
copier := newCopyImpl(&store, metrics.copyMetrics)
err = copier.CopyRaw(context.Background(), DataReference("source.pb"), DataReference("dest.pb"), Options{})
assert.Error(t, err)
assert.True(t, readerCalled)
Expand Down
15 changes: 15 additions & 0 deletions storage/init_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package storage

import (
"github.com/flyteorg/flytestdlib/contextutils"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"
)

var metrics *dataStoreMetrics

func init() {
labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, contextutils.TaskIDKey)
scope := promutils.NewTestScope()
metrics = newDataStoreMetrics(scope)
}
6 changes: 2 additions & 4 deletions storage/mem_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"io"
"io/ioutil"
"os"

"github.com/flyteorg/flytestdlib/promutils"
)

type rawFile = []byte
Expand Down Expand Up @@ -70,11 +68,11 @@ func (s *InMemoryStore) CreateSignedURL(ctx context.Context, reference DataRefer
return SignedURLResponse{}, fmt.Errorf("unsupported")
}

func NewInMemoryRawStore(_ *Config, scope promutils.Scope) (RawStore, error) {
func NewInMemoryRawStore(_ *Config, metrics *dataStoreMetrics) (RawStore, error) {
self := &InMemoryStore{
cache: map[DataReference]rawFile{},
}

self.copyImpl = newCopyImpl(self, scope)
self.copyImpl = newCopyImpl(self, metrics.copyMetrics)
return self, nil
}
17 changes: 5 additions & 12 deletions storage/mem_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,20 @@ import (
"context"
"testing"

"github.com/flyteorg/flytestdlib/promutils"

"github.com/stretchr/testify/assert"
)

func TestInMemoryStore_Head(t *testing.T) {
t.Run("Empty store", func(t *testing.T) {
testScope := promutils.NewTestScope()
s, err := NewInMemoryRawStore(&Config{}, testScope)
s, err := NewInMemoryRawStore(&Config{}, metrics)
assert.NoError(t, err)
metadata, err := s.Head(context.TODO(), DataReference("hello"))
assert.NoError(t, err)
assert.False(t, metadata.Exists())
})

t.Run("Existing Item", func(t *testing.T) {
testScope := promutils.NewTestScope()
s, err := NewInMemoryRawStore(&Config{}, testScope)
s, err := NewInMemoryRawStore(&Config{}, metrics)
assert.NoError(t, err)
err = s.WriteRaw(context.TODO(), DataReference("hello"), 0, Options{}, bytes.NewReader([]byte{}))
assert.NoError(t, err)
Expand All @@ -35,8 +31,7 @@ func TestInMemoryStore_Head(t *testing.T) {

func TestInMemoryStore_ReadRaw(t *testing.T) {
t.Run("Empty store", func(t *testing.T) {
testScope := promutils.NewTestScope()
s, err := NewInMemoryRawStore(&Config{}, testScope)
s, err := NewInMemoryRawStore(&Config{}, metrics)
assert.NoError(t, err)

raw, err := s.ReadRaw(context.TODO(), DataReference("hello"))
Expand All @@ -45,8 +40,7 @@ func TestInMemoryStore_ReadRaw(t *testing.T) {
})

t.Run("Existing Item", func(t *testing.T) {
testScope := promutils.NewTestScope()
s, err := NewInMemoryRawStore(&Config{}, testScope)
s, err := NewInMemoryRawStore(&Config{}, metrics)
assert.NoError(t, err)

err = s.WriteRaw(context.TODO(), DataReference("hello"), 0, Options{}, bytes.NewReader([]byte{}))
Expand All @@ -58,8 +52,7 @@ func TestInMemoryStore_ReadRaw(t *testing.T) {
}

func TestInMemoryStore_Clear(t *testing.T) {
testScope := promutils.NewTestScope()
m, err := NewInMemoryRawStore(&Config{}, testScope)
m, err := NewInMemoryRawStore(&Config{}, metrics)
assert.NoError(t, err)

mStore := m.(*InMemoryStore)
Expand Down
Loading

0 comments on commit bcfa537

Please sign in to comment.