diff --git a/etcd_registry.go b/etcd_registry.go index b7b6c8e..71007f5 100644 --- a/etcd_registry.go +++ b/etcd_registry.go @@ -19,6 +19,7 @@ import ( "context" "encoding/json" "fmt" + "net" "os" "strconv" "strings" @@ -30,8 +31,10 @@ import ( ) const ( - ttlKey = "KITEX_ETCD_REGISTRY_LEASE_TTL" - defaultTTL = 60 + ttlKey = "KITEX_ETCD_REGISTRY_LEASE_TTL" + defaultTTL = 60 + kitexIpToRegistry = "KITEX_IP_TO_REGISTRY" + kitexPortToRegistry = "KITEX_PORT_TO_REGISTRY" ) type etcdRegistry struct { @@ -118,9 +121,13 @@ func (e *etcdRegistry) Deregister(info *registry.Info) error { } func (e *etcdRegistry) register(info *registry.Info, leaseID clientv3.LeaseID) error { + addr, err := e.getAddressOfRegistration(info) + if err != nil { + return err + } val, err := json.Marshal(&instanceInfo{ Network: info.Addr.Network(), - Address: info.Addr.String(), + Address: addr, Weight: info.Weight, Tags: info.Tags, }) @@ -129,14 +136,18 @@ func (e *etcdRegistry) register(info *registry.Info, leaseID clientv3.LeaseID) e } ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() - _, err = e.etcdClient.Put(ctx, serviceKey(info.ServiceName, info.Addr.String()), string(val), clientv3.WithLease(leaseID)) + _, err = e.etcdClient.Put(ctx, serviceKey(info.ServiceName, addr), string(val), clientv3.WithLease(leaseID)) return err } func (e *etcdRegistry) deregister(info *registry.Info) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() - _, err := e.etcdClient.Delete(ctx, serviceKey(info.ServiceName, info.Addr.String())) + addr, err := e.getAddressOfRegistration(info) + if err != nil { + return err + } + _, err = e.etcdClient.Delete(ctx, serviceKey(info.ServiceName, addr)) return err } @@ -170,6 +181,40 @@ func (e *etcdRegistry) keepalive(meta *registerMeta) error { return nil } +// getAddressOfRegistration returns the address of the service registration. +func (e *etcdRegistry) getAddressOfRegistration(info *registry.Info) (string, error) { + + host, port, err := net.SplitHostPort(info.Addr.String()) + if err != nil { + return "", err + } + + // if host is empty or "::", use local ipv4 address as host + if host == "" || host == "::" { + host, err = getLocalIPv4Host() + if err != nil { + return "", fmt.Errorf("parse registry info addr error: %w", err) + } + } + + // if env KITEX_IP_TO_REGISTRY is set, use it as host + if ipToRegistry, exists := os.LookupEnv(kitexIpToRegistry); exists && ipToRegistry != "" { + host = ipToRegistry + } + + // if env KITEX_PORT_TO_REGISTRY is set, use it as port + if portToRegistry, exists := os.LookupEnv(kitexPortToRegistry); exists && portToRegistry != "" { + port = portToRegistry + } + + p, err := strconv.Atoi(port) + if err != nil { + return "", fmt.Errorf("parse registry info port error: %w", err) + } + + return fmt.Sprintf("%s:%d", host, p), nil +} + func validateRegistryInfo(info *registry.Info) error { if info.ServiceName == "" { return fmt.Errorf("missing service name in Register") @@ -192,3 +237,21 @@ func getTTL() int64 { } return ttl } + +func getLocalIPv4Host() (string, error) { + addr, err := net.InterfaceAddrs() + if err != nil { + return "", err + } + + for _, addr := range addr { + ipNet, isIpNet := addr.(*net.IPNet) + if isIpNet && !ipNet.IP.IsLoopback() { + ipv4 := ipNet.IP.To4() + if ipv4 != nil { + return ipv4.String(), nil + } + } + } + return "", fmt.Errorf("not found ipv4 address") +} diff --git a/etcd_resolver_test.go b/etcd_resolver_test.go index c808990..6a18a65 100644 --- a/etcd_resolver_test.go +++ b/etcd_resolver_test.go @@ -173,6 +173,132 @@ func TestEtcdRegistryWithSamePrefix(t *testing.T) { teardownEmbedEtcd(s) } +func TestEtcdRegistryWithAddressBlank(t *testing.T) { + s, endpoint := setupEmbedEtcd(t) + + rg, err := NewEtcdRegistry([]string{endpoint}) + require.Nil(t, err) + rs, err := NewEtcdResolver([]string{endpoint}) + require.Nil(t, err) + + infoList := []registry.Info{ + { + ServiceName: "registry-etcd-test", + Addr: utils.NewNetAddr("tcp", "[::]:8888"), + Weight: 27, + Tags: map[string]string{"hello": "world"}, + }, + { + ServiceName: "registry-etcd-test-suffix", + Addr: utils.NewNetAddr("tcp", "127.0.0.1:9999"), + Weight: 27, + Tags: map[string]string{"hello": "world"}, + }} + + // test register service + { + for _, info := range infoList { + err = rg.Register(&info) + require.Nil(t, err) + + desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(info.ServiceName, "", nil, nil)) + result, err := rs.Resolve(context.TODO(), desc) + require.Nil(t, err) + address, err := rg.(*etcdRegistry).getAddressOfRegistration(&info) + require.Nil(t, err) + expected := discovery.Result{ + Cacheable: true, + CacheKey: info.ServiceName, + Instances: []discovery.Instance{ + discovery.NewInstance(info.Addr.Network(), address, info.Weight, info.Tags), + }, + } + require.Equal(t, expected, result) + } + } + + // test deregister service + { + for _, info := range infoList { + err = rg.Deregister(&info) + require.Nil(t, err) + desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(info.ServiceName, "", nil, nil)) + _, err := rs.Resolve(context.TODO(), desc) + require.NotNil(t, err) + } + } + + teardownEmbedEtcd(s) +} + +func TestEtcdRegistryWithEnvironmentVariable(t *testing.T) { + s, endpoint := setupEmbedEtcd(t) + + err := os.Setenv(kitexPortToRegistry, "8899") + if err != nil { + return + } + err = os.Setenv(kitexIpToRegistry, "127.0.0.2") + if err != nil { + return + } + + rg, err := NewEtcdRegistry([]string{endpoint}) + require.Nil(t, err) + rs, err := NewEtcdResolver([]string{endpoint}) + require.Nil(t, err) + + infoList := []registry.Info{ + { + ServiceName: "registry-etcd-test", + Addr: utils.NewNetAddr("tcp", "[::]:8888"), + Weight: 27, + Tags: map[string]string{"hello": "world"}, + }, + { + ServiceName: "registry-etcd-test-suffix", + Addr: utils.NewNetAddr("tcp", "10.122.1.108:9999"), + Weight: 27, + Tags: map[string]string{"hello": "world"}, + }} + + // test register service + { + for _, info := range infoList { + err = rg.Register(&info) + require.Nil(t, err) + + desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(info.ServiceName, "", nil, nil)) + result, err := rs.Resolve(context.TODO(), desc) + require.Nil(t, err) + address, err := rg.(*etcdRegistry).getAddressOfRegistration(&info) + require.Nil(t, err) + expected := discovery.Result{ + Cacheable: true, + CacheKey: info.ServiceName, + Instances: []discovery.Instance{ + discovery.NewInstance(info.Addr.Network(), address, info.Weight, info.Tags), + }, + } + require.Equal(t, expected, result) + } + } + + // test deregister service + { + for _, info := range infoList { + err = rg.Deregister(&info) + require.Nil(t, err) + desc := rs.Target(context.TODO(), rpcinfo.NewEndpointInfo(info.ServiceName, "", nil, nil)) + _, err := rs.Resolve(context.TODO(), desc) + require.NotNil(t, err) + } + } + os.Unsetenv(kitexPortToRegistry) + os.Unsetenv(kitexIpToRegistry) + teardownEmbedEtcd(s) +} + func TestEmptyEndpoints(t *testing.T) { _, err := NewEtcdResolver([]string{}) require.NotNil(t, err)