Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Storage] Queues - Failed message handler #17001

Merged
34 commits merged into from
Jan 29, 2021
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
7d7b268
wip
kasobol-msft Nov 16, 2020
74a3175
export api.
kasobol-msft Nov 16, 2020
f8d31c8
Merge remote-tracking branch 'upstream/master' into failed-message-ha…
kasobol-msft Nov 19, 2020
f0755e0
that works.
kasobol-msft Nov 19, 2020
f91dc77
handle peeked messages
kasobol-msft Nov 19, 2020
fe4a3ce
api
kasobol-msft Nov 19, 2020
6185993
merge master
kasobol-msft Dec 4, 2020
2b92bbb
post merge.
kasobol-msft Dec 4, 2020
cf90918
propagate queueclient.
kasobol-msft Dec 4, 2020
7fb3036
merge upstream/master
kasobol-msft Dec 18, 2020
dc3e36a
Merge remote-tracking branch 'upstream/master' into failed-message-ha…
kasobol-msft Jan 7, 2021
e489930
fire and forget callback.
kasobol-msft Jan 7, 2021
415e4fd
tweaks.
kasobol-msft Jan 7, 2021
31279da
merge master.
kasobol-msft Jan 26, 2021
87c5c9e
re-record.
kasobol-msft Jan 26, 2021
4d5eeac
hack core temporarily.
kasobol-msft Jan 26, 2021
451c330
use event hander from core.
kasobol-msft Jan 26, 2021
3f8aeec
revert test change.
kasobol-msft Jan 26, 2021
13cf452
remove direct core reference from test package.
kasobol-msft Jan 26, 2021
af47e2c
that won't be necessary.
kasobol-msft Jan 26, 2021
7cad45d
more tests.
kasobol-msft Jan 26, 2021
918349f
readme.
kasobol-msft Jan 26, 2021
ea14fa1
whitespace.
kasobol-msft Jan 26, 2021
a078234
merge master
kasobol-msft Jan 27, 2021
27fb680
some pr feedback.
kasobol-msft Jan 27, 2021
8eb362e
api
kasobol-msft Jan 27, 2021
23c5214
get parent queue service.
kasobol-msft Jan 28, 2021
40e56ec
use project ref.
kasobol-msft Jan 29, 2021
30aad88
readme tweaks.
kasobol-msft Jan 29, 2021
13d53d5
renaming.
kasobol-msft Jan 29, 2021
6585e9f
misc.
kasobol-msft Jan 29, 2021
46f69cb
protected onevent thing.
kasobol-msft Jan 29, 2021
59c3d24
merge master.
kasobol-msft Jan 29, 2021
523d886
post merge.
kasobol-msft Jan 29, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 124 additions & 0 deletions sdk/core/Azure.Core/src/Shared/SyncAsyncEventHandlerExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Core.Pipeline;

// TODO (kasobol-msft) This is borrowed from https://github.com/Azure/azure-sdk-for-net/pull/18170.
kasobol-msft marked this conversation as resolved.
Show resolved Hide resolved
namespace Azure.Core
{
/// <summary>
/// Extensions for raising <see cref="SyncAsyncEventHandler{T}"/>
/// events.
/// </summary>
internal static class SyncAsyncEventHandlerExtensions
{
/// <summary>
/// Raise an <see cref="Azure.Core.SyncAsyncEventHandler{T}"/>
/// event by executing each of the handlers sequentially (to avoid
/// introducing accidental parallelism in customer code) and collecting
/// any exceptions.
/// </summary>
/// <typeparam name="T">Type of the event arguments.</typeparam>
/// <param name="eventHandler">The event's delegate.</param>
/// <param name="e">
/// An <see cref="SyncAsyncEventArgs"/> instance that contains the
/// event data.
/// </param>
/// <param name="declaringTypeName">
/// The name of the type declaring the event to construct a helpful
/// exception message and distributed tracing span.
/// </param>
/// <param name="eventName">
/// The name of the event to construct a helpful exception message and
/// distributed tracing span.
/// </param>
/// <param name="clientDiagnostics">
/// Client diagnostics to wrap all the handlers in a new distributed
/// tracing span.
/// </param>
/// <returns>
/// A task that represents running all of the event's handlers.
/// </returns>
/// <exception cref="AggregateException">
/// An exception was thrown during the execution of at least one of the
/// event's handlers.
/// </exception>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="e"/>, <paramref name="declaringTypeName"/>,
/// <paramref name="eventName"/>, or <paramref name="clientDiagnostics"/>
/// are null.
/// </exception>
/// <exception cref="ArgumentException">
/// Thrown when <paramref name="declaringTypeName"/> or
/// <paramref name="eventName"/> are empty.
/// </exception>
public static async Task RaiseAsync<T>(
this SyncAsyncEventHandler<T> eventHandler,
T e,
string declaringTypeName,
string eventName,
ClientDiagnostics clientDiagnostics)
where T : SyncAsyncEventArgs
{
Argument.AssertNotNull(e, nameof(e));
Argument.AssertNotNullOrEmpty(declaringTypeName, nameof(declaringTypeName));
Argument.AssertNotNullOrEmpty(eventName, nameof(eventName));
Argument.AssertNotNull(clientDiagnostics, nameof(clientDiagnostics));

// Get the invocation list, but return early if there's no work
if (eventHandler == null) { return; }
Delegate[] handlers = eventHandler.GetInvocationList();
if (handlers == null || handlers.Length == 0) { return; }

// Wrap handler invocation in a distributed tracing span so it's
// easy for customers to track and measure
string eventFullName = declaringTypeName + "." + eventName;
using DiagnosticScope scope = clientDiagnostics.CreateScope(eventFullName);
scope.Start();
try
{
// Collect any exceptions raised by handlers
List<Exception> failures = null;

// Raise the handlers sequentially so we don't introduce any
// unintentional parallelism in customer code
foreach (Delegate handler in handlers)
{
SyncAsyncEventHandler<T> azureHandler = (SyncAsyncEventHandler<T>)handler;
try
{
Task runHandlerTask = azureHandler(e);
// We can consider logging something when e.RunSynchronously
// is true, but runHandlerTask.IsComplete is false.
// (We're not bother to check our tests because
// EnsureCompleted on the code path that raised the
// event will catch it for us.)
await runHandlerTask.ConfigureAwait(false);
}
catch (Exception ex)
{
failures ??= new List<Exception>();
failures.Add(ex);
}
}

// Wrap any exceptions in an AggregateException
if (failures?.Count > 0)
{
// Include the event name in the exception for easier debugging
throw new AggregateException(
"Unhandled exception(s) thrown when raising the " + eventFullName + " event.",
failures);
}
}
catch (Exception ex)
{
scope.Failed(ex);
throw;
}
}
}
}
95 changes: 95 additions & 0 deletions sdk/core/Azure.Core/src/SyncAsyncEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Threading;

