Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consul name resolution in-memory cache #963

Closed
wants to merge 73 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
78768ec
nr_consul_cache init commit for consul nr caching
a-elsheikh Jun 17, 2021
624ea7f
Merge branch 'dapr:master' into nr_consul_cache
a-elsheikh Jun 17, 2021
e68e45b
nr_consul_cache resolving lint issues
a-elsheikh Jun 17, 2021
f704f54
Merge branch 'nr_consul_cache' of https://github.com/man-group/compon…
a-elsheikh Jun 17, 2021
ef53838
nr_consul_cache shared memory improvements
a-elsheikh Jun 18, 2021
042e8f1
Merge branch 'master' into nr_consul_cache
a-elsheikh Jun 30, 2021
27eb664
Merge branch 'master' into nr_consul_cache
a-elsheikh Jul 7, 2021
55bd162
Merge branch 'master' into nr_consul_cache
a-elsheikh Jul 13, 2021
aeec19b
Merge branch 'master' into nr_consul_cache
a-elsheikh Jul 26, 2021
222af97
Merge branch 'master' into nr_consul_cache
a-elsheikh Aug 13, 2021
7426175
Merge branch 'master' into nr_consul_cache
a-elsheikh Sep 9, 2021
a35b2be
Merge branch 'master' into nr_consul_cache
a-elsheikh Sep 20, 2021
2d5e0ef
Merge branch 'master' into nr_consul_cache
a-elsheikh Oct 18, 2021
9a88ae0
Merge branch 'master' into nr_consul_cache
a-elsheikh Oct 19, 2021
46f4aec
Merge branch 'master' into nr_consul_cache
a-elsheikh Oct 25, 2021
0cc18ff
Merge branch 'master' into nr_consul_cache
a-elsheikh Oct 27, 2021
522e3e3
nr_consul_cache fixed linting issues
a-elsheikh Oct 27, 2021
8eb5259
Merge branch 'master' into nr_consul_cache
a-elsheikh Nov 16, 2021
26e9b8c
Merge branch 'master' into nr_consul_cache
a-elsheikh Nov 22, 2021
40c078f
Merge branch 'master' into nr_consul_cache
a-elsheikh Dec 6, 2021
d0de21c
Merge branch 'master' into nr_consul_cache
a-elsheikh Dec 20, 2021
d673172
Merge branch 'master' into nr_consul_cache
a-elsheikh Dec 25, 2021
cc2ff89
Merge branch 'master' into nr_consul_cache
a-elsheikh Jan 3, 2022
db6ac68
Merge branch 'master' into nr_consul_cache
a-elsheikh Feb 7, 2022
ac43d23
Merge branch 'master' into nr_consul_cache
a-elsheikh Mar 11, 2022
4ec4637
Merge branch 'master' into nr_consul_cache
a-elsheikh Mar 14, 2022
111b2ea
Merge branch 'master' into nr_consul_cache
a-elsheikh Mar 16, 2022
a3fddf3
nr_consul_cache changed registry to use sync.Map
a-elsheikh Mar 18, 2022
1169891
nr_consul_cache resolved copylocks
a-elsheikh Mar 18, 2022
fc0a1c8
nr_consul_cache gofumpt
a-elsheikh Mar 18, 2022
2e51512
Merge branch 'master' into nr_consul_cache
a-elsheikh Mar 18, 2022
967a543
Merge branch 'master' into nr_consul_cache
a-elsheikh Mar 21, 2022
64e1a5a
Merge branch 'master' into nr_consul_cache
a-elsheikh Mar 21, 2022
69a0e25
nr_consul_cache remove shuffle in favour of random
a-elsheikh Mar 22, 2022
bd0175e
Merge branch 'master' into nr_consul_cache
a-elsheikh Mar 22, 2022
b20c0a3
Merge branch 'master' into nr_consul_cache
a-elsheikh Mar 22, 2022
d3fd833
Merge branch 'master' into nr_consul_cache
a-elsheikh Mar 28, 2022
dbfe946
Merge branch 'master' into nr_consul_cache
a-elsheikh Mar 30, 2022
9e3ddd3
Merge branch 'master' into nr_consul_cache
a-elsheikh Apr 6, 2022
0646392
Merge branch 'master' into nr_consul_cache
a-elsheikh Apr 7, 2022
bcdf64c
Merge branch 'master' into nr_consul_cache
a-elsheikh Apr 11, 2022
f711dfd
Merge branch 'master' into nr_consul_cache
a-elsheikh Apr 14, 2022
b34366b
Merge branch 'master' into nr_consul_cache
a-elsheikh Apr 19, 2022
f7a76b6
Merge branch 'master' into nr_consul_cache
a-elsheikh Apr 22, 2022
a20e6b6
Merge branch 'master' into nr_consul_cache
a-elsheikh Apr 25, 2022
bda754a
Merge branch 'master' into nr_consul_cache
a-elsheikh Apr 26, 2022
f2d797d
Merge branch 'master' into nr_consul_cache
a-elsheikh Apr 29, 2022
38fbc4d
Merge branch 'master' into nr_consul_cache
a-elsheikh May 6, 2022
231075b
Merge branch 'master' into nr_consul_cache
a-elsheikh May 9, 2022
f044cee
Merge branch 'master' into nr_consul_cache
a-elsheikh May 13, 2022
9dc7777
Merge branch 'master' into nr_consul_cache
a-elsheikh May 16, 2022
aa815c4
Merge branch 'master' into nr_consul_cache
a-elsheikh May 23, 2022
b66f5a1
Merge branch 'master' into nr_consul_cache
a-elsheikh May 30, 2022
aa4d421
Merge branch 'master' into nr_consul_cache
a-elsheikh Jun 13, 2022
f9fb228
Merge branch 'master' into nr_consul_cache
a-elsheikh Jun 20, 2022
94e2bd9
Merge branch 'master' into nr_consul_cache
a-elsheikh Jul 11, 2022
c1fa66a
Merge branch 'master' into nr_consul_cache
a-elsheikh Jul 18, 2022
e97b616
Merge branch 'master' into nr_consul_cache
a-elsheikh Jul 20, 2022
b211bec
Merge branch 'master' into nr_consul_cache
a-elsheikh Aug 16, 2022
2f9e752
Merge branch 'master' into nr_consul_cache
a-elsheikh Nov 15, 2022
b00f401
nr_consul_cache resolve missing changes from merge
a-elsheikh Nov 16, 2022
850f4fa
nr_consul_cache change useCache config default to false
a-elsheikh Nov 16, 2022
f2fa2f7
Merge branch 'master' into nr_consul_cache
a-elsheikh Nov 16, 2022
f765a2f
Merge branch 'master' into nr_consul_cache
a-elsheikh Nov 21, 2022
c246c47
Merge branch 'master' into nr_consul_cache
a-elsheikh Nov 28, 2022
abaa8de
Merge branch 'master' into nr_consul_cache
a-elsheikh Nov 29, 2022
5e25b88
Merge branch 'master' into nr_consul_cache
a-elsheikh Dec 12, 2022
2c0382e
Merge branch 'master' into nr_consul_cache
a-elsheikh Dec 16, 2022
f41da0e
Merge branch 'master' into nr_consul_cache
a-elsheikh Jan 3, 2023
c7d9e20
Merge branch 'master' into nr_consul_cache
a-elsheikh Jan 6, 2023
a3496de
Merge branch 'master' into nr_consul_cache
a-elsheikh May 2, 2023
cc085a3
Merge branch 'master' into nr_consul_cache
a-elsheikh Sep 11, 2023
803d29d
nr_consul_cache fix tests
a-elsheikh Sep 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions nameresolution/consul/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Consul Name Resolution

