Skip to content

Commit

Permalink
Remove oneof Body from AgentToServer message (#27)
Browse files Browse the repository at this point in the history
This eliminates oneof Body from AgentToServer and puts all fields in AgentToServer
which allows to include multiple messages in one AgentToServer. This can reduce
the number of roundtrips and makes it uniform with ServerToAgent.

Implements changes corresponding to spec changes open-telemetry/opamp-spec#44
  • Loading branch information
tigrannajaryan authored Dec 1, 2021
1 parent b58fc0e commit dd5c666
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 150 deletions.
13 changes: 10 additions & 3 deletions client/clientimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,21 @@ func (w *client) Start(settings StartSettings) error {
}

// Prepare the first status report.
w.sender.UpdateStatus(
w.sender.UpdateNextStatus(
func(statusReport *protobufs.StatusReport) {
statusReport.AgentDescription = &protobufs.AgentDescription{
AgentType: w.settings.AgentType,
AgentVersion: w.settings.AgentVersion,
}
},
)

statusReport.ServerProvidedAllAddonsHash = w.settings.LastServerProvidedAllAddonsHash
w.sender.UpdateNextMessage(
func(msg *protobufs.AgentToServer) {
if msg.AddonStatuses == nil {
msg.AddonStatuses = &protobufs.AgentAddonStatuses{}
}
msg.AddonStatuses.ServerProvidedAllAddonsHash = w.settings.LastServerProvidedAllAddonsHash
},
)

Expand Down Expand Up @@ -158,7 +165,7 @@ func (w *client) SetAgentAttributes(attrs map[string]protobufs.AnyValue) error {
}

func (w *client) SetEffectiveConfig(config *protobufs.EffectiveConfig) error {
w.sender.UpdateStatus(func(statusReport *protobufs.StatusReport) {
w.sender.UpdateNextStatus(func(statusReport *protobufs.StatusReport) {
statusReport.EffectiveConfig = config
})
w.sender.ScheduleSend()
Expand Down
2 changes: 1 addition & 1 deletion client/internal/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (r *Receiver) processReceivedMessage(ctx context.Context, msg *protobufs.Se
func (r *Receiver) rcvRemoteConfig(ctx context.Context, config *protobufs.AgentRemoteConfig) (reportStatus bool) {
effective, err := r.callbacks.OnRemoteConfig(ctx, config)
if err == nil {
r.sender.UpdateStatus(func(statusReport *protobufs.StatusReport) {
r.sender.UpdateNextStatus(func(statusReport *protobufs.StatusReport) {
statusReport.EffectiveConfig = effective
})
if effective != nil {
Expand Down
88 changes: 46 additions & 42 deletions client/internal/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ type Sender struct {
// Indicates that there are pending messages to send.
hasMessages chan struct{}

// The next status report to send.
statusReport protobufs.StatusReport
// Indicates that the status report is pending to be sent.
statusReportPending bool
// The next message to send.
nextMessage protobufs.AgentToServer
// Indicates that nextMessage is pending to be sent.
messagePending bool
// Mutex to protect the above 2 fields.
statusReportMutex sync.Mutex
messageMutex sync.Mutex

// Indicates that the sender has fully stopped.
stopped chan struct{}
Expand All @@ -39,12 +39,12 @@ func NewSender(logger types.Logger) *Sender {
}
}

// Start the sender and send the first status report that was set via UpdateStatus()
// Start the sender and send the first message that was set via UpdateNextMessage()
// earlier. To stop the Sender cancel the ctx.
func (s *Sender) Start(ctx context.Context, instanceUid string, conn *websocket.Conn) error {
s.conn = conn
s.instanceUid = instanceUid
err := s.sendStatusReport()
err := s.sendNextMessage()

// Run the sender in the background.
s.stopped = make(chan struct{})
Expand All @@ -59,18 +59,31 @@ func (s *Sender) WaitToStop() {
<-s.stopped
}

// UpdateStatus applies the specified modifier function to the status report that
// UpdateNextMessage applies the specified modifier function to the next message that
// will be sent and marks the message as pending to be sent.
func (s *Sender) UpdateNextMessage(modifier func(msg *protobufs.AgentToServer)) {
s.messageMutex.Lock()
modifier(&s.nextMessage)
s.messagePending = true
s.messageMutex.Unlock()
}

// UpdateNextStatus applies the specified modifier function to the status report that
// will be sent next and marks the status report as pending to be sent.
func (s *Sender) UpdateStatus(modifier func(statusReport *protobufs.StatusReport)) {
s.statusReportMutex.Lock()
modifier(&s.statusReport)
s.statusReportPending = true
s.statusReportMutex.Unlock()
func (s *Sender) UpdateNextStatus(modifier func(statusReport *protobufs.StatusReport)) {
s.UpdateNextMessage(
func(msg *protobufs.AgentToServer) {
if s.nextMessage.StatusReport == nil {
s.nextMessage.StatusReport = &protobufs.StatusReport{}
}
modifier(s.nextMessage.StatusReport)
},
)
}

// ScheduleSend signals to the sending goroutine to send the current status report
// if it is pending. If there is no pending status report (e.g. status report was
// already sent and "pending" flag is reset) then status report will not be sent.
// ScheduleSend signals to the sending goroutine to send the next message
// if it is pending. If there is no pending message (e.g. the message was
// already sent and "pending" flag is reset) then no message will be be sent.
func (s *Sender) ScheduleSend() {
select {
case s.hasMessages <- struct{}{}:
Expand All @@ -84,7 +97,7 @@ out:
for {
select {
case <-s.hasMessages:
s.sendMessages()
s.sendNextMessage()

case <-ctx.Done():
break out
Expand All @@ -94,35 +107,26 @@ out:
close(s.stopped)
}

func (s *Sender) sendMessages() {
s.sendStatusReport()
}

func (s *Sender) sendStatusReport() error {
var statusReport *protobufs.StatusReport
s.statusReportMutex.Lock()
if s.statusReportPending {
// Clone the statusReport to have a copy for sending and avoid blocking
// future updates to statusReport field.
statusReport = proto.Clone(&s.statusReport).(*protobufs.StatusReport)
s.statusReportPending = false
func (s *Sender) sendNextMessage() error {
var msgToSend *protobufs.AgentToServer
s.messageMutex.Lock()
if s.messagePending {
// Clone the message to have a copy for sending and avoid blocking
// future updates to s.nextMessage field.
msgToSend = proto.Clone(&s.nextMessage).(*protobufs.AgentToServer)
s.messagePending = false

// Reset fields that we do not have to send unless they change before the
// next report after this one.
s.statusReport.RemoteConfigStatus = nil
s.statusReport.EffectiveConfig = nil
s.statusReport.AgentDescription = nil
s.nextMessage = protobufs.AgentToServer{}
}
s.statusReportMutex.Unlock()

if statusReport != nil {
msg := protobufs.AgentToServer{
InstanceUid: s.instanceUid,
Body: &protobufs.AgentToServer_StatusReport{
StatusReport: statusReport,
},
}
return s.sendMessage(&msg)
s.messageMutex.Unlock()

if msgToSend != nil && !proto.Equal(msgToSend, &protobufs.AgentToServer{}) {
// There is a pending message and the message has some fields populated.
// Set the InstanceUid field and send it.
msgToSend.InstanceUid = s.instanceUid
return s.sendMessage(msgToSend)
}
return nil
}
Expand Down
47 changes: 19 additions & 28 deletions internal/proto/opamp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,34 +28,25 @@ message AgentToServer {
// Recommended format: https://github.com/ulid/spec
string instance_uid = 1;

// Sequential number of the message.
//
// (instance_uid, sequence_num) pair globally uniquely identifies every
// AgentToServer message and SHOULD be treated as an idempotency key by the
// Management Server.
//
// The Agent SHOULD start with sequence_num=0 and increment it by 2 for every new
// request (thus only even numbers should appear in the requests from Agent to Server).
//
// Requests which need to be retried (e.g. because the response timed out) MUST
// keep the same sequence_num.
//fixed64 sequence_num = 2;

oneof Body {
StatusReport status_report = 2;

// The list of the agent addons, including addon statuses.
// This field SHOULD be unset if this information is unchanged since the last
// StatusReport message for this agent was sent in the stream.
AgentAddonStatuses addon_statuses = 3;

// The status of the installation operation that was previously offered by the server.
// This field SHOULD be unset if the installation status is unchanged since the
// last StatusReport message.
AgentInstallStatus agent_install_status = 4;

AgentDisconnect agent_disconnect = 5;
}
// The status of the Agent. MUST be set in the first AgentToServer message that the
// Agent sends after connecting.
// This field SHOULD be unset if this information is unchanged since the last
// AgentToServer message for this agent was sent in the stream.
StatusReport status_report = 2;

// The list of the agent addons, including addon statuses.
// This field SHOULD be unset if this information is unchanged since the last
// AgentToServer message for this agent was sent in the stream.
AgentAddonStatuses addon_statuses = 3;

// The status of the installation operation that was previously offered by the server.
// This field SHOULD be unset if the installation status is unchanged since the
// last AgentToServer message.
AgentInstallStatus agent_install_status = 4;

// AgentDisconnect MUST be set in the last AgentToServer message sent from the
// agent to the server.
AgentDisconnect agent_disconnect = 5;
}

// AgentDisconnect is the last message sent from the agent to the server. The server
Expand Down
Loading

0 comments on commit dd5c666

Please sign in to comment.