Skip to content

Commit

Permalink
Add support for concatenating multiple embedded uris, or uris with ot…
Browse files Browse the repository at this point in the history
…her string parts

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Jan 30, 2023
1 parent a2f0153 commit 432fdf2
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 43 deletions.
11 changes: 11 additions & 0 deletions .chloggen/supportconcat.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: confmap

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support to resolve embedded uris inside a string, concatenate results.

# One or more tracking issues or pull requests related to the change
issues: [6932]
78 changes: 56 additions & 22 deletions confmap/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,21 @@ import (
"go.opentelemetry.io/collector/featuregate"
)

// schemePattern defines the regexp pattern for scheme names.
// Scheme name consist of a sequence of characters beginning with a letter and followed by any
// combination of letters, digits, plus ("+"), period ("."), or hyphen ("-").
const schemePattern = `[A-Za-z][A-Za-z0-9+.-]+`

var (
// follows drive-letter specification:
// https://datatracker.ietf.org/doc/html/draft-kerwin-file-scheme-07.html#section-2.2
driverLetterRegexp = regexp.MustCompile("^[A-z]:")

// Scheme name consist of a sequence of characters beginning with a letter and followed by any
// combination of letters, digits, plus ("+"), period ("."), or hyphen ("-").
// Need to match new line as well in the OpaqueValue, so setting the "s" flag. See https://pkg.go.dev/regexp/syntax.
locationRegexp = regexp.MustCompile(`(?s:^(?P<Scheme>[A-Za-z][A-Za-z0-9+.-]+):(?P<OpaqueValue>.*)$)`)
uriRegexp = regexp.MustCompile(`(?s:^(?P<Scheme>` + schemePattern + `):(?P<OpaqueValue>.*)$)`)

// embeddedURI matches "embedded" provider uris into a string value.
embeddedURI = regexp.MustCompile(`\${` + schemePattern + `:.*?}`)

errTooManyRecursiveExpansions = errors.New("too many recursive expansions")
)
Expand All @@ -43,7 +49,7 @@ var (
var expandEnabledGauge = featuregate.GlobalRegistry().MustRegister(
"confmap.expandEnabled",
featuregate.StageBeta,
featuregate.WithRegisterDescription("controls whether expending embedded external config providers URIs"))
featuregate.WithRegisterDescription("controls whether expanding embedded external config providers URIs"))

// Resolver resolves a configuration as a Conf.
type Resolver struct {
Expand Down Expand Up @@ -168,6 +174,7 @@ func (mr *Resolver) Resolve(ctx context.Context) (*Conf, error) {
}
retMap = NewFromStringMap(cfgMap)
}

