diff --git a/acceptance/framework/k8s/deploy.go b/acceptance/framework/k8s/deploy.go index 2cf5e8876a..09272d5382 100644 --- a/acceptance/framework/k8s/deploy.go +++ b/acceptance/framework/k8s/deploy.go @@ -16,8 +16,6 @@ import ( "k8s.io/apimachinery/pkg/util/yaml" ) -const staticClientName = "static-client" - // Deploy creates a Kubernetes deployment by applying configuration stored at filepath, // sets up a cleanup function and waits for the deployment to become available. func Deploy(t *testing.T, options *k8s.KubectlOptions, noCleanupOnFailure bool, debugDirectory string, filepath string) { @@ -71,38 +69,43 @@ func DeployKustomize(t *testing.T, options *k8s.KubectlOptions, noCleanupOnFailu RunKubectl(t, options, "wait", "--for=condition=available", "--timeout=5m", fmt.Sprintf("deploy/%s", deployment.Name)) } -// CheckStaticServerConnection execs into a pod of the deployment given by deploymentName +// CheckStaticServerConnection execs into a pod of sourceApp // and runs a curl command with the provided curlArgs. // This function assumes that the connection is made to the static-server and expects the output -// to be "hello world" in a case of success. +// to be "hello world" by default, or expectedSuccessOutput in a case of success. // If expectSuccess is true, it will expect connection to succeed, // otherwise it will expect failure due to intentions. -func CheckStaticServerConnection(t *testing.T, options *k8s.KubectlOptions, expectSuccess bool, failureMessages []string, curlArgs ...string) { +func CheckStaticServerConnection(t *testing.T, options *k8s.KubectlOptions, sourceApp string, expectSuccess bool, failureMessages []string, expectedSuccessOutput string, curlArgs ...string) { t.Helper() - CheckStaticServerConnectionMultipleFailureMessages(t, options, expectSuccess, failureMessages, curlArgs...) + CheckStaticServerConnectionMultipleFailureMessages(t, options, sourceApp, expectSuccess, failureMessages, expectedSuccessOutput, curlArgs...) } -// CheckStaticServerConnectionMultipleFailureMessages execs into a pod of the deployment given by deploymentName +// CheckStaticServerConnectionMultipleFailureMessages execs into a pod of sourceApp // and runs a curl command with the provided curlArgs. // This function assumes that the connection is made to the static-server and expects the output -// to be "hello world" in a case of success. +// to be "hello world" by default, or expectedSuccessOutput in a case of success. // If expectSuccess is true, it will expect connection to succeed, // otherwise it will expect failure due to intentions. If multiple failureMessages are provided it will assert // on the existence of any of them. -func CheckStaticServerConnectionMultipleFailureMessages(t *testing.T, options *k8s.KubectlOptions, expectSuccess bool, failureMessages []string, curlArgs ...string) { +func CheckStaticServerConnectionMultipleFailureMessages(t *testing.T, options *k8s.KubectlOptions, sourceApp string, expectSuccess bool, failureMessages []string, expectedSuccessOutput string, curlArgs ...string) { t.Helper() + expectedOutput := "hello world" + if expectedSuccessOutput != "" { + expectedOutput = expectedSuccessOutput + } + retrier := &retry.Timer{Timeout: 80 * time.Second, Wait: 2 * time.Second} - args := []string{"exec", "deploy/" + staticClientName, "-c", staticClientName, "--", "curl", "-vvvsSf"} + args := []string{"exec", "deploy/" + sourceApp, "-c", sourceApp, "--", "curl", "-vvvsSf"} args = append(args, curlArgs...) 
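	// Retry the exec'd curl (up to the 80s timeout above, every 2s) until the expected output is returned or,
	// when failure is expected, until one of the provided failure messages shows up in the curl output.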
retry.RunWith(retrier, t, func(r *retry.R) { output, err := RunKubectlAndGetOutputE(t, options, args...) if expectSuccess { require.NoError(r, err) - require.Contains(r, output, "hello world") + require.Contains(r, output, expectedOutput) } else { require.Error(r, err) require.Condition(r, func() bool { @@ -118,24 +121,33 @@ func CheckStaticServerConnectionMultipleFailureMessages(t *testing.T, options *k }) } +// CheckStaticServerConnectionSuccessfulWithMessage is just like CheckStaticServerConnectionSuccessful +// but it asserts on a non-default expected message. +func CheckStaticServerConnectionSuccessfulWithMessage(t *testing.T, options *k8s.KubectlOptions, sourceApp string, message string, curlArgs ...string) { + t.Helper() + start := time.Now() + CheckStaticServerConnectionMultipleFailureMessages(t, options, sourceApp, true, nil, message, curlArgs...) + logger.Logf(t, "Took %s to check if static server connection was successful", time.Since(start)) +} + // CheckStaticServerConnectionSuccessful is just like CheckStaticServerConnection // but it always expects a successful connection. -func CheckStaticServerConnectionSuccessful(t *testing.T, options *k8s.KubectlOptions, curlArgs ...string) { +func CheckStaticServerConnectionSuccessful(t *testing.T, options *k8s.KubectlOptions, sourceApp string, curlArgs ...string) { t.Helper() start := time.Now() - CheckStaticServerConnection(t, options, true, nil, curlArgs...) + CheckStaticServerConnection(t, options, sourceApp, true, nil, "", curlArgs...) logger.Logf(t, "Took %s to check if static server connection was successful", time.Since(start)) } // CheckStaticServerConnectionFailing is just like CheckStaticServerConnection // but it always expects a failing connection with various errors. -func CheckStaticServerConnectionFailing(t *testing.T, options *k8s.KubectlOptions, curlArgs ...string) { +func CheckStaticServerConnectionFailing(t *testing.T, options *k8s.KubectlOptions, sourceApp string, curlArgs ...string) { t.Helper() - CheckStaticServerConnection(t, options, false, []string{ + CheckStaticServerConnection(t, options, sourceApp, false, []string{ "curl: (52) Empty reply from server", "curl: (7) Failed to connect", "curl: (56) Recv failure: Connection reset by peer", - }, curlArgs...) + }, "", curlArgs...) 
} // labelMapToString takes a label map[string]string diff --git a/acceptance/tests/connect/connect_helper.go b/acceptance/tests/connect/connect_helper.go index f59864f900..cf7853c1b3 100644 --- a/acceptance/tests/connect/connect_helper.go +++ b/acceptance/tests/connect/connect_helper.go @@ -81,9 +81,9 @@ func ConnectInjectConnectivityCheck(t *testing.T, ctx environment.TestContext, c if secure { logger.Log(t, "checking that the connection is not successful because there's no intention") if cfg.EnableTransparentProxy { - k8s.CheckStaticServerConnectionFailing(t, ctx.KubectlOptions(t), "http://static-server") + k8s.CheckStaticServerConnectionFailing(t, ctx.KubectlOptions(t), staticClientName, "http://static-server") } else { - k8s.CheckStaticServerConnectionFailing(t, ctx.KubectlOptions(t), "http://localhost:1234") + k8s.CheckStaticServerConnectionFailing(t, ctx.KubectlOptions(t), staticClientName, "http://localhost:1234") } logger.Log(t, "creating intention") @@ -103,9 +103,9 @@ func ConnectInjectConnectivityCheck(t *testing.T, ctx environment.TestContext, c logger.Log(t, "checking that connection is successful") if cfg.EnableTransparentProxy { // todo: add an assertion that the traffic is going through the proxy - k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), "http://static-server") + k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), staticClientName, "http://static-server") } else { - k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), "http://localhost:1234") + k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), staticClientName, "http://localhost:1234") } // Test that kubernetes readiness status is synced to Consul. @@ -120,8 +120,8 @@ func ConnectInjectConnectivityCheck(t *testing.T, ctx environment.TestContext, c // from server, which is the case when a connection is unsuccessful due to intentions in other tests. 
logger.Log(t, "checking that connection is unsuccessful") if cfg.EnableTransparentProxy { - k8s.CheckStaticServerConnectionMultipleFailureMessages(t, ctx.KubectlOptions(t), false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server", "curl: (7) Failed to connect to static-server port 80: Connection refused"}, "http://static-server") + k8s.CheckStaticServerConnectionMultipleFailureMessages(t, ctx.KubectlOptions(t), staticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server", "curl: (7) Failed to connect to static-server port 80: Connection refused"}, "", "http://static-server") } else { - k8s.CheckStaticServerConnectionMultipleFailureMessages(t, ctx.KubectlOptions(t), false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "http://localhost:1234") + k8s.CheckStaticServerConnectionMultipleFailureMessages(t, ctx.KubectlOptions(t), staticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "", "http://localhost:1234") } } diff --git a/acceptance/tests/connect/connect_inject_namespaces_test.go b/acceptance/tests/connect/connect_inject_namespaces_test.go index c5dfa5604a..815b28dea4 100644 --- a/acceptance/tests/connect/connect_inject_namespaces_test.go +++ b/acceptance/tests/connect/connect_inject_namespaces_test.go @@ -177,9 +177,9 @@ func TestConnectInjectNamespaces(t *testing.T) { if c.secure { logger.Log(t, "checking that the connection is not successful because there's no intention") if cfg.EnableTransparentProxy { - k8s.CheckStaticServerConnectionFailing(t, staticClientOpts, fmt.Sprintf("http://static-server.%s", staticServerNamespace)) + k8s.CheckStaticServerConnectionFailing(t, staticClientOpts, staticClientName, fmt.Sprintf("http://static-server.%s", staticServerNamespace)) } else { - k8s.CheckStaticServerConnectionFailing(t, staticClientOpts, "http://localhost:1234") + k8s.CheckStaticServerConnectionFailing(t, staticClientOpts, staticClientName, "http://localhost:1234") } intention := &api.ServiceIntentionsConfigEntry{ @@ -209,9 +209,9 @@ func TestConnectInjectNamespaces(t *testing.T) { logger.Log(t, "checking that connection is successful") if cfg.EnableTransparentProxy { - k8s.CheckStaticServerConnectionSuccessful(t, staticClientOpts, fmt.Sprintf("http://static-server.%s", staticServerNamespace)) + k8s.CheckStaticServerConnectionSuccessful(t, staticClientOpts, staticClientName, fmt.Sprintf("http://static-server.%s", staticServerNamespace)) } else { - k8s.CheckStaticServerConnectionSuccessful(t, staticClientOpts, "http://localhost:1234") + k8s.CheckStaticServerConnectionSuccessful(t, staticClientOpts, staticClientName, "http://localhost:1234") } // Test that kubernetes readiness status is synced to Consul. @@ -226,9 +226,9 @@ func TestConnectInjectNamespaces(t *testing.T) { // from server, which is the case when a connection is unsuccessful due to intentions in other tests. 
logger.Log(t, "checking that connection is unsuccessful") if cfg.EnableTransparentProxy { - k8s.CheckStaticServerConnectionMultipleFailureMessages(t, staticClientOpts, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server", "curl: (7) Failed to connect to static-server.ns1 port 80: Connection refused"}, fmt.Sprintf("http://static-server.%s", staticServerNamespace)) + k8s.CheckStaticServerConnectionMultipleFailureMessages(t, staticClientOpts, staticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server", "curl: (7) Failed to connect to static-server.ns1 port 80: Connection refused"}, "", fmt.Sprintf("http://static-server.%s", staticServerNamespace)) } else { - k8s.CheckStaticServerConnectionMultipleFailureMessages(t, staticClientOpts, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "http://localhost:1234") + k8s.CheckStaticServerConnectionMultipleFailureMessages(t, staticClientOpts, staticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "", "http://localhost:1234") } }) } diff --git a/acceptance/tests/connect/connect_inject_test.go b/acceptance/tests/connect/connect_inject_test.go index ef3a95a6be..bf4d8a7047 100644 --- a/acceptance/tests/connect/connect_inject_test.go +++ b/acceptance/tests/connect/connect_inject_test.go @@ -6,11 +6,13 @@ import ( "strconv" "strings" "testing" + "time" "github.com/hashicorp/consul-k8s/acceptance/framework/consul" "github.com/hashicorp/consul-k8s/acceptance/framework/helpers" "github.com/hashicorp/consul-k8s/acceptance/framework/k8s" "github.com/hashicorp/consul-k8s/acceptance/framework/logger" + "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -137,9 +139,9 @@ func TestConnectInject_RestartConsulClients(t *testing.T) { logger.Log(t, "checking that connection is successful") if cfg.EnableTransparentProxy { - k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), "http://static-server") + k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), staticClientName, "http://static-server") } else { - k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), "http://localhost:1234") + k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), staticClientName, "http://localhost:1234") } logger.Log(t, "restarting Consul client daemonset") @@ -148,8 +150,176 @@ func TestConnectInject_RestartConsulClients(t *testing.T) { logger.Log(t, "checking that connection is still successful") if cfg.EnableTransparentProxy { - k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), "http://static-server") + k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), staticClientName, "http://static-server") } else { - k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), "http://localhost:1234") + k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), staticClientName, "http://localhost:1234") + } +} + +const multiport = "multiport" +const multiportAdmin = "multiport-admin" + +// Test that Connect works for an application with multiple ports. The multiport application is a Pod listening on +// two ports. 
This tests inbound connections to each port of the multiport app, and outbound connections from the +// multiport app to static-server. +func TestConnectInject_MultiportServices(t *testing.T) { + cases := []struct { + secure bool + autoEncrypt bool + }{ + {false, false}, + {true, false}, + {true, true}, + } + + for _, c := range cases { + name := fmt.Sprintf("secure: %t; auto-encrypt: %t", c.secure, c.autoEncrypt) + t.Run(name, func(t *testing.T) { + cfg := suite.Config() + ctx := suite.Environment().DefaultContext(t) + + // Multi port apps don't work with transparent proxy. + if cfg.EnableTransparentProxy { + t.Skipf("skipping this test because transparent proxy is enabled") + } + + helmValues := map[string]string{ + "connectInject.enabled": "true", + + "global.tls.enabled": strconv.FormatBool(c.secure), + "global.tls.enableAutoEncrypt": strconv.FormatBool(c.autoEncrypt), + "global.acls.manageSystemACLs": strconv.FormatBool(c.secure), + } + + releaseName := helpers.RandomName() + consulCluster := consul.NewHelmCluster(t, helmValues, ctx, cfg, releaseName) + + consulCluster.Create(t) + + consulClient := consulCluster.SetupConsulClient(t, c.secure) + + // Check that the ACL token is deleted. + if c.secure { + // We need to register the cleanup function before we create the deployments + // because golang will execute them in reverse order i.e. the last registered + // cleanup function will be executed first. + t.Cleanup(func() { + retrier := &retry.Timer{Timeout: 5 * time.Minute, Wait: 1 * time.Second} + retry.RunWith(retrier, t, func(r *retry.R) { + tokens, _, err := consulClient.ACL().TokenList(nil) + require.NoError(r, err) + for _, token := range tokens { + require.NotContains(r, token.Description, multiport) + require.NotContains(r, token.Description, multiportAdmin) + require.NotContains(r, token.Description, staticClientName) + require.NotContains(r, token.Description, staticServerName) + } + }) + }) + } + + logger.Log(t, "creating multiport static-server and static-client deployments") + k8s.DeployKustomize(t, ctx.KubectlOptions(t), cfg.NoCleanupOnFailure, cfg.DebugDirectory, "../fixtures/bases/multiport-app") + k8s.DeployKustomize(t, ctx.KubectlOptions(t), cfg.NoCleanupOnFailure, cfg.DebugDirectory, "../fixtures/cases/static-client-inject-multiport") + + // Check that static-client has been injected and now has 2 containers. + podList, err := ctx.KubernetesClient(t).CoreV1().Pods(ctx.KubectlOptions(t).Namespace).List(context.Background(), metav1.ListOptions{ + LabelSelector: "app=static-client", + }) + require.NoError(t, err) + require.Len(t, podList.Items, 1) + require.Len(t, podList.Items[0].Spec.Containers, 2) + + // Check that multiport has been injected and now has 4 containers. 
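+			// (the two application containers plus the containers added by the connect injector)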
+ podList, err = ctx.KubernetesClient(t).CoreV1().Pods(ctx.KubectlOptions(t).Namespace).List(context.Background(), metav1.ListOptions{ + LabelSelector: "app=multiport", + }) + require.NoError(t, err) + require.Len(t, podList.Items, 1) + require.Len(t, podList.Items[0].Spec.Containers, 4) + + if c.secure { + logger.Log(t, "checking that the connection is not successful because there's no intention") + k8s.CheckStaticServerConnectionFailing(t, ctx.KubectlOptions(t), staticClientName, "http://localhost:1234") + k8s.CheckStaticServerConnectionFailing(t, ctx.KubectlOptions(t), staticClientName, "http://localhost:2234") + + logger.Log(t, fmt.Sprintf("creating intention for %s", multiport)) + _, _, err := consulClient.ConfigEntries().Set(&api.ServiceIntentionsConfigEntry{ + Kind: api.ServiceIntentions, + Name: multiport, + Sources: []*api.SourceIntention{ + { + Name: staticClientName, + Action: api.IntentionActionAllow, + }, + }, + }, nil) + require.NoError(t, err) + logger.Log(t, fmt.Sprintf("creating intention for %s", multiportAdmin)) + _, _, err = consulClient.ConfigEntries().Set(&api.ServiceIntentionsConfigEntry{ + Kind: api.ServiceIntentions, + Name: multiportAdmin, + Sources: []*api.SourceIntention{ + { + Name: staticClientName, + Action: api.IntentionActionAllow, + }, + }, + }, nil) + require.NoError(t, err) + } + + // Check connection from static-client to multiport. + k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), staticClientName, "http://localhost:1234") + + // Check connection from static-client to multiport-admin. + k8s.CheckStaticServerConnectionSuccessfulWithMessage(t, ctx.KubectlOptions(t), staticClientName, "hello world from 9090 admin", "http://localhost:2234") + + // Now that we've checked inbound connections to a multi port pod, check outbound connection from multi port + // pod to static-server. + + // Deploy static-server. + k8s.DeployKustomize(t, ctx.KubectlOptions(t), cfg.NoCleanupOnFailure, cfg.DebugDirectory, "../fixtures/cases/static-server-inject") + + // For outbound connections from the multi port pod, only intentions from the first service in the multiport + // pod need to be created, since all upstream connections are made through the first service's envoy proxy. + if c.secure { + logger.Log(t, "checking that the connection is not successful because there's no intention") + + k8s.CheckStaticServerConnectionFailing(t, ctx.KubectlOptions(t), multiport, "http://localhost:3234") + + logger.Log(t, fmt.Sprintf("creating intention for %s", staticServerName)) + _, _, err := consulClient.ConfigEntries().Set(&api.ServiceIntentionsConfigEntry{ + Kind: api.ServiceIntentions, + Name: staticServerName, + Sources: []*api.SourceIntention{ + { + Name: multiport, + Action: api.IntentionActionAllow, + }, + }, + }, nil) + require.NoError(t, err) + } + + // Check the connection from the multi port pod to static-server. + k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), multiport, "http://localhost:3234") + + // Test that kubernetes readiness status is synced to Consul. This will make the multi port pods unhealthy + // and check inbound connections to the multi port pods' services. + // Create the files so that the readiness probes of the multi port pod fails. 
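+			// (the fixture's readiness probes are exec checks of the form `test ! -f /tmp/unhealthy-multiport`,
+			// so creating these files marks the corresponding containers as not ready)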
+ logger.Log(t, "testing k8s -> consul health checks sync by making the multiport unhealthy") + k8s.RunKubectl(t, ctx.KubectlOptions(t), "exec", "deploy/"+multiport, "--", "touch", "/tmp/unhealthy-multiport") + logger.Log(t, "testing k8s -> consul health checks sync by making the multiport-admin unhealthy") + k8s.RunKubectl(t, ctx.KubectlOptions(t), "exec", "deploy/"+multiport, "--", "touch", "/tmp/unhealthy-multiport-admin") + + // The readiness probe should take a moment to be reflected in Consul, CheckStaticServerConnection will retry + // until Consul marks the service instance unavailable for mesh traffic, causing the connection to fail. + // We are expecting a "connection reset by peer" error because in a case of health checks, + // there will be no healthy proxy host to connect to. That's why we can't assert that we receive an empty reply + // from server, which is the case when a connection is unsuccessful due to intentions in other tests. + k8s.CheckStaticServerConnectionMultipleFailureMessages(t, ctx.KubectlOptions(t), staticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "", "http://localhost:1234") + k8s.CheckStaticServerConnectionMultipleFailureMessages(t, ctx.KubectlOptions(t), staticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "", "http://localhost:2234") + }) } } diff --git a/acceptance/tests/fixtures/bases/multiport-app/anyuid-scc-rolebinding.yaml b/acceptance/tests/fixtures/bases/multiport-app/anyuid-scc-rolebinding.yaml new file mode 100644 index 0000000000..f80bd41d81 --- /dev/null +++ b/acceptance/tests/fixtures/bases/multiport-app/anyuid-scc-rolebinding.yaml @@ -0,0 +1,23 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: multiport-openshift-anyuid +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: system:openshift:scc:anyuid +subjects: + - kind: ServiceAccount + name: multiport +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: multiport-admin-openshift-anyuid +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: system:openshift:scc:anyuid +subjects: + - kind: ServiceAccount + name: multiport-admin diff --git a/acceptance/tests/fixtures/bases/multiport-app/deployment.yaml b/acceptance/tests/fixtures/bases/multiport-app/deployment.yaml new file mode 100644 index 0000000000..a99d415d80 --- /dev/null +++ b/acceptance/tests/fixtures/bases/multiport-app/deployment.yaml @@ -0,0 +1,78 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: multiport +spec: + replicas: 1 + selector: + matchLabels: + app: multiport + template: + metadata: + name: multiport + labels: + app: multiport + annotations: + "consul.hashicorp.com/connect-inject": "true" + 'consul.hashicorp.com/connect-service': 'multiport,multiport-admin' + 'consul.hashicorp.com/connect-service-upstreams': 'static-server:3234' + 'consul.hashicorp.com/connect-service-port': '8080,9090' + 'consul.hashicorp.com/transparent-proxy': 'false' + 'consul.hashicorp.com/enable-metrics': 'false' + 'consul.hashicorp.com/enable-metrics-merging': 'false' + spec: + containers: + - name: multiport + image: docker.mirror.hashicorp.services/hashicorp/http-echo:alpine + args: + - -text="hello world" + - -listen=:8080 + ports: + - containerPort: 8080 + name: http + livenessProbe: + httpGet: + port: 8080 + initialDelaySeconds: 1 + failureThreshold: 1 + periodSeconds: 1 + 
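+        # The startup probe below tolerates up to 30 one-second failures, giving the echo server roughly 30s
+        # to come up before the stricter liveness and readiness probes take effect.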
startupProbe: + httpGet: + port: 8080 + initialDelaySeconds: 1 + failureThreshold: 30 + periodSeconds: 1 + readinessProbe: + exec: + command: ['sh', '-c', 'test ! -f /tmp/unhealthy-multiport'] + initialDelaySeconds: 1 + failureThreshold: 1 + periodSeconds: 1 + - name: multiport-admin + image: docker.mirror.hashicorp.services/hashicorp/http-echo:alpine + args: + - -text="hello world from 9090 admin" + - -listen=:9090 + ports: + - containerPort: 9090 + name: http + livenessProbe: + httpGet: + port: 9090 + initialDelaySeconds: 1 + failureThreshold: 1 + periodSeconds: 1 + startupProbe: + httpGet: + port: 9090 + initialDelaySeconds: 1 + failureThreshold: 30 + periodSeconds: 1 + readinessProbe: + exec: + command: ['sh', '-c', 'test ! -f /tmp/unhealthy-multiport-admin'] + initialDelaySeconds: 1 + failureThreshold: 1 + periodSeconds: 1 + serviceAccountName: multiport + terminationGracePeriodSeconds: 0 # so deletion is quick diff --git a/acceptance/tests/fixtures/bases/multiport-app/kustomization.yaml b/acceptance/tests/fixtures/bases/multiport-app/kustomization.yaml new file mode 100644 index 0000000000..b9d8e11f73 --- /dev/null +++ b/acceptance/tests/fixtures/bases/multiport-app/kustomization.yaml @@ -0,0 +1,7 @@ +resources: + - deployment.yaml + - service.yaml + - serviceaccount.yaml + - psp-rolebinding.yaml + - anyuid-scc-rolebinding.yaml + - privileged-scc-rolebinding.yaml \ No newline at end of file diff --git a/acceptance/tests/fixtures/bases/multiport-app/privileged-scc-rolebinding.yaml b/acceptance/tests/fixtures/bases/multiport-app/privileged-scc-rolebinding.yaml new file mode 100644 index 0000000000..f909785b36 --- /dev/null +++ b/acceptance/tests/fixtures/bases/multiport-app/privileged-scc-rolebinding.yaml @@ -0,0 +1,23 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: multiport-openshift-privileged +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: system:openshift:scc:privileged +subjects: + - kind: ServiceAccount + name: multiport +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: multiport-admin-openshift-privileged +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: system:openshift:scc:privileged +subjects: + - kind: ServiceAccount + name: multiport-admin diff --git a/acceptance/tests/fixtures/bases/multiport-app/psp-rolebinding.yaml b/acceptance/tests/fixtures/bases/multiport-app/psp-rolebinding.yaml new file mode 100644 index 0000000000..fce63f0076 --- /dev/null +++ b/acceptance/tests/fixtures/bases/multiport-app/psp-rolebinding.yaml @@ -0,0 +1,23 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: multiport +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: test-psp +subjects: + - kind: ServiceAccount + name: multiport +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: multiport-admin +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: test-psp +subjects: + - kind: ServiceAccount + name: multiport-admin diff --git a/acceptance/tests/fixtures/bases/multiport-app/service.yaml b/acceptance/tests/fixtures/bases/multiport-app/service.yaml new file mode 100644 index 0000000000..d18da258a3 --- /dev/null +++ b/acceptance/tests/fixtures/bases/multiport-app/service.yaml @@ -0,0 +1,23 @@ +apiVersion: v1 +kind: Service +metadata: + name: multiport +spec: + selector: + app: multiport + ports: + - name: http + port: 80 + targetPort: 8080 +--- +apiVersion: v1 +kind: 
Service +metadata: + name: multiport-admin +spec: + selector: + app: multiport + ports: + - protocol: TCP + port: 80 + targetPort: 9090 diff --git a/acceptance/tests/fixtures/bases/multiport-app/serviceaccount.yaml b/acceptance/tests/fixtures/bases/multiport-app/serviceaccount.yaml new file mode 100644 index 0000000000..2293c2e173 --- /dev/null +++ b/acceptance/tests/fixtures/bases/multiport-app/serviceaccount.yaml @@ -0,0 +1,9 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: multiport +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: multiport-admin diff --git a/acceptance/tests/fixtures/cases/static-client-inject-multiport/kustomization.yaml b/acceptance/tests/fixtures/cases/static-client-inject-multiport/kustomization.yaml new file mode 100644 index 0000000000..9834f91903 --- /dev/null +++ b/acceptance/tests/fixtures/cases/static-client-inject-multiport/kustomization.yaml @@ -0,0 +1,5 @@ +resources: + - ../../bases/static-client + +patchesStrategicMerge: + - patch.yaml \ No newline at end of file diff --git a/acceptance/tests/fixtures/cases/static-client-inject-multiport/patch.yaml b/acceptance/tests/fixtures/cases/static-client-inject-multiport/patch.yaml new file mode 100644 index 0000000000..c38ce8e448 --- /dev/null +++ b/acceptance/tests/fixtures/cases/static-client-inject-multiport/patch.yaml @@ -0,0 +1,10 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: static-client +spec: + template: + metadata: + annotations: + "consul.hashicorp.com/connect-inject": "true" + "consul.hashicorp.com/connect-service-upstreams": "multiport:1234, multiport-admin:2234" \ No newline at end of file diff --git a/acceptance/tests/ingress-gateway/ingress_gateway_namespaces_test.go b/acceptance/tests/ingress-gateway/ingress_gateway_namespaces_test.go index f7ae5d13bc..f47f98d70f 100644 --- a/acceptance/tests/ingress-gateway/ingress_gateway_namespaces_test.go +++ b/acceptance/tests/ingress-gateway/ingress_gateway_namespaces_test.go @@ -128,7 +128,7 @@ func TestIngressGatewaySingleNamespace(t *testing.T) { // via the bounce pod. It should fail to connect with the // static-server pod because of intentions. logger.Log(t, "testing intentions prevent ingress") - k8s.CheckStaticServerConnectionFailing(t, nsK8SOptions, "-H", "Host: static-server.ingress.consul", ingressGatewayService) + k8s.CheckStaticServerConnectionFailing(t, nsK8SOptions, staticClientName, "-H", "Host: static-server.ingress.consul", ingressGatewayService) // Now we create the allow intention. logger.Log(t, "creating ingress-gateway => static-server intention") @@ -150,7 +150,7 @@ func TestIngressGatewaySingleNamespace(t *testing.T) { // Test that we can make a call to the ingress gateway // via the static-client pod. It should route to the static-server pod. logger.Log(t, "trying calls to ingress gateway") - k8s.CheckStaticServerConnectionSuccessful(t, nsK8SOptions, "-H", "Host: static-server.ingress.consul", ingressGatewayService) + k8s.CheckStaticServerConnectionSuccessful(t, nsK8SOptions, staticClientName, "-H", "Host: static-server.ingress.consul", ingressGatewayService) }) } } @@ -253,7 +253,7 @@ func TestIngressGatewayNamespaceMirroring(t *testing.T) { // via the bounce pod. It should fail to connect with the // static-server pod because of intentions. 
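	// (no ingress-gateway => static-server intention exists yet; it is created just below)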
logger.Log(t, "testing intentions prevent ingress") - k8s.CheckStaticServerConnectionFailing(t, nsK8SOptions, "-H", "Host: static-server.ingress.consul", ingressGatewayService) + k8s.CheckStaticServerConnectionFailing(t, nsK8SOptions, staticClientName, "-H", "Host: static-server.ingress.consul", ingressGatewayService) // Now we create the allow intention. logger.Log(t, "creating ingress-gateway => static-server intention") @@ -275,7 +275,7 @@ func TestIngressGatewayNamespaceMirroring(t *testing.T) { // Test that we can make a call to the ingress gateway // via the static-client pod. It should route to the static-server pod. logger.Log(t, "trying calls to ingress gateway") - k8s.CheckStaticServerConnectionSuccessful(t, nsK8SOptions, "-H", "Host: static-server.ingress.consul", ingressGatewayService) + k8s.CheckStaticServerConnectionSuccessful(t, nsK8SOptions, staticClientName, "-H", "Host: static-server.ingress.consul", ingressGatewayService) }) } } diff --git a/acceptance/tests/ingress-gateway/ingress_gateway_test.go b/acceptance/tests/ingress-gateway/ingress_gateway_test.go index b727d4332a..359b917a73 100644 --- a/acceptance/tests/ingress-gateway/ingress_gateway_test.go +++ b/acceptance/tests/ingress-gateway/ingress_gateway_test.go @@ -13,6 +13,8 @@ import ( "github.com/stretchr/testify/require" ) +const staticClientName = "static-client" + // Test that ingress gateways work in a default installation and a secure installation. func TestIngressGateway(t *testing.T) { cases := []struct { @@ -92,7 +94,7 @@ func TestIngressGateway(t *testing.T) { // via the bounce pod. It should fail to connect with the // static-server pod because of intentions. logger.Log(t, "testing intentions prevent ingress") - k8s.CheckStaticServerConnectionFailing(t, k8sOptions, "-H", "Host: static-server.ingress.consul", fmt.Sprintf("http://%s-consul-ingress-gateway:8080/", releaseName)) + k8s.CheckStaticServerConnectionFailing(t, k8sOptions, staticClientName, "-H", "Host: static-server.ingress.consul", fmt.Sprintf("http://%s-consul-ingress-gateway:8080/", releaseName)) // Now we create the allow intention. logger.Log(t, "creating ingress-gateway => static-server intention") @@ -112,7 +114,7 @@ func TestIngressGateway(t *testing.T) { // Test that we can make a call to the ingress gateway // via the static-client pod. It should route to the static-server pod. 
logger.Log(t, "trying calls to ingress gateway") - k8s.CheckStaticServerConnectionSuccessful(t, k8sOptions, "-H", "Host: static-server.ingress.consul", fmt.Sprintf("http://%s-consul-ingress-gateway:8080/", releaseName)) + k8s.CheckStaticServerConnectionSuccessful(t, k8sOptions, staticClientName, "-H", "Host: static-server.ingress.consul", fmt.Sprintf("http://%s-consul-ingress-gateway:8080/", releaseName)) }) } } diff --git a/acceptance/tests/mesh-gateway/mesh_gateway_test.go b/acceptance/tests/mesh-gateway/mesh_gateway_test.go index ed67252670..d6e4be90be 100644 --- a/acceptance/tests/mesh-gateway/mesh_gateway_test.go +++ b/acceptance/tests/mesh-gateway/mesh_gateway_test.go @@ -125,7 +125,7 @@ func TestMeshGatewayDefault(t *testing.T) { k8s.DeployKustomize(t, primaryContext.KubectlOptions(t), cfg.NoCleanupOnFailure, cfg.DebugDirectory, "../fixtures/cases/static-client-multi-dc") logger.Log(t, "checking that connection is successful") - k8s.CheckStaticServerConnectionSuccessful(t, primaryContext.KubectlOptions(t), "http://localhost:1234") + k8s.CheckStaticServerConnectionSuccessful(t, primaryContext.KubectlOptions(t), staticClientName, "http://localhost:1234") } // Test that Connect and wan federation over mesh gateways work in a secure installation, @@ -274,7 +274,7 @@ func TestMeshGatewaySecure(t *testing.T) { require.NoError(t, err) logger.Log(t, "checking that connection is successful") - k8s.CheckStaticServerConnectionSuccessful(t, primaryContext.KubectlOptions(t), "http://localhost:1234") + k8s.CheckStaticServerConnectionSuccessful(t, primaryContext.KubectlOptions(t), staticClientName, "http://localhost:1234") }) } } diff --git a/acceptance/tests/partitions/partitions_test.go b/acceptance/tests/partitions/partitions_test.go index a4a95e7078..d3c732f071 100644 --- a/acceptance/tests/partitions/partitions_test.go +++ b/acceptance/tests/partitions/partitions_test.go @@ -379,11 +379,11 @@ func TestPartitions(t *testing.T) { if c.ACLsAndAutoEncryptEnabled { logger.Log(t, "checking that the connection is not successful because there's no intention") if cfg.EnableTransparentProxy { - k8s.CheckStaticServerConnectionFailing(t, serverClusterStaticClientOpts, fmt.Sprintf("http://static-server.%s", staticServerNamespace)) - k8s.CheckStaticServerConnectionFailing(t, clientClusterStaticClientOpts, fmt.Sprintf("http://static-server.%s", staticServerNamespace)) + k8s.CheckStaticServerConnectionFailing(t, serverClusterStaticClientOpts, staticClientName, fmt.Sprintf("http://static-server.%s", staticServerNamespace)) + k8s.CheckStaticServerConnectionFailing(t, clientClusterStaticClientOpts, staticClientName, fmt.Sprintf("http://static-server.%s", staticServerNamespace)) } else { - k8s.CheckStaticServerConnectionFailing(t, serverClusterStaticClientOpts, "http://localhost:1234") - k8s.CheckStaticServerConnectionFailing(t, clientClusterStaticClientOpts, "http://localhost:1234") + k8s.CheckStaticServerConnectionFailing(t, serverClusterStaticClientOpts, staticClientName, "http://localhost:1234") + k8s.CheckStaticServerConnectionFailing(t, clientClusterStaticClientOpts, staticClientName, "http://localhost:1234") } intention := &api.ServiceIntentionsConfigEntry{ @@ -421,11 +421,11 @@ func TestPartitions(t *testing.T) { logger.Log(t, "checking that connection is successful") if cfg.EnableTransparentProxy { - k8s.CheckStaticServerConnectionSuccessful(t, serverClusterStaticClientOpts, fmt.Sprintf("http://static-server.%s", staticServerNamespace)) - k8s.CheckStaticServerConnectionSuccessful(t, 
clientClusterStaticClientOpts, fmt.Sprintf("http://static-server.%s", staticServerNamespace)) + k8s.CheckStaticServerConnectionSuccessful(t, serverClusterStaticClientOpts, staticClientName, fmt.Sprintf("http://static-server.%s", staticServerNamespace)) + k8s.CheckStaticServerConnectionSuccessful(t, clientClusterStaticClientOpts, staticClientName, fmt.Sprintf("http://static-server.%s", staticServerNamespace)) } else { - k8s.CheckStaticServerConnectionSuccessful(t, serverClusterStaticClientOpts, "http://localhost:1234") - k8s.CheckStaticServerConnectionSuccessful(t, clientClusterStaticClientOpts, "http://localhost:1234") + k8s.CheckStaticServerConnectionSuccessful(t, serverClusterStaticClientOpts, staticClientName, "http://localhost:1234") + k8s.CheckStaticServerConnectionSuccessful(t, clientClusterStaticClientOpts, staticClientName, "http://localhost:1234") } // Test that kubernetes readiness status is synced to Consul. @@ -441,11 +441,11 @@ func TestPartitions(t *testing.T) { // from server, which is the case when a connection is unsuccessful due to intentions in other tests. logger.Log(t, "checking that connection is unsuccessful") if cfg.EnableTransparentProxy { - k8s.CheckStaticServerConnectionMultipleFailureMessages(t, serverClusterStaticClientOpts, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server", "curl: (7) Failed to connect to static-server.ns1 port 80: Connection refused"}, fmt.Sprintf("http://static-server.%s", staticServerNamespace)) - k8s.CheckStaticServerConnectionMultipleFailureMessages(t, clientClusterStaticClientOpts, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server", "curl: (7) Failed to connect to static-server.ns1 port 80: Connection refused"}, fmt.Sprintf("http://static-server.%s", staticServerNamespace)) + k8s.CheckStaticServerConnectionMultipleFailureMessages(t, serverClusterStaticClientOpts, staticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server", "curl: (7) Failed to connect to static-server.ns1 port 80: Connection refused"}, "", fmt.Sprintf("http://static-server.%s", staticServerNamespace)) + k8s.CheckStaticServerConnectionMultipleFailureMessages(t, clientClusterStaticClientOpts, staticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server", "curl: (7) Failed to connect to static-server.ns1 port 80: Connection refused"}, "", fmt.Sprintf("http://static-server.%s", staticServerNamespace)) } else { - k8s.CheckStaticServerConnectionMultipleFailureMessages(t, serverClusterStaticClientOpts, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "http://localhost:1234") - k8s.CheckStaticServerConnectionMultipleFailureMessages(t, clientClusterStaticClientOpts, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "http://localhost:1234") + k8s.CheckStaticServerConnectionMultipleFailureMessages(t, serverClusterStaticClientOpts, staticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "", "http://localhost:1234") + k8s.CheckStaticServerConnectionMultipleFailureMessages(t, clientClusterStaticClientOpts, staticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "", "http://localhost:1234") } }) // 
This section of the tests runs the cross-partition networking tests. @@ -541,15 +541,15 @@ func TestPartitions(t *testing.T) { logger.Log(t, "checking that the connection is not successful because there's no intention") if cfg.EnableTransparentProxy { if !c.mirrorK8S { - k8s.CheckStaticServerConnectionFailing(t, serverClusterStaticClientOpts, fmt.Sprintf("http://static-server.virtual.%s.ns.%s.ap.dc1.dc.consul", c.destinationNamespace, secondaryPartition)) - k8s.CheckStaticServerConnectionFailing(t, clientClusterStaticClientOpts, fmt.Sprintf("http://static-server.virtual.%s.ns.%s.ap.dc1.dc.consul", c.destinationNamespace, defaultPartition)) + k8s.CheckStaticServerConnectionFailing(t, serverClusterStaticClientOpts, staticClientName, fmt.Sprintf("http://static-server.virtual.%s.ns.%s.ap.dc1.dc.consul", c.destinationNamespace, secondaryPartition)) + k8s.CheckStaticServerConnectionFailing(t, clientClusterStaticClientOpts, staticClientName, fmt.Sprintf("http://static-server.virtual.%s.ns.%s.ap.dc1.dc.consul", c.destinationNamespace, defaultPartition)) } else { - k8s.CheckStaticServerConnectionFailing(t, serverClusterStaticClientOpts, fmt.Sprintf("http://static-server.virtual.%s.ns.%s.ap.dc1.dc.consul", staticServerNamespace, secondaryPartition)) - k8s.CheckStaticServerConnectionFailing(t, clientClusterStaticClientOpts, fmt.Sprintf("http://static-server.virtual.%s.ns.%s.ap.dc1.dc.consul", staticServerNamespace, defaultPartition)) + k8s.CheckStaticServerConnectionFailing(t, serverClusterStaticClientOpts, staticClientName, fmt.Sprintf("http://static-server.virtual.%s.ns.%s.ap.dc1.dc.consul", staticServerNamespace, secondaryPartition)) + k8s.CheckStaticServerConnectionFailing(t, clientClusterStaticClientOpts, staticClientName, fmt.Sprintf("http://static-server.virtual.%s.ns.%s.ap.dc1.dc.consul", staticServerNamespace, defaultPartition)) } } else { - k8s.CheckStaticServerConnectionFailing(t, serverClusterStaticClientOpts, "http://localhost:1234") - k8s.CheckStaticServerConnectionFailing(t, clientClusterStaticClientOpts, "http://localhost:1234") + k8s.CheckStaticServerConnectionFailing(t, serverClusterStaticClientOpts, staticClientName, "http://localhost:1234") + k8s.CheckStaticServerConnectionFailing(t, clientClusterStaticClientOpts, staticClientName, "http://localhost:1234") } intention := &api.ServiceIntentionsConfigEntry{ @@ -590,15 +590,15 @@ func TestPartitions(t *testing.T) { logger.Log(t, "checking that connection is successful") if cfg.EnableTransparentProxy { if !c.mirrorK8S { - k8s.CheckStaticServerConnectionSuccessful(t, serverClusterStaticClientOpts, fmt.Sprintf("http://static-server.virtual.%s.ns.%s.ap.dc1.dc.consul", c.destinationNamespace, secondaryPartition)) - k8s.CheckStaticServerConnectionSuccessful(t, clientClusterStaticClientOpts, fmt.Sprintf("http://static-server.virtual.%s.ns.%s.ap.dc1.dc.consul", c.destinationNamespace, defaultPartition)) + k8s.CheckStaticServerConnectionSuccessful(t, serverClusterStaticClientOpts, staticClientName, fmt.Sprintf("http://static-server.virtual.%s.ns.%s.ap.dc1.dc.consul", c.destinationNamespace, secondaryPartition)) + k8s.CheckStaticServerConnectionSuccessful(t, clientClusterStaticClientOpts, staticClientName, fmt.Sprintf("http://static-server.virtual.%s.ns.%s.ap.dc1.dc.consul", c.destinationNamespace, defaultPartition)) } else { - k8s.CheckStaticServerConnectionSuccessful(t, serverClusterStaticClientOpts, fmt.Sprintf("http://static-server.virtual.%s.ns.%s.ap.dc1.dc.consul", staticServerNamespace, secondaryPartition)) - 
k8s.CheckStaticServerConnectionSuccessful(t, clientClusterStaticClientOpts, fmt.Sprintf("http://static-server.virtual.%s.ns.%s.ap.dc1.dc.consul", staticServerNamespace, defaultPartition)) + k8s.CheckStaticServerConnectionSuccessful(t, serverClusterStaticClientOpts, staticClientName, fmt.Sprintf("http://static-server.virtual.%s.ns.%s.ap.dc1.dc.consul", staticServerNamespace, secondaryPartition)) + k8s.CheckStaticServerConnectionSuccessful(t, clientClusterStaticClientOpts, staticClientName, fmt.Sprintf("http://static-server.virtual.%s.ns.%s.ap.dc1.dc.consul", staticServerNamespace, defaultPartition)) } } else { - k8s.CheckStaticServerConnectionSuccessful(t, serverClusterStaticClientOpts, "http://localhost:1234") - k8s.CheckStaticServerConnectionSuccessful(t, clientClusterStaticClientOpts, "http://localhost:1234") + k8s.CheckStaticServerConnectionSuccessful(t, serverClusterStaticClientOpts, staticClientName, "http://localhost:1234") + k8s.CheckStaticServerConnectionSuccessful(t, clientClusterStaticClientOpts, staticClientName, "http://localhost:1234") } // Test that kubernetes readiness status is synced to Consul. @@ -615,15 +615,15 @@ func TestPartitions(t *testing.T) { logger.Log(t, "checking that connection is unsuccessful") if cfg.EnableTransparentProxy { if !c.mirrorK8S { - k8s.CheckStaticServerConnectionMultipleFailureMessages(t, serverClusterStaticClientOpts, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server", "curl: (7) Failed to connect to static-server.ns1 port 80: Connection refused"}, fmt.Sprintf("http://static-server.virtual.%s.ns.%s.ap.dc1.dc.consul", c.destinationNamespace, secondaryPartition)) - k8s.CheckStaticServerConnectionMultipleFailureMessages(t, clientClusterStaticClientOpts, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server", "curl: (7) Failed to connect to static-server.ns1 port 80: Connection refused"}, fmt.Sprintf("http://static-server.virtual.%s.ns.%s.ap.dc1.dc.consul", c.destinationNamespace, defaultPartition)) + k8s.CheckStaticServerConnectionMultipleFailureMessages(t, serverClusterStaticClientOpts, staticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server", "curl: (7) Failed to connect to static-server.ns1 port 80: Connection refused"}, "", fmt.Sprintf("http://static-server.virtual.%s.ns.%s.ap.dc1.dc.consul", c.destinationNamespace, secondaryPartition)) + k8s.CheckStaticServerConnectionMultipleFailureMessages(t, clientClusterStaticClientOpts, staticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server", "curl: (7) Failed to connect to static-server.ns1 port 80: Connection refused"}, "", fmt.Sprintf("http://static-server.virtual.%s.ns.%s.ap.dc1.dc.consul", c.destinationNamespace, defaultPartition)) } else { - k8s.CheckStaticServerConnectionMultipleFailureMessages(t, serverClusterStaticClientOpts, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server", "curl: (7) Failed to connect to static-server.ns1 port 80: Connection refused"}, fmt.Sprintf("http://static-server.virtual.%s.ns.%s.ap.dc1.dc.consul", staticServerNamespace, secondaryPartition)) - k8s.CheckStaticServerConnectionMultipleFailureMessages(t, clientClusterStaticClientOpts, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server", "curl: (7) Failed to connect to 
static-server.ns1 port 80: Connection refused"}, fmt.Sprintf("http://static-server.virtual.%s.ns.%s.ap.dc1.dc.consul", staticServerNamespace, defaultPartition)) + k8s.CheckStaticServerConnectionMultipleFailureMessages(t, serverClusterStaticClientOpts, staticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server", "curl: (7) Failed to connect to static-server.ns1 port 80: Connection refused"}, "", fmt.Sprintf("http://static-server.virtual.%s.ns.%s.ap.dc1.dc.consul", staticServerNamespace, secondaryPartition)) + k8s.CheckStaticServerConnectionMultipleFailureMessages(t, clientClusterStaticClientOpts, staticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server", "curl: (7) Failed to connect to static-server.ns1 port 80: Connection refused"}, "", fmt.Sprintf("http://static-server.virtual.%s.ns.%s.ap.dc1.dc.consul", staticServerNamespace, defaultPartition)) } } else { - k8s.CheckStaticServerConnectionMultipleFailureMessages(t, serverClusterStaticClientOpts, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "http://localhost:1234") - k8s.CheckStaticServerConnectionMultipleFailureMessages(t, clientClusterStaticClientOpts, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "http://localhost:1234") + k8s.CheckStaticServerConnectionMultipleFailureMessages(t, serverClusterStaticClientOpts, staticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "", "http://localhost:1234") + k8s.CheckStaticServerConnectionMultipleFailureMessages(t, clientClusterStaticClientOpts, staticClientName, false, []string{"curl: (56) Recv failure: Connection reset by peer", "curl: (52) Empty reply from server"}, "", "http://localhost:1234") } }) }) diff --git a/acceptance/tests/terminating-gateway/terminating_gateway_namespaces_test.go b/acceptance/tests/terminating-gateway/terminating_gateway_namespaces_test.go index 061d85e662..76510b9a76 100644 --- a/acceptance/tests/terminating-gateway/terminating_gateway_namespaces_test.go +++ b/acceptance/tests/terminating-gateway/terminating_gateway_namespaces_test.go @@ -121,7 +121,7 @@ func TestTerminatingGatewaySingleNamespace(t *testing.T) { // Test that we can make a call to the terminating gateway. logger.Log(t, "trying calls to terminating gateway") - k8s.CheckStaticServerConnectionSuccessful(t, nsK8SOptions, "http://localhost:1234") + k8s.CheckStaticServerConnectionSuccessful(t, nsK8SOptions, staticClientName, "http://localhost:1234") }) } } @@ -229,7 +229,7 @@ func TestTerminatingGatewayNamespaceMirroring(t *testing.T) { // Test that we can make a call to the terminating gateway logger.Log(t, "trying calls to terminating gateway") - k8s.CheckStaticServerConnectionSuccessful(t, ns2K8SOptions, "http://localhost:1234") + k8s.CheckStaticServerConnectionSuccessful(t, ns2K8SOptions, staticClientName, "http://localhost:1234") }) } } diff --git a/acceptance/tests/terminating-gateway/terminating_gateway_test.go b/acceptance/tests/terminating-gateway/terminating_gateway_test.go index 690e231e2b..cb362d4445 100644 --- a/acceptance/tests/terminating-gateway/terminating_gateway_test.go +++ b/acceptance/tests/terminating-gateway/terminating_gateway_test.go @@ -93,7 +93,7 @@ func TestTerminatingGateway(t *testing.T) { // Test that we can make a call to the terminating gateway. 
logger.Log(t, "trying calls to terminating gateway") - k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), "http://localhost:1234") + k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), staticClientName, "http://localhost:1234") }) } } @@ -191,7 +191,7 @@ func assertNoConnectionAndAddIntention(t *testing.T, consulClient *api.Client, k t.Helper() logger.Log(t, "testing intentions prevent connections through the terminating gateway") - k8s.CheckStaticServerConnectionFailing(t, k8sOptions, "http://localhost:1234") + k8s.CheckStaticServerConnectionFailing(t, k8sOptions, staticClientName, "http://localhost:1234") logger.Log(t, "creating static-client => static-server intention") _, _, err := consulClient.ConfigEntries().Set(&api.ServiceIntentionsConfigEntry{ diff --git a/acceptance/tests/vault/vault_test.go b/acceptance/tests/vault/vault_test.go index 8025741c0b..ed4410999b 100644 --- a/acceptance/tests/vault/vault_test.go +++ b/acceptance/tests/vault/vault_test.go @@ -11,6 +11,8 @@ import ( "github.com/stretchr/testify/require" ) +const staticClientName = "static-client" + // TestVault installs Vault, bootstraps it with secrets, policies, and Kube Auth Method. // It then configures Consul to use vault as the backend and checks that it works. func TestVault(t *testing.T) { @@ -132,8 +134,8 @@ func TestVault(t *testing.T) { logger.Log(t, "checking that connection is successful") if cfg.EnableTransparentProxy { - k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), "http://static-server") + k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), staticClientName, "http://static-server") } else { - k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), "http://localhost:1234") + k8s.CheckStaticServerConnectionSuccessful(t, ctx.KubectlOptions(t), staticClientName, "http://localhost:1234") } } diff --git a/acceptance/tests/vault/vault_wan_fed_test.go b/acceptance/tests/vault/vault_wan_fed_test.go index 23428e3d5a..16d9163cfb 100644 --- a/acceptance/tests/vault/vault_wan_fed_test.go +++ b/acceptance/tests/vault/vault_wan_fed_test.go @@ -298,7 +298,7 @@ func TestVault_WANFederationViaGateways(t *testing.T) { require.NoError(t, err) logger.Log(t, "checking that connection is successful") - k8s.CheckStaticServerConnectionSuccessful(t, primaryCtx.KubectlOptions(t), "http://localhost:1234") + k8s.CheckStaticServerConnectionSuccessful(t, primaryCtx.KubectlOptions(t), staticClientName, "http://localhost:1234") } // vaultAddress returns Vault's server URL depending on test configuration. diff --git a/charts/consul/templates/connect-inject-clusterrole.yaml b/charts/consul/templates/connect-inject-clusterrole.yaml index 892ef8f406..9d01420363 100644 --- a/charts/consul/templates/connect-inject-clusterrole.yaml +++ b/charts/consul/templates/connect-inject-clusterrole.yaml @@ -49,5 +49,10 @@ rules: - {{ template "consul.fullname" . }}-connect-inject-acl-token verbs: - get +- apiGroups: [""] + resources: + - serviceaccounts + verbs: + - get {{- end }} {{- end }} diff --git a/control-plane/connect-inject/container_init.go b/control-plane/connect-inject/container_init.go index c258203c9a..526e473bfc 100644 --- a/control-plane/connect-inject/container_init.go +++ b/control-plane/connect-inject/container_init.go @@ -72,6 +72,18 @@ type initContainerCommandData struct { // ConsulDNSClusterIP is the IP of the Consul DNS Service. 
ConsulDNSClusterIP string + + // MultiPort determines whether this is a multi port Pod, which configures the init container to be specific to one + // of the services on the multi port Pod. + MultiPort bool + + // EnvoyAdminPort configures the admin port of the Envoy sidecar. This will be unique per service in a multi port + // Pod. + EnvoyAdminPort int + + // BearerTokenFile configures where the service account token can be found. This will be unique per service in a + // multi port Pod. + BearerTokenFile string } // initCopyContainer returns the init container spec for the copy container which places @@ -104,9 +116,9 @@ func (h *Handler) initCopyContainer() corev1.Container { return container } -// containerInit returns the init container spec for that polls for the service and the connect proxy service to be registered -// so that it can save theproxy service id to the shared volume and boostrap Envoy with the proxy-id -func (h *Handler) containerInit(namespace corev1.Namespace, pod corev1.Pod) (corev1.Container, error) { +// containerInit returns the init container spec for connect-init that polls for the service and the connect proxy service to be registered +// so that it can save the proxy service id to the shared volume and boostrap Envoy with the proxy-id. +func (h *Handler) containerInit(namespace corev1.Namespace, pod corev1.Pod, mpi multiPortInfo) (corev1.Container, error) { // Check if tproxy is enabled on this pod. tproxyEnabled, err := transparentProxyEnabled(namespace, pod, h.EnableTransparentProxy) if err != nil { @@ -129,6 +141,8 @@ func (h *Handler) containerInit(namespace corev1.Namespace, pod corev1.Pod) (cor } } + multiPort := mpi.serviceName != "" + data := initContainerCommandData{ AuthMethod: h.AuthMethod, ConsulPartition: h.ConsulPartition, @@ -142,12 +156,41 @@ func (h *Handler) containerInit(namespace corev1.Namespace, pod corev1.Pod) (cor TProxyExcludeUIDs: splitCommaSeparatedItemsFromAnnotation(annotationTProxyExcludeUIDs, pod), ConsulDNSClusterIP: consulDNSClusterIP, EnvoyUID: envoyUserAndGroupID, + MultiPort: multiPort, + EnvoyAdminPort: 19000 + mpi.serviceIndex, } - if data.AuthMethod != "" { - data.ServiceAccountName = pod.Spec.ServiceAccountName + // Create expected volume mounts + volMounts := []corev1.VolumeMount{ + { + Name: volumeName, + MountPath: "/consul/connect-inject", + }, + } + + if multiPort { + data.ServiceName = mpi.serviceName + } else { data.ServiceName = pod.Annotations[annotationService] } + if h.AuthMethod != "" { + if multiPort { + // If multi port then we require that the service account name + // matches the service name. 
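+			// (the multiport fixtures create one ServiceAccount per service, and each service logs in to the
+			// auth method with its own token; see findServiceAccountVolumeMount below)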
+ data.ServiceAccountName = mpi.serviceName + } else { + data.ServiceAccountName = pod.Spec.ServiceAccountName + } + // Extract the service account token's volume mount + saTokenVolumeMount, bearerTokenFile, err := findServiceAccountVolumeMount(pod, multiPort, mpi.serviceName) + if err != nil { + return corev1.Container{}, err + } + data.BearerTokenFile = bearerTokenFile + + // Append to volume mounts + volMounts = append(volMounts, saTokenVolumeMount) + } // This determines how to configure the consul connect envoy command: what // metrics backend to use and what path to expose on the @@ -166,25 +209,6 @@ func (h *Handler) containerInit(namespace corev1.Namespace, pod corev1.Pod) (cor data.PrometheusBackendPort = mergedMetricsPort } - // Create expected volume mounts - volMounts := []corev1.VolumeMount{ - { - Name: volumeName, - MountPath: "/consul/connect-inject", - }, - } - - if h.AuthMethod != "" { - // Extract the service account token's volume mount - saTokenVolumeMount, err := findServiceAccountVolumeMount(pod) - if err != nil { - return corev1.Container{}, err - } - - // Append to volume mounts - volMounts = append(volMounts, saTokenVolumeMount) - } - // Render the command var buf bytes.Buffer tpl := template.Must(template.New("root").Parse(strings.TrimSpace( @@ -194,8 +218,12 @@ func (h *Handler) containerInit(namespace corev1.Namespace, pod corev1.Pod) (cor return corev1.Container{}, err } + initContainerName := InjectInitContainerName + if multiPort { + initContainerName = fmt.Sprintf("%s-%s", InjectInitContainerName, mpi.serviceName) + } container := corev1.Container{ - Name: InjectInitContainerName, + Name: initContainerName, Image: h.ImageConsulK8S, Env: []corev1.EnvVar{ { @@ -327,6 +355,10 @@ consul-k8s-control-plane connect-init -pod-name=${POD_NAME} -pod-namespace=${POD -acl-auth-method="{{ .AuthMethod }}" \ -service-account-name="{{ .ServiceAccountName }}" \ -service-name="{{ .ServiceName }}" \ + -bearer-token-file={{ .BearerTokenFile }} \ + {{- if .MultiPort }} + -acl-token-sink=/consul/connect-inject/acl-token-{{ .ServiceName }} \ + {{- end }} {{- if .ConsulNamespace }} {{- if .NamespaceMirroringEnabled }} {{- /* If namespace mirroring is enabled, the auth method is @@ -337,6 +369,13 @@ consul-k8s-control-plane connect-init -pod-name=${POD_NAME} -pod-namespace=${POD {{- end }} {{- end }} {{- end }} + {{- if .MultiPort }} + -multiport=true \ + -proxy-id-file=/consul/connect-inject/proxyid-{{ .ServiceName }} \ + {{- if not .AuthMethod }} + -service-name="{{ .ServiceName }}" \ + {{- end }} + {{- end }} {{- if .ConsulPartition }} -partition="{{ .ConsulPartition }}" \ {{- end }} @@ -346,7 +385,11 @@ consul-k8s-control-plane connect-init -pod-name=${POD_NAME} -pod-namespace=${POD # Generate the envoy bootstrap code /consul/connect-inject/consul connect envoy \ + {{- if .MultiPort }} + -proxy-id="$(cat /consul/connect-inject/proxyid-{{.ServiceName}})" \ + {{- else }} -proxy-id="$(cat /consul/connect-inject/proxyid)" \ + {{- end }} {{- if .PrometheusScrapePath }} -prometheus-scrape-path="{{ .PrometheusScrapePath }}" \ {{- end }} @@ -354,15 +397,23 @@ consul-k8s-control-plane connect-init -pod-name=${POD_NAME} -pod-namespace=${POD -prometheus-backend-port="{{ .PrometheusBackendPort }}" \ {{- end }} {{- if .AuthMethod }} + {{- if .MultiPort }} + -token-file="/consul/connect-inject/acl-token-{{ .ServiceName }}" \ + {{- else }} -token-file="/consul/connect-inject/acl-token" \ {{- end }} + {{- end }} {{- if .ConsulPartition }} -partition="{{ .ConsulPartition }}" \ {{- end }} {{- if 
.ConsulNamespace }} -namespace="{{ .ConsulNamespace }}" \ {{- end }} - -bootstrap > /consul/connect-inject/envoy-bootstrap.yaml + {{- if .MultiPort }} + -admin-bind=127.0.0.1:{{ .EnvoyAdminPort }} \ + {{- end }} + -bootstrap > {{ if .MultiPort }}/consul/connect-inject/envoy-bootstrap-{{.ServiceName}}.yaml{{ else }}/consul/connect-inject/envoy-bootstrap.yaml{{ end }} + {{- if .EnableTransparentProxy }} {{- /* The newline below is intentional to allow extra space diff --git a/control-plane/connect-inject/container_init_test.go b/control-plane/connect-inject/container_init_test.go index 58cfe95d73..f536ba68f0 100644 --- a/control-plane/connect-inject/container_init_test.go +++ b/control-plane/connect-inject/container_init_test.go @@ -131,7 +131,7 @@ consul-k8s-control-plane connect-init -pod-name=${POD_NAME} -pod-namespace=${POD h := tt.Handler pod := *tt.Pod(minimal()) - container, err := h.containerInit(testNS, pod) + container, err := h.containerInit(testNS, pod, multiPortInfo{}) require.NoError(err) actual := strings.Join(container.Command, " ") require.Contains(actual, tt.Cmd) @@ -296,7 +296,7 @@ func TestHandlerContainerInit_transparentProxy(t *testing.T) { } ns := testNS ns.Labels = c.namespaceLabel - container, err := h.containerInit(ns, *pod) + container, err := h.containerInit(ns, *pod, multiPortInfo{}) require.NoError(t, err) actualCmd := strings.Join(container.Command, " ") @@ -384,7 +384,7 @@ func TestHandlerContainerInit_consulDNS(t *testing.T) { ns := testNS ns.Labels = c.namespaceLabel - container, err := h.containerInit(ns, *pod) + container, err := h.containerInit(ns, *pod, multiPortInfo{}) require.NoError(t, err) actualCmd := strings.Join(container.Command, " ") @@ -573,6 +573,7 @@ consul-k8s-control-plane connect-init -pod-name=${POD_NAME} -pod-namespace=${POD -acl-auth-method="auth-method" \ -service-account-name="web" \ -service-name="" \ + -bearer-token-file=/var/run/secrets/kubernetes.io/serviceaccount/token \ -auth-method-namespace="non-default" \ -partition="default" \ -consul-service-namespace="non-default" \ @@ -605,6 +606,7 @@ consul-k8s-control-plane connect-init -pod-name=${POD_NAME} -pod-namespace=${POD -acl-auth-method="auth-method" \ -service-account-name="web" \ -service-name="" \ + -bearer-token-file=/var/run/secrets/kubernetes.io/serviceaccount/token \ -auth-method-namespace="default" \ -partition="non-default" \ -consul-service-namespace="k8snamespace" \ @@ -702,6 +704,7 @@ consul-k8s-control-plane connect-init -pod-name=${POD_NAME} -pod-namespace=${POD -acl-auth-method="auth-method" \ -service-account-name="web" \ -service-name="web" \ + -bearer-token-file=/var/run/secrets/kubernetes.io/serviceaccount/token \ -auth-method-namespace="default" \ -partition="non-default" \ -consul-service-namespace="k8snamespace" \ @@ -729,7 +732,7 @@ consul-k8s-control-plane connect-init -pod-name=${POD_NAME} -pod-namespace=${POD require := require.New(t) h := tt.Handler - container, err := h.containerInit(testNS, *tt.Pod(minimal())) + container, err := h.containerInit(testNS, *tt.Pod(minimal()), multiPortInfo{}) require.NoError(err) actual := strings.Join(container.Command, " ") require.Equal(tt.Cmd, actual) @@ -737,6 +740,178 @@ consul-k8s-control-plane connect-init -pod-name=${POD_NAME} -pod-namespace=${POD } } +func TestHandlerContainerInit_Multiport(t *testing.T) { + minimal := func() *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + annotationService: "web,web-admin", + }, + }, + + Spec: corev1.PodSpec{ + 
Volumes: []corev1.Volume{ + { + Name: "web-admin-service-account", + }, + }, + Containers: []corev1.Container{ + { + Name: "web", + }, + { + Name: "web-side", + }, + { + Name: "web-admin", + }, + { + Name: "web-admin-side", + }, + { + Name: "auth-method-secret", + VolumeMounts: []corev1.VolumeMount{ + { + Name: "service-account-secret", + MountPath: "/var/run/secrets/kubernetes.io/serviceaccount", + }, + }, + }, + }, + ServiceAccountName: "web", + }, + } + } + + cases := []struct { + Name string + Pod func(*corev1.Pod) *corev1.Pod + Handler Handler + NumInitContainers int + MultiPortInfos []multiPortInfo + Cmd []string // Strings.Contains test + }{ + { + "Whole template, multiport", + func(pod *corev1.Pod) *corev1.Pod { + return pod + }, + Handler{}, + 2, + []multiPortInfo{ + { + serviceIndex: 0, + serviceName: "web", + }, + { + serviceIndex: 1, + serviceName: "web-admin", + }, + }, + []string{`/bin/sh -ec +export CONSUL_HTTP_ADDR="${HOST_IP}:8500" +export CONSUL_GRPC_ADDR="${HOST_IP}:8502" +consul-k8s-control-plane connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \ + -multiport=true \ + -proxy-id-file=/consul/connect-inject/proxyid-web \ + -service-name="web" \ + +# Generate the envoy bootstrap code +/consul/connect-inject/consul connect envoy \ + -proxy-id="$(cat /consul/connect-inject/proxyid-web)" \ + -admin-bind=127.0.0.1:19000 \ + -bootstrap > /consul/connect-inject/envoy-bootstrap-web.yaml`, + + `/bin/sh -ec +export CONSUL_HTTP_ADDR="${HOST_IP}:8500" +export CONSUL_GRPC_ADDR="${HOST_IP}:8502" +consul-k8s-control-plane connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \ + -multiport=true \ + -proxy-id-file=/consul/connect-inject/proxyid-web-admin \ + -service-name="web-admin" \ + +# Generate the envoy bootstrap code +/consul/connect-inject/consul connect envoy \ + -proxy-id="$(cat /consul/connect-inject/proxyid-web-admin)" \ + -admin-bind=127.0.0.1:19001 \ + -bootstrap > /consul/connect-inject/envoy-bootstrap-web-admin.yaml`, + }, + }, + { + "Whole template, multiport, auth method", + func(pod *corev1.Pod) *corev1.Pod { + return pod + }, + Handler{ + AuthMethod: "auth-method", + }, + 2, + []multiPortInfo{ + { + serviceIndex: 0, + serviceName: "web", + }, + { + serviceIndex: 1, + serviceName: "web-admin", + }, + }, + []string{`/bin/sh -ec +export CONSUL_HTTP_ADDR="${HOST_IP}:8500" +export CONSUL_GRPC_ADDR="${HOST_IP}:8502" +consul-k8s-control-plane connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \ + -acl-auth-method="auth-method" \ + -service-account-name="web" \ + -service-name="web" \ + -bearer-token-file=/var/run/secrets/kubernetes.io/serviceaccount/token \ + -acl-token-sink=/consul/connect-inject/acl-token-web \ + -multiport=true \ + -proxy-id-file=/consul/connect-inject/proxyid-web \ + +# Generate the envoy bootstrap code +/consul/connect-inject/consul connect envoy \ + -proxy-id="$(cat /consul/connect-inject/proxyid-web)" \ + -token-file="/consul/connect-inject/acl-token-web" \ + -admin-bind=127.0.0.1:19000 \ + -bootstrap > /consul/connect-inject/envoy-bootstrap-web.yaml`, + + `/bin/sh -ec +export CONSUL_HTTP_ADDR="${HOST_IP}:8500" +export CONSUL_GRPC_ADDR="${HOST_IP}:8502" +consul-k8s-control-plane connect-init -pod-name=${POD_NAME} -pod-namespace=${POD_NAMESPACE} \ + -acl-auth-method="auth-method" \ + -service-account-name="web-admin" \ + -service-name="web-admin" \ + -bearer-token-file=/consul/serviceaccount-web-admin/token \ + -acl-token-sink=/consul/connect-inject/acl-token-web-admin \ + -multiport=true \ + 
-proxy-id-file=/consul/connect-inject/proxyid-web-admin \ + +# Generate the envoy bootstrap code +/consul/connect-inject/consul connect envoy \ + -proxy-id="$(cat /consul/connect-inject/proxyid-web-admin)" \ + -token-file="/consul/connect-inject/acl-token-web-admin" \ + -admin-bind=127.0.0.1:19001 \ + -bootstrap > /consul/connect-inject/envoy-bootstrap-web-admin.yaml`, + }, + }, + } + + for _, tt := range cases { + t.Run(tt.Name, func(t *testing.T) { + require := require.New(t) + + h := tt.Handler + for i := 0; i < tt.NumInitContainers; i++ { + container, err := h.containerInit(testNS, *tt.Pod(minimal()), tt.MultiPortInfos[i]) + require.NoError(err) + actual := strings.Join(container.Command, " ") + require.Equal(tt.Cmd[i], actual) + } + }) + } +} + func TestHandlerContainerInit_authMethod(t *testing.T) { require := require.New(t) h := Handler{ @@ -765,7 +940,7 @@ func TestHandlerContainerInit_authMethod(t *testing.T) { ServiceAccountName: "foo", }, } - container, err := h.containerInit(testNS, *pod) + container, err := h.containerInit(testNS, *pod, multiPortInfo{}) require.NoError(err) actual := strings.Join(container.Command, " ") require.Contains(actual, ` @@ -802,7 +977,7 @@ func TestHandlerContainerInit_WithTLS(t *testing.T) { }, }, } - container, err := h.containerInit(testNS, *pod) + container, err := h.containerInit(testNS, *pod, multiPortInfo{}) require.NoError(err) actual := strings.Join(container.Command, " ") require.Contains(actual, ` @@ -846,7 +1021,7 @@ func TestHandlerContainerInit_Resources(t *testing.T) { }, }, } - container, err := h.containerInit(testNS, *pod) + container, err := h.containerInit(testNS, *pod, multiPortInfo{}) require.NoError(err) require.Equal(corev1.ResourceRequirements{ Limits: corev1.ResourceList{ diff --git a/control-plane/connect-inject/endpoints_controller.go b/control-plane/connect-inject/endpoints_controller.go index 7dcca14091..8213fa2278 100644 --- a/control-plane/connect-inject/endpoints_controller.go +++ b/control-plane/connect-inject/endpoints_controller.go @@ -367,9 +367,14 @@ func (r *EndpointsController) upsertHealthCheck(pod corev1.Pod, client *api.Clie return nil } +// getServiceName computes the service name to register with Consul from the pod and endpoints object. In a single port +// service, it defaults to the endpoints name, but can be overridden by a pod annotation. In a multi port service, the +// endpoints name is always used since the pod annotation will have multiple service names listed (one per port). +// Changing the Consul service name via annotations is not supported for multi port services. func getServiceName(pod corev1.Pod, serviceEndpoints corev1.Endpoints) string { serviceName := serviceEndpoints.Name - if serviceNameFromAnnotation, ok := pod.Annotations[annotationService]; ok && serviceNameFromAnnotation != "" { + // If the annotation has a comma, it is a multi port Pod. In that case we always use the name of the endpoint. + if serviceNameFromAnnotation, ok := pod.Annotations[annotationService]; ok && serviceNameFromAnnotation != "" && !strings.Contains(serviceNameFromAnnotation, ",") { serviceName = serviceNameFromAnnotation } return serviceName @@ -397,6 +402,11 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service // The handler will always set the port annotation if one is not provided on the pod. 
var consulServicePort int if raw, ok := pod.Annotations[annotationPort]; ok && raw != "" { + if multiPort := strings.Split(raw, ","); len(multiPort) > 1 { + // Figure out which index of the ports annotation to use by + // finding the index of the service name in the service names annotation. + raw = multiPort[getMultiPortIdx(pod, serviceEndpoints)] + } if port, err := portValue(pod, raw); port > 0 { if err != nil { return nil, nil, err } @@ -471,17 +481,21 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service proxyConfig.LocalServicePort = consulServicePort } - upstreams, err := r.processUpstreams(pod) + upstreams, err := r.processUpstreams(pod, serviceEndpoints) if err != nil { return nil, nil, err } proxyConfig.Upstreams = upstreams + proxyPort := 20000 + if idx := getMultiPortIdx(pod, serviceEndpoints); idx >= 0 { + proxyPort += idx + } proxyService := &api.AgentServiceRegistration{ Kind: api.ServiceKindConnectProxy, ID: proxyServiceID, Name: proxyServiceName, - Port: 20000, + Port: proxyPort, Address: pod.Status.PodIP, Meta: meta, Namespace: r.consulNamespace(pod.Namespace), @@ -489,7 +503,7 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service Checks: api.AgentServiceChecks{ { Name: "Proxy Public Listener", - TCP: fmt.Sprintf("%s:20000", pod.Status.PodIP), + TCP: fmt.Sprintf("%s:%d", pod.Status.PodIP, proxyPort), Interval: "10s", DeregisterCriticalServiceAfter: "10m", }, @@ -816,7 +830,14 @@ func serviceInstancesForK8SServiceNameAndNamespace(k8sServiceName, k8sServiceNam // processUpstreams reads the list of upstreams from the Pod annotation and converts them into a list of api.Upstream // objects. -func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream, error) { +func (r *EndpointsController) processUpstreams(pod corev1.Pod, endpoints corev1.Endpoints) ([]api.Upstream, error) { + // In a multiport pod, only the first service's proxy should have upstreams configured. This skips configuring + // upstreams on additional services on the pod.
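+ // For example, with the service annotation set to "web,web-admin", only the web proxy gets the upstreams from the annotation; the web-admin proxy is registered with none.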
+ mpIdx := getMultiPortIdx(pod, endpoints) + if mpIdx > 0 { + return []api.Upstream{}, nil + } + var upstreams []api.Upstream if raw, ok := pod.Annotations[annotationUpstreams]; ok && raw != "" { for _, raw := range strings.Split(raw, ",") { @@ -1072,3 +1093,12 @@ func consulTags(pod corev1.Pod) []string { return interpolatedTags } + +func getMultiPortIdx(pod corev1.Pod, serviceEndpoints corev1.Endpoints) int { + for i, name := range strings.Split(pod.Annotations[annotationService], ",") { + if name == getServiceName(pod, serviceEndpoints) { + return i + } + } + return -1 +} diff --git a/control-plane/connect-inject/endpoints_controller_test.go b/control-plane/connect-inject/endpoints_controller_test.go index 98351c3f39..9b25c149b0 100644 --- a/control-plane/connect-inject/endpoints_controller_test.go +++ b/control-plane/connect-inject/endpoints_controller_test.go @@ -171,7 +171,14 @@ func TestProcessUpstreamsTLSandACLs(t *testing.T) { pod := createPod("pod1", "1.2.3.4", true, true) pod.Annotations[annotationUpstreams] = "upstream1:1234:dc1" - upstreams, err := ep.processUpstreams(*pod) + upstreams, err := ep.processUpstreams(*pod, corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svcname", + Namespace: "default", + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + }) require.NoError(t, err) expected := []api.Upstream{ @@ -527,7 +534,14 @@ func TestProcessUpstreams(t *testing.T) { EnableConsulPartitions: tt.consulPartitionsEnabled, } - upstreams, err := ep.processUpstreams(*tt.pod()) + upstreams, err := ep.processUpstreams(*tt.pod(), corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svcname", + Namespace: "default", + Labels: map[string]string{}, + Annotations: map[string]string{}, + }, + }) if tt.expErr != "" { require.EqualError(t, err, tt.expErr) } else { @@ -538,6 +552,376 @@ func TestProcessUpstreams(t *testing.T) { } } +func TestGetServiceName(t *testing.T) { + t.Parallel() + cases := []struct { + name string + pod func() *corev1.Pod + endpoint *corev1.Endpoints + expSvcName string + }{ + { + name: "single port, with annotation", + pod: func() *corev1.Pod { + pod1 := createPod("pod1", "1.2.3.4", true, true) + pod1.Annotations[annotationService] = "web" + return pod1 + }, + endpoint: &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "not-web", + Namespace: "default", + }, + }, + expSvcName: "web", + }, + { + name: "single port, without annotation", + pod: func() *corev1.Pod { + pod1 := createPod("pod1", "1.2.3.4", true, true) + return pod1 + }, + endpoint: &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ep-name", + Namespace: "default", + }, + }, + expSvcName: "ep-name", + }, + { + name: "multi port, with annotation", + pod: func() *corev1.Pod { + pod1 := createPod("pod1", "1.2.3.4", true, true) + pod1.Annotations[annotationService] = "web,web-admin" + return pod1 + }, + endpoint: &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ep-name-multiport", + Namespace: "default", + }, + }, + expSvcName: "ep-name-multiport", + }, + } + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + + svcName := getServiceName(*tt.pod(), *tt.endpoint) + require.Equal(t, tt.expSvcName, svcName) + + }) + } +} + +func TestReconcileCreateEndpoint_MultiportService(t *testing.T) { + t.Parallel() + nodeName := "test-node" + cases := []struct { + name string + consulSvcName string + k8sObjects func() []runtime.Object + initialConsulSvcs []*api.AgentServiceRegistration + expectedNumSvcInstances int + 
expectedConsulSvcInstancesMap map[string][]*api.CatalogService + expectedProxySvcInstancesMap map[string][]*api.CatalogService + expectedAgentHealthChecks []*api.AgentCheck + }{ + { + name: "Multiport service", + consulSvcName: "web,web-admin", + k8sObjects: func() []runtime.Object { + pod1 := createPod("pod1", "1.2.3.4", true, true) + pod1.Annotations[annotationPort] = "8080,9090" + pod1.Annotations[annotationService] = "web,web-admin" + pod1.Annotations[annotationUpstreams] = "upstream1:1234" + endpoint1 := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "web", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "1.2.3.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod1", + Namespace: "default", + }, + }, + }, + }, + }, + } + endpoint2 := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "web-admin", + Namespace: "default", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{ + { + IP: "1.2.3.4", + NodeName: &nodeName, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: "pod1", + Namespace: "default", + }, + }, + }, + }, + }, + } + return []runtime.Object{pod1, endpoint1, endpoint2} + }, + initialConsulSvcs: []*api.AgentServiceRegistration{}, + expectedNumSvcInstances: 1, + expectedConsulSvcInstancesMap: map[string][]*api.CatalogService{ + "web": { + { + ServiceID: "pod1-web", + ServiceName: "web", + ServiceAddress: "1.2.3.4", + ServicePort: 8080, + ServiceMeta: map[string]string{ + MetaKeyPodName: "pod1", + MetaKeyKubeServiceName: "web", + MetaKeyKubeNS: "default", + MetaKeyManagedBy: managedByValue, + }, + ServiceTags: []string{}, + }, + }, + "web-admin": { + { + ServiceID: "pod1-web-admin", + ServiceName: "web-admin", + ServiceAddress: "1.2.3.4", + ServicePort: 9090, + ServiceMeta: map[string]string{ + MetaKeyPodName: "pod1", + MetaKeyKubeServiceName: "web-admin", + MetaKeyKubeNS: "default", + MetaKeyManagedBy: managedByValue, + }, + ServiceTags: []string{}, + }, + }, + }, + expectedProxySvcInstancesMap: map[string][]*api.CatalogService{ + "web": { + { + ServiceID: "pod1-web-sidecar-proxy", + ServiceName: "web-sidecar-proxy", + ServiceAddress: "1.2.3.4", + ServicePort: 20000, + ServiceProxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "web", + DestinationServiceID: "pod1-web", + LocalServiceAddress: "127.0.0.1", + LocalServicePort: 8080, + Upstreams: []api.Upstream{ + { + DestinationType: api.UpstreamDestTypeService, + DestinationName: "upstream1", + LocalBindPort: 1234, + }, + }, + }, + ServiceMeta: map[string]string{ + MetaKeyPodName: "pod1", + MetaKeyKubeServiceName: "web", + MetaKeyKubeNS: "default", + MetaKeyManagedBy: managedByValue, + }, + ServiceTags: []string{}, + }, + }, + "web-admin": { + { + ServiceID: "pod1-web-admin-sidecar-proxy", + ServiceName: "web-admin-sidecar-proxy", + ServiceAddress: "1.2.3.4", + ServicePort: 20001, + ServiceProxy: &api.AgentServiceConnectProxyConfig{ + DestinationServiceName: "web-admin", + DestinationServiceID: "pod1-web-admin", + LocalServiceAddress: "127.0.0.1", + LocalServicePort: 9090, + }, + ServiceMeta: map[string]string{ + MetaKeyPodName: "pod1", + MetaKeyKubeServiceName: "web-admin", + MetaKeyKubeNS: "default", + MetaKeyManagedBy: managedByValue, + }, + ServiceTags: []string{}, + }, + }, + }, + expectedAgentHealthChecks: []*api.AgentCheck{ + { + CheckID: "default/pod1-web/kubernetes-health-check", + ServiceName: "web", + ServiceID: "pod1-web", + Name: 
"Kubernetes Health Check", + Status: api.HealthPassing, + Output: kubernetesSuccessReasonMsg, + Type: ttl, + }, + { + CheckID: "default/pod1-web-admin/kubernetes-health-check", + ServiceName: "web-admin", + ServiceID: "pod1-web-admin", + Name: "Kubernetes Health Check", + Status: api.HealthPassing, + Output: kubernetesSuccessReasonMsg, + Type: ttl, + }, + }, + }, + } + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + // The agent pod needs to have the address 127.0.0.1 so when the + // code gets the agent pods via the label component=client, and + // makes requests against the agent API, it will actually hit the + // test server we have on localhost. + fakeClientPod := createPod("fake-consul-client", "127.0.0.1", false, true) + fakeClientPod.Labels = map[string]string{"component": "client", "app": "consul", "release": "consul"} + + // Add the default namespace. + ns := corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}} + // Create fake k8s client + k8sObjects := append(tt.k8sObjects(), fakeClientPod, &ns) + + fakeClient := fake.NewClientBuilder().WithRuntimeObjects(k8sObjects...).Build() + + // Create test consul server + consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { + c.NodeName = nodeName + }) + require.NoError(t, err) + defer consul.Stop() + consul.WaitForServiceIntentions(t) + + cfg := &api.Config{ + Address: consul.HTTPAddr, + } + consulClient, err := api.NewClient(cfg) + require.NoError(t, err) + addr := strings.Split(consul.HTTPAddr, ":") + consulPort := addr[1] + + // Register service and proxy in consul. + for _, svc := range tt.initialConsulSvcs { + err = consulClient.Agent().ServiceRegister(svc) + require.NoError(t, err) + } + + // Create the endpoints controller + ep := &EndpointsController{ + Client: fakeClient, + Log: logrtest.TestLogger{T: t}, + ConsulClient: consulClient, + ConsulPort: consulPort, + ConsulScheme: "http", + AllowK8sNamespacesSet: mapset.NewSetWith("*"), + DenyK8sNamespacesSet: mapset.NewSetWith(), + ReleaseName: "consul", + ReleaseNamespace: "default", + ConsulClientCfg: cfg, + } + namespacedName := types.NamespacedName{ + Namespace: "default", + Name: "web", + } + namespacedName2 := types.NamespacedName{ + Namespace: "default", + Name: "web-admin", + } + + resp, err := ep.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: namespacedName, + }) + require.NoError(t, err) + require.False(t, resp.Requeue) + resp, err = ep.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: namespacedName2, + }) + require.NoError(t, err) + require.False(t, resp.Requeue) + + // After reconciliation, Consul should have the service with the correct number of instances + svcs := strings.Split(tt.consulSvcName, ",") + for _, service := range svcs { + serviceInstances, _, err := consulClient.Catalog().Service(service, "", nil) + require.NoError(t, err) + require.Len(t, serviceInstances, tt.expectedNumSvcInstances) + for i, instance := range serviceInstances { + require.Equal(t, tt.expectedConsulSvcInstancesMap[service][i].ServiceID, instance.ServiceID) + require.Equal(t, tt.expectedConsulSvcInstancesMap[service][i].ServiceName, instance.ServiceName) + require.Equal(t, tt.expectedConsulSvcInstancesMap[service][i].ServiceAddress, instance.ServiceAddress) + require.Equal(t, tt.expectedConsulSvcInstancesMap[service][i].ServicePort, instance.ServicePort) + require.Equal(t, tt.expectedConsulSvcInstancesMap[service][i].ServiceMeta, instance.ServiceMeta) + require.Equal(t, 
tt.expectedConsulSvcInstancesMap[service][i].ServiceTags, instance.ServiceTags) + } + proxyServiceInstances, _, err := consulClient.Catalog().Service(fmt.Sprintf("%s-sidecar-proxy", service), "", nil) + require.NoError(t, err) + require.Len(t, proxyServiceInstances, tt.expectedNumSvcInstances) + for i, instance := range proxyServiceInstances { + require.Equal(t, tt.expectedProxySvcInstancesMap[service][i].ServiceID, instance.ServiceID) + require.Equal(t, tt.expectedProxySvcInstancesMap[service][i].ServiceName, instance.ServiceName) + require.Equal(t, tt.expectedProxySvcInstancesMap[service][i].ServiceAddress, instance.ServiceAddress) + require.Equal(t, tt.expectedProxySvcInstancesMap[service][i].ServicePort, instance.ServicePort) + require.Equal(t, tt.expectedProxySvcInstancesMap[service][i].ServiceMeta, instance.ServiceMeta) + require.Equal(t, tt.expectedProxySvcInstancesMap[service][i].ServiceTags, instance.ServiceTags) + + // When comparing the ServiceProxy field we ignore the DestinationNamespace + // field within that struct because on Consul OSS it's set to "" but on Consul Enterprise + // it's set to "default" and we want to re-use this test for both OSS and Ent. + // This does mean that we don't test that field but that's okay because + // it's not getting set specifically in this test. + // To do the comparison that ignores that field we use go-cmp instead + // of the regular require.Equal call since it supports ignoring certain + // fields. + diff := cmp.Diff(tt.expectedProxySvcInstancesMap[service][i].ServiceProxy, instance.ServiceProxy, + cmpopts.IgnoreFields(api.Upstream{}, "DestinationNamespace", "DestinationPartition")) + require.Empty(t, diff, "expected objects to be equal") + } + _, checkInfos, err := consulClient.Agent().AgentHealthServiceByName(fmt.Sprintf("%s-sidecar-proxy", service)) + expectedChecks := []string{"Proxy Public Listener", "Destination Alias"} + require.NoError(t, err) + require.Len(t, checkInfos, tt.expectedNumSvcInstances) + for _, checkInfo := range checkInfos { + checks := checkInfo.Checks + require.Contains(t, expectedChecks, checks[0].Name) + require.Contains(t, expectedChecks, checks[1].Name) + } + } + + // Check that the Consul health check was created for the k8s pod. + if tt.expectedAgentHealthChecks != nil { + for i := range tt.expectedAgentHealthChecks { + filter := fmt.Sprintf("CheckID == `%s`", tt.expectedAgentHealthChecks[i].CheckID) + check, err := consulClient.Agent().ChecksWithFilter(filter) + require.NoError(t, err) + require.EqualValues(t, len(check), 1) + // Ignoring Namespace because the response from ENT includes it and OSS does not. + var ignoredFields = []string{"Node", "Definition", "Namespace", "Partition"} + require.True(t, cmp.Equal(check[tt.expectedAgentHealthChecks[i].CheckID], tt.expectedAgentHealthChecks[i], cmpopts.IgnoreFields(api.AgentCheck{}, ignoredFields...))) + } + } + }) + } +} + // TestReconcileCreateEndpoint tests the logic to create service instances in Consul from the addresses in the Endpoints // object. The cases test an empty endpoints object, a basic endpoints object with one address, a basic endpoints object // with two addresses, and an endpoints object with every possible customization. 
diff --git a/control-plane/connect-inject/envoy_sidecar.go b/control-plane/connect-inject/envoy_sidecar.go index b521297c54..02d3869556 100644 --- a/control-plane/connect-inject/envoy_sidecar.go +++ b/control-plane/connect-inject/envoy_sidecar.go @@ -10,19 +10,25 @@ import ( "k8s.io/apimachinery/pkg/api/resource" ) -func (h *Handler) envoySidecar(namespace corev1.Namespace, pod corev1.Pod) (corev1.Container, error) { +func (h *Handler) envoySidecar(namespace corev1.Namespace, pod corev1.Pod, mpi multiPortInfo) (corev1.Container, error) { resources, err := h.envoySidecarResources(pod) if err != nil { return corev1.Container{}, err } - cmd, err := h.getContainerSidecarCommand(pod) + multiPort := mpi.serviceName != "" + cmd, err := h.getContainerSidecarCommand(pod, mpi.serviceName, mpi.serviceIndex) if err != nil { return corev1.Container{}, err } + containerName := envoySidecarContainer + if multiPort { + containerName = fmt.Sprintf("%s-%s", envoySidecarContainer, mpi.serviceName) + } + container := corev1.Container{ - Name: envoySidecarContainer, + Name: containerName, Image: h.ImageEnvoy, Env: []corev1.EnvVar{ { @@ -62,7 +68,7 @@ func (h *Handler) envoySidecar(namespace corev1.Namespace, pod corev1.Pod) (core // has only injected init containers so all containers defined in pod.Spec.Containers are from the user. for _, c := range pod.Spec.Containers { // User container and Envoy container cannot have the same UID. - if c.SecurityContext != nil && c.SecurityContext.RunAsUser != nil && *c.SecurityContext.RunAsUser == envoyUserAndGroupID { + if c.SecurityContext != nil && c.SecurityContext.RunAsUser != nil && *c.SecurityContext.RunAsUser == envoyUserAndGroupID && c.Image != h.ImageEnvoy { return corev1.Container{}, fmt.Errorf("container %q has runAsUser set to the same uid %q as envoy which is not allowed", c.Name, envoyUserAndGroupID) } } @@ -76,10 +82,18 @@ func (h *Handler) envoySidecar(namespace corev1.Namespace, pod corev1.Pod) (core return container, nil } -func (h *Handler) getContainerSidecarCommand(pod corev1.Pod) ([]string, error) { +func (h *Handler) getContainerSidecarCommand(pod corev1.Pod, multiPortSvcName string, multiPortSvcIdx int) ([]string, error) { + bootstrapFile := "/consul/connect-inject/envoy-bootstrap.yaml" + if multiPortSvcName != "" { + bootstrapFile = fmt.Sprintf("/consul/connect-inject/envoy-bootstrap-%s.yaml", multiPortSvcName) + } cmd := []string{ "envoy", - "--config-path", "/consul/connect-inject/envoy-bootstrap.yaml", + "--config-path", bootstrapFile, + } + if multiPortSvcName != "" { + // --base-id is needed so multiple Envoy proxies can run on the same host. 
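+ // The index is the service's position in the service annotation: "web,web-admin" yields --base-id 0 and 1 respectively.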
+ cmd = append(cmd, "--base-id", fmt.Sprintf("%d", multiPortSvcIdx)) } extraArgs, annotationSet := pod.Annotations[annotationEnvoyExtraArgs] diff --git a/control-plane/connect-inject/envoy_sidecar_test.go b/control-plane/connect-inject/envoy_sidecar_test.go index 269581c4b8..56af91ab3e 100644 --- a/control-plane/connect-inject/envoy_sidecar_test.go +++ b/control-plane/connect-inject/envoy_sidecar_test.go @@ -28,7 +28,7 @@ func TestHandlerEnvoySidecar(t *testing.T) { }, }, } - container, err := h.envoySidecar(testNS, pod) + container, err := h.envoySidecar(testNS, pod, multiPortInfo{}) require.NoError(err) require.Equal(container.Command, []string{ "envoy", @@ -43,6 +43,55 @@ func TestHandlerEnvoySidecar(t *testing.T) { }) } +func TestHandlerEnvoySidecar_Multiport(t *testing.T) { + require := require.New(t) + h := Handler{} + pod := corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + annotationService: "web,web-admin", + }, + }, + + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "web", + }, + { + Name: "web-admin", + }, + }, + }, + } + multiPortInfos := []multiPortInfo{ + { + serviceIndex: 0, + serviceName: "web", + }, + { + serviceIndex: 1, + serviceName: "web-admin", + }, + } + expCommand := map[int][]string{ + 0: {"envoy", "--config-path", "/consul/connect-inject/envoy-bootstrap-web.yaml", "--base-id", "0"}, + 1: {"envoy", "--config-path", "/consul/connect-inject/envoy-bootstrap-web-admin.yaml", "--base-id", "1"}, + } + for i := 0; i < 2; i++ { + container, err := h.envoySidecar(testNS, pod, multiPortInfos[i]) + require.NoError(err) + require.Equal(expCommand[i], container.Command) + + require.Equal(container.VolumeMounts, []corev1.VolumeMount{ + { + Name: volumeName, + MountPath: "/consul/connect-inject", + }, + }) + } +} + func TestHandlerEnvoySidecar_withSecurityContext(t *testing.T) { cases := map[string]struct { tproxyEnabled bool @@ -106,7 +155,7 @@ func TestHandlerEnvoySidecar_withSecurityContext(t *testing.T) { }, }, } - ec, err := h.envoySidecar(testNS, pod) + ec, err := h.envoySidecar(testNS, pod, multiPortInfo{}) require.NoError(t, err) require.Equal(t, c.expSecurityContext, ec.SecurityContext) }) @@ -130,37 +179,88 @@ func TestHandlerEnvoySidecar_FailsWithDuplicatePodSecurityContextUID(t *testing. }, }, } - _, err := h.envoySidecar(testNS, pod) + _, err := h.envoySidecar(testNS, pod, multiPortInfo{}) require.Error(err, fmt.Sprintf("pod security context cannot have the same uid as envoy: %v", envoyUserAndGroupID)) } -// Test that if the user specifies a container with security context with the same uid as `envoyUserAndGroupID` -// that we return an error to the handler. +// Test that if the user specifies a container with security context with the same uid as `envoyUserAndGroupID` that we +// return an error to the handler. If a container using the envoy image has the same uid, we don't return an error +// because in multiport pod there can be multiple envoy sidecars. func TestHandlerEnvoySidecar_FailsWithDuplicateContainerSecurityContextUID(t *testing.T) { - require := require.New(t) - h := Handler{} - pod := corev1.Pod{ - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "web", - // Setting RunAsUser: 1 should succeed. 
- SecurityContext: &corev1.SecurityContext{ - RunAsUser: pointerToInt64(1), + cases := []struct { + name string + pod corev1.Pod + handler Handler + expErr bool + expErrMessage error + }{ + { + name: "fails with non envoy image", + pod: corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "web", + // Setting RunAsUser: 1 should succeed. + SecurityContext: &corev1.SecurityContext{ + RunAsUser: pointerToInt64(1), + }, + }, + { + Name: "app", + // Setting RunAsUser: 5995 should fail. + SecurityContext: &corev1.SecurityContext{ + RunAsUser: pointerToInt64(envoyUserAndGroupID), + }, + Image: "not-envoy", + }, }, }, - { - Name: "app", - // Setting RunAsUser: 5995 should fail. - SecurityContext: &corev1.SecurityContext{ - RunAsUser: pointerToInt64(envoyUserAndGroupID), + }, + handler: Handler{}, + expErr: true, + expErrMessage: fmt.Errorf("container app has runAsUser set to the same uid %q as envoy which is not allowed", envoyUserAndGroupID), + }, + { + name: "doesn't fail with envoy image", + pod: corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "web", + // Setting RunAsUser: 1 should succeed. + SecurityContext: &corev1.SecurityContext{ + RunAsUser: pointerToInt64(1), + }, + }, + { + Name: "sidecar", + // Setting RunAsUser: 5995 should succeed if the image matches h.ImageEnvoy. + SecurityContext: &corev1.SecurityContext{ + RunAsUser: pointerToInt64(envoyUserAndGroupID), + }, + Image: "envoy", + }, }, }, }, + handler: Handler{ + ImageEnvoy: "envoy", + }, + expErr: false, }, } - _, err := h.envoySidecar(testNS, pod) - require.Error(err, fmt.Sprintf("container %q has runAsUser set to the same uid %q as envoy which is not allowed", pod.Spec.Containers[1].Name, envoyUserAndGroupID)) + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + _, err := tc.handler.envoySidecar(testNS, tc.pod, multiPortInfo{}) + if tc.expErr { + require.Error(t, err, tc.expErrMessage) + } else { + require.NoError(t, err) + } + }) + } } // Test that we can pass extra args to envoy via the extraEnvoyArgs flag @@ -247,7 +347,7 @@ func TestHandlerEnvoySidecar_EnvoyExtraArgs(t *testing.T) { EnvoyExtraArgs: tc.envoyExtraArgs, } - c, err := h.envoySidecar(testNS, *tc.pod) + c, err := h.envoySidecar(testNS, *tc.pod, multiPortInfo{}) require.NoError(t, err) require.Equal(t, tc.expectedContainerCommand, c.Command) }) @@ -421,7 +521,7 @@ func TestHandlerEnvoySidecar_Resources(t *testing.T) { }, }, } - container, err := c.handler.envoySidecar(testNS, pod) + container, err := c.handler.envoySidecar(testNS, pod, multiPortInfo{}) if c.expErr != "" { require.NotNil(err) require.Contains(err.Error(), c.expErr) diff --git a/control-plane/connect-inject/handler.go b/control-plane/connect-inject/handler.go index 8e7eb48a63..6e454557c0 100644 --- a/control-plane/connect-inject/handler.go +++ b/control-plane/connect-inject/handler.go @@ -6,7 +6,9 @@ import ( "errors" "fmt" "net/http" + "path/filepath" "strconv" + "strings" mapset "github.com/deckarep/golang-set" "github.com/go-logr/logr" @@ -155,6 +157,10 @@ type Handler struct { decoder *admission.Decoder } +type multiPortInfo struct { + serviceIndex int + serviceName string +} // Handle is the admission.Handler implementation that actually handles the // webhook request for admission control. 
This should be registered or @@ -225,21 +231,91 @@ func (h *Handler) Handle(ctx context.Context, req admission.Request) admission.R return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error getting namespace metadata for container: %s", err)) } - // Add the init container that listens for the service and proxy service and sets up the Envoy configuration. - initContainer, err := h.containerInit(*ns, pod) - if err != nil { - h.Log.Error(err, "error configuring injection init container", "request name", req.Name) - return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error configuring injection init container: %s", err)) - } - pod.Spec.InitContainers = append(pod.Spec.InitContainers, initContainer) + // Get service names from the annotation. If there are 0 or 1 service names, it's a single port pod, otherwise it's multi + // port. + annotatedSvcNames := h.annotatedServiceNames(pod) + multiPort := len(annotatedSvcNames) > 1 - // Add the Envoy sidecar. - envoySidecar, err := h.envoySidecar(*ns, pod) - if err != nil { - h.Log.Error(err, "error configuring injection sidecar container", "request name", req.Name) - return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error configuring injection sidecar container: %s", err)) + // For single port pods, add the single init container and envoy sidecar. + if !multiPort { + // Add the init container that registers the service and sets up the Envoy configuration. + initContainer, err := h.containerInit(*ns, pod, multiPortInfo{}) + if err != nil { + h.Log.Error(err, "error configuring injection init container", "request name", req.Name) + return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error configuring injection init container: %s", err)) + } + pod.Spec.InitContainers = append(pod.Spec.InitContainers, initContainer) + + // Add the Envoy sidecar. + envoySidecar, err := h.envoySidecar(*ns, pod, multiPortInfo{}) + if err != nil { + h.Log.Error(err, "error configuring injection sidecar container", "request name", req.Name) + return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error configuring injection sidecar container: %s", err)) + } + pod.Spec.Containers = append(pod.Spec.Containers, envoySidecar) + } else { + // For multi port pods, check for unsupported cases, mount all relevant service account tokens, and add an init + // container and envoy sidecar per port. Tproxy, metrics, and metrics merging are not supported for multi port pods. + // In a single port pod, the service account specified in the pod is sufficient for mounting the service account + // token to the pod. In a multi port pod, where multiple services are registered with Consul, we also require a + // service account per service. So, this will look for service accounts whose name matches the service and mount + // those tokens if not already specified via the pod's serviceAccountName.
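+ // For example, a pod annotated with "web,web-admin" whose serviceAccountName is "web" also needs a "web-admin" service account; its token is mounted so the web-admin service can log in via the auth method.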
+ + h.Log.Info("processing multiport pod") + err := h.checkUnsupportedMultiPortCases(*ns, pod) + if err != nil { + h.Log.Error(err, "checking unsupported cases for multi port pods") + return admission.Errored(http.StatusInternalServerError, err) + } + for i, svc := range annotatedSvcNames { + h.Log.Info(fmt.Sprintf("service: %s", svc)) + if h.AuthMethod != "" { + if svc != "" && pod.Spec.ServiceAccountName != svc { + sa, err := h.Clientset.CoreV1().ServiceAccounts(req.Namespace).Get(ctx, svc, metav1.GetOptions{}) + if err != nil { + h.Log.Error(err, "couldn't get service accounts") + return admission.Errored(http.StatusInternalServerError, err) + } + if len(sa.Secrets) == 0 { + h.Log.Info(fmt.Sprintf("service account %s has zero secrets, expected at least one", svc)) + return admission.Errored(http.StatusInternalServerError, fmt.Errorf("service account %s has zero secrets, expected at least one", svc)) + } + saSecret := sa.Secrets[0].Name + h.Log.Info("found service account, mounting service account secret to Pod", "serviceAccountName", sa.Name) + pod.Spec.Volumes = append(pod.Spec.Volumes, corev1.Volume{ + Name: fmt.Sprintf("%s-service-account", svc), + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: saSecret, + }, + }, + }) + } + } + + // This will get passed to the init and sidecar containers so they are configured correctly. + mpi := multiPortInfo{ + serviceIndex: i, + serviceName: svc, + } + + // Add the init container that registers the service and sets up the Envoy configuration. + initContainer, err := h.containerInit(*ns, pod, mpi) + if err != nil { + h.Log.Error(err, "error configuring injection init container", "request name", req.Name) + return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error configuring injection init container: %s", err)) + } + pod.Spec.InitContainers = append(pod.Spec.InitContainers, initContainer) + + // Add the Envoy sidecar. + envoySidecar, err := h.envoySidecar(*ns, pod, mpi) + if err != nil { + h.Log.Error(err, "error configuring injection sidecar container", "request name", req.Name) + return admission.Errored(http.StatusInternalServerError, fmt.Errorf("error configuring injection sidecar container: %s", err)) + } + pod.Spec.Containers = append(pod.Spec.Containers, envoySidecar) + } } - pod.Spec.Containers = append(pod.Spec.Containers, envoySidecar) // Now that the consul-sidecar no longer needs to re-register services periodically // (that functionality lives in the endpoints-controller), @@ -460,6 +536,7 @@ func (h *Handler) validatePod(pod corev1.Pod) error { } func portValue(pod corev1.Pod, value string) (int32, error) { + value = strings.Split(value, ",")[0] // First search for the named port. for _, c := range pod.Spec.Containers { for _, p := range c.Ports { @@ -474,7 +551,23 @@ func portValue(pod corev1.Pod, value string) (int32, error) { return int32(raw), err } -func findServiceAccountVolumeMount(pod corev1.Pod) (corev1.VolumeMount, error) { +func findServiceAccountVolumeMount(pod corev1.Pod, multiPort bool, multiPortSvcName string) (corev1.VolumeMount, string, error) { + // In the case of a multiPort pod, there may be another service account + // token mounted as a different volume. Its name must be <service-name>-service-account. + // If not, we'll fall back to the service account for the pod.
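+ // For example, for service "web-admin" this looks for a volume named "web-admin-service-account" and mounts it at /consul/serviceaccount-web-admin.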
+ if multiPort { + for _, v := range pod.Spec.Volumes { + if v.Name == fmt.Sprintf("%s-service-account", multiPortSvcName) { + mountPath := fmt.Sprintf("/consul/serviceaccount-%s", multiPortSvcName) + return corev1.VolumeMount{ + Name: v.Name, + ReadOnly: true, + MountPath: mountPath, + }, filepath.Join(mountPath, "token"), nil + } + } + } + // Find the volume mount that is mounted at the known // service account token location var volumeMount corev1.VolumeMount @@ -489,10 +582,43 @@ func findServiceAccountVolumeMount(pod corev1.Pod) (corev1.VolumeMount, error) { // Return an error if volumeMount is still empty if (corev1.VolumeMount{}) == volumeMount { - return volumeMount, errors.New("unable to find service account token volumeMount") + return volumeMount, "", errors.New("unable to find service account token volumeMount") + } + + return volumeMount, "/var/run/secrets/kubernetes.io/serviceaccount/token", nil +} + +func (h *Handler) annotatedServiceNames(pod corev1.Pod) []string { + var annotatedSvcNames []string + if anno, ok := pod.Annotations[annotationService]; ok { + annotatedSvcNames = strings.Split(anno, ",") } + return annotatedSvcNames +} - return volumeMount, nil +func (h *Handler) checkUnsupportedMultiPortCases(ns corev1.Namespace, pod corev1.Pod) error { + tproxyEnabled, err := transparentProxyEnabled(ns, pod, h.EnableTransparentProxy) + if err != nil { + return fmt.Errorf("couldn't check if tproxy is enabled: %s", err) + } + metricsEnabled, err := h.MetricsConfig.enableMetrics(pod) + if err != nil { + return fmt.Errorf("couldn't check if metrics is enabled: %s", err) + } + metricsMergingEnabled, err := h.MetricsConfig.enableMetricsMerging(pod) + if err != nil { + return fmt.Errorf("couldn't check if metrics merging is enabled: %s", err) + } + if tproxyEnabled { + return fmt.Errorf("multi port services are not compatible with transparent proxy") + } + if metricsEnabled { + return fmt.Errorf("multi port services are not compatible with metrics") + } + if metricsMergingEnabled { + return fmt.Errorf("multi port services are not compatible with metrics merging") + } + return nil } func (h *Handler) InjectDecoder(d *admission.Decoder) error { diff --git a/control-plane/connect-inject/handler_test.go b/control-plane/connect-inject/handler_test.go index 8f34ecea71..cd77de66f4 100644 --- a/control-plane/connect-inject/handler_test.go +++ b/control-plane/connect-inject/handler_test.go @@ -525,6 +525,60 @@ func TestHandlerHandle(t *testing.T) { }, }, }, + { + "multi port pod", + Handler{ + Log: logrtest.TestLogger{T: t}, + AllowK8sNamespacesSet: mapset.NewSetWith("*"), + DenyK8sNamespacesSet: mapset.NewSet(), + decoder: decoder, + Clientset: defaultTestClientWithNamespace(), + }, + admission.Request{ + AdmissionRequest: admissionv1.AdmissionRequest{ + Namespace: namespaces.DefaultNamespace, + Object: encodeRaw(t, &corev1.Pod{ + Spec: basicSpec, + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + annotationService: "web, web-admin", + }, + }, + }), + }, + }, + "", + []jsonpatch.Operation{ + { + Operation: "add", + Path: "/spec/volumes", + }, + { + Operation: "add", + Path: "/spec/initContainers", + }, + { + Operation: "add", + Path: "/spec/containers/1", + }, + { + Operation: "add", + Path: "/spec/containers/2", + }, + { + Operation: "add", + Path: "/metadata/annotations/" + escapeJSONPointer(keyInjectStatus), + }, + { + Operation: "add", + Path: "/metadata/annotations/" + escapeJSONPointer(annotationOriginalPod), + }, + { + Operation: "add", + Path: "/metadata/labels", + 
}, + }, + }, } for _, tt := range cases { @@ -1612,6 +1666,42 @@ func TestOverwriteProbes(t *testing.T) { } } +func TestHandler_checkUnsupportedMultiPortCases(t *testing.T) { + cases := []struct { + name string + annotations map[string]string + expErr string + }{ + { + name: "tproxy", + annotations: map[string]string{keyTransparentProxy: "true"}, + expErr: "multi port services are not compatible with transparent proxy", + }, + { + name: "metrics", + annotations: map[string]string{annotationEnableMetrics: "true"}, + expErr: "multi port services are not compatible with metrics", + }, + { + name: "metrics merging", + annotations: map[string]string{annotationEnableMetricsMerging: "true"}, + expErr: "multi port services are not compatible with metrics merging", + }, + } + for _, tt := range cases { + t.Run(tt.name, func(t *testing.T) { + h := Handler{} + pod := minimal() + pod.Annotations = tt.annotations + err := h.checkUnsupportedMultiPortCases(corev1.Namespace{}, *pod) + require.Error(t, err) + require.Equal(t, tt.expErr, err.Error()) + }) + + } + +} + // encodeRaw is a helper to encode some data into a RawExtension. func encodeRaw(t *testing.T, input interface{}) runtime.RawExtension { data, err := json.Marshal(input) diff --git a/control-plane/subcommand/connect-init/command.go b/control-plane/subcommand/connect-init/command.go index 3d30710107..01a23e9c2c 100644 --- a/control-plane/subcommand/connect-init/command.go +++ b/control-plane/subcommand/connect-init/command.go @@ -44,9 +44,10 @@ type Command struct { flagLogLevel string flagLogJSON bool - bearerTokenFile string // Location of the bearer token. Default is /var/run/secrets/kubernetes.io/serviceaccount/token. - tokenSinkFile string // Location to write the output token. Default is defaultTokenSinkFile. - proxyIDFile string // Location to write the output proxyID. Default is defaultProxyIDFile. + flagBearerTokenFile string // Location of the bearer token. Default is /var/run/secrets/kubernetes.io/serviceaccount/token. + flagACLTokenSink string // Location to write the output token. Default is defaultTokenSinkFile. + flagProxyIDFile string // Location to write the output proxyID. Default is defaultProxyIDFile. + flagMultiPort bool serviceRegistrationPollingAttempts uint64 // Number of times to poll for this service to be registered. flagSet *flag.FlagSet @@ -66,21 +67,16 @@ func (c *Command) init() { c.flagSet.StringVar(&c.flagConsulServiceNamespace, "consul-service-namespace", "", "Consul destination namespace of the service.") c.flagSet.StringVar(&c.flagServiceAccountName, "service-account-name", "", "Service account name on the pod.") c.flagSet.StringVar(&c.flagServiceName, "service-name", "", "Service name as specified via the pod annotation.") + c.flagSet.StringVar(&c.flagBearerTokenFile, "bearer-token-file", defaultBearerTokenFile, "Path to service account token file.") + c.flagSet.StringVar(&c.flagACLTokenSink, "acl-token-sink", defaultTokenSinkFile, "File name where the ACL token should be saved.") + c.flagSet.StringVar(&c.flagProxyIDFile, "proxy-id-file", defaultProxyIDFile, "File name where the proxy's Consul service ID should be saved.") + c.flagSet.BoolVar(&c.flagMultiPort, "multiport", false, "If the pod is a multi port pod.") c.flagSet.StringVar(&c.flagLogLevel, "log-level", "info", "Log verbosity level. 
Supported values (in order of detail) are \"trace\", "+ "\"debug\", \"info\", \"warn\", and \"error\".") c.flagSet.BoolVar(&c.flagLogJSON, "log-json", false, "Enable or disable JSON output format for logging.") - if c.bearerTokenFile == "" { - c.bearerTokenFile = defaultBearerTokenFile - } - if c.tokenSinkFile == "" { - c.tokenSinkFile = defaultTokenSinkFile - } - if c.proxyIDFile == "" { - c.proxyIDFile = defaultProxyIDFile - } if c.serviceRegistrationPollingAttempts == 0 { c.serviceRegistrationPollingAttempts = defaultServicePollingRetries } @@ -134,7 +130,7 @@ func (c *Command) Run(args []string) int { // loginMeta is the default metadata that we pass to the consul login API. loginMeta := map[string]string{"pod": fmt.Sprintf("%s/%s", c.flagPodNamespace, c.flagPodName)} err = backoff.Retry(func() error { - err := common.ConsulLogin(consulClient, c.bearerTokenFile, c.flagACLAuthMethod, c.tokenSinkFile, c.flagAuthMethodNamespace, loginMeta) + err := common.ConsulLogin(consulClient, c.flagBearerTokenFile, c.flagACLAuthMethod, c.flagACLTokenSink, c.flagAuthMethodNamespace, loginMeta) if err != nil { c.logger.Error("Consul login failed; retrying", "error", err) } @@ -151,7 +147,7 @@ func (c *Command) Run(args []string) int { return 1 } // Now update the client so that it will read the ACL token we just fetched. - cfg.TokenFile = c.tokenSinkFile + cfg.TokenFile = c.flagACLTokenSink consulClient, err = consul.NewClient(cfg) if err != nil { c.logger.Error("Unable to update client connection", "error", err) @@ -210,7 +206,13 @@ func (c *Command) Run(args []string) int { var errServiceNameMismatch error err = backoff.Retry(func() error { registrationRetryCount++ - filter := fmt.Sprintf("Meta[%q] == %q and Meta[%q] == %q", connectinject.MetaKeyPodName, c.flagPodName, connectinject.MetaKeyKubeNS, c.flagPodNamespace) + filter := fmt.Sprintf("Meta[%q] == %q and Meta[%q] == %q ", + connectinject.MetaKeyPodName, c.flagPodName, connectinject.MetaKeyKubeNS, c.flagPodNamespace) + if c.flagMultiPort && c.flagServiceName != "" { + // If the service name is set and this is a multi-port pod there may be multiple services registered for + // this one Pod. If so, we want to ensure the service and proxy matching our expected name is registered. + filter += fmt.Sprintf(` and (Service == %q or Service == "%s-sidecar-proxy")`, c.flagServiceName, c.flagServiceName) + } serviceList, err := consulClient.Agent().ServicesWithFilter(filter) if err != nil { c.logger.Error("Unable to get Agent services", "error", err) @@ -231,7 +233,7 @@ func (c *Command) Run(args []string) int { " `consul.hashicorp.com/service-ignore: \"true\"` to all services except the one used by Consul for handling requests.") } - return fmt.Errorf("did not find correct number of services: %d", len(serviceList)) + return fmt.Errorf("did not find correct number of services, found: %d, services: %+v", len(serviceList), serviceList) } for _, svc := range serviceList { c.logger.Info("Registered service has been detected", "service", svc.Service) @@ -271,7 +273,7 @@ func (c *Command) Run(args []string) int { return 1 } // Write the proxy ID to the shared volume so `consul connect envoy` can use it for bootstrapping. 
-	err = common.WriteFileWithPerms(c.proxyIDFile, proxyID, os.FileMode(0444))
+	err = common.WriteFileWithPerms(c.flagProxyIDFile, proxyID, os.FileMode(0444))
 	if err != nil {
 		c.logger.Error("Unable to write proxy ID to file", "error", err)
 		return 1
diff --git a/control-plane/subcommand/connect-init/command_ent_test.go b/control-plane/subcommand/connect-init/command_ent_test.go
index f043542d83..891ee2ed32 100644
--- a/control-plane/subcommand/connect-init/command_ent_test.go
+++ b/control-plane/subcommand/connect-init/command_ent_test.go
@@ -184,9 +184,6 @@ func TestRun_ServicePollingWithACLsAndTLSWithNamespaces(t *testing.T) {
 			ui := cli.NewMockUi()
 			cmd := Command{
 				UI:                                 ui,
-				bearerTokenFile:                    bearerFile,
-				tokenSinkFile:                      tokenFile,
-				proxyIDFile:                        proxyFile,
 				serviceRegistrationPollingAttempts: 5,
 			}
 			// We build the http-addr because normally it's defined by the init container setting
@@ -196,6 +193,9 @@
 				"-service-account-name", testServiceAccountName,
 				"-http-addr", fmt.Sprintf("%s://%s", cfg.Scheme, cfg.Address),
 				"-consul-service-namespace", c.consulServiceNamespace,
+				"-acl-token-sink", tokenFile,
+				"-bearer-token-file", bearerFile,
+				"-proxy-id-file", proxyFile,
 			}
 			if c.acls {
 				flags = append(flags, "-acl-auth-method", test.AuthMethod, "-auth-method-namespace", c.authMethodNamespace)
diff --git a/control-plane/subcommand/connect-init/command_test.go b/control-plane/subcommand/connect-init/command_test.go
index 40136b9fd8..7965a5ea30 100644
--- a/control-plane/subcommand/connect-init/command_test.go
+++ b/control-plane/subcommand/connect-init/command_test.go
@@ -8,6 +8,7 @@ import (
 	"net/http/httptest"
 	"net/url"
 	"os"
+	"strconv"
 	"testing"
 	"time"
@@ -68,6 +69,7 @@ func TestRun_ServicePollingWithACLsAndTLS(t *testing.T) {
 		includeServiceAccountName  bool
 		serviceAccountNameMismatch bool
 		expFail                    bool
+		multiport                  bool
 	}{
 		{
 			name: "ACLs enabled, no tls",
@@ -91,6 +93,13 @@
 			serviceAccountName: "web",
 			serviceName:        "web",
 		},
+		{
+			name:               "ACLs enabled, multiport service",
+			tls:                false,
+			serviceAccountName: "counting-admin",
+			serviceName:        "counting-admin",
+			multiport:          true,
+		},
 		{
 			name: "ACLs enabled, service name annotation doesn't match service account name",
 			tls:  false,
@@ -152,6 +161,9 @@
 			// Register Consul services.
 			testConsulServices := []api.AgentServiceRegistration{consulCountingSvc, consulCountingSvcSidecar}
+			if tt.multiport {
+				testConsulServices = append(testConsulServices, consulCountingSvcMultiport, consulCountingSvcSidecarMultiport)
+			}
 			for _, svc := range testConsulServices {
 				require.NoError(t, consulClient.Agent().ServiceRegister(&svc))
 			}
@@ -159,9 +171,6 @@
 			ui := cli.NewMockUi()
 			cmd := Command{
 				UI:                                 ui,
-				bearerTokenFile:                    bearerFile,
-				tokenSinkFile:                      tokenFile,
-				proxyIDFile:                        proxyFile,
 				serviceRegistrationPollingAttempts: 3,
 			}
@@ -173,6 +182,10 @@
 				"-service-account-name", tt.serviceAccountName,
 				"-service-name", tt.serviceName,
 				"-http-addr", fmt.Sprintf("%s://%s", cfg.Scheme, cfg.Address),
+				"-bearer-token-file", bearerFile,
+				"-acl-token-sink", tokenFile,
+				"-proxy-id-file", proxyFile,
+				"-multiport=" + strconv.FormatBool(tt.multiport),
 			}
 			// Add the CA File if necessary since we're not setting CONSUL_CACERT in tt ENV.
 			if tt.tls {
@@ -201,7 +214,11 @@
 			// Validate contents of proxyFile.
 			data, err := ioutil.ReadFile(proxyFile)
 			require.NoError(t, err)
-			require.Contains(t, string(data), "counting-counting-sidecar-proxy")
+			if tt.multiport {
+				require.Contains(t, string(data), "counting-admin-sidecar-proxy-id")
+			} else {
+				require.Contains(t, string(data), "counting-counting-sidecar-proxy")
+			}
 		})
 	}
 }
@@ -210,8 +227,10 @@
 func TestRun_ServicePollingOnly(t *testing.T) {
 	t.Parallel()
 	cases := []struct {
-		name string
-		tls  bool
+		name        string
+		tls         bool
+		serviceName string
+		multiport   bool
 	}{
 		{
 			name: "ACLs disabled, no tls",
@@ -221,6 +240,12 @@
 			name: "ACLs disabled, tls",
 			tls:  true,
 		},
+		{
+			name:        "Multiport, ACLs disabled, no tls",
+			tls:         false,
+			serviceName: "counting-admin",
+			multiport:   true,
+		},
 	}
 	for _, tt := range cases {
 		t.Run(tt.name, func(t *testing.T) {
@@ -260,6 +285,9 @@
 			// Register Consul services.
 			testConsulServices := []api.AgentServiceRegistration{consulCountingSvc, consulCountingSvcSidecar}
+			if tt.multiport {
+				testConsulServices = append(testConsulServices, consulCountingSvcMultiport, consulCountingSvcSidecarMultiport)
+			}
 			for _, svc := range testConsulServices {
 				require.NoError(t, consulClient.Agent().ServiceRegister(&svc))
 			}
@@ -267,7 +295,6 @@
 			ui := cli.NewMockUi()
 			cmd := Command{
 				UI:                                 ui,
-				proxyIDFile:                        proxyFile,
 				serviceRegistrationPollingAttempts: 3,
 			}
 			// We build the http-addr because normally it's defined by the init container setting
@@ -275,7 +302,15 @@
 			flags := []string{
 				"-pod-name", testPodName, "-pod-namespace", testPodNamespace,
+				"-proxy-id-file", proxyFile,
+				"-multiport=" + strconv.FormatBool(tt.multiport),
 				"-http-addr", fmt.Sprintf("%s://%s", cfg.Scheme, cfg.Address)}
+
+			// In a multiport case, the service name will be passed in to the test.
+			if tt.serviceName != "" {
+				flags = append(flags, "-service-name", tt.serviceName)
+			}
+
 			// Add the CA File if necessary since we're not setting CONSUL_CACERT in tt ENV.
 			if tt.tls {
 				flags = append(flags, "-ca-file", caFile)
@@ -288,7 +323,11 @@
 			// Validate contents of proxyFile.
 			data, err := ioutil.ReadFile(proxyFile)
 			require.NoError(t, err)
-			require.Contains(t, string(data), "counting-counting-sidecar-proxy")
+			if tt.multiport {
+				require.Contains(t, string(data), "counting-admin-sidecar-proxy-id")
+			} else {
+				require.Contains(t, string(data), "counting-counting-sidecar-proxy")
+			}
 		})
 	}
@@ -460,13 +499,13 @@ func TestRun_ServicePollingErrors(t *testing.T) {
 			ui := cli.NewMockUi()
 			cmd := Command{
 				UI:                                 ui,
-				proxyIDFile:                        proxyFile,
 				serviceRegistrationPollingAttempts: 1,
 			}
 			flags := []string{
 				"-http-addr", server.HTTPAddr,
 				"-pod-name", testPodName,
 				"-pod-namespace", testPodNamespace,
+				"-proxy-id-file", proxyFile,
 			}
 			code := cmd.Run(flags)
@@ -504,13 +543,13 @@ func TestRun_RetryServicePolling(t *testing.T) {
 	ui := cli.NewMockUi()
 	cmd := Command{
 		UI:                                 ui,
-		proxyIDFile:                        proxyFile,
 		serviceRegistrationPollingAttempts: 10,
 	}
 	flags := []string{
 		"-pod-name", testPodName,
 		"-pod-namespace", testPodNamespace,
 		"-http-addr", server.HTTPAddr,
+		"-proxy-id-file", proxyFile,
 	}
 	code := cmd.Run(flags)
 	require.Equal(t, 0, code)
@@ -544,13 +583,13 @@ func TestRun_InvalidProxyFile(t *testing.T) {
 	ui := cli.NewMockUi()
 	cmd := Command{
 		UI:                                 ui,
-		proxyIDFile:                        randFileName,
 		serviceRegistrationPollingAttempts: 3,
 	}
 	flags := []string{
 		"-pod-name", testPodName,
 		"-pod-namespace", testPodNamespace,
 		"-http-addr", server.HTTPAddr,
+		"-proxy-id-file", randFileName,
 	}
 	code := cmd.Run(flags)
 	require.Equal(t, 1, code)
@@ -608,8 +647,8 @@ func TestRun_FailsWithBadServerResponses(t *testing.T) {
 	ui := cli.NewMockUi()
 	cmd := Command{
 		UI:                                 ui,
-		bearerTokenFile:                    bearerFile,
-		tokenSinkFile:                      tokenFile,
+		flagBearerTokenFile:                bearerFile,
+		flagACLTokenSink:                   tokenFile,
 		serviceRegistrationPollingAttempts: uint64(servicesGetRetries),
 	}
@@ -619,6 +658,8 @@
 		"-pod-name", testPodName, "-pod-namespace", testPodNamespace,
 		"-acl-auth-method", test.AuthMethod, "-service-account-name", testServiceAccountName,
+		"-bearer-token-file", bearerFile,
+		"-acl-token-sink", tokenFile,
 		"-http-addr", serverURL.String()}
 	code := cmd.Run(flags)
 	require.Equal(t, 1, code)
@@ -683,16 +724,16 @@ func TestRun_LoginWithRetries(t *testing.T) {
 			ui := cli.NewMockUi()
 			cmd := Command{
-				UI:              ui,
-				tokenSinkFile:   tokenFile,
-				bearerTokenFile: bearerFile,
-				proxyIDFile:     proxyFile,
+				UI: ui,
 			}
 			code := cmd.Run([]string{
 				"-pod-name", testPodName, "-pod-namespace", testPodNamespace,
 				"-acl-auth-method", test.AuthMethod, "-service-account-name", testServiceAccountName,
+				"-acl-token-sink", tokenFile,
+				"-bearer-token-file", bearerFile,
+				"-proxy-id-file", proxyFile,
 				"-http-addr", serverURL.String()})
 			fmt.Println(ui.ErrorWriter.String())
 			require.Equal(t, c.ExpCode, code)
@@ -762,16 +803,16 @@ func TestRun_EnsureTokenExists(t *testing.T) {
 			ui := cli.NewMockUi()
 			cmd := Command{
-				UI:              ui,
-				tokenSinkFile:   tokenFile,
-				bearerTokenFile: bearerFile,
-				proxyIDFile:     proxyFile,
+				UI: ui,
 			}
 			code := cmd.Run([]string{
 				"-pod-name", testPodName, "-pod-namespace", testPodNamespace,
 				"-acl-auth-method", test.AuthMethod, "-service-account-name", testServiceAccountName,
+				"-acl-token-sink", tokenFile,
+				"-bearer-token-file", bearerFile,
+				"-proxy-id-file", proxyFile,
 				"-http-addr", serverURL.String()})
 			if c.neverSucceed {
 				require.Equal(t, 1, code)
@@ -937,4 +978,32 @@ var (
 			metaKeyKubeServiceName: "counting",
 		},
 	}
+	consulCountingSvcMultiport = api.AgentServiceRegistration{
+		ID:      "counting-admin-id",
+		Name:    "counting-admin",
+		Address: "127.0.0.1",
+		Meta: map[string]string{
+			metaKeyPodName:         "counting-pod",
+			metaKeyKubeNS:          "default-ns",
+			metaKeyKubeServiceName: "counting-admin",
+		},
+	}
+	consulCountingSvcSidecarMultiport = api.AgentServiceRegistration{
+		ID:   "counting-admin-sidecar-proxy-id",
+		Name: "counting-admin-sidecar-proxy",
+		Kind: "connect-proxy",
+		Proxy: &api.AgentServiceConnectProxyConfig{
+			DestinationServiceName: "counting-admin",
+			DestinationServiceID:   "counting-admin-id",
+			Config:                 nil,
+			Upstreams:              nil,
+		},
+		Port:    9999,
+		Address: "127.0.0.1",
+		Meta: map[string]string{
+			metaKeyPodName:         "counting-pod",
+			metaKeyKubeNS:          "default-ns",
+			metaKeyKubeServiceName: "counting-admin",
+		},
+	}
 )