From 095e2d03b52cf0b9ad9cfa3b93424aa97d80508c Mon Sep 17 00:00:00 2001 From: Vijay Samuel Date: Tue, 6 Mar 2018 23:18:33 -0800 Subject: [PATCH] Fix infinite failure on Kubernetes watch --- CHANGELOG.asciidoc | 1 + libbeat/common/kubernetes/watcher.go | 23 ++++++++++++++++++++++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 978e0d1fca31..efefc7123a86 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -87,6 +87,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di - Fix conditions checking on autodiscover Docker labels. {pull}6412[6412] - Fix for kafka logger. {pull}6430[6430] - Remove double slashes in Windows service script. {pull}6491[6491] +- Fix infinite failure on Kubernetes watch {pull}6504[6504] *Auditbeat* diff --git a/libbeat/common/kubernetes/watcher.go b/libbeat/common/kubernetes/watcher.go index 022d7f1588d5..29b64c25afa0 100644 --- a/libbeat/common/kubernetes/watcher.go +++ b/libbeat/common/kubernetes/watcher.go @@ -3,6 +3,7 @@ package kubernetes import ( "context" "fmt" + "io" "time" "github.com/ericchiang/k8s" @@ -11,6 +12,9 @@ import ( "github.com/elastic/beats/libbeat/logp" ) +// Max back off time for retries +const maxBackoff = 30 * time.Second + func filterByNode(node string) k8s.Option { return k8s.QueryParam("fieldSelector", "spec.nodeName="+node) } @@ -161,6 +165,9 @@ func (w *watcher) Start() error { } func (w *watcher) watch() { + // Failures counter, do exponential backoff on retries + var failures uint + for { select { case <-w.ctx.Done(): @@ -176,7 +183,8 @@ func (w *watcher) watch() { //watch failures should be logged and gracefully failed over as metadata retrieval //should never stop. 
logp.Err("kubernetes: Watching API error %v", err) - time.Sleep(time.Second) + backoff(failures) + failures++ continue } @@ -186,8 +194,14 @@ func (w *watcher) watch() { if err != nil { logp.Err("kubernetes: Watching API error %v", err) watcher.Close() + if !(err == io.EOF || err == io.ErrUnexpectedEOF) { + // This is an error event which can be recovered by moving to the latest resource version + logp.Info("kubernetes: Ignoring event, moving to most recent resource version") + w.lastResourceVersion = "" + } break } + failures = 0 switch eventType { case k8s.EventAdded: w.onAdd(r) @@ -205,3 +219,10 @@ func (w *watcher) watch() { func (w *watcher) Stop() { w.stop() } +func backoff(failures uint) { + wait := 1 << failures * time.Second + if wait > maxBackoff { + wait = maxBackoff + } + time.Sleep(wait) +}