Skip to content

Commit

Permalink
[FAB-10453] Discovery, Selection moved to chprovider
Browse files Browse the repository at this point in the history
Discovery and Selection services were moved to the
ChannelService. The discovery provider is now just
a "local" discovery provider and the selection provider
was removed.

Change-Id: I13fbdcaba8b9f8ac736f7676f0a06137bcbfe920
Signed-off-by: Bob Stasyszyn <[email protected]>
  • Loading branch information
bstasyszyn committed May 31, 2018
1 parent ced92a7 commit 9bae250
Show file tree
Hide file tree
Showing 77 changed files with 1,334 additions and 1,761 deletions.
14 changes: 12 additions & 2 deletions pkg/client/channel/chclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,16 @@ func (cc *Client) prepareHandlerContexts(reqCtx reqContext.Context, request Requ
return nil, nil, errors.WithMessage(err, "failed to create transactor")
}

selection, err := cc.context.ChannelService().Selection()
if err != nil {
return nil, nil, errors.WithMessage(err, "failed to create selection service")
}

discovery, err := cc.context.ChannelService().Discovery()
if err != nil {
return nil, nil, errors.WithMessage(err, "failed to create discovery service")
}

peerFilter := func(peer fab.Peer) bool {
if !cc.greylist.Accept(peer) {
return false
Expand All @@ -234,8 +244,8 @@ func (cc *Client) prepareHandlerContexts(reqCtx reqContext.Context, request Requ
}

clientContext := &invoke.ClientContext{
Selection: cc.context.SelectionService(),
Discovery: cc.context.DiscoveryService(),
Selection: selection,
Discovery: discovery,
Membership: cc.membership,
Transactor: transactor,
EventService: cc.eventService,
Expand Down
65 changes: 6 additions & 59 deletions pkg/client/channel/chclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,32 +453,20 @@ func TestMultiErrorPropogation(t *testing.T) {
assert.Equal(t, "Multiple errors occurred: \nTest Error\nTest Error", statusError.Message, "Expected multi error message")
}

type serviceInit interface {
Initialize(context context.Channel) error
}

func TestDiscoveryGreylist(t *testing.T) {

testPeer1 := fcmocks.NewMockPeer("Peer1", "http://peer1.com")
testPeer1.Error = status.New(status.EndorserClientStatus,
status.ConnectionFailed.ToInt32(), "test", []interface{}{testPeer1.URL()})

selectionProvider, err := staticselection.New(fcmocks.NewMockEndpointConfig())
assert.Nil(t, err, "Got error %s", err)

selectionService, err := selectionProvider.CreateSelectionService("mychannel")
assert.Nil(t, err, "Got error %s", err)
discoveryService := txnmocks.NewMockDiscoveryService(nil, testPeer1)

discoveryService, err := setupTestDiscovery(nil, []fab.Peer{testPeer1})
selectionService, err := staticselection.NewService(discoveryService)
assert.Nil(t, err, "Got error %s", err)

fabCtx := setupCustomTestContext(t, selectionService, discoveryService, nil)
ctx := createChannelContext(fabCtx, channelID)

channelCtx, err := ctx()
assert.Nil(t, err, "Got error %s", err)
selectionService.(serviceInit).Initialize(channelCtx)

chClient, err := New(ctx)
assert.Nil(t, err, "Got error %s", err)

Expand Down Expand Up @@ -552,58 +540,23 @@ func setupCustomTestContext(t *testing.T, selectionService fab.SelectionService,

mockChService := testChannelSvc.(*fcmocks.MockChannelService)
mockChService.SetTransactor(&transactor)

//Modify for custom mockcore to test scenarios
selectionProvider := ctx.MockProviderContext.SelectionProvider()
selectionProvider.(*fcmocks.MockSelectionProvider).SetCustomSelectionService(selectionService)
mockChService.SetDiscovery(discoveryService)
mockChService.SetSelection(selectionService)

channelProvider := ctx.MockProviderContext.ChannelProvider()
channelProvider.(*fcmocks.MockChannelProvider).SetCustomChannelService(testChannelSvc)

discoveryProvider := ctx.MockProviderContext.DiscoveryProvider()
discoveryProvider.(*fcmocks.MockStaticDiscoveryProvider).SetCustomDiscoveryService(discoveryService)

return createClientContext(ctx)
}

func setupTestDiscovery(discErr error, peers []fab.Peer) (fab.DiscoveryService, error) {

mockDiscovery, err := txnmocks.NewMockDiscoveryProvider(discErr, peers)
if err != nil {
return nil, errors.WithMessage(err, "NewMockDiscoveryProvider failed")
}

return mockDiscovery.CreateDiscoveryService("mychannel")
}

func setupTestSelection(discErr error, peers []fab.Peer) (*txnmocks.MockSelectionService, error) {

mockSelection, err := txnmocks.NewMockSelectionProvider(discErr, peers)
if err != nil {
return nil, errors.WithMessage(err, "NewMockSelectinProvider failed")
}

return mockSelection.CreateSelectionService("mychannel")
}

func setupChannelClient(peers []fab.Peer, t *testing.T) *Client {

return setupChannelClientWithError(nil, nil, peers, t)
}

func setupChannelClientWithError(discErr error, selectionErr error, peers []fab.Peer, t *testing.T) *Client {

discoveryService, err := setupTestDiscovery(discErr, nil)
if err != nil {
t.Fatalf("Failed to setup discovery service: %s", err)
}

selectionService, err := setupTestSelection(selectionErr, peers)
if err != nil {
t.Fatalf("Failed to setup discovery service: %s", err)
}

fabCtx := setupCustomTestContext(t, selectionService, discoveryService, nil)
fabCtx := setupCustomTestContext(t, txnmocks.NewMockSelectionService(selectionErr, peers...), txnmocks.NewMockDiscoveryService(discErr), nil)

ctx := createChannelContext(fabCtx, channelID)

Expand All @@ -618,13 +571,7 @@ func setupChannelClientWithError(discErr error, selectionErr error, peers []fab.
func setupChannelClientWithNodes(peers []fab.Peer,
orderers []fab.Orderer, t *testing.T) *Client {

discoveryService, err := setupTestDiscovery(nil, nil)
assert.Nil(t, err, "Failed to setup discovery service")

selectionService, err := setupTestSelection(nil, peers)
assert.Nil(t, err, "Failed to setup discovery service")

fabCtx := setupCustomTestContext(t, selectionService, discoveryService, orderers)
fabCtx := setupCustomTestContext(t, txnmocks.NewMockSelectionService(nil, peers...), txnmocks.NewMockDiscoveryService(nil), orderers)

ctx := createChannelContext(fabCtx, channelID)

Expand Down
14 changes: 2 additions & 12 deletions pkg/client/channel/invoke/signature_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,25 +74,15 @@ func setupContextForSignatureValidation(verifyErr, validateErr error, peers []fa
membership.ValidateErr = validateErr
membership.VerifyErr = verifyErr

discoveryService, err := setupTestDiscovery(nil, nil)
if err != nil {
t.Fatalf("Failed to setup discovery service: %s", err)
}

selectionService, err := setupTestSelection(nil, peers)
if err != nil {
t.Fatalf("Failed to setup discovery service: %s", err)
}

transactor := txnmocks.MockTransactor{
Ctx: ctx,
ChannelID: "",
}

return &ClientContext{
Membership: membership,
Discovery: discoveryService,
Selection: selectionService,
Discovery: fcmocks.NewMockDiscoveryService(nil),
Selection: fcmocks.NewMockSelectionService(nil, peers...),
Transactor: &transactor,
}

Expand Down
33 changes: 2 additions & 31 deletions pkg/client/channel/invoke/txnhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,16 +290,6 @@ func prepareRequestContext(request Request, opts Opts, t *testing.T) *RequestCon
func setupChannelClientContext(discErr error, selectionErr error, peers []fab.Peer, t *testing.T) *ClientContext {
membership := fcmocks.NewMockMembership()

discoveryService, err := setupTestDiscovery(discErr, nil)
if err != nil {
t.Fatalf("Failed to setup discovery service: %s", err)
}

selectionService, err := setupTestSelection(selectionErr, peers)
if err != nil {
t.Fatalf("Failed to setup discovery service: %s", err)
}

ctx := setupTestContext()
orderer := fcmocks.NewMockOrderer("", nil)
transactor := txnmocks.MockTransactor{
Expand All @@ -310,8 +300,8 @@ func setupChannelClientContext(discErr error, selectionErr error, peers []fab.Pe

return &ClientContext{
Membership: membership,
Discovery: discoveryService,
Selection: selectionService,
Discovery: txnmocks.NewMockDiscoveryService(discErr),
Selection: txnmocks.NewMockSelectionService(selectionErr, peers...),
Transactor: &transactor,
}

Expand All @@ -322,22 +312,3 @@ func setupTestContext() context.Client {
ctx := fcmocks.NewMockContext(user)
return ctx
}

func setupTestDiscovery(discErr error, peers []fab.Peer) (fab.DiscoveryService, error) {

mockDiscovery, err := txnmocks.NewMockDiscoveryProvider(discErr, peers)
if err != nil {
return nil, errors.WithMessage(err, "NewMockDiscoveryProvider failed")
}
return mockDiscovery.CreateDiscoveryService("mychannel")
}

func setupTestSelection(discErr error, peers []fab.Peer) (*txnmocks.MockSelectionService, error) {

mockSelection, err := txnmocks.NewMockSelectionProvider(discErr, peers)
if err != nil {
return nil, errors.WithMessage(err, "NewMockSelectinProvider failed")
}

return mockSelection.CreateSelectionService("mychannel")
}
12 changes: 3 additions & 9 deletions pkg/client/common/discovery/discoveryfilter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,16 @@ func TestDiscoveryFilter(t *testing.T) {
t.Fatalf(err.Error())
}

discoveryProvider, err := staticdiscovery.New(config1)
if err != nil {
t.Fatalf("Failed to setup discovery provider: %s", err)
}
discoveryProvider.Initialize(mocks.NewMockContext(mockmsp.NewMockSigningIdentity("user1", "Org1MSP")))

discoveryService, err := discoveryProvider.CreateDiscoveryService("mychannel")
discoveryService, err := staticdiscovery.NewService(config1, mocks.NewMockContext(mockmsp.NewMockSigningIdentity("user1", "Org1MSP")).InfraProvider(), "mychannel")
if err != nil {
t.Fatalf("Failed to setup discovery service: %s", err)
}

discoveryFilter := &mockFilter{called: false}

discoveryService = NewDiscoveryFilterService(discoveryService, discoveryFilter)
filteredService := NewDiscoveryFilterService(discoveryService, discoveryFilter)

peers, err := discoveryService.GetPeers()
peers, err := filteredService.GetPeers()
if err != nil {
t.Fatalf("Failed to get peers from discovery service: %s", err)
}
Expand Down
66 changes: 33 additions & 33 deletions pkg/client/common/discovery/dynamicdiscovery/chservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,74 +8,75 @@ package dynamicdiscovery

import (
discclient "github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/discovery/client"
coptions "github.com/hyperledger/fabric-sdk-go/pkg/common/options"
contextAPI "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
reqContext "github.com/hyperledger/fabric-sdk-go/pkg/context"
fabdiscovery "github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery"
"github.com/pkg/errors"
)

// channelService implements a dynamic Discovery Service that queries
// ChannelService implements a dynamic Discovery Service that queries
// Fabric's Discovery service for information about the peers that
// are currently joined to the given channel.
type channelService struct {
type ChannelService struct {
*service
channelID string
}

// newChannelService creates a Discovery Service to query the list of member peers on a given channel.
func newChannelService(options options) *channelService {
logger.Debugf("Creating new dynamic discovery service with cache refresh interval %s", options.refreshInterval)

s := &channelService{}
s.service = newService(s.queryPeers, options)
return s
}

// Initialize initializes the service with channel context
func (s *channelService) Initialize(ctx contextAPI.Channel) error {
return s.service.Initialize(ctx)
// NewChannelService creates a Discovery Service to query the list of member peers on a given channel.
func NewChannelService(ctx contextAPI.Client, channelID string, opts ...coptions.Opt) (*ChannelService, error) {
logger.Debugf("Creating new dynamic discovery service")
s := &ChannelService{
channelID: channelID,
}
s.service = newService(ctx.EndpointConfig(), s.queryPeers, opts...)
err := s.service.initialize(ctx)
if err != nil {
return nil, err
}
return s, nil
}

func (s *channelService) channelContext() contextAPI.Channel {
return s.context().(contextAPI.Channel)
// Close releases resources
func (s *ChannelService) Close() {
logger.Debugf("Closing discovery service for channel [%s]", s.channelID)
s.service.Close()
}

func (s *channelService) queryPeers() ([]fab.Peer, error) {
logger.Debugf("Refreshing peers of channel [%s] from discovery service...", s.channelContext().ChannelID())
func (s *ChannelService) queryPeers() ([]fab.Peer, error) {
logger.Debugf("Refreshing peers of channel [%s] from discovery service...", s.channelID)

channelContext := s.channelContext()
if channelContext == nil {
return nil, errors.Errorf("the service has not been initialized")
}
ctx := s.context()

targets, err := s.getTargets(channelContext)
targets, err := s.getTargets(ctx)
if err != nil {
return nil, err
}
if len(targets) == 0 {
return nil, errors.Errorf("no peers configured for channel [%s]", channelContext.ChannelID())
return nil, errors.Errorf("no peers configured for channel [%s]", s.channelID)
}

reqCtx, cancel := reqContext.NewRequest(channelContext, reqContext.WithTimeout(s.responseTimeout))
reqCtx, cancel := reqContext.NewRequest(ctx, reqContext.WithTimeout(s.responseTimeout))
defer cancel()

req := discclient.NewRequest().OfChannel(channelContext.ChannelID()).AddPeersQuery()
req := discclient.NewRequest().OfChannel(s.channelID).AddPeersQuery()
responses, err := s.discoveryClient().Send(reqCtx, req, targets...)
if err != nil {
if len(responses) == 0 {
return nil, errors.Wrapf(err, "error calling discover service send")
}
logger.Warnf("Received %d response(s) and one or more errors from discovery client: %s", len(responses), err)
}
return s.evaluate(channelContext, responses)
return s.evaluate(ctx, responses)
}

func (s *channelService) getTargets(ctx contextAPI.Channel) ([]fab.PeerConfig, error) {
func (s *ChannelService) getTargets(ctx contextAPI.Client) ([]fab.PeerConfig, error) {
// TODO: The number of peers to query should be retrieved from the channel policy.
// This will done in a future patch.
chpeers, ok := ctx.EndpointConfig().ChannelPeers(ctx.ChannelID())
chpeers, ok := ctx.EndpointConfig().ChannelPeers(s.channelID)
if !ok {
return nil, errors.Errorf("failed to get peer configs for channel [%s]", ctx.ChannelID())
return nil, errors.Errorf("failed to get peer configs for channel [%s]", s.channelID)
}
targets := make([]fab.PeerConfig, len(chpeers))
for i := 0; i < len(targets); i++ {
Expand All @@ -85,19 +86,18 @@ func (s *channelService) getTargets(ctx contextAPI.Channel) ([]fab.PeerConfig, e
}

// evaluate validates the responses and returns the peers
func (s *channelService) evaluate(ctx contextAPI.Channel, responses []fabdiscovery.Response) ([]fab.Peer, error) {
func (s *ChannelService) evaluate(ctx contextAPI.Client, responses []fabdiscovery.Response) ([]fab.Peer, error) {
if len(responses) == 0 {
return nil, errors.New("no successful response received from any peer")
}

// TODO: In a future patch:
// - validate the signatures in the responses
// - ensure N responses match according to the policy
// For now just pick the first successful response

var lastErr error
for _, response := range responses {
endpoints, err := response.ForChannel(ctx.ChannelID()).Peers()
endpoints, err := response.ForChannel(s.channelID).Peers()
if err != nil {
lastErr = errors.Wrapf(err, "error getting peers from discovery response")
logger.Warn(lastErr.Error())
Expand Down
Loading

0 comments on commit 9bae250

Please sign in to comment.