Skip to content

Commit

Permalink
WIP - Rebase and reword me
Browse files Browse the repository at this point in the history
  • Loading branch information
gbjk committed Oct 8, 2023
1 parent 9df1b6e commit b618ab9
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 19 deletions.
47 changes: 36 additions & 11 deletions exchanges/kraken/kraken_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,10 @@ func (k *Kraken) wsHandleData(respRaw []byte) error {

c := k.Websocket.GetSubscription(stream.DefaultChannelKey{Channel: channelName, Currency: wsPair, Asset: asset.Spot})
if c == nil {
return fmt.Errorf("%w: %s %s %s", stream.ErrSubscriptionNotFound, asset.Spot, channelName, wsPair)
return fmt.Errorf("PINGU: %w: %s %s %s", stream.ErrSubscriptionNotFound, asset.Spot, channelName, wsPair)
}
if c.IsPending() {
log.Debugln(log.Global, "PINGU: Found it, and it's pending, but it's all good in the hood bro'")
}

return k.wsReadDataResponse(c, dataResponse)
Expand Down Expand Up @@ -1204,17 +1207,12 @@ func (k *Kraken) subscribeToChan(c *stream.ChannelSubscription) error {
c.Asset = asset.Spot
}

key, ok := c.EnsureKeyed().(stream.DefaultChannelKey)
if !ok {
return common.GetTypeAssertError("stream.DefaultChannelKey", c.Key, "subscription.Key") // Should be impossible
if err := ensureChannelKeyed(c); err != nil {

Check failure on line 1210 in exchanges/kraken/kraken_websocket.go

View workflow job for this annotation

GitHub Actions / lint

undefined: ensureChannelKeyed) (typecheck)

Check failure on line 1210 in exchanges/kraken/kraken_websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end 32-bit with PSQL

undefined: ensureChannelKeyed

Check failure on line 1210 in exchanges/kraken/kraken_websocket.go

View workflow job for this annotation

GitHub Actions / GoCryptoTrader back-end with PSQL

undefined: ensureChannelKeyed
return err
}

if r.Subscription.Depth > 0 {
key.Channel += "-" + strconv.Itoa(r.Subscription.Depth) // All responses will have book-N as the channel name
}

if r.Subscription.Interval > 0 {
key.Channel += "-" + strconv.Itoa(r.Subscription.Interval) // All responses will have ohlc-N as the channel name
if !k.Websocket.AddPendingSubscription(*c) {
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Currency, stream.ErrSubscriptionPending)
}

conn := k.Websocket.Conn
Expand All @@ -1225,16 +1223,17 @@ func (k *Kraken) subscribeToChan(c *stream.ChannelSubscription) error {

respRaw, err := conn.SendMessageReturnResponse(r.RequestID, r)
if err != nil {
k.Websocket.RemoveSubscription(*c)
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Currency, err)
}

if err = k.getErrResp(respRaw); err != nil {
wErr := fmt.Errorf("%w Channel: %s Pair: %s; %w", stream.ErrSubscriptionFailure, c.Channel, c.Currency, err)
k.Websocket.DataHandler <- wErr
k.Websocket.RemoveSubscription(*c)
return wErr
}

c.Key = key
k.Websocket.AddSuccessfulSubscriptions(*c)
if k.Verbose {
log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s Pair: %s\n", k.Name, c.Channel, c.Currency)
Expand All @@ -1250,6 +1249,12 @@ func (k *Kraken) unsubscribeFromChan(c *stream.ChannelSubscription) error {
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Currency, err)
}

c.EnsureKeyed()

if !k.Websocket.AddPendingSubscription(*c) {
return fmt.Errorf("PINGU: %w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Currency, stream.ErrSubscriptionPending)
}

conn := k.Websocket.Conn
if common.StringDataContains(authenticatedChannels, c.Channel) {
conn = k.Websocket.AuthConn
Expand Down Expand Up @@ -1296,6 +1301,26 @@ func (k *Kraken) reqForSub(e string, c *stream.ChannelSubscription) (*WebsocketS
return r, err
}

// ensureKeyed wraps the channel EnsureKeyed to add channel name suffixes for Depth and Interval
func ensureKeyed(c *stream.ChannelSubscription, r *WebsocketSubRequest) error {
key, ok := c.EnsureKeyed().(stream.DefaultChannelKey)
if !ok {
return common.GetTypeAssertError("stream.DefaultChannelKey", c.Key, "subscription.Key") // Should be impossible
}

if r.Subscription.Depth > 0 {
key.Channel += "-" + strconv.Itoa(r.Subscription.Depth) // All responses will have book-N as the channel name
}

if r.Subscription.Interval > 0 {
key.Channel += "-" + strconv.Itoa(r.Subscription.Interval) // All responses will have ohlc-N as the channel name
}

c.Key = key

return nil
}

func depthFromChan(c *stream.ChannelSubscription) (int, error) {
depthAny, ok := c.Params[ChannelOrderbookDepthKey]
if !ok {
Expand Down
1 change: 1 addition & 0 deletions exchanges/stream/stream_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type ChannelSubscription struct {
Currency currency.Pair
Asset asset.Item
Params map[string]interface{}
pending bool
}

// ConnectionSetup defines variables for an individual stream connection
Expand Down
52 changes: 44 additions & 8 deletions exchanges/stream/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ var (
ErrSubscriptionFailure = errors.New("subscription failure")
// ErrUnsubscribeFailure defines an error when a unsubscribe fails
ErrUnsubscribeFailure = errors.New("unsubscribe failure")
// ErrSubscriptionPending defins an error when a subscription is already pending an operation result

Check failure on line 33 in exchanges/stream/websocket.go

View workflow job for this annotation

GitHub Actions / Spell checker

defins ==> defines, define
ErrSubscriptionPending = errors.New("subscripton update already happening")
// ErrAlreadyDisabled is returned when you double-disable the websocket
ErrAlreadyDisabled = errors.New("websocket already disabled")
// ErrNotConnected defines an error when websocket is not connected
Expand Down Expand Up @@ -934,32 +936,66 @@ func (w *Websocket) SubscribeToChannels(channels []ChannelSubscription) error {
return nil
}

// AddPendingSubscription adds a subscription to the subscription lists, or updates it if it exists.
// The pending operation could be subscribe or unsubscribe. The purpose of setting this state is to block other similar operations
// If the subscription already exists then the state is changed to pending but no other fields are merged
// Returns true on success, false if the subscription is already in pending state
func (w *Websocket) AddPendingSubscription(c ChannelSubscription) bool {
w.subscriptionMutex.Lock()
defer w.subscriptionMutex.Unlock()
if w.subscriptions == nil {
w.subscriptions = subscriptionMap{}
}
key := c.EnsureKeyed()
p, ok := w.subscriptions[key]
if !ok {
p = c
w.subscriptions[key] = p
}
if p.pending {
return false
}
p.pending = true
return true
}

func (c *ChannelSubscription) IsPending() bool {

Check warning on line 962 in exchanges/stream/websocket.go

View workflow job for this annotation

GitHub Actions / lint

exported: exported method ChannelSubscription.IsPending should have comment or be unexported (revive)
return c.pending
}

// RemoveSubscription removes subscriptions from the subscription list
// It is a duplicate of RemoveSuccessfulUnsubscriptions for shorterm naming clarity
// We should harmonise Add* and Remove* to not imply sub state in these method names
func (w *Websocket) RemoveSubscription(c ChannelSubscription) {
w.RemoveSuccessfulUnsubscriptions(c)
}

// AddSuccessfulSubscriptions adds subscriptions to the subscription lists that
// has been successfully subscribed
func (w *Websocket) AddSuccessfulSubscriptions(channels ...ChannelSubscription) {
w.subscriptionMutex.Lock()
defer w.subscriptionMutex.Unlock()
if w.subscriptions == nil {
w.subscriptions = subscriptionMap{}
}
for i := range channels {
key := channels[i].EnsureKeyed()
channels[i].pending = false
w.subscriptions[key] = channels[i]
}
w.subscriptionMutex.Unlock()
}

// RemoveSuccessfulUnsubscriptions removes subscriptions from the subscription
// list that has been successfulling unsubscribed
func (w *Websocket) RemoveSuccessfulUnsubscriptions(channels ...ChannelSubscription) {
w.subscriptionMutex.Lock()
defer w.subscriptionMutex.Unlock()
for x := range channels {
for _, y := range w.subscriptions {
if channels[x].Equal(&y) { //nolint:gosec // for alias var is not closured or stored
delete(w.subscriptions, y.Key)
break
}
}
if w.subscriptions == nil {
w.subscriptions = subscriptionMap{}
}
for i := range channels {
key := channels[i].EnsureKeyed()
delete(w.subscriptions, key)
}
}

Expand Down
6 changes: 6 additions & 0 deletions exchanges/stream/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,12 @@ func TestResubscribe(t *testing.T) {
assert.NoError(t, ws.ResubscribeToChannel(&channel[0]), "Resubscribe should not error now the channel is subscribed")
}

// TestPendingSubscriptions tests AddPendingSubscription and isSubscriptionPending interactions
func TestPendingSubscriptions(t *testing.T) {
// DO NOT COMMIT
t.Error("G didn't write this test. Why didn't G write this? Is G dead? Is this a cry for help? If so, Why are you ignoring it?")
}

// TestConnectionMonitorNoConnection logic test
func TestConnectionMonitorNoConnection(t *testing.T) {
t.Parallel()
Expand Down

0 comments on commit b618ab9

Please sign in to comment.