Skip to content

Commit

Permalink
fix(cvr, delete): ensure cvr gets deleted during reconcile (openebs-a…
Browse files Browse the repository at this point in the history
…rchive#1263)

This commit has following changes:
- remove the check where reconcile was avoided in case
of previous delete failures
- reduce instances of log flooding
- improve log messages
- modify & indent code to make it more readable

Related PR(s):
- openebs-archive#1256
This commit tries to fix the symptom 2 talked in PR 1256

Signed-off-by: AmitKumarDas <[email protected]>
  • Loading branch information
Amit Kumar Das authored and prateekpandey14 committed Jun 19, 2019
1 parent 8ec5e59 commit 6b037ae
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 127 deletions.
14 changes: 10 additions & 4 deletions cmd/cstor-pool-mgmt/controller/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ const (
// InitialImportedPoolVol is to store pool-volume names while pod restart.
var InitialImportedPoolVol []string

// QueueLoad is for storing the key and type of operation before entering workqueue
// QueueLoad represents the payload of the workqueue
//
// It stores the key and corresponding type of operation
type QueueLoad struct {
Key string
Operation QueueOperation
Expand All @@ -139,15 +141,19 @@ const (
OpenEBSIOCStorID Environment = "OPENEBS_IO_CSTOR_ID"
)

//QueueOperation represents the type of operation on resource
// QueueOperation determines the type of operation
// that needs to be executed on the watched resource
type QueueOperation string

//Different type of operations on the controller
// Different type of operations that can be
// supported by the controller/watcher logic
const (
QOpAdd QueueOperation = "add"
QOpDestroy QueueOperation = "destroy"
QOpModify QueueOperation = "modify"
// QOpSync is the operation for syncing(reconciling) on cstor pool object.

// QOpSync is the operation to reconcile
// cstor pool resource
QOpSync QueueOperation = "Sync"
)

Expand Down
23 changes: 8 additions & 15 deletions cmd/cstor-pool-mgmt/controller/replica-controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,13 +351,10 @@ func IsOnlyStatusChange(oldCVR, newCVR *apis.CStorVolumeReplica) bool {
return false
}

// IsDeletionFailedBefore is to make sure no other operation should happen if the
// status of CStorVolumeReplica is deletion-failed.
func IsDeletionFailedBefore(cVR *apis.CStorVolumeReplica) bool {
if cVR.Status.Phase == apis.CVRStatusDeletionFailed {
return true
}
return false
// IsDeletionFailedBefore flags if status of
// cvr is CVRStatusDeletionFailed
func IsDeletionFailedBefore(cvrObj *apis.CStorVolumeReplica) bool {
return cvrObj.Status.Phase == apis.CVRStatusDeletionFailed
}

// IsStatusOnline is to check if the status of cStorVolumeReplica object is
Expand Down Expand Up @@ -402,14 +399,10 @@ func IsRecreateStatus(cVR *apis.CStorVolumeReplica) bool {
return false
}

// IsErrorDuplicate is to check if the status of cStorVolumeReplica object is error-duplicate.
func IsErrorDuplicate(cVR *apis.CStorVolumeReplica) bool {
if string(cVR.Status.Phase) == string(apis.CVRStatusErrorDuplicate) {
glog.Infof("cVR duplication error: %v", string(cVR.ObjectMeta.UID))
return true
}
glog.V(4).Infof("Not error duplicate status: %v", string(cVR.ObjectMeta.UID))
return false
// IsErrorDuplicate flags if cvr resource is a duplicate
// entry
func IsErrorDuplicate(cvrObj *apis.CStorVolumeReplica) bool {
return cvrObj.Status.Phase == apis.CVRStatusErrorDuplicate
}

// getCVRStatus is a wrapper that fetches the status of cstor volume.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ import (

const replicaControllerName = "CStorVolumeReplica"

// CStorVolumeReplicaController is the controller implementation for cStorVolumeReplica resources.
// CStorVolumeReplicaController is the controller
// for CVR resources.
type CStorVolumeReplicaController struct {
// kubeclientset is a standard kubernetes clientset.
kubeclientset kubernetes.Interface
Expand All @@ -59,119 +60,201 @@ type CStorVolumeReplicaController struct {
recorder record.EventRecorder
}

// NewCStorVolumeReplicaController returns a new cStor Replica controller instance
// NewCStorVolumeReplicaController returns a new instance
// of CVR controller
func NewCStorVolumeReplicaController(
kubeclientset kubernetes.Interface,
clientset clientset.Interface,
kubeInformerFactory kubeinformers.SharedInformerFactory,
cStorInformerFactory informers.SharedInformerFactory) *CStorVolumeReplicaController {
cStorInformerFactory informers.SharedInformerFactory,
) *CStorVolumeReplicaController {

// obtain references to shared index informers for the cStorReplica resources.
cStorReplicaInformer := cStorInformerFactory.Openebs().V1alpha1().CStorVolumeReplicas()
// obtain references to shared index informers
// for CVR resources.
cvrInformer := cStorInformerFactory.
Openebs().
V1alpha1().
CStorVolumeReplicas()

err := openebsScheme.AddToScheme(scheme.Scheme)
if err != nil {
glog.Errorf("failed to add to scheme: error {%v}", err)
glog.Errorf("failed to initialise cvr controller: %v", err)
}
// Create event broadcaster
// Add cStor-Replica-controller types to the default Kubernetes Scheme so Events can be
// logged for cStor-Replica-controller types.
glog.V(4).Info("Creating event broadcaster")

// add cvr controller types to default Kubernetes scheme
// to enable logging of cvr contrller events
glog.V(4).Info("creating event broadcaster for cvr")
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)

// StartEventWatcher starts sending events received from this EventBroadcaster to the given
// event handler function. The return value can be ignored or used to stop recording, if
// desired. Events("") denotes empty namespace
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: replicaControllerName})
// start sending events received from this event broadcaster
// to the assigned event handler
//
// Its return value can be ignored or used to stop recording, if
// desired.
//
// Events("") denotes empty namespace
eventBroadcaster.StartRecordingToSink(
&typedcorev1.EventSinkImpl{
Interface: kubeclientset.CoreV1().Events(""),
},
)
recorder := eventBroadcaster.NewRecorder(
scheme.Scheme,
corev1.EventSource{Component: replicaControllerName},
)

controller := &CStorVolumeReplicaController{
kubeclientset: kubeclientset,
clientset: clientset,
cStorReplicaSynced: cStorReplicaInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "CStorVolumeReplica"),
recorder: recorder,
cStorReplicaSynced: cvrInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(
workqueue.DefaultControllerRateLimiter(),
"CStorVolumeReplica",
),
recorder: recorder,
}

glog.Info("Setting up event handlers")

// Instantiating QueueLoad before entering workqueue.
q := common.QueueLoad{}

// Set up an event handler for when cStorReplica resources change.
cStorReplicaInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cVR := obj.(*apis.CStorVolumeReplica)
if !IsRightCStorVolumeReplica(cVR) {
return
}
if IsDeletionFailedBefore(cVR) || IsErrorDuplicate(cVR) {
return
}
q.Operation = common.QOpAdd
glog.Infof("cStorVolumeReplica Added event : %v, %v", cVR.ObjectMeta.Name, string(cVR.ObjectMeta.UID))
controller.recorder.Event(cVR, corev1.EventTypeNormal, string(common.SuccessSynced), string(common.MessageCreateSynced))

// For New request phase of cVR will be empty
// ToDO: Need to have an annotation in CSP and CVR which will state
// about recreation events.
// For every restart of the cstor-pool-mgmt container replica
// watcher will get add event
if IsEmptyStatus(cVR) {
cVR.Status.Phase = apis.CVRStatusInit
} else {
cVR.Status.Phase = apis.CVRStatusRecreate
}

cVR, _ = controller.clientset.OpenebsV1alpha1().CStorVolumeReplicas(cVR.Namespace).Update(cVR)
controller.enqueueCStorReplica(cVR, q)
},
UpdateFunc: func(old, new interface{}) {
newCVR := new.(*apis.CStorVolumeReplica)
oldCVR := old.(*apis.CStorVolumeReplica)
if !IsRightCStorVolumeReplica(newCVR) {
return
}
if IsOnlyStatusChange(oldCVR, newCVR) {
glog.Infof("Only cVR status change: %v, %v", newCVR.ObjectMeta.Name, string(newCVR.ObjectMeta.UID))
return
}
// ToDo: Need to have statuses in more organised manner
// ToDo: IsErrorDuplicate(newCVR) is ignored as of now.

// If DeletionFails -- cVR will not be reconciled.
// Anyway -- due to removal of finalizers this cVR will not exist for reconciliation. But this code
// is present if we might need to add finalizers.
if IsDeletionFailedBefore(newCVR) {
return
}
// Periodic resync will send update events for all known cStorReplica.
// Two different versions of the same cStorReplica will always have different RVs.
if newCVR.ResourceVersion == oldCVR.ResourceVersion {
q.Operation = common.QOpSync
glog.Infof("CstorVolumeReplica status sync event for %s", newCVR.ObjectMeta.Name)
controller.recorder.Event(newCVR, corev1.EventTypeNormal, string(common.SuccessSynced), string(common.StatusSynced))
} else if IsDestroyEvent(newCVR) {
q.Operation = common.QOpDestroy
glog.Infof("cStorVolumeReplica Destroy event : %v, %v", newCVR.ObjectMeta.Name, string(newCVR.ObjectMeta.UID))
controller.recorder.Event(newCVR, corev1.EventTypeNormal, string(common.SuccessSynced), string(common.MessageDestroySynced))
} else {
q.Operation = common.QOpModify
glog.Infof("cStorVolumeReplica Modify event : %v, %v", newCVR.ObjectMeta.Name, string(newCVR.ObjectMeta.UID))
controller.recorder.Event(newCVR, corev1.EventTypeNormal, string(common.SuccessSynced), string(common.MessageModifySynced))
return // will be removed once modify is implemented
}
controller.enqueueCStorReplica(newCVR, q)
},
DeleteFunc: func(obj interface{}) {
cVR := obj.(*apis.CStorVolumeReplica)
if !IsRightCStorVolumeReplica(cVR) {
return
}
glog.Infof("cVR Resource deleted event: %v, %v", cVR.ObjectMeta.Name, string(cVR.ObjectMeta.UID))
},
})
glog.Info("will set up informer event handlers for cvr")

