Skip to content

Commit

Permalink
orderbook: Check assignment of time values and reject if not set (thr…
Browse files Browse the repository at this point in the history
…asher-corp#1318)

* orderbook: Check assignment of time values and reject if not set.

* linter: fix

* buffer: additional linter winter fixter

* Implement through pending exchanges

* finished push

* linty: minty

* gomod: tidy

* thrasher: nits

* glorious: nits

* orderbook: purge type now in favour of external call allocation

* orderbook: push last param

* orderbook: only 1 unlock call is needed

---------

Co-authored-by: Ryan O'Hara-Reid <[email protected]>
  • Loading branch information
shazbert and Ryan O'Hara-Reid committed Nov 9, 2023
1 parent 944fb8d commit 48057e0
Show file tree
Hide file tree
Showing 31 changed files with 726 additions and 327 deletions.
15 changes: 12 additions & 3 deletions engine/rpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3461,7 +3461,10 @@ func TestGetOrderbookMovement(t *testing.T) {
{Price: 13, Amount: 1},
{Price: 14, Amount: 1},
}
depth.LoadSnapshot(bid, ask, 0, time.Time{}, true)
err = depth.LoadSnapshot(bid, ask, 0, time.Now(), true)
if err != nil {
t.Fatal(err)
}

_, err = s.GetOrderbookMovement(context.Background(), req)
if err.Error() != "quote amount invalid" {
Expand Down Expand Up @@ -3571,7 +3574,10 @@ func TestGetOrderbookAmountByNominal(t *testing.T) {
{Price: 13, Amount: 1},
{Price: 14, Amount: 1},
}
depth.LoadSnapshot(bid, ask, 0, time.Time{}, true)
err = depth.LoadSnapshot(bid, ask, 0, time.Now(), true)
if err != nil {
t.Fatal(err)
}

nominal, err := s.GetOrderbookAmountByNominal(context.Background(), req)
if !errors.Is(err, nil) {
Expand Down Expand Up @@ -3674,7 +3680,10 @@ func TestGetOrderbookAmountByImpact(t *testing.T) {
{Price: 13, Amount: 1},
{Price: 14, Amount: 1},
}
depth.LoadSnapshot(bid, ask, 0, time.Time{}, true)
err = depth.LoadSnapshot(bid, ask, 0, time.Now(), true)
if err != nil {
t.Fatal(err)
}

req.ImpactPercentage = 9.090909090909092
impact, err := s.GetOrderbookAmountByImpact(context.Background(), req)
Expand Down
2 changes: 1 addition & 1 deletion exchanges/binance/binance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2123,7 +2123,7 @@ func TestWsDepthUpdate(t *testing.T) {

p := currency.NewPairWithDelimiter("BTC", "USDT", "-")
if err := b.SeedLocalCacheWithBook(p, &book); err != nil {
t.Error(err)
t.Fatal(err)
}

if err := b.wsHandleData(update1); err != nil {
Expand Down
1 change: 1 addition & 0 deletions exchanges/binance/binance_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ func (b *Binance) SeedLocalCacheWithBook(p currency.Pair, orderbookNew *OrderBoo
VerifyOrderbook: b.CanVerifyOrderbook,
Bids: make(orderbook.Items, len(orderbookNew.Bids)),
Asks: make(orderbook.Items, len(orderbookNew.Asks)),
LastUpdated: time.Now(), // Time not provided in REST book.
}
for i := range orderbookNew.Bids {
newOrderBook.Bids[i] = orderbook.Item{
Expand Down
2 changes: 1 addition & 1 deletion exchanges/binanceus/binanceus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1450,7 +1450,7 @@ func TestWebsocketOrderBookDepthDiffStream(t *testing.T) {

p := currency.NewPairWithDelimiter("BTC", "USDT", "-")
if err := bi.SeedLocalCacheWithBook(p, &book); err != nil {
t.Error(err)
t.Fatal(err)
}
if err := bi.wsHandleData(update1); err != nil {
t.Error(err)
Expand Down
1 change: 1 addition & 0 deletions exchanges/binanceus/binanceus_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,7 @@ func (bi *Binanceus) SeedLocalCacheWithBook(p currency.Pair, orderbookNew *Order
VerifyOrderbook: bi.CanVerifyOrderbook,
Bids: make(orderbook.Items, len(orderbookNew.Bids)),
Asks: make(orderbook.Items, len(orderbookNew.Asks)),
LastUpdated: time.Now(), // Time not provided in REST book.
}
for i := range orderbookNew.Bids {
newOrderBook.Bids[i] = orderbook.Item{
Expand Down
4 changes: 2 additions & 2 deletions exchanges/bitfinex/bitfinex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1439,7 +1439,7 @@ func (b *Bitfinex) WsInsertSnapshot(p currency.Pair, assetType asset.Item, books
book.PriceDuplication = true
book.IsFundingRate = fundingRate
book.VerifyOrderbook = b.CanVerifyOrderbook
book.LastUpdated = time.Now()
book.LastUpdated = time.Now() // Not included in snapshot
return b.Websocket.Orderbook.LoadSnapshot(&book)
}

Expand All @@ -1451,7 +1451,7 @@ func (b *Bitfinex) WsUpdateOrderbook(p currency.Pair, assetType asset.Item, book
Pair: p,
Bids: make([]orderbook.Item, 0, len(book)),
Asks: make([]orderbook.Item, 0, len(book)),
UpdateTime: time.Now(),
UpdateTime: time.Now(), // Not included in update
}

for i := range book {
Expand Down
8 changes: 4 additions & 4 deletions exchanges/bitmex/bitmex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,7 @@ func TestWSOrderbookHandling(t *testing.T) {
"attributes":{"id":"sorted","symbol":"grouped"},
"action":"partial",
"data":[
{"symbol":"ETHUSD","id":17999992000,"side":"Sell","size":100,"price":80},
{"symbol":"ETHUSD","id":17999992000,"side":"Sell","size":100,"price":80,"timestamp":"2017-04-04T22:16:38.461Z"},
{"symbol":"ETHUSD","id":17999993000,"side":"Sell","size":20,"price":70},
{"symbol":"ETHUSD","id":17999994000,"side":"Sell","size":10,"price":60},
{"symbol":"ETHUSD","id":17999995000,"side":"Buy","size":10,"price":50},
Expand All @@ -958,7 +958,7 @@ func TestWSOrderbookHandling(t *testing.T) {
"table":"orderBookL2_25",
"action":"update",
"data":[
{"symbol":"ETHUSD","id":17999995000,"side":"Buy","size":5}
{"symbol":"ETHUSD","id":17999995000,"side":"Buy","size":5,"timestamp":"2017-04-04T22:16:38.461Z"}
]
}`)
err = b.wsHandleData(pressXToJSON)
Expand All @@ -981,7 +981,7 @@ func TestWSOrderbookHandling(t *testing.T) {
"table":"orderBookL2_25",
"action":"delete",
"data":[
{"symbol":"ETHUSD","id":17999995000,"side":"Buy"}
{"symbol":"ETHUSD","id":17999995000,"side":"Buy","timestamp":"2017-04-04T22:16:38.461Z"}
]
}`)
err = b.wsHandleData(pressXToJSON)
Expand All @@ -993,7 +993,7 @@ func TestWSOrderbookHandling(t *testing.T) {
"table":"orderBookL2_25",
"action":"delete",
"data":[
{"symbol":"ETHUSD","id":17999995000,"side":"Buy"}
{"symbol":"ETHUSD","id":17999995000,"side":"Buy","timestamp":"2017-04-04T22:16:38.461Z"}
]
}`)
err = b.wsHandleData(pressXToJSON)
Expand Down
11 changes: 6 additions & 5 deletions exchanges/bitmex/bitmex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,12 @@ type Order struct {

// OrderBookL2 contains order book l2
type OrderBookL2 struct {
ID int64 `json:"id"`
Price float64 `json:"price"`
Side string `json:"side"`
Size int64 `json:"size"`
Symbol string `json:"symbol"`
ID int64 `json:"id"`
Price float64 `json:"price"`
Side string `json:"side"`
Size int64 `json:"size"`
Symbol string `json:"symbol"`
Timestamp time.Time `json:"timestamp"`
}

// Position Summary of Open and Closed Positions
Expand Down
12 changes: 7 additions & 5 deletions exchanges/bitmex/bitmex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ func (b *Bitmex) processOrderbook(data []OrderBookL2, action string, p currency.
book.Pair = p
book.Exchange = b.Name
book.VerifyOrderbook = b.CanVerifyOrderbook
book.LastUpdated = data[0].Timestamp

err := b.Websocket.Orderbook.LoadSnapshot(&book)
if err != nil {
Expand Down Expand Up @@ -528,11 +529,12 @@ func (b *Bitmex) processOrderbook(data []OrderBookL2, action string, p currency.
}

err = b.Websocket.Orderbook.Update(&orderbook.Update{
Bids: bids,
Asks: asks,
Pair: p,
Asset: a,
Action: updateAction,
Bids: bids,
Asks: asks,
Pair: p,
Asset: a,
Action: updateAction,
UpdateTime: data[0].Timestamp,
})
if err != nil {
return err
Expand Down
11 changes: 8 additions & 3 deletions exchanges/btse/btse_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,19 @@ func (b *BTSE) wsHandleData(respRaw []byte) error {
if err != nil {
return err
}
log.Infof(log.WebsocketMgr, "%v subscribed to %v", b.Name, strings.Join(subscribe.Channel, ", "))
if b.Verbose {
log.Infof(log.WebsocketMgr, "%v subscribed to %v", b.Name, strings.Join(subscribe.Channel, ", "))
}
case "login":
var login WsLoginAcknowledgement
err = json.Unmarshal(respRaw, &login)
if err != nil {
return err
}
b.Websocket.SetCanUseAuthenticatedEndpoints(login.Success)
log.Infof(log.WebsocketMgr, "%v websocket authenticated: %v", b.Name, login.Success)
if b.Verbose {
log.Infof(log.WebsocketMgr, "%v websocket authenticated: %v", b.Name, login.Success)
}
default:
return errors.New(b.Name + stream.UnhandledMessage + string(respRaw))
}
Expand Down Expand Up @@ -265,7 +269,7 @@ func (b *BTSE) wsHandleData(respRaw []byte) error {
})
}
return trade.AddTradesToBuffer(b.Name, trades...)
case strings.Contains(topic, "orderBookL2Api"):
case strings.Contains(topic, "orderBookL2Api"): // TODO: Fix orderbook updates.
var t wsOrderBook
err = json.Unmarshal(respRaw, &t)
if err != nil {
Expand Down Expand Up @@ -328,6 +332,7 @@ func (b *BTSE) wsHandleData(respRaw []byte) error {
newOB.Exchange = b.Name
newOB.Asks.Reverse() // Reverse asks for correct alignment
newOB.VerifyOrderbook = b.CanVerifyOrderbook
newOB.LastUpdated = time.Now() // NOTE: Temp to fix test.
err = b.Websocket.Orderbook.LoadSnapshot(&newOB)
if err != nil {
return err
Expand Down
5 changes: 3 additions & 2 deletions exchanges/coinbasepro/coinbasepro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,8 @@ func TestWsOrderbook(t *testing.T) {
"type": "snapshot",
"product_id": "BTC-USD",
"bids": [["10101.10", "0.45054140"]],
"asks": [["10102.55", "0.57753524"]]
"asks": [["10102.55", "0.57753524"]],
"time":"2023-08-15T06:46:55.376250Z"
}`)
err := c.wsHandleData(pressXToJSON)
if err != nil {
Expand All @@ -862,7 +863,7 @@ func TestWsOrderbook(t *testing.T) {
pressXToJSON = []byte(`{
"type": "l2update",
"product_id": "BTC-USD",
"time": "2019-08-14T20:42:27.265Z",
"time": "2023-08-15T06:46:57.933713Z",
"changes": [
[
"buy",
Expand Down
3 changes: 2 additions & 1 deletion exchanges/coinbasepro/coinbasepro_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,13 +441,14 @@ type WebsocketOrderbookSnapshot struct {
Type string `json:"type"`
Bids [][2]string `json:"bids"`
Asks [][2]string `json:"asks"`
Time time.Time `json:"time"`
}

// WebsocketL2Update defines an update on the L2 orderbooks
type WebsocketL2Update struct {
Type string `json:"type"`
ProductID string `json:"product_id"`
Time string `json:"time"`
Time time.Time `json:"time"`
Changes [][3]string `json:"changes"`
}

Expand Down
36 changes: 17 additions & 19 deletions exchanges/coinbasepro/coinbasepro_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (c *CoinbasePro) wsHandleData(respRaw []byte) error {
}

case "snapshot":
snapshot := WebsocketOrderbookSnapshot{}
var snapshot WebsocketOrderbookSnapshot
err := json.Unmarshal(respRaw, &snapshot)
if err != nil {
return err
Expand All @@ -112,9 +112,8 @@ func (c *CoinbasePro) wsHandleData(respRaw []byte) error {
if err != nil {
return err
}

case "l2update":
update := WebsocketL2Update{}
var update WebsocketL2Update
err := json.Unmarshal(respRaw, &update)
if err != nil {
return err
Expand Down Expand Up @@ -291,38 +290,38 @@ func (c *CoinbasePro) ProcessSnapshot(snapshot *WebsocketOrderbookSnapshot) erro
}

for i := range snapshot.Bids {
price, err := strconv.ParseFloat(snapshot.Bids[i][0], 64)
var price float64
price, err = strconv.ParseFloat(snapshot.Bids[i][0], 64)
if err != nil {
return err
}

amount, err := strconv.ParseFloat(snapshot.Bids[i][1], 64)
var amount float64
amount, err = strconv.ParseFloat(snapshot.Bids[i][1], 64)
if err != nil {
return err
}

base.Bids[i] = orderbook.Item{Price: price, Amount: amount}
}

for i := range snapshot.Asks {
price, err := strconv.ParseFloat(snapshot.Asks[i][0], 64)
var price float64
price, err = strconv.ParseFloat(snapshot.Asks[i][0], 64)
if err != nil {
return err
}

amount, err := strconv.ParseFloat(snapshot.Asks[i][1], 64)
var amount float64
amount, err = strconv.ParseFloat(snapshot.Asks[i][1], 64)
if err != nil {
return err
}

base.Asks[i] = orderbook.Item{Price: price, Amount: amount}
}

base.Asset = asset.Spot
base.Pair = pair
base.Exchange = c.Name
base.VerifyOrderbook = c.CanVerifyOrderbook

base.LastUpdated = snapshot.Time
return c.Websocket.Orderbook.LoadSnapshot(&base)
}

Expand All @@ -337,11 +336,6 @@ func (c *CoinbasePro) ProcessUpdate(update *WebsocketL2Update) error {
return err
}

timestamp, err := time.Parse(time.RFC3339, update.Time)
if err != nil {
return err
}

asks := make(orderbook.Items, 0, len(update.Changes))
bids := make(orderbook.Items, 0, len(update.Changes))

Expand All @@ -365,14 +359,18 @@ func (c *CoinbasePro) ProcessUpdate(update *WebsocketL2Update) error {
Bids: bids,
Asks: asks,
Pair: p,
UpdateTime: timestamp,
UpdateTime: update.Time,
Asset: asset.Spot,
})
}

// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
func (c *CoinbasePro) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, error) {
var channels = []string{"heartbeat", "level2", "ticker", "user", "matches"}
var channels = []string{"heartbeat",
"level2_batch", /*Other orderbook feeds require authentication. This is batched in 50ms lots.*/
"ticker",
"user",
"matches"}
enabledCurrencies, err := c.GetEnabledPairs(asset.Spot)
if err != nil {
return nil, err
Expand Down
8 changes: 5 additions & 3 deletions exchanges/coinut/coinut_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,7 @@ func (c *COINUT) WsProcessOrderbookSnapshot(ob *WsOrderbookSnapshot) error {

newOrderBook.Asset = asset.Spot
newOrderBook.Exchange = c.Name
newOrderBook.LastUpdated = time.Now() // No time sent

return c.Websocket.Orderbook.LoadSnapshot(&newOrderBook)
}
Expand All @@ -582,9 +583,10 @@ func (c *COINUT) WsProcessOrderbookUpdate(update *WsOrderbookUpdate) error {
}

bufferUpdate := &orderbook.Update{
Pair: p,
UpdateID: update.TransID,
Asset: asset.Spot,
Pair: p,
UpdateID: update.TransID,
Asset: asset.Spot,
UpdateTime: time.Now(), // No time sent
}
if strings.EqualFold(update.Side, order.Buy.Lower()) {
bufferUpdate.Bids = []orderbook.Item{{Price: update.Price, Amount: update.Volume}}
Expand Down
10 changes: 6 additions & 4 deletions exchanges/gemini/gemini_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ func (g *Gemini) wsProcessUpdate(result *wsL2MarketData) error {
newOrderBook.Pair = pair
newOrderBook.Exchange = g.Name
newOrderBook.VerifyOrderbook = g.CanVerifyOrderbook
newOrderBook.LastUpdated = time.Now() // No time is sent
err := g.Websocket.Orderbook.LoadSnapshot(&newOrderBook)
if err != nil {
return err
Expand All @@ -569,10 +570,11 @@ func (g *Gemini) wsProcessUpdate(result *wsL2MarketData) error {
return nil
}
err := g.Websocket.Orderbook.Update(&orderbook.Update{
Asks: asks,
Bids: bids,
Pair: pair,
Asset: asset.Spot,
Asks: asks,
Bids: bids,
Pair: pair,
Asset: asset.Spot,
UpdateTime: time.Now(), // No time is sent
})
if err != nil {
return err
Expand Down
Loading

0 comments on commit 48057e0

Please sign in to comment.