Skip to content

Commit

Permalink
[FAB-7512]Expose GRPC(keep-alive and failfast)
Browse files Browse the repository at this point in the history
Change-Id: Iea1e9286ad4f89a028f2d833437e4ff3c8cba248
Signed-off-by: biljana lukovic <[email protected]>
  • Loading branch information
biljanaLukovic committed Feb 14, 2018
1 parent 07808ad commit ebb750a
Show file tree
Hide file tree
Showing 15 changed files with 559 additions and 84 deletions.
15 changes: 15 additions & 0 deletions pkg/config/testdata/template/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,21 @@ orderers:
# grpcOptions:
# ssl-target-name-override: orderer.example.com
# grpc-max-send-message-length: 15
# #these are keep alive client parameters:
# Make sure these parameters are set in coordination with the keepalive policy on the server,
# as incompatible settings can result in closing of connection.
# After a duration of this time if the client doesn't see any activity
# it pings the server to see if the transport is still alive.
# The current default value is infinity
# keep-alive-time: 5s
# After having pinged for keepalive check, the client waits for a duration of Timeout
# and if no activity is seen even after that the connection is closed.
# The current default value is 20 seconds
# keep-alive-timeout: 6s
# If true, client runs keepalive checks even with no active RPCs
# keep-alive-permit: false
#fail-fast is action to take when an RPC is attempted on broken connections or unreachable servers
# fail-fast: true

# tlsCACerts:
# Certificate location absolute path
Expand Down
29 changes: 23 additions & 6 deletions pkg/fabric-client/events/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ SPDX-License-Identifier: Apache-2.0
package consumer

