Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
gbjk committed Dec 12, 2024
1 parent ca62b0a commit 35646d3
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 68 deletions.
14 changes: 8 additions & 6 deletions exchanges/bitfinex/bitfinex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1382,14 +1382,16 @@ func TestWsOrderBook(t *testing.T) {
assert.ErrorIs(t, err, errNoSeqNo, "handleWSBookUpdate should send correct error")
}

func TestWsTradeResponse(t *testing.T) {
func TestWSTrades(t *testing.T) {
t.Parallel()

b := new(Bitfinex) //nolint:govet // Intentional shadow to avoid future copy/paste mistakes
require.NoError(t, testexch.Setup(b), "Test instance Setup must not error")
b.SetSaveTradeDataStatus(true)
err := b.Websocket.AddSubscriptions(b.Websocket.Conn, &subscription.Subscription{Asset: asset.Spot, Pairs: currency.Pairs{btcusdPair}, Channel: subscription.AllTradesChannel, Key: 18788})
require.NoError(t, err, "AddSubscriptions must not error")
pressXToJSON := `[18788,[[412685577,1580268444802,11.1998,176.3],[412685575,1580268444802,5,176.29952759],[412685574,1580268374717,1.99069999,176.41],[412685573,1580268374717,1.00930001,176.41],[412685572,1580268358760,0.9907,176.47],[412685571,1580268324362,0.5505,176.44],[412685570,1580268297270,-0.39040819,176.39],[412685568,1580268297270,-0.39780162,176.46475676],[412685567,1580268283470,-0.09,176.41],[412685566,1580268256536,-2.31310783,176.48],[412685565,1580268256536,-0.59669217,176.49],[412685564,1580268256536,-0.9902,176.49],[412685562,1580268194474,0.9902,176.55],[412685561,1580268186215,0.1,176.6],[412685560,1580268185964,-2.17096773,176.5],[412685559,1580268185964,-1.82903227,176.51],[412685558,1580268181215,2.098914,176.53],[412685557,1580268169844,16.7302,176.55],[412685556,1580268169844,3.25,176.54],[412685555,1580268155725,0.23576115,176.45],[412685553,1580268155725,3,176.44596249],[412685552,1580268155725,3.25,176.44],[412685551,1580268155725,5,176.44],[412685550,1580268155725,0.65830078,176.41],[412685549,1580268155725,0.45063807,176.41],[412685548,1580268153825,-0.67604704,176.39],[412685547,1580268145713,2.5883,176.41],[412685543,1580268087513,12.92927,176.33],[412685542,1580268087513,0.40083,176.33],[412685533,1580268005756,-0.17096773,176.32]]]`
err = b.wsHandleData([]byte(pressXToJSON))
if err != nil {
t.Error(err)
}
testexch.FixtureToDataHandler(t, "testdata/wsAllTrades.json", b.wsHandleData)
close(b.Websocket.DataHandler)
}

func TestWsTickerResponse(t *testing.T) {
Expand Down
16 changes: 6 additions & 10 deletions exchanges/bitfinex/bitfinex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ var (
errSetCannotBeEmpty = errors.New("set cannot be empty")
errNoSeqNo = errors.New("no sequence number")
errParamNotAllowed = errors.New("param not allowed")
errParsingWSField = errors.New("error parsing WS field")
errTickerInvalidSymbol = errors.New("invalid ticker symbol")
errTickerInvalidResp = errors.New("invalid ticker response format")
errTickerInvalidFieldCount = errors.New("invalid ticker response field count")
Expand Down Expand Up @@ -488,16 +487,13 @@ type WebsocketBook struct {
Period int64
}

// WebsocketTrade holds trade information
type WebsocketTrade struct {
// WsTrade holds trade information
type WsTrade struct {
ID int64
Timestamp int64
Price float64
Amount float64
// Funding rate of the trade
Rate float64
// Funding offer period in days
Period int64
Price float64
Period int64 // Funding offer period in days
}

// Candle holds OHLC data
Expand Down Expand Up @@ -625,7 +621,7 @@ const (
wsPositionClose = "pc"
wsWalletSnapshot = "ws"
wsWalletUpdate = "wu"
wsTradeExecutionUpdate = "tu"
wsTradeUpdated = "tu"
wsTradeExecuted = "te"
wsFundingCreditSnapshot = "fcs"
wsFundingCreditNew = "fcn"
Expand All @@ -636,7 +632,7 @@ const (
wsFundingLoanUpdate = "flu"
wsFundingLoanCancel = "flc"
wsFundingTradeExecuted = "fte"
wsFundingTradeUpdate = "ftu"
wsFundingTradeUpdated = "ftu"
wsFundingInfoUpdate = "fiu"
wsBalanceUpdate = "bu"
wsMarginInfoUpdate = "miu"
Expand Down
162 changes: 110 additions & 52 deletions exchanges/bitfinex/bitfinex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ import (
"github.com/thrasher-corp/gocryptotrader/log"
)

var (
errParsingWSField = errors.New("error parsing WS field")
errInvalidWSFieldCount = errors.New("invalid WS field count")
)

var defaultSubscriptions = subscription.List{
{Enabled: true, Channel: subscription.TickerChannel, Asset: asset.All},
{Enabled: true, Channel: subscription.AllTradesChannel, Asset: asset.All},
Expand Down Expand Up @@ -162,8 +167,8 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error {
eventType, hasEventType := d[1].(string)

if chanID != 0 {
if c := b.Websocket.GetSubscription(chanID); c != nil {
return b.handleWSChannelUpdate(c, eventType, d)
if s := b.Websocket.GetSubscription(chanID); s != nil {
return b.handleWSChannelUpdate(s, respRaw, eventType, d)
}
if b.Verbose {
log.Warnf(log.ExchangeSys, "%s %s; dropped WS message: %s", b.Name, subscription.ErrNotFound, respRaw)
Expand Down Expand Up @@ -201,8 +206,8 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error {
return b.handleWSPositionSnapshot(d)
case wsPositionNew, wsPositionUpdate, wsPositionClose:
return b.handleWSPositionUpdate(d)
case wsTradeExecuted, wsTradeExecutionUpdate:
return b.handleWSTradeUpdate(d, eventType)
case wsTradeExecuted, wsTradeUpdated:
return b.handleWSMyTradeUpdate(d, eventType)
case wsFundingOfferSnapshot:
if snapBundle, ok := d[2].([]interface{}); ok && len(snapBundle) > 0 {
if _, ok := snapBundle[0].([]interface{}); ok {
Expand Down Expand Up @@ -398,7 +403,7 @@ func (b *Bitfinex) wsHandleData(respRaw []byte) error {
b.Websocket.DataHandler <- fundingInfo
}
}
case wsFundingTradeExecuted, wsFundingTradeUpdate:
case wsFundingTradeExecuted, wsFundingTradeUpdated:
if data, ok := d[2].([]interface{}); ok && len(data) > 0 {
var wsFundingTrade WsFundingTrade
tradeID, ok := data[0].(float64)
Expand Down Expand Up @@ -544,16 +549,15 @@ func (b *Bitfinex) handleWSSubscribed(respRaw []byte) error {
return nil
}

func (b *Bitfinex) handleWSChannelUpdate(s *subscription.Subscription, eventType string, d []interface{}) error {
func (b *Bitfinex) handleWSChannelUpdate(s *subscription.Subscription, respRaw []byte, eventType string, d []interface{}) error {
if s == nil {
return fmt.Errorf("%w: Subscription param", common.ErrNilPointer)
}

if eventType == wsChecksum {
switch eventType {
case wsChecksum:
return b.handleWSChecksum(s, d)
}

if eventType == wsHeartbeat {
case wsHeartbeat:
return nil
}

Expand All @@ -569,7 +573,7 @@ func (b *Bitfinex) handleWSChannelUpdate(s *subscription.Subscription, eventType
case subscription.TickerChannel:
return b.handleWSTickerUpdate(s, d)
case subscription.AllTradesChannel:
return b.handleWSTradesUpdate(s, eventType, d)
return b.handleWSTrades(s, respRaw)
}

return fmt.Errorf("%s unhandled channel update: %s", b.Name, s.Channel)
Expand Down Expand Up @@ -869,23 +873,88 @@ func (b *Bitfinex) handleWSTickerUpdate(c *subscription.Subscription, d []interf
return nil
}

func (b *Bitfinex) handleWSTradesUpdate(c *subscription.Subscription, eventType string, d []interface{}) error {
if c == nil {
func (b *Bitfinex) handleWSTrades(s *subscription.Subscription, respRaw []byte) error {
if s == nil {
return fmt.Errorf("%w: Subscription param", common.ErrNilPointer)
}
if len(c.Pairs) != 1 {
if len(s.Pairs) != 1 {
return subscription.ErrNotSinglePair
}
if !b.IsSaveTradeDataEnabled() {
return nil
}
if c.Asset == asset.MarginFunding {
if s.Asset == asset.MarginFunding {
return nil
}
var tradeHolder []WebsocketTrade
switch len(d) {
case 2:
snapshot, ok := d[1].([]interface{})
_, valueType, _, err := jsonparser.Get(respRaw, "[1]")
if err != nil {
return fmt.Errorf("%w `tradesUpdate[1]`: %w", errParsingWSField, err)
}
// DO NOT COMMIT
var wsTrades []*WsTrade
switch valueType {
case jsonparser.String:
wsTrades, err = b.handleWSTradesUpdate(respRaw)
case jsonparser.Array:
wsTrades, err = b.handleWSTradesSnapshot(respRaw)
default:
return fmt.Errorf("%w `tradesUpdate[1]`: %w `%s`", errParsingWSField, jsonparser.UnknownValueTypeError, valueType)
}
if err != nil {
return err
}
trades := make([]trade.Data, len(wsTrades))
for i, w := range wsTrades {
t := trade.Data{
Exchange: b.Name,
AssetType: s.Asset,
CurrencyPair: s.Pairs[0],
TID: strconv.FormatInt(w.ID, 10),
Timestamp: time.UnixMilli(w.Timestamp),
Side: order.Buy,
Amount: w.Amount,
Price: w.Price,
}
if t.Amount < 0 {
t.Side = order.Sell
t.Amount *= -1
}

trades[i] = t
}
return b.AddTradesToBuffer(trades...)
}

func (b *Bitfinex) handleWSTradesSnapshotFundingTrade(s *subscription.Subscription, v []byte) error {

Check failure on line 928 in exchanges/bitfinex/bitfinex_websocket.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 's' seems to be unused, consider removing or renaming it as _ (revive)
return nil
}

func (b *Bitfinex) handleWSTradesSnapshot(respRaw []byte) (trades []*WsTrade, errs error) {
handleTrade := func(v []byte, valueType jsonparser.ValueType, _ int, _ error) {
if valueType != jsonparser.Array {
errs = common.AppendError(errs, fmt.Errorf("%w `tradesUpdate[1][*]`: %w `%s`", errParsingWSField, jsonparser.UnknownValueTypeError, valueType))
} else {
// Trade updates have 4 fields with just price; Funding updates have 5 fields, with rate and period
// In either case we can't use the period and we're putting rate into price eventually anyway
t := &WsTrade{}
if err := json.Unmarshal(v, &[]any{&t.ID, &t.Timestamp, &t.Amount, &t.Price}); err != nil {
errs = common.AppendError(errs, fmt.Errorf("%w `tradesUpdate[1][*]`: %w", errParsingWSField, err))
} else {
trades = append(trades, t)
}
}
}

_, err := jsonparser.ArrayEach(respRaw, handleTrade, "[1]")

common.AppendError(errs, err)

Check failure on line 950 in exchanges/bitfinex/bitfinex_websocket.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `common.AppendError` is not checked (errcheck)

return
/*

Check failure on line 953 in exchanges/bitfinex/bitfinex_websocket.go

View workflow job for this annotation

GitHub Actions / lint

commentedOutCode: may want to remove commented-out code (gocritic)
var trades []WsTrade
if !ok {
return errors.New("unable to type assert trade snapshot data")
}
Expand All @@ -906,7 +975,7 @@ func (b *Bitfinex) handleWSTradesUpdate(c *subscription.Subscription, eventType
if !ok {
return errors.New("unable to type assert trade amount")
}
wsTrade := WebsocketTrade{
wsTrade := WsTrade{
ID: int64(tradeID),
Timestamp: int64(timestamp),
Amount: amount,
Expand All @@ -931,9 +1000,22 @@ func (b *Bitfinex) handleWSTradesUpdate(c *subscription.Subscription, eventType
}
tradeHolder = append(tradeHolder, wsTrade)
}
case 3:
if eventType != wsFundingTradeUpdate && eventType != wsTradeExecutionUpdate {
return fmt.Errorf("unhandled WS trade update event: %s", eventType)
return nil
*/
}

func (b *Bitfinex) handleWSTradesUpdate(respRaw []byte) ([]*WsTrade, error) {

Check failure on line 1007 in exchanges/bitfinex/bitfinex_websocket.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'respRaw' seems to be unused, consider removing or renaming it as _ (revive)
/*
updateType, err := jsonparser.GetString(respRaw, "[1]")
if err != nil {
return fmt.Errorf("%w `tradesUpdate[1]`: %s", errParsingWSField, err)
}
switch updateType {
case wsFundingTradeUpdated, wsTradeUpdated:
return b.handleWSPublicTrade(c, respRaw)
default:
panic("TODO: ", updateType)
return nil, fmt.Errorf("unhandled WS trade update event: %s", updateType)
}
data, ok := d[2].([]interface{})
if !ok {
Expand All @@ -952,7 +1034,7 @@ func (b *Bitfinex) handleWSTradesUpdate(c *subscription.Subscription, eventType
if !ok {
return errors.New("unable to type assert trade amount")
}
wsTrade := WebsocketTrade{
wsTrade := WsTrade{
ID: int64(tradeID),
Timestamp: int64(timestamp),
Amount: amount,
Expand All @@ -975,33 +1057,9 @@ func (b *Bitfinex) handleWSTradesUpdate(c *subscription.Subscription, eventType
}
wsTrade.Price = price
}
tradeHolder = append(tradeHolder, wsTrade)
}
trades := make([]trade.Data, len(tradeHolder))
for i := range tradeHolder {
side := order.Buy
newAmount := tradeHolder[i].Amount
if newAmount < 0 {
side = order.Sell
newAmount *= -1
}
price := tradeHolder[i].Price
if price == 0 && tradeHolder[i].Rate > 0 {
price = tradeHolder[i].Rate
}
trades[i] = trade.Data{
TID: strconv.FormatInt(tradeHolder[i].ID, 10),
CurrencyPair: c.Pairs[0],
Timestamp: time.UnixMilli(tradeHolder[i].Timestamp),
Price: price,
Amount: newAmount,
Exchange: b.Name,
AssetType: c.Asset,
Side: side,
}
}

return b.AddTradesToBuffer(trades...)
return []trade.Data{t}, nil
*/
return nil, nil
}

func (b *Bitfinex) handleWSNotification(d []interface{}, respRaw []byte) error {
Expand Down Expand Up @@ -1173,7 +1231,7 @@ func (b *Bitfinex) handleWSPositionUpdate(d []interface{}) error {
return nil
}

func (b *Bitfinex) handleWSTradeUpdate(d []interface{}, eventType string) error {
func (b *Bitfinex) handleWSMyTradeUpdate(d []interface{}, eventType string) error {
tradeData, ok := d[2].([]interface{})
if !ok {
return common.GetTypeAssertError("[]interface{}", d[2], "tradeUpdate")
Expand Down
1 change: 1 addition & 0 deletions exchanges/bitfinex/testdata/wsAllTrades.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[18788,[[412685577,1580268444802,11.1998,176.3],[412685575,1580268444802,5,176.29952759],[412685574,1580268374717,1.99069999,176.41],[412685573,1580268374717,1.00930001,176.41],[412685572,1580268358760,0.9907,176.47],[412685571,1580268324362,0.5505,176.44],[412685570,1580268297270,-0.39040819,176.39],[412685568,1580268297270,-0.39780162,176.46475676],[412685567,1580268283470,-0.09,176.41],[412685566,1580268256536,-2.31310783,176.48],[412685565,1580268256536,-0.59669217,176.49],[412685564,1580268256536,-0.9902,176.49],[412685562,1580268194474,0.9902,176.55],[412685561,1580268186215,0.1,176.6],[412685560,1580268185964,-2.17096773,176.5],[412685559,1580268185964,-1.82903227,176.51],[412685558,1580268181215,2.098914,176.53],[412685557,1580268169844,16.7302,176.55],[412685556,1580268169844,3.25,176.54],[412685555,1580268155725,0.23576115,176.45],[412685553,1580268155725,3,176.44596249],[412685552,1580268155725,3.25,176.44],[412685551,1580268155725,5,176.44],[412685550,1580268155725,0.65830078,176.41],[412685549,1580268155725,0.45063807,176.41],[412685548,1580268153825,-0.67604704,176.39],[412685547,1580268145713,2.5883,176.41],[412685543,1580268087513,12.92927,176.33],[412685542,1580268087513,0.40083,176.33],[412685533,1580268005756,-0.17096773,176.32]],1]

0 comments on commit 35646d3

Please sign in to comment.