Skip to content

Commit

Permalink
[FAB-9412] Remove stream from connection
Browse files Browse the repository at this point in the history
The comm/connection should be split into
two: connection and streamconnection since
not all connections have a stream.

Change-Id: I4bb106f2097842ab8606dd0890bb2c39ac4575a8
Signed-off-by: Bob Stasyszyn <[email protected]>
  • Loading branch information
bstasyszyn committed Apr 9, 2018
1 parent 3e7d1d1 commit 9f82f8e
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 87 deletions.
49 changes: 6 additions & 43 deletions pkg/fab/comm/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/hyperledger/fabric-sdk-go/pkg/core/config/endpoint"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/peer"
)

var logger = logging.NewLogger("fabsdk/fab")
Expand All @@ -33,22 +32,18 @@ const (
maxCallSendMsgSize = 100 * 1024 * 1024
)

// StreamProvider creates a GRPC stream
type StreamProvider func(conn *grpc.ClientConn) (grpc.ClientStream, error)

// GRPCConnection manages the GRPC connection and client stream
type GRPCConnection struct {
context fabcontext.Client
chConfig fab.ChannelCfg
conn *grpc.ClientConn
stream grpc.ClientStream
commManager fab.CommManager
tlsCertHash []byte
done int32
}

// NewConnection creates a new connection
func NewConnection(ctx fabcontext.Client, chConfig fab.ChannelCfg, streamProvider StreamProvider, url string, opts ...options.Opt) (*GRPCConnection, error) {
func NewConnection(ctx fabcontext.Client, chConfig fab.ChannelCfg, url string, opts ...options.Opt) (*GRPCConnection, error) {
if url == "" {
return nil, errors.New("server URL not specified")
}
Expand All @@ -74,38 +69,11 @@ func NewConnection(ctx fabcontext.Client, chConfig fab.ChannelCfg, streamProvide
return nil, errors.Wrapf(err, "could not connect to %s", url)
}

stream, err := streamProvider(grpcconn)
if err != nil {
commManager.ReleaseConn(grpcconn)
return nil, errors.Wrapf(err, "could not create stream to %s", url)
}

if stream == nil {
return nil, errors.New("unexpected nil stream received from provider")
}
peer, ok := peer.FromContext(stream.Context())
if !ok || peer == nil {
//return error - certificate is not available
return nil, errors.Wrapf(err, "No peer cert in GRPC stream")

}
if peer.AuthInfo != nil {
tlsInfo := peer.AuthInfo.(credentials.TLSInfo)
for _, peercert := range tlsInfo.State.PeerCertificates {
err := verifier.ValidateCertificateDates(peercert)
if err != nil {
//log and return error
logger.Error(err)
return nil, errors.Wrapf(err, "Got error while validating certificate dates for [%v]", peercert.Subject)
}
}
}
return &GRPCConnection{
context: ctx,
chConfig: chConfig,
commManager: commManager,
conn: grpcconn,
stream: stream,
tlsCertHash: comm.TLSCertHash(ctx.EndpointConfig()),
}, nil
}
Expand All @@ -115,18 +83,18 @@ func (c *GRPCConnection) ChannelConfig() fab.ChannelCfg {
return c.chConfig
}

// ClientConn returns the underlying GRPC connection
func (c *GRPCConnection) ClientConn() *grpc.ClientConn {
return c.conn
}

// Close closes the connection
func (c *GRPCConnection) Close() {
if !c.setClosed() {
logger.Debugf("Already closed")
return
}

logger.Debug("Closing stream....")
if err := c.stream.CloseSend(); err != nil {
logger.Warnf("error closing GRPC stream: %s", err)
}

logger.Debug("Releasing connection....")
c.commManager.ReleaseConn(c.conn)

Expand All @@ -142,11 +110,6 @@ func (c *GRPCConnection) setClosed() bool {
return atomic.CompareAndSwapInt32(&c.done, 0, 1)
}

// Stream returns the GRPC stream
func (c *GRPCConnection) Stream() grpc.Stream {
return c.stream
}

// TLSCertHash returns the hash of the TLS cert
func (c *GRPCConnection) TLSCertHash() []byte {
return c.tlsCertHash
Expand Down
38 changes: 2 additions & 36 deletions pkg/fab/comm/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,65 +7,31 @@ SPDX-License-Identifier: Apache-2.0
package comm

import (
"context"
"testing"
"time"

"google.golang.org/grpc/keepalive"

eventmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/mocks"
fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"
mspmocks "github.com/hyperledger/fabric-sdk-go/pkg/msp/test/mockmsp"

pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer"
"github.com/pkg/errors"
"google.golang.org/grpc"
)

var testStream = func(grpcconn *grpc.ClientConn) (grpc.ClientStream, error) {
return pb.NewDeliverClient(grpcconn).Deliver(context.Background())
}

var invalidStream = func(grpcconn *grpc.ClientConn) (grpc.ClientStream, error) {
return nil, errors.New("simulated error creating stream")
}

func TestConnection(t *testing.T) {
channelID := "testchannel"

context := newMockContext()
chConfig := fabmocks.NewMockChannelCfg(channelID)

_, err := NewConnection(context, chConfig, testStream, "")
_, err := NewConnection(context, chConfig, "")
if err == nil {
t.Fatalf("expected error creating new connection with empty URL")
}
_, err = NewConnection(context, chConfig, testStream, "invalidhost:0000",
WithFailFast(true),
WithCertificate(nil),
WithInsecure(),
WithHostOverride(""),
WithKeepAliveParams(keepalive.ClientParameters{}),
WithConnectTimeout(3*time.Second),
)
if err == nil {
t.Fatalf("expected error creating new connection with invalid URL")
}
_, err = NewConnection(context, chConfig, invalidStream, peerURL)
if err == nil {
t.Fatalf("expected error creating new connection with invalid stream but got none")
}

conn, err := NewConnection(context, chConfig, testStream, peerURL)
conn, err := NewConnection(context, chConfig, peerURL)
if err != nil {
t.Fatalf("error creating new connection: %s", err)
}
if conn.Closed() {
t.Fatalf("expected connection to be open")
}
if conn.Stream() == nil {
t.Fatalf("got invalid stream")
}
if _, err := context.Serialize(); err != nil {
t.Fatalf("error getting identity")
}
Expand Down
94 changes: 94 additions & 0 deletions pkg/fab/comm/streamconnection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package comm

import (
"sync"

"github.com/pkg/errors"

"github.com/hyperledger/fabric-sdk-go/pkg/client/common/verifier"
"github.com/hyperledger/fabric-sdk-go/pkg/common/options"
fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/peer"
)

// StreamProvider creates a GRPC stream
type StreamProvider func(conn *grpc.ClientConn) (grpc.ClientStream, error)

// StreamConnection manages the GRPC connection and client stream
type StreamConnection struct {
*GRPCConnection
stream grpc.ClientStream
lock sync.Mutex
}

// NewStreamConnection creates a new connection with stream
func NewStreamConnection(ctx fabcontext.Client, chConfig fab.ChannelCfg, streamProvider StreamProvider, url string, opts ...options.Opt) (*StreamConnection, error) {
conn, err := NewConnection(ctx, chConfig, url, opts...)
if err != nil {
return nil, err
}

stream, err := streamProvider(conn.conn)
if err != nil {
conn.commManager.ReleaseConn(conn.conn)
return nil, errors.Wrapf(err, "could not create stream to %s", url)
}

if stream == nil {
return nil, errors.New("unexpected nil stream received from provider")
}

peer, ok := peer.FromContext(stream.Context())
if !ok || peer == nil {
//return error - certificate is not available
return nil, errors.Wrapf(err, "No peer cert in GRPC stream")

}

if peer.AuthInfo != nil {
tlsInfo := peer.AuthInfo.(credentials.TLSInfo)
for _, peercert := range tlsInfo.State.PeerCertificates {
err := verifier.ValidateCertificateDates(peercert)
if err != nil {
logger.Error(err)
return nil, errors.Wrapf(err, "error validating certificate dates for [%v]", peercert.Subject)
}
}
}

return &StreamConnection{
GRPCConnection: conn,
stream: stream,
}, nil
}

// Close closes the connection
func (c *StreamConnection) Close() {
c.lock.Lock()
defer c.lock.Unlock()

if c.Closed() {
return
}

logger.Debug("Closing stream....")
if err := c.stream.CloseSend(); err != nil {
logger.Warnf("error closing GRPC stream: %s", err)
}

c.GRPCConnection.Close()
}

// Stream returns the GRPC stream
func (c *StreamConnection) Stream() grpc.Stream {
return c.stream
}
80 changes: 80 additions & 0 deletions pkg/fab/comm/streamconnection_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package comm

import (
"context"
"testing"
"time"

"google.golang.org/grpc/keepalive"

fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"

pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer"
"github.com/pkg/errors"
"google.golang.org/grpc"
)

var testStream = func(grpcconn *grpc.ClientConn) (grpc.ClientStream, error) {
return pb.NewDeliverClient(grpcconn).Deliver(context.Background())
}

var invalidStream = func(grpcconn *grpc.ClientConn) (grpc.ClientStream, error) {
return nil, errors.New("simulated error creating stream")
}

func TestStreamConnection(t *testing.T) {
channelID := "testchannel"

context := newMockContext()
chConfig := fabmocks.NewMockChannelCfg(channelID)

_, err := NewStreamConnection(context, chConfig, testStream, "")
if err == nil {
t.Fatalf("expected error creating new connection with empty URL")
}
_, err = NewStreamConnection(context, chConfig, testStream, "invalidhost:0000",
WithFailFast(true),
WithCertificate(nil),
WithInsecure(),
WithHostOverride(""),
WithKeepAliveParams(keepalive.ClientParameters{}),
WithConnectTimeout(3*time.Second),
)
if err == nil {
t.Fatalf("expected error creating new connection with invalid URL")
}
_, err = NewStreamConnection(context, chConfig, invalidStream, peerURL)
if err == nil {
t.Fatalf("expected error creating new connection with invalid stream but got none")
}

conn, err := NewStreamConnection(context, chConfig, testStream, peerURL)
if err != nil {
t.Fatalf("error creating new connection: %s", err)
}
if conn.Closed() {
t.Fatalf("expected connection to be open")
}
if conn.Stream() == nil {
t.Fatalf("got invalid stream")
}
if _, err := context.Serialize(); err != nil {
t.Fatalf("error getting identity")
}

time.Sleep(1 * time.Second)

conn.Close()
if !conn.Closed() {
t.Fatalf("expected connection to be closed")
}

// Calling close again should be ignored
conn.Close()
}
8 changes: 4 additions & 4 deletions pkg/fab/events/deliverclient/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type deliverStream interface {

// DeliverConnection manages the connection to the deliver server
type DeliverConnection struct {
comm.GRPCConnection
*comm.StreamConnection
url string
}

Expand All @@ -61,7 +61,7 @@ var (
// New returns a new Deliver Server connection
func New(ctx fabcontext.Client, chConfig fab.ChannelCfg, streamProvider StreamProvider, url string, opts ...options.Opt) (*DeliverConnection, error) {
logger.Debugf("Connecting to %s...", url)
connect, err := comm.NewConnection(
connect, err := comm.NewStreamConnection(
ctx, chConfig,
func(grpcconn *grpc.ClientConn) (grpc.ClientStream, error) {
return streamProvider(pb.NewDeliverClient(grpcconn))
Expand All @@ -73,8 +73,8 @@ func New(ctx fabcontext.Client, chConfig fab.ChannelCfg, streamProvider StreamPr
}

return &DeliverConnection{
GRPCConnection: *connect,
url: url,
StreamConnection: connect,
url: url,
}, nil
}

Expand Down
Loading

0 comments on commit 9f82f8e

Please sign in to comment.