// Apply the converters in the given order.
for _, confConv := range mr.converters {
if err := confConv.Convert(ctx, retMap); err != nil {
Expand Down Expand Up @@ -234,26 +241,36 @@ func (mr *Resolver) expandValueRecursively(ctx context.Context, value any) (any,
func (mr *Resolver) expandValue(ctx context.Context, value any) (any, bool, error) {
switch v := value.(type) {
case string:
// If it doesn't have the format "${scheme:opaque}" no need to expand.
if !strings.HasPrefix(v, "${") || !strings.HasSuffix(v, "}") || !strings.Contains(v, ":") {
return value, false, nil
}
lURI, err := newLocation(v[2 : len(v)-1])
if err != nil {
// Cannot return error, since a case like "${HOST}:${PORT}" is invalid location,
// but is supported in the legacy implementation.
// If no embedded "uris" no need to expand. embeddedURI regexp matches uriRegexp as well.
if !embeddedURI.MatchString(v) {
return value, false, nil
}
if strings.Contains(lURI.opaqueValue, "$") {
return nil, false, fmt.Errorf("the uri %q contains unsupported characters ('$')", lURI.asString())
}
ret, err := mr.retrieveValue(ctx, lURI)
if err != nil {
return nil, false, err

// If the value is a single URI, then the return value can be anything.
// This is the case `foo: ${file:some_extra_config.yml}`.
if embeddedURI.FindString(v) == v {
return mr.expandStringURI(ctx, v)
}
mr.closers = append(mr.closers, ret.Close)
val, err := ret.AsRaw()
return val, true, err

// If the URI is embedded into the string, return value must be a string, and we have to concatenate all strings.
var nerr error
var nchanged bool
nv := embeddedURI.ReplaceAllStringFunc(v, func(s string) string {
ret, changed, err := mr.expandStringURI(ctx, s)
nchanged = nchanged || changed
nerr = multierr.Append(nerr, err)
if err != nil {
return ""
}
switch val := ret.(type) {
case string:
return val
default:
nerr = multierr.Append(nerr, fmt.Errorf("expanding %v, expected string value type, got %T", s, val))
return v
}
})
return nv, nchanged, nerr
case []any:
nslice := make([]any, 0, len(v))
nchanged := false
Expand Down Expand Up @@ -282,6 +299,23 @@ func (mr *Resolver) expandValue(ctx context.Context, value any) (any, bool, erro
return value, false, nil
}

func (mr *Resolver) expandStringURI(ctx context.Context, uri string) (any, bool, error) {
lURI, err := newLocation(uri[2 : len(uri)-1])
if err != nil {
return nil, false, err
}
if strings.Contains(lURI.opaqueValue, "$") {
return nil, false, fmt.Errorf("the uri %q contains unsupported characters ('$')", lURI.asString())
}
ret, err := mr.retrieveValue(ctx, lURI)
if err != nil {
return nil, false, err
}
mr.closers = append(mr.closers, ret.Close)
val, err := ret.AsRaw()
return val, true, err
}

type location struct {
scheme string
opaqueValue string
Expand All @@ -292,7 +326,7 @@ func (c location) asString() string {
}

func newLocation(uri string) (location, error) {
submatches := locationRegexp.FindStringSubmatch(uri)
submatches := uriRegexp.FindStringSubmatch(uri)
if len(submatches) != 3 {
return location{}, fmt.Errorf("invalid uri: %q", uri)
}
Expand Down
155 changes: 134 additions & 21 deletions confmap/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,120 @@ func TestResolverExpandMapAndSliceValues(t *testing.T) {
assert.Equal(t, expectedMap, cfgMap.ToStringMap())
}

func TestResolverExpandStringValues(t *testing.T) {
tests := []struct {
name string
input string
output any
}{
{
name: "test_no_match_old",
input: "${HOST}:${PORT}",
output: "${HOST}:${PORT}",
},
{
name: "test_no_match_old_no_brackets",
input: "${HOST}:$PORT",
output: "${HOST}:$PORT",
},
{
name: "test_match_value",
input: "${env:COMPLEX_VALUE}",
output: []any{"localhost:3042"},
},
{
name: "test_match_embedded",
input: "${env:HOST}:3043",
output: "localhost:3043",
},
{
name: "test_match_embedded_multi",
input: "${env:HOST}:${env:PORT}",
output: "localhost:3044",
},
{
name: "test_match_embedded_concat",
input: "https://${env:HOST}:3045",
output: "https://localhost:3045",
},
{
name: "test_match_embedded_new_and_old",
input: "${env:HOST}:${PORT}",
output: "localhost:${PORT}",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
provider := newFakeProvider("input", func(context.Context, string, WatcherFunc) (*Retrieved, error) {
return NewRetrieved(map[string]any{tt.name: tt.input})
})

testProvider := newFakeProvider("env", func(_ context.Context, uri string, _ WatcherFunc) (*Retrieved, error) {
switch uri {
case "env:COMPLEX_VALUE":
return NewRetrieved([]any{"localhost:3042"})
case "env:HOST":
return NewRetrieved("localhost")
case "env:PORT":
return NewRetrieved("3044")

}
return nil, errors.New("impossible")
})

resolver, err := NewResolver(ResolverSettings{URIs: []string{"input:"}, Providers: makeMapProvidersMap(provider, testProvider), Converters: nil})
require.NoError(t, err)

cfgMap, err := resolver.Resolve(context.Background())
require.NoError(t, err)
assert.Equal(t, map[string]any{tt.name: tt.output}, cfgMap.ToStringMap())
})
}
}

func TestResolverExpandReturnError(t *testing.T) {
tests := []struct {
name string
input any
}{
{
name: "string_value",
input: "${test:VALUE}",
},
{
name: "slice_value",
input: []any{"${test:VALUE}"},
},
{
name: "map_value",
input: map[string]any{"test": "${test:VALUE}"},
},
{
name: "string_embedded_value",
input: "https://${test:HOST}:3045",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
provider := newFakeProvider("input", func(context.Context, string, WatcherFunc) (*Retrieved, error) {
return NewRetrieved(map[string]any{tt.name: tt.input})
})

myErr := errors.New(tt.name)
testProvider := newFakeProvider("test", func(context.Context, string, WatcherFunc) (*Retrieved, error) {
return nil, myErr
})

resolver, err := NewResolver(ResolverSettings{URIs: []string{"input:"}, Providers: makeMapProvidersMap(provider, testProvider), Converters: nil})
require.NoError(t, err)

_, err = resolver.Resolve(context.Background())
assert.ErrorIs(t, err, myErr)
})
}
}
func TestResolverInfiniteExpand(t *testing.T) {
const receiverValue = "${test:VALUE}"
provider := newFakeProvider("input", func(context.Context, string, WatcherFunc) (*Retrieved, error) {
Expand All @@ -440,71 +554,70 @@ func TestResolverInfiniteExpand(t *testing.T) {
assert.ErrorIs(t, err, errTooManyRecursiveExpansions)
}

func TestResolverExpandSliceValueError(t *testing.T) {
func TestResolverExpandInvalidScheme(t *testing.T) {
provider := newFakeProvider("input", func(context.Context, string, WatcherFunc) (*Retrieved, error) {
return NewRetrieved(map[string]any{"test": []any{"${test:VALUE}"}})
return NewRetrieved(map[string]any{"test": "${g_c_s:VALUE}"})
})

testProvider := newFakeProvider("test", func(context.Context, string, WatcherFunc) (*Retrieved, error) {
return NewRetrieved(errors.New("invalid value"))
testProvider := newFakeProvider("g_c_s", func(context.Context, string, WatcherFunc) (*Retrieved, error) {
panic("must not be called")
})

resolver, err := NewResolver(ResolverSettings{URIs: []string{"input:"}, Providers: makeMapProvidersMap(provider, testProvider), Converters: nil})
require.NoError(t, err)

_, err = resolver.Resolve(context.Background())
assert.EqualError(t, err, "unsupported type=*errors.errorString for retrieved config")
// TODO: Investigate how to return an error like `invalid uri: "g_c_s:VALUE"` and keep the legacy behavior
// for cases like "${HOST}:${PORT}"
assert.NoError(t, err)
}

func TestResolverExpandMapValueError(t *testing.T) {
func TestResolverExpandInvalidOpaqueValue(t *testing.T) {
provider := newFakeProvider("input", func(context.Context, string, WatcherFunc) (*Retrieved, error) {
return NewRetrieved(map[string]any{"test": []any{map[string]any{"test": "${test:VALUE}"}}})
return NewRetrieved(map[string]any{"test": []any{map[string]any{"test": "${test:$VALUE}"}}})
})

testProvider := newFakeProvider("test", func(context.Context, string, WatcherFunc) (*Retrieved, error) {
return NewRetrieved(errors.New("invalid value"))
panic("must not be called")
})

resolver, err := NewResolver(ResolverSettings{URIs: []string{"input:"}, Providers: makeMapProvidersMap(provider, testProvider), Converters: nil})
require.NoError(t, err)

_, err = resolver.Resolve(context.Background())
assert.EqualError(t, err, "unsupported type=*errors.errorString for retrieved config")
assert.EqualError(t, err, `the uri "test:$VALUE" contains unsupported characters ('$')`)
}

func TestResolverExpandInvalidScheme(t *testing.T) {
const receiverValue = "${g_c_s:VALUE}"
func TestResolverExpandUnsupportedScheme(t *testing.T) {
provider := newFakeProvider("input", func(context.Context, string, WatcherFunc) (*Retrieved, error) {
return NewRetrieved(map[string]any{"test": receiverValue})
return NewRetrieved(map[string]any{"test": "${unsupported:VALUE}"})
})

testProvider := newFakeProvider("g_c_s", func(context.Context, string, WatcherFunc) (*Retrieved, error) {
return NewRetrieved(receiverValue)
testProvider := newFakeProvider("test", func(context.Context, string, WatcherFunc) (*Retrieved, error) {
panic("must not be called")
})

resolver, err := NewResolver(ResolverSettings{URIs: []string{"input:"}, Providers: makeMapProvidersMap(provider, testProvider), Converters: nil})
require.NoError(t, err)

_, err = resolver.Resolve(context.Background())
// TODO: Investigate how to return an error like `invalid uri: "g_c_s:VALUE"` and keep the legacy behavior
// for cases like "${HOST}:${PORT}"
assert.NoError(t, err)
assert.EqualError(t, err, `scheme "unsupported" is not supported for uri "unsupported:VALUE"`)
}

func TestResolverExpandInvalidOpaqueValue(t *testing.T) {
func TestResolverExpandStringValueInvalidReturnValue(t *testing.T) {
provider := newFakeProvider("input", func(context.Context, string, WatcherFunc) (*Retrieved, error) {
return NewRetrieved(map[string]any{"test": []any{map[string]any{"test": "${test:$VALUE}"}}})
return NewRetrieved(map[string]any{"test": "localhost:${test:PORT}"})
})

testProvider := newFakeProvider("test", func(context.Context, string, WatcherFunc) (*Retrieved, error) {
return NewRetrieved(errors.New("invalid value"))
return NewRetrieved(3043)
})

resolver, err := NewResolver(ResolverSettings{URIs: []string{"input:"}, Providers: makeMapProvidersMap(provider, testProvider), Converters: nil})
require.NoError(t, err)

_, err = resolver.Resolve(context.Background())
assert.EqualError(t, err, `the uri "test:$VALUE" contains unsupported characters ('$')`)
assert.EqualError(t, err, `expanding ${test:PORT}, expected string value type, got int`)
}

func makeMapProvidersMap(providers ...Provider) map[string]Provider {
Expand Down

0 comments on commit 432fdf2

Please sign in to comment.