Skip to content

Commit

Permalink
e2e logging test
Browse files Browse the repository at this point in the history
  • Loading branch information
dpaasman00 committed Oct 3, 2024
1 parent 7d977cf commit 0409490
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 26 deletions.
78 changes: 76 additions & 2 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build e2e

package main

import (
"bufio"
"bytes"
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -40,10 +40,12 @@ import (
"github.com/stretchr/testify/require"
semconv "go.opentelemetry.io/collector/semconv/v1.21.0"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/proto"

"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor"
"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/config"
"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/telemetry"
)

var _ clientTypes.Logger = testLogger{}
Expand Down Expand Up @@ -1354,6 +1356,78 @@ func TestSupervisorStopsAgentProcessWithEmptyConfigMap(t *testing.T) {

}

type LogEntry struct {
Level string `json:"level"`
}

func TestSupervisorInfoLoggingLevel(t *testing.T) {
storageDir := t.TempDir()
remoteCfgFilePath := filepath.Join(storageDir, "last_recv_remote_config.dat")

collectorCfg, hash, _, _ := createSimplePipelineCollectorConf(t)
remoteCfgProto := &protobufs.AgentRemoteConfig{
Config: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
"": {Body: collectorCfg.Bytes()},
},
},
ConfigHash: hash,
}
marshalledRemoteCfg, err := proto.Marshal(remoteCfgProto)
require.NoError(t, err)
require.NoError(t, os.WriteFile(remoteCfgFilePath, marshalledRemoteCfg, 0600))

connected := atomic.Bool{}
server := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{
OnConnectedFunc: func(ctx context.Context, conn types.Connection) {
connected.Store(true)
},
})
defer server.shutdown()

supervisorLogFilePath := filepath.Join(storageDir, "supervisor_log.log")
cfgFile := getSupervisorConfig(t, "logging", map[string]string{
"url": server.addr,
"storage_dir": storageDir,
"log_level": "0",
"log_file": supervisorLogFilePath,
})

cfg, err := config.Load(cfgFile.Name())
require.NoError(t, err)
logger, err := telemetry.NewLogger(cfg.Telemetry.Logs)
require.NoError(t, err)

s, err := supervisor.NewSupervisor(logger, cfg)
require.NoError(t, err)
require.Nil(t, s.Start())

// Start the server and wait for the supervisor to connect
server.start()
waitForSupervisorConnection(server.supervisorConnected, true)
require.True(t, connected.Load(), "Supervisor failed to connect")

s.Shutdown()

// Read from log file checking for Info level logs
logFile, err := os.Open(supervisorLogFilePath)
require.NoError(t, err)
defer logFile.Close()

scanner := bufio.NewScanner(logFile)

for scanner.Scan() {
line := scanner.Bytes()
var log LogEntry
err := json.Unmarshal(line, &log)
require.NoError(t, err)

level, err := zapcore.ParseLevel(log.Level)
require.NoError(t, err)
require.GreaterOrEqual(t, level, zapcore.InfoLevel)
}
}

func findRandomPort() (int, error) {
l, err := net.Listen("tcp", "localhost:0")

Expand Down
22 changes: 0 additions & 22 deletions cmd/opampsupervisor/supervisor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,25 +237,3 @@ func DefaultSupervisor() Supervisor {
},
}
}

func LoadConfig(configFile string) (Supervisor, error) {
if configFile == "" {
return Supervisor{}, errors.New("path to config file cannot be empty")
}

k := koanf.New("::")
if err := k.Load(file.Provider(configFile), yaml.Parser()); err != nil {
return Supervisor{}, err
}

decodeConf := koanf.UnmarshalConf{
Tag: "mapstructure",
}

cfg := DefaultSupervisor()
if err := k.UnmarshalWithConf("", &cfg, decodeConf); err != nil {
return Supervisor{}, fmt.Errorf("cannot parse %v: %w", configFile, err)
}

return cfg, nil
}
3 changes: 1 addition & 2 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ type Supervisor struct {

func NewSupervisor(logger *zap.Logger, cfg config.Supervisor) (*Supervisor, error) {
s := &Supervisor{
config: cfg,
logger: logger,
pidProvider: defaultPIDProvider{},
hasNewConfig: make(chan struct{}, 1),
Expand Down Expand Up @@ -200,7 +199,7 @@ func (s *Supervisor) Start() error {

s.agentHealthCheckEndpoint = fmt.Sprintf("localhost:%d", healthCheckPort)

s.logger.Debug("Supervisor starting",
s.logger.Info("Supervisor starting",
zap.String("id", s.persistentState.InstanceID.String()))

err = s.loadAndWriteInitialMergedConfig()
Expand Down
21 changes: 21 additions & 0 deletions cmd/opampsupervisor/testdata/supervisor/supervisor_logging.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
server:
endpoint: ws://{{.url}}/v1/opamp

capabilities:
reports_effective_config: true
reports_own_metrics: true
reports_health: true
accepts_remote_config: true
reports_remote_config: true
accepts_restart_command: true

storage:
directory: '{{.storage_dir}}'

agent:
executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}}

telemetry:
logs:
level: {{.log_level}} # info level logs
output_paths: ['{{.log_file}}']

0 comments on commit 0409490

Please sign in to comment.