Skip to content

Commit

Permalink
[FABG-825] Add error handler option
Browse files Browse the repository at this point in the history
Added an option to the SDK: WithErrorHandler(). The handler is
called when errors occur during background service operations, such as
discovery refresh or config refresh. The handler may choose to close
the client context or close the SDK.

Change-Id: Id484454e635ee0015751b650087fad822bc3c259
Signed-off-by: Bob Stasyszyn <[email protected]>
  • Loading branch information
bstasyszyn committed Mar 6, 2019
1 parent 458319b commit f198238
Show file tree
Hide file tree
Showing 19 changed files with 637 additions and 178 deletions.
29 changes: 12 additions & 17 deletions pkg/client/common/discovery/dynamicdiscovery/chservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ import (
"github.com/pkg/errors"
)

const (
accessDenied = "access denied"
)

// ChannelService implements a dynamic Discovery Service that queries
// Fabric's Discovery service for information about the peers that
// are currently joined to the given channel.
Expand Down Expand Up @@ -53,6 +49,17 @@ func (s *ChannelService) Close() {
}

func (s *ChannelService) queryPeers() ([]fab.Peer, error) {
peers, err := s.doQueryPeers()

if err != nil && s.ErrHandler != nil {
logger.Infof("[%s] Got error from discovery query: %s. Invoking error handler", s.channelID, err)
s.ErrHandler(s.ctx, s.channelID, err)
}

return peers, err
}

