-
Notifications
You must be signed in to change notification settings - Fork 4.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Elastic Agent] Add skeleton for client/server for agent control prot…
…ocol (#20163) * Add protocl to control Elastic Agent. * Fix CI with protoc. * Remove CI changes. * Start on the control server code. * More client/server work. * More work. * Add test. * Fix vet issues. * Fix permissions on unix socket. Add comment to Windows npipe.
- Loading branch information
1 parent
2abf87f
commit 77a8472
Showing
10 changed files
with
518 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
// +build !windows | ||
|
||
package control | ||
|
||
import ( | ||
"fmt" | ||
"path/filepath" | ||
|
||
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" | ||
) | ||
|
||
// Address returns the address to connect to Elastic Agent daemon. | ||
func Address() string { | ||
data := paths.Data() | ||
return fmt.Sprintf("unix://%s", filepath.Join(data, "agent.sock")) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
// +build windows | ||
|
||
package control | ||
|
||
import ( | ||
"crypto/sha256" | ||
"fmt" | ||
|
||
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" | ||
) | ||
|
||
// Address returns the address to connect to Elastic Agent daemon. | ||
func Address() string { | ||
data = paths.Data() | ||
// entire string cannot be longer than 256 characters, this forces the | ||
// length to always be 87 characters (but unique per data path) | ||
return fmt.Sprintf(`\\.\pipe\elastic-agent-%s`, sha256.Sum256(data)) | ||
} |
188 changes: 188 additions & 0 deletions
188
x-pack/elastic-agent/pkg/agent/control/client/client.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,188 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
package client | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" | ||
|
||
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/proto" | ||
) | ||
|
||
// Status is the status of the Elastic Agent | ||
type Status = proto.Status | ||
|
||
const ( | ||
// Starting is when the it is still starting. | ||
Starting Status = proto.Status_STARTING | ||
// Configuring is when it is configuring. | ||
Configuring Status = proto.Status_CONFIGURING | ||
// Healthy is when it is healthy. | ||
Healthy Status = proto.Status_HEALTHY | ||
// Degraded is when it is degraded. | ||
Degraded Status = proto.Status_DEGRADED | ||
// Failed is when it is failed. | ||
Failed Status = proto.Status_FAILED | ||
// Stopping is when it is stopping. | ||
Stopping Status = proto.Status_STOPPING | ||
// Upgrading is when it is upgrading. | ||
Upgrading Status = proto.Status_UPGRADING | ||
) | ||
|
||
// Version is the current running version of the daemon. | ||
type Version struct { | ||
Version string | ||
Commit string | ||
BuildTime time.Time | ||
Snapshot bool | ||
} | ||
|
||
// ApplicationStatus is a status of an application inside of Elastic Agent. | ||
type ApplicationStatus struct { | ||
ID string | ||
Name string | ||
Status Status | ||
Message string | ||
Payload map[string]interface{} | ||
} | ||
|
||
// AgentStatus is the current status of the Elastic Agent. | ||
type AgentStatus struct { | ||
Status Status | ||
Message string | ||
Applications []*ApplicationStatus | ||
} | ||
|
||
// Client communicates to Elastic Agent through the control protocol. | ||
type Client interface { | ||
// Start starts the client. | ||
Start(ctx context.Context) error | ||
// Stop stops the client. | ||
Stop() | ||
// Version returns the current version of the running agent. | ||
Version(ctx context.Context) (Version, error) | ||
// Status returns the current status of the running agent. | ||
Status(ctx context.Context) (*AgentStatus, error) | ||
// Restart triggers restarting the current running daemon. | ||
Restart(ctx context.Context) error | ||
// Upgrade triggers upgrade of the current running daemon. | ||
Upgrade(ctx context.Context, version string, sourceURI string) (string, error) | ||
} | ||
|
||
// client manages the state and communication to the Elastic Agent. | ||
type client struct { | ||
ctx context.Context | ||
cancel context.CancelFunc | ||
wg sync.WaitGroup | ||
client proto.ElasticAgentClient | ||
cfgLock sync.RWMutex | ||
obsLock sync.RWMutex | ||
} | ||
|
||
// New creates a client connection to Elastic Agent. | ||
func New() Client { | ||
return &client{} | ||
} | ||
|
||
// Start starts the connection to Elastic Agent. | ||
func (c *client) Start(ctx context.Context) error { | ||
c.ctx, c.cancel = context.WithCancel(ctx) | ||
conn, err := dialContext(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
c.client = proto.NewElasticAgentClient(conn) | ||
return nil | ||
} | ||
|
||
// Stop stops the connection to Elastic Agent. | ||
func (c *client) Stop() { | ||
if c.cancel != nil { | ||
c.cancel() | ||
c.wg.Wait() | ||
c.ctx = nil | ||
c.cancel = nil | ||
} | ||
} | ||
|
||
// Version returns the current version of the running agent. | ||
func (c *client) Version(ctx context.Context) (Version, error) { | ||
res, err := c.client.Version(ctx, &proto.Empty{}) | ||
if err != nil { | ||
return Version{}, err | ||
} | ||
bt, err := time.Parse(control.TimeFormat(), res.BuildTime) | ||
if err != nil { | ||
return Version{}, err | ||
} | ||
return Version{ | ||
Version: res.Version, | ||
Commit: res.Commit, | ||
BuildTime: bt, | ||
Snapshot: res.Snapshot, | ||
}, nil | ||
} | ||
|
||
// Status returns the current status of the running agent. | ||
func (c *client) Status(ctx context.Context) (*AgentStatus, error) { | ||
res, err := c.client.Status(ctx, &proto.Empty{}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
s := &AgentStatus{ | ||
Status: res.Status, | ||
Message: res.Message, | ||
Applications: make([]*ApplicationStatus, len(res.Applications)), | ||
} | ||
for i, appRes := range res.Applications { | ||
var payload map[string]interface{} | ||
if appRes.Payload != "" { | ||
err := json.Unmarshal([]byte(appRes.Payload), &payload) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
s.Applications[i] = &ApplicationStatus{ | ||
ID: appRes.Id, | ||
Name: appRes.Name, | ||
Status: appRes.Status, | ||
Message: appRes.Message, | ||
Payload: payload, | ||
} | ||
} | ||
return s, nil | ||
} | ||
|
||
// Restart triggers restarting the current running daemon. | ||
func (c *client) Restart(ctx context.Context) error { | ||
res, err := c.client.Restart(ctx, &proto.Empty{}) | ||
if err != nil { | ||
return err | ||
} | ||
if res.Status == proto.ActionStatus_FAILURE { | ||
return fmt.Errorf(res.Error) | ||
} | ||
return nil | ||
} | ||
|
||
// Upgrade triggers upgrade of the current running daemon. | ||
func (c *client) Upgrade(ctx context.Context, version string, sourceURI string) (string, error) { | ||
res, err := c.client.Upgrade(ctx, &proto.UpgradeRequest{ | ||
Version: version, | ||
SourceURI: sourceURI, | ||
}) | ||
if err != nil { | ||
return "", err | ||
} | ||
if res.Status == proto.ActionStatus_FAILURE { | ||
return "", fmt.Errorf(res.Error) | ||
} | ||
return res.Version, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
// +build !windows | ||
|
||
package client | ||
|
||
import ( | ||
"context" | ||
"net" | ||
"strings" | ||
|
||
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" | ||
|
||
"google.golang.org/grpc" | ||
) | ||
|
||
func dialContext(ctx context.Context) (*grpc.ClientConn, error) { | ||
return grpc.DialContext(ctx, strings.TrimPrefix(control.Address(), "unix://"), grpc.WithInsecure(), grpc.WithContextDialer(dialer)) | ||
} | ||
|
||
func dialer(ctx context.Context, addr string) (net.Conn, error) { | ||
var d net.Dialer | ||
return d.DialContext(ctx, "unix", addr) | ||
} |
26 changes: 26 additions & 0 deletions
26
x-pack/elastic-agent/pkg/agent/control/client/dial_windows.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
// +build windows | ||
|
||
package client | ||
|
||
import ( | ||
"context" | ||
"net" | ||
|
||
"google.golang.org/grpc" | ||
|
||
"github.com/elastic/beats/v7/libbeat/api/npipe" | ||
|
||
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" | ||
) | ||
|
||
func dialContext(ctx context.Context) (*grpc.ClientConn, error) { | ||
return grpc.DialContext(ctx, control.Address(), grpc.WithInsecure(), grpc.WithContextDialer(dialer)) | ||
} | ||
|
||
func dialer(ctx context.Context, addr string) (net.Conn, error) { | ||
return npipe.DialContext(arr)(ctx, "", "") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
package control_test | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/elastic/beats/v7/libbeat/logp" | ||
|
||
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/client" | ||
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/server" | ||
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" | ||
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" | ||
) | ||
|
||
func TestServerClient_Version(t *testing.T) { | ||
srv := server.New(newErrorLogger(t)) | ||
err := srv.Start() | ||
require.NoError(t, err) | ||
defer srv.Stop() | ||
|
||
c := client.New() | ||
err = c.Start(context.Background()) | ||
require.NoError(t, err) | ||
defer c.Stop() | ||
|
||
ver, err := c.Version(context.Background()) | ||
require.NoError(t, err) | ||
|
||
assert.Equal(t, client.Version{ | ||
Version: release.Version(), | ||
Commit: release.Commit(), | ||
BuildTime: release.BuildTime(), | ||
Snapshot: release.Snapshot(), | ||
}, ver) | ||
} | ||
|
||
func newErrorLogger(t *testing.T) *logger.Logger { | ||
t.Helper() | ||
|
||
loggerCfg := logger.DefaultLoggingConfig() | ||
loggerCfg.Level = logp.ErrorLevel | ||
|
||
log, err := logger.NewFromConfig("", loggerCfg) | ||
require.NoError(t, err) | ||
return log | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
// or more contributor license agreements. Licensed under the Elastic License; | ||
// you may not use this file except in compliance with the Elastic License. | ||
|
||
// +build !windows | ||
|
||
package server | ||
|
||
import ( | ||
"net" | ||
"os" | ||
"path/filepath" | ||
"strings" | ||
|
||
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control" | ||
) | ||
|
||
func createListener() (net.Listener, error) { | ||
path := strings.TrimPrefix(control.Address(), "unix://") | ||
dir := filepath.Dir(path) | ||
if _, err := os.Stat(dir); os.IsNotExist(err) { | ||
err = os.MkdirAll(dir, 0755) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
lis, err := net.Listen("unix", path) | ||
if err != nil { | ||
return nil, err | ||
} | ||
err = os.Chmod(path, 0700) | ||
if err != nil { | ||
// failed to set permissions (close listener) | ||
lis.Close() | ||
return nil, err | ||
} | ||
return lis, err | ||
} |
Oops, something went wrong.