Skip to content

Commit

Permalink
Bitstamp: Handle sub/unsub responses
Browse files Browse the repository at this point in the history
  • Loading branch information
gbjk committed Dec 7, 2024
1 parent 04c4f75 commit b189cd2
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 85 deletions.
22 changes: 21 additions & 1 deletion exchanges/bitstamp/bitstamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ func TestWsOrderbook(t *testing.T) {

pressXToJSON = []byte(`{"data": {"timestamp": "1580336834", "microtimestamp": "1580336834607546", "bids": [["9328.28", "0.05925332"], ["9327.34", "0.43120000"], ["9327.29", "0.63470860"], ["9326.59", "0.41114619"], ["9326.38", "1.06910000"], ["9323.91", "2.67930000"], ["9322.69", "0.80000000"], ["9322.57", "0.03000000"], ["9322.31", "1.36010820"], ["9319.54", "0.03090000"], ["9318.97", "0.28000000"], ["9317.61", "0.02910000"], ["9316.39", "1.08000000"], ["9316.20", "2.00000000"], ["9315.48", "1.00000000"], ["9314.72", "0.11197459"], ["9314.47", "0.32207398"], ["9312.53", "0.03961501"], ["9312.29", "1.00000000"], ["9311.78", "0.03060000"], ["9311.69", "0.32217221"], ["9310.98", "3.29000000"], ["9310.18", "0.01304192"], ["9310.13", "0.02500000"], ["9309.04", "1.00000000"], ["9309.00", "0.05000000"], ["9308.96", "0.03030000"], ["9308.91", "0.32227154"], ["9307.52", "0.32191362"], ["9307.25", "2.44280000"], ["9305.92", "3.00000000"], ["9305.62", "2.37600000"], ["9305.60", "0.21815312"], ["9305.54", "2.80000000"], ["9305.13", "0.05000000"], ["9305.02", "2.90917302"], ["9303.68", "0.02316372"], ["9303.53", "12.55000000"], ["9303.00", "0.02191430"], ["9302.94", "2.38250000"], ["9302.37", "0.01000000"], ["9301.85", "2.50000000"], ["9300.89", "0.02000000"], ["9300.40", "4.10000000"], ["9300.00", "0.33936139"], ["9298.48", "1.45200000"], ["9297.80", "0.42380000"], ["9295.44", "4.54689328"], ["9295.43", "3.20000000"], ["9295.00", "0.28669566"], ["9291.66", "14.09931321"], ["9290.13", "2.87254900"], ["9290.00", "0.67530840"], ["9285.37", "0.38033002"], ["9285.15", "5.37993528"], ["9285.00", "0.09419278"], ["9283.71", "0.15679830"], ["9280.33", "12.55000000"], ["9280.13", "3.20310000"], ["9280.00", "1.36477909"], ["9276.01", "0.00707488"], ["9275.75", "0.56974291"], ["9275.00", "5.88000000"], ["9274.00", "0.00754205"], ["9271.68", "0.01400000"], ["9271.11", "15.37188500"], ["9270.00", "0.06674325"], ["9268.79", "24.54320000"], ["9257.18", "12.55000000"], ["9256.30", "0.17876365"], ["9255.71", "13.82642967"], ["9254.79", "0.96329407"], ["9250.00", "0.78214958"], ["9245.34", "4.90200000"], ["9245.13", "0.10000000"], ["9240.00", "0.44383459"], ["9238.84", "13.16615207"], ["9234.11", "0.43317656"], ["9234.10", "12.55000000"], ["9231.28", "11.79290000"], ["9230.09", "4.15059441"], ["9227.69", "0.00791097"], ["9225.00", "0.44768346"], ["9224.49", "0.85857203"], ["9223.50", "5.61001041"], ["9216.01", "0.03222653"], ["9216.00", "0.05000000"], ["9213.54", "0.71253866"], ["9212.50", "2.86768195"], ["9211.07", "12.55000000"], ["9210.00", "0.54288817"], ["9208.00", "1.00000000"], ["9206.06", "2.62587578"], ["9205.98", "15.40000000"], ["9205.52", "0.01710603"], ["9205.37", "0.03524953"], ["9205.11", "0.15000000"], ["9205.00", "0.01534763"], ["9204.76", "7.00600000"], ["9203.00", "0.01090000"]], "asks": [["9337.10", "0.03000000"], ["9340.85", "2.67820000"], ["9340.95", "0.02900000"], ["9341.17", "1.00000000"], ["9341.41", "2.13966390"], ["9341.61", "0.20000000"], ["9341.97", "0.11199911"], ["9341.98", "3.00000000"], ["9342.26", "0.32112762"], ["9343.87", "1.00000000"], ["9344.17", "3.57250000"], ["9345.04", "0.32103450"], ["9345.41", "4.90000000"], ["9345.69", "1.03000000"], ["9345.80", "0.03000000"], ["9346.00", "0.10200000"], ["9346.69", "0.02397394"], ["9347.41", "1.00000000"], ["9347.82", "0.32094177"], ["9348.23", "0.02880000"], ["9348.62", "11.96287551"], ["9349.31", "2.44270000"], ["9349.47", "0.96000000"], ["9349.86", "4.50000000"], ["9350.37", "0.03300000"], ["9350.57", "0.34682266"], ["9350.60", "0.32085527"], ["9351.45", "0.31147923"], ["9352.31", "0.28000000"], ["9352.86", "9.80000000"], ["9353.73", "0.02360739"], ["9354.00", "0.45000000"], ["9354.12", "0.03000000"], ["9354.29", "3.82446861"], ["9356.20", "0.64000000"], ["9356.90", "0.02316372"], ["9357.30", "2.50000000"], ["9357.70", "2.38240000"], ["9358.92", "6.00000000"], ["9359.97", "0.34898075"], ["9359.98", "2.30000000"], ["9362.56", "2.37600000"], ["9365.00", "0.64000000"], ["9365.16", "1.70030306"], ["9365.27", "3.03000000"], ["9369.99", "2.47102665"], ["9370.00", "3.15688574"], ["9370.21", "2.32720000"], ["9371.78", "13.20000000"], ["9371.89", "0.96293482"], ["9375.08", "4.74762500"], ["9384.34", "1.45200000"], ["9384.49", "16.42310000"], ["9385.66", "0.34382112"], ["9388.19", "0.00268265"], ["9392.20", "0.20980000"], ["9392.40", "0.10320000"], ["9393.00", "0.20980000"], ["9395.40", "0.40000000"], ["9398.86", "24.54310000"], ["9400.00", "0.05489988"], ["9400.33", "0.00495100"], ["9400.45", "0.00484700"], ["9402.92", "17.20000000"], ["9404.18", "10.00000000"], ["9418.89", "16.38000000"], ["9419.41", "3.06700000"], ["9420.40", "12.50000000"], ["9421.11", "0.10500000"], ["9434.47", "0.03215805"], ["9434.48", "0.28285714"], ["9434.49", "15.83000000"], ["9435.13", "0.15000000"], ["9438.93", "0.00368800"], ["9439.19", "0.69343985"], ["9442.86", "0.10000000"], ["9443.96", "12.50000000"], ["9444.00", "0.06004471"], ["9444.97", "0.01494896"], ["9447.00", "0.01234000"], ["9448.97", "0.14500000"], ["9449.00", "0.05000000"], ["9450.00", "11.13426018"], ["9451.87", "15.90000000"], ["9452.00", "0.20000000"], ["9454.25", "0.01100000"], ["9454.51", "0.02409062"], ["9455.05", "0.00600063"], ["9456.00", "0.27965118"], ["9456.10", "0.17000000"], ["9459.00", "0.00320000"], ["9459.98", "0.02460685"], ["9459.99", "8.11000000"], ["9460.00", "0.08500000"], ["9464.36", "0.56957951"], ["9464.54", "0.69158059"], ["9465.00", "21.00002015"], ["9467.57", "12.50000000"], ["9468.00", "0.08800000"], ["9469.09", "13.94000000"]]}, "event": "data", "channel": ""}`)
err = b.wsHandleData(pressXToJSON)
require.Equal(t, errWSPairParsingError, err, "TestWsOrderbook must error parsing error")
require.Equal(t, errParsingWSPair, err, "TestWsOrderbook must error parsing error")
}

