Skip to content

Commit

Permalink
[FAB-3902] RegisterTxEvent to return error code
Browse files Browse the repository at this point in the history
Change-Id: I500d4a793b45bf9ec3bbd580e08e31b3c597076a
Signed-off-by: biljana lukovic <[email protected]>
  • Loading branch information
biljanaLukovic committed May 15, 2017
1 parent 9bc35ce commit eb9b94b
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 11 deletions.
16 changes: 8 additions & 8 deletions fabric-client/events/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type EventHub interface {
Disconnect()
RegisterChaincodeEvent(ccid string, eventname string, callback func(*ChaincodeEvent)) *ChainCodeCBE
UnregisterChaincodeEvent(cbe *ChainCodeCBE)
RegisterTxEvent(txID string, callback func(string, error))
RegisterTxEvent(txID string, callback func(string, pb.TxValidationCode, error))
UnregisterTxEvent(txID string)
RegisterBlockEvent(callback func(*common.Block))
UnregisterBlockEvent(callback func(*common.Block))
Expand All @@ -74,7 +74,7 @@ type eventHub struct {
// Map of clients registered for block events
blockRegistrants []func(*common.Block)
// Map of clients registered for transactional events
txRegistrants map[string]func(string, error)
txRegistrants map[string]func(string, pb.TxValidationCode, error)
// peer addr to connect to
peerAddr string
// peer tls certificate
Expand Down Expand Up @@ -125,7 +125,7 @@ func (ccf *consumerClientFactory) newEventsClient(peerAddress string, certificat
func NewEventHub() EventHub {
chaincodeRegistrants := make(map[string][]*ChainCodeCBE)
blockRegistrants := make([]func(*common.Block), 0)
txRegistrants := make(map[string]func(string, error))
txRegistrants := make(map[string]func(string, pb.TxValidationCode, error))

eventHub := &eventHub{
chaincodeRegistrants: chaincodeRegistrants,
Expand Down Expand Up @@ -395,7 +395,7 @@ func (eventHub *eventHub) UnregisterChaincodeEvent(cbe *ChainCodeCBE) {
* @param {function} callback Function that takes a single parameter which
* is a json object representation of type "message Transaction"
*/
func (eventHub *eventHub) RegisterTxEvent(txID string, callback func(string, error)) {
func (eventHub *eventHub) RegisterTxEvent(txID string, callback func(string, pb.TxValidationCode, error)) {
logger.Debugf("reg txid %s\n", txID)

eventHub.mtx.Lock()
Expand Down Expand Up @@ -424,6 +424,7 @@ func (eventHub *eventHub) txCallback(block *common.Block) {

txFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
for i, v := range block.Data.Data {

if env, err := utils.GetEnvelopeFromBlock(v); err != nil {
logger.Errorf("error extracting Envelope from block: %v\n", err)
return
Expand All @@ -446,10 +447,9 @@ func (eventHub *eventHub) txCallback(block *common.Block) {
callback := eventHub.getTXRegistrant(channelHeader.TxId)
if callback != nil {
if txFilter.IsInvalid(i) {
callback(channelHeader.TxId, fmt.Errorf("Received invalid transaction from channel %s\n", channelHeader.ChannelId))

callback(channelHeader.TxId, txFilter.Flag(i), fmt.Errorf("Received invalid transaction from channel %s", channelHeader.ChannelId))
} else {
callback(channelHeader.TxId, nil)
callback(channelHeader.TxId, txFilter.Flag(i), nil)
}
} else {
logger.Debugf("No callback registered for TxID: %s\n", channelHeader.TxId)
Expand Down Expand Up @@ -487,7 +487,7 @@ func (eventHub *eventHub) getChaincodeRegistrants(chaincodeID string) []*ChainCo
return clone
}

func (eventHub *eventHub) getTXRegistrant(txID string) func(string, error) {
func (eventHub *eventHub) getTXRegistrant(txID string) func(string, pb.TxValidationCode, error) {
eventHub.mtx.RLock()
defer eventHub.mtx.RUnlock()
return eventHub.txRegistrants[txID]
Expand Down
2 changes: 1 addition & 1 deletion fabric-client/events/eventhub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestDeadlock(t *testing.T) {
go flood(eventsPerThread, threads, func() {
transactionID := generateTxID()
received := newCompletionHandler(timeout)
eventHub.RegisterTxEvent(transactionID, func(txID string, err error) {
eventHub.RegisterTxEvent(transactionID, func(txID string, code pb.TxValidationCode, err error) {
txCompletion.done()
received.done()
})
Expand Down
7 changes: 5 additions & 2 deletions fabric-client/helpers/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ limitations under the License.

package helpers

import "github.com/hyperledger/fabric-sdk-go/fabric-client/events"
import (
"github.com/hyperledger/fabric-sdk-go/fabric-client/events"
pb "github.com/hyperledger/fabric/protos/peer"
)

// RegisterTxEvent registers on the given eventhub for the give transaction
// returns a boolean channel which receives true when the event is complete
Expand All @@ -28,7 +31,7 @@ func RegisterTxEvent(txID string, eventHub events.EventHub) (chan bool, chan err
done := make(chan bool)
fail := make(chan error)

eventHub.RegisterTxEvent(txID, func(txId string, err error) {
eventHub.RegisterTxEvent(txID, func(txId string, errorCode pb.TxValidationCode, err error) {
if err != nil {
logger.Debugf("Received error event for txid(%s)\n", txId)
fail <- err
Expand Down
81 changes: 81 additions & 0 deletions test/integration/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
fabricClient "github.com/hyperledger/fabric-sdk-go/fabric-client"
fcUtil "github.com/hyperledger/fabric-sdk-go/fabric-client/helpers"
"github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric/protos/peer"
)

func TestEvents(t *testing.T) {
Expand Down Expand Up @@ -55,6 +56,7 @@ func TestEvents(t *testing.T) {

testFailedTx(t, testSetup)

testFailedTxErrorCode(t, testSetup)
// Test disconnect event hub
testSetup.EventHub.Disconnect()
if testSetup.EventHub.IsConnected() {
Expand Down Expand Up @@ -109,6 +111,7 @@ func testFailedTx(t *testing.T, testSetup BaseSetupImpl) {
select {
case <-done1:
case <-fail1:
t.Fatalf("Received fail for second invoke")
case <-done2:
t.Fatalf("Received success for second invoke")
case <-fail2:
Expand All @@ -121,6 +124,84 @@ func testFailedTx(t *testing.T, testSetup BaseSetupImpl) {

}

func testFailedTxErrorCode(t *testing.T, testSetup BaseSetupImpl) {

// Arguments for events CC
var args []string
args = append(args, "invoke")
args = append(args, "invoke")
args = append(args, "SEVERE")

tpResponses1, tx1, err := fcUtil.CreateAndSendTransactionProposal(testSetup.Chain, testSetup.ChainCodeID, testSetup.ChainID, args, []fabricClient.Peer{testSetup.Chain.GetPrimaryPeer()}, nil)
if err != nil {
t.Fatalf("CreateAndSendTransactionProposal return error: %v \n", err)
}

tpResponses2, tx2, err := fcUtil.CreateAndSendTransactionProposal(testSetup.Chain, testSetup.ChainCodeID, testSetup.ChainID, args, []fabricClient.Peer{testSetup.Chain.GetPrimaryPeer()}, nil)
if err != nil {
t.Fatalf("CreateAndSendTransactionProposal return error: %v \n", err)
}

done := make(chan bool)
fail := make(chan error)
var errorValidationCode pb.TxValidationCode
testSetup.EventHub.RegisterTxEvent(tx1, func(txId string, errorCode pb.TxValidationCode, err error) {
if err != nil {
errorValidationCode = errorCode
fail <- err
} else {
done <- true
}
})

defer testSetup.EventHub.UnregisterTxEvent(tx1)

done2 := make(chan bool)
fail2 := make(chan error)

testSetup.EventHub.RegisterTxEvent(tx2, func(txId string, errorCode pb.TxValidationCode, err error) {
if err != nil {
errorValidationCode = errorCode
fail2 <- err
} else {
done2 <- true
}
})

defer testSetup.EventHub.UnregisterTxEvent(tx2)

// Test invalid transaction: create 2 invoke requests in quick succession that modify
// the same state variable which should cause one invoke to be invalid
_, err = fcUtil.CreateAndSendTransaction(testSetup.Chain, tpResponses1)
if err != nil {
t.Fatalf("First invoke failed err: %v", err)
}
_, err = fcUtil.CreateAndSendTransaction(testSetup.Chain, tpResponses2)
if err != nil {
t.Fatalf("Second invoke failed err: %v", err)
}

for i := 0; i < 2; i++ {
select {
case <-done:
case <-fail:
t.Fatalf("Received fail for second invoke")
case <-done2:
t.Fatalf("Received success for second invoke")
case <-fail2:
// success
fmt.Println("Received error validation Code ", errorValidationCode)
if errorValidationCode.String() != "MVCC_READ_CONFLICT" {
t.Fatalf("Expected error code MVCC_READ_CONFLICT")
}
return
case <-time.After(time.Second * 30):
t.Fatalf("invoke Didn't receive block event for txid1(%s) or txid1(%s)", tx1, tx2)
}
}

}

func testMultipleBlockEventCallbacks(t *testing.T, testSetup BaseSetupImpl) {

// Arguments for events CC
Expand Down

0 comments on commit eb9b94b

Please sign in to comment.