Skip to content

Commit

Permalink
[FAB-8839] Use connection cache in event client
Browse files Browse the repository at this point in the history
The deliver and event hub client now use the
connection cache.

Change-Id: Iab7906eb006de4a27e97ab5de386077aa71ab99e
Signed-off-by: Bob Stasyszyn <[email protected]>
  • Loading branch information
bstasyszyn committed Mar 13, 2018
1 parent cc87976 commit 8f5d6f4
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 22 deletions.
29 changes: 17 additions & 12 deletions pkg/fab/comm/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ SPDX-License-Identifier: Apache-2.0
package comm

import (
"context"
reqContext "context"
"sync/atomic"

"github.com/pkg/errors"

fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/common/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/options"
"github.com/hyperledger/fabric-sdk-go/pkg/context"
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/core"
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab"
"github.com/hyperledger/fabric-sdk-go/pkg/core/config/comm"
Expand All @@ -34,6 +35,7 @@ type GRPCConnection struct {
chConfig fab.ChannelCfg
conn *grpc.ClientConn
stream grpc.ClientStream
commManager fab.CommManager
tlsCertHash []byte
done int32
}
Expand All @@ -52,20 +54,22 @@ func NewConnection(ctx fabcontext.Client, chConfig fab.ChannelCfg, streamProvide
return nil, err
}

grpcctx := context.Background()
grpcctx, cancel := context.WithTimeout(grpcctx, params.connectTimeout)
reqCtx, cancel := reqContext.WithTimeout(context.NewRequest(ctx), params.connectTimeout)
defer cancel()

grpcconn, err := grpc.DialContext(grpcctx, endpoint.ToAddress(url), dialOpts...)
commManager, ok := context.RequestCommManager(reqCtx)
if !ok {
return nil, errors.New("unable to get comm manager")
}

grpcconn, err := commManager.DialContext(reqCtx, endpoint.ToAddress(url), dialOpts...)
if err != nil {
return nil, errors.Wrapf(err, "could not connect to %s", url)
}

stream, err := streamProvider(grpcconn)
if err != nil {
if closeErr := grpcconn.Close(); closeErr != nil {
logger.Warnf("error closing GRPC connection: %s", closeErr)
}
commManager.ReleaseConn(grpcconn)
return nil, errors.Wrapf(err, "could not create stream to %s", url)
}

Expand All @@ -76,6 +80,7 @@ func NewConnection(ctx fabcontext.Client, chConfig fab.ChannelCfg, streamProvide
return &GRPCConnection{
context: ctx,
chConfig: chConfig,
commManager: commManager,
conn: grpcconn,
stream: stream,
tlsCertHash: comm.TLSCertHash(ctx.Config()),
Expand All @@ -94,15 +99,15 @@ func (c *GRPCConnection) Close() {
return
}

logger.Debugf("Closing stream....")
logger.Debug("Closing stream....")
if err := c.stream.CloseSend(); err != nil {
logger.Warnf("error closing GRPC stream: %s", err)
}

logger.Debugf("Closing connection....")
if err := c.conn.Close(); err != nil {
logger.Warnf("error closing GRPC connection: %s", err)
}
logger.Debug("Releasing connection....")
c.commManager.ReleaseConn(c.conn)

logger.Debug("... connection successfully closed.")
}

// Closed returns true if the connection has been closed
Expand Down
10 changes: 6 additions & 4 deletions pkg/fab/comm/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"google.golang.org/grpc/keepalive"

fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/common/context"
eventmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/mocks"
fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"

Expand Down Expand Up @@ -43,6 +42,7 @@ func TestConnection(t *testing.T) {
_, err = NewConnection(context, chConfig, testStream, "invalidhost:0000",
WithFailFast(true),
WithCertificate(nil),
WithInsecure(),
WithHostOverride(""),
WithKeepAliveParams(keepalive.ClientParameters{}),
WithConnectTimeout(3*time.Second),
Expand Down Expand Up @@ -80,10 +80,12 @@ func TestConnection(t *testing.T) {
conn.Close()
}

// Use the Deliver server for testing
// Use the Event Hub server for testing
var testServer *eventmocks.MockEventhubServer
var endorserAddr []string

func newMockContext() fabcontext.Client {
return fabmocks.NewMockContext(fabmocks.NewMockUser("test"))
func newMockContext() *fabmocks.MockContext {
context := fabmocks.NewMockContext(fabmocks.NewMockUser("test"))
context.SetCustomInfraProvider(NewMockInfraProvider())
return context
}
48 changes: 48 additions & 0 deletions pkg/fab/comm/mocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package comm

import (
"context"

"github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab"
fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
"google.golang.org/grpc"
)

// MockCommManager is a non-caching comm manager used
// for unit testing
type MockCommManager struct {
}

// DialContext creates a connection
func (m *MockCommManager) DialContext(ctx context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return grpc.DialContext(ctx, target, opts...)
}

// ReleaseConn closes the connection
func (m *MockCommManager) ReleaseConn(conn *grpc.ClientConn) {
if err := conn.Close(); err != nil {
logger.Warnf("Error closing connection: %s", err)
}
}

// MockInfraProvider overrides the comm manager to return
// the MockCommManager
type MockInfraProvider struct {
fabmocks.MockInfraProvider
}

// NewMockInfraProvider return a new MockInfraProvider
func NewMockInfraProvider() *MockInfraProvider {
return &MockInfraProvider{}
}

// CommManager returns the MockCommManager
func (f *MockInfraProvider) CommManager() fab.CommManager {
return &MockCommManager{}
}
4 changes: 3 additions & 1 deletion pkg/fab/events/deliverclient/connection/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,5 +220,7 @@ func TestMain(m *testing.M) {
}

func newMockContext() *fabmocks.MockContext {
return fabmocks.NewMockContext(fabmocks.NewMockUser("user1"))
context := fabmocks.NewMockContext(fabmocks.NewMockUser("user1"))
context.SetCustomInfraProvider(comm.NewMockInfraProvider())
return context
}
8 changes: 5 additions & 3 deletions pkg/fab/events/eventhubclient/connection/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"testing"
"time"

fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/common/context"
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/core"
"github.com/hyperledger/fabric-sdk-go/pkg/fab/comm"
clientdisp "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/dispatcher"
eventmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/mocks"
fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
Expand Down Expand Up @@ -198,6 +198,8 @@ func newPeerConfig(eventURL string) *core.PeerConfig {
}
}

func newMockContext() fabcontext.Client {
return fabmocks.NewMockContext(fabmocks.NewMockUser("user1"))
func newMockContext() *fabmocks.MockContext {
context := fabmocks.NewMockContext(fabmocks.NewMockUser("user1"))
context.SetCustomInfraProvider(comm.NewMockInfraProvider())
return context
}
7 changes: 5 additions & 2 deletions pkg/fabsdk/provider/fabpvdr/fabpvdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,13 @@ func (f *InfraProvider) Initialize(providers context.Providers) error {

// Close frees resources and caches.
func (f *InfraProvider) Close() {
logger.Debug("Closing comm manager...")
f.commManager.Close()
logger.Debug("Closing event service cache...")
f.eventServiceCache.Close()

// Comm Manager must be closed last since other resources
// may still be using it.
logger.Debug("Closing comm manager...")
f.commManager.Close()
}

// CommManager provides comm support such as GRPC onnections
Expand Down

0 comments on commit 8f5d6f4

Please sign in to comment.