diff --git a/.chloggen/supervisor-passthrough-collector-logging.yaml b/.chloggen/supervisor-passthrough-collector-logging.yaml new file mode 100644 index 000000000000..073398436f17 --- /dev/null +++ b/.chloggen/supervisor-passthrough-collector-logging.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: opampsupervisor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Allow collector logs to passthrough to supervisor output to facilitate running in a containerized environment. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35473] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index f48d9e25a8c3..1e5cd4e36507 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -1359,10 +1359,12 @@ func TestSupervisorStopsAgentProcessWithEmptyConfigMap(t *testing.T) { } type LogEntry struct { - Level string `json:"level"` + Level string `json:"level"` + Logger string `json:"logger"` } -func TestSupervisorInfoLoggingLevel(t *testing.T) { +func TestSupervisorLogging(t *testing.T) { + // Tests that supervisor only logs at Info level and above && that collector logs passthrough and are present in supervisor log file if runtime.GOOS == "windows" { t.Skip("Zap does not close the log file and Windows disallows removing files that are still opened.") } @@ -1420,12 +1422,8 @@ func TestSupervisorInfoLoggingLevel(t *testing.T) { require.NoError(t, err) scanner := bufio.NewScanner(logFile) - check := false + seenCollectorLog := false for scanner.Scan() { - if !check { - check = true - } - line := scanner.Bytes() var log LogEntry err := json.Unmarshal(line, &log) @@ -1434,9 +1432,13 @@ func TestSupervisorInfoLoggingLevel(t *testing.T) { level, err := zapcore.ParseLevel(log.Level) require.NoError(t, err) require.GreaterOrEqual(t, level, zapcore.InfoLevel) + + if log.Logger == "collector" { + seenCollectorLog = true + } } - // verify at least 1 log was read - require.True(t, check) + // verify a collector log was read + require.True(t, seenCollectorLog) require.NoError(t, logFile.Close()) } diff --git a/cmd/opampsupervisor/supervisor/commander/commander.go b/cmd/opampsupervisor/supervisor/commander/commander.go index c9891d7b7d60..1a2e6973f0ec 100644 --- a/cmd/opampsupervisor/supervisor/commander/commander.go +++ b/cmd/opampsupervisor/supervisor/commander/commander.go @@ -4,6 +4,7 @@ package commander import ( + "bufio" "context" "errors" "fmt" @@ -71,15 +72,32 @@ func (c *Commander) Start(ctx context.Context) error { c.logger.Debug("Starting agent", zap.String("agent", c.cfg.Executable)) + c.cmd = exec.CommandContext(ctx, c.cfg.Executable, c.args...) // #nosec G204 + c.cmd.SysProcAttr = sysProcAttrs() + + // PassthroughLogging changes how collector start up happens + if c.cfg.PassthroughLogs { + return c.startWithPassthroughLogging() + } + return c.startNormal() +} + +func (c *Commander) Restart(ctx context.Context) error { + c.logger.Debug("Restarting agent", zap.String("agent", c.cfg.Executable)) + if err := c.Stop(ctx); err != nil { + return err + } + + return c.Start(ctx) +} + +func (c *Commander) startNormal() error { logFilePath := filepath.Join(c.logsDir, "agent.log") stdoutFile, err := os.Create(logFilePath) if err != nil { return fmt.Errorf("cannot create %s: %w", logFilePath, err) } - c.cmd = exec.CommandContext(ctx, c.cfg.Executable, c.args...) // #nosec G204 - c.cmd.SysProcAttr = sysProcAttrs() - // Capture standard output and standard error. // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21072 c.cmd.Stdout = stdoutFile @@ -87,29 +105,68 @@ func (c *Commander) Start(ctx context.Context) error { if err := c.cmd.Start(); err != nil { stdoutFile.Close() - return err + return fmt.Errorf("startNormal: %w", err) } c.logger.Debug("Agent process started", zap.Int("pid", c.cmd.Process.Pid)) c.running.Store(1) - go c.watch(stdoutFile) + go func() { + defer stdoutFile.Close() + c.watch() + }() return nil } -func (c *Commander) Restart(ctx context.Context) error { - c.logger.Debug("Restarting agent", zap.String("agent", c.cfg.Executable)) - if err := c.Stop(ctx); err != nil { - return err +func (c *Commander) startWithPassthroughLogging() error { + // grab cmd pipes + stdoutPipe, err := c.cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("stdoutPipe: %w", err) + } + stderrPipe, err := c.cmd.StderrPipe() + if err != nil { + return fmt.Errorf("stderrPipe: %w", err) } - return c.Start(ctx) -} + // start agent + if err := c.cmd.Start(); err != nil { + return fmt.Errorf("start: %w", err) + } + c.running.Store(1) -func (c *Commander) watch(stdoutFile *os.File) { - defer stdoutFile.Close() + colLogger := c.logger.Named("collector") + + // capture agent output + go func() { + scanner := bufio.NewScanner(stdoutPipe) + for scanner.Scan() { + line := scanner.Text() + colLogger.Info(line) + } + if err := scanner.Err(); err != nil { + c.logger.Error("Error reading agent stdout: %w", zap.Error(err)) + } + }() + go func() { + scanner := bufio.NewScanner(stderrPipe) + for scanner.Scan() { + line := scanner.Text() + colLogger.Info(line) + } + if err := scanner.Err(); err != nil { + c.logger.Error("Error reading agent stderr: %w", zap.Error(err)) + } + }() + + c.logger.Debug("Agent process started", zap.Int("pid", c.cmd.Process.Pid)) + + go c.watch() + return nil +} +func (c *Commander) watch() { err := c.cmd.Wait() // cmd.Wait returns an exec.ExitError when the Collector exits unsuccessfully or stops diff --git a/cmd/opampsupervisor/supervisor/config/config.go b/cmd/opampsupervisor/supervisor/config/config.go index 7d1275a37a30..5e6049dddbd8 100644 --- a/cmd/opampsupervisor/supervisor/config/config.go +++ b/cmd/opampsupervisor/supervisor/config/config.go @@ -155,6 +155,7 @@ type Agent struct { Description AgentDescription `mapstructure:"description"` BootstrapTimeout time.Duration `mapstructure:"bootstrap_timeout"` HealthCheckPort int `mapstructure:"health_check_port"` + PassthroughLogs bool `mapstructure:"passthrough_logs"` } func (a Agent) Validate() error { @@ -229,6 +230,7 @@ func DefaultSupervisor() Supervisor { Agent: Agent{ OrphanDetectionInterval: 5 * time.Second, BootstrapTimeout: 3 * time.Second, + PassthroughLogs: false, }, Telemetry: Telemetry{ Logs: Logs{ diff --git a/cmd/opampsupervisor/testdata/supervisor/supervisor_logging.yaml b/cmd/opampsupervisor/testdata/supervisor/supervisor_logging.yaml index dc6153dfb86c..161fda828730 100644 --- a/cmd/opampsupervisor/testdata/supervisor/supervisor_logging.yaml +++ b/cmd/opampsupervisor/testdata/supervisor/supervisor_logging.yaml @@ -14,6 +14,7 @@ storage: agent: executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}} + passthrough_logs: true telemetry: logs: