This repository has been archived by the owner on May 21, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 15
/
slave_agent.go
83 lines (71 loc) · 1.81 KB
/
slave_agent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package ultron
import (
"fmt"
"sync/atomic"
"time"
"github.com/wosai/ultron/v2/pkg/genproto"
"go.uber.org/zap"
)
// SlaveAgent is the master-side representation of a slave.
type SlaveAgent interface {
	// ID returns the identifier of the slave.
	ID() string
	// Extras returns the extra key/value attributes attached to the slave.
	Extras() map[string]string
}

// slaveAgent is the concrete SlaveAgent kept by the master for one slave.
type slaveAgent struct {
	// slaveID is the identifier taken from the subscribe request.
	slaveID string
	// extras holds the attributes taken from the subscribe request.
	// TODO: to be implemented later.
	extras map[string]string
	// input carries the events queued by send and close.
	input chan *genproto.SubscribeResponse
	// status is presumably the latest status reported by the slave; it is not
	// used in this part of the file — confirm against its callers.
	status *genproto.SendStatusRequest
	// closed is 0 while the agent is open and 1 once close has been called;
	// it is only accessed through sync/atomic.
	closed uint32
}
// Compile-time check that *slaveAgent satisfies SlaveAgent.
var _ SlaveAgent = (*slaveAgent)(nil)
// newSlaveAgent builds the agent for a slave from its subscribe request.
//
// The slave ID and extras are taken from req as-is (the extras map is shared
// with the request, not copied), and the input channel is created with a
// buffer of one event.
func newSlaveAgent(req *genproto.SubscribeRequest) *slaveAgent {
	agent := new(slaveAgent)
	agent.slaveID = req.SlaveId
	agent.extras = req.Extras
	agent.input = make(chan *genproto.SubscribeResponse, 1)
	return agent
}
// ID returns the slave identifier this agent was created with.
func (sa *slaveAgent) ID() string {
	return sa.slaveID
}
// Extras returns a copy of the agent's extra attributes.
//
// The result is always a non-nil map, even when the agent has no extras, and
// changes made to it by the caller do not affect the agent.
func (sa *slaveAgent) Extras() map[string]string {
	// The final size is known up front, so allocate the copy once.
	clone := make(map[string]string, len(sa.extras))
	for key, value := range sa.extras {
		clone[key] = value
	}
	return clone
}
// close marks the agent as closed and enqueues a final DISCONNECT event.
//
// Only the first call does any work: it flips the closed flag, then waits up
// to three seconds to put a DISCONNECT event on the input channel and, once
// that succeeds, closes the channel. Every later call returns nil right away.
//
// An error is returned when the DISCONNECT event could not be enqueued within
// the timeout. In that case the input channel is left open while the agent
// stays flagged as closed.
func (sa *slaveAgent) close() error {
	if !atomic.CompareAndSwapUint32(&sa.closed, 0, 1) {
		return nil // somebody else already closed the agent
	}

	// An explicit timer (instead of time.After) lets us release it as soon as
	// the send goes through rather than keeping it alive for the full timeout.
	timeout := time.NewTimer(3 * time.Second)
	defer timeout.Stop()

	select {
	case sa.input <- &genproto.SubscribeResponse{Type: genproto.EventType_DISCONNECT}:
		close(sa.input)
		return nil
	case <-timeout.C:
		return fmt.Errorf("the input channel is blocked: %s", sa.ID())
	}
}
// send queues event on the agent's input channel.
//
// A nil error only means the event was handed to the channel; it does not
// mean the event was delivered successfully. The call blocks while the
// channel cannot accept the event. An error is returned when the agent has
// been closed.
func (sa *slaveAgent) send(event *genproto.SubscribeResponse) (err error) {
	if atomic.LoadUint32(&sa.closed) != 0 {
		return fmt.Errorf("slave agent is closed, cannot send event out: %d", event.Type)
	}

	// close() can flip the flag and close sa.input after the check above while
	// this goroutine is still parked on the send below. Sending on a closed
	// channel panics, so report that case as the regular "closed" error
	// instead of taking the whole process down.
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("slave agent is closed, cannot send event out: %d", event.Type)
		}
	}()

	sa.input <- event
	return nil
}
// keepAlives queues a STATUS_REPORT event for the slave every three seconds.
//
// It blocks the calling goroutine and only returns once send reports an
// error, which is logged together with the slave ID.
func (sa *slaveAgent) keepAlives() {
	heartbeat := time.NewTicker(3 * time.Second)
	defer heartbeat.Stop()

	for {
		<-heartbeat.C

		err := sa.send(&genproto.SubscribeResponse{Type: genproto.EventType_STATUS_REPORT})
		if err == nil {
			continue
		}

		Logger.Error("the slave agent is closed, stop the ticker", zap.String("slave_id", sa.ID()), zap.Error(err))
		return
	}
}