Skip to content

Commit

Permalink
WS revamp begins
Browse files Browse the repository at this point in the history
  • Loading branch information
cranktakular committed Jan 16, 2024
1 parent 2881841 commit 2166886
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 110 deletions.
1 change: 1 addition & 0 deletions exchanges/coinbasepro/coinbasepro.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ var (
errPortfolioIDEmpty = errors.New("portfolio id cannot be empty")
errFeeTypeNotSupported = errors.New("fee type not supported")
errUnknownEndpointLimit = errors.New("unknown endpoint limit")
errNoEventsWS = errors.New("no events returned from websocket")
)

// GetAllAccounts returns information on all trading accounts associated with the API key
Expand Down
58 changes: 27 additions & 31 deletions exchanges/coinbasepro/coinbasepro_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1069,21 +1069,21 @@ type ServerTimeV2 struct {

// WebsocketSubscribe takes in subscription information
type WebsocketSubscribe struct {
Type string `json:"type"`
ProductIDs []string `json:"product_ids,omitempty"`
Channels []WsChannels `json:"channels,omitempty"`
Signature string `json:"signature,omitempty"`
Key string `json:"key,omitempty"`
Passphrase string `json:"passphrase,omitempty"`
Timestamp string `json:"timestamp,omitempty"`
}

// WsChannels defines outgoing channels for subscription purposes
type WsChannels struct {
Name string `json:"name"`
Type string `json:"type"`
ProductIDs []string `json:"product_ids,omitempty"`
Channel string `json:"channel,omitempty"`
Signature string `json:"signature,omitempty"`
Key string `json:"api_key,omitempty"`
Timestamp string `json:"timestamp,omitempty"`
JWT string `json:"jwt,omitempty"`
}

// // WsChannels defines outgoing channels for subscription purposes
// type WsChannels struct {
// Name string `json:"name"`
// ProductIDs []string `json:"product_ids,omitempty"`
// }

// wsOrderReceived holds websocket received values
type wsOrderReceived struct {
Type string `json:"type"`
Expand Down Expand Up @@ -1125,21 +1125,15 @@ type WebsocketHeartBeat struct {

// WebsocketTicker defines ticker websocket response
type WebsocketTicker struct {
Type string `json:"type"`
Sequence int64 `json:"sequence"`
ProductID currency.Pair `json:"product_id"`
Price float64 `json:"price,string"`
Open24H float64 `json:"open_24h,string"`
Volume24H float64 `json:"volume_24h,string"`
Low24H float64 `json:"low_24h,string"`
High24H float64 `json:"high_24h,string"`
Volume30D float64 `json:"volume_30d,string"`
BestBid float64 `json:"best_bid,string"`
BestAsk float64 `json:"best_ask,string"`
Side string `json:"side"`
Time time.Time `json:"time"`
TradeID int64 `json:"trade_id"`
LastSize float64 `json:"last_size,string"`
Type string `json:"type"`
ProductID currency.Pair `json:"product_id"`
Price float64 `json:"price,string"`
Volume24H float64 `json:"volume_24_h,string"`
Low24H float64 `json:"low_24_h,string"`
High24H float64 `json:"high_24_h,string"`
Low52W float64 `json:"low_52_w,string"`
High52W float64 `json:"high_52_w,string"`
PricePercentageChange24H float64 `json:"price_percent_chg_24_h,string"`
}

// WebsocketOrderbookSnapshot defines a snapshot response
Expand All @@ -1159,10 +1153,12 @@ type WebsocketL2Update struct {
Changes [][3]string `json:"changes"`
}

type wsMsgType struct {
Type string `json:"type"`
Sequence int64 `json:"sequence"`
ProductID string `json:"product_id"`
type wsGen struct {
Channel string `json:"channel"`
ClientID string `json:"client_id"`
Timestamp time.Time `json:"timestamp"`
SequenceNum uint64 `json:"sequence_num"`
Events []interface{} `json:"events"`
}

type wsStatus struct {
Expand Down
207 changes: 128 additions & 79 deletions exchanges/coinbasepro/coinbasepro_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package coinbasepro

import (
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
Expand All @@ -14,7 +15,6 @@ import (
"github.com/thrasher-corp/gocryptotrader/common/convert"
"github.com/thrasher-corp/gocryptotrader/common/crypto"
"github.com/thrasher-corp/gocryptotrader/currency"
"github.com/thrasher-corp/gocryptotrader/exchanges/account"
"github.com/thrasher-corp/gocryptotrader/exchanges/asset"
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
Expand All @@ -24,7 +24,7 @@ import (
)

const (
coinbaseproWebsocketURL = "wss://ws-feed.pro.coinbase.com"
coinbaseproWebsocketURL = "wss://advanced-trade-ws.coinbase.com"
)

// WsConnect initiates a websocket connection
Expand Down Expand Up @@ -60,17 +60,20 @@ func (c *CoinbasePro) wsReadData() {
}

func (c *CoinbasePro) wsHandleData(respRaw []byte) error {
msgType := wsMsgType{}
err := json.Unmarshal(respRaw, &msgType)
fmt.Println("WHADDUP:", string(respRaw))
genData := wsGen{}
err := json.Unmarshal(respRaw, &genData)
if err != nil {
return err
}

if msgType.Type == "subscriptions" || msgType.Type == "heartbeat" {
if genData.Channel == "subscriptions" || genData.Channel == "heartbeats" {
return nil
}

switch msgType.Type {
fmt.Printf("=== OH NOO LOOK AT THIS DATA WE HAVE TO DEAL WITH: %s ===\n", genData.Events)

Check failure on line 74 in exchanges/coinbasepro/coinbasepro_websocket.go

View workflow job for this annotation

GitHub Actions / Spell checker

NOO ==> NO

switch genData.Channel {
case "status":
var status wsStatus
err = json.Unmarshal(respRaw, &status)
Expand All @@ -81,26 +84,27 @@ func (c *CoinbasePro) wsHandleData(respRaw []byte) error {
case "error":
c.Websocket.DataHandler <- errors.New(string(respRaw))
case "ticker":
wsTicker := WebsocketTicker{}
var wsTicker []WebsocketTicker
if len(genData.Events) == 0 {
return errNoEventsWS
}
err := json.Unmarshal(respRaw, &wsTicker)
if err != nil {
return err
}

c.Websocket.DataHandler <- &ticker.Price{
LastUpdated: wsTicker.Time,
Pair: wsTicker.ProductID,
AssetType: asset.Spot,
ExchangeName: c.Name,
Open: wsTicker.Open24H,
High: wsTicker.High24H,
Low: wsTicker.Low24H,
Last: wsTicker.Price,
Volume: wsTicker.Volume24H,
Bid: wsTicker.BestBid,
Ask: wsTicker.BestAsk,
for i := range wsTicker {
c.Websocket.DataHandler <- &ticker.Price{
LastUpdated: genData.Timestamp,
Pair: wsTicker[i].ProductID,
AssetType: asset.Spot,
ExchangeName: c.Name,
High: wsTicker[i].High24H,
Low: wsTicker[i].Low24H,
Last: wsTicker[i].Price,
Volume: wsTicker[i].Volume24H,
}
}

case "snapshot":
var snapshot WebsocketOrderbookSnapshot
err := json.Unmarshal(respRaw, &snapshot)
Expand Down Expand Up @@ -366,21 +370,19 @@ func (c *CoinbasePro) ProcessUpdate(update *WebsocketL2Update) error {

// GenerateDefaultSubscriptions Adds default subscriptions to websocket to be handled by ManageSubscriptions()
func (c *CoinbasePro) GenerateDefaultSubscriptions() ([]stream.ChannelSubscription, error) {
var channels = []string{"heartbeat",
"level2_batch", /*Other orderbook feeds require authentication. This is batched in 50ms lots.*/
var channels = []string{
"heartbeats",
// "level2_batch", /*Other orderbook feeds require authentication. This is batched in 50ms lots.*/
"ticker",
"user",
"matches"}
// "user",
// "matches",
}
enabledCurrencies, err := c.GetEnabledPairs(asset.Spot)
if err != nil {
return nil, err
}
var subscriptions []stream.ChannelSubscription
for i := range channels {
if (channels[i] == "user" || channels[i] == "full") &&
!c.IsWebsocketAuthenticationSupported() {
continue
}
for j := range enabledCurrencies {
fPair, err := c.FormatExchangeCurrency(enabledCurrencies[j],
asset.Spot)
Expand All @@ -397,59 +399,96 @@ func (c *CoinbasePro) GenerateDefaultSubscriptions() ([]stream.ChannelSubscripti
return subscriptions, nil
}

// Subscribe sends a websocket message to receive data from the channel
func (c *CoinbasePro) Subscribe(channelsToSubscribe []stream.ChannelSubscription) error {
var creds *account.Credentials
var err error
if c.IsWebsocketAuthenticationSupported() {
creds, err = c.GetCredentials(context.TODO())
if err != nil {
return err
}
func (c *CoinbasePro) sendRequest(msgType, channel string, productID currency.Pair) error {
creds, err := c.GetCredentials(context.Background())
if err != nil {
return err
}

subscribe := WebsocketSubscribe{
Type: "subscribe",
n := strconv.FormatInt(time.Now().Unix(), 10)

message := n + channel + productID.String()

hmac, err := crypto.GetHMAC(crypto.HashSHA256,
[]byte(message),
[]byte(creds.Secret))
if err != nil {
return err
}

subscriptions:
for i := range channelsToSubscribe {
p := channelsToSubscribe[i].Currency.String()
if !common.StringDataCompare(subscribe.ProductIDs, p) && p != "" {
subscribe.ProductIDs = append(subscribe.ProductIDs, p)
}
req := WebsocketSubscribe{
Type: msgType,
ProductIDs: []string{productID.String()},
Channel: channel,
Signature: hex.EncodeToString(hmac),
Key: creds.Key,
Timestamp: n,
}

for j := range subscribe.Channels {
if subscribe.Channels[j].Name == channelsToSubscribe[i].Channel {
continue subscriptions
}
}
meow, _ := json.Marshal(req)

subscribe.Channels = append(subscribe.Channels, WsChannels{
Name: channelsToSubscribe[i].Channel,
})
fmt.Print(string(meow))

if (channelsToSubscribe[i].Channel == "user" ||
channelsToSubscribe[i].Channel == "full") && creds != nil {
n := strconv.FormatInt(time.Now().Unix(), 10)
message := n + http.MethodGet + "/users/self/verify"
var hmac []byte
hmac, err = crypto.GetHMAC(crypto.HashSHA256,
[]byte(message),
[]byte(creds.Secret))
if err != nil {
return err
}
subscribe.Signature = crypto.Base64Encode(hmac)
subscribe.Key = creds.Key
subscribe.Passphrase = creds.ClientID
subscribe.Timestamp = n
}
}
err = c.Websocket.Conn.SendJSONMessage(subscribe)
err = c.Websocket.Conn.SendJSONMessage(req)
if err != nil {
return err
}
return nil
}

// Subscribe sends a websocket message to receive data from the channel
func (c *CoinbasePro) Subscribe(channelsToSubscribe []stream.ChannelSubscription) error {

fmt.Printf("SUBSCRIBE: %v\n", channelsToSubscribe)
// var creds *account.Credentials
// var err error
// if c.IsWebsocketAuthenticationSupported() {
// creds, err = c.GetCredentials(context.TODO())
// if err != nil {
// return err
// }
// }

// subscribe := WebsocketSubscribe{

Check failure on line 452 in exchanges/coinbasepro/coinbasepro_websocket.go

View workflow job for this annotation

GitHub Actions / lint

commentedOutCode: may want to remove commented-out code (gocritic)
// Type: "subscribe",
// }

// subscriptions:
// for i := range channelsToSubscribe {
// p := channelsToSubscribe[i].Currency.String()
// if !common.StringDataCompare(subscribe.ProductIDs, p) && p != "" {
// subscribe.ProductIDs = append(subscribe.ProductIDs, p)
// }

// if subscribe.Channel == channelsToSubscribe[i].Channel {

Check failure on line 463 in exchanges/coinbasepro/coinbasepro_websocket.go

View workflow job for this annotation

GitHub Actions / lint

commentedOutCode: may want to remove commented-out code (gocritic)
// continue subscriptions
// }

// subscribe.Channel = channelsToSubscribe[i].Channel

// if (channelsToSubscribe[i].Channel == "user" ||
// channelsToSubscribe[i].Channel == "full") && creds != nil {
// n := strconv.FormatInt(time.Now().Unix(), 10)
// message := n + http.MethodGet + "/users/self/verify"
// var hmac []byte
// hmac, err = crypto.GetHMAC(crypto.HashSHA256,
// []byte(message),
// []byte(creds.Secret))
// if err != nil {
// return err
// }
// subscribe.Signature = crypto.Base64Encode(hmac)
// subscribe.Key = creds.Key
// subscribe.Timestamp = n
// }
// }
for i := range channelsToSubscribe {
err := c.sendRequest("subscribe", channelsToSubscribe[i].Channel, channelsToSubscribe[i].Currency)
if err != nil {
return err
}
}

c.Websocket.AddSuccessfulSubscriptions(channelsToSubscribe...)
return nil
}
Expand All @@ -460,22 +499,17 @@ func (c *CoinbasePro) Unsubscribe(channelsToUnsubscribe []stream.ChannelSubscrip
Type: "unsubscribe",
}

unsubscriptions:
for i := range channelsToUnsubscribe {
p := channelsToUnsubscribe[i].Currency.String()
if !common.StringDataCompare(unsubscribe.ProductIDs, p) && p != "" {
unsubscribe.ProductIDs = append(unsubscribe.ProductIDs, p)
}

for j := range unsubscribe.Channels {
if unsubscribe.Channels[j].Name == channelsToUnsubscribe[i].Channel {
continue unsubscriptions
}
if unsubscribe.Channel == channelsToUnsubscribe[i].Channel {
unsubscribe.Channel = channelsToUnsubscribe[i].Channel

}

unsubscribe.Channels = append(unsubscribe.Channels, WsChannels{
Name: channelsToUnsubscribe[i].Channel,
})
}
err := c.Websocket.Conn.SendJSONMessage(unsubscribe)
if err != nil {
Expand All @@ -484,3 +518,18 @@ unsubscriptions:
c.Websocket.RemoveSubscriptions(channelsToUnsubscribe...)
return nil
}

// const wow = "-----BEGIN EC PRIVATE KEY-----\n%s\n-----END EC PRIVATE KEY-----\n"

// func (c *CoinbasePro) GetJWT(ctx context.Context) (string, error) {
// creds, err := c.GetCredentials(ctx)
// if err != nil {
// return "", err
// }

// block, _ := pem.Decode([]byte(fmt.Sprintf(wow, creds.Secret)))
// if block == nil {
// return "", fmt.Errorf("jwt: Could not decode private key")
// }

// }

0 comments on commit 2166886

Please sign in to comment.