Skip to content

Commit

Permalink
[FABG-875] Fix round-robin balancer in selection
Browse files Browse the repository at this point in the history
Create a default sorter which is saved to the selection service. This
way the round-robin balancer will retain its state between requests.

Change-Id: Ic18da43b873a61f038beee531659410a33577723
Signed-off-by: Bob Stasyszyn <[email protected]>
  • Loading branch information
bstasyszyn committed Jul 8, 2019
1 parent 48690b2 commit f264087
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type Service struct {
chResponseCache *lazycache.Cache
retryOpts retry.Opts
errHandler fab.ErrorHandler
peerSorter soptions.PeerSorter
}

// New creates a new dynamic selection service using Fabric's Discovery Service
Expand Down Expand Up @@ -111,6 +112,7 @@ func New(ctx contextAPI.Client, channelID string, discovery fab.DiscoveryService
discClient: discoveryClient,
retryOpts: options.retryOpts,
errHandler: options.errHandler,
peerSorter: resolvePeerSorter(channelID, ctx),
}

s.chResponseCache = lazycache.NewWithData(
Expand Down Expand Up @@ -158,6 +160,10 @@ func (s *Service) GetEndorsersForChaincode(chaincodes []*fab.ChaincodeCall, opts
params := soptions.Params{RetryOpts: s.retryOpts}
coptions.Apply(&params, opts)

if params.PeerSorter == nil {
params.PeerSorter = s.peerSorter
}

chResponse, err := s.getChannelResponse(chaincodes, params.RetryOpts)
if err != nil {
return nil, errors.Wrapf(err, "error getting channel response for channel [%s]", s.channelID)
Expand Down Expand Up @@ -190,7 +196,7 @@ func (s *Service) getEndorsers(chaincodes []*fab.ChaincodeCall, chResponse discc
return nil, errors.Wrapf(err, "error getting peers from discovery service for channel [%s]", s.channelID)
}

endpoints, err := chResponse.Endorsers(asInvocationChain(chaincodes), newFilter(s.channelID, s.ctx, peers, peerFilter, sorter))
endpoints, err := chResponse.Endorsers(asInvocationChain(chaincodes), newFilter(s.ctx, peers, peerFilter, sorter))
if err != nil && newDiscoveryError(err).isTransient() {
return nil, status.New(status.DiscoveryServerStatus, int32(status.QueryEndorsers), fmt.Sprintf("error getting endorsers: %s", err), []interface{}{})
}
Expand Down
48 changes: 30 additions & 18 deletions pkg/client/common/selection/fabricselection/selection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,36 @@ func TestSelection(t *testing.T) {
})

t.Run("Peer Filter", func(t *testing.T) {
testSelectionPeerFilter(t, service)
endorsers, err := service.GetEndorsersForChaincode([]*fab.ChaincodeCall{{ID: cc1}},
options.WithPeerFilter(func(peer fab.Peer) bool {
return peer.(fab.PeerState).BlockHeight() > 1001
}),
)

assert.NoError(t, err)
assert.Equalf(t, 4, len(endorsers), "Expecting 4 endorser")

// Ensure the endorsers all have a block height > 1001
for _, endorser := range endorsers {
blockHeight := endorser.(fab.PeerState).BlockHeight()
assert.Truef(t, blockHeight > 1001, "Expecting block height to be > 1001")
}
})

t.Run("Default Peer Filter", func(t *testing.T) {
var prev fab.Peer
for i := 0; i < 6; i++ {
endorsers, err := service.GetEndorsersForChaincode([]*fab.ChaincodeCall{{ID: cc1}})
assert.NoError(t, err)
assert.Equalf(t, 6, len(endorsers), "Expecting 6 endorser")

// Ensure that we get a different endorser as the first peer each time GetEndorsersForChaincode is called in
// order to know that the default balancer (round-robin) is working.
if prev != nil {
require.NotEqual(t, prev, endorsers[0])
}
prev = endorsers[0]
}
})

t.Run("Block Height Sorter Round Robin", func(t *testing.T) {
Expand Down Expand Up @@ -278,23 +307,6 @@ func testSelectionCCtoCC(t *testing.T, service *Service) {
assert.Equalf(t, 6, len(endorsers), "Expecting 6 endorser")
}

func testSelectionPeerFilter(t *testing.T, service *Service) {
endorsers, err := service.GetEndorsersForChaincode([]*fab.ChaincodeCall{{ID: cc1}},
options.WithPeerFilter(func(peer fab.Peer) bool {
return peer.(fab.PeerState).BlockHeight() > 1001
}),
)

assert.NoError(t, err)
assert.Equalf(t, 4, len(endorsers), "Expecting 4 endorser")

// Ensure the endorsers all have a block height > 1001
for _, endorser := range endorsers {
blockHeight := endorser.(fab.PeerState).BlockHeight()
assert.Truef(t, blockHeight > 1001, "Expecting block height to be > 1001")
}
}

func testSelectionDistribution(t *testing.T, service *Service, balancer balancer.Balancer, tolerance int) {
iterations := 1000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,11 @@ var noFilter = func(fab.Peer) bool {
return true
}

func newFilter(channelID string, ctx contextAPI.Client, peers []fab.Peer, filter options.PeerFilter, sorter options.PeerSorter) *selectionFilter {
func newFilter(ctx contextAPI.Client, peers []fab.Peer, filter options.PeerFilter, sorter options.PeerSorter) *selectionFilter {
if filter == nil {
filter = noFilter
}

if sorter == nil {
sorter = resolvePeerSorter(channelID, ctx)
}

return &selectionFilter{
ctx: ctx,
peers: peers,
Expand Down

0 comments on commit f264087

Please sign in to comment.