ql := common.QueueLoad{}

cvrInformer.Informer().
AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cvrObj := obj.(*apis.CStorVolumeReplica)

if !IsRightCStorVolumeReplica(cvrObj) || IsErrorDuplicate(cvrObj) {
// do nothing
return
}

glog.V(4).Infof(
"received informer add event for cvr {%s}",
cvrObj.Name,
)

controller.recorder.Event(
cvrObj,
corev1.EventTypeNormal,
string(common.SuccessSynced),
string(common.MessageCreateSynced),
)

// new cvr requests will have phase as blank
//
// NOTE:
// for every restart of controller container
// this informer handler will get add event
// for each cvr resource present in k8s
if IsEmptyStatus(cvrObj) {
cvrObj.Status.Phase = apis.CVRStatusInit
} else {
cvrObj.Status.Phase = apis.CVRStatusRecreate
}

cvrObj, _ = controller.clientset.
OpenebsV1alpha1().
CStorVolumeReplicas(cvrObj.Namespace).
Update(cvrObj)

// push this operation to workqueue
ql.Operation = common.QOpAdd
controller.enqueueCStorReplica(cvrObj, ql)
},

// TODO @amitkumardas
//
// Need to think of writing more manageable code
// In the current code, ordering of conditions
// is very important. I am sure these conditions
// will only increase as we release more versions.
//
// This logic has tried to handle multiple
// responsibilities. IMO this particular informer
// handler should act only as a **switch** to continue
// to handle this change further or just reject this
// change.
//
// We need to do a good job to categorise these
// **if** conditions. Only the reject related conditions
// should be here. Other conditions should be part
// of actual business logic that handles change to
// a resource.
UpdateFunc: func(old, new interface{}) {
newCVR := new.(*apis.CStorVolumeReplica)
oldCVR := old.(*apis.CStorVolumeReplica)

if !IsRightCStorVolumeReplica(newCVR) {
// do nothing
return
}

glog.V(4).Infof(
"received informer update event for cvr {%s}",
newCVR.Name,
)

if IsDestroyEvent(newCVR) {
controller.recorder.Event(
newCVR,
corev1.EventTypeNormal,
string(common.SuccessSynced),
string(common.MessageDestroySynced),
)

// push this operation to workqueue
ql.Operation = common.QOpDestroy
controller.enqueueCStorReplica(newCVR, ql)
return
}

if IsErrorDuplicate(newCVR) || IsOnlyStatusChange(oldCVR, newCVR) {
// do nothing
return
}

if newCVR.ResourceVersion != oldCVR.ResourceVersion {
// cvr modify is not implemented
// hence below is commented
// ql.Operation = common.QOpModify

controller.recorder.Event(
newCVR,
corev1.EventTypeNormal,
string(common.SuccessSynced),
string(common.MessageModifySynced),
)

// no further handling needed
return
}

// finally !!!
controller.recorder.Event(
newCVR,
corev1.EventTypeNormal,
string(common.SuccessSynced),
string(common.StatusSynced),
)

// push this operation to workqueue
ql.Operation = common.QOpSync
controller.enqueueCStorReplica(newCVR, ql)
},

DeleteFunc: func(obj interface{}) {
cvrObj := obj.(*apis.CStorVolumeReplica)

glog.V(4).Infof(
"received informer delete event for cvr {%s}",
cvrObj.Name,
)

// this is a noop since cvr delete is
// handled in UpdateFunc
},
})

return controller
}
Expand Down
Loading

0 comments on commit 6b037ae

Please sign in to comment.