Skip to content

Commit

Permalink
Webhook refactor (#454)
Browse files Browse the repository at this point in the history
* Refactor handler webhook to be of type admission.Webhook
- Replace pointer references with values refernces in methods used
  by connect-inject for Pods.

* Add watcher for agent pods to endpoints controller
- This watcher watches for Consul Agent pods to be in a running phase
  and the condition ready to be true and then reconcile all endpoints
that have a ready/not-ready address that share a node name with that of
the consul agent pod.
  • Loading branch information
Ashwin Venkatesh authored Mar 24, 2021
1 parent 281552c commit a6c110c
Show file tree
Hide file tree
Showing 19 changed files with 1,379 additions and 1,101 deletions.
2 changes: 1 addition & 1 deletion connect-inject/consul_sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
corev1 "k8s.io/api/core/v1"
)

func (h *Handler) consulSidecar(pod *corev1.Pod) (corev1.Container, error) {
func (h *Handler) consulSidecar(pod corev1.Pod) (corev1.Container, error) {
command := []string{
"consul-k8s",
"consul-sidecar",
Expand Down
10 changes: 5 additions & 5 deletions connect-inject/consul_sidecar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestConsulSidecar_Default(t *testing.T) {
ImageConsulK8S: "hashicorp/consul-k8s:9.9.9",
ConsulSidecarResources: consulSidecarResources,
}
container, err := handler.consulSidecar(&corev1.Pod{
container, err := handler.consulSidecar(corev1.Pod{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Expand Down Expand Up @@ -84,7 +84,7 @@ func TestConsulSidecar_AuthMethod(t *testing.T) {
AuthMethod: authMethod,
ImageConsulK8S: "hashicorp/consul-k8s:9.9.9",
}
container, err := handler.consulSidecar(&corev1.Pod{
container, err := handler.consulSidecar(corev1.Pod{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Expand Down Expand Up @@ -113,7 +113,7 @@ func TestConsulSidecar_SyncPeriodAnnotation(t *testing.T) {
Log: hclog.Default().Named("handler"),
ImageConsulK8S: "hashicorp/consul-k8s:9.9.9",
}
container, err := handler.consulSidecar(&corev1.Pod{
container, err := handler.consulSidecar(corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
"consul.hashicorp.com/connect-sync-period": "55s",
Expand Down Expand Up @@ -141,7 +141,7 @@ func TestConsulSidecar_TLS(t *testing.T) {
ConsulCACert: "consul-ca-cert",
ConsulSidecarResources: consulSidecarResources,
}
container, err := handler.consulSidecar(&corev1.Pod{
container, err := handler.consulSidecar(corev1.Pod{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestConsulSidecar_MetricsFlags(t *testing.T) {
DefaultEnableMetrics: true,
DefaultEnableMetricsMerging: true,
}
container, err := handler.consulSidecar(&corev1.Pod{
container, err := handler.consulSidecar(corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
annotationMergedMetricsPort: "20100",
Expand Down
2 changes: 1 addition & 1 deletion connect-inject/container_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
corev1 "k8s.io/api/core/v1"
)

func (h *Handler) containerEnvVars(pod *corev1.Pod) []corev1.EnvVar {
func (h *Handler) containerEnvVars(pod corev1.Pod) []corev1.EnvVar {
raw, ok := pod.Annotations[annotationUpstreams]
if !ok || raw == "" {
return []corev1.EnvVar{}
Expand Down
2 changes: 1 addition & 1 deletion connect-inject/container_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestContainerEnvVars(t *testing.T) {
require := require.New(t)

var h Handler
envVars := h.containerEnvVars(&corev1.Pod{
envVars := h.containerEnvVars(corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
annotationService: "foo",
Expand Down
2 changes: 1 addition & 1 deletion connect-inject/container_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (h *Handler) containerInitCopyContainer() corev1.Container {

// containerInit returns the init container spec for registering the Consul
// service, setting up the Envoy bootstrap, etc.
func (h *Handler) containerInit(pod *corev1.Pod, k8sNamespace string) (corev1.Container, error) {
func (h *Handler) containerInit(pod corev1.Pod, k8sNamespace string) (corev1.Container, error) {
data := initContainerCommandData{
ServiceName: pod.Annotations[annotationService],
ProxyServiceName: fmt.Sprintf("%s-sidecar-proxy", pod.Annotations[annotationService]),
Expand Down
16 changes: 8 additions & 8 deletions connect-inject/container_init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ services {
h := Handler{
ConsulClient: consulClient,
}
container, err := h.containerInit(tt.Pod(minimal()), k8sNamespace)
container, err := h.containerInit(*tt.Pod(minimal()), k8sNamespace)
require.NoError(err)
actual := strings.Join(container.Command, " ")
require.Contains(actual, tt.Cmd)
Expand Down Expand Up @@ -1103,7 +1103,7 @@ EOF
require.True(written)
h.ConsulClient = consulClient

container, err := h.containerInit(tt.Pod(minimal()), k8sNamespace)
container, err := h.containerInit(*tt.Pod(minimal()), k8sNamespace)
require.NoError(err)
actual := strings.Join(container.Command, " ")
require.Contains(actual, tt.Cmd)
Expand Down Expand Up @@ -1142,7 +1142,7 @@ func TestHandlerContainerInit_authMethod(t *testing.T) {
ServiceAccountName: "foo",
},
}
container, err := h.containerInit(pod, k8sNamespace)
container, err := h.containerInit(*pod, k8sNamespace)
require.NoError(err)
actual := strings.Join(container.Command, " ")
require.Contains(actual, `
Expand Down Expand Up @@ -1183,7 +1183,7 @@ func TestHandlerContainerInit_WithTLS(t *testing.T) {
},
},
}
container, err := h.containerInit(pod, k8sNamespace)
container, err := h.containerInit(*pod, k8sNamespace)
require.NoError(err)
actual := strings.Join(container.Command, " ")
require.Contains(actual, `
Expand Down Expand Up @@ -1227,7 +1227,7 @@ func TestHandlerContainerInit_Resources(t *testing.T) {
},
},
}
container, err := h.containerInit(pod, k8sNamespace)
container, err := h.containerInit(*pod, k8sNamespace)
require.NoError(err)
require.Equal(corev1.ResourceRequirements{
Limits: corev1.ResourceList{
Expand Down Expand Up @@ -1262,7 +1262,7 @@ func TestHandlerContainerInit_MismatchedServiceNameServiceAccountNameWithACLsEna
},
}

_, err := h.containerInit(pod, k8sNamespace)
_, err := h.containerInit(*pod, k8sNamespace)
require.EqualError(err, `serviceAccountName "notServiceName" does not match service name "foo"`)
}

Expand All @@ -1285,7 +1285,7 @@ func TestHandlerContainerInit_MismatchedServiceNameServiceAccountNameWithACLsDis
},
}

_, err := h.containerInit(pod, k8sNamespace)
_, err := h.containerInit(*pod, k8sNamespace)
require.NoError(err)
}

Expand Down Expand Up @@ -1410,7 +1410,7 @@ func TestHandlerContainerInit_MeshGatewayModeErrors(t *testing.T) {
},
},
}
_, err = h.containerInit(pod, k8sNamespace)
_, err = h.containerInit(*pod, k8sNamespace)
if c.ExpError == "" {
require.NoError(err)
} else {
Expand Down
106 changes: 100 additions & 6 deletions connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,23 @@ import (
"fmt"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

mapset "github.com/deckarep/golang-set"
"github.com/go-logr/logr"
"github.com/hashicorp/consul-k8s/consul"
"github.com/hashicorp/consul/api"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

type EndpointsController struct {
Expand Down Expand Up @@ -141,7 +145,7 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
// and register that port for the host service.
var servicePort int
if raw, ok := pod.Annotations[annotationPort]; ok && raw != "" {
if port, err := portValue(&pod, raw); port > 0 {
if port, err := portValue(pod, raw); port > 0 {
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -206,7 +210,7 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service
proxyConfig.LocalServicePort = servicePort
}

upstreams, err := r.processUpstreams(&pod)
upstreams, err := r.processUpstreams(pod)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -311,7 +315,7 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(k8sSvcName, k8sSvcNam

// processUpstreams reads the list of upstreams from the Pod annotation and converts them into a list of api.Upstream
// objects.
func (r *EndpointsController) processUpstreams(pod *corev1.Pod) ([]api.Upstream, error) {
func (r *EndpointsController) processUpstreams(pod corev1.Pod) ([]api.Upstream, error) {
var upstreams []api.Upstream
if raw, ok := pod.Annotations[annotationUpstreams]; ok && raw != "" {
for _, raw := range strings.Split(raw, ",") {
Expand Down Expand Up @@ -382,7 +386,11 @@ func (r *EndpointsController) Logger(name types.NamespacedName) logr.Logger {
func (r *EndpointsController) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Endpoints{}).
Complete(r)
Watches(
&source.Kind{Type: &corev1.Pod{}},
&handler.EnqueueRequestsFromMapFunc{ToRequests: handler.ToRequestsFunc(r.requestsForRunningAgentPods)},
builder.WithPredicates(predicate.NewPredicateFuncs(r.filterAgentPods)),
).Complete(r)
}

// getConsulClient returns an *api.Client that points at the consul agent local to the pod.
Expand Down Expand Up @@ -416,6 +424,92 @@ func shouldIgnore(namespace string, denySet, allowSet mapset.Set) bool {
return false
}

// filterAgentPods receives meta and object information for Kubernetes resources that are being watched,
// which in this case are Pods. It only returns true if the Pod is a Consul Client Agent Pod. It reads the labels
// from the meta of the resource and uses the values of the "app" and "component" label to validate that
// the Pod is a Consul Client Agent.
func (r EndpointsController) filterAgentPods(meta metav1.Object, object runtime.Object) bool {
podLabels := meta.GetLabels()
app, ok := podLabels["app"]
if !ok {
return false
}
component, ok := podLabels["component"]
if !ok {
return false
}

release, ok := podLabels["release"]
if !ok {
return false
}

if app == "consul" && component == "client" && release == r.ReleaseName {
return true
}
return false
}

// requestsForRunningAgentPods creates a slice of requests for the endpoints controller.
// It enqueues a request for each endpoint that needs to be reconciled. It iterates through
// the list of endpoints and creates a request for those endpoints that have an address that
// are on the same node as the new Consul Agent pod. It receives a Pod Object which is a
// Consul Agent that has been filtered by filterAgentPods and only enqueues endpoints
// for client agent pods where the Ready condition is true.
func (r EndpointsController) requestsForRunningAgentPods(object handler.MapObject) []ctrl.Request {
var consulClientPod corev1.Pod
r.Log.Info("received update for consulClientPod", "podName", object.Meta.GetName())
err := r.Client.Get(r.Ctx, types.NamespacedName{Name: object.Meta.GetName(), Namespace: object.Meta.GetNamespace()}, &consulClientPod)
if k8serrors.IsNotFound(err) {
// Ignore if consulClientPod is not found.
return []ctrl.Request{}
}
if err != nil {
r.Log.Error(err, "failed to get consulClientPod", "consulClientPod", consulClientPod.Name)
return []ctrl.Request{}
}
// We can ignore the agent pod if it's not running, since
// we can't reconcile and register/deregister services against that agent.
if consulClientPod.Status.Phase != corev1.PodRunning {
r.Log.Info("ignoring consulClientPod because it's not running", "consulClientPod", consulClientPod.Name)
return []ctrl.Request{}
}
// We can ignore the agent pod if it's not yet ready, since
// we can't reconcile and register/deregister services against that agent.
for _, cond := range consulClientPod.Status.Conditions {
if cond.Type == corev1.PodReady && cond.Status != corev1.ConditionTrue {
// Ignore if consulClientPod is not ready.
r.Log.Info("ignoring consulClientPod because it's not ready", "consulClientPod", consulClientPod.Name)
return []ctrl.Request{}
}
}

// Get the list of all endpoints.
var endpointsList corev1.EndpointsList
err = r.Client.List(r.Ctx, &endpointsList)
if err != nil {
r.Log.Error(err, "failed to list endpoints")
return []ctrl.Request{}
}

// Enqueue requests for endpoints that are on the same node
// as the client agent.
var requests []reconcile.Request
for _, ep := range endpointsList.Items {
for _, subset := range ep.Subsets {
allAddresses := subset.Addresses
allAddresses = append(allAddresses, subset.NotReadyAddresses...)
for _, address := range allAddresses {
// Only add requests for the address that is on the same node as the consul client pod.
if address.NodeName != nil && *address.NodeName == consulClientPod.Spec.NodeName {
requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{Name: ep.Name, Namespace: ep.Namespace}})
}
}
}
}
return requests
}

// hasBeenInjected checks the value of the status annotation and returns true if the Pod has been injected.
func hasBeenInjected(pod corev1.Pod) bool {
if anno, ok := pod.Annotations[annotationStatus]; ok {
Expand Down
Loading

0 comments on commit a6c110c

Please sign in to comment.