Skip to content

Commit

Permalink
[FAB-9555] Local Discovery Provider
Browse files Browse the repository at this point in the history
Create a Local Discovery Provider that can
be plugged into the SDK in order to retrieve
peers from the local MSP.

Change-Id: Ida30eb96bc63f5bd94ecd0f5f2bcf5160a62ba8f
Signed-off-by: Bob Stasyszyn <[email protected]>
  • Loading branch information
bstasyszyn committed Apr 22, 2018
1 parent dd02971 commit 637b655
Show file tree
Hide file tree
Showing 31 changed files with 944 additions and 331 deletions.
104 changes: 104 additions & 0 deletions pkg/client/common/discovery/dynamicdiscovery/chservice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package dynamicdiscovery

import (
discclient "github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/discovery/client"
contextAPI "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
reqContext "github.com/hyperledger/fabric-sdk-go/pkg/context"
fabdiscovery "github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery"
"github.com/pkg/errors"
)

// channelService implements a dynamic Discovery Service that queries
// Fabric's Discovery service for information about the peers that
// are currently joined to the given channel.
type channelService struct {
*service
}

// newChannelService creates a Discovery Service to query the list of member peers on a given channel.
func newChannelService(options options) *channelService {
logger.Debugf("Creating new dynamic discovery service with cache refresh interval %s", options.refreshInterval)

s := &channelService{}
s.service = newService(s.queryPeers, options)
return s
}

// Initialize initializes the service with channel context
func (s *channelService) Initialize(ctx contextAPI.Channel) error {
return s.service.Initialize(ctx)
}

func (s *channelService) channelContext() contextAPI.Channel {
return s.context().(contextAPI.Channel)
}

func (s *channelService) queryPeers() ([]fab.Peer, error) {
logger.Debugf("Refreshing peers of channel [%s] from discovery service...", s.channelContext().ChannelID())

channelContext := s.channelContext()
if channelContext == nil {
return nil, errors.Errorf("the service has not been initialized")
}

targets, err := s.getTargets(channelContext)
if err != nil {
return nil, err
}
if len(targets) == 0 {
return nil, errors.Errorf("no peers configured for channel [%s]", channelContext.ChannelID())
}

reqCtx, cancel := reqContext.NewRequest(channelContext, reqContext.WithTimeout(s.responseTimeout))
defer cancel()

req := discclient.NewRequest().OfChannel(channelContext.ChannelID()).AddPeersQuery()
responses, err := s.discoveryClient().Send(reqCtx, req, targets...)
if err != nil {
if len(responses) == 0 {
return nil, errors.Wrapf(err, "error calling discover service send")
}
logger.Warnf("Received %d response(s) and one or more errors from discovery client: %s", len(responses), err)
}
return s.evaluate(channelContext, responses)
}

func (s *channelService) getTargets(ctx contextAPI.Channel) ([]fab.PeerConfig, error) {
// TODO: The number of peers to query should be retrieved from the channel policy.
// This will done in a future patch.
chpeers, err := ctx.EndpointConfig().ChannelPeers(ctx.ChannelID())
if err != nil {
return nil, errors.Wrapf(err, "failed to get peer configs for channel [%s]", ctx.ChannelID())
}
targets := make([]fab.PeerConfig, len(chpeers))
for i := 0; i < len(targets); i++ {
targets[i] = chpeers[i].NetworkPeer.PeerConfig
}
return targets, nil
}

// evaluate validates the responses and returns the peers
func (s *channelService) evaluate(ctx contextAPI.Channel, responses []fabdiscovery.Response) ([]fab.Peer, error) {
if len(responses) == 0 {
return nil, errors.New("no successful response received from any peer")
}

// TODO: In a future patch:
// - validate the signatures in the responses
// - ensure N responses match according to the policy
// For now just pick the first response
response := responses[0]
endpoints, err := response.ForChannel(ctx.ChannelID()).Peers()
if err != nil {
return nil, errors.Wrapf(err, "error getting peers from discovery response")
}

return asPeers(ctx, endpoints), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ func TestDiscoveryService(t *testing.T) {
},
)

