Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement k8s secrets provider for Agent #24789

Merged
merged 25 commits into from
Apr 1, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewController(
caps capabilities.Capability,
reloadables ...reloadable,
) *Controller {
init, _ := transpiler.NewVars(map[string]interface{}{})
init, _ := transpiler.NewVars(map[string]interface{}{}, nil)

return &Controller{
logger: log,
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/agent/cmd/include.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/env"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/host"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/kubernetes"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/kubernetes_secrets"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/local"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/localdynamic"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/path"
Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/transpiler/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1616,7 +1616,7 @@ func TestHash(t *testing.T) {
}

func mustMakeVars(mapping map[string]interface{}) *Vars {
v, err := NewVars(mapping)
v, err := NewVars(mapping, nil)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/transpiler/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ func TestRenderInputs(t *testing.T) {
}

func mustMakeVarsP(mapping map[string]interface{}, processorKey string, processors Processors) *Vars {
v, err := NewVarsWithProcessors(mapping, processorKey, processors)
v, err := NewVarsWithProcessors(mapping, processorKey, processors, nil)
if err != nil {
panic(err)
}
Expand Down
30 changes: 22 additions & 8 deletions x-pack/elastic-agent/pkg/agent/transpiler/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"regexp"
"strings"
"unicode"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/composable"
)

var varsRegex = regexp.MustCompile(`\${([\p{L}\d\s\\\-_|.'"]*)}`)
Expand All @@ -18,23 +20,24 @@ var ErrNoMatch = fmt.Errorf("no matching vars")

// Vars is a context of variables that also contain a list of processors that go with the mapping.
type Vars struct {
tree *AST
processorsKey string
processors Processors
tree *AST
processorsKey string
processors Processors
fetchContextProviders map[string]composable.ContextProvider
}

// NewVars returns a new instance of vars.
func NewVars(mapping map[string]interface{}) (*Vars, error) {
return NewVarsWithProcessors(mapping, "", nil)
func NewVars(mapping map[string]interface{}, fetchContextProviders map[string]composable.ContextProvider) (*Vars, error) {
return NewVarsWithProcessors(mapping, "", nil, fetchContextProviders)
}

// NewVarsWithProcessors returns a new instance of vars with attachment of processors.
func NewVarsWithProcessors(mapping map[string]interface{}, processorKey string, processors Processors) (*Vars, error) {
func NewVarsWithProcessors(mapping map[string]interface{}, processorKey string, processors Processors, fetchContextProviders map[string]composable.ContextProvider) (*Vars, error) {
tree, err := NewAST(mapping)
if err != nil {
return nil, err
}
return &Vars{tree, processorKey, processors}, nil
return &Vars{tree, processorKey, processors, fetchContextProviders}, nil
}

// Replace returns a new value based on variable replacement.
Expand All @@ -44,7 +47,6 @@ func (v *Vars) Replace(value string) (Node, error) {
if !validBrackets(value, matchIdxs) {
return nil, fmt.Errorf("starting ${ is missing ending }")
}

result := ""
lastIndex := 0
for _, r := range matchIdxs {
Expand All @@ -55,6 +57,18 @@ func (v *Vars) Replace(value string) (Node, error) {
}
set := false
for _, val := range vars {
for name, provider := range v.fetchContextProviders {
if varPrefixMatched(val.Value(), name) {
fetchProvider := provider.(composable.FetchContextProvider)
fval, _ := fetchProvider.Fetch(val.Value())
result += value[lastIndex:r[0]] + fval
set = true
ChrsMark marked this conversation as resolved.
Show resolved Hide resolved
break
}
}
if set {
continue
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that getting a value from a fetch context provider returns something other than a string?

Below you can see that it will replace full objects if its a varString. I worry based on placing this before the switch val.(type) we are missing that as a possibility with a fetch context provider.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, however no strong opinion here since I'm not super familiar with the combinations that could occur. Do you think that removing continue and being able to get to the switch block after the Fetch() action would be sth that we want?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that I took a closer look at this file. This is definitely in the wrong place. At its current location it would try to find a constant string in a fetch context provider, we do not want that.

Looking at it, this needs to be removed from here and moved into the:

func (v *Vars) Lookup(name string) (interface{}, bool) {
	return v.tree.Lookup(name)
}

Leaving this function untouched.

Copy link
Member Author

@ChrsMark ChrsMark Mar 31, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@blakerouse I tried to move (see f023688) the code inside func (v *Vars) Lookup(name string) (interface{}, bool) method of the same file but it does not seem to work (using same configuration so far) and I'm not sure that I can follow the flow here. I see that the method is called at

val, ok := v.vars.Lookup(name)
and when debugging the flow with additional debug messages I cannot find anything useful so far. Could you please provide more content here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I see the issue. You need to do the following:

Add the following function to Vars:

// lookupNode performs a lookup on the AST, but keeps the result as a `Node`.
//
// This is different from `Lookup` which returns the actual type, not the AST type.
func (v *Vars) lookupNode(name string) (Node, bool) {
	// check if the value can be retrieved from a FetchContextProvider
	for providerName, provider := range v.fetchContextProviders {
		if varPrefixMatched(name, providerName) {
			fetchProvider := provider.(composable.FetchContextProvider)
			fval, found := fetchProvider.Fetch(name)
			if found {
				return &StrVal{value: fval}, true
			} else {
				return &StrVal{value: ""}, false
			}
		}
	}
	// lookup in the AST tree
	return Lookup(v.tree, name)
}

Then change node, ok := Lookup(v.tree, val.Value()) to node, ok := v.lookupNode(val.Value()).

See if that works.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍🏼 This fixes the issue, thank you!

switch val.(type) {
case *constString:
result += value[lastIndex:r[0]] + val.Value()
Expand Down
3 changes: 2 additions & 1 deletion x-pack/elastic-agent/pkg/agent/transpiler/vars_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ func TestVars_ReplaceWithProcessors(t *testing.T) {
},
},
"dynamic",
processers)
processers,
nil)
ChrsMark marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)

res, err := vars.Replace("${testing.key1}")
Expand Down
18 changes: 2 additions & 16 deletions x-pack/elastic-agent/pkg/composable/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,16 @@
package composable

import (
"context"
"fmt"
"strings"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
corecomp "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

// ContextProviderComm is the interface that a context provider uses to communicate back to Elastic Agent.
type ContextProviderComm interface {
context.Context

// Set sets the current mapping for this context.
Set(map[string]interface{}) error
}

// ContextProvider is the interface that a context provider must implement.
type ContextProvider interface {
// Run runs the context provider.
Run(ContextProviderComm) error
}

// ContextProviderBuilder creates a new context provider based on the given config and returns it.
type ContextProviderBuilder func(log *logger.Logger, config *config.Config) (ContextProvider, error)
type ContextProviderBuilder func(log *logger.Logger, config *config.Config) (corecomp.ContextProvider, error)

// AddContextProvider adds a new ContextProviderBuilder
func (r *providerRegistry) AddContextProvider(name string, builder ContextProviderBuilder) error {
Expand Down
21 changes: 16 additions & 5 deletions x-pack/elastic-agent/pkg/composable/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
corecomp "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

var FetchContextProviders = map[string]string{
"kubernetes_secrets": "",
}
ChrsMark marked this conversation as resolved.
Show resolved Hide resolved

// VarsCallback is callback called when the current vars state changes.
type VarsCallback func([]*transpiler.Vars)

Expand Down Expand Up @@ -98,6 +103,8 @@ func (c *controller) Run(ctx context.Context, cb VarsCallback) error {
notify := make(chan bool, 5000)
localCtx, cancel := context.WithCancel(ctx)

fetchContextProviders := make(map[string]corecomp.ContextProvider, len(FetchContextProviders))

// run all the enabled context providers
for name, state := range c.contextProviders {
state.Context = localCtx
Expand All @@ -107,6 +114,10 @@ func (c *controller) Run(ctx context.Context, cb VarsCallback) error {
cancel()
return errors.New(err, fmt.Sprintf("failed to run provider '%s'", name), errors.TypeConfig, errors.M("provider", name))
}
// manually register providers that want to pass to the Vars as "fetch" providers
ChrsMark marked this conversation as resolved.
Show resolved Hide resolved
if _, ok := FetchContextProviders[name]; ok {
fetchContextProviders[name] = state.provider
ChrsMark marked this conversation as resolved.
Show resolved Hide resolved
}
ChrsMark marked this conversation as resolved.
Show resolved Hide resolved
}

// run all the enabled dynamic providers
Expand Down Expand Up @@ -151,15 +162,15 @@ func (c *controller) Run(ctx context.Context, cb VarsCallback) error {
mapping[name] = state.Current()
}
// this is ensured not to error, by how the mappings states are verified
vars[0], _ = transpiler.NewVars(mapping)
vars[0], _ = transpiler.NewVars(mapping, fetchContextProviders)

// add to the vars list for each dynamic providers mappings
for name, state := range c.dynamicProviders {
for _, mappings := range state.Mappings() {
local, _ := cloneMap(mapping) // will not fail; already been successfully cloned once
local[name] = mappings.mapping
// this is ensured not to error, by how the mappings states are verified
v, _ := transpiler.NewVarsWithProcessors(local, name, mappings.processors)
v, _ := transpiler.NewVarsWithProcessors(local, name, mappings.processors, fetchContextProviders)
vars = append(vars, v)
}
}
Expand All @@ -175,7 +186,7 @@ func (c *controller) Run(ctx context.Context, cb VarsCallback) error {
type contextProviderState struct {
context.Context

provider ContextProvider
provider corecomp.ContextProvider
lock sync.RWMutex
mapping map[string]interface{}
signal chan bool
Expand All @@ -189,7 +200,7 @@ func (c *contextProviderState) Set(mapping map[string]interface{}) error {
return err
}
// ensure creating vars will not error
_, err = transpiler.NewVars(mapping)
_, err = transpiler.NewVars(mapping, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -244,7 +255,7 @@ func (c *dynamicProviderState) AddOrUpdate(id string, priority int, mapping map[
return err
}
// ensure creating vars will not error
_, err = transpiler.NewVars(mapping)
_, err = transpiler.NewVars(mapping, nil)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
corecomp "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/composable"
ChrsMark marked this conversation as resolved.
Show resolved Hide resolved
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release"
)
Expand All @@ -20,7 +21,7 @@ func init() {
type contextProvider struct{}

// Run runs the Agent context provider.
func (*contextProvider) Run(comm composable.ContextProviderComm) error {
func (*contextProvider) Run(comm corecomp.ContextProviderComm) error {
a, err := info.NewAgentInfo()
if err != nil {
return err
Expand All @@ -41,6 +42,6 @@ func (*contextProvider) Run(comm composable.ContextProviderComm) error {
}

// ContextProviderBuilder builds the context provider.
func ContextProviderBuilder(_ *logger.Logger, _ *config.Config) (composable.ContextProvider, error) {
func ContextProviderBuilder(_ *logger.Logger, _ *config.Config) (corecomp.ContextProvider, error) {
return &contextProvider{}, nil
}
5 changes: 3 additions & 2 deletions x-pack/elastic-agent/pkg/composable/providers/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
corecomp "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

Expand All @@ -21,7 +22,7 @@ func init() {
type contextProvider struct{}

// Run runs the environment context provider.
func (*contextProvider) Run(comm composable.ContextProviderComm) error {
func (*contextProvider) Run(comm corecomp.ContextProviderComm) error {
err := comm.Set(getEnvMapping())
if err != nil {
return errors.New(err, "failed to set mapping", errors.TypeUnexpected)
Expand All @@ -30,7 +31,7 @@ func (*contextProvider) Run(comm composable.ContextProviderComm) error {
}

// ContextProviderBuilder builds the context provider.
func ContextProviderBuilder(_ *logger.Logger, _ *config.Config) (composable.ContextProvider, error) {
func ContextProviderBuilder(_ *logger.Logger, _ *config.Config) (corecomp.ContextProvider, error) {
return &contextProvider{}, nil
}

Expand Down
5 changes: 3 additions & 2 deletions x-pack/elastic-agent/pkg/composable/providers/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
corecomp "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

Expand All @@ -38,7 +39,7 @@ type contextProvider struct {
}

// Run runs the environment context provider.
func (c *contextProvider) Run(comm composable.ContextProviderComm) error {
func (c *contextProvider) Run(comm corecomp.ContextProviderComm) error {
current, err := c.fetcher()
if err != nil {
return err
Expand Down Expand Up @@ -79,7 +80,7 @@ func (c *contextProvider) Run(comm composable.ContextProviderComm) error {
}

// ContextProviderBuilder builds the context provider.
func ContextProviderBuilder(log *logger.Logger, c *config.Config) (composable.ContextProvider, error) {
func ContextProviderBuilder(log *logger.Logger, c *config.Config) (corecomp.ContextProvider, error) {
p := &contextProvider{
logger: log,
fetcher: getHostInfo,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// TODO review the need for this
// +build linux darwin windows

package kubernetes_secrets

// Config for kubernetes provider
type Config struct {
KubeConfig string `config:"kube_config"`
}
Loading