From fcefa2d70a5c9ae216cccb8b298be796bd2cd957 Mon Sep 17 00:00:00 2001 From: Dakota Paasman <122491662+dpaasman00@users.noreply.github.com> Date: Fri, 4 Oct 2024 09:12:26 -0400 Subject: [PATCH] [cmd/opampsupervisor] Add passthrough logging for collector (#35474) **Description:** Allow collector logs to pass through to stdout instead of strictly being sent to a file. If configured to do so, the supervisor will capture collector output and log it using its logger. This way, the supervisor should be configured to log to stdout if running in a containerized env. This PR follows closely with this [PR](https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/35468). Right now the supervisor exclusively logs to stdout, but under the assumption that it can be configured to log elsewhere, this change uses the supervisor logger rather than setting the collector's `exec.Cmd` to log to stdout and stderr. **Link to tracking Issue:** Closes #35473 **Testing:** I built a docker image of the supervisor and collector and ran it using the `containerized` param. This is a sample of what the output looked like: ``` 2024-09-30T12:42:47.472Z DEBUG commander/commander.go:73 Starting agent {"agent": "/collector/observiq-otel-collector"} 2024-09-30T12:42:47.472Z DEBUG commander/commander.go:163 Agent process started {"pid": 11} 2024-09-30T12:42:47.525Z INFO collector commander/commander.go:156 2024-09-30T12:42:47.525Z info service@v0.108.1/service.go:178 Setting up own telemetry... 2024-09-30T12:42:47.525Z INFO collector commander/commander.go:156 2024-09-30T12:42:47.525Z info service@v0.108.1/telemetry.go:98 Serving metrics {"address": ":8888", "metrics level": "Normal"} 2024-09-30T12:42:47.525Z INFO collector commander/commander.go:156 2024-09-30T12:42:47.525Z info service@v0.108.1/service.go:263 Starting observiq-otel-collector... 
{"Version": "v2.0.0", "NumCPU": 12} 2024-09-30T12:42:47.525Z INFO collector commander/commander.go:156 2024-09-30T12:42:47.525Z info extensions/extensions.go:38 Starting extensions... 2024-09-30T12:42:47.525Z INFO collector commander/commander.go:156 2024-09-30T12:42:47.525Z info extensions/extensions.go:41 Extension is starting... {"kind": "extension", "name": "opamp"} 2024-09-30T12:42:47.525Z INFO collector commander/commander.go:156 2024-09-30T12:42:47.525Z info extensions/extensions.go:58 Extension started. {"kind": "extension", "name": "opamp"} 2024-09-30T12:42:47.526Z INFO collector commander/commander.go:156 2024-09-30T12:42:47.526Z info service@v0.108.1/service.go:289 Everything is ready. Begin running and processing data. 2024-09-30T12:42:47.526Z INFO collector commander/commander.go:156 2024-09-30T12:42:47.526Z info localhostgate/featuregate.go:63 The default endpoints for all servers in components have changed to use localhost instead of 0.0.0.0. Disable the feature gate to temporarily revert to the previous default. {"feature gate ID": "component.UseLocalHostAsDefaultHost"} 2024-09-30T12:42:47.528Z DEBUG commander/commander.go:220 Stopping agent process {"pid": 11} ``` **Documentation:** --- ...ervisor-passthrough-collector-logging.yaml | 27 ++++++ cmd/opampsupervisor/e2e_test.go | 20 +++-- .../supervisor/commander/commander.go | 83 ++++++++++++++++--- .../supervisor/config/config.go | 2 + .../supervisor/supervisor_logging.yaml | 1 + 5 files changed, 111 insertions(+), 22 deletions(-) create mode 100644 .chloggen/supervisor-passthrough-collector-logging.yaml diff --git a/.chloggen/supervisor-passthrough-collector-logging.yaml b/.chloggen/supervisor-passthrough-collector-logging.yaml new file mode 100644 index 000000000000..073398436f17 --- /dev/null +++ b/.chloggen/supervisor-passthrough-collector-logging.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. 
+ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: opampsupervisor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Allow collector logs to passthrough to supervisor output to facilitate running in a containerized environment. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35473] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [] diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index f48d9e25a8c3..1e5cd4e36507 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -1359,10 +1359,12 @@ func TestSupervisorStopsAgentProcessWithEmptyConfigMap(t *testing.T) { } type LogEntry struct { - Level string `json:"level"` + Level string `json:"level"` + Logger string `json:"logger"` } -func TestSupervisorInfoLoggingLevel(t *testing.T) { +func TestSupervisorLogging(t *testing.T) { + // Tests that supervisor only logs at Info level and above && that collector logs passthrough and are present in supervisor log file if runtime.GOOS == "windows" { t.Skip("Zap does not close the log file and Windows disallows removing files that are still opened.") } @@ -1420,12 +1422,8 @@ func TestSupervisorInfoLoggingLevel(t *testing.T) { require.NoError(t, err) scanner := bufio.NewScanner(logFile) - check := false + seenCollectorLog := false for scanner.Scan() { - if !check { - check = true - } - line := scanner.Bytes() var log LogEntry err := json.Unmarshal(line, &log) @@ -1434,9 +1432,13 @@ func TestSupervisorInfoLoggingLevel(t *testing.T) { level, err := zapcore.ParseLevel(log.Level) require.NoError(t, err) require.GreaterOrEqual(t, level, zapcore.InfoLevel) + + if log.Logger == "collector" { + seenCollectorLog = true + } } - // verify at least 1 log was read - require.True(t, check) + // verify a collector log was read + require.True(t, seenCollectorLog) require.NoError(t, logFile.Close()) } diff --git a/cmd/opampsupervisor/supervisor/commander/commander.go b/cmd/opampsupervisor/supervisor/commander/commander.go index c9891d7b7d60..1a2e6973f0ec 100644 --- a/cmd/opampsupervisor/supervisor/commander/commander.go +++ b/cmd/opampsupervisor/supervisor/commander/commander.go @@ -4,6 +4,7 @@ package commander import ( + "bufio" "context" "errors" "fmt" @@ -71,15 +72,32 @@ func (c *Commander) Start(ctx context.Context) error { 
c.logger.Debug("Starting agent", zap.String("agent", c.cfg.Executable)) + c.cmd = exec.CommandContext(ctx, c.cfg.Executable, c.args...) // #nosec G204 + c.cmd.SysProcAttr = sysProcAttrs() + + // PassthroughLogging changes how collector start up happens + if c.cfg.PassthroughLogs { + return c.startWithPassthroughLogging() + } + return c.startNormal() +} + +func (c *Commander) Restart(ctx context.Context) error { + c.logger.Debug("Restarting agent", zap.String("agent", c.cfg.Executable)) + if err := c.Stop(ctx); err != nil { + return err + } + + return c.Start(ctx) +} + +func (c *Commander) startNormal() error { logFilePath := filepath.Join(c.logsDir, "agent.log") stdoutFile, err := os.Create(logFilePath) if err != nil { return fmt.Errorf("cannot create %s: %w", logFilePath, err) } - c.cmd = exec.CommandContext(ctx, c.cfg.Executable, c.args...) // #nosec G204 - c.cmd.SysProcAttr = sysProcAttrs() - // Capture standard output and standard error. // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21072 c.cmd.Stdout = stdoutFile @@ -87,29 +105,68 @@ func (c *Commander) Start(ctx context.Context) error { if err := c.cmd.Start(); err != nil { stdoutFile.Close() - return err + return fmt.Errorf("startNormal: %w", err) } c.logger.Debug("Agent process started", zap.Int("pid", c.cmd.Process.Pid)) c.running.Store(1) - go c.watch(stdoutFile) + go func() { + defer stdoutFile.Close() + c.watch() + }() return nil } -func (c *Commander) Restart(ctx context.Context) error { - c.logger.Debug("Restarting agent", zap.String("agent", c.cfg.Executable)) - if err := c.Stop(ctx); err != nil { - return err +func (c *Commander) startWithPassthroughLogging() error { + // grab cmd pipes + stdoutPipe, err := c.cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("stdoutPipe: %w", err) + } + stderrPipe, err := c.cmd.StderrPipe() + if err != nil { + return fmt.Errorf("stderrPipe: %w", err) } - return c.Start(ctx) -} + // start agent + if err := c.cmd.Start(); err != nil 
{ + return fmt.Errorf("start: %w", err) + } + c.running.Store(1) -func (c *Commander) watch(stdoutFile *os.File) { - defer stdoutFile.Close() + colLogger := c.logger.Named("collector") + + // capture agent output + go func() { + scanner := bufio.NewScanner(stdoutPipe) + for scanner.Scan() { + line := scanner.Text() + colLogger.Info(line) + } + if err := scanner.Err(); err != nil { + c.logger.Error("Error reading agent stdout: %w", zap.Error(err)) + } + }() + go func() { + scanner := bufio.NewScanner(stderrPipe) + for scanner.Scan() { + line := scanner.Text() + colLogger.Info(line) + } + if err := scanner.Err(); err != nil { + c.logger.Error("Error reading agent stderr: %w", zap.Error(err)) + } + }() + + c.logger.Debug("Agent process started", zap.Int("pid", c.cmd.Process.Pid)) + + go c.watch() + return nil +} +func (c *Commander) watch() { err := c.cmd.Wait() // cmd.Wait returns an exec.ExitError when the Collector exits unsuccessfully or stops diff --git a/cmd/opampsupervisor/supervisor/config/config.go b/cmd/opampsupervisor/supervisor/config/config.go index 7d1275a37a30..5e6049dddbd8 100644 --- a/cmd/opampsupervisor/supervisor/config/config.go +++ b/cmd/opampsupervisor/supervisor/config/config.go @@ -155,6 +155,7 @@ type Agent struct { Description AgentDescription `mapstructure:"description"` BootstrapTimeout time.Duration `mapstructure:"bootstrap_timeout"` HealthCheckPort int `mapstructure:"health_check_port"` + PassthroughLogs bool `mapstructure:"passthrough_logs"` } func (a Agent) Validate() error { @@ -229,6 +230,7 @@ func DefaultSupervisor() Supervisor { Agent: Agent{ OrphanDetectionInterval: 5 * time.Second, BootstrapTimeout: 3 * time.Second, + PassthroughLogs: false, }, Telemetry: Telemetry{ Logs: Logs{ diff --git a/cmd/opampsupervisor/testdata/supervisor/supervisor_logging.yaml b/cmd/opampsupervisor/testdata/supervisor/supervisor_logging.yaml index dc6153dfb86c..161fda828730 100644 --- a/cmd/opampsupervisor/testdata/supervisor/supervisor_logging.yaml +++ 
b/cmd/opampsupervisor/testdata/supervisor/supervisor_logging.yaml @@ -14,6 +14,7 @@ storage: agent: executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}} + passthrough_logs: true telemetry: logs: