Skip to content

Commit

Permalink
[FAB-10012] Reset attempt counter on connect success
Browse files Browse the repository at this point in the history
If broadcastClient succeed to connect to orderer, we
reset attempts counter.

Change-Id: I0489c73b62b27c9acf960b74644152f03bc11276
Signed-off-by: gennady <[email protected]>
  • Loading branch information
gennadylaventman committed May 30, 2018
1 parent 08e5958 commit 50527ed
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 3 deletions.
11 changes: 8 additions & 3 deletions core/deliverservice/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,14 @@ func (bc *broadcastClient) try(action func() (interface{}, error)) (interface{},
var totalRetryTime time.Duration
var backoffDuration time.Duration
retry := true
resetAttemptCounter := func() {
attempt = 0
totalRetryTime = 0
}
for retry && !bc.shouldStop() {
attempt++
resp, err := bc.doAction(action)
resp, err := bc.doAction(action, resetAttemptCounter)
if err != nil {
attempt++
backoffDuration, retry = bc.shouldRetry(attempt, totalRetryTime)
if !retry {
logger.Warning("Got error:", err, "at", attempt, "attempt. Ceasing to retry")
Expand All @@ -105,12 +109,13 @@ func (bc *broadcastClient) try(action func() (interface{}, error)) (interface{},
return nil, fmt.Errorf("attempts (%d) or elapsed time (%v) exhausted", attempt, totalRetryTime)
}

func (bc *broadcastClient) doAction(action func() (interface{}, error)) (interface{}, error) {
func (bc *broadcastClient) doAction(action func() (interface{}, error), actionOnNewConnection func()) (interface{}, error) {
if bc.conn == nil {
err := bc.connect()
if err != nil {
return nil, err
}
actionOnNewConnection()
}
resp, err := action()
if err != nil {
Expand Down
79 changes: 79 additions & 0 deletions core/deliverservice/deliveryclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/hyperledger/fabric/core/deliverservice/mocks"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/common"
"github.com/hyperledger/fabric/gossip/util"
"github.com/hyperledger/fabric/msp/mgmt/testtools"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/spf13/viper"
Expand Down Expand Up @@ -547,6 +548,68 @@ func TestDeliverServiceShutdownRespawn(t *testing.T) {
osn2.Shutdown()
}

func TestDeliverServiceDisconnectReconnect(t *testing.T) {
// Scenario: Launch an ordering service node and let the client pull some blocks.
// Stop ordering service, wait for while - simulate disconnect and restart it back.
// Wait for some time, without sending blocks - simulate recv wait on empty channel.
// Repeat stop/start sequence multiple times, to make sure total retry time will pass
// value returned by getReConnectTotalTimeThreshold - in test it set to 2 seconds
// (0.5s + 1s + 2s + 4s) > 2s.
// Send new block and check that delivery client got it.
// So, we can see that waiting on recv in empty channel do reset total time spend in reconnection.
orgReconnectTotalTimeThreshold := util.GetDurationOrDefault("peer.deliveryclient.reconnectTotalTimeThreshold", defaultReConnectTotalTimeThreshold)
viper.Set("peer.deliveryclient.reconnectTotalTimeThreshold", time.Second*2)
defer func() {
viper.Set("peer.deliveryclient.reconnectTotalTimeThreshold", orgReconnectTotalTimeThreshold)
}()
defer ensureNoGoroutineLeak(t)()

osn := mocks.NewOrderer(5614, t)

time.Sleep(time.Second)
gossipServiceAdapter := &mocks.MockGossipServiceAdapter{GossipBlockDisseminations: make(chan uint64)}

service, err := NewDeliverService(&Config{
Endpoints: []string{"localhost:5614"},
Gossip: gossipServiceAdapter,
CryptoSvc: &mockMCS{},
ABCFactory: DefaultABCFactory,
ConnFactory: DefaultConnectionFactory,
})
assert.NoError(t, err)

li := &mocks.MockLedgerInfo{Height: uint64(100)}
osn.SetNextExpectedSeek(uint64(100))
err = service.StartDeliverForChannel("TEST_CHAINID", li, func() {})
assert.NoError(t, err, "can't start delivery")

// Check that delivery service requests blocks in order
go osn.SendBlock(uint64(100))
assertBlockDissemination(100, gossipServiceAdapter.GossipBlockDisseminations, t)
go osn.SendBlock(uint64(101))
assertBlockDissemination(101, gossipServiceAdapter.GossipBlockDisseminations, t)
atomic.StoreUint64(&li.Height, uint64(102))

for i := 0; i < 5; i += 1 {
// Shutdown orderer, simulate network disconnect
osn.Shutdown()
// Now wait for a disconnect to be discovered
assert.True(t, waitForConnectionCount(osn, 0), "deliverService can't disconnect from orderer")
// Recreate orderer, simulating network is back
osn = mocks.NewOrderer(5614, t)
osn.SetNextExpectedSeek(atomic.LoadUint64(&li.Height))
// Now wait for a while, to client connect back and simulate empty channel
assert.True(t, waitForConnectionCount(osn, 1), "deliverService can't reconnect to orderer")
}

// Send a block from orderer
go osn.SendBlock(uint64(102))
// Ensure it is received
assertBlockDissemination(102, gossipServiceAdapter.GossipBlockDisseminations, t)
service.Stop()
osn.Shutdown()
}

func TestDeliverServiceBadConfig(t *testing.T) {
// Empty endpoints
service, err := NewDeliverService(&Config{
Expand Down Expand Up @@ -647,3 +710,19 @@ func getStackTrace() string {
runtime.Stack(buf, true)
return string(buf)
}

func waitForConnectionCount(orderer *mocks.Orderer, connCount int) bool {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

for {
select {
case <-time.After(time.Millisecond * 100):
if orderer.ConnCount() == connCount {
return true
}
case <-ctx.Done():
return false
}
}
}

0 comments on commit 50527ed

Please sign in to comment.