-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
271 lines (240 loc) · 7.27 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
package main
import (
"1PanelHelper/notify"
"encoding/json"
"fmt"
_ "github.com/joho/godotenv/autoload"
"log"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"
)
// RecoverGo 包装一个函数,使其在 goroutine 中安全运行
func RecoverGo(f func()) {
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("从panic中恢复: %v,推送错误消息", r)
recordChannel <- recordInChannelStruct{
recordItem: CronjobRecordItem{
ID: 114514,
StartTime: time.Now().Format(time.RFC3339),
Message: fmt.Sprintf("%v", r),
},
cronjobItem: CronjobItem{
Name: "1PanelHelper自身错误",
},
}
}
}()
f()
}()
}
func getAllCronjobItems(p *PanelApiStruct) ([]CronjobItem, error) {
log.Println("开始获取定时任务列表")
list, err := p.GetCronjobList(SearchRequest{
Page: 1,
PageSize: 10,
OrderBy: "created_at",
Order: "null",
})
if err != nil {
return nil, fmt.Errorf("获取定时任务列表失败: %w", err)
}
log.Printf("获取到部分定时任务列表,共 %d 条记录", len(list.Data.Items))
fullList, err := p.GetCronjobList(SearchRequest{
Page: 1,
PageSize: list.Data.Total,
OrderBy: "created_at",
Order: "null",
})
if err != nil {
return nil, fmt.Errorf("获取定时任务列表失败: %w", err)
}
log.Printf("获取到完整定时任务列表,共 %d 条记录", fullList.Data.Total)
return fullList.Data.Items, nil
}
// notifyWorker 用于推送通知
func notifyWorker(n notify.Notify) {
for item := range recordChannel {
log.Printf("开始向通知通道推送任务 %d 的失败提醒", item.recordItem.ID)
title := fmt.Sprintf("1PanelHelper: 任务 「%s」出错了!", item.cronjobItem.Name)
content := fmt.Sprintf("任务名: %s\n任务id: %s\n任务触发时间: %s\n错误信息: %s\n", item.cronjobItem.Name, strconv.Itoa(item.recordItem.ID), item.recordItem.StartTime, item.recordItem.Message)
err := n.PushNotify(content, title)
if err != nil {
notifyErrorChannel <- fmt.Errorf("在推送消息时出错: %w", err)
}
}
}
type recordInChannelStruct struct {
recordItem CronjobRecordItem
cronjobItem CronjobItem
}
var (
jsonFileMutex sync.Mutex
recordChannel = make(chan recordInChannelStruct, 200)
notifyErrorChannel = make(chan error, 200)
)
// ProcessNewCronjobRecords 获取最新的记录并处理准备推送
func ProcessNewCronjobRecords(p *PanelApiStruct, dataFilePath string, cronjobId int, cronjobItem CronjobItem) error {
log.Printf("开始处理任务 %d 的记录", cronjobId)
records, err := p.GetCronjobRecords(CronjobRecordRequest{
Page: 1,
PageSize: 50,
CronjobID: cronjobId,
Days: 1,
})
if err != nil {
return fmt.Errorf("从api获取任务记录失败: %w", err)
}
log.Printf("获取到任务 %d 的记录,共 %d 条", cronjobId, len(records.Data.Items))
jsonFileMutex.Lock()
defer jsonFileMutex.Unlock()
var jsonData map[int]int
data, err := os.ReadFile(dataFilePath)
if err != nil {
return fmt.Errorf("读取json文件时失败: %w", err)
}
err = json.Unmarshal(data, &jsonData)
if err != nil {
return fmt.Errorf("反序列化json文件失败: %w", err)
}
newestIdInJsonFile := jsonData[cronjobId]
var newRecordList []CronjobRecordItem
for _, record := range records.Data.Items {
if record.ID != newestIdInJsonFile {
newRecordList = append(newRecordList, record)
continue
}
break
}
if len(records.Data.Items) == 0 {
log.Printf("任务 %d 一天内还没有运行过,跳过", cronjobId)
return nil
}
newestIdInJsonFile = records.Data.Items[0].ID
// 更新JSON文件
jsonData[cronjobId] = newestIdInJsonFile
updatedData, err := json.Marshal(jsonData)
if err != nil {
return fmt.Errorf("序列化更新后的数据失败: %w", err)
}
err = os.WriteFile(dataFilePath, updatedData, 0644)
if err != nil {
return fmt.Errorf("写入更新后的数据到文件失败: %w", err)
}
log.Printf("更新任务 %d 的记录到JSON文件", cronjobId)
for _, item := range newRecordList {
if item.Status != "Success" && item.Status != "Waiting" {
log.Printf("任务 %d 的运行状态非 Success/Waiting,有问题!", item.ID)
recordChannel <- recordInChannelStruct{recordItem: item, cronjobItem: cronjobItem}
}
}
return nil
}
func main() {
p := PanelApiStruct{
BaseUrl: os.Getenv("PANEL_BASE_URL"),
}
err := p.LoginWithout2FA(os.Getenv("PANEL_USERNAME"), os.Getenv("PANEL_PASSWORD"), os.Getenv("PANEL_ENTRANCE_CODE"))
if err != nil {
log.Fatalf("登录失败: %v", err)
}
log.Println("登录成功")
dataFilePath := "cronjob_records.json"
checkInterval := 1 * time.Hour
// 初始化通知系统
n := notify.GotifyStruct{
ServerRootUrl: os.Getenv("GOTIFY_BASE_URL"),
Token: os.Getenv("GOTIFY_APP_TOKEN"),
}
n.Priority, _ = strconv.Atoi(os.Getenv("GOTIFY_PRIORITY"))
log.Println("通知系统初始化成功")
// 启动notify worker
RecoverGo(func() {
notifyWorker(n)
})
log.Println("notify worker启动成功")
// 启动错误日志监控
RecoverGo(func() {
monitorNotifyErrors()
})
log.Println("错误日志监控启动成功")
log.Println("程序初始化完成, 发送条通知报平安~")
err = n.PushNotify("1PanelHelper成功启动!", "1PanelHelper")
if err != nil {
log.Printf("再发送通知时出错: %s", err)
}
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
_ = <-sigs
fmt.Println("接收到退出信号,程序退出!")
_ = n.PushNotify("接收到退出信号, 正常退出", "1PanelHelper: 退出")
os.Exit(0)
}()
go func() {
for {
time.Sleep(time.Hour * 24)
err := n.PushNotify("已经一天了! 报个平安!", "1PanelHelper: 报平安")
if err != nil {
log.Printf("再发送通知时出错: %s", err)
}
}
}()
for {
cronjobItems, err := getAllCronjobItems(&p)
if err != nil {
log.Printf("获取定时任务列表失败: %v", err)
time.Sleep(checkInterval)
continue
}
log.Printf("获取到定时任务列表,共 %d 条记录", len(cronjobItems))
var wg sync.WaitGroup
for _, item := range cronjobItems {
wg.Add(1)
RecoverGo(func() {
defer wg.Done()
err := ProcessNewCronjobRecords(&p, dataFilePath, item.ID, item)
if err != nil {
log.Printf("更新任务 %d 的记录失败: %v", item.ID, err)
}
})
}
wg.Wait()
log.Println("所有任务检查完成,等待下一次检查")
time.Sleep(checkInterval)
}
}
// monitorNotifyErrors 监控通知错误并记录日志
func monitorNotifyErrors() {
for err := range notifyErrorChannel {
log.Printf("通知错误: %v", err)
}
}
// initializeJSONFile 确保JSON文件存在并包含有效的数据
func initializeJSONFile(filePath string) error {
log.Printf("检查JSON文件 %s 是否存在", filePath)
_, err := os.Stat(filePath)
if os.IsNotExist(err) {
log.Printf("JSON文件 %s 不存在,创建新文件", filePath)
emptyData := map[int]int{}
jsonData, err := json.Marshal(emptyData)
if err != nil {
return fmt.Errorf("创建空JSON数据失败: %w", err)
}
return os.WriteFile(filePath, jsonData, os.ModePerm)
}
return err
}
func init() {
// 确保JSON文件存在
err := initializeJSONFile("cronjob_records.json")
if err != nil {
log.Fatalf("初始化JSON文件失败: %v", err)
}
log.Println("JSON文件初始化成功")
}