diff --git a/exchanges/bitget/bitget.go b/exchanges/bitget/bitget.go index 6fb60f0b6f4..e580e14aa21 100644 --- a/exchanges/bitget/bitget.go +++ b/exchanges/bitget/bitget.go @@ -203,11 +203,11 @@ const ( // Websocket endpoints // Unauthenticated - bitgetTickerChannel = "ticker" bitgetCandleDailyChannel = "candle1D" // There's one of these for each time period, but we'll ignore those for now + bitgetBookFullChannel = "books" // There's more of these for varying orderbook depths, ignored for now // Authenticated - bitgetAccountChannel = "account" + bitgetFillChannel = "fill" errIntervalNotSupported = "interval not supported" errAuthenticatedWebsocketDisabled = "%v AuthenticatedWebsocketAPISupport not enabled" @@ -283,6 +283,7 @@ var ( errStrategyMutex = errors.New("only one of immediate or cancel, fill or kill, and post only can be set to true") errOrderNotFound = errors.New("order not found") errReturnEmpty = errors.New("returned data unexpectedly empty") + errInvalidChecksum = errors.New("invalid checksum") prodTypes = []string{"USDT-FUTURES", "COIN-FUTURES", "USDC-FUTURES"} planTypes = []string{"normal_plan", "track_plan", "profit_loss"} diff --git a/exchanges/bitget/bitget_types.go b/exchanges/bitget/bitget_types.go index edb830bf938..192ab91bac5 100644 --- a/exchanges/bitget/bitget_types.go +++ b/exchanges/bitget/bitget_types.go @@ -2506,3 +2506,20 @@ type WsAccountResponse struct { LimitAvailable float64 `json:"limitAvailable,string"` UpdateTime UnixTimestamp `json:"uTime"` } + +// WsTradeResponse contains information on a trade response +type WsTradeResponse struct { + Timestamp UnixTimestamp `json:"ts"` + Price float64 `json:"price,string"` + Size float64 `json:"size,string"` + Side string `json:"side"` + TradeID int64 `json:"tradeId,string"` +} + +// WsOrderBookResponse contains information on an order book response +type WsOrderBookResponse struct { + Asks [][2]string `json:"asks"` + Bids [][2]string `json:"bids"` + Timestamp UnixTimestamp `json:"ts"` + Checksum int32 `json:"checksum"` +} diff --git a/exchanges/bitget/bitget_websocket.go b/exchanges/bitget/bitget_websocket.go index 0f0fdb8a535..6943f997891 100644 --- a/exchanges/bitget/bitget_websocket.go +++ b/exchanges/bitget/bitget_websocket.go @@ -4,6 +4,8 @@ import ( "context" "encoding/json" "fmt" + "hash/crc32" + "html/template" "net/http" "strconv" "strings" @@ -14,9 +16,11 @@ import ( "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/account" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" + "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" + "github.com/thrasher-corp/gocryptotrader/exchanges/trade" "github.com/thrasher-corp/gocryptotrader/log" ) @@ -25,6 +29,24 @@ const ( bitgetPrivateWSURL = "wss://ws.bitget.com/v2/ws/private" ) +var subscriptionNames = map[string]string{ + subscription.TickerChannel: bitgetTicker, + subscription.CandlesChannel: bitgetCandleDailyChannel, + subscription.AllOrdersChannel: bitgetTrade, + subscription.OrderbookChannel: bitgetBookFullChannel, + "account": bitgetAccount, + subscription.AllTradesChannel: bitgetFillChannel, +} + +var defaultSubscriptions = subscription.List{ + {Enabled: false, Channel: subscription.TickerChannel, Asset: asset.Spot}, + {Enabled: false, Channel: subscription.CandlesChannel, Asset: asset.Spot}, + {Enabled: false, Channel: subscription.AllOrdersChannel, Asset: asset.Spot}, + {Enabled: false, Channel: subscription.OrderbookChannel, Asset: asset.Spot}, + {Enabled: false, Channel: "account", Authenticated: true, Asset: asset.Spot}, + {Enabled: true, Channel: subscription.AllTradesChannel, Authenticated: true, Asset: asset.Spot}, +} + // WsConnect connects to a websocket feed func (bi *Bitget) WsConnect() error { if !bi.Websocket.IsEnabled() || !bi.IsEnabled() { @@ -57,6 +79,7 @@ func (bi *Bitget) WsConnect() error { return nil } +// WsAuth sends an authentication message to the websocket func (bi *Bitget) WsAuth(ctx context.Context, dialer *websocket.Dialer) error { if !bi.Websocket.CanUseAuthenticatedEndpoints() { return fmt.Errorf(errAuthenticatedWebsocketDisabled, bi.Name) @@ -117,6 +140,7 @@ func (bi *Bitget) wsReadData(ws stream.Connection) { } } +// wsHandleData handles data from the websocket connection func (bi *Bitget) wsHandleData(respRaw []byte) error { var wsResponse WsResponse if respRaw != nil && string(respRaw[:4]) == "pong" { @@ -149,7 +173,7 @@ func (bi *Bitget) wsHandleData(respRaw []byte) error { } case "snapshot": switch wsResponse.Arg.Channel { - case bitgetTickerChannel: + case bitgetTicker: var ticks []WsTickerSnapshot err := json.Unmarshal(wsResponse.Data, &ticks) if err != nil { @@ -227,12 +251,38 @@ func (bi *Bitget) wsHandleData(respRaw []byte) error { } } bi.Websocket.DataHandler <- resp - case bitgetAccountChannel: - var account []WsAccountResponse - err := json.Unmarshal(wsResponse.Data, &account) + case bitgetTrade: + resp, err := bi.tradeDataHandler(wsResponse) + if err != nil { + return err + } + bi.Websocket.DataHandler <- resp + case bitgetBookFullChannel: + err := bi.orderbookDataHandler(wsResponse) + if err != nil { + return err + } + case bitgetAccount: + var acc []WsAccountResponse + err := json.Unmarshal(wsResponse.Data, &acc) if err != nil { return err } + var hold account.Holdings + hold.Exchange = bi.Name + var sub account.SubAccount + sub.AssetType = asset.Spot + sub.Currencies = make([]account.Balance, len(acc)) + for i := range acc { + sub.Currencies[i] = account.Balance{ + Currency: currency.NewCode(acc[i].Coin), + Hold: acc[i].Frozen + acc[i].Locked, + Free: acc[i].Available, + Total: sub.Currencies[i].Hold + sub.Currencies[i].Free, + } + } + // Plan to add handling of account.Holdings on websocketDataHandler side in a later PR + bi.Websocket.DataHandler <- hold default: bi.Websocket.DataHandler <- stream.UnhandledMessageWarning{Message: bi.Name + stream.UnhandledMessage + string(respRaw)} @@ -291,7 +341,18 @@ func (bi *Bitget) wsHandleData(respRaw []byte) error { } } bi.Websocket.DataHandler <- resp - case bitgetAccountChannel: + case bitgetTrade: + resp, err := bi.tradeDataHandler(wsResponse) + if err != nil { + return err + } + bi.Websocket.DataHandler <- resp + case bitgetBookFullChannel: + err := bi.orderbookDataHandler(wsResponse) + if err != nil { + return err + } + case bitgetAccount: var acc []WsAccountResponse err := json.Unmarshal(wsResponse.Data, &acc) if err != nil { @@ -318,22 +379,146 @@ func (bi *Bitget) wsHandleData(respRaw []byte) error { return nil } +// TradeDataHandler handles trade data, as functionality is shared between updates and snapshots +func (bi *Bitget) tradeDataHandler(wsResponse WsResponse) ([]trade.Data, error) { + var trades []WsTradeResponse + pair, err := pairFromStringHelper(wsResponse.Arg.InstrumentID) + if err != nil { + return nil, err + } + err = json.Unmarshal(wsResponse.Data, &trades) + if err != nil { + return nil, err + } + resp := make([]trade.Data, len(trades)) + for i := range trades { + resp[i] = trade.Data{ + Timestamp: trades[i].Timestamp.Time(), + CurrencyPair: pair, + AssetType: asset.Spot, + Exchange: bi.Name, + Price: trades[i].Price, + Amount: trades[i].Size, + Side: sideDecoder(trades[i].Side), + TID: strconv.FormatInt(trades[i].TradeID, 10), + } + } + return resp, nil +} + +// OrderbookDataHandler handles orderbook data, as functionality is shared between updates and snapshots +func (bi *Bitget) orderbookDataHandler(wsResponse WsResponse) error { + var ob []WsOrderBookResponse + pair, err := pairFromStringHelper(wsResponse.Arg.InstrumentID) + if err != nil { + return err + } + err = json.Unmarshal(wsResponse.Data, &ob) + if err != nil { + return err + } + if len(ob) == 0 { + return errReturnEmpty + } + bids, err := trancheConstructor(ob[0].Bids) + if err != nil { + return err + } + asks, err := trancheConstructor(ob[0].Asks) + if err != nil { + return err + } + if wsResponse.Action[0] == 's' { + orderbook := orderbook.Base{ + Pair: pair, + Asset: asset.Spot, + Bids: bids, + Asks: asks, + LastUpdated: wsResponse.Timestamp.Time(), + Exchange: bi.Name, + VerifyOrderbook: bi.CanVerifyOrderbook, + ChecksumStringRequired: true, + } + err = bi.Websocket.Orderbook.LoadSnapshot(&orderbook) + if err != nil { + return err + } + } else { + update := orderbook.Update{ + Bids: bids, + Asks: asks, + Pair: pair, + UpdateTime: wsResponse.Timestamp.Time(), + Asset: asset.Spot, + Checksum: uint32(ob[0].Checksum), + } + err = bi.Websocket.Orderbook.Update(&update) + if err != nil { + return err + } + } + return nil +} + +// TrancheConstructor turns the exchange's orderbook data into a standardised format for the engine +func trancheConstructor(data [][2]string) ([]orderbook.Tranche, error) { + resp := make([]orderbook.Tranche, len(data)) + var err error + for i := range data { + resp[i] = orderbook.Tranche{ + StrPrice: data[i][0], + StrAmount: data[i][1], + } + resp[i].Price, err = strconv.ParseFloat(data[i][0], 64) + if err != nil { + return nil, err + } + resp[i].Amount, err = strconv.ParseFloat(data[i][1], 64) + if err != nil { + return nil, err + } + } + return resp, nil +} + +func (bi *Bitget) CalculateUpdateOrderbookChecksum(orderbookData *orderbook.Base, checksumVal uint32) error { + var builder strings.Builder + for i := 0; i < 25; i++ { + if len(orderbookData.Bids) > i { + builder.WriteString(orderbookData.Bids[i].StrPrice + ":" + orderbookData.Bids[i].StrAmount + ":") + } + if len(orderbookData.Asks) > i { + builder.WriteString(orderbookData.Asks[i].StrPrice + ":" + orderbookData.Asks[i].StrAmount + ":") + } + } + check := builder.String() + if check != "" { + check = check[:len(check)-1] + } + if crc32.ChecksumIEEE([]byte(check)) != checksumVal { + return errInvalidChecksum + } + return nil +} + +// GenerateDefaultSubscriptions generates default subscriptions func (bi *Bitget) generateDefaultSubscriptions() (subscription.List, error) { - channels := []string{bitgetAccountChannel} - // channels := []string{bitgetTickerChannel} enabledPairs, err := bi.GetEnabledPairs(asset.Spot) if err != nil { return nil, err } - var subscriptions subscription.List - for i := range channels { - subscriptions = append(subscriptions, &subscription.Subscription{ - Channel: channels[i], - Pairs: enabledPairs, - Asset: asset.Spot, - }) + subs := make(subscription.List, 0, len(defaultSubscriptions)) + for _, sub := range defaultSubscriptions { + if sub.Enabled { + subs = append(subs, sub) + } } - return subscriptions, nil + subs = subs[:len(subs):len(subs)] + for i := range subs { + subs[i].Pairs = enabledPairs + subs[i].Channel = subscriptionNames[subs[i].Channel] + } + return subs, nil } // Subscribe sends a websocket message to receive data from the channel @@ -384,16 +569,17 @@ func (bi *Bitget) websocketMessage(subs subscription.List, op string) error { } for _, s := range subs { switch s.Channel { - case bitgetTickerChannel, bitgetCandleDailyChannel: - bi.reqBuilder(unauthBase, s) - case bitgetAccountChannel: + case bitgetAccount, bitgetFillChannel: authBase.Arguments = append(authBase.Arguments, WsArgument{ Channel: s.Channel, InstrumentType: strings.ToUpper(s.Asset.String()), Coin: "default", }) - default: + } + if s.Authenticated { bi.reqBuilder(authBase, s) + } else { + bi.reqBuilder(unauthBase, s) } } unauthReq := reqSplitter(unauthBase) @@ -417,4 +603,22 @@ func (bi *Bitget) websocketMessage(subs subscription.List, op string) error { return nil } -// SendJSONMessage sends a JSON message to the connected websocket +// GetSubscriptionTemplate returns a subscription channel template +func (bi *Bitget) GetSubscriptionTemplate(_ *subscription.Subscription) (*template.Template, error) { + return template.New("master.tmpl").Funcs(template.FuncMap{"channelName": channelName}).Parse(subTplText) +} + +func channelName(s *subscription.Subscription) string { + if n, ok := subscriptionNames[s.Channel]; ok { + return n + } + // Replace error with subscription.ErrNotSupported after merge + panic(fmt.Errorf("error not supported: %s", s.Channel)) +} + +const subTplText = ` +{{ range $asset, $pairs := $.AssetPairs }} + {{- channelName $.S -}} + {{- $.AssetSeparator }} +{{- end }} +` diff --git a/exchanges/bitget/bitget_wrapper.go b/exchanges/bitget/bitget_wrapper.go index fd332ec05ac..c6bdd28cf7d 100644 --- a/exchanges/bitget/bitget_wrapper.go +++ b/exchanges/bitget/bitget_wrapper.go @@ -28,6 +28,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/protocol" "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" + "github.com/thrasher-corp/gocryptotrader/exchanges/stream/buffer" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/trade" "github.com/thrasher-corp/gocryptotrader/log" @@ -164,6 +165,9 @@ func (bi *Bitget) Setup(exch *config.Exchange) error { Unsubscriber: bi.Unsubscribe, GenerateSubscriptions: bi.generateDefaultSubscriptions, Features: &bi.Features.Supports.WebsocketCapabilities, + OrderbookBufferConfig: buffer.Config{ + Checksum: bi.CalculateUpdateOrderbookChecksum, + }, }) if err != nil { return err