Skip to content
This repository has been archived by the owner on Mar 16, 2021. It is now read-only.

Commit

Permalink
Refactor download overrides
Browse files Browse the repository at this point in the history
  • Loading branch information
loic-sharma committed Apr 14, 2020
1 parent cadc42b commit 010687c
Show file tree
Hide file tree
Showing 32 changed files with 1,807 additions and 632 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using Microsoft.Extensions.Logging;

namespace NuGet.Services.AzureSearch.Auxiliary2AzureSearch
{
/// <summary>
/// Compares an old and a new map of package ID to value set (owners or popularity transfers) and reports the
/// package IDs whose value set was added, changed, or removed. Each comparison is timed and reported to the
/// telemetry service.
/// </summary>
public class DataSetComparer : IDataSetComparer
{
    // Shared zero-length array emitted for every removed key.
    private static readonly string[] EmptyStringArray = Array.Empty<string>();

    private readonly IAzureSearchTelemetryService _telemetryService;
    private readonly ILogger<DataSetComparer> _logger;

    /// <summary>
    /// Initializes a new instance of the <see cref="DataSetComparer"/> class.
    /// </summary>
    /// <param name="telemetryService">Receives the duration and sizes of each comparison.</param>
    /// <param name="logger">Receives one informational message per added, changed, or removed key.</param>
    /// <exception cref="ArgumentNullException">Thrown when either argument is null.</exception>
    public DataSetComparer(
        IAzureSearchTelemetryService telemetryService,
        ILogger<DataSetComparer> logger)
    {
        _telemetryService = telemetryService ?? throw new ArgumentNullException(nameof(telemetryService));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Compares two owner maps, treating owner names as case-sensitive, and tracks the comparison in telemetry.
    /// </summary>
    /// <param name="oldData">The old owner sets keyed by package ID. Must use a case-insensitive key comparer.</param>
    /// <param name="newData">The new owner sets keyed by package ID. Must use a case-insensitive key comparer.</param>
    /// <returns>
    /// The package IDs that were added, changed, or removed. Added and changed IDs map to the full new owner
    /// array; removed IDs map to an empty array.
    /// </returns>
    /// <exception cref="ArgumentNullException">Thrown when either argument is null.</exception>
    /// <exception cref="ArgumentException">
    /// Thrown when either dictionary does not use <see cref="StringComparer.OrdinalIgnoreCase"/> for its keys.
    /// </exception>
    public SortedDictionary<string, string[]> CompareOwners(
        SortedDictionary<string, SortedSet<string>> oldData,
        SortedDictionary<string, SortedSet<string>> newData)
    {
        // Use ordinal comparison to allow username case changes to flow through.
        var stopwatch = Stopwatch.StartNew();
        var result = CompareData(
            oldData,
            newData,
            "package ID",
            "owners",
            StringComparer.Ordinal);

        stopwatch.Stop();
        _telemetryService.TrackOwnerSetComparison(oldData.Count, newData.Count, result.Count, stopwatch.Elapsed);

        return result;
    }

    /// <summary>
    /// Compares two popularity transfer maps, treating the transfer values as case-insensitive, and tracks the
    /// comparison in telemetry.
    /// </summary>
    /// <param name="oldData">The old transfer sets keyed by package ID. Must use a case-insensitive key comparer.</param>
    /// <param name="newData">The new transfer sets keyed by package ID. Must use a case-insensitive key comparer.</param>
    /// <returns>
    /// The package IDs that were added, changed, or removed. Added and changed IDs map to the full new value
    /// array; removed IDs map to an empty array.
    /// </returns>
    /// <exception cref="ArgumentNullException">Thrown when either argument is null.</exception>
    /// <exception cref="ArgumentException">
    /// Thrown when either dictionary does not use <see cref="StringComparer.OrdinalIgnoreCase"/> for its keys.
    /// </exception>
    public SortedDictionary<string, string[]> ComparePopularityTransfers(
        SortedDictionary<string, SortedSet<string>> oldData,
        SortedDictionary<string, SortedSet<string>> newData)
    {
        // Ignore case changes in popularity transfers.
        var stopwatch = Stopwatch.StartNew();
        var result = CompareData(
            oldData,
            newData,
            "package ID",
            "popularity transfers",
            StringComparer.OrdinalIgnoreCase);

        stopwatch.Stop();
        _telemetryService.TrackPopularityTransfersSetComparison(oldData.Count, newData.Count, result.Count, stopwatch.Elapsed);

        return result;
    }

    /// <summary>
    /// Shared comparison routine used by the public entry points.
    /// </summary>
    /// <param name="oldData">The old value sets by key.</param>
    /// <param name="newData">The new value sets by key.</param>
    /// <param name="keyName">Human-readable name of the key, used only in log messages.</param>
    /// <param name="valuesName">Human-readable name of the values, used only in log messages.</param>
    /// <param name="valuesComparer">Decides whether two values within a set are considered equal.</param>
    /// <returns>The keys that were added, changed, or removed, mapped to their new values.</returns>
    private SortedDictionary<string, string[]> CompareData(
        SortedDictionary<string, SortedSet<string>> oldData,
        SortedDictionary<string, SortedSet<string>> newData,
        string keyName,
        string valuesName,
        StringComparer valuesComparer)
    {
        // Fail with a descriptive exception instead of a NullReferenceException on the comparer checks below.
        if (oldData == null)
        {
            throw new ArgumentNullException(nameof(oldData));
        }

        if (newData == null)
        {
            throw new ArgumentNullException(nameof(newData));
        }

        if (oldData.Comparer != StringComparer.OrdinalIgnoreCase)
        {
            throw new ArgumentException("The old data should have a case-insensitive comparer.", nameof(oldData));
        }

        if (newData.Comparer != StringComparer.OrdinalIgnoreCase)
        {
            throw new ArgumentException("The new data should have a case-insensitive comparer.", nameof(newData));
        }

        // We use a very simplistic algorithm here. Perform one pass on the new data to find the added or changed
        // values. Then perform a second pass on the old data to find removed keys. We can optimize
        // this later if necessary.
        //
        // On the "changed" case, we emit all of the values instead of the delta. This is because Azure Search
        // does not have a way to append or remove a specific item from a field that is an array.
        // The entire new array needs to be provided.
        var result = new SortedDictionary<string, string[]>(StringComparer.OrdinalIgnoreCase);

        // First pass: find added or changed sets.
        foreach (var pair in newData)
        {
            var key = pair.Key;
            var newValues = pair.Value;
            if (!oldData.TryGetValue(key, out var oldValues))
            {
                // ADDED: The key does not exist in the old data, which means the key was added.
                result.Add(key, newValues.ToArray());
                _logger.LogInformation(
                    $"The {keyName} {{Key}} has been added, with {{AddedCount}} {valuesName}.",
                    key,
                    newValues.Count);
            }
            else
            {
                // The key exists in the old data. We need to check if the values set has changed.
                // Both differences are materialized because their counts are logged below.
                var removedValues = oldValues.Except(newValues, valuesComparer).ToList();
                var addedValues = newValues.Except(oldValues, valuesComparer).ToList();

                if (removedValues.Any() || addedValues.Any())
                {
                    // CHANGED: The values set has changed.
                    result.Add(key, newValues.ToArray());
                    _logger.LogInformation(
                        $"The {keyName} {{Key}} {valuesName} have changed, with {{RemovedCount}} {valuesName} removed and " +
                        $"{{AddedCount}} {valuesName} added.",
                        key,
                        removedValues.Count,
                        addedValues.Count);
                }
            }
        }

        // Second pass: find removed sets.
        foreach (var pair in oldData)
        {
            var key = pair.Key;
            var oldValues = pair.Value;

            if (!newData.ContainsKey(key))
            {
                // REMOVED: The key does not exist in the new data, which means the key was removed.
                result.Add(key, EmptyStringArray);
                _logger.LogInformation(
                    $"The {keyName} {{Key}} has been removed, with {{RemovedCount}} {valuesName}",
                    key,
                    oldValues.Count);
            }
        }

        return result;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
namespace NuGet.Services.AzureSearch.Auxiliary2AzureSearch
{
/// <summary>
/// Used to compare two sets of owners to determine the changes.
/// Used to compare two sets of data to determine the changes.
/// </summary>
public interface IOwnerSetComparer
public interface IDataSetComparer
{
/// <summary>
/// Compares two sets of owners to determine the package IDs that have changed. The returned dictionary
Expand All @@ -19,7 +19,19 @@ public interface IOwnerSetComparer
/// </summary>
/// <param name="oldData">The old owner information, typically from storage.</param>
/// <param name="newData">The new owner information, typically from gallery DB.</param>
SortedDictionary<string, string[]> Compare(
SortedDictionary<string, string[]> CompareOwners(
SortedDictionary<string, SortedSet<string>> oldData,
SortedDictionary<string, SortedSet<string>> newData);

/// <summary>
/// Compares two sets of popularity transfers to determine changes. The two inputs are maps of package IDs that transfer
/// popularity away to package IDs that receive the popularity. The returned dictionary is the subset of these inputs that
/// was added, removed, or changed. For the "added" and "changed" cases, the popularity transfer set is the new data.
/// For the "removed" case, the set is empty.
/// </summary>
/// <param name="oldData">The old popularity transfers, typically from storage.</param>
/// <param name="newData">The new popularity transfers, typically from gallery DB.</param>
SortedDictionary<string, string[]> ComparePopularityTransfers(
SortedDictionary<string, SortedSet<string>> oldData,
SortedDictionary<string, SortedSet<string>> newData);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class UpdateDownloadsCommand : IAzureSearchCommand
private readonly IAuxiliaryFileClient _auxiliaryFileClient;
private readonly IDownloadDataClient _downloadDataClient;
private readonly IDownloadSetComparer _downloadSetComparer;
private readonly IDownloadTransferrer _downloadTransferrer;
private readonly IPopularityTransferDataClient _popularityTransferDataClient;
private readonly ISearchDocumentBuilder _searchDocumentBuilder;
private readonly ISearchIndexActionBuilder _indexActionBuilder;
private readonly Func<IBatchPusher> _batchPusherFactory;
Expand All @@ -43,6 +45,8 @@ public UpdateDownloadsCommand(
IAuxiliaryFileClient auxiliaryFileClient,
IDownloadDataClient downloadDataClient,
IDownloadSetComparer downloadSetComparer,
IDownloadTransferrer downloadTransferrer,
IPopularityTransferDataClient popularityTransferDataClient,
ISearchDocumentBuilder searchDocumentBuilder,
ISearchIndexActionBuilder indexActionBuilder,
Func<IBatchPusher> batchPusherFactory,
Expand All @@ -54,6 +58,8 @@ public UpdateDownloadsCommand(
_auxiliaryFileClient = auxiliaryFileClient ?? throw new ArgumentException(nameof(auxiliaryFileClient));
_downloadDataClient = downloadDataClient ?? throw new ArgumentNullException(nameof(downloadDataClient));
_downloadSetComparer = downloadSetComparer ?? throw new ArgumentNullException(nameof(downloadSetComparer));
_downloadTransferrer = downloadTransferrer ?? throw new ArgumentNullException(nameof(downloadTransferrer));
_popularityTransferDataClient = popularityTransferDataClient ?? throw new ArgumentNullException(nameof(popularityTransferDataClient));
_searchDocumentBuilder = searchDocumentBuilder ?? throw new ArgumentNullException(nameof(searchDocumentBuilder));
_indexActionBuilder = indexActionBuilder ?? throw new ArgumentNullException(nameof(indexActionBuilder));
_batchPusherFactory = batchPusherFactory ?? throw new ArgumentNullException(nameof(batchPusherFactory));
Expand Down Expand Up @@ -106,23 +112,26 @@ private async Task<bool> PushIndexChangesAsync()
_logger.LogInformation("Fetching new download count data from blob storage.");
var newData = await _auxiliaryFileClient.LoadDownloadDataAsync();

_logger.LogInformation("Removing invalid IDs and versions from the old data.");
_logger.LogInformation("Removing invalid IDs and versions from the old downloads data.");
CleanDownloadData(oldResult.Data);

_logger.LogInformation("Removing invalid IDs and versions from the new data.");
_logger.LogInformation("Removing invalid IDs and versions from the new downloads data.");
CleanDownloadData(newData);

// Fetch the download overrides from the auxiliary file. Note that the overridden downloads are kept
// separate from downloads data as the original data will be persisted to auxiliary data, whereas the
// overridden data will be persisted to Azure Search.
_logger.LogInformation("Overriding download count data.");
var downloadOverrides = await _auxiliaryFileClient.LoadDownloadOverridesAsync();
var overridenDownloads = newData.ApplyDownloadOverrides(downloadOverrides, _logger);

_logger.LogInformation("Detecting download count changes.");
var changes = _downloadSetComparer.Compare(oldResult.Data, overridenDownloads);
var changes = _downloadSetComparer.Compare(oldResult.Data, newData);
_logger.LogInformation("{Count} package IDs have download count changes.", changes.Count);

// The "old" data in this case is the popularity transfers data that was last indexed by this job (or
// initialized by Db2AzureSearch).
_logger.LogInformation("Fetching old popularity transfer data from blob storage.");
var oldTransfers = await _popularityTransferDataClient.ReadLatestIndexedAsync();

_logger.LogInformation("Applying popularity transfers to download changes.");
var transferResult = await ApplyPopularityTransferChangesAsync(newData, changes, oldTransfers.Result);

var idBag = new ConcurrentBag<string>(changes.Keys);
_logger.LogInformation("{Count} package IDs have download count changes.", idBag.Count);
_logger.LogInformation("{Count} package IDs need to be updated.", idBag.Count);

if (!changes.Any())
{
Expand All @@ -139,9 +148,34 @@ await ParallelAsync.Repeat(

_logger.LogInformation("Uploading the new download count data to blob storage.");
await _downloadDataClient.ReplaceLatestIndexedAsync(newData, oldResult.Metadata.GetIfMatchCondition());

_logger.LogInformation("Uploading the new popularity transfer data to blob storage.");
await _popularityTransferDataClient.ReplaceLatestIndexedAsync(
transferResult.LatestPopularityTransfers,
oldTransfers.AccessCondition);
return true;
}

/// <summary>
/// Asks the download transferrer for the download changes caused by popularity transfers and download
/// overrides, then merges those changes into <paramref name="downloadChanges"/>.
/// </summary>
/// <param name="newData">The new download data passed through to the transferrer.</param>
/// <param name="downloadChanges">
/// The download changes detected so far. This dictionary is modified in place: every entry returned by the
/// transferrer is written into it, replacing any existing value for the same key.
/// </param>
/// <param name="oldTransfers">The previously indexed popularity transfers passed through to the transferrer.</param>
/// <returns>The result produced by the transferrer, unmodified.</returns>
private async Task<DownloadTransferResult> ApplyPopularityTransferChangesAsync(
    DownloadData newData,
    SortedDictionary<string, long> downloadChanges,
    SortedDictionary<string, SortedSet<string>> oldTransfers)
{
    _logger.LogInformation("Finding download changes from popularity transfers and download overrides.");

    var outcome = await _downloadTransferrer.GetTransferChangesAsync(newData, downloadChanges, oldTransfers);

    var transferChangeCount = outcome.DownloadChanges.Count;
    _logger.LogInformation(
        "{Count} package IDs have download count changes from popularity transfers and download overrides.",
        transferChangeCount);

    // Transfer changes win over the plain download changes for the same package ID.
    foreach (var entry in outcome.DownloadChanges)
    {
        var packageId = entry.Key;
        downloadChanges[packageId] = entry.Value;
    }

    return outcome;
}

private async Task WorkAsync(ConcurrentBag<string> idBag, SortedDictionary<string, long> changes)
{
// Perform two batching mechanisms:
Expand Down
Loading

0 comments on commit 010687c

Please sign in to comment.