Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #20804 to 7.x: [Elastic Agent] Add initial composable providers #20828

Merged
merged 1 commit into from
Aug 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions x-pack/elastic-agent/pkg/composable/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package composable

import "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"

// Config is config for multiple providers.
type Config struct {
Providers map[string]*config.Config `config:"providers"`
}
64 changes: 64 additions & 0 deletions x-pack/elastic-agent/pkg/composable/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package composable

import (
"context"
"fmt"
"strings"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
)

// ContextProviderComm is the interface that a context provider uses to communicate back to Elastic Agent.
type ContextProviderComm interface {
context.Context

// Set sets the current mapping for this context.
Set(map[string]interface{}) error
}

// ContextProvider is the interface that a context provider must implement.
type ContextProvider interface {
// Run runs the context provider.
Run(ContextProviderComm) error
}

// ContextProviderBuilder creates a new context provider based on the given config and returns it.
type ContextProviderBuilder func(config *config.Config) (ContextProvider, error)

// AddContextProvider adds a new ContextProviderBuilder
func (r *providerRegistry) AddContextProvider(name string, builder ContextProviderBuilder) error {
r.lock.Lock()
defer r.lock.Unlock()

if name == "" {
return fmt.Errorf("provider name is required")
}
if strings.ToLower(name) != name {
return fmt.Errorf("provider name must be lowercase")
}
_, contextExists := r.contextProviders[name]
_, dynamicExists := r.dynamicProviders[name]
if contextExists || dynamicExists {
return fmt.Errorf("provider '%s' is already registered", name)
}
if builder == nil {
return fmt.Errorf("provider '%s' cannot be registered with a nil factory", name)
}

r.contextProviders[name] = builder
r.logger.Debugf("Registered provider: %s", name)
return nil
}

// GetContextProvider returns the context provider with the giving name, nil if it doesn't exist
func (r *providerRegistry) GetContextProvider(name string) (ContextProviderBuilder, bool) {
r.lock.RLock()
defer r.lock.RUnlock()

b, ok := r.contextProviders[name]
return b, ok
}
300 changes: 300 additions & 0 deletions x-pack/elastic-agent/pkg/composable/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package composable

import (
"context"
"encoding/json"
"fmt"
"reflect"
"sort"
"sync"
"time"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
)

// Vars is a context of variables that also contain a list of processors that go with the mapping.
type Vars struct {
Mapping map[string]interface{}

ProcessorsKey string
Processors []map[string]interface{}
}

// VarsCallback is callback called when the current vars state changes.
type VarsCallback func([]Vars)

// Controller manages the state of the providers current context.
type Controller struct {
contextProviders map[string]*contextProviderState
dynamicProviders map[string]*dynamicProviderState
}

// New creates a new controller.
func New(c *config.Config) (*Controller, error) {
var providersCfg Config
err := c.Unpack(&providersCfg)
if err != nil {
return nil, errors.New(err, "failed to unpack providers config", errors.TypeConfig)
}

// build all the context providers
contextProviders := map[string]*contextProviderState{}
for name, builder := range Providers.contextProviders {
pCfg, ok := providersCfg.Providers[name]
if ok && !pCfg.Enabled() {
// explicitly disabled; skipping
continue
}
provider, err := builder(pCfg)
if err != nil {
return nil, errors.New(err, fmt.Sprintf("failed to build provider '%s'", name), errors.TypeConfig, errors.M("provider", name))
}
contextProviders[name] = &contextProviderState{
provider: provider,
}
}

// build all the dynamic providers
dynamicProviders := map[string]*dynamicProviderState{}
for name, builder := range Providers.dynamicProviders {
pCfg, ok := providersCfg.Providers[name]
if ok && !pCfg.Enabled() {
// explicitly disabled; skipping
continue
}
provider, err := builder(pCfg)
if err != nil {
return nil, errors.New(err, fmt.Sprintf("failed to build provider '%s'", name), errors.TypeConfig, errors.M("provider", name))
}
dynamicProviders[name] = &dynamicProviderState{
provider: provider,
mappings: map[string]Vars{},
}
}

return &Controller{
contextProviders: contextProviders,
dynamicProviders: dynamicProviders,
}, nil
}

