From 77a8472b0a5f16d7f21763b4edb1a6c876e549db Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Thu, 30 Jul 2020 11:33:36 -0400 Subject: [PATCH] [Elastic Agent] Add skeleton for client/server for agent control protocol (#20163) * Add protocl to control Elastic Agent. * Fix CI with protoc. * Remove CI changes. * Start on the control server code. * More client/server work. * More work. * Add test. * Fix vet issues. * Fix permissions on unix socket. Add comment to Windows npipe. --- .../elastic-agent/pkg/agent/control/addr.go | 20 ++ .../pkg/agent/control/addr_windows.go | 22 ++ .../pkg/agent/control/client/client.go | 188 ++++++++++++++++++ .../pkg/agent/control/client/dial.go | 26 +++ .../pkg/agent/control/client/dial_windows.go | 26 +++ .../pkg/agent/control/control_test.go | 53 +++++ .../pkg/agent/control/server/listener.go | 38 ++++ .../agent/control/server/listener_windows.go | 29 +++ .../pkg/agent/control/server/server.go | 106 ++++++++++ .../elastic-agent/pkg/agent/control/time.go | 10 + 10 files changed, 518 insertions(+) create mode 100644 x-pack/elastic-agent/pkg/agent/control/addr.go create mode 100644 x-pack/elastic-agent/pkg/agent/control/addr_windows.go create mode 100644 x-pack/elastic-agent/pkg/agent/control/client/client.go create mode 100644 x-pack/elastic-agent/pkg/agent/control/client/dial.go create mode 100644 x-pack/elastic-agent/pkg/agent/control/client/dial_windows.go create mode 100644 x-pack/elastic-agent/pkg/agent/control/control_test.go create mode 100644 x-pack/elastic-agent/pkg/agent/control/server/listener.go create mode 100644 x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go create mode 100644 x-pack/elastic-agent/pkg/agent/control/server/server.go create mode 100644 x-pack/elastic-agent/pkg/agent/control/time.go diff --git a/x-pack/elastic-agent/pkg/agent/control/addr.go b/x-pack/elastic-agent/pkg/agent/control/addr.go new file mode 100644 index 00000000000..3416480a6a0 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/addr.go @@ -0,0 +1,20 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build !windows + +package control + +import ( + "fmt" + "path/filepath" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" +) + +// Address returns the address to connect to Elastic Agent daemon. +func Address() string { + data := paths.Data() + return fmt.Sprintf("unix://%s", filepath.Join(data, "agent.sock")) +} diff --git a/x-pack/elastic-agent/pkg/agent/control/addr_windows.go b/x-pack/elastic-agent/pkg/agent/control/addr_windows.go new file mode 100644 index 00000000000..1123eec941b --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/addr_windows.go @@ -0,0 +1,22 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build windows + +package control + +import ( + "crypto/sha256" + "fmt" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" +) + +// Address returns the address to connect to Elastic Agent daemon. +func Address() string { + data = paths.Data() + // entire string cannot be longer than 256 characters, this forces the + // length to always be 87 characters (but unique per data path) + return fmt.Sprintf(`\\.\pipe\elastic-agent-%s`, sha256.Sum256(data)) +} diff --git a/x-pack/elastic-agent/pkg/agent/control/client/client.go b/x-pack/elastic-agent/pkg/agent/control/client/client.go new file mode 100644 index 00000000000..bcd8eccdb82 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/client/client.go @@ -0,0 +1,188 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package client + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/proto" +) + +// Status is the status of the Elastic Agent +type Status = proto.Status + +const ( + // Starting is when the it is still starting. + Starting Status = proto.Status_STARTING + // Configuring is when it is configuring. + Configuring Status = proto.Status_CONFIGURING + // Healthy is when it is healthy. + Healthy Status = proto.Status_HEALTHY + // Degraded is when it is degraded. + Degraded Status = proto.Status_DEGRADED + // Failed is when it is failed. + Failed Status = proto.Status_FAILED + // Stopping is when it is stopping. + Stopping Status = proto.Status_STOPPING + // Upgrading is when it is upgrading. + Upgrading Status = proto.Status_UPGRADING +) + +// Version is the current running version of the daemon. +type Version struct { + Version string + Commit string + BuildTime time.Time + Snapshot bool +} + +// ApplicationStatus is a status of an application inside of Elastic Agent. +type ApplicationStatus struct { + ID string + Name string + Status Status + Message string + Payload map[string]interface{} +} + +// AgentStatus is the current status of the Elastic Agent. +type AgentStatus struct { + Status Status + Message string + Applications []*ApplicationStatus +} + +// Client communicates to Elastic Agent through the control protocol. +type Client interface { + // Start starts the client. + Start(ctx context.Context) error + // Stop stops the client. + Stop() + // Version returns the current version of the running agent. + Version(ctx context.Context) (Version, error) + // Status returns the current status of the running agent. + Status(ctx context.Context) (*AgentStatus, error) + // Restart triggers restarting the current running daemon. + Restart(ctx context.Context) error + // Upgrade triggers upgrade of the current running daemon. + Upgrade(ctx context.Context, version string, sourceURI string) (string, error) +} + +// client manages the state and communication to the Elastic Agent. +type client struct { + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + client proto.ElasticAgentClient + cfgLock sync.RWMutex + obsLock sync.RWMutex +} + +// New creates a client connection to Elastic Agent. +func New() Client { + return &client{} +} + +// Start starts the connection to Elastic Agent. +func (c *client) Start(ctx context.Context) error { + c.ctx, c.cancel = context.WithCancel(ctx) + conn, err := dialContext(ctx) + if err != nil { + return err + } + c.client = proto.NewElasticAgentClient(conn) + return nil +} + +// Stop stops the connection to Elastic Agent. +func (c *client) Stop() { + if c.cancel != nil { + c.cancel() + c.wg.Wait() + c.ctx = nil + c.cancel = nil + } +} + +// Version returns the current version of the running agent. +func (c *client) Version(ctx context.Context) (Version, error) { + res, err := c.client.Version(ctx, &proto.Empty{}) + if err != nil { + return Version{}, err + } + bt, err := time.Parse(control.TimeFormat(), res.BuildTime) + if err != nil { + return Version{}, err + } + return Version{ + Version: res.Version, + Commit: res.Commit, + BuildTime: bt, + Snapshot: res.Snapshot, + }, nil +} + +// Status returns the current status of the running agent. +func (c *client) Status(ctx context.Context) (*AgentStatus, error) { + res, err := c.client.Status(ctx, &proto.Empty{}) + if err != nil { + return nil, err + } + s := &AgentStatus{ + Status: res.Status, + Message: res.Message, + Applications: make([]*ApplicationStatus, len(res.Applications)), + } + for i, appRes := range res.Applications { + var payload map[string]interface{} + if appRes.Payload != "" { + err := json.Unmarshal([]byte(appRes.Payload), &payload) + if err != nil { + return nil, err + } + } + s.Applications[i] = &ApplicationStatus{ + ID: appRes.Id, + Name: appRes.Name, + Status: appRes.Status, + Message: appRes.Message, + Payload: payload, + } + } + return s, nil +} + +// Restart triggers restarting the current running daemon. +func (c *client) Restart(ctx context.Context) error { + res, err := c.client.Restart(ctx, &proto.Empty{}) + if err != nil { + return err + } + if res.Status == proto.ActionStatus_FAILURE { + return fmt.Errorf(res.Error) + } + return nil +} + +// Upgrade triggers upgrade of the current running daemon. +func (c *client) Upgrade(ctx context.Context, version string, sourceURI string) (string, error) { + res, err := c.client.Upgrade(ctx, &proto.UpgradeRequest{ + Version: version, + SourceURI: sourceURI, + }) + if err != nil { + return "", err + } + if res.Status == proto.ActionStatus_FAILURE { + return "", fmt.Errorf(res.Error) + } + return res.Version, nil +} diff --git a/x-pack/elastic-agent/pkg/agent/control/client/dial.go b/x-pack/elastic-agent/pkg/agent/control/client/dial.go new file mode 100644 index 00000000000..56313b12c82 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/client/dial.go @@ -0,0 +1,26 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build !windows + +package client + +import ( + "context" + "net" + "strings" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" + + "google.golang.org/grpc" +) + +func dialContext(ctx context.Context) (*grpc.ClientConn, error) { + return grpc.DialContext(ctx, strings.TrimPrefix(control.Address(), "unix://"), grpc.WithInsecure(), grpc.WithContextDialer(dialer)) +} + +func dialer(ctx context.Context, addr string) (net.Conn, error) { + var d net.Dialer + return d.DialContext(ctx, "unix", addr) +} diff --git a/x-pack/elastic-agent/pkg/agent/control/client/dial_windows.go b/x-pack/elastic-agent/pkg/agent/control/client/dial_windows.go new file mode 100644 index 00000000000..58b36c18043 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/client/dial_windows.go @@ -0,0 +1,26 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build windows + +package client + +import ( + "context" + "net" + + "google.golang.org/grpc" + + "github.com/elastic/beats/v7/libbeat/api/npipe" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" +) + +func dialContext(ctx context.Context) (*grpc.ClientConn, error) { + return grpc.DialContext(ctx, control.Address(), grpc.WithInsecure(), grpc.WithContextDialer(dialer)) +} + +func dialer(ctx context.Context, addr string) (net.Conn, error) { + return npipe.DialContext(arr)(ctx, "", "") +} diff --git a/x-pack/elastic-agent/pkg/agent/control/control_test.go b/x-pack/elastic-agent/pkg/agent/control/control_test.go new file mode 100644 index 00000000000..13d32420258 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/control_test.go @@ -0,0 +1,53 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package control_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/logp" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/client" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/server" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" +) + +func TestServerClient_Version(t *testing.T) { + srv := server.New(newErrorLogger(t)) + err := srv.Start() + require.NoError(t, err) + defer srv.Stop() + + c := client.New() + err = c.Start(context.Background()) + require.NoError(t, err) + defer c.Stop() + + ver, err := c.Version(context.Background()) + require.NoError(t, err) + + assert.Equal(t, client.Version{ + Version: release.Version(), + Commit: release.Commit(), + BuildTime: release.BuildTime(), + Snapshot: release.Snapshot(), + }, ver) +} + +func newErrorLogger(t *testing.T) *logger.Logger { + t.Helper() + + loggerCfg := logger.DefaultLoggingConfig() + loggerCfg.Level = logp.ErrorLevel + + log, err := logger.NewFromConfig("", loggerCfg) + require.NoError(t, err) + return log +} diff --git a/x-pack/elastic-agent/pkg/agent/control/server/listener.go b/x-pack/elastic-agent/pkg/agent/control/server/listener.go new file mode 100644 index 00000000000..2dd5d54a46f --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/server/listener.go @@ -0,0 +1,38 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build !windows + +package server + +import ( + "net" + "os" + "path/filepath" + "strings" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" +) + +func createListener() (net.Listener, error) { + path := strings.TrimPrefix(control.Address(), "unix://") + dir := filepath.Dir(path) + if _, err := os.Stat(dir); os.IsNotExist(err) { + err = os.MkdirAll(dir, 0755) + if err != nil { + return nil, err + } + } + lis, err := net.Listen("unix", path) + if err != nil { + return nil, err + } + err = os.Chmod(path, 0700) + if err != nil { + // failed to set permissions (close listener) + lis.Close() + return nil, err + } + return lis, err +} diff --git a/x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go b/x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go new file mode 100644 index 00000000000..d2d2866b98a --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/server/listener_windows.go @@ -0,0 +1,29 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build windows + +package server + +import ( + "net" + "os/user" + + "github.com/elastic/beats/v7/libbeat/api/npipe" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" +) + +// createListener creates a named pipe listener on Windows +func createListener() (net.Listener, error) { + u, err := user.Current() + if err != nil { + return nil, err + } + sd, err := npipe.DefaultSD(u.Username) + if err != nil { + return nil, err + } + return npipe.NewListener(control.Address(), sd) +} diff --git a/x-pack/elastic-agent/pkg/agent/control/server/server.go b/x-pack/elastic-agent/pkg/agent/control/server/server.go new file mode 100644 index 00000000000..c9a750808fc --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/server/server.go @@ -0,0 +1,106 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package server + +import ( + "context" + "net" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" + + "google.golang.org/grpc" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/proto" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" +) + +// Server is the daemon side of the control protocol. +type Server struct { + logger *logger.Logger + listener net.Listener + server *grpc.Server +} + +// New creates a new control protocol server. +func New(log *logger.Logger) *Server { + return &Server{ + logger: log, + } +} + +// Start starts the GRPC endpoint and accepts new connections. +func (s *Server) Start() error { + if s.server != nil { + // already started + return nil + } + + lis, err := createListener() + if err != nil { + return err + } + s.listener = lis + s.server = grpc.NewServer() + proto.RegisterElasticAgentServer(s.server, s) + + // start serving GRPC connections + go func() { + err := s.server.Serve(lis) + if err != nil { + s.logger.Errorf("error listening for GRPC: %s", err) + } + }() + + return nil +} + +// Stop stops the GRPC endpoint. +func (s *Server) Stop() { + if s.server != nil { + s.server.Stop() + s.server = nil + s.listener = nil + } +} + +// Version returns the currently running version. +func (s *Server) Version(_ context.Context, _ *proto.Empty) (*proto.VersionResponse, error) { + return &proto.VersionResponse{ + Version: release.Version(), + Commit: release.Commit(), + BuildTime: release.BuildTime().Format(control.TimeFormat()), + Snapshot: release.Snapshot(), + }, nil +} + +// Status returns the overall status of the agent. +func (s *Server) Status(_ context.Context, _ *proto.Empty) (*proto.StatusResponse, error) { + // not implemented + return &proto.StatusResponse{ + Status: proto.Status_HEALTHY, + Message: "not implemented", + Applications: nil, + }, nil +} + +// Restart performs re-exec. +func (s *Server) Restart(_ context.Context, _ *proto.Empty) (*proto.RestartResponse, error) { + // not implemented + return &proto.RestartResponse{ + Status: proto.ActionStatus_FAILURE, + Error: "not implemented", + }, nil +} + +// Upgrade performs the upgrade operation. +func (s *Server) Upgrade(ctx context.Context, request *proto.UpgradeRequest) (*proto.UpgradeResponse, error) { + // not implemented + return &proto.UpgradeResponse{ + Status: proto.ActionStatus_FAILURE, + Version: "", + Error: "not implemented", + }, nil +} diff --git a/x-pack/elastic-agent/pkg/agent/control/time.go b/x-pack/elastic-agent/pkg/agent/control/time.go new file mode 100644 index 00000000000..c87902bbc37 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/control/time.go @@ -0,0 +1,10 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package control + +// TimeFormat returns the time format shared between the protocol. +func TimeFormat() string { + return "2006-01-02 15:04:05 -0700 MST" +}