Skip to content

Commit

Permalink
Merge branch 'master' into sqlite-improv
Browse files Browse the repository at this point in the history
  • Loading branch information
ItalyPaleAle authored Oct 23, 2023
2 parents 1db6a06 + ddc10f7 commit b0fe480
Show file tree
Hide file tree
Showing 12 changed files with 1,611 additions and 215 deletions.
26 changes: 26 additions & 0 deletions .build-tools/pkg/metadataschema/validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,32 @@ func (c *ComponentMetadata) AppendBuiltin() error {
},
)
}
if slices.Contains(c.Capabilities, "transactional") {
c.Metadata = append(c.Metadata,
Metadata{
Name: "outboxPublishPubsub",
Type: "string",
Description: "For outbox. Sets the name of the pub/sub component to deliver the notifications when publishing state changes",
},
Metadata{
Name: "outboxPublishTopic",
Type: "string",
Description: `For outbox. Sets the topic that receives the state changes on the pub/sub configured with "outboxPublishPubsub". The message body will be a state transaction item for an insert or update operation`,
},
Metadata{
Name: "outboxPubsub",
Type: "string",
Description: `For outbox. Sets the pub/sub component used by Dapr to coordinate the state and pub/sub transactions. If not set, the pub/sub component configured with "outboxPublishPubsub" is used. This is useful if you want to separate the pub/sub component used to send the notification state changes from the one used to coordinate the transaction`,
Default: "outboxPublishPubsub",
},
Metadata{
Name: "outboxDiscardWhenMissingState",
Description: "By setting outboxDiscardWhenMissingState to true, Dapr discards the transaction if it cannot find the state in the database and does not retry. This setting can be useful if the state store data has been deleted for any reason before Dapr was able to deliver the message and you would like Dapr to drop the items from the pub/sub and stop retrying to fetch the state",
Type: "bool",
Default: "false",
},
)
}

