Skip to content

Commit

Permalink
[FAB-8954] Fix index out of range panic
Browse files Browse the repository at this point in the history
The index out of range occurs because the
peer retriever is called multiple times
during Resolve and sometomes it returns a
different sized set of peers (this can
happen if a peer goes down or comes back
up during the operation). This patch fixes
the problem by feeding the peer group
resolver a peer retriever that always
returns the same set of peers for the
duration of the call to Resolve.

Change-Id: I265943f6c2a5339ce7a411620aa016c6d5078a9c
Signed-off-by: Bob Stasyszyn <[email protected]>
  • Loading branch information
bstasyszyn committed Mar 19, 2018
1 parent 406f79d commit f1fb12c
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 161 deletions.
50 changes: 37 additions & 13 deletions pkg/client/common/selection/dynamicselection/dynamicselection.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,29 @@ func (s *selectionService) GetEndorsersForChaincode(chaincodeIDs []string, opts
if err != nil {
return nil, errors.WithMessage(err, fmt.Sprintf("Error getting peer group resolver for chaincodes [%v] on channel [%s]", chaincodeIDs, s.channelID))
}
return resolver.Resolve(params.PeerFilter).Peers(), nil

peers, err := s.discoveryService.GetPeers()
if err != nil {
return nil, err
}

if params.PeerFilter != nil {
var filteredPeers []fab.Peer
for _, peer := range peers {
if params.PeerFilter(peer) {
filteredPeers = append(filteredPeers, peer)
} else {
logger.Debugf("Peer [%s] is not accepted by the filter and therefore peer group will be excluded.", peer.URL())
}
}
peers = filteredPeers
}

peerGroup, err := resolver.Resolve(peers)
if err != nil {
return nil, err
}
return peerGroup.Peers(), nil
}

