Skip to content

Commit

Permalink
feat: Implement the discovery service by Etcd (#605)
Browse files Browse the repository at this point in the history
Implement the discovery service by Etcd
  • Loading branch information
yizhibian authored Feb 17, 2024
1 parent e18861d commit 18e611d
Show file tree
Hide file tree
Showing 8 changed files with 613 additions and 7 deletions.
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ require (
require (
github.com/agiledragon/gomonkey v2.0.2+incompatible
github.com/agiledragon/gomonkey/v2 v2.9.0
go.etcd.io/etcd/api/v3 v3.5.6
go.etcd.io/etcd/client/v3 v3.5.6
)

require (
Expand All @@ -45,13 +47,16 @@ require (
github.com/bytedance/sonic v1.9.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/creasty/defaults v1.5.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
Expand Down Expand Up @@ -82,6 +87,7 @@ require (
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.6 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,12 @@ github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk=
github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI=
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
Expand Down Expand Up @@ -270,6 +272,7 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
Expand Down Expand Up @@ -766,12 +769,15 @@ go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
go.etcd.io/etcd/api/v3 v3.5.0-alpha.0/go.mod h1:mPcW6aZJukV6Aa81LSKpBjQXTWlXB5r74ymPoSWa3Sw=
go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A=
go.etcd.io/etcd/api/v3 v3.5.6 h1:Cy2qx3npLcYqTKqGJzMypnMv2tiRyifZJ17BlWIWA7A=
go.etcd.io/etcd/api/v3 v3.5.6/go.mod h1:KFtNaxGDw4Yx/BA4iPPwevUTAuqcsPxzyX8PHydchN8=
go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g=
go.etcd.io/etcd/client/pkg/v3 v3.5.6 h1:TXQWYceBKqLp4sa87rcPs11SXxUA/mHwH975v+BDvLU=
go.etcd.io/etcd/client/pkg/v3 v3.5.6/go.mod h1:ggrwbk069qxpKPq8/FKkQ3Xq9y39kbFR4LnKszpRXeQ=
go.etcd.io/etcd/client/v2 v2.305.0-alpha.0/go.mod h1:kdV+xzCJ3luEBSIeQyB/OEKkWKd8Zkux4sbDeANrosU=
go.etcd.io/etcd/client/v3 v3.5.0-alpha.0/go.mod h1:wKt7jgDgf/OfKiYmCq5WFGxOFAkVMLxiiXgLDFhECr8=
go.etcd.io/etcd/client/v3 v3.5.4/go.mod h1:ZaRkVgBZC+L+dLCjTcF1hRXpgZXQPOvnA/Ak/gq3kiY=
go.etcd.io/etcd/client/v3 v3.5.6 h1:coLs69PWCXE9G4FKquzNaSHrRyMCAXwF+IX1tAPVO8E=
go.etcd.io/etcd/client/v3 v3.5.6/go.mod h1:f6GRinRMCsFVv9Ht42EyY7nfsVGwrNO0WEoS2pRKzQk=
go.etcd.io/etcd/pkg/v3 v3.5.0-alpha.0/go.mod h1:tV31atvwzcybuqejDoY3oaNRTtlD2l/Ot78Pc9w7DMY=
go.etcd.io/etcd/raft/v3 v3.5.0-alpha.0/go.mod h1:FAwse6Zlm5v4tEWZaTjmNhe17Int4Oxbu7+2r0DiD3w=
Expand Down
243 changes: 238 additions & 5 deletions pkg/discovery/etcd3.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,247 @@

package discovery

type EtcdRegistryService struct{}
import (
"context"
"fmt"
"github.com/seata/seata-go/pkg/util/log"
etcd3 "go.etcd.io/etcd/client/v3"
"strconv"
"strings"
"sync"
)

const (
clusterNameSplitChar = "-"
addressSplitChar = ":"
etcdClusterPrefix = "registry-seata"
)

type EtcdRegistryService struct {
client *etcd3.Client
cfg etcd3.Config
vgroupMapping map[string]string
grouplist map[string][]*ServiceInstance
rwLock sync.RWMutex

stopCh chan struct{}
}

func newEtcdRegistryService(config *ServiceConfig, etcd3Config *Etcd3Config) RegistryService {

if etcd3Config == nil {
log.Fatalf("etcd config is nil")
panic("etcd config is nil")
}

cfg := etcd3.Config{
Endpoints: []string{etcd3Config.ServerAddr},
}
cli, err := etcd3.New(cfg)
if err != nil {
log.Fatalf("failed to create etcd3 client")
panic("failed to create etcd3 client")
}

vgroupMapping := config.VgroupMapping
grouplist := make(map[string][]*ServiceInstance, 0)

etcdRegistryService := &EtcdRegistryService{
client: cli,
cfg: cfg,
vgroupMapping: vgroupMapping,
grouplist: grouplist,
stopCh: make(chan struct{}),
}
go etcdRegistryService.watch(etcdClusterPrefix)

return etcdRegistryService
}

func (s *EtcdRegistryService) watch(key string) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

resp, err := s.client.Get(ctx, key, etcd3.WithPrefix())
if err != nil {
log.Infof("cant get server instances from etcd")
}

if resp != nil {
for _, kv := range resp.Kvs {
k := kv.Key
v := kv.Value
clusterName, err := getClusterName(k)
if err != nil {
log.Errorf("etcd key has an incorrect format: ", err)
return
}
serverInstance, err := getServerInstance(v)
if err != nil {
log.Errorf("etcd value has an incorrect format: ", err)
return
}
s.rwLock.Lock()
if s.grouplist[clusterName] == nil {
s.grouplist[clusterName] = []*ServiceInstance{serverInstance}
} else {
s.grouplist[clusterName] = append(s.grouplist[clusterName], serverInstance)
}
s.rwLock.Unlock()
}

}
// watch the changes of endpoints
watchCh := s.client.Watch(ctx, key, etcd3.WithPrefix())

for {
select {
case watchResp, ok := <-watchCh:
if !ok {
log.Warnf("Watch channel closed")
return
}
for _, event := range watchResp.Events {
switch event.Type {
case etcd3.EventTypePut:
log.Infof("Key %s updated. New value: %s\n", event.Kv.Key, event.Kv.Value)

k := event.Kv.Key
v := event.Kv.Value
clusterName, err := getClusterName(k)
if err != nil {
log.Errorf("etcd key err: ", err)
return
}
serverInstance, err := getServerInstance(v)
if err != nil {
log.Errorf("etcd value err: ", err)
return
}

s.rwLock.Lock()
if s.grouplist[clusterName] == nil {
s.grouplist[clusterName] = []*ServiceInstance{serverInstance}
s.rwLock.Unlock()
continue
}
if ifHaveSameServiceInstances(s.grouplist[clusterName], serverInstance) {
s.rwLock.Unlock()
continue
}
s.grouplist[clusterName] = append(s.grouplist[clusterName], serverInstance)
s.rwLock.Unlock()

case etcd3.EventTypeDelete:
log.Infof("Key %s deleted.\n", event.Kv.Key)

cluster, ip, port, err := getClusterAndAddress(event.Kv.Key)
if err != nil {
log.Errorf("etcd key err: ", err)
return
}

s.rwLock.Lock()
serviceInstances := s.grouplist[cluster]
if serviceInstances == nil {
log.Warnf("etcd doesnt exit cluster: ", cluster)
s.rwLock.Unlock()
continue
}
s.grouplist[cluster] = removeValueFromList(serviceInstances, ip, port)
s.rwLock.Unlock()
}
}
case <-s.stopCh:
log.Warn("stop etcd watch")
return
}
}
}

func getClusterName(key []byte) (string, error) {
stringKey := string(key)
keySplit := strings.Split(stringKey, clusterNameSplitChar)
if len(keySplit) != 4 {
return "", fmt.Errorf("etcd key has an incorrect format. key: %s", stringKey)
}

cluster := keySplit[2]
return cluster, nil
}

func getServerInstance(value []byte) (*ServiceInstance, error) {
stringValue := string(value)
valueSplit := strings.Split(stringValue, addressSplitChar)
if len(valueSplit) != 2 {
return nil, fmt.Errorf("etcd value has an incorrect format. value: %s", stringValue)
}
ip := valueSplit[0]
port, err := strconv.Atoi(valueSplit[1])
if err != nil {
return nil, fmt.Errorf("etcd port has an incorrect format. err: %w", err)
}
serverInstance := &ServiceInstance{
Addr: ip,
Port: port,
}

return serverInstance, nil
}

func getClusterAndAddress(key []byte) (string, string, int, error) {
stringKey := string(key)
keySplit := strings.Split(stringKey, clusterNameSplitChar)
if len(keySplit) != 4 {
return "", "", 0, fmt.Errorf("etcd key has an incorrect format. key: %s", stringKey)
}
cluster := keySplit[2]
address := strings.Split(keySplit[3], addressSplitChar)
ip := address[0]
port, err := strconv.Atoi(address[1])
if err != nil {
return "", "", 0, fmt.Errorf("etcd port has an incorrect format. err: %w", err)
}

return cluster, ip, port, nil
}

func ifHaveSameServiceInstances(list []*ServiceInstance, value *ServiceInstance) bool {
for _, v := range list {
if v.Addr == value.Addr && v.Port == value.Port {
return true
}
}
return false
}

func removeValueFromList(list []*ServiceInstance, ip string, port int) []*ServiceInstance {
for k, v := range list {
if v.Addr == ip && v.Port == port {
result := list[:k]
if k < len(list)-1 {
result = append(result, list[k+1:]...)
}
return result
}
}

return list
}

func (s *EtcdRegistryService) Lookup(key string) ([]*ServiceInstance, error) {
//TODO implement me
panic("implement me")
s.rwLock.RLock()
defer s.rwLock.RUnlock()
cluster := s.vgroupMapping[key]
if cluster == "" {
return nil, fmt.Errorf("cluster doesnt exit")
}

list := s.grouplist[cluster]
return list, nil
}

func (s *EtcdRegistryService) Close() {
//TODO implement me
panic("implement me")
s.stopCh <- struct{}{}
}
Loading

0 comments on commit 18e611d

Please sign in to comment.