Skip to content

Commit

Permalink
Fixing regression at processor shutdown. (Azure#15262)
Browse files Browse the repository at this point in the history
  • Loading branch information
serkantkaraca authored and annelo-msft committed Feb 17, 2021
1 parent 76637f1 commit 4829e35
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,22 +95,6 @@ public async Task CloseAsync(CloseReason reason)
ProcessorEventSource.Log.PartitionPumpCloseStart(this.Host.HostName, this.PartitionContext.PartitionId, reason.ToString());
this.PumpStatus = PartitionPumpStatus.Closing;

// Release lease as the first thing since closing receiver can take up to operation timeout.
// This helps other available hosts discover lease available sooner.
if (reason != CloseReason.LeaseLost)
{
// Since this pump is dead, release the lease.
try
{
await this.Host.LeaseManager.ReleaseLeaseAsync(this.PartitionContext.Lease).ConfigureAwait(false);
}
catch (Exception e)
{
// Log and ignore any failure since expired lease will be picked by another host.
this.Host.EventProcessorOptions.NotifyOfException(this.Host.HostName, this.PartitionContext.PartitionId, e, EventProcessorHostActionStrings.ReleasingLease);
}
}

try
{
this.cancellationTokenSource.Cancel();
Expand All @@ -137,6 +121,24 @@ public async Task CloseAsync(CloseReason reason)
// Report the failure to the general error handler instead.
this.Host.EventProcessorOptions.NotifyOfException(this.Host.HostName, this.PartitionContext.PartitionId, e, "Closing Event Processor");
}
finally
{
// Release the lease regardless of result from pump's close call above.
// Increase the chance of a healthy host grabbing the lease here.
if (reason != CloseReason.LeaseLost)
{
// Since this pump is dead, release the lease.
try
{
await this.Host.LeaseManager.ReleaseLeaseAsync(this.PartitionContext.Lease).ConfigureAwait(false);
}
catch (Exception e)
{
// Log and ignore any failure since expired lease will be picked by another host.
this.Host.EventProcessorOptions.NotifyOfException(this.Host.HostName, this.PartitionContext.PartitionId, e, EventProcessorHostActionStrings.ReleasingLease);
}
}
}

this.PumpStatus = PartitionPumpStatus.Closed;
ProcessorEventSource.Log.PartitionPumpCloseStop(this.Host.HostName, this.PartitionContext.PartitionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,18 @@ internal static class TestUtility
internal static string GetEntityConnectionString(string entityName) =>
new EventHubsConnectionStringBuilder(EventHubsConnectionString) { EntityPath = entityName }.ToString();

internal static async Task SendToEventhubAsync(EventHubClient ehClient, int numberOfMessages = 1)
{
TestUtility.Log($"Starting to send {numberOfMessages} messages.");

for (int i = 0; i < numberOfMessages; i++)
{
await ehClient.SendAsync(new EventData(Encoding.UTF8.GetBytes("Hello Event Hubs!")));
}

TestUtility.Log("Sends done.");
}

internal static Task SendToPartitionAsync(EventHubClient ehClient, string partitionId, string messageBody, int numberOfMessages = 1)
{
return SendToPartitionAsync(ehClient, partitionId, new EventData(Encoding.UTF8.GetBytes(messageBody)), numberOfMessages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,95 @@ public async Task DontCheckpointStartOfStream()
Assert.False(runResult.ReceivedEvents.Any(kvp => kvp.Value.Count != 1), "Didn't receive exactly 1 event.");
}
}

[Fact]
[LiveTest]
[DisplayTestMethodName]
public async Task CheckpointOnShutdown()
{
int checkpointFailureCount = 0;
int checkpointSuccessCount = 0;
int receivedEventCount = 0;

await using (var scope = await EventHubScope.CreateAsync(1))
{
var connectionString = TestUtility.BuildEventHubsConnectionString(scope.EventHubName);

// Use a randomly generated container name so that initial offset provider will be respected.
var eventProcessorHost = new EventProcessorHost(
string.Empty,
PartitionReceiver.DefaultConsumerGroupName,
connectionString,
TestUtility.StorageConnectionString,
Guid.NewGuid().ToString());

var processorOptions = new EventProcessorOptions
{
ReceiveTimeout = TimeSpan.FromSeconds(10),
InitialOffsetProvider = partitionId => EventPosition.FromEnd(),
InvokeProcessorAfterReceiveTimeout = false
};

var processorFactory = new TestEventProcessorFactory();
processorFactory.OnCreateProcessor += (f, createArgs) =>
{
var processor = createArgs.Item2;
string partitionId = createArgs.Item1.PartitionId;
string hostName = createArgs.Item1.Owner;
processor.OnOpen += (_, partitionContext) => TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor opened");
processor.OnClose += (_, closeArgs) =>
{
TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor closing: {closeArgs.Item2}");
if (closeArgs.Item2 != CloseReason.Shutdown)
{
return;
}

try
{
// Checkpoint at close.
closeArgs.Item1.CheckpointAsync().GetAwaiter().GetResult();
Interlocked.Increment(ref checkpointSuccessCount);
}
catch (Exception)
{
Interlocked.Increment(ref checkpointFailureCount);
throw;
}
};

processor.OnProcessError += (_, errorArgs) => TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor process error {errorArgs.Item2.Message}");
processor.OnProcessEvents += (_, eventsArgs) =>
{
int eventCount = eventsArgs.Item2.events != null ? eventsArgs.Item2.events.Count() : 0;
if (eventCount > 0)
{
TestUtility.Log($"{hostName} > Partition {partitionId} TestEventProcessor processing {eventCount} event(s)");
Interlocked.Increment(ref receivedEventCount);
}
};
};

await eventProcessorHost.RegisterEventProcessorFactoryAsync(processorFactory, processorOptions);

// Now send some number of messages.
var ehClient = EventHubClient.CreateFromConnectionString(connectionString);
await TestUtility.SendToEventhubAsync(ehClient, 10);
await Task.Delay(TimeSpan.FromSeconds(30));

// Now unregister processor
await eventProcessorHost.UnregisterEventProcessorAsync();

// At least one message received.
Assert.True(receivedEventCount > 0, "Didn't receive any events.");

// At least one successful checkpoint call.
Assert.True(checkpointSuccessCount > 0, $"checkpointSuccessCount == 0, checkpointFailureCount={checkpointFailureCount}");

// No failed checkpoint call.
Assert.True(checkpointFailureCount == 0, $"checkpointFailureCount == {checkpointFailureCount}");
}
}
}
}

0 comments on commit 4829e35

Please sign in to comment.