diff --git a/client/clientimpl.go b/client/clientimpl.go index 28cef3f6..1f60458d 100644 --- a/client/clientimpl.go +++ b/client/clientimpl.go @@ -101,14 +101,21 @@ func (w *client) Start(settings StartSettings) error { } // Prepare the first status report. - w.sender.UpdateStatus( + w.sender.UpdateNextStatus( func(statusReport *protobufs.StatusReport) { statusReport.AgentDescription = &protobufs.AgentDescription{ AgentType: w.settings.AgentType, AgentVersion: w.settings.AgentVersion, } + }, + ) - statusReport.ServerProvidedAllAddonsHash = w.settings.LastServerProvidedAllAddonsHash + w.sender.UpdateNextMessage( + func(msg *protobufs.AgentToServer) { + if msg.AddonStatuses == nil { + msg.AddonStatuses = &protobufs.AgentAddonStatuses{} + } + msg.AddonStatuses.ServerProvidedAllAddonsHash = w.settings.LastServerProvidedAllAddonsHash }, ) @@ -158,7 +165,7 @@ func (w *client) SetAgentAttributes(attrs map[string]protobufs.AnyValue) error { } func (w *client) SetEffectiveConfig(config *protobufs.EffectiveConfig) error { - w.sender.UpdateStatus(func(statusReport *protobufs.StatusReport) { + w.sender.UpdateNextStatus(func(statusReport *protobufs.StatusReport) { statusReport.EffectiveConfig = config }) w.sender.ScheduleSend() diff --git a/client/internal/receiver.go b/client/internal/receiver.go index a994e4e5..58decc53 100644 --- a/client/internal/receiver.go +++ b/client/internal/receiver.go @@ -80,7 +80,7 @@ func (r *Receiver) processReceivedMessage(ctx context.Context, msg *protobufs.Se func (r *Receiver) rcvRemoteConfig(ctx context.Context, config *protobufs.AgentRemoteConfig) (reportStatus bool) { effective, err := r.callbacks.OnRemoteConfig(ctx, config) if err == nil { - r.sender.UpdateStatus(func(statusReport *protobufs.StatusReport) { + r.sender.UpdateNextStatus(func(statusReport *protobufs.StatusReport) { statusReport.EffectiveConfig = effective }) if effective != nil { diff --git a/client/internal/sender.go b/client/internal/sender.go index ac5a1f47..243004b6 100644 --- a/client/internal/sender.go +++ b/client/internal/sender.go @@ -21,12 +21,12 @@ type Sender struct { // Indicates that there are pending messages to send. hasMessages chan struct{} - // The next status report to send. - statusReport protobufs.StatusReport - // Indicates that the status report is pending to be sent. - statusReportPending bool + // The next message to send. + nextMessage protobufs.AgentToServer + // Indicates that nextMessage is pending to be sent. + messagePending bool // Mutex to protect the above 2 fields. - statusReportMutex sync.Mutex + messageMutex sync.Mutex // Indicates that the sender has fully stopped. stopped chan struct{} @@ -39,12 +39,12 @@ func NewSender(logger types.Logger) *Sender { } } -// Start the sender and send the first status report that was set via UpdateStatus() +// Start the sender and send the first message that was set via UpdateNextMessage() // earlier. To stop the Sender cancel the ctx. func (s *Sender) Start(ctx context.Context, instanceUid string, conn *websocket.Conn) error { s.conn = conn s.instanceUid = instanceUid - err := s.sendStatusReport() + err := s.sendNextMessage() // Run the sender in the background. s.stopped = make(chan struct{}) @@ -59,18 +59,31 @@ func (s *Sender) WaitToStop() { <-s.stopped } -// UpdateStatus applies the specified modifier function to the status report that +// UpdateNextMessage applies the specified modifier function to the next message that +// will be sent and marks the message as pending to be sent. +func (s *Sender) UpdateNextMessage(modifier func(msg *protobufs.AgentToServer)) { + s.messageMutex.Lock() + modifier(&s.nextMessage) + s.messagePending = true + s.messageMutex.Unlock() +} + +// UpdateNextStatus applies the specified modifier function to the status report that // will be sent next and marks the status report as pending to be sent. -func (s *Sender) UpdateStatus(modifier func(statusReport *protobufs.StatusReport)) { - s.statusReportMutex.Lock() - modifier(&s.statusReport) - s.statusReportPending = true - s.statusReportMutex.Unlock() +func (s *Sender) UpdateNextStatus(modifier func(statusReport *protobufs.StatusReport)) { + s.UpdateNextMessage( + func(msg *protobufs.AgentToServer) { + if s.nextMessage.StatusReport == nil { + s.nextMessage.StatusReport = &protobufs.StatusReport{} + } + modifier(s.nextMessage.StatusReport) + }, + ) } -// ScheduleSend signals to the sending goroutine to send the current status report -// if it is pending. If there is no pending status report (e.g. status report was -// already sent and "pending" flag is reset) then status report will not be sent. +// ScheduleSend signals to the sending goroutine to send the next message +// if it is pending. If there is no pending message (e.g. the message was +// already sent and "pending" flag is reset) then no message will be be sent. func (s *Sender) ScheduleSend() { select { case s.hasMessages <- struct{}{}: @@ -84,7 +97,7 @@ out: for { select { case <-s.hasMessages: - s.sendMessages() + s.sendNextMessage() case <-ctx.Done(): break out @@ -94,35 +107,26 @@ out: close(s.stopped) } -func (s *Sender) sendMessages() { - s.sendStatusReport() -} - -func (s *Sender) sendStatusReport() error { - var statusReport *protobufs.StatusReport - s.statusReportMutex.Lock() - if s.statusReportPending { - // Clone the statusReport to have a copy for sending and avoid blocking - // future updates to statusReport field. - statusReport = proto.Clone(&s.statusReport).(*protobufs.StatusReport) - s.statusReportPending = false +func (s *Sender) sendNextMessage() error { + var msgToSend *protobufs.AgentToServer + s.messageMutex.Lock() + if s.messagePending { + // Clone the message to have a copy for sending and avoid blocking + // future updates to s.nextMessage field. + msgToSend = proto.Clone(&s.nextMessage).(*protobufs.AgentToServer) + s.messagePending = false // Reset fields that we do not have to send unless they change before the // next report after this one. - s.statusReport.RemoteConfigStatus = nil - s.statusReport.EffectiveConfig = nil - s.statusReport.AgentDescription = nil + s.nextMessage = protobufs.AgentToServer{} } - s.statusReportMutex.Unlock() - - if statusReport != nil { - msg := protobufs.AgentToServer{ - InstanceUid: s.instanceUid, - Body: &protobufs.AgentToServer_StatusReport{ - StatusReport: statusReport, - }, - } - return s.sendMessage(&msg) + s.messageMutex.Unlock() + + if msgToSend != nil && !proto.Equal(msgToSend, &protobufs.AgentToServer{}) { + // There is a pending message and the message has some fields populated. + // Set the InstanceUid field and send it. + msgToSend.InstanceUid = s.instanceUid + return s.sendMessage(msgToSend) } return nil } diff --git a/internal/proto/opamp.proto b/internal/proto/opamp.proto index 598a67de..68fb1d3c 100644 --- a/internal/proto/opamp.proto +++ b/internal/proto/opamp.proto @@ -28,34 +28,25 @@ message AgentToServer { // Recommended format: https://github.com/ulid/spec string instance_uid = 1; - // Sequential number of the message. - // - // (instance_uid, sequence_num) pair globally uniquely identifies every - // AgentToServer message and SHOULD be treated as an idempotency key by the - // Management Server. - // - // The Agent SHOULD start with sequence_num=0 and increment it by 2 for every new - // request (thus only even numbers should appear in the requests from Agent to Server). - // - // Requests which need to be retried (e.g. because the response timed out) MUST - // keep the same sequence_num. - //fixed64 sequence_num = 2; - - oneof Body { - StatusReport status_report = 2; - - // The list of the agent addons, including addon statuses. - // This field SHOULD be unset if this information is unchanged since the last - // StatusReport message for this agent was sent in the stream. - AgentAddonStatuses addon_statuses = 3; - - // The status of the installation operation that was previously offered by the server. - // This field SHOULD be unset if the installation status is unchanged since the - // last StatusReport message. - AgentInstallStatus agent_install_status = 4; - - AgentDisconnect agent_disconnect = 5; - } + // The status of the Agent. MUST be set in the first AgentToServer message that the + // Agent sends after connecting. + // This field SHOULD be unset if this information is unchanged since the last + // AgentToServer message for this agent was sent in the stream. + StatusReport status_report = 2; + + // The list of the agent addons, including addon statuses. + // This field SHOULD be unset if this information is unchanged since the last + // AgentToServer message for this agent was sent in the stream. + AgentAddonStatuses addon_statuses = 3; + + // The status of the installation operation that was previously offered by the server. + // This field SHOULD be unset if the installation status is unchanged since the + // last AgentToServer message. + AgentInstallStatus agent_install_status = 4; + + // AgentDisconnect MUST be set in the last AgentToServer message sent from the + // agent to the server. + AgentDisconnect agent_disconnect = 5; } // AgentDisconnect is the last message sent from the agent to the server. The server diff --git a/internal/protobufs/opamp.pb.go b/internal/protobufs/opamp.pb.go index 08cdbfde..591a4389 100644 --- a/internal/protobufs/opamp.pb.go +++ b/internal/protobufs/opamp.pb.go @@ -381,12 +381,22 @@ type AgentToServer struct { // unchanged for the lifetime of the agent process. // Recommended format: https://github.com/ulid/spec InstanceUid string `protobuf:"bytes,1,opt,name=instance_uid,json=instanceUid,proto3" json:"instance_uid,omitempty"` - // Types that are assignable to Body: - // *AgentToServer_StatusReport - // *AgentToServer_AddonStatuses - // *AgentToServer_AgentInstallStatus - // *AgentToServer_AgentDisconnect - Body isAgentToServer_Body `protobuf_oneof:"Body"` + // The status of the Agent. MUST be set in the first AgentToServer message that the + // Agent sends after connecting. + // This field SHOULD be unset if this information is unchanged since the last + // AgentToServer message for this agent was sent in the stream. + StatusReport *StatusReport `protobuf:"bytes,2,opt,name=status_report,json=statusReport,proto3" json:"status_report,omitempty"` + // The list of the agent addons, including addon statuses. + // This field SHOULD be unset if this information is unchanged since the last + // AgentToServer message for this agent was sent in the stream. + AddonStatuses *AgentAddonStatuses `protobuf:"bytes,3,opt,name=addon_statuses,json=addonStatuses,proto3" json:"addon_statuses,omitempty"` + // The status of the installation operation that was previously offered by the server. + // This field SHOULD be unset if the installation status is unchanged since the + // last AgentToServer message. + AgentInstallStatus *AgentInstallStatus `protobuf:"bytes,4,opt,name=agent_install_status,json=agentInstallStatus,proto3" json:"agent_install_status,omitempty"` + // AgentDisconnect MUST be set in the last AgentToServer message sent from the + // agent to the server. + AgentDisconnect *AgentDisconnect `protobuf:"bytes,5,opt,name=agent_disconnect,json=agentDisconnect,proto3" json:"agent_disconnect,omitempty"` } func (x *AgentToServer) Reset() { @@ -428,75 +438,34 @@ func (x *AgentToServer) GetInstanceUid() string { return "" } -func (m *AgentToServer) GetBody() isAgentToServer_Body { - if m != nil { - return m.Body - } - return nil -} - func (x *AgentToServer) GetStatusReport() *StatusReport { - if x, ok := x.GetBody().(*AgentToServer_StatusReport); ok { + if x != nil { return x.StatusReport } return nil } func (x *AgentToServer) GetAddonStatuses() *AgentAddonStatuses { - if x, ok := x.GetBody().(*AgentToServer_AddonStatuses); ok { + if x != nil { return x.AddonStatuses } return nil } func (x *AgentToServer) GetAgentInstallStatus() *AgentInstallStatus { - if x, ok := x.GetBody().(*AgentToServer_AgentInstallStatus); ok { + if x != nil { return x.AgentInstallStatus } return nil } func (x *AgentToServer) GetAgentDisconnect() *AgentDisconnect { - if x, ok := x.GetBody().(*AgentToServer_AgentDisconnect); ok { + if x != nil { return x.AgentDisconnect } return nil } -type isAgentToServer_Body interface { - isAgentToServer_Body() -} - -type AgentToServer_StatusReport struct { - StatusReport *StatusReport `protobuf:"bytes,2,opt,name=status_report,json=statusReport,proto3,oneof"` -} - -type AgentToServer_AddonStatuses struct { - // The list of the agent addons, including addon statuses. - // This field SHOULD be unset if this information is unchanged since the last - // StatusReport message for this agent was sent in the stream. - AddonStatuses *AgentAddonStatuses `protobuf:"bytes,3,opt,name=addon_statuses,json=addonStatuses,proto3,oneof"` -} - -type AgentToServer_AgentInstallStatus struct { - // The status of the installation operation that was previously offered by the server. - // This field SHOULD be unset if the installation status is unchanged since the - // last StatusReport message. - AgentInstallStatus *AgentInstallStatus `protobuf:"bytes,4,opt,name=agent_install_status,json=agentInstallStatus,proto3,oneof"` -} - -type AgentToServer_AgentDisconnect struct { - AgentDisconnect *AgentDisconnect `protobuf:"bytes,5,opt,name=agent_disconnect,json=agentDisconnect,proto3,oneof"` -} - -func (*AgentToServer_StatusReport) isAgentToServer_Body() {} - -func (*AgentToServer_AddonStatuses) isAgentToServer_Body() {} - -func (*AgentToServer_AgentInstallStatus) isAgentToServer_Body() {} - -func (*AgentToServer_AgentDisconnect) isAgentToServer_Body() {} - // AgentDisconnect is the last message sent from the agent to the server. The server // SHOULD forget the association of the agent instance with the message stream. // @@ -2217,30 +2186,29 @@ var File_opamp_proto protoreflect.FileDescriptor var file_opamp_proto_rawDesc = []byte{ 0x0a, 0x0b, 0x6f, 0x70, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0b, 0x6f, 0x70, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x0e, 0x61, 0x6e, 0x79, 0x76, - 0x61, 0x6c, 0x75, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xe6, 0x02, 0x0a, 0x0d, 0x41, + 0x61, 0x6c, 0x75, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xd6, 0x02, 0x0a, 0x0d, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x12, 0x21, 0x0a, 0x0c, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x5f, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x55, 0x69, 0x64, 0x12, - 0x40, 0x0a, 0x0d, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x5f, 0x72, 0x65, 0x70, 0x6f, 0x72, 0x74, + 0x3e, 0x0a, 0x0d, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x5f, 0x72, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x6f, 0x70, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x70, 0x6f, 0x72, - 0x74, 0x48, 0x00, 0x52, 0x0c, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x70, 0x6f, 0x72, - 0x74, 0x12, 0x48, 0x0a, 0x0e, 0x61, 0x64, 0x64, 0x6f, 0x6e, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, - 0x73, 0x65, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x6f, 0x70, 0x61, 0x6d, - 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x41, 0x64, 0x64, - 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x65, 0x73, 0x48, 0x00, 0x52, 0x0d, 0x61, 0x64, - 0x64, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x65, 0x73, 0x12, 0x53, 0x0a, 0x14, 0x61, - 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6c, 0x6c, 0x5f, 0x73, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x6f, 0x70, 0x61, 0x6d, - 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x49, 0x6e, 0x73, - 0x74, 0x61, 0x6c, 0x6c, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x48, 0x00, 0x52, 0x12, 0x61, 0x67, - 0x65, 0x6e, 0x74, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6c, 0x6c, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, - 0x12, 0x49, 0x0a, 0x10, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x5f, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x6e, - 0x6e, 0x65, 0x63, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x6f, 0x70, 0x61, - 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x44, 0x69, - 0x73, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x48, 0x00, 0x52, 0x0f, 0x61, 0x67, 0x65, 0x6e, - 0x74, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x42, 0x06, 0x0a, 0x04, 0x42, - 0x6f, 0x64, 0x79, 0x22, 0x11, 0x0a, 0x0f, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x44, 0x69, 0x73, 0x63, + 0x74, 0x52, 0x0c, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x12, + 0x46, 0x0a, 0x0e, 0x61, 0x64, 0x64, 0x6f, 0x6e, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x65, + 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x6f, 0x70, 0x61, 0x6d, 0x70, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x41, 0x64, 0x64, 0x6f, 0x6e, + 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x65, 0x73, 0x52, 0x0d, 0x61, 0x64, 0x64, 0x6f, 0x6e, 0x53, + 0x74, 0x61, 0x74, 0x75, 0x73, 0x65, 0x73, 0x12, 0x51, 0x0a, 0x14, 0x61, 0x67, 0x65, 0x6e, 0x74, + 0x5f, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6c, 0x6c, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x6f, 0x70, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2e, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6c, 0x6c, + 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x12, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x49, 0x6e, 0x73, + 0x74, 0x61, 0x6c, 0x6c, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x47, 0x0a, 0x10, 0x61, 0x67, + 0x65, 0x6e, 0x74, 0x5f, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x18, 0x05, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x6f, 0x70, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2e, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x6e, 0x6e, 0x65, + 0x63, 0x74, 0x52, 0x0f, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x6e, 0x6e, + 0x65, 0x63, 0x74, 0x22, 0x11, 0x0a, 0x0f, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x22, 0xc6, 0x04, 0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x54, 0x6f, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x12, 0x21, 0x0a, 0x0c, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6e, 0x63, 0x65, 0x5f, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, @@ -2928,12 +2896,6 @@ func file_opamp_proto_init() { } } } - file_opamp_proto_msgTypes[0].OneofWrappers = []interface{}{ - (*AgentToServer_StatusReport)(nil), - (*AgentToServer_AddonStatuses)(nil), - (*AgentToServer_AgentInstallStatus)(nil), - (*AgentToServer_AgentDisconnect)(nil), - } file_opamp_proto_msgTypes[11].OneofWrappers = []interface{}{ (*ServerErrorResponse_RetryInfo)(nil), }