Skip to content

Commit

Permalink
event: add ResubscribeErr (ethereum#22191)
Browse files Browse the repository at this point in the history
This adds a way to get the error of the failing subscription
for logging/debugging purposes.

Co-authored-by: Felix Lange <[email protected]>
  • Loading branch information
lukasz-zimnoch and fjl authored Jan 21, 2021
1 parent c4307a9 commit 231040c
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 4 deletions.
32 changes: 28 additions & 4 deletions event/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,26 @@ func (s *funcSub) Err() <-chan error {
// Resubscribe applies backoff between calls to fn. The time between calls is adapted
// based on the error rate, but will never exceed backoffMax.
func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription {
return ResubscribeErr(backoffMax, func(ctx context.Context, _ error) (Subscription, error) {
return fn(ctx)
})
}

// A ResubscribeFunc attempts to establish a subscription.
type ResubscribeFunc func(context.Context) (Subscription, error)

// ResubscribeErr calls fn repeatedly to keep a subscription established. When the
// subscription is established, ResubscribeErr waits for it to fail and calls fn again. This
// process repeats until Unsubscribe is called or the active subscription ends
// successfully.
//
// The difference between Resubscribe and ResubscribeErr is that with ResubscribeErr,
// the error of the failing subscription is available to the callback for logging
// purposes.
//
// ResubscribeErr applies backoff between calls to fn. The time between calls is adapted
// based on the error rate, but will never exceed backoffMax.
func ResubscribeErr(backoffMax time.Duration, fn ResubscribeErrFunc) Subscription {
s := &resubscribeSub{
waitTime: backoffMax / 10,
backoffMax: backoffMax,
Expand All @@ -106,15 +126,18 @@ func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription {
return s
}

// A ResubscribeFunc attempts to establish a subscription.
type ResubscribeFunc func(context.Context) (Subscription, error)
// A ResubscribeErrFunc attempts to establish a subscription.
// For every call but the first, the second argument to this function is
// the error that occurred with the previous subscription.
type ResubscribeErrFunc func(context.Context, error) (Subscription, error)

type resubscribeSub struct {
fn ResubscribeFunc
fn ResubscribeErrFunc
err chan error
unsub chan struct{}
unsubOnce sync.Once
lastTry mclock.AbsTime
lastSubErr error
waitTime, backoffMax time.Duration
}

Expand Down Expand Up @@ -149,7 +172,7 @@ func (s *resubscribeSub) subscribe() Subscription {
s.lastTry = mclock.Now()
ctx, cancel := context.WithCancel(context.Background())
go func() {
rsub, err := s.fn(ctx)
rsub, err := s.fn(ctx, s.lastSubErr)
sub = rsub
subscribed <- err
}()
Expand Down Expand Up @@ -178,6 +201,7 @@ func (s *resubscribeSub) waitForError(sub Subscription) bool {
defer sub.Unsubscribe()
select {
case err := <-sub.Err():
s.lastSubErr = err
return err == nil
case <-s.unsub:
return true
Expand Down
36 changes: 36 additions & 0 deletions event/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package event
import (
"context"
"errors"
"fmt"
"reflect"
"testing"
"time"
)
Expand Down Expand Up @@ -118,3 +120,37 @@ func TestResubscribeAbort(t *testing.T) {
t.Fatal(err)
}
}

func TestResubscribeWithErrorHandler(t *testing.T) {
t.Parallel()

var i int
nfails := 6
subErrs := make([]string, 0)
sub := ResubscribeErr(100*time.Millisecond, func(ctx context.Context, lastErr error) (Subscription, error) {
i++
var lastErrVal string
if lastErr != nil {
lastErrVal = lastErr.Error()
}
subErrs = append(subErrs, lastErrVal)
sub := NewSubscription(func(unsubscribed <-chan struct{}) error {
if i < nfails {
return fmt.Errorf("err-%v", i)
} else {
return nil
}
})
return sub, nil
})

<-sub.Err()
if i != nfails {
t.Fatalf("resubscribe function called %d times, want %d times", i, nfails)
}

expectedSubErrs := []string{"", "err-1", "err-2", "err-3", "err-4", "err-5"}
if !reflect.DeepEqual(subErrs, expectedSubErrs) {
t.Fatalf("unexpected subscription errors %v, want %v", subErrs, expectedSubErrs)
}
}

0 comments on commit 231040c

Please sign in to comment.