diff --git a/exchanges/bitfinex/bitfinex_test.go b/exchanges/bitfinex/bitfinex_test.go index 854d7a46761..92f41cdf88f 100644 --- a/exchanges/bitfinex/bitfinex_test.go +++ b/exchanges/bitfinex/bitfinex_test.go @@ -1382,14 +1382,16 @@ func TestWsOrderBook(t *testing.T) { assert.ErrorIs(t, err, errNoSeqNo, "handleWSBookUpdate should send correct error") } -func TestWsTradeResponse(t *testing.T) { +func TestWSTrades(t *testing.T) { + t.Parallel() + + b := new(Bitfinex) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes + require.NoError(t, testexch.Setup(b), "Test instance Setup must not error") + b.SetSaveTradeDataStatus(true) err := b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: subscription.AllTradesChannel, Key: 18788}) require.NoError(t, err, "AddSubscriptions must not error") - pressXToJSON := `[18788,[[412685577,1580268444802,11.1998,176.3],[412685575,1580268444802,5,176.29952759],[412685574,1580268374717,1.99069999,176.41],[412685573,1580268374717,1.00930001,176.41],[412685572,1580268358760,0.9907,176.47],[412685571,1580268324362,0.5505,176.44],[412685570,1580268297270,-0.39040819,176.39],[412685568,1580268297270,-0.39780162,176.46475676],[412685567,1580268283470,-0.09,176.41],[412685566,1580268256536,-2.31310783,176.48],[412685565,1580268256536,-0.59669217,176.49],[412685564,1580268256536,-0.9902,176.49],[412685562,1580268194474,0.9902,176.55],[412685561,1580268186215,0.1,176.6],[412685560,1580268185964,-2.17096773,176.5],[412685559,1580268185964,-1.82903227,176.51],[412685558,1580268181215,2.098914,176.53],[412685557,1580268169844,16.7302,176.55],[412685556,1580268169844,3.25,176.54],[412685555,1580268155725,0.23576115,176.45],[412685553,1580268155725,3,176.44596249],[412685552,1580268155725,3.25,176.44],[412685551,1580268155725,5,176.44],[412685550,1580268155725,0.65830078,176.41],[412685549,1580268155725,0.45063807,176.41],[412685548,1580268153825,-0.67604704,176.39],[412685547,1580268145713,2.5883,176.41],[412685543,1580268087513,12.92927,176.33],[412685542,1580268087513,0.40083,176.33],[412685533,1580268005756,-0.17096773,176.32]]]` - err = b.wsHandleData([]byte(pressXToJSON)) - if err != nil { - t.Error(err) - } + testexch.FixtureToDataHandler(t, "testdata/wsAllTrades.json", b.wsHandleData) + close(b.Websocket.DataHandler) } func TestWsTickerResponse(t *testing.T) { diff --git a/exchanges/bitfinex/bitfinex_types.go b/exchanges/bitfinex/bitfinex_types.go index 69c506ea977..769f7632f69 100644 --- a/exchanges/bitfinex/bitfinex_types.go +++ b/exchanges/bitfinex/bitfinex_types.go @@ -14,7 +14,6 @@ var ( errSetCannotBeEmpty = errors.New("set cannot be empty") errNoSeqNo = errors.New("no sequence number") errParamNotAllowed = errors.New("param not allowed") - errParsingWSField = errors.New("error parsing WS field") errTickerInvalidSymbol = errors.New("invalid ticker symbol") errTickerInvalidResp = errors.New("invalid ticker response format") errTickerInvalidFieldCount = errors.New("invalid ticker response field count") @@ -625,7 +624,7 @@ const ( wsPositionClose = "pc" wsWalletSnapshot = "ws" wsWalletUpdate = "wu" - wsTradeExecutionUpdate = "tu" + wsTradeUpdated = "tu" wsTradeExecuted = "te" wsFundingCreditSnapshot = "fcs" wsFundingCreditNew = "fcn" @@ -636,7 +635,7 @@ const ( wsFundingLoanUpdate = "flu" wsFundingLoanCancel = "flc" wsFundingTradeExecuted = "fte" - wsFundingTradeUpdate = "ftu" + wsFundingTradeUpdated = "ftu" wsFundingInfoUpdate = "fiu" wsBalanceUpdate = "bu" wsMarginInfoUpdate = "miu" diff --git a/exchanges/bitfinex/bitfinex_websocket.go b/exchanges/bitfinex/bitfinex_websocket.go index 74c04afc2d3..a734a32ee16 100644 --- a/exchanges/bitfinex/bitfinex_websocket.go +++ b/exchanges/bitfinex/bitfinex_websocket.go @@ -1,6 +1,7 @@ package bitfinex import ( + "bytes" "context" "encoding/json" "errors" @@ -16,6 +17,7 @@ import ( "github.com/Masterminds/sprig/v3" "github.com/buger/jsonparser" + "github.com/davecgh/go-spew/spew" "github.com/gorilla/websocket" "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/common/convert" @@ -33,6 +35,11 @@ import ( "github.com/thrasher-corp/gocryptotrader/log" ) +var ( + errParsingWSField = errors.New("error parsing WS field") + errInvalidWSFieldCount = errors.New("invalid WS field count") +) + var defaultSubscriptions = subscription.List{ {Enabled: true, Channel: subscription.TickerChannel, Asset: asset.All}, {Enabled: true, Channel: subscription.AllTradesChannel, Asset: asset.All}, @@ -162,8 +169,8 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error { eventType, hasEventType := d[1].(string) if chanID != 0 { - if c := b.Websocket.GetSubscription(chanID); c != nil { - return b.handleWSChannelUpdate(c, eventType, d) + if s := b.Websocket.GetSubscription(chanID); s != nil { + return b.handleWSChannelUpdate(s, respRaw, eventType, d) } if b.Verbose { log.Warnf(log.ExchangeSys, "%s %s; dropped WS message: %s", b.Name, subscription.ErrNotFound, respRaw) @@ -201,8 +208,8 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error { return b.handleWSPositionSnapshot(d) case wsPositionNew, wsPositionUpdate, wsPositionClose: return b.handleWSPositionUpdate(d) - case wsTradeExecuted, wsTradeExecutionUpdate: - return b.handleWSTradeUpdate(d, eventType) + case wsTradeExecuted, wsTradeUpdated: + return b.handleWSMyTradeUpdate(d, eventType) case wsFundingOfferSnapshot: if snapBundle, ok := d[2].([]interface{}); ok && len(snapBundle) > 0 { if _, ok := snapBundle[0].([]interface{}); ok { @@ -398,7 +405,7 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error { b.Websocket.DataHandler <- fundingInfo } } - case wsFundingTradeExecuted, wsFundingTradeUpdate: + case wsFundingTradeExecuted, wsFundingTradeUpdated: if data, ok := d[2].([]interface{}); ok && len(data) > 0 { var wsFundingTrade WsFundingTrade tradeID, ok := data[0].(float64) @@ -544,16 +551,15 @@ func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error { return nil } -func (b *Bitfinex) handleWSChannelUpdate(s *subscription.Subscription, eventType string, d []interface{}) error { +func (b *Bitfinex) handleWSChannelUpdate(s *subscription.Subscription, respRaw []byte, eventType string, d []interface{}) error { if s == nil { return fmt.Errorf("%w: Subscription param", common.ErrNilPointer) } - if eventType == wsChecksum { + switch eventType { + case wsChecksum: return b.handleWSChecksum(s, d) - } - - if eventType == wsHeartbeat { + case wsHeartbeat: return nil } @@ -569,7 +575,7 @@ func (b *Bitfinex) handleWSChannelUpdate(s *subscription.Subscription, eventType case subscription.TickerChannel: return b.handleWSTickerUpdate(s, d) case subscription.AllTradesChannel: - return b.handleWSTradesUpdate(s, eventType, d) + return b.handleWSTrades(s, respRaw) } return fmt.Errorf("%s unhandled channel update: %s", b.Name, s.Channel) @@ -869,7 +875,7 @@ func (b *Bitfinex) handleWSTickerUpdate(c *subscription.Subscription, d []interf return nil } -func (b *Bitfinex) handleWSTradesUpdate(c *subscription.Subscription, eventType string, d []interface{}) error { +func (b *Bitfinex) handleWSTrades(c *subscription.Subscription, respRaw []byte) error { if c == nil { return fmt.Errorf("%w: Subscription param", common.ErrNilPointer) } @@ -882,10 +888,92 @@ func (b *Bitfinex) handleWSTradesUpdate(c *subscription.Subscription, eventType if c.Asset == asset.MarginFunding { return nil } - var tradeHolder []WebsocketTrade - switch len(d) { - case 2: - snapshot, ok := d[1].([]interface{}) + _, valueType, _, err := jsonparser.Get(respRaw, "[1]") + if err != nil { + return fmt.Errorf("%w `tradesUpdate[1]`: %w", errParsingWSField, err) + } + // DO NOT COMMIT + var trades []trade.Data + switch valueType { + case jsonparser.String: + return b.handleWSTradesUpdate(c, respRaw) + case jsonparser.Array: + return b.handleWSTradesSnapshot(c, respRaw) + default: + return fmt.Errorf("%w `tradesUpdate[1]`: %w `%s`", errParsingWSField, jsonparser.UnknownValueTypeError, valueType) + } + if err != nil { + return err + } + /* + for i := range tradeHolder { + side := order.Buy + newAmount := tradeHolder[i].Amount + if newAmount < 0 { + side = order.Sell + newAmount *= -1 + } + price := tradeHolder[i].Price + if price == 0 && tradeHolder[i].Rate > 0 { + price = tradeHolder[i].Rate + } + trades[i] = trade.Data{ + TID: strconv.FormatInt(tradeHolder[i].ID, 10), + CurrencyPair: c.Pairs[0], + Timestamp: time.UnixMilli(tradeHolder[i].Timestamp), + Price: price, + Amount: newAmount, + Exchange: b.Name, + AssetType: c.Asset, + Side: side, + } + } + */ + return b.AddTradesToBuffer(trades...) +} + +func (b *Bitfinex) handleWSTradesSnapshotTrade(s *subscription.Subscription, v []byte) error { + c := &WebsocketTrade{} + if err := json.Unmarshal(v, &[]any{&c.ID, &c.Timestamp, &c.Amount, &c.Price}); err != nil { + return err + } + spew.Dump(c) + + return nil +} + +func (b *Bitfinex) handleWSTradesSnapshotFundingTrade(s *subscription.Subscription, v []byte) error { + return nil +} + +func (b *Bitfinex) handleWSTradesSnapshot(s *subscription.Subscription, respRaw []byte) error { + var errs error + handleTrade := func(v []byte, valueType jsonparser.ValueType, _ int, _ error) { + var err error + if valueType != jsonparser.Array { + err = fmt.Errorf("%w `tradesUpdate[1][*]`: %w `%s`", errParsingWSField, jsonparser.UnknownValueTypeError, valueType) + } else { + fields := bytes.Count(v, []byte(",")) + 1 + switch fields { + case 4: + err = b.handleWSTradesSnapshotTrade(s, v) + case 5: + err = b.handleWSTradesSnapshotFundingTrade(s, v) + default: + err = fmt.Errorf("%w `tradesUpdate[1][*]`: %w `%d`", errParsingWSField, errInvalidWSFieldCount, fields) + } + errs = common.AppendError(errs, err) + } + } + + _, err := jsonparser.ArrayEach(respRaw, handleTrade, "[1]") + + return common.AppendError(errs, err) + /* + + + var trades []WebsocketTrade + if !ok { return errors.New("unable to type assert trade snapshot data") } @@ -931,9 +1019,22 @@ func (b *Bitfinex) handleWSTradesUpdate(c *subscription.Subscription, eventType } tradeHolder = append(tradeHolder, wsTrade) } - case 3: - if eventType != wsFundingTradeUpdate && eventType != wsTradeExecutionUpdate { - return fmt.Errorf("unhandled WS trade update event: %s", eventType) + return nil + */ +} + +func (b *Bitfinex) handleWSTradesUpdate(c *subscription.Subscription, respRaw []byte) error { + /* + updateType, err := jsonparser.GetString(respRaw, "[1]") + if err != nil { + return fmt.Errorf("%w `tradesUpdate[1]`: %s", errParsingWSField, err) + } + switch updateType { + case wsFundingTradeUpdated, wsTradeUpdated: + return b.handleWSPublicTrade(c, respRaw) + default: + panic("TODO: ", updateType) + return nil, fmt.Errorf("unhandled WS trade update event: %s", updateType) } data, ok := d[2].([]interface{}) if !ok { @@ -975,33 +1076,9 @@ func (b *Bitfinex) handleWSTradesUpdate(c *subscription.Subscription, eventType } wsTrade.Price = price } - tradeHolder = append(tradeHolder, wsTrade) - } - trades := make([]trade.Data, len(tradeHolder)) - for i := range tradeHolder { - side := order.Buy - newAmount := tradeHolder[i].Amount - if newAmount < 0 { - side = order.Sell - newAmount *= -1 - } - price := tradeHolder[i].Price - if price == 0 && tradeHolder[i].Rate > 0 { - price = tradeHolder[i].Rate - } - trades[i] = trade.Data{ - TID: strconv.FormatInt(tradeHolder[i].ID, 10), - CurrencyPair: c.Pairs[0], - Timestamp: time.UnixMilli(tradeHolder[i].Timestamp), - Price: price, - Amount: newAmount, - Exchange: b.Name, - AssetType: c.Asset, - Side: side, - } - } - - return b.AddTradesToBuffer(trades...) + return []trade.Data{t}, nil + */ + return nil } func (b *Bitfinex) handleWSNotification(d []interface{}, respRaw []byte) error { @@ -1173,7 +1250,7 @@ func (b *Bitfinex) handleWSPositionUpdate(d []interface{}) error { return nil } -func (b *Bitfinex) handleWSTradeUpdate(d []interface{}, eventType string) error { +func (b *Bitfinex) handleWSMyTradeUpdate(d []interface{}, eventType string) error { tradeData, ok := d[2].([]interface{}) if !ok { return common.GetTypeAssertError("[]interface{}", d[2], "tradeUpdate") diff --git a/exchanges/bitfinex/testdata/wsAllTrades.json b/exchanges/bitfinex/testdata/wsAllTrades.json new file mode 100644 index 00000000000..5db0ae27e41 --- /dev/null +++ b/exchanges/bitfinex/testdata/wsAllTrades.json @@ -0,0 +1 @@ +[18788,[[412685577,1580268444802,11.1998,176.3],[412685575,1580268444802,5,176.29952759],[412685574,1580268374717,1.99069999,176.41],[412685573,1580268374717,1.00930001,176.41],[412685572,1580268358760,0.9907,176.47],[412685571,1580268324362,0.5505,176.44],[412685570,1580268297270,-0.39040819,176.39],[412685568,1580268297270,-0.39780162,176.46475676],[412685567,1580268283470,-0.09,176.41],[412685566,1580268256536,-2.31310783,176.48],[412685565,1580268256536,-0.59669217,176.49],[412685564,1580268256536,-0.9902,176.49],[412685562,1580268194474,0.9902,176.55],[412685561,1580268186215,0.1,176.6],[412685560,1580268185964,-2.17096773,176.5],[412685559,1580268185964,-1.82903227,176.51],[412685558,1580268181215,2.098914,176.53],[412685557,1580268169844,16.7302,176.55],[412685556,1580268169844,3.25,176.54],[412685555,1580268155725,0.23576115,176.45],[412685553,1580268155725,3,176.44596249],[412685552,1580268155725,3.25,176.44],[412685551,1580268155725,5,176.44],[412685550,1580268155725,0.65830078,176.41],[412685549,1580268155725,0.45063807,176.41],[412685548,1580268153825,-0.67604704,176.39],[412685547,1580268145713,2.5883,176.41],[412685543,1580268087513,12.92927,176.33],[412685542,1580268087513,0.40083,176.33],[412685533,1580268005756,-0.17096773,176.32]],1]