Skip to content

Commit

Permalink
Ingestion service data delivery status (#1887)
Browse files Browse the repository at this point in the history
* Added eventhandler to transmission

* Updated public API

* Modified changelog.md

* Added more tests

* Fix API

* Added coverage for timeout

* Update to API

* Modified comment in transmission.

* PR Comments

* PR feedback

* Fix test

* Added TransmissionStatusEvent to InMemoryChannel.

* Remove Inmemory change

Co-authored-by: Cijo Thomas <[email protected]>
Co-authored-by: Timothy Mothra <[email protected]>
  • Loading branch information
3 people authored Sep 4, 2020
1 parent a993547 commit 1b2ace3
Show file tree
Hide file tree
Showing 13 changed files with 436 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Microsoft.ApplicationInsights.WindowsServer.TelemetryChannel.ServerTelemetryChannel.TransmissionStatusEvent.get -> System.EventHandler<Microsoft.ApplicationInsights.Channel.TransmissionStatusEventArgs>
Microsoft.ApplicationInsights.WindowsServer.TelemetryChannel.ServerTelemetryChannel.TransmissionStatusEvent.set -> void
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Microsoft.ApplicationInsights.WindowsServer.TelemetryChannel.ServerTelemetryChannel.TransmissionStatusEvent.get -> System.EventHandler<Microsoft.ApplicationInsights.Channel.TransmissionStatusEventArgs>
Microsoft.ApplicationInsights.WindowsServer.TelemetryChannel.ServerTelemetryChannel.TransmissionStatusEvent.set -> void
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
Microsoft.ApplicationInsights.TelemetryClient.TelemetryConfiguration.get -> Microsoft.ApplicationInsights.Extensibility.TelemetryConfiguration
Microsoft.ApplicationInsights.Channel.TransmissionStatusEventArgs.TransmissionStatusEventArgs(Microsoft.ApplicationInsights.Extensibility.Implementation.HttpWebResponseWrapper response) -> void
Microsoft.ApplicationInsights.TelemetryClient.TelemetryConfiguration.get -> Microsoft.ApplicationInsights.Extensibility.TelemetryConfiguration
Microsoft.ApplicationInsights.Channel.Transmission.TransmissionStatusEvent.get -> System.EventHandler<Microsoft.ApplicationInsights.Channel.TransmissionStatusEventArgs>
Microsoft.ApplicationInsights.Channel.Transmission.TransmissionStatusEvent.set -> void
Microsoft.ApplicationInsights.Channel.TransmissionStatusEventArgs
Microsoft.ApplicationInsights.Channel.TransmissionStatusEventArgs.Response.get -> Microsoft.ApplicationInsights.Extensibility.Implementation.HttpWebResponseWrapper
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
Microsoft.ApplicationInsights.TelemetryClient.TelemetryConfiguration.get -> Microsoft.ApplicationInsights.Extensibility.TelemetryConfiguration
Microsoft.ApplicationInsights.Channel.TransmissionStatusEventArgs.TransmissionStatusEventArgs(Microsoft.ApplicationInsights.Extensibility.Implementation.HttpWebResponseWrapper response) -> void
Microsoft.ApplicationInsights.TelemetryClient.TelemetryConfiguration.get -> Microsoft.ApplicationInsights.Extensibility.TelemetryConfiguration
Microsoft.ApplicationInsights.Channel.Transmission.TransmissionStatusEvent.get -> System.EventHandler<Microsoft.ApplicationInsights.Channel.TransmissionStatusEventArgs>
Microsoft.ApplicationInsights.Channel.Transmission.TransmissionStatusEvent.set -> void
Microsoft.ApplicationInsights.Channel.TransmissionStatusEventArgs
Microsoft.ApplicationInsights.Channel.TransmissionStatusEventArgs.Response.get -> Microsoft.ApplicationInsights.Extensibility.Implementation.HttpWebResponseWrapper
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
Microsoft.ApplicationInsights.TelemetryClient.TelemetryConfiguration.get -> Microsoft.ApplicationInsights.Extensibility.TelemetryConfiguration
Microsoft.ApplicationInsights.Channel.TransmissionStatusEventArgs.TransmissionStatusEventArgs(Microsoft.ApplicationInsights.Extensibility.Implementation.HttpWebResponseWrapper response) -> void
Microsoft.ApplicationInsights.TelemetryClient.TelemetryConfiguration.get -> Microsoft.ApplicationInsights.Extensibility.TelemetryConfiguration
Microsoft.ApplicationInsights.Channel.Transmission.TransmissionStatusEvent.get -> System.EventHandler<Microsoft.ApplicationInsights.Channel.TransmissionStatusEventArgs>
Microsoft.ApplicationInsights.Channel.Transmission.TransmissionStatusEvent.set -> void
Microsoft.ApplicationInsights.Channel.TransmissionStatusEventArgs
Microsoft.ApplicationInsights.Channel.TransmissionStatusEventArgs.Response.get -> Microsoft.ApplicationInsights.Extensibility.Implementation.HttpWebResponseWrapper
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
using System.CodeDom;
using Microsoft.ApplicationInsights.Extensibility.Implementation.Tracing;
using System.Diagnostics.Tracing;
using System.Runtime.Serialization;
using System.Text;
using System.Runtime.Serialization.Json;

public class TransmissionTest
{
Expand Down Expand Up @@ -403,13 +406,13 @@ public async Task SendAsyncLogsIngestionReponseTimeEventCounter()
var payload = (IDictionary<string, object>)traces[0].Payload[0];
Assert.AreEqual("IngestionEndpoint-ResponseTimeMsec", payload["Name"].ToString());
Assert.IsTrue((int)payload["Count"] >= 5);
// Mean should be more than 30 ms, as we introduced a delay of 30ms in SendAsync.
// Max should be more than 30 ms, as we introduced a delay of 30ms in SendAsync.
#if NETCOREAPP2_1
Assert.IsTrue((float)payload["Mean"] >= 30);
Assert.IsTrue((float)payload["Max"] >= 30);
#endif

#if NETCOREAPP3_1
Assert.IsTrue((double)payload["Mean"] >= 30);
Assert.IsTrue((double)payload["Max"] >= 30);
#endif
}
}
Expand Down Expand Up @@ -501,6 +504,303 @@ public async Task SendAsyncLogsIngestionReponseTimeAndStatusCode()
}
}
}

