From 15c4be3c327fc9298d3373d6887d0b8abb1ec1ba Mon Sep 17 00:00:00 2001 From: odubajDT Date: Thu, 8 Aug 2024 11:02:08 +0200 Subject: [PATCH] [chore]: split NewSupervisor logic into two separate functions Signed-off-by: odubajDT --- cmd/opampsupervisor/e2e_test.go | 34 ++++++++++++++++++++ cmd/opampsupervisor/main.go | 7 ++++ cmd/opampsupervisor/supervisor/supervisor.go | 20 +++++++----- 3 files changed, 53 insertions(+), 8 deletions(-) diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index 690298d19e8c..c02ab346f455 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -210,6 +210,8 @@ func TestSupervisorStartsCollectorWithRemoteConfig(t *testing.T) { }) s := newSupervisor(t, "basic", map[string]string{"url": server.addr}) + + require.Nil(t, s.Start()) defer s.Shutdown() waitForSupervisorConnection(server.supervisorConnected, true) @@ -281,6 +283,8 @@ func TestSupervisorStartsCollectorWithNoOpAMPServer(t *testing.T) { "url": server.addr, "storage_dir": storageDir, }) + + require.Nil(t, s.Start()) defer s.Shutdown() // Verify the collector runs eventually by pinging the healthcheck extension @@ -332,6 +336,8 @@ func TestSupervisorStartsWithNoOpAMPServer(t *testing.T) { s := newSupervisor(t, "basic", map[string]string{ "url": server.addr, }) + + require.Nil(t, s.Start()) defer s.Shutdown() // Verify the collector is running by checking the metrics endpoint @@ -416,6 +422,8 @@ func TestSupervisorRestartsCollectorAfterBadConfig(t *testing.T) { }) s := newSupervisor(t, "basic", map[string]string{"url": server.addr}) + + require.Nil(t, s.Start()) defer s.Shutdown() waitForSupervisorConnection(server.supervisorConnected, true) @@ -492,6 +500,8 @@ func TestSupervisorConfiguresCapabilities(t *testing.T) { }) s := newSupervisor(t, "nocap", map[string]string{"url": server.addr}) + + require.Nil(t, s.Start()) defer s.Shutdown() waitForSupervisorConnection(server.supervisorConnected, true) @@ -547,6 +557,8 @@ func TestSupervisorBootstrapsCollector(t *testing.T) { }) s := newSupervisor(t, "nocap", map[string]string{"url": server.addr}) + + require.Nil(t, s.Start()) defer s.Shutdown() waitForSupervisorConnection(server.supervisorConnected, true) @@ -594,6 +606,8 @@ func TestSupervisorReportsEffectiveConfig(t *testing.T) { }) s := newSupervisor(t, "basic", map[string]string{"url": server.addr}) + + require.Nil(t, s.Start()) defer s.Shutdown() waitForSupervisorConnection(server.supervisorConnected, true) @@ -701,6 +715,8 @@ func TestSupervisorAgentDescriptionConfigApplies(t *testing.T) { }) s := newSupervisor(t, "agent_description", map[string]string{"url": server.addr}) + + require.Nil(t, s.Start()) defer s.Shutdown() waitForSupervisorConnection(server.supervisorConnected, true) @@ -852,6 +868,8 @@ func TestSupervisorRestartCommand(t *testing.T) { }) s := newSupervisor(t, "basic", map[string]string{"url": server.addr}) + + require.Nil(t, s.Start()) defer s.Shutdown() waitForSupervisorConnection(server.supervisorConnected, true) @@ -918,6 +936,8 @@ func TestSupervisorOpAMPConnectionSettings(t *testing.T) { server.ConnectionCallbacksStruct{}) s := newSupervisor(t, "accepts_conn", map[string]string{"url": initialServer.addr}) + + require.Nil(t, s.Start()) defer s.Shutdown() waitForSupervisorConnection(initialServer.supervisorConnected, true) @@ -978,6 +998,8 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) { s := newSupervisor(t, "persistence", map[string]string{"url": initialServer.addr, "storage_dir": tempDir}) + require.Nil(t, s.Start()) + waitForSupervisorConnection(initialServer.supervisorConnected, true) cfg, hash, _, _ := createSimplePipelineCollectorConf(t) @@ -1020,6 +1042,8 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) { defer newServer.shutdown() s1 := newSupervisor(t, "persistence", map[string]string{"url": newServer.addr, "storage_dir": tempDir}) + + require.Nil(t, s1.Start()) defer s1.Shutdown() waitForSupervisorConnection(newServer.supervisorConnected, true) @@ -1066,6 +1090,8 @@ func TestSupervisorPersistsInstanceID(t *testing.T) { "storage_dir": storageDir, }) + require.Nil(t, s.Start()) + waitForSupervisorConnection(server.supervisorConnected, true) t.Logf("Supervisor connected") @@ -1095,6 +1121,8 @@ func TestSupervisorPersistsInstanceID(t *testing.T) { "url": server.addr, "storage_dir": storageDir, }) + + require.Nil(t, s.Start()) defer s.Shutdown() waitForSupervisorConnection(server.supervisorConnected, true) @@ -1148,6 +1176,8 @@ func TestSupervisorPersistsNewInstanceID(t *testing.T) { "storage_dir": storageDir, }) + require.Nil(t, s.Start()) + waitForSupervisorConnection(server.supervisorConnected, true) t.Logf("Supervisor connected") @@ -1175,6 +1205,8 @@ func TestSupervisorPersistsNewInstanceID(t *testing.T) { "url": server.addr, "storage_dir": storageDir, }) + + require.Nil(t, s.Start()) defer s.Shutdown() waitForSupervisorConnection(server.supervisorConnected, true) @@ -1206,6 +1238,8 @@ func TestSupervisorWritesAgentFilesToStorageDir(t *testing.T) { "storage_dir": storageDir, }) + require.Nil(t, s.Start()) + waitForSupervisorConnection(server.supervisorConnected, true) t.Logf("Supervisor connected") diff --git a/cmd/opampsupervisor/main.go b/cmd/opampsupervisor/main.go index b93579842c09..3e58ee4730a7 100644 --- a/cmd/opampsupervisor/main.go +++ b/cmd/opampsupervisor/main.go @@ -26,6 +26,13 @@ func main() { return } + err = supervisor.Start() + if err != nil { + logger.Error(err.Error()) + os.Exit(-1) + return + } + interrupt := make(chan os.Signal, 1) signal.Notify(interrupt, os.Interrupt) <-interrupt diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index b5a8eb70da10..1d7150f6b2dd 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -165,34 +165,38 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { return nil, fmt.Errorf("error creating storage dir: %w", err) } + return s, nil +} + +func (s *Supervisor) Start() error { var err error s.persistentState, err = loadOrCreatePersistentState(s.persistentStateFilePath()) if err != nil { - return nil, err + return err } if err = s.getBootstrapInfo(); err != nil { - return nil, fmt.Errorf("could not get bootstrap info from the Collector: %w", err) + return fmt.Errorf("could not get bootstrap info from the Collector: %w", err) } healthCheckPort, err := s.findRandomPort() if err != nil { - return nil, fmt.Errorf("could not find port for health check: %w", err) + return fmt.Errorf("could not find port for health check: %w", err) } s.agentHealthCheckEndpoint = fmt.Sprintf("localhost:%d", healthCheckPort) - logger.Debug("Supervisor starting", + s.logger.Debug("Supervisor starting", zap.String("id", s.persistentState.InstanceID.String())) err = s.loadAndWriteInitialMergedConfig() if err != nil { - return nil, fmt.Errorf("failed loading initial config: %w", err) + return fmt.Errorf("failed loading initial config: %w", err) } if err = s.startOpAMP(); err != nil { - return nil, fmt.Errorf("cannot start OpAMP client: %w", err) + return fmt.Errorf("cannot start OpAMP client: %w", err) } s.commander, err = commander.NewCommander( @@ -202,7 +206,7 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { "--config", s.agentConfigFilePath(), ) if err != nil { - return nil, err + return err } s.startHealthCheckTicker() @@ -219,7 +223,7 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { s.forwardCustomMessagesToServerLoop() }() - return s, nil + return nil } func (s *Supervisor) createTemplates() error {