Skip to content

Commit

Permalink
Port IncludeAnnotations setting to Agent and small manifest fix (elas…
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrsMark authored and wiwen committed Nov 1, 2021
1 parent 96f3569 commit b9a02b5
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 32 deletions.
2 changes: 1 addition & 1 deletion deploy/kubernetes/elastic-agent-standalone-kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ data:
# maxconn: 10
# network: tcp
# period: 10s
# condition: ${kubernetes.pod.labels.app} == 'redis'
# condition: ${kubernetes.labels.app} == 'redis'
---
apiVersion: apps/v1
kind: DaemonSet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,4 +441,4 @@ data:
# maxconn: 10
# network: tcp
# period: 10s
# condition: ${kubernetes.pod.labels.app} == 'redis'
# condition: ${kubernetes.labels.app} == 'redis'
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Config struct {
AddResourceMetadata *metadata.AddResourceMetadataConfig `config:"add_resource_metadata"`
IncludeLabels []string `config:"include_labels"`
ExcludeLabels []string `config:"exclude_labels"`
IncludeAnnotations []string `config:"include_annotations"`

LabelsDedot bool `config:"labels.dedot"`
AnnotationsDedot bool `config:"annotations.dedot"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,42 +110,50 @@ func (p *dynamicProvider) watchResource(
p.config.Node = ""
}

watcher, err := p.newWatcher(resourceType, comm, client)
eventer, err := p.newEventer(resourceType, comm, client)
if err != nil {
return errors.New(err, "couldn't create kubernetes watcher for resource %s", resourceType)
}

err = watcher.Start()
err = eventer.Start()
if err != nil {
return errors.New(err, "couldn't start kubernetes watcher for resource %s", resourceType)
return errors.New(err, "couldn't start kubernetes eventer for resource %s", resourceType)
}

return nil
}

// newWatcher initializes the proper watcher according to the given resource (pod, node, service).
func (p *dynamicProvider) newWatcher(
// Eventer allows defining ways in which kubernetes resource events are observed and processed
type Eventer interface {
kubernetes.ResourceEventHandler
Start() error
Stop()
}

// newEventer initializes the proper eventer according to the given resource (pod, node, service).
func (p *dynamicProvider) newEventer(
resourceType string,
comm composable.DynamicProviderComm,
client k8s.Interface) (kubernetes.Watcher, error) {
client k8s.Interface) (Eventer, error) {
switch resourceType {
case "pod":
watcher, err := NewPodWatcher(comm, p.config, p.logger, client, p.config.Scope)
eventer, err := NewPodEventer(comm, p.config, p.logger, client, p.config.Scope)
if err != nil {
return nil, err
}
return watcher, nil
return eventer, nil
case "node":
watcher, err := NewNodeWatcher(comm, p.config, p.logger, client, p.config.Scope)
eventer, err := NewNodeEventer(comm, p.config, p.logger, client, p.config.Scope)
if err != nil {
return nil, err
}
return watcher, nil
return eventer, nil
case "service":
watcher, err := NewServiceWatcher(comm, p.config, p.logger, client, p.config.Scope)
eventer, err := NewServiceEventer(comm, p.config, p.logger, client, p.config.Scope)
if err != nil {
return nil, err
}
return watcher, nil
return eventer, nil
default:
return nil, fmt.Errorf("unsupported autodiscover resource %s", resourceType)
}
Expand Down
25 changes: 19 additions & 6 deletions x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type node struct {
scope string
config *Config
metagen metadata.MetaGen
watcher kubernetes.Watcher
}

type nodeData struct {
Expand All @@ -35,13 +36,13 @@ type nodeData struct {
processors []map[string]interface{}
}

// NewNodeWatcher creates a watcher that can discover and process node objects
func NewNodeWatcher(
// NewNodeEventer creates an eventer that can discover and process node objects
func NewNodeEventer(
comm composable.DynamicProviderComm,
cfg *Config,
logger *logp.Logger,
client k8s.Interface,
scope string) (kubernetes.Watcher, error) {
scope string) (Eventer, error) {
watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{
SyncTimeout: cfg.SyncPeriod,
Node: cfg.Node,
Expand All @@ -57,15 +58,17 @@ func NewNodeWatcher(
return nil, errors.New(err, "failed to unpack configuration")
}
metaGen := metadata.NewNodeMetadataGenerator(rawConfig, watcher.Store(), client)
watcher.AddEventHandler(&node{
n := &node{
logger,
cfg.CleanupTimeout,
comm,
scope,
cfg,
metaGen})
metaGen,
watcher}
watcher.AddEventHandler(n)

return watcher, nil
return n, nil
}

func (n *node) emitRunning(node *kubernetes.Node) {
Expand All @@ -83,6 +86,16 @@ func (n *node) emitStopped(node *kubernetes.Node) {
n.comm.Remove(string(node.GetUID()))
}

// Start starts the eventer
func (n *node) Start() error {
return n.watcher.Start()
}

// Stop stops the eventer
func (n *node) Stop() {
n.watcher.Stop()
}

// OnAdd ensures processing of node objects that are newly created
func (n *node) OnAdd(obj interface{}) {
n.logger.Debugf("Watcher Node add: %+v", obj)
Expand Down
47 changes: 41 additions & 6 deletions x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type pod struct {
scope string
config *Config
metagen metadata.MetaGen
watcher kubernetes.Watcher
nodeWatcher kubernetes.Watcher
namespaceWatcher kubernetes.Watcher

// Mutex used by configuration updates not triggered by the main watcher,
Expand Down Expand Up @@ -65,13 +67,13 @@ type namespacePodUpdater struct {
locker sync.Locker
}

// NewPodWatcher creates a watcher that can discover and process pod objects
func NewPodWatcher(
// NewPodEventer creates an eventer that can discover and process pod objects
func NewPodEventer(
comm composable.DynamicProviderComm,
cfg *Config,
logger *logp.Logger,
client k8s.Interface,
scope string) (kubernetes.Watcher, error) {
scope string) (Eventer, error) {
watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{
SyncTimeout: cfg.SyncPeriod,
Node: cfg.Node,
Expand Down Expand Up @@ -107,24 +109,57 @@ func NewPodWatcher(
}
metaGen := metadata.GetPodMetaGen(rawConfig, watcher, nodeWatcher, namespaceWatcher, metaConf)

p := pod{
p := &pod{
logger: logger,
cleanupTimeout: cfg.CleanupTimeout,
comm: comm,
scope: scope,
config: cfg,
metagen: metaGen,
watcher: watcher,
nodeWatcher: nodeWatcher,
namespaceWatcher: namespaceWatcher,
}

watcher.AddEventHandler(&p)
watcher.AddEventHandler(p)

if namespaceWatcher != nil && metaConf.Namespace.Enabled() {
updater := newNamespacePodUpdater(p.unlockedUpdate, watcher.Store(), &p.crossUpdate)
namespaceWatcher.AddEventHandler(updater)
}

return watcher, nil
return p, nil
}

// Start starts the eventer
func (p *pod) Start() error {
if p.nodeWatcher != nil {
err := p.nodeWatcher.Start()
if err != nil {
return err
}
}

if p.namespaceWatcher != nil {
if err := p.namespaceWatcher.Start(); err != nil {
return err
}
}

return p.watcher.Start()
}

// Stop stops the eventer
func (p *pod) Stop() {
p.watcher.Stop()

if p.namespaceWatcher != nil {
p.namespaceWatcher.Stop()
}

if p.nodeWatcher != nil {
p.nodeWatcher.Stop()
}
}

func (p *pod) emitRunning(pod *kubernetes.Pod) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type service struct {
scope string
config *Config
metagen metadata.MetaGen
watcher kubernetes.Watcher
namespaceWatcher kubernetes.Watcher
}

Expand All @@ -36,13 +37,13 @@ type serviceData struct {
processors []map[string]interface{}
}

// NewServiceWatcher creates a watcher that can discover and process service objects
func NewServiceWatcher(
// NewServiceEventer creates an eventer that can discover and process service objects
func NewServiceEventer(
comm composable.DynamicProviderComm,
cfg *Config,
logger *logp.Logger,
client k8s.Interface,
scope string) (kubernetes.Watcher, error) {
scope string) (Eventer, error) {
watcher, err := kubernetes.NewWatcher(client, &kubernetes.Service{}, kubernetes.WatchOptions{
SyncTimeout: cfg.SyncPeriod,
Node: cfg.Node,
Expand All @@ -68,17 +69,38 @@ func NewServiceWatcher(
}

metaGen := metadata.NewServiceMetadataGenerator(rawConfig, watcher.Store(), namespaceMeta, client)
watcher.AddEventHandler(&service{
s := &service{
logger,
cfg.CleanupTimeout,
comm,
scope,
cfg,
metaGen,
watcher,
namespaceWatcher,
})
}
watcher.AddEventHandler(s)

return s, nil
}

return watcher, nil
// Start starts the eventer
func (s *service) Start() error {
if s.namespaceWatcher != nil {
if err := s.namespaceWatcher.Start(); err != nil {
return err
}
}
return s.watcher.Start()
}

// Stop stops the eventer
func (s *service) Stop() {
s.watcher.Stop()

if s.namespaceWatcher != nil {
s.namespaceWatcher.Stop()
}
}

func (s *service) emitRunning(service *kubernetes.Service) {
Expand Down

0 comments on commit b9a02b5

Please sign in to comment.