Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Storage] Queues - Failed message handler #17001

Merged
34 commits merged into from
Jan 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
7d7b268
wip
kasobol-msft Nov 16, 2020
74a3175
export api.
kasobol-msft Nov 16, 2020
f8d31c8
Merge remote-tracking branch 'upstream/master' into failed-message-ha…
kasobol-msft Nov 19, 2020
f0755e0
that works.
kasobol-msft Nov 19, 2020
f91dc77
handle peeked messages
kasobol-msft Nov 19, 2020
fe4a3ce
api
kasobol-msft Nov 19, 2020
6185993
merge master
kasobol-msft Dec 4, 2020
2b92bbb
post merge.
kasobol-msft Dec 4, 2020
cf90918
propagate queueclient.
kasobol-msft Dec 4, 2020
7fb3036
merge upstream/master
kasobol-msft Dec 18, 2020
dc3e36a
Merge remote-tracking branch 'upstream/master' into failed-message-ha…
kasobol-msft Jan 7, 2021
e489930
fire and forget callback.
kasobol-msft Jan 7, 2021
415e4fd
tweaks.
kasobol-msft Jan 7, 2021
31279da
merge master.
kasobol-msft Jan 26, 2021
87c5c9e
re-record.
kasobol-msft Jan 26, 2021
4d5eeac
hack core temporarily.
kasobol-msft Jan 26, 2021
451c330
use event hander from core.
kasobol-msft Jan 26, 2021
3f8aeec
revert test change.
kasobol-msft Jan 26, 2021
13cf452
remove direct core reference from test package.
kasobol-msft Jan 26, 2021
af47e2c
that won't be necessary.
kasobol-msft Jan 26, 2021
7cad45d
more tests.
kasobol-msft Jan 26, 2021
918349f
readme.
kasobol-msft Jan 26, 2021
ea14fa1
whitespace.
kasobol-msft Jan 26, 2021
a078234
merge master
kasobol-msft Jan 27, 2021
27fb680
some pr feedback.
kasobol-msft Jan 27, 2021
8eb362e
api
kasobol-msft Jan 27, 2021
23c5214
get parent queue service.
kasobol-msft Jan 28, 2021
40e56ec
use project ref.
kasobol-msft Jan 29, 2021
30aad88
readme tweaks.
kasobol-msft Jan 29, 2021
13d53d5
renaming.
kasobol-msft Jan 29, 2021
6585e9f
misc.
kasobol-msft Jan 29, 2021
46f69cb
protected onevent thing.
kasobol-msft Jan 29, 2021
59c3d24
merge master.
kasobol-msft Jan 29, 2021
523d886
post merge.
kasobol-msft Jan 29, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>$(RequiredTargetFrameworks)</TargetFrameworks>
</PropertyGroup>
Expand All @@ -17,6 +17,10 @@
</Description>
<RootNamespace>Azure.Storage</RootNamespace>
</PropertyGroup>
<PropertyGroup>
<!-- Force a project reference until SyncAsyncEventHandler has shipped -->
<UseProjectReferenceToAzureClients>true</UseProjectReferenceToAzureClients>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Azure.Core" />
</ItemGroup>
Expand Down
14 changes: 14 additions & 0 deletions sdk/storage/Azure.Storage.Queues/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,20 @@ QueueClient queue = new QueueClient(accountUri, new DefaultAzureCredential());

