From 6dab14cca1c72524e8feec93c1b72f31b45115b8 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Tue, 28 Jul 2020 09:10:49 -0400 Subject: [PATCH] [Elastic Agent] Fix service application stop timeout issue (#20256) * Add tests. * Fix test. * Add CHANGELOG. --- .gitignore | 1 + x-pack/elastic-agent/CHANGELOG.asciidoc | 1 + x-pack/elastic-agent/magefile.go | 9 +- .../pkg/agent/operation/common_test.go | 6 +- .../pkg/agent/operation/operator_test.go | 117 ++++++++++++++- .../configurable-1.0-darwin-x86_64/README.md | 2 +- .../serviceable-1.0-darwin-x86_64/README.md | 1 + .../serviceable-1.0-darwin-x86_64/main.go | 141 ++++++++++++++++++ .../pkg/core/plugin/process/app.go | 16 +- .../pkg/core/plugin/service/app.go | 10 +- 10 files changed, 280 insertions(+), 24 deletions(-) create mode 100644 x-pack/elastic-agent/pkg/agent/operation/tests/scripts/serviceable-1.0-darwin-x86_64/README.md create mode 100644 x-pack/elastic-agent/pkg/agent/operation/tests/scripts/serviceable-1.0-darwin-x86_64/main.go diff --git a/.gitignore b/.gitignore index 78e09df9348..4f2f4f719b9 100644 --- a/.gitignore +++ b/.gitignore @@ -30,6 +30,7 @@ x-pack/dockerlogbeat/temproot.tar x-pack/elastic-agent/elastic_agent x-pack/elastic-agent/fleet.yml x-pack/elastic-agent/pkg/agent/operation/tests/scripts/configurable-1.0-darwin-x86_64/configurable +x-pack/elastic-agent/pkg/agent/operation/tests/scripts/serviceable-1.0-darwin-x86_64/serviceable # Editor swap files *.swp diff --git a/x-pack/elastic-agent/CHANGELOG.asciidoc b/x-pack/elastic-agent/CHANGELOG.asciidoc index 1d8d8c2bf83..e0e925847aa 100644 --- a/x-pack/elastic-agent/CHANGELOG.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.asciidoc @@ -56,6 +56,7 @@ - Improve GRPC stop to be more relaxed {pull}20118[20118] - Prevent closing closed reader {pull}20214[20214] - Fix Windows service installation script {pull}20203[20203] +- Fix timeout issue stopping service applications {pull}20256[20256] ==== New features diff --git a/x-pack/elastic-agent/magefile.go b/x-pack/elastic-agent/magefile.go index e17f8a68eab..47d74b10541 100644 --- a/x-pack/elastic-agent/magefile.go +++ b/x-pack/elastic-agent/magefile.go @@ -167,12 +167,15 @@ func (Build) Clean() { func (Build) TestBinaries() error { p := filepath.Join("pkg", "agent", "operation", "tests", "scripts") - binaryName := "configurable" + configurableName := "configurable" + serviceableName := "serviceable" if runtime.GOOS == "windows" { - binaryName += ".exe" + configurableName += ".exe" + serviceableName += ".exe" } return combineErr( - RunGo("build", "-o", filepath.Join(p, "configurable-1.0-darwin-x86_64", binaryName), filepath.Join(p, "configurable-1.0-darwin-x86_64", "main.go")), + RunGo("build", "-o", filepath.Join(p, "configurable-1.0-darwin-x86_64", configurableName), filepath.Join(p, "configurable-1.0-darwin-x86_64", "main.go")), + RunGo("build", "-o", filepath.Join(p, "serviceable-1.0-darwin-x86_64", serviceableName), filepath.Join(p, "serviceable-1.0-darwin-x86_64", "main.go")), ) } diff --git a/x-pack/elastic-agent/pkg/agent/operation/common_test.go b/x-pack/elastic-agent/pkg/agent/operation/common_test.go index 1e57335c121..070f87a7432 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/common_test.go +++ b/x-pack/elastic-agent/pkg/agent/operation/common_test.go @@ -94,15 +94,13 @@ func getLogger() *logger.Logger { } func getProgram(binary, version string) *app.Descriptor { + spec := program.SupportedMap[binary] downloadCfg := &artifact.Config{ InstallPath: installPath, OperatingSystem: "darwin", Architecture: "32", } - return app.NewDescriptor(program.Spec{ - Name: binary, - Cmd: binary, - }, version, downloadCfg, nil) + return app.NewDescriptor(spec, version, downloadCfg, nil) } func getAbsPath(path string) string { diff --git a/x-pack/elastic-agent/pkg/agent/operation/operator_test.go b/x-pack/elastic-agent/pkg/agent/operation/operator_test.go index 6c43bc12274..e0dce31816b 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operator_test.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operator_test.go @@ -7,7 +7,9 @@ package operation import ( "fmt" "math/rand" + "net" "os" + "os/exec" "path/filepath" "runtime" "testing" @@ -27,17 +29,26 @@ func TestMain(m *testing.M) { Cmd: "configurable", Args: []string{}, } + port, err := getFreePort() + if err != nil { + panic(err) + } + serviceSpec := program.Spec{ + ServicePort: port, + Name: "serviceable", + Cmd: "serviceable", + Args: []string{fmt.Sprintf("%d", port)}, + } - program.Supported = append(program.Supported, configurableSpec) + program.Supported = append(program.Supported, configurableSpec, serviceSpec) + program.SupportedMap["configurable"] = configurableSpec + program.SupportedMap["serviceable"] = serviceSpec - p := getProgram("configurable", "1.0") - spec := p.Spec() - path := spec.BinaryPath - if runtime.GOOS == "windows" { - path += ".exe" + if err := isAvailable("configurable", "1.0"); err != nil { + panic(err) } - if s, err := os.Stat(path); err != nil || s == nil { - panic(fmt.Errorf("binary not available %s", spec.BinaryPath)) + if err := isAvailable("serviceable", "1.0"); err != nil { + panic(err) } os.Exit(m.Run()) @@ -366,3 +377,93 @@ func TestConfigurableStartStop(t *testing.T) { }) } } + +func TestConfigurableService(t *testing.T) { + p := getProgram("serviceable", "1.0") + + operator := getTestOperator(t, downloadPath, installPath, p) + if err := operator.start(p, nil); err != nil { + t.Fatal(err) + } + defer operator.stop(p) // failure catch, to ensure no sub-process stays running + + // emulating a service, so we need to start the binary here in the test + spec := p.Spec() + cmd := exec.Command(spec.BinaryPath, fmt.Sprintf("%d", p.ServicePort())) + cmd.Env = append(cmd.Env, os.Environ()...) + cmd.Dir = filepath.Dir(spec.BinaryPath) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + if err := cmd.Start(); err != nil { + t.Fatal(err) + } + + waitFor(t, func() error { + items := operator.State() + item, ok := items[p.ID()] + if !ok { + return fmt.Errorf("no state for process") + } + if item.Status != state.Running { + return fmt.Errorf("process never went to running") + } + return nil + }) + + // try to configure + cfg := make(map[string]interface{}) + tstFilePath := filepath.Join(os.TempDir(), fmt.Sprintf("tmp%d", rand.Uint32())) + cfg["TestFile"] = tstFilePath + if err := operator.pushConfig(p, cfg); err != nil { + t.Fatalf("failed to config: %v", err) + } + + waitFor(t, func() error { + if s, err := os.Stat(tstFilePath); err != nil || s == nil { + return fmt.Errorf("failed to create a file using Config call %s", tstFilePath) + } + return nil + }) + + items := operator.State() + item0, ok := items[p.ID()] + if !ok || item0.Status != state.Running { + t.Fatalf("Process no longer running after config %#v", items) + } + + // stop the process + if err := operator.stop(p); err != nil { + t.Fatalf("Failed to stop service: %v", err) + } + + if err := cmd.Wait(); err != nil { + t.Fatalf("Process failed: %v", err) + } +} + +func isAvailable(name, version string) error { + p := getProgram(name, version) + spec := p.Spec() + path := spec.BinaryPath + if runtime.GOOS == "windows" { + path += ".exe" + } + if s, err := os.Stat(path); err != nil || s == nil { + return fmt.Errorf("binary not available %s", spec.BinaryPath) + } + return nil +} + +// getFreePort finds a free port. +func getFreePort() (int, error) { + addr, err := net.ResolveTCPAddr("tcp", "localhost:0") + if err != nil { + return 0, err + } + l, err := net.ListenTCP("tcp", addr) + if err != nil { + return 0, err + } + defer l.Close() + return l.Addr().(*net.TCPAddr).Port, nil +} diff --git a/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/configurable-1.0-darwin-x86_64/README.md b/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/configurable-1.0-darwin-x86_64/README.md index 71e80463f7c..309d9b655d8 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/configurable-1.0-darwin-x86_64/README.md +++ b/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/configurable-1.0-darwin-x86_64/README.md @@ -1 +1 @@ -Testing program emulating tool which is configurable using GRPC communication channel. +Testing program emulating tool which is configurable using GRPC communication channel when running as a sub-process. diff --git a/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/serviceable-1.0-darwin-x86_64/README.md b/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/serviceable-1.0-darwin-x86_64/README.md new file mode 100644 index 00000000000..da8cc52049c --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/serviceable-1.0-darwin-x86_64/README.md @@ -0,0 +1 @@ +Testing program emulating tool which is configurable using GRPC communication channel when running as an external service. diff --git a/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/serviceable-1.0-darwin-x86_64/main.go b/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/serviceable-1.0-darwin-x86_64/main.go new file mode 100644 index 00000000000..da9123f4587 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/operation/tests/scripts/serviceable-1.0-darwin-x86_64/main.go @@ -0,0 +1,141 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package main + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "net" + "os" + "path/filepath" + "strconv" + + protobuf "github.com/golang/protobuf/proto" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "gopkg.in/yaml.v2" + + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" +) + +func main() { + srvPort, err := strconv.Atoi(os.Args[1]) + if err != nil { + panic(err) + } + f, _ := os.OpenFile(filepath.Join(os.TempDir(), "testing.out"), os.O_APPEND|os.O_CREATE|os.O_RDWR, 0666) + f.WriteString("starting \n") + ctx, cancel := context.WithCancel(context.Background()) + s := &configServer{ + f: f, + ctx: ctx, + cancel: cancel, + } + f.WriteString(fmt.Sprintf("reading creds from port: %d\n", srvPort)) + client, err := clientFromNet(srvPort, s) + if err != nil { + f.WriteString(err.Error()) + panic(err) + } + s.client = client + err = client.Start(ctx) + if err != nil { + f.WriteString(err.Error()) + panic(err) + } + <-ctx.Done() + f.WriteString("finished \n") +} + +type configServer struct { + f *os.File + ctx context.Context + cancel context.CancelFunc + client client.Client +} + +func (s *configServer) OnConfig(cfgString string) { + s.client.Status(proto.StateObserved_CONFIGURING, "Writing config file", nil) + + testCfg := &TestConfig{} + if err := yaml.Unmarshal([]byte(cfgString), &testCfg); err != nil { + s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to unmarshall config: %s", err), nil) + return + } + + if testCfg.TestFile != "" { + tf, err := os.Create(testCfg.TestFile) + if err != nil { + s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to create file %s: %s", testCfg.TestFile, err), nil) + return + } + + err = tf.Close() + if err != nil { + s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to close file %s: %s", testCfg.TestFile, err), nil) + return + } + } + + s.client.Status(proto.StateObserved_HEALTHY, "Running", map[string]interface{}{ + "status": proto.StateObserved_HEALTHY, + "message": "Running", + }) +} + +func (s *configServer) OnStop() { + s.client.Status(proto.StateObserved_STOPPING, "Stopping", nil) + s.cancel() +} + +func (s *configServer) OnError(err error) { + s.f.WriteString(err.Error()) +} + +// TestConfig is a configuration for testing Config calls +type TestConfig struct { + TestFile string `config:"TestFile" yaml:"TestFile"` +} + +func getCreds(port int) (*proto.ConnInfo, error) { + c, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", port)) + if err != nil { + return nil, err + } + defer c.Close() + buf := make([]byte, 1024*1024) + n, err := c.Read(buf) + if err != nil { + return nil, err + } + var connInfo proto.ConnInfo + err = protobuf.Unmarshal(buf[:n], &connInfo) + if err != nil { + return nil, err + } + return &connInfo, nil +} + +func clientFromNet(port int, impl client.StateInterface, actions ...client.Action) (client.Client, error) { + connInfo, err := getCreds(port) + if err != nil { + return nil, err + } + cert, err := tls.X509KeyPair(connInfo.PeerCert, connInfo.PeerKey) + if err != nil { + return nil, err + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(connInfo.CaCert) + trans := credentials.NewTLS(&tls.Config{ + ServerName: connInfo.ServerName, + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + }) + return client.New(connInfo.Addr, connInfo.Token, impl, actions, grpc.WithTransportCredentials(trans)), nil +} diff --git a/x-pack/elastic-agent/pkg/core/plugin/process/app.go b/x-pack/elastic-agent/pkg/core/plugin/process/app.go index 8bdaae6f94a..5f9cf6efb25 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/process/app.go +++ b/x-pack/elastic-agent/pkg/core/plugin/process/app.go @@ -123,20 +123,26 @@ func (a *Application) Started() bool { // Stop stops the current application. func (a *Application) Stop() { a.appLock.Lock() - defer a.appLock.Unlock() + status := a.state.Status + srvState := a.srvState + a.appLock.Unlock() - if a.state.Status == state.Stopped { + if status == state.Stopped { return } stopSig := os.Interrupt - if a.srvState != nil { - if err := a.srvState.Stop(a.processConfig.StopTimeout); err != nil { + if srvState != nil { + if err := srvState.Stop(a.processConfig.StopTimeout); err != nil { // kill the process if stop through GRPC doesn't work stopSig = os.Kill } - a.srvState = nil } + + a.appLock.Lock() + defer a.appLock.Unlock() + + a.srvState = nil if a.state.ProcessInfo != nil { if err := a.state.ProcessInfo.Process.Signal(stopSig); err == nil { // no error on signal, so wait for it to stop diff --git a/x-pack/elastic-agent/pkg/core/plugin/service/app.go b/x-pack/elastic-agent/pkg/core/plugin/service/app.go index fc5ea4b5de4..d9e9fc681a2 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/service/app.go +++ b/x-pack/elastic-agent/pkg/core/plugin/service/app.go @@ -220,21 +220,25 @@ func (a *Application) Configure(_ context.Context, config map[string]interface{} // Stop stops the current application. func (a *Application) Stop() { a.appLock.Lock() - defer a.appLock.Unlock() + srvState := a.srvState + a.appLock.Unlock() - if a.srvState == nil { + if srvState == nil { return } - if err := a.srvState.Stop(a.processConfig.StopTimeout); err != nil { + if err := srvState.Stop(a.processConfig.StopTimeout); err != nil { + a.appLock.Lock() a.setState(state.Failed, errors.New(err, "Failed to stopped").Error(), nil) } else { + a.appLock.Lock() a.setState(state.Stopped, "Stopped", nil) } a.srvState = nil a.cleanUp() a.stopCredsListener() + a.appLock.Unlock() } // Shutdown disconnects the service, but doesn't signal it to stop.