Skip to content

Commit

Permalink
gk: nits; continued
Browse files Browse the repository at this point in the history
  • Loading branch information
shazbert committed Nov 20, 2024
1 parent 72f97dd commit 274ae8f
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 27 deletions.
5 changes: 1 addition & 4 deletions exchanges/stream/stream_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,7 @@ type ConnectionSetup struct {
// This is useful for when an exchange connection requires a unique or
// structured message ID for each message sent.
BespokeGenerateMessageID func(highPrecision bool) int64
// Authenticate is a function that will be called to authenticate the
// connection to the exchange's websocket server. This function should
// handle the authentication process and return an error if the
// authentication fails.
// Authenticate will be called to authenticate the connection
Authenticate func(ctx context.Context, conn Connection) error
// WrapperDefinedConnectionSignature is any type that will match to a specific connection. This could be an asset
// type `asset.Spot`, a string type denoting the individual URL, an authenticated or unauthenticated string or a
Expand Down
36 changes: 18 additions & 18 deletions exchanges/stream/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,12 @@ func NewWebsocket() *Websocket {
// after subscriptions are made but before the connectionMonitor has
// started. This allows the error to be read and handled in the
// connectionMonitor and start a connection cycle again.
ReadMessageErrors: make(chan error, 1),
Match: NewMatch(),
subscriptions: subscription.NewStore(),
features: &protocol.Features{},
Orderbook: buffer.Orderbook{},
connectionToWrapper: make(map[Connection]*ConnectionWrapper),
ReadMessageErrors: make(chan error, 1),
Match: NewMatch(),
subscriptions: subscription.NewStore(),
features: &protocol.Features{},
Orderbook: buffer.Orderbook{},
connections: make(map[Connection]*ConnectionWrapper),
}
}

Expand Down Expand Up @@ -435,7 +435,7 @@ func (w *Websocket) connect() error {
break
}

w.connectionToWrapper[conn] = w.connectionManager[i]
w.connections[conn] = w.connectionManager[i]
w.connectionManager[i].Connection = conn

w.Wg.Add(1)
Expand Down Expand Up @@ -476,7 +476,7 @@ func (w *Websocket) connect() error {
}
w.connectionManager[x].Subscriptions.Clear()
}
clear(w.connectionToWrapper)
clear(w.connections)
w.setState(disconnectedState) // Flip from connecting to disconnected.

// Drain residual error in the single buffered channel, this mitigates
Expand Down Expand Up @@ -564,7 +564,7 @@ func (w *Websocket) shutdown() error {
}
}
// Clean map of old connections
clear(w.connectionToWrapper)
clear(w.connections)