Learn more about enabling Azure Active Directory for authentication with Azure Storage in [our documentation][storage_ad] and [our samples](#next-steps).

### Message encoding

This version of library does not encode message by default. V11 and prior versions as well as Azure Functions use base64-encoded messages by default.
Therefore it's recommended to use this feature for interop scenarios.

```C# Snippet:Azure_Storage_Queues_Samples_Sample03_MessageEncoding_ConfigureMessageEncodingAsync
QueueClientOptions queueClientOptions = new QueueClientOptions()
{
MessageEncoding = QueueMessageEncoding.Base64
};

QueueClient queueClient = new QueueClient(connectionString, queueName, queueClientOptions);
```

## Troubleshooting

All Azure Storage Queue service operations will throw a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ public QueueClient(System.Uri queueUri, Azure.Storage.StorageSharedKeyCredential
public virtual System.Uri GenerateSasUri(Azure.Storage.Sas.QueueSasPermissions permissions, System.DateTimeOffset expiresOn) { throw null; }
public virtual Azure.Response<System.Collections.Generic.IEnumerable<Azure.Storage.Queues.Models.QueueSignedIdentifier>> GetAccessPolicy(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<System.Collections.Generic.IEnumerable<Azure.Storage.Queues.Models.QueueSignedIdentifier>>> GetAccessPolicyAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
protected internal virtual Azure.Storage.Queues.QueueServiceClient GetParentQueueServiceClientCore() { throw null; }
public virtual Azure.Response<Azure.Storage.Queues.Models.QueueProperties> GetProperties(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Storage.Queues.Models.QueueProperties>> GetPropertiesAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
protected virtual System.Threading.Tasks.Task OnMessageDecodingFailedAsync(Azure.Storage.Queues.Models.QueueMessage receivedMessage, Azure.Storage.Queues.Models.PeekedMessage peekedMessage, bool runSynchronously, System.Threading.CancellationToken cancellationToken) { throw null; }
public virtual Azure.Response<Azure.Storage.Queues.Models.PeekedMessage> PeekMessage(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Storage.Queues.Models.PeekedMessage>> PeekMessageAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response<Azure.Storage.Queues.Models.PeekedMessage[]> PeekMessages(int? maxMessages = default(int?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
Expand Down Expand Up @@ -72,6 +74,7 @@ public QueueClientOptions(Azure.Storage.Queues.QueueClientOptions.ServiceVersion
public System.Uri GeoRedundantSecondaryUri { get { throw null; } set { } }
public Azure.Storage.Queues.QueueMessageEncoding MessageEncoding { get { throw null; } set { } }
public Azure.Storage.Queues.QueueClientOptions.ServiceVersion Version { get { throw null; } }
public event Azure.Core.SyncAsyncEventHandler<Azure.Storage.Queues.QueueMessageDecodingFailedEventArgs> MessageDecodingFailed { add { } remove { } }
public enum ServiceVersion
{
V2019_02_02 = 1,
Expand All @@ -82,6 +85,13 @@ public enum ServiceVersion
V2020_06_12 = 6,
}
}
public partial class QueueMessageDecodingFailedEventArgs : Azure.SyncAsyncEventArgs
{
public QueueMessageDecodingFailedEventArgs(Azure.Storage.Queues.QueueClient queueClient, Azure.Storage.Queues.Models.QueueMessage receivedMessage, Azure.Storage.Queues.Models.PeekedMessage peekedMessage, bool runSynchronously, System.Threading.CancellationToken cancellationToken) : base (default(bool), default(System.Threading.CancellationToken)) { }
public Azure.Storage.Queues.Models.PeekedMessage PeekedMessage { get { throw null; } }
public Azure.Storage.Queues.QueueClient Queue { get { throw null; } }
public Azure.Storage.Queues.Models.QueueMessage ReceivedMessage { get { throw null; } }
}
public enum QueueMessageEncoding
{
None = 0,
Expand Down Expand Up @@ -369,6 +379,7 @@ public SpecializedQueueClientOptions(Azure.Storage.Queues.QueueClientOptions.Ser
}
public static partial class SpecializedQueueExtensions
{
public static Azure.Storage.Queues.QueueServiceClient GetParentQueueServiceClient(this Azure.Storage.Queues.QueueClient client) { throw null; }
Copy link
Contributor Author

@kasobol-msft kasobol-msft Jan 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is similar to #16437 .
While working on downstream stuff I figured that this will be extremely useful if someone wants to jump from InvalidMessageEventArgs.QueueClient to some other queue (i.e. poison queue).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is nice. Should we add an extension method in WebJobs to get the poison queue for a QueueClient?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. In webjobs PR though.

public static Azure.Storage.Queues.QueueClient WithClientSideEncryptionOptions(this Azure.Storage.Queues.QueueClient client, Azure.Storage.ClientSideEncryptionOptions clientSideEncryptionOptions) { throw null; }
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using Azure.Storage.Queues.Models;
using NUnit.Framework;

namespace Azure.Storage.Queues.Samples.Tests
{
public class Sample03_MessageEncoding : SampleTest
{
[Test]
public void ConfigureMessageEncodingAsync()
{
var connectionString = ConnectionString;
var queueName = "foo";
#region Snippet:Azure_Storage_Queues_Samples_Sample03_MessageEncoding_ConfigureMessageEncodingAsync

QueueClientOptions queueClientOptions = new QueueClientOptions()
{
MessageEncoding = QueueMessageEncoding.Base64
};

QueueClient queueClient = new QueueClient(connectionString, queueName, queueClientOptions);
#endregion
}

[Test]
public void MessageDecodingFailedHandlerAsync()
{
var connectionString = ConnectionString;
var queueName = "foo";
#region Snippet:Azure_Storage_Queues_Samples_Sample03_MessageEncoding_MessageDecodingFailedHandlerAsync

QueueClientOptions queueClientOptions = new QueueClientOptions()
{
MessageEncoding = QueueMessageEncoding.Base64
};

queueClientOptions.MessageDecodingFailed += async (QueueMessageDecodingFailedEventArgs args) =>
{
if (args.PeekedMessage != null)
{
Console.WriteLine($"Invalid message has been peeked, message id={args.PeekedMessage.MessageId} body={args.PeekedMessage.Body}");
}
else if (args.ReceivedMessage != null)
{
Console.WriteLine($"Invalid message has been received, message id={args.ReceivedMessage.MessageId} body={args.ReceivedMessage.Body}");

if (args.RunSynchronously)
{
args.Queue.DeleteMessage(args.ReceivedMessage.MessageId, args.ReceivedMessage.PopReceipt);
}
else
{
await args.Queue.DeleteMessageAsync(args.ReceivedMessage.MessageId, args.ReceivedMessage.PopReceipt);
}
}
};

QueueClient queueClient = new QueueClient(connectionString, queueName, queueClientOptions);
#endregion
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
<Compile Include="$(AzureCoreSharedSources)HttpMessageSanitizer.cs" Link="Shared\Core\%(RecursiveDir)\%(Filename)%(Extension)" />
<Compile Include="$(AzureCoreSharedSources)DiagnosticScopeFactory.cs" Link="Shared\Core\%(RecursiveDir)\%(Filename)%(Extension)" />
<Compile Include="$(AzureCoreSharedSources)TaskExtensions.cs" Link="Shared\Core\%(RecursiveDir)\%(Filename)%(Extension)" />
<Compile Include="$(AzureCoreSharedSources)SyncAsyncEventHandlerExtensions.cs" Link="Shared\Core\%(RecursiveDir)\%(Filename)%(Extension)" />
</ItemGroup>
<ItemGroup>
<Compile Include="$(AzureStorageSharedSources)ClientsideEncryption\*.cs" Link="Shared\ClientsideEncryption\%(RecursiveDir)\%(Filename)%(Extension)" />
Expand Down
Loading