Skip to content

Commit

Permalink
重构协议通信模块,将http调整为websocket协议,效果:1000个任务并行执行,0延迟
Browse files Browse the repository at this point in the history
  • Loading branch information
steden committed Sep 10, 2024
1 parent 986ea09 commit 88a2592
Show file tree
Hide file tree
Showing 17 changed files with 397 additions and 658 deletions.
82 changes: 82 additions & 0 deletions addJob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package fSchedule

import (
"fmt"
"github.com/farseer-go/collections"
"github.com/farseer-go/fSchedule/executeStatus"
"github.com/farseer-go/fs"
"github.com/farseer-go/fs/configure"
"github.com/farseer-go/fs/parse"
"github.com/robfig/cron/v3"
"regexp"
"time"
)

// JobFunc 客户端要执行的JOB
type JobFunc func(jobContext *JobContext) bool

var StandardParser = cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)

type Option struct {
StartAt int64 // 任务开始时间(时间戳秒)
Data collections.Dictionary[string, string] // 第一次注册时使用
}
type options func(opt *Option)

// AddJob 客户端支持的任务
func AddJob(isEnable bool, name, caption string, ver int, cronString string, job JobFunc, ops ...options) {
matched, err := regexp.MatchString("[a-zA-Z0-9\\-_]+", name)
if err != nil {
panic(fmt.Sprintf("任务组:%s %s,name格式错误:%s", name, caption, err.Error()))
}
if !matched {
panic(fmt.Sprintf("任务组:%s %s,name格式错误,只允许【字母、数字、_、-】", name, caption))
}
_, err = StandardParser.Parse(cronString)
if err != nil {
panic(fmt.Sprintf("任务组:%s %s,Cron格式[%s]错误:%s", name, caption, cronString, err.Error()))
}

// 说明没有启用调度中心(没有依赖模块)
if len(defaultServer.Address) < 1 {
return
}
// 设置额度参数
opt := &Option{Data: collections.NewDictionary[string, string]()}
for _, op := range ops {
op(opt)
}

// 如果是调试状态,则模拟调度
if configure.GetBool("FSchedule.Debug.Enable") {
jobContext := &JobContext{
Id: 888,
Name: name,
Data: collections.NewDictionary[string, string](),
Caption: caption,
nextTimespan: 0,
progress: 0,
status: executeStatus.Working,
StartAt: time.Now(),
}
for k, v := range configure.GetSubNodes("FSchedule.Debug." + name) {
jobContext.Data.Add(k, parse.ToString(v))
}
job(jobContext)
return
}

fs.AddInitCallback("向调度中心注册任务组:"+name, func() {
// 向调度中心注册
go connectFScheduleServer(ClientVO{
Name: name,
IsEnable: isEnable,
Caption: caption,
Ver: ver,
Cron: cronString,
jobFunc: job,
StartAt: opt.StartAt,
Data: opt.Data,
})
})
}
82 changes: 0 additions & 82 deletions clientApi.go

This file was deleted.

78 changes: 78 additions & 0 deletions clientJob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package fSchedule

import (
"fmt"
"github.com/farseer-go/collections"
"github.com/farseer-go/fs/core/eumLogLevel"
"github.com/farseer-go/fs/flog"
"github.com/farseer-go/utils/ws"
"time"
)

type ClientVO struct {
Name string // 任务名称
Ver int // 任务版本
Caption string // 任务标题
Cron string // 任务执行表达式
StartAt int64 // 任务开始时间(时间戳秒)
IsEnable bool // 任务是否启用
Data collections.Dictionary[string, string] // 第一次注册时使用

jobFunc JobFunc // 任务执行函数
client *ws.Client // ws客户端
}

// SetProgress 报告任务结果
func (receiver *ClientVO) report(jobContext *JobContext) {
err := receiver.client.Send(sendDTO{
Type: 0,
TaskReport: taskReportDTO{
Id: jobContext.Id,
Name: jobContext.Name,
Data: jobContext.Data,
NextTimespan: jobContext.nextTimespan,
Progress: jobContext.progress,
Status: jobContext.status,
FailRemark: jobContext.failRemark,
resourceVO: getResource(),
},
})
if err != nil {
flog.Warningf("向调度中心报告任务结果时失败:%s", err.Error())
}
}

// log 记录日志
func (receiver *ClientVO) log(jobContext *JobContext, logLevel eumLogLevel.Enum, contents ...any) {
err := receiver.client.Send(sendDTO{
Type: 1,
Log: logDTO{
TaskId: jobContext.Id,
Name: jobContext.Name,
Ver: jobContext.Ver,
Caption: jobContext.Caption,
Data: jobContext.Data,
LogLevel: logLevel,
CreateAt: time.Now().UnixMilli(),
Content: fmt.Sprint(contents...),
},
})

if err != nil {
flog.Warningf("向调度中心报告任务结果时失败:%s", err.Error())
}
}

// 获取当前客户端的环境信息
func getResource() resourceVO {
// 计算长度
taskListLength := 0
taskList.Range(func(k, v interface{}) bool {
taskListLength++
return true
})
return resourceVO{
QueueCount: taskListLength - workCount,
WorkCount: workCount,
}
}
Loading

0 comments on commit 88a2592

Please sign in to comment.