From b8f6b534c0147a438a647d513d7bfcf1fbff6522 Mon Sep 17 00:00:00 2001 From: iaroslav-ciupin Date: Fri, 12 Aug 2022 20:46:57 +0300 Subject: [PATCH 1/5] Extract out data store metrics for re-usability Signed-off-by: iaroslav-ciupin --- storage/cached_rawstore.go | 30 +++++++-------------- storage/copy_impl.go | 17 ++++-------- storage/mem_store.go | 9 ++++--- storage/protobuf_store.go | 22 +++++----------- storage/rawstores.go | 53 +++++++++++++++++++++++++++++++++++--- storage/stow_store.go | 34 ++++++------------------ 6 files changed, 82 insertions(+), 83 deletions(-) diff --git a/storage/cached_rawstore.go b/storage/cached_rawstore.go index f42f01fd..976df01d 100644 --- a/storage/cached_rawstore.go +++ b/storage/cached_rawstore.go @@ -3,19 +3,15 @@ package storage import ( "bytes" "context" + "github.com/flyteorg/flytestdlib/errors" "io" "runtime/debug" - "time" - - "github.com/flyteorg/flytestdlib/errors" - - "github.com/prometheus/client_golang/prometheus" - - "github.com/flyteorg/flytestdlib/promutils" "github.com/coocood/freecache" "github.com/flyteorg/flytestdlib/ioutils" "github.com/flyteorg/flytestdlib/logger" + "github.com/flyteorg/flytestdlib/promutils" + "github.com/prometheus/client_golang/prometheus" ) const neverExpire = 0 @@ -31,7 +27,6 @@ type cacheMetrics struct { type cachedRawStore struct { RawStore cache *freecache.Cache - scope promutils.Scope metrics *cacheMetrics } @@ -103,23 +98,16 @@ func (s *cachedRawStore) WriteRaw(ctx context.Context, reference DataReference, } // Creates a CachedStore if Caching is enabled, otherwise returns a RawStore -func newCachedRawStore(cfg *Config, store RawStore, scope promutils.Scope) RawStore { +func newCachedRawStore(cfg *Config, store RawStore, metrics *DataStoreMetrics) RawStore { if cfg.Cache.MaxSizeMegabytes > 0 { - c := &cachedRawStore{ - RawStore: store, - cache: freecache.NewCache(cfg.Cache.MaxSizeMegabytes * 1024 * 1024), - scope: scope, - metrics: &cacheMetrics{ - FetchLatency: scope.MustNewStopWatch("remote_fetch", "Total Time to read from remote metastore", time.Millisecond), - CacheHit: scope.MustNewCounter("cache_hit", "Number of times metadata was found in cache"), - CacheMiss: scope.MustNewCounter("cache_miss", "Number of times metadata was not found in cache and remote fetch was required"), - CacheWriteError: scope.MustNewCounter("cache_write_err", "Failed to write to cache"), - }, - } if cfg.Cache.TargetGCPercent > 0 { debug.SetGCPercent(cfg.Cache.TargetGCPercent) } - return c + return &cachedRawStore{ + RawStore: store, + cache: freecache.NewCache(cfg.Cache.MaxSizeMegabytes * 1024 * 1024), + metrics: metrics.cacheMetrics, + } } return store } diff --git a/storage/copy_impl.go b/storage/copy_impl.go index 3b32905e..e448989e 100644 --- a/storage/copy_impl.go +++ b/storage/copy_impl.go @@ -6,18 +6,18 @@ import ( "io" "time" - "github.com/flyteorg/flytestdlib/logger" + errs "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/flyteorg/flytestdlib/ioutils" + "github.com/flyteorg/flytestdlib/logger" "github.com/flyteorg/flytestdlib/promutils" "github.com/flyteorg/flytestdlib/promutils/labeled" - errs "github.com/pkg/errors" ) type copyImpl struct { rawStore RawStore - metrics copyMetrics + metrics *copyMetrics } type copyMetrics struct { @@ -64,18 +64,11 @@ func (c copyImpl) CopyRaw(ctx context.Context, source, destination DataReference return nil } -func newCopyMetrics(scope promutils.Scope) copyMetrics { - return copyMetrics{ +func newCopyMetrics(scope promutils.Scope) *copyMetrics { + return ©Metrics{ CopyLatency: labeled.NewStopWatch("overall", "Overall copy latency", time.Millisecond, scope, labeled.EmitUnlabeledMetric), ComputeLengthLatency: labeled.NewStopWatch("length", "Latency involved in computing length of content before writing.", time.Millisecond, scope, labeled.EmitUnlabeledMetric), WriteFailureUnrelatedToCache: scope.MustNewCounter("write_failure_unrelated_to_cache", "Raw store write failures that are not caused by ErrFailedToWriteCache"), ReadFailureUnrelatedToCache: scope.MustNewCounter("read_failure_unrelated_to_cache", "Raw store read failures that are not caused by ErrFailedToWriteCache"), } } - -func newCopyImpl(store RawStore, metricsScope promutils.Scope) copyImpl { - return copyImpl{ - rawStore: store, - metrics: newCopyMetrics(metricsScope.NewSubScope("copy")), - } -} diff --git a/storage/mem_store.go b/storage/mem_store.go index 39e8fe61..a99e4caf 100644 --- a/storage/mem_store.go +++ b/storage/mem_store.go @@ -7,8 +7,6 @@ import ( "io" "io/ioutil" "os" - - "github.com/flyteorg/flytestdlib/promutils" ) type rawFile = []byte @@ -70,11 +68,14 @@ func (s *InMemoryStore) CreateSignedURL(ctx context.Context, reference DataRefer return SignedURLResponse{}, fmt.Errorf("unsupported") } -func NewInMemoryRawStore(_ *Config, scope promutils.Scope) (RawStore, error) { +func NewInMemoryRawStore(_ *Config, metrics *DataStoreMetrics) (RawStore, error) { self := &InMemoryStore{ cache: map[DataReference]rawFile{}, } - self.copyImpl = newCopyImpl(self, scope) + self.copyImpl = copyImpl{ + rawStore: self, + metrics: metrics.copyMetrics, + } return self, nil } diff --git a/storage/protobuf_store.go b/storage/protobuf_store.go index 58b55063..5f9dffb3 100644 --- a/storage/protobuf_store.go +++ b/storage/protobuf_store.go @@ -4,16 +4,14 @@ import ( "bytes" "context" "fmt" - "time" - "github.com/flyteorg/flytestdlib/logger" + "github.com/golang/protobuf/proto" + errs "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/flyteorg/flytestdlib/ioutils" + "github.com/flyteorg/flytestdlib/logger" "github.com/flyteorg/flytestdlib/promutils" - "github.com/prometheus/client_golang/prometheus" - - "github.com/golang/protobuf/proto" - errs "github.com/pkg/errors" ) type protoMetrics struct { @@ -81,17 +79,9 @@ func (s DefaultProtobufStore) WriteProtobuf(ctx context.Context, reference DataR return nil } -func NewDefaultProtobufStore(store RawStore, metricsScope promutils.Scope) DefaultProtobufStore { +func NewDefaultProtobufStore(store RawStore, metrics *DataStoreMetrics) DefaultProtobufStore { return DefaultProtobufStore{ RawStore: store, - metrics: &protoMetrics{ - FetchLatency: metricsScope.MustNewStopWatch("proto_fetch", "Time to read data before unmarshalling", time.Millisecond), - MarshalTime: metricsScope.MustNewStopWatch("marshal", "Time incurred in marshalling data before writing", time.Millisecond), - UnmarshalTime: metricsScope.MustNewStopWatch("unmarshal", "Time incurred in unmarshalling received data", time.Millisecond), - MarshalFailure: metricsScope.MustNewCounter("marshal_failure", "Failures when marshalling"), - UnmarshalFailure: metricsScope.MustNewCounter("unmarshal_failure", "Failures when unmarshalling"), - WriteFailureUnrelatedToCache: metricsScope.MustNewCounter("write_failure_unrelated_to_cache", "Raw store write failures that are not caused by ErrFailedToWriteCache"), - ReadFailureUnrelatedToCache: metricsScope.MustNewCounter("read_failure_unrelated_to_cache", "Raw store read failures that are not caused by ErrFailedToWriteCache"), - }, + metrics: metrics.protoMetrics, } } diff --git a/storage/rawstores.go b/storage/rawstores.go index 40624b1a..705a3c1d 100644 --- a/storage/rawstores.go +++ b/storage/rawstores.go @@ -3,11 +3,13 @@ package storage import ( "fmt" "net/http" + "time" "github.com/flyteorg/flytestdlib/promutils" + "github.com/flyteorg/flytestdlib/promutils/labeled" ) -type dataStoreCreateFn func(cfg *Config, metricsScope promutils.Scope) (RawStore, error) +type dataStoreCreateFn func(cfg *Config, metrics *DataStoreMetrics) (RawStore, error) var stores = map[string]dataStoreCreateFn{ TypeMemory: NewInMemoryRawStore, @@ -54,8 +56,51 @@ func createHTTPClient(cfg HTTPClientConfig) *http.Client { return c } +type DataStoreMetrics struct { + cacheMetrics *cacheMetrics + protoMetrics *protoMetrics + copyMetrics *copyMetrics + stowMetrics *stowMetrics +} + +// NewDataStoreMetrics initialises all metrics required for DataStore +func NewDataStoreMetrics(scope promutils.Scope) *DataStoreMetrics { + failureTypeOption := labeled.AdditionalLabelsOption{Labels: []string{FailureTypeLabel.String()}} + return &DataStoreMetrics{ + cacheMetrics: &cacheMetrics{ + FetchLatency: scope.MustNewStopWatch("remote_fetch", "Total Time to read from remote metastore", time.Millisecond), + CacheHit: scope.MustNewCounter("cache_hit", "Number of times metadata was found in cache"), + CacheMiss: scope.MustNewCounter("cache_miss", "Number of times metadata was not found in cache and remote fetch was required"), + CacheWriteError: scope.MustNewCounter("cache_write_err", "Failed to write to cache"), + }, + protoMetrics: &protoMetrics{ + FetchLatency: scope.MustNewStopWatch("proto_fetch", "Time to read data before unmarshalling", time.Millisecond), + MarshalTime: scope.MustNewStopWatch("marshal", "Time incurred in marshalling data before writing", time.Millisecond), + UnmarshalTime: scope.MustNewStopWatch("unmarshal", "Time incurred in unmarshalling received data", time.Millisecond), + MarshalFailure: scope.MustNewCounter("marshal_failure", "Failures when marshalling"), + UnmarshalFailure: scope.MustNewCounter("unmarshal_failure", "Failures when unmarshalling"), + WriteFailureUnrelatedToCache: scope.MustNewCounter("write_failure_unrelated_to_cache", "Raw store write failures that are not caused by ErrFailedToWriteCache"), + ReadFailureUnrelatedToCache: scope.MustNewCounter("read_failure_unrelated_to_cache", "Raw store read failures that are not caused by ErrFailedToWriteCache"), + }, + copyMetrics: newCopyMetrics(scope.NewSubScope("copy")), + stowMetrics: &stowMetrics{ + BadReference: labeled.NewCounter("bad_key", "Indicates the provided storage reference/key is incorrectly formatted", scope, labeled.EmitUnlabeledMetric), + BadContainer: labeled.NewCounter("bad_container", "Indicates request for a container that has not been initialized", scope, labeled.EmitUnlabeledMetric), + + HeadFailure: labeled.NewCounter("head_failure", "Indicates failure in HEAD for a given reference", scope, labeled.EmitUnlabeledMetric), + HeadLatency: labeled.NewStopWatch("head", "Indicates time to fetch metadata using the Head API", time.Millisecond, scope, labeled.EmitUnlabeledMetric), + + ReadFailure: labeled.NewCounter("read_failure", "Indicates failure in GET for a given reference", scope, labeled.EmitUnlabeledMetric, failureTypeOption), + ReadOpenLatency: labeled.NewStopWatch("read_open", "Indicates time to first byte when reading", time.Millisecond, scope, labeled.EmitUnlabeledMetric), + + WriteFailure: labeled.NewCounter("write_failure", "Indicates failure in storing/PUT for a given reference", scope, labeled.EmitUnlabeledMetric, failureTypeOption), + WriteLatency: labeled.NewStopWatch("write", "Time to write an object irrespective of size", time.Millisecond, scope, labeled.EmitUnlabeledMetric), + }, + } +} + // NewDataStore creates a new Data Store with the supplied config. -func NewDataStore(cfg *Config, metricsScope promutils.Scope) (s *DataStore, err error) { +func NewDataStore(cfg *Config, metrics *DataStoreMetrics) (s *DataStore, err error) { defaultClient := http.DefaultClient defer func() { http.DefaultClient = defaultClient @@ -65,12 +110,12 @@ func NewDataStore(cfg *Config, metricsScope promutils.Scope) (s *DataStore, err var rawStore RawStore if fn, found := stores[cfg.Type]; found { - rawStore, err = fn(cfg, metricsScope) + rawStore, err = fn(cfg, metrics) if err != nil { return &emptyStore, err } - protoStore := NewDefaultProtobufStore(newCachedRawStore(cfg, rawStore, metricsScope), metricsScope) + protoStore := NewDefaultProtobufStore(newCachedRawStore(cfg, rawStore, metrics), metrics) return NewCompositeDataStore(NewURLPathConstructor(), protoStore), nil } diff --git a/storage/stow_store.go b/storage/stow_store.go index 67933906..138800a4 100644 --- a/storage/stow_store.go +++ b/storage/stow_store.go @@ -7,27 +7,22 @@ import ( "net/url" "strconv" "sync" - "time" - - "github.com/flyteorg/flytestdlib/errors" "github.com/aws/aws-sdk-go/aws/awserr" s32 "github.com/aws/aws-sdk-go/service/s3" + "github.com/flyteorg/stow" "github.com/flyteorg/stow/azure" "github.com/flyteorg/stow/google" "github.com/flyteorg/stow/local" "github.com/flyteorg/stow/oracle" "github.com/flyteorg/stow/s3" "github.com/flyteorg/stow/swift" + errs "github.com/pkg/errors" "github.com/flyteorg/flytestdlib/contextutils" + "github.com/flyteorg/flytestdlib/errors" "github.com/flyteorg/flytestdlib/logger" "github.com/flyteorg/flytestdlib/promutils/labeled" - - "github.com/flyteorg/flytestdlib/promutils" - - "github.com/flyteorg/stow" - errs "github.com/pkg/errors" ) const ( @@ -345,30 +340,17 @@ func (s *StowStore) getLocation(id locationID) stow.Location { } } -func NewStowRawStore(baseContainerFQN DataReference, loc, signedURLLoc stow.Location, enableDynamicContainerLoading bool, metricsScope promutils.Scope) (*StowStore, error) { - failureTypeOption := labeled.AdditionalLabelsOption{Labels: []string{FailureTypeLabel.String()}} +func NewStowRawStore(baseContainerFQN DataReference, loc, signedURLLoc stow.Location, enableDynamicContainerLoading bool, metrics *DataStoreMetrics) (*StowStore, error) { self := &StowStore{ loc: loc, signedURLLoc: signedURLLoc, baseContainerFQN: baseContainerFQN, enableDynamicContainerLoading: enableDynamicContainerLoading, dynamicContainerMap: sync.Map{}, - metrics: &stowMetrics{ - BadReference: labeled.NewCounter("bad_key", "Indicates the provided storage reference/key is incorrectly formatted", metricsScope, labeled.EmitUnlabeledMetric), - BadContainer: labeled.NewCounter("bad_container", "Indicates request for a container that has not been initialized", metricsScope, labeled.EmitUnlabeledMetric), - - HeadFailure: labeled.NewCounter("head_failure", "Indicates failure in HEAD for a given reference", metricsScope, labeled.EmitUnlabeledMetric), - HeadLatency: labeled.NewStopWatch("head", "Indicates time to fetch metadata using the Head API", time.Millisecond, metricsScope, labeled.EmitUnlabeledMetric), - - ReadFailure: labeled.NewCounter("read_failure", "Indicates failure in GET for a given reference", metricsScope, labeled.EmitUnlabeledMetric, failureTypeOption), - ReadOpenLatency: labeled.NewStopWatch("read_open", "Indicates time to first byte when reading", time.Millisecond, metricsScope, labeled.EmitUnlabeledMetric), - - WriteFailure: labeled.NewCounter("write_failure", "Indicates failure in storing/PUT for a given reference", metricsScope, labeled.EmitUnlabeledMetric, failureTypeOption), - WriteLatency: labeled.NewStopWatch("write", "Time to write an object irrespective of size", time.Millisecond, metricsScope, labeled.EmitUnlabeledMetric), - }, + metrics: metrics.stowMetrics, } - self.copyImpl = newCopyImpl(self, metricsScope) + self.copyImpl = copyImpl{rawStore: self, metrics: metrics.copyMetrics} _, c, _, err := baseContainerFQN.Split() if err != nil { return nil, err @@ -382,7 +364,7 @@ func NewStowRawStore(baseContainerFQN DataReference, loc, signedURLLoc stow.Loca } // Constructor for the StowRawStore -func newStowRawStore(cfg *Config, metricsScope promutils.Scope) (RawStore, error) { +func newStowRawStore(cfg *Config, metrics *DataStoreMetrics) (RawStore, error) { if cfg.InitContainer == "" { return nil, fmt.Errorf("initContainer is required even with `enable-multicontainer`") } @@ -419,7 +401,7 @@ func newStowRawStore(cfg *Config, metricsScope promutils.Scope) (RawStore, error } } - return NewStowRawStore(fn(cfg.InitContainer), loc, signedURLLoc, cfg.MultiContainerEnabled, metricsScope) + return NewStowRawStore(fn(cfg.InitContainer), loc, signedURLLoc, cfg.MultiContainerEnabled, metrics) } func legacyS3ConfigMap(cfg ConnectionConfig) stow.ConfigMap { From 0a019f97a42b37e0f29dfb22738d2d814fa234b6 Mon Sep 17 00:00:00 2001 From: iaroslav-ciupin Date: Sat, 13 Aug 2022 09:39:01 +0300 Subject: [PATCH 2/5] adjust constructor Signed-off-by: iaroslav-ciupin --- storage/cached_rawstore.go | 10 +++-- storage/cached_rawstore_test.go | 35 +++++------------- storage/copy_impl.go | 7 ++++ storage/copy_impl_test.go | 14 +++---- storage/init_test.go | 15 ++++++++ storage/mem_store.go | 7 +--- storage/mem_store_test.go | 17 +++------ storage/protobuf_store.go | 4 +- storage/protobuf_store_test.go | 3 +- storage/rawstores.go | 54 +++++++++++++++------------ storage/storage.go | 1 + storage/stow_store.go | 6 +-- storage/stow_store_test.go | 65 +++++++++++---------------------- 13 files changed, 110 insertions(+), 128 deletions(-) create mode 100644 storage/init_test.go diff --git a/storage/cached_rawstore.go b/storage/cached_rawstore.go index 976df01d..a7dd4229 100644 --- a/storage/cached_rawstore.go +++ b/storage/cached_rawstore.go @@ -3,15 +3,17 @@ package storage import ( "bytes" "context" - "github.com/flyteorg/flytestdlib/errors" "io" "runtime/debug" + "github.com/flyteorg/flytestdlib/errors" + "github.com/coocood/freecache" + "github.com/prometheus/client_golang/prometheus" + "github.com/flyteorg/flytestdlib/ioutils" "github.com/flyteorg/flytestdlib/logger" "github.com/flyteorg/flytestdlib/promutils" - "github.com/prometheus/client_golang/prometheus" ) const neverExpire = 0 @@ -98,7 +100,7 @@ func (s *cachedRawStore) WriteRaw(ctx context.Context, reference DataReference, } // Creates a CachedStore if Caching is enabled, otherwise returns a RawStore -func newCachedRawStore(cfg *Config, store RawStore, metrics *DataStoreMetrics) RawStore { +func newCachedRawStore(cfg *Config, store RawStore, metrics *cacheMetrics) RawStore { if cfg.Cache.MaxSizeMegabytes > 0 { if cfg.Cache.TargetGCPercent > 0 { debug.SetGCPercent(cfg.Cache.TargetGCPercent) @@ -106,7 +108,7 @@ func newCachedRawStore(cfg *Config, store RawStore, metrics *DataStoreMetrics) R return &cachedRawStore{ RawStore: store, cache: freecache.NewCache(cfg.Cache.MaxSizeMegabytes * 1024 * 1024), - metrics: metrics.cacheMetrics, + metrics: metrics, } } return store diff --git a/storage/cached_rawstore_test.go b/storage/cached_rawstore_test.go index 28372525..50a03cba 100644 --- a/storage/cached_rawstore_test.go +++ b/storage/cached_rawstore_test.go @@ -10,57 +10,44 @@ import ( "runtime/debug" "testing" - "github.com/flyteorg/flytestdlib/contextutils" - "github.com/flyteorg/flytestdlib/promutils/labeled" - - "github.com/flyteorg/flytestdlib/promutils" + "github.com/stretchr/testify/assert" "github.com/flyteorg/flytestdlib/ioutils" - "github.com/stretchr/testify/assert" ) func TestNewCachedStore(t *testing.T) { - resetMetricKeys() - t.Run("CachingDisabled", func(t *testing.T) { - testScope := promutils.NewTestScope() cfg := &Config{} - assert.Nil(t, newCachedRawStore(cfg, nil, testScope)) - store, err := NewInMemoryRawStore(cfg, testScope) + assert.Nil(t, newCachedRawStore(cfg, nil, metrics.cacheMetrics)) + store, err := NewInMemoryRawStore(cfg, metrics) assert.NoError(t, err) - assert.Equal(t, store, newCachedRawStore(cfg, store, testScope)) + assert.Equal(t, store, newCachedRawStore(cfg, store, metrics.cacheMetrics)) }) t.Run("CachingEnabled", func(t *testing.T) { - testScope := promutils.NewTestScope() cfg := &Config{ Cache: CachingConfig{ MaxSizeMegabytes: 1, TargetGCPercent: 20, }, } - store, err := NewInMemoryRawStore(cfg, testScope) + store, err := NewInMemoryRawStore(cfg, metrics) assert.NoError(t, err) - cStore := newCachedRawStore(cfg, store, testScope) + cStore := newCachedRawStore(cfg, store, metrics.cacheMetrics) assert.Equal(t, 20, debug.SetGCPercent(100)) assert.NotNil(t, cStore) assert.NotNil(t, cStore.(*cachedRawStore).cache) }) } -func resetMetricKeys() { - labeled.UnsetMetricKeys() - labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, contextutils.TaskIDKey) -} - -func dummyCacheStore(t *testing.T, store RawStore, scope promutils.Scope) *cachedRawStore { +func dummyCacheStore(t *testing.T, store RawStore, metrics *cacheMetrics) *cachedRawStore { cfg := &Config{ Cache: CachingConfig{ MaxSizeMegabytes: 1, TargetGCPercent: 20, }, } - cStore := newCachedRawStore(cfg, store, scope) + cStore := newCachedRawStore(cfg, store, metrics) assert.NotNil(t, cStore) return cStore.(*cachedRawStore) } @@ -94,7 +81,6 @@ func (d *dummyStore) WriteRaw(ctx context.Context, reference DataReference, size } func TestCachedRawStore(t *testing.T) { - resetMetricKeys() ctx := context.TODO() k1 := DataReference("k1") k2 := DataReference("k2") @@ -145,11 +131,10 @@ func TestCachedRawStore(t *testing.T) { return nil, fmt.Errorf("err") }, } - testScope := promutils.NewTestScope() - store.copyImpl = newCopyImpl(store, testScope.NewSubScope("copy")) + store.copyImpl = newCopyImpl(store, metrics.copyMetrics) - cStore := dummyCacheStore(t, store, testScope.NewSubScope("x")) + cStore := dummyCacheStore(t, store, metrics.cacheMetrics) t.Run("HeadExists", func(t *testing.T) { m, err := cStore.Head(ctx, k1) diff --git a/storage/copy_impl.go b/storage/copy_impl.go index e448989e..5b0bc325 100644 --- a/storage/copy_impl.go +++ b/storage/copy_impl.go @@ -72,3 +72,10 @@ func newCopyMetrics(scope promutils.Scope) *copyMetrics { ReadFailureUnrelatedToCache: scope.MustNewCounter("read_failure_unrelated_to_cache", "Raw store read failures that are not caused by ErrFailedToWriteCache"), } } + +func newCopyImpl(store RawStore, metrics *copyMetrics) copyImpl { + return copyImpl{ + rawStore: store, + metrics: metrics, + } +} diff --git a/storage/copy_impl_test.go b/storage/copy_impl_test.go index 45043276..a31770fc 100644 --- a/storage/copy_impl_test.go +++ b/storage/copy_impl_test.go @@ -9,9 +9,9 @@ import ( "github.com/flyteorg/flytestdlib/errors" - "github.com/flyteorg/flytestdlib/ioutils" - "github.com/flyteorg/flytestdlib/promutils" "github.com/stretchr/testify/assert" + + "github.com/flyteorg/flytestdlib/ioutils" ) type notSeekerReader struct { @@ -44,7 +44,6 @@ func newNotSeekerReader(bytesCount int) *notSeekerReader { } func TestCopyRaw(t *testing.T) { - resetMetricKeys() t.Run("Called", func(t *testing.T) { readerCalled := false writerCalled := false @@ -59,7 +58,7 @@ func TestCopyRaw(t *testing.T) { }, } - copier := newCopyImpl(&store, promutils.NewTestScope()) + copier := newCopyImpl(&store, metrics.copyMetrics) assert.NoError(t, copier.CopyRaw(context.Background(), DataReference("source.pb"), DataReference("dest.pb"), Options{})) assert.True(t, readerCalled) assert.True(t, writerCalled) @@ -79,7 +78,7 @@ func TestCopyRaw(t *testing.T) { }, } - copier := newCopyImpl(&store, promutils.NewTestScope()) + copier := newCopyImpl(&store, metrics.copyMetrics) assert.NoError(t, copier.CopyRaw(context.Background(), DataReference("source.pb"), DataReference("dest.pb"), Options{})) assert.True(t, readerCalled) assert.True(t, writerCalled) @@ -87,7 +86,6 @@ func TestCopyRaw(t *testing.T) { } func TestCopyRaw_CachingErrorHandling(t *testing.T) { - resetMetricKeys() t.Run("CopyRaw with Caching Error", func(t *testing.T) { readerCalled := false writerCalled := false @@ -108,7 +106,7 @@ func TestCopyRaw_CachingErrorHandling(t *testing.T) { }, } - copier := newCopyImpl(&store, promutils.NewTestScope()) + copier := newCopyImpl(&store, metrics.copyMetrics) assert.NoError(t, copier.CopyRaw(context.Background(), DataReference("source.pb"), DataReference("dest.pb"), Options{})) assert.True(t, readerCalled) assert.True(t, writerCalled) @@ -134,7 +132,7 @@ func TestCopyRaw_CachingErrorHandling(t *testing.T) { }, } - copier := newCopyImpl(&store, promutils.NewTestScope()) + copier := newCopyImpl(&store, metrics.copyMetrics) err = copier.CopyRaw(context.Background(), DataReference("source.pb"), DataReference("dest.pb"), Options{}) assert.Error(t, err) assert.True(t, readerCalled) diff --git a/storage/init_test.go b/storage/init_test.go new file mode 100644 index 00000000..b7a4e75a --- /dev/null +++ b/storage/init_test.go @@ -0,0 +1,15 @@ +package storage + +import ( + "github.com/flyteorg/flytestdlib/contextutils" + "github.com/flyteorg/flytestdlib/promutils" + "github.com/flyteorg/flytestdlib/promutils/labeled" +) + +var metrics *dataStoreMetrics + +func init() { + labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, contextutils.TaskIDKey) + scope := promutils.NewTestScope() + metrics = newDataStoreMetrics(scope) +} diff --git a/storage/mem_store.go b/storage/mem_store.go index a99e4caf..956d8263 100644 --- a/storage/mem_store.go +++ b/storage/mem_store.go @@ -68,14 +68,11 @@ func (s *InMemoryStore) CreateSignedURL(ctx context.Context, reference DataRefer return SignedURLResponse{}, fmt.Errorf("unsupported") } -func NewInMemoryRawStore(_ *Config, metrics *DataStoreMetrics) (RawStore, error) { +func NewInMemoryRawStore(_ *Config, metrics *dataStoreMetrics) (RawStore, error) { self := &InMemoryStore{ cache: map[DataReference]rawFile{}, } - self.copyImpl = copyImpl{ - rawStore: self, - metrics: metrics.copyMetrics, - } + self.copyImpl = newCopyImpl(self, metrics.copyMetrics) return self, nil } diff --git a/storage/mem_store_test.go b/storage/mem_store_test.go index ad3bb84b..131136a8 100644 --- a/storage/mem_store_test.go +++ b/storage/mem_store_test.go @@ -5,15 +5,12 @@ import ( "context" "testing" - "github.com/flyteorg/flytestdlib/promutils" - "github.com/stretchr/testify/assert" ) func TestInMemoryStore_Head(t *testing.T) { t.Run("Empty store", func(t *testing.T) { - testScope := promutils.NewTestScope() - s, err := NewInMemoryRawStore(&Config{}, testScope) + s, err := NewInMemoryRawStore(&Config{}, metrics) assert.NoError(t, err) metadata, err := s.Head(context.TODO(), DataReference("hello")) assert.NoError(t, err) @@ -21,8 +18,7 @@ func TestInMemoryStore_Head(t *testing.T) { }) t.Run("Existing Item", func(t *testing.T) { - testScope := promutils.NewTestScope() - s, err := NewInMemoryRawStore(&Config{}, testScope) + s, err := NewInMemoryRawStore(&Config{}, metrics) assert.NoError(t, err) err = s.WriteRaw(context.TODO(), DataReference("hello"), 0, Options{}, bytes.NewReader([]byte{})) assert.NoError(t, err) @@ -35,8 +31,7 @@ func TestInMemoryStore_Head(t *testing.T) { func TestInMemoryStore_ReadRaw(t *testing.T) { t.Run("Empty store", func(t *testing.T) { - testScope := promutils.NewTestScope() - s, err := NewInMemoryRawStore(&Config{}, testScope) + s, err := NewInMemoryRawStore(&Config{}, metrics) assert.NoError(t, err) raw, err := s.ReadRaw(context.TODO(), DataReference("hello")) @@ -45,8 +40,7 @@ func TestInMemoryStore_ReadRaw(t *testing.T) { }) t.Run("Existing Item", func(t *testing.T) { - testScope := promutils.NewTestScope() - s, err := NewInMemoryRawStore(&Config{}, testScope) + s, err := NewInMemoryRawStore(&Config{}, metrics) assert.NoError(t, err) err = s.WriteRaw(context.TODO(), DataReference("hello"), 0, Options{}, bytes.NewReader([]byte{})) @@ -58,8 +52,7 @@ func TestInMemoryStore_ReadRaw(t *testing.T) { } func TestInMemoryStore_Clear(t *testing.T) { - testScope := promutils.NewTestScope() - m, err := NewInMemoryRawStore(&Config{}, testScope) + m, err := NewInMemoryRawStore(&Config{}, metrics) assert.NoError(t, err) mStore := m.(*InMemoryStore) diff --git a/storage/protobuf_store.go b/storage/protobuf_store.go index 5f9dffb3..ac3b56db 100644 --- a/storage/protobuf_store.go +++ b/storage/protobuf_store.go @@ -79,9 +79,9 @@ func (s DefaultProtobufStore) WriteProtobuf(ctx context.Context, reference DataR return nil } -func NewDefaultProtobufStore(store RawStore, metrics *DataStoreMetrics) DefaultProtobufStore { +func NewDefaultProtobufStore(store RawStore, metrics *protoMetrics) DefaultProtobufStore { return DefaultProtobufStore{ RawStore: store, - metrics: metrics.protoMetrics, + metrics: metrics, } } diff --git a/storage/protobuf_store_test.go b/storage/protobuf_store_test.go index 0f2df567..7dff69e8 100644 --- a/storage/protobuf_store_test.go +++ b/storage/protobuf_store_test.go @@ -107,8 +107,7 @@ func TestDefaultProtobufStore_HardErrors(t *testing.T) { return nil, fmt.Errorf(dummyReadErrorMsg) }, } - testScope := promutils.NewTestScope() - pbErroneousStore := NewDefaultProtobufStore(store, testScope) + pbErroneousStore := NewDefaultProtobufStore(store, metrics.protoMetrics) t.Run("Test if hard write errors are handled correctly", func(t *testing.T) { err := pbErroneousStore.WriteProtobuf(ctx, k1, Options{}, &mockProtoMessage{X: 5}) assert.False(t, IsFailedWriteToCache(err)) diff --git a/storage/rawstores.go b/storage/rawstores.go index 705a3c1d..a075fca0 100644 --- a/storage/rawstores.go +++ b/storage/rawstores.go @@ -9,7 +9,7 @@ import ( "github.com/flyteorg/flytestdlib/promutils/labeled" ) -type dataStoreCreateFn func(cfg *Config, metrics *DataStoreMetrics) (RawStore, error) +type dataStoreCreateFn func(cfg *Config, metrics *dataStoreMetrics) (RawStore, error) var stores = map[string]dataStoreCreateFn{ TypeMemory: NewInMemoryRawStore, @@ -56,17 +56,17 @@ func createHTTPClient(cfg HTTPClientConfig) *http.Client { return c } -type DataStoreMetrics struct { +type dataStoreMetrics struct { cacheMetrics *cacheMetrics protoMetrics *protoMetrics copyMetrics *copyMetrics stowMetrics *stowMetrics } -// NewDataStoreMetrics initialises all metrics required for DataStore -func NewDataStoreMetrics(scope promutils.Scope) *DataStoreMetrics { +// newDataStoreMetrics initialises all metrics required for DataStore +func newDataStoreMetrics(scope promutils.Scope) *dataStoreMetrics { failureTypeOption := labeled.AdditionalLabelsOption{Labels: []string{FailureTypeLabel.String()}} - return &DataStoreMetrics{ + return &dataStoreMetrics{ cacheMetrics: &cacheMetrics{ FetchLatency: scope.MustNewStopWatch("remote_fetch", "Total Time to read from remote metastore", time.Millisecond), CacheHit: scope.MustNewCounter("cache_hit", "Number of times metadata was found in cache"), @@ -100,7 +100,20 @@ func NewDataStoreMetrics(scope promutils.Scope) *DataStoreMetrics { } // NewDataStore creates a new Data Store with the supplied config. -func NewDataStore(cfg *Config, metrics *DataStoreMetrics) (s *DataStore, err error) { +func NewDataStore(cfg *Config, scope promutils.Scope) (s *DataStore, err error) { + ds := &DataStore{metrics: newDataStoreMetrics(scope)} + return ds, ds.RefreshConfig(cfg) +} + +// NewCompositeDataStore composes a new DataStore. +func NewCompositeDataStore(refConstructor ReferenceConstructor, composedProtobufStore ComposedProtobufStore) *DataStore { + return &DataStore{ + ReferenceConstructor: refConstructor, + ComposedProtobufStore: composedProtobufStore, + } +} + +func (ds *DataStore) RefreshConfig(cfg *Config) error { defaultClient := http.DefaultClient defer func() { http.DefaultClient = defaultClient @@ -108,24 +121,19 @@ func NewDataStore(cfg *Config, metrics *DataStoreMetrics) (s *DataStore, err err http.DefaultClient = createHTTPClient(cfg.DefaultHTTPClient) - var rawStore RawStore - if fn, found := stores[cfg.Type]; found { - rawStore, err = fn(cfg, metrics) - if err != nil { - return &emptyStore, err - } - - protoStore := NewDefaultProtobufStore(newCachedRawStore(cfg, rawStore, metrics), metrics) - return NewCompositeDataStore(NewURLPathConstructor(), protoStore), nil + fn, found := stores[cfg.Type] + if !found { + return fmt.Errorf("type is of an invalid value [%v]", cfg.Type) } - return &emptyStore, fmt.Errorf("type is of an invalid value [%v]", cfg.Type) -} - -// NewCompositeDataStore composes a new DataStore. -func NewCompositeDataStore(refConstructor ReferenceConstructor, composedProtobufStore ComposedProtobufStore) *DataStore { - return &DataStore{ - ReferenceConstructor: refConstructor, - ComposedProtobufStore: composedProtobufStore, + rawStore, err := fn(cfg, ds.metrics) + if err != nil { + return err } + + rawStore = newCachedRawStore(cfg, rawStore, ds.metrics.cacheMetrics) + protoStore := NewDefaultProtobufStore(rawStore, ds.metrics.protoMetrics) + newDS := NewCompositeDataStore(NewURLPathConstructor(), protoStore) + *ds = *newDS + return nil } diff --git a/storage/storage.go b/storage/storage.go index 86c44277..d6ccf080 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -41,6 +41,7 @@ type Metadata interface { type DataStore struct { ComposedProtobufStore ReferenceConstructor + metrics *dataStoreMetrics } // SignedURLProperties encapsulates properties about the signedURL operation. diff --git a/storage/stow_store.go b/storage/stow_store.go index 138800a4..99007542 100644 --- a/storage/stow_store.go +++ b/storage/stow_store.go @@ -340,7 +340,7 @@ func (s *StowStore) getLocation(id locationID) stow.Location { } } -func NewStowRawStore(baseContainerFQN DataReference, loc, signedURLLoc stow.Location, enableDynamicContainerLoading bool, metrics *DataStoreMetrics) (*StowStore, error) { +func NewStowRawStore(baseContainerFQN DataReference, loc, signedURLLoc stow.Location, enableDynamicContainerLoading bool, metrics *dataStoreMetrics) (*StowStore, error) { self := &StowStore{ loc: loc, signedURLLoc: signedURLLoc, @@ -350,7 +350,7 @@ func NewStowRawStore(baseContainerFQN DataReference, loc, signedURLLoc stow.Loca metrics: metrics.stowMetrics, } - self.copyImpl = copyImpl{rawStore: self, metrics: metrics.copyMetrics} + self.copyImpl = newCopyImpl(self, metrics.copyMetrics) _, c, _, err := baseContainerFQN.Split() if err != nil { return nil, err @@ -364,7 +364,7 @@ func NewStowRawStore(baseContainerFQN DataReference, loc, signedURLLoc stow.Loca } // Constructor for the StowRawStore -func newStowRawStore(cfg *Config, metrics *DataStoreMetrics) (RawStore, error) { +func newStowRawStore(cfg *Config, metrics *dataStoreMetrics) (RawStore, error) { if cfg.InitContainer == "" { return nil, fmt.Errorf("initContainer is required even with `enable-multicontainer`") } diff --git a/storage/stow_store_test.go b/storage/stow_store_test.go index 4a446f74..15f12707 100644 --- a/storage/stow_store_test.go +++ b/storage/stow_store_test.go @@ -15,7 +15,7 @@ import ( "github.com/aws/aws-sdk-go/aws/awserr" s32 "github.com/aws/aws-sdk-go/service/s3" - + "github.com/flyteorg/stow" "github.com/flyteorg/stow/azure" "github.com/flyteorg/stow/google" "github.com/flyteorg/stow/local" @@ -23,14 +23,11 @@ import ( "github.com/flyteorg/stow/s3" "github.com/flyteorg/stow/swift" "github.com/pkg/errors" - - "github.com/flyteorg/stow" "github.com/stretchr/testify/assert" "github.com/flyteorg/flytestdlib/config" "github.com/flyteorg/flytestdlib/contextutils" "github.com/flyteorg/flytestdlib/internal/utils" - "github.com/flyteorg/flytestdlib/promutils" "github.com/flyteorg/flytestdlib/promutils/labeled" ) @@ -154,11 +151,8 @@ func TestAwsBucketIsNotFound(t *testing.T) { } func TestStowStore_CreateSignedURL(t *testing.T) { - labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, contextutils.TaskIDKey) - const container = "container" t.Run("Happy Path", func(t *testing.T) { - testScope := promutils.NewTestScope() fn := fQNFn["s3"] s, err := NewStowRawStore(fn(container), &mockStowLoc{ ContainerCb: func(id string) (stow.Container, error) { @@ -173,7 +167,7 @@ func TestStowStore_CreateSignedURL(t *testing.T) { } return nil, fmt.Errorf("container is not supported") }, - }, nil, false, testScope) + }, nil, false, metrics) assert.NoError(t, err) actual, err := s.CreateSignedURL(context.TODO(), DataReference("https://container/path"), SignedURLProperties{}) @@ -182,7 +176,6 @@ func TestStowStore_CreateSignedURL(t *testing.T) { }) t.Run("Invalid URL", func(t *testing.T) { - testScope := promutils.NewTestScope() fn := fQNFn["s3"] s, err := NewStowRawStore(fn(container), &mockStowLoc{ ContainerCb: func(id string) (stow.Container, error) { @@ -197,7 +190,7 @@ func TestStowStore_CreateSignedURL(t *testing.T) { } return nil, fmt.Errorf("container is not supported") }, - }, nil, false, testScope) + }, nil, false, metrics) assert.NoError(t, err) _, err = s.CreateSignedURL(context.TODO(), DataReference("://container/path"), SignedURLProperties{}) @@ -205,7 +198,6 @@ func TestStowStore_CreateSignedURL(t *testing.T) { }) t.Run("Non existing container", func(t *testing.T) { - testScope := promutils.NewTestScope() fn := fQNFn["s3"] s, err := NewStowRawStore(fn(container), &mockStowLoc{ ContainerCb: func(id string) (stow.Container, error) { @@ -220,7 +212,7 @@ func TestStowStore_CreateSignedURL(t *testing.T) { } return nil, fmt.Errorf("container is not supported") }, - }, nil, false, testScope) + }, nil, false, metrics) assert.NoError(t, err) _, err = s.CreateSignedURL(context.TODO(), DataReference("s3://container2/path"), SignedURLProperties{}) @@ -229,11 +221,8 @@ func TestStowStore_CreateSignedURL(t *testing.T) { } func TestStowStore_ReadRaw(t *testing.T) { - labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, contextutils.TaskIDKey) - const container = "container" t.Run("Happy Path", func(t *testing.T) { - testScope := promutils.NewTestScope() fn := fQNFn["s3"] s, err := NewStowRawStore(fn(container), &mockStowLoc{ ContainerCb: func(id string) (stow.Container, error) { @@ -248,7 +237,7 @@ func TestStowStore_ReadRaw(t *testing.T) { } return nil, fmt.Errorf("container is not supported") }, - }, nil, false, testScope) + }, nil, false, metrics) assert.NoError(t, err) err = s.WriteRaw(context.TODO(), DataReference("s3://container/path"), 0, Options{}, bytes.NewReader([]byte{})) assert.NoError(t, err) @@ -264,7 +253,6 @@ func TestStowStore_ReadRaw(t *testing.T) { }) t.Run("Exceeds limit", func(t *testing.T) { - testScope := promutils.NewTestScope() fn := fQNFn["s3"] s, err := NewStowRawStore(fn(container), &mockStowLoc{ @@ -280,7 +268,7 @@ func TestStowStore_ReadRaw(t *testing.T) { } return nil, fmt.Errorf("container is not supported") }, - }, nil, false, testScope) + }, nil, false, metrics) assert.NoError(t, err) err = s.WriteRaw(context.TODO(), DataReference("s3://container/path"), 3*MiB, Options{}, bytes.NewReader([]byte{})) assert.NoError(t, err) @@ -294,7 +282,6 @@ func TestStowStore_ReadRaw(t *testing.T) { }) t.Run("No Limit", func(t *testing.T) { - testScope := promutils.NewTestScope() fn := fQNFn["s3"] GetConfig().Limits.GetLimitMegabytes = 0 @@ -311,7 +298,7 @@ func TestStowStore_ReadRaw(t *testing.T) { } return nil, fmt.Errorf("container is not supported") }, - }, nil, false, testScope) + }, nil, false, metrics) assert.NoError(t, err) err = s.WriteRaw(context.TODO(), DataReference("s3://container/path"), 3*MiB, Options{}, bytes.NewReader([]byte{})) assert.NoError(t, err) @@ -323,7 +310,6 @@ func TestStowStore_ReadRaw(t *testing.T) { }) t.Run("Happy Path multi-container enabled", func(t *testing.T) { - testScope := promutils.NewTestScope() fn := fQNFn["s3"] s, err := NewStowRawStore(fn(container), &mockStowLoc{ ContainerCb: func(id string) (stow.Container, error) { @@ -340,7 +326,7 @@ func TestStowStore_ReadRaw(t *testing.T) { } return nil, fmt.Errorf("container is not supported") }, - }, nil, true, testScope) + }, nil, true, metrics) assert.NoError(t, err) err = s.WriteRaw(context.TODO(), "s3://bad-container/path", 0, Options{}, bytes.NewReader([]byte{})) assert.NoError(t, err) @@ -357,7 +343,6 @@ func TestStowStore_ReadRaw(t *testing.T) { }) t.Run("Happy Path multi-container bad", func(t *testing.T) { - testScope := promutils.NewTestScope() fn := fQNFn["s3"] s, err := NewStowRawStore(fn(container), &mockStowLoc{ ContainerCb: func(id string) (stow.Container, error) { @@ -372,7 +357,7 @@ func TestStowStore_ReadRaw(t *testing.T) { } return nil, fmt.Errorf("container is not supported") }, - }, nil, true, testScope) + }, nil, true, metrics) assert.NoError(t, err) err = s.WriteRaw(context.TODO(), "s3://bad-container/path", 0, Options{}, bytes.NewReader([]byte{})) assert.Error(t, err) @@ -386,7 +371,6 @@ func TestStowStore_ReadRaw(t *testing.T) { func TestNewLocalStore(t *testing.T) { labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, contextutils.TaskIDKey) t.Run("Valid config", func(t *testing.T) { - testScope := promutils.NewTestScope() store, err := newStowRawStore(&Config{ Stow: StowConfig{ Kind: local.Kind, @@ -395,7 +379,7 @@ func TestNewLocalStore(t *testing.T) { }, }, InitContainer: "testdata", - }, testScope.NewSubScope("x")) + }, metrics) assert.NoError(t, err) assert.NotNil(t, store) @@ -409,13 +393,11 @@ func TestNewLocalStore(t *testing.T) { }) t.Run("Invalid config", func(t *testing.T) { - testScope := promutils.NewTestScope() - _, err := newStowRawStore(&Config{}, testScope) + _, err := newStowRawStore(&Config{}, metrics) assert.Error(t, err) }) t.Run("Initialize container", func(t *testing.T) { - testScope := promutils.NewTestScope() tmpDir, err := ioutil.TempDir("", "stdlib_local") assert.NoError(t, err) @@ -431,7 +413,7 @@ func TestNewLocalStore(t *testing.T) { }, }, InitContainer: "tmp", - }, testScope.NewSubScope("y")) + }, metrics) assert.NoError(t, err) assert.NotNil(t, store) @@ -444,7 +426,6 @@ func TestNewLocalStore(t *testing.T) { }) t.Run("missing init container", func(t *testing.T) { - testScope := promutils.NewTestScope() tmpDir, err := ioutil.TempDir("", "stdlib_local") assert.NoError(t, err) @@ -459,14 +440,13 @@ func TestNewLocalStore(t *testing.T) { local.ConfigKeyPath: tmpDir, }, }, - }, testScope.NewSubScope("y")) + }, metrics) assert.Error(t, err) assert.Nil(t, store) }) t.Run("multi-container enabled", func(t *testing.T) { - testScope := promutils.NewTestScope() tmpDir, err := ioutil.TempDir("", "stdlib_local") assert.NoError(t, err) @@ -483,7 +463,7 @@ func TestNewLocalStore(t *testing.T) { }, InitContainer: "tmp", MultiContainerEnabled: true, - }, testScope.NewSubScope("y")) + }, metrics) assert.NoError(t, err) assert.NotNil(t, store) @@ -498,15 +478,14 @@ func TestNewLocalStore(t *testing.T) { func Test_newStowRawStore(t *testing.T) { type args struct { - cfg *Config - metricsScope promutils.Scope + cfg *Config } tests := []struct { name string args args wantErr bool }{ - {"fail", args{&Config{}, promutils.NewTestScope()}, true}, + {"fail", args{&Config{}}, true}, {"google", args{&Config{ InitContainer: "flyte", Stow: StowConfig{ @@ -516,18 +495,18 @@ func Test_newStowRawStore(t *testing.T) { google.ConfigScopes: "y", }, }, - }, promutils.NewTestScope()}, true}, + }}, true}, {"minio", args{&Config{ Type: TypeMinio, InitContainer: "some-container", Connection: ConnectionConfig{ Endpoint: config.URL{URL: utils.MustParseURL("http://minio:9000")}, }, - }, promutils.NewTestScope()}, true}, + }}, true}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := newStowRawStore(tt.args.cfg, tt.args.metricsScope) + got, err := newStowRawStore(tt.args.cfg, metrics) if tt.wantErr { assert.Error(t, err, "newStowRawStore() error = %v, wantErr %v", err, tt.wantErr) return @@ -598,7 +577,6 @@ func TestStowStore_WriteRaw(t *testing.T) { const container = "container" fn := fQNFn["s3"] t.Run("create container when not found", func(t *testing.T) { - testScope := promutils.NewTestScope() var createCalled bool s, err := NewStowRawStore(fn(container), &mockStowLoc{ ContainerCb: func(id string) (stow.Container, error) { @@ -618,7 +596,7 @@ func TestStowStore_WriteRaw(t *testing.T) { } return nil, fmt.Errorf("container is not supported") }, - }, nil, true, testScope) + }, nil, true, metrics) assert.NoError(t, err) err = s.WriteRaw(context.TODO(), DataReference("s3://container/path"), 0, Options{}, bytes.NewReader([]byte{})) assert.NoError(t, err) @@ -634,7 +612,6 @@ func TestStowStore_WriteRaw(t *testing.T) { assert.True(t, containerStoredInDynamicContainerMap) }) t.Run("bubble up generic put errors", func(t *testing.T) { - testScope := promutils.NewTestScope() s, err := NewStowRawStore(fn(container), &mockStowLoc{ ContainerCb: func(id string) (stow.Container, error) { if id == container { @@ -646,7 +623,7 @@ func TestStowStore_WriteRaw(t *testing.T) { } return nil, fmt.Errorf("container is not supported") }, - }, nil, true, testScope) + }, nil, true, metrics) assert.NoError(t, err) err = s.WriteRaw(context.TODO(), DataReference("s3://container/path"), 0, Options{}, bytes.NewReader([]byte{})) assert.EqualError(t, err, "Failed to write data [0b] to path [path].: foo") From b9fb0112433aa6d80ac1ced78d0f319e57cfdc10 Mon Sep 17 00:00:00 2001 From: iaroslav-ciupin Date: Mon, 15 Aug 2022 20:39:27 +0300 Subject: [PATCH 3/5] add some tests to improve coverage Signed-off-by: iaroslav-ciupin --- storage/protobuf_store_test.go | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/storage/protobuf_store_test.go b/storage/protobuf_store_test.go index 7dff69e8..ef874ac0 100644 --- a/storage/protobuf_store_test.go +++ b/storage/protobuf_store_test.go @@ -42,20 +42,36 @@ func (m mockBigDataProtoMessage) String() string { func (mockBigDataProtoMessage) ProtoMessage() { } -func TestDefaultProtobufStore_ReadProtobuf(t *testing.T) { +func TestDefaultProtobufStore(t *testing.T) { t.Run("Read after Write", func(t *testing.T) { testScope := promutils.NewTestScope() s, err := NewDataStore(&Config{Type: TypeMemory}, testScope) assert.NoError(t, err) - err = s.WriteProtobuf(context.TODO(), DataReference("hello"), Options{}, &mockProtoMessage{X: 5}) + err = s.WriteProtobuf(context.TODO(), "hello", Options{}, &mockProtoMessage{X: 5}) assert.NoError(t, err) m := &mockProtoMessage{} - err = s.ReadProtobuf(context.TODO(), DataReference("hello"), m) + err = s.ReadProtobuf(context.TODO(), "hello", m) assert.NoError(t, err) assert.Equal(t, int64(5), m.X) }) + + t.Run("invalid type", func(t *testing.T) { + testScope := promutils.NewTestScope() + + _, err := NewDataStore(&Config{Type: "invalid"}, testScope) + + assert.EqualError(t, err, "type is of an invalid value [invalid]") + }) + + t.Run("coudln't create store", func(t *testing.T) { + testScope := promutils.NewTestScope() + + _, err := NewDataStore(&Config{Type: TypeS3}, testScope) + + assert.EqualError(t, err, "initContainer is required even with `enable-multicontainer`") + }) } func TestDefaultProtobufStore_BigDataReadAfterWrite(t *testing.T) { From be8dd8d70845a52f84b7e11a2e8fc5b4dacbfbdf Mon Sep 17 00:00:00 2001 From: iaroslav-ciupin Date: Tue, 16 Aug 2022 14:17:09 +0300 Subject: [PATCH 4/5] metrics constructors Signed-off-by: iaroslav-ciupin --- storage/cached_rawstore.go | 10 ++++++++++ storage/protobuf_store.go | 13 +++++++++++++ storage/rawstores.go | 36 ++++-------------------------------- storage/stow_store.go | 19 +++++++++++++++++++ 4 files changed, 46 insertions(+), 32 deletions(-) diff --git a/storage/cached_rawstore.go b/storage/cached_rawstore.go index a7dd4229..71f8d668 100644 --- a/storage/cached_rawstore.go +++ b/storage/cached_rawstore.go @@ -5,6 +5,7 @@ import ( "context" "io" "runtime/debug" + "time" "github.com/flyteorg/flytestdlib/errors" @@ -99,6 +100,15 @@ func (s *cachedRawStore) WriteRaw(ctx context.Context, reference DataReference, return err } +func newCacheMetrics(scope promutils.Scope) *cacheMetrics { + return &cacheMetrics{ + FetchLatency: scope.MustNewStopWatch("remote_fetch", "Total Time to read from remote metastore", time.Millisecond), + CacheHit: scope.MustNewCounter("cache_hit", "Number of times metadata was found in cache"), + CacheMiss: scope.MustNewCounter("cache_miss", "Number of times metadata was not found in cache and remote fetch was required"), + CacheWriteError: scope.MustNewCounter("cache_write_err", "Failed to write to cache"), + } +} + // Creates a CachedStore if Caching is enabled, otherwise returns a RawStore func newCachedRawStore(cfg *Config, store RawStore, metrics *cacheMetrics) RawStore { if cfg.Cache.MaxSizeMegabytes > 0 { diff --git a/storage/protobuf_store.go b/storage/protobuf_store.go index ac3b56db..1eca5096 100644 --- a/storage/protobuf_store.go +++ b/storage/protobuf_store.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "time" "github.com/golang/protobuf/proto" errs "github.com/pkg/errors" @@ -79,6 +80,18 @@ func (s DefaultProtobufStore) WriteProtobuf(ctx context.Context, reference DataR return nil } +func newProtoMetrics(scope promutils.Scope) *protoMetrics { + return &protoMetrics{ + FetchLatency: scope.MustNewStopWatch("proto_fetch", "Time to read data before unmarshalling", time.Millisecond), + MarshalTime: scope.MustNewStopWatch("marshal", "Time incurred in marshalling data before writing", time.Millisecond), + UnmarshalTime: scope.MustNewStopWatch("unmarshal", "Time incurred in unmarshalling received data", time.Millisecond), + MarshalFailure: scope.MustNewCounter("marshal_failure", "Failures when marshalling"), + UnmarshalFailure: scope.MustNewCounter("unmarshal_failure", "Failures when unmarshalling"), + WriteFailureUnrelatedToCache: scope.MustNewCounter("write_failure_unrelated_to_cache", "Raw store write failures that are not caused by ErrFailedToWriteCache"), + ReadFailureUnrelatedToCache: scope.MustNewCounter("read_failure_unrelated_to_cache", "Raw store read failures that are not caused by ErrFailedToWriteCache"), + } +} + func NewDefaultProtobufStore(store RawStore, metrics *protoMetrics) DefaultProtobufStore { return DefaultProtobufStore{ RawStore: store, diff --git a/storage/rawstores.go b/storage/rawstores.go index a075fca0..3bf1c18a 100644 --- a/storage/rawstores.go +++ b/storage/rawstores.go @@ -3,10 +3,8 @@ package storage import ( "fmt" "net/http" - "time" "github.com/flyteorg/flytestdlib/promutils" - "github.com/flyteorg/flytestdlib/promutils/labeled" ) type dataStoreCreateFn func(cfg *Config, metrics *dataStoreMetrics) (RawStore, error) @@ -65,37 +63,11 @@ type dataStoreMetrics struct { // newDataStoreMetrics initialises all metrics required for DataStore func newDataStoreMetrics(scope promutils.Scope) *dataStoreMetrics { - failureTypeOption := labeled.AdditionalLabelsOption{Labels: []string{FailureTypeLabel.String()}} return &dataStoreMetrics{ - cacheMetrics: &cacheMetrics{ - FetchLatency: scope.MustNewStopWatch("remote_fetch", "Total Time to read from remote metastore", time.Millisecond), - CacheHit: scope.MustNewCounter("cache_hit", "Number of times metadata was found in cache"), - CacheMiss: scope.MustNewCounter("cache_miss", "Number of times metadata was not found in cache and remote fetch was required"), - CacheWriteError: scope.MustNewCounter("cache_write_err", "Failed to write to cache"), - }, - protoMetrics: &protoMetrics{ - FetchLatency: scope.MustNewStopWatch("proto_fetch", "Time to read data before unmarshalling", time.Millisecond), - MarshalTime: scope.MustNewStopWatch("marshal", "Time incurred in marshalling data before writing", time.Millisecond), - UnmarshalTime: scope.MustNewStopWatch("unmarshal", "Time incurred in unmarshalling received data", time.Millisecond), - MarshalFailure: scope.MustNewCounter("marshal_failure", "Failures when marshalling"), - UnmarshalFailure: scope.MustNewCounter("unmarshal_failure", "Failures when unmarshalling"), - WriteFailureUnrelatedToCache: scope.MustNewCounter("write_failure_unrelated_to_cache", "Raw store write failures that are not caused by ErrFailedToWriteCache"), - ReadFailureUnrelatedToCache: scope.MustNewCounter("read_failure_unrelated_to_cache", "Raw store read failures that are not caused by ErrFailedToWriteCache"), - }, - copyMetrics: newCopyMetrics(scope.NewSubScope("copy")), - stowMetrics: &stowMetrics{ - BadReference: labeled.NewCounter("bad_key", "Indicates the provided storage reference/key is incorrectly formatted", scope, labeled.EmitUnlabeledMetric), - BadContainer: labeled.NewCounter("bad_container", "Indicates request for a container that has not been initialized", scope, labeled.EmitUnlabeledMetric), - - HeadFailure: labeled.NewCounter("head_failure", "Indicates failure in HEAD for a given reference", scope, labeled.EmitUnlabeledMetric), - HeadLatency: labeled.NewStopWatch("head", "Indicates time to fetch metadata using the Head API", time.Millisecond, scope, labeled.EmitUnlabeledMetric), - - ReadFailure: labeled.NewCounter("read_failure", "Indicates failure in GET for a given reference", scope, labeled.EmitUnlabeledMetric, failureTypeOption), - ReadOpenLatency: labeled.NewStopWatch("read_open", "Indicates time to first byte when reading", time.Millisecond, scope, labeled.EmitUnlabeledMetric), - - WriteFailure: labeled.NewCounter("write_failure", "Indicates failure in storing/PUT for a given reference", scope, labeled.EmitUnlabeledMetric, failureTypeOption), - WriteLatency: labeled.NewStopWatch("write", "Time to write an object irrespective of size", time.Millisecond, scope, labeled.EmitUnlabeledMetric), - }, + cacheMetrics: newCacheMetrics(scope), + protoMetrics: newProtoMetrics(scope), + copyMetrics: newCopyMetrics(scope.NewSubScope("copy")), + stowMetrics: newStowMetrics(scope), } } diff --git a/storage/stow_store.go b/storage/stow_store.go index 99007542..d5132d54 100644 --- a/storage/stow_store.go +++ b/storage/stow_store.go @@ -7,6 +7,7 @@ import ( "net/url" "strconv" "sync" + "time" "github.com/aws/aws-sdk-go/aws/awserr" s32 "github.com/aws/aws-sdk-go/service/s3" @@ -22,6 +23,7 @@ import ( "github.com/flyteorg/flytestdlib/contextutils" "github.com/flyteorg/flytestdlib/errors" "github.com/flyteorg/flytestdlib/logger" + "github.com/flyteorg/flytestdlib/promutils" "github.com/flyteorg/flytestdlib/promutils/labeled" ) @@ -363,6 +365,23 @@ func NewStowRawStore(baseContainerFQN DataReference, loc, signedURLLoc stow.Loca return self, nil } +func newStowMetrics(scope promutils.Scope) *stowMetrics { + failureTypeOption := labeled.AdditionalLabelsOption{Labels: []string{FailureTypeLabel.String()}} + return &stowMetrics{ + BadReference: labeled.NewCounter("bad_key", "Indicates the provided storage reference/key is incorrectly formatted", scope, labeled.EmitUnlabeledMetric), + BadContainer: labeled.NewCounter("bad_container", "Indicates request for a container that has not been initialized", scope, labeled.EmitUnlabeledMetric), + + HeadFailure: labeled.NewCounter("head_failure", "Indicates failure in HEAD for a given reference", scope, labeled.EmitUnlabeledMetric), + HeadLatency: labeled.NewStopWatch("head", "Indicates time to fetch metadata using the Head API", time.Millisecond, scope, labeled.EmitUnlabeledMetric), + + ReadFailure: labeled.NewCounter("read_failure", "Indicates failure in GET for a given reference", scope, labeled.EmitUnlabeledMetric, failureTypeOption), + ReadOpenLatency: labeled.NewStopWatch("read_open", "Indicates time to first byte when reading", time.Millisecond, scope, labeled.EmitUnlabeledMetric), + + WriteFailure: labeled.NewCounter("write_failure", "Indicates failure in storing/PUT for a given reference", scope, labeled.EmitUnlabeledMetric, failureTypeOption), + WriteLatency: labeled.NewStopWatch("write", "Time to write an object irrespective of size", time.Millisecond, scope, labeled.EmitUnlabeledMetric), + } +} + // Constructor for the StowRawStore func newStowRawStore(cfg *Config, metrics *dataStoreMetrics) (RawStore, error) { if cfg.InitContainer == "" { From 239a484cd904aa3f135030960e9330b2e29c22b7 Mon Sep 17 00:00:00 2001 From: iaroslav-ciupin Date: Tue, 16 Aug 2022 19:45:26 +0300 Subject: [PATCH 5/5] add godoc Signed-off-by: iaroslav-ciupin --- storage/rawstores.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/storage/rawstores.go b/storage/rawstores.go index 3bf1c18a..0e163d34 100644 --- a/storage/rawstores.go +++ b/storage/rawstores.go @@ -85,6 +85,8 @@ func NewCompositeDataStore(refConstructor ReferenceConstructor, composedProtobuf } } +// RefreshConfig re-initialises the data store client leaving metrics untouched. +// This is NOT thread-safe! func (ds *DataStore) RefreshConfig(cfg *Config) error { defaultClient := http.DefaultClient defer func() {