From 3d5e292d26a1b8915c038e4a3b11a1666a79a0a6 Mon Sep 17 00:00:00 2001 From: Akkia Date: Tue, 12 Apr 2022 00:01:38 +0800 Subject: [PATCH 1/5] =?UTF-8?q?optimize:=20=E7=A7=BB=E9=99=A4=E4=B8=A4?= =?UTF-8?q?=E5=A4=84=E5=86=97=E4=BD=99=E7=9A=84=E4=BB=A3=E7=A0=81=20(?= =?UTF-8?q?=E5=88=9D=E5=A7=8B=E5=8C=96=E5=90=8E=E6=9C=AA=E4=BD=BF=E7=94=A8?= =?UTF-8?q?=EF=BD=9C=E9=87=8D=E5=A4=8D=E5=88=9D=E5=A7=8B=E5=8C=96)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/dashboard/main.go | 5 ----- service/singleton/servicesentinel.go | 1 - 2 files changed, 6 deletions(-) diff --git a/cmd/dashboard/main.go b/cmd/dashboard/main.go index fce865478f..3bfeb2a1e6 100644 --- a/cmd/dashboard/main.go +++ b/cmd/dashboard/main.go @@ -171,11 +171,6 @@ func loadCrons() { for i := 0; i < len(crons); i++ { cr := crons[i] - crIgnoreMap := make(map[uint64]bool) - for j := 0; j < len(cr.Servers); j++ { - crIgnoreMap[cr.Servers[j]] = true - } - // 注册计划任务 cr.CronJobID, err = singleton.Cron.AddFunc(cr.Scheduler, singleton.CronTrigger(cr)) if err == nil { diff --git a/service/singleton/servicesentinel.go b/service/singleton/servicesentinel.go index 50aa78f4ec..cb897d5d07 100644 --- a/service/singleton/servicesentinel.go +++ b/service/singleton/servicesentinel.go @@ -149,7 +149,6 @@ func (ss *ServiceSentinel) loadMonitorHistory() { var err error ss.monitorsLock.Lock() defer ss.monitorsLock.Unlock() - ss.monitors = make(map[uint64]*model.Monitor) for i := 0; i < len(monitors); i++ { task := *monitors[i] // 通过cron定时将服务监控任务传递给任务调度管道 From c65028188ce79807c0250d24c065ccfc2e56460e Mon Sep 17 00:00:00 2001 From: Akkia Date: Tue, 12 Apr 2022 13:16:33 +0800 Subject: [PATCH 2/5] =?UTF-8?q?refactor:=20=E4=BC=98=E5=8C=96=E4=BB=A3?= =?UTF-8?q?=E7=A0=81=E7=BB=84=E7=BB=87=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/dashboard/main.go | 176 ++---------------------------- model/config.go | 3 + service/singleton/crontask.go | 81 ++++++++++++++ service/singleton/notification.go | 2 +- service/singleton/server.go | 58 ++++++++++ service/singleton/singleton.go | 103 ++++++----------- service/singleton/toolfunc.go | 95 ++++++++++++++++ 7 files changed, 280 insertions(+), 238 deletions(-) create mode 100644 service/singleton/crontask.go create mode 100644 service/singleton/server.go create mode 100644 service/singleton/toolfunc.go diff --git a/cmd/dashboard/main.go b/cmd/dashboard/main.go index 3bfeb2a1e6..540c2cf89b 100644 --- a/cmd/dashboard/main.go +++ b/cmd/dashboard/main.go @@ -1,196 +1,40 @@ package main import ( - "bytes" "context" - "fmt" - "log" - "time" - - "github.com/ory/graceful" - "github.com/patrickmn/go-cache" - "github.com/robfig/cron/v3" - "gorm.io/driver/sqlite" - "gorm.io/gorm" - "github.com/naiba/nezha/cmd/dashboard/controller" "github.com/naiba/nezha/cmd/dashboard/rpc" "github.com/naiba/nezha/model" "github.com/naiba/nezha/service/singleton" + "github.com/ory/graceful" + "log" ) func init() { - shanghai, err := time.LoadLocation("Asia/Shanghai") - if err != nil { - panic(err) - } - // 初始化 dao 包 singleton.Init() - singleton.Conf = &model.Config{} - singleton.Cron = cron.New(cron.WithSeconds(), cron.WithLocation(shanghai)) - singleton.Crons = make(map[uint64]*model.Cron) - singleton.ServerList = make(map[uint64]*model.Server) - singleton.SecretToID = make(map[string]uint64) - - err = singleton.Conf.Read("data/config.yaml") - if err != nil { - panic(err) - } - singleton.DB, err = gorm.Open(sqlite.Open("data/sqlite.db"), &gorm.Config{ - CreateBatchSize: 200, - }) - if err != nil { - panic(err) - } - if singleton.Conf.Debug { - singleton.DB = singleton.DB.Debug() - } - if singleton.Conf.GRPCPort == 0 { - singleton.Conf.GRPCPort = 5555 - } - singleton.Cache = cache.New(5*time.Minute, 10*time.Minute) - + singleton.InitConfigFromPath("data/config.yaml") + singleton.InitDBFromPath("data/sqlite.db") initSystem() } func initSystem() { - singleton.DB.AutoMigrate(model.Server{}, model.User{}, - model.Notification{}, model.AlertRule{}, model.Monitor{}, - model.MonitorHistory{}, model.Cron{}, model.Transfer{}) - - singleton.LoadNotifications() - loadServers() //加载服务器列表 - loadCrons() //加载计划任务 + // 启动 singleton 包下的所有服务 + singleton.LoadSingleton() // 每天的3:30 对 监控记录 和 流量记录 进行清理 - _, err := singleton.Cron.AddFunc("0 30 3 * * *", cleanMonitorHistory) - if err != nil { + if _, err := singleton.Cron.AddFunc("0 30 3 * * *", singleton.CleanMonitorHistory); err != nil { panic(err) } // 每小时对流量记录进行打点 - _, err = singleton.Cron.AddFunc("0 0 * * * *", recordTransferHourlyUsage) - if err != nil { + if _, err := singleton.Cron.AddFunc("0 0 * * * *", singleton.RecordTransferHourlyUsage); err != nil { panic(err) } } -// recordTransferHourlyUsage 对流量记录进行打点 -func recordTransferHourlyUsage() { - singleton.ServerLock.Lock() - defer singleton.ServerLock.Unlock() - now := time.Now() - nowTrimSeconds := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local) - var txs []model.Transfer - for id, server := range singleton.ServerList { - tx := model.Transfer{ - ServerID: id, - In: server.State.NetInTransfer - uint64(server.PrevHourlyTransferIn), - Out: server.State.NetOutTransfer - uint64(server.PrevHourlyTransferOut), - } - if tx.In == 0 && tx.Out == 0 { - continue - } - server.PrevHourlyTransferIn = int64(server.State.NetInTransfer) - server.PrevHourlyTransferOut = int64(server.State.NetOutTransfer) - tx.CreatedAt = nowTrimSeconds - txs = append(txs, tx) - } - if len(txs) == 0 { - return - } - log.Println("NEZHA>> Cron 流量统计入库", len(txs), singleton.DB.Create(txs).Error) -} - -// cleanMonitorHistory 清理无效或过时的 监控记录 和 流量记录 -func cleanMonitorHistory() { - // 清理已被删除的服务器的监控记录与流量记录 - singleton.DB.Unscoped().Delete(&model.MonitorHistory{}, "created_at < ? OR monitor_id NOT IN (SELECT `id` FROM monitors)", time.Now().AddDate(0, 0, -30)) - singleton.DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (SELECT `id` FROM servers)") - // 计算可清理流量记录的时长 - var allServerKeep time.Time - specialServerKeep := make(map[uint64]time.Time) - var specialServerIDs []uint64 - var alerts []model.AlertRule - singleton.DB.Find(&alerts) - for i := 0; i < len(alerts); i++ { - for j := 0; j < len(alerts[i].Rules); j++ { - // 是不是流量记录规则 - if !alerts[i].Rules[j].IsTransferDurationRule() { - continue - } - dataCouldRemoveBefore := alerts[i].Rules[j].GetTransferDurationStart() - // 判断规则影响的机器范围 - if alerts[i].Rules[j].Cover == model.RuleCoverAll { - // 更新全局可以清理的数据点 - if allServerKeep.IsZero() || allServerKeep.After(dataCouldRemoveBefore) { - allServerKeep = dataCouldRemoveBefore - } - } else { - // 更新特定机器可以清理数据点 - for id := range alerts[i].Rules[j].Ignore { - if specialServerKeep[id].IsZero() || specialServerKeep[id].After(dataCouldRemoveBefore) { - specialServerKeep[id] = dataCouldRemoveBefore - specialServerIDs = append(specialServerIDs, id) - } - } - } - } - } - for id, couldRemove := range specialServerKeep { - singleton.DB.Unscoped().Delete(&model.Transfer{}, "server_id = ? AND created_at < ?", id, couldRemove) - } - if allServerKeep.IsZero() { - singleton.DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (?)", specialServerIDs) - } else { - singleton.DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (?) AND created_at < ?", specialServerIDs, allServerKeep) - } -} - -//loadServers 加载服务器列表并根据ID排序 -func loadServers() { - var servers []model.Server - singleton.DB.Find(&servers) - for _, s := range servers { - innerS := s - innerS.Host = &model.Host{} - innerS.State = &model.HostState{} - singleton.ServerList[innerS.ID] = &innerS - singleton.SecretToID[innerS.Secret] = innerS.ID - } - singleton.ReSortServer() -} - -// loadCrons 加载计划任务 -func loadCrons() { - var crons []model.Cron - singleton.DB.Find(&crons) - var err error - errMsg := new(bytes.Buffer) - for i := 0; i < len(crons); i++ { - cr := crons[i] - - // 注册计划任务 - cr.CronJobID, err = singleton.Cron.AddFunc(cr.Scheduler, singleton.CronTrigger(cr)) - if err == nil { - singleton.Crons[cr.ID] = &cr - } else { - if errMsg.Len() == 0 { - errMsg.WriteString("调度失败的计划任务:[") - } - errMsg.WriteString(fmt.Sprintf("%d,", cr.ID)) - } - } - if errMsg.Len() > 0 { - msg := errMsg.String() - singleton.SendNotification(msg[:len(msg)-1]+"] 这些任务将无法正常执行,请进入后点重新修改保存。", false) - } - singleton.Cron.Start() -} - func main() { - cleanMonitorHistory() + singleton.CleanMonitorHistory() go rpc.ServeRPC(singleton.Conf.GRPCPort) serviceSentinelDispatchBus := make(chan model.Monitor) // 用于传递服务监控任务信息的channel go rpc.DispatchTask(serviceSentinelDispatchBus) @@ -202,7 +46,7 @@ func main() { return srv.ListenAndServe() }, func(c context.Context) error { log.Println("NEZHA>> Graceful::START") - recordTransferHourlyUsage() + singleton.RecordTransferHourlyUsage() log.Println("NEZHA>> Graceful::END") srv.Shutdown(c) return nil diff --git a/model/config.go b/model/config.go index 92e04ba80d..dd7c65567f 100644 --- a/model/config.go +++ b/model/config.go @@ -98,6 +98,9 @@ func (c *Config) Read(path string) error { if c.Site.Theme == "" { c.Site.Theme = "default" } + if c.GRPCPort == 0 { + c.GRPCPort = 5555 + } c.updateIgnoredIPNotificationID() return nil diff --git a/service/singleton/crontask.go b/service/singleton/crontask.go new file mode 100644 index 0000000000..b4aafe1b34 --- /dev/null +++ b/service/singleton/crontask.go @@ -0,0 +1,81 @@ +package singleton + +import ( + "bytes" + "fmt" + "github.com/naiba/nezha/model" + pb "github.com/naiba/nezha/proto" + "github.com/robfig/cron/v3" + "sync" +) + +var ( + Cron *cron.Cron + Crons map[uint64]*model.Cron + CronLock sync.RWMutex +) + +func InitCronTask() { + Cron = cron.New(cron.WithSeconds(), cron.WithLocation(Loc)) + Crons = make(map[uint64]*model.Cron) +} + +// LoadCronTasks 加载计划任务 +func LoadCronTasks() { + InitCronTask() + var crons []model.Cron + DB.Find(&crons) + var err error + errMsg := new(bytes.Buffer) + for i := 0; i < len(crons); i++ { + cr := crons[i] + + // 注册计划任务 + cr.CronJobID, err = Cron.AddFunc(cr.Scheduler, CronTrigger(cr)) + if err == nil { + Crons[cr.ID] = &cr + } else { + if errMsg.Len() == 0 { + errMsg.WriteString("调度失败的计划任务:[") + } + errMsg.WriteString(fmt.Sprintf("%d,", cr.ID)) + } + } + if errMsg.Len() > 0 { + msg := errMsg.String() + SendNotification(msg[:len(msg)-1]+"] 这些任务将无法正常执行,请进入后点重新修改保存。", false) + } + Cron.Start() +} + +func ManualTrigger(c model.Cron) { + CronTrigger(c)() +} + +func CronTrigger(cr model.Cron) func() { + crIgnoreMap := make(map[uint64]bool) + for j := 0; j < len(cr.Servers); j++ { + crIgnoreMap[cr.Servers[j]] = true + } + return func() { + ServerLock.RLock() + defer ServerLock.RUnlock() + for _, s := range ServerList { + if cr.Cover == model.CronCoverAll && crIgnoreMap[s.ID] { + continue + } + if cr.Cover == model.CronCoverIgnoreAll && !crIgnoreMap[s.ID] { + continue + } + if s.TaskStream != nil { + s.TaskStream.Send(&pb.Task{ + Id: cr.ID, + Data: cr.Command, + Type: model.TaskTypeCommand, + }) + } else { + SendNotification(fmt.Sprintf("[任务失败] %s,服务器 %s 离线,无法执行。", cr.Name, s.Name), false) + } + } + } +} diff --git a/service/singleton/notification.go b/service/singleton/notification.go index a7783d4d61..476403f417 100644 --- a/service/singleton/notification.go +++ b/service/singleton/notification.go @@ -16,7 +16,7 @@ const firstNotificationDelay = time.Minute * 15 var notifications []model.Notification var notificationsLock sync.RWMutex -// LoadNotifications 加载通知方式到 singleton.notifications 变量 +// LoadNotifications 从 DB 加载通知方式到 singleton.notifications 变量 func LoadNotifications() { notificationsLock.Lock() if err := DB.Find(¬ifications).Error; err != nil { diff --git a/service/singleton/server.go b/service/singleton/server.go new file mode 100644 index 0000000000..10c228ecfe --- /dev/null +++ b/service/singleton/server.go @@ -0,0 +1,58 @@ +package singleton + +import ( + "github.com/naiba/nezha/model" + "sort" + "sync" +) + +var ( + ServerList map[uint64]*model.Server // [ServerID] -> model.Server + SecretToID map[string]uint64 // [ServerSecret] -> ServerID + ServerLock sync.RWMutex + + SortedServerList []*model.Server // 用于存储服务器列表的 slice,按照服务器 ID 排序 + SortedServerLock sync.RWMutex +) + +// InitServer 初始化 ServerID <-> Secret 的映射 +func InitServer() { + ServerList = make(map[uint64]*model.Server) + SecretToID = make(map[string]uint64) +} + +//LoadServers 加载服务器列表并根据ID排序 +func LoadServers() { + InitServer() + var servers []model.Server + DB.Find(&servers) + for _, s := range servers { + innerS := s + innerS.Host = &model.Host{} + innerS.State = &model.HostState{} + ServerList[innerS.ID] = &innerS + SecretToID[innerS.Secret] = innerS.ID + } + ReSortServer() +} + +// ReSortServer 根据服务器ID 对服务器列表进行排序(ID越大越靠前) +func ReSortServer() { + ServerLock.RLock() + defer ServerLock.RUnlock() + SortedServerLock.Lock() + defer SortedServerLock.Unlock() + + SortedServerList = []*model.Server{} + for _, s := range ServerList { + SortedServerList = append(SortedServerList, s) + } + + // 按照服务器 ID 排序的具体实现(ID越大越靠前) + sort.SliceStable(SortedServerList, func(i, j int) bool { + if SortedServerList[i].DisplayIndex == SortedServerList[j].DisplayIndex { + return SortedServerList[i].ID < SortedServerList[j].ID + } + return SortedServerList[i].DisplayIndex > SortedServerList[j].DisplayIndex + }) +} diff --git a/service/singleton/singleton.go b/service/singleton/singleton.go index 531ea5a544..2de0947cda 100644 --- a/service/singleton/singleton.go +++ b/service/singleton/singleton.go @@ -1,18 +1,13 @@ package singleton import ( - "fmt" - "sort" - "sync" + "gorm.io/driver/sqlite" "time" "github.com/patrickmn/go-cache" - "github.com/robfig/cron/v3" "gorm.io/gorm" "github.com/naiba/nezha/model" - "github.com/naiba/nezha/pkg/utils" - pb "github.com/naiba/nezha/proto" ) var Version = "v0.12.18" // !!记得修改 README 中的 badge 版本!! @@ -22,86 +17,52 @@ var ( Cache *cache.Cache DB *gorm.DB Loc *time.Location - - ServerList map[uint64]*model.Server // [ServerID] -> model.Server - SecretToID map[string]uint64 // [ServerSecret] -> ServerID - ServerLock sync.RWMutex - - SortedServerList []*model.Server // 用于存储服务器列表的 slice,按照服务器 ID 排序 - SortedServerLock sync.RWMutex ) -// Init 初始化时区为上海时区 +// Init 初始化singleton func Init() { + // 初始化时区至上海 UTF+8 var err error Loc, err = time.LoadLocation("Asia/Shanghai") if err != nil { panic(err) } -} - -// ReSortServer 根据服务器ID 对服务器列表进行排序(ID越大越靠前) -func ReSortServer() { - ServerLock.RLock() - defer ServerLock.RUnlock() - SortedServerLock.Lock() - defer SortedServerLock.Unlock() - - SortedServerList = []*model.Server{} - for _, s := range ServerList { - SortedServerList = append(SortedServerList, s) - } - // 按照服务器 ID 排序的具体实现(ID越大越靠前) - sort.SliceStable(SortedServerList, func(i, j int) bool { - if SortedServerList[i].DisplayIndex == SortedServerList[j].DisplayIndex { - return SortedServerList[i].ID < SortedServerList[j].ID - } - return SortedServerList[i].DisplayIndex > SortedServerList[j].DisplayIndex - }) + Conf = &model.Config{} + Cache = cache.New(5*time.Minute, 10*time.Minute) } -// =============== Cron Mixin =============== - -var CronLock sync.RWMutex -var Crons map[uint64]*model.Cron -var Cron *cron.Cron - -func ManualTrigger(c model.Cron) { - CronTrigger(c)() +// LoadSingleton 加载子服务并执行 +func LoadSingleton() { + LoadNotifications() // 加载通知服务 + LoadServers() // 加载服务器列表 + LoadCronTasks() // 加载定时任务 } -func CronTrigger(cr model.Cron) func() { - crIgnoreMap := make(map[uint64]bool) - for j := 0; j < len(cr.Servers); j++ { - crIgnoreMap[cr.Servers[j]] = true - } - return func() { - ServerLock.RLock() - defer ServerLock.RUnlock() - for _, s := range ServerList { - if cr.Cover == model.CronCoverAll && crIgnoreMap[s.ID] { - continue - } - if cr.Cover == model.CronCoverIgnoreAll && !crIgnoreMap[s.ID] { - continue - } - if s.TaskStream != nil { - s.TaskStream.Send(&pb.Task{ - Id: cr.ID, - Data: cr.Command, - Type: model.TaskTypeCommand, - }) - } else { - SendNotification(fmt.Sprintf("[任务失败] %s,服务器 %s 离线,无法执行。", cr.Name, s.Name), false) - } - } +// InitConfigFromPath 从给出的文件路径中加载配置 +func InitConfigFromPath(path string) { + err := Conf.Read(path) + if err != nil { + panic(err) } } -func IPDesensitize(ip string) string { - if Conf.EnablePlainIPInNotification { - return ip +// InitDBFromPath 从给出的文件路径中加载数据库 +func InitDBFromPath(path string) { + var err error + DB, err = gorm.Open(sqlite.Open(path), &gorm.Config{ + CreateBatchSize: 200, + }) + if err != nil { + panic(err) + } + if Conf.Debug { + DB = DB.Debug() + } + err = DB.AutoMigrate(model.Server{}, model.User{}, + model.Notification{}, model.AlertRule{}, model.Monitor{}, + model.MonitorHistory{}, model.Cron{}, model.Transfer{}) + if err != nil { + panic(err) } - return utils.IPDesensitize(ip) } diff --git a/service/singleton/toolfunc.go b/service/singleton/toolfunc.go new file mode 100644 index 0000000000..e6a334027d --- /dev/null +++ b/service/singleton/toolfunc.go @@ -0,0 +1,95 @@ +package singleton + +import ( + "github.com/naiba/nezha/model" + "github.com/naiba/nezha/pkg/utils" + "log" + "time" +) + +/* + 该文件保存了一些工具函数 + RecordTransferHourlyUsage 对流量记录进行打点 + CleanMonitorHistory 清理无效或过时的 监控记录 和 流量记录 + IPDesensitize 根据用户设置选择是否对IP进行打码处理 返回处理后的IP(关闭打码则返回原IP) +*/ + +// RecordTransferHourlyUsage 对流量记录进行打点 +func RecordTransferHourlyUsage() { + ServerLock.Lock() + defer ServerLock.Unlock() + now := time.Now() + nowTrimSeconds := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local) + var txs []model.Transfer + for id, server := range ServerList { + tx := model.Transfer{ + ServerID: id, + In: server.State.NetInTransfer - uint64(server.PrevHourlyTransferIn), + Out: server.State.NetOutTransfer - uint64(server.PrevHourlyTransferOut), + } + if tx.In == 0 && tx.Out == 0 { + continue + } + server.PrevHourlyTransferIn = int64(server.State.NetInTransfer) + server.PrevHourlyTransferOut = int64(server.State.NetOutTransfer) + tx.CreatedAt = nowTrimSeconds + txs = append(txs, tx) + } + if len(txs) == 0 { + return + } + log.Println("NEZHA>> Cron 流量统计入库", len(txs), DB.Create(txs).Error) +} + +// CleanMonitorHistory 清理无效或过时的 监控记录 和 流量记录 +func CleanMonitorHistory() { + // 清理已被删除的服务器的监控记录与流量记录 + DB.Unscoped().Delete(&model.MonitorHistory{}, "created_at < ? OR monitor_id NOT IN (SELECT `id` FROM monitors)", time.Now().AddDate(0, 0, -30)) + DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (SELECT `id` FROM servers)") + // 计算可清理流量记录的时长 + var allServerKeep time.Time + specialServerKeep := make(map[uint64]time.Time) + var specialServerIDs []uint64 + var alerts []model.AlertRule + DB.Find(&alerts) + for i := 0; i < len(alerts); i++ { + for j := 0; j < len(alerts[i].Rules); j++ { + // 是不是流量记录规则 + if !alerts[i].Rules[j].IsTransferDurationRule() { + continue + } + dataCouldRemoveBefore := alerts[i].Rules[j].GetTransferDurationStart() + // 判断规则影响的机器范围 + if alerts[i].Rules[j].Cover == model.RuleCoverAll { + // 更新全局可以清理的数据点 + if allServerKeep.IsZero() || allServerKeep.After(dataCouldRemoveBefore) { + allServerKeep = dataCouldRemoveBefore + } + } else { + // 更新特定机器可以清理数据点 + for id := range alerts[i].Rules[j].Ignore { + if specialServerKeep[id].IsZero() || specialServerKeep[id].After(dataCouldRemoveBefore) { + specialServerKeep[id] = dataCouldRemoveBefore + specialServerIDs = append(specialServerIDs, id) + } + } + } + } + } + for id, couldRemove := range specialServerKeep { + DB.Unscoped().Delete(&model.Transfer{}, "server_id = ? AND created_at < ?", id, couldRemove) + } + if allServerKeep.IsZero() { + DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (?)", specialServerIDs) + } else { + DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (?) AND created_at < ?", specialServerIDs, allServerKeep) + } +} + +// IPDesensitize 根据设置选择是否对IP进行打码处理 返回处理后的IP(关闭打码则返回原IP) +func IPDesensitize(ip string) string { + if Conf.EnablePlainIPInNotification { + return ip + } + return utils.IPDesensitize(ip) +} From e1ca2d5a1cd44f5303a0004d97628eb93fcb66bb Mon Sep 17 00:00:00 2001 From: Akkia <68485070+AkkiaS7@users.noreply.github.com> Date: Tue, 12 Apr 2022 17:43:57 +0800 Subject: [PATCH 3/5] =?UTF-8?q?optimize:=20=E4=BC=98=E5=8C=96=E8=87=AA?= =?UTF-8?q?=E5=8A=A8=E6=9B=B4=E6=96=B0=E7=9B=B8=E5=85=B3=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/agent/main.go | 42 +++++++++++++++++++----------------- cmd/agent/monitor/monitor.go | 2 ++ cmd/agent/monitor/myip.go | 1 + model/config.go | 1 + 4 files changed, 26 insertions(+), 20 deletions(-) diff --git a/cmd/agent/main.go b/cmd/agent/main.go index ef05e789d8..59d8721a40 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -38,16 +38,16 @@ import ( ) type AgentCliParam struct { - SkipConnectionCount bool - SkipProcsCount bool - DisableAutoUpdate bool - DisableForceUpdate bool - DisableCommandExecute bool - Debug bool - Server string - ClientSecret string - ReportDelay int - TLS bool + SkipConnectionCount bool // 跳过连接数检查 + SkipProcsCount bool // 跳过进程数量检查 + DisableAutoUpdate bool // 关闭自动更新 + DisableForceUpdate bool // 关闭强制更新 + DisableCommandExecute bool // 关闭命令执行 + Debug bool // debug模式 + Server string // 服务器地址 + ClientSecret string // 客户端密钥 + ReportDelay int // 报告间隔 + TLS bool // 是否使用TLS加密传输至服务端 } var ( @@ -60,7 +60,6 @@ var ( var ( agentCliParam AgentCliParam agentConfig model.AgentConfig - updateCh = make(chan struct{}) // Agent 自动更新间隔 httpClient = &http.Client{ CheckRedirect: func(req *http.Request, via []*http.Request) error { return http.ErrUseLastResponse @@ -86,6 +85,7 @@ func init() { } func main() { + // windows环境处理 if runtime.GOOS == "windows" { hostArch, err := host.KernelArch() if err != nil { @@ -108,6 +108,7 @@ func main() { // 来自于 GoReleaser 的版本号 monitor.Version = version + // 初始化运行参数 var isEditAgentConfig bool flag.BoolVarP(&agentCliParam.Debug, "debug", "d", false, "开启调试信息") flag.BoolVarP(&isEditAgentConfig, "edit-agent-config", "", false, "修改要监控的网卡/分区白名单") @@ -145,6 +146,7 @@ func run() { ClientSecret: agentCliParam.ClientSecret, } + // 下载远程命令执行需要的终端 if !agentCliParam.DisableCommandExecute { go pty.DownloadDependency() } @@ -153,19 +155,14 @@ func run() { // 更新IP信息 go monitor.UpdateIP() + // 定时检查更新 if _, err := semver.Parse(version); err == nil && !agentCliParam.DisableAutoUpdate { + doSelfUpdate(true) go func() { - for range updateCh { - go func() { - defer func() { - time.Sleep(time.Minute * 20) - updateCh <- struct{}{} - }() - doSelfUpdate(true) - }() + for range time.Tick(20 * time.Minute) { + doSelfUpdate(true) } }() - updateCh <- struct{}{} } var err error @@ -267,6 +264,7 @@ func doTask(task *pb.Task) { client.ReportTask(context.Background(), &result) } +// reportState 向server上报状态信息 func reportState() { var lastReportHostInfo time.Time var err error @@ -282,6 +280,7 @@ func reportState() { println("reportState error", err) time.Sleep(delayWhenError) } + // 每10分钟重新获取一次硬件信息 if lastReportHostInfo.Before(time.Now().Add(-10 * time.Minute)) { lastReportHostInfo = time.Now() client.ReportSystemInfo(context.Background(), monitor.GetHost(&agentConfig).PB()) @@ -291,6 +290,7 @@ func reportState() { } } +// doSelfUpdate 执行更新检查 如果更新成功则会结束进程 func doSelfUpdate(useLocalVersion bool) { v := semver.MustParse("0.1.0") if useLocalVersion { @@ -303,6 +303,7 @@ func doSelfUpdate(useLocalVersion bool) { return } if !latest.Version.Equals(v) { + println("已经更新至:", latest.Version, " 正在结束进程") os.Exit(1) } } @@ -500,6 +501,7 @@ func handleTerminalTask(task *pb.Task) { } } +// 修改Agent要监控的网卡与硬盘分区 func editAgentConfig() { nc, err := psnet.IOCounters(true) if err != nil { diff --git a/cmd/agent/monitor/monitor.go b/cmd/agent/monitor/monitor.go index 6b20f3991c..0977e374e5 100644 --- a/cmd/agent/monitor/monitor.go +++ b/cmd/agent/monitor/monitor.go @@ -37,6 +37,7 @@ var ( cachedBootTime time.Time ) +// GetHost 获取主机硬件信息 func GetHost(agentConfig *model.AgentConfig) *model.Host { hi, _ := host.Info() var cpuType string @@ -155,6 +156,7 @@ func GetState(agentConfig *model.AgentConfig, skipConnectionCount bool, skipProc } } +// TrackNetworkSpeed NIC监控,统计流量与速度 func TrackNetworkSpeed(agentConfig *model.AgentConfig) { var innerNetInTransfer, innerNetOutTransfer uint64 nc, err := net.IOCounters(true) diff --git a/cmd/agent/monitor/myip.go b/cmd/agent/monitor/myip.go index 4442da80c0..0e8d704252 100644 --- a/cmd/agent/monitor/myip.go +++ b/cmd/agent/monitor/myip.go @@ -51,6 +51,7 @@ var ( httpClientV6 = utils.NewSingleStackHTTPClient(time.Second*20, time.Second*5, time.Second*10, true) ) +// UpdateIP 每30分钟汇报一次IP地址信息 func UpdateIP() { for { ipv4 := fetchGeoIP(geoIPApiList, false) diff --git a/model/config.go b/model/config.go index dd7c65567f..947aa769e3 100644 --- a/model/config.go +++ b/model/config.go @@ -26,6 +26,7 @@ type AgentConfig struct { v *viper.Viper } +// Read 从给点的文件目录加载配置文件 func (c *AgentConfig) Read(path string) error { c.v = viper.New() c.v.SetConfigFile(path) From 50a18daf05b2ea5a8822f4f675b7fcf34db4d1e1 Mon Sep 17 00:00:00 2001 From: Akkia Date: Tue, 12 Apr 2022 19:02:22 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E6=9B=B4=E6=96=B0=20UpdateIP=20=E6=B3=A8?= =?UTF-8?q?=E9=87=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/agent/monitor/myip.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/agent/monitor/myip.go b/cmd/agent/monitor/myip.go index 0e8d704252..692b2181d7 100644 --- a/cmd/agent/monitor/myip.go +++ b/cmd/agent/monitor/myip.go @@ -51,7 +51,7 @@ var ( httpClientV6 = utils.NewSingleStackHTTPClient(time.Second*20, time.Second*5, time.Second*10, true) ) -// UpdateIP 每30分钟汇报一次IP地址信息 +// UpdateIP 每30分钟更新一次IP地址与国家码的缓存 func UpdateIP() { for { ipv4 := fetchGeoIP(geoIPApiList, false) From 6b75682bdf5fb07f435fa1a07de64f6500f39cfb Mon Sep 17 00:00:00 2001 From: naiba Date: Tue, 12 Apr 2022 20:11:19 +0800 Subject: [PATCH 5/5] typo --- model/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/model/config.go b/model/config.go index 947aa769e3..e7cf6b71c3 100644 --- a/model/config.go +++ b/model/config.go @@ -26,7 +26,7 @@ type AgentConfig struct { v *viper.Viper } -// Read 从给点的文件目录加载配置文件 +// Read 从给定的文件目录加载配置文件 func (c *AgentConfig) Read(path string) error { c.v = viper.New() c.v.SetConfigFile(path)