From d6f20e983dbbdba4ea62bc8064df30cd4b0e0b16 Mon Sep 17 00:00:00 2001 From: Gareth Kirwan Date: Thu, 10 Aug 2023 13:40:30 +0700 Subject: [PATCH] Bitfinex: Refactor wsUpdate handling This commit doesn't break out all the sub-updater, but attempts to do something about the unmanagable size of ws update handling --- exchanges/bitfinex/bitfinex_websocket.go | 1178 ++++++++++++---------- 1 file changed, 621 insertions(+), 557 deletions(-) diff --git a/exchanges/bitfinex/bitfinex_websocket.go b/exchanges/bitfinex/bitfinex_websocket.go index d33fc757fdf..3876eb44547 100644 --- a/exchanges/bitfinex/bitfinex_websocket.go +++ b/exchanges/bitfinex/bitfinex_websocket.go @@ -223,443 +223,20 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error { } } - c, ok := b.WebsocketSubdChannels[chanID] - if !ok && chanID != 0 { - return fmt.Errorf("unable to locate chanID: %d", - chanID) - } - - switch c.Channel { - case wsBook: - var newOrderbook []WebsocketBook - obSnapBundle, ok := d[1].([]interface{}) - if !ok { - return errors.New("orderbook interface cast failed") - } - if len(obSnapBundle) == 0 { - return errors.New("no data within orderbook snapshot") - } - - sequenceNo, ok := d[2].(float64) - if !ok { - return errors.New("type assertion failure") - } - - var fundingRate bool - switch id := obSnapBundle[0].(type) { - case []interface{}: - for i := range obSnapBundle { - data, ok := obSnapBundle[i].([]interface{}) - if !ok { - return errors.New("type assertion failed for orderbok item data") - } - id, okAssert := data[0].(float64) - if !okAssert { - return errors.New("type assertion failed for orderbook id data") - } - pricePeriod, okAssert := data[1].(float64) - if !okAssert { - return errors.New("type assertion failed for orderbook price data") - } - rateAmount, okAssert := data[2].(float64) - if !okAssert { - return errors.New("type assertion failed for orderbook rate data") - } - if len(data) == 4 { - fundingRate = true - amount, okFunding := data[3].(float64) - if !okFunding { - return errors.New("type assertion failed for orderbook funding data") - } - newOrderbook = append(newOrderbook, WebsocketBook{ - ID: int64(id), - Period: int64(pricePeriod), - Price: rateAmount, - Amount: amount}) - } else { - newOrderbook = append(newOrderbook, WebsocketBook{ - ID: int64(id), - Price: pricePeriod, - Amount: rateAmount}) - } - } - if err := b.WsInsertSnapshot(c.Currency, c.Asset, newOrderbook, fundingRate); err != nil { - return fmt.Errorf("inserting snapshot error: %s", - err) - } - case float64: - pricePeriod, okSnap := obSnapBundle[1].(float64) - if !okSnap { - return errors.New("type assertion failed for orderbook price snapshot data") - } - amountRate, okSnap := obSnapBundle[2].(float64) - if !okSnap { - return errors.New("type assertion failed for orderbook amount snapshot data") - } - if len(obSnapBundle) == 4 { - fundingRate = true - var amount float64 - amount, okSnap = obSnapBundle[3].(float64) - if !okSnap { - return errors.New("type assertion failed for orderbook amount snapshot data") - } - newOrderbook = append(newOrderbook, WebsocketBook{ - ID: int64(id), - Period: int64(pricePeriod), - Price: amountRate, - Amount: amount}) - } else { - newOrderbook = append(newOrderbook, WebsocketBook{ - ID: int64(id), - Price: pricePeriod, - Amount: amountRate}) - } - - if err := b.WsUpdateOrderbook(c.Currency, c.Asset, newOrderbook, chanID, int64(sequenceNo), fundingRate); err != nil { - return fmt.Errorf("updating orderbook error: %s", - err) - } - } - - return nil - case wsCandles: - if candleBundle, ok := d[1].([]interface{}); ok { - if len(candleBundle) == 0 { - return nil - } - - switch candleData := candleBundle[0].(type) { - case []interface{}: - for i := range candleBundle { - var element []interface{} - element, ok = candleBundle[i].([]interface{}) - if !ok { - return errors.New("candle type assertion for element data") - } - if len(element) < 6 { - return errors.New("invalid candleBundle length") - } - var err error - var klineData stream.KlineData - if klineData.Timestamp, err = convert.TimeFromUnixTimestampFloat(element[0]); err != nil { - return fmt.Errorf("unable to convert candle timestamp: %w", err) - } - if klineData.OpenPrice, ok = element[1].(float64); !ok { - return errors.New("unable to type assert candle open price") - } - if klineData.ClosePrice, ok = element[2].(float64); !ok { - return errors.New("unable to type assert candle close price") - } - if klineData.HighPrice, ok = element[3].(float64); !ok { - return errors.New("unable to type assert candle high price") - } - if klineData.LowPrice, ok = element[4].(float64); !ok { - return errors.New("unable to type assert candle low price") - } - if klineData.Volume, ok = element[5].(float64); !ok { - return errors.New("unable to type assert candle volume") - } - klineData.Exchange = b.Name - klineData.AssetType = c.Asset - klineData.Pair = c.Currency - b.Websocket.DataHandler <- klineData - } - case float64: - if len(candleBundle) < 6 { - return errors.New("invalid candleBundle length") - } - var err error - var klineData stream.KlineData - if klineData.Timestamp, err = convert.TimeFromUnixTimestampFloat(candleData); err != nil { - return fmt.Errorf("unable to convert candle timestamp: %w", err) - } - if klineData.OpenPrice, ok = candleBundle[1].(float64); !ok { - return errors.New("unable to type assert candle open price") - } - if klineData.ClosePrice, ok = candleBundle[2].(float64); !ok { - return errors.New("unable to type assert candle close price") - } - if klineData.HighPrice, ok = candleBundle[3].(float64); !ok { - return errors.New("unable to type assert candle high price") - } - if klineData.LowPrice, ok = candleBundle[4].(float64); !ok { - return errors.New("unable to type assert candle low price") - } - if klineData.Volume, ok = candleBundle[5].(float64); !ok { - return errors.New("unable to type assert candle volume") - } - klineData.Exchange = b.Name - klineData.AssetType = c.Asset - klineData.Pair = c.Currency - b.Websocket.DataHandler <- klineData - } - } - return nil - case wsTicker: - tickerData, ok := d[1].([]interface{}) - if !ok { - return errors.New("type assertion for tickerData") - } - - t := &ticker.Price{ - AssetType: c.Asset, - Pair: c.Currency, - ExchangeName: b.Name, - } - - if len(tickerData) == 10 { - if t.Bid, ok = tickerData[0].(float64); !ok { - return errors.New("unable to type assert ticker bid") - } - if t.Ask, ok = tickerData[2].(float64); !ok { - return errors.New("unable to type assert ticker ask") - } - if t.Last, ok = tickerData[6].(float64); !ok { - return errors.New("unable to type assert ticker last") - } - if t.Volume, ok = tickerData[7].(float64); !ok { - return errors.New("unable to type assert ticker volume") - } - if t.High, ok = tickerData[8].(float64); !ok { - return errors.New("unable to type assert ticker high") - } - if t.Low, ok = tickerData[9].(float64); !ok { - return errors.New("unable to type assert ticker low") - } + if chanID != 0 { + if c, ok := b.WebsocketSubdChannels[chanID]; ok { + return b.handleWSChannelUpdate(c, d) } else { - if t.FlashReturnRate, ok = tickerData[0].(float64); !ok { - return errors.New("unable to type assert ticker flash return rate") - } - if t.Bid, ok = tickerData[1].(float64); !ok { - return errors.New("unable to type assert ticker bid") - } - if t.BidPeriod, ok = tickerData[2].(float64); !ok { - return errors.New("unable to type assert ticker bid period") - } - if t.BidSize, ok = tickerData[3].(float64); !ok { - return errors.New("unable to type assert ticker bid size") - } - if t.Ask, ok = tickerData[4].(float64); !ok { - return errors.New("unable to type assert ticker ask") - } - if t.AskPeriod, ok = tickerData[5].(float64); !ok { - return errors.New("unable to type assert ticker ask period") - } - if t.AskSize, ok = tickerData[6].(float64); !ok { - return errors.New("unable to type assert ticker ask size") - } - if t.Last, ok = tickerData[9].(float64); !ok { - return errors.New("unable to type assert ticker last") - } - if t.Volume, ok = tickerData[10].(float64); !ok { - return errors.New("unable to type assert ticker volume") - } - if t.High, ok = tickerData[11].(float64); !ok { - return errors.New("unable to type assert ticker high") - } - if t.Low, ok = tickerData[12].(float64); !ok { - return errors.New("unable to type assert ticker low") - } - if t.FlashReturnRateAmount, ok = tickerData[15].(float64); !ok { - return errors.New("unable to type assert ticker flash return rate") - } - } - b.Websocket.DataHandler <- t - return nil - case wsTrades: - if !b.IsSaveTradeDataEnabled() { - return nil - } - if c.Asset == asset.MarginFunding { - return nil - } - var tradeHolder []WebsocketTrade - switch len(d) { - case 2: - snapshot, ok := d[1].([]interface{}) - if !ok { - return errors.New("unable to type assert trade snapshot data") - } - for i := range snapshot { - elem, ok := snapshot[i].([]interface{}) - if !ok { - return errors.New("unable to type assert trade snapshot element data") - } - tradeID, ok := elem[0].(float64) - if !ok { - return errors.New("unable to type assert trade ID") - } - timestamp, ok := elem[1].(float64) - if !ok { - return errors.New("unable to type assert trade timestamp") - } - amount, ok := elem[2].(float64) - if !ok { - return errors.New("unable to type assert trade amount") - } - wsTrade := WebsocketTrade{ - ID: int64(tradeID), - Timestamp: int64(timestamp), - Amount: amount, - } - if len(elem) == 5 { - rate, ok := elem[3].(float64) - if !ok { - return errors.New("unable to type assert trade rate") - } - wsTrade.Rate = rate - period, ok := elem[4].(float64) - if !ok { - return errors.New("unable to type assert trade period") - } - wsTrade.Period = int64(period) - } else { - price, ok := elem[3].(float64) - if !ok { - return errors.New("unable to type assert trade price") - } - wsTrade.Rate = price - } - tradeHolder = append(tradeHolder, wsTrade) - } - case 3: - event, ok := d[1].(string) - if !ok { - return errors.New("unable to type assert data event") - } - if event != wsFundingTradeUpdate && - event != wsTradeExecutionUpdate { - return nil - } - data, ok := d[2].([]interface{}) - if !ok { - return errors.New("trade data type assertion error") - } - - tradeID, ok := data[0].(float64) - if !ok { - return errors.New("unable to type assert trade ID") - } - timestamp, ok := data[1].(float64) - if !ok { - return errors.New("unable to type assert trade timestamp") - } - amount, ok := data[2].(float64) - if !ok { - return errors.New("unable to type assert trade amount") - } - wsTrade := WebsocketTrade{ - ID: int64(tradeID), - Timestamp: int64(timestamp), - Amount: amount, - } - if len(data) == 5 { - rate, ok := data[3].(float64) - if !ok { - return errors.New("unable to type assert trade rate") - } - period, ok := data[4].(float64) - if !ok { - return errors.New("unable to type assert trade period") - } - wsTrade.Rate = rate - wsTrade.Period = int64(period) - } else { - price, ok := data[3].(float64) - if !ok { - return errors.New("unable to type assert trade price") - } - wsTrade.Price = price - } - tradeHolder = append(tradeHolder, wsTrade) - } - trades := make([]trade.Data, len(tradeHolder)) - for i := range tradeHolder { - side := order.Buy - newAmount := tradeHolder[i].Amount - if newAmount < 0 { - side = order.Sell - newAmount *= -1 - } - price := tradeHolder[i].Price - if price == 0 && tradeHolder[i].Rate > 0 { - price = tradeHolder[i].Rate - } - trades[i] = trade.Data{ - TID: strconv.FormatInt(tradeHolder[i].ID, 10), - CurrencyPair: c.Currency, - Timestamp: time.UnixMilli(tradeHolder[i].Timestamp), - Price: price, - Amount: newAmount, - Exchange: b.Name, - AssetType: c.Asset, - Side: side, - } + return fmt.Errorf("unable to locate chanID: %d", chanID) } - - return b.AddTradesToBuffer(trades...) } - if authResp, ok := d[1].(string); ok { - switch authResp { + if eventType, ok := d[1].(string); ok { + switch eventType { case wsHeartbeat, pong: return nil case wsNotification: - notification, ok := d[2].([]interface{}) - if !ok { - return errors.New("unable to type assert notification data") - } - if data, ok := notification[4].([]interface{}); ok { - channelName, ok := notification[1].(string) - if !ok { - return errors.New("unable to type assert channelName") - } - switch { - case strings.Contains(channelName, wsFundingOfferNewRequest), - strings.Contains(channelName, wsFundingOfferUpdateRequest), - strings.Contains(channelName, wsFundingOfferCancelRequest): - if data[0] != nil { - if id, ok := data[0].(float64); ok && id > 0 { - if b.Websocket.Match.IncomingWithData(int64(id), respRaw) { - return nil - } - offer, err := wsHandleFundingOffer(data, true /* include rate real */) - if err != nil { - return err - } - b.Websocket.DataHandler <- offer - } - } - case strings.Contains(channelName, wsOrderNewRequest), - strings.Contains(channelName, wsOrderUpdateRequest), - strings.Contains(channelName, wsOrderCancelRequest): - if data[2] != nil { - if id, ok := data[2].(float64); ok && id > 0 { - if b.Websocket.Match.IncomingWithData(int64(id), respRaw) { - return nil - } - b.wsHandleOrder(data) - } - } - default: - return fmt.Errorf("%s - Unexpected data returned %s", - b.Name, - respRaw) - } - } - if notification[5] != nil { - if wsErr, ok := notification[5].(string); ok { - if strings.EqualFold(wsErr, wsError) { - if errMsg, ok := notification[6].(string); ok { - return fmt.Errorf("%s - Error %s", - b.Name, - errMsg) - } - return fmt.Errorf("%s - unhandled error message: %v", b.Name, - notification[6]) - } - } - } + return b.handleWSNotification(d, respRaw) case wsOrderSnapshot: if snapBundle, ok := d[2].([]interface{}); ok && len(snapBundle) > 0 { if _, ok := snapBundle[0].([]interface{}); ok { @@ -675,135 +252,11 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error { b.wsHandleOrder(oData) } case wsPositionSnapshot: - if snapBundle, ok := d[2].([]interface{}); ok && len(snapBundle) > 0 { - if _, ok := snapBundle[0].([]interface{}); ok { - snapshot := make([]WebsocketPosition, len(snapBundle)) - for i := range snapBundle { - positionData, ok := snapBundle[i].([]interface{}) - if !ok { - return errors.New("unable to type assert wsPositionSnapshot positionData") - } - var position WebsocketPosition - if position.Pair, ok = positionData[0].(string); !ok { - return errors.New("unable to type assert position snapshot pair") - } - if position.Status, ok = positionData[1].(string); !ok { - return errors.New("unable to type assert position snapshot status") - } - if position.Amount, ok = positionData[2].(float64); !ok { - return errors.New("unable to type assert position snapshot amount") - } - if position.Price, ok = positionData[3].(float64); !ok { - return errors.New("unable to type assert position snapshot price") - } - if position.MarginFunding, ok = positionData[4].(float64); !ok { - return errors.New("unable to type assert position snapshot margin funding") - } - marginFundingType, ok := positionData[5].(float64) - if !ok { - return errors.New("unable to type assert position snapshot margin funding type") - } - position.MarginFundingType = int64(marginFundingType) - if position.ProfitLoss, ok = positionData[6].(float64); !ok { - return errors.New("unable to type assert position snapshot profit loss") - } - if position.ProfitLossPercent, ok = positionData[7].(float64); !ok { - return errors.New("unable to type assert position snapshot profit loss percent") - } - if position.LiquidationPrice, ok = positionData[8].(float64); !ok { - return errors.New("unable to type assert position snapshot liquidation price") - } - if position.Leverage, ok = positionData[9].(float64); !ok { - return errors.New("unable to type assert position snapshot leverage") - } - snapshot[i] = position - } - b.Websocket.DataHandler <- snapshot - } - } + return b.handleWSPositionSnapshot(d) case wsPositionNew, wsPositionUpdate, wsPositionClose: - if positionData, ok := d[2].([]interface{}); ok && len(positionData) > 0 { - var position WebsocketPosition - if position.Pair, ok = positionData[0].(string); !ok { - return errors.New("unable to type assert position pair") - } - if position.Status, ok = positionData[1].(string); !ok { - return errors.New("unable to type assert position status") - } - if position.Amount, ok = positionData[2].(float64); !ok { - return errors.New("unable to type assert position amount") - } - if position.Price, ok = positionData[3].(float64); !ok { - return errors.New("unable to type assert position price") - } - if position.MarginFunding, ok = positionData[4].(float64); !ok { - return errors.New("unable to type assert margin position funding") - } - marginFundingType, ok := positionData[5].(float64) - if !ok { - return errors.New("unable to type assert position margin funding type") - } - position.MarginFundingType = int64(marginFundingType) - if position.ProfitLoss, ok = positionData[6].(float64); !ok { - return errors.New("unable to type assert position profit loss") - } - if position.ProfitLossPercent, ok = positionData[7].(float64); !ok { - return errors.New("unable to type assert position profit loss percent") - } - if position.LiquidationPrice, ok = positionData[8].(float64); !ok { - return errors.New("unable to type assert position liquidation price") - } - if position.Leverage, ok = positionData[9].(float64); !ok { - return errors.New("unable to type assert position leverage") - } - b.Websocket.DataHandler <- position - } + return b.handleWSPositionUpdate(d) case wsTradeExecuted, wsTradeExecutionUpdate: - if tradeData, ok := d[2].([]interface{}); ok && len(tradeData) > 4 { - var tData WebsocketTradeData - var tradeID float64 - if tradeID, ok = tradeData[0].(float64); !ok { - return errors.New("unable to type assert trade ID") - } - tData.TradeID = int64(tradeID) - if tData.Pair, ok = tradeData[1].(string); !ok { - return errors.New("unable to type assert trade pair") - } - var timestamp float64 - if timestamp, ok = tradeData[2].(float64); !ok { - return errors.New("unable to type assert trade timestamp") - } - tData.Timestamp = int64(timestamp) - var orderID float64 - if orderID, ok = tradeData[3].(float64); !ok { - return errors.New("unable to type assert trade order ID") - } - tData.OrderID = int64(orderID) - if tData.AmountExecuted, ok = tradeData[4].(float64); !ok { - return errors.New("unable to type assert trade amount executed") - } - if tData.PriceExecuted, ok = tradeData[5].(float64); !ok { - return errors.New("unable to type assert trade price executed") - } - if tData.OrderType, ok = tradeData[6].(string); !ok { - return errors.New("unable to type assert trade order type") - } - if tData.OrderPrice, ok = tradeData[7].(float64); !ok { - return errors.New("unable to type assert trade order type") - } - var maker float64 - if maker, ok = tradeData[8].(float64); !ok { - return errors.New("unable to type assert trade maker") - } - tData.Maker = maker == 1 - if tData.Fee, ok = tradeData[9].(float64); !ok { - return errors.New("unable to type assert trade fee") - } - if tData.FeeCurrency, ok = tradeData[10].(string); !ok { - return errors.New("unable to type assert trade fee currency") - } - b.Websocket.DataHandler <- tData - } + return b.handleWSTradeUpdate(d, eventType) case wsFundingOfferSnapshot: if snapBundle, ok := d[2].([]interface{}); ok && len(snapBundle) > 0 { if _, ok := snapBundle[0].([]interface{}); ok { @@ -1045,6 +498,617 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error { return nil } +func (b *Bitfinex) handleWSChannelUpdate(c *stream.ChannelSubscription, d []interface{}) error { + switch c.Channel { + case wsBook: + return b.handleWSBookUpdate(c, d) + case wsCandles: + return b.handleWSCandleUpdate(c, d) + case wsTicker: + return b.handleWSTickerUpdate(c, d) + case wsTrades: + return b.handleWSTradesUpdate(c, d) + } + + return fmt.Errorf("%s unhandled channel update: %s", b.Name, c.Channel) +} + +func (b *Bitfinex) handleWSBookUpdate(c *stream.ChannelSubscription, d []interface{}) error { + var newOrderbook []WebsocketBook + obSnapBundle, ok := d[1].([]interface{}) + if !ok { + return errors.New("orderbook interface cast failed") + } + if len(obSnapBundle) == 0 { + return errors.New("no data within orderbook snapshot") + } + + sequenceNo, ok := d[2].(float64) + if !ok { + return errors.New("type assertion failure") + } + + var fundingRate bool + switch id := obSnapBundle[0].(type) { + case []interface{}: + for i := range obSnapBundle { + data, ok := obSnapBundle[i].([]interface{}) + if !ok { + return errors.New("type assertion failed for orderbok item data") + } + id, okAssert := data[0].(float64) + if !okAssert { + return errors.New("type assertion failed for orderbook id data") + } + pricePeriod, okAssert := data[1].(float64) + if !okAssert { + return errors.New("type assertion failed for orderbook price data") + } + rateAmount, okAssert := data[2].(float64) + if !okAssert { + return errors.New("type assertion failed for orderbook rate data") + } + if len(data) == 4 { + fundingRate = true + amount, okFunding := data[3].(float64) + if !okFunding { + return errors.New("type assertion failed for orderbook funding data") + } + newOrderbook = append(newOrderbook, WebsocketBook{ + ID: int64(id), + Period: int64(pricePeriod), + Price: rateAmount, + Amount: amount}) + } else { + newOrderbook = append(newOrderbook, WebsocketBook{ + ID: int64(id), + Price: pricePeriod, + Amount: rateAmount}) + } + } + if err := b.WsInsertSnapshot(c.Currency, c.Asset, newOrderbook, fundingRate); err != nil { + return fmt.Errorf("inserting snapshot error: %s", + err) + } + case float64: + pricePeriod, okSnap := obSnapBundle[1].(float64) + if !okSnap { + return errors.New("type assertion failed for orderbook price snapshot data") + } + amountRate, okSnap := obSnapBundle[2].(float64) + if !okSnap { + return errors.New("type assertion failed for orderbook amount snapshot data") + } + if len(obSnapBundle) == 4 { + fundingRate = true + var amount float64 + amount, okSnap = obSnapBundle[3].(float64) + if !okSnap { + return errors.New("type assertion failed for orderbook amount snapshot data") + } + newOrderbook = append(newOrderbook, WebsocketBook{ + ID: int64(id), + Period: int64(pricePeriod), + Price: amountRate, + Amount: amount}) + } else { + newOrderbook = append(newOrderbook, WebsocketBook{ + ID: int64(id), + Price: pricePeriod, + Amount: amountRate}) + } + + chanId, ok := c.Params["chanId"].(int) + if !ok { + // This should absolutely never happen and is not currently possible. Leave check to protect against regression + return fmt.Errorf("%s chanId not an int. Type: %T Value: %v", b.Name, c.Params["chanId"], c.Params["chanId"]) + } + + if err := b.WsUpdateOrderbook(c.Currency, c.Asset, newOrderbook, chanId, int64(sequenceNo), fundingRate); err != nil { + return fmt.Errorf("updating orderbook error: %s", + err) + } + } + + return nil +} + +func (b *Bitfinex) handleWSCandleUpdate(c *stream.ChannelSubscription, d []interface{}) error { + candleBundle, ok := d[1].([]interface{}) + if !ok || len(candleBundle) == 0 { + return nil + } + + switch candleData := candleBundle[0].(type) { + case []interface{}: + for i := range candleBundle { + var element []interface{} + element, ok = candleBundle[i].([]interface{}) + if !ok { + return errors.New("candle type assertion for element data") + } + if len(element) < 6 { + return errors.New("invalid candleBundle length") + } + var err error + var klineData stream.KlineData + if klineData.Timestamp, err = convert.TimeFromUnixTimestampFloat(element[0]); err != nil { + return fmt.Errorf("unable to convert candle timestamp: %w", err) + } + if klineData.OpenPrice, ok = element[1].(float64); !ok { + return errors.New("unable to type assert candle open price") + } + if klineData.ClosePrice, ok = element[2].(float64); !ok { + return errors.New("unable to type assert candle close price") + } + if klineData.HighPrice, ok = element[3].(float64); !ok { + return errors.New("unable to type assert candle high price") + } + if klineData.LowPrice, ok = element[4].(float64); !ok { + return errors.New("unable to type assert candle low price") + } + if klineData.Volume, ok = element[5].(float64); !ok { + return errors.New("unable to type assert candle volume") + } + klineData.Exchange = b.Name + klineData.AssetType = c.Asset + klineData.Pair = c.Currency + b.Websocket.DataHandler <- klineData + } + case float64: + if len(candleBundle) < 6 { + return errors.New("invalid candleBundle length") + } + var err error + var klineData stream.KlineData + if klineData.Timestamp, err = convert.TimeFromUnixTimestampFloat(candleData); err != nil { + return fmt.Errorf("unable to convert candle timestamp: %w", err) + } + if klineData.OpenPrice, ok = candleBundle[1].(float64); !ok { + return errors.New("unable to type assert candle open price") + } + if klineData.ClosePrice, ok = candleBundle[2].(float64); !ok { + return errors.New("unable to type assert candle close price") + } + if klineData.HighPrice, ok = candleBundle[3].(float64); !ok { + return errors.New("unable to type assert candle high price") + } + if klineData.LowPrice, ok = candleBundle[4].(float64); !ok { + return errors.New("unable to type assert candle low price") + } + if klineData.Volume, ok = candleBundle[5].(float64); !ok { + return errors.New("unable to type assert candle volume") + } + klineData.Exchange = b.Name + klineData.AssetType = c.Asset + klineData.Pair = c.Currency + b.Websocket.DataHandler <- klineData + } + return nil +} + +func (b *Bitfinex) handleWSTickerUpdate(c *stream.ChannelSubscription, d []interface{}) error { + tickerData, ok := d[1].([]interface{}) + if !ok { + return errors.New("type assertion for tickerData") + } + + t := &ticker.Price{ + AssetType: c.Asset, + Pair: c.Currency, + ExchangeName: b.Name, + } + + if len(tickerData) == 10 { + if t.Bid, ok = tickerData[0].(float64); !ok { + return errors.New("unable to type assert ticker bid") + } + if t.Ask, ok = tickerData[2].(float64); !ok { + return errors.New("unable to type assert ticker ask") + } + if t.Last, ok = tickerData[6].(float64); !ok { + return errors.New("unable to type assert ticker last") + } + if t.Volume, ok = tickerData[7].(float64); !ok { + return errors.New("unable to type assert ticker volume") + } + if t.High, ok = tickerData[8].(float64); !ok { + return errors.New("unable to type assert ticker high") + } + if t.Low, ok = tickerData[9].(float64); !ok { + return errors.New("unable to type assert ticker low") + } + } else { + if t.FlashReturnRate, ok = tickerData[0].(float64); !ok { + return errors.New("unable to type assert ticker flash return rate") + } + if t.Bid, ok = tickerData[1].(float64); !ok { + return errors.New("unable to type assert ticker bid") + } + if t.BidPeriod, ok = tickerData[2].(float64); !ok { + return errors.New("unable to type assert ticker bid period") + } + if t.BidSize, ok = tickerData[3].(float64); !ok { + return errors.New("unable to type assert ticker bid size") + } + if t.Ask, ok = tickerData[4].(float64); !ok { + return errors.New("unable to type assert ticker ask") + } + if t.AskPeriod, ok = tickerData[5].(float64); !ok { + return errors.New("unable to type assert ticker ask period") + } + if t.AskSize, ok = tickerData[6].(float64); !ok { + return errors.New("unable to type assert ticker ask size") + } + if t.Last, ok = tickerData[9].(float64); !ok { + return errors.New("unable to type assert ticker last") + } + if t.Volume, ok = tickerData[10].(float64); !ok { + return errors.New("unable to type assert ticker volume") + } + if t.High, ok = tickerData[11].(float64); !ok { + return errors.New("unable to type assert ticker high") + } + if t.Low, ok = tickerData[12].(float64); !ok { + return errors.New("unable to type assert ticker low") + } + if t.FlashReturnRateAmount, ok = tickerData[15].(float64); !ok { + return errors.New("unable to type assert ticker flash return rate") + } + } + b.Websocket.DataHandler <- t + return nil +} + +func (b *Bitfinex) handleWSTradesUpdate(c *stream.ChannelSubscription, d []interface{}) error { + if !b.IsSaveTradeDataEnabled() { + return nil + } + if c.Asset == asset.MarginFunding { + return nil + } + var tradeHolder []WebsocketTrade + switch len(d) { + case 2: + snapshot, ok := d[1].([]interface{}) + if !ok { + return errors.New("unable to type assert trade snapshot data") + } + for i := range snapshot { + elem, ok := snapshot[i].([]interface{}) + if !ok { + return errors.New("unable to type assert trade snapshot element data") + } + tradeID, ok := elem[0].(float64) + if !ok { + return errors.New("unable to type assert trade ID") + } + timestamp, ok := elem[1].(float64) + if !ok { + return errors.New("unable to type assert trade timestamp") + } + amount, ok := elem[2].(float64) + if !ok { + return errors.New("unable to type assert trade amount") + } + wsTrade := WebsocketTrade{ + ID: int64(tradeID), + Timestamp: int64(timestamp), + Amount: amount, + } + if len(elem) == 5 { + rate, ok := elem[3].(float64) + if !ok { + return errors.New("unable to type assert trade rate") + } + wsTrade.Rate = rate + period, ok := elem[4].(float64) + if !ok { + return errors.New("unable to type assert trade period") + } + wsTrade.Period = int64(period) + } else { + price, ok := elem[3].(float64) + if !ok { + return errors.New("unable to type assert trade price") + } + wsTrade.Rate = price + } + tradeHolder = append(tradeHolder, wsTrade) + } + case 3: + event, ok := d[1].(string) + if !ok { + return errors.New("unable to type assert data event") + } + if event != wsFundingTradeUpdate && + event != wsTradeExecutionUpdate { + return nil + } + data, ok := d[2].([]interface{}) + if !ok { + return errors.New("trade data type assertion error") + } + + tradeID, ok := data[0].(float64) + if !ok { + return errors.New("unable to type assert trade ID") + } + timestamp, ok := data[1].(float64) + if !ok { + return errors.New("unable to type assert trade timestamp") + } + amount, ok := data[2].(float64) + if !ok { + return errors.New("unable to type assert trade amount") + } + wsTrade := WebsocketTrade{ + ID: int64(tradeID), + Timestamp: int64(timestamp), + Amount: amount, + } + if len(data) == 5 { + rate, ok := data[3].(float64) + if !ok { + return errors.New("unable to type assert trade rate") + } + period, ok := data[4].(float64) + if !ok { + return errors.New("unable to type assert trade period") + } + wsTrade.Rate = rate + wsTrade.Period = int64(period) + } else { + price, ok := data[3].(float64) + if !ok { + return errors.New("unable to type assert trade price") + } + wsTrade.Price = price + } + tradeHolder = append(tradeHolder, wsTrade) + } + trades := make([]trade.Data, len(tradeHolder)) + for i := range tradeHolder { + side := order.Buy + newAmount := tradeHolder[i].Amount + if newAmount < 0 { + side = order.Sell + newAmount *= -1 + } + price := tradeHolder[i].Price + if price == 0 && tradeHolder[i].Rate > 0 { + price = tradeHolder[i].Rate + } + trades[i] = trade.Data{ + TID: strconv.FormatInt(tradeHolder[i].ID, 10), + CurrencyPair: c.Currency, + Timestamp: time.UnixMilli(tradeHolder[i].Timestamp), + Price: price, + Amount: newAmount, + Exchange: b.Name, + AssetType: c.Asset, + Side: side, + } + } + + return b.AddTradesToBuffer(trades...) +} + +func (b *Bitfinex) handleWSNotification(d []interface{}, respRaw []byte) error { + notification, ok := d[2].([]interface{}) + if !ok { + return errors.New("unable to type assert notification data") + } + if data, ok := notification[4].([]interface{}); ok { + channelName, ok := notification[1].(string) + if !ok { + return errors.New("unable to type assert channelName") + } + switch { + case strings.Contains(channelName, wsFundingOfferNewRequest), + strings.Contains(channelName, wsFundingOfferUpdateRequest), + strings.Contains(channelName, wsFundingOfferCancelRequest): + if data[0] != nil { + if id, ok := data[0].(float64); ok && id > 0 { + if b.Websocket.Match.IncomingWithData(int64(id), respRaw) { + return nil + } + offer, err := wsHandleFundingOffer(data, true /* include rate real */) + if err != nil { + return err + } + b.Websocket.DataHandler <- offer + } + } + case strings.Contains(channelName, wsOrderNewRequest): + if data[2] != nil { + if cid, ok := data[2].(float64); ok && cid > 0 { + if b.Websocket.Match.IncomingWithData(int64(cid), respRaw) { + return nil + } + b.wsHandleOrder(data) + } + } + case strings.Contains(channelName, wsOrderUpdateRequest), + strings.Contains(channelName, wsOrderCancelRequest): + if data[0] != nil { + if id, ok := data[0].(float64); ok && id > 0 { + if b.Websocket.Match.IncomingWithData(int64(id), respRaw) { + return nil + } + b.wsHandleOrder(data) + } + } + default: + return fmt.Errorf("%s - Unexpected data returned %s", + b.Name, + respRaw) + } + } + if notification[5] != nil { + if wsErr, ok := notification[5].(string); ok { + if strings.EqualFold(wsErr, wsError) { + if errMsg, ok := notification[6].(string); ok { + return fmt.Errorf("%s - Error %s", + b.Name, + errMsg) + } + return fmt.Errorf("%s - unhandled error message: %v", b.Name, + notification[6]) + } + } + } + return nil +} + +func (b *Bitfinex) handleWSPositionSnapshot(d []interface{}) error { + snapBundle, ok := d[2].([]interface{}) + if !ok || len(snapBundle) == 0 { + return nil + } + if _, ok := snapBundle[0].([]interface{}); !ok { + return nil + } + snapshot := make([]WebsocketPosition, len(snapBundle)) + for i := range snapBundle { + positionData, ok := snapBundle[i].([]interface{}) + if !ok { + return errors.New("unable to type assert wsPositionSnapshot positionData") + } + var position WebsocketPosition + if position.Pair, ok = positionData[0].(string); !ok { + return errors.New("unable to type assert position snapshot pair") + } + if position.Status, ok = positionData[1].(string); !ok { + return errors.New("unable to type assert position snapshot status") + } + if position.Amount, ok = positionData[2].(float64); !ok { + return errors.New("unable to type assert position snapshot amount") + } + if position.Price, ok = positionData[3].(float64); !ok { + return errors.New("unable to type assert position snapshot price") + } + if position.MarginFunding, ok = positionData[4].(float64); !ok { + return errors.New("unable to type assert position snapshot margin funding") + } + marginFundingType, ok := positionData[5].(float64) + if !ok { + return errors.New("unable to type assert position snapshot margin funding type") + } + position.MarginFundingType = int64(marginFundingType) + if position.ProfitLoss, ok = positionData[6].(float64); !ok { + return errors.New("unable to type assert position snapshot profit loss") + } + if position.ProfitLossPercent, ok = positionData[7].(float64); !ok { + return errors.New("unable to type assert position snapshot profit loss percent") + } + if position.LiquidationPrice, ok = positionData[8].(float64); !ok { + return errors.New("unable to type assert position snapshot liquidation price") + } + if position.Leverage, ok = positionData[9].(float64); !ok { + return errors.New("unable to type assert position snapshot leverage") + } + snapshot[i] = position + } + b.Websocket.DataHandler <- snapshot + return nil +} + +func (b *Bitfinex) handleWSPositionUpdate(d []interface{}) error { + positionData, ok := d[2].([]interface{}) + if !ok || len(positionData) == 0 { + return nil + } + var position WebsocketPosition + if position.Pair, ok = positionData[0].(string); !ok { + return errors.New("unable to type assert position pair") + } + if position.Status, ok = positionData[1].(string); !ok { + return errors.New("unable to type assert position status") + } + if position.Amount, ok = positionData[2].(float64); !ok { + return errors.New("unable to type assert position amount") + } + if position.Price, ok = positionData[3].(float64); !ok { + return errors.New("unable to type assert position price") + } + if position.MarginFunding, ok = positionData[4].(float64); !ok { + return errors.New("unable to type assert margin position funding") + } + marginFundingType, ok := positionData[5].(float64) + if !ok { + return errors.New("unable to type assert position margin funding type") + } + position.MarginFundingType = int64(marginFundingType) + if position.ProfitLoss, ok = positionData[6].(float64); !ok { + return errors.New("unable to type assert position profit loss") + } + if position.ProfitLossPercent, ok = positionData[7].(float64); !ok { + return errors.New("unable to type assert position profit loss percent") + } + if position.LiquidationPrice, ok = positionData[8].(float64); !ok { + return errors.New("unable to type assert position liquidation price") + } + if position.Leverage, ok = positionData[9].(float64); !ok { + return errors.New("unable to type assert position leverage") + } + b.Websocket.DataHandler <- position + return nil +} + +func (b *Bitfinex) handleWSTradeUpdate(d []interface{}, eventType string) error { + tradeData, ok := d[2].([]interface{}) + if !ok || len(tradeData) <= 4 { + return nil + } + var tData WebsocketTradeData + var tradeID float64 + if tradeID, ok = tradeData[0].(float64); !ok { + return errors.New("unable to type assert trade ID") + } + tData.TradeID = int64(tradeID) + if tData.Pair, ok = tradeData[1].(string); !ok { + return errors.New("unable to type assert trade pair") + } + var timestamp float64 + if timestamp, ok = tradeData[2].(float64); !ok { + return errors.New("unable to type assert trade timestamp") + } + tData.Timestamp = int64(timestamp) + var orderID float64 + if orderID, ok = tradeData[3].(float64); !ok { + return errors.New("unable to type assert trade order ID") + } + tData.OrderID = int64(orderID) + if tData.AmountExecuted, ok = tradeData[4].(float64); !ok { + return errors.New("unable to type assert trade amount executed") + } + if tData.PriceExecuted, ok = tradeData[5].(float64); !ok { + return errors.New("unable to type assert trade price executed") + } + if tData.OrderType, ok = tradeData[6].(string); !ok { + return errors.New("unable to type assert trade order type") + } + if tData.OrderPrice, ok = tradeData[7].(float64); !ok { + return errors.New("unable to type assert trade order type") + } + var maker float64 + if maker, ok = tradeData[8].(float64); !ok { + return errors.New("unable to type assert trade maker") + } + tData.Maker = maker == 1 + if eventType == "tu" { + if tData.Fee, ok = tradeData[9].(float64); !ok { + return errors.New("unable to type assert trade fee") + } + if tData.FeeCurrency, ok = tradeData[10].(string); !ok { + return errors.New("unable to type assert trade fee currency") + } + } + b.Websocket.DataHandler <- tData + return nil +} + func wsHandleFundingOffer(data []interface{}, includeRateReal bool) (*WsFundingOffer, error) { var offer WsFundingOffer var ok bool