Skip to content

Commit

Permalink
Subscriptions: Move Map to Store
Browse files Browse the repository at this point in the history
  • Loading branch information
gbjk committed Feb 14, 2024
1 parent 054830e commit 02f51bb
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 146 deletions.
6 changes: 3 additions & 3 deletions exchanges/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -1164,7 +1164,7 @@ func (b *Base) FlushWebsocketChannels() error {

// SubscribeToWebsocketChannels appends to ChannelsToSubscribe
// which lets websocket.manageSubscriptions handle subscribing
func (b *Base) SubscribeToWebsocketChannels(channels []subscription.Subscription) error {
func (b *Base) SubscribeToWebsocketChannels(channels []*subscription.Subscription) error {
if b.Websocket == nil {
return common.ErrFunctionNotSupported
}
Expand All @@ -1173,15 +1173,15 @@ func (b *Base) SubscribeToWebsocketChannels(channels []subscription.Subscription

// UnsubscribeToWebsocketChannels removes from ChannelsToSubscribe
// which lets websocket.manageSubscriptions handle unsubscribing
func (b *Base) UnsubscribeToWebsocketChannels(channels []subscription.Subscription) error {
func (b *Base) UnsubscribeToWebsocketChannels(channels []*subscription.Subscription) error {
if b.Websocket == nil {
return common.ErrFunctionNotSupported
}
return b.Websocket.UnsubscribeChannels(channels)
}

// GetSubscriptions returns a copied list of subscriptions
func (b *Base) GetSubscriptions() ([]subscription.Subscription, error) {
func (b *Base) GetSubscriptions() ([]*subscription.Subscription, error) {
if b.Websocket == nil {
return nil, common.ErrFunctionNotSupported
}
Expand Down
6 changes: 3 additions & 3 deletions exchanges/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ type IBotExchange interface {
EnableRateLimiter() error
GetServerTime(ctx context.Context, ai asset.Item) (time.Time, error)
GetWebsocket() (*stream.Websocket, error)
SubscribeToWebsocketChannels(channels []subscription.Subscription) error
UnsubscribeToWebsocketChannels(channels []subscription.Subscription) error
GetSubscriptions() ([]subscription.Subscription, error)
SubscribeToWebsocketChannels(channels []*subscription.Subscription) error
UnsubscribeToWebsocketChannels(channels []*subscription.Subscription) error
GetSubscriptions() ([]*subscription.Subscription, error)
FlushWebsocketChannels() error
AuthenticateWebsocket(ctx context.Context) error
GetOrderExecutionLimits(a asset.Item, cp currency.Pair) (order.MinMaxLevel, error)
Expand Down
14 changes: 6 additions & 8 deletions exchanges/stream/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func New() *Websocket {
Subscribe: make(chan []subscription.Subscription),
Unsubscribe: make(chan []subscription.Subscription),
Match: NewMatch(),
subscriptions: subscription.Map{},
subscriptions: subscription.NewStore(),
}
}

Expand Down Expand Up @@ -538,9 +538,7 @@ func (w *Websocket) FlushChannels() error {

if len(newsubs) != 0 {
// Purge subscription list as there will be conflicts
w.subscriptionMutex.Lock()
w.subscriptions = subscription.Map{}
w.subscriptionMutex.Unlock()
w.subscriptions.Clear()
return w.SubscribeToChannels(newsubs)
}
return nil
Expand Down Expand Up @@ -863,9 +861,9 @@ func (w *Websocket) GetName() string {

// GetChannelDifference finds the difference between the subscribed channels
// and the new subscription list when pairs are disabled or enabled.
func (w *Websocket) GetChannelDifference(newSubs []subscription.Subscription) (sub, unsub []subscription.Subscription) {
func (w *Websocket) GetChannelDifference(newSubs []*subscription.Subscription) (sub, unsub []*subscription.Subscription) {
if w.subscriptions == nil {
w.subscriptions = subscription.Map{}
w.subscriptions = subscription.NewStore()
}
return w.subscriptions.Diff(subscription.ListToMap(newSubs))

Check failure on line 868 in exchanges/stream/websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (ubuntu-latest, 386, true, true)

undefined: subscription.ListToMap

Check failure on line 868 in exchanges/stream/websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (ubuntu-latest, amd64, true, false)

undefined: subscription.ListToMap

Check failure on line 868 in exchanges/stream/websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (macos-latest, amd64, true, true)

undefined: subscription.ListToMap

Check failure on line 868 in exchanges/stream/websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (macos-13, amd64, true, true)

undefined: subscription.ListToMap

Check failure on line 868 in exchanges/stream/websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (windows-latest, amd64, true, true)

undefined: subscription.ListToMap
}
Expand Down Expand Up @@ -907,8 +905,8 @@ func (w *Websocket) AddSubscription(c *subscription.Subscription) error {
if w == nil || key == nil {

Check failure on line 905 in exchanges/stream/websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (ubuntu-latest, 386, true, true)

undefined: key

Check failure on line 905 in exchanges/stream/websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (ubuntu-latest, amd64, true, false)

undefined: key

Check failure on line 905 in exchanges/stream/websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (macos-latest, amd64, true, true)

undefined: key

Check failure on line 905 in exchanges/stream/websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (macos-13, amd64, true, true)

undefined: key

Check failure on line 905 in exchanges/stream/websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end (windows-latest, amd64, true, true)

undefined: key
return common.ErrNilPointer
}
if w.subscriptions == nil
w.subscriptions = subscription.Map{}
if w.subscriptions == nil {
w.subscriptions = subscription.NewStore()
}
return w.subscriptions.Add(c)
}
Expand Down
132 changes: 0 additions & 132 deletions exchanges/subscription/map.go

This file was deleted.

140 changes: 140 additions & 0 deletions exchanges/subscription/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package subscription

import (
"maps"
"sync"
)

// Store is a container of subscription pointers
type Store struct {
m map[any]*Subscription
mu sync.RWMutex
}

func NewStore() *Store {
return &Store{
m: map[any]*Subscription{},
}
}

// ListToStore creates a Store from a slice of subscriptions
func ListToStore(s List) *Store {
n := new(Store)
for _, c := range s {
n.Add(c)
}
return n
}

// Add copies a subscription and adds it to the store
// Key can be already set; if ommitted EnsureKeyed will be used

Check failure on line 30 in exchanges/subscription/store.go

View workflow job for this annotation

GitHub Actions / Spell checker

ommitted ==> omitted
// Errors if it already exists
func (s *Store) Add(sub *Subscription) error {
s.mu.Lock()
defer s.mu.Unlock()
key := sub.ensureKeyed()
if e := s.get(key); e != nil {
return ErrDuplicate
}

s.m[key] = sub

return nil
}

// Get returns a pointer to a subscription or nil if not found
// If key implements MatchableKey then key.Match will be used
func (s *Store) Get(key any) *Subscription {
s.mu.RLock()
defer s.mu.RUnlock()
return s.get(key)
}

// get returns a pointer to subscription or nil if not found
// If key implements MatchableKey then key.Match will be used
// If key is actually a subscription then ensureKeyed will be used and we'll return the
// This method provides no locking protection
// returned subscriptions are implicitly guaranteed to have a key
func (s *Store) get(key any) *Subscription {
switch v := key.(type) {
case *Subscription:
return s.get(v.ensureKeyed())
case Subscription:
return s.get(v.ensureKeyed())
case MatchableKey:
return s.match(v)
default:
return s.m[v]
}
}

// Remove removes a subscription from the store
func (s *Store) Remove(sub *Subscription) {
s.mu.Lock()
defer s.mu.Unlock()

if found := s.get(sub); found != nil {
delete(s.m, found.key)
}
}

// List returns a slice of Subscriptions pointers
func (s *Store) List() []*Subscription {
s.mu.RLock()
defer s.mu.RUnlock()
subs := make([]*Subscription, 0, len(s.m))
for _, s := range s.m {
subs = append(subs, s)
}
return subs
}

// Clear empties the subscription store
func (s *Store) Clear() {
s.mu.Lock()
defer s.mu.Unlock()
clear(s.m)
}

// match returns the first subscription which matches the Key's Asset, Channel and Pairs
// If the key provided has:
// 1) Empty pairs then only Subscriptions without pairs will be considered
// 2) >=1 pairs then Subscriptions which contain all the pairs will be considered
// This method provides no locking protection
func (s *Store) match(key MatchableKey) *Subscription {
for anyKey, s := range s.m {
if key.Match(anyKey) {
return s
}
}
return nil
}

// Diff returns a list of the added and missing subs between two stores
// The store Diff is invoked upon is read-lock protected
// The new store is assumed to be a new instance and enjoys no locking protection
func (s *Store) Diff(compare *Store) (added, removed List) {
s.mu.RLock()
defer s.mu.RUnlock()
removedMap := maps.Clone(s.m)
for _, sub := range compare.m {
if found := s.get(sub); found != nil {
delete(removedMap, found.key)
} else {
added = append(added, sub)
}
}

for _, c := range removedMap {
removed = append(removed, c)
}

return
}

// Len returns the number of subscriptions
func (s *Store) Len() int {
s.mu.RLock()
defer s.mu.RUnlock()
return len(s.m)
}

0 comments on commit 02f51bb

Please sign in to comment.