From 9095d9ec3353fff49ad701b332e4cf511ca19678 Mon Sep 17 00:00:00 2001 From: Saad Karim Date: Thu, 19 Apr 2018 13:26:09 -0400 Subject: [PATCH] [FAB-9591] Make active transactions an object Change-Id: I5e8dd612ae2744da843479ec4d5b7c59e7a1780f Signed-off-by: Matthew Sykes --- core/chaincode/active_transactions.go | 41 +++++++++++++++++ core/chaincode/active_transactions_test.go | 53 ++++++++++++++++++++++ core/chaincode/chaincode_support.go | 5 -- core/chaincode/handler.go | 39 ++++++---------- 4 files changed, 107 insertions(+), 31 deletions(-) create mode 100644 core/chaincode/active_transactions.go create mode 100644 core/chaincode/active_transactions_test.go diff --git a/core/chaincode/active_transactions.go b/core/chaincode/active_transactions.go new file mode 100644 index 00000000000..392c575e2fc --- /dev/null +++ b/core/chaincode/active_transactions.go @@ -0,0 +1,41 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package chaincode + +import "sync" + +func NewTxKey(channelID, txID string) string { return channelID + txID } + +type ActiveTransactions struct { + mutex sync.Mutex + ids map[string]struct{} +} + +func NewActiveTransactions() *ActiveTransactions { + return &ActiveTransactions{ + ids: map[string]struct{}{}, + } +} + +func (a *ActiveTransactions) Add(channelID, txID string) bool { + key := NewTxKey(channelID, txID) + a.mutex.Lock() + defer a.mutex.Unlock() + if _, ok := a.ids[key]; ok { + return false + } + + a.ids[key] = struct{}{} + return true +} + +func (a *ActiveTransactions) Remove(channelID, txID string) { + key := NewTxKey(channelID, txID) + a.mutex.Lock() + delete(a.ids, key) + a.mutex.Unlock() +} diff --git a/core/chaincode/active_transactions_test.go b/core/chaincode/active_transactions_test.go new file mode 100644 index 00000000000..0cd5a4fecf2 --- /dev/null +++ b/core/chaincode/active_transactions_test.go @@ -0,0 +1,53 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package chaincode_test + +import ( + "testing" + + "github.com/hyperledger/fabric/core/chaincode" + "github.com/stretchr/testify/assert" +) + +func TestActiveTransactions(t *testing.T) { + activeTx := chaincode.NewActiveTransactions() + assert.NotNil(t, activeTx) + + // Add unique transactions + ok := activeTx.Add("channel-id", "tx-id") + assert.True(t, ok, "a new transaction should return true") + ok = activeTx.Add("channel-id", "tx-id-2") + assert.True(t, ok, "adding a different transaction id should return true") + ok = activeTx.Add("channel-id-2", "tx-id") + assert.True(t, ok, "adding a different channel-id should return true") + + // Attempt to add a transaction that already exists + ok = activeTx.Add("channel-id", "tx-id") + assert.False(t, ok, "attempting to an existing transaction should return false") + + // Remove existing and make sure the ID can be reused + activeTx.Remove("channel-id", "tx-id") + ok = activeTx.Add("channel-id", "tx-id") + assert.True(t, ok, "using a an id that has been removed should return true") +} + +func TestNewTxKey(t *testing.T) { + tests := []struct { + channelID string + txID string + result string + }{ + {"", "", ""}, + {"", "tx-1", "tx-1"}, + {"chan-1", "", "chan-1"}, + {"chan-1", "tx-1", "chan-1tx-1"}, + } + for _, tc := range tests { + result := chaincode.NewTxKey(tc.channelID, tc.txID) + assert.Equal(t, tc.result, result) + } +} diff --git a/core/chaincode/chaincode_support.go b/core/chaincode/chaincode_support.go index 3cd6d9e6dd8..ccc7b9b04e4 100644 --- a/core/chaincode/chaincode_support.go +++ b/core/chaincode/chaincode_support.go @@ -211,11 +211,6 @@ func (chaincodeSupport *ChaincodeSupport) registerHandler(chaincodehandler *Hand chaincodehandler.registered = true - //now we are ready to receive messages and send back responses - // TODO: Should we move these to the handler constructor? - chaincodehandler.txCtxs = NewTransactionContexts() - chaincodehandler.txidMap = make(map[string]bool) - chaincodeLogger.Debugf("registered handler complete for chaincode %s", key) return nil diff --git a/core/chaincode/handler.go b/core/chaincode/handler.go index c2a9632bb8c..9521d836d93 100644 --- a/core/chaincode/handler.go +++ b/core/chaincode/handler.go @@ -88,11 +88,13 @@ type Handler struct { //chan to pass error in sync and nonsync mode errChan chan error + // Map of tx txid to either invoke tx. Each tx will be // added prior to execute and remove when done execute txCtxs *TransactionContexts - txidMap map[string]bool + // set of active transaction identifiers + activeTransactions *ActiveTransactions //handlers for each state of the handler readyStateHandlers stateHandlers @@ -358,12 +360,14 @@ func HandleChaincodeStream(chaincodeSupport *ChaincodeSupport, ctxt context.Cont func newChaincodeSupportHandler(chaincodeSupport *ChaincodeSupport, peerChatStream ccintf.ChaincodeStream) *Handler { v := &Handler{ - ChatStream: peerChatStream, - handlerSupport: chaincodeSupport, - state: created, - errChan: make(chan error, 1), - keepalive: chaincodeSupport.keepalive, - userRunsCC: chaincodeSupport.userRunsCC, + ChatStream: peerChatStream, + handlerSupport: chaincodeSupport, + state: created, + errChan: make(chan error, 1), + txCtxs: NewTransactionContexts(), + activeTransactions: NewActiveTransactions(), + keepalive: chaincodeSupport.keepalive, + userRunsCC: chaincodeSupport.userRunsCC, } v.readyStateHandlers = stateHandlers{ @@ -392,28 +396,11 @@ func newChaincodeSupportHandler(chaincodeSupport *ChaincodeSupport, peerChatStre } func (handler *Handler) createTXIDEntry(channelID, txid string) bool { - if handler.txidMap == nil { - return false - } - handler.Lock() - defer handler.Unlock() - txCtxID := handler.getTxCtxId(channelID, txid) - if handler.txidMap[txCtxID] { - return false - } - handler.txidMap[txCtxID] = true - return handler.txidMap[txCtxID] + return handler.activeTransactions.Add(channelID, txid) } func (handler *Handler) deleteTXIDEntry(channelID, txid string) { - handler.Lock() - defer handler.Unlock() - txCtxID := handler.getTxCtxId(channelID, txid) - if handler.txidMap != nil { - delete(handler.txidMap, txCtxID) - } else { - chaincodeLogger.Warningf("TXID %s not found!", txCtxID) - } + handler.activeTransactions.Remove(channelID, txid) } //sendReady sends READY to chaincode serially (just like REGISTER)