Skip to content

Commit

Permalink
Add OpAMP supervisor skeleton
Browse files Browse the repository at this point in the history
  • Loading branch information
evan-bradley committed Jun 13, 2023
1 parent 9f854da commit 583d06f
Show file tree
Hide file tree
Showing 18 changed files with 1,539 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

cmd/configschema/ @open-telemetry/collector-contrib-approvers @mx-psi @dmitryax @pmcollins
cmd/mdatagen/ @open-telemetry/collector-contrib-approvers @dmitryax
cmd/opampsupervisor/ @open-telemetry/collector-contrib-approvers @evan-bradley @atoulme @tigrannajaryan
cmd/otelcontribcol/ @open-telemetry/collector-contrib-approvers
cmd/oteltestbedcol/ @open-telemetry/collector-contrib-approvers
cmd/telemetrygen/ @open-telemetry/collector-contrib-approvers @mx-psi @codeboten
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ body:
# Start Collector components list
- cmd/configschema
- cmd/mdatagen
- cmd/opampsupervisor
- cmd/otelcontribcol
- cmd/oteltestbedcol
- cmd/telemetrygen
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/feature_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ body:
# Start Collector components list
- cmd/configschema
- cmd/mdatagen
- cmd/opampsupervisor
- cmd/otelcontribcol
- cmd/oteltestbedcol
- cmd/telemetrygen
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/other.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ body:
# Start Collector components list
- cmd/configschema
- cmd/mdatagen
- cmd/opampsupervisor
- cmd/otelcontribcol
- cmd/oteltestbedcol
- cmd/telemetrygen
Expand Down
2 changes: 2 additions & 0 deletions cmd/opampsupervisor/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
effective.yaml
agent.log
1 change: 1 addition & 0 deletions cmd/opampsupervisor/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
24 changes: 24 additions & 0 deletions cmd/opampsupervisor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# OpAMP Supervisor for the OpenTelemetry Collector

