diff --git a/src/NuGet.Services.AzureSearch/Auxiliary2AzureSearch/Auxiliary2AzureSearchCommand.cs b/src/NuGet.Services.AzureSearch/Auxiliary2AzureSearch/Auxiliary2AzureSearchCommand.cs
index 61c861316..df83b653d 100644
--- a/src/NuGet.Services.AzureSearch/Auxiliary2AzureSearch/Auxiliary2AzureSearchCommand.cs
+++ b/src/NuGet.Services.AzureSearch/Auxiliary2AzureSearch/Auxiliary2AzureSearchCommand.cs
@@ -2,86 +2,31 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
using System.Diagnostics;
-using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
-using Microsoft.Extensions.Options;
-using NuGet.Packaging;
-using NuGet.Services.AzureSearch.AuxiliaryFiles;
-using NuGet.Services.AzureSearch.Wrappers;
-using NuGet.Services.Metadata.Catalog.Helpers;
-using NuGet.Versioning;
-using NuGetGallery;
namespace NuGet.Services.AzureSearch.Auxiliary2AzureSearch
{
public class Auxiliary2AzureSearchCommand : IAzureSearchCommand
{
- /// <summary>
- /// A package ID can result in one document per search filter if the there is a version that applies to each
- /// of the filters. The simplest such case is a prerelease, SemVer 1.0.0 package version like 1.0.0-beta. This
- /// version applies to all package filters.
- /// </summary>
- private static readonly int MaxDocumentsPerId = Enum.GetValues(typeof(SearchFilters)).Length;
-
- private readonly IAuxiliaryFileClient _auxiliaryFileClient;
- private readonly IDatabaseAuxiliaryDataFetcher _databaseFetcher;
- private readonly IDownloadDataClient _downloadDataClient;
- private readonly IVerifiedPackagesDataClient _verifiedPackagesDataClient;
- private readonly IDownloadSetComparer _downloadSetComparer;
- private readonly ISearchDocumentBuilder _searchDocumentBuilder;
- private readonly ISearchIndexActionBuilder _indexActionBuilder;
- private readonly Func<IBatchPusher> _batchPusherFactory;
- private readonly ISystemTime _systemTime;
- private readonly IOptionsSnapshot<Auxiliary2AzureSearchConfiguration> _options;
+ private readonly IAzureSearchCommand[] _commands;
private readonly IAzureSearchTelemetryService _telemetryService;
private readonly ILogger<Auxiliary2AzureSearchCommand> _logger;
- private readonly StringCache _stringCache;
public Auxiliary2AzureSearchCommand(
- IAuxiliaryFileClient auxiliaryFileClient,
- IDatabaseAuxiliaryDataFetcher databaseFetcher,
- IDownloadDataClient downloadDataClient,
- IVerifiedPackagesDataClient verifiedPackagesDataClient,
- IDownloadSetComparer downloadSetComparer,
- ISearchDocumentBuilder searchDocumentBuilder,
- ISearchIndexActionBuilder indexActionBuilder,
- Func<IBatchPusher> batchPusherFactory,
- ISystemTime systemTime,
- IOptionsSnapshot<Auxiliary2AzureSearchConfiguration> options,
+ IAzureSearchCommand updateVerifiedPackagesCommand,
+ IAzureSearchCommand updateDownloadsCommand,
IAzureSearchTelemetryService telemetryService,
ILogger<Auxiliary2AzureSearchCommand> logger)
{
- _auxiliaryFileClient = auxiliaryFileClient ?? throw new ArgumentException(nameof(auxiliaryFileClient));
- _databaseFetcher = databaseFetcher ?? throw new ArgumentNullException(nameof(databaseFetcher));
- _downloadDataClient = downloadDataClient ?? throw new ArgumentNullException(nameof(downloadDataClient));
- _verifiedPackagesDataClient = verifiedPackagesDataClient ?? throw new ArgumentNullException(nameof(verifiedPackagesDataClient));
- _downloadSetComparer = downloadSetComparer ?? throw new ArgumentNullException(nameof(downloadSetComparer));
- _searchDocumentBuilder = searchDocumentBuilder ?? throw new ArgumentNullException(nameof(searchDocumentBuilder));
- _indexActionBuilder = indexActionBuilder ?? throw new ArgumentNullException(nameof(indexActionBuilder));
- _batchPusherFactory = batchPusherFactory ?? throw new ArgumentNullException(nameof(batchPusherFactory));
- _systemTime = systemTime ?? throw new ArgumentNullException(nameof(systemTime));
- _options = options ?? throw new ArgumentNullException(nameof(options));
+ _commands = new[]
+ {
+ updateVerifiedPackagesCommand ?? throw new ArgumentNullException(nameof(updateVerifiedPackagesCommand)),
+ updateDownloadsCommand ?? throw new ArgumentNullException(nameof(updateDownloadsCommand)),
+ };
_telemetryService = telemetryService ?? throw new ArgumentNullException(nameof(telemetryService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
- _stringCache = new StringCache();
-
- if (_options.Value.MaxConcurrentBatches <= 0)
- {
- throw new ArgumentOutOfRangeException(
- nameof(options),
- $"The {nameof(AzureSearchJobConfiguration.MaxConcurrentBatches)} must be greater than zero.");
- }
-
- if (_options.Value.MaxConcurrentVersionListWriters <= 0)
- {
- throw new ArgumentOutOfRangeException(
- nameof(options),
- $"The {nameof(AzureSearchJobConfiguration.MaxConcurrentVersionListWriters)} must be greater than zero.");
- }
}
public async Task ExecuteAsync()
@@ -90,9 +35,14 @@ public async Task ExecuteAsync()
var outcome = JobOutcome.Failure;
try
{
- var hasVerifiedPackagesChanged = await UpdateVerifiedPackagesAsync();
- var hasIndexChanged = await PushIndexChangesAsync();
- outcome = hasVerifiedPackagesChanged || hasIndexChanged ? JobOutcome.Success : JobOutcome.NoOp;
+ foreach (var command in _commands)
+ {
+ _logger.LogInformation("Starting {CommandName}...", command.GetType().Name);
+ await command.ExecuteAsync();
+ _logger.LogInformation("Completed {CommandName}.", command.GetType().Name);
+ }
+
+ outcome = JobOutcome.Success;
}
finally
{
@@ -100,272 +50,5 @@ public async Task ExecuteAsync()
_telemetryService.TrackAuxiliary2AzureSearchCompleted(outcome, stopwatch.Elapsed);
}
}
-
- private async Task<bool> UpdateVerifiedPackagesAsync()
- {
- // The "old" data in this case is the latest file that was copied to the region's storage container by this
- // job (or initialized by Db2AzureSearch).
- var oldResult = await _verifiedPackagesDataClient.ReadLatestAsync(
- AccessConditionWrapper.GenerateEmptyCondition(),
- _stringCache);
-
- // The "new" data in this case is from the database.
- var newData = await _databaseFetcher.GetVerifiedPackagesAsync();
-
- var changes = new HashSet<string>(oldResult.Data, oldResult.Data.Comparer);
- changes.SymmetricExceptWith(newData);
- _logger.LogInformation("{Count} package IDs have verified status changes.", changes.Count);
-
- if (changes.Count == 0)
- {
- return false;
- }
- else
- {
- await _verifiedPackagesDataClient.ReplaceLatestAsync(newData, oldResult.Metadata.GetIfMatchCondition());
- return true;
- }
- }
-
- private async Task<bool> PushIndexChangesAsync()
- {
- // The "old" data in this case is the download count data that was last indexed by this job (or
- // initialized by Db2AzureSearch).
- _logger.LogInformation("Fetching old download count data from blob storage.");
- var oldResult = await _downloadDataClient.ReadLatestIndexedAsync(
- AccessConditionWrapper.GenerateEmptyCondition(),
- _stringCache);
-
- // The "new" data in this case is from the statistics pipeline.
- _logger.LogInformation("Fetching new download count data from blob storage.");
- var newData = await _auxiliaryFileClient.LoadDownloadDataAsync();
-
- _logger.LogInformation("Removing invalid IDs and versions from the old data.");
- CleanDownloadData(oldResult.Data);
-
- _logger.LogInformation("Removing invalid IDs and versions from the new data.");
- CleanDownloadData(newData);
-
- // Fetch the download overrides from the auxiliary file. Note that the overriden downloads are kept
- // separate from downloads data as the original data will be persisted to auxiliary data, whereas the
- // overriden data will be persisted to Azure Search.
- _logger.LogInformation("Overriding download count data.");
- var downloadOverrides = await _auxiliaryFileClient.LoadDownloadOverridesAsync();
- var overridenDownloads = newData.ApplyDownloadOverrides(downloadOverrides, _logger);
-
- _logger.LogInformation("Detecting download count changes.");
- var changes = _downloadSetComparer.Compare(oldResult.Data, overridenDownloads);
- var idBag = new ConcurrentBag<string>(changes.Keys);
- _logger.LogInformation("{Count} package IDs have download count changes.", idBag.Count);
-
- if (!changes.Any())
- {
- return false;
- }
-
- _logger.LogInformation(
- "Starting {Count} workers pushing download count changes to Azure Search.",
- _options.Value.MaxConcurrentBatches);
- await ParallelAsync.Repeat(
- () => WorkAsync(idBag, changes),
- _options.Value.MaxConcurrentBatches);
- _logger.LogInformation("All of the download count changes have been pushed to Azure Search.");
-
- _logger.LogInformation("Uploading the new download count data to blob storage.");
- await _downloadDataClient.ReplaceLatestIndexedAsync(newData, oldResult.Metadata.GetIfMatchCondition());
- return true;
- }
-
- private async Task WorkAsync(ConcurrentBag<string> idBag, SortedDictionary<string, long> changes)
- {
- // Perform two batching mechanisms:
- //
- // 1. Group package IDs into batches so version lists can be fetched in parallel.
- // 2. Group index actions so documents can be pushed to Azure Search in batches.
- //
- // Also, throttle the pushes to Azure Search based on time so that we don't cause too much load.
- var idsToIndex = new ConcurrentBag<string>();
- var indexActionsToPush = new ConcurrentBag<IdAndValue<IndexActions>>();
- var timeSinceLastPush = new Stopwatch();
-
- while (idBag.TryTake(out var id))
- {
- // FIRST, check if we have a full batch of package IDs to produce index actions for.
- //
- // If all of the IDs to index and the current ID were to need a document for each search filter and
- // that number plus the current index actions to push would make the batch larger than the maximum
- // batch size, produce index actions for the IDs that we have collected so far.
- if (GetBatchSize(indexActionsToPush) + ((idsToIndex.Count + 1) * MaxDocumentsPerId) > _options.Value.AzureSearchBatchSize)
- {
- await GenerateIndexActionsAsync(idsToIndex, indexActionsToPush, changes);
- }
-
- // SECOND, check if we have a full batch of index actions to push to Azure Search.
- //
- // If the current ID were to need a document for each search filter and the current batch size would
- // make the batch larger than the maximum batch size, push the index actions we have so far.
- if (GetBatchSize(indexActionsToPush) + MaxDocumentsPerId > _options.Value.AzureSearchBatchSize)
- {
- _logger.LogInformation(
- "Starting to push a batch. There are {IdCount} unprocessed IDs left to index and push.",
- idBag.Count);
- await PushIndexActionsAsync(indexActionsToPush, timeSinceLastPush);
- }
-
- // THIRD, now that the two batching "buffers" have been flushed if necessary, add the current ID to the
- // batch of IDs to produce index actions for.
- idsToIndex.Add(id);
- }
-
- // Process any leftover IDs that didn't make it into a full batch.
- if (idsToIndex.Any())
- {
- await GenerateIndexActionsAsync(idsToIndex, indexActionsToPush, changes);
- }
-
- // Push any leftover index actions that didn't make it into a full batch.
- if (indexActionsToPush.Any())
- {
- await PushIndexActionsAsync(indexActionsToPush, timeSinceLastPush);
- }
-
- Guard.Assert(idsToIndex.IsEmpty, "There should be no more IDs to process.");
- Guard.Assert(indexActionsToPush.IsEmpty, "There should be no more index actions to push.");
- }
-
- /// <summary>
- /// Generate index actions for each provided ID. This reads the version list per package ID so we want to
- /// parallel this work by <see cref="AzureSearchJobConfiguration.MaxConcurrentVersionListWriters"/>.
- /// </summary>
- private async Task GenerateIndexActionsAsync(
- ConcurrentBag<string> idsToIndex,
- ConcurrentBag<IdAndValue<IndexActions>> indexActionsToPush,
- SortedDictionary<string, long> changes)
- {
- await ParallelAsync.Repeat(
- async () =>
- {
- while (idsToIndex.TryTake(out var id))
- {
- var indexActions = await _indexActionBuilder.UpdateAsync(
- id,
- sf => _searchDocumentBuilder.UpdateDownloadCount(id, sf, changes[id]));
-
- if (indexActions.IsEmpty)
- {
- continue;
- }
-
- Guard.Assert(indexActions.Hijack.Count == 0, "There should be no hijack index changes.");
-
- indexActionsToPush.Add(new IdAndValue<IndexActions>(id, indexActions));
- }
- },
- _options.Value.MaxConcurrentVersionListWriters);
- }
-
- private async Task<int> PushIndexActionsAsync(
- ConcurrentBag<IdAndValue<IndexActions>> indexActionsToPush,
- Stopwatch timeSinceLastPush)
- {
- var elapsed = timeSinceLastPush.Elapsed;
- if (timeSinceLastPush.IsRunning && elapsed < _options.Value.MinPushPeriod)
- {
- var timeUntilNextPush = _options.Value.MinPushPeriod - elapsed;
- _logger.LogInformation(
- "Waiting for {Duration} before continuing.",
- timeUntilNextPush);
- await _systemTime.Delay(timeUntilNextPush);
- }
-
- /// Create a new batch pusher just for this operation. Note that we don't use the internal queue of the
- /// batch pusher for more than a single batch because we want to control exactly when batches are pushed to
- /// Azure Search so that we can observe the
- /// <see cref="AzureSearchJobConfiguration.MinPushPeriod"/> configuration property.
- var batchPusher = _batchPusherFactory();
-
- _logger.LogInformation(
- "Pushing a batch of {IdCount} IDs and {DocumentCount} documents.",
- indexActionsToPush.Count,
- GetBatchSize(indexActionsToPush));
-
- while (indexActionsToPush.TryTake(out var indexActions))
- {
- batchPusher.EnqueueIndexActions(indexActions.Id, indexActions.Value);
- }
-
- // Note that this method can throw a storage exception if one of the version lists has been modified during
- // the execution of this job loop.
- await batchPusher.FinishAsync();
-
- // Restart the timer AFTER the push is completed to err on the side of caution.
- timeSinceLastPush.Restart();
-
- return 0;
- }
-
- /// <summary>
- /// This returns the number of documents that will be pushed to an Azure Search index in a
- /// single batch. The caller is responsible that this number does not exceed
- /// <see cref="AzureSearchJobConfiguration.AzureSearchBatchSize"/>. If this job were to start pushing changes
- /// to more than one index (more than the search index), then the number returned here should be the max of
- /// the document counts per index.
- /// </summary>
- private int GetBatchSize(ConcurrentBag<IdAndValue<IndexActions>> indexActionsToPush)
- {
- return indexActionsToPush.Sum(x => x.Value.Search.Count);
- }
-
- private void CleanDownloadData(DownloadData data)
- {
- var invalidIdCount = 0;
- var invalidVersionCount = 0;
- var nonNormalizedVersionCount = 0;
-
- foreach (var id in data.Keys.ToList())
- {
- var isValidId = id.Length <= PackageIdValidator.MaxPackageIdLength && PackageIdValidator.IsValidPackageId(id);
- if (!isValidId)
- {
- invalidIdCount++;
- }
-
- foreach (var version in data[id].Keys.ToList())
- {
- var isValidVersion = NuGetVersion.TryParse(version, out var parsedVersion);
- if (!isValidVersion)
- {
- invalidVersionCount++;
- }
-
- if (!isValidId || !isValidVersion)
- {
- // Clear the download count if the ID or version is invalid.
- data.SetDownloadCount(id, version, 0);
- continue;
- }
-
- var normalizedVersion = parsedVersion.ToNormalizedString();
- var isNormalizedVersion = StringComparer.OrdinalIgnoreCase.Equals(version, normalizedVersion);
-
- if (!isNormalizedVersion)
- {
- nonNormalizedVersionCount++;
-
- // Use the normalized version string if the original was not normalized.
- var downloads = data.GetDownloadCount(id, version);
- data.SetDownloadCount(id, version, 0);
- data.SetDownloadCount(id, normalizedVersion, downloads);
- }
- }
- }
-
- _logger.LogInformation(
- "There were {InvalidIdCount} invalid IDs, {InvalidVersionCount} invalid versions, and " +
- "{NonNormalizedVersionCount} non-normalized IDs.",
- invalidIdCount,
- invalidVersionCount,
- nonNormalizedVersionCount);
- }
}
}
diff --git a/src/NuGet.Services.AzureSearch/Auxiliary2AzureSearch/UpdateDownloadsCommand.cs b/src/NuGet.Services.AzureSearch/Auxiliary2AzureSearch/UpdateDownloadsCommand.cs
new file mode 100644
index 000000000..52d69c6fb
--- /dev/null
+++ b/src/NuGet.Services.AzureSearch/Auxiliary2AzureSearch/UpdateDownloadsCommand.cs
@@ -0,0 +1,337 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using NuGet.Packaging;
+using NuGet.Services.AzureSearch.AuxiliaryFiles;
+using NuGet.Services.AzureSearch.Wrappers;
+using NuGet.Services.Metadata.Catalog.Helpers;
+using NuGet.Versioning;
+using NuGetGallery;
+
+namespace NuGet.Services.AzureSearch.Auxiliary2AzureSearch
+{
+ public class UpdateDownloadsCommand : IAzureSearchCommand
+ {
+ /// <summary>
+ /// A package ID can result in one document per search filter if there is a version that applies to each
+ /// of the filters. The simplest such case is a prerelease, SemVer 1.0.0 package version like 1.0.0-beta. This
+ /// version applies to all package filters.
+ /// </summary>
+ private static readonly int MaxDocumentsPerId = Enum.GetValues(typeof(SearchFilters)).Length;
+
+ private readonly IAuxiliaryFileClient _auxiliaryFileClient;
+ private readonly IDownloadDataClient _downloadDataClient;
+ private readonly IDownloadSetComparer _downloadSetComparer;
+ private readonly ISearchDocumentBuilder _searchDocumentBuilder;
+ private readonly ISearchIndexActionBuilder _indexActionBuilder;
+ private readonly Func<IBatchPusher> _batchPusherFactory;
+ private readonly ISystemTime _systemTime;
+ private readonly IOptionsSnapshot<Auxiliary2AzureSearchConfiguration> _options;
+ private readonly IAzureSearchTelemetryService _telemetryService;
+ private readonly ILogger<UpdateDownloadsCommand> _logger;
+ private readonly StringCache _stringCache;
+
+ public UpdateDownloadsCommand(
+ IAuxiliaryFileClient auxiliaryFileClient,
+ IDownloadDataClient downloadDataClient,
+ IDownloadSetComparer downloadSetComparer,
+ ISearchDocumentBuilder searchDocumentBuilder,
+ ISearchIndexActionBuilder indexActionBuilder,
+ Func<IBatchPusher> batchPusherFactory,
+ ISystemTime systemTime,
+ IOptionsSnapshot<Auxiliary2AzureSearchConfiguration> options,
+ IAzureSearchTelemetryService telemetryService,
+ ILogger<UpdateDownloadsCommand> logger)
+ {
+ _auxiliaryFileClient = auxiliaryFileClient ?? throw new ArgumentNullException(nameof(auxiliaryFileClient));
+ _downloadDataClient = downloadDataClient ?? throw new ArgumentNullException(nameof(downloadDataClient));
+ _downloadSetComparer = downloadSetComparer ?? throw new ArgumentNullException(nameof(downloadSetComparer));
+ _searchDocumentBuilder = searchDocumentBuilder ?? throw new ArgumentNullException(nameof(searchDocumentBuilder));
+ _indexActionBuilder = indexActionBuilder ?? throw new ArgumentNullException(nameof(indexActionBuilder));
+ _batchPusherFactory = batchPusherFactory ?? throw new ArgumentNullException(nameof(batchPusherFactory));
+ _systemTime = systemTime ?? throw new ArgumentNullException(nameof(systemTime));
+ _options = options ?? throw new ArgumentNullException(nameof(options));
+ _telemetryService = telemetryService ?? throw new ArgumentNullException(nameof(telemetryService));
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+ _stringCache = new StringCache();
+
+ if (_options.Value.MaxConcurrentBatches <= 0)
+ {
+ throw new ArgumentOutOfRangeException(
+ nameof(options),
+ $"The {nameof(AzureSearchJobConfiguration.MaxConcurrentBatches)} must be greater than zero.");
+ }
+
+ if (_options.Value.MaxConcurrentVersionListWriters <= 0)
+ {
+ throw new ArgumentOutOfRangeException(
+ nameof(options),
+ $"The {nameof(AzureSearchJobConfiguration.MaxConcurrentVersionListWriters)} must be greater than zero.");
+ }
+ }
+
+ public async Task ExecuteAsync()
+ {
+ var stopwatch = Stopwatch.StartNew();
+ var outcome = JobOutcome.Failure;
+ try
+ {
+ outcome = await PushIndexChangesAsync() ? JobOutcome.Success : JobOutcome.NoOp;
+ }
+ finally
+ {
+ stopwatch.Stop();
+ _telemetryService.TrackUpdateDownloadsCompleted(outcome, stopwatch.Elapsed);
+ }
+ }
+
+ private async Task<bool> PushIndexChangesAsync()
+ {
+ // The "old" data in this case is the download count data that was last indexed by this job (or
+ // initialized by Db2AzureSearch).
+ _logger.LogInformation("Fetching old download count data from blob storage.");
+ var oldResult = await _downloadDataClient.ReadLatestIndexedAsync(
+ AccessConditionWrapper.GenerateEmptyCondition(),
+ _stringCache);
+
+ // The "new" data in this case is from the statistics pipeline.
+ _logger.LogInformation("Fetching new download count data from blob storage.");
+ var newData = await _auxiliaryFileClient.LoadDownloadDataAsync();
+
+ _logger.LogInformation("Removing invalid IDs and versions from the old data.");
+ CleanDownloadData(oldResult.Data);
+
+ _logger.LogInformation("Removing invalid IDs and versions from the new data.");
+ CleanDownloadData(newData);
+
+ // Fetch the download overrides from the auxiliary file. Note that the overriden downloads are kept
+ // separate from downloads data as the original data will be persisted to auxiliary data, whereas the
+ // overriden data will be persisted to Azure Search.
+ _logger.LogInformation("Overriding download count data.");
+ var downloadOverrides = await _auxiliaryFileClient.LoadDownloadOverridesAsync();
+ var overridenDownloads = newData.ApplyDownloadOverrides(downloadOverrides, _logger);
+
+ _logger.LogInformation("Detecting download count changes.");
+ var changes = _downloadSetComparer.Compare(oldResult.Data, overridenDownloads);
+ var idBag = new ConcurrentBag<string>(changes.Keys);
+ _logger.LogInformation("{Count} package IDs have download count changes.", idBag.Count);
+
+ if (!changes.Any())
+ {
+ return false;
+ }
+
+ _logger.LogInformation(
+ "Starting {Count} workers pushing download count changes to Azure Search.",
+ _options.Value.MaxConcurrentBatches);
+ await ParallelAsync.Repeat(
+ () => WorkAsync(idBag, changes),
+ _options.Value.MaxConcurrentBatches);
+ _logger.LogInformation("All of the download count changes have been pushed to Azure Search.");
+
+ _logger.LogInformation("Uploading the new download count data to blob storage.");
+ await _downloadDataClient.ReplaceLatestIndexedAsync(newData, oldResult.Metadata.GetIfMatchCondition());
+ return true;
+ }
+
+ private async Task WorkAsync(ConcurrentBag<string> idBag, SortedDictionary<string, long> changes)
+ {
+ // Perform two batching mechanisms:
+ //
+ // 1. Group package IDs into batches so version lists can be fetched in parallel.
+ // 2. Group index actions so documents can be pushed to Azure Search in batches.
+ //
+ // Also, throttle the pushes to Azure Search based on time so that we don't cause too much load.
+ var idsToIndex = new ConcurrentBag<string>();
+ var indexActionsToPush = new ConcurrentBag<IdAndValue<IndexActions>>();
+ var timeSinceLastPush = new Stopwatch();
+
+ while (idBag.TryTake(out var id))
+ {
+ // FIRST, check if we have a full batch of package IDs to produce index actions for.
+ //
+ // If all of the IDs to index and the current ID were to need a document for each search filter and
+ // that number plus the current index actions to push would make the batch larger than the maximum
+ // batch size, produce index actions for the IDs that we have collected so far.
+ if (GetBatchSize(indexActionsToPush) + ((idsToIndex.Count + 1) * MaxDocumentsPerId) > _options.Value.AzureSearchBatchSize)
+ {
+ await GenerateIndexActionsAsync(idsToIndex, indexActionsToPush, changes);
+ }
+
+ // SECOND, check if we have a full batch of index actions to push to Azure Search.
+ //
+ // If the current ID were to need a document for each search filter and the current batch size would
+ // make the batch larger than the maximum batch size, push the index actions we have so far.
+ if (GetBatchSize(indexActionsToPush) + MaxDocumentsPerId > _options.Value.AzureSearchBatchSize)
+ {
+ _logger.LogInformation(
+ "Starting to push a batch. There are {IdCount} unprocessed IDs left to index and push.",
+ idBag.Count);
+ await PushIndexActionsAsync(indexActionsToPush, timeSinceLastPush);
+ }
+
+ // THIRD, now that the two batching "buffers" have been flushed if necessary, add the current ID to the
+ // batch of IDs to produce index actions for.
+ idsToIndex.Add(id);
+ }
+
+ // Process any leftover IDs that didn't make it into a full batch.
+ if (idsToIndex.Any())
+ {
+ await GenerateIndexActionsAsync(idsToIndex, indexActionsToPush, changes);
+ }
+
+ // Push any leftover index actions that didn't make it into a full batch.
+ if (indexActionsToPush.Any())
+ {
+ await PushIndexActionsAsync(indexActionsToPush, timeSinceLastPush);
+ }
+
+ Guard.Assert(idsToIndex.IsEmpty, "There should be no more IDs to process.");
+ Guard.Assert(indexActionsToPush.IsEmpty, "There should be no more index actions to push.");
+ }
+
+ /// <summary>
+ /// Generate index actions for each provided ID. This reads the version list per package ID so we want to
+ /// parallel this work by <see cref="AzureSearchJobConfiguration.MaxConcurrentVersionListWriters"/>.
+ /// </summary>
+ private async Task GenerateIndexActionsAsync(
+ ConcurrentBag<string> idsToIndex,
+ ConcurrentBag<IdAndValue<IndexActions>> indexActionsToPush,
+ SortedDictionary<string, long> changes)
+ {
+ await ParallelAsync.Repeat(
+ async () =>
+ {
+ while (idsToIndex.TryTake(out var id))
+ {
+ var indexActions = await _indexActionBuilder.UpdateAsync(
+ id,
+ sf => _searchDocumentBuilder.UpdateDownloadCount(id, sf, changes[id]));
+
+ if (indexActions.IsEmpty)
+ {
+ continue;
+ }
+
+ Guard.Assert(indexActions.Hijack.Count == 0, "There should be no hijack index changes.");
+
+ indexActionsToPush.Add(new IdAndValue<IndexActions>(id, indexActions));
+ }
+ },
+ _options.Value.MaxConcurrentVersionListWriters);
+ }
+
+ private async Task<int> PushIndexActionsAsync(
+ ConcurrentBag<IdAndValue<IndexActions>> indexActionsToPush,
+ Stopwatch timeSinceLastPush)
+ {
+ var elapsed = timeSinceLastPush.Elapsed;
+ if (timeSinceLastPush.IsRunning && elapsed < _options.Value.MinPushPeriod)
+ {
+ var timeUntilNextPush = _options.Value.MinPushPeriod - elapsed;
+ _logger.LogInformation(
+ "Waiting for {Duration} before continuing.",
+ timeUntilNextPush);
+ await _systemTime.Delay(timeUntilNextPush);
+ }
+
+ /// Create a new batch pusher just for this operation. Note that we don't use the internal queue of the
+ /// batch pusher for more than a single batch because we want to control exactly when batches are pushed to
+ /// Azure Search so that we can observe the
+ /// <see cref="AzureSearchJobConfiguration.MinPushPeriod"/> configuration property.
+ var batchPusher = _batchPusherFactory();
+
+ _logger.LogInformation(
+ "Pushing a batch of {IdCount} IDs and {DocumentCount} documents.",
+ indexActionsToPush.Count,
+ GetBatchSize(indexActionsToPush));
+
+ while (indexActionsToPush.TryTake(out var indexActions))
+ {
+ batchPusher.EnqueueIndexActions(indexActions.Id, indexActions.Value);
+ }
+
+ // Note that this method can throw a storage exception if one of the version lists has been modified during
+ // the execution of this job loop.
+ await batchPusher.FinishAsync();
+
+ // Restart the timer AFTER the push is completed to err on the side of caution.
+ timeSinceLastPush.Restart();
+
+ return 0;
+ }
+
+ /// <summary>
+ /// This returns the number of documents that will be pushed to an Azure Search index in a
+ /// single batch. The caller is responsible that this number does not exceed
+ /// <see cref="AzureSearchJobConfiguration.AzureSearchBatchSize"/>. If this job were to start pushing changes
+ /// to more than one index (more than the search index), then the number returned here should be the max of
+ /// the document counts per index.
+ /// </summary>
+ private int GetBatchSize(ConcurrentBag<IdAndValue<IndexActions>> indexActionsToPush)
+ {
+ return indexActionsToPush.Sum(x => x.Value.Search.Count);
+ }
+
+ private void CleanDownloadData(DownloadData data)
+ {
+ var invalidIdCount = 0;
+ var invalidVersionCount = 0;
+ var nonNormalizedVersionCount = 0;
+
+ foreach (var id in data.Keys.ToList())
+ {
+ var isValidId = id.Length <= PackageIdValidator.MaxPackageIdLength && PackageIdValidator.IsValidPackageId(id);
+ if (!isValidId)
+ {
+ invalidIdCount++;
+ }
+
+ foreach (var version in data[id].Keys.ToList())
+ {
+ var isValidVersion = NuGetVersion.TryParse(version, out var parsedVersion);
+ if (!isValidVersion)
+ {
+ invalidVersionCount++;
+ }
+
+ if (!isValidId || !isValidVersion)
+ {
+ // Clear the download count if the ID or version is invalid.
+ data.SetDownloadCount(id, version, 0);
+ continue;
+ }
+
+ var normalizedVersion = parsedVersion.ToNormalizedString();
+ var isNormalizedVersion = StringComparer.OrdinalIgnoreCase.Equals(version, normalizedVersion);
+
+ if (!isNormalizedVersion)
+ {
+ nonNormalizedVersionCount++;
+
+ // Use the normalized version string if the original was not normalized.
+ var downloads = data.GetDownloadCount(id, version);
+ data.SetDownloadCount(id, version, 0);
+ data.SetDownloadCount(id, normalizedVersion, downloads);
+ }
+ }
+ }
+
+ _logger.LogInformation(
+ "There were {InvalidIdCount} invalid IDs, {InvalidVersionCount} invalid versions, and " +
+ "{NonNormalizedVersionCount} non-normalized IDs.",
+ invalidIdCount,
+ invalidVersionCount,
+ nonNormalizedVersionCount);
+ }
+ }
+}
diff --git a/src/NuGet.Services.AzureSearch/Auxiliary2AzureSearch/UpdateVerifiedPackagesCommand.cs b/src/NuGet.Services.AzureSearch/Auxiliary2AzureSearch/UpdateVerifiedPackagesCommand.cs
new file mode 100644
index 000000000..8d0402b26
--- /dev/null
+++ b/src/NuGet.Services.AzureSearch/Auxiliary2AzureSearch/UpdateVerifiedPackagesCommand.cs
@@ -0,0 +1,76 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Logging;
+using NuGet.Services.AzureSearch.AuxiliaryFiles;
+using NuGetGallery;
+
+namespace NuGet.Services.AzureSearch.Auxiliary2AzureSearch
+{
+ public class UpdateVerifiedPackagesCommand : IAzureSearchCommand
+ {
+ private readonly IDatabaseAuxiliaryDataFetcher _databaseFetcher;
+ private readonly IVerifiedPackagesDataClient _verifiedPackagesDataClient;
+ private readonly IAzureSearchTelemetryService _telemetryService;
+ private readonly ILogger<UpdateVerifiedPackagesCommand> _logger;
+ private readonly StringCache _stringCache;
+
+ public UpdateVerifiedPackagesCommand(
+ IDatabaseAuxiliaryDataFetcher databaseFetcher,
+ IVerifiedPackagesDataClient verifiedPackagesDataClient,
+ IAzureSearchTelemetryService telemetryService,
+ ILogger<UpdateVerifiedPackagesCommand> logger)
+ {
+ _databaseFetcher = databaseFetcher ?? throw new ArgumentNullException(nameof(databaseFetcher));
+ _verifiedPackagesDataClient = verifiedPackagesDataClient ?? throw new ArgumentNullException(nameof(verifiedPackagesDataClient));
+ _telemetryService = telemetryService ?? throw new ArgumentNullException(nameof(telemetryService));
+ _logger = logger ?? throw new ArgumentNullException(nameof(logger));
+ _stringCache = new StringCache();
+ }
+
+ public async Task ExecuteAsync()
+ {
+ var stopwatch = Stopwatch.StartNew();
+ var outcome = JobOutcome.Failure;
+ try
+ {
+ outcome = await UpdateVerifiedPackagesAsync() ? JobOutcome.Success : JobOutcome.NoOp;
+ }
+ finally
+ {
+ stopwatch.Stop();
+ _telemetryService.TrackUpdateVerifiedPackagesCompleted(outcome, stopwatch.Elapsed);
+ }
+ }
+
+ private async Task<bool> UpdateVerifiedPackagesAsync()
+ {
+ // The "old" data in this case is the latest file that was copied to the region's storage container by this
+ // job (or initialized by Db2AzureSearch).
+ var oldResult = await _verifiedPackagesDataClient.ReadLatestAsync(
+ AccessConditionWrapper.GenerateEmptyCondition(),
+ _stringCache);
+
+ // The "new" data in this case is from the database.
+ var newData = await _databaseFetcher.GetVerifiedPackagesAsync();
+
+ var changes = new HashSet<string>(oldResult.Data, oldResult.Data.Comparer);
+ changes.SymmetricExceptWith(newData);
+ _logger.LogInformation("{Count} package IDs have verified status changes.", changes.Count);
+
+ if (changes.Count == 0)
+ {
+ return false;
+ }
+ else
+ {
+ await _verifiedPackagesDataClient.ReplaceLatestAsync(newData, oldResult.Metadata.GetIfMatchCondition());
+ return true;
+ }
+ }
+ }
+}
diff --git a/src/NuGet.Services.AzureSearch/AzureSearchTelemetryService.cs b/src/NuGet.Services.AzureSearch/AzureSearchTelemetryService.cs
index 623b3632a..60685d87c 100644
--- a/src/NuGet.Services.AzureSearch/AzureSearchTelemetryService.cs
+++ b/src/NuGet.Services.AzureSearch/AzureSearchTelemetryService.cs
@@ -404,5 +404,27 @@ public void TrackReadLatestVerifiedPackagesFromDatabase(int packageIdCount, Time
{ "PackageIdCount", packageIdCount.ToString() },
});
}
+
+ public void TrackUpdateVerifiedPackagesCompleted(JobOutcome outcome, TimeSpan elapsed)
+ {
+ _telemetryClient.TrackMetric(
+ Prefix + "UpdateVerifiedPackagesCompletedSeconds",
+ elapsed.TotalSeconds,
+ new Dictionary<string, string>
+ {
+ { "Outcome", outcome.ToString() },
+ });
+ }
+
+ public void TrackUpdateDownloadsCompleted(JobOutcome outcome, TimeSpan elapsed)
+ {
+ _telemetryClient.TrackMetric(
+ Prefix + "UpdateDownloadsCompletedSeconds",
+ elapsed.TotalSeconds,
+ new Dictionary<string, string>
+ {
+ { "Outcome", outcome.ToString() },
+ });
+ }
}
}
diff --git a/src/NuGet.Services.AzureSearch/DependencyInjectionExtensions.cs b/src/NuGet.Services.AzureSearch/DependencyInjectionExtensions.cs
index fdc277bc9..caeeabeb4 100644
--- a/src/NuGet.Services.AzureSearch/DependencyInjectionExtensions.cs
+++ b/src/NuGet.Services.AzureSearch/DependencyInjectionExtensions.cs
@@ -232,7 +232,13 @@ public static IServiceCollection AddAzureSearch(
services.AddSingleton();
- services.AddTransient<Auxiliary2AzureSearchCommand>();
+ services.AddTransient<UpdateVerifiedPackagesCommand>();
+ services.AddTransient<UpdateDownloadsCommand>();
+ services.AddTransient<Auxiliary2AzureSearchCommand>(p => new Auxiliary2AzureSearchCommand(
+ p.GetRequiredService<UpdateVerifiedPackagesCommand>(),
+ p.GetRequiredService<UpdateDownloadsCommand>(),
+ p.GetRequiredService<IAzureSearchTelemetryService>(),
+ p.GetRequiredService<ILogger<Auxiliary2AzureSearchCommand>>()));
services.AddTransient();
services.AddTransient();
diff --git a/src/NuGet.Services.AzureSearch/IAzureSearchTelemetryService.cs b/src/NuGet.Services.AzureSearch/IAzureSearchTelemetryService.cs
index 0b0707516..7b13c985f 100644
--- a/src/NuGet.Services.AzureSearch/IAzureSearchTelemetryService.cs
+++ b/src/NuGet.Services.AzureSearch/IAzureSearchTelemetryService.cs
@@ -49,8 +49,10 @@ void TrackDownloadCountDecrease(
void TrackV3GetDocument(TimeSpan elapsed);
void TrackV2GetDocumentWithSearchIndex(TimeSpan elapsed);
void TrackV2GetDocumentWithHijackIndex(TimeSpan elapsed);
+ void TrackUpdateVerifiedPackagesCompleted(JobOutcome outcome, TimeSpan elapsed);
void TrackReadLatestVerifiedPackages(int? packageIdCount, bool notModified, TimeSpan elapsed);
IDisposable TrackReplaceLatestVerifiedPackages(int packageIdCount);
void TrackAuxiliaryFilesStringCache(int stringCount, long charCount, int requestCount, int hitCount);
+ void TrackUpdateDownloadsCompleted(JobOutcome outcome, TimeSpan elapsed);
}
}
\ No newline at end of file
diff --git a/src/NuGet.Services.AzureSearch/NuGet.Services.AzureSearch.csproj b/src/NuGet.Services.AzureSearch/NuGet.Services.AzureSearch.csproj
index 35763d103..5502e8ef0 100644
--- a/src/NuGet.Services.AzureSearch/NuGet.Services.AzureSearch.csproj
+++ b/src/NuGet.Services.AzureSearch/NuGet.Services.AzureSearch.csproj
@@ -49,6 +49,8 @@
+ <Compile Include="Auxiliary2AzureSearch\UpdateDownloadsCommand.cs" />
+ <Compile Include="Auxiliary2AzureSearch\UpdateVerifiedPackagesCommand.cs" />
diff --git a/tests/NuGet.Services.AzureSearch.Tests/Auxiliary2AzureSearch/Auxiliary2AzureSearchCommandFacts.cs b/tests/NuGet.Services.AzureSearch.Tests/Auxiliary2AzureSearch/UpdateDownloadsCommandFacts.cs
similarity index 87%
rename from tests/NuGet.Services.AzureSearch.Tests/Auxiliary2AzureSearch/Auxiliary2AzureSearchCommandFacts.cs
rename to tests/NuGet.Services.AzureSearch.Tests/Auxiliary2AzureSearch/UpdateDownloadsCommandFacts.cs
index 03f34c002..6c0f988da 100644
--- a/tests/NuGet.Services.AzureSearch.Tests/Auxiliary2AzureSearch/Auxiliary2AzureSearchCommandFacts.cs
+++ b/tests/NuGet.Services.AzureSearch.Tests/Auxiliary2AzureSearch/UpdateDownloadsCommandFacts.cs
@@ -4,10 +4,8 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
-using System.Data.Entity;
using System.Linq;
using System.Threading.Tasks;
-using Castle.Core.Logging;
using Microsoft.Azure.Search.Models;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
@@ -21,7 +19,7 @@
namespace NuGet.Services.AzureSearch.Auxiliary2AzureSearch
{
- public class Auxiliary2AzureSearchCommandFacts
+ public class UpdateDownloadsCommandFacts
{
public class ExecuteAsync : Facts
{
@@ -29,47 +27,6 @@ public ExecuteAsync(ITestOutputHelper output) : base(output)
{
}
- [Fact]
- public async Task PushesAddedVerifiedPackage()
- {
- NewVerifiedPackagesData.Add("NuGet.Versioning");
-
- await Target.ExecuteAsync();
-
- VerifyCompletedTelemetry(JobOutcome.Success);
- VerifiedPackagesDataClient.Verify(
- x => x.ReplaceLatestAsync(
- NewVerifiedPackagesData,
- It.Is<IAccessCondition>(a => a.IfMatchETag == OldVerifiedPackagesResult.Metadata.ETag)),
- Times.Once);
- }
-
- [Fact]
- public async Task PushesRemovedVerifiedPackage()
- {
- OldVerifiedPackagesData.Add("NuGet.Versioning");
-
- await Target.ExecuteAsync();
-
- VerifyCompletedTelemetry(JobOutcome.Success);
- VerifiedPackagesDataClient.Verify(
- x => x.ReplaceLatestAsync(
- NewVerifiedPackagesData,
- It.Is<IAccessCondition>(a => a.IfMatchETag == OldVerifiedPackagesResult.Metadata.ETag)),
- Times.Once);
- }
-
- [Fact]
- public async Task DoesNotPushUnchangedVerifiedPackages()
- {
- await Target.ExecuteAsync();
-
- VerifyCompletedTelemetry(JobOutcome.NoOp);
- VerifiedPackagesDataClient.Verify(
- x => x.ReplaceLatestAsync(It.IsAny<HashSet<string>>(), It.IsAny<IAccessCondition>()),
- Times.Never);
- }
-
[Fact]
public async Task PushesNothingWhenThereAreNoChanges()
{
@@ -372,9 +329,7 @@ public abstract class Facts
public Facts(ITestOutputHelper output)
{
AuxiliaryFileClient = new Mock<IAuxiliaryFileClient>();
- DatabaseAuxiliaryDataFetcher = new Mock<IDatabaseAuxiliaryDataFetcher>();
DownloadDataClient = new Mock<IDownloadDataClient>();
- VerifiedPackagesDataClient = new Mock<IVerifiedPackagesDataClient>();
DownloadSetComparer = new Mock<IDownloadSetComparer>();
SearchDocumentBuilder = new Mock<ISearchDocumentBuilder>();
IndexActionBuilder = new Mock<ISearchIndexActionBuilder>();
@@ -405,14 +360,6 @@ public Facts(ITestOutputHelper output)
.Setup(x => x.LoadDownloadOverridesAsync())
.ReturnsAsync(() => DownloadOverrides);
- OldVerifiedPackagesData = new HashSet<string>();
- OldVerifiedPackagesResult = Data.GetAuxiliaryFileResult(OldVerifiedPackagesData, "verified-packages-etag");
- VerifiedPackagesDataClient
- .Setup(x => x.ReadLatestAsync(It.IsAny<IAccessCondition>(), It.IsAny<StringCache>()))
- .ReturnsAsync(() => OldVerifiedPackagesResult);
- NewVerifiedPackagesData = new HashSet<string>();
- DatabaseAuxiliaryDataFetcher.Setup(x => x.GetVerifiedPackagesAsync()).ReturnsAsync(() => NewVerifiedPackagesData);
-
Changes = new SortedDictionary();
DownloadSetComparer
.Setup(x => x.Compare(It.IsAny(), It.IsAny()))
@@ -454,11 +401,9 @@ public Facts(ITestOutputHelper output)
CurrentBatch = new ConcurrentBag();
});
- Target = new Auxiliary2AzureSearchCommand(
+ Target = new UpdateDownloadsCommand(
AuxiliaryFileClient.Object,
- DatabaseAuxiliaryDataFetcher.Object,
DownloadDataClient.Object,
- VerifiedPackagesDataClient.Object,
DownloadSetComparer.Object,
SearchDocumentBuilder.Object,
IndexActionBuilder.Object,
@@ -470,9 +415,7 @@ public Facts(ITestOutputHelper output)
}
public Mock<IAuxiliaryFileClient> AuxiliaryFileClient { get; }
- public Mock<IDatabaseAuxiliaryDataFetcher> DatabaseAuxiliaryDataFetcher { get; }
public Mock<IDownloadDataClient> DownloadDataClient { get; }
- public Mock<IVerifiedPackagesDataClient> VerifiedPackagesDataClient { get; }
public Mock<IDownloadSetComparer> DownloadSetComparer { get; }
public Mock<ISearchDocumentBuilder> SearchDocumentBuilder { get; }
public Mock<ISearchIndexActionBuilder> IndexActionBuilder { get; }
@@ -487,23 +430,20 @@ public Facts(ITestOutputHelper output)
public DownloadData NewDownloadData { get; }
public Dictionary DownloadOverrides { get; }
public SortedDictionary Changes { get; }
- public Auxiliary2AzureSearchCommand Target { get; }
+ public UpdateDownloadsCommand Target { get; }
public IndexActions IndexActions { get; set; }
public ConcurrentBag ProcessedIds { get; }
public ConcurrentBag PushedIds { get; }
public ConcurrentBag CurrentBatch { get; set; }
public ConcurrentBag> FinishedBatches { get; }
- public HashSet<string> OldVerifiedPackagesData { get; }
- public AuxiliaryFileResult<HashSet<string>> OldVerifiedPackagesResult { get; }
- public HashSet<string> NewVerifiedPackagesData { get; }
public void VerifyCompletedTelemetry(JobOutcome outcome)
{
TelemetryService.Verify(
- x => x.TrackAuxiliary2AzureSearchCompleted(It.IsAny<JobOutcome>(), It.IsAny<TimeSpan>()),
+ x => x.TrackUpdateDownloadsCompleted(It.IsAny<JobOutcome>(), It.IsAny<TimeSpan>()),
Times.Once);
TelemetryService.Verify(
- x => x.TrackAuxiliary2AzureSearchCompleted(outcome, It.IsAny<TimeSpan>()),
+ x => x.TrackUpdateDownloadsCompleted(outcome, It.IsAny<TimeSpan>()),
Times.Once);
}
diff --git a/tests/NuGet.Services.AzureSearch.Tests/Auxiliary2AzureSearch/UpdateVerifiedPackagesCommandFacts.cs b/tests/NuGet.Services.AzureSearch.Tests/Auxiliary2AzureSearch/UpdateVerifiedPackagesCommandFacts.cs
new file mode 100644
index 000000000..967a542eb
--- /dev/null
+++ b/tests/NuGet.Services.AzureSearch.Tests/Auxiliary2AzureSearch/UpdateVerifiedPackagesCommandFacts.cs
@@ -0,0 +1,111 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using Microsoft.Extensions.Options;
+using Moq;
+using NuGet.Services.AzureSearch.AuxiliaryFiles;
+using NuGet.Services.AzureSearch.Support;
+using NuGetGallery;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace NuGet.Services.AzureSearch.Auxiliary2AzureSearch
+{
+ public class UpdateVerifiedPackagesCommandFacts
+ {
+ public class ExecuteAsync : Facts
+ {
+ public ExecuteAsync(ITestOutputHelper output) : base(output)
+ {
+ }
+
+ [Fact]
+ public async Task PushesAddedVerifiedPackage()
+ {
+ NewVerifiedPackagesData.Add("NuGet.Versioning");
+
+ await Target.ExecuteAsync();
+
+ VerifyCompletedTelemetry(JobOutcome.Success);
+ VerifiedPackagesDataClient.Verify(
+ x => x.ReplaceLatestAsync(
+ NewVerifiedPackagesData,
+ It.Is<IAccessCondition>(a => a.IfMatchETag == OldVerifiedPackagesResult.Metadata.ETag)),
+ Times.Once);
+ }
+
+ [Fact]
+ public async Task PushesRemovedVerifiedPackage()
+ {
+ OldVerifiedPackagesData.Add("NuGet.Versioning");
+
+ await Target.ExecuteAsync();
+
+ VerifyCompletedTelemetry(JobOutcome.Success);
+ VerifiedPackagesDataClient.Verify(
+ x => x.ReplaceLatestAsync(
+ NewVerifiedPackagesData,
+ It.Is<IAccessCondition>(a => a.IfMatchETag == OldVerifiedPackagesResult.Metadata.ETag)),
+ Times.Once);
+ }
+
+ [Fact]
+ public async Task DoesNotPushUnchangedVerifiedPackages()
+ {
+ await Target.ExecuteAsync();
+
+ VerifyCompletedTelemetry(JobOutcome.NoOp);
+ VerifiedPackagesDataClient.Verify(
+ x => x.ReplaceLatestAsync(It.IsAny<HashSet<string>>(), It.IsAny<IAccessCondition>()),
+ Times.Never);
+ }
+ }
+
+ public abstract class Facts
+ {
+ public Facts(ITestOutputHelper output)
+ {
+ DatabaseAuxiliaryDataFetcher = new Mock<IDatabaseAuxiliaryDataFetcher>();
+ VerifiedPackagesDataClient = new Mock<IVerifiedPackagesDataClient>();
+ TelemetryService = new Mock<IAzureSearchTelemetryService>();
+ Logger = output.GetLogger<UpdateVerifiedPackagesCommand>();
+
+ OldVerifiedPackagesData = new HashSet<string>();
+ OldVerifiedPackagesResult = Data.GetAuxiliaryFileResult(OldVerifiedPackagesData, "verified-packages-etag");
+ VerifiedPackagesDataClient
+ .Setup(x => x.ReadLatestAsync(It.IsAny<IAccessCondition>(), It.IsAny<StringCache>()))
+ .ReturnsAsync(() => OldVerifiedPackagesResult);
+ NewVerifiedPackagesData = new HashSet<string>();
+ DatabaseAuxiliaryDataFetcher.Setup(x => x.GetVerifiedPackagesAsync()).ReturnsAsync(() => NewVerifiedPackagesData);
+
+ Target = new UpdateVerifiedPackagesCommand(
+ DatabaseAuxiliaryDataFetcher.Object,
+ VerifiedPackagesDataClient.Object,
+ TelemetryService.Object,
+ Logger);
+ }
+
+ public Mock<IDatabaseAuxiliaryDataFetcher> DatabaseAuxiliaryDataFetcher { get; }
+ public Mock<IVerifiedPackagesDataClient> VerifiedPackagesDataClient { get; }
+ public Mock<IAzureSearchTelemetryService> TelemetryService { get; }
+ public RecordingLogger<UpdateVerifiedPackagesCommand> Logger { get; }
+ public UpdateVerifiedPackagesCommand Target { get; }
+ public HashSet<string> OldVerifiedPackagesData { get; }
+ public AuxiliaryFileResult<HashSet<string>> OldVerifiedPackagesResult { get; }
+ public HashSet<string> NewVerifiedPackagesData { get; }
+
+ public void VerifyCompletedTelemetry(JobOutcome outcome)
+ {
+ TelemetryService.Verify(
+ x => x.TrackUpdateVerifiedPackagesCompleted(It.IsAny<JobOutcome>(), It.IsAny<TimeSpan>()),
+ Times.Once);
+ TelemetryService.Verify(
+ x => x.TrackUpdateVerifiedPackagesCompleted(outcome, It.IsAny<TimeSpan>()),
+ Times.Once);
+ }
+ }
+ }
+}
diff --git a/tests/NuGet.Services.AzureSearch.Tests/NuGet.Services.AzureSearch.Tests.csproj b/tests/NuGet.Services.AzureSearch.Tests/NuGet.Services.AzureSearch.Tests.csproj
index b9541c48f..1418648eb 100644
--- a/tests/NuGet.Services.AzureSearch.Tests/NuGet.Services.AzureSearch.Tests.csproj
+++ b/tests/NuGet.Services.AzureSearch.Tests/NuGet.Services.AzureSearch.Tests.csproj
@@ -37,7 +37,8 @@
- <Compile Include="Auxiliary2AzureSearch\Auxiliary2AzureSearchCommandFacts.cs" />
+ <Compile Include="Auxiliary2AzureSearch\UpdateDownloadsCommandFacts.cs" />
+ <Compile Include="Auxiliary2AzureSearch\UpdateVerifiedPackagesCommandFacts.cs" />
+