Skip to content

Commit

Permalink
Created packagestate module (#579)
Browse files Browse the repository at this point in the history
* Broke package status objects into their own file

Signed-off-by: Corbin Phelps <[email protected]>

* Updated main module to reference packagestate module

Signed-off-by: Corbin Phelps <[email protected]>

* Fixed licsense check for new module

Signed-off-by: Corbin Phelps <[email protected]>

* Created interface and mocks for package state manager

Signed-off-by: Corbin Phelps <[email protected]>

* Changed PackageStateProvider to use interface of StateManager

Signed-off-by: Corbin Phelps <[email protected]>

* Fixed up linux test for package state manager

Signed-off-by: Corbin Phelps <[email protected]>
  • Loading branch information
Corbin Phelps committed Jul 29, 2022
1 parent 50e59b5 commit b0ccec5
Show file tree
Hide file tree
Showing 15 changed files with 748 additions and 290 deletions.
7 changes: 7 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,10 @@ updates:
commit-message:
prefix: "deps"
include: "scope"
- package-ecosystem: "gomod"
directory: "/packagestate"
schedule:
interval: "weekly"
commit-message:
prefix: "deps"
include: "scope"
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/GoogleCloudPlatform/opentelemetry-operations-collector v0.0.3-0.20220711143229-08f2752ed367
github.com/google/uuid v1.3.0
github.com/observiq/observiq-otel-collector/exporter/googlecloudexporter v1.3.0
github.com/observiq/observiq-otel-collector/packagestate v0.0.0
github.com/observiq/observiq-otel-collector/processor/resourceattributetransposerprocessor v1.3.0
github.com/observiq/observiq-otel-collector/receiver/pluginreceiver v1.3.0
github.com/open-telemetry/opamp-go v0.2.0
Expand Down Expand Up @@ -470,3 +471,5 @@ replace github.com/observiq/observiq-otel-collector/processor/resourceattributet
replace github.com/observiq/observiq-otel-collector/receiver/pluginreceiver => ./receiver/pluginreceiver

replace github.com/observiq/observiq-otel-collector/exporter/googlecloudexporter => ./exporter/googlecloudexporter

replace github.com/observiq/observiq-otel-collector/packagestate => ./packagestate
1 change: 1 addition & 0 deletions license.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ exceptions:
- path: "./processor/resourceattributetransposerprocessor"
- path: "./receiver/pluginreceiver"
- path: "./exporter/googlecloudexporter"
- path: "./packagestate"
22 changes: 10 additions & 12 deletions opamp/observiq/observiq_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/observiq/observiq-otel-collector/collector"
"github.com/observiq/observiq-otel-collector/internal/version"
"github.com/observiq/observiq-otel-collector/opamp"
"github.com/observiq/observiq-otel-collector/packagestate"
"github.com/open-telemetry/opamp-go/client"
"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
Expand All @@ -40,9 +41,6 @@ var (
// Ensure interface is satisfied
var _ opamp.Client = (*Client)(nil)

// mainPackageName is the name for the top level packages for this collector
const mainPackageName = "observiq-otel-collector"

// Client represents a client that is connected to Iris via OpAmp
type Client struct {
opampClient client.OpAMPClient
Expand Down Expand Up @@ -214,7 +212,7 @@ func (c *Client) onConnectHandler() {
return
}

lastMainPackageStatus := lastPackageStatuses.Packages[mainPackageName]
lastMainPackageStatus := lastPackageStatuses.Packages[packagestate.CollectorPackageName]
// If in the middle of an install and we just connected, this is most likely becasue the collector was just spun up fresh by the Updater.
// If the current version matches the server offered version, this implies a good install and so we should set the PackageStatuses and
// send it to the OpAMP Server. If the version does not match, just change the PackageStatues JSON so that the Updater can start rollback.
Expand Down Expand Up @@ -342,7 +340,7 @@ func (c *Client) onPackagesAvailableHandler(packagesAvailable *protobufs.Package
}

// Start update if applicable
collectorDownloadableFile := curPackageFiles[mainPackageName]
collectorDownloadableFile := curPackageFiles[packagestate.CollectorPackageName]
if collectorDownloadableFile != nil {
c.safeSetUpdatingPackage(true)
go c.installPackageFromFile(collectorDownloadableFile, curPackageStatuses)
Expand All @@ -361,7 +359,7 @@ func (c *Client) createPackageMaps(
for name, availPkg := range pkgAvailMap {
switch name {
// If it's an expected package, return an installing status
case mainPackageName:
case packagestate.CollectorPackageName:
var agentHash []byte
if lastPkgStatusMap != nil && lastPkgStatusMap[name] != nil {
if lastPkgStatusMap[name].GetAgentHasVersion() != version.Version() {
Expand Down Expand Up @@ -419,9 +417,9 @@ func (c *Client) createPackageMaps(
func (c *Client) installPackageFromFile(file *protobufs.DownloadableFile, curPackageStatuses *protobufs.PackageStatuses) {
if fileManagerErr := c.downloadableFileManager.FetchAndExtractArchive(file); fileManagerErr != nil {
// Change existing status to show that install failed and get ready to send
curPackageStatuses.Packages[mainPackageName].Status = protobufs.PackageStatus_InstallFailed
curPackageStatuses.Packages[mainPackageName].ErrorMessage =
fmt.Sprintf("Failed to download and verify package %s's downloadable file", mainPackageName)
curPackageStatuses.Packages[packagestate.CollectorPackageName].Status = protobufs.PackageStatus_InstallFailed
curPackageStatuses.Packages[packagestate.CollectorPackageName].ErrorMessage =
fmt.Sprintf("Failed to download and verify package %s's downloadable file", packagestate.CollectorPackageName)

if err := c.packagesStateProvider.SetLastReportedStatuses(curPackageStatuses); err != nil {
c.logger.Error("Failed to save last reported package statuses", zap.Error(err))
Expand Down Expand Up @@ -454,7 +452,7 @@ func (c *Client) attemptFailedInstall(errMsg string) {
return
}

lastMainPackageStatus := lastPackageStatuses.Packages[mainPackageName]
lastMainPackageStatus := lastPackageStatuses.Packages[packagestate.CollectorPackageName]
lastMainPackageStatus.Status = protobufs.PackageStatus_InstallFailed
lastMainPackageStatus.ErrorMessage = errMsg

Expand All @@ -471,12 +469,12 @@ func (c *Client) getMainPackageInstallingLastStatuses() *protobufs.PackageStatus
}

// If we have no info on our main package, nothing else to do
if lastPackageStatuses == nil || lastPackageStatuses.Packages == nil || lastPackageStatuses.Packages[mainPackageName] == nil {
if lastPackageStatuses == nil || lastPackageStatuses.Packages == nil || lastPackageStatuses.Packages[packagestate.CollectorPackageName] == nil {
c.logger.Warn("Failed to retrieve last reported package statuses for main package")
return nil
}

lastMainPackageStatus := lastPackageStatuses.Packages[mainPackageName]
lastMainPackageStatus := lastPackageStatuses.Packages[packagestate.CollectorPackageName]

// If we were not installing before the connection, nothing else to do
if lastMainPackageStatus.Status != protobufs.PackageStatus_Installing {
Expand Down
87 changes: 44 additions & 43 deletions opamp/observiq/observiq_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/observiq/observiq-otel-collector/internal/version"
"github.com/observiq/observiq-otel-collector/opamp"
"github.com/observiq/observiq-otel-collector/opamp/mocks"
"github.com/observiq/observiq-otel-collector/packagestate"
"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -324,8 +325,8 @@ func TestClientConnect(t *testing.T) {
desc: "Problem connecting & not installing",
testFunc: func(*testing.T) {
statuses := map[string]*protobufs.PackageStatus{
mainPackageName: {
Name: mainPackageName,
packagestate.CollectorPackageName: {
Name: packagestate.CollectorPackageName,
AgentHasVersion: version.Version(),
ServerOfferedVersion: version.Version(),
Status: protobufs.PackageStatus_Installed,
Expand Down Expand Up @@ -366,8 +367,8 @@ func TestClientConnect(t *testing.T) {
newHash := []byte("newHash")
newVersion := "99.99.99"
statuses := map[string]*protobufs.PackageStatus{
mainPackageName: {
Name: mainPackageName,
packagestate.CollectorPackageName: {
Name: packagestate.CollectorPackageName,
AgentHasVersion: version.Version(),
AgentHasHash: hash,
ServerOfferedVersion: newVersion,
Expand All @@ -393,13 +394,13 @@ func TestClientConnect(t *testing.T) {
assert.Equal(t, "", status.ErrorMessage)
assert.Equal(t, allHash, status.ServerProvidedAllPackagesHash)
assert.Equal(t, 1, len(status.Packages))
assert.Equal(t, mainPackageName, status.Packages[mainPackageName].Name)
assert.Equal(t, version.Version(), status.Packages[mainPackageName].AgentHasVersion)
assert.Equal(t, hash, status.Packages[mainPackageName].AgentHasHash)
assert.Equal(t, newVersion, status.Packages[mainPackageName].ServerOfferedVersion)
assert.Equal(t, newHash, status.Packages[mainPackageName].ServerOfferedHash)
assert.Equal(t, fmt.Sprintf("Error while setting agent description: %s", expectedErr), status.Packages[mainPackageName].ErrorMessage)
assert.Equal(t, protobufs.PackageStatus_InstallFailed, status.Packages[mainPackageName].Status)
assert.Equal(t, packagestate.CollectorPackageName, status.Packages[packagestate.CollectorPackageName].Name)
assert.Equal(t, version.Version(), status.Packages[packagestate.CollectorPackageName].AgentHasVersion)
assert.Equal(t, hash, status.Packages[packagestate.CollectorPackageName].AgentHasHash)
assert.Equal(t, newVersion, status.Packages[packagestate.CollectorPackageName].ServerOfferedVersion)
assert.Equal(t, newHash, status.Packages[packagestate.CollectorPackageName].ServerOfferedHash)
assert.Equal(t, fmt.Sprintf("Error while setting agent description: %s", expectedErr), status.Packages[packagestate.CollectorPackageName].ErrorMessage)
assert.Equal(t, protobufs.PackageStatus_InstallFailed, status.Packages[packagestate.CollectorPackageName].Status)
})

c := &Client{
Expand Down Expand Up @@ -490,8 +491,8 @@ func TestClient_onConnectHandler(t *testing.T) {
newVersion := "99.99.99"
errorMessage := "problem"
statuses := map[string]*protobufs.PackageStatus{
mainPackageName: {
Name: mainPackageName,
packagestate.CollectorPackageName: {
Name: packagestate.CollectorPackageName,
AgentHasVersion: version.Version(),
AgentHasHash: hash,
ServerOfferedVersion: newVersion,
Expand Down Expand Up @@ -524,8 +525,8 @@ func TestClient_onConnectHandler(t *testing.T) {
newHash := []byte("newHash")
newVersion := "99.99.99"
statuses := map[string]*protobufs.PackageStatus{
mainPackageName: {
Name: mainPackageName,
packagestate.CollectorPackageName: {
Name: packagestate.CollectorPackageName,
AgentHasVersion: version.Version(),
AgentHasHash: hash,
ServerOfferedVersion: newVersion,
Expand All @@ -547,13 +548,13 @@ func TestClient_onConnectHandler(t *testing.T) {
assert.Equal(t, "", status.ErrorMessage)
assert.Equal(t, allHash, status.ServerProvidedAllPackagesHash)
assert.Equal(t, 1, len(status.Packages))
assert.Equal(t, mainPackageName, status.Packages[mainPackageName].Name)
assert.Equal(t, version.Version(), status.Packages[mainPackageName].AgentHasVersion)
assert.Equal(t, hash, status.Packages[mainPackageName].AgentHasHash)
assert.Equal(t, newVersion, status.Packages[mainPackageName].ServerOfferedVersion)
assert.Equal(t, newHash, status.Packages[mainPackageName].ServerOfferedHash)
assert.Equal(t, "", status.Packages[mainPackageName].ErrorMessage)
assert.Equal(t, protobufs.PackageStatus_InstallFailed, status.Packages[mainPackageName].Status)
assert.Equal(t, packagestate.CollectorPackageName, status.Packages[packagestate.CollectorPackageName].Name)
assert.Equal(t, version.Version(), status.Packages[packagestate.CollectorPackageName].AgentHasVersion)
assert.Equal(t, hash, status.Packages[packagestate.CollectorPackageName].AgentHasHash)
assert.Equal(t, newVersion, status.Packages[packagestate.CollectorPackageName].ServerOfferedVersion)
assert.Equal(t, newHash, status.Packages[packagestate.CollectorPackageName].ServerOfferedHash)
assert.Equal(t, "", status.Packages[packagestate.CollectorPackageName].ErrorMessage)
assert.Equal(t, protobufs.PackageStatus_InstallFailed, status.Packages[packagestate.CollectorPackageName].Status)
})

c := &Client{
Expand All @@ -573,8 +574,8 @@ func TestClient_onConnectHandler(t *testing.T) {
oldVersion := "99.99.99"
newVersion := version.Version()
statuses := map[string]*protobufs.PackageStatus{
mainPackageName: {
Name: mainPackageName,
packagestate.CollectorPackageName: {
Name: packagestate.CollectorPackageName,
AgentHasVersion: oldVersion,
AgentHasHash: hash,
ServerOfferedVersion: newVersion,
Expand All @@ -598,13 +599,13 @@ func TestClient_onConnectHandler(t *testing.T) {
assert.Equal(t, "", status.ErrorMessage)
assert.Equal(t, allHash, status.ServerProvidedAllPackagesHash)
assert.Equal(t, 1, len(status.Packages))
assert.Equal(t, mainPackageName, status.Packages[mainPackageName].Name)
assert.Equal(t, newVersion, status.Packages[mainPackageName].AgentHasVersion)
assert.Equal(t, newHash, status.Packages[mainPackageName].AgentHasHash)
assert.Equal(t, newVersion, status.Packages[mainPackageName].ServerOfferedVersion)
assert.Equal(t, newHash, status.Packages[mainPackageName].ServerOfferedHash)
assert.Equal(t, "", status.Packages[mainPackageName].ErrorMessage)
assert.Equal(t, protobufs.PackageStatus_Installed, status.Packages[mainPackageName].Status)
assert.Equal(t, packagestate.CollectorPackageName, status.Packages[packagestate.CollectorPackageName].Name)
assert.Equal(t, newVersion, status.Packages[packagestate.CollectorPackageName].AgentHasVersion)
assert.Equal(t, newHash, status.Packages[packagestate.CollectorPackageName].AgentHasHash)
assert.Equal(t, newVersion, status.Packages[packagestate.CollectorPackageName].ServerOfferedVersion)
assert.Equal(t, newHash, status.Packages[packagestate.CollectorPackageName].ServerOfferedHash)
assert.Equal(t, "", status.Packages[packagestate.CollectorPackageName].ErrorMessage)
assert.Equal(t, protobufs.PackageStatus_Installed, status.Packages[packagestate.CollectorPackageName].Status)
})

c := &Client{
Expand Down Expand Up @@ -672,8 +673,8 @@ func TestClient_onConnectFailedHandler(t *testing.T) {
newVersion := "99.99.99"
errorMessage := "problem"
statuses := map[string]*protobufs.PackageStatus{
mainPackageName: {
Name: mainPackageName,
packagestate.CollectorPackageName: {
Name: packagestate.CollectorPackageName,
AgentHasVersion: version.Version(),
AgentHasHash: hash,
ServerOfferedVersion: newVersion,
Expand Down Expand Up @@ -706,8 +707,8 @@ func TestClient_onConnectFailedHandler(t *testing.T) {
newHash := []byte("newHash")
newVersion := "99.99.99"
statuses := map[string]*protobufs.PackageStatus{
mainPackageName: {
Name: mainPackageName,
packagestate.CollectorPackageName: {
Name: packagestate.CollectorPackageName,
AgentHasVersion: version.Version(),
AgentHasHash: hash,
ServerOfferedVersion: newVersion,
Expand All @@ -729,13 +730,13 @@ func TestClient_onConnectFailedHandler(t *testing.T) {
assert.Equal(t, "", status.ErrorMessage)
assert.Equal(t, allHash, status.ServerProvidedAllPackagesHash)
assert.Equal(t, 1, len(status.Packages))
assert.Equal(t, mainPackageName, status.Packages[mainPackageName].Name)
assert.Equal(t, version.Version(), status.Packages[mainPackageName].AgentHasVersion)
assert.Equal(t, hash, status.Packages[mainPackageName].AgentHasHash)
assert.Equal(t, newVersion, status.Packages[mainPackageName].ServerOfferedVersion)
assert.Equal(t, newHash, status.Packages[mainPackageName].ServerOfferedHash)
assert.Equal(t, fmt.Sprintf("Failed to connect to BindPlane: %s", expectedErr), status.Packages[mainPackageName].ErrorMessage)
assert.Equal(t, protobufs.PackageStatus_InstallFailed, status.Packages[mainPackageName].Status)
assert.Equal(t, packagestate.CollectorPackageName, status.Packages[packagestate.CollectorPackageName].Name)
assert.Equal(t, version.Version(), status.Packages[packagestate.CollectorPackageName].AgentHasVersion)
assert.Equal(t, hash, status.Packages[packagestate.CollectorPackageName].AgentHasHash)
assert.Equal(t, newVersion, status.Packages[packagestate.CollectorPackageName].ServerOfferedVersion)
assert.Equal(t, newHash, status.Packages[packagestate.CollectorPackageName].ServerOfferedHash)
assert.Equal(t, fmt.Sprintf("Failed to connect to BindPlane: %s", expectedErr), status.Packages[packagestate.CollectorPackageName].ErrorMessage)
assert.Equal(t, protobufs.PackageStatus_InstallFailed, status.Packages[packagestate.CollectorPackageName].Status)
})

c := &Client{
Expand Down Expand Up @@ -931,7 +932,7 @@ func TestClient_onRemoteConfigHandler(t *testing.T) {
}

func TestClient_onPackagesAvailableHandler(t *testing.T) {
collectorPackageName := mainPackageName
collectorPackageName := packagestate.CollectorPackageName
allHash := []byte("totalhash0")
newAllHash := []byte("totalhash1")
packageHash := []byte("hash0")
Expand Down
Loading

0 comments on commit b0ccec5

Please sign in to comment.