Skip to content

Commit

Permalink
Merge pull request #249 from urso/bug/144-timeout-unconfigured
Browse files Browse the repository at this point in the history
single mode timeout config
  • Loading branch information
andrewkroh committed Nov 2, 2015
2 parents 4c86c25 + ac7465f commit 2155fb7
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ All notable changes to this project will be documented in this file based on the
- Respect '*' debug selector in IsDebug. #226 (elastic/packetbeat#339)
- Limit number of workers for Elasticsearch output. elastic/packetbeat#226
- On Windows, remove service related error message when running in the console. #242
- Fix waitRetry no configured in single output mode configuration. elastic/filebeat#144

### Added
- Add Console output plugin. #218
Expand Down
4 changes: 3 additions & 1 deletion outputs/elasticsearch/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,14 @@ func (out *elasticsearchOutput) init(
}

var waitRetry = time.Duration(1) * time.Second
var maxWaitRetry = time.Duration(60) * time.Second

var m mode.ConnectionMode
out.clients = clients
if len(clients) == 1 {
client := clients[0]
m, err = mode.NewSingleConnectionMode(client, maxRetries, waitRetry, timeout)
m, err = mode.NewSingleConnectionMode(client, maxRetries,
waitRetry, timeout, maxWaitRetry)
} else {
loadBalance := config.LoadBalance == nil || *config.LoadBalance
if loadBalance {
Expand Down
3 changes: 2 additions & 1 deletion outputs/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (
)

var waitRetry = time.Duration(1) * time.Second
var maxWaitRetry = time.Duration(60) * time.Second

func (lj *logstash) init(
beat string,
Expand Down Expand Up @@ -99,7 +100,7 @@ func (lj *logstash) init(
var m mode.ConnectionMode
if len(clients) == 1 {
m, err = mode.NewSingleConnectionMode(clients[0],
sendRetries, waitRetry, timeout)
sendRetries, waitRetry, timeout, maxWaitRetry)
} else {
loadBalance := config.LoadBalance != nil && *config.LoadBalance
if loadBalance {
Expand Down
32 changes: 22 additions & 10 deletions outputs/mode/single.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ type SingleConnectionMode struct {

closed bool // mode closed flag to break publisher loop

timeout time.Duration // connection timeout
waitRetry time.Duration // wait time until reconnect
timeout time.Duration // connection timeout
waitRetry time.Duration // wait time until reconnect
maxWaitRetry time.Duration // Maximum send/retry timeout in backoff case.

// maximum number of configured send attempts. If set to 0, publisher will
// block until event has been successfully published.
Expand All @@ -29,11 +30,15 @@ type SingleConnectionMode struct {
func NewSingleConnectionMode(
client ProtocolClient,
maxAttempts int,
waitRetry, timeout time.Duration,
waitRetry, timeout, maxWaitRetry time.Duration,
) (*SingleConnectionMode, error) {
s := &SingleConnectionMode{
timeout: timeout,
conn: client,
conn: client,

timeout: timeout,
waitRetry: waitRetry,
maxWaitRetry: maxWaitRetry,

maxAttempts: maxAttempts,
}

Expand Down Expand Up @@ -62,15 +67,13 @@ func (s *SingleConnectionMode) PublishEvents(
) error {
published := 0
fails := 0
var backoffCount uint
var err error

for !s.closed && (s.maxAttempts == 0 || fails < s.maxAttempts) {
if err := s.connect(); err != nil {
logp.Info("Connecting error publishing events (retrying): %s", err)

fails++
time.Sleep(s.waitRetry)
continue
goto sendFail
}

for published < len(events) {
Expand All @@ -89,7 +92,16 @@ func (s *SingleConnectionMode) PublishEvents(
return nil
}

time.Sleep(s.waitRetry)
sendFail:
logp.Info("send fail")
backoff := time.Duration(int64(s.waitRetry) * (1 << backoffCount))
if backoff > s.maxWaitRetry {
backoff = s.maxWaitRetry
} else {
backoffCount++
}
logp.Info("backoff retry: %v", backoff)
time.Sleep(backoff)
fails++
}

Expand Down
4 changes: 4 additions & 0 deletions outputs/mode/single_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func testSingleSendOneEvent(t *testing.T, events []eventInfo) {
3,
0,
100*time.Millisecond,
1*time.Second,
)
testMode(t, mode, events, signals(true), &collected)
}
Expand All @@ -45,6 +46,7 @@ func testSingleConnectFailConnectAndSend(t *testing.T, events []eventInfo) {
3,
0,
100*time.Millisecond,
1*time.Second,
)
testMode(t, mode, events, signals(true), &collected)
}
Expand All @@ -70,6 +72,7 @@ func testSingleConnectionFail(t *testing.T, events []eventInfo) {
3,
0,
100*time.Millisecond,
1*time.Second,
)
testMode(t, mode, events, signals(false), &collected)
}
Expand All @@ -94,6 +97,7 @@ func testSingleSendFlaky(t *testing.T, events []eventInfo) {
3,
0,
100*time.Millisecond,
1*time.Second,
)
testMode(t, mode, singleEvent(testEvent), signals(true), &collected)
}
Expand Down

0 comments on commit 2155fb7

Please sign in to comment.