Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move away from Jobs to Pods #266

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 7 additions & 18 deletions controllers/ansibletest_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"errors"
"fmt"
"strconv"
"time"

"reflect"

Expand All @@ -30,12 +29,10 @@ import (
"github.com/openstack-k8s-operators/lib-common/modules/common/condition"
"github.com/openstack-k8s-operators/lib-common/modules/common/env"
"github.com/openstack-k8s-operators/lib-common/modules/common/helper"
"github.com/openstack-k8s-operators/lib-common/modules/common/job"
common_rbac "github.com/openstack-k8s-operators/lib-common/modules/common/rbac"
"github.com/openstack-k8s-operators/test-operator/api/v1beta1"
testv1beta1 "github.com/openstack-k8s-operators/test-operator/api/v1beta1"
v1beta1 "github.com/openstack-k8s-operators/test-operator/api/v1beta1"
"github.com/openstack-k8s-operators/test-operator/pkg/ansibletest"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -161,7 +158,7 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
Log.Info(InfoTestingCompleted)
return ctrl.Result{}, nil

case CreateFirstJob:
case CreateFirstPod:
lockAcquired, err := r.AcquireLock(ctx, instance, helper, false)
if !lockAcquired {
Log.Info(fmt.Sprintf(InfoCanNotAcquireLock, testOperatorLockName))
Expand All @@ -170,7 +167,7 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)

Log.Info(fmt.Sprintf(InfoCreatingFirstPod, nextWorkflowStep))

case CreateNextJob:
case CreateNextPod:
// Confirm that we still hold the lock. This is useful to check if for
// example somebody / something deleted the lock and it got claimed by
// another instance. This is considered to be an error state.
Expand Down Expand Up @@ -213,7 +210,7 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)

// Create a new job
mountCerts := r.CheckSecretExists(ctx, instance, "combined-ca-bundle")
jobName := r.GetJobName(instance, nextWorkflowStep)
jobName := r.GetPodName(instance, nextWorkflowStep)
envVars, workflowOverrideParams := r.PrepareAnsibleEnv(instance, nextWorkflowStep)
logsPVCName := r.GetPVCLogsName(instance, 0)
containerImage, err := r.GetContainerImage(ctx, workflowOverrideParams["ContainerImage"], instance)
Expand Down Expand Up @@ -245,8 +242,7 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return rbacResult, nil
}
// Service account, role, binding - end

