Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disallow direct implementation of retrieved #4577

Merged
merged 1 commit into from
Dec 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

## 🛑 Breaking changes 🛑

- Remove configmapprovider.NewInMemory() (#4507)
- Remove `configmapprovider.NewInMemory()` (#4507)
- Disallow direct implementation of `configmapprovider.Retrieved` (#4577)

## 💡 Enhancements 💡

Expand Down
4 changes: 3 additions & 1 deletion config/configmapprovider/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ func (emp *envMapProvider) Retrieve(_ context.Context, _ func(*ChangeEvent)) (Re
return nil, fmt.Errorf("unable to parse yaml: %w", err)
}

return &simpleRetrieved{confMap: config.NewMapFromStringMap(data)}, nil
return NewRetrieved(func(ctx context.Context) (*config.Map, error) {
return config.NewMapFromStringMap(data), nil
})
}

func (*envMapProvider) Shutdown(context.Context) error {
Expand Down
6 changes: 5 additions & 1 deletion config/configmapprovider/expand.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"
"os"

"go.opentelemetry.io/collector/config"
)

type expandMapProvider struct {
Expand All @@ -44,7 +46,9 @@ func (emp *expandMapProvider) Retrieve(ctx context.Context, onChange func(*Chang
for _, k := range cfgMap.AllKeys() {
cfgMap.Set(k, expandStringValues(cfgMap.Get(k)))
}
return &simpleRetrieved{confMap: cfgMap, closeFunc: retr.Close}, nil
return NewRetrieved(func(ctx context.Context) (*config.Map, error) {
return cfgMap, nil
}, WithClose(retr.Close))
}

func (emp *expandMapProvider) Shutdown(ctx context.Context) error {
Expand Down
20 changes: 10 additions & 10 deletions config/configmapprovider/expand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,36 +30,36 @@ func TestBaseRetrieveFailsOnRetrieve(t *testing.T) {
exp := NewExpand(&mockProvider{retrieveErr: retErr})
t.Cleanup(func() { require.NoError(t, exp.Shutdown(context.Background())) })
_, err := exp.Retrieve(context.Background(), nil)
require.Error(t, err)
require.ErrorIs(t, err, retErr)
assert.Error(t, err)
assert.ErrorIs(t, err, retErr)
}

func TestBaseRetrieveFailsOnGet(t *testing.T) {
getErr := errors.New("test error")
exp := NewExpand(&mockProvider{retrieved: &mockRetrieved{getErr: getErr}})
exp := NewExpand(&mockProvider{retrieved: newErrGetRetrieved(getErr)})
t.Cleanup(func() { require.NoError(t, exp.Shutdown(context.Background())) })
_, err := exp.Retrieve(context.Background(), nil)
require.Error(t, err)
require.ErrorIs(t, err, getErr)
assert.Error(t, err)
assert.ErrorIs(t, err, getErr)
}

func TestBaseRetrieveFailsOnClose(t *testing.T) {
closeErr := errors.New("test error")
exp := NewExpand(&mockProvider{retrieved: &mockRetrieved{closeErr: closeErr}})
exp := NewExpand(&mockProvider{retrieved: newErrCloseRetrieved(closeErr)})
t.Cleanup(func() { require.NoError(t, exp.Shutdown(context.Background())) })
ret, err := exp.Retrieve(context.Background(), nil)
require.NoError(t, err)
err = ret.Close(context.Background())
require.Error(t, err)
require.ErrorIs(t, err, closeErr)
assert.Error(t, err)
assert.ErrorIs(t, err, closeErr)
}

func TestBaseRetrieveFailsOnShutdown(t *testing.T) {
shutdownErr := errors.New("test error")
exp := NewExpand(&mockProvider{shutdownErr: shutdownErr})
err := exp.Shutdown(context.Background())
require.Error(t, err)
require.ErrorIs(t, err, shutdownErr)
assert.Error(t, err)
assert.ErrorIs(t, err, shutdownErr)
}

func TestExpand(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion config/configmapprovider/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ func (fmp *fileMapProvider) Retrieve(_ context.Context, _ func(*ChangeEvent)) (R
return nil, fmt.Errorf("unable to parse yaml: %w", err)
}

return &simpleRetrieved{confMap: config.NewMapFromStringMap(data)}, nil
return NewRetrieved(func(ctx context.Context) (*config.Map, error) {
return config.NewMapFromStringMap(data), nil
})
}

func (*fileMapProvider) Shutdown(context.Context) error {
Expand Down
18 changes: 11 additions & 7 deletions config/configmapprovider/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,17 @@ func (mp *mergeMapProvider) Retrieve(ctx context.Context, onChange func(*ChangeE
}
retrs = append(retrs, retr)
}
return &simpleRetrieved{confMap: retCfgMap, closeFunc: func(ctxF context.Context) error {
var err error
for _, ret := range retrs {
err = multierr.Append(err, ret.Close(ctxF))
}
return err
}}, nil
return NewRetrieved(
func(ctx context.Context) (*config.Map, error) {
return retCfgMap, nil
},
WithClose(func(ctxF context.Context) error {
var err error
for _, ret := range retrs {
err = multierr.Append(err, ret.Close(ctxF))
}
return err
}))
}

func (mp *mergeMapProvider) Shutdown(ctx context.Context) error {
Expand Down
21 changes: 14 additions & 7 deletions config/configmapprovider/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,30 @@ import (
)

func TestMerge_GetError(t *testing.T) {
pl := NewMerge(&mockProvider{}, &mockProvider{retrieved: &mockRetrieved{getErr: errors.New("my error")}})
getErr := errors.New("test error")
pl := NewMerge(&mockProvider{}, &mockProvider{retrieved: newErrGetRetrieved(getErr)})
require.NotNil(t, pl)
cp, err := pl.Retrieve(context.Background(), nil)
_, err := pl.Retrieve(context.Background(), nil)
assert.Error(t, err)
assert.Nil(t, cp)
assert.ErrorIs(t, err, getErr)
}

func TestMerge_CloseError(t *testing.T) {
pl := NewMerge(&mockProvider{}, &mockProvider{retrieved: &mockRetrieved{closeErr: errors.New("my error")}})
closeErr := errors.New("test error")
pl := NewMerge(&mockProvider{}, &mockProvider{retrieved: newErrCloseRetrieved(closeErr)})
require.NotNil(t, pl)
cp, err := pl.Retrieve(context.Background(), nil)
assert.NoError(t, err)
assert.Error(t, cp.Close(context.Background()))
err = cp.Close(context.Background())
assert.Error(t, err)
assert.ErrorIs(t, err, closeErr)
}

func TestMerge_ShutdownError(t *testing.T) {
pl := NewMerge(&mockProvider{}, &mockProvider{shutdownErr: errors.New("my error")})
shutdownErr := errors.New("test error")
pl := NewMerge(&mockProvider{}, &mockProvider{shutdownErr: shutdownErr})
require.NotNil(t, pl)
assert.Error(t, pl.Shutdown(context.Background()))
err := pl.Shutdown(context.Background())
assert.Error(t, err)
assert.ErrorIs(t, err, shutdownErr)
}
28 changes: 9 additions & 19 deletions config/configmapprovider/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (m *mockProvider) Retrieve(context.Context, func(*ChangeEvent)) (Retrieved,
return nil, m.retrieveErr
}
if m.retrieved == nil {
return &mockRetrieved{}, nil
return NewRetrieved(func(ctx context.Context) (*config.Map, error) { return config.NewMap(), nil })
}
return m.retrieved, nil
}
Expand All @@ -43,24 +43,14 @@ func (m *mockProvider) Shutdown(context.Context) error {
return m.shutdownErr
}

type mockRetrieved struct {
cfg *config.Map
getErr error
closeErr error
func newErrGetRetrieved(getErr error) Retrieved {
ret, _ := NewRetrieved(func(ctx context.Context) (*config.Map, error) { return nil, getErr })
return ret
}

var _ Retrieved = &mockRetrieved{}

func (sr *mockRetrieved) Get(context.Context) (*config.Map, error) {
if sr.getErr != nil {
return nil, sr.getErr
}
if sr.cfg == nil {
return config.NewMap(), nil
}
return sr.cfg, nil
}

func (sr *mockRetrieved) Close(context.Context) error {
return sr.closeErr
func newErrCloseRetrieved(closeErr error) Retrieved {
ret, _ := NewRetrieved(
func(ctx context.Context) (*config.Map, error) { return config.NewMap(), nil },
WithClose(func(ctx context.Context) error { return closeErr }))
return ret
}
8 changes: 6 additions & 2 deletions config/configmapprovider/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ func NewProperties(properties []string) Provider {

func (pmp *propertiesMapProvider) Retrieve(_ context.Context, onChange func(*ChangeEvent)) (Retrieved, error) {
if len(pmp.properties) == 0 {
return &simpleRetrieved{confMap: config.NewMap()}, nil
return NewRetrieved(func(ctx context.Context) (*config.Map, error) {
return config.NewMap(), nil
})
}

b := &bytes.Buffer{}
Expand All @@ -67,7 +69,9 @@ func (pmp *propertiesMapProvider) Retrieve(_ context.Context, onChange func(*Cha
}
prop := maps.Unflatten(parsed, ".")

return &simpleRetrieved{confMap: config.NewMapFromStringMap(prop)}, nil
return NewRetrieved(func(ctx context.Context) (*config.Map, error) {
return config.NewMapFromStringMap(prop), nil
})
}

func (*propertiesMapProvider) Shutdown(context.Context) error {
Expand Down
53 changes: 14 additions & 39 deletions config/configmapprovider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,25 @@ package configmapprovider // import "go.opentelemetry.io/collector/config/config

import (
"context"

"go.opentelemetry.io/collector/config"
)

// Provider is an interface that helps to retrieve a config map and watch for any
// changes to the config map. Implementations may load the config from a file,
// a database or any other source.
//
// The typical usage is the following:
//
// r := mapProvider.Retrieve()
// r.Get()
// // wait for onChange() to be called.
// r.Close()
// r = mapProvider.Retrieve()
// r.Get()
// // wait for onChange() to be called.
// r.Close()
// // repeat Retrieve/Get/wait/Close cycle until it is time to shut down the Collector process.
// // ...
// mapProvider.Shutdown()
type Provider interface {
// Retrieve goes to the configuration source and retrieves the selected data which
// contains the value to be injected in the configuration and the corresponding watcher that
Expand Down Expand Up @@ -50,43 +62,6 @@ type Provider interface {
Shutdown(ctx context.Context) error
}

// Retrieved holds the result of a call to the Retrieve method of a Provider object.
//
// The typical usage is the following:
//
// r := mapProvider.Retrieve()
// r.Get()
// // wait for onChange() to be called.
// r.Close()
// r = mapProvider.Retrieve()
// r.Get()
// // wait for onChange() to be called.
// r.Close()
// // repeat Retrieve/Get/wait/Close cycle until it is time to shut down the Collector process.
// // ...
// mapProvider.Shutdown()
type Retrieved interface {
// Get returns the config Map.
// If Close is called before Get or concurrently with Get then Get
// should return immediately with ErrSessionClosed error.
// Should never be called concurrently with itself.
// If ctx is cancelled should return immediately with an error.
Get(ctx context.Context) (*config.Map, error)

// Close signals that the configuration for which it was used to retrieve values is
// no longer in use and the object should close and release any watchers that it
// may have created.
//
// This method must be called when the service ends, either in case of success or error.
//
// Should never be called concurrently with itself.
// May be called before, after or concurrently with Get.
// If ctx is cancelled should return immediately with an error.
//
// Calling Close on an already closed object should have no effect and should return nil.
Close(ctx context.Context) error
}

// ChangeEvent describes the particular change event that happened with the config.
// TODO: see if this can be eliminated.
type ChangeEvent struct {
Expand Down
100 changes: 100 additions & 0 deletions config/configmapprovider/retrieved.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package configmapprovider // import "go.opentelemetry.io/collector/config/configmapprovider"

import (
"context"
"errors"

"go.opentelemetry.io/collector/config"
)

// Retrieved holds the result of a call to the Retrieve method of a Provider object.
// This interface cannot be directly implemented. Implementations must use the NewRetrieved helper.
type Retrieved interface {
// Get returns the config Map.
// If Close is called before Get or concurrently with Get then Get
// should return immediately with ErrSessionClosed error.
// Should never be called concurrently with itself.
// If ctx is cancelled should return immediately with an error.
Get(ctx context.Context) (*config.Map, error)

// Close signals that the configuration for which it was used to retrieve values is
// no longer in use and the object should close and release any watchers that it
// may have created.
//
// This method must be called when the service ends, either in case of success or error.
//
// Should never be called concurrently with itself.
// May be called before, after or concurrently with Get.
// If ctx is cancelled should return immediately with an error.
//
// Calling Close on an already closed object should have no effect and should return nil.
Close(ctx context.Context) error

// privateRetrieved is an unexported func to disallow direct implementation.
privateRetrieved()
}

// GetFunc specifies the function invoked when the Retrieved.Get is being called.
type GetFunc func(context.Context) (*config.Map, error)

// Get implements the Retrieved.Get.
func (f GetFunc) Get(ctx context.Context) (*config.Map, error) {
return f(ctx)
}

// CloseFunc specifies the function invoked when the Retrieved.Close is being called.
type CloseFunc func(context.Context) error

// Close implements the Retrieved.Close.
func (f CloseFunc) Close(ctx context.Context) error {
if f == nil {
return nil
}
return f(ctx)
}

// RetrievedOption represents the possible options for NewRetrieved.
type RetrievedOption func(*retrieved)

// WithClose overrides the default `Close` function for a Retrieved.
// The default always returns nil.
func WithClose(closeFunc CloseFunc) RetrievedOption {
return func(o *retrieved) {
o.CloseFunc = closeFunc
}
}

type retrieved struct {
GetFunc
CloseFunc
}

func (retrieved) privateRetrieved() {}

// NewRetrieved returns a Retrieved configured with the provided options.
func NewRetrieved(getFunc GetFunc, options ...RetrievedOption) (Retrieved, error) {
if getFunc == nil {
return nil, errors.New("nil getFunc")
}
ret := &retrieved{
GetFunc: getFunc,
}
for _, op := range options {
op(ret)
}
return ret, nil
}
Loading