forked from hibiken/asynq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
scheduler.go
254 lines (230 loc) · 6.69 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"fmt"
"os"
"sync"
"time"
"github.com/go-redis/redis/v7"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/rdb"
"github.com/robfig/cron/v3"
)
// A Scheduler kicks off tasks at regular intervals based on the user defined schedule.
//
// Tasks are added with Register. The scheduler is driven either with Run, or
// with Start and Stop.
type Scheduler struct {
// id identifies this scheduler instance; it is produced by generateSchedulerID.
id string
// status tracks the lifecycle state: idle on creation, running after Start,
// stopped after Stop.
status *base.ServerStatus
// logger receives the scheduler's log output.
logger *log.Logger
// client is used by registered jobs to enqueue their tasks.
client *Client
// rdb is used to write scheduler entries and enqueue events, and to clear them.
rdb *rdb.RDB
// cron runs the registered jobs on their schedules.
cron *cron.Cron
// location is the time zone given to cron and applied to recorded enqueue times.
location *time.Location
// done is closed by Stop to tell the heartbeater goroutine to exit.
done chan struct{}
// wg tracks the heartbeater goroutine so Stop can wait for it.
wg sync.WaitGroup
// errHandler, if non-nil, is called when a registered task cannot be enqueued.
errHandler func(task *Task, opts []Option, err error)
}
// NewScheduler creates a Scheduler that uses the redis connection described by r.
//
// opts may be nil, in which case defaults are used for every option. An unset
// LogLevel defaults to InfoLevel and an unset Location defaults to time.UTC.
//
// NewScheduler panics if r.MakeRedisClient does not yield a redis.UniversalClient.
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
	redisClient, supported := r.MakeRedisClient().(redis.UniversalClient)
	if !supported {
		panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
	}
	if opts == nil {
		opts = new(SchedulerOpts)
	}

	// Resolve the logger and its minimum level.
	lg := log.NewLogger(opts.Logger)
	level := opts.LogLevel
	if level == level_unspecified {
		level = InfoLevel
	}
	lg.SetLevel(toInternalLogLevel(level))

	// Resolve the time zone shared by cron and the recorded enqueue times.
	tz := opts.Location
	if tz == nil {
		tz = time.UTC
	}

	sched := &Scheduler{
		id:         generateSchedulerID(),
		status:     base.NewServerStatus(base.StatusIdle),
		logger:     lg,
		client:     NewClient(r),
		rdb:        rdb.NewRDB(redisClient),
		cron:       cron.New(cron.WithLocation(tz)),
		location:   tz,
		done:       make(chan struct{}),
		errHandler: opts.EnqueueErrorHandler,
	}
	return sched
}
// generateSchedulerID builds an identifier of the form "<hostname>:<pid>:<uuid>".
// When the hostname cannot be determined, "unknown-host" is used in its place.
func generateSchedulerID() string {
	hostname, err := os.Hostname()
	if err != nil {
		hostname = "unknown-host"
	}
	pid := os.Getpid()
	return fmt.Sprintf("%s:%d:%v", hostname, pid, uuid.New())
}
// SchedulerOpts specifies scheduler options.
//
// Every field is optional; NewScheduler also accepts a nil *SchedulerOpts.
type SchedulerOpts struct {
// Logger specifies the logger used by the scheduler instance.
//
// If unset, the default logger is used.
Logger Logger
// LogLevel specifies the minimum log level to enable.
//
// If unset, InfoLevel is used by default.
LogLevel LogLevel
// Location specifies the time zone location.
//
// If unset, the UTC time zone (time.UTC) is used.
Location *time.Location
// EnqueueErrorHandler gets called when the scheduler cannot enqueue a registered task
// due to an error. It receives the task, the options it was registered with,
// and the enqueue error.
//
// If unset, the error is only logged.
EnqueueErrorHandler func(task *Task, opts []Option, err error)
}
// enqueueJob encapsulates the job of enqueueing a task and recording the event.
//
// Register creates one enqueueJob per registered task and hands it to cron,
// which invokes its Run method.
type enqueueJob struct {
// id is the entry ID returned to the caller of Register.
id uuid.UUID
// cronspec is the schedule string the job was registered with.
cronspec string
// task is the task enqueued on every run.
task *Task
// opts are the enqueue options passed along with task.
opts []Option
// location is the time zone applied to the recorded enqueue time.
location *time.Location
logger *log.Logger
// client performs the enqueue.
client *Client
// rdb records the enqueue event.
rdb *rdb.RDB
// errHandler, if non-nil, is called when the enqueue fails.
errHandler func(task *Task, opts []Option, err error)
}
// Run enqueues the job's task and, on success, records the enqueue event.
//
// If the enqueue fails, the error is logged and passed to errHandler (when one
// is set), and no event is recorded. A failure to record the event is only
// logged.
func (j *enqueueJob) Run() {
	result, enqueueErr := j.client.Enqueue(j.task, j.opts...)
	if enqueueErr != nil {
		j.logger.Errorf("scheduler could not enqueue a task %+v: %v", j.task, enqueueErr)
		if handle := j.errHandler; handle != nil {
			handle(j.task, j.opts, enqueueErr)
		}
		return
	}
	j.logger.Debugf("scheduler enqueued a task: %+v", result)

	// Record when the task was enqueued, expressed in the scheduler's time zone.
	enqueuedAt := result.EnqueuedAt.In(j.location)
	ev := &base.SchedulerEnqueueEvent{TaskID: result.ID, EnqueuedAt: enqueuedAt}
	if recordErr := j.rdb.RecordSchedulerEnqueueEvent(j.id.String(), ev); recordErr != nil {
		j.logger.Errorf("scheduler could not record enqueue event of enqueued task %+v: %v", j.task, recordErr)
	}
}
// Register registers a task to be enqueued on the given schedule specified by the cronspec.
// It returns an ID of the newly registered entry.
//
// An error is returned, and nothing is registered, if task is nil or if the
// underlying cron runner returns an error for cronspec.
func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entryID string, err error) {
	// A nil task would otherwise be accepted here and dereferenced later on a
	// background goroutine (beat reads job.task.Type), so reject it up front
	// where the caller can handle the error.
	if task == nil {
		return "", fmt.Errorf("asynq: task cannot be nil")
	}
	job := &enqueueJob{
		id:         uuid.New(),
		cronspec:   cronspec,
		task:       task,
		opts:       opts,
		location:   s.location,
		client:     s.client,
		rdb:        s.rdb,
		logger:     s.logger,
		errHandler: s.errHandler,
	}
	// The cron-level entry ID is not needed; callers identify the entry by job.id.
	if _, err = s.cron.AddJob(cronspec, job); err != nil {
		return "", err
	}
	return job.id.String(), nil
}
// Run starts the scheduler and blocks until an os signal to exit the program
// is received, then stops the scheduler.
//
// It returns the error from Start if the scheduler could not be started (it is
// already running or has been stopped); otherwise it returns the result of Stop.
func (s *Scheduler) Run() error {
	err := s.Start()
	if err == nil {
		s.waitForSignals()
		err = s.Stop()
	}
	return err
}
// Start starts the scheduler: it starts the cron runner and launches the
// heartbeater goroutine, then marks the scheduler as running.
//
// It returns an error if the scheduler is already running or has been stopped.
func (s *Scheduler) Start() error {
	current := s.status.Get()
	if current == base.StatusRunning {
		return fmt.Errorf("asynq: the scheduler is already running")
	}
	if current == base.StatusStopped {
		return fmt.Errorf("asynq: the scheduler has already been stopped")
	}

	s.logger.Info("Scheduler starting")
	s.logger.Infof("Scheduler timezone is set to %v", s.location)
	s.cron.Start()

	// The heartbeater is tracked by wg so that Stop can wait for it to exit.
	s.wg.Add(1)
	go s.runHeartbeater()

	s.status.Set(base.StatusRunning)
	return nil
}
// Stop stops the scheduler. It signals the heartbeater to exit, stops the cron
// runner and waits on the context it returns, waits for the heartbeater, clears
// the history of every registered entry, and closes the client and rdb.
//
// It returns an error if the scheduler is not currently running.
func (s *Scheduler) Stop() error {
	if running := s.status.Get() == base.StatusRunning; !running {
		return fmt.Errorf("asynq: the scheduler is not running")
	}
	s.logger.Info("Scheduler shutting down")

	// Signal the heartbeater to stop.
	close(s.done)
	// Block until the context returned by cron's Stop is done.
	<-s.cron.Stop().Done()
	s.wg.Wait()

	s.clearHistory()
	s.client.Close()
	s.rdb.Close()

	s.status.Set(base.StatusStopped)
	s.logger.Info("Scheduler stopped")
	return nil
}
// runHeartbeater calls beat every 5 seconds until s.done is closed. On
// shutdown it clears this scheduler's entries via rdb.ClearSchedulerEntries
// and returns.
//
// It is meant to run in its own goroutine and marks s.wg as done on exit.
func (s *Scheduler) runHeartbeater() {
	defer s.wg.Done()
	ticker := time.NewTicker(5 * time.Second)
	// Release the ticker's resources when the heartbeater exits; the original
	// code never stopped it.
	defer ticker.Stop()
	for {
		select {
		case <-s.done:
			s.logger.Debugf("Scheduler heatbeater shutting down")
			s.rdb.ClearSchedulerEntries(s.id)
			return
		case <-ticker.C:
			s.beat()
		}
	}
}
// beat writes a snapshot of entries to redis.
//
// One base.SchedulerEntry is built for every entry currently held by the cron
// runner, and the whole set is written under this scheduler's id. A write
// failure is logged as a warning and otherwise ignored.
func (s *Scheduler) beat() {
	cronEntries := s.cron.Entries()
	// The final length is known, so size the slice once.
	entries := make([]*base.SchedulerEntry, 0, len(cronEntries))
	for _, entry := range cronEntries {
		// Register only ever adds *enqueueJob values to s.cron.
		job := entry.Job.(*enqueueJob)
		e := &base.SchedulerEntry{
			ID:      job.id.String(),
			Spec:    job.cronspec,
			Type:    job.task.Type,
			Payload: job.task.Payload.data,
			Opts:    stringifyOptions(job.opts),
			Next:    entry.Next,
			Prev:    entry.Prev,
		}
		entries = append(entries, e)
	}
	s.logger.Debugf("Writing entries %v", entries)
	// Keep the snapshot alive for two heartbeat periods (runHeartbeater ticks
	// every 5s) so that a beat arriving slightly late does not let the entries
	// lapse in between.
	// NOTE(review): assumes the duration argument is the snapshot's expiry —
	// confirm against rdb.WriteSchedulerEntries.
	if err := s.rdb.WriteSchedulerEntries(s.id, entries, 10*time.Second); err != nil {
		s.logger.Warnf("Scheduler could not write heartbeat data: %v", err)
	}
}
// stringifyOptions returns the String() form of each option, in order.
// It returns a nil slice when opts is empty.
func stringifyOptions(opts []Option) []string {
	if len(opts) == 0 {
		return nil
	}
	out := make([]string, len(opts))
	for i := range opts {
		out[i] = opts[i].String()
	}
	return out
}
// clearHistory asks rdb to clear the scheduler history of every entry
// currently held by the cron runner. Failures are logged as warnings and do
// not stop the remaining entries from being processed.
func (s *Scheduler) clearHistory() {
	for _, e := range s.cron.Entries() {
		entryID := e.Job.(*enqueueJob).id.String()
		err := s.rdb.ClearSchedulerHistory(entryID)
		if err == nil {
			continue
		}
		s.logger.Warnf("Could not clear scheduler history for entry %q: %v", entryID, err)
	}
}