Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable scheduleSend when onMessage callback is running #144

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/internal/httpsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (h *HTTPSender) Run(
) {
h.url = url
h.callbacks = callbacks
h.receiveProcessor = newReceivedProcessor(h.logger, callbacks, h, clientSyncedState, packagesStateProvider, capabilities)
h.receiveProcessor = newReceivedProcessor(h.logger, callbacks, h, &h.SenderCommon, clientSyncedState, packagesStateProvider, capabilities)

for {
pollingTimer := time.NewTimer(time.Millisecond * time.Duration(atomic.LoadInt64(&h.pollingIntervalMs)))
Expand Down
33 changes: 33 additions & 0 deletions client/internal/httpsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,39 @@ import (
"github.com/stretchr/testify/assert"
)

func TestDelaySchedule(t *testing.T) {
sender := NewHTTPSender(&sharedinternal.NopLogger{})
pendingMessageChan := sender.hasPendingMessage
scheduleSendDelayChan := sender.registerScheduleSend
sender.DisableScheduleSend()

// Verify ScheduleSend is not writing to message channel when disabled
sender.ScheduleSend()
assert.Equal(t, 0, len(pendingMessageChan))
assert.Equal(t, 1, len(scheduleSendDelayChan))

// Repeat process to verify non-blocking and no change in channel length
sender.ScheduleSend()
assert.Equal(t, 0, len(pendingMessageChan))
assert.Equal(t, 1, len(scheduleSendDelayChan))

// Verify ScheduleSend is writing to message channel when enabled
sender.EnableScheduleSend()
assert.Equal(t, 1, len(pendingMessageChan))
assert.Equal(t, 0, len(scheduleSendDelayChan))

// Repeat process to verify non-blocking and no change in channel length
sender.EnableScheduleSend()
assert.Equal(t, 1, len(pendingMessageChan))
assert.Equal(t, 0, len(scheduleSendDelayChan))

// ScheduleSend sanity check after enabling
sender.ScheduleSend()
assert.Equal(t, 1, len(pendingMessageChan))
assert.Equal(t, 0, len(scheduleSendDelayChan))

}

func TestHTTPSenderRetryForStatusTooManyRequests(t *testing.T) {

var connectionAttempts int64
Expand Down
13 changes: 11 additions & 2 deletions client/internal/receivedprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ type receivedProcessor struct {
// what will be sent later.
sender Sender

// A senderCommon to handle the scheduling of sending.
senderCommon *SenderCommon
nemoshlag marked this conversation as resolved.
Show resolved Hide resolved

// Client state storage. This is needed if the Server asks to report the state.
clientSyncedState *ClientSyncedState

Expand All @@ -32,6 +35,7 @@ func newReceivedProcessor(
logger types.Logger,
callbacks types.Callbacks,
sender Sender,
senderCommon *SenderCommon,
nemoshlag marked this conversation as resolved.
Show resolved Hide resolved
clientSyncedState *ClientSyncedState,
packagesStateProvider types.PackagesStateProvider,
capabilities protobufs.AgentCapabilities,
Expand All @@ -40,6 +44,7 @@ func newReceivedProcessor(
logger: logger,
callbacks: callbacks,
sender: sender,
senderCommon: senderCommon,
clientSyncedState: clientSyncedState,
packagesStateProvider: packagesStateProvider,
capabilities: capabilities,
Expand Down Expand Up @@ -127,8 +132,7 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro
msgData.AgentIdentification = msg.AgentIdentification
}
}

r.callbacks.OnMessage(ctx, msgData)
r.onMessage(ctx, msgData)

r.rcvOpampConnectionSettings(ctx, msg.ConnectionSettings)
nemoshlag marked this conversation as resolved.
Show resolved Hide resolved

Expand All @@ -143,6 +147,11 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro
}
}

func (r *receivedProcessor) onMessage(ctx context.Context, msgData *types.MessageData) {
r.senderCommon.DisableScheduleSend()
r.callbacks.OnMessage(ctx, msgData)
nemoshlag marked this conversation as resolved.
Show resolved Hide resolved
r.senderCommon.EnableScheduleSend()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: move this up and use defer so that if the code becomes more complicated in the future there is no chance that EnableScheduleSend() is not executed on some of the return paths.

}
func (r *receivedProcessor) hasCapability(capability protobufs.AgentCapabilities) bool {
return r.capabilities&capability != 0
}
Expand Down
44 changes: 42 additions & 2 deletions client/internal/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal

import (
"errors"
"sync/atomic"

"github.com/oklog/ulid/v2"
"github.com/open-telemetry/opamp-go/protobufs"
Expand Down Expand Up @@ -31,6 +32,12 @@ type SenderCommon struct {
// Indicates that there is a pending message to send.
hasPendingMessage chan struct{}

// Indicates onMessage callback is running
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: the comment doesn't match what the field name implies. It would be best to reword it to something like "Set to non-zero to indicate that the message sending is disabled"

onMessageRunning int32
nemoshlag marked this conversation as resolved.
Show resolved Hide resolved

// Indicates ScheduleSend() was called during onMessage callback run
registerScheduleSend chan struct{}

// The next message to send.
nextMessage NextMessage
}
Expand All @@ -39,15 +46,27 @@ type SenderCommon struct {
// the WebSocket and HTTP Sender implementations.
func NewSenderCommon() SenderCommon {
return SenderCommon{
hasPendingMessage: make(chan struct{}, 1),
nextMessage: NewNextMessage(),
hasPendingMessage: make(chan struct{}, 1),
registerScheduleSend: make(chan struct{}, 1),
nextMessage: NewNextMessage(),
onMessageRunning: 0,
}
}

// ScheduleSend signals to HTTPSender that the message in NextMessage struct
// is now ready to be sent. If there is no pending message (e.g. the NextMessage was
// already sent and "pending" flag is reset) then no message will be sent.
func (h *SenderCommon) ScheduleSend() {
if h.IsOnMessageRunning() {
// onMessage callback is running, ScheduleSend() will rerun after it is done
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, comment doesn't match what the code and func names imply.

select {
case h.registerScheduleSend <- struct{}{}:
default:
break
}
return
}

// Set pending flag. Don't block on writing to channel.
select {
case h.hasPendingMessage <- struct{}{}:
Expand All @@ -62,6 +81,27 @@ func (h *SenderCommon) NextMessage() *NextMessage {
return &h.nextMessage
}

// IsOnMessageRunning returns true if onMessage callback is running
func (h *SenderCommon) IsOnMessageRunning() bool {
nemoshlag marked this conversation as resolved.
Show resolved Hide resolved
return atomic.LoadInt32(&h.onMessageRunning) != 0
}

// DisableScheduleSend temporary preventing ScheduleSend from writing to channel
func (h *SenderCommon) DisableScheduleSend() {
atomic.StoreInt32(&h.onMessageRunning, 1)
}

// EnableScheduleSend re-enables ScheduleSend and checks if it was called during onMessage callback
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I would refrain from mentioning onMessage here. It is not a concern of SenderCommon.

func (h *SenderCommon) EnableScheduleSend() {
atomic.StoreInt32(&h.onMessageRunning, 0)
select {
case <-h.registerScheduleSend:
h.ScheduleSend()
default:
break
}
}

// SetInstanceUid sets a new instanceUid to be used for all subsequent messages to be sent.
// Can be called concurrently, normally is called when a message is received from the
// Server that instructs us to change our instance UID.
Expand Down
3 changes: 2 additions & 1 deletion client/internal/wsreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func NewWSReceiver(
callbacks types.Callbacks,
conn *websocket.Conn,
sender *WSSender,
senderCommon *SenderCommon,
clientSyncedState *ClientSyncedState,
packagesStateProvider types.PackagesStateProvider,
capabilities protobufs.AgentCapabilities,
Expand All @@ -36,7 +37,7 @@ func NewWSReceiver(
logger: logger,
sender: sender,
callbacks: callbacks,
processor: newReceivedProcessor(logger, callbacks, sender, clientSyncedState, packagesStateProvider, capabilities),
processor: newReceivedProcessor(logger, callbacks, sender, senderCommon, clientSyncedState, packagesStateProvider, capabilities),
}

return w
Expand Down
4 changes: 2 additions & 2 deletions client/internal/wsreceiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestServerToAgentCommand(t *testing.T) {
remoteConfigStatus: &protobufs.RemoteConfigStatus{},
}
sender := WSSender{}
receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, &sender, &clientSyncedState, nil, 0)
receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, &sender, &sender.SenderCommon, &clientSyncedState, nil, 0)
receiver.processor.ProcessReceivedMessage(context.Background(), &protobufs.ServerToAgent{
Command: test.command,
})
Expand All @@ -97,7 +97,7 @@ func TestServerToAgentCommandExclusive(t *testing.T) {
},
}
clientSyncedState := ClientSyncedState{}
receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, nil, &clientSyncedState, nil, 0)
receiver := NewWSReceiver(TestLogger{t}, callbacks, nil, nil, nil, &clientSyncedState, nil, 0)
receiver.processor.ProcessReceivedMessage(context.Background(), &protobufs.ServerToAgent{
Command: &protobufs.ServerToAgentCommand{
Type: protobufs.CommandType_CommandType_Restart,
Expand Down
1 change: 1 addition & 0 deletions client/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func (c *wsClient) runOneCycle(ctx context.Context) {
c.common.Callbacks,
c.conn,
c.sender,
&c.sender.SenderCommon,
&c.common.ClientSyncedState,
c.common.PackagesStateProvider,
c.common.Capabilities,
Expand Down