From ac7465fa4f6dbc42aded9c98189ce60a15a0dd01 Mon Sep 17 00:00:00 2001 From: urso Date: Mon, 2 Nov 2015 16:23:32 +0100 Subject: [PATCH] single mode timeout config - fix timeout configuration not applied to single mode - add additional backoff mode to single mode on failure --- CHANGELOG.md | 1 + outputs/elasticsearch/output.go | 4 +++- outputs/logstash/logstash.go | 3 ++- outputs/mode/single.go | 32 ++++++++++++++++++++++---------- outputs/mode/single_test.go | 4 ++++ 5 files changed, 32 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dbad4248c170..857f7d344330 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ All notable changes to this project will be documented in this file based on the - Respect '*' debug selector in IsDebug. #226 (elastic/packetbeat#339) - Limit number of workers for Elasticsearch output. elastic/packetbeat#226 - On Windows, remove service related error message when running in the console. #242 +- Fix waitRetry no configured in single output mode configuration. elastic/filebeat#144 ### Added - Add Console output plugin. #218 diff --git a/outputs/elasticsearch/output.go b/outputs/elasticsearch/output.go index 51d0f8c3892c..5e754673add6 100644 --- a/outputs/elasticsearch/output.go +++ b/outputs/elasticsearch/output.go @@ -82,12 +82,14 @@ func (out *elasticsearchOutput) init( } var waitRetry = time.Duration(1) * time.Second + var maxWaitRetry = time.Duration(60) * time.Second var m mode.ConnectionMode out.clients = clients if len(clients) == 1 { client := clients[0] - m, err = mode.NewSingleConnectionMode(client, maxRetries, waitRetry, timeout) + m, err = mode.NewSingleConnectionMode(client, maxRetries, + waitRetry, timeout, maxWaitRetry) } else { loadBalance := config.LoadBalance == nil || *config.LoadBalance if loadBalance { diff --git a/outputs/logstash/logstash.go b/outputs/logstash/logstash.go index 6b7c5d234b75..480897a7cc8a 100644 --- a/outputs/logstash/logstash.go +++ b/outputs/logstash/logstash.go @@ -47,6 +47,7 @@ const ( ) var waitRetry = time.Duration(1) * time.Second +var maxWaitRetry = time.Duration(60) * time.Second func (lj *logstash) init( beat string, @@ -99,7 +100,7 @@ func (lj *logstash) init( var m mode.ConnectionMode if len(clients) == 1 { m, err = mode.NewSingleConnectionMode(clients[0], - sendRetries, waitRetry, timeout) + sendRetries, waitRetry, timeout, maxWaitRetry) } else { loadBalance := config.LoadBalance != nil && *config.LoadBalance if loadBalance { diff --git a/outputs/mode/single.go b/outputs/mode/single.go index ca5236d58531..81a53e0393af 100644 --- a/outputs/mode/single.go +++ b/outputs/mode/single.go @@ -16,8 +16,9 @@ type SingleConnectionMode struct { closed bool // mode closed flag to break publisher loop - timeout time.Duration // connection timeout - waitRetry time.Duration // wait time until reconnect + timeout time.Duration // connection timeout + waitRetry time.Duration // wait time until reconnect + maxWaitRetry time.Duration // Maximum send/retry timeout in backoff case. // maximum number of configured send attempts. If set to 0, publisher will // block until event has been successfully published. @@ -29,11 +30,15 @@ type SingleConnectionMode struct { func NewSingleConnectionMode( client ProtocolClient, maxAttempts int, - waitRetry, timeout time.Duration, + waitRetry, timeout, maxWaitRetry time.Duration, ) (*SingleConnectionMode, error) { s := &SingleConnectionMode{ - timeout: timeout, - conn: client, + conn: client, + + timeout: timeout, + waitRetry: waitRetry, + maxWaitRetry: maxWaitRetry, + maxAttempts: maxAttempts, } @@ -62,15 +67,13 @@ func (s *SingleConnectionMode) PublishEvents( ) error { published := 0 fails := 0 + var backoffCount uint var err error for !s.closed && (s.maxAttempts == 0 || fails < s.maxAttempts) { if err := s.connect(); err != nil { logp.Info("Connecting error publishing events (retrying): %s", err) - - fails++ - time.Sleep(s.waitRetry) - continue + goto sendFail } for published < len(events) { @@ -89,7 +92,16 @@ func (s *SingleConnectionMode) PublishEvents( return nil } - time.Sleep(s.waitRetry) + sendFail: + logp.Info("send fail") + backoff := time.Duration(int64(s.waitRetry) * (1 << backoffCount)) + if backoff > s.maxWaitRetry { + backoff = s.maxWaitRetry + } else { + backoffCount++ + } + logp.Info("backoff retry: %v", backoff) + time.Sleep(backoff) fails++ } diff --git a/outputs/mode/single_test.go b/outputs/mode/single_test.go index 38b55aa7470a..f4d0c9f426d8 100644 --- a/outputs/mode/single_test.go +++ b/outputs/mode/single_test.go @@ -20,6 +20,7 @@ func testSingleSendOneEvent(t *testing.T, events []eventInfo) { 3, 0, 100*time.Millisecond, + 1*time.Second, ) testMode(t, mode, events, signals(true), &collected) } @@ -45,6 +46,7 @@ func testSingleConnectFailConnectAndSend(t *testing.T, events []eventInfo) { 3, 0, 100*time.Millisecond, + 1*time.Second, ) testMode(t, mode, events, signals(true), &collected) } @@ -70,6 +72,7 @@ func testSingleConnectionFail(t *testing.T, events []eventInfo) { 3, 0, 100*time.Millisecond, + 1*time.Second, ) testMode(t, mode, events, signals(false), &collected) } @@ -94,6 +97,7 @@ func testSingleSendFlaky(t *testing.T, events []eventInfo) { 3, 0, 100*time.Millisecond, + 1*time.Second, ) testMode(t, mode, singleEvent(testEvent), signals(true), &collected) }