jobDef := ansibletest.Job(
podDef := ansibletest.Pod(
instance,
serviceLabels,
jobName,
Expand All @@ -258,15 +254,8 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
containerImage,
privileged,
)
ansibleTestsJob := job.NewJob(
jobDef,
testv1beta1.ConfigHash,
true,
time.Duration(5)*time.Second,
"",
)

ctrlResult, err = ansibleTestsJob.DoJob(ctx, helper)
ctrlResult, err = r.CreatePod(ctx, *helper, podDef)
if err != nil {
// Creation of the ansibleTests job was not successful.
// Release the lock and allow other controllers to spawn
Expand Down Expand Up @@ -299,7 +288,7 @@ func (r *AnsibleTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
func (r *AnsibleTestReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&testv1beta1.AnsibleTest{}).
Owns(&batchv1.Job{}).
Owns(&corev1.Pod{}).
Owns(&corev1.Secret{}).
Owns(&corev1.ConfigMap{}).
Complete(r)
Expand Down
117 changes: 79 additions & 38 deletions controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/openstack-k8s-operators/lib-common/modules/common/util"
v1beta1 "github.com/openstack-k8s-operators/test-operator/api/v1beta1"
"gopkg.in/yaml.v3"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -27,6 +26,7 @@ import (
"k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

const (
Expand Down Expand Up @@ -80,13 +80,13 @@ const (
// to change
Wait = iota

// CreateFirstJob indicates that the Reconcile loop should create the first job
// CreateFirstPod indicates that the Reconcile loop should create the first pod
// either specified in the .Spec section or in the .Spec.Workflow section.
CreateFirstJob
CreateFirstPod

// CreateNextJob indicates that the Reconcile loop should create a next job
// CreateNextPod indicates that the Reconcile loop should create the next pod
// specified in the .Spec.Workflow section (if .Spec.Workflow is defined)
CreateNextJob
CreateNextPod

// EndTesting indicates that all jobs have already finished. The Reconcile
// loop should end the testing and release resources that are required to
Expand All @@ -97,59 +97,100 @@ const (
Failure
)

// GetPod looks up the pod called podName inside the podNamespace namespace
// through the reconciler's client.
//
// The returned pointer is never nil: it always refers to the pod object that
// was handed to the client's Get call. When Get fails, that object is returned
// together with the error reported by the client, so callers must check the
// error before trusting the pod's contents.
func (r *Reconciler) GetPod(
	ctx context.Context,
	podName string,
	podNamespace string,
) (*corev1.Pod, error) {
	found := new(corev1.Pod)
	key := client.ObjectKey{Name: podName, Namespace: podNamespace}

	// err is nil on success, so a single return covers both outcomes.
	err := r.Client.Get(ctx, key, found)
	return found, err
}

// CreatePod creates the pod described by podSpec unless a pod with the same
// name and namespace is already present.
//
// Parameters:
//   - ctx: context forwarded to every client call.
//   - h: helper whose GetBeforeObject() result is set as the controller owner
//     reference of podSpec before the pod is created.
//   - podSpec: the pod to create; its owner references are modified in place.
//
// Returns an empty ctrl.Result in every case. The error is nil when the pod
// already existed or was created, and non-nil when the lookup, the owner
// reference assignment, or the create call failed.
//
// The existence check and the create are two separate API calls and are not
// atomic: the pod may appear between them. An AlreadyExists error from Create
// is therefore handled the same way as finding the pod up front, instead of
// being reported to the caller as a failure.
func (r *Reconciler) CreatePod(
	ctx context.Context,
	h helper.Helper,
	podSpec *corev1.Pod,
) (ctrl.Result, error) {
	_, err := r.GetPod(ctx, podSpec.Name, podSpec.Namespace)
	if err == nil {
		// The pod is already there; nothing to do.
		return ctrl.Result{}, nil
	}
	if !k8s_errors.IsNotFound(err) {
		// Any lookup error other than NotFound is a real failure.
		return ctrl.Result{}, err
	}

	if err := controllerutil.SetControllerReference(h.GetBeforeObject(), podSpec, r.GetScheme()); err != nil {
		return ctrl.Result{}, err
	}

	// Tolerate AlreadyExists: the pod showed up after the lookup above, which
	// is the same end state this function already reports as success.
	if err := r.Client.Create(ctx, podSpec); err != nil && !k8s_errors.IsAlreadyExists(err) {
		return ctrl.Result{}, err
	}

	return ctrl.Result{}, nil
}

// NextAction indicates what action needs to be performed by the Reconcile loop
// based on the current state of the OpenShift cluster.
func (r *Reconciler) NextAction(
ctx context.Context,
instance client.Object,
workflowLength int,
) (NextAction, int, error) {
// Get the latest job. The latest job is job with the highest value stored
// Get the latest pod. The latest pod is pod with the highest value stored
// in workflowStep label
workflowStepIdx := 0
lastJob, err := r.GetLastJob(ctx, instance)
lastPod, err := r.GetLastPod(ctx, instance)
if err != nil {
return Failure, workflowStepIdx, err
}

// If there is a job associated with the current instance.
if lastJob != nil {
workflowStepIdx, err := strconv.Atoi(lastJob.Labels[workflowStepLabel])
// If there is a pod associated with the current instance.
if lastPod != nil {
workflowStepIdx, err := strconv.Atoi(lastPod.Labels[workflowStepLabel])
if err != nil {
return Failure, workflowStepIdx, err
}

// If the last job is not in Failed or Succeded state -> Wait
lastJobFinished := (lastJob.Status.Failed + lastJob.Status.Succeeded) > 0
if !lastJobFinished {
// If the last pod is not in Failed or Succeeded state -> Wait
lastPodFinished := lastPod.Status.Phase == corev1.PodFailed || lastPod.Status.Phase == corev1.PodSucceeded
if !lastPodFinished {
return Wait, workflowStepIdx, nil
}

// If the last job is in Failed or Succeeded state and it is NOT the last
// job which was supposed to be created -> CreateNextJob
if lastJobFinished && !isLastJobIndex(workflowStepIdx, workflowLength) {
// If the last pod is in Failed or Succeeded state and it is NOT the last
// pod which was supposed to be created -> CreateNextPod
if lastPodFinished && !isLastPodIndex(workflowStepIdx, workflowLength) {
workflowStepIdx++
return CreateNextJob, workflowStepIdx, nil
return CreateNextPod, workflowStepIdx, nil
}

// Otherwise if the job is in Failed or Succeded stated and it IS the
// last job -> EndTesting
if lastJobFinished && isLastJobIndex(workflowStepIdx, workflowLength) {
// Otherwise if the pod is in Failed or Succeeded state and it IS the
// last pod -> EndTesting
if lastPodFinished && isLastPodIndex(workflowStepIdx, workflowLength) {
return EndTesting, workflowStepIdx, nil
}
}

// If there is not any job associated with the instance -> createFirstJob
if lastJob == nil {
return CreateFirstJob, workflowStepIdx, nil
// If there is not any pod associated with the instance -> createFirstPod
if lastPod == nil {
return CreateFirstPod, workflowStepIdx, nil
}

return Failure, workflowStepIdx, nil
}

// isLastJobIndex returns true when jobIndex is the index of the last job that
// isLastPodIndex returns true when jobIndex is the index of the last job that
// should be executed. Otherwise the return value is false.
func isLastJobIndex(jobIndex int, workflowLength int) bool {
func isLastPodIndex(jobIndex int, workflowLength int) bool {
switch workflowLength {
case 0:
return jobIndex == workflowLength
Expand All @@ -160,26 +201,26 @@ func isLastJobIndex(jobIndex int, workflowLength int) bool {

// GetLastPod returns pod associated with an instance which has the highest value
// stored in the workflowStep label
func (r *Reconciler) GetLastJob(
func (r *Reconciler) GetLastPod(
ctx context.Context,
instance client.Object,
) (*batchv1.Job, error) {
) (*corev1.Pod, error) {
labels := map[string]string{instanceNameLabel: instance.GetName()}
namespaceListOpt := client.InNamespace(instance.GetNamespace())
labelsListOpt := client.MatchingLabels(labels)
jobList := &batchv1.JobList{}
err := r.Client.List(ctx, jobList, namespaceListOpt, labelsListOpt)
podList := &corev1.PodList{}
err := r.Client.List(ctx, podList, namespaceListOpt, labelsListOpt)
if err != nil {
return nil, err
}

var maxJob *batchv1.Job
var maxJob *corev1.Pod
maxJobWorkflowStep := 0

for _, job := range jobList.Items {
for _, job := range podList.Items {
workflowStep, err := strconv.Atoi(job.Labels[workflowStepLabel])
if err != nil {
return &batchv1.Job{}, err
return &corev1.Pod{}, err
}

if workflowStep >= maxJobWorkflowStep {
Expand Down Expand Up @@ -307,7 +348,7 @@ func (r *Reconciler) GetContainerImage(
return "", nil
}

func (r *Reconciler) GetJobName(instance interface{}, workflowStepNum int) string {
func (r *Reconciler) GetPodName(instance interface{}, workflowStepNum int) string {
if typedInstance, ok := instance.(*v1beta1.Tobiko); ok {
if len(typedInstance.Spec.Workflow) == 0 || workflowStepNum == workflowStepNumInvalid {
return typedInstance.Name
Expand Down Expand Up @@ -552,11 +593,11 @@ func (r *Reconciler) ReleaseLock(ctx context.Context, instance client.Object) (b
return false, errors.New("failed to delete test-operator-lock")
}

func (r *Reconciler) JobExists(ctx context.Context, instance client.Object, workflowStepNum int) bool {
job := &batchv1.Job{}
jobName := r.GetJobName(instance, workflowStepNum)
objectKey := client.ObjectKey{Namespace: instance.GetNamespace(), Name: jobName}
err := r.Client.Get(ctx, objectKey, job)
func (r *Reconciler) PodExists(ctx context.Context, instance client.Object, workflowStepNum int) bool {
pod := &corev1.Pod{}
podName := r.GetPodName(instance, workflowStepNum)
objectKey := client.ObjectKey{Namespace: instance.GetNamespace(), Name: podName}
err := r.Client.Get(ctx, objectKey, pod)
if err != nil && k8s_errors.IsNotFound(err) {
return false
}
Expand Down
23 changes: 6 additions & 17 deletions controllers/horizontest_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,15 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/go-logr/logr"
"github.com/openstack-k8s-operators/lib-common/modules/common"
"github.com/openstack-k8s-operators/lib-common/modules/common/condition"
"github.com/openstack-k8s-operators/lib-common/modules/common/env"
"github.com/openstack-k8s-operators/lib-common/modules/common/helper"
"github.com/openstack-k8s-operators/lib-common/modules/common/job"
common_rbac "github.com/openstack-k8s-operators/lib-common/modules/common/rbac"
testv1beta1 "github.com/openstack-k8s-operators/test-operator/api/v1beta1"
"github.com/openstack-k8s-operators/test-operator/pkg/horizontest"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -154,7 +151,7 @@ func (r *HorizonTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
Log.Info(InfoTestingCompleted)
return ctrl.Result{}, nil

case CreateFirstJob:
case CreateFirstPod:
lockAcquired, err := r.AcquireLock(ctx, instance, helper, instance.Spec.Parallel)
if !lockAcquired {
Log.Info(fmt.Sprintf(InfoCanNotAcquireLock, testOperatorLockName))
Expand All @@ -163,7 +160,7 @@ func (r *HorizonTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)

Log.Info(fmt.Sprintf(InfoCreatingFirstPod, nextWorkflowStep))

case CreateNextJob:
case CreateNextPod:
// Confirm that we still hold the lock. This is useful to check if for
// example somebody / something deleted the lock and it got claimed by
// another instance. This is considered to be an error state.
Expand Down Expand Up @@ -224,7 +221,7 @@ func (r *HorizonTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)

// Prepare HorizonTest env vars
envVars := r.PrepareHorizonTestEnvVars(instance)
jobName := r.GetJobName(instance, 0)
jobName := r.GetPodName(instance, 0)
logsPVCName := r.GetPVCLogsName(instance, 0)
containerImage, err := r.GetContainerImage(ctx, instance.Spec.ContainerImage, instance)
if err != nil {
Expand All @@ -240,8 +237,7 @@ func (r *HorizonTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return rbacResult, nil
}
// Service account, role, binding - end

jobDef := horizontest.Job(
podDef := horizontest.Pod(
instance,
serviceLabels,
jobName,
Expand All @@ -252,15 +248,8 @@ func (r *HorizonTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
envVars,
containerImage,
)
horizontestJob := job.NewJob(
jobDef,
testv1beta1.ConfigHash,
true,
time.Duration(5)*time.Second,
"",
)

ctrlResult, err = horizontestJob.DoJob(ctx, helper)
ctrlResult, err = r.CreatePod(ctx, *helper, podDef)
if err != nil {
instance.Status.Conditions.Set(condition.FalseCondition(
condition.DeploymentReadyCondition,
Expand All @@ -286,7 +275,7 @@ func (r *HorizonTestReconciler) Reconcile(ctx context.Context, req ctrl.Request)
func (r *HorizonTestReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&testv1beta1.HorizonTest{}).
Owns(&batchv1.Job{}).
Owns(&corev1.Pod{}).
Owns(&corev1.Secret{}).
Owns(&corev1.ConfigMap{}).
Complete(r)
Expand Down
Loading
Loading