From 84001c96d21e3331d9415b407d37c8957f14e43f Mon Sep 17 00:00:00 2001 From: Abdulaziz Elsheikh Date: Mon, 11 Sep 2023 13:35:56 +0100 Subject: [PATCH 01/20] nr_consul_cache squashed commits to resolve dco Signed-off-by: Abdulaziz Elsheikh --- nameresolution/consul/README.md | 6 +- nameresolution/consul/configuration.go | 17 +- nameresolution/consul/consul.go | 148 ++++-- nameresolution/consul/consul_test.go | 634 +++++++++++++++++++++---- nameresolution/consul/watcher.go | 167 +++++++ 5 files changed, 857 insertions(+), 115 deletions(-) create mode 100644 nameresolution/consul/watcher.go diff --git a/nameresolution/consul/README.md b/nameresolution/consul/README.md index a53824954b..8e472aacf7 100644 --- a/nameresolution/consul/README.md +++ b/nameresolution/consul/README.md @@ -1,6 +1,6 @@ # Consul Name Resolution -The consul name resolution component gives the ability to register and resolve other "daprized" services registered on a consul estate. It is flexible in that it allows for complex to minimal configurations driving the behaviour on init and resolution. +The consul name resolution component gives the ability to register and resolve other "daprized" services registered on a consul estate. It is flexible in that it allows for complex to minimal configurations driving the behavior on init and resolution. ## How To Use @@ -35,7 +35,7 @@ spec: ``` -## Behaviour +## Behavior On init the consul component will either validate the connection to the configured (or default) agent or register the service if configured to do so. The name resolution interface does not cater for an "on shutdown" pattern so please consider this if using Dapr to register services to consul as it will not deregister services. @@ -56,7 +56,7 @@ As of writing the configuration spec is fixed to v1.3.0 of the consul api | DaprPortMetaKey | `string` | The key used for getting the Dapr sidecar port from consul service metadata during service resolution, it will also be used to set the Dapr sidecar port in metadata during registration. If blank it will default to `DAPR_PORT` | | SelfRegister | `bool` | Controls if Dapr will register the service to consul. The name resolution interface does not cater for an "on shutdown" pattern so please consider this if using Dapr to register services to consul as it will not deregister services. | | AdvancedRegistration | [*api.AgentServiceRegistration](https://pkg.go.dev/github.com/hashicorp/consul/api@v1.3.0#AgentServiceRegistration) | Gives full control of service registration through configuration. If configured the component will ignore any configuration of Checks, Tags, Meta and SelfRegister. | - +| UseCache | `bool` | Configures if Dapr will cache the resolved services in-memory. This is done using consul [blocking queries](https://www.consul.io/api-docs/features/blocking) which can be configured via the QueryOptions configuration. If blank it will default to `false` | ## Samples Configurations ### Basic diff --git a/nameresolution/consul/configuration.go b/nameresolution/consul/configuration.go index 64ba6d821a..4416968625 100644 --- a/nameresolution/consul/configuration.go +++ b/nameresolution/consul/configuration.go @@ -23,6 +23,8 @@ import ( "github.com/dapr/kit/config" ) +const defaultDaprPortMetaKey string = "DAPR_PORT" // default key for DaprPort in meta + // The intermediateConfig is based off of the consul api types. 
User configurations are // deserialized into this type before being converted to the equivalent consul types // that way breaking changes in future versions of the consul api cannot break user configuration. @@ -33,8 +35,9 @@ type intermediateConfig struct { Meta map[string]string QueryOptions *QueryOptions AdvancedRegistration *AgentServiceRegistration // advanced use-case - SelfRegister bool DaprPortMetaKey string + SelfRegister bool + UseCache bool } type configSpec struct { @@ -44,8 +47,15 @@ type configSpec struct { Meta map[string]string QueryOptions *consul.QueryOptions AdvancedRegistration *consul.AgentServiceRegistration // advanced use-case - SelfRegister bool DaprPortMetaKey string + SelfRegister bool + UseCache bool +} + +func newIntermediateConfig() intermediateConfig { + return intermediateConfig{ + DaprPortMetaKey: defaultDaprPortMetaKey, + } } func parseConfig(rawConfig interface{}) (configSpec, error) { @@ -60,7 +70,7 @@ func parseConfig(rawConfig interface{}) (configSpec, error) { return result, fmt.Errorf("error serializing to json: %w", err) } - var configuration intermediateConfig + configuration := newIntermediateConfig() err = json.Unmarshal(data, &configuration) if err != nil { return result, fmt.Errorf("error deserializing to configSpec: %w", err) @@ -81,6 +91,7 @@ func mapConfig(config intermediateConfig) configSpec { AdvancedRegistration: mapAdvancedRegistration(config.AdvancedRegistration), SelfRegister: config.SelfRegister, DaprPortMetaKey: config.DaprPortMetaKey, + UseCache: config.UseCache, } } diff --git a/nameresolution/consul/consul.go b/nameresolution/consul/consul.go index e37e4db7c4..383df3202e 100644 --- a/nameresolution/consul/consul.go +++ b/nameresolution/consul/consul.go @@ -18,6 +18,7 @@ import ( "math/rand" "net" "strconv" + "sync" consul "github.com/hashicorp/consul/api" @@ -25,8 +26,6 @@ import ( "github.com/dapr/kit/logger" ) -const daprMeta string = "DAPR_PORT" // default key for DAPR_PORT metadata - type client struct { *consul.Client } @@ -66,9 +65,114 @@ type healthInterface interface { } type resolver struct { - config resolverConfig - logger logger.Logger - client clientInterface + config resolverConfig + logger logger.Logger + client clientInterface + registry registryInterface +} + +type registryInterface interface { + get(service string) *registryEntry + expire(service string) // clears slice of instances + remove(service string) // removes entry from registry + addOrUpdate(service string, services []*consul.ServiceEntry) +} + +type registry struct { + entries *sync.Map +} + +type registryEntry struct { + services []*consul.ServiceEntry + mu sync.RWMutex +} + +func (r *registry) get(service string) *registryEntry { + if result, ok := r.entries.Load(service); ok { + return result.(*registryEntry) + } + + return nil +} + +func (e *registryEntry) next() *consul.ServiceEntry { + e.mu.Lock() + defer e.mu.Unlock() + + if len(e.services) == 0 { + return nil + } + + //nolint:gosec + return e.services[rand.Int()%len(e.services)] +} + +func (r *resolver) getService(service string) (*consul.ServiceEntry, error) { + var services []*consul.ServiceEntry + + if r.config.UseCache { + var entry *registryEntry + + if entry = r.registry.get(service); entry != nil { + result := entry.next() + + if result != nil { + return result, nil + } + } else { + r.watchService(service) + } + } + + options := *r.config.QueryOptions + options.WaitHash = "" + options.WaitIndex = 0 + services, _, err := r.client.Health().Service(service, "", true, &options) + + if err 
!= nil { + return nil, fmt.Errorf("failed to query healthy consul services: %w", err) + } else if len(services) == 0 { + return nil, fmt.Errorf("no healthy services found with AppID '%s'", service) + } + + //nolint:gosec + return services[rand.Int()%len(services)], nil +} + +func (r *registry) addOrUpdate(service string, services []*consul.ServiceEntry) { + var entry *registryEntry + + // update + if entry = r.get(service); entry != nil { + entry.mu.Lock() + defer entry.mu.Unlock() + + entry.services = services + + return + } + + // add + r.entries.Store(service, ®istryEntry{ + services: services, + }) +} + +func (r *registry) remove(service string) { + r.entries.Delete(service) +} + +func (r *registry) expire(service string) { + var entry *registryEntry + + if entry = r.get(service); entry == nil { + return + } + + entry.mu.Lock() + defer entry.mu.Unlock() + + entry.services = nil } type resolverConfig struct { @@ -76,17 +180,20 @@ type resolverConfig struct { QueryOptions *consul.QueryOptions Registration *consul.AgentServiceRegistration DaprPortMetaKey string + UseCache bool } // NewResolver creates Consul name resolver. func NewResolver(logger logger.Logger) nr.Resolver { - return newResolver(logger, &client{}) + return newResolver(logger, resolverConfig{}, &client{}, ®istry{entries: &sync.Map{}}) } -func newResolver(logger logger.Logger, client clientInterface) *resolver { +func newResolver(logger logger.Logger, resolverConfig resolverConfig, client clientInterface, registry registryInterface) nr.Resolver { return &resolver{ - logger: logger, - client: client, + logger: logger, + config: resolverConfig, + client: client, + registry: registry, } } @@ -129,23 +236,14 @@ func (r *resolver) Init(metadata nr.Metadata) (err error) { // ResolveID resolves name to address via consul. 
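For reviewers: the cache-miss branch in `getService` above copies the configured QueryOptions and zeroes `WaitIndex`/`WaitHash`, so the fallback lookup is a plain query rather than a blocking one. A minimal standalone sketch of that direct query path against the consul api — the service name `test-app` and the default local agent address are assumptions for illustration, not part of the patch:

```go
package main

import (
	"fmt"
	"math/rand"

	consul "github.com/hashicorp/consul/api"
)

func main() {
	// Assumes a consul agent reachable via the default config (127.0.0.1:8500).
	client, err := consul.NewClient(consul.DefaultConfig())
	if err != nil {
		panic(err)
	}

	// Zero WaitIndex/WaitHash so this is a plain query, not a blocking one.
	opts := &consul.QueryOptions{WaitIndex: 0, WaitHash: ""}
	services, _, err := client.Health().Service("test-app", "", true, opts)
	if err != nil {
		panic(err)
	}
	if len(services) == 0 {
		panic("no healthy instances of test-app")
	}

	// Random pick for load balancing; a PRNG suffices here, no CSPRNG needed.
	svc := services[rand.Intn(len(services))]
	fmt.Printf("resolved to %s:%d\n", svc.Service.Address, svc.Service.Port)
}
```

The patch's `getService` does the same, but consults the in-memory registry first when `UseCache` is set and only falls through to this direct query on a miss.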
func (r *resolver) ResolveID(req nr.ResolveRequest) (addr string, err error) { cfg := r.config - services, _, err := r.client.Health().Service(req.ID, "", true, cfg.QueryOptions) + svc, err := r.getService(req.ID) if err != nil { - return "", fmt.Errorf("failed to query healthy consul services: %w", err) + return "", err } - if len(services) == 0 { - return "", fmt.Errorf("no healthy services found with AppID '%s'", req.ID) - } - - // Pick a random service from the result - // Note: we're using math/random here as PRNG and that's ok since we're just using this for selecting a random address from a list for load-balancing, so we don't need a CSPRNG - //nolint:gosec - svc := services[rand.Int()%len(services)] - port := svc.Service.Meta[cfg.DaprPortMetaKey] if port == "" { - return "", fmt.Errorf("target service AppID '%s' found but DAPR_PORT missing from meta", req.ID) + return "", fmt.Errorf("target service AppID '%s' found but %s missing from meta", req.ID, cfg.DaprPortMetaKey) } if svc.Service.Address != "" { @@ -180,12 +278,8 @@ func getConfig(metadata nr.Metadata) (resolverCfg resolverConfig, err error) { return resolverCfg, err } - // set DaprPortMetaKey used for registring DaprPort and resolving from Consul - if cfg.DaprPortMetaKey == "" { - resolverCfg.DaprPortMetaKey = daprMeta - } else { - resolverCfg.DaprPortMetaKey = cfg.DaprPortMetaKey - } + resolverCfg.DaprPortMetaKey = cfg.DaprPortMetaKey + resolverCfg.UseCache = cfg.UseCache resolverCfg.Client = getClientConfig(cfg) resolverCfg.Registration, err = getRegistrationConfig(cfg, metadata.Properties) diff --git a/nameresolution/consul/consul_test.go b/nameresolution/consul/consul_test.go index 3efe3c17a5..e4525e902c 100644 --- a/nameresolution/consul/consul_test.go +++ b/nameresolution/consul/consul_test.go @@ -17,7 +17,9 @@ import ( "fmt" "net" "strconv" + "sync" "testing" + "time" consul "github.com/hashicorp/consul/api" "github.com/stretchr/testify/assert" @@ -50,16 +52,25 @@ func (m *mockClient) Agent() agentInterface { } type mockHealth struct { - serviceCalled int - serviceErr error - serviceResult []*consul.ServiceEntry - serviceMeta *consul.QueryMeta + serviceCalled int + serviceErr *error + serviceBehavior func(service, tag string, passingOnly bool, q *consul.QueryOptions) + serviceResult []*consul.ServiceEntry + serviceMeta *consul.QueryMeta } func (m *mockHealth) Service(service, tag string, passingOnly bool, q *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error) { + if m.serviceBehavior != nil { + m.serviceBehavior(service, tag, passingOnly, q) + } + m.serviceCalled++ - return m.serviceResult, m.serviceMeta, m.serviceErr + if m.serviceErr == nil { + return m.serviceResult, m.serviceMeta, nil + } + + return m.serviceResult, m.serviceMeta, *m.serviceErr } type mockAgent struct { @@ -82,6 +93,32 @@ func (m *mockAgent) ServiceRegister(service *consul.AgentServiceRegistration) er return m.serviceRegisterErr } +type mockRegistry struct { + addOrUpdateCalled int + expireCalled int + removeCalled int + getCalled int + getResult *registryEntry +} + +func (m *mockRegistry) addOrUpdate(service string, services []*consul.ServiceEntry) { + m.addOrUpdateCalled++ +} + +func (m *mockRegistry) expire(service string) { + m.expireCalled++ +} + +func (m *mockRegistry) remove(service string) { + m.removeCalled++ +} + +func (m *mockRegistry) get(service string) *registryEntry { + m.getCalled++ + + return m.getResult +} + func TestInit(t *testing.T) { t.Parallel() @@ -99,7 +136,7 @@ func TestInit(t *testing.T) { t.Helper() 
var mock mockClient - resolver := newResolver(logger.NewLogger("test"), &mock) + resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock, ®istry{}) _ = resolver.Init(metadata) @@ -122,7 +159,7 @@ func TestInit(t *testing.T) { t.Helper() var mock mockClient - resolver := newResolver(logger.NewLogger("test"), &mock) + resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock, ®istry{}) _ = resolver.Init(metadata) @@ -144,7 +181,7 @@ func TestInit(t *testing.T) { t.Helper() var mock mockClient - resolver := newResolver(logger.NewLogger("test"), &mock) + resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock, ®istry{}) _ = resolver.Init(metadata) @@ -168,6 +205,7 @@ func TestResolveID(t *testing.T) { t.Parallel() testConfig := resolverConfig{ DaprPortMetaKey: "DAPR_PORT", + QueryOptions: &consul.QueryOptions{}, } tests := []struct { @@ -175,6 +213,302 @@ func TestResolveID(t *testing.T) { req nr.ResolveRequest test func(*testing.T, nr.ResolveRequest) }{ + { + "should use cache when enabled", + nr.ResolveRequest{ + ID: "test-app", + }, + func(t *testing.T, req nr.ResolveRequest) { + t.Helper() + + blockingCall := make(chan uint64) + firstTime := true + meta := &consul.QueryMeta{ + LastIndex: 0, + } + + serviceEntries := []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "10.3.245.137", + Port: 8600, + Meta: map[string]string{ + "DAPR_PORT": "50005", + }, + }, + }, + } + + mock := &mockClient{ + mockHealth: mockHealth{ + serviceResult: serviceEntries, + serviceMeta: meta, + serviceBehavior: func(service, tag string, passingOnly bool, q *consul.QueryOptions) { + if firstTime { + firstTime = false + } else { + meta.LastIndex = <-blockingCall + } + }, + serviceErr: nil, + }, + } + + cfg := resolverConfig{ + DaprPortMetaKey: "DAPR_PORT", + UseCache: true, + QueryOptions: &consul.QueryOptions{}, + } + + mockReg := &mockRegistry{} + resolver := newResolver(logger.NewLogger("test"), cfg, mock, mockReg) + addr, _ := resolver.ResolveID(req) + + // Cache miss pass through + assert.Equal(t, 1, mockReg.getCalled) + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 1 }) + assert.Equal(t, 1, mockReg.addOrUpdateCalled) + assert.Equal(t, "10.3.245.137:50005", addr) + + mockReg.getResult = ®istryEntry{ + services: serviceEntries, + } + + blockingCall <- 2 + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 2 }) + assert.Equal(t, 2, mockReg.addOrUpdateCalled) + + // no update when no change in index and payload + blockingCall <- 2 + assert.Equal(t, 2, mockReg.addOrUpdateCalled) + + // no update when no change in payload + blockingCall <- 3 + assert.Equal(t, 2, mockReg.addOrUpdateCalled) + + // update when change in index and payload + mock.mockHealth.serviceResult = []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "10.3.245.137", + Port: 8601, + Meta: map[string]string{ + "DAPR_PORT": "50005", + }, + }, + }, + } + blockingCall <- 4 + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 3 }) + assert.Equal(t, 3, mockReg.addOrUpdateCalled) + + _, _ = resolver.ResolveID(req) + assert.Equal(t, 2, mockReg.getCalled) + }, + }, + { + "should remove from cache if watch panic", + nr.ResolveRequest{ + ID: "test-app", + }, + func(t *testing.T, req nr.ResolveRequest) { + t.Helper() + + meta := &consul.QueryMeta{ + LastIndex: 0, + } + firstTime := true + mock := mockClient{ + mockHealth: mockHealth{ + serviceResult: 
[]*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "10.3.245.137", + Port: 8600, + Meta: map[string]string{ + "DAPR_PORT": "50005", + }, + }, + }, + }, + serviceMeta: meta, + serviceBehavior: func(service, tag string, passingOnly bool, q *consul.QueryOptions) { + if firstTime { + firstTime = false + } else { + panic("oh no") + } + }, + serviceErr: nil, + }, + } + + cfg := resolverConfig{ + DaprPortMetaKey: "DAPR_PORT", + UseCache: true, + QueryOptions: &consul.QueryOptions{}, + } + + mockReg := &mockRegistry{} + resolver := newResolver(logger.NewLogger("test"), cfg, &mock, mockReg) + addr, _ := resolver.ResolveID(req) + + // Cache miss pass through + assert.Equal(t, 1, mockReg.getCalled) + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 1 }) + assert.Equal(t, 1, mockReg.addOrUpdateCalled) + assert.Equal(t, "10.3.245.137:50005", addr) + + // Remove + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.removeCalled == 1 }) + assert.Equal(t, 0, mockReg.expireCalled) + assert.Equal(t, 1, mockReg.removeCalled) + }, + }, + { + "should use stale agent cache if available", + nr.ResolveRequest{ + ID: "test-app", + }, + func(t *testing.T, req nr.ResolveRequest) { + t.Helper() + + blockingCall := make(chan uint64) + firstTime := true + meta := &consul.QueryMeta{} + + var err error + + mock := mockClient{ + mockHealth: mockHealth{ + serviceResult: []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "10.3.245.137", + Port: 8600, + Meta: map[string]string{ + "DAPR_PORT": "50005", + }, + }, + }, + }, + serviceMeta: meta, + serviceBehavior: func(service, tag string, passingOnly bool, q *consul.QueryOptions) { + if firstTime { + firstTime = false + } else { + if q.WaitIndex > 0 { + err = fmt.Errorf("oh no") + } else { + err = nil + } + + meta.LastIndex = <-blockingCall + } + }, + serviceErr: &err, + }, + } + + cfg := resolverConfig{ + DaprPortMetaKey: "DAPR_PORT", + UseCache: true, + QueryOptions: &consul.QueryOptions{ + WaitIndex: 1, + }, + } + + mockReg := &mockRegistry{} + resolver := newResolver(logger.NewLogger("test"), cfg, &mock, mockReg) + addr, _ := resolver.ResolveID(req) + + // Cache miss pass through + assert.Equal(t, 1, mockReg.getCalled) + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 1 }) + assert.Equal(t, 1, mockReg.addOrUpdateCalled) + assert.Equal(t, "10.3.245.137:50005", addr) + + // Blocking call will error as WaitIndex = 1 + blockingCall <- 2 + waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.serviceCalled == 2 }) + assert.Equal(t, 2, mock.mockHealth.serviceCalled) + + // Will make a non-blocking call to stale cache + meta.CacheHit = true + meta.CacheAge = time.Hour + blockingCall <- 3 + waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.serviceCalled == 3 }) + assert.Equal(t, 3, mock.mockHealth.serviceCalled) + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 2 }) + assert.Equal(t, 2, mockReg.addOrUpdateCalled) + }, + }, + { + "should expire cache upon blocking call error", + nr.ResolveRequest{ + ID: "test-app", + }, + func(t *testing.T, req nr.ResolveRequest) { + t.Helper() + + blockingCall := make(chan uint64) + firstTime := true + meta := &consul.QueryMeta{ + LastIndex: 0, + } + + err := fmt.Errorf("oh no") + + mock := mockClient{ + mockHealth: mockHealth{ + serviceResult: []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "10.3.245.137", + Port: 8600, + 
Meta: map[string]string{ + "DAPR_PORT": "50005", + }, + }, + }, + }, + serviceMeta: meta, + serviceBehavior: func(service, tag string, passingOnly bool, q *consul.QueryOptions) { + if firstTime { + firstTime = false + } else { + meta.LastIndex = <-blockingCall + } + }, + serviceErr: nil, + }, + } + + cfg := resolverConfig{ + DaprPortMetaKey: "DAPR_PORT", + UseCache: true, + QueryOptions: &consul.QueryOptions{}, + } + + mockReg := &mockRegistry{} + resolver := newResolver(logger.NewLogger("test"), cfg, &mock, mockReg) + addr, _ := resolver.ResolveID(req) + + // Cache miss pass through + assert.Equal(t, 1, mockReg.getCalled) + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 1 }) + assert.Equal(t, 1, mockReg.addOrUpdateCalled) + assert.Equal(t, "10.3.245.137:50005", addr) + + // Error and release blocking call + mock.mockHealth.serviceErr = &err + blockingCall <- 2 + + // Cache expired + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.expireCalled == 1 }) + assert.Equal(t, 1, mockReg.expireCalled) + }, + }, { "error if no healthy services found", nr.ResolveRequest{ @@ -187,8 +521,7 @@ func TestResolveID(t *testing.T) { serviceResult: []*consul.ServiceEntry{}, }, } - resolver := newResolver(logger.NewLogger("test"), &mock) - resolver.config = testConfig + resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}) _, err := resolver.ResolveID(req) assert.Equal(t, 1, mock.mockHealth.serviceCalled) @@ -207,7 +540,7 @@ func TestResolveID(t *testing.T) { serviceResult: []*consul.ServiceEntry{ { Service: &consul.AgentService{ - Address: "123.234.245.255", + Address: "10.3.245.137", Port: 8600, Meta: map[string]string{ "DAPR_PORT": "50005", @@ -217,12 +550,11 @@ func TestResolveID(t *testing.T) { }, }, } - resolver := newResolver(logger.NewLogger("test"), &mock) - resolver.config = testConfig + resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}) addr, _ := resolver.ResolveID(req) - assert.Equal(t, "123.234.245.255:50005", addr) + assert.Equal(t, "10.3.245.137:50005", addr) }, }, { @@ -247,8 +579,7 @@ func TestResolveID(t *testing.T) { }, }, } - resolver := newResolver(logger.NewLogger("test"), &mock) - resolver.config = testConfig + resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}) addr, _ := resolver.ResolveID(req) @@ -267,7 +598,7 @@ func TestResolveID(t *testing.T) { serviceResult: []*consul.ServiceEntry{ { Service: &consul.AgentService{ - Address: "123.234.245.255", + Address: "10.3.245.137", Port: 8600, Meta: map[string]string{ "DAPR_PORT": "50005", @@ -286,15 +617,14 @@ func TestResolveID(t *testing.T) { }, }, } - resolver := newResolver(logger.NewLogger("test"), &mock) - resolver.config = testConfig + resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}) total1 := 0 total2 := 0 for i := 0; i < 100; i++ { addr, _ := resolver.ResolveID(req) - if addr == "123.234.245.255:50005" { + if addr == "10.3.245.137:50005" { total1++ } else if addr == "234.245.255.228:50005" { total2++ @@ -321,7 +651,7 @@ func TestResolveID(t *testing.T) { serviceResult: []*consul.ServiceEntry{ { Node: &consul.Node{ - Address: "123.234.245.255", + Address: "10.3.245.137", }, Service: &consul.AgentService{ Address: "", @@ -333,7 +663,7 @@ func TestResolveID(t *testing.T) { }, { Node: &consul.Node{ - Address: "123.234.245.255", + Address: "10.3.245.137", }, Service: &consul.AgentService{ Address: "", @@ -346,12 +676,11 @@ func TestResolveID(t *testing.T) { }, }, } - 
resolver := newResolver(logger.NewLogger("test"), &mock) - resolver.config = testConfig + resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}) addr, _ := resolver.ResolveID(req) - assert.Equal(t, "123.234.245.255:50005", addr) + assert.Equal(t, "10.3.245.137:50005", addr) }, }, { @@ -376,8 +705,7 @@ func TestResolveID(t *testing.T) { }, }, } - resolver := newResolver(logger.NewLogger("test"), &mock) - resolver.config = testConfig + resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}) _, err := resolver.ResolveID(req) @@ -403,8 +731,7 @@ func TestResolveID(t *testing.T) { }, }, } - resolver := newResolver(logger.NewLogger("test"), &mock) - resolver.config = testConfig + resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}) _, err := resolver.ResolveID(req) @@ -416,12 +743,126 @@ func TestResolveID(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.testName, func(t *testing.T) { - t.Parallel() tt.test(t, tt.req) }) } } +func TestRegistry(t *testing.T) { + t.Parallel() + + appID := "myService" + tests := []struct { + testName string + test func(*testing.T) + }{ + { + "should add and update entry", + func(t *testing.T) { + t.Helper() + + registry := ®istry{entries: &sync.Map{}} + + result := []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "10.3.245.137", + Port: 8600, + }, + }, + } + + registry.addOrUpdate(appID, result) + + entry, _ := registry.entries.Load(appID) + assert.Equal(t, result, entry.(*registryEntry).services) + + update := []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "random", + Port: 123, + }, + }, + } + + registry.addOrUpdate(appID, update) + entry, _ = registry.entries.Load(appID) + assert.Equal(t, update, entry.(*registryEntry).services) + }, + }, + { + "should expire entries", + func(t *testing.T) { + t.Helper() + + entryMap := &sync.Map{} + entryMap.Store( + appID, + ®istryEntry{ + services: []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "10.3.245.137", + Port: 8600, + }, + }, + }, + }) + + registry := ®istry{ + entries: entryMap, + } + + entry, _ := registry.entries.Load(appID) + assert.NotNil(t, entry.(*registryEntry).services) + + registry.expire(appID) + + entry, _ = registry.entries.Load(appID) + assert.Nil(t, entry.(*registryEntry).services) + }, + }, + { + "should remove entry", + func(t *testing.T) { + t.Helper() + + entryMap := &sync.Map{} + entryMap.Store( + appID, + ®istryEntry{ + services: []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "10.3.245.137", + Port: 8600, + }, + }, + }, + }) + + registry := ®istry{ + entries: entryMap, + } + + registry.remove(appID) + + entry, _ := registry.entries.Load(appID) + assert.Nil(t, entry) + }, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.testName, func(t *testing.T) { + t.Parallel() + tt.test(t) + }) + } +} + func TestParseConfig(t *testing.T) { t.Parallel() @@ -456,6 +897,8 @@ func TestParseConfig(t *testing.T) { "UseCache": true, "Filter": "Checks.ServiceTags contains dapr", }, + "DaprPortMetaKey": "DAPR_PORT", + "UseCache": false, }, configSpec{ Checks: []*consul.AgentServiceCheck{ @@ -479,13 +922,17 @@ func TestParseConfig(t *testing.T) { UseCache: true, Filter: "Checks.ServiceTags contains dapr", }, + DaprPortMetaKey: "DAPR_PORT", + UseCache: false, }, }, { "empty configuration in metadata", true, nil, - configSpec{}, + configSpec{ + DaprPortMetaKey: defaultDaprPortMetaKey, + }, }, { "fail on unsupported map key", @@ 
-545,15 +992,18 @@ func TestGetConfig(t *testing.T) { assert.Equal(t, true, actual.QueryOptions.UseCache) // DaprPortMetaKey - assert.Equal(t, "DAPR_PORT", actual.DaprPortMetaKey) + assert.Equal(t, defaultDaprPortMetaKey, actual.DaprPortMetaKey) + + // Cache + assert.Equal(t, false, actual.UseCache) }, }, { "empty configuration with SelfRegister should default correctly", nr.Metadata{ Base: metadata.Base{Properties: getTestPropsWithoutKey("")}, - Configuration: configSpec{ - SelfRegister: true, + Configuration: map[interface{}]interface{}{ + "SelfRegister": true, }, }, func(t *testing.T, metadata nr.Metadata) { @@ -572,22 +1022,25 @@ func TestGetConfig(t *testing.T) { // Metadata assert.Equal(t, 1, len(actual.Registration.Meta)) - assert.Equal(t, "50001", actual.Registration.Meta["DAPR_PORT"]) + assert.Equal(t, "50001", actual.Registration.Meta[actual.DaprPortMetaKey]) // QueryOptions assert.Equal(t, true, actual.QueryOptions.UseCache) // DaprPortMetaKey - assert.Equal(t, "DAPR_PORT", actual.DaprPortMetaKey) + assert.Equal(t, defaultDaprPortMetaKey, actual.DaprPortMetaKey) + + // Cache + assert.Equal(t, false, actual.UseCache) }, }, { "DaprPortMetaKey should set registration meta and config used for resolve", nr.Metadata{ Base: metadata.Base{Properties: getTestPropsWithoutKey("")}, - Configuration: configSpec{ - SelfRegister: true, - DaprPortMetaKey: "random_key", + Configuration: map[interface{}]interface{}{ + "SelfRegister": true, + "DaprPortMetaKey": "random_key", }, }, func(t *testing.T, metadata nr.Metadata) { @@ -604,8 +1057,8 @@ func TestGetConfig(t *testing.T) { "missing AppID property should error when SelfRegister true", nr.Metadata{ Base: metadata.Base{Properties: getTestPropsWithoutKey(nr.AppID)}, - Configuration: configSpec{ - SelfRegister: true, + Configuration: map[interface{}]interface{}{ + "SelfRegister": true, }, }, func(t *testing.T, metadata nr.Metadata) { @@ -634,8 +1087,8 @@ func TestGetConfig(t *testing.T) { "missing AppPort property should error when SelfRegister true", nr.Metadata{ Base: metadata.Base{Properties: getTestPropsWithoutKey(nr.AppPort)}, - Configuration: configSpec{ - SelfRegister: true, + Configuration: map[interface{}]interface{}{ + "SelfRegister": true, }, }, func(t *testing.T, metadata nr.Metadata) { @@ -664,8 +1117,8 @@ func TestGetConfig(t *testing.T) { "missing HostAddress property should error when SelfRegister true", nr.Metadata{ Base: metadata.Base{Properties: getTestPropsWithoutKey(nr.HostAddress)}, - Configuration: configSpec{ - SelfRegister: true, + Configuration: map[interface{}]interface{}{ + "SelfRegister": true, }, }, func(t *testing.T, metadata nr.Metadata) { @@ -694,8 +1147,8 @@ func TestGetConfig(t *testing.T) { "missing DaprHTTPPort property should error only when SelfRegister true", nr.Metadata{ Base: metadata.Base{Properties: getTestPropsWithoutKey(nr.DaprHTTPPort)}, - Configuration: configSpec{ - SelfRegister: true, + Configuration: map[interface{}]interface{}{ + "SelfRegister": true, }, }, func(t *testing.T, metadata nr.Metadata) { @@ -757,27 +1210,29 @@ func TestGetConfig(t *testing.T) { "registration should configure correctly", nr.Metadata{ Base: metadata.Base{Properties: getTestPropsWithoutKey("")}, - Configuration: configSpec{ - Checks: []*consul.AgentServiceCheck{ - { - Name: "test-app health check name", - CheckID: "test-app health check id", - Interval: "15s", - HTTP: "http://127.0.0.1:3500/health", + Configuration: map[interface{}]interface{}{ + "Checks": []interface{}{ + map[interface{}]interface{}{ + "Name": "test-app 
health check name", + "CheckID": "test-app health check id", + "Interval": "15s", + "HTTP": "http://127.0.0.1:3500/health", }, }, - Tags: []string{ + "Tags": []interface{}{ "test", }, - Meta: map[string]string{ + "Meta": map[interface{}]interface{}{ "APP_PORT": "8650", "DAPR_GRPC_PORT": "50005", }, - QueryOptions: &consul.QueryOptions{ - UseCache: false, - Filter: "Checks.ServiceTags contains something", + "QueryOptions": map[interface{}]interface{}{ + "UseCache": false, + "Filter": "Checks.ServiceTags contains something", }, - SelfRegister: true, + "SelfRegister": true, + "DaprPortMetaKey": "PORT", + "UseCache": false, }, }, func(t *testing.T, metadata nr.Metadata) { @@ -798,50 +1253,53 @@ func TestGetConfig(t *testing.T) { assert.Equal(t, "test", actual.Registration.Tags[0]) assert.Equal(t, "8650", actual.Registration.Meta["APP_PORT"]) assert.Equal(t, "50005", actual.Registration.Meta["DAPR_GRPC_PORT"]) + assert.Equal(t, metadata.Properties[nr.DaprPort], actual.Registration.Meta["PORT"]) assert.Equal(t, false, actual.QueryOptions.UseCache) assert.Equal(t, "Checks.ServiceTags contains something", actual.QueryOptions.Filter) + assert.Equal(t, "PORT", actual.DaprPortMetaKey) + assert.Equal(t, false, actual.UseCache) }, }, { "advanced registration should override/ignore other configs", nr.Metadata{ Base: metadata.Base{Properties: getTestPropsWithoutKey("")}, - Configuration: configSpec{ - AdvancedRegistration: &consul.AgentServiceRegistration{ - Name: "random-app-id", - Port: 0o00, - Address: "123.345.678", - Tags: []string{"random-tag"}, - Meta: map[string]string{ + Configuration: map[interface{}]interface{}{ + "AdvancedRegistration": map[interface{}]interface{}{ + "Name": "random-app-id", + "Port": 0o00, + "Address": "123.345.678", + "Tags": []string{"random-tag"}, + "Meta": map[string]string{ "APP_PORT": "000", }, - Checks: []*consul.AgentServiceCheck{ - { - Name: "random health check name", - CheckID: "random health check id", - Interval: "15s", - HTTP: "http://127.0.0.1:3500/health", + "Checks": []interface{}{ + map[interface{}]interface{}{ + "Name": "random health check name", + "CheckID": "random health check id", + "Interval": "15s", + "HTTP": "http://127.0.0.1:3500/health", }, }, }, - Checks: []*consul.AgentServiceCheck{ - { - Name: "test-app health check name", - CheckID: "test-app health check id", - Interval: "15s", - HTTP: "http://127.0.0.1:3500/health", + "Checks": []interface{}{ + map[interface{}]interface{}{ + "Name": "test-app health check name", + "CheckID": "test-app health check id", + "Interval": "15s", + "HTTP": "http://127.0.0.1:3500/health", }, }, - Tags: []string{ + "Tags": []string{ "dapr", "test", }, - Meta: map[string]string{ + "Meta": map[string]string{ "APP_PORT": "123", "DAPR_HTTP_PORT": "3500", "DAPR_GRPC_PORT": "50005", }, - SelfRegister: false, + "SelfRegister": false, }, }, func(t *testing.T, metadata nr.Metadata) { @@ -1145,6 +1603,7 @@ func TestMapConfig(t *testing.T) { }, SelfRegister: true, DaprPortMetaKey: "SOMETHINGSOMETHING", + UseCache: false, } actual := mapConfig(expected) @@ -1161,6 +1620,7 @@ func TestMapConfig(t *testing.T) { assert.Equal(t, expected.Meta, actual.Meta) assert.Equal(t, expected.SelfRegister, actual.SelfRegister) assert.Equal(t, expected.DaprPortMetaKey, actual.DaprPortMetaKey) + assert.Equal(t, expected.UseCache, actual.UseCache) }) t.Run("should map empty configuration", func(t *testing.T) { @@ -1317,3 +1777,13 @@ func getTestPropsWithoutKey(removeKey string) map[string]string { return metadata } + +func waitTillTrueOrTimeout(d 
time.Duration, condition func() bool) { + for i := 0; i < 100; i++ { + if condition() { + return + } + + time.Sleep(d / 100) + } +} diff --git a/nameresolution/consul/watcher.go b/nameresolution/consul/watcher.go new file mode 100644 index 0000000000..3e822e460f --- /dev/null +++ b/nameresolution/consul/watcher.go @@ -0,0 +1,167 @@ +package consul + +import ( + "context" + "fmt" + "reflect" + "strconv" + "time" + + consul "github.com/hashicorp/consul/api" +) + +const ( + // retryInterval is the base retry value. + retryInterval = 5 * time.Second + + // maximum back off time, this is to prevent exponential runaway. + maxBackoffTime = 180 * time.Second +) + +type watchPlan struct { + expired bool + lastParamVal blockingParamVal + lastResult []*consul.ServiceEntry + service string + options *consul.QueryOptions +} + +type blockingParamVal interface { + equal(other blockingParamVal) bool + next(previous blockingParamVal) blockingParamVal +} + +type waitIndexVal uint64 + +// Equal implements BlockingParamVal. +func (idx waitIndexVal) equal(other blockingParamVal) bool { + if otherIdx, ok := other.(waitIndexVal); ok { + return idx == otherIdx + } + + return false +} + +// Next implements BlockingParamVal. +func (idx waitIndexVal) next(previous blockingParamVal) blockingParamVal { + if previous == nil { + return idx + } + prevIdx, ok := previous.(waitIndexVal) + if ok && prevIdx == idx { + // This value is the same as the previous index, reset + return waitIndexVal(0) + } + + return idx +} + +func (r *resolver) watch(p *watchPlan) (blockingParamVal, []*consul.ServiceEntry, bool, error) { + ctx, cancel := context.WithCancel(context.Background()) + p.options = p.options.WithContext(ctx) + + if p.lastParamVal != nil { + p.options.WaitIndex = uint64(p.lastParamVal.(waitIndexVal)) + } + + defer cancel() + nodes, meta, err := r.client.Health().Service(p.service, "", true, p.options) + + // If error try again without blocking (for stale agent cache) + if err != nil && p.options.WaitIndex != uint64(0) { + p.options.WaitIndex = 0 + nodes, meta, err = r.client.Health().Service(p.service, "", true, p.options) + } + + if err != nil { + if p.options.WaitIndex == uint64(0) && !p.expired { + p.lastResult = nil + p.expired = true + r.registry.expire(p.service) + } + + return nil, nil, false, err + } else if meta.CacheHit && meta.CacheAge > 0 { + err = fmt.Errorf("agent cache is stale (age %s)", meta.CacheAge.String()) + p.expired = false + + return nil, nodes, true, err + } + + p.expired = false + + return waitIndexVal(meta.LastIndex), nodes, false, err +} + +func (r *resolver) runWatchPlan(p *watchPlan) { + defer func() { + recover() + r.registry.remove(p.service) + }() + + // add to registry as now begun watching + r.registry.addOrUpdate(p.service, nil) + failures := 0 + + for { + // invoke blocking call + blockParam, result, stale, err := r.watch(p) + // handle an error in the watch function + if err != nil { + // perform an exponential backoff + failures++ + + // always 0 on err so query is forced to return and set p.healthy! 
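+	// zeroing the stored index makes the next watch() call non-blocking, so the
+	// retry loop regains control promptly instead of waiting on a stale index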
+ p.lastParamVal = waitIndexVal(0) + + retry := retryInterval * time.Duration(failures*failures) + if retry > maxBackoffTime { + retry = maxBackoffTime + } + + r.logger.Errorf("consul service-watcher:%s error: %v, retry in %v", p.service, err, retry) + + if stale { + r.logger.Debugf("updating registry for service:%s using stale cache", p.service) + r.registry.addOrUpdate(p.service, result) + } + + time.Sleep(retry) + + continue + } + + // clear the failures + failures = 0 + + // if the index is unchanged do nothing + if p.lastParamVal != nil && p.lastParamVal.equal(blockParam) { + continue + } + + // update the index, look for change + oldParamVal := p.lastParamVal + p.lastParamVal = blockParam.next(oldParamVal) + if oldParamVal != nil && reflect.DeepEqual(p.lastResult, result) { + continue + } + + // handle the updated result + p.lastResult = result + r.logger.Debugf( + "updating registry for service:%s last-index:%s", + p.service, + strconv.FormatUint(uint64(p.lastParamVal.(waitIndexVal)), 10)) + r.registry.addOrUpdate(p.service, result) + } +} + +func (r *resolver) watchService(service string) { + options := *r.config.QueryOptions + plan := &watchPlan{ + service: service, + options: &options, + } + + go r.runWatchPlan(plan) +} From ecd98f03701c753f8cb6f44ac603bd048d065a10 Mon Sep 17 00:00:00 2001 From: Abdulaziz Elsheikh Date: Mon, 11 Sep 2023 16:21:33 +0100 Subject: [PATCH 02/20] Update nameresolution/consul/README.md Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com> Signed-off-by: Abdulaziz Elsheikh --- nameresolution/consul/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nameresolution/consul/README.md b/nameresolution/consul/README.md index 8e472aacf7..c936c706f7 100644 --- a/nameresolution/consul/README.md +++ b/nameresolution/consul/README.md @@ -56,7 +56,7 @@ As of writing the configuration spec is fixed to v1.3.0 of the consul api | DaprPortMetaKey | `string` | The key used for getting the Dapr sidecar port from consul service metadata during service resolution, it will also be used to set the Dapr sidecar port in metadata during registration. If blank it will default to `DAPR_PORT` | | SelfRegister | `bool` | Controls if Dapr will register the service to consul. The name resolution interface does not cater for an "on shutdown" pattern so please consider this if using Dapr to register services to consul as it will not deregister services. | | AdvancedRegistration | [*api.AgentServiceRegistration](https://pkg.go.dev/github.com/hashicorp/consul/api@v1.3.0#AgentServiceRegistration) | Gives full control of service registration through configuration. If configured the component will ignore any configuration of Checks, Tags, Meta and SelfRegister. | -| UseCache | `bool` | Configures if Dapr will cache the resolved services in-memory. This is done using consul [blocking queries](https://www.consul.io/api-docs/features/blocking) which can be configured via the QueryOptions configuration. If blank it will default to `false` | +| UseCache | `bool` | Configures if Dapr will cache the resolved services in-memory. This is done using consul [blocking queries](https://www.consul.io/api-docs/features/blocking) which can be configured via the QueryOptions configuration. 
If unset it will default to `false` | ## Samples Configurations ### Basic From 40cbf1795c2d1c70545b27541e0ee5f7a067d26f Mon Sep 17 00:00:00 2001 From: Abdulaziz Elsheikh Date: Wed, 20 Sep 2023 13:08:49 +0100 Subject: [PATCH 03/20] nr_consul_cache refactored to use single routine for watching all services and updating cache Signed-off-by: Abdulaziz Elsheikh --- nameresolution/consul/consul.go | 54 ++- nameresolution/consul/consul_test.go | 529 +++++++++++++++++++++------ nameresolution/consul/watcher.go | 317 ++++++++++++---- 3 files changed, 709 insertions(+), 191 deletions(-) diff --git a/nameresolution/consul/consul.go b/nameresolution/consul/consul.go index 383df3202e..2dababdf86 100644 --- a/nameresolution/consul/consul.go +++ b/nameresolution/consul/consul.go @@ -62,24 +62,32 @@ type agentInterface interface { type healthInterface interface { Service(service, tag string, passingOnly bool, q *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error) + State(state string, q *consul.QueryOptions) (consul.HealthChecks, *consul.QueryMeta, error) } type resolver struct { - config resolverConfig - logger logger.Logger - client clientInterface - registry registryInterface + config resolverConfig + logger logger.Logger + client clientInterface + registry registryInterface + watcherStarted bool + watcherMutex sync.Mutex } type registryInterface interface { + getKeys() []string get(service string) *registryEntry expire(service string) // clears slice of instances + expireAll() // clears slice of instances for all entries remove(service string) // removes entry from registry + removeAll() // removes all entries from registry addOrUpdate(service string, services []*consul.ServiceEntry) + registrationChannel() chan string } type registry struct { - entries *sync.Map + entries *sync.Map + serviceChannel chan string } type registryEntry struct { @@ -87,6 +95,16 @@ type registryEntry struct { mu sync.RWMutex } +func (r *registry) getKeys() []string { + var keys []string + r.entries.Range(func(key any, value any) bool { + k := key.(string) + keys = append(keys, k) + return true + }) + return keys +} + func (r *registry) get(service string) *registryEntry { if result, ok := r.entries.Load(service); ok { return result.(*registryEntry) @@ -111,6 +129,10 @@ func (r *resolver) getService(service string) (*consul.ServiceEntry, error) { var services []*consul.ServiceEntry if r.config.UseCache { + if !r.watcherStarted { + r.startWatcher() + } + var entry *registryEntry if entry = r.registry.get(service); entry != nil { @@ -120,7 +142,7 @@ func (r *resolver) getService(service string) (*consul.ServiceEntry, error) { return result, nil } } else { - r.watchService(service) + r.registry.registrationChannel() <- service } } @@ -162,6 +184,13 @@ func (r *registry) remove(service string) { r.entries.Delete(service) } +func (r *registry) removeAll() { + r.entries.Range(func(key any, value any) bool { + r.remove(key.(string)) + return true + }) +} + func (r *registry) expire(service string) { var entry *registryEntry @@ -175,6 +204,17 @@ func (r *registry) expire(service string) { entry.services = nil } +func (r *registry) expireAll() { + r.entries.Range(func(key any, value any) bool { + r.expire(key.(string)) + return true + }) +} + +func (r *registry) registrationChannel() chan string { + return r.serviceChannel +} + type resolverConfig struct { Client *consul.Config QueryOptions *consul.QueryOptions @@ -185,7 +225,7 @@ type resolverConfig struct { // NewResolver creates Consul name resolver. 
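Commit 03's stated refactor replaces per-service watch goroutines with a single routine that tracks every registered service and is fed by a buffered channel. A simplified, self-contained sketch of that pattern — the channel drain, the health-state blocking query, and the cache refresh below are illustrative stand-ins, not the patch's exact code:

```go
// Sketch: one goroutine watches consul health state for all services,
// refreshing an in-memory cache whenever the state index changes.
package main

import (
	"fmt"

	consul "github.com/hashicorp/consul/api"
)

func runWatcher(client *consul.Client, registrations <-chan string) {
	watched := map[string]bool{}
	var waitIndex uint64

	for {
		// Drain newly registered services from the buffered channel.
		// (The real watcher also wakes on registrations; omitted for brevity.)
		for len(registrations) > 0 {
			watched[<-registrations] = true
		}

		// Blocking query: returns once the health-state index moves past waitIndex.
		opts := &consul.QueryOptions{WaitIndex: waitIndex}
		_, meta, err := client.Health().State(consul.HealthAny, opts)
		if err != nil {
			continue // the real watcher backs off and expires the cache on errors
		}
		waitIndex = meta.LastIndex

		// Something changed: refresh each watched service with a plain query.
		for svc := range watched {
			if entries, _, err := client.Health().Service(svc, "", true, nil); err == nil {
				fmt.Printf("cache update: %s -> %d healthy instances\n", svc, len(entries))
			}
		}
	}
}

func main() {
	client, err := consul.NewClient(consul.DefaultConfig()) // assumes a local agent
	if err != nil {
		panic(err)
	}

	regCh := make(chan string, 100) // buffered, like the patch's serviceChannel
	go runWatcher(client, regCh)

	regCh <- "test-app" // a cache miss registers the service without blocking
	select {}           // keep the process alive for the sketch
}
```

The patch goes further than this sketch: it diffs the returned health checks so it only re-queries services whose status actually changed, which the reworked "should only update cache on change" test below exercises.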
func NewResolver(logger logger.Logger) nr.Resolver { - return newResolver(logger, resolverConfig{}, &client{}, ®istry{entries: &sync.Map{}}) + return newResolver(logger, resolverConfig{}, &client{}, ®istry{entries: &sync.Map{}, serviceChannel: make(chan string, 100)}) } func newResolver(logger logger.Logger, resolverConfig resolverConfig, client clientInterface, registry registryInterface) nr.Resolver { diff --git a/nameresolution/consul/consul_test.go b/nameresolution/consul/consul_test.go index e4525e902c..9751de062d 100644 --- a/nameresolution/consul/consul_test.go +++ b/nameresolution/consul/consul_test.go @@ -57,6 +57,29 @@ type mockHealth struct { serviceBehavior func(service, tag string, passingOnly bool, q *consul.QueryOptions) serviceResult []*consul.ServiceEntry serviceMeta *consul.QueryMeta + + stateCallStarted int + stateCalled int + stateError *error + stateBehaviour func(state string, q *consul.QueryOptions) + stateResult consul.HealthChecks + stateMeta *consul.QueryMeta +} + +func (m *mockHealth) State(state string, q *consul.QueryOptions) (consul.HealthChecks, *consul.QueryMeta, error) { + m.stateCallStarted++ + + if m.stateBehaviour != nil { + m.stateBehaviour(state, q) + } + + m.stateCalled++ + + if m.stateError == nil { + return m.stateResult, m.stateMeta, nil + } + + return m.stateResult, m.stateMeta, *m.stateError } func (m *mockHealth) Service(service, tag string, passingOnly bool, q *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error) { @@ -94,14 +117,49 @@ func (m *mockAgent) ServiceRegister(service *consul.AgentServiceRegistration) er } type mockRegistry struct { - addOrUpdateCalled int - expireCalled int - removeCalled int - getCalled int - getResult *registryEntry + getKeysCalled int + getKeysResult *[]string + getKeysBehaviour func() + addOrUpdateCalled int + addOrUpdateBehaviour func(service string, services []*consul.ServiceEntry) + expireCalled int + expireAllCalled int + removeCalled int + removeAllCalled int + getCalled int + getResult *registryEntry + registerChannelCalled int + registerChannelResult chan string +} + +func (m *mockRegistry) registrationChannel() chan string { + m.registerChannelCalled++ + return m.registerChannelResult +} + +func (m *mockRegistry) getKeys() []string { + if m.getKeysBehaviour != nil { + m.getKeysBehaviour() + } + + m.getKeysCalled++ + + return *m.getKeysResult +} + +func (m *mockRegistry) expireAll() { + m.expireAllCalled++ +} + +func (m *mockRegistry) removeAll() { + m.removeAllCalled++ } func (m *mockRegistry) addOrUpdate(service string, services []*consul.ServiceEntry) { + if m.addOrUpdateBehaviour != nil { + m.addOrUpdateBehaviour(service, services) + } + m.addOrUpdateCalled++ } @@ -222,7 +280,6 @@ func TestResolveID(t *testing.T) { t.Helper() blockingCall := make(chan uint64) - firstTime := true meta := &consul.QueryMeta{ LastIndex: 0, } @@ -239,18 +296,43 @@ func TestResolveID(t *testing.T) { }, } + cachedEntries := []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "10.3.245.137", + Port: 8600, + Meta: map[string]string{ + "DAPR_PORT": "70007", + }, + }, + }, + } + + healthChecks := consul.HealthChecks{ + &consul.HealthCheck{ + Node: "0e1234", + ServiceID: "test-app-10.3.245.137-3500", + ServiceName: "test-app", + Status: consul.HealthPassing, + }, + } + mock := &mockClient{ mockHealth: mockHealth{ + // Service() serviceResult: serviceEntries, serviceMeta: meta, serviceBehavior: func(service, tag string, passingOnly bool, q *consul.QueryOptions) { - if firstTime { - firstTime 
= false - } else { - meta.LastIndex = <-blockingCall - } }, serviceErr: nil, + + // State() + stateResult: healthChecks, + stateMeta: meta, + stateBehaviour: func(state string, q *consul.QueryOptions) { + meta.LastIndex = <-blockingCall + }, + stateError: nil, }, } @@ -260,54 +342,80 @@ func TestResolveID(t *testing.T) { QueryOptions: &consul.QueryOptions{}, } - mockReg := &mockRegistry{} + serviceKeys := make([]string, 0, 10) + + mockReg := &mockRegistry{ + registerChannelResult: make(chan string, 100), + getKeysResult: &serviceKeys, + addOrUpdateBehaviour: func(service string, services []*consul.ServiceEntry) { + if services == nil { + serviceKeys = append(serviceKeys, service) + } + }, + } resolver := newResolver(logger.NewLogger("test"), cfg, mock, mockReg) addr, _ := resolver.ResolveID(req) - // Cache miss pass through + // no apps in registry - cache miss, call agent directly assert.Equal(t, 1, mockReg.getCalled) - waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 1 }) - assert.Equal(t, 1, mockReg.addOrUpdateCalled) + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.getKeysCalled == 2 }) + assert.Equal(t, 1, mock.mockHealth.serviceCalled) assert.Equal(t, "10.3.245.137:50005", addr) + // watcher adds app to registry + assert.Equal(t, 1, mockReg.addOrUpdateCalled) + assert.Equal(t, 2, mockReg.getKeysCalled) + + mockReg.registerChannelResult <- "test-app" mockReg.getResult = ®istryEntry{ - services: serviceEntries, + services: cachedEntries, } + // blocking query - return new index blockingCall <- 2 - waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 2 }) + waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted == 2 }) + assert.Equal(t, 1, mock.mockHealth.stateCalled) + + // get healthy nodes and update registry for service in result + assert.Equal(t, 2, mock.mockHealth.serviceCalled) assert.Equal(t, 2, mockReg.addOrUpdateCalled) + // resolve id should only hit cache now + addr, _ = resolver.ResolveID(req) + assert.Equal(t, "10.3.245.137:70007", addr) + addr, _ = resolver.ResolveID(req) + assert.Equal(t, "10.3.245.137:70007", addr) + addr, _ = resolver.ResolveID(req) + assert.Equal(t, "10.3.245.137:70007", addr) + + assert.Equal(t, 2, mock.mockHealth.serviceCalled) + assert.Equal(t, 4, mockReg.getCalled) + // no update when no change in index and payload blockingCall <- 2 + waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted == 3 }) + assert.Equal(t, 2, mock.mockHealth.stateCalled) + assert.Equal(t, 2, mock.mockHealth.serviceCalled) assert.Equal(t, 2, mockReg.addOrUpdateCalled) // no update when no change in payload blockingCall <- 3 + waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted == 4 }) + assert.Equal(t, 3, mock.mockHealth.stateCalled) + assert.Equal(t, 2, mock.mockHealth.serviceCalled) assert.Equal(t, 2, mockReg.addOrUpdateCalled) // update when change in index and payload - mock.mockHealth.serviceResult = []*consul.ServiceEntry{ - { - Service: &consul.AgentService{ - Address: "10.3.245.137", - Port: 8601, - Meta: map[string]string{ - "DAPR_PORT": "50005", - }, - }, - }, - } + mock.mockHealth.stateResult[0].Status = consul.HealthCritical blockingCall <- 4 - waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 3 }) + waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted == 5 }) + assert.Equal(t, 4, 
mock.mockHealth.stateCalled) + assert.Equal(t, 3, mock.mockHealth.serviceCalled) assert.Equal(t, 3, mockReg.addOrUpdateCalled) - - _, _ = resolver.ResolveID(req) - assert.Equal(t, 2, mockReg.getCalled) }, }, { - "should remove from cache if watch panic", + "should remove all from cache if watcher loop panics", nr.ResolveRequest{ ID: "test-app", }, @@ -317,9 +425,10 @@ func TestResolveID(t *testing.T) { meta := &consul.QueryMeta{ LastIndex: 0, } - firstTime := true + mock := mockClient{ mockHealth: mockHealth{ + // Service() serviceResult: []*consul.ServiceEntry{ { Service: &consul.AgentService{ @@ -333,11 +442,6 @@ func TestResolveID(t *testing.T) { }, serviceMeta: meta, serviceBehavior: func(service, tag string, passingOnly bool, q *consul.QueryOptions) { - if firstTime { - firstTime = false - } else { - panic("oh no") - } }, serviceErr: nil, }, @@ -349,7 +453,17 @@ func TestResolveID(t *testing.T) { QueryOptions: &consul.QueryOptions{}, } - mockReg := &mockRegistry{} + serviceKeys := make([]string, 0, 10) + + mockReg := &mockRegistry{ + registerChannelResult: make(chan string, 100), + getKeysResult: &serviceKeys, + addOrUpdateBehaviour: func(service string, services []*consul.ServiceEntry) { + if services == nil { + serviceKeys = append(serviceKeys, service) + } + }, + } resolver := newResolver(logger.NewLogger("test"), cfg, &mock, mockReg) addr, _ := resolver.ResolveID(req) @@ -359,14 +473,17 @@ func TestResolveID(t *testing.T) { assert.Equal(t, 1, mockReg.addOrUpdateCalled) assert.Equal(t, "10.3.245.137:50005", addr) + mockReg.getKeysBehaviour = func() { panic("oh no") } + mockReg.registerChannelResult <- "test-app" + // Remove - waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.removeCalled == 1 }) + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.removeAllCalled == 1 }) assert.Equal(t, 0, mockReg.expireCalled) - assert.Equal(t, 1, mockReg.removeCalled) + assert.Equal(t, 1, mockReg.removeAllCalled) }, }, { - "should use stale agent cache if available", + "should only update cache on change", nr.ResolveRequest{ ID: "test-app", }, @@ -374,13 +491,47 @@ func TestResolveID(t *testing.T) { t.Helper() blockingCall := make(chan uint64) - firstTime := true meta := &consul.QueryMeta{} var err error + // Node 1 all checks healthy + node1check1 := &consul.HealthCheck{ + Node: "0e1234", + ServiceID: "test-app-10.3.245.137-3500", + ServiceName: "test-app", + Status: consul.HealthPassing, + CheckID: "1", + } + + node1check2 := &consul.HealthCheck{ + Node: "0e1234", + ServiceID: "test-app-10.3.245.137-3500", + ServiceName: "test-app", + Status: consul.HealthPassing, + CheckID: "2", + } + + // Node 2 all checks unhealthy + node2check1 := &consul.HealthCheck{ + Node: "0e9878", + ServiceID: "test-app-10.3.245.127-3500", + ServiceName: "test-app", + Status: consul.HealthCritical, + CheckID: "1", + } + + node2check2 := &consul.HealthCheck{ + Node: "0e9878", + ServiceID: "test-app-10.3.245.127-3500", + ServiceName: "test-app", + Status: consul.HealthCritical, + CheckID: "2", + } + mock := mockClient{ mockHealth: mockHealth{ + // Service() serviceResult: []*consul.ServiceEntry{ { Service: &consul.AgentService{ @@ -392,21 +543,22 @@ func TestResolveID(t *testing.T) { }, }, }, - serviceMeta: meta, - serviceBehavior: func(service, tag string, passingOnly bool, q *consul.QueryOptions) { - if firstTime { - firstTime = false - } else { - if q.WaitIndex > 0 { - err = fmt.Errorf("oh no") - } else { - err = nil - } - - meta.LastIndex = <-blockingCall - } + serviceMeta: meta, + 
serviceBehavior: nil, + serviceErr: &err, + + // State() + stateResult: consul.HealthChecks{ + node1check1, + node1check2, + node2check1, + node2check2, + }, + stateMeta: meta, + stateBehaviour: func(state string, q *consul.QueryOptions) { + meta.LastIndex = <-blockingCall }, - serviceErr: &err, + stateError: nil, }, } @@ -418,29 +570,90 @@ func TestResolveID(t *testing.T) { }, } - mockReg := &mockRegistry{} + serviceKeys := make([]string, 0, 10) + + mockReg := &mockRegistry{ + registerChannelResult: make(chan string, 100), + getKeysResult: &serviceKeys, + addOrUpdateBehaviour: func(service string, services []*consul.ServiceEntry) { + if services == nil { + serviceKeys = append(serviceKeys, service) + } + }, + } resolver := newResolver(logger.NewLogger("test"), cfg, &mock, mockReg) addr, _ := resolver.ResolveID(req) - // Cache miss pass through + // no apps in registry - cache miss, call agent directly assert.Equal(t, 1, mockReg.getCalled) waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 1 }) - assert.Equal(t, 1, mockReg.addOrUpdateCalled) + assert.Equal(t, 1, mock.mockHealth.serviceCalled) assert.Equal(t, "10.3.245.137:50005", addr) - // Blocking call will error as WaitIndex = 1 + // watcher adds app to registry + assert.Equal(t, 1, mockReg.addOrUpdateCalled) + assert.Equal(t, 2, mockReg.getKeysCalled) + + // add key to mock registry - trigger watcher + mockReg.registerChannelResult <- "test-app" + mockReg.getResult = ®istryEntry{ + services: mock.mockHealth.serviceResult, + } + + // blocking query - return new index blockingCall <- 2 - waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.serviceCalled == 2 }) + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 2 }) + assert.Equal(t, 1, mock.mockHealth.stateCalled) + + // get healthy nodes and update registry for service in result assert.Equal(t, 2, mock.mockHealth.serviceCalled) + assert.Equal(t, 2, mockReg.addOrUpdateCalled) - // Will make a non-blocking call to stale cache - meta.CacheHit = true - meta.CacheAge = time.Hour + // resolve id should only hit cache now + _, _ = resolver.ResolveID(req) + _, _ = resolver.ResolveID(req) + _, _ = resolver.ResolveID(req) + assert.Equal(t, 2, mock.mockHealth.serviceCalled) + + // change one check for node1 app to critical + node1check1.Status = consul.HealthCritical + + // blocking query - return new index - node1 app is now unhealthy blockingCall <- 3 - waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.serviceCalled == 3 }) + waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted == 3 }) + assert.Equal(t, 2, mock.mockHealth.stateCalled) assert.Equal(t, 3, mock.mockHealth.serviceCalled) - waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 2 }) - assert.Equal(t, 2, mockReg.addOrUpdateCalled) + assert.Equal(t, 3, mockReg.addOrUpdateCalled) + + // change remaining check for node1 app to critical + node1check2.Status = consul.HealthCritical + + // blocking query - return new index - node1 app is still unhealthy, no change + blockingCall <- 4 + waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted == 4 }) + assert.Equal(t, 3, mock.mockHealth.stateCalled) + assert.Equal(t, 3, mock.mockHealth.serviceCalled) + assert.Equal(t, 3, mockReg.addOrUpdateCalled) + + // change one check for node2 app to healthy + node2check1.Status = consul.HealthPassing + + // blocking query - return new index - 
node2 app is still unhealthy, no change + blockingCall <- 4 + waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted == 5 }) + assert.Equal(t, 4, mock.mockHealth.stateCalled) + assert.Equal(t, 3, mock.mockHealth.serviceCalled) + assert.Equal(t, 3, mockReg.addOrUpdateCalled) + + // change remaining check for node2 app to healthy + node2check2.Status = consul.HealthPassing + + // blocking query - return new index - node2 app is now healthy + blockingCall <- 5 + waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted == 6 }) + assert.Equal(t, 5, mock.mockHealth.stateCalled) + assert.Equal(t, 4, mock.mockHealth.serviceCalled) + assert.Equal(t, 4, mockReg.addOrUpdateCalled) }, }, { @@ -452,35 +665,49 @@ func TestResolveID(t *testing.T) { t.Helper() blockingCall := make(chan uint64) - firstTime := true meta := &consul.QueryMeta{ LastIndex: 0, } err := fmt.Errorf("oh no") - mock := mockClient{ - mockHealth: mockHealth{ - serviceResult: []*consul.ServiceEntry{ - { - Service: &consul.AgentService{ - Address: "10.3.245.137", - Port: 8600, - Meta: map[string]string{ - "DAPR_PORT": "50005", - }, - }, + serviceEntries := []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "10.3.245.137", + Port: 8600, + Meta: map[string]string{ + "DAPR_PORT": "50005", }, }, - serviceMeta: meta, + }, + } + + healthChecks := consul.HealthChecks{ + &consul.HealthCheck{ + Node: "0e1234", + ServiceID: "test-app-10.3.245.137-3500", + ServiceName: "test-app", + Status: consul.HealthPassing, + }, + } + + mock := &mockClient{ + mockHealth: mockHealth{ + // Service() + serviceResult: serviceEntries, + serviceMeta: meta, serviceBehavior: func(service, tag string, passingOnly bool, q *consul.QueryOptions) { - if firstTime { - firstTime = false - } else { - meta.LastIndex = <-blockingCall - } }, serviceErr: nil, + + // State() + stateResult: healthChecks, + stateMeta: meta, + stateBehaviour: func(state string, q *consul.QueryOptions) { + meta.LastIndex = <-blockingCall + }, + stateError: nil, }, } @@ -490,23 +717,44 @@ func TestResolveID(t *testing.T) { QueryOptions: &consul.QueryOptions{}, } - mockReg := &mockRegistry{} - resolver := newResolver(logger.NewLogger("test"), cfg, &mock, mockReg) + serviceKeys := make([]string, 0, 10) + + mockReg := &mockRegistry{ + registerChannelResult: make(chan string, 100), + getKeysResult: &serviceKeys, + addOrUpdateBehaviour: func(service string, services []*consul.ServiceEntry) { + if services == nil { + serviceKeys = append(serviceKeys, service) + } + }, + } + resolver := newResolver(logger.NewLogger("test"), cfg, mock, mockReg) addr, _ := resolver.ResolveID(req) // Cache miss pass through assert.Equal(t, 1, mockReg.getCalled) waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 1 }) + assert.Equal(t, 1, mock.mockHealth.serviceCalled) assert.Equal(t, 1, mockReg.addOrUpdateCalled) assert.Equal(t, "10.3.245.137:50005", addr) - // Error and release blocking call - mock.mockHealth.serviceErr = &err + mockReg.getKeysResult = &serviceKeys + mockReg.registerChannelResult <- "test-app" + mockReg.getResult = ®istryEntry{ + services: serviceEntries, + } + blockingCall <- 2 + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 2 }) + assert.Equal(t, 1, mock.mockHealth.stateCalled) + assert.Equal(t, 2, mock.mockHealth.serviceCalled) + assert.Equal(t, 2, mockReg.addOrUpdateCalled) - // Cache expired - waitTillTrueOrTimeout(time.Second, func() bool { return 
mockReg.expireCalled == 1 }) - assert.Equal(t, 1, mockReg.expireCalled) + mock.mockHealth.stateError = &err + blockingCall <- 3 + blockingCall <- 3 + waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted == 2 }) + assert.Equal(t, 1, mockReg.expireAllCalled) }, }, { @@ -798,7 +1046,33 @@ func TestRegistry(t *testing.T) { entryMap := &sync.Map{} entryMap.Store( - appID, + "A", + ®istryEntry{ + services: []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "10.3.245.137", + Port: 8600, + }, + }, + }, + }) + + entryMap.Store( + "B", + ®istryEntry{ + services: []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "10.3.245.137", + Port: 8600, + }, + }, + }, + }) + + entryMap.Store( + "C", ®istryEntry{ services: []*consul.ServiceEntry{ { @@ -814,13 +1088,27 @@ func TestRegistry(t *testing.T) { entries: entryMap, } - entry, _ := registry.entries.Load(appID) - assert.NotNil(t, entry.(*registryEntry).services) + result, _ := registry.entries.Load("A") + assert.NotNil(t, result.(*registryEntry).services) - registry.expire(appID) + registry.expire("A") - entry, _ = registry.entries.Load(appID) - assert.Nil(t, entry.(*registryEntry).services) + result, _ = registry.entries.Load("A") + assert.Nil(t, result.(*registryEntry).services) + + registry.expireAll() + count := 0 + nilCount := 0 + registry.entries.Range(func(key, value any) bool { + count++ + if value.(*registryEntry).services == nil { + nilCount++ + } + return true + }) + + assert.Equal(t, 3, count) + assert.Equal(t, 3, nilCount) }, }, { @@ -829,27 +1117,42 @@ func TestRegistry(t *testing.T) { t.Helper() entryMap := &sync.Map{} - entryMap.Store( - appID, - ®istryEntry{ - services: []*consul.ServiceEntry{ - { - Service: &consul.AgentService{ - Address: "10.3.245.137", - Port: 8600, - }, + entry := ®istryEntry{ + services: []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "10.3.245.137", + Port: 8600, }, }, - }) + }, + } + + entryMap.Store("A", entry) + entryMap.Store("B", entry) + entryMap.Store("C", entry) + entryMap.Store("D", entry) registry := ®istry{ entries: entryMap, } - registry.remove(appID) + registry.remove("A") - entry, _ := registry.entries.Load(appID) - assert.Nil(t, entry) + result, _ := registry.entries.Load("A") + assert.Nil(t, result) + + result, _ = registry.entries.Load("B") + assert.NotNil(t, result) + + registry.removeAll() + count := 0 + registry.entries.Range(func(key, value any) bool { + count++ + return true + }) + + assert.Equal(t, 0, count) }, }, } diff --git a/nameresolution/consul/watcher.go b/nameresolution/consul/watcher.go index 3e822e460f..0effaba4c6 100644 --- a/nameresolution/consul/watcher.go +++ b/nameresolution/consul/watcher.go @@ -2,9 +2,10 @@ package consul import ( "context" + "errors" "fmt" - "reflect" "strconv" + "strings" "time" consul "github.com/hashicorp/consul/api" @@ -19,11 +20,12 @@ const ( ) type watchPlan struct { - expired bool - lastParamVal blockingParamVal - lastResult []*consul.ServiceEntry - service string - options *consul.QueryOptions + expired bool + lastParamVal blockingParamVal + lastResult map[serviceIdentifier]bool + options *consul.QueryOptions + configuredQueryFilter string + failures int } type blockingParamVal interface { @@ -56,112 +58,285 @@ func (idx waitIndexVal) next(previous blockingParamVal) blockingParamVal { return idx } -func (r *resolver) watch(p *watchPlan) (blockingParamVal, []*consul.ServiceEntry, bool, error) { - ctx, cancel := context.WithCancel(context.Background()) 
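For context on the long-poll contract that `blockingParamVal` and `waitIndexVal` encode, here is a minimal, self-contained sketch of a consul blocking query; the endpoint matches the one the reworked watcher uses, while the wait time, sleep and loop shape are illustrative assumptions only, not values taken from this patch:

```go
package main

import (
	"fmt"
	"time"

	consul "github.com/hashicorp/consul/api"
)

// A minimal sketch of the consul blocking-query loop that waitIndexVal models.
// Assumes a local agent and a 10s wait time, both for illustration only.
func main() {
	client, err := consul.NewClient(consul.DefaultConfig())
	if err != nil {
		panic(err)
	}

	var waitIndex uint64 // 0 => return immediately, like waitIndexVal(0)
	for {
		opts := &consul.QueryOptions{
			WaitIndex: waitIndex,        // long-poll until this raft index changes
			WaitTime:  10 * time.Second, // illustrative; the agent default is 5m
		}

		// Health().State is the endpoint the reworked watcher long-polls.
		checks, meta, err := client.Health().State(consul.HealthAny, opts)
		if err != nil {
			waitIndex = 0 // force the next attempt to return immediately
			time.Sleep(5 * time.Second)
			continue
		}

		// Reset when the index is unchanged or went backwards,
		// in the same spirit as waitIndexVal.next.
		if meta.LastIndex <= waitIndex {
			waitIndex = 0
		} else {
			waitIndex = meta.LastIndex
		}

		fmt.Printf("index=%d checks=%d\n", meta.LastIndex, len(checks))
	}
}
```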
+type serviceIdentifier struct { + serviceName string + serviceID string + node string +} + +func getServiceIdentifier(c *consul.HealthCheck) serviceIdentifier { + return serviceIdentifier{ + serviceID: c.ServiceID, + serviceName: c.ServiceName, + node: c.Node} +} + +func getHealthByService(checks consul.HealthChecks) map[serviceIdentifier]bool { + healthByService := make(map[serviceIdentifier]bool) + for _, check := range checks { + id := getServiceIdentifier(check) + + if state, ok := healthByService[id]; !ok { + // Init to healthy + healthByService[id] = true + } else if !state { + continue + } + + if check.Status != consul.HealthPassing { + healthByService[id] = false + } + } + + return healthByService +} + +func (p *watchPlan) getChangedServices(newResult map[serviceIdentifier]bool) map[string]struct{} { + changedServices := make(map[string]struct{}) + + // get changed services + for newKey, newValue := range newResult { + if oldValue, ok := p.lastResult[newKey]; ok && newValue == oldValue { + continue + } + + changedServices[newKey.serviceName] = struct{}{} + } + + for oldKey := range p.lastResult { + if _, ok := newResult[oldKey]; !ok { + changedServices[oldKey.serviceName] = struct{}{} + } + } + + return changedServices +} + +func (p *watchPlan) buildServiceNameFilter(services []string) { + var nameFilters = make([]string, len(services)) + + for i, v := range services { + nameFilters[i] = fmt.Sprintf("ServiceName==\"%s\"", v) + } + + filter := fmt.Sprintf("(%s)", strings.Join(nameFilters, " or ")) + + if len(p.configuredQueryFilter) < 1 { + p.options.Filter = filter + } else { + p.options.Filter = fmt.Sprintf("%s and %s", p.configuredQueryFilter, filter) + } +} + +func (r *resolver) watch(p *watchPlan, services []string, ctx context.Context) (blockingParamVal, consul.HealthChecks, error, bool) { p.options = p.options.WithContext(ctx) if p.lastParamVal != nil { p.options.WaitIndex = uint64(p.lastParamVal.(waitIndexVal)) } - defer cancel() - nodes, meta, err := r.client.Health().Service(p.service, "", true, p.options) + p.buildServiceNameFilter(services) - // If error try again without blocking (for stale agent cache) - if err != nil && p.options.WaitIndex != uint64(0) { - p.options.WaitIndex = 0 - nodes, meta, err = r.client.Health().Service(p.service, "", true, p.options) - } + checks, meta, err := r.client.Health().State(consul.HealthAny, p.options) if err != nil { - if p.options.WaitIndex == uint64(0) && !p.expired { - p.lastResult = nil - p.expired = true - r.registry.expire(p.service) + // if it failed during long poll try again with no wait + if p.options.WaitIndex != uint64(0) { + p.options.WaitIndex = 0 + checks, meta, err = r.client.Health().State(consul.HealthAny, p.options) } - return nil, nil, false, err - } else if meta.CacheHit && meta.CacheAge > 0 { - err = fmt.Errorf("agent cache is stale (age %s)", meta.CacheAge.String()) - p.expired = false + if err != nil { + // if the context was canceled + if errors.Is(err, context.Canceled) { + return nil, nil, nil, true + } - return nil, nodes, true, err + // if it failed with no wait and plan is not expired + if p.options.WaitIndex == uint64(0) && !p.expired { + p.lastResult = nil + p.expired = true + r.registry.expireAll() + } + + return nil, nil, err, false + } } p.expired = false - return waitIndexVal(meta.LastIndex), nodes, false, err + return waitIndexVal(meta.LastIndex), checks, err, false } -func (r *resolver) runWatchPlan(p *watchPlan) { +func (r *resolver) runWatchPlan(p *watchPlan, services []string, ctx 
context.Context, watchTask chan bool) { defer func() { recover() - r.registry.remove(p.service) + // complete watch task + watchTask <- true }() - // add to registry as now begun watching - r.registry.addOrUpdate(p.service, nil) - failures := 0 + // invoke blocking call + blockParam, result, err, canceled := r.watch(p, services, ctx) + + // if the ctx was canceled then do nothing + if canceled { + return + } + + // handle an error in the watch function + if err != nil { + // always 0 on err so query is forced to return + p.lastParamVal = waitIndexVal(0) + + // perform an exponential backoff + p.failures++ + + retry := retryInterval * time.Duration(p.failures*p.failures) + if retry > maxBackoffTime { + retry = maxBackoffTime + } + + // pause watcher routine until ctx is canceled or retry timer finishes + r.logger.Errorf("consul service-watcher error: %v, retry in %v", err, retry) + sleepTimer := time.NewTimer(retry) + select { + case <-ctx.Done(): + sleepTimer.Stop() + r.logger.Debugf("consul service-watcher retry throttling canceled") + case <-sleepTimer.C: + } + + return + } + + // reset the plan failure count + p.failures = 0 + + // if the index is unchanged do nothing + if p.lastParamVal != nil && p.lastParamVal.equal(blockParam) { + return + } + + // update the plan index + oldParamVal := p.lastParamVal + p.lastParamVal = blockParam.next(oldParamVal) + + // compare last and new result to get changed services + healthByService := getHealthByService(result) + changedServices := p.getChangedServices(healthByService) + + // update the plan last result + p.lastResult = healthByService + + // call agent to get updated healthy nodes for each changed service + for k := range changedServices { + p.options.WaitIndex = 0 + p.options.Filter = p.configuredQueryFilter + p.options = p.options.WithContext(ctx) + result, _, err := r.client.Health().Service(k, "", true, p.options) - for { - // invoke blocking call - blockParam, result, stale, err := r.watch(p) - // handle an error in the watch function if err != nil { - // perform an exponential backoff - failures++ + // on failure, expire service from cache, resolver will fall back to agent + r.logger.Errorf("error invoking health service: %v, for service %s", err, k) + r.registry.expire(k) + + // remove healthchecks for service from last result + for key := range p.lastResult { + if k == key.serviceName { + delete(p.lastResult, key) + } + } - // always 0 on err so query is forced to return and set p.healthy! 
+ // reset plan query index p.lastParamVal = waitIndexVal(0) + } else { + // updated service entries in registry + r.logger.Debugf("updating registry for service:%s last-index:%s", k, strconv.FormatUint(uint64(p.lastParamVal.(waitIndexVal)), 10)) + r.registry.addOrUpdate(k, result) + } + } +} - retry := retryInterval * time.Duration(failures*failures) - if retry > maxBackoffTime { - retry = maxBackoffTime - } +func (r *resolver) runWatchLoop(p *watchPlan) { + defer func() { + recover() + r.registry.removeAll() + r.watcherStarted = false + }() - r.logger.Errorf("consul service-watcher:%s error: %v, retry in %v", p.service, err, retry) + watchTask := make(chan bool, 1) - if stale { - r.logger.Debugf("updating registry for service:%s using stale cache", p.service) - r.registry.addOrUpdate(p.service, result) - } + for { + ctx, cancel := context.WithCancel(context.Background()) - time.Sleep(retry) + services := r.registry.getKeys() + watching := false - continue + if len(services) > 0 { + go r.runWatchPlan(p, services, ctx, watchTask) + watching = true } - // clear the failures - failures = 0 + select { + case <-watchTask: + cancel() - // if the index is unchanged do nothing - if p.lastParamVal != nil && p.lastParamVal.equal(blockParam) { - continue - } + // wait on channel for new services to track + case service := <-r.registry.registrationChannel(): + // cancel blocking query to consul agent + cancel() - // update the index, look for change - oldParamVal := p.lastParamVal - p.lastParamVal = blockParam.next(oldParamVal) - if oldParamVal != nil && reflect.DeepEqual(p.lastResult, result) { - continue - } + // generate set of keys + serviceKeys := make(map[string]interface{}) + for i := 0; i < len(services); i++ { + serviceKeys[services[i]] = nil + } - // handle the updated result - p.lastResult = result - r.logger.Debugf( - "updating registry for service:%s last-index:%s", - p.service, - strconv.FormatUint(uint64(p.lastParamVal.(waitIndexVal)), 10)) - r.registry.addOrUpdate(p.service, result) + // add service if it's not in the registry + if _, ok := serviceKeys[service]; !ok { + r.registry.addOrUpdate(service, nil) + } + + // check for any more new services in channel + moreServices := true + for moreServices { + select { + case service := <-r.registry.registrationChannel(): + if _, ok := serviceKeys[service]; !ok { + r.registry.addOrUpdate(service, nil) + } + default: + moreServices = false + } + } + + if watching { + // ensure previous routine completed before next loop + <-watchTask + } + + // reset plan failure count and query index + p.failures = 0 + p.lastParamVal = waitIndexVal(0) + } } } -func (r *resolver) watchService(service string) { +func (r *resolver) startWatcher() { + r.watcherMutex.Lock() + defer r.watcherMutex.Unlock() + + if r.watcherStarted { + return + } + options := *r.config.QueryOptions plan := &watchPlan{ - service: service, - options: &options, + options: &options, + configuredQueryFilter: options.Filter, + lastResult: make(map[serviceIdentifier]bool), } - go r.runWatchPlan(plan) + r.watcherStarted = true + go r.runWatchLoop(plan) } From a74ee037b9c8f1625d94abe43a9bd3d724abb483 Mon Sep 17 00:00:00 2001 From: Abdulaziz Elsheikh Date: Thu, 21 Sep 2023 17:49:49 +0100 Subject: [PATCH 04/20] nr_consul_cache disable agent cache query for watcher health service call, added more comments Signed-off-by: Abdulaziz Elsheikh --- nameresolution/consul/watcher.go | 123 +++++++++++++++++-------------- 1 file changed, 68 insertions(+), 55 deletions(-) diff --git 
a/nameresolution/consul/watcher.go b/nameresolution/consul/watcher.go index 0effaba4c6..ecf1667728 100644 --- a/nameresolution/consul/watcher.go +++ b/nameresolution/consul/watcher.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "strconv" "strings" "time" @@ -19,13 +18,15 @@ const ( maxBackoffTime = 180 * time.Second ) +// A watchPlan contains all the state tracked in the loop +// that keeps the consul service registry cache fresh type watchPlan struct { - expired bool - lastParamVal blockingParamVal - lastResult map[serviceIdentifier]bool - options *consul.QueryOptions - configuredQueryFilter string - failures int + expired bool + lastParamVal blockingParamVal + lastResult map[serviceIdentifier]bool + options *consul.QueryOptions + healthServiceQueryFilter string + failures int } type blockingParamVal interface { @@ -51,7 +52,7 @@ func (idx waitIndexVal) next(previous blockingParamVal) blockingParamVal { } prevIdx, ok := previous.(waitIndexVal) if ok && prevIdx == idx { - // This value is the same as the previous index, reset + // this value is the same as the previous index, reset return waitIndexVal(0) } @@ -64,25 +65,24 @@ type serviceIdentifier struct { node string } -func getServiceIdentifier(c *consul.HealthCheck) serviceIdentifier { - return serviceIdentifier{ - serviceID: c.ServiceID, - serviceName: c.ServiceName, - node: c.Node} -} - func getHealthByService(checks consul.HealthChecks) map[serviceIdentifier]bool { healthByService := make(map[serviceIdentifier]bool) for _, check := range checks { - id := getServiceIdentifier(check) + // generate unique identifer for service + id := serviceIdentifier{ + serviceID: check.ServiceID, + serviceName: check.ServiceName, + node: check.Node} + // if the service is not in the map - add and init to healthy if state, ok := healthByService[id]; !ok { - // Init to healthy healthByService[id] = true } else if !state { + // service exists and is already unhealthy - skip continue } + // if the check is not healthy then set service to unhealthy if check.Status != consul.HealthPassing { healthByService[id] = false } @@ -92,18 +92,22 @@ func getHealthByService(checks consul.HealthChecks) map[serviceIdentifier]bool { } func (p *watchPlan) getChangedServices(newResult map[serviceIdentifier]bool) map[string]struct{} { - changedServices := make(map[string]struct{}) + changedServices := make(map[string]struct{}) // service name set - // get changed services + // foreach new result for newKey, newValue := range newResult { + // if the service exists in the old result and has the same value - skip if oldValue, ok := p.lastResult[newKey]; ok && newValue == oldValue { continue } + // service is new or changed - add to set changedServices[newKey.serviceName] = struct{}{} } + // foreach old result for oldKey := range p.lastResult { + // if the service does not exist in the new result - add to set if _, ok := newResult[oldKey]; !ok { changedServices[oldKey.serviceName] = struct{}{} } @@ -112,20 +116,14 @@ func (p *watchPlan) getChangedServices(newResult map[serviceIdentifier]bool) map return changedServices } -func (p *watchPlan) buildServiceNameFilter(services []string) { +func getServiceNameFilter(services []string) string { var nameFilters = make([]string, len(services)) for i, v := range services { nameFilters[i] = fmt.Sprintf("ServiceName==\"%s\"", v) } - filter := fmt.Sprintf("(%s)", strings.Join(nameFilters, " or ")) - - if len(p.configuredQueryFilter) < 1 { - p.options.Filter = filter - } else { - p.options.Filter = fmt.Sprintf("%s and %s", 
p.configuredQueryFilter, filter)
-	}
+	return fmt.Sprintf("(%s)", strings.Join(nameFilters, " or "))
 }
 
 func (r *resolver) watch(p *watchPlan, services []string, ctx context.Context) (blockingParamVal, consul.HealthChecks, error, bool) {
@@ -135,8 +133,10 @@ func (r *resolver) watch(p *watchPlan, services []string, ctx context.Context) (
 		p.options.WaitIndex = uint64(p.lastParamVal.(waitIndexVal))
 	}
 
-	p.buildServiceNameFilter(services)
+	// build service name filter for all keys
+	p.options.Filter = getServiceNameFilter(services)
 
+	// request health checks for target services using blocking query
 	checks, meta, err := r.client.Health().State(consul.HealthAny, p.options)
 
 	if err != nil {
@@ -164,15 +164,19 @@ func (r *resolver) watch(p *watchPlan, services []string, ctx context.Context) (
 	}
 
 	p.expired = false
-
 	return waitIndexVal(meta.LastIndex), checks, err, false
 }
 
-func (r *resolver) runWatchPlan(p *watchPlan, services []string, ctx context.Context, watchTask chan bool) {
+// runWatchPlan executes the following steps:
+// - requests health check changes for the target keys from the consul agent using HTTP long polling
+// - compares the results to the previous
+// - if there is a change for a given serviceName/appId it invokes the health/service api to get a list of healthy targets
+// - signals completion of the watch plan
+func (r *resolver) runWatchPlan(p *watchPlan, services []string, ctx context.Context, watchPlanComplete chan bool) {
 	defer func() {
 		recover()
-		// complete watch task
-		watchTask <- true
+		// signal completion of the watch plan to unblock the watch plan loop
+		watchPlanComplete <- true
 	}()
 
 	// invoke blocking call
@@ -185,12 +189,11 @@ func (r *resolver) runWatchPlan(p *watchPlan, services []string, ctx context.Con
 
 	// handle an error in the watch function
 	if err != nil {
-		// always 0 on err so query is forced to return
+		// reset the query index so the next attempt does not block
 		p.lastParamVal = waitIndexVal(0)
 
 		// perform an exponential backoff
 		p.failures++
-
 		retry := retryInterval * time.Duration(p.failures*p.failures)
 		if retry > maxBackoffTime {
 			retry = maxBackoffTime
@@ -207,20 +210,20 @@ func (r *resolver) runWatchPlan(p *watchPlan, services []string, ctx context.Con
 		}
 
 		return
+	} else {
+		// reset the plan failure count
+		p.failures = 0
 	}
 
-	// reset the plan failure count
-	p.failures = 0
-
-	// if the index is unchanged do nothing
+	// if the result index is unchanged do nothing
 	if p.lastParamVal != nil && p.lastParamVal.equal(blockParam) {
 		return
+	} else {
+		// update the plan index
+		oldParamVal := p.lastParamVal
+		p.lastParamVal = blockParam.next(oldParamVal)
 	}
 
-	// update the plan index
-	oldParamVal := p.lastParamVal
-	p.lastParamVal = blockParam.next(oldParamVal)
-
 	// compare last and new result to get changed services
 	healthByService := getHealthByService(result)
 	changedServices := p.getChangedServices(healthByService)
@@ -231,9 +234,9 @@ func (r *resolver) runWatchPlan(p *watchPlan, services []string, ctx context.Con
 	// call agent to get updated healthy nodes for each changed service
 	for k := range changedServices {
 		p.options.WaitIndex = 0
-		p.options.Filter = p.configuredQueryFilter
+		p.options.Filter = p.healthServiceQueryFilter
 		p.options = p.options.WithContext(ctx)
-		result, _, err := r.client.Health().Service(k, "", true, p.options)
+		result, meta, err := r.client.Health().Service(k, "", true, p.options)
 
 		if err != nil {
 			// on failure, expire service from cache, resolver will fall back to agent
@@ -251,12 +254,16 @@ func (r *resolver) runWatchPlan(p 
*watchPlan, services []string, ctx context.Con p.lastParamVal = waitIndexVal(0) } else { // updated service entries in registry - r.logger.Debugf("updating registry for service:%s last-index:%s", k, strconv.FormatUint(uint64(p.lastParamVal.(waitIndexVal)), 10)) + r.logger.Debugf("updating consul nr registry for service:%s last-index:%d", k, meta.LastIndex) r.registry.addOrUpdate(k, result) } } } +// runWatchLoop executes the following steps in a forever loop: +// - gets the keys from the registry +// - executes the watch plan with the targets keys +// - waits for (the watch plan to signal completion) or (the resolver to register a new key) func (r *resolver) runWatchLoop(p *watchPlan) { defer func() { recover() @@ -264,26 +271,28 @@ func (r *resolver) runWatchLoop(p *watchPlan) { r.watcherStarted = false }() - watchTask := make(chan bool, 1) + watchPlanComplete := make(chan bool, 1) for { ctx, cancel := context.WithCancel(context.Background()) + // get target keys/app-ids from registry services := r.registry.getKeys() watching := false if len(services) > 0 { - go r.runWatchPlan(p, services, ctx, watchTask) + // run watch plan for targets service with channel to signal completion + go r.runWatchPlan(p, services, ctx, watchPlanComplete) watching = true } select { - case <-watchTask: + case <-watchPlanComplete: cancel() // wait on channel for new services to track case service := <-r.registry.registrationChannel(): - // cancel blocking query to consul agent + // cancel watch plan i.e. blocking query to consul agent cancel() // generate set of keys @@ -297,7 +306,7 @@ func (r *resolver) runWatchLoop(p *watchPlan) { r.registry.addOrUpdate(service, nil) } - // check for any more new services in channel + // check for any more new services in channel and do the same moreServices := true for moreServices { select { @@ -311,8 +320,8 @@ func (r *resolver) runWatchLoop(p *watchPlan) { } if watching { - // ensure previous routine completed before next loop - <-watchTask + // ensure previous watch plan routine completed before next iteration + <-watchPlanComplete } // reset plan failure count and query index @@ -322,6 +331,7 @@ func (r *resolver) runWatchLoop(p *watchPlan) { } } +// startWatcher will configure the watch plan and start the watch loop in a separate routine func (r *resolver) startWatcher() { r.watcherMutex.Lock() defer r.watcherMutex.Unlock() @@ -331,10 +341,13 @@ func (r *resolver) startWatcher() { } options := *r.config.QueryOptions + options.UseCache = false // always ignore consul agent cache for watcher + options.Filter = "" // don't use configured filter for State() calls + plan := &watchPlan{ - options: &options, - configuredQueryFilter: options.Filter, - lastResult: make(map[serviceIdentifier]bool), + options: &options, + healthServiceQueryFilter: r.config.QueryOptions.Filter, + lastResult: make(map[serviceIdentifier]bool), } r.watcherStarted = true From dcb6371bab87e31609723f8c4cec37bd1191cea3 Mon Sep 17 00:00:00 2001 From: Abdulaziz Elsheikh Date: Wed, 4 Oct 2023 10:07:43 +0100 Subject: [PATCH 05/20] Update nameresolution/consul/consul.go Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com> Signed-off-by: Abdulaziz Elsheikh --- nameresolution/consul/consul.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/nameresolution/consul/consul.go b/nameresolution/consul/consul.go index 2dababdf86..856f27a1aa 100644 --- a/nameresolution/consul/consul.go +++ b/nameresolution/consul/consul.go @@ -133,9 +133,8 @@ func (r *resolver) 
getService(service string) (*consul.ServiceEntry, error) { r.startWatcher() } - var entry *registryEntry - - if entry = r.registry.get(service); entry != nil { + entry := r.registry.get(service) + if entry != nil { result := entry.next() if result != nil { From 8274b6d11239a89b6994aed5aadf40d2c1282363 Mon Sep 17 00:00:00 2001 From: Abdulaziz Elsheikh Date: Wed, 4 Oct 2023 10:27:00 +0100 Subject: [PATCH 06/20] Update nameresolution/consul/consul.go Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com> Signed-off-by: Abdulaziz Elsheikh --- nameresolution/consul/consul.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/nameresolution/consul/consul.go b/nameresolution/consul/consul.go index 856f27a1aa..551416d3b5 100644 --- a/nameresolution/consul/consul.go +++ b/nameresolution/consul/consul.go @@ -191,9 +191,8 @@ func (r *registry) removeAll() { } func (r *registry) expire(service string) { - var entry *registryEntry - - if entry = r.get(service); entry == nil { + entry := r.get(service) + if entry == nil { return } From 88bdaca89f9b20fde6fa5f7221058884a6cb3920 Mon Sep 17 00:00:00 2001 From: Abdulaziz Elsheikh Date: Wed, 4 Oct 2023 10:28:52 +0100 Subject: [PATCH 07/20] Update nameresolution/consul/consul.go Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com> Signed-off-by: Abdulaziz Elsheikh --- nameresolution/consul/consul.go | 1 + 1 file changed, 1 insertion(+) diff --git a/nameresolution/consul/consul.go b/nameresolution/consul/consul.go index 551416d3b5..64d11cee12 100644 --- a/nameresolution/consul/consul.go +++ b/nameresolution/consul/consul.go @@ -121,6 +121,7 @@ func (e *registryEntry) next() *consul.ServiceEntry { return nil } + // gosec is complaining that we are using a non-crypto-safe PRNG. This is fine in this scenario since we are using it only for selecting a random address for load-balancing. 
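The modulo draw that the comment above justifies is the whole load-balancing strategy: each resolution picks one healthy entry uniformly at random. A standalone sketch of the same idea, where the `pick` helper is hypothetical and not part of this patch:

```go
import (
	"math/rand"

	consul "github.com/hashicorp/consul/api"
)

// pick returns a uniformly random healthy entry, or nil for an empty slice.
// math/rand is acceptable here because the draw only spreads load; nothing
// security-sensitive depends on its unpredictability.
func pick(entries []*consul.ServiceEntry) *consul.ServiceEntry {
	if len(entries) == 0 {
		return nil
	}
	// rand.Intn also sidesteps the small modulo bias of rand.Int()%n
	return entries[rand.Intn(len(entries))]
}
```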
//nolint:gosec return e.services[rand.Int()%len(e.services)] } From b74f2952792dd62ef3cb5f14528114c047408f80 Mon Sep 17 00:00:00 2001 From: Abdulaziz Elsheikh Date: Wed, 4 Oct 2023 10:29:37 +0100 Subject: [PATCH 08/20] Update nameresolution/consul/watcher.go Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com> Signed-off-by: Abdulaziz Elsheikh --- nameresolution/consul/watcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nameresolution/consul/watcher.go b/nameresolution/consul/watcher.go index 0effaba4c6..2aafa49418 100644 --- a/nameresolution/consul/watcher.go +++ b/nameresolution/consul/watcher.go @@ -116,7 +116,7 @@ func (p *watchPlan) buildServiceNameFilter(services []string) { var nameFilters = make([]string, len(services)) for i, v := range services { - nameFilters[i] = fmt.Sprintf("ServiceName==\"%s\"", v) + nameFilters[i] = `ServiceName=="`+v+`"` } filter := fmt.Sprintf("(%s)", strings.Join(nameFilters, " or ")) From 732bc582813c0b940e129dc737aca74195e71d9a Mon Sep 17 00:00:00 2001 From: Abdulaziz Elsheikh Date: Wed, 4 Oct 2023 10:30:04 +0100 Subject: [PATCH 09/20] Update nameresolution/consul/watcher.go Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com> Signed-off-by: Abdulaziz Elsheikh --- nameresolution/consul/watcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nameresolution/consul/watcher.go b/nameresolution/consul/watcher.go index 2aafa49418..659c67b069 100644 --- a/nameresolution/consul/watcher.go +++ b/nameresolution/consul/watcher.go @@ -119,7 +119,7 @@ func (p *watchPlan) buildServiceNameFilter(services []string) { nameFilters[i] = `ServiceName=="`+v+`"` } - filter := fmt.Sprintf("(%s)", strings.Join(nameFilters, " or ")) + filter := "(" + strings.Join(nameFilters, " or ") + ")" if len(p.configuredQueryFilter) < 1 { p.options.Filter = filter From 97b537ae8d8326ebc6c05905e7750a0a8b7af982 Mon Sep 17 00:00:00 2001 From: Abdulaziz Elsheikh Date: Wed, 4 Oct 2023 10:30:43 +0100 Subject: [PATCH 10/20] Update nameresolution/consul/watcher.go Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com> Signed-off-by: Abdulaziz Elsheikh --- nameresolution/consul/watcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nameresolution/consul/watcher.go b/nameresolution/consul/watcher.go index 659c67b069..96ba4fbbb8 100644 --- a/nameresolution/consul/watcher.go +++ b/nameresolution/consul/watcher.go @@ -124,7 +124,7 @@ func (p *watchPlan) buildServiceNameFilter(services []string) { if len(p.configuredQueryFilter) < 1 { p.options.Filter = filter } else { - p.options.Filter = fmt.Sprintf("%s and %s", p.configuredQueryFilter, filter) + p.options.Filter = p.configuredQueryFilter + " and " + filter } } From 5f18b8e6422e8fbb265909bc4f3110e44e59d375 Mon Sep 17 00:00:00 2001 From: Abdulaziz Elsheikh Date: Wed, 4 Oct 2023 10:31:37 +0100 Subject: [PATCH 11/20] Update nameresolution/consul/watcher.go Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com> Signed-off-by: Abdulaziz Elsheikh --- nameresolution/consul/watcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nameresolution/consul/watcher.go b/nameresolution/consul/watcher.go index 96ba4fbbb8..f950128019 100644 --- a/nameresolution/consul/watcher.go +++ b/nameresolution/consul/watcher.go @@ -202,7 +202,7 @@ func (r *resolver) runWatchPlan(p *watchPlan, services []string, ctx context.Con select { case <-ctx.Done(): 
sleepTimer.Stop()
-			r.logger.Debugf("consul service-watcher retry throttling canceled")
+			r.logger.Debug("consul service-watcher retry throttling canceled")
 		case <-sleepTimer.C:
 		}
 

From b8eb7d513f00e38fb5a9caf6146a008fd65d4409 Mon Sep 17 00:00:00 2001
From: Abdulaziz Elsheikh
Date: Wed, 4 Oct 2023 10:32:08 +0100
Subject: [PATCH 12/20] Update nameresolution/consul/watcher.go

Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
Signed-off-by: Abdulaziz Elsheikh
---
 nameresolution/consul/watcher.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/nameresolution/consul/watcher.go b/nameresolution/consul/watcher.go
index f950128019..5833320f9b 100644
--- a/nameresolution/consul/watcher.go
+++ b/nameresolution/consul/watcher.go
@@ -287,7 +287,7 @@ func (r *resolver) runWatchLoop(p *watchPlan) {
 			cancel()
 
 			// generate set of keys
-			serviceKeys := make(map[string]interface{})
+			serviceKeys := make(map[string]any)
 			for i := 0; i < len(services); i++ {
 				serviceKeys[services[i]] = nil
 			}

From 3831d257aeb158ca439c891de2148dd844fee13e Mon Sep 17 00:00:00 2001
From: Abdulaziz Elsheikh
Date: Wed, 4 Oct 2023 12:09:19 +0100
Subject: [PATCH 13/20] use backoff libs, refactor ctx params, some tidy up

Signed-off-by: Abdulaziz Elsheikh
---
 nameresolution/consul/consul.go  |  5 ++--
 nameresolution/consul/watcher.go | 43 ++++++++++++++++++++------------
 2 files changed, 29 insertions(+), 19 deletions(-)

diff --git a/nameresolution/consul/consul.go b/nameresolution/consul/consul.go
index 64d11cee12..b1c5c50d6f 100644
--- a/nameresolution/consul/consul.go
+++ b/nameresolution/consul/consul.go
@@ -162,10 +162,9 @@ func (r *resolver) getService(service string) (*consul.ServiceEntry, error) {
 }
 
 func (r *registry) addOrUpdate(service string, services []*consul.ServiceEntry) {
-	var entry *registryEntry
-
 	// update
-	if entry = r.get(service); entry != nil {
+	entry := r.get(service)
+	if entry != nil {
 		entry.mu.Lock()
 		defer entry.mu.Unlock()
 
diff --git a/nameresolution/consul/watcher.go b/nameresolution/consul/watcher.go
index b5a1df0be5..240f4af068 100644
--- a/nameresolution/consul/watcher.go
+++ b/nameresolution/consul/watcher.go
@@ -6,15 +6,17 @@ import (
 	"strings"
 	"time"
 
+	"github.com/cenkalti/backoff/v4"
+
 	consul "github.com/hashicorp/consul/api"
 )
 
 const (
-	// retryInterval is the base retry value.
-	retryInterval = 5 * time.Second
+	// initial back off interval.
+	initialBackOffInternal = 5 * time.Second
 
 	// maximum back off time, this is to prevent exponential runaway. 
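This commit replaces the hand-rolled `failures*failures` retry with `cenkalti/backoff/v4`. As a sketch of how the backoff constants introduced in this hunk are wired into that library further down in `startWatcher`; the loop and print are illustrative only:

```go
import (
	"fmt"
	"time"

	backoff "github.com/cenkalti/backoff/v4"
)

func sketchWatcherBackOff() {
	ebo := backoff.NewExponentialBackOff()
	ebo.InitialInterval = 5 * time.Second // initialBackOffInternal
	ebo.MaxInterval = 180 * time.Second   // maxBackOffInternal caps the growth
	ebo.MaxElapsedTime = 0                // zero disables the give-up deadline, so it retries forever

	ebo.Reset() // called when a fresh failure streak starts
	for i := 0; i < 5; i++ {
		// NextBackOff grows roughly exponentially (default multiplier 1.5)
		// with randomized jitter, until it saturates at MaxInterval.
		fmt.Println(ebo.NextBackOff())
	}
}
```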
- maxBackoffTime = 180 * time.Second + maxBackOffInternal = 180 * time.Second ) // A watchPlan contains all the state tracked in the loop @@ -25,7 +27,8 @@ type watchPlan struct { lastResult map[serviceIdentifier]bool options *consul.QueryOptions healthServiceQueryFilter string - failures int + failing bool + backOff *backoff.ExponentialBackOff } type blockingParamVal interface { @@ -125,7 +128,7 @@ func getServiceNameFilter(services []string) string { return strings.Join(nameFilters, " or ") } -func (r *resolver) watch(p *watchPlan, services []string, ctx context.Context) (blockingParamVal, consul.HealthChecks, error, bool) { +func (r *resolver) watch(ctx context.Context, p *watchPlan, services []string) (blockingParamVal, consul.HealthChecks, error, bool) { p.options = p.options.WithContext(ctx) if p.lastParamVal != nil { @@ -171,7 +174,7 @@ func (r *resolver) watch(p *watchPlan, services []string, ctx context.Context) ( // - compares the results to the previous // - if there is a change for a given serviceName/appId it invokes the health/service api to get a list of healthy targets // - signals completion of the watch plan -func (r *resolver) runWatchPlan(p *watchPlan, services []string, ctx context.Context, watchPlanComplete chan bool) { +func (r *resolver) runWatchPlan(ctx context.Context, p *watchPlan, services []string, watchPlanComplete chan bool) { defer func() { recover() // signal completion of the watch plan to unblock the watch plan loop @@ -179,7 +182,7 @@ func (r *resolver) runWatchPlan(p *watchPlan, services []string, ctx context.Con }() // invoke blocking call - blockParam, result, err, canceled := r.watch(p, services, ctx) + blockParam, result, err, canceled := r.watch(ctx, p, services) // if the ctx was canceled then do nothing if canceled { @@ -192,14 +195,15 @@ func (r *resolver) runWatchPlan(p *watchPlan, services []string, ctx context.Con p.lastParamVal = waitIndexVal(0) // perform an exponential backoff - p.failures++ - retry := retryInterval * time.Duration(p.failures*p.failures) - if retry > maxBackoffTime { - retry = maxBackoffTime + if !p.failing { + p.failing = true + p.backOff.Reset() } + retry := p.backOff.NextBackOff() + // pause watcher routine until ctx is canceled or retry timer finishes - r.logger.Errorf("consul service-watcher error: %v, retry in %v", err, retry) + r.logger.Errorf("consul service-watcher error: %v, retry in %s", err, retry.Round(time.Second)) sleepTimer := time.NewTimer(retry) select { case <-ctx.Done(): @@ -210,8 +214,8 @@ func (r *resolver) runWatchPlan(p *watchPlan, services []string, ctx context.Con return } else { - // reset the plan failure count - p.failures = 0 + // reset the plan failure flag + p.failing = false } // if the result index is unchanged do nothing @@ -281,7 +285,7 @@ func (r *resolver) runWatchLoop(p *watchPlan) { if len(services) > 0 { // run watch plan for targets service with channel to signal completion - go r.runWatchPlan(p, services, ctx, watchPlanComplete) + go r.runWatchPlan(ctx, p, services, watchPlanComplete) watching = true } @@ -324,7 +328,7 @@ func (r *resolver) runWatchLoop(p *watchPlan) { } // reset plan failure count and query index - p.failures = 0 + p.failing = false p.lastParamVal = waitIndexVal(0) } } @@ -343,10 +347,17 @@ func (r *resolver) startWatcher() { options.UseCache = false // always ignore consul agent cache for watcher options.Filter = "" // don't use configured filter for State() calls + // Configure exponential backoff + ebo := backoff.NewExponentialBackOff() + ebo.InitialInterval = 
initialBackOffInternal + ebo.MaxInterval = maxBackOffInternal + ebo.MaxElapsedTime = 0 + plan := &watchPlan{ options: &options, healthServiceQueryFilter: r.config.QueryOptions.Filter, lastResult: make(map[serviceIdentifier]bool), + backOff: ebo, } r.watcherStarted = true From fb854e6449a972dab3f3d19957c1d20798953439 Mon Sep 17 00:00:00 2001 From: Abdulaziz Elsheikh Date: Wed, 4 Oct 2023 12:14:44 +0100 Subject: [PATCH 14/20] lint fixes Signed-off-by: Abdulaziz Elsheikh --- nameresolution/consul/watcher.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/nameresolution/consul/watcher.go b/nameresolution/consul/watcher.go index 240f4af068..eae950159d 100644 --- a/nameresolution/consul/watcher.go +++ b/nameresolution/consul/watcher.go @@ -6,8 +6,7 @@ import ( "strings" "time" - "github.com/cenkalti/backoff/v4" - + backoff "github.com/cenkalti/backoff/v4" consul "github.com/hashicorp/consul/api" ) From bdb2f29e14cbb05d436316be4380565b729bf532 Mon Sep 17 00:00:00 2001 From: Abdulaziz Elsheikh Date: Wed, 4 Oct 2023 18:04:20 +0100 Subject: [PATCH 15/20] remove panic recovers Signed-off-by: Abdulaziz Elsheikh --- nameresolution/consul/consul_test.go | 68 ---------------------------- nameresolution/consul/watcher.go | 2 - 2 files changed, 70 deletions(-) diff --git a/nameresolution/consul/consul_test.go b/nameresolution/consul/consul_test.go index 9751de062d..51332d5139 100644 --- a/nameresolution/consul/consul_test.go +++ b/nameresolution/consul/consul_test.go @@ -414,74 +414,6 @@ func TestResolveID(t *testing.T) { assert.Equal(t, 3, mockReg.addOrUpdateCalled) }, }, - { - "should remove all from cache if watcher loop panics", - nr.ResolveRequest{ - ID: "test-app", - }, - func(t *testing.T, req nr.ResolveRequest) { - t.Helper() - - meta := &consul.QueryMeta{ - LastIndex: 0, - } - - mock := mockClient{ - mockHealth: mockHealth{ - // Service() - serviceResult: []*consul.ServiceEntry{ - { - Service: &consul.AgentService{ - Address: "10.3.245.137", - Port: 8600, - Meta: map[string]string{ - "DAPR_PORT": "50005", - }, - }, - }, - }, - serviceMeta: meta, - serviceBehavior: func(service, tag string, passingOnly bool, q *consul.QueryOptions) { - }, - serviceErr: nil, - }, - } - - cfg := resolverConfig{ - DaprPortMetaKey: "DAPR_PORT", - UseCache: true, - QueryOptions: &consul.QueryOptions{}, - } - - serviceKeys := make([]string, 0, 10) - - mockReg := &mockRegistry{ - registerChannelResult: make(chan string, 100), - getKeysResult: &serviceKeys, - addOrUpdateBehaviour: func(service string, services []*consul.ServiceEntry) { - if services == nil { - serviceKeys = append(serviceKeys, service) - } - }, - } - resolver := newResolver(logger.NewLogger("test"), cfg, &mock, mockReg) - addr, _ := resolver.ResolveID(req) - - // Cache miss pass through - assert.Equal(t, 1, mockReg.getCalled) - waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 1 }) - assert.Equal(t, 1, mockReg.addOrUpdateCalled) - assert.Equal(t, "10.3.245.137:50005", addr) - - mockReg.getKeysBehaviour = func() { panic("oh no") } - mockReg.registerChannelResult <- "test-app" - - // Remove - waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.removeAllCalled == 1 }) - assert.Equal(t, 0, mockReg.expireCalled) - assert.Equal(t, 1, mockReg.removeAllCalled) - }, - }, { "should only update cache on change", nr.ResolveRequest{ diff --git a/nameresolution/consul/watcher.go b/nameresolution/consul/watcher.go index eae950159d..aa8ed5d7a1 100644 --- a/nameresolution/consul/watcher.go +++ 
b/nameresolution/consul/watcher.go @@ -175,7 +175,6 @@ func (r *resolver) watch(ctx context.Context, p *watchPlan, services []string) ( // - signals completion of the watch plan func (r *resolver) runWatchPlan(ctx context.Context, p *watchPlan, services []string, watchPlanComplete chan bool) { defer func() { - recover() // signal completion of the watch plan to unblock the watch plan loop watchPlanComplete <- true }() @@ -268,7 +267,6 @@ func (r *resolver) runWatchPlan(ctx context.Context, p *watchPlan, services []st // - waits for (the watch plan to signal completion) or (the resolver to register a new key) func (r *resolver) runWatchLoop(p *watchPlan) { defer func() { - recover() r.registry.removeAll() r.watcherStarted = false }() From 5b0d2096f0e40c887182be0e99fb08708f0d6e2d Mon Sep 17 00:00:00 2001 From: Abdulaziz Elsheikh Date: Thu, 5 Oct 2023 10:57:25 +0100 Subject: [PATCH 16/20] more lint fixes Signed-off-by: Abdulaziz Elsheikh --- nameresolution/consul/watcher.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nameresolution/consul/watcher.go b/nameresolution/consul/watcher.go index aa8ed5d7a1..5dfbca2365 100644 --- a/nameresolution/consul/watcher.go +++ b/nameresolution/consul/watcher.go @@ -69,11 +69,12 @@ type serviceIdentifier struct { func getHealthByService(checks consul.HealthChecks) map[serviceIdentifier]bool { healthByService := make(map[serviceIdentifier]bool) for _, check := range checks { - // generate unique identifer for service + // generate unique identifier for service id := serviceIdentifier{ serviceID: check.ServiceID, serviceName: check.ServiceName, - node: check.Node} + node: check.Node, + } // if the service is not in the map - add and init to healthy if state, ok := healthByService[id]; !ok { @@ -118,7 +119,7 @@ func (p *watchPlan) getChangedServices(newResult map[serviceIdentifier]bool) map } func getServiceNameFilter(services []string) string { - var nameFilters = make([]string, len(services)) + nameFilters := make([]string, len(services)) for i, v := range services { nameFilters[i] = `ServiceName=="` + v + `"` @@ -139,7 +140,6 @@ func (r *resolver) watch(ctx context.Context, p *watchPlan, services []string) ( // request health checks for target services using blocking query checks, meta, err := r.client.Health().State(consul.HealthAny, p.options) - if err != nil { // if it failed during long poll try again with no wait if p.options.WaitIndex != uint64(0) { From 402e2b1ad1ed449e159189f9e382adc405dc3ba3 Mon Sep 17 00:00:00 2001 From: Abdulaziz Elsheikh Date: Thu, 5 Oct 2023 12:21:58 +0100 Subject: [PATCH 17/20] resolve data races in tests Signed-off-by: Abdulaziz Elsheikh --- nameresolution/consul/consul_test.go | 74 ++++++++++++++-------------- 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/nameresolution/consul/consul_test.go b/nameresolution/consul/consul_test.go index 51332d5139..528aa1f0be 100644 --- a/nameresolution/consul/consul_test.go +++ b/nameresolution/consul/consul_test.go @@ -18,6 +18,7 @@ import ( "net" "strconv" "sync" + "sync/atomic" "testing" "time" @@ -58,7 +59,7 @@ type mockHealth struct { serviceResult []*consul.ServiceEntry serviceMeta *consul.QueryMeta - stateCallStarted int + stateCallStarted atomic.Int32 stateCalled int stateError *error stateBehaviour func(state string, q *consul.QueryOptions) @@ -67,7 +68,7 @@ type mockHealth struct { } func (m *mockHealth) State(state string, q *consul.QueryOptions) (consul.HealthChecks, *consul.QueryMeta, error) { - m.stateCallStarted++ + 
m.stateCallStarted.Add(1) if m.stateBehaviour != nil { m.stateBehaviour(state, q) @@ -117,10 +118,10 @@ func (m *mockAgent) ServiceRegister(service *consul.AgentServiceRegistration) er } type mockRegistry struct { - getKeysCalled int + getKeysCalled atomic.Int32 getKeysResult *[]string getKeysBehaviour func() - addOrUpdateCalled int + addOrUpdateCalled atomic.Int32 addOrUpdateBehaviour func(service string, services []*consul.ServiceEntry) expireCalled int expireAllCalled int @@ -128,12 +129,10 @@ type mockRegistry struct { removeAllCalled int getCalled int getResult *registryEntry - registerChannelCalled int registerChannelResult chan string } func (m *mockRegistry) registrationChannel() chan string { - m.registerChannelCalled++ return m.registerChannelResult } @@ -142,7 +141,7 @@ func (m *mockRegistry) getKeys() []string { m.getKeysBehaviour() } - m.getKeysCalled++ + m.getKeysCalled.Add(1) return *m.getKeysResult } @@ -160,7 +159,7 @@ func (m *mockRegistry) addOrUpdate(service string, services []*consul.ServiceEnt m.addOrUpdateBehaviour(service, services) } - m.addOrUpdateCalled++ + m.addOrUpdateCalled.Add(1) } func (m *mockRegistry) expire(service string) { @@ -358,13 +357,13 @@ func TestResolveID(t *testing.T) { // no apps in registry - cache miss, call agent directly assert.Equal(t, 1, mockReg.getCalled) - waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.getKeysCalled == 2 }) + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.getKeysCalled.Load() == 2 }) assert.Equal(t, 1, mock.mockHealth.serviceCalled) assert.Equal(t, "10.3.245.137:50005", addr) // watcher adds app to registry - assert.Equal(t, 1, mockReg.addOrUpdateCalled) - assert.Equal(t, 2, mockReg.getKeysCalled) + assert.Equal(t, int32(1), mockReg.addOrUpdateCalled.Load()) + assert.Equal(t, int32(2), mockReg.getKeysCalled.Load()) mockReg.registerChannelResult <- "test-app" mockReg.getResult = ®istryEntry{ @@ -373,12 +372,12 @@ func TestResolveID(t *testing.T) { // blocking query - return new index blockingCall <- 2 - waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted == 2 }) + waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted.Load() == 2 }) assert.Equal(t, 1, mock.mockHealth.stateCalled) // get healthy nodes and update registry for service in result assert.Equal(t, 2, mock.mockHealth.serviceCalled) - assert.Equal(t, 2, mockReg.addOrUpdateCalled) + assert.Equal(t, int32(2), mockReg.addOrUpdateCalled.Load()) // resolve id should only hit cache now addr, _ = resolver.ResolveID(req) @@ -393,25 +392,25 @@ func TestResolveID(t *testing.T) { // no update when no change in index and payload blockingCall <- 2 - waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted == 3 }) + waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted.Load() == 3 }) assert.Equal(t, 2, mock.mockHealth.stateCalled) assert.Equal(t, 2, mock.mockHealth.serviceCalled) - assert.Equal(t, 2, mockReg.addOrUpdateCalled) + assert.Equal(t, int32(2), mockReg.addOrUpdateCalled.Load()) // no update when no change in payload blockingCall <- 3 - waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted == 4 }) + waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted.Load() == 4 }) assert.Equal(t, 3, mock.mockHealth.stateCalled) assert.Equal(t, 2, mock.mockHealth.serviceCalled) - assert.Equal(t, 2, mockReg.addOrUpdateCalled) + assert.Equal(t, 
int32(2), mockReg.addOrUpdateCalled.Load()) // update when change in index and payload mock.mockHealth.stateResult[0].Status = consul.HealthCritical blockingCall <- 4 - waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted == 5 }) + waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted.Load() == 5 }) assert.Equal(t, 4, mock.mockHealth.stateCalled) assert.Equal(t, 3, mock.mockHealth.serviceCalled) - assert.Equal(t, 3, mockReg.addOrUpdateCalled) + assert.Equal(t, int32(3), mockReg.addOrUpdateCalled.Load()) }, }, { @@ -518,13 +517,13 @@ func TestResolveID(t *testing.T) { // no apps in registry - cache miss, call agent directly assert.Equal(t, 1, mockReg.getCalled) - waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 1 }) + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled.Load() == 1 }) assert.Equal(t, 1, mock.mockHealth.serviceCalled) assert.Equal(t, "10.3.245.137:50005", addr) // watcher adds app to registry - assert.Equal(t, 1, mockReg.addOrUpdateCalled) - assert.Equal(t, 2, mockReg.getKeysCalled) + assert.Equal(t, int32(1), mockReg.addOrUpdateCalled.Load()) + assert.Equal(t, int32(2), mockReg.getKeysCalled.Load()) // add key to mock registry - trigger watcher mockReg.registerChannelResult <- "test-app" @@ -534,12 +533,12 @@ func TestResolveID(t *testing.T) { // blocking query - return new index blockingCall <- 2 - waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 2 }) + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled.Load() == 2 }) assert.Equal(t, 1, mock.mockHealth.stateCalled) // get healthy nodes and update registry for service in result assert.Equal(t, 2, mock.mockHealth.serviceCalled) - assert.Equal(t, 2, mockReg.addOrUpdateCalled) + assert.Equal(t, int32(2), mockReg.addOrUpdateCalled.Load()) // resolve id should only hit cache now _, _ = resolver.ResolveID(req) @@ -552,40 +551,40 @@ func TestResolveID(t *testing.T) { // blocking query - return new index - node1 app is now unhealthy blockingCall <- 3 - waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted == 3 }) + waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted.Load() == 3 }) assert.Equal(t, 2, mock.mockHealth.stateCalled) assert.Equal(t, 3, mock.mockHealth.serviceCalled) - assert.Equal(t, 3, mockReg.addOrUpdateCalled) + assert.Equal(t, int32(3), mockReg.addOrUpdateCalled.Load()) // change remaining check for node1 app to critical node1check2.Status = consul.HealthCritical // blocking query - return new index - node1 app is still unhealthy, no change blockingCall <- 4 - waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted == 4 }) + waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted.Load() == 4 }) assert.Equal(t, 3, mock.mockHealth.stateCalled) assert.Equal(t, 3, mock.mockHealth.serviceCalled) - assert.Equal(t, 3, mockReg.addOrUpdateCalled) + assert.Equal(t, int32(3), mockReg.addOrUpdateCalled.Load()) // change one check for node2 app to healthy node2check1.Status = consul.HealthPassing // blocking query - return new index - node2 app is still unhealthy, no change blockingCall <- 4 - waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted == 5 }) + waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted.Load() == 5 }) 
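These `stateCallStarted.Load()` polls are the point of the atomic counters introduced by this commit: the mocked watcher goroutine writes the counter while the test goroutine polls it, which `go test -race` flags as a data race on a plain `int`. A reduced, race-free model of that reader/writer pair, with all names illustrative:

```go
import (
	"sync/atomic"
	"time"
)

func raceFreeCounterExample() bool {
	var stateCallStarted atomic.Int32

	// writer side: stands in for the mocked State() call in the watcher goroutine
	go func() {
		for i := 0; i < 5; i++ {
			stateCallStarted.Add(1)
			time.Sleep(10 * time.Millisecond)
		}
	}()

	// reader side: stands in for waitTillTrueOrTimeout polling in the tests
	deadline := time.Now().Add(time.Second)
	for time.Now().Before(deadline) {
		if stateCallStarted.Load() == 5 {
			return true
		}
		time.Sleep(time.Millisecond)
	}
	return false
}
```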
assert.Equal(t, 4, mock.mockHealth.stateCalled) assert.Equal(t, 3, mock.mockHealth.serviceCalled) - assert.Equal(t, 3, mockReg.addOrUpdateCalled) + assert.Equal(t, int32(3), mockReg.addOrUpdateCalled.Load()) // change remaining check for node2 app to healthy node2check2.Status = consul.HealthPassing // blocking query - return new index - node2 app is now healthy blockingCall <- 5 - waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted == 6 }) + waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted.Load() == 6 }) assert.Equal(t, 5, mock.mockHealth.stateCalled) assert.Equal(t, 4, mock.mockHealth.serviceCalled) - assert.Equal(t, 4, mockReg.addOrUpdateCalled) + assert.Equal(t, int32(4), mockReg.addOrUpdateCalled.Load()) }, }, { @@ -665,11 +664,12 @@ func TestResolveID(t *testing.T) { // Cache miss pass through assert.Equal(t, 1, mockReg.getCalled) - waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 1 }) + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled.Load() == 1 }) assert.Equal(t, 1, mock.mockHealth.serviceCalled) - assert.Equal(t, 1, mockReg.addOrUpdateCalled) + assert.Equal(t, int32(1), mockReg.addOrUpdateCalled.Load()) assert.Equal(t, "10.3.245.137:50005", addr) + waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted.Load() == 1 }) mockReg.getKeysResult = &serviceKeys mockReg.registerChannelResult <- "test-app" mockReg.getResult = ®istryEntry{ @@ -677,15 +677,15 @@ func TestResolveID(t *testing.T) { } blockingCall <- 2 - waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 2 }) + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled.Load() == 2 }) assert.Equal(t, 1, mock.mockHealth.stateCalled) assert.Equal(t, 2, mock.mockHealth.serviceCalled) - assert.Equal(t, 2, mockReg.addOrUpdateCalled) + assert.Equal(t, int32(2), mockReg.addOrUpdateCalled.Load()) mock.mockHealth.stateError = &err blockingCall <- 3 blockingCall <- 3 - waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted == 2 }) + waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted.Load() == 2 }) assert.Equal(t, 1, mockReg.expireAllCalled) }, }, From 2745d076bfd1adb2ca3c71b7c03b0fdbeac908c2 Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Tue, 10 Oct 2023 11:46:41 -0700 Subject: [PATCH 18/20] =?UTF-8?q?=F0=9F=92=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Signed-off-by: Abdulaziz Elsheikh --- nameresolution/consul/consul.go | 12 +++++------- nameresolution/consul/consul_test.go | 29 ++++++++++------------------ nameresolution/consul/watcher.go | 26 +++++++++++-------------- 3 files changed, 26 insertions(+), 41 deletions(-) diff --git a/nameresolution/consul/consul.go b/nameresolution/consul/consul.go index b1c5c50d6f..7e89f5ae20 100644 --- a/nameresolution/consul/consul.go +++ b/nameresolution/consul/consul.go @@ -19,6 +19,7 @@ import ( "net" "strconv" "sync" + "sync/atomic" consul "github.com/hashicorp/consul/api" @@ -70,8 +71,7 @@ type resolver struct { logger logger.Logger client clientInterface registry registryInterface - watcherStarted bool - watcherMutex sync.Mutex + watcherStarted atomic.Bool } type registryInterface interface { @@ -86,7 +86,7 @@ type 
registryInterface interface { } type registry struct { - entries *sync.Map + entries sync.Map serviceChannel chan string } @@ -130,9 +130,7 @@ func (r *resolver) getService(service string) (*consul.ServiceEntry, error) { var services []*consul.ServiceEntry if r.config.UseCache { - if !r.watcherStarted { - r.startWatcher() - } + r.startWatcher() entry := r.registry.get(service) if entry != nil { @@ -223,7 +221,7 @@ type resolverConfig struct { // NewResolver creates Consul name resolver. func NewResolver(logger logger.Logger) nr.Resolver { - return newResolver(logger, resolverConfig{}, &client{}, ®istry{entries: &sync.Map{}, serviceChannel: make(chan string, 100)}) + return newResolver(logger, resolverConfig{}, &client{}, ®istry{serviceChannel: make(chan string, 100)}) } func newResolver(logger logger.Logger, resolverConfig resolverConfig, client clientInterface, registry registryInterface) nr.Resolver { diff --git a/nameresolution/consul/consul_test.go b/nameresolution/consul/consul_test.go index 528aa1f0be..98762e85c9 100644 --- a/nameresolution/consul/consul_test.go +++ b/nameresolution/consul/consul_test.go @@ -17,7 +17,6 @@ import ( "fmt" "net" "strconv" - "sync" "sync/atomic" "testing" "time" @@ -941,7 +940,7 @@ func TestRegistry(t *testing.T) { func(t *testing.T) { t.Helper() - registry := ®istry{entries: &sync.Map{}} + registry := ®istry{} result := []*consul.ServiceEntry{ { @@ -976,8 +975,8 @@ func TestRegistry(t *testing.T) { func(t *testing.T) { t.Helper() - entryMap := &sync.Map{} - entryMap.Store( + registry := ®istry{} + registry.entries.Store( "A", ®istryEntry{ services: []*consul.ServiceEntry{ @@ -990,7 +989,7 @@ func TestRegistry(t *testing.T) { }, }) - entryMap.Store( + registry.entries.Store( "B", ®istryEntry{ services: []*consul.ServiceEntry{ @@ -1003,7 +1002,7 @@ func TestRegistry(t *testing.T) { }, }) - entryMap.Store( + registry.entries.Store( "C", ®istryEntry{ services: []*consul.ServiceEntry{ @@ -1016,10 +1015,6 @@ func TestRegistry(t *testing.T) { }, }) - registry := ®istry{ - entries: entryMap, - } - result, _ := registry.entries.Load("A") assert.NotNil(t, result.(*registryEntry).services) @@ -1048,7 +1043,7 @@ func TestRegistry(t *testing.T) { func(t *testing.T) { t.Helper() - entryMap := &sync.Map{} + registry := ®istry{} entry := ®istryEntry{ services: []*consul.ServiceEntry{ { @@ -1060,14 +1055,10 @@ func TestRegistry(t *testing.T) { }, } - entryMap.Store("A", entry) - entryMap.Store("B", entry) - entryMap.Store("C", entry) - entryMap.Store("D", entry) - - registry := ®istry{ - entries: entryMap, - } + registry.entries.Store("A", entry) + registry.entries.Store("B", entry) + registry.entries.Store("C", entry) + registry.entries.Store("D", entry) registry.remove("A") diff --git a/nameresolution/consul/watcher.go b/nameresolution/consul/watcher.go index 5dfbca2365..4bc18b6cd6 100644 --- a/nameresolution/consul/watcher.go +++ b/nameresolution/consul/watcher.go @@ -128,7 +128,7 @@ func getServiceNameFilter(services []string) string { return strings.Join(nameFilters, " or ") } -func (r *resolver) watch(ctx context.Context, p *watchPlan, services []string) (blockingParamVal, consul.HealthChecks, error, bool) { +func (r *resolver) watch(ctx context.Context, p *watchPlan, services []string) (blockingParamVal, consul.HealthChecks, error) { p.options = p.options.WithContext(ctx) if p.lastParamVal != nil { @@ -150,7 +150,7 @@ func (r *resolver) watch(ctx context.Context, p *watchPlan, services []string) ( if err != nil { // if the context was canceled if errors.Is(err, 
context.Canceled) { - return nil, nil, nil, true + return nil, nil, err } // if it failed with no wait and plan is not expired @@ -160,12 +160,12 @@ func (r *resolver) watch(ctx context.Context, p *watchPlan, services []string) ( r.registry.expireAll() } - return nil, nil, err, false + return nil, nil, err } } p.expired = false - return waitIndexVal(meta.LastIndex), checks, err, false + return waitIndexVal(meta.LastIndex), checks, err } // runWatchPlan executes the following steps: @@ -173,17 +173,17 @@ func (r *resolver) watch(ctx context.Context, p *watchPlan, services []string) ( // - compares the results to the previous // - if there is a change for a given serviceName/appId it invokes the health/service api to get a list of healthy targets // - signals completion of the watch plan -func (r *resolver) runWatchPlan(ctx context.Context, p *watchPlan, services []string, watchPlanComplete chan bool) { +func (r *resolver) runWatchPlan(ctx context.Context, p *watchPlan, services []string, watchPlanComplete chan struct{}) { defer func() { // signal completion of the watch plan to unblock the watch plan loop - watchPlanComplete <- true + watchPlanComplete <- struct{}{} }() // invoke blocking call - blockParam, result, err, canceled := r.watch(ctx, p, services) + blockParam, result, err := r.watch(ctx, p, services) // if the ctx was canceled then do nothing - if canceled { + if errors.Is(err, context.Canceled) { return } @@ -268,10 +268,10 @@ func (r *resolver) runWatchPlan(ctx context.Context, p *watchPlan, services []st func (r *resolver) runWatchLoop(p *watchPlan) { defer func() { r.registry.removeAll() - r.watcherStarted = false + r.watcherStarted.Store(false) }() - watchPlanComplete := make(chan bool, 1) + watchPlanComplete := make(chan struct{}, 1) for { ctx, cancel := context.WithCancel(context.Background()) @@ -333,10 +333,7 @@ func (r *resolver) runWatchLoop(p *watchPlan) { // startWatcher will configure the watch plan and start the watch loop in a separate routine func (r *resolver) startWatcher() { - r.watcherMutex.Lock() - defer r.watcherMutex.Unlock() - - if r.watcherStarted { + if !r.watcherStarted.CompareAndSwap(false, true) { return } @@ -357,6 +354,5 @@ func (r *resolver) startWatcher() { backOff: ebo, } - r.watcherStarted = true go r.runWatchLoop(plan) } From bbfb762014d450c254a0dc9c292ba9b1134414f4 Mon Sep 17 00:00:00 2001 From: Abdulaziz Elsheikh Date: Thu, 12 Oct 2023 10:03:00 +0100 Subject: [PATCH 19/20] added resolver close method for cleanup and deregister Signed-off-by: Abdulaziz Elsheikh --- nameresolution/consul/README.md | 3 +- nameresolution/consul/configuration.go | 3 + nameresolution/consul/consul.go | 55 ++++-- nameresolution/consul/consul_test.go | 264 +++++++++++++++++++++++-- nameresolution/consul/watcher.go | 6 + 5 files changed, 294 insertions(+), 37 deletions(-) diff --git a/nameresolution/consul/README.md b/nameresolution/consul/README.md index c936c706f7..d04eb870a0 100644 --- a/nameresolution/consul/README.md +++ b/nameresolution/consul/README.md @@ -54,7 +54,8 @@ As of writing the configuration spec is fixed to v1.3.0 of the consul api | Tags | `[]string` | Configures any tags to include if/when registering services | | Meta | `map[string]string` | Configures any additional metadata to include if/when registering services | | DaprPortMetaKey | `string` | The key used for getting the Dapr sidecar port from consul service metadata during service resolution, it will also be used to set the Dapr sidecar port in metadata during registration. 
If blank it will default to `DAPR_PORT` | -| SelfRegister | `bool` | Controls if Dapr will register the service to consul. The name resolution interface does not cater for an "on shutdown" pattern so please consider this if using Dapr to register services to consul as it will not deregister services. | +| SelfRegister | `bool` | Controls if Dapr will register the service to consul on startup. If unset it will default to `false` | +| SelfDeregister | `bool` | Controls if Dapr will deregister the service from consul on shutdown. If unset it will default to `false` | | AdvancedRegistration | [*api.AgentServiceRegistration](https://pkg.go.dev/github.com/hashicorp/consul/api@v1.3.0#AgentServiceRegistration) | Gives full control of service registration through configuration. If configured the component will ignore any configuration of Checks, Tags, Meta and SelfRegister. | | UseCache | `bool` | Configures if Dapr will cache the resolved services in-memory. This is done using consul [blocking queries](https://www.consul.io/api-docs/features/blocking) which can be configured via the QueryOptions configuration. If unset it will default to `false` | ## Samples Configurations diff --git a/nameresolution/consul/configuration.go b/nameresolution/consul/configuration.go index 4416968625..9fba047479 100644 --- a/nameresolution/consul/configuration.go +++ b/nameresolution/consul/configuration.go @@ -37,6 +37,7 @@ type intermediateConfig struct { AdvancedRegistration *AgentServiceRegistration // advanced use-case DaprPortMetaKey string SelfRegister bool + SelfDeregister bool UseCache bool } @@ -49,6 +50,7 @@ type configSpec struct { AdvancedRegistration *consul.AgentServiceRegistration // advanced use-case DaprPortMetaKey string SelfRegister bool + SelfDeregister bool UseCache bool } @@ -90,6 +92,7 @@ func mapConfig(config intermediateConfig) configSpec { QueryOptions: mapQueryOptions(config.QueryOptions), AdvancedRegistration: mapAdvancedRegistration(config.AdvancedRegistration), SelfRegister: config.SelfRegister, + SelfDeregister: config.SelfDeregister, DaprPortMetaKey: config.DaprPortMetaKey, UseCache: config.UseCache, } diff --git a/nameresolution/consul/consul.go b/nameresolution/consul/consul.go index 7e89f5ae20..5ed9e071dc 100644 --- a/nameresolution/consul/consul.go +++ b/nameresolution/consul/consul.go @@ -59,6 +59,7 @@ type clientInterface interface { type agentInterface interface { Self() (map[string]map[string]interface{}, error) ServiceRegister(service *consul.AgentServiceRegistration) error + ServiceDeregister(serviceID string) error } type healthInterface interface { @@ -67,11 +68,12 @@ type healthInterface interface { } type resolver struct { - config resolverConfig - logger logger.Logger - client clientInterface - registry registryInterface - watcherStarted atomic.Bool + config resolverConfig + logger logger.Logger + client clientInterface + registry registryInterface + watcherStarted atomic.Bool + watcherStopChannel chan struct{} } type registryInterface interface { @@ -212,24 +214,26 @@ func (r *registry) registrationChannel() chan string { } type resolverConfig struct { - Client *consul.Config - QueryOptions *consul.QueryOptions - Registration *consul.AgentServiceRegistration - DaprPortMetaKey string - UseCache bool + Client *consul.Config + QueryOptions *consul.QueryOptions + Registration *consul.AgentServiceRegistration + DeregisterOnClose bool + DaprPortMetaKey string + UseCache bool } // NewResolver creates Consul name resolver. 
func NewResolver(logger logger.Logger) nr.Resolver { - return newResolver(logger, resolverConfig{}, &client{}, ®istry{serviceChannel: make(chan string, 100)}) + return newResolver(logger, resolverConfig{}, &client{}, ®istry{serviceChannel: make(chan string, 100)}, make(chan struct{})) } -func newResolver(logger logger.Logger, resolverConfig resolverConfig, client clientInterface, registry registryInterface) nr.Resolver { +func newResolver(logger logger.Logger, resolverConfig resolverConfig, client clientInterface, registry registryInterface, watcherStopChannel chan struct{}) nr.Resolver { return &resolver{ - logger: logger, - config: resolverConfig, - client: client, - registry: registry, + logger: logger, + config: resolverConfig, + client: client, + registry: registry, + watcherStopChannel: watcherStopChannel, } } @@ -293,6 +297,24 @@ func (r *resolver) ResolveID(req nr.ResolveRequest) (addr string, err error) { return formatAddress(addr, port) } +// Close will stop the watcher and deregister app from consul +func (r *resolver) Close() error { + if r.watcherStarted.Load() { + r.watcherStopChannel <- struct{}{} + } + + if r.config.Registration != nil && r.config.DeregisterOnClose { + err := r.client.Agent().ServiceDeregister(r.config.Registration.ID) + if err != nil { + return fmt.Errorf("failed to deregister consul service: %w", err) + } + + r.logger.Info("deregistered service from consul") + } + + return nil +} + func formatAddress(address string, port string) (addr string, err error) { if net.ParseIP(address).To4() != nil { return address + ":" + port, nil @@ -315,6 +337,7 @@ func getConfig(metadata nr.Metadata) (resolverCfg resolverConfig, err error) { } resolverCfg.DaprPortMetaKey = cfg.DaprPortMetaKey + resolverCfg.DeregisterOnClose = cfg.SelfDeregister resolverCfg.UseCache = cfg.UseCache resolverCfg.Client = getClientConfig(cfg) diff --git a/nameresolution/consul/consul_test.go b/nameresolution/consul/consul_test.go index 98762e85c9..bb98d0107c 100644 --- a/nameresolution/consul/consul_test.go +++ b/nameresolution/consul/consul_test.go @@ -97,11 +97,13 @@ func (m *mockHealth) Service(service, tag string, passingOnly bool, q *consul.Qu } type mockAgent struct { - selfCalled int - selfErr error - selfResult map[string]map[string]interface{} - serviceRegisterCalled int - serviceRegisterErr error + selfCalled int + selfErr error + selfResult map[string]map[string]interface{} + serviceRegisterCalled int + serviceRegisterErr error + serviceDeregisterCalled int + serviceDeregisterErr error } func (m *mockAgent) Self() (map[string]map[string]interface{}, error) { @@ -116,6 +118,12 @@ func (m *mockAgent) ServiceRegister(service *consul.AgentServiceRegistration) er return m.serviceRegisterErr } +func (m *mockAgent) ServiceDeregister(serviceID string) error { + m.serviceDeregisterCalled++ + + return m.serviceDeregisterErr +} + type mockRegistry struct { getKeysCalled atomic.Int32 getKeysResult *[]string @@ -125,7 +133,7 @@ type mockRegistry struct { expireCalled int expireAllCalled int removeCalled int - removeAllCalled int + removeAllCalled atomic.Int32 getCalled int getResult *registryEntry registerChannelResult chan string @@ -150,7 +158,7 @@ func (m *mockRegistry) expireAll() { } func (m *mockRegistry) removeAll() { - m.removeAllCalled++ + m.removeAllCalled.Add(1) } func (m *mockRegistry) addOrUpdate(service string, services []*consul.ServiceEntry) { @@ -192,7 +200,7 @@ func TestInit(t *testing.T) { t.Helper() var mock mockClient - resolver := newResolver(logger.NewLogger("test"), 
resolverConfig{}, &mock, ®istry{}) + resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock, ®istry{}, make(chan struct{})) _ = resolver.Init(metadata) @@ -215,7 +223,7 @@ func TestInit(t *testing.T) { t.Helper() var mock mockClient - resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock, ®istry{}) + resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock, ®istry{}, make(chan struct{})) _ = resolver.Init(metadata) @@ -237,7 +245,7 @@ func TestInit(t *testing.T) { t.Helper() var mock mockClient - resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock, ®istry{}) + resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock, ®istry{}, make(chan struct{})) _ = resolver.Init(metadata) @@ -351,7 +359,7 @@ func TestResolveID(t *testing.T) { } }, } - resolver := newResolver(logger.NewLogger("test"), cfg, mock, mockReg) + resolver := newResolver(logger.NewLogger("test"), cfg, mock, mockReg, make(chan struct{})) addr, _ := resolver.ResolveID(req) // no apps in registry - cache miss, call agent directly @@ -511,7 +519,7 @@ func TestResolveID(t *testing.T) { } }, } - resolver := newResolver(logger.NewLogger("test"), cfg, &mock, mockReg) + resolver := newResolver(logger.NewLogger("test"), cfg, &mock, mockReg, make(chan struct{})) addr, _ := resolver.ResolveID(req) // no apps in registry - cache miss, call agent directly @@ -658,7 +666,7 @@ func TestResolveID(t *testing.T) { } }, } - resolver := newResolver(logger.NewLogger("test"), cfg, mock, mockReg) + resolver := newResolver(logger.NewLogger("test"), cfg, mock, mockReg, make(chan struct{})) addr, _ := resolver.ResolveID(req) // Cache miss pass through @@ -688,6 +696,102 @@ func TestResolveID(t *testing.T) { assert.Equal(t, 1, mockReg.expireAllCalled) }, }, + { + "should stop watcher on close", + nr.ResolveRequest{ + ID: "test-app", + }, + func(t *testing.T, req nr.ResolveRequest) { + t.Helper() + + blockingCall := make(chan uint64) + meta := &consul.QueryMeta{ + LastIndex: 0, + } + + serviceEntries := []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "10.3.245.137", + Port: 8600, + Meta: map[string]string{ + "DAPR_PORT": "50005", + }, + }, + }, + } + + healthChecks := consul.HealthChecks{ + &consul.HealthCheck{ + Node: "0e1234", + ServiceID: "test-app-10.3.245.137-3500", + ServiceName: "test-app", + Status: consul.HealthPassing, + }, + } + + mock := &mockClient{ + mockHealth: mockHealth{ + // Service() + serviceResult: serviceEntries, + serviceMeta: meta, + serviceBehavior: func(service, tag string, passingOnly bool, q *consul.QueryOptions) { + }, + serviceErr: nil, + + // State() + stateResult: healthChecks, + stateMeta: meta, + stateBehaviour: func(state string, q *consul.QueryOptions) { + select { + case meta.LastIndex = <-blockingCall: + case <-q.Context().Done(): + } + }, + stateError: nil, + }, + } + + cfg := resolverConfig{ + DaprPortMetaKey: "DAPR_PORT", + UseCache: true, + QueryOptions: &consul.QueryOptions{}, + } + + serviceKeys := make([]string, 0, 10) + + mockReg := &mockRegistry{ + registerChannelResult: make(chan string, 100), + getKeysResult: &serviceKeys, + addOrUpdateBehaviour: func(service string, services []*consul.ServiceEntry) { + if services == nil { + serviceKeys = append(serviceKeys, service) + } + }, + } + resolver := newResolver(logger.NewLogger("test"), cfg, mock, mockReg, make(chan struct{})).(*resolver) + addr, _ := resolver.ResolveID(req) + + // Cache miss pass through + assert.Equal(t, 1, mockReg.getCalled) + 
waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled.Load() == 1 }) + assert.Equal(t, 1, mock.mockHealth.serviceCalled) + assert.Equal(t, int32(1), mockReg.addOrUpdateCalled.Load()) + assert.Equal(t, "10.3.245.137:50005", addr) + + waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted.Load() == 1 }) + mockReg.getKeysResult = &serviceKeys + mockReg.registerChannelResult <- "test-app" + mockReg.getResult = ®istryEntry{ + services: serviceEntries, + } + + resolver.Close() + waitTillTrueOrTimeout(time.Second*1, func() bool { return mockReg.removeAllCalled.Load() == 1 }) + assert.Equal(t, int32(1), mockReg.removeAllCalled.Load()) + assert.Equal(t, false, resolver.watcherStarted.Load()) + }, + }, { "error if no healthy services found", nr.ResolveRequest{ @@ -700,7 +804,7 @@ func TestResolveID(t *testing.T) { serviceResult: []*consul.ServiceEntry{}, }, } - resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}) + resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}, make(chan struct{})) _, err := resolver.ResolveID(req) assert.Equal(t, 1, mock.mockHealth.serviceCalled) @@ -729,7 +833,7 @@ func TestResolveID(t *testing.T) { }, }, } - resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}) + resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}, make(chan struct{})) addr, _ := resolver.ResolveID(req) @@ -758,7 +862,7 @@ func TestResolveID(t *testing.T) { }, }, } - resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}) + resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}, make(chan struct{})) addr, _ := resolver.ResolveID(req) @@ -796,7 +900,7 @@ func TestResolveID(t *testing.T) { }, }, } - resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}) + resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}, make(chan struct{})) total1 := 0 total2 := 0 @@ -855,7 +959,7 @@ func TestResolveID(t *testing.T) { }, }, } - resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}) + resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}, make(chan struct{})) addr, _ := resolver.ResolveID(req) @@ -884,7 +988,7 @@ func TestResolveID(t *testing.T) { }, }, } - resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}) + resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}, make(chan struct{})) _, err := resolver.ResolveID(req) @@ -910,7 +1014,7 @@ func TestResolveID(t *testing.T) { }, }, } - resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}) + resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}, make(chan struct{})) _, err := resolver.ResolveID(req) @@ -927,6 +1031,110 @@ func TestResolveID(t *testing.T) { } } +func TestClose(t *testing.T) { + t.Parallel() + + tests := []struct { + testName string + metadata nr.Metadata + test func(*testing.T, nr.Metadata) + }{ + { + "should deregister", + nr.Metadata{Base: metadata.Base{ + Properties: getTestPropsWithoutKey(""), + }, Configuration: nil}, + func(t *testing.T, metadata nr.Metadata) { + t.Helper() + + var mock mockClient + resolverConfig := resolverConfig{ + Registration: &consul.AgentServiceRegistration{}, + DeregisterOnClose: true, + } + + resolver := newResolver(logger.NewLogger("test"), resolverConfig, &mock, ®istry{}, make(chan struct{})).(*resolver) + resolver.Close() + + 
assert.Equal(t, 1, mock.mockAgent.serviceDeregisterCalled)
+ },
+ },
+ {
+ "should not deregister",
+ nr.Metadata{Base: metadata.Base{
+ Properties: getTestPropsWithoutKey(""),
+ }, Configuration: nil},
+ func(t *testing.T, metadata nr.Metadata) {
+ t.Helper()
+
+ var mock mockClient
+ resolverConfig := resolverConfig{
+ Registration: &consul.AgentServiceRegistration{},
+ DeregisterOnClose: false,
+ }
+
+ resolver := newResolver(logger.NewLogger("test"), resolverConfig, &mock, &registry{}, make(chan struct{})).(*resolver)
+ resolver.Close()
+
+ assert.Equal(t, 0, mock.mockAgent.serviceDeregisterCalled)
+ },
+ },
+ {
+ "should not deregister when no registration",
+ nr.Metadata{Base: metadata.Base{
+ Properties: getTestPropsWithoutKey(""),
+ }, Configuration: nil},
+ func(t *testing.T, metadata nr.Metadata) {
+ t.Helper()
+
+ var mock mockClient
+ resolverConfig := resolverConfig{
+ Registration: nil,
+ DeregisterOnClose: true,
+ }
+
+ resolver := newResolver(logger.NewLogger("test"), resolverConfig, &mock, &registry{}, make(chan struct{})).(*resolver)
+ resolver.Close()
+
+ assert.Equal(t, 0, mock.mockAgent.serviceDeregisterCalled)
+ },
+ },
+ {
+ "should stop watcher if started",
+ nr.Metadata{Base: metadata.Base{
+ Properties: getTestPropsWithoutKey(""),
+ }, Configuration: nil},
+ func(t *testing.T, metadata nr.Metadata) {
+ t.Helper()
+
+ var mock mockClient
+ resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock, &registry{}, make(chan struct{})).(*resolver)
+ resolver.watcherStarted.Store(true)
+
+ go resolver.Close()
+
+ sleepTimer := time.NewTimer(time.Second)
+ watcherStoppedInTime := false
+ select {
+ case <-sleepTimer.C:
+ case <-resolver.watcherStopChannel:
+ watcherStoppedInTime = true
+ }
+
+ assert.True(t, watcherStoppedInTime)
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ tt := tt
+ t.Run(tt.testName, func(t *testing.T) {
+ t.Parallel()
+ tt.test(t, tt.metadata)
+ })
+ }
+}
+
 func TestRegistry(t *testing.T) {
 t.Parallel()
 
@@ -1279,6 +1487,22 @@ func TestGetConfig(t *testing.T) {
 assert.Equal(t, daprPort, actual.Registration.Meta["random_key"])
 },
 },
+ {
+ "SelfDeregister should set DeregisterOnClose",
+ nr.Metadata{
+ Base: metadata.Base{Properties: getTestPropsWithoutKey("")},
+ Configuration: map[interface{}]interface{}{
+ "SelfRegister": true,
+ "SelfDeregister": true,
+ },
+ },
+ func(t *testing.T, metadata nr.Metadata) {
+ t.Helper()
+ actual, _ := getConfig(metadata)
+
+ assert.Equal(t, true, actual.DeregisterOnClose)
+ },
+ },
 {
 "missing AppID property should error when SelfRegister true",
 nr.Metadata{
diff --git a/nameresolution/consul/watcher.go b/nameresolution/consul/watcher.go
index 4bc18b6cd6..ece97ab05d 100644
--- a/nameresolution/consul/watcher.go
+++ b/nameresolution/consul/watcher.go
@@ -273,6 +273,7 @@ func (r *resolver) runWatchLoop(p *watchPlan) {
 
 watchPlanComplete := make(chan struct{}, 1)
 
+watchLoop:
 for {
 ctx, cancel := context.WithCancel(context.Background())
 
@@ -327,6 +328,11 @@ func (r *resolver) runWatchLoop(p *watchPlan) {
 // reset plan failure count and query index
 p.failing = false
 p.lastParamVal = waitIndexVal(0)
+
+ // resolver closing
+ case <-r.watcherStopChannel:
+ cancel()
+ break watchLoop
 }
 }
 }

From ff8cc371ddd9fa9e44eaf8e9c5cc78cd46f30de2 Mon Sep 17 00:00:00 2001
From: Abdulaziz Elsheikh
Date: Fri, 13 Oct 2023 11:56:38 +0100
Subject: [PATCH 20/20] fix lint issues

Signed-off-by: Abdulaziz Elsheikh
---
 nameresolution/consul/consul_test.go | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git 
a/nameresolution/consul/consul_test.go b/nameresolution/consul/consul_test.go index bb98d0107c..87fb3653a3 100644 --- a/nameresolution/consul/consul_test.go +++ b/nameresolution/consul/consul_test.go @@ -1048,12 +1048,12 @@ func TestClose(t *testing.T) { t.Helper() var mock mockClient - resolverConfig := resolverConfig{ + cfg := resolverConfig{ Registration: &consul.AgentServiceRegistration{}, DeregisterOnClose: true, } - resolver := newResolver(logger.NewLogger("test"), resolverConfig, &mock, ®istry{}, make(chan struct{})).(*resolver) + resolver := newResolver(logger.NewLogger("test"), cfg, &mock, ®istry{}, make(chan struct{})).(*resolver) resolver.Close() assert.Equal(t, 1, mock.mockAgent.serviceDeregisterCalled) @@ -1068,12 +1068,12 @@ func TestClose(t *testing.T) { t.Helper() var mock mockClient - resolverConfig := resolverConfig{ + cfg := resolverConfig{ Registration: &consul.AgentServiceRegistration{}, DeregisterOnClose: false, } - resolver := newResolver(logger.NewLogger("test"), resolverConfig, &mock, ®istry{}, make(chan struct{})).(*resolver) + resolver := newResolver(logger.NewLogger("test"), cfg, &mock, ®istry{}, make(chan struct{})).(*resolver) resolver.Close() assert.Equal(t, 0, mock.mockAgent.serviceDeregisterCalled) @@ -1088,12 +1088,12 @@ func TestClose(t *testing.T) { t.Helper() var mock mockClient - resolverConfig := resolverConfig{ + cfg := resolverConfig{ Registration: nil, DeregisterOnClose: true, } - resolver := newResolver(logger.NewLogger("test"), resolverConfig, &mock, ®istry{}, make(chan struct{})).(*resolver) + resolver := newResolver(logger.NewLogger("test"), cfg, &mock, ®istry{}, make(chan struct{})).(*resolver) resolver.Close() assert.Equal(t, 0, mock.mockAgent.serviceDeregisterCalled)
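
Taken together, the series leaves the component with three opt-in flags driven from the `nameResolution` configuration block: `SelfRegister` (register on startup), `SelfDeregister` (deregister on `Close`), and `UseCache` (cache resolutions via blocking queries). The sample below shows a configuration enabling all three; it is a minimal sketch that assumes the same shape and lowercase key casing as the existing README samples, not something these patches add.

```yaml
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
  name: appconfig
spec:
  nameResolution:
    component: "consul"
    configuration:
      selfRegister: true    # register this app with consul on startup
      selfDeregister: true  # deregister it again when the resolver closes
      useCache: true        # serve ResolveID from the watcher-maintained cache
```

Note that, per the `Close` implementation above, `SelfDeregister` only has an effect when the component itself performed a registration (`Registration != nil`), i.e. when `SelfRegister` or `AdvancedRegistration` is also configured.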
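The watcher lifecycle that patches 18 and 19 converge on reduces to a small concurrency pattern: an `atomic.Bool` whose `CompareAndSwap` guarantees at most one watch loop, plus a stop channel that `Close` signals only when the loop is running. The standalone sketch below illustrates that pattern only; the names (`watcher`, `start`, `run`) are invented for the example, and a timer stands in for the consul blocking query.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// watcher mirrors the resolver's pattern: "started" guards against more
// than one loop running, and "stop" lets close signal a running loop.
type watcher struct {
	started atomic.Bool
	stop    chan struct{}
}

// start launches the loop at most once: a concurrent or repeated caller
// loses the CompareAndSwap and returns without spawning a second loop.
func (w *watcher) start() {
	if !w.started.CompareAndSwap(false, true) {
		return
	}
	go w.run()
}

// run stands in for runWatchLoop: each select round is one "blocking
// query"; the stop channel ends the loop, and the deferred Store makes
// a later start able to win the CAS again.
func (w *watcher) run() {
	defer w.started.Store(false)
	for {
		select {
		case <-time.After(100 * time.Millisecond): // placeholder for a consul blocking query
			fmt.Println("watch round completed")
		case <-w.stop:
			return
		}
	}
}

// close mirrors resolver.Close: only signal the loop if it is running.
func (w *watcher) close() {
	if w.started.Load() {
		w.stop <- struct{}{}
	}
}

func main() {
	w := &watcher{stop: make(chan struct{})}
	w.start()
	w.start() // no-op: the first call already won the CompareAndSwap
	time.Sleep(250 * time.Millisecond)
	w.close()
}
```

The deferred `Store(false)` is what makes restart-after-close safe: once the loop exits, the next `start` wins the CAS again, matching how `runWatchLoop` resets `watcherStarted` on its way out.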