Skip to content

Commit

Permalink
Merge pull request #12329 from hashicorp/f-gh-266-touchdown
Browse files Browse the repository at this point in the history
client: hookup service wrapper for use in clients
  • Loading branch information
jrasell authored Mar 22, 2022
2 parents 3668bac + f0be952 commit b6f1e9d
Show file tree
Hide file tree
Showing 21 changed files with 586 additions and 128 deletions.
10 changes: 8 additions & 2 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"sync"
"time"

"github.com/hashicorp/nomad/client/lib/cgutil"

log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
Expand All @@ -20,9 +18,11 @@ import (
"github.com/hashicorp/nomad/client/devicemanager"
"github.com/hashicorp/nomad/client/dynamicplugins"
cinterfaces "github.com/hashicorp/nomad/client/interfaces"
"github.com/hashicorp/nomad/client/lib/cgutil"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/client/serviceregistration/wrapper"
cstate "github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/client/vaultclient"
Expand Down Expand Up @@ -178,6 +178,10 @@ type allocRunner struct {
// rpcClient is the RPC Client that should be used by the allocrunner and its
// hooks to communicate with Nomad Servers.
rpcClient RPCer

// serviceRegWrapper is the handler wrapper that is used by service hooks
// to perform service and check registration and deregistration.
serviceRegWrapper *wrapper.HandlerWrapper
}

// RPCer is the interface needed by hooks to make RPC calls.
Expand Down Expand Up @@ -221,6 +225,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
driverManager: config.DriverManager,
serversContactedCh: config.ServersContactedCh,
rpcClient: config.RPCClient,
serviceRegWrapper: config.ServiceRegWrapper,
}

// Create the logger based on the allocation ID
Expand Down Expand Up @@ -274,6 +279,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
ServersContactedCh: ar.serversContactedCh,
StartConditionMetCtx: ar.taskHookCoordinator.startConditionForTask(task),
ShutdownDelayCtx: ar.shutdownDelayCtx,
ServiceRegWrapper: ar.serviceRegWrapper,
}

