Skip to content
This repository has been archived by the owner on Mar 27, 2024. It is now read-only.

Commit

Permalink
Merge pull request #321 from rolsonquadras/issue-276
Browse files Browse the repository at this point in the history
refactor: Service Event - Update to Register/Unregister APIs
  • Loading branch information
George Aristy authored Sep 23, 2019
2 parents ac2e26b + e159a82 commit bd040c0
Show file tree
Hide file tree
Showing 4 changed files with 252 additions and 205 deletions.
23 changes: 20 additions & 3 deletions pkg/didcomm/dispatcher/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ type DIDCommMsg struct {
// Outbound indicates the direction of this DIDComm message:
// - outgoing (to another agent)
// - incoming (from another agent)
Outbound bool
Type string
Payload []byte
Outbound bool
Type string
Payload []byte
//TODO : might need refactor as per the issue-226
OutboundDestination *Destination
}
Expand All @@ -50,3 +50,20 @@ type Provider interface {

// OutboundCreator method to create new outbound dispatcher service
type OutboundCreator func(prov Provider) (Outbound, error)

// DIDCommAction message type to pass events in go channels.
type DIDCommAction struct {
// DIDComm message
Message DIDCommMsg
// Callback function to be called by the consumer for further processing the message.
Callback Callback
}

// DIDCommCallback message type to pass service callback in go channels.
type DIDCommCallback struct {
// Set the value in case of any error while processing the DIDComm message event by the consumer.
Err error
}

// Callback type to pass service callbacks.
type Callback func(DIDCommCallback)
26 changes: 0 additions & 26 deletions pkg/didcomm/event/event.go

This file was deleted.

209 changes: 104 additions & 105 deletions pkg/didcomm/protocol/didexchange/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/hyperledger/aries-framework-go/pkg/common/log"
"github.com/hyperledger/aries-framework-go/pkg/common/metadata"
"github.com/hyperledger/aries-framework-go/pkg/didcomm/dispatcher"
"github.com/hyperledger/aries-framework-go/pkg/didcomm/event"
"github.com/hyperledger/aries-framework-go/pkg/didcomm/protocol/decorator"
"github.com/hyperledger/aries-framework-go/pkg/storage"
"github.com/hyperledger/aries-framework-go/pkg/wallet"
Expand All @@ -30,7 +29,7 @@ var logger = log.New("aries-framework/didexchange")
// didCommChMessage type to correlate actionEvent message(go channel) with callback message(internal go channel).
type didCommChMessage struct {
ID string
DIDCommCallback event.DIDCommCallback
DIDCommCallback dispatcher.DIDCommCallback
}

const (
Expand Down Expand Up @@ -70,11 +69,10 @@ type Service struct {
ctx context
store storage.Store
callbackChannel chan didCommChMessage
actionEvent chan<- event.DIDCommEvent
statusEvents []chan<- dispatcher.DIDCommMsg
actionEvent chan<- dispatcher.DIDCommAction
msgEvents []chan<- dispatcher.DIDCommMsg
lock sync.RWMutex
statusEventLock sync.RWMutex
execute bool
msgEventLock sync.RWMutex
}

type context struct {
Expand All @@ -91,9 +89,6 @@ func New(store storage.Store, prov provider) *Service {
store: store,
// TODO channel size - https://github.com/hyperledger/aries-framework-go/issues/246
callbackChannel: make(chan didCommChMessage, 10),
// set execute to false. Consumers have to enable this by setting either RegisterEvent() or
// RegisterAutoExecute()
execute: false,
}

svc.startInternalListener()
Expand All @@ -103,12 +98,15 @@ func New(store storage.Store, prov provider) *Service {

// Handle didexchange msg
func (s *Service) Handle(msg dispatcher.DIDCommMsg) error {
// throw error if there are no action events are registered or auto execute set (if it's not outbound)
// trigger message events
s.sendMsgEvents(msg)

// throw error if there is no action event registered for inbound messages
s.lock.RLock()
execute := s.execute
aEvent := s.actionEvent
s.lock.RUnlock()

if !msg.Outbound && !execute {
if !msg.Outbound && aEvent == nil {
return errors.New("no clients are registered to handle the message")
}

Expand All @@ -128,90 +126,70 @@ func (s *Service) Handle(msg dispatcher.DIDCommMsg) error {
return fmt.Errorf("invalid state transition: %s -> %s", current.Name(), next.Name())
}

eventsTriggered := false
if !msg.Outbound {
// trigger the actionEvent if the message type is registered (if it's not outbound)
eventsTriggered, err = s.sendEvent(msg, thid, next)
// trigger action event based on message type for inbound messages
if !msg.Outbound && canTriggerActionEvents(msg.Type) {
err = s.sendActionEvent(msg, aEvent, thid, next)
if err != nil {
return fmt.Errorf("send events failed: %w", err)
}
return nil
}

// if no events are trigger continue the execution
if !eventsTriggered {
return s.handle(&message{Msg: msg, ThreadID: thid, NextStateName: next.Name()})
}

return nil
// if no action event is triggered, continue the execution
return s.handle(&message{Msg: msg, ThreadID: thid, NextStateName: next.Name()})
}

// RegisterEvent on DID Exchange protocol messages. The events are triggered for incoming ConnectionRequest,
// ConnectionResponse or ConnectionAck message types. The consumer need to invoke the callback to resume processing.
// Only one channel can be registered for the action events. If called multiple times, the events will be sent to the
// last channel. This works in conjunction with RegisterAutoExecute(). If this function is called after
// RegisterAutoExecute(), the service will not execute the processing automatically and it'll wait for the callback. If
// both are not set, then service will not handle the messages and throw an error.
func (s *Service) RegisterEvent(ch chan<- event.DIDCommEvent) error {
// RegisterActionEvent on DID Exchange protocol messages. The events are triggered for incoming message types based on
// canTriggerActionEvents() function. The consumer need to invoke the callback to resume processing.
// Only one channel can be registered for the action events. The function will throw error if a channel is already
// registered. The AutoExecuteActionEvent() function can be used to automatically trigger callback function for the
// event.
func (s *Service) RegisterActionEvent(ch chan<- dispatcher.DIDCommAction) error {
s.lock.Lock()
s.actionEvent = ch
s.execute = true
s.lock.Unlock()
defer s.lock.Unlock()

return nil
}

// UnregisterEvent on DID Exchange protocol messages. Refer RegisterEvent().
func (s *Service) UnregisterEvent() error {
return s.disableExecute()
}

// RegisterMsg on DID Exchange protocol messages. The events are triggered for incoming ConnectionRequest,
// ConnectionResponse or ConnectionAck message types. The Callback is set to nil in the actionEvent message and service
// won't be expecting any response from the consumers.
func (s *Service) RegisterMsg(ch chan<- dispatcher.DIDCommMsg) error {
s.statusEventLock.Lock()
s.statusEvents = append(s.statusEvents, ch)
s.statusEventLock.Unlock()
if s.actionEvent != nil {
return errors.New("channel is already registered for the action event")
}

s.actionEvent = ch
return nil
}

// UnregisterMsg on DID Exchange protocol messages. Refer RegisterMsg().
func (s *Service) UnregisterMsg(ch chan<- dispatcher.DIDCommMsg) error {
s.statusEventLock.Lock()
for i := 0; i < len(s.statusEvents); i++ {
if s.statusEvents[i] == ch {
s.statusEvents = append(s.statusEvents[:i], s.statusEvents[i+1:]...)
i--
}
// UnregisterActionEvent on DID Exchange protocol messages. Refer RegisterActionEvent().
func (s *Service) UnregisterActionEvent(ch chan<- dispatcher.DIDCommAction) error {
s.lock.Lock()
defer s.lock.Unlock()

if s.actionEvent != ch {
return errors.New("invalid channel passed to unregister the action event")
}
s.statusEventLock.Unlock()

s.actionEvent = nil

return nil
}

// RegisterAutoExecute on DID Exchange protocol messages. When this function is called, the service will auto execute
// the workflow. This works in conjunction with RegisterEvent(). If this function is called after
// RegisterEvent(), the service will execute the processing automatically and no action events will be triggered.
// If both are not set, then service will not handle the messages and throw an error.
func (s *Service) RegisterAutoExecute() error {
s.lock.Lock()
s.execute = true
s.lock.Unlock()
// RegisterMsgEvent on DID Exchange protocol messages. The message events are triggered for incoming messages. Service
// will not expect any callback on these events unlike Action events.
func (s *Service) RegisterMsgEvent(ch chan<- dispatcher.DIDCommMsg) error {
s.msgEventLock.Lock()
s.msgEvents = append(s.msgEvents, ch)
s.msgEventLock.Unlock()

return nil
}

// UnregisterAutoExecute on DID Exchange protocol messages. Refer RegisterAutoExecute().
func (s *Service) UnregisterAutoExecute() error {
return s.disableExecute()
}

func (s *Service) disableExecute() error {
s.lock.Lock()
s.actionEvent = nil
s.execute = false
s.lock.Unlock()
// UnregisterMsgEvent on DID Exchange protocol messages. Refer RegisterMsgEvent().
func (s *Service) UnregisterMsgEvent(ch chan<- dispatcher.DIDCommMsg) error {
s.msgEventLock.Lock()
for i := 0; i < len(s.msgEvents); i++ {
if s.msgEvents[i] == ch {
s.msgEvents = append(s.msgEvents[:i], s.msgEvents[i+1:]...)
i--
}
}
s.msgEventLock.Unlock()

return nil
}
Expand Down Expand Up @@ -244,57 +222,50 @@ func (s *Service) handle(msg *message) error {
return nil
}

// sendEvent triggeres the status events and action events. Returns true if action events are triggered. The events are
// triggered for ConnectionRequest, ConnectionResponse or ConnectionAck message types.
func (s *Service) sendEvent(msg dispatcher.DIDCommMsg, threadID string, nextState state) (bool, error) {
// trigger the status events
s.statusEventLock.RLock()
statusEvents := s.statusEvents
s.statusEventLock.RUnlock()

for _, handler := range statusEvents {
handler <- msg
}

s.lock.RLock()
aEvent := s.actionEvent
s.lock.RUnlock()

// invoke events for ConnectionRequest, ConnectionResponse or ConnectionAck and if action events are registered
if aEvent == nil || msg.Type != ConnectionRequest &&
msg.Type != ConnectionResponse && msg.Type != ConnectionAck {
return false, nil
}

// sendEvent triggers the action event. This function stores the state of current processing and passes a callback
// function in the event message.
func (s *Service) sendActionEvent(msg dispatcher.DIDCommMsg, aEvent chan<- dispatcher.DIDCommAction, threadID string, nextState state) error {
jsonDoc, err := json.Marshal(&message{
Msg: msg,
ThreadID: threadID,
NextStateName: nextState.Name(),
})

if err != nil {
return false, fmt.Errorf("JSON marshalling of document failed: %w", err)
return fmt.Errorf("JSON marshalling of document failed: %w", err)
}

// save the incoming message in the store (to retrieve later when callback events are fired)
id := generateRandomID()
err = s.store.Put(id, jsonDoc)
if err != nil {
return false, fmt.Errorf("JSON marshalling of document failed: %w", err)
return fmt.Errorf("JSON marshalling of document failed: %w", err)
}

// create the message for the channel
didCommEvent := event.DIDCommEvent{
didCommAction := dispatcher.DIDCommAction{
Message: msg,
Callback: func(didCommCallback event.DIDCommCallback) {
Callback: func(didCommCallback dispatcher.DIDCommCallback) {
s.processCallback(id, didCommCallback)
},
}

// trigger the registered action events
aEvent <- didCommEvent
// trigger the registered action event
aEvent <- didCommAction

return true, nil
return nil
}

// sendEvent triggers the message events.
func (s *Service) sendMsgEvents(msg dispatcher.DIDCommMsg) {
// trigger the message events
s.msgEventLock.RLock()
statusEvents := s.msgEvents
s.msgEventLock.RUnlock()

for _, handler := range statusEvents {
handler <- msg
}
}

// startInternalListener listens to messages in gochannel for callback messages from clients.
Expand All @@ -307,14 +278,14 @@ func (s *Service) startInternalListener() {
}()
}

func (s *Service) processCallback(id string, didCommCallback event.DIDCommCallback) {
func (s *Service) processCallback(id string, didCommCallback dispatcher.DIDCommCallback) {
// pass the callback data to internal channel. This is created to unblock consumer go routine and wrap the callback
// channel internally.
s.callbackChannel <- didCommChMessage{ID: id, DIDCommCallback: didCommCallback}
}

// processCallback processes the callback events.
func (s *Service) process(id string, didCommCallback event.DIDCommCallback) error {
func (s *Service) process(id string, didCommCallback dispatcher.DIDCommCallback) error {
if didCommCallback.Err != nil {
// TODO https://github.com/hyperledger/aries-framework-go/issues/242 Service callback processing error handling
return nil
Expand Down Expand Up @@ -436,3 +407,31 @@ func GenerateInviteWithKeyAndEndpoint(invite *Invitation) (string, error) {

return encodedExchangeInvitation(invite)
}

// AutoExecuteActionEvent is a utility function to execute events automatically. The function requires a channel to be
// passed-in to listen for dispatcher.DIDCommAction and triggers the callback. This is a blocking function and use
// this function with a goroutine.
//
// Usage:
// s := didexchange.New(....)
// actionCh := make(chan dispatcher.DIDCommAction)
// err = s.RegisterActionEvent(actionCh)
// go didexchange.AutoExecuteActionEvent(actionCh)
func AutoExecuteActionEvent(ch chan dispatcher.DIDCommAction) error {
for msg := range ch {
msg.Callback(dispatcher.DIDCommCallback{Err: nil})
}

return nil
}

// canTriggerActionEvents checks if the incoming message type matches either ConnectionRequest, ConnectionResponse or
// ConnectionAck type.
func canTriggerActionEvents(msgType string) bool {
if msgType != ConnectionRequest &&
msgType != ConnectionResponse && msgType != ConnectionAck {
return false
}

return true
}
Loading

0 comments on commit bd040c0

Please sign in to comment.