func TestWsOrderbook2(t *testing.T) {
Expand Down Expand Up @@ -1062,3 +1062,23 @@ func TestGenerateSubscriptions(t *testing.T) {
"should panic on invalid channel",
)
}

func TestSubscribe(t *testing.T) {
t.Parallel()
b := new(Bitstamp)
require.NoError(t, testexch.Setup(b), "Test instance Setup must not error")
subs, err := b.Features.Subscriptions.ExpandTemplates(b)
require.NoError(t, err, "ExpandTemplates must not error")
b.Features.Subscriptions = subscription.List{}
testexch.SetupWs(t, b)
err = b.Subscribe(subs)
require.NoError(t, err, "Subscribe must not error")
for _, s := range subs {
assert.Equalf(t, subscription.SubscribedState, s.State(), "Subscription %s should be subscribed", s)
}
err = b.Unsubscribe(subs)
require.NoError(t, err, "UnSubscribe must not error")
for _, s := range subs {
assert.Equalf(t, subscription.UnsubscribedState, s.State(), "Subscription %s should be subscribed", s)
}
}
10 changes: 2 additions & 8 deletions exchanges/bitstamp/bitstamp_types.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package bitstamp

import (
"errors"

"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/types"
)
Expand All @@ -21,8 +19,6 @@ const (
SellOrder
)

