Skip to content
This repository has been archived by the owner on Mar 16, 2021. It is now read-only.

[Package Renames 1] Add popularity transfers data client #765

Merged
merged 2 commits into from
Apr 14, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Threading.Tasks;
using NuGetGallery;

namespace NuGet.Services.AzureSearch.AuxiliaryFiles
{
/// <summary>
/// The purpose of this interface is allow reading and writing populairty transfer information from storage.
/// The Auxiliary2AzureSearch job does a comparison of latest popularity transfer data from the database with
/// a snapshot of information stored in Azure Blob Storage. This interface handles the reading and writing of
/// that snapshot from storage.
/// </summary>
public interface IPopularityTransferDataClient
{
/// <summary>
/// Read all of the latest indexed popularity transfers from storage. Also, return the current etag to allow
/// optimistic concurrency checks on the writing of the file. The returned dictionary's key is the
/// package ID that is transferring away its popularity, and the values are the package IDs receiving popularity.
/// The dictionary and the sets are case-insensitive.
/// </summary>
Task<ResultAndAccessCondition<SortedDictionary<string, SortedSet<string>>>> ReadLatestIndexedAsync();

/// <summary>
/// Replace the existing latest indexed popularity transfers file (i.e. "popularityTransfers.v1.json" file).
/// </summary>
/// <param name="newData">The new data to be serialized into storage.</param>
/// <param name="accessCondition">The access condition (i.e. etag) to use during the upload.</param>
Task ReplaceLatestIndexedAsync(
SortedDictionary<string, SortedSet<string>> newData,
IAccessCondition accessCondition);
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.WindowsAzure.Storage;
using Newtonsoft.Json;
using NuGetGallery;

namespace NuGet.Services.AzureSearch.AuxiliaryFiles
{
public class PopularityTransferDataClient : IPopularityTransferDataClient
{
private static readonly JsonSerializer Serializer = new JsonSerializer();

private readonly ICloudBlobClient _cloudBlobClient;
private readonly IOptionsSnapshot<AzureSearchConfiguration> _options;
private readonly IAzureSearchTelemetryService _telemetryService;
private readonly ILogger<PopularityTransferDataClient> _logger;
private readonly Lazy<ICloudBlobContainer> _lazyContainer;

public PopularityTransferDataClient(
ICloudBlobClient cloudBlobClient,
IOptionsSnapshot<AzureSearchConfiguration> options,
IAzureSearchTelemetryService telemetryService,
ILogger<PopularityTransferDataClient> logger)
{
_cloudBlobClient = cloudBlobClient ?? throw new ArgumentNullException(nameof(cloudBlobClient));
_options = options ?? throw new ArgumentNullException(nameof(options));
_telemetryService = telemetryService ?? throw new ArgumentNullException(nameof(telemetryService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));

_lazyContainer = new Lazy<ICloudBlobContainer>(
() => _cloudBlobClient.GetContainerReference(_options.Value.StorageContainer));
}

private ICloudBlobContainer Container => _lazyContainer.Value;

public async Task<ResultAndAccessCondition<SortedDictionary<string, SortedSet<string>>>> ReadLatestIndexedAsync()
{
var stopwatch = Stopwatch.StartNew();
var blobName = GetLatestIndexedBlobName();
var blobReference = Container.GetBlobReference(blobName);

_logger.LogInformation("Reading the latest indexed popularity transfers from {BlobName}.", blobName);

var builder = new PackageIdToPopularityTransfersBuilder(_logger);
IAccessCondition accessCondition;
try
{
using (var stream = await blobReference.OpenReadAsync(AccessCondition.GenerateEmptyCondition()))
{
accessCondition = AccessConditionWrapper.GenerateIfMatchCondition(blobReference.ETag);
ReadStream(stream, builder.Add);
}
}
catch (StorageException ex) when (ex.RequestInformation.HttpStatusCode == (int)HttpStatusCode.NotFound)
{
accessCondition = AccessConditionWrapper.GenerateIfNotExistsCondition();
_logger.LogInformation("The blob {BlobName} does not exist.", blobName);
}

var output = new ResultAndAccessCondition<SortedDictionary<string, SortedSet<string>>>(
builder.GetResult(),
accessCondition);

stopwatch.Stop();
_telemetryService.TrackReadLatestIndexedPopularityTransfers(output.Result.Count, stopwatch.Elapsed);

return output;
}

public async Task ReplaceLatestIndexedAsync(
SortedDictionary<string, SortedSet<string>> newData,
IAccessCondition accessCondition)
{
using (_telemetryService.TrackReplaceLatestIndexedPopularityTransfers(newData.Count))
{
var blobName = GetLatestIndexedBlobName();
_logger.LogInformation("Replacing the latest indexed popularity transfers from {BlobName}.", blobName);

var mappedAccessCondition = new AccessCondition
{
IfNoneMatchETag = accessCondition.IfNoneMatchETag,
IfMatchETag = accessCondition.IfMatchETag,
};

var blobReference = Container.GetBlobReference(blobName);

using (var stream = await blobReference.OpenWriteAsync(mappedAccessCondition))
using (var streamWriter = new StreamWriter(stream))
using (var jsonTextWriter = new JsonTextWriter(streamWriter))
{
blobReference.Properties.ContentType = "application/json";
Serializer.Serialize(jsonTextWriter, newData);
}
}
}

private static void ReadStream(Stream stream, Action<string, IReadOnlyList<string>> add)
{
using (var textReader = new StreamReader(stream))
using (var jsonReader = new JsonTextReader(textReader))
{
Guard.Assert(jsonReader.Read(), "The blob should be readable.");
Guard.Assert(jsonReader.TokenType == JsonToken.StartObject, "The first token should be the start of an object.");
Guard.Assert(jsonReader.Read(), "There should be a second token.");
while (jsonReader.TokenType == JsonToken.PropertyName)
{
var id = (string)jsonReader.Value;

Guard.Assert(jsonReader.Read(), "There should be a token after the property name.");
Guard.Assert(jsonReader.TokenType == JsonToken.StartArray, "The token after the property name should be the start of an object.");

var transfers = Serializer.Deserialize<List<string>>(jsonReader);
add(id, transfers);

Guard.Assert(jsonReader.TokenType == JsonToken.EndArray, "The token after reading the array should be the end of an array.");
Guard.Assert(jsonReader.Read(), "There should be a token after the end of the array.");
}

Guard.Assert(jsonReader.TokenType == JsonToken.EndObject, "The last token should be the end of an object.");
Guard.Assert(!jsonReader.Read(), "There should be no token after the end of the object.");
}
}

private string GetLatestIndexedBlobName()
{
return $"{_options.Value.NormalizeStoragePath()}popularity-transfers/popularity-transfers.v1.json";
}
}
}

21 changes: 21 additions & 0 deletions src/NuGet.Services.AzureSearch/AzureSearchTelemetryService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,27 @@ public void TrackOwnerSetComparison(int oldCount, int newCount, int changeCount,
});
}