clientProvider = func(ctx contextAPI.Client) (discoverClient, error) {
clientProvider = func(ctx contextAPI.Client) (discoveryClient, error) {
return discClient, nil
}

membershipService := newService(
membershipService := newChannelService(
options{
refreshInterval: 500 * time.Millisecond,
responseTimeout: 2 * time.Second,
Expand Down
103 changes: 103 additions & 0 deletions pkg/client/common/discovery/dynamicdiscovery/localservice.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package dynamicdiscovery

import (
discclient "github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/discovery/client"
contextAPI "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
reqContext "github.com/hyperledger/fabric-sdk-go/pkg/context"
"github.com/pkg/errors"
)

// LocalService implements a dynamic Discovery Service that queries
// Fabric's Discovery service for the peers that are in the local MSP.
type LocalService struct {
*service
}

// newLocalService creates a Local Discovery Service to query the list of member peers in the local MSP.
func newLocalService(options options) *LocalService {
logger.Debugf("Creating new dynamic discovery service with cache refresh interval %s", options.refreshInterval)

s := &LocalService{}
s.service = newService(s.queryPeers, options)
return s
}

// Initialize initializes the service with local context
func (s *LocalService) Initialize(ctx contextAPI.Local) error {
return s.service.Initialize(ctx)
}

func (s *LocalService) localContext() contextAPI.Local {
return s.context().(contextAPI.Local)
}

func (s *LocalService) queryPeers() ([]fab.Peer, error) {
logger.Debugf("Refreshing local peers from discovery service...")

ctx := s.localContext()
if ctx == nil {
return nil, errors.Errorf("the service has not been initialized")
}

target, err := s.getTarget(ctx)
if err != nil {
return nil, err
}

reqCtx, cancel := reqContext.NewRequest(ctx, reqContext.WithTimeout(s.responseTimeout))
defer cancel()

req := discclient.NewRequest().AddLocalPeersQuery()
responses, err := s.discoveryClient().Send(reqCtx, req, *target)
if err != nil {
return nil, errors.Wrapf(err, "error calling discover service send")
}
if len(responses) == 0 {
return nil, errors.Wrapf(err, "expecting 1 response from discover service send but got none")
}

response := responses[0]
endpoints, err := response.ForLocal().Peers()
if err != nil {
return nil, errors.Wrapf(err, "error getting peers from discovery response")
}

return s.filterLocalMSP(asPeers(ctx, endpoints)), nil
}

func (s *LocalService) getTarget(ctx contextAPI.Client) (*fab.PeerConfig, error) {
peers, err := ctx.EndpointConfig().NetworkPeers()
if err != nil {
return nil, errors.Wrapf(err, "failed to get peer configs")
}
mspID := ctx.Identifier().MSPID
for _, p := range peers {
// Need to go to a peer with the local MSPID, otherwise the request will be rejected
if p.MSPID == mspID {
return &p.PeerConfig, nil
}
}
return nil, errors.Errorf("no bootstrap peers configured for MSP [%s]", mspID)
}

// Even though the local peer query should only return peers in the local
// MSP, this function double checks and logs a warning if this is not the case.
func (s *LocalService) filterLocalMSP(peers []fab.Peer) []fab.Peer {
localMSPID := s.ctx.Identifier().MSPID
var filteredPeers []fab.Peer
for _, p := range peers {
if p.MSPID() != localMSPID {
logger.Warnf("Peer [%s] is not part of the local MSP [%s] but in MSP [%s]", p.URL(), localMSPID, p.MSPID())
} else {
filteredPeers = append(filteredPeers, p)
}
}
return filteredPeers
}
119 changes: 119 additions & 0 deletions pkg/client/common/discovery/dynamicdiscovery/localservice_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package dynamicdiscovery

import (
"testing"
"time"

dyndiscmocks "github.com/hyperledger/fabric-sdk-go/pkg/client/common/discovery/dynamicdiscovery/mocks"
contextAPI "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
pfab "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
discmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/discovery/mocks"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
mspmocks "github.com/hyperledger/fabric-sdk-go/pkg/msp/test/mockmsp"
"github.com/stretchr/testify/assert"
)

const (
peer2MSP1 = "peer2.org1.com:9999"
)

func TestLocalDiscoveryService(t *testing.T) {
ctx := mocks.NewMockContext(mspmocks.NewMockSigningIdentity("test", mspID1))
config := &mocks.MockConfig{}
ctx.SetEndpointConfig(config)

localCtx := mocks.NewMockLocalContext(ctx, nil)

peer1 := pfab.NetworkPeer{
PeerConfig: pfab.PeerConfig{
URL: peer1MSP1,
},
MSPID: mspID1,
}
config.SetCustomNetworkPeerCfg([]pfab.NetworkPeer{peer1})

discClient := dyndiscmocks.NewMockDiscoveryClient()
discClient.SetResponses(
&dyndiscmocks.MockDiscoverEndpointResponse{
PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{},
},
)

clientProvider = func(ctx contextAPI.Client) (discoveryClient, error) {
return discClient, nil
}

service := newLocalService(
options{
refreshInterval: 500 * time.Millisecond,
responseTimeout: 2 * time.Second,
},
)
defer service.Close()

err := service.Initialize(localCtx)
assert.NoError(t, err)
// Initialize again should produce no error
err = service.Initialize(localCtx)
assert.NoError(t, err)

peers, err := service.GetPeers()
assert.NoError(t, err)
assert.Equal(t, 0, len(peers))

discClient.SetResponses(
&dyndiscmocks.MockDiscoverEndpointResponse{
PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{
{
MSPID: mspID1,
Endpoint: peer1MSP1,
LedgerHeight: 5,
},
},
},
)

time.Sleep(time.Second)

peers, err = service.GetPeers()
assert.NoError(t, err)
assert.Equal(t, 1, len(peers))

discClient.SetResponses(
&dyndiscmocks.MockDiscoverEndpointResponse{
PeerEndpoints: []*discmocks.MockDiscoveryPeerEndpoint{
{
MSPID: mspID1,
Endpoint: peer1MSP1,
LedgerHeight: 5,
},
{
MSPID: mspID1,
Endpoint: peer2MSP1,
LedgerHeight: 5,
},
{
MSPID: mspID2,
Endpoint: peer1MSP2,
LedgerHeight: 5,
},
},
},
)

time.Sleep(time.Second)

peers, err = service.GetPeers()
assert.NoError(t, err)
assert.Equal(t, 2, len(peers))

for _, p := range peers {
assert.Equalf(t, mspID1, p.MSPID(), "Expecting peer to be in MSP [%s]", mspID1)
}
}
18 changes: 15 additions & 3 deletions pkg/client/common/discovery/dynamicdiscovery/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,16 @@ func New(config fab.EndpointConfig, opts ...Opt) *Provider {
}

return &Provider{
cache: lazycache.New("Discovery_Service_Cache", func(key lazycache.Key) (interface{}, error) {
return newService(options), nil
cache: lazycache.New("Dynamic_Discovery_Service_Cache", func(key lazycache.Key) (interface{}, error) {
if key.String() == "" {
return newLocalService(options), nil
}
return newChannelService(options), nil
}),
}
}

// CreateDiscoveryService will create a new membership service
// CreateDiscoveryService returns a discovery service for the given channel
func (p *Provider) CreateDiscoveryService(channelID string) (fab.DiscoveryService, error) {
ref, err := p.cache.Get(lazycache.NewStringKey(channelID))
if err != nil {
Expand All @@ -77,6 +80,15 @@ func (p *Provider) CreateDiscoveryService(channelID string) (fab.DiscoveryServic
return ref.(fab.DiscoveryService), nil
}

// CreateLocalDiscoveryService returns a local discovery service
func (p *Provider) CreateLocalDiscoveryService() (fab.DiscoveryService, error) {
ref, err := p.cache.Get(lazycache.NewStringKey(""))
if err != nil {
return nil, errors.WithMessage(err, "failed to get local discovery service from cache")
}
return ref.(fab.DiscoveryService), nil
}

// Close will close the cache and all services contained by the cache.
func (p *Provider) Close() {
p.cache.Close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,14 @@ func TestDiscoveryProvider(t *testing.T) {

chCtx := mocks.NewMockChannelContext(ctx, ch)

err = service.(*Service).Initialize(chCtx)
err = service.(*channelService).Initialize(chCtx)
assert.NoError(t, err)

localService, err := p.CreateLocalDiscoveryService()
assert.NoError(t, err)

localCtx := mocks.NewMockLocalContext(ctx, nil)
err = localService.(*LocalService).Initialize(localCtx)
assert.NoError(t, err)
}

Expand Down
Loading

0 comments on commit 637b655

Please sign in to comment.