Skip to content

Commit

Permalink
Implement compression for WebSocket client (#122)
Browse files Browse the repository at this point in the history
Resolves #101
  • Loading branch information
tigrannajaryan authored Aug 2, 2022
1 parent 0ded8e0 commit 1993a38
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 5 deletions.
2 changes: 2 additions & 0 deletions client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ func (c *httpClient) Start(ctx context.Context, settings types.StartSettings) er

c.opAMPServerURL = settings.OpAMPServerURL

// TODO: implement compression based on settings.EnableCompression value.

// Prepare Server connection settings.
c.sender.SetRequestHeader(settings.Header)

Expand Down
17 changes: 12 additions & 5 deletions client/internal/mockserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,15 @@ type MockServer struct {
OnMessage func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent
srv *httptest.Server

expectedHandlers chan receivedMessageHandler
expectedComplete chan struct{}
isExpectMode bool
expectedHandlers chan receivedMessageHandler
expectedComplete chan struct{}
isExpectMode bool
enableCompression bool
}

const headerContentType = "Content-Type"
const contentTypeProtobuf = "application/x-protobuf"

var upgrader = websocket.Upgrader{}

func StartMockServer(t *testing.T) *MockServer {
srv := &MockServer{
t: t,
Expand Down Expand Up @@ -101,7 +100,15 @@ func (m *MockServer) handlePlainHttp(w http.ResponseWriter, r *http.Request) {
}
}

func (m *MockServer) EnableCompression() {
m.enableCompression = true
}

func (m *MockServer) handleWebSocket(t *testing.T, w http.ResponseWriter, r *http.Request) {
var upgrader = websocket.Upgrader{
EnableCompression: m.enableCompression,
}

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
Expand Down
113 changes: 113 additions & 0 deletions client/internal/tcpproxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package internal

import (
"log"
"net"
"sync/atomic"
)

// TCPProxy is used for intercepting WebSocket connections and counting
// the number of bytes transferred.
type TCPProxy struct {
// Destination endpoint to connect to.
destHostPort string
// Incoming endpoint to accept connections on.
incomingHostPort string

stopSignal chan struct{}

// Byte counters in both directions.
clientToServerBytes int64
serverToClientBytes int64
}

func NewProxy(destHostPort string) *TCPProxy {
return &TCPProxy{destHostPort: destHostPort}
}

func (p *TCPProxy) Start() error {
// Begin listening on an available TCP port.
ln, err := net.Listen("tcp", "127.0.0.1:")
if err != nil {
return err
}

// Remember the port that we listen on.
p.incomingHostPort = ln.Addr().String()

p.stopSignal = make(chan struct{})

go func() {
for {
select {
case <-p.stopSignal:
ln.Close()
return
default:
}
conn, err := ln.Accept()
if err != nil {
log.Printf("Failed to Accept TCP connection: %v\n", err.Error())
return
}
go p.forwardBothWays(conn)
}
}()

return nil
}

func (p *TCPProxy) Stop() {
close(p.stopSignal)
}

func (p *TCPProxy) IncomingEndpoint() string {
return p.incomingHostPort
}

func (p *TCPProxy) forwardBothWays(in net.Conn) {
// We have an incoming connection. Establish an outgoing connection
// to the destination endpoint.
out, err := net.Dial("tcp", p.destHostPort)
if err != nil {
return
}

defer out.Close()
defer in.Close()

// Forward TCP stream bytes from incoming to outgoing connection direction.
go p.forwardConn(in, out, &p.clientToServerBytes)

// Forward TCP stream bytes in the reverse direction.
p.forwardConn(out, in, &p.serverToClientBytes)
}

func (p *TCPProxy) ServerToClientBytes() int {
return int(atomic.LoadInt64(&p.serverToClientBytes))
}

func (p *TCPProxy) ClientToServerBytes() int {
return int(atomic.LoadInt64(&p.clientToServerBytes))
}

func (p *TCPProxy) forwardConn(in, out net.Conn, byteCounter *int64) {
buf := make([]byte, 1024)
for {
select {
case <-p.stopSignal:
return
default:
}

n, err := in.Read(buf)
if err != nil {
break
}
n, err = out.Write(buf[:n])
if err != nil {
break
}
atomic.AddInt64(byteCounter, int64(n))
}
}
5 changes: 5 additions & 0 deletions client/types/startsettings.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,9 @@ type StartSettings struct {
// Defines the capabilities of the Agent. AgentCapabilities_ReportsStatus bit does not need to
// be set in this field, it will be set automatically since it is required by OpAMP protocol.
Capabilities protobufs.AgentCapabilities

// EnableCompression can be set to true to enable the compression. Note that for WebSocket transport
// the compression is only effectively enabled if the Server also supports compression.
// The data will be compressed in both directions.
EnableCompression bool
}
2 changes: 2 additions & 0 deletions client/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ func (c *wsClient) Start(ctx context.Context, settings types.StartSettings) erro
return err
}

c.dialer.EnableCompression = settings.EnableCompression

if settings.TLSConfig != nil {
c.url.Scheme = "wss"
}
Expand Down
124 changes: 124 additions & 0 deletions client/wsclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,23 @@ package client

import (
"context"
"fmt"
"strings"
"sync/atomic"
"testing"

"github.com/gorilla/websocket"
"github.com/open-telemetry/opamp-go/client/internal"
"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
)

func TestDisconnectWSByServer(t *testing.T) {
// Start a Server.
srv := internal.StartMockServer(t)

var conn atomic.Value
srv.OnWSConnect = func(c *websocket.Conn) {
conn.Store(c)
Expand Down Expand Up @@ -51,3 +56,122 @@ func TestDisconnectWSByServer(t *testing.T) {
err := client.Stop(context.Background())
assert.NoError(t, err)
}

func TestVerifyWSCompress(t *testing.T) {

tests := []bool{false, true}
for _, withCompression := range tests {
t.Run(fmt.Sprintf("%v", withCompression), func(t *testing.T) {

// Start a Server.
srv := internal.StartMockServer(t)
srv.EnableExpectMode()
if withCompression {
srv.EnableCompression()
}

// We use a transparent TCP proxy to be able to count the actual bytes transferred so that
// we can test the number of actual bytes vs number of expected bytes with and without compression.
proxy := internal.NewProxy(srv.Endpoint)
assert.NoError(t, proxy.Start())

// Start an OpAMP/WebSocket client.
var clientGotRemoteConfig atomic.Value
settings := types.StartSettings{
Callbacks: types.CallbacksStruct{
OnMessageFunc: func(ctx context.Context, msg *types.MessageData) {
if msg.RemoteConfig != nil {
clientGotRemoteConfig.Store(msg.RemoteConfig)
}
},
GetEffectiveConfigFunc: func(ctx context.Context) (*protobufs.EffectiveConfig, error) {
// If the client already received a remote config offer make sure to report
// the effective config back to the server.
var effCfg []byte
remoteCfg, _ := clientGotRemoteConfig.Load().(*protobufs.AgentRemoteConfig)
if remoteCfg != nil {
effCfg = remoteCfg.Config.ConfigMap[""].Body
}
return &protobufs.EffectiveConfig{
ConfigMap: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
"key": {
Body: effCfg,
},
},
},
}, nil
},
},
Capabilities: protobufs.AgentCapabilities_AcceptsRemoteConfig |
protobufs.AgentCapabilities_ReportsEffectiveConfig,
}
settings.OpAMPServerURL = "ws://" + proxy.IncomingEndpoint()

if withCompression {
settings.EnableCompression = true
}

client := NewWebSocket(nil)
startClient(t, settings, client)

// Use highly compressible config body.
uncompressedCfg := []byte(strings.Repeat("test", 10000))

remoteCfg := &protobufs.AgentRemoteConfig{
Config: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
"": &protobufs.AgentConfigFile{
Body: uncompressedCfg,
},
},
},
ConfigHash: []byte{1, 2, 3, 4},
}

// ---> Server
srv.Expect(
func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
assert.EqualValues(t, 0, msg.SequenceNum)
// The first status report after Start must have full AgentDescription.
assert.True(t, proto.Equal(client.AgentDescription(), msg.AgentDescription))
return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
RemoteConfig: remoteCfg,
}
},
)

// Wait to receive remote config
eventually(t, func() bool { return clientGotRemoteConfig.Load() != nil })

_ = client.UpdateEffectiveConfig(context.Background())

// ---> Server
srv.Expect(
func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
return &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}
},
)

// Stop the client.
err := client.Stop(context.Background())
assert.NoError(t, err)

proxy.Stop()

fmt.Printf("sent %d, received %d\n", proxy.ClientToServerBytes(), proxy.ServerToClientBytes())

if withCompression {
// With compression the entire bytes exchanged should be less than the config body.
// This is only possible if there is any compression happening.
assert.Less(t, proxy.ServerToClientBytes(), len(uncompressedCfg))
assert.Less(t, proxy.ClientToServerBytes(), len(uncompressedCfg))
} else {
// Without compression the entire bytes exchanged should be more than the config body.
assert.Greater(t, proxy.ServerToClientBytes(), len(uncompressedCfg))
assert.Greater(t, proxy.ClientToServerBytes(), len(uncompressedCfg))
}
})
}
}
1 change: 1 addition & 0 deletions server/serverimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func New(logger types.Logger) *server {

func (s *server) Attach(settings Settings) (HTTPHandlerFunc, error) {
s.settings = settings
// TODO: Add support for compression using Upgrader.EnableCompression field.
s.wsUpgrader = websocket.Upgrader{}
return s.httpHandler, nil
}
Expand Down

0 comments on commit 1993a38

Please sign in to comment.