Skip to content

Commit

Permalink
Merge pull request #81 from oklahomer/feature/judge-alert-policy
Browse files Browse the repository at this point in the history
Add a mechanism to judge if the raised error is worth being alerted to admin
  • Loading branch information
oklahomer authored Jun 2, 2019
2 parents cf27dc5 + d4233f4 commit 1648c0b
Show file tree
Hide file tree
Showing 3 changed files with 284 additions and 101 deletions.
2 changes: 1 addition & 1 deletion alerter/line/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func New(config *Config) *Client {

// Alert sends alert message to notify critical state of caller.
func (c *Client) Alert(ctx context.Context, botType sarah.BotType, err error) error {
msg := fmt.Sprintf("Critical error on %s: %s.", botType.String(), err.Error())
msg := fmt.Sprintf("Error on %s: %s.", botType.String(), err.Error())
v := url.Values{"message": {msg}}
req, err := http.NewRequest(http.MethodPost, Endpoint, strings.NewReader(v.Encode()))
if err != nil {
Expand Down
154 changes: 112 additions & 42 deletions runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,34 @@ func RegisterWorker(worker workers.Worker) {
})
}

// RegisterBotErrorSupervisor registers a given supervising function that is called when a Bot escalates an error.
// This function judges if the given error is worth being notified to administrators and if the Bot should stop.
// A developer may return *SupervisionDirective to tell such order.
// If the escalated error can simply be ignored, a nil value can be returned.
//
// Bot/Adapter can escalate an error via a function, func(error), that is passed to Run() as a third argument.
// When BotNonContinuableError is escalated, go-sarah's core cancels failing Bot's context and thus the Bot and related resources stop working.
// If one or more sarah.Alerters implementations are registered, such critical error is passed to the alerters and administrators will be notified.
// When other types of error are escalated, the error is passed to the supervising function registered via sarah.RegisterBotErrorSupervisor().
// The function may return *SupervisionDirective to tell how go-sarah's core should react.
//
// Bot/Adapter's implementation should be simple. It should not handle serious errors by itself.
// Instead, it should simply escalate an error every time when a noteworthy error occurs and let core judge how to react.
// For example, if the bot should stop when three reconnection trial fails in ten seconds, the scenario could be somewhat like below:
// 1. Bot escalates reconnection error, FooReconnectionFailureError, each time it fails to reconnect
// 2. Supervising function counts the error and ignores the first two occurrence
// 3. When the third error comes within ten seconds from the initial error escalation, return *SupervisionDirective with StopBot value of true
//
// Similarly, if there should be a rate limiter to limit the calls to alerters, the supervising function should take care of this instead of the failing Bot.
// Each Bot/Adapter's implementation can be kept simple in this way.
// go-sarah's core should always supervise and control its belonging Bots.
func RegisterBotErrorSupervisor(fnc func(BotType, error) *SupervisionDirective) {
options.register(func(r *runner) error {
r.superviseError = fnc
return nil
})
}

// Run is a non-blocking function that starts running go-sarah's process with pre-registered options.
// Workers, schedulers and other required resources for bot interaction starts running on this function call.
// This returns error when bot interaction cannot start; No error is returned when process starts successfully.
Expand Down Expand Up @@ -178,11 +206,13 @@ func newRunner(ctx context.Context, config *Config) (*runner, error) {
config: config,
bots: []Bot{},
worker: nil,
watcher: nil,
commandProps: make(map[BotType][]*CommandProps),
scheduledTaskProps: make(map[BotType][]*ScheduledTaskProps),
scheduledTasks: make(map[BotType][]ScheduledTask),
alerters: &alerters{},
scheduler: runScheduler(ctx, loc),
superviseError: nil,
}

err = options.apply(r)
Expand Down Expand Up @@ -221,6 +251,23 @@ type runner struct {
scheduledTasks map[BotType][]ScheduledTask
alerters *alerters
scheduler scheduler
superviseError func(BotType, error) *SupervisionDirective
}

// SupervisionDirective tells go-sarah's core how to react when a Bot escalates an error.
// A customized supervisor can be defined and registered via RegisterBotErrorSupervisor().
type SupervisionDirective struct {
// StopBot tells the core to stop the failing Bot and related resources.
// When two or more Bots are registered and one or more Bots are to be still running after the failing Bot stops,
// internal workers and scheduler keep running.
// Directory watcher stops subscribing to corresponding Bot's configuration files' directory,
// but it still keeps running until the last Bot stops.
//
// When all Bots stop, then the core stops all resources.
StopBot bool
// AlertingErr is sent registered alerters and administrators will be notified.
// Set nil when such alert notification is not required.
AlertingErr error
}

func (r *runner) botCommandProps(botType BotType) []*CommandProps {
Expand Down Expand Up @@ -267,7 +314,7 @@ func (r *runner) run(ctx context.Context) {
// This returns when bot stops.
func (r *runner) runBot(runnerCtx context.Context, bot Bot) {
log.Infof("Starting %s", bot.BotType())
botCtx, errNotifier := superviseBot(runnerCtx, bot.BotType(), r.alerters)
botCtx, errNotifier := r.superviseBot(runnerCtx, bot.BotType())

// Setup config directory for this particular Bot.
var configDir string
Expand Down Expand Up @@ -366,6 +413,70 @@ func (r *runner) subscribeConfigDir(botCtx context.Context, bot Bot, configDir s
}
}

func (r *runner) superviseBot(runnerCtx context.Context, botType BotType) (context.Context, func(error)) {
botCtx, cancel := context.WithCancel(runnerCtx)

sendAlert := func(err error) {
e := r.alerters.alertAll(runnerCtx, botType, err)
if e != nil {
log.Errorf("Failed to send alert for %s: %+v", botType, e)
}
}

stopBot := func() {
cancel()
log.Infof("Stop supervising bot's critical error due to its context cancellation: %s.", botType)
}

// A function that receives an escalated error from Bot.
// If critical error is sent, this cancels Bot context to finish its lifecycle.
// Bot itself MUST NOT kill itself, but the Runner does. Beware that Runner takes care of all related components' lifecycle.
handleError := func(err error) {
switch err.(type) {
case *BotNonContinuableError:
log.Errorf("Stop unrecoverable bot. BotType: %s. Error: %+v", botType, err)

stopBot()

go sendAlert(err)

default:
if r.superviseError != nil {
directive := r.superviseError(botType, err)
if directive == nil {
return
}

if directive.StopBot {
log.Errorf("Stop bot due to given directive. BotType: %s. Reason: %+v", botType, err)
stopBot()
}

if directive.AlertingErr != nil {
go sendAlert(directive.AlertingErr)
}
}

}
}

// A function to be exposed to Bot/Adapter developers.
// When Bot/Adapter faces a critical state, it can call this function to let Runner judge the severity and stop Bot if necessary.
errNotifier := func(err error) {
select {
case <-botCtx.Done():
// Bot context is already canceled by preceding error notification. Do nothing.
return

default:
handleError(err)

}
}

return botCtx, errNotifier
}

func registerScheduledTask(botCtx context.Context, bot Bot, task ScheduledTask, taskScheduler scheduler) {
err := taskScheduler.update(bot.BotType(), task, func() {
executeScheduledTask(botCtx, bot, task)
Expand Down Expand Up @@ -528,47 +639,6 @@ func executeScheduledTask(ctx context.Context, bot Bot, task ScheduledTask) {
}
}

func superviseBot(runnerCtx context.Context, botType BotType, alerters *alerters) (context.Context, func(error)) {
botCtx, cancel := context.WithCancel(runnerCtx)

// A function that receives an escalated error from Bot.
// If critical error is sent, this cancels Bot context to finish its lifecycle.
// Bot itself MUST NOT kill itself, but the Runner does. Beware that Runner takes care of all related components' lifecycle.
handleError := func(err error) {
switch err.(type) {
case *BotNonContinuableError:
log.Errorf("Stop unrecoverable bot. BotType: %s. Error: %+v", botType, err)
cancel()

go func() {
e := alerters.alertAll(runnerCtx, botType, err)
if e != nil {
log.Errorf("Failed to send alert for %s: %+v", botType, e)
}
}()

log.Infof("Stop supervising bot critical error due to context cancellation: %s.", botType)

}
}

// A function to be exposed to Bot/Adapter developers.
// When Bot/Adapter faces a critical state, it can call this function to let Runner judge the severity and stop Bot if necessary.
errNotifier := func(err error) {
select {
case <-botCtx.Done():
// Bot context is already canceled by preceding error notification. Do nothing.
return

default:
handleError(err)

}
}

return botCtx, errNotifier
}

func setupInputReceiver(botCtx context.Context, bot Bot, worker workers.Worker) func(Input) error {
continuousEnqueueErrCnt := 0
return func(input Input) error {
Expand Down
Loading

0 comments on commit 1648c0b

Please sign in to comment.