Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perf: significant resource optimization around garbage collection, memory and connection utilization #4727

Merged
merged 10 commits into from
Oct 21, 2020
Merged
8 changes: 5 additions & 3 deletions libraries/Microsoft.Bot.Builder/BotFrameworkAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -750,9 +750,11 @@ public async Task<ConversationsResult> GetConversationsAsync(string serviceUrl,
throw new ArgumentNullException(nameof(credentials));
}

var connectorClient = CreateConnectorClient(serviceUrl, credentials);
var results = await connectorClient.Conversations.GetConversationsAsync(continuationToken, cancellationToken).ConfigureAwait(false);
return results;
using (var connectorClient = CreateConnectorClient(serviceUrl, credentials))
stevengum marked this conversation as resolved.
Show resolved Hide resolved
{
var results = await connectorClient.Conversations.GetConversationsAsync(continuationToken, cancellationToken).ConfigureAwait(false);
return results;
}
}

/// <summary>
Expand Down
36 changes: 22 additions & 14 deletions libraries/Microsoft.Bot.Builder/CloudAdapterBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,17 @@ protected async Task ProcessProactiveAsync(ClaimsIdentity claimsIdentity, Conver
var credentials = await _botFrameworkAuthentication.GetProactiveCredentialsAsync(claimsIdentity, audience, cancellationToken).ConfigureAwait(false);

// Create the connector client to use for outbound requests.
var connectorClient = new ConnectorClient(new Uri(reference.ServiceUrl), credentials, _httpClient);

// Create a turn context and run the pipeline.
using (var context = CreateTurnContext(reference.GetContinuationActivity(), claimsIdentity, audience, connectorClient, callback))
using (var connectorClient = new ConnectorClient(new Uri(reference.ServiceUrl), credentials, _httpClient))
{
// Run the pipeline.
await RunPipelineAsync(context, callback, cancellationToken).ConfigureAwait(false);
// Create a turn context and run the pipeline.
using (var context = CreateTurnContext(reference.GetContinuationActivity(), claimsIdentity, audience, connectorClient, callback))
{
// Run the pipeline.
await RunPipelineAsync(context, callback, cancellationToken).ConfigureAwait(false);

// Cleanup disposable resources in case other code kept a reference to it.
context.TurnState.Set<IConnectorClient>(null);
}
}
}

