Skip to content

Commit

Permalink
Websockets: Add keys to websocket subscriptions
Browse files Browse the repository at this point in the history
* This switches all RO uses of the mutex to use a RLock method.
* The mutex used for discrete field access has had scope drift from
  name 'connectionMutex' so rename to more appropriate fieldsMutex
* The mutex used for Set/CanUseAuthEndpoints moves from the
  subscriptions endpoint to the fieldsMutex
* Add GetSubscription by key
* Expose stream.Matcher type
  • Loading branch information
gbjk committed Oct 1, 2023
1 parent 6105071 commit 505ada1
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 170 deletions.
28 changes: 14 additions & 14 deletions exchanges/stream/stream_match.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"sync"
)

// NewMatch returns a new matcher
// NewMatch returns a new Match
func NewMatch() *Match {
return &Match{
m: make(map[interface{}]chan []byte),
Expand All @@ -21,13 +21,20 @@ type Match struct {
mu sync.Mutex
}

// Matcher defines a payload matching return mechanism
type Matcher struct {
C chan []byte
sig interface{}
m *Match
}

// Incoming matches with request, disregarding the returned payload
func (m *Match) Incoming(signature interface{}) bool {
return m.IncomingWithData(signature, nil)
}

// IncomingWithData matches with requests and takes in the returned payload, to
// be processed outside of a stream processing routine
// be processed outside of a stream processing routine and returns true if a handler was found
func (m *Match) IncomingWithData(signature interface{}, data []byte) bool {
m.mu.Lock()
defer m.mu.Unlock()
Expand All @@ -44,35 +51,28 @@ func (m *Match) IncomingWithData(signature interface{}, data []byte) bool {
return false
}

// Sets the signature response channel for incoming data
func (m *Match) set(signature interface{}) (matcher, error) {
// Set the signature response channel for incoming data
func (m *Match) Set(signature interface{}) (Matcher, error) {
var ch chan []byte
m.mu.Lock()
if _, ok := m.m[signature]; ok {
m.mu.Unlock()
return matcher{}, errors.New("signature collision")
return Matcher{}, errors.New("signature collision")
}
// This is buffered so we don't need to wait for receiver.
ch = make(chan []byte, 1)
m.m[signature] = ch
m.mu.Unlock()

return matcher{
return Matcher{
C: ch,
sig: signature,
m: m,
}, nil
}

// matcher defines a payload matching return mechanism
type matcher struct {
C chan []byte
sig interface{}
m *Match
}

// Cleanup closes underlying channel and deletes signature from map
func (m *matcher) Cleanup() {
func (m *Matcher) Cleanup() {
m.m.mu.Lock()
close(m.C)
delete(m.m.m, m.sig)
Expand Down
4 changes: 2 additions & 2 deletions exchanges/stream/stream_match_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ func TestMatch(t *testing.T) {
t.Fatal("should not be able to match")
}

m, err := nm.set("hello")
m, err := nm.Set("hello")
if err != nil {
t.Fatal(err)
}

_, err = nm.set("hello")
_, err = nm.Set("hello")
if err == nil {
t.Fatal("error cannot be nil as this collision cannot occur")
}
Expand Down
8 changes: 8 additions & 0 deletions exchanges/stream/stream_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,16 @@ type Response struct {
Raw []byte
}

// DefaultChannelKey is the fallback key for AddSuccessfulSubscriptions
type DefaultChannelKey struct {
Channel string
Currency currency.Pair
Asset asset.Item
}

// ChannelSubscription container for streaming subscriptions
type ChannelSubscription struct {
Key any
Channel string
Currency currency.Pair
Asset asset.Item
Expand Down
Loading

0 comments on commit 505ada1

Please sign in to comment.