Skip to content

Commit

Permalink
WIP: Rebase me out
Browse files Browse the repository at this point in the history
  • Loading branch information
gbjk committed Feb 6, 2024
1 parent 68a6f58 commit cc204b3
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 8 deletions.
18 changes: 18 additions & 0 deletions exchanges/okx/okx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"log"
"os"
"strings"
"sync"
"testing"
"time"

"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thrasher-corp/gocryptotrader/common"
"github.com/thrasher-corp/gocryptotrader/common/key"
"github.com/thrasher-corp/gocryptotrader/config"
Expand All @@ -28,6 +31,7 @@ import (
"github.com/thrasher-corp/gocryptotrader/exchanges/order"
"github.com/thrasher-corp/gocryptotrader/exchanges/orderbook"
"github.com/thrasher-corp/gocryptotrader/exchanges/sharedtestvalues"
testexch "github.com/thrasher-corp/gocryptotrader/internal/testing/exchange"
"github.com/thrasher-corp/gocryptotrader/portfolio/withdraw"
)

Expand Down Expand Up @@ -2735,6 +2739,20 @@ func setupWS() {
}
}

func TestDoubleConnPanic(t *testing.T) {
ok := new(Okx)

Check failure on line 2743 in exchanges/okx/okx_test.go

View workflow job for this annotation

GitHub Actions / lint

shadow: declaration of "ok" shadows declaration at line 47 (govet)
require.NoError(t, testexch.TestInstance(ok), "TestInstance must not error")
ok = testexch.MockWSInstance[Okx](t, func(msg []byte, w *websocket.Conn) error {
return nil
})
ok.Websocket.ReadMessageErrors <- &websocket.CloseError{Code: websocket.CloseAbnormalClosure, Text: io.ErrUnexpectedEOF.Error()}
require.Eventually(t, func() bool { return !ok.Websocket.IsConnected() }, time.Second, 1*time.Millisecond, "Websocket should be set to not Connected")
fmt.Println("Calling connect")
err := ok.Websocket.Connect()
require.NoError(t, err, "Connect must not error")
time.Sleep(3 * time.Second)
}

// ************************** Public Channel Subscriptions *****************************