Expand All @@ -270,16 +274,20 @@ protected async Task<InvokeResponse> ProcessActivityAsync(string authHeader, Act
activity.CallerId = authenticateRequestResult.CallerId;

// Create the connector client to use for outbound requests.
var connectorClient = new ConnectorClient(new Uri(activity.ServiceUrl), authenticateRequestResult.Credentials, _httpClient);

// Create a turn context and run the pipeline.
using (var context = CreateTurnContext(activity, authenticateRequestResult.ClaimsIdentity, authenticateRequestResult.Scope, connectorClient, callback))
using (var connectorClient = new ConnectorClient(new Uri(activity.ServiceUrl), authenticateRequestResult.Credentials, _httpClient))
{
// Run the pipeline.
await RunPipelineAsync(context, callback, cancellationToken).ConfigureAwait(false);
// Create a turn context and run the pipeline.
using (var context = CreateTurnContext(activity, authenticateRequestResult.ClaimsIdentity, authenticateRequestResult.Scope, connectorClient, callback))
{
// Run the pipeline.
await RunPipelineAsync(context, callback, cancellationToken).ConfigureAwait(false);

// If there are any results they will have been left on the TurnContext.
return ProcessTurnResults(context);
// Cleanup disposable resources in case other code kept a reference to it.
context.TurnState.Set<IConnectorClient>(null);

// If there are any results they will have been left on the TurnContext.
return ProcessTurnResults(context);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,16 @@ public async Task<InvokeResponse> ProcessStreamingActivityAsync(Activity activit
context.TurnState.Add<IIdentity>(BotIdentityKey, ClaimsIdentity);
}

var connectorClient = CreateStreamingConnectorClient(activity, requestHandler);
context.TurnState.Add(connectorClient);
using (var connectorClient = CreateStreamingConnectorClient(activity, requestHandler))
{
// Add connector client to be used throughout the turn
context.TurnState.Add(connectorClient);

await RunPipelineAsync(context, callbackHandler, cancellationToken).ConfigureAwait(false);

await RunPipelineAsync(context, callbackHandler, cancellationToken).ConfigureAwait(false);
// Cleanup connector client
context.TurnState.Set<IConnectorClient>(null);
}

if (activity.Type == ActivityTypes.Invoke)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public override async Task<StreamingResponse> ProcessRequestAsync(ReceiveRequest
string body;
try
{
body = request.ReadBodyAsString();
body = await request.ReadBodyAsStringAsync().ConfigureAwait(false);
}
#pragma warning disable CA1031 // Do not catch general exception types (we log the exception and continue execution)
catch (Exception ex)
Expand Down
47 changes: 34 additions & 13 deletions libraries/Microsoft.Bot.Streaming/ReceiveRequestExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;

namespace Microsoft.Bot.Streaming
Expand All @@ -23,29 +24,37 @@ public static class ReceiveRequestExtensions
/// Otherwise a default instance of type T.
/// </returns>
public static T ReadBodyAsJson<T>(this ReceiveRequest request)
{
return request.ReadBodyAsJsonAsync<T>().GetAwaiter().GetResult();
}

/// <summary>
/// Serializes the body of this <see cref="ReceiveRequest"/> as JSON.
/// </summary>
/// <typeparam name="T">The type to attempt to deserialize the contents of this <see cref="ReceiveRequest"/>'s body into.</typeparam>
/// <param name="request">The current instance of <see cref="ReceiveRequest"/>.</param>
/// <returns>On success, an object of type T populated with data serialized from the <see cref="ReceiveRequest"/> body.
/// Otherwise a default instance of type T.
/// </returns>
public static async Task<T> ReadBodyAsJsonAsync<T>(this ReceiveRequest request)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<3

{
// The first stream attached to a ReceiveRequest is always the ReceiveRequest body.
// Any additional streams must be defined within the body or they will not be
// attached properly when processing activities.
var contentStream = request.Streams.FirstOrDefault();

/* If the response had no body we have to return a compatible
* but empty object to avoid throwing exceptions upstream anytime
* an empty response is received.
*/
* but empty object to avoid throwing exceptions upstream anytime
* an empty response is received.
*/
if (contentStream == null)
{
return default;
}

using (var reader = new StreamReader(contentStream.Stream, Encoding.UTF8))
{
using (var jsonReader = new JsonTextReader(reader))
{
var serializer = JsonSerializer.Create(SerializationSettings.DefaultDeserializationSettings);
return serializer.Deserialize<T>(jsonReader);
}
}
var bodyString = await request.ReadBodyAsStringAsync().ConfigureAwait(false);

return JsonConvert.DeserializeObject<T>(bodyString, SerializationSettings.DefaultDeserializationSettings);
}

/// <summary>
Expand All @@ -56,17 +65,29 @@ public static T ReadBodyAsJson<T>(this ReceiveRequest request)
/// Otherwise null.
/// </returns>
public static string ReadBodyAsString(this ReceiveRequest request)
{
return request.ReadBodyAsStringAsync().GetAwaiter().GetResult();
}

/// <summary>
/// Reads the body of this <see cref="ReceiveRequest"/> as a string.
/// </summary>
/// <param name="request">The current instance of <see cref="ReceiveRequest"/>.</param>
/// <returns>On success, a string populated with data read from the <see cref="ReceiveRequest"/> body.
/// Otherwise null.
/// </returns>
public static Task<string> ReadBodyAsStringAsync(this ReceiveRequest request)
{
var contentStream = request.Streams.FirstOrDefault();

if (contentStream == null)
{
return string.Empty;
return Task.FromResult(string.Empty);
}

using (var reader = new StreamReader(contentStream.Stream, Encoding.UTF8))
{
return reader.ReadToEnd();
return reader.ReadToEndAsync();
}
}
}
Expand Down