Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cvr, delete): ensure cvr gets deleted during reconcile #1263

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions cmd/cstor-pool-mgmt/controller/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ const (
// InitialImportedPoolVol is to store pool-volume names while pod restart.
var InitialImportedPoolVol []string

// QueueLoad is for storing the key and type of operation before entering workqueue
// QueueLoad represents the payload of the workqueue
//
// It stores the key and corresponding type of operation
type QueueLoad struct {
Key string
Operation QueueOperation
Expand All @@ -139,15 +141,19 @@ const (
OpenEBSIOCStorID Environment = "OPENEBS_IO_CSTOR_ID"
)

//QueueOperation represents the type of operation on resource
// QueueOperation determines the type of operation
// that needs to be executed on the watched resource
type QueueOperation string

//Different type of operations on the controller
// Different type of operations that can be
// supported by the controller/watcher logic
const (
QOpAdd QueueOperation = "add"
QOpDestroy QueueOperation = "destroy"
QOpModify QueueOperation = "modify"
// QOpSync is the operation for syncing(reconciling) on cstor pool object.

// QOpSync is the operation to reconcile
// cstor pool resource
QOpSync QueueOperation = "Sync"
)

Expand Down
23 changes: 8 additions & 15 deletions cmd/cstor-pool-mgmt/controller/replica-controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,10 @@ func IsOnlyStatusChange(oldCVR, newCVR *apis.CStorVolumeReplica) bool {
return false
}

// IsDeletionFailedBefore is to make sure no other operation should happen if the
// status of CStorVolumeReplica is deletion-failed.
func IsDeletionFailedBefore(cVR *apis.CStorVolumeReplica) bool {
if cVR.Status.Phase == apis.CVRStatusDeletionFailed {
return true
}
return false
// IsDeletionFailedBefore flags if status of
// cvr is CVRStatusDeletionFailed
func IsDeletionFailedBefore(cvrObj *apis.CStorVolumeReplica) bool {
return cvrObj.Status.Phase == apis.CVRStatusDeletionFailed
}

// IsStatusOnline is to check if the status of cStorVolumeReplica object is
Expand Down Expand Up @@ -402,14 +399,10 @@ func IsRecreateStatus(cVR *apis.CStorVolumeReplica) bool {
return false
}

// IsErrorDuplicate is to check if the status of cStorVolumeReplica object is error-duplicate.
func IsErrorDuplicate(cVR *apis.CStorVolumeReplica) bool {
if string(cVR.Status.Phase) == string(apis.CVRStatusErrorDuplicate) {
glog.Infof("cVR duplication error: %v", string(cVR.ObjectMeta.UID))
return true
}
glog.V(4).Infof("Not error duplicate status: %v", string(cVR.ObjectMeta.UID))
return false
// IsErrorDuplicate flags if cvr resource is a duplicate
// entry
func IsErrorDuplicate(cvrObj *apis.CStorVolumeReplica) bool {
return cvrObj.Status.Phase == apis.CVRStatusErrorDuplicate
}

// getCVRStatus is a wrapper that fetches the status of cstor volume.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ import (

const replicaControllerName = "CStorVolumeReplica"

// CStorVolumeReplicaController is the controller implementation for cStorVolumeReplica resources.
// CStorVolumeReplicaController is the controller
// for CVR resources.
type CStorVolumeReplicaController struct {
// kubeclientset is a standard kubernetes clientset.
kubeclientset kubernetes.Interface
Expand All @@ -59,119 +60,201 @@ type CStorVolumeReplicaController struct {
recorder record.EventRecorder
}

// NewCStorVolumeReplicaController returns a new cStor Replica controller instance
// NewCStorVolumeReplicaController returns a new instance
// of CVR controller
func NewCStorVolumeReplicaController(
kubeclientset kubernetes.Interface,
clientset clientset.Interface,
kubeInformerFactory kubeinformers.SharedInformerFactory,
cStorInformerFactory informers.SharedInformerFactory) *CStorVolumeReplicaController {
cStorInformerFactory informers.SharedInformerFactory,
) *CStorVolumeReplicaController {

// obtain references to shared index informers for the cStorReplica resources.
cStorReplicaInformer := cStorInformerFactory.Openebs().V1alpha1().CStorVolumeReplicas()
// obtain references to shared index informers
// for CVR resources.
cvrInformer := cStorInformerFactory.
Openebs().
V1alpha1().
CStorVolumeReplicas()

err := openebsScheme.AddToScheme(scheme.Scheme)
if err != nil {
glog.Errorf("failed to add to scheme: error {%v}", err)
glog.Errorf("failed to initialise cvr controller: %v", err)
}
// Create event broadcaster
// Add cStor-Replica-controller types to the default Kubernetes Scheme so Events can be
// logged for cStor-Replica-controller types.
glog.V(4).Info("Creating event broadcaster")

// add cvr controller types to default Kubernetes scheme
// to enable logging of cvr contrller events
glog.V(4).Info("creating event broadcaster for cvr")
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)

// StartEventWatcher starts sending events received from this EventBroadcaster to the given
// event handler function. The return value can be ignored or used to stop recording, if
// desired. Events("") denotes empty namespace
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: replicaControllerName})
// start sending events received from this event broadcaster
// to the assigned event handler
//
// Its return value can be ignored or used to stop recording, if
// desired.
//
// Events("") denotes empty namespace
eventBroadcaster.StartRecordingToSink(
&typedcorev1.EventSinkImpl{
Interface: kubeclientset.CoreV1().Events(""),
},
)
recorder := eventBroadcaster.NewRecorder(
scheme.Scheme,
corev1.EventSource{Component: replicaControllerName},
)

controller := &CStorVolumeReplicaController{
kubeclientset: kubeclientset,
clientset: clientset,
cStorReplicaSynced: cStorReplicaInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "CStorVolumeReplica"),
recorder: recorder,
cStorReplicaSynced: cvrInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(),
"CStorVolumeReplica",
),
recorder: recorder,
}

glog.Info("Setting up event handlers")

// Instantiating QueueLoad before entering workqueue.
q := common.QueueLoad{}

// Set up an event handler for when cStorReplica resources change.
cStorReplicaInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cVR := obj.(*apis.CStorVolumeReplica)
if !IsRightCStorVolumeReplica(cVR) {
return
}
if IsDeletionFailedBefore(cVR) || IsErrorDuplicate(cVR) {
return
}
q.Operation = common.QOpAdd
glog.Infof("cStorVolumeReplica Added event : %v, %v", cVR.ObjectMeta.Name, string(cVR.ObjectMeta.UID))
controller.recorder.Event(cVR, corev1.EventTypeNormal, string(common.SuccessSynced), string(common.MessageCreateSynced))

// For New request phase of cVR will be empty
// ToDO: Need to have an annotation in CSP and CVR which will state
// about recreation events.
// For every restart of the cstor-pool-mgmt container replica
// watcher will get add event
if IsEmptyStatus(cVR) {
cVR.Status.Phase = apis.CVRStatusInit
} else {
cVR.Status.Phase = apis.CVRStatusRecreate
}

cVR, _ = controller.clientset.OpenebsV1alpha1().CStorVolumeReplicas(cVR.Namespace).Update(cVR)
controller.enqueueCStorReplica(cVR, q)
},
UpdateFunc: func(old, new interface{}) {
newCVR := new.(*apis.CStorVolumeReplica)
oldCVR := old.(*apis.CStorVolumeReplica)
if !IsRightCStorVolumeReplica(newCVR) {
return
}
if IsOnlyStatusChange(oldCVR, newCVR) {
glog.Infof("Only cVR status change: %v, %v", newCVR.ObjectMeta.Name, string(newCVR.ObjectMeta.UID))
return
}
// ToDo: Need to have statuses in more organised manner
// ToDo: IsErrorDuplicate(newCVR) is ignored as of now.

// If DeletionFails -- cVR will not be reconciled.
// Anyway -- due to removal of finalizers this cVR will not exist for reconciliation. But this code
// is present if we might need to add finalizers.
if IsDeletionFailedBefore(newCVR) {
return
}
// Periodic resync will send update events for all known cStorReplica.
// Two different versions of the same cStorReplica will always have different RVs.
if newCVR.ResourceVersion == oldCVR.ResourceVersion {
q.Operation = common.QOpSync
glog.Infof("CstorVolumeReplica status sync event for %s", newCVR.ObjectMeta.Name)
controller.recorder.Event(newCVR, corev1.EventTypeNormal, string(common.SuccessSynced), string(common.StatusSynced))
} else if IsDestroyEvent(newCVR) {
q.Operation = common.QOpDestroy
glog.Infof("cStorVolumeReplica Destroy event : %v, %v", newCVR.ObjectMeta.Name, string(newCVR.ObjectMeta.UID))
controller.recorder.Event(newCVR, corev1.EventTypeNormal, string(common.SuccessSynced), string(common.MessageDestroySynced))
} else {
q.Operation = common.QOpModify
glog.Infof("cStorVolumeReplica Modify event : %v, %v", newCVR.ObjectMeta.Name, string(newCVR.ObjectMeta.UID))
controller.recorder.Event(newCVR, corev1.EventTypeNormal, string(common.SuccessSynced), string(common.MessageModifySynced))
return // will be removed once modify is implemented
}
controller.enqueueCStorReplica(newCVR, q)
},
DeleteFunc: func(obj interface{}) {
cVR := obj.(*apis.CStorVolumeReplica)
if !IsRightCStorVolumeReplica(cVR) {
return
}
glog.Infof("cVR Resource deleted event: %v, %v", cVR.ObjectMeta.Name, string(cVR.ObjectMeta.UID))
},
})
glog.Info("will set up informer event handlers for cvr")

