-
Notifications
You must be signed in to change notification settings - Fork 148
/
Copy pathack_cmd.go
143 lines (122 loc) · 4.34 KB
/
ack_cmd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package fleetapi
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"go.elastic.co/apm"
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
)
const ackPath = "/api/fleet/agents/%s/acks"
// AckEvent is an event sent in an ACK request.
type AckEvent struct {
EventType string `json:"type"` // 'STATE' | 'ERROR' | 'ACTION_RESULT' | 'ACTION'
SubType string `json:"subtype"` // 'RUNNING','STARTING','IN_PROGRESS','CONFIG','FAILED','STOPPING','STOPPED','DATA_DUMP','ACKNOWLEDGED','UNKNOWN';
Timestamp string `json:"timestamp"` // : '2019-01-05T14:32:03.36764-05:00',
ActionID string `json:"action_id"` // : '48cebde1-c906-4893-b89f-595d943b72a2',
AgentID string `json:"agent_id"` // : 'agent1',
Message string `json:"message,omitempty"` // : 'hello2',
Payload string `json:"payload,omitempty"` // : 'payload2',
ActionInputType string `json:"action_input_type,omitempty"` // copy of original action input_type
ActionData json.RawMessage `json:"action_data,omitempty"` // copy of original action data
ActionResponse map[string]interface{} `json:"action_response,omitempty"` // custom (per beat) response payload
StartedAt string `json:"started_at,omitempty"` // time action started
CompletedAt string `json:"completed_at,omitempty"` // time action completed
Error string `json:"error,omitempty"` // optional action error
}
// AckRequest consists of multiple actions acked to fleet ui.
// POST /agents/{agentId}/acks
// Authorization: ApiKey {AgentAccessApiKey}
// {
// "action_ids": ["id1"]
// }
type AckRequest struct {
Events []AckEvent `json:"events"`
}
// Validate validates the enrollment request before sending it to the API.
func (e *AckRequest) Validate() error {
return nil
}
// AckResponseItem the status items for individual acks
type AckResponseItem struct {
Status int `json:"status"`
Message string `json:"message,omitempty"`
}
// AckResponse is the response send back from the server.
// 200
// {
// "action": "acks"
// "items": [
// {"status": 200},
// {"status": 404},
// ]
// }
type AckResponse struct {
Action string `json:"action"`
Errors bool `json:"errors,omitempty"` // indicates that some of the events in the ack request failed
Items []AckResponseItem `json:"items,omitempty"`
}
// Validate validates the response send from the server.
func (e *AckResponse) Validate() error {
return nil
}
// AckCmd is a fleet API command.
type AckCmd struct {
client client.Sender
info agentInfo
}
// NewAckCmd creates a new api command.
func NewAckCmd(info agentInfo, client client.Sender) *AckCmd {
return &AckCmd{
client: client,
info: info,
}
}
// Execute ACK of actions to the Fleet.
func (e *AckCmd) Execute(ctx context.Context, r *AckRequest) (_ *AckResponse, err error) {
span, ctx := apm.StartSpan(ctx, "execute", "app.internal")
defer func() {
apm.CaptureError(ctx, err).Send()
span.End()
}()
if err := r.Validate(); err != nil {
return nil, err
}
b, err := json.Marshal(r)
if err != nil {
return nil, errors.New(err,
"fail to encode the ack request",
errors.TypeUnexpected)
}
ap := fmt.Sprintf(ackPath, e.info.AgentID())
resp, err := e.client.Send(ctx, "POST", ap, nil, nil, bytes.NewBuffer(b))
if err != nil {
return nil, errors.New(err,
"fail to ack to fleet",
errors.TypeNetwork,
errors.M(errors.MetaKeyURI, ap))
}
defer resp.Body.Close()
// Read ack response always it can be sent with any status code.
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var ackResponse AckResponse
if err := json.Unmarshal(body, &ackResponse); err != nil {
return nil, errors.New(err,
"fail to decode ack response",
errors.TypeNetwork,
errors.M(errors.MetaKeyURI, ap))
}
// if action is not "acks", try to extract the error
if ackResponse.Action != "acks" {
return nil, client.ExtractError(bytes.NewReader(body))
}
return &ackResponse, nil
}