From 33fc37cd5e2674f67a3433de30140e6f07cb5e36 Mon Sep 17 00:00:00 2001 From: "Md. Emruz Hossain" Date: Fri, 26 Jul 2019 19:35:05 +0600 Subject: [PATCH] Add support to restore using volumeClaimTemplate in Function-Task model (#841) --- pkg/controller/restore_session.go | 199 +++++++++++++++--------------- pkg/util/job.go | 2 + 2 files changed, 101 insertions(+), 100 deletions(-) diff --git a/pkg/controller/restore_session.go b/pkg/controller/restore_session.go index 10cf26db6..fbc28bd9b 100644 --- a/pkg/controller/restore_session.go +++ b/pkg/controller/restore_session.go @@ -186,9 +186,15 @@ func (c *StashController) runRestoreSessionProcessor(key string) error { } func (c *StashController) ensureRestoreJob(restoreSession *api_v1beta1.RestoreSession) error { + image := docker.Docker{ + Registry: c.DockerRegistry, + Image: docker.ImageStash, + Tag: c.StashImageTag, + } + offshootLabels := restoreSession.OffshootLabels() - objectMeta := metav1.ObjectMeta{ + jobMeta := metav1.ObjectMeta{ Name: RestoreJobPrefix + restoreSession.Name, Namespace: restoreSession.Namespace, Labels: offshootLabels, @@ -207,9 +213,9 @@ func (c *StashController) ensureRestoreJob(restoreSession *api_v1beta1.RestoreSe serviceAccountName = restoreSession.Spec.RuntimeSettings.Pod.ServiceAccountName } else { // ServiceAccount hasn't been specified. so create new one with same name as RestoreSession object. 
- serviceAccountName = objectMeta.Name + serviceAccountName = jobMeta.Name - _, _, err = core_util.CreateOrPatchServiceAccount(c.kubeClient, objectMeta, func(in *core.ServiceAccount) *core.ServiceAccount { + _, _, err = core_util.CreateOrPatchServiceAccount(c.kubeClient, jobMeta, func(in *core.ServiceAccount) *core.ServiceAccount { core_util.EnsureOwnerReference(&in.ObjectMeta, ref) in.Labels = offshootLabels return in @@ -238,23 +244,92 @@ func (c *StashController) ensureRestoreJob(restoreSession *api_v1beta1.RestoreSe return err } - // Now there could be two restore scenario for restoring through job. - // 1. Restore target is a Database or a existing PVC. In this case, we need to resolve Task and Function then create a job to restore. - // 2. VolumeClaimTemplate has been specified. In this case, we have to create the PVCs then restore on it. We will create one job for each replicas to restore in parallel. + // Now, there could be two restore scenario for restoring through job. + // 1. Restore process follows Function-Task model. In this case, we have to resolve respective Functions and Task to get desired job definition. + // 2. Restore process does not follow Function-Task model. In this case, we have to generate simple volume restorer job definition. + + var jobTemplate *core.PodTemplateSpec - // Check whether VolumeClaimTemplate is specified. If so, create the PVCs and create restore job for each replicas. - // Otherwise, resolve task template and create a single job. - if restoreSession.Spec.Target != nil && restoreSession.Spec.Target.VolumeClaimTemplates != nil { - return c.createPVCThenRestore(restoreSession, repository, objectMeta, ref, serviceAccountName) + if restoreSession.Spec.Task.Name != "" { + // Restore process follows Function-Task model. So, resolve Function and Task to get desired job definition. 
+ jobTemplate, err = c.resolveRestoreTask(restoreSession, repository, jobMeta, ref, serviceAccountName) + if err != nil { + return err + } } else { - return c.resolveTaskThenRestore(restoreSession, repository, objectMeta, ref, serviceAccountName) + // Restore process does not follow Function-Task model. So, generate simple volume restorer job definition. + jobTemplate, err = util.NewPVCRestorerJob(restoreSession, repository, image, jobMeta) + if err != nil { + return err + } } + // If volumeClaimTemplate is not specified then we don't need any further processing. Just, create the job + if restoreSession.Spec.Target == nil || + (restoreSession.Spec.Target != nil && len(restoreSession.Spec.Target.VolumeClaimTemplates) == 0) { + return c.createRestoreJob(jobTemplate, jobTemplate.ObjectMeta, ref, serviceAccountName) + } + + // volumeClaimTemplate has been specified. Now, we have to do the following for each replica: + // 1. Create PVCs according to the template. + // 2. Mount the PVCs to the restore job. + // 3. Create the restore job to restore the target. + + replicas := int32(1) // set default replicas to 1 + if restoreSession.Spec.Target.Replicas != nil { + replicas = *restoreSession.Spec.Target.Replicas + } + + for ordinal := int32(0); ordinal < replicas; ordinal++ { + // resolve template part of the volumeClaimTemplate and generate PVC definition according to the template + pvcList, err := getPVCFromVolumeClaimTemplates(ordinal, restoreSession.Spec.Target.VolumeClaimTemplates) + if err != nil { + return err + } + + // create the PVCs + err = util.CreateBatchPVC(c.kubeClient, restoreSession.Namespace, pvcList) + if err != nil { + return err + } + + // add PVCs as volume to the job + volumes := util.PVCListToVolumes(pvcList, ordinal) + jobTemplate.Spec.Volumes = core_util.UpsertVolume(jobTemplate.Spec.Volumes, volumes...) 
+ + // add ordinal suffix to the job name so that multiple restore jobs can run concurrently + jobTemplate.Name = fmt.Sprintf("%s-%d", jobMeta.Name, ordinal) + + // create restore job + err = c.createRestoreJob(jobTemplate, jobTemplate.ObjectMeta, ref, serviceAccountName) + if err != nil { + return err + } + } + return nil +} + +func (c *StashController) createRestoreJob(jobTemplate *core.PodTemplateSpec, meta metav1.ObjectMeta, ref *core.ObjectReference, serviceAccountName string) error { + _, _, err := batch_util.CreateOrPatchJob(c.kubeClient, meta, func(in *batchv1.Job) *batchv1.Job { + // set RestoreSession as owner of this Job + core_util.EnsureOwnerReference(&in.ObjectMeta, ref) + + if in.Labels == nil { + in.Labels = make(map[string]string, 0) + } + // keep the job after completion: "false" disables delete-on-completion (NOTE(review): PVC restorer jobs previously used "true" - confirm this is intended) + in.Labels[apis.KeyDeleteJobOnCompletion] = "false" + + in.Spec.Template = *jobTemplate + in.Spec.Template.Spec.ServiceAccountName = serviceAccountName + return in + }) + return err } -// resolveTaskThenRestore resolves Functions and Tasks then create a restore job to restore the target. -func (c *StashController) resolveTaskThenRestore(restoreSession *api_v1beta1.RestoreSession, - repository *api_v1alpha1.Repository, meta metav1.ObjectMeta, ref *core.ObjectReference, serviceAccountName string) error { +// resolveRestoreTask resolves Functions and Tasks then returns a job definition to restore the target. 
+func (c *StashController) resolveRestoreTask(restoreSession *api_v1beta1.RestoreSession, + repository *api_v1alpha1.Repository, meta metav1.ObjectMeta, ref *core.ObjectReference, serviceAccountName string) (*core.PodTemplateSpec, error) { // resolve task template explicitInputs := make(map[string]string) @@ -264,11 +339,11 @@ func (c *StashController) resolveTaskThenRestore(restoreSession *api_v1beta1.Res repoInputs, err := c.inputsForRepository(repository) if err != nil { - return fmt.Errorf("cannot resolve implicit inputs for Repository %s/%s, reason: %s", repository.Namespace, repository.Name, err) + return nil, fmt.Errorf("cannot resolve implicit inputs for Repository %s/%s, reason: %s", repository.Namespace, repository.Name, err) } rsInputs, err := c.inputsForRestoreSession(*restoreSession) if err != nil { - return fmt.Errorf("cannot resolve implicit inputs for RestoreSession %s/%s, reason: %s", restoreSession.Namespace, restoreSession.Name, err) + return nil, fmt.Errorf("cannot resolve implicit inputs for RestoreSession %s/%s, reason: %s", restoreSession.Namespace, restoreSession.Name, err) } implicitInputs := core_util.UpsertMap(repoInputs, rsInputs) @@ -300,95 +375,19 @@ func (c *StashController) resolveTaskThenRestore(restoreSession *api_v1beta1.Res podSpec, err := taskResolver.GetPodSpec() if err != nil { - return err + return nil, err } + // for local backend, attach volume to all containers if repository.Spec.Backend.Local != nil { podSpec = util.AttachLocalBackend(podSpec, *repository.Spec.Backend.Local) } - // create Restore Job - _, _, err = batch_util.CreateOrPatchJob(c.kubeClient, meta, func(in *batchv1.Job) *batchv1.Job { - // set RestoreSession as owner of this Job - core_util.EnsureOwnerReference(&in.ObjectMeta, ref) - - in.Labels = restoreSession.OffshootLabels() - // restore job is created by resolving task and function. we should not delete it when it goes to completed state. 
- // user might need to know what was the final resolved job specification for debugging purpose. - in.Labels[apis.KeyDeleteJobOnCompletion] = "false" - - in.Spec.Template.Spec = podSpec - in.Spec.Template.Spec.ServiceAccountName = serviceAccountName - return in - }) - - return err -} - -// createPVCThenRestore creates PVCs according to the VolumeClaimTemplate specified in RestoreSession target -// creates one job for each PVC to restore in parallel. -func (c *StashController) createPVCThenRestore(restoreSession *api_v1beta1.RestoreSession, - repository *api_v1alpha1.Repository, meta metav1.ObjectMeta, ref *core.ObjectReference, serviceAccountName string) error { - // Create PVCs specified in VolumeClaimTemplate - replicas := int32(1) - if restoreSession.Spec.Target.Replicas != nil { - replicas = *restoreSession.Spec.Target.Replicas - } - for ordinal := int32(0); ordinal < replicas; ordinal++ { - pvcList, err := GetPVCFromVolumeClaimTemplates(ordinal, restoreSession.Spec.Target.VolumeClaimTemplates) - if err != nil { - return err - } - - err = util.CreateBatchPVC(c.kubeClient, restoreSession.Namespace, pvcList) - if err != nil { - return err - } - - jobMeta := meta - jobMeta.Name = fmt.Sprintf("%s-%d", meta.Name, ordinal) - err = c.createPVCRestorerJob(restoreSession, repository, jobMeta, ref, serviceAccountName, util.PVCListToVolumes(pvcList, ordinal)) - if err != nil { - return err - } - } - - return nil -} - -func (c *StashController) createPVCRestorerJob(restoreSession *api_v1beta1.RestoreSession, repository *api_v1alpha1.Repository, - meta metav1.ObjectMeta, ref *core.ObjectReference, serviceAccountName string, volumes []core.Volume) error { - image := docker.Docker{ - Registry: c.DockerRegistry, - Image: docker.ImageStash, - Tag: c.StashImageTag, + podTemplate := &core.PodTemplateSpec{ + ObjectMeta: meta, + Spec: podSpec, } - - jobTemplate, err := util.NewPVCRestorerJob(restoreSession, repository, image, meta) - if err != nil { - return err - } - - // add 
PVCs to volume list of the job - jobTemplate.Spec.Volumes = core_util.UpsertVolume(jobTemplate.Spec.Volumes, volumes...) - - // Create restore Job - _, _, err = batch_util.CreateOrPatchJob(c.kubeClient, meta, func(in *batchv1.Job) *batchv1.Job { - // set BackupSession as owner of this Job - core_util.EnsureOwnerReference(&in.ObjectMeta, ref) - - if in.Labels == nil { - in.Labels = make(map[string]string, 0) - } - // ensure that job gets deleted when complete - in.Labels[apis.KeyDeleteJobOnCompletion] = "true" - - in.Spec.Template = *jobTemplate - in.Spec.Template.Spec.ServiceAccountName = serviceAccountName - return in - }) - - return err + return podTemplate, nil } func (c *StashController) ensureVolumeRestorerJob(restoreSession *api_v1beta1.RestoreSession) error { @@ -598,8 +597,8 @@ func (c *StashController) getRestoreSessionPhase(restoreSession *api_v1beta1.Res return api_v1beta1.RestoreSessionSucceeded, nil } -//GetPVCFromVolumeClaimTemplates return PVC list from VolumeClaimTemplate -func GetPVCFromVolumeClaimTemplates(ordinal int32, claimTemplates []core.PersistentVolumeClaim) ([]core.PersistentVolumeClaim, error) { +// getPVCFromVolumeClaimTemplates returns list of PVCs generated according to the VolumeClaimTemplate +func getPVCFromVolumeClaimTemplates(ordinal int32, claimTemplates []core.PersistentVolumeClaim) ([]core.PersistentVolumeClaim, error) { pvcList := make([]core.PersistentVolumeClaim, 0) for _, claim := range claimTemplates { inputs := make(map[string]string) diff --git a/pkg/util/job.go b/pkg/util/job.go index f09cb89c8..27effdecb 100644 --- a/pkg/util/job.go +++ b/pkg/util/job.go @@ -198,6 +198,7 @@ func NewRecoveryJob(stashClient cs.Interface, recovery *api_v1alpha1.Recovery, i return job, nil } +// NewPVCRestorerJob return a job definition to restore pvc. 
func NewPVCRestorerJob(rs *api_v1beta1.RestoreSession, repository *api_v1alpha1.Repository, image docker.Docker, meta metav1.ObjectMeta) (*core.PodTemplateSpec, error) { container := core.Container{ Name: StashContainer, @@ -275,6 +276,7 @@ func NewPVCRestorerJob(rs *api_v1beta1.RestoreSession, repository *api_v1alpha1. } jobTemplate := &core.PodTemplateSpec{ + ObjectMeta: meta, Spec: core.PodSpec{ Containers: []core.Container{container}, RestartPolicy: core.RestartPolicyNever,