Skip to content

Commit

Permalink
[Heartbeat] Adjust State loader to only retry for failed requests and…
Browse files Browse the repository at this point in the history
… not for 4xx (#37981) (#38163)

* only retry when the status is 5xx

* remove test AAA comments

* add changelog

* correct changelog modification

* fix ES query

* change error handling strategy

* do not retry when there is malformed data

* improve retry mechanism

* improve log message

* improve changelog

* fix log format

(cherry picked from commit 27cde87)

Co-authored-by: Alberto Delgado Roda <[email protected]>
  • Loading branch information
mergify[bot] and devcorpio authored Mar 4, 2024
1 parent a5454db commit 999fb8c
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 15 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ fields added to events containing the Beats version. {pull}37553[37553]

*Heartbeat*

- Fix panics when parsing dereferencing invalid parsed url. {pull}34702[34702]
- Fix setuid root when running under cgroups v2. {pull}37794[37794]
- Adjust State loader to only retry when response code status is 5xx {pull}37981[37981]

*Metricbeat*

Expand Down
22 changes: 18 additions & 4 deletions heartbeat/monitors/wrappers/monitorstate/esloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,21 @@ import (

var DefaultDataStreams = "synthetics-*,heartbeat-*"

type LoaderError struct {
err error
Retry bool
}

func (e LoaderError) Error() string {
return e.err.Error()
}

func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation *config.LocationWithID) StateLoader {
if indexPattern == "" {
// Should never happen, but if we ever make a coding error...
logp.L().Warn("ES state loader initialized with no index pattern, will not load states from ES")
return NilStateLoader
}

return func(sf stdfields.StdMonitorFields) (*State, error) {
var runFromID string
if sf.RunFrom != nil {
Expand Down Expand Up @@ -74,10 +82,11 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation
},
},
}

status, body, err := esc.Request("POST", strings.Join([]string{"/", indexPattern, "/", "_search", "?size=1"}, ""), "", nil, reqBody)
if err != nil || status > 299 {
return nil, fmt.Errorf("error executing state search for %s in loc=%s: %w", sf.ID, runFromID, err)
sErr := fmt.Errorf("error executing state search for %s in loc=%s: %w", sf.ID, runFromID, err)
retry := shouldRetry(status)
return nil, LoaderError{err: sErr, Retry: retry}
}

type stateHits struct {
Expand All @@ -94,7 +103,8 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation
sh := stateHits{}
err = json.Unmarshal(body, &sh)
if err != nil {
return nil, fmt.Errorf("could not unmarshal state hits for %s: %w", sf.ID, err)
sErr := fmt.Errorf("could not unmarshal state hits for %s: %w", sf.ID, err)
return nil, LoaderError{err: sErr, Retry: false}
}

if len(sh.Hits.Hits) == 0 {
Expand All @@ -107,3 +117,7 @@ func MakeESLoader(esc *eslegclient.Connection, indexPattern string, beatLocation
return state, nil
}
}

func shouldRetry(status int) bool {
return status >= 500
}
63 changes: 61 additions & 2 deletions heartbeat/monitors/wrappers/monitorstate/esloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ package monitorstate

import (
"fmt"
"io"
"net/http"
"strings"
"testing"
"time"

Expand All @@ -33,6 +36,7 @@ import (
"github.com/elastic/beats/v7/heartbeat/config"
"github.com/elastic/beats/v7/heartbeat/esutil"
"github.com/elastic/beats/v7/heartbeat/monitors/stdfields"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/processors/util"
)

Expand All @@ -51,7 +55,7 @@ func TestStatesESLoader(t *testing.T) {

monID := etc.createTestMonitorStateInES(t, testStatus)
// Since we've continued this state it should register the initial state
ms := etc.tracker.GetCurrentState(monID)
ms := etc.tracker.GetCurrentState(monID, RetryConfig{})
require.True(t, ms.StartedAt.After(testStart.Add(-time.Nanosecond)), "timestamp for new state is off")
requireMSStatusCount(t, ms, testStatus, 1)

Expand Down Expand Up @@ -89,8 +93,61 @@ func TestStatesESLoader(t *testing.T) {
}
}

func TestMakeESLoaderError(t *testing.T) {
tests := []struct {
name string
statusCode int
expected bool
}{
{
name: "should return a retryable error",
statusCode: http.StatusInternalServerError,
expected: true,
},
{
name: "should not return a retryable error",
statusCode: http.StatusNotFound,
expected: false,
},
{
name: "should not return a retryable error when handling malformed data",
statusCode: http.StatusOK,
expected: false,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
etc := newESTestContext(t)
etc.ec.HTTP = fakeHTTPClient{respStatus: test.statusCode}
loader := MakeESLoader(etc.ec, "fakeIndexPattern", etc.location)

_, err := loader(stdfields.StdMonitorFields{})

var loaderError LoaderError
require.ErrorAs(t, err, &loaderError)
require.Equal(t, loaderError.Retry, test.expected)
})
}
}

type fakeHTTPClient struct {
respStatus int
}

func (fc fakeHTTPClient) Do(req *http.Request) (resp *http.Response, err error) {
return &http.Response{
StatusCode: fc.respStatus,
Body: io.NopCloser(strings.NewReader("test response")),
}, nil
}

func (fc fakeHTTPClient) CloseIdleConnections() {
// noop
}

type esTestContext struct {
namespace string
ec *eslegclient.Connection
esc *elasticsearch.Client
loader StateLoader
tracker *Tracker
Expand All @@ -106,10 +163,12 @@ func newESTestContext(t *testing.T) *esTestContext {
}
namespace, _ := uuid.NewV4()
esc := IntegApiClient(t)
ec := IntegES(t)
etc := &esTestContext{
namespace: namespace.String(),
esc: esc,
loader: IntegESLoader(t, fmt.Sprintf("synthetics-*-%s", namespace.String()), location),
ec: ec,
loader: IntegESLoader(t, ec, fmt.Sprintf("synthetics-*-%s", namespace.String()), location),
location: location,
}

Expand Down
4 changes: 2 additions & 2 deletions heartbeat/monitors/wrappers/monitorstate/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (

// Helpers for tests here and elsewhere

func IntegESLoader(t *testing.T, indexPattern string, location *config.LocationWithID) StateLoader {
return MakeESLoader(IntegES(t), indexPattern, location)
func IntegESLoader(t *testing.T, esc *eslegclient.Connection, indexPattern string, location *config.LocationWithID) StateLoader {
return MakeESLoader(esc, indexPattern, location)
}

func IntegES(t *testing.T) (esc *eslegclient.Connection) {
Expand Down
34 changes: 27 additions & 7 deletions heartbeat/monitors/wrappers/monitorstate/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (t *Tracker) RecordStatus(sf stdfields.StdMonitorFields, newStatus StateSta
t.mtx.Lock()
defer t.mtx.Unlock()

state := t.GetCurrentState(sf)
state := t.GetCurrentState(sf, RetryConfig{})
if state == nil {
state = newMonitorState(sf, newStatus, 0, t.flappingEnabled)
logp.L().Infof("initializing new state for monitor %s: %s", sf.ID, state.String())
Expand All @@ -75,36 +75,56 @@ func (t *Tracker) RecordStatus(sf stdfields.StdMonitorFields, newStatus StateSta
}

func (t *Tracker) GetCurrentStatus(sf stdfields.StdMonitorFields) StateStatus {
s := t.GetCurrentState(sf)
s := t.GetCurrentState(sf, RetryConfig{})
if s == nil {
return StatusEmpty
}
return s.Status
}

func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields) (state *State) {
type RetryConfig struct {
attempts int
waitFn func() time.Duration
}

func (t *Tracker) GetCurrentState(sf stdfields.StdMonitorFields, rc RetryConfig) (state *State) {
if state, ok := t.states[sf.ID]; ok {
return state
}

tries := 3
// Default number of attempts
attempts := 3
if rc.attempts != 0 {
attempts = rc.attempts
}

var loadedState *State
var err error
for i := 0; i < tries; i++ {
var i int
for i = 0; i < attempts; i++ {
loadedState, err = t.stateLoader(sf)
if err == nil {
if loadedState != nil {
logp.L().Infof("loaded previous state for monitor %s: %s", sf.ID, loadedState.String())
}
break
}
var loaderError LoaderError
if errors.As(err, &loaderError) && !loaderError.Retry {
logp.L().Warnf("could not load last externally recorded state: %v", loaderError)
break
}

// Default sleep time
sleepFor := (time.Duration(i*i) * time.Second) + (time.Duration(rand.Intn(500)) * time.Millisecond)
logp.L().Warnf("could not load last externally recorded state, will retry again in %d milliseconds: %w", sleepFor.Milliseconds(), err)
if rc.waitFn != nil {
sleepFor = rc.waitFn()
}
logp.L().Warnf("could not load last externally recorded state, will retry again in %d milliseconds: %v", sleepFor.Milliseconds(), err)
time.Sleep(sleepFor)
}
if err != nil {
logp.L().Warnf("could not load prior state from elasticsearch after %d attempts, will create new state for monitor: %s", tries, sf.ID)
logp.L().Warnf("could not load prior state from elasticsearch after %d attempts, will create new state for monitor: %s", i+1, sf.ID)
}

if loadedState != nil {
Expand Down
49 changes: 49 additions & 0 deletions heartbeat/monitors/wrappers/monitorstate/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package monitorstate

import (
"errors"
"testing"
"time"

Expand Down Expand Up @@ -131,3 +132,51 @@ func TestDeferredStateLoader(t *testing.T) {
resState, _ = dsl(stdfields.StdMonitorFields{})
require.Equal(t, stateA, resState)
}

func TestStateLoaderRetry(t *testing.T) {
// While testing the sleep time between retries should be negligible
waitFn := func() time.Duration {
return time.Microsecond
}

tests := []struct {
name string
retryable bool
rc RetryConfig
expectedCalls int
}{
{
"should retry 3 times when fails with retryable error",
true,
RetryConfig{waitFn: waitFn},
3,
},
{
"should not retry when fails with non-retryable error",
false,
RetryConfig{waitFn: waitFn},
1,
},
{
"should honour the configured number of attempts when fails with retryable error",
true,
RetryConfig{attempts: 5, waitFn: waitFn},
5,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
calls := 0
errorStateLoader := func(_ stdfields.StdMonitorFields) (*State, error) {
calls += 1
return nil, LoaderError{err: errors.New("test error"), Retry: tt.retryable}
}

mst := NewTracker(errorStateLoader, true)
mst.GetCurrentState(stdfields.StdMonitorFields{}, tt.rc)

require.Equal(t, calls, tt.expectedCalls)
})
}
}

0 comments on commit 999fb8c

Please sign in to comment.