Skip to content

Commit

Permalink
[FAB-3999] EventHub client reconnect fix
Browse files Browse the repository at this point in the history
Modified EventHub Disconnect to perform a synchronous unregister.

Change-Id: I478036a4ed5e5a84a6e76bed5f44bd9e93d838e1
Signed-off-by: Troy Ronda <[email protected]>
  • Loading branch information
troyronda committed May 18, 2017
1 parent 061e0e0 commit d6d3fe0
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 29 deletions.
3 changes: 2 additions & 1 deletion fabric-client/events/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const defaultTimeout = time.Second * 3
type EventsClient interface {
RegisterAsync(ies []*ehpb.Interest) error
UnregisterAsync(ies []*ehpb.Interest) error
Unregister(ies []*ehpb.Interest) error
Recv() (*ehpb.Event, error)
Start() error
Stop() error
Expand Down Expand Up @@ -151,7 +152,7 @@ func (ec *eventsClient) UnregisterAsync(ies []*ehpb.Interest) error {
}

// unregister - unregisters interest in a event
func (ec *eventsClient) unregister(ies []*ehpb.Interest) error {
func (ec *eventsClient) Unregister(ies []*ehpb.Interest) error {
var err error
if err = ec.UnregisterAsync(ies); err != nil {
return err
Expand Down
6 changes: 5 additions & 1 deletion fabric-client/events/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,9 @@ func (eventHub *eventHub) Disconnect() {
}

// Unregister interests with server and stop the stream
eventHub.client.UnregisterAsync(eventHub.interestedEvents)
eventHub.client.Unregister(eventHub.interestedEvents)
eventHub.client.Stop()

eventHub.connected = false
}

Expand All @@ -175,6 +176,8 @@ func (eventHub *eventHub) RegisterBlockEvent(callback func(*common.Block)) {
defer eventHub.mtx.Unlock()

eventHub.blockRegistrants = append(eventHub.blockRegistrants, callback)

// Register interest for blocks (only declare interest once, so do this for the first registrant)
if len(eventHub.blockRegistrants) == 1 {
eventHub.interestedEvents = append(eventHub.interestedEvents, &pb.Interest{EventType: pb.EventType_BLOCK})
}
Expand All @@ -195,6 +198,7 @@ func (eventHub *eventHub) UnregisterBlockEvent(callback func(*common.Block)) {
}
}

// Unregister interest for blocks if there are no more registrants
if len(eventHub.blockRegistrants) < 1 {
blockEventInterest := pb.Interest{EventType: pb.EventType_BLOCK}
eventHub.client.UnregisterAsync([]*pb.Interest{&blockEventInterest})
Expand Down
5 changes: 5 additions & 0 deletions fabric-client/events/eventmocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ func (mec *mockEventClient) UnregisterAsync(ies []*pb.Interest) error {
return nil
}

// Unregister does not unregister anything anywhere but acts like all is well
func (mec *mockEventClient) Unregister(ies []*pb.Interest) error {
return mec.UnregisterAsync(ies)
}

// Recv will return mock events sent to the event channel. Warning! This might block indefinitely
func (mec *mockEventClient) Recv() (*pb.Event, error) {
event := <-mec.events
Expand Down
12 changes: 10 additions & 2 deletions test/integration/base_test_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ func (setup *BaseSetupImpl) Initialize() error {
return fmt.Errorf("CreateAndJoinChannel return error: %v", err)
}

if err := setup.setupEventHub(); err != nil {
return err
}

setup.Initialized = true

return nil
}

func (setup *BaseSetupImpl) setupEventHub() error {
eventHub, err := getEventHub()
if err != nil {
return err
Expand All @@ -98,8 +108,6 @@ func (setup *BaseSetupImpl) Initialize() error {
}
setup.EventHub = eventHub

setup.Initialized = true

return nil
}

Expand Down
52 changes: 27 additions & 25 deletions test/integration/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ limitations under the License.
package integration

import (
"fmt"
"testing"
"time"

Expand All @@ -31,7 +30,15 @@ import (
)

func TestEvents(t *testing.T) {
testSetup := initializeTests(t)

testFailedTx(t, testSetup)
testFailedTxErrorCode(t, testSetup)
testReconnectEventHub(t, testSetup)
testMultipleBlockEventCallbacks(t, testSetup)
}

func initializeTests(t *testing.T) BaseSetupImpl {
testSetup := BaseSetupImpl{
ConfigFile: "../fixtures/config/config_test.yaml",
ChainID: "testchannel",
Expand All @@ -54,22 +61,7 @@ func TestEvents(t *testing.T) {
t.Fatalf("instantiateCC return error: %v", err)
}

testFailedTx(t, testSetup)

testFailedTxErrorCode(t, testSetup)
// Test disconnect event hub
testSetup.EventHub.Disconnect()
if testSetup.EventHub.IsConnected() {
t.Fatalf("Failed to disconnect event hub")
}

// Reconnect event hub
if err := testSetup.EventHub.Connect(); err != nil {
t.Fatalf("Failed to connect event hub")
}

testMultipleBlockEventCallbacks(t, testSetup)

return testSetup
}

func testFailedTx(t *testing.T, testSetup BaseSetupImpl) {
Expand Down Expand Up @@ -121,11 +113,9 @@ func testFailedTx(t *testing.T, testSetup BaseSetupImpl) {
t.Fatalf("invoke Didn't receive block event for txid1(%s) or txid1(%s)", tx1, tx2)
}
}

}

func testFailedTxErrorCode(t *testing.T, testSetup BaseSetupImpl) {

// Arguments for events CC
var args []string
args = append(args, "invoke")
Expand Down Expand Up @@ -190,7 +180,7 @@ func testFailedTxErrorCode(t *testing.T, testSetup BaseSetupImpl) {
t.Fatalf("Received success for second invoke")
case <-fail2:
// success
fmt.Println("Received error validation Code ", errorValidationCode)
t.Logf("Received error validation code %s", errorValidationCode.String())
if errorValidationCode.String() != "MVCC_READ_CONFLICT" {
t.Fatalf("Expected error code MVCC_READ_CONFLICT")
}
Expand All @@ -202,8 +192,20 @@ func testFailedTxErrorCode(t *testing.T, testSetup BaseSetupImpl) {

}

func testMultipleBlockEventCallbacks(t *testing.T, testSetup BaseSetupImpl) {
func testReconnectEventHub(t *testing.T, testSetup BaseSetupImpl) {
// Test disconnect event hub
testSetup.EventHub.Disconnect()
if testSetup.EventHub.IsConnected() {
t.Fatalf("Failed to disconnect event hub")
}

// Reconnect event hub
if err := testSetup.EventHub.Connect(); err != nil {
t.Fatalf("Failed to connect event hub")
}
}

func testMultipleBlockEventCallbacks(t *testing.T, testSetup BaseSetupImpl) {
// Arguments for events CC
var args []string
args = append(args, "invoke")
Expand All @@ -213,13 +215,13 @@ func testMultipleBlockEventCallbacks(t *testing.T, testSetup BaseSetupImpl) {
// Create and register test callback that will be invoked upon block event
test := make(chan bool)
testSetup.EventHub.RegisterBlockEvent(func(block *common.Block) {
fmt.Println("Invoked test callback on block event")
t.Logf("Received test callback on block event")
test <- true
})

tpResponses, tx, err := fcUtil.CreateAndSendTransactionProposal(testSetup.Chain, testSetup.ChainCodeID, testSetup.ChainID, args, []fabricClient.Peer{testSetup.Chain.GetPrimaryPeer()}, nil)
if err != nil {
t.Fatalf("CreateAndSendTransactionProposal return error: %v \n", err)
t.Fatalf("CreateAndSendTransactionProposal returned error: %v \n", err)
}

// Register tx for commit/block event(s)
Expand All @@ -228,7 +230,7 @@ func testMultipleBlockEventCallbacks(t *testing.T, testSetup BaseSetupImpl) {

_, err = fcUtil.CreateAndSendTransaction(testSetup.Chain, tpResponses)
if err != nil {
t.Fatalf("First invoke failed err: %v", err)
t.Fatalf("CreateAndSendTransaction failed with error: %v", err)
}

for i := 0; i < 2; i++ {
Expand All @@ -237,7 +239,7 @@ func testMultipleBlockEventCallbacks(t *testing.T, testSetup BaseSetupImpl) {
case <-fail:
case <-test:
case <-time.After(time.Second * 30):
t.Fatalf("invoke Didn't receive test callback event")
t.Fatalf("Didn't receive test callback event")
}
}

Expand Down

0 comments on commit d6d3fe0

Please sign in to comment.