From 10bbd42799b39f34e7361f47601ea12c71b6efa7 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Thu, 23 May 2024 14:12:08 -0400 Subject: [PATCH] CBG-3940 use atomics for counting docs The DCP callback is accessed simultaneously by the numbers of DCPClientWorkers --- db/util_testing.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/db/util_testing.go b/db/util_testing.go index 60127a1ef0..e79817534f 100644 --- a/db/util_testing.go +++ b/db/util_testing.go @@ -14,6 +14,7 @@ import ( "context" "errors" "fmt" + "sync/atomic" "testing" "time" @@ -220,10 +221,10 @@ func EmptyPrimaryIndex(ctx context.Context, dataStore sgbucket.DataStore) error // purgeWithDCPFeed purges all documents seen on a DCP feed with system xattrs, including tombstones which aren't found when emptying the primary index. func purgeWithDCPFeed(ctx context.Context, dataStore sgbucket.DataStore, tbp *base.TestBucketPool) (numCompacted int, err error) { - purgedDocCount := 0 purgeTimeout := 60 * time.Second purgeBody := Body{"_purged": true} - processedDocCount := 0 + var processedDocCount atomic.Int64 + var purgedDocCount atomic.Int64 var purgeErrors *base.MultiError collection, err := base.AsCollection(dataStore) @@ -246,7 +247,7 @@ func purgeWithDCPFeed(ctx context.Context, dataStore sgbucket.DataStore, tbp *ba purgeCallback := func(event sgbucket.FeedEvent) bool { var purgeErr error - processedDocCount++ + processedDocCount.Add(1) // We only need to purge mutations/deletions if event.Opcode != sgbucket.FeedOpMutation && event.Opcode != sgbucket.FeedOpDeletion { return false @@ -277,7 +278,7 @@ func purgeWithDCPFeed(ctx context.Context, dataStore sgbucket.DataStore, tbp *ba purgeErrors = purgeErrors.Append(delErr) tbp.Logf(ctx, "Error deleting key %s. %v", key, delErr) } - purgedDocCount++ + purgedDocCount.Add(1) } else if purgeErr != nil { purgeErrors = purgeErrors.Append(purgeErr) tbp.Logf(ctx, "Error removing key %s (purge). %v", key, purgeErr) @@ -308,9 +309,9 @@ func purgeWithDCPFeed(ctx context.Context, dataStore sgbucket.DataStore, tbp *ba tbp.Logf(ctx, "error closing purge DCP feed: %v", closeErr) } - tbp.Logf(ctx, "Finished purge DCP feed ... Total docs purged: %d", purgedDocCount) - tbp.Logf(ctx, "Finished purge DCP feed ... Total docs processed: %d", processedDocCount) - return purgedDocCount, purgeErrors.ErrorOrNil() + tbp.Logf(ctx, "Finished purge DCP feed ... Total docs purged: %d", purgedDocCount.Load()) + tbp.Logf(ctx, "Finished purge DCP feed ... Total docs processed: %d", processedDocCount.Load()) + return int(purgedDocCount.Load()), purgeErrors.ErrorOrNil() } // emptyAllDocsIndex ensures the AllDocs index for the given bucket is empty, including tombstones which aren't found when emptying the primary index.