// TODO (kasobol-msft) This is borrowed from https://github.com/Azure/azure-sdk-for-net/pull/18170.
kasobol-msft marked this conversation as resolved.
Show resolved Hide resolved
namespace Azure
{
/// <summary>
/// Provides data for <see cref="Azure.Core.SyncAsyncEventHandler{T}"/>
/// events that can be invoked either synchronously or asynchronously.
/// </summary>
public class SyncAsyncEventArgs : EventArgs
{
/// <summary>
/// Gets a value indicating whether the event handler was invoked
/// synchronously or asynchronously. Please see
/// <see cref="Azure.Core.SyncAsyncEventHandler{T}"/> for more details.
/// </summary>
/// <remarks>
/// <para>
/// The same <see cref="Azure.Core.SyncAsyncEventHandler{T}"/>
/// event can be raised from both synchronous and asynchronous code
/// paths depending on whether you're calling sync or async methods on
/// a client. If you write an async handler but raise it from a sync
/// method, the handler will be doing sync-over-async and may cause
/// ThreadPool starvation. See
/// <see href="https://docs.microsoft.com/archive/blogs/vancem/diagnosing-net-core-threadpool-starvation-with-perfview-why-my-service-is-not-saturating-all-cores-or-seems-to-stall">
/// Diagnosing .NET Core ThreadPool Starvation with PerfView</see> for
/// a detailed explanation of how that can cause ThreadPool starvation
/// and serious performance problems.
/// </para>
/// <para>
/// You can use this <see cref="RunSynchronously"/> property to check
/// how the event is being raised and implement your handler
/// accordingly. Here's an example handler that's safe to invoke from
/// both sync and async code paths.
/// <code snippet="Snippet:Azure_Core_Samples_EventSamples_CombinedHandler">
/// var client = new AlarmClient();
/// client.Ring += async (SyncAsyncEventArgs e) =&gt;
/// {
/// if (e.RunSynchronously)
/// {
/// Console.WriteLine(&quot;Wake up!&quot;);
/// }
/// else
/// {
/// await Console.Out.WriteLineAsync(&quot;Wake up!&quot;);
/// }
/// };
///
/// client.Snooze(); // sync call that blocks
/// await client.SnoozeAsync(); // async call that doesn&apos;t block
/// </code>
/// </para>
/// </remarks>
public bool RunSynchronously { get; }

/// <summary>
/// Gets a cancellation token related to the original operation that
/// raised the event. It's important for your handler to pass this
/// token along to any asynchronous or long-running synchronous
/// operations that take a token so cancellation (via something like
/// <code>
/// new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token
/// </code>
/// for example) will correctly propagate.
/// </summary>
public CancellationToken CancellationToken { get; }

/// <summary>
/// Initializes a new instance of the <see cref="SyncAsyncEventArgs"/>
/// class.
/// </summary>
/// <param name="runSynchronously">
/// A value indicating whether the event handler was invoked
/// synchronously or asynchronously. Please see
/// <see cref="Azure.Core.SyncAsyncEventHandler{T}"/> for more details.
/// </param>
/// <param name="cancellationToken">
/// A cancellation token related to the original operation that raised
/// the event. It's important for your handler to pass this token
/// along to any asynchronous or long-running synchronous operations
/// that take a token so cancellation will correctly propagate. The
/// default value is <see cref="CancellationToken.None"/>.
/// </param>
public SyncAsyncEventArgs(bool runSynchronously, CancellationToken cancellationToken = default)
: base()
{
RunSynchronously = runSynchronously;
CancellationToken = cancellationToken;
}
}
}
Loading