Skip to content

Commit

Permalink
Add ability to specify Client capabilities (#106)
Browse files Browse the repository at this point in the history
Resolves #98
  • Loading branch information
tigrannajaryan authored Jul 22, 2022
1 parent b493e26 commit 63c9165
Show file tree
Hide file tree
Showing 11 changed files with 248 additions and 24 deletions.
120 changes: 120 additions & 0 deletions client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ func TestFirstStatusReport(t *testing.T) {
atomic.AddInt64(&remoteConfigReceived, 1)
},
},
Capabilities: protobufs.AgentCapabilities_AcceptsRemoteConfig,
}
settings.OpAMPServerURL = "ws://" + srv.Endpoint
startClient(t, settings, client)
Expand Down Expand Up @@ -430,6 +431,7 @@ func TestSetEffectiveConfig(t *testing.T) {
return sendConfig, nil
},
},
Capabilities: protobufs.AgentCapabilities_ReportsEffectiveConfig,
}
settings.OpAMPServerURL = "ws://" + srv.Endpoint
prepareClient(t, &settings, client)
Expand Down Expand Up @@ -655,6 +657,11 @@ func TestConnectionSettings(t *testing.T) {
return nil
},
},
Capabilities: protobufs.AgentCapabilities_ReportsOwnTraces |
protobufs.AgentCapabilities_ReportsOwnMetrics |
protobufs.AgentCapabilities_ReportsOwnLogs |
protobufs.AgentCapabilities_AcceptsOtherConnectionSettings |
protobufs.AgentCapabilities_AcceptsOpAMPConnectionSettings,
}
settings.OpAMPServerURL = "ws://" + srv.Endpoint
prepareClient(t, &settings, client)
Expand Down Expand Up @@ -685,6 +692,7 @@ func TestReportAgentDescription(t *testing.T) {
// Start a client.
settings := types.StartSettings{
OpAMPServerURL: "ws://" + srv.Endpoint,
Capabilities: protobufs.AgentCapabilities_ReportsEffectiveConfig,
}
prepareClient(t, &settings, client)

Expand Down Expand Up @@ -747,6 +755,7 @@ func TestReportAgentHealth(t *testing.T) {
// Start a client.
settings := types.StartSettings{
OpAMPServerURL: "ws://" + srv.Endpoint,
Capabilities: protobufs.AgentCapabilities_ReportsEffectiveConfig | protobufs.AgentCapabilities_ReportsHealth,
}
prepareClient(t, &settings, client)

Expand Down Expand Up @@ -906,6 +915,7 @@ func verifyRemoteConfigUpdate(t *testing.T, successCase bool, expectStatus *prot
}
},
},
Capabilities: protobufs.AgentCapabilities_AcceptsRemoteConfig,
}
prepareClient(t, &settings, client)

Expand Down Expand Up @@ -1056,6 +1066,7 @@ func verifyUpdatePackages(t *testing.T, testCase packageTestCase) {
OnMessageFunc: onMessageFunc,
},
PackagesStateProvider: localPackageState,
Capabilities: protobufs.AgentCapabilities_AcceptsPackages | protobufs.AgentCapabilities_ReportsPackageStatuses,
}
prepareClient(t, &settings, client)

Expand Down Expand Up @@ -1257,3 +1268,112 @@ func TestUpdatePackages(t *testing.T) {
})
}
}