[TestMethod]
public async Task TestTransmissionStatusEventHandlerWithSuccessTransmission()
{
// ARRANGE
var handler = new HandlerForFakeHttpClient
{
InnerHandler = new HttpClientHandler(),
OnSendAsync = (req, cancellationToken) =>
{
return Task.FromResult<HttpResponseMessage>(new HttpResponseMessage());
}
};

using (var fakeHttpClient = new HttpClient(handler))
{
// Instantiate Transmission with the mock HttpClient
Transmission transmission = new Transmission(testUri, new byte[] { 1, 2, 3, 4, 5 }, fakeHttpClient, string.Empty, string.Empty);

// VALIDATE
transmission.TransmissionStatusEvent += delegate (object sender, TransmissionStatusEventArgs args)
{
Assert.IsTrue(sender is Transmission);
Assert.AreEqual((int)HttpStatusCode.OK, args.Response.StatusCode);
};

// ACT
HttpWebResponseWrapper result = await transmission.SendAsync();
}
}

[TestMethod]
public async Task TestTransmissionStatusEventHandlerWithKnownFailureTransmission()
{
// ARRANGE
var handler = new HandlerForFakeHttpClient
{
InnerHandler = new HttpClientHandler(),
OnSendAsync = (req, cancellationToken) =>
{
cancellationToken.ThrowIfCancellationRequested();
return Task.FromResult<HttpResponseMessage>(new HttpResponseMessage(HttpStatusCode.ServiceUnavailable));
}
};

using (var fakeHttpClient = new HttpClient(handler))
{
// Instantiate Transmission with the mock HttpClient
Transmission transmission = new Transmission(testUri, new byte[] { 1, 2, 3, 4, 5 }, fakeHttpClient, string.Empty, string.Empty);
transmission.Timeout = TimeSpan.Zero;

// VALIDATE
transmission.TransmissionStatusEvent += delegate (object sender, TransmissionStatusEventArgs args)
{
Assert.AreEqual((int)HttpStatusCode.RequestTimeout, args.Response.StatusCode);
};

// ACT
HttpWebResponseWrapper result = await transmission.SendAsync();
}
}

