From c1007decae0e225d7edc892d6d340d3b29887929 Mon Sep 17 00:00:00 2001
From: "Dr. Stefan Schimanski"
Date: Thu, 18 Jun 2020 11:35:05 +0200
Subject: [PATCH] UPSTREAM: 89937: portAllocator sync local data before allocate

Origin-commit: e2ada0a8f2d3e999b8c10afaf41c52f0c8b50a87
---
 .../core/service/allocator/storage/storage.go      | 54 +++++-------
 .../service/allocator/storage/storage_test.go      | 56 +++++++++++++
 pkg/registry/core/service/portallocator/BUILD      |  1 +
 .../integration/master/kube_apiserver_test.go      | 82 +++++++++++++++++++
 4 files changed, 161 insertions(+), 32 deletions(-)

diff --git a/pkg/registry/core/service/allocator/storage/storage.go b/pkg/registry/core/service/allocator/storage/storage.go
index 52a9acde08de2..4fe4242b9560c 100644
--- a/pkg/registry/core/service/allocator/storage/storage.go
+++ b/pkg/registry/core/service/allocator/storage/storage.go
@@ -76,17 +76,12 @@ func NewEtcd(alloc allocator.Snapshottable, baseKey string, resource schema.Grou
 	}
 }
 
-// Allocate attempts to allocate the item locally and then in etcd.
+// Allocate attempts to allocate the item.
 func (e *Etcd) Allocate(offset int) (bool, error) {
 	e.lock.Lock()
 	defer e.lock.Unlock()
 
-	ok, err := e.alloc.Allocate(offset)
-	if !ok || err != nil {
-		return ok, err
-	}
-
-	err = e.tryUpdate(func() error {
+	err := e.tryUpdate(func() error {
 		ok, err := e.alloc.Allocate(offset)
 		if err != nil {
 			return err
@@ -105,49 +100,44 @@ func (e *Etcd) Allocate(offset int) (bool, error) {
 	return true, nil
 }
 
-// AllocateNext attempts to allocate the next item locally and then in etcd.
+// AllocateNext attempts to allocate the next item.
 func (e *Etcd) AllocateNext() (int, bool, error) {
 	e.lock.Lock()
 	defer e.lock.Unlock()
-
-	offset, ok, err := e.alloc.AllocateNext()
-	if !ok || err != nil {
-		return offset, ok, err
-	}
+	var offset int
+	var ok bool
+	var err error
 
 	err = e.tryUpdate(func() error {
-		ok, err := e.alloc.Allocate(offset)
+		// update the offset here
+		offset, ok, err = e.alloc.AllocateNext()
 		if err != nil {
 			return err
 		}
 		if !ok {
-			// update the offset here
-			offset, ok, err = e.alloc.AllocateNext()
-			if err != nil {
-				return err
-			}
-			if !ok {
-				return errorUnableToAllocate
-			}
-			return nil
+			return errorUnableToAllocate
 		}
 		return nil
 	})
-	return offset, ok, err
+
+	if err != nil {
+		if err == errorUnableToAllocate {
+			return offset, false, nil
+		}
+		return offset, false, err
+	}
+	return offset, true, nil
 }
 
-// Release attempts to release the provided item locally and then in etcd.
+// Release attempts to release the provided item.
 func (e *Etcd) Release(item int) error {
 	e.lock.Lock()
 	defer e.lock.Unlock()
 
-	if err := e.alloc.Release(item); err != nil {
-		return err
-	}
-
 	return e.tryUpdate(func() error {
 		return e.alloc.Release(item)
 	})
+
 }
 
 func (e *Etcd) ForEach(fn func(int)) {
@@ -168,9 +158,9 @@ func (e *Etcd) tryUpdate(fn func() error) error {
 				if err := e.alloc.Restore(existing.Range, existing.Data); err != nil {
 					return nil, err
 				}
-				if err := fn(); err != nil {
-					return nil, err
-				}
+			}
+			if err := fn(); err != nil {
+				return nil, err
 			}
 			e.last = existing.ResourceVersion
 			rangeSpec, data := e.alloc.Snapshot()
diff --git a/pkg/registry/core/service/allocator/storage/storage_test.go b/pkg/registry/core/service/allocator/storage/storage_test.go
index aaacb0f86dde1..54064e38d5894 100644
--- a/pkg/registry/core/service/allocator/storage/storage_test.go
+++ b/pkg/registry/core/service/allocator/storage/storage_test.go
@@ -96,3 +96,59 @@ func TestStore(t *testing.T) {
 		t.Fatal(err)
 	}
 }
+
+// Test that an item allocated in the storage but released in the local
+// bitmap cannot be allocated again: despite being free in the local
+// bitmap, it is still marked as allocated in the storage.
+func TestAllocatedStorageButReleasedLocally(t *testing.T) {
+	storage, server, backing, _ := newStorage(t)
+	defer server.Terminate(t)
+	if err := storage.storage.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	// Allocate an item in the storage
+	if _, err := storage.Allocate(2); err != nil {
+		t.Fatal(err)
+	}
+
+	// Release the item in the local bitmap only,
+	// emulating a bitmap that is out of sync with the storage
+	err := backing.Release(2)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Allocating the item again should fail: despite being free
+	// in the local bitmap, it is still allocated in the storage
+	ok, err := storage.Allocate(2)
+	if ok || err != nil {
+		t.Fatal(err)
+	}
+
+}
+
+// Test that an item that is free in the storage but allocated in the
+// local bitmap can still be allocated: the allocation should succeed
+// despite the item being taken in the local bitmap but not in the storage.
+func TestAllocatedLocallyButReleasedStorage(t *testing.T) {
+	storage, server, backing, _ := newStorage(t)
+	defer server.Terminate(t)
+	if err := storage.storage.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	// Allocate an item in the local bitmap only, not in the storage,
+	// emulating a bitmap that is out of sync with the storage
+	if _, err := backing.Allocate(2); err != nil {
+		t.Fatal(err)
+	}
+
+	// It should still be possible to allocate the item
+	// because it is free in the storage
+	ok, err := storage.Allocate(2)
+	if !ok || err != nil {
+		t.Fatal(err)
+	}
+
+}
diff --git a/pkg/registry/core/service/portallocator/BUILD b/pkg/registry/core/service/portallocator/BUILD
index c997a6776acab..7bf982a8effdf 100644
--- a/pkg/registry/core/service/portallocator/BUILD
+++ b/pkg/registry/core/service/portallocator/BUILD
@@ -44,6 +44,7 @@ filegroup(
     srcs = [
         ":package-srcs",
         "//pkg/registry/core/service/portallocator/controller:all-srcs",
+        "//pkg/registry/core/service/portallocator/storage:all-srcs",
    ],
     tags = ["automanaged"],
 )
diff --git a/test/integration/master/kube_apiserver_test.go b/test/integration/master/kube_apiserver_test.go
index 187b1da1b00a6..a2a9da0f6dca4 100644
--- a/test/integration/master/kube_apiserver_test.go
+++ b/test/integration/master/kube_apiserver_test.go
@@ -28,6 +28,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/registry/generic/registry" "k8s.io/client-go/kubernetes" @@ -250,3 +251,84 @@ func TestReconcilerMasterLeaseMultiMoreMasters(t *testing.T) { func TestReconcilerMasterLeaseMultiCombined(t *testing.T) { testReconcilersMasterLease(t, 3, 3) } + +func TestMultiMasterNodePortAllocation(t *testing.T) { + var kubeAPIServers []*kubeapiservertesting.TestServer + var clientAPIServers []*kubernetes.Clientset + etcd := framework.SharedEtcd() + + instanceOptions := &kubeapiservertesting.TestServerInstanceOptions{ + DisableStorageCleanup: true, + } + + // cleanup the registry storage + defer registry.CleanupStorage() + + // create 2 api servers and 2 clients + for i := 0; i < 2; i++ { + // start master count api server + t.Logf("starting api server: %d", i) + server := kubeapiservertesting.StartTestServerOrDie(t, instanceOptions, []string{ + "--advertise-address", fmt.Sprintf("10.0.1.%v", i+1), + }, etcd) + kubeAPIServers = append(kubeAPIServers, server) + + // verify kube API servers have registered and create a client + if err := wait.PollImmediate(3*time.Second, 2*time.Minute, func() (bool, error) { + client, err := kubernetes.NewForConfig(kubeAPIServers[i].ClientConfig) + if err != nil { + t.Logf("create client error: %v", err) + return false, nil + } + clientAPIServers = append(clientAPIServers, client) + endpoints, err := client.CoreV1().Endpoints("default").Get("kubernetes", metav1.GetOptions{}) + if err != nil { + t.Logf("error fetching endpoints: %v", err) + return false, nil + } + return verifyEndpointsWithIPs(kubeAPIServers, getEndpointIPs(endpoints)), nil + }); err != nil { + t.Fatalf("did not find only lease endpoints: %v", err) + } + } + + serviceObject := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"foo": "bar"}, + Name: "test-node-port", + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: "nodeport-test", + Port: 443, + TargetPort: intstr.IntOrString{IntVal: 443}, + NodePort: 32080, + Protocol: "TCP", + }, + }, + Type: "NodePort", + Selector: map[string]string{"foo": "bar"}, + }, + } + + // create and delete the same nodePortservice using different APIservers + // to check that API servers are using the same port allocation bitmap + for i := 0; i < 2; i++ { + // Create the service using the first API server + _, err := clientAPIServers[0].CoreV1().Services(metav1.NamespaceDefault).Create(serviceObject) + if err != nil { + t.Fatalf("unable to create service: %v", err) + } + // Delete the service using the second API server + if err := clientAPIServers[1].CoreV1().Services(metav1.NamespaceDefault).Delete(serviceObject.ObjectMeta.Name, &metav1.DeleteOptions{}); err != nil { + t.Fatalf("got unexpected error: %v", err) + } + } + + // shutdown the api servers + for _, server := range kubeAPIServers { + server.TearDownFn() + } + +}