diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index d8a51a18081..a46e58ce92e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -150,6 +150,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Server-side TLS config now validates certificate and key are both specified {pull}19584[19584] - Fix terminating pod autodiscover issue. {pull}20084[20084] - Fix seccomp policy for calls to `chmod` and `chown`. {pull}20054[20054] +- Remove unnecessary restarts of metricsets while using Node autodiscover {pull}19974[19974] - Output errors when Kibana index pattern setup fails. {pull}20121[20121] *Auditbeat* diff --git a/libbeat/autodiscover/providers/kubernetes/node.go b/libbeat/autodiscover/providers/kubernetes/node.go index bd529582f0c..a78622756cd 100644 --- a/libbeat/autodiscover/providers/kubernetes/node.go +++ b/libbeat/autodiscover/providers/kubernetes/node.go @@ -23,6 +23,7 @@ import ( "github.com/gofrs/uuid" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" k8s "k8s.io/client-go/kubernetes" "github.com/elastic/beats/v7/libbeat/autodiscover/builder" @@ -66,6 +67,7 @@ func NewNodeEventer(uuid uuid.UUID, cfg *common.Config, client k8s.Interface, pu watcher, err := kubernetes.NewWatcher(client, &kubernetes.Node{}, kubernetes.WatchOptions{ SyncTimeout: config.SyncPeriod, Node: config.Node, + IsUpdated: isUpdated, }, nil) if err != nil { @@ -190,6 +192,39 @@ func (n *node) emit(node *kubernetes.Node, flag string) { n.publish(event) } +func isUpdated(o, n interface{}) bool { + old, _ := o.(*kubernetes.Node) + new, _ := n.(*kubernetes.Node) + + // Consider as not update in case one of the two objects is not a Node + if old == nil || new == nil { + return true + } + + // This is a resync. It is not an update + if old.ResourceVersion == new.ResourceVersion { + return false + } + + // If the old object and new object are different + oldCopy := old.DeepCopy() + oldCopy.ResourceVersion = "" + + newCopy := new.DeepCopy() + newCopy.ResourceVersion = "" + + // If the old object and new object are different in either meta or spec then there is a valid change + if !equality.Semantic.DeepEqual(oldCopy.Spec, newCopy.Spec) || !equality.Semantic.DeepEqual(oldCopy.ObjectMeta, newCopy.ObjectMeta) { + return true + } + + // If there is a change in the node status then there is a valid change. + if isNodeReady(old) != isNodeReady(new) { + return true + } + return false +} + func getAddress(node *kubernetes.Node) string { for _, address := range node.Status.Addresses { if address.Type == v1.NodeExternalIP && address.Address != "" { diff --git a/libbeat/autodiscover/providers/kubernetes/node_test.go b/libbeat/autodiscover/providers/kubernetes/node_test.go index 59fb67ada7d..736bd153cf2 100644 --- a/libbeat/autodiscover/providers/kubernetes/node_test.go +++ b/libbeat/autodiscover/providers/kubernetes/node_test.go @@ -278,3 +278,178 @@ func TestEmitEvent_Node(t *testing.T) { }) } } + +func TestNode_isUpdated(t *testing.T) { + tests := []struct { + old *kubernetes.Node + new *kubernetes.Node + updated bool + test string + }{ + { + test: "one of the objects is nil then its updated", + old: nil, + new: &kubernetes.Node{}, + updated: true, + }, + { + test: "both empty nodes should return not updated", + old: &kubernetes.Node{}, + new: &kubernetes.Node{}, + updated: false, + }, + { + test: "resource version is the same should return not updated", + old: &kubernetes.Node{ + ObjectMeta: kubernetes.ObjectMeta{ + ResourceVersion: "1", + }, + }, + new: &kubernetes.Node{ + ObjectMeta: kubernetes.ObjectMeta{ + ResourceVersion: "1", + }, + }, + }, + { + test: "if meta changes then it should return updated", + old: &kubernetes.Node{ + ObjectMeta: kubernetes.ObjectMeta{ + ResourceVersion: "1", + Annotations: map[string]string{}, + }, + }, + new: &kubernetes.Node{ + ObjectMeta: kubernetes.ObjectMeta{ + ResourceVersion: "2", + Annotations: map[string]string{ + "a": "b", + }, + }, + }, + updated: true, + }, + { + test: "if spec changes then it should return updated", + old: &kubernetes.Node{ + ObjectMeta: kubernetes.ObjectMeta{ + ResourceVersion: "1", + Annotations: map[string]string{ + "a": "b", + }, + }, + Spec: v1.NodeSpec{ + ProviderID: "1", + Unschedulable: false, + }, + }, + new: &kubernetes.Node{ + ObjectMeta: kubernetes.ObjectMeta{ + ResourceVersion: "2", + Annotations: map[string]string{ + "a": "b", + }, + }, + Spec: v1.NodeSpec{ + ProviderID: "1", + Unschedulable: true, + }, + }, + updated: true, + }, + { + test: "if overall status doesn't change then its not an update", + old: &kubernetes.Node{ + ObjectMeta: kubernetes.ObjectMeta{ + ResourceVersion: "1", + Annotations: map[string]string{ + "a": "b", + }, + }, + Spec: v1.NodeSpec{ + ProviderID: "1", + Unschedulable: true, + }, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionTrue, + }, + }, + }, + }, + new: &kubernetes.Node{ + ObjectMeta: kubernetes.ObjectMeta{ + ResourceVersion: "2", + Annotations: map[string]string{ + "a": "b", + }, + }, + Spec: v1.NodeSpec{ + ProviderID: "1", + Unschedulable: true, + }, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionTrue, + }, + }, + }, + }, + updated: false, + }, + { + test: "if node status changes then its an update", + old: &kubernetes.Node{ + ObjectMeta: kubernetes.ObjectMeta{ + ResourceVersion: "1", + Annotations: map[string]string{ + "a": "b", + }, + }, + Spec: v1.NodeSpec{ + ProviderID: "1", + Unschedulable: true, + }, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionFalse, + }, + }, + }, + }, + new: &kubernetes.Node{ + ObjectMeta: kubernetes.ObjectMeta{ + ResourceVersion: "2", + Annotations: map[string]string{ + "a": "b", + }, + }, + Spec: v1.NodeSpec{ + ProviderID: "1", + Unschedulable: true, + }, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionTrue, + }, + }, + }, + }, + updated: true, + }, + } + + for _, test := range tests { + t.Run(test.test, func(t *testing.T) { + assert.Equal(t, test.updated, isUpdated(test.old, test.new)) + }) + } +} diff --git a/libbeat/common/kubernetes/watcher.go b/libbeat/common/kubernetes/watcher.go index 606a36ac109..3cef13944ec 100644 --- a/libbeat/common/kubernetes/watcher.go +++ b/libbeat/common/kubernetes/watcher.go @@ -66,6 +66,9 @@ type WatchOptions struct { Node string // Namespace is used for filtering watched resource to given namespace, use "" for all namespaces Namespace string + // IsUpdated allows registering a func that allows the invoker of the Watch to decide what amounts to an update + // vs what does not. + IsUpdated func(old, new interface{}) bool } type item struct { @@ -100,6 +103,19 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption queue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), objType) ctx, cancel := context.WithCancel(context.Background()) + if opts.IsUpdated == nil { + opts.IsUpdated = func(o, n interface{}) bool { + old, _ := accessor.ResourceVersion(o.(runtime.Object)) + new, _ := accessor.ResourceVersion(n.(runtime.Object)) + + // Only enqueue changes that have a different resource versions to avoid processing resyncs. + if old != new { + return true + } + return false + } + } + w := &watcher{ client: client, informer: informer, @@ -119,11 +135,7 @@ func NewWatcher(client kubernetes.Interface, resource Resource, opts WatchOption w.enqueue(o, delete) }, UpdateFunc: func(o, n interface{}) { - old, _ := accessor.ResourceVersion(o.(runtime.Object)) - new, _ := accessor.ResourceVersion(n.(runtime.Object)) - - // Only enqueue changes that have a different resource versions to avoid processing resyncs. - if old != new { + if opts.IsUpdated(o, n) { w.enqueue(n, update) } },