From 5ecdbc43209c205c0a698fbb8e9a2d0fa4c40835 Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Wed, 23 Feb 2022 13:50:35 -0500 Subject: [PATCH 1/3] Agent: populate input_type for the .fleet-actions-results --- x-pack/elastic-agent/pkg/fleetapi/ack_cmd.go | 1 + x-pack/elastic-agent/pkg/fleetapi/acker/fleet/fleet_acker.go | 1 + 2 files changed, 2 insertions(+) diff --git a/x-pack/elastic-agent/pkg/fleetapi/ack_cmd.go b/x-pack/elastic-agent/pkg/fleetapi/ack_cmd.go index 5002fc36ffe7..64e21d566049 100644 --- a/x-pack/elastic-agent/pkg/fleetapi/ack_cmd.go +++ b/x-pack/elastic-agent/pkg/fleetapi/ack_cmd.go @@ -27,6 +27,7 @@ type AckEvent struct { Message string `json:"message,omitempty"` // : 'hello2', Payload string `json:"payload,omitempty"` // : 'payload2', + InputType string `json:"input_type,omitempty"` // copy of original action input_type ActionData json.RawMessage `json:"action_data,omitempty"` // copy of original action data ActionResponse map[string]interface{} `json:"action_response,omitempty"` // custom (per beat) response payload StartedAt string `json:"started_at,omitempty"` // time action started diff --git a/x-pack/elastic-agent/pkg/fleetapi/acker/fleet/fleet_acker.go b/x-pack/elastic-agent/pkg/fleetapi/acker/fleet/fleet_acker.go index fcc74f17205e..0475bf74d352 100644 --- a/x-pack/elastic-agent/pkg/fleetapi/acker/fleet/fleet_acker.go +++ b/x-pack/elastic-agent/pkg/fleetapi/acker/fleet/fleet_acker.go @@ -114,6 +114,7 @@ func constructEvent(action fleetapi.Action, agentID string) fleetapi.AckEvent { } if a, ok := action.(*fleetapi.ActionApp); ok { + ackev.InputType = a.InputType ackev.ActionData = a.Data ackev.ActionResponse = a.Response ackev.StartedAt = a.StartedAt From 2c3a20579b75ca435638aa3c6ba44938790ef79b Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Thu, 24 Feb 2022 11:56:52 -0500 Subject: [PATCH 2/3] Renamed input_type to action_input_type. Rewrote acker unit tests, to simplify them and eliminate the go routines leak --- x-pack/elastic-agent/pkg/fleetapi/ack_cmd.go | 12 +- .../pkg/fleetapi/acker/fleet/fleet_acker.go | 2 +- .../fleetapi/acker/fleet/fleet_acker_test.go | 284 ++++++++---------- 3 files changed, 124 insertions(+), 174 deletions(-) diff --git a/x-pack/elastic-agent/pkg/fleetapi/ack_cmd.go b/x-pack/elastic-agent/pkg/fleetapi/ack_cmd.go index 64e21d566049..50908383d27d 100644 --- a/x-pack/elastic-agent/pkg/fleetapi/ack_cmd.go +++ b/x-pack/elastic-agent/pkg/fleetapi/ack_cmd.go @@ -27,12 +27,12 @@ type AckEvent struct { Message string `json:"message,omitempty"` // : 'hello2', Payload string `json:"payload,omitempty"` // : 'payload2', - InputType string `json:"input_type,omitempty"` // copy of original action input_type - ActionData json.RawMessage `json:"action_data,omitempty"` // copy of original action data - ActionResponse map[string]interface{} `json:"action_response,omitempty"` // custom (per beat) response payload - StartedAt string `json:"started_at,omitempty"` // time action started - CompletedAt string `json:"completed_at,omitempty"` // time action completed - Error string `json:"error,omitempty"` // optional action error + ActionInputType string `json:"action_input_type,omitempty"` // copy of original action input_type + ActionData json.RawMessage `json:"action_data,omitempty"` // copy of original action data + ActionResponse map[string]interface{} `json:"action_response,omitempty"` // custom (per beat) response payload + StartedAt string `json:"started_at,omitempty"` // time action started + CompletedAt string `json:"completed_at,omitempty"` // time action completed + Error string `json:"error,omitempty"` // optional action error } // AckRequest consists of multiple actions acked to fleet ui. diff --git a/x-pack/elastic-agent/pkg/fleetapi/acker/fleet/fleet_acker.go b/x-pack/elastic-agent/pkg/fleetapi/acker/fleet/fleet_acker.go index 0475bf74d352..b261dd12c7d0 100644 --- a/x-pack/elastic-agent/pkg/fleetapi/acker/fleet/fleet_acker.go +++ b/x-pack/elastic-agent/pkg/fleetapi/acker/fleet/fleet_acker.go @@ -114,7 +114,7 @@ func constructEvent(action fleetapi.Action, agentID string) fleetapi.AckEvent { } if a, ok := action.(*fleetapi.ActionApp); ok { - ackev.InputType = a.InputType + ackev.ActionInputType = a.InputType ackev.ActionData = a.Data ackev.ActionResponse = a.Response ackev.StartedAt = a.StartedAt diff --git a/x-pack/elastic-agent/pkg/fleetapi/acker/fleet/fleet_acker_test.go b/x-pack/elastic-agent/pkg/fleetapi/acker/fleet/fleet_acker_test.go index 3f78a3424689..588a92508b97 100644 --- a/x-pack/elastic-agent/pkg/fleetapi/acker/fleet/fleet_acker_test.go +++ b/x-pack/elastic-agent/pkg/fleetapi/acker/fleet/fleet_acker_test.go @@ -13,7 +13,6 @@ import ( "io/ioutil" "net/http" "net/url" - "sync" "testing" "github.com/stretchr/testify/assert" @@ -22,136 +21,19 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" ) -func TestAcker_AckCommit(t *testing.T) { - type ackRequest struct { - Events []fleetapi.AckEvent `json:"events"` - } - - log, _ := logger.New("fleet_acker", false) - client := newTestingClient() - agentInfo := &testAgentInfo{} - acker, err := NewAcker(log, agentInfo, client) - if err != nil { - t.Fatal(err) - } - - if acker == nil { - t.Fatal("acker not initialized") - } - - testID := "ack-test-action-id" - testAction := &fleetapi.ActionUnknown{ActionID: testID} - - ch := client.Answer(func(headers http.Header, body io.Reader) (*http.Response, error) { - content, err := ioutil.ReadAll(body) - assert.NoError(t, err) - cr := &ackRequest{} - err = json.Unmarshal(content, &cr) - assert.NoError(t, err) - - assert.EqualValues(t, 1, len(cr.Events)) - assert.EqualValues(t, testID, cr.Events[0].ActionID) - - resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`) - return resp, nil - }) - - go func() { - for range ch { - } - }() - - if err := acker.Ack(context.Background(), testAction); err != nil { - t.Fatal(err) - } - if err := acker.Commit(context.Background()); err != nil { - t.Fatal(err) - } -} - -func TestAcker_AckBatch(t *testing.T) { - type ackRequest struct { - Events []fleetapi.AckEvent `json:"events"` - } - - log, _ := logger.New("fleet_acker", false) - client := newTestingClient() - agentInfo := &testAgentInfo{} - acker, err := NewAcker(log, agentInfo, client) - if err != nil { - t.Fatal(err) - } - - if acker == nil { - t.Fatal("acker not initialized") - } - - testID1 := "ack-test-action-id-1" - testAction1 := &fleetapi.ActionUnknown{ActionID: testID1} - testID2 := "ack-test-action-id-2" - testAction2 := &fleetapi.ActionUnknown{ActionID: testID2} - - ch := client.Answer(func(headers http.Header, body io.Reader) (*http.Response, error) { - content, err := ioutil.ReadAll(body) - assert.NoError(t, err) - cr := &ackRequest{} - err = json.Unmarshal(content, &cr) - assert.NoError(t, err) - - assert.EqualValues(t, 2, len(cr.Events)) - assert.EqualValues(t, testID1, cr.Events[0].ActionID) - assert.EqualValues(t, testID2, cr.Events[1].ActionID) - - resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`) - return resp, nil - }) - - go func() { - for range ch { - } - }() - - if err := acker.AckBatch(context.Background(), []fleetapi.Action{testAction1, testAction2}); err != nil { - t.Fatal(err) - } - if err := acker.Commit(context.Background()); err != nil { - t.Fatal(err) - } +type ackRequest struct { + Events []fleetapi.AckEvent `json:"events"` } -func TestAcker_AckBatch_Empty(t *testing.T) { - log, _ := logger.New("fleet_acker", false) - client := newNotCalledClient() - agentInfo := &testAgentInfo{} - acker, err := NewAcker(log, agentInfo, client) - if err != nil { - t.Fatal(err) - } - - if acker == nil { - t.Fatal("acker not initialized") - } - - if err := acker.AckBatch(context.Background(), []fleetapi.Action{}); err != nil { - t.Fatal(err) - } - if err := acker.Commit(context.Background()); err != nil { - t.Fatal(err) - } - if client.called { - t.Fatal("client should not have been used") - } -} +type testAgentInfo struct{} -type clientCallbackFunc func(headers http.Header, body io.Reader) (*http.Response, error) +func (testAgentInfo) AgentID() string { return "agent-secret" } -type testingClient struct { - sync.Mutex - callback clientCallbackFunc - received chan struct{} +type testSender struct { + req *ackRequest } -func (t *testingClient) Send( +func (s *testSender) Send( _ context.Context, method string, path string, @@ -159,31 +41,20 @@ func (t *testingClient) Send( headers http.Header, body io.Reader, ) (*http.Response, error) { - t.Lock() - defer t.Unlock() - defer func() { t.received <- struct{}{} }() - return t.callback(headers, body) + d := json.NewDecoder(body) + var req ackRequest + err := d.Decode(&req) + if err != nil { + return nil, err + } + s.req = &req + return wrapStrToResp(http.StatusOK, `{ "actions": [] }`), nil } -func (t *testingClient) URI() string { +func (s *testSender) URI() string { return "http://localhost" } -func (t *testingClient) Answer(fn clientCallbackFunc) <-chan struct{} { - t.Lock() - defer t.Unlock() - t.callback = fn - return t.received -} - -func newTestingClient() *testingClient { - return &testingClient{received: make(chan struct{}, 1)} -} - -type testAgentInfo struct{} - -func (testAgentInfo) AgentID() string { return "agent-secret" } - func wrapStrToResp(code int, body string) *http.Response { return &http.Response{ Status: fmt.Sprintf("%d %s", code, http.StatusText(code)), @@ -197,29 +68,108 @@ func wrapStrToResp(code int, body string) *http.Response { } } -type notCalledClient struct { - sync.Mutex - called bool -} +func TestAcker_Ack(t *testing.T) { + tests := []struct { + name string + actions []fleetapi.Action + batch bool + }{ + { + name: "nil", + actions: nil, + }, + { + name: "empty", + actions: []fleetapi.Action{}, + }, + { + name: "ack", + actions: []fleetapi.Action{&fleetapi.ActionUnknown{ActionID: "ack-test-action-id"}}, + }, + { + name: "ackbatch", + actions: []fleetapi.Action{ + &fleetapi.ActionUnknown{ActionID: "ack-test-action-id1"}, + &fleetapi.ActionUnknown{ActionID: "ack-test-action-id2"}, + }, + }, + { + name: "ackaction", + actions: []fleetapi.Action{ + &fleetapi.ActionApp{ + ActionID: "1b12dcd8-bde0-4045-92dc-c4b27668d733", + InputType: "osquery", + Data: []byte(`{"query":"select * from osquery_info"}`), + Response: map[string]interface{}{"osquery": map[string]interface{}{"count": float64(1)}}, + StartedAt: "2022-02-23T18:26:08.506128Z", + CompletedAt: "2022-02-23T18:26:08.507593Z", + }, + &fleetapi.ActionApp{ + ActionID: "2b12dcd8-bde0-4045-92dc-c4b27668d733", + InputType: "osquery", + Data: []byte(`{"query":"select * from foobar"}`), + StartedAt: "2022-02-24T18:26:08.506128Z", + CompletedAt: "2022-02-24T18:26:08.507593Z", + Error: "uknown table", + }, + }, + }, + } -func (t *notCalledClient) Send( - _ context.Context, - method string, - path string, - params url.Values, - headers http.Header, - body io.Reader, -) (*http.Response, error) { - t.Lock() - defer t.Unlock() - t.called = true - return nil, fmt.Errorf("should not have been called") -} + log, _ := logger.New("fleet_acker", false) + agentInfo := &testAgentInfo{} -func (t *notCalledClient) URI() string { - return "http://localhost" -} + checkRequest := func(t *testing.T, actions []fleetapi.Action, req *ackRequest) { + if len(actions) == 0 { // If no actions, expect no request, the sender was not called + assert.Nil(t, req) + return + } + assert.EqualValues(t, len(actions), len(req.Events)) + for i, ac := range actions { + assert.EqualValues(t, "ACTION_RESULT", req.Events[i].EventType) + assert.EqualValues(t, "ACKNOWLEDGED", req.Events[i].SubType) + assert.EqualValues(t, ac.ID(), req.Events[i].ActionID) + assert.EqualValues(t, agentInfo.AgentID(), req.Events[i].AgentID) + assert.EqualValues(t, fmt.Sprintf("Action '%s' of type '%s' acknowledged.", ac.ID(), ac.Type()), req.Events[i].Message) + if a, ok := ac.(*fleetapi.ActionApp); ok { + assert.EqualValues(t, a.InputType, req.Events[i].ActionInputType) + assert.EqualValues(t, a.Data, req.Events[i].ActionData) + assert.EqualValues(t, a.Response, req.Events[i].ActionResponse) + assert.EqualValues(t, a.StartedAt, req.Events[i].StartedAt) + assert.EqualValues(t, a.CompletedAt, req.Events[i].CompletedAt) + assert.EqualValues(t, a.Error, req.Events[i].Error) + } + + } + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + sender := &testSender{} + acker, err := NewAcker(log, agentInfo, sender) + if err != nil { + t.Fatal(err) + } -func newNotCalledClient() *notCalledClient { - return ¬CalledClient{} + if acker == nil { + t.Fatal("acker not initialized") + } + + if len(tc.actions) == 1 { + err = acker.Ack(context.Background(), tc.actions[0]) + } else { + err = acker.AckBatch(context.Background(), tc.actions) + } + + if err != nil { + t.Fatal(err) + } + + if err := acker.Commit(context.Background()); err != nil { + t.Fatal(err) + } + + checkRequest(t, tc.actions, sender.req) + }) + } } From 4bbefb153d16fc209788bfffe87bf05aa7c93b59 Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Thu, 24 Feb 2022 13:56:08 -0500 Subject: [PATCH 3/3] Updated CHANGELOG.next.asciidoc --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8ad6feff7387..e02f5e0c41b4 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -92,6 +92,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif - Add FIPS configuration option for all AWS API calls. {pull}[28899] - Add metadata change support for some processors {pull}30183[30183] - Add support for non-unique Kafka headers for output messages. {pull}30369[30369] +- Add action_input_type for the .fleet-actions-results {pull}30562[30562] *Auditbeat*