Skip to content

Commit

Permalink
Skip new session if another is already running (#1264)
Browse files Browse the repository at this point in the history
Signed-off-by: Emruz Hossain <[email protected]>
  • Loading branch information
hossainemruz authored Nov 17, 2020
1 parent db1a200 commit c05e046
Show file tree
Hide file tree
Showing 12 changed files with 294 additions and 30 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ require (
kmodules.xyz/openshift v0.0.0-20201105073146-0da509a7d39f
kmodules.xyz/prober v0.0.0-20201105074402-a243b3a27fd8
kmodules.xyz/webhook-runtime v0.0.0-20201105073856-2dc7382b88c6
stash.appscode.dev/apimachinery v0.11.6
stash.appscode.dev/apimachinery v0.11.7-0.20201112091441-902c1702db9d
)

replace bitbucket.org/ww/goautoneg => gomodules.xyz/goautoneg v0.0.0-20120707110453-a547fc61f48d
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1190,6 +1190,6 @@ sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0=
stash.appscode.dev/apimachinery v0.11.6 h1:6JPVYugF6YdeSuLtFqeXTyi6k04owD6E6PxDTBpRsn4=
stash.appscode.dev/apimachinery v0.11.6/go.mod h1:JQtqMYDOMkcxmcsUnOiRhi/BIvFWlceAIcauBw1a48M=
stash.appscode.dev/apimachinery v0.11.7-0.20201112091441-902c1702db9d h1:JCLs2opPfBdyGI4/ouQtqrLzS7VGHtYcQ5dnDwx78Bk=
stash.appscode.dev/apimachinery v0.11.7-0.20201112091441-902c1702db9d/go.mod h1:QnuI/VQTYxiT5ikxHqwOmqO96emPJPykbxfEp7pgjhk=
vbom.ml/util v0.0.0-20160121211510-db5cfe13f5cc/go.mod h1:so/NYdZXCz+E3ZpW0uAoCj6uzU2+8OWDFv/HxUSs7kI=
84 changes: 78 additions & 6 deletions pkg/controller/backup_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
core "k8s.io/api/core/v1"
kerr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -118,9 +119,10 @@ func (c *StashController) runBackupSessionProcessor(key string) error {
}

func (c *StashController) applyBackupSessionReconciliationLogic(backupSession *api_v1beta1.BackupSession) error {
// ================= Don't Process Completed BackupSession ===========================
// ================= Don't Process Completed/Skipped BackupSession ===========================
if backupSession.Status.Phase == api_v1beta1.BackupSessionFailed ||
backupSession.Status.Phase == api_v1beta1.BackupSessionSucceeded {
backupSession.Status.Phase == api_v1beta1.BackupSessionSucceeded ||
backupSession.Status.Phase == api_v1beta1.BackupSessionSkipped {
log.Infof("Skipping processing BackupSession %s/%s. Reason: phase is %q.",
backupSession.Namespace,
backupSession.Name,
Expand All @@ -144,6 +146,24 @@ func (c *StashController) applyBackupSessionReconciliationLogic(backupSession *a

// check whether backup session is completed or running and set it's phase accordingly
phase, err := c.getBackupSessionPhase(backupSession)

// if current BackupSession phase is "Pending" and there is already another BackupSession in `Running` state for this invoker,
// then we should skip the current session
if phase == api_v1beta1.BackupSessionPending {
// check if there is another BackupSession in running state.
runningBS, err := c.getRunningBackupSessionForInvoker(inv)
if err != nil {
return err
}
if runningBS != nil {
log.Infof("Skipped taking new backup. Reason: Previous BackupSession: %s is %q.",
runningBS.Name,
runningBS.Status.Phase,
)
return c.setBackupSessionSkipped(inv, backupSession, runningBS)
}
}

var condErr error

// ==================== Execute Global PostBackup Hooks ===========================
Expand Down Expand Up @@ -712,6 +732,37 @@ func (c *StashController) setBackupSessionFailed(inv invoker.BackupInvoker, back
return errors.NewAggregate([]error{backupErr, err})
}

func (c *StashController) setBackupSessionSkipped(inv invoker.BackupInvoker, currentBS, runningBS *api_v1beta1.BackupSession) error {

// set BackupSession phase to "Skipped"
_, statusErr := stash_util.UpdateBackupSessionStatus(
context.TODO(),
c.stashClient.StashV1beta1(),
currentBS.ObjectMeta,
func(in *api_v1beta1.BackupSessionStatus) (types.UID, *api_v1beta1.BackupSessionStatus) {
in.Phase = api_v1beta1.BackupSessionSkipped
return currentBS.UID, in
},
metav1.UpdateOptions{},
)

// write failure event to BackupSession
_, _ = eventer.CreateEvent(
c.kubeClient,
eventer.EventSourceBackupSessionController,
currentBS,
core.EventTypeWarning,
eventer.EventReasonBackupSessionSkipped,
fmt.Sprintf("Skipped taking new backup. Reason: Previous BackupSession: %s is %q.",
runningBS.Name,
runningBS.Status.Phase,
))

// cleanup old BackupSessions
err := c.cleanupBackupHistory(currentBS.Spec.Invoker, currentBS.Namespace, inv.BackupHistoryLimit)
return errors.NewAggregate([]error{statusErr, err})
}

func (c *StashController) getBackupSessionPhase(backupSession *api_v1beta1.BackupSession) (api_v1beta1.BackupSessionPhase, error) {
// BackupSession phase is empty or "Pending" then return it. controller will process accordingly
if backupSession.Status.Phase == "" ||
Expand Down Expand Up @@ -819,9 +870,12 @@ func (c *StashController) cleanupBackupHistory(backupInvokerRef api_v1beta1.Back

// delete the BackupSession that does not fit within the history limit
for i := int(historyLimit); i < len(bsList); i++ {
err = c.stashClient.StashV1beta1().BackupSessions(namespace).Delete(context.TODO(), bsList[i].Name, meta.DeleteInBackground())
if err != nil && !(kerr.IsNotFound(err) || kerr.IsGone(err)) {
return err
// delete only the BackupSessions that has completed its backup
if backupCompleted(bsList[i].Status.Phase) {
err = c.stashClient.StashV1beta1().BackupSessions(namespace).Delete(context.TODO(), bsList[i].Name, meta.DeleteInBackground())
if err != nil && !(kerr.IsNotFound(err) || kerr.IsGone(err)) {
return err
}
}
}
return nil
Expand Down Expand Up @@ -852,7 +906,9 @@ func backupExecutor(inv invoker.BackupInvoker, tref api_v1beta1.TargetRef) strin
}

func backupCompleted(phase api_v1beta1.BackupSessionPhase) bool {
return phase == api_v1beta1.BackupSessionFailed || phase == api_v1beta1.BackupSessionSucceeded
return phase == api_v1beta1.BackupSessionFailed ||
phase == api_v1beta1.BackupSessionSucceeded ||
phase == api_v1beta1.BackupSessionSkipped
}

func globalPostBackupHookExecuted(inv invoker.BackupInvoker, backupSession *api_v1beta1.BackupSession) bool {
Expand Down Expand Up @@ -901,3 +957,19 @@ func postBackupActionsSucceeded(conditions []kmapi.Condition) (bool, string) {

return true, ""
}

func (c *StashController) getRunningBackupSessionForInvoker(inv invoker.BackupInvoker) (*api_v1beta1.BackupSession, error) {
backupSessions, err := c.backupSessionLister.List(labels.SelectorFromSet(map[string]string{
apis.LabelInvokerName: inv.ObjectMeta.Name,
apis.LabelInvokerType: inv.TypeMeta.Kind,
}))
if err != nil {
return nil, err
}
for i := range backupSessions {
if backupSessions[i].Status.Phase == api_v1beta1.BackupSessionRunning {
return backupSessions[i], nil
}
}
return nil, nil
}
2 changes: 1 addition & 1 deletion test/e2e/framework/backup_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (fi *Invocation) GetBackupConfiguration(repoName string, transformFuncs ...
// hence, set the schedule to a large interval so that no backup schedule appear before completing running backup
// we will use manual triggering for taking backup. this will help us to avoid waiting for fixed interval before each backup
// and the problem mentioned above
Schedule: "59 * * * *",
Schedule: "0 0 * 12 *",
RetentionPolicy: v1alpha1.RetentionPolicy{
Name: "keep-last-5",
KeepLast: 5,
Expand Down
60 changes: 50 additions & 10 deletions test/e2e/framework/backup_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package framework

import (
Expand All @@ -27,6 +26,7 @@ import (
. "github.com/onsi/gomega"
"gomodules.xyz/x/crypto/rand"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
)

func (f *Framework) EventuallyBackupSessionPhase(meta metav1.ObjectMeta) GomegaAsyncAssertion {
Expand Down Expand Up @@ -102,7 +102,7 @@ func (fi *Invocation) TriggerInstantBackup(objMeta metav1.ObjectMeta, invokerRef
return fi.StashClient.StashV1beta1().BackupSessions(backupSession.Namespace).Create(context.TODO(), backupSession, metav1.CreateOptions{})
}

func (fi *Invocation) EventuallyBackupCount(invokerMeta metav1.ObjectMeta, invokerKind string) GomegaAsyncAssertion {
func (fi *Invocation) EventuallySuccessfulBackupCount(invokerMeta metav1.ObjectMeta, invokerKind string) GomegaAsyncAssertion {
return Eventually(func() int64 {
count, err := fi.GetSuccessfulBackupSessionCount(invokerMeta, invokerKind)
if err != nil {
Expand All @@ -112,17 +112,40 @@ func (fi *Invocation) EventuallyBackupCount(invokerMeta metav1.ObjectMeta, invok
}, WaitTimeOut, PullInterval)
}

func (fi *Invocation) EventuallySkippedBackupCount(invokerMeta metav1.ObjectMeta, invokerKind string) GomegaAsyncAssertion {
return Eventually(func() int64 {
count, err := fi.GetSkippedBackupSessionCount(invokerMeta, invokerKind)
if err != nil {
return 0
}
return count
}, WaitTimeOut, PullInterval)
}

func (fi *Invocation) GetSuccessfulBackupSessionCount(invokerMeta metav1.ObjectMeta, invokerKind string) (int64, error) {
backupSessions, err := fi.StashClient.StashV1beta1().BackupSessions(fi.namespace).List(context.TODO(), metav1.ListOptions{})
backupSessions, err := fi.GetBackupSessionsForInvoker(invokerMeta, invokerKind)
if err != nil {
return 0, err
}

count := int64(0)
for _, bs := range backupSessions.Items {
if bs.Spec.Invoker.Kind == invokerKind &&
bs.Spec.Invoker.Name == invokerMeta.Name &&
bs.Status.Phase == v1beta1.BackupSessionSucceeded {
if bs.Status.Phase == v1beta1.BackupSessionSucceeded {
count++
}
}
return count, nil
}

func (fi *Invocation) GetSkippedBackupSessionCount(invokerMeta metav1.ObjectMeta, invokerKind string) (int64, error) {
backupSessions, err := fi.GetBackupSessionsForInvoker(invokerMeta, invokerKind)
if err != nil {
return 0, err
}

count := int64(0)
for _, bs := range backupSessions.Items {
if bs.Status.Phase == v1beta1.BackupSessionSkipped {
count++
}
}
Expand All @@ -131,17 +154,34 @@ func (fi *Invocation) GetSuccessfulBackupSessionCount(invokerMeta metav1.ObjectM

func (fi *Invocation) EventuallyRunningBackupCompleted(invokerMeta metav1.ObjectMeta, invokerKind string) GomegaAsyncAssertion {
return Eventually(func() bool {
backupSessions, err := fi.StashClient.StashV1beta1().BackupSessions(fi.namespace).List(context.TODO(), metav1.ListOptions{})
backupSessions, err := fi.GetBackupSessionsForInvoker(invokerMeta, invokerKind)
if err != nil {
return false
}
for _, bs := range backupSessions.Items {
if bs.Spec.Invoker.Kind == invokerKind &&
bs.Spec.Invoker.Name == invokerMeta.Name &&
bs.Status.Phase == v1beta1.BackupSessionRunning {
if bs.Status.Phase == v1beta1.BackupSessionRunning {
return false
}
}
return true
}, WaitTimeOut, PullInterval)
}

func (fi *Invocation) EventuallyBackupSessionCount(invokerMeta metav1.ObjectMeta, invokerKind string) GomegaAsyncAssertion {
return Eventually(func() int {
bsList, err := fi.GetBackupSessionsForInvoker(invokerMeta, invokerKind)
if err != nil {
return 0
}
return len(bsList.Items)
}, WaitTimeOut, PullInterval)
}

func (fi *Invocation) GetBackupSessionsForInvoker(invokerMeta metav1.ObjectMeta, invokerKind string) (*v1beta1.BackupSessionList, error) {
return fi.StashClient.StashV1beta1().BackupSessions(fi.namespace).List(context.TODO(), metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{
apis.LabelInvokerName: invokerMeta.Name,
apis.LabelInvokerType: invokerKind,
}).String(),
})
}
Loading

0 comments on commit c05e046

Please sign in to comment.