func TestMissingCapabilities(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {
// Start a server.
srv := internal.StartMockServer(t)
srv.EnableExpectMode()

// Start a client.
settings := types.StartSettings{
Callbacks: types.CallbacksStruct{
OnMessageFunc: func(ctx context.Context, msg *types.MessageData) {
// These fields must not be set since we did not define the capabilities to accept them.
assert.Nil(t, msg.RemoteConfig)
assert.Nil(t, msg.OwnLogsConnSettings)
assert.Nil(t, msg.OwnMetricsConnSettings)
assert.Nil(t, msg.OwnTracesConnSettings)
assert.Nil(t, msg.OtherConnSettings)
assert.Nil(t, msg.PackagesAvailable)
},
OnOpampConnectionSettingsFunc: func(
ctx context.Context, settings *protobufs.OpAMPConnectionSettings,
) error {
assert.Fail(t, "should not be called since capability is not set to accept it")
return nil
},
},
}
settings.OpAMPServerURL = "ws://" + srv.Endpoint
prepareClient(t, &settings, client)

require.NoError(t, client.Start(context.Background(), settings))

// Change the config.
err := client.UpdateEffectiveConfig(context.Background())

assert.ErrorIs(t, err, internal.ErrReportsEffectiveConfigNotSet)

remoteCfg := createRemoteConfig()
// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
opampSettings := &protobufs.OpAMPConnectionSettings{DestinationEndpoint: "http://opamp.com"}
metricsSettings := &protobufs.TelemetryConnectionSettings{DestinationEndpoint: "http://metrics.com"}
tracesSettings := &protobufs.TelemetryConnectionSettings{DestinationEndpoint: "http://traces.com"}
logsSettings := &protobufs.TelemetryConnectionSettings{DestinationEndpoint: "http://logs.com"}
otherSettings := &protobufs.OtherConnectionSettings{DestinationEndpoint: "http://other.com"}
hash := []byte{1, 2, 3}

return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
RemoteConfig: remoteCfg,
ConnectionSettings: &protobufs.ConnectionSettingsOffers{
Hash: hash,
Opamp: opampSettings,
OwnMetrics: metricsSettings,
OwnTraces: tracesSettings,
OwnLogs: logsSettings,
OtherConnections: map[string]*protobufs.OtherConnectionSettings{
"other": otherSettings,
},
},
PackagesAvailable: &protobufs.PackagesAvailable{
Packages: map[string]*protobufs.PackageAvailable{},
AllPackagesHash: []byte{1, 2, 3, 4, 5},
},
}
})

// Shutdown the Server.
srv.Close()

// Shutdown the client.
err = client.Stop(context.Background())
assert.NoError(t, err)
})
}

func TestMissingPackagesStateProvider(t *testing.T) {
testClients(t, func(t *testing.T, client OpAMPClient) {
// Start a client.
settings := types.StartSettings{
Callbacks: types.CallbacksStruct{},
Capabilities: protobufs.AgentCapabilities_AcceptsPackages | protobufs.AgentCapabilities_ReportsPackageStatuses,
}
prepareClient(t, &settings, client)

assert.ErrorIs(t, client.Start(context.Background(), settings), internal.ErrPackagesStateProviderNotSet)

// Start a client.
localPackageState := internal.NewInMemPackagesStore()
settings = types.StartSettings{
Callbacks: types.CallbacksStruct{},
PackagesStateProvider: localPackageState,
Capabilities: protobufs.AgentCapabilities_AcceptsPackages,
}
prepareClient(t, &settings, client)

assert.ErrorIs(t, client.Start(context.Background(), settings), internal.ErrAcceptsPackagesNotSet)

// Start a client.
settings = types.StartSettings{
Callbacks: types.CallbacksStruct{},
PackagesStateProvider: localPackageState,
Capabilities: protobufs.AgentCapabilities_ReportsPackageStatuses,
}
prepareClient(t, &settings, client)

assert.ErrorIs(t, client.Start(context.Background(), settings), internal.ErrAcceptsPackagesNotSet)
})
}
1 change: 1 addition & 0 deletions client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,6 @@ func (c *httpClient) runUntilStopped(ctx context.Context) {
c.common.Callbacks,
&c.common.ClientSyncedState,
c.common.PackagesStateProvider,
c.common.Capabilities,
)
}
38 changes: 36 additions & 2 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@ var (
ErrAgentDescriptionMissing = errors.New("AgentDescription is nil")
ErrAgentDescriptionNoAttributes = errors.New("AgentDescription has no attributes defined")
ErrAgentHealthMissing = errors.New("AgentHealth is nil")
ErrReportsEffectiveConfigNotSet = errors.New("ReportsEffectiveConfig capability is not set")
ErrPackagesStateProviderNotSet = errors.New("PackagesStateProvider must be set")
ErrAcceptsPackagesNotSet = errors.New("AcceptsPackages and ReportsPackageStatuses must be set")

errAlreadyStarted = errors.New("already started")
errCannotStopNotStarted = errors.New("cannot stop because not started")
errAlreadyStarted = errors.New("already started")
errCannotStopNotStarted = errors.New("cannot stop because not started")
errReportsPackageStatusesNotSet = errors.New("ReportsPackageStatuses capability is not set")
)

