diff --git a/CHANGELOG.md b/CHANGELOG.md index dbfec61bcce..bcf657f9fab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -192,6 +192,7 @@ There isn't a valid core binary for this release. Use v0.57.2 instead. - Use OpenCensus `metric` package for process metrics instead of `stats` package (#5486) - Update OTLP to v0.18.0 (#5530) - Log histogram min/max fields with `logging` exporter (#5520) +- Add support in the `confmap.Resolver` to expand embedded config URIs inside configuration (#4742) ### 🧰 Bug fixes 🧰 diff --git a/confmap/README.md b/confmap/README.md index c6051429e90..db71340c2b1 100644 --- a/confmap/README.md +++ b/confmap/README.md @@ -33,23 +33,35 @@ The `Resolver` receives as input a set of `Providers`, a list of `Converters`, a `configURI` that will be used to generate the resulting, or effective, configuration in the form of a `Conf`, that can be used by code that is oblivious to the usage of `Providers` and `Converters`. +`Providers` are used to provide an entire configuration when the `configURI` is given directly to the `Resolver`, +or an individual value (partial configuration) when the `configURI` is embedded into the `Conf` as a values using +the syntax `${configURI}`. + ```terminal Resolver Provider - β”‚ β”‚ Resolve β”‚ β”‚ ────────────────►│ β”‚ β”‚ β”‚ β”Œβ”€ β”‚ Retrieve β”‚ β”‚ β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Ίβ”‚ - β”‚ β”‚ β”‚ + β”‚ β”‚ Conf β”‚ β”‚ │◄────────────────────────── - foreach β”‚ β”‚ β”‚ + foreach β”‚ β”‚ β”‚ configURI β”‚ β”œβ”€β”€β”€β” β”‚ β”‚ β”‚ β”‚Merge β”‚ β”‚ β”‚β—„β”€β”€β”˜ β”‚ + └─ β”‚ β”‚ + β”Œβ”€ β”‚ Retrieve β”‚ + β”‚ β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Ίβ”‚ + β”‚ β”‚ Partial Conf Value β”‚ + β”‚ │◄────────────────────────── + foreach β”‚ β”‚ β”‚ + embedded β”‚ β”‚ β”‚ + configURI β”‚ β”œβ”€β”€β”€β” β”‚ + β”‚ β”‚ β”‚Replace β”‚ + β”‚ β”‚β—„β”€β”€β”˜ β”‚ └─ β”‚ β”‚ β”‚ Converter β”‚ - β”‚ β”‚ β”‚ β”Œβ”€ β”‚ Convert β”‚ β”‚ β”‚ β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Ίβ”‚ β”‚ foreach β”‚ β”‚ β”‚ β”‚ @@ -57,15 +69,15 @@ that can be used by code that is oblivious to the usage of `Providers` and `Conv └─ β”‚ β”‚ β”‚ β”‚ ◄───────────────── β”‚ - β”‚ β”‚ ``` The `Resolve` method proceeds in the following steps: 1. Start with an empty "result" of `Conf` type. 2. For each config URI retrieves individual configurations, and merges it into the "result". -2. For each "Converter", call "Convert" for the "result". -4. Return the "result", aka effective, configuration. +3. For each embedded config URI retrieves individual value, and replaces it into the "result". +4. For each "Converter", call "Convert" for the "result". +5. Return the "result", aka effective, configuration. ### Watching for Updates After the configuration was processed, the `Resolver` can be used as a single point to watch for updates in the diff --git a/confmap/provider.go b/confmap/provider.go index 2b2801cc7c8..9dfa2b4adb6 100644 --- a/confmap/provider.go +++ b/confmap/provider.go @@ -16,6 +16,7 @@ package confmap // import "go.opentelemetry.io/collector/confmap" import ( "context" + "fmt" ) // Provider is an interface that helps to retrieve a config map and watch for any @@ -81,7 +82,7 @@ type ChangeEvent struct { // Retrieved holds the result of a call to the Retrieve method of a Provider object. type Retrieved struct { - conf *Conf + rawConf interface{} closeFunc CloseFunc } @@ -101,17 +102,39 @@ func WithRetrievedClose(closeFunc CloseFunc) RetrievedOption { } // NewRetrieved returns a new Retrieved instance that contains the data from the raw deserialized config. -func NewRetrieved(rawConf map[string]interface{}, opts ...RetrievedOption) (Retrieved, error) { +// The rawConf can be one of the following types: +// - Primitives: int, int32, int64, float32, float64, bool, string; +// - []interface{}; +// - map[string]interface{}; +func NewRetrieved(rawConf interface{}, opts ...RetrievedOption) (Retrieved, error) { + if err := checkRawConfType(rawConf); err != nil { + return Retrieved{}, err + } set := retrievedSettings{} for _, opt := range opts { opt(&set) } - return Retrieved{conf: NewFromStringMap(rawConf), closeFunc: set.closeFunc}, nil + return Retrieved{rawConf: rawConf, closeFunc: set.closeFunc}, nil } // AsConf returns the retrieved configuration parsed as a Conf. -func (r Retrieved) AsConf() (*Conf, error) { - return r.conf, nil +func (r *Retrieved) AsConf() (*Conf, error) { + if r.rawConf == nil { + return New(), nil + } + val, ok := r.rawConf.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("retrieved value (type=%T) cannot be used as a Conf", r.rawConf) + } + return NewFromStringMap(val), nil +} + +// AsRaw returns the retrieved configuration parsed as an interface{} which can be one of the following types: +// - Primitives: int, int32, int64, float32, float64, bool, string; +// - []interface{} - every member follows the same rules as the given interface{}; +// - map[string]interface{} - every value follows the same rules as the given interface{}; +func (r *Retrieved) AsRaw() (interface{}, error) { + return r.rawConf, nil } // Close and release any watchers that Provider.Retrieve may have created. @@ -129,3 +152,15 @@ func (r Retrieved) Close(ctx context.Context) error { // CloseFunc a function equivalent to Retrieved.Close. type CloseFunc func(context.Context) error + +func checkRawConfType(rawConf interface{}) error { + if rawConf == nil { + return nil + } + switch rawConf.(type) { + case int, int32, int64, float32, float64, bool, string, []interface{}, map[string]interface{}: + return nil + default: + return fmt.Errorf("unsupported type=%T for retrieved config", rawConf) + } +} diff --git a/confmap/provider/internal/provider.go b/confmap/provider/internal/provider.go index e3f6776dbc5..a1f62c5a06e 100644 --- a/confmap/provider/internal/provider.go +++ b/confmap/provider/internal/provider.go @@ -24,7 +24,7 @@ import ( // * yamlBytes the yaml bytes that will be deserialized. // * opts specifies options associated with this Retrieved value, such as CloseFunc. func NewRetrievedFromYAML(yamlBytes []byte, opts ...confmap.RetrievedOption) (confmap.Retrieved, error) { - var rawConf map[string]interface{} + var rawConf interface{} if err := yaml.Unmarshal(yamlBytes, &rawConf); err != nil { return confmap.Retrieved{}, err } diff --git a/confmap/provider/internal/provider_test.go b/confmap/provider/internal/provider_test.go index 0bd0b74a35f..023fa3347fe 100644 --- a/confmap/provider/internal/provider_test.go +++ b/confmap/provider/internal/provider_test.go @@ -46,10 +46,13 @@ func TestNewRetrievedFromYAMLWithOptions(t *testing.T) { func TestNewRetrievedFromYAMLInvalidYAMLBytes(t *testing.T) { _, err := NewRetrievedFromYAML([]byte("[invalid:,")) - require.Error(t, err) + assert.Error(t, err) } func TestNewRetrievedFromYAMLInvalidAsMap(t *testing.T) { - _, err := NewRetrievedFromYAML([]byte("string")) - require.Error(t, err) + ret, err := NewRetrievedFromYAML([]byte("string")) + require.NoError(t, err) + + _, err = ret.AsConf() + assert.Error(t, err) } diff --git a/confmap/resolver.go b/confmap/resolver.go index b97bcef4827..a36fae6652c 100644 --- a/confmap/resolver.go +++ b/confmap/resolver.go @@ -38,6 +38,8 @@ type Resolver struct { sync.Mutex closers []CloseFunc watcher chan error + + enableExpand bool } // ResolverSettings are the settings to configure the behavior of the Resolver. @@ -115,20 +117,14 @@ func (mr *Resolver) Resolve(ctx context.Context) (*Conf, error) { // For backwards compatibility: // - empty url scheme means "file". // - "^[A-z]:" also means "file" - scheme := "file" - if idx := strings.Index(uri, ":"); idx != -1 && !driverLetterRegexp.MatchString(uri) { - scheme = uri[:idx] - } else { - uri = scheme + ":" + uri - } - p, ok := mr.providers[scheme] - if !ok { - return nil, fmt.Errorf("scheme %q is not supported for uri %q", scheme, uri) + if driverLetterRegexp.MatchString(uri) { + uri = "file:" + uri } - ret, err := p.Retrieve(ctx, uri, mr.onChange) + ret, err := mr.retrieveValue(ctx, location{uri: uri, defaultScheme: "file"}) if err != nil { - return nil, err + return nil, fmt.Errorf("cannot retrieve the configuration: %w", err) } + mr.closers = append(mr.closers, ret.Close) retCfgMap, err := ret.AsConf() if err != nil { return nil, err @@ -136,7 +132,18 @@ func (mr *Resolver) Resolve(ctx context.Context) (*Conf, error) { if err = retMap.Merge(retCfgMap); err != nil { return nil, err } - mr.closers = append(mr.closers, ret.Close) + } + + if mr.enableExpand { + cfgMap := make(map[string]interface{}) + for _, k := range retMap.AllKeys() { + val, err := mr.expandValueRecursively(ctx, retMap.Get(k)) + if err != nil { + return nil, err + } + cfgMap[k] = val + } + retMap = NewFromStringMap(cfgMap) } // Apply the converters in the given order. @@ -187,3 +194,82 @@ func (mr *Resolver) closeIfNeeded(ctx context.Context) error { } return err } + +func (mr *Resolver) expandValueRecursively(ctx context.Context, value interface{}) (interface{}, error) { + for i := 0; i < 100; i++ { + val, changed, err := mr.expandValue(ctx, value) + if err != nil { + return nil, err + } + if !changed { + return val, nil + } + value = val + } + return nil, errors.New("too many recursive expansions") +} + +func (mr *Resolver) expandValue(ctx context.Context, value interface{}) (interface{}, bool, error) { + switch v := value.(type) { + case string: + // If it doesn't have the format "${scheme:opaque}" no need to expand. + if !strings.HasPrefix(v, "${") || !strings.HasSuffix(v, "}") { + return value, false, nil + } + uri := v[2 : len(v)-1] + // For backwards compatibility: + // - empty scheme means "env". + ret, err := mr.retrieveValue(ctx, location{uri: uri, defaultScheme: "env"}) + if err != nil { + return nil, false, err + } + mr.closers = append(mr.closers, ret.Close) + val, err := ret.AsRaw() + return val, true, err + case []interface{}: + nslice := make([]interface{}, 0, len(v)) + nchanged := false + for _, vint := range v { + val, changed, err := mr.expandValue(ctx, vint) + if err != nil { + return nil, false, err + } + nslice = append(nslice, val) + nchanged = nchanged || changed + } + return nslice, nchanged, nil + case map[string]interface{}: + nmap := map[string]interface{}{} + nchanged := false + for mk, mv := range v { + val, changed, err := mr.expandValue(ctx, mv) + if err != nil { + return nil, false, err + } + nmap[mk] = val + nchanged = nchanged || changed + } + return nmap, nchanged, nil + } + return value, false, nil +} + +type location struct { + uri string + defaultScheme string +} + +func (mr *Resolver) retrieveValue(ctx context.Context, l location) (Retrieved, error) { + uri := l.uri + scheme := l.defaultScheme + if idx := strings.Index(uri, ":"); idx != -1 { + scheme = uri[:idx] + } else { + uri = scheme + ":" + uri + } + p, ok := mr.providers[scheme] + if !ok { + return Retrieved{}, fmt.Errorf("scheme %q is not supported for uri %q", scheme, uri) + } + return p.Retrieve(ctx, uri, mr.onChange) +} diff --git a/confmap/resolver_test.go b/confmap/resolver_test.go index 6d01ee12547..88663193c9f 100644 --- a/confmap/resolver_test.go +++ b/confmap/resolver_test.go @@ -27,7 +27,7 @@ import ( type mockProvider struct { scheme string - retM map[string]interface{} + retM interface{} errR error errS error errW error @@ -41,9 +41,8 @@ func (m *mockProvider) Retrieve(_ context.Context, _ string, watcher WatcherFunc if m.retM == nil { return NewRetrieved(nil) } - if watcher != nil { - watcher(&ChangeEvent{Error: m.errW}) - } + + watcher(&ChangeEvent{Error: m.errW}) return NewRetrieved(m.retM, WithRetrievedClose(func(ctx context.Context) error { return m.errC })) } @@ -122,6 +121,15 @@ func TestResolverErrors(t *testing.T) { }, expectResolveErr: true, }, + { + name: "retrieve location not convertable to Conf", + locations: []string{"mock:", "err:"}, + providers: []Provider{ + &mockProvider{}, + &mockProvider{scheme: "err", retM: "invalid value"}, + }, + expectResolveErr: true, + }, { name: "converter error", locations: []string{"mock:"}, @@ -306,6 +314,123 @@ func TestResolverShutdownClosesWatch(t *testing.T) { watcherWG.Wait() } +func TestResolverExpandEnvVars(t *testing.T) { + var testCases = []struct { + name string // test case name (also file name containing config yaml) + }{ + {name: "expand-with-no-env.yaml"}, + {name: "expand-with-partial-env.yaml"}, + {name: "expand-with-all-env.yaml"}, + {name: "expand-with-all-env-with-source.yaml"}, + } + + envs := map[string]string{ + "EXTRA": "some string", + "EXTRA_MAP_VALUE_1": "some map value_1", + "EXTRA_MAP_VALUE_2": "some map value_2", + "EXTRA_LIST_MAP_VALUE_1": "some list map value_1", + "EXTRA_LIST_MAP_VALUE_2": "some list map value_2", + "EXTRA_LIST_VALUE_1": "some list value_1", + "EXTRA_LIST_VALUE_2": "some list value_2", + } + + expectedCfgMap := newConfFromFile(t, filepath.Join("testdata", "expand-with-no-env.yaml")) + fileProvider := newFakeProvider("file", func(_ context.Context, uri string, _ WatcherFunc) (Retrieved, error) { + return NewRetrieved(newConfFromFile(t, uri[5:])) + }) + envProvider := newFakeProvider("env", func(_ context.Context, uri string, _ WatcherFunc) (Retrieved, error) { + return NewRetrieved(envs[uri[4:]]) + }) + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + resolver, err := NewResolver(ResolverSettings{URIs: []string{filepath.Join("testdata", test.name)}, Providers: makeMapProvidersMap(fileProvider, envProvider), Converters: nil}) + require.NoError(t, err) + resolver.enableExpand = true + // Test that expanded configs are the same with the simple config with no env vars. + cfgMap, err := resolver.Resolve(context.Background()) + require.NoError(t, err) + assert.Equal(t, expectedCfgMap, cfgMap.ToStringMap()) + }) + } +} + +func TestResolverExpandMapAndSliceValues(t *testing.T) { + provider := newFakeProvider("input", func(_ context.Context, uri string, _ WatcherFunc) (Retrieved, error) { + return NewRetrieved(map[string]interface{}{ + "test_map": map[string]interface{}{"recv": "${test:MAP_VALUE}"}, + "test_slice": []interface{}{"${test:MAP_VALUE}"}}) + }) + + const receiverExtraMapValue = "some map value" + testProvider := newFakeProvider("test", func(_ context.Context, uri string, _ WatcherFunc) (Retrieved, error) { + return NewRetrieved(receiverExtraMapValue) + }) + + resolver, err := NewResolver(ResolverSettings{URIs: []string{"input:"}, Providers: makeMapProvidersMap(provider, testProvider), Converters: nil}) + require.NoError(t, err) + resolver.enableExpand = true + + cfgMap, err := resolver.Resolve(context.Background()) + require.NoError(t, err) + expectedMap := map[string]interface{}{ + "test_map": map[string]interface{}{"recv": receiverExtraMapValue}, + "test_slice": []interface{}{receiverExtraMapValue}} + assert.Equal(t, expectedMap, cfgMap.ToStringMap()) +} + +func TestResolverInfiniteExpand(t *testing.T) { + const receiverValue = "${test:VALUE}" + provider := newFakeProvider("input", func(_ context.Context, uri string, _ WatcherFunc) (Retrieved, error) { + return NewRetrieved(map[string]interface{}{"test": receiverValue}) + }) + + testProvider := newFakeProvider("test", func(_ context.Context, uri string, _ WatcherFunc) (Retrieved, error) { + return NewRetrieved(receiverValue) + }) + + resolver, err := NewResolver(ResolverSettings{URIs: []string{"input:"}, Providers: makeMapProvidersMap(provider, testProvider), Converters: nil}) + require.NoError(t, err) + resolver.enableExpand = true + + _, err = resolver.Resolve(context.Background()) + assert.Error(t, err) +} + +func TestResolverExpandSliceValueError(t *testing.T) { + provider := newFakeProvider("input", func(_ context.Context, uri string, _ WatcherFunc) (Retrieved, error) { + return NewRetrieved(map[string]interface{}{"test": []interface{}{"${test:VALUE}"}}) + }) + + testProvider := newFakeProvider("test", func(_ context.Context, uri string, _ WatcherFunc) (Retrieved, error) { + return NewRetrieved(errors.New("invalid value")) + }) + + resolver, err := NewResolver(ResolverSettings{URIs: []string{"input:"}, Providers: makeMapProvidersMap(provider, testProvider), Converters: nil}) + require.NoError(t, err) + resolver.enableExpand = true + + _, err = resolver.Resolve(context.Background()) + assert.Error(t, err) +} + +func TestResolverExpandMapValueError(t *testing.T) { + provider := newFakeProvider("input", func(_ context.Context, uri string, _ WatcherFunc) (Retrieved, error) { + return NewRetrieved(map[string]interface{}{"test": []interface{}{map[string]interface{}{"test": "${test:VALUE}"}}}) + }) + + testProvider := newFakeProvider("test", func(_ context.Context, uri string, _ WatcherFunc) (Retrieved, error) { + return NewRetrieved(errors.New("invalid value")) + }) + + resolver, err := NewResolver(ResolverSettings{URIs: []string{"input:"}, Providers: makeMapProvidersMap(provider, testProvider), Converters: nil}) + require.NoError(t, err) + resolver.enableExpand = true + + _, err = resolver.Resolve(context.Background()) + assert.Error(t, err) +} + func makeMapProvidersMap(providers ...Provider) map[string]Provider { ret := make(map[string]Provider, len(providers)) for _, provider := range providers { diff --git a/confmap/testdata/expand-with-all-env-with-source.yaml b/confmap/testdata/expand-with-all-env-with-source.yaml new file mode 100644 index 00000000000..5386e05158a --- /dev/null +++ b/confmap/testdata/expand-with-all-env-with-source.yaml @@ -0,0 +1,11 @@ +test_map: + extra: "${env:EXTRA}" + extra_map: + recv.1: "${env:EXTRA_MAP_VALUE_1}" + recv.2: "${env:EXTRA_MAP_VALUE_2}" + extra_list_map: + - { recv.1: "${env:EXTRA_LIST_MAP_VALUE_1}",recv.2: "${env:EXTRA_LIST_MAP_VALUE_2}" } + extra_list: + - "${env:EXTRA_LIST_VALUE_1}" + - "${env:EXTRA_LIST_VALUE_2}" + diff --git a/confmap/testdata/expand-with-all-env.yaml b/confmap/testdata/expand-with-all-env.yaml new file mode 100644 index 00000000000..ed623bf9a57 --- /dev/null +++ b/confmap/testdata/expand-with-all-env.yaml @@ -0,0 +1,11 @@ +test_map: + extra: "${EXTRA}" + extra_map: + recv.1: "${EXTRA_MAP_VALUE_1}" + recv.2: "${EXTRA_MAP_VALUE_2}" + extra_list_map: + - { recv.1: "${EXTRA_LIST_MAP_VALUE_1}",recv.2: "${EXTRA_LIST_MAP_VALUE_2}" } + extra_list: + - "${EXTRA_LIST_VALUE_1}" + - "${EXTRA_LIST_VALUE_2}" + diff --git a/confmap/testdata/expand-with-no-env.yaml b/confmap/testdata/expand-with-no-env.yaml new file mode 100644 index 00000000000..fd4dd08210d --- /dev/null +++ b/confmap/testdata/expand-with-no-env.yaml @@ -0,0 +1,10 @@ +test_map: + extra: "some string" + extra_map: + recv.1: "some map value_1" + recv.2: "some map value_2" + extra_list_map: + - { recv.1: "some list map value_1",recv.2: "some list map value_2" } + extra_list: + - "some list value_1" + - "some list value_2" diff --git a/confmap/testdata/expand-with-partial-env.yaml b/confmap/testdata/expand-with-partial-env.yaml new file mode 100644 index 00000000000..fb8ffe51b8a --- /dev/null +++ b/confmap/testdata/expand-with-partial-env.yaml @@ -0,0 +1,10 @@ +test_map: + extra: "${EXTRA}" + extra_map: + recv.1: "${EXTRA_MAP_VALUE_1}" + recv.2: "some map value_2" + extra_list_map: + - { recv.1: "some list map value_1",recv.2: "${EXTRA_LIST_MAP_VALUE_2}" } + extra_list: + - "some list value_1" + - "${EXTRA_LIST_VALUE_2}"