From 82cfeb2fb00e0baefbd4f73c7fc1112aa5d96e4b Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Tue, 14 Jul 2020 09:31:30 -0400 Subject: [PATCH] [Elastic Agent] Send checkin payload to Fleet (#19857) (#19882) * Add sending payload from status checking to Kibana. * Run make notice. * Fix fleet manager. (cherry picked from commit e16b6bd9499a090b8a497380d5e4705b6d1ccc4f) --- NOTICE.txt | 4 +- go.mod | 2 +- go.sum | 4 +- .../pkg/agent/operation/monitoring_test.go | 8 ++-- .../pkg/agent/operation/operation.go | 8 ++-- .../pkg/agent/operation/operation_config.go | 2 +- .../pkg/agent/operation/operation_fetch.go | 2 +- .../pkg/agent/operation/operation_install.go | 2 +- .../pkg/agent/operation/operation_remove.go | 2 +- .../pkg/agent/operation/operation_start.go | 2 +- .../agent/operation/operation_uninstall.go | 2 +- .../pkg/agent/operation/operation_verify.go | 2 +- .../pkg/agent/operation/operator_test.go | 11 ++++++ .../configurable-1.0-darwin-x86_64/main.go | 22 +++++++---- .../pkg/core/plugin/process/app.go | 18 +++++---- .../pkg/core/plugin/process/start.go | 8 ++-- .../pkg/core/plugin/process/status.go | 6 +-- .../pkg/core/plugin/service/app.go | 30 ++++++++------- .../elastic-agent/pkg/core/server/server.go | 37 +++++++++++++++---- .../pkg/core/server/server_test.go | 29 ++++++++------- x-pack/elastic-agent/pkg/core/state/state.go | 1 + x-pack/elastic-agent/pkg/reporter/reporter.go | 7 ++++ x-pack/libbeat/management/fleet/manager.go | 10 ++--- 23 files changed, 137 insertions(+), 82 deletions(-) diff --git a/NOTICE.txt b/NOTICE.txt index bb94184451f9..1c51ea96d16c 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -5703,11 +5703,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/ecs@v1.5.0/LICE -------------------------------------------------------------------------------- Dependency : github.com/elastic/elastic-agent-client/v7 -Version: v7.0.0-20200601155656-d6a9eb4f6d07 +Version: v7.0.0-20200709172729-d43b7ad5833a Licence type (autodetected): Elastic -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-client/v7@v7.0.0-20200601155656-d6a9eb4f6d07/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-client/v7@v7.0.0-20200709172729-d43b7ad5833a/LICENSE.txt: ELASTIC LICENSE AGREEMENT diff --git a/go.mod b/go.mod index 9af7e2bd5469..0a7e675f6c2c 100644 --- a/go.mod +++ b/go.mod @@ -58,7 +58,7 @@ require ( github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4 github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2 github.com/elastic/ecs v1.5.0 - github.com/elastic/elastic-agent-client/v7 v7.0.0-20200601155656-d6a9eb4f6d07 + github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a github.com/elastic/go-concert v0.0.3 github.com/elastic/go-libaudit/v2 v2.0.0-20200515221334-92371bef3fb8 github.com/elastic/go-licenser v0.3.1 diff --git a/go.sum b/go.sum index 56f84eb6a360..e3a671b9951e 100644 --- a/go.sum +++ b/go.sum @@ -226,8 +226,8 @@ github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3 h1:lnDkqiRFKm0rxdljqr github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3/go.mod h1:aPqzac6AYkipvp4hufTyMj5PDIphF3+At8zr7r51xjY= github.com/elastic/ecs v1.5.0 h1:/VEIBsRU4ecq2+U3RPfKNc6bFyomP6qnthYEcQZu8GU= github.com/elastic/ecs v1.5.0/go.mod h1:pgiLbQsijLOJvFR8OTILLu0Ni/R/foUNg0L+T6mU9b4= -github.com/elastic/elastic-agent-client/v7 v7.0.0-20200601155656-d6a9eb4f6d07 h1:s/41t2QLLkaa83VlS5UuyKH0ctX3bG4RMnE3Eha+8fU= -github.com/elastic/elastic-agent-client/v7 v7.0.0-20200601155656-d6a9eb4f6d07/go.mod h1:uh/Gj9a0XEbYoM4NYz4LvaBVARz3QXLmlNjsrKY9fTc= +github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a h1:2NHgf1RUw+f240lpTnLrCp1aBNvq2wDi0E1A423/S1k= +github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a/go.mod h1:uh/Gj9a0XEbYoM4NYz4LvaBVARz3QXLmlNjsrKY9fTc= github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270 h1:cWPqxlPtir4RoQVCpGSRXmLqjEHpJKbR60rxh1nQZY4= github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270/go.mod h1:Msl1pdboCbArMF/nSCDUXgQuWTeoMmE/z8607X+k7ng= github.com/elastic/go-concert v0.0.3 h1:f0F4WOi8tBOFIgwA7YbHRQ+Ok8vR+/qFrG7vYvbpX5Q= diff --git a/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go b/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go index 921c5b5e93c5..fa52447e52a5 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go +++ b/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go @@ -157,10 +157,10 @@ func (*testMonitorableApp) Shutdown() {} func (*testMonitorableApp) Configure(_ context.Context, config map[string]interface{}) error { return nil } -func (*testMonitorableApp) State() state.State { return state.State{} } -func (*testMonitorableApp) SetState(_ state.Status, _ string) {} -func (a *testMonitorableApp) Monitor() monitoring.Monitor { return a.monitor } -func (a *testMonitorableApp) OnStatusChange(_ *server.ApplicationState, _ proto.StateObserved_Status, _ string) { +func (*testMonitorableApp) State() state.State { return state.State{} } +func (*testMonitorableApp) SetState(_ state.Status, _ string, _ map[string]interface{}) {} +func (a *testMonitorableApp) Monitor() monitoring.Monitor { return a.monitor } +func (a *testMonitorableApp) OnStatusChange(_ *server.ApplicationState, _ proto.StateObserved_Status, _ string, _ map[string]interface{}) { } type testMonitor struct { diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation.go b/x-pack/elastic-agent/pkg/agent/operation/operation.go index d14852d7eabb..caa2f8abf402 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation.go @@ -42,8 +42,8 @@ type Application interface { Configure(ctx context.Context, config map[string]interface{}) error Monitor() monitoring.Monitor State() state.State - SetState(status state.Status, msg string) - OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string) + SetState(status state.Status, msg string, payload map[string]interface{}) + OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string, payload map[string]interface{}) } // Descriptor defines a program which needs to be run. @@ -68,10 +68,10 @@ type ApplicationStatusHandler struct{} // OnStatusChange is the handler called by the GRPC server code. // // It updates the status of the application and handles restarting the application is needed. -func (*ApplicationStatusHandler) OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string) { +func (*ApplicationStatusHandler) OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string, payload map[string]interface{}) { app, ok := s.App().(Application) if !ok { panic(errors.New("only Application can be registered when using the ApplicationStatusHandler", errors.TypeUnexpected)) } - app.OnStatusChange(s, status, msg) + app.OnStatusChange(s, status, msg, payload) } diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_config.go b/x-pack/elastic-agent/pkg/agent/operation/operation_config.go index ca52b8791ae8..6f143a4e8064 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation_config.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation_config.go @@ -53,7 +53,7 @@ func (o *operationConfig) Check(_ context.Context, _ Application) (bool, error) func (o *operationConfig) Run(ctx context.Context, application Application) (err error) { defer func() { if err != nil { - application.SetState(state.Failed, err.Error()) + application.SetState(state.Failed, err.Error(), nil) } }() return application.Configure(ctx, o.cfg) diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_fetch.go b/x-pack/elastic-agent/pkg/agent/operation/operation_fetch.go index fc72915ee604..627ea9422c8e 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation_fetch.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation_fetch.go @@ -66,7 +66,7 @@ func (o *operationFetch) Check(_ context.Context, _ Application) (bool, error) { func (o *operationFetch) Run(ctx context.Context, application Application) (err error) { defer func() { if err != nil { - application.SetState(state.Failed, err.Error()) + application.SetState(state.Failed, err.Error(), nil) } }() diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_install.go b/x-pack/elastic-agent/pkg/agent/operation/operation_install.go index 883b895d4d21..f92d4606b328 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation_install.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation_install.go @@ -57,7 +57,7 @@ func (o *operationInstall) Check(ctx context.Context, _ Application) (bool, erro func (o *operationInstall) Run(ctx context.Context, application Application) (err error) { defer func() { if err != nil { - application.SetState(state.Failed, err.Error()) + application.SetState(state.Failed, err.Error(), nil) } }() diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_remove.go b/x-pack/elastic-agent/pkg/agent/operation/operation_remove.go index 2f95f7ac50be..f22542934467 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation_remove.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation_remove.go @@ -34,7 +34,7 @@ func (o *operationRemove) Check(_ context.Context, _ Application) (bool, error) func (o *operationRemove) Run(ctx context.Context, application Application) (err error) { defer func() { if err != nil { - application.SetState(state.Failed, err.Error()) + application.SetState(state.Failed, err.Error(), nil) } }() diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_start.go b/x-pack/elastic-agent/pkg/agent/operation/operation_start.go index 17a2d015c58b..c22f61bb9673 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation_start.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation_start.go @@ -58,7 +58,7 @@ func (o *operationStart) Check(_ context.Context, application Application) (bool func (o *operationStart) Run(ctx context.Context, application Application) (err error) { defer func() { if err != nil { - application.SetState(state.Failed, err.Error()) + application.SetState(state.Failed, err.Error(), nil) } }() diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_uninstall.go b/x-pack/elastic-agent/pkg/agent/operation/operation_uninstall.go index 1d30b639fa7a..de8f797f8ef3 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation_uninstall.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation_uninstall.go @@ -47,7 +47,7 @@ func (o *operationUninstall) Check(_ context.Context, _ Application) (bool, erro func (o *operationUninstall) Run(ctx context.Context, application Application) (err error) { defer func() { if err != nil { - application.SetState(state.Failed, err.Error()) + application.SetState(state.Failed, err.Error(), nil) } }() diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_verify.go b/x-pack/elastic-agent/pkg/agent/operation/operation_verify.go index bc5d3d3b8cd2..33cdd76e5be5 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation_verify.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation_verify.go @@ -62,7 +62,7 @@ func (o *operationVerify) Check(_ context.Context, _ Application) (bool, error) func (o *operationVerify) Run(_ context.Context, application Application) (err error) { defer func() { if err != nil { - application.SetState(state.Failed, err.Error()) + application.SetState(state.Failed, err.Error(), nil) } }() diff --git a/x-pack/elastic-agent/pkg/agent/operation/operator_test.go b/x-pack/elastic-agent/pkg/agent/operation/operator_test.go index c9cdae90c04c..ff8cd1214700 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operator_test.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operator_test.go @@ -12,6 +12,8 @@ import ( "runtime" "testing" + "github.com/stretchr/testify/assert" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" @@ -137,6 +139,15 @@ func TestConfigurableFailed(t *testing.T) { pid = item.ProcessInfo.PID return nil }) + items := operator.State() + item, ok := items[p.ID()] + if !ok { + t.Fatalf("no state for process") + } + assert.Equal(t, map[string]interface{}{ + "status": float64(proto.StateObserved_HEALTHY), + "message": "Running", + }, item.Payload) // try to configure (with failed status) cfg := make(map[string]interface{}) diff --git a/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/configurable-1.0-darwin-x86_64/main.go b/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/configurable-1.0-darwin-x86_64/main.go index f5de06883cdc..1e6c88106b60 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/configurable-1.0-darwin-x86_64/main.go +++ b/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/configurable-1.0-darwin-x86_64/main.go @@ -44,28 +44,28 @@ type configServer struct { f *os.File ctx context.Context cancel context.CancelFunc - client *client.Client + client client.Client } func (s *configServer) OnConfig(cfgString string) { - s.client.Status(proto.StateObserved_CONFIGURING, "Writing config file") + s.client.Status(proto.StateObserved_CONFIGURING, "Writing config file", nil) testCfg := &TestConfig{} if err := yaml.Unmarshal([]byte(cfgString), &testCfg); err != nil { - s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to unmarshall config: %s", err)) + s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to unmarshall config: %s", err), nil) return } if testCfg.TestFile != "" { tf, err := os.Create(testCfg.TestFile) if err != nil { - s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to create file %s: %s", testCfg.TestFile, err)) + s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to create file %s: %s", testCfg.TestFile, err), nil) return } err = tf.Close() if err != nil { - s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to close file %s: %s", testCfg.TestFile, err)) + s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to close file %s: %s", testCfg.TestFile, err), nil) return } } @@ -75,14 +75,20 @@ func (s *configServer) OnConfig(cfgString string) { } if testCfg.Status != nil { - s.client.Status(*testCfg.Status, "Custom status") + s.client.Status(*testCfg.Status, "Custom status", map[string]interface{}{ + "status": *testCfg.Status, + "message": "Custom status", + }) } else { - s.client.Status(proto.StateObserved_HEALTHY, "Running") + s.client.Status(proto.StateObserved_HEALTHY, "Running", map[string]interface{}{ + "status": proto.StateObserved_HEALTHY, + "message": "Running", + }) } } func (s *configServer) OnStop() { - s.client.Status(proto.StateObserved_STOPPING, "Stopping") + s.client.Status(proto.StateObserved_STOPPING, "Stopping", nil) s.cancel() } diff --git a/x-pack/elastic-agent/pkg/core/plugin/process/app.go b/x-pack/elastic-agent/pkg/core/plugin/process/app.go index cdd90b3a1907..46123832e5fd 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/process/app.go +++ b/x-pack/elastic-agent/pkg/core/plugin/process/app.go @@ -8,6 +8,7 @@ import ( "context" "fmt" "os" + "reflect" "sync" "time" @@ -146,7 +147,7 @@ func (a *Application) Stop() { // cleanup drops a.cleanUp() } - a.setState(state.Stopped, "Stopped") + a.setState(state.Stopped, "Stopped", nil) } // Shutdown stops the application (aka. subprocess). @@ -156,10 +157,10 @@ func (a *Application) Shutdown() { } // SetState sets the status of the application. -func (a *Application) SetState(status state.Status, msg string) { +func (a *Application) SetState(status state.Status, msg string, payload map[string]interface{}) { a.appLock.Lock() defer a.appLock.Unlock() - a.setState(status, msg) + a.setState(status, msg, payload) } func (a *Application) watch(ctx context.Context, p app.Taggable, proc *process.Info, cfg map[string]interface{}) { @@ -189,7 +190,7 @@ func (a *Application) watch(ctx context.Context, p app.Taggable, proc *process.I } msg := fmt.Sprintf("exited with code: %d", procState.ExitCode()) - a.setState(state.Crashed, msg) + a.setState(state.Crashed, msg, nil) // it was a crash, cleanup anything required go a.cleanUp() @@ -214,7 +215,7 @@ func (a *Application) waitProc(proc *os.Process) <-chan *os.ProcessState { return resChan } -func (a *Application) setStateFromProto(pstatus proto.StateObserved_Status, msg string) { +func (a *Application) setStateFromProto(pstatus proto.StateObserved_Status, msg string, payload map[string]interface{}) { var status state.Status switch pstatus { case proto.StateObserved_STARTING: @@ -230,13 +231,14 @@ func (a *Application) setStateFromProto(pstatus proto.StateObserved_Status, msg case proto.StateObserved_STOPPING: status = state.Stopping } - a.setState(status, msg) + a.setState(status, msg, payload) } -func (a *Application) setState(status state.Status, msg string) { - if a.state.Status != status || a.state.Message != msg { +func (a *Application) setState(status state.Status, msg string, payload map[string]interface{}) { + if a.state.Status != status || a.state.Message != msg || !reflect.DeepEqual(a.state.Payload, payload) { a.state.Status = status a.state.Message = msg + a.state.Payload = payload if a.reporter != nil { go a.reporter.OnStateChange(a.id, a.name, a.state) } diff --git a/x-pack/elastic-agent/pkg/core/plugin/process/start.go b/x-pack/elastic-agent/pkg/core/plugin/process/start.go index 9ecaf7d5039b..3b675417970b 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/process/start.go +++ b/x-pack/elastic-agent/pkg/core/plugin/process/start.go @@ -54,8 +54,8 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string] // Failed applications can be started again. if srvState != nil { - a.setState(state.Starting, "Starting") - srvState.SetStatus(proto.StateObserved_STARTING, a.state.Message) + a.setState(state.Starting, "Starting", nil) + srvState.SetStatus(proto.StateObserved_STARTING, a.state.Message, a.state.Payload) srvState.UpdateConfig(string(cfgStr)) } else { a.srvState, err = a.srv.Register(a, string(cfgStr)) @@ -66,9 +66,9 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string] if a.state.Status != state.Stopped { // restarting as it was previously in a different state - a.setState(state.Restarting, "Restarting") + a.setState(state.Restarting, "Restarting", nil) } else { - a.setState(state.Starting, "Starting") + a.setState(state.Starting, "Starting", nil) } defer func() { diff --git a/x-pack/elastic-agent/pkg/core/plugin/process/status.go b/x-pack/elastic-agent/pkg/core/plugin/process/status.go index d8ba5039e291..eac8f2bc53e5 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/process/status.go +++ b/x-pack/elastic-agent/pkg/core/plugin/process/status.go @@ -18,7 +18,7 @@ import ( // OnStatusChange is the handler called by the GRPC server code. // // It updates the status of the application and handles restarting the application if needed. -func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string) { +func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string, payload map[string]interface{}) { a.appLock.Lock() defer a.appLock.Unlock() @@ -28,7 +28,7 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St return } - a.setStateFromProto(status, msg) + a.setStateFromProto(status, msg, payload) if status == proto.StateObserved_FAILED { // ignore when expected state is stopping if s.Expected() == proto.StateExpected_STOPPING { @@ -52,7 +52,7 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St err := a.start(ctx, tag, cfg) if err != nil { - a.setState(state.Crashed, fmt.Sprintf("failed to restart: %s", err)) + a.setState(state.Crashed, fmt.Sprintf("failed to restart: %s", err), nil) } } } diff --git a/x-pack/elastic-agent/pkg/core/plugin/service/app.go b/x-pack/elastic-agent/pkg/core/plugin/service/app.go index 12a5c518435e..29bd5998891f 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/service/app.go +++ b/x-pack/elastic-agent/pkg/core/plugin/service/app.go @@ -9,6 +9,7 @@ import ( "fmt" "io" "net" + "reflect" "sync" "time" @@ -125,10 +126,10 @@ func (a *Application) Started() bool { } // SetState sets the status of the application. -func (a *Application) SetState(status state.Status, msg string) { +func (a *Application) SetState(status state.Status, msg string, payload map[string]interface{}) { a.appLock.Lock() defer a.appLock.Unlock() - a.setState(status, msg) + a.setState(status, msg, payload) } // Start starts the application with a specified config. @@ -150,11 +151,11 @@ func (a *Application) Start(ctx context.Context, t app.Taggable, cfg map[string] // already started if a.srvState != nil { - a.setState(state.Starting, "Starting") - a.srvState.SetStatus(proto.StateObserved_STARTING, a.state.Message) + a.setState(state.Starting, "Starting", nil) + a.srvState.SetStatus(proto.StateObserved_STARTING, a.state.Message, a.state.Payload) a.srvState.UpdateConfig(string(cfgStr)) } else { - a.setState(state.Starting, "Starting") + a.setState(state.Starting, "Starting", nil) a.srvState, err = a.srv.Register(a, string(cfgStr)) if err != nil { return err @@ -226,9 +227,9 @@ func (a *Application) Stop() { } if err := a.srvState.Stop(a.processConfig.StopTimeout); err != nil { - a.setState(state.Failed, errors.New(err, "Failed to stopped").Error()) + a.setState(state.Failed, errors.New(err, "Failed to stopped").Error(), nil) } else { - a.setState(state.Stopped, "Stopped") + a.setState(state.Stopped, "Stopped", nil) } a.srvState = nil @@ -247,7 +248,7 @@ func (a *Application) Shutdown() { // destroy the application in the server, this skips sending // the expected stopping state to the service - a.setState(state.Stopped, "Stopped") + a.setState(state.Stopped, "Stopped", nil) a.srvState.Destroy() a.srvState = nil @@ -258,7 +259,7 @@ func (a *Application) Shutdown() { // OnStatusChange is the handler called by the GRPC server code. // // It updates the status of the application and handles restarting the application is needed. -func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string) { +func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string, payload map[string]interface{}) { a.appLock.Lock() defer a.appLock.Unlock() @@ -268,10 +269,10 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St return } - a.setStateFromProto(status, msg) + a.setStateFromProto(status, msg, payload) } -func (a *Application) setStateFromProto(pstatus proto.StateObserved_Status, msg string) { +func (a *Application) setStateFromProto(pstatus proto.StateObserved_Status, msg string, payload map[string]interface{}) { var status state.Status switch pstatus { case proto.StateObserved_STARTING: @@ -287,13 +288,14 @@ func (a *Application) setStateFromProto(pstatus proto.StateObserved_Status, msg case proto.StateObserved_STOPPING: status = state.Stopping } - a.setState(status, msg) + a.setState(status, msg, payload) } -func (a *Application) setState(status state.Status, msg string) { - if a.state.Status != status || a.state.Message != msg { +func (a *Application) setState(status state.Status, msg string, payload map[string]interface{}) { + if a.state.Status != status || a.state.Message != msg || !reflect.DeepEqual(a.state.Payload, payload) { a.state.Status = status a.state.Message = msg + a.state.Payload = payload if a.reporter != nil { go a.reporter.OnStateChange(a.id, a.name, a.state) } diff --git a/x-pack/elastic-agent/pkg/core/server/server.go b/x-pack/elastic-agent/pkg/core/server/server.go index b6bcba961b01..4cd5c8386ec1 100644 --- a/x-pack/elastic-agent/pkg/core/server/server.go +++ b/x-pack/elastic-agent/pkg/core/server/server.go @@ -72,6 +72,8 @@ type ApplicationState struct { expectedConfig string status proto.StateObserved_Status statusMessage string + statusPayload map[string]interface{} + statusPayloadStr string statusConfigIdx uint64 statusTime time.Time checkinConn bool @@ -88,7 +90,7 @@ type ApplicationState struct { // Handler is the used by the server to inform of status changes. type Handler interface { // OnStatusChange called when a registered application observed status is changed. - OnStatusChange(*ApplicationState, proto.StateObserved_Status, string) + OnStatusChange(*ApplicationState, proto.StateObserved_Status, string, map[string]interface{}) } // Server is the GRPC server that the launched applications connect back to. @@ -646,34 +648,51 @@ func (as *ApplicationState) Config() string { } // Status returns the current observed status. -func (as *ApplicationState) Status() (proto.StateObserved_Status, string) { +func (as *ApplicationState) Status() (proto.StateObserved_Status, string, map[string]interface{}) { as.checkinLock.RLock() defer as.checkinLock.RUnlock() - return as.status, as.statusMessage + return as.status, as.statusMessage, as.statusPayload } // SetStatus allows the status to be overwritten by the agent. // // This status will be overwritten by the client if it reconnects and updates it status. -func (as *ApplicationState) SetStatus(status proto.StateObserved_Status, msg string) { +func (as *ApplicationState) SetStatus(status proto.StateObserved_Status, msg string, payload map[string]interface{}) error { + payloadStr, err := json.Marshal(payload) + if err != nil { + return err + } as.checkinLock.RLock() as.status = status as.statusMessage = msg + as.statusPayload = payload + as.statusPayloadStr = string(payloadStr) as.checkinLock.RUnlock() + return nil } // updateStatus updates the current observed status from the application, sends the expected state back to the // application if the server expects it to be different then its observed state, and alerts the handler on the // server when the application status has changed. func (as *ApplicationState) updateStatus(checkin *proto.StateObserved, waitForReader bool) { + // convert payload from string to JSON + var payload map[string]interface{} + if checkin.Payload != "" { + // ignore the error, if client is sending bad JSON, then payload will just be nil + _ = json.Unmarshal([]byte(checkin.Payload), &payload) + } + as.checkinLock.Lock() expectedStatus := as.expected expectedConfigIdx := as.expectedConfigIdx expectedConfig := as.expectedConfig prevStatus := as.status prevMessage := as.statusMessage + prevPayloadStr := as.statusPayloadStr as.status = checkin.Status as.statusMessage = checkin.Message + as.statusPayloadStr = checkin.Payload + as.statusPayload = payload as.statusConfigIdx = checkin.ConfigStateIdx as.statusTime = time.Now().UTC() as.checkinLock.Unlock() @@ -697,8 +716,8 @@ func (as *ApplicationState) updateStatus(checkin *proto.StateObserved, waitForRe } // alert the service handler that status has changed for the application - if prevStatus != checkin.Status || prevMessage != checkin.Message { - as.srv.handler.OnStatusChange(as, checkin.Status, checkin.Message) + if prevStatus != checkin.Status || prevMessage != checkin.Message || prevPayloadStr != checkin.Payload { + as.srv.handler.OnStatusChange(as, checkin.Status, checkin.Message, payload) } } @@ -821,17 +840,21 @@ func (s *Server) watchdog() { message = "Missed two check-ins" serverApp.status = s serverApp.statusMessage = message + serverApp.statusPayload = nil + serverApp.statusPayloadStr = "" serverApp.statusTime = now } else if serverApp.status != proto.StateObserved_FAILED { s = proto.StateObserved_DEGRADED message = "Missed last check-in" serverApp.status = s serverApp.statusMessage = message + serverApp.statusPayload = nil + serverApp.statusPayloadStr = "" serverApp.statusTime = now } serverApp.checkinLock.Unlock() if prevStatus != s || prevMessage != message { - serverApp.srv.handler.OnStatusChange(serverApp, s, message) + serverApp.srv.handler.OnStatusChange(serverApp, s, message, nil) } } serverApp.flushExpiredActions() diff --git a/x-pack/elastic-agent/pkg/core/server/server_test.go b/x-pack/elastic-agent/pkg/core/server/server_test.go index 9bb82f303aed..0c26a44ed762 100644 --- a/x-pack/elastic-agent/pkg/core/server/server_test.go +++ b/x-pack/elastic-agent/pkg/core/server/server_test.go @@ -73,7 +73,7 @@ func TestServer_InitialCheckIn(t *testing.T) { })) // set status as healthy and running - c.Status(proto.StateObserved_HEALTHY, "Running") + c.Status(proto.StateObserved_HEALTHY, "Running", nil) // application state should be updated assert.NoError(t, waitFor(func() error { @@ -119,8 +119,8 @@ func TestServer_MultiClients(t *testing.T) { })) // set status differently - c1.Status(proto.StateObserved_HEALTHY, "Running") - c2.Status(proto.StateObserved_DEGRADED, "No upstream connection") + c1.Status(proto.StateObserved_HEALTHY, "Running", nil) + c2.Status(proto.StateObserved_DEGRADED, "No upstream connection", nil) // application states should be updated assert.NoError(t, waitFor(func() error { @@ -234,6 +234,7 @@ func TestServer_UpdateConfig(t *testing.T) { cImpl := &StubClientImpl{} c := newClientFromApplicationState(t, as, cImpl) require.NoError(t, c.Start(context.Background())) + require.NoError(t, c.Start(context.Background())) defer c.Stop() // clients should get initial check-ins then set as healthy @@ -243,7 +244,7 @@ func TestServer_UpdateConfig(t *testing.T) { } return nil })) - c.Status(proto.StateObserved_HEALTHY, "Running") + c.Status(proto.StateObserved_HEALTHY, "Running", nil) assert.NoError(t, waitFor(func() error { if app.Status() != proto.StateObserved_HEALTHY { return fmt.Errorf("server never updated currect application state") @@ -287,7 +288,7 @@ func TestServer_UpdateConfigDisconnected(t *testing.T) { } return nil })) - c.Status(proto.StateObserved_HEALTHY, "Running") + c.Status(proto.StateObserved_HEALTHY, "Running", nil) assert.NoError(t, waitFor(func() error { if app.Status() != proto.StateObserved_HEALTHY { return fmt.Errorf("server never updated currect application state") @@ -329,7 +330,7 @@ func TestServer_UpdateConfigStopping(t *testing.T) { } return nil })) - c.Status(proto.StateObserved_HEALTHY, "Running") + c.Status(proto.StateObserved_HEALTHY, "Running", nil) assert.NoError(t, waitFor(func() error { if app.Status() != proto.StateObserved_HEALTHY { return fmt.Errorf("server never updated currect application state") @@ -367,7 +368,7 @@ func TestServer_Stop(t *testing.T) { } return nil })) - c.Status(proto.StateObserved_HEALTHY, "Running") + c.Status(proto.StateObserved_HEALTHY, "Running", nil) assert.NoError(t, waitFor(func() error { if app.Status() != proto.StateObserved_HEALTHY { return fmt.Errorf("server never updated currect application state") @@ -395,14 +396,14 @@ func TestServer_Stop(t *testing.T) { } return nil })) - c.Status(proto.StateObserved_CONFIGURING, "Configuring") + c.Status(proto.StateObserved_CONFIGURING, "Configuring", nil) require.NoError(t, waitFor(func() error { if cImpl.Stop() < 1 { return fmt.Errorf("client never got expected stop again") } return nil })) - c.Status(proto.StateObserved_STOPPING, "Stopping") + c.Status(proto.StateObserved_STOPPING, "Stopping", nil) require.NoError(t, waitFor(func() error { if app.Status() != proto.StateObserved_STOPPING { return fmt.Errorf("server never updated to stopping") @@ -435,7 +436,7 @@ func TestServer_StopTimeout(t *testing.T) { } return nil })) - c.Status(proto.StateObserved_HEALTHY, "Running") + c.Status(proto.StateObserved_HEALTHY, "Running", nil) assert.NoError(t, waitFor(func() error { if app.Status() != proto.StateObserved_HEALTHY { return fmt.Errorf("server never updated currect application state") @@ -594,11 +595,11 @@ func createAndStartServer(t *testing.T, handler Handler, extraConfigs ...func(*S return srv } -func newClientFromApplicationState(t *testing.T, as *ApplicationState, impl client.StateInterface, actions ...client.Action) *client.Client { +func newClientFromApplicationState(t *testing.T, as *ApplicationState, impl client.StateInterface, actions ...client.Action) client.Client { t.Helper() var err error - var c *client.Client + var c client.Client var wg sync.WaitGroup r, w := io.Pipe() wg.Add(1) @@ -617,6 +618,7 @@ type StubApp struct { lock sync.RWMutex status proto.StateObserved_Status message string + payload map[string]interface{} } func (a *StubApp) Status() proto.StateObserved_Status { @@ -633,12 +635,13 @@ func (a *StubApp) Message() string { type StubHandler struct{} -func (h *StubHandler) OnStatusChange(as *ApplicationState, status proto.StateObserved_Status, message string) { +func (h *StubHandler) OnStatusChange(as *ApplicationState, status proto.StateObserved_Status, message string, payload map[string]interface{}) { stub := as.app.(*StubApp) stub.lock.Lock() defer stub.lock.Unlock() stub.status = status stub.message = message + stub.payload = payload } type StubClientImpl struct { diff --git a/x-pack/elastic-agent/pkg/core/state/state.go b/x-pack/elastic-agent/pkg/core/state/state.go index cd806943a135..6b7c8bd53dec 100644 --- a/x-pack/elastic-agent/pkg/core/state/state.go +++ b/x-pack/elastic-agent/pkg/core/state/state.go @@ -37,6 +37,7 @@ type State struct { ProcessInfo *process.Info Status Status Message string + Payload map[string]interface{} } // Reporter is interface that is called when a state is changed. diff --git a/x-pack/elastic-agent/pkg/reporter/reporter.go b/x-pack/elastic-agent/pkg/reporter/reporter.go index 3fad02a3c704..c36708a837f7 100644 --- a/x-pack/elastic-agent/pkg/reporter/reporter.go +++ b/x-pack/elastic-agent/pkg/reporter/reporter.go @@ -135,10 +135,17 @@ func generateRecord(agentID string, id string, name string, s state.State) event errors.TypeApplication, errors.M(errors.MetaKeyAppID, id), errors.M(errors.MetaKeyAppName, name)) + var payload map[string]interface{} + if s.Payload != nil { + payload = map[string]interface{}{ + name: s.Payload, + } + } return event{ eventype: eventType, subType: subType, timestamp: time.Now(), message: err.Error(), + payload: payload, } } diff --git a/x-pack/libbeat/management/fleet/manager.go b/x-pack/libbeat/management/fleet/manager.go index 0fa205220e4d..dc0185a04ace 100644 --- a/x-pack/libbeat/management/fleet/manager.go +++ b/x-pack/libbeat/management/fleet/manager.go @@ -35,7 +35,7 @@ type Manager struct { beatUUID uuid.UUID registry *reload.Registry blacklist *xmanagement.ConfigBlacklist - client *client.Client + client client.Client lock sync.Mutex status management.Status msg string @@ -67,7 +67,7 @@ func NewFleetManagerWithConfig(c *Config, registry *reload.Registry, beatUUID uu var err error var blacklist *xmanagement.ConfigBlacklist - var eac *client.Client + var eac client.Client if c.Enabled && c.Mode == xmanagement.ModeFleet { // Initialize configs blacklist blacklist, err = xmanagement.NewConfigBlacklist(c.Blacklist) @@ -134,7 +134,7 @@ func (cm *Manager) UpdateStatus(status management.Status, msg string) { if cm.status != status || cm.msg != msg { cm.status = status cm.msg = msg - cm.client.Status(statusToProtoStatus(status), msg) + cm.client.Status(statusToProtoStatus(status), msg, nil) cm.logger.Infof("Status change to %s: %s", status, msg) } } @@ -173,12 +173,12 @@ func (cm *Manager) OnConfig(s string) { return } - cm.client.Status(proto.StateObserved_HEALTHY, "Running") + cm.client.Status(proto.StateObserved_HEALTHY, "Running", nil) } func (cm *Manager) OnStop() { if cm.stopFunc != nil { - cm.client.Status(proto.StateObserved_STOPPING, "Stopping") + cm.client.Status(proto.StateObserved_STOPPING, "Stopping", nil) cm.stopFunc() } }