Consul name resolution in-memory cache #3121

Merged
merged 30 commits into from
Oct 13, 2023
Commits
84001c9
nr_consul_cache squashed commits to resolve dco
a-elsheikh Sep 11, 2023
ecd98f0
Update nameresolution/consul/README.md
a-elsheikh Sep 11, 2023
01ed67b
Merge branch 'master' into nr_consul_cache_dco
a-elsheikh Sep 12, 2023
7c7691d
Merge branch 'master' into nr_consul_cache_dco
a-elsheikh Sep 14, 2023
40cbf17
nr_consul_cache refactored to use single routine for watching all ser…
a-elsheikh Sep 20, 2023
f1c0804
Merge branch 'master' into nr_consul_cache_dco
a-elsheikh Sep 20, 2023
a74ee03
nr_consul_cache disable agent cache query for watcher health service …
a-elsheikh Sep 21, 2023
c02778e
Merge branch 'master' into nr_consul_cache_dco
a-elsheikh Sep 22, 2023
673e068
Merge branch 'master' into nr_consul_cache_dco
a-elsheikh Oct 2, 2023
dcb6371
Update nameresolution/consul/consul.go
a-elsheikh Oct 4, 2023
8274b6d
Update nameresolution/consul/consul.go
a-elsheikh Oct 4, 2023
88bdaca
Update nameresolution/consul/consul.go
a-elsheikh Oct 4, 2023
b74f295
Update nameresolution/consul/watcher.go
a-elsheikh Oct 4, 2023
732bc58
Update nameresolution/consul/watcher.go
a-elsheikh Oct 4, 2023
97b537a
Update nameresolution/consul/watcher.go
a-elsheikh Oct 4, 2023
5f18b8e
Update nameresolution/consul/watcher.go
a-elsheikh Oct 4, 2023
b8eb7d5
Update nameresolution/consul/watcher.go
a-elsheikh Oct 4, 2023
74b69a9
merge nr_consul_cache_dco
a-elsheikh Oct 4, 2023
3831d25
use backoff libs, refactor ctx params, some tidy up
a-elsheikh Oct 4, 2023
fb854e6
lint fixes
a-elsheikh Oct 4, 2023
bdb2f29
remove panic recovers
a-elsheikh Oct 4, 2023
b1cbd69
Merge branch 'master' into nr_consul_cache_dco
a-elsheikh Oct 4, 2023
5b0d209
more lint fixes
a-elsheikh Oct 5, 2023
368709f
Merge branch 'nr_consul_cache_dco' of https://github.com/man-group/co…
a-elsheikh Oct 5, 2023
402e2b1
resolve data races in tests
a-elsheikh Oct 5, 2023
0659997
Merge branch 'master' into nr_consul_cache_dco
a-elsheikh Oct 10, 2023
2745d07
💄
ItalyPaleAle Oct 10, 2023
bbfb762
added resolver close method for cleanup and deregister
a-elsheikh Oct 12, 2023
ff8cc37
fix lint issues
a-elsheikh Oct 13, 2023
7c10a9b
Merge branch 'master' into nr_consul_cache_dco
a-elsheikh Oct 13, 2023
9 changes: 5 additions & 4 deletions nameresolution/consul/README.md
@@ -1,6 +1,6 @@
# Consul Name Resolution

The consul name resolution component gives the ability to register and resolve other "daprized" services registered on a consul estate. It is flexible in that it allows for complex to minimal configurations driving the behaviour on init and resolution.
The consul name resolution component gives the ability to register and resolve other "daprized" services registered on a consul estate. It is flexible in that it allows for complex to minimal configurations driving the behavior on init and resolution.

## How To Use

@@ -35,7 +35,7 @@ spec:
```


## Behaviour
## Behavior

On init the consul component will either validate the connection to the configured (or default) agent or register the service if configured to do so. The name resolution interface does not cater for an "on shutdown" pattern so please consider this if using Dapr to register services to consul as it will not deregister services.

@@ -54,9 +54,10 @@ As of writing the configuration spec is fixed to v1.3.0 of the consul api
| Tags | `[]string` | Configures any tags to include if/when registering services |
| Meta | `map[string]string` | Configures any additional metadata to include if/when registering services |
| DaprPortMetaKey | `string` | The key used for getting the Dapr sidecar port from consul service metadata during service resolution, it will also be used to set the Dapr sidecar port in metadata during registration. If blank it will default to `DAPR_PORT` |
| SelfRegister | `bool` | Controls if Dapr will register the service to consul. The name resolution interface does not cater for an "on shutdown" pattern so please consider this if using Dapr to register services to consul as it will not deregister services. |
| SelfRegister | `bool` | Controls if Dapr will register the service to consul on startup. If unset it will default to `false` |
| SelfDeregister | `bool` | Controls if Dapr will deregister the service from consul on shutdown. If unset it will default to `false` |
Review comment (Contributor):

I think when we tried to do that (to solve #2490 ) this was a bad idea. It did un-register all instances of an app and not just the one that was going offline.

I would prefer if this wasn't even offered as an option, as it can cause confusion to users.

(And with #2490 merged in 1.12, this isn't needed anyways)

| AdvancedRegistration | [*api.AgentServiceRegistration](https://pkg.go.dev/github.com/hashicorp/consul/api@v1.3.0#AgentServiceRegistration) | Gives full control of service registration through configuration. If configured the component will ignore any configuration of Checks, Tags, Meta and SelfRegister. |
| UseCache | `bool` | Configures if Dapr will cache the resolved services in-memory. This is done using consul [blocking queries](https://www.consul.io/api-docs/features/blocking) which can be configured via the QueryOptions configuration. If unset it will default to `false` |

## Samples Configurations

### Basic
20 changes: 17 additions & 3 deletions nameresolution/consul/configuration.go
@@ -23,6 +23,8 @@ import (
"github.com/dapr/kit/config"
)

const defaultDaprPortMetaKey string = "DAPR_PORT" // default key for DaprPort in meta

// The intermediateConfig is based off of the consul api types. User configurations are
// deserialized into this type before being converted to the equivalent consul types
// that way breaking changes in future versions of the consul api cannot break user configuration.
@@ -33,8 +35,10 @@ type intermediateConfig struct {
Meta map[string]string
QueryOptions *QueryOptions
AdvancedRegistration *AgentServiceRegistration // advanced use-case
SelfRegister bool
DaprPortMetaKey string
SelfRegister bool
SelfDeregister bool
UseCache bool
}

type configSpec struct {
@@ -44,8 +48,16 @@ type configSpec struct {
Meta map[string]string
QueryOptions *consul.QueryOptions
AdvancedRegistration *consul.AgentServiceRegistration // advanced use-case
SelfRegister bool
DaprPortMetaKey string
SelfRegister bool
SelfDeregister bool
UseCache bool
}

func newIntermediateConfig() intermediateConfig {
return intermediateConfig{
DaprPortMetaKey: defaultDaprPortMetaKey,
}
}

func parseConfig(rawConfig interface{}) (configSpec, error) {
@@ -60,7 +72,7 @@ func parseConfig(rawConfig interface{}) (configSpec, error) {
return result, fmt.Errorf("error serializing to json: %w", err)
}

var configuration intermediateConfig
configuration := newIntermediateConfig()
err = json.Unmarshal(data, &configuration)
if err != nil {
return result, fmt.Errorf("error deserializing to configSpec: %w", err)
@@ -80,7 +92,9 @@ func mapConfig(config intermediateConfig) configSpec {
QueryOptions: mapQueryOptions(config.QueryOptions),
AdvancedRegistration: mapAdvancedRegistration(config.AdvancedRegistration),
SelfRegister: config.SelfRegister,
SelfDeregister: config.SelfDeregister,
DaprPortMetaKey: config.DaprPortMetaKey,
UseCache: config.UseCache,
}
}

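Note on the `newIntermediateConfig` change above: seeding the struct with defaults before calling `json.Unmarshal` means keys that are absent from the user configuration keep their default values. The following is a minimal sketch of that pattern only; `sketchConfig` and `parseSketch` are hypothetical stand-ins and are not part of this PR.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sketchConfig is a hypothetical, trimmed-down stand-in for intermediateConfig.
type sketchConfig struct {
	DaprPortMetaKey string
	SelfRegister    bool
	UseCache        bool
}

// parseSketch seeds the struct with a default, then lets json.Unmarshal
// overwrite only the keys that are present in the input.
func parseSketch(data []byte) (sketchConfig, error) {
	cfg := sketchConfig{DaprPortMetaKey: "DAPR_PORT"}
	if err := json.Unmarshal(data, &cfg); err != nil {
		return cfg, fmt.Errorf("error deserializing config: %w", err)
	}
	return cfg, nil
}

func main() {
	// Key absent: the default survives.
	withDefault, err := parseSketch([]byte(`{"useCache": true}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(withDefault.DaprPortMetaKey, withDefault.UseCache)

	// Key present: the user value wins.
	overridden, err := parseSketch([]byte(`{"daprPortMetaKey": "MY_PORT"}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(overridden.DaprPortMetaKey, overridden.UseCache)
}
```

One consequence worth keeping in mind with this pattern: a key that is present but set to an empty string overwrites the default, because `json.Unmarshal` cannot distinguish "explicitly empty" from any other supplied value.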
215 changes: 184 additions & 31 deletions nameresolution/consul/consul.go
@@ -18,15 +18,15 @@ import (
"math/rand"
"net"
"strconv"
"sync"
"sync/atomic"

consul "github.com/hashicorp/consul/api"

nr "github.com/dapr/components-contrib/nameresolution"
"github.com/dapr/kit/logger"
)

const daprMeta string = "DAPR_PORT" // default key for DAPR_PORT metadata

type client struct {
*consul.Client
}
@@ -59,34 +59,181 @@ type clientInterface interface {
type agentInterface interface {
Self() (map[string]map[string]interface{}, error)
ServiceRegister(service *consul.AgentServiceRegistration) error
ServiceDeregister(serviceID string) error
}

type healthInterface interface {
Service(service, tag string, passingOnly bool, q *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error)
State(state string, q *consul.QueryOptions) (consul.HealthChecks, *consul.QueryMeta, error)
}

type resolver struct {
config resolverConfig
logger logger.Logger
client clientInterface
config resolverConfig
logger logger.Logger
client clientInterface
registry registryInterface
watcherStarted atomic.Bool
watcherStopChannel chan struct{}
}

type registryInterface interface {
getKeys() []string
get(service string) *registryEntry
expire(service string) // clears slice of instances
expireAll() // clears slice of instances for all entries
remove(service string) // removes entry from registry
removeAll() // removes all entries from registry
addOrUpdate(service string, services []*consul.ServiceEntry)
registrationChannel() chan string
}

type registry struct {
entries sync.Map
serviceChannel chan string
}

type registryEntry struct {
services []*consul.ServiceEntry
mu sync.RWMutex
}

func (r *registry) getKeys() []string {
var keys []string
r.entries.Range(func(key any, value any) bool {
k := key.(string)
keys = append(keys, k)
return true
})
return keys
}

func (r *registry) get(service string) *registryEntry {
if result, ok := r.entries.Load(service); ok {
return result.(*registryEntry)
}

return nil
}

func (e *registryEntry) next() *consul.ServiceEntry {
e.mu.Lock()
defer e.mu.Unlock()

if len(e.services) == 0 {
return nil
}

// gosec is complaining that we are using a non-crypto-safe PRNG. This is fine in this scenario since we are using it only for selecting a random address for load-balancing.
//nolint:gosec
return e.services[rand.Int()%len(e.services)]
}

func (r *resolver) getService(service string) (*consul.ServiceEntry, error) {
var services []*consul.ServiceEntry

if r.config.UseCache {
r.startWatcher()

entry := r.registry.get(service)
if entry != nil {
result := entry.next()

if result != nil {
return result, nil
}
} else {
r.registry.registrationChannel() <- service
}
}

options := *r.config.QueryOptions
options.WaitHash = ""
options.WaitIndex = 0
services, _, err := r.client.Health().Service(service, "", true, &options)

if err != nil {
return nil, fmt.Errorf("failed to query healthy consul services: %w", err)
} else if len(services) == 0 {
return nil, fmt.Errorf("no healthy services found with AppID '%s'", service)
}

//nolint:gosec
return services[rand.Int()%len(services)], nil
}
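Note on `getService` above: the lookup order is cache first, then a direct query as fallback, with unknown services handed to the watcher over the registration channel. The sketch below mirrors only that control flow under simplifying assumptions: `sketchResolver`, its plain map cache, and the `query` function are hypothetical, and the map is not safe for concurrent use (the PR uses `sync.Map` for that reason).

```go
package main

import (
	"errors"
	"fmt"
)

// sketchResolver is hypothetical: cache maps a service name to known addresses,
// pending plays the role of the registration channel, and query stands in
// for the direct health query against the agent.
type sketchResolver struct {
	cache   map[string][]string
	pending chan string
	query   func(service string) ([]string, error)
}

func (r *sketchResolver) resolve(service string) (string, error) {
	if addrs, ok := r.cache[service]; ok {
		if len(addrs) > 0 {
			return addrs[0], nil // the PR picks a random instance here
		}
		// Known but expired entry: fall through to a direct query.
	} else {
		// Unknown service: ask the watcher to start tracking it.
		r.pending <- service
	}

	addrs, err := r.query(service)
	if err != nil {
		return "", fmt.Errorf("failed to query services: %w", err)
	}
	if len(addrs) == 0 {
		return "", errors.New("no healthy services found for " + service)
	}
	return addrs[0], nil
}

func main() {
	r := &sketchResolver{
		cache:   map[string][]string{"orders": {"10.0.0.1:3500"}},
		pending: make(chan string, 100),
		query: func(service string) ([]string, error) {
			return []string{"10.0.0.2:3500"}, nil
		},
	}

	hit, err := r.resolve("orders") // served from the cache
	fmt.Println(hit, err)

	miss, err := r.resolve("billing") // queued for the watcher, then queried directly
	fmt.Println(miss, err)
	fmt.Println("queued:", len(r.pending))
}
```

Because the send on the registration channel is a plain blocking send, a caller would block once the buffer is full and nothing is draining it. The PR sizes the buffer at 100 in `NewResolver`.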

func (r *registry) addOrUpdate(service string, services []*consul.ServiceEntry) {
// update
entry := r.get(service)
if entry != nil {
entry.mu.Lock()
defer entry.mu.Unlock()

entry.services = services

return
}

// add
r.entries.Store(service, &registryEntry{
services: services,
})
}

func (r *registry) remove(service string) {
r.entries.Delete(service)
}

func (r *registry) removeAll() {
r.entries.Range(func(key any, value any) bool {
r.remove(key.(string))
return true
})
}

func (r *registry) expire(service string) {
entry := r.get(service)
if entry == nil {
return
}

entry.mu.Lock()
defer entry.mu.Unlock()

entry.services = nil
}

func (r *registry) expireAll() {
r.entries.Range(func(key any, value any) bool {
r.expire(key.(string))
return true
})
}

func (r *registry) registrationChannel() chan string {
return r.serviceChannel
}
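Note on the registry methods above: `expire` clears the instances but keeps the key, while `remove` drops the key entirely. The sketch below illustrates that difference with a hypothetical `sketchRegistry` that stores plain strings instead of `*consul.ServiceEntry`. Its `addOrUpdate` uses `sync.Map.LoadOrStore`, which is a variation for brevity and not the code in this PR.

```go
package main

import (
	"fmt"
	"sync"
)

// sketchEntry is a hypothetical stand-in for registryEntry.
type sketchEntry struct {
	mu        sync.RWMutex
	instances []string
}

// sketchRegistry is a hypothetical stand-in for registry.
type sketchRegistry struct {
	entries sync.Map // service name -> *sketchEntry
}

func (r *sketchRegistry) addOrUpdate(service string, instances []string) {
	// LoadOrStore returns the existing entry if present, otherwise stores the new one.
	v, _ := r.entries.LoadOrStore(service, &sketchEntry{})
	e := v.(*sketchEntry)
	e.mu.Lock()
	e.instances = instances
	e.mu.Unlock()
}

// expire clears the instances but keeps the key in the registry.
func (r *sketchRegistry) expire(service string) {
	if v, ok := r.entries.Load(service); ok {
		e := v.(*sketchEntry)
		e.mu.Lock()
		e.instances = nil
		e.mu.Unlock()
	}
}

// remove drops the key from the registry.
func (r *sketchRegistry) remove(service string) {
	r.entries.Delete(service)
}

// count reports whether the key is present and how many instances it holds.
func (r *sketchRegistry) count(service string) (tracked bool, n int) {
	v, ok := r.entries.Load(service)
	if !ok {
		return false, 0
	}
	e := v.(*sketchEntry)
	e.mu.RLock()
	defer e.mu.RUnlock()
	return true, len(e.instances)
}

func main() {
	reg := &sketchRegistry{}
	reg.addOrUpdate("orders", []string{"10.0.0.1:3500", "10.0.0.2:3500"})

	tracked, n := reg.count("orders")
	fmt.Println(tracked, n)

	reg.expire("orders")
	tracked, n = reg.count("orders")
	fmt.Println(tracked, n)

	reg.remove("orders")
	tracked, n = reg.count("orders")
	fmt.Println(tracked, n)
}
```

In `getService`, this distinction decides the cache-miss path: an expired entry is still found by `get`, so the service is not pushed onto the registration channel again, whereas a removed one is.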

type resolverConfig struct {
Client *consul.Config
QueryOptions *consul.QueryOptions
Registration *consul.AgentServiceRegistration
DaprPortMetaKey string
Client *consul.Config
QueryOptions *consul.QueryOptions
Registration *consul.AgentServiceRegistration
DeregisterOnClose bool
DaprPortMetaKey string
UseCache bool
}

// NewResolver creates Consul name resolver.
func NewResolver(logger logger.Logger) nr.Resolver {
return newResolver(logger, &client{})
return newResolver(logger, resolverConfig{}, &client{}, &registry{serviceChannel: make(chan string, 100)}, make(chan struct{}))
}

func newResolver(logger logger.Logger, client clientInterface) *resolver {
func newResolver(logger logger.Logger, resolverConfig resolverConfig, client clientInterface, registry registryInterface, watcherStopChannel chan struct{}) nr.Resolver {
return &resolver{
logger: logger,
client: client,
logger: logger,
config: resolverConfig,
client: client,
registry: registry,
watcherStopChannel: watcherStopChannel,
}
}

@@ -129,23 +276,14 @@ func (r *resolver) Init(metadata nr.Metadata) (err error) {
// ResolveID resolves name to address via consul.
func (r *resolver) ResolveID(req nr.ResolveRequest) (addr string, err error) {
cfg := r.config
services, _, err := r.client.Health().Service(req.ID, "", true, cfg.QueryOptions)
svc, err := r.getService(req.ID)
if err != nil {
return "", fmt.Errorf("failed to query healthy consul services: %w", err)
return "", err
}

if len(services) == 0 {
return "", fmt.Errorf("no healthy services found with AppID '%s'", req.ID)
}

// Pick a random service from the result
// Note: we're using math/random here as PRNG and that's ok since we're just using this for selecting a random address from a list for load-balancing, so we don't need a CSPRNG
//nolint:gosec
svc := services[rand.Int()%len(services)]

port := svc.Service.Meta[cfg.DaprPortMetaKey]
if port == "" {
return "", fmt.Errorf("target service AppID '%s' found but DAPR_PORT missing from meta", req.ID)
return "", fmt.Errorf("target service AppID '%s' found but %s missing from meta", req.ID, cfg.DaprPortMetaKey)
}

if svc.Service.Address != "" {
@@ -159,6 +297,24 @@ func (r *resolver) ResolveID(req nr.ResolveRequest) (addr string, err error) {
return formatAddress(addr, port)
}

// Close will stop the watcher and deregister app from consul
func (r *resolver) Close() error {
if r.watcherStarted.Load() {
r.watcherStopChannel <- struct{}{}
}

if r.config.Registration != nil && r.config.DeregisterOnClose {
err := r.client.Agent().ServiceDeregister(r.config.Registration.ID)
if err != nil {
return fmt.Errorf("failed to deregister consul service: %w", err)
}

r.logger.Info("deregistered service from consul")
}

return nil
}
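Note on `Close` above: it only signals the stop channel when the watcher was actually started, using the `atomic.Bool` flag as the guard. The watcher side lives in `watcher.go`, which is not shown in this view, so the sketch below is an assumption about how such a start-once, stop-by-channel loop could look. `sketchWatcher`, its `done` channel, and the ticker loop are all hypothetical.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// sketchWatcher is hypothetical; done is added here only so the caller
// can observe that the loop has exited.
type sketchWatcher struct {
	started atomic.Bool
	stop    chan struct{}
	done    chan struct{}
}

func newSketchWatcher() *sketchWatcher {
	return &sketchWatcher{stop: make(chan struct{}), done: make(chan struct{})}
}

// start launches the loop at most once, even if called repeatedly.
func (w *sketchWatcher) start() {
	if !w.started.CompareAndSwap(false, true) {
		return
	}
	go func() {
		defer close(w.done)
		ticker := time.NewTicker(10 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-w.stop:
				return
			case <-ticker.C:
				// A real watcher would issue its blocking query here.
			}
		}
	}()
}

// close signals the loop to stop, but only if it was ever started,
// then waits for the loop to exit.
func (w *sketchWatcher) close() {
	if w.started.Load() {
		w.stop <- struct{}{}
		<-w.done
	}
}

func main() {
	idle := newSketchWatcher()
	idle.close() // never started: returns immediately

	w := newSketchWatcher()
	w.start()
	w.start() // second call is a no-op
	w.close()
	fmt.Println("watcher stopped")
}
```

With an unbuffered stop channel, the send in `close` blocks until the loop receives it, so calling `close` a second time in this sketch would block forever. Closing the channel instead of sending on it is a common alternative when repeated or concurrent shutdown calls are possible.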

func formatAddress(address string, port string) (addr string, err error) {
if net.ParseIP(address).To4() != nil {
return address + ":" + port, nil
@@ -180,12 +336,9 @@ func getConfig(metadata nr.Metadata) (resolverCfg resolverConfig, err error) {
return resolverCfg, err
}

// set DaprPortMetaKey used for registring DaprPort and resolving from Consul
if cfg.DaprPortMetaKey == "" {
resolverCfg.DaprPortMetaKey = daprMeta
} else {
resolverCfg.DaprPortMetaKey = cfg.DaprPortMetaKey
}
resolverCfg.DaprPortMetaKey = cfg.DaprPortMetaKey
resolverCfg.DeregisterOnClose = cfg.SelfDeregister
resolverCfg.UseCache = cfg.UseCache

resolverCfg.Client = getClientConfig(cfg)
resolverCfg.Registration, err = getRegistrationConfig(cfg, metadata.Properties)