Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix diagnostics failures on max message size limit #1777

Merged
merged 10 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions _meta/config/common.p2.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ inputs:
# address: localhost
# # port for the GRPC server that spawned processes connect back to.
# port: 6789
# # max_message_size limits the message size in agent internal communication
# # default is 100MB
# max_message_size: 104857600

# agent.retry:
# # Enabled determines whether retry is possible. Default is false.
Expand Down
3 changes: 3 additions & 0 deletions _meta/config/common.reference.p2.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ inputs:
# address: localhost
# # port for the GRPC server that spawned processes connect back to.
# port: 6789
# # max_message_size limits the message size in agent internal communication
# # default is 100MB
# max_message_size: 104857600

# agent.retry:
# # Enabled determines whether retry is possible. Default is false.
Expand Down
3 changes: 3 additions & 0 deletions _meta/config/elastic-agent.docker.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ inputs:
# address: localhost
# # port for the GRPC server that spawned processes connect back to.
# port: 6789
# # max_message_size limits the message size in agent internal communication
# # default is 100MB
# max_message_size: 104857600

# agent.retry:
# # Enabled determines whether retry is possible. Default is false.
Expand Down
2 changes: 1 addition & 1 deletion control.proto
Original file line number Diff line number Diff line change
Expand Up @@ -244,5 +244,5 @@ service ElasticAgentControl {
rpc DiagnosticAgent(DiagnosticAgentRequest) returns (DiagnosticAgentResponse);

// Gather diagnostic information for the running units.
rpc DiagnosticUnits(DiagnosticUnitsRequest) returns (DiagnosticUnitsResponse);
rpc DiagnosticUnits(DiagnosticUnitsRequest) returns (stream DiagnosticUnitResponse);
cmacknz marked this conversation as resolved.
Show resolved Hide resolved
}
3 changes: 3 additions & 0 deletions elastic-agent.docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ inputs:
# address: localhost
# # port for the GRPC server that spawned processes connect back to.
# port: 6789
# # max_message_size limits the message size in agent internal communication
# # default is 100MB
# max_message_size: 104857600

# agent.retry:
# # Enabled determines whether retry is possible. Default is false.
Expand Down
3 changes: 3 additions & 0 deletions elastic-agent.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ inputs:
# address: localhost
# # port for the GRPC server that spawned processes connect back to.
# port: 6789
# # max_message_size limits the message size in agent internal communication
# # default is 100MB
# max_message_size: 104857600

# agent.retry:
# # Enabled determines whether retry is possible. Default is false.
Expand Down
3 changes: 3 additions & 0 deletions elastic-agent.yml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ inputs:
# address: localhost
# # port for the GRPC server that spawned processes connect back to.
# port: 6789
# # max_message_size limits the message size in agent internal communication
# # default is 100MB
# max_message_size: 104857600