import (
"context"
"io"
"sync"
"time"

"golang.org/x/net/context"

"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"

"github.com/golang/protobuf/ptypes"
apiconfig "github.com/hyperledger/fabric-sdk-go/api/apiconfig"
Expand Down Expand Up @@ -49,10 +51,14 @@ type eventsClient struct {
provider fab.ProviderContext
identity fab.IdentityContext
processEventsCompleted chan struct{}
kap keepalive.ClientParameters
failFast bool
}

//NewEventsClient Returns a new grpc.ClientConn to the configured local PEER.
func NewEventsClient(provider fab.ProviderContext, identity fab.IdentityContext, peerAddress string, certificate *x509.Certificate, serverhostoverride string, regTimeout time.Duration, adapter consumer.EventAdapter) (fab.EventsClient, error) {
func NewEventsClient(provider fab.ProviderContext, identity fab.IdentityContext, peerAddress string, certificate *x509.Certificate,
serverhostoverride string, regTimeout time.Duration, adapter consumer.EventAdapter,
kap keepalive.ClientParameters, failFast bool) (fab.EventsClient, error) {
var err error
if regTimeout < 100*time.Millisecond {
regTimeout = 100 * time.Millisecond
Expand All @@ -72,13 +78,15 @@ func NewEventsClient(provider fab.ProviderContext, identity fab.IdentityContext,
provider: provider,
identity: identity,
tlsCertHash: ccomm.TLSCertHash(provider.Config()),
kap: kap,
failFast: failFast,
}, err
}

//newEventsClientConnectionWithAddress Returns a new grpc.ClientConn to the configured local PEER.
func newEventsClientConnectionWithAddress(peerAddress string, cert *x509.Certificate, serverHostOverride string, config apiconfig.Config) (*grpc.ClientConn, error) {
func newEventsClientConnectionWithAddress(peerAddress string, cert *x509.Certificate, serverHostOverride string,
config apiconfig.Config, kap keepalive.ClientParameters, failFast bool) (*grpc.ClientConn, error) {
var opts []grpc.DialOption
opts = append(opts, grpc.WithTimeout(config.TimeoutOrDefault(apiconfig.EventHub)))
if urlutil.IsTLSEnabled(peerAddress) {
tlsConfig, err := comm.TLSConfig(cert, serverHostOverride, config)
if err != nil {
Expand All @@ -89,7 +97,15 @@ func newEventsClientConnectionWithAddress(peerAddress string, cert *x509.Certifi
} else {
opts = append(opts, grpc.WithInsecure())
}
conn, err := grpc.Dial(urlutil.ToAddress(peerAddress), opts...)

if kap.Time > 0 {
opts = append(opts, grpc.WithKeepaliveParams(kap))
}
opts = append(opts, grpc.WithDefaultCallOptions(grpc.FailFast(failFast)))

ctx := context.Background()
ctx, _ = context.WithTimeout(ctx, config.TimeoutOrDefault(apiconfig.EventHub))
conn, err := grpc.DialContext(ctx, urlutil.ToAddress(peerAddress), opts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -285,7 +301,8 @@ func (ec *eventsClient) processEvents() error {

//Start establishes connection with Event hub and registers interested events with it
func (ec *eventsClient) Start() error {
conn, err := newEventsClientConnectionWithAddress(ec.peerAddress, ec.TLSCertificate, ec.TLSServerHostOverride, ec.provider.Config())
conn, err := newEventsClientConnectionWithAddress(ec.peerAddress, ec.TLSCertificate, ec.TLSServerHostOverride,
ec.provider.Config(), ec.kap, ec.failFast)
if err != nil {
return errors.WithMessage(err, "events connection failed")
}
Expand Down
40 changes: 35 additions & 5 deletions pkg/fabric-client/events/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"time"

"github.com/golang/protobuf/proto"
"github.com/spf13/cast"
"google.golang.org/grpc/keepalive"

"github.com/hyperledger/fabric-sdk-go/api/apiconfig"
fab "github.com/hyperledger/fabric-sdk-go/api/apifabclient"
Expand Down Expand Up @@ -60,18 +62,21 @@ type EventHub struct {
// FabricClient
provider fab.ProviderContext
identity fab.IdentityContext
kap keepalive.ClientParameters
failFast bool
}

// eventClientFactory creates an EventsClient instance
type eventClientFactory interface {
newEventsClient(provider fab.ProviderContext, identity fab.IdentityContext, peerAddress string, certificate *x509.Certificate, serverHostOverride string, regTimeout time.Duration, adapter cnsmr.EventAdapter) (fab.EventsClient, error)
newEventsClient(provider fab.ProviderContext, identity fab.IdentityContext, peerAddress string, certificate *x509.Certificate, serverHostOverride string, regTimeout time.Duration, adapter cnsmr.EventAdapter, kap keepalive.ClientParameters, failFast bool) (fab.EventsClient, error)
}

// consumerClientFactory is the default implementation oif the eventClientFactory
type consumerClientFactory struct{}

func (ccf *consumerClientFactory) newEventsClient(provider fab.ProviderContext, identity fab.IdentityContext, peerAddress string, certificate *x509.Certificate, serverHostOverride string, regTimeout time.Duration, adapter cnsmr.EventAdapter) (fab.EventsClient, error) {
return consumer.NewEventsClient(provider, identity, peerAddress, certificate, serverHostOverride, regTimeout, adapter)
func (ccf *consumerClientFactory) newEventsClient(provider fab.ProviderContext, identity fab.IdentityContext, peerAddress string, certificate *x509.Certificate, serverHostOverride string,
regTimeout time.Duration, adapter cnsmr.EventAdapter, kap keepalive.ClientParameters, failFast bool) (fab.EventsClient, error) {
return consumer.NewEventsClient(provider, identity, peerAddress, certificate, serverHostOverride, regTimeout, adapter, kap, failFast)
}

// Context holds the providers and services needed to create an EventHub.
Expand Down Expand Up @@ -119,10 +124,35 @@ func FromConfig(ctx Context, peerCfg *apiconfig.PeerConfig) (*EventHub, error) {
}

eventHub.peerTLSServerHostOverride = serverHostOverride

eventHub.kap = getKeepAliveOptions(peerCfg)
eventHub.failFast = getFailFast(peerCfg)
return eventHub, nil
}

func getFailFast(peerCfg *apiconfig.PeerConfig) bool {
var failFast = true //the default
if ff, ok := peerCfg.GRPCOptions["fail-fast"].(bool); ok {
failFast = cast.ToBool(ff)
}

return failFast
}

func getKeepAliveOptions(peerCfg *apiconfig.PeerConfig) keepalive.ClientParameters {

var kap keepalive.ClientParameters
if kaTime, ok := peerCfg.GRPCOptions["keep-alive-time"]; ok {
kap.Time = cast.ToDuration(kaTime)
}
if kaTimeout, ok := peerCfg.GRPCOptions["keep-alive-timeout"]; ok {
kap.Timeout = cast.ToDuration(kaTimeout)
}
if kaPermit, ok := peerCfg.GRPCOptions["keep-alive-permit"]; ok {
kap.PermitWithoutStream = cast.ToBool(kaPermit)
}
return kap
}

// SetInterests clears all interests and sets the interests for BLOCK type of events.
func (eventHub *EventHub) SetInterests(block bool) {
eventHub.mtx.Lock()
Expand Down Expand Up @@ -282,7 +312,7 @@ func (eventHub *EventHub) Connect() error {
if eventHub.grpcClient == nil {
eventsClient, _ := eventHub.eventsClientFactory.newEventsClient(eventHub.provider, eventHub.identity,
eventHub.peerAddr, eventHub.peerTLSCertificate, eventHub.peerTLSServerHostOverride,
eventHub.provider.Config().TimeoutOrDefault(apiconfig.EventReg), eventHub)
eventHub.provider.Config().TimeoutOrDefault(apiconfig.EventReg), eventHub, eventHub.kap, eventHub.failFast)
eventHub.grpcClient = eventsClient
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/fabric-client/events/eventmocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/pkg/errors"
"google.golang.org/grpc/keepalive"

fab "github.com/hyperledger/fabric-sdk-go/api/apifabclient"
fcConsumer "github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/events/consumer"
Expand Down Expand Up @@ -43,7 +44,8 @@ type mockEventClientFactory struct {
clients []*mockEventClient
}

func (mecf *mockEventClientFactory) newEventsClient(provider fab.ProviderContext, identity fab.IdentityContext, peerAddress string, certificate *x509.Certificate, serverHostOverride string, regTimeout time.Duration, adapter fcConsumer.EventAdapter) (fab.EventsClient, error) {
func (mecf *mockEventClientFactory) newEventsClient(provider fab.ProviderContext, identity fab.IdentityContext, peerAddress string, certificate *x509.Certificate, serverHostOverride string, regTimeout time.Duration,
adapter fcConsumer.EventAdapter, kap keepalive.ClientParameters, failFast bool) (fab.EventsClient, error) {
mec := &mockEventClient{
PeerAddress: peerAddress,
RegTimeout: regTimeout,
Expand Down
26 changes: 22 additions & 4 deletions pkg/fabric-client/orderer/deprecated.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,22 @@ import (
"github.com/hyperledger/fabric-sdk-go/api/apiconfig"
"github.com/hyperledger/fabric-sdk-go/pkg/config/comm"
"github.com/hyperledger/fabric-sdk-go/pkg/config/urlutil"
"github.com/spf13/cast"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
)

// NewOrderer Returns a Orderer instance
// Deprecated: use orderer.New() instead
func NewOrderer(url string, certPath string, serverHostOverride string, config apiconfig.Config) (*Orderer, error) {
func NewOrderer(url string, certPath string, serverHostOverride string, config apiconfig.Config,
kap keepalive.ClientParameters) (*Orderer, error) {
var opts []grpc.DialOption
opts = append(opts, grpc.WithTimeout(config.TimeoutOrDefault(apiconfig.OrdererConnection)))

timeout := config.TimeoutOrDefault(apiconfig.OrdererConnection)
if kap.Time > 0 || kap.Timeout > 0 {
opts = append(opts, grpc.WithKeepaliveParams(kap))
}
if urlutil.IsTLSEnabled(url) {
certConfig := apiconfig.TLSConfig{Path: certPath}
certificate, err := certConfig.TLSCert()
Expand All @@ -36,7 +43,7 @@ func NewOrderer(url string, certPath string, serverHostOverride string, config a
} else {
opts = append(opts, grpc.WithInsecure())
}
return &Orderer{url: urlutil.ToAddress(url), grpcDialOption: opts}, nil
return &Orderer{url: urlutil.ToAddress(url), grpcDialOption: opts, dialTimeout: timeout}, nil
}

// NewOrdererFromConfig returns an Orderer instance constructed from orderer config
Expand All @@ -48,5 +55,16 @@ func NewOrdererFromConfig(ordererCfg *apiconfig.OrdererConfig, config apiconfig.
serverHostOverride = str
}

return NewOrderer(ordererCfg.URL, ordererCfg.TLSCACerts.Path, serverHostOverride, config)
var kap keepalive.ClientParameters
if kaTime, ok := ordererCfg.GRPCOptions["keep-alive-time"]; ok {
kap.Time = cast.ToDuration(kaTime)
}
if kaTimeout, ok := ordererCfg.GRPCOptions["keep-alive-timeout"]; ok {
kap.Timeout = cast.ToDuration(kaTimeout)
}
if kaPermit, ok := ordererCfg.GRPCOptions["keep-alive-permit"]; ok {
kap.PermitWithoutStream = cast.ToBool(kaPermit)
}

return NewOrderer(ordererCfg.URL, ordererCfg.TLSCACerts.Path, serverHostOverride, config, kap)
}
29 changes: 16 additions & 13 deletions pkg/fabric-client/orderer/deprecated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"

fab "github.com/hyperledger/fabric-sdk-go/api/apifabclient"
ab "github.com/hyperledger/fabric-sdk-go/internal/github.com/hyperledger/fabric/protos/orderer"
Expand All @@ -23,12 +24,14 @@ import (
"github.com/pkg/errors"
)

var kap keepalive.ClientParameters

func TestDeprecatedSendDeliver(t *testing.T) {
grpcServer := grpc.NewServer()
defer grpcServer.Stop()
mockServer, addr := startMockServer(t, grpcServer)

orderer, _ := NewOrderer(addr, "", "", mocks.NewMockConfig())
orderer, _ := NewOrderer(addr, "", "", mocks.NewMockConfig(), kap)
// Test deliver happy path
blocks, errs := orderer.SendDeliver(&fab.SignedEnvelope{})
select {
Expand Down Expand Up @@ -70,7 +73,7 @@ func TestDeprecatedSendDeliver(t *testing.T) {
t.Fatalf("Did not receive block or error from SendDeliver")
}

orderer, _ = NewOrderer(testOrdererURL+"invalid-test", "", "", mocks.NewMockConfig())
orderer, _ = NewOrderer(testOrdererURL+"invalid-test", "", "", mocks.NewMockConfig(), kap)
// Test deliver happy path
blocks, errs = orderer.SendDeliver(&fab.SignedEnvelope{})
select {
Expand All @@ -86,26 +89,26 @@ func TestDeprecatedSendDeliver(t *testing.T) {

func TestDeprecatedNewOrdererWithTLS(t *testing.T) {
//Positive Test case
orderer, err := NewOrderer("grpcs://", "../../../test/fixtures/fabricca/tls/ca/ca_root.pem", "", mocks.NewMockConfigCustomized(true, false, false))
orderer, err := NewOrderer("grpcs://", "../../../test/fixtures/fabricca/tls/ca/ca_root.pem", "", mocks.NewMockConfigCustomized(true, false, false), kap)
if orderer == nil || err != nil {
t.Fatalf("Testing NewOrderer with TLS failed, cause [%s]", err)
}

//Negative Test case
orderer, err = NewOrderer("grpcs://", "", "", mocks.NewMockConfigCustomized(true, false, true))
orderer, err = NewOrderer("grpcs://", "", "", mocks.NewMockConfigCustomized(true, false, true), kap)
if orderer != nil || err == nil {
t.Fatalf("Testing NewOrderer with TLS was supposed to fail")
}
}

func TestDeprecatedNewOrdererWithMutualTLS(t *testing.T) {
//Positive Test case
orderer, err := NewOrderer("grpcs://", "../../../test/fixtures/fabricca/tls/ca/ca_root.pem", "", mocks.NewMockConfigCustomized(true, true, false))
orderer, err := NewOrderer("grpcs://", "../../../test/fixtures/fabricca/tls/ca/ca_root.pem", "", mocks.NewMockConfigCustomized(true, true, false), kap)
if orderer == nil || err != nil {
t.Fatalf("Testing NewOrderer with Mutual TLS failed, cause [%s]", err)
}
//Negative Test case
orderer, err = NewOrderer("grpcs://", "../../../test/fixtures/fabricca/tls/ca/ca_root.pem", "", mocks.NewMockConfigCustomized(true, false, false))
orderer, err = NewOrderer("grpcs://", "../../../test/fixtures/fabricca/tls/ca/ca_root.pem", "", mocks.NewMockConfigCustomized(true, false, false), kap)
if orderer == nil || err != nil {
t.Fatalf("Testing NewOrderer with Mutual TLS failed, cause [%s]", err)
}
Expand All @@ -116,14 +119,14 @@ func TestDeprecatedSendBroadcast(t *testing.T) {
defer grpcServer.Stop()
_, addr := startMockServer(t, grpcServer)

orderer, _ := NewOrderer(addr, "", "", mocks.NewMockConfig())
orderer, _ := NewOrderer(addr, "", "", mocks.NewMockConfig(), kap)
_, err := orderer.SendBroadcast(&fab.SignedEnvelope{})

if err != nil {
t.Fatalf("Test SendBroadcast was not supposed to fail")
}

orderer, _ = NewOrderer(testOrdererURL+"Test", "", "", mocks.NewMockConfig())
orderer, _ = NewOrderer(testOrdererURL+"Test", "", "", mocks.NewMockConfig(), kap)
_, err = orderer.SendBroadcast(&fab.SignedEnvelope{})

if err == nil || !strings.HasPrefix(err.Error(), "NewAtomicBroadcastClient") {
Expand All @@ -145,7 +148,7 @@ func TestDeprecatedSendDeliverServerBadResponse(t *testing.T) {
grpcServer := grpc.NewServer()
defer grpcServer.Stop()
addr := startCustomizedMockServer(t, testOrdererURL, grpcServer, &broadcastServer)
orderer, _ := NewOrderer(addr, "", "", mocks.NewMockConfig())
orderer, _ := NewOrderer(addr, "", "", mocks.NewMockConfig(), kap)

blocks, errors := orderer.SendDeliver(&fab.SignedEnvelope{})

Expand Down Expand Up @@ -175,7 +178,7 @@ func TestDeprecatedSendDeliverServerSuccessResponse(t *testing.T) {
defer grpcServer.Stop()
addr := startCustomizedMockServer(t, testOrdererURL, grpcServer, &broadcastServer)

orderer, _ := NewOrderer(addr, "", "", mocks.NewMockConfig())
orderer, _ := NewOrderer(addr, "", "", mocks.NewMockConfig(), kap)

blocks, errors := orderer.SendDeliver(&fab.SignedEnvelope{})

Expand All @@ -200,7 +203,7 @@ func TestDeprecatedSendDeliverFailure(t *testing.T) {
grpcServer := grpc.NewServer()
defer grpcServer.Stop()
addr := startCustomizedMockServer(t, testOrdererURL, grpcServer, &broadcastServer)
orderer, _ := NewOrderer(addr, "", "", mocks.NewMockConfig())
orderer, _ := NewOrderer(addr, "", "", mocks.NewMockConfig(), kap)

blocks, errors := orderer.SendDeliver(&fab.SignedEnvelope{})

Expand All @@ -225,7 +228,7 @@ func TestDeprecatedSendBroadcastServerBadResponse(t *testing.T) {
grpcServer := grpc.NewServer()
defer grpcServer.Stop()
addr := startCustomizedMockServer(t, testOrdererURL, grpcServer, &broadcastServer)
orderer, _ := NewOrderer(addr, "", "", mocks.NewMockConfig())
orderer, _ := NewOrderer(addr, "", "", mocks.NewMockConfig(), kap)

_, err := orderer.SendBroadcast(&fab.SignedEnvelope{})

Expand All @@ -247,7 +250,7 @@ func TestDeprecatedSendBroadcastError(t *testing.T) {
grpcServer := grpc.NewServer()
defer grpcServer.Stop()
addr := startCustomizedMockServer(t, testOrdererURL, grpcServer, &broadcastServer)
orderer, _ := NewOrderer(addr, "", "", mocks.NewMockConfig())
orderer, _ := NewOrderer(addr, "", "", mocks.NewMockConfig(), kap)

status, err := orderer.SendBroadcast(&fab.SignedEnvelope{})

Expand Down
Loading

0 comments on commit ebb750a

Please sign in to comment.