Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Agent: populate action_input_type for the .fleet-actions-results #30562

Merged
merged 3 commits into from
Feb 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif
- Add FIPS configuration option for all AWS API calls. {pull}[28899]
- Add metadata change support for some processors {pull}30183[30183]
- Add support for non-unique Kafka headers for output messages. {pull}30369[30369]
- Add action_input_type for the .fleet-actions-results {pull}30562[30562]

*Auditbeat*

Expand Down
11 changes: 6 additions & 5 deletions x-pack/elastic-agent/pkg/fleetapi/ack_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ type AckEvent struct {
Message string `json:"message,omitempty"` // : 'hello2',
Payload string `json:"payload,omitempty"` // : 'payload2',

ActionData json.RawMessage `json:"action_data,omitempty"` // copy of original action data
ActionResponse map[string]interface{} `json:"action_response,omitempty"` // custom (per beat) response payload
StartedAt string `json:"started_at,omitempty"` // time action started
CompletedAt string `json:"completed_at,omitempty"` // time action completed
Error string `json:"error,omitempty"` // optional action error
ActionInputType string `json:"action_input_type,omitempty"` // copy of original action input_type
ActionData json.RawMessage `json:"action_data,omitempty"` // copy of original action data
ActionResponse map[string]interface{} `json:"action_response,omitempty"` // custom (per beat) response payload
StartedAt string `json:"started_at,omitempty"` // time action started
CompletedAt string `json:"completed_at,omitempty"` // time action completed
Error string `json:"error,omitempty"` // optional action error
}

// AckRequest consists of multiple actions acked to fleet ui.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func constructEvent(action fleetapi.Action, agentID string) fleetapi.AckEvent {
}

if a, ok := action.(*fleetapi.ActionApp); ok {
ackev.ActionInputType = a.InputType
ackev.ActionData = a.Data
ackev.ActionResponse = a.Response
ackev.StartedAt = a.StartedAt
Expand Down
284 changes: 117 additions & 167 deletions x-pack/elastic-agent/pkg/fleetapi/acker/fleet/fleet_acker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"io/ioutil"
"net/http"
"net/url"
"sync"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -22,168 +21,40 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
)

func TestAcker_AckCommit(t *testing.T) {
type ackRequest struct {
Events []fleetapi.AckEvent `json:"events"`
}

log, _ := logger.New("fleet_acker", false)
client := newTestingClient()
agentInfo := &testAgentInfo{}
acker, err := NewAcker(log, agentInfo, client)
if err != nil {
t.Fatal(err)
}

if acker == nil {
t.Fatal("acker not initialized")
}

testID := "ack-test-action-id"
testAction := &fleetapi.ActionUnknown{ActionID: testID}

ch := client.Answer(func(headers http.Header, body io.Reader) (*http.Response, error) {
content, err := ioutil.ReadAll(body)
assert.NoError(t, err)
cr := &ackRequest{}
err = json.Unmarshal(content, &cr)
assert.NoError(t, err)

assert.EqualValues(t, 1, len(cr.Events))
assert.EqualValues(t, testID, cr.Events[0].ActionID)

resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
return resp, nil
})

go func() {
for range ch {
}
}()

if err := acker.Ack(context.Background(), testAction); err != nil {
t.Fatal(err)
}
if err := acker.Commit(context.Background()); err != nil {
t.Fatal(err)
}
}

