Skip to content
This repository has been archived by the owner on Aug 3, 2024. It is now read-only.
/ ServerCommon Public archive

Improve SubscriptionProcessor's telemetry for messages exceeding expected duration #230

Merged
merged 2 commits into from
Nov 26, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/NuGet.Services.ServiceBus/BrokeredMessageWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public BrokeredMessageWrapper(BrokeredMessage brokeredMessage)
public BrokeredMessage BrokeredMessage { get; }

public DateTimeOffset ExpiresAtUtc => new DateTimeOffset(BrokeredMessage.ExpiresAtUtc);

public TimeSpan TimeToLive
{
get => BrokeredMessage.TimeToLive;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,14 @@ public interface ISubscriptionProcessorTelemetryService
/// side, if we ever to see any.
/// </remarks>
void TrackEnqueueLag<TMessage>(TimeSpan enqueueLag);

/// <summary>
/// Track how long the handler took to handle a message.
/// </summary>
/// <typeparam name="TMessage">The type of message that was handled.</typeparam>
/// <param name="duration">How long it took to handle the message.</param>
/// <param name="callGuid">The GUID that identifies this attempt to handle the message.</param>
/// <param name="handled">Whether the message was handled successfully.</param>
void TrackMessageHandlerDuration<TMessage>(TimeSpan duration, Guid callGuid, bool handled);
}
}
26 changes: 22 additions & 4 deletions src/NuGet.Services.ServiceBus/SubscriptionProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,11 @@ private async Task OnMessageAsync(IBrokeredMessage brokeredMessage)

TrackMessageLags(brokeredMessage);

var callGuid = Guid.NewGuid();
var stopwatch = Stopwatch.StartNew();

using (var scope = _logger.BeginScope($"{nameof(SubscriptionProcessor<TMessage>)}.{nameof(OnMessageAsync)} {{CallGuid}} {{CallStartTimestamp}} {{MessageId}}",
Guid.NewGuid(),
callGuid,
DateTimeOffset.UtcNow.ToString("O"),
brokeredMessage.MessageId))
{
Expand All @@ -104,18 +107,33 @@ private async Task OnMessageAsync(IBrokeredMessage brokeredMessage)

if (await _handler.HandleAsync(message))
{
_logger.LogInformation("Message was successfully handled, marking the brokered message as completed");
_logger.LogInformation(
"Message was successfully handled after {ElapsedSeconds} seconds, marking the brokered message as completed",
stopwatch.Elapsed.TotalSeconds);

await brokeredMessage.CompleteAsync();

_telemetryService.TrackMessageHandlerDuration<TMessage>(stopwatch.Elapsed, callGuid, handled: true);
}
else
{
_logger.LogInformation("Handler did not finish processing message, requeueing message to be reprocessed");
_logger.LogInformation(
"Handler did not finish processing message after {DurationSeconds} seconds, requeueing message to be reprocessed",
stopwatch.Elapsed.TotalSeconds);

_telemetryService.TrackMessageHandlerDuration<TMessage>(stopwatch.Elapsed, callGuid, handled: false);
}
}
catch (Exception e)
{
_logger.LogError(Event.SubscriptionMessageHandlerException, e, "Requeueing message as it was unsuccessfully processed due to exception");
_logger.LogError(
loic-sharma marked this conversation as resolved.
Show resolved Hide resolved
Event.SubscriptionMessageHandlerException,
e,
"Requeueing message as it was unsuccessfully processed due to exception after {DurationSeconds} seconds",
stopwatch.Elapsed.TotalSeconds);

_telemetryService.TrackMessageHandlerDuration<TMessage>(stopwatch.Elapsed, callGuid, handled: false);

// exception should not be propagated to the topic client, because it will
// abandon the message and will cause the retry to happen immediately, which,
// in turn, have higher chances of failing again if we, for example, experiencing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,9 @@ public void TrackEnqueueLag<TMessage>(TimeSpan enqueueLag)
{

}

public void TrackMessageHandlerDuration<TMessage>(TimeSpan duration, Guid callGuid, bool success)
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void GetOrDefaultReturnsDefaultIfConversionUnsupported()
Assert.Equal(default(NoConversionFromStringToThisClass), unsupportedFromDictionary);
Assert.Equal(defaultNoConversion, unsupportedFromDictionaryWithDefault);
// Safety check to prevent the test from passing if defaultNoConversion is equal to default(NoConversionFromStringToThisClass)
Assert.NotEqual(defaultNoConversion, default(NoConversionFromStringToThisClass));
Assert.NotEqual(default(NoConversionFromStringToThisClass), defaultNoConversion);
}

