Skip to content

Commit

Permalink
Merge pull request #2741 from LiilyZhang/zhangl/Issue2555
Browse files Browse the repository at this point in the history
Issue 2555 - Race condition when agent un-registering leaves agent as…
  • Loading branch information
dabooz authored Aug 13, 2021
2 parents 87dd268 + 01cd943 commit b8c38dd
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 7 deletions.
33 changes: 33 additions & 0 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ const (

// Secret related
UPDATED_SECRETS EventId = "SECRET_UPDATES"

// ESS related
ESS_UNCONFIG EventId = "ESS_UNCONFIG"
)

type EndContractCause string
Expand Down Expand Up @@ -1759,6 +1762,36 @@ func NewNodeShutdownCompleteMessage(id EventId, errorMsg string) *NodeShutdownCo
}
}

// SyncServiceCleanedUpMessage is an event message that carries an Event and
// an error string. Instances are built with NewSyncServiceCleanedUpMessage.
// NOTE(review): the name suggests it is published once sync service (ESS)
// cleanup has been attempted — confirm against the publisher.
type SyncServiceCleanedUpMessage struct {
// event is the Event returned by Event().
event Event
// err is the error text returned by Err().
err string
}

// Event returns the Event value stored in the message.
// Unlike the other methods on this type it is declared on the pointer
// receiver.
func (n *SyncServiceCleanedUpMessage) Event() Event {
return n.event
}

// String returns the same text as ShortString; there is no separate long form.
func (n SyncServiceCleanedUpMessage) String() string {
return n.ShortString()
}

// ShortString renders the message as a single line containing the stored
// event followed by the stored error text, both formatted with %v.
func (n SyncServiceCleanedUpMessage) ShortString() string {
	const layout = "Event: %v, Error: %v"
	return fmt.Sprintf(layout, n.event, n.err)
}

// Err returns the error text stored in the message, exactly as it was passed
// to NewSyncServiceCleanedUpMessage.
func (n SyncServiceCleanedUpMessage) Err() string {
return n.err
}

// NewSyncServiceCleanedUpMessage allocates a SyncServiceCleanedUpMessage whose
// event has its Id set to id (all other Event fields keep their zero values)
// and whose error text is errorMsg.
func NewSyncServiceCleanedUpMessage(id EventId, errorMsg string) *SyncServiceCleanedUpMessage {
	msg := new(SyncServiceCleanedUpMessage)
	msg.event.Id = id
	msg.err = errorMsg
	return msg
}

// This is a special message that the message dispatcher knows about.
type WorkerStopMessage struct {
event Event
Expand Down
10 changes: 9 additions & 1 deletion governance/governance.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type GovernanceWorker struct {
limitedRetryEC exchange.ExchangeContext
exchErrors cache.Cache
noworkDispatch int64 // The last time the NoWorkHandler was dispatched.
essCleanedUp bool
}

func NewGovernanceWorker(name string, cfg *config.HorizonConfig, db *bolt.DB, pm *policy.PolicyManager) *GovernanceWorker {
Expand Down Expand Up @@ -83,6 +84,7 @@ func NewGovernanceWorker(name string, cfg *config.HorizonConfig, db *bolt.DB, pm
limitedRetryEC: lrec,
exchErrors: cache.NewSimpleMapCache(),
noworkDispatch: time.Now().Unix(),
essCleanedUp: false,
}

// Start the worker and set the no work interval to 10 seconds.
Expand Down Expand Up @@ -311,6 +313,10 @@ func (w *GovernanceWorker) NewEvent(incoming events.Message) {
w.Commands <- worker.NewTerminateCommand("shutdown")
}

case *events.SyncServiceCleanedUpMessage:
w.essCleanedUp = true
glog.V(5).Infof(logString(fmt.Sprintf("Receive SyncServiceCleanedUpMessage event, set ess.CleanUp to %t for governance worker.", w.essCleanedUp)))

case *events.NodeHeartbeatStateChangeMessage:
msg, _ := incoming.(*events.NodeHeartbeatStateChangeMessage)
switch msg.Event().Id {
Expand Down Expand Up @@ -1401,12 +1407,14 @@ func (w *GovernanceWorker) NoWorkHandler() {

// When all subworkers are down, start the shutdown process.
if w.IsWorkerShuttingDown() && w.ShuttingDownCmd != nil {
if w.AreAllSubworkersTerminated() {
if w.AreAllSubworkersTerminated() && w.essCleanedUp {
glog.V(5).Infof(logString(fmt.Sprintf("GovernanceWorker initiating async shutdown.")))
cmd := w.ShuttingDownCmd
// This is one of the few go routines that should NOT be abstracted as a subworker.
go w.nodeShutdown(cmd)
w.ShuttingDownCmd = nil
} else if !w.essCleanedUp {
glog.V(5).Infof(logString(fmt.Sprintf("GovernanceWorker waiting for ESS to finish cleanup.")))
} else {
glog.V(5).Infof(logString(fmt.Sprintf("GovernanceWorker waiting for subworkers to terminate.")))
}
Expand Down
4 changes: 2 additions & 2 deletions resource/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func NewNodeConfigCommand(msg *events.EdgeRegisteredExchangeMessage) *NodeConfig

// This worker command is used to tell the worker that the node is done shutting down and so it can terminate itself.
type NodeUnconfigCommand struct {
msg *events.NodeShutdownCompleteMessage
msg *events.NodeShutdownMessage
}

func (n NodeUnconfigCommand) String() string {
Expand All @@ -37,7 +37,7 @@ func (n NodeUnconfigCommand) ShortString() string {
return fmt.Sprintf("NodeUnconfig Command, Msg: %v", n.msg)
}

func NewNodeUnconfigCommand(msg *events.NodeShutdownCompleteMessage) *NodeUnconfigCommand {
func NewNodeUnconfigCommand(msg *events.NodeShutdownMessage) *NodeUnconfigCommand {
return &NodeUnconfigCommand{
msg: msg,
}
Expand Down
12 changes: 8 additions & 4 deletions resource/resource_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ func (w *ResourceWorker) NewEvent(incoming events.Message) {
w.EC = worker.NewExchangeContext(fmt.Sprintf("%v/%v", msg.Org(), msg.DeviceId()), msg.Token(), w.Config.Edge.ExchangeURL, w.Config.GetCSSURL(), w.Config.Collaborators.HTTPClientFactory)
w.Commands <- NewNodeConfigCommand(msg)

case *events.NodeShutdownCompleteMessage:
msg, _ := incoming.(*events.NodeShutdownCompleteMessage)
case *events.NodeShutdownMessage:
msg, _ := incoming.(*events.NodeShutdownMessage)
switch msg.Event().Id {
case events.UNCONFIGURE_COMPLETE:
case events.START_UNCONFIGURE:
w.Commands <- NewNodeUnconfigCommand(msg)
}

Expand All @@ -103,9 +103,13 @@ func (w *ResourceWorker) CommandHandler(command worker.Command) bool {

case *NodeUnconfigCommand:
cmd, _ := command.(*NodeUnconfigCommand)
if err := w.handleNodeUnconfigCommand(cmd); err != nil {
err := w.handleNodeUnconfigCommand(cmd)
errMsg := ""
if err != nil {
glog.Errorf(reslog(fmt.Sprintf("Error handling node unconfig command: %v", err)))
errMsg = err.Error()
}
w.Messages() <- events.NewSyncServiceCleanedUpMessage(events.ESS_UNCONFIG, errMsg)

default:
return false
Expand Down

0 comments on commit b8c38dd

Please sign in to comment.