Skip to content

Commit

Permalink
no error after unsubscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
libotony committed Sep 12, 2024
1 parent d86f0f2 commit a2569e0
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 40 deletions.
67 changes: 31 additions & 36 deletions thorclient/wsclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ import (
"fmt"
"net/url"
"strings"
"time"

"github.com/gorilla/websocket"
"github.com/vechain/thor/v2/api/blocks"
"github.com/vechain/thor/v2/api/subscriptions"
"github.com/vechain/thor/v2/thorclient/common"
)

const readTimeout = 60 * time.Second

// Client represents a WebSocket client that connects to the VeChainThor blockchain via WebSocket
// for subscribing to blockchain events and updates.
type Client struct {
Expand Down Expand Up @@ -57,10 +60,7 @@ func (c *Client) SubscribeEvents(query string) (*common.Subscription[*subscripti
return nil, fmt.Errorf("unable to connect - %w", err)
}

return &common.Subscription[*subscriptions.EventMessage]{
EventChan: subscribe[subscriptions.EventMessage](conn),
Unsubscribe: stopFunc(conn),
}, nil
return subscribe[subscriptions.EventMessage](conn), nil
}

// SubscribeBlocks subscribes to block updates based on the provided query.
Expand All @@ -71,10 +71,7 @@ func (c *Client) SubscribeBlocks(query string) (*common.Subscription[*blocks.JSO
return nil, fmt.Errorf("unable to connect - %w", err)
}

return &common.Subscription[*blocks.JSONCollapsedBlock]{
EventChan: subscribe[blocks.JSONCollapsedBlock](conn),
Unsubscribe: stopFunc(conn),
}, nil
return subscribe[blocks.JSONCollapsedBlock](conn), nil
}

// SubscribeTransfers subscribes to transfer events based on the provided query.
Expand All @@ -85,10 +82,7 @@ func (c *Client) SubscribeTransfers(query string) (*common.Subscription[*subscri
return nil, fmt.Errorf("unable to connect - %w", err)
}

return &common.Subscription[*subscriptions.TransferMessage]{
EventChan: subscribe[subscriptions.TransferMessage](conn),
Unsubscribe: stopFunc(conn),
}, nil
return subscribe[subscriptions.TransferMessage](conn), nil
}

// SubscribeTxPool subscribes to pending transaction pool updates based on the provided query.
Expand All @@ -99,10 +93,7 @@ func (c *Client) SubscribeTxPool(query string) (*common.Subscription[*subscripti
return nil, fmt.Errorf("unable to connect - %w", err)
}

return &common.Subscription[*subscriptions.PendingTxIDMessage]{
EventChan: subscribe[subscriptions.PendingTxIDMessage](conn),
Unsubscribe: stopFunc(conn),
}, nil
return subscribe[subscriptions.PendingTxIDMessage](conn), nil
}

// SubscribeBeats2 subscribes to Beat2 messages based on the provided query.
Expand All @@ -113,49 +104,46 @@ func (c *Client) SubscribeBeats2(query string) (*common.Subscription[*subscripti
return nil, fmt.Errorf("unable to connect - %w", err)
}

return &common.Subscription[*subscriptions.Beat2Message]{
EventChan: subscribe[subscriptions.Beat2Message](conn),
Unsubscribe: stopFunc(conn),
}, nil
}

// stopFunc returns a function to close the WebSocket connection gracefully.
// It ensures the WebSocket connection is stopped after sending a close message.
func stopFunc(conn *websocket.Conn) func() {
return func() {
// todo add metrics
conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
conn.Close()
}
return subscribe[subscriptions.Beat2Message](conn), nil
}

// subscribe starts a new subscription over the given WebSocket connection.
// It returns a read-only channel that streams events of type T.
func subscribe[T any](conn *websocket.Conn) <-chan common.EventWrapper[*T] {
func subscribe[T any](conn *websocket.Conn) *common.Subscription[*T] {
// Create a new channel for events
eventChan := make(chan common.EventWrapper[*T], 10_000)
eventChan := make(chan common.EventWrapper[*T], 1_000)
var closed bool

// Start a goroutine to handle receiving messages from the WebSocket connection.
go func() {
defer close(eventChan)
defer conn.Close()

for {
conn.SetReadDeadline(time.Now().Add(readTimeout))
var data T
// Read a JSON message from the WebSocket and unmarshal it into the data.
err := conn.ReadJSON(&data)
if err != nil {
// Send an EventWrapper with the error to the channel.
eventChan <- common.EventWrapper[*T]{Error: fmt.Errorf("%w: %w", common.ErrUnexpectedMsg, err)}
if !closed {
// Send an EventWrapper with the error to the channel.
eventChan <- common.EventWrapper[*T]{Error: fmt.Errorf("%w: %w", common.ErrUnexpectedMsg, err)}
}
return
}

// Send the received data to the event channel.
eventChan <- common.EventWrapper[*T]{Data: &data}
}
}()

return eventChan
return &common.Subscription[*T]{

This comment has been minimized.

Copy link
@otherview

otherview Sep 13, 2024

Member

good idea 👍

EventChan: eventChan,
Unsubscribe: func() {
closed = true
conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
conn.Close()
},
}
}

// connect establishes a WebSocket connection to the specified endpoint and query.
Expand All @@ -172,6 +160,13 @@ func (c *Client) connect(endpoint, rawQuery string) (*websocket.Conn, error) {
if err != nil {
return nil, err
}

conn.SetPingHandler(func(payload string) error {
// Make a best effort to send the pong message.
_ = conn.WriteControl(websocket.PongMessage, []byte(payload), time.Now().Add(time.Second))
conn.SetReadDeadline(time.Now().Add(readTimeout))
return nil
})
// TODO append to the connection pool
return conn, nil
}
31 changes: 27 additions & 4 deletions thorclient/wsclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,16 +393,39 @@ func TestClient_SubscribeBlocks_ClientShutdown_LongBlocks(t *testing.T) {
// unsubscribe should close the connection forcing a connection error in the eventChan
sub.Unsubscribe()

// next message should be an error
assert.Error(t, (<-sub.EventChan).Error)

// Ensure no more events are received after unsubscribe
select {
case _, ok := <-sub.EventChan:
if ok {
t.Error("Expected the event channel to be closed after unsubscribe, but it was still open")
}
case <-time.After(2 * time.Second):
case <-time.After(200 * time.Millisecond):
// Timeout here is expected since the channel should be closed and not sending events
}
}

// go test -timeout 80s -run ^TestSubscribeBeats2WithServer$ github.com/vechain/thor/v2/thorclient/wsclient -v
func TestSubscribeBeats2WithServer(t *testing.T) {
// t.Skip("this is a manual test")
client, err := NewClient("https://mainnet.vechain.org")
if err != nil {
t.Fatal(err)
}

sub, err := client.SubscribeBeats2("")
if err != nil {
t.Fatal(err)
}

go func() {
<-time.After(60 * time.Second)
sub.Unsubscribe()
}()

for ev := range sub.EventChan {
if ev.Error != nil {
t.Fatal(ev.Error)
}
t.Log(ev.Data)
}
}

0 comments on commit a2569e0

Please sign in to comment.