// Run runs the controller.
func (c *Controller) Run(ctx context.Context, cb VarsCallback) error {
// large number not to block performing Run on the provided providers
notify := make(chan bool, 5000)
localCtx, cancel := context.WithCancel(ctx)

// run all the enabled context providers
for name, state := range c.contextProviders {
state.Context = localCtx
state.signal = notify
err := state.provider.Run(state)
if err != nil {
cancel()
return errors.New(err, fmt.Sprintf("failed to run provider '%s'", name), errors.TypeConfig, errors.M("provider", name))
}
}

// run all the enabled dynamic providers
for name, state := range c.dynamicProviders {
state.Context = localCtx
state.signal = notify
err := state.provider.Run(state)
if err != nil {
cancel()
return errors.New(err, fmt.Sprintf("failed to run provider '%s'", name), errors.TypeConfig, errors.M("provider", name))
}
}

go func() {
for {
// performs debounce of notifies; accumulates them into 100 millisecond chunks
changed := false
for {
exitloop := false
select {
case <-ctx.Done():
cancel()
return
case <-notify:
changed = true
case <-time.After(100 * time.Millisecond):
exitloop = true
break
}
if exitloop {
break
}
}
if !changed {
continue
}

// build the vars list of mappings
vars := make([]Vars, 1)
mapping := map[string]interface{}{}
for name, state := range c.contextProviders {
mapping[name] = state.Current()
}
vars[0] = Vars{
Mapping: mapping,
}

// add to the vars list for each dynamic providers mappings
for name, state := range c.dynamicProviders {
for _, mappings := range state.Mappings() {
local, _ := cloneMap(mapping) // will not fail; already been successfully cloned once
local[name] = mappings.Mapping
vars = append(vars, Vars{
Mapping: local,
ProcessorsKey: name,
Processors: mappings.Processors,
})
}
}

// execute the callback
cb(vars)
}
}()

return nil
}

type contextProviderState struct {
context.Context

provider ContextProvider
lock sync.RWMutex
mapping map[string]interface{}
signal chan bool
}

// Set sets the current mapping.
func (c *contextProviderState) Set(mapping map[string]interface{}) error {
var err error
mapping, err = cloneMap(mapping)
if err != nil {
return err
}

c.lock.Lock()
defer c.lock.Unlock()

if reflect.DeepEqual(c.mapping, mapping) {
// same mapping; no need to update and signal
return nil
}
c.mapping = mapping
c.signal <- true
return nil
}

// Current returns the current mapping.
func (c *contextProviderState) Current() map[string]interface{} {
c.lock.RLock()
defer c.lock.RUnlock()
return c.mapping
}

type dynamicProviderState struct {
context.Context

provider DynamicProvider
lock sync.RWMutex
mappings map[string]Vars
signal chan bool
}

// AddOrUpdate adds or updates the current mapping for the dynamic provider.
func (c *dynamicProviderState) AddOrUpdate(id string, mapping map[string]interface{}, processors []map[string]interface{}) error {
var err error
mapping, err = cloneMap(mapping)
if err != nil {
return err
}
processors, err = cloneMapArray(processors)
if err != nil {
return err
}

c.lock.Lock()
defer c.lock.Unlock()
curr, ok := c.mappings[id]
if ok && reflect.DeepEqual(curr.Mapping, mapping) && reflect.DeepEqual(curr.Processors, processors) {
// same mapping; no need to update and signal
return nil
}
c.mappings[id] = Vars{
Mapping: mapping,
Processors: processors,
}
c.signal <- true
return nil
}

// Remove removes the current mapping for the dynamic provider.
func (c *dynamicProviderState) Remove(id string) {
c.lock.Lock()
defer c.lock.Unlock()
_, exists := c.mappings[id]
if exists {
// existed; remove and signal
delete(c.mappings, id)
c.signal <- true
}
}

// Mappings returns the current mappings.
func (c *dynamicProviderState) Mappings() []Vars {
c.lock.RLock()
defer c.lock.RUnlock()

mappings := make([]Vars, 0)
ids := make([]string, 0)
for name := range c.mappings {
ids = append(ids, name)
}
sort.Strings(ids)
for _, name := range ids {
mappings = append(mappings, c.mappings[name])
}
return mappings
}

func cloneMap(source map[string]interface{}) (map[string]interface{}, error) {
if source == nil {
return nil, nil
}
bytes, err := json.Marshal(source)
if err != nil {
return nil, fmt.Errorf("failed to clone: %s", err)
}
var dest map[string]interface{}
err = json.Unmarshal(bytes, &dest)
if err != nil {
return nil, fmt.Errorf("failed to clone: %s", err)
}
return dest, nil
}

func cloneMapArray(source []map[string]interface{}) ([]map[string]interface{}, error) {
if source == nil {
return nil, nil
}
bytes, err := json.Marshal(source)
if err != nil {
return nil, fmt.Errorf("failed to clone: %s", err)
}
var dest []map[string]interface{}
err = json.Unmarshal(bytes, &dest)
if err != nil {
return nil, fmt.Errorf("failed to clone: %s", err)
}
return dest, nil
}
Loading