Skip to content

Commit

Permalink
feat: Added OpAMP PackageStatuses functionality & basic response to P…
Browse files Browse the repository at this point in the history
…ackagesAvailable (#550)

* Added OpAMP PackageStatuses functionality & basic response to PackagesAvailable

* Add new data model for marshal/unmarshaling OpAMP package statuses.
  • Loading branch information
StefanKurek authored and Corbin Phelps committed Aug 1, 2022
1 parent 8372d82 commit 5e2d17e
Show file tree
Hide file tree
Showing 8 changed files with 1,657 additions and 10 deletions.
231 changes: 231 additions & 0 deletions opamp/mocks/mock_packages_state_provider.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

130 changes: 120 additions & 10 deletions opamp/observiq/observiq_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,17 @@ var (
// Ensure interface is satisfied
var _ opamp.Client = (*Client)(nil)

// mainPackageName is the name for the top level packages for this collector
const mainPackageName = "observiq-otel-collector"

// Client represents a client that is connected to Iris via OpAmp
type Client struct {
opampClient client.OpAMPClient
logger *zap.Logger
ident *identity
configManager opamp.ConfigManager
collector collector.Collector
opampClient client.OpAMPClient
logger *zap.Logger
ident *identity
configManager opamp.ConfigManager
collector collector.Collector
packagesStateProvider types.PackagesStateProvider

currentConfig opamp.Config
}
Expand All @@ -67,12 +71,15 @@ func NewClient(args *NewClientArgs) (opamp.Client, error) {

configManager := NewAgentConfigManager(args.DefaultLogger)

packagesStateProvider := newPackagesStateProvider(clientLogger, "package_statuses.yaml")

observiqClient := &Client{
logger: clientLogger,
ident: newIdentity(clientLogger, args.Config),
configManager: configManager,
collector: args.Collector,
currentConfig: args.Config,
logger: clientLogger,
ident: newIdentity(clientLogger, args.Config),
configManager: configManager,
collector: args.Collector,
currentConfig: args.Config,
packagesStateProvider: packagesStateProvider,
}

// Parse URL to determin scheme
Expand Down Expand Up @@ -157,6 +164,7 @@ func (c *Client) Connect(ctx context.Context) error {
// OnCommandFunc
// SaveRemoteConfigStatusFunc
},
PackagesStateProvider: c.packagesStateProvider,
}

// Start the embedded collector
Expand Down Expand Up @@ -196,6 +204,11 @@ func (c *Client) onMessageFuncHandler(ctx context.Context, msg *types.MessageDat
c.logger.Error("Error while processing Remote Config Change", zap.Error(err))
}
}
if msg.PackagesAvailable != nil {
if err := c.onPackagesAvailableHandler(msg.PackagesAvailable); err != nil {
c.logger.Error("Error while processing Packages Available Change", zap.Error(err))
}
}
}

func (c *Client) onRemoteConfigHandler(ctx context.Context, remoteConfig *protobufs.AgentRemoteConfig) error {
Expand Down Expand Up @@ -229,6 +242,103 @@ func (c *Client) onRemoteConfigHandler(ctx context.Context, remoteConfig *protob
return nil
}

func (c *Client) onPackagesAvailableHandler(packagesAvailable *protobufs.PackagesAvailable) error {
c.logger.Debug("Packages available handler")

// Initialize PackageStatuses that will eventually be sent back to server
curPackageStatuses := &protobufs.PackageStatuses{
ServerProvidedAllPackagesHash: packagesAvailable.GetAllPackagesHash(),
Packages: map[string]*protobufs.PackageStatus{},
}

// Retrieve last known status (this should return with minimal info even on first time)
lastPackageStatuses, err := c.packagesStateProvider.LastReportedStatuses()

// If there is a problem retrieving the last saved PackageStatuses, we will log the error
// but continue on as the only thing missing will be the agent package hash.
if err != nil {
c.logger.Warn("Failed to retrieve last reported package statuses", zap.Error(err))
}

var lastPkgStatusMap map[string]*protobufs.PackageStatus
if lastPackageStatuses != nil {
lastPkgStatusMap = lastPackageStatuses.GetPackages()
}

curPackages := c.createPackageStatusMap(packagesAvailable.GetPackages(), lastPkgStatusMap)
curPackageStatuses.Packages = curPackages

// We may have to modify this in the future to return an error if this causes issues for the actual Update
if err = c.packagesStateProvider.SetLastReportedStatuses(curPackageStatuses); err != nil {
c.logger.Warn("Failed to save last reported package statuses", zap.Error(err))
}

if err = c.opampClient.SetPackageStatuses(curPackageStatuses); err != nil {
return fmt.Errorf("failed to set package statuses: %w", err)
}

return nil
}

func (c *Client) createPackageStatusMap(
pkgAvailMap map[string]*protobufs.PackageAvailable,
lastPkgStatusMap map[string]*protobufs.PackageStatus) map[string]*protobufs.PackageStatus {
pkgStatusMap := map[string]*protobufs.PackageStatus{}

// Loop through all of the available packages sent from the server
for name, availPkg := range pkgAvailMap {
switch name {
// If it's an expected package, return an installing status
case mainPackageName:
var agentHash []byte
if lastPkgStatusMap != nil && lastPkgStatusMap[name] != nil {
if lastPkgStatusMap[name].GetAgentHasVersion() != version.Version() {
c.logger.Debug(fmt.Sprintf(
"Version: %s and last reported package status version: %s differ",
version.Version(),
lastPkgStatusMap[name].GetAgentHasVersion()))
} else {
agentHash = lastPkgStatusMap[name].GetAgentHasHash()
}
}

pkgStatusMap[name] = &protobufs.PackageStatus{
Name: name,
AgentHasVersion: version.Version(),
AgentHasHash: agentHash,
ServerOfferedVersion: availPkg.GetVersion(),
ServerOfferedHash: availPkg.GetHash(),
Status: protobufs.PackageStatus_Installed,
}

if version.Version() == availPkg.GetVersion() {
if agentHash == nil {
pkgStatusMap[name].AgentHasHash = availPkg.GetHash()
}
} else {
if availPkg.GetVersion() != "" {
pkgStatusMap[name].Status = protobufs.PackageStatus_Installing
}
}

if availPkg.GetVersion() != "" && version.Version() != availPkg.GetVersion() {
pkgStatusMap[name].Status = protobufs.PackageStatus_Installing
}
// If it's not an expected package, return a failed status
default:
pkgStatusMap[name] = &protobufs.PackageStatus{
Name: name,
ServerOfferedVersion: availPkg.GetVersion(),
ServerOfferedHash: availPkg.GetHash(),
Status: protobufs.PackageStatus_InstallFailed,
ErrorMessage: fmt.Sprintf("Package %s not supported", name),
}
}
}

return pkgStatusMap
}

func (c *Client) onGetEffectiveConfigHandler(_ context.Context) (*protobufs.EffectiveConfig, error) {
c.logger.Debug("Remote Compose Effective config handler")
return c.configManager.ComposeEffectiveConfig()
Expand Down
Loading

0 comments on commit 5e2d17e

Please sign in to comment.