diff --git a/exchanges/kraken/kraken_test.go b/exchanges/kraken/kraken_test.go index 8530cb53b8d..6b820a58af5 100644 --- a/exchanges/kraken/kraken_test.go +++ b/exchanges/kraken/kraken_test.go @@ -1248,10 +1248,10 @@ func setupWsTests(t *testing.T) { // TestWebsocketSubscribe tests returning a message with an id func TestWebsocketSubscribe(t *testing.T) { setupWsTests(t) - err := k.Subscribe([]subscription.Subscription{ + err := k.Subscribe(subscription.List{ { Channel: defaultSubscribedChannels[0], - Pair: currency.NewPairWithDelimiter("XBT", "USD", "/"), + Pairs: currency.Pairs{currency.NewPairWithDelimiter("XBT", "USD", "/")}, }, }) if err != nil { diff --git a/exchanges/kraken/kraken_types.go b/exchanges/kraken/kraken_types.go index dcd51b155d6..d59d75a77f3 100644 --- a/exchanges/kraken/kraken_types.go +++ b/exchanges/kraken/kraken_types.go @@ -500,11 +500,11 @@ type WithdrawStatusResponse struct { // WebsocketSubscriptionEventRequest handles WS subscription events type WebsocketSubscriptionEventRequest struct { - Event string `json:"event"` // subscribe - RequestID int64 `json:"reqid,omitempty"` // Optional, client originated ID reflected in response message. - Pairs []string `json:"pair,omitempty"` // Array of currency pairs (pair1,pair2,pair3). - Subscription WebsocketSubscriptionData `json:"subscription,omitempty"` - Channels []subscription.Subscription `json:"-"` // Keeps track of associated subscriptions in batched outgoings + Event string `json:"event"` // subscribe + RequestID int64 `json:"reqid,omitempty"` // Optional, client originated ID reflected in response message. + Pairs []string `json:"pair,omitempty"` // Array of currency pairs (pair1,pair2,pair3). + Subscription WebsocketSubscriptionData `json:"subscription,omitempty"` + Channels subscription.List `json:"-"` // Keeps track of associated subscriptions in batched outgoings } // WebsocketBaseEventRequest Just has an "event" property diff --git a/exchanges/kraken/kraken_websocket.go b/exchanges/kraken/kraken_websocket.go index fd4325164e2..67c38932a9f 100644 --- a/exchanges/kraken/kraken_websocket.go +++ b/exchanges/kraken/kraken_websocket.go @@ -859,7 +859,7 @@ func (k *Kraken) wsProcessOrderBook(channelData *WebsocketChannelData, data map[ } }(&subscription.Subscription{ Channel: krakenWsOrderbook, - Pair: outbound, + Pairs: currency.Pairs{outbound}, Asset: asset.Spot, }) return err @@ -1210,25 +1210,25 @@ func (k *Kraken) wsProcessCandles(channelData *WebsocketChannelData, data []inte } // GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions() -func (k *Kraken) GenerateDefaultSubscriptions() ([]subscription.Subscription, error) { +func (k *Kraken) GenerateDefaultSubscriptions() (subscription.List, error) { enabledPairs, err := k.GetEnabledPairs(asset.Spot) if err != nil { return nil, err } - var subscriptions []subscription.Subscription + var subscriptions subscription.List for i := range defaultSubscribedChannels { for j := range enabledPairs { enabledPairs[j].Delimiter = "/" - subscriptions = append(subscriptions, subscription.Subscription{ + subscriptions = append(subscriptions, &subscription.Subscription{ Channel: defaultSubscribedChannels[i], - Pair: enabledPairs[j], + Pairs: currency.Pairs{enabledPairs[j]}, Asset: asset.Spot, }) } } if k.Websocket.CanUseAuthenticatedEndpoints() { for i := range authenticatedChannels { - subscriptions = append(subscriptions, subscription.Subscription{ + subscriptions = append(subscriptions, &subscription.Subscription{ Channel: authenticatedChannels[i], }) } @@ -1237,7 +1237,7 @@ func (k *Kraken) GenerateDefaultSubscriptions() ([]subscription.Subscription, er } // Subscribe sends a websocket message to receive data from the channel -func (k *Kraken) Subscribe(channelsToSubscribe []subscription.Subscription) error { +func (k *Kraken) Subscribe(channelsToSubscribe subscription.List) error { var subscriptions = make(map[string]*[]WebsocketSubscriptionEventRequest) channels: for i := range channelsToSubscribe { @@ -1248,7 +1248,7 @@ channels: } for j := range *s { - (*s)[j].Pairs = append((*s)[j].Pairs, channelsToSubscribe[i].Pair.String()) + (*s)[j].Pairs = append((*s)[j].Pairs, channelsToSubscribe[i].Pairs[0].String()) (*s)[j].Channels = append((*s)[j].Channels, channelsToSubscribe[i]) continue channels } @@ -1264,8 +1264,8 @@ channels: if channelsToSubscribe[i].Channel == "book" { outbound.Subscription.Depth = krakenWsOrderbookDepth } - if !channelsToSubscribe[i].Pair.IsEmpty() { - outbound.Pairs = []string{channelsToSubscribe[i].Pair.String()} + if !channelsToSubscribe[i].Pairs[0].IsEmpty() { + outbound.Pairs = []string{channelsToSubscribe[i].Pairs[0].String()} } if common.StringDataContains(authenticatedChannels, channelsToSubscribe[i].Channel) { outbound.Subscription.Token = authToken @@ -1278,35 +1278,34 @@ channels: var errs error for _, subs := range subscriptions { for i := range *subs { + var err error if common.StringDataContains(authenticatedChannels, (*subs)[i].Subscription.Name) { - _, err := k.Websocket.AuthConn.SendMessageReturnResponse((*subs)[i].RequestID, (*subs)[i]) - if err != nil { - errs = common.AppendError(errs, err) - continue - } - k.Websocket.AddSuccessfulSubscriptions((*subs)[i].Channels...) - continue + _, err = k.Websocket.AuthConn.SendMessageReturnResponse((*subs)[i].RequestID, (*subs)[i]) + } else { + _, err = k.Websocket.Conn.SendMessageReturnResponse((*subs)[i].RequestID, (*subs)[i]) } - _, err := k.Websocket.Conn.SendMessageReturnResponse((*subs)[i].RequestID, (*subs)[i]) if err != nil { errs = common.AppendError(errs, err) - continue + } else { + for _, sub := range (*subs)[i].Channels { + sub.SetState(subscription.SubscribedState) + k.Websocket.AddSubscription(sub) + } } - k.Websocket.AddSuccessfulSubscriptions((*subs)[i].Channels...) } } return errs } // Unsubscribe sends a websocket message to stop receiving data from the channel -func (k *Kraken) Unsubscribe(channelsToUnsubscribe []subscription.Subscription) error { +func (k *Kraken) Unsubscribe(channelsToUnsubscribe subscription.List) error { var unsubs []WebsocketSubscriptionEventRequest channels: for x := range channelsToUnsubscribe { for y := range unsubs { if unsubs[y].Subscription.Name == channelsToUnsubscribe[x].Channel { unsubs[y].Pairs = append(unsubs[y].Pairs, - channelsToUnsubscribe[x].Pair.String()) + channelsToUnsubscribe[x].Pairs[0].String()) unsubs[y].Channels = append(unsubs[y].Channels, channelsToUnsubscribe[x]) continue channels @@ -1326,7 +1325,7 @@ channels: unsub := WebsocketSubscriptionEventRequest{ Event: krakenWsUnsubscribe, - Pairs: []string{channelsToUnsubscribe[x].Pair.String()}, + Pairs: []string{channelsToUnsubscribe[x].Pairs[0].String()}, Subscription: WebsocketSubscriptionData{ Name: channelsToUnsubscribe[x].Channel, Depth: depth, @@ -1348,7 +1347,7 @@ channels: errs = common.AppendError(errs, err) continue } - k.Websocket.RemoveSubscriptions(unsubs[i].Channels...) + k.Websocket.RemoveSubscriptions(unsubs[i].Channels) continue } @@ -1357,7 +1356,7 @@ channels: errs = common.AppendError(errs, err) continue } - k.Websocket.RemoveSubscriptions(unsubs[i].Channels...) + k.Websocket.RemoveSubscriptions(unsubs[i].Channels) } return errs }