diff --git a/CHANGELOG.md b/CHANGELOG.md index e381b99dadf..f4ab1f84be5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,8 @@ ## 🛑 Breaking changes 🛑 -- Remove configmapprovider.NewInMemory() (#4507) +- Remove `configmapprovider.NewInMemory()` (#4507) +- Disallow direct implementation of `configmapprovider.Retrieved` (#4577) ## 💡 Enhancements 💡 diff --git a/config/configmapprovider/env.go b/config/configmapprovider/env.go index 171b267e11d..6e7e0e51d23 100644 --- a/config/configmapprovider/env.go +++ b/config/configmapprovider/env.go @@ -48,7 +48,9 @@ func (emp *envMapProvider) Retrieve(_ context.Context, _ func(*ChangeEvent)) (Re return nil, fmt.Errorf("unable to parse yaml: %w", err) } - return &simpleRetrieved{confMap: config.NewMapFromStringMap(data)}, nil + return NewRetrieved(func(ctx context.Context) (*config.Map, error) { + return config.NewMapFromStringMap(data), nil + }) } func (*envMapProvider) Shutdown(context.Context) error { diff --git a/config/configmapprovider/expand.go b/config/configmapprovider/expand.go index 831d3d90952..2fcbb1e7fe4 100644 --- a/config/configmapprovider/expand.go +++ b/config/configmapprovider/expand.go @@ -18,6 +18,8 @@ import ( "context" "fmt" "os" + + "go.opentelemetry.io/collector/config" ) type expandMapProvider struct { @@ -44,7 +46,9 @@ func (emp *expandMapProvider) Retrieve(ctx context.Context, onChange func(*Chang for _, k := range cfgMap.AllKeys() { cfgMap.Set(k, expandStringValues(cfgMap.Get(k))) } - return &simpleRetrieved{confMap: cfgMap, closeFunc: retr.Close}, nil + return NewRetrieved(func(ctx context.Context) (*config.Map, error) { + return cfgMap, nil + }, WithClose(retr.Close)) } func (emp *expandMapProvider) Shutdown(ctx context.Context) error { diff --git a/config/configmapprovider/expand_test.go b/config/configmapprovider/expand_test.go index 03abd17aab9..982770ec63b 100644 --- a/config/configmapprovider/expand_test.go +++ b/config/configmapprovider/expand_test.go @@ -30,36 +30,36 @@ func TestBaseRetrieveFailsOnRetrieve(t *testing.T) { exp := NewExpand(&mockProvider{retrieveErr: retErr}) t.Cleanup(func() { require.NoError(t, exp.Shutdown(context.Background())) }) _, err := exp.Retrieve(context.Background(), nil) - require.Error(t, err) - require.ErrorIs(t, err, retErr) + assert.Error(t, err) + assert.ErrorIs(t, err, retErr) } func TestBaseRetrieveFailsOnGet(t *testing.T) { getErr := errors.New("test error") - exp := NewExpand(&mockProvider{retrieved: &mockRetrieved{getErr: getErr}}) + exp := NewExpand(&mockProvider{retrieved: newErrGetRetrieved(getErr)}) t.Cleanup(func() { require.NoError(t, exp.Shutdown(context.Background())) }) _, err := exp.Retrieve(context.Background(), nil) - require.Error(t, err) - require.ErrorIs(t, err, getErr) + assert.Error(t, err) + assert.ErrorIs(t, err, getErr) } func TestBaseRetrieveFailsOnClose(t *testing.T) { closeErr := errors.New("test error") - exp := NewExpand(&mockProvider{retrieved: &mockRetrieved{closeErr: closeErr}}) + exp := NewExpand(&mockProvider{retrieved: newErrCloseRetrieved(closeErr)}) t.Cleanup(func() { require.NoError(t, exp.Shutdown(context.Background())) }) ret, err := exp.Retrieve(context.Background(), nil) require.NoError(t, err) err = ret.Close(context.Background()) - require.Error(t, err) - require.ErrorIs(t, err, closeErr) + assert.Error(t, err) + assert.ErrorIs(t, err, closeErr) } func TestBaseRetrieveFailsOnShutdown(t *testing.T) { shutdownErr := errors.New("test error") exp := NewExpand(&mockProvider{shutdownErr: shutdownErr}) err := exp.Shutdown(context.Background()) - require.Error(t, err) - require.ErrorIs(t, err, shutdownErr) + assert.Error(t, err) + assert.ErrorIs(t, err, shutdownErr) } func TestExpand(t *testing.T) { diff --git a/config/configmapprovider/file.go b/config/configmapprovider/file.go index de2a115ec12..0423080b269 100644 --- a/config/configmapprovider/file.go +++ b/config/configmapprovider/file.go @@ -51,7 +51,9 @@ func (fmp *fileMapProvider) Retrieve(_ context.Context, _ func(*ChangeEvent)) (R return nil, fmt.Errorf("unable to parse yaml: %w", err) } - return &simpleRetrieved{confMap: config.NewMapFromStringMap(data)}, nil + return NewRetrieved(func(ctx context.Context) (*config.Map, error) { + return config.NewMapFromStringMap(data), nil + }) } func (*fileMapProvider) Shutdown(context.Context) error { diff --git a/config/configmapprovider/merge.go b/config/configmapprovider/merge.go index e6fc5d3fde2..014c4fbdc77 100644 --- a/config/configmapprovider/merge.go +++ b/config/configmapprovider/merge.go @@ -52,13 +52,17 @@ func (mp *mergeMapProvider) Retrieve(ctx context.Context, onChange func(*ChangeE } retrs = append(retrs, retr) } - return &simpleRetrieved{confMap: retCfgMap, closeFunc: func(ctxF context.Context) error { - var err error - for _, ret := range retrs { - err = multierr.Append(err, ret.Close(ctxF)) - } - return err - }}, nil + return NewRetrieved( + func(ctx context.Context) (*config.Map, error) { + return retCfgMap, nil + }, + WithClose(func(ctxF context.Context) error { + var err error + for _, ret := range retrs { + err = multierr.Append(err, ret.Close(ctxF)) + } + return err + })) } func (mp *mergeMapProvider) Shutdown(ctx context.Context) error { diff --git a/config/configmapprovider/merge_test.go b/config/configmapprovider/merge_test.go index 385144a04a7..9c702691d86 100644 --- a/config/configmapprovider/merge_test.go +++ b/config/configmapprovider/merge_test.go @@ -24,23 +24,30 @@ import ( ) func TestMerge_GetError(t *testing.T) { - pl := NewMerge(&mockProvider{}, &mockProvider{retrieved: &mockRetrieved{getErr: errors.New("my error")}}) + getErr := errors.New("test error") + pl := NewMerge(&mockProvider{}, &mockProvider{retrieved: newErrGetRetrieved(getErr)}) require.NotNil(t, pl) - cp, err := pl.Retrieve(context.Background(), nil) + _, err := pl.Retrieve(context.Background(), nil) assert.Error(t, err) - assert.Nil(t, cp) + assert.ErrorIs(t, err, getErr) } func TestMerge_CloseError(t *testing.T) { - pl := NewMerge(&mockProvider{}, &mockProvider{retrieved: &mockRetrieved{closeErr: errors.New("my error")}}) + closeErr := errors.New("test error") + pl := NewMerge(&mockProvider{}, &mockProvider{retrieved: newErrCloseRetrieved(closeErr)}) require.NotNil(t, pl) cp, err := pl.Retrieve(context.Background(), nil) assert.NoError(t, err) - assert.Error(t, cp.Close(context.Background())) + err = cp.Close(context.Background()) + assert.Error(t, err) + assert.ErrorIs(t, err, closeErr) } func TestMerge_ShutdownError(t *testing.T) { - pl := NewMerge(&mockProvider{}, &mockProvider{shutdownErr: errors.New("my error")}) + shutdownErr := errors.New("test error") + pl := NewMerge(&mockProvider{}, &mockProvider{shutdownErr: shutdownErr}) require.NotNil(t, pl) - assert.Error(t, pl.Shutdown(context.Background())) + err := pl.Shutdown(context.Background()) + assert.Error(t, err) + assert.ErrorIs(t, err, shutdownErr) } diff --git a/config/configmapprovider/mock_test.go b/config/configmapprovider/mock_test.go index c001e60bd90..fc5628796a5 100644 --- a/config/configmapprovider/mock_test.go +++ b/config/configmapprovider/mock_test.go @@ -34,7 +34,7 @@ func (m *mockProvider) Retrieve(context.Context, func(*ChangeEvent)) (Retrieved, return nil, m.retrieveErr } if m.retrieved == nil { - return &mockRetrieved{}, nil + return NewRetrieved(func(ctx context.Context) (*config.Map, error) { return config.NewMap(), nil }) } return m.retrieved, nil } @@ -43,24 +43,14 @@ func (m *mockProvider) Shutdown(context.Context) error { return m.shutdownErr } -type mockRetrieved struct { - cfg *config.Map - getErr error - closeErr error +func newErrGetRetrieved(getErr error) Retrieved { + ret, _ := NewRetrieved(func(ctx context.Context) (*config.Map, error) { return nil, getErr }) + return ret } -var _ Retrieved = &mockRetrieved{} - -func (sr *mockRetrieved) Get(context.Context) (*config.Map, error) { - if sr.getErr != nil { - return nil, sr.getErr - } - if sr.cfg == nil { - return config.NewMap(), nil - } - return sr.cfg, nil -} - -func (sr *mockRetrieved) Close(context.Context) error { - return sr.closeErr +func newErrCloseRetrieved(closeErr error) Retrieved { + ret, _ := NewRetrieved( + func(ctx context.Context) (*config.Map, error) { return config.NewMap(), nil }, + WithClose(func(ctx context.Context) error { return closeErr })) + return ret } diff --git a/config/configmapprovider/properties.go b/config/configmapprovider/properties.go index 4cb13e82f3f..39756936910 100644 --- a/config/configmapprovider/properties.go +++ b/config/configmapprovider/properties.go @@ -42,7 +42,9 @@ func NewProperties(properties []string) Provider { func (pmp *propertiesMapProvider) Retrieve(_ context.Context, onChange func(*ChangeEvent)) (Retrieved, error) { if len(pmp.properties) == 0 { - return &simpleRetrieved{confMap: config.NewMap()}, nil + return NewRetrieved(func(ctx context.Context) (*config.Map, error) { + return config.NewMap(), nil + }) } b := &bytes.Buffer{} @@ -67,7 +69,9 @@ func (pmp *propertiesMapProvider) Retrieve(_ context.Context, onChange func(*Cha } prop := maps.Unflatten(parsed, ".") - return &simpleRetrieved{confMap: config.NewMapFromStringMap(prop)}, nil + return NewRetrieved(func(ctx context.Context) (*config.Map, error) { + return config.NewMapFromStringMap(prop), nil + }) } func (*propertiesMapProvider) Shutdown(context.Context) error { diff --git a/config/configmapprovider/provider.go b/config/configmapprovider/provider.go index 49c79350ed9..61bcb1d41a1 100644 --- a/config/configmapprovider/provider.go +++ b/config/configmapprovider/provider.go @@ -16,13 +16,25 @@ package configmapprovider // import "go.opentelemetry.io/collector/config/config import ( "context" - - "go.opentelemetry.io/collector/config" ) // Provider is an interface that helps to retrieve a config map and watch for any // changes to the config map. Implementations may load the config from a file, // a database or any other source. +// +// The typical usage is the following: +// +// r := mapProvider.Retrieve() +// r.Get() +// // wait for onChange() to be called. +// r.Close() +// r = mapProvider.Retrieve() +// r.Get() +// // wait for onChange() to be called. +// r.Close() +// // repeat Retrieve/Get/wait/Close cycle until it is time to shut down the Collector process. +// // ... +// mapProvider.Shutdown() type Provider interface { // Retrieve goes to the configuration source and retrieves the selected data which // contains the value to be injected in the configuration and the corresponding watcher that @@ -50,43 +62,6 @@ type Provider interface { Shutdown(ctx context.Context) error } -// Retrieved holds the result of a call to the Retrieve method of a Provider object. -// -// The typical usage is the following: -// -// r := mapProvider.Retrieve() -// r.Get() -// // wait for onChange() to be called. -// r.Close() -// r = mapProvider.Retrieve() -// r.Get() -// // wait for onChange() to be called. -// r.Close() -// // repeat Retrieve/Get/wait/Close cycle until it is time to shut down the Collector process. -// // ... -// mapProvider.Shutdown() -type Retrieved interface { - // Get returns the config Map. - // If Close is called before Get or concurrently with Get then Get - // should return immediately with ErrSessionClosed error. - // Should never be called concurrently with itself. - // If ctx is cancelled should return immediately with an error. - Get(ctx context.Context) (*config.Map, error) - - // Close signals that the configuration for which it was used to retrieve values is - // no longer in use and the object should close and release any watchers that it - // may have created. - // - // This method must be called when the service ends, either in case of success or error. - // - // Should never be called concurrently with itself. - // May be called before, after or concurrently with Get. - // If ctx is cancelled should return immediately with an error. - // - // Calling Close on an already closed object should have no effect and should return nil. - Close(ctx context.Context) error -} - // ChangeEvent describes the particular change event that happened with the config. // TODO: see if this can be eliminated. type ChangeEvent struct { diff --git a/config/configmapprovider/retrieved.go b/config/configmapprovider/retrieved.go new file mode 100644 index 00000000000..5f4c0b8510e --- /dev/null +++ b/config/configmapprovider/retrieved.go @@ -0,0 +1,100 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package configmapprovider // import "go.opentelemetry.io/collector/config/configmapprovider" + +import ( + "context" + "errors" + + "go.opentelemetry.io/collector/config" +) + +// Retrieved holds the result of a call to the Retrieve method of a Provider object. +// This interface cannot be directly implemented. Implementations must use the NewRetrieved helper. +type Retrieved interface { + // Get returns the config Map. + // If Close is called before Get or concurrently with Get then Get + // should return immediately with ErrSessionClosed error. + // Should never be called concurrently with itself. + // If ctx is cancelled should return immediately with an error. + Get(ctx context.Context) (*config.Map, error) + + // Close signals that the configuration for which it was used to retrieve values is + // no longer in use and the object should close and release any watchers that it + // may have created. + // + // This method must be called when the service ends, either in case of success or error. + // + // Should never be called concurrently with itself. + // May be called before, after or concurrently with Get. + // If ctx is cancelled should return immediately with an error. + // + // Calling Close on an already closed object should have no effect and should return nil. + Close(ctx context.Context) error + + // privateRetrieved is an unexported func to disallow direct implementation. + privateRetrieved() +} + +// GetFunc specifies the function invoked when the Retrieved.Get is being called. +type GetFunc func(context.Context) (*config.Map, error) + +// Get implements the Retrieved.Get. +func (f GetFunc) Get(ctx context.Context) (*config.Map, error) { + return f(ctx) +} + +// CloseFunc specifies the function invoked when the Retrieved.Close is being called. +type CloseFunc func(context.Context) error + +// Close implements the Retrieved.Close. +func (f CloseFunc) Close(ctx context.Context) error { + if f == nil { + return nil + } + return f(ctx) +} + +// RetrievedOption represents the possible options for NewRetrieved. +type RetrievedOption func(*retrieved) + +// WithClose overrides the default `Close` function for a Retrieved. +// The default always returns nil. +func WithClose(closeFunc CloseFunc) RetrievedOption { + return func(o *retrieved) { + o.CloseFunc = closeFunc + } +} + +type retrieved struct { + GetFunc + CloseFunc +} + +func (retrieved) privateRetrieved() {} + +// NewRetrieved returns a Retrieved configured with the provided options. +func NewRetrieved(getFunc GetFunc, options ...RetrievedOption) (Retrieved, error) { + if getFunc == nil { + return nil, errors.New("nil getFunc") + } + ret := &retrieved{ + GetFunc: getFunc, + } + for _, op := range options { + op(ret) + } + return ret, nil +} diff --git a/config/configmapprovider/retrieved_test.go b/config/configmapprovider/retrieved_test.go new file mode 100644 index 00000000000..fe770496ecc --- /dev/null +++ b/config/configmapprovider/retrieved_test.go @@ -0,0 +1,58 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package configmapprovider // import "go.opentelemetry.io/collector/config/configmapprovider" + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/config" +) + +func TestNewRetrieved_NilGetFunc(t *testing.T) { + _, err := NewRetrieved(nil) + assert.Error(t, err) +} + +func TestNewRetrieved_Default(t *testing.T) { + expectedCfg := config.NewMapFromStringMap(map[string]interface{}{"test": nil}) + expectedErr := errors.New("test") + ret, err := NewRetrieved(func(context.Context) (*config.Map, error) { return expectedCfg, expectedErr }) + require.NoError(t, err) + cfg, err := ret.Get(context.Background()) + assert.Equal(t, expectedCfg, cfg) + assert.Equal(t, expectedErr, err) + assert.NoError(t, ret.Close(context.Background())) + // Check that the private func even if called does not panic. + assert.NotPanics(t, func() { ret.privateRetrieved() }) +} + +func TestNewRetrieved_WithClose(t *testing.T) { + expectedCfg := config.NewMapFromStringMap(map[string]interface{}{"test": nil}) + expectedErr := errors.New("test") + expectedCloseErr := errors.New("test") + ret, err := NewRetrieved( + func(context.Context) (*config.Map, error) { return expectedCfg, expectedErr }, + WithClose(func(ctx context.Context) error { return expectedCloseErr })) + require.NoError(t, err) + cfg, err := ret.Get(context.Background()) + assert.Equal(t, expectedCfg, cfg) + assert.Equal(t, expectedErr, err) + assert.Equal(t, expectedCloseErr, ret.Close(context.Background())) +} diff --git a/config/configmapprovider/simple.go b/config/configmapprovider/simple.go deleted file mode 100644 index dda6c0a1de6..00000000000 --- a/config/configmapprovider/simple.go +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package configmapprovider // import "go.opentelemetry.io/collector/config/configmapprovider" - -import ( - "context" - - "go.opentelemetry.io/collector/config" -) - -type closeFunc func(ctx context.Context) error - -// TODO: This probably will make sense to be exported, but needs better name and documentation. -type simpleRetrieved struct { - closeFunc - confMap *config.Map -} - -func (sr *simpleRetrieved) Get(context.Context) (*config.Map, error) { - return sr.confMap, nil -} - -func (sr *simpleRetrieved) Close(ctx context.Context) error { - if sr.closeFunc == nil { - return nil - } - return sr.closeFunc(ctx) -} diff --git a/service/config_provider_test.go b/service/config_provider_test.go index 2582f96d009..1b2828b851c 100644 --- a/service/config_provider_test.go +++ b/service/config_provider_test.go @@ -57,6 +57,7 @@ func (ecu *errConfigUnmarshaler) Unmarshal(*config.Map, component.Factories) (*c } type errRetrieved struct { + configmapprovider.Retrieved retM *config.Map errW error errC error