var errWSPairParsingError = errors.New("unable to parse currency pair from wsResponse.Channel")

// Ticker holds ticker information
type Ticker struct {
Last float64 `json:"last,string"`
Expand Down Expand Up @@ -220,10 +216,8 @@ type websocketData struct {
}

type websocketResponse struct {
Event string `json:"event"`
Channel string `json:"channel"`
channelType string
pair currency.Pair
Event string `json:"event"`
Channel string `json:"channel"`
}

type websocketTradeResponse struct {
Expand Down
154 changes: 79 additions & 75 deletions exchanges/bitstamp/bitstamp_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"text/template"
"time"

"github.com/buger/jsonparser"
"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/currency"
Expand All @@ -32,6 +33,11 @@ const (
)

var (
errParsingWSField = errors.New("error parsing WS field")
errParsingWSPair = errors.New("unable to parse currency pair from wsResponse.Channel")
errChannelHyphens = errors.New("channel name does not contain exactly 0 or 2 hyphens")
errChannelUnderscores = errors.New("channel name does not contain exactly 2 underscores")

hbMsg = []byte(`{"event":"bts:heartbeat"}`)
)

Expand Down Expand Up @@ -94,78 +100,55 @@ func (b *Bitstamp) wsReadData() {
}

func (b *Bitstamp) wsHandleData(respRaw []byte) error {
wsResponse := &websocketResponse{}
if err := json.Unmarshal(respRaw, wsResponse); err != nil {
return err
}

if err := b.parseChannelName(wsResponse); err != nil {
return err
event, err := jsonparser.GetUnsafeString(respRaw, "event")
if err != nil {
return fmt.Errorf("%w `event`: %w", errParsingWSField, err)
}

switch wsResponse.Event {
case "bts:heartbeat":
event = strings.TrimPrefix(event, "bts:")
switch event {
case "heartbeat":
return nil
case "bts:subscribe", "bts:subscription_succeeded":
if b.Verbose {
log.Debugf(log.ExchangeSys, "%v - Websocket subscription acknowledgement", b.Name)
}
case "bts:unsubscribe":
if b.Verbose {
log.Debugf(log.ExchangeSys, "%v - Websocket unsubscribe acknowledgement", b.Name)
}
case "bts:request_reconnect":
if b.Verbose {
log.Debugf(log.ExchangeSys, "%v - Websocket reconnection request received", b.Name)
}
go func() {
err := b.Websocket.Shutdown()
if err != nil {
log.Errorf(log.WebsocketMgr, "%s failed to shutdown websocket: %v", b.Name, err)
}
}() // Connection monitor will reconnect
case "subscription_succeeded", "unsubscription_succeeded":
return b.handleWSSubscription(event, respRaw)
case "data":
if err := b.handleWSOrderbook(wsResponse, respRaw); err != nil {
return err
}
return b.handleWSOrderbook(respRaw)
case "trade":
if err := b.handleWSTrade(wsResponse, respRaw); err != nil {
return err
}
return b.handleWSTrade(respRaw)
case "order_created", "order_deleted", "order_changed":
// Only process MyOrders, not orders from the LiveOrder channel
if wsResponse.channelType == bitstampAPIWSMyOrders {
if err := b.handleWSOrder(wsResponse, respRaw); err != nil {
return err
return b.handleWSOrder(event, respRaw)
case "request_reconnect":
go func() {
if err := b.Websocket.Shutdown(); err != nil { // Connection monitor will reconnect
log.Errorf(log.WebsocketMgr, "%s failed to shutdown websocket: %v", b.Name, err)
}
}
}()
default:
b.Websocket.DataHandler <- stream.UnhandledMessageWarning{Message: b.Name + stream.UnhandledMessage + string(respRaw)}
}
return nil
}

func (b *Bitstamp) handleWSOrderbook(wsResp *websocketResponse, msg []byte) error {
if wsResp.pair.IsEmpty() {
return errWSPairParsingError
}

wsOrderBookTemp := websocketOrderBookResponse{}
err := json.Unmarshal(msg, &wsOrderBookTemp)
func (b *Bitstamp) handleWSSubscription(event string, respRaw []byte) error {
channel, err := jsonparser.GetUnsafeString(respRaw, "channel")
if err != nil {
return err
return fmt.Errorf("%w `channel`: %w", errParsingWSField, err)
}

return b.wsUpdateOrderbook(&wsOrderBookTemp.Data, wsResp.pair, asset.Spot)
event = strings.TrimSuffix(event, "scription_succeeded")
if !b.Websocket.Match.IncomingWithData(event+":"+channel, respRaw) {
return fmt.Errorf("%w: %s", stream.ErrNoMessageListener, event+":"+channel)
}
return nil
}

func (b *Bitstamp) handleWSTrade(wsResp *websocketResponse, msg []byte) error {
func (b *Bitstamp) handleWSTrade(msg []byte) error {
if !b.IsSaveTradeDataEnabled() {
return nil
}

if wsResp.pair.IsEmpty() {
return errWSPairParsingError
_, p, err := b.parseChannelName(msg)
if err != nil {
return err
}

wsTradeTemp := websocketTradeResponse{}
Expand All @@ -179,7 +162,7 @@ func (b *Bitstamp) handleWSTrade(wsResp *websocketResponse, msg []byte) error {
}
return trade.AddTradesToBuffer(b.Name, trade.Data{
Timestamp: time.Unix(wsTradeTemp.Data.Timestamp, 0),
CurrencyPair: wsResp.pair,
CurrencyPair: p,
AssetType: asset.Spot,
Exchange: b.Name,
Price: wsTradeTemp.Data.Price,
Expand All @@ -189,7 +172,15 @@ func (b *Bitstamp) handleWSTrade(wsResp *websocketResponse, msg []byte) error {
})
}

func (b *Bitstamp) handleWSOrder(wsResp *websocketResponse, msg []byte) error {
func (b *Bitstamp) handleWSOrder(event string, msg []byte) error {
channel, p, err := b.parseChannelName(msg)
if err != nil {
return err
}
if channel != bitstampAPIWSMyOrders {
return nil // Only process MyOrders, not orders from the LiveOrder channel
}

r := &websocketOrderResponse{}
if err := json.Unmarshal(msg, &r); err != nil {
return err
Expand All @@ -200,7 +191,7 @@ func (b *Bitstamp) handleWSOrder(wsResp *websocketResponse, msg []byte) error {
}

var status order.Status
switch wsResp.Event {
switch event {
case "order_created":
status = order.New
case "order_changed":
Expand Down Expand Up @@ -230,7 +221,7 @@ func (b *Bitstamp) handleWSOrder(wsResp *websocketResponse, msg []byte) error {
Status: status,
AssetType: asset.Spot,
Date: r.Order.Microtimestamp.Time(),
Pair: wsResp.pair,
Pair: p,
}

b.Websocket.DataHandler <- d
Expand Down Expand Up @@ -273,7 +264,7 @@ func (b *Bitstamp) subscribe(subs subscription.List, creds *WebsocketAuthRespons
req.Data.Channel = "private-" + req.Data.Channel + "-" + strconv.Itoa(int(creds.UserID))
req.Data.Auth = creds.Token
}
err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, req)
_, err := b.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, "sub:"+req.Data.Channel, req)
if err == nil {
err = b.Websocket.AddSuccessfulSubscriptions(b.Websocket.Conn, s)
}
Expand All @@ -299,7 +290,7 @@ func (b *Bitstamp) unsubscribe(subs subscription.List) error {
Channel: s.QualifiedChannel,
},
}
err := b.Websocket.Conn.SendJSONMessage(context.TODO(), request.Unset, req)
_, err := b.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Unset, "unsub:"+req.Data.Channel, req)
if err == nil {
err = b.Websocket.RemoveSubscriptions(b.Websocket.Conn, s)
}
Expand All @@ -310,7 +301,18 @@ func (b *Bitstamp) unsubscribe(subs subscription.List) error {
return errs
}

func (b *Bitstamp) wsUpdateOrderbook(update *websocketOrderBook, p currency.Pair, assetType asset.Item) error {
func (b *Bitstamp) handleWSOrderbook(msg []byte) error {
_, p, err := b.parseChannelName(msg)
if err != nil {
return err
}

wsOrderBookResp := websocketOrderBookResponse{}
if err := json.Unmarshal(msg, &wsOrderBookResp); err != nil {
return err
}
update := &wsOrderBookResp.Data

if len(update.Asks) == 0 && len(update.Bids) == 0 {
return errors.New("no orderbook data")
}
Expand All @@ -320,7 +322,7 @@ func (b *Bitstamp) wsUpdateOrderbook(update *websocketOrderBook, p currency.Pair
Asks: make(orderbook.Tranches, len(update.Asks)),
Pair: p,
LastUpdated: time.UnixMicro(update.Microtimestamp),
Asset: assetType,
Asset: asset.Spot,
Exchange: b.Name,
VerifyOrderbook: b.CanVerifyOrderbook,
}
Expand Down Expand Up @@ -411,37 +413,39 @@ func (b *Bitstamp) FetchWSAuth(ctx context.Context) (*WebsocketAuthResponse, err
return resp, nil
}

// parseChannel splits the ws response channel and sets the channel type and pair
func (b *Bitstamp) parseChannelName(r *websocketResponse) error {
if r.Channel == "" {
return nil
// parseChannelName splits the ws message channel and returns the channel name and pair
func (b *Bitstamp) parseChannelName(respRaw []byte) (string, currency.Pair, error) {
channel, err := jsonparser.GetUnsafeString(respRaw, "channel")
if err != nil {
return "", currency.EMPTYPAIR, fmt.Errorf("%w `channel`: %w", errParsingWSField, err)
}

chanName := r.Channel
authParts := strings.Split(r.Channel, "-")
authParts := strings.Split(channel, "-")
switch len(authParts) {
case 1:
// Not an auth channel
case 3:
chanName = authParts[1]
channel = authParts[1]
default:
return fmt.Errorf("channel name does not contain exactly 0 or 2 hyphens: %v", r.Channel)
return "", currency.EMPTYPAIR, fmt.Errorf("%w: %s", errChannelHyphens, channel)
}

parts := strings.Split(chanName, "_")
parts := strings.Split(channel, "_")
if len(parts) != 3 {
return fmt.Errorf("%w: channel name does not contain exactly 2 underscores: %v", errWSPairParsingError, r.Channel)
return "", currency.EMPTYPAIR, fmt.Errorf("%w: %s", errChannelUnderscores, channel)
}

r.channelType = parts[0] + "_" + parts[1]
symbol := parts[2]

enabledPairs, err := b.GetEnabledPairs(asset.Spot)
if err == nil {
r.pair, err = enabledPairs.DeriveFrom(symbol)
if err != nil {
return "", currency.EMPTYPAIR, err
}

pair, err := enabledPairs.DeriveFrom(parts[2])
if err != nil {
return "", currency.EMPTYPAIR, fmt.Errorf("%w: %s", errParsingWSPair, err)
}

return err
return parts[0] + "_" + parts[1], pair, nil
}

// channelName converts global channel Names to exchange specific ones
Expand Down
2 changes: 1 addition & 1 deletion testdata/configtest.json
Original file line number Diff line number Diff line change
Expand Up @@ -993,7 +993,7 @@
},
"enabled": {
"autoPairUpdates": true,
"websocketAPI": false
"websocketAPI": true
}
},
"bankAccounts": [
Expand Down

0 comments on commit b189cd2

Please sign in to comment.