Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

【等CI结束就合并】feat: Implement the discovery service by Etcd #605

Merged
merged 15 commits into from
Feb 17, 2024
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ require (
require (
github.com/agiledragon/gomonkey v2.0.2+incompatible
github.com/agiledragon/gomonkey/v2 v2.9.0
go.etcd.io/etcd/api/v3 v3.5.6
go.etcd.io/etcd/client/v3 v3.5.6
)

require (
Expand All @@ -45,13 +47,16 @@ require (
github.com/bytedance/sonic v1.9.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/creasty/defaults v1.5.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
Expand Down Expand Up @@ -82,6 +87,7 @@ require (
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.6 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,12 @@ github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk=
github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI=
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
Expand Down Expand Up @@ -270,6 +272,7 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
Expand Down Expand Up @@ -766,12 +769,15 @@ go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
go.etcd.io/etcd/api/v3 v3.5.0-alpha.0/go.mod h1:mPcW6aZJukV6Aa81LSKpBjQXTWlXB5r74ymPoSWa3Sw=
go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A=
go.etcd.io/etcd/api/v3 v3.5.6 h1:Cy2qx3npLcYqTKqGJzMypnMv2tiRyifZJ17BlWIWA7A=
go.etcd.io/etcd/api/v3 v3.5.6/go.mod h1:KFtNaxGDw4Yx/BA4iPPwevUTAuqcsPxzyX8PHydchN8=
go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g=
go.etcd.io/etcd/client/pkg/v3 v3.5.6 h1:TXQWYceBKqLp4sa87rcPs11SXxUA/mHwH975v+BDvLU=
go.etcd.io/etcd/client/pkg/v3 v3.5.6/go.mod h1:ggrwbk069qxpKPq8/FKkQ3Xq9y39kbFR4LnKszpRXeQ=
go.etcd.io/etcd/client/v2 v2.305.0-alpha.0/go.mod h1:kdV+xzCJ3luEBSIeQyB/OEKkWKd8Zkux4sbDeANrosU=
go.etcd.io/etcd/client/v3 v3.5.0-alpha.0/go.mod h1:wKt7jgDgf/OfKiYmCq5WFGxOFAkVMLxiiXgLDFhECr8=
go.etcd.io/etcd/client/v3 v3.5.4/go.mod h1:ZaRkVgBZC+L+dLCjTcF1hRXpgZXQPOvnA/Ak/gq3kiY=
go.etcd.io/etcd/client/v3 v3.5.6 h1:coLs69PWCXE9G4FKquzNaSHrRyMCAXwF+IX1tAPVO8E=
go.etcd.io/etcd/client/v3 v3.5.6/go.mod h1:f6GRinRMCsFVv9Ht42EyY7nfsVGwrNO0WEoS2pRKzQk=
go.etcd.io/etcd/pkg/v3 v3.5.0-alpha.0/go.mod h1:tV31atvwzcybuqejDoY3oaNRTtlD2l/Ot78Pc9w7DMY=
go.etcd.io/etcd/raft/v3 v3.5.0-alpha.0/go.mod h1:FAwse6Zlm5v4tEWZaTjmNhe17Int4Oxbu7+2r0DiD3w=
Expand Down
243 changes: 238 additions & 5 deletions pkg/discovery/etcd3.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,247 @@

package discovery

type EtcdRegistryService struct{}
import (
"context"
"fmt"
"github.com/seata/seata-go/pkg/util/log"
etcd3 "go.etcd.io/etcd/client/v3"
"strconv"
"strings"
"sync"
)

const (
clusterNameSplitChar = "-"
addressSplitChar = ":"
etcdClusterPrefix = "registry-seata"
)

type EtcdRegistryService struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

给EtcdRegistryService编写完整的单元测试

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已添加

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iSuperCoder 你好,已经加上单元测试了,还需要补充什么吗?

client *etcd3.Client
cfg etcd3.Config
vgroupMapping map[string]string
grouplist map[string][]*ServiceInstance
rwLock sync.RWMutex

stopCh chan struct{}
}

func newEtcdRegistryService(config *ServiceConfig, etcd3Config *Etcd3Config) RegistryService {

if etcd3Config == nil {
log.Fatalf("etcd config is nil")
panic("etcd config is nil")
}

cfg := etcd3.Config{
Endpoints: []string{etcd3Config.ServerAddr},
}
cli, err := etcd3.New(cfg)
if err != nil {
log.Fatalf("failed to create etcd3 client")
panic("failed to create etcd3 client")
}

vgroupMapping := config.VgroupMapping
grouplist := make(map[string][]*ServiceInstance, 0)

etcdRegistryService := &EtcdRegistryService{
client: cli,
cfg: cfg,
vgroupMapping: vgroupMapping,
grouplist: grouplist,
stopCh: make(chan struct{}),
}
go etcdRegistryService.watch(etcdClusterPrefix)

return etcdRegistryService
}

func (s *EtcdRegistryService) watch(key string) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

resp, err := s.client.Get(ctx, key, etcd3.WithPrefix())
if err != nil {
log.Infof("cant get server instances from etcd")
}

if resp != nil {
for _, kv := range resp.Kvs {
k := kv.Key
v := kv.Value
clusterName, err := getClusterName(k)
if err != nil {
log.Errorf("etcd key has an incorrect format: ", err)
return
}
serverInstance, err := getServerInstance(v)
if err != nil {
log.Errorf("etcd value has an incorrect format: ", err)
return
}
s.rwLock.Lock()
if s.grouplist[clusterName] == nil {
s.grouplist[clusterName] = []*ServiceInstance{serverInstance}
} else {
s.grouplist[clusterName] = append(s.grouplist[clusterName], serverInstance)
}
s.rwLock.Unlock()
}

}
// watch the changes of endpoints
watchCh := s.client.Watch(ctx, key, etcd3.WithPrefix())

for {
select {
case watchResp, ok := <-watchCh:
if !ok {
log.Warnf("Watch channel closed")
return
}
for _, event := range watchResp.Events {
switch event.Type {
case etcd3.EventTypePut:
log.Infof("Key %s updated. New value: %s\n", event.Kv.Key, event.Kv.Value)

k := event.Kv.Key
v := event.Kv.Value
clusterName, err := getClusterName(k)
if err != nil {
log.Errorf("etcd key err: ", err)
return
}
serverInstance, err := getServerInstance(v)
if err != nil {
log.Errorf("etcd value err: ", err)
return
}

s.rwLock.Lock()
if s.grouplist[clusterName] == nil {
s.grouplist[clusterName] = []*ServiceInstance{serverInstance}
s.rwLock.Unlock()
continue
}
if ifHaveSameServiceInstances(s.grouplist[clusterName], serverInstance) {
s.rwLock.Unlock()
continue
}
s.grouplist[clusterName] = append(s.grouplist[clusterName], serverInstance)
s.rwLock.Unlock()

case etcd3.EventTypeDelete:
log.Infof("Key %s deleted.\n", event.Kv.Key)

cluster, ip, port, err := getClusterAndAddress(event.Kv.Key)
if err != nil {
log.Errorf("etcd key err: ", err)
return
}

s.rwLock.Lock()
serviceInstances := s.grouplist[cluster]
if serviceInstances == nil {
log.Warnf("etcd doesnt exit cluster: ", cluster)
s.rwLock.Unlock()
continue
}
s.grouplist[cluster] = removeValueFromList(serviceInstances, ip, port)
s.rwLock.Unlock()
}
}
case <-s.stopCh:
log.Warn("stop etcd watch")
return
}
}
}

func getClusterName(key []byte) (string, error) {
stringKey := string(key)
keySplit := strings.Split(stringKey, clusterNameSplitChar)
if len(keySplit) != 4 {
return "", fmt.Errorf("etcd key has an incorrect format. key: %s", stringKey)
}

cluster := keySplit[2]
return cluster, nil
}

func getServerInstance(value []byte) (*ServiceInstance, error) {
stringValue := string(value)
valueSplit := strings.Split(stringValue, addressSplitChar)
if len(valueSplit) != 2 {
return nil, fmt.Errorf("etcd value has an incorrect format. value: %s", stringValue)
}
ip := valueSplit[0]
port, err := strconv.Atoi(valueSplit[1])
if err != nil {
return nil, fmt.Errorf("etcd port has an incorrect format. err: %w", err)
}
serverInstance := &ServiceInstance{
Addr: ip,
Port: port,
}

return serverInstance, nil
}

func getClusterAndAddress(key []byte) (string, string, int, error) {
stringKey := string(key)
keySplit := strings.Split(stringKey, clusterNameSplitChar)
if len(keySplit) != 4 {
return "", "", 0, fmt.Errorf("etcd key has an incorrect format. key: %s", stringKey)
}
cluster := keySplit[2]
address := strings.Split(keySplit[3], addressSplitChar)
ip := address[0]
port, err := strconv.Atoi(address[1])
if err != nil {
return "", "", 0, fmt.Errorf("etcd port has an incorrect format. err: %w", err)
}

return cluster, ip, port, nil
}

func ifHaveSameServiceInstances(list []*ServiceInstance, value *ServiceInstance) bool {
for _, v := range list {
if v.Addr == value.Addr && v.Port == value.Port {
return true
}
}
return false
}

func removeValueFromList(list []*ServiceInstance, ip string, port int) []*ServiceInstance {
for k, v := range list {
if v.Addr == ip && v.Port == port {
result := list[:k]
if k < len(list)-1 {
result = append(result, list[k+1:]...)
}
return result
}
}

return list
}

func (s *EtcdRegistryService) Lookup(key string) ([]*ServiceInstance, error) {
//TODO implement me
panic("implement me")
s.rwLock.RLock()
defer s.rwLock.RUnlock()
cluster := s.vgroupMapping[key]
if cluster == "" {
return nil, fmt.Errorf("cluster doesnt exit")
}

list := s.grouplist[cluster]
return list, nil
}

func (s *EtcdRegistryService) Close() {
//TODO implement me
panic("implement me")
s.stopCh <- struct{}{}
}
Loading
Loading