diff --git a/sdk/eventhub/event-hubs/samples/eventProcessor.ts b/sdk/eventhub/event-hubs/samples/eventProcessor.ts index 9015fe7cf592..5b1ae4c4c328 100644 --- a/sdk/eventhub/event-hubs/samples/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/samples/eventProcessor.ts @@ -1,18 +1,25 @@ import { EventHubClient, - EventData, + ReceivedEventData, EventPosition, delay, EventProcessor, - PartitionContext + PartitionContext, + InMemoryPartitionManager, + CheckpointManager } from "@azure/event-hubs"; class SimplePartitionProcessor { private _context: PartitionContext; - constructor(context: PartitionContext) { + private _checkpointManager: CheckpointManager; + constructor(context: PartitionContext, checkpointManager: CheckpointManager) { this._context = context; + this._checkpointManager = checkpointManager; } - async processEvents(events: EventData[]) { + async processEvents(events: ReceivedEventData[]) { + if(events.length === 0){ + return; + } for (const event of events) { console.log( "Received event: '%s' from partition: '%s' and consumer group: '%s'", @@ -20,6 +27,19 @@ class SimplePartitionProcessor { this._context.partitionId, this._context.consumerGroupName ); + try { + // checkpoint using the last event in the batch + await this._checkpointManager.updateCheckpoint(events[events.length - 1]); + console.log( + "Successfully checkpointed event: '%s' from partition: '%s'", + events[events.length - 1].body, + this._context.partitionId + ); + } catch (err) { + console.log( + `Encountered an error while checkpointing on ${this._context.partitionId}: ${err.message}` + ); + } } } @@ -43,15 +63,15 @@ const eventHubName = ""; async function main() { const client = new EventHubClient(connectionString, eventHubName); - const eventProcessorFactory = (context: PartitionContext) => { - return new SimplePartitionProcessor(context); + const eventProcessorFactory = (context: PartitionContext, checkpoint: CheckpointManager) => { + return new SimplePartitionProcessor(context, checkpoint); }; const processor = new EventProcessor( EventHubClient.defaultConsumerGroupName, client, eventProcessorFactory, - "partitionManager" as any, + new InMemoryPartitionManager(), { initialEventPosition: EventPosition.earliest(), maxBatchSize: 10, @@ -59,8 +79,8 @@ async function main() { } ); await processor.start(); - // after 2 seconds, stop processing - await delay(2000); + // after 5 seconds, stop processing + await delay(5000); await processor.stop(); await client.close(); diff --git a/sdk/eventhub/event-hubs/src/eventProcessor.ts b/sdk/eventhub/event-hubs/src/eventProcessor.ts index 558f78bbfe0a..92973113749f 100644 --- a/sdk/eventhub/event-hubs/src/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/src/eventProcessor.ts @@ -242,9 +242,20 @@ export class EventProcessor { ); // eventually this will 1st check if the existing PartitionOwnership has a position - const eventPosition = + let eventPosition = this._processorOptions.initialEventPosition || EventPosition.earliest(); + const partitionOwnerships = await this._partitionManager.listOwnership( + this._eventHubClient.eventHubName, + this._consumerGroupName + ); + for (const ownership of partitionOwnerships) { + if (ownership.partitionId === partitionId && ownership.sequenceNumber) { + eventPosition = EventPosition.fromSequenceNumber(ownership.sequenceNumber); + break; + } + } + tasks.push( this._pumpManager.createPump( this._eventHubClient, diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index 390b5a698e9d..c3f703bbb511 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -18,7 +18,8 @@ import { PartitionOwnership, Checkpoint, PartitionProcessorFactory, - CloseReason + CloseReason, + ReceivedEventData } from "../src"; import { EnvVarKeys, getEnvVars } from "./utils/testUtils"; import { generate_uuid } from "rhea-promise"; @@ -81,7 +82,7 @@ describe("Event Processor", function(): void { EventHubClient.defaultConsumerGroupName, client, factory, - undefined as any, + new InMemoryPartitionManager(), { initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) } @@ -139,7 +140,7 @@ describe("Event Processor", function(): void { EventHubClient.defaultConsumerGroupName, client, factory, - undefined as any, + new InMemoryPartitionManager(), { initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) } @@ -187,7 +188,7 @@ describe("Event Processor", function(): void { EventHubClient.defaultConsumerGroupName, client, factory, - undefined as any, + new InMemoryPartitionManager(), { initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) } @@ -283,7 +284,7 @@ describe("Event Processor", function(): void { EventHubClient.defaultConsumerGroupName, client, factory, - undefined as any, + new InMemoryPartitionManager(), { initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) } @@ -346,7 +347,7 @@ describe("Event Processor", function(): void { EventHubClient.defaultConsumerGroupName, client, factory, - undefined as any, + new InMemoryPartitionManager(), { initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) } @@ -405,28 +406,26 @@ describe("Event Processor", function(): void { const eventProcessorFactory = (context: PartitionContext) => { return new SimpleEventProcessor(); }; - const partitionInfo = await client.getPartitionProperties("0"); + const processor = new EventProcessor( EventHubClient.defaultConsumerGroupName, client, eventProcessorFactory, - "partitionManager" as any, + new InMemoryPartitionManager(), { - initialEventPosition: EventPosition.fromSequenceNumber( - partitionInfo.lastEnqueuedSequenceNumber - ), + initialEventPosition:EventPosition.fromEnqueuedTime(new Date()), maxBatchSize: 1, maxWaitTimeInSeconds: 5 } ); const producer = client.createProducer({ partitionId: "0" }); await producer.send({ body: "Hello world!!!" }); + await producer.close(); await processor.start(); // after 2 seconds, stop processing await delay(2000); await processor.stop(); - await producer.close(); isinitializeCalled.should.equal(true); receivedEvents.length.should.equal(1); receivedEvents[0].body.should.equal("Hello world!!!"); @@ -458,7 +457,6 @@ describe("Event Processor", function(): void { partitionOwnership2 ]); partitionOwnership.length.should.equals(2); - const ownershiplist = await inMemoryPartitionManager.listOwnership( "myEventHub", EventHubClient.defaultConsumerGroupName @@ -484,5 +482,116 @@ describe("Event Processor", function(): void { partitionOwnershipList[0].sequenceNumber!.should.equals(checkpoint.sequenceNumber); partitionOwnershipList[0].offset!.should.equals(checkpoint.offset); }); + + it("should receive events from the checkpoint", async function(): Promise { + const partitionIds = await client.getPartitionIds(); + + // ensure we have at least 2 partitions + partitionIds.length.should.gte(2); + + let checkpointMap = new Map(); + partitionIds.forEach((id) => checkpointMap.set(id, [])); + let didError = false; + + let partionCount: { [x: string]: number } = {}; + const factory: PartitionProcessorFactory = (context, checkpointManager) => { + return { + async processEvents(events: ReceivedEventData[]) { + !partionCount[context.partitionId] + ? (partionCount[context.partitionId] = 1) + : partionCount[context.partitionId]++; + const existingEvents = checkpointMap.get(context.partitionId)!; + for (const event of events) { + debug("Received event: '%s' from partition: '%s'", event.body, context.partitionId); + if (partionCount[context.partitionId] <= 50) { + await checkpointManager.updateCheckpoint(event); + existingEvents.push(event); + } + } + }, + async processError() { + didError = true; + } + }; + }; + + const inMemoryPartitionManager = new InMemoryPartitionManager(); + const processor1 = new EventProcessor( + EventHubClient.defaultConsumerGroupName, + client, + factory, + inMemoryPartitionManager, + { + initialEventPosition: EventPosition.fromEnqueuedTime(new Date()) + } + ); + + // start first processor + processor1.start(); + + // create messages + const expectedMessagePrefix = "EventProcessor test - checkpoint - "; + const events: EventData[] = []; + + for (const partitionId of partitionIds) { + const producer = client.createProducer({ partitionId }); + for (let index = 1; index <= 100; index++) { + events.push({ body: `${expectedMessagePrefix} ${index} ${partitionId}` }); + } + await producer.send(events); + await producer.close(); + } + + // set a delay to give a consumers a chance to receive a message + await delay(5000); + + // shutdown the first processor + await processor1.stop(); + + const lastEventsReceivedFromProcessor1: ReceivedEventData[] = []; + let index = 0; + + for (const partitionId of partitionIds) { + const receivedEvents = checkpointMap.get(partitionId)!; + lastEventsReceivedFromProcessor1[index++] = receivedEvents[receivedEvents.length - 1]; + } + + checkpointMap = new Map(); + partitionIds.forEach((id) => checkpointMap.set(id, [])); + partionCount = {}; + + const processor2 = new EventProcessor( + EventHubClient.defaultConsumerGroupName, + client, + factory, + inMemoryPartitionManager + ); + // start second processor + processor2.start(); + + // set a delay to give a consumers a chance to receive a message + await delay(5000); + + // shutdown the second processor + await processor2.stop(); + + index = 0; + const firstEventsReceivedFromProcessor2: ReceivedEventData[] = []; + for (const partitionId of partitionIds) { + const receivedEvents = checkpointMap.get(partitionId)!; + firstEventsReceivedFromProcessor2[index++] = receivedEvents[0]; + } + + didError.should.be.false; + index = 0; + // validate correct events captured for each partition using checkpoint + for (const partitionId of partitionIds) { + debug(`Validate events for partition: ${partitionId}`); + lastEventsReceivedFromProcessor1[index].sequenceNumber.should.equal( + firstEventsReceivedFromProcessor2[index].sequenceNumber - 1 + ); + index++; + } + }); }); }).timeout(90000);