Skip to content

Commit

Permalink
WIP- Rebase me out
Browse files Browse the repository at this point in the history
  • Loading branch information
gbjk committed Dec 11, 2024
1 parent ca62b0a commit 7f40416
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 56 deletions.
14 changes: 8 additions & 6 deletions exchanges/bitfinex/bitfinex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1382,14 +1382,16 @@ func TestWsOrderBook(t *testing.T) {
assert.ErrorIs(t, err, errNoSeqNo, "handleWSBookUpdate should send correct error")
}

func TestWsTradeResponse(t *testing.T) {
func TestWSTrades(t *testing.T) {
t.Parallel()

b := new(Bitfinex) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
require.NoError(t, testexch.Setup(b), "Test instance Setup must not error")
b.SetSaveTradeDataStatus(true)
err := b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: subscription.AllTradesChannel, Key: 18788})
require.NoError(t, err, "AddSubscriptions must not error")
pressXToJSON := `[18788,[[412685577,1580268444802,11.1998,176.3],[412685575,1580268444802,5,176.29952759],[412685574,1580268374717,1.99069999,176.41],[412685573,1580268374717,1.00930001,176.41],[412685572,1580268358760,0.9907,176.47],[412685571,1580268324362,0.5505,176.44],[412685570,1580268297270,-0.39040819,176.39],[412685568,1580268297270,-0.39780162,176.46475676],[412685567,1580268283470,-0.09,176.41],[412685566,1580268256536,-2.31310783,176.48],[412685565,1580268256536,-0.59669217,176.49],[412685564,1580268256536,-0.9902,176.49],[412685562,1580268194474,0.9902,176.55],[412685561,1580268186215,0.1,176.6],[412685560,1580268185964,-2.17096773,176.5],[412685559,1580268185964,-1.82903227,176.51],[412685558,1580268181215,2.098914,176.53],[412685557,1580268169844,16.7302,176.55],[412685556,1580268169844,3.25,176.54],[412685555,1580268155725,0.23576115,176.45],[412685553,1580268155725,3,176.44596249],[412685552,1580268155725,3.25,176.44],[412685551,1580268155725,5,176.44],[412685550,1580268155725,0.65830078,176.41],[412685549,1580268155725,0.45063807,176.41],[412685548,1580268153825,-0.67604704,176.39],[412685547,1580268145713,2.5883,176.41],[412685543,1580268087513,12.92927,176.33],[412685542,1580268087513,0.40083,176.33],[412685533,1580268005756,-0.17096773,176.32]]]`
err = b.wsHandleData([]byte(pressXToJSON))
if err != nil {
t.Error(err)
}
testexch.FixtureToDataHandler(t, "testdata/wsAllTrades.json", b.wsHandleData)
close(b.Websocket.DataHandler)
}

