Skip to content

Commit

Permalink
Exchanges: Abstract ParallelChanOp
Browse files Browse the repository at this point in the history
  • Loading branch information
gbjk committed Jan 22, 2024
1 parent 781a51a commit 1ee9e64
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 68 deletions.
34 changes: 0 additions & 34 deletions exchanges/bitfinex/bitfinex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1926,40 +1926,6 @@ func TestGetErrResp(t *testing.T) {
assert.NoError(t, fixture.Close(), "Closing the fixture file should not error")
}

// TestParallelChanOp unit tests the helper func parallelChanOp
func TestParallelChanOp(t *testing.T) {
t.Parallel()
c := []subscription.Subscription{
{Channel: "red"},
{Channel: "blue"},
{Channel: "violent"},
{Channel: "spin"},
{Channel: "charm"},
}
run := make(chan struct{}, len(c)*2)
errC := make(chan error, 1)
go func() {
errC <- b.parallelChanOp(c, func(c *subscription.Subscription) error {
time.Sleep(300 * time.Millisecond)
run <- struct{}{}
switch c.Channel {
case "spin", "violent":
return errors.New(c.Channel)
}
return nil
})
}()
f := func(ct *assert.CollectT) {
if assert.Len(ct, errC, 1, "Should eventually have an error") {
err := <-errC
assert.ErrorContains(ct, err, "violent", "Should get a violent error")
assert.ErrorContains(ct, err, "spin", "Should get a spin error")
}
}
assert.EventuallyWithT(t, f, 500*time.Millisecond, 50*time.Millisecond, "ParallelChanOp should complete within 500ms not 5*300ms")
assert.Len(t, run, len(c), "Every channel was run to completion")
}

