From 44de487f177e39780d16619b8938b3e9e592176c Mon Sep 17 00:00:00 2001 From: tibrn Date: Sun, 18 Jul 2021 16:35:52 +0300 Subject: [PATCH 01/16] add GetOrderBook --- plugins/binanceExchange_ws.go | 195 ++++++++++++++++++++++++++--- plugins/binanceExchange_ws_test.go | 36 ++++++ 2 files changed, 212 insertions(+), 19 deletions(-) diff --git a/plugins/binanceExchange_ws.go b/plugins/binanceExchange_ws.go index 2c4566431..fc72f05c6 100644 --- a/plugins/binanceExchange_ws.go +++ b/plugins/binanceExchange_ws.go @@ -9,12 +9,14 @@ import ( "time" "github.com/adshao/go-binance/v2" + "github.com/adshao/go-binance/v2/common" "github.com/stellar/kelp/api" "github.com/stellar/kelp/model" ) const ( STREAM_TICKER_FMT = "%s@ticker" + STREAM_BOOK_FMT = "%s@depth" TTLTIME = time.Second * 3 // ttl time in seconds ) @@ -23,9 +25,11 @@ var ( ) var ( - ErrConversionWsMarketEvent = errConversion{from: "interface", to: "*binance.WsMarketStatEvent"} + ErrConversionWsMarketEvent = errConversion{from: "interface", to: "*binance.WsMarketStatEvent"} + ErrConversionWsPartialDepthEvent = errConversion{from: "interface", to: "*binance.WsPartialDepthEvent"} ) +type Subscriber func(symbol string, state *mapEvents) (*stream, error) type errMissingSymbol struct { symbol string } @@ -135,17 +139,21 @@ func makeMapEvents() *mapEvents { //struct used to keep all cached data type events struct { SymbolStats *mapEvents + BookStats *mapEvents } func createStateEvents() *events { events := &events{ SymbolStats: makeMapEvents(), + BookStats: makeMapEvents(), } return events } -// subscribe for symbol@ticker +// 24hr rolling window ticker statistics for a single symbol. These are NOT the statistics of the UTC day, but a 24hr rolling window for the previous 24hrs. +// Stream Name: @ticker +// Update Speed: 1000ms func subcribeTicker(symbol string, state *mapEvents) (*stream, error) { wsMarketStatHandler := func(ticker *binance.WsMarketStatEvent) { @@ -162,6 +170,49 @@ func subcribeTicker(symbol string, state *mapEvents) (*stream, error) { return nil, err } + keepConnection(doneC, func() { + subcribeTicker(symbol, state) + }) + + return &stream{doneC: doneC, stopC: stopC, cleanup: func() { + state.Del(symbol) + }}, err + +} + +//restart Connection with ws// Binance close each connection after 24 hours +func keepConnection(doneC chan struct{}, reconnect func()) { + + go func() { + <-doneC + reconnect() + }() +} + +// Top bids and asks, pushed every second. Valid are 5, 10, or 20. +// @depth@100ms +// 100ms +func subcribeBook(symbol string, state *mapEvents) (*stream, error) { + + wsPartialDepthHandler := func(event *binance.WsPartialDepthEvent) { + state.Set(symbol, event) + } + + errHandler := func(err error) { + log.Printf("Error WsPartialDepthServe for symbol %s: %v\n", symbol, err) + } + + //Subscribe to highest level + doneC, stopC, err := binance.WsPartialDepthServe100Ms(symbol, "20", wsPartialDepthHandler, errHandler) + + if err != nil { + return nil, err + } + + keepConnection(doneC, func() { + subcribeBook(symbol, state) + }) + return &stream{doneC: doneC, stopC: stopC, cleanup: func() { state.Del(symbol) }}, err @@ -209,6 +260,32 @@ func getPrecision(floatStr string) int8 { return int8(len(strs[1])) } +func (beWs *binanceExchangeWs) subscribeStream(symbol, format string, subscribe Subscriber) (mapData, error) { + + stream, err := subscribe(symbol, beWs.events.SymbolStats) + + if err != nil { + return mapData{}, fmt.Errorf("error when subscribing for %s: %s", symbol, err) + } + + //Store stream + beWs.streamLock.Lock() + beWs.streams[fmt.Sprintf(format, symbol)] = stream + beWs.streamLock.Unlock() + + //Wait for binance to send events + time.Sleep(timeToWaitForFirstEvent) + + data, isStream := beWs.events.SymbolStats.Get(symbol) + + //We couldn't subscribe for this pair + if !isStream { + return mapData{}, fmt.Errorf("error while fetching ticker price for trading pair %s", symbol) + } + + return data, nil +} + // GetTickerPrice impl. func (beWs *binanceExchangeWs) GetTickerPrice(pairs []model.TradingPair) (map[model.TradingPair]api.Ticker, error) { @@ -224,25 +301,10 @@ func (beWs *binanceExchangeWs) GetTickerPrice(pairs []model.TradingPair) (map[mo tickerData, isTicker := beWs.events.SymbolStats.Get(symbol) if !isTicker { - stream, err := subcribeTicker(symbol, beWs.events.SymbolStats) + tickerData, err = beWs.subscribeStream(symbol, STREAM_TICKER_FMT, subcribeTicker) if err != nil { - return nil, fmt.Errorf("error when subscribing for %s: %s", symbol, err) - } - - //Store stream - beWs.streamLock.Lock() - beWs.streams[fmt.Sprintf(STREAM_TICKER_FMT, symbol)] = stream - beWs.streamLock.Unlock() - - //Wait for binance to send events - time.Sleep(timeToWaitForFirstEvent) - - tickerData, isTicker = beWs.events.SymbolStats.Get(symbol) - - //We couldn't subscribe for this pair - if !isTicker { - return nil, fmt.Errorf("error while fetching ticker price for trading pair %s", symbol) + return nil, err } } @@ -286,6 +348,101 @@ func (beWs *binanceExchangeWs) GetTickerPrice(pairs []model.TradingPair) (map[mo return priceResult, nil } +func (beWs *binanceExchangeWs) GetOrderBook(pair *model.TradingPair, maxCount int32) (*model.OrderBook, error) { + maxCountInt := int(maxCount) + fetchLimit := maxCountInt + + if fetchLimit > 20 { + fetchLimit = 20 + } + + symbol, err := pair.ToString(beWs.assetConverter, beWs.delimiter) + if err != nil { + return nil, fmt.Errorf("error converting pair to string: %s", err) + } + + bookData, isBook := beWs.events.BookStats.Get(symbol) + + if !isBook { + + bookData, err = beWs.subscribeStream(symbol, STREAM_BOOK_FMT, subcribeBook) + + if err != nil { + return nil, err + } + + } + + //Show how old is the ticker + log.Printf("Ticker for %s is %d milliseconds old!\n", symbol, time.Now().Sub(bookData.createdAt).Milliseconds()) + + if isStale(bookData, TTLTIME) { + return nil, fmt.Errorf("ticker for %s symbols is older than %v", symbol, TTLTIME) + } + + bookI := bookData.data + + //Convert to WsMarketStatEvent + book, isOk := bookI.(*binance.WsPartialDepthEvent) + + if !isOk { + return nil, ErrConversionWsPartialDepthEvent + } + + askCcxtOrders := book.Asks + bidCcxtOrders := book.Bids + + if fetchLimit != maxCountInt { + // we may not have fetched all the requested levels because the exchange may not have had that many levels in depth + if len(askCcxtOrders) > maxCountInt { + askCcxtOrders = askCcxtOrders[:maxCountInt] + } + if len(bidCcxtOrders) > maxCountInt { + bidCcxtOrders = bidCcxtOrders[:maxCountInt] + } + } + + asks, err := beWs.readOrders(askCcxtOrders, pair, model.OrderActionSell) + + if err != nil { + return nil, err + } + + bids, err := beWs.readOrders(bidCcxtOrders, pair, model.OrderActionBuy) + + if err != nil { + return nil, err + } + + return model.MakeOrderBook(pair, asks, bids), nil +} + +func (beWs *binanceExchangeWs) readOrders(orders []common.PriceLevel, pair *model.TradingPair, orderAction model.OrderAction) ([]model.Order, error) { + + pricePrecision := getPrecision(orders[0].Price) + volumePrecision := getPrecision(orders[0].Quantity) + + result := []model.Order{} + for _, o := range orders { + + price, quantity, err := o.Parse() + + if err != nil { + return nil, err + } + + result = append(result, model.Order{ + Pair: pair, + OrderAction: orderAction, + OrderType: model.OrderTypeLimit, + Price: model.NumberFromFloat(price, pricePrecision), + Volume: model.NumberFromFloat(quantity, volumePrecision), + Timestamp: nil, + }) + } + return result, nil +} + //Unsubscribe ... unsubscribe from binance streams func (beWs *binanceExchangeWs) Unsubscribe(stream string) { diff --git a/plugins/binanceExchange_ws_test.go b/plugins/binanceExchange_ws_test.go index fa909629b..f6a5cf1ea 100644 --- a/plugins/binanceExchange_ws_test.go +++ b/plugins/binanceExchange_ws_test.go @@ -48,3 +48,39 @@ func Test_binanceExchangeWs_GetTickerPrice(t *testing.T) { return } } + +func Test_binanceExchangeWs_GetOrderBook(t *testing.T) { + if testing.Short() { + return + } + + testBinanceExchangeWs, e := makeBinanceWs() + if !assert.NoError(t, e) { + return + } + + for _, obDepth := range []int32{1, 5, 8, 10, 15, 16, 20} { + + pair := model.TradingPair{Base: model.XLM, Quote: model.BTC} + ob, e := testBinanceExchangeWs.GetOrderBook(&pair, obDepth) + if !assert.NoError(t, e) { + return + } + assert.Equal(t, ob.Pair(), &pair) + + if !assert.True(t, len(ob.Asks()) > 0, len(ob.Asks())) { + return + } + if !assert.True(t, len(ob.Bids()) > 0, len(ob.Bids())) { + return + } + assert.True(t, ob.Asks()[0].OrderAction.IsSell()) + assert.True(t, ob.Asks()[0].OrderType.IsLimit()) + assert.True(t, ob.Bids()[0].OrderAction.IsBuy()) + assert.True(t, ob.Bids()[0].OrderType.IsLimit()) + assert.True(t, ob.Asks()[0].Price.AsFloat() > 0) + assert.True(t, ob.Asks()[0].Volume.AsFloat() > 0) + assert.True(t, ob.Bids()[0].Price.AsFloat() > 0) + assert.True(t, ob.Bids()[0].Volume.AsFloat() > 0) + } +} From 2dfbd2d93c8f8538ce34bca491ab1b8f2079fc95 Mon Sep 17 00:00:00 2001 From: tibrn Date: Sun, 18 Jul 2021 16:37:38 +0300 Subject: [PATCH 02/16] add more comments --- plugins/binanceExchange_ws.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/plugins/binanceExchange_ws.go b/plugins/binanceExchange_ws.go index fc72f05c6..0b4e53e69 100644 --- a/plugins/binanceExchange_ws.go +++ b/plugins/binanceExchange_ws.go @@ -260,6 +260,7 @@ func getPrecision(floatStr string) int8 { return int8(len(strs[1])) } +//subscribeStream and wait for the first event func (beWs *binanceExchangeWs) subscribeStream(symbol, format string, subscribe Subscriber) (mapData, error) { stream, err := subscribe(symbol, beWs.events.SymbolStats) @@ -348,9 +349,13 @@ func (beWs *binanceExchangeWs) GetTickerPrice(pairs []model.TradingPair) (map[mo return priceResult, nil } +//GetOrderBook impl func (beWs *binanceExchangeWs) GetOrderBook(pair *model.TradingPair, maxCount int32) (*model.OrderBook, error) { - maxCountInt := int(maxCount) - fetchLimit := maxCountInt + + var ( + maxCountInt = int(maxCount) + fetchLimit = maxCountInt + ) if fetchLimit > 20 { fetchLimit = 20 @@ -417,6 +422,7 @@ func (beWs *binanceExchangeWs) GetOrderBook(pair *model.TradingPair, maxCount in return model.MakeOrderBook(pair, asks, bids), nil } +//readOrders... transform orders from binance to model.Order func (beWs *binanceExchangeWs) readOrders(orders []common.PriceLevel, pair *model.TradingPair, orderAction model.OrderAction) ([]model.Order, error) { pricePrecision := getPrecision(orders[0].Price) From 1a5cfb3501786c96768fba7e74d23cb202c9fa17 Mon Sep 17 00:00:00 2001 From: tibrn Date: Sun, 18 Jul 2021 16:39:03 +0300 Subject: [PATCH 03/16] move comment on next line --- plugins/binanceExchange_ws.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugins/binanceExchange_ws.go b/plugins/binanceExchange_ws.go index 0b4e53e69..e97b5a402 100644 --- a/plugins/binanceExchange_ws.go +++ b/plugins/binanceExchange_ws.go @@ -180,7 +180,8 @@ func subcribeTicker(symbol string, state *mapEvents) (*stream, error) { } -//restart Connection with ws// Binance close each connection after 24 hours +//restart Connection with ws +// Binance close each connection after 24 hours func keepConnection(doneC chan struct{}, reconnect func()) { go func() { From af75622645f8b1ac807099abd049836c799e4194 Mon Sep 17 00:00:00 2001 From: tibrn Date: Sun, 18 Jul 2021 18:44:28 +0300 Subject: [PATCH 04/16] patch/resolve comments --- plugins/binanceExchange_ws.go | 24 +++++++----------------- plugins/binanceExchange_ws_test.go | 3 --- 2 files changed, 7 insertions(+), 20 deletions(-) diff --git a/plugins/binanceExchange_ws.go b/plugins/binanceExchange_ws.go index e97b5a402..08b5e9f2b 100644 --- a/plugins/binanceExchange_ws.go +++ b/plugins/binanceExchange_ws.go @@ -262,9 +262,9 @@ func getPrecision(floatStr string) int8 { } //subscribeStream and wait for the first event -func (beWs *binanceExchangeWs) subscribeStream(symbol, format string, subscribe Subscriber) (mapData, error) { +func (beWs *binanceExchangeWs) subscribeStream(symbol, format string, subscribe Subscriber, state *mapEvents) (mapData, error) { - stream, err := subscribe(symbol, beWs.events.SymbolStats) + stream, err := subscribe(symbol, state) if err != nil { return mapData{}, fmt.Errorf("error when subscribing for %s: %s", symbol, err) @@ -303,7 +303,7 @@ func (beWs *binanceExchangeWs) GetTickerPrice(pairs []model.TradingPair) (map[mo tickerData, isTicker := beWs.events.SymbolStats.Get(symbol) if !isTicker { - tickerData, err = beWs.subscribeStream(symbol, STREAM_TICKER_FMT, subcribeTicker) + tickerData, err = beWs.subscribeStream(symbol, STREAM_TICKER_FMT, subcribeTicker, beWs.events.SymbolStats) if err != nil { return nil, err @@ -359,7 +359,7 @@ func (beWs *binanceExchangeWs) GetOrderBook(pair *model.TradingPair, maxCount in ) if fetchLimit > 20 { - fetchLimit = 20 + return nil, fmt.Errorf("Max supported depth level is 20") } symbol, err := pair.ToString(beWs.assetConverter, beWs.delimiter) @@ -371,7 +371,7 @@ func (beWs *binanceExchangeWs) GetOrderBook(pair *model.TradingPair, maxCount in if !isBook { - bookData, err = beWs.subscribeStream(symbol, STREAM_BOOK_FMT, subcribeBook) + bookData, err = beWs.subscribeStream(symbol, STREAM_BOOK_FMT, subcribeBook, beWs.events.BookStats) if err != nil { return nil, err @@ -379,8 +379,8 @@ func (beWs *binanceExchangeWs) GetOrderBook(pair *model.TradingPair, maxCount in } - //Show how old is the ticker - log.Printf("Ticker for %s is %d milliseconds old!\n", symbol, time.Now().Sub(bookData.createdAt).Milliseconds()) + //Show how old is the orderbook + log.Printf("OrderBook for %s is %d milliseconds old!\n", symbol, time.Now().Sub(bookData.createdAt).Milliseconds()) if isStale(bookData, TTLTIME) { return nil, fmt.Errorf("ticker for %s symbols is older than %v", symbol, TTLTIME) @@ -398,16 +398,6 @@ func (beWs *binanceExchangeWs) GetOrderBook(pair *model.TradingPair, maxCount in askCcxtOrders := book.Asks bidCcxtOrders := book.Bids - if fetchLimit != maxCountInt { - // we may not have fetched all the requested levels because the exchange may not have had that many levels in depth - if len(askCcxtOrders) > maxCountInt { - askCcxtOrders = askCcxtOrders[:maxCountInt] - } - if len(bidCcxtOrders) > maxCountInt { - bidCcxtOrders = bidCcxtOrders[:maxCountInt] - } - } - asks, err := beWs.readOrders(askCcxtOrders, pair, model.OrderActionSell) if err != nil { diff --git a/plugins/binanceExchange_ws_test.go b/plugins/binanceExchange_ws_test.go index f6a5cf1ea..58e7f5d43 100644 --- a/plugins/binanceExchange_ws_test.go +++ b/plugins/binanceExchange_ws_test.go @@ -50,9 +50,6 @@ func Test_binanceExchangeWs_GetTickerPrice(t *testing.T) { } func Test_binanceExchangeWs_GetOrderBook(t *testing.T) { - if testing.Short() { - return - } testBinanceExchangeWs, e := makeBinanceWs() if !assert.NoError(t, e) { From 9518463729119ef6815063e68dcca5197408734f Mon Sep 17 00:00:00 2001 From: tibrn Date: Sun, 18 Jul 2021 19:16:19 +0300 Subject: [PATCH 05/16] patch/ fixes --- plugins/binanceExchange_ws.go | 13 +++++++++---- plugins/binanceExchange_ws_test.go | 8 ++++++++ 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/plugins/binanceExchange_ws.go b/plugins/binanceExchange_ws.go index 08b5e9f2b..8878058bf 100644 --- a/plugins/binanceExchange_ws.go +++ b/plugins/binanceExchange_ws.go @@ -278,11 +278,11 @@ func (beWs *binanceExchangeWs) subscribeStream(symbol, format string, subscribe //Wait for binance to send events time.Sleep(timeToWaitForFirstEvent) - data, isStream := beWs.events.SymbolStats.Get(symbol) + data, isStream := state.Get(symbol) //We couldn't subscribe for this pair if !isStream { - return mapData{}, fmt.Errorf("error while fetching ticker price for trading pair %s", symbol) + return mapData{}, fmt.Errorf("error while subscribing for %s", fmt.Sprintf(format, symbol)) } return data, nil @@ -354,8 +354,7 @@ func (beWs *binanceExchangeWs) GetTickerPrice(pairs []model.TradingPair) (map[mo func (beWs *binanceExchangeWs) GetOrderBook(pair *model.TradingPair, maxCount int32) (*model.OrderBook, error) { var ( - maxCountInt = int(maxCount) - fetchLimit = maxCountInt + fetchLimit = int(maxCount) ) if fetchLimit > 20 { @@ -398,6 +397,12 @@ func (beWs *binanceExchangeWs) GetOrderBook(pair *model.TradingPair, maxCount in askCcxtOrders := book.Asks bidCcxtOrders := book.Bids + if fetchLimit != 20 { + // we may not have fetched all the requested levels because the exchange may not have had that many levels in depth + askCcxtOrders = askCcxtOrders[:fetchLimit] + bidCcxtOrders = bidCcxtOrders[:fetchLimit] + } + asks, err := beWs.readOrders(askCcxtOrders, pair, model.OrderActionSell) if err != nil { diff --git a/plugins/binanceExchange_ws_test.go b/plugins/binanceExchange_ws_test.go index 58e7f5d43..4f6133468 100644 --- a/plugins/binanceExchange_ws_test.go +++ b/plugins/binanceExchange_ws_test.go @@ -71,6 +71,14 @@ func Test_binanceExchangeWs_GetOrderBook(t *testing.T) { if !assert.True(t, len(ob.Bids()) > 0, len(ob.Bids())) { return } + + if !assert.True(t, len(ob.Asks()) <= int(obDepth), fmt.Sprintf("asks should be <= %d", obDepth)) { + return + } + if !assert.True(t, len(ob.Bids()) <= int(obDepth), fmt.Sprintf("bids should be <= %d", obDepth)) { + return + } + assert.True(t, ob.Asks()[0].OrderAction.IsSell()) assert.True(t, ob.Asks()[0].OrderType.IsLimit()) assert.True(t, ob.Bids()[0].OrderAction.IsBuy()) From f4e975a803f479c4b9691ba65ce3831d7e1df581 Mon Sep 17 00:00:00 2001 From: tibrn Date: Sun, 18 Jul 2021 19:33:56 +0300 Subject: [PATCH 06/16] patch/ increase waitTime --- plugins/binanceExchange_ws.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/plugins/binanceExchange_ws.go b/plugins/binanceExchange_ws.go index 8878058bf..5ce79db49 100644 --- a/plugins/binanceExchange_ws.go +++ b/plugins/binanceExchange_ws.go @@ -21,7 +21,7 @@ const ( ) var ( - timeToWaitForFirstEvent = time.Second + timeToWaitForFirstEvent = time.Second * 2 ) var ( @@ -266,13 +266,15 @@ func (beWs *binanceExchangeWs) subscribeStream(symbol, format string, subscribe stream, err := subscribe(symbol, state) + streamName := fmt.Sprintf(format, symbol) + if err != nil { - return mapData{}, fmt.Errorf("error when subscribing for %s: %s", symbol, err) + return mapData{}, fmt.Errorf("error when subscribing for %s: %s", streamName, err) } //Store stream beWs.streamLock.Lock() - beWs.streams[fmt.Sprintf(format, symbol)] = stream + beWs.streams[streamName] = stream beWs.streamLock.Unlock() //Wait for binance to send events @@ -282,7 +284,7 @@ func (beWs *binanceExchangeWs) subscribeStream(symbol, format string, subscribe //We couldn't subscribe for this pair if !isStream { - return mapData{}, fmt.Errorf("error while subscribing for %s", fmt.Sprintf(format, symbol)) + return mapData{}, fmt.Errorf("error while subscribing for %s", streamName) } return data, nil From a35faa3632eb6fb28548800bf768da28a32b1aa8 Mon Sep 17 00:00:00 2001 From: tibrn Date: Sun, 18 Jul 2021 19:41:03 +0300 Subject: [PATCH 07/16] add timeout instead of waiting --- plugins/binanceExchange_ws.go | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/plugins/binanceExchange_ws.go b/plugins/binanceExchange_ws.go index 5ce79db49..5d846c9bf 100644 --- a/plugins/binanceExchange_ws.go +++ b/plugins/binanceExchange_ws.go @@ -21,7 +21,7 @@ const ( ) var ( - timeToWaitForFirstEvent = time.Second * 2 + timeoutFirstEvent = time.Second * 2 ) var ( @@ -277,10 +277,26 @@ func (beWs *binanceExchangeWs) subscribeStream(symbol, format string, subscribe beWs.streams[streamName] = stream beWs.streamLock.Unlock() + timePassed := time.Duration(0) + checkTime := time.Millisecond * 50 + //Wait for binance to send events - time.Sleep(timeToWaitForFirstEvent) - data, isStream := state.Get(symbol) + var ( + data mapData + isStream bool + ) + + for timePassed < timeoutFirstEvent { + time.Sleep(checkTime) + timePassed += checkTime + + data, isStream = state.Get(symbol) + + if isStream { + break + } + } //We couldn't subscribe for this pair if !isStream { From a75195ed4ec4effbf32a5b527a23c6788196dd2b Mon Sep 17 00:00:00 2001 From: tibrn Date: Sun, 18 Jul 2021 19:58:44 +0300 Subject: [PATCH 08/16] patch/ remove timeout --- plugins/binanceExchange_ws.go | 37 ++++++++++++----------------------- 1 file changed, 12 insertions(+), 25 deletions(-) diff --git a/plugins/binanceExchange_ws.go b/plugins/binanceExchange_ws.go index 5d846c9bf..6e7fc5254 100644 --- a/plugins/binanceExchange_ws.go +++ b/plugins/binanceExchange_ws.go @@ -21,7 +21,7 @@ const ( ) var ( - timeoutFirstEvent = time.Second * 2 + timeWaitForFirstEvent = time.Second * 2 ) var ( @@ -277,26 +277,10 @@ func (beWs *binanceExchangeWs) subscribeStream(symbol, format string, subscribe beWs.streams[streamName] = stream beWs.streamLock.Unlock() - timePassed := time.Duration(0) - checkTime := time.Millisecond * 50 - //Wait for binance to send events + time.Sleep(timeWaitForFirstEvent) - var ( - data mapData - isStream bool - ) - - for timePassed < timeoutFirstEvent { - time.Sleep(checkTime) - timePassed += checkTime - - data, isStream = state.Get(symbol) - - if isStream { - break - } - } + data, isStream := state.Get(symbol) //We couldn't subscribe for this pair if !isStream { @@ -372,10 +356,10 @@ func (beWs *binanceExchangeWs) GetTickerPrice(pairs []model.TradingPair) (map[mo func (beWs *binanceExchangeWs) GetOrderBook(pair *model.TradingPair, maxCount int32) (*model.OrderBook, error) { var ( - fetchLimit = int(maxCount) + fetchSize = int(maxCount) ) - if fetchLimit > 20 { + if fetchSize > 20 { return nil, fmt.Errorf("Max supported depth level is 20") } @@ -415,10 +399,13 @@ func (beWs *binanceExchangeWs) GetOrderBook(pair *model.TradingPair, maxCount in askCcxtOrders := book.Asks bidCcxtOrders := book.Bids - if fetchLimit != 20 { - // we may not have fetched all the requested levels because the exchange may not have had that many levels in depth - askCcxtOrders = askCcxtOrders[:fetchLimit] - bidCcxtOrders = bidCcxtOrders[:fetchLimit] + if fetchSize < len(book.Asks) { + askCcxtOrders = askCcxtOrders[:fetchSize] + + } + + if fetchSize < len(book.Bids) { + bidCcxtOrders = bidCcxtOrders[:fetchSize] } asks, err := beWs.readOrders(askCcxtOrders, pair, model.OrderActionSell) From 261650242327f1da52a886825e7143fb931fb383 Mon Sep 17 00:00:00 2001 From: tibrn Date: Sun, 18 Jul 2021 20:00:25 +0300 Subject: [PATCH 09/16] patch/ check for len > fetchSize --- plugins/binanceExchange_ws.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/binanceExchange_ws.go b/plugins/binanceExchange_ws.go index 6e7fc5254..e44a68c59 100644 --- a/plugins/binanceExchange_ws.go +++ b/plugins/binanceExchange_ws.go @@ -399,12 +399,12 @@ func (beWs *binanceExchangeWs) GetOrderBook(pair *model.TradingPair, maxCount in askCcxtOrders := book.Asks bidCcxtOrders := book.Bids - if fetchSize < len(book.Asks) { + if len(askCcxtOrders) > fetchSize { askCcxtOrders = askCcxtOrders[:fetchSize] } - if fetchSize < len(book.Bids) { + if len(bidCcxtOrders) > fetchSize { bidCcxtOrders = bidCcxtOrders[:fetchSize] } From df8c006251116c745d0d83c27df7dba973519064 Mon Sep 17 00:00:00 2001 From: tibrn Date: Mon, 19 Jul 2021 20:24:17 +0300 Subject: [PATCH 10/16] add GetTrageHistory + GetLatestTradeCursor --- plugins/binanceExchange_ws.go | 368 ++++++++++++++++++++++++++++- plugins/binanceExchange_ws_test.go | 67 +++++- 2 files changed, 428 insertions(+), 7 deletions(-) diff --git a/plugins/binanceExchange_ws.go b/plugins/binanceExchange_ws.go index e44a68c59..23bdedf2e 100644 --- a/plugins/binanceExchange_ws.go +++ b/plugins/binanceExchange_ws.go @@ -1,8 +1,11 @@ package plugins import ( + "context" + "encoding/json" "fmt" "log" + "sort" "strconv" "strings" "sync" @@ -15,9 +18,12 @@ import ( ) const ( - STREAM_TICKER_FMT = "%s@ticker" - STREAM_BOOK_FMT = "%s@depth" - TTLTIME = time.Second * 3 // ttl time in seconds + STREAM_TICKER_FMT = "%s@ticker" + STREAM_BOOK_FMT = "%s@depth" + STREAM_USER = "@user" + LAST_CURSOR_KEY = "@user||lastCursor" + TTLTIME = time.Second * 3 // ttl time in seconds + EVENT_EXECUTION_REPORT = "executionReport" ) var ( @@ -27,8 +33,82 @@ var ( var ( ErrConversionWsMarketEvent = errConversion{from: "interface", to: "*binance.WsMarketStatEvent"} ErrConversionWsPartialDepthEvent = errConversion{from: "interface", to: "*binance.WsPartialDepthEvent"} + ErrConversionHistory = errConversion{from: "interface", to: "History"} + ErrConversionCursor = errConversion{from: "interface", to: "int64"} ) +type eventBinance struct { + Name string `json:"e"` +} + +// "E": 1499405658658, // Event time +// "s": "ETHBTC", // Symbol +// "c": "mUvoqJxFIILMdfAW5iGSOW", // Client order ID +// "S": "BUY", // Side +// "o": "LIMIT", // Order type +// "f": "GTC", // Time in force +// "q": "1.00000000", // Order quantity +// "p": "0.10264410", // Order price +// "P": "0.00000000", // Stop price +// "F": "0.00000000", // Iceberg quantity +// "g": -1, // OrderListId +// "C": null, // Original client order ID; This is the ID of the order being canceled +// "x": "NEW", // Current execution type +// "X": "NEW", // Current order status +// "r": "NONE", // Order reject reason; will be an error code. +// "i": 4293153, // Order ID +// "l": "0.00000000", // Last executed quantity +// "z": "0.00000000", // Cumulative filled quantity +// "L": "0.00000000", // Last executed price +// "n": "0", // Commission amount +// "N": null, // Commission asset +// "T": 1499405658657, // Transaction time +// "t": -1, // Trade ID +// "I": 8641984, // Ignore +// "w": true, // Is the order on the book? +// "m": false, // Is this trade the maker side? +// "M": false, // Ignore +// "O": 1499405658657, // Order creation time +// "Z": "0.00000000", // Cumulative quote asset transacted quantity +// "Y": "0.00000000", // Last quote asset transacted quantity (i.e. lastPrice * lastQty) +// "Q": "0.00000000" // Quote Order Qty +type eventExecutionReport struct { + eventBinance + EventTime int64 `json:"E"` + Symbol string `json:"s"` + ClientOrderID string `json:"c"` + Side string `json:"S"` + OrderType string `json:"o"` + TimeInForce string `json:"f"` + OrderQuantity string `json:"q"` + OrderPrice string `json:"p"` + StopPrice string `json:"P"` + IceberQuantity string `json:"F"` + OrderListID int64 `json:"g"` + OriginalClientID interface{} `json:"C"` + CurrentExecutionType string `json:"x"` + CurrentOrderStatus string `json:"X"` + OrderRejectReason string `json:"r"` + OrderID int64 `json:"i"` + LastExecutedQuantity string `json:"l"` + CumulativeFillerQuantity string `json:"z"` + LastExecutedPrice string `json:"Z"` + ComissionAmount string `json:"n"` + ComissionAsset interface{} `json:"N"` + TransactionTime int64 `json:"T"` + TradeID int64 `json:"t"` + Ignore int64 `json:"I"` + IsTherOrderOnBook bool `json:"w"` + IsTheTradeMarkerSide bool `json:"m"` + IsIgnore bool `json:"M"` + OrderCreationTime int64 `json:"O"` + CumulativeQuoteAssetQuantity string `json:"Z"` + LastQuoteAssetQuantity string `json:"Y"` + QuoteOrderQuantity string `json:"Q"` +} + +type History []*eventExecutionReport + type Subscriber func(symbol string, state *mapEvents) (*stream, error) type errMissingSymbol struct { symbol string @@ -140,12 +220,14 @@ func makeMapEvents() *mapEvents { type events struct { SymbolStats *mapEvents BookStats *mapEvents + OrderEvents *mapEvents } func createStateEvents() *events { events := &events{ SymbolStats: makeMapEvents(), BookStats: makeMapEvents(), + OrderEvents: makeMapEvents(), } return events @@ -180,6 +262,98 @@ func subcribeTicker(symbol string, state *mapEvents) (*stream, error) { } +func subcribeUserStream(listenKey string, state *mapEvents) (*stream, error) { + + userStreamLock := sync.Mutex{} + + wsUserStreamExecutinReportHandler := func(message []byte) { + + event := &eventExecutionReport{} + err := json.Unmarshal(message, event) + + if err != nil { + log.Printf("Error unmarshal %s to eventExecutionReport\n", string(message)) + return + } + + userStreamLock.Lock() + defer userStreamLock.Unlock() + + history, isHistory := state.Get(event.Symbol) + + if !isHistory { + history.data = make(History, 0) + state.Set(event.Symbol, history) + } + + now := time.Now() + history.createdAt = now + + data, isOk := history.data.(History) + + if !isOk { + log.Printf("Error conversion %v\n", ErrConversionHistory) + return + } + + history.data = append(data, event) + + lastCursor := event.TransactionTime + + lastCursorData, isCursor := state.Get(LAST_CURSOR_KEY) + + if isCursor { + + cursor, isOk := lastCursorData.data.(int64) + + if isOk { + if cursor > lastCursor { + lastCursor = cursor + } + } else { + log.Printf("Error converting cursor %v\n", ErrConversionCursor) + } + } + + state.Set(LAST_CURSOR_KEY, lastCursor) + } + + wsUserStreamHandler := func(message []byte) { + event := &eventBinance{} + err := json.Unmarshal(message, event) + + if err != nil { + log.Printf("Error unmarshal %s to eventBinance\n", string(message)) + return + } + + switch event.Name { + case EVENT_EXECUTION_REPORT: + wsUserStreamExecutinReportHandler(message) + } + + } + + errHandler := func(err error) { + log.Printf("Error WsUserDataServe for listenKey %s: %v\n", listenKey, err) + } + + doneC, stopC, err := binance.WsUserDataServe(listenKey, wsUserStreamHandler, errHandler) + + if err != nil { + return nil, err + } + + keepConnection(doneC, func() { + subcribeUserStream(listenKey, state) + }) + + return &stream{doneC: doneC, stopC: stopC, cleanup: func() { + + }}, err + +} + //restart Connection with ws // Binance close each connection after 24 hours func keepConnection(doneC chan struct{}, reconnect func()) { @@ -220,6 +394,19 @@ func subcribeBook(symbol string, state *mapEvents) (*stream, error) { } +//ListenKey expires every 60 minutes +func keepAliveStreamService(client *binance.Client, key string) { + + time.Sleep(time.Minute * 50) + err := client.NewKeepaliveUserStreamService().ListenKey(key).Do(context.Background()) + + if err != nil { + log.Printf("Error keepAliveStreamService %v\n", err) + } + + go keepAliveStreamService(client, key) +} + type binanceExchangeWs struct { events *events @@ -228,21 +415,46 @@ type binanceExchangeWs struct { assetConverter model.AssetConverterInterface delimiter string + + client *binance.Client + listenKey string } // makeBinanceWs is a factory method to make an binance exchange over ws -func makeBinanceWs() (*binanceExchangeWs, error) { +func makeBinanceWs(keys api.ExchangeAPIKey) (*binanceExchangeWs, error) { binance.WebsocketKeepalive = true events := createStateEvents() + binanceClient := binance.NewClient(keys.Key, keys.Secret) + + listenKey, err := binanceClient.NewStartUserStreamService().Do(context.Background()) + + if err != nil { + return nil, err + } + + keepAliveStreamService(binanceClient, listenKey) + + streamUser, err := subcribeUserStream(listenKey, events.OrderEvents) + + if err != nil { + return nil, err + } + + streams := make(map[string]*stream) + + streams[STREAM_USER] = streamUser + beWs := &binanceExchangeWs{ events: events, delimiter: "", assetConverter: model.CcxtAssetConverter, streamLock: &sync.Mutex{}, - streams: make(map[string]*stream), + streams: streams, + client: binanceClient, + listenKey: listenKey, } return beWs, nil @@ -450,6 +662,152 @@ func (beWs *binanceExchangeWs) readOrders(orders []common.PriceLevel, pair *mode return result, nil } +// GetTradeHistory impl +func (beWs *binanceExchangeWs) GetTradeHistory(pair model.TradingPair, maybeCursorStart interface{}, maybeCursorEnd interface{}) (*api.TradeHistoryResult, error) { + symbol, err := pair.ToString(beWs.assetConverter, beWs.delimiter) + if err != nil { + return nil, fmt.Errorf("error converting symbol to string: %s", err) + } + + data, isOrders := beWs.events.OrderEvents.Get(symbol) + + if !isOrders { + return nil, fmt.Errorf("no trade history for trading pair '%s'", symbol) + } + + history, isOk := data.data.(History) + + if !isOk { + return nil, ErrConversionHistory + } + + trades := []model.Trade{} + for _, raw := range history { + var t *model.Trade + t, err = beWs.readTrade(&pair, symbol, raw) + if err != nil { + return nil, fmt.Errorf("error while reading trade: %s", err) + } + + t.OrderID = fmt.Sprintf("%d", raw.OrderID) + + trades = append(trades, *t) + } + + sort.Sort(model.TradesByTsID(trades)) + cursor := maybeCursorStart + if len(trades) > 0 { + cursor, err = beWs.getCursor(trades) + if err != nil { + return nil, fmt.Errorf("error getting cursor when fetching trades: %s", err) + } + } + + return &api.TradeHistoryResult{ + Cursor: cursor, + Trades: trades, + }, nil +} + +func (beWs *binanceExchangeWs) getCursor(trades []model.Trade) (interface{}, error) { + lastTrade := trades[len(trades)-1] + + lastCursor := lastTrade.Order.Timestamp.AsInt64() + // add 1 to lastCursor so we don't repeat the same cursor on the next run + fetchedCursor := strconv.FormatInt(lastCursor+1, 10) + + // update cursor accordingly + return fetchedCursor, nil +} + +// GetLatestTradeCursor impl. +func (beWs *binanceExchangeWs) GetLatestTradeCursor() (interface{}, error) { + + lastTradeCursor, isCursor := beWs.events.OrderEvents.Get(LAST_CURSOR_KEY) + + if !isCursor { + timeNowMillis := time.Now().UnixNano() / int64(time.Millisecond) + return fmt.Sprintf("%d", timeNowMillis), nil + } + + cursor, isOk := lastTradeCursor.data.(int64) + + if !isOk { + return nil, ErrConversionCursor + } + + return fmt.Sprintf("%d", cursor), nil +} + +func (beWs *binanceExchangeWs) readTrade(pair *model.TradingPair, symbol string, rawTrade *eventExecutionReport) (*model.Trade, error) { + if rawTrade.Symbol != symbol { + return nil, fmt.Errorf("expected '%s' for 'symbol' field, got: %s", symbol, rawTrade.Symbol) + } + + pricePrecision := getPrecision(rawTrade.OrderPrice) + volumePrecision := getPrecision(rawTrade.OrderQuantity) + // use bigger precision for fee and cost since they are logically derived from amount and price + feecCostPrecision := pricePrecision + if volumePrecision > pricePrecision { + feecCostPrecision = volumePrecision + } + + orderPrice, err := strconv.ParseFloat(rawTrade.OrderPrice, 64) + if err != nil { + return nil, err + } + + orderQuantity, err := strconv.ParseFloat(rawTrade.OrderQuantity, 64) + if err != nil { + return nil, err + } + + comissionAmount, err := strconv.ParseFloat(rawTrade.ComissionAmount, 64) + if err != nil { + return nil, err + } + + trade := model.Trade{ + Order: model.Order{ + Pair: pair, + Price: model.NumberFromFloat(orderPrice, pricePrecision), + Volume: model.NumberFromFloat(orderQuantity, volumePrecision), + OrderType: model.OrderTypeLimit, + Timestamp: model.MakeTimestamp(rawTrade.TransactionTime), + }, + TransactionID: model.MakeTransactionID(strconv.FormatInt(rawTrade.OrderID, 10)), + Cost: model.NumberFromFloat(comissionAmount, feecCostPrecision), + // OrderID read by calling function depending on override set for exchange params in "orderId" field of Info object + } + + useSignToDenoteSide := false + + if rawTrade.Side == "sell" { + trade.OrderAction = model.OrderActionSell + } else if rawTrade.Side == "buy" { + trade.OrderAction = model.OrderActionBuy + } else if useSignToDenoteSide { + if trade.Cost.AsFloat() < 0 { + trade.OrderAction = model.OrderActionSell + trade.Order.Volume = trade.Order.Volume.Scale(-1.0) + trade.Cost = trade.Cost.Scale(-1.0) + } else { + trade.OrderAction = model.OrderActionBuy + } + } else { + return nil, fmt.Errorf("unrecognized value for 'side' field: %s (rawTrade = %+v)", rawTrade.Side, rawTrade) + } + + if trade.Cost.AsFloat() < 0 { + return nil, fmt.Errorf("trade.Cost was < 0 (%f)", trade.Cost.AsFloat()) + } + if trade.Order.Volume.AsFloat() < 0 { + return nil, fmt.Errorf("trade.Order.Volume was < 0 (%f)", trade.Order.Volume.AsFloat()) + } + + return &trade, nil +} + //Unsubscribe ... unsubscribe from binance streams func (beWs *binanceExchangeWs) Unsubscribe(stream string) { diff --git a/plugins/binanceExchange_ws_test.go b/plugins/binanceExchange_ws_test.go index 4f6133468..e59ea8f40 100644 --- a/plugins/binanceExchange_ws_test.go +++ b/plugins/binanceExchange_ws_test.go @@ -2,12 +2,17 @@ package plugins import ( "fmt" + "strconv" "testing" + "time" + "github.com/stellar/kelp/api" "github.com/stellar/kelp/model" "github.com/stretchr/testify/assert" ) +var emptyAPIKeyBinance = api.ExchangeAPIKey{} + func Test_createStateEvents(t *testing.T) { events := createStateEvents() @@ -19,7 +24,7 @@ func Test_binanceExchangeWs_GetTickerPrice(t *testing.T) { pair := model.TradingPair{Base: model.XLM, Quote: model.BTC} pairs := []model.TradingPair{pair} - testBinanceExchangeWs, err := makeBinanceWs() + testBinanceExchangeWs, err := makeBinanceWs(emptyAPIKeyBinance) if !assert.NoError(t, err) { return @@ -51,7 +56,7 @@ func Test_binanceExchangeWs_GetTickerPrice(t *testing.T) { func Test_binanceExchangeWs_GetOrderBook(t *testing.T) { - testBinanceExchangeWs, e := makeBinanceWs() + testBinanceExchangeWs, e := makeBinanceWs(emptyAPIKeyBinance) if !assert.NoError(t, e) { return } @@ -89,3 +94,61 @@ func Test_binanceExchangeWs_GetOrderBook(t *testing.T) { assert.True(t, ob.Bids()[0].Volume.AsFloat() > 0) } } + +func Test_binanceExchangeWs_GetLatestTradeCursor(t *testing.T) { + startIntervalSecs := time.Now().Unix() + + testBinanceExchangeWs, err := makeBinanceWs(emptyAPIKeyBinance) + + if !assert.NoError(t, err) { + return + } + + cursor, e := testBinanceExchangeWs.GetLatestTradeCursor() + if !assert.NoError(t, e) { + return + } + endIntervalSecs := time.Now().Unix() + + if !assert.IsType(t, "string", cursor) { + return + } + + cursorString := cursor.(string) + cursorInt, e := strconv.ParseInt(cursorString, 10, 64) + if !assert.NoError(t, e) { + return + } + + if !assert.True(t, startIntervalSecs <= cursorInt, fmt.Sprintf("returned cursor (%d) should be gte the start time of the function call in millis (%d)", cursorInt, startIntervalSecs)) { + return + } + if !assert.True(t, endIntervalSecs >= cursorInt, fmt.Sprintf("returned cursor (%d) should be lte the end time of the function call in millis (%d)", cursorInt, endIntervalSecs)) { + return + } +} + +func Test_binanceExchangeWs_GetTradeHistory(t *testing.T) { + + testBinanceExchangeWs, err := makeBinanceWs(emptyAPIKeyBinance) + + if !assert.NoError(t, err) { + return + } + + pair := model.TradingPair{Base: model.XLM, Quote: model.BTC} + tradeHistoryResult, e := testBinanceExchangeWs.GetTradeHistory(pair, nil, nil) + if !assert.NoError(t, e) { + return + } + + if !assert.True(t, len(tradeHistoryResult.Trades) >= 0) { + return + } + + if !assert.NotNil(t, tradeHistoryResult.Cursor) { + return + } + + assert.Fail(t, "force fail") +} From 0c41ff34345b93eaccc0be403de649fe2f8ef8ac Mon Sep 17 00:00:00 2001 From: tibrn Date: Mon, 19 Jul 2021 20:31:39 +0300 Subject: [PATCH 11/16] avoid testing when short flag is set --- plugins/binanceExchange_ws.go | 1 + plugins/binanceExchange_ws_test.go | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/plugins/binanceExchange_ws.go b/plugins/binanceExchange_ws.go index 23bdedf2e..f7cdd1bf4 100644 --- a/plugins/binanceExchange_ws.go +++ b/plugins/binanceExchange_ws.go @@ -664,6 +664,7 @@ func (beWs *binanceExchangeWs) readOrders(orders []common.PriceLevel, pair *mode // GetTradeHistory impl func (beWs *binanceExchangeWs) GetTradeHistory(pair model.TradingPair, maybeCursorStart interface{}, maybeCursorEnd interface{}) (*api.TradeHistoryResult, error) { + symbol, err := pair.ToString(beWs.assetConverter, beWs.delimiter) if err != nil { return nil, fmt.Errorf("error converting symbol to string: %s", err) diff --git a/plugins/binanceExchange_ws_test.go b/plugins/binanceExchange_ws_test.go index e59ea8f40..c61937281 100644 --- a/plugins/binanceExchange_ws_test.go +++ b/plugins/binanceExchange_ws_test.go @@ -21,6 +21,11 @@ func Test_createStateEvents(t *testing.T) { } func Test_binanceExchangeWs_GetTickerPrice(t *testing.T) { + + if testing.Short() { + return + } + pair := model.TradingPair{Base: model.XLM, Quote: model.BTC} pairs := []model.TradingPair{pair} @@ -56,6 +61,10 @@ func Test_binanceExchangeWs_GetTickerPrice(t *testing.T) { func Test_binanceExchangeWs_GetOrderBook(t *testing.T) { + if testing.Short() { + return + } + testBinanceExchangeWs, e := makeBinanceWs(emptyAPIKeyBinance) if !assert.NoError(t, e) { return @@ -96,6 +105,11 @@ func Test_binanceExchangeWs_GetOrderBook(t *testing.T) { } func Test_binanceExchangeWs_GetLatestTradeCursor(t *testing.T) { + + if testing.Short() { + return + } + startIntervalSecs := time.Now().Unix() testBinanceExchangeWs, err := makeBinanceWs(emptyAPIKeyBinance) @@ -130,6 +144,10 @@ func Test_binanceExchangeWs_GetLatestTradeCursor(t *testing.T) { func Test_binanceExchangeWs_GetTradeHistory(t *testing.T) { + if testing.Short() { + return + } + testBinanceExchangeWs, err := makeBinanceWs(emptyAPIKeyBinance) if !assert.NoError(t, err) { From 17a422b9f1254f06570d1392e74f9e892fe9e633 Mon Sep 17 00:00:00 2001 From: tibrn Date: Sun, 25 Jul 2021 21:04:04 +0300 Subject: [PATCH 12/16] refactor --- plugins/binanceExchange_ws.go | 217 ++++++++++++----------------- plugins/binanceExchange_ws_test.go | 24 ++-- support/sdk/binance_ws.go | 40 ++++++ 3 files changed, 146 insertions(+), 135 deletions(-) create mode 100644 support/sdk/binance_ws.go diff --git a/plugins/binanceExchange_ws.go b/plugins/binanceExchange_ws.go index f7cdd1bf4..7923ba82d 100644 --- a/plugins/binanceExchange_ws.go +++ b/plugins/binanceExchange_ws.go @@ -3,6 +3,7 @@ package plugins import ( "context" "encoding/json" + "errors" "fmt" "log" "sort" @@ -15,13 +16,17 @@ import ( "github.com/adshao/go-binance/v2/common" "github.com/stellar/kelp/api" "github.com/stellar/kelp/model" + "github.com/stellar/kelp/support/sdk" ) const ( - STREAM_TICKER_FMT = "%s@ticker" - STREAM_BOOK_FMT = "%s@depth" - STREAM_USER = "@user" - LAST_CURSOR_KEY = "@user||lastCursor" + STREAM_TICKER_FMT = "%s@ticker" + STREAM_BOOK_FMT = "%s@depth" + + //not from binance docs, just for convetion we use @streamName + STREAM_USER = "@user" + // key used to save last cursor in events, it must be something that couldn't be used in the map + LAST_CURSOR_KEY = STREAM_USER + "||lastCursor" TTLTIME = time.Second * 3 // ttl time in seconds EVENT_EXECUTION_REPORT = "executionReport" ) @@ -37,77 +42,7 @@ var ( ErrConversionCursor = errConversion{from: "interface", to: "int64"} ) -type eventBinance struct { - Name string `json:"e"` -} - -// "E": 1499405658658, // Event time -// "s": "ETHBTC", // Symbol -// "c": "mUvoqJxFIILMdfAW5iGSOW", // Client order ID -// "S": "BUY", // Side -// "o": "LIMIT", // Order type -// "f": "GTC", // Time in force -// "q": "1.00000000", // Order quantity -// "p": "0.10264410", // Order price -// "P": "0.00000000", // Stop price -// "F": "0.00000000", // Iceberg quantity -// "g": -1, // OrderListId -// "C": null, // Original client order ID; This is the ID of the order being canceled -// "x": "NEW", // Current execution type -// "X": "NEW", // Current order status -// "r": "NONE", // Order reject reason; will be an error code. -// "i": 4293153, // Order ID -// "l": "0.00000000", // Last executed quantity -// "z": "0.00000000", // Cumulative filled quantity -// "L": "0.00000000", // Last executed price -// "n": "0", // Commission amount -// "N": null, // Commission asset -// "T": 1499405658657, // Transaction time -// "t": -1, // Trade ID -// "I": 8641984, // Ignore -// "w": true, // Is the order on the book? -// "m": false, // Is this trade the maker side? -// "M": false, // Ignore -// "O": 1499405658657, // Order creation time -// "Z": "0.00000000", // Cumulative quote asset transacted quantity -// "Y": "0.00000000", // Last quote asset transacted quantity (i.e. lastPrice * lastQty) -// "Q": "0.00000000" // Quote Order Qty -type eventExecutionReport struct { - eventBinance - EventTime int64 `json:"E"` - Symbol string `json:"s"` - ClientOrderID string `json:"c"` - Side string `json:"S"` - OrderType string `json:"o"` - TimeInForce string `json:"f"` - OrderQuantity string `json:"q"` - OrderPrice string `json:"p"` - StopPrice string `json:"P"` - IceberQuantity string `json:"F"` - OrderListID int64 `json:"g"` - OriginalClientID interface{} `json:"C"` - CurrentExecutionType string `json:"x"` - CurrentOrderStatus string `json:"X"` - OrderRejectReason string `json:"r"` - OrderID int64 `json:"i"` - LastExecutedQuantity string `json:"l"` - CumulativeFillerQuantity string `json:"z"` - LastExecutedPrice string `json:"Z"` - ComissionAmount string `json:"n"` - ComissionAsset interface{} `json:"N"` - TransactionTime int64 `json:"T"` - TradeID int64 `json:"t"` - Ignore int64 `json:"I"` - IsTherOrderOnBook bool `json:"w"` - IsTheTradeMarkerSide bool `json:"m"` - IsIgnore bool `json:"M"` - OrderCreationTime int64 `json:"O"` - CumulativeQuoteAssetQuantity string `json:"Z"` - LastQuoteAssetQuantity string `json:"Y"` - QuoteOrderQuantity string `json:"Q"` -} - -type History []*eventExecutionReport +type History []*sdk.EventExecutionReport type Subscriber func(symbol string, state *mapEvents) (*stream, error) type errMissingSymbol struct { @@ -218,16 +153,16 @@ func makeMapEvents() *mapEvents { //struct used to keep all cached data type events struct { - SymbolStats *mapEvents - BookStats *mapEvents - OrderEvents *mapEvents + SymbolStats *mapEvents + BookStats *mapEvents + TradeHistoryEvents *mapEvents } func createStateEvents() *events { events := &events{ - SymbolStats: makeMapEvents(), - BookStats: makeMapEvents(), - OrderEvents: makeMapEvents(), + SymbolStats: makeMapEvents(), + BookStats: makeMapEvents(), + TradeHistoryEvents: makeMapEvents(), } return events @@ -268,7 +203,7 @@ func subcribeUserStream(listenKey string, state *mapEvents) (*stream, error) { wsUserStreamExecutinReportHandler := func(message []byte) { - event := &eventExecutionReport{} + event := &sdk.EventExecutionReport{} err := json.Unmarshal(message, event) if err != nil { @@ -319,7 +254,7 @@ func subcribeUserStream(listenKey string, state *mapEvents) (*stream, error) { } wsUserStreamHandler := func(message []byte) { - event := &eventBinance{} + event := &sdk.EventBinance{} err := json.Unmarshal(message, event) if err != nil { @@ -397,14 +332,15 @@ func subcribeBook(symbol string, state *mapEvents) (*stream, error) { //ListenKey expires every 60 minutes func keepAliveStreamService(client *binance.Client, key string) { - time.Sleep(time.Minute * 50) - err := client.NewKeepaliveUserStreamService().ListenKey(key).Do(context.Background()) + for { + time.Sleep(time.Minute * 50) + err := client.NewKeepaliveUserStreamService().ListenKey(key).Do(context.Background()) - if err != nil { - log.Printf("Error keepAliveStreamService %v\n", err) + if err != nil { + log.Printf("Error keepAliveStreamService %v\n", err) + panic(err) + } } - - go keepAliveStreamService(client, key) } type binanceExchangeWs struct { @@ -418,6 +354,8 @@ type binanceExchangeWs struct { client *binance.Client listenKey string + + keys api.ExchangeAPIKey } // makeBinanceWs is a factory method to make an binance exchange over ws @@ -427,37 +365,60 @@ func makeBinanceWs(keys api.ExchangeAPIKey) (*binanceExchangeWs, error) { events := createStateEvents() - binanceClient := binance.NewClient(keys.Key, keys.Secret) + streams := make(map[string]*stream) - listenKey, err := binanceClient.NewStartUserStreamService().Do(context.Background()) + beWs := &binanceExchangeWs{ + events: events, + delimiter: "", + assetConverter: model.CcxtAssetConverter, + streamLock: &sync.Mutex{}, + streams: streams, + keys: keys, + } - if err != nil { - return nil, err + return beWs, nil +} + +func (beWs *binanceExchangeWs) isSubscribedUserStream() bool { + + _, isStream := beWs.streams[STREAM_USER] + + return isStream +} + +func (beWs *binanceExchangeWs) subscribeUserStream() error { + + beWs.streamLock.Lock() + defer beWs.streamLock.Unlock() + + if beWs.isSubscribedUserStream() { + return nil } - keepAliveStreamService(binanceClient, listenKey) + binanceClient := binance.NewClient(beWs.keys.Key, beWs.keys.Secret) - streamUser, err := subcribeUserStream(listenKey, events.OrderEvents) + listenKey, err := binanceClient.NewStartUserStreamService().Do(context.Background()) if err != nil { - return nil, err + return fmt.Errorf("error when creating listenKey: %s", err) } - streams := make(map[string]*stream) + go keepAliveStreamService(binanceClient, listenKey) - streams[STREAM_USER] = streamUser + streamUser, err := subcribeUserStream(listenKey, beWs.events.TradeHistoryEvents) - beWs := &binanceExchangeWs{ - events: events, - delimiter: "", - assetConverter: model.CcxtAssetConverter, - streamLock: &sync.Mutex{}, - streams: streams, - client: binanceClient, - listenKey: listenKey, + if err != nil { + return fmt.Errorf("error when subscribing to user stream: %s", err) } - return beWs, nil + beWs.streams[STREAM_USER] = streamUser + beWs.client = binanceClient + beWs.listenKey = listenKey + + //Wait for first + time.Sleep(timeWaitForFirstEvent) + + return nil } //getPrceision... get precision for float string @@ -520,7 +481,7 @@ func (beWs *binanceExchangeWs) GetTickerPrice(pairs []model.TradingPair) (map[mo tickerData, err = beWs.subscribeStream(symbol, STREAM_TICKER_FMT, subcribeTicker, beWs.events.SymbolStats) if err != nil { - return nil, err + return nil, fmt.Errorf("error when subscribing to stream %s:%s", fmt.Sprintf(STREAM_TICKER_FMT, symbol), err) } } @@ -623,13 +584,13 @@ func (beWs *binanceExchangeWs) GetOrderBook(pair *model.TradingPair, maxCount in asks, err := beWs.readOrders(askCcxtOrders, pair, model.OrderActionSell) if err != nil { - return nil, err + return nil, fmt.Errorf("error when reading ask orders:%s", err) } bids, err := beWs.readOrders(bidCcxtOrders, pair, model.OrderActionBuy) if err != nil { - return nil, err + return nil, fmt.Errorf("error when reading bid orders:%s", err) } return model.MakeOrderBook(pair, asks, bids), nil @@ -665,12 +626,18 @@ func (beWs *binanceExchangeWs) readOrders(orders []common.PriceLevel, pair *mode // GetTradeHistory impl func (beWs *binanceExchangeWs) GetTradeHistory(pair model.TradingPair, maybeCursorStart interface{}, maybeCursorEnd interface{}) (*api.TradeHistoryResult, error) { + if !beWs.isSubscribedUserStream() { + if err := beWs.subscribeUserStream(); err != nil { + return nil, fmt.Errorf("error subscribing to user stream: %s", err) + } + } + symbol, err := pair.ToString(beWs.assetConverter, beWs.delimiter) if err != nil { return nil, fmt.Errorf("error converting symbol to string: %s", err) } - data, isOrders := beWs.events.OrderEvents.Get(symbol) + data, isOrders := beWs.events.TradeHistoryEvents.Get(symbol) if !isOrders { return nil, fmt.Errorf("no trade history for trading pair '%s'", symbol) @@ -684,8 +651,8 @@ func (beWs *binanceExchangeWs) GetTradeHistory(pair model.TradingPair, maybeCurs trades := []model.Trade{} for _, raw := range history { - var t *model.Trade - t, err = beWs.readTrade(&pair, symbol, raw) + + t, err := beWs.readTrade(&pair, symbol, raw) if err != nil { return nil, fmt.Errorf("error while reading trade: %s", err) } @@ -697,6 +664,7 @@ func (beWs *binanceExchangeWs) GetTradeHistory(pair model.TradingPair, maybeCurs sort.Sort(model.TradesByTsID(trades)) cursor := maybeCursorStart + if len(trades) > 0 { cursor, err = beWs.getCursor(trades) if err != nil { @@ -724,11 +692,16 @@ func (beWs *binanceExchangeWs) getCursor(trades []model.Trade) (interface{}, err // GetLatestTradeCursor impl. func (beWs *binanceExchangeWs) GetLatestTradeCursor() (interface{}, error) { - lastTradeCursor, isCursor := beWs.events.OrderEvents.Get(LAST_CURSOR_KEY) + if !beWs.isSubscribedUserStream() { + if err := beWs.subscribeUserStream(); err != nil { + return nil, fmt.Errorf("error subscribing to user stream: %s", err) + } + } + + lastTradeCursor, isCursor := beWs.events.TradeHistoryEvents.Get(LAST_CURSOR_KEY) if !isCursor { - timeNowMillis := time.Now().UnixNano() / int64(time.Millisecond) - return fmt.Sprintf("%d", timeNowMillis), nil + return nil, errors.New("Missing cursor") } cursor, isOk := lastTradeCursor.data.(int64) @@ -740,7 +713,7 @@ func (beWs *binanceExchangeWs) GetLatestTradeCursor() (interface{}, error) { return fmt.Sprintf("%d", cursor), nil } -func (beWs *binanceExchangeWs) readTrade(pair *model.TradingPair, symbol string, rawTrade *eventExecutionReport) (*model.Trade, error) { +func (beWs *binanceExchangeWs) readTrade(pair *model.TradingPair, symbol string, rawTrade *sdk.EventExecutionReport) (*model.Trade, error) { if rawTrade.Symbol != symbol { return nil, fmt.Errorf("expected '%s' for 'symbol' field, got: %s", symbol, rawTrade.Symbol) } @@ -781,20 +754,10 @@ func (beWs *binanceExchangeWs) readTrade(pair *model.TradingPair, symbol string, // OrderID read by calling function depending on override set for exchange params in "orderId" field of Info object } - useSignToDenoteSide := false - if rawTrade.Side == "sell" { trade.OrderAction = model.OrderActionSell } else if rawTrade.Side == "buy" { trade.OrderAction = model.OrderActionBuy - } else if useSignToDenoteSide { - if trade.Cost.AsFloat() < 0 { - trade.OrderAction = model.OrderActionSell - trade.Order.Volume = trade.Order.Volume.Scale(-1.0) - trade.Cost = trade.Cost.Scale(-1.0) - } else { - trade.OrderAction = model.OrderActionBuy - } } else { return nil, fmt.Errorf("unrecognized value for 'side' field: %s (rawTrade = %+v)", rawTrade.Side, rawTrade) } diff --git a/plugins/binanceExchange_ws_test.go b/plugins/binanceExchange_ws_test.go index c61937281..facfc2ef1 100644 --- a/plugins/binanceExchange_ws_test.go +++ b/plugins/binanceExchange_ws_test.go @@ -22,10 +22,6 @@ func Test_createStateEvents(t *testing.T) { func Test_binanceExchangeWs_GetTickerPrice(t *testing.T) { - if testing.Short() { - return - } - pair := model.TradingPair{Base: model.XLM, Quote: model.BTC} pairs := []model.TradingPair{pair} @@ -61,10 +57,6 @@ func Test_binanceExchangeWs_GetTickerPrice(t *testing.T) { func Test_binanceExchangeWs_GetOrderBook(t *testing.T) { - if testing.Short() { - return - } - testBinanceExchangeWs, e := makeBinanceWs(emptyAPIKeyBinance) if !assert.NoError(t, e) { return @@ -119,6 +111,22 @@ func Test_binanceExchangeWs_GetLatestTradeCursor(t *testing.T) { } cursor, e := testBinanceExchangeWs.GetLatestTradeCursor() + + if !assert.Error(t, e) { + return + } + + testBinanceExchangeWs.events.TradeHistoryEvents.Set(LAST_CURSOR_KEY, "fail") + + cursor, e = testBinanceExchangeWs.GetLatestTradeCursor() + + if !assert.Error(t, e) { + return + } + + testBinanceExchangeWs.events.TradeHistoryEvents.Set(LAST_CURSOR_KEY, time.Now().Unix()) + + cursor, e = testBinanceExchangeWs.GetLatestTradeCursor() if !assert.NoError(t, e) { return } diff --git a/support/sdk/binance_ws.go b/support/sdk/binance_ws.go new file mode 100644 index 000000000..fde672b21 --- /dev/null +++ b/support/sdk/binance_ws.go @@ -0,0 +1,40 @@ +package sdk + +type EventBinance struct { + Name string `json:"e"` +} + +type EventExecutionReport struct { + EventBinance + EventTime int64 `json:"E"` // "E": 1499405658658, //Event time + Symbol string `json:"s"` // "s": "ETHBTC", //Symbol + ClientOrderID string `json:"c"` // "c": "mUvoqJxFIILMdfAW5iGSOW", //Client order ID + Side string `json:"S"` // "S": "BUY", // Side + OrderType string `json:"o"` // "o": "LIMIT", // Order type + TimeInForce string `json:"f"` // "f": "GTC", // Time in force + OrderQuantity string `json:"q"` // "q": "1.00000000", // Order quantity + OrderPrice string `json:"p"` // "p": "0.10264410", // Order price + StopPrice string `json:"P"` // "P": "0.00000000", // Stop price + IceberQuantity string `json:"F"` // "F": "0.00000000", // Iceberg quantity + OrderListID int64 `json:"g"` // "g": -1, // OrderListId + OriginalClientID interface{} `json:"C"` // "C": null, // Original client order ID; This is the ID of the order being canceled + CurrentExecutionType string `json:"x"` // "x": "NEW", // Current execution type + CurrentOrderStatus string `json:"X"` // "X": "NEW", // Current order status + OrderRejectReason string `json:"r"` // "r": "NONE", // Order reject reason; will be an error code. + OrderID int64 `json:"i"` // "i": 4293153, // Order ID + LastExecutedQuantity string `json:"l"` // "l": "0.00000000", // Last executed quantity + CumulativeFillerQuantity string `json:"z"` // "z": "0.00000000", // Cumulative filled quantity + LastExecutedPrice string `json:"L"` // "L": "0.00000000", // Last executed price + ComissionAmount string `json:"n"` // "n": "0", // Commission amount + ComissionAsset interface{} `json:"N"` // "N": null, // Commission asset + TransactionTime int64 `json:"T"` // "T": 1499405658657, // Transaction time + TradeID int64 `json:"t"` // "t": -1, // Trade ID + Ignore int64 `json:"I"` // "I": 8641984, // Ignore + IsTherOrderOnBook bool `json:"w"` // "w": true, // Is the order on the book? + IsTheTradeMarkerSide bool `json:"m"` // "m": false, // Is this trade the maker side? + IsIgnore bool `json:"M"` // "M": false, // Ignore + OrderCreationTime int64 `json:"O"` // "O": 1499405658657, // Order creation time + CumulativeQuoteAssetQuantity string `json:"Z"` // "Z": "0.00000000", // Cumulative quote asset transacted quantity + LastQuoteAssetQuantity string `json:"Y"` // "Y": "0.00000000", // Last quote asset transacted quantity (i.e. lastPrice * lastQty) + QuoteOrderQuantity string `json:"Q"` // "Q": "0.00000000" // Quote Order Qty +} From 65443c69e5cedb595ec6ac41ddfd79b576cbce46 Mon Sep 17 00:00:00 2001 From: tibrn Date: Sun, 25 Jul 2021 21:07:22 +0300 Subject: [PATCH 13/16] wrap error for creating WsUserDataService --- plugins/binanceExchange_ws.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/plugins/binanceExchange_ws.go b/plugins/binanceExchange_ws.go index 7923ba82d..b49bcaf17 100644 --- a/plugins/binanceExchange_ws.go +++ b/plugins/binanceExchange_ws.go @@ -276,16 +276,14 @@ func subcribeUserStream(listenKey string, state *mapEvents) (*stream, error) { doneC, stopC, err := binance.WsUserDataServe(listenKey, wsUserStreamHandler, errHandler) if err != nil { - return nil, err + return nil, fmt.Errorf("error creating WsUserDataService:%s", err) } keepConnection(doneC, func() { subcribeUserStream(listenKey, state) }) - return &stream{doneC: doneC, stopC: stopC, cleanup: func() { - - }}, err + return &stream{doneC: doneC, stopC: stopC}, err } From 91837ec9ae178f4a55539afc2066daf540606569 Mon Sep 17 00:00:00 2001 From: tibrn Date: Mon, 26 Jul 2021 21:21:40 +0300 Subject: [PATCH 14/16] handle errors + refactor --- plugins/binanceExchange_ws.go | 67 +++++++++++++++++++++--------- plugins/binanceExchange_ws_test.go | 13 +++++- 2 files changed, 58 insertions(+), 22 deletions(-) diff --git a/plugins/binanceExchange_ws.go b/plugins/binanceExchange_ws.go index b49bcaf17..a3cd26951 100644 --- a/plugins/binanceExchange_ws.go +++ b/plugins/binanceExchange_ws.go @@ -94,6 +94,7 @@ func (s stream) Close() { //mapData... struct used to data from events and timestamp when they are cached type mapData struct { data interface{} + err error createdAt time.Time } @@ -110,7 +111,7 @@ type mapEvents struct { } //Set ... set value -func (m *mapEvents) Set(key string, data interface{}) { +func (m *mapEvents) Set(key string, data interface{}, err error) { now := time.Now() @@ -120,6 +121,7 @@ func (m *mapEvents) Set(key string, data interface{}) { m.data[key] = mapData{ data: data, createdAt: now, + err: err, } } @@ -174,7 +176,7 @@ func createStateEvents() *events { func subcribeTicker(symbol string, state *mapEvents) (*stream, error) { wsMarketStatHandler := func(ticker *binance.WsMarketStatEvent) { - state.Set(symbol, ticker) + state.Set(symbol, ticker, nil) } errHandler := func(err error) { @@ -218,7 +220,7 @@ func subcribeUserStream(listenKey string, state *mapEvents) (*stream, error) { if !isHistory { history.data = make(History, 0) - state.Set(event.Symbol, history) + state.Set(event.Symbol, history, nil) } now := time.Now() @@ -228,6 +230,7 @@ func subcribeUserStream(listenKey string, state *mapEvents) (*stream, error) { if !isOk { log.Printf("Error conversion %v\n", ErrConversionHistory) + state.Set(event.Symbol, history, ErrConversionHistory) return } @@ -247,10 +250,11 @@ func subcribeUserStream(listenKey string, state *mapEvents) (*stream, error) { } } else { log.Printf("Error converting cursor %v\n", ErrConversionCursor) + err = ErrConversionCursor } } - state.Set(LAST_CURSOR_KEY, lastCursor) + state.Set(LAST_CURSOR_KEY, lastCursor, err) } wsUserStreamHandler := func(message []byte) { @@ -303,7 +307,7 @@ func keepConnection(doneC chan struct{}, reconnect func()) { func subcribeBook(symbol string, state *mapEvents) (*stream, error) { wsPartialDepthHandler := func(event *binance.WsPartialDepthEvent) { - state.Set(symbol, event) + state.Set(symbol, event, nil) } errHandler := func(err error) { @@ -327,20 +331,6 @@ func subcribeBook(symbol string, state *mapEvents) (*stream, error) { } -//ListenKey expires every 60 minutes -func keepAliveStreamService(client *binance.Client, key string) { - - for { - time.Sleep(time.Minute * 50) - err := client.NewKeepaliveUserStreamService().ListenKey(key).Do(context.Background()) - - if err != nil { - log.Printf("Error keepAliveStreamService %v\n", err) - panic(err) - } - } -} - type binanceExchangeWs struct { events *events @@ -354,6 +344,8 @@ type binanceExchangeWs struct { listenKey string keys api.ExchangeAPIKey + + errUserStream error } // makeBinanceWs is a factory method to make an binance exchange over ws @@ -377,6 +369,21 @@ func makeBinanceWs(keys api.ExchangeAPIKey) (*binanceExchangeWs, error) { return beWs, nil } +//ListenKey expires every 60 minutes +func (beWs *binanceExchangeWs) keepAliveStreamService(client *binance.Client, key string) { + + for { + time.Sleep(time.Minute * 50) + err := client.NewKeepaliveUserStreamService().ListenKey(key).Do(context.Background()) + + if err != nil { + log.Printf("Error keepAliveStreamService %v\n", err) + } + + beWs.errUserStream = err + } +} + func (beWs *binanceExchangeWs) isSubscribedUserStream() bool { _, isStream := beWs.streams[STREAM_USER] @@ -401,7 +408,7 @@ func (beWs *binanceExchangeWs) subscribeUserStream() error { return fmt.Errorf("error when creating listenKey: %s", err) } - go keepAliveStreamService(binanceClient, listenKey) + go beWs.keepAliveStreamService(binanceClient, listenKey) streamUser, err := subcribeUserStream(listenKey, beWs.events.TradeHistoryEvents) @@ -551,6 +558,10 @@ func (beWs *binanceExchangeWs) GetOrderBook(pair *model.TradingPair, maxCount in } + if bookData.err != nil { + return nil, fmt.Errorf("error from stream:%v", bookData.err) + } + //Show how old is the orderbook log.Printf("OrderBook for %s is %d milliseconds old!\n", symbol, time.Now().Sub(bookData.createdAt).Milliseconds()) @@ -624,6 +635,10 @@ func (beWs *binanceExchangeWs) readOrders(orders []common.PriceLevel, pair *mode // GetTradeHistory impl func (beWs *binanceExchangeWs) GetTradeHistory(pair model.TradingPair, maybeCursorStart interface{}, maybeCursorEnd interface{}) (*api.TradeHistoryResult, error) { + if beWs.errUserStream != nil { + return nil, fmt.Errorf("error from update listen key:%v", beWs.errUserStream) + } + if !beWs.isSubscribedUserStream() { if err := beWs.subscribeUserStream(); err != nil { return nil, fmt.Errorf("error subscribing to user stream: %s", err) @@ -641,6 +656,10 @@ func (beWs *binanceExchangeWs) GetTradeHistory(pair model.TradingPair, maybeCurs return nil, fmt.Errorf("no trade history for trading pair '%s'", symbol) } + if data.err != nil { + return nil, fmt.Errorf("error from stream:%v", data.err) + } + history, isOk := data.data.(History) if !isOk { @@ -690,6 +709,10 @@ func (beWs *binanceExchangeWs) getCursor(trades []model.Trade) (interface{}, err // GetLatestTradeCursor impl. func (beWs *binanceExchangeWs) GetLatestTradeCursor() (interface{}, error) { + if beWs.errUserStream != nil { + return nil, fmt.Errorf("error from update listen key:%v", beWs.errUserStream) + } + if !beWs.isSubscribedUserStream() { if err := beWs.subscribeUserStream(); err != nil { return nil, fmt.Errorf("error subscribing to user stream: %s", err) @@ -702,6 +725,10 @@ func (beWs *binanceExchangeWs) GetLatestTradeCursor() (interface{}, error) { return nil, errors.New("Missing cursor") } + if lastTradeCursor.err != nil { + return nil, fmt.Errorf("error from stream:%v", lastTradeCursor.err) + } + cursor, isOk := lastTradeCursor.data.(int64) if !isOk { diff --git a/plugins/binanceExchange_ws_test.go b/plugins/binanceExchange_ws_test.go index facfc2ef1..5d705e889 100644 --- a/plugins/binanceExchange_ws_test.go +++ b/plugins/binanceExchange_ws_test.go @@ -1,6 +1,7 @@ package plugins import ( + "errors" "fmt" "strconv" "testing" @@ -116,7 +117,7 @@ func Test_binanceExchangeWs_GetLatestTradeCursor(t *testing.T) { return } - testBinanceExchangeWs.events.TradeHistoryEvents.Set(LAST_CURSOR_KEY, "fail") + testBinanceExchangeWs.events.TradeHistoryEvents.Set(LAST_CURSOR_KEY, "fail", nil) cursor, e = testBinanceExchangeWs.GetLatestTradeCursor() @@ -124,12 +125,20 @@ func Test_binanceExchangeWs_GetLatestTradeCursor(t *testing.T) { return } - testBinanceExchangeWs.events.TradeHistoryEvents.Set(LAST_CURSOR_KEY, time.Now().Unix()) + testBinanceExchangeWs.events.TradeHistoryEvents.Set(LAST_CURSOR_KEY, time.Now().Unix(), errors.New("test")) + + cursor, e = testBinanceExchangeWs.GetLatestTradeCursor() + if !assert.Error(t, e) { + return + } + + testBinanceExchangeWs.events.TradeHistoryEvents.Set(LAST_CURSOR_KEY, time.Now().Unix(), nil) cursor, e = testBinanceExchangeWs.GetLatestTradeCursor() if !assert.NoError(t, e) { return } + endIntervalSecs := time.Now().Unix() if !assert.IsType(t, "string", cursor) { From 2f5963a1b0a23dfc9ff1c5bebb8ba8f0f512c267 Mon Sep 17 00:00:00 2001 From: tibrn Date: Mon, 26 Jul 2021 21:23:25 +0300 Subject: [PATCH 15/16] wrap errors --- plugins/binanceExchange_ws.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/binanceExchange_ws.go b/plugins/binanceExchange_ws.go index a3cd26951..a8bb02720 100644 --- a/plugins/binanceExchange_ws.go +++ b/plugins/binanceExchange_ws.go @@ -753,17 +753,17 @@ func (beWs *binanceExchangeWs) readTrade(pair *model.TradingPair, symbol string, orderPrice, err := strconv.ParseFloat(rawTrade.OrderPrice, 64) if err != nil { - return nil, err + return nil, fmt.Errorf("error converting OrderPrice:%v", err) } orderQuantity, err := strconv.ParseFloat(rawTrade.OrderQuantity, 64) if err != nil { - return nil, err + return nil, fmt.Errorf("error converting OrderQuantity:%v", err) } comissionAmount, err := strconv.ParseFloat(rawTrade.ComissionAmount, 64) if err != nil { - return nil, err + return nil, fmt.Errorf("error converting ComissionAmount:%v", err) } trade := model.Trade{ From 88d2b2462f6ec6d04389a8856701e29ad5afbb86 Mon Sep 17 00:00:00 2001 From: tibrn Date: Mon, 26 Jul 2021 21:38:47 +0300 Subject: [PATCH 16/16] rename error --- plugins/binanceExchange_ws.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/binanceExchange_ws.go b/plugins/binanceExchange_ws.go index a8bb02720..e809863f4 100644 --- a/plugins/binanceExchange_ws.go +++ b/plugins/binanceExchange_ws.go @@ -462,7 +462,7 @@ func (beWs *binanceExchangeWs) subscribeStream(symbol, format string, subscribe //We couldn't subscribe for this pair if !isStream { - return mapData{}, fmt.Errorf("error while subscribing for %s", streamName) + return mapData{}, fmt.Errorf("error data doesn't exist for %s", streamName) } return data, nil