[TestMethod]
public async Task TestTransmissionStatusEventHandlerWithUnKnownFailureTransmission()
{
// ARRANGE
var handler = new HandlerForFakeHttpClient
{
InnerHandler = new HttpClientHandler(),
OnSendAsync = (req, cancellationToken) =>
{
throw new Exception("test");
}
};

using (var fakeHttpClient = new HttpClient(handler))
{
// Instantiate Transmission with the mock HttpClient
Transmission transmission = new Transmission(testUri, new byte[] { 1, 2, 3, 4, 5 }, fakeHttpClient, string.Empty, string.Empty);
transmission.Timeout = TimeSpan.Zero;

// VALIDATE
transmission.TransmissionStatusEvent += delegate (object sender, TransmissionStatusEventArgs args)
{
Assert.AreEqual(999, args.Response.StatusCode);
};

// ACT
try
{
HttpWebResponseWrapper result = await transmission.SendAsync();
}
catch (Exception ex)
{
Assert.AreEqual("test", ex.Message);
}
}
}

[TestMethod]
public async Task TestTransmissionStatusEventHandlerFails()
{
// ARRANGE
var handler = new HandlerForFakeHttpClient
{
InnerHandler = new HttpClientHandler(),
OnSendAsync = (req, cancellationToken) =>
{
return Task.FromResult<HttpResponseMessage>(new HttpResponseMessage());
}
};

using (var listener = new TestEventListener())
{
listener.EnableEvents(CoreEventSource.Log, EventLevel.LogAlways,
(EventKeywords)AllKeywords);

using (var fakeHttpClient = new HttpClient(handler))
{
// Instantiate Transmission with the mock HttpClient
Transmission transmission = new Transmission(testUri, new byte[] { 1, 2, 3, 4, 5 }, fakeHttpClient, string.Empty, string.Empty);

// VALIDATE
transmission.TransmissionStatusEvent += delegate (object sender, TransmissionStatusEventArgs args)
{
throw new Exception("test");
};

// ACT
HttpWebResponseWrapper result = await transmission.SendAsync();
}

// Assert:
var allTraces = listener.Messages.ToList();
var traces = allTraces.Where(item => item.EventId == 71).ToList();
Assert.AreEqual(1, traces.Count);
}
}

