Skip to content
This repository has been archived by the owner on Mar 16, 2021. It is now read-only.

Commit

Permalink
Refactor download overrides
Browse files Browse the repository at this point in the history
  • Loading branch information
loic-sharma committed Apr 14, 2020
1 parent 18ac316 commit 6bbc513
Show file tree
Hide file tree
Showing 14 changed files with 378 additions and 307 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class UpdateDownloadsCommand : IAzureSearchCommand
private readonly IAuxiliaryFileClient _auxiliaryFileClient;
private readonly IDownloadDataClient _downloadDataClient;
private readonly IDownloadSetComparer _downloadSetComparer;
private readonly IDownloadTransferrer _downloadTransferrer;
private readonly IPopularityTransferDataClient _popularityTransferDataClient;
private readonly ISearchDocumentBuilder _searchDocumentBuilder;
private readonly ISearchIndexActionBuilder _indexActionBuilder;
private readonly Func<IBatchPusher> _batchPusherFactory;
Expand All @@ -43,6 +45,8 @@ public UpdateDownloadsCommand(
IAuxiliaryFileClient auxiliaryFileClient,
IDownloadDataClient downloadDataClient,
IDownloadSetComparer downloadSetComparer,
IDownloadTransferrer downloadTransferrer,
IPopularityTransferDataClient popularityTransferDataClient,
ISearchDocumentBuilder searchDocumentBuilder,
ISearchIndexActionBuilder indexActionBuilder,
Func<IBatchPusher> batchPusherFactory,
Expand All @@ -54,6 +58,8 @@ public UpdateDownloadsCommand(
_auxiliaryFileClient = auxiliaryFileClient ?? throw new ArgumentException(nameof(auxiliaryFileClient));
_downloadDataClient = downloadDataClient ?? throw new ArgumentNullException(nameof(downloadDataClient));
_downloadSetComparer = downloadSetComparer ?? throw new ArgumentNullException(nameof(downloadSetComparer));
_downloadTransferrer = downloadTransferrer ?? throw new ArgumentNullException(nameof(downloadTransferrer));
_popularityTransferDataClient = popularityTransferDataClient ?? throw new ArgumentNullException(nameof(popularityTransferDataClient));
_searchDocumentBuilder = searchDocumentBuilder ?? throw new ArgumentNullException(nameof(searchDocumentBuilder));
_indexActionBuilder = indexActionBuilder ?? throw new ArgumentNullException(nameof(indexActionBuilder));
_batchPusherFactory = batchPusherFactory ?? throw new ArgumentNullException(nameof(batchPusherFactory));
Expand Down Expand Up @@ -106,23 +112,26 @@ private async Task<bool> PushIndexChangesAsync()
_logger.LogInformation("Fetching new download count data from blob storage.");
var newData = await _auxiliaryFileClient.LoadDownloadDataAsync();

_logger.LogInformation("Removing invalid IDs and versions from the old data.");
_logger.LogInformation("Removing invalid IDs and versions from the old downloads data.");
CleanDownloadData(oldResult.Data);

_logger.LogInformation("Removing invalid IDs and versions from the new data.");
_logger.LogInformation("Removing invalid IDs and versions from the new downloads data.");
CleanDownloadData(newData);

// Fetch the download overrides from the auxiliary file. Note that the overriden downloads are kept
// separate from downloads data as the original data will be persisted to auxiliary data, whereas the
// overriden data will be persisted to Azure Search.
_logger.LogInformation("Overriding download count data.");
var downloadOverrides = await _auxiliaryFileClient.LoadDownloadOverridesAsync();
var overridenDownloads = newData.ApplyDownloadOverrides(downloadOverrides, _logger);

_logger.LogInformation("Detecting download count changes.");
var changes = _downloadSetComparer.Compare(oldResult.Data, overridenDownloads);
var changes = _downloadSetComparer.Compare(oldResult.Data, newData);
_logger.LogInformation("{Count} package IDs have download count changes.", changes.Count);

// The "old" data in this case is the popularity transfers data that was last indexed by this job (or
// initialized by Db2AzureSearch).
_logger.LogInformation("Fetching old popularity transfer data from blob storage.");
var oldTransfers = await _popularityTransferDataClient.ReadLatestIndexedAsync();

_logger.LogInformation("Applying popularity transfers to download changes.");
var transferResult = await ApplyPopularityTransferChangesAsync(newData, changes, oldTransfers.Result);

var idBag = new ConcurrentBag<string>(changes.Keys);
_logger.LogInformation("{Count} package IDs have download count changes.", idBag.Count);
_logger.LogInformation("{Count} package IDs need to be updated.", idBag.Count);

if (!changes.Any())
{
Expand All @@ -139,9 +148,34 @@ await ParallelAsync.Repeat(

_logger.LogInformation("Uploading the new download count data to blob storage.");
await _downloadDataClient.ReplaceLatestIndexedAsync(newData, oldResult.Metadata.GetIfMatchCondition());

_logger.LogInformation("Uploading the new popularity transfer data to blob storage.");
await _popularityTransferDataClient.ReplaceLatestIndexedAsync(
transferResult.LatestPopularityTransfers,
oldTransfers.AccessCondition);
return true;
}

private async Task<DownloadTransferResult> ApplyPopularityTransferChangesAsync(
DownloadData newData,
SortedDictionary<string, long> downloadChanges,
SortedDictionary<string, SortedSet<string>> oldTransfers)
{
_logger.LogInformation("Finding download changes from popularity transfers and download overrides.");
var transferResult = await _downloadTransferrer.GetTransferChangesAsync(newData, downloadChanges, oldTransfers);
_logger.LogInformation(
"{Count} package IDs have download count changes from popularity transfers and download overrides.",
transferResult.DownloadChanges.Count);

// Apply the transfer changes to the overall download changes.
foreach (var transferChange in transferResult.DownloadChanges)
{
downloadChanges[transferChange.Key] = transferChange.Value;
}

return transferResult;
}

private async Task WorkAsync(ConcurrentBag<string> idBag, SortedDictionary<string, long> changes)
{
// Perform two batching mechanisms:
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class Db2AzureSearchCommand : IAzureSearchCommand
private readonly IOwnerDataClient _ownerDataClient;
private readonly IDownloadDataClient _downloadDataClient;
private readonly IVerifiedPackagesDataClient _verifiedPackagesDataClient;
private readonly IPopularityTransferDataClient _popularityTransferDataClient;
private readonly IOptionsSnapshot<Db2AzureSearchConfiguration> _options;
private readonly IOptionsSnapshot<Db2AzureSearchDevelopmentConfiguration> _developmentOptions;
private readonly ILogger<Db2AzureSearchCommand> _logger;
Expand All @@ -45,6 +46,7 @@ public Db2AzureSearchCommand(
IOwnerDataClient ownerDataClient,
IDownloadDataClient downloadDataClient,
IVerifiedPackagesDataClient verifiedPackagesDataClient,
IPopularityTransferDataClient popularityTransferDataClient,
IOptionsSnapshot<Db2AzureSearchConfiguration> options,
IOptionsSnapshot<Db2AzureSearchDevelopmentConfiguration> developmentOptions,
ILogger<Db2AzureSearchCommand> logger)
Expand All @@ -59,6 +61,7 @@ public Db2AzureSearchCommand(
_ownerDataClient = ownerDataClient ?? throw new ArgumentNullException(nameof(ownerDataClient));
_downloadDataClient = downloadDataClient ?? throw new ArgumentNullException(nameof(downloadDataClient));
_verifiedPackagesDataClient = verifiedPackagesDataClient ?? throw new ArgumentNullException(nameof(verifiedPackagesDataClient));
_popularityTransferDataClient = popularityTransferDataClient ?? throw new ArgumentNullException(nameof(popularityTransferDataClient));
_options = options ?? throw new ArgumentNullException(nameof(options));
_developmentOptions = developmentOptions ?? throw new ArgumentNullException(nameof(developmentOptions));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
Expand Down Expand Up @@ -114,6 +117,9 @@ private async Task ExecuteAsync(CancellationToken token)
// Write the verified packages data file.
await WriteVerifiedPackagesDataAsync(initialAuxiliaryData.VerifiedPackages);

// Write popularity transfers data file.
await WritePopularityTransfersDataAsync(initialAuxiliaryData.PopularityTransfers);

// Write the cursor.
_logger.LogInformation("Writing the initial cursor value to be {CursorValue:O}.", initialCursorValue);
var frontCursorStorage = _storageFactory.Create();
Expand Down Expand Up @@ -195,6 +201,15 @@ await _verifiedPackagesDataClient.ReplaceLatestAsync(
_logger.LogInformation("Done uploading the initial verified packages data file.");
}

private async Task WritePopularityTransfersDataAsync(SortedDictionary<string, SortedSet<string>> popularityTransfers)
{
_logger.LogInformation("Writing the initial popularity transfers data file.");
await _popularityTransferDataClient.ReplaceLatestIndexedAsync(
popularityTransfers,
AccessConditionWrapper.GenerateIfNotExistsCondition());
_logger.LogInformation("Done uploading the initial popularity transfers data file.");
}

private async Task<InitialAuxiliaryData> ProduceWorkAsync(
ConcurrentBag<NewPackageRegistration> allWork,
CancellationTokenSource produceWorkCts,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,20 @@ public InitialAuxiliaryData(
SortedDictionary<string, SortedSet<string>> owners,
DownloadData downloads,
HashSet<string> excludedPackages,
HashSet<string> verifiedPackages)
HashSet<string> verifiedPackages,
SortedDictionary<string, SortedSet<string>> popularityTransfers)
{
Owners = owners ?? throw new ArgumentNullException(nameof(owners));
Downloads = downloads ?? throw new ArgumentNullException(nameof(downloads));
ExcludedPackages = excludedPackages ?? throw new ArgumentNullException(nameof(excludedPackages));
VerifiedPackages = verifiedPackages ?? throw new ArgumentNullException(nameof(verifiedPackages));
PopularityTransfers = popularityTransfers ?? throw new ArgumentNullException(nameof(popularityTransfers));
}

public SortedDictionary<string, SortedSet<string>> Owners { get; }
public DownloadData Downloads { get; }
public HashSet<string> ExcludedPackages { get; }
public HashSet<string> VerifiedPackages { get; }
public SortedDictionary<string, SortedSet<string>> PopularityTransfers { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,24 @@ public class NewPackageRegistrationProducer : INewPackageRegistrationProducer
{
private readonly IEntitiesContextFactory _contextFactory;
private readonly IAuxiliaryFileClient _auxiliaryFileClient;
private readonly IDownloadTransferrer _downloadTransferrer;
private readonly IOptionsSnapshot<Db2AzureSearchConfiguration> _options;
private readonly IOptionsSnapshot<Db2AzureSearchDevelopmentConfiguration> _developmentOptions;
private readonly ILogger<NewPackageRegistrationProducer> _logger;

public NewPackageRegistrationProducer(
IEntitiesContextFactory contextFactory,
IAuxiliaryFileClient auxiliaryFileClient,
IDownloadTransferrer downloadTransferrer,
IOptionsSnapshot<Db2AzureSearchConfiguration> options,
IOptionsSnapshot<Db2AzureSearchDevelopmentConfiguration> developmentOptions,
ILogger<NewPackageRegistrationProducer> logger)
{
_contextFactory = contextFactory ?? throw new ArgumentNullException(nameof(contextFactory));
_auxiliaryFileClient = auxiliaryFileClient ?? throw new ArgumentNullException(nameof(auxiliaryFileClient));
_downloadTransferrer = downloadTransferrer ?? throw new ArgumentNullException(nameof(downloadTransferrer));
_options = options ?? throw new ArgumentNullException(nameof(options));
_developmentOptions = developmentOptions ?? throw new ArgumentNullException(nameof(developmentOptions));
_auxiliaryFileClient = auxiliaryFileClient ?? throw new ArgumentNullException(nameof(auxiliaryFileClient));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

Expand All @@ -54,15 +57,13 @@ public async Task<InitialAuxiliaryData> ProduceWorkAsync(
$"Excluded packages HashSet should be using {nameof(StringComparer.OrdinalIgnoreCase)}");

// Fetch the download data from the auxiliary file, since this is what is used for displaying download
// counts in the search service. The gallery DB and the downloads data file have different download count
// numbers we don't use the gallery DB values.
// counts in the search service. We don't use the gallery DB values as they are different from the
// auxiliary file.
var downloads = await _auxiliaryFileClient.LoadDownloadDataAsync();

// Fetch the download overrides from the auxiliary file. Note that the overriden downloads are kept
// separate from downloads data as the original data will be persisted to auxiliary data, whereas the
// overriden data will be persisted to Azure Search.
var downloadOverrides = await _auxiliaryFileClient.LoadDownloadOverridesAsync();
var overridenDownloads = downloads.ApplyDownloadOverrides(downloadOverrides, _logger);
// Apply changes from popularity transfers.
var transferResult = await _downloadTransferrer.GetTransferChangesAsync(downloads);
var packageToDownloads = GetPackageToDownloads(downloads, transferResult.DownloadChanges);

// Build a list of the owners data and verified IDs as we collect package registrations from the database.
var ownersBuilder = new PackageIdToOwnersBuilder(_logger);
Expand Down Expand Up @@ -91,6 +92,11 @@ public async Task<InitialAuxiliaryData> ProduceWorkAsync(

foreach (var pr in packageRegistrationInfo)
{
if (!packageToDownloads.TryGetValue(pr.Id, out var packageDownloads))
{
packageDownloads = 0;
}

if (!keyToPackages.TryGetValue(pr.Key, out var packages))
{
packages = new List<Package>();
Expand All @@ -100,7 +106,7 @@ public async Task<InitialAuxiliaryData> ProduceWorkAsync(

allWork.Add(new NewPackageRegistration(
pr.Id,
overridenDownloads.GetDownloadCount(pr.Id),
packageDownloads,
pr.Owners,
packages,
isExcludedByDefault));
Expand All @@ -120,7 +126,8 @@ public async Task<InitialAuxiliaryData> ProduceWorkAsync(
ownersBuilder.GetResult(),
downloads,
excludedPackages,
verifiedPackages);
verifiedPackages,
transferResult.LatestPopularityTransfers);
}

private bool ShouldWait(ConcurrentBag<NewPackageRegistration> allWork, bool log)
Expand All @@ -145,6 +152,25 @@ private bool ShouldWait(ConcurrentBag<NewPackageRegistration> allWork, bool log)
return false;
}

private Dictionary<string, long> GetPackageToDownloads(
DownloadData downloads,
Dictionary<string, long> downloadChanges)
{
var result = new Dictionary<string, long>(StringComparer.OrdinalIgnoreCase);

foreach (var packageDownload in downloads)
{
result[packageDownload.Key] = packageDownload.Value.Total;
}

foreach (var downloadChange in downloadChanges)
{
result[downloadChange.Key] = downloadChange.Value;
}

return result;
}

private async Task<IReadOnlyList<Package>> GetPackagesAsync(PackageRegistrationRange range)
{
using (var context = await CreateContextAsync())
Expand Down
Loading

0 comments on commit 6bbc513

Please sign in to comment.