Skip to content

Commit

Permalink
Agent: populate action_input_type for the .fleet-actions-results (#30562
Browse files Browse the repository at this point in the history
)

* Agent: populate input_type for the .fleet-actions-results

* Renamed input_type to action_input_type. Rewrote acker unit tests, to simplify them and eliminate the go routines leak

* Updated CHANGELOG.next.asciidoc
  • Loading branch information
aleksmaus authored Feb 28, 2022
1 parent 9cf957b commit e9e86a8
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 172 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif
- Add FIPS configuration option for all AWS API calls. {pull}[28899]
- Add metadata change support for some processors {pull}30183[30183]
- Add support for non-unique Kafka headers for output messages. {pull}30369[30369]
- Add action_input_type for the .fleet-actions-results {pull}30562[30562]

*Auditbeat*

Expand Down
11 changes: 6 additions & 5 deletions x-pack/elastic-agent/pkg/fleetapi/ack_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ type AckEvent struct {
Message string `json:"message,omitempty"` // : 'hello2',
Payload string `json:"payload,omitempty"` // : 'payload2',

ActionData json.RawMessage `json:"action_data,omitempty"` // copy of original action data
ActionResponse map[string]interface{} `json:"action_response,omitempty"` // custom (per beat) response payload
StartedAt string `json:"started_at,omitempty"` // time action started
CompletedAt string `json:"completed_at,omitempty"` // time action completed
Error string `json:"error,omitempty"` // optional action error
ActionInputType string `json:"action_input_type,omitempty"` // copy of original action input_type
ActionData json.RawMessage `json:"action_data,omitempty"` // copy of original action data
ActionResponse map[string]interface{} `json:"action_response,omitempty"` // custom (per beat) response payload
StartedAt string `json:"started_at,omitempty"` // time action started
CompletedAt string `json:"completed_at,omitempty"` // time action completed
Error string `json:"error,omitempty"` // optional action error
}

// AckRequest consists of multiple actions acked to fleet ui.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func constructEvent(action fleetapi.Action, agentID string) fleetapi.AckEvent {
}

if a, ok := action.(*fleetapi.ActionApp); ok {
ackev.ActionInputType = a.InputType
ackev.ActionData = a.Data
ackev.ActionResponse = a.Response
ackev.StartedAt = a.StartedAt
Expand Down
284 changes: 117 additions & 167 deletions x-pack/elastic-agent/pkg/fleetapi/acker/fleet/fleet_acker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"io/ioutil"
"net/http"
"net/url"
"sync"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -22,168 +21,40 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
)

func TestAcker_AckCommit(t *testing.T) {
type ackRequest struct {
Events []fleetapi.AckEvent `json:"events"`
}

log, _ := logger.New("fleet_acker", false)
client := newTestingClient()
agentInfo := &testAgentInfo{}
acker, err := NewAcker(log, agentInfo, client)
if err != nil {
t.Fatal(err)
}

if acker == nil {
t.Fatal("acker not initialized")
}

testID := "ack-test-action-id"
testAction := &fleetapi.ActionUnknown{ActionID: testID}

ch := client.Answer(func(headers http.Header, body io.Reader) (*http.Response, error) {
content, err := ioutil.ReadAll(body)
assert.NoError(t, err)
cr := &ackRequest{}
err = json.Unmarshal(content, &cr)
assert.NoError(t, err)

assert.EqualValues(t, 1, len(cr.Events))
assert.EqualValues(t, testID, cr.Events[0].ActionID)

resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
return resp, nil
})

go func() {
for range ch {
}
}()

if err := acker.Ack(context.Background(), testAction); err != nil {
t.Fatal(err)
}
if err := acker.Commit(context.Background()); err != nil {
t.Fatal(err)
}
}

