diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 17971c72bb9..fa01c618b2e 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -6,8 +6,6 @@ import ( "sync" "time" - "github.com/hashicorp/nomad/client/lib/cgutil" - log "github.com/hashicorp/go-hclog" multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/allocdir" @@ -20,9 +18,11 @@ import ( "github.com/hashicorp/nomad/client/devicemanager" "github.com/hashicorp/nomad/client/dynamicplugins" cinterfaces "github.com/hashicorp/nomad/client/interfaces" + "github.com/hashicorp/nomad/client/lib/cgutil" "github.com/hashicorp/nomad/client/pluginmanager/csimanager" "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" "github.com/hashicorp/nomad/client/serviceregistration" + "github.com/hashicorp/nomad/client/serviceregistration/wrapper" cstate "github.com/hashicorp/nomad/client/state" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/vaultclient" @@ -178,6 +178,10 @@ type allocRunner struct { // rpcClient is the RPC Client that should be used by the allocrunner and its // hooks to communicate with Nomad Servers. rpcClient RPCer + + // serviceRegWrapper is the handler wrapper that is used by service hooks + // to perform service and check registration and deregistration. + serviceRegWrapper *wrapper.HandlerWrapper } // RPCer is the interface needed by hooks to make RPC calls. 
@@ -221,6 +225,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) { driverManager: config.DriverManager, serversContactedCh: config.ServersContactedCh, rpcClient: config.RPCClient, + serviceRegWrapper: config.ServiceRegWrapper, } // Create the logger based on the allocation ID @@ -274,6 +279,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error { ServersContactedCh: ar.serversContactedCh, StartConditionMetCtx: ar.taskHookCoordinator.startConditionForTask(task), ShutdownDelayCtx: ar.shutdownDelayCtx, + ServiceRegWrapper: ar.serviceRegWrapper, } if ar.cpusetManager != nil { diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go index 52252f08313..30611b394aa 100644 --- a/client/allocrunner/alloc_runner_hooks.go +++ b/client/allocrunner/alloc_runner_hooks.go @@ -153,8 +153,8 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error { newNetworkHook(hookLogger, ns, alloc, nm, nc, ar, builtTaskEnv), newGroupServiceHook(groupServiceHookConfig{ alloc: alloc, - consul: ar.consulClient, - consulNamespace: alloc.ConsulNamespace(), + namespace: alloc.ServiceProviderNamespace(), + serviceRegWrapper: ar.serviceRegWrapper, restarter: ar, taskEnvBuilder: envBuilder, networkStatusGetter: ar, diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index 23d59dc6915..93f0e399876 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -489,7 +489,8 @@ func TestAllocRunner_TaskGroup_ShutdownDelay(t *testing.T) { tg := alloc.Job.TaskGroups[0] tg.Services = []*structs.Service{ { - Name: "shutdown_service", + Name: "shutdown_service", + Provider: structs.ServiceProviderConsul, }, } @@ -1314,6 +1315,7 @@ func TestAllocRunner_TaskFailed_KillTG(t *testing.T) { { Name: "fakservice", PortLabel: "http", + Provider: structs.ServiceProviderConsul, Checks: []*structs.ServiceCheck{ { Name: "fakecheck", diff --git 
a/client/allocrunner/alloc_runner_unix_test.go b/client/allocrunner/alloc_runner_unix_test.go index 63d06fd3fea..a321f54cc3a 100644 --- a/client/allocrunner/alloc_runner_unix_test.go +++ b/client/allocrunner/alloc_runner_unix_test.go @@ -38,6 +38,7 @@ func TestAllocRunner_Restore_RunningTerminal(t *testing.T) { { Name: "foo", PortLabel: "8888", + Provider: structs.ServiceProviderConsul, }, } task := alloc.Job.TaskGroups[0].Tasks[0] diff --git a/client/allocrunner/config.go b/client/allocrunner/config.go index d1500c90674..0ec3ba51c3a 100644 --- a/client/allocrunner/config.go +++ b/client/allocrunner/config.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/nomad/client/pluginmanager/csimanager" "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" "github.com/hashicorp/nomad/client/serviceregistration" + "github.com/hashicorp/nomad/client/serviceregistration/wrapper" cstate "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/nomad/structs" @@ -81,4 +82,8 @@ type Config struct { // RPCClient is the RPC Client that should be used by the allocrunner and its // hooks to communicate with Nomad Servers. RPCClient RPCer + + // ServiceRegWrapper is the handler wrapper that is used by service hooks + // to perform service and check registration and deregistration. 
+ ServiceRegWrapper *wrapper.HandlerWrapper } diff --git a/client/allocrunner/groupservice_hook.go b/client/allocrunner/groupservice_hook.go index 1d5a6205373..d94de2db663 100644 --- a/client/allocrunner/groupservice_hook.go +++ b/client/allocrunner/groupservice_hook.go @@ -8,6 +8,7 @@ import ( log "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/client/serviceregistration" + "github.com/hashicorp/nomad/client/serviceregistration/wrapper" "github.com/hashicorp/nomad/client/taskenv" agentconsul "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/nomad/structs" @@ -25,15 +26,22 @@ type networkStatusGetter interface { // deregistration. type groupServiceHook struct { allocID string + jobID string group string restarter agentconsul.WorkloadRestarter - consulClient serviceregistration.Handler - consulNamespace string prerun bool deregistered bool networkStatusGetter networkStatusGetter shutdownDelayCtx context.Context + // namespace is the Nomad or Consul namespace in which service + // registrations will be made. + namespace string + + // serviceRegWrapper is the handler wrapper that is used to perform service + // and check registration and deregistration. + serviceRegWrapper *wrapper.HandlerWrapper + logger log.Logger // The following fields may be updated @@ -51,13 +59,19 @@ type groupServiceHook struct { type groupServiceHookConfig struct { alloc *structs.Allocation - consul serviceregistration.Handler - consulNamespace string restarter agentconsul.WorkloadRestarter taskEnvBuilder *taskenv.Builder networkStatusGetter networkStatusGetter shutdownDelayCtx context.Context logger log.Logger + + // namespace is the Nomad or Consul namespace in which service + // registrations will be made. + namespace string + + // serviceRegWrapper is the handler wrapper that is used to perform service + // and check registration and deregistration. 
+ serviceRegWrapper *wrapper.HandlerWrapper } func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook { @@ -70,15 +84,16 @@ func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook { h := &groupServiceHook{ allocID: cfg.alloc.ID, + jobID: cfg.alloc.JobID, group: cfg.alloc.TaskGroup, restarter: cfg.restarter, - consulClient: cfg.consul, - consulNamespace: cfg.consulNamespace, + namespace: cfg.namespace, taskEnvBuilder: cfg.taskEnvBuilder, delay: shutdownDelay, networkStatusGetter: cfg.networkStatusGetter, logger: cfg.logger.Named(groupServiceHookName), services: cfg.alloc.Job.LookupTaskGroup(cfg.alloc.TaskGroup).Services, + serviceRegWrapper: cfg.serviceRegWrapper, shutdownDelayCtx: cfg.shutdownDelayCtx, } @@ -114,7 +129,7 @@ func (h *groupServiceHook) prerunLocked() error { } services := h.getWorkloadServices() - return h.consulClient.RegisterWorkload(services) + return h.serviceRegWrapper.RegisterWorkload(services) } func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error { @@ -157,7 +172,7 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error { return nil } - return h.consulClient.UpdateWorkload(oldWorkloadServices, newWorkloadServices) + return h.serviceRegWrapper.UpdateWorkload(oldWorkloadServices, newWorkloadServices) } func (h *groupServiceHook) PreTaskRestart() error { @@ -213,7 +228,7 @@ func (h *groupServiceHook) Postrun() error { func (h *groupServiceHook) deregister() { if len(h.services) > 0 { workloadServices := h.getWorkloadServices() - h.consulClient.RemoveWorkload(workloadServices) + h.serviceRegWrapper.RemoveWorkload(workloadServices) } } @@ -229,8 +244,9 @@ func (h *groupServiceHook) getWorkloadServices() *serviceregistration.WorkloadSe // Create task services struct with request's driver metadata return &serviceregistration.WorkloadServices{ AllocID: h.allocID, + JobID: h.jobID, Group: h.group, - Namespace: h.consulNamespace, + Namespace: h.namespace, Restarter: 
h.restarter, Services: interpolatedServices, Networks: h.networks, diff --git a/client/allocrunner/groupservice_hook_test.go b/client/allocrunner/groupservice_hook_test.go index 4fae46a300a..79cd122c10e 100644 --- a/client/allocrunner/groupservice_hook_test.go +++ b/client/allocrunner/groupservice_hook_test.go @@ -9,6 +9,7 @@ import ( ctestutil "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/nomad/client/allocrunner/interfaces" regMock "github.com/hashicorp/nomad/client/serviceregistration/mock" + "github.com/hashicorp/nomad/client/serviceregistration/wrapper" "github.com/hashicorp/nomad/client/taskenv" agentconsul "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper" @@ -32,17 +33,24 @@ func TestGroupServiceHook_NoGroupServices(t *testing.T) { alloc := mock.Alloc() alloc.Job.TaskGroups[0].Services = []*structs.Service{{ Name: "foo", + Provider: "consul", PortLabel: "9999", }} logger := testlog.HCLogger(t) - consulClient := regMock.NewServiceRegistrationHandler(logger) + + consulMockClient := regMock.NewServiceRegistrationHandler(logger) + + regWrapper := wrapper.NewHandlerWrapper( + logger, + consulMockClient, + regMock.NewServiceRegistrationHandler(logger)) h := newGroupServiceHook(groupServiceHookConfig{ - alloc: alloc, - consul: consulClient, - restarter: agentconsul.NoopRestarter(), - taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), - logger: logger, + alloc: alloc, + serviceRegWrapper: regWrapper, + restarter: agentconsul.NoopRestarter(), + taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), + logger: logger, }) require.NoError(t, h.Prerun()) @@ -53,7 +61,7 @@ func TestGroupServiceHook_NoGroupServices(t *testing.T) { require.NoError(t, h.PreTaskRestart()) - ops := consulClient.GetOps() + ops := consulMockClient.GetOps() require.Len(t, ops, 5) require.Equal(t, "add", ops[0].Op) // Prerun require.Equal(t, "update", ops[1].Op) // Update @@ -71,14 +79,20 @@ 
func TestGroupServiceHook_ShutdownDelayUpdate(t *testing.T) { alloc.Job.TaskGroups[0].ShutdownDelay = helper.TimeToPtr(10 * time.Second) logger := testlog.HCLogger(t) - consulClient := regMock.NewServiceRegistrationHandler(logger) + consulMockClient := regMock.NewServiceRegistrationHandler(logger) + + regWrapper := wrapper.NewHandlerWrapper( + logger, + consulMockClient, + regMock.NewServiceRegistrationHandler(logger), + ) h := newGroupServiceHook(groupServiceHookConfig{ - alloc: alloc, - consul: consulClient, - restarter: agentconsul.NoopRestarter(), - taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), - logger: logger, + alloc: alloc, + serviceRegWrapper: regWrapper, + restarter: agentconsul.NoopRestarter(), + taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), + logger: logger, }) require.NoError(t, h.Prerun()) @@ -105,15 +119,21 @@ func TestGroupServiceHook_GroupServices(t *testing.T) { t.Parallel() alloc := mock.ConnectAlloc() + alloc.Job.Canonicalize() logger := testlog.HCLogger(t) - consulClient := regMock.NewServiceRegistrationHandler(logger) + consulMockClient := regMock.NewServiceRegistrationHandler(logger) + + regWrapper := wrapper.NewHandlerWrapper( + logger, + consulMockClient, + regMock.NewServiceRegistrationHandler(logger)) h := newGroupServiceHook(groupServiceHookConfig{ - alloc: alloc, - consul: consulClient, - restarter: agentconsul.NoopRestarter(), - taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), - logger: logger, + alloc: alloc, + serviceRegWrapper: regWrapper, + restarter: agentconsul.NoopRestarter(), + taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), + logger: logger, }) require.NoError(t, h.Prerun()) @@ -124,7 +144,7 @@ func TestGroupServiceHook_GroupServices(t *testing.T) { require.NoError(t, h.PreTaskRestart()) - ops := consulClient.GetOps() + ops := consulMockClient.GetOps() require.Len(t, ops, 5) require.Equal(t, "add", 
ops[0].Op) // Prerun require.Equal(t, "update", ops[1].Op) // Update @@ -133,6 +153,55 @@ func TestGroupServiceHook_GroupServices(t *testing.T) { require.Equal(t, "add", ops[4].Op) // Restart -> preRun } +// TestGroupServiceHook_GroupServices_Nomad asserts group service hooks with +// group services does not error when using the Nomad provider. +func TestGroupServiceHook_GroupServices_Nomad(t *testing.T) { + t.Parallel() + + // Create a mock alloc, and add a group service using provider Nomad. + alloc := mock.Alloc() + alloc.Job.TaskGroups[0].Services = []*structs.Service{ + { + Name: "nomad-provider-service", + Provider: structs.ServiceProviderNomad, + }, + } + + // Create our base objects and our subsequent wrapper. + logger := testlog.HCLogger(t) + consulMockClient := regMock.NewServiceRegistrationHandler(logger) + nomadMockClient := regMock.NewServiceRegistrationHandler(logger) + + regWrapper := wrapper.NewHandlerWrapper(logger, consulMockClient, nomadMockClient) + + h := newGroupServiceHook(groupServiceHookConfig{ + alloc: alloc, + serviceRegWrapper: regWrapper, + restarter: agentconsul.NoopRestarter(), + taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), + logger: logger, + }) + require.NoError(t, h.Prerun()) + + // Trigger our hook requests. + req := &interfaces.RunnerUpdateRequest{Alloc: alloc} + require.NoError(t, h.Update(req)) + require.NoError(t, h.Postrun()) + require.NoError(t, h.PreTaskRestart()) + + // Ensure the Nomad mock provider has the expected operations. + ops := nomadMockClient.GetOps() + require.Len(t, ops, 5) + require.Equal(t, "add", ops[0].Op) // Prerun + require.Equal(t, "update", ops[1].Op) // Update + require.Equal(t, "remove", ops[2].Op) // Postrun + require.Equal(t, "remove", ops[3].Op) // Restart -> preKill + require.Equal(t, "add", ops[4].Op) // Restart -> preRun + + // Ensure the Consul mock provider has zero operations. 
+ require.Len(t, consulMockClient.GetOps(), 0) +} + // TestGroupServiceHook_Error asserts group service hooks with group // services but no group network is handled gracefully. func TestGroupServiceHook_NoNetwork(t *testing.T) { @@ -144,6 +213,7 @@ func TestGroupServiceHook_NoNetwork(t *testing.T) { tg.Services = []*structs.Service{ { Name: "testconnect", + Provider: "consul", PortLabel: "9999", Connect: &structs.ConsulConnect{ SidecarService: &structs.ConsulSidecarService{}, @@ -152,14 +222,19 @@ func TestGroupServiceHook_NoNetwork(t *testing.T) { } logger := testlog.HCLogger(t) - consulClient := regMock.NewServiceRegistrationHandler(logger) + consulMockClient := regMock.NewServiceRegistrationHandler(logger) + + regWrapper := wrapper.NewHandlerWrapper( + logger, + consulMockClient, + regMock.NewServiceRegistrationHandler(logger)) h := newGroupServiceHook(groupServiceHookConfig{ - alloc: alloc, - consul: consulClient, - restarter: agentconsul.NoopRestarter(), - taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), - logger: logger, + alloc: alloc, + serviceRegWrapper: regWrapper, + restarter: agentconsul.NoopRestarter(), + taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), + logger: logger, }) require.NoError(t, h.Prerun()) @@ -170,7 +245,7 @@ func TestGroupServiceHook_NoNetwork(t *testing.T) { require.NoError(t, h.PreTaskRestart()) - ops := consulClient.GetOps() + ops := consulMockClient.GetOps() require.Len(t, ops, 5) require.Equal(t, "add", ops[0].Op) // Prerun require.Equal(t, "update", ops[1].Op) // Update @@ -196,14 +271,19 @@ func TestGroupServiceHook_getWorkloadServices(t *testing.T) { } logger := testlog.HCLogger(t) - consulClient := regMock.NewServiceRegistrationHandler(logger) + consulMockClient := regMock.NewServiceRegistrationHandler(logger) + + regWrapper := wrapper.NewHandlerWrapper( + logger, + consulMockClient, + regMock.NewServiceRegistrationHandler(logger)) h := 
newGroupServiceHook(groupServiceHookConfig{ - alloc: alloc, - consul: consulClient, - restarter: agentconsul.NoopRestarter(), - taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), - logger: logger, + alloc: alloc, + serviceRegWrapper: regWrapper, + restarter: agentconsul.NoopRestarter(), + taskEnvBuilder: taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region), + logger: logger, }) services := h.getWorkloadServices() @@ -234,8 +314,15 @@ func TestGroupServiceHook_Update08Alloc(t *testing.T) { require.NoError(t, err) namespacesClient := agentconsul.NewNamespacesClient(consulClient.Namespaces(), consulClient.Agent()) + logger := testlog.HCLogger(t) serviceClient := agentconsul.NewServiceClient(consulClient.Agent(), namespacesClient, testlog.HCLogger(t), true) + regWrapper := wrapper.NewHandlerWrapper( + logger, + serviceClient, + regMock.NewServiceRegistrationHandler(logger), + ) + // Lower periodicInterval to ensure periodic syncing doesn't improperly // remove Connect services. 
//const interval = 50 * time.Millisecond @@ -266,6 +353,7 @@ func TestGroupServiceHook_Update08Alloc(t *testing.T) { tg.Services = []*structs.Service{ { Name: "testconnect", + Provider: "consul", PortLabel: "9999", Connect: &structs.ConsulConnect{ SidecarService: &structs.ConsulSidecarService{ @@ -284,11 +372,11 @@ func TestGroupServiceHook_Update08Alloc(t *testing.T) { // Create the group service hook h := newGroupServiceHook(groupServiceHookConfig{ - alloc: oldAlloc, - consul: serviceClient, - restarter: agentconsul.NoopRestarter(), - taskEnvBuilder: taskenv.NewBuilder(mock.Node(), oldAlloc, nil, oldAlloc.Job.Region), - logger: testlog.HCLogger(t), + alloc: oldAlloc, + serviceRegWrapper: regWrapper, + restarter: agentconsul.NoopRestarter(), + taskEnvBuilder: taskenv.NewBuilder(mock.Node(), oldAlloc, nil, oldAlloc.Job.Region), + logger: testlog.HCLogger(t), }) require.NoError(t, h.Prerun()) @@ -300,5 +388,4 @@ func TestGroupServiceHook_Update08Alloc(t *testing.T) { require.NoError(t, err) return len(services) == 2 }, 3*time.Second, 100*time.Millisecond) - } diff --git a/client/allocrunner/taskrunner/script_check_hook_test.go b/client/allocrunner/taskrunner/script_check_hook_test.go index 5463a0b2e1a..2c149ae1545 100644 --- a/client/allocrunner/taskrunner/script_check_hook_test.go +++ b/client/allocrunner/taskrunner/script_check_hook_test.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces" "github.com/hashicorp/nomad/client/serviceregistration" regMock "github.com/hashicorp/nomad/client/serviceregistration/mock" + "github.com/hashicorp/nomad/client/serviceregistration/wrapper" "github.com/hashicorp/nomad/client/taskenv" agentconsul "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/testlog" @@ -228,6 +229,7 @@ func TestScript_TaskEnvInterpolation(t *testing.T) { logger := testlog.HCLogger(t) consulClient := regMock.NewServiceRegistrationHandler(logger) + regWrap := 
wrapper.NewHandlerWrapper(logger, consulClient, nil) exec, cancel := newBlockingScriptExec() defer cancel() @@ -243,10 +245,10 @@ func TestScript_TaskEnvInterpolation(t *testing.T) { map[string]string{"SVC_NAME": "frontend"}).Build() svcHook := newServiceHook(serviceHookConfig{ - alloc: alloc, - task: task, - consulServices: consulClient, - logger: logger, + alloc: alloc, + task: task, + serviceRegWrapper: regWrap, + logger: logger, }) // emulate prestart having been fired svcHook.taskEnv = env diff --git a/client/allocrunner/taskrunner/service_hook.go b/client/allocrunner/taskrunner/service_hook.go index 9684317e541..87245352706 100644 --- a/client/allocrunner/taskrunner/service_hook.go +++ b/client/allocrunner/taskrunner/service_hook.go @@ -9,6 +9,7 @@ import ( "github.com/hashicorp/nomad/client/allocrunner/interfaces" tinterfaces "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces" "github.com/hashicorp/nomad/client/serviceregistration" + "github.com/hashicorp/nomad/client/serviceregistration/wrapper" "github.com/hashicorp/nomad/client/taskenv" agentconsul "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/nomad/structs" @@ -21,11 +22,21 @@ var _ interfaces.TaskExitedHook = &serviceHook{} var _ interfaces.TaskStopHook = &serviceHook{} var _ interfaces.TaskUpdateHook = &serviceHook{} +const ( + taskServiceHookName = "task_services" +) + type serviceHookConfig struct { - alloc *structs.Allocation - task *structs.Task - consulServices serviceregistration.Handler - consulNamespace string + alloc *structs.Allocation + task *structs.Task + + // namespace is the Nomad or Consul namespace in which service + // registrations will be made. + namespace string + + // serviceRegWrapper is the handler wrapper that is used to perform service + // and check registration and deregistration. 
+ serviceRegWrapper *wrapper.HandlerWrapper // Restarter is a subset of the TaskLifecycle interface restarter agentconsul.WorkloadRestarter @@ -34,12 +45,11 @@ type serviceHookConfig struct { } type serviceHook struct { - allocID string - taskName string - consulNamespace string - consulServices serviceregistration.Handler - restarter agentconsul.WorkloadRestarter - logger log.Logger + allocID string + jobID string + taskName string + restarter agentconsul.WorkloadRestarter + logger log.Logger // The following fields may be updated driverExec tinterfaces.ScriptExecutor @@ -50,6 +60,14 @@ type serviceHook struct { ports structs.AllocatedPorts taskEnv *taskenv.TaskEnv + // namespace is the Nomad or Consul namespace in which service + // registrations will be made. + namespace string + + // serviceRegWrapper is the handler wrapper that is used to perform service + // and check registration and deregistration. + serviceRegWrapper *wrapper.HandlerWrapper + // initialRegistrations tracks if Poststart has completed, initializing // fields required in other lifecycle funcs initialRegistration bool @@ -65,13 +83,14 @@ type serviceHook struct { func newServiceHook(c serviceHookConfig) *serviceHook { h := &serviceHook{ - allocID: c.alloc.ID, - taskName: c.task.Name, - consulServices: c.consulServices, - consulNamespace: c.consulNamespace, - services: c.task.Services, - restarter: c.restarter, - ports: c.alloc.AllocatedResources.Shared.Ports, + allocID: c.alloc.ID, + jobID: c.alloc.JobID, + taskName: c.task.Name, + namespace: c.namespace, + serviceRegWrapper: c.serviceRegWrapper, + services: c.task.Services, + restarter: c.restarter, + ports: c.alloc.AllocatedResources.Shared.Ports, } if res := c.alloc.AllocatedResources.Tasks[c.task.Name]; res != nil { @@ -86,9 +105,7 @@ func newServiceHook(c serviceHookConfig) *serviceHook { return h } -func (h *serviceHook) Name() string { - return "consul_services" -} +func (h *serviceHook) Name() string { return taskServiceHookName } func 
(h *serviceHook) Poststart(ctx context.Context, req *interfaces.TaskPoststartRequest, _ *interfaces.TaskPoststartResponse) error { h.mu.Lock() @@ -106,15 +123,15 @@ func (h *serviceHook) Poststart(ctx context.Context, req *interfaces.TaskPoststa // Create task services struct with request's driver metadata workloadServices := h.getWorkloadServices() - return h.consulServices.RegisterWorkload(workloadServices) + return h.serviceRegWrapper.RegisterWorkload(workloadServices) } func (h *serviceHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequest, _ *interfaces.TaskUpdateResponse) error { h.mu.Lock() defer h.mu.Unlock() if !h.initialRegistration { - // no op Consul since initial registration has not finished - // only update hook fields + // no op since initial registration has not finished only update hook + // fields. return h.updateHookFields(req) } @@ -129,7 +146,7 @@ func (h *serviceHook) Update(ctx context.Context, req *interfaces.TaskUpdateRequ // Create new task services struct with those new values newWorkloadServices := h.getWorkloadServices() - return h.consulServices.UpdateWorkload(oldWorkloadServices, newWorkloadServices) + return h.serviceRegWrapper.UpdateWorkload(oldWorkloadServices, newWorkloadServices) } func (h *serviceHook) updateHookFields(req *interfaces.TaskUpdateRequest) error { @@ -180,7 +197,7 @@ func (h *serviceHook) Exited(context.Context, *interfaces.TaskExitedRequest, *in func (h *serviceHook) deregister() { if len(h.services) > 0 && !h.deregistered { workloadServices := h.getWorkloadServices() - h.consulServices.RemoveWorkload(workloadServices) + h.serviceRegWrapper.RemoveWorkload(workloadServices) } h.initialRegistration = false h.deregistered = true @@ -200,8 +217,9 @@ func (h *serviceHook) getWorkloadServices() *serviceregistration.WorkloadService // Create task services struct with request's driver metadata return &serviceregistration.WorkloadServices{ AllocID: h.allocID, + JobID: h.jobID, Task: h.taskName, - Namespace: 
h.consulNamespace, + Namespace: h.namespace, Restarter: h.restarter, Services: interpolatedServices, DriverExec: h.driverExec, diff --git a/client/allocrunner/taskrunner/service_hook_test.go b/client/allocrunner/taskrunner/service_hook_test.go index 4489e722008..494fb61370d 100644 --- a/client/allocrunner/taskrunner/service_hook_test.go +++ b/client/allocrunner/taskrunner/service_hook_test.go @@ -6,8 +6,12 @@ import ( "github.com/hashicorp/nomad/client/allocrunner/interfaces" regMock "github.com/hashicorp/nomad/client/serviceregistration/mock" + "github.com/hashicorp/nomad/client/serviceregistration/wrapper" + "github.com/hashicorp/nomad/client/taskenv" + agentconsul "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" "github.com/stretchr/testify/require" ) @@ -19,20 +23,38 @@ var _ interfaces.TaskUpdateHook = (*serviceHook)(nil) func TestUpdate_beforePoststart(t *testing.T) { alloc := mock.Alloc() + alloc.Job.Canonicalize() logger := testlog.HCLogger(t) + c := regMock.NewServiceRegistrationHandler(logger) + regWrap := wrapper.NewHandlerWrapper(logger, c, nil) + + // Interpolating workload services performs a check on the task env, if it + // is nil, nil is returned meaning no services. This does not work with the + // wrapper len protections, so we need a dummy taskenv. 
+ spoofTaskEnv := taskenv.TaskEnv{NodeAttrs: map[string]string{}} hook := newServiceHook(serviceHookConfig{ - alloc: alloc, - task: alloc.LookupTask("web"), - consulServices: c, - logger: logger, + alloc: alloc, + task: alloc.LookupTask("web"), + serviceRegWrapper: regWrap, + logger: logger, }) - require.NoError(t, hook.Update(context.Background(), &interfaces.TaskUpdateRequest{Alloc: alloc}, &interfaces.TaskUpdateResponse{})) + require.NoError(t, hook.Update(context.Background(), &interfaces.TaskUpdateRequest{ + Alloc: alloc, + TaskEnv: &spoofTaskEnv, + }, &interfaces.TaskUpdateResponse{})) require.Len(t, c.GetOps(), 0) - require.NoError(t, hook.Poststart(context.Background(), &interfaces.TaskPoststartRequest{}, &interfaces.TaskPoststartResponse{})) + + require.NoError(t, hook.Poststart(context.Background(), &interfaces.TaskPoststartRequest{ + TaskEnv: &spoofTaskEnv, + }, &interfaces.TaskPoststartResponse{})) require.Len(t, c.GetOps(), 1) - require.NoError(t, hook.Update(context.Background(), &interfaces.TaskUpdateRequest{Alloc: alloc}, &interfaces.TaskUpdateResponse{})) + + require.NoError(t, hook.Update(context.Background(), &interfaces.TaskUpdateRequest{ + Alloc: alloc, + TaskEnv: &spoofTaskEnv, + }, &interfaces.TaskUpdateResponse{})) require.Len(t, c.GetOps(), 2) // When a task exits it could be restarted with new driver info @@ -40,15 +62,31 @@ func TestUpdate_beforePoststart(t *testing.T) { require.NoError(t, hook.Exited(context.Background(), &interfaces.TaskExitedRequest{}, &interfaces.TaskExitedResponse{})) require.Len(t, c.GetOps(), 3) - require.NoError(t, hook.Update(context.Background(), &interfaces.TaskUpdateRequest{Alloc: alloc}, &interfaces.TaskUpdateResponse{})) + + require.NoError(t, hook.Update(context.Background(), &interfaces.TaskUpdateRequest{ + Alloc: alloc, + TaskEnv: &spoofTaskEnv, + }, &interfaces.TaskUpdateResponse{})) require.Len(t, c.GetOps(), 3) - require.NoError(t, hook.Poststart(context.Background(), 
&interfaces.TaskPoststartRequest{}, &interfaces.TaskPoststartResponse{})) + + require.NoError(t, hook.Poststart(context.Background(), &interfaces.TaskPoststartRequest{ + TaskEnv: &spoofTaskEnv, + }, &interfaces.TaskPoststartResponse{})) require.Len(t, c.GetOps(), 4) - require.NoError(t, hook.Update(context.Background(), &interfaces.TaskUpdateRequest{Alloc: alloc}, &interfaces.TaskUpdateResponse{})) + + require.NoError(t, hook.Update(context.Background(), &interfaces.TaskUpdateRequest{ + Alloc: alloc, + TaskEnv: &spoofTaskEnv, + }, &interfaces.TaskUpdateResponse{})) require.Len(t, c.GetOps(), 5) + require.NoError(t, hook.PreKilling(context.Background(), &interfaces.TaskPreKillRequest{}, &interfaces.TaskPreKillResponse{})) require.Len(t, c.GetOps(), 6) - require.NoError(t, hook.Update(context.Background(), &interfaces.TaskUpdateRequest{Alloc: alloc}, &interfaces.TaskUpdateResponse{})) + + require.NoError(t, hook.Update(context.Background(), &interfaces.TaskUpdateRequest{ + Alloc: alloc, + TaskEnv: &spoofTaskEnv, + }, &interfaces.TaskUpdateResponse{})) require.Len(t, c.GetOps(), 6) } @@ -56,17 +94,26 @@ func Test_serviceHook_multipleDeRegisterCall(t *testing.T) { alloc := mock.Alloc() logger := testlog.HCLogger(t) + c := regMock.NewServiceRegistrationHandler(logger) + regWrap := wrapper.NewHandlerWrapper(logger, c, nil) hook := newServiceHook(serviceHookConfig{ - alloc: alloc, - task: alloc.LookupTask("web"), - consulServices: c, - logger: logger, + alloc: alloc, + task: alloc.LookupTask("web"), + serviceRegWrapper: regWrap, + logger: logger, }) + // Interpolating workload services performs a check on the task env, if it + // is nil, nil is returned meaning no services. This does not work with the + // wrapper len protections, so we need a dummy taskenv. + spoofTaskEnv := taskenv.TaskEnv{NodeAttrs: map[string]string{}} + // Add a registration, as we would in normal operation. 
- require.NoError(t, hook.Poststart(context.Background(), &interfaces.TaskPoststartRequest{}, &interfaces.TaskPoststartResponse{})) + require.NoError(t, hook.Poststart(context.Background(), &interfaces.TaskPoststartRequest{ + TaskEnv: &spoofTaskEnv, + }, &interfaces.TaskPoststartResponse{})) require.Len(t, c.GetOps(), 1) // Call all three deregister backed functions in a row. Ensure the number @@ -84,7 +131,9 @@ func Test_serviceHook_multipleDeRegisterCall(t *testing.T) { require.Equal(t, c.GetOps()[1].Op, "remove") // Now we act like a restart. - require.NoError(t, hook.Poststart(context.Background(), &interfaces.TaskPoststartRequest{}, &interfaces.TaskPoststartResponse{})) + require.NoError(t, hook.Poststart(context.Background(), &interfaces.TaskPoststartRequest{ + TaskEnv: &spoofTaskEnv, + }, &interfaces.TaskPoststartResponse{})) require.Len(t, c.GetOps(), 3) require.Equal(t, c.GetOps()[2].Op, "add") @@ -101,3 +150,57 @@ func Test_serviceHook_multipleDeRegisterCall(t *testing.T) { require.Len(t, c.GetOps(), 4) require.Equal(t, c.GetOps()[3].Op, "remove") } + +// Test_serviceHook_Nomad performs a normal operation test of the serviceHook +// when using task services which utilise the Nomad provider. +func Test_serviceHook_Nomad(t *testing.T) { + t.Parallel() + + // Create a mock alloc, and add a task service using provider Nomad. + alloc := mock.Alloc() + alloc.Job.TaskGroups[0].Tasks[0].Services = []*structs.Service{ + { + Name: "nomad-provider-service", + Provider: structs.ServiceProviderNomad, + }, + } + + // Create our base objects and our subsequent wrapper. 
+ logger := testlog.HCLogger(t) + consulMockClient := regMock.NewServiceRegistrationHandler(logger) + nomadMockClient := regMock.NewServiceRegistrationHandler(logger) + + regWrapper := wrapper.NewHandlerWrapper(logger, consulMockClient, nomadMockClient) + + h := newServiceHook(serviceHookConfig{ + alloc: alloc, + task: alloc.LookupTask("web"), + namespace: "default", + serviceRegWrapper: regWrapper, + restarter: agentconsul.NoopRestarter(), + logger: logger, + }) + + // Create a taskEnv builder to use in requests, otherwise interpolation of + // services will always return nil. + taskEnvBuilder := taskenv.NewBuilder(mock.Node(), alloc, nil, alloc.Job.Region) + + // Trigger our initial hook function. + require.NoError(t, h.Poststart(context.Background(), &interfaces.TaskPoststartRequest{ + TaskEnv: taskEnvBuilder.Build()}, nil)) + + // Trigger all the possible stop functions to ensure we only deregister + // once. + require.NoError(t, h.PreKilling(context.Background(), nil, nil)) + require.NoError(t, h.Exited(context.Background(), nil, nil)) + require.NoError(t, h.Stop(context.Background(), nil, nil)) + + // Ensure the Nomad mock provider has the expected operations. + nomadOps := nomadMockClient.GetOps() + require.Len(t, nomadOps, 2) + require.Equal(t, "add", nomadOps[0].Op) // Poststart + require.Equal(t, "remove", nomadOps[1].Op) // PreKilling,Exited,Stop + + // Ensure the Consul mock provider has zero operations. 
+ require.Len(t, consulMockClient.GetOps(), 0) +} diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 41938aea33f..b8c3b270c32 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -26,6 +26,7 @@ import ( "github.com/hashicorp/nomad/client/pluginmanager/csimanager" "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" "github.com/hashicorp/nomad/client/serviceregistration" + "github.com/hashicorp/nomad/client/serviceregistration/wrapper" cstate "github.com/hashicorp/nomad/client/state" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/taskenv" @@ -239,6 +240,10 @@ type TaskRunner struct { networkIsolationSpec *drivers.NetworkIsolationSpec allocHookResources *cstructs.AllocHookResources + + // serviceRegWrapper is the handler wrapper that is used by service hooks + // to perform service and check registration and deregistration. + serviceRegWrapper *wrapper.HandlerWrapper } type Config struct { @@ -300,6 +305,10 @@ type Config struct { // ShutdownDelayCancelFn should only be used in testing. ShutdownDelayCancelFn context.CancelFunc + + // ServiceRegWrapper is the handler wrapper that is used by service hooks + // to perform service and check registration and deregistration. 
+ ServiceRegWrapper *wrapper.HandlerWrapper } func NewTaskRunner(config *Config) (*TaskRunner, error) { @@ -357,6 +366,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { startConditionMetCtx: config.StartConditionMetCtx, shutdownDelayCtx: config.ShutdownDelayCtx, shutdownDelayCancelFn: config.ShutdownDelayCancelFn, + serviceRegWrapper: config.ServiceRegWrapper, } // Create the logger based on the allocation ID diff --git a/client/allocrunner/taskrunner/task_runner_hooks.go b/client/allocrunner/taskrunner/task_runner_hooks.go index 62ff26c4b62..86c8ff37ad6 100644 --- a/client/allocrunner/taskrunner/task_runner_hooks.go +++ b/client/allocrunner/taskrunner/task_runner_hooks.go @@ -96,9 +96,13 @@ func (tr *TaskRunner) initHooks() { })) } - // Get the consul namespace for the TG of the allocation + // Get the consul namespace for the TG of the allocation. consulNamespace := tr.alloc.ConsulNamespace() + // Identify the service registration provider namespace, which can differ + // from the Consul namespace depending on which provider is used. + serviceProviderNamespace := tr.alloc.ServiceProviderNamespace() + // If there are templates is enabled, add the hook if len(task.Templates) != 0 { tr.runnerHooks = append(tr.runnerHooks, newTemplateHook(&templateHookConfig{ @@ -115,12 +119,12 @@ func (tr *TaskRunner) initHooks() { // Always add the service hook. A task with no services on initial registration // may be updated to include services, which must be handled with this hook. 
tr.runnerHooks = append(tr.runnerHooks, newServiceHook(serviceHookConfig{ - alloc: tr.Alloc(), - task: tr.Task(), - consulServices: tr.consulServiceClient, - consulNamespace: consulNamespace, - restarter: tr, - logger: hookLogger, + alloc: tr.Alloc(), + task: tr.Task(), + namespace: serviceProviderNamespace, + serviceRegWrapper: tr.serviceRegWrapper, + restarter: tr, + logger: hookLogger, })) // If this is a Connect sidecar proxy (or a Connect Native) service, diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 139808d913e..415e4436cdd 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -25,6 +25,7 @@ import ( "github.com/hashicorp/nomad/client/devicemanager" "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" regMock "github.com/hashicorp/nomad/client/serviceregistration/mock" + "github.com/hashicorp/nomad/client/serviceregistration/wrapper" cstate "github.com/hashicorp/nomad/client/state" ctestutil "github.com/hashicorp/nomad/client/testutil" "github.com/hashicorp/nomad/client/vaultclient" @@ -104,13 +105,18 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri closedCh := make(chan struct{}) close(closedCh) + // Set up the Nomad and Consul registration providers along with the wrapper. 
+ consulRegMock := regMock.NewServiceRegistrationHandler(logger) + nomadRegMock := regMock.NewServiceRegistrationHandler(logger) + wrapperMock := wrapper.NewHandlerWrapper(logger, consulRegMock, nomadRegMock) + conf := &Config{ Alloc: alloc, ClientConfig: clientConf, Task: thisTask, TaskDir: taskDir, Logger: clientConf.Logger, - Consul: regMock.NewServiceRegistrationHandler(logger), + Consul: consulRegMock, ConsulSI: consulapi.NewMockServiceIdentitiesClient(), Vault: vaultclient.NewMockVaultClient(), StateDB: cstate.NoopDB{}, @@ -121,6 +127,7 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri StartConditionMetCtx: closedCh, ShutdownDelayCtx: shutdownDelayCtx, ShutdownDelayCancelFn: shutdownDelayCancelFn, + ServiceRegWrapper: wrapperMock, } return conf, trCleanup } @@ -1229,6 +1236,7 @@ func TestTaskRunner_CheckWatcher_Restart(t *testing.T) { Grace: 100 * time.Millisecond, }, } + task.Services[0].Provider = structs.ServiceProviderConsul conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) defer cleanup() @@ -1246,6 +1254,7 @@ func TestTaskRunner_CheckWatcher_Restart(t *testing.T) { defer consulClient.Shutdown() conf.Consul = consulClient + conf.ServiceRegWrapper = wrapper.NewHandlerWrapper(conf.Logger, consulClient, nil) tr, err := NewTaskRunner(conf) require.NoError(t, err) @@ -1885,6 +1894,7 @@ func TestTaskRunner_DriverNetwork(t *testing.T) { Name: "host-service", PortLabel: "http", AddressMode: "host", + Provider: structs.ServiceProviderConsul, Checks: []*structs.ServiceCheck{ { Name: "driver-check", @@ -1898,6 +1908,7 @@ func TestTaskRunner_DriverNetwork(t *testing.T) { Name: "driver-service", PortLabel: "5678", AddressMode: "driver", + Provider: structs.ServiceProviderConsul, Checks: []*structs.ServiceCheck{ { Name: "host-check", @@ -1928,6 +1939,7 @@ func TestTaskRunner_DriverNetwork(t *testing.T) { go consulClient.Run() conf.Consul = consulClient + conf.ServiceRegWrapper = wrapper.NewHandlerWrapper(conf.Logger, 
consulClient, nil) tr, err := NewTaskRunner(conf) require.NoError(t, err) diff --git a/client/allocrunner/testing.go b/client/allocrunner/testing.go index 4fde2cd0234..4910d292490 100644 --- a/client/allocrunner/testing.go +++ b/client/allocrunner/testing.go @@ -7,14 +7,14 @@ import ( "sync" "testing" - "github.com/hashicorp/nomad/client/lib/cgutil" - "github.com/hashicorp/nomad/client/allocwatcher" clientconfig "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/devicemanager" + "github.com/hashicorp/nomad/client/lib/cgutil" "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" "github.com/hashicorp/nomad/client/serviceregistration/mock" + "github.com/hashicorp/nomad/client/serviceregistration/wrapper" "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/nomad/structs" @@ -57,13 +57,17 @@ func (m *MockStateUpdater) Reset() { func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*Config, func()) { clientConf, cleanup := clientconfig.TestClientConfig(t) + + consulRegMock := mock.NewServiceRegistrationHandler(clientConf.Logger) + nomadRegMock := mock.NewServiceRegistrationHandler(clientConf.Logger) + conf := &Config{ // Copy the alloc in case the caller edits and reuses it Alloc: alloc.Copy(), Logger: clientConf.Logger, ClientConfig: clientConf, StateDB: state.NoopDB{}, - Consul: mock.NewServiceRegistrationHandler(clientConf.Logger), + Consul: consulRegMock, ConsulSI: consul.NewMockServiceIdentitiesClient(), Vault: vaultclient.NewMockVaultClient(), StateUpdater: &MockStateUpdater{}, @@ -73,6 +77,7 @@ func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*Config, fu DriverManager: drivermanager.TestDriverManager(t), CpusetManager: cgutil.NoopCpusetManager(), ServersContactedCh: make(chan struct{}), + ServiceRegWrapper: wrapper.NewHandlerWrapper(clientConf.Logger, consulRegMock, nomadRegMock), } 
return conf, cleanup } diff --git a/client/client.go b/client/client.go index c70a1547daa..1b52582699b 100644 --- a/client/client.go +++ b/client/client.go @@ -40,6 +40,8 @@ import ( "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" "github.com/hashicorp/nomad/client/servers" "github.com/hashicorp/nomad/client/serviceregistration" + "github.com/hashicorp/nomad/client/serviceregistration/nsd" + "github.com/hashicorp/nomad/client/serviceregistration/wrapper" "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/stats" cstructs "github.com/hashicorp/nomad/client/structs" @@ -226,10 +228,19 @@ type Client struct { // allocUpdates stores allocations that need to be synced to the server. allocUpdates chan *structs.Allocation - // consulService is Nomad's custom Consul client for managing services + // consulService is the Consul handler implementation for managing services // and checks. consulService serviceregistration.Handler + // nomadService is the Nomad handler implementation for managing service + // registrations. + nomadService serviceregistration.Handler + + // serviceRegWrapper wraps the consulService and nomadService + // implementations so that the alloc and task runner service hooks can call + // this without needing to identify which backend provider should be used. + serviceRegWrapper *wrapper.HandlerWrapper + // consulProxies is Nomad's custom Consul client for looking up supported // envoy versions consulProxies consulApi.SupportedProxiesAPI @@ -472,6 +483,12 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie c.devicemanager = devManager c.pluginManagers.RegisterAndRun(devManager) + // Set up the service registration wrapper using the Consul and Nomad + // implementations. The Nomad implementation is only ever used on the + // client, so we do that here rather than within the agent. 
+ c.setupNomadServiceRegistrationHandler() + c.serviceRegWrapper = wrapper.NewHandlerWrapper(c.logger, c.consulService, c.nomadService) + // Batching of initial fingerprints is done to reduce the number of node // updates sent to the server on startup. This is the first RPC to the servers go c.batchFirstFingerprints() @@ -785,6 +802,13 @@ func (c *Client) Shutdown() error { } arGroup.Wait() + // Assert the implementation, so we can trigger the shutdown call. This is + // the only place this occurs, so it's OK to store the interface rather + // than the implementation. + if h, ok := c.nomadService.(*nsd.ServiceRegistrationHandler); ok { + h.Shutdown() + } + // Shutdown the plugin managers c.pluginManagers.Shutdown() @@ -1141,6 +1165,7 @@ func (c *Client) restoreState() error { DeviceManager: c.devicemanager, DriverManager: c.drivermanager, ServersContactedCh: c.serversContactedCh, + ServiceRegWrapper: c.serviceRegWrapper, RPCClient: c, } c.configLock.RUnlock() @@ -2462,6 +2487,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error CpusetManager: c.cpusetManager, DeviceManager: c.devicemanager, DriverManager: c.drivermanager, + ServiceRegWrapper: c.serviceRegWrapper, RPCClient: c, } c.configLock.RUnlock() @@ -2509,6 +2535,20 @@ func (c *Client) setupVaultClient() error { return nil } +// setupNomadServiceRegistrationHandler sets up the registration handler to use +// for native service discovery. 
+func (c *Client) setupNomadServiceRegistrationHandler() { + cfg := nsd.ServiceRegistrationHandlerCfg{ + Datacenter: c.Datacenter(), + Enabled: c.config.NomadServiceDiscovery, + NodeID: c.NodeID(), + NodeSecret: c.secretNodeID(), + Region: c.Region(), + RPCFn: c.RPC, + } + c.nomadService = nsd.NewServiceRegistrationHandler(c.logger, &cfg) +} + // deriveToken takes in an allocation and a set of tasks and derives vault // tokens for each of the tasks, unwraps all of them using the supplied vault // client and returns a map of unwrapped tokens, indexed by the task name. diff --git a/client/serviceregistration/wrapper/wrapper.go b/client/serviceregistration/wrapper/wrapper.go index 37b5e4f614b..1c52dfd1987 100644 --- a/client/serviceregistration/wrapper/wrapper.go +++ b/client/serviceregistration/wrapper/wrapper.go @@ -66,20 +66,28 @@ func (h *HandlerWrapper) RegisterWorkload(workload *serviceregistration.Workload // workload unless the provider is unknown. func (h *HandlerWrapper) RemoveWorkload(services *serviceregistration.WorkloadServices) { - // Don't rely on callers to check there are no services to remove. - if len(services.Services) == 0 { - return - } + var provider string - provider := services.RegistrationProvider() + // It is possible the services field is empty depending on the exact + // situation which resulted in the call. + if len(services.Services) > 0 { + provider = services.RegistrationProvider() + } - // Call the correct provider. In the case it is not supported, we can't do - // much apart from log, although we should never reach this point because - // of job validation. + // Call the correct provider, if we have managed to identify it. An empty + // string means you didn't find a provider, therefore default to consul. + // + // In certain situations this function is called with zero services, + // therefore meaning we make an assumption on the provider. 
When this + // happens, we need to ensure the allocation is removed from the Consul + // implementation. This tracking (allocRegistrations) is used by the + // allochealth tracker and so is critical to be removed. The test + // allocrunner.TestAllocRunner_Restore_RunningTerminal covers the case + // described here. switch provider { case structs.ServiceProviderNomad: h.nomadServiceProvider.RemoveWorkload(services) - case structs.ServiceProviderConsul: + case structs.ServiceProviderConsul, "": h.consulServiceProvider.RemoveWorkload(services) default: h.log.Error("unknown service registration provider", "provider", provider) diff --git a/client/serviceregistration/wrapper/wrapper_test.go b/client/serviceregistration/wrapper/wrapper_test.go index 81dfd7c1f67..2acb8237626 100644 --- a/client/serviceregistration/wrapper/wrapper_test.go +++ b/client/serviceregistration/wrapper/wrapper_test.go @@ -131,10 +131,10 @@ func TestHandlerWrapper_RemoveWorkload(t *testing.T) { // Generate the test wrapper and provider mocks. wrapper, consul, nomad := setupTestWrapper() - // Call the function with no services and check that nothing is - // registered. + // Call the function with no services and check that consul is + // defaulted to. 
wrapper.RemoveWorkload(&serviceregistration.WorkloadServices{}) - require.Len(t, consul.GetOps(), 0) + require.Len(t, consul.GetOps(), 1) require.Len(t, nomad.GetOps(), 0) }, name: "zero services", diff --git a/command/agent/agent.go b/command/agent/agent.go index b65e101ef65..9d2668d8842 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -14,8 +14,6 @@ import ( "sync" "time" - "github.com/hashicorp/nomad/lib/cpuset" - metrics "github.com/armon/go-metrics" consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib" @@ -28,6 +26,7 @@ import ( "github.com/hashicorp/nomad/command/agent/event" "github.com/hashicorp/nomad/helper/pluginutils/loader" "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/lib/cpuset" "github.com/hashicorp/nomad/nomad" "github.com/hashicorp/nomad/nomad/deploymentwatcher" "github.com/hashicorp/nomad/nomad/structs" @@ -891,7 +890,6 @@ func (a *Agent) setupClient() error { if !a.config.Client.Enabled { return nil } - // Setup the configuration conf, err := a.clientConfig() if err != nil { diff --git a/command/agent/consul/int_test.go b/command/agent/consul/int_test.go index 12aa80f8f35..5ec318354b1 100644 --- a/command/agent/consul/int_test.go +++ b/command/agent/consul/int_test.go @@ -15,6 +15,8 @@ import ( "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/devicemanager" "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" + regMock "github.com/hashicorp/nomad/client/serviceregistration/mock" + "github.com/hashicorp/nomad/client/serviceregistration/wrapper" "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/command/agent/consul" @@ -93,6 +95,7 @@ func TestConsul_Integration(t *testing.T) { Name: "httpd", PortLabel: "http", Tags: []string{"nomad", "test", "http"}, + Provider: structs.ServiceProviderConsul, Checks: []*structs.ServiceCheck{ { Name: "httpd-http-check", @@ -114,6 +117,7 @@ 
func TestConsul_Integration(t *testing.T) { { Name: "httpd2", PortLabel: "http", + Provider: structs.ServiceProviderConsul, Tags: []string{ "test", // Use URL-unfriendly tags to test #3620 @@ -162,6 +166,7 @@ func TestConsul_Integration(t *testing.T) { DeviceManager: devicemanager.NoopMockManager(), DriverManager: drivermanager.TestDriverManager(t), StartConditionMetCtx: closedCh, + ServiceRegWrapper: wrapper.NewHandlerWrapper(logger, serviceClient, regMock.NewServiceRegistrationHandler(logger)), } tr, err := taskrunner.NewTaskRunner(config) diff --git a/nomad/structs/alloc.go b/nomad/structs/alloc.go index 9f1dee8d079..a0277477373 100644 --- a/nomad/structs/alloc.go +++ b/nomad/structs/alloc.go @@ -22,3 +22,37 @@ type AllocServiceRegistrationsResponse struct { Services []*ServiceRegistration QueryMeta } + +// ServiceProviderNamespace returns the namespace within which the allocation's +// services should be registered. This takes into account the different +// providers that can provide service registrations. In the event no services +// are found, the function will return the Consul namespace which allows hooks +// to work as they did before this feature. +// +// It currently assumes that all services within an allocation use the same +// provider and therefore the same namespace. 
+func (a *Allocation) ServiceProviderNamespace() string { + tg := a.Job.LookupTaskGroup(a.TaskGroup) + + if len(tg.Services) > 0 { + switch tg.Services[0].Provider { + case ServiceProviderNomad: + return a.Job.Namespace + default: + return tg.Consul.GetNamespace() + } + } + + if len(tg.Tasks) > 0 { + if len(tg.Tasks[0].Services) > 0 { + switch tg.Tasks[0].Services[0].Provider { + case ServiceProviderNomad: + return a.Job.Namespace + default: + return tg.Consul.GetNamespace() + } + } + } + + return tg.Consul.GetNamespace() +} diff --git a/nomad/structs/alloc_test.go b/nomad/structs/alloc_test.go index ce2ce52dab8..92bc6161550 100644 --- a/nomad/structs/alloc_test.go +++ b/nomad/structs/alloc_test.go @@ -10,3 +10,105 @@ func TestAllocServiceRegistrationsRequest_StaleReadSupport(t *testing.T) { req := &AllocServiceRegistrationsRequest{} require.True(t, req.IsRead()) } + +func Test_Allocation_ServiceProviderNamespace(t *testing.T) { + testCases := []struct { + inputAllocation *Allocation + expectedOutput string + name string + }{ + { + inputAllocation: &Allocation{ + Job: &Job{ + TaskGroups: []*TaskGroup{ + { + Name: "test-group", + Services: []*Service{ + { + Provider: ServiceProviderConsul, + }, + }, + }, + }, + }, + TaskGroup: "test-group", + }, + expectedOutput: "", + name: "consul task group service", + }, + { + inputAllocation: &Allocation{ + Job: &Job{ + TaskGroups: []*TaskGroup{ + { + Name: "test-group", + Tasks: []*Task{ + { + Services: []*Service{ + { + Provider: ServiceProviderConsul, + }, + }, + }, + }, + }, + }, + }, + TaskGroup: "test-group", + }, + expectedOutput: "", + name: "consul task service", + }, + { + inputAllocation: &Allocation{ + Job: &Job{ + Namespace: "platform", + TaskGroups: []*TaskGroup{ + { + Name: "test-group", + Services: []*Service{ + { + Provider: ServiceProviderNomad, + }, + }, + }, + }, + }, + TaskGroup: "test-group", + }, + expectedOutput: "platform", + name: "nomad task group service", + }, + { + inputAllocation: &Allocation{ + 
Job: &Job{ + Namespace: "platform", + TaskGroups: []*TaskGroup{ + { + Name: "test-group", + Tasks: []*Task{ + { + Services: []*Service{ + { + Provider: ServiceProviderNomad, + }, + }, + }, + }, + }, + }, + }, + TaskGroup: "test-group", + }, + expectedOutput: "platform", + name: "nomad task service", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actualOutput := tc.inputAllocation.ServiceProviderNamespace() + require.Equal(t, tc.expectedOutput, actualOutput) + }) + } +}