From 1fed32069bd62d53b5e5d562f9bf250dbbacb9f2 Mon Sep 17 00:00:00 2001 From: Bob Stasyszyn Date: Thu, 7 Jun 2018 16:53:27 -0400 Subject: [PATCH] [FAB-9661] Selection based on Fabric's Discovery Selection Service implementation using Fabric's Discovery Service. Change-Id: I11ebac5ddb397ead8fb5bec01e461697bd5a7d41 Signed-off-by: Bob Stasyszyn --- .../dynamicdiscovery/chservice_test.go | 10 +- .../dynamicdiscovery/localservice_test.go | 10 +- .../mocks/mockdiscoveryclient.go | 104 ++++-- .../selection/fabricselection/cachekey.go | 27 ++ .../fabricselection/fabricselection.go | 308 ++++++++++++++++++ .../common/selection/fabricselection/opts.go | 80 +++++ .../fabricselection/selection_test.go | 282 ++++++++++++++++ .../fabricselection/selectionfilter.go | 107 ++++++ pkg/client/common/selection/options/opts.go | 30 +- pkg/common/errors/retry/defaults.go | 3 + pkg/common/errors/status/codes.go | 5 + pkg/common/errors/status/status.go | 4 + test/integration/sdk/sdk_provider_test.go | 27 +- 13 files changed, 951 insertions(+), 46 deletions(-) rename pkg/client/common/{discovery/dynamicdiscovery => }/mocks/mockdiscoveryclient.go (68%) create mode 100644 pkg/client/common/selection/fabricselection/cachekey.go create mode 100644 pkg/client/common/selection/fabricselection/fabricselection.go create mode 100755 pkg/client/common/selection/fabricselection/opts.go create mode 100644 pkg/client/common/selection/fabricselection/selection_test.go create mode 100644 pkg/client/common/selection/fabricselection/selectionfilter.go diff --git a/pkg/client/common/discovery/dynamicdiscovery/chservice_test.go b/pkg/client/common/discovery/dynamicdiscovery/chservice_test.go index 7058ed16b8..b1c4279c0b 100644 --- a/pkg/client/common/discovery/dynamicdiscovery/chservice_test.go +++ b/pkg/client/common/discovery/dynamicdiscovery/chservice_test.go @@ -10,7 +10,7 @@ import ( "testing" "time" - dyndiscmocks "github.com/hyperledger/fabric-sdk-go/pkg/client/common/discovery/dynamicdiscovery/mocks" + clientmocks "github.com/hyperledger/fabric-sdk-go/pkg/client/common/mocks" contextAPI "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" pfab "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" discmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery/mocks" @@ -49,9 +49,9 @@ func TestDiscoveryService(t *testing.T) { } ctx.SetEndpointConfig(config) - discClient := dyndiscmocks.NewMockDiscoveryClient() + discClient := clientmocks.NewMockDiscoveryClient() discClient.SetResponses( - &dyndiscmocks.MockDiscoverEndpointResponse{ + &clientmocks.MockDiscoverEndpointResponse{ PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{}, }, ) @@ -73,7 +73,7 @@ func TestDiscoveryService(t *testing.T) { assert.Equal(t, 0, len(peers)) discClient.SetResponses( - &dyndiscmocks.MockDiscoverEndpointResponse{ + &clientmocks.MockDiscoverEndpointResponse{ PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{ { MSPID: mspID1, @@ -91,7 +91,7 @@ func TestDiscoveryService(t *testing.T) { assert.Equalf(t, 1, len(peers), "Expected 1 peer") discClient.SetResponses( - &dyndiscmocks.MockDiscoverEndpointResponse{ + &clientmocks.MockDiscoverEndpointResponse{ PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{ { MSPID: mspID1, diff --git a/pkg/client/common/discovery/dynamicdiscovery/localservice_test.go b/pkg/client/common/discovery/dynamicdiscovery/localservice_test.go index 9157c927c1..6db5b47e2c 100644 --- a/pkg/client/common/discovery/dynamicdiscovery/localservice_test.go +++ b/pkg/client/common/discovery/dynamicdiscovery/localservice_test.go @@ -10,7 +10,7 @@ import ( "testing" "time" - dyndiscmocks "github.com/hyperledger/fabric-sdk-go/pkg/client/common/discovery/dynamicdiscovery/mocks" + clientmocks "github.com/hyperledger/fabric-sdk-go/pkg/client/common/mocks" contextAPI "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" pfab "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" discmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery/mocks" @@ -38,9 +38,9 @@ func TestLocalDiscoveryService(t *testing.T) { } config.SetCustomNetworkPeerCfg([]pfab.NetworkPeer{peer1}) - discClient := dyndiscmocks.NewMockDiscoveryClient() + discClient := clientmocks.NewMockDiscoveryClient() discClient.SetResponses( - &dyndiscmocks.MockDiscoverEndpointResponse{ + &clientmocks.MockDiscoverEndpointResponse{ PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{}, }, ) @@ -72,7 +72,7 @@ func TestLocalDiscoveryService(t *testing.T) { assert.Equal(t, 0, len(peers)) discClient.SetResponses( - &dyndiscmocks.MockDiscoverEndpointResponse{ + &clientmocks.MockDiscoverEndpointResponse{ PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{ { MSPID: mspID1, @@ -90,7 +90,7 @@ func TestLocalDiscoveryService(t *testing.T) { assert.Equal(t, 1, len(peers), "Expecting 1 peer") discClient.SetResponses( - &dyndiscmocks.MockDiscoverEndpointResponse{ + &clientmocks.MockDiscoverEndpointResponse{ PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{ { MSPID: mspID1, diff --git a/pkg/client/common/discovery/dynamicdiscovery/mocks/mockdiscoveryclient.go b/pkg/client/common/mocks/mockdiscoveryclient.go similarity index 68% rename from pkg/client/common/discovery/dynamicdiscovery/mocks/mockdiscoveryclient.go rename to pkg/client/common/mocks/mockdiscoveryclient.go index 424ceb8768..2c8a8f01e6 100644 --- a/pkg/client/common/discovery/dynamicdiscovery/mocks/mockdiscoveryclient.go +++ b/pkg/client/common/mocks/mockdiscoveryclient.go @@ -8,6 +8,7 @@ package mocks import ( reqcontext "context" + "sort" "sync" discclient "github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/discovery/client" @@ -24,11 +25,9 @@ type MockDiscoveryClient struct { lock sync.RWMutex } -// MockDiscoverEndpointResponse contains a mock response for the discover client -type MockDiscoverEndpointResponse struct { - Target string - PeerEndpoints []*discmocks.MockDiscoveryPeerEndpoint - Error error +// MockResponseBuilder builds a mock discovery response +type MockResponseBuilder interface { + Build() fabdiscovery.Response } // NewMockDiscoveryClient returns a new mock Discover service @@ -42,25 +41,14 @@ func (m *MockDiscoveryClient) Send(ctx reqcontext.Context, req *discclient.Reque } // SetResponses sets the responses that the mock client should return from the Send function -func (m *MockDiscoveryClient) SetResponses(responses ...*MockDiscoverEndpointResponse) { +func (m *MockDiscoveryClient) SetResponses(responses ...MockResponseBuilder) { m.lock.Lock() defer m.lock.Unlock() m.resp = nil for _, resp := range responses { - var peers []*discclient.Peer - for _, endpoint := range resp.PeerEndpoints { - peer := &discclient.Peer{ - MSPID: endpoint.MSPID, - AliveMessage: newAliveMessage(endpoint), - StateInfoMessage: newStateInfoMessage(endpoint), - } - peers = append(peers, peer) - } - m.resp = append(m.resp, &mockDiscoverResponse{ - Response: &response{peers: peers}, target: resp.Target, err: resp.Error, - }) + m.resp = append(m.resp, resp.Build()) } } @@ -73,24 +61,21 @@ func (m *MockDiscoveryClient) responses() []fabdiscovery.Response { type mockDiscoverResponse struct { discclient.Response target string - err error } func (r *mockDiscoverResponse) Target() string { return r.target } -func (r *mockDiscoverResponse) Error() error { - return r.err -} - type response struct { peers []*discclient.Peer + err error } func (r *response) ForChannel(string) discclient.ChannelResponse { return &channelResponse{ peers: r.peers, + err: r.err, } } @@ -102,6 +87,7 @@ func (r *response) ForLocal() discclient.LocalResponse { type channelResponse struct { peers []*discclient.Peer + err error } // Config returns a response for a config query, or error if something went wrong @@ -111,12 +97,25 @@ func (cr *channelResponse) Config() (*discovery.ConfigResult, error) { // Peers returns a response for a peer membership query, or error if something went wrong func (cr *channelResponse) Peers() ([]*discclient.Peer, error) { - return cr.peers, nil + return cr.peers, cr.err } // Endorsers returns the response for an endorser query func (cr *channelResponse) Endorsers(invocationChain discclient.InvocationChain, ps discclient.PrioritySelector, ef discclient.ExclusionFilter) (discclient.Endorsers, error) { - panic("not implemented") + if cr.err != nil { + return nil, cr.err + } + + var endorsers discclient.Endorsers + for _, endorser := range cr.peers { + if !ef.Exclude(*endorser) { + endorsers = append(endorsers, endorser) + } + } + + sortEndorsers(endorsers, ps) + + return endorsers, nil } type localResponse struct { @@ -128,6 +127,33 @@ func (cr *localResponse) Peers() ([]*discclient.Peer, error) { return cr.peers, nil } +// MockDiscoverEndpointResponse contains a mock response for the discover client +type MockDiscoverEndpointResponse struct { + Target string + PeerEndpoints []*discmocks.MockDiscoveryPeerEndpoint + Error error +} + +// Build builds a mock discovery response +func (b *MockDiscoverEndpointResponse) Build() fabdiscovery.Response { + var peers []*discclient.Peer + for _, endpoint := range b.PeerEndpoints { + peer := &discclient.Peer{ + MSPID: endpoint.MSPID, + AliveMessage: newAliveMessage(endpoint), + StateInfoMessage: newStateInfoMessage(endpoint), + } + peers = append(peers, peer) + } + return &mockDiscoverResponse{ + Response: &response{ + peers: peers, + err: b.Error, + }, + target: b.Target, + } +} + func newAliveMessage(endpoint *discmocks.MockDiscoveryPeerEndpoint) *gossip.SignedGossipMessage { return &gossip.SignedGossipMessage{ GossipMessage: &gossip.GossipMessage{ @@ -155,3 +181,31 @@ func newStateInfoMessage(endpoint *discmocks.MockDiscoveryPeerEndpoint) *gossip. }, } } + +func sortEndorsers(endorsers discclient.Endorsers, ps discclient.PrioritySelector) discclient.Endorsers { + sort.Sort(&endorserSort{ + Endorsers: endorsers, + PrioritySelector: ps, + }) + return endorsers +} + +type endorserSort struct { + discclient.Endorsers + discclient.PrioritySelector +} + +func (es *endorserSort) Len() int { + return len(es.Endorsers) +} + +func (es *endorserSort) Less(i, j int) bool { + e1 := es.Endorsers[i] + e2 := es.Endorsers[j] + less := es.Compare(*e1, *e2) + return less > discclient.Priority(0) +} + +func (es *endorserSort) Swap(i, j int) { + es.Endorsers[i], es.Endorsers[j] = es.Endorsers[j], es.Endorsers[i] +} diff --git a/pkg/client/common/selection/fabricselection/cachekey.go b/pkg/client/common/selection/fabricselection/cachekey.go new file mode 100644 index 0000000000..bd0c686836 --- /dev/null +++ b/pkg/client/common/selection/fabricselection/cachekey.go @@ -0,0 +1,27 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package fabricselection + +import "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" +import "encoding/json" + +type cacheKey struct { + chaincodes []*fab.ChaincodeCall +} + +func newCacheKey(chaincodes []*fab.ChaincodeCall) *cacheKey { + return &cacheKey{chaincodes: chaincodes} +} + +func (k *cacheKey) String() string { + bytes, err := json.Marshal(k.chaincodes) + if err != nil { + logger.Errorf("unexpected error marshalling chaincodes: %s", err) + return "" + } + return string(bytes) +} diff --git a/pkg/client/common/selection/fabricselection/fabricselection.go b/pkg/client/common/selection/fabricselection/fabricselection.go new file mode 100644 index 0000000000..03728700c8 --- /dev/null +++ b/pkg/client/common/selection/fabricselection/fabricselection.go @@ -0,0 +1,308 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package fabricselection + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + discclient "github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/discovery/client" + "github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/protos/discovery" + "github.com/hyperledger/fabric-sdk-go/pkg/client/common/selection/options" + "github.com/hyperledger/fabric-sdk-go/pkg/common/errors/retry" + "github.com/hyperledger/fabric-sdk-go/pkg/common/errors/status" + "github.com/hyperledger/fabric-sdk-go/pkg/common/logging" + coptions "github.com/hyperledger/fabric-sdk-go/pkg/common/options" + contextAPI "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" + reqContext "github.com/hyperledger/fabric-sdk-go/pkg/context" + fabdiscovery "github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery" + "github.com/hyperledger/fabric-sdk-go/pkg/util/concurrent/lazycache" + "github.com/hyperledger/fabric-sdk-go/pkg/util/concurrent/lazyref" + "github.com/pkg/errors" +) + +const moduleName = "fabsdk/client" + +var logger = logging.NewLogger(moduleName) + +// PeerState provides state information about the Peer +type PeerState interface { + BlockHeight() uint64 +} + +type discoveryClient interface { + Send(ctx context.Context, req *discclient.Request, targets ...fab.PeerConfig) ([]fabdiscovery.Response, error) +} + +// clientProvider is overridden by unit tests +var clientProvider = func(ctx contextAPI.Client) (discoveryClient, error) { + return fabdiscovery.New(ctx) +} + +// Service chooses endorsing peers for a given set of chaincodes using +// Fabric's Discovery Service +type Service struct { + channelID string + responseTimeout time.Duration + ctx contextAPI.Client + discovery fab.DiscoveryService + discClient discoveryClient + chResponseCache *lazycache.Cache + retryOpts retry.Opts +} + +// New creates a new dynamic selection service using Fabric's Discovery Service +func New(ctx contextAPI.Client, channelID string, discovery fab.DiscoveryService, opts ...coptions.Opt) (*Service, error) { + options := params{retryOpts: retry.DefaultResMgmtOpts} + coptions.Apply(&options, opts) + + if options.refreshInterval == 0 { + // Use DiscoveryServiceRefresh since the selection algorithm depends on up-to-date + // information from the Discovery Client. + options.refreshInterval = ctx.EndpointConfig().Timeout(fab.DiscoveryServiceRefresh) + } + + if options.responseTimeout == 0 { + options.responseTimeout = ctx.EndpointConfig().Timeout(fab.DiscoveryResponse) + } + + logger.Debugf("Cache refresh interval: %s", options.refreshInterval) + logger.Debugf("Deliver service response timeout: %s", options.responseTimeout) + + discoveryClient, err := clientProvider(ctx) + if err != nil { + return nil, errors.Wrap(err, "error creating discover client") + } + + s := &Service{ + channelID: channelID, + ctx: ctx, + responseTimeout: options.responseTimeout, + discovery: discovery, + discClient: discoveryClient, + retryOpts: options.retryOpts, + } + + s.chResponseCache = lazycache.New( + "Channel_Response_Cache", + func(key lazycache.Key) (interface{}, error) { + return s.newChannelResponseRef(key.(*cacheKey).chaincodes, options.refreshInterval), nil + }, + ) + + return s, nil +} + +// GetEndorsersForChaincode returns the endorsing peers for the given chaincodes +func (s *Service) GetEndorsersForChaincode(chaincodes []*fab.ChaincodeCall, opts ...coptions.Opt) ([]fab.Peer, error) { + logger.Debugf("Getting endorsers for chaincodes [%#v]...", chaincodes) + if len(chaincodes) == 0 { + return nil, errors.New("no chaincode IDs provided") + } + + chResponse, err := s.getChannelResponse(chaincodes) + if err != nil { + return nil, errors.Wrapf(err, "error getting channel response for channel [%s]", s.channelID) + } + + peers, err := s.discovery.GetPeers() + if err != nil { + return nil, errors.Wrapf(err, "error getting peers from discovery service for channel [%s]", s.channelID) + } + + params := options.NewParams(opts) + + endpoints, err := chResponse.Endorsers(asInvocationChain(chaincodes), newSelector(params.PrioritySelector), newFilter(params.PeerFilter, peers)) + if err != nil { + return nil, errors.Wrap(err, "error getting endorsers from channel response") + } + + return asPeers(s.ctx, endpoints), nil +} + +// Close closes all resources associated with the service +func (s *Service) Close() { + logger.Debug("Closing channel response cache") + s.chResponseCache.Close() +} + +func (s *Service) newChannelResponseRef(chaincodes []*fab.ChaincodeCall, refreshInterval time.Duration) *lazyref.Reference { + return lazyref.New( + func() (interface{}, error) { + if logging.IsEnabledFor(moduleName, logging.DEBUG) { + key, _ := json.Marshal(chaincodes) + logger.Debugf("Refreshing endorsers for chaincodes [%s] in channel [%s] from discovery service...", key, s.channelID) + } + return s.queryEndorsers(chaincodes) + }, + lazyref.WithRefreshInterval(lazyref.InitImmediately, refreshInterval), + ) +} + +func (s *Service) getChannelResponse(chaincodes []*fab.ChaincodeCall) (discclient.ChannelResponse, error) { + key := newCacheKey(chaincodes) + ref, err := s.chResponseCache.Get(key) + if err != nil { + return nil, err + } + chResp, err := ref.(*lazyref.Reference).Get() + if err != nil { + return nil, err + } + return chResp.(discclient.ChannelResponse), nil +} + +func (s *Service) queryEndorsers(chaincodes []*fab.ChaincodeCall) (discclient.ChannelResponse, error) { + logger.Debugf("Querying discovery service for endorsers for chaincodes: %#v", chaincodes) + + targets, err := s.getTargets(s.ctx) + if err != nil { + return nil, err + } + if len(targets) == 0 { + return nil, errors.Errorf("no peers configured for channel [%s]", s.channelID) + } + + req, err := discclient.NewRequest().OfChannel(s.channelID).AddEndorsersQuery(asChaincodeInterests(chaincodes)) + if err != nil { + return nil, errors.Wrapf(err, "error creating endorser query request") + } + + chResponse, err := retry.NewInvoker(retry.New(s.retryOpts)).Invoke( + func() (interface{}, error) { + return s.query(req, chaincodes, targets) + }, + ) + + if err != nil { + return nil, err + } + return chResponse.(discclient.ChannelResponse), err +} + +func (s *Service) query(req *discclient.Request, chaincodes []*fab.ChaincodeCall, targets []fab.PeerConfig) (discclient.ChannelResponse, error) { + logger.Debugf("Querying Discovery Service for endorsers for chaincodes: %#v on channel [%s]", chaincodes, s.channelID) + reqCtx, cancel := reqContext.NewRequest(s.ctx, reqContext.WithTimeout(s.responseTimeout)) + defer cancel() + + responses, err := s.discClient.Send(reqCtx, req, targets...) + if err != nil { + if len(responses) == 0 { + return nil, errors.Wrapf(err, "error calling discover service send") + } + logger.Warnf("Received %d response(s) and one or more errors from discovery client: %s", len(responses), err) + } + + if len(responses) == 0 { + return nil, errors.New("no successful response received from any peer") + } + + // TODO: In a future patch: + // - validate the signatures in the responses + // For now just pick the first successful response + + invocChain := asInvocationChain(chaincodes) + + var lastErr error + for _, response := range responses { + chResp := response.ForChannel(s.channelID) + // Make sure the target didn't return an error + _, err := chResp.Endorsers(invocChain, discclient.NoPriorities, discclient.NoExclusion) + if err != nil { + lastErr = errors.Wrapf(err, "error getting endorsers from target [%s]", response.Target()) + logger.Debugf(lastErr.Error()) + continue + } + return chResp, nil + } + + logger.Warn(lastErr.Error()) + + if strings.Contains(lastErr.Error(), "failed constructing descriptor for chaincodes") { + errMsg := fmt.Sprintf("error received from Discovery Server: %s", lastErr) + return nil, status.New(status.DiscoveryServerStatus, int32(status.QueryEndorsers), errMsg, []interface{}{}) + } + + return nil, lastErr +} + +func (s *Service) getTargets(ctx contextAPI.Client) ([]fab.PeerConfig, error) { + // TODO: The number of peers to query should be retrieved from the channel policy. + // This will done in a future patch. + chpeers, ok := ctx.EndpointConfig().ChannelPeers(s.channelID) + if !ok { + return nil, errors.Errorf("failed to get peer configs for channel [%s]", s.channelID) + } + targets := make([]fab.PeerConfig, len(chpeers)) + for i := 0; i < len(targets); i++ { + targets[i] = chpeers[i].NetworkPeer.PeerConfig + } + return targets, nil +} + +func asChaincodeInterests(chaincodes []*fab.ChaincodeCall) *discovery.ChaincodeInterest { + return &discovery.ChaincodeInterest{ + Chaincodes: asInvocationChain(chaincodes), + } +} + +func asInvocationChain(chaincodes []*fab.ChaincodeCall) discclient.InvocationChain { + var invocChain discclient.InvocationChain + for _, cc := range chaincodes { + invocChain = append(invocChain, &discovery.ChaincodeCall{ + Name: cc.ID, + CollectionNames: cc.Collections, + }) + } + return invocChain +} + +func asPeers(ctx contextAPI.Client, endpoints []*discclient.Peer) []fab.Peer { + var peers []fab.Peer + for _, endpoint := range endpoints { + peer, err := asPeer(ctx, endpoint) + if err != nil { + logger.Warnf(err.Error()) + continue + } + peers = append(peers, peer) + } + return peers +} + +func asPeer(ctx contextAPI.Client, endpoint *discclient.Peer) (fab.Peer, error) { + url := endpoint.AliveMessage.GetAliveMsg().Membership.Endpoint + + peerConfig, found := ctx.EndpointConfig().PeerConfig(url) + if !found { + logger.Warnf("Peer config not found for url [%s]", url) + return nil, errors.Errorf("peer config not found for [%s]", url) + } + + peer, err := ctx.InfraProvider().CreatePeerFromConfig(&fab.NetworkPeer{PeerConfig: *peerConfig, MSPID: endpoint.MSPID}) + if err != nil { + return nil, errors.Wrapf(err, "unable to create peer config for [%s]", url) + } + + return &peerEndpoint{ + Peer: peer, + blockHeight: endpoint.StateInfoMessage.GetStateInfo().GetProperties().LedgerHeight, + }, nil +} + +type peerEndpoint struct { + fab.Peer + blockHeight uint64 +} + +func (p *peerEndpoint) BlockHeight() uint64 { + return p.blockHeight +} diff --git a/pkg/client/common/selection/fabricselection/opts.go b/pkg/client/common/selection/fabricselection/opts.go new file mode 100755 index 0000000000..4c1fbd83f5 --- /dev/null +++ b/pkg/client/common/selection/fabricselection/opts.go @@ -0,0 +1,80 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package fabricselection + +import ( + "time" + + "github.com/hyperledger/fabric-sdk-go/pkg/common/errors/retry" + + coptions "github.com/hyperledger/fabric-sdk-go/pkg/common/options" +) + +type params struct { + refreshInterval time.Duration + responseTimeout time.Duration + retryOpts retry.Opts +} + +// WithRefreshInterval sets the interval in which the +// peer cache is refreshed +func WithRefreshInterval(value time.Duration) coptions.Opt { + return func(p coptions.Params) { + logger.Debug("Checking refreshIntervalSetter") + if setter, ok := p.(refreshIntervalSetter); ok { + setter.SetRefreshInterval(value) + } + } +} + +// WithResponseTimeout sets the Discover service response timeout +func WithResponseTimeout(value time.Duration) coptions.Opt { + return func(p coptions.Params) { + logger.Debug("Checking responseTimeoutSetter") + if setter, ok := p.(responseTimeoutSetter); ok { + setter.SetResponseTimeout(value) + } + } +} + +// WithRetryOpts sets retry options for retries on transient errors +// from the Discovery Server +func WithRetryOpts(value retry.Opts) coptions.Opt { + return func(p coptions.Params) { + logger.Debug("Checking retryOptsSetter") + if setter, ok := p.(retryOptsSetter); ok { + setter.SetRetryOpts(value) + } + } +} + +type refreshIntervalSetter interface { + SetRefreshInterval(value time.Duration) +} + +type responseTimeoutSetter interface { + SetResponseTimeout(value time.Duration) +} + +type retryOptsSetter interface { + SetRetryOpts(value retry.Opts) +} + +func (o *params) SetRefreshInterval(value time.Duration) { + logger.Debugf("RefreshInterval: %s", value) + o.refreshInterval = value +} + +func (o *params) SetResponseTimeout(value time.Duration) { + logger.Debugf("ResponseTimeout: %s", value) + o.responseTimeout = value +} + +func (o *params) SetRetryOpts(value retry.Opts) { + logger.Debugf("RetryOpts: %#v", value) + o.retryOpts = value +} diff --git a/pkg/client/common/selection/fabricselection/selection_test.go b/pkg/client/common/selection/fabricselection/selection_test.go new file mode 100644 index 0000000000..540930ec25 --- /dev/null +++ b/pkg/client/common/selection/fabricselection/selection_test.go @@ -0,0 +1,282 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package fabricselection + +import ( + "fmt" + "strings" + "testing" + "time" + + clientmocks "github.com/hyperledger/fabric-sdk-go/pkg/client/common/mocks" + "github.com/hyperledger/fabric-sdk-go/pkg/client/common/selection/options" + contextAPI "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" + fab "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" + discmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery/mocks" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks" + mspmocks "github.com/hyperledger/fabric-sdk-go/pkg/msp/test/mockmsp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + channelID = "testchannel" + cc1 = "cc1" + cc1Col1 = "cc1col1" + cc1Col2 = "cc1col2" + cc2 = "cc2" + cc2Col1 = "cc2col1" + + mspID1 = "Org1MSP" + peer1Org1URL = "peer1.org1.com:9999" + peer2Org1URL = "peer2.org1.com:9999" + + mspID2 = "Org2MSP" + peer1Org2URL = "peer1.org2.com:9999" + peer2Org2URL = "peer2.org2.com:9999" + + mspID3 = "Org3MSP" + peer1Org3URL = "peer1.org3.com:9999" + peer2Org3URL = "peer2.org3.com:9999" +) + +var ( + peer1Org1 = mocks.NewMockPeer("p11", peer1Org1URL) + peer2Org1 = mocks.NewMockPeer("p12", peer2Org1URL) + peer1Org2 = mocks.NewMockPeer("p21", peer1Org2URL) + peer2Org2 = mocks.NewMockPeer("p22", peer2Org2URL) + peer1Org3 = mocks.NewMockPeer("p31", peer1Org3URL) + peer2Org3 = mocks.NewMockPeer("p32", peer2Org3URL) + + peerConfigOrg1 = fab.NetworkPeer{ + PeerConfig: fab.PeerConfig{ + URL: peer1Org1URL, + }, + MSPID: mspID1, + } + peerConfigOrg2 = fab.NetworkPeer{ + PeerConfig: fab.PeerConfig{ + URL: peer1Org2URL, + }, + MSPID: mspID2, + } + channelPeers = []fab.ChannelPeer{ + { + NetworkPeer: peerConfigOrg1, + }, + { + NetworkPeer: peerConfigOrg2, + }, + } + + peer1Org1Endpoint = &discmocks.MockDiscoveryPeerEndpoint{ + MSPID: mspID1, + Endpoint: peer1Org1URL, + LedgerHeight: 1000, + } + peer2Org1Endpoint = &discmocks.MockDiscoveryPeerEndpoint{ + MSPID: mspID1, + Endpoint: peer2Org1URL, + LedgerHeight: 1002, + } + peer1Org2Endpoint = &discmocks.MockDiscoveryPeerEndpoint{ + MSPID: mspID2, + Endpoint: peer1Org2URL, + LedgerHeight: 1001, + } + peer2Org2Endpoint = &discmocks.MockDiscoveryPeerEndpoint{ + MSPID: mspID2, + Endpoint: peer2Org2URL, + LedgerHeight: 1003, + } + peer1Org3Endpoint = &discmocks.MockDiscoveryPeerEndpoint{ + MSPID: mspID3, + Endpoint: peer1Org3URL, + LedgerHeight: 1000, + } + peer2Org3Endpoint = &discmocks.MockDiscoveryPeerEndpoint{ + MSPID: mspID3, + Endpoint: peer2Org3URL, + LedgerHeight: 1003, + } + + cc1ChaincodeCall = &fab.ChaincodeCall{ + ID: cc1, + Collections: []string{cc1Col1, cc1Col2}, + } + cc2ChaincodeCall = &fab.ChaincodeCall{ + ID: cc2, + Collections: []string{cc2Col1}, + } +) + +func TestSelection(t *testing.T) { + ctx := mocks.NewMockContext(mspmocks.NewMockSigningIdentity("test", mspID1)) + config := &config{ + EndpointConfig: mocks.NewMockEndpointConfig(), + peers: channelPeers, + } + ctx.SetEndpointConfig(config) + + discClient := clientmocks.NewMockDiscoveryClient() + + clientProvider = func(ctx contextAPI.Client) (discoveryClient, error) { + return discClient, nil + } + + service, err := New( + ctx, channelID, + mocks.NewMockDiscoveryService(nil, peer1Org1, peer2Org1, peer1Org2, peer2Org2, peer1Org3, peer2Org3), + WithRefreshInterval(500*time.Millisecond), + WithResponseTimeout(2*time.Second), + ) + require.NoError(t, err) + defer service.Close() + + // Error condition + discClient.SetResponses( + &clientmocks.MockDiscoverEndpointResponse{ + PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{}, + Error: fmt.Errorf("simulated response error"), + }, + ) + endorsers, err := service.GetEndorsersForChaincode([]*fab.ChaincodeCall{{ID: cc1}}) + assert.Error(t, err) + fmt.Printf("err: %s\n", err) + assert.Equal(t, 0, len(endorsers)) + + discClient.SetResponses( + &clientmocks.MockDiscoverEndpointResponse{ + PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{ + peer2Org1Endpoint, peer2Org3Endpoint, peer2Org2Endpoint, + peer1Org1Endpoint, peer1Org2Endpoint, peer1Org3Endpoint, + }, + }, + ) + + // Wait for cache to refresh + time.Sleep(1 * time.Second) + + // Test multiple chaincodes + endorsers, err = service.GetEndorsersForChaincode([]*fab.ChaincodeCall{cc1ChaincodeCall, cc2ChaincodeCall}) + + assert.NoError(t, err) + assert.Equalf(t, 6, len(endorsers), "Expecting 6 endorser") + + // Test peer filter + endorsers, err = service.GetEndorsersForChaincode([]*fab.ChaincodeCall{{ID: cc1}}, + options.WithPeerFilter(func(peer fab.Peer) bool { + return peer.(PeerState).BlockHeight() > 1001 + }), + ) + + assert.NoError(t, err) + assert.Equalf(t, 3, len(endorsers), "Expecting 3 endorser") + + // Ensure the endorsers all have a block height > 1001 and they are returned in descending order of block height + lastBlockHeight := uint64(9999999) + for _, endorser := range endorsers { + blockHeight := endorser.(PeerState).BlockHeight() + assert.Truef(t, blockHeight > 1001, "Expecting block height to be > 1001") + assert.Truef(t, blockHeight <= lastBlockHeight, "Expecting endorsers to be returned in order of descending block height. Block Height: %d, Last Block Height: %d", blockHeight, lastBlockHeight) + lastBlockHeight = blockHeight + } + + // Test priority selector + endorsers, err = service.GetEndorsersForChaincode([]*fab.ChaincodeCall{{ID: cc1}}, + options.WithPrioritySelector(func(peer1, peer2 fab.Peer) int { + // Return peers in alphabetical order + if peer1.URL() < peer2.URL() { + return 1 + } + if peer1.URL() > peer2.URL() { + return -1 + } + return 0 + }), + ) + + assert.NoError(t, err) + assert.Equalf(t, 6, len(endorsers), "Expecting 6 endorser") + + var lastURL string + for _, endorser := range endorsers { + if lastURL != "" { + assert.Truef(t, endorser.URL() <= lastURL, "Expecting endorsers in alphabetical order") + } + lastURL = endorser.URL() + } +} + +func TestWithDiscoveryFilter(t *testing.T) { + ctx := mocks.NewMockContext(mspmocks.NewMockSigningIdentity("test", mspID1)) + config := &config{ + EndpointConfig: mocks.NewMockEndpointConfig(), + peers: channelPeers, + } + ctx.SetEndpointConfig(config) + + discClient := clientmocks.NewMockDiscoveryClient() + clientProvider = func(ctx contextAPI.Client) (discoveryClient, error) { + return discClient, nil + } + + discClient.SetResponses( + &clientmocks.MockDiscoverEndpointResponse{ + PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{ + peer2Org1Endpoint, peer2Org3Endpoint, peer2Org2Endpoint, + peer1Org1Endpoint, peer1Org2Endpoint, peer1Org3Endpoint, + }, + }, + ) + + // DiscoveryService error + expectedDiscoveryErrMsg := "simulated discovery service error" + service, err := New( + ctx, channelID, + mocks.NewMockDiscoveryService(fmt.Errorf(expectedDiscoveryErrMsg)), + WithRefreshInterval(500*time.Millisecond), + WithResponseTimeout(2*time.Second), + ) + require.NoError(t, err) + _, err = service.GetEndorsersForChaincode([]*fab.ChaincodeCall{cc1ChaincodeCall}) + assert.Truef(t, strings.Contains(err.Error(), expectedDiscoveryErrMsg), "expected error due to discovery error") + service.Close() + + // DiscoveryService - only 4 peers + service, err = New( + ctx, channelID, + mocks.NewMockDiscoveryService(nil, peer1Org1, peer2Org1, peer2Org2, peer2Org3), + WithRefreshInterval(500*time.Millisecond), + WithResponseTimeout(2*time.Second), + ) + require.NoError(t, err) + endorsers, err := service.GetEndorsersForChaincode([]*fab.ChaincodeCall{cc1ChaincodeCall}) + assert.NoError(t, err) + assert.Equalf(t, 4, len(endorsers), "Expecting 4 endorser") + + // With peer filter + endorsers, err = service.GetEndorsersForChaincode([]*fab.ChaincodeCall{cc1ChaincodeCall}, + options.WithPeerFilter(func(peer fab.Peer) bool { + return peer.(PeerState).BlockHeight() > 1001 + })) + assert.NoError(t, err) + assert.Equalf(t, 3, len(endorsers), "Expecting 3 endorser") + service.Close() +} + +type config struct { + fab.EndpointConfig + peers []fab.ChannelPeer +} + +func (c *config) ChannelPeers(name string) ([]fab.ChannelPeer, bool) { + if len(c.peers) == 0 { + return nil, false + } + return c.peers, true +} diff --git a/pkg/client/common/selection/fabricselection/selectionfilter.go b/pkg/client/common/selection/fabricselection/selectionfilter.go new file mode 100644 index 0000000000..e765bfdce7 --- /dev/null +++ b/pkg/client/common/selection/fabricselection/selectionfilter.go @@ -0,0 +1,107 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package fabricselection + +import ( + "context" + + discclient "github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/discovery/client" + "github.com/hyperledger/fabric-sdk-go/pkg/client/common/selection/options" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" +) + +type selectionFilter struct { + peers []fab.Peer + filter options.PeerFilter +} + +func newFilter(filter options.PeerFilter, peers []fab.Peer) *selectionFilter { + return &selectionFilter{ + peers: peers, + filter: filter, + } +} + +func (s *selectionFilter) Exclude(endpoint discclient.Peer) bool { + logger.Debugf("Calling peer filter on endpoint [%s]", endpoint.AliveMessage.GetAliveMsg().Membership.Endpoint) + + peer := asPeerValue(&endpoint) + + // The peer must be included in the set of peers returned from fab.DiscoveryService. + // (Note that DiscoveryService may return a filtered set of peers, depending on how the + // SDK was configured, so we need to exclude those peers from selection.) + if !containsPeer(s.peers, peer) { + logger.Debugf("Excluding peer [%s] since it isn't in the set of peers returned by the discovery service", peer.URL()) + return true + } + + // Apply the PeerFilter (if any) + if s.filter != nil && !s.filter(peer) { + logger.Debugf("Excluding peer [%s] since it was excluded by the peer filter", peer.URL()) + return true + } + + return false +} + +type prioritySelector struct { + selector options.PrioritySelector +} + +func newSelector(selector options.PrioritySelector) discclient.PrioritySelector { + if selector != nil { + return &prioritySelector{selector: selector} + } + return discclient.PrioritiesByHeight +} + +func (s *prioritySelector) Compare(endpoint1, endpoint2 discclient.Peer) discclient.Priority { + logger.Debugf("Calling priority selector on endpoint1 [%s] and endpoint2 [%s]", endpoint1.AliveMessage.GetAliveMsg().Membership.Endpoint, endpoint2.AliveMessage.GetAliveMsg().Membership.Endpoint) + return discclient.Priority(s.selector(asPeerValue(&endpoint1), asPeerValue(&endpoint2))) +} + +// asPeerValue converts the discovery endpoint into a light-weight peer value (i.e. without the GRPC config) +// so that it may used by a peer filter +func asPeerValue(endpoint *discclient.Peer) fab.Peer { + url := endpoint.AliveMessage.GetAliveMsg().Membership.Endpoint + return &peerEndpointValue{ + mspID: endpoint.MSPID, + url: url, + blockHeight: endpoint.StateInfoMessage.GetStateInfo().GetProperties().LedgerHeight, + } +} + +func containsPeer(peers []fab.Peer, peer fab.Peer) bool { + for _, p := range peers { + if p.URL() == peer.URL() { + return true + } + } + return false +} + +type peerEndpointValue struct { + mspID string + url string + blockHeight uint64 +} + +func (p *peerEndpointValue) MSPID() string { + return p.mspID +} + +func (p *peerEndpointValue) URL() string { + return p.url +} + +func (p *peerEndpointValue) BlockHeight() uint64 { + return p.blockHeight +} + +func (p *peerEndpointValue) ProcessTransactionProposal(context.Context, fab.ProcessProposalRequest) (*fab.TransactionProposalResponse, error) { + panic("not implemented") +} diff --git a/pkg/client/common/selection/options/opts.go b/pkg/client/common/selection/options/opts.go index 4683e72ab9..b36dd70fce 100755 --- a/pkg/client/common/selection/options/opts.go +++ b/pkg/client/common/selection/options/opts.go @@ -17,9 +17,17 @@ var logger = logging.NewLogger("fabsdk/client") // PeerFilter filters out unwanted peers type PeerFilter func(peer fab.Peer) bool +// PrioritySelector determines how likely a peer is to be +// selected over another peer. +// A positive return value means peer1 is selected; +// negative return value means the peer2 is selected; +// zero return value means their priorities are the same +type PrioritySelector func(peer1, peer2 fab.Peer) int + // Params defines the parameters of a selection service request type Params struct { - PeerFilter PeerFilter + PeerFilter PeerFilter + PrioritySelector PrioritySelector } // NewParams creates new parameters based on the provided options @@ -38,6 +46,16 @@ func WithPeerFilter(value PeerFilter) copts.Opt { } } +// WithPrioritySelector sets a priority selector function which provides per-request +// prioritization of peers +func WithPrioritySelector(value PrioritySelector) copts.Opt { + return func(p copts.Params) { + if setter, ok := p.(prioritySelectorSetter); ok { + setter.SetPrioritySelector(value) + } + } +} + type peerFilterSetter interface { SetPeerFilter(value PeerFilter) } @@ -47,3 +65,13 @@ func (p *Params) SetPeerFilter(value PeerFilter) { logger.Debugf("PeerFilter: %#+v", value) p.PeerFilter = value } + +type prioritySelectorSetter interface { + SetPrioritySelector(value PrioritySelector) +} + +// SetPrioritySelector sets the priority selector +func (p *Params) SetPrioritySelector(value PrioritySelector) { + logger.Debugf("PrioritySelector: %#+v", value) + p.PrioritySelector = value +} diff --git a/pkg/common/errors/retry/defaults.go b/pkg/common/errors/retry/defaults.go index d7ecae6bf5..aaa995b10c 100644 --- a/pkg/common/errors/retry/defaults.go +++ b/pkg/common/errors/retry/defaults.go @@ -123,6 +123,9 @@ var ResMgmtDefaultRetryableCodes = map[status.Group][]status.Code{ status.GRPCTransportStatus: { status.Code(grpcCodes.Unavailable), }, + status.DiscoveryServerStatus: { + status.QueryEndorsers, + }, } // ChannelClientRetryableCodes are the suggested codes that should be treated as diff --git a/pkg/common/errors/status/codes.go b/pkg/common/errors/status/codes.go index 54a11e8f8c..ac6c6165a5 100644 --- a/pkg/common/errors/status/codes.go +++ b/pkg/common/errors/status/codes.go @@ -48,6 +48,10 @@ const ( // MissingEndorsement is if an endoresement is missing MissingEndorsement Code = 9 + // QueryEndorsers error indicates that no endorser group was found that would + // satisfy the chaincode policy + QueryEndorsers Code = 11 + // PrematureChaincodeExecution indicates that an attempt was made to invoke a chaincode that's // in the process of being launched. PrematureChaincodeExecution Code = 21 @@ -69,6 +73,7 @@ var CodeName = map[int32]string{ 8: "SIGNATURE_VERIFICATION_FAILED", 9: "MISSING_ENDORSEMENT", 10: "CHAINCODE_ERROR", + 11: "QUERY_ENDORSERS", 21: "NO_MATCHING_CERTIFICATE_AUTHORITY_ENTITY", 22: "NO_MATCHING_PEER_ENTITY", 23: "NO_MATCHING_ORDERER_ENTITY", diff --git a/pkg/common/errors/status/status.go b/pkg/common/errors/status/status.go index 32877e0589..144acc0d85 100644 --- a/pkg/common/errors/status/status.go +++ b/pkg/common/errors/status/status.go @@ -78,6 +78,9 @@ const ( // ChaincodeStatus defines the status codes returned by chaincode ChaincodeStatus + + // DiscoveryServerStatus status returned by the Discovery Server + DiscoveryServerStatus ) // GroupName maps the groups in this packages to human-readable strings @@ -93,6 +96,7 @@ var GroupName = map[int32]string{ 8: "Orderer Client Status", 9: "Client Status", 10: "Chaincode status", + 11: "Discovery status", } func (g Group) String() string { diff --git a/test/integration/sdk/sdk_provider_test.go b/test/integration/sdk/sdk_provider_test.go index 8008613dcb..51c5aca4ac 100644 --- a/test/integration/sdk/sdk_provider_test.go +++ b/test/integration/sdk/sdk_provider_test.go @@ -9,6 +9,7 @@ package sdk import ( "strconv" "testing" + "time" "github.com/hyperledger/fabric-sdk-go/test/integration" @@ -69,18 +70,24 @@ func TestDynamicSelection(t *testing.T) { t.Fatalf("Failed to move funds: %s", err) } - // Verify move funds transaction result - response, err = chClient.Query(channel.Request{ChaincodeID: chainCodeID, Fcn: "invoke", Args: integration.ExampleCCQueryArgs()}) - if err != nil { - t.Fatalf("Failed to query funds after transaction: %s", err) - } - valueInt, _ := strconv.Atoi(string(value)) - valueAfterInvokeInt, _ := strconv.Atoi(string(response.Payload)) - if valueInt+1 != valueAfterInvokeInt { - t.Fatalf("Execute failed. Before: %s, after: %s", value, response.Payload) - } + success := false + for i := 0; i < 5; i++ { + // Verify move funds transaction result + response, err = chClient.Query(channel.Request{ChaincodeID: chainCodeID, Fcn: "invoke", Args: integration.ExampleCCQueryArgs()}) + if err != nil { + t.Fatalf("Failed to query funds after transaction: %s", err) + } + valueAfterInvokeInt, _ := strconv.Atoi(string(response.Payload)) + if valueInt+1 == valueAfterInvokeInt { + success = true + break + } + t.Logf("Execute failed. Before: %s, after: %s", value, response.Payload) + time.Sleep(2 * time.Second) + } + require.Truef(t, success, "Execute failed. Value was not updated") } // DynamicSelectionProviderFactory is configured with dynamic (endorser) selection provider