Skip to content

Commit

Permalink
[Elastic Agent] Fix service application stop timeout issue (elastic#2…
Browse files Browse the repository at this point in the history
…0256) (elastic#20271)

* Add tests.

* Fix test.

* Add CHANGELOG.

(cherry picked from commit ae46fa5)
  • Loading branch information
blakerouse authored Jul 29, 2020
1 parent 4f5415f commit 0df5d78
Show file tree
Hide file tree
Showing 10 changed files with 280 additions and 24 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ x-pack/dockerlogbeat/temproot.tar
x-pack/elastic-agent/elastic_agent
x-pack/elastic-agent/fleet.yml
x-pack/elastic-agent/pkg/agent/operation/tests/scripts/configurable-1.0-darwin-x86_64/configurable
x-pack/elastic-agent/pkg/agent/operation/tests/scripts/serviceable-1.0-darwin-x86_64/serviceable

# Editor swap files
*.swp
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
- Prevent closing closed reader {pull}20214[20214]
- Improve GRPC stop to be more relaxed {pull}20118[20118]
- Fix Windows service installation script {pull}20203[20203]
- Fix timeout issue stopping service applications {pull}20256[20256]

==== New features

Expand Down
9 changes: 6 additions & 3 deletions x-pack/elastic-agent/magefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,15 @@ func (Build) Clean() {
func (Build) TestBinaries() error {
p := filepath.Join("pkg", "agent", "operation", "tests", "scripts")

binaryName := "configurable"
configurableName := "configurable"
serviceableName := "serviceable"
if runtime.GOOS == "windows" {
binaryName += ".exe"
configurableName += ".exe"
serviceableName += ".exe"
}
return combineErr(
RunGo("build", "-o", filepath.Join(p, "configurable-1.0-darwin-x86_64", binaryName), filepath.Join(p, "configurable-1.0-darwin-x86_64", "main.go")),
RunGo("build", "-o", filepath.Join(p, "configurable-1.0-darwin-x86_64", configurableName), filepath.Join(p, "configurable-1.0-darwin-x86_64", "main.go")),
RunGo("build", "-o", filepath.Join(p, "serviceable-1.0-darwin-x86_64", serviceableName), filepath.Join(p, "serviceable-1.0-darwin-x86_64", "main.go")),
)
}

Expand Down
6 changes: 2 additions & 4 deletions x-pack/elastic-agent/pkg/agent/operation/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,13 @@ func getLogger() *logger.Logger {
}

func getProgram(binary, version string) *app.Descriptor {
spec := program.SupportedMap[binary]
downloadCfg := &artifact.Config{
InstallPath: installPath,
OperatingSystem: "darwin",
Architecture: "32",
}
return app.NewDescriptor(program.Spec{
Name: binary,
Cmd: binary,
}, version, downloadCfg, nil)
return app.NewDescriptor(spec, version, downloadCfg, nil)
}

func getAbsPath(path string) string {
Expand Down
117 changes: 109 additions & 8 deletions x-pack/elastic-agent/pkg/agent/operation/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ package operation
import (
"fmt"
"math/rand"
"net"
"os"
"os/exec"
"path/filepath"
"runtime"
"testing"
Expand All @@ -27,17 +29,26 @@ func TestMain(m *testing.M) {
Cmd: "configurable",
Args: []string{},
}
port, err := getFreePort()
if err != nil {
panic(err)
}
serviceSpec := program.Spec{
ServicePort: port,
Name: "serviceable",
Cmd: "serviceable",
Args: []string{fmt.Sprintf("%d", port)},
}

program.Supported = append(program.Supported, configurableSpec)
program.Supported = append(program.Supported, configurableSpec, serviceSpec)
program.SupportedMap["configurable"] = configurableSpec
program.SupportedMap["serviceable"] = serviceSpec

p := getProgram("configurable", "1.0")
spec := p.Spec()
path := spec.BinaryPath
if runtime.GOOS == "windows" {
path += ".exe"
if err := isAvailable("configurable", "1.0"); err != nil {
panic(err)
}
if s, err := os.Stat(path); err != nil || s == nil {
panic(fmt.Errorf("binary not available %s", spec.BinaryPath))
if err := isAvailable("serviceable", "1.0"); err != nil {
panic(err)
}

os.Exit(m.Run())
Expand Down Expand Up @@ -366,3 +377,93 @@ func TestConfigurableStartStop(t *testing.T) {
})
}
}

func TestConfigurableService(t *testing.T) {
p := getProgram("serviceable", "1.0")

operator := getTestOperator(t, downloadPath, installPath, p)
if err := operator.start(p, nil); err != nil {
t.Fatal(err)
}
defer operator.stop(p) // failure catch, to ensure no sub-process stays running

// emulating a service, so we need to start the binary here in the test
spec := p.Spec()
cmd := exec.Command(spec.BinaryPath, fmt.Sprintf("%d", p.ServicePort()))
cmd.Env = append(cmd.Env, os.Environ()...)
cmd.Dir = filepath.Dir(spec.BinaryPath)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Start(); err != nil {
t.Fatal(err)
}

waitFor(t, func() error {
items := operator.State()
item, ok := items[p.ID()]
if !ok {
return fmt.Errorf("no state for process")
}
if item.Status != state.Running {
return fmt.Errorf("process never went to running")
}
return nil
})

// try to configure
cfg := make(map[string]interface{})
tstFilePath := filepath.Join(os.TempDir(), fmt.Sprintf("tmp%d", rand.Uint32()))
cfg["TestFile"] = tstFilePath
if err := operator.pushConfig(p, cfg); err != nil {
t.Fatalf("failed to config: %v", err)
}

waitFor(t, func() error {
if s, err := os.Stat(tstFilePath); err != nil || s == nil {
return fmt.Errorf("failed to create a file using Config call %s", tstFilePath)
}
return nil
})

items := operator.State()
item0, ok := items[p.ID()]
if !ok || item0.Status != state.Running {
t.Fatalf("Process no longer running after config %#v", items)
}

// stop the process
if err := operator.stop(p); err != nil {
t.Fatalf("Failed to stop service: %v", err)
}

if err := cmd.Wait(); err != nil {
t.Fatalf("Process failed: %v", err)
}
}

func isAvailable(name, version string) error {
p := getProgram(name, version)
spec := p.Spec()
path := spec.BinaryPath
if runtime.GOOS == "windows" {
path += ".exe"
}
if s, err := os.Stat(path); err != nil || s == nil {
return fmt.Errorf("binary not available %s", spec.BinaryPath)
}
return nil
}

// getFreePort finds a free port.
func getFreePort() (int, error) {
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
return 0, err
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
return 0, err
}
defer l.Close()
return l.Addr().(*net.TCPAddr).Port, nil
}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Testing program emulating tool which is configurable using GRPC communication channel.
Testing program emulating tool which is configurable using GRPC communication channel when running as a sub-process.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Testing program emulating tool which is configurable using GRPC communication channel when running as an external service.
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package main

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"net"
"os"
"path/filepath"
"strconv"

protobuf "github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"gopkg.in/yaml.v2"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
)

func main() {
srvPort, err := strconv.Atoi(os.Args[1])
if err != nil {
panic(err)
}
f, _ := os.OpenFile(filepath.Join(os.TempDir(), "testing.out"), os.O_APPEND|os.O_CREATE|os.O_RDWR, 0666)
f.WriteString("starting \n")
ctx, cancel := context.WithCancel(context.Background())
s := &configServer{
f: f,
ctx: ctx,
cancel: cancel,
}
f.WriteString(fmt.Sprintf("reading creds from port: %d\n", srvPort))
client, err := clientFromNet(srvPort, s)
if err != nil {
f.WriteString(err.Error())
panic(err)
}
s.client = client
err = client.Start(ctx)
if err != nil {
f.WriteString(err.Error())
panic(err)
}
<-ctx.Done()
f.WriteString("finished \n")
}

type configServer struct {
f *os.File
ctx context.Context
cancel context.CancelFunc
client client.Client
}

func (s *configServer) OnConfig(cfgString string) {
s.client.Status(proto.StateObserved_CONFIGURING, "Writing config file", nil)

testCfg := &TestConfig{}
if err := yaml.Unmarshal([]byte(cfgString), &testCfg); err != nil {
s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to unmarshall config: %s", err), nil)
return
}

if testCfg.TestFile != "" {
tf, err := os.Create(testCfg.TestFile)
if err != nil {
s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to create file %s: %s", testCfg.TestFile, err), nil)
return
}

err = tf.Close()
if err != nil {
s.client.Status(proto.StateObserved_FAILED, fmt.Sprintf("Failed to close file %s: %s", testCfg.TestFile, err), nil)
return
}
}

s.client.Status(proto.StateObserved_HEALTHY, "Running", map[string]interface{}{
"status": proto.StateObserved_HEALTHY,
"message": "Running",
})
}

func (s *configServer) OnStop() {
s.client.Status(proto.StateObserved_STOPPING, "Stopping", nil)
s.cancel()
}

func (s *configServer) OnError(err error) {
s.f.WriteString(err.Error())
}

// TestConfig is a configuration for testing Config calls
type TestConfig struct {
TestFile string `config:"TestFile" yaml:"TestFile"`
}

func getCreds(port int) (*proto.ConnInfo, error) {
c, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", port))
if err != nil {
return nil, err
}
defer c.Close()
buf := make([]byte, 1024*1024)
n, err := c.Read(buf)
if err != nil {
return nil, err
}
var connInfo proto.ConnInfo
err = protobuf.Unmarshal(buf[:n], &connInfo)
if err != nil {
return nil, err
}
return &connInfo, nil
}

func clientFromNet(port int, impl client.StateInterface, actions ...client.Action) (client.Client, error) {
connInfo, err := getCreds(port)
if err != nil {
return nil, err
}
cert, err := tls.X509KeyPair(connInfo.PeerCert, connInfo.PeerKey)
if err != nil {
return nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(connInfo.CaCert)
trans := credentials.NewTLS(&tls.Config{
ServerName: connInfo.ServerName,
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
})
return client.New(connInfo.Addr, connInfo.Token, impl, actions, grpc.WithTransportCredentials(trans)), nil
}
16 changes: 11 additions & 5 deletions x-pack/elastic-agent/pkg/core/plugin/process/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,20 +123,26 @@ func (a *Application) Started() bool {
// Stop stops the current application.
func (a *Application) Stop() {
a.appLock.Lock()
defer a.appLock.Unlock()
status := a.state.Status
srvState := a.srvState
a.appLock.Unlock()

if a.state.Status == state.Stopped {
if status == state.Stopped {
return
}

stopSig := os.Interrupt
if a.srvState != nil {
if err := a.srvState.Stop(a.processConfig.StopTimeout); err != nil {
if srvState != nil {
if err := srvState.Stop(a.processConfig.StopTimeout); err != nil {
// kill the process if stop through GRPC doesn't work
stopSig = os.Kill
}
a.srvState = nil
}

a.appLock.Lock()
defer a.appLock.Unlock()

a.srvState = nil
if a.state.ProcessInfo != nil {
if err := a.state.ProcessInfo.Process.Signal(stopSig); err == nil {
// no error on signal, so wait for it to stop
Expand Down
10 changes: 7 additions & 3 deletions x-pack/elastic-agent/pkg/core/plugin/service/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,21 +220,25 @@ func (a *Application) Configure(_ context.Context, config map[string]interface{}
// Stop stops the current application.
func (a *Application) Stop() {
a.appLock.Lock()
defer a.appLock.Unlock()
srvState := a.srvState
a.appLock.Unlock()

if a.srvState == nil {
if srvState == nil {
return
}

if err := a.srvState.Stop(a.processConfig.StopTimeout); err != nil {
if err := srvState.Stop(a.processConfig.StopTimeout); err != nil {
a.appLock.Lock()
a.setState(state.Failed, errors.New(err, "Failed to stopped").Error(), nil)
} else {
a.appLock.Lock()
a.setState(state.Stopped, "Stopped", nil)
}
a.srvState = nil

a.cleanUp()
a.stopCredsListener()
a.appLock.Unlock()
}

// Shutdown disconnects the service, but doesn't signal it to stop.
Expand Down

0 comments on commit 0df5d78

Please sign in to comment.