-
Notifications
You must be signed in to change notification settings - Fork 5
/
main.go
256 lines (216 loc) · 7.28 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
package main
import (
"context"
"flag"
"fmt"
"os"
"os/exec"
"strings"
"sync"
"time"
"github.com/jpillora/overseer/fetcher"
"github.com/jpillora/overseer"
"github.com/mylxsw/coyotes/backend"
"github.com/mylxsw/coyotes/backend/mysql"
"github.com/mylxsw/coyotes/config"
"github.com/mylxsw/coyotes/console"
"github.com/mylxsw/coyotes/log"
"github.com/mylxsw/coyotes/pidfile"
"github.com/mylxsw/coyotes/scheduler"
sysRuntime "runtime"
_ "github.com/go-sql-driver/mysql"
broker "github.com/mylxsw/coyotes/brokers/redis"
server "github.com/mylxsw/coyotes/http"
)
var (
bizName string
redisAddr string
redisPassword string
redisDB int
redisAddrDepressed string
redisPasswordDepressed string
httpAddr string
pidFile string
concurrent int
taskMode bool
colorfulTTY bool
defaultChannel string
logFilename string
debugMode bool
daemonize bool
backendStorage string
backendKeepDays int
fetchUpdateURL string
autoUpdate bool
updateInterval int
)
var BuildID = "0"
func main() {
flag.Usage = func() {
fmt.Println(config.WelcomeMessageStr)
fmt.Print("Options:\n\n")
flag.PrintDefaults()
}
flag.StringVar(&bizName, "biz-name", "", "业务名称,使用英文字符串")
flag.StringVar(&redisAddr, "redis-host", "127.0.0.1:6379", "redis连接地址,必须指定端口")
flag.StringVar(&redisPassword, "redis-password", "", "redis连接密码")
flag.IntVar(&redisDB, "redis-db", 0, "redis默认数据库0-15")
flag.StringVar(&redisAddrDepressed, "host", "127.0.0.1:6379", "redis连接地址,必须指定端口(depressed,使用redis-host)")
flag.StringVar(&redisPasswordDepressed, "password", "", "redis连接密码(depressed,使用redis-password)")
flag.StringVar(&httpAddr, "http-addr", "127.0.0.1:60001", "HTTP监控服务监听地址+端口")
flag.StringVar(&pidFile, "pidfile", "", "pid文件路径,默认为空,不使用")
flag.IntVar(&concurrent, "concurrent", 5, "并发执行线程数")
flag.BoolVar(&taskMode, "task-mode", true, "是否启用任务模式,默认启用,关闭则不会执行消费")
flag.BoolVar(&colorfulTTY, "colorful-tty", false, "是否启用彩色模式的控制台输出")
flag.StringVar(&defaultChannel, "channel-default", "default", "默认channel名称,用于消息队列")
flag.StringVar(&logFilename, "log-file", "", "日志文件存储路径,默认为空,直接输出到标准输出")
flag.BoolVar(&debugMode, "debug", false, "日志输出级别,默认为false,如果为true,则输出debug日志")
flag.BoolVar(&daemonize, "daemonize", false, "守护进程模式,模式为false")
flag.StringVar(&backendStorage, "backend-storage", "", "后端存储方式,用于存储任务执行结果,默认不存储")
flag.IntVar(&backendKeepDays, "backend-keep-days", 0, "后端存储历史保留天数,0为永久保留")
flag.BoolVar(&autoUpdate, "update-auto", false, "是否启用自动更新")
flag.IntVar(&updateInterval, "update-check-interval", 5, "自动更新频率,单位秒")
flag.StringVar(&fetchUpdateURL, "update-check-url", "https://aicode.cc/open-api/coyotes/update/coyotes-%s-%s", "自动更新检查地址")
flag.Parse()
if bizName == "" {
log.Error("业务名称不能为空")
os.Exit(2)
}
// 如果是守护进程模式,则创建子进程,退出父进程
if daemonize && os.Getppid() != 1 {
binary, err := exec.LookPath(os.Args[0])
if err != nil {
fmt.Println("failed to lookup binary:", err)
os.Exit(2)
}
_, err = os.StartProcess(binary, os.Args, &os.ProcAttr{Dir: "", Env: nil, Files: []*os.File{os.Stdin, os.Stdout, os.Stderr}, Sys: nil})
if err != nil {
fmt.Println("failed to start process:", err)
os.Exit(2)
}
os.Exit(0)
}
runtime := config.InitRuntime(
bizName,
redisAddr,
redisPassword,
redisAddrDepressed,
redisPasswordDepressed,
pidFile,
concurrent,
redisDB,
httpAddr,
taskMode,
colorfulTTY,
defaultChannel,
logFilename,
debugMode,
backendStorage,
backendKeepDays,
)
if os.Getuid() == 0 {
fmt.Println(console.ColorfulText(
console.TextYellow,
"\n当前以root用户执行,使用root权限执行可能会造成严重的安全问题,建议使用非root用户执行\n",
))
}
overseerConfig := overseer.Config{
Program: mainProcess,
Address: runtime.Config.HTTP.ListenAddr,
Debug: runtime.Config.DebugMode,
}
// 是否启用自动更新
if autoUpdate {
overseerConfig.Fetcher = &fetcher.HTTP{
URL: fmt.Sprintf(fetchUpdateURL, sysRuntime.GOOS, sysRuntime.GOARCH),
Interval: time.Duration(updateInterval) * time.Second,
}
}
overseer.Run(overseerConfig)
}
func mainProcess(state overseer.State) {
runtime := config.GetRuntime()
runtime.BuildID = BuildID
// 初始化日志输出
// 指定日志文件时,使用日志文件输出,否则输出到标准输出
if runtime.Config.LogFilename != "" {
logFile, err := os.OpenFile(runtime.Config.LogFilename, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
if err != nil {
fmt.Printf("open log file %s failed: %v\n", runtime.Config.LogFilename, err)
os.Exit(2)
}
defer logFile.Close()
log.InitLogger(logFile, debugMode, "coyotes#"+BuildID)
} else {
log.InitLogger(os.Stdout, debugMode, "coyotes#"+BuildID)
}
// 创建进程pid文件
if runtime.Config.PidFile != "" {
pid, err := pidfile.New(runtime.Config.PidFile)
if err != nil {
log.Error("failed to create pidfile: %v", err)
os.Exit(2)
}
defer pid.Remove()
}
if runtime.Config.ColorfulTTY {
fmt.Println(console.ColorfulText(console.TextGreen, config.WelcomeMessage()))
}
log.Debug("redis addr: %s/%d", runtime.Config.Redis.Addr, runtime.Config.Redis.DB)
log.Debug("process ID: %d", os.Getpid())
// 信号处理程序,接收退出信号,平滑退出进程
ctx, cancel := context.WithCancel(context.Background())
// signal.InitSignalReceiver(ctx, cancel)
go func() {
<-state.GracefulShutdown
cancel()
}()
// 初始化所有channel
scheduler.InitChannels()
// 初始化后端存储
backendStorage := runtime.Config.BackendStorage
if backendStorage != "" && strings.HasPrefix(backendStorage, "mysql:") {
dataSource := backendStorage[6:]
mysql.Register("mysql", dataSource)
mysql.InitTableForMySQL(dataSource)
// 自动清理过期的后端存储日志
if runtime.Config.BackendKeepDays > 0 {
go func() {
for range time.Tick(5 * time.Minute) {
beforeTime := time.Now().AddDate(0, 0, -runtime.Config.BackendKeepDays)
if driver := backend.Default(); driver != nil {
affectRows, err := driver.ClearExpired(beforeTime)
if err != nil {
log.Error("backend clear hisories failed: %v", err)
return
}
log.Debug("backend clear histories, affected %d rows", affectRows)
}
}
}()
}
}
var wg sync.WaitGroup
// 启动http server
wg.Add(1)
go func() {
defer wg.Done()
server.StartHTTPServer(state.Listener)
}()
// 启动待执行任务转移任务
wg.Add(1)
go func() {
defer wg.Done()
broker.TransferPrepareTask(ctx)
}()
// 启动延迟任务
wg.Add(1)
go func() {
defer wg.Done()
broker.StartDelayTaskLifeCycle(ctx)
}()
// 启动任务调度器
scheduler.Schedule(ctx)
wg.Wait()
log.Debug("all stoped.")
}