Skip to content

Commit

Permalink
Merge pull request #25 from lafer-fz/optimize/address-registration
Browse files Browse the repository at this point in the history
Optimize: the registered address
  • Loading branch information
lizhemingi authored Jul 4, 2023
2 parents e3c2bfb + 899c1cf commit b1978f4
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 5 deletions.
73 changes: 68 additions & 5 deletions etcd_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"encoding/json"
"fmt"
"net"
"os"
"strconv"
"strings"
Expand All @@ -30,8 +31,10 @@ import (
)

const (
ttlKey = "KITEX_ETCD_REGISTRY_LEASE_TTL"
defaultTTL = 60
ttlKey = "KITEX_ETCD_REGISTRY_LEASE_TTL"
defaultTTL = 60
kitexIpToRegistry = "KITEX_IP_TO_REGISTRY"
kitexPortToRegistry = "KITEX_PORT_TO_REGISTRY"
)

type etcdRegistry struct {
Expand Down Expand Up @@ -118,9 +121,13 @@ func (e *etcdRegistry) Deregister(info *registry.Info) error {
}

func (e *etcdRegistry) register(info *registry.Info, leaseID clientv3.LeaseID) error {
addr, err := e.getAddressOfRegistration(info)
if err != nil {
return err
}
val, err := json.Marshal(&instanceInfo{
Network: info.Addr.Network(),
Address: info.Addr.String(),
Address: addr,
Weight: info.Weight,
Tags: info.Tags,
})
Expand All @@ -129,14 +136,18 @@ func (e *etcdRegistry) register(info *registry.Info, leaseID clientv3.LeaseID) e
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
_, err = e.etcdClient.Put(ctx, serviceKey(info.ServiceName, info.Addr.String()), string(val), clientv3.WithLease(leaseID))
_, err = e.etcdClient.Put(ctx, serviceKey(info.ServiceName, addr), string(val), clientv3.WithLease(leaseID))
return err
}

func (e *etcdRegistry) deregister(info *registry.Info) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
_, err := e.etcdClient.Delete(ctx, serviceKey(info.ServiceName, info.Addr.String()))
addr, err := e.getAddressOfRegistration(info)
if err != nil {
return err
}
_, err = e.etcdClient.Delete(ctx, serviceKey(info.ServiceName, addr))
return err
}

Expand Down Expand Up @@ -170,6 +181,40 @@ func (e *etcdRegistry) keepalive(meta *registerMeta) error {
return nil
}

// getAddressOfRegistration returns the address of the service registration.
func (e *etcdRegistry) getAddressOfRegistration(info *registry.Info) (string, error) {

host, port, err := net.SplitHostPort(info.Addr.String())
if err != nil {
return "", err
}

// if host is empty or "::", use local ipv4 address as host
if host == "" || host == "::" {
host, err = getLocalIPv4Host()
if err != nil {
return "", fmt.Errorf("parse registry info addr error: %w", err)
}
}

// if env KITEX_IP_TO_REGISTRY is set, use it as host
if ipToRegistry, exists := os.LookupEnv(kitexIpToRegistry); exists && ipToRegistry != "" {
host = ipToRegistry
}

// if env KITEX_PORT_TO_REGISTRY is set, use it as port
if portToRegistry, exists := os.LookupEnv(kitexPortToRegistry); exists && portToRegistry != "" {
port = portToRegistry
}

p, err := strconv.Atoi(port)
if err != nil {
return "", fmt.Errorf("parse registry info port error: %w", err)
}

return fmt.Sprintf("%s:%d", host, p), nil
}

