Skip to content

Commit

Permalink
stream/match: Reduce complexity and limit locking when match occurs (#…
Browse files Browse the repository at this point in the history
…1581)

* stream match update

* update tests

* linter: fix

* glorious: nits + handle context cancellations

* glorious: whooops

* Websocket: Add SendMessageReturnResponses

* whooooooopsie

* gk: nitssssss

* Update exchanges/stream/stream_match.go

Co-authored-by: Gareth Kirwan <[email protected]>

* Update exchanges/stream/stream_match_test.go

Co-authored-by: Gareth Kirwan <[email protected]>

* linter: appease the linter gods

* glorious: nits

* glorious: nits

* Update exchanges/stream/stream_match_test.go

Co-authored-by: Scott <[email protected]>

---------

Co-authored-by: Ryan O'Hara-Reid <[email protected]>
Co-authored-by: Gareth Kirwan <[email protected]>
Co-authored-by: Scott <[email protected]>
  • Loading branch information
4 people authored Aug 19, 2024
1 parent 225429b commit 17c2ef2
Show file tree
Hide file tree
Showing 23 changed files with 203 additions and 174 deletions.
2 changes: 1 addition & 1 deletion exchanges/binance/binance_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ func (b *Binance) manageSubs(op string, subs subscription.List) error {
Params: subs.QualifiedChannels(),
}