This is an implementation of an OpAMP Supervisor that runs a Collector instance using configuration provided by an OpAMP server. This implementation
follows the design specified in this [Google Doc](https://docs.google.com/document/d/1KtH5atZQUs9Achbce6LiOaJxLbksNJenvgvyKLsJrkc/edit).
The design is still undergoing changes, and as such this implementation may change as well.

## Experimenting with the supervisor

The supervisor is currently undergoing heavy development and is not ready for any serious use. However, if you would like to test it, you can follow the steps below:

1. Download the [opamp-go](https://github.com/open-telemetry/opamp-go) repository, and run the OpAMP example server in the `internal/examples/server` directory.
2. From the Collector contrib repository root, build the Collector:

```shell
make otelcontribcol
```

3. From the `cmd/opampsupervisor` directory, run the supervisor, replacing `<OS>` with your platform:

```shell
go run . --config testdata/supervisor_<OS>.yaml
```

4. The supervisor should connect to the OpAMP server and start a Collector instance.
28 changes: 28 additions & 0 deletions cmd/opampsupervisor/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor

go 1.19

require (
github.com/cenkalti/backoff/v4 v4.2.0
github.com/knadh/koanf v1.5.0
github.com/oklog/ulid/v2 v2.1.0
github.com/open-telemetry/opamp-go v0.6.0
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.13
go.uber.org/atomic v1.7.0
go.uber.org/zap v1.17.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/testify v1.8.2 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/sys v0.5.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
414 changes: 414 additions & 0 deletions cmd/opampsupervisor/go.sum

Large diffs are not rendered by default.

43 changes: 43 additions & 0 deletions cmd/opampsupervisor/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"flag"
"os"
"os/signal"

"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor"
"go.uber.org/zap"
)

// main parses the command line, creates the supervisor from the configuration
// file given by the --config flag, and then blocks until the process receives
// an interrupt signal, at which point the supervisor is shut down.
// The process exits with a non-zero status if the logger or the supervisor
// cannot be created.
func main() {
	configFlag := flag.String("config", "", "Path to a supervisor configuration file")
	flag.Parse()

	logger, err := zap.NewDevelopment()
	if err != nil {
		// No logger is available yet, so report the failure on stderr directly.
		_, _ = os.Stderr.WriteString("cannot create logger: " + err.Error() + "\n")
		os.Exit(-1)
	}

	// Named sup rather than supervisor so the local variable does not shadow
	// the imported supervisor package.
	sup, err := supervisor.NewSupervisor(logger, *configFlag)
	if err != nil {
		logger.Error(err.Error())
		os.Exit(-1)
	}

	// Buffer of one so a signal delivered before the receive below is not lost.
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)
	<-interrupt
	sup.Shutdown()
}
192 changes: 192 additions & 0 deletions cmd/opampsupervisor/supervisor/commander/commander.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package commander

import (
"context"
"errors"
"fmt"
"os"
"os/exec"
"syscall"
"time"

"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/config"
)

// Commander can start/stop/restart the Agent executable and also watch for a signal
// for the Agent process to finish.
type Commander struct {
// logger receives debug and error messages about the Agent process lifecycle.
logger *zap.Logger
// cfg is the Agent configuration; cfg.Executable is the program that Start runs.
cfg *config.Agent
// args are the command-line arguments passed to the executable on every Start.
args []string
// cmd is the most recently started Agent command; nil until Start is called.
cmd *exec.Cmd
// doneCh is the channel returned by Done. Start replaces it with a new
// channel with a buffer of one, and watch sends on it once the process exits.
doneCh chan struct{}
// waitCh is recreated by Start and closed by watch after the process exits;
// Stop blocks on it until the process has terminated.
waitCh chan struct{}
// running is 1 after a successful Start and 0 once the process has exited.
running *atomic.Int64
}

// NewCommander creates a Commander for the Agent executable named in cfg.
// args are passed to the executable each time it is started.
// An error is returned when cfg.Executable is empty.
func NewCommander(logger *zap.Logger, cfg *config.Agent, args ...string) (*Commander, error) {
	if len(cfg.Executable) == 0 {
		return nil, errors.New("agent.executable config option must be specified")
	}

	cmdr := new(Commander)
	cmdr.logger = logger
	cmdr.cfg = cfg
	cmdr.args = args
	// The Agent is not running until Start is called.
	cmdr.running = atomic.NewInt64(0)
	return cmdr, nil
}

// Start launches the Agent executable with the configured arguments and begins
// watching the process in a background goroutine.
// The Agent's stdout and stderr are written to the file agent.log, resolved
// relative to the current working directory; the file is created, or truncated
// if it already exists, on every call.
// The process is created with exec.CommandContext using ctx.
// Start returns an error if the log file cannot be created or the process
// cannot be started.
func (c *Commander) Start(ctx context.Context) error {
	c.logger.Debug("Starting agent", zap.String("agent", c.cfg.Executable))

	logFilePath := "agent.log"
	logFile, err := os.Create(logFilePath)
	if err != nil {
		return fmt.Errorf("cannot create %s: %w", logFilePath, err)
	}
	// The child process is connected directly to the file when it is started,
	// so this process's handle is no longer needed once cmd.Start has returned.
	// Closing it here avoids leaking one descriptor per (re)start and on the
	// failure path below.
	defer func() {
		if closeErr := logFile.Close(); closeErr != nil {
			c.logger.Warn("Cannot close agent log file",
				zap.String("file", logFilePath), zap.Error(closeErr))
		}
	}()

	c.cmd = exec.CommandContext(ctx, c.cfg.Executable, c.args...) // #nosec G204

	// Capture standard output and standard error.
	c.cmd.Stdout = logFile
	c.cmd.Stderr = logFile

	// doneCh is buffered so watch can signal completion even when nobody is
	// receiving from Done.
	c.doneCh = make(chan struct{}, 1)
	c.waitCh = make(chan struct{})

	if err := c.cmd.Start(); err != nil {
		return err
	}

	c.logger.Debug("Agent process started", zap.Int("pid", c.cmd.Process.Pid))
	c.running.Store(1)

	go c.watch()

	return nil
}

// Restart stops the Agent process and then starts it again.
// If stopping fails, the error is returned and the Agent is not started.
func (c *Commander) Restart(ctx context.Context) error {
	stopErr := c.Stop(ctx)
	if stopErr != nil {
		return stopErr
	}
	return c.Start(ctx)
}

// watch blocks until the Agent process exits, then marks the Commander as not
// running, signals the Done channel, and closes waitCh to release Stop.
// It is run in its own goroutine by Start.
func (c *Commander) watch() {
	err := c.cmd.Wait()

	// cmd.Wait returns an exec.ExitError when the Collector exits unsuccessfully or stops
	// after receiving a signal. The Commander caller will handle these cases, so we filter
	// them out here.
	var exitErr *exec.ExitError
	if err != nil && !errors.As(err, &exitErr) {
		c.logger.Error("An error occurred while watching the agent process", zap.Error(err))
	}

	// Clear the running flag before signalling, so that anyone woken up by the
	// Done channel already observes IsRunning() == false.
	c.running.Store(0)
	c.doneCh <- struct{}{}
	close(c.waitCh)
}

// Done returns the channel on which a value is sent once the Agent process has
// finished. The channel is created by Start, so the result is nil before the
// first Start and a different channel after each subsequent Start.
func (c *Commander) Done() <-chan struct{} {
	var done <-chan struct{} = c.doneCh
	return done
}

// Pid returns the PID of the Agent process, or 0 if the Agent has not been
// started.
func (c *Commander) Pid() int {
	if c.cmd != nil && c.cmd.Process != nil {
		return c.cmd.Process.Pid
	}
	return 0
}

// ExitCode returns the exit code of the Agent process, or 0 if the Agent has
// not been started or no exit state has been recorded for it yet.
func (c *Commander) ExitCode() int {
	if c.cmd != nil && c.cmd.ProcessState != nil {
		return c.cmd.ProcessState.ExitCode()
	}
	return 0
}

// IsRunning reports whether the Agent process is currently marked as running.
func (c *Commander) IsRunning() bool {
	state := c.running.Load()
	return state != 0
}

// Stop the Agent process. Sends SIGTERM to the process and waits for up to 10
// seconds, or until ctx is done, for it to exit; if the process has not
// finished by then it is killed forcibly with SIGKILL.
// Returns after the process has terminated. Stop returns nil when the Agent
// was never started or has already exited, and returns an error when a signal
// cannot be delivered to a process that is still alive.
func (c *Commander) Stop(ctx context.Context) error {
	if c.cmd == nil || c.cmd.Process == nil {
		// Not started, nothing to do.
		return nil
	}

	// Upper bound on how long the process is given to exit after SIGTERM.
	const gracePeriod = 10 * time.Second

	pid := c.cmd.Process.Pid
	c.logger.Debug("Stopping agent process", zap.Int("pid", pid))

	// Gracefully signal process to stop.
	if err := c.cmd.Process.Signal(syscall.SIGTERM); err != nil {
		if !errors.Is(err, os.ErrProcessDone) {
			return err
		}
		// The process has already exited (for example it crashed), so there
		// is nothing left to stop. Wait for watch to finish its bookkeeping
		// so that a following Start does not race with it.
		<-c.waitCh
		c.running.Store(0)
		return nil
	}

	// Wait in this goroutine rather than a helper goroutine, so that the
	// outcome of the SIGKILL below is never shared across goroutines.
	timer := time.NewTimer(gracePeriod)
	defer timer.Stop()

	select {
	case <-c.waitCh:
		// Process is successfully finished.
		c.logger.Debug("Agent process successfully stopped.", zap.Int("pid", pid))
		c.running.Store(0)
		return nil
	case <-ctx.Done():
	case <-timer.C:
	}

	// Time is out. Kill the process.
	c.logger.Debug(
		"Agent process is not responding to SIGTERM. Sending SIGKILL to kill forcedly.",
		zap.Int("pid", pid))
	if err := c.cmd.Process.Signal(syscall.SIGKILL); err != nil && !errors.Is(err, os.ErrProcessDone) {
		// The process could not be killed, so waiting for it to exit could
		// block forever; report the failure instead.
		return err
	}

	// Wait for process to terminate.
	<-c.waitCh

	c.running.Store(0)

	return nil
}
29 changes: 29 additions & 0 deletions cmd/opampsupervisor/supervisor/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package config

// Supervisor is the Supervisor config file format.
type Supervisor struct {
// Server holds the OpAMP server settings.
// NOTE(review): presumably the server the supervisor connects to — confirm
// against the code that consumes this config.
Server *OpAMPServer
// Agent holds the settings for the Agent process.
Agent *Agent
}

// OpAMPServer is the OpAMP server section of the Supervisor config.
type OpAMPServer struct {
// Endpoint identifies the OpAMP server.
// NOTE(review): presumably a URL; the expected scheme and format are not
// visible here — confirm against the code that uses it.
Endpoint string
}

// Agent is the Agent section of the Supervisor config.
type Agent struct {
// Executable is the Agent program to run. The commander package rejects an
// empty value and passes this string to exec.CommandContext as the command.
Executable string
}
Loading

0 comments on commit 583d06f

Please sign in to comment.