Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement k8s secrets provider for Agent #24789

Merged
merged 25 commits into from
Apr 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,5 @@
- Add TLS support for Fleet Server {pull}24142[24142]
- Add support for Fleet Server running under Elastic Agent {pull}24220[24220]
- Add CA support to Elastic Agent docker image {pull}24486[24486]
- Add k8s secrets provider for Agent {pull}24789[24789]
- Add STATE_PATH, CONFIG_PATH, LOGS_PATH to Elastic Agent docker image {pull}24817[24817]
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewController(
caps capabilities.Capability,
reloadables ...reloadable,
) *Controller {
init, _ := transpiler.NewVars(map[string]interface{}{})
init, _ := transpiler.NewVars(map[string]interface{}{}, nil)

return &Controller{
logger: log,
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/agent/cmd/include.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/env"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/host"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/kubernetes"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/kubernetessecrets"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/local"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/localdynamic"
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/path"
Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/transpiler/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1616,7 +1616,7 @@ func TestHash(t *testing.T) {
}

func mustMakeVars(mapping map[string]interface{}) *Vars {
v, err := NewVars(mapping)
v, err := NewVars(mapping, nil)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/transpiler/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ func TestRenderInputs(t *testing.T) {
}

func mustMakeVarsP(mapping map[string]interface{}, processorKey string, processors Processors) *Vars {
v, err := NewVarsWithProcessors(mapping, processorKey, processors)
v, err := NewVarsWithProcessors(mapping, processorKey, processors, nil)
if err != nil {
panic(err)
}
Expand Down
42 changes: 33 additions & 9 deletions x-pack/elastic-agent/pkg/agent/transpiler/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import (
"regexp"
"strings"
"unicode"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/composable"

"github.com/elastic/beats/v7/libbeat/common"
)

var varsRegex = regexp.MustCompile(`\${([\p{L}\d\s\\\-_|.'"]*)}`)
Expand All @@ -18,23 +22,24 @@ var ErrNoMatch = fmt.Errorf("no matching vars")

// Vars is a context of variables that also contain a list of processors that go with the mapping.
type Vars struct {
tree *AST
processorsKey string
processors Processors
tree *AST
processorsKey string
processors Processors
fetchContextProviders common.MapStr
}

// NewVars returns a new instance of vars.
func NewVars(mapping map[string]interface{}) (*Vars, error) {
return NewVarsWithProcessors(mapping, "", nil)
func NewVars(mapping map[string]interface{}, fetchContextProviders common.MapStr) (*Vars, error) {
return NewVarsWithProcessors(mapping, "", nil, fetchContextProviders)
}

// NewVarsWithProcessors returns a new instance of vars with attachment of processors.
func NewVarsWithProcessors(mapping map[string]interface{}, processorKey string, processors Processors) (*Vars, error) {
func NewVarsWithProcessors(mapping map[string]interface{}, processorKey string, processors Processors, fetchContextProviders common.MapStr) (*Vars, error) {
tree, err := NewAST(mapping)
if err != nil {
return nil, err
}
return &Vars{tree, processorKey, processors}, nil
return &Vars{tree, processorKey, processors, fetchContextProviders}, nil
}

// Replace returns a new value based on variable replacement.
Expand All @@ -44,7 +49,6 @@ func (v *Vars) Replace(value string) (Node, error) {
if !validBrackets(value, matchIdxs) {
return nil, fmt.Errorf("starting ${ is missing ending }")
}

result := ""
lastIndex := 0
for _, r := range matchIdxs {
Expand All @@ -60,7 +64,7 @@ func (v *Vars) Replace(value string) (Node, error) {
result += value[lastIndex:r[0]] + val.Value()
set = true
case *varString:
node, ok := Lookup(v.tree, val.Value())
node, ok := v.lookupNode(val.Value())
if ok {
node := nodeToValue(node)
if v.processorsKey != "" && varPrefixMatched(val.Value(), v.processorsKey) {
Expand Down Expand Up @@ -90,9 +94,29 @@ func (v *Vars) Replace(value string) (Node, error) {

// Lookup returns the value from the vars.
func (v *Vars) Lookup(name string) (interface{}, bool) {
// lookup in the AST tree
return v.tree.Lookup(name)
}

// lookupNode performs a lookup on the AST, but keeps the result as a `Node`.
//
// This is different from `Lookup` which returns the actual type, not the AST type.
func (v *Vars) lookupNode(name string) (Node, bool) {
// check if the value can be retrieved from a FetchContextProvider
for providerName, provider := range v.fetchContextProviders {
if varPrefixMatched(name, providerName) {
fetchProvider := provider.(composable.FetchContextProvider)
fval, found := fetchProvider.Fetch(name)
if found {
return &StrVal{value: fval}, true
}
return &StrVal{value: ""}, false
}
}
// lookup in the AST tree
return Lookup(v.tree, name)
}

// nodeToValue ensures that the node is an actual value.
func nodeToValue(node Node) Node {
switch n := node.(type) {
Expand Down
92 changes: 91 additions & 1 deletion x-pack/elastic-agent/pkg/agent/transpiler/vars_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/common"
corecomp "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/composable"
)

func TestVars_Replace(t *testing.T) {
Expand Down Expand Up @@ -215,7 +218,8 @@ func TestVars_ReplaceWithProcessors(t *testing.T) {
},
},
"dynamic",
processers)
processers,
nil)
ChrsMark marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)

res, err := vars.Replace("${testing.key1}")
Expand Down Expand Up @@ -246,3 +250,89 @@ func TestVars_ReplaceWithProcessors(t *testing.T) {
NewKey("key2", NewStrVal("value2")),
}, processers), res)
}

func TestVars_ReplaceWithFetchContextProvider(t *testing.T) {
processers := Processors{
{
"add_fields": map[string]interface{}{
"dynamic": "added",
},
},
}

mockFetchProvider, err := MockContextProviderBuilder()
require.NoError(t, err)

fetchContextProviders := common.MapStr{
"kubernetes_secrets": mockFetchProvider,
}
vars, err := NewVarsWithProcessors(
map[string]interface{}{
"testing": map[string]interface{}{
"key1": "data1",
},
"dynamic": map[string]interface{}{
"key1": "dynamic1",
"list": []string{
"array1",
"array2",
},
"dict": map[string]string{
"key1": "value1",
"key2": "value2",
},
},
},
"dynamic",
processers,
fetchContextProviders)
require.NoError(t, err)

res, err := vars.Replace("${testing.key1}")
require.NoError(t, err)
assert.Equal(t, NewStrVal("data1"), res)

res, err = vars.Replace("${dynamic.key1}")
require.NoError(t, err)
assert.Equal(t, NewStrValWithProcessors("dynamic1", processers), res)

res, err = vars.Replace("${other.key1|dynamic.key1}")
require.NoError(t, err)
assert.Equal(t, NewStrValWithProcessors("dynamic1", processers), res)

res, err = vars.Replace("${dynamic.list}")
require.NoError(t, err)
assert.Equal(t, processers, res.Processors())
assert.Equal(t, NewListWithProcessors([]Node{
NewStrVal("array1"),
NewStrVal("array2"),
}, processers), res)

res, err = vars.Replace("${dynamic.dict}")
require.NoError(t, err)
assert.Equal(t, processers, res.Processors())
assert.Equal(t, NewDictWithProcessors([]Node{
NewKey("key1", NewStrVal("value1")),
NewKey("key2", NewStrVal("value2")),
}, processers), res)

res, err = vars.Replace("${kubernetes_secrets.test_namespace.testing_secret.secret_value}")
require.NoError(t, err)
assert.Equal(t, NewStrVal("mockedFetchContent"), res)
}

type contextProviderMock struct {
}

// MockContextProviderBuilder builds the mock context provider.
func MockContextProviderBuilder() (corecomp.ContextProvider, error) {
return &contextProviderMock{}, nil
}

func (p *contextProviderMock) Fetch(key string) (string, bool) {
return "mockedFetchContent", true
}

func (p *contextProviderMock) Run(comm corecomp.ContextProviderComm) error {
return nil
}
18 changes: 2 additions & 16 deletions x-pack/elastic-agent/pkg/composable/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,16 @@
package composable

import (
"context"
"fmt"
"strings"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
corecomp "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

// ContextProviderComm is the interface that a context provider uses to communicate back to Elastic Agent.
type ContextProviderComm interface {
context.Context

// Set sets the current mapping for this context.
Set(map[string]interface{}) error
}

// ContextProvider is the interface that a context provider must implement.
type ContextProvider interface {
// Run runs the context provider.
Run(ContextProviderComm) error
}

// ContextProviderBuilder creates a new context provider based on the given config and returns it.
type ContextProviderBuilder func(log *logger.Logger, config *config.Config) (ContextProvider, error)
type ContextProviderBuilder func(log *logger.Logger, config *config.Config) (corecomp.ContextProvider, error)

// AddContextProvider adds a new ContextProviderBuilder
func (r *providerRegistry) AddContextProvider(name string, builder ContextProviderBuilder) error {
Expand Down
21 changes: 14 additions & 7 deletions x-pack/elastic-agent/pkg/composable/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@ import (
"context"
"encoding/json"
"fmt"
"strings"

"reflect"
"sort"
"strings"
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/common"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
corecomp "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

Expand Down Expand Up @@ -98,6 +100,8 @@ func (c *controller) Run(ctx context.Context, cb VarsCallback) error {
notify := make(chan bool, 5000)
localCtx, cancel := context.WithCancel(ctx)

fetchContextProviders := common.MapStr{}

// run all the enabled context providers
for name, state := range c.contextProviders {
state.Context = localCtx
Expand All @@ -107,6 +111,9 @@ func (c *controller) Run(ctx context.Context, cb VarsCallback) error {
cancel()
return errors.New(err, fmt.Sprintf("failed to run provider '%s'", name), errors.TypeConfig, errors.M("provider", name))
}
if p, ok := state.provider.(corecomp.FetchContextProvider); ok {
fetchContextProviders.Put(name, p)
}
ChrsMark marked this conversation as resolved.
Show resolved Hide resolved
}

// run all the enabled dynamic providers
Expand Down Expand Up @@ -151,15 +158,15 @@ func (c *controller) Run(ctx context.Context, cb VarsCallback) error {
mapping[name] = state.Current()
}
// this is ensured not to error, by how the mappings states are verified
vars[0], _ = transpiler.NewVars(mapping)
vars[0], _ = transpiler.NewVars(mapping, fetchContextProviders)

// add to the vars list for each dynamic providers mappings
for name, state := range c.dynamicProviders {
for _, mappings := range state.Mappings() {
local, _ := cloneMap(mapping) // will not fail; already been successfully cloned once
local[name] = mappings.mapping
// this is ensured not to error, by how the mappings states are verified
v, _ := transpiler.NewVarsWithProcessors(local, name, mappings.processors)
v, _ := transpiler.NewVarsWithProcessors(local, name, mappings.processors, fetchContextProviders)
vars = append(vars, v)
}
}
Expand All @@ -175,7 +182,7 @@ func (c *controller) Run(ctx context.Context, cb VarsCallback) error {
type contextProviderState struct {
context.Context

provider ContextProvider
provider corecomp.ContextProvider
lock sync.RWMutex
mapping map[string]interface{}
signal chan bool
Expand All @@ -189,7 +196,7 @@ func (c *contextProviderState) Set(mapping map[string]interface{}) error {
return err
}
// ensure creating vars will not error
_, err = transpiler.NewVars(mapping)
_, err = transpiler.NewVars(mapping, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -244,7 +251,7 @@ func (c *dynamicProviderState) AddOrUpdate(id string, priority int, mapping map[
return err
}
// ensure creating vars will not error
_, err = transpiler.NewVars(mapping)
_, err = transpiler.NewVars(mapping, nil)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
corecomp "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/composable"
ChrsMark marked this conversation as resolved.
Show resolved Hide resolved
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release"
)
Expand All @@ -20,7 +21,7 @@ func init() {
type contextProvider struct{}

// Run runs the Agent context provider.
func (*contextProvider) Run(comm composable.ContextProviderComm) error {
func (*contextProvider) Run(comm corecomp.ContextProviderComm) error {
a, err := info.NewAgentInfo()
if err != nil {
return err
Expand All @@ -41,6 +42,6 @@ func (*contextProvider) Run(comm composable.ContextProviderComm) error {
}

// ContextProviderBuilder builds the context provider.
func ContextProviderBuilder(_ *logger.Logger, _ *config.Config) (composable.ContextProvider, error) {
func ContextProviderBuilder(_ *logger.Logger, _ *config.Config) (corecomp.ContextProvider, error) {
return &contextProvider{}, nil
}
Loading