Skip to content

Commit

Permalink
Expand status reporter/controller interfaces to allow local reporters (
Browse files Browse the repository at this point in the history
…#1285)

* Expand status reporter/controller interfaces to allow local reporters

Add a local reporter map to the status controller. These reporters are
not used when updating status with fleet-server, they are only used to
gather local state information - specifically if the agent is degraded
because checkin with fleet-server has failed. This bypasses the bug that
was introduced with the liveness endpoint where the agent could checkin
(to fleet-server) with a degraded status because a previous checkin
failed. Local reporters are used to generate a separate status. This
status is used in the liveness endpoint.

* fix linter

(cherry picked from commit 717708a)

# Conflicts:
#	internal/pkg/core/status/reporter.go
  • Loading branch information
michel-laterman authored and mergify[bot] committed Sep 26, 2022
1 parent 0283688 commit ddaca73
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
- Use at least warning level for all status logs {pull}1218[1218]
- Remove fleet event reporter and events from checkin body. {issue}993[993]
- Fix unintended reset of source URI when downloading components {pull}1252[1252]
- Create separate status reporter for local only events so that degraded fleet-checkins no longer affect health on successful fleet-checkins. {issue}1157[1157] {pull}1285[1285]

==== New features

Expand Down
9 changes: 6 additions & 3 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type fleetGateway struct {
checkinFailCounter int
statusController status.Controller
statusReporter status.Reporter
localReporter status.Reporter
stateStore stateStore
queue actionQueue
}
Expand Down Expand Up @@ -156,6 +157,7 @@ func newFleetGatewayWithScheduler(
done: done,
acker: acker,
statusReporter: statusController.RegisterComponent("gateway"),
localReporter: statusController.RegisterLocalComponent("gateway-checkin"),
statusController: statusController,
stateStore: stateStore,
queue: queue,
Expand Down Expand Up @@ -208,6 +210,7 @@ func (f *fleetGateway) worker() {
f.statusReporter.Update(state.Failed, errMsg, nil)
} else {
f.statusReporter.Update(state.Healthy, "", nil)
f.localReporter.Update(state.Healthy, "", nil) // we don't need to specifically set the local reporter to failed above, but it needs to be reset to healthy if a checking succeeds
}

case <-f.bgContext.Done():
Expand Down Expand Up @@ -291,12 +294,11 @@ func (f *fleetGateway) doExecute() (*fleetapi.CheckinResponse, error) {
)

f.log.Error(err)
f.localReporter.Update(state.Failed, err.Error(), nil)
return nil, err
}
if f.checkinFailCounter > 1 {
// do not update status reporter with failure
// status reporter would report connection failure on first successful connection, leading to
// stale result for certain period causing slight confusion.
f.localReporter.Update(state.Degraded, fmt.Sprintf("checkin failed: %v", err), nil)
f.log.Errorf("checking number %d failed: %s", f.checkinFailCounter, err.Error())
}
continue
Expand Down Expand Up @@ -386,6 +388,7 @@ func (f *fleetGateway) stop() {
f.log.Info("Fleet gateway is stopping")
defer f.scheduler.Stop()
f.statusReporter.Unregister()
f.localReporter.Unregister()
close(f.done)
f.wg.Wait()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage"
"github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
"github.com/elastic/elastic-agent/internal/pkg/core/state"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
noopacker "github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/noop"
"github.com/elastic/elastic-agent/internal/pkg/scheduler"
Expand Down Expand Up @@ -703,12 +704,18 @@ func TestRetriesOnFailures(t *testing.T) {
queue.On("DequeueActions").Return([]fleetapi.Action{})
queue.On("Actions").Return([]fleetapi.Action{})

localReporter := &testutils.MockReporter{}
localReporter.On("Update", state.Degraded, mock.Anything, mock.Anything).Times(2)
localReporter.On("Update", mock.Anything, mock.Anything, mock.Anything).Maybe()
localReporter.On("Unregister").Maybe()

fleetReporter := &testutils.MockReporter{}
fleetReporter.On("Update", mock.Anything, mock.Anything, mock.Anything).Maybe()
fleetReporter.On("Unregister").Maybe()

statusController := &testutils.MockController{}
statusController.On("RegisterComponent", "gateway").Return(fleetReporter).Once()
statusController.On("RegisterLocalComponent", "gateway-checkin").Return(localReporter).Once()
statusController.On("StatusString").Return("string")

gateway, err := newFleetGatewayWithScheduler(
Expand Down Expand Up @@ -767,6 +774,7 @@ func TestRetriesOnFailures(t *testing.T) {
waitFn()
statusController.AssertExpectations(t)
fleetReporter.AssertExpectations(t)
localReporter.AssertExpectations(t)
})

t.Run("The retry loop is interruptible",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@ import (

type noopController struct{}

func (*noopController) SetAgentID(_ string) {}
func (*noopController) RegisterComponent(_ string) status.Reporter { return &noopReporter{} }
func (*noopController) SetAgentID(_ string) {}
func (*noopController) RegisterComponent(_ string) status.Reporter { return &noopReporter{} }
func (*noopController) RegisterLocalComponent(_ string) status.Reporter { return &noopReporter{} }
func (*noopController) RegisterComponentWithPersistance(_ string, _ bool) status.Reporter {
return &noopReporter{}
}
func (*noopController) RegisterApp(_ string, _ string) status.Reporter { return &noopReporter{} }
func (*noopController) Status() status.AgentStatus { return status.AgentStatus{Status: status.Healthy} }
func (*noopController) RegisterApp(_ string, _ string) status.Reporter { return &noopReporter{} }
func (*noopController) Status() status.AgentStatus { return status.AgentStatus{Status: status.Healthy} }
func (*noopController) LocalStatus() status.AgentStatus {
return status.AgentStatus{Status: status.Healthy}
}
func (*noopController) StatusCode() status.AgentStatusCode { return status.Healthy }
func (*noopController) UpdateStateID(_ string) {}
func (*noopController) StatusString() string { return "online" }
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/core/status/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ type LivenessResponse struct {
}

// ServeHTTP is an HTTP Handler for the status controller.
// It uses the local agent status so it is able to report a degraded state if the fleet-server checkin has issues.
// Respose code is 200 for a healthy agent, and 503 otherwise.
// Response body is a JSON object that contains the agent ID, status, message, and the last status update time.
func (r *controller) ServeHTTP(wr http.ResponseWriter, req *http.Request) {
s := r.Status()
s := r.LocalStatus()
lr := LivenessResponse{
ID: r.agentID,
Status: s.Status.String(),
Expand Down
78 changes: 78 additions & 0 deletions internal/pkg/core/status/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,19 @@ type AgentStatus struct {
type Controller interface {
SetAgentID(string)
RegisterComponent(string) Reporter
RegisterLocalComponent(string) Reporter
RegisterComponentWithPersistance(string, bool) Reporter
RegisterApp(id string, name string) Reporter
Status() AgentStatus
LocalStatus() AgentStatus
StatusCode() AgentStatusCode
StatusString() string
UpdateStateID(string)
ServeHTTP(http.ResponseWriter, *http.Request)
}

type controller struct {
<<<<<<< HEAD
mx sync.Mutex
status AgentStatusCode
message string
Expand All @@ -77,6 +80,21 @@ type controller struct {
log *logger.Logger
stateID string
agentID string
=======
updateTime time.Time
log *logger.Logger
reporters map[string]*reporter
localReporters map[string]*reporter
appReporters map[string]*reporter
stateID string
message string
agentID string
status AgentStatusCode
localStatus AgentStatusCode
localMessage string
localTime time.Time
mx sync.Mutex
>>>>>>> 717708a72 (Expand status reporter/controller interfaces to allow local reporters (#1285))
}

// NewController creates a new reporter.
Expand Down Expand Up @@ -126,6 +144,28 @@ func (r *controller) UpdateStateID(stateID string) {
r.updateStatus()
}

// RegisterLocalComponent registers new component for local-only status updates.
func (r *controller) RegisterLocalComponent(componentIdentifier string) Reporter {
id := componentIdentifier + "-" + uuid.New().String()[:8]
rep := &reporter{
name: componentIdentifier,
isRegistered: true,
unregisterFunc: func() {
r.mx.Lock()
delete(r.localReporters, id)
r.mx.Unlock()
},
notifyChangeFunc: r.updateStatus,
isPersistent: false,
}

r.mx.Lock()
r.localReporters[id] = rep
r.mx.Unlock()

return rep
}

// Register registers new component for status updates.
func (r *controller) RegisterComponent(componentIdentifier string) Reporter {
return r.RegisterComponentWithPersistance(componentIdentifier, false)
Expand Down Expand Up @@ -199,6 +239,25 @@ func (r *controller) Status() AgentStatus {
}
}

// LocalStatus returns the status from the local registered components if they are different from the agent status.
// If the agent status is more severe then the local status (failed vs degraded for example) agent status is used.
// If they are equal (healthy and healthy) agent status is used.
func (r *controller) LocalStatus() AgentStatus {
status := r.Status()
r.mx.Lock()
defer r.mx.Unlock()

if r.localStatus > status.Status {
return AgentStatus{
Status: r.localStatus,
Message: r.localMessage,
UpdateTime: r.localTime,
}
}
return status

}

// StatusCode retrieves current agent status code.
func (r *controller) StatusCode() AgentStatusCode {
r.mx.Lock()
Expand All @@ -208,9 +267,23 @@ func (r *controller) StatusCode() AgentStatusCode {

func (r *controller) updateStatus() {
status := Healthy
lStatus := Healthy
message := ""
lMessage := ""

r.mx.Lock()
for id, rep := range r.localReporters {
s := statusToAgentStatus(rep.status)
if s > lStatus {
lStatus = s
lMessage = fmt.Sprintf("component %s: %s", id, rep.message)
}
r.log.Debugf("local component '%s' has status '%s'", id, s)
if status == Failed {
break
}
}

for id, rep := range r.reporters {
s := statusToAgentStatus(rep.status)
if s > status {
Expand Down Expand Up @@ -244,6 +317,11 @@ func (r *controller) updateStatus() {
r.message = message
r.updateTime = time.Now().UTC()
}
if r.localStatus != lStatus {
r.localStatus = lStatus
r.localMessage = lMessage
r.localTime = time.Now().UTC()
}

r.mx.Unlock()

Expand Down
10 changes: 10 additions & 0 deletions internal/pkg/testutils/status_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ func (m *MockController) RegisterComponent(id string) status.Reporter {
return args.Get(0).(status.Reporter)
}

func (m *MockController) RegisterLocalComponent(id string) status.Reporter {
args := m.Called(id)
return args.Get(0).(status.Reporter)
}

func (m *MockController) RegisterComponentWithPersistance(id string, b bool) status.Reporter {
args := m.Called(id, b)
return args.Get(0).(status.Reporter)
Expand All @@ -40,6 +45,11 @@ func (m *MockController) Status() status.AgentStatus {
return args.Get(0).(status.AgentStatus)
}

func (m *MockController) LocalStatus() status.AgentStatus {
args := m.Called()
return args.Get(0).(status.AgentStatus)
}

func (m *MockController) StatusCode() status.AgentStatusCode {
args := m.Called()
return args.Get(0).(status.AgentStatusCode)
Expand Down

0 comments on commit ddaca73

Please sign in to comment.