From 57618ced7cad7aa5c5a19217cc1720fe90b519a1 Mon Sep 17 00:00:00 2001 From: edwin Date: Mon, 3 Jun 2024 17:01:57 +0800 Subject: [PATCH 1/2] pkg/exchange: add conn count info event --- pkg/exchange/okex/parse.go | 25 +++++++++++++++++---- pkg/exchange/okex/parse_test.go | 40 +++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 4 deletions(-) diff --git a/pkg/exchange/okex/parse.go b/pkg/exchange/okex/parse.go index 2baa35fd53..54ea2a2ffb 100644 --- a/pkg/exchange/okex/parse.go +++ b/pkg/exchange/okex/parse.go @@ -99,10 +99,12 @@ func parseWebSocketEvent(in []byte) (interface{}, error) { type WsEventType string const ( - WsEventTypeLogin = "login" - WsEventTypeError = "error" - WsEventTypeSubscribe = "subscribe" - WsEventTypeUnsubscribe = "unsubscribe" + WsEventTypeLogin WsEventType = "login" + WsEventTypeError WsEventType = "error" + WsEventTypeSubscribe WsEventType = "subscribe" + WsEventTypeUnsubscribe WsEventType = "unsubscribe" + WsEventTypeConnectionInfo WsEventType = "channel-conn-count" + WsEventTypeConnectionError WsEventType = "channel-conn-count-error" ) type WebSocketEvent struct { @@ -115,6 +117,8 @@ type WebSocketEvent struct { } `json:"arg,omitempty"` Data json.RawMessage `json:"data"` ActionType ActionType `json:"action"` + Channel Channel `json:"channel"` + ConnCount string `json:"connCount"` } func (w *WebSocketEvent) IsValid() error { @@ -133,6 +137,12 @@ func (w *WebSocketEvent) IsValid() error { } return nil + case WsEventTypeConnectionInfo: + return nil + + case WsEventTypeConnectionError: + return fmt.Errorf("connection rate limit exceeded, channel: %s, connCount: %s", w.Channel, w.ConnCount) + default: return fmt.Errorf("unexpected event type: %+v", w) } @@ -401,3 +411,10 @@ func (m *MarketTradeEvent) toGlobalTrade() (types.Trade, error) { FeeCurrency: "", // not supported }, nil } + +type ConnectionInfoEvent struct { + Event string `json:"event"` + Channel Channel `json:"channel"` + ConnCount string `json:"connCount"` + ConnId string `json:"connId"` +} diff --git a/pkg/exchange/okex/parse_test.go b/pkg/exchange/okex/parse_test.go index 6877691ecb..a9bfb15dbf 100644 --- a/pkg/exchange/okex/parse_test.go +++ b/pkg/exchange/okex/parse_test.go @@ -849,6 +849,46 @@ func TestWebSocketEvent_IsValid(t *testing.T) { assert.ErrorContains(t, opEvent.IsValid(), "unexpected event type") }) + + t.Run("conn count info", func(t *testing.T) { + input := `{ + "event":"channel-conn-count", + "channel":"orders", + "connCount": "2", + "connId":"abcd1234" +}` + res, err := parseWebSocketEvent([]byte(input)) + assert.NoError(t, err) + opEvent, ok := res.(*WebSocketEvent) + assert.True(t, ok) + assert.Equal(t, WebSocketEvent{ + Event: "channel-conn-count", + Channel: "orders", + ConnCount: "2", + }, *opEvent) + + assert.NoError(t, opEvent.IsValid()) + }) + + t.Run("conn count error", func(t *testing.T) { + input := `{ + "event": "channel-conn-count-error", + "channel": "orders", + "connCount": "20", + "connId":"a4d3ae55" +}` + res, err := parseWebSocketEvent([]byte(input)) + assert.NoError(t, err) + opEvent, ok := res.(*WebSocketEvent) + assert.True(t, ok) + assert.Equal(t, WebSocketEvent{ + Event: "channel-conn-count-error", + Channel: "orders", + ConnCount: "20", + }, *opEvent) + + assert.ErrorContains(t, opEvent.IsValid(), "rate limit") + }) } func TestOrderTradeEvent(t *testing.T) { From bafa5a47837ad3553ddd7ea760f7e1a12d3f7fe7 Mon Sep 17 00:00:00 2001 From: edwin Date: Mon, 3 Jun 2024 17:25:07 +0800 Subject: [PATCH 2/2] pkg/exchange: add rate limit comment --- pkg/exchange/okex/stream.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/exchange/okex/stream.go b/pkg/exchange/okex/stream.go index 106d594541..96265e06e5 100644 --- a/pkg/exchange/okex/stream.go +++ b/pkg/exchange/okex/stream.go @@ -196,6 +196,9 @@ func (s *Stream) subscribePrivateChannels(next func()) func() { {Channel: "orders", InstrumentType: string(okexapi.InstrumentTypeSpot)}, } + // https://www.okx.com/docs-v5/zh/#overview-websocket-connect + // **NOTICE** 2024/06/03 Since the number of channels we are currently subscribed to is far less + // than the rate limit of 20, rate limiting is not supported for now. log.Infof("subscribing private channels: %+v", subs) err := s.Conn.WriteJSON(WebsocketOp{ Op: "subscribe",