diff --git a/client/clientimpl_test.go b/client/clientimpl_test.go index 5beafeaf..226b0e66 100644 --- a/client/clientimpl_test.go +++ b/client/clientimpl_test.go @@ -332,6 +332,7 @@ func TestFirstStatusReport(t *testing.T) { atomic.AddInt64(&remoteConfigReceived, 1) }, }, + Capabilities: protobufs.AgentCapabilities_AcceptsRemoteConfig, } settings.OpAMPServerURL = "ws://" + srv.Endpoint startClient(t, settings, client) @@ -430,6 +431,7 @@ func TestSetEffectiveConfig(t *testing.T) { return sendConfig, nil }, }, + Capabilities: protobufs.AgentCapabilities_ReportsEffectiveConfig, } settings.OpAMPServerURL = "ws://" + srv.Endpoint prepareClient(t, &settings, client) @@ -655,6 +657,11 @@ func TestConnectionSettings(t *testing.T) { return nil }, }, + Capabilities: protobufs.AgentCapabilities_ReportsOwnTraces | + protobufs.AgentCapabilities_ReportsOwnMetrics | + protobufs.AgentCapabilities_ReportsOwnLogs | + protobufs.AgentCapabilities_AcceptsOtherConnectionSettings | + protobufs.AgentCapabilities_AcceptsOpAMPConnectionSettings, } settings.OpAMPServerURL = "ws://" + srv.Endpoint prepareClient(t, &settings, client) @@ -685,6 +692,7 @@ func TestReportAgentDescription(t *testing.T) { // Start a client. settings := types.StartSettings{ OpAMPServerURL: "ws://" + srv.Endpoint, + Capabilities: protobufs.AgentCapabilities_ReportsEffectiveConfig, } prepareClient(t, &settings, client) @@ -747,6 +755,7 @@ func TestReportAgentHealth(t *testing.T) { // Start a client. settings := types.StartSettings{ OpAMPServerURL: "ws://" + srv.Endpoint, + Capabilities: protobufs.AgentCapabilities_ReportsEffectiveConfig | protobufs.AgentCapabilities_ReportsHealth, } prepareClient(t, &settings, client) @@ -906,6 +915,7 @@ func verifyRemoteConfigUpdate(t *testing.T, successCase bool, expectStatus *prot } }, }, + Capabilities: protobufs.AgentCapabilities_AcceptsRemoteConfig, } prepareClient(t, &settings, client) @@ -1056,6 +1066,7 @@ func verifyUpdatePackages(t *testing.T, testCase packageTestCase) { OnMessageFunc: onMessageFunc, }, PackagesStateProvider: localPackageState, + Capabilities: protobufs.AgentCapabilities_AcceptsPackages | protobufs.AgentCapabilities_ReportsPackageStatuses, } prepareClient(t, &settings, client) @@ -1257,3 +1268,112 @@ func TestUpdatePackages(t *testing.T) { }) } } + +func TestMissingCapabilities(t *testing.T) { + testClients(t, func(t *testing.T, client OpAMPClient) { + // Start a server. + srv := internal.StartMockServer(t) + srv.EnableExpectMode() + + // Start a client. + settings := types.StartSettings{ + Callbacks: types.CallbacksStruct{ + OnMessageFunc: func(ctx context.Context, msg *types.MessageData) { + // These fields must not be set since we did not define the capabilities to accept them. + assert.Nil(t, msg.RemoteConfig) + assert.Nil(t, msg.OwnLogsConnSettings) + assert.Nil(t, msg.OwnMetricsConnSettings) + assert.Nil(t, msg.OwnTracesConnSettings) + assert.Nil(t, msg.OtherConnSettings) + assert.Nil(t, msg.PackagesAvailable) + }, + OnOpampConnectionSettingsFunc: func( + ctx context.Context, settings *protobufs.OpAMPConnectionSettings, + ) error { + assert.Fail(t, "should not be called since capability is not set to accept it") + return nil + }, + }, + } + settings.OpAMPServerURL = "ws://" + srv.Endpoint + prepareClient(t, &settings, client) + + require.NoError(t, client.Start(context.Background(), settings)) + + // Change the config. + err := client.UpdateEffectiveConfig(context.Background()) + + assert.ErrorIs(t, err, internal.ErrReportsEffectiveConfigNotSet) + + remoteCfg := createRemoteConfig() + // ---> Server + srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent { + opampSettings := &protobufs.OpAMPConnectionSettings{DestinationEndpoint: "http://opamp.com"} + metricsSettings := &protobufs.TelemetryConnectionSettings{DestinationEndpoint: "http://metrics.com"} + tracesSettings := &protobufs.TelemetryConnectionSettings{DestinationEndpoint: "http://traces.com"} + logsSettings := &protobufs.TelemetryConnectionSettings{DestinationEndpoint: "http://logs.com"} + otherSettings := &protobufs.OtherConnectionSettings{DestinationEndpoint: "http://other.com"} + hash := []byte{1, 2, 3} + + return &protobufs.ServerToAgent{ + InstanceUid: msg.InstanceUid, + RemoteConfig: remoteCfg, + ConnectionSettings: &protobufs.ConnectionSettingsOffers{ + Hash: hash, + Opamp: opampSettings, + OwnMetrics: metricsSettings, + OwnTraces: tracesSettings, + OwnLogs: logsSettings, + OtherConnections: map[string]*protobufs.OtherConnectionSettings{ + "other": otherSettings, + }, + }, + PackagesAvailable: &protobufs.PackagesAvailable{ + Packages: map[string]*protobufs.PackageAvailable{}, + AllPackagesHash: []byte{1, 2, 3, 4, 5}, + }, + } + }) + + // Shutdown the Server. + srv.Close() + + // Shutdown the client. + err = client.Stop(context.Background()) + assert.NoError(t, err) + }) +} + +func TestMissingPackagesStateProvider(t *testing.T) { + testClients(t, func(t *testing.T, client OpAMPClient) { + // Start a client. + settings := types.StartSettings{ + Callbacks: types.CallbacksStruct{}, + Capabilities: protobufs.AgentCapabilities_AcceptsPackages | protobufs.AgentCapabilities_ReportsPackageStatuses, + } + prepareClient(t, &settings, client) + + assert.ErrorIs(t, client.Start(context.Background(), settings), internal.ErrPackagesStateProviderNotSet) + + // Start a client. + localPackageState := internal.NewInMemPackagesStore() + settings = types.StartSettings{ + Callbacks: types.CallbacksStruct{}, + PackagesStateProvider: localPackageState, + Capabilities: protobufs.AgentCapabilities_AcceptsPackages, + } + prepareClient(t, &settings, client) + + assert.ErrorIs(t, client.Start(context.Background(), settings), internal.ErrAcceptsPackagesNotSet) + + // Start a client. + settings = types.StartSettings{ + Callbacks: types.CallbacksStruct{}, + PackagesStateProvider: localPackageState, + Capabilities: protobufs.AgentCapabilities_ReportsPackageStatuses, + } + prepareClient(t, &settings, client) + + assert.ErrorIs(t, client.Start(context.Background(), settings), internal.ErrAcceptsPackagesNotSet) + }) +} diff --git a/client/httpclient.go b/client/httpclient.go index 96f89224..63244b18 100644 --- a/client/httpclient.go +++ b/client/httpclient.go @@ -94,5 +94,6 @@ func (c *httpClient) runUntilStopped(ctx context.Context) { c.common.Callbacks, &c.common.ClientSyncedState, c.common.PackagesStateProvider, + c.common.Capabilities, ) } diff --git a/client/internal/clientcommon.go b/client/internal/clientcommon.go index 313e2326..16e2024a 100644 --- a/client/internal/clientcommon.go +++ b/client/internal/clientcommon.go @@ -15,9 +15,13 @@ var ( ErrAgentDescriptionMissing = errors.New("AgentDescription is nil") ErrAgentDescriptionNoAttributes = errors.New("AgentDescription has no attributes defined") ErrAgentHealthMissing = errors.New("AgentHealth is nil") + ErrReportsEffectiveConfigNotSet = errors.New("ReportsEffectiveConfig capability is not set") + ErrPackagesStateProviderNotSet = errors.New("PackagesStateProvider must be set") + ErrAcceptsPackagesNotSet = errors.New("AcceptsPackages and ReportsPackageStatuses must be set") - errAlreadyStarted = errors.New("already started") - errCannotStopNotStarted = errors.New("cannot stop because not started") + errAlreadyStarted = errors.New("already started") + errCannotStopNotStarted = errors.New("cannot stop because not started") + errReportsPackageStatusesNotSet = errors.New("ReportsPackageStatuses capability is not set") ) // ClientCommon contains the OpAMP logic that is common between WebSocket and @@ -26,6 +30,9 @@ type ClientCommon struct { Logger types.Logger Callbacks types.Callbacks + // Agent's capabilities defined at Start() time. + Capabilities protobufs.AgentCapabilities + // Client state storage. This is needed if the Server asks to report the state. ClientSyncedState ClientSyncedState @@ -62,10 +69,19 @@ func (c *ClientCommon) PrepareStart( return errAlreadyStarted } + c.Capabilities = settings.Capabilities + + // According to OpAMP spec this capability MUST be set, since all Agents MUST report status. + c.Capabilities |= protobufs.AgentCapabilities_ReportsStatus + if c.ClientSyncedState.AgentDescription() == nil { return ErrAgentDescriptionMissing } + if c.Capabilities&protobufs.AgentCapabilities_ReportsHealth != 0 && c.ClientSyncedState.Health() == nil { + return ErrAgentHealthMissing + } + // Prepare remote config status. if settings.RemoteConfigStatus == nil { // RemoteConfigStatus is not provided. Start with empty. @@ -82,12 +98,22 @@ func (c *ClientCommon) PrepareStart( c.PackagesStateProvider = settings.PackagesStateProvider var packageStatuses *protobufs.PackageStatuses if settings.PackagesStateProvider != nil { + if c.Capabilities&protobufs.AgentCapabilities_AcceptsPackages == 0 || + c.Capabilities&protobufs.AgentCapabilities_ReportsPackageStatuses == 0 { + return ErrAcceptsPackagesNotSet + } + // Set package status from the value previously saved in the PackagesStateProvider. var err error packageStatuses, err = settings.PackagesStateProvider.LastReportedStatuses() if err != nil { return err } + } else { + if c.Capabilities&protobufs.AgentCapabilities_AcceptsPackages != 0 || + c.Capabilities&protobufs.AgentCapabilities_ReportsPackageStatuses != 0 { + return ErrPackagesStateProviderNotSet + } } if packageStatuses == nil { @@ -234,6 +260,10 @@ func (c *ClientCommon) SetHealth(health *protobufs.AgentHealth) error { // UpdateEffectiveConfig fetches the current local effective config using // GetEffectiveConfig callback and sends it to the Server using provided Sender. func (c *ClientCommon) UpdateEffectiveConfig(ctx context.Context) error { + if c.Capabilities&protobufs.AgentCapabilities_ReportsEffectiveConfig == 0 { + return ErrReportsEffectiveConfigNotSet + } + // Fetch the locally stored config. cfg, err := c.Callbacks.GetEffectiveConfig(ctx) if err != nil { @@ -284,6 +314,10 @@ func (c *ClientCommon) SetRemoteConfigStatus(status *protobufs.RemoteConfigStatu } func (c *ClientCommon) SetPackageStatuses(statuses *protobufs.PackageStatuses) error { + if c.Capabilities&protobufs.AgentCapabilities_ReportsPackageStatuses == 0 { + return errReportsPackageStatusesNotSet + } + if statuses.ServerProvidedAllPackagesHash == nil { return errServerProvidedAllPackagesHashNil } diff --git a/client/internal/httpsender.go b/client/internal/httpsender.go index 4db1c244..5b580a0d 100644 --- a/client/internal/httpsender.go +++ b/client/internal/httpsender.go @@ -65,10 +65,11 @@ func (h *HTTPSender) Run( callbacks types.Callbacks, clientSyncedState *ClientSyncedState, packagesStateProvider types.PackagesStateProvider, + capabilities protobufs.AgentCapabilities, ) { h.url = url h.callbacks = callbacks - h.receiveProcessor = newReceivedProcessor(h.logger, callbacks, h, clientSyncedState, packagesStateProvider) + h.receiveProcessor = newReceivedProcessor(h.logger, callbacks, h, clientSyncedState, packagesStateProvider, capabilities) for { pollingTimer := time.NewTimer(time.Millisecond * time.Duration(atomic.LoadInt64(&h.pollingIntervalMs))) diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index 15823d4f..f8d6184c 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -23,6 +23,9 @@ type receivedProcessor struct { clientSyncedState *ClientSyncedState packagesStateProvider types.PackagesStateProvider + + // Agent's capabilities defined at Start() time. + capabilities protobufs.AgentCapabilities } func newReceivedProcessor( @@ -31,6 +34,7 @@ func newReceivedProcessor( sender Sender, clientSyncedState *ClientSyncedState, packagesStateProvider types.PackagesStateProvider, + capabilities protobufs.AgentCapabilities, ) receivedProcessor { return receivedProcessor{ logger: logger, @@ -38,6 +42,7 @@ func newReceivedProcessor( sender: sender, clientSyncedState: clientSyncedState, packagesStateProvider: packagesStateProvider, + capabilities: capabilities, } } @@ -57,26 +62,63 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro r.logger.Errorf("cannot processed received flags:%v", err) } - msgData := &types.MessageData{ - RemoteConfig: msg.RemoteConfig, + msgData := &types.MessageData{} + + if msg.RemoteConfig != nil { + if r.hasCapability(protobufs.AgentCapabilities_AcceptsRemoteConfig) { + msgData.RemoteConfig = msg.RemoteConfig + } else { + r.logger.Debugf("Ignoring RemoteConfig, agent does not have AcceptsRemoteConfig capability") + } } if msg.ConnectionSettings != nil { - msgData.OwnMetricsConnSettings = msg.ConnectionSettings.OwnMetrics - msgData.OwnTracesConnSettings = msg.ConnectionSettings.OwnTraces - msgData.OwnLogsConnSettings = msg.ConnectionSettings.OwnLogs - msgData.OtherConnSettings = msg.ConnectionSettings.OtherConnections + if msg.ConnectionSettings.OwnMetrics != nil { + if r.hasCapability(protobufs.AgentCapabilities_ReportsOwnMetrics) { + msgData.OwnMetricsConnSettings = msg.ConnectionSettings.OwnMetrics + } else { + r.logger.Debugf("Ignoring OwnMetrics, agent does not have ReportsOwnMetrics capability") + } + } + + if msg.ConnectionSettings.OwnTraces != nil { + if r.hasCapability(protobufs.AgentCapabilities_ReportsOwnTraces) { + msgData.OwnTracesConnSettings = msg.ConnectionSettings.OwnTraces + } else { + r.logger.Debugf("Ignoring OwnTraces, agent does not have ReportsOwnTraces capability") + } + } + + if msg.ConnectionSettings.OwnLogs != nil { + if r.hasCapability(protobufs.AgentCapabilities_ReportsOwnLogs) { + msgData.OwnLogsConnSettings = msg.ConnectionSettings.OwnLogs + } else { + r.logger.Debugf("Ignoring OwnLogs, agent does not have ReportsOwnLogs capability") + } + } + + if msg.ConnectionSettings.OtherConnections != nil { + if r.hasCapability(protobufs.AgentCapabilities_AcceptsOtherConnectionSettings) { + msgData.OtherConnSettings = msg.ConnectionSettings.OtherConnections + } else { + r.logger.Debugf("Ignoring OtherConnections, agent does not have AcceptsOtherConnectionSettings capability") + } + } } if msg.PackagesAvailable != nil { - msgData.PackagesAvailable = msg.PackagesAvailable - msgData.PackageSyncer = NewPackagesSyncer( - r.logger, - msgData.PackagesAvailable, - r.sender, - r.clientSyncedState, - r.packagesStateProvider, - ) + if r.hasCapability(protobufs.AgentCapabilities_AcceptsPackages) { + msgData.PackagesAvailable = msg.PackagesAvailable + msgData.PackageSyncer = NewPackagesSyncer( + r.logger, + msgData.PackagesAvailable, + r.sender, + r.clientSyncedState, + r.packagesStateProvider, + ) + } else { + r.logger.Debugf("Ignoring PackagesAvailable, agent does not have AcceptsPackages capability") + } } if msg.AgentIdentification != nil { @@ -101,6 +143,10 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro } } +func (r *receivedProcessor) hasCapability(capability protobufs.AgentCapabilities) bool { + return r.capabilities&capability != 0 +} + func (r *receivedProcessor) rcvFlags( ctx context.Context, flags protobufs.ServerToAgent_Flags, @@ -139,10 +185,14 @@ func (r *receivedProcessor) rcvOpampConnectionSettings(ctx context.Context, sett return } - err := r.callbacks.OnOpampConnectionSettings(ctx, settings.Opamp) - if err == nil { - // TODO: verify connection using new settings. - r.callbacks.OnOpampConnectionSettingsAccepted(settings.Opamp) + if r.hasCapability(protobufs.AgentCapabilities_AcceptsOpAMPConnectionSettings) { + err := r.callbacks.OnOpampConnectionSettings(ctx, settings.Opamp) + if err == nil { + // TODO: verify connection using new settings. + r.callbacks.OnOpampConnectionSettingsAccepted(settings.Opamp) + } + } else { + r.logger.Debugf("Ignoring Opamp, agent does not have AcceptsOpAMPConnectionSettings capability") } } diff --git a/client/internal/wsreceiver.go b/client/internal/wsreceiver.go index 08bed682..a9c71cba 100644 --- a/client/internal/wsreceiver.go +++ b/client/internal/wsreceiver.go @@ -27,13 +27,14 @@ func NewWSReceiver( sender *WSSender, clientSyncedState *ClientSyncedState, packagesStateProvider types.PackagesStateProvider, + capabilities protobufs.AgentCapabilities, ) *wsReceiver { w := &wsReceiver{ conn: conn, logger: logger, sender: sender, callbacks: callbacks, - processor: newReceivedProcessor(logger, callbacks, sender, clientSyncedState, packagesStateProvider), + processor: newReceivedProcessor(logger, callbacks, sender, clientSyncedState, packagesStateProvider, capabilities), } return w diff --git a/client/internal/wsreceiver_test.go b/client/internal/wsreceiver_test.go index c7674696..4319aad0 100644 --- a/client/internal/wsreceiver_test.go +++ b/client/internal/wsreceiver_test.go @@ -74,7 +74,7 @@ func TestServerToAgentCommand(t *testing.T) { remoteConfigStatus: &protobufs.RemoteConfigStatus{}, } sender := WSSender{} - receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, &sender, &clientSyncedState, nil) + receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, &sender, &clientSyncedState, nil, 0) receiver.processor.ProcessReceivedMessage(context.Background(), &protobufs.ServerToAgent{ Command: test.command, }) @@ -97,7 +97,7 @@ func TestServerToAgentCommandExclusive(t *testing.T) { }, } clientSyncedState := ClientSyncedState{} - receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, nil, &clientSyncedState, nil) + receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, nil, &clientSyncedState, nil, 0) receiver.processor.ProcessReceivedMessage(context.Background(), &protobufs.ServerToAgent{ Command: &protobufs.ServerToAgentCommand{ Type: protobufs.ServerToAgentCommand_Restart, diff --git a/client/types/startsettings.go b/client/types/startsettings.go index 83f82c91..8a2e4c20 100644 --- a/client/types/startsettings.go +++ b/client/types/startsettings.go @@ -38,4 +38,8 @@ type StartSettings struct { // If nil then ReportsPackageStatuses and AcceptsPackages capabilities will be disabled, // i.e. package status reporting and syncing from the Server will be disabled. PackagesStateProvider PackagesStateProvider + + // Defines the capabilities of the Agent. AgentCapabilities_ReportsStatus bit does not need to + // be set in this field, it will be set automatically since it is required by OpAMP protocol. + Capabilities protobufs.AgentCapabilities } diff --git a/client/wsclient.go b/client/wsclient.go index 551a13eb..baeb8e15 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -235,6 +235,7 @@ func (c *wsClient) runOneCycle(ctx context.Context) { c.sender, &c.common.ClientSyncedState, c.common.PackagesStateProvider, + c.common.Capabilities, ) r.ReceiverLoop(ctx) diff --git a/internal/examples/agent/agent/agent.go b/internal/examples/agent/agent/agent.go index a8f79a5b..fbfc2741 100644 --- a/internal/examples/agent/agent/agent.go +++ b/internal/examples/agent/agent/agent.go @@ -103,6 +103,9 @@ func (agent *Agent) start() error { OnMessageFunc: agent.onMessage, }, RemoteConfigStatus: agent.remoteConfigStatus, + Capabilities: protobufs.AgentCapabilities_AcceptsRemoteConfig | + protobufs.AgentCapabilities_ReportsEffectiveConfig | + protobufs.AgentCapabilities_ReportsOwnMetrics, } err := agent.opampClient.SetAgentDescription(agent.agentDescription) if err != nil { diff --git a/internal/examples/supervisor/supervisor/supervisor.go b/internal/examples/supervisor/supervisor/supervisor.go index 8eecce76..161e1a48 100644 --- a/internal/examples/supervisor/supervisor/supervisor.go +++ b/internal/examples/supervisor/supervisor/supervisor.go @@ -151,12 +151,21 @@ func (s *Supervisor) startOpAMP() error { }, OnMessageFunc: s.onMessage, }, + Capabilities: protobufs.AgentCapabilities_AcceptsRemoteConfig | + protobufs.AgentCapabilities_ReportsEffectiveConfig | + protobufs.AgentCapabilities_ReportsOwnMetrics | + protobufs.AgentCapabilities_ReportsHealth, } err := s.opampClient.SetAgentDescription(s.createAgentDescription()) if err != nil { return err } + err = s.opampClient.SetHealth(&protobufs.AgentHealth{Up: false}) + if err != nil { + return err + } + s.logger.Debugf("Starting OpAMP client...") err = s.opampClient.Start(context.Background(), settings)