From 343052b81632af4b39b61cea6a56267d7a9ff48b Mon Sep 17 00:00:00 2001 From: Samuel Reid <43227667+cranktakular@users.noreply.github.com> Date: Mon, 22 Apr 2024 16:25:14 +1000 Subject: [PATCH] Slight improvements --- exchanges/coinbasepro/coinbasepro.go | 2 +- exchanges/coinbasepro/coinbasepro_test.go | 211 ++++++++++-------- .../coinbasepro/coinbasepro_websocket.go | 44 ++-- exchanges/coinbasepro/ratelimit.go | 34 +-- exchanges/exchange.go | 5 +- exchanges/stream/websocket.go | 4 +- 6 files changed, 173 insertions(+), 127 deletions(-) diff --git a/exchanges/coinbasepro/coinbasepro.go b/exchanges/coinbasepro/coinbasepro.go index 1fb1db3b5d5..3b50ac8f765 100644 --- a/exchanges/coinbasepro/coinbasepro.go +++ b/exchanges/coinbasepro/coinbasepro.go @@ -158,7 +158,7 @@ var ( errPairEmpty = errors.New("pair cannot be empty") errStringConvert = errors.New("unable to convert into string value") errFloatConvert = errors.New("unable to convert into float64 value") - errConvertGen = errors.New("unable to convert value") + errNoCredsUser = errors.New("no credentials when attempting to subscribe to authenticated channel user") ) // GetAllAccounts returns information on all trading accounts associated with the API key diff --git a/exchanges/coinbasepro/coinbasepro_test.go b/exchanges/coinbasepro/coinbasepro_test.go index 39a91d2064d..50bbbf2d480 100644 --- a/exchanges/coinbasepro/coinbasepro_test.go +++ b/exchanges/coinbasepro/coinbasepro_test.go @@ -15,6 +15,7 @@ import ( "github.com/gofrs/uuid" "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/config" "github.com/thrasher-corp/gocryptotrader/currency" @@ -49,6 +50,8 @@ var ( // Constants used within tests const ( testAddress = "fake address" + testAmount = 1e-08 + testPrice = 1e+09 skipPayMethodNotFound = "no payment methods found, skipping" skipInsufSuitableAccs = "insufficient suitable accounts for test, skipping" @@ -63,7 +66,7 @@ const ( errExpectedNonEmpty = "expected non-empty response" errIDNotSet = "ID not set" errx7f = "setting proxy address error parse \"\\x7f\": net/url: invalid control character in URL" - errPortfolioNameDuplicate = `CoinbasePro unsuccessful HTTP status code: 500 raw response: {"error":"INTERNAL","error_details":"the requested portfolio name already exists","message":"the requested portfolio name already exists"}, authenticated request failed` + errPortfolioNameDuplicate = `CoinbasePro unsuccessful HTTP status code: 409 raw response: {"error":"CONFLICT","error_details":"the requested portfolio name already exists","message":"the requested portfolio name already exists"}, authenticated request failed` errPortTransferInsufFunds = `CoinbasePro unsuccessful HTTP status code: 429 raw response: {"error":"unknown","error_details":"[PORTFOLIO_ERROR_CODE_INSUFFICIENT_FUNDS] insufficient funds in source account","message":"[PORTFOLIO_ERROR_CODE_INSUFFICIENT_FUNDS] insufficient funds in source account"}, authenticated request failed` errInvalidProductID = `CoinbasePro unsuccessful HTTP status code: 400 raw response: {"error":"INVALID_ARGUMENT","error_details":"valid product_id is required","message":"valid product_id is required"}, authenticated request failed` errNoEndpointPathEdgeCase3 = "no endpoint path found for the given key: EdgeCase3URL" @@ -71,19 +74,16 @@ const ( errUnrecognisedOrderType = `'' unrecognised order type` expectedTimestamp = "1970-01-01 00:20:34 +0000 UTC" - - testAmount = 1e-08 - testPrice = 1e+09 ) -func TestSetup(t *testing.T) { +func TestSetup2(t *testing.T) { cfg, err := c.GetStandardConfig() assert.NoError(t, err) cfg.API.AuthenticatedSupport = true cfg.API.Credentials.Key = apiKey cfg.API.Credentials.Secret = apiSecret - cfg.Enabled = false - cfg.Enabled = true + // cfg.Enabled = false + // cfg.Enabled = true cfg.ProxyAddress = string(rune(0x7f)) err = c.Setup(cfg) if err.Error() != errx7f { @@ -101,25 +101,9 @@ func TestMain(m *testing.M) { log.Fatal("failed to set sandbox endpoint", err) } } - cfg := config.GetConfig() - err := cfg.LoadConfig("../../testdata/configtest.json", true) - if err != nil { - log.Fatal("load config error", err) - } - gdxConfig, err := cfg.GetExchangeConfig("CoinbasePro") - if err != nil { - log.Fatal("init error") - } - if apiKey != "" { - gdxConfig.API.Credentials.Key = apiKey - gdxConfig.API.Credentials.Secret = apiSecret - gdxConfig.API.AuthenticatedSupport = true - gdxConfig.API.AuthenticatedWebsocketSupport = true - } - c.Websocket = sharedtestvalues.NewTestWebsocket() - err = c.Setup(gdxConfig) + err := exchangeBaseHelper(c) if err != nil { - log.Fatal("CoinbasePro setup error", err) + log.Fatal(err) } if apiKey != "" { c.GetBase().API.AuthenticatedSupport = true @@ -132,6 +116,33 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } +func TestSetup(t *testing.T) { + cfg, err := c.GetStandardConfig() + assert.NoError(t, err) + exch := &CoinbasePro{} + exch.SetDefaults() + err = exchangeBaseHelper(exch) + require.NoError(t, err) + cfg.ProxyAddress = string(rune(0x7f)) + err = exch.Setup(cfg) + assert.ErrorIs(t, err, exchange.ErrSettingProxyAddress) +} + +func TestWsConnect(t *testing.T) { + exch := &CoinbasePro{} + exch.Websocket = sharedtestvalues.NewTestWebsocket() + sharedtestvalues.SkipTestIfCredentialsUnset(t, c) + err := exch.Websocket.Disable() + assert.ErrorIs(t, err, stream.ErrAlreadyDisabled) + err = exch.WsConnect() + assert.ErrorIs(t, err, stream.ErrWebsocketNotEnabled) + exch.SetDefaults() + err = exchangeBaseHelper(exch) + require.NoError(t, err) + err = exch.Websocket.Enable() + assert.NoError(t, err) +} + func TestGetAllAccounts(t *testing.T) { t.Parallel() sharedtestvalues.SkipTestIfCredentialsUnset(t, c) @@ -475,29 +486,6 @@ func TestGetFuturesPositionByID(t *testing.T) { assert.NoError(t, err) } -func TestScheduleFuturesSweep(t *testing.T) { - t.Parallel() - sharedtestvalues.SkipTestIfCredentialsUnset(t, c, canManipulateRealOrders) - curSweeps, err := c.ListFuturesSweeps(context.Background()) - assert.NoError(t, err) - preCancel := false - if len(curSweeps) > 0 { - for i := range curSweeps { - if curSweeps[i].Status == "PENDING" { - preCancel = true - } - } - } - if preCancel { - _, err = c.CancelPendingFuturesSweep(context.Background()) - if err != nil { - t.Error(err) - } - } - _, err = c.ScheduleFuturesSweep(context.Background(), 0.001337) - assert.NoError(t, err) -} - func TestListFuturesSweeps(t *testing.T) { t.Parallel() sharedtestvalues.SkipTestIfCredentialsUnset(t, c) @@ -505,29 +493,6 @@ func TestListFuturesSweeps(t *testing.T) { assert.NoError(t, err) } -func TestCancelPendingFuturesSweep(t *testing.T) { - t.Parallel() - sharedtestvalues.SkipTestIfCredentialsUnset(t, c, canManipulateRealOrders) - curSweeps, err := c.ListFuturesSweeps(context.Background()) - assert.NoError(t, err) - partialSkip := false - if len(curSweeps) > 0 { - for i := range curSweeps { - if curSweeps[i].Status == "PENDING" { - partialSkip = true - } - } - } - if !partialSkip { - _, err = c.ScheduleFuturesSweep(context.Background(), 0.001337) - if err != nil { - t.Error(err) - } - } - _, err = c.CancelPendingFuturesSweep(context.Background()) - assert.NoError(t, err) -} - func TestAllocatePortfolio(t *testing.T) { t.Parallel() err := c.AllocatePortfolio(context.Background(), "", "", "", 0) @@ -1573,9 +1538,64 @@ func TestFormatExchangeKlineInterval(t *testing.T) { } } +func TestStringToFloatPtr(t *testing.T) { + t.Parallel() + err := stringToFloatPtr(nil, "") + assert.ErrorIs(t, err, errPointerNil) + var fl float64 + err = stringToFloatPtr(&fl, "") + assert.NoError(t, err) + err = stringToFloatPtr(&fl, "1.1") + assert.NoError(t, err) +} + +func TestScheduleFuturesSweep(t *testing.T) { + sharedtestvalues.SkipTestIfCredentialsUnset(t, c, canManipulateRealOrders) + curSweeps, err := c.ListFuturesSweeps(context.Background()) + assert.NoError(t, err) + preCancel := false + if len(curSweeps) > 0 { + for i := range curSweeps { + if curSweeps[i].Status == "PENDING" { + preCancel = true + } + } + } + if preCancel { + _, err = c.CancelPendingFuturesSweep(context.Background()) + if err != nil { + t.Error(err) + } + } + _, err = c.ScheduleFuturesSweep(context.Background(), 0.001337) + assert.NoError(t, err) +} + +func TestCancelPendingFuturesSweep(t *testing.T) { + sharedtestvalues.SkipTestIfCredentialsUnset(t, c, canManipulateRealOrders) + curSweeps, err := c.ListFuturesSweeps(context.Background()) + assert.NoError(t, err) + partialSkip := false + if len(curSweeps) > 0 { + for i := range curSweeps { + if curSweeps[i].Status == "PENDING" { + partialSkip = true + } + } + } + if !partialSkip { + _, err = c.ScheduleFuturesSweep(context.Background(), 0.001337) + if err != nil { + t.Error(err) + } + } + _, err = c.CancelPendingFuturesSweep(context.Background()) + assert.NoError(t, err) +} + // TestWsAuth dials websocket, sends login request. func TestWsAuth(t *testing.T) { - if !c.Websocket.IsEnabled() && !c.API.AuthenticatedWebsocketSupport || !sharedtestvalues.AreAPICredentialsSet(c) { + if c.Websocket.IsEnabled() && !c.API.AuthenticatedWebsocketSupport || !sharedtestvalues.AreAPICredentialsSet(c) { t.Skip(stream.ErrWebsocketNotEnabled.Error()) } var dialer websocket.Dialer @@ -1597,7 +1617,7 @@ func TestWsAuth(t *testing.T) { timer := time.NewTimer(sharedtestvalues.WebsocketResponseDefaultTimeout) select { case badResponse := <-c.Websocket.DataHandler: - t.Error(badResponse) + assert.IsType(t, []order.Detail{}, badResponse) case <-timer.C: } timer.Stop() @@ -1625,27 +1645,6 @@ func TestStatusToStandardStatus(t *testing.T) { } } -func TestStringToFloatPtr(t *testing.T) { - t.Parallel() - err := stringToFloatPtr(nil, "") - assert.ErrorIs(t, err, errPointerNil) - var fl float64 - err = stringToFloatPtr(&fl, "") - assert.NoError(t, err) - err = stringToFloatPtr(&fl, "1.1") - assert.NoError(t, err) -} - -func TestWsConnect(t *testing.T) { - sharedtestvalues.SkipTestIfCredentialsUnset(t, c) - err := c.Websocket.Disable() - assert.ErrorIs(t, err, stream.ErrAlreadyDisabled) - err = c.WsConnect() - assert.ErrorIs(t, err, stream.ErrWebsocketNotEnabled) - err = c.Websocket.Enable() - assert.NoError(t, err) -} - func TestWsHandleData(t *testing.T) { go func() { for range c.Websocket.DataHandler { @@ -1788,6 +1787,30 @@ func TestGetJWT(t *testing.T) { } } +func exchangeBaseHelper(c *CoinbasePro) error { + cfg := config.GetConfig() + err := cfg.LoadConfig("../../testdata/configtest.json", true) + if err != nil { + return err + } + gdxConfig, err := cfg.GetExchangeConfig("CoinbasePro") + if err != nil { + return err + } + if apiKey != "" { + gdxConfig.API.Credentials.Key = apiKey + gdxConfig.API.Credentials.Secret = apiSecret + gdxConfig.API.AuthenticatedSupport = true + gdxConfig.API.AuthenticatedWebsocketSupport = true + } + c.Websocket = sharedtestvalues.NewTestWebsocket() + err = c.Setup(gdxConfig) + if err != nil { + return err + } + return nil +} + func skipTestIfLowOnFunds(t *testing.T) { t.Helper() accounts, err := c.GetAllAccounts(context.Background(), 250, "") diff --git a/exchanges/coinbasepro/coinbasepro_websocket.go b/exchanges/coinbasepro/coinbasepro_websocket.go index 4254aeccd1d..10c99552482 100644 --- a/exchanges/coinbasepro/coinbasepro_websocket.go +++ b/exchanges/coinbasepro/coinbasepro_websocket.go @@ -21,6 +21,7 @@ import ( "github.com/thrasher-corp/gocryptotrader/common" "github.com/thrasher-corp/gocryptotrader/common/crypto" "github.com/thrasher-corp/gocryptotrader/currency" + exchange "github.com/thrasher-corp/gocryptotrader/exchanges" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" "github.com/thrasher-corp/gocryptotrader/exchanges/order" "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" @@ -221,7 +222,7 @@ func (c *CoinbasePro) wsHandleData(respRaw []byte, seqCount uint64) (string, err if err != nil { return warnString, err } - sliToSend := []order.Detail{} + var sliToSend []order.Detail for i := range wsUser { for j := range wsUser[i].Orders { var oType order.Type @@ -432,31 +433,44 @@ func getTimestamp(rawData []byte) (time.Time, error) { // sendRequest is a helper function which sends a websocket message to the Coinbase server func (c *CoinbasePro) sendRequest(msgType, channel string, productIDs currency.Pairs) error { + authenticated := true creds, err := c.GetCredentials(context.Background()) if err != nil { - return err + if errors.Is(err, exchange.ErrCredentialsAreEmpty) || + errors.Is(err, exchange.ErrAuthenticationSupportNotEnabled) { + authenticated = false + if channel == "user" { + return errNoCredsUser + } + } else { + return err + } } n := strconv.FormatInt(time.Now().Unix(), 10) - message := n + channel + productIDs.Join() - hmac, err := crypto.GetHMAC(crypto.HashSHA256, - []byte(message), - []byte(creds.Secret)) - if err != nil { - return err - } - // TODO: Implement JWT authentication once our REST implementation moves to it, or if there's - // an exchange-wide reform to enable multiple sets of authentication credentials req := WebsocketRequest{ Type: msgType, ProductIDs: productIDs.Strings(), Channel: channel, - Signature: hex.EncodeToString(hmac), - Key: creds.Key, Timestamp: n, } - err = c.InitiateRateLimit(context.Background(), WSRate) + if authenticated { + message := n + channel + productIDs.Join() + hmac, err := crypto.GetHMAC(crypto.HashSHA256, + []byte(message), + []byte(creds.Secret)) + if err != nil { + return err + } + // TODO: Implement JWT authentication once our REST implementation moves to it, or if there's + // an exchange-wide reform to enable multiple sets of authentication credentials + req.Key = creds.Key + req.Signature = hex.EncodeToString(hmac) + err = c.InitiateRateLimit(context.Background(), WSAuthRate) + } else { + err = c.InitiateRateLimit(context.Background(), WSUnauthRate) + } if err != nil { - return fmt.Errorf("failed to rate limit HTTP request: %w", err) + return fmt.Errorf("failed to rate limit websocket request: %w", err) } return c.Websocket.Conn.SendJSONMessage(req) } diff --git a/exchanges/coinbasepro/ratelimit.go b/exchanges/coinbasepro/ratelimit.go index 835755ee424..1070afe0943 100644 --- a/exchanges/coinbasepro/ratelimit.go +++ b/exchanges/coinbasepro/ratelimit.go @@ -17,8 +17,11 @@ const ( coinbaseV2Interval = time.Hour coinbaseV2Rate = 10000 - coinbaseWSInterval = time.Second - coinbaseWSRate = 750 + coinbaseWSAuthInterval = time.Second + coinbaseWSAuthRate = 750 + + coinbaseWSUnauthInterval = time.Second + coinbaseWSUnauthRate = 8 coinbasePublicInterval = time.Second coinbasePublicRate = 10 @@ -28,16 +31,18 @@ const ( const ( V2Rate request.EndpointLimit = iota V3Rate - WSRate + WSAuthRate + WSUnauthRate PubRate ) // RateLimit implements the request.Limiter interface type RateLimit struct { - RateLimV3 *rate.Limiter - RateLimV2 *rate.Limiter - RateLimWS *rate.Limiter - RateLimPub *rate.Limiter + RateLimV3 *rate.Limiter + RateLimV2 *rate.Limiter + RateLimWSAuth *rate.Limiter + RateLimWSUnauth *rate.Limiter + RateLimPub *rate.Limiter } // Limit limits outbound calls @@ -47,8 +52,10 @@ func (r *RateLimit) Limit(ctx context.Context, f request.EndpointLimit) error { return r.RateLimV3.Wait(ctx) case V2Rate: return r.RateLimV2.Wait(ctx) - case WSRate: - return r.RateLimWS.Wait(ctx) + case WSAuthRate: + return r.RateLimWSAuth.Wait(ctx) + case WSUnauthRate: + return r.RateLimWSUnauth.Wait(ctx) case PubRate: return r.RateLimPub.Wait(ctx) default: @@ -59,9 +66,10 @@ func (r *RateLimit) Limit(ctx context.Context, f request.EndpointLimit) error { // SetRateLimit returns the rate limit for the exchange func SetRateLimit() *RateLimit { return &RateLimit{ - RateLimWS: request.NewRateLimit(coinbaseWSInterval, coinbaseWSRate), - RateLimV3: request.NewRateLimit(coinbaseV3Interval, coinbaseV3Rate), - RateLimV2: request.NewRateLimit(coinbaseV2Interval, coinbaseV2Rate), - RateLimPub: request.NewRateLimit(coinbasePublicInterval, coinbasePublicRate), + RateLimWSAuth: request.NewRateLimit(coinbaseWSAuthInterval, coinbaseWSAuthRate), + RateLimWSUnauth: request.NewRateLimit(coinbaseWSUnauthInterval, coinbaseWSUnauthRate), + RateLimV3: request.NewRateLimit(coinbaseV3Interval, coinbaseV3Rate), + RateLimV2: request.NewRateLimit(coinbaseV2Interval, coinbaseV2Rate), + RateLimPub: request.NewRateLimit(coinbasePublicInterval, coinbasePublicRate), } } diff --git a/exchanges/exchange.go b/exchanges/exchange.go index 4e2e34e90df..d1f5ca2c3ac 100644 --- a/exchanges/exchange.go +++ b/exchanges/exchange.go @@ -51,6 +51,7 @@ const ( var ( // ErrExchangeNameIsEmpty is returned when the exchange name is empty ErrExchangeNameIsEmpty = errors.New("exchange name is empty") + ErrSettingProxyAddress = errors.New("setting proxy address error") errEndpointStringNotFound = errors.New("endpoint string not found") errConfigPairFormatRequiresDelimiter = errors.New("config pair format requires delimiter") @@ -81,8 +82,8 @@ func (b *Base) SetClientProxyAddress(addr string) error { } proxy, err := url.Parse(addr) if err != nil { - return fmt.Errorf("setting proxy address error %s", - err) + return fmt.Errorf("%w %w", + ErrSettingProxyAddress, err) } err = b.Requester.SetProxy(proxy) diff --git a/exchanges/stream/websocket.go b/exchanges/stream/websocket.go index e7585e9449b..3c3b343c2c8 100644 --- a/exchanges/stream/websocket.go +++ b/exchanges/stream/websocket.go @@ -30,6 +30,7 @@ var ( ErrChannelInStateAlready = errors.New("channel already in state") ErrAlreadyDisabled = errors.New("websocket already disabled") ErrNotConnected = errors.New("websocket is not connected") + ErrWebsocketAlreadyEnabled = errors.New("websocket already enabled") ) // Private websocket errors @@ -40,7 +41,6 @@ var ( errWebsocketIsNil = errors.New("websocket is nil") errWebsocketSetupIsNil = errors.New("websocket setup is nil") errWebsocketAlreadyInitialised = errors.New("websocket already initialised") - errWebsocketAlreadyEnabled = errors.New("websocket already enabled") errWebsocketFeaturesIsUnset = errors.New("websocket features is unset") errConfigFeaturesIsNil = errors.New("exchange config features is nil") errDefaultURLIsEmpty = errors.New("default url is empty") @@ -331,7 +331,7 @@ func (w *Websocket) Disable() error { // Enable enables the exchange websocket protocol func (w *Websocket) Enable() error { if w.IsConnected() || w.IsEnabled() { - return fmt.Errorf("%s %w", w.exchangeName, errWebsocketAlreadyEnabled) + return fmt.Errorf("%s %w", w.exchangeName, ErrWebsocketAlreadyEnabled) } w.setEnabled(true)