# agent.retry:
# # Enabled determines whether retry is possible. Default is false.
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func New(
upgrader := upgrade.NewUpgrader(log, cfg.Settings.DownloadConfig, agentInfo)
monitor := monitoring.New(cfg.Settings.V1MonitoringEnabled, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, agentInfo)

runtime, err := runtime.NewManager(log, cfg.Settings.GRPC.String(), agentInfo, tracer, monitor)
runtime, err := runtime.NewManager(log, cfg.Settings.GRPC.String(), agentInfo, tracer, monitor, cfg.Settings.GRPC)
if err != nil {
return nil, fmt.Errorf("failed to initialize runtime manager: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/agent/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func run(override cfgOverrider, modifiers ...component.PlatformModifier) error {

diagHooks := diagnostics.GlobalHooks()
diagHooks = append(diagHooks, coord.DiagnosticHooks()...)
control := server.New(logger.Named("control"), agentInfo, coord, tracer, diagHooks)
control := server.New(logger.Named("control"), agentInfo, coord, tracer, diagHooks, cfg.Settings.GRPC)
// start the control listener
if err := control.Start(); err != nil {
return err
Expand Down Expand Up @@ -395,7 +395,7 @@ func initTracer(agentName, version string, mcfg *monitoringCfg.MonitoringConfig)

cfg := mcfg.APM

// nolint:godox // the TODO is intentional
//nolint:godox // the TODO is intentional
// TODO(stn): Ideally, we'd use apmtransport.NewHTTPTransportOptions()
// but it doesn't exist today. Update this code once we have something
// available via the APM Go agent.
Expand Down
10 changes: 6 additions & 4 deletions internal/pkg/agent/configuration/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ import "fmt"

// GRPCConfig is a configuration of GRPC server.
// It carries the listen address and port, plus the maximum message size.
type GRPCConfig struct {
Address string `config:"address"`
Port uint16 `config:"port"`
Address string `config:"address"`
Port uint16 `config:"port"`
// MaxMsgSize is the maximum message size in bytes, read from the
// max_message_size setting.
MaxMsgSize int `config:"max_message_size"`
}

// DefaultGRPCConfig creates a default server configuration: localhost:6789
// with a 100MB maximum message size.
func DefaultGRPCConfig() *GRPCConfig {
return &GRPCConfig{
Address: "localhost",
Port: 6789,
Address: "localhost",
Port: 6789,
MaxMsgSize: 1024 * 1024 * 100, // 100MB; the gRPC default of 4MB is insufficient for diagnostics
}
}

Expand Down
40 changes: 30 additions & 10 deletions internal/pkg/agent/control/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"sync"
"time"

"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/internal/pkg/agent/control"
"github.com/elastic/elastic-agent/internal/pkg/agent/control/cproto"
)
Expand Down Expand Up @@ -152,21 +154,29 @@ type Client interface {

// client manages the state and communication to the Elastic Agent.
type client struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
client cproto.ElasticAgentControlClient
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
client cproto.ElasticAgentControlClient
// grpcConfig is handed to dialContext when Connect is called.
grpcConfig *configuration.GRPCConfig
}

// New creates a client connection to Elastic Agent.
// New creates a client for the Elastic Agent using the default gRPC
// configuration (configuration.DefaultGRPCConfig). No connection is opened
// here; call Connect to dial the agent.
func New() Client {
return &client{}
return NewWithConfig(configuration.DefaultGRPCConfig())
}

// NewWithConfig creates a client for the Elastic Agent that uses grpcConfig
// when Connect dials the agent. No connection is opened here.
//
// A nil grpcConfig falls back to configuration.DefaultGRPCConfig(): the dial
// step reads grpcConfig.MaxMsgSize and would otherwise panic on a nil pointer.
func NewWithConfig(grpcConfig *configuration.GRPCConfig) Client {
	if grpcConfig == nil {
		grpcConfig = configuration.DefaultGRPCConfig()
	}
	return &client{
		grpcConfig: grpcConfig,
	}
}

// Connect connects to the running Elastic Agent.
func (c *client) Connect(ctx context.Context) error {
c.ctx, c.cancel = context.WithCancel(ctx)
conn, err := dialContext(ctx)
conn, err := dialContext(ctx, c.grpcConfig)
if err != nil {
return err
}
Expand Down Expand Up @@ -316,13 +326,22 @@ func (c *client) DiagnosticUnits(ctx context.Context, units ...DiagnosticUnitReq
})
}

resp, err := c.client.DiagnosticUnits(ctx, &cproto.DiagnosticUnitsRequest{Units: reqs})
respStream, err := c.client.DiagnosticUnits(ctx, &cproto.DiagnosticUnitsRequest{Units: reqs})
if err != nil {
return nil, err
}

results := make([]DiagnosticUnitResult, 0, len(resp.Units))
for _, u := range resp.Units {
results := make([]DiagnosticUnitResult, 0)
for {
var u *cproto.DiagnosticUnitResponse
u, err = respStream.Recv()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return nil, fmt.Errorf("failed to retrieve unit diagnostics: %w", err)
}

files := make([]DiagnosticFileResult, 0, len(u.Results))
for _, f := range u.Results {
files = append(files, DiagnosticFileResult{
Expand All @@ -346,5 +365,6 @@ func (c *client) DiagnosticUnits(ctx context.Context, units ...DiagnosticUnitReq
Results: files,
})
}

return results, nil
}
12 changes: 10 additions & 2 deletions internal/pkg/agent/control/client/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,21 @@ import (
"net"
"strings"

"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/internal/pkg/agent/control"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

// dialContext opens a gRPC client connection to the control address with the
// "unix://" prefix stripped, using the package's custom dialer and no
// transport security.
func dialContext(ctx context.Context) (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, strings.TrimPrefix(control.Address(), "unix://"), grpc.WithInsecure(), grpc.WithContextDialer(dialer))
func dialContext(ctx context.Context, grpcConfig *configuration.GRPCConfig) (*grpc.ClientConn, error) {
return grpc.DialContext(
ctx,
strings.TrimPrefix(control.Address(), "unix://"),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(dialer),
// Apply the configured limit to every received message; grpcConfig must be non-nil.
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(grpcConfig.MaxMsgSize)),
)
}

func dialer(ctx context.Context, addr string) (net.Conn, error) {
Expand Down
12 changes: 10 additions & 2 deletions internal/pkg/agent/control/client/dial_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,21 @@ import (
"net"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/elastic/elastic-agent-libs/api/npipe"
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/internal/pkg/agent/control"
)

// dialContext opens a gRPC client connection to control.Address() (passed
// through unchanged), using the package's custom dialer and no transport
// security.
func dialContext(ctx context.Context) (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, control.Address(), grpc.WithInsecure(), grpc.WithContextDialer(dialer))
func dialContext(ctx context.Context, grpcConfig *configuration.GRPCConfig) (*grpc.ClientConn, error) {
return grpc.DialContext(
ctx,
control.Address(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(dialer),
// Apply the configured limit to every received message; grpcConfig must be non-nil.
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(grpcConfig.MaxMsgSize)),
)
}

func dialer(ctx context.Context, addr string) (net.Conn, error) {
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/agent/control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ import (
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent/internal/pkg/agent/configuration"
"github.com/elastic/elastic-agent/internal/pkg/agent/control/client"
"github.com/elastic/elastic-agent/internal/pkg/agent/control/server"
"github.com/elastic/elastic-agent/internal/pkg/release"
"github.com/elastic/elastic-agent/pkg/core/logger"
)

func TestServerClient_Version(t *testing.T) {
srv := server.New(newErrorLogger(t), nil, nil, apmtest.DiscardTracer, nil)
srv := server.New(newErrorLogger(t), nil, nil, apmtest.DiscardTracer, nil, configuration.DefaultGRPCConfig())
err := srv.Start()
require.NoError(t, err)
defer srv.Stop()
Expand Down
25 changes: 12 additions & 13 deletions internal/pkg/agent/control/cproto/control.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading