Skip to content

Commit

Permalink
[FAB-8397] Event Hub Client Implementation
Browse files Browse the repository at this point in the history
Provide an Event Hub implementation for the
event client API.

Change-Id: I578598908bf4e0e9fb1ea154beeab86d13f643a3
Signed-off-by: Bob Stasyszyn <[email protected]>
  • Loading branch information
bstasyszyn committed Feb 25, 2018
1 parent 983e95a commit e9ea8df
Show file tree
Hide file tree
Showing 18 changed files with 2,359 additions and 6 deletions.
153 changes: 153 additions & 0 deletions pkg/fab/comm/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package comm

import (
"context"
"sync/atomic"

"github.com/pkg/errors"

fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/context"
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/core"
"github.com/hyperledger/fabric-sdk-go/pkg/core/config/comm"
"github.com/hyperledger/fabric-sdk-go/pkg/core/config/urlutil"
"github.com/hyperledger/fabric-sdk-go/pkg/logging"
"github.com/hyperledger/fabric-sdk-go/pkg/options"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

var logger = logging.NewLogger("fabric_sdk_go")

// StreamProvider creates a GRPC stream
type StreamProvider func(conn *grpc.ClientConn) (grpc.ClientStream, error)

// GRPCConnection manages the GRPC connection and client stream
type GRPCConnection struct {
channelID string
conn *grpc.ClientConn
stream grpc.ClientStream
context fabcontext.Context
tlsCertHash []byte
done int32
}

// NewConnection creates a new connection
func NewConnection(ctx fabcontext.Context, channelID string, streamProvider StreamProvider, url string, opts ...options.Opt) (*GRPCConnection, error) {
if url == "" {
return nil, errors.New("server URL not specified")
}

params := defaultParams()
options.Apply(params, opts)

dialOpts, err := newDialOpts(ctx.Config(), url, params)
if err != nil {
return nil, err
}

grpcctx := context.Background()
grpcctx, cancel := context.WithTimeout(grpcctx, params.connectTimeout)
defer cancel()

grpcconn, err := grpc.DialContext(grpcctx, urlutil.ToAddress(url), dialOpts...)
if err != nil {
return nil, errors.Wrapf(err, "could not connect to %s", url)
}

stream, err := streamProvider(grpcconn)
if err != nil {
if err := grpcconn.Close(); err != nil {
logger.Warnf("error closing GRPC connection: %s", err)
}
return nil, errors.Wrapf(err, "could not create stream to %s", url)
}

if stream == nil {
return nil, errors.New("unexpected nil stream received from provider")
}

return &GRPCConnection{
channelID: channelID,
conn: grpcconn,
stream: stream,
context: ctx,
tlsCertHash: comm.TLSCertHash(ctx.Config()),
}, nil
}

// ChannelID returns the ID of the channel
func (c *GRPCConnection) ChannelID() string {
return c.channelID
}

// Close closes the connection
func (c *GRPCConnection) Close() {
if !c.setClosed() {
logger.Debugf("Already closed")
return
}

logger.Debugf("Closing stream....")
if err := c.stream.CloseSend(); err != nil {
logger.Warnf("error closing GRPC stream: %s", err)
}

logger.Debugf("Closing connection....")
if err := c.conn.Close(); err != nil {
logger.Warnf("error closing GRPC connection: %s", err)
}
}

// Closed returns true if the connection has been closed
func (c *GRPCConnection) Closed() bool {
return atomic.LoadInt32(&c.done) == 1
}

func (c *GRPCConnection) setClosed() bool {
return atomic.CompareAndSwapInt32(&c.done, 0, 1)
}

// Stream returns the GRPC stream
func (c *GRPCConnection) Stream() grpc.Stream {
return c.stream
}

// TLSCertHash returns the hash of the TLS cert
func (c *GRPCConnection) TLSCertHash() []byte {
return c.tlsCertHash
}

// Context returns the context of the client establishing the connection
func (c *GRPCConnection) Context() fabcontext.Context {
return c.context
}

func newDialOpts(config core.Config, url string, params *params) ([]grpc.DialOption, error) {
var dialOpts []grpc.DialOption

if params.keepAliveParams.Time > 0 || params.keepAliveParams.Timeout > 0 {
dialOpts = append(dialOpts, grpc.WithKeepaliveParams(params.keepAliveParams))
}

dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.FailFast(params.failFast)))

