Skip to content

Commit

Permalink
实现ws的链路追踪
Browse files Browse the repository at this point in the history
  • Loading branch information
steden committed Sep 16, 2024
1 parent affb1ba commit 287b6d7
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 30 deletions.
33 changes: 17 additions & 16 deletions connectServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,62 +11,63 @@ import (
"time"
)

// 每个任务组对应的ClientVO
var mapClient = sync.Map{}

func connectFScheduleServer(job ClientVO) {
func connectFScheduleServer(clientVO ClientVO) {
for {
address := defaultServer.getAddress()
var err error
job.client, err = ws.Connect(address, 8192)
job.client.AutoExit = false
clientVO.client, err = ws.Connect(address, 8192)
clientVO.client.AutoExit = false
if err != nil {
flog.Warningf("[%s]调度中心连接失败:%s", job.Name, err.Error())
flog.Warningf("[%s]调度中心连接失败:%s", clientVO.Name, err.Error())
time.Sleep(3 * time.Second)
continue
}
mapClient.Store(job.Name, job)
mapClient.Store(clientVO.Name, clientVO)
// 连接成功后,需要先注册
err = job.client.Send(sendDTO{Type: -1, Registry: registryDTO{ClientName: core.AppName, Job: job}})
err = clientVO.client.Send(sendDTO{Type: -1, Registry: registryDTO{ClientName: core.AppName, Job: clientVO}})
if err != nil {
flog.Warningf("[%s]调度中心注册失败:%s", job.Name, err.Error())
flog.Warningf("[%s]调度中心注册失败:%s", clientVO.Name, err.Error())
time.Sleep(3 * time.Second)
continue
}

for {
// 接收调度请求
var dto receiverDTO
err = job.client.Receiver(&dto)
err = clientVO.client.Receiver(&dto)
if err != nil {
if job.client.IsClose() {
mapClient.Delete(job.Name)
flog.Warningf("[%s]调度中心服务端:%s 已断开连接,将在3秒后重连", job.Name, address)
if clientVO.client.IsClose() {
mapClient.Delete(clientVO.Name)
flog.Warningf("[%s]调度中心服务端:%s 已断开连接,将在3秒后重连", clientVO.Name, address)
break
}
flog.Warningf("[%s]接收调度中心数据时失败:%s", job.Name, err.Error())
flog.Warningf("[%s]接收调度中心数据时失败:%s", clientVO.Name, err.Error())
continue
}

switch dto.Type {
// 新任务
case 0:
go invokeJob(job, dto.Task)
go invokeJob(clientVO, dto.Task)
// 停止任务
case 1:
flog.Infof("任务组:%s,收到Kill请求,停止任务%d", job.Name, dto.Task.Id)
flog.Infof("任务组:%s,收到Kill请求,停止任务%d", clientVO.Name, dto.Task.Id)
if jContext, exists := taskList.Load(dto.Task.Id); exists {
jobContext := jContext.(*JobContext)
jobContext.Remark("FOPS主动停止任务")
jobContext.status = executeStatus.Fail
jobContext.clientJob.report(jobContext)
jobContext.cancel()
flog.Infof("任务组:%s,主动停止了任务%d", job.Name, dto.Task.Id)
flog.Infof("任务组:%s,主动停止了任务%d", clientVO.Name, dto.Task.Id)
}
}
}

// 断开后重连
<-job.client.Ctx.Done()
<-clientVO.client.Ctx.Done()
time.Sleep(3 * time.Second)
}
}
Expand Down
10 changes: 2 additions & 8 deletions invokeJob.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,13 @@ func invokeJob(clientVO ClientVO, task taskDTO) {
traceManager: container.Resolve[trace.IManager](),
}
taskList.Store(task.Id, jobContext)

// 移除任务
defer taskList.Delete(task.Id)

// 链路追踪
entryFSchedule := jobContext.traceManager.EntryFSchedule(jobContext.Name, jobContext.Id, jobContext.Data.ToMap())
defer func() {
// 任务报告完后,移除本次任务
clientVO.report(jobContext)

if entryFSchedule != nil {
entryFSchedule.End()
}
taskList.Delete(task.Id)
entryFSchedule.End(nil)
asyncLocal.Release()
}()

Expand Down
4 changes: 2 additions & 2 deletions test/farseer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ FSchedule:
Server:
Token: ""
Address:
- "ws://127.0.0.1:8886"
#- "https://fschedule.fsgit.cc"
# - "ws://127.0.0.1:8886"
- "wss://fschedule.fsgit.cc"
Log:
LogLevel: "info"
Component:
Expand Down
6 changes: 2 additions & 4 deletions test/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ func (module startupModule) DependsModule() []modules.FarseerModule {
}

func (module startupModule) PostInitialize() {
for i := 1; i <= 1; i++ {
for i := 1; i <= 5; i++ {
fSchedule.AddJob(true, "Hello"+strconv.Itoa(i), "测试HelloJob"+strconv.Itoa(i), 1, "0/1 * * * * ?", func(jobContext *fSchedule.JobContext) bool {
//jobContext.Debug("测试日志2")
//jobContext.Tracef("测试日志1")
//jobContext.Info("测试日志3")
if jobContext.Name == "Hello1" {
flog.Infof("任务组:%s %d 开始执行", jobContext.Name, jobContext.Id)
}
flog.Infof("任务组:%s %d 开始执行", jobContext.Name, jobContext.Id)
return true
})
}
Expand Down

0 comments on commit 287b6d7

Please sign in to comment.