From 4b78882b823ac3bb8d73ad4c7485206ccb9faa41 Mon Sep 17 00:00:00 2001 From: Joel Verhagen Date: Thu, 20 Feb 2020 15:10:00 -0800 Subject: [PATCH] Refactor Auxiliary2AzureSearch to split steps into classes (#747) Progress on https://github.com/NuGet/Engineering/issues/2978 --- .../Auxiliary2AzureSearchCommand.cs | 349 +----------------- .../UpdateDownloadsCommand.cs | 337 +++++++++++++++++ .../UpdateVerifiedPackagesCommand.cs | 76 ++++ .../AzureSearchTelemetryService.cs | 22 ++ .../DependencyInjectionExtensions.cs | 8 +- .../IAzureSearchTelemetryService.cs | 2 + .../NuGet.Services.AzureSearch.csproj | 2 + ...acts.cs => UpdateDownloadsCommandFacts.cs} | 70 +--- .../UpdateVerifiedPackagesCommandFacts.cs | 111 ++++++ .../NuGet.Services.AzureSearch.Tests.csproj | 3 +- 10 files changed, 580 insertions(+), 400 deletions(-) create mode 100644 src/NuGet.Services.AzureSearch/Auxiliary2AzureSearch/UpdateDownloadsCommand.cs create mode 100644 src/NuGet.Services.AzureSearch/Auxiliary2AzureSearch/UpdateVerifiedPackagesCommand.cs rename tests/NuGet.Services.AzureSearch.Tests/Auxiliary2AzureSearch/{Auxiliary2AzureSearchCommandFacts.cs => UpdateDownloadsCommandFacts.cs} (87%) create mode 100644 tests/NuGet.Services.AzureSearch.Tests/Auxiliary2AzureSearch/UpdateVerifiedPackagesCommandFacts.cs diff --git a/src/NuGet.Services.AzureSearch/Auxiliary2AzureSearch/Auxiliary2AzureSearchCommand.cs b/src/NuGet.Services.AzureSearch/Auxiliary2AzureSearch/Auxiliary2AzureSearchCommand.cs index 61c861316..df83b653d 100644 --- a/src/NuGet.Services.AzureSearch/Auxiliary2AzureSearch/Auxiliary2AzureSearchCommand.cs +++ b/src/NuGet.Services.AzureSearch/Auxiliary2AzureSearch/Auxiliary2AzureSearchCommand.cs @@ -2,86 +2,31 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; -using System.Collections.Concurrent; -using System.Collections.Generic; using System.Diagnostics; -using System.Linq; using System.Threading.Tasks; using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using NuGet.Packaging; -using NuGet.Services.AzureSearch.AuxiliaryFiles; -using NuGet.Services.AzureSearch.Wrappers; -using NuGet.Services.Metadata.Catalog.Helpers; -using NuGet.Versioning; -using NuGetGallery; namespace NuGet.Services.AzureSearch.Auxiliary2AzureSearch { public class Auxiliary2AzureSearchCommand : IAzureSearchCommand { - /// - /// A package ID can result in one document per search filter if the there is a version that applies to each - /// of the filters. The simplest such case is a prerelease, SemVer 1.0.0 package version like 1.0.0-beta. This - /// version applies to all package filters. - /// - private static readonly int MaxDocumentsPerId = Enum.GetValues(typeof(SearchFilters)).Length; - - private readonly IAuxiliaryFileClient _auxiliaryFileClient; - private readonly IDatabaseAuxiliaryDataFetcher _databaseFetcher; - private readonly IDownloadDataClient _downloadDataClient; - private readonly IVerifiedPackagesDataClient _verifiedPackagesDataClient; - private readonly IDownloadSetComparer _downloadSetComparer; - private readonly ISearchDocumentBuilder _searchDocumentBuilder; - private readonly ISearchIndexActionBuilder _indexActionBuilder; - private readonly Func _batchPusherFactory; - private readonly ISystemTime _systemTime; - private readonly IOptionsSnapshot _options; + private readonly IAzureSearchCommand[] _commands; private readonly IAzureSearchTelemetryService _telemetryService; private readonly ILogger _logger; - private readonly StringCache _stringCache; public Auxiliary2AzureSearchCommand( - IAuxiliaryFileClient auxiliaryFileClient, - IDatabaseAuxiliaryDataFetcher databaseFetcher, - IDownloadDataClient downloadDataClient, - IVerifiedPackagesDataClient verifiedPackagesDataClient, - IDownloadSetComparer downloadSetComparer, - ISearchDocumentBuilder searchDocumentBuilder, - ISearchIndexActionBuilder indexActionBuilder, - Func batchPusherFactory, - ISystemTime systemTime, - IOptionsSnapshot options, + IAzureSearchCommand updateVerifiedPackagesCommand, + IAzureSearchCommand updateDownloadsCommand, IAzureSearchTelemetryService telemetryService, ILogger logger) { - _auxiliaryFileClient = auxiliaryFileClient ?? throw new ArgumentException(nameof(auxiliaryFileClient)); - _databaseFetcher = databaseFetcher ?? throw new ArgumentNullException(nameof(databaseFetcher)); - _downloadDataClient = downloadDataClient ?? throw new ArgumentNullException(nameof(downloadDataClient)); - _verifiedPackagesDataClient = verifiedPackagesDataClient ?? throw new ArgumentNullException(nameof(verifiedPackagesDataClient)); - _downloadSetComparer = downloadSetComparer ?? throw new ArgumentNullException(nameof(downloadSetComparer)); - _searchDocumentBuilder = searchDocumentBuilder ?? throw new ArgumentNullException(nameof(searchDocumentBuilder)); - _indexActionBuilder = indexActionBuilder ?? throw new ArgumentNullException(nameof(indexActionBuilder)); - _batchPusherFactory = batchPusherFactory ?? throw new ArgumentNullException(nameof(batchPusherFactory)); - _systemTime = systemTime ?? throw new ArgumentNullException(nameof(systemTime)); - _options = options ?? throw new ArgumentNullException(nameof(options)); + _commands = new[] + { + updateVerifiedPackagesCommand ?? throw new ArgumentNullException(nameof(updateVerifiedPackagesCommand)), + updateDownloadsCommand ?? throw new ArgumentNullException(nameof(updateDownloadsCommand)), + }; _telemetryService = telemetryService ?? throw new ArgumentNullException(nameof(telemetryService)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _stringCache = new StringCache(); - - if (_options.Value.MaxConcurrentBatches <= 0) - { - throw new ArgumentOutOfRangeException( - nameof(options), - $"The {nameof(AzureSearchJobConfiguration.MaxConcurrentBatches)} must be greater than zero."); - } - - if (_options.Value.MaxConcurrentVersionListWriters <= 0) - { - throw new ArgumentOutOfRangeException( - nameof(options), - $"The {nameof(AzureSearchJobConfiguration.MaxConcurrentVersionListWriters)} must be greater than zero."); - } } public async Task ExecuteAsync() @@ -90,9 +35,14 @@ public async Task ExecuteAsync() var outcome = JobOutcome.Failure; try { - var hasVerifiedPackagesChanged = await UpdateVerifiedPackagesAsync(); - var hasIndexChanged = await PushIndexChangesAsync(); - outcome = hasVerifiedPackagesChanged || hasIndexChanged ? JobOutcome.Success : JobOutcome.NoOp; + foreach (var command in _commands) + { + _logger.LogInformation("Starting {CommandName}...", command.GetType().Name); + await command.ExecuteAsync(); + _logger.LogInformation("Completed {CommandName}.", command.GetType().Name); + } + + outcome = JobOutcome.Success; } finally { @@ -100,272 +50,5 @@ public async Task ExecuteAsync() _telemetryService.TrackAuxiliary2AzureSearchCompleted(outcome, stopwatch.Elapsed); } } - - private async Task UpdateVerifiedPackagesAsync() - { - // The "old" data in this case is the latest file that was copied to the region's storage container by this - // job (or initialized by Db2AzureSearch). - var oldResult = await _verifiedPackagesDataClient.ReadLatestAsync( - AccessConditionWrapper.GenerateEmptyCondition(), - _stringCache); - - // The "new" data in this case is from the database. - var newData = await _databaseFetcher.GetVerifiedPackagesAsync(); - - var changes = new HashSet(oldResult.Data, oldResult.Data.Comparer); - changes.SymmetricExceptWith(newData); - _logger.LogInformation("{Count} package IDs have verified status changes.", changes.Count); - - if (changes.Count == 0) - { - return false; - } - else - { - await _verifiedPackagesDataClient.ReplaceLatestAsync(newData, oldResult.Metadata.GetIfMatchCondition()); - return true; - } - } - - private async Task PushIndexChangesAsync() - { - // The "old" data in this case is the download count data that was last indexed by this job (or - // initialized by Db2AzureSearch). - _logger.LogInformation("Fetching old download count data from blob storage."); - var oldResult = await _downloadDataClient.ReadLatestIndexedAsync( - AccessConditionWrapper.GenerateEmptyCondition(), - _stringCache); - - // The "new" data in this case is from the statistics pipeline. - _logger.LogInformation("Fetching new download count data from blob storage."); - var newData = await _auxiliaryFileClient.LoadDownloadDataAsync(); - - _logger.LogInformation("Removing invalid IDs and versions from the old data."); - CleanDownloadData(oldResult.Data); - - _logger.LogInformation("Removing invalid IDs and versions from the new data."); - CleanDownloadData(newData); - - // Fetch the download overrides from the auxiliary file. Note that the overriden downloads are kept - // separate from downloads data as the original data will be persisted to auxiliary data, whereas the - // overriden data will be persisted to Azure Search. - _logger.LogInformation("Overriding download count data."); - var downloadOverrides = await _auxiliaryFileClient.LoadDownloadOverridesAsync(); - var overridenDownloads = newData.ApplyDownloadOverrides(downloadOverrides, _logger); - - _logger.LogInformation("Detecting download count changes."); - var changes = _downloadSetComparer.Compare(oldResult.Data, overridenDownloads); - var idBag = new ConcurrentBag(changes.Keys); - _logger.LogInformation("{Count} package IDs have download count changes.", idBag.Count); - - if (!changes.Any()) - { - return false; - } - - _logger.LogInformation( - "Starting {Count} workers pushing download count changes to Azure Search.", - _options.Value.MaxConcurrentBatches); - await ParallelAsync.Repeat( - () => WorkAsync(idBag, changes), - _options.Value.MaxConcurrentBatches); - _logger.LogInformation("All of the download count changes have been pushed to Azure Search."); - - _logger.LogInformation("Uploading the new download count data to blob storage."); - await _downloadDataClient.ReplaceLatestIndexedAsync(newData, oldResult.Metadata.GetIfMatchCondition()); - return true; - } - - private async Task WorkAsync(ConcurrentBag idBag, SortedDictionary changes) - { - // Perform two batching mechanisms: - // - // 1. Group package IDs into batches so version lists can be fetched in parallel. - // 2. Group index actions so documents can be pushed to Azure Search in batches. - // - // Also, throttle the pushes to Azure Search based on time so that we don't cause too much load. - var idsToIndex = new ConcurrentBag(); - var indexActionsToPush = new ConcurrentBag>(); - var timeSinceLastPush = new Stopwatch(); - - while (idBag.TryTake(out var id)) - { - // FIRST, check if we have a full batch of package IDs to produce index actions for. - // - // If all of the IDs to index and the current ID were to need a document for each search filter and - // that number plus the current index actions to push would make the batch larger than the maximum - // batch size, produce index actions for the IDs that we have collected so far. - if (GetBatchSize(indexActionsToPush) + ((idsToIndex.Count + 1) * MaxDocumentsPerId) > _options.Value.AzureSearchBatchSize) - { - await GenerateIndexActionsAsync(idsToIndex, indexActionsToPush, changes); - } - - // SECOND, check if we have a full batch of index actions to push to Azure Search. - // - // If the current ID were to need a document for each search filter and the current batch size would - // make the batch larger than the maximum batch size, push the index actions we have so far. - if (GetBatchSize(indexActionsToPush) + MaxDocumentsPerId > _options.Value.AzureSearchBatchSize) - { - _logger.LogInformation( - "Starting to push a batch. There are {IdCount} unprocessed IDs left to index and push.", - idBag.Count); - await PushIndexActionsAsync(indexActionsToPush, timeSinceLastPush); - } - - // THIRD, now that the two batching "buffers" have been flushed if necessary, add the current ID to the - // batch of IDs to produce index actions for. - idsToIndex.Add(id); - } - - // Process any leftover IDs that didn't make it into a full batch. - if (idsToIndex.Any()) - { - await GenerateIndexActionsAsync(idsToIndex, indexActionsToPush, changes); - } - - // Push any leftover index actions that didn't make it into a full batch. - if (indexActionsToPush.Any()) - { - await PushIndexActionsAsync(indexActionsToPush, timeSinceLastPush); - } - - Guard.Assert(idsToIndex.IsEmpty, "There should be no more IDs to process."); - Guard.Assert(indexActionsToPush.IsEmpty, "There should be no more index actions to push."); - } - - /// - /// Generate index actions for each provided ID. This reads the version list per package ID so we want to - /// parallel this work by . - /// - private async Task GenerateIndexActionsAsync( - ConcurrentBag idsToIndex, - ConcurrentBag> indexActionsToPush, - SortedDictionary changes) - { - await ParallelAsync.Repeat( - async () => - { - while (idsToIndex.TryTake(out var id)) - { - var indexActions = await _indexActionBuilder.UpdateAsync( - id, - sf => _searchDocumentBuilder.UpdateDownloadCount(id, sf, changes[id])); - - if (indexActions.IsEmpty) - { - continue; - } - - Guard.Assert(indexActions.Hijack.Count == 0, "There should be no hijack index changes."); - - indexActionsToPush.Add(new IdAndValue(id, indexActions)); - } - }, - _options.Value.MaxConcurrentVersionListWriters); - } - - private async Task PushIndexActionsAsync( - ConcurrentBag> indexActionsToPush, - Stopwatch timeSinceLastPush) - { - var elapsed = timeSinceLastPush.Elapsed; - if (timeSinceLastPush.IsRunning && elapsed < _options.Value.MinPushPeriod) - { - var timeUntilNextPush = _options.Value.MinPushPeriod - elapsed; - _logger.LogInformation( - "Waiting for {Duration} before continuing.", - timeUntilNextPush); - await _systemTime.Delay(timeUntilNextPush); - } - - /// Create a new batch pusher just for this operation. Note that we don't use the internal queue of the - /// batch pusher for more than a single batch because we want to control exactly when batches are pushed to - /// Azure Search so that we can observe the - /// configuration property. - var batchPusher = _batchPusherFactory(); - - _logger.LogInformation( - "Pushing a batch of {IdCount} IDs and {DocumentCount} documents.", - indexActionsToPush.Count, - GetBatchSize(indexActionsToPush)); - - while (indexActionsToPush.TryTake(out var indexActions)) - { - batchPusher.EnqueueIndexActions(indexActions.Id, indexActions.Value); - } - - // Note that this method can throw a storage exception if one of the version lists has been modified during - // the execution of this job loop. - await batchPusher.FinishAsync(); - - // Restart the timer AFTER the push is completed to err on the side of caution. - timeSinceLastPush.Restart(); - - return 0; - } - - /// - /// This returns the number of documents that will be pushed to an Azure Search index in a - /// single batch. The caller is responsible that this number does not exceed - /// . If this job were to start pushing changes - /// to more than one index (more than the search index), then the number returned here should be the max of - /// the document counts per index. - /// - private int GetBatchSize(ConcurrentBag> indexActionsToPush) - { - return indexActionsToPush.Sum(x => x.Value.Search.Count); - } - - private void CleanDownloadData(DownloadData data) - { - var invalidIdCount = 0; - var invalidVersionCount = 0; - var nonNormalizedVersionCount = 0; - - foreach (var id in data.Keys.ToList()) - { - var isValidId = id.Length <= PackageIdValidator.MaxPackageIdLength && PackageIdValidator.IsValidPackageId(id); - if (!isValidId) - { - invalidIdCount++; - } - - foreach (var version in data[id].Keys.ToList()) - { - var isValidVersion = NuGetVersion.TryParse(version, out var parsedVersion); - if (!isValidVersion) - { - invalidVersionCount++; - } - - if (!isValidId || !isValidVersion) - { - // Clear the download count if the ID or version is invalid. - data.SetDownloadCount(id, version, 0); - continue; - } - - var normalizedVersion = parsedVersion.ToNormalizedString(); - var isNormalizedVersion = StringComparer.OrdinalIgnoreCase.Equals(version, normalizedVersion); - - if (!isNormalizedVersion) - { - nonNormalizedVersionCount++; - - // Use the normalized version string if the original was not normalized. - var downloads = data.GetDownloadCount(id, version); - data.SetDownloadCount(id, version, 0); - data.SetDownloadCount(id, normalizedVersion, downloads); - } - } - } - - _logger.LogInformation( - "There were {InvalidIdCount} invalid IDs, {InvalidVersionCount} invalid versions, and " + - "{NonNormalizedVersionCount} non-normalized IDs.", - invalidIdCount, - invalidVersionCount, - nonNormalizedVersionCount); - } } } diff --git a/src/NuGet.Services.AzureSearch/Auxiliary2AzureSearch/UpdateDownloadsCommand.cs b/src/NuGet.Services.AzureSearch/Auxiliary2AzureSearch/UpdateDownloadsCommand.cs new file mode 100644 index 000000000..52d69c6fb --- /dev/null +++ b/src/NuGet.Services.AzureSearch/Auxiliary2AzureSearch/UpdateDownloadsCommand.cs @@ -0,0 +1,337 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using NuGet.Packaging; +using NuGet.Services.AzureSearch.AuxiliaryFiles; +using NuGet.Services.AzureSearch.Wrappers; +using NuGet.Services.Metadata.Catalog.Helpers; +using NuGet.Versioning; +using NuGetGallery; + +namespace NuGet.Services.AzureSearch.Auxiliary2AzureSearch +{ + public class UpdateDownloadsCommand : IAzureSearchCommand + { + /// + /// A package ID can result in one document per search filter if the there is a version that applies to each + /// of the filters. The simplest such case is a prerelease, SemVer 1.0.0 package version like 1.0.0-beta. This + /// version applies to all package filters. + /// + private static readonly int MaxDocumentsPerId = Enum.GetValues(typeof(SearchFilters)).Length; + + private readonly IAuxiliaryFileClient _auxiliaryFileClient; + private readonly IDownloadDataClient _downloadDataClient; + private readonly IDownloadSetComparer _downloadSetComparer; + private readonly ISearchDocumentBuilder _searchDocumentBuilder; + private readonly ISearchIndexActionBuilder _indexActionBuilder; + private readonly Func _batchPusherFactory; + private readonly ISystemTime _systemTime; + private readonly IOptionsSnapshot _options; + private readonly IAzureSearchTelemetryService _telemetryService; + private readonly ILogger _logger; + private readonly StringCache _stringCache; + + public UpdateDownloadsCommand( + IAuxiliaryFileClient auxiliaryFileClient, + IDownloadDataClient downloadDataClient, + IDownloadSetComparer downloadSetComparer, + ISearchDocumentBuilder searchDocumentBuilder, + ISearchIndexActionBuilder indexActionBuilder, + Func batchPusherFactory, + ISystemTime systemTime, + IOptionsSnapshot options, + IAzureSearchTelemetryService telemetryService, + ILogger logger) + { + _auxiliaryFileClient = auxiliaryFileClient ?? throw new ArgumentException(nameof(auxiliaryFileClient)); + _downloadDataClient = downloadDataClient ?? throw new ArgumentNullException(nameof(downloadDataClient)); + _downloadSetComparer = downloadSetComparer ?? throw new ArgumentNullException(nameof(downloadSetComparer)); + _searchDocumentBuilder = searchDocumentBuilder ?? throw new ArgumentNullException(nameof(searchDocumentBuilder)); + _indexActionBuilder = indexActionBuilder ?? throw new ArgumentNullException(nameof(indexActionBuilder)); + _batchPusherFactory = batchPusherFactory ?? throw new ArgumentNullException(nameof(batchPusherFactory)); + _systemTime = systemTime ?? throw new ArgumentNullException(nameof(systemTime)); + _options = options ?? throw new ArgumentNullException(nameof(options)); + _telemetryService = telemetryService ?? throw new ArgumentNullException(nameof(telemetryService)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _stringCache = new StringCache(); + + if (_options.Value.MaxConcurrentBatches <= 0) + { + throw new ArgumentOutOfRangeException( + nameof(options), + $"The {nameof(AzureSearchJobConfiguration.MaxConcurrentBatches)} must be greater than zero."); + } + + if (_options.Value.MaxConcurrentVersionListWriters <= 0) + { + throw new ArgumentOutOfRangeException( + nameof(options), + $"The {nameof(AzureSearchJobConfiguration.MaxConcurrentVersionListWriters)} must be greater than zero."); + } + } + + public async Task ExecuteAsync() + { + var stopwatch = Stopwatch.StartNew(); + var outcome = JobOutcome.Failure; + try + { + outcome = await PushIndexChangesAsync() ? JobOutcome.Success : JobOutcome.NoOp; + } + finally + { + stopwatch.Stop(); + _telemetryService.TrackUpdateDownloadsCompleted(outcome, stopwatch.Elapsed); + } + } + + private async Task PushIndexChangesAsync() + { + // The "old" data in this case is the download count data that was last indexed by this job (or + // initialized by Db2AzureSearch). + _logger.LogInformation("Fetching old download count data from blob storage."); + var oldResult = await _downloadDataClient.ReadLatestIndexedAsync( + AccessConditionWrapper.GenerateEmptyCondition(), + _stringCache); + + // The "new" data in this case is from the statistics pipeline. + _logger.LogInformation("Fetching new download count data from blob storage."); + var newData = await _auxiliaryFileClient.LoadDownloadDataAsync(); + + _logger.LogInformation("Removing invalid IDs and versions from the old data."); + CleanDownloadData(oldResult.Data); + + _logger.LogInformation("Removing invalid IDs and versions from the new data."); + CleanDownloadData(newData); + + // Fetch the download overrides from the auxiliary file. Note that the overriden downloads are kept + // separate from downloads data as the original data will be persisted to auxiliary data, whereas the + // overriden data will be persisted to Azure Search. + _logger.LogInformation("Overriding download count data."); + var downloadOverrides = await _auxiliaryFileClient.LoadDownloadOverridesAsync(); + var overridenDownloads = newData.ApplyDownloadOverrides(downloadOverrides, _logger); + + _logger.LogInformation("Detecting download count changes."); + var changes = _downloadSetComparer.Compare(oldResult.Data, overridenDownloads); + var idBag = new ConcurrentBag(changes.Keys); + _logger.LogInformation("{Count} package IDs have download count changes.", idBag.Count); + + if (!changes.Any()) + { + return false; + } + + _logger.LogInformation( + "Starting {Count} workers pushing download count changes to Azure Search.", + _options.Value.MaxConcurrentBatches); + await ParallelAsync.Repeat( + () => WorkAsync(idBag, changes), + _options.Value.MaxConcurrentBatches); + _logger.LogInformation("All of the download count changes have been pushed to Azure Search."); + + _logger.LogInformation("Uploading the new download count data to blob storage."); + await _downloadDataClient.ReplaceLatestIndexedAsync(newData, oldResult.Metadata.GetIfMatchCondition()); + return true; + } + + private async Task WorkAsync(ConcurrentBag idBag, SortedDictionary changes) + { + // Perform two batching mechanisms: + // + // 1. Group package IDs into batches so version lists can be fetched in parallel. + // 2. Group index actions so documents can be pushed to Azure Search in batches. + // + // Also, throttle the pushes to Azure Search based on time so that we don't cause too much load. + var idsToIndex = new ConcurrentBag(); + var indexActionsToPush = new ConcurrentBag>(); + var timeSinceLastPush = new Stopwatch(); + + while (idBag.TryTake(out var id)) + { + // FIRST, check if we have a full batch of package IDs to produce index actions for. + // + // If all of the IDs to index and the current ID were to need a document for each search filter and + // that number plus the current index actions to push would make the batch larger than the maximum + // batch size, produce index actions for the IDs that we have collected so far. + if (GetBatchSize(indexActionsToPush) + ((idsToIndex.Count + 1) * MaxDocumentsPerId) > _options.Value.AzureSearchBatchSize) + { + await GenerateIndexActionsAsync(idsToIndex, indexActionsToPush, changes); + } + + // SECOND, check if we have a full batch of index actions to push to Azure Search. + // + // If the current ID were to need a document for each search filter and the current batch size would + // make the batch larger than the maximum batch size, push the index actions we have so far. + if (GetBatchSize(indexActionsToPush) + MaxDocumentsPerId > _options.Value.AzureSearchBatchSize) + { + _logger.LogInformation( + "Starting to push a batch. There are {IdCount} unprocessed IDs left to index and push.", + idBag.Count); + await PushIndexActionsAsync(indexActionsToPush, timeSinceLastPush); + } + + // THIRD, now that the two batching "buffers" have been flushed if necessary, add the current ID to the + // batch of IDs to produce index actions for. + idsToIndex.Add(id); + } + + // Process any leftover IDs that didn't make it into a full batch. + if (idsToIndex.Any()) + { + await GenerateIndexActionsAsync(idsToIndex, indexActionsToPush, changes); + } + + // Push any leftover index actions that didn't make it into a full batch. + if (indexActionsToPush.Any()) + { + await PushIndexActionsAsync(indexActionsToPush, timeSinceLastPush); + } + + Guard.Assert(idsToIndex.IsEmpty, "There should be no more IDs to process."); + Guard.Assert(indexActionsToPush.IsEmpty, "There should be no more index actions to push."); + } + + /// + /// Generate index actions for each provided ID. This reads the version list per package ID so we want to + /// parallel this work by . + /// + private async Task GenerateIndexActionsAsync( + ConcurrentBag idsToIndex, + ConcurrentBag> indexActionsToPush, + SortedDictionary changes) + { + await ParallelAsync.Repeat( + async () => + { + while (idsToIndex.TryTake(out var id)) + { + var indexActions = await _indexActionBuilder.UpdateAsync( + id, + sf => _searchDocumentBuilder.UpdateDownloadCount(id, sf, changes[id])); + + if (indexActions.IsEmpty) + { + continue; + } + + Guard.Assert(indexActions.Hijack.Count == 0, "There should be no hijack index changes."); + + indexActionsToPush.Add(new IdAndValue(id, indexActions)); + } + }, + _options.Value.MaxConcurrentVersionListWriters); + } + + private async Task PushIndexActionsAsync( + ConcurrentBag> indexActionsToPush, + Stopwatch timeSinceLastPush) + { + var elapsed = timeSinceLastPush.Elapsed; + if (timeSinceLastPush.IsRunning && elapsed < _options.Value.MinPushPeriod) + { + var timeUntilNextPush = _options.Value.MinPushPeriod - elapsed; + _logger.LogInformation( + "Waiting for {Duration} before continuing.", + timeUntilNextPush); + await _systemTime.Delay(timeUntilNextPush); + } + + /// Create a new batch pusher just for this operation. Note that we don't use the internal queue of the + /// batch pusher for more than a single batch because we want to control exactly when batches are pushed to + /// Azure Search so that we can observe the + /// configuration property. + var batchPusher = _batchPusherFactory(); + + _logger.LogInformation( + "Pushing a batch of {IdCount} IDs and {DocumentCount} documents.", + indexActionsToPush.Count, + GetBatchSize(indexActionsToPush)); + + while (indexActionsToPush.TryTake(out var indexActions)) + { + batchPusher.EnqueueIndexActions(indexActions.Id, indexActions.Value); + } + + // Note that this method can throw a storage exception if one of the version lists has been modified during + // the execution of this job loop. + await batchPusher.FinishAsync(); + + // Restart the timer AFTER the push is completed to err on the side of caution. + timeSinceLastPush.Restart(); + + return 0; + } + + /// + /// This returns the number of documents that will be pushed to an Azure Search index in a + /// single batch. The caller is responsible that this number does not exceed + /// . If this job were to start pushing changes + /// to more than one index (more than the search index), then the number returned here should be the max of + /// the document counts per index. + /// + private int GetBatchSize(ConcurrentBag> indexActionsToPush) + { + return indexActionsToPush.Sum(x => x.Value.Search.Count); + } + + private void CleanDownloadData(DownloadData data) + { + var invalidIdCount = 0; + var invalidVersionCount = 0; + var nonNormalizedVersionCount = 0; + + foreach (var id in data.Keys.ToList()) + { + var isValidId = id.Length <= PackageIdValidator.MaxPackageIdLength && PackageIdValidator.IsValidPackageId(id); + if (!isValidId) + { + invalidIdCount++; + } + + foreach (var version in data[id].Keys.ToList()) + { + var isValidVersion = NuGetVersion.TryParse(version, out var parsedVersion); + if (!isValidVersion) + { + invalidVersionCount++; + } + + if (!isValidId || !isValidVersion) + { + // Clear the download count if the ID or version is invalid. + data.SetDownloadCount(id, version, 0); + continue; + } + + var normalizedVersion = parsedVersion.ToNormalizedString(); + var isNormalizedVersion = StringComparer.OrdinalIgnoreCase.Equals(version, normalizedVersion); + + if (!isNormalizedVersion) + { + nonNormalizedVersionCount++; + + // Use the normalized version string if the original was not normalized. + var downloads = data.GetDownloadCount(id, version); + data.SetDownloadCount(id, version, 0); + data.SetDownloadCount(id, normalizedVersion, downloads); + } + } + } + + _logger.LogInformation( + "There were {InvalidIdCount} invalid IDs, {InvalidVersionCount} invalid versions, and " + + "{NonNormalizedVersionCount} non-normalized IDs.", + invalidIdCount, + invalidVersionCount, + nonNormalizedVersionCount); + } + } +} diff --git a/src/NuGet.Services.AzureSearch/Auxiliary2AzureSearch/UpdateVerifiedPackagesCommand.cs b/src/NuGet.Services.AzureSearch/Auxiliary2AzureSearch/UpdateVerifiedPackagesCommand.cs new file mode 100644 index 000000000..8d0402b26 --- /dev/null +++ b/src/NuGet.Services.AzureSearch/Auxiliary2AzureSearch/UpdateVerifiedPackagesCommand.cs @@ -0,0 +1,76 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using NuGet.Services.AzureSearch.AuxiliaryFiles; +using NuGetGallery; + +namespace NuGet.Services.AzureSearch.Auxiliary2AzureSearch +{ + public class UpdateVerifiedPackagesCommand : IAzureSearchCommand + { + private readonly IDatabaseAuxiliaryDataFetcher _databaseFetcher; + private readonly IVerifiedPackagesDataClient _verifiedPackagesDataClient; + private readonly IAzureSearchTelemetryService _telemetryService; + private readonly ILogger _logger; + private readonly StringCache _stringCache; + + public UpdateVerifiedPackagesCommand( + IDatabaseAuxiliaryDataFetcher databaseFetcher, + IVerifiedPackagesDataClient verifiedPackagesDataClient, + IAzureSearchTelemetryService telemetryService, + ILogger logger) + { + _databaseFetcher = databaseFetcher ?? throw new ArgumentNullException(nameof(databaseFetcher)); + _verifiedPackagesDataClient = verifiedPackagesDataClient ?? throw new ArgumentNullException(nameof(verifiedPackagesDataClient)); + _telemetryService = telemetryService ?? throw new ArgumentNullException(nameof(telemetryService)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _stringCache = new StringCache(); + } + + public async Task ExecuteAsync() + { + var stopwatch = Stopwatch.StartNew(); + var outcome = JobOutcome.Failure; + try + { + outcome = await UpdateVerifiedPackagesAsync() ? JobOutcome.Success : JobOutcome.NoOp; + } + finally + { + stopwatch.Stop(); + _telemetryService.TrackUpdateVerifiedPackagesCompleted(outcome, stopwatch.Elapsed); + } + } + + private async Task UpdateVerifiedPackagesAsync() + { + // The "old" data in this case is the latest file that was copied to the region's storage container by this + // job (or initialized by Db2AzureSearch). + var oldResult = await _verifiedPackagesDataClient.ReadLatestAsync( + AccessConditionWrapper.GenerateEmptyCondition(), + _stringCache); + + // The "new" data in this case is from the database. + var newData = await _databaseFetcher.GetVerifiedPackagesAsync(); + + var changes = new HashSet(oldResult.Data, oldResult.Data.Comparer); + changes.SymmetricExceptWith(newData); + _logger.LogInformation("{Count} package IDs have verified status changes.", changes.Count); + + if (changes.Count == 0) + { + return false; + } + else + { + await _verifiedPackagesDataClient.ReplaceLatestAsync(newData, oldResult.Metadata.GetIfMatchCondition()); + return true; + } + } + } +} diff --git a/src/NuGet.Services.AzureSearch/AzureSearchTelemetryService.cs b/src/NuGet.Services.AzureSearch/AzureSearchTelemetryService.cs index 623b3632a..60685d87c 100644 --- a/src/NuGet.Services.AzureSearch/AzureSearchTelemetryService.cs +++ b/src/NuGet.Services.AzureSearch/AzureSearchTelemetryService.cs @@ -404,5 +404,27 @@ public void TrackReadLatestVerifiedPackagesFromDatabase(int packageIdCount, Time { "PackageIdCount", packageIdCount.ToString() }, }); } + + public void TrackUpdateVerifiedPackagesCompleted(JobOutcome outcome, TimeSpan elapsed) + { + _telemetryClient.TrackMetric( + Prefix + "UpdateVerifiedPackagesCompletedSeconds", + elapsed.TotalSeconds, + new Dictionary + { + { "Outcome", outcome.ToString() }, + }); + } + + public void TrackUpdateDownloadsCompleted(JobOutcome outcome, TimeSpan elapsed) + { + _telemetryClient.TrackMetric( + Prefix + "UpdateDownloadsCompletedSeconds", + elapsed.TotalSeconds, + new Dictionary + { + { "Outcome", outcome.ToString() }, + }); + } } } diff --git a/src/NuGet.Services.AzureSearch/DependencyInjectionExtensions.cs b/src/NuGet.Services.AzureSearch/DependencyInjectionExtensions.cs index fdc277bc9..caeeabeb4 100644 --- a/src/NuGet.Services.AzureSearch/DependencyInjectionExtensions.cs +++ b/src/NuGet.Services.AzureSearch/DependencyInjectionExtensions.cs @@ -232,7 +232,13 @@ public static IServiceCollection AddAzureSearch( services.AddSingleton(); - services.AddTransient(); + services.AddTransient(); + services.AddTransient(); + services.AddTransient(p => new Auxiliary2AzureSearchCommand( + p.GetRequiredService(), + p.GetRequiredService(), + p.GetRequiredService(), + p.GetRequiredService>())); services.AddTransient(); services.AddTransient(); diff --git a/src/NuGet.Services.AzureSearch/IAzureSearchTelemetryService.cs b/src/NuGet.Services.AzureSearch/IAzureSearchTelemetryService.cs index 0b0707516..7b13c985f 100644 --- a/src/NuGet.Services.AzureSearch/IAzureSearchTelemetryService.cs +++ b/src/NuGet.Services.AzureSearch/IAzureSearchTelemetryService.cs @@ -49,8 +49,10 @@ void TrackDownloadCountDecrease( void TrackV3GetDocument(TimeSpan elapsed); void TrackV2GetDocumentWithSearchIndex(TimeSpan elapsed); void TrackV2GetDocumentWithHijackIndex(TimeSpan elapsed); + void TrackUpdateVerifiedPackagesCompleted(JobOutcome outcome, TimeSpan elapsed); void TrackReadLatestVerifiedPackages(int? packageIdCount, bool notModified, TimeSpan elapsed); IDisposable TrackReplaceLatestVerifiedPackages(int packageIdCount); void TrackAuxiliaryFilesStringCache(int stringCount, long charCount, int requestCount, int hitCount); + void TrackUpdateDownloadsCompleted(JobOutcome outcome, TimeSpan elapsed); } } \ No newline at end of file diff --git a/src/NuGet.Services.AzureSearch/NuGet.Services.AzureSearch.csproj b/src/NuGet.Services.AzureSearch/NuGet.Services.AzureSearch.csproj index 35763d103..5502e8ef0 100644 --- a/src/NuGet.Services.AzureSearch/NuGet.Services.AzureSearch.csproj +++ b/src/NuGet.Services.AzureSearch/NuGet.Services.AzureSearch.csproj @@ -49,6 +49,8 @@ + + diff --git a/tests/NuGet.Services.AzureSearch.Tests/Auxiliary2AzureSearch/Auxiliary2AzureSearchCommandFacts.cs b/tests/NuGet.Services.AzureSearch.Tests/Auxiliary2AzureSearch/UpdateDownloadsCommandFacts.cs similarity index 87% rename from tests/NuGet.Services.AzureSearch.Tests/Auxiliary2AzureSearch/Auxiliary2AzureSearchCommandFacts.cs rename to tests/NuGet.Services.AzureSearch.Tests/Auxiliary2AzureSearch/UpdateDownloadsCommandFacts.cs index 03f34c002..6c0f988da 100644 --- a/tests/NuGet.Services.AzureSearch.Tests/Auxiliary2AzureSearch/Auxiliary2AzureSearchCommandFacts.cs +++ b/tests/NuGet.Services.AzureSearch.Tests/Auxiliary2AzureSearch/UpdateDownloadsCommandFacts.cs @@ -4,10 +4,8 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; -using System.Data.Entity; using System.Linq; using System.Threading.Tasks; -using Castle.Core.Logging; using Microsoft.Azure.Search.Models; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -21,7 +19,7 @@ namespace NuGet.Services.AzureSearch.Auxiliary2AzureSearch { - public class Auxiliary2AzureSearchCommandFacts + public class UpdateDownloadsCommandFacts { public class ExecuteAsync : Facts { @@ -29,47 +27,6 @@ public ExecuteAsync(ITestOutputHelper output) : base(output) { } - [Fact] - public async Task PushesAddedVerifiedPackage() - { - NewVerifiedPackagesData.Add("NuGet.Versioning"); - - await Target.ExecuteAsync(); - - VerifyCompletedTelemetry(JobOutcome.Success); - VerifiedPackagesDataClient.Verify( - x => x.ReplaceLatestAsync( - NewVerifiedPackagesData, - It.Is(a => a.IfMatchETag == OldVerifiedPackagesResult.Metadata.ETag)), - Times.Once); - } - - [Fact] - public async Task PushesRemovedVerifiedPackage() - { - OldVerifiedPackagesData.Add("NuGet.Versioning"); - - await Target.ExecuteAsync(); - - VerifyCompletedTelemetry(JobOutcome.Success); - VerifiedPackagesDataClient.Verify( - x => x.ReplaceLatestAsync( - NewVerifiedPackagesData, - It.Is(a => a.IfMatchETag == OldVerifiedPackagesResult.Metadata.ETag)), - Times.Once); - } - - [Fact] - public async Task DoesNotPushUnchangedVerifiedPackages() - { - await Target.ExecuteAsync(); - - VerifyCompletedTelemetry(JobOutcome.NoOp); - VerifiedPackagesDataClient.Verify( - x => x.ReplaceLatestAsync(It.IsAny>(), It.IsAny()), - Times.Never); - } - [Fact] public async Task PushesNothingWhenThereAreNoChanges() { @@ -372,9 +329,7 @@ public abstract class Facts public Facts(ITestOutputHelper output) { AuxiliaryFileClient = new Mock(); - DatabaseAuxiliaryDataFetcher = new Mock(); DownloadDataClient = new Mock(); - VerifiedPackagesDataClient = new Mock(); DownloadSetComparer = new Mock(); SearchDocumentBuilder = new Mock(); IndexActionBuilder = new Mock(); @@ -405,14 +360,6 @@ public Facts(ITestOutputHelper output) .Setup(x => x.LoadDownloadOverridesAsync()) .ReturnsAsync(() => DownloadOverrides); - OldVerifiedPackagesData = new HashSet(); - OldVerifiedPackagesResult = Data.GetAuxiliaryFileResult(OldVerifiedPackagesData, "verified-packages-etag"); - VerifiedPackagesDataClient - .Setup(x => x.ReadLatestAsync(It.IsAny(), It.IsAny())) - .ReturnsAsync(() => OldVerifiedPackagesResult); - NewVerifiedPackagesData = new HashSet(); - DatabaseAuxiliaryDataFetcher.Setup(x => x.GetVerifiedPackagesAsync()).ReturnsAsync(() => NewVerifiedPackagesData); - Changes = new SortedDictionary(); DownloadSetComparer .Setup(x => x.Compare(It.IsAny(), It.IsAny())) @@ -454,11 +401,9 @@ public Facts(ITestOutputHelper output) CurrentBatch = new ConcurrentBag(); }); - Target = new Auxiliary2AzureSearchCommand( + Target = new UpdateDownloadsCommand( AuxiliaryFileClient.Object, - DatabaseAuxiliaryDataFetcher.Object, DownloadDataClient.Object, - VerifiedPackagesDataClient.Object, DownloadSetComparer.Object, SearchDocumentBuilder.Object, IndexActionBuilder.Object, @@ -470,9 +415,7 @@ public Facts(ITestOutputHelper output) } public Mock AuxiliaryFileClient { get; } - public Mock DatabaseAuxiliaryDataFetcher { get; } public Mock DownloadDataClient { get; } - public Mock VerifiedPackagesDataClient { get; } public Mock DownloadSetComparer { get; } public Mock SearchDocumentBuilder { get; } public Mock IndexActionBuilder { get; } @@ -487,23 +430,20 @@ public Facts(ITestOutputHelper output) public DownloadData NewDownloadData { get; } public Dictionary DownloadOverrides { get; } public SortedDictionary Changes { get; } - public Auxiliary2AzureSearchCommand Target { get; } + public UpdateDownloadsCommand Target { get; } public IndexActions IndexActions { get; set; } public ConcurrentBag ProcessedIds { get; } public ConcurrentBag PushedIds { get; } public ConcurrentBag CurrentBatch { get; set; } public ConcurrentBag> FinishedBatches { get; } - public HashSet OldVerifiedPackagesData { get; } - public AuxiliaryFileResult> OldVerifiedPackagesResult { get; } - public HashSet NewVerifiedPackagesData { get; } public void VerifyCompletedTelemetry(JobOutcome outcome) { TelemetryService.Verify( - x => x.TrackAuxiliary2AzureSearchCompleted(It.IsAny(), It.IsAny()), + x => x.TrackUpdateDownloadsCompleted(It.IsAny(), It.IsAny()), Times.Once); TelemetryService.Verify( - x => x.TrackAuxiliary2AzureSearchCompleted(outcome, It.IsAny()), + x => x.TrackUpdateDownloadsCompleted(outcome, It.IsAny()), Times.Once); } diff --git a/tests/NuGet.Services.AzureSearch.Tests/Auxiliary2AzureSearch/UpdateVerifiedPackagesCommandFacts.cs b/tests/NuGet.Services.AzureSearch.Tests/Auxiliary2AzureSearch/UpdateVerifiedPackagesCommandFacts.cs new file mode 100644 index 000000000..967a542eb --- /dev/null +++ b/tests/NuGet.Services.AzureSearch.Tests/Auxiliary2AzureSearch/UpdateVerifiedPackagesCommandFacts.cs @@ -0,0 +1,111 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Microsoft.Extensions.Options; +using Moq; +using NuGet.Services.AzureSearch.AuxiliaryFiles; +using NuGet.Services.AzureSearch.Support; +using NuGetGallery; +using Xunit; +using Xunit.Abstractions; + +namespace NuGet.Services.AzureSearch.Auxiliary2AzureSearch +{ + public class UpdateVerifiedPackagesCommandFacts + { + public class ExecuteAsync : Facts + { + public ExecuteAsync(ITestOutputHelper output) : base(output) + { + } + + [Fact] + public async Task PushesAddedVerifiedPackage() + { + NewVerifiedPackagesData.Add("NuGet.Versioning"); + + await Target.ExecuteAsync(); + + VerifyCompletedTelemetry(JobOutcome.Success); + VerifiedPackagesDataClient.Verify( + x => x.ReplaceLatestAsync( + NewVerifiedPackagesData, + It.Is(a => a.IfMatchETag == OldVerifiedPackagesResult.Metadata.ETag)), + Times.Once); + } + + [Fact] + public async Task PushesRemovedVerifiedPackage() + { + OldVerifiedPackagesData.Add("NuGet.Versioning"); + + await Target.ExecuteAsync(); + + VerifyCompletedTelemetry(JobOutcome.Success); + VerifiedPackagesDataClient.Verify( + x => x.ReplaceLatestAsync( + NewVerifiedPackagesData, + It.Is(a => a.IfMatchETag == OldVerifiedPackagesResult.Metadata.ETag)), + Times.Once); + } + + [Fact] + public async Task DoesNotPushUnchangedVerifiedPackages() + { + await Target.ExecuteAsync(); + + VerifyCompletedTelemetry(JobOutcome.NoOp); + VerifiedPackagesDataClient.Verify( + x => x.ReplaceLatestAsync(It.IsAny>(), It.IsAny()), + Times.Never); + } + } + + public abstract class Facts + { + public Facts(ITestOutputHelper output) + { + DatabaseAuxiliaryDataFetcher = new Mock(); + VerifiedPackagesDataClient = new Mock(); + TelemetryService = new Mock(); + Logger = output.GetLogger(); + + OldVerifiedPackagesData = new HashSet(); + OldVerifiedPackagesResult = Data.GetAuxiliaryFileResult(OldVerifiedPackagesData, "verified-packages-etag"); + VerifiedPackagesDataClient + .Setup(x => x.ReadLatestAsync(It.IsAny(), It.IsAny())) + .ReturnsAsync(() => OldVerifiedPackagesResult); + NewVerifiedPackagesData = new HashSet(); + DatabaseAuxiliaryDataFetcher.Setup(x => x.GetVerifiedPackagesAsync()).ReturnsAsync(() => NewVerifiedPackagesData); + + Target = new UpdateVerifiedPackagesCommand( + DatabaseAuxiliaryDataFetcher.Object, + VerifiedPackagesDataClient.Object, + TelemetryService.Object, + Logger); + } + + public Mock DatabaseAuxiliaryDataFetcher { get; } + public Mock VerifiedPackagesDataClient { get; } + public Mock TelemetryService { get; } + public RecordingLogger Logger { get; } + public UpdateVerifiedPackagesCommand Target { get; } + public HashSet OldVerifiedPackagesData { get; } + public AuxiliaryFileResult> OldVerifiedPackagesResult { get; } + public HashSet NewVerifiedPackagesData { get; } + + public void VerifyCompletedTelemetry(JobOutcome outcome) + { + TelemetryService.Verify( + x => x.TrackUpdateVerifiedPackagesCompleted(It.IsAny(), It.IsAny()), + Times.Once); + TelemetryService.Verify( + x => x.TrackUpdateVerifiedPackagesCompleted(outcome, It.IsAny()), + Times.Once); + } + } + } +} diff --git a/tests/NuGet.Services.AzureSearch.Tests/NuGet.Services.AzureSearch.Tests.csproj b/tests/NuGet.Services.AzureSearch.Tests/NuGet.Services.AzureSearch.Tests.csproj index b9541c48f..1418648eb 100644 --- a/tests/NuGet.Services.AzureSearch.Tests/NuGet.Services.AzureSearch.Tests.csproj +++ b/tests/NuGet.Services.AzureSearch.Tests/NuGet.Services.AzureSearch.Tests.csproj @@ -37,7 +37,8 @@ - + +