diff --git a/db/util_testing.go b/db/util_testing.go index 60127a1ef0..e79817534f 100644 --- a/db/util_testing.go +++ b/db/util_testing.go @@ -14,6 +14,7 @@ import ( "context" "errors" "fmt" + "sync/atomic" "testing" "time" @@ -220,10 +221,10 @@ func EmptyPrimaryIndex(ctx context.Context, dataStore sgbucket.DataStore) error // purgeWithDCPFeed purges all documents seen on a DCP feed with system xattrs, including tombstones which aren't found when emptying the primary index. func purgeWithDCPFeed(ctx context.Context, dataStore sgbucket.DataStore, tbp *base.TestBucketPool) (numCompacted int, err error) { - purgedDocCount := 0 purgeTimeout := 60 * time.Second purgeBody := Body{"_purged": true} - processedDocCount := 0 + var processedDocCount atomic.Int64 + var purgedDocCount atomic.Int64 var purgeErrors *base.MultiError collection, err := base.AsCollection(dataStore) @@ -246,7 +247,7 @@ func purgeWithDCPFeed(ctx context.Context, dataStore sgbucket.DataStore, tbp *ba purgeCallback := func(event sgbucket.FeedEvent) bool { var purgeErr error - processedDocCount++ + processedDocCount.Add(1) // We only need to purge mutations/deletions if event.Opcode != sgbucket.FeedOpMutation && event.Opcode != sgbucket.FeedOpDeletion { return false @@ -277,7 +278,7 @@ func purgeWithDCPFeed(ctx context.Context, dataStore sgbucket.DataStore, tbp *ba purgeErrors = purgeErrors.Append(delErr) tbp.Logf(ctx, "Error deleting key %s. %v", key, delErr) } - purgedDocCount++ + purgedDocCount.Add(1) } else if purgeErr != nil { purgeErrors = purgeErrors.Append(purgeErr) tbp.Logf(ctx, "Error removing key %s (purge). %v", key, purgeErr) @@ -308,9 +309,9 @@ func purgeWithDCPFeed(ctx context.Context, dataStore sgbucket.DataStore, tbp *ba tbp.Logf(ctx, "error closing purge DCP feed: %v", closeErr) } - tbp.Logf(ctx, "Finished purge DCP feed ... Total docs purged: %d", purgedDocCount) - tbp.Logf(ctx, "Finished purge DCP feed ... Total docs processed: %d", processedDocCount) - return purgedDocCount, purgeErrors.ErrorOrNil() + tbp.Logf(ctx, "Finished purge DCP feed ... Total docs purged: %d", purgedDocCount.Load()) + tbp.Logf(ctx, "Finished purge DCP feed ... Total docs processed: %d", processedDocCount.Load()) + return int(purgedDocCount.Load()), purgeErrors.ErrorOrNil() } // emptyAllDocsIndex ensures the AllDocs index for the given bucket is empty, including tombstones which aren't found when emptying the primary index.