-
Notifications
You must be signed in to change notification settings - Fork 60
/
Copy pathetcdv3_service.go
126 lines (105 loc) · 3.01 KB
/
etcdv3_service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package main
import (
"encoding/base64"
"log"
"net/url"
"path"
"strings"
"github.com/docker/libkv"
kvstore "github.com/docker/libkv/store"
etcd "github.com/smallnest/libkv-etcdv3-store"
)
// EtcdV3Registry is a service registry backed by a libkv store that is
// created with the etcd v3 backend in initRegistry.
type EtcdV3Registry struct {
// kv is the store handle assigned by initRegistry; it is left unset when
// store creation fails.
kv kvstore.Store
}
// initRegistry registers the etcd v3 backend with libkv and opens a store
// against serverConfig.RegistryURL. If the store cannot be created the error
// is logged and r.kv is left unset.
func (r *EtcdV3Registry) initRegistry() {
	etcd.Register()

	endpoints := []string{serverConfig.RegistryURL}
	store, err := libkv.NewStore(etcd.ETCDV3, endpoints, nil)
	if err != nil {
		log.Printf("cannot create etcd registry: %v", err)
		return
	}

	r.kv = store
}
// fetchServices lists every node registered under serverConfig.ServiceBaseURL
// and converts each one into a Service.
//
// The store is walked two levels deep: the entries directly under the base
// URL are listed first, and then the entries under each of those keys. For
// every second-level entry the part of the key after the last "/" becomes the
// service address, and the part before it (with the base URL prefix trimmed)
// becomes the service name. The entry value is parsed as a URL query string
// to obtain the "state" and "group" fields; a missing state defaults to
// "active".
//
// Failures are logged and skipped rather than returned: a failed top-level
// listing yields a nil slice, and a failed sub-listing, a key without a "/"
// separator, or an unparsable value only drops the affected entries.
func (r *EtcdV3Registry) fetchServices() []*Service {
	var services []*Service

	serviceDirs, err := r.kv.List(serverConfig.ServiceBaseURL)
	if err != nil {
		log.Printf("failed to list services %s: %v", serverConfig.ServiceBaseURL, err)
		return services
	}

	for _, dir := range serviceDirs {
		nodes, err := r.kv.List(dir.Key)
		if err != nil {
			log.Printf("failed to list %s: %v", dir.Key, err)
			continue
		}

		for _, n := range nodes {
			key := string(n.Key)

			// Without a separator there is no name/address split, and slicing
			// key[:i] with i == -1 would panic, so skip such entries.
			i := strings.LastIndex(key, "/")
			if i < 0 {
				log.Printf("skipping malformed service key %q: no '/' separator", key)
				continue
			}
			serviceName := strings.TrimPrefix(key[:i], serverConfig.ServiceBaseURL)
			serviceAddr := key[i+1:]

			metadata := string(n.Value)
			v, err := url.ParseQuery(metadata)
			if err != nil {
				log.Println("etcd value parse failed. error: ", err.Error())
				continue
			}

			// Entries that do not carry an explicit state are treated as active.
			state := v.Get("state")
			if state == "" {
				state = "active"
			}
			group := v.Get("group")

			// The ID is the base64 encoding of "<name>@<address>".
			id := base64.StdEncoding.EncodeToString([]byte(serviceName + "@" + serviceAddr))
			services = append(services, &Service{
				ID:       id,
				Name:     serviceName,
				Address:  serviceAddr,
				Metadata: metadata,
				State:    state,
				Group:    group,
			})
		}
	}

	return services
}
// deactivateService marks the service node identified by name and address as
// inactive. It reads the node stored at <ServiceBaseURL>/<name>/<address>,
// parses its value as a URL query string, sets "state" to "inactive" and
// writes the re-encoded value back under the same key.
//
// It returns the error from the lookup, the parse or the write, whichever
// fails first, and nil on success.
func (r *EtcdV3Registry) deactivateService(name, address string) error {
	nodeKey := path.Join(serverConfig.ServiceBaseURL, name, address)

	pair, err := r.kv.Get(nodeKey)
	if err != nil {
		return err
	}

	meta, err := url.ParseQuery(string(pair.Value))
	if err != nil {
		log.Println("etcd value parse failed. err ", err.Error())
		return err
	}
	meta.Set("state", "inactive")

	opts := &kvstore.WriteOptions{IsDir: false}
	if err := r.kv.Put(pair.Key, []byte(meta.Encode()), opts); err != nil {
		log.Println("etcd set failed, err : ", err.Error())
		return err
	}
	return nil
}
// activateService marks the service node identified by name and address as
// active. It reads the node stored at <ServiceBaseURL>/<name>/<address>,
// parses its value as a URL query string, sets "state" to "active" and writes
// the re-encoded value back under the same key.
//
// It returns the error from the lookup, the parse or the write, whichever
// fails first, and nil on success.
func (r *EtcdV3Registry) activateService(name, address string) error {
	key := path.Join(serverConfig.ServiceBaseURL, name, address)

	kv, err := r.kv.Get(key)
	if err != nil {
		// The lookup error must be checked before kv is used: on failure the
		// returned pair cannot be relied on and dereferencing it would panic.
		return err
	}

	v, err := url.ParseQuery(string(kv.Value))
	if err != nil {
		log.Println("etcd value parse failed. err ", err.Error())
		return err
	}
	v.Set("state", "active")

	err = r.kv.Put(kv.Key, []byte(v.Encode()), &kvstore.WriteOptions{IsDir: false})
	if err != nil {
		log.Println("etcdv3 put failed. err: ", err.Error())
	}
	return err
}
// updateMetadata overwrites the value stored at
// <ServiceBaseURL>/<name>/<address> with the given metadata string and
// returns the error reported by the store's Put, if any.
func (r *EtcdV3Registry) updateMetadata(name, address string, metadata string) error {
	nodeKey := path.Join(serverConfig.ServiceBaseURL, name, address)
	opts := &kvstore.WriteOptions{IsDir: false}
	return r.kv.Put(nodeKey, []byte(metadata), opts)
}