Skip to content

Commit

Permalink
[Elastic Agent] Add priority to dynamic input provider mappings (elas…
Browse files Browse the repository at this point in the history
…tic#22352)

* Add priority to dynamic input provider mappings.

* Add changelog.

* Update config.go
  • Loading branch information
blakerouse authored Nov 6, 2020
1 parent 0e8a72e commit 8b2cebb
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 16 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@
- Update `install` command to perform enroll before starting Elastic Agent {pull}21772[21772]
- Update `fleet.kibana.path` from a POLICY_CHANGE {pull}21804[21804]
- Removed `install-service.ps1` and `uninstall-service.ps1` from Windows .zip packaging {pull}21694[21694]
- Add `priority` to `AddOrUpdate` on dynamic composable input providers communication channel {pull}22352[22352]
39 changes: 32 additions & 7 deletions x-pack/elastic-agent/pkg/composable/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func (c *contextProviderState) Current() map[string]interface{} {
}

type dynamicProviderMapping struct {
priority int
mapping map[string]interface{}
processors transpiler.Processors
}
Expand All @@ -228,7 +229,11 @@ type dynamicProviderState struct {
}

// AddOrUpdate adds or updates the current mapping for the dynamic provider.
func (c *dynamicProviderState) AddOrUpdate(id string, mapping map[string]interface{}, processors []map[string]interface{}) error {
//
// `priority` ensures that order is maintained when adding the mapping to the current state
// for the processor. Lower priority mappings will always be sorted before higher priority mappings
// to ensure that matching of variables occurs on the lower priority mappings first.
func (c *dynamicProviderState) AddOrUpdate(id string, priority int, mapping map[string]interface{}, processors []map[string]interface{}) error {
var err error
mapping, err = cloneMap(mapping)
if err != nil {
Expand All @@ -252,6 +257,7 @@ func (c *dynamicProviderState) AddOrUpdate(id string, mapping map[string]interfa
return nil
}
c.mappings[id] = dynamicProviderMapping{
priority: priority,
mapping: mapping,
processors: processors,
}
Expand All @@ -276,14 +282,24 @@ func (c *dynamicProviderState) Mappings() []dynamicProviderMapping {
c.lock.RLock()
defer c.lock.RUnlock()

// add the mappings sorted by (priority,id)
mappings := make([]dynamicProviderMapping, 0)
ids := make([]string, 0)
for name := range c.mappings {
ids = append(ids, name)
priorities := make([]int, 0)
for _, mapping := range c.mappings {
priorities = addToSet(priorities, mapping.priority)
}
sort.Strings(ids)
for _, name := range ids {
mappings = append(mappings, c.mappings[name])
sort.Ints(priorities)
for _, priority := range priorities {
ids := make([]string, 0)
for name, mapping := range c.mappings {
if mapping.priority == priority {
ids = append(ids, name)
}
}
sort.Strings(ids)
for _, name := range ids {
mappings = append(mappings, c.mappings[name])
}
}
return mappings
}
Expand Down Expand Up @@ -319,3 +335,12 @@ func cloneMapArray(source []map[string]interface{}) ([]map[string]interface{}, e
}
return dest, nil
}

func addToSet(set []int, i int) []int {
for _, j := range set {
if j == i {
return set
}
}
return append(set, i)
}
6 changes: 5 additions & 1 deletion x-pack/elastic-agent/pkg/composable/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ type DynamicProviderComm interface {
context.Context

// AddOrUpdate updates a mapping with given ID with latest mapping and processors.
AddOrUpdate(id string, mapping map[string]interface{}, processors []map[string]interface{}) error
//
// `priority` ensures that order is maintained when adding the mapping to the current state
// for the processor. Lower priority mappings will always be sorted before higher priority mappings
// to ensure that matching of variables occurs on the lower priority mappings first.
AddOrUpdate(id string, priority int, mapping map[string]interface{}, processors []map[string]interface{}) error
// Remove removes a mapping by given ID.
Remove(id string)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

// ContainerPriority is the priority that container mappings are added to the provider.
const ContainerPriority = 0

func init() {
composable.Providers.AddDynamicProvider("docker", DynamicProviderBuilder)
}
Expand Down Expand Up @@ -76,7 +79,7 @@ func (c *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
delete(stoppers, data.container.ID)
return
}
comm.AddOrUpdate(data.container.ID, data.mapping, data.processors)
comm.AddOrUpdate(data.container.ID, ContainerPriority, data.mapping, data.processors)
case event := <-stopListener.Events():
data, err := generateData(event)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ type Config struct {
func (c *Config) InitDefaults() {
c.SyncPeriod = 10 * time.Minute
c.CleanupTimeout = 60 * time.Second
c.Scope = "node"
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

const (
// PodPriority is the priority that pod mappings are added to the provider.
PodPriority = 0
// ContainerPriority is the priority that container mappings are added to the provider.
ContainerPriority = 1
)

func init() {
composable.Providers.AddDynamicProvider("kubernetes", DynamicProviderBuilder)
}
Expand Down Expand Up @@ -54,13 +61,13 @@ func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {

// Ensure that node is set correctly whenever the scope is set to "node". Make sure that node is empty
// when cluster scope is enforced.
p.logger.Infof("Kubernetes provider started with %s scope", p.config.Scope)
if p.config.Scope == "node" {
p.logger.Debugf("Initializing Kubernetes watcher using node: %v", p.config.Node)
p.config.Node = kubernetes.DiscoverKubernetesNode(p.logger, p.config.Node, kubernetes.IsInCluster(p.config.KubeConfig), client)
} else {
p.config.Node = ""
}
p.logger.Infof("Kubernetes provider started with %s scope", p.config.Scope)
p.logger.Debugf("Initializing Kubernetes watcher using node: %v", p.config.Node)

watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{
SyncTimeout: p.config.SyncPeriod,
Expand All @@ -70,7 +77,6 @@ func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
if err != nil {
return errors.New(err, "couldn't create kubernetes watcher")
}

watcher.AddEventHandler(&eventWatcher{p.logger, p.config.CleanupTimeout, comm})

err = watcher.Start()
Expand Down Expand Up @@ -104,7 +110,7 @@ func (p *eventWatcher) emitRunning(pod *kubernetes.Pod) {
// Emit the pod
// We emit Pod + containers to ensure that configs matching Pod only
// get Pod metadata (not specific to any container)
p.comm.AddOrUpdate(string(pod.GetUID()), mapping, processors)
p.comm.AddOrUpdate(string(pod.GetUID()), PodPriority, mapping, processors)

// Emit all containers in the pod
p.emitContainers(pod, pod.Spec.Containers, pod.Status.ContainerStatuses)
Expand Down Expand Up @@ -161,7 +167,7 @@ func (p *eventWatcher) emitContainers(pod *kubernetes.Pod, containers []kubernet
}

// Emit the container
p.comm.AddOrUpdate(eventID, mapping, processors)
p.comm.AddOrUpdate(eventID, ContainerPriority, mapping, processors)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

// ItemPriority is the priority that item mappings are added to the provider.
const ItemPriority = 0

func init() {
composable.Providers.AddDynamicProvider("local_dynamic", DynamicProviderBuilder)
}
Expand All @@ -30,7 +33,7 @@ type dynamicProvider struct {
// Run runs the environment context provider.
func (c *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
for i, item := range c.Items {
if err := comm.AddOrUpdate(strconv.Itoa(i), item.Mapping, item.Processors); err != nil {
if err := comm.AddOrUpdate(strconv.Itoa(i), ItemPriority, item.Mapping, item.Processors); err != nil {
return errors.New(err, fmt.Sprintf("failed to add mapping for index %d", i), errors.TypeUnexpected)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,13 @@ func TestContextProvider(t *testing.T) {

curr1, ok1 := comm.Current("0")
assert.True(t, ok1)
assert.Equal(t, ItemPriority, curr1.Priority)
assert.Equal(t, mapping1, curr1.Mapping)
assert.Equal(t, processors1, curr1.Processors)

curr2, ok2 := comm.Current("1")
assert.True(t, ok2)
assert.Equal(t, ItemPriority, curr2.Priority)
assert.Equal(t, mapping2, curr2.Mapping)
assert.Equal(t, processors2, curr2.Processors)
}
4 changes: 3 additions & 1 deletion x-pack/elastic-agent/pkg/composable/testing/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

// DynamicState is the state of the dynamic mapping.
type DynamicState struct {
Priority int
Mapping map[string]interface{}
Processors []map[string]interface{}
}
Expand All @@ -34,7 +35,7 @@ func NewDynamicComm(ctx context.Context) *DynamicComm {
}

// AddOrUpdate adds or updates a current mapping.
func (t *DynamicComm) AddOrUpdate(id string, mapping map[string]interface{}, processors []map[string]interface{}) error {
func (t *DynamicComm) AddOrUpdate(id string, priority int, mapping map[string]interface{}, processors []map[string]interface{}) error {
var err error
mapping, err = CloneMap(mapping)
if err != nil {
Expand All @@ -53,6 +54,7 @@ func (t *DynamicComm) AddOrUpdate(id string, mapping map[string]interface{}, pro
t.previous[id] = prev
}
t.current[id] = DynamicState{
Priority: priority,
Mapping: mapping,
Processors: processors,
}
Expand Down

0 comments on commit 8b2cebb

Please sign in to comment.