func validateRegistryInfo(info *registry.Info) error {
if info.ServiceName == "" {
return fmt.Errorf("missing service name in Register")
Expand All @@ -192,3 +237,21 @@ func getTTL() int64 {
}
return ttl
}

func getLocalIPv4Host() (string, error) {
addr, err := net.InterfaceAddrs()
if err != nil {
return "", err
}

for _, addr := range addr {
ipNet, isIpNet := addr.(*net.IPNet)
if isIpNet && !ipNet.IP.IsLoopback() {
ipv4 := ipNet.IP.To4()
if ipv4 != nil {
return ipv4.String(), nil
}
}
}
return "", fmt.Errorf("not found ipv4 address")
}
126 changes: 126 additions & 0 deletions etcd_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,132 @@ func TestEtcdRegistryWithSamePrefix(t *testing.T) {
teardownEmbedEtcd(s)
}

func TestEtcdRegistryWithAddressBlank(t *testing.T) {
s, endpoint := setupEmbedEtcd(t)

rg, err := NewEtcdRegistry([]string{endpoint})
require.Nil(t, err)
rs, err := NewEtcdResolver([]string{endpoint})
require.Nil(t, err)

infoList := []registry.Info{
{
ServiceName: "registry-etcd-test",
Addr: utils.NewNetAddr("tcp", "[::]:8888"),
Weight: 27,
Tags: map[string]string{"hello": "world"},
},
{
ServiceName: "registry-etcd-test-suffix",
Addr: utils.NewNetAddr("tcp", "127.0.0.1:9999"),
Weight: 27,
Tags: map[string]string{"hello": "world"},
}}

// test register service
{
for _, info := range infoList {
err = rg.Register(&info)
require.Nil(t, err)

desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(info.ServiceName, "", nil, nil))
result, err := rs.Resolve(context.TODO(), desc)
require.Nil(t, err)
address, err := rg.(*etcdRegistry).getAddressOfRegistration(&info)
require.Nil(t, err)
expected := discovery.Result{
Cacheable: true,
CacheKey: info.ServiceName,
Instances: []discovery.Instance{
discovery.NewInstance(info.Addr.Network(), address, info.Weight, info.Tags),
},
}
require.Equal(t, expected, result)
}
}

// test deregister service
{
for _, info := range infoList {
err = rg.Deregister(&info)
require.Nil(t, err)
desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(info.ServiceName, "", nil, nil))
_, err := rs.Resolve(context.TODO(), desc)
require.NotNil(t, err)
}
}

teardownEmbedEtcd(s)
}

func TestEtcdRegistryWithEnvironmentVariable(t *testing.T) {
s, endpoint := setupEmbedEtcd(t)

err := os.Setenv(kitexPortToRegistry, "8899")
if err != nil {
return
}
err = os.Setenv(kitexIpToRegistry, "127.0.0.2")
if err != nil {
return
}

rg, err := NewEtcdRegistry([]string{endpoint})
require.Nil(t, err)
rs, err := NewEtcdResolver([]string{endpoint})
require.Nil(t, err)

infoList := []registry.Info{
{
ServiceName: "registry-etcd-test",
Addr: utils.NewNetAddr("tcp", "[::]:8888"),
Weight: 27,
Tags: map[string]string{"hello": "world"},
},
{
ServiceName: "registry-etcd-test-suffix",
Addr: utils.NewNetAddr("tcp", "10.122.1.108:9999"),
Weight: 27,
Tags: map[string]string{"hello": "world"},
}}

// test register service
{
for _, info := range infoList {
err = rg.Register(&info)
require.Nil(t, err)

desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(info.ServiceName, "", nil, nil))
result, err := rs.Resolve(context.TODO(), desc)
require.Nil(t, err)
address, err := rg.(*etcdRegistry).getAddressOfRegistration(&info)
require.Nil(t, err)
expected := discovery.Result{
Cacheable: true,
CacheKey: info.ServiceName,
Instances: []discovery.Instance{
discovery.NewInstance(info.Addr.Network(), address, info.Weight, info.Tags),
},
}
require.Equal(t, expected, result)
}
}

// test deregister service
{
for _, info := range infoList {
err = rg.Deregister(&info)
require.Nil(t, err)
desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(info.ServiceName, "", nil, nil))
_, err := rs.Resolve(context.TODO(), desc)
require.NotNil(t, err)
}
}
os.Unsetenv(kitexPortToRegistry)
os.Unsetenv(kitexIpToRegistry)
teardownEmbedEtcd(s)
}

func TestEmptyEndpoints(t *testing.T) {
_, err := NewEtcdResolver([]string{})
require.NotNil(t, err)
Expand Down

0 comments on commit b1978f4

Please sign in to comment.