[TestMethod]
public async Task TestTransmissionStatusEventWithEventsFromMultipleIKey()
{
// ARRANGE
// Raw response from backend for partial response
var ingestionResponse = "{" +
"\r\n \"itemsReceived\": 5,\r\n \"itemsAccepted\": 2,\r\n " +
"\"errors\": [\r\n {\r\n " +
"\"index\": 0,\r\n \"statusCode\": 400,\r\n \"message\": \"Error 1\"\r\n },\r\n {\r\n " +
"\"index\": 2,\r\n \"statusCode\": 503,\r\n \"message\": \"Error 2\"\r\n },\r\n {\r\n " +
"\"index\": 3,\r\n \"statusCode\": 500,\r\n \"message\": \"Error 3\"\r\n }\r\n ]\r\n}";

// Fake HttpClient will respond back with partial content
var handler = new HandlerForFakeHttpClient
{
InnerHandler = new HttpClientHandler(),
OnSendAsync = (req, cancellationToken) =>
{
return Task.FromResult<HttpResponseMessage>(new HttpResponseMessage { StatusCode = HttpStatusCode.PartialContent, Content = new StringContent(ingestionResponse) });
}
};

using (var fakeHttpClient = new HttpClient(handler))
{
// Create a list of telemetry which could send information to different instrumentation keys
var telemetryItems = new List<ITelemetry>();

EventTelemetry eventTelemetry1 = new EventTelemetry("Event1");
eventTelemetry1.Context.InstrumentationKey = "IKEY_1";
telemetryItems.Add(eventTelemetry1);

EventTelemetry eventTelemetry2 = new EventTelemetry("Event2");
eventTelemetry2.Context.InstrumentationKey = "IKEY_2";
telemetryItems.Add(eventTelemetry2);

EventTelemetry eventTelemetry3 = new EventTelemetry("Event3");
eventTelemetry3.Context.InstrumentationKey = "IKEY_3";
telemetryItems.Add(eventTelemetry3);

EventTelemetry eventTelemetry4 = new EventTelemetry("Event3");
eventTelemetry4.Context.InstrumentationKey = "IKEY_2";
telemetryItems.Add(eventTelemetry4);

EventTelemetry eventTelemetry5 = new EventTelemetry("Event5");
eventTelemetry5.Context.InstrumentationKey = "IKEY_1";
telemetryItems.Add(eventTelemetry5);

// Serialize the telemetry items before passing to transmission
var serializedData = JsonSerializer.Serialize(telemetryItems, true);

// Instantiate Transmission with the mock HttpClient
Transmission transmission = new Transmission(testUri, serializedData, fakeHttpClient, string.Empty, string.Empty);

// VALIDATE
transmission.TransmissionStatusEvent += delegate (object sender, TransmissionStatusEventArgs args)
{
var sendertransmission = sender as Transmission;
// convert raw JSON response to Backendresponse object
BackendResponse backendResponse = GetBackendResponse(args.Response.Content);

// Deserialize telemetry items to identify which items has failed
string[] items = JsonSerializer
.Deserialize(sendertransmission.Content)
.Split(new[] { Environment.NewLine }, StringSplitOptions.RemoveEmptyEntries);

string[] failedItems = new string[3];
int i = 0;

// Create a list of failed items
foreach (var error in backendResponse.Errors)
{
failedItems[i++] = items[error.Index];
}

Assert.AreEqual((int)HttpStatusCode.PartialContent, args.Response.StatusCode);
Assert.AreEqual(5, backendResponse.ItemsReceived);
Assert.AreEqual(2, backendResponse.ItemsAccepted);

//IKEY_1
int totalItemsForIkey = items.Where(x => x.Contains("IKEY_1")).Count();
int failedItemsForIkey = failedItems.Where(x => x.Contains("IKEY_1")).Count();
Assert.AreEqual(2, totalItemsForIkey);
Assert.AreEqual(1, failedItemsForIkey);

//IKEY_2
totalItemsForIkey = items.Where(x => x.Contains("IKEY_2")).Count();
failedItemsForIkey = failedItems.Where(x => x.Contains("IKEY_2")).Count();
Assert.AreEqual(2, totalItemsForIkey);
Assert.AreEqual(1, failedItemsForIkey);

//IKEY_3
totalItemsForIkey = items.Where(x => x.Contains("IKEY_3")).Count();
failedItemsForIkey = failedItems.Where(x => x.Contains("IKEY_3")).Count();
Assert.AreEqual(1, totalItemsForIkey);
Assert.AreEqual(1, failedItemsForIkey);
};

// ACT
HttpWebResponseWrapper result = await transmission.SendAsync();
}
}

/// <summary>
/// Serializes response from ingestion service to BackendResponse object.
/// </summary>
/// <param name="response">Response from ingestion service.</param>
/// <returns></returns>
private BackendResponse GetBackendResponse(string responseContent)
{
BackendResponse backendResponse = null;
DataContractJsonSerializer Serializer = new DataContractJsonSerializer(typeof(BackendResponse));

try
{
if (!string.IsNullOrEmpty(responseContent))
{
using (MemoryStream ms = new MemoryStream(Encoding.Unicode.GetBytes(responseContent)))
{
backendResponse = Serializer.ReadObject(ms) as BackendResponse;
}
}
}
catch
{
backendResponse = null;
}

return backendResponse;
}
}
}

/// <summary>
/// DataContract class to hold response from ingestion service.
/// </summary>
[DataContract]
internal class BackendResponse
{
[DataMember(Name = "itemsReceived", IsRequired = true)]
public int ItemsReceived { get; set; }

[DataMember(Name = "itemsAccepted", IsRequired = true)]
public int ItemsAccepted { get; set; }

[DataMember(Name = "errors")]
public Error[] Errors { get; set; }

[DataContract]
internal class Error
{
[DataMember(Name = "index")]
public int Index { get; set; }

[DataMember(Name = "statusCode")]
public int StatusCode { get; set; }

[DataMember(Name = "message")]
public string Message { get; set; }
}
}

Expand Down
Loading

0 comments on commit 1b2ace3

Please sign in to comment.