Skip to content

Commit

Permalink
Cleanup and address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ndhanushkodi committed Mar 22, 2021
1 parent a88535e commit 752eca4
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 33 deletions.
39 changes: 13 additions & 26 deletions connect-inject/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,18 @@ func (r *EndpointsController) Reconcile(req ctrl.Request) (ctrl.Result, error) {
// error), we need to deregister all instances in Consul for that service.
if k8serrors.IsNotFound(err) {
// Deregister all instances in Consul for this service. The function deregisterServiceOnAllAgents handles
// the case where the Consul service name is different from the K8s service name.
err := r.deregisterServiceOnAllAgents(req.Name, req.Namespace, false, nil)
if err != nil {
// the case where the Consul service name is different from the Kubernetes service name.
if err := r.deregisterServiceOnAllAgents(req.Name, req.Namespace, false, nil); err != nil {
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
} else if err != nil {
r.Log.Error(err, "failed to get endpoints from Kubernetes", "namespace", req.Namespace, "name", req.Name)
r.Log.Error(err, "failed to get Endpoints from Kubernetes", "name", req.Name, "namespace", req.Namespace)
return ctrl.Result{}, err
}

r.Log.Info("retrieved service from kube", "serviceEndpoints", serviceEndpoints)
r.Log.Info("retrieved Kubernetes Endpoints", "serviceEndpoints", serviceEndpoints.Name, "serviceEndpoints namespace", serviceEndpoints.Namespace)

// endpointAddressMap stores every IP that corresponds to a Pod in the Endpoints object. It is used to compare
// against service instances in Consul to deregister them if they are not in the map.
Expand All @@ -92,7 +92,7 @@ func (r *EndpointsController) Reconcile(req ctrl.Request) (ctrl.Result, error) {
return ctrl.Result{}, err
}

if hasBeenInjected(&pod) {
if hasBeenInjected(pod) {
// Create client for Consul agent local to the pod.
client, err := r.getConsulClient(pod.Status.HostIP)
if err != nil {
Expand Down Expand Up @@ -131,8 +131,7 @@ func (r *EndpointsController) Reconcile(req ctrl.Request) (ctrl.Result, error) {
// Compare service instances in Consul with addresses in Endpoints. If an address is not in Endpoints, deregister
// from Consul. This uses endpointAddressMap which is populated with the addresses in the Endpoints object during
// the registration codepath.
err = r.deregisterServiceOnAllAgents(serviceEndpoints.Name, serviceEndpoints.Namespace, true, endpointAddressMap)
if err != nil {
if err = r.deregisterServiceOnAllAgents(serviceEndpoints.Name, serviceEndpoints.Namespace, true, endpointAddressMap); err != nil {
r.Log.Error(err, "failed to deregister service instances on all agents", "k8s service name", serviceEndpoints.Name, "k8s namespace", serviceEndpoints.Namespace)
return ctrl.Result{}, err
}
Expand All @@ -154,11 +153,10 @@ func (r *EndpointsController) createServiceRegistrations(pod corev1.Pod, service

// TODO: remove logic in handler to always set the service name annotation
// We only want that annotation to be present when explicitly overriding the consul svc name
// Otherwise, the Consul service name should equal the K8s Service name.
// Otherwise, the Consul service name should equal the Kubernetes Service name.
// The service name in Consul defaults to the Endpoints object name, and is overridden by the pod
// annotation annotationService.
var serviceName string
serviceName = serviceEndpoints.Name
serviceName := serviceEndpoints.Name
if raw, ok := pod.Annotations[annotationService]; ok && raw != "" {
serviceName = raw
}
Expand Down Expand Up @@ -295,7 +293,8 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(k8sSvcName, k8sSvcNam
// Deregister each service instance that matches the metadata.
for svcID, serviceRegistration := range svcs {
r.Log.Info("deregistering service", "service id", svcID)
// Todo comment
// If we selectively deregister, only deregister if the address is not in the map. Otherwise, deregister
// every service instance.
if selectivelyDeregister {
if _, ok := endpointsAddressesMap[serviceRegistration.Address]; !ok {
// If the service address is not in the Endpoints addresses, deregister it.
Expand All @@ -310,13 +309,7 @@ func (r *EndpointsController) deregisterServiceOnAllAgents(k8sSvcName, k8sSvcNam
return err
}
}
//err = client.Agent().ServiceDeregister(svcID)
//if err != nil {
// r.Log.Error(err, "failed to deregister service instance", "ID", svcID)
// return err
//}
}

}
return nil
}
Expand Down Expand Up @@ -399,17 +392,11 @@ func (r *EndpointsController) SetupWithManager(mgr ctrl.Manager) error {

// getConsulClient returns an *api.Client that points at the consul agent local to the pod.
func (r *EndpointsController) getConsulClient(ip string) (*api.Client, error) {
// todo: un-hardcode the scheme and port
newAddr := fmt.Sprintf("%s://%s:%s", r.ConsulScheme, ip, r.ConsulPort)
localConfig := api.DefaultConfig()
localConfig.Address = newAddr

localClient, err := consul.NewClient(localConfig)
if err != nil {
return nil, err
}

return localClient, err
return consul.NewClient(localConfig)
}

// shouldIgnore ignores namespaces where we don't connect-inject.
Expand All @@ -435,7 +422,7 @@ func shouldIgnore(namespace string, denySet, allowSet mapset.Set) bool {
}

// hasBeenInjected checks the value of the status annotation and returns true if the Pod has been injected.
func hasBeenInjected(pod *corev1.Pod) bool {
func hasBeenInjected(pod corev1.Pod) bool {
if anno, ok := pod.Annotations[annotationStatus]; ok {
if anno == injected {
return true
Expand Down
10 changes: 5 additions & 5 deletions connect-inject/endpoints_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,22 @@ func TestHasBeenInjected(t *testing.T) {
t.Parallel()
cases := []struct {
name string
pod func() *corev1.Pod
pod func() corev1.Pod
expected bool
}{
{
name: "Pod with annotation",
pod: func() *corev1.Pod {
pod: func() corev1.Pod {
pod1 := createPod("pod1", "1.2.3.4", true)
return pod1
return *pod1
},
expected: true,
},
{
name: "Pod without injected annotation",
pod: func() *corev1.Pod {
pod: func() corev1.Pod {
pod1 := createPod("pod1", "1.2.3.4", false)
return pod1
return *pod1
},
expected: false,
},
Expand Down
3 changes: 1 addition & 2 deletions subcommand/inject-connect/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,6 @@ func (c *Command) Run(args []string) int {
ctrlExitCh := make(chan error)

// Create a manager for endpoints controller and the mutating webhook.
// Note: the webhook refactor PR will use this manager for the mutating webhook.
zapLogger := zap.New(zap.UseDevMode(true), zap.Level(zapcore.InfoLevel))
ctrl.SetLogger(zapLogger)
klog.SetLogger(zapLogger)
Expand All @@ -472,7 +471,7 @@ func (c *Command) Run(args []string) int {
setupLog.Error(err, "unable to start manager")
return 1
}
// Start the endpoints controller
// Start the endpoints controller.
if err = (&connectinject.EndpointsController{
Client: mgr.GetClient(),
ConsulClient: c.consulClient,
Expand Down

0 comments on commit 752eca4

Please sign in to comment.