From c15cf0bf7f1d97f73b07b25bfd7e18d362310897 Mon Sep 17 00:00:00 2001 From: Nimrod Shlagman Date: Thu, 22 Sep 2022 20:03:13 +0300 Subject: [PATCH] Add Disconnect() method to client connections (#129) Resolves #91 This enables OpAMP to close the network connection to an agent from the server side --- server/httpconnection.go | 13 ++++++++++-- server/serverimpl_test.go | 41 ++++++++++++++++++++++++++++++++++++++ server/types/connection.go | 4 ++++ server/wsconnection.go | 4 ++++ 4 files changed, 60 insertions(+), 2 deletions(-) diff --git a/server/httpconnection.go b/server/httpconnection.go index d8e40850..e70ea82c 100644 --- a/server/httpconnection.go +++ b/server/httpconnection.go @@ -9,7 +9,11 @@ import ( "github.com/open-telemetry/opamp-go/server/types" ) -var errInvalidHTTPConnection = errors.New("cannot Send() over HTTP connection") +// ErrInvalidHTTPConnection represents an event of misuse function for plain HTTP +// connection, such as httpConnection.Send() or httpConnection.Disconnect(). +// Usage will not result with change but return this error to indicate current state +// might not be as expected. +var ErrInvalidHTTPConnection = errors.New("cannot operate over HTTP connection") // httpConnection represents an OpAMP connection over a plain HTTP connection. // Only one response is possible to send when using plain HTTP connection @@ -28,5 +32,10 @@ var _ types.Connection = (*httpConnection)(nil) func (c httpConnection) Send(_ context.Context, _ *protobufs.ServerToAgent) error { // Send() should not be called for plain HTTP connection. Instead, the response will // be sent after the onMessage callback returns. - return errInvalidHTTPConnection + return ErrInvalidHTTPConnection +} + +func (c httpConnection) Disconnect() error { + // Disconnect() should not be called for plain HTTP connection. + return ErrInvalidHTTPConnection } diff --git a/server/serverimpl_test.go b/server/serverimpl_test.go index 45e461a9..11c7f897 100644 --- a/server/serverimpl_test.go +++ b/server/serverimpl_test.go @@ -126,6 +126,47 @@ func TestServerStartAcceptConnection(t *testing.T) { eventually(t, func() bool { return atomic.LoadInt32(&connectionCloseCalled) == 1 }) } +func TestDisconnectHttpConnection(t *testing.T) { + // Verify Disconnect() results with Invalid HTTP Connection error + err := httpConnection{}.Disconnect() + assert.Error(t, err) + assert.Equal(t, ErrInvalidHTTPConnection, err) +} + +func TestDisconnectWSConnection(t *testing.T) { + connectionCloseCalled := int32(0) + callback := CallbacksStruct{ + OnConnectionCloseFunc: func(conn types.Connection) { + atomic.StoreInt32(&connectionCloseCalled, 1) + }, + } + + // Start a Server. + settings := &StartSettings{Settings: Settings{Callbacks: callback}} + srv := startServer(t, settings) + defer srv.Stop(context.Background()) + + // Connect to the Server. + conn, _, err := dialClient(settings) + + // Verify that the connection is successful. + assert.NoError(t, err) + assert.True(t, atomic.LoadInt32(&connectionCloseCalled) == 0) + + // Close connection from server side + srvConn := wsConnection{wsConn: conn} + err = srvConn.Disconnect() + assert.NoError(t, err) + + // Verify connection disconnected from server side + eventually(t, func() bool { return atomic.LoadInt32(&connectionCloseCalled) == 1 }) + // Waiting for wsConnection to fail ReadMessage() over a Disconnected communication + eventually(t, func() bool { + _, _, err := conn.ReadMessage() + return err != nil + }) +} + func TestServerReceiveSendMessage(t *testing.T) { var rcvMsg atomic.Value callbacks := CallbacksStruct{ diff --git a/server/types/connection.go b/server/types/connection.go index 22e5ac80..1f8371f5 100644 --- a/server/types/connection.go +++ b/server/types/connection.go @@ -19,4 +19,8 @@ type Connection interface { // Blocks until the message is sent. // Should return as soon as possible if the ctx is cancelled. Send(ctx context.Context, message *protobufs.ServerToAgent) error + + // Disconnect closes the network connection. + // Any blocked Read or Write operations will be unblocked and return errors. + Disconnect() error } diff --git a/server/wsconnection.go b/server/wsconnection.go index 6d90f75b..1d583587 100644 --- a/server/wsconnection.go +++ b/server/wsconnection.go @@ -29,3 +29,7 @@ func (c wsConnection) Send(_ context.Context, message *protobufs.ServerToAgent) } return c.wsConn.WriteMessage(websocket.BinaryMessage, bytes) } + +func (c wsConnection) Disconnect() error { + return c.wsConn.Close() +}