Skip to content

Commit

Permalink
Rename run with runtime
Browse files Browse the repository at this point in the history
Signed-off-by: Yuki Iwai <[email protected]>
  • Loading branch information
tenzen-y committed Oct 22, 2024
1 parent 94507e6 commit 6016495
Showing 1 changed file with 16 additions and 9 deletions.
25 changes: 16 additions & 9 deletions pkg/controller.v2/trainjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package controllerv2

import (
"context"
"errors"
"fmt"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -29,22 +31,24 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"

kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1"
runtime "github.com/kubeflow/training-operator/pkg/runtime.v2"
jobruntimes "github.com/kubeflow/training-operator/pkg/runtime.v2"
)

var errorUnsupportedRuntime = errors.New("the specified runtime is not supported")

type TrainJobReconciler struct {
log logr.Logger
client client.Client
recorder record.EventRecorder
runtimes map[string]runtime.Runtime
runtimes map[string]jobruntimes.Runtime
}

func NewTrainJobReconciler(client client.Client, recorder record.EventRecorder, runs map[string]runtime.Runtime) *TrainJobReconciler {
func NewTrainJobReconciler(client client.Client, recorder record.EventRecorder, runtimes map[string]jobruntimes.Runtime) *TrainJobReconciler {
return &TrainJobReconciler{
log: ctrl.Log.WithName("trainjob-controller"),
client: client,
recorder: recorder,
runtimes: runs,
runtimes: runtimes,
}
}

Expand All @@ -66,9 +70,12 @@ func (r *TrainJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
func (r *TrainJobReconciler) createOrUpdateObjs(ctx context.Context, trainJob *kubeflowv2.TrainJob) error {
log := ctrl.LoggerFrom(ctx)

// Controller assumes the runtime existence has already verified in the webhook on TrainJob creation.
run := r.runtimes[runtimeRefToGroupKind(trainJob.Spec.RuntimeRef).String()]
objs, err := run.NewObjects(ctx, trainJob)
runtimeRefGK := runtimeRefToGroupKind(trainJob.Spec.RuntimeRef).String()
runtime, ok := r.runtimes[runtimeRefGK]
if !ok {
return fmt.Errorf("%w: %s", errorUnsupportedRuntime, runtimeRefGK)
}
objs, err := runtime.NewObjects(ctx, trainJob)
if err != nil {
return err
}
Expand Down Expand Up @@ -117,8 +124,8 @@ func runtimeRefToGroupKind(runtimeRef kubeflowv2.RuntimeRef) schema.GroupKind {
func (r *TrainJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
b := ctrl.NewControllerManagedBy(mgr).
For(&kubeflowv2.TrainJob{})
for _, run := range r.runtimes {
for _, registrar := range run.EventHandlerRegistrars() {
for _, runtime := range r.runtimes {
for _, registrar := range runtime.EventHandlerRegistrars() {
if registrar != nil {
b = registrar(b, mgr.GetClient())
}
Expand Down

0 comments on commit 6016495

Please sign in to comment.