Skip to content

Commit

Permalink
Websockets: Add keys to websocket subscriptions
Browse files Browse the repository at this point in the history
* This switches all RO uses of the mutex to use a RLock method.
* The mutex used for discrete field access has had scope drift from
  name 'connectionMutex' so rename to more appropriate fieldsMutex
* The mutex used for Set/CanUseAuthEndpoints moves from the
  subscriptions endpoint to the fieldsMutex
* Add GetSubscription by key
* Expose stream.Matcher type
  • Loading branch information
gbjk committed Oct 1, 2023
1 parent 6105071 commit c724690
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 167 deletions.
26 changes: 13 additions & 13 deletions exchanges/stream/stream_match.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"sync"
)

// NewMatch returns a new matcher
// NewMatch returns a new Match
func NewMatch() *Match {
return &Match{
m: make(map[interface{}]chan []byte),
Expand All @@ -21,6 +21,13 @@ type Match struct {
mu sync.Mutex
}

// Matcher defines a payload matching return mechanism
type Matcher struct {
C chan []byte
sig interface{}
m *Match
}

// Incoming matches with request, disregarding the returned payload
func (m *Match) Incoming(signature interface{}) bool {
return m.IncomingWithData(signature, nil)
Expand All @@ -44,35 +51,28 @@ func (m *Match) IncomingWithData(signature interface{}, data []byte) bool {
return false
}

// Sets the signature response channel for incoming data
func (m *Match) set(signature interface{}) (matcher, error) {
// Set the signature response channel for incoming data
func (m *Match) Set(signature interface{}) (Matcher, error) {
var ch chan []byte
m.mu.Lock()
if _, ok := m.m[signature]; ok {
m.mu.Unlock()
return matcher{}, errors.New("signature collision")
return Matcher{}, errors.New("signature collision")
}
// This is buffered so we don't need to wait for receiver.
ch = make(chan []byte, 1)
m.m[signature] = ch
m.mu.Unlock()

return matcher{
return Matcher{
C: ch,
sig: signature,
m: m,
}, nil
}

// matcher defines a payload matching return mechanism
type matcher struct {
C chan []byte
sig interface{}
m *Match
}

// Cleanup closes underlying channel and deletes signature from map
func (m *matcher) Cleanup() {
func (m *Matcher) Cleanup() {
m.m.mu.Lock()
close(m.C)
delete(m.m.m, m.sig)
Expand Down
8 changes: 8 additions & 0 deletions exchanges/stream/stream_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,16 @@ type Response struct {
Raw []byte
}

// DefaultChannelKey is the fallback key for AddSuccessfulSubscriptions
type DefaultChannelKey struct {
Channel string
Currency currency.Pair
Asset asset.Item
}

// ChannelSubscription container for streaming subscriptions
type ChannelSubscription struct {
Key any
Channel string
Currency currency.Pair
Asset asset.Item
Expand Down
Loading

0 comments on commit c724690

Please sign in to comment.