func (s *selectionService) Close() {
Expand All @@ -199,7 +221,7 @@ func (s *selectionService) getPeerGroupResolver(chaincodeIDs []string) (pgresolv

func (s *selectionService) createPGResolver(key *resolverKey) (pgresolver.PeerGroupResolver, error) {
// Retrieve the signature policies for all of the chaincodes
var policyGroups []pgresolver.Group
var policyGroups []pgresolver.GroupRetriever
for _, ccID := range key.chaincodeIDs {
policyGroup, err := s.getPolicyGroupForCC(key.channelID, ccID)
if err != nil {
Expand All @@ -209,30 +231,32 @@ func (s *selectionService) createPGResolver(key *resolverKey) (pgresolver.PeerGr
}

// Perform an 'and' operation on all of the peer groups
aggregatePolicyGroup, err := pgresolver.NewGroupOfGroups(policyGroups).Nof(int32(len(policyGroups)))
if err != nil {
return nil, errors.WithMessage(err, fmt.Sprintf("error computing signature policy for chaincode(s) [%v] on channel [%s]", key.chaincodeIDs, key.channelID))
aggregatePolicyGroupRetriever := func(peerRetriever pgresolver.MSPPeerRetriever) (pgresolver.GroupOfGroups, error) {
var groups []pgresolver.Group
for _, f := range policyGroups {
grps, err := f(peerRetriever)
if err != nil {
return nil, err
}
groups = append(groups, grps)
}
return pgresolver.NewGroupOfGroups(groups).Nof(int32(len(policyGroups)))
}

// Create the resolver
resolver, err := pgresolver.NewPeerGroupResolver(aggregatePolicyGroup, s.pgLBP)
resolver, err := pgresolver.NewPeerGroupResolver(aggregatePolicyGroupRetriever, s.pgLBP)
if err != nil {
return nil, errors.WithMessage(err, fmt.Sprintf("error creating peer group resolver for chaincodes [%v] on channel [%s]", key.chaincodeIDs, key.channelID))
}
return resolver, nil
}

func (s *selectionService) getPolicyGroupForCC(channelID string, ccID string) (pgresolver.Group, error) {
func (s *selectionService) getPolicyGroupForCC(channelID string, ccID string) (pgresolver.GroupRetriever, error) {
sigPolicyEnv, err := s.ccPolicyProvider.GetChaincodePolicy(ccID)
if err != nil {
return nil, errors.WithMessage(err, fmt.Sprintf("error querying chaincode [%s] on channel [%s]", ccID, channelID))
}

return pgresolver.NewSignaturePolicyCompiler(
func(mspID string) []fab.Peer {
return s.getAvailablePeers(mspID)
},
).Compile(sigPolicyEnv)
return pgresolver.CompileSignaturePolicy(sigPolicyEnv)
}

func (s *selectionService) getAvailablePeers(mspID string) []fab.Peer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
"github.com/pkg/errors"
)

// MSPPeerRetriever is a function that retrieves peers by MSPID
type MSPPeerRetriever func(mspID string) []fab.Peer

// NewGroupOfGroups returns a new group of groups
func NewGroupOfGroups(groups []Group) GroupOfGroups {
items := make([]Item, len(groups))
Expand All @@ -34,7 +37,7 @@ func NewPeerGroup(peers ...fab.Peer) PeerGroup {
}

// NewMSPPeerGroup returns a new MSP PeerGroup
func NewMSPPeerGroup(mspID string, peerRetriever PeerRetriever) PeerGroup {
func NewMSPPeerGroup(mspID string, peerRetriever MSPPeerRetriever) PeerGroup {
return &mspPeerGroup{
mspID: mspID,
peerRetriever: peerRetriever,
Expand Down Expand Up @@ -268,7 +271,7 @@ func (pg *peerGroup) Collapse() Group {

type mspPeerGroup struct {
mspID string
peerRetriever PeerRetriever
peerRetriever MSPPeerRetriever
}

func (pg *mspPeerGroup) Items() []Item {
Expand Down Expand Up @@ -381,6 +384,8 @@ func (o *andOperation) and(grps []Group, index int) {
groupItems := c.group.Items()
if c.index < len(groupItems) {
items = append(items, groupItems[c.index])
} else {
logger.Warnf("Expecting index to be less than %d but got %d", len(groupItems), c.index)
}
}
if len(items) > 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,15 @@ SPDX-License-Identifier: Apache-2.0
package pgresolver

import (
"github.com/hyperledger/fabric-sdk-go/pkg/client/common/selection/options"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
common "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/common"
)

// SignaturePolicyFunc is a function that evaluates a signature policy and returns a peer group hierarchy
type SignaturePolicyFunc func() (GroupOfGroups, error)

// SignaturePolicyCompiler compiles a signature policy envelope and returns a peer group hierarchy
type SignaturePolicyCompiler interface {
Compile(sigPolicyEnv *common.SignaturePolicyEnvelope) (GroupOfGroups, error)
}

// PeerRetriever is a function that returns a set of peers for the given MSP ID
type PeerRetriever func(mspID string) []fab.Peer

// PeerGroupResolver resolves a group of peers that would (exactly) satisfy
// a chaincode's endorsement policy.
type PeerGroupResolver interface {
// Resolve returns a PeerGroup ensuring that all of the peers in the group are
// in the given set of available peers. An optional peer filter may be specified
// to provide per-request filtering of peers.
// This method should never return nil but may return a PeerGroup that contains no peers.
Resolve(filter options.PeerFilter) PeerGroup
// in the given set of available peers.
Resolve(peers []fab.Peer) (PeerGroup, error)
}

// LoadBalancePolicy is used to pick a peer group from a given set of peer groups
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ SPDX-License-Identifier: Apache-2.0
package pgresolver

import (
"math/rand"
"testing"
"time"

"github.com/hyperledger/fabric-sdk-go/pkg/client/common/selection/options"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
mocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
common "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/common"
Expand All @@ -27,26 +28,20 @@ const (
org10 = "Org10MSP"
)

var p1 = peer("peer1", "peer1:9999")
var p2 = peer("peer2", "peer2:9999")
var p3 = peer("peer3", "peer3:9999")
var p4 = peer("peer4", "peer4:9999")
var p5 = peer("peer5", "peer5:9999")
var p6 = peer("peer6", "peer6:9999")
var p7 = peer("peer7", "peer7:9999")
var p8 = peer("peer8", "peer8:9999")
var p9 = peer("peer9", "peer9:9999")
var p10 = peer("peer10", "peer10:9999")
var p11 = peer("peer11", "peer11:9999")
var p12 = peer("peer12", "peer12:9999")

var peersByMSPID = map[string][]fab.Peer{
org1: peers(p1, p2),
org2: peers(p3, p4),
org3: peers(p5, p6, p7),
org4: peers(p8, p9, p10),
org5: peers(p11, p12),
}
var p1 = peer("peer1", "peer1:9999", org1)
var p2 = peer("peer2", "peer2:9999", org1)
var p3 = peer("peer3", "peer3:9999", org2)
var p4 = peer("peer4", "peer4:9999", org2)
var p5 = peer("peer5", "peer5:9999", org3)
var p6 = peer("peer6", "peer6:9999", org3)
var p7 = peer("peer7", "peer7:9999", org3)
var p8 = peer("peer8", "peer8:9999", org4)
var p9 = peer("peer9", "peer9:9999", org4)
var p10 = peer("peer10", "peer10:9999", org4)
var p11 = peer("peer11", "peer11:9999", org5)
var p12 = peer("peer12", "peer12:9999", org5)

var allPeers = []fab.Peer{p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, p12}

var configImp = mocks.NewMockConfig()

Expand All @@ -72,10 +67,7 @@ func TestPeerGroupResolverPolicyNoAvailablePeers(t *testing.T) {

testPeerGroupResolver(
t, sigPolicyEnv,
func(mspID string) []fab.Peer {
return nil
},
expected, nil)
nil, expected, nil)
}

// 1 of [(2 of [1,2]),(2 of [1,3,4])]
Expand Down Expand Up @@ -112,29 +104,7 @@ func TestPeerGroupResolverPolicy1(t *testing.T) {
pg(p5, p8), pg(p5, p9), pg(p5, p10), pg(p6, p8), pg(p6, p9), pg(p6, p10), pg(p7, p8), pg(p7, p9), pg(p7, p10),
}

testPeerGroupResolver(t, sigPolicyEnv, retrievePeersByMSPid, expected,
func(peer fab.Peer) bool {
return true
},
)

// With peer filter
expected = []PeerGroup{
// Org1 and Org2
pg(p1, p3), pg(p1, p4), pg(p2, p3), pg(p2, p4),
// Org1 and Org3
pg(p1, p5), pg(p1, p7), pg(p2, p5), pg(p2, p7),
// Org1 and Org4
pg(p1, p8), pg(p1, p9), pg(p1, p10), pg(p2, p8), pg(p2, p9), pg(p2, p10),
// Org3 and Org4
pg(p5, p8), pg(p5, p9), pg(p5, p10), pg(p7, p8), pg(p7, p9), pg(p7, p10),
}
testPeerGroupResolver(t, sigPolicyEnv, retrievePeersByMSPid, expected,
func(peer fab.Peer) bool {
// Filter out peer6
return peer.URL() != p6.URL()
},
)
testPeerGroupResolver(t, sigPolicyEnv, allPeers, expected, nil)
}

// 1 of [(2 of [1,2]),(3 of [3,4,5])]
Expand Down Expand Up @@ -169,7 +139,7 @@ func TestPeerGroupResolverPolicy2(t *testing.T) {
pg(p7, p8, p11), pg(p7, p8, p12), pg(p7, p9, p11), pg(p7, p9, p12), pg(p7, p10, p11), pg(p7, p10, p12),
}

testPeerGroupResolver(t, sigPolicyEnv, retrievePeersByMSPid, expected, nil)
testPeerGroupResolver(t, sigPolicyEnv, allPeers, expected, nil)
}

// 2 of [(1 of [1,2]),(1 of [3,4,5])]
Expand Down Expand Up @@ -202,7 +172,7 @@ func TestPeerGroupResolverPolicy3(t *testing.T) {
pg(p3, p5), pg(p3, p6), pg(p3, p7), pg(p3, p8), pg(p3, p9), pg(p3, p10), pg(p3, p11), pg(p3, p12),
pg(p4, p5), pg(p4, p6), pg(p4, p7), pg(p4, p8), pg(p4, p9), pg(p4, p10), pg(p4, p11), pg(p4, p12),
}
testPeerGroupResolver(t, sigPolicyEnv, retrievePeersByMSPid, expected, nil)
testPeerGroupResolver(t, sigPolicyEnv, allPeers, expected, nil)
}

// 2 of [1,2,(2 of [3,4,5])]
Expand Down Expand Up @@ -238,7 +208,7 @@ func TestPeerGroupResolverPolicy4(t *testing.T) {
pg(p3, p8), pg(p3, p9), pg(p3, p10),
pg(p4, p8), pg(p4, p9), pg(p4, p10),
}
testPeerGroupResolver(t, sigPolicyEnv, retrievePeersByMSPid, expected, nil)
testPeerGroupResolver(t, sigPolicyEnv, allPeers, expected, nil)
}

// 1 of [1,(2 of [2,(1 of [3,4])])]
Expand Down Expand Up @@ -320,30 +290,81 @@ func TestPeerGroupResolverPolicy5(t *testing.T) {
pg(p4, p10, p11), pg(p4, p10, p12),
}

testPeerGroupResolver(t, sigPolicyEnv, retrievePeersByMSPid, expected, nil)
testPeerGroupResolver(t, sigPolicyEnv, allPeers, expected, nil)
}

func testPeerGroupResolver(t *testing.T, sigPolicyEnv *common.SignaturePolicyEnvelope, peerRetriever PeerRetriever, expected []PeerGroup, filter options.PeerFilter) {
// 1 of [(2 of [1,2]),(2 of [1,3,4])]
func TestPeerGroupResolverRandomPeers(t *testing.T) {
signedBy, identities, err := GetPolicies(org1, org2, org3, org4)
if err != nil {
panic(err)
}

sigPolicyEnv := &common.SignaturePolicyEnvelope{
Version: 0,
Rule: NewNOutOfPolicy(1,
NewNOutOfPolicy(2,
signedBy[o1],
signedBy[o2],
),
NewNOutOfPolicy(2,
signedBy[o1],
signedBy[o3],
signedBy[o4],
),
),
Identities: identities,
}

pgResolver, err := NewRandomPeerGroupResolver(sigPolicyEnv)
if err != nil {
t.Fatal(err)
}

var peers []fab.Peer
for _, peer := range allPeers {
if rand.Int31n(2) == 1 {
peers = append(peers, peer)
}
}

pgResolver, err := NewRoundRobinPeerGroupResolver(sigPolicyEnv, peerRetriever)
for i := 0; i < 100; i++ {
pgResolver.Resolve(peers)
}
}

func testPeerGroupResolver(t *testing.T, sigPolicyEnv *common.SignaturePolicyEnvelope, peers []fab.Peer, expected []PeerGroup, expectedErr error) {
pgResolver, err := NewRoundRobinPeerGroupResolver(sigPolicyEnv)
if err != nil {
t.Fatal(err)
}
verify(t, pgResolver, expected, filter)
verify(t, pgResolver, peers, expected, expectedErr)
}

func peer(name, url string) fab.Peer {
func peer(name, url, mspID string) fab.Peer {
mp := mocks.NewMockPeer(name, url)
mp.MockMSP = mspID
return mp
}

func peers(peers ...fab.Peer) []fab.Peer {
return peers
}

func verify(t *testing.T, pgResolver PeerGroupResolver, expectedPeerGroups []PeerGroup, filter options.PeerFilter) {
func verify(t *testing.T, pgResolver PeerGroupResolver, peers []fab.Peer, expectedPeerGroups []PeerGroup, expectedErr error) {
peerGroup, err := pgResolver.Resolve(peers)
if err != nil {
if expectedErr == nil {
t.Fatalf("got error resolving peer groups: %s", err)
}
if expectedErr.Error() != err.Error() {
t.Fatalf("expecting error [%s] but got [%s]", expectedErr, err)
}
} else if expectedErr != nil {
t.Fatalf("expecting error [%s] but got none", expectedErr)
}

for i := 0; i < len(expectedPeerGroups); i++ {
peerGroup := pgResolver.Resolve(filter)
if !containsPeerGroup(expectedPeerGroups, peerGroup) {
t.Fatalf("peer group %s is not one of the expected peer groups: %v", peerGroup, expectedPeerGroups)
}
Expand All @@ -354,10 +375,6 @@ func pg(peers ...fab.Peer) PeerGroup {
return NewPeerGroup(peers...)
}

func retrievePeersByMSPid(mspID string) []fab.Peer {
return peersByMSPID[mspID]
}

func containsPeerGroup(groups []PeerGroup, group PeerGroup) bool {
for _, g := range groups {
if containsAllPeers(group, g) {
Expand Down Expand Up @@ -387,3 +404,7 @@ func containsPeer(pg []fab.Peer, p fab.Peer) bool {
}
return false
}

func init() {
rand.Seed(time.Now().Unix())
}
Loading

0 comments on commit f1fb12c

Please sign in to comment.