Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

NMI retries and ticker for periodic sync reconcile #272

Merged
merged 8 commits into from
Jul 11, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions cmd/mic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"flag"
"os"
"time"

"github.com/Azure/aad-pod-identity/pkg/mic"
"github.com/Azure/aad-pod-identity/version"
Expand All @@ -12,10 +13,11 @@ import (
)

var (
kubeconfig string
cloudconfig string
forceNamespaced bool
versionInfo bool
kubeconfig string
cloudconfig string
forceNamespaced bool
versionInfo bool
syncRetryInterval string
)

func main() {
Expand All @@ -24,6 +26,8 @@ func main() {
flag.StringVar(&cloudconfig, "cloudconfig", "", "Path to cloud config e.g. Azure.json file")
flag.BoolVar(&forceNamespaced, "forceNamespaced", false, "Forces namespaced identities, binding, and assignment")
flag.BoolVar(&versionInfo, "version", false, "Prints the version information")
flag.StringVar(&syncRetryInterval, "syncRetryInterval", "3600s", "The interval in seconds at which sync loop should periodically check for errors and reconcile.")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change to duration var


flag.Parse()
if versionInfo {
version.PrintVersionAndExit()
Expand All @@ -43,7 +47,13 @@ func main() {
}

forceNamespaced = forceNamespaced || "true" == os.Getenv("FORCENAMESPACED")
micClient, err := mic.NewMICClient(cloudconfig, config, forceNamespaced)

syncRetryDuration, err := time.ParseDuration(syncRetryInterval)
if err != nil {
glog.Fatalf("Could not read syncRetryInterval. Error %+v", err)
}

micClient, err := mic.NewMICClient(cloudconfig, config, forceNamespaced, syncRetryDuration*time.Second)
if err != nil {
glog.Fatalf("Could not get the MIC client: %+v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/crd/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (c *Client) ListPodIds(podns, podname string) (*[]aadpodid.AzureIdentity, e

var matchedIds []aadpodid.AzureIdentity
for _, v := range azAssignedIDList.(*aadpodid.AzureAssignedIdentityList).Items {
if v.Spec.Pod == podname && v.Spec.PodNamespace == podns {
if v.Spec.Pod == podname && v.Spec.PodNamespace == podns && v.Status.Status == "Assigned" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make these states a constant ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was my initial plan, but the only confusion is IdentityCreated is an int (EventType) in the v1 pkg.

matchedIds = append(matchedIds, *v.Spec.AzureIdentityRef)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please add a comment in the next PR indicating the backward compatibility aspects.

}
}
Expand Down
40 changes: 24 additions & 16 deletions pkg/mic/mic.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ type NodeGetter interface {
// Client has the required pointers to talk to the api server
// and interact with the CRD related datastructure.
type Client struct {
CRDClient crd.ClientInt
CloudClient cloudprovider.ClientInt
PodClient pod.ClientInt
EventRecorder record.EventRecorder
EventChannel chan aadpodid.EventType
NodeClient NodeGetter
IsNamespaced bool
CRDClient crd.ClientInt
CloudClient cloudprovider.ClientInt
PodClient pod.ClientInt
EventRecorder record.EventRecorder
EventChannel chan aadpodid.EventType
NodeClient NodeGetter
IsNamespaced bool
syncRetryInterval time.Duration

syncing int32 // protect against concurrent syncs
}
Expand All @@ -68,7 +69,7 @@ type trackUserAssignedMSIIds struct {
}

// NewMICClient returns a new MIC client
func NewMICClient(cloudconfig string, config *rest.Config, isNamespaced bool) (*Client, error) {
func NewMICClient(cloudconfig string, config *rest.Config, isNamespaced bool, syncRetryInterval time.Duration) (*Client, error) {
glog.Infof("Starting to create the pod identity client. Version: %v. Build date: %v", version.MICVersion, version.BuildDate)

clientSet := kubernetes.NewForConfigOrDie(config)
Expand Down Expand Up @@ -96,13 +97,14 @@ func NewMICClient(cloudconfig string, config *rest.Config, isNamespaced bool) (*
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: aadpodid.CRDGroup})

return &Client{
CRDClient: crdClient,
CloudClient: cloudClient,
PodClient: podClient,
EventRecorder: recorder,
EventChannel: eventCh,
NodeClient: &NodeClient{informer.Core().V1().Nodes()},
IsNamespaced: isNamespaced,
CRDClient: crdClient,
CloudClient: cloudClient,
PodClient: podClient,
EventRecorder: recorder,
EventChannel: eventCh,
NodeClient: &NodeClient{informer.Core().V1().Nodes()},
IsNamespaced: isNamespaced,
syncRetryInterval: syncRetryInterval,
}, nil
}

Expand Down Expand Up @@ -152,20 +154,26 @@ func (c *Client) Sync(exit <-chan struct{}) {
}
defer c.setStopped()

ticker := time.NewTicker(c.syncRetryInterval)
defer ticker.Stop()

glog.Info("Sync thread started.")
var event aadpodid.EventType
for {
select {
case <-exit:
return
case event = <-c.EventChannel:
glog.V(6).Infof("Received event: %v", event)
case <-ticker.C:
glog.V(6).Infof("Running sync retry loop")
}

stats.Init()
// This is the only place where the AzureAssignedIdentity creation is initiated.
begin := time.Now()
workDone := false
glog.V(6).Infof("Received event: %v", event)

// List all pods in all namespaces
systemTime := time.Now()
listPods, err := c.PodClient.GetPods()
Expand Down
88 changes: 82 additions & 6 deletions pkg/mic/mic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,12 +556,13 @@ func (c *TestEventRecorder) AnnotatedEventf(object runtime.Object, annotations m
func NewMICTestClient(eventCh chan aadpodid.EventType, cpClient *TestCloudClient, crdClient *TestCrdClient, podClient *TestPodClient, nodeClient *TestNodeClient, eventRecorder *TestEventRecorder) *TestMICClient {

realMICClient := &Client{
CloudClient: cpClient,
CRDClient: crdClient,
EventRecorder: eventRecorder,
PodClient: podClient,
EventChannel: eventCh,
NodeClient: nodeClient,
CloudClient: cpClient,
CRDClient: crdClient,
EventRecorder: eventRecorder,
PodClient: podClient,
EventChannel: eventCh,
NodeClient: nodeClient,
syncRetryInterval: 120 * time.Second,
}

return &TestMICClient{
Expand Down Expand Up @@ -1079,6 +1080,81 @@ func TestMICStateFlow(t *testing.T) {
}
}

// TestSyncRetryLoop verifies that the periodic sync ticker reconciles
// AzureAssignedIdentities that could not be cleaned up earlier because of a
// transient cloud error: after the error clears, the ticker-driven retry
// cycle must delete the orphaned assigned identity.
func TestSyncRetryLoop(t *testing.T) {
	eventCh := make(chan aadpodid.EventType, 100)
	cloudClient := NewTestCloudClient(config.AzureConfig{})
	crdClient := NewTestCrdClient(nil)
	podClient := NewTestPodClient()
	nodeClient := NewTestNodeClient()
	var evtRecorder TestEventRecorder
	evtRecorder.lastEvent = new(LastEvent)
	evtRecorder.eventChannel = make(chan bool, 100)

	micClient := NewMICTestClient(eventCh, cloudClient, crdClient, podClient, nodeClient, &evtRecorder)
	// Short interval so the ticker-driven reconcile fires within the test's lifetime.
	micClient.syncRetryInterval = 10 * time.Second

	// Add a pod, identity and binding.
	crdClient.CreateID("test-id1", aadpodid.UserAssignedMSI, "test-user-msi-resourceid", "test-user-msi-clientid", nil, "", "", "")
	crdClient.CreateBinding("testbinding1", "test-id1", "test-select1")

	nodeClient.AddNode("test-node1")
	podClient.AddPod("test-pod1", "default", "test-node1", "test-select1")

	eventCh <- aadpodid.PodCreated
	defer micClient.testRunSync()(t)

	if !evtRecorder.WaitForEvents(1) {
		t.Fatalf("Timeout waiting for mic sync cycles")
	}
	listAssignedIDs, err := crdClient.ListAssignedIDs()
	if err != nil {
		glog.Error(err)
		t.Errorf("list assigned failed")
	}
	if len(*listAssignedIDs) != 1 {
		t.Fatalf("expected assigned identities len: %d, got: %d", 1, len(*listAssignedIDs))
	}
	// Fix: the failure message previously printed IdentityCreated while the
	// condition checks IdentityAssigned — report the constant actually expected.
	if (*listAssignedIDs)[0].Status.Status != IdentityAssigned {
		t.Fatalf("expected status to be %s, got: %s", IdentityAssigned, (*listAssignedIDs)[0].Status.Status)
	}

	// Delete the pod and simulate a failure in cloud calls when trying to
	// un-assign the identity from the node; the assigned identity must survive.
	podClient.DeletePod("test-pod1", "default")
	cloudClient.SetError(errors.New("error removing identity from node"))
	cloudClient.testVMClient.identity = &compute.VirtualMachineIdentity{IdentityIds: &[]string{"test-user-msi-resourceid"}}

	eventCh <- aadpodid.PodDeleted
	if !evtRecorder.WaitForEvents(1) {
		t.Fatalf("Timeout waiting for mic sync cycles")
	}

	listAssignedIDs, err = crdClient.ListAssignedIDs()
	if err != nil {
		glog.Error(err)
		t.Errorf("list assigned failed")
	}
	if len(*listAssignedIDs) != 1 {
		t.Fatalf("expected assigned identities len: %d, got: %d", 1, len(*listAssignedIDs))
	}
	if (*listAssignedIDs)[0].Status.Status != IdentityAssigned {
		t.Fatalf("expected status to be %s, got: %s", IdentityAssigned, (*listAssignedIDs)[0].Status.Status)
	}
	cloudClient.UnSetError()

	// With the cloud error cleared, the next ticker-driven retry cycle should
	// reconcile and remove the orphaned assigned identity.
	if !evtRecorder.WaitForEvents(1) {
		t.Fatalf("Timeout waiting for mic sync retry cycle")
	}

	listAssignedIDs, err = crdClient.ListAssignedIDs()
	if err != nil {
		glog.Error(err)
		t.Errorf("list assigned failed")
	}
	if len(*listAssignedIDs) != 0 {
		t.Fatalf("expected assigned identities len: %d, got: %d", 0, len(*listAssignedIDs))
	}
}

func TestSyncExit(t *testing.T) {
eventCh := make(chan aadpodid.EventType)
cloudClient := NewTestCloudClient(config.AzureConfig{VMType: "vmss"})
Expand Down
58 changes: 49 additions & 9 deletions pkg/nmi/server/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package server

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -27,6 +28,8 @@ import (
const (
iptableUpdateTimeIntervalInSeconds = 60
localhost = "127.0.0.1"
listPodIDsRetryAttempts = 7
listPodIDsRetryIntervalInSeconds = 5
)

// Server encapsulates all of the parameters necessary for starting up
Expand Down Expand Up @@ -153,6 +156,8 @@ func (fn appHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

func (s *Server) hostHandler(logger *log.Entry, w http.ResponseWriter, r *http.Request) {
hostIP := parseRemoteAddr(r.RemoteAddr)
rqClientID, rqResource := parseRequestClientIDAndResource(r)

if hostIP != localhost {
msg := "request remote address is not from a host"
logger.Error(msg)
Expand All @@ -167,15 +172,15 @@ func (s *Server) hostHandler(logger *log.Entry, w http.ResponseWriter, r *http.R
return
}

podIDs, err := s.KubeClient.ListPodIds(podns, podname)
if err != nil || len(*podIDs) == 0 {
podIDs, err := listPodIDsWithRetry(r.Context(), s.KubeClient, logger, podns, podname, rqClientID, listPodIDsRetryAttempts)
if err != nil {
msg := fmt.Sprintf("no AzureAssignedIdentity found for pod:%s/%s", podns, podname)
logger.Errorf("%s, %+v", msg, err)
http.Error(w, msg, http.StatusForbidden)
http.Error(w, msg, http.StatusNotFound)
return
}

// filter out if we are in namesoaced mode
// filter out if we are in namespaced mode
filterPodIdentities := []aadpodid.AzureIdentity{}
for _, val := range *(podIDs) {
if s.IsNamespaced || aadpodid.IsNamespacedIdentity(&val) {
Expand All @@ -193,7 +198,6 @@ func (s *Server) hostHandler(logger *log.Entry, w http.ResponseWriter, r *http.R
}
}
podIDs = &filterPodIdentities
rqClientID, rqResource := parseRequestClientIDAndResource(r)
token, clientID, err := getTokenForMatchingID(s.KubeClient, logger, rqClientID, rqResource, podIDs)
if err != nil {
logger.Errorf("failed to get service principal token for pod:%s/%s, %+v", podns, podname, err)
Expand All @@ -220,6 +224,8 @@ func (s *Server) hostHandler(logger *log.Entry, w http.ResponseWriter, r *http.R
// configured id.
func (s *Server) msiHandler(logger *log.Entry, w http.ResponseWriter, r *http.Request) {
podIP := parseRemoteAddr(r.RemoteAddr)
rqClientID, rqResource := parseRequestClientIDAndResource(r)

if podIP == "" {
msg := "request remote address is empty"
logger.Error(msg)
Expand All @@ -232,14 +238,15 @@ func (s *Server) msiHandler(logger *log.Entry, w http.ResponseWriter, r *http.Re
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
podIDs, err := s.KubeClient.ListPodIds(podns, podname)
if err != nil || len(*podIDs) == 0 {

podIDs, err := listPodIDsWithRetry(r.Context(), s.KubeClient, logger, podns, podname, rqClientID, listPodIDsRetryAttempts)
if err != nil {
msg := fmt.Sprintf("no AzureAssignedIdentity found for pod:%s/%s", podns, podname)
logger.Errorf("%s, %+v", msg, err)
http.Error(w, msg, http.StatusForbidden)
http.Error(w, msg, http.StatusNotFound)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hostHandler and msiHandler look identical through most parts.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please unify the new code which you are adding into one single function.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add this as a TODO and refactor in the next PR. Have a few questions on why we check filter identities using force namespaced only in 1 handler and not another. We can go over that during the review.

return
}
rqClientID, rqResource := parseRequestClientIDAndResource(r)

token, _, err := getTokenForMatchingID(s.KubeClient, logger, rqClientID, rqResource, podIDs)
if err != nil {
logger.Errorf("failed to get service principal token for pod:%s/%s, %+v", podns, podname, err)
Expand Down Expand Up @@ -380,3 +387,36 @@ func handleTermination() {
log.Infof("Exiting with %v", exitCode)
os.Exit(exitCode)
}

func listPodIDsWithRetry(ctx context.Context, kubeClient k8s.Client, logger *log.Entry, podns, podname, rqClientID string, maxAttempts int) (*[]aadpodid.AzureIdentity, error) {
attempt := 0
var err error
var podIDs *[]aadpodid.AzureIdentity

for attempt < maxAttempts {
podIDs, err = kubeClient.ListPodIds(podns, podname)
if err == nil && len(*podIDs) != 0 {
if len(rqClientID) == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are two kinds of error, one is that after a few seconds, we still did not find the AzureAssignedIdentity and another is that it never moved to Assigned. Not able to gather that different from retry attempts and varying time durations for them..?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The retry attempts are not different based on the states. The final desired state is Assigned. So I think this encompasses all possible scenarios.

return podIDs, nil
}
// if client id exists in request, we need to ensure the identity with this client
// exists and is in Assigned state
for _, podID := range *podIDs {
if strings.EqualFold(rqClientID, podID.Spec.ClientID) {
return podIDs, nil
}
}
}

attempt++

select {
case <-time.After(listPodIDsRetryIntervalInSeconds * time.Second):
case <-ctx.Done():
err = ctx.Err()
return nil, err
}
logger.Warningf("failed to get assigned ids for pod:%s/%s, retrying attempt: %d", podns, podname, attempt)
}
return nil, fmt.Errorf("getting assigned identities for pod %s/%s failed after %d attempts. Error: %v", podns, podname, maxAttempts, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this error propagate to the caller in the application ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this will be propagated if it fails after retries.

}
4 changes: 2 additions & 2 deletions test/e2e/aadpodidentity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ var _ = Describe("Kubernetes cluster using aad-pod-identity", func() {
fmt.Println("\nTearing down the test environment...")

// Ensure a clean cluster after the end of each test
cmd := exec.Command("kubectl", "delete", "AzureIdentity,AzureIdentityBinding,AzureAssignedIdentity", "--all")
cmd := exec.Command("kubectl", "delete", "AzureIdentity,AzureIdentityBinding", "--all")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing azureassignedidentity from the delete command. It should be deleted by MIC once the identity and binding are removed and it's a good test to ensure we reliably delete the assigned identities.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if that is the exception we should add the check at the end to see if no AzureAssignedIdentities exists and fail the test if it does.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already ensure the length of azureassignedidentities is equal to 0 at the end of AfterEach.

util.PrintCommand(cmd)
_, err := cmd.CombinedOutput()
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -627,7 +627,7 @@ func setUpIdentityAndDeployment(azureIdentityName, suffix, replicas string) {
Expect(err).NotTo(HaveOccurred())
Expect(ok).To(Equal(true))

time.Sleep(30 * time.Second)
//time.Sleep(30 * time.Second)
}

// validateAzureAssignedIdentity will make sure a given AzureAssignedIdentity has the correct properties
Expand Down