Skip to content

Commit

Permalink
[FAB-6111]Use syncmap in eventhub
Browse files Browse the repository at this point in the history
Change-Id: I3f3b2e58322d3c866cad04a904cf9e7ecdbac2de
Signed-off-by: biljana lukovic <[email protected]>
  • Loading branch information
biljanaLukovic committed Sep 11, 2017
1 parent 308a18d commit 7cdef1d
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 60 deletions.
8 changes: 7 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

102 changes: 46 additions & 56 deletions pkg/fabric-client/events/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,21 @@ import (
pb "github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/protos/utils"
"github.com/op/go-logging"
syncmap "golang.org/x/sync/syncmap"
)

var logger = logging.MustGetLogger("fabric_sdk_go")

// EventHub allows a client to listen to event at a peer.
type EventHub struct {
// Protects chaincodeRegistrants, blockRegistrants and txRegistrants
//Used for protecting parts of code from running concurrently
mtx sync.RWMutex
// Map of clients registered for chaincode events
chaincodeRegistrants map[string][]*fab.ChainCodeCBE
// Map of clients registered for block events
chaincodeRegistrants syncmap.Map
// Array of clients registered for block events
blockRegistrants []func(*common.Block)
// Map of clients registered for transactional events
txRegistrants map[string]func(string, pb.TxValidationCode, error)
txRegistrants syncmap.Map
// peer addr to connect to
peerAddr string
// peer tls certificate
Expand Down Expand Up @@ -77,21 +78,14 @@ func NewEventHub(client fab.FabricClient) (*EventHub, error) {
if client == nil {
return nil, fmt.Errorf("Client is nil")
}
chaincodeRegistrants := make(map[string][]*fab.ChainCodeCBE)
txRegistrants := make(map[string]func(string, pb.TxValidationCode, error))

eventHub := EventHub{
chaincodeRegistrants: chaincodeRegistrants,
blockRegistrants: nil,
txRegistrants: txRegistrants,
interestedEvents: nil,
eventsClientFactory: &consumerClientFactory{},
client: client,
blockRegistrants: nil,
interestedEvents: nil,
eventsClientFactory: &consumerClientFactory{},
client: client,
}

// register default transaction callback
eventHub.RegisterBlockEvent(eventHub.txCallback)

return &eventHub, nil
}

Expand Down Expand Up @@ -328,15 +322,17 @@ func (eventHub *EventHub) RegisterChaincodeEvent(ccid string, eventname string,
eventHub.addChaincodeInterest(ccid, eventname)

cbe := fab.ChainCodeCBE{CCID: ccid, EventNameFilter: eventname, CallbackFunc: callback}
cbeArray := eventHub.chaincodeRegistrants[ccid]
if cbeArray == nil && len(cbeArray) <= 0 {
var cbeArray []*fab.ChainCodeCBE

ccRegistrantArray, ok := eventHub.chaincodeRegistrants.Load(ccid)
if !ok {
cbeArray = make([]*fab.ChainCodeCBE, 0)
cbeArray = append(cbeArray, &cbe)
eventHub.chaincodeRegistrants[ccid] = cbeArray
} else {
cbeArray = append(cbeArray, &cbe)
eventHub.chaincodeRegistrants[ccid] = cbeArray
cbeArray = ccRegistrantArray.([]*fab.ChainCodeCBE)
}
cbeArray = append(cbeArray, &cbe)
eventHub.chaincodeRegistrants.Store(ccid, cbeArray)

return &cbe
}

Expand All @@ -348,21 +344,24 @@ func (eventHub *EventHub) UnregisterChaincodeEvent(cbe *fab.ChainCodeCBE) {

eventHub.removeChaincodeInterest(cbe.CCID, cbe.EventNameFilter)

cbeArray := eventHub.chaincodeRegistrants[cbe.CCID]
if len(cbeArray) <= 0 {
logger.Debugf("No event registration for ccid %s \n", cbe.CCID)
return
}
ccRegistrantArray, ok := eventHub.chaincodeRegistrants.Load(cbe.CCID)
if ok {
cbeArray := ccRegistrantArray.([]*fab.ChainCodeCBE)
if len(cbeArray) <= 0 {
logger.Debugf("No event registration for ccid %s \n", cbe.CCID)
return
}

for i, v := range cbeArray {
if v == cbe {
newCbeArray := append(cbeArray[:i], cbeArray[i+1:]...)
if len(newCbeArray) <= 0 {
delete(eventHub.chaincodeRegistrants, cbe.CCID)
} else {
eventHub.chaincodeRegistrants[cbe.CCID] = newCbeArray
for i, v := range cbeArray {
if v == cbe {
newCbeArray := append(cbeArray[:i], cbeArray[i+1:]...)
if len(newCbeArray) <= 0 {
eventHub.chaincodeRegistrants.Delete(cbe.CCID)
} else {
eventHub.chaincodeRegistrants.Store(cbe.CCID, newCbeArray)
}
break
}
break
}
}
}
Expand All @@ -373,18 +372,14 @@ func (eventHub *EventHub) UnregisterChaincodeEvent(cbe *fab.ChainCodeCBE) {
// is a json object representation of type "message Transaction"
func (eventHub *EventHub) RegisterTxEvent(txnID apitxn.TransactionID, callback func(string, pb.TxValidationCode, error)) {
logger.Debugf("reg txid %s\n", txnID.ID)

eventHub.mtx.Lock()
eventHub.txRegistrants[txnID.ID] = callback
eventHub.mtx.Unlock()
eventHub.txRegistrants.Store(txnID.ID, callback)
}

// UnregisterTxEvent unregister transactional event registration.
// txid: transaction id
func (eventHub *EventHub) UnregisterTxEvent(txnID apitxn.TransactionID) {
eventHub.mtx.Lock()
delete(eventHub.txRegistrants, txnID.ID)
eventHub.mtx.Unlock()
logger.Debugf("un-reg txid %s\n", txnID.ID)
eventHub.txRegistrants.Delete(txnID.ID)
}

/**
Expand All @@ -394,7 +389,6 @@ func (eventHub *EventHub) UnregisterTxEvent(txnID apitxn.TransactionID) {
*/
func (eventHub *EventHub) txCallback(block *common.Block) {
logger.Debugf("txCallback block=%v\n", block)

txFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
for i, v := range block.Data.Data {

Expand All @@ -416,7 +410,6 @@ func (eventHub *EventHub) txCallback(block *common.Block) {
logger.Errorf("error extracting ChannelHeader from payload: %v\n", err)
return
}

callback := eventHub.getTXRegistrant(channelHeader.TxId)
if callback != nil {
if txFilter.IsInvalid(i) {
Expand All @@ -434,36 +427,33 @@ func (eventHub *EventHub) txCallback(block *common.Block) {
func (eventHub *EventHub) getBlockRegistrants() []func(*common.Block) {
eventHub.mtx.RLock()
defer eventHub.mtx.RUnlock()

// Return a clone of the array to avoid race conditions
clone := make([]func(*common.Block), len(eventHub.blockRegistrants))
for i, registrant := range eventHub.blockRegistrants {
clone[i] = registrant
}
copy(clone, eventHub.blockRegistrants)
return clone
}

func (eventHub *EventHub) getChaincodeRegistrants(chaincodeID string) []*fab.ChainCodeCBE {
eventHub.mtx.RLock()
defer eventHub.mtx.RUnlock()

registrants, ok := eventHub.chaincodeRegistrants[chaincodeID]
registrants, ok := eventHub.chaincodeRegistrants.Load(chaincodeID)
if !ok {
return nil
}

cbeRegistrants := registrants.([]*fab.ChainCodeCBE)
// Return a clone of the array to avoid race conditions
clone := make([]*fab.ChainCodeCBE, len(registrants))
for i, registrants := range registrants {
clone[i] = registrants
}
clone := make([]*fab.ChainCodeCBE, len(cbeRegistrants))
copy(clone, cbeRegistrants)
return clone
}

func (eventHub *EventHub) getTXRegistrant(txID string) func(string, pb.TxValidationCode, error) {
eventHub.mtx.RLock()
defer eventHub.mtx.RUnlock()
return eventHub.txRegistrants[txID]
v, ok := eventHub.txRegistrants.Load(txID)
if !ok {
return nil
}
return v.(func(string, pb.TxValidationCode, error))
}

// getChainCodeEvents parses block events for chaincode events associated with individual transactions
Expand Down
6 changes: 3 additions & 3 deletions pkg/fabric-client/events/eventhub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ func TestDeadlock(t *testing.T) {
t.Fatalf("No client")
}

threads := 20
eventsPerThread := 100
threads := 10
eventsPerThread := 200
eventsSent := eventsPerThread * threads

// The test should be done in milliseconds but if there's
// a deadlock then we don't want it to hang
timeout := 30 * time.Second
timeout := 50 * time.Second

// create a flood of TX events
txCompletion := newMultiCompletionHandler(eventsSent, timeout)
Expand Down

0 comments on commit 7cdef1d

Please sign in to comment.