diff --git a/pkg/fab/comm/connection.go b/pkg/fab/comm/connection.go new file mode 100755 index 0000000000..5ecb7b3cb9 --- /dev/null +++ b/pkg/fab/comm/connection.go @@ -0,0 +1,153 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package comm + +import ( + "context" + "sync/atomic" + + "github.com/pkg/errors" + + fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/context" + "github.com/hyperledger/fabric-sdk-go/pkg/context/api/core" + "github.com/hyperledger/fabric-sdk-go/pkg/core/config/comm" + "github.com/hyperledger/fabric-sdk-go/pkg/core/config/urlutil" + "github.com/hyperledger/fabric-sdk-go/pkg/logging" + "github.com/hyperledger/fabric-sdk-go/pkg/options" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +var logger = logging.NewLogger("fabric_sdk_go") + +// StreamProvider creates a GRPC stream +type StreamProvider func(conn *grpc.ClientConn) (grpc.ClientStream, error) + +// GRPCConnection manages the GRPC connection and client stream +type GRPCConnection struct { + channelID string + conn *grpc.ClientConn + stream grpc.ClientStream + context fabcontext.Context + tlsCertHash []byte + done int32 +} + +// NewConnection creates a new connection +func NewConnection(ctx fabcontext.Context, channelID string, streamProvider StreamProvider, url string, opts ...options.Opt) (*GRPCConnection, error) { + if url == "" { + return nil, errors.New("server URL not specified") + } + + params := defaultParams() + options.Apply(params, opts) + + dialOpts, err := newDialOpts(ctx.Config(), url, params) + if err != nil { + return nil, err + } + + grpcctx := context.Background() + grpcctx, cancel := context.WithTimeout(grpcctx, params.connectTimeout) + defer cancel() + + grpcconn, err := grpc.DialContext(grpcctx, urlutil.ToAddress(url), dialOpts...) + if err != nil { + return nil, errors.Wrapf(err, "could not connect to %s", url) + } + + stream, err := streamProvider(grpcconn) + if err != nil { + if err := grpcconn.Close(); err != nil { + logger.Warnf("error closing GRPC connection: %s", err) + } + return nil, errors.Wrapf(err, "could not create stream to %s", url) + } + + if stream == nil { + return nil, errors.New("unexpected nil stream received from provider") + } + + return &GRPCConnection{ + channelID: channelID, + conn: grpcconn, + stream: stream, + context: ctx, + tlsCertHash: comm.TLSCertHash(ctx.Config()), + }, nil +} + +// ChannelID returns the ID of the channel +func (c *GRPCConnection) ChannelID() string { + return c.channelID +} + +// Close closes the connection +func (c *GRPCConnection) Close() { + if !c.setClosed() { + logger.Debugf("Already closed") + return + } + + logger.Debugf("Closing stream....") + if err := c.stream.CloseSend(); err != nil { + logger.Warnf("error closing GRPC stream: %s", err) + } + + logger.Debugf("Closing connection....") + if err := c.conn.Close(); err != nil { + logger.Warnf("error closing GRPC connection: %s", err) + } +} + +// Closed returns true if the connection has been closed +func (c *GRPCConnection) Closed() bool { + return atomic.LoadInt32(&c.done) == 1 +} + +func (c *GRPCConnection) setClosed() bool { + return atomic.CompareAndSwapInt32(&c.done, 0, 1) +} + +// Stream returns the GRPC stream +func (c *GRPCConnection) Stream() grpc.Stream { + return c.stream +} + +// TLSCertHash returns the hash of the TLS cert +func (c *GRPCConnection) TLSCertHash() []byte { + return c.tlsCertHash +} + +// Context returns the context of the client establishing the connection +func (c *GRPCConnection) Context() fabcontext.Context { + return c.context +} + +func newDialOpts(config core.Config, url string, params *params) ([]grpc.DialOption, error) { + var dialOpts []grpc.DialOption + + if params.keepAliveParams.Time > 0 || params.keepAliveParams.Timeout > 0 { + dialOpts = append(dialOpts, grpc.WithKeepaliveParams(params.keepAliveParams)) + } + + dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.FailFast(params.failFast))) + + if urlutil.IsTLSEnabled(url) { + tlsConfig, err := comm.TLSConfig(params.certificate, params.hostOverride, config) + if err != nil { + return nil, err + } + dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) + logger.Debugf("Creating a secure connection to [%s] with TLS HostOverride [%s]", url, params.hostOverride) + } else { + logger.Debugf("Creating an insecure connection [%s]", url) + dialOpts = append(dialOpts, grpc.WithInsecure()) + } + + return dialOpts, nil +} diff --git a/pkg/fab/comm/connection_test.go b/pkg/fab/comm/connection_test.go new file mode 100755 index 0000000000..330b77c72f --- /dev/null +++ b/pkg/fab/comm/connection_test.go @@ -0,0 +1,125 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package comm + +import ( + "context" + "fmt" + "net" + "os" + "testing" + "time" + + "google.golang.org/grpc/keepalive" + + fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/context" + "github.com/hyperledger/fabric-sdk-go/pkg/context/api/core" + eventmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/mocks" + fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks" + + pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer" + "github.com/pkg/errors" + "google.golang.org/grpc" +) + +const ( + peerAddress = "localhost:9999" + peerURL = "grpc://" + peerAddress +) + +var testStream = func(grpcconn *grpc.ClientConn) (grpc.ClientStream, error) { + return pb.NewDeliverClient(grpcconn).Deliver(context.Background()) +} + +var invalidStream = func(grpcconn *grpc.ClientConn) (grpc.ClientStream, error) { + return nil, errors.New("simulated error creating stream") +} + +func TestConnection(t *testing.T) { + channelID := "testchannel" + + context := newMockContext() + + conn, err := NewConnection(context, channelID, testStream, "") + if err == nil { + t.Fatalf("expected error creating new connection with empty URL") + } + conn, err = NewConnection(context, channelID, testStream, "invalidhost:0000", + WithFailFast(true), + WithCertificate(nil), + WithHostOverride(""), + WithKeepAliveParams(keepalive.ClientParameters{}), + WithConnectTimeout(3*time.Second), + ) + if err == nil { + t.Fatalf("expected error creating new connection with invalid URL") + } + conn, err = NewConnection(context, channelID, invalidStream, peerURL) + if err == nil { + t.Fatalf("expected error creating new connection with invalid stream but got none") + } + + conn, err = NewConnection(context, channelID, testStream, peerURL) + if err != nil { + t.Fatalf("error creating new connection: %s", err) + } + if conn.Closed() { + t.Fatalf("expected connection to be open") + } + if conn.ChannelID() != channelID { + t.Fatalf("expected channel ID [%s] but got [%s]", channelID, conn.ChannelID()) + } + if conn.Stream() == nil { + t.Fatalf("got invalid stream") + } + if _, err := context.Identity(); err != nil { + t.Fatalf("error getting identity") + } + + time.Sleep(1 * time.Second) + + conn.Close() + if !conn.Closed() { + t.Fatalf("expected connection to be closed") + } + + // Calling close again should be ignored + conn.Close() +} + +// Use the Deliver server for testing +var testServer *eventmocks.MockEventhubServer + +func TestMain(m *testing.M) { + var opts []grpc.ServerOption + grpcServer := grpc.NewServer(opts...) + + lis, err := net.Listen("tcp", peerAddress) + if err != nil { + panic(fmt.Sprintf("Error starting events listener %s", err)) + } + + testServer = eventmocks.NewMockEventhubServer() + + pb.RegisterEventsServer(grpcServer, testServer) + + go grpcServer.Serve(lis) + + time.Sleep(2 * time.Second) + os.Exit(m.Run()) +} + +func newPeerConfig(peerURL string) *core.PeerConfig { + return &core.PeerConfig{ + URL: peerURL, + GRPCOptions: make(map[string]interface{}), + } +} + +func newMockContext() fabcontext.Context { + return fabmocks.NewMockContext(fabmocks.NewMockUser("test")) +} diff --git a/pkg/fab/comm/connectionopts.go b/pkg/fab/comm/connectionopts.go new file mode 100644 index 0000000000..6fff911ca0 --- /dev/null +++ b/pkg/fab/comm/connectionopts.go @@ -0,0 +1,120 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package comm + +import ( + "crypto/x509" + "time" + + "github.com/hyperledger/fabric-sdk-go/pkg/options" + "google.golang.org/grpc/keepalive" +) + +type params struct { + hostOverride string + certificate *x509.Certificate + keepAliveParams keepalive.ClientParameters + failFast bool + connectTimeout time.Duration +} + +func defaultParams() *params { + return ¶ms{ + failFast: true, + connectTimeout: 3 * time.Second, + } +} + +// WithHostOverride sets the host name that will be used to resolve the TLS certificate +func WithHostOverride(value string) options.Opt { + return func(p options.Params) { + if setter, ok := p.(hostOverrideSetter); ok { + setter.SetHostOverride(value) + } + } +} + +// WithCertificate sets the X509 certificate used for the TLS connection +func WithCertificate(value *x509.Certificate) options.Opt { + return func(p options.Params) { + if setter, ok := p.(certificateSetter); ok { + setter.SetCertificate(value) + } + } +} + +// WithKeepAliveParams sets the GRPC keep-alive parameters +func WithKeepAliveParams(value keepalive.ClientParameters) options.Opt { + return func(p options.Params) { + if setter, ok := p.(keepAliveParamsSetter); ok { + setter.SetKeepAliveParams(value) + } + } +} + +// WithFailFast sets the GRPC fail-fast parameter +func WithFailFast(value bool) options.Opt { + return func(p options.Params) { + if setter, ok := p.(failFastSetter); ok { + setter.SetFailFast(value) + } + } +} + +// WithConnectTimeout sets the GRPC connection timeout +func WithConnectTimeout(value time.Duration) options.Opt { + return func(p options.Params) { + if setter, ok := p.(connectTimeoutSetter); ok { + setter.SetConnectTimeout(value) + } + } +} + +func (p *params) SetHostOverride(value string) { + logger.Debugf("HostOverride: %s", value) + p.hostOverride = value +} + +func (p *params) SetCertificate(value *x509.Certificate) { + logger.Debugf("Certificate: %s", value) + p.certificate = value +} + +func (p *params) SetKeepAliveParams(value keepalive.ClientParameters) { + logger.Debugf("KeepAliveParams: %#v", value) + p.keepAliveParams = value +} + +func (p *params) SetFailFast(value bool) { + logger.Debugf("FailFast: %t", value) + p.failFast = value +} + +func (p *params) SetConnectTimeout(value time.Duration) { + logger.Debugf("ConnectTimeout: %s", value) + p.connectTimeout = value +} + +type hostOverrideSetter interface { + SetHostOverride(value string) +} + +type certificateSetter interface { + SetCertificate(value *x509.Certificate) +} + +type keepAliveParamsSetter interface { + SetKeepAliveParams(value keepalive.ClientParameters) +} + +type failFastSetter interface { + SetFailFast(value bool) +} + +type connectTimeoutSetter interface { + SetConnectTimeout(value time.Duration) +} diff --git a/pkg/fab/events/api/endpoint.go b/pkg/fab/events/api/endpoint.go new file mode 100644 index 0000000000..98235927d9 --- /dev/null +++ b/pkg/fab/events/api/endpoint.go @@ -0,0 +1,20 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package api + +import ( + "github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab" +) + +// EventEndpoint extends a Peer endpoint and provides the +// event URL, which may or may not be the same as the Peer URL +type EventEndpoint interface { + fab.Peer + + // EventURL returns the event URL + EventURL() string +} diff --git a/pkg/fab/events/client/client.go b/pkg/fab/events/client/client.go index ebf2b49396..b6fde622a7 100755 --- a/pkg/fab/events/client/client.go +++ b/pkg/fab/events/client/client.go @@ -314,7 +314,7 @@ func (c *Client) monitorConnection() { logger.Warnf("Reconnect already in progress. Setting state to disconnected") } } else { - logger.Warnf("Event client has disconnected. Terminating: %s", event.Err) + logger.Debugf("Event client has disconnected. Terminating: %s", event.Err) go c.Close() break } diff --git a/pkg/fab/events/endpoint/endpoint.go b/pkg/fab/events/endpoint/endpoint.go new file mode 100644 index 0000000000..ed7cd39fc9 --- /dev/null +++ b/pkg/fab/events/endpoint/endpoint.go @@ -0,0 +1,92 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package endpoint + +import ( + "crypto/x509" + "time" + + "github.com/hyperledger/fabric-sdk-go/pkg/context/api/core" + "github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab" + "github.com/hyperledger/fabric-sdk-go/pkg/errors/status" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/peer" + "github.com/spf13/cast" + "google.golang.org/grpc/keepalive" +) + +// EventEndpoint extends a Peer endpoint and provides the +// event URL, which, in the case of Eventhub, is different +// from the peer endpoint +type EventEndpoint struct { + fab.Peer + EvtURL string + HostOverride string + Certificate *x509.Certificate + KeepAliveParams keepalive.ClientParameters + FailFast bool + ConnectTimeout time.Duration +} + +// EventURL returns the event URL +func (e *EventEndpoint) EventURL() string { + return e.EvtURL +} + +// FromPeerConfig creates a new EventEndpoint from the given config +func FromPeerConfig(config core.Config, peerCfg core.NetworkPeer) (*EventEndpoint, error) { + p, err := peer.New(config, peer.FromPeerConfig(&peerCfg)) + if err != nil { + return nil, err + } + + certificate, err := peerCfg.TLSCACerts.TLSCert() + if err != nil { + //Ignore empty cert errors, + errStatus, ok := err.(*status.Status) + if !ok || errStatus.Code != status.EmptyCert.ToInt32() { + return nil, err + } + } + + return &EventEndpoint{ + Peer: p, + EvtURL: peerCfg.EventURL, + HostOverride: getServerNameOverride(peerCfg), + Certificate: certificate, + KeepAliveParams: getKeepAliveOptions(peerCfg), + FailFast: getFailFast(peerCfg), + ConnectTimeout: config.TimeoutOrDefault(core.EventHubConnection), + }, nil +} + +func getServerNameOverride(peerCfg core.NetworkPeer) string { + if str, ok := peerCfg.GRPCOptions["ssl-target-name-override"].(string); ok { + return str + } + return "" +} + +func getFailFast(peerCfg core.NetworkPeer) bool { + if ff, ok := peerCfg.GRPCOptions["fail-fast"].(bool); ok { + return cast.ToBool(ff) + } + return false +} + +func getKeepAliveOptions(peerCfg core.NetworkPeer) keepalive.ClientParameters { + var kap keepalive.ClientParameters + if kaTime, ok := peerCfg.GRPCOptions["keep-alive-time"]; ok { + kap.Time = cast.ToDuration(kaTime) + } + if kaTimeout, ok := peerCfg.GRPCOptions["keep-alive-timeout"]; ok { + kap.Timeout = cast.ToDuration(kaTimeout) + } + if kaPermit, ok := peerCfg.GRPCOptions["keep-alive-permit"]; ok { + kap.PermitWithoutStream = cast.ToBool(kaPermit) + } + return kap +} diff --git a/pkg/fab/events/eventhubclient/connection/connection.go b/pkg/fab/events/eventhubclient/connection/connection.go new file mode 100755 index 0000000000..62f100c619 --- /dev/null +++ b/pkg/fab/events/eventhubclient/connection/connection.go @@ -0,0 +1,136 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package connection + +import ( + "context" + "fmt" + "io" + "time" + + fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/context" + + "google.golang.org/grpc" + + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes" + comm "github.com/hyperledger/fabric-sdk-go/pkg/fab/comm" + clientdisp "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/dispatcher" + logging "github.com/hyperledger/fabric-sdk-go/pkg/logging" + "github.com/hyperledger/fabric-sdk-go/pkg/options" + pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer" + "github.com/pkg/errors" +) + +var logger = logging.NewLogger("fabric_sdk_go") + +// EventHubConnection manages the connection and client stream +// to the event hub server +type EventHubConnection struct { + comm.GRPCConnection +} + +// New returns a new Connection to the event hub. +func New(ctx fabcontext.Context, channelID string, url string, opts ...options.Opt) (*EventHubConnection, error) { + if channelID == "" { + return nil, errors.New("channel ID not provided") + } + + connect, err := comm.NewConnection( + ctx, channelID, + func(grpcconn *grpc.ClientConn) (grpc.ClientStream, error) { + return pb.NewEventsClient(grpcconn).Chat(context.Background()) + }, + url, opts..., + ) + if err != nil { + return nil, err + } + + return &EventHubConnection{ + GRPCConnection: *connect, + }, nil +} + +// EventHubStream returns the event hub chat client +func (c *EventHubConnection) EventHubStream() pb.Events_ChatClient { + if c.Stream() == nil { + return nil + } + stream, ok := c.Stream().(pb.Events_ChatClient) + if !ok { + panic(fmt.Sprintf("invalid events chat client type %T", c.Stream())) + } + return stream +} + +// Send sends an event to the event hub server +func (c *EventHubConnection) Send(emsg *pb.Event) error { + creator, err := c.Context().Identity() + if err != nil { + return errors.WithMessage(err, "error getting creator identity") + } + + timestamp, err := ptypes.TimestampProto(time.Now()) + if err != nil { + return errors.Wrap(err, "failed to create timestamp") + } + + event := *emsg + event.Creator = creator + event.Timestamp = timestamp + event.TlsCertHash = c.TLSCertHash() + + evtBytes, err := proto.Marshal(&event) + if err != nil { + return err + } + + signature, err := c.Context().SigningManager().Sign(evtBytes, c.Context().PrivateKey()) + if err != nil { + return err + } + + return c.EventHubStream().Send(&pb.SignedEvent{ + EventBytes: evtBytes, + Signature: signature, + }) +} + +// Receive receives events from the event hub server +func (c *EventHubConnection) Receive(eventch chan<- interface{}) { + for { + logger.Debugf("Listening for events...") + if c.EventHubStream() == nil { + logger.Warnf("The stream has closed. Terminating loop.") + break + } + + in, err := c.EventHubStream().Recv() + + if c.Closed() { + logger.Debugf("The connection has closed. Terminating loop.") + break + } + + if err == io.EOF { + // This signifies that the stream has been terminated at the client-side. No need to send an event. + logger.Debugf("Received EOF from stream.") + break + } + + if err != nil { + logger.Errorf("Received error from stream: [%s]. Sending disconnected event.", err) + eventch <- clientdisp.NewDisconnectedEvent(err) + break + } + + logger.Debugf("Got event %#v", in) + eventch <- in + } + logger.Debugf("Exiting stream listener") +} diff --git a/pkg/fab/events/eventhubclient/connection/connection_test.go b/pkg/fab/events/eventhubclient/connection/connection_test.go new file mode 100755 index 0000000000..6501c61079 --- /dev/null +++ b/pkg/fab/events/eventhubclient/connection/connection_test.go @@ -0,0 +1,206 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package connection + +import ( + "fmt" + "net" + "os" + "testing" + "time" + + fabcontext "github.com/hyperledger/fabric-sdk-go/pkg/context" + "github.com/hyperledger/fabric-sdk-go/pkg/context/api/core" + clientdisp "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/dispatcher" + eventmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/mocks" + fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks" + pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer" + "github.com/pkg/errors" + "google.golang.org/grpc" +) + +const ( + eventAddress = "localhost:7053" + eventURL = "grpc://" + eventAddress +) + +func TestInvalidConnectionOpts(t *testing.T) { + if _, err := New(newMockContext(), "", eventURL); err == nil { + t.Fatalf("expecting error creating new connection without channel but got none") + } + if _, err := New(newMockContext(), "channelid", "grpcs://invalidhost:7053"); err == nil { + t.Fatalf("expecting error creating new connection with invaid address but got none") + } +} + +func TestConnection(t *testing.T) { + channelID := "mychannel" + conn, err := New(newMockContext(), channelID, eventURL) + if err != nil { + t.Fatalf("error creating new connection: %s", err) + } + + conn.Close() + + // Calling close again should be ignored + conn.Close() +} + +func TestSend(t *testing.T) { + channelID := "mychannel" + conn, err := New(newMockContext(), channelID, eventURL) + if err != nil { + t.Fatalf("error creating new connection: %s", err) + } + + eventch := make(chan interface{}) + + go conn.Receive(eventch) + + emsg := &pb.Event{ + Event: &pb.Event_Register{ + Register: &pb.Register{ + Events: []*pb.Interest{ + &pb.Interest{EventType: pb.EventType_FILTEREDBLOCK}, + }, + }, + }, + } + + t.Logf("Sending register event...") + if err := conn.Send(emsg); err != nil { + t.Fatalf("Error sending register interest event: %s", err) + } + + select { + case e, ok := <-eventch: + if !ok { + t.Fatalf("unexpected closed connection") + } + t.Logf("Got response: %#v", e) + evt, ok := e.(*pb.Event) + if !ok { + t.Fatalf("expected Event but got %T", e) + } + _, ok = evt.Event.(*pb.Event_Register) + if !ok { + t.Fatalf("expected register response but got %T", evt.Event) + } + case <-time.After(5 * time.Second): + t.Fatalf("timed out waiting for event") + } + + emsg = &pb.Event{ + Event: &pb.Event_Unregister{ + Unregister: &pb.Unregister{ + Events: []*pb.Interest{ + &pb.Interest{EventType: pb.EventType_FILTEREDBLOCK}, + }, + }, + }, + } + + t.Logf("Sending unregister event...") + if err := conn.Send(emsg); err != nil { + t.Fatalf("Error sending unregister interest event: %s", err) + } + + select { + case e, ok := <-eventch: + if !ok { + t.Fatalf("unexpected closed connection") + } + t.Logf("Got response: %#v", e) + evt, ok := e.(*pb.Event) + if !ok { + t.Fatalf("expected Event but got %T", e) + } + _, ok = evt.Event.(*pb.Event_Unregister) + if !ok { + t.Fatalf("expected unregister response but got %T", evt.Event) + } + case <-time.After(5 * time.Second): + t.Fatalf("timed out waiting for event") + } + + conn.Close() +} + +func TestDisconnected(t *testing.T) { + channelID := "mychannel" + conn, err := New(newMockContext(), channelID, eventURL) + if err != nil { + t.Fatalf("error creating new connection: %s", err) + } + + eventch := make(chan interface{}) + + go conn.Receive(eventch) + + emsg := &pb.Event{ + Event: &pb.Event_Register{ + Register: &pb.Register{ + Events: []*pb.Interest{ + &pb.Interest{EventType: pb.EventType_FILTEREDBLOCK}, + }, + }, + }, + } + + if err := conn.Send(emsg); err != nil { + t.Fatalf("Error sending register interest event: %s", err) + } + + ehServer.Disconnect(errors.New("simulating disconnect")) + + select { + case e, ok := <-eventch: + if !ok { + t.Fatalf("unexpected closed connection") + } + _, ok = e.(*clientdisp.DisconnectedEvent) + if !ok { + t.Fatalf("expected DisconnectedEvent but got %T", e) + } + case <-time.After(5 * time.Second): + t.Fatalf("timed out waiting for event") + } + + conn.Close() +} + +var ehServer *eventmocks.MockEventhubServer + +func TestMain(m *testing.M) { + var opts []grpc.ServerOption + grpcServer := grpc.NewServer(opts...) + + lis, err := net.Listen("tcp", eventAddress) + if err != nil { + panic(fmt.Sprintf("Error starting events listener %s", err)) + } + + ehServer = eventmocks.NewMockEventhubServer() + + pb.RegisterEventsServer(grpcServer, ehServer) + + go grpcServer.Serve(lis) + + time.Sleep(2 * time.Second) + os.Exit(m.Run()) +} + +func newPeerConfig(eventURL string) *core.PeerConfig { + return &core.PeerConfig{ + EventURL: eventURL, + GRPCOptions: make(map[string]interface{}), + } +} + +func newMockContext() fabcontext.Context { + return fabmocks.NewMockContext(fabmocks.NewMockUser("user1")) +} diff --git a/pkg/fab/events/eventhubclient/dispatcher/dispatcher.go b/pkg/fab/events/eventhubclient/dispatcher/dispatcher.go new file mode 100755 index 0000000000..5085a26caa --- /dev/null +++ b/pkg/fab/events/eventhubclient/dispatcher/dispatcher.go @@ -0,0 +1,195 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package dispatcher + +import ( + "github.com/hyperledger/fabric-sdk-go/pkg/context" + "github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/api" + clientdisp "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/dispatcher" + esdispatcher "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/dispatcher" + "github.com/hyperledger/fabric-sdk-go/pkg/logging" + "github.com/hyperledger/fabric-sdk-go/pkg/options" + pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer" + "github.com/pkg/errors" +) + +var logger = logging.NewLogger("fabric_sdk_go") + +type ehConnection interface { + api.Connection + Send(emsg *pb.Event) error +} + +// Dispatcher is responsible for handling all events, including connection and registration events originating from the client, +// and events originating from the event hub server. All events are processed in a single Go routine +// in order to avoid any race conditions. This avoids the need for synchronization. +type Dispatcher struct { + clientdisp.Dispatcher + regInterestsRequest *RegisterInterestsEvent + unregInterestsRequest *UnregisterInterestsEvent +} + +// New creates a new event hub dispatcher +func New(context context.Context, channelID string, connectionProvider api.ConnectionProvider, discoveryService fab.DiscoveryService, opts ...options.Opt) *Dispatcher { + return &Dispatcher{ + Dispatcher: *clientdisp.New(context, channelID, connectionProvider, discoveryService, opts...), + } +} + +// Start starts the dispatcher +func (ed *Dispatcher) Start() error { + ed.registerHandlers() + if err := ed.Dispatcher.Start(); err != nil { + return errors.WithMessage(err, "error starting deliver event dispatcher") + } + return nil +} + +func (ed *Dispatcher) connection() ehConnection { + return ed.Dispatcher.Connection().(ehConnection) +} + +func (ed *Dispatcher) handleRegInterestsEvent(e esdispatcher.Event) { + evt := e.(*RegisterInterestsEvent) + + if ed.Connection() == nil { + logger.Warnf("Unable to register interests since no connection was established.") + return + } + + ed.regInterestsRequest = evt + + emsg := &pb.Event{ + Event: &pb.Event_Register{ + Register: &pb.Register{ + Events: evt.Interests, + }, + }, + } + + if err := ed.connection().Send(emsg); err != nil { + evt.ErrCh <- errors.Wrap(err, "error sending register interests event") + ed.regInterestsRequest = nil + } +} + +func (ed *Dispatcher) handleUnregInterestsEvent(e esdispatcher.Event) { + evt := e.(*UnregisterInterestsEvent) + + if ed.Connection() == nil { + logger.Warnf("Unable to unregister interests since no connection was established.") + return + } + + ed.unregInterestsRequest = evt + + emsg := &pb.Event{ + Event: &pb.Event_Unregister{ + Unregister: &pb.Unregister{ + Events: evt.Interests, + }, + }, + } + + if err := ed.connection().Send(emsg); err != nil { + evt.ErrCh <- errors.Wrap(err, "error sending unregister interests event") + ed.unregInterestsRequest = nil + } +} + +func (ed *Dispatcher) handleRegInterestsResponse(e *pb.Event_Register) { + if ed.regInterestsRequest == nil { + return + } + + if err := validateInterests(e.Register.Events, ed.regInterestsRequest.Interests); err != nil { + logger.Warnf("Error registering interests: %s", err) + if ed.regInterestsRequest.ErrCh != nil { + ed.regInterestsRequest.ErrCh <- errors.Wrap(err, "error registering interests") + } + } else { + // Send back a nil error to indicate that the operation was successful + ed.regInterestsRequest.ErrCh <- nil + } + ed.regInterestsRequest = nil +} + +func (ed *Dispatcher) handleUnregInterestsResponse(e *pb.Event_Unregister) { + if ed.unregInterestsRequest == nil { + return + } + + if err := validateInterests(e.Unregister.Events, ed.unregInterestsRequest.Interests); err != nil { + logger.Warnf("Error unregistering interests: %s", err) + if ed.unregInterestsRequest.ErrCh != nil { + ed.unregInterestsRequest.ErrCh <- errors.Wrap(err, "error unregistering interests") + } + } else { + // Send back a nil error to indicate that the operation was successful + ed.unregInterestsRequest.ErrCh <- nil + } + + ed.unregInterestsRequest = nil +} + +func validateInterests(have []*pb.Interest, want []*pb.Interest) error { + if len(have) != len(want) { + return errors.New("all interests were not registered/unregistered") + } + for _, hi := range have { + found := false + for _, wi := range want { + if hi.EventType == wi.EventType { + found = true + break + } + } + if !found { + return errors.New("all interests were not registered/unregistered") + } + } + return nil +} + +func (ed *Dispatcher) handleEvent(e esdispatcher.Event) { + event := e.(*pb.Event) + + logger.Debugf("Handling event: %#v", event) + + switch evt := event.Event.(type) { + case *pb.Event_Block: + ed.HandleBlock(evt.Block) + case *pb.Event_FilteredBlock: + ed.HandleFilteredBlock(evt.FilteredBlock) + case *pb.Event_Register: + ed.handleRegInterestsResponse(evt) + case *pb.Event_Unregister: + ed.handleUnregInterestsResponse(evt) + default: + logger.Warnf("Unsupported event type: %T", event.Event) + } +} + +func (ed *Dispatcher) handleDisconnectedEvent(e esdispatcher.Event) { + if ed.regInterestsRequest != nil && ed.regInterestsRequest.ErrCh != nil { + // We're in the middle of an interest registration. Send an error response to the caller. + ed.regInterestsRequest.ErrCh <- errors.New("connection terminated") + } + ed.regInterestsRequest = nil + ed.Dispatcher.HandleDisconnectedEvent(e) +} + +func (ed *Dispatcher) registerHandlers() { + // Override Handlers + ed.RegisterHandler(&clientdisp.DisconnectedEvent{}, ed.handleDisconnectedEvent) + + // Register Handlers + ed.RegisterHandler(&RegisterInterestsEvent{}, ed.handleRegInterestsEvent) + ed.RegisterHandler(&UnregisterInterestsEvent{}, ed.handleUnregInterestsEvent) + ed.RegisterHandler(&pb.Event{}, ed.handleEvent) +} diff --git a/pkg/fab/events/eventhubclient/dispatcher/dispatcher_test.go b/pkg/fab/events/eventhubclient/dispatcher/dispatcher_test.go new file mode 100755 index 0000000000..b49ece2a40 --- /dev/null +++ b/pkg/fab/events/eventhubclient/dispatcher/dispatcher_test.go @@ -0,0 +1,440 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package dispatcher + +import ( + "testing" + "time" + + "github.com/hyperledger/fabric-sdk-go/pkg/context" + "github.com/hyperledger/fabric-sdk-go/pkg/context/api/core" + "github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab" + fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks" + + pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer" + + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/api" + clientdisp "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/dispatcher" + clientmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/mocks" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/endpoint" + ehmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/eventhubclient/mocks" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/blockfilter" + esdispatcher "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/dispatcher" + servicemocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/mocks" + "github.com/pkg/errors" +) + +var ( + endpoint1 = newMockEventEndpoint("grpcs://peer1.example.com:7053") + endpoint2 = newMockEventEndpoint("grpcs://peer2.example.com:7053") +) + +func TestRegisterInterests(t *testing.T) { + channelID := "testchannel" + dispatcher := New( + newMockContext(), channelID, + clientmocks.NewProviderFactory().Provider( + ehmocks.NewConnection( + clientmocks.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory)), + ), + ), + clientmocks.NewDiscoveryService(endpoint1, endpoint2), + ) + if err := dispatcher.Start(); err != nil { + t.Fatalf("Error starting dispatcher: %s", err) + } + + dispatcherEventch, err := dispatcher.EventCh() + if err != nil { + t.Fatalf("Error getting event channel from dispatcher: %s", err) + } + + // Connect + errch := make(chan error) + dispatcherEventch <- clientdisp.NewConnectEvent(errch) + + select { + case err := <-errch: + if err != nil { + t.Fatalf("Error connecting: %s", err) + } + case <-time.After(2 * time.Second): + err = errors.New("timeout waiting for connection response") + } + + // Register interests + dispatcherEventch <- NewRegisterInterestsEvent( + []*pb.Interest{ + &pb.Interest{ + EventType: pb.EventType_FILTEREDBLOCK, + }, + }, + errch) + + select { + case err := <-errch: + if err != nil { + t.Fatalf("error registering interests: %s", err) + } + case <-time.After(2 * time.Second): + err = errors.New("timeout waiting for register interests response") + } + + // Unregister interests + dispatcherEventch <- NewUnregisterInterestsEvent( + []*pb.Interest{ + &pb.Interest{ + EventType: pb.EventType_FILTEREDBLOCK, + }, + }, + errch) + + select { + case err := <-errch: + if err != nil { + t.Fatalf("error unregistering interests: %s", err) + } + case <-time.After(2 * time.Second): + err = errors.New("timeout waiting for unregister interests response") + } + + // Disconnect + dispatcherEventch <- clientdisp.NewDisconnectEvent(errch) + select { + case err := <-errch: + if err != nil { + t.Fatalf("Error disconnecting: %s", err) + } + case <-time.After(2 * time.Second): + err = errors.New("timeout waiting for connection response") + } + + // Disconnected + dispatcherEventch <- clientdisp.NewDisconnectedEvent(errors.New("simulating disconnected")) + + time.Sleep(time.Second) + + // Stop the dispatcher + stopResp := make(chan error) + dispatcherEventch <- esdispatcher.NewStopEvent(stopResp) + if err := <-stopResp; err != nil { + t.Fatalf("Error stopping dispatcher: %s", err) + } +} + +func TestRegisterInterestsInvalid(t *testing.T) { + channelID := "testchannel" + dispatcher := New( + newMockContext(), channelID, + clientmocks.NewProviderFactory().Provider( + ehmocks.NewConnection( + clientmocks.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory)), + clientmocks.WithResults( + clientmocks.NewResult(ehmocks.RegInterests, clientmocks.FailResult), + clientmocks.NewResult(ehmocks.UnregInterests, clientmocks.FailResult), + ), + ), + ), + clientmocks.NewDiscoveryService(endpoint1, endpoint2), + ) + if err := dispatcher.Start(); err != nil { + t.Fatalf("Error starting dispatcher: %s", err) + } + + dispatcherEventch, err := dispatcher.EventCh() + if err != nil { + t.Fatalf("Error getting event channel from dispatcher: %s", err) + } + + // Connect + errch := make(chan error) + dispatcherEventch <- clientdisp.NewConnectEvent(errch) + + select { + case err := <-errch: + if err != nil { + t.Fatalf("Error connecting: %s", err) + } + case <-time.After(2 * time.Second): + err = errors.New("timeout waiting for connection response") + } + + // Register interests + dispatcherEventch <- NewRegisterInterestsEvent( + []*pb.Interest{ + &pb.Interest{ + EventType: pb.EventType_FILTEREDBLOCK, + }, + }, + errch) + + select { + case err := <-errch: + if err == nil { + t.Fatalf("expecting error registering interests but got none") + } + case <-time.After(2 * time.Second): + err = errors.New("timeout waiting for register interests response") + } + + // Unregister interests + dispatcherEventch <- NewUnregisterInterestsEvent( + []*pb.Interest{ + &pb.Interest{ + EventType: pb.EventType_FILTEREDBLOCK, + }, + }, + errch) + + select { + case err := <-errch: + if err == nil { + t.Fatalf("expecting error unregistering interests but got none") + } + case <-time.After(2 * time.Second): + err = errors.New("timeout waiting for unregister interests response") + } + + // Disconnect + dispatcherEventch <- clientdisp.NewDisconnectEvent(errch) + select { + case err := <-errch: + if err != nil { + t.Fatalf("Error disconnecting: %s", err) + } + case <-time.After(2 * time.Second): + err = errors.New("timeout waiting for connection response") + } + + // Disconnected + dispatcherEventch <- clientdisp.NewDisconnectedEvent(errors.New("simulating disconnected")) + + time.Sleep(time.Second) + + // Stop the dispatcher + stopResp := make(chan error) + dispatcherEventch <- esdispatcher.NewStopEvent(stopResp) + if err := <-stopResp; err != nil { + t.Fatalf("Error stopping dispatcher: %s", err) + } +} + +func TestTimedOutRegister(t *testing.T) { + channelID := "testchannel" + dispatcher := New( + newMockContext(), channelID, + clientmocks.NewProviderFactory().Provider( + ehmocks.NewConnection( + clientmocks.WithResults( + clientmocks.NewResult(ehmocks.RegInterests, clientmocks.NoOpResult), + ), + clientmocks.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory)), + ), + ), + clientmocks.NewDiscoveryService(endpoint1, endpoint2), + ) + if err := dispatcher.Start(); err != nil { + t.Fatalf("Error starting dispatcher: %s", err) + } + + dispatcherEventch, err := dispatcher.EventCh() + if err != nil { + t.Fatalf("Error getting event channel from dispatcher: %s", err) + } + + // Connect + errch := make(chan error) + dispatcherEventch <- clientdisp.NewConnectEvent(errch) + + select { + case err := <-errch: + if err != nil { + t.Fatalf("Error connecting: %s", err) + } + case <-time.After(2 * time.Second): + err = errors.New("timeout waiting for connection response") + } + + // Register interests + dispatcherEventch <- NewRegisterInterestsEvent( + []*pb.Interest{ + &pb.Interest{ + EventType: pb.EventType_FILTEREDBLOCK, + }, + }, + errch) + + select { + case err := <-errch: + if err == nil { + t.Fatalf("expecting error due to no response from register interests but got none") + } + case <-time.After(2 * time.Second): + err = errors.New("timeout waiting for register interests response") + } + +} + +func TestBlockEvents(t *testing.T) { + channelID := "testchannel" + ledger := servicemocks.NewMockLedger(servicemocks.BlockEventFactory) + dispatcher := New( + newMockContext(), channelID, + clientmocks.NewProviderFactory().Provider( + ehmocks.NewConnection( + clientmocks.WithLedger(ledger), + ), + ), + clientmocks.NewDiscoveryService(endpoint1, endpoint2), + ) + if err := dispatcher.Start(); err != nil { + t.Fatalf("Error starting dispatcher: %s", err) + } + + dispatcherEventch, err := dispatcher.EventCh() + if err != nil { + t.Fatalf("Error getting event channel from dispatcher: %s", err) + } + + // Connect + errch := make(chan error) + dispatcherEventch <- clientdisp.NewConnectEvent(errch) + + select { + case err := <-errch: + if err != nil { + t.Fatalf("Error connecting: %s", err) + } + case <-time.After(2 * time.Second): + err = errors.New("timeout waiting for connection response") + } + + // Register for block events + eventch := make(chan *fab.BlockEvent, 10) + regch := make(chan fab.Registration) + dispatcherEventch <- esdispatcher.NewRegisterBlockEvent(blockfilter.AcceptAny, eventch, regch, errch) + + var reg fab.Registration + select { + case reg = <-regch: + case err := <-errch: + t.Fatalf("Error registering for block events: %s", err) + } + + // Produce block - this should notify the connection + ledger.NewBlock(channelID) + + select { + case _, ok := <-eventch: + if !ok { + t.Fatalf("unexpected closed channel") + } + case <-time.After(5 * time.Second): + t.Fatalf("timed out waiting for block event") + } + + // Unregister block events + dispatcherEventch <- esdispatcher.NewUnregisterEvent(reg) + + // Stop + stopResp := make(chan error) + dispatcherEventch <- esdispatcher.NewStopEvent(stopResp) + if err := <-stopResp; err != nil { + t.Fatalf("Error stopping dispatcher: %s", err) + } +} + +func TestFilteredBlockEvents(t *testing.T) { + channelID := "testchannel" + ledger := servicemocks.NewMockLedger(servicemocks.FilteredBlockEventFactory) + dispatcher := New( + newMockContext(), channelID, + clientmocks.NewProviderFactory().Provider( + ehmocks.NewConnection( + clientmocks.WithLedger(ledger), + ), + ), + clientmocks.NewDiscoveryService(endpoint1, endpoint2), + ) + if err := dispatcher.Start(); err != nil { + t.Fatalf("Error starting dispatcher: %s", err) + } + + dispatcherEventch, err := dispatcher.EventCh() + if err != nil { + t.Fatalf("Error getting event channel from dispatcher: %s", err) + } + + // Connect + errch := make(chan error) + + dispatcherEventch <- clientdisp.NewConnectEvent(errch) + + select { + case err := <-errch: + if err != nil { + t.Fatalf("Error connecting: %s", err) + } + case <-time.After(2 * time.Second): + err = errors.New("timeout waiting for connection response") + } + + // Register for filtered block events + eventch := make(chan *fab.FilteredBlockEvent, 10) + regch := make(chan fab.Registration) + + dispatcherEventch <- esdispatcher.NewRegisterFilteredBlockEvent(eventch, regch, errch) + + var reg fab.Registration + select { + case reg = <-regch: + case err := <-errch: + t.Fatalf("Error registering for block events: %s", err) + } + + // Produce filtered block - this should notify the connection + ledger.NewFilteredBlock(channelID) + + select { + case event, ok := <-eventch: + if !ok { + t.Fatalf("unexpected closed channel") + } + if event.FilteredBlock.ChannelId != channelID { + t.Fatalf("expecting channelID [%s] but got [%s]", channelID, event.FilteredBlock.ChannelId) + } + case <-time.After(10 * time.Second): + t.Fatalf("timed out waiting for filtered block event") + } + + // Unregister filtered block events + dispatcherEventch <- esdispatcher.NewUnregisterEvent(reg) + + // Stop + stopResp := make(chan error) + dispatcherEventch <- esdispatcher.NewStopEvent(stopResp) + if err := <-stopResp; err != nil { + t.Fatalf("Error stopping dispatcher: %s", err) + } +} + +func newPeerConfig(peerURL string) *core.PeerConfig { + return &core.PeerConfig{ + URL: peerURL, + GRPCOptions: make(map[string]interface{}), + } +} + +func newMockContext() context.Context { + return fabmocks.NewMockContext(fabmocks.NewMockUser("user1")) +} + +func newMockEventEndpoint(url string) api.EventEndpoint { + return &endpoint.EventEndpoint{ + EvtURL: url, + } +} diff --git a/pkg/fab/events/eventhubclient/dispatcher/events.go b/pkg/fab/events/eventhubclient/dispatcher/events.go new file mode 100755 index 0000000000..b3b3b77846 --- /dev/null +++ b/pkg/fab/events/eventhubclient/dispatcher/events.go @@ -0,0 +1,39 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package dispatcher + +import ( + pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer" +) + +// RegisterInterestsEvent registers interests with the event hub +type RegisterInterestsEvent struct { + Interests []*pb.Interest + ErrCh chan<- error +} + +// NewRegisterInterestsEvent returns a RegisterInterests event +func NewRegisterInterestsEvent(interests []*pb.Interest, errch chan<- error) *RegisterInterestsEvent { + return &RegisterInterestsEvent{ + Interests: interests, + ErrCh: errch, + } +} + +// UnregisterInterestsEvent unregisters interests with the event hub +type UnregisterInterestsEvent struct { + Interests []*pb.Interest + ErrCh chan<- error +} + +// NewUnregisterInterestsEvent returns an UnregisterInterests event +func NewUnregisterInterestsEvent(interests []*pb.Interest, errch chan<- error) *UnregisterInterestsEvent { + return &UnregisterInterestsEvent{ + Interests: interests, + ErrCh: errch, + } +} diff --git a/pkg/fab/events/eventhubclient/eventhubclient.go b/pkg/fab/events/eventhubclient/eventhubclient.go new file mode 100755 index 0000000000..975bf94d01 --- /dev/null +++ b/pkg/fab/events/eventhubclient/eventhubclient.go @@ -0,0 +1,102 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package eventhubclient + +import ( + "time" + + "github.com/hyperledger/fabric-sdk-go/pkg/context" + + "github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/api" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/eventhubclient/connection" + + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/eventhubclient/dispatcher" + "github.com/hyperledger/fabric-sdk-go/pkg/logging" + "github.com/hyperledger/fabric-sdk-go/pkg/options" + pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer" + "github.com/pkg/errors" +) + +var logger = logging.NewLogger("fabric_sdk_go") + +var ehConnProvider = func(channelID string, context context.Context, peer fab.Peer) (api.Connection, error) { + eventEndpoint, ok := peer.(api.EventEndpoint) + if !ok { + panic("peer is not an EventEndpoint") + } + + return connection.New( + context, channelID, eventEndpoint.EventURL(), + ) +} + +// Client connects to a peer and receives channel events, such as bock, filtered block, chaincode, and transaction status events. +type Client struct { + client.Client + params + interests []*pb.Interest +} + +// New returns a new block event event hub client +func New(context context.Context, channelID string, discoveryService fab.DiscoveryService, opts ...options.Opt) (*Client, error) { + return newClient(context, channelID, ehConnProvider, discoveryService, []*pb.Interest{&pb.Interest{EventType: pb.EventType_BLOCK}}, true, opts...) +} + +// NewFiltered returns a new filtered block event hub client +func NewFiltered(context context.Context, channelID string, discoveryService fab.DiscoveryService, opts ...options.Opt) (*Client, error) { + return newClient(context, channelID, ehConnProvider, discoveryService, []*pb.Interest{&pb.Interest{EventType: pb.EventType_FILTEREDBLOCK}}, false, opts...) +} + +func newClient(context context.Context, channelID string, connProvider api.ConnectionProvider, discoveryService fab.DiscoveryService, interests []*pb.Interest, permitBlockEvents bool, opts ...options.Opt) (*Client, error) { + if channelID == "" { + return nil, errors.New("expecting channel ID") + } + + params := defaultParams() + options.Apply(params, opts) + + client := &Client{ + Client: *client.New( + permitBlockEvents, + dispatcher.New(context, channelID, connProvider, discoveryService, opts...), + opts..., + ), + params: *params, + interests: interests, + } + client.SetAfterConnectHandler(client.registerInterests) + + if err := client.Start(); err != nil { + return nil, err + } + + return client, nil +} + +func (c *Client) registerInterests() error { + logger.Debugf("sending register interests request....\n") + + errch := make(chan error) + c.Submit(dispatcher.NewRegisterInterestsEvent(c.interests, errch)) + + var err error + select { + case err = <-errch: + case <-time.After(c.respTimeout): + err = errors.New("timeout waiting for register interests response") + } + + if err != nil { + logger.Errorf("unable to send register interests request: %s\n", err) + return err + } + + logger.Debugf("successfully sent register interests\n") + return nil +} diff --git a/pkg/fab/events/eventhubclient/eventhubclient_test.go b/pkg/fab/events/eventhubclient/eventhubclient_test.go new file mode 100755 index 0000000000..b457a2bc7f --- /dev/null +++ b/pkg/fab/events/eventhubclient/eventhubclient_test.go @@ -0,0 +1,499 @@ +// +build testing + +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package eventhubclient + +import ( + "testing" + "time" + + "github.com/hyperledger/fabric-sdk-go/pkg/context" + "github.com/hyperledger/fabric-sdk-go/pkg/context/api/fab" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/api" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/dispatcher" + clientmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/mocks" + "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/endpoint" + ehclientmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/eventhubclient/mocks" + ehmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/eventhubclient/mocks" + esdispatcher "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/dispatcher" + servicemocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/mocks" + fabmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/mocks" + "github.com/hyperledger/fabric-sdk-go/pkg/options" + cb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/common" + pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer" + "github.com/pkg/errors" +) + +const ( + initialState client.ConnectionState = -1 +) + +var ( + defaultOpts = []options.Opt{} + + endpoint1 = newMockEventEndpoint("grpcs://peer1.example.com:7053") + endpoint2 = newMockEventEndpoint("grpcs://peer2.example.com:7053") +) + +func TestInvalidOptionsInNewClient(t *testing.T) { + // Filtered Client + if _, err := NewFiltered(newMockContext(), "", clientmocks.NewDiscoveryService(endpoint1, endpoint2)); err == nil { + t.Fatalf("expecting error with no channel ID but got none") + } + // Client + if _, err := New(newMockContext(), "", clientmocks.NewDiscoveryService(endpoint1, endpoint2)); err == nil { + t.Fatalf("expecting error with no channel ID but got none") + } +} + +func TestClientConnect(t *testing.T) { + eventClient, _, err := newClientWithMockConnAndOpts( + newMockContext(), "mychannel", + clientmocks.NewProviderFactory().Provider( + ehclientmocks.NewConnection( + clientmocks.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory)), + )), + filteredClientProvider, + clientmocks.NewDiscoveryService(endpoint1, endpoint2), + defaultOpts, + ) + if err != nil { + t.Fatalf("error creating channel event client: %s", err) + } + if eventClient.ConnectionState() != client.Disconnected { + t.Fatalf("expecting connection state %s but got %s", client.Disconnected, eventClient.ConnectionState()) + } + if err := eventClient.Connect(); err != nil { + t.Fatalf("error connecting: %s", err) + } + time.Sleep(500 * time.Millisecond) + if eventClient.ConnectionState() != client.Connected { + t.Fatalf("expecting connection state %s but got %s", client.Connected, eventClient.ConnectionState()) + } + eventClient.Close() + if eventClient.ConnectionState() != client.Disconnected { + t.Fatalf("expecting connection state %s but got %s", client.Disconnected, eventClient.ConnectionState()) + } + time.Sleep(2 * time.Second) +} + +func TestTimeoutClientConnect(t *testing.T) { + eventClient, _, err := newClientWithMockConnAndOpts( + newMockContext(), "mychannel", + clientmocks.NewProviderFactory().Provider( + ehclientmocks.NewConnection( + clientmocks.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory)), + clientmocks.WithResults( + clientmocks.NewResult(ehmocks.RegInterests, clientmocks.NoOpResult), + ), + )), + filteredClientProvider, + clientmocks.NewDiscoveryService(endpoint1, endpoint2), + defaultOpts, + ) + if err != nil { + t.Fatalf("error creating channel event client: %s", err) + } + if err := eventClient.Connect(); err == nil { + t.Fatalf("expecting error connecting due to timeout registering interests") + } +} + +// TestReconnect tests the ability of the Channel Event Client to retry multiple +// times to connect, and reconnect after it has disconnected. +func TestReconnect(t *testing.T) { + // (1) Connect + // -> should fail to connect on the first and second attempt but succeed on the third attempt + t.Run("#1", func(t *testing.T) { + t.Parallel() + testConnect(t, 3, clientmocks.ConnectedOutcome, + clientmocks.NewConnectResults( + clientmocks.NewConnectResult(clientmocks.ThirdAttempt, clientmocks.SucceedResult), + ), + ) + }) + + // (1) Connect + // -> should fail to connect on the first attempt and no further attempts are to be made + t.Run("#2", func(t *testing.T) { + t.Parallel() + testConnect(t, 1, clientmocks.ErrorOutcome, + clientmocks.NewConnectResults(), + ) + }) + + // (1) Connect + // -> should succeed to connect on the first attempt + // (2) Disconnect + // -> should fail to reconnect on the first and second attempt but succeed on the third attempt + t.Run("#3", func(t *testing.T) { + t.Parallel() + testReconnect(t, true, 3, clientmocks.ReconnectedOutcome, + clientmocks.NewConnectResults( + clientmocks.NewConnectResult(clientmocks.FirstAttempt, clientmocks.SucceedResult), + clientmocks.NewConnectResult(clientmocks.FourthAttempt, clientmocks.SucceedResult), + ), + ) + }) + + // (1) Connect + // -> should succeed to connect on the first attempt + // (2) Disconnect + // -> should fail to reconnect after two attempts and then cleanly disconnect + t.Run("#4", func(t *testing.T) { + t.Parallel() + testReconnect(t, true, 2, clientmocks.ClosedOutcome, + clientmocks.NewConnectResults( + clientmocks.NewConnectResult(clientmocks.FirstAttempt, clientmocks.SucceedResult), + ), + ) + }) + + // (1) Connect + // -> should succeed to connect on the first attempt + // (2) Disconnect + // -> should fail and not attempt to reconnect and then cleanly disconnect + t.Run("#5", func(t *testing.T) { + t.Parallel() + testReconnect(t, false, 0, clientmocks.ClosedOutcome, + clientmocks.NewConnectResults( + clientmocks.NewConnectResult(clientmocks.FirstAttempt, clientmocks.SucceedResult), + ), + ) + }) +} + +// TestReconnectRegistration tests the ability of the Channel Event Client to +// re-establish the existing registrations after reconnecting. +func TestReconnectRegistration(t *testing.T) { + // (1) Connect + // (2) Register for block events + // (3) Register for CC events + // (4) Send a CONFIG_UPDATE block event + // -> should receive one block event + // (5) Send a CC event + // -> should receive one CC event and one block event + // (6) Disconnect + // (7) Save a CONFIG_UPDATE block event to the ledger + // (8) Save a CC event to the ledger + // (9) Should reconnect and receive all events that were + // saved to the ledger while the client was disconnected + t.Run("#1", func(t *testing.T) { + t.Parallel() + testReconnectRegistration( + t, clientmocks.ExpectFiveBlocks, clientmocks.ExpectThreeCC, + clientmocks.NewConnectResults( + clientmocks.NewConnectResult(clientmocks.FirstAttempt, clientmocks.SucceedResult), + clientmocks.NewConnectResult(clientmocks.SecondAttempt, clientmocks.SucceedResult)), + ) + }) +} + +func testConnect(t *testing.T, maxConnectAttempts uint, expectedOutcome clientmocks.Outcome, connAttemptResult clientmocks.ConnectAttemptResults) { + eventClient, _, err := newClientWithMockConnAndOpts( + newMockContext(), "mychannel", + clientmocks.NewProviderFactory().FlakeyProvider( + connAttemptResult, + clientmocks.WithLedger(servicemocks.NewMockLedger(servicemocks.BlockEventFactory)), + clientmocks.WithFactory(func(opts ...clientmocks.Opt) clientmocks.Connection { + return ehclientmocks.NewConnection(opts...) + }), + ), + clientProvider, + clientmocks.NewDiscoveryService(endpoint1, endpoint2), + newOpts( + esdispatcher.WithEventConsumerTimeout(time.Second), + client.WithMaxConnectAttempts(maxConnectAttempts), + ), + ) + if err != nil { + t.Fatalf("error creating channel event client: %s", err) + } + + var outcome clientmocks.Outcome + if err := eventClient.Connect(); err != nil { + outcome = clientmocks.ErrorOutcome + } else { + outcome = clientmocks.ConnectedOutcome + defer eventClient.Close() + } + + if outcome != expectedOutcome { + t.Fatalf("Expecting that the reconnection attempt would result in [%s] but got [%s]", expectedOutcome, outcome) + } +} + +func testReconnect(t *testing.T, reconnect bool, maxReconnectAttempts uint, expectedOutcome clientmocks.Outcome, connAttemptResult clientmocks.ConnectAttemptResults) { + cp := clientmocks.NewProviderFactory() + + connectch := make(chan *fab.ConnectionEvent) + ledger := servicemocks.NewMockLedger(servicemocks.BlockEventFactory) + + eventClient, _, err := newClientWithMockConnAndOpts( + newMockContext(), "mychannel", + cp.FlakeyProvider( + connAttemptResult, + clientmocks.WithLedger(ledger), + clientmocks.WithFactory(func(opts ...clientmocks.Opt) clientmocks.Connection { + return ehclientmocks.NewConnection(opts...) + }), + ), + clientProvider, + clientmocks.NewDiscoveryService(endpoint1, endpoint2), + newOpts( + esdispatcher.WithEventConsumerTimeout(3*time.Second), + client.WithReconnect(reconnect), + client.WithReconnectInitialDelay(0), + client.WithMaxConnectAttempts(1), + client.WithMaxReconnectAttempts(maxReconnectAttempts), + client.WithTimeBetweenConnectAttempts(time.Millisecond), + client.WithConnectionEvent(connectch), + client.WithResponseTimeout(2*time.Second), + ), + ) + if err != nil { + t.Fatalf("error creating channel event client: %s", err) + } + if err := eventClient.Connect(); err != nil { + t.Fatalf("error connecting channel event client: %s", err) + } + defer eventClient.Close() + + outcomech := make(chan clientmocks.Outcome) + go listenConnection(t, connectch, outcomech) + + // Test automatic reconnect handling + cp.Connection().ProduceEvent(dispatcher.NewDisconnectedEvent(errors.New("testing reconnect handling"))) + + var outcome clientmocks.Outcome + + select { + case outcome = <-outcomech: + case <-time.After(5 * time.Second): + outcome = clientmocks.TimedOutOutcome + } + + if outcome != expectedOutcome { + t.Fatalf("Expecting that the reconnection attempt would result in [%s] but got [%s]", expectedOutcome, outcome) + } +} + +// testReconnectRegistration tests the scenario when an events client is registered to receive events and the connection to the +// event service is lost. After the connection is re-established, events should once again be received without the caller having to +// re-register for those events. +func testReconnectRegistration(t *testing.T, expectedBlockEvents clientmocks.NumBlock, expectedCCEvents clientmocks.NumChaincode, connectResults clientmocks.ConnectAttemptResults) { + channelID := "mychannel" + ccID := "mycc" + + ledger := servicemocks.NewMockLedger(servicemocks.BlockEventFactory) + cp := clientmocks.NewProviderFactory() + + eventClient, _, err := newClientWithMockConnAndOpts( + newMockContext(), channelID, + cp.FlakeyProvider( + connectResults, + clientmocks.WithLedger(ledger), + clientmocks.WithFactory(func(opts ...clientmocks.Opt) clientmocks.Connection { + return ehclientmocks.NewConnection(opts...) + }), + ), + clientProvider, + clientmocks.NewDiscoveryService(endpoint1, endpoint2), + newOpts( + esdispatcher.WithEventConsumerTimeout(3*time.Second), + client.WithReconnectInitialDelay(0), + client.WithMaxConnectAttempts(1), + client.WithMaxReconnectAttempts(1), + client.WithTimeBetweenConnectAttempts(time.Millisecond), + ), + ) + if err != nil { + t.Fatalf("error creating channel event client: %s", err) + } + if err := eventClient.Connect(); err != nil { + t.Fatalf("error connecting channel event client: %s", err) + } + defer eventClient.Close() + + _, blockch, err := eventClient.RegisterBlockEvent() + if err != nil { + t.Fatalf("error registering for block events: %s", err) + } + + _, ccch, err := eventClient.RegisterChaincodeEvent(ccID, ".*") + if err != nil { + t.Fatalf("error registering for chaincode events: %s", err) + } + + numCh := make(chan clientmocks.Received) + go listenEvents(blockch, ccch, 20*time.Second, numCh, expectedBlockEvents, expectedCCEvents) + + time.Sleep(500 * time.Millisecond) + + numEvents := 0 + numCCEvents := 0 + + // Produce a block event + numEvents++ + ledger.NewBlock(channelID, + servicemocks.NewTransaction("txID", pb.TxValidationCode_VALID, cb.HeaderType_CONFIG_UPDATE), + ) + + // Produce a chaincode event + numEvents++ + numCCEvents++ + ledger.NewBlock(channelID, + servicemocks.NewTransactionWithCCEvent("txID", pb.TxValidationCode_VALID, ccID, "event1"), + ) + + // Wait a while for the subscriber to receive the event + time.Sleep(500 * time.Millisecond) + + // Simulate a connection error + cp.Connection().ProduceEvent(dispatcher.NewDisconnectedEvent(errors.New("testing reconnect handling"))) + + // Wait for the client to reconnect + time.Sleep(2 * time.Second) + + // Produce some more events after the client has reconnected + for ; numCCEvents < int(expectedCCEvents); numCCEvents++ { + numEvents++ + ledger.NewBlock(channelID, + servicemocks.NewTransactionWithCCEvent("txID", pb.TxValidationCode_VALID, ccID, "event1"), + ) + } + for ; numEvents < int(expectedBlockEvents); numEvents++ { + ledger.NewBlock(channelID, + servicemocks.NewTransaction("txID", pb.TxValidationCode_VALID, cb.HeaderType_CONFIG_UPDATE), + ) + } + + var eventsReceived clientmocks.Received + + select { + case received, ok := <-numCh: + if !ok { + t.Fatalf("connection closed prematurely") + } else { + eventsReceived = received + } + case <-time.After(30 * time.Second): + t.Fatalf("timed out waiting for events") + } + + if eventsReceived.NumBlock != expectedBlockEvents { + t.Fatalf("Expecting to receive [%d] block events but received [%d]", expectedBlockEvents, eventsReceived.NumBlock) + } + if eventsReceived.NumChaincode != expectedCCEvents { + t.Fatalf("Expecting to receive [%d] CC events but received [%d]", expectedCCEvents, eventsReceived.NumChaincode) + } +} + +func listenConnection(t *testing.T, eventch chan *fab.ConnectionEvent, outcome chan clientmocks.Outcome) { + state := initialState + + for { + e, ok := <-eventch + t.Logf("Got event [%v] - ok=[%v]", e, ok) + if !ok { + t.Logf("Returning terminated outcome") + outcome <- clientmocks.ClosedOutcome + break + } + if e.Connected { + if state == client.Disconnected { + t.Logf("Returning reconnected outcome") + outcome <- clientmocks.ReconnectedOutcome + } + state = client.Connected + } else { + state = client.Disconnected + } + } +} + +func listenEvents(blockch <-chan *fab.BlockEvent, ccch <-chan *fab.CCEvent, waitDuration time.Duration, numEventsCh chan clientmocks.Received, expectedBlockEvents clientmocks.NumBlock, expectedCCEvents clientmocks.NumChaincode) { + var numBlockEventsReceived clientmocks.NumBlock + var numCCEventsReceived clientmocks.NumChaincode + + for { + select { + case _, ok := <-blockch: + if ok { + numBlockEventsReceived++ + } else { + // The channel was closed by the event client. Make a new channel so + // that we don't get into a tight loop + blockch = make(chan *fab.BlockEvent) + } + case _, ok := <-ccch: + if ok { + numCCEventsReceived++ + } else { + // The channel was closed by the event client. Make a new channel so + // that we don't get into a tight loop + ccch = make(chan *fab.CCEvent) + } + case <-time.After(waitDuration): + numEventsCh <- clientmocks.NewReceived(numBlockEventsReceived, numCCEventsReceived) + return + } + if numBlockEventsReceived >= expectedBlockEvents && numCCEventsReceived >= expectedCCEvents { + numEventsCh <- clientmocks.NewReceived(numBlockEventsReceived, numCCEventsReceived) + return + } + } +} + +type ClientProvider func(context context.Context, channelID string, connectionProvider api.ConnectionProvider, discoveryService fab.DiscoveryService, opts ...options.Opt) (*Client, error) + +var clientProvider = func(context context.Context, channelID string, connectionProvider api.ConnectionProvider, discoveryService fab.DiscoveryService, opts ...options.Opt) (*Client, error) { + return newClient(context, channelID, connectionProvider, discoveryService, []*pb.Interest{&pb.Interest{EventType: pb.EventType_BLOCK}}, true, opts...) +} + +var filteredClientProvider = func(context context.Context, channelID string, connectionProvider api.ConnectionProvider, discoveryService fab.DiscoveryService, opts ...options.Opt) (*Client, error) { + return newClient(context, channelID, connectionProvider, discoveryService, []*pb.Interest{&pb.Interest{EventType: pb.EventType_FILTEREDBLOCK}}, false, opts...) +} + +func newClientWithMockConn(context context.Context, channelID string, clientProvider ClientProvider, connOpts ...clientmocks.Opt) (*Client, clientmocks.Connection, error) { + conn := ehclientmocks.NewConnection(connOpts...) + client, _, err := newClientWithMockConnAndOpts( + context, channelID, + clientmocks.NewProviderFactory().Provider(conn), + clientProvider, + clientmocks.NewDiscoveryService(endpoint1, endpoint2), + defaultOpts, + ) + return client, conn, err +} + +func newClientWithMockConnAndOpts(context context.Context, channelID string, connectionProvider api.ConnectionProvider, clientProvider ClientProvider, discoveryService fab.DiscoveryService, opts []options.Opt, connOpts ...clientmocks.Opt) (*Client, clientmocks.Connection, error) { + var conn *ehclientmocks.MockConnection + if connectionProvider == nil { + conn = ehclientmocks.NewConnection(connOpts...) + connectionProvider = clientmocks.NewProviderFactory().Provider(conn) + } + + client, err := clientProvider(context, channelID, connectionProvider, discoveryService, opts...) + return client, conn, err +} + +func newMockContext() context.Context { + return fabmocks.NewMockContext(fabmocks.NewMockUser("user1")) +} + +func newMockEventEndpoint(url string) api.EventEndpoint { + return &endpoint.EventEndpoint{ + EvtURL: url, + } +} + +func newOpts(opts ...options.Opt) []options.Opt { + return opts +} diff --git a/pkg/fab/events/eventhubclient/mocks/mockconnection.go b/pkg/fab/events/eventhubclient/mocks/mockconnection.go new file mode 100755 index 0000000000..ccc622c69d --- /dev/null +++ b/pkg/fab/events/eventhubclient/mocks/mockconnection.go @@ -0,0 +1,97 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package mocks + +import ( + "fmt" + + clientmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/client/mocks" + pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer" + "github.com/pkg/errors" +) + +const ( + // RegInterests is the register operation (used in the OperationMap) + RegInterests clientmocks.Operation = "reg-interests" + + // UnregInterests is the unregister operation (used in the OperationMap) + UnregInterests clientmocks.Operation = "unreg-interests" +) + +// MockConnection is a mock event hub connection used for unit testing +type MockConnection struct { + clientmocks.MockConnection +} + +// NewConnection returns a new MockConnection using the given options +func NewConnection(opts ...clientmocks.Opt) *MockConnection { + return &MockConnection{ + MockConnection: *clientmocks.NewMockConnection(opts...), + } +} + +// Send simulates sending register/unregister events to the event hub +func (c *MockConnection) Send(emsg *pb.Event) error { + if c.Closed() { + return errors.New("mock connection is closed") + } + + switch evt := emsg.Event.(type) { + case *pb.Event_Register: + result, exists := c.Result(RegInterests) + if exists { + switch result.Result { + case clientmocks.NoOpResult: + // Don't send a response + return nil + case clientmocks.FailResult: + c.ProduceEvent(newRegInterestsResponse(nil)) + return nil + } + } + c.ProduceEvent(newRegInterestsResponse(evt.Register.Events)) + + case *pb.Event_Unregister: + result, exists := c.Result(UnregInterests) + if exists { + switch result.Result { + case clientmocks.NoOpResult: + // Don't send a response + return nil + case clientmocks.FailResult: + c.ProduceEvent(newUnregInterestsResponse(nil)) + return nil + } + } + c.ProduceEvent(newUnregInterestsResponse(evt.Unregister.Events)) + + default: + panic(fmt.Sprintf("unsupported event type: %T", evt)) + } + + return nil +} + +func newRegInterestsResponse(interests []*pb.Interest) *pb.Event { + return &pb.Event{ + Event: &pb.Event_Register{ + Register: &pb.Register{ + Events: interests, + }, + }, + } +} + +func newUnregInterestsResponse(interests []*pb.Interest) *pb.Event { + return &pb.Event{ + Event: &pb.Event_Unregister{ + Unregister: &pb.Unregister{ + Events: interests, + }, + }, + } +} diff --git a/pkg/fab/events/eventhubclient/mocks/mockevents.go b/pkg/fab/events/eventhubclient/mocks/mockevents.go new file mode 100755 index 0000000000..ba0363e983 --- /dev/null +++ b/pkg/fab/events/eventhubclient/mocks/mockevents.go @@ -0,0 +1,35 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package mocks + +import ( + "github.com/golang/protobuf/ptypes/timestamp" + cb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/common" + pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer" +) + +// NewBlockEvent returns a new mock block event initialized with the given block +func NewBlockEvent(block *cb.Block) *pb.Event { + return &pb.Event{ + Creator: []byte("some-id"), + Timestamp: ×tamp.Timestamp{Seconds: 1000}, + Event: &pb.Event_Block{ + Block: block, + }, + } +} + +// NewFilteredBlockEvent returns a new mock filtered block event initialized with the given filtered block +func NewFilteredBlockEvent(fblock *pb.FilteredBlock) *pb.Event { + return &pb.Event{ + Creator: []byte("some-id"), + Timestamp: ×tamp.Timestamp{Seconds: 1000}, + Event: &pb.Event_FilteredBlock{ + FilteredBlock: fblock, + }, + } +} diff --git a/pkg/fab/events/eventhubclient/opts.go b/pkg/fab/events/eventhubclient/opts.go new file mode 100755 index 0000000000..8aceaf62df --- /dev/null +++ b/pkg/fab/events/eventhubclient/opts.go @@ -0,0 +1,24 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package eventhubclient + +import "time" + +type params struct { + respTimeout time.Duration +} + +func defaultParams() *params { + return ¶ms{ + respTimeout: 5 * time.Second, + } +} + +func (p *params) SetResponseTimeout(value time.Duration) { + logger.Debugf("ResponseTimeout: %s", value) + p.respTimeout = value +} diff --git a/pkg/fab/events/mocks/mockehserver.go b/pkg/fab/events/mocks/mockehserver.go new file mode 100755 index 0000000000..0e16b63e18 --- /dev/null +++ b/pkg/fab/events/mocks/mockehserver.go @@ -0,0 +1,72 @@ +/* +Copyright SecureKey Technologies Inc. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package mocks + +import ( + "fmt" + "io" + "sync" + + "github.com/golang/protobuf/proto" + pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer" +) + +// MockEventhubServer is a mock event hub server +type MockEventhubServer struct { + sync.RWMutex + disconnErr error +} + +// NewMockEventhubServer returns a new MockEventhubServer +func NewMockEventhubServer() *MockEventhubServer { + return new(MockEventhubServer) +} + +// Disconnect terminates the stream and returns the given error to the client +func (s *MockEventhubServer) Disconnect(err error) { + s.Lock() + defer s.Unlock() + s.disconnErr = err +} + +func (s *MockEventhubServer) disconnectErr() error { + s.RLock() + defer s.RUnlock() + return s.disconnErr +} + +// Chat starts a listener on the given chat stream +func (s *MockEventhubServer) Chat(srv pb.Events_ChatServer) error { + for { + signedEvt, err := srv.Recv() + if err == io.EOF || signedEvt == nil { + break + } + + err = s.disconnectErr() + if err != nil { + return err + } + + var emsg pb.Event + if err := proto.Unmarshal(signedEvt.EventBytes, &emsg); err != nil { + panic(fmt.Sprintf("Error unmarshalling event bytes: %s", err)) + } + + switch emsg.Event.(type) { + case *pb.Event_Register: + // Send back the same event (which is what the event hub server currently does) + srv.Send(&emsg) + case *pb.Event_Unregister: + // Send back the same event (which is what the event hub server currently does) + srv.Send(&emsg) + default: + panic(fmt.Sprintf("Unsupported message type: %T", emsg)) + } + } + return nil +} diff --git a/pkg/fab/events/service/dispatcher/dispatcher.go b/pkg/fab/events/service/dispatcher/dispatcher.go index a4f68819c4..8ca5691d0c 100755 --- a/pkg/fab/events/service/dispatcher/dispatcher.go +++ b/pkg/fab/events/service/dispatcher/dispatcher.go @@ -100,8 +100,6 @@ func (ed *Dispatcher) Start() error { return errors.New("cannot start dispatcher since it's not in its initial state") } - logger.Info("Started event dispatcher") - ed.RegisterHandlers() go func() { @@ -384,7 +382,9 @@ func (ed *Dispatcher) publishFilteredBlockEvents(fblock *pb.FilteredBlock) { logger.Warnf("Filtered block is nil. Event will not be published") return } - logger.Warnf("Publishing filtered block event: %#v", fblock) + + logger.Debugf("Publishing filtered block event: %#v", fblock) + for _, reg := range ed.filteredBlockRegistrations { if ed.eventConsumerTimeout < 0 { select { @@ -496,11 +496,9 @@ func toFilteredBlock(block *cb.Block) *pb.FilteredBlock { continue } channelID = chID - logger.Warnf("setting channel ID [%s]", channelID) filteredTxs = append(filteredTxs, filteredTx) } - logger.Warnf("channel ID is [%s]", channelID) return &pb.FilteredBlock{ ChannelId: channelID, Number: block.Header.Number,