c.Metadata = append(c.Metadata,
Metadata{
Expand Down
9 changes: 5 additions & 4 deletions nameresolution/consul/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Consul Name Resolution

The consul name resolution component gives the ability to register and resolve other "daprized" services registered on a consul estate. It is flexible in that it allows for complex to minimal configurations driving the behaviour on init and resolution.
The consul name resolution component gives the ability to register and resolve other "daprized" services registered on a consul estate. It is flexible in that it allows for complex to minimal configurations driving the behavior on init and resolution.

## How To Use

Expand Down Expand Up @@ -35,7 +35,7 @@ spec:
```


## Behaviour
## Behavior

On init the consul component will either validate the connection to the configured (or default) agent or register the service if configured to do so. The name resolution interface does not cater for an "on shutdown" pattern so please consider this if using Dapr to register services to consul as it will not deregister services.

Expand All @@ -54,9 +54,10 @@ As of writing the configuration spec is fixed to v1.3.0 of the consul api
| Tags | `[]string` | Configures any tags to include if/when registering services |
| Meta | `map[string]string` | Configures any additional metadata to include if/when registering services |
| DaprPortMetaKey | `string` | The key used for getting the Dapr sidecar port from consul service metadata during service resolution, it will also be used to set the Dapr sidecar port in metadata during registration. If blank it will default to `DAPR_PORT` |
| SelfRegister | `bool` | Controls if Dapr will register the service to consul. The name resolution interface does not cater for an "on shutdown" pattern so please consider this if using Dapr to register services to consul as it will not deregister services. |
| SelfRegister | `bool` | Controls if Dapr will register the service to consul on startup. If unset it will default to `false` |
| SelfDeregister | `bool` | Controls if Dapr will deregister the service from consul on shutdown. If unset it will default to `false` |
| AdvancedRegistration | [*api.AgentServiceRegistration](https://pkg.go.dev/github.com/hashicorp/consul/[email protected]#AgentServiceRegistration) | Gives full control of service registration through configuration. If configured the component will ignore any configuration of Checks, Tags, Meta and SelfRegister. |

| UseCache | `bool` | Configures if Dapr will cache the resolved services in-memory. This is done using consul [blocking queries](https://www.consul.io/api-docs/features/blocking) which can be configured via the QueryOptions configuration. If unset it will default to `false` |
## Samples Configurations

### Basic
Expand Down
20 changes: 17 additions & 3 deletions nameresolution/consul/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/dapr/kit/config"
)

const defaultDaprPortMetaKey string = "DAPR_PORT" // default key for DaprPort in meta

// The intermediateConfig is based off of the consul api types. User configurations are
// deserialized into this type before being converted to the equivalent consul types
// that way breaking changes in future versions of the consul api cannot break user configuration.
Expand All @@ -33,8 +35,10 @@ type intermediateConfig struct {
Meta map[string]string
QueryOptions *QueryOptions
AdvancedRegistration *AgentServiceRegistration // advanced use-case
SelfRegister bool
DaprPortMetaKey string
SelfRegister bool
SelfDeregister bool
UseCache bool
}

type configSpec struct {
Expand All @@ -44,8 +48,16 @@ type configSpec struct {
Meta map[string]string
QueryOptions *consul.QueryOptions
AdvancedRegistration *consul.AgentServiceRegistration // advanced use-case
SelfRegister bool
DaprPortMetaKey string
SelfRegister bool
SelfDeregister bool
UseCache bool
}

func newIntermediateConfig() intermediateConfig {
return intermediateConfig{
DaprPortMetaKey: defaultDaprPortMetaKey,
}
}

func parseConfig(rawConfig interface{}) (configSpec, error) {
Expand All @@ -60,7 +72,7 @@ func parseConfig(rawConfig interface{}) (configSpec, error) {
return result, fmt.Errorf("error serializing to json: %w", err)
}

var configuration intermediateConfig
configuration := newIntermediateConfig()
err = json.Unmarshal(data, &configuration)
if err != nil {
return result, fmt.Errorf("error deserializing to configSpec: %w", err)
Expand All @@ -80,7 +92,9 @@ func mapConfig(config intermediateConfig) configSpec {
QueryOptions: mapQueryOptions(config.QueryOptions),
AdvancedRegistration: mapAdvancedRegistration(config.AdvancedRegistration),
SelfRegister: config.SelfRegister,
SelfDeregister: config.SelfDeregister,
DaprPortMetaKey: config.DaprPortMetaKey,
UseCache: config.UseCache,
}
}

Expand Down
215 changes: 184 additions & 31 deletions nameresolution/consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ import (
"math/rand"
"net"
"strconv"
"sync"
"sync/atomic"

consul "github.com/hashicorp/consul/api"

nr "github.com/dapr/components-contrib/nameresolution"
"github.com/dapr/kit/logger"
)

const daprMeta string = "DAPR_PORT" // default key for DAPR_PORT metadata

type client struct {
*consul.Client
}
Expand Down Expand Up @@ -59,34 +59,181 @@ type clientInterface interface {
type agentInterface interface {
Self() (map[string]map[string]interface{}, error)
ServiceRegister(service *consul.AgentServiceRegistration) error
ServiceDeregister(serviceID string) error
}

type healthInterface interface {
Service(service, tag string, passingOnly bool, q *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error)
State(state string, q *consul.QueryOptions) (consul.HealthChecks, *consul.QueryMeta, error)
}

type resolver struct {
config resolverConfig
logger logger.Logger
client clientInterface
config resolverConfig
logger logger.Logger
client clientInterface
registry registryInterface
watcherStarted atomic.Bool
watcherStopChannel chan struct{}
}

type registryInterface interface {
getKeys() []string
get(service string) *registryEntry
expire(service string) // clears slice of instances
expireAll() // clears slice of instances for all entries
remove(service string) // removes entry from registry
removeAll() // removes all entries from registry
addOrUpdate(service string, services []*consul.ServiceEntry)
registrationChannel() chan string
}

type registry struct {
entries sync.Map
serviceChannel chan string
}

type registryEntry struct {
services []*consul.ServiceEntry
mu sync.RWMutex
}

func (r *registry) getKeys() []string {
var keys []string
r.entries.Range(func(key any, value any) bool {
k := key.(string)
keys = append(keys, k)
return true
})
return keys
}

func (r *registry) get(service string) *registryEntry {
if result, ok := r.entries.Load(service); ok {
return result.(*registryEntry)
}

return nil
}

func (e *registryEntry) next() *consul.ServiceEntry {
e.mu.Lock()
defer e.mu.Unlock()

if len(e.services) == 0 {
return nil
}

// gosec is complaining that we are using a non-crypto-safe PRNG. This is fine in this scenario since we are using it only for selecting a random address for load-balancing.
//nolint:gosec
return e.services[rand.Int()%len(e.services)]
}

func (r *resolver) getService(service string) (*consul.ServiceEntry, error) {
var services []*consul.ServiceEntry

if r.config.UseCache {
r.startWatcher()

entry := r.registry.get(service)
if entry != nil {
result := entry.next()

if result != nil {
return result, nil
}
} else {
r.registry.registrationChannel() <- service
}
}

options := *r.config.QueryOptions
options.WaitHash = ""
options.WaitIndex = 0
services, _, err := r.client.Health().Service(service, "", true, &options)

if err != nil {
return nil, fmt.Errorf("failed to query healthy consul services: %w", err)
} else if len(services) == 0 {
return nil, fmt.Errorf("no healthy services found with AppID '%s'", service)
}

//nolint:gosec
return services[rand.Int()%len(services)], nil
}

func (r *registry) addOrUpdate(service string, services []*consul.ServiceEntry) {
// update
entry := r.get(service)
if entry != nil {
entry.mu.Lock()
defer entry.mu.Unlock()

entry.services = services

return
}

// add
r.entries.Store(service, &registryEntry{
services: services,
})
}

func (r *registry) remove(service string) {
r.entries.Delete(service)
}

func (r *registry) removeAll() {
r.entries.Range(func(key any, value any) bool {
r.remove(key.(string))
return true
})
}

func (r *registry) expire(service string) {
entry := r.get(service)
if entry == nil {
return
}

entry.mu.Lock()
defer entry.mu.Unlock()

entry.services = nil
}

func (r *registry) expireAll() {
r.entries.Range(func(key any, value any) bool {
r.expire(key.(string))
return true
})
}

func (r *registry) registrationChannel() chan string {
return r.serviceChannel
}

type resolverConfig struct {
Client *consul.Config
QueryOptions *consul.QueryOptions
Registration *consul.AgentServiceRegistration
DaprPortMetaKey string
Client *consul.Config
QueryOptions *consul.QueryOptions
Registration *consul.AgentServiceRegistration
DeregisterOnClose bool
DaprPortMetaKey string
UseCache bool
}

// NewResolver creates Consul name resolver.
func NewResolver(logger logger.Logger) nr.Resolver {
return newResolver(logger, &client{})
return newResolver(logger, resolverConfig{}, &client{}, &registry{serviceChannel: make(chan string, 100)}, make(chan struct{}))
}

func newResolver(logger logger.Logger, client clientInterface) *resolver {
func newResolver(logger logger.Logger, resolverConfig resolverConfig, client clientInterface, registry registryInterface, watcherStopChannel chan struct{}) nr.Resolver {
return &resolver{
logger: logger,
client: client,
logger: logger,
config: resolverConfig,
client: client,
registry: registry,
watcherStopChannel: watcherStopChannel,
}
}

Expand Down Expand Up @@ -129,23 +276,14 @@ func (r *resolver) Init(metadata nr.Metadata) (err error) {
// ResolveID resolves name to address via consul.
func (r *resolver) ResolveID(req nr.ResolveRequest) (addr string, err error) {
cfg := r.config
services, _, err := r.client.Health().Service(req.ID, "", true, cfg.QueryOptions)
svc, err := r.getService(req.ID)
if err != nil {
return "", fmt.Errorf("failed to query healthy consul services: %w", err)
return "", err
}

if len(services) == 0 {
return "", fmt.Errorf("no healthy services found with AppID '%s'", req.ID)
}

// Pick a random service from the result
// Note: we're using math/random here as PRNG and that's ok since we're just using this for selecting a random address from a list for load-balancing, so we don't need a CSPRNG
//nolint:gosec
svc := services[rand.Int()%len(services)]

port := svc.Service.Meta[cfg.DaprPortMetaKey]
if port == "" {
return "", fmt.Errorf("target service AppID '%s' found but DAPR_PORT missing from meta", req.ID)
return "", fmt.Errorf("target service AppID '%s' found but %s missing from meta", req.ID, cfg.DaprPortMetaKey)
}

if svc.Service.Address != "" {
Expand All @@ -159,6 +297,24 @@ func (r *resolver) ResolveID(req nr.ResolveRequest) (addr string, err error) {
return formatAddress(addr, port)
}

// Close will stop the watcher and deregister app from consul
func (r *resolver) Close() error {
if r.watcherStarted.Load() {
r.watcherStopChannel <- struct{}{}
}

if r.config.Registration != nil && r.config.DeregisterOnClose {
err := r.client.Agent().ServiceDeregister(r.config.Registration.ID)
if err != nil {
return fmt.Errorf("failed to deregister consul service: %w", err)
}

r.logger.Info("deregistered service from consul")
}

return nil
}

func formatAddress(address string, port string) (addr string, err error) {
if net.ParseIP(address).To4() != nil {
return address + ":" + port, nil
Expand All @@ -180,12 +336,9 @@ func getConfig(metadata nr.Metadata) (resolverCfg resolverConfig, err error) {
return resolverCfg, err
}

// set DaprPortMetaKey used for registring DaprPort and resolving from Consul
if cfg.DaprPortMetaKey == "" {
resolverCfg.DaprPortMetaKey = daprMeta
} else {
resolverCfg.DaprPortMetaKey = cfg.DaprPortMetaKey
}
resolverCfg.DaprPortMetaKey = cfg.DaprPortMetaKey
resolverCfg.DeregisterOnClose = cfg.SelfDeregister
resolverCfg.UseCache = cfg.UseCache

resolverCfg.Client = getClientConfig(cfg)
resolverCfg.Registration, err = getRegistrationConfig(cfg, metadata.Properties)
Expand Down
Loading

0 comments on commit b0fe480

Please sign in to comment.