Skip to content

Commit

Permalink
Backport: remember backoff counts (elastic#1818)
Browse files Browse the repository at this point in the history
Remember backoff counts between calls to publish, so backoff is not reset in
between multiple calls to publish
  • Loading branch information
Steffen Siering authored and ruflin committed Jun 8, 2016
1 parent 8527545 commit c14cf80
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 9 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// Template, add newest changes here

=== Beats version HEAD
https://github.com/elastic/beats/compare/v1.2.3...1.2[Check the HEAD diff]
https://github.com/elastic/beats/compare/v1.2.3...1.3[Check the HEAD diff]

==== Breaking changes

Expand All @@ -26,6 +26,7 @@ https://github.com/elastic/beats/compare/v1.2.3...1.2[Check the HEAD diff]
==== Bugfixes

*Affecting all Beats*
- Fix output modes backoff counter reset. {issue}1803[1803] {pull}1814[1814] {pull}1818[1818]

*Packetbeat*

Expand Down
9 changes: 6 additions & 3 deletions libbeat/outputs/mode/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type LoadBalancerMode struct {
// block until event has been successfully published.
maxAttempts int

backoffCount uint // number of consecutive failed retry attempts.

// waitGroup + signaling channel for handling shutdown
wg sync.WaitGroup
done chan struct{}
Expand Down Expand Up @@ -191,7 +193,6 @@ func (m *LoadBalancerMode) onMessage(client ProtocolClient, msg eventsMessage) {
} else {
events := msg.events
total := len(events)
var backoffCount uint

for len(events) > 0 {
var err error
Expand Down Expand Up @@ -222,11 +223,11 @@ func (m *LoadBalancerMode) onMessage(client ProtocolClient, msg eventsMessage) {
}

// wait before retry
backoff := time.Duration(int64(m.waitRetry) * (1 << backoffCount))
backoff := time.Duration(int64(m.waitRetry) * (1 << m.backoffCount))
if backoff > m.maxWaitRetry {
backoff = m.maxWaitRetry
} else {
backoffCount++
m.backoffCount++
}
select {
case <-m.done: // shutdown
Expand All @@ -240,6 +241,8 @@ func (m *LoadBalancerMode) onMessage(client ProtocolClient, msg eventsMessage) {
}
}
}

m.backoffCount = 0
outputs.SignalCompleted(msg.signaler)
}

Expand Down
10 changes: 5 additions & 5 deletions libbeat/outputs/mode/single.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type SingleConnectionMode struct {
timeout time.Duration // connection timeout
waitRetry time.Duration // wait time until reconnect
maxWaitRetry time.Duration // Maximum send/retry timeout in backoff case.
backoffCount uint // number of consecutive failed retry attempts.

// maximum number of configured send attempts. If set to 0, publisher will
// block until event has been successfully published.
Expand Down Expand Up @@ -113,7 +114,6 @@ func (s *SingleConnectionMode) publish(
send func() (ok bool, resetFail bool),
) error {
fails := 0
var backoffCount uint
var err error

guaranteed := opts.Guaranteed || s.maxAttempts == 0
Expand All @@ -132,8 +132,8 @@ func (s *SingleConnectionMode) publish(
goto sendFail
}

backoffCount = 0
debug("send completed")
s.backoffCount = 0
outputs.SignalCompleted(signaler)
return nil

Expand All @@ -142,7 +142,7 @@ func (s *SingleConnectionMode) publish(
if resetFail {
debug("reset fails")
fails = 0
backoffCount = 0
s.backoffCount = 0
}

if !guaranteed && (s.maxAttempts > 0 && fails == s.maxAttempts) {
Expand All @@ -152,11 +152,11 @@ func (s *SingleConnectionMode) publish(
}

logp.Info("send fail")
backoff := time.Duration(int64(s.waitRetry) * (1 << backoffCount))
backoff := time.Duration(int64(s.waitRetry) * (1 << s.backoffCount))
if backoff > s.maxWaitRetry {
backoff = s.maxWaitRetry
} else {
backoffCount++
s.backoffCount++
}
logp.Info("backoff retry: %v", backoff)
time.Sleep(backoff)
Expand Down

0 comments on commit c14cf80

Please sign in to comment.