Skip to content

Commit

Permalink
backport of commit a76daf6
Browse files Browse the repository at this point in the history
  • Loading branch information
tgross authored Dec 12, 2023
1 parent 27b5ed3 commit 1f8cd3f
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 46 deletions.
3 changes: 3 additions & 0 deletions .changelog/19449.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
consul (Enterprise): Fixed a bug where implicit Consul constraints were not specific to non-default Consul clusters
```
69 changes: 44 additions & 25 deletions nomad/job_endpoint_hook_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,40 +80,56 @@ func connectGatewayDriverConfig(hostNetwork bool) map[string]interface{} {
// the proper Consul version is used that supports the necessary Connect
// features. This includes bootstrapping envoy with a unix socket for Consul's
// gRPC xDS API, and support for generating local service identity tokens.
func connectSidecarVersionConstraint() *structs.Constraint {
return &structs.Constraint{
LTarget: "${attr.consul.version}",
RTarget: ">= 1.8.0",
Operand: structs.ConstraintSemver,
func connectSidecarVersionConstraint(cluster string) *structs.Constraint {
if cluster != structs.ConsulDefaultCluster && cluster != "" {
return &structs.Constraint{
LTarget: fmt.Sprintf("${attr.consul.%s.version}", cluster),
RTarget: ">= 1.8.0",
Operand: structs.ConstraintSemver,
}
}
return consulServiceDiscoveryConstraint
}

// connectGatewayVersionConstraint is used when building a connect gateway
// task to ensure proper Consul version is used that supports Connect Gateway
// features. This includes making use of Consul Configuration Entries of type
// {ingress,terminating,mesh}-gateway.
func connectGatewayVersionConstraint() *structs.Constraint {
return &structs.Constraint{
LTarget: "${attr.consul.version}",
RTarget: ">= 1.8.0",
Operand: structs.ConstraintSemver,
func connectGatewayVersionConstraint(cluster string) *structs.Constraint {
if cluster != structs.ConsulDefaultCluster && cluster != "" {
return &structs.Constraint{
LTarget: fmt.Sprintf("${attr.consul.%s.version}", cluster),
RTarget: ">= 1.8.0",
Operand: structs.ConstraintSemver,
}
}
return consulServiceDiscoveryConstraint
}

// connectGatewayTLSVersionConstraint is used when building a connect gateway
// task to ensure proper Consul version is used that supports customized TLS version.
// https://github.com/hashicorp/consul/pull/11576
func connectGatewayTLSVersionConstraint() *structs.Constraint {
func connectGatewayTLSVersionConstraint(cluster string) *structs.Constraint {
attr := "${attr.consul.version}"
if cluster != structs.ConsulDefaultCluster {
attr = fmt.Sprintf("${attr.consul.%s.version}", cluster)
}

return &structs.Constraint{
LTarget: "${attr.consul.version}",
LTarget: attr,
RTarget: ">= 1.11.2",
Operand: structs.ConstraintSemver,
}
}

func connectListenerConstraint() *structs.Constraint {
func connectListenerConstraint(cluster string) *structs.Constraint {
attr := "${attr.consul.grpc}"
if cluster != structs.ConsulDefaultCluster {
attr = fmt.Sprintf("${attr.consul.%s.grpc}", cluster)
}

return &structs.Constraint{
LTarget: "${attr.consul.grpc}",
LTarget: attr,
RTarget: "0",
Operand: ">",
}
Expand Down Expand Up @@ -277,7 +293,8 @@ func groupConnectHook(job *structs.Job, g *structs.TaskGroup) error {
// If the task doesn't already exist, create a new one and add it to the job
if task == nil {
driver := groupConnectGuessTaskDriver(g)
task = newConnectSidecarTask(service.Name, driver)
cluster := service.GetConsulClusterName(g)
task = newConnectSidecarTask(service.Name, driver, cluster)

// If there happens to be a task defined with the same name
// append an UUID fragment to the task name
Expand Down Expand Up @@ -357,7 +374,8 @@ func groupConnectHook(job *structs.Job, g *structs.TaskGroup) error {
netHost := netMode == "host"
customizedTLS := service.Connect.IsCustomizedTLS()

task := newConnectGatewayTask(prefix, service.Name, netHost, customizedTLS)
task := newConnectGatewayTask(prefix, service.Name,
service.GetConsulClusterName(g), netHost, customizedTLS)
g.Tasks = append(g.Tasks, task)

// the connect.sidecar_task block can also be used to configure
Expand Down Expand Up @@ -476,13 +494,13 @@ func gatewayBindAddressesIngressForBridge(ingress *structs.ConsulIngressConfigEn
return addresses
}

func newConnectGatewayTask(prefix, service string, netHost, customizedTls bool) *structs.Task {
func newConnectGatewayTask(prefix, service, cluster string, netHost, customizedTls bool) *structs.Task {
constraints := structs.Constraints{
connectGatewayVersionConstraint(),
connectListenerConstraint(),
connectGatewayVersionConstraint(cluster),
connectListenerConstraint(cluster),
}
if customizedTls {
constraints = append(constraints, connectGatewayTLSVersionConstraint())
constraints = append(constraints, connectGatewayTLSVersionConstraint(cluster))
}
return &structs.Task{
// Name is used in container name so must start with '[A-Za-z0-9]'
Expand All @@ -500,7 +518,11 @@ func newConnectGatewayTask(prefix, service string, netHost, customizedTls bool)
}
}

func newConnectSidecarTask(service, driver string) *structs.Task {
func newConnectSidecarTask(service, driver, cluster string) *structs.Task {

versionConstraint := connectSidecarVersionConstraint(cluster)
listenerConstraint := connectListenerConstraint(cluster)

return &structs.Task{
// Name is used in container name so must start with '[A-Za-z0-9]'
Name: fmt.Sprintf("%s-%s", structs.ConnectProxyPrefix, service),
Expand All @@ -517,10 +539,7 @@ func newConnectSidecarTask(service, driver string) *structs.Task {
Hook: structs.TaskLifecycleHookPrestart,
Sidecar: true,
},
Constraints: structs.Constraints{
connectSidecarVersionConstraint(),
connectListenerConstraint(),
},
Constraints: structs.Constraints{versionConstraint, listenerConstraint},
}
}

Expand Down
63 changes: 42 additions & 21 deletions nomad/job_endpoint_hook_connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ func TestJobEndpointConnect_groupConnectGuessTaskDriver(t *testing.T) {
func TestJobEndpointConnect_newConnectSidecarTask(t *testing.T) {
ci.Parallel(t)

task := newConnectSidecarTask("redis", "podman")
task := newConnectSidecarTask("redis", "podman", structs.ConsulDefaultCluster)
must.Eq(t, "connect-proxy-redis", task.Name)
must.Eq(t, "podman", task.Driver)

task2 := newConnectSidecarTask("db", "docker")
task2 := newConnectSidecarTask("db", "docker", structs.ConsulDefaultCluster)
must.Eq(t, "connect-proxy-db", task2.Name)
must.Eq(t, "docker", task2.Driver)
}
Expand Down Expand Up @@ -154,8 +154,8 @@ func TestJobEndpointConnect_groupConnectHook(t *testing.T) {
// Expected tasks
tgExp := job.TaskGroups[0].Copy()
tgExp.Tasks = []*structs.Task{
newConnectSidecarTask("backend", "docker"),
newConnectSidecarTask("admin", "docker"),
newConnectSidecarTask("backend", "docker", structs.ConsulDefaultCluster),
newConnectSidecarTask("admin", "docker", structs.ConsulDefaultCluster),
}
tgExp.Services[0].Name = "backend"
tgExp.Services[1].Name = "admin"
Expand Down Expand Up @@ -200,7 +200,8 @@ func TestJobEndpointConnect_groupConnectHook_IngressGateway_BridgeNetwork(t *tes
expTG := job.TaskGroups[0].Copy()
expTG.Tasks = []*structs.Task{
// inject the gateway task
newConnectGatewayTask(structs.ConnectIngressPrefix, "my-gateway", false, true),
newConnectGatewayTask(structs.ConnectIngressPrefix, "my-gateway",
structs.ConsulDefaultCluster, false, true),
}
expTG.Services[0].Name = "my-gateway"
expTG.Tasks[0].Canonicalize(job, expTG)
Expand Down Expand Up @@ -239,7 +240,8 @@ func TestJobEndpointConnect_groupConnectHook_IngressGateway_HostNetwork(t *testi
expTG := job.TaskGroups[0].Copy()
expTG.Tasks = []*structs.Task{
// inject the gateway task
newConnectGatewayTask(structs.ConnectIngressPrefix, "my-gateway", true, false),
newConnectGatewayTask(structs.ConnectIngressPrefix, "my-gateway",
structs.ConsulDefaultCluster, true, false),
}
expTG.Services[0].Name = "my-gateway"
expTG.Tasks[0].Canonicalize(job, expTG)
Expand Down Expand Up @@ -305,8 +307,8 @@ func TestJobEndpointConnect_groupConnectHook_IngressGateway_CustomTask(t *testin
ShutdownDelay: 5 * time.Second,
KillSignal: "SIGHUP",
Constraints: structs.Constraints{
connectGatewayVersionConstraint(),
connectListenerConstraint(),
connectGatewayVersionConstraint(structs.ConsulDefaultCluster),
connectListenerConstraint(structs.ConsulDefaultCluster),
},
},
}
Expand Down Expand Up @@ -341,7 +343,8 @@ func TestJobEndpointConnect_groupConnectHook_TerminatingGateway(t *testing.T) {
expTG := job.TaskGroups[0].Copy()
expTG.Tasks = []*structs.Task{
// inject the gateway task
newConnectGatewayTask(structs.ConnectTerminatingPrefix, "my-gateway", false, false),
newConnectGatewayTask(structs.ConnectTerminatingPrefix, "my-gateway",
structs.ConsulDefaultCluster, false, false),
}
expTG.Services[0].Name = "my-gateway"
expTG.Tasks[0].Canonicalize(job, expTG)
Expand Down Expand Up @@ -375,7 +378,8 @@ func TestJobEndpointConnect_groupConnectHook_MeshGateway(t *testing.T) {
expTG := job.TaskGroups[0].Copy()
expTG.Tasks = []*structs.Task{
// inject the gateway task
newConnectGatewayTask(structs.ConnectMeshPrefix, "my-gateway", false, false),
newConnectGatewayTask(structs.ConnectMeshPrefix, "my-gateway",
structs.ConsulDefaultCluster, false, false),
}
expTG.Services[0].Name = "my-gateway"
expTG.Services[0].PortLabel = "public_port"
Expand Down Expand Up @@ -694,28 +698,45 @@ func TestJobEndpointConnect_newConnectGatewayTask_host(t *testing.T) {
ci.Parallel(t)

t.Run("ingress", func(t *testing.T) {
task := newConnectGatewayTask(structs.ConnectIngressPrefix, "foo", true, false)
require.Equal(t, "connect-ingress-foo", task.Name)
require.Equal(t, "connect-ingress:foo", string(task.Kind))
require.Equal(t, ">= 1.8.0", task.Constraints[0].RTarget)
require.Equal(t, "host", task.Config["network_mode"])
task := newConnectGatewayTask(structs.ConnectIngressPrefix, "foo",
structs.ConsulDefaultCluster, true, false)
must.Eq(t, "connect-ingress-foo", task.Name)
must.Eq(t, "connect-ingress:foo", string(task.Kind))
must.Eq(t, "${attr.consul.version}", task.Constraints[0].LTarget)
must.Eq(t, ">= 1.8.0", task.Constraints[0].RTarget)
must.Eq(t, "host", task.Config["network_mode"])
require.Nil(t, task.Lifecycle)
})

t.Run("terminating", func(t *testing.T) {
task := newConnectGatewayTask(structs.ConnectTerminatingPrefix, "bar", true, false)
require.Equal(t, "connect-terminating-bar", task.Name)
require.Equal(t, "connect-terminating:bar", string(task.Kind))
require.Equal(t, ">= 1.8.0", task.Constraints[0].RTarget)
require.Equal(t, "host", task.Config["network_mode"])
task := newConnectGatewayTask(structs.ConnectTerminatingPrefix, "bar",
structs.ConsulDefaultCluster, true, false)
must.Eq(t, "connect-terminating-bar", task.Name)
must.Eq(t, "connect-terminating:bar", string(task.Kind))
must.Eq(t, "${attr.consul.version}", task.Constraints[0].LTarget)
must.Eq(t, ">= 1.8.0", task.Constraints[0].RTarget)
must.Eq(t, "host", task.Config["network_mode"])
require.Nil(t, task.Lifecycle)
})

// this case can only happen on ENT but gets run in CE code
t.Run("terminating nondefault (ENT)", func(t *testing.T) {
task := newConnectGatewayTask(structs.ConnectTerminatingPrefix, "bar",
"nondefault", true, false)
must.Eq(t, "connect-terminating-bar", task.Name)
must.Eq(t, "connect-terminating:bar", string(task.Kind))
must.Eq(t, "${attr.consul.nondefault.version}", task.Constraints[0].LTarget)
must.Eq(t, ">= 1.8.0", task.Constraints[0].RTarget)
must.Eq(t, "host", task.Config["network_mode"])
require.Nil(t, task.Lifecycle)
})
}

func TestJobEndpointConnect_newConnectGatewayTask_bridge(t *testing.T) {
ci.Parallel(t)

task := newConnectGatewayTask(structs.ConnectIngressPrefix, "service1", false, false)
task := newConnectGatewayTask(structs.ConnectIngressPrefix, "service1",
structs.ConsulDefaultCluster, false, false)
require.NotContains(t, task.Config, "network_mode")
}

Expand Down

0 comments on commit 1f8cd3f

Please sign in to comment.