Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

NMI retries and ticker for periodic sync reconcile #272

Merged
merged 8 commits into from
Jul 11, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ e2e:

.PHONY: unit-test
unit-test:
go test $(shell go list ./... | grep -v /test/e2e) -v
go test -count=1 $(shell go list ./... | grep -v /test/e2e) -v
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of `-count=1` here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's to ensure the test runs at least once instead of showing the cached result.


.PHONY: validate-version
validate-version: validate-version-NMI validate-version-MIC validate-version-IDENTITY_VALIDATOR validate-version-DEMO
Expand Down
20 changes: 15 additions & 5 deletions cmd/mic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"flag"
"os"
"time"

"github.com/Azure/aad-pod-identity/pkg/mic"
"github.com/Azure/aad-pod-identity/version"
Expand All @@ -12,10 +13,11 @@ import (
)

var (
kubeconfig string
cloudconfig string
forceNamespaced bool
versionInfo bool
kubeconfig string
cloudconfig string
forceNamespaced bool
versionInfo bool
syncRetryInterval string
)

func main() {
Expand All @@ -24,6 +26,8 @@ func main() {
flag.StringVar(&cloudconfig, "cloudconfig", "", "Path to cloud config e.g. Azure.json file")
flag.BoolVar(&forceNamespaced, "forceNamespaced", false, "Forces namespaced identities, binding, and assignment")
flag.BoolVar(&versionInfo, "version", false, "Prints the version information")
flag.StringVar(&syncRetryInterval, "syncRetryInterval", "3600s", "The interval in seconds at which sync loop should periodically check for errors and reconcile.")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change to duration var


flag.Parse()
if versionInfo {
version.PrintVersionAndExit()
Expand All @@ -43,7 +47,13 @@ func main() {
}

forceNamespaced = forceNamespaced || "true" == os.Getenv("FORCENAMESPACED")
micClient, err := mic.NewMICClient(cloudconfig, config, forceNamespaced)

syncRetryDuration, err := time.ParseDuration(syncRetryInterval)
if err != nil {
glog.Fatalf("Could not read syncRetryInterval. Error %+v", err)
}

micClient, err := mic.NewMICClient(cloudconfig, config, forceNamespaced, syncRetryDuration)
if err != nil {
glog.Fatalf("Could not get the MIC client: %+v", err)
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/apis/aadpodidentity/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,14 @@ const (
CRDLabelKey = "aadpodidbinding"

BehaviorKey = "aadpodidentity.k8s.io/Behavior"
// Namespaced Behavior
// BehaviorNamespaced ...
BehaviorNamespaced = "namespaced"
// AssignedIDCreated status indicates azure assigned identity is created
AssignedIDCreated = "Created"
// AssignedIDAssigned status indicates identity has been assigned to the node
AssignedIDAssigned = "Assigned"
// AssignedIDUnAssigned status indicates identity has been unassigned from the node
AssignedIDUnAssigned = "Unassigned"
)

/*** Global data structures ***/
Expand Down
14 changes: 7 additions & 7 deletions pkg/crd/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type ClientInt interface {
ListBindings() (res *[]aadpodid.AzureIdentityBinding, err error)
ListAssignedIDs() (res *[]aadpodid.AzureAssignedIdentity, err error)
ListIds() (res *[]aadpodid.AzureIdentity, err error)
ListPodIds(podns, podname string) (*[]aadpodid.AzureIdentity, error)
ListPodIds(podns, podname string) (map[string][]aadpodid.AzureIdentity, error)
}

func NewCRDClientLite(config *rest.Config) (crdClient *Client, err error) {
Expand Down Expand Up @@ -257,22 +257,22 @@ func (c *Client) ListIds() (res *[]aadpodid.AzureIdentity, err error) {
return &ret.(*aadpodid.AzureIdentityList).Items, nil
}

//ListPodIds - given a pod with pod name space
func (c *Client) ListPodIds(podns, podname string) (*[]aadpodid.AzureIdentity, error) {
// ListPodIds - given a pod with pod name space
// returns a map with list of azure identities in each state
func (c *Client) ListPodIds(podns, podname string) (map[string][]aadpodid.AzureIdentity, error) {
azAssignedIDList, err := c.AssignedIDListWatch.List(v1.ListOptions{})
if err != nil {
glog.Error(err)
return nil, err
}

var matchedIds []aadpodid.AzureIdentity
idStateMap := make(map[string][]aadpodid.AzureIdentity)
for _, v := range azAssignedIDList.(*aadpodid.AzureAssignedIdentityList).Items {
if v.Spec.Pod == podname && v.Spec.PodNamespace == podns {
matchedIds = append(matchedIds, *v.Spec.AzureIdentityRef)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please add a comment in the next PR describing the backward-compatibility aspects?

idStateMap[v.Status.Status] = append(idStateMap[v.Status.Status], *v.Spec.AzureIdentityRef)
}
}

return &matchedIds, nil
return idStateMap, nil
}

type patchStatusOps struct {
Expand Down
4 changes: 2 additions & 2 deletions pkg/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Client interface {
// GetPodName returns the namespace and name of the pod for the given pod IP, or an error
GetPodName(podip string) (podns, podname string, err error)
// ListPodIds pod matching azure identity or nil
ListPodIds(podns, podname string) (*[]aadpodid.AzureIdentity, error)
ListPodIds(podns, podname string) (map[string][]aadpodid.AzureIdentity, error)
// GetSecret returns secret the secretRef represents
GetSecret(secretRef *v1.SecretReference) (*v1.Secret, error)
}
Expand Down Expand Up @@ -168,7 +168,7 @@ func GetLocalIP() (string, error) {
}

// ListPodIds lists matching ids for pod or error
func (c *KubeClient) ListPodIds(podns, podname string) (*[]aadpodid.AzureIdentity, error) {
func (c *KubeClient) ListPodIds(podns, podname string) (map[string][]aadpodid.AzureIdentity, error) {
return c.CrdClient.ListPodIds(podns, podname)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/k8s/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package k8s

import (
aadpodid "github.com/Azure/aad-pod-identity/pkg/apis/aadpodidentity/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
)

// FakeClient implements Interface
Expand All @@ -23,7 +23,7 @@ func (c *FakeClient) GetPodName(podip string) (podns, podname string, err error)
}

// ListPodIds for pod
func (c *FakeClient) ListPodIds(podns string, podname string) (*[]aadpodid.AzureIdentity, error) {
func (c *FakeClient) ListPodIds(podns, podname string) (map[string][]aadpodid.AzureIdentity, error) {
return nil, nil
}

Expand Down
66 changes: 34 additions & 32 deletions pkg/mic/mic.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,6 @@ import (
const (
stopped = int32(0)
running = int32(1)
// IdentityCreated status indicates azure assigned identity is created
IdentityCreated = "Created"
// IdentityAssigned status indicates identity has been assigned to the node
IdentityAssigned = "Assigned"
// IdentityUnassigned status indicates identity has been unassigned from the node
IdentityUnassigned = "Unassigned"
)

// NodeGetter ...
Expand All @@ -43,13 +37,14 @@ type NodeGetter interface {
// Client has the required pointers to talk to the api server
// and interact with the CRD related datastructure.
type Client struct {
CRDClient crd.ClientInt
CloudClient cloudprovider.ClientInt
PodClient pod.ClientInt
EventRecorder record.EventRecorder
EventChannel chan aadpodid.EventType
NodeClient NodeGetter
IsNamespaced bool
CRDClient crd.ClientInt
CloudClient cloudprovider.ClientInt
PodClient pod.ClientInt
EventRecorder record.EventRecorder
EventChannel chan aadpodid.EventType
NodeClient NodeGetter
IsNamespaced bool
syncRetryInterval time.Duration

syncing int32 // protect against concurrent syncs
}
Expand All @@ -68,7 +63,7 @@ type trackUserAssignedMSIIds struct {
}

// NewMICClient returns a new MIC client
func NewMICClient(cloudconfig string, config *rest.Config, isNamespaced bool) (*Client, error) {
func NewMICClient(cloudconfig string, config *rest.Config, isNamespaced bool, syncRetryInterval time.Duration) (*Client, error) {
glog.Infof("Starting to create the pod identity client. Version: %v. Build date: %v", version.MICVersion, version.BuildDate)

clientSet := kubernetes.NewForConfigOrDie(config)
Expand Down Expand Up @@ -96,13 +91,14 @@ func NewMICClient(cloudconfig string, config *rest.Config, isNamespaced bool) (*
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: aadpodid.CRDGroup})

return &Client{
CRDClient: crdClient,
CloudClient: cloudClient,
PodClient: podClient,
EventRecorder: recorder,
EventChannel: eventCh,
NodeClient: &NodeClient{informer.Core().V1().Nodes()},
IsNamespaced: isNamespaced,
CRDClient: crdClient,
CloudClient: cloudClient,
PodClient: podClient,
EventRecorder: recorder,
EventChannel: eventCh,
NodeClient: &NodeClient{informer.Core().V1().Nodes()},
IsNamespaced: isNamespaced,
syncRetryInterval: syncRetryInterval,
}, nil
}

Expand Down Expand Up @@ -152,20 +148,26 @@ func (c *Client) Sync(exit <-chan struct{}) {
}
defer c.setStopped()

ticker := time.NewTicker(c.syncRetryInterval)
defer ticker.Stop()

glog.Info("Sync thread started.")
var event aadpodid.EventType
for {
select {
case <-exit:
return
case event = <-c.EventChannel:
glog.V(6).Infof("Received event: %v", event)
case <-ticker.C:
glog.V(6).Infof("Running sync retry loop")
}

stats.Init()
// This is the only place where the AzureAssignedIdentity creation is initiated.
begin := time.Now()
workDone := false
glog.V(6).Infof("Received event: %v", event)

// List all pods in all namespaces
systemTime := time.Now()
listPods, err := c.PodClient.GetPods()
Expand Down Expand Up @@ -355,7 +357,7 @@ func (c *Client) getListOfIdsToDelete(deleteList, newAssignedIDs []aadpodid.Azur
isUserAssignedMSI := c.checkIfUserAssignedMSI(id)

// this case includes Assigned state and empty state to ensure backward compatibility
if delID.Status.Status == IdentityAssigned || delID.Status.Status == "" {
if delID.Status.Status == aadpodid.AssignedIDAssigned || delID.Status.Status == "" {
if !inUse && isUserAssignedMSI {
c.appendToRemoveListForNode(id.Spec.ResourceID, delID.Spec.NodeName, nodeMap)
}
Expand All @@ -370,7 +372,7 @@ func (c *Client) getListOfIdsToAssign(addList []aadpodid.AzureAssignedIdentity,
id := createID.Spec.AzureIdentityRef
isUserAssignedMSI := c.checkIfUserAssignedMSI(id)

if createID.Status.Status == "" || createID.Status.Status == IdentityCreated {
if createID.Status.Status == "" || createID.Status.Status == aadpodid.AssignedIDCreated {
if isUserAssignedMSI {
c.appendToAddListForNode(id.Spec.ResourceID, createID.Spec.NodeName, nodeMap)
}
Expand Down Expand Up @@ -417,7 +419,7 @@ func (c *Client) getAzureAssignedIDsToCreate(old, new []aadpodid.AzureAssignedId
// if the old assigned id is in created state, then the identity assignment to the node
// is not done. Adding to the list will ensure we retry identity assignment to node for
// this assigned identity.
if oldAssignedID.Status.Status == IdentityCreated {
if oldAssignedID.Status.Status == aadpodid.AssignedIDCreated {
create = append(create, oldAssignedID)
}
break
Expand Down Expand Up @@ -641,7 +643,7 @@ func (c *Client) updateUserMSI(newAssignedIDs []aadpodid.AzureAssignedIdentity,
// this is the state when the azure assigned identity is yet to be created
glog.V(5).Infof("Initiating assigned id creation for pod - %s, binding - %s", createID.Spec.Pod, binding.Name)

createID.Status.Status = IdentityCreated
createID.Status.Status = aadpodid.AssignedIDCreated
err := c.createAssignedIdentity(&createID)
if err != nil {
c.EventRecorder.Event(binding, corev1.EventTypeWarning, "binding apply error",
Expand Down Expand Up @@ -686,8 +688,8 @@ func (c *Client) updateUserMSI(newAssignedIDs []aadpodid.AzureAssignedIdentity,
fmt.Sprintf("Binding %s applied on node %s for pod %s", binding.Name, createID.Spec.NodeName, createID.Name))

// Identity is successfully assigned to node, so update the status of assigned identity to assigned
if updateErr := c.updateAssignedIdentityStatus(&createID, IdentityAssigned); updateErr != nil {
message := fmt.Sprintf("Updating assigned identity %s status to %s for pod %s failed with error %v", createID.Name, IdentityAssigned, createID.Spec.Pod, updateErr.Error())
if updateErr := c.updateAssignedIdentityStatus(&createID, aadpodid.AssignedIDAssigned); updateErr != nil {
message := fmt.Sprintf("Updating assigned identity %s status to %s for pod %s failed with error %v", createID.Name, aadpodid.AssignedIDAssigned, createID.Spec.Pod, updateErr.Error())
c.EventRecorder.Event(&createID, corev1.EventTypeWarning, "status update error", message)
glog.Error(message)
}
Expand Down Expand Up @@ -734,9 +736,9 @@ func (c *Client) updateUserMSI(newAssignedIDs []aadpodid.AzureAssignedIdentity,
for _, createID := range nodeTrackList.assignedIDsToCreate {
binding := createID.Spec.AzureBindingRef
// update the status to assigned for assigned identity as identity was successfully assigned to node.
err = c.updateAssignedIdentityStatus(&createID, IdentityAssigned)
err = c.updateAssignedIdentityStatus(&createID, aadpodid.AssignedIDAssigned)
if err != nil {
message := fmt.Sprintf("Updating assigned identity %s status to %s for pod %s failed with error %v", createID.Name, IdentityAssigned, createID.Spec.Pod, err.Error())
message := fmt.Sprintf("Updating assigned identity %s status to %s for pod %s failed with error %v", createID.Name, aadpodid.AssignedIDAssigned, createID.Spec.Pod, err.Error())
c.EventRecorder.Event(&createID, corev1.EventTypeWarning, "status update error", message)
glog.Error(message)
continue
Expand All @@ -750,9 +752,9 @@ func (c *Client) updateUserMSI(newAssignedIDs []aadpodid.AzureAssignedIdentity,

// update the status for the assigned identity to Unassigned as the identity has been successfully removed from node.
// this will ensure on next sync loop we only try to delete the assigned identity instead of doing everything.
err = c.updateAssignedIdentityStatus(&delID, IdentityUnassigned)
err = c.updateAssignedIdentityStatus(&delID, aadpodid.AssignedIDUnAssigned)
if err != nil {
message := fmt.Sprintf("Updating assigned identity %s status to %s for pod %s failed with error %v", delID.Name, IdentityUnassigned, delID.Spec.Pod, err.Error())
message := fmt.Sprintf("Updating assigned identity %s status to %s for pod %s failed with error %v", delID.Name, aadpodid.AssignedIDUnAssigned, delID.Spec.Pod, err.Error())
c.EventRecorder.Event(&delID, corev1.EventTypeWarning, "status update error", message)
glog.Error(message)
continue
Expand Down
Loading