-
Notifications
You must be signed in to change notification settings - Fork 16
/
Copy pathrunner.go
561 lines (482 loc) · 19.1 KB
/
runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
package sarah
import (
"context"
"fmt"
"github.com/oklahomer/go-kasumi/logger"
"github.com/oklahomer/go-kasumi/worker"
"runtime"
"strings"
"sync"
"time"
)
var options = &optionHolder{}
// Config is a serializable struct that contains some configuration variables.
type Config struct {
// TimeZone tells the scheduler in what timezone the application runs.
TimeZone string `json:"timezone" yaml:"timezone"`
}
// NewConfig creates and returns a new Config instance with default settings.
// Use json.Unmarshal, yaml.Unmarshal, or manual manipulation to override those default values.
func NewConfig() *Config {
return &Config{
TimeZone: time.Now().Location().String(),
}
}
// optionHolder is a struct that stashes and holds given options until Sarah boots up.
// Those options are applied to Sarah on Run execution to manipulate Sarah's behavior.
// This was formally called RunnerOptions and was provided publicly, but is now private in favor of https://github.com/oklahomer/go-sarah/issues/72 .
// Calls to its methods are thread-safe.
type optionHolder struct {
mutex sync.RWMutex
stashed []func(*runner)
}
func (o *optionHolder) register(opt func(*runner)) {
o.mutex.Lock()
defer o.mutex.Unlock()
o.stashed = append(o.stashed, opt)
}
func (o *optionHolder) apply(r *runner) {
o.mutex.Lock()
defer o.mutex.Unlock()
for _, v := range o.stashed {
v(r)
}
}
// RegisterAlerter registers a given Alerter implementation to Sarah.
// When Sarah's process or a registered Bot implementation encounters a critical state, Alerter.Alert is called to notify such state.
// A developer may call this method multiple times to register multiple Alerters.
func RegisterAlerter(alerter Alerter) {
options.register(func(r *runner) {
r.alerters.appendAlerter(alerter)
})
}
// RegisterBot registers a given Bot implementation to be run on Run call.
// This may be called multiple times to register as many bot instances as wanted.
func RegisterBot(bot Bot) {
options.register(func(r *runner) {
r.bots = append(r.bots, bot)
})
}
// RegisterCommand registers a given Command implementation.
// On Run, each Command implementation is registered to the corresponding bot via Bot.AppendCommand.
// A Bot is considered to "correspond" when its BotType matches with the botType.
func RegisterCommand(botType BotType, command Command) {
options.register(func(r *runner) {
commands, ok := r.commands[botType]
if !ok {
commands = []Command{}
}
r.commands[botType] = append(commands, command)
})
}
// RegisterCommandProps registers a given CommandProps to build Command implementation on Run call.
// This instance is reused when a configuration is updated and the corresponding Command needs to be rebuilt to reflect the changes.
func RegisterCommandProps(props *CommandProps) {
options.register(func(r *runner) {
stashed, ok := r.commandProps[props.botType]
if !ok {
stashed = []*CommandProps{}
}
r.commandProps[props.botType] = append(stashed, props)
})
}
// RegisterScheduledTask registers a given ScheduledTask to Sarah.
// On Run, a schedule is set for this task.
func RegisterScheduledTask(botType BotType, task ScheduledTask) {
options.register(func(r *runner) {
tasks, ok := r.scheduledTasks[botType]
if !ok {
tasks = []ScheduledTask{}
}
r.scheduledTasks[botType] = append(tasks, task)
})
}
// RegisterScheduledTaskProps registers a given ScheduledTaskProps to build ScheduledTask on Run call.
// This instance is reused when a configuration file is updated and the corresponding ScheduledTask needs to be rebuilt.
func RegisterScheduledTaskProps(props *ScheduledTaskProps) {
options.register(func(r *runner) {
stashed, ok := r.scheduledTaskProps[props.botType]
if !ok {
stashed = []*ScheduledTaskProps{}
}
r.scheduledTaskProps[props.botType] = append(stashed, props)
})
}
// RegisterConfigWatcher registers a given ConfigWatcher implementation to Sarah.
// If a ConfigWatcher is registered, Sarah's process subscribes to the changes to Command or ScheduledTask's configuration.
// When a configuration is updated, ConfigWatcher reads the new configuration setting and reflects to the corresponding configuration instance
// so Sarah can rebuild the corresponding Command or ScheduledTask with the new setting.
func RegisterConfigWatcher(watcher ConfigWatcher) {
options.register(func(r *runner) {
r.configWatcher = watcher
})
}
// RegisterWorker registers a given worker.Worker implementation to Sarah.
// When one is not registered, a worker instance with default setting is used.
func RegisterWorker(worker worker.Worker) {
options.register(func(r *runner) {
r.worker = worker
})
}
// RegisterBotErrorSupervisor registers a given supervising function that is called when a Bot escalates an error.
// This function judges if the given error is worth being notified to administrators and if the Bot should stop.
// When an action is required, the function may return non-nil *SupervisionDirective to pass the order;
// Return nil when the escalated error can simply be ignored.
//
// Bot and Adapter can escalate an error via a function -- func(error) -- that is passed to Bot.Run as a third argument.
// When BotNonContinuableError is escalated, Sarah cancels the failing Bot's context, and thus the Bot and its related resources stop working.
// If one or more Alerter implementations are registered, such critical error is passed to those Alerters and administrators will be notified.
// When other types of error are escalated, the error is passed to the supervising function registered via RegisterBotErrorSupervisor.
// The function may return *SupervisionDirective to tell how Sarah should react.
//
// Bot and Adapter's implementation should be simple. It should not handle serious errors by itself.
// Instead, they should simply escalate an error every time when a noteworthy error occurs and let Sarah judge how to react.
// For example, if the bot should stop when three reconnection trial fails in ten seconds, the scenario could be somewhat like below:
// 1. Bot escalates reconnection error, FooReconnectionFailureError, each time it fails to reconnect
// 2. The supervising function counts the error and ignores the first two occurrence
// 3. When the third error comes within ten seconds from the initial error escalation, return *SupervisionDirective with StopBot value of true
//
// Similarly, if there should be a rate limiter to limit the calls to Alerters, the supervising function should take care of this instead of the failing Bot.
// Each Bot or Adapter's implementation can be kept simple in this way; Sarah should always supervise and control its belonging Bots.
func RegisterBotErrorSupervisor(fnc func(BotType, error) *SupervisionDirective) {
options.register(func(r *runner) {
r.superviseError = fnc
})
}
// Run sets up all required resources and initiates Sarah.
// Workers, schedulers, and other required resources for a bot interaction start running on this function call.
// This returns an error when bot interaction cannot start; No error is returned when the process starts successfully.
//
// Call ctx.Done or CurrentStatus to reference current running status.
//
// To control its lifecycle, a developer may cancel ctx and stop Sarah at any moment.
// When bot interaction stops unintentionally without such context cancellation,
// the critical state is notified to administrators via registered Alerter.
// Registering multiple Alerter implementations to ensure successful notification is recommended.
func Run(ctx context.Context, config *Config) error {
err := runnerStatus.start()
if err != nil {
return fmt.Errorf("failed to start bot process: %w", err)
}
runner, err := newRunner(ctx, config)
if err != nil {
return fmt.Errorf("failed to start bot process: %w", err)
}
go runner.run(ctx)
return nil
}
func newRunner(ctx context.Context, config *Config) (*runner, error) {
loc, err := time.LoadLocation(config.TimeZone)
if err != nil {
return nil, fmt.Errorf(`given timezone "%s" cannot be converted to time.Location: %w`, config.TimeZone, err)
}
r := &runner{
config: config,
bots: []Bot{},
worker: nil,
configWatcher: &nullConfigWatcher{},
commands: make(map[BotType][]Command),
commandProps: make(map[BotType][]*CommandProps),
scheduledTasks: make(map[BotType][]ScheduledTask),
scheduledTaskProps: make(map[BotType][]*ScheduledTaskProps),
alerters: &alerters{},
scheduler: runScheduler(ctx, loc),
superviseError: nil,
}
options.apply(r)
if r.worker == nil {
// When the jobs are CPU-intensive, the number of workers can be equal to the number of CPUs.
// However, in general, bot interaction involves more IO-intensive jobs such as calling external Weather APIs
// on behalf of the user. This setting expects up to a hundred jobs can work in a concurrent manner based on such premise.
//
// The queue size is set to ten, which is relatively small.
// Instead of allowing larger latency with a bigger queue size, messages will soon be ignored when the worker is busy and the queue is full.
// Users usually do not expect to have belated responses.
//
// Provide a worker.Worker implementation via RegisterWorker to customize the setting.
workerConfig := worker.NewConfig()
workerConfig.WorkerNum = 100
workerConfig.QueueSize = 10
r.worker = worker.Run(ctx, worker.NewConfig())
}
return r, nil
}
type runner struct {
config *Config
bots []Bot
worker worker.Worker
configWatcher ConfigWatcher
commands map[BotType][]Command
commandProps map[BotType][]*CommandProps
scheduledTasks map[BotType][]ScheduledTask
scheduledTaskProps map[BotType][]*ScheduledTaskProps
alerters *alerters
scheduler scheduler
superviseError func(BotType, error) *SupervisionDirective
}
// SupervisionDirective tells Sarah how to react to Bot's escalating error.
//
// A designated supervisor function judges if the error represents a critical state when a bot escalates an error.
// When the bot is in a critical state, the function can return non-nil *SupervisionDirective to tell Sarah how to treat the current state.
// A customized supervisor function can be defined and registered via RegisterBotErrorSupervisor.
type SupervisionDirective struct {
// StopBot tells if Sarah needs to stop the failing bot and cleanup related resources.
// When two or more bots are registered and at least one bot is to stay running after the failing bot stops,
// internal workers and a scheduler keep running.
//
// When all bots stop, then Sarah stops all resources.
StopBot bool
// AlertingErr is sent registered alerters and administrators will be notified.
// Set nil when such alert notification is not required.
AlertingErr error
}
func (r *runner) botCommands(botType BotType) []Command {
if commands, ok := r.commands[botType]; ok {
return commands
}
return []Command{}
}
func (r *runner) botCommandProps(botType BotType) []*CommandProps {
if props, ok := r.commandProps[botType]; ok {
return props
}
return []*CommandProps{}
}
func (r *runner) botScheduledTaskProps(botType BotType) []*ScheduledTaskProps {
if props, ok := r.scheduledTaskProps[botType]; ok {
return props
}
return []*ScheduledTaskProps{}
}
func (r *runner) botScheduledTasks(botType BotType) []ScheduledTask {
if tasks, ok := r.scheduledTasks[botType]; ok {
return tasks
}
return []ScheduledTask{}
}
func (r *runner) run(ctx context.Context) {
var wg sync.WaitGroup
for _, bot := range r.bots {
wg.Add(1)
go func(b Bot) {
defer func() {
wg.Done()
runnerStatus.stopBot(b)
}()
runnerStatus.addBot(b)
r.runBot(ctx, b)
}(bot)
}
wg.Wait()
}
func unsubscribeConfigWatcher(watcher ConfigWatcher, botType BotType) {
defer func() {
if r := recover(); r != nil {
logger.Errorf("Failed to unsubscribe ConfigWatcher for %s: %+v", botType, r)
}
}()
err := watcher.Unwatch(botType)
if err != nil {
logger.Errorf("Failed to unsubscribe ConfigWatcher for %s: %+v", botType, err)
}
}
// runBot initiates the given Bot implementation and blocks until the bot stops.
func (r *runner) runBot(runnerCtx context.Context, bot Bot) {
logger.Infof("Starting %s", bot.BotType())
botCtx, errNotifier := r.superviseBot(runnerCtx, bot.BotType())
// Build commands with stashed CommandProps.
r.registerCommands(botCtx, bot)
// Register scheduled tasks.
r.registerScheduledTasks(botCtx, bot)
inputReceiver := setupInputReceiver(botCtx, bot, r.worker)
// Run the bot in a panic-proof manner.
func() {
defer func() {
// When the bot panics, recover and tell as much detailed information as possible via the error notification channel.
// The channel receiver sends an alert to the administrator.
if r := recover(); r != nil {
stack := []string{fmt.Sprintf("panic in bot: %s. %#v.", bot.BotType(), r)}
// Inform stack trace
for depth := 0; ; depth++ {
_, src, line, ok := runtime.Caller(depth)
if !ok {
break
}
stack = append(stack, fmt.Sprintf(" -> depth:%d. file:%s. line:%d.", depth, src, line))
}
errNotifier(NewBotNonContinuableError(strings.Join(stack, "\n")))
}
// Bot.Run may return without internally sending an error to errNotifier.
// To ensure the bot's context is canceled by Sarah and administrators are notified, explicitly send *BotNonContinuableError in this defer statement.
// An error being sent here is simply ignored if the bot context is already canceled by a previous error notification.
errNotifier(NewBotNonContinuableError(fmt.Sprintf("shutdown bot: %s", bot.BotType())))
}()
bot.Run(botCtx, inputReceiver, errNotifier) // Blocks til interaction ends
unsubscribeConfigWatcher(r.configWatcher, bot.BotType())
}()
}
func (r *runner) superviseBot(runnerCtx context.Context, botType BotType) (context.Context, func(error)) {
botCtx, cancel := context.WithCancel(runnerCtx)
sendAlert := func(err error) {
e := r.alerters.alertAll(runnerCtx, botType, err)
if e != nil {
logger.Errorf("Failed to send alert for %s: %+v", botType, e)
}
}
stopBot := func() {
cancel()
logger.Infof("Stop supervising bot's critical error due to its context cancellation: %s.", botType)
}
// A function that receives an escalated error from the bot.
// If a critical error is sent, this cancels the bot's context to finish its lifecycle.
// The bot MUST NOT kill itself, but Sarah does. Beware that Sarah takes care of all related components' lifecycle.
handleError := func(err error) {
switch err.(type) {
case *BotNonContinuableError:
logger.Errorf("Stop unrecoverable bot. BotType: %s. Error: %+v", botType, err)
stopBot()
go sendAlert(err)
default:
if r.superviseError != nil {
directive := r.superviseError(botType, err)
if directive == nil {
return
}
if directive.StopBot {
logger.Errorf("Stop bot due to given directive. BotType: %s. Reason: %+v", botType, err)
stopBot()
}
if directive.AlertingErr != nil {
go sendAlert(directive.AlertingErr)
}
}
}
}
// A function to be exposed to Bot/Adapter developers.
// When Bot implementation faces a critical state, the failing bot can call this function to let Sarah judge the severity and stop the bot if necessary.
errNotifier := func(err error) {
select {
case <-botCtx.Done():
// Bot context is already canceled by the preceding error notification. Do nothing.
return
default:
handleError(err)
}
}
return botCtx, errNotifier
}
func (r *runner) registerCommands(botCtx context.Context, bot Bot) {
props := r.botCommandProps(bot.BotType())
reg := func(p *CommandProps) {
command, err := buildCommand(botCtx, p, r.configWatcher)
if err != nil {
logger.Errorf("Failed to build command %#v: %+v", p, err)
return
}
bot.AppendCommand(command)
}
callback := func(p *CommandProps) func() {
return func() {
logger.Infof("Updating command: %s", p.identifier)
reg(p)
}
}
for _, p := range props {
reg(p)
err := r.configWatcher.Watch(botCtx, bot.BotType(), p.identifier, callback(p))
if err != nil {
logger.Errorf("Failed to subscribe configuration for command %s: %+v", p.identifier, err)
continue
}
}
for _, command := range r.botCommands(bot.BotType()) {
bot.AppendCommand(command)
}
}
func (r *runner) registerScheduledTasks(botCtx context.Context, bot Bot) {
reg := func(p *ScheduledTaskProps) {
r.scheduler.remove(bot.BotType(), p.identifier)
task, err := buildScheduledTask(botCtx, p, r.configWatcher)
if err != nil {
logger.Errorf("Failed to build scheduled task %s: %+v", p.identifier, err)
return
}
err = r.scheduler.update(bot.BotType(), task, func() {
executeScheduledTask(botCtx, bot, task)
})
if err != nil {
logger.Errorf("Failed to schedule a task. ID: %s: %+v", task.Identifier(), err)
}
}
callback := func(p *ScheduledTaskProps) func() {
return func() {
logger.Infof("Updating scheduled task: %s", p.identifier)
reg(p)
}
}
for _, p := range r.botScheduledTaskProps(bot.BotType()) {
reg(p)
err := r.configWatcher.Watch(botCtx, bot.BotType(), p.identifier, callback(p))
if err != nil {
logger.Errorf("Failed to subscribe configuration for scheduled task %s: %+v", p.identifier, err)
continue
}
}
for _, task := range r.botScheduledTasks(bot.BotType()) {
if task.Schedule() == "" {
logger.Errorf("Failed to schedule a task. ID: %s. Reason: %s.", task.Identifier(), "No schedule given.")
continue
}
err := r.scheduler.update(bot.BotType(), task, func() {
executeScheduledTask(botCtx, bot, task)
})
if err != nil {
logger.Errorf("Failed to schedule a task. id: %s: %+v", task.Identifier(), err)
}
}
}
func executeScheduledTask(ctx context.Context, bot Bot, task ScheduledTask) {
results, err := task.Execute(ctx)
if err != nil {
logger.Errorf("Error on scheduled task: %s", task.Identifier())
return
} else if results == nil {
return
}
for _, res := range results {
// The destination returned by task execution has higher priority.
// e.g. RSS Reader's task searches for stored feed/destination set, and returns which destination to send.
dest := res.Destination
if dest == nil {
// If no destination is given, see if default destination exists.
// Useful when destination can be preset.
// e.g. Weather forecast task always sends weather information to #goodmorning room.
presetDest := task.DefaultDestination()
if presetDest == nil {
logger.Errorf("Task was completed, but destination was not set: %s.", task.Identifier())
continue
}
dest = presetDest
}
message := NewOutputMessage(dest, res.Content)
bot.SendMessage(ctx, message)
}
}
func setupInputReceiver(botCtx context.Context, bot Bot, wkr worker.Worker) func(Input) error {
continuousEnqueueErrCnt := 0
return func(input Input) error {
err := wkr.Enqueue(func() {
err := bot.Respond(botCtx, input)
if err != nil {
logger.Errorf("Error on message handling. Input: %#v. Error: %+v", input, err)
}
})
if err == nil {
continuousEnqueueErrCnt = 0
return nil
}
continuousEnqueueErrCnt++
// Could not send because probably the workers are too busy or the runner context is already canceled.
return NewBlockedInputError(continuousEnqueueErrCnt)
}
}