Skip to content

Commit

Permalink
[FABG-947] Terminate event reconnect routine after close (#45)
Browse files Browse the repository at this point in the history
The reconnect Go routine was not being terminated after the event client was closed, so the Go routine would run forever.
This patch ensures that the Go routine is terminated on event client close.

Signed-off-by: Bob Stasyszyn <[email protected]>
  • Loading branch information
bstasyszyn authored Feb 3, 2020
1 parent 51ecb63 commit 5f7f0b0
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 9 deletions.
10 changes: 8 additions & 2 deletions pkg/fab/events/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ func (c *Client) connectWithRetry(maxAttempts uint, timeBetweenAttempts time.Dur

var attempts uint
for {
if c.Stopped() {
return errors.New("event client is closed")
}

attempts++
logger.Debugf("Attempt #%d to connect...", attempts)
if err := c.connect(); err != nil {
Expand Down Expand Up @@ -411,8 +415,10 @@ func (c *Client) reconnect() {
}

if err := c.connectWithRetry(c.maxReconnAttempts, c.timeBetweenConnAttempts); err != nil {
logger.Warnf("Could not reconnect event client: %s. Closing.", err)
c.Close()
logger.Warnf("Could not reconnect event client: %s", err)
if !c.Stopped() {
c.Close()
}
} else {
logger.Infof("Event client has reconnected")
}
Expand Down
70 changes: 63 additions & 7 deletions pkg/fab/events/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"testing"
"time"

cb "github.com/hyperledger/fabric-protos-go/common"
pb "github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric-sdk-go/pkg/common/options"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
Expand All @@ -28,8 +30,6 @@ import (
fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
mspmocks "github.com/hyperledger/fabric-sdk-go/pkg/msp/test/mockmsp"
"github.com/hyperledger/fabric-sdk-go/pkg/util/test"
cb "github.com/hyperledger/fabric-protos-go/common"
pb "github.com/hyperledger/fabric-protos-go/peer"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -711,6 +711,32 @@ func TestReconnect(t *testing.T) {
),
)
})

// (1) Connect
// -> should succeed to connect on the first attempt
// (2) Disconnect with non-fatal error
// -> will keep failing to reconnect and will retry forever
// (3) After waiting a while, close the client
// -> the client should stop trying to reconnect and the client should close
t.Run("#7", func(t *testing.T) {
t.Parallel()

closeCalled := false
testReconnect(t, true, 0, mockconn.ClosedOutcome, newDisconnectedEvent(),
mockconn.NewConnectResults(
mockconn.NewConnectResult(mockconn.FirstAttempt, clientmocks.ConnFactory),
),
withTimeoutAction(func(c *Client) (outcome mockconn.Outcome, b bool) {
if closeCalled {
return mockconn.TimedOutOutcome, true
}

c.Close()
closeCalled = true
return "", false
}),
)
})
}

// TestReconnectRegistration tests the ability of the Channel Event Client to
Expand Down Expand Up @@ -1064,7 +1090,33 @@ func testConnect(t *testing.T, maxConnectAttempts uint, expectedOutcome mockconn
}
}

func testReconnect(t *testing.T, reconnect bool, maxReconnectAttempts uint, expectedOutcome mockconn.Outcome, event esdispatcher.Event, connAttemptResult mockconn.ConnectAttemptResults) {
type timeoutAction = func(c *Client) (mockconn.Outcome, bool)

type reconnectOptions struct {
timeoutAction timeoutAction
}

type reconnectOpt func(o *reconnectOptions)

func withTimeoutAction(a timeoutAction) reconnectOpt {
return func(o *reconnectOptions) {
o.timeoutAction = a
}
}

func testReconnect(t *testing.T, reconnect bool,
maxReconnectAttempts uint, expectedOutcome mockconn.Outcome, event esdispatcher.Event,
connAttemptResult mockconn.ConnectAttemptResults, opts ...reconnectOpt) {

reconOpts := &reconnectOptions{}
reconOpts.timeoutAction = func(c *Client) (outcome mockconn.Outcome, b bool) {
return mockconn.TimedOutOutcome, true
}

for _, opt := range opts {
opt(reconOpts)
}

cp := mockconn.NewProviderFactory()

connectch := make(chan *dispatcher.ConnectionEvent)
Expand Down Expand Up @@ -1106,10 +1158,14 @@ func testReconnect(t *testing.T, reconnect bool, maxReconnectAttempts uint, expe

var outcome mockconn.Outcome

select {
case outcome = <-outcomech:
case <-time.After(5 * time.Second):
outcome = mockconn.TimedOutOutcome
stop := false
for !stop {
select {
case outcome = <-outcomech:
stop = true
case <-time.After(5 * time.Second):
outcome, stop = reconOpts.timeoutAction(eventClient)
}
}

if outcome != expectedOutcome {
Expand Down

0 comments on commit 5f7f0b0

Please sign in to comment.