Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs] Add test to receive events from the checkpoint and update EPH sample #4621

Merged
merged 4 commits into from
Aug 2, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions sdk/eventhub/event-hubs/samples/eventProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,42 @@
import {
EventHubClient,
EventData,
ReceivedEventData,
EventPosition,
delay,
EventProcessor,
PartitionContext
PartitionContext,
InMemoryPartitionManager,
CheckpointManager
} from "@azure/event-hubs";

class SimplePartitionProcessor {
private _context: PartitionContext;
constructor(context: PartitionContext) {
private _checkpointManager: CheckpointManager;
constructor(context: PartitionContext, checkpointManager: CheckpointManager) {
this._context = context;
this._checkpointManager = checkpointManager;
}
async processEvents(events: EventData[]) {
async processEvents(events: ReceivedEventData[]) {
for (const event of events) {
ShivangiReja marked this conversation as resolved.
Show resolved Hide resolved
console.log(
"Received event: '%s' from partition: '%s' and consumer group: '%s'",
event.body,
this._context.partitionId,
this._context.consumerGroupName
);
try {
// checkpoint using the last event in the batch
await this._checkpointManager.updateCheckpoint(events[events.length - 1]);
console.log(
"Successfully checkpointed event: '%s' from partition: '%s'",
events[events.length - 1].body,
this._context.partitionId
);
} catch (err) {
console.log(
`Encountered an error while checkpointing on ${this._context.partitionId}: ${err.message}`
);
}
}
}

Expand All @@ -43,24 +60,24 @@ const eventHubName = "";
async function main() {
const client = new EventHubClient(connectionString, eventHubName);

const eventProcessorFactory = (context: PartitionContext) => {
return new SimplePartitionProcessor(context);
const eventProcessorFactory = (context: PartitionContext, checkpoint: CheckpointManager) => {
return new SimplePartitionProcessor(context, checkpoint);
};

const processor = new EventProcessor(
EventHubClient.defaultConsumerGroupName,
client,
eventProcessorFactory,
"partitionManager" as any,
new InMemoryPartitionManager(),
{
initialEventPosition: EventPosition.earliest(),
maxBatchSize: 10,
maxWaitTimeInSeconds: 20
}
);
await processor.start();
// after 2 seconds, stop processing
await delay(2000);
// after 5 seconds, stop processing
await delay(5000);

await processor.stop();
await client.close();
Expand Down
12 changes: 11 additions & 1 deletion sdk/eventhub/event-hubs/src/eventProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,19 @@ export class EventProcessor {
);

// eventually this will 1st check if the existing PartitionOwnership has a position
const eventPosition =
let eventPosition =
this._processorOptions.initialEventPosition || EventPosition.earliest();

const partitionOwnerships = await this._partitionManager.listOwnership(
this._eventHubClient.eventHubName,
this._consumerGroupName
);
for (const ownership of partitionOwnerships) {
if (ownership.partitionId === partitionId && ownership.sequenceNumber) {
eventPosition = EventPosition.fromSequenceNumber(ownership.sequenceNumber);
ShivangiReja marked this conversation as resolved.
Show resolved Hide resolved
}
}

tasks.push(
this._pumpManager.createPump(
this._eventHubClient,
Expand Down
125 changes: 117 additions & 8 deletions sdk/eventhub/event-hubs/test/eventProcessor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import {
PartitionOwnership,
Checkpoint,
PartitionProcessorFactory,
CloseReason
CloseReason,
ReceivedEventData
} from "../src";
import { EnvVarKeys, getEnvVars } from "./utils/testUtils";
import { generate_uuid } from "rhea-promise";
Expand Down Expand Up @@ -81,7 +82,7 @@ describe("Event Processor", function(): void {
EventHubClient.defaultConsumerGroupName,
client,
factory,
undefined as any,
new InMemoryPartitionManager(),
{
initialEventPosition: EventPosition.fromEnqueuedTime(new Date())
}
Expand Down Expand Up @@ -139,7 +140,7 @@ describe("Event Processor", function(): void {
EventHubClient.defaultConsumerGroupName,
client,
factory,
undefined as any,
new InMemoryPartitionManager(),
{
initialEventPosition: EventPosition.fromEnqueuedTime(new Date())
}
Expand Down Expand Up @@ -187,7 +188,7 @@ describe("Event Processor", function(): void {
EventHubClient.defaultConsumerGroupName,
client,
factory,
undefined as any,
new InMemoryPartitionManager(),
{
initialEventPosition: EventPosition.fromEnqueuedTime(new Date())
}
Expand Down Expand Up @@ -283,7 +284,7 @@ describe("Event Processor", function(): void {
EventHubClient.defaultConsumerGroupName,
client,
factory,
undefined as any,
new InMemoryPartitionManager(),
{
initialEventPosition: EventPosition.fromEnqueuedTime(new Date())
}
Expand Down Expand Up @@ -346,7 +347,7 @@ describe("Event Processor", function(): void {
EventHubClient.defaultConsumerGroupName,
client,
factory,
undefined as any,
new InMemoryPartitionManager(),
{
initialEventPosition: EventPosition.fromEnqueuedTime(new Date())
}
Expand Down Expand Up @@ -410,7 +411,7 @@ describe("Event Processor", function(): void {
EventHubClient.defaultConsumerGroupName,
client,
eventProcessorFactory,
"partitionManager" as any,
new InMemoryPartitionManager(),
{
initialEventPosition: EventPosition.fromSequenceNumber(
partitionInfo.lastEnqueuedSequenceNumber
Expand Down Expand Up @@ -458,7 +459,6 @@ describe("Event Processor", function(): void {
partitionOwnership2
]);
partitionOwnership.length.should.equals(2);

const ownershiplist = await inMemoryPartitionManager.listOwnership(
"myEventHub",
EventHubClient.defaultConsumerGroupName
Expand All @@ -484,5 +484,114 @@ describe("Event Processor", function(): void {
partitionOwnershipList[0].sequenceNumber!.should.equals(checkpoint.sequenceNumber);
partitionOwnershipList[0].offset!.should.equals(checkpoint.offset);
});

it("should receive events from the checkpoint", async function(): Promise<void> {
ShivangiReja marked this conversation as resolved.
Show resolved Hide resolved
const partitionIds = await client.getPartitionIds();

// ensure we have at least 2 partitions
partitionIds.length.should.gte(2);

let partitionResultsMap = new Map<string, ReceivedEventData[]>();
partitionIds.forEach((id) => partitionResultsMap.set(id, []));
let didError = false;

// The partitionProcess will need to add events to the partitionResultsMap as they are received
const factory: PartitionProcessorFactory = (context, checkpointManager) => {
return {
async processEvents(events: ReceivedEventData[]) {
const existingEvents = partitionResultsMap.get(context.partitionId)!;
events.forEach((event: ReceivedEventData) => {
ShivangiReja marked this conversation as resolved.
Show resolved Hide resolved
existingEvents.push(event);
debug(
"Received event: '%s' from partition: '%s' and consumer group: '%s'",
event.body,
context.partitionId
);
checkpointManager.updateCheckpoint(event);
});
},
async processError() {
didError = true;
}
};
};

const inMemoryPartitionManager = new InMemoryPartitionManager();
const processor1 = new EventProcessor(
EventHubClient.defaultConsumerGroupName,
client,
factory,
inMemoryPartitionManager,
{
initialEventPosition: EventPosition.fromEnqueuedTime(new Date())
}
);

// start first processor
processor1.start();

// create messages
const expectedMessagePrefix = "EventProcessor test - checkpoint - ";
const events: EventData[] = [];

for (const partitionId of partitionIds) {
const producer = client.createProducer({ partitionId });
for (let index = 1; index <= 100; index++) {
events.push({ body: `${expectedMessagePrefix} ${index} ${partitionId}` });
}
await producer.send(events);
await producer.close();
}

// set a delay to give a consumers a chance to receive a message
await delay(500);
ShivangiReja marked this conversation as resolved.
Show resolved Hide resolved

// shutdown the first processor
await processor1.stop();

let lastEventsReceivedFromProcessor1: ReceivedEventData[] = [];
ShivangiReja marked this conversation as resolved.
Show resolved Hide resolved
let index = 0;

for (const partitionId of partitionIds) {
const receivedEvents = partitionResultsMap.get(partitionId)!;
lastEventsReceivedFromProcessor1[index++] = receivedEvents[receivedEvents.length - 1];
}

partitionResultsMap = new Map<string, ReceivedEventData[]>();
partitionIds.forEach((id) => partitionResultsMap.set(id, []));

const processor2 = new EventProcessor(
EventHubClient.defaultConsumerGroupName,
client,
factory,
inMemoryPartitionManager
);
// start second processor
processor2.start();

// set a delay to give a consumers a chance to receive a message
await delay(2000);

// shutdown the second processor
await processor2.stop();

index = 0;
let firstEventsReceivedFromProcessor2: ReceivedEventData[] = [];
for (const partitionId of partitionIds) {
const receivedEvents = partitionResultsMap.get(partitionId)!;
firstEventsReceivedFromProcessor2[index++] = receivedEvents[0];
}

didError.should.be.false;
index = 0;
// validate correct events captured for each partition using checkpoint
for (const partitionId of partitionIds) {
debug(`Validate events for partition: ${partitionId}`);
lastEventsReceivedFromProcessor1[index].sequenceNumber.should.equal(
firstEventsReceivedFromProcessor2[index].sequenceNumber - 1
);
index++;
}
});
});
}).timeout(90000);