Skip to content

Commit

Permalink
Add support in the confmap.Resolver to expand embedded config URIs in…
Browse files Browse the repository at this point in the history
…side configuration. (#4742)

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Aug 4, 2022
1 parent 869dad4 commit 1c1a668
Show file tree
Hide file tree
Showing 11 changed files with 336 additions and 32 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ There isn't a valid core binary for this release. Use v0.57.2 instead.
- Use OpenCensus `metric` package for process metrics instead of `stats` package (#5486)
- Update OTLP to v0.18.0 (#5530)
- Log histogram min/max fields with `logging` exporter (#5520)
- Add support in the `confmap.Resolver` to expand embedded config URIs inside configuration (#4742)

### 🧰 Bug fixes 🧰

Expand Down
26 changes: 19 additions & 7 deletions confmap/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,39 +33,51 @@ The `Resolver` receives as input a set of `Providers`, a list of `Converters`, a
`configURI` that will be used to generate the resulting, or effective, configuration in the form of a `Conf`,
that can be used by code that is oblivious to the usage of `Providers` and `Converters`.

`Providers` are used to provide an entire configuration when the `configURI` is given directly to the `Resolver`,
or an individual value (partial configuration) when the `configURI` is embedded into the `Conf` as a values using
the syntax `${configURI}`.

```terminal
Resolver Provider
│ │
Resolve │ │
────────────────►│ │
│ │
┌─ │ Retrieve │
│ ├─────────────────────────►│
│ │
│ │ Conf
│ │◄─────────────────────────┤
foreach │ │ │
foreach │ │ │
configURI │ ├───┐ │
│ │ │Merge │
│ │◄──┘ │
└─ │ │
┌─ │ Retrieve │
│ ├─────────────────────────►│
│ │ Partial Conf Value │
│ │◄─────────────────────────┤
foreach │ │ │
embedded │ │ │
configURI │ ├───┐ │
│ │ │Replace │
│ │◄──┘ │
└─ │ │
│ Converter │
│ │ │
┌─ │ Convert │ │
│ ├───────────────►│ │
foreach │ │ │ │
Converter │ │◄───────────────┤ │
└─ │ │
│ │
◄────────────────┤ │
│ │
```

The `Resolve` method proceeds in the following steps:

1. Start with an empty "result" of `Conf` type.
2. For each config URI retrieves individual configurations, and merges it into the "result".
2. For each "Converter", call "Convert" for the "result".
4. Return the "result", aka effective, configuration.
3. For each embedded config URI retrieves individual value, and replaces it into the "result".
4. For each "Converter", call "Convert" for the "result".
5. Return the "result", aka effective, configuration.

### Watching for Updates
After the configuration was processed, the `Resolver` can be used as a single point to watch for updates in the
Expand Down
45 changes: 40 additions & 5 deletions confmap/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package confmap // import "go.opentelemetry.io/collector/confmap"

import (
"context"
"fmt"
)

// Provider is an interface that helps to retrieve a config map and watch for any
Expand Down Expand Up @@ -81,7 +82,7 @@ type ChangeEvent struct {

// Retrieved holds the result of a call to the Retrieve method of a Provider object.
type Retrieved struct {
conf *Conf
rawConf interface{}
closeFunc CloseFunc
}

Expand All @@ -101,17 +102,39 @@ func WithRetrievedClose(closeFunc CloseFunc) RetrievedOption {
}

// NewRetrieved returns a new Retrieved instance that contains the data from the raw deserialized config.
func NewRetrieved(rawConf map[string]interface{}, opts ...RetrievedOption) (Retrieved, error) {
// The rawConf can be one of the following types:
// - Primitives: int, int32, int64, float32, float64, bool, string;
// - []interface{};
// - map[string]interface{};
func NewRetrieved(rawConf interface{}, opts ...RetrievedOption) (Retrieved, error) {
if err := checkRawConfType(rawConf); err != nil {
return Retrieved{}, err
}
set := retrievedSettings{}
for _, opt := range opts {
opt(&set)
}
return Retrieved{conf: NewFromStringMap(rawConf), closeFunc: set.closeFunc}, nil
return Retrieved{rawConf: rawConf, closeFunc: set.closeFunc}, nil
}

// AsConf returns the retrieved configuration parsed as a Conf.
func (r Retrieved) AsConf() (*Conf, error) {
return r.conf, nil
func (r *Retrieved) AsConf() (*Conf, error) {
if r.rawConf == nil {
return New(), nil
}
val, ok := r.rawConf.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("retrieved value (type=%T) cannot be used as a Conf", r.rawConf)
}
return NewFromStringMap(val), nil
}

// AsRaw returns the retrieved configuration parsed as an interface{} which can be one of the following types:
// - Primitives: int, int32, int64, float32, float64, bool, string;
// - []interface{} - every member follows the same rules as the given interface{};
// - map[string]interface{} - every value follows the same rules as the given interface{};
func (r *Retrieved) AsRaw() (interface{}, error) {
return r.rawConf, nil
}

// Close and release any watchers that Provider.Retrieve may have created.
Expand All @@ -129,3 +152,15 @@ func (r Retrieved) Close(ctx context.Context) error {

// CloseFunc a function equivalent to Retrieved.Close.
type CloseFunc func(context.Context) error

func checkRawConfType(rawConf interface{}) error {
if rawConf == nil {
return nil
}
switch rawConf.(type) {
case int, int32, int64, float32, float64, bool, string, []interface{}, map[string]interface{}:
return nil
default:
return fmt.Errorf("unsupported type=%T for retrieved config", rawConf)
}
}
2 changes: 1 addition & 1 deletion confmap/provider/internal/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
// * yamlBytes the yaml bytes that will be deserialized.
// * opts specifies options associated with this Retrieved value, such as CloseFunc.
func NewRetrievedFromYAML(yamlBytes []byte, opts ...confmap.RetrievedOption) (confmap.Retrieved, error) {
var rawConf map[string]interface{}
var rawConf interface{}
if err := yaml.Unmarshal(yamlBytes, &rawConf); err != nil {
return confmap.Retrieved{}, err
}
Expand Down
9 changes: 6 additions & 3 deletions confmap/provider/internal/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,13 @@ func TestNewRetrievedFromYAMLWithOptions(t *testing.T) {

func TestNewRetrievedFromYAMLInvalidYAMLBytes(t *testing.T) {
_, err := NewRetrievedFromYAML([]byte("[invalid:,"))
require.Error(t, err)
assert.Error(t, err)
}

func TestNewRetrievedFromYAMLInvalidAsMap(t *testing.T) {
_, err := NewRetrievedFromYAML([]byte("string"))
require.Error(t, err)
ret, err := NewRetrievedFromYAML([]byte("string"))
require.NoError(t, err)

_, err = ret.AsConf()
assert.Error(t, err)
}
110 changes: 98 additions & 12 deletions confmap/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type Resolver struct {
sync.Mutex
closers []CloseFunc
watcher chan error

enableExpand bool
}

// ResolverSettings are the settings to configure the behavior of the Resolver.
Expand Down Expand Up @@ -115,28 +117,33 @@ func (mr *Resolver) Resolve(ctx context.Context) (*Conf, error) {
// For backwards compatibility:
// - empty url scheme means "file".
// - "^[A-z]:" also means "file"
scheme := "file"
if idx := strings.Index(uri, ":"); idx != -1 && !driverLetterRegexp.MatchString(uri) {
scheme = uri[:idx]
} else {
uri = scheme + ":" + uri
}
p, ok := mr.providers[scheme]
if !ok {
return nil, fmt.Errorf("scheme %q is not supported for uri %q", scheme, uri)
if driverLetterRegexp.MatchString(uri) {
uri = "file:" + uri
}
ret, err := p.Retrieve(ctx, uri, mr.onChange)
ret, err := mr.retrieveValue(ctx, location{uri: uri, defaultScheme: "file"})
if err != nil {
return nil, err
return nil, fmt.Errorf("cannot retrieve the configuration: %w", err)
}
mr.closers = append(mr.closers, ret.Close)
retCfgMap, err := ret.AsConf()
if err != nil {
return nil, err
}
if err = retMap.Merge(retCfgMap); err != nil {
return nil, err
}
mr.closers = append(mr.closers, ret.Close)
}

if mr.enableExpand {
cfgMap := make(map[string]interface{})
for _, k := range retMap.AllKeys() {
val, err := mr.expandValueRecursively(ctx, retMap.Get(k))
if err != nil {
return nil, err
}
cfgMap[k] = val
}
retMap = NewFromStringMap(cfgMap)
}

// Apply the converters in the given order.
Expand Down Expand Up @@ -187,3 +194,82 @@ func (mr *Resolver) closeIfNeeded(ctx context.Context) error {
}
return err
}

func (mr *Resolver) expandValueRecursively(ctx context.Context, value interface{}) (interface{}, error) {
for i := 0; i < 100; i++ {
val, changed, err := mr.expandValue(ctx, value)
if err != nil {
return nil, err
}
if !changed {
return val, nil
}
value = val
}
return nil, errors.New("too many recursive expansions")
}

func (mr *Resolver) expandValue(ctx context.Context, value interface{}) (interface{}, bool, error) {
switch v := value.(type) {
case string:
// If it doesn't have the format "${scheme:opaque}" no need to expand.
if !strings.HasPrefix(v, "${") || !strings.HasSuffix(v, "}") {
return value, false, nil
}
uri := v[2 : len(v)-1]
// For backwards compatibility:
// - empty scheme means "env".
ret, err := mr.retrieveValue(ctx, location{uri: uri, defaultScheme: "env"})
if err != nil {
return nil, false, err
}
mr.closers = append(mr.closers, ret.Close)
val, err := ret.AsRaw()
return val, true, err
case []interface{}:
nslice := make([]interface{}, 0, len(v))
nchanged := false
for _, vint := range v {
val, changed, err := mr.expandValue(ctx, vint)
if err != nil {
return nil, false, err
}
nslice = append(nslice, val)
nchanged = nchanged || changed
}
return nslice, nchanged, nil
case map[string]interface{}:
nmap := map[string]interface{}{}
nchanged := false
for mk, mv := range v {
val, changed, err := mr.expandValue(ctx, mv)
if err != nil {
return nil, false, err
}
nmap[mk] = val
nchanged = nchanged || changed
}
return nmap, nchanged, nil
}
return value, false, nil
}

type location struct {
uri string
defaultScheme string
}

func (mr *Resolver) retrieveValue(ctx context.Context, l location) (Retrieved, error) {
uri := l.uri
scheme := l.defaultScheme
if idx := strings.Index(uri, ":"); idx != -1 {
scheme = uri[:idx]
} else {
uri = scheme + ":" + uri
}
p, ok := mr.providers[scheme]
if !ok {
return Retrieved{}, fmt.Errorf("scheme %q is not supported for uri %q", scheme, uri)
}
return p.Retrieve(ctx, uri, mr.onChange)
}
Loading

0 comments on commit 1c1a668

Please sign in to comment.