Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify status compression #93

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ type OpAMPClient interface {
// their AgentDescription to change dynamically while the OpAMPClient is started.
// May be also called from OnMessage handler.
//
// The Hash field will be calculated and updated from the content of the rest of
// the fields.
//
// nil values are not allowed and will return an error.
SetAgentDescription(descr *protobufs.AgentDescription) error

Expand Down
65 changes: 27 additions & 38 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ func TestFirstStatusReport(t *testing.T) {
// Start a Server.
srv := internal.StartMockServer(t)
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 0, msg.SequenceNum)
return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
RemoteConfig: remoteConfig,
Expand Down Expand Up @@ -353,8 +354,13 @@ func TestFirstStatusReport(t *testing.T) {
func TestIncludesDetailsOnReconnect(t *testing.T) {
srv := internal.StartMockServer(t)

seqNum := 0

var receivedDetails int64
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, seqNum, msg.SequenceNum)
seqNum++

// Track when we receive AgentDescription
if msg.AgentDescription != nil {
atomic.AddInt64(&receivedDetails, 1)
Expand Down Expand Up @@ -687,6 +693,7 @@ func TestReportAgentDescription(t *testing.T) {

// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 0, msg.SequenceNum)
// The first status report after Start must have full AgentDescription.
assert.True(t, proto.Equal(client.AgentDescription(), msg.AgentDescription))
return &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}
Expand All @@ -699,25 +706,22 @@ func TestReportAgentDescription(t *testing.T) {
// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
// The status report must have compressed AgentDescription.
descr := msg.AgentDescription
assert.Nil(t, descr.IdentifyingAttributes)
assert.Nil(t, descr.NonIdentifyingAttributes)
assert.Nil(t, msg.AgentDescription)

// The Hash field must be present and unchanged.
assert.NotNil(t, descr.Hash)
assert.EqualValues(t, client.AgentDescription().Hash, descr.Hash)
assert.EqualValues(t, 1, msg.SequenceNum)

// Ask client for full AgentDescription.
return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
Flags: protobufs.ServerToAgent_ReportAgentDescription,
Flags: protobufs.ServerToAgent_ReportFullState,
}
})

// Server has requested the client to report, so there will be another message
// coming to the Server.
// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 2, msg.SequenceNum)
// The status report must again have full AgentDescription
// because the Server asked for it.
assert.True(t, proto.Equal(client.AgentDescription(), msg.AgentDescription))
Expand Down Expand Up @@ -758,6 +762,7 @@ func TestReportEffectiveConfig(t *testing.T) {

// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 0, msg.SequenceNum)
// The first status report after Start must have full EffectiveConfig.
assert.True(t, proto.Equal(clientEffectiveConfig, msg.EffectiveConfig))
return &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}
Expand All @@ -770,23 +775,21 @@ func TestReportEffectiveConfig(t *testing.T) {
// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
// The status report must have compressed EffectiveConfig.
cfg := msg.EffectiveConfig
assert.Nil(t, cfg.ConfigMap)
assert.Nil(t, msg.EffectiveConfig)

// Hash must be present and unchanged.
assert.NotNil(t, cfg.Hash)
assert.EqualValues(t, clientEffectiveConfig.Hash, cfg.Hash)
assert.EqualValues(t, 1, msg.SequenceNum)

// Ask client for full AgentDescription.
return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
Flags: protobufs.ServerToAgent_ReportEffectiveConfig,
Flags: protobufs.ServerToAgent_ReportFullState,
}
})

