Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expand check-in payload for V2 #916

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 81 additions & 4 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"time"

eaclient "github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/dispatcher"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway"
Expand All @@ -21,12 +22,16 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
"github.com/elastic/elastic-agent/internal/pkg/scheduler"
"github.com/elastic/elastic-agent/pkg/component/runtime"
"github.com/elastic/elastic-agent/pkg/core/logger"
)

// Max number of times an invalid API Key is checked
const maxUnauthCounter int = 6

// Const for decraded state or linter complains
const degraded = "degraded"

// Default Configuration for the Fleet Gateway.
var defaultGatewaySettings = &fleetGatewaySettings{
Duration: 1 * time.Second, // time between successful calls
Expand Down Expand Up @@ -298,6 +303,73 @@ func (f *fleetGateway) doExecute(ctx context.Context, bo backoff.Backoff) (*flee
return nil, ctx.Err()
}

func (f *fleetGateway) convertToCheckinComponents(components []runtime.ComponentComponentState) []fleetapi.CheckinComponent {
if components == nil {
return nil
}
stateString := func(s eaclient.UnitState) string {
switch s {
case eaclient.UnitStateStarting:
return "starting"
case eaclient.UnitStateConfiguring:
return "configuring"
case eaclient.UnitStateHealthy:
return "healthy"
case eaclient.UnitStateDegraded:
return degraded
case eaclient.UnitStateFailed:
return "failed"
case eaclient.UnitStateStopping:
return "stopping"
case eaclient.UnitStateStopped:
return "stopped"
}
return ""
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@blakerouse @aleksmaus I believe we have a similar lock elsewhere (inspect or diagnostic) maybe we should have an helper function that takes the UnitState and return a human readable representation?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think for inspect and diagnostics the Stringer interface that I added to elastic-agent-client works in that situation. And really it produces the same result as here, but this will be an API interface with Fleet Server so coding it here seems safer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, This is a valid point.


unitTypeString := func(t eaclient.UnitType) string {
switch t {
case eaclient.UnitTypeInput:
return "input"
case eaclient.UnitTypeOutput:
return "output"
}
return ""
}

checkinComponents := make([]fleetapi.CheckinComponent, 0, len(components))

for _, item := range components {
component := item.Component
state := item.State

checkinComponent := fleetapi.CheckinComponent{
ID: component.ID,
Type: component.Spec.InputType,
Status: stateString(state.State),
Message: state.Message,
}

if state.Units != nil {
units := make([]fleetapi.CheckinUnit, 0, len(state.Units))

for unitKey, unitState := range state.Units {
units = append(units, fleetapi.CheckinUnit{
ID: unitKey.UnitID,
Type: unitTypeString(unitKey.UnitType),
Status: stateString(unitState.State),
Message: unitState.Message,
Payload: unitState.Payload,
})
}
checkinComponent.Units = units
}
checkinComponents = append(checkinComponents, checkinComponent)
}

return checkinComponents
}

func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse, error) {
ecsMeta, err := info.Metadata()
if err != nil {
Expand All @@ -313,12 +385,17 @@ func (f *fleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
// get current state
state := f.stateFetcher.State()

// convert components into checkin components structure
components := f.convertToCheckinComponents(state.Components)

// checkin
cmd := fleetapi.NewCheckinCmd(f.agentInfo, f.client)
req := &fleetapi.CheckinRequest{
AckToken: ackToken,
Metadata: ecsMeta,
Status: agentStateToString(state.State),
AckToken: ackToken,
Metadata: ecsMeta,
Status: agentStateToString(state.State),
Message: state.Message,
Components: components,
}

resp, err := cmd.Execute(ctx, req)
Expand Down Expand Up @@ -372,5 +449,5 @@ func agentStateToString(state agentclient.State) string {
case agentclient.Failed:
return "error"
}
return "degraded"
return degraded
}
26 changes: 22 additions & 4 deletions internal/pkg/fleetapi/checkin_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,30 @@ import (

const checkingPath = "/api/fleet/agents/%s/checkin"

type CheckinUnit struct {
ID string `json:"id"`
Type string `json:"type"`
Status string `json:"status"`
Message string `json:"message"`
Payload map[string]interface{} `json:"payload,omitempty"`
}

type CheckinComponent struct {
ID string `json:"id"`
Type string `json:"type"`
Status string `json:"status"`
Message string `json:"message"`
Units []CheckinUnit `json:"units,omitempty"`
}

// CheckinRequest consists of multiple events reported to fleet ui.
type CheckinRequest struct {
Status string `json:"status"`
AckToken string `json:"ack_token,omitempty"`
Events []SerializableEvent `json:"events"`
Metadata *info.ECSMeta `json:"local_metadata,omitempty"`
Status string `json:"status"`
AckToken string `json:"ack_token,omitempty"`
Events []SerializableEvent `json:"events"`
Metadata *info.ECSMeta `json:"local_metadata,omitempty"`
Message string `json:"message"` // V2 Agent message
Components []CheckinComponent `json:"components"` // V2 Agent components
}

// SerializableEvent is a representation of the event to be send to the Fleet Server API via the checkin
Expand Down