ql := common.QueueLoad{}

cvrInformer.Informer().
AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cvrObj := obj.(*apis.CStorVolumeReplica)

if !IsRightCStorVolumeReplica(cvrObj) || IsErrorDuplicate(cvrObj) {
// do nothing
return
}

glog.V(4).Infof(
"received informer add event for cvr {%s}",
cvrObj.Name,
)

controller.recorder.Event(
cvrObj,
corev1.EventTypeNormal,
string(common.SuccessSynced),
string(common.MessageCreateSynced),
)

// new cvr requests will have phase as blank
//
// NOTE:
// for every restart of controller container
// this informer handler will get add event
// for each cvr resource present in k8s
if IsEmptyStatus(cvrObj) {
cvrObj.Status.Phase = apis.CVRStatusInit
} else {
cvrObj.Status.Phase = apis.CVRStatusRecreate
}

cvrObj, _ = controller.clientset.
OpenebsV1alpha1().
CStorVolumeReplicas(cvrObj.Namespace).
Update(cvrObj)

// push this operation to workqueue
ql.Operation = common.QOpAdd
controller.enqueueCStorReplica(cvrObj, ql)
},

// TODO @amitkumardas
//
// Need to think of writing more manageable code
// In the current code, ordering of conditions
// is very important. I am sure these conditions
// will only increase as we release more versions.
//
// This logic has tried to handle multiple
// responsibilities. IMO this particular informer
// handler should act only as a **switch** to continue
// to handle this change further or just reject this
// change.
//
// We need to do a good job to categorise these
// **if** conditions. Only the reject related conditions
// should be here. Other conditions should be part
// of actual business logic that handles change to
// a resource.
UpdateFunc: func(old, new interface{}) {
newCVR := new.(*apis.CStorVolumeReplica)
oldCVR := old.(*apis.CStorVolumeReplica)

if !IsRightCStorVolumeReplica(newCVR) {
// do nothing
return
}

glog.V(4).Infof(
"received informer update event for cvr {%s}",
newCVR.Name,
)

if IsDestroyEvent(newCVR) {
controller.recorder.Event(
newCVR,
corev1.EventTypeNormal,
string(common.SuccessSynced),
string(common.MessageDestroySynced),
)

// push this operation to workqueue
ql.Operation = common.QOpDestroy
controller.enqueueCStorReplica(newCVR, ql)
return
}

if IsErrorDuplicate(newCVR) || IsOnlyStatusChange(oldCVR, newCVR) {
// do nothing
return
}

if newCVR.ResourceVersion != oldCVR.ResourceVersion {
// cvr modify is not implemented
// hence below is commented
// ql.Operation = common.QOpModify

controller.recorder.Event(
newCVR,
corev1.EventTypeNormal,
string(common.SuccessSynced),
string(common.MessageModifySynced),
)

// no further handling needed
return
}

// finally !!!
controller.recorder.Event(
newCVR,
corev1.EventTypeNormal,
string(common.SuccessSynced),
string(common.StatusSynced),
)

// push this operation to workqueue
ql.Operation = common.QOpSync
controller.enqueueCStorReplica(newCVR, ql)
},

DeleteFunc: func(obj interface{}) {
cvrObj := obj.(*apis.CStorVolumeReplica)

glog.V(4).Infof(
"received informer delete event for cvr {%s}",
cvrObj.Name,
)

// this is a noop since cvr delete is
// handled in UpdateFunc
},
})

return controller
}
Expand Down
Loading