From a2569e0f0db4d8a6f8c96969f8f2c927edc2d2f6 Mon Sep 17 00:00:00 2001 From: tony Date: Thu, 12 Sep 2024 17:16:45 +0800 Subject: [PATCH] no error after unsubscribe --- thorclient/wsclient/client.go | 67 ++++++++++++++---------------- thorclient/wsclient/client_test.go | 31 ++++++++++++-- 2 files changed, 58 insertions(+), 40 deletions(-) diff --git a/thorclient/wsclient/client.go b/thorclient/wsclient/client.go index bf8ab2145..2d009d38a 100644 --- a/thorclient/wsclient/client.go +++ b/thorclient/wsclient/client.go @@ -11,6 +11,7 @@ import ( "fmt" "net/url" "strings" + "time" "github.com/gorilla/websocket" "github.com/vechain/thor/v2/api/blocks" @@ -18,6 +19,8 @@ import ( "github.com/vechain/thor/v2/thorclient/common" ) +const readTimeout = 60 * time.Second + // Client represents a WebSocket client that connects to the VeChainThor blockchain via WebSocket // for subscribing to blockchain events and updates. type Client struct { @@ -57,10 +60,7 @@ func (c *Client) SubscribeEvents(query string) (*common.Subscription[*subscripti return nil, fmt.Errorf("unable to connect - %w", err) } - return &common.Subscription[*subscriptions.EventMessage]{ - EventChan: subscribe[subscriptions.EventMessage](conn), - Unsubscribe: stopFunc(conn), - }, nil + return subscribe[subscriptions.EventMessage](conn), nil } // SubscribeBlocks subscribes to block updates based on the provided query. @@ -71,10 +71,7 @@ func (c *Client) SubscribeBlocks(query string) (*common.Subscription[*blocks.JSO return nil, fmt.Errorf("unable to connect - %w", err) } - return &common.Subscription[*blocks.JSONCollapsedBlock]{ - EventChan: subscribe[blocks.JSONCollapsedBlock](conn), - Unsubscribe: stopFunc(conn), - }, nil + return subscribe[blocks.JSONCollapsedBlock](conn), nil } // SubscribeTransfers subscribes to transfer events based on the provided query. @@ -85,10 +82,7 @@ func (c *Client) SubscribeTransfers(query string) (*common.Subscription[*subscri return nil, fmt.Errorf("unable to connect - %w", err) } - return &common.Subscription[*subscriptions.TransferMessage]{ - EventChan: subscribe[subscriptions.TransferMessage](conn), - Unsubscribe: stopFunc(conn), - }, nil + return subscribe[subscriptions.TransferMessage](conn), nil } // SubscribeTxPool subscribes to pending transaction pool updates based on the provided query. @@ -99,10 +93,7 @@ func (c *Client) SubscribeTxPool(query string) (*common.Subscription[*subscripti return nil, fmt.Errorf("unable to connect - %w", err) } - return &common.Subscription[*subscriptions.PendingTxIDMessage]{ - EventChan: subscribe[subscriptions.PendingTxIDMessage](conn), - Unsubscribe: stopFunc(conn), - }, nil + return subscribe[subscriptions.PendingTxIDMessage](conn), nil } // SubscribeBeats2 subscribes to Beat2 messages based on the provided query. @@ -113,27 +104,15 @@ func (c *Client) SubscribeBeats2(query string) (*common.Subscription[*subscripti return nil, fmt.Errorf("unable to connect - %w", err) } - return &common.Subscription[*subscriptions.Beat2Message]{ - EventChan: subscribe[subscriptions.Beat2Message](conn), - Unsubscribe: stopFunc(conn), - }, nil -} - -// stopFunc returns a function to close the WebSocket connection gracefully. -// It ensures the WebSocket connection is stopped after sending a close message. -func stopFunc(conn *websocket.Conn) func() { - return func() { - // todo add metrics - conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) - conn.Close() - } + return subscribe[subscriptions.Beat2Message](conn), nil } // subscribe starts a new subscription over the given WebSocket connection. // It returns a read-only channel that streams events of type T. -func subscribe[T any](conn *websocket.Conn) <-chan common.EventWrapper[*T] { +func subscribe[T any](conn *websocket.Conn) *common.Subscription[*T] { // Create a new channel for events - eventChan := make(chan common.EventWrapper[*T], 10_000) + eventChan := make(chan common.EventWrapper[*T], 1_000) + var closed bool // Start a goroutine to handle receiving messages from the WebSocket connection. go func() { @@ -141,21 +120,30 @@ func subscribe[T any](conn *websocket.Conn) <-chan common.EventWrapper[*T] { defer conn.Close() for { + conn.SetReadDeadline(time.Now().Add(readTimeout)) var data T // Read a JSON message from the WebSocket and unmarshal it into the data. err := conn.ReadJSON(&data) if err != nil { - // Send an EventWrapper with the error to the channel. - eventChan <- common.EventWrapper[*T]{Error: fmt.Errorf("%w: %w", common.ErrUnexpectedMsg, err)} + if !closed { + // Send an EventWrapper with the error to the channel. + eventChan <- common.EventWrapper[*T]{Error: fmt.Errorf("%w: %w", common.ErrUnexpectedMsg, err)} + } return } - // Send the received data to the event channel. eventChan <- common.EventWrapper[*T]{Data: &data} } }() - return eventChan + return &common.Subscription[*T]{ + EventChan: eventChan, + Unsubscribe: func() { + closed = true + conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + conn.Close() + }, + } } // connect establishes a WebSocket connection to the specified endpoint and query. @@ -172,6 +160,13 @@ func (c *Client) connect(endpoint, rawQuery string) (*websocket.Conn, error) { if err != nil { return nil, err } + + conn.SetPingHandler(func(payload string) error { + // Make a best effort to send the pong message. + _ = conn.WriteControl(websocket.PongMessage, []byte(payload), time.Now().Add(time.Second)) + conn.SetReadDeadline(time.Now().Add(readTimeout)) + return nil + }) // TODO append to the connection pool return conn, nil } diff --git a/thorclient/wsclient/client_test.go b/thorclient/wsclient/client_test.go index 54db3f4a7..546f256f2 100644 --- a/thorclient/wsclient/client_test.go +++ b/thorclient/wsclient/client_test.go @@ -393,16 +393,39 @@ func TestClient_SubscribeBlocks_ClientShutdown_LongBlocks(t *testing.T) { // unsubscribe should close the connection forcing a connection error in the eventChan sub.Unsubscribe() - // next message should be an error - assert.Error(t, (<-sub.EventChan).Error) - // Ensure no more events are received after unsubscribe select { case _, ok := <-sub.EventChan: if ok { t.Error("Expected the event channel to be closed after unsubscribe, but it was still open") } - case <-time.After(2 * time.Second): + case <-time.After(200 * time.Millisecond): // Timeout here is expected since the channel should be closed and not sending events } } + +// go test -timeout 80s -run ^TestSubscribeBeats2WithServer$ github.com/vechain/thor/v2/thorclient/wsclient -v +func TestSubscribeBeats2WithServer(t *testing.T) { + // t.Skip("this is a manual test") + client, err := NewClient("https://mainnet.vechain.org") + if err != nil { + t.Fatal(err) + } + + sub, err := client.SubscribeBeats2("") + if err != nil { + t.Fatal(err) + } + + go func() { + <-time.After(60 * time.Second) + sub.Unsubscribe() + }() + + for ev := range sub.EventChan { + if ev.Error != nil { + t.Fatal(ev.Error) + } + t.Log(ev.Data) + } +}