Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Elastic Agent] Add priority to dynamic input provider mappings #22352

Merged
merged 3 commits into from
Nov 6, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@
- Update `install` command to perform enroll before starting Elastic Agent {pull}21772[21772]
- Update `fleet.kibana.path` from a POLICY_CHANGE {pull}21804[21804]
- Removed `install-service.ps1` and `uninstall-service.ps1` from Windows .zip packaging {pull}21694[21694]
- Add `priority` to `AddOrUpdate` on dynamic composable input providers communication channel {pull}22352[22352]
39 changes: 32 additions & 7 deletions x-pack/elastic-agent/pkg/composable/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func (c *contextProviderState) Current() map[string]interface{} {
}

type dynamicProviderMapping struct {
priority int
mapping map[string]interface{}
processors transpiler.Processors
}
Expand All @@ -228,7 +229,11 @@ type dynamicProviderState struct {
}

// AddOrUpdate adds or updates the current mapping for the dynamic provider.
func (c *dynamicProviderState) AddOrUpdate(id string, mapping map[string]interface{}, processors []map[string]interface{}) error {
//
// `priority` ensures that order is maintained when adding the mapping to the current state
// for the processor. Lower priority mappings will always be sorted before higher priority mappings
// to ensure that matching of variables occurs on the lower priority mappings first.
func (c *dynamicProviderState) AddOrUpdate(id string, priority int, mapping map[string]interface{}, processors []map[string]interface{}) error {
var err error
mapping, err = cloneMap(mapping)
if err != nil {
Expand All @@ -252,6 +257,7 @@ func (c *dynamicProviderState) AddOrUpdate(id string, mapping map[string]interfa
return nil
}
c.mappings[id] = dynamicProviderMapping{
priority: priority,
mapping: mapping,
processors: processors,
}
Expand All @@ -276,14 +282,24 @@ func (c *dynamicProviderState) Mappings() []dynamicProviderMapping {
c.lock.RLock()
defer c.lock.RUnlock()

// add the mappings sorted by (priority,id)
mappings := make([]dynamicProviderMapping, 0)
ids := make([]string, 0)
for name := range c.mappings {
ids = append(ids, name)
priorities := make([]int, 0)
for _, mapping := range c.mappings {
priorities = addToSet(priorities, mapping.priority)
}
sort.Strings(ids)
for _, name := range ids {
mappings = append(mappings, c.mappings[name])
sort.Ints(priorities)
for _, priority := range priorities {
ids := make([]string, 0)
for name, mapping := range c.mappings {
if mapping.priority == priority {
ids = append(ids, name)
}
}
sort.Strings(ids)
for _, name := range ids {
mappings = append(mappings, c.mappings[name])
}
}
return mappings
}
Expand Down Expand Up @@ -319,3 +335,12 @@ func cloneMapArray(source []map[string]interface{}) ([]map[string]interface{}, e
}
return dest, nil
}

func addToSet(set []int, i int) []int {
for _, j := range set {
if j == i {
return set
}
}
return append(set, i)
}
6 changes: 5 additions & 1 deletion x-pack/elastic-agent/pkg/composable/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ type DynamicProviderComm interface {
context.Context

// AddOrUpdate updates a mapping with given ID with latest mapping and processors.
AddOrUpdate(id string, mapping map[string]interface{}, processors []map[string]interface{}) error
//
// `priority` ensures that order is maintained when adding the mapping to the current state
// for the processor. Lower priority mappings will always be sorted before higher priority mappings
// to ensure that matching of variables occurs on the lower priority mappings first.
AddOrUpdate(id string, priority int, mapping map[string]interface{}, processors []map[string]interface{}) error
// Remove removes a mapping by given ID.
Remove(id string)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

// ContainerPriority is the priority that container mappings are added to the provider.
const ContainerPriority = 0

func init() {
composable.Providers.AddDynamicProvider("docker", DynamicProviderBuilder)
}
Expand Down Expand Up @@ -76,7 +79,7 @@ func (c *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
delete(stoppers, data.container.ID)
return
}
comm.AddOrUpdate(data.container.ID, data.mapping, data.processors)
comm.AddOrUpdate(data.container.ID, ContainerPriority, data.mapping, data.processors)
case event := <-stopListener.Events():
data, err := generateData(event)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ type Config struct {
func (c *Config) InitDefaults() {
c.SyncPeriod = 10 * time.Minute
c.CleanupTimeout = 60 * time.Second
c.Scope = "cluster"
blakerouse marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

const (
// PodPriority is the priority that pod mappings are added to the provider.
PodPriority = 0
// ContainerPriority is the priority that container mappings are added to the provider.
ContainerPriority = 1
)

func init() {
composable.Providers.AddDynamicProvider("kubernetes", DynamicProviderBuilder)
}
Expand Down Expand Up @@ -54,13 +61,13 @@ func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {

// Ensure that node is set correctly whenever the scope is set to "node". Make sure that node is empty
// when cluster scope is enforced.
p.logger.Infof("Kubernetes provider started with %s scope", p.config.Scope)
if p.config.Scope == "node" {
p.logger.Debugf("Initializing Kubernetes watcher using node: %v", p.config.Node)
p.config.Node = kubernetes.DiscoverKubernetesNode(p.logger, p.config.Node, kubernetes.IsInCluster(p.config.KubeConfig), client)
} else {
p.config.Node = ""
}
p.logger.Infof("Kubernetes provider started with %s scope", p.config.Scope)
p.logger.Debugf("Initializing Kubernetes watcher using node: %v", p.config.Node)

watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{
SyncTimeout: p.config.SyncPeriod,
Expand All @@ -70,7 +77,6 @@ func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
if err != nil {
return errors.New(err, "couldn't create kubernetes watcher")
}

watcher.AddEventHandler(&eventWatcher{p.logger, p.config.CleanupTimeout, comm})

err = watcher.Start()
Expand Down Expand Up @@ -104,7 +110,7 @@ func (p *eventWatcher) emitRunning(pod *kubernetes.Pod) {
// Emit the pod
// We emit Pod + containers to ensure that configs matching Pod only
// get Pod metadata (not specific to any container)
p.comm.AddOrUpdate(string(pod.GetUID()), mapping, processors)
p.comm.AddOrUpdate(string(pod.GetUID()), PodPriority, mapping, processors)

// Emit all containers in the pod
p.emitContainers(pod, pod.Spec.Containers, pod.Status.ContainerStatuses)
Expand Down Expand Up @@ -161,7 +167,7 @@ func (p *eventWatcher) emitContainers(pod *kubernetes.Pod, containers []kubernet
}

// Emit the container
p.comm.AddOrUpdate(eventID, mapping, processors)
p.comm.AddOrUpdate(eventID, ContainerPriority, mapping, processors)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

// ItemPriority is the priority that item mappings are added to the provider.
const ItemPriority = 0

func init() {
composable.Providers.AddDynamicProvider("local_dynamic", DynamicProviderBuilder)
}
Expand All @@ -30,7 +33,7 @@ type dynamicProvider struct {
// Run runs the environment context provider.
func (c *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
for i, item := range c.Items {
if err := comm.AddOrUpdate(strconv.Itoa(i), item.Mapping, item.Processors); err != nil {
if err := comm.AddOrUpdate(strconv.Itoa(i), ItemPriority, item.Mapping, item.Processors); err != nil {
return errors.New(err, fmt.Sprintf("failed to add mapping for index %d", i), errors.TypeUnexpected)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,13 @@ func TestContextProvider(t *testing.T) {

curr1, ok1 := comm.Current("0")
assert.True(t, ok1)
assert.Equal(t, ItemPriority, curr1.Priority)
assert.Equal(t, mapping1, curr1.Mapping)
assert.Equal(t, processors1, curr1.Processors)

curr2, ok2 := comm.Current("1")
assert.True(t, ok2)
assert.Equal(t, ItemPriority, curr2.Priority)
assert.Equal(t, mapping2, curr2.Mapping)
assert.Equal(t, processors2, curr2.Processors)
}
4 changes: 3 additions & 1 deletion x-pack/elastic-agent/pkg/composable/testing/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

// DynamicState is the state of the dynamic mapping.
type DynamicState struct {
Priority int
Mapping map[string]interface{}
Processors []map[string]interface{}
}
Expand All @@ -34,7 +35,7 @@ func NewDynamicComm(ctx context.Context) *DynamicComm {
}

// AddOrUpdate adds or updates a current mapping.
func (t *DynamicComm) AddOrUpdate(id string, mapping map[string]interface{}, processors []map[string]interface{}) error {
func (t *DynamicComm) AddOrUpdate(id string, priority int, mapping map[string]interface{}, processors []map[string]interface{}) error {
var err error
mapping, err = CloneMap(mapping)
if err != nil {
Expand All @@ -53,6 +54,7 @@ func (t *DynamicComm) AddOrUpdate(id string, mapping map[string]interface{}, pro
t.previous[id] = prev
}
t.current[id] = DynamicState{
Priority: priority,
Mapping: mapping,
Processors: processors,
}
Expand Down