From 6ac86cdd5bdf794f867e234a9431655eb968470e Mon Sep 17 00:00:00 2001 From: Samuel Reid <43227667+cranktakular@users.noreply.github.com> Date: Wed, 31 Jan 2024 16:48:07 +1100 Subject: [PATCH] CB websocket tidying up --- exchanges/coinbasepro/coinbasepro.go | 51 +--- exchanges/coinbasepro/coinbasepro_test.go | 129 +++++---- exchanges/coinbasepro/coinbasepro_types.go | 14 +- .../coinbasepro/coinbasepro_websocket.go | 273 +++++++++++++----- exchanges/coinbasepro/coinbasepro_wrapper.go | 8 +- exchanges/coinbasepro/ratelimit.go | 2 +- 6 files changed, 291 insertions(+), 186 deletions(-) diff --git a/exchanges/coinbasepro/coinbasepro.go b/exchanges/coinbasepro/coinbasepro.go index 7d85fe14468..1db8939f6ab 100644 --- a/exchanges/coinbasepro/coinbasepro.go +++ b/exchanges/coinbasepro/coinbasepro.go @@ -90,6 +90,7 @@ const ( errUnknownEndpointLimit = "unknown endpoint limit %v" errUnknownL2DataType = "unknown l2update data type %v" errUnknownSide = "unknown side %v" + warnSequenceIssue = "Out of order sequence number. Received %v, expected %v" ) var ( @@ -119,8 +120,8 @@ var ( errNameEmpty = errors.New("name cannot be empty") errPortfolioIDEmpty = errors.New("portfolio id cannot be empty") errFeeTypeNotSupported = errors.New("fee type not supported") - errNoEventsWS = errors.New("no events returned from websocket") errCantDecodePrivKey = errors.New("cannot decode private key") + errNoWalletForCurrency = errors.New("no wallet found for currency, address creation impossible") ) // GetAllAccounts returns information on all trading accounts associated with the API key @@ -474,7 +475,7 @@ func (c *CoinbasePro) GetAllOrders(ctx context.Context, productID, userNativeCur pathParams, nil, true, &resp, nil) } -// GetFills returns information of recent fills on the specified profile +// GetFills returns information of recent fills on the specified order func (c *CoinbasePro) GetFills(ctx context.Context, orderID, productID, cursor string, startDate, endDate time.Time, limit uint16) (FillResponse, error) { var resp FillResponse var params Params @@ -503,14 +504,14 @@ func (c *CoinbasePro) GetFills(ctx context.Context, orderID, productID, cursor s } // GetOrderByID returns a single order by order id. -func (c *CoinbasePro) GetOrderByID(ctx context.Context, orderID, clientID, userNativeCurrency string) (*GetOrderResponse, error) { +func (c *CoinbasePro) GetOrderByID(ctx context.Context, orderID, clientOID, userNativeCurrency string) (*GetOrderResponse, error) { if orderID == "" { return nil, errOrderIDEmpty } var resp GetOrderResponse var params Params params.urlVals = url.Values{} - params.urlVals.Set("client_order_id", clientID) + params.urlVals.Set("client_order_id", clientOID) params.urlVals.Set("user_native_currency", userNativeCurrency) path := fmt.Sprintf("%s%s/%s/%s", coinbaseV3, coinbaseOrders, coinbaseHistorical, orderID) @@ -854,20 +855,6 @@ func (c *CoinbasePro) UpdateUser(ctx context.Context, name, timeZone, nativeCurr coinbaseV2+coinbaseUser, "", req, false, &resp, nil) } -// CreateWallet creates a new wallet for the specified currency -func (c *CoinbasePro) CreateWallet(ctx context.Context, currency string) (*GenWalletResponse, error) { - if currency == "" { - return nil, errCurrencyEmpty - } - - path := fmt.Sprintf("%s%s/%s", coinbaseV2, coinbaseAccounts, currency) - - var resp *GenWalletResponse - - return resp, c.SendAuthenticatedHTTPRequest(ctx, exchange.RestSpot, http.MethodGet, - path, "", nil, false, &resp, nil) -} - // GetAllWallets lists all accounts associated with the API key func (c *CoinbasePro) GetAllWallets(ctx context.Context, pag PaginationInp) (GetAllWalletsResponse, error) { var resp GetAllWalletsResponse @@ -904,34 +891,6 @@ func (c *CoinbasePro) GetWalletByID(ctx context.Context, walletID, currency stri path, "", nil, false, &resp, nil) } -// UpdateWalletName updates the name of a wallet -func (c *CoinbasePro) UpdateWalletName(ctx context.Context, walletID, newName string) (*GenWalletResponse, error) { - if walletID == "" { - return nil, errWalletIDEmpty - } - - path := fmt.Sprintf("%s%s/%s", coinbaseV2, coinbaseAccounts, walletID) - - req := map[string]interface{}{"name": newName} - - var resp *GenWalletResponse - - return resp, c.SendAuthenticatedHTTPRequest(ctx, exchange.RestSpot, http.MethodPut, - path, "", req, false, &resp, nil) -} - -// DeleteWallet deletes a wallet -func (c *CoinbasePro) DeleteWallet(ctx context.Context, walletID string) error { - if walletID == "" { - return errWalletIDEmpty - } - - path := fmt.Sprintf("%s%s/%s", coinbaseV2, coinbaseAccounts, walletID) - - return c.SendAuthenticatedHTTPRequest(ctx, exchange.RestSpot, http.MethodDelete, path, "", nil, - false, nil, nil) -} - // CreateAddress generates a crypto address for depositing to the specified wallet func (c *CoinbasePro) CreateAddress(ctx context.Context, walletID, name string) (*GenAddrResponse, error) { if walletID == "" { diff --git a/exchanges/coinbasepro/coinbasepro_test.go b/exchanges/coinbasepro/coinbasepro_test.go index be16378ca9e..96eeaf657fd 100644 --- a/exchanges/coinbasepro/coinbasepro_test.go +++ b/exchanges/coinbasepro/coinbasepro_test.go @@ -13,6 +13,7 @@ import ( "testing" "time" + "github.com/buger/jsonparser" "github.com/gofrs/uuid" "github.com/stretchr/testify/assert" "github.com/thrasher-corp/gocryptotrader/common" @@ -25,6 +26,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/kline" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues" + "github.com/thrasher-corp/gocryptotrader/exchanges/stream" gctlog "github.com/thrasher-corp/gocryptotrader/log" "github.com/thrasher-corp/gocryptotrader/portfolio/withdraw" ) @@ -76,6 +78,8 @@ const ( errExpectedFeeRange = "expected fee range of %v and %v, received %v" errJsonNumberIntoString = "json: cannot unmarshal number into Go value of type string" errParseIntValueOutOfRange = `strconv.ParseInt: parsing "922337203685477580700": value out of range` + errParseUintInvalidSyntax = `strconv.ParseUint: parsing "l": invalid syntax` + errJsonInvalidCharacter = `invalid character ':' after array element` expectedTimestamp = "1970-01-01 00:20:34 +0000 UTC" @@ -822,19 +826,6 @@ func TestUpdateUser(t *testing.T) { assert.NotEmpty(t, resp, errExpectedNonEmpty) } -func TestCreateWallet(t *testing.T) { - _, err := c.CreateWallet(context.Background(), "") - if !errors.Is(err, errCurrencyEmpty) { - t.Errorf(errExpectMismatch, err, errCurrencyEmpty) - } - sharedtestvalues.SkipTestIfCredentialsUnset(t, c, canManipulateRealOrders) - resp, err := c.CreateWallet(context.Background(), testCrypto.String()) - if err != nil { - t.Error(err) - } - assert.NotEmpty(t, resp, errExpectedNonEmpty) -} - func TestGetAllWallets(t *testing.T) { sharedtestvalues.SkipTestIfCredentialsUnset(t, c) pagIn := PaginationInp{Limit: 2} @@ -872,46 +863,6 @@ func TestGetWalletByID(t *testing.T) { assert.NotEmpty(t, resp, errExpectedNonEmpty) } -func TestUpdateWalletName(t *testing.T) { - _, err := c.UpdateWalletName(context.Background(), "", "") - if !errors.Is(err, errWalletIDEmpty) { - t.Errorf(errExpectMismatch, err, errWalletIDEmpty) - } - sharedtestvalues.SkipTestIfCredentialsUnset(t, c, canManipulateRealOrders) - wID, err := c.GetAllWallets(context.Background(), PaginationInp{}) - if err != nil { - t.Error(err) - } - if len(wID.Data) == 0 { - t.Fatal(errExpectedNonEmpty) - } - resp, err := c.UpdateWalletName(context.Background(), wID.Data[len(wID.Data)-1].ID, "Wallet Tested by GCT") - if err != nil { - t.Error(err) - } - assert.NotEmpty(t, resp, errExpectedNonEmpty) -} - -func TestDeleteWallet(t *testing.T) { - err := c.DeleteWallet(context.Background(), "") - if !errors.Is(err, errWalletIDEmpty) { - t.Errorf(errExpectMismatch, err, errWalletIDEmpty) - } - sharedtestvalues.SkipTestIfCredentialsUnset(t, c, canManipulateRealOrders) - wID, err := c.CreateWallet(context.Background(), testCrypto.String()) - if err != nil { - t.Error(err) - } - // As of now, it seems like this next step always fails. DeleteWallet only lets you delete non-primary - // non-fiat wallets, but the only non-primary wallet is fiat. Trying to create a secondary wallet for - // any cryptocurrency using CreateWallet simply returns the details of the existing primary wallet. - t.Skip("endpoint bugged on their end, skipping") - err = c.DeleteWallet(context.Background(), wID.Data.ID) - if err != nil { - t.Error(err) - } -} - func TestCreateAddress(t *testing.T) { _, err := c.CreateAddress(context.Background(), "", "") if !errors.Is(err, errWalletIDEmpty) { @@ -1765,8 +1716,12 @@ func TestGetOrderInfo(t *testing.T) { } func TestGetDepositAddress(t *testing.T) { - sharedtestvalues.SkipTestIfCredentialsUnset(t, c, canManipulateRealOrders) - _, err := c.GetDepositAddress(context.Background(), currency.BTC, "", "") + sharedtestvalues.SkipTestIfCredentialsUnset(t, c) + _, err := c.GetDepositAddress(context.Background(), currency.NewCode("fake currency that doesn't exist"), "", "") + if !errors.Is(err, errNoWalletForCurrency) { + t.Errorf(errExpectMismatch, err, errNoWalletForCurrency) + } + _, err = c.GetDepositAddress(context.Background(), testCrypto, "", "") if err != nil { t.Error(err) } @@ -2114,8 +2069,70 @@ func TestStringToFloatPtr(t *testing.T) { } } -func TestWsSomethingOrOther(t *testing.T) { +func TestWsConnect(t *testing.T) { + c.Websocket.Disable() + err := c.WsConnect() + if err.Error() != stream.WebsocketNotEnabled { + t.Errorf(errExpectMismatch, err, stream.WebsocketNotEnabled) + } + c.Websocket.Enable() + err = c.WsConnect() + if err != nil { + t.Error(err) + } +} +func TestWsHandleData(t *testing.T) { + c.Websocket.DataHandler = make(chan interface{}, 4) + _, err := c.wsHandleData(nil, 0) + if !errors.Is(err, jsonparser.KeyPathNotFoundError) { + t.Errorf(errExpectMismatch, err, jsonparser.KeyPathNotFoundError) + } + mockJson := []byte(`{"sequence_num": "l"}`) + _, err = c.wsHandleData(mockJson, 0) + if err.Error() != errParseUintInvalidSyntax { + t.Errorf(errExpectMismatch, err, errParseUintInvalidSyntax) + } + mockJson = []byte(`{"sequence_num": 1, /\\/"""}`) + _, err = c.wsHandleData(mockJson, 0) + if !errors.Is(err, jsonparser.KeyPathNotFoundError) { + t.Errorf(errExpectMismatch, err, jsonparser.KeyPathNotFoundError) + } + mockJson = []byte(`{"sequence_num": 0, "channel": "subscriptions"}`) + _, err = c.wsHandleData(mockJson, 0) + if err != nil { + t.Error(err) + } + mockJson = []byte(`{"sequence_num": 0, "channel": "", "events":}`) + _, err = c.wsHandleData(mockJson, 0) + if !errors.Is(err, jsonparser.UnknownValueTypeError) { + t.Errorf(errExpectMismatch, err, jsonparser.UnknownValueTypeError) + } + mockJson = []byte(`{"sequence_num": 0, "channel": "status", "events": ["type": 1234]}`) + _, err = c.wsHandleData(mockJson, 0) + if err.Error() != errJsonInvalidCharacter { + t.Errorf(errExpectMismatch, err, errJsonInvalidCharacter) + } + mockJson = []byte(`{"sequence_num": 0, "channel": "status", "events": [{"type": "moo"}]}`) + _, err = c.wsHandleData(mockJson, 0) + if err != nil { + t.Error(err) + } + mockJson = []byte(`{"sequence_num": 0, "channel": "error", "events": [{"type": "moo"}]}`) + _, err = c.wsHandleData(mockJson, 0) + if err != nil { + t.Error(err) + } + mockJson = []byte(`{"sequence_num": 0, "channel": "ticker", "events": ["type": ""}]}`) + _, err = c.wsHandleData(mockJson, 0) + if err.Error() != errJsonInvalidCharacter { + t.Errorf(errExpectMismatch, err, errJsonInvalidCharacter) + } + mockJson = []byte(`{"sequence_num": 0, "channel": "ticker", "events": [{"type": "moo", "tickers": [{"price": "1.1"}]}]}`) + _, err = c.wsHandleData(mockJson, 0) + if err != nil { + t.Error(err) + } } func skipTestIfLowOnFunds(t *testing.T) { diff --git a/exchanges/coinbasepro/coinbasepro_types.go b/exchanges/coinbasepro/coinbasepro_types.go index 657666eacb7..f6dbc33ae83 100644 --- a/exchanges/coinbasepro/coinbasepro_types.go +++ b/exchanges/coinbasepro/coinbasepro_types.go @@ -1251,13 +1251,13 @@ type WebsocketOrderDataHolder struct { // Changes [][3]string `json:"changes"` // } -type wsGen struct { - Channel string `json:"channel"` - ClientID string `json:"client_id"` - Timestamp time.Time `json:"timestamp"` - SequenceNum uint64 `json:"sequence_num"` - Events []interface{} `json:"events"` -} +// type wsGen struct { +// Channel string `json:"channel"` +// ClientID string `json:"client_id"` +// Timestamp time.Time `json:"timestamp"` +// SequenceNum uint64 `json:"sequence_num"` +// Events []interface{} `json:"events"` +// } // type wsStatus struct { // Currencies []struct { diff --git a/exchanges/coinbasepro/coinbasepro_websocket.go b/exchanges/coinbasepro/coinbasepro_websocket.go index 05c84f5ac89..e91750883fa 100644 --- a/exchanges/coinbasepro/coinbasepro_websocket.go +++ b/exchanges/coinbasepro/coinbasepro_websocket.go @@ -11,8 +11,10 @@ import ( "encoding/pem" "fmt" "net/http" + _ "net/http/pprof" "strconv" "strings" + "sync" "time" "github.com/buger/jsonparser" @@ -24,7 +26,6 @@ import ( "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" - "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" "github.com/thrasher-corp/gocryptotrader/exchanges/trade" @@ -54,29 +55,91 @@ func (c *CoinbasePro) WsConnect() error { func (c *CoinbasePro) wsReadData() { defer c.Websocket.Wg.Done() + var seqCount uint64 + for { resp := c.Websocket.Conn.ReadMessage() if resp.Raw == nil { return } - err := c.wsHandleData(resp.Raw) + warn, err := c.wsHandleData(resp.Raw, seqCount) if err != nil { c.Websocket.DataHandler <- err } + if warn != "" { + c.Websocket.DataHandler <- warn + tempStr := strings.SplitN(warn, "Out of order sequence number. Received ", 2)[1] + tempStr = strings.SplitN(tempStr, ", expected ", 2)[0] + tempNum, err := strconv.ParseUint(tempStr, 10, 64) + if err != nil { + c.Websocket.DataHandler <- err + } else { + seqCount = tempNum + } + } + seqCount++ } } -func (c *CoinbasePro) wsHandleData(respRaw []byte) error { - // fmt.Println("WHADDUP:", string(respRaw)) - genData := wsGen{} - err := json.Unmarshal(respRaw, &genData) - if err != nil { - return err +var meow sync.Mutex +var count int +var wcTime time.Duration +var bruh bool + +func WOW(tThen time.Time, msg []byte) { + meow.Lock() + if !bruh { + bruh = true + go func() { + for { + select { + case <-time.After(time.Second * 5): + meow.Lock() + fmt.Printf("COINBASEPRO: %v\n", count) + count = 0 + wcTime = 0 + meow.Unlock() + } + } + }() + } + this := time.Since(tThen) + if wcTime == 0 || this > wcTime { + fmt.Printf("Uh-oh, we took %s to process this message\n", this) + wcTime = this + if this > time.Millisecond*50 { + fmt.Printf("Oh jeez, I think I found the big one! %s\n", msg) + } } + count++ + meow.Unlock() +} - if genData.Channel == "subscriptions" || genData.Channel == "heartbeats" { - return nil +var alreadyDone bool + +func launchProfiling() { + if alreadyDone { + return } + alreadyDone = true + // go func() { + // http.ListenAndServe("localhost:6060", nil) + // }() + // fmt.Print("30 second pause, 1") + // time.Sleep(time.Second * 30) + // fmt.Print("5 second pause, 1") + // time.Sleep(time.Second * 5) +} + +func (c *CoinbasePro) wsHandleData(respRaw []byte, seqCount uint64) (string, error) { + // fmt.Println("WHADDUP:", string(respRaw)) + + // genData := wsGen{} + var warnString string + // err := json.Unmarshal(respRaw, &genData) + // if err != nil { + // return warnString, err + // } // fmt.Printf("=== OH NOO LOOK AT THIS DATA WE HAVE TO DEAL WITH: %s ===\n", genData.Events) @@ -91,24 +154,54 @@ func (c *CoinbasePro) wsHandleData(respRaw []byte) error { // } // fmt.Printf("===== AWESOME, WE'VE GOT THE GOOD DATA: %v =====\n", specData) - if len(genData.Events) == 0 { - return errNoEventsWS + // if len(genData.Events) == 0 { + // return warnString, errNoEventsWS + // } + + seqData, _, _, err := jsonparser.Get(respRaw, "sequence_num") + if err != nil { + return warnString, err + } + + seqNum, err := strconv.ParseUint(string(seqData), 10, 64) + if err != nil { + return warnString, err + } + + if seqNum != seqCount { + warnString = fmt.Sprintf(warnSequenceIssue, seqNum, + seqCount) + } + + channelRaw, _, _, err := jsonparser.Get(respRaw, "channel") + if err != nil { + return warnString, err + } + + channel := string(channelRaw) + + tn := time.Now() + defer WOW(tn, channelRaw) + + if channel == "subscriptions" || channel == "heartbeats" { + return warnString, nil } data, _, _, err := jsonparser.Get(respRaw, "events") if err != nil { - return err + return warnString, err } // fmt.Printf("==== WEEWOO WE'VE GOT THE NASTY DATA: %s ====\n", data) - switch genData.Channel { + switch channel { case "status": wsStatus := []WebsocketProductHolder{} err = json.Unmarshal(data, &wsStatus) if err != nil { - return err + return warnString, err } + c.Websocket.DataHandler <- wsStatus case "error": c.Websocket.DataHandler <- errors.New(string(respRaw)) @@ -117,13 +210,20 @@ func (c *CoinbasePro) wsHandleData(respRaw []byte) error { err = json.Unmarshal(data, &wsTicker) if err != nil { - return err + return warnString, err + } + + sliToSend := []ticker.Price{} + + timestamp, err := getTimestamp(respRaw) + if err != nil { + return warnString, err } for i := range wsTicker { for j := range wsTicker[i].Tickers { - c.Websocket.DataHandler <- &ticker.Price{ - LastUpdated: genData.Timestamp, + sliToSend = append(sliToSend, ticker.Price{ + LastUpdated: timestamp, Pair: wsTicker[i].Tickers[j].ProductID, AssetType: asset.Spot, ExchangeName: c.Name, @@ -131,22 +231,30 @@ func (c *CoinbasePro) wsHandleData(respRaw []byte) error { Low: wsTicker[i].Tickers[j].Low24H, Last: wsTicker[i].Tickers[j].Price, Volume: wsTicker[i].Tickers[j].Volume24H, - } + }) } } + c.Websocket.DataHandler <- sliToSend // fmt.Printf("=== WOOT, IT WORKED ===\n") case "candles": wsCandles := []WebsocketCandleHolder{} err = json.Unmarshal(data, &wsCandles) if err != nil { - return err + return warnString, err + } + + sliToSend := []stream.KlineData{} + + timestamp, err := getTimestamp(respRaw) + if err != nil { + return warnString, err } for i := range wsCandles { for j := range wsCandles[i].Candles { - c.Websocket.DataHandler <- &stream.KlineData{ - Timestamp: genData.Timestamp, + sliToSend = append(sliToSend, stream.KlineData{ + Timestamp: timestamp, Pair: wsCandles[i].Candles[j].ProductID, AssetType: asset.Spot, Exchange: c.Name, @@ -156,21 +264,24 @@ func (c *CoinbasePro) wsHandleData(respRaw []byte) error { HighPrice: wsCandles[i].Candles[j].High, LowPrice: wsCandles[i].Candles[j].Low, Volume: wsCandles[i].Candles[j].Volume, - } + }) } } + c.Websocket.DataHandler <- sliToSend // fmt.Print("=== RECEIVED AND PROCESSED ===\n") case "market_trades": wsTrades := []WebsocketMarketTradeHolder{} err = json.Unmarshal(data, &wsTrades) if err != nil { - return err + return warnString, err } + sliToSend := []trade.Data{} + for i := range wsTrades { for j := range wsTrades[i].Trades { - c.Websocket.DataHandler <- &trade.Data{ + sliToSend = append(sliToSend, trade.Data{ TID: wsTrades[i].Trades[j].TradeID, Exchange: c.Name, CurrencyPair: wsTrades[i].Trades[j].ProductID, @@ -179,29 +290,35 @@ func (c *CoinbasePro) wsHandleData(respRaw []byte) error { Price: wsTrades[i].Trades[j].Price, Amount: wsTrades[i].Trades[j].Size, Timestamp: wsTrades[i].Trades[j].Time, - } + }) } } + c.Websocket.DataHandler <- sliToSend // fmt.Print("=== RECEIVED AND PROCESSED ===\n") case "l2_data": var wsL2 []WebsocketOrderbookDataHolder err := json.Unmarshal(data, &wsL2) if err != nil { - return err + return warnString, err + } + + timestamp, err := getTimestamp(respRaw) + if err != nil { + return warnString, err } for i := range wsL2 { // fmt.Printf("======== DATA THAT WE JUST HIT: %v ========\n", wsL2[i]) switch wsL2[i].Type { case "snapshot": - err = c.ProcessSnapshot(wsL2[i], genData.Timestamp) + err = c.ProcessSnapshot(wsL2[i], timestamp) case "update": - err = c.ProcessUpdate(wsL2[i], genData.Timestamp) + err = c.ProcessUpdate(wsL2[i], timestamp) default: err = errors.Errorf(errUnknownL2DataType, wsL2[i].Type) } if err != nil { - return err + return warnString, err } } @@ -209,9 +326,10 @@ func (c *CoinbasePro) wsHandleData(respRaw []byte) error { var wsUser []WebsocketOrderDataHolder err := json.Unmarshal(data, &wsUser) if err != nil { - return err + return warnString, err } + sliToSend := []order.Detail{} for i := range wsUser { for j := range wsUser[i].Orders { var oType order.Type @@ -235,7 +353,7 @@ func (c *CoinbasePro) wsHandleData(respRaw []byte) error { continue } - c.Websocket.DataHandler <- &order.Detail{ + sliToSend = append(sliToSend, order.Detail{ Price: wsUser[i].Orders[j].AveragePrice, Amount: wsUser[i].Orders[j].CumulativeQuantity + wsUser[i].Orders[j].LeavesQuantity, ExecutedAmount: wsUser[i].Orders[j].CumulativeQuantity, @@ -250,15 +368,16 @@ func (c *CoinbasePro) wsHandleData(respRaw []byte) error { AssetType: asset.Spot, Date: wsUser[i].Orders[j].CreationTime, Pair: wsUser[i].Orders[j].ProductID, - } + }) } } + c.Websocket.DataHandler <- sliToSend default: c.Websocket.DataHandler <- stream.UnhandledMessageWarning{Message: c.Name + stream.UnhandledMessage + string(respRaw)} - return nil + return warnString, nil } - return nil + return warnString, nil } func statusToStandardStatus(stat string) (order.Status, error) { @@ -347,12 +466,13 @@ func processBidAskArray(data WebsocketOrderbookDataHolder) ([]orderbook.Item, [] func (c *CoinbasePro) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, error) { var channels = []string{ "heartbeats", + // "status", "ticker", - "ticker_batch", + // "ticker_batch", "candles", - "market_trades", + // "market_trades", "level2", - "user", + // "user", } enabledCurrencies, err := c.GetEnabledPairs(asset.Spot) if err != nil { @@ -376,7 +496,7 @@ func (c *CoinbasePro) GenerateDefaultSubscriptions() ([]stream.ChannelSubscripti return subscriptions, nil } -func (c *CoinbasePro) sendRequest(msgType, channel string, productID currency.Pair, rLim RateLimit) error { +func (c *CoinbasePro) sendRequest(msgType, channel string, productIDs currency.Pairs) error { creds, err := c.GetCredentials(context.Background()) if err != nil { return err @@ -384,7 +504,7 @@ func (c *CoinbasePro) sendRequest(msgType, channel string, productID currency.Pa n := strconv.FormatInt(time.Now().Unix(), 10) - message := n + channel + productID.String() + message := n + channel + productIDs.Join() hmac, err := crypto.GetHMAC(crypto.HashSHA256, []byte(message), @@ -400,7 +520,7 @@ func (c *CoinbasePro) sendRequest(msgType, channel string, productID currency.Pa req := WebsocketSubscribe{ Type: msgType, - ProductIDs: []string{productID.String()}, + ProductIDs: productIDs.Strings(), Channel: channel, Signature: hex.EncodeToString(hmac), // JWT: jwt, @@ -408,14 +528,15 @@ func (c *CoinbasePro) sendRequest(msgType, channel string, productID currency.Pa Timestamp: n, } - reqMarshal, _ := json.Marshal(req) + // reqMarshal, _ := json.Marshal(req) - fmt.Print(string(reqMarshal)) + // fmt.Print(string(reqMarshal)) - err = rLim.Limit(context.Background(), WSRate) + // err = rLim.Limit(context.Background(), WSRate) if err != nil { return err } + // data, err := c.Websocket.Conn.SendMessageReturnResponse(nil, req) err = c.Websocket.Conn.SendJSONMessage(req) if err != nil { return err @@ -426,7 +547,9 @@ func (c *CoinbasePro) sendRequest(msgType, channel string, productID currency.Pa // Subscribe sends a websocket message to receive data from the channel func (c *CoinbasePro) Subscribe(channelsToSubscribe []stream.ChannelSubscription) error { - fmt.Printf("SUBSCRIBE: %v\n", channelsToSubscribe) + launchProfiling() + + // fmt.Printf("SUBSCRIBE: %v\n", channelsToSubscribe) // var creds *account.Credentials // var err error // if c.IsWebsocketAuthenticationSupported() { @@ -470,15 +593,25 @@ func (c *CoinbasePro) Subscribe(channelsToSubscribe []stream.ChannelSubscription // } // } - var rLim RateLimit + // var ( + // rLim RateLimit + // ) + + chanKeys := make(map[string]currency.Pairs) - rLim.RateLimWS = request.NewRateLimit(coinbaseWSInterval, coinbaseWSRate) + // rLim.RateLimWS = request.NewRateLimit(coinbaseWSInterval, coinbaseWSRate) for i := range channelsToSubscribe { - err := c.sendRequest("subscribe", channelsToSubscribe[i].Channel, channelsToSubscribe[i].Currency, rLim) + chanKeys[channelsToSubscribe[i].Channel] = + chanKeys[channelsToSubscribe[i].Channel].Add(channelsToSubscribe[i].Currency) + + } + for s := range chanKeys { + err := c.sendRequest("subscribe", s, chanKeys[s]) if err != nil { return err } + time.Sleep(time.Millisecond * 10) } c.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe...) @@ -487,34 +620,20 @@ func (c *CoinbasePro) Subscribe(channelsToSubscribe []stream.ChannelSubscription // Unsubscribe sends a websocket message to stop receiving data from the channel func (c *CoinbasePro) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscription) error { - unsubscribe := WebsocketSubscribe{ - Type: "unsubscribe", - } + chanKeys := make(map[string]currency.Pairs) for i := range channelsToUnsubscribe { - p := channelsToUnsubscribe[i].Currency.String() - if !common.StringDataCompare(unsubscribe.ProductIDs, p) && p != "" { - unsubscribe.ProductIDs = append(unsubscribe.ProductIDs, p) - } - - if unsubscribe.Channel == channelsToUnsubscribe[i].Channel { - unsubscribe.Channel = channelsToUnsubscribe[i].Channel - - } - - } - var rLim RateLimit - - rLim.RateLimWS = request.NewRateLimit(coinbaseWSInterval, coinbaseWSRate) + chanKeys[channelsToUnsubscribe[i].Channel] = + chanKeys[channelsToUnsubscribe[i].Channel].Add(channelsToUnsubscribe[i].Currency) - err := rLim.Limit(context.Background(), WSRate) - if err != nil { - return err } - err = c.Websocket.Conn.SendJSONMessage(unsubscribe) - if err != nil { - return err + for s := range chanKeys { + err := c.sendRequest("unsubscribe", s, chanKeys[s]) + if err != nil { + return err + } + time.Sleep(time.Millisecond * 10) } c.Websocket.RemoveSubscriptions(channelsToUnsubscribe...) @@ -580,6 +699,18 @@ func (c *CoinbasePro) GetJWT(ctx context.Context, uri string) (string, error) { return headEncode + "." + bodyEncode + "." + sigEncode, nil } +func getTimestamp(rawData []byte) (time.Time, error) { + data, _, _, err := jsonparser.Get(rawData, "timestamp") + if err != nil { + return time.Time{}, err + } + timestamp, err := time.Parse(time.RFC3339, string(data)) + if err != nil { + return time.Time{}, err + } + return timestamp, nil +} + // Base64URLEncode is a helper function that does some tweaks to standard Base64 encoding, in a way // which JWT requires func Base64URLEncode(b []byte) string { diff --git a/exchanges/coinbasepro/coinbasepro_wrapper.go b/exchanges/coinbasepro/coinbasepro_wrapper.go index f500edf689a..4baa635d376 100644 --- a/exchanges/coinbasepro/coinbasepro_wrapper.go +++ b/exchanges/coinbasepro/coinbasepro_wrapper.go @@ -73,6 +73,8 @@ func (c *CoinbasePro) SetDefaults() { log.Errorln(log.ExchangeSys, err) } + // c.SetKeyChain(account.Primary, account.Secondary) + c.Features = exchange.Features{ Supports: exchange.FeaturesSupported{ REST: true, @@ -828,11 +830,7 @@ func (c *CoinbasePro) GetDepositAddress(ctx context.Context, cryptocurrency curr } } if targetWalletID == "" { - createWalResp, err := c.CreateWallet(ctx, cryptocurrency.String()) - if err != nil { - return nil, err - } - targetWalletID = createWalResp.Data.ID + return nil, errNoWalletForCurrency } resp, err := c.CreateAddress(ctx, targetWalletID, "") if err != nil { diff --git a/exchanges/coinbasepro/ratelimit.go b/exchanges/coinbasepro/ratelimit.go index 8123e6de3ef..916a6337d84 100644 --- a/exchanges/coinbasepro/ratelimit.go +++ b/exchanges/coinbasepro/ratelimit.go @@ -18,7 +18,7 @@ const ( coinbaseV2Rate = 10000 coinbaseWSInterval = time.Second - coinbaseWSRate = 75 + coinbaseWSRate = 750 ) const (