func (s *ChannelService) doQueryPeers() ([]fab.Peer, error) {
logger.Debugf("Refreshing peers of channel [%s] from discovery service...", s.channelID)

ctx := s.context()
Expand Down Expand Up @@ -105,7 +112,7 @@ func (s *ChannelService) evaluate(ctx contextAPI.Client, responses []fabdiscover
for _, response := range responses {
endpoints, err := response.ForChannel(s.channelID).Peers()
if err != nil {
lastErr = newDiscoveryError(err)
lastErr = DiscoveryError(err)
logger.Warnf("error getting peers from discovery response: %s", lastErr)
continue
}
Expand Down Expand Up @@ -141,15 +148,3 @@ type peerEndpoint struct {
func (p *peerEndpoint) BlockHeight() uint64 {
return p.blockHeight
}

type discoveryError struct {
error
}

func newDiscoveryError(cause error) *discoveryError {
return &discoveryError{error: cause}
}

func (e *discoveryError) IsFatal() bool {
return e.Error() == accessDenied
}
14 changes: 12 additions & 2 deletions pkg/client/common/discovery/dynamicdiscovery/chservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/hyperledger/fabric-sdk-go/pkg/client/common/discovery"
clientmocks "github.com/hyperledger/fabric-sdk-go/pkg/client/common/mocks"
contextAPI "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
pfab "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
discmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery/mocks"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
Expand Down Expand Up @@ -64,10 +65,19 @@ func TestDiscoveryService(t *testing.T) {
return discClient, nil
})

var service *ChannelService
service, err := NewChannelService(
ctx, mocks.NewMockMembership(), ch,
WithRefreshInterval(10*time.Millisecond),
WithResponseTimeout(100*time.Millisecond),
WithErrorHandler(
func(ctxt fab.ClientContext, channelID string, err error) {
derr, ok := err.(DiscoveryError)
if ok && derr.Error() == AccessDenied {
service.Close()
}
},
),
)
require.NoError(t, err)
defer service.Close()
Expand Down Expand Up @@ -139,7 +149,7 @@ func TestDiscoveryService(t *testing.T) {
// Fatal error (access denied can be due due a user being revoked)
discClient.SetResponses(
&clientmocks.MockDiscoverEndpointResponse{
Error: errors.New(accessDenied),
Error: errors.New(AccessDenied),
},
)

Expand All @@ -148,7 +158,7 @@ func TestDiscoveryService(t *testing.T) {
// The discovery service should have been closed
_, err = service.GetPeers()
require.Error(t, err)
assert.Equal(t, "Discovery client has been closed due to error: access denied", err.Error())
assert.Equal(t, "Discovery client has been closed", err.Error())
}

func TestDiscoveryServiceWithNewOrgJoined(t *testing.T) {
Expand Down
13 changes: 12 additions & 1 deletion pkg/client/common/discovery/dynamicdiscovery/localservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,17 @@ func (s *LocalService) localContext() contextAPI.Local {
}

func (s *LocalService) queryPeers() ([]fab.Peer, error) {
peers, err := s.doQueryPeers()

if err != nil && s.ErrHandler != nil {
logger.Debugf("Got error from discovery query: %s. Invoking error handler", err)
s.ErrHandler(s.ctx, "", err)
}

return peers, err
}

func (s *LocalService) doQueryPeers() ([]fab.Peer, error) {
logger.Debug("Refreshing local peers from discovery service...")

ctx := s.localContext()
Expand Down Expand Up @@ -77,7 +88,7 @@ func (s *LocalService) queryPeers() ([]fab.Peer, error) {
response := responses[0]
endpoints, err := response.ForLocal().Peers()
if err != nil {
return nil, errors.Wrap(err, "error getting peers from discovery response")
return nil, DiscoveryError(err)
}

return s.filterLocalMSP(asPeers(ctx, endpoints)), nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,19 @@ SPDX-License-Identifier: Apache-2.0
package dynamicdiscovery

import (
"errors"
"testing"
"time"

clientmocks "github.com/hyperledger/fabric-sdk-go/pkg/client/common/mocks"
contextAPI "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
pfab "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
discmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery/mocks"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
mspmocks "github.com/hyperledger/fabric-sdk-go/pkg/msp/test/mockmsp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const (
Expand Down Expand Up @@ -58,8 +61,16 @@ func TestLocalDiscoveryService(t *testing.T) {

service = newLocalService(
config, mspID1,
WithRefreshInterval(500*time.Millisecond),
WithRefreshInterval(3*time.Millisecond),
WithResponseTimeout(2*time.Second),
WithErrorHandler(
func(ctxt fab.ClientContext, channelID string, err error) {
derr, ok := err.(DiscoveryError)
if ok && derr.Error() == AccessDenied {
service.Close()
}
},
),
)
defer service.Close()

Expand Down Expand Up @@ -122,4 +133,19 @@ func TestLocalDiscoveryService(t *testing.T) {
for _, p := range peers {
assert.Equalf(t, mspID1, p.MSPID(), "Expecting peer to be in MSP [%s]", mspID1)
}

// Fatal error (access denied can be due due a user being revoked)
discClient.SetResponses(
&clientmocks.MockDiscoverEndpointResponse{
Error: errors.New(AccessDenied),
},
)

// Wait for the cache to refresh
time.Sleep(10 * time.Millisecond)

// The discovery service should have been closed
_, err = service.GetPeers()
require.Error(t, err)
assert.Equal(t, "Discovery client has been closed", err.Error())
}
22 changes: 22 additions & 0 deletions pkg/client/common/discovery/dynamicdiscovery/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import (
"time"

coptions "github.com/hyperledger/fabric-sdk-go/pkg/common/options"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
)

type options struct {
refreshInterval time.Duration
responseTimeout time.Duration
errHandler fab.ErrorHandler
}

// WithRefreshInterval sets the interval in which the
Expand All @@ -38,6 +40,17 @@ func WithResponseTimeout(value time.Duration) coptions.Opt {
}
}

// WithErrorHandler sets the error handler
func WithErrorHandler(value fab.ErrorHandler) coptions.Opt {
return func(p coptions.Params) {
logger.Debugf("Checking errHandlerSetter")
if setter, ok := p.(errHandlerSetter); ok {
logger.Debugf("... setting error handler")
setter.SetErrorHandler(value)
}
}
}

type refreshIntervalSetter interface {
SetDiscoveryRefreshInterval(value time.Duration)
}
Expand All @@ -46,6 +59,10 @@ type responseTimeoutSetter interface {
SetDiscoveryResponseTimeout(value time.Duration)
}

type errHandlerSetter interface {
SetErrorHandler(value fab.ErrorHandler)
}

func (o *options) SetDiscoveryRefreshInterval(value time.Duration) {
logger.Debugf("RefreshInterval: %s", value)
o.refreshInterval = value
Expand All @@ -55,3 +72,8 @@ func (o *options) SetDiscoveryResponseTimeout(value time.Duration) {
logger.Debugf("ResponseTimeout: %s", value)
o.responseTimeout = value
}

func (o *options) SetErrorHandler(value fab.ErrorHandler) {
logger.Debugf("ErrorHandler: %+v", value)
o.errHandler = value
}
53 changes: 17 additions & 36 deletions pkg/client/common/discovery/dynamicdiscovery/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ type DiscoveryClient interface {
Send(ctx context.Context, req *discclient.Request, targets ...fab.PeerConfig) ([]fabdiscovery.Response, error)
}

const (
// AccessDenied indicates that the user does not have permission to perform the operation
AccessDenied = "access denied"
)

// DiscoveryError is an error originating at the Discovery service
type DiscoveryError error

// clientProvider is overridden by unit tests
var clientProvider = func(ctx contextAPI.Client) (DiscoveryClient, error) {
return fabdiscovery.New(ctx)
Expand All @@ -39,7 +47,7 @@ type service struct {
ctx contextAPI.Client
discClient DiscoveryClient
peersRef *lazyref.Reference
lastErr error
ErrHandler fab.ErrorHandler
}

type queryPeers func() ([]fab.Peer, error)
Expand All @@ -59,25 +67,16 @@ func newService(config fab.EndpointConfig, query queryPeers, opts ...coptions.Op
logger.Debugf("Cache refresh interval: %s", options.refreshInterval)
logger.Debugf("Deliver service response timeout: %s", options.responseTimeout)

s := &service{
return &service{
responseTimeout: options.responseTimeout,
ErrHandler: options.errHandler,
peersRef: lazyref.New(
func() (interface{}, error) {
return query()
},
lazyref.WithRefreshInterval(lazyref.InitOnFirstAccess, options.refreshInterval),
),
}

s.peersRef = lazyref.New(
func() (interface{}, error) {
peers, err := query()
if err != nil {
derr, ok := err.(*discoveryError)
if ok && derr.IsFatal() {
go s.close(err)
}
}
return peers, err
},
lazyref.WithRefreshInterval(lazyref.InitOnFirstAccess, options.refreshInterval),
)

return s
}

// initialize initializes the service with client context
Expand Down Expand Up @@ -108,27 +107,9 @@ func (s *service) Close() {
s.peersRef.Close()
}

func (s *service) close(err error) {
logger.Warnf("Got fatal error [%s]. Closing discovery client.", err)
s.lock.Lock()
defer s.lock.Unlock()
s.lastErr = err
s.peersRef.Close()
}

func (s *service) getLastError() error {
s.lock.RLock()
defer s.lock.RUnlock()
return s.lastErr
}

// GetPeers returns the available peers
func (s *service) GetPeers() ([]fab.Peer, error) {
if s.peersRef.IsClosed() {
lastErr := s.getLastError()
if lastErr != nil {
return nil, errors.Errorf("Discovery client has been closed due to error: %s", lastErr)
}
return nil, errors.Errorf("Discovery client has been closed")
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/client/common/mocks/mockdiscoveryclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (r *response) ForChannel(string) discclient.ChannelResponse {
func (r *response) ForLocal() discclient.LocalResponse {
return &localResponse{
peers: r.peers,
err: r.err,
}
}

Expand Down Expand Up @@ -109,11 +110,12 @@ func (cr *channelResponse) Endorsers(invocationChain discclient.InvocationChain,

type localResponse struct {
peers []*discclient.Peer
err error
}

// Peers returns a response for a peer membership query, or error if something went wrong
func (cr *localResponse) Peers() ([]*discclient.Peer, error) {
return cr.peers, nil
return cr.peers, cr.err
}

// MockDiscoverEndpointResponse contains a mock response for the discover client
Expand Down
Loading

0 comments on commit f198238

Please sign in to comment.