Skip to content

Commit

Permalink
Add support to restore using volumeClaimTemplate in Function-Task mod…
Browse files Browse the repository at this point in the history
…el (#841)
  • Loading branch information
Md. Emruz Hossain authored and tamalsaha committed Jul 26, 2019
1 parent 3de534b commit 33fc37c
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 100 deletions.
199 changes: 99 additions & 100 deletions pkg/controller/restore_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,15 @@ func (c *StashController) runRestoreSessionProcessor(key string) error {
}

func (c *StashController) ensureRestoreJob(restoreSession *api_v1beta1.RestoreSession) error {
image := docker.Docker{
Registry: c.DockerRegistry,
Image: docker.ImageStash,
Tag: c.StashImageTag,
}

offshootLabels := restoreSession.OffshootLabels()

objectMeta := metav1.ObjectMeta{
jobMeta := metav1.ObjectMeta{
Name: RestoreJobPrefix + restoreSession.Name,
Namespace: restoreSession.Namespace,
Labels: offshootLabels,
Expand All @@ -207,9 +213,9 @@ func (c *StashController) ensureRestoreJob(restoreSession *api_v1beta1.RestoreSe
serviceAccountName = restoreSession.Spec.RuntimeSettings.Pod.ServiceAccountName
} else {
// ServiceAccount hasn't been specified. so create new one with same name as RestoreSession object.
serviceAccountName = objectMeta.Name
serviceAccountName = jobMeta.Name

_, _, err = core_util.CreateOrPatchServiceAccount(c.kubeClient, objectMeta, func(in *core.ServiceAccount) *core.ServiceAccount {
_, _, err = core_util.CreateOrPatchServiceAccount(c.kubeClient, jobMeta, func(in *core.ServiceAccount) *core.ServiceAccount {
core_util.EnsureOwnerReference(&in.ObjectMeta, ref)
in.Labels = offshootLabels
return in
Expand Down Expand Up @@ -238,23 +244,92 @@ func (c *StashController) ensureRestoreJob(restoreSession *api_v1beta1.RestoreSe
return err
}

// Now there could be two restore scenario for restoring through job.
// 1. Restore target is a Database or a existing PVC. In this case, we need to resolve Task and Function then create a job to restore.
// 2. VolumeClaimTemplate has been specified. In this case, we have to create the PVCs then restore on it. We will create one job for each replicas to restore in parallel.
// Now, there could be two restore scenario for restoring through job.
// 1. Restore process follows Function-Task model. In this case, we have to resolve respective Functions and Task to get desired job definition.
// 2. Restore process does not follow Function-Task model. In this case, we have to generate simple volume restorer job definition.

var jobTemplate *core.PodTemplateSpec

// Check whether VolumeClaimTemplate is specified. If so, create the PVCs and create restore job for each replicas.
// Otherwise, resolve task template and create a single job.
if restoreSession.Spec.Target != nil && restoreSession.Spec.Target.VolumeClaimTemplates != nil {
return c.createPVCThenRestore(restoreSession, repository, objectMeta, ref, serviceAccountName)
if restoreSession.Spec.Task.Name != "" {
// Restore process follows Function-Task model. So, resolve Function and Task to get desired job definition.
jobTemplate, err = c.resolveRestoreTask(restoreSession, repository, jobMeta, ref, serviceAccountName)
if err != nil {
return err
}
} else {
return c.resolveTaskThenRestore(restoreSession, repository, objectMeta, ref, serviceAccountName)
// Restore process does not follow Function-Task model. So, generate simple volume restorer job definition.
jobTemplate, err = util.NewPVCRestorerJob(restoreSession, repository, image, jobMeta)
if err != nil {
return err
}
}

// If volumeClaimTemplate is not specified then we don't need any further processing. Just, create the job
if restoreSession.Spec.Target == nil ||
(restoreSession.Spec.Target != nil && len(restoreSession.Spec.Target.VolumeClaimTemplates) == 0) {
return c.createRestoreJob(jobTemplate, jobTemplate.ObjectMeta, ref, serviceAccountName)
}

// volumeClaimTemplate has been specified. Now, we have to do the following for each replica:
// 1. Create PVCs according to the template.
// 2. Mount the PVCs to the restore job.
// 3. Create the restore job to restore the target.

replicas := int32(1) // set default replicas to 1
if restoreSession.Spec.Target.Replicas != nil {
replicas = *restoreSession.Spec.Target.Replicas
}

for ordinal := int32(0); ordinal < replicas; ordinal++ {
// resolve template part of the volumeClaimTemplate and generate PVC definition according to the template
pvcList, err := getPVCFromVolumeClaimTemplates(ordinal, restoreSession.Spec.Target.VolumeClaimTemplates)
if err != nil {
return err
}

// create the PVCs
err = util.CreateBatchPVC(c.kubeClient, restoreSession.Namespace, pvcList)
if err != nil {
return err
}

// add PVCs as volume to the job
volumes := util.PVCListToVolumes(pvcList, ordinal)
jobTemplate.Spec.Volumes = core_util.UpsertVolume(jobTemplate.Spec.Volumes, volumes...)

// add ordinal suffix to the job name so that multiple restore job can run concurrently
jobTemplate.Name = fmt.Sprintf("%s-%d", jobMeta.Name, ordinal)

// create restore job
err = c.createRestoreJob(jobTemplate, jobTemplate.ObjectMeta, ref, serviceAccountName)
if err != nil {
return err
}
}
return nil
}

func (c *StashController) createRestoreJob(jobTemplate *core.PodTemplateSpec, meta metav1.ObjectMeta, ref *core.ObjectReference, serviceAccountName string) error {
_, _, err := batch_util.CreateOrPatchJob(c.kubeClient, meta, func(in *batchv1.Job) *batchv1.Job {
// set BackupSession as owner of this Job
core_util.EnsureOwnerReference(&in.ObjectMeta, ref)

if in.Labels == nil {
in.Labels = make(map[string]string, 0)
}
// ensure that job gets deleted when complete
in.Labels[apis.KeyDeleteJobOnCompletion] = "false"

in.Spec.Template = *jobTemplate
in.Spec.Template.Spec.ServiceAccountName = serviceAccountName
return in
})
return err
}

// resolveTaskThenRestore resolves Functions and Tasks then create a restore job to restore the target.
func (c *StashController) resolveTaskThenRestore(restoreSession *api_v1beta1.RestoreSession,
repository *api_v1alpha1.Repository, meta metav1.ObjectMeta, ref *core.ObjectReference, serviceAccountName string) error {
// resolveRestoreTask resolves Functions and Tasks then returns a job definition to restore the target.
func (c *StashController) resolveRestoreTask(restoreSession *api_v1beta1.RestoreSession,
repository *api_v1alpha1.Repository, meta metav1.ObjectMeta, ref *core.ObjectReference, serviceAccountName string) (*core.PodTemplateSpec, error) {

// resolve task template
explicitInputs := make(map[string]string)
Expand All @@ -264,11 +339,11 @@ func (c *StashController) resolveTaskThenRestore(restoreSession *api_v1beta1.Res

repoInputs, err := c.inputsForRepository(repository)
if err != nil {
return fmt.Errorf("cannot resolve implicit inputs for Repository %s/%s, reason: %s", repository.Namespace, repository.Name, err)
return nil, fmt.Errorf("cannot resolve implicit inputs for Repository %s/%s, reason: %s", repository.Namespace, repository.Name, err)
}
rsInputs, err := c.inputsForRestoreSession(*restoreSession)
if err != nil {
return fmt.Errorf("cannot resolve implicit inputs for RestoreSession %s/%s, reason: %s", restoreSession.Namespace, restoreSession.Name, err)
return nil, fmt.Errorf("cannot resolve implicit inputs for RestoreSession %s/%s, reason: %s", restoreSession.Namespace, restoreSession.Name, err)
}

implicitInputs := core_util.UpsertMap(repoInputs, rsInputs)
Expand Down Expand Up @@ -300,95 +375,19 @@ func (c *StashController) resolveTaskThenRestore(restoreSession *api_v1beta1.Res

podSpec, err := taskResolver.GetPodSpec()
if err != nil {
return err
return nil, err
}

// for local backend, attach volume to all containers
if repository.Spec.Backend.Local != nil {
podSpec = util.AttachLocalBackend(podSpec, *repository.Spec.Backend.Local)
}

// create Restore Job
_, _, err = batch_util.CreateOrPatchJob(c.kubeClient, meta, func(in *batchv1.Job) *batchv1.Job {
// set RestoreSession as owner of this Job
core_util.EnsureOwnerReference(&in.ObjectMeta, ref)

in.Labels = restoreSession.OffshootLabels()
// restore job is created by resolving task and function. we should not delete it when it goes to completed state.
// user might need to know what was the final resolved job specification for debugging purpose.
in.Labels[apis.KeyDeleteJobOnCompletion] = "false"

in.Spec.Template.Spec = podSpec
in.Spec.Template.Spec.ServiceAccountName = serviceAccountName
return in
})

return err
}

// createPVCThenRestore creates PVCs according to the VolumeClaimTemplate specified in RestoreSession target
// creates one job for each PVC to restore in parallel.
func (c *StashController) createPVCThenRestore(restoreSession *api_v1beta1.RestoreSession,
repository *api_v1alpha1.Repository, meta metav1.ObjectMeta, ref *core.ObjectReference, serviceAccountName string) error {
// Create PVCs specified in VolumeClaimTemplate
replicas := int32(1)
if restoreSession.Spec.Target.Replicas != nil {
replicas = *restoreSession.Spec.Target.Replicas
}
for ordinal := int32(0); ordinal < replicas; ordinal++ {
pvcList, err := GetPVCFromVolumeClaimTemplates(ordinal, restoreSession.Spec.Target.VolumeClaimTemplates)
if err != nil {
return err
}

err = util.CreateBatchPVC(c.kubeClient, restoreSession.Namespace, pvcList)
if err != nil {
return err
}

jobMeta := meta
jobMeta.Name = fmt.Sprintf("%s-%d", meta.Name, ordinal)
err = c.createPVCRestorerJob(restoreSession, repository, jobMeta, ref, serviceAccountName, util.PVCListToVolumes(pvcList, ordinal))
if err != nil {
return err
}
}

return nil
}

func (c *StashController) createPVCRestorerJob(restoreSession *api_v1beta1.RestoreSession, repository *api_v1alpha1.Repository,
meta metav1.ObjectMeta, ref *core.ObjectReference, serviceAccountName string, volumes []core.Volume) error {
image := docker.Docker{
Registry: c.DockerRegistry,
Image: docker.ImageStash,
Tag: c.StashImageTag,
podTemplate := &core.PodTemplateSpec{
ObjectMeta: meta,
Spec: podSpec,
}

jobTemplate, err := util.NewPVCRestorerJob(restoreSession, repository, image, meta)
if err != nil {
return err
}

// add PVCs to volume list of the job
jobTemplate.Spec.Volumes = core_util.UpsertVolume(jobTemplate.Spec.Volumes, volumes...)

// Create restore Job
_, _, err = batch_util.CreateOrPatchJob(c.kubeClient, meta, func(in *batchv1.Job) *batchv1.Job {
// set BackupSession as owner of this Job
core_util.EnsureOwnerReference(&in.ObjectMeta, ref)

if in.Labels == nil {
in.Labels = make(map[string]string, 0)
}
// ensure that job gets deleted when complete
in.Labels[apis.KeyDeleteJobOnCompletion] = "true"

in.Spec.Template = *jobTemplate
in.Spec.Template.Spec.ServiceAccountName = serviceAccountName
return in
})

return err
return podTemplate, nil
}

func (c *StashController) ensureVolumeRestorerJob(restoreSession *api_v1beta1.RestoreSession) error {
Expand Down Expand Up @@ -598,8 +597,8 @@ func (c *StashController) getRestoreSessionPhase(restoreSession *api_v1beta1.Res
return api_v1beta1.RestoreSessionSucceeded, nil
}

//GetPVCFromVolumeClaimTemplates return PVC list from VolumeClaimTemplate
func GetPVCFromVolumeClaimTemplates(ordinal int32, claimTemplates []core.PersistentVolumeClaim) ([]core.PersistentVolumeClaim, error) {
// getPVCFromVolumeClaimTemplates returns list of PVCs generated according to the VolumeClaimTemplate
func getPVCFromVolumeClaimTemplates(ordinal int32, claimTemplates []core.PersistentVolumeClaim) ([]core.PersistentVolumeClaim, error) {
pvcList := make([]core.PersistentVolumeClaim, 0)
for _, claim := range claimTemplates {
inputs := make(map[string]string)
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func NewRecoveryJob(stashClient cs.Interface, recovery *api_v1alpha1.Recovery, i
return job, nil
}

// NewPVCRestorerJob return a job definition to restore pvc.
func NewPVCRestorerJob(rs *api_v1beta1.RestoreSession, repository *api_v1alpha1.Repository, image docker.Docker, meta metav1.ObjectMeta) (*core.PodTemplateSpec, error) {
container := core.Container{
Name: StashContainer,
Expand Down Expand Up @@ -275,6 +276,7 @@ func NewPVCRestorerJob(rs *api_v1beta1.RestoreSession, repository *api_v1alpha1.
}

jobTemplate := &core.PodTemplateSpec{
ObjectMeta: meta,
Spec: core.PodSpec{
Containers: []core.Container{container},
RestartPolicy: core.RestartPolicyNever,
Expand Down

0 comments on commit 33fc37c

Please sign in to comment.