respRaw, err := b.Websocket.Conn.SendMessageReturnResponse(req.ID, req)
respRaw, err := b.Websocket.Conn.SendMessageReturnResponse(context.TODO(), req.ID, req)
if err == nil {
if v, d, _, rErr := jsonparser.Get(respRaw, "result"); rErr != nil {
err = rErr
Expand Down
7 changes: 3 additions & 4 deletions exchanges/bitfinex/bitfinex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1315,7 +1315,7 @@ func TestWsCancelOffer(t *testing.T) {
}

func TestWsSubscribedResponse(t *testing.T) {
m, err := b.Websocket.Match.Set("subscribe:waiter1")
ch, err := b.Websocket.Match.Set("subscribe:waiter1", 1)
assert.NoError(t, err, "Setting a matcher should not error")
err = b.wsHandleData([]byte(`{"event":"subscribed","channel":"ticker","chanId":224555,"subId":"waiter1","symbol":"tBTCUSD","pair":"BTCUSD"}`))
if assert.Error(t, err, "Should error if sub is not registered yet") {
Expand All @@ -1328,13 +1328,12 @@ func TestWsSubscribedResponse(t *testing.T) {
require.NoError(t, err, "AddSubscriptions must not error")
err = b.wsHandleData([]byte(`{"event":"subscribed","channel":"ticker","chanId":224555,"subId":"waiter1","symbol":"tBTCUSD","pair":"BTCUSD"}`))
assert.NoError(t, err, "wsHandleData should not error")
if assert.NotEmpty(t, m.C, "Matcher should have received a sub notification") {
msg := <-m.C
if assert.NotEmpty(t, ch, "Matcher should have received a sub notification") {
msg := <-ch
cID, err := jsonparser.GetInt(msg, "chanId")
assert.NoError(t, err, "Should get chanId from sub notification without error")
assert.EqualValues(t, 224555, cID, "Should get the correct chanId through the matcher notification")
}
m.Cleanup()
}

func TestWsOrderBook(t *testing.T) {
Expand Down
12 changes: 6 additions & 6 deletions exchanges/bitfinex/bitfinex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1756,7 +1756,7 @@ func (b *Bitfinex) subscribeToChan(chans subscription.List) error {
_ = b.Websocket.RemoveSubscriptions(c)
}()

respRaw, err := b.Websocket.Conn.SendMessageReturnResponse("subscribe:"+subID, req)
respRaw, err := b.Websocket.Conn.SendMessageReturnResponse(context.TODO(), "subscribe:"+subID, req)
if err != nil {
return fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Pairs)
}
Expand Down Expand Up @@ -1849,7 +1849,7 @@ func (b *Bitfinex) unsubscribeFromChan(chans subscription.List) error {
"chanId": chanID,
}

respRaw, err := b.Websocket.Conn.SendMessageReturnResponse("unsubscribe:"+strconv.Itoa(chanID), req)
respRaw, err := b.Websocket.Conn.SendMessageReturnResponse(context.TODO(), "unsubscribe:"+strconv.Itoa(chanID), req)
if err != nil {
return err
}
Expand Down Expand Up @@ -1926,7 +1926,7 @@ func (b *Bitfinex) WsSendAuth(ctx context.Context) error {
func (b *Bitfinex) WsNewOrder(data *WsNewOrderRequest) (string, error) {
data.CustomID = b.Websocket.AuthConn.GenerateMessageID(false)
request := makeRequestInterface(wsOrderNew, data)
resp, err := b.Websocket.AuthConn.SendMessageReturnResponse(data.CustomID, request)
resp, err := b.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), data.CustomID, request)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -1983,7 +1983,7 @@ func (b *Bitfinex) WsNewOrder(data *WsNewOrderRequest) (string, error) {
// WsModifyOrder authenticated modify order request
func (b *Bitfinex) WsModifyOrder(data *WsUpdateOrderRequest) error {
request := makeRequestInterface(wsOrderUpdate, data)
resp, err := b.Websocket.AuthConn.SendMessageReturnResponse(data.OrderID, request)
resp, err := b.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), data.OrderID, request)
if err != nil {
return err
}
Expand Down Expand Up @@ -2037,7 +2037,7 @@ func (b *Bitfinex) WsCancelOrder(orderID int64) error {
OrderID: orderID,
}
request := makeRequestInterface(wsOrderCancel, cancel)
resp, err := b.Websocket.AuthConn.SendMessageReturnResponse(orderID, request)
resp, err := b.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), orderID, request)
if err != nil {
return err
}
Expand Down Expand Up @@ -2094,7 +2094,7 @@ func (b *Bitfinex) WsCancelOffer(orderID int64) error {
OrderID: orderID,
}
request := makeRequestInterface(wsFundingOfferCancel, cancel)
resp, err := b.Websocket.AuthConn.SendMessageReturnResponse(orderID, request)
resp, err := b.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), orderID, request)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions exchanges/bybit/bybit_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (by *Bybit) WsAuth(ctx context.Context) error {
Operation: "auth",
Args: []interface{}{creds.Key, intNonce, sign},
}
resp, err := by.Websocket.AuthConn.SendMessageReturnResponse(req.RequestID, req)
resp, err := by.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), req.RequestID, req)
if err != nil {
return err
}
Expand Down Expand Up @@ -220,12 +220,12 @@ func (by *Bybit) handleSpotSubscription(operation string, channelsToSubscribe su
for a := range payloads {
var response []byte
if payloads[a].auth {
response, err = by.Websocket.AuthConn.SendMessageReturnResponse(payloads[a].RequestID, payloads[a])
response, err = by.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), payloads[a].RequestID, payloads[a])
if err != nil {
return err
}
} else {
response, err = by.Websocket.Conn.SendMessageReturnResponse(payloads[a].RequestID, payloads[a])
response, err = by.Websocket.Conn.SendMessageReturnResponse(context.TODO(), payloads[a].RequestID, payloads[a])
if err != nil {
return err
}
Expand Down
20 changes: 10 additions & 10 deletions exchanges/coinut/coinut_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ func (c *COINUT) WsGetInstruments() (Instruments, error) {
SecurityType: strings.ToUpper(asset.Spot.String()),
Nonce: getNonce(),
}
resp, err := c.Websocket.Conn.SendMessageReturnResponse(request.Nonce, request)
resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Nonce, request)
if err != nil {
return list, err
}
Expand Down Expand Up @@ -648,7 +648,7 @@ func (c *COINUT) Unsubscribe(channelToUnsubscribe subscription.List) error {
Subscribe: false,
Nonce: getNonce(),
}
resp, err := c.Websocket.Conn.SendMessageReturnResponse(subscribe.Nonce, subscribe)
resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), subscribe.Nonce, subscribe)
if err != nil {
errs = common.AppendError(errs, err)
continue
Expand Down Expand Up @@ -691,7 +691,7 @@ func (c *COINUT) wsAuthenticate(ctx context.Context) error {
}
r.Hmac = crypto.HexEncodeToString(hmac)

resp, err := c.Websocket.Conn.SendMessageReturnResponse(r.Nonce, r)
resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), r.Nonce, r)
if err != nil {
return err
}
Expand All @@ -714,7 +714,7 @@ func (c *COINUT) wsGetAccountBalance() (*UserBalance, error) {
Request: "user_balance",
Nonce: getNonce(),
}
resp, err := c.Websocket.Conn.SendMessageReturnResponse(accBalance.Nonce, accBalance)
resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), accBalance.Nonce, accBalance)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -750,7 +750,7 @@ func (c *COINUT) wsSubmitOrder(o *WsSubmitOrderParameters) (*order.Detail, error
if o.OrderID > 0 {
orderSubmissionRequest.OrderID = o.OrderID
}
resp, err := c.Websocket.Conn.SendMessageReturnResponse(orderSubmissionRequest.Nonce, orderSubmissionRequest)
resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), orderSubmissionRequest.Nonce, orderSubmissionRequest)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -793,7 +793,7 @@ func (c *COINUT) wsSubmitOrders(orders []WsSubmitOrderParameters) ([]order.Detai

orderRequest.Nonce = getNonce()
orderRequest.Request = "new_orders"
resp, err := c.Websocket.Conn.SendMessageReturnResponse(orderRequest.Nonce, orderRequest)
resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), orderRequest.Nonce, orderRequest)
if err != nil {
errs = append(errs, err)
return nil, errs
Expand Down Expand Up @@ -829,7 +829,7 @@ func (c *COINUT) wsGetOpenOrders(curr string) (*WsUserOpenOrdersResponse, error)
openOrdersRequest.Nonce = getNonce()
openOrdersRequest.InstrumentID = c.instrumentMap.LookupID(curr)

resp, err := c.Websocket.Conn.SendMessageReturnResponse(openOrdersRequest.Nonce, openOrdersRequest)
resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), openOrdersRequest.Nonce, openOrdersRequest)
if err != nil {
return response, err
}
Expand Down Expand Up @@ -862,7 +862,7 @@ func (c *COINUT) wsCancelOrder(cancellation *WsCancelOrderParameters) (*CancelOr
cancellationRequest.OrderID = cancellation.OrderID
cancellationRequest.Nonce = getNonce()

resp, err := c.Websocket.Conn.SendMessageReturnResponse(cancellationRequest.Nonce, cancellationRequest)
resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), cancellationRequest.Nonce, cancellationRequest)
if err != nil {
return response, err
}
Expand Down Expand Up @@ -903,7 +903,7 @@ func (c *COINUT) wsCancelOrders(cancellations []WsCancelOrderParameters) (*Cance

cancelOrderRequest.Request = "cancel_orders"
cancelOrderRequest.Nonce = getNonce()
resp, err := c.Websocket.Conn.SendMessageReturnResponse(cancelOrderRequest.Nonce, cancelOrderRequest)
resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), cancelOrderRequest.Nonce, cancelOrderRequest)
if err != nil {
return response, err
}
Expand Down Expand Up @@ -933,7 +933,7 @@ func (c *COINUT) wsGetTradeHistory(p currency.Pair, start, limit int64) (*WsTrad
request.Start = start
request.Limit = limit

resp, err := c.Websocket.Conn.SendMessageReturnResponse(request.Nonce, request)
resp, err := c.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.Nonce, request)
if err != nil {
return response, err
}
Expand Down
4 changes: 2 additions & 2 deletions exchanges/deribit/deribit_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (d *Deribit) wsLogin(ctx context.Context) error {
"signature": crypto.HexEncodeToString(hmac),
},
}
resp, err := d.Websocket.Conn.SendMessageReturnResponse(request.ID, request)
resp, err := d.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.ID, request)
if err != nil {
d.Websocket.SetCanUseAuthenticatedEndpoints(false)
return err
Expand Down Expand Up @@ -1165,7 +1165,7 @@ func (d *Deribit) handleSubscription(operation string, channels subscription.Lis
return err
}
for x := range payloads {
data, err := d.Websocket.Conn.SendMessageReturnResponse(payloads[x].ID, payloads[x])
data, err := d.Websocket.Conn.SendMessageReturnResponse(context.TODO(), payloads[x].ID, payloads[x])
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion exchanges/deribit/deribit_websocket_eps.go
Original file line number Diff line number Diff line change
Expand Up @@ -2406,7 +2406,7 @@ func (d *Deribit) sendWsPayload(ep request.EndpointLimit, input *WsRequest, resp
log.Debugf(log.RequestSys, "%s attempt %d", d.Name, attempt)
}
var payload []byte
payload, err = d.Websocket.Conn.SendMessageReturnResponse(input.ID, input)
payload, err = d.Websocket.Conn.SendMessageReturnResponse(context.TODO(), input.ID, input)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion exchanges/gateio/gateio_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ func (g *Gateio) handleSubscription(event string, channelsToSubscribe subscripti
}
var errs error
for k := range payloads {
result, err := g.Websocket.Conn.SendMessageReturnResponse(payloads[k].ID, payloads[k])
result, err := g.Websocket.Conn.SendMessageReturnResponse(context.TODO(), payloads[k].ID, payloads[k])
if err != nil {
errs = common.AppendError(errs, err)
continue
Expand Down
4 changes: 2 additions & 2 deletions exchanges/gateio/gateio_ws_delivery_futures.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,9 @@ func (g *Gateio) handleDeliveryFuturesSubscription(event string, channelsToSubsc
for con, val := range payloads {
for k := range val {
if con == 0 {
respByte, err = g.Websocket.Conn.SendMessageReturnResponse(val[k].ID, val[k])
respByte, err = g.Websocket.Conn.SendMessageReturnResponse(context.TODO(), val[k].ID, val[k])
} else {
respByte, err = g.Websocket.AuthConn.SendMessageReturnResponse(val[k].ID, val[k])
respByte, err = g.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), val[k].ID, val[k])
}
if err != nil {
errs = common.AppendError(errs, err)
Expand Down
4 changes: 2 additions & 2 deletions exchanges/gateio/gateio_ws_futures.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,9 @@ func (g *Gateio) handleFuturesSubscription(event string, channelsToSubscribe sub
for con, val := range payloads {
for k := range val {
if con == 0 {
respByte, err = g.Websocket.Conn.SendMessageReturnResponse(val[k].ID, val[k])
respByte, err = g.Websocket.Conn.SendMessageReturnResponse(context.TODO(), val[k].ID, val[k])
} else {
respByte, err = g.Websocket.AuthConn.SendMessageReturnResponse(val[k].ID, val[k])
respByte, err = g.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), val[k].ID, val[k])
}
if err != nil {
errs = common.AppendError(errs, err)
Expand Down
2 changes: 1 addition & 1 deletion exchanges/gateio/gateio_ws_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (g *Gateio) handleOptionsSubscription(event string, channelsToSubscribe sub
}
var errs error
for k := range payloads {
result, err := g.Websocket.Conn.SendMessageReturnResponse(payloads[k].ID, payloads[k])
result, err := g.Websocket.Conn.SendMessageReturnResponse(context.TODO(), payloads[k].ID, payloads[k])
if err != nil {
errs = common.AppendError(errs, err)
continue
Expand Down
16 changes: 8 additions & 8 deletions exchanges/hitbtc/hitbtc_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ func (h *HitBTC) wsPlaceOrder(pair currency.Pair, side string, price, quantity f
},
ID: id,
}
resp, err := h.Websocket.Conn.SendMessageReturnResponse(id, request)
resp, err := h.Websocket.Conn.SendMessageReturnResponse(context.TODO(), id, request)
if err != nil {
return nil, fmt.Errorf("%v %v", h.Name, err)
}
Expand All @@ -659,7 +659,7 @@ func (h *HitBTC) wsCancelOrder(clientOrderID string) (*WsCancelOrderResponse, er
},
ID: h.Websocket.Conn.GenerateMessageID(false),
}
resp, err := h.Websocket.Conn.SendMessageReturnResponse(request.ID, request)
resp, err := h.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.ID, request)
if err != nil {
return nil, fmt.Errorf("%v %v", h.Name, err)
}
Expand Down Expand Up @@ -689,7 +689,7 @@ func (h *HitBTC) wsReplaceOrder(clientOrderID string, quantity, price float64) (
},
ID: h.Websocket.Conn.GenerateMessageID(false),
}
resp, err := h.Websocket.Conn.SendMessageReturnResponse(request.ID, request)
resp, err := h.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.ID, request)
if err != nil {
return nil, fmt.Errorf("%v %v", h.Name, err)
}
Expand All @@ -714,7 +714,7 @@ func (h *HitBTC) wsGetActiveOrders() (*wsActiveOrdersResponse, error) {
Params: WsReplaceOrderRequestData{},
ID: h.Websocket.Conn.GenerateMessageID(false),
}
resp, err := h.Websocket.Conn.SendMessageReturnResponse(request.ID, request)
resp, err := h.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.ID, request)
if err != nil {
return nil, fmt.Errorf("%v %v", h.Name, err)
}
Expand All @@ -739,7 +739,7 @@ func (h *HitBTC) wsGetTradingBalance() (*WsGetTradingBalanceResponse, error) {
Params: WsReplaceOrderRequestData{},
ID: h.Websocket.Conn.GenerateMessageID(false),
}
resp, err := h.Websocket.Conn.SendMessageReturnResponse(request.ID, request)
resp, err := h.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.ID, request)
if err != nil {
return nil, fmt.Errorf("%v %v", h.Name, err)
}
Expand All @@ -763,7 +763,7 @@ func (h *HitBTC) wsGetCurrencies(currencyItem currency.Code) (*WsGetCurrenciesRe
},
ID: h.Websocket.Conn.GenerateMessageID(false),
}
resp, err := h.Websocket.Conn.SendMessageReturnResponse(request.ID, request)
resp, err := h.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.ID, request)
if err != nil {
return nil, fmt.Errorf("%v %v", h.Name, err)
}
Expand Down Expand Up @@ -792,7 +792,7 @@ func (h *HitBTC) wsGetSymbols(c currency.Pair) (*WsGetSymbolsResponse, error) {
},
ID: h.Websocket.Conn.GenerateMessageID(false),
}
resp, err := h.Websocket.Conn.SendMessageReturnResponse(request.ID, request)
resp, err := h.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.ID, request)
if err != nil {
return nil, fmt.Errorf("%v %v", h.Name, err)
}
Expand Down Expand Up @@ -824,7 +824,7 @@ func (h *HitBTC) wsGetTrades(c currency.Pair, limit int64, sort, by string) (*Ws
},
ID: h.Websocket.Conn.GenerateMessageID(false),
}
resp, err := h.Websocket.Conn.SendMessageReturnResponse(request.ID, request)
resp, err := h.Websocket.Conn.SendMessageReturnResponse(context.TODO(), request.ID, request)
if err != nil {
return nil, fmt.Errorf("%v %v", h.Name, err)
}
Expand Down
6 changes: 3 additions & 3 deletions exchanges/huobi/huobi_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ func (h *HUOBI) wsGetAccountsList(ctx context.Context) (*WsAuthenticatedAccounts
}
request.Signature = crypto.Base64Encode(hmac)
request.ClientID = h.Websocket.AuthConn.GenerateMessageID(true)
resp, err := h.Websocket.AuthConn.SendMessageReturnResponse(request.ClientID, request)
resp, err := h.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), request.ClientID, request)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -752,7 +752,7 @@ func (h *HUOBI) wsGetOrdersList(ctx context.Context, accountID int64, pair curre
request.Signature = crypto.Base64Encode(hmac)
request.ClientID = h.Websocket.AuthConn.GenerateMessageID(true)

resp, err := h.Websocket.AuthConn.SendMessageReturnResponse(request.ClientID, request)
resp, err := h.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), request.ClientID, request)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -794,7 +794,7 @@ func (h *HUOBI) wsGetOrderDetails(ctx context.Context, orderID string) (*WsAuthe
}
request.Signature = crypto.Base64Encode(hmac)
request.ClientID = h.Websocket.AuthConn.GenerateMessageID(true)
resp, err := h.Websocket.AuthConn.SendMessageReturnResponse(request.ClientID, request)
resp, err := h.Websocket.AuthConn.SendMessageReturnResponse(context.TODO(), request.ClientID, request)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 17c2ef2

Please sign in to comment.