public void TrackReadLatestIndexedPopularityTransfers(int outgoingTransfers, TimeSpan elapsed)
{
_telemetryClient.TrackMetric(
Prefix + "ReadLatestIndexedPopularityTransfersSeconds",
elapsed.TotalSeconds,
new Dictionary<string, string>
{
{ "OutgoingTransfers", outgoingTransfers.ToString() }
});
}

public IDisposable TrackReplaceLatestIndexedPopularityTransfers(int outogingTransfers)
{
return _telemetryClient.TrackDuration(
Prefix + "ReplaceLatestIndexedPopularityTransfers",
new Dictionary<string, string>
{
{ "OutgoingTransfers", outogingTransfers.ToString() }
});
}

public IDisposable TrackCatalog2AzureSearchProcessBatch(int catalogLeafCount, int latestCatalogLeafCount, int packageIdCount)
{
return _telemetryClient.TrackDuration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ public interface IAzureSearchTelemetryService
void TrackUpdateOwnersCompleted(JobOutcome outcome, TimeSpan elapsed);
void TrackOwnerSetComparison(int oldCount, int newCount, int changeCount, TimeSpan elapsed);
void TrackReadLatestIndexedOwners(int packageIdCount, TimeSpan elapsed);
void TrackReadLatestIndexedPopularityTransfers(int outgoingTransfers, TimeSpan elapsed);
void TrackReadLatestOwnersFromDatabase(int packageIdCount, TimeSpan elapsed);
void TrackReadLatestVerifiedPackagesFromDatabase(int packageIdCount, TimeSpan elapsed);
IDisposable TrackReplaceLatestIndexedOwners(int packageIdCount);
IDisposable TrackUploadOwnerChangeHistory(int packageIdCount);
IDisposable TrackReplaceLatestIndexedPopularityTransfers(int outgoingTransfers);
IDisposable TrackVersionListsUpdated(int versionListCount, int workerCount);
IDisposable TrackCatalog2AzureSearchProcessBatch(int catalogLeafCount, int latestCatalogLeafCount, int packageIdCount);
void TrackV2SearchQueryWithSearchIndex(TimeSpan elapsed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@
<Compile Include="AuxiliaryFiles\DownloadDataExtensions.cs" />
<Compile Include="AuxiliaryFiles\DownloadOverrides.cs" />
<Compile Include="AuxiliaryFiles\DownloadsV1Reader.cs" />
<Compile Include="AuxiliaryFiles\IPopularityTransferDataClient.cs" />
<Compile Include="AuxiliaryFiles\JsonStringArrayFileParser.cs" />
<Compile Include="AuxiliaryFiles\PopularityTransferDataClient.cs" />
<Compile Include="AuxiliaryFiles\SimpleCloudBlobExtensions.cs" />
<Compile Include="AuxiliaryFiles\DownloadByVersionData.cs" />
<Compile Include="AuxiliaryFiles\DownloadData.cs" />
Expand Down Expand Up @@ -88,6 +90,7 @@
<Compile Include="Models\IUpdatedDocument.cs" />
<Compile Include="Models\UpdatedDocument.cs" />
<Compile Include="Auxiliary2AzureSearch\IOwnerSetComparer.cs" />
<Compile Include="PackageIdToPopularityTransfersBuilder.cs" />
<Compile Include="SearchIndexActionBuilder.cs" />
<Compile Include="Auxiliary2AzureSearch\OwnerSetComparer.cs" />
<Compile Include="PackageIdToOwnersBuilder.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using Microsoft.Extensions.Logging;

namespace NuGet.Services.AzureSearch
{
public class PackageIdToPopularityTransfersBuilder
{
private readonly ILogger _logger;
private int _addCount;
private readonly Dictionary<string, string> _idInternPool;
private readonly SortedDictionary<string, SortedSet<string>> _result;

public PackageIdToPopularityTransfersBuilder(ILogger logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_addCount = 0;
_idInternPool = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
_result = new SortedDictionary<string, SortedSet<string>>(StringComparer.OrdinalIgnoreCase);
}

/// <summary>
/// Add multiple popularity transfers.
/// </summary>
/// <param name="fromId">The package that is transferring its popularity away.</param>
/// <param name="toIds">The packages that are receiving the transferred popularity.</param>
public void Add(string fromId, IReadOnlyList<string> toIds)
{
foreach (var toId in toIds)
{
Add(fromId, toId);
}
}

/// <summary>
/// Add a popularity transfers
/// </summary>
/// <param name="fromId">The package that is transferring its popularity away.</param>
/// <param name="toId">The package that is receiving the transferred popularity.</param>
public void Add(string fromId, string toId)
{
_addCount++;
if (_addCount % 10000 == 0)
{
_logger.LogInformation("{AddCount} popularity transfers have been added so far.", _addCount);
}

// Use a single instance of each "toId" string.
toId = InternId(toId);

if (!_result.TryGetValue(fromId, out var toIds))
{
toIds = new SortedSet<string>(StringComparer.OrdinalIgnoreCase);
fromId = InternId(fromId);

_result.Add(fromId, toIds);
}

toIds.Add(toId);
}

/// <summary>
/// Get the popularity transfers.
/// </summary>
/// <returns>A map of packages transferring popularity away to the packages receiving the popularity.</returns>
public SortedDictionary<string, SortedSet<string>> GetResult()
{
_logger.LogInformation("{RecordCount} popularity transfers were found.", _addCount);
_logger.LogInformation("{FromTransfers} packages transfer popularity away.", _result.Count);
_logger.LogInformation("{UniqueIds} unique package IDs.", _idInternPool.Count);

return _result;
}

private string InternId(string id)
{
if (_idInternPool.TryGetValue(id, out var existingId))
{
return existingId;
}

_idInternPool.Add(id, id);
return id;
}
}
}
Loading