[Theory]
Expand All @@ -106,7 +106,10 @@ public void GetOrThrowThrowsIfKeyNotFound<T>(T value)
// Arrange
const string notKey = "notKey";

IDictionary<string, string> dictionary = new Dictionary<string, string>();
IDictionary<string, string> dictionary = new Dictionary<string, string>
{
{ "otherKey", value.ToString() }
};

// Assert
Assert.Throws<KeyNotFoundException>(() => dictionary.GetOrThrow<T>(notKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void HandlesKeyNotFound()

// Assert
Assert.Throws<KeyNotFoundException>(() => dummy[notFoundKey]);
Assert.Equal(false, result);
Assert.False(result);
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="System.ValueTuple">
<Version>4.3.1</Version>
<Version>4.4.0</Version>
</PackageReference>
<PackageReference Include="Microsoft.Owin">
<Version>3.0.1</Version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public async Task DoesntCallHandlerOnDeserializationException()
_serializer.Verify(s => s.Deserialize(It.IsAny<IBrokeredMessage>()), Times.Once);
_handler.Verify(h => h.HandleAsync(It.IsAny<TestMessage>()), Times.Never);
_brokeredMessage.Verify(m => m.CompleteAsync(), Times.Never);
_telemetryService.Verify(t => t.TrackMessageHandlerDuration<TestMessage>(It.IsAny<TimeSpan>(), It.IsAny<Guid>(), false), Times.Once);
}

[Fact]
Expand Down Expand Up @@ -81,6 +82,7 @@ public async Task CallsHandlerWhenMessageIsReceived()
_serializer.Verify(s => s.Deserialize(It.IsAny<IBrokeredMessage>()), Times.Once);
_handler.Verify(h => h.HandleAsync(It.IsAny<TestMessage>()), Times.Once);
_brokeredMessage.Verify(m => m.CompleteAsync(), Times.Once);
_telemetryService.Verify(t => t.TrackMessageHandlerDuration<TestMessage>(It.IsAny<TimeSpan>(), It.IsAny<Guid>(), true), Times.Once);
}

[Fact]
Expand Down Expand Up @@ -115,6 +117,7 @@ public async Task DoesNotCompleteMessageIfHandlerReturnsFalse()
_serializer.Verify(s => s.Deserialize(It.IsAny<IBrokeredMessage>()), Times.Once);
_handler.Verify(h => h.HandleAsync(It.IsAny<TestMessage>()), Times.Once);
_brokeredMessage.Verify(m => m.CompleteAsync(), Times.Never);
_telemetryService.Verify(t => t.TrackMessageHandlerDuration<TestMessage>(It.IsAny<TimeSpan>(), It.IsAny<Guid>(), false), Times.Once);
}

[Fact]
Expand Down Expand Up @@ -150,6 +153,7 @@ public async Task BrokedMessageIsntCompletedIfHandlerThrows()
_serializer.Verify(s => s.Deserialize(It.IsAny<IBrokeredMessage>()), Times.Once);
_handler.Verify(h => h.HandleAsync(It.IsAny<TestMessage>()), Times.Once);
_brokeredMessage.Verify(m => m.CompleteAsync(), Times.Never);
_telemetryService.Verify(t => t.TrackMessageHandlerDuration<TestMessage>(It.IsAny<TimeSpan>(), It.IsAny<Guid>(), false), Times.Once);
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public class TheDeserializeMethod
{
[Theory]
[MemberData(nameof(DeserializationOfIssuesWithNoPropertiesData))]
public void DeserializationOfIssuesWithNoProperties(ValidationIssueCode code, string data)
public void DeserializationOfIssuesWithNoProperties(ValidationIssueCode code)
{
// Arrange
var validationIssue = CreatePackageValidationIssue(code, Strings.EmptyJson);
Expand All @@ -112,10 +112,7 @@ public static IEnumerable<object[]> DeserializationOfIssuesWithNoPropertiesData
{
foreach (var code in IssuesWithNoProperties.Keys)
{
foreach (var data in InvalidData)
{
yield return new object[] { code, data };
}
yield return new object[] { code };
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void ProducesExpectedMessageForPreviousVersion()
Assert.Equal(ValidationTrackingId, output.ValidationTrackingId);
Assert.Equal(DeliveryCount, output.DeliveryCount);
Assert.Equal(ValidatingType.Package, output.ValidatingType);
Assert.Equal(null, output.EntityKey);
Assert.Null(output.EntityKey);
}

[Theory]
Expand Down