Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

【等CI结束就合并】feat: Implement the discovery service by Etcd #605

Merged
merged 15 commits into from
Feb 17, 2024
193 changes: 175 additions & 18 deletions pkg/discovery/etcd3.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,27 @@ import (
"fmt"
"github.com/seata/seata-go/pkg/util/log"
etcd3 "go.etcd.io/etcd/client/v3"
"strconv"
"strings"
"sync"
)

const (
clusterNameSplitChar = "-"
AddressSplitChar = ":"
yizhibian marked this conversation as resolved.
Show resolved Hide resolved
)

type EtcdRegistryService struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

给EtcdRegistryService编写完整的单元测试

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已添加

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iSuperCoder 你好,已经加上单元测试了,还需要补充什么吗?

client *etcd3.Client
cfg etcd3.Config
vgroupMapping map[string]string
grouplist map[string]*ServiceConfig
grouplist map[string][]*ServiceInstance
rwLock sync.RWMutex

stopCh chan struct{}
}

func newEtcdRegistryService(etcd3Config *Etcd3Config) RegistryService {
func newEtcdRegistryService(config *ServiceConfig, etcd3Config *Etcd3Config) RegistryService {

if etcd3Config == nil {
log.Fatalf("etcd config is nil")
Expand All @@ -49,8 +58,8 @@ func newEtcdRegistryService(etcd3Config *Etcd3Config) RegistryService {
panic("failed to create etcd3 client")
}

vgroupMapping := make(map[string]string, 0)
grouplist := make(map[string]*ServiceConfig, 0)
vgroupMapping := config.VgroupMapping
grouplist := make(map[string][]*ServiceInstance, 0)

etcdRegistryService := &EtcdRegistryService{
client: cli,
Expand All @@ -59,7 +68,7 @@ func newEtcdRegistryService(etcd3Config *Etcd3Config) RegistryService {
grouplist: grouplist,
stopCh: make(chan struct{}),
}
go etcdRegistryService.watch("seata-server")
go etcdRegistryService.watch("registry-seata")

return etcdRegistryService
}
Expand All @@ -69,48 +78,196 @@ func (s *EtcdRegistryService) watch(key string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

resp, err := s.client.Get(ctx, key, etcd3.WithPrefix())
if err != nil {
log.Infof("cant get server instances from etcd")
}

for _, kv := range resp.Kvs {
k := kv.Key
v := kv.Value
clusterName, err := getClusterName(k)
if err != nil {
log.Errorf("etcd key has an incorrect format: ", err)
return
}
serverInstance, err := getServerInstance(v)
if err != nil {
log.Errorf("etcd value has an incorrect format: ", err)
return
}
s.rwLock.Lock()
if s.grouplist[clusterName] == nil {
s.grouplist[clusterName] = []*ServiceInstance{serverInstance}
} else {
s.grouplist[clusterName] = append(s.grouplist[clusterName], serverInstance)
}
s.rwLock.Unlock()
}

// 监视指定 key 的变化
yizhibian marked this conversation as resolved.
Show resolved Hide resolved
watchCh := s.client.Watch(ctx, key, etcd3.WithPrefix())

fmt.Println("yep I begin to watch")
// 处理监视事件
for {
select {
case watchResp, ok := <-watchCh:
if !ok {
fmt.Println("Watch channel closed")
log.Warnf("Watch channel closed")
return
}
for _, event := range watchResp.Events {
// 处理事件
switch event.Type {
case etcd3.EventTypePut:
fmt.Printf("Key %s updated. New value: %s\n", event.Kv.Key, event.Kv.Value)
// 进行你想要的操作

k := event.Kv.Key
v := event.Kv.Value
clusterName, err := getClusterName(k)
if err != nil {
log.Errorf("etcd key err: ", err)
return
}
serverInstance, err := getServerInstance(v)
if err != nil {
log.Errorf("etcd value err: ", err)
return
}

s.rwLock.Lock()
if s.grouplist[clusterName] == nil {
s.grouplist[clusterName] = []*ServiceInstance{serverInstance}
s.rwLock.Unlock()
continue
}
if ifHaveSameServiceInstances(s.grouplist[clusterName], serverInstance) {
s.rwLock.Unlock()
continue
}
s.grouplist[clusterName] = append(s.grouplist[clusterName], serverInstance)
s.rwLock.Unlock()

case etcd3.EventTypeDelete:
fmt.Printf("Key %s deleted.\n", event.Kv.Key)
// 进行你想要的操作
cluster, ip, port, err := getClusterAndAddress(event.Kv.Key)
if err != nil {
log.Errorf("etcd key err: ", err)
return
}

s.rwLock.Lock()
serviceInstances := s.grouplist[cluster]
if serviceInstances == nil {
log.Warnf("etcd doesnt exit cluster: ", cluster)
s.rwLock.Unlock()
continue
}
s.grouplist[cluster] = removeValueFromList(serviceInstances, ip, port)
s.rwLock.Unlock()
}
}
case <-s.stopCh: // stop信号
fmt.Println("stop the watch")
log.Warn("stop etcd watch")
return
}
}
}

func (s *EtcdRegistryService) Lookup(key string) ([]*ServiceInstance, error) {
//TODO implement me
get, err := s.client.Get(context.Background(), key)
func getClusterName(key []byte) (string, error) {
stringKey := string(key)
keySplit := strings.Split(stringKey, clusterNameSplitChar)
if len(keySplit) != 4 {
return "", fmt.Errorf("etcd key has an incorrect format. key: %s", stringKey)
}

cluster := keySplit[2]
return cluster, nil
}

func getServerInstance(value []byte) (*ServiceInstance, error) {
stringValue := string(value)
valueSplit := strings.Split(stringValue, AddressSplitChar)
if len(valueSplit) != 2 {
return nil, fmt.Errorf("etcd value has an incorrect format. value: %s", stringValue)
}
ip := valueSplit[0]
port, err := strconv.Atoi(valueSplit[1])
if err != nil {
return nil, fmt.Errorf("etcd port has an incorrect format. err: %w", err)
}
serverInstance := &ServiceInstance{
Addr: ip,
Port: port,
}

return serverInstance, nil
}

func getClusterAndAddress(key []byte) (string, string, int, error) {
stringKey := string(key)
keySplit := strings.Split(stringKey, clusterNameSplitChar)
if len(keySplit) != 4 {
return "", "", 0, fmt.Errorf("etcd key has an incorrect format. key: %s", stringKey)
}
cluster := keySplit[2]
address := strings.Split(keySplit[3], AddressSplitChar)
ip := address[0]
port, err := strconv.Atoi(address[1])
if err != nil {
return nil, err
return "", "", 0, fmt.Errorf("etcd port has an incorrect format. err: %w", err)
}
kvs := get.Kvs
fmt.Println(kvs)
return nil, err

return cluster, ip, port, nil
}

func ifHaveSameServiceInstances(list []*ServiceInstance, value *ServiceInstance) bool {
for _, v := range list {
if v.Addr == value.Addr && v.Port == value.Port {
return true
}
}
return false
}

func removeValueFromList(list []*ServiceInstance, ip string, port int) []*ServiceInstance {
for k, v := range list {
if v.Addr == ip && v.Port == port {
result := list[:k]
if k < len(list)-1 {
result = append(result, list[k+1:]...)
}
return result
}
}

return list
}

func (s *EtcdRegistryService) Lookup(key string) ([]*ServiceInstance, error) {
s.rwLock.RLock()
fmt.Println("lets begin")
cluster := s.vgroupMapping[key]
if cluster == "" {
s.rwLock.Unlock()
yizhibian marked this conversation as resolved.
Show resolved Hide resolved
return nil, fmt.Errorf("cluster doesnt exit")
}

list := s.grouplist[cluster]
//if len(list) == 0 {
// return nil, fmt.Errorf("service instance doesnt exit in %s", cluster)
//}

if len(list) != 0 {
for _, v := range list {
fmt.Println("here is instance", v.Addr, ":", v.Port)
}
}
fmt.Println("over")
s.rwLock.RUnlock()
yizhibian marked this conversation as resolved.
Show resolved Hide resolved
return list, nil
}

func (s *EtcdRegistryService) Close() {
s.stopCh <- struct{}{}

}
2 changes: 1 addition & 1 deletion pkg/discovery/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func InitRegistry(serviceConfig *ServiceConfig, registryConfig *RegistryConfig)
registryService = newFileRegistryService(serviceConfig)
case ETCD:
//init etcd registry
registryService = newEtcdRegistryService(&registryConfig.Etcd3)
registryService = newEtcdRegistryService(serviceConfig, &registryConfig.Etcd3)
case NACOS:
//TODO: init nacos registry
case EUREKA:
Expand Down
7 changes: 2 additions & 5 deletions pkg/discovery/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,11 @@ func TestInitRegistry(t *testing.T) {
name: "normal",
args: args{
registryConfig: &RegistryConfig{
Type: ETCD,
Etcd3: Etcd3Config{
ServerAddr: "localhost:2379",
},
Type: FILE,
yizhibian marked this conversation as resolved.
Show resolved Hide resolved
},
serviceConfig: &ServiceConfig{},
},
expectedType: "EtcdRegistryService",
expectedType: "FileRegistryService",
},
{
name: "unknown type",
Expand Down