Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to Consul nameresolver #2594

Merged
merged 9 commits into from
Feb 24, 2023
97 changes: 44 additions & 53 deletions nameresolution/consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ limitations under the License.
package consul

import (
"crypto/rand"
"fmt"
"math/big"
"math/rand"
"net"
"strconv"

Expand Down Expand Up @@ -81,96 +80,84 @@ type resolverConfig struct {

// NewResolver creates Consul name resolver.
func NewResolver(logger logger.Logger) nr.Resolver {
return newResolver(logger, resolverConfig{}, &client{})
return newResolver(logger, &client{})
}

func newResolver(logger logger.Logger, resolverConfig resolverConfig, client clientInterface) nr.Resolver {
func newResolver(logger logger.Logger, client clientInterface) *resolver {
return &resolver{
logger: logger,
config: resolverConfig,
client: client,
}
}

// Init will configure component. It will also register service or validate client connection based on config.
func (r *resolver) Init(metadata nr.Metadata) error {
var err error

func (r *resolver) Init(metadata nr.Metadata) (err error) {
r.config, err = getConfig(metadata)
if err != nil {
return err
}

if err = r.client.InitClient(r.config.Client); err != nil {
err = r.client.InitClient(r.config.Client)
if err != nil {
return fmt.Errorf("failed to init consul client: %w", err)
}

// register service to consul
// Register service to consul
if r.config.Registration != nil {
if err := r.client.Agent().ServiceRegister(r.config.Registration); err != nil {
agent := r.client.Agent()

err = agent.ServiceRegister(r.config.Registration)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw your comment about using deregistercriticalserviceafter - but that fix not working.

Should we rather deregister before registering?
Documentation for ServiceDeregister(serviceID string) error (https://developer.hashicorp.com/consul/commands/services/deregister) also recommends that it is meant to be paired with services register.

It may not completely solve the issue but would certainly be a good practice most probably and may also help in solving issues like #2489

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was actually what I first tried doing, and it didn't work because a service can have multiple instances if the app is scaled horizontally. Calling "ServiceDeregister" removed the entire service and so un-registered other instances of the app too.

I spent 2 hours on this yesterday and doesn't really look like Consul has any way to remove only one instance :(

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohh ok, got it.

if err != nil {
return fmt.Errorf("failed to register consul service: %w", err)
}

r.logger.Infof("service:%s registered on consul agent", r.config.Registration.Name)
} else if _, err := r.client.Agent().Self(); err != nil {
return fmt.Errorf("failed check on consul agent: %w", err)
} else {
_, err = r.client.Agent().Self()
if err != nil {
return fmt.Errorf("failed check on consul agent: %w", err)
}
}

return nil
}

// ResolveID resolves name to address via consul.
func (r *resolver) ResolveID(req nr.ResolveRequest) (string, error) {
func (r *resolver) ResolveID(req nr.ResolveRequest) (addr string, err error) {
cfg := r.config
services, _, err := r.client.Health().Service(req.ID, "", true, cfg.QueryOptions)
if err != nil {
return "", fmt.Errorf("failed to query healthy consul services: %w", err)
}

if len(services) == 0 {
return "", fmt.Errorf("no healthy services found with AppID:%s", req.ID)
return "", fmt.Errorf("no healthy services found with AppID '%s'", req.ID)
}

shuffle := func(services []*consul.ServiceEntry) []*consul.ServiceEntry {
for i := len(services) - 1; i > 0; i-- {
rndbig, _ := rand.Int(rand.Reader, big.NewInt(int64(i+1)))
j := rndbig.Int64()

services[i], services[j] = services[j], services[i]
}
// Pick a random service from the result
// Note: we're using math/random here as PRNG and that's ok since we're just using this for selecting a random address from a list for load-balancing, so we don't need a CSPRNG
//nolint:gosec
svc := services[rand.Int()%len(services)]

return services
port := svc.Service.Meta[cfg.DaprPortMetaKey]
if port == "" {
return "", fmt.Errorf("target service AppID '%s' found but DAPR_PORT missing from meta", req.ID)
}

svc := shuffle(services)[0]

addr := ""

if port, ok := svc.Service.Meta[cfg.DaprPortMetaKey]; ok {
if svc.Service.Address != "" {
addr = fmt.Sprintf("%s:%s", svc.Service.Address, port)
} else if svc.Node.Address != "" {
addr = fmt.Sprintf("%s:%s", svc.Node.Address, port)
} else {
return "", fmt.Errorf("no healthy services found with AppID:%s", req.ID)
}
if svc.Service.Address != "" {
addr = svc.Service.Address + ":" + port
} else if svc.Node.Address != "" {
addr = svc.Node.Address + ":" + port
} else {
return "", fmt.Errorf("target service AppID:%s found but DAPR_PORT missing from meta", req.ID)
return "", fmt.Errorf("no healthy services found with AppID '%s'", req.ID)
}

return addr, nil
}

// getConfig configuration from metadata, defaults are best suited for self-hosted mode.
func getConfig(metadata nr.Metadata) (resolverConfig, error) {
var daprPort string
var ok bool
var err error
resolverCfg := resolverConfig{}

props := metadata.Properties

if daprPort, ok = props[nr.DaprPort]; !ok {
func getConfig(metadata nr.Metadata) (resolverCfg resolverConfig, err error) {
if metadata.Properties[nr.DaprPort] == "" {
return resolverCfg, fmt.Errorf("metadata property missing: %s", nr.DaprPort)
}

Expand All @@ -187,7 +174,8 @@ func getConfig(metadata nr.Metadata) (resolverConfig, error) {
}

resolverCfg.Client = getClientConfig(cfg)
if resolverCfg.Registration, err = getRegistrationConfig(cfg, props); err != nil {
resolverCfg.Registration, err = getRegistrationConfig(cfg, metadata.Properties)
if err != nil {
return resolverCfg, err
}
resolverCfg.QueryOptions = getQueryOptionsConfig(cfg)
Expand All @@ -198,7 +186,7 @@ func getConfig(metadata nr.Metadata) (resolverConfig, error) {
resolverCfg.Registration.Meta = map[string]string{}
}

resolverCfg.Registration.Meta[resolverCfg.DaprPortMetaKey] = daprPort
resolverCfg.Registration.Meta[resolverCfg.DaprPortMetaKey] = metadata.Properties[nr.DaprPort]
}

return resolverCfg, nil
Expand All @@ -217,15 +205,18 @@ func getRegistrationConfig(cfg configSpec, props map[string]string) (*consul.Age
// if advanced registration configured ignore other registration related configs
if cfg.AdvancedRegistration != nil {
return cfg.AdvancedRegistration, nil
} else if !cfg.SelfRegister {
}
if !cfg.SelfRegister {
return nil, nil
}

var appID string
var appPort string
var host string
var httpPort string
var ok bool
var (
appID string
appPort string
host string
httpPort string
ok bool
)

if appID, ok = props[nr.AppID]; !ok {
return nil, fmt.Errorf("metadata property missing: %s", nr.AppID)
Expand Down
77 changes: 68 additions & 9 deletions nameresolution/consul/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestInit(t *testing.T) {
t.Helper()

var mock mockClient
resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock)
resolver := newResolver(logger.NewLogger("test"), &mock)

_ = resolver.Init(metadata)

Expand All @@ -122,7 +122,7 @@ func TestInit(t *testing.T) {
t.Helper()

var mock mockClient
resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock)
resolver := newResolver(logger.NewLogger("test"), &mock)

_ = resolver.Init(metadata)

Expand All @@ -144,7 +144,7 @@ func TestInit(t *testing.T) {
t.Helper()

var mock mockClient
resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock)
resolver := newResolver(logger.NewLogger("test"), &mock)

_ = resolver.Init(metadata)

Expand All @@ -166,7 +166,7 @@ func TestInit(t *testing.T) {

func TestResolveID(t *testing.T) {
t.Parallel()
testConfig := &resolverConfig{
testConfig := resolverConfig{
DaprPortMetaKey: "DAPR_PORT",
}

Expand All @@ -187,7 +187,8 @@ func TestResolveID(t *testing.T) {
serviceResult: []*consul.ServiceEntry{},
},
}
resolver := newResolver(logger.NewLogger("test"), *testConfig, &mock)
resolver := newResolver(logger.NewLogger("test"), &mock)
resolver.config = testConfig

_, err := resolver.ResolveID(req)
assert.Equal(t, 1, mock.mockHealth.serviceCalled)
Expand Down Expand Up @@ -216,13 +217,68 @@ func TestResolveID(t *testing.T) {
},
},
}
resolver := newResolver(logger.NewLogger("test"), *testConfig, &mock)
resolver := newResolver(logger.NewLogger("test"), &mock)
resolver.config = testConfig

addr, _ := resolver.ResolveID(req)

assert.Equal(t, "123.234.345.456:50005", addr)
},
},
{
"should get random address from service",
nr.ResolveRequest{
ID: "test-app",
},
func(t *testing.T, req nr.ResolveRequest) {
t.Helper()
mock := mockClient{
mockHealth: mockHealth{
serviceResult: []*consul.ServiceEntry{
{
Service: &consul.AgentService{
Address: "123.234.345.456",
Port: 8600,
Meta: map[string]string{
"DAPR_PORT": "50005",
},
},
},
{
Service: &consul.AgentService{
Address: "234.345.456.678",
Port: 8600,
Meta: map[string]string{
"DAPR_PORT": "50005",
},
},
},
},
},
}
resolver := newResolver(logger.NewLogger("test"), &mock)
resolver.config = testConfig

total1 := 0
total2 := 0
for i := 0; i < 100; i++ {
addr, _ := resolver.ResolveID(req)

if addr == "123.234.345.456:50005" {
total1++
} else if addr == "234.345.456.678:50005" {
total2++
} else {
t.Fatalf("Received unexpected address: %s", addr)
}
}

// Because of the random nature of the address being returned, we just check to make sure we get at least 20 of each (and a total of 100)
assert.Equal(t, 100, total1+total2)
assert.Greater(t, total1, 20)
assert.Greater(t, total2, 20)
},
},
{
"should get address from node if not on service",
nr.ResolveRequest{
Expand Down Expand Up @@ -260,7 +316,8 @@ func TestResolveID(t *testing.T) {
},
},
}
resolver := newResolver(logger.NewLogger("test"), *testConfig, &mock)
resolver := newResolver(logger.NewLogger("test"), &mock)
resolver.config = testConfig

addr, _ := resolver.ResolveID(req)

Expand Down Expand Up @@ -289,7 +346,8 @@ func TestResolveID(t *testing.T) {
},
},
}
resolver := newResolver(logger.NewLogger("test"), *testConfig, &mock)
resolver := newResolver(logger.NewLogger("test"), &mock)
resolver.config = testConfig

_, err := resolver.ResolveID(req)

Expand All @@ -315,7 +373,8 @@ func TestResolveID(t *testing.T) {
},
},
}
resolver := newResolver(logger.NewLogger("test"), *testConfig, &mock)
resolver := newResolver(logger.NewLogger("test"), &mock)
resolver.config = testConfig

_, err := resolver.ResolveID(req)

Expand Down