if w.Conn != nil {
if err := w.Conn.Shutdown(); err != nil {
Expand Down Expand Up @@ -655,7 +655,7 @@ func (w *Websocket) FlushChannels() error {
}
w.Wg.Add(1)
go w.Reader(context.TODO(), conn, w.connectionManager[x].Setup.Handler)
w.connectionToWrapper[conn] = w.connectionManager[x]
w.connections[conn] = w.connectionManager[x]
w.connectionManager[x].Connection = conn
}

Expand All @@ -674,7 +674,7 @@ func (w *Websocket) FlushChannels() error {

// If there are no subscriptions to subscribe to, close the connection as it is no longer needed.
if w.connectionManager[x].Subscriptions.Len() == 0 {
delete(w.connectionToWrapper, w.connectionManager[x].Connection) // Remove from lookup map
delete(w.connections, w.connectionManager[x].Connection) // Remove from lookup map
if err := w.connectionManager[x].Connection.Shutdown(); err != nil {
log.Warnf(log.WebsocketMgr, "%v websocket: failed to shutdown connection: %v", w.exchangeName, err)
}
Expand Down Expand Up @@ -835,7 +835,7 @@ func (w *Websocket) GetName() string {
// and the new subscription list when pairs are disabled or enabled.
func (w *Websocket) GetChannelDifference(conn Connection, newSubs subscription.List) (sub, unsub subscription.List) {
var subscriptionStore **subscription.Store
if wrapper, ok := w.connectionToWrapper[conn]; ok && conn != nil {
if wrapper, ok := w.connections[conn]; ok && conn != nil {
subscriptionStore = &wrapper.Subscriptions
} else {
subscriptionStore = &w.subscriptions
Expand All @@ -851,7 +851,7 @@ func (w *Websocket) UnsubscribeChannels(conn Connection, channels subscription.L
if len(channels) == 0 {
return nil // No channels to unsubscribe from is not an error
}
if wrapper, ok := w.connectionToWrapper[conn]; ok && conn != nil {
if wrapper, ok := w.connections[conn]; ok && conn != nil {
return w.unsubscribe(wrapper.Subscriptions, channels, func(channels subscription.List) error {
return wrapper.Setup.Unsubscriber(context.TODO(), conn, channels)
})
Expand Down Expand Up @@ -897,7 +897,7 @@ func (w *Websocket) SubscribeToChannels(conn Connection, subs subscription.List)
return err
}

if wrapper, ok := w.connectionToWrapper[conn]; ok && conn != nil {
if wrapper, ok := w.connections[conn]; ok && conn != nil {
return wrapper.Setup.Subscriber(context.TODO(), conn, subs)
}

Expand All @@ -918,7 +918,7 @@ func (w *Websocket) AddSubscriptions(conn Connection, subs ...*subscription.Subs
return fmt.Errorf("%w: AddSubscriptions called on nil Websocket", common.ErrNilPointer)
}
var subscriptionStore **subscription.Store
if wrapper, ok := w.connectionToWrapper[conn]; ok && conn != nil {
if wrapper, ok := w.connections[conn]; ok && conn != nil {
subscriptionStore = &wrapper.Subscriptions
} else {
subscriptionStore = &w.subscriptions
Expand Down Expand Up @@ -948,7 +948,7 @@ func (w *Websocket) AddSuccessfulSubscriptions(conn Connection, subs ...*subscri
}

var subscriptionStore **subscription.Store
if wrapper, ok := w.connectionToWrapper[conn]; ok && conn != nil {
if wrapper, ok := w.connections[conn]; ok && conn != nil {
subscriptionStore = &wrapper.Subscriptions
} else {
subscriptionStore = &w.subscriptions
Expand Down Expand Up @@ -977,7 +977,7 @@ func (w *Websocket) RemoveSubscriptions(conn Connection, subs ...*subscription.S
}

var subscriptionStore *subscription.Store
if wrapper, ok := w.connectionToWrapper[conn]; ok && conn != nil {
if wrapper, ok := w.connections[conn]; ok && conn != nil {
subscriptionStore = wrapper.Subscriptions
} else {
subscriptionStore = w.subscriptions
Expand Down Expand Up @@ -1064,7 +1064,7 @@ func checkWebsocketURL(s string) error {
// The subscription state is not considered when counting existing subscriptions
func (w *Websocket) checkSubscriptions(conn Connection, subs subscription.List) error {
var subscriptionStore *subscription.Store
if wrapper, ok := w.connectionToWrapper[conn]; ok && conn != nil {
if wrapper, ok := w.connections[conn]; ok && conn != nil {
subscriptionStore = wrapper.Subscriptions
} else {
subscriptionStore = w.subscriptions
Expand Down
6 changes: 3 additions & 3 deletions exchanges/stream/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ func TestSubscribeUnsubscribe(t *testing.T) {
require.NoError(t, multi.SetupNewConnection(amazingCandidate))

amazingConn := multi.getConnectionFromSetup(amazingCandidate)
multi.connectionToWrapper = map[Connection]*ConnectionWrapper{
multi.connections = map[Connection]*ConnectionWrapper{
amazingConn: multi.connectionManager[0],
}

Expand Down Expand Up @@ -985,7 +985,7 @@ func TestGetChannelDifference(t *testing.T) {
require.Equal(t, 1, len(subs))
require.Empty(t, unsubs, "Should get no unsubs")

w.connectionToWrapper = map[Connection]*ConnectionWrapper{
w.connections = map[Connection]*ConnectionWrapper{
sweetConn: {Setup: &ConnectionSetup{URL: "ws://localhost:8080/ws"}},
}

Expand All @@ -998,7 +998,7 @@ func TestGetChannelDifference(t *testing.T) {
require.Equal(t, 1, len(subs))
require.Empty(t, unsubs, "Should get no unsubs")

err := w.connectionToWrapper[sweetConn].Subscriptions.Add(&subscription.Subscription{Channel: subscription.CandlesChannel})
err := w.connections[sweetConn].Subscriptions.Add(&subscription.Subscription{Channel: subscription.CandlesChannel})
require.NoError(t, err)

subs, unsubs = w.GetChannelDifference(sweetConn, subscription.List{{Channel: subscription.CandlesChannel}})
Expand Down
4 changes: 2 additions & 2 deletions exchanges/stream/websocket_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ type Websocket struct {
// for exchanges that differentiate between trading pairs by using different connection endpoints or protocols for various asset classes.
// If an exchange does not require such differentiation, all connections may be managed under a single ConnectionWrapper.
connectionManager []*ConnectionWrapper
// connectionToWrapper holds a look up table for all connections to their corresponding ConnectionWrapper and subscription holder
connectionToWrapper map[Connection]*ConnectionWrapper
// connections holds a look up table for all connections to their corresponding ConnectionWrapper and subscription holder
connections map[Connection]*ConnectionWrapper

subscriptions *subscription.Store

Expand Down

0 comments on commit 274ae8f

Please sign in to comment.