-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
cluster-level lock for workload cluster client initialization
Before this commit, workload cluster client initialization required a global lock to be held. If initialization of a single workload cluster client took time, all other reconcile-loops who require a workload cluster connection were blocked until initialization finished. Initialization of a workload cluster client can take a significant amount of time, because it requires to initialize the discovery client, which sends multiple request to the API-server. With this change initialization of a workload cluster client only requires to hold a lock for the specific cluster. This means reconciliation for other clusters is not affected by a long running workload cluster client initialization.
- Loading branch information
Florian Gutmann
committed
Apr 5, 2022
1 parent
cb5d3dd
commit a3ca576
Showing
3 changed files
with
194 additions
and
15 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
/* | ||
Copyright 2021 The Kubernetes Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
// Package mutex implements utilities for mutexes. | ||
package mutex | ||
|
||
import "sync" | ||
|
||
// KeyedMutex is a mutex locking on the key provided to the Lock function. | ||
// Only one caller can hold the lock for a specific key at a time. | ||
type KeyedMutex struct { | ||
locksMtx sync.Mutex | ||
locks map[interface{}]*keyLock | ||
} | ||
|
||
// NewKeyedMutex creates a new keyed mutex ready for use. | ||
func NewKeyedMutex() *KeyedMutex { | ||
return &KeyedMutex{ | ||
locksMtx: sync.Mutex{}, | ||
locks: make(map[interface{}]*keyLock), | ||
} | ||
} | ||
|
||
// keyLock is the lock for a single specific key. | ||
type keyLock struct { | ||
sync.Mutex | ||
// users is the number of callers attempting to acquire the mutex, including the one currently holding it. | ||
users uint | ||
} | ||
|
||
// Unlock unlocks a currently locked key. | ||
type Unlock func() | ||
|
||
// Lock locks the passed in key, blocking if the key is locked. | ||
// Returns the unlock function to release the lock on the key. | ||
func (k *KeyedMutex) Lock(key interface{}) Unlock { | ||
// Get an existing keyLock for the key or create a new one and increase the number of users. | ||
l := func() *keyLock { | ||
k.locksMtx.Lock() | ||
defer k.locksMtx.Unlock() | ||
|
||
l, ok := k.locks[key] | ||
if !ok { | ||
l = &keyLock{} | ||
k.locks[key] = l | ||
} | ||
l.users++ | ||
return l | ||
}() | ||
|
||
l.Lock() | ||
|
||
// Unlocks the keyLock for the key, decreases the counter and removes the keyLock from the map if there are no more users left. | ||
return func() { | ||
k.locksMtx.Lock() | ||
defer k.locksMtx.Unlock() | ||
|
||
l.Unlock() | ||
l.users-- | ||
if l.users == 0 { | ||
delete(k.locks, key) | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
/* | ||
Copyright 2021 The Kubernetes Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package mutex | ||
|
||
import ( | ||
"testing" | ||
|
||
. "github.com/onsi/gomega" | ||
) | ||
|
||
func TestKeyedMutex(t *testing.T) { | ||
t.Run("blocks on a locked key until unlocked", func(t *testing.T) { | ||
t.Parallel() | ||
g := NewWithT(t) | ||
|
||
routineStarted := make(chan bool) | ||
routineCompleted := make(chan bool) | ||
key := "key1" | ||
|
||
km := NewKeyedMutex() | ||
unlock := km.Lock(key) | ||
|
||
// start a routine which tries to lock the same key | ||
go func() { | ||
routineStarted <- true | ||
unlock := km.Lock(key) | ||
unlock() | ||
routineCompleted <- true | ||
}() | ||
|
||
<-routineStarted | ||
g.Consistently(routineCompleted).ShouldNot(Receive()) | ||
|
||
// routine should be able to acquire the lock for the key after we unlock | ||
unlock() | ||
g.Eventually(routineCompleted).Should(Receive()) | ||
|
||
// ensure that the lock was cleaned up from the internal map | ||
g.Expect(km.locks).To(HaveLen(0)) | ||
}) | ||
|
||
t.Run("can lock different keys without blocking", func(t *testing.T) { | ||
g := NewWithT(t) | ||
km := NewKeyedMutex() | ||
keys := []string{"a", "b", "c", "d"} | ||
unlocks := make([]Unlock, 0, len(keys)) | ||
|
||
// lock all keys | ||
for _, key := range keys { | ||
unlocks = append(unlocks, km.Lock(key)) | ||
} | ||
|
||
// unlock all keys | ||
for _, unlock := range unlocks { | ||
unlock() | ||
} | ||
|
||
// ensure that the lock was cleaned up from the internal map | ||
g.Expect(km.locks).To(HaveLen(0)) | ||
}) | ||
} |