diff --git a/exchanges/stream/stream_match.go b/exchanges/stream/stream_match.go index 431bdba9384..9ec0c59d097 100644 --- a/exchanges/stream/stream_match.go +++ b/exchanges/stream/stream_match.go @@ -5,7 +5,7 @@ import ( "sync" ) -// NewMatch returns a new matcher +// NewMatch returns a new Match func NewMatch() *Match { return &Match{ m: make(map[interface{}]chan []byte), @@ -21,6 +21,13 @@ type Match struct { mu sync.Mutex } +// Matcher defines a payload matching return mechanism +type Matcher struct { + C chan []byte + sig interface{} + m *Match +} + // Incoming matches with request, disregarding the returned payload func (m *Match) Incoming(signature interface{}) bool { return m.IncomingWithData(signature, nil) @@ -44,35 +51,28 @@ func (m *Match) IncomingWithData(signature interface{}, data []byte) bool { return false } -// Sets the signature response channel for incoming data -func (m *Match) set(signature interface{}) (matcher, error) { +// Set the signature response channel for incoming data +func (m *Match) Set(signature interface{}) (Matcher, error) { var ch chan []byte m.mu.Lock() if _, ok := m.m[signature]; ok { m.mu.Unlock() - return matcher{}, errors.New("signature collision") + return Matcher{}, errors.New("signature collision") } // This is buffered so we don't need to wait for receiver. ch = make(chan []byte, 1) m.m[signature] = ch m.mu.Unlock() - return matcher{ + return Matcher{ C: ch, sig: signature, m: m, }, nil } -// matcher defines a payload matching return mechanism -type matcher struct { - C chan []byte - sig interface{} - m *Match -} - // Cleanup closes underlying channel and deletes signature from map -func (m *matcher) Cleanup() { +func (m *Matcher) Cleanup() { m.m.mu.Lock() close(m.C) delete(m.m.m, m.sig) diff --git a/exchanges/stream/stream_types.go b/exchanges/stream/stream_types.go index 053e5688a30..5315f0cffa2 100644 --- a/exchanges/stream/stream_types.go +++ b/exchanges/stream/stream_types.go @@ -31,8 +31,16 @@ type Response struct { Raw []byte } +// DefaultChannelKey is the fallback key for AddSuccessfulSubscriptions +type DefaultChannelKey struct { + Channel string + Currency currency.Pair + Asset asset.Item +} + // ChannelSubscription container for streaming subscriptions type ChannelSubscription struct { + Key any Channel string Currency currency.Pair Asset asset.Item diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index 4a6a318a385..3394c18368d 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -48,6 +48,10 @@ var ( errWebsocketConnectorUnset = errors.New("websocket connector function not set") errWebsocketSubscriptionsGeneratorUnset = errors.New("websocket subscriptions generator function needs to be set") errClosedConnection = errors.New("use of closed network connection") + errSubscriptionNotFound = errors.New("subscription not found in list") + errAlreadySubscribed = errors.New("already subscribed") + errNoChannelsInArgs = errors.New("no channels in args") + errKeyCannotBeNil = errors.New("key cannot be nil") ) var globalReporter Reporter @@ -356,9 +360,9 @@ func (w *Websocket) connectionMonitor() error { if w.checkAndSetMonitorRunning() { return errAlreadyRunning } - w.connectionMutex.RLock() + w.fieldMutex.RLock() delay := w.connectionMonitorDelay - w.connectionMutex.RUnlock() + w.fieldMutex.RUnlock() go func() { timer := time.NewTimer(delay) @@ -460,7 +464,7 @@ func (w *Websocket) Shutdown() error { // flush any subscriptions from last connection if needed w.subscriptionMutex.Lock() - w.subscriptions = nil + w.subscriptions = subscriptionMap{} w.subscriptionMutex.Unlock() close(w.ShutdownC) @@ -520,7 +524,7 @@ func (w *Websocket) FlushChannels() error { if len(newsubs) != 0 { // Purge subscription list as there will be conflicts w.subscriptionMutex.Lock() - w.subscriptions = nil + w.subscriptions = subscriptionMap{} w.subscriptionMutex.Unlock() return w.SubscribeToChannels(newsubs) } @@ -612,73 +616,73 @@ func (w *Websocket) trafficMonitor() { } func (w *Websocket) setConnectedStatus(b bool) { - w.connectionMutex.Lock() + w.fieldMutex.Lock() w.connected = b - w.connectionMutex.Unlock() + w.fieldMutex.Unlock() } // IsConnected returns status of connection func (w *Websocket) IsConnected() bool { - w.connectionMutex.RLock() - defer w.connectionMutex.RUnlock() + w.fieldMutex.RLock() + defer w.fieldMutex.RUnlock() return w.connected } func (w *Websocket) setConnectingStatus(b bool) { - w.connectionMutex.Lock() + w.fieldMutex.Lock() w.connecting = b - w.connectionMutex.Unlock() + w.fieldMutex.Unlock() } // IsConnecting returns status of connecting func (w *Websocket) IsConnecting() bool { - w.connectionMutex.RLock() - defer w.connectionMutex.RUnlock() + w.fieldMutex.RLock() + defer w.fieldMutex.RUnlock() return w.connecting } func (w *Websocket) setEnabled(b bool) { - w.connectionMutex.Lock() + w.fieldMutex.Lock() w.enabled = b - w.connectionMutex.Unlock() + w.fieldMutex.Unlock() } // IsEnabled returns status of enabled func (w *Websocket) IsEnabled() bool { - w.connectionMutex.RLock() - defer w.connectionMutex.RUnlock() + w.fieldMutex.RLock() + defer w.fieldMutex.RUnlock() return w.enabled } func (w *Websocket) setInit(b bool) { - w.connectionMutex.Lock() + w.fieldMutex.Lock() w.Init = b - w.connectionMutex.Unlock() + w.fieldMutex.Unlock() } // IsInit returns status of init func (w *Websocket) IsInit() bool { - w.connectionMutex.RLock() - defer w.connectionMutex.RUnlock() + w.fieldMutex.RLock() + defer w.fieldMutex.RUnlock() return w.Init } func (w *Websocket) setTrafficMonitorRunning(b bool) { - w.connectionMutex.Lock() + w.fieldMutex.Lock() w.trafficMonitorRunning = b - w.connectionMutex.Unlock() + w.fieldMutex.Unlock() } // IsTrafficMonitorRunning returns status of the traffic monitor func (w *Websocket) IsTrafficMonitorRunning() bool { - w.connectionMutex.RLock() - defer w.connectionMutex.RUnlock() + w.fieldMutex.RLock() + defer w.fieldMutex.RUnlock() return w.trafficMonitorRunning } func (w *Websocket) checkAndSetMonitorRunning() (alreadyRunning bool) { - w.connectionMutex.Lock() - defer w.connectionMutex.Unlock() + w.fieldMutex.Lock() + defer w.fieldMutex.Unlock() if w.connectionMonitorRunning { return true } @@ -687,28 +691,28 @@ func (w *Websocket) checkAndSetMonitorRunning() (alreadyRunning bool) { } func (w *Websocket) setConnectionMonitorRunning(b bool) { - w.connectionMutex.Lock() + w.fieldMutex.Lock() w.connectionMonitorRunning = b - w.connectionMutex.Unlock() + w.fieldMutex.Unlock() } // IsConnectionMonitorRunning returns status of connection monitor func (w *Websocket) IsConnectionMonitorRunning() bool { - w.connectionMutex.RLock() - defer w.connectionMutex.RUnlock() + w.fieldMutex.RLock() + defer w.fieldMutex.RUnlock() return w.connectionMonitorRunning } func (w *Websocket) setDataMonitorRunning(b bool) { - w.connectionMutex.Lock() + w.fieldMutex.Lock() w.dataMonitorRunning = b - w.connectionMutex.Unlock() + w.fieldMutex.Unlock() } // IsDataMonitorRunning returns status of data monitor func (w *Websocket) IsDataMonitorRunning() bool { - w.connectionMutex.RLock() - defer w.connectionMutex.RUnlock() + w.fieldMutex.RLock() + defer w.fieldMutex.RUnlock() return w.dataMonitorRunning } @@ -845,23 +849,23 @@ func (w *Websocket) GetName() string { // GetChannelDifference finds the difference between the subscribed channels // and the new subscription list when pairs are disabled or enabled. func (w *Websocket) GetChannelDifference(genSubs []ChannelSubscription) (sub, unsub []ChannelSubscription) { - w.subscriptionMutex.Lock() - defer w.subscriptionMutex.Unlock() + w.subscriptionMutex.RLock() + defer w.subscriptionMutex.RUnlock() oldsubs: - for x := range w.subscriptions { + for _, x := range w.subscriptions { for y := range genSubs { - if w.subscriptions[x].Equal(&genSubs[y]) { + if x.Equal(&genSubs[y]) { continue oldsubs } } - unsub = append(unsub, w.subscriptions[x]) + unsub = append(unsub, x) } newsubs: for x := range genSubs { - for y := range w.subscriptions { - if genSubs[x].Equal(&w.subscriptions[y]) { + for _, y := range w.subscriptions { + if genSubs[x].Equal(&y) { //nolint:gosec // for alias var is not closured or stored continue newsubs } } @@ -873,24 +877,24 @@ newsubs: // UnsubscribeChannels unsubscribes from a websocket channel func (w *Websocket) UnsubscribeChannels(channels []ChannelSubscription) error { if len(channels) == 0 { - return fmt.Errorf("%s websocket: channels not populated cannot remove", - w.exchangeName) + return fmt.Errorf("%s websocket: %w", w.exchangeName, errNoChannelsInArgs) } - w.subscriptionMutex.Lock() + w.subscriptionMutex.RLock() channels: for x := range channels { - for y := range w.subscriptions { - if channels[x].Equal(&w.subscriptions[y]) { + for _, y := range w.subscriptions { + if channels[x].Equal(&y) { //nolint:gosec // for alias var is not closured or stored continue channels } } - w.subscriptionMutex.Unlock() - return fmt.Errorf("%s websocket: subscription not found in list: %+v", + w.subscriptionMutex.RUnlock() + return fmt.Errorf("%s websocket: %w: %+v", w.exchangeName, + errSubscriptionNotFound, channels[x]) } - w.subscriptionMutex.Unlock() + w.subscriptionMutex.RUnlock() return w.Unsubscriber(channels) } @@ -906,21 +910,21 @@ func (w *Websocket) ResubscribeToChannel(subscribedChannel *ChannelSubscription) // SubscribeToChannels appends supplied channels to channelsToSubscribe func (w *Websocket) SubscribeToChannels(channels []ChannelSubscription) error { if len(channels) == 0 { - return fmt.Errorf("%s websocket: cannot subscribe no channels supplied", - w.exchangeName) + return fmt.Errorf("%s websocket: %w", w.exchangeName, errNoChannelsInArgs) } - w.subscriptionMutex.Lock() + w.subscriptionMutex.RLock() for x := range channels { - for y := range w.subscriptions { - if channels[x].Equal(&w.subscriptions[y]) { - w.subscriptionMutex.Unlock() - return fmt.Errorf("%s websocket: %v already subscribed", + for _, y := range w.subscriptions { + if channels[x].Equal(&y) { //nolint:gosec // for alias var is not closured or stored + w.subscriptionMutex.RUnlock() + return fmt.Errorf("%s websocket: %v %w", w.exchangeName, - channels[x]) + channels[x], + errAlreadySubscribed) } } } - w.subscriptionMutex.Unlock() + w.subscriptionMutex.RUnlock() if err := w.Subscriber(channels); err != nil { return fmt.Errorf("%v %w: %v", w.exchangeName, ErrSubscriptionFailure, err) } @@ -931,7 +935,13 @@ func (w *Websocket) SubscribeToChannels(channels []ChannelSubscription) error { // has been successfully subscribed func (w *Websocket) AddSuccessfulSubscriptions(channels ...ChannelSubscription) { w.subscriptionMutex.Lock() - w.subscriptions = append(w.subscriptions, channels...) + if w.subscriptions == nil { + w.subscriptions = subscriptionMap{} + } + for i := range channels { + key := channels[i].EnsureKeyed() + w.subscriptions[key] = channels[i] + } w.subscriptionMutex.Unlock() } @@ -941,44 +951,73 @@ func (w *Websocket) RemoveSuccessfulUnsubscriptions(channels ...ChannelSubscript w.subscriptionMutex.Lock() defer w.subscriptionMutex.Unlock() for x := range channels { - for y := range w.subscriptions { - if channels[x].Equal(&w.subscriptions[y]) { - w.subscriptions[y] = w.subscriptions[len(w.subscriptions)-1] - w.subscriptions[len(w.subscriptions)-1] = ChannelSubscription{} - w.subscriptions = w.subscriptions[:len(w.subscriptions)-1] + for _, y := range w.subscriptions { + if channels[x].Equal(&y) { //nolint:gosec // for alias var is not closured or stored + delete(w.subscriptions, y.Key) break } } } } +// EnsureKeyed sets the default key on a channel if it doesn't have one +// Returns key for convenience +func (c *ChannelSubscription) EnsureKeyed() any { + if c.Key == nil { + c.Key = DefaultChannelKey{ + Channel: c.Channel, + Asset: c.Asset, + Currency: c.Currency, + } + } + return c.Key +} + // Equal two WebsocketChannelSubscription to determine equality -func (w *ChannelSubscription) Equal(s *ChannelSubscription) bool { - return strings.EqualFold(w.Channel, s.Channel) && - w.Currency.Equal(s.Currency) +func (c *ChannelSubscription) Equal(b *ChannelSubscription) bool { + return strings.EqualFold(c.Channel, b.Channel) && + c.Currency.Equal(b.Currency) } -// GetSubscriptions returns a copied list of subscriptions -// and is a private member that cannot be manipulated +// GetSubscription returns a pointer to a copy of the subscription at the key provided +// returns nil if no subscription is at that key or the key is nil +func (w *Websocket) GetSubscription(key any) *ChannelSubscription { + if key == nil || w.subscriptions == nil { + return nil + } + w.subscriptionMutex.RLock() + defer w.subscriptionMutex.RUnlock() + if s, ok := w.subscriptions[key]; ok { + c := s + return &c + } + return nil +} + +// GetSubscriptions returns a new slice of the subscriptions func (w *Websocket) GetSubscriptions() []ChannelSubscription { - w.subscriptionMutex.Lock() - defer w.subscriptionMutex.Unlock() - return append(w.subscriptions[:0:0], w.subscriptions...) + w.subscriptionMutex.RLock() + defer w.subscriptionMutex.RUnlock() + subs := make([]ChannelSubscription, 0, len(w.subscriptions)) + for _, c := range w.subscriptions { + subs = append(subs, c) + } + return subs } // SetCanUseAuthenticatedEndpoints sets canUseAuthenticatedEndpoints val in // a thread safe manner func (w *Websocket) SetCanUseAuthenticatedEndpoints(val bool) { - w.subscriptionMutex.Lock() - defer w.subscriptionMutex.Unlock() + w.fieldMutex.Lock() + defer w.fieldMutex.Unlock() w.canUseAuthenticatedEndpoints = val } // CanUseAuthenticatedEndpoints gets canUseAuthenticatedEndpoints val in // a thread safe manner func (w *Websocket) CanUseAuthenticatedEndpoints() bool { - w.subscriptionMutex.Lock() - defer w.subscriptionMutex.Unlock() + w.fieldMutex.RLock() + defer w.fieldMutex.RUnlock() return w.canUseAuthenticatedEndpoints } diff --git a/exchanges/stream/websocket_connection.go b/exchanges/stream/websocket_connection.go index 1a983654166..f82b675b508 100644 --- a/exchanges/stream/websocket_connection.go +++ b/exchanges/stream/websocket_connection.go @@ -22,7 +22,7 @@ import ( // SendMessageReturnResponse will send a WS message to the connection and wait // for response func (w *WebsocketConnection) SendMessageReturnResponse(signature, request interface{}) ([]byte, error) { - m, err := w.Match.set(signature) + m, err := w.Match.Set(signature) if err != nil { return nil, err } diff --git a/exchanges/stream/websocket_test.go b/exchanges/stream/websocket_test.go index a5651d0f7df..e1a493ab163 100644 --- a/exchanges/stream/websocket_test.go +++ b/exchanges/stream/websocket_test.go @@ -16,6 +16,7 @@ import ( "time" "github.com/gorilla/websocket" + "github.com/stretchr/testify/assert" "github.com/thrasher-corp/gocryptotrader/config" "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/protocol" @@ -52,6 +53,10 @@ type testResponse struct { RequestID int64 `json:"reqid,omitempty"` } +type testSubKey struct { + Mood string +} + var defaultSetup = &WebsocketSetup{ ExchangeConfig: &config.Exchange{ Features: &config.FeaturesConfig{ @@ -71,9 +76,9 @@ var defaultSetup = &WebsocketSetup{ GenerateSubscriptions: func() ([]ChannelSubscription, error) { return []ChannelSubscription{ {Channel: "TestSub"}, - {Channel: "TestSub2"}, - {Channel: "TestSub3"}, - {Channel: "TestSub4"}, + {Channel: "TestSub2", Key: "purple"}, + {Channel: "TestSub3", Key: testSubKey{"mauve"}}, + {Channel: "TestSub4", Key: 42}, }, nil }, Features: &protocol.Features{Subscribe: true, Unsubscribe: true}, @@ -495,10 +500,7 @@ func TestWebsocket(t *testing.T) { func TestSubscribeUnsubscribe(t *testing.T) { t.Parallel() ws := *New() - err := ws.Setup(defaultSetup) - if err != nil { - t.Fatal(err) - } + assert.NoError(t, ws.Setup(defaultSetup), "WS Setup should not error") fnSub := func(subs []ChannelSubscription) error { ws.AddSuccessfulSubscriptions(subs...) @@ -511,53 +513,39 @@ func TestSubscribeUnsubscribe(t *testing.T) { ws.Subscriber = fnSub ws.Unsubscriber = fnUnsub - err = ws.UnsubscribeChannels(nil) - if err == nil { - t.Fatal("error cannot be nil") - } - - // Generate test sub subs, err := ws.GenerateSubs() - if err != nil { - t.Fatal(err) - } - - // unsub when no subscribed channel - err = ws.UnsubscribeChannels(subs) - if err == nil { - t.Fatal("error cannot be nil") - } - - err = ws.SubscribeToChannels(subs) - if err != nil { - t.Fatal(err) - } - - // subscribe when already subscribed - err = ws.SubscribeToChannels(subs) - if err == nil { - t.Fatal("error cannot be nil") - } - - // subscribe to nothing - err = ws.SubscribeToChannels(nil) - if err == nil { - t.Fatal("error cannot be nil") - } - - err = ws.UnsubscribeChannels(subs) - if err != nil { - t.Fatal(err) - } + assert.NoError(t, err, "Generating test subscriptions should not error") + assert.ErrorIs(t, ws.UnsubscribeChannels(nil), errNoChannelsInArgs, "Unsubscribing from nil should error") + assert.ErrorIs(t, ws.UnsubscribeChannels(subs), errSubscriptionNotFound, "Unsubscribing should error when not subscribed") + assert.Nil(t, ws.GetSubscription(42), "GetSubscription on empty internal map should return") + assert.NoError(t, ws.SubscribeToChannels(subs), "Basic Subscribing should not error") + assert.Len(t, ws.GetSubscriptions(), 4, "Should have 4 subscriptions") + byDefKey := ws.GetSubscription(defaultChannelKey{Channel: "TestSub"}) + if assert.NotNil(t, byDefKey, "GetSubscription by default key should find a channel") { + assert.Equal(t, "TestSub", byDefKey.Channel, "GetSubscription by default key should return a pointer a copy of the right channel") + assert.NotSame(t, byDefKey, ws.subscriptions["TestSub"], "GetSubscription returns a fresh pointer") + } + if assert.NotNil(t, ws.GetSubscription("purple"), "GetSubscription by string key should find a channel") { + assert.Equal(t, "TestSub2", ws.GetSubscription("purple").Channel, "GetSubscription by string key should return a pointer a copy of the right channel") + } + if assert.NotNil(t, ws.GetSubscription(testSubKey{"mauve"}), "GetSubscription by type key should find a channel") { + assert.Equal(t, "TestSub3", ws.GetSubscription(testSubKey{"mauve"}).Channel, "GetSubscription by type key should return a pointer a copy of the right channel") + } + if assert.NotNil(t, ws.GetSubscription(42), "GetSubscription by int key should find a channel") { + assert.Equal(t, "TestSub4", ws.GetSubscription(42).Channel, "GetSubscription by int key should return a pointer a copy of the right channel") + } + assert.Nil(t, ws.GetSubscription(nil), "GetSubscription by nil should return nil") + assert.Nil(t, ws.GetSubscription(45), "GetSubscription by invalid key should return nil") + assert.ErrorIs(t, ws.SubscribeToChannels(subs), errAlreadySubscribed, "Subscribe should error when already subscribed") + assert.ErrorIs(t, ws.SubscribeToChannels(nil), errNoChannelsInArgs, "Subscribe to nil should error") + assert.NoError(t, ws.UnsubscribeChannels(subs), "Unsubscribing should not error") } +// TestResubscribe tests Resubscribing to existing subscriptions func TestResubscribe(t *testing.T) { t.Parallel() ws := *New() - err := ws.Setup(defaultSetup) - if err != nil { - t.Fatal(err) - } + assert.NoError(t, ws.Setup(defaultSetup), "WS Setup should not error") fnSub := func(subs []ChannelSubscription) error { ws.AddSuccessfulSubscriptions(subs...) @@ -571,20 +559,10 @@ func TestResubscribe(t *testing.T) { ws.Unsubscriber = fnUnsub channel := []ChannelSubscription{{Channel: "resubTest"}} - err = ws.ResubscribeToChannel(&channel[0]) - if err == nil { - t.Fatal("error cannot be nil") - } - err = ws.SubscribeToChannels(channel) - if err != nil { - t.Fatal(err) - } - - err = ws.ResubscribeToChannel(&channel[0]) - if err != nil { - t.Fatal("error cannot be nil") - } + assert.ErrorIs(t, ws.ResubscribeToChannel(&channel[0]), errSubscriptionNotFound, "Resubscribe should error when channel isn't subscribed yet") + assert.NoError(t, ws.SubscribeToChannels(channel), "Subscribe should not error") + assert.NoError(t, ws.ResubscribeToChannel(&channel[0]), "Resubscribe should not error now the channel is subscribed") } // TestConnectionMonitorNoConnection logic test @@ -614,15 +592,13 @@ func TestConnectionMonitorNoConnection(t *testing.T) { func TestGetSubscriptions(t *testing.T) { t.Parallel() w := Websocket{ - subscriptions: []ChannelSubscription{ - { + subscriptions: subscriptionMap{ + 42: { Channel: "hello3", }, }, } - if !strings.EqualFold("hello3", w.GetSubscriptions()[0].Channel) { - t.Error("Subscriptions was not copied properly") - } + assert.Equal(t, "hello3", w.GetSubscriptions()[0].Channel, "GetSubscriptions should return the correct channel details") } // TestSetCanUseAuthenticatedEndpoints logic test @@ -1024,7 +1000,7 @@ func TestGetChannelDifference(t *testing.T) { t.Fatal("error mismatch") } - web.subscriptions = subs + web.AddSuccessfulSubscriptions(subs...) flushedSubs := []ChannelSubscription{ { @@ -1169,9 +1145,7 @@ func TestFlushChannels(t *testing.T) { if err != nil { t.Fatal(err) } - web.subscriptionMutex.Lock() - web.subscriptions = subs - web.subscriptionMutex.Unlock() + web.AddSuccessfulSubscriptions(subs...) err = web.FlushChannels() if err != nil { t.Fatal(err) @@ -1191,12 +1165,12 @@ func TestFlushChannels(t *testing.T) { t.Fatal(err) } web.subscriptionMutex.Lock() - web.subscriptions = []ChannelSubscription{ - { + web.subscriptions = subscriptionMap{ + 41: { Channel: "match channel", Currency: currency.NewPair(currency.BTC, currency.AUD), }, - { + 42: { Channel: "unsub channel", Currency: currency.NewPair(currency.THETA, currency.USDT), }, diff --git a/exchanges/stream/websocket_types.go b/exchanges/stream/websocket_types.go index 2db60384f1c..3ebf888293b 100644 --- a/exchanges/stream/websocket_types.go +++ b/exchanges/stream/websocket_types.go @@ -22,6 +22,8 @@ const ( UnhandledMessage = " - Unhandled websocket message: " ) +type subscriptionMap map[any]ChannelSubscription + // Websocket defines a return type for websocket connections via the interface // wrapper for routine processing type Websocket struct { @@ -42,12 +44,12 @@ type Websocket struct { runningURL string runningURLAuth string exchangeName string - m sync.Mutex - connectionMutex sync.RWMutex + m sync.RWMutex + fieldMutex sync.RWMutex connector func() error - subscriptionMutex sync.Mutex - subscriptions []ChannelSubscription + subscriptionMutex sync.RWMutex + subscriptions subscriptionMap Subscribe chan []ChannelSubscription Unsubscribe chan []ChannelSubscription