diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index 141454d1fd8..ae095c86317 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -5,6 +5,7 @@ import ( "fmt" "net" "net/url" + "reflect" "strconv" "strings" "sync" @@ -29,6 +30,10 @@ const ( // for tasks. nomadTaskPrefix = nomadServicePrefix + "-task-" + // nomadCheckPrefix is the prefix that scopes Nomad registered checks for + // services. + nomadCheckPrefix = nomadServicePrefix + "-check-" + // defaultRetryInterval is how quickly to retry syncing services and // checks to Consul when an error occurs. Will backoff up to a max. defaultRetryInterval = time.Second @@ -83,6 +88,15 @@ type AgentAPI interface { UpdateTTL(id, output, status string) error } +func agentServiceUpdateRequired(reg *api.AgentServiceRegistration, svc *api.AgentService) bool { + return !(reg.Kind == svc.Kind && + reg.ID == svc.ID && + reg.Port == svc.Port && + reg.Address == svc.Address && + reg.Name == svc.Service && + reflect.DeepEqual(reg.Tags, svc.Tags)) +} + // operations are submitted to the main loop via commit() for synchronizing // with Consul. type operations struct { @@ -466,16 +480,26 @@ func (c *ServiceClient) sync() error { metrics.IncrCounter([]string{"client", "consul", "service_deregistrations"}, 1) } - // Add Nomad services missing from Consul + // Add Nomad services missing from Consul, or where the service has been updated. for id, locals := range c.services { - if _, ok := consulServices[id]; !ok { - if err = c.client.ServiceRegister(locals); err != nil { - metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) - return err + existingSvc, ok := consulServices[id] + + if ok { + // There is an existing registration of this service in Consul, so + // compare it against the local definition to decide whether it needs + // to be re-registered.
+ if !agentServiceUpdateRequired(locals, existingSvc) { + // No Need to update services that have not changed + continue } - sreg++ - metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1) } + + if err = c.client.ServiceRegister(locals); err != nil { + metrics.IncrCounter([]string{"client", "consul", "sync_failure"}, 1) + return err + } + sreg++ + metrics.IncrCounter([]string{"client", "consul", "service_registrations"}, 1) } // Remove Nomad checks in Consul but unknown locally @@ -489,7 +513,7 @@ func (c *ServiceClient) sync() error { // Nomad managed checks if this is not a client agent. // This is to prevent server agents from removing checks // registered by client agents - if !isNomadService(check.ServiceID) || !c.isClientAgent { + if !isNomadService(check.ServiceID) || !c.isClientAgent || !isNomadCheck(check.CheckID) { // Service not managed by Nomad, skip continue } @@ -809,10 +833,10 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error { newIDs[makeTaskServiceID(newTask.AllocID, newTask.Name, s, newTask.Canary)] = s } - // Loop over existing Service IDs to see if they have been removed or - // updated. 
+ // Loop over existing Service IDs to see if they have been removed or updated for existingID, existingSvc := range existingIDs { newSvc, ok := newIDs[existingID] + if !ok { // Existing service entry removed ops.deregServices = append(ops.deregServices, existingID) @@ -828,8 +852,12 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error { continue } - // Service exists and hasn't changed, don't re-add it later - delete(newIDs, existingID) + oldHash := existingSvc.Hash(old.AllocID, old.Name, old.Canary) + newHash := newSvc.Hash(newTask.AllocID, newTask.Name, newTask.Canary) + if oldHash == newHash { + // Service exists and hasn't changed, don't re-add it later + delete(newIDs, existingID) + } // Service still exists so add it to the task's registration sreg := &ServiceRegistration{ @@ -848,7 +876,8 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error { for _, check := range newSvc.Checks { checkID := makeCheckID(existingID, check) if _, exists := existingChecks[checkID]; exists { - // Check exists, so don't remove it + // Check is still required. Remove it from the map so it doesn't get + // deleted later. delete(existingChecks, checkID) sreg.checkIDs[checkID] = struct{}{} } @@ -861,7 +890,6 @@ func (c *ServiceClient) UpdateTask(old, newTask *TaskServices) error { for _, checkID := range newCheckIDs { sreg.checkIDs[checkID] = struct{}{} - } // Update all watched checks as CheckRestart fields aren't part of ID @@ -1082,14 +1110,16 @@ func makeAgentServiceID(role string, service *structs.Service) string { // Consul. All structs.Service fields are included in the ID's hash except // Checks. This allows updates to merely compare IDs.
// -// Example Service ID: _nomad-task-TNM333JKJPM5AK4FAS3VXQLXFDWOF4VH +// Example Service ID: _nomad-task-b4e61df9-b095-d64e-f241-23860da1375f-redis-http func makeTaskServiceID(allocID, taskName string, service *structs.Service, canary bool) string { - return nomadTaskPrefix + service.Hash(allocID, taskName, canary) + return fmt.Sprintf("%s%s-%s-%s", nomadTaskPrefix, allocID, taskName, service.Name) } // makeCheckID creates a unique ID for a check. +// +// Example Check ID: _nomad-check-434ae42f9a57c5705344974ac38de2aee0ee089d func makeCheckID(serviceID string, check *structs.ServiceCheck) string { - return check.Hash(serviceID) + return fmt.Sprintf("%s%s", nomadCheckPrefix, check.Hash(serviceID)) } // createCheckReg creates a Check that can be registered with Consul. @@ -1154,6 +1184,12 @@ func createCheckReg(serviceID, checkID string, check *structs.ServiceCheck, host return &chkReg, nil } +// isNomadCheck returns true if the ID matches the pattern of a Nomad managed +// check. +func isNomadCheck(id string) bool { + return strings.HasPrefix(id, nomadCheckPrefix) +} + // isNomadService returns true if the ID matches the pattern of a Nomad managed // service (new or old formats). Agent services return false as independent // client and server agents may be running on the same machine. 
#2827 diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index 4d2009e5aa8..e555a59eb7e 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -128,97 +128,38 @@ func setupFake(t *testing.T) *testFakeCtx { func TestConsul_ChangeTags(t *testing.T) { ctx := setupFake(t) + require := require.New(t) - if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil { - t.Fatalf("unexpected error registering task: %v", err) - } - - if err := ctx.syncOnce(); err != nil { - t.Fatalf("unexpected error syncing task: %v", err) - } - - if n := len(ctx.FakeConsul.services); n != 1 { - t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services) - } + require.NoError(ctx.ServiceClient.RegisterTask(ctx.Task)) + require.NoError(ctx.syncOnce()) + require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul") - // Query the allocs registrations and then again when we update. The IDs - // should change + // Validate the alloc registration reg1, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID) - if err != nil { - t.Fatalf("Looking up alloc registration failed: %v", err) - } - if reg1 == nil { - t.Fatalf("Nil alloc registrations: %v", err) - } - if num := reg1.NumServices(); num != 1 { - t.Fatalf("Wrong number of services: got %d; want 1", num) - } - if num := reg1.NumChecks(); num != 0 { - t.Fatalf("Wrong number of checks: got %d; want 0", num) - } + require.NoError(err) + require.NotNil(reg1, "Unexpected nil alloc registration") + require.Equal(1, reg1.NumServices()) + require.Equal(0, reg1.NumChecks()) - origKey := "" - for k, v := range ctx.FakeConsul.services { - origKey = k - if v.Name != ctx.Task.Services[0].Name { - t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name) - } - if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) { - t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags) - } + for _, v := range 
ctx.FakeConsul.services { + require.Equal(v.Name, ctx.Task.Services[0].Name) + require.Equal(v.Tags, ctx.Task.Services[0].Tags) } + // Update the task definition origTask := ctx.Task.Copy() ctx.Task.Services[0].Tags[0] = "newtag" - if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil { - t.Fatalf("unexpected error registering task: %v", err) - } - if err := ctx.syncOnce(); err != nil { - t.Fatalf("unexpected error syncing task: %v", err) - } - - if n := len(ctx.FakeConsul.services); n != 1 { - t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services) - } - - for k, v := range ctx.FakeConsul.services { - if k == origKey { - t.Errorf("expected key to change but found %q", k) - } - if v.Name != ctx.Task.Services[0].Name { - t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name) - } - if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) { - t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags) - } - } - - // Check again and ensure the IDs changed - reg2, err := ctx.ServiceClient.AllocRegistrations(ctx.Task.AllocID) - if err != nil { - t.Fatalf("Looking up alloc registration failed: %v", err) - } - if reg2 == nil { - t.Fatalf("Nil alloc registrations: %v", err) - } - if num := reg2.NumServices(); num != 1 { - t.Fatalf("Wrong number of services: got %d; want 1", num) - } - if num := reg2.NumChecks(); num != 0 { - t.Fatalf("Wrong number of checks: got %d; want 0", num) - } - for task, treg := range reg1.Tasks { - otherTaskReg, ok := reg2.Tasks[task] - if !ok { - t.Fatalf("Task %q not in second reg", task) - } + // Register and sync the update + require.NoError(ctx.ServiceClient.UpdateTask(origTask, ctx.Task)) + require.NoError(ctx.syncOnce()) + require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul") - for sID := range treg.Services { - if _, ok := otherTaskReg.Services[sID]; ok { - t.Fatalf("service ID didn't change") - } - } + // Validate the metadata 
changed + for _, v := range ctx.FakeConsul.services { + require.Equal(v.Name, ctx.Task.Services[0].Name) + require.Equal(v.Tags, ctx.Task.Services[0].Tags) + require.Equal("newtag", v.Tags[0]) } } @@ -227,6 +168,8 @@ func TestConsul_ChangeTags(t *testing.T) { // slightly different code path than changing tags. func TestConsul_ChangePorts(t *testing.T) { ctx := setupFake(t) + require := require.New(t) + ctx.Task.Services[0].Checks = []*structs.ServiceCheck{ { Name: "c1", @@ -252,35 +195,17 @@ func TestConsul_ChangePorts(t *testing.T) { }, } - if err := ctx.ServiceClient.RegisterTask(ctx.Task); err != nil { - t.Fatalf("unexpected error registering task: %v", err) - } - - if err := ctx.syncOnce(); err != nil { - t.Fatalf("unexpected error syncing task: %v", err) - } - - if n := len(ctx.FakeConsul.services); n != 1 { - t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services) - } + require.NoError(ctx.ServiceClient.RegisterTask(ctx.Task)) + require.NoError(ctx.syncOnce()) + require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul") - origServiceKey := "" - for k, v := range ctx.FakeConsul.services { - origServiceKey = k - if v.Name != ctx.Task.Services[0].Name { - t.Errorf("expected Name=%q != %q", ctx.Task.Services[0].Name, v.Name) - } - if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) { - t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags) - } - if v.Port != xPort { - t.Errorf("expected Port x=%v but found: %v", xPort, v.Port) - } + for _, v := range ctx.FakeConsul.services { + require.Equal(ctx.Task.Services[0].Name, v.Name) + require.Equal(ctx.Task.Services[0].Tags, v.Tags) + require.Equal(xPort, v.Port) } - if n := len(ctx.FakeConsul.checks); n != 3 { - t.Fatalf("expected 3 checks but found %d:\n%#v", n, ctx.FakeConsul.checks) - } + require.Equal(3, len(ctx.FakeConsul.checks)) origTCPKey := "" origScriptKey := "" @@ -289,29 +214,28 @@ func TestConsul_ChangePorts(t *testing.T) { 
switch v.Name { case "c1": origTCPKey = k - if expected := fmt.Sprintf(":%d", xPort); v.TCP != expected { - t.Errorf("expected Port x=%v but found: %v", expected, v.TCP) - } + require.Equal(fmt.Sprintf(":%d", xPort), v.TCP) case "c2": origScriptKey = k select { case <-ctx.MockExec.execs: - if n := len(ctx.MockExec.execs); n > 0 { - t.Errorf("expected 1 exec but found: %d", n+1) - } + // Here we validate there is nothing left on the channel + require.Equal(0, len(ctx.MockExec.execs)) case <-time.After(3 * time.Second): - t.Errorf("script not called in time") + t.Fatalf("script not called in time") } case "c3": origHTTPKey = k - if expected := fmt.Sprintf("http://:%d/", yPort); v.HTTP != expected { - t.Errorf("expected Port y=%v but found: %v", expected, v.HTTP) - } + require.Equal(fmt.Sprintf("http://:%d/", yPort), v.HTTP) default: t.Fatalf("unexpected check: %q", v.Name) } } + require.NotEmpty(origTCPKey) + require.NotEmpty(origScriptKey) + require.NotEmpty(origHTTPKey) + // Now update the PortLabel on the Service and Check c3 origTask := ctx.Task.Copy() ctx.Task.Services[0].PortLabel = "y" @@ -339,64 +263,31 @@ func TestConsul_ChangePorts(t *testing.T) { // Removed PortLabel; should default to service's (y) }, } - if err := ctx.ServiceClient.UpdateTask(origTask, ctx.Task); err != nil { - t.Fatalf("unexpected error registering task: %v", err) - } - if err := ctx.syncOnce(); err != nil { - t.Fatalf("unexpected error syncing task: %v", err) - } - if n := len(ctx.FakeConsul.services); n != 1 { - t.Fatalf("expected 1 service but found %d:\n%#v", n, ctx.FakeConsul.services) - } + require.NoError(ctx.ServiceClient.UpdateTask(origTask, ctx.Task)) + require.NoError(ctx.syncOnce()) + require.Equal(1, len(ctx.FakeConsul.services), "Expected 1 service to be registered with Consul") - for k, v := range ctx.FakeConsul.services { - if k == origServiceKey { - t.Errorf("expected key change; still: %q", k) - } - if v.Name != ctx.Task.Services[0].Name { - t.Errorf("expected Name=%q 
!= %q", ctx.Task.Services[0].Name, v.Name) - } - if !reflect.DeepEqual(v.Tags, ctx.Task.Services[0].Tags) { - t.Errorf("expected Tags=%v != %v", ctx.Task.Services[0].Tags, v.Tags) - } - if v.Port != yPort { - t.Errorf("expected Port y=%v but found: %v", yPort, v.Port) - } + for _, v := range ctx.FakeConsul.services { + require.Equal(ctx.Task.Services[0].Name, v.Name) + require.Equal(ctx.Task.Services[0].Tags, v.Tags) + require.Equal(yPort, v.Port) } - if n := len(ctx.FakeConsul.checks); n != 3 { - t.Fatalf("expected 3 check but found %d:\n%#v", n, ctx.FakeConsul.checks) - } + require.Equal(3, len(ctx.FakeConsul.checks)) for k, v := range ctx.FakeConsul.checks { switch v.Name { case "c1": - if k == origTCPKey { - t.Errorf("expected key change for %s from %q", v.Name, origTCPKey) - } - if expected := fmt.Sprintf(":%d", xPort); v.TCP != expected { - t.Errorf("expected Port x=%v but found: %v", expected, v.TCP) - } + // C1 is not changed + require.Equal(origTCPKey, k) + require.Equal(fmt.Sprintf(":%d", xPort), v.TCP) case "c2": - if k == origScriptKey { - t.Errorf("expected key change for %s from %q", v.Name, origScriptKey) - } - select { - case <-ctx.MockExec.execs: - if n := len(ctx.MockExec.execs); n > 0 { - t.Errorf("expected 1 exec but found: %d", n+1) - } - case <-time.After(3 * time.Second): - t.Errorf("script not called in time") - } + // C2 is not changed and should not have been re-registered + require.Equal(origScriptKey, k) case "c3": - if k == origHTTPKey { - t.Errorf("expected %s key to change from %q", v.Name, k) - } - if expected := fmt.Sprintf("http://:%d/", yPort); v.HTTP != expected { - t.Errorf("expected Port y=%v but found: %v", expected, v.HTTP) - } + require.NotEqual(origHTTPKey, k) + require.Equal(fmt.Sprintf("http://:%d/", yPort), v.HTTP) default: t.Errorf("Unknown check: %q", k) }