diff --git a/cmd/training-operator.v1/main.go b/cmd/training-operator.v1/main.go index 9562a2bde2..3a20103909 100644 --- a/cmd/training-operator.v1/main.go +++ b/cmd/training-operator.v1/main.go @@ -60,6 +60,7 @@ func main() { var gangSchedulerName string var namespace string var monitoringPort int + var controllerThreads int flag.StringVar(&metricsAddr, "metrics-bind-address", ":8080", "The address the metric endpoint binds to.") flag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") flag.BoolVar(&enableLeaderElection, "leader-elect", false, @@ -74,6 +75,7 @@ func main() { "If set, it only monitors kubeflow jobs in the given namespace.") flag.IntVar(&monitoringPort, "monitoring-port", 9443, "Endpoint port for displaying monitoring metrics. "+ "It can be set to \"0\" to disable the metrics serving.") + flag.IntVar(&controllerThreads, "controller-threads", 10, "Number of worker threads used by the controller.") // PyTorch related flags flag.StringVar(&config.Config.PyTorchInitContainerImage, "pytorch-init-container-image", @@ -120,7 +122,7 @@ func main() { "scheme not supported", "scheme", s) os.Exit(1) } - if err = setupFunc(mgr, enableGangScheduling); err != nil { + if err = setupFunc(mgr, enableGangScheduling, controllerThreads); err != nil { setupLog.Error(err, "unable to create controller", "controller", s) os.Exit(1) } diff --git a/pkg/controller.v1/mpi/mpijob_controller.go b/pkg/controller.v1/mpi/mpijob_controller.go index 30ee1abe5b..be2a60acbe 100644 --- a/pkg/controller.v1/mpi/mpijob_controller.go +++ b/pkg/controller.v1/mpi/mpijob_controller.go @@ -176,9 +176,10 @@ func (jc *MPIJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct } // SetupWithManager sets up the controller with the Manager. -func (jc *MPIJobReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (jc *MPIJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThreads int) error { c, err := controller.New(jc.ControllerName(), mgr, controller.Options{ - Reconciler: jc, + Reconciler: jc, + MaxConcurrentReconciles: controllerThreads, }) if err != nil { diff --git a/pkg/controller.v1/mpi/suite_test.go b/pkg/controller.v1/mpi/suite_test.go index e821208644..069f32db88 100644 --- a/pkg/controller.v1/mpi/suite_test.go +++ b/pkg/controller.v1/mpi/suite_test.go @@ -87,7 +87,7 @@ var _ = BeforeSuite(func() { Expect(err).NotTo(HaveOccurred()) reconciler = NewReconciler(mgr, false) - Expect(reconciler.SetupWithManager(mgr)).NotTo(HaveOccurred()) + Expect(reconciler.SetupWithManager(mgr, 10)).NotTo(HaveOccurred()) go func() { defer GinkgoRecover() diff --git a/pkg/controller.v1/mxnet/mxjob_controller.go b/pkg/controller.v1/mxnet/mxjob_controller.go index 9eb0584b88..84cb1c6e2f 100644 --- a/pkg/controller.v1/mxnet/mxjob_controller.go +++ b/pkg/controller.v1/mxnet/mxjob_controller.go @@ -177,9 +177,10 @@ func (r *MXJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl } // SetupWithManager sets up the controller with the Manager. -func (r *MXJobReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *MXJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThreads int) error { c, err := controller.New(r.ControllerName(), mgr, controller.Options{ - Reconciler: r, + Reconciler: r, + MaxConcurrentReconciles: controllerThreads, }) if err != nil { diff --git a/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go b/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go index 4d76e5c187..547911d01d 100644 --- a/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go +++ b/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go @@ -172,9 +172,10 @@ func (r *PaddleJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( } // SetupWithManager sets up the controller with the Manager. -func (r *PaddleJobReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *PaddleJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThreads int) error { c, err := controller.New(r.ControllerName(), mgr, controller.Options{ - Reconciler: r, + Reconciler: r, + MaxConcurrentReconciles: controllerThreads, }) if err != nil { diff --git a/pkg/controller.v1/paddlepaddle/paddlepaddle_controller_suite_test.go b/pkg/controller.v1/paddlepaddle/paddlepaddle_controller_suite_test.go index 59e5fc5967..8e531e1421 100644 --- a/pkg/controller.v1/paddlepaddle/paddlepaddle_controller_suite_test.go +++ b/pkg/controller.v1/paddlepaddle/paddlepaddle_controller_suite_test.go @@ -83,7 +83,7 @@ var _ = BeforeSuite(func() { r := NewReconciler(mgr, false) - Expect(r.SetupWithManager(mgr)).NotTo(gomega.HaveOccurred()) + Expect(r.SetupWithManager(mgr, 10)).NotTo(gomega.HaveOccurred()) go func() { defer GinkgoRecover() diff --git a/pkg/controller.v1/pytorch/pytorchjob_controller.go b/pkg/controller.v1/pytorch/pytorchjob_controller.go index aacd75e36a..a19de42aca 100644 --- a/pkg/controller.v1/pytorch/pytorchjob_controller.go +++ b/pkg/controller.v1/pytorch/pytorchjob_controller.go @@ -176,9 +176,10 @@ func (r *PyTorchJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) } // SetupWithManager sets up the controller with the Manager. -func (r *PyTorchJobReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *PyTorchJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThreads int) error { c, err := controller.New(r.ControllerName(), mgr, controller.Options{ - Reconciler: r, + Reconciler: r, + MaxConcurrentReconciles: controllerThreads, }) if err != nil { diff --git a/pkg/controller.v1/pytorch/pytorchjob_controller_suite_test.go b/pkg/controller.v1/pytorch/pytorchjob_controller_suite_test.go index ea7816966c..9757c4d861 100644 --- a/pkg/controller.v1/pytorch/pytorchjob_controller_suite_test.go +++ b/pkg/controller.v1/pytorch/pytorchjob_controller_suite_test.go @@ -88,7 +88,7 @@ var _ = BeforeSuite(func() { r := NewReconciler(mgr, false) - Expect(r.SetupWithManager(mgr)).NotTo(gomega.HaveOccurred()) + Expect(r.SetupWithManager(mgr, 10)).NotTo(gomega.HaveOccurred()) go func() { defer GinkgoRecover() diff --git a/pkg/controller.v1/register_controller.go b/pkg/controller.v1/register_controller.go index 66bfa554d9..bbdd0af108 100644 --- a/pkg/controller.v1/register_controller.go +++ b/pkg/controller.v1/register_controller.go @@ -31,26 +31,26 @@ import ( const ErrTemplateSchemeNotSupported = "scheme %s is not supported yet" -type ReconcilerSetupFunc func(manager manager.Manager, enableGangScheduling bool) error +type ReconcilerSetupFunc func(manager manager.Manager, enableGangScheduling bool, controllerThreads int) error var SupportedSchemeReconciler = map[string]ReconcilerSetupFunc{ - kubeflowv1.TFJobKind: func(mgr manager.Manager, enableGangScheduling bool) error { - return tensorflowcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr) + kubeflowv1.TFJobKind: func(mgr manager.Manager, enableGangScheduling bool, controllerThreads int) error { + return tensorflowcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr, controllerThreads) }, - kubeflowv1.PytorchJobKind: func(mgr manager.Manager, enableGangScheduling bool) error { - return pytorchcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr) + kubeflowv1.PytorchJobKind: func(mgr manager.Manager, enableGangScheduling bool, controllerThreads int) error { + return pytorchcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr, controllerThreads) }, - kubeflowv1.MXJobKind: func(mgr manager.Manager, enableGangScheduling bool) error { - return mxnetcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr) + kubeflowv1.MXJobKind: func(mgr manager.Manager, enableGangScheduling bool, controllerThreads int) error { + return mxnetcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr, controllerThreads) }, - kubeflowv1.XGBoostJobKind: func(mgr manager.Manager, enableGangScheduling bool) error { - return xgboostcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr) + kubeflowv1.XGBoostJobKind: func(mgr manager.Manager, enableGangScheduling bool, controllerThreads int) error { + return xgboostcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr, controllerThreads) }, - kubeflowv1.MPIJobKind: func(mgr manager.Manager, enableGangScheduling bool) error { - return mpicontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr) + kubeflowv1.MPIJobKind: func(mgr manager.Manager, enableGangScheduling bool, controllerThreads int) error { + return mpicontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr, controllerThreads) }, - kubeflowv1.PaddleJobKind: func(mgr manager.Manager, enableGangScheduling bool) error { - return paddlecontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr) + kubeflowv1.PaddleJobKind: func(mgr manager.Manager, enableGangScheduling bool, controllerThreads int) error { + return paddlecontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr, controllerThreads) }, } diff --git a/pkg/controller.v1/tensorflow/suite_test.go b/pkg/controller.v1/tensorflow/suite_test.go index 667e31c9e0..e6cb0608df 100644 --- a/pkg/controller.v1/tensorflow/suite_test.go +++ b/pkg/controller.v1/tensorflow/suite_test.go @@ -89,7 +89,7 @@ var _ = BeforeSuite(func() { Expect(err).NotTo(HaveOccurred()) reconciler = NewReconciler(mgr, false) - Expect(reconciler.SetupWithManager(mgr)).NotTo(HaveOccurred()) + Expect(reconciler.SetupWithManager(mgr, 10)).NotTo(HaveOccurred()) go func() { defer GinkgoRecover() diff --git a/pkg/controller.v1/tensorflow/tfjob_controller.go b/pkg/controller.v1/tensorflow/tfjob_controller.go index 6079a4e846..81da4c0a01 100644 --- a/pkg/controller.v1/tensorflow/tfjob_controller.go +++ b/pkg/controller.v1/tensorflow/tfjob_controller.go @@ -194,9 +194,10 @@ func (r *TFJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl } // SetupWithManager sets up the controller with the Manager. -func (r *TFJobReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *TFJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThreads int) error { c, err := controller.New(r.ControllerName(), mgr, controller.Options{ - Reconciler: r, + Reconciler: r, + MaxConcurrentReconciles: controllerThreads, }) if err != nil { diff --git a/pkg/controller.v1/xgboost/xgboostjob_controller.go b/pkg/controller.v1/xgboost/xgboostjob_controller.go index 7be8ff2c7a..26680e018d 100644 --- a/pkg/controller.v1/xgboost/xgboostjob_controller.go +++ b/pkg/controller.v1/xgboost/xgboostjob_controller.go @@ -183,9 +183,10 @@ func (r *XGBoostJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) } // SetupWithManager sets up the controller with the Manager. -func (r *XGBoostJobReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *XGBoostJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThreads int) error { c, err := controller.New(r.ControllerName(), mgr, controller.Options{ - Reconciler: r, + Reconciler: r, + MaxConcurrentReconciles: controllerThreads, }) if err != nil {