Skip to content

Commit

Permalink
Kucoin: Fix Subscription response handling
Browse files Browse the repository at this point in the history
Fixes [review
comment](#1394 (comment))
  • Loading branch information
gbjk committed Nov 21, 2023
1 parent 8071e2a commit 15b3258
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 3 deletions.
1 change: 1 addition & 0 deletions exchanges/kucoin/kucoin_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var (
errSizeOrFundIsRequired = errors.New("at least one required among size and funds")
errInvalidLeverage = errors.New("invalid leverage value")
errInvalidClientOrderID = errors.New("no client order ID supplied, this endpoint requires a UUID or similar string")
errInvalidMsgType = errors.New("message type field not valid")

subAccountRegExp = regexp.MustCompile("^[a-zA-Z0-9]{7-32}$")
subAccountPassphraseRegExp = regexp.MustCompile("^[a-zA-Z0-9]{7-24}$")
Expand Down
25 changes: 22 additions & 3 deletions exchanges/kucoin/kucoin_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"
"time"

"github.com/buger/jsonparser"
"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/currency"
Expand Down Expand Up @@ -198,6 +199,12 @@ func (ku *Kucoin) wsHandleData(respData []byte) error {
if err != nil {
return err
}
if resp.ID != "" {
if !ku.Websocket.Match.IncomingWithData("msgID:"+resp.ID, respData) {
return fmt.Errorf("message listener not found: %s", resp.ID)
}
return nil
}
if resp.Type == "pong" || resp.Type == "welcome" {
return nil
}
Expand Down Expand Up @@ -909,17 +916,29 @@ func (ku *Kucoin) Unsubscribe(subscriptions []subscription.Subscription) error {
func (ku *Kucoin) handleSubscriptions(subs []subscription.Subscription, operation string) error {
var errs error
for i := range subs {
msgID := strconv.FormatInt(ku.Websocket.Conn.GenerateMessageID(false), 10)
req := WsSubscriptionInput{
ID: strconv.FormatInt(ku.Websocket.Conn.GenerateMessageID(false), 10),
ID: msgID,
Type: operation,
Topic: subs[i].Channel,
PrivateChannel: subs[i].Authenticated,
Response: true,
}
if err := ku.Websocket.Conn.SendJSONMessage(req); err != nil {
if respRaw, err := ku.Websocket.Conn.SendMessageReturnResponse("msgID:"+msgID, req); err != nil {
errs = common.AppendError(errs, err)
} else {
ku.Websocket.AddSuccessfulSubscriptions(subs[i])
rType, err := jsonparser.GetUnsafeString(respRaw, "type")
switch {
case err != nil:
errs = common.AppendError(errs, err)
case rType != "ack":
errs = common.AppendError(errs, fmt.Errorf("%w: %s", errInvalidMsgType, rType))
default:
ku.Websocket.AddSuccessfulSubscriptions(subs[i])
if ku.Verbose {
log.Debugf(log.ExchangeSys, "%s Subscribed to Channel: %s", ku.Name, subs[i].Channel)
}
}
}
}
return errs
Expand Down

0 comments on commit 15b3258

Please sign in to comment.