-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathscheduled_job_executor.go
197 lines (174 loc) · 6.63 KB
/
scheduled_job_executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package jobs
import (
"context"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)
// ScheduledJobExecutor is an interface describing how the job attached to a
// schedule is executed and how the schedule reacts once that job terminates.
type ScheduledJobExecutor interface {
	// ExecuteJob executes the scheduled job. Implementations may use the
	// provided transaction. Modifications made to the ScheduledJob object
	// will be persisted.
	ExecuteJob(
		ctx context.Context,
		cfg *scheduledjobs.JobExecutionConfig,
		env scheduledjobs.JobSchedulerEnv,
		schedule *ScheduledJob,
		txn *kv.Txn,
	) error

	// NotifyJobTermination notifies the executor that the system.job started
	// by the ScheduledJob has completed. Implementations may use the provided
	// transaction to perform any additional mutations. Modifications made to
	// the ScheduledJob object will be persisted.
	NotifyJobTermination(
		ctx context.Context,
		jobID jobspb.JobID,
		jobStatus Status,
		details jobspb.Details,
		env scheduledjobs.JobSchedulerEnv,
		schedule *ScheduledJob,
		ex sqlutil.InternalExecutor,
		txn *kv.Txn,
	) error

	// Metrics returns the optional metric.Struct object for this executor.
	// A nil result is allowed; RegisterExecutorsMetrics skips nil values.
	Metrics() metric.Struct

	// GetCreateScheduleStatement returns a `CREATE SCHEDULE` statement that
	// is functionally equivalent to the statement that led to the creation
	// of the passed in `schedule`.
	GetCreateScheduleStatement(
		ctx context.Context,
		env scheduledjobs.JobSchedulerEnv,
		txn *kv.Txn,
		schedule *ScheduledJob,
		ex sqlutil.InternalExecutor,
	) (string, error)
}
// ScheduledJobController is an interface describing the hooks that run when a
// scheduled job is being controlled.
type ScheduledJobController interface {
	// OnDrop runs before the passed in `schedule` is dropped as part of a
	// `DROP SCHEDULE` query. It returns an error if the hook fails.
	OnDrop(
		ctx context.Context,
		ie sqlutil.InternalExecutor,
		ptsProvider protectedts.Provider,
		env scheduledjobs.JobSchedulerEnv,
		schedule *ScheduledJob,
		txn *kv.Txn,
	) error
}
// ScheduledJobExecutorFactory is a callback that creates a
// ScheduledJobExecutor, or returns an error if one cannot be created.
// Factories are registered by name via RegisterScheduledJobExecutorFactory.
type ScheduledJobExecutorFactory = func() (ScheduledJobExecutor, error)
// executorRegistry holds the registered executor factories together with the
// executor instances created from them. The exported functions in this file
// acquire the embedded mutex before touching either map; both maps are
// allocated lazily on first use.
var executorRegistry struct {
	syncutil.Mutex
	// factories maps an executor name to the callback that creates it.
	factories map[string]ScheduledJobExecutorFactory
	// executors caches, by name, the executor instance produced by the
	// factory registered under that name.
	executors map[string]ScheduledJobExecutor
}
// RegisterScheduledJobExecutorFactory associates the given factory callback
// with the specified executor name so that an executor can later be created
// on demand. It panics if a factory is already registered under that name.
func RegisterScheduledJobExecutorFactory(name string, factory ScheduledJobExecutorFactory) {
	executorRegistry.Lock()
	defer executorRegistry.Unlock()

	// Looking up a key in a nil map is safe and reports "not found", so the
	// duplicate check can run before the map is lazily allocated.
	_, duplicate := executorRegistry.factories[name]
	if duplicate {
		panic("executor " + name + " already registered")
	}

	if executorRegistry.factories == nil {
		executorRegistry.factories = make(map[string]ScheduledJobExecutorFactory)
	}
	executorRegistry.factories[name] = factory
}
// newScheduledJobExecutorLocked creates a new ScheduledJobExecutor instance by
// invoking the factory registered under name. It returns an error if no
// factory is registered under that name, or whatever error the factory
// itself returns. As the "Locked" suffix indicates, the caller is expected to
// hold the executorRegistry mutex.
func newScheduledJobExecutorLocked(name string) (ScheduledJobExecutor, error) {
	factory, registered := executorRegistry.factories[name]
	if !registered {
		return nil, errors.Newf("executor %q is not registered", name)
	}
	return factory()
}
// GetScheduledJobExecutor returns the singleton ScheduledJobExecutor instance
// registered under name, creating it on first use. An error is returned if
// the instance cannot be obtained, e.g. because no factory is registered
// under name or because the factory fails.
func GetScheduledJobExecutor(name string) (ScheduledJobExecutor, error) {
	// Hold the registry lock for the duration of the lookup/creation.
	executorRegistry.Lock()
	defer executorRegistry.Unlock()
	return getScheduledJobExecutorLocked(name)
}
// getScheduledJobExecutorLocked returns the cached executor for name, creating
// and caching it via the registered factory when it does not exist yet. An
// executor is only cached when its creation succeeds. As the "Locked" suffix
// indicates, the caller is expected to hold the executorRegistry mutex.
func getScheduledJobExecutorLocked(name string) (ScheduledJobExecutor, error) {
	// The instance cache is allocated lazily on the first lookup.
	if executorRegistry.executors == nil {
		executorRegistry.executors = make(map[string]ScheduledJobExecutor)
	}

	cached, found := executorRegistry.executors[name]
	if found {
		return cached, nil
	}

	created, err := newScheduledJobExecutorLocked(name)
	if err != nil {
		return nil, err
	}
	executorRegistry.executors[name] = created
	return created, nil
}
// RegisterExecutorsMetrics registers the metrics updated by each executor
// with the given registry. Every registered factory is resolved to its
// executor instance (creating it if necessary); executors that report no
// metrics are skipped. The first error encountered while obtaining an
// executor is returned and stops the iteration.
func RegisterExecutorsMetrics(registry *metric.Registry) error {
	executorRegistry.Lock()
	defer executorRegistry.Unlock()

	for name := range executorRegistry.factories {
		executor, err := getScheduledJobExecutorLocked(name)
		if err != nil {
			return err
		}

		metrics := executor.Metrics()
		if metrics == nil {
			// Metrics are optional for an executor.
			continue
		}
		registry.AddMetricStruct(metrics)
	}
	return nil
}
// DefaultHandleFailedRun is the default implementation for handling a failed
// run (either a system.job failure, or perhaps an error processing the
// schedule itself). It applies the schedule's OnError policy:
//
//   - RETRY_SOON: records a "retrying" status and sets the next run to
//     retryFailedJobAfter from the schedule environment's current time.
//   - PAUSE_SCHED: pauses the schedule and records a "schedule paused" status.
//   - RETRY_SCHED: records a "reschedule" status only.
//
// Any other policy value leaves the schedule untouched. fmtOrMsg and args are
// forwarded to SetScheduleStatus after the policy-specific prefix.
func DefaultHandleFailedRun(schedule *ScheduledJob, fmtOrMsg string, args ...interface{}) {
	policy := schedule.ScheduleDetails().OnError

	switch policy {
	case jobspb.ScheduleDetails_RETRY_SOON:
		schedule.SetScheduleStatus("retrying: "+fmtOrMsg, args...)
		// TODO(yevgeniy): backoff
		retryAt := schedule.env.Now().Add(retryFailedJobAfter)
		schedule.SetNextRun(retryAt)

	case jobspb.ScheduleDetails_PAUSE_SCHED:
		schedule.Pause()
		schedule.SetScheduleStatus("schedule paused: "+fmtOrMsg, args...)

	case jobspb.ScheduleDetails_RETRY_SCHED:
		schedule.SetScheduleStatus("reschedule: "+fmtOrMsg, args...)
	}
}
// NotifyJobTermination is invoked when the job triggered by the specified
// schedule completes.
//
// The 'txn' transaction argument is the transaction the job will use to
// update its state (e.g. status, etc). If any changes need to be made to the
// scheduled job record, those changes are applied to the same transaction --
// that is, they are applied atomically with the job status changes.
//
// A nil env is replaced with scheduledjobs.ProdJobSchedulerEnv. The schedule
// identified by scheduleID is loaded, its executor is notified, and the
// schedule is then written back. The first error from any of these steps is
// returned as is.
func NotifyJobTermination(
	ctx context.Context,
	env scheduledjobs.JobSchedulerEnv,
	jobID jobspb.JobID,
	jobStatus Status,
	jobDetails jobspb.Details,
	scheduleID int64,
	ex sqlutil.InternalExecutor,
	txn *kv.Txn,
) error {
	if env == nil {
		env = scheduledjobs.ProdJobSchedulerEnv
	}

	sj, err := LoadScheduledJob(ctx, env, scheduleID, ex, txn)
	if err != nil {
		return err
	}

	handler, err := GetScheduledJobExecutor(sj.ExecutorType())
	if err != nil {
		return err
	}

	// Delegate handling of the job termination to the executor.
	if err := handler.NotifyJobTermination(
		ctx, jobID, jobStatus, jobDetails, env, sj, ex, txn,
	); err != nil {
		return err
	}

	// Update this schedule in case the executor made changes to it.
	return sj.Update(ctx, ex, txn)
}