[new-component] Operator OpAMP Bridge Service #1339

Merged · 17 commits · Jan 11, 2023
Changes from 4 commits
16 changes: 16 additions & 0 deletions .chloggen/1318-remote-config-service.yaml
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. operator, target allocator, github action)
component: Remote Configuration

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Introducing the Operator Remote Configuration service, which allows a user to manage OpenTelemetry Collector CRDs via OpAMP

# One or more tracking issues related to the change
issues: [1318]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
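
As context for this changelog entry: the agent code added later in this diff expects the OpAMP server to deliver per-Collector configurations in an AgentConfigMap keyed by name/namespace, with YAML bodies. The following is a rough, hypothetical sketch of how such a payload might be assembled on a server, using the github.com/open-telemetry/opamp-go/protobufs types the agent consumes; the collector name, namespace, and configuration shown are invented for illustration and are not part of this PR.

package main

import (
	"crypto/sha256"
	"fmt"

	"github.com/open-telemetry/opamp-go/protobufs"
)

func main() {
	// Hypothetical OpenTelemetryCollector configuration body; "simplest" and
	// "default" below are invented name/namespace values.
	body := []byte(`receivers:
  otlp:
    protocols:
      grpc:
exporters:
  logging:
service:
  pipelines:
    traces:
      receivers: [otlp]
      exporters: [logging]
`)
	hash := sha256.Sum256(body)

	remoteConfig := &protobufs.AgentRemoteConfig{
		Config: &protobufs.AgentConfigMap{
			ConfigMap: map[string]*protobufs.AgentConfigFile{
				// Keys are "name/namespace", matching getNameAndNamespace in agent.go below.
				"simplest/default": {
					Body:        body,
					ContentType: "yaml",
				},
			},
		},
		ConfigHash: hash[:],
	}

	fmt.Printf("prepared remote config with %d entries, hash %x\n",
		len(remoteConfig.Config.ConfigMap), remoteConfig.ConfigHash)
}

The server side is out of scope for this PR; the bridge only consumes payloads of this shape in applyRemoteConfig below.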
1 change: 1 addition & 0 deletions .gitignore
@@ -7,6 +7,7 @@
*.dylib
bin
vendor
.DS_Store

# Test binary, build with `go test -c`
*.test
254 changes: 254 additions & 0 deletions cmd/remote-configuration/agent/agent.go
@@ -0,0 +1,254 @@
package agent

import (
"bytes"
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/oklog/ulid/v2"
"go.uber.org/multierr"
"gopkg.in/yaml.v3"

"github.com/open-telemetry/opamp-go/client"
"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"

"github.com/open-telemetry/opentelemetry-operator/cmd/remote-configuration/config"
"github.com/open-telemetry/opentelemetry-operator/cmd/remote-configuration/metrics"
"github.com/open-telemetry/opentelemetry-operator/cmd/remote-configuration/operator"
)

type Agent struct {
logger types.Logger

// A set of the applied object keys (name/namespace)
appliedKeys map[string]bool
startTime uint64
lastHash []byte

instanceId ulid.ULID
agentDescription *protobufs.AgentDescription
remoteConfigStatus *protobufs.RemoteConfigStatus

opampClient client.OpAMPClient
metricReporter *metrics.MetricReporter
config config.Config
applier operator.ConfigApplier
}

func NewAgent(logger types.Logger, applier operator.ConfigApplier, config config.Config, opampClient client.OpAMPClient) *Agent {
agent := &Agent{
config: config,
applier: applier,
logger: logger,
appliedKeys: map[string]bool{},
instanceId: config.GetNewInstanceId(),
agentDescription: config.GetDescription(),
opampClient: opampClient,
}

agent.logger.Debugf("Agent starting, id=%v, type=%s, version=%s.",
agent.instanceId.String(), config.GetAgentType(), config.GetAgentVersion())

return agent
}

func (agent *Agent) getHealth() *protobufs.AgentHealth {
return &protobufs.AgentHealth{
Healthy: true,
StartTimeUnixNano: agent.startTime,
LastError: "",
}
}

func (agent *Agent) onConnect() {
agent.logger.Debugf("Connected to the server.")
}

func (agent *Agent) onConnectFailed(err error) {
agent.logger.Errorf("Failed to connect to the server: %v", err)
}

func (agent *Agent) onError(err *protobufs.ServerErrorResponse) {
agent.logger.Errorf("Server returned an error response: %v", err.ErrorMessage)
}

func (agent *Agent) saveRemoteConfigStatus(_ context.Context, status *protobufs.RemoteConfigStatus) {
agent.remoteConfigStatus = status
}

func (agent *Agent) Start() error {
agent.startTime = uint64(time.Now().UnixNano())
settings := types.StartSettings{
OpAMPServerURL: agent.config.Endpoint,
InstanceUid: agent.instanceId.String(),
Callbacks: types.CallbacksStruct{
OnConnectFunc: agent.onConnect,
OnConnectFailedFunc: agent.onConnectFailed,
OnErrorFunc: agent.onError,
SaveRemoteConfigStatusFunc: agent.saveRemoteConfigStatus,
GetEffectiveConfigFunc: agent.getEffectiveConfig,
OnMessageFunc: agent.onMessage,
},
RemoteConfigStatus: agent.remoteConfigStatus,
PackagesStateProvider: nil,
Capabilities: agent.config.GetCapabilities(),
}
err := agent.opampClient.SetAgentDescription(agent.agentDescription)
if err != nil {
return err
}
err = agent.opampClient.SetHealth(agent.getHealth())
if err != nil {
return err
}

agent.logger.Debugf("Starting OpAMP client...")

err = agent.opampClient.Start(context.Background(), settings)
if err != nil {
return err
}

agent.logger.Debugf("OpAMP Client started.")

return nil
}

func (agent *Agent) updateAgentIdentity(instanceId ulid.ULID) {
agent.logger.Debugf("Agent identify is being changed from id=%v to id=%v",
agent.instanceId.String(),
instanceId.String())
agent.instanceId = instanceId

if agent.metricReporter != nil {
// TODO: reinit or update meter (possibly using a single function to update all own connection settings
// or with having a common resource factory or so)
}
}

func (agent *Agent) getNameAndNamespace(key string) (string, string, error) {
s := strings.Split(key, "/")
// We expect map keys to be of the form name/namespace
if len(s) != 2 {
return "", "", errors.New("invalid key")
}
return s[0], s[1], nil
}

func (agent *Agent) makeKeyFromNameNamespace(name string, namespace string) string {
return fmt.Sprintf("%s/%s", name, namespace)
}

func (agent *Agent) getEffectiveConfig(ctx context.Context) (*protobufs.EffectiveConfig, error) {
instances, err := agent.applier.ListInstances()
if err != nil {
agent.logger.Errorf("couldn't list instances", err)
return nil, err
}
instanceMap := map[string]*protobufs.AgentConfigFile{}
for _, instance := range instances {
marshalled, err := yaml.Marshal(instance)
if err != nil {
agent.logger.Errorf("couldn't marshal collector configuration", err)
return nil, err
}
mapKey := agent.makeKeyFromNameNamespace(instance.GetName(), instance.GetNamespace())
instanceMap[mapKey] = &protobufs.AgentConfigFile{
Body: marshalled,
ContentType: "yaml",
}
}
return &protobufs.EffectiveConfig{
ConfigMap: &protobufs.AgentConfigMap{
ConfigMap: instanceMap,
},
}, nil
}

func (agent *Agent) initMeter(settings *protobufs.TelemetryConnectionSettings) {
reporter, err := metrics.NewMetricReporter(agent.logger, settings, agent.config.GetAgentType(), agent.config.GetAgentVersion(), agent.instanceId)
if err != nil {
agent.logger.Errorf("Cannot collect metrics: %v", err)
return
}

if agent.metricReporter != nil {
agent.metricReporter.Shutdown()
}
agent.metricReporter = reporter
}

// applyRemoteConfig applies each file in the remote configuration to the corresponding
// OpenTelemetry Collector instance, keyed by name/namespace, and returns the resulting status.
// INVARIANT: the caller must verify that config isn't nil _and_ that the configuration has changed since the last call.
func (agent *Agent) applyRemoteConfig(config *protobufs.AgentRemoteConfig) (*protobufs.RemoteConfigStatus, error) {
var multiErr error
for key, file := range config.Config.GetConfigMap() {
if len(key) == 0 || len(file.Body) == 0 {
continue
Contributor: Why do we continue? If there's a partial success, the config hash is no longer accurate to what's being provided. Is that okay?

Contributor (author): This was my choice, to say that a partial success is okay; I agree the config hash would not be accurate, though. Should we just throw away everything in this case?

Contributor: I'm not sure. 😕 It seems difficult to make this function an atomic operation.

(A sketch of an all-or-nothing variant appears at the end of this file's diff, below.)

}
name, namespace, err := agent.getNameAndNamespace(key)
if err != nil {
multiErr = multierr.Append(multiErr, err)
continue
}
err = agent.applier.Apply(name, namespace, file)
if err != nil {
multiErr = multierr.Append(multiErr, err)
continue
}
agent.appliedKeys[key] = true
}
if multiErr != nil {
return &protobufs.RemoteConfigStatus{
LastRemoteConfigHash: config.GetConfigHash(),
Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED,
ErrorMessage: multiErr.Error(),
}, multiErr
}
agent.lastHash = config.ConfigHash
return &protobufs.RemoteConfigStatus{
LastRemoteConfigHash: config.ConfigHash,
Status: protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED,
}, nil
}

func (agent *Agent) Shutdown() {
agent.logger.Debugf("Agent shutting down...")
if agent.opampClient != nil {
_ = agent.opampClient.Stop(context.Background())
}
}

func (agent *Agent) onMessage(ctx context.Context, msg *types.MessageData) {

// If we received remote configuration, and it's not the same as the previously applied one
if msg.RemoteConfig != nil && !bytes.Equal(agent.lastHash, msg.RemoteConfig.GetConfigHash()) {
status, err := agent.applyRemoteConfig(msg.RemoteConfig)
if err != nil {
agent.logger.Errorf(err.Error())
}
if setErr := agent.opampClient.SetRemoteConfigStatus(status); setErr != nil {
agent.logger.Errorf(setErr.Error())
return
}
err = agent.opampClient.UpdateEffectiveConfig(ctx)
if err != nil {
agent.logger.Errorf(err.Error())
}
}

// TODO: figure out why metrics aren't working
//if msg.OwnMetricsConnSettings != nil {
// agent.initMeter(msg.OwnMetricsConnSettings)
//}

if msg.AgentIdentification != nil {
newInstanceId, err := ulid.Parse(msg.AgentIdentification.NewInstanceUid)
if err != nil {
agent.logger.Errorf(err.Error())
return
}
agent.updateAgentIdentity(newInstanceId)
}
}
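
Following up on the review thread in applyRemoteConfig about partial success and the config hash: below is a minimal, illustrative sketch (not part of this PR) of what an all-or-nothing variant could look like. It assumes a hypothetical Validate (dry-run) method on the applier; the ConfigApplier as used in this file only exposes Apply and ListInstances, so this is just one possible direction rather than the PR's approach.

// applyRemoteConfigAtomic is a hypothetical all-or-nothing variant of
// applyRemoteConfig: nothing is applied unless every entry validates first,
// so the reported hash corresponds to a fully applied configuration.
// NOTE: applier.Validate is an assumed dry-run hook that does not exist in
// the ConfigApplier interface used in this PR.
func (agent *Agent) applyRemoteConfigAtomic(config *protobufs.AgentRemoteConfig) (*protobufs.RemoteConfigStatus, error) {
	type entry struct {
		key, name, namespace string
		file                 *protobufs.AgentConfigFile
	}
	var entries []entry
	var multiErr error

	// Pass 1: parse every key and dry-run every config file without touching the cluster.
	for key, file := range config.Config.GetConfigMap() {
		if len(key) == 0 || len(file.Body) == 0 {
			continue
		}
		name, namespace, err := agent.getNameAndNamespace(key)
		if err != nil {
			multiErr = multierr.Append(multiErr, err)
			continue
		}
		if err := agent.applier.Validate(name, namespace, file); err != nil { // hypothetical dry-run
			multiErr = multierr.Append(multiErr, err)
			continue
		}
		entries = append(entries, entry{key: key, name: name, namespace: namespace, file: file})
	}

	// Reject the whole payload if anything failed validation; nothing was applied yet.
	if multiErr != nil {
		return &protobufs.RemoteConfigStatus{
			LastRemoteConfigHash: config.GetConfigHash(),
			Status:               protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED,
			ErrorMessage:         multiErr.Error(),
		}, multiErr
	}

	// Pass 2: apply everything that validated.
	for _, e := range entries {
		if err := agent.applier.Apply(e.name, e.namespace, e.file); err != nil {
			multiErr = multierr.Append(multiErr, err)
			continue
		}
		agent.appliedKeys[e.key] = true
	}
	if multiErr != nil {
		return &protobufs.RemoteConfigStatus{
			LastRemoteConfigHash: config.GetConfigHash(),
			Status:               protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED,
			ErrorMessage:         multiErr.Error(),
		}, multiErr
	}

	agent.lastHash = config.GetConfigHash()
	return &protobufs.RemoteConfigStatus{
		LastRemoteConfigHash: config.GetConfigHash(),
		Status:               protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED,
	}, nil
}

Even with a dry-run pass, individual applies can still fail against the cluster, so a sketch like this narrows, but does not fully close, the window the reviewers were discussing.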