From 78768ec90b727b0fc65ba746d34620a89da08cc0 Mon Sep 17 00:00:00 2001 From: "Elsheikh, Abdulaziz (London)" Date: Thu, 17 Jun 2021 10:23:10 +0100 Subject: [PATCH 01/11] nr_consul_cache init commit for consul nr caching --- nameresolution/consul/Readme.md | 6 +- nameresolution/consul/configuration.go | 18 +- nameresolution/consul/consul.go | 177 ++++++-- nameresolution/consul/consul_test.go | 602 ++++++++++++++++++++++--- nameresolution/consul/watcher.go | 168 +++++++ 5 files changed, 857 insertions(+), 114 deletions(-) create mode 100644 nameresolution/consul/watcher.go diff --git a/nameresolution/consul/Readme.md b/nameresolution/consul/Readme.md index a53824954b..154461cf74 100644 --- a/nameresolution/consul/Readme.md +++ b/nameresolution/consul/Readme.md @@ -1,6 +1,6 @@ # Consul Name Resolution -The consul name resolution component gives the ability to register and resolve other "daprized" services registered on a consul estate. It is flexible in that it allows for complex to minimal configurations driving the behaviour on init and resolution. +The consul name resolution component gives the ability to register and resolve other "daprized" services registered on a consul estate. It is flexible in that it allows for complex to minimal configurations driving the behavior on init and resolution. ## How To Use @@ -35,7 +35,7 @@ spec: ``` -## Behaviour +## Behavior On init the consul component will either validate the connection to the configured (or default) agent or register the service if configured to do so. The name resolution interface does not cater for an "on shutdown" pattern so please consider this if using Dapr to register services to consul as it will not deregister services. @@ -56,7 +56,7 @@ As of writing the configuration spec is fixed to v1.3.0 of the consul api | DaprPortMetaKey | `string` | The key used for getting the Dapr sidecar port from consul service metadata during service resolution, it will also be used to set the Dapr sidecar port in metadata during registration. If blank it will default to `DAPR_PORT` | | SelfRegister | `bool` | Controls if Dapr will register the service to consul. The name resolution interface does not cater for an "on shutdown" pattern so please consider this if using Dapr to register services to consul as it will not deregister services. | | AdvancedRegistration | [*api.AgentServiceRegistration](https://pkg.go.dev/github.com/hashicorp/consul/api@v1.3.0#AgentServiceRegistration) | Gives full control of service registration through configuration. If configured the component will ignore any configuration of Checks, Tags, Meta and SelfRegister. | - +| UseCache | `bool` | Configures if Dapr will cache the resolved services in-memory. This is done using consul [blocking queries](https://www.consul.io/api-docs/features/blocking) which can be configured via the QueryOptions configuration. If blank it will default to `true` | ## Samples Configurations ### Basic diff --git a/nameresolution/consul/configuration.go b/nameresolution/consul/configuration.go index c64932e615..f53102d665 100644 --- a/nameresolution/consul/configuration.go +++ b/nameresolution/consul/configuration.go @@ -11,6 +11,8 @@ import ( "github.com/dapr/components-contrib/internal/config" ) +const defaultDaprPortMetaKey string = "DAPR_PORT" // default key for DaprPort in meta + // The intermediateConfig is based off of the consul api types. User configurations are // deserialized into this type before being converted to the equivalent consul types // that way breaking changes in future versions of the consul api cannot break user configuration @@ -21,8 +23,9 @@ type intermediateConfig struct { Meta map[string]string QueryOptions *QueryOptions AdvancedRegistration *AgentServiceRegistration // advanced use-case - SelfRegister bool DaprPortMetaKey string + SelfRegister bool + UseCache bool } type configSpec struct { @@ -32,8 +35,16 @@ type configSpec struct { Meta map[string]string QueryOptions *consul.QueryOptions AdvancedRegistration *consul.AgentServiceRegistration // advanced use-case - SelfRegister bool DaprPortMetaKey string + SelfRegister bool + UseCache bool +} + +func newIntermediateConfig() intermediateConfig { + return intermediateConfig{ + UseCache: true, + DaprPortMetaKey: defaultDaprPortMetaKey, + } } func parseConfig(rawConfig interface{}) (configSpec, error) { @@ -51,7 +62,7 @@ func parseConfig(rawConfig interface{}) (configSpec, error) { decoder := json.NewDecoder(bytes.NewReader(data)) decoder.DisallowUnknownFields() - var configuration intermediateConfig + configuration := newIntermediateConfig() if err := decoder.Decode(&configuration); err != nil { return result, fmt.Errorf("error deserializing to configSpec: %w", err) } @@ -71,6 +82,7 @@ func mapConfig(config intermediateConfig) configSpec { AdvancedRegistration: mapAdvancedRegistration(config.AdvancedRegistration), SelfRegister: config.SelfRegister, DaprPortMetaKey: config.DaprPortMetaKey, + UseCache: config.UseCache, } } diff --git a/nameresolution/consul/consul.go b/nameresolution/consul/consul.go index d1d821194b..ff62afeaa5 100644 --- a/nameresolution/consul/consul.go +++ b/nameresolution/consul/consul.go @@ -5,6 +5,7 @@ import ( "fmt" "math/big" "strconv" + "sync" consul "github.com/hashicorp/consul/api" @@ -12,8 +13,6 @@ import ( "github.com/dapr/kit/logger" ) -const daprMeta string = "DAPR_PORT" // default key for DAPR_PORT metadata - type client struct { *consul.Client } @@ -53,9 +52,117 @@ type healthInterface interface { } type resolver struct { - config resolverConfig - logger logger.Logger - client clientInterface + config resolverConfig + logger logger.Logger + client clientInterface + registry registryInterface +} + +type registryInterface interface { + get(service string) *registryEntry + expire(service string) // clears slice of instances + remove(service string) // removes entry from registry + addOrUpdate(service string, services []*consul.ServiceEntry) +} + +type registry struct { + entries map[string]*registryEntry + mu sync.RWMutex +} + +type registryEntry struct { + services []*consul.ServiceEntry + mu sync.RWMutex +} + +func (r *registry) get(service string) *registryEntry { + return r.entries[service] +} + +func (e *registryEntry) next() *consul.ServiceEntry { + e.mu.RLock() + defer e.mu.RUnlock() + + if len(e.services) == 0 { + return nil + } + + return shuffle(e.services)[0] +} + +func (r *resolver) getService(service string) (*consul.ServiceEntry, error) { + var services []*consul.ServiceEntry + + if r.config.UseCache { + var entry *registryEntry + + if entry = r.registry.get(service); entry != nil { + result := entry.next() + + if result != nil { + return result, nil + } + } else { + r.watchService(service) + } + } + + options := *r.config.QueryOptions + options.WaitHash = "" + options.WaitIndex = 0 + services, _, err := r.client.Health().Service(service, "", true, &options) + + if err != nil { + return nil, fmt.Errorf("failed to query healthy consul services: %w", err) + } else if len(services) == 0 { + return nil, fmt.Errorf("no healthy services found with AppID:%s", service) + } + + return shuffle(services)[0], nil +} + +func (r *registry) addOrUpdate(service string, services []*consul.ServiceEntry) { + var entry *registryEntry + + if entry = r.get(service); entry == nil { + r.mu.Lock() + if _, ok := r.entries[service]; !ok { + r.entries[service] = ®istryEntry{ + services: services, + } + } + r.mu.Unlock() + + return + } + + entry.mu.Lock() + defer entry.mu.Unlock() + + entry.services = services +} + +func (r *registry) remove(service string) { + if entry := r.get(service); entry == nil { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + delete(r.entries, service) +} + +func (r *registry) expire(service string) { + var entry *registryEntry + + if entry = r.get(service); entry == nil { + return + } + + entry.mu.Lock() + defer entry.mu.Unlock() + + entry.services = nil } type resolverConfig struct { @@ -63,18 +170,20 @@ type resolverConfig struct { QueryOptions *consul.QueryOptions Registration *consul.AgentServiceRegistration DaprPortMetaKey string + UseCache bool } // NewResolver creates Consul name resolver. func NewResolver(logger logger.Logger) nr.Resolver { - return newResolver(logger, resolverConfig{}, &client{}) + return newResolver(logger, resolverConfig{}, &client{}, ®istry{entries: map[string]*registryEntry{}}) } -func newResolver(logger logger.Logger, resolverConfig resolverConfig, client clientInterface) nr.Resolver { +func newResolver(logger logger.Logger, resolverConfig resolverConfig, client clientInterface, registry registryInterface) nr.Resolver { return &resolver{ - logger: logger, - config: resolverConfig, - client: client, + logger: logger, + config: resolverConfig, + client: client, + registry: registry, } } @@ -107,32 +216,15 @@ func (r *resolver) Init(metadata nr.Metadata) error { // ResolveID resolves name to address via consul func (r *resolver) ResolveID(req nr.ResolveRequest) (string, error) { - cfg := r.config - services, _, err := r.client.Health().Service(req.ID, "", true, cfg.QueryOptions) - if err != nil { - return "", fmt.Errorf("failed to query healthy consul services: %w", err) - } - - if len(services) == 0 { - return "", fmt.Errorf("no healthy services found with AppID:%s", req.ID) - } - - shuffle := func(services []*consul.ServiceEntry) []*consul.ServiceEntry { - for i := len(services) - 1; i > 0; i-- { - rndbig, _ := rand.Int(rand.Reader, big.NewInt(int64(i+1))) - j := rndbig.Int64() + var addr string - services[i], services[j] = services[j], services[i] - } + svc, err := r.getService(req.ID) - return services + if err != nil { + return "", err } - svc := shuffle(services)[0] - - addr := "" - - if port, ok := svc.Service.Meta[cfg.DaprPortMetaKey]; ok { + if port, ok := svc.Service.Meta[r.config.DaprPortMetaKey]; ok { if svc.Service.Address != "" { addr = fmt.Sprintf("%s:%s", svc.Service.Address, port) } else if svc.Node.Address != "" { @@ -141,12 +233,23 @@ func (r *resolver) ResolveID(req nr.ResolveRequest) (string, error) { return "", fmt.Errorf("no healthy services found with AppID:%s", req.ID) } } else { - return "", fmt.Errorf("target service AppID:%s found but DAPR_PORT missing from meta", req.ID) + return "", fmt.Errorf("target service AppID:%s found but %s missing from meta", req.ID, r.config.DaprPortMetaKey) } return addr, nil } +func shuffle(services []*consul.ServiceEntry) []*consul.ServiceEntry { + for i := len(services) - 1; i > 0; i-- { + rndbig, _ := rand.Int(rand.Reader, big.NewInt(int64(i+1))) + j := rndbig.Int64() + + services[i], services[j] = services[j], services[i] + } + + return services +} + // getConfig configuration from metadata, defaults are best suited for self-hosted mode func getConfig(metadata nr.Metadata) (resolverConfig, error) { var daprPort string @@ -165,12 +268,8 @@ func getConfig(metadata nr.Metadata) (resolverConfig, error) { return resolverCfg, err } - // set DaprPortMetaKey used for registring DaprPort and resolving from Consul - if cfg.DaprPortMetaKey == "" { - resolverCfg.DaprPortMetaKey = daprMeta - } else { - resolverCfg.DaprPortMetaKey = cfg.DaprPortMetaKey - } + resolverCfg.DaprPortMetaKey = cfg.DaprPortMetaKey + resolverCfg.UseCache = cfg.UseCache resolverCfg.Client = getClientConfig(cfg) if resolverCfg.Registration, err = getRegistrationConfig(cfg, props); err != nil { diff --git a/nameresolution/consul/consul_test.go b/nameresolution/consul/consul_test.go index fd6c529a73..6971df8bcb 100644 --- a/nameresolution/consul/consul_test.go +++ b/nameresolution/consul/consul_test.go @@ -4,6 +4,7 @@ import ( "fmt" "strconv" "testing" + "time" consul "github.com/hashicorp/consul/api" "github.com/stretchr/testify/assert" @@ -35,16 +36,25 @@ func (m *mockClient) Agent() agentInterface { } type mockHealth struct { - serviceCalled int - serviceErr error - serviceResult []*consul.ServiceEntry - serviceMeta *consul.QueryMeta + serviceCalled int + serviceErr *error + serviceBehavior func(service, tag string, passingOnly bool, q *consul.QueryOptions) + serviceResult []*consul.ServiceEntry + serviceMeta *consul.QueryMeta } func (m *mockHealth) Service(service, tag string, passingOnly bool, q *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error) { + if m.serviceBehavior != nil { + m.serviceBehavior(service, tag, passingOnly, q) + } + m.serviceCalled++ - return m.serviceResult, m.serviceMeta, m.serviceErr + if m.serviceErr == nil { + return m.serviceResult, m.serviceMeta, nil + } + + return m.serviceResult, m.serviceMeta, *m.serviceErr } type mockAgent struct { @@ -67,6 +77,32 @@ func (m *mockAgent) ServiceRegister(service *consul.AgentServiceRegistration) er return m.serviceRegisterErr } +type mockRegistry struct { + addOrUpdateCalled int + expireCalled int + removeCalled int + getCalled int + getResult *registryEntry +} + +func (m *mockRegistry) addOrUpdate(service string, services []*consul.ServiceEntry) { + m.addOrUpdateCalled++ +} + +func (m *mockRegistry) expire(service string) { + m.expireCalled++ +} + +func (m *mockRegistry) remove(service string) { + m.removeCalled++ +} + +func (m *mockRegistry) get(service string) *registryEntry { + m.getCalled++ + + return m.getResult +} + func TestInit(t *testing.T) { t.Parallel() @@ -85,7 +121,7 @@ func TestInit(t *testing.T) { t.Helper() var mock mockClient - resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock) + resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock, ®istry{}) _ = resolver.Init(metadata) @@ -106,7 +142,7 @@ func TestInit(t *testing.T) { t.Helper() var mock mockClient - resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock) + resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock, ®istry{}) _ = resolver.Init(metadata) @@ -128,7 +164,7 @@ func TestInit(t *testing.T) { t.Helper() var mock mockClient - resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock) + resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock, ®istry{}) _ = resolver.Init(metadata) @@ -149,9 +185,9 @@ func TestInit(t *testing.T) { } func TestResolveID(t *testing.T) { - t.Parallel() - testConfig := &resolverConfig{ + testConfig := resolverConfig{ DaprPortMetaKey: "DAPR_PORT", + QueryOptions: &consul.QueryOptions{}, } tests := []struct { @@ -159,6 +195,302 @@ func TestResolveID(t *testing.T) { req nr.ResolveRequest test func(*testing.T, nr.ResolveRequest) }{ + { + "should use cache when enabled", + nr.ResolveRequest{ + ID: "test-app", + }, + func(t *testing.T, req nr.ResolveRequest) { + t.Helper() + + blockingCall := make(chan uint64) + firstTime := true + meta := &consul.QueryMeta{ + LastIndex: 0, + } + + serviceEntries := []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "123.234.345.456", + Port: 8600, + Meta: map[string]string{ + "DAPR_PORT": "50005", + }, + }, + }, + } + + mock := &mockClient{ + mockHealth: mockHealth{ + serviceResult: serviceEntries, + serviceMeta: meta, + serviceBehavior: func(service, tag string, passingOnly bool, q *consul.QueryOptions) { + if firstTime { + firstTime = false + } else { + meta.LastIndex = <-blockingCall + } + }, + serviceErr: nil, + }, + } + + cfg := resolverConfig{ + DaprPortMetaKey: "DAPR_PORT", + UseCache: true, + QueryOptions: &consul.QueryOptions{}, + } + + mockReg := &mockRegistry{} + resolver := newResolver(logger.NewLogger("test"), cfg, mock, mockReg) + addr, _ := resolver.ResolveID(req) + + // Cache miss pass through + assert.Equal(t, 1, mockReg.getCalled) + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 1 }) + assert.Equal(t, 1, mockReg.addOrUpdateCalled) + assert.Equal(t, "123.234.345.456:50005", addr) + + mockReg.getResult = ®istryEntry{ + services: serviceEntries, + } + + blockingCall <- 2 + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 2 }) + assert.Equal(t, 2, mockReg.addOrUpdateCalled) + + // no update when no change in index and payload + blockingCall <- 2 + assert.Equal(t, 2, mockReg.addOrUpdateCalled) + + // no update when no change in payload + blockingCall <- 3 + assert.Equal(t, 2, mockReg.addOrUpdateCalled) + + // update when change in index and payload + mock.mockHealth.serviceResult = []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "123.234.345.456", + Port: 8601, + Meta: map[string]string{ + "DAPR_PORT": "50005", + }, + }, + }, + } + blockingCall <- 4 + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 3 }) + assert.Equal(t, 3, mockReg.addOrUpdateCalled) + + _, _ = resolver.ResolveID(req) + assert.Equal(t, 2, mockReg.getCalled) + }, + }, + { + "should remove from cache if watch panic", + nr.ResolveRequest{ + ID: "test-app", + }, + func(t *testing.T, req nr.ResolveRequest) { + t.Helper() + + meta := &consul.QueryMeta{ + LastIndex: 0, + } + firstTime := true + var mock = mockClient{ + mockHealth: mockHealth{ + serviceResult: []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "123.234.345.456", + Port: 8600, + Meta: map[string]string{ + "DAPR_PORT": "50005", + }, + }, + }, + }, + serviceMeta: meta, + serviceBehavior: func(service, tag string, passingOnly bool, q *consul.QueryOptions) { + if firstTime { + firstTime = false + } else { + panic("oh no") + } + }, + serviceErr: nil, + }, + } + + cfg := resolverConfig{ + DaprPortMetaKey: "DAPR_PORT", + UseCache: true, + QueryOptions: &consul.QueryOptions{}, + } + + mockReg := &mockRegistry{} + resolver := newResolver(logger.NewLogger("test"), cfg, &mock, mockReg) + addr, _ := resolver.ResolveID(req) + + // Cache miss pass through + assert.Equal(t, 1, mockReg.getCalled) + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 1 }) + assert.Equal(t, 1, mockReg.addOrUpdateCalled) + assert.Equal(t, "123.234.345.456:50005", addr) + + // Remove + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.removeCalled == 1 }) + assert.Equal(t, 0, mockReg.expireCalled) + assert.Equal(t, 1, mockReg.removeCalled) + }, + }, + { + "should use stale agent cache if available", + nr.ResolveRequest{ + ID: "test-app", + }, + func(t *testing.T, req nr.ResolveRequest) { + t.Helper() + + blockingCall := make(chan uint64) + firstTime := true + meta := &consul.QueryMeta{} + + var err error + + var mock = mockClient{ + mockHealth: mockHealth{ + serviceResult: []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "123.234.345.456", + Port: 8600, + Meta: map[string]string{ + "DAPR_PORT": "50005", + }, + }, + }, + }, + serviceMeta: meta, + serviceBehavior: func(service, tag string, passingOnly bool, q *consul.QueryOptions) { + if firstTime { + firstTime = false + } else { + if q.WaitIndex > 0 { + err = fmt.Errorf("oh no") + } else { + err = nil + } + + meta.LastIndex = <-blockingCall + } + }, + serviceErr: &err, + }, + } + + cfg := resolverConfig{ + DaprPortMetaKey: "DAPR_PORT", + UseCache: true, + QueryOptions: &consul.QueryOptions{ + WaitIndex: 1, + }, + } + + mockReg := &mockRegistry{} + resolver := newResolver(logger.NewLogger("test"), cfg, &mock, mockReg) + addr, _ := resolver.ResolveID(req) + + // Cache miss pass through + assert.Equal(t, 1, mockReg.getCalled) + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 1 }) + assert.Equal(t, 1, mockReg.addOrUpdateCalled) + assert.Equal(t, "123.234.345.456:50005", addr) + + // Blocking call will error as WaitIndex = 1 + blockingCall <- 2 + waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.serviceCalled == 2 }) + assert.Equal(t, 2, mock.mockHealth.serviceCalled) + + // Will make a non-blocking call to stale cache + meta.CacheHit = true + meta.CacheAge = time.Hour + blockingCall <- 3 + waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.serviceCalled == 3 }) + assert.Equal(t, 3, mock.mockHealth.serviceCalled) + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 2 }) + assert.Equal(t, 2, mockReg.addOrUpdateCalled) + }, + }, + { + "should expire cache upon blocking call error", + nr.ResolveRequest{ + ID: "test-app", + }, + func(t *testing.T, req nr.ResolveRequest) { + t.Helper() + + blockingCall := make(chan uint64) + firstTime := true + meta := &consul.QueryMeta{ + LastIndex: 0, + } + + err := fmt.Errorf("oh no") + + var mock = mockClient{ + mockHealth: mockHealth{ + serviceResult: []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "123.234.345.456", + Port: 8600, + Meta: map[string]string{ + "DAPR_PORT": "50005", + }, + }, + }, + }, + serviceMeta: meta, + serviceBehavior: func(service, tag string, passingOnly bool, q *consul.QueryOptions) { + if firstTime { + firstTime = false + } else { + meta.LastIndex = <-blockingCall + } + }, + serviceErr: nil, + }, + } + + cfg := resolverConfig{ + DaprPortMetaKey: "DAPR_PORT", + UseCache: true, + QueryOptions: &consul.QueryOptions{}, + } + + mockReg := &mockRegistry{} + resolver := newResolver(logger.NewLogger("test"), cfg, &mock, mockReg) + addr, _ := resolver.ResolveID(req) + + // Cache miss pass through + assert.Equal(t, 1, mockReg.getCalled) + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 1 }) + assert.Equal(t, 1, mockReg.addOrUpdateCalled) + assert.Equal(t, "123.234.345.456:50005", addr) + + // Error and release blocking call + mock.mockHealth.serviceErr = &err + blockingCall <- 2 + + // Cache expired + waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.expireCalled == 1 }) + assert.Equal(t, 1, mockReg.expireCalled) + }, + }, { "error if no healthy services found", nr.ResolveRequest{ @@ -171,7 +503,7 @@ func TestResolveID(t *testing.T) { serviceResult: []*consul.ServiceEntry{}, }, } - resolver := newResolver(logger.NewLogger("test"), *testConfig, &mock) + resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}) _, err := resolver.ResolveID(req) assert.Equal(t, 1, mock.mockHealth.serviceCalled) @@ -200,7 +532,7 @@ func TestResolveID(t *testing.T) { }, }, } - resolver := newResolver(logger.NewLogger("test"), *testConfig, &mock) + resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}) addr, _ := resolver.ResolveID(req) @@ -244,7 +576,7 @@ func TestResolveID(t *testing.T) { }, }, } - resolver := newResolver(logger.NewLogger("test"), *testConfig, &mock) + resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}) addr, _ := resolver.ResolveID(req) @@ -273,7 +605,7 @@ func TestResolveID(t *testing.T) { }, }, } - resolver := newResolver(logger.NewLogger("test"), *testConfig, &mock) + resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}) _, err := resolver.ResolveID(req) @@ -299,7 +631,7 @@ func TestResolveID(t *testing.T) { }, }, } - resolver := newResolver(logger.NewLogger("test"), *testConfig, &mock) + resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}) _, err := resolver.ResolveID(req) @@ -311,12 +643,114 @@ func TestResolveID(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.testName, func(t *testing.T) { - t.Parallel() tt.test(t, tt.req) }) } } +func TestRegistry(t *testing.T) { + t.Parallel() + + appID := "myService" + tests := []struct { + testName string + test func(*testing.T) + }{ + { + "should add and update entry", + func(t *testing.T) { + t.Helper() + + registry := ®istry{entries: map[string]*registryEntry{}} + + result := []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "123.234.345.456", + Port: 8600, + }, + }, + } + + registry.addOrUpdate(appID, result) + assert.Equal(t, result, registry.entries[appID].services) + + update := []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "random", + Port: 123, + }, + }, + } + + registry.addOrUpdate(appID, update) + assert.Equal(t, update, registry.entries[appID].services) + }, + }, + { + "should expire entries", + func(t *testing.T) { + t.Helper() + + registry := ®istry{ + entries: map[string]*registryEntry{ + appID: { + services: []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "123.234.345.456", + Port: 8600, + }, + }, + }, + }, + }, + } + + assert.NotNil(t, registry.entries[appID].services) + + registry.expire(appID) + + assert.Nil(t, registry.entries[appID].services) + }, + }, + { + "should remove entry", + func(t *testing.T) { + t.Helper() + + registry := ®istry{ + entries: map[string]*registryEntry{ + appID: { + services: []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "123.234.345.456", + Port: 8600, + }, + }, + }, + }, + }, + } + + registry.remove(appID) + + assert.Nil(t, registry.entries[appID]) + }, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.testName, func(t *testing.T) { + t.Parallel() + tt.test(t) + }) + } +} + func TestParseConfig(t *testing.T) { t.Parallel() @@ -351,6 +785,8 @@ func TestParseConfig(t *testing.T) { "UseCache": true, "Filter": "Checks.ServiceTags contains dapr", }, + "DaprPortMetaKey": "DAPR_PORT", + "UseCache": false, }, configSpec{ Checks: []*consul.AgentServiceCheck{ @@ -374,6 +810,8 @@ func TestParseConfig(t *testing.T) { UseCache: true, Filter: "Checks.ServiceTags contains dapr", }, + DaprPortMetaKey: "DAPR_PORT", + UseCache: false, }, }, { @@ -407,7 +845,10 @@ func TestParseConfig(t *testing.T) { "empty configuration in metadata", true, nil, - configSpec{}, + configSpec{ + UseCache: true, + DaprPortMetaKey: defaultDaprPortMetaKey, + }, }, { "fail on unsupported map key", @@ -467,15 +908,18 @@ func TestGetConfig(t *testing.T) { assert.Equal(t, true, actual.QueryOptions.UseCache) // DaprPortMetaKey - assert.Equal(t, "DAPR_PORT", actual.DaprPortMetaKey) + assert.Equal(t, defaultDaprPortMetaKey, actual.DaprPortMetaKey) + + // Cache + assert.Equal(t, true, actual.UseCache) }, }, { "empty configuration with SelfRegister should default correctly", nr.Metadata{ Properties: getTestPropsWithoutKey(""), - Configuration: configSpec{ - SelfRegister: true, + Configuration: map[interface{}]interface{}{ + "SelfRegister": true, }, }, func(t *testing.T, metadata nr.Metadata) { @@ -494,22 +938,25 @@ func TestGetConfig(t *testing.T) { // Metadata assert.Equal(t, 1, len(actual.Registration.Meta)) - assert.Equal(t, "50001", actual.Registration.Meta["DAPR_PORT"]) + assert.Equal(t, "50001", actual.Registration.Meta[actual.DaprPortMetaKey]) // QueryOptions assert.Equal(t, true, actual.QueryOptions.UseCache) // DaprPortMetaKey - assert.Equal(t, "DAPR_PORT", actual.DaprPortMetaKey) + assert.Equal(t, defaultDaprPortMetaKey, actual.DaprPortMetaKey) + + // Cache + assert.Equal(t, true, actual.UseCache) }, }, { "DaprPortMetaKey should set registration meta and config used for resolve", nr.Metadata{ Properties: getTestPropsWithoutKey(""), - Configuration: configSpec{ - SelfRegister: true, - DaprPortMetaKey: "random_key", + Configuration: map[interface{}]interface{}{ + "SelfRegister": true, + "DaprPortMetaKey": "random_key", }, }, func(t *testing.T, metadata nr.Metadata) { @@ -526,8 +973,8 @@ func TestGetConfig(t *testing.T) { "missing AppID property should error when SelfRegister true", nr.Metadata{ Properties: getTestPropsWithoutKey(nr.AppID), - Configuration: configSpec{ - SelfRegister: true, + Configuration: map[interface{}]interface{}{ + "SelfRegister": true, }, }, func(t *testing.T, metadata nr.Metadata) { @@ -556,8 +1003,8 @@ func TestGetConfig(t *testing.T) { "missing AppPort property should error when SelfRegister true", nr.Metadata{ Properties: getTestPropsWithoutKey(nr.AppPort), - Configuration: configSpec{ - SelfRegister: true, + Configuration: map[interface{}]interface{}{ + "SelfRegister": true, }, }, func(t *testing.T, metadata nr.Metadata) { @@ -586,8 +1033,8 @@ func TestGetConfig(t *testing.T) { "missing HostAddress property should error when SelfRegister true", nr.Metadata{ Properties: getTestPropsWithoutKey(nr.HostAddress), - Configuration: configSpec{ - SelfRegister: true, + Configuration: map[interface{}]interface{}{ + "SelfRegister": true, }, }, func(t *testing.T, metadata nr.Metadata) { @@ -616,8 +1063,8 @@ func TestGetConfig(t *testing.T) { "missing DaprHTTPPort property should error only when SelfRegister true", nr.Metadata{ Properties: getTestPropsWithoutKey(nr.DaprHTTPPort), - Configuration: configSpec{ - SelfRegister: true, + Configuration: map[interface{}]interface{}{ + "SelfRegister": true, }, }, func(t *testing.T, metadata nr.Metadata) { @@ -679,27 +1126,29 @@ func TestGetConfig(t *testing.T) { "registration should configure correctly", nr.Metadata{ Properties: getTestPropsWithoutKey(""), - Configuration: configSpec{ - Checks: []*consul.AgentServiceCheck{ - { - Name: "test-app health check name", - CheckID: "test-app health check id", - Interval: "15s", - HTTP: "http://127.0.0.1:3500/health", + Configuration: map[interface{}]interface{}{ + "Checks": []interface{}{ + map[interface{}]interface{}{ + "Name": "test-app health check name", + "CheckID": "test-app health check id", + "Interval": "15s", + "HTTP": "http://127.0.0.1:3500/health", }, }, - Tags: []string{ + "Tags": []interface{}{ "test", }, - Meta: map[string]string{ + "Meta": map[interface{}]interface{}{ "APP_PORT": "8650", "DAPR_GRPC_PORT": "50005", }, - QueryOptions: &consul.QueryOptions{ - UseCache: false, - Filter: "Checks.ServiceTags contains something", + "QueryOptions": map[interface{}]interface{}{ + "UseCache": false, + "Filter": "Checks.ServiceTags contains something", }, - SelfRegister: true, + "SelfRegister": true, + "DaprPortMetaKey": "PORT", + "UseCache": false, }, }, func(t *testing.T, metadata nr.Metadata) { @@ -720,50 +1169,53 @@ func TestGetConfig(t *testing.T) { assert.Equal(t, "test", actual.Registration.Tags[0]) assert.Equal(t, "8650", actual.Registration.Meta["APP_PORT"]) assert.Equal(t, "50005", actual.Registration.Meta["DAPR_GRPC_PORT"]) + assert.Equal(t, metadata.Properties[nr.DaprPort], actual.Registration.Meta["PORT"]) assert.Equal(t, false, actual.QueryOptions.UseCache) assert.Equal(t, "Checks.ServiceTags contains something", actual.QueryOptions.Filter) + assert.Equal(t, "PORT", actual.DaprPortMetaKey) + assert.Equal(t, false, actual.UseCache) }, }, { "advanced registration should override/ignore other configs", nr.Metadata{ Properties: getTestPropsWithoutKey(""), - Configuration: configSpec{ - AdvancedRegistration: &consul.AgentServiceRegistration{ - Name: "random-app-id", - Port: 0o00, - Address: "123.345.678", - Tags: []string{"random-tag"}, - Meta: map[string]string{ + Configuration: map[interface{}]interface{}{ + "AdvancedRegistration": map[interface{}]interface{}{ + "Name": "random-app-id", + "Port": 000, + "Address": "123.345.678", + "Tags": []string{"random-tag"}, + "Meta": map[string]string{ "APP_PORT": "000", }, - Checks: []*consul.AgentServiceCheck{ - { - Name: "random health check name", - CheckID: "random health check id", - Interval: "15s", - HTTP: "http://127.0.0.1:3500/health", + "Checks": []interface{}{ + map[interface{}]interface{}{ + "Name": "random health check name", + "CheckID": "random health check id", + "Interval": "15s", + "HTTP": "http://127.0.0.1:3500/health", }, }, }, - Checks: []*consul.AgentServiceCheck{ - { - Name: "test-app health check name", - CheckID: "test-app health check id", - Interval: "15s", - HTTP: "http://127.0.0.1:3500/health", + "Checks": []interface{}{ + map[interface{}]interface{}{ + "Name": "test-app health check name", + "CheckID": "test-app health check id", + "Interval": "15s", + "HTTP": "http://127.0.0.1:3500/health", }, }, - Tags: []string{ + "Tags": []string{ "dapr", "test", }, - Meta: map[string]string{ + "Meta": map[string]string{ "APP_PORT": "123", "DAPR_HTTP_PORT": "3500", "DAPR_GRPC_PORT": "50005", }, - SelfRegister: false, + "SelfRegister": false, }, }, func(t *testing.T, metadata nr.Metadata) { @@ -774,7 +1226,7 @@ func TestGetConfig(t *testing.T) { assert.NotNil(t, actual.Registration) assert.Equal(t, "random-app-id", actual.Registration.Name) assert.Equal(t, "123.345.678", actual.Registration.Address) - assert.Equal(t, 0o00, actual.Registration.Port) + assert.Equal(t, 000, actual.Registration.Port) assert.Equal(t, "random health check name", actual.Registration.Checks[0].Name) assert.Equal(t, "000", actual.Registration.Meta["APP_PORT"]) assert.Equal(t, "random-tag", actual.Registration.Tags[0]) @@ -1067,6 +1519,7 @@ func TestMapConfig(t *testing.T) { }, SelfRegister: true, DaprPortMetaKey: "SOMETHINGSOMETHING", + UseCache: false, } actual := mapConfig(expected) @@ -1083,6 +1536,7 @@ func TestMapConfig(t *testing.T) { assert.Equal(t, expected.Meta, actual.Meta) assert.Equal(t, expected.SelfRegister, actual.SelfRegister) assert.Equal(t, expected.DaprPortMetaKey, actual.DaprPortMetaKey) + assert.Equal(t, expected.UseCache, actual.UseCache) }) t.Run("should map empty configuration", func(t *testing.T) { @@ -1239,3 +1693,13 @@ func getTestPropsWithoutKey(removeKey string) map[string]string { return metadata } + +func waitTillTrueOrTimeout(d time.Duration, condition func() bool) { + for i := 0; i < 100; i++ { + if condition() { + return + } + + time.Sleep(d / 100) + } +} diff --git a/nameresolution/consul/watcher.go b/nameresolution/consul/watcher.go new file mode 100644 index 0000000000..e9941f6281 --- /dev/null +++ b/nameresolution/consul/watcher.go @@ -0,0 +1,168 @@ +package consul + +import ( + "context" + "fmt" + "reflect" + "strconv" + "time" + + consul "github.com/hashicorp/consul/api" +) + +const ( + // retryInterval is the base retry value + retryInterval = 5 * time.Second + + // maximum back off time, this is to prevent exponential runaway + maxBackoffTime = 180 * time.Second +) + +type watchPlan struct { + expired bool + lastParamVal blockingParamVal + lastResult []*consul.ServiceEntry + service string + options *consul.QueryOptions +} + +type blockingParamVal interface { + equal(other blockingParamVal) bool + next(previous blockingParamVal) blockingParamVal +} + +type waitIndexVal uint64 + +// Equal implements BlockingParamVal +func (idx waitIndexVal) equal(other blockingParamVal) bool { + if otherIdx, ok := other.(waitIndexVal); ok { + return idx == otherIdx + } + + return false +} + +// Next implements BlockingParamVal +func (idx waitIndexVal) next(previous blockingParamVal) blockingParamVal { + if previous == nil { + return idx + } + prevIdx, ok := previous.(waitIndexVal) + if ok && prevIdx == idx { + // This value is the same as the previous index, reset + return waitIndexVal(0) + } + + return idx +} + +func (r *resolver) watch(p *watchPlan) (blockingParamVal, []*consul.ServiceEntry, bool, error) { + ctx, cancel := context.WithCancel(context.Background()) + p.options = p.options.WithContext(ctx) + + if p.lastParamVal != nil { + p.options.WaitIndex = uint64(p.lastParamVal.(waitIndexVal)) + } + + defer cancel() + nodes, meta, err := r.client.Health().Service(p.service, "", true, p.options) + + // If error try again without blocking (for stale agent cache) + if err != nil && p.options.WaitIndex != uint64(0) { + p.options.WaitIndex = 0 + nodes, meta, err = r.client.Health().Service(p.service, "", true, p.options) + } + + if err != nil { + if p.options.WaitIndex == uint64(0) && !p.expired { + p.lastResult = nil + p.expired = true + r.registry.expire(p.service) + } + + return nil, nil, false, err + } else if meta.CacheHit && meta.CacheAge > 0 { + err = fmt.Errorf("agent cache is stale (age %s)", meta.CacheAge.String()) + p.expired = false + + return nil, nodes, true, err + } + + p.expired = false + + return waitIndexVal(meta.LastIndex), nodes, false, err +} + +func (r *resolver) runWatchPlan(p *watchPlan) { + defer func() { + recover() + r.registry.remove(p.service) + }() + + // add to registry as now begun watching + r.registry.addOrUpdate(p.service, nil) + failures := 0 + + for { + // invoke blocking call + blockParam, result, stale, err := r.watch(p) + + // handle an error in the watch function + if err != nil { + // perform an exponential backoff + failures++ + + // always 0 on err so query is forced to return and set p.healthy! + p.lastParamVal = waitIndexVal(0) + + retry := retryInterval * time.Duration(failures*failures) + if retry > maxBackoffTime { + retry = maxBackoffTime + } + + r.logger.Errorf("consul service-watcher:%s error: %v, retry in %v", p.service, err, retry) + + if stale { + r.logger.Debugf("updating registry for service:%s using stale cache", p.service) + r.registry.addOrUpdate(p.service, result) + } + + time.Sleep(retry) + + continue + } + + // clear the failures + failures = 0 + + // if the index is unchanged do nothing + if p.lastParamVal != nil && p.lastParamVal.equal(blockParam) { + continue + } + + // update the index, look for change + oldParamVal := p.lastParamVal + p.lastParamVal = blockParam.next(oldParamVal) + if oldParamVal != nil && reflect.DeepEqual(p.lastResult, result) { + continue + } + + // handle the updated result + p.lastResult = result + r.logger.Debugf( + "updating registry for service:%s last-index:%s", + p.service, + strconv.FormatUint(uint64(p.lastParamVal.(waitIndexVal)), 10)) + r.registry.addOrUpdate(p.service, result) + } +} + +func (r *resolver) watchService(service string) { + options := *r.config.QueryOptions + plan := &watchPlan{ + service: service, + options: &options, + } + + go r.runWatchPlan(plan) +} From e68e45b1420d06a8ffc9c544e485993560d58de9 Mon Sep 17 00:00:00 2001 From: "Elsheikh, Abdulaziz (London)" Date: Thu, 17 Jun 2021 17:35:20 +0100 Subject: [PATCH 02/11] nr_consul_cache resolving lint issues --- nameresolution/consul/consul.go | 1 - nameresolution/consul/consul_test.go | 6 +++--- nameresolution/consul/watcher.go | 1 - 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/nameresolution/consul/consul.go b/nameresolution/consul/consul.go index ff62afeaa5..0983506541 100644 --- a/nameresolution/consul/consul.go +++ b/nameresolution/consul/consul.go @@ -219,7 +219,6 @@ func (r *resolver) ResolveID(req nr.ResolveRequest) (string, error) { var addr string svc, err := r.getService(req.ID) - if err != nil { return "", err } diff --git a/nameresolution/consul/consul_test.go b/nameresolution/consul/consul_test.go index 6971df8bcb..bf3194cc7c 100644 --- a/nameresolution/consul/consul_test.go +++ b/nameresolution/consul/consul_test.go @@ -300,7 +300,7 @@ func TestResolveID(t *testing.T) { LastIndex: 0, } firstTime := true - var mock = mockClient{ + mock := mockClient{ mockHealth: mockHealth{ serviceResult: []*consul.ServiceEntry{ { @@ -361,7 +361,7 @@ func TestResolveID(t *testing.T) { var err error - var mock = mockClient{ + mock := mockClient{ mockHealth: mockHealth{ serviceResult: []*consul.ServiceEntry{ { @@ -441,7 +441,7 @@ func TestResolveID(t *testing.T) { err := fmt.Errorf("oh no") - var mock = mockClient{ + mock := mockClient{ mockHealth: mockHealth{ serviceResult: []*consul.ServiceEntry{ { diff --git a/nameresolution/consul/watcher.go b/nameresolution/consul/watcher.go index e9941f6281..f4211788f6 100644 --- a/nameresolution/consul/watcher.go +++ b/nameresolution/consul/watcher.go @@ -106,7 +106,6 @@ func (r *resolver) runWatchPlan(p *watchPlan) { for { // invoke blocking call blockParam, result, stale, err := r.watch(p) - // handle an error in the watch function if err != nil { // perform an exponential backoff From ef53838cb40224ac64ea281b07e10539294b95ab Mon Sep 17 00:00:00 2001 From: "Elsheikh, Abdulaziz (London)" Date: Fri, 18 Jun 2021 14:58:38 +0100 Subject: [PATCH 03/11] nr_consul_cache shared memory improvements --- nameresolution/consul/consul.go | 38 ++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/nameresolution/consul/consul.go b/nameresolution/consul/consul.go index 0983506541..2e45d22f39 100644 --- a/nameresolution/consul/consul.go +++ b/nameresolution/consul/consul.go @@ -76,12 +76,15 @@ type registryEntry struct { } func (r *registry) get(service string) *registryEntry { + r.mu.RLock() + defer r.mu.RUnlock() + return r.entries[service] } func (e *registryEntry) next() *consul.ServiceEntry { - e.mu.RLock() - defer e.mu.RUnlock() + e.mu.Lock() + defer e.mu.Unlock() if len(e.services) == 0 { return nil @@ -124,31 +127,32 @@ func (r *resolver) getService(service string) (*consul.ServiceEntry, error) { func (r *registry) addOrUpdate(service string, services []*consul.ServiceEntry) { var entry *registryEntry - if entry = r.get(service); entry == nil { - r.mu.Lock() - if _, ok := r.entries[service]; !ok { - r.entries[service] = ®istryEntry{ - services: services, - } - } - r.mu.Unlock() + // update + if entry = r.get(service); entry != nil { + entry.mu.Lock() + defer entry.mu.Unlock() + + entry.services = services return } - entry.mu.Lock() - defer entry.mu.Unlock() - - entry.services = services + // add + r.mu.Lock() + defer r.mu.Unlock() + r.entries[service] = ®istryEntry{ + services: services, + } } func (r *registry) remove(service string) { - if entry := r.get(service); entry == nil { + r.mu.Lock() + defer r.mu.Unlock() + + if _, ok := r.entries[service]; !ok { return } - r.mu.Lock() - defer r.mu.Unlock() delete(r.entries, service) } From 522e3e30a14b12ddf1acf3f3bf02579907cc1f0b Mon Sep 17 00:00:00 2001 From: Abdulaziz Elsheikh Date: Wed, 27 Oct 2021 10:28:00 +0100 Subject: [PATCH 04/11] nr_consul_cache fixed linting issues --- nameresolution/consul/watcher.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nameresolution/consul/watcher.go b/nameresolution/consul/watcher.go index f4211788f6..3e822e460f 100644 --- a/nameresolution/consul/watcher.go +++ b/nameresolution/consul/watcher.go @@ -11,10 +11,10 @@ import ( ) const ( - // retryInterval is the base retry value + // retryInterval is the base retry value. retryInterval = 5 * time.Second - // maximum back off time, this is to prevent exponential runaway + // maximum back off time, this is to prevent exponential runaway. maxBackoffTime = 180 * time.Second ) @@ -33,7 +33,7 @@ type blockingParamVal interface { type waitIndexVal uint64 -// Equal implements BlockingParamVal +// Equal implements BlockingParamVal. func (idx waitIndexVal) equal(other blockingParamVal) bool { if otherIdx, ok := other.(waitIndexVal); ok { return idx == otherIdx @@ -42,7 +42,7 @@ func (idx waitIndexVal) equal(other blockingParamVal) bool { return false } -// Next implements BlockingParamVal +// Next implements BlockingParamVal. func (idx waitIndexVal) next(previous blockingParamVal) blockingParamVal { if previous == nil { return idx From a3fddf30164b9da200b85d3c936e50e7ddee9481 Mon Sep 17 00:00:00 2001 From: Abdulaziz Elsheikh Date: Fri, 18 Mar 2022 10:21:08 +0000 Subject: [PATCH 05/11] nr_consul_cache changed registry to use sync.Map --- nameresolution/consul/consul.go | 27 ++++------- nameresolution/consul/consul_test.go | 67 ++++++++++++++++------------ 2 files changed, 48 insertions(+), 46 deletions(-) diff --git a/nameresolution/consul/consul.go b/nameresolution/consul/consul.go index 9f92d20577..2f4665e214 100644 --- a/nameresolution/consul/consul.go +++ b/nameresolution/consul/consul.go @@ -79,8 +79,7 @@ type registryInterface interface { } type registry struct { - entries map[string]*registryEntry - mu sync.RWMutex + entries sync.Map } type registryEntry struct { @@ -89,10 +88,11 @@ type registryEntry struct { } func (r *registry) get(service string) *registryEntry { - r.mu.RLock() - defer r.mu.RUnlock() + if result, ok := r.entries.Load(service); ok { + return result.(*registryEntry) + } - return r.entries[service] + return nil } func (e *registryEntry) next() *consul.ServiceEntry { @@ -151,22 +151,13 @@ func (r *registry) addOrUpdate(service string, services []*consul.ServiceEntry) } // add - r.mu.Lock() - defer r.mu.Unlock() - r.entries[service] = ®istryEntry{ + r.entries.Store(service, ®istryEntry{ services: services, - } + }) } func (r *registry) remove(service string) { - r.mu.Lock() - defer r.mu.Unlock() - - if _, ok := r.entries[service]; !ok { - return - } - - delete(r.entries, service) + r.entries.Delete(service) } func (r *registry) expire(service string) { @@ -192,7 +183,7 @@ type resolverConfig struct { // NewResolver creates Consul name resolver. func NewResolver(logger logger.Logger) nr.Resolver { - return newResolver(logger, resolverConfig{}, &client{}, ®istry{entries: map[string]*registryEntry{}}) + return newResolver(logger, resolverConfig{}, &client{}, ®istry{entries: sync.Map{}}) } func newResolver(logger logger.Logger, resolverConfig resolverConfig, client clientInterface, registry registryInterface) nr.Resolver { diff --git a/nameresolution/consul/consul_test.go b/nameresolution/consul/consul_test.go index ee10d13ff7..b03a68f58b 100644 --- a/nameresolution/consul/consul_test.go +++ b/nameresolution/consul/consul_test.go @@ -16,6 +16,7 @@ package consul import ( "fmt" "strconv" + "sync" "testing" "time" @@ -674,7 +675,7 @@ func TestRegistry(t *testing.T) { func(t *testing.T) { t.Helper() - registry := ®istry{entries: map[string]*registryEntry{}} + registry := ®istry{entries: sync.Map{}} result := []*consul.ServiceEntry{ { @@ -686,7 +687,9 @@ func TestRegistry(t *testing.T) { } registry.addOrUpdate(appID, result) - assert.Equal(t, result, registry.entries[appID].services) + + entry, _ := registry.entries.Load(appID) + assert.Equal(t, result, entry.(*registryEntry).services) update := []*consul.ServiceEntry{ { @@ -698,7 +701,8 @@ func TestRegistry(t *testing.T) { } registry.addOrUpdate(appID, update) - assert.Equal(t, update, registry.entries[appID].services) + entry, _ = registry.entries.Load(appID) + assert.Equal(t, update, entry.(*registryEntry).services) }, }, { @@ -706,26 +710,30 @@ func TestRegistry(t *testing.T) { func(t *testing.T) { t.Helper() - registry := ®istry{ - entries: map[string]*registryEntry{ - appID: { - services: []*consul.ServiceEntry{ - { - Service: &consul.AgentService{ - Address: "123.234.345.456", - Port: 8600, - }, + entryMap := sync.Map{} + entryMap.Store( + appID, + ®istryEntry{ + services: []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "123.234.345.456", + Port: 8600, }, }, - }, - }, + }}) + + registry := ®istry{ + entries: entryMap, } - assert.NotNil(t, registry.entries[appID].services) + entry, _ := registry.entries.Load(appID) + assert.NotNil(t, entry.(*registryEntry).services) registry.expire(appID) - assert.Nil(t, registry.entries[appID].services) + entry, _ = registry.entries.Load(appID) + assert.Nil(t, entry.(*registryEntry).services) }, }, { @@ -733,24 +741,27 @@ func TestRegistry(t *testing.T) { func(t *testing.T) { t.Helper() - registry := ®istry{ - entries: map[string]*registryEntry{ - appID: { - services: []*consul.ServiceEntry{ - { - Service: &consul.AgentService{ - Address: "123.234.345.456", - Port: 8600, - }, + entryMap := sync.Map{} + entryMap.Store( + appID, + ®istryEntry{ + services: []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "123.234.345.456", + Port: 8600, }, }, - }, - }, + }}) + + registry := ®istry{ + entries: entryMap, } registry.remove(appID) - assert.Nil(t, registry.entries[appID]) + entry, _ := registry.entries.Load(appID) + assert.Nil(t, entry) }, }, } From 116989175fe34eaea3951874d5afbec351b46f82 Mon Sep 17 00:00:00 2001 From: Abdulaziz Elsheikh Date: Fri, 18 Mar 2022 11:15:50 +0000 Subject: [PATCH 06/11] nr_consul_cache resolved copylocks --- nameresolution/consul/consul.go | 4 ++-- nameresolution/consul/consul_test.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/nameresolution/consul/consul.go b/nameresolution/consul/consul.go index 2f4665e214..58d8bc45ce 100644 --- a/nameresolution/consul/consul.go +++ b/nameresolution/consul/consul.go @@ -79,7 +79,7 @@ type registryInterface interface { } type registry struct { - entries sync.Map + entries *sync.Map } type registryEntry struct { @@ -183,7 +183,7 @@ type resolverConfig struct { // NewResolver creates Consul name resolver. func NewResolver(logger logger.Logger) nr.Resolver { - return newResolver(logger, resolverConfig{}, &client{}, ®istry{entries: sync.Map{}}) + return newResolver(logger, resolverConfig{}, &client{}, ®istry{entries: &sync.Map{}}) } func newResolver(logger logger.Logger, resolverConfig resolverConfig, client clientInterface, registry registryInterface) nr.Resolver { diff --git a/nameresolution/consul/consul_test.go b/nameresolution/consul/consul_test.go index b03a68f58b..2eacae67d4 100644 --- a/nameresolution/consul/consul_test.go +++ b/nameresolution/consul/consul_test.go @@ -675,7 +675,7 @@ func TestRegistry(t *testing.T) { func(t *testing.T) { t.Helper() - registry := ®istry{entries: sync.Map{}} + registry := ®istry{entries: &sync.Map{}} result := []*consul.ServiceEntry{ { @@ -710,7 +710,7 @@ func TestRegistry(t *testing.T) { func(t *testing.T) { t.Helper() - entryMap := sync.Map{} + entryMap := &sync.Map{} entryMap.Store( appID, ®istryEntry{ @@ -741,7 +741,7 @@ func TestRegistry(t *testing.T) { func(t *testing.T) { t.Helper() - entryMap := sync.Map{} + entryMap := &sync.Map{} entryMap.Store( appID, ®istryEntry{ From fc0a1c8e876ef9d40435910b0867785495e88638 Mon Sep 17 00:00:00 2001 From: Abdulaziz Elsheikh Date: Fri, 18 Mar 2022 11:25:44 +0000 Subject: [PATCH 07/11] nr_consul_cache gofumpt --- nameresolution/consul/consul_test.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/nameresolution/consul/consul_test.go b/nameresolution/consul/consul_test.go index 2eacae67d4..70ecf15bb3 100644 --- a/nameresolution/consul/consul_test.go +++ b/nameresolution/consul/consul_test.go @@ -721,7 +721,8 @@ func TestRegistry(t *testing.T) { Port: 8600, }, }, - }}) + }, + }) registry := ®istry{ entries: entryMap, @@ -752,7 +753,8 @@ func TestRegistry(t *testing.T) { Port: 8600, }, }, - }}) + }, + }) registry := ®istry{ entries: entryMap, @@ -1207,7 +1209,7 @@ func TestGetConfig(t *testing.T) { Configuration: map[interface{}]interface{}{ "AdvancedRegistration": map[interface{}]interface{}{ "Name": "random-app-id", - "Port": 000, + "Port": 0o00, "Address": "123.345.678", "Tags": []string{"random-tag"}, "Meta": map[string]string{ @@ -1250,7 +1252,7 @@ func TestGetConfig(t *testing.T) { assert.NotNil(t, actual.Registration) assert.Equal(t, "random-app-id", actual.Registration.Name) assert.Equal(t, "123.345.678", actual.Registration.Address) - assert.Equal(t, 000, actual.Registration.Port) + assert.Equal(t, 0o00, actual.Registration.Port) assert.Equal(t, "random health check name", actual.Registration.Checks[0].Name) assert.Equal(t, "000", actual.Registration.Meta["APP_PORT"]) assert.Equal(t, "random-tag", actual.Registration.Tags[0]) From 69a0e25d4fab54bcaf634f803c8594b25bf0b7cb Mon Sep 17 00:00:00 2001 From: Abdulaziz Elsheikh Date: Tue, 22 Mar 2022 10:01:23 +0000 Subject: [PATCH 08/11] nr_consul_cache remove shuffle in favour of random --- nameresolution/consul/consul.go | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/nameresolution/consul/consul.go b/nameresolution/consul/consul.go index 58d8bc45ce..c6075f9ad9 100644 --- a/nameresolution/consul/consul.go +++ b/nameresolution/consul/consul.go @@ -103,7 +103,8 @@ func (e *registryEntry) next() *consul.ServiceEntry { return nil } - return shuffle(e.services)[0] + rndbig, _ := rand.Int(rand.Reader, big.NewInt(int64(len(e.services)))) + return e.services[rndbig.Int64()] } func (r *resolver) getService(service string) (*consul.ServiceEntry, error) { @@ -134,7 +135,8 @@ func (r *resolver) getService(service string) (*consul.ServiceEntry, error) { return nil, fmt.Errorf("no healthy services found with AppID:%s", service) } - return shuffle(services)[0], nil + rndbig, _ := rand.Int(rand.Reader, big.NewInt(int64(len(services)))) + return services[rndbig.Int64()], nil } func (r *registry) addOrUpdate(service string, services []*consul.ServiceEntry) { @@ -246,17 +248,6 @@ func (r *resolver) ResolveID(req nr.ResolveRequest) (string, error) { return addr, nil } -func shuffle(services []*consul.ServiceEntry) []*consul.ServiceEntry { - for i := len(services) - 1; i > 0; i-- { - rndbig, _ := rand.Int(rand.Reader, big.NewInt(int64(i+1))) - j := rndbig.Int64() - - services[i], services[j] = services[j], services[i] - } - - return services -} - // getConfig configuration from metadata, defaults are best suited for self-hosted mode. func getConfig(metadata nr.Metadata) (resolverConfig, error) { var daprPort string From b00f4014488a0053ec7574a1092db04ee5c4db46 Mon Sep 17 00:00:00 2001 From: Abdulaziz Elsheikh Date: Wed, 16 Nov 2022 11:13:29 +0000 Subject: [PATCH 09/11] nr_consul_cache resolve missing changes from merge --- nameresolution/consul/consul_test.go | 98 ++++++++++++++-------------- 1 file changed, 50 insertions(+), 48 deletions(-) diff --git a/nameresolution/consul/consul_test.go b/nameresolution/consul/consul_test.go index be37f73ab0..b06e4d7f9c 100644 --- a/nameresolution/consul/consul_test.go +++ b/nameresolution/consul/consul_test.go @@ -947,8 +947,8 @@ func TestGetConfig(t *testing.T) { "empty configuration with SelfRegister should default correctly", nr.Metadata{ Base: metadata.Base{Properties: getTestPropsWithoutKey("")}, - Configuration: configSpec{ - SelfRegister: true, + Configuration: map[interface{}]interface{}{ + "SelfRegister": true, }, }, func(t *testing.T, metadata nr.Metadata) { @@ -983,9 +983,9 @@ func TestGetConfig(t *testing.T) { "DaprPortMetaKey should set registration meta and config used for resolve", nr.Metadata{ Base: metadata.Base{Properties: getTestPropsWithoutKey("")}, - Configuration: configSpec{ - SelfRegister: true, - DaprPortMetaKey: "random_key", + Configuration: map[interface{}]interface{}{ + "SelfRegister": true, + "DaprPortMetaKey": "random_key", }, }, func(t *testing.T, metadata nr.Metadata) { @@ -1002,8 +1002,8 @@ func TestGetConfig(t *testing.T) { "missing AppID property should error when SelfRegister true", nr.Metadata{ Base: metadata.Base{Properties: getTestPropsWithoutKey(nr.AppID)}, - Configuration: configSpec{ - SelfRegister: true, + Configuration: map[interface{}]interface{}{ + "SelfRegister": true, }, }, func(t *testing.T, metadata nr.Metadata) { @@ -1032,8 +1032,8 @@ func TestGetConfig(t *testing.T) { "missing AppPort property should error when SelfRegister true", nr.Metadata{ Base: metadata.Base{Properties: getTestPropsWithoutKey(nr.AppPort)}, - Configuration: configSpec{ - SelfRegister: true, + Configuration: map[interface{}]interface{}{ + "SelfRegister": true, }, }, func(t *testing.T, metadata nr.Metadata) { @@ -1062,8 +1062,8 @@ func TestGetConfig(t *testing.T) { "missing HostAddress property should error when SelfRegister true", nr.Metadata{ Base: metadata.Base{Properties: getTestPropsWithoutKey(nr.HostAddress)}, - Configuration: configSpec{ - SelfRegister: true, + Configuration: map[interface{}]interface{}{ + "SelfRegister": true, }, }, func(t *testing.T, metadata nr.Metadata) { @@ -1092,8 +1092,8 @@ func TestGetConfig(t *testing.T) { "missing DaprHTTPPort property should error only when SelfRegister true", nr.Metadata{ Base: metadata.Base{Properties: getTestPropsWithoutKey(nr.DaprHTTPPort)}, - Configuration: configSpec{ - SelfRegister: true, + Configuration: map[interface{}]interface{}{ + "SelfRegister": true, }, }, func(t *testing.T, metadata nr.Metadata) { @@ -1155,27 +1155,29 @@ func TestGetConfig(t *testing.T) { "registration should configure correctly", nr.Metadata{ Base: metadata.Base{Properties: getTestPropsWithoutKey("")}, - Configuration: configSpec{ - Checks: []*consul.AgentServiceCheck{ - { - Name: "test-app health check name", - CheckID: "test-app health check id", - Interval: "15s", - HTTP: "http://127.0.0.1:3500/health", + Configuration: map[interface{}]interface{}{ + "Checks": []interface{}{ + map[interface{}]interface{}{ + "Name": "test-app health check name", + "CheckID": "test-app health check id", + "Interval": "15s", + "HTTP": "http://127.0.0.1:3500/health", }, }, - Tags: []string{ + "Tags": []interface{}{ "test", }, - Meta: map[string]string{ + "Meta": map[interface{}]interface{}{ "APP_PORT": "8650", "DAPR_GRPC_PORT": "50005", }, - QueryOptions: &consul.QueryOptions{ - UseCache: false, - Filter: "Checks.ServiceTags contains something", + "QueryOptions": map[interface{}]interface{}{ + "UseCache": false, + "Filter": "Checks.ServiceTags contains something", }, - SelfRegister: true, + "SelfRegister": true, + "DaprPortMetaKey": "PORT", + "UseCache": false, }, }, func(t *testing.T, metadata nr.Metadata) { @@ -1207,42 +1209,42 @@ func TestGetConfig(t *testing.T) { "advanced registration should override/ignore other configs", nr.Metadata{ Base: metadata.Base{Properties: getTestPropsWithoutKey("")}, - Configuration: configSpec{ - AdvancedRegistration: &consul.AgentServiceRegistration{ - Name: "random-app-id", - Port: 0o00, - Address: "123.345.678", - Tags: []string{"random-tag"}, - Meta: map[string]string{ + Configuration: map[interface{}]interface{}{ + "AdvancedRegistration": map[interface{}]interface{}{ + "Name": "random-app-id", + "Port": 0o00, + "Address": "123.345.678", + "Tags": []string{"random-tag"}, + "Meta": map[string]string{ "APP_PORT": "000", }, - Checks: []*consul.AgentServiceCheck{ - { - Name: "random health check name", - CheckID: "random health check id", - Interval: "15s", - HTTP: "http://127.0.0.1:3500/health", + "Checks": []interface{}{ + map[interface{}]interface{}{ + "Name": "random health check name", + "CheckID": "random health check id", + "Interval": "15s", + "HTTP": "http://127.0.0.1:3500/health", }, }, }, - Checks: []*consul.AgentServiceCheck{ - { - Name: "test-app health check name", - CheckID: "test-app health check id", - Interval: "15s", - HTTP: "http://127.0.0.1:3500/health", + "Checks": []interface{}{ + map[interface{}]interface{}{ + "Name": "test-app health check name", + "CheckID": "test-app health check id", + "Interval": "15s", + "HTTP": "http://127.0.0.1:3500/health", }, }, - Tags: []string{ + "Tags": []string{ "dapr", "test", }, - Meta: map[string]string{ + "Meta": map[string]string{ "APP_PORT": "123", "DAPR_HTTP_PORT": "3500", "DAPR_GRPC_PORT": "50005", }, - SelfRegister: false, + "SelfRegister": false, }, }, func(t *testing.T, metadata nr.Metadata) { From 850f4fa5ed6155cd44c87fb203fc662170cbaaa5 Mon Sep 17 00:00:00 2001 From: Abdulaziz Elsheikh Date: Wed, 16 Nov 2022 11:18:31 +0000 Subject: [PATCH 10/11] nr_consul_cache change useCache config default to false --- nameresolution/consul/README.md | 2 +- nameresolution/consul/configuration.go | 1 - nameresolution/consul/consul_test.go | 5 ++--- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/nameresolution/consul/README.md b/nameresolution/consul/README.md index 154461cf74..8e472aacf7 100644 --- a/nameresolution/consul/README.md +++ b/nameresolution/consul/README.md @@ -56,7 +56,7 @@ As of writing the configuration spec is fixed to v1.3.0 of the consul api | DaprPortMetaKey | `string` | The key used for getting the Dapr sidecar port from consul service metadata during service resolution, it will also be used to set the Dapr sidecar port in metadata during registration. If blank it will default to `DAPR_PORT` | | SelfRegister | `bool` | Controls if Dapr will register the service to consul. The name resolution interface does not cater for an "on shutdown" pattern so please consider this if using Dapr to register services to consul as it will not deregister services. | | AdvancedRegistration | [*api.AgentServiceRegistration](https://pkg.go.dev/github.com/hashicorp/consul/api@v1.3.0#AgentServiceRegistration) | Gives full control of service registration through configuration. If configured the component will ignore any configuration of Checks, Tags, Meta and SelfRegister. | -| UseCache | `bool` | Configures if Dapr will cache the resolved services in-memory. This is done using consul [blocking queries](https://www.consul.io/api-docs/features/blocking) which can be configured via the QueryOptions configuration. If blank it will default to `true` | +| UseCache | `bool` | Configures if Dapr will cache the resolved services in-memory. This is done using consul [blocking queries](https://www.consul.io/api-docs/features/blocking) which can be configured via the QueryOptions configuration. If blank it will default to `false` | ## Samples Configurations ### Basic diff --git a/nameresolution/consul/configuration.go b/nameresolution/consul/configuration.go index d3f0399811..9bc33487d8 100644 --- a/nameresolution/consul/configuration.go +++ b/nameresolution/consul/configuration.go @@ -55,7 +55,6 @@ type configSpec struct { func newIntermediateConfig() intermediateConfig { return intermediateConfig{ - UseCache: true, DaprPortMetaKey: defaultDaprPortMetaKey, } } diff --git a/nameresolution/consul/consul_test.go b/nameresolution/consul/consul_test.go index b06e4d7f9c..fbb0483f4a 100644 --- a/nameresolution/consul/consul_test.go +++ b/nameresolution/consul/consul_test.go @@ -875,7 +875,6 @@ func TestParseConfig(t *testing.T) { true, nil, configSpec{ - UseCache: true, DaprPortMetaKey: defaultDaprPortMetaKey, }, }, @@ -940,7 +939,7 @@ func TestGetConfig(t *testing.T) { assert.Equal(t, defaultDaprPortMetaKey, actual.DaprPortMetaKey) // Cache - assert.Equal(t, true, actual.UseCache) + assert.Equal(t, false, actual.UseCache) }, }, { @@ -976,7 +975,7 @@ func TestGetConfig(t *testing.T) { assert.Equal(t, defaultDaprPortMetaKey, actual.DaprPortMetaKey) // Cache - assert.Equal(t, true, actual.UseCache) + assert.Equal(t, false, actual.UseCache) }, }, { From 803d29dfdd771e2eda606212e9fa43ca1bd4e2c2 Mon Sep 17 00:00:00 2001 From: Abdulaziz Elsheikh Date: Mon, 11 Sep 2023 12:37:33 +0100 Subject: [PATCH 11/11] nr_consul_cache fix tests --- nameresolution/consul/consul_test.go | 41 ++++++++++++++-------------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/nameresolution/consul/consul_test.go b/nameresolution/consul/consul_test.go index ccccac7911..e4525e902c 100644 --- a/nameresolution/consul/consul_test.go +++ b/nameresolution/consul/consul_test.go @@ -230,7 +230,7 @@ func TestResolveID(t *testing.T) { serviceEntries := []*consul.ServiceEntry{ { Service: &consul.AgentService{ - Address: "123.234.345.456", + Address: "10.3.245.137", Port: 8600, Meta: map[string]string{ "DAPR_PORT": "50005", @@ -268,7 +268,7 @@ func TestResolveID(t *testing.T) { assert.Equal(t, 1, mockReg.getCalled) waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 1 }) assert.Equal(t, 1, mockReg.addOrUpdateCalled) - assert.Equal(t, "123.234.345.456:50005", addr) + assert.Equal(t, "10.3.245.137:50005", addr) mockReg.getResult = ®istryEntry{ services: serviceEntries, @@ -290,7 +290,7 @@ func TestResolveID(t *testing.T) { mock.mockHealth.serviceResult = []*consul.ServiceEntry{ { Service: &consul.AgentService{ - Address: "123.234.345.456", + Address: "10.3.245.137", Port: 8601, Meta: map[string]string{ "DAPR_PORT": "50005", @@ -323,7 +323,7 @@ func TestResolveID(t *testing.T) { serviceResult: []*consul.ServiceEntry{ { Service: &consul.AgentService{ - Address: "123.234.345.456", + Address: "10.3.245.137", Port: 8600, Meta: map[string]string{ "DAPR_PORT": "50005", @@ -357,7 +357,7 @@ func TestResolveID(t *testing.T) { assert.Equal(t, 1, mockReg.getCalled) waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 1 }) assert.Equal(t, 1, mockReg.addOrUpdateCalled) - assert.Equal(t, "123.234.345.456:50005", addr) + assert.Equal(t, "10.3.245.137:50005", addr) // Remove waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.removeCalled == 1 }) @@ -384,7 +384,7 @@ func TestResolveID(t *testing.T) { serviceResult: []*consul.ServiceEntry{ { Service: &consul.AgentService{ - Address: "123.234.345.456", + Address: "10.3.245.137", Port: 8600, Meta: map[string]string{ "DAPR_PORT": "50005", @@ -426,7 +426,7 @@ func TestResolveID(t *testing.T) { assert.Equal(t, 1, mockReg.getCalled) waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 1 }) assert.Equal(t, 1, mockReg.addOrUpdateCalled) - assert.Equal(t, "123.234.345.456:50005", addr) + assert.Equal(t, "10.3.245.137:50005", addr) // Blocking call will error as WaitIndex = 1 blockingCall <- 2 @@ -464,7 +464,7 @@ func TestResolveID(t *testing.T) { serviceResult: []*consul.ServiceEntry{ { Service: &consul.AgentService{ - Address: "123.234.345.456", + Address: "10.3.245.137", Port: 8600, Meta: map[string]string{ "DAPR_PORT": "50005", @@ -498,7 +498,7 @@ func TestResolveID(t *testing.T) { assert.Equal(t, 1, mockReg.getCalled) waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled == 1 }) assert.Equal(t, 1, mockReg.addOrUpdateCalled) - assert.Equal(t, "123.234.345.456:50005", addr) + assert.Equal(t, "10.3.245.137:50005", addr) // Error and release blocking call mock.mockHealth.serviceErr = &err @@ -540,7 +540,7 @@ func TestResolveID(t *testing.T) { serviceResult: []*consul.ServiceEntry{ { Service: &consul.AgentService{ - Address: "123.234.245.255", + Address: "10.3.245.137", Port: 8600, Meta: map[string]string{ "DAPR_PORT": "50005", @@ -554,7 +554,7 @@ func TestResolveID(t *testing.T) { addr, _ := resolver.ResolveID(req) - assert.Equal(t, "123.234.245.255:50005", addr) + assert.Equal(t, "10.3.245.137:50005", addr) }, }, { @@ -579,8 +579,7 @@ func TestResolveID(t *testing.T) { }, }, } - resolver := newResolver(logger.NewLogger("test"), &mock) - resolver.config = testConfig + resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}) addr, _ := resolver.ResolveID(req) @@ -599,7 +598,7 @@ func TestResolveID(t *testing.T) { serviceResult: []*consul.ServiceEntry{ { Service: &consul.AgentService{ - Address: "123.234.245.255", + Address: "10.3.245.137", Port: 8600, Meta: map[string]string{ "DAPR_PORT": "50005", @@ -625,7 +624,7 @@ func TestResolveID(t *testing.T) { for i := 0; i < 100; i++ { addr, _ := resolver.ResolveID(req) - if addr == "123.234.245.255:50005" { + if addr == "10.3.245.137:50005" { total1++ } else if addr == "234.245.255.228:50005" { total2++ @@ -652,7 +651,7 @@ func TestResolveID(t *testing.T) { serviceResult: []*consul.ServiceEntry{ { Node: &consul.Node{ - Address: "123.234.245.255", + Address: "10.3.245.137", }, Service: &consul.AgentService{ Address: "", @@ -664,7 +663,7 @@ func TestResolveID(t *testing.T) { }, { Node: &consul.Node{ - Address: "123.234.245.255", + Address: "10.3.245.137", }, Service: &consul.AgentService{ Address: "", @@ -681,7 +680,7 @@ func TestResolveID(t *testing.T) { addr, _ := resolver.ResolveID(req) - assert.Equal(t, "123.234.245.255:50005", addr) + assert.Equal(t, "10.3.245.137:50005", addr) }, }, { @@ -767,7 +766,7 @@ func TestRegistry(t *testing.T) { result := []*consul.ServiceEntry{ { Service: &consul.AgentService{ - Address: "123.234.345.456", + Address: "10.3.245.137", Port: 8600, }, }, @@ -804,7 +803,7 @@ func TestRegistry(t *testing.T) { services: []*consul.ServiceEntry{ { Service: &consul.AgentService{ - Address: "123.234.345.456", + Address: "10.3.245.137", Port: 8600, }, }, @@ -836,7 +835,7 @@ func TestRegistry(t *testing.T) { services: []*consul.ServiceEntry{ { Service: &consul.AgentService{ - Address: "123.234.345.456", + Address: "10.3.245.137", Port: 8600, }, },