From 1900ea17f60078ada1b10e9a2c677784554c7dec Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 23 Nov 2021 10:46:03 +0000 Subject: [PATCH 1/2] Use NamedWatcher in Agent's k8s provider Signed-off-by: chrismark --- libbeat/common/kubernetes/watcher.go | 6 ------ .../pkg/composable/providers/kubernetes/node.go | 2 +- .../pkg/composable/providers/kubernetes/pod.go | 6 +++--- .../pkg/composable/providers/kubernetes/service.go | 4 ++-- 4 files changed, 6 insertions(+), 12 deletions(-) diff --git a/libbeat/common/kubernetes/watcher.go b/libbeat/common/kubernetes/watcher.go index b96b837e4e3..621978090a4 100644 --- a/libbeat/common/kubernetes/watcher.go +++ b/libbeat/common/kubernetes/watcher.go @@ -93,12 +93,6 @@ type watcher struct { logger *logp.Logger } -// NewWatcher initializes the watcher client to provide a events handler for -// resource from the cluster (filtered to the given node) -func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) { - return NewNamedWatcher("", client, resource, opts, indexers) -} - // NewNamedWatcher does the same as NewWatcher, but also allows to name the k8s // client's workqueue that is used by the watcher, unlike NewWatcher which sets // the workqueue name to "". Workqueue name is important for exposing workqueue diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go index db63a1cc2ab..29cac3efdb5 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/node.go @@ -43,7 +43,7 @@ func NewNodeEventer( logger *logp.Logger, client k8s.Interface, scope string) (Eventer, error) { - watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{ + watcher, err := kubernetes.NewNamedWatcher("agent-node", client, &kubernetes.Node{}, kubernetes.WatchOptions{ SyncTimeout: cfg.SyncPeriod, Node: cfg.Node, IsUpdated: isUpdated, diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go index 5a7a8c4a392..e45499ec8dd 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/pod.go @@ -74,7 +74,7 @@ func NewPodEventer( logger *logp.Logger, client k8s.Interface, scope string) (Eventer, error) { - watcher, err := kubernetes.NewWatcher(client, &kubernetes.Pod{}, kubernetes.WatchOptions{ + watcher, err := kubernetes.NewNamedWatcher("agent-pod", client, &kubernetes.Pod{}, kubernetes.WatchOptions{ SyncTimeout: cfg.SyncPeriod, Node: cfg.Node, Namespace: cfg.Namespace, @@ -92,11 +92,11 @@ func NewPodEventer( if metaConf == nil { metaConf = metadata.GetDefaultResourceMetadataConfig() } - nodeWatcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, options, nil) + nodeWatcher, err := kubernetes.NewNamedWatcher("agent-node", client, &kubernetes.Node{}, options, nil) if err != nil { logger.Errorf("couldn't create watcher for %T due to error %+v", &kubernetes.Node{}, err) } - namespaceWatcher, err := kubernetes.NewWatcher(client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ + namespaceWatcher, err := kubernetes.NewNamedWatcher("agent-namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ SyncTimeout: cfg.SyncPeriod, }, nil) if err != nil { diff --git a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go index 2bdf73380be..a19e5c789c5 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go +++ b/x-pack/elastic-agent/pkg/composable/providers/kubernetes/service.go @@ -44,7 +44,7 @@ func NewServiceEventer( logger *logp.Logger, client k8s.Interface, scope string) (Eventer, error) { - watcher, err := kubernetes.NewWatcher(client, &kubernetes.Service{}, kubernetes.WatchOptions{ + watcher, err := kubernetes.NewNamedWatcher("agent-service", client, &kubernetes.Service{}, kubernetes.WatchOptions{ SyncTimeout: cfg.SyncPeriod, Node: cfg.Node, HonorReSyncs: true, @@ -54,7 +54,7 @@ func NewServiceEventer( } metaConf := metadata.GetDefaultResourceMetadataConfig() - namespaceWatcher, err := kubernetes.NewWatcher(client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ + namespaceWatcher, err := kubernetes.NewNamedWatcher("agent-namespace", client, &kubernetes.Namespace{}, kubernetes.WatchOptions{ SyncTimeout: cfg.SyncPeriod, Namespace: cfg.Namespace, }, nil) From b874f1f7335cb46234d4102a47f71bf29b18338a Mon Sep 17 00:00:00 2001 From: chrismark Date: Tue, 23 Nov 2021 11:44:27 +0000 Subject: [PATCH 2/2] Fix docstring Signed-off-by: chrismark --- libbeat/common/kubernetes/watcher.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/libbeat/common/kubernetes/watcher.go b/libbeat/common/kubernetes/watcher.go index 621978090a4..75f32f66d66 100644 --- a/libbeat/common/kubernetes/watcher.go +++ b/libbeat/common/kubernetes/watcher.go @@ -93,9 +93,15 @@ type watcher struct { logger *logp.Logger } -// NewNamedWatcher does the same as NewWatcher, but also allows to name the k8s -// client's workqueue that is used by the watcher, unlike NewWatcher which sets -// the workqueue name to "". Workqueue name is important for exposing workqueue +// NewWatcher initializes the watcher client to provide a events handler for +// resource from the cluster (filtered to the given node) +func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) { + return NewNamedWatcher("", client, resource, opts, indexers) +} + +// NewNamedWatcher initializes the watcher client to provide an events handler for +// resource from the cluster (filtered to the given node) and also allows to name the k8s +// client's workqueue that is used by the watcher. Workqueue name is important for exposing workqueue // metrics, if it is empty, its metrics will not be logged by the k8s client. func NewNamedWatcher(name string, client kubernetes.Interface, resource Resource, opts WatchOptions, indexers cache.Indexers) (Watcher, error) { var store cache.Store