Skip to content

Commit

Permalink
use latest atomic variable instead of old one
Browse files Browse the repository at this point in the history
Signed-off-by: sivchari <[email protected]>
  • Loading branch information
sivchari committed Oct 23, 2024
1 parent 76328ed commit 20a2e79
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 12 deletions.
18 changes: 10 additions & 8 deletions test/infrastructure/inmemory/pkg/runtime/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@ func Test_cache_scale(t *testing.T) {
operationFrequencyForResourceGroup := 10 * time.Millisecond
testDuration := 2 * time.Minute

var createCount uint64
var getCount uint64
var listCount uint64
var deleteCount uint64
var (
createCount atomic.Uint64
getCount atomic.Uint64
listCount atomic.Uint64
deleteCount atomic.Uint64
)

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
Expand Down Expand Up @@ -95,17 +97,17 @@ func Test_cache_scale(t *testing.T) {
err := c.Create(resourceGroup, machine)
if apierrors.IsAlreadyExists(err) {
if err = c.Get(resourceGroup, types.NamespacedName{Name: machineName(item)}, machine); err == nil {
atomic.AddUint64(&getCount, 1)
getCount.Add(1)
continue
}
}
g.Expect(err).ToNot(HaveOccurred())
atomic.AddUint64(&createCount, 1)
createCount.Add(1)
case 1: // list
obj := &cloudv1.CloudMachineList{}
err := c.List(resourceGroup, obj)
g.Expect(err).ToNot(HaveOccurred())
atomic.AddUint64(&listCount, 1)
listCount.Add(1)
case 2: // delete
g.Expect(err).ToNot(HaveOccurred())
machine := &cloudv1.CloudMachine{
Expand All @@ -118,7 +120,7 @@ func Test_cache_scale(t *testing.T) {
continue
}
g.Expect(err).ToNot(HaveOccurred())
atomic.AddUint64(&deleteCount, 1)
deleteCount.Add(1)
}

case <-ctx.Done():
Expand Down
8 changes: 4 additions & 4 deletions test/infrastructure/inmemory/pkg/runtime/cache/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,20 @@ func (c *cache) startGarbageCollector(ctx context.Context) error {
ctx = ctrl.LoggerInto(ctx, log)

log.Info("Starting garbage collector queue")
c.garbageCollectorQueue = workqueue.NewTypedRateLimitingQueue[any](workqueue.DefaultTypedControllerRateLimiter[any]())
c.garbageCollectorQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]())
go func() {
<-ctx.Done()
c.garbageCollectorQueue.ShutDown()
}()

var workers int64
var workers atomic.Int64
go func() {
log.Info("Starting garbage collector workers", "count", c.garbageCollectorConcurrency)
wg := &sync.WaitGroup{}
wg.Add(c.garbageCollectorConcurrency)
for range c.garbageCollectorConcurrency {
go func() {
atomic.AddInt64(&workers, 1)
workers.Add(1)
defer wg.Done()
for c.processGarbageCollectorWorkItem(ctx) {
}
Expand All @@ -65,7 +65,7 @@ func (c *cache) startGarbageCollector(ctx context.Context) error {
}()

if err := wait.PollUntilContextTimeout(ctx, 50*time.Millisecond, 5*time.Second, false, func(context.Context) (done bool, err error) {
if atomic.LoadInt64(&workers) < int64(c.garbageCollectorConcurrency) {
if workers.Load() < int64(c.garbageCollectorConcurrency) {
return false, nil
}
return true, nil
Expand Down

0 comments on commit 20a2e79

Please sign in to comment.