// Server has requested the client to report, so there will be another message.
// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 2, msg.SequenceNum)
// The status report must again have full EffectiveConfig
// because Server asked for it.
assert.True(t, proto.Equal(clientEffectiveConfig, msg.EffectiveConfig))
Expand Down Expand Up @@ -841,6 +844,7 @@ func verifyRemoteConfigUpdate(t *testing.T, successCase bool, expectStatus *prot
remoteCfg := createRemoteConfig()
// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 0, msg.SequenceNum)
// Send the remote config to the Agent.
return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
Expand All @@ -855,12 +859,12 @@ func verifyRemoteConfigUpdate(t *testing.T, successCase bool, expectStatus *prot

// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 1, msg.SequenceNum)
// Verify that the remote config status is as expected.
status := msg.RemoteConfigStatus
assert.EqualValues(t, expectStatus.Status, status.Status)
assert.Equal(t, expectStatus.ErrorMessage, status.ErrorMessage)
assert.EqualValues(t, remoteCfg.ConfigHash, status.LastRemoteConfigHash)
assert.NotNil(t, status.Hash)

firstConfigStatus = proto.Clone(status).(*protobufs.RemoteConfigStatus)

Expand All @@ -873,24 +877,21 @@ func verifyRemoteConfigUpdate(t *testing.T, successCase bool, expectStatus *prot

// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
// This time all fields except Hash must be unset. This is expected
// This time the RemoteConfigStatus field must be unset. This is expected
// as compression in OpAMP.
status := msg.RemoteConfigStatus
require.NotNil(t, status)
assert.EqualValues(t, firstConfigStatus.Hash, status.Hash)
assert.EqualValues(t, protobufs.RemoteConfigStatus_UNSET, status.Status)
assert.EqualValues(t, "", status.ErrorMessage)
assert.Nil(t, status.LastRemoteConfigHash)
require.Nil(t, msg.RemoteConfigStatus)
assert.EqualValues(t, 2, msg.SequenceNum)

return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
// Ask client to report full status.
Flags: protobufs.ServerToAgent_ReportRemoteConfigStatus,
Flags: protobufs.ServerToAgent_ReportFullState,
}
})

// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 3, msg.SequenceNum)
// Exact same full status must be present again.
status := msg.RemoteConfigStatus
assert.True(t, proto.Equal(status, firstConfigStatus))
Expand Down Expand Up @@ -992,6 +993,7 @@ func verifyUpdatePackages(t *testing.T, testCase packageTestCase) {

// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 0, msg.SequenceNum)
// Send the packages to the Agent.
return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
Expand All @@ -1002,8 +1004,6 @@ func verifyUpdatePackages(t *testing.T, testCase packageTestCase) {
// The Agent will try to install the packages and will send the status
// report about it back to the Server.

var lastStatusHash []byte

// ---> Server
// Wait for the expected package statuses to be received.
srv.EventuallyExpect("full PackageStatuses",
Expand All @@ -1013,7 +1013,6 @@ func verifyUpdatePackages(t *testing.T, testCase packageTestCase) {
status := msg.PackageStatuses
require.NotNil(t, status)
assert.EqualValues(t, testCase.expectedStatus.ServerProvidedAllPackagesHash, status.ServerProvidedAllPackagesHash)
lastStatusHash = status.Hash

if testCase.expectedError != "" {
assert.EqualValues(t, testCase.expectedError, status.ErrorMessage)
Expand Down Expand Up @@ -1046,7 +1045,6 @@ func verifyUpdatePackages(t *testing.T, testCase packageTestCase) {
assert.Len(t, status.Packages, len(testCase.available.Packages))
}
}
assert.NotNil(t, status.Hash)

return &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}, expectedStatusReceived
})
Expand All @@ -1069,21 +1067,13 @@ func verifyUpdatePackages(t *testing.T, testCase packageTestCase) {
srv.EventuallyExpect("compressed PackageStatuses",
func(msg *protobufs.AgentToServer) (*protobufs.ServerToAgent, bool) {
// Ensure that compressed status is received.
status := msg.PackageStatuses
require.NotNil(t, status)
compressedReceived := status.ServerProvidedAllPackagesHash == nil
if compressedReceived {
assert.Nil(t, status.ServerProvidedAllPackagesHash)
assert.Nil(t, status.Packages)
}
assert.NotNil(t, status.Hash)
assert.Equal(t, lastStatusHash, status.Hash)
compressedReceived := msg.PackageStatuses == nil

response := &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}

if compressedReceived {
// Ask for full report again.
response.Flags = protobufs.ServerToAgent_ReportPackageStatuses
response.Flags = protobufs.ServerToAgent_ReportFullState
} else {
// Keep triggering status report by setting AgentDescription
// until the compressed PackageStatuses arrives.
Expand Down Expand Up @@ -1114,8 +1104,7 @@ func createDownloadSrv(t *testing.T) *httptest.Server {
w.WriteHeader(http.StatusOK)
_, err := w.Write(packageFileContent)
assert.NoError(t, err)
},
)
})

srv := httptest.NewServer(m)

Expand Down
1 change: 1 addition & 0 deletions client/httpclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func TestHTTPPolling(t *testing.T) {
srv := internal.StartMockServer(t)
var rcvCounter int64
srv.OnMessage = func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, rcvCounter, msg.SequenceNum)
if msg != nil {
atomic.AddInt64(&rcvCounter, 1)
}
Expand Down
53 changes: 6 additions & 47 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package internal

import (
"bytes"
"context"
"crypto/sha256"
"errors"
"fmt"
"sort"
"sync"

"github.com/open-telemetry/opamp-go/client/types"
Expand Down Expand Up @@ -175,9 +172,6 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error {
if err != nil {
return err
}
if cfg != nil {
calcHashEffectiveConfig(cfg)
}

c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
Expand Down Expand Up @@ -219,36 +213,6 @@ func (c *ClientCommon) SetAgentDescription(descr *protobufs.AgentDescription) er
return nil
}

// calcHashEffectiveConfig calculates and sets the Hash field from the rest of the
// fields in the message.
func calcHashEffectiveConfig(msg *protobufs.EffectiveConfig) {
cfgMap := msg.GetConfigMap().GetConfigMap()

// Construct hash
h := sha256.New()

// If the config is empty don't attemp to add more to the hash
if len(cfgMap) > 0 {
// Sort keys of configMap to make deterministic hash
keys := make([]string, 0, len(cfgMap))
for k := range cfgMap {
keys = append(keys, k)
}

sort.Strings(keys)

if msg.ConfigMap != nil {
for _, k := range keys {
v := cfgMap[k]
h.Write([]byte(k))
h.Write(v.Body)
h.Write([]byte(v.ContentType))
}
}
}
msg.Hash = h.Sum(nil)
}

// UpdateEffectiveConfig fetches the current local effective config using
// GetEffectiveConfig callback and sends it to the Server using provided Sender.
func (c *ClientCommon) UpdateEffectiveConfig(ctx context.Context) error {
Expand All @@ -257,9 +221,7 @@ func (c *ClientCommon) UpdateEffectiveConfig(ctx context.Context) error {
if err != nil {
return fmt.Errorf("GetEffectiveConfig failed: %w", err)
}
if cfg != nil {
calcHashEffectiveConfig(cfg)
}

// Send it to the Server.
c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
Expand All @@ -281,16 +243,14 @@ func (c *ClientCommon) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatu
return errLastRemoteConfigHashNil
}

// Get the hash of the status before we update it.
prevHash := c.ClientSyncedState.RemoteConfigStatus().GetHash()
statusChanged := !proto.Equal(c.ClientSyncedState.RemoteConfigStatus(), status)

// Remember the new status.
if err := c.ClientSyncedState.SetRemoteConfigStatus(status); err != nil {
return err
}

// Check if the new status is different from the previous by comparing the hashes.
if !bytes.Equal(prevHash, status.Hash) {
if statusChanged {
// Let the Server know about the new status.
c.sender.NextMessage().Update(
func(msg *protobufs.AgentToServer) {
Expand All @@ -310,15 +270,14 @@ func (c *ClientCommon) SetPackageStatuses(statuses *protobufs.PackageStatuses) e
return errServerProvidedAllPackagesHashNil
}

// Get the hash of the status before we update it.
prevHash := c.ClientSyncedState.PackageStatuses().GetHash()
statusChanged := !proto.Equal(c.ClientSyncedState.PackageStatuses(), statuses)

if err := c.ClientSyncedState.SetPackageStatuses(statuses); err != nil {
return err
}

// Check if the new status is different from the previous by comparing the hashes.
if !bytes.Equal(prevHash, statuses.Hash) {
// Check if the new status is different from the previous.
if statusChanged {
// Let the Server know about the new status.

c.sender.NextMessage().Update(
Expand Down
57 changes: 0 additions & 57 deletions client/internal/clientcommon_test.go

This file was deleted.

Loading