Merge pull request kubernetes#25151 from sttts/sttts-nodeport-sync-3.11
Bug 1753649: UPSTREAM: 89937: portAllocator sync local data before allocate

Origin-commit: d548b192d8b9e38402772509c8f0abc60ac7069c
k8s-publishing-bot committed Jul 16, 2020
2 parents d9a5bec + c1007de commit d120195
Showing 4 changed files with 161 additions and 32 deletions.
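
Context for the change: in an HA control plane, every kube-apiserver keeps an in-memory bitmap of allocated NodePorts, backed by a single shared RangeAllocation object in etcd. Before this patch, Allocate, AllocateNext, and Release mutated the local bitmap before tryUpdate had restored it from etcd, so a bitmap that had drifted behind another apiserver's writes could wrongly refuse a port that was actually free, or keep stale allocations alive. The patch moves every mutation inside tryUpdate so it always runs against freshly restored state; a hedged sketch of the resulting pattern follows the storage.go diff below.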
54 changes: 22 additions & 32 deletions pkg/registry/core/service/allocator/storage/storage.go
```diff
@@ -76,17 +76,12 @@ func NewEtcd(alloc allocator.Snapshottable, baseKey string, resource schema.GroupResource
 	}
 }
 
-// Allocate attempts to allocate the item locally and then in etcd.
+// Allocate attempts to allocate the item.
 func (e *Etcd) Allocate(offset int) (bool, error) {
 	e.lock.Lock()
 	defer e.lock.Unlock()
 
-	ok, err := e.alloc.Allocate(offset)
-	if !ok || err != nil {
-		return ok, err
-	}
-
-	err = e.tryUpdate(func() error {
+	err := e.tryUpdate(func() error {
 		ok, err := e.alloc.Allocate(offset)
 		if err != nil {
 			return err
@@ -105,49 +100,44 @@ func (e *Etcd) Allocate(offset int) (bool, error) {
 	return true, nil
 }
 
-// AllocateNext attempts to allocate the next item locally and then in etcd.
+// AllocateNext attempts to allocate the next item.
 func (e *Etcd) AllocateNext() (int, bool, error) {
 	e.lock.Lock()
 	defer e.lock.Unlock()
 
-	offset, ok, err := e.alloc.AllocateNext()
-	if !ok || err != nil {
-		return offset, ok, err
-	}
+	var offset int
+	var ok bool
+	var err error
 
 	err = e.tryUpdate(func() error {
-		ok, err := e.alloc.Allocate(offset)
+		// update the offset here
+		offset, ok, err = e.alloc.AllocateNext()
 		if err != nil {
 			return err
 		}
 		if !ok {
-			// update the offset here
-			offset, ok, err = e.alloc.AllocateNext()
-			if err != nil {
-				return err
-			}
-			if !ok {
-				return errorUnableToAllocate
-			}
-			return nil
+			return errorUnableToAllocate
 		}
 		return nil
 	})
-	return offset, ok, err
+
+	if err != nil {
+		if err == errorUnableToAllocate {
+			return offset, false, nil
+		}
+		return offset, false, err
+	}
+	return offset, true, nil
 }
 
-// Release attempts to release the provided item locally and then in etcd.
+// Release attempts to release the provided item.
 func (e *Etcd) Release(item int) error {
 	e.lock.Lock()
 	defer e.lock.Unlock()
 
-	if err := e.alloc.Release(item); err != nil {
-		return err
-	}
-
 	return e.tryUpdate(func() error {
 		return e.alloc.Release(item)
 	})
 
 }
 
 func (e *Etcd) ForEach(fn func(int)) {
@@ -168,9 +158,9 @@ func (e *Etcd) tryUpdate(fn func() error) error {
 		if err := e.alloc.Restore(existing.Range, existing.Data); err != nil {
 			return nil, err
 		}
-		if err := fn(); err != nil {
-			return nil, err
-		}
 	}
+	if err := fn(); err != nil {
+		return nil, err
+	}
 	e.last = existing.ResourceVersion
 	rangeSpec, data := e.alloc.Snapshot()
```
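
Why the new shape is safe: tryUpdate re-reads the stored RangeAllocation, restores the in-memory bitmap from it whenever the stored ResourceVersion differs from the last one seen, then runs the mutation and writes the result back under optimistic concurrency. Moving the Allocate/AllocateNext/Release mutations inside that closure means they always operate on state freshly synced from etcd. The sketch below is a minimal, self-contained illustration of that pattern, not the upstream code: the store, snapshot, and allocator names are invented here, and a toy compare-and-swap loop stands in for etcd's GuaranteedUpdate.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errUnableToAllocate = errors.New("cannot allocate: item already in use")

// snapshot plays the role of the RangeAllocation object kept in etcd.
type snapshot struct {
	version int
	used    map[int]bool
}

// store plays the role of etcd: reads return a copy of the current
// snapshot, and a write succeeds only if the writer saw the latest version.
type store struct {
	mu   sync.Mutex
	snap snapshot
}

func (s *store) read() snapshot {
	s.mu.Lock()
	defer s.mu.Unlock()
	return snapshot{version: s.snap.version, used: clone(s.snap.used)}
}

func (s *store) casWrite(sawVersion int, used map[int]bool) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.snap.version != sawVersion {
		return false // someone else wrote first; the caller must retry
	}
	s.snap = snapshot{version: sawVersion + 1, used: clone(used)}
	return true
}

func clone(m map[int]bool) map[int]bool {
	out := make(map[int]bool, len(m))
	for k, v := range m {
		out[k] = v
	}
	return out
}

// allocator mirrors the Etcd type: a local bitmap plus the shared store.
type allocator struct {
	local map[int]bool
	last  int // store version the local bitmap was last synced from
	st    *store
}

// tryUpdate restores the local bitmap when it is stale, applies fn to the
// refreshed state, and writes the result back, retrying on conflicts.
func (a *allocator) tryUpdate(fn func() error) error {
	for {
		cur := a.st.read()
		if cur.version != a.last {
			a.local = cur.used // sync local data before mutating
		}
		if err := fn(); err != nil {
			return err
		}
		if a.st.casWrite(cur.version, a.local) {
			a.last = cur.version + 1
			return nil
		}
	}
}

// Allocate mutates the bitmap only inside tryUpdate, like the patched code.
func (a *allocator) Allocate(item int) (bool, error) {
	err := a.tryUpdate(func() error {
		if a.local[item] {
			return errUnableToAllocate
		}
		a.local[item] = true
		return nil
	})
	if err == errUnableToAllocate {
		return false, nil
	}
	return err == nil, err
}

func main() {
	st := &store{snap: snapshot{used: map[int]bool{}}}
	a := &allocator{local: map[int]bool{}, st: st}
	b := &allocator{local: map[int]bool{}, st: st}

	okA, _ := a.Allocate(32080) // marks the port used in the shared store
	okB, _ := b.Allocate(32080) // restores from the store first, so it refuses
	fmt.Println(okA, okB)       // true false
}
```

Running the sketch prints `true false`: the second allocator restores the shared state before mutating and therefore refuses the already-taken port, which is the guarantee the old mutate-locally-first path could not provide.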
56 changes: 56 additions & 0 deletions pkg/registry/core/service/allocator/storage/storage_test.go
```diff
@@ -96,3 +96,59 @@ func TestStore(t *testing.T) {
 		t.Fatal(err)
 	}
 }
+
+// Test the case where an item is allocated in the storage but released in
+// the local bitmap: allocating it must fail, despite being free locally,
+// because it is already taken in the storage.
+func TestAllocatedStorageButReleasedLocally(t *testing.T) {
+	storage, server, backing, _ := newStorage(t)
+	defer server.Terminate(t)
+	if err := storage.storage.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	// Allocate an item in the storage
+	if _, err := storage.Allocate(2); err != nil {
+		t.Fatal(err)
+	}
+
+	// Release the item in the local bitmap only, emulating a bitmap that is
+	// out of sync with the storage
+	err := backing.Release(2)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Allocating the item must fail: it is free in the local bitmap but
+	// allocated in the storage
+	ok, err := storage.Allocate(2)
+	if ok || err != nil {
+		t.Fatal(err)
+	}
+}
+
+// Test the case where an item is free in the storage but allocated in the
+// local bitmap: allocating it must succeed, despite being taken locally,
+// because it is free in the storage.
+func TestAllocatedLocallyButReleasedStorage(t *testing.T) {
+	storage, server, backing, _ := newStorage(t)
+	defer server.Terminate(t)
+	if err := storage.storage.Create(context.TODO(), key(), validNewRangeAllocation(), nil, 0); err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	// Allocate an item in the local bitmap only, emulating a bitmap that is
+	// out of sync with the storage
+	if _, err := backing.Allocate(2); err != nil {
+		t.Fatal(err)
+	}
+
+	// Allocating the item must succeed because it is free in the storage
+	ok, err := storage.Allocate(2)
+	if !ok || err != nil {
+		t.Fatal(err)
+	}
+}
```
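
A side effect worth noting: in the first test, the failed Allocate still goes through tryUpdate, which restores the local bitmap from storage before running the closure, so the out-of-sync Release is undone. A hypothetical follow-up assertion (not part of this commit) could pin that down, assuming `backing` exposes the allocator interface's Has method:

```go
// Hypothetical extra assertion for TestAllocatedStorageButReleasedLocally,
// placed after the failed Allocate: tryUpdate restored the local bitmap
// from storage, so item 2 should read as allocated locally again.
if !backing.Has(2) {
	t.Fatalf("expected item 2 to be restored as allocated in the local bitmap")
}
```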
1 change: 1 addition & 0 deletions pkg/registry/core/service/portallocator/BUILD
```diff
@@ -44,6 +44,7 @@ filegroup(
     srcs = [
         ":package-srcs",
         "//pkg/registry/core/service/portallocator/controller:all-srcs",
+        "//pkg/registry/core/service/portallocator/storage:all-srcs",
     ],
     tags = ["automanaged"],
 )
```
82 changes: 82 additions & 0 deletions test/integration/master/kube_apiserver_test.go
```diff
@@ -28,6 +28,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apiserver/pkg/registry/generic/registry"
 	"k8s.io/client-go/kubernetes"
@@ -250,3 +251,84 @@ func TestReconcilerMasterLeaseMultiMoreMasters(t *testing.T) {
 func TestReconcilerMasterLeaseMultiCombined(t *testing.T) {
 	testReconcilersMasterLease(t, 3, 3)
 }
+
+func TestMultiMasterNodePortAllocation(t *testing.T) {
+	var kubeAPIServers []*kubeapiservertesting.TestServer
+	var clientAPIServers []*kubernetes.Clientset
+	etcd := framework.SharedEtcd()
+
+	instanceOptions := &kubeapiservertesting.TestServerInstanceOptions{
+		DisableStorageCleanup: true,
+	}
+
+	// cleanup the registry storage
+	defer registry.CleanupStorage()
+
+	// create 2 api servers and 2 clients
+	for i := 0; i < 2; i++ {
+		// start an api server advertising a distinct address
+		t.Logf("starting api server: %d", i)
+		server := kubeapiservertesting.StartTestServerOrDie(t, instanceOptions, []string{
+			"--advertise-address", fmt.Sprintf("10.0.1.%v", i+1),
+		}, etcd)
+		kubeAPIServers = append(kubeAPIServers, server)
+
+		// verify the kube API server has registered, then create a client for
+		// it; the client is appended only after the poll succeeds so that
+		// retries cannot append duplicates
+		var client *kubernetes.Clientset
+		if err := wait.PollImmediate(3*time.Second, 2*time.Minute, func() (bool, error) {
+			var err error
+			client, err = kubernetes.NewForConfig(kubeAPIServers[i].ClientConfig)
+			if err != nil {
+				t.Logf("create client error: %v", err)
+				return false, nil
+			}
+			endpoints, err := client.CoreV1().Endpoints("default").Get("kubernetes", metav1.GetOptions{})
+			if err != nil {
+				t.Logf("error fetching endpoints: %v", err)
+				return false, nil
+			}
+			return verifyEndpointsWithIPs(kubeAPIServers, getEndpointIPs(endpoints)), nil
+		}); err != nil {
+			t.Fatalf("did not find only lease endpoints: %v", err)
+		}
+		clientAPIServers = append(clientAPIServers, client)
+	}
+
+	serviceObject := &corev1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Labels: map[string]string{"foo": "bar"},
+			Name:   "test-node-port",
+		},
+		Spec: corev1.ServiceSpec{
+			Ports: []corev1.ServicePort{
+				{
+					Name:       "nodeport-test",
+					Port:       443,
+					TargetPort: intstr.IntOrString{IntVal: 443},
+					NodePort:   32080,
+					Protocol:   "TCP",
+				},
+			},
+			Type:     "NodePort",
+			Selector: map[string]string{"foo": "bar"},
+		},
+	}
+
+	// create and delete the same NodePort service using different API servers
+	// to check that they share the same port allocation bitmap
+	for i := 0; i < 2; i++ {
+		// create the service using the first API server
+		_, err := clientAPIServers[0].CoreV1().Services(metav1.NamespaceDefault).Create(serviceObject)
+		if err != nil {
+			t.Fatalf("unable to create service: %v", err)
+		}
+		// delete the service using the second API server
+		if err := clientAPIServers[1].CoreV1().Services(metav1.NamespaceDefault).Delete(serviceObject.ObjectMeta.Name, &metav1.DeleteOptions{}); err != nil {
+			t.Fatalf("got unexpected error: %v", err)
+		}
+	}
+
+	// shutdown the api servers
+	for _, server := range kubeAPIServers {
+		server.TearDownFn()
+	}
+}
```
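
The integration test pins NodePort to 32080, which drives the port allocator through Etcd.Allocate. An illustrative one-line variant (not part of this commit) would exercise the AllocateNext path instead: leaving the port unset makes the apiserver pick one from the range through the same resynchronized tryUpdate flow.

```go
// Illustrative variant (not part of this commit): NodePort 0 means
// "auto-assign", so the apiserver allocates a port via AllocateNext.
serviceObject.Spec.Ports[0].NodePort = 0
```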
