-
Notifications
You must be signed in to change notification settings - Fork 2
/
etcd.registry.go
119 lines (100 loc) · 2.82 KB
/
etcd.registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package main
import (
"context"
"go.etcd.io/etcd/client/v3"
"pome/define"
"sync"
"time"
)
/*
大纲
创建一个lease租约,会得到一个 lease id,由于它来自 etcd,那么刚好可以作为 node id 进行全局唯一标识
服务注册时,put一个带后缀的 Key
我们先来看不带后缀的 key, 比如 /abc/email_service/,它指明了目的服务名
但只用 /abc/email_service/ 是不行的, 因为我们部署很多同类型的 node,这会导致 /abc/email_service/ 的值被覆盖
所以我们要加入后缀来区分各个节点,这个时候就可以用到我们的 node id,也就是 lease id
假如 lease id 是 1847459712, 那么完整的 key 就是 /abc/email_service/1847459712
服务发现
服务发现时, 我们使用监听前缀的 watch 方法对 /abc/email_service/* 进行监听
服务注销
通过 keepAlive 续约,如果不续约,etcd 会删除这个租约上的所有 key,实现服务注销
keepAlive 断开的情况有两个
一个是 service 节点所在的 pod 宕机,一个是 etcd 集群下线。
*/
var node_id int64
func ExecUnitEtcd(nodeActiveContext context.Context, nodeActiveContextCancel func(), exitFinishChannel chan bool) {
u := &etcdClient{}
P.discoverer = (*discoverer)(u)
(*etcdClient)(u).init(nodeActiveContext)
(*registry)(u).init(nodeActiveContext)
(*discoverer)(u).init(nodeActiveContext)
go (*discoverer)(u).keepSync(nodeActiveContext)
keepalive, err := u.lease.KeepAlive(nodeActiveContext, u.id)
if err != nil {
panic(err)
}
L:
for {
select {
case <-nodeActiveContext.Done():
break L
case ret := <-keepalive:
if ret == nil {
break L
}
}
}
(*registry)(u).unregister(nodeActiveContext)
nodeActiveContextCancel()
exitFinishChannel <- true
}
type serviceNodes struct {
lastMaxDelay int64
nodes map[int64]*node
lock sync.RWMutex
}
func newServiceNodes() *serviceNodes {
return &serviceNodes{
nodes: map[int64]*node{},
}
}
type etcdClient struct {
client *clientv3.Client
id clientv3.LeaseID
lease clientv3.Lease
serviceName serviceName
synced map[string]*serviceNodes
dLock sync.Mutex
}
func (r *etcdClient) init(ctx context.Context) {
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{define.EtcdCluster},
DialTimeout: time.Second,
})
if err != nil {
panic(err)
}
l := clientv3.NewLease(client)
resp, err := l.Grant(ctx, CONFIG.LeaseTimeOut)
if err != nil {
panic(err)
}
r.client = client
r.id = resp.ID
node_id = int64(r.id)
r.lease = l
}
type registry etcdClient
func (r *registry) init(ctx context.Context) error {
r.serviceName = name()
_, err := r.client.Put(
ctx,
r.serviceName.concat(r.id),
localhost(),
clientv3.WithLease(r.id),
)
return err
}
func (r *registry) unregister(ctx context.Context) {
r.lease.Revoke(ctx, r.id)
}