From 70c282dbcae4ef9a6b9d7012491150f2d0f1a650 Mon Sep 17 00:00:00 2001 From: Feng Min Date: Wed, 25 Apr 2018 15:06:02 -0700 Subject: [PATCH] Implement parallel worker in machine controller. (#74) * Implement parallel worker in machine controller. * Add Unit Test for Machine Controller parallelism --- .../google/cmd/gce-machine-controller/main.go | 2 +- pkg/controller/config/configuration.go | 9 +- pkg/controller/machine/controller_test.go | 45 ++++++++- pkg/controller/machine/machine_suite_test.go | 36 ++++--- pkg/controller/machine/queue.go | 50 ++++++++++ pkg/controller/machine/reconcile_test.go | 87 ++++++---------- pkg/controller/machine/testactuator.go | 99 +++++++++++++++++++ 7 files changed, 252 insertions(+), 76 deletions(-) create mode 100644 pkg/controller/machine/queue.go create mode 100644 pkg/controller/machine/testactuator.go diff --git a/cloud/google/cmd/gce-machine-controller/main.go b/cloud/google/cmd/gce-machine-controller/main.go index 64330ef43244..fb5adf62aef6 100644 --- a/cloud/google/cmd/gce-machine-controller/main.go +++ b/cloud/google/cmd/gce-machine-controller/main.go @@ -65,6 +65,6 @@ func main() { // If this doesn't compile, the code generator probably // overwrote the customized NewMachineController function. c := machine.NewMachineController(config, si, actuator) - c.Run(shutdown) + c.RunAsync(shutdown) select {} } diff --git a/pkg/controller/config/configuration.go b/pkg/controller/config/configuration.go index fac3f936e743..4fdf315177c9 100644 --- a/pkg/controller/config/configuration.go +++ b/pkg/controller/config/configuration.go @@ -21,15 +21,18 @@ import ( ) type Configuration struct { - Kubeconfig string - InCluster bool + Kubeconfig string + InCluster bool + WorkerCount int } var ControllerConfig = Configuration{ - InCluster: true, + InCluster: true, + WorkerCount: 5, // Default 5 worker. } func (c *Configuration) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&c.Kubeconfig, "kubeconfig", c.Kubeconfig, "Path to kubeconfig file with authorization and master location information.") fs.BoolVar(&c.InCluster, "incluster", c.InCluster, "Controller will be running inside the cluster.") + fs.IntVar(&c.WorkerCount, "workers", c.WorkerCount, "The number of workers for controller.") } diff --git a/pkg/controller/machine/controller_test.go b/pkg/controller/machine/controller_test.go index b289cc33be0e..e74a663b0559 100644 --- a/pkg/controller/machine/controller_test.go +++ b/pkg/controller/machine/controller_test.go @@ -17,11 +17,13 @@ limitations under the License. package machine import ( + "strconv" "sync" "testing" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" "sigs.k8s.io/cluster-api/pkg/apis/cluster/v1alpha1" "sigs.k8s.io/cluster-api/pkg/apis/cluster/v1alpha1/testutil" @@ -36,9 +38,12 @@ func machineControllerReconcile(t *testing.T, cs *clientset.Clientset, controlle // When creating a new object, it should invoke the reconcile method. cluster := testutil.GetVanillaCluster() cluster.Name = "cluster-1" - if _, err := cs.ClusterV1alpha1().Clusters("default").Create(&cluster); err != nil { + clusterClient := cs.ClusterV1alpha1().Clusters("default") + if _, err := clusterClient.Create(&cluster); err != nil { t.Fatal(err) } + defer clusterClient.Delete(cluster.Name, &metav1.DeleteOptions{}) + client := cs.ClusterV1alpha1().Machines("default") before := make(chan struct{}) after := make(chan struct{}) @@ -94,3 +99,41 @@ func machineControllerReconcile(t *testing.T, cs *clientset.Clientset, controlle t.Fatalf("reconcile never finished") } } + +func machineControllerConcurrentReconcile(t *testing.T, cs *clientset.Clientset, + controller *MachineController) { + // Create a cluster object. + cluster := testutil.GetVanillaCluster() + cluster.Name = "cluster-1" + clusterClient := cs.ClusterV1alpha1().Clusters("default") + if _, err := clusterClient.Create(&cluster); err != nil { + t.Fatal(err) + } + defer clusterClient.Delete(cluster.Name, &metav1.DeleteOptions{}) + + client := cs.ClusterV1alpha1().Machines("default") + + // Direct test actutaor to block on Create() call. + ta := controller.controller.actuator.(*TestActuator) + ta.BlockOnCreate = true + ta.CreateCallCount = 0 + defer ta.Unblock() + + // Create a few instances + const numMachines = 5 + for i := 0; i < numMachines; i++ { + instance := v1alpha1.Machine{} + instance.Name = "instance" + strconv.Itoa(i) + if _, err := client.Create(&instance); err != nil { + t.Fatal(err) + } + } + + err := wait.Poll(time.Second, 10*time.Second, func() (bool, error) { + return (ta.CreateCallCount == numMachines), nil + }) + + if err != nil { + t.Fatalf("The reconcilation didn't run in parallel.") + } +} diff --git a/pkg/controller/machine/machine_suite_test.go b/pkg/controller/machine/machine_suite_test.go index f805bbe522c0..45eb5a4d3dde 100644 --- a/pkg/controller/machine/machine_suite_test.go +++ b/pkg/controller/machine/machine_suite_test.go @@ -20,35 +20,41 @@ import ( "testing" "github.com/kubernetes-incubator/apiserver-builder/pkg/test" + "k8s.io/client-go/rest" "sigs.k8s.io/cluster-api/pkg/apis" - clusterv1 "sigs.k8s.io/cluster-api/pkg/apis/cluster/v1alpha1" "sigs.k8s.io/cluster-api/pkg/client/clientset_generated/clientset" "sigs.k8s.io/cluster-api/pkg/controller/sharedinformers" "sigs.k8s.io/cluster-api/pkg/openapi" ) -type fakeActuator struct{} - -func (fakeActuator) Create(*clusterv1.Cluster, *clusterv1.Machine) error { return nil } -func (fakeActuator) Delete(*clusterv1.Machine) error { return nil } -func (fakeActuator) Update(c *clusterv1.Cluster, machine *clusterv1.Machine) error { return nil } -func (fakeActuator) Exists(*clusterv1.Machine) (bool, error) { return false, nil } - func TestMachine(t *testing.T) { testenv := test.NewTestEnvironment() config := testenv.Start(apis.GetAllApiBuilders(), openapi.GetOpenAPIDefinitions) cs := clientset.NewForConfigOrDie(config) - shutdown := make(chan struct{}) - si := sharedinformers.NewSharedInformers(config, shutdown) - controller := NewMachineController(config, si, fakeActuator{}) - controller.Run(shutdown) - + // TODO: When cluster-api support per namepsace object, we need to make each subtest to run + // in different namespace. Everything lives inside default namespace so that the test needs to run + // sequentially, which is the default behavior for "go test". t.Run("machineControllerReconcile", func(t *testing.T) { + controller, shutdown := getController(config) + defer close(shutdown) machineControllerReconcile(t, cs, controller) }) - - close(shutdown) + t.Run("machineControllerConcurrentReconcile", func(t *testing.T) { + controller, shutdown := getController(config) + defer close(shutdown) + machineControllerConcurrentReconcile(t, cs, controller) + }) testenv.Stop() } + +func getController(config *rest.Config) (*MachineController, chan struct{}) { + shutdown := make(chan struct{}) + si := sharedinformers.NewSharedInformers(config, shutdown) + actuator := NewTestActuator() + controller := NewMachineController(config, si, actuator) + controller.RunAsync(shutdown) + + return controller, shutdown +} diff --git a/pkg/controller/machine/queue.go b/pkg/controller/machine/queue.go new file mode 100644 index 000000000000..2bf6debf42ec --- /dev/null +++ b/pkg/controller/machine/queue.go @@ -0,0 +1,50 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package machine + +import ( + "time" + + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/util/wait" + + "github.com/kubernetes-incubator/apiserver-builder/pkg/controller" + "sigs.k8s.io/cluster-api/pkg/controller/config" +) + +func (c *MachineController) RunAsync(stopCh <-chan struct{}) { + for _, q := range c.Informers.WorkerQueues { + go c.StartWorkerQueue(q, stopCh) + } +} + +// StartWorkerQueue schedules a routine to continuously process Queue messages +// until shutdown is closed +func (c *MachineController) StartWorkerQueue(q *controller.QueueWorker, shutdown <-chan struct{}) { + glog.Infof("Start %s Queue", q.Name) + defer q.Queue.ShutDown() + + for i := 0; i < config.ControllerConfig.WorkerCount; i++ { + // Every second, process all messages in the Queue until it is time to shutdown + go wait.Until(q.ProcessAllMessages, time.Second, shutdown) + } + + <-shutdown + + // Stop accepting messages into the Queue + glog.V(1).Infof("Shutting down %s Queue\n", q.Name) +} diff --git a/pkg/controller/machine/reconcile_test.go b/pkg/controller/machine/reconcile_test.go index de05e618cb6d..1c3a7af2b475 100644 --- a/pkg/controller/machine/reconcile_test.go +++ b/pkg/controller/machine/reconcile_test.go @@ -30,46 +30,20 @@ import ( "sigs.k8s.io/cluster-api/util" ) -type CountActuator struct{ - CreateCallCount int64 - DeleteCallCount int64 - UpdateCallCount int64 - ExistsCallCount int64 - ExistsValue bool -} - -func (a *CountActuator) Create(*v1alpha1.Cluster, *v1alpha1.Machine) error { - a.CreateCallCount++ - return nil -} -func (a *CountActuator) Delete(*v1alpha1.Machine) error { - a.DeleteCallCount++ - return nil -} -func (a *CountActuator) Update(c *v1alpha1.Cluster, machine *v1alpha1.Machine) error { - a.UpdateCallCount++ - return nil -} -func (a *CountActuator) Exists(*v1alpha1.Machine) (bool, error) { - a.ExistsCallCount++ - return a.ExistsValue, nil -} - - func TestMachineSetControllerReconcileHandler(t *testing.T) { tests := []struct { - name string - objExists bool - instanceExists bool - isDeleting bool - withFinalizer bool - isMaster bool - ignoreDeleteCallCount bool - expectFinalizerRemoved bool - numExpectedCreateCalls int64 - numExpectedDeleteCalls int64 - numExpectedUpdateCalls int64 - numExpectedExistsCalls int64 + name string + objExists bool + instanceExists bool + isDeleting bool + withFinalizer bool + isMaster bool + ignoreDeleteCallCount bool + expectFinalizerRemoved bool + numExpectedCreateCalls int64 + numExpectedDeleteCalls int64 + numExpectedUpdateCalls int64 + numExpectedExistsCalls int64 }{ { name: "Create machine", @@ -96,11 +70,11 @@ func TestMachineSetControllerReconcileHandler(t *testing.T) { }, { // This should not be possible. Here for completeness. - name: "Delete machine, instance exists without finalizer", - objExists: true, - instanceExists: true, - isDeleting: true, - withFinalizer: false, + name: "Delete machine, instance exists without finalizer", + objExists: true, + instanceExists: true, + isDeleting: true, + withFinalizer: false, }, { name: "Delete machine, instance does not exist, with finalizer", @@ -112,19 +86,19 @@ func TestMachineSetControllerReconcileHandler(t *testing.T) { expectFinalizerRemoved: true, }, { - name: "Delete machine, instance does not exist, without finalizer", - objExists: true, - instanceExists: false, - isDeleting: true, - withFinalizer: false, + name: "Delete machine, instance does not exist, without finalizer", + objExists: true, + instanceExists: false, + isDeleting: true, + withFinalizer: false, }, { - name: "Delete machine, skip master", - objExists: true, - instanceExists: true, - isDeleting: true, - withFinalizer: true, - isMaster: true, + name: "Delete machine, skip master", + objExists: true, + instanceExists: true, + isDeleting: true, + withFinalizer: true, + isMaster: true, }, } @@ -151,7 +125,8 @@ func TestMachineSetControllerReconcileHandler(t *testing.T) { t.Fatal(err) } - actuator := &CountActuator{ExistsValue: test.instanceExists} + actuator := NewTestActuator() + actuator.ExistsValue = test.instanceExists target := &MachineControllerImpl{} target.actuator = actuator @@ -193,7 +168,7 @@ func getMachine(name string, isDeleting, hasFinalizer, isMaster bool) *v1alpha1. APIVersion: v1alpha1.SchemeGroupVersion.String(), }, ObjectMeta: metav1.ObjectMeta{ - Name: name, + Name: name, Namespace: metav1.NamespaceDefault, }, } diff --git a/pkg/controller/machine/testactuator.go b/pkg/controller/machine/testactuator.go new file mode 100644 index 000000000000..359383af3e1f --- /dev/null +++ b/pkg/controller/machine/testactuator.go @@ -0,0 +1,99 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package machine + +import ( + "sync" + + "sigs.k8s.io/cluster-api/pkg/apis/cluster/v1alpha1" +) + +type TestActuator struct { + unblock chan string + BlockOnCreate bool + BlockOnDelete bool + BlockOnUpdate bool + BlockOnExists bool + CreateCallCount int64 + DeleteCallCount int64 + UpdateCallCount int64 + ExistsCallCount int64 + ExistsValue bool + Lock sync.Mutex +} + +func (a *TestActuator) Create(*v1alpha1.Cluster, *v1alpha1.Machine) error { + defer func() { + if a.BlockOnCreate { + <-a.unblock + } + }() + + a.Lock.Lock() + defer a.Lock.Unlock() + a.CreateCallCount++ + return nil +} + +func (a *TestActuator) Delete(*v1alpha1.Machine) error { + defer func() { + if a.BlockOnDelete { + <-a.unblock + } + }() + + a.Lock.Lock() + defer a.Lock.Unlock() + a.DeleteCallCount++ + return nil +} + +func (a *TestActuator) Update(c *v1alpha1.Cluster, machine *v1alpha1.Machine) error { + defer func() { + if a.BlockOnUpdate { + <-a.unblock + } + }() + + a.Lock.Lock() + defer a.Lock.Unlock() + a.UpdateCallCount++ + return nil +} + +func (a *TestActuator) Exists(*v1alpha1.Machine) (bool, error) { + defer func() { + if a.BlockOnExists { + <-a.unblock + } + }() + + a.Lock.Lock() + defer a.Lock.Unlock() + a.ExistsCallCount++ + return a.ExistsValue, nil +} + +func NewTestActuator() *TestActuator { + ta := new(TestActuator) + ta.unblock = make(chan string) + return ta +} + +func (a *TestActuator) Unblock() { + close(a.unblock) +}