func TestAcker_AckBatch(t *testing.T) {
type ackRequest struct {
Events []fleetapi.AckEvent `json:"events"`
}

log, _ := logger.New("fleet_acker", false)
client := newTestingClient()
agentInfo := &testAgentInfo{}
acker, err := NewAcker(log, agentInfo, client)
if err != nil {
t.Fatal(err)
}

if acker == nil {
t.Fatal("acker not initialized")
}

testID1 := "ack-test-action-id-1"
testAction1 := &fleetapi.ActionUnknown{ActionID: testID1}
testID2 := "ack-test-action-id-2"
testAction2 := &fleetapi.ActionUnknown{ActionID: testID2}

ch := client.Answer(func(headers http.Header, body io.Reader) (*http.Response, error) {
content, err := ioutil.ReadAll(body)
assert.NoError(t, err)
cr := &ackRequest{}
err = json.Unmarshal(content, &cr)
assert.NoError(t, err)

assert.EqualValues(t, 2, len(cr.Events))
assert.EqualValues(t, testID1, cr.Events[0].ActionID)
assert.EqualValues(t, testID2, cr.Events[1].ActionID)

resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
return resp, nil
})

go func() {
for range ch {
}
}()

if err := acker.AckBatch(context.Background(), []fleetapi.Action{testAction1, testAction2}); err != nil {
t.Fatal(err)
}
if err := acker.Commit(context.Background()); err != nil {
t.Fatal(err)
}
type ackRequest struct {
Events []fleetapi.AckEvent `json:"events"`
}

func TestAcker_AckBatch_Empty(t *testing.T) {
log, _ := logger.New("fleet_acker", false)
client := newNotCalledClient()
agentInfo := &testAgentInfo{}
acker, err := NewAcker(log, agentInfo, client)
if err != nil {
t.Fatal(err)
}

if acker == nil {
t.Fatal("acker not initialized")
}

if err := acker.AckBatch(context.Background(), []fleetapi.Action{}); err != nil {
t.Fatal(err)
}
if err := acker.Commit(context.Background()); err != nil {
t.Fatal(err)
}
if client.called {
t.Fatal("client should not have been used")
}
}
type testAgentInfo struct{}

type clientCallbackFunc func(headers http.Header, body io.Reader) (*http.Response, error)
func (testAgentInfo) AgentID() string { return "agent-secret" }

type testingClient struct {
sync.Mutex
callback clientCallbackFunc
received chan struct{}
type testSender struct {
req *ackRequest
}

func (t *testingClient) Send(
func (s *testSender) Send(
_ context.Context,
method string,
path string,
params url.Values,
headers http.Header,
body io.Reader,
) (*http.Response, error) {
t.Lock()
defer t.Unlock()
defer func() { t.received <- struct{}{} }()
return t.callback(headers, body)
d := json.NewDecoder(body)
var req ackRequest
err := d.Decode(&req)
if err != nil {
return nil, err
}
s.req = &req
return wrapStrToResp(http.StatusOK, `{ "actions": [] }`), nil
}

func (t *testingClient) URI() string {
func (s *testSender) URI() string {
return "http://localhost"
}

func (t *testingClient) Answer(fn clientCallbackFunc) <-chan struct{} {
t.Lock()
defer t.Unlock()
t.callback = fn
return t.received
}

func newTestingClient() *testingClient {
return &testingClient{received: make(chan struct{}, 1)}
}

type testAgentInfo struct{}

func (testAgentInfo) AgentID() string { return "agent-secret" }

func wrapStrToResp(code int, body string) *http.Response {
return &http.Response{
Status: fmt.Sprintf("%d %s", code, http.StatusText(code)),
Expand All @@ -197,29 +68,108 @@ func wrapStrToResp(code int, body string) *http.Response {
}
}

type notCalledClient struct {
sync.Mutex
called bool
}
func TestAcker_Ack(t *testing.T) {
tests := []struct {
name string
actions []fleetapi.Action
batch bool
}{
{
name: "nil",
actions: nil,
},
{
name: "empty",
actions: []fleetapi.Action{},
},
{
name: "ack",
actions: []fleetapi.Action{&fleetapi.ActionUnknown{ActionID: "ack-test-action-id"}},
},
{
name: "ackbatch",
actions: []fleetapi.Action{
&fleetapi.ActionUnknown{ActionID: "ack-test-action-id1"},
&fleetapi.ActionUnknown{ActionID: "ack-test-action-id2"},
},
},
{
name: "ackaction",
actions: []fleetapi.Action{
&fleetapi.ActionApp{
ActionID: "1b12dcd8-bde0-4045-92dc-c4b27668d733",
InputType: "osquery",
Data: []byte(`{"query":"select * from osquery_info"}`),
Response: map[string]interface{}{"osquery": map[string]interface{}{"count": float64(1)}},
StartedAt: "2022-02-23T18:26:08.506128Z",
CompletedAt: "2022-02-23T18:26:08.507593Z",
},
&fleetapi.ActionApp{
ActionID: "2b12dcd8-bde0-4045-92dc-c4b27668d733",
InputType: "osquery",
Data: []byte(`{"query":"select * from foobar"}`),
StartedAt: "2022-02-24T18:26:08.506128Z",
CompletedAt: "2022-02-24T18:26:08.507593Z",
Error: "uknown table",
},
},
},
}

func (t *notCalledClient) Send(
_ context.Context,
method string,
path string,
params url.Values,
headers http.Header,
body io.Reader,
) (*http.Response, error) {
t.Lock()
defer t.Unlock()
t.called = true
return nil, fmt.Errorf("should not have been called")
}
log, _ := logger.New("fleet_acker", false)
agentInfo := &testAgentInfo{}

func (t *notCalledClient) URI() string {
return "http://localhost"
}
checkRequest := func(t *testing.T, actions []fleetapi.Action, req *ackRequest) {
if len(actions) == 0 { // If no actions, expect no request, the sender was not called
assert.Nil(t, req)
return
}
assert.EqualValues(t, len(actions), len(req.Events))
for i, ac := range actions {
assert.EqualValues(t, "ACTION_RESULT", req.Events[i].EventType)
assert.EqualValues(t, "ACKNOWLEDGED", req.Events[i].SubType)
assert.EqualValues(t, ac.ID(), req.Events[i].ActionID)
assert.EqualValues(t, agentInfo.AgentID(), req.Events[i].AgentID)
assert.EqualValues(t, fmt.Sprintf("Action '%s' of type '%s' acknowledged.", ac.ID(), ac.Type()), req.Events[i].Message)
if a, ok := ac.(*fleetapi.ActionApp); ok {
assert.EqualValues(t, a.InputType, req.Events[i].ActionInputType)
assert.EqualValues(t, a.Data, req.Events[i].ActionData)
assert.EqualValues(t, a.Response, req.Events[i].ActionResponse)
assert.EqualValues(t, a.StartedAt, req.Events[i].StartedAt)
assert.EqualValues(t, a.CompletedAt, req.Events[i].CompletedAt)
assert.EqualValues(t, a.Error, req.Events[i].Error)
}

}
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
sender := &testSender{}
acker, err := NewAcker(log, agentInfo, sender)
if err != nil {
t.Fatal(err)
}

func newNotCalledClient() *notCalledClient {
return &notCalledClient{}
if acker == nil {
t.Fatal("acker not initialized")
}

if len(tc.actions) == 1 {
err = acker.Ack(context.Background(), tc.actions[0])
} else {
err = acker.AckBatch(context.Background(), tc.actions)
}

if err != nil {
t.Fatal(err)
}

if err := acker.Commit(context.Background()); err != nil {
t.Fatal(err)
}

checkRequest(t, tc.actions, sender.req)
})
}
}

0 comments on commit e9e86a8

Please sign in to comment.