[Event Hubs] Update checkpointing behavior to prioritize sequence number (Azure#41360)

* first set of changes

* next set of changes

* test fixes

* adjust AmqpFilter to prefer seq number

* update changelog

* update CHANGELOG

* comment tweaks

* InMemoryCheckpointStore fix

* Apply suggestions from code review

Co-authored-by: Jesse Squire <[email protected]>

* feedback

* revert AmqpFilterTests.cs

* AmqpFilter fix

* AmqpFilter test fix

---------

Co-authored-by: Jesse Squire <[email protected]>
m-redding and jsquire authored Jan 19, 2024
1 parent e29cc38 commit 318d362
Showing 15 changed files with 72 additions and 77 deletions.
3 changes: 3 additions & 0 deletions sdk/eventhub/Azure.Messaging.EventHubs.Processor/CHANGELOG.md
@@ -3,6 +3,7 @@
## 5.11.0-beta.1 (Unreleased)

### Features Added
- Added a `CheckpointPosition` struct to use when updating a checkpoint. The specified position indicates that an event processor should begin reading from the next event. Added new `UpdateCheckpointAsync` overloads to `EventProcessorClient` and `BlobCheckpointStore` that accept the `CheckpointPosition` struct instead of individual values for offset and sequence number.

### Breaking Changes

@@ -14,6 +15,8 @@
- To
- ReplyTo

- The base implementations of both `UpdateCheckpointAsync` method overloads in `PluggableCheckpointStoreEventProcessor<TPartition>` and `EventProcessor<TPartition>` now choose sequence number over offset when writing a checkpoint and both values are provided. Previously, writing a checkpoint prioritized offset over sequence number. There is no difference in behavior for normal usage scenarios.

### Bugs Fixed

- Fixed a race condition that could lead to a synchronization primitive being double-released if `IsRunning` was called concurrently while starting or stopping the processor.
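The selection rule described in the breaking change above can be sketched in isolation. This is a minimal illustration, not the SDK's implementation: `CheckpointSelection`, `Choose`, and the tuple it returns are hypothetical stand-ins for the processor's internals and for `EventPosition`.

```csharp
using System;

public static class CheckpointSelection
{
    // Hypothetical helper: reports which value would be used for positioning.
    // Prefers the sequence number when it is present and falls back to the
    // offset; returns null when neither value was supplied.
    public static (string Source, long Position)? Choose(long? offset, long? sequenceNumber)
    {
        if (sequenceNumber.HasValue)
        {
            return ("sequenceNumber", sequenceNumber.Value);
        }

        if (offset.HasValue)
        {
            return ("offset", offset.Value);
        }

        return null;
    }
}
```

With both values supplied, the sequence number is chosen. With only an offset, the offset is still used, which is why callers that pass a single value should see no difference.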
@@ -455,7 +455,7 @@ public EventProcessorClient(BlobContainerClient checkpointStore,
/// <remarks>
/// <para>The container associated with the <paramref name="checkpointStore" /> is expected to exist; the <see cref="EventProcessorClient" />
/// does not assume the ability to manage the storage account and is safe to run with only read/write permission for blobs in the container. It is
/// recommended that this container be unique to the Event Hub and consumer group used by the processor and that it not conain other blobs.</para>
/// recommended that this container be unique to the Event Hub and consumer group used by the processor and that it not contain other blobs.</para>
///
/// <para>If the connection string is copied from the Event Hub itself, it will contain the name of the desired Event Hub,
/// and can be used directly without passing the <paramref name="eventHubName" />. The name of the Event Hub should be
@@ -496,7 +496,7 @@ public EventProcessorClient(BlobContainerClient checkpointStore,
/// <remarks>
/// The container associated with the <paramref name="checkpointStore" /> is expected to exist; the <see cref="EventProcessorClient" />
/// does not assume the ability to manage the storage account and is safe to run with only read/write permission for blobs in the container. It is
/// recommended that this container be unique to the Event Hub and consumer group used by the processor and that it not conain other blobs.
/// recommended that this container be unique to the Event Hub and consumer group used by the processor and that it not contain other blobs.
/// </remarks>
///
public EventProcessorClient(BlobContainerClient checkpointStore,
@@ -881,8 +881,8 @@ protected override async Task ValidateProcessingPreconditions(CancellationToken
/// </summary>
///
/// <param name="partitionId">The identifier of the partition the checkpoint is for.</param>
/// <param name="offset">The offset to associate with the checkpoint, indicating that a processor should begin reading form the next event in the stream.</param>
/// <param name="sequenceNumber">An optional sequence number to associate with the checkpoint, intended as informational metadata. The <paramref name="offset" /> will be used for positioning when events are read.</param>
/// <param name="offset">The offset to associate with the checkpoint, intended as informational metadata. This will only be used for positioning if there is no value provided for <paramref name="sequenceNumber"/>.</param>
/// <param name="sequenceNumber">The sequence number to associate with the checkpoint, indicating that a processor should begin reading from the next event in the stream.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> instance to signal a request to cancel the operation.</param>
///
[EditorBrowsable(EditorBrowsableState.Never)]
@@ -103,8 +103,8 @@ public override Task<EventProcessorCheckpoint> GetCheckpointAsync(string fullyQu
/// <param name="eventHubName">The name of the specific Event Hub the ownership are associated with, relative to the Event Hubs namespace that contains it.</param>
/// <param name="consumerGroup">The name of the consumer group the checkpoint is associated with.</param>
/// <param name="partitionId">The identifier of the partition the checkpoint is for.</param>
/// <param name="offset">The offset to associate with the checkpoint, indicating that a processor should begin reading form the next event in the stream.</param>
/// <param name="sequenceNumber">An optional sequence number to associate with the checkpoint, intended as informational metadata. The <paramref name="offset" /> will be used for positioning when events are read.</param>
/// <param name="offset">The offset to associate with the checkpoint, intended as informational metadata. This will only be used for positioning if there is no value provided for <paramref name="sequenceNumber"/>.</param>
/// <param name="sequenceNumber">The sequence number to associate with the checkpoint, indicating that a processor should begin reading from the next event in the stream.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> instance to signal a request to cancel the operation.</param>
///
[EditorBrowsable(EditorBrowsableState.Never)]
@@ -951,8 +951,6 @@ public async Task ProcessorRaisesProcessEventHandlerWhenEventsAreRead()
Assert.That(capturedEventArgs[index].CancellationToken, Is.EqualTo(cancellationSource.Token), $"The cancellation token should have been propagated at index { index }.");
Assert.That(async () => await capturedEventArgs[index].UpdateCheckpointAsync(), Throws.Nothing, $"A checkpoint should be allowed for the event at index { index }.");

var expectedStart = EventPosition.FromOffset(capturedEventArgs[index].Data.Offset);

mockCheckpointStore
.Verify(storage => storage.UpdateCheckpointAsync(
processorClient.FullyQualifiedNamespace,
@@ -444,31 +444,31 @@ private EventProcessorCheckpoint CreateCheckpoint(string fullyQualifiedNamespace
var sequenceNumber = default(long?);
var clientIdentifier = default(string);

if (metadata.TryGetValue(BlobMetadataKey.Offset, out var str) && long.TryParse(str, NumberStyles.Integer, CultureInfo.InvariantCulture, out var result))
if (metadata.TryGetValue(BlobMetadataKey.SequenceNumber, out var sequenceStr) && long.TryParse(sequenceStr, NumberStyles.Integer, CultureInfo.InvariantCulture, out var sequenceResult))
{
offset = result;
if (offset != long.MinValue) // This means no value was passed.
sequenceNumber = sequenceResult;
if (sequenceNumber != long.MinValue) // If the sequence number is not equal to the default (long.MinValue), then a value was passed in.
{
startingPosition = EventPosition.FromOffset(result, false);
startingPosition = EventPosition.FromSequenceNumber(sequenceResult, false);
}
}
if (metadata.TryGetValue(BlobMetadataKey.SequenceNumber, out str) && long.TryParse(str, NumberStyles.Integer, CultureInfo.InvariantCulture, out result))
if (metadata.TryGetValue(BlobMetadataKey.Offset, out var offsetStr) && long.TryParse(offsetStr, NumberStyles.Integer, CultureInfo.InvariantCulture, out var offsetResult))
{
sequenceNumber = result;
if (sequenceNumber != long.MinValue) // This means no value was passed.
offset = offsetResult;
if (offset != long.MinValue) // If the offset is not equal to the default (long.MinValue), then a value was passed in.
{
startingPosition ??= EventPosition.FromSequenceNumber(result, false);
startingPosition ??= EventPosition.FromOffset(offsetResult, false);
}
}
if (metadata.TryGetValue(BlobMetadataKey.ClientIdentifier, out str))
if (metadata.TryGetValue(BlobMetadataKey.ClientIdentifier, out var idStr))
{
clientIdentifier = str;
clientIdentifier = idStr;
}

// If either the offset or the sequence number was not populated,
// this is not a valid checkpoint.
// If either the offset or the sequence number was not populated,
// this is not a valid checkpoint.

if (!startingPosition.HasValue)
if (!startingPosition.HasValue)
{
InvalidCheckpointFound(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup);
return null;
@@ -519,9 +519,13 @@ private async Task<EventProcessorCheckpoint> CreateLegacyCheckpoint(string fully
out long? offset,
out long? sequenceNumber))
{
if (offset.HasValue)
if (sequenceNumber.HasValue && sequenceNumber.Value != long.MinValue)
{
startingPosition = EventPosition.FromSequenceNumber(sequenceNumber.Value, false);
}
else if (offset.HasValue)
{
startingPosition = EventPosition.FromOffset(offset.Value, false);
startingPosition ??= EventPosition.FromOffset(offset.Value, false);
}
else
{
@@ -195,7 +195,7 @@ EventProcessorCheckpoint TransformCheckpointData(CheckpointData data) =>
PartitionId = data.PartitionId,
StartingPosition = EventPosition.FromOffset(data.StartingPosition.Offset.Value, false),
ClientIdentifier = data.ClientIdentifier,
LastModified = DateTimeOffset.Parse(data.LastModified)
LastModified = (DateTimeOffset.TryParse(data.LastModified, out var lastModified) ? lastModified : default)
};

lock (_checkpointLock)
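The `LastModified` change above swaps a throwing parse for one that falls back to a default. A small sketch of that pattern, with a hypothetical helper name; it pins the invariant culture, which is a choice made for this example and not something the diff does.

```csharp
using System;
using System.Globalization;

public static class TimestampParsing
{
    // Returns the parsed timestamp, or default(DateTimeOffset) when the text
    // is missing or malformed, where DateTimeOffset.Parse would throw instead.
    public static DateTimeOffset ParseOrDefault(string text) =>
        DateTimeOffset.TryParse(text, CultureInfo.InvariantCulture, DateTimeStyles.None, out var parsed)
            ? parsed
            : default;
}
```

One trade-off to keep in mind: a malformed value becomes indistinguishable from a timestamp that was never set, so callers that need to tell the two apart would have to check separately.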
@@ -758,7 +758,7 @@ public async Task CheckpointUpdateCreatesTheBlobOnFirstCall()
storedCheckpoint = await checkpointStore.GetCheckpointAsync(checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup, checkpoint.PartitionId, default);

Assert.That(storedCheckpoint, Is.Not.Null);
Assert.That(storedCheckpoint.StartingPosition, Is.EqualTo(EventPosition.FromOffset(mockEvent.Offset, false)));
Assert.That(storedCheckpoint.StartingPosition, Is.EqualTo(EventPosition.FromSequenceNumber(mockEvent.SequenceNumber, false)));
Assert.That(storedCheckpoint.ClientIdentifier, Is.EqualTo("Id"));

// There should be a single blob in the container.
@@ -826,7 +826,7 @@ public async Task CheckpointUpdatesAnExistingBlob()

Assert.That(blobCount, Is.EqualTo(1));
Assert.That(storedCheckpoint, Is.Not.Null);
Assert.That(storedCheckpoint.StartingPosition, Is.EqualTo(EventPosition.FromOffset(mockEvent.Offset, false)));
Assert.That(storedCheckpoint.StartingPosition, Is.EqualTo(EventPosition.FromSequenceNumber(mockEvent.SequenceNumber, false)));
Assert.That(storedCheckpoint.ClientIdentifier, Is.EqualTo("Id"));

// Calling update again should update the existing checkpoint.
@@ -428,11 +428,11 @@ public async Task GetCheckpointUsesSequenceNumberAsTheStartingPositionWhenNoOffs
/// </summary>
///
[Test]
public async Task GetCheckpointPrefersOffsetNumberAsTheStartingPosition()
public async Task GetCheckpointPrefersSequenceNumberAsTheStartingPosition()
{
var expectedOffset = 13;
var expectedSequenceNumber = 7777;
var expectedStartingPosition = EventPosition.FromOffset(expectedOffset, false);
var offset = 13;
var sequenceNumber = 7777;
var expectedStartingPosition = EventPosition.FromSequenceNumber(sequenceNumber, false);
var partition = Guid.NewGuid().ToString();

var blobList = new List<BlobItem>
@@ -444,8 +444,8 @@ public async Task GetCheckpointPrefersOffsetNumberAsTheStartingPosition()
new Dictionary<string, string>
{
{BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()},
{BlobMetadataKey.Offset, expectedOffset.ToString()},
{BlobMetadataKey.SequenceNumber, expectedSequenceNumber.ToString()}
{BlobMetadataKey.Offset, offset.ToString()},
{BlobMetadataKey.SequenceNumber, sequenceNumber.ToString()}
})
};
var target = new BlobCheckpointStoreInternal(new MockBlobContainerClient() { Blobs = blobList });
@@ -494,7 +494,7 @@ public async Task GetCheckpointConsidersDataInvalidWithNoOffsetOrSequenceNumber(
/// </summary>
///
[Test]
public async Task GetCheckpointUsesOffsetAsTheStartingPositionWhenPresentInLegacyCheckpoint()
public async Task GetCheckpointUsesSequenceNumberAsTheStartingPositionWhenPresentInLegacyCheckpoint()
{
var blobList = new List<BlobItem>
{
@@ -521,7 +521,7 @@ public async Task GetCheckpointUsesOffsetAsTheStartingPositionWhenPresentInLegac
var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", CancellationToken.None);

Assert.That(checkpoint, Is.Not.Null, "A checkpoints should have been returned.");
Assert.That(checkpoint.StartingPosition, Is.EqualTo(EventPosition.FromOffset(13, false)));
Assert.That(checkpoint.StartingPosition, Is.EqualTo(EventPosition.FromSequenceNumber(960180, false)));
Assert.That(checkpoint.PartitionId, Is.EqualTo("0"));
}

@@ -1313,7 +1313,7 @@ public async Task GetCheckpointPreferredNewCheckpointOverLegacy()
var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", CancellationToken.None);

Assert.That(checkpoint, Is.Not.Null, "A single checkpoint should have been returned.");
Assert.That(checkpoint.StartingPosition, Is.EqualTo(EventPosition.FromOffset(14, false)));
Assert.That(checkpoint.StartingPosition, Is.EqualTo(EventPosition.FromSequenceNumber(960182, false)));
Assert.That(checkpoint.PartitionId, Is.EqualTo("0"));

Assert.That(checkpoint, Is.InstanceOf<BlobCheckpointStoreInternal.BlobStorageCheckpoint>(), "Checkpoint instance was not the expected type.");
@@ -1355,7 +1355,7 @@ public async Task GetCheckpointsUsesOffsetAsTheStartingPositionWhenPresentInLega
var checkpoint = await target.GetCheckpointAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, "0", CancellationToken.None);

Assert.That(checkpoint, Is.Not.Null, "A set of checkpoints should have been returned.");
Assert.That(checkpoint.StartingPosition, Is.EqualTo(EventPosition.FromOffset(13, false)));
Assert.That(checkpoint.StartingPosition, Is.EqualTo(EventPosition.FromSequenceNumber(960180, false)));
Assert.That(checkpoint.PartitionId, Is.EqualTo("0"));
}

6 changes: 5 additions & 1 deletion sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md
@@ -4,16 +4,20 @@

### Features Added

- Added a `CheckpointPosition` struct to `Azure.Messaging.EventHubs.Processor` to use when updating a checkpoint. The specified position indicates that an event processor should begin reading from the next event. Added new `UpdateCheckpointAsync` overloads to `CheckpointStore`, `PluggableCheckpointStoreEventProcessor<TPartition` and `EventProcessor<TPartition>` that accept the `CheckpointPosition` struct instead of individual values for offset and sequence number.

### Breaking Changes

- The type of several existing values in the `EventData.SystemProperties` collection have been changed so that they are properly represented as .NET string types. Previously, the underlying AMQP types were unintentionally returned, forcing callers to call `ToString()` to read the value.

This is a behavioral breaking change that will impacts only those callers who were explicitly casting system property values to `AmqpAddress` or `AmqpMessageId` before calling `ToString()`. The affected system properties are:
- MessageId
- CorelationId
- CorrelationId
- To
- ReplyTo

- The base implementations of both `UpdateCheckpointAsync` method overloads in `PluggableCheckpointStoreEventProcessor<TPartition>` and `EventProcessor<TPartition>` now choose sequence number over offset when writing a checkpoint and both values are provided. Previously, writing a checkpoint prioritized offset over sequence number. There is no behavioral change for those using the official checkpoint store implementations.

### Bugs Fixed

- Load balancing is no longer blocked when event processing for a lost partition does not honor the cancellation token. Previously, long-running processing could cause delays in load balancing that resulted in ownership not being renewed for all partitions.
4 changes: 2 additions & 2 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpFilter.cs
@@ -62,9 +62,9 @@ public static string BuildFilterExpression(EventPosition eventPosition)
return $"{ OffsetName } { (eventPosition.IsInclusive ? ">=" : ">") } { eventPosition.Offset }";
}

if (eventPosition.SequenceNumber.HasValue)
if (!string.IsNullOrEmpty(eventPosition.SequenceNumber))
{
return $"{ SequenceNumberName } { (eventPosition.IsInclusive ? ">=" : ">") } { eventPosition.SequenceNumber.Value }";
return $"{ SequenceNumberName } { (eventPosition.IsInclusive ? ">=" : ">") } { eventPosition.SequenceNumber }";
}

if (eventPosition.EnqueuedTime.HasValue)
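The shape of the filter expression built in this hunk can be sketched without the AMQP types. This is an illustration only: `FilterSketch` and `Build` are hypothetical, the annotation names are assumed values for `OffsetName` and `SequenceNumberName`, both positions are treated as strings, and the offset-first order simply mirrors the visible context lines.

```csharp
using System;

public static class FilterSketch
{
    // Assumed annotation names; check AmqpFilter.cs for the constants the library defines.
    private const string OffsetName = "amqp.annotation.x-opt-offset";
    private const string SequenceNumberName = "amqp.annotation.x-opt-sequence-number";

    // Builds a filter such as "<name> > <value>", using ">=" when the position is inclusive.
    public static string Build(string offset, string sequenceNumber, bool isInclusive)
    {
        var comparison = isInclusive ? ">=" : ">";

        if (!string.IsNullOrEmpty(offset))
        {
            return $"{OffsetName} {comparison} {offset}";
        }

        if (!string.IsNullOrEmpty(sequenceNumber))
        {
            return $"{SequenceNumberName} {comparison} {sequenceNumber}";
        }

        throw new ArgumentException("A position with an offset or a sequence number is required.");
    }
}
```

Because the checkpoint store now produces a position holding only a sequence number when one is stored, the second branch is the one a restored checkpoint would be expected to reach.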