Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opampsupervisor]: Skip executing the collector if no config is provided #35430

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .chloggen/feat_opampsupervisor-start-stop-empty-confmap.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Skip executing the collector if no config is provided

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33680]
107 changes: 93 additions & 14 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,20 +345,10 @@ func TestSupervisorStartsWithNoOpAMPServer(t *testing.T) {
require.Nil(t, s.Start())
defer s.Shutdown()

// Verify the collector is running by checking the metrics endpoint
require.Eventually(t, func() bool {
resp, err := http.DefaultClient.Get("http://localhost:12345")
if err != nil {
t.Logf("Failed agent healthcheck request: %s", err)
return false
}
require.NoError(t, resp.Body.Close())
if resp.StatusCode >= 300 || resp.StatusCode < 200 {
t.Logf("Got non-2xx status code: %d", resp.StatusCode)
return false
}
return true
}, 3*time.Second, 100*time.Millisecond)
// Verify the collector is not running after 250 ms by checking the healthcheck endpoint
time.Sleep(250 * time.Millisecond)
_, err := http.DefaultClient.Get("http://localhost:12345")
require.ErrorContains(t, err, "connection refused")

// Start the server and wait for the supervisor to connect
server.start()
Expand Down Expand Up @@ -1262,6 +1252,95 @@ func TestSupervisorWritesAgentFilesToStorageDir(t *testing.T) {
require.FileExists(t, filepath.Join(storageDir, "effective.yaml"))
}

// TestSupervisorStopsAgentProcessWithEmptyConfigMap verifies that when the
// OpAMP server pushes a remote config whose config map is empty, the
// supervisor stops the collector process and does not restart it.
func TestSupervisorStopsAgentProcessWithEmptyConfigMap(t *testing.T) {
	agentCfgChan := make(chan string, 1)
	server := newOpAMPServer(
		t,
		defaultConnectingHandler,
		server.ConnectionCallbacksStruct{
			OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
				if message.EffectiveConfig != nil {
					config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
					if config != nil {
						// Non-blocking send: only the most recent report matters,
						// and the handler must never stall the OpAMP connection.
						select {
						case agentCfgChan <- string(config.Body):
						default:
						}
					}
				}

				return &protobufs.ServerToAgent{}
			},
		})

	s := newSupervisor(t, "healthcheck_port", map[string]string{
		"url":              server.addr,
		"healthcheck_port": "12345",
	})

	require.Nil(t, s.Start())
	defer s.Shutdown()

	waitForSupervisorConnection(server.supervisorConnected, true)

	cfg, hash, _, _ := createSimplePipelineCollectorConf(t)

	// Push a working pipeline config so the collector starts up.
	server.sendToSupervisor(&protobufs.ServerToAgent{
		RemoteConfig: &protobufs.AgentRemoteConfig{
			Config: &protobufs.AgentConfigMap{
				ConfigMap: map[string]*protobufs.AgentConfigFile{
					"": {Body: cfg.Bytes()},
				},
			},
			ConfigHash: hash,
		},
	})

	select {
	case <-agentCfgChan:
	case <-time.After(1 * time.Second):
		require.FailNow(t, "timed out waiting for agent to report its initial config")
	}

	// Use health check endpoint to determine if the collector is actually running
	require.Eventually(t, func() bool {
		resp, err := http.DefaultClient.Get("http://localhost:12345")
		if err != nil {
			t.Logf("Failed agent healthcheck request: %s", err)
			return false
		}
		require.NoError(t, resp.Body.Close())
		if resp.StatusCode >= 300 || resp.StatusCode < 200 {
			t.Logf("Got non-2xx status code: %d", resp.StatusCode)
			return false
		}
		return true
	}, 3*time.Second, 100*time.Millisecond)

	// Send empty config
	emptyHash := sha256.Sum256([]byte{})
	server.sendToSupervisor(&protobufs.ServerToAgent{
		RemoteConfig: &protobufs.AgentRemoteConfig{
			Config: &protobufs.AgentConfigMap{
				ConfigMap: map[string]*protobufs.AgentConfigFile{},
			},
			ConfigHash: emptyHash[:],
		},
	})

	select {
	case <-agentCfgChan:
	case <-time.After(1 * time.Second):
		require.FailNow(t, "timed out waiting for agent to report its noop config")
	}

	// Verify the collector is not running after 250 ms by checking the healthcheck endpoint
	time.Sleep(250 * time.Millisecond)
	resp, err := http.DefaultClient.Get("http://localhost:12345")
	if err == nil {
		// Avoid leaking the body if the collector unexpectedly answered;
		// the assertion below will fail the test in that case.
		require.NoError(t, resp.Body.Close())
	}
	require.ErrorContains(t, err, "connection refused")
}