The consul name resolution component gives the ability to register and resolve other "daprized" services registered on a consul estate. It is flexible in that it allows for complex to minimal configurations driving the behaviour on init and resolution.
The consul name resolution component gives the ability to register and resolve other "daprized" services registered on a consul estate. It is flexible in that it allows for complex to minimal configurations driving the behavior on init and resolution.

## How To Use

Expand Down Expand Up @@ -35,7 +35,7 @@ spec:
```


## Behaviour
## Behavior

On init the consul component will either validate the connection to the configured (or default) agent or register the service if configured to do so. The name resolution interface does not cater for an "on shutdown" pattern so please consider this if using Dapr to register services to consul as it will not deregister services.

Expand All @@ -56,7 +56,7 @@ As of writing the configuration spec is fixed to v1.3.0 of the consul api
| DaprPortMetaKey | `string` | The key used for getting the Dapr sidecar port from consul service metadata during service resolution, it will also be used to set the Dapr sidecar port in metadata during registration. If blank it will default to `DAPR_PORT` |
| SelfRegister | `bool` | Controls if Dapr will register the service to consul. The name resolution interface does not cater for an "on shutdown" pattern so please consider this if using Dapr to register services to consul as it will not deregister services. |
| AdvancedRegistration | [*api.AgentServiceRegistration](https://pkg.go.dev/github.com/hashicorp/consul/[email protected]#AgentServiceRegistration) | Gives full control of service registration through configuration. If configured the component will ignore any configuration of Checks, Tags, Meta and SelfRegister. |

| UseCache | `bool` | Configures if Dapr will cache the resolved services in-memory. This is done using consul [blocking queries](https://www.consul.io/api-docs/features/blocking) which can be configured via the QueryOptions configuration. If blank it will default to `false` |
## Samples Configurations

### Basic
Expand Down
17 changes: 14 additions & 3 deletions nameresolution/consul/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/dapr/kit/config"
)

const defaultDaprPortMetaKey string = "DAPR_PORT" // default key for DaprPort in meta

// The intermediateConfig is based off of the consul api types. User configurations are
// deserialized into this type before being converted to the equivalent consul types
// that way breaking changes in future versions of the consul api cannot break user configuration.
Expand All @@ -33,8 +35,9 @@ type intermediateConfig struct {
Meta map[string]string
QueryOptions *QueryOptions
AdvancedRegistration *AgentServiceRegistration // advanced use-case
SelfRegister bool
DaprPortMetaKey string
SelfRegister bool
UseCache bool
}

type configSpec struct {
Expand All @@ -44,8 +47,15 @@ type configSpec struct {
Meta map[string]string
QueryOptions *consul.QueryOptions
AdvancedRegistration *consul.AgentServiceRegistration // advanced use-case
SelfRegister bool
DaprPortMetaKey string
SelfRegister bool
UseCache bool
}

func newIntermediateConfig() intermediateConfig {
return intermediateConfig{
DaprPortMetaKey: defaultDaprPortMetaKey,
}
}

func parseConfig(rawConfig interface{}) (configSpec, error) {
Expand All @@ -60,7 +70,7 @@ func parseConfig(rawConfig interface{}) (configSpec, error) {
return result, fmt.Errorf("error serializing to json: %w", err)
}

var configuration intermediateConfig
configuration := newIntermediateConfig()
err = json.Unmarshal(data, &configuration)
if err != nil {
return result, fmt.Errorf("error deserializing to configSpec: %w", err)
Expand All @@ -81,6 +91,7 @@ func mapConfig(config intermediateConfig) configSpec {
AdvancedRegistration: mapAdvancedRegistration(config.AdvancedRegistration),
SelfRegister: config.SelfRegister,
DaprPortMetaKey: config.DaprPortMetaKey,
UseCache: config.UseCache,
}
}

Expand Down
148 changes: 121 additions & 27 deletions nameresolution/consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@ import (
"math/rand"
"net"
"strconv"
"sync"

consul "github.com/hashicorp/consul/api"

nr "github.com/dapr/components-contrib/nameresolution"
"github.com/dapr/kit/logger"
)

const daprMeta string = "DAPR_PORT" // default key for DAPR_PORT metadata

type client struct {
*consul.Client
}
Expand Down Expand Up @@ -66,27 +65,135 @@ type healthInterface interface {
}

type resolver struct {
config resolverConfig
logger logger.Logger
client clientInterface
config resolverConfig
logger logger.Logger
client clientInterface
registry registryInterface
}

type registryInterface interface {
get(service string) *registryEntry
expire(service string) // clears slice of instances
remove(service string) // removes entry from registry
addOrUpdate(service string, services []*consul.ServiceEntry)
}

type registry struct {
entries *sync.Map
}

type registryEntry struct {
services []*consul.ServiceEntry
mu sync.RWMutex
}

func (r *registry) get(service string) *registryEntry {
a-elsheikh marked this conversation as resolved.
Show resolved Hide resolved
if result, ok := r.entries.Load(service); ok {
return result.(*registryEntry)
}

return nil
}

func (e *registryEntry) next() *consul.ServiceEntry {
e.mu.Lock()
defer e.mu.Unlock()

if len(e.services) == 0 {
return nil
}

//nolint:gosec
return e.services[rand.Int()%len(e.services)]
}

func (r *resolver) getService(service string) (*consul.ServiceEntry, error) {
var services []*consul.ServiceEntry

if r.config.UseCache {
var entry *registryEntry

if entry = r.registry.get(service); entry != nil {
result := entry.next()

if result != nil {
return result, nil
}
} else {
r.watchService(service)
}
}

options := *r.config.QueryOptions
options.WaitHash = ""
options.WaitIndex = 0
services, _, err := r.client.Health().Service(service, "", true, &options)

if err != nil {
return nil, fmt.Errorf("failed to query healthy consul services: %w", err)
} else if len(services) == 0 {
return nil, fmt.Errorf("no healthy services found with AppID '%s'", service)
}

//nolint:gosec
return services[rand.Int()%len(services)], nil
}

func (r *registry) addOrUpdate(service string, services []*consul.ServiceEntry) {
var entry *registryEntry

// update
if entry = r.get(service); entry != nil {
entry.mu.Lock()
defer entry.mu.Unlock()

entry.services = services

return
}

// add
r.entries.Store(service, &registryEntry{
services: services,
})
}

func (r *registry) remove(service string) {
r.entries.Delete(service)
}

func (r *registry) expire(service string) {
a-elsheikh marked this conversation as resolved.
Show resolved Hide resolved
var entry *registryEntry

if entry = r.get(service); entry == nil {
return
}

entry.mu.Lock()
defer entry.mu.Unlock()

entry.services = nil
}

type resolverConfig struct {
Client *consul.Config
QueryOptions *consul.QueryOptions
Registration *consul.AgentServiceRegistration
DaprPortMetaKey string
UseCache bool
}

// NewResolver creates Consul name resolver.
func NewResolver(logger logger.Logger) nr.Resolver {
return newResolver(logger, &client{})
return newResolver(logger, resolverConfig{}, &client{}, &registry{entries: &sync.Map{}})
}

func newResolver(logger logger.Logger, client clientInterface) *resolver {
func newResolver(logger logger.Logger, resolverConfig resolverConfig, client clientInterface, registry registryInterface) nr.Resolver {
return &resolver{
logger: logger,
client: client,
logger: logger,
config: resolverConfig,
client: client,
registry: registry,
}
}

Expand Down Expand Up @@ -129,23 +236,14 @@ func (r *resolver) Init(metadata nr.Metadata) (err error) {
// ResolveID resolves name to address via consul.
func (r *resolver) ResolveID(req nr.ResolveRequest) (addr string, err error) {
cfg := r.config
services, _, err := r.client.Health().Service(req.ID, "", true, cfg.QueryOptions)
svc, err := r.getService(req.ID)
if err != nil {
return "", fmt.Errorf("failed to query healthy consul services: %w", err)
return "", err
}

if len(services) == 0 {
return "", fmt.Errorf("no healthy services found with AppID '%s'", req.ID)
}

// Pick a random service from the result
// Note: we're using math/random here as PRNG and that's ok since we're just using this for selecting a random address from a list for load-balancing, so we don't need a CSPRNG
//nolint:gosec
svc := services[rand.Int()%len(services)]

port := svc.Service.Meta[cfg.DaprPortMetaKey]
if port == "" {
return "", fmt.Errorf("target service AppID '%s' found but DAPR_PORT missing from meta", req.ID)
return "", fmt.Errorf("target service AppID '%s' found but %s missing from meta", req.ID, cfg.DaprPortMetaKey)
}

if svc.Service.Address != "" {
Expand Down Expand Up @@ -180,12 +278,8 @@ func getConfig(metadata nr.Metadata) (resolverCfg resolverConfig, err error) {
return resolverCfg, err
}

// set DaprPortMetaKey used for registring DaprPort and resolving from Consul
if cfg.DaprPortMetaKey == "" {
resolverCfg.DaprPortMetaKey = daprMeta
} else {
resolverCfg.DaprPortMetaKey = cfg.DaprPortMetaKey
}
resolverCfg.DaprPortMetaKey = cfg.DaprPortMetaKey
resolverCfg.UseCache = cfg.UseCache

resolverCfg.Client = getClientConfig(cfg)
resolverCfg.Registration, err = getRegistrationConfig(cfg, metadata.Properties)
Expand Down
Loading