Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize:移除两处冗余的代码 + refactor:优化代码组织结构 #158

Merged
merged 5 commits into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 22 additions & 20 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ import (
)

type AgentCliParam struct {
SkipConnectionCount bool
SkipProcsCount bool
DisableAutoUpdate bool
DisableForceUpdate bool
DisableCommandExecute bool
Debug bool
Server string
ClientSecret string
ReportDelay int
TLS bool
SkipConnectionCount bool // 跳过连接数检查
SkipProcsCount bool // 跳过进程数量检查
DisableAutoUpdate bool // 关闭自动更新
DisableForceUpdate bool // 关闭强制更新
DisableCommandExecute bool // 关闭命令执行
Debug bool // debug模式
Server string // 服务器地址
ClientSecret string // 客户端密钥
ReportDelay int // 报告间隔
TLS bool // 是否使用TLS加密传输至服务端
}

var (
Expand All @@ -60,7 +60,6 @@ var (
var (
agentCliParam AgentCliParam
agentConfig model.AgentConfig
updateCh = make(chan struct{}) // Agent 自动更新间隔
httpClient = &http.Client{
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
Expand All @@ -86,6 +85,7 @@ func init() {
}

func main() {
// windows环境处理
if runtime.GOOS == "windows" {
hostArch, err := host.KernelArch()
if err != nil {
Expand All @@ -108,6 +108,7 @@ func main() {
// 来自于 GoReleaser 的版本号
monitor.Version = version

// 初始化运行参数
var isEditAgentConfig bool
flag.BoolVarP(&agentCliParam.Debug, "debug", "d", false, "开启调试信息")
flag.BoolVarP(&isEditAgentConfig, "edit-agent-config", "", false, "修改要监控的网卡/分区白名单")
Expand Down Expand Up @@ -145,6 +146,7 @@ func run() {
ClientSecret: agentCliParam.ClientSecret,
}

// 下载远程命令执行需要的终端
if !agentCliParam.DisableCommandExecute {
go pty.DownloadDependency()
}
Expand All @@ -153,19 +155,14 @@ func run() {
// 更新IP信息
go monitor.UpdateIP()

// 定时检查更新
if _, err := semver.Parse(version); err == nil && !agentCliParam.DisableAutoUpdate {
doSelfUpdate(true)
go func() {
for range updateCh {
go func() {
defer func() {
time.Sleep(time.Minute * 20)
updateCh <- struct{}{}
}()
doSelfUpdate(true)
}()
for range time.Tick(20 * time.Minute) {
doSelfUpdate(true)
}
}()
updateCh <- struct{}{}
}

var err error
Expand Down Expand Up @@ -267,6 +264,7 @@ func doTask(task *pb.Task) {
client.ReportTask(context.Background(), &result)
}

// reportState 向server上报状态信息
func reportState() {
var lastReportHostInfo time.Time
var err error
Expand All @@ -282,6 +280,7 @@ func reportState() {
println("reportState error", err)
time.Sleep(delayWhenError)
}
// 每10分钟重新获取一次硬件信息
if lastReportHostInfo.Before(time.Now().Add(-10 * time.Minute)) {
lastReportHostInfo = time.Now()
client.ReportSystemInfo(context.Background(), monitor.GetHost(&agentConfig).PB())
Expand All @@ -291,6 +290,7 @@ func reportState() {
}
}

// doSelfUpdate 执行更新检查 如果更新成功则会结束进程
func doSelfUpdate(useLocalVersion bool) {
v := semver.MustParse("0.1.0")
if useLocalVersion {
Expand All @@ -303,6 +303,7 @@ func doSelfUpdate(useLocalVersion bool) {
return
}
if !latest.Version.Equals(v) {
println("已经更新至:", latest.Version, " 正在结束进程")
os.Exit(1)
}
}
Expand Down Expand Up @@ -500,6 +501,7 @@ func handleTerminalTask(task *pb.Task) {
}
}

// 修改Agent要监控的网卡与硬盘分区
func editAgentConfig() {
nc, err := psnet.IOCounters(true)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions cmd/agent/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var (
cachedBootTime time.Time
)

// GetHost 获取主机硬件信息
func GetHost(agentConfig *model.AgentConfig) *model.Host {
hi, _ := host.Info()
var cpuType string
Expand Down Expand Up @@ -155,6 +156,7 @@ func GetState(agentConfig *model.AgentConfig, skipConnectionCount bool, skipProc
}
}

// TrackNetworkSpeed NIC监控,统计流量与速度
func TrackNetworkSpeed(agentConfig *model.AgentConfig) {
var innerNetInTransfer, innerNetOutTransfer uint64
nc, err := net.IOCounters(true)
Expand Down
1 change: 1 addition & 0 deletions cmd/agent/monitor/myip.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var (
httpClientV6 = utils.NewSingleStackHTTPClient(time.Second*20, time.Second*5, time.Second*10, true)
)

// UpdateIP 每30分钟更新一次IP地址与国家码的缓存
func UpdateIP() {
for {
ipv4 := fetchGeoIP(geoIPApiList, false)
Expand Down
181 changes: 10 additions & 171 deletions cmd/dashboard/main.go
Original file line number Diff line number Diff line change
@@ -1,201 +1,40 @@
package main

import (
"bytes"
"context"
"fmt"
"log"
"time"

"github.com/ory/graceful"
"github.com/patrickmn/go-cache"
"github.com/robfig/cron/v3"
"gorm.io/driver/sqlite"
"gorm.io/gorm"

"github.com/naiba/nezha/cmd/dashboard/controller"
"github.com/naiba/nezha/cmd/dashboard/rpc"
"github.com/naiba/nezha/model"
"github.com/naiba/nezha/service/singleton"
"github.com/ory/graceful"
"log"
)

func init() {
shanghai, err := time.LoadLocation("Asia/Shanghai")
if err != nil {
panic(err)
}

// 初始化 dao 包
singleton.Init()
singleton.Conf = &model.Config{}
singleton.Cron = cron.New(cron.WithSeconds(), cron.WithLocation(shanghai))
singleton.Crons = make(map[uint64]*model.Cron)
singleton.ServerList = make(map[uint64]*model.Server)
singleton.SecretToID = make(map[string]uint64)

err = singleton.Conf.Read("data/config.yaml")
if err != nil {
panic(err)
}
singleton.DB, err = gorm.Open(sqlite.Open("data/sqlite.db"), &gorm.Config{
CreateBatchSize: 200,
})
if err != nil {
panic(err)
}
if singleton.Conf.Debug {
singleton.DB = singleton.DB.Debug()
}
if singleton.Conf.GRPCPort == 0 {
singleton.Conf.GRPCPort = 5555
}
singleton.Cache = cache.New(5*time.Minute, 10*time.Minute)

singleton.InitConfigFromPath("data/config.yaml")
singleton.InitDBFromPath("data/sqlite.db")
initSystem()
}

func initSystem() {
singleton.DB.AutoMigrate(model.Server{}, model.User{},
model.Notification{}, model.AlertRule{}, model.Monitor{},
model.MonitorHistory{}, model.Cron{}, model.Transfer{})

singleton.LoadNotifications()
loadServers() //加载服务器列表
loadCrons() //加载计划任务
// 启动 singleton 包下的所有服务
singleton.LoadSingleton()

// 每天的3:30 对 监控记录 和 流量记录 进行清理
_, err := singleton.Cron.AddFunc("0 30 3 * * *", cleanMonitorHistory)
if err != nil {
if _, err := singleton.Cron.AddFunc("0 30 3 * * *", singleton.CleanMonitorHistory); err != nil {
panic(err)
}

// 每小时对流量记录进行打点
_, err = singleton.Cron.AddFunc("0 0 * * * *", recordTransferHourlyUsage)
if err != nil {
if _, err := singleton.Cron.AddFunc("0 0 * * * *", singleton.RecordTransferHourlyUsage); err != nil {
panic(err)
}
}

// recordTransferHourlyUsage 对流量记录进行打点
func recordTransferHourlyUsage() {
singleton.ServerLock.Lock()
defer singleton.ServerLock.Unlock()
now := time.Now()
nowTrimSeconds := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
var txs []model.Transfer
for id, server := range singleton.ServerList {
tx := model.Transfer{
ServerID: id,
In: server.State.NetInTransfer - uint64(server.PrevHourlyTransferIn),
Out: server.State.NetOutTransfer - uint64(server.PrevHourlyTransferOut),
}
if tx.In == 0 && tx.Out == 0 {
continue
}
server.PrevHourlyTransferIn = int64(server.State.NetInTransfer)
server.PrevHourlyTransferOut = int64(server.State.NetOutTransfer)
tx.CreatedAt = nowTrimSeconds
txs = append(txs, tx)
}
if len(txs) == 0 {
return
}
log.Println("NEZHA>> Cron 流量统计入库", len(txs), singleton.DB.Create(txs).Error)
}

// cleanMonitorHistory 清理无效或过时的 监控记录 和 流量记录
func cleanMonitorHistory() {
// 清理已被删除的服务器的监控记录与流量记录
singleton.DB.Unscoped().Delete(&model.MonitorHistory{}, "created_at < ? OR monitor_id NOT IN (SELECT `id` FROM monitors)", time.Now().AddDate(0, 0, -30))
singleton.DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (SELECT `id` FROM servers)")
// 计算可清理流量记录的时长
var allServerKeep time.Time
specialServerKeep := make(map[uint64]time.Time)
var specialServerIDs []uint64
var alerts []model.AlertRule
singleton.DB.Find(&alerts)
for i := 0; i < len(alerts); i++ {
for j := 0; j < len(alerts[i].Rules); j++ {
// 是不是流量记录规则
if !alerts[i].Rules[j].IsTransferDurationRule() {
continue
}
dataCouldRemoveBefore := alerts[i].Rules[j].GetTransferDurationStart()
// 判断规则影响的机器范围
if alerts[i].Rules[j].Cover == model.RuleCoverAll {
// 更新全局可以清理的数据点
if allServerKeep.IsZero() || allServerKeep.After(dataCouldRemoveBefore) {
allServerKeep = dataCouldRemoveBefore
}
} else {
// 更新特定机器可以清理数据点
for id := range alerts[i].Rules[j].Ignore {
if specialServerKeep[id].IsZero() || specialServerKeep[id].After(dataCouldRemoveBefore) {
specialServerKeep[id] = dataCouldRemoveBefore
specialServerIDs = append(specialServerIDs, id)
}
}
}
}
}
for id, couldRemove := range specialServerKeep {
singleton.DB.Unscoped().Delete(&model.Transfer{}, "server_id = ? AND created_at < ?", id, couldRemove)
}
if allServerKeep.IsZero() {
singleton.DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (?)", specialServerIDs)
} else {
singleton.DB.Unscoped().Delete(&model.Transfer{}, "server_id NOT IN (?) AND created_at < ?", specialServerIDs, allServerKeep)
}
}

//loadServers 加载服务器列表并根据ID排序
func loadServers() {
var servers []model.Server
singleton.DB.Find(&servers)
for _, s := range servers {
innerS := s
innerS.Host = &model.Host{}
innerS.State = &model.HostState{}
singleton.ServerList[innerS.ID] = &innerS
singleton.SecretToID[innerS.Secret] = innerS.ID
}
singleton.ReSortServer()
}

// loadCrons 加载计划任务
func loadCrons() {
var crons []model.Cron
singleton.DB.Find(&crons)
var err error
errMsg := new(bytes.Buffer)
for i := 0; i < len(crons); i++ {
cr := crons[i]

crIgnoreMap := make(map[uint64]bool)
for j := 0; j < len(cr.Servers); j++ {
crIgnoreMap[cr.Servers[j]] = true
}

// 注册计划任务
cr.CronJobID, err = singleton.Cron.AddFunc(cr.Scheduler, singleton.CronTrigger(cr))
if err == nil {
singleton.Crons[cr.ID] = &cr
} else {
if errMsg.Len() == 0 {
errMsg.WriteString("调度失败的计划任务:[")
}
errMsg.WriteString(fmt.Sprintf("%d,", cr.ID))
}
}
if errMsg.Len() > 0 {
msg := errMsg.String()
singleton.SendNotification(msg[:len(msg)-1]+"] 这些任务将无法正常执行,请进入后点重新修改保存。", false)
}
singleton.Cron.Start()
}

func main() {
cleanMonitorHistory()
singleton.CleanMonitorHistory()
go rpc.ServeRPC(singleton.Conf.GRPCPort)
serviceSentinelDispatchBus := make(chan model.Monitor) // 用于传递服务监控任务信息的channel
go rpc.DispatchTask(serviceSentinelDispatchBus)
Expand All @@ -207,7 +46,7 @@ func main() {
return srv.ListenAndServe()
}, func(c context.Context) error {
log.Println("NEZHA>> Graceful::START")
recordTransferHourlyUsage()
singleton.RecordTransferHourlyUsage()
log.Println("NEZHA>> Graceful::END")
srv.Shutdown(c)
return nil
Expand Down
4 changes: 4 additions & 0 deletions model/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type AgentConfig struct {
v *viper.Viper
}

// Read 从给定的文件目录加载配置文件
func (c *AgentConfig) Read(path string) error {
c.v = viper.New()
c.v.SetConfigFile(path)
Expand Down Expand Up @@ -98,6 +99,9 @@ func (c *Config) Read(path string) error {
if c.Site.Theme == "" {
c.Site.Theme = "default"
}
if c.GRPCPort == 0 {
c.GRPCPort = 5555
}

c.updateIgnoredIPNotificationID()
return nil
Expand Down
Loading