diff --git a/pkg/client/common/discovery/dynamicdiscovery/chservice.go b/pkg/client/common/discovery/dynamicdiscovery/chservice.go index 0aa684a041..4de6afb244 100644 --- a/pkg/client/common/discovery/dynamicdiscovery/chservice.go +++ b/pkg/client/common/discovery/dynamicdiscovery/chservice.go @@ -17,10 +17,6 @@ import ( "github.com/pkg/errors" ) -const ( - accessDenied = "access denied" -) - // ChannelService implements a dynamic Discovery Service that queries // Fabric's Discovery service for information about the peers that // are currently joined to the given channel. @@ -53,6 +49,17 @@ func (s *ChannelService) Close() { } func (s *ChannelService) queryPeers() ([]fab.Peer, error) { + peers, err := s.doQueryPeers() + + if err != nil && s.ErrHandler != nil { + logger.Infof("[%s] Got error from discovery query: %s. Invoking error handler", s.channelID, err) + s.ErrHandler(s.ctx, s.channelID, err) + } + + return peers, err +} + +func (s *ChannelService) doQueryPeers() ([]fab.Peer, error) { logger.Debugf("Refreshing peers of channel [%s] from discovery service...", s.channelID) ctx := s.context() @@ -105,7 +112,7 @@ func (s *ChannelService) evaluate(ctx contextAPI.Client, responses []fabdiscover for _, response := range responses { endpoints, err := response.ForChannel(s.channelID).Peers() if err != nil { - lastErr = newDiscoveryError(err) + lastErr = DiscoveryError(err) logger.Warnf("error getting peers from discovery response: %s", lastErr) continue } @@ -141,15 +148,3 @@ type peerEndpoint struct { func (p *peerEndpoint) BlockHeight() uint64 { return p.blockHeight } - -type discoveryError struct { - error -} - -func newDiscoveryError(cause error) *discoveryError { - return &discoveryError{error: cause} -} - -func (e *discoveryError) IsFatal() bool { - return e.Error() == accessDenied -} diff --git a/pkg/client/common/discovery/dynamicdiscovery/chservice_test.go b/pkg/client/common/discovery/dynamicdiscovery/chservice_test.go index a312cb0221..88014861f2 100644 --- a/pkg/client/common/discovery/dynamicdiscovery/chservice_test.go +++ b/pkg/client/common/discovery/dynamicdiscovery/chservice_test.go @@ -16,6 +16,7 @@ import ( "github.com/hyperledger/fabric-sdk-go/pkg/client/common/discovery" clientmocks "github.com/hyperledger/fabric-sdk-go/pkg/client/common/mocks" contextAPI "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" pfab "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" discmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery/mocks" "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks" @@ -64,10 +65,19 @@ func TestDiscoveryService(t *testing.T) { return discClient, nil }) + var service *ChannelService service, err := NewChannelService( ctx, mocks.NewMockMembership(), ch, WithRefreshInterval(10*time.Millisecond), WithResponseTimeout(100*time.Millisecond), + WithErrorHandler( + func(ctxt fab.ClientContext, channelID string, err error) { + derr, ok := err.(DiscoveryError) + if ok && derr.Error() == AccessDenied { + service.Close() + } + }, + ), ) require.NoError(t, err) defer service.Close() @@ -139,7 +149,7 @@ func TestDiscoveryService(t *testing.T) { // Fatal error (access denied can be due due a user being revoked) discClient.SetResponses( &clientmocks.MockDiscoverEndpointResponse{ - Error: errors.New(accessDenied), + Error: errors.New(AccessDenied), }, ) @@ -148,7 +158,7 @@ func TestDiscoveryService(t *testing.T) { // The discovery service should have been closed _, err = service.GetPeers() require.Error(t, err) - assert.Equal(t, "Discovery client has been closed due to error: access denied", err.Error()) + assert.Equal(t, "Discovery client has been closed", err.Error()) } func TestDiscoveryServiceWithNewOrgJoined(t *testing.T) { diff --git a/pkg/client/common/discovery/dynamicdiscovery/localservice.go b/pkg/client/common/discovery/dynamicdiscovery/localservice.go index 5a10c4b4a7..8eef479179 100644 --- a/pkg/client/common/discovery/dynamicdiscovery/localservice.go +++ b/pkg/client/common/discovery/dynamicdiscovery/localservice.go @@ -50,6 +50,17 @@ func (s *LocalService) localContext() contextAPI.Local { } func (s *LocalService) queryPeers() ([]fab.Peer, error) { + peers, err := s.doQueryPeers() + + if err != nil && s.ErrHandler != nil { + logger.Debugf("Got error from discovery query: %s. Invoking error handler", err) + s.ErrHandler(s.ctx, "", err) + } + + return peers, err +} + +func (s *LocalService) doQueryPeers() ([]fab.Peer, error) { logger.Debug("Refreshing local peers from discovery service...") ctx := s.localContext() @@ -77,7 +88,7 @@ func (s *LocalService) queryPeers() ([]fab.Peer, error) { response := responses[0] endpoints, err := response.ForLocal().Peers() if err != nil { - return nil, errors.Wrap(err, "error getting peers from discovery response") + return nil, DiscoveryError(err) } return s.filterLocalMSP(asPeers(ctx, endpoints)), nil diff --git a/pkg/client/common/discovery/dynamicdiscovery/localservice_test.go b/pkg/client/common/discovery/dynamicdiscovery/localservice_test.go index 771b97c86e..d71d84dfd2 100644 --- a/pkg/client/common/discovery/dynamicdiscovery/localservice_test.go +++ b/pkg/client/common/discovery/dynamicdiscovery/localservice_test.go @@ -9,16 +9,19 @@ SPDX-License-Identifier: Apache-2.0 package dynamicdiscovery import ( + "errors" "testing" "time" clientmocks "github.com/hyperledger/fabric-sdk-go/pkg/client/common/mocks" contextAPI "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" pfab "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" discmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery/mocks" "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks" mspmocks "github.com/hyperledger/fabric-sdk-go/pkg/msp/test/mockmsp" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) const ( @@ -58,8 +61,16 @@ func TestLocalDiscoveryService(t *testing.T) { service = newLocalService( config, mspID1, - WithRefreshInterval(500*time.Millisecond), + WithRefreshInterval(3*time.Millisecond), WithResponseTimeout(2*time.Second), + WithErrorHandler( + func(ctxt fab.ClientContext, channelID string, err error) { + derr, ok := err.(DiscoveryError) + if ok && derr.Error() == AccessDenied { + service.Close() + } + }, + ), ) defer service.Close() @@ -122,4 +133,19 @@ func TestLocalDiscoveryService(t *testing.T) { for _, p := range peers { assert.Equalf(t, mspID1, p.MSPID(), "Expecting peer to be in MSP [%s]", mspID1) } + + // Fatal error (access denied can be due due a user being revoked) + discClient.SetResponses( + &clientmocks.MockDiscoverEndpointResponse{ + Error: errors.New(AccessDenied), + }, + ) + + // Wait for the cache to refresh + time.Sleep(10 * time.Millisecond) + + // The discovery service should have been closed + _, err = service.GetPeers() + require.Error(t, err) + assert.Equal(t, "Discovery client has been closed", err.Error()) } diff --git a/pkg/client/common/discovery/dynamicdiscovery/opts.go b/pkg/client/common/discovery/dynamicdiscovery/opts.go index 983a3fb01e..37201f4ac8 100755 --- a/pkg/client/common/discovery/dynamicdiscovery/opts.go +++ b/pkg/client/common/discovery/dynamicdiscovery/opts.go @@ -10,11 +10,13 @@ import ( "time" coptions "github.com/hyperledger/fabric-sdk-go/pkg/common/options" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" ) type options struct { refreshInterval time.Duration responseTimeout time.Duration + errHandler fab.ErrorHandler } // WithRefreshInterval sets the interval in which the @@ -38,6 +40,17 @@ func WithResponseTimeout(value time.Duration) coptions.Opt { } } +// WithErrorHandler sets the error handler +func WithErrorHandler(value fab.ErrorHandler) coptions.Opt { + return func(p coptions.Params) { + logger.Debugf("Checking errHandlerSetter") + if setter, ok := p.(errHandlerSetter); ok { + logger.Debugf("... setting error handler") + setter.SetErrorHandler(value) + } + } +} + type refreshIntervalSetter interface { SetDiscoveryRefreshInterval(value time.Duration) } @@ -46,6 +59,10 @@ type responseTimeoutSetter interface { SetDiscoveryResponseTimeout(value time.Duration) } +type errHandlerSetter interface { + SetErrorHandler(value fab.ErrorHandler) +} + func (o *options) SetDiscoveryRefreshInterval(value time.Duration) { logger.Debugf("RefreshInterval: %s", value) o.refreshInterval = value @@ -55,3 +72,8 @@ func (o *options) SetDiscoveryResponseTimeout(value time.Duration) { logger.Debugf("ResponseTimeout: %s", value) o.responseTimeout = value } + +func (o *options) SetErrorHandler(value fab.ErrorHandler) { + logger.Debugf("ErrorHandler: %+v", value) + o.errHandler = value +} diff --git a/pkg/client/common/discovery/dynamicdiscovery/service.go b/pkg/client/common/discovery/dynamicdiscovery/service.go index 431e9d4973..c95911c4fb 100644 --- a/pkg/client/common/discovery/dynamicdiscovery/service.go +++ b/pkg/client/common/discovery/dynamicdiscovery/service.go @@ -25,6 +25,14 @@ type DiscoveryClient interface { Send(ctx context.Context, req *discclient.Request, targets ...fab.PeerConfig) ([]fabdiscovery.Response, error) } +const ( + // AccessDenied indicates that the user does not have permission to perform the operation + AccessDenied = "access denied" +) + +// DiscoveryError is an error originating at the Discovery service +type DiscoveryError error + // clientProvider is overridden by unit tests var clientProvider = func(ctx contextAPI.Client) (DiscoveryClient, error) { return fabdiscovery.New(ctx) @@ -39,7 +47,7 @@ type service struct { ctx contextAPI.Client discClient DiscoveryClient peersRef *lazyref.Reference - lastErr error + ErrHandler fab.ErrorHandler } type queryPeers func() ([]fab.Peer, error) @@ -59,25 +67,16 @@ func newService(config fab.EndpointConfig, query queryPeers, opts ...coptions.Op logger.Debugf("Cache refresh interval: %s", options.refreshInterval) logger.Debugf("Deliver service response timeout: %s", options.responseTimeout) - s := &service{ + return &service{ responseTimeout: options.responseTimeout, + ErrHandler: options.errHandler, + peersRef: lazyref.New( + func() (interface{}, error) { + return query() + }, + lazyref.WithRefreshInterval(lazyref.InitOnFirstAccess, options.refreshInterval), + ), } - - s.peersRef = lazyref.New( - func() (interface{}, error) { - peers, err := query() - if err != nil { - derr, ok := err.(*discoveryError) - if ok && derr.IsFatal() { - go s.close(err) - } - } - return peers, err - }, - lazyref.WithRefreshInterval(lazyref.InitOnFirstAccess, options.refreshInterval), - ) - - return s } // initialize initializes the service with client context @@ -108,27 +107,9 @@ func (s *service) Close() { s.peersRef.Close() } -func (s *service) close(err error) { - logger.Warnf("Got fatal error [%s]. Closing discovery client.", err) - s.lock.Lock() - defer s.lock.Unlock() - s.lastErr = err - s.peersRef.Close() -} - -func (s *service) getLastError() error { - s.lock.RLock() - defer s.lock.RUnlock() - return s.lastErr -} - // GetPeers returns the available peers func (s *service) GetPeers() ([]fab.Peer, error) { if s.peersRef.IsClosed() { - lastErr := s.getLastError() - if lastErr != nil { - return nil, errors.Errorf("Discovery client has been closed due to error: %s", lastErr) - } return nil, errors.Errorf("Discovery client has been closed") } diff --git a/pkg/client/common/mocks/mockdiscoveryclient.go b/pkg/client/common/mocks/mockdiscoveryclient.go index ec9f4d4bcb..f4daaf6cb0 100644 --- a/pkg/client/common/mocks/mockdiscoveryclient.go +++ b/pkg/client/common/mocks/mockdiscoveryclient.go @@ -81,6 +81,7 @@ func (r *response) ForChannel(string) discclient.ChannelResponse { func (r *response) ForLocal() discclient.LocalResponse { return &localResponse{ peers: r.peers, + err: r.err, } } @@ -109,11 +110,12 @@ func (cr *channelResponse) Endorsers(invocationChain discclient.InvocationChain, type localResponse struct { peers []*discclient.Peer + err error } // Peers returns a response for a peer membership query, or error if something went wrong func (cr *localResponse) Peers() ([]*discclient.Peer, error) { - return cr.peers, nil + return cr.peers, cr.err } // MockDiscoverEndpointResponse contains a mock response for the discover client diff --git a/pkg/client/common/selection/fabricselection/fabricselection.go b/pkg/client/common/selection/fabricselection/fabricselection.go index b776c02ff4..0f75e159fe 100644 --- a/pkg/client/common/selection/fabricselection/fabricselection.go +++ b/pkg/client/common/selection/fabricselection/fabricselection.go @@ -11,7 +11,6 @@ import ( "encoding/json" "fmt" "strings" - "sync" "time" discclient "github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/discovery/client" @@ -34,8 +33,10 @@ import ( ) const ( - moduleName = "fabsdk/client" - accessDenied = "access denied" + moduleName = "fabsdk/client" + + // AccessDenied indicates that the user does not have permission to perform the operation + AccessDenied = "access denied" ) var logger = logging.NewLogger(moduleName) @@ -77,8 +78,7 @@ type Service struct { discClient DiscoveryClient chResponseCache *lazycache.Cache retryOpts retry.Opts - lastErr error - lock sync.RWMutex + errHandler fab.ErrorHandler } // New creates a new dynamic selection service using Fabric's Discovery Service @@ -110,6 +110,7 @@ func New(ctx contextAPI.Client, channelID string, discovery fab.DiscoveryService discovery: discovery, discClient: discoveryClient, retryOpts: options.retryOpts, + errHandler: options.errHandler, } s.chResponseCache = lazycache.NewWithData( @@ -131,11 +132,9 @@ func New(ctx contextAPI.Client, channelID string, discovery fab.DiscoveryService } endorsers, err := s.queryEndorsers(invocationChain, ropts) - if err != nil { - derr, ok := err.(discoveryError) - if ok && derr.isFatal() { - go s.close(err) - } + if err != nil && s.errHandler != nil { + logger.Debugf("[%s] Got error from discovery query: %s. Invoking error handler", s.channelID, err) + s.errHandler(s.ctx, s.channelID, err) } return endorsers, err }, @@ -148,10 +147,6 @@ func New(ctx contextAPI.Client, channelID string, discovery fab.DiscoveryService // GetEndorsersForChaincode returns the endorsing peers for the given chaincodes func (s *Service) GetEndorsersForChaincode(chaincodes []*fab.ChaincodeCall, opts ...coptions.Opt) ([]fab.Peer, error) { if s.chResponseCache.IsClosed() { - lastErr := s.getLastError() - if lastErr != nil { - return nil, errors.Errorf("Selection service has been closed due to error: %s", lastErr) - } return nil, errors.Errorf("Selection service has been closed") } @@ -189,20 +184,6 @@ func (s *Service) Close() { s.chResponseCache.Close() } -func (s *Service) close(err error) { - logger.Warnf("Got fatal error [%s]. Closing selection service.", err) - s.lock.Lock() - defer s.lock.Unlock() - s.lastErr = err - s.chResponseCache.Close() -} - -func (s *Service) getLastError() error { - s.lock.RLock() - defer s.lock.RUnlock() - return s.lastErr -} - func (s *Service) getEndorsers(chaincodes []*fab.ChaincodeCall, chResponse discclient.ChannelResponse, peerFilter soptions.PeerFilter, sorter soptions.PeerSorter) (discclient.Endorsers, error) { peers, err := s.discovery.GetPeers() if err != nil { @@ -278,7 +259,7 @@ func (s *Service) query(req *discclient.Request, chaincodes []*fab.ChaincodeCall invocChain := asInvocationChain(chaincodes) - var discErrs []discoveryError + var discErrs []DiscoveryError for _, response := range responses { logger.Debugf("Checking response from [%s]...", response.Target()) chResp := response.ForChannel(s.channelID) @@ -377,21 +358,19 @@ func (p *peerEndpoint) BlockHeight() uint64 { return p.blockHeight } -type discoveryError string +// DiscoveryError is an error originating at the Discovery service +type DiscoveryError string -func newDiscoveryError(err error) discoveryError { - return discoveryError(err.Error()) +func newDiscoveryError(err error) DiscoveryError { + return DiscoveryError(err.Error()) } -func (e discoveryError) Error() string { +// Error returns the error message +func (e DiscoveryError) Error() string { return string(e) } -func (e discoveryError) isTransient() bool { +func (e DiscoveryError) isTransient() bool { return strings.Contains(e.Error(), "failed constructing descriptor for chaincodes") || strings.Contains(e.Error(), "no endorsement combination can be satisfied") } - -func (e discoveryError) isFatal() bool { - return e == accessDenied -} diff --git a/pkg/client/common/selection/fabricselection/opts.go b/pkg/client/common/selection/fabricselection/opts.go index 9b01880e4f..4239a22b8f 100755 --- a/pkg/client/common/selection/fabricselection/opts.go +++ b/pkg/client/common/selection/fabricselection/opts.go @@ -11,12 +11,14 @@ import ( "github.com/hyperledger/fabric-sdk-go/pkg/common/errors/retry" coptions "github.com/hyperledger/fabric-sdk-go/pkg/common/options" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" ) type params struct { refreshInterval time.Duration responseTimeout time.Duration retryOpts retry.Opts + errHandler fab.ErrorHandler } // WithRefreshInterval sets the interval in which the @@ -51,6 +53,16 @@ func WithRetryOpts(value retry.Opts) coptions.Opt { } } +// WithErrorHandler sets the error handler +func WithErrorHandler(value fab.ErrorHandler) coptions.Opt { + return func(p coptions.Params) { + logger.Debug("Checking errHandlerSetter") + if setter, ok := p.(errHandlerSetter); ok { + setter.SetErrorHandler(value) + } + } +} + type refreshIntervalSetter interface { SetSelectionRefreshInterval(value time.Duration) } @@ -63,6 +75,10 @@ type retryOptsSetter interface { SetSelectionRetryOpts(value retry.Opts) } +type errHandlerSetter interface { + SetErrorHandler(value fab.ErrorHandler) +} + func (o *params) SetSelectionRefreshInterval(value time.Duration) { logger.Debugf("RefreshInterval: %s", value) o.refreshInterval = value @@ -77,3 +93,8 @@ func (o *params) SetSelectionRetryOpts(value retry.Opts) { logger.Debugf("RetryOpts: %#v", value) o.retryOpts = value } + +func (o *params) SetErrorHandler(value fab.ErrorHandler) { + logger.Debugf("ErrorHandler: %+v", value) + o.errHandler = value +} diff --git a/pkg/client/common/selection/fabricselection/selection_test.go b/pkg/client/common/selection/fabricselection/selection_test.go index 741bb4ff26..2b0562531e 100644 --- a/pkg/client/common/selection/fabricselection/selection_test.go +++ b/pkg/client/common/selection/fabricselection/selection_test.go @@ -120,11 +120,21 @@ func TestSelection(t *testing.T) { return discClient, nil }) + var service *Service + + errHandler := func(ctxt fab.ClientContext, channelID string, err error) { + derr, ok := err.(DiscoveryError) + if ok && derr.Error() == AccessDenied { + service.Close() + } + } + service, err := New( ctx, channelID, mocks.NewMockDiscoveryService(nil, peer1Org1, peer2Org1, peer1Org2, peer2Org2, peer1Org3, peer2Org3), - WithRefreshInterval(10*time.Millisecond), + WithRefreshInterval(5*time.Millisecond), WithResponseTimeout(100*time.Millisecond), + WithErrorHandler(errHandler), ) require.NoError(t, err) defer service.Close() @@ -175,12 +185,12 @@ func TestSelection(t *testing.T) { discClient.SetResponses( &clientmocks.MockDiscoverEndpointResponse{ PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{}, - Error: fmt.Errorf(accessDenied), + Error: fmt.Errorf(AccessDenied), }, ) // Wait for cache to refresh time.Sleep(20 * time.Millisecond) - testSelectionError(t, service, "Selection service has been closed due to error: access denied") + testSelectionError(t, service, "Selection service has been closed") }) } @@ -257,7 +267,7 @@ func TestWithDiscoveryFilter(t *testing.T) { func testSelectionError(t *testing.T, service *Service, expectedErrMsg string) { endorsers, err := service.GetEndorsersForChaincode([]*fab.ChaincodeCall{{ID: cc1}}) - assert.Error(t, err) + require.Error(t, err) assert.Equal(t, expectedErrMsg, err.Error()) assert.Equal(t, 0, len(endorsers)) } diff --git a/pkg/common/providers/fab/context.go b/pkg/common/providers/fab/context.go index 6390af2681..0f87fda5e7 100644 --- a/pkg/common/providers/fab/context.go +++ b/pkg/common/providers/fab/context.go @@ -33,3 +33,6 @@ type Transactor interface { type ChannelProvider interface { ChannelService(ctx ClientContext, channelID string) (ChannelService, error) } + +// ErrorHandler is invoked when an error occurs in one of the services +type ErrorHandler func(ctxt ClientContext, channelID string, err error) diff --git a/pkg/fab/chconfig/cache_test.go b/pkg/fab/chconfig/cache_test.go index 7c3d3d14ce..6f7c31b563 100644 --- a/pkg/fab/chconfig/cache_test.go +++ b/pkg/fab/chconfig/cache_test.go @@ -8,6 +8,8 @@ package chconfig import ( "fmt" + "sync" + "sync/atomic" "testing" "time" @@ -67,6 +69,48 @@ func TestChannelConfigCacheBad(t *testing.T) { assert.Contains(t, err.Error(), badProviderErrMessage) } +func TestErrorHandler(t *testing.T) { + user := mspmocks.NewMockSigningIdentity("user", "user") + clientCtx := mocks.NewMockContext(user) + + var numErrors int32 + cache := NewRefCache( + WithRefreshInterval(time.Millisecond*10), + WithErrorHandler(func(ctxt fab.ClientContext, channelID string, err error) { + if _, ok := err.(ChannelConfigError); ok { + atomic.AddInt32(&numErrors, 1) + } + }), + ) + assert.NotNil(t, cache) + + var mutex sync.RWMutex + var providerErr error + + provider := func(channelID string) (fab.ChannelConfig, error) { + chConfig, _ := mocks.NewMockChannelConfig(nil, channelID) + mutex.RLock() + defer mutex.RUnlock() + return chConfig, providerErr + } + + // Should succeed + key, err := NewCacheKey(clientCtx, provider, "test") + assert.Nil(t, err) + assert.NotNil(t, key) + + r, err := cache.Get(key) + assert.Nil(t, err) + assert.NotNil(t, r) + + mutex.Lock() + providerErr = fmt.Errorf(badProviderErrMessage) + mutex.Unlock() + + time.Sleep(50 * time.Millisecond) + assert.Truef(t, atomic.LoadInt32(&numErrors) > 0, "Error handler should have received at least one error") +} + type badKey struct { s string } diff --git a/pkg/fab/chconfig/opts.go b/pkg/fab/chconfig/opts.go index d0a842c5e2..dcb8025900 100755 --- a/pkg/fab/chconfig/opts.go +++ b/pkg/fab/chconfig/opts.go @@ -10,6 +10,7 @@ import ( "time" coptions "github.com/hyperledger/fabric-sdk-go/pkg/common/options" + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" ) const ( @@ -18,6 +19,7 @@ const ( type params struct { refreshInterval time.Duration + errHandler fab.ErrorHandler } func newDefaultParams() *params { @@ -36,11 +38,30 @@ func WithRefreshInterval(value time.Duration) coptions.Opt { } } +// WithErrorHandler sets the error handler +func WithErrorHandler(value fab.ErrorHandler) coptions.Opt { + return func(p coptions.Params) { + logger.Debug("Checking errHandlerSetter") + if setter, ok := p.(errHandlerSetter); ok { + setter.SetErrorHandler(value) + } + } +} + type refreshIntervalSetter interface { SetChConfigRefreshInterval(value time.Duration) } +type errHandlerSetter interface { + SetErrorHandler(value fab.ErrorHandler) +} + func (o *params) SetChConfigRefreshInterval(value time.Duration) { logger.Debugf("RefreshInterval: %s", value) o.refreshInterval = value } + +func (o *params) SetErrorHandler(value fab.ErrorHandler) { + logger.Debugf("ErrorHandler: %+v", value) + o.errHandler = value +} diff --git a/pkg/fab/chconfig/reference.go b/pkg/fab/chconfig/reference.go index 89950dc0c9..e10b0fa819 100644 --- a/pkg/fab/chconfig/reference.go +++ b/pkg/fab/chconfig/reference.go @@ -17,9 +17,10 @@ import ( // Ref channel configuration lazy reference type Ref struct { *lazyref.Reference - pvdr Provider - ctx fab.ClientContext - channelID string + pvdr Provider + ctx fab.ClientContext + channelID string + errHandler fab.ErrorHandler } // ChannelConfigError is returned when the channel config could not be refreshed @@ -31,9 +32,10 @@ func NewRef(ctx fab.ClientContext, pvdr Provider, channel string, opts ...option options.Apply(params, opts) cfgRef := &Ref{ - pvdr: pvdr, - ctx: ctx, - channelID: channel, + pvdr: pvdr, + ctx: ctx, + channelID: channel, + errHandler: params.errHandler, } cfgRef.Reference = lazyref.New( @@ -46,19 +48,28 @@ func NewRef(ctx fab.ClientContext, pvdr Provider, channel string, opts ...option func (ref *Ref) initializer() lazyref.Initializer { return func() (interface{}, error) { - chConfigProvider, err := ref.pvdr(ref.channelID) - if err != nil { - return nil, errors.WithMessage(err, "error creating channel config provider") + chConfig, err := ref.getConfig() + if err != nil && ref.errHandler != nil { + logger.Debugf("[%s] An error occurred while retrieving channel config. Invoking error handler.", ref.channelID) + ref.errHandler(ref.ctx, ref.channelID, ChannelConfigError(err)) } + return chConfig, err + } +} - reqCtx, cancel := contextImpl.NewRequest(ref.ctx, contextImpl.WithTimeoutType(fab.PeerResponse)) - defer cancel() +func (ref *Ref) getConfig() (fab.ChannelCfg, error) { + chConfigProvider, err := ref.pvdr(ref.channelID) + if err != nil { + return nil, errors.WithMessage(err, "error creating channel config provider") + } - chConfig, err := chConfigProvider.Query(reqCtx) - if err != nil { - return nil, err - } + reqCtx, cancel := contextImpl.NewRequest(ref.ctx, contextImpl.WithTimeoutType(fab.PeerResponse)) + defer cancel() - return chConfig, nil + chConfig, err := chConfigProvider.Query(reqCtx) + if err != nil { + return nil, err } + + return chConfig, nil } diff --git a/pkg/fab/discovery/discovery.go b/pkg/fab/discovery/discovery.go index 5a26612748..b287776c78 100644 --- a/pkg/fab/discovery/discovery.go +++ b/pkg/fab/discovery/discovery.go @@ -13,6 +13,7 @@ import ( discclient "github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/discovery/client" "github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/protos/discovery" "github.com/hyperledger/fabric-sdk-go/pkg/common/errors/multi" + "github.com/hyperledger/fabric-sdk-go/pkg/common/logging" fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" corecomm "github.com/hyperledger/fabric-sdk-go/pkg/core/config/comm" @@ -21,6 +22,8 @@ import ( "google.golang.org/grpc" ) +var logger = logging.NewLogger("fabsdk/fab") + const ( signerCacheSize = 10 // TODO: set an appropriate value (and perhaps make configurable) ) @@ -67,19 +70,21 @@ func (c *Client) Send(ctx context.Context, req *discclient.Request, targets ...f var responses []Response var errs error - for _, target := range targets { - pconfig := target - go func() { + for _, t := range targets { + go func(target fab.PeerConfig) { defer wg.Done() - resp, err := c.send(ctx, req, pconfig) + + resp, err := c.send(ctx, req, target) lock.Lock() if err != nil { - errs = multi.Append(errs, errors.WithMessage(err, "From target: "+pconfig.URL)) + errs = multi.Append(errs, errors.WithMessage(err, "From target: "+target.URL)) + logger.Debugf("... got discovery error response from [%s]: %s", target.URL, err) } else { - responses = append(responses, &response{Response: resp, target: pconfig.URL}) + responses = append(responses, &response{Response: resp, target: target.URL}) + logger.Debugf("... got discovery response from [%s]", target.URL) } lock.Unlock() - }() + }(t) } wg.Wait() diff --git a/pkg/fabsdk/fabsdk.go b/pkg/fabsdk/fabsdk.go index 99127bd078..fcdcf44a28 100644 --- a/pkg/fabsdk/fabsdk.go +++ b/pkg/fabsdk/fabsdk.go @@ -215,6 +215,13 @@ func WithProviderOpts(sopts ...coptions.Opt) Option { } } +// WithErrorHandler sets an error handler that will be invoked when a service error is experienced. +// This allows the client to take a decision of whether to ignore the error, shut down the client context, +// or shut down the entire SDK. +func WithErrorHandler(value fab.ErrorHandler) Option { + return WithProviderOpts(withErrorHandlerProviderOpt(value)) +} + // providerInit interface allows for initializing providers // TODO: minimize interface type providerInit interface { @@ -547,3 +554,16 @@ func (sdk *FabricSDK) loadMetricsConfig(configBackend ...core.ConfigBackend) (me return metricsConfigOpt, nil } + +func withErrorHandlerProviderOpt(value fab.ErrorHandler) coptions.Opt { + return func(p coptions.Params) { + if setter, ok := p.(errHandlerSetter); ok { + logger.Debugf("... setting error handler") + setter.SetErrorHandler(value) + } + } +} + +type errHandlerSetter interface { + SetErrorHandler(value fab.ErrorHandler) +} diff --git a/pkg/fabsdk/fabsdk_test.go b/pkg/fabsdk/fabsdk_test.go index 2f13f44c63..e9e67f1ea6 100644 --- a/pkg/fabsdk/fabsdk_test.go +++ b/pkg/fabsdk/fabsdk_test.go @@ -11,15 +11,19 @@ package fabsdk import ( "os" "reflect" + "sync" "testing" "time" "github.com/golang/mock/gomock" "github.com/hyperledger/fabric-sdk-go/pkg/client/common/discovery/dynamicdiscovery" clientmocks "github.com/hyperledger/fabric-sdk-go/pkg/client/common/mocks" + "github.com/hyperledger/fabric-sdk-go/pkg/client/common/selection/fabricselection" "github.com/hyperledger/fabric-sdk-go/pkg/client/resmgmt" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" + context2 "github.com/hyperledger/fabric-sdk-go/pkg/context" + contextImpl "github.com/hyperledger/fabric-sdk-go/pkg/context" configImpl "github.com/hyperledger/fabric-sdk-go/pkg/core/config" discmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery/mocks" "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks" @@ -412,6 +416,181 @@ func TestCloseContext(t *testing.T) { assert.NoError(t, err) } +func TestErrorHandler(t *testing.T) { + c := configImpl.FromFile(sdkConfigFile) + + core, err := newMockCorePkg(c) + require.NoError(t, err) + + discClient := clientmocks.NewMockDiscoveryClient() + dynamicdiscovery.SetClientProvider(func(ctx context.Client) (dynamicdiscovery.DiscoveryClient, error) { + return discClient, nil + }) + fabricselection.SetClientProvider(func(ctx context.Client) (fabricselection.DiscoveryClient, error) { + return discClient, nil + }) + + var sdk *FabricSDK + var chService fab.ChannelService + var localCtxt context.Local + var mutex sync.RWMutex + + newContext := func(user, org string) { + getClientCtxt := sdk.Context(WithUser(user), WithOrg(org)) + require.NotNil(t, getClientCtxt) + + chCtxt, err := contextImpl.NewChannel(getClientCtxt, "orgchannel") + require.NoError(t, err) + require.NotNil(t, chCtxt) + + s := chCtxt.ChannelService() + require.NotNil(t, s) + + lc, err := context2.NewLocal(getClientCtxt) + require.NoError(t, err) + + mutex.Lock() + defer mutex.Unlock() + + chService = s + localCtxt = lc + } + + getChannelService := func() fab.ChannelService { + mutex.Lock() + defer mutex.Unlock() + return chService + } + + getLocalCtxt := func() context.Local { + mutex.Lock() + defer mutex.Unlock() + return localCtxt + } + + errHandler := func(ctxt fab.ClientContext, channelID string, err error) { + // Analyse the error to see if it needs handling + if err.Error() != dynamicdiscovery.AccessDenied { + // Transient error; no handling necessary + return + } + + // Need to spawn a new Go routine or else deadlock results when calling CloseContext + go func() { + sdk.CloseContext(ctxt) + + // Reset the successful response + discClient.SetResponses( + &clientmocks.MockDiscoverEndpointResponse{ + PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{}, + }, + ) + + newContext(sdkValidClientUser, sdkValidClientOrg2) + }() + } + + sdk, err = New(c, + WithCorePkg(core), + WithServicePkg(&dynamicDiscoveryProviderFactory{}), + WithErrorHandler(errHandler), + WithProviderOpts( + dynamicdiscovery.WithRefreshInterval(3*time.Millisecond), + fabricselection.WithRefreshInterval(3*time.Millisecond), + ), + ) + require.NoError(t, err) + defer sdk.Close() + + chCfg := mocks.NewMockChannelCfg("orgchannel") + chCfg.MockCapabilities[fab.ApplicationGroupKey][fab.V1_2Capability] = true + chpvdr.SetChannelConfig(chCfg) + + newContext(sdkValidClientUser, sdkValidClientOrg1) + + localDiscovery := getLocalCtxt().LocalDiscoveryService() + require.NotNil(t, localDiscovery) + + discovery, err := getChannelService().Discovery() + require.NoError(t, err) + + selection, err := getChannelService().Selection() + require.NoError(t, err) + + // First set a successful response + discClient.SetResponses( + &clientmocks.MockDiscoverEndpointResponse{ + PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{}, + }, + ) + + _, err = localDiscovery.GetPeers() + assert.NoError(t, err) + + _, err = discovery.GetPeers() + assert.NoError(t, err) + + _, err = selection.GetEndorsersForChaincode([]*fab.ChaincodeCall{{ID: "cc1"}}) + require.NoError(t, err) + + // Simulate a transient error + discClient.SetResponses( + &clientmocks.MockDiscoverEndpointResponse{ + Error: errors.New("some transient error"), + }, + ) + + time.Sleep(10 * time.Millisecond) + + _, err = localDiscovery.GetPeers() + assert.NoError(t, err) + + _, err = discovery.GetPeers() + assert.NoError(t, err) + + _, err = selection.GetEndorsersForChaincode([]*fab.ChaincodeCall{{ID: "cc1"}}) + require.NoError(t, err) + + // Simulate an access-denied (could be due to a user being revoked) + discClient.SetResponses( + &clientmocks.MockDiscoverEndpointResponse{ + Error: errors.New(dynamicdiscovery.AccessDenied), + }, + ) + + time.Sleep(10 * time.Millisecond) + + // Subsequent calls on the old services should fail since the service is closed + _, err = localDiscovery.GetPeers() + assert.EqualError(t, err, "Discovery client has been closed") + + _, err = discovery.GetPeers() + assert.EqualError(t, err, "Discovery client has been closed") + + _, err = selection.GetEndorsersForChaincode([]*fab.ChaincodeCall{{ID: "cc1"}}) + assert.EqualError(t, err, "Selection service has been closed") + + // Refresh the services with the new context + + localDiscovery = getLocalCtxt().LocalDiscoveryService() + require.NotNil(t, localDiscovery) + + _, err = localDiscovery.GetPeers() + assert.NoError(t, err) + + discovery, err = getChannelService().Discovery() + require.NoError(t, err) + + _, err = discovery.GetPeers() + assert.NoError(t, err) + + selection, err = getChannelService().Selection() + require.NoError(t, err) + + _, err = selection.GetEndorsersForChaincode([]*fab.ChaincodeCall{{ID: "cc1"}}) + require.NoError(t, err) +} + type MockNetworkPeers struct{} func (M *MockNetworkPeers) NetworkPeers() []fab.NetworkPeer { diff --git a/pkg/fabsdk/provider/chpvdr/chprovider_test.go b/pkg/fabsdk/provider/chpvdr/chprovider_test.go index ce1ccf2d23..c0385dc0b4 100644 --- a/pkg/fabsdk/provider/chpvdr/chprovider_test.go +++ b/pkg/fabsdk/provider/chpvdr/chprovider_test.go @@ -10,6 +10,7 @@ package chpvdr import ( "errors" + "sync" "testing" "time" @@ -180,96 +181,205 @@ func newMockClientContext(userID, mspID string) fab.ClientContext { } func TestDiscoveryAccessDenied(t *testing.T) { - discClient, channelService := setupDiscovery(t, func(discClient *clientmocks.MockDiscoveryClient) { - dynamicdiscovery.SetClientProvider(func(ctx context.Client) (dynamicdiscovery.DiscoveryClient, error) { - return discClient, nil - }) + var channelProvider *ChannelProvider + var disc fab.DiscoveryService + var mutex sync.RWMutex + + testChannelCfg := mocks.NewMockChannelCfg("testchannel") + testChannelCfg.MockCapabilities[fab.ApplicationGroupKey][fab.V1_2Capability] = true + + SetChannelConfig(chconfig.NewChannelCfg(""), testChannelCfg) + + discClient := clientmocks.NewMockDiscoveryClient() + dynamicdiscovery.SetClientProvider(func(ctx context.Client) (dynamicdiscovery.DiscoveryClient, error) { + return discClient, nil }) + newDiscovery := func(userID, mspID string) fab.DiscoveryService { + channelService, err := channelProvider.ChannelService(newMockClientContext(userID, mspID), "testchannel") + require.NoError(t, err) + + d, err := channelService.Discovery() + require.NoError(t, err) + require.NotNil(t, d) + + mutex.Lock() + defer mutex.Unlock() + disc = d + return d + } + + getDiscovery := func() fab.DiscoveryService { + mutex.RLock() + defer mutex.RUnlock() + return disc + } + + errHandler := func(ctxt fab.ClientContext, channelID string, err error) { + if derr, ok := err.(dynamicdiscovery.DiscoveryError); ok && derr.Error() == dynamicdiscovery.AccessDenied && derr.Error() == dynamicdiscovery.AccessDenied { + // Spawn a new Go routine or else we'll hit a deadlock when closing the context + go func() { + channelProvider.CloseContext(ctxt) + + // Reset the error + discClient.SetResponses( + &clientmocks.MockDiscoverEndpointResponse{ + PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{}, + }, + ) + + // Replace Discovery with a new one using different credentials + newDiscovery("user2", "org1") + }() + } + } + + channelProvider = getChannelProvider(t, mocks.NewMockProviderContext(), + dynamicdiscovery.WithErrorHandler(errHandler), + dynamicdiscovery.WithRefreshInterval(5*time.Millisecond), + ) + defer channelProvider.Close() + + discovery := newDiscovery("user1", "org1") + + // First set a successful response discClient.SetResponses( &clientmocks.MockDiscoverEndpointResponse{ - Error: errors.New("access denied"), + PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{}, }, ) - discovery, err := channelService.Discovery() + _, err := discovery.GetPeers() require.NoError(t, err) - require.NotNil(t, discovery) - _, ok := discovery.(*dynamicdiscovery.ChannelService) - assert.Truef(t, ok, "Expecting discovery to be Dynamic for v1_2") - _, err = discovery.GetPeers() - require.Error(t, err) - assert.Equal(t, "access denied", err.Error()) + discClient.SetResponses( + &clientmocks.MockDiscoverEndpointResponse{ + Error: errors.New("access denied"), + }, + ) - time.Sleep(50 * time.Millisecond) + time.Sleep(10 * time.Millisecond) - // Subsequent calls should fail since the service is closed + // Subsequent calls on the old discovery should fail since the service is closed _, err = discovery.GetPeers() require.Error(t, err) - assert.Equal(t, "Discovery client has been closed due to error: access denied", err.Error()) -} + assert.Equal(t, "Discovery client has been closed", err.Error()) -func TestSelectionAccessDenied(t *testing.T) { - discClient, channelService := setupDiscovery(t, func(discClient *clientmocks.MockDiscoveryClient) { - fabricselection.SetClientProvider(func(ctx context.Client) (fabricselection.DiscoveryClient, error) { - return discClient, nil - }) - }) + time.Sleep(10 * time.Millisecond) + + // Subsequent calls should succeed since the error handler should have replaced the discovery service + discovery = getDiscovery() + _, err = discovery.GetPeers() + require.NoError(t, err) + // Set a transient error discClient.SetResponses( &clientmocks.MockDiscoverEndpointResponse{ - Error: errors.New("access denied"), + Error: errors.New("some transient error"), }, ) - selection, err := channelService.Selection() + // Wait for the cache to refresh + time.Sleep(10 * time.Millisecond) + + // Calls should still succeed since the error handler ignores transient errors + _, err = discovery.GetPeers() require.NoError(t, err) - require.NotNil(t, selection) - _, ok := selection.(*fabricselection.Service) - assert.Truef(t, ok, "Expecting selection to be Fabric for v1_2") +} - _, err = selection.GetEndorsersForChaincode([]*fab.ChaincodeCall{{ID: "cc1"}}) - require.Error(t, err) - assert.Equal(t, "error getting channel response for channel [testchannel]: access denied", err.Error()) +func TestSelectionAccessDenied(t *testing.T) { + var channelProvider *ChannelProvider + var sel fab.SelectionService + var mutex sync.RWMutex - time.Sleep(50 * time.Millisecond) + testChannelCfg := mocks.NewMockChannelCfg("testchannel") + testChannelCfg.MockCapabilities[fab.ApplicationGroupKey][fab.V1_2Capability] = true - // Subsequent calls should fail since the service is closed - _, err = selection.GetEndorsersForChaincode([]*fab.ChaincodeCall{{ID: "cc1"}}) - require.Error(t, err) - assert.Equal(t, "Selection service has been closed due to error: access denied", err.Error()) -} + SetChannelConfig(chconfig.NewChannelCfg(""), testChannelCfg) -func setupDiscovery(t *testing.T, preInit func(discClient *clientmocks.MockDiscoveryClient)) (*clientmocks.MockDiscoveryClient, fab.ChannelService) { - ctx := mocks.NewMockProviderContext() + discClient := clientmocks.NewMockDiscoveryClient() + dynamicdiscovery.SetClientProvider(func(ctx context.Client) (dynamicdiscovery.DiscoveryClient, error) { + return discClient, nil + }) + fabricselection.SetClientProvider(func(ctx context.Client) (fabricselection.DiscoveryClient, error) { + logger.Infof("Returning mock discovery client") + return discClient, nil + }) - user := mspmocks.NewMockSigningIdentity("user", "user") + newSelection := func(userID, mspID string) fab.SelectionService { + channelService, err := channelProvider.ChannelService(newMockClientContext(userID, mspID), "testchannel") + require.NoError(t, err) - clientCtx := &mockClientContext{ - Providers: ctx, - SigningIdentity: user, + s, err := channelService.Selection() + require.NoError(t, err) + require.NotNil(t, s) + + mutex.Lock() + defer mutex.Unlock() + sel = s + return s } - discClient := clientmocks.NewMockDiscoveryClient() + getSelection := func() fab.SelectionService { + mutex.RLock() + defer mutex.RUnlock() + return sel + } - preInit(discClient) + errHandler := func(ctxt fab.ClientContext, channelID string, err error) { + if derr, ok := err.(fabricselection.DiscoveryError); ok && derr.Error() == fabricselection.AccessDenied { + // Spawn a new Go routine or else we'll hit a deadlock when closing the context + go func() { + channelProvider.CloseContext(ctxt) + + // Reset the error + discClient.SetResponses( + &clientmocks.MockDiscoverEndpointResponse{ + PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{}, + }, + ) + + // Replace Selection with a new one using different credentials + newSelection("user2", "org1") + }() + } + } - cp, err := New(clientCtx.EndpointConfig()) + channelProvider = getChannelProvider(t, mocks.NewMockProviderContext(), + dynamicdiscovery.WithErrorHandler(errHandler), + fabricselection.WithRefreshInterval(3*time.Millisecond), + ) + defer channelProvider.Close() + + // First set a successful response + discClient.SetResponses( + &clientmocks.MockDiscoverEndpointResponse{ + PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{}, + }, + ) + + selection := newSelection("user1", "org1") + + _, err := selection.GetEndorsersForChaincode([]*fab.ChaincodeCall{{ID: "cc1"}}) require.NoError(t, err) - err = cp.Initialize(ctx) - assert.NoError(t, err) + // Now set an error response + discClient.SetResponses( + &clientmocks.MockDiscoverEndpointResponse{ + Error: errors.New("access denied"), + }, + ) - testChannelCfg := mocks.NewMockChannelCfg("testchannel") - testChannelCfg.MockCapabilities[fab.ApplicationGroupKey][fab.V1_2Capability] = true + // Wait for the cache to refresh + time.Sleep(10 * time.Millisecond) - SetChannelConfig(chconfig.NewChannelCfg(""), testChannelCfg) + // The old selection service should be closed + _, err = selection.GetEndorsersForChaincode([]*fab.ChaincodeCall{{ID: "cc1"}}) + assert.EqualError(t, err, "Selection service has been closed") - channelService, err := cp.ChannelService(clientCtx, "testchannel") + // The selection service should have been replaced with a good one + _, err = getSelection().GetEndorsersForChaincode([]*fab.ChaincodeCall{{ID: "cc1"}}) require.NoError(t, err) - - return discClient, channelService } func getChannelProvider(t *testing.T, providers context.Providers, opts ...options.Opt) *ChannelProvider { diff --git a/test/integration/negative/revoked/revoked_test.go b/test/integration/negative/revoked/revoked_test.go index 62375245f1..4466d9583a 100644 --- a/test/integration/negative/revoked/revoked_test.go +++ b/test/integration/negative/revoked/revoked_test.go @@ -230,8 +230,17 @@ func testRevokedPeer(t *testing.T) { //testRevokedUser performs revoke peer test func testRevokedUser(t *testing.T) { - - sdk, err := fabsdk.New(config.FromFile(integration.GetConfigPath(configFilename))) + var sdk *fabsdk.FabricSDK + var err error + sdk, err = fabsdk.New( + config.FromFile(integration.GetConfigPath(configFilename)), + fabsdk.WithErrorHandler(func(ctxt fab.ClientContext, channelID string, err error) { + if strings.Contains(err.Error(), "access denied") { + t.Logf("Closing context after error: %s", err) + go sdk.CloseContext(ctxt) + } + }), + ) require.NoError(t, err) defer sdk.Close()