func TestWsTickerResponse(t *testing.T) {
Expand Down
5 changes: 2 additions & 3 deletions exchanges/bitfinex/bitfinex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ var (
errSetCannotBeEmpty = errors.New("set cannot be empty")
errNoSeqNo = errors.New("no sequence number")
errParamNotAllowed = errors.New("param not allowed")
errParsingWSField = errors.New("error parsing WS field")
errTickerInvalidSymbol = errors.New("invalid ticker symbol")
errTickerInvalidResp = errors.New("invalid ticker response format")
errTickerInvalidFieldCount = errors.New("invalid ticker response field count")
Expand Down Expand Up @@ -625,7 +624,7 @@ const (
wsPositionClose = "pc"
wsWalletSnapshot = "ws"
wsWalletUpdate = "wu"
wsTradeExecutionUpdate = "tu"
wsTradeUpdated = "tu"
wsTradeExecuted = "te"
wsFundingCreditSnapshot = "fcs"
wsFundingCreditNew = "fcn"
Expand All @@ -636,7 +635,7 @@ const (
wsFundingLoanUpdate = "flu"
wsFundingLoanCancel = "flc"
wsFundingTradeExecuted = "fte"
wsFundingTradeUpdate = "ftu"
wsFundingTradeUpdated = "ftu"
wsFundingInfoUpdate = "fiu"
wsBalanceUpdate = "bu"
wsMarginInfoUpdate = "miu"
Expand Down
171 changes: 124 additions & 47 deletions exchanges/bitfinex/bitfinex_websocket.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bitfinex

import (
"bytes"
"context"
"encoding/json"
"errors"
Expand All @@ -16,6 +17,7 @@ import (

"github.com/Masterminds/sprig/v3"
"github.com/buger/jsonparser"
"github.com/davecgh/go-spew/spew"
"github.com/gorilla/websocket"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/common/convert"
Expand All @@ -33,6 +35,11 @@ import (
"github.com/thrasher-corp/gocryptotrader/log"
)

var (
errParsingWSField = errors.New("error parsing WS field")
errInvalidWSFieldCount = errors.New("invalid WS field count")
)

var defaultSubscriptions = subscription.List{
{Enabled: true, Channel: subscription.TickerChannel, Asset: asset.All},
{Enabled: true, Channel: subscription.AllTradesChannel, Asset: asset.All},
Expand Down Expand Up @@ -162,8 +169,8 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error {
eventType, hasEventType := d[1].(string)

if chanID != 0 {
if c := b.Websocket.GetSubscription(chanID); c != nil {
return b.handleWSChannelUpdate(c, eventType, d)
if s := b.Websocket.GetSubscription(chanID); s != nil {
return b.handleWSChannelUpdate(s, respRaw, eventType, d)
}
if b.Verbose {
log.Warnf(log.ExchangeSys, "%s %s; dropped WS message: %s", b.Name, subscription.ErrNotFound, respRaw)
Expand Down Expand Up @@ -201,8 +208,8 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error {
return b.handleWSPositionSnapshot(d)
case wsPositionNew, wsPositionUpdate, wsPositionClose:
return b.handleWSPositionUpdate(d)
case wsTradeExecuted, wsTradeExecutionUpdate:
return b.handleWSTradeUpdate(d, eventType)
case wsTradeExecuted, wsTradeUpdated:
return b.handleWSMyTradeUpdate(d, eventType)
case wsFundingOfferSnapshot:
if snapBundle, ok := d[2].([]interface{}); ok && len(snapBundle) > 0 {
if _, ok := snapBundle[0].([]interface{}); ok {
Expand Down Expand Up @@ -398,7 +405,7 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error {
b.Websocket.DataHandler <- fundingInfo
}
}
case wsFundingTradeExecuted, wsFundingTradeUpdate:
case wsFundingTradeExecuted, wsFundingTradeUpdated:
if data, ok := d[2].([]interface{}); ok && len(data) > 0 {
var wsFundingTrade WsFundingTrade
tradeID, ok := data[0].(float64)
Expand Down Expand Up @@ -544,16 +551,15 @@ func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error {
return nil
}

func (b *Bitfinex) handleWSChannelUpdate(s *subscription.Subscription, eventType string, d []interface{}) error {
func (b *Bitfinex) handleWSChannelUpdate(s *subscription.Subscription, respRaw []byte, eventType string, d []interface{}) error {
if s == nil {
return fmt.Errorf("%w: Subscription param", common.ErrNilPointer)
}

if eventType == wsChecksum {
switch eventType {
case wsChecksum:
return b.handleWSChecksum(s, d)
}

if eventType == wsHeartbeat {
case wsHeartbeat:
return nil
}

Expand All @@ -569,7 +575,7 @@ func (b *Bitfinex) handleWSChannelUpdate(s *subscription.Subscription, eventType
case subscription.TickerChannel:
return b.handleWSTickerUpdate(s, d)
case subscription.AllTradesChannel:
return b.handleWSTradesUpdate(s, eventType, d)
return b.handleWSTrades(s, respRaw)
}

return fmt.Errorf("%s unhandled channel update: %s", b.Name, s.Channel)
Expand Down Expand Up @@ -869,7 +875,7 @@ func (b *Bitfinex) handleWSTickerUpdate(c *subscription.Subscription, d []interf
return nil
}

func (b *Bitfinex) handleWSTradesUpdate(c *subscription.Subscription, eventType string, d []interface{}) error {
func (b *Bitfinex) handleWSTrades(c *subscription.Subscription, respRaw []byte) error {
if c == nil {
return fmt.Errorf("%w: Subscription param", common.ErrNilPointer)
}
Expand All @@ -882,10 +888,92 @@ func (b *Bitfinex) handleWSTradesUpdate(c *subscription.Subscription, eventType
if c.Asset == asset.MarginFunding {
return nil
}
var tradeHolder []WebsocketTrade
switch len(d) {
case 2:
snapshot, ok := d[1].([]interface{})
_, valueType, _, err := jsonparser.Get(respRaw, "[1]")
if err != nil {
return fmt.Errorf("%w `tradesUpdate[1]`: %w", errParsingWSField, err)
}
// DO NOT COMMIT
var trades []trade.Data
switch valueType {
case jsonparser.String:
return b.handleWSTradesUpdate(c, respRaw)
case jsonparser.Array:
return b.handleWSTradesSnapshot(c, respRaw)
default:
return fmt.Errorf("%w `tradesUpdate[1]`: %w `%s`", errParsingWSField, jsonparser.UnknownValueTypeError, valueType)
}
if err != nil {

Check failure on line 905 in exchanges/bitfinex/bitfinex_websocket.go

View workflow job for this annotation

GitHub Actions / lint

unreachable: unreachable code (govet)
return err
}
/*

Check failure on line 908 in exchanges/bitfinex/bitfinex_websocket.go

View workflow job for this annotation

GitHub Actions / lint

commentedOutCode: may want to remove commented-out code (gocritic)
for i := range tradeHolder {
side := order.Buy
newAmount := tradeHolder[i].Amount
if newAmount < 0 {
side = order.Sell
newAmount *= -1
}
price := tradeHolder[i].Price
if price == 0 && tradeHolder[i].Rate > 0 {
price = tradeHolder[i].Rate
}
trades[i] = trade.Data{
TID: strconv.FormatInt(tradeHolder[i].ID, 10),
CurrencyPair: c.Pairs[0],
Timestamp: time.UnixMilli(tradeHolder[i].Timestamp),
Price: price,
Amount: newAmount,
Exchange: b.Name,
AssetType: c.Asset,
Side: side,
}
}
*/
return b.AddTradesToBuffer(trades...)
}

func (b *Bitfinex) handleWSTradesSnapshotTrade(s *subscription.Subscription, v []byte) error {

Check failure on line 935 in exchanges/bitfinex/bitfinex_websocket.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 's' seems to be unused, consider removing or renaming it as _ (revive)
c := &WebsocketTrade{}
if err := json.Unmarshal(v, &[]any{&c.ID, &c.Timestamp, &c.Amount, &c.Price}); err != nil {
return err
}
spew.Dump(c)

return nil
}

func (b *Bitfinex) handleWSTradesSnapshotFundingTrade(s *subscription.Subscription, v []byte) error {

Check failure on line 945 in exchanges/bitfinex/bitfinex_websocket.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 's' seems to be unused, consider removing or renaming it as _ (revive)
return nil
}

func (b *Bitfinex) handleWSTradesSnapshot(s *subscription.Subscription, respRaw []byte) error {
var errs error
handleTrade := func(v []byte, valueType jsonparser.ValueType, _ int, _ error) {
var err error
if valueType != jsonparser.Array {
err = fmt.Errorf("%w `tradesUpdate[1][*]`: %w `%s`", errParsingWSField, jsonparser.UnknownValueTypeError, valueType)

Check failure on line 954 in exchanges/bitfinex/bitfinex_websocket.go

View workflow job for this annotation

GitHub Actions / lint

ineffectual assignment to err (ineffassign)
} else {
fields := bytes.Count(v, []byte(",")) + 1
switch fields {
case 4:
err = b.handleWSTradesSnapshotTrade(s, v)
case 5:
err = b.handleWSTradesSnapshotFundingTrade(s, v)
default:
err = fmt.Errorf("%w `tradesUpdate[1][*]`: %w `%d`", errParsingWSField, errInvalidWSFieldCount, fields)
}
errs = common.AppendError(errs, err)
}
}

_, err := jsonparser.ArrayEach(respRaw, handleTrade, "[1]")

return common.AppendError(errs, err)
/*

Check failure on line 972 in exchanges/bitfinex/bitfinex_websocket.go

View workflow job for this annotation

GitHub Actions / lint

commentedOutCode: may want to remove commented-out code (gocritic)
var trades []WebsocketTrade
if !ok {
return errors.New("unable to type assert trade snapshot data")
}
Expand Down Expand Up @@ -931,9 +1019,22 @@ func (b *Bitfinex) handleWSTradesUpdate(c *subscription.Subscription, eventType
}
tradeHolder = append(tradeHolder, wsTrade)
}
case 3:
if eventType != wsFundingTradeUpdate && eventType != wsTradeExecutionUpdate {
return fmt.Errorf("unhandled WS trade update event: %s", eventType)
return nil
*/
}

func (b *Bitfinex) handleWSTradesUpdate(c *subscription.Subscription, respRaw []byte) error {

Check failure on line 1026 in exchanges/bitfinex/bitfinex_websocket.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'c' seems to be unused, consider removing or renaming it as _ (revive)
/*
updateType, err := jsonparser.GetString(respRaw, "[1]")
if err != nil {
return fmt.Errorf("%w `tradesUpdate[1]`: %s", errParsingWSField, err)
}
switch updateType {
case wsFundingTradeUpdated, wsTradeUpdated:
return b.handleWSPublicTrade(c, respRaw)
default:
panic("TODO: ", updateType)
return nil, fmt.Errorf("unhandled WS trade update event: %s", updateType)
}
data, ok := d[2].([]interface{})
if !ok {
Expand Down Expand Up @@ -975,33 +1076,9 @@ func (b *Bitfinex) handleWSTradesUpdate(c *subscription.Subscription, eventType
}
wsTrade.Price = price
}
tradeHolder = append(tradeHolder, wsTrade)
}
trades := make([]trade.Data, len(tradeHolder))
for i := range tradeHolder {
side := order.Buy
newAmount := tradeHolder[i].Amount
if newAmount < 0 {
side = order.Sell
newAmount *= -1
}
price := tradeHolder[i].Price
if price == 0 && tradeHolder[i].Rate > 0 {
price = tradeHolder[i].Rate
}
trades[i] = trade.Data{
TID: strconv.FormatInt(tradeHolder[i].ID, 10),
CurrencyPair: c.Pairs[0],
Timestamp: time.UnixMilli(tradeHolder[i].Timestamp),
Price: price,
Amount: newAmount,
Exchange: b.Name,
AssetType: c.Asset,
Side: side,
}
}

return b.AddTradesToBuffer(trades...)
return []trade.Data{t}, nil
*/
return nil
}

func (b *Bitfinex) handleWSNotification(d []interface{}, respRaw []byte) error {
Expand Down Expand Up @@ -1173,7 +1250,7 @@ func (b *Bitfinex) handleWSPositionUpdate(d []interface{}) error {
return nil
}

func (b *Bitfinex) handleWSTradeUpdate(d []interface{}, eventType string) error {
func (b *Bitfinex) handleWSMyTradeUpdate(d []interface{}, eventType string) error {
tradeData, ok := d[2].([]interface{})
if !ok {
return common.GetTypeAssertError("[]interface{}", d[2], "tradeUpdate")
Expand Down
1 change: 1 addition & 0 deletions exchanges/bitfinex/testdata/wsAllTrades.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[18788,[[412685577,1580268444802,11.1998,176.3],[412685575,1580268444802,5,176.29952759],[412685574,1580268374717,1.99069999,176.41],[412685573,1580268374717,1.00930001,176.41],[412685572,1580268358760,0.9907,176.47],[412685571,1580268324362,0.5505,176.44],[412685570,1580268297270,-0.39040819,176.39],[412685568,1580268297270,-0.39780162,176.46475676],[412685567,1580268283470,-0.09,176.41],[412685566,1580268256536,-2.31310783,176.48],[412685565,1580268256536,-0.59669217,176.49],[412685564,1580268256536,-0.9902,176.49],[412685562,1580268194474,0.9902,176.55],[412685561,1580268186215,0.1,176.6],[412685560,1580268185964,-2.17096773,176.5],[412685559,1580268185964,-1.82903227,176.51],[412685558,1580268181215,2.098914,176.53],[412685557,1580268169844,16.7302,176.55],[412685556,1580268169844,3.25,176.54],[412685555,1580268155725,0.23576115,176.45],[412685553,1580268155725,3,176.44596249],[412685552,1580268155725,3.25,176.44],[412685551,1580268155725,5,176.44],[412685550,1580268155725,0.65830078,176.41],[412685549,1580268155725,0.45063807,176.41],[412685548,1580268153825,-0.67604704,176.39],[412685547,1580268145713,2.5883,176.41],[412685543,1580268087513,12.92927,176.33],[412685542,1580268087513,0.40083,176.33],[412685533,1580268005756,-0.17096773,176.32]],1]

0 comments on commit 7f40416

Please sign in to comment.