diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index ca3f548d578..b7e80480397 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -8,7 +8,7 @@ // Template, add newest changes here === Beats version HEAD -https://github.com/elastic/beats/compare/v1.2.3...1.2[Check the HEAD diff] +https://github.com/elastic/beats/compare/v1.2.3...1.3[Check the HEAD diff] ==== Breaking changes @@ -26,6 +26,7 @@ https://github.com/elastic/beats/compare/v1.2.3...1.2[Check the HEAD diff] ==== Bugfixes *Affecting all Beats* +- Fix output modes backoff counter reset. {issue}1803[1803] {pull}1814[1814] {pull}1818[1818] *Packetbeat* diff --git a/libbeat/outputs/mode/balance.go b/libbeat/outputs/mode/balance.go index 8b8b5a2106b..6404dc421e0 100644 --- a/libbeat/outputs/mode/balance.go +++ b/libbeat/outputs/mode/balance.go @@ -40,6 +40,8 @@ type LoadBalancerMode struct { // block until event has been successfully published. maxAttempts int + backoffCount uint // number of consecutive failed retry attempts. + // waitGroup + signaling channel for handling shutdown wg sync.WaitGroup done chan struct{} @@ -191,7 +193,6 @@ func (m *LoadBalancerMode) onMessage(client ProtocolClient, msg eventsMessage) { } else { events := msg.events total := len(events) - var backoffCount uint for len(events) > 0 { var err error @@ -222,11 +223,11 @@ func (m *LoadBalancerMode) onMessage(client ProtocolClient, msg eventsMessage) { } // wait before retry - backoff := time.Duration(int64(m.waitRetry) * (1 << backoffCount)) + backoff := time.Duration(int64(m.waitRetry) * (1 << m.backoffCount)) if backoff > m.maxWaitRetry { backoff = m.maxWaitRetry } else { - backoffCount++ + m.backoffCount++ } select { case <-m.done: // shutdown @@ -240,6 +241,8 @@ func (m *LoadBalancerMode) onMessage(client ProtocolClient, msg eventsMessage) { } } } + + m.backoffCount = 0 outputs.SignalCompleted(msg.signaler) } diff --git a/libbeat/outputs/mode/single.go b/libbeat/outputs/mode/single.go index a0cd1f3fde6..f486365299e 100644 --- a/libbeat/outputs/mode/single.go +++ b/libbeat/outputs/mode/single.go @@ -19,6 +19,7 @@ type SingleConnectionMode struct { timeout time.Duration // connection timeout waitRetry time.Duration // wait time until reconnect maxWaitRetry time.Duration // Maximum send/retry timeout in backoff case. + backoffCount uint // number of consecutive failed retry attempts. // maximum number of configured send attempts. If set to 0, publisher will // block until event has been successfully published. @@ -113,7 +114,6 @@ func (s *SingleConnectionMode) publish( send func() (ok bool, resetFail bool), ) error { fails := 0 - var backoffCount uint var err error guaranteed := opts.Guaranteed || s.maxAttempts == 0 @@ -132,8 +132,8 @@ func (s *SingleConnectionMode) publish( goto sendFail } - backoffCount = 0 debug("send completed") + s.backoffCount = 0 outputs.SignalCompleted(signaler) return nil @@ -142,7 +142,7 @@ func (s *SingleConnectionMode) publish( if resetFail { debug("reset fails") fails = 0 - backoffCount = 0 + s.backoffCount = 0 } if !guaranteed && (s.maxAttempts > 0 && fails == s.maxAttempts) { @@ -152,11 +152,11 @@ func (s *SingleConnectionMode) publish( } logp.Info("send fail") - backoff := time.Duration(int64(s.waitRetry) * (1 << backoffCount)) + backoff := time.Duration(int64(s.waitRetry) * (1 << s.backoffCount)) if backoff > s.maxWaitRetry { backoff = s.maxWaitRetry } else { - backoffCount++ + s.backoffCount++ } logp.Info("backoff retry: %v", backoff) time.Sleep(backoff)