func TestAcker_AckBatch(t *testing.T) {
type ackRequest struct {
Events []fleetapi.AckEvent `json:"events"`
}

log, _ := logger.New("fleet_acker", false)
client := newTestingClient()
agentInfo := &testAgentInfo{}
acker, err := NewAcker(log, agentInfo, client)
if err != nil {
t.Fatal(err)
}

if acker == nil {
t.Fatal("acker not initialized")
}

testID1 := "ack-test-action-id-1"
testAction1 := &fleetapi.ActionUnknown{ActionID: testID1}
testID2 := "ack-test-action-id-2"
testAction2 := &fleetapi.ActionUnknown{ActionID: testID2}

ch := client.Answer(func(headers http.Header, body io.Reader) (*http.Response, error) {
content, err := ioutil.ReadAll(body)
assert.NoError(t, err)
cr := &ackRequest{}
err = json.Unmarshal(content, &cr)
assert.NoError(t, err)

assert.EqualValues(t, 2, len(cr.Events))
assert.EqualValues(t, testID1, cr.Events[0].ActionID)
assert.EqualValues(t, testID2, cr.Events[1].ActionID)

resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
return resp, nil
})

go func() {
for range ch {
}
}()

if err := acker.AckBatch(context.Background(), []fleetapi.Action{testAction1, testAction2}); err != nil {
t.Fatal(err)
}
if err := acker.Commit(context.Background()); err != nil {
t.Fatal(err)
}
type ackRequest struct {
Events []fleetapi.AckEvent `json:"events"`
}

func TestAcker_AckBatch_Empty(t *testing.T) {
log, _ := logger.New("fleet_acker", false)
client := newNotCalledClient()
agentInfo := &testAgentInfo{}
acker, err := NewAcker(log, agentInfo, client)
if err != nil {
t.Fatal(err)
}

if acker == nil {
t.Fatal("acker not initialized")
}

if err := acker.AckBatch(context.Background(), []fleetapi.Action{}); err != nil {
t.Fatal(err)
}
if err := acker.Commit(context.Background()); err != nil {
t.Fatal(err)
}
if client.called {
t.Fatal("client should not have been used")
}
}
type testAgentInfo struct{}

type clientCallbackFunc func(headers http.Header, body io.Reader) (*http.Response, error)
func (testAgentInfo) AgentID() string { return "agent-secret" }

type testingClient struct {
sync.Mutex
callback clientCallbackFunc
received chan struct{}
type testSender struct {
req *ackRequest
}

func (t *testingClient) Send(
func (s *testSender) Send(
_ context.Context,
method string,
path string,
params url.Values,
headers http.Header,
body io.Reader,
) (*http.Response, error) {
t.Lock()
defer t.Unlock()
defer func() { t.received <- struct{}{} }()
return t.callback(headers, body)
d := json.NewDecoder(body)
var req ackRequest
err := d.Decode(&req)
if err != nil {
return nil, err
}
s.req = &req
return wrapStrToResp(http.StatusOK, `{ "actions": [] }`), nil
}

func (t *testingClient) URI() string {
func (s *testSender) URI() string {
return "http://localhost"
}

func (t *testingClient) Answer(fn clientCallbackFunc) <-chan struct{} {
t.Lock()
defer t.Unlock()
t.callback = fn
return t.received
}

func newTestingClient() *testingClient {
return &testingClient{received: make(chan struct{}, 1)}
}

type testAgentInfo struct{}

func (testAgentInfo) AgentID() string { return "agent-secret" }

func wrapStrToResp(code int, body string) *http.Response {
return &http.Response{
Status: fmt.Sprintf("%d %s", code, http.StatusText(code)),
Expand All @@ -197,29 +68,108 @@ func wrapStrToResp(code int, body string) *http.Response {
}
}

type notCalledClient struct {
sync.Mutex
called bool
}
func TestAcker_Ack(t *testing.T) {
tests := []struct {
name string
actions []fleetapi.Action
batch bool
}{
{
name: "nil",
actions: nil,
},
{
name: "empty",
actions: []fleetapi.Action{},
},
{
name: "ack",
actions: []fleetapi.Action{&fleetapi.ActionUnknown{ActionID: "ack-test-action-id"}},
},
{
name: "ackbatch",
actions: []fleetapi.Action{
&fleetapi.ActionUnknown{ActionID: "ack-test-action-id1"},
&fleetapi.ActionUnknown{ActionID: "ack-test-action-id2"},
},
},
{
name: "ackaction",
actions: []fleetapi.Action{
&fleetapi.ActionApp{
ActionID: "1b12dcd8-bde0-4045-92dc-c4b27668d733",
InputType: "osquery",
Data: []byte(`{"query":"select * from osquery_info"}`),
Response: map[string]interface{}{"osquery": map[string]interface{}{"count": float64(1)}},
StartedAt: "2022-02-23T18:26:08.506128Z",
CompletedAt: "2022-02-23T18:26:08.507593Z",
},
&fleetapi.ActionApp{
ActionID: "2b12dcd8-bde0-4045-92dc-c4b27668d733",
InputType: "osquery",
Data: []byte(`{"query":"select * from foobar"}`),
StartedAt: "2022-02-24T18:26:08.506128Z",
CompletedAt: "2022-02-24T18:26:08.507593Z",
Error: "uknown table",
},
},
},
}

func (t *notCalledClient) Send(
_ context.Context,
method string,
path string,
params url.Values,
headers http.Header,
body io.Reader,
) (*http.Response, error) {
t.Lock()
defer t.Unlock()
t.called = true
return nil, fmt.Errorf("should not have been called")
}
log, _ := logger.New("fleet_acker", false)
agentInfo := &testAgentInfo{}

func (t *notCalledClient) URI() string {
return "http://localhost"
}
checkRequest := func(t *testing.T, actions []fleetapi.Action, req *ackRequest) {
if len(actions) == 0 { // If no actions, expect no request, the sender was not called
assert.Nil(t, req)
return
}
assert.EqualValues(t, len(actions), len(req.Events))
for i, ac := range actions {
assert.EqualValues(t, "ACTION_RESULT", req.Events[i].EventType)
assert.EqualValues(t, "ACKNOWLEDGED", req.Events[i].SubType)
assert.EqualValues(t, ac.ID(), req.Events[i].ActionID)
assert.EqualValues(t, agentInfo.AgentID(), req.Events[i].AgentID)
assert.EqualValues(t, fmt.Sprintf("Action '%s' of type '%s' acknowledged.", ac.ID(), ac.Type()), req.Events[i].Message)
if a, ok := ac.(*fleetapi.ActionApp); ok {
assert.EqualValues(t, a.InputType, req.Events[i].ActionInputType)
assert.EqualValues(t, a.Data, req.Events[i].ActionData)
assert.EqualValues(t, a.Response, req.Events[i].ActionResponse)
assert.EqualValues(t, a.StartedAt, req.Events[i].StartedAt)
assert.EqualValues(t, a.CompletedAt, req.Events[i].CompletedAt)
assert.EqualValues(t, a.Error, req.Events[i].Error)
}

}
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
sender := &testSender{}
acker, err := NewAcker(log, agentInfo, sender)
if err != nil {
t.Fatal(err)
}

func newNotCalledClient() *notCalledClient {
return &notCalledClient{}
if acker == nil {
t.Fatal("acker not initialized")
}

if len(tc.actions) == 1 {
err = acker.Ack(context.Background(), tc.actions[0])
} else {
err = acker.AckBatch(context.Background(), tc.actions)
}

if err != nil {
t.Fatal(err)
}

if err := acker.Commit(context.Background()); err != nil {
t.Fatal(err)
}

checkRequest(t, tc.actions, sender.req)
})
}
}