// setupWs is a helper function to connect both auth and normal websockets
// It will skip the test if websockets are not enabled
// It's up to the test to skip if it requires creds, though
Expand Down
51 changes: 17 additions & 34 deletions exchanges/bitfinex/bitfinex_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1666,44 +1666,23 @@ func (b *Bitfinex) ConfigureWS() error {

// Subscribe sends a websocket message to receive data from channels
func (b *Bitfinex) Subscribe(channels []subscription.Subscription) error {
return b.parallelChanOp(channels, b.subscribeToChan)
return b.ParallelChanOp(channels, b.subscribeToChan, 1)
}

// Unsubscribe sends a websocket message to stop receiving data from channels
func (b *Bitfinex) Unsubscribe(channels []subscription.Subscription) error {
return b.parallelChanOp(channels, b.unsubscribeFromChan)
}

// parallelChanOp performs a single method call in parallel across streams and waits to return any errors
func (b *Bitfinex) parallelChanOp(channels []subscription.Subscription, m func(*subscription.Subscription) error) error {
wg := sync.WaitGroup{}
wg.Add(len(channels))
errC := make(chan error, len(channels))

for i := range channels {
go func(c *subscription.Subscription) {
defer wg.Done()
if err := m(c); err != nil {
errC <- err
}
}(&channels[i])
}

wg.Wait()
close(errC)

var errs error
for err := range errC {
errs = common.AppendError(errs, err)
}

return errs
return b.ParallelChanOp(channels, b.unsubscribeFromChan, 1)
}

// subscribeToChan handles a single subscription and parses the result
// on success it adds the subscription to the websocket
func (b *Bitfinex) subscribeToChan(c *subscription.Subscription) error {
req, err := subscribeReq(c)
func (b *Bitfinex) subscribeToChan(chans []subscription.Subscription) error {
if len(chans) != 1 {
return errors.New("subscription batching limited to 1")
}

c := chans[0]
req, err := subscribeReq(&c)
if err != nil {
return fmt.Errorf("%w: %w; Channel: %s Pair: %s", stream.ErrSubscriptionFailure, err, c.Channel, c.Pair)
}
Expand All @@ -1718,13 +1697,13 @@ func (b *Bitfinex) subscribeToChan(c *subscription.Subscription) error {
c.Key = subID // Note subID string type avoids conflicts with later chanID key

c.State = subscription.SubscribingState
err = b.Websocket.AddSubscription(c)
err = b.Websocket.AddSubscription(&c)
if err != nil {
return fmt.Errorf("%w Channel: %s Pair: %s Error: %w", stream.ErrSubscriptionFailure, c.Channel, c.Pair, err)
}

// Always remove the temporary subscription keyed by subID
defer b.Websocket.RemoveSubscriptions(*c)
defer b.Websocket.RemoveSubscriptions(c)

respRaw, err := b.Websocket.Conn.SendMessageReturnResponse("subscribe:"+subID, req)
if err != nil {
Expand Down Expand Up @@ -1797,7 +1776,11 @@ func subscribeReq(c *subscription.Subscription) (map[string]interface{}, error)
}

// unsubscribeFromChan sends a websocket message to stop receiving data from a channel
func (b *Bitfinex) unsubscribeFromChan(c *subscription.Subscription) error {
func (b *Bitfinex) unsubscribeFromChan(chans []subscription.Subscription) error {
if len(chans) != 1 {
return errors.New("subscription batching limited to 1")
}
c := chans[0]
chanID, ok := c.Key.(int)
if !ok {
return common.GetTypeAssertError("int", c.Key, "chanID")
Expand All @@ -1819,7 +1802,7 @@ func (b *Bitfinex) unsubscribeFromChan(c *subscription.Subscription) error {
return wErr
}

b.Websocket.RemoveSubscriptions(*c)
b.Websocket.RemoveSubscriptions(c)

return nil
}
Expand Down
36 changes: 36 additions & 0 deletions exchanges/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"
"unicode"

Expand Down Expand Up @@ -62,6 +63,7 @@ var (
errAssetConfigFormatIsNil = errors.New("asset type config format is nil")
errSetDefaultsNotCalled = errors.New("set defaults not called")
errExchangeIsNil = errors.New("exchange is nil")
errBatchSizeZero = errors.New("batch size cannot be 0")
)

// SetRequester sets the instance of the requester
Expand Down Expand Up @@ -1840,3 +1842,37 @@ func (b *Base) IsPairEnabled(pair currency.Pair, a asset.Item) (bool, error) {
func (b *Base) GetOpenInterest(context.Context, ...key.PairAsset) ([]futures.OpenInterest, error) {
return nil, common.ErrFunctionNotSupported
}

// ParallelChanOp performs a single method call in parallel across streams and waits to return any errors
func (b *Base) ParallelChanOp(channels []subscription.Subscription, m func([]subscription.Subscription) error, batchSize int) error {
wg := sync.WaitGroup{}
errC := make(chan error, len(channels))
if batchSize == 0 {
return errBatchSizeZero
}

var j int
for i := 0; i < len(channels); i += batchSize {
j += batchSize
if j >= len(channels) {
j = len(channels)
}
wg.Add(1)
go func(c []subscription.Subscription) {
defer wg.Done()
if err := m(c); err != nil {
errC <- err
}
}(channels[i:j])
}

wg.Wait()
close(errC)

var errs error
for err := range errC {
errs = common.AppendError(errs, err)
}

return errs
}
35 changes: 35 additions & 0 deletions exchanges/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3295,3 +3295,38 @@ func TestSetSubscriptionsFromConfig(t *testing.T) {
assert.ElementsMatch(t, subs, b.Features.Subscriptions, "Subscriptions should be updated from Config")
assert.ElementsMatch(t, subs, b.Config.Features.Subscriptions, "Config Subscriptions should be the same")
}

// TestParallelChanOp unit tests the helper func ParallelChanOp
func TestParallelChanOp(t *testing.T) {
t.Parallel()
c := []subscription.Subscription{
{Channel: "red"},
{Channel: "blue"},
{Channel: "violent"},
{Channel: "spin"},
{Channel: "charm"},
}
run := make(chan struct{}, len(c)*2)
b := Base{}
errC := make(chan error, 1)
go func() {
errC <- b.ParallelChanOp(c, func(c []subscription.Subscription) error {
time.Sleep(300 * time.Millisecond)
run <- struct{}{}
switch c[0].Channel {
case "spin", "violent":
return errors.New(c[0].Channel)
}
return nil
}, 1)
}()
f := func(ct *assert.CollectT) {
if assert.Len(ct, errC, 1, "Should eventually have an error") {
err := <-errC
assert.ErrorContains(ct, err, "violent", "Should get a violent error")
assert.ErrorContains(ct, err, "spin", "Should get a spin error")
}
}
assert.EventuallyWithT(t, f, 500*time.Millisecond, 50*time.Millisecond, "ParallelChanOp should complete within 500ms not 5*300ms")
assert.Len(t, run, len(c), "Every channel was run to completion")
}

0 comments on commit 1ee9e64

Please sign in to comment.