Skip to content

Commit

Permalink
Merge pull request #11328 from sivchari/replace-atomic-package
Browse files Browse the repository at this point in the history
🌱  use latest atomic variable instead of old one
  • Loading branch information
k8s-ci-robot authored Oct 23, 2024
2 parents 214ab6d + b9a4ffe commit 97f9dd6
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 13 deletions.
20 changes: 11 additions & 9 deletions test/infrastructure/inmemory/pkg/runtime/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@ func Test_cache_scale(t *testing.T) {
operationFrequencyForResourceGroup := 10 * time.Millisecond
testDuration := 2 * time.Minute

var createCount uint64
var getCount uint64
var listCount uint64
var deleteCount uint64
var (
createCount atomic.Uint64
getCount atomic.Uint64
listCount atomic.Uint64
deleteCount atomic.Uint64
)

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
Expand Down Expand Up @@ -95,17 +97,17 @@ func Test_cache_scale(t *testing.T) {
err := c.Create(resourceGroup, machine)
if apierrors.IsAlreadyExists(err) {
if err = c.Get(resourceGroup, types.NamespacedName{Name: machineName(item)}, machine); err == nil {
atomic.AddUint64(&getCount, 1)
getCount.Add(1)
continue
}
}
g.Expect(err).ToNot(HaveOccurred())
atomic.AddUint64(&createCount, 1)
createCount.Add(1)
case 1: // list
obj := &cloudv1.CloudMachineList{}
err := c.List(resourceGroup, obj)
g.Expect(err).ToNot(HaveOccurred())
atomic.AddUint64(&listCount, 1)
listCount.Add(1)
case 2: // delete
g.Expect(err).ToNot(HaveOccurred())
machine := &cloudv1.CloudMachine{
Expand All @@ -118,7 +120,7 @@ func Test_cache_scale(t *testing.T) {
continue
}
g.Expect(err).ToNot(HaveOccurred())
atomic.AddUint64(&deleteCount, 1)
deleteCount.Add(1)
}

case <-ctx.Done():
Expand All @@ -130,7 +132,7 @@ func Test_cache_scale(t *testing.T) {

time.Sleep(testDuration)

t.Log("createCount", createCount, "getCount", getCount, "listCount", listCount, "deleteCount", deleteCount)
t.Log("createCount", createCount.Load(), "getCount", getCount.Load(), "listCount", listCount.Load(), "deleteCount", deleteCount.Load())

cancel()
}
8 changes: 4 additions & 4 deletions test/infrastructure/inmemory/pkg/runtime/cache/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,20 @@ func (c *cache) startGarbageCollector(ctx context.Context) error {
ctx = ctrl.LoggerInto(ctx, log)

log.Info("Starting garbage collector queue")
c.garbageCollectorQueue = workqueue.NewTypedRateLimitingQueue[any](workqueue.DefaultTypedControllerRateLimiter[any]())
c.garbageCollectorQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]())
go func() {
<-ctx.Done()
c.garbageCollectorQueue.ShutDown()
}()

var workers int64
var workers atomic.Int64
go func() {
log.Info("Starting garbage collector workers", "count", c.garbageCollectorConcurrency)
wg := &sync.WaitGroup{}
wg.Add(c.garbageCollectorConcurrency)
for range c.garbageCollectorConcurrency {
go func() {
atomic.AddInt64(&workers, 1)
workers.Add(1)
defer wg.Done()
for c.processGarbageCollectorWorkItem(ctx) {
}
Expand All @@ -65,7 +65,7 @@ func (c *cache) startGarbageCollector(ctx context.Context) error {
}()

if err := wait.PollUntilContextTimeout(ctx, 50*time.Millisecond, 5*time.Second, false, func(context.Context) (done bool, err error) {
if atomic.LoadInt64(&workers) < int64(c.garbageCollectorConcurrency) {
if workers.Load() < int64(c.garbageCollectorConcurrency) {
return false, nil
}
return true, nil
Expand Down

0 comments on commit 97f9dd6

Please sign in to comment.