Skip to content

Commit

Permalink
[FAB-9810] Replace maps with switch, remove lock
Browse files Browse the repository at this point in the history
Change-Id: I3d8137b3ebaa63ca6306c1b1e5d9496f2eba215d
Signed-off-by: Matthew Sykes <[email protected]>
  • Loading branch information
sykesm committed May 2, 2018
1 parent 816ef69 commit de11826
Showing 1 changed file with 78 additions and 81 deletions.
159 changes: 78 additions & 81 deletions core/chaincode/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ const (

var chaincodeLogger = flogging.MustGetLogger("chaincode")

type stateHandlers map[pb.ChaincodeMessage_Type]func(*pb.ChaincodeMessage)

// ACLProvider is responsible for performing access control checks when invoking
// chaincode.
type ACLProvider interface {
Expand All @@ -60,7 +58,6 @@ type handlerSupport interface {

// Handler responsible for management of Peer's side of chaincode stream
type Handler struct {
sync.Mutex
//peer to shim grpc serializer. User only in serialSend
serialLock sync.Mutex
ChatStream ccintf.ChaincodeStream
Expand All @@ -83,16 +80,84 @@ type Handler struct {
// set of active transaction identifiers
activeTransactions *ActiveTransactions

//handlers for each state of the handler
readyStateHandlers stateHandlers
createStateHandlers stateHandlers

keepalive time.Duration

registry Registry
aclProvider ACLProvider
}

func newChaincodeSupportHandler(chaincodeSupport *ChaincodeSupport, peerChatStream ccintf.ChaincodeStream, sccp sysccprovider.SystemChaincodeProvider) *Handler {
return &Handler{
ChatStream: peerChatStream,
handlerSupport: chaincodeSupport,
state: created,
errChan: make(chan error, 1),
txCtxs: NewTransactionContexts(),
activeTransactions: NewActiveTransactions(),
keepalive: chaincodeSupport.Keepalive,
aclProvider: chaincodeSupport.ACLProvider,
registry: chaincodeSupport.HandlerRegistry,
lifecycle: &Lifecycle{Executor: chaincodeSupport},
sccp: sccp,
}
}

// HandleMessage is the entry point Chaincode messages.
func (h *Handler) HandleMessage(msg *pb.ChaincodeMessage) error {
chaincodeLogger.Debugf("[%s]Fabric side Handling ChaincodeMessage of type: %s in state %s", shorttxid(msg.Txid), msg.Type, h.state)

switch h.state {
case created:
return h.handleMessageCreatedState(msg)
case ready:
return h.handleMessageReadyState(msg)
default:
return errors.Errorf("handle message: invalid state %s for trnansaction %s", h.state, msg.Txid)
}
}

func (h *Handler) handleMessageCreatedState(msg *pb.ChaincodeMessage) error {
switch msg.Type {
case pb.ChaincodeMessage_REGISTER:
h.handleRegister(msg)
default:
return fmt.Errorf("[%s]Fabric side handler cannot handle message (%s) while in created state", msg.Txid, msg.Type)
}
return nil
}

func (h *Handler) handleMessageReadyState(msg *pb.ChaincodeMessage) error {
switch msg.Type {
case pb.ChaincodeMessage_COMPLETED, pb.ChaincodeMessage_ERROR:
h.notify(msg)

case pb.ChaincodeMessage_PUT_STATE:
h.handleModState(msg)
case pb.ChaincodeMessage_DEL_STATE:
h.handleModState(msg)
case pb.ChaincodeMessage_INVOKE_CHAINCODE:
h.handleModState(msg)

case pb.ChaincodeMessage_GET_STATE:
h.handleGetState(msg)
case pb.ChaincodeMessage_GET_STATE_BY_RANGE:
h.handleGetStateByRange(msg)
case pb.ChaincodeMessage_GET_QUERY_RESULT:
h.handleGetQueryResult(msg)
case pb.ChaincodeMessage_GET_HISTORY_FOR_KEY:
h.handleGetHistoryForKey(msg)
case pb.ChaincodeMessage_QUERY_STATE_NEXT:
h.handleQueryStateNext(msg)
case pb.ChaincodeMessage_QUERY_STATE_CLOSE:
h.handleQueryStateClose(msg)

default:
return fmt.Errorf("[%s]Fabric side handler cannot handle message (%s) while in ready state", msg.Txid, msg.Type)
}

return nil
}

func shorttxid(txid string) string {
if len(txid) < 8 {
return txid
Expand Down Expand Up @@ -306,7 +371,7 @@ func (h *Handler) processStream() error {
continue
}

err = h.handleMessage(in)
err = h.HandleMessage(in)
if err != nil {
err = errors.WithMessage(err, "error handling message, ending stream")
chaincodeLogger.Errorf("[%s] %+v", shorttxid(in.Txid), err)
Expand All @@ -323,46 +388,6 @@ func HandleChaincodeStream(chaincodeSupport *ChaincodeSupport, ctxt context.Cont
return h.processStream()
}

func newChaincodeSupportHandler(chaincodeSupport *ChaincodeSupport, peerChatStream ccintf.ChaincodeStream, sccp sysccprovider.SystemChaincodeProvider) *Handler {
v := &Handler{
ChatStream: peerChatStream,
handlerSupport: chaincodeSupport,
state: created,
errChan: make(chan error, 1),
txCtxs: NewTransactionContexts(),
activeTransactions: NewActiveTransactions(),
keepalive: chaincodeSupport.Keepalive,
aclProvider: chaincodeSupport.ACLProvider,
registry: chaincodeSupport.HandlerRegistry,
lifecycle: &Lifecycle{Executor: chaincodeSupport},
sccp: sccp,
}

v.readyStateHandlers = stateHandlers{
//events from CC at the end of a TX that require notification
pb.ChaincodeMessage_COMPLETED: v.notify,
pb.ChaincodeMessage_ERROR: v.notify,

//state requests from CC that require processing
pb.ChaincodeMessage_GET_STATE: v.handleGetState,
pb.ChaincodeMessage_GET_STATE_BY_RANGE: v.handleGetStateByRange,
pb.ChaincodeMessage_GET_QUERY_RESULT: v.handleGetQueryResult,
pb.ChaincodeMessage_GET_HISTORY_FOR_KEY: v.handleGetHistoryForKey,
pb.ChaincodeMessage_QUERY_STATE_NEXT: v.handleQueryStateNext,
pb.ChaincodeMessage_QUERY_STATE_CLOSE: v.handleQueryStateClose,
pb.ChaincodeMessage_PUT_STATE: v.handleModState,
pb.ChaincodeMessage_DEL_STATE: v.handleModState,
pb.ChaincodeMessage_INVOKE_CHAINCODE: v.handleModState,
}

v.createStateHandlers = stateHandlers{
//move from created to established via REGISTER
pb.ChaincodeMessage_REGISTER: v.handleRegister,
}

return v
}

func (h *Handler) createTXIDEntry(channelID, txid string) bool {
return h.activeTransactions.Add(channelID, txid)
}
Expand Down Expand Up @@ -439,18 +464,15 @@ func (h *Handler) handleRegister(msg *pb.ChaincodeMessage) {
}

func (h *Handler) notify(msg *pb.ChaincodeMessage) {
h.Lock()
defer h.Unlock()
tctx := h.txCtxs.Get(msg.ChannelId, msg.Txid)
if tctx == nil {
chaincodeLogger.Debugf("notifier Txid:%s, channelID:%s does not exist for handleing message %s", msg.Txid, msg.ChannelId, msg.Type)
} else {
chaincodeLogger.Debugf("[%s]notifying Txid:%s, channelID:%s", shorttxid(msg.Txid), msg.Txid, msg.ChannelId)
tctx.responseNotifier <- msg

// clean up queryIteratorMap
tctx.CloseQueryIterators()
return
}

chaincodeLogger.Debugf("[%s]notifying Txid:%s, channelID:%s", shorttxid(msg.Txid), msg.Txid, msg.ChannelId)
tctx.responseNotifier <- msg
tctx.CloseQueryIterators()
}

// is this a txid for which there is a valid txsim
Expand Down Expand Up @@ -1172,31 +1194,6 @@ func (h *Handler) setChaincodeProposal(signedProp *pb.SignedProposal, prop *pb.P
return nil
}

// handleMessage is the entrance method for Peer's handling of Chaincode messages.
func (h *Handler) handleMessage(msg *pb.ChaincodeMessage) error {
chaincodeLogger.Debugf("[%s]Fabric side Handling ChaincodeMessage of type: %s in state %s", shorttxid(msg.Txid), msg.Type, h.state)

var hFn func(*pb.ChaincodeMessage)
switch h.state {
case created:
//chaincode connects and puts into established
hFn = h.createStateHandlers[msg.Type]
case ready:
//chaincode state requests handled in ready
hFn = h.readyStateHandlers[msg.Type]
default:
return fmt.Errorf("[%s]handleMessage-invalid state %s", msg.Txid, h.state)
}

if hFn == nil {
return fmt.Errorf("[%s]Fabric side handler cannot handle message (%s) while in state: %s", msg.Txid, msg.Type, h.state)
}

hFn(msg)

return nil
}

func (h *Handler) Execute(ctxt context.Context, cccid *ccprovider.CCContext, msg *pb.ChaincodeMessage, timeout time.Duration) (*pb.ChaincodeMessage, error) {
chaincodeLogger.Debugf("Entry")
defer chaincodeLogger.Debugf("Exit")
Expand Down

0 comments on commit de11826

Please sign in to comment.