diff --git a/deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml b/deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml index fd413ee79128..9663a899b317 100644 --- a/deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml +++ b/deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml @@ -441,7 +441,7 @@ data: # maxconn: 10 # network: tcp # period: 10s - # condition: ${kubernetes.pod.labels.app} == 'redis' + # condition: ${kubernetes.labels.app} == 'redis' --- apiVersion: apps/v1 kind: DaemonSet diff --git a/deploy/kubernetes/elastic-agent-standalone/elastic-agent-standalone-daemonset-configmap.yaml b/deploy/kubernetes/elastic-agent-standalone/elastic-agent-standalone-daemonset-configmap.yaml index f5cb508d367c..2688bbfc61ce 100644 --- a/deploy/kubernetes/elastic-agent-standalone/elastic-agent-standalone-daemonset-configmap.yaml +++ b/deploy/kubernetes/elastic-agent-standalone/elastic-agent-standalone-daemonset-configmap.yaml @@ -441,4 +441,4 @@ data: # maxconn: 10 # network: tcp # period: 10s - # condition: ${kubernetes.pod.labels.app} == 'redis' + # condition: ${kubernetes.labels.app} == 'redis' diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go index ddad6fd25401..dd40c53587c9 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/config.go @@ -30,6 +30,7 @@ type Config struct { AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"` IncludeLabels []string `config:"include_labels"` ExcludeLabels []string `config:"exclude_labels"` + IncludeAnnotations []string `config:"include_annotations"` LabelsDedot bool `config:"labels.dedot"` AnnotationsDedot bool `config:"annotations.dedot"` diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go index c43e5f984300..e542d8afb32e 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/kubernetes.go @@ -110,42 +110,50 @@ func (p *dynamicProvider) watchResource( p.config.Node = "" } - watcher, err := p.newWatcher(resourceType, comm, client) + eventer, err := p.newEventer(resourceType, comm, client) if err != nil { return errors.New(err, "couldn't create kubernetes watcher for resource %s", resourceType) } - err = watcher.Start() + err = eventer.Start() if err != nil { - return errors.New(err, "couldn't start kubernetes watcher for resource %s", resourceType) + return errors.New(err, "couldn't start kubernetes eventer for resource %s", resourceType) } + return nil } -// newWatcher initializes the proper watcher according to the given resource (pod, node, service). -func (p *dynamicProvider) newWatcher( +// Eventer allows defining ways in which kubernetes resource events are observed and processed +type Eventer interface { + kubernetes.ResourceEventHandler + Start() error + Stop() +} + +// newEventer initializes the proper eventer according to the given resource (pod, node, service). +func (p *dynamicProvider) newEventer( resourceType string, comm composable.DynamicProviderComm, - client k8s.Interface) (kubernetes.Watcher, error) { + client k8s.Interface) (Eventer, error) { switch resourceType { case "pod": - watcher, err := NewPodWatcher(comm, p.config, p.logger, client, p.config.Scope) + eventer, err := NewPodEventer(comm, p.config, p.logger, client, p.config.Scope) if err != nil { return nil, err } - return watcher, nil + return eventer, nil case "node": - watcher, err := NewNodeWatcher(comm, p.config, p.logger, client, p.config.Scope) + eventer, err := NewNodeEventer(comm, p.config, p.logger, client, p.config.Scope) if err != nil { return nil, err } - return watcher, nil + return eventer, nil case "service": - watcher, err := NewServiceWatcher(comm, p.config, p.logger, client, p.config.Scope) + eventer, err := NewServiceEventer(comm, p.config, p.logger, client, p.config.Scope) if err != nil { return nil, err } - return watcher, nil + return eventer, nil default: return nil, fmt.Errorf("unsupported autodiscover resource %s", resourceType) } diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go index 17802735d070..db63a1cc2abb 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go @@ -27,6 +27,7 @@ type node struct { scope string config *Config metagen metadata.MetaGen + watcher kubernetes.Watcher } type nodeData struct { @@ -35,13 +36,13 @@ type nodeData struct { processors []map[string]interface{} } -// NewNodeWatcher creates a watcher that can discover and process node objects -func NewNodeWatcher( +// NewNodeEventer creates an eventer that can discover and process node objects +func NewNodeEventer( comm composable.DynamicProviderComm, cfg *Config, logger *logp.Logger, client k8s.Interface, - scope string) (kubernetes.Watcher, error) { + scope string) (Eventer, error) { watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{ SyncTimeout: cfg.SyncPeriod, Node: cfg.Node, @@ -57,15 +58,17 @@ func NewNodeWatcher( return nil, errors.New(err, "failed to unpack configuration") } metaGen := metadata.NewNodeMetadataGenerator(rawConfig, watcher.Store(), client) - watcher.AddEventHandler(&node{ + n := &node{ logger, cfg.CleanupTimeout, comm, scope, cfg, - metaGen}) + metaGen, + watcher} + watcher.AddEventHandler(n) - return watcher, nil + return n, nil } func (n *node) emitRunning(node *kubernetes.Node) { @@ -83,6 +86,16 @@ func (n *node) emitStopped(node *kubernetes.Node) { n.comm.Remove(string(node.GetUID())) } +// Start starts the eventer +func (n *node) Start() error { + return n.watcher.Start() +} + +// Stop stops the eventer +func (n *node) Stop() { + n.watcher.Stop() +} + // OnAdd ensures processing of node objects that are newly created func (n *node) OnAdd(obj interface{}) { n.logger.Debugf("Watcher Node add: %+v", obj) diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go index 859f7f29dfde..80b28fcab476 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go @@ -28,6 +28,8 @@ type pod struct { scope string config *Config metagen metadata.MetaGen + watcher kubernetes.Watcher + nodeWatcher kubernetes.Watcher namespaceWatcher kubernetes.Watcher // Mutex used by configuration updates not triggered by the main watcher, @@ -65,13 +67,13 @@ type namespacePodUpdater struct { locker sync.Locker } -// NewPodWatcher creates a watcher that can discover and process pod objects -func NewPodWatcher( +// NewPodEventer creates an eventer that can discover and process pod objects +func NewPodEventer( comm composable.DynamicProviderComm, cfg *Config, logger *logp.Logger, client k8s.Interface, - scope string) (kubernetes.Watcher, error) { + scope string) (Eventer, error) { watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{ SyncTimeout: cfg.SyncPeriod, Node: cfg.Node, @@ -107,24 +109,57 @@ func NewPodWatcher( } metaGen := metadata.GetPodMetaGen(rawConfig, watcher, nodeWatcher, namespaceWatcher, metaConf) - p := pod{ + p := &pod{ logger: logger, cleanupTimeout: cfg.CleanupTimeout, comm: comm, scope: scope, config: cfg, metagen: metaGen, + watcher: watcher, + nodeWatcher: nodeWatcher, namespaceWatcher: namespaceWatcher, } - watcher.AddEventHandler(&p) + watcher.AddEventHandler(p) if namespaceWatcher != nil && metaConf.Namespace.Enabled() { updater := newNamespacePodUpdater(p.unlockedUpdate, watcher.Store(), &p.crossUpdate) namespaceWatcher.AddEventHandler(updater) } - return watcher, nil + return p, nil +} + +// Start starts the eventer +func (p *pod) Start() error { + if p.nodeWatcher != nil { + err := p.nodeWatcher.Start() + if err != nil { + return err + } + } + + if p.namespaceWatcher != nil { + if err := p.namespaceWatcher.Start(); err != nil { + return err + } + } + + return p.watcher.Start() +} + +// Stop stops the eventer +func (p *pod) Stop() { + p.watcher.Stop() + + if p.namespaceWatcher != nil { + p.namespaceWatcher.Stop() + } + + if p.nodeWatcher != nil { + p.nodeWatcher.Stop() + } } func (p *pod) emitRunning(pod *kubernetes.Pod) { diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go index 03686ca70455..0e3f0055af16 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go @@ -27,6 +27,7 @@ type service struct { scope string config *Config metagen metadata.MetaGen + watcher kubernetes.Watcher namespaceWatcher kubernetes.Watcher } @@ -36,13 +37,13 @@ type serviceData struct { processors []map[string]interface{} } -// NewServiceWatcher creates a watcher that can discover and process service objects -func NewServiceWatcher( +// NewServiceEventer creates an eventer that can discover and process service objects +func NewServiceEventer( comm composable.DynamicProviderComm, cfg *Config, logger *logp.Logger, client k8s.Interface, - scope string) (kubernetes.Watcher, error) { + scope string) (Eventer, error) { watcher, err := kubernetes.NewWatcher(client, &kubernetes.Service{}, kubernetes.WatchOptions{ SyncTimeout: cfg.SyncPeriod, Node: cfg.Node, @@ -68,17 +69,38 @@ func NewServiceWatcher( } metaGen := metadata.NewServiceMetadataGenerator(rawConfig, watcher.Store(), namespaceMeta, client) - watcher.AddEventHandler(&service{ + s := &service{ logger, cfg.CleanupTimeout, comm, scope, cfg, metaGen, + watcher, namespaceWatcher, - }) + } + watcher.AddEventHandler(s) + + return s, nil +} - return watcher, nil +// Start starts the eventer +func (s *service) Start() error { + if s.namespaceWatcher != nil { + if err := s.namespaceWatcher.Start(); err != nil { + return err + } + } + return s.watcher.Start() +} + +// Stop stops the eventer +func (s *service) Stop() { + s.watcher.Stop() + + if s.namespaceWatcher != nil { + s.namespaceWatcher.Stop() + } } func (s *service) emitRunning(service *kubernetes.Service) {