// ClientCommon contains the OpAMP logic that is common between WebSocket and
Expand All @@ -26,6 +30,9 @@ type ClientCommon struct {
Logger types.Logger
Callbacks types.Callbacks

// Agent's capabilities defined at Start() time.
Capabilities protobufs.AgentCapabilities

// Client state storage. This is needed if the Server asks to report the state.
ClientSyncedState ClientSyncedState

Expand Down Expand Up @@ -62,10 +69,19 @@ func (c *ClientCommon) PrepareStart(
return errAlreadyStarted
}

c.Capabilities = settings.Capabilities

// According to OpAMP spec this capability MUST be set, since all Agents MUST report status.
c.Capabilities |= protobufs.AgentCapabilities_ReportsStatus

if c.ClientSyncedState.AgentDescription() == nil {
return ErrAgentDescriptionMissing
}

if c.Capabilities&protobufs.AgentCapabilities_ReportsHealth != 0 && c.ClientSyncedState.Health() == nil {
return ErrAgentHealthMissing
}

// Prepare remote config status.
if settings.RemoteConfigStatus == nil {
// RemoteConfigStatus is not provided. Start with empty.
Expand All @@ -82,12 +98,22 @@ func (c *ClientCommon) PrepareStart(
c.PackagesStateProvider = settings.PackagesStateProvider
var packageStatuses *protobufs.PackageStatuses
if settings.PackagesStateProvider != nil {
if c.Capabilities&protobufs.AgentCapabilities_AcceptsPackages == 0 ||
c.Capabilities&protobufs.AgentCapabilities_ReportsPackageStatuses == 0 {
return ErrAcceptsPackagesNotSet
}

// Set package status from the value previously saved in the PackagesStateProvider.
var err error
packageStatuses, err = settings.PackagesStateProvider.LastReportedStatuses()
if err != nil {
return err
}
} else {
if c.Capabilities&protobufs.AgentCapabilities_AcceptsPackages != 0 ||
c.Capabilities&protobufs.AgentCapabilities_ReportsPackageStatuses != 0 {
return ErrPackagesStateProviderNotSet
}
}

if packageStatuses == nil {
Expand Down Expand Up @@ -234,6 +260,10 @@ func (c *ClientCommon) SetHealth(health *protobufs.AgentHealth) error {
// UpdateEffectiveConfig fetches the current local effective config using
// GetEffectiveConfig callback and sends it to the Server using provided Sender.
func (c *ClientCommon) UpdateEffectiveConfig(ctx context.Context) error {
if c.Capabilities&protobufs.AgentCapabilities_ReportsEffectiveConfig == 0 {
return ErrReportsEffectiveConfigNotSet
}

// Fetch the locally stored config.
cfg, err := c.Callbacks.GetEffectiveConfig(ctx)
if err != nil {
Expand Down Expand Up @@ -284,6 +314,10 @@ func (c *ClientCommon) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatu
}

func (c *ClientCommon) SetPackageStatuses(statuses *protobufs.PackageStatuses) error {
if c.Capabilities&protobufs.AgentCapabilities_ReportsPackageStatuses == 0 {
return errReportsPackageStatusesNotSet
}

if statuses.ServerProvidedAllPackagesHash == nil {
return errServerProvidedAllPackagesHashNil
}
Expand Down
3 changes: 2 additions & 1 deletion client/internal/httpsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,11 @@ func (h *HTTPSender) Run(
callbacks types.Callbacks,
clientSyncedState *ClientSyncedState,
packagesStateProvider types.PackagesStateProvider,
capabilities protobufs.AgentCapabilities,
) {
h.url = url
h.callbacks = callbacks
h.receiveProcessor = newReceivedProcessor(h.logger, callbacks, h, clientSyncedState, packagesStateProvider)
h.receiveProcessor = newReceivedProcessor(h.logger, callbacks, h, clientSyncedState, packagesStateProvider, capabilities)

for {
pollingTimer := time.NewTimer(time.Millisecond * time.Duration(atomic.LoadInt64(&h.pollingIntervalMs)))
Expand Down
Loading

0 comments on commit 63c9165

Please sign in to comment.