Skip to content

Commit

Permalink
Created packagestate module (#579)
Browse files Browse the repository at this point in the history
* Broke package status objects into their own file

Signed-off-by: Corbin Phelps <[email protected]>

* Updated main module to reference packagestate module

Signed-off-by: Corbin Phelps <[email protected]>

* Fixed licsense check for new module

Signed-off-by: Corbin Phelps <[email protected]>

* Created interface and mocks for package state manager

Signed-off-by: Corbin Phelps <[email protected]>

* Changed PackageStateProvider to use interface of StateManager

Signed-off-by: Corbin Phelps <[email protected]>

* Fixed up linux test for package state manager

Signed-off-by: Corbin Phelps <[email protected]>
  • Loading branch information
Corbin Phelps authored Jul 20, 2022
1 parent 9225370 commit 49f7831
Show file tree
Hide file tree
Showing 16 changed files with 751 additions and 292 deletions.
7 changes: 7 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,10 @@ updates:
commit-message:
prefix: "deps"
include: "scope"
- package-ecosystem: "gomod"
directory: "/packagestate"
schedule:
interval: "weekly"
commit-message:
prefix: "deps"
include: "scope"
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/collector/googlemanagedprometheus v0.32.2
github.com/google/uuid v1.3.0
github.com/observiq/observiq-otel-collector/exporter/googlecloudexporter v1.3.0
github.com/observiq/observiq-otel-collector/packagestate v0.0.0
github.com/observiq/observiq-otel-collector/processor/resourceattributetransposerprocessor v1.3.0
github.com/observiq/observiq-otel-collector/receiver/pluginreceiver v1.3.0
github.com/open-telemetry/opamp-go v0.2.0
Expand Down Expand Up @@ -110,7 +111,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver v0.54.0
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zookeeperreceiver v0.54.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.5
github.com/stretchr/testify v1.8.0
go.opentelemetry.io/collector v0.54.0
go.uber.org/multierr v1.8.0
go.uber.org/zap v1.21.0
Expand Down Expand Up @@ -453,5 +454,7 @@ replace github.com/observiq/observiq-otel-collector/receiver/pluginreceiver => .

replace github.com/observiq/observiq-otel-collector/exporter/googlecloudexporter => ./exporter/googlecloudexporter

replace github.com/observiq/observiq-otel-collector/packagestate => ./packagestate

// see https://github.com/google/gnostic/issues/262
replace github.com/googleapis/gnostic v0.5.6 => github.com/googleapis/gnostic v0.5.5
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1616,8 +1616,9 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/stretchr/testify v1.7.5 h1:s5PTfem8p8EbKQOctVV53k6jCJt3UX4IEJzwh+C324Q=
github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635 h1:kdXcSzyDtseVEc4yCz2qF8ZrQvIDBJLl4S1c3GCXmoI=
Expand Down
1 change: 1 addition & 0 deletions license.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ exceptions:
- path: "./processor/resourceattributetransposerprocessor"
- path: "./receiver/pluginreceiver"
- path: "./exporter/googlecloudexporter"
- path: "./packagestate"
22 changes: 10 additions & 12 deletions opamp/observiq/observiq_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/observiq/observiq-otel-collector/collector"
"github.com/observiq/observiq-otel-collector/internal/version"
"github.com/observiq/observiq-otel-collector/opamp"
"github.com/observiq/observiq-otel-collector/packagestate"
"github.com/open-telemetry/opamp-go/client"
"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
Expand All @@ -40,9 +41,6 @@ var (
// Ensure interface is satisfied
var _ opamp.Client = (*Client)(nil)

// mainPackageName is the name for the top level packages for this collector
const mainPackageName = "observiq-otel-collector"

// Client represents a client that is connected to Iris via OpAmp
type Client struct {
opampClient client.OpAMPClient
Expand Down Expand Up @@ -214,7 +212,7 @@ func (c *Client) onConnectHandler() {
return
}

lastMainPackageStatus := lastPackageStatuses.Packages[mainPackageName]
lastMainPackageStatus := lastPackageStatuses.Packages[packagestate.CollectorPackageName]
// If in the middle of an install and we just connected, this is most likely becasue the collector was just spun up fresh by the Updater.
// If the current version matches the server offered version, this implies a good install and so we should set the PackageStatuses and
// send it to the OpAMP Server. If the version does not match, just change the PackageStatues JSON so that the Updater can start rollback.
Expand Down Expand Up @@ -342,7 +340,7 @@ func (c *Client) onPackagesAvailableHandler(packagesAvailable *protobufs.Package
}

// Start update if applicable
collectorDownloadableFile := curPackageFiles[mainPackageName]
collectorDownloadableFile := curPackageFiles[packagestate.CollectorPackageName]
if collectorDownloadableFile != nil {
c.safeSetUpdatingPackage(true)
go c.installPackageFromFile(collectorDownloadableFile, curPackageStatuses)
Expand All @@ -361,7 +359,7 @@ func (c *Client) createPackageMaps(
for name, availPkg := range pkgAvailMap {
switch name {
// If it's an expected package, return an installing status
case mainPackageName:
case packagestate.CollectorPackageName:
var agentHash []byte
if lastPkgStatusMap != nil && lastPkgStatusMap[name] != nil {
if lastPkgStatusMap[name].GetAgentHasVersion() != version.Version() {
Expand Down Expand Up @@ -419,9 +417,9 @@ func (c *Client) createPackageMaps(
func (c *Client) installPackageFromFile(file *protobufs.DownloadableFile, curPackageStatuses *protobufs.PackageStatuses) {
if fileManagerErr := c.downloadableFileManager.FetchAndExtractArchive(file); fileManagerErr != nil {
// Change existing status to show that install failed and get ready to send
curPackageStatuses.Packages[mainPackageName].Status = protobufs.PackageStatus_InstallFailed
curPackageStatuses.Packages[mainPackageName].ErrorMessage =
fmt.Sprintf("Failed to download and verify package %s's downloadable file", mainPackageName)
curPackageStatuses.Packages[packagestate.CollectorPackageName].Status = protobufs.PackageStatus_InstallFailed
curPackageStatuses.Packages[packagestate.CollectorPackageName].ErrorMessage =
fmt.Sprintf("Failed to download and verify package %s's downloadable file", packagestate.CollectorPackageName)

if err := c.packagesStateProvider.SetLastReportedStatuses(curPackageStatuses); err != nil {
c.logger.Error("Failed to save last reported package statuses", zap.Error(err))
Expand Down Expand Up @@ -454,7 +452,7 @@ func (c *Client) attemptFailedInstall(errMsg string) {
return
}

lastMainPackageStatus := lastPackageStatuses.Packages[mainPackageName]
lastMainPackageStatus := lastPackageStatuses.Packages[packagestate.CollectorPackageName]
lastMainPackageStatus.Status = protobufs.PackageStatus_InstallFailed
lastMainPackageStatus.ErrorMessage = errMsg

Expand All @@ -471,12 +469,12 @@ func (c *Client) getMainPackageInstallingLastStatuses() *protobufs.PackageStatus
}

// If we have no info on our main package, nothing else to do
if lastPackageStatuses == nil || lastPackageStatuses.Packages == nil || lastPackageStatuses.Packages[mainPackageName] == nil {
if lastPackageStatuses == nil || lastPackageStatuses.Packages == nil || lastPackageStatuses.Packages[packagestate.CollectorPackageName] == nil {
c.logger.Warn("Failed to retrieve last reported package statuses for main package")
return nil
}

lastMainPackageStatus := lastPackageStatuses.Packages[mainPackageName]
lastMainPackageStatus := lastPackageStatuses.Packages[packagestate.CollectorPackageName]

// If we were not installing before the connection, nothing else to do
if lastMainPackageStatus.Status != protobufs.PackageStatus_Installing {
Expand Down
87 changes: 44 additions & 43 deletions opamp/observiq/observiq_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/observiq/observiq-otel-collector/internal/version"
"github.com/observiq/observiq-otel-collector/opamp"
"github.com/observiq/observiq-otel-collector/opamp/mocks"
"github.com/observiq/observiq-otel-collector/packagestate"
"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -324,8 +325,8 @@ func TestClientConnect(t *testing.T) {
desc: "Problem connecting & not installing",
testFunc: func(*testing.T) {
statuses := map[string]*protobufs.PackageStatus{
mainPackageName: {
Name: mainPackageName,
packagestate.CollectorPackageName: {
Name: packagestate.CollectorPackageName,
AgentHasVersion: version.Version(),
ServerOfferedVersion: version.Version(),
Status: protobufs.PackageStatus_Installed,
Expand Down Expand Up @@ -366,8 +367,8 @@ func TestClientConnect(t *testing.T) {
newHash := []byte("newHash")
newVersion := "99.99.99"
statuses := map[string]*protobufs.PackageStatus{
mainPackageName: {
Name: mainPackageName,
packagestate.CollectorPackageName: {
Name: packagestate.CollectorPackageName,
AgentHasVersion: version.Version(),
AgentHasHash: hash,
ServerOfferedVersion: newVersion,
Expand All @@ -393,13 +394,13 @@ func TestClientConnect(t *testing.T) {
assert.Equal(t, "", status.ErrorMessage)
assert.Equal(t, allHash, status.ServerProvidedAllPackagesHash)
assert.Equal(t, 1, len(status.Packages))
assert.Equal(t, mainPackageName, status.Packages[mainPackageName].Name)
assert.Equal(t, version.Version(), status.Packages[mainPackageName].AgentHasVersion)
assert.Equal(t, hash, status.Packages[mainPackageName].AgentHasHash)
assert.Equal(t, newVersion, status.Packages[mainPackageName].ServerOfferedVersion)
assert.Equal(t, newHash, status.Packages[mainPackageName].ServerOfferedHash)
assert.Equal(t, fmt.Sprintf("Error while setting agent description: %s", expectedErr), status.Packages[mainPackageName].ErrorMessage)
assert.Equal(t, protobufs.PackageStatus_InstallFailed, status.Packages[mainPackageName].Status)
assert.Equal(t, packagestate.CollectorPackageName, status.Packages[packagestate.CollectorPackageName].Name)
assert.Equal(t, version.Version(), status.Packages[packagestate.CollectorPackageName].AgentHasVersion)
assert.Equal(t, hash, status.Packages[packagestate.CollectorPackageName].AgentHasHash)
assert.Equal(t, newVersion, status.Packages[packagestate.CollectorPackageName].ServerOfferedVersion)
assert.Equal(t, newHash, status.Packages[packagestate.CollectorPackageName].ServerOfferedHash)
assert.Equal(t, fmt.Sprintf("Error while setting agent description: %s", expectedErr), status.Packages[packagestate.CollectorPackageName].ErrorMessage)
assert.Equal(t, protobufs.PackageStatus_InstallFailed, status.Packages[packagestate.CollectorPackageName].Status)
})

c := &Client{
Expand Down Expand Up @@ -490,8 +491,8 @@ func TestClient_onConnectHandler(t *testing.T) {
newVersion := "99.99.99"
errorMessage := "problem"
statuses := map[string]*protobufs.PackageStatus{
mainPackageName: {
Name: mainPackageName,
packagestate.CollectorPackageName: {
Name: packagestate.CollectorPackageName,
AgentHasVersion: version.Version(),
AgentHasHash: hash,
ServerOfferedVersion: newVersion,
Expand Down Expand Up @@ -524,8 +525,8 @@ func TestClient_onConnectHandler(t *testing.T) {
newHash := []byte("newHash")
newVersion := "99.99.99"
statuses := map[string]*protobufs.PackageStatus{
mainPackageName: {
Name: mainPackageName,
packagestate.CollectorPackageName: {
Name: packagestate.CollectorPackageName,
AgentHasVersion: version.Version(),
AgentHasHash: hash,
ServerOfferedVersion: newVersion,
Expand All @@ -547,13 +548,13 @@ func TestClient_onConnectHandler(t *testing.T) {
assert.Equal(t, "", status.ErrorMessage)
assert.Equal(t, allHash, status.ServerProvidedAllPackagesHash)
assert.Equal(t, 1, len(status.Packages))
assert.Equal(t, mainPackageName, status.Packages[mainPackageName].Name)
assert.Equal(t, version.Version(), status.Packages[mainPackageName].AgentHasVersion)
assert.Equal(t, hash, status.Packages[mainPackageName].AgentHasHash)
assert.Equal(t, newVersion, status.Packages[mainPackageName].ServerOfferedVersion)
assert.Equal(t, newHash, status.Packages[mainPackageName].ServerOfferedHash)
assert.Equal(t, "", status.Packages[mainPackageName].ErrorMessage)
assert.Equal(t, protobufs.PackageStatus_InstallFailed, status.Packages[mainPackageName].Status)
assert.Equal(t, packagestate.CollectorPackageName, status.Packages[packagestate.CollectorPackageName].Name)
assert.Equal(t, version.Version(), status.Packages[packagestate.CollectorPackageName].AgentHasVersion)
assert.Equal(t, hash, status.Packages[packagestate.CollectorPackageName].AgentHasHash)
assert.Equal(t, newVersion, status.Packages[packagestate.CollectorPackageName].ServerOfferedVersion)
assert.Equal(t, newHash, status.Packages[packagestate.CollectorPackageName].ServerOfferedHash)
assert.Equal(t, "", status.Packages[packagestate.CollectorPackageName].ErrorMessage)
assert.Equal(t, protobufs.PackageStatus_InstallFailed, status.Packages[packagestate.CollectorPackageName].Status)
})

c := &Client{
Expand All @@ -573,8 +574,8 @@ func TestClient_onConnectHandler(t *testing.T) {
oldVersion := "99.99.99"
newVersion := version.Version()
statuses := map[string]*protobufs.PackageStatus{
mainPackageName: {
Name: mainPackageName,
packagestate.CollectorPackageName: {
Name: packagestate.CollectorPackageName,
AgentHasVersion: oldVersion,
AgentHasHash: hash,
ServerOfferedVersion: newVersion,
Expand All @@ -598,13 +599,13 @@ func TestClient_onConnectHandler(t *testing.T) {
assert.Equal(t, "", status.ErrorMessage)
assert.Equal(t, allHash, status.ServerProvidedAllPackagesHash)
assert.Equal(t, 1, len(status.Packages))
assert.Equal(t, mainPackageName, status.Packages[mainPackageName].Name)
assert.Equal(t, newVersion, status.Packages[mainPackageName].AgentHasVersion)
assert.Equal(t, newHash, status.Packages[mainPackageName].AgentHasHash)
assert.Equal(t, newVersion, status.Packages[mainPackageName].ServerOfferedVersion)
assert.Equal(t, newHash, status.Packages[mainPackageName].ServerOfferedHash)
assert.Equal(t, "", status.Packages[mainPackageName].ErrorMessage)
assert.Equal(t, protobufs.PackageStatus_Installed, status.Packages[mainPackageName].Status)
assert.Equal(t, packagestate.CollectorPackageName, status.Packages[packagestate.CollectorPackageName].Name)
assert.Equal(t, newVersion, status.Packages[packagestate.CollectorPackageName].AgentHasVersion)
assert.Equal(t, newHash, status.Packages[packagestate.CollectorPackageName].AgentHasHash)
assert.Equal(t, newVersion, status.Packages[packagestate.CollectorPackageName].ServerOfferedVersion)
assert.Equal(t, newHash, status.Packages[packagestate.CollectorPackageName].ServerOfferedHash)
assert.Equal(t, "", status.Packages[packagestate.CollectorPackageName].ErrorMessage)
assert.Equal(t, protobufs.PackageStatus_Installed, status.Packages[packagestate.CollectorPackageName].Status)
})

c := &Client{
Expand Down Expand Up @@ -672,8 +673,8 @@ func TestClient_onConnectFailedHandler(t *testing.T) {
newVersion := "99.99.99"
errorMessage := "problem"
statuses := map[string]*protobufs.PackageStatus{
mainPackageName: {
Name: mainPackageName,
packagestate.CollectorPackageName: {
Name: packagestate.CollectorPackageName,
AgentHasVersion: version.Version(),
AgentHasHash: hash,
ServerOfferedVersion: newVersion,
Expand Down Expand Up @@ -706,8 +707,8 @@ func TestClient_onConnectFailedHandler(t *testing.T) {
newHash := []byte("newHash")
newVersion := "99.99.99"
statuses := map[string]*protobufs.PackageStatus{
mainPackageName: {
Name: mainPackageName,
packagestate.CollectorPackageName: {
Name: packagestate.CollectorPackageName,
AgentHasVersion: version.Version(),
AgentHasHash: hash,
ServerOfferedVersion: newVersion,
Expand All @@ -729,13 +730,13 @@ func TestClient_onConnectFailedHandler(t *testing.T) {
assert.Equal(t, "", status.ErrorMessage)
assert.Equal(t, allHash, status.ServerProvidedAllPackagesHash)
assert.Equal(t, 1, len(status.Packages))
assert.Equal(t, mainPackageName, status.Packages[mainPackageName].Name)
assert.Equal(t, version.Version(), status.Packages[mainPackageName].AgentHasVersion)
assert.Equal(t, hash, status.Packages[mainPackageName].AgentHasHash)
assert.Equal(t, newVersion, status.Packages[mainPackageName].ServerOfferedVersion)
assert.Equal(t, newHash, status.Packages[mainPackageName].ServerOfferedHash)
assert.Equal(t, fmt.Sprintf("Failed to connect to BindPlane: %s", expectedErr), status.Packages[mainPackageName].ErrorMessage)
assert.Equal(t, protobufs.PackageStatus_InstallFailed, status.Packages[mainPackageName].Status)
assert.Equal(t, packagestate.CollectorPackageName, status.Packages[packagestate.CollectorPackageName].Name)
assert.Equal(t, version.Version(), status.Packages[packagestate.CollectorPackageName].AgentHasVersion)
assert.Equal(t, hash, status.Packages[packagestate.CollectorPackageName].AgentHasHash)
assert.Equal(t, newVersion, status.Packages[packagestate.CollectorPackageName].ServerOfferedVersion)
assert.Equal(t, newHash, status.Packages[packagestate.CollectorPackageName].ServerOfferedHash)
assert.Equal(t, fmt.Sprintf("Failed to connect to BindPlane: %s", expectedErr), status.Packages[packagestate.CollectorPackageName].ErrorMessage)
assert.Equal(t, protobufs.PackageStatus_InstallFailed, status.Packages[packagestate.CollectorPackageName].Status)
})

c := &Client{
Expand Down Expand Up @@ -931,7 +932,7 @@ func TestClient_onRemoteConfigHandler(t *testing.T) {
}

func TestClient_onPackagesAvailableHandler(t *testing.T) {
collectorPackageName := mainPackageName
collectorPackageName := packagestate.CollectorPackageName
allHash := []byte("totalhash0")
newAllHash := []byte("totalhash1")
packageHash := []byte("hash0")
Expand Down
Loading

0 comments on commit 49f7831

Please sign in to comment.