func TestInstrumentsSubscription(t *testing.T) {
Expand Down
36 changes: 36 additions & 0 deletions exchanges/okx/okx_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"hash/crc32"
"net/http"
"runtime/debug"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -215,6 +216,7 @@ const (

// WsConnect initiates a websocket connection
func (ok *Okx) WsConnect() error {
fmt.Printf("OKX WsConnect: %s", debug.Stack())
if !ok.Websocket.IsEnabled() || !ok.IsEnabled() {
return errors.New(stream.WebsocketNotEnabled)
}
Expand Down Expand Up @@ -335,8 +337,15 @@ func (ok *Okx) WsAuth(ctx context.Context, dialer *websocket.Dialer) error {

// wsReadData sends msgs from public and auth websockets to data handler
func (ok *Okx) wsReadData(ws stream.Connection) {
fmt.Printf("OKX Read data Starting: %s", debug.Stack())
defer fmt.Printf("OKX Read data exiting: %s", debug.Stack())
defer ok.Websocket.Wg.Done()
for {
/*
select {
case <-w.ShutdownC
case <-b.Websocket.ShutdownC:
*/
resp := ws.ReadMessage()
if resp.Raw == nil {
return
Expand All @@ -357,6 +366,33 @@ func (ok *Okx) Unsubscribe(channelsToUnsubscribe []subscription.Subscription) er
return ok.handleSubscription(operationUnsubscribe, channelsToUnsubscribe)
}

/*
func (ok *Okx) subscribeToChan(subs []subscription.Subscription) error {
for i := range subs {
s := &subs[i]
underlying, err := ok.GetUnderlying(s.Pair, s.Asset)
if err != nil {
...
}
algoID, _ := s.Params["algoId"].(string) // If it's not a string, it'll be empty, which is okay?
oneSub := SubscriptionInfo{
Channel: s.Channel,
InstrumentID: s.Pair.String(),
InstrumentType: ok.GetInstrumentTypeFromAssetItem(s.Asset),
Underlying: underlying,
AlgoID: algoID,
}
req := WSSubscriptionInformationList{
Arguments: []SubscriptionInfo{
oneSub,
},
}
}
}
*/

// handleSubscription sends a subscription and unsubscription information thought the websocket endpoint.
// as of the okx, exchange this endpoint sends subscription and unsubscription messages but with a list of json objects.
func (ok *Okx) handleSubscription(operation string, subscriptions []subscription.Subscription) error {
Expand Down
28 changes: 21 additions & 7 deletions exchanges/stream/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,8 @@ func (w *Websocket) connectionMonitor() error {
delay := w.connectionMonitorDelay
w.fieldMutex.RUnlock()

destroy := time.After(time.Second * 5)

go func() {
timer := time.NewTimer(delay)
for {
Expand Down Expand Up @@ -422,13 +424,23 @@ func (w *Websocket) connectionMonitor() error {
return
}
select {
case <-destroy:
if w.IsConnected() {
if err := w.Shutdown(); err != nil {
log.Errorf(log.WebsocketMgr, "%v websocket: connectionMonitor shutdown err: %s", w.exchangeName, err)
}
}
if err := w.Connect(); err != nil {
log.Errorln(log.WebsocketMgr, err)
}
case err := <-w.ReadMessageErrors:
if IsDisconnectionError(err) {
w.setInit(false)
log.Warnf(log.WebsocketMgr,
"%v websocket has been disconnected. Reason: %v",
w.exchangeName, err)
w.setConnectedStatus(false)
log.Warnf(log.WebsocketMgr, "%v websocket has been disconnected. Reason: %v", w.exchangeName, err)
if w.IsConnected() {
if err := w.Shutdown(); err != nil {

Check failure on line 440 in exchanges/stream/websocket.go

View workflow job for this annotation

GitHub Actions / lint

shadow: declaration of "err" shadows declaration at line 436 (govet)
log.Errorf(log.WebsocketMgr, "%v websocket: connectionMonitor shutdown err: %s", w.exchangeName, err)
}
}
}

w.DataHandler <- err
Expand All @@ -455,6 +467,7 @@ func (w *Websocket) connectionMonitor() error {
// Shutdown attempts to shut down a websocket connection and associated routines
// by using a package defined shutdown function
func (w *Websocket) Shutdown() error {
log.Warnf(log.WebsocketMgr, "Shutting down websocket")
w.m.Lock()
defer w.m.Unlock()

Expand Down Expand Up @@ -496,7 +509,9 @@ func (w *Websocket) Shutdown() error {
w.subscriptionMutex.Unlock()

close(w.ShutdownC)
log.Warnf(log.WebsocketMgr, "Waiting on Wg")
w.Wg.Wait()
log.Warnf(log.WebsocketMgr, "Done Waiting on Wg")
w.ShutdownC = make(chan struct{})
w.setConnectedStatus(false)
w.setConnectingStatus(false)
Expand Down Expand Up @@ -609,15 +624,14 @@ func (w *Websocket) trafficMonitor() {
trafficTimer.Stop()
w.setTrafficMonitorRunning(false)
w.Wg.Done() // without this the w.Shutdown() call below will deadlock
if !w.IsConnecting() && w.IsConnected() {
if w.IsConnected() {
err := w.Shutdown()
if err != nil {
log.Errorf(log.WebsocketMgr,
"%v websocket: trafficMonitor shutdown err: %s",
w.exchangeName, err)
}
}

return
}

Expand Down
2 changes: 1 addition & 1 deletion testdata/configtest.json
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@
{
"name": "Okx",
"enabled": true,
"verbose": false,
"verbose": true,
"httpTimeout": 15000000000,
"websocketResponseCheckTimeout": 30000000,
"websocketResponseMaxLimit": 7000000000,
Expand Down

0 comments on commit cc204b3

Please sign in to comment.