if ar.cpusetManager != nil {
Expand Down
4 changes: 2 additions & 2 deletions client/allocrunner/alloc_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
newNetworkHook(hookLogger, ns, alloc, nm, nc, ar, builtTaskEnv),
newGroupServiceHook(groupServiceHookConfig{
alloc: alloc,
consul: ar.consulClient,
consulNamespace: alloc.ConsulNamespace(),
namespace: alloc.ServiceProviderNamespace(),
serviceRegWrapper: ar.serviceRegWrapper,
restarter: ar,
taskEnvBuilder: envBuilder,
networkStatusGetter: ar,
Expand Down
4 changes: 3 additions & 1 deletion client/allocrunner/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,8 @@ func TestAllocRunner_TaskGroup_ShutdownDelay(t *testing.T) {
tg := alloc.Job.TaskGroups[0]
tg.Services = []*structs.Service{
{
Name: "shutdown_service",
Name: "shutdown_service",
Provider: structs.ServiceProviderConsul,
},
}

Expand Down Expand Up @@ -1314,6 +1315,7 @@ func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
{
Name: "fakservice",
PortLabel: "http",
Provider: structs.ServiceProviderConsul,
Checks: []*structs.ServiceCheck{
{
Name: "fakecheck",
Expand Down
1 change: 1 addition & 0 deletions client/allocrunner/alloc_runner_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func TestAllocRunner_Restore_RunningTerminal(t *testing.T) {
{
Name: "foo",
PortLabel: "8888",
Provider: structs.ServiceProviderConsul,
},
}
task := alloc.Job.TaskGroups[0].Tasks[0]
Expand Down
5 changes: 5 additions & 0 deletions client/allocrunner/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/client/serviceregistration/wrapper"
cstate "github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -81,4 +82,8 @@ type Config struct {
// RPCClient is the RPC Client that should be used by the allocrunner and its
// hooks to communicate with Nomad Servers.
RPCClient RPCer

// ServiceRegWrapper is the handler wrapper that is used by service hooks
// to perform service and check registration and deregistration.
ServiceRegWrapper *wrapper.HandlerWrapper
}
36 changes: 26 additions & 10 deletions client/allocrunner/groupservice_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/client/serviceregistration/wrapper"
"github.com/hashicorp/nomad/client/taskenv"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/nomad/structs"
Expand All @@ -25,15 +26,22 @@ type networkStatusGetter interface {
// deregistration.
type groupServiceHook struct {
allocID string
jobID string
group string
restarter agentconsul.WorkloadRestarter
consulClient serviceregistration.Handler
consulNamespace string
prerun bool
deregistered bool
networkStatusGetter networkStatusGetter
shutdownDelayCtx context.Context

// namespace is the Nomad or Consul namespace in which service
// registrations will be made.
namespace string

// serviceRegWrapper is the handler wrapper that is used to perform service
// and check registration and deregistration.
serviceRegWrapper *wrapper.HandlerWrapper

logger log.Logger

// The following fields may be updated
Expand All @@ -51,13 +59,19 @@ type groupServiceHook struct {

type groupServiceHookConfig struct {
alloc *structs.Allocation
consul serviceregistration.Handler
consulNamespace string
restarter agentconsul.WorkloadRestarter
taskEnvBuilder *taskenv.Builder
networkStatusGetter networkStatusGetter
shutdownDelayCtx context.Context
logger log.Logger

// namespace is the Nomad or Consul namespace in which service
// registrations will be made.
namespace string

// serviceRegWrapper is the handler wrapper that is used to perform service
// and check registration and deregistration.
serviceRegWrapper *wrapper.HandlerWrapper
}

func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook {
Expand All @@ -70,15 +84,16 @@ func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook {

h := &groupServiceHook{
allocID: cfg.alloc.ID,
jobID: cfg.alloc.JobID,
group: cfg.alloc.TaskGroup,
restarter: cfg.restarter,
consulClient: cfg.consul,
consulNamespace: cfg.consulNamespace,
namespace: cfg.namespace,
taskEnvBuilder: cfg.taskEnvBuilder,
delay: shutdownDelay,
networkStatusGetter: cfg.networkStatusGetter,
logger: cfg.logger.Named(groupServiceHookName),
services: cfg.alloc.Job.LookupTaskGroup(cfg.alloc.TaskGroup).Services,
serviceRegWrapper: cfg.serviceRegWrapper,
shutdownDelayCtx: cfg.shutdownDelayCtx,
}

Expand Down Expand Up @@ -114,7 +129,7 @@ func (h *groupServiceHook) prerunLocked() error {
}

services := h.getWorkloadServices()
return h.consulClient.RegisterWorkload(services)
return h.serviceRegWrapper.RegisterWorkload(services)
}

func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
Expand Down Expand Up @@ -157,7 +172,7 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
return nil
}

return h.consulClient.UpdateWorkload(oldWorkloadServices, newWorkloadServices)
return h.serviceRegWrapper.UpdateWorkload(oldWorkloadServices, newWorkloadServices)
}

func (h *groupServiceHook) PreTaskRestart() error {
Expand Down Expand Up @@ -213,7 +228,7 @@ func (h *groupServiceHook) Postrun() error {
func (h *groupServiceHook) deregister() {
if len(h.services) > 0 {
workloadServices := h.getWorkloadServices()
h.consulClient.RemoveWorkload(workloadServices)
h.serviceRegWrapper.RemoveWorkload(workloadServices)
}
}

Expand All @@ -229,8 +244,9 @@ func (h *groupServiceHook) getWorkloadServices() *serviceregistration.WorkloadSe
// Create task services struct with request's driver metadata
return &serviceregistration.WorkloadServices{
AllocID: h.allocID,
JobID: h.jobID,
Group: h.group,
Namespace: h.consulNamespace,
Namespace: h.namespace,
Restarter: h.restarter,
Services: interpolatedServices,
Networks: h.networks,
Expand Down
Loading

0 comments on commit b6f1e9d

Please sign in to comment.