Skip to content

Commit

Permalink
Add Disconnect() method to client connections (#129)
Browse files Browse the repository at this point in the history
Resolves #91 

This enables OpAMP to close the network connection to an agent from the server side
  • Loading branch information
nemoshlag authored Sep 22, 2022
1 parent 481b3de commit c15cf0b
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 2 deletions.
13 changes: 11 additions & 2 deletions server/httpconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ import (
"github.com/open-telemetry/opamp-go/server/types"
)

var errInvalidHTTPConnection = errors.New("cannot Send() over HTTP connection")
// ErrInvalidHTTPConnection represents an event of misuse function for plain HTTP
// connection, such as httpConnection.Send() or httpConnection.Disconnect().
// Usage will not result with change but return this error to indicate current state
// might not be as expected.
var ErrInvalidHTTPConnection = errors.New("cannot operate over HTTP connection")

// httpConnection represents an OpAMP connection over a plain HTTP connection.
// Only one response is possible to send when using plain HTTP connection
Expand All @@ -28,5 +32,10 @@ var _ types.Connection = (*httpConnection)(nil)
func (c httpConnection) Send(_ context.Context, _ *protobufs.ServerToAgent) error {
// Send() should not be called for plain HTTP connection. Instead, the response will
// be sent after the onMessage callback returns.
return errInvalidHTTPConnection
return ErrInvalidHTTPConnection
}

func (c httpConnection) Disconnect() error {
// Disconnect() should not be called for plain HTTP connection.
return ErrInvalidHTTPConnection
}
41 changes: 41 additions & 0 deletions server/serverimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,47 @@ func TestServerStartAcceptConnection(t *testing.T) {
eventually(t, func() bool { return atomic.LoadInt32(&connectionCloseCalled) == 1 })
}

func TestDisconnectHttpConnection(t *testing.T) {
// Verify Disconnect() results with Invalid HTTP Connection error
err := httpConnection{}.Disconnect()
assert.Error(t, err)
assert.Equal(t, ErrInvalidHTTPConnection, err)
}

func TestDisconnectWSConnection(t *testing.T) {
connectionCloseCalled := int32(0)
callback := CallbacksStruct{
OnConnectionCloseFunc: func(conn types.Connection) {
atomic.StoreInt32(&connectionCloseCalled, 1)
},
}

// Start a Server.
settings := &StartSettings{Settings: Settings{Callbacks: callback}}
srv := startServer(t, settings)
defer srv.Stop(context.Background())

// Connect to the Server.
conn, _, err := dialClient(settings)

// Verify that the connection is successful.
assert.NoError(t, err)
assert.True(t, atomic.LoadInt32(&connectionCloseCalled) == 0)

// Close connection from server side
srvConn := wsConnection{wsConn: conn}
err = srvConn.Disconnect()
assert.NoError(t, err)

// Verify connection disconnected from server side
eventually(t, func() bool { return atomic.LoadInt32(&connectionCloseCalled) == 1 })
// Waiting for wsConnection to fail ReadMessage() over a Disconnected communication
eventually(t, func() bool {
_, _, err := conn.ReadMessage()
return err != nil
})
}

func TestServerReceiveSendMessage(t *testing.T) {
var rcvMsg atomic.Value
callbacks := CallbacksStruct{
Expand Down
4 changes: 4 additions & 0 deletions server/types/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,8 @@ type Connection interface {
// Blocks until the message is sent.
// Should return as soon as possible if the ctx is cancelled.
Send(ctx context.Context, message *protobufs.ServerToAgent) error

// Disconnect closes the network connection.
// Any blocked Read or Write operations will be unblocked and return errors.
Disconnect() error
}
4 changes: 4 additions & 0 deletions server/wsconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ func (c wsConnection) Send(_ context.Context, message *protobufs.ServerToAgent)
}
return c.wsConn.WriteMessage(websocket.BinaryMessage, bytes)
}

func (c wsConnection) Disconnect() error {
return c.wsConn.Close()
}

0 comments on commit c15cf0b

Please sign in to comment.