From 4f7925e96ec987063a91a58ecaf72e870c060a29 Mon Sep 17 00:00:00 2001 From: Alexey Rodionov Date: Fri, 11 Feb 2022 14:55:14 -0800 Subject: [PATCH] Stop Listener when disposed (#104) --- release_notes.md | 8 ++++-- .../Listeners/EventHubListener.cs | 28 ++++++++++++++++--- .../WebJobs.Extensions.EventHubs.csproj | 2 +- .../EventHubListenerTests.cs | 27 +++++++++++++++++- 4 files changed, 57 insertions(+), 8 deletions(-) diff --git a/release_notes.md b/release_notes.md index 16f9c61..e74a529 100644 --- a/release_notes.md +++ b/release_notes.md @@ -5,5 +5,9 @@ #### Version 4.2.0 - User configurable initial offset support [#79](https://github.com/Azure/azure-functions-eventhubs-extension/pull/79) -**Release sprint:** Sprint 87 -[ [bugs](https://github.com/Azure/azure-functions-host/issues?q=is%3Aissue+milestone%3A%22Functions+Sprint+87%22+label%3Abug+is%3Aclosed) | [features](https://github.com/Azure/azure-functions-host/issues?q=is%3Aissue+milestone%3A%22Functions+Sprint+87%22+label%3Afeature+is%3Aclosed) ] +#### Version 4.3.0 +- Adding explicit reference Microsoft.Azure.Amqp 2.4.11 [#99](https://github.com/Azure/azure-functions-eventhubs-extension/pull/99) + +#### Version 4.3.1 +- Stop Listener when disposed [#105](https://github.com/Azure/azure-functions-eventhubs-extension/pull/105) +- Add listener details [#105](https://github.com/Azure/azure-functions-eventhubs-extension/pull/105) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Listeners/EventHubListener.cs b/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Listeners/EventHubListener.cs index 767b341..c04deb7 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Listeners/EventHubListener.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Listeners/EventHubListener.cs @@ -34,6 +34,8 @@ internal sealed class EventHubListener : IListener, IEventProcessorFactory, ISca private readonly bool _singleDispatch; private readonly EventHubOptions _options; private readonly ILogger _logger; + private readonly SemaphoreSlim _stopSemaphoreSlim = new SemaphoreSlim(1, 1); + private readonly string _details; private bool _started; private Lazy _scaleMonitor; @@ -62,6 +64,8 @@ public EventHubListener( _options = options; _logger = logger; _scaleMonitor = new Lazy(() => new EventHubsScaleMonitor(_functionId, _eventHubName, _consumerGroup, _connectionString, _storageConnectionString, _logger, blobContainer)); + _details = $"'namespace='{eventProcessorHost?.EndpointAddress}', eventHub='{eventProcessorHost?.EventHubPath}', " + + $"consumerGroup='{eventProcessorHost?.ConsumerGroupName}', functionId='{functionId}', singleDispatch='{singleDispatch}'"; } void IListener.Cancel() @@ -71,21 +75,37 @@ void IListener.Cancel() void IDisposable.Dispose() { + StopAsync(CancellationToken.None).Wait(); } public async Task StartAsync(CancellationToken cancellationToken) { await _eventProcessorHost.RegisterEventProcessorFactoryAsync(this, _options.EventProcessorOptions); _started = true; + + _logger.LogDebug($"EventHub listener started ({_details})"); } public async Task StopAsync(CancellationToken cancellationToken) { - if (_started) + await _stopSemaphoreSlim.WaitAsync(); + try + { + if (_started) + { + await _eventProcessorHost.UnregisterEventProcessorAsync(); + _logger.LogDebug($"EventHub listener stopped ({_details})"); + } + else + { + _logger.LogDebug($"EventHub listener is already stopped ({_details})"); + } + _started = false; + } + finally { - await _eventProcessorHost.UnregisterEventProcessorAsync(); + _stopSemaphoreSlim.Release(); } - _started = false; } IEventProcessor IEventProcessorFactory.CreateEventProcessor(PartitionContext context) @@ -132,7 +152,7 @@ public Task CloseAsync(PartitionContext context, CloseReason reason) { // signal cancellation for any in progress executions _cts.Cancel(); - + _logger.LogDebug(GetOperationDetails(context, $"CloseAsync, {reason.ToString()}")); return Task.CompletedTask; } diff --git a/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/WebJobs.Extensions.EventHubs.csproj b/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/WebJobs.Extensions.EventHubs.csproj index d592548..9b011b2 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/WebJobs.Extensions.EventHubs.csproj +++ b/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/WebJobs.Extensions.EventHubs.csproj @@ -6,7 +6,7 @@ Microsoft.Azure.WebJobs.EventHubs Microsoft.Azure.WebJobs.Extensions.EventHubs Microsoft Azure WebJobs SDK EventHubs Extension - 4.3.0 + 4.3.1 N/A $(Version) Commit hash: $(CommitHash) Microsoft diff --git a/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/EventHubListenerTests.cs b/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/EventHubListenerTests.cs index 4075f98..c14b711 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/EventHubListenerTests.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/EventHubListenerTests.cs @@ -11,6 +11,7 @@ using Microsoft.Azure.EventHubs.Processor; using Microsoft.Azure.WebJobs.EventHubs.Listeners; using Microsoft.Azure.WebJobs.Host.Executors; +using Microsoft.Azure.WebJobs.Host.Listeners; using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Azure.WebJobs.Host.TestCommon; using Microsoft.Extensions.Logging; @@ -198,7 +199,6 @@ public void GetMonitor_ReturnsExpectedValue() var functionId = "FunctionId"; var eventHubName = "EventHubName"; var consumerGroup = "ConsumerGroup"; - var storageUri = new Uri("https://eventhubsteststorageaccount.blob.core.windows.net/"); var testLogger = new TestLogger("Test"); var listener = new EventHubListener( functionId, @@ -222,5 +222,30 @@ public void GetMonitor_ReturnsExpectedValue() Assert.Same(scaleMonitor, scaleMonitor2); } + + [Fact] + public void Dispose_Calls_StopAsync() + { + string functionId = "FunctionId"; + string eventHubName = "EventHubName"; + string consumerGroup = "ConsumerGroup"; + var storageUri = new Uri("https://eventhubsteststorageaccount.blob.core.windows.net/"); + var testLogger = new TestLogger("Test"); + EventHubListener listener = new EventHubListener( + functionId, + eventHubName, + consumerGroup, + "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123=", + "DefaultEndpointsProtocol=https;AccountName=EventHubScaleMonitorFakeTestAccount;AccountKey=ABCDEFG;EndpointSuffix=core.windows.net", + new Mock(MockBehavior.Strict).Object, + null, + false, + new EventHubOptions(), + testLogger, + new Mock(MockBehavior.Strict, new Uri("https://eventhubsteststorageaccount.blob.core.windows.net/azure-webjobs-eventhub")).Object); + + (listener as IDisposable).Dispose(); + Assert.Single(testLogger.GetLogMessages().Where(x => x.FormattedMessage.StartsWith("EventHub listener is already stopped ("))); + } } }