if urlutil.IsTLSEnabled(url) {
tlsConfig, err := comm.TLSConfig(params.certificate, params.hostOverride, config)
if err != nil {
return nil, err
}
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
logger.Debugf("Creating a secure connection to [%s] with TLS HostOverride [%s]", url, params.hostOverride)
} else {
logger.Debugf("Creating an insecure connection [%s]", url)
dialOpts = append(dialOpts, grpc.WithInsecure())
}

return dialOpts, nil
}
125 changes: 125 additions & 0 deletions pkg/fab/comm/connection_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package comm

import (
"context"
"fmt"
"net"
"os"
"testing"
"time"

"google.golang.org/grpc/keepalive"

fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/context"
"github.com/hyperledger/fabric-sdk-go/pkg/context/api/core"
eventmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/mocks"
fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks"

pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer"
"github.com/pkg/errors"
"google.golang.org/grpc"
)

const (
peerAddress = "localhost:9999"
peerURL = "grpc://" + peerAddress
)

var testStream = func(grpcconn *grpc.ClientConn) (grpc.ClientStream, error) {
return pb.NewDeliverClient(grpcconn).Deliver(context.Background())
}

var invalidStream = func(grpcconn *grpc.ClientConn) (grpc.ClientStream, error) {
return nil, errors.New("simulated error creating stream")
}

func TestConnection(t *testing.T) {
channelID := "testchannel"

context := newMockContext()

conn, err := NewConnection(context, channelID, testStream, "")
if err == nil {
t.Fatalf("expected error creating new connection with empty URL")
}
conn, err = NewConnection(context, channelID, testStream, "invalidhost:0000",
WithFailFast(true),
WithCertificate(nil),
WithHostOverride(""),
WithKeepAliveParams(keepalive.ClientParameters{}),
WithConnectTimeout(3*time.Second),
)
if err == nil {
t.Fatalf("expected error creating new connection with invalid URL")
}
conn, err = NewConnection(context, channelID, invalidStream, peerURL)
if err == nil {
t.Fatalf("expected error creating new connection with invalid stream but got none")
}

conn, err = NewConnection(context, channelID, testStream, peerURL)
if err != nil {
t.Fatalf("error creating new connection: %s", err)
}
if conn.Closed() {
t.Fatalf("expected connection to be open")
}
if conn.ChannelID() != channelID {
t.Fatalf("expected channel ID [%s] but got [%s]", channelID, conn.ChannelID())
}
if conn.Stream() == nil {
t.Fatalf("got invalid stream")
}
if _, err := context.Identity(); err != nil {
t.Fatalf("error getting identity")
}

time.Sleep(1 * time.Second)

conn.Close()
if !conn.Closed() {
t.Fatalf("expected connection to be closed")
}

// Calling close again should be ignored
conn.Close()
}

// Use the Deliver server for testing
var testServer *eventmocks.MockEventhubServer

func TestMain(m *testing.M) {
var opts []grpc.ServerOption
grpcServer := grpc.NewServer(opts...)

lis, err := net.Listen("tcp", peerAddress)
if err != nil {
panic(fmt.Sprintf("Error starting events listener %s", err))
}

testServer = eventmocks.NewMockEventhubServer()

pb.RegisterEventsServer(grpcServer, testServer)

go grpcServer.Serve(lis)

time.Sleep(2 * time.Second)
os.Exit(m.Run())
}

func newPeerConfig(peerURL string) *core.PeerConfig {
return &core.PeerConfig{
URL: peerURL,
GRPCOptions: make(map[string]interface{}),
}
}

func newMockContext() fabcontext.Context {
return fabmocks.NewMockContext(fabmocks.NewMockUser("test"))
}
Loading

0 comments on commit e9ea8df

Please sign in to comment.