func findRandomPort() (int, error) {
l, err := net.Listen("tcp", "localhost:0")

Expand Down
20 changes: 12 additions & 8 deletions cmd/opampsupervisor/specification/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ agent:
### Operation When OpAMP Server is Unavailable

When the supervisor cannot connect to the OpAMP server, the collector will
be run with the last known configuration, or with a "noop" configuration
if no previous configuration is persisted. The supervisor will continually
attempt to reconnect to the OpAMP server with exponential backoff.
be run with the last known configuration if a previous configuration is persisted.
If no previous configuration has been persisted, the collector does not run.
The supervisor will continually attempt to reconnect to the OpAMP server with exponential backoff.

### Executing Collector

Expand Down Expand Up @@ -204,6 +204,10 @@ Configuration*](https://github.com/open-telemetry/opamp-spec/blob/main/specifica
from the OpAMP Backend, merges it with an optional local config file and
writes it to the Collector's config file, then restarts the Collector.

If the remote configuration from the OpAMP Backend contains an empty config map,
the collector will be stopped and will not be run again until a non-empty config map
is received from the OpAMP Backend.

In the future once config file watching is implemented the Collector can
reload the config without the need for the Supervisor to restart the
Collector process.
Expand Down Expand Up @@ -244,13 +248,13 @@ configuration.
To overcome this problem, the Supervisor starts the Collector with a
"noop" configuration that collects nothing but allows the opamp
extension to be started. The "noop" configuration is a single pipeline
with an OTLP receiver that listens on a random port and a debug
exporter, and the opamp extension. The purpose of the "noop"
configuration is to make sure the Collector starts and the opamp
extension communicates with the Supervisor.
with a nop receiver, a nop exporter, and the opamp extension.
The purpose of the "noop" configuration is to make sure the Collector starts
and the opamp extension communicates with the Supervisor. The Collector is stopped
after the AgentDescription is received from the Collector.

Once the initial Collector launch is successful and the remote
configuration is received by the Supervisor the Supervisor restarts the
configuration is received by the Supervisor, the Supervisor starts the
Collector with the new config. The new config is also cached by the
Supervisor in a local file, so that subsequent restarts no longer need
to start the Collector using the "noop" configuration. Caching of the
Expand Down
46 changes: 35 additions & 11 deletions cmd/opampsupervisor/supervisor/supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@ const (

const maxBufferedCustomMessages = 10

// configState captures the supervisor's view of the config most recently
// assembled for the Collector.
type configState struct {
	// Supervisor-assembled config to be given to the Collector.
	mergedConfig string
	// true if the server provided configmap was empty
	emptyConfigMap bool
}

// equal reports whether c and other describe the same config state: identical
// merged config text and the same empty-configmap flag.
func (c *configState) equal(other *configState) bool {
	if c.mergedConfig != other.mergedConfig {
		return false
	}
	return c.emptyConfigMap == other.emptyConfigMap
}

// Supervisor implements supervising of OpenTelemetry Collector and uses OpAMPClient
// to work with an OpAMP Server.
type Supervisor struct {
Expand Down Expand Up @@ -106,8 +117,8 @@ type Supervisor struct {
// will listen on for health check requests from the Supervisor.
agentHealthCheckEndpoint string

// Supervisor-assembled config to be given to the Collector.
mergedConfig *atomic.Value
// Internal config state for agent use. See the configState struct for more details.
cfgState *atomic.Value

// Final effective config of the Collector.
effectiveConfig *atomic.Value
Expand Down Expand Up @@ -142,7 +153,7 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) {
pidProvider: defaultPIDProvider{},
hasNewConfig: make(chan struct{}, 1),
agentConfigOwnMetricsSection: &atomic.Value{},
mergedConfig: &atomic.Value{},
cfgState: &atomic.Value{},
effectiveConfig: &atomic.Value{},
agentDescription: &atomic.Value{},
doneChan: make(chan struct{}),
Expand Down Expand Up @@ -802,8 +813,8 @@ func (s *Supervisor) loadAndWriteInitialMergedConfig() error {
}

// write the initial merged config to disk
cfg := s.mergedConfig.Load().(string)
if err := os.WriteFile(s.agentConfigFilePath(), []byte(cfg), 0600); err != nil {
cfgState := s.cfgState.Load().(*configState)
if err := os.WriteFile(s.agentConfigFilePath(), []byte(cfgState.mergedConfig), 0600); err != nil {
s.logger.Error("Failed to write agent config.", zap.Error(err))
}

Expand All @@ -815,9 +826,11 @@ func (s *Supervisor) loadAndWriteInitialMergedConfig() error {
func (s *Supervisor) createEffectiveConfigMsg() *protobufs.EffectiveConfig {
cfgStr, ok := s.effectiveConfig.Load().(string)
if !ok {
cfgStr, ok = s.mergedConfig.Load().(string)
cfgState, ok := s.cfgState.Load().(*configState)
if !ok {
cfgStr = ""
} else {
cfgStr = cfgState.mergedConfig
}
}

Expand Down Expand Up @@ -948,11 +961,16 @@ func (s *Supervisor) composeMergedConfig(config *protobufs.AgentRemoteConfig) (c
}

// Check if supervisor's merged config is changed.
newMergedConfig := string(newMergedConfigBytes)

newConfigState := &configState{
mergedConfig: string(newMergedConfigBytes),
emptyConfigMap: len(config.GetConfig().GetConfigMap()) == 0,
BinaryFissionGames marked this conversation as resolved.
Show resolved Hide resolved
}

configChanged = false

oldConfig := s.mergedConfig.Swap(newMergedConfig)
if oldConfig == nil || oldConfig.(string) != newMergedConfig {
oldConfigState := s.cfgState.Swap(newConfigState)
if oldConfigState == nil || !oldConfigState.(*configState).equal(newConfigState) {
s.logger.Debug("Merged config changed.")
configChanged = true
}
Expand All @@ -972,6 +990,12 @@ func (s *Supervisor) handleRestartCommand() error {
}

func (s *Supervisor) startAgent() {
if s.cfgState.Load().(*configState).emptyConfigMap {
// Don't start the agent if there is no config to run
s.logger.Info("No config present, not starting agent.")
return
}

err := s.commander.Start(context.Background())
if err != nil {
s.logger.Error("Cannot start the agent", zap.Error(err))
Expand Down Expand Up @@ -1113,14 +1137,14 @@ func (s *Supervisor) runAgentProcess() {

func (s *Supervisor) stopAgentApplyConfig() {
s.logger.Debug("Stopping the agent to apply new config")
cfg := s.mergedConfig.Load().(string)
cfgState := s.cfgState.Load().(*configState)
err := s.commander.Stop(context.Background())

if err != nil {
s.logger.Error("Could not stop agent process", zap.Error(err))
}

if err := os.WriteFile(s.agentConfigFilePath(), []byte(cfg), 0600); err != nil {
if err := os.WriteFile(s.agentConfigFilePath(), []byte(cfgState.mergedConfig), 0600); err != nil {
s.logger.Error("Failed to write agent config.", zap.Error(err))
}
}
Expand Down
21 changes: 11 additions & 10 deletions cmd/opampsupervisor/supervisor/supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func Test_composeEffectiveConfig(t *testing.T) {
pidProvider: staticPIDProvider(1234),
hasNewConfig: make(chan struct{}, 1),
agentConfigOwnMetricsSection: &atomic.Value{},
mergedConfig: &atomic.Value{},
cfgState: &atomic.Value{},
agentHealthCheckEndpoint: "localhost:8000",
}

Expand Down Expand Up @@ -87,7 +87,7 @@ service:
expectedConfig = bytes.ReplaceAll(expectedConfig, []byte("\r\n"), []byte("\n"))

require.True(t, configChanged)
require.Equal(t, string(expectedConfig), s.mergedConfig.Load().(string))
require.Equal(t, string(expectedConfig), s.cfgState.Load().(*configState).mergedConfig)
}

func Test_onMessage(t *testing.T) {
Expand All @@ -104,7 +104,7 @@ func Test_onMessage(t *testing.T) {
persistentState: &persistentState{InstanceID: initialID},
agentDescription: agentDesc,
agentConfigOwnMetricsSection: &atomic.Value{},
mergedConfig: &atomic.Value{},
cfgState: &atomic.Value{},
effectiveConfig: &atomic.Value{},
agentHealthCheckEndpoint: "localhost:8000",
opampClient: client.NewHTTP(newLoggerFromZap(zap.NewNop())),
Expand Down Expand Up @@ -133,7 +133,7 @@ func Test_onMessage(t *testing.T) {
persistentState: &persistentState{InstanceID: testUUID},
agentDescription: agentDesc,
agentConfigOwnMetricsSection: &atomic.Value{},
mergedConfig: &atomic.Value{},
cfgState: &atomic.Value{},
effectiveConfig: &atomic.Value{},
agentHealthCheckEndpoint: "localhost:8000",
}
Expand Down Expand Up @@ -179,7 +179,7 @@ func Test_onMessage(t *testing.T) {
hasNewConfig: make(chan struct{}, 1),
persistentState: &persistentState{InstanceID: testUUID},
agentConfigOwnMetricsSection: &atomic.Value{},
mergedConfig: &atomic.Value{},
cfgState: &atomic.Value{},
effectiveConfig: &atomic.Value{},
agentConn: agentConnAtomic,
agentHealthCheckEndpoint: "localhost:8000",
Expand Down Expand Up @@ -260,7 +260,7 @@ func Test_onMessage(t *testing.T) {
persistentState: &persistentState{InstanceID: initialID},
agentDescription: agentDesc,
agentConfigOwnMetricsSection: &atomic.Value{},
mergedConfig: &atomic.Value{},
cfgState: &atomic.Value{},
effectiveConfig: &atomic.Value{},
agentHealthCheckEndpoint: "localhost:8000",
opampClient: client.NewHTTP(newLoggerFromZap(zap.NewNop())),
Expand All @@ -286,10 +286,11 @@ func Test_onMessage(t *testing.T) {
})

require.Equal(t, newID, s.persistentState.InstanceID)
t.Log(s.mergedConfig.Load())
require.Contains(t, s.mergedConfig.Load(), "prometheus/own_metrics")
require.Contains(t, s.mergedConfig.Load(), newID.String())
require.Contains(t, s.mergedConfig.Load(), "runtime.type: test")
t.Log(s.cfgState.Load())
mergedCfg := s.cfgState.Load().(*configState).mergedConfig
require.Contains(t, mergedCfg, "prometheus/own_metrics")
require.Contains(t, mergedCfg, newID.String())
require.Contains(t, mergedCfg, "runtime.type: test")
})
}

Expand Down