Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs] Add test to receive events from the checkpoint and update EPH sample #4621

Merged
merged 4 commits into from
Aug 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 29 additions & 9 deletions sdk/eventhub/event-hubs/samples/eventProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,45 @@
import {
EventHubClient,
EventData,
ReceivedEventData,
EventPosition,
delay,
EventProcessor,
PartitionContext
PartitionContext,
InMemoryPartitionManager,
CheckpointManager
} from "@azure/event-hubs";

class SimplePartitionProcessor {
private _context: PartitionContext;
constructor(context: PartitionContext) {
private _checkpointManager: CheckpointManager;
constructor(context: PartitionContext, checkpointManager: CheckpointManager) {
this._context = context;
this._checkpointManager = checkpointManager;
}
async processEvents(events: EventData[]) {
async processEvents(events: ReceivedEventData[]) {
if(events.length === 0){
return;
}
for (const event of events) {
ShivangiReja marked this conversation as resolved.
Show resolved Hide resolved
console.log(
"Received event: '%s' from partition: '%s' and consumer group: '%s'",
event.body,
this._context.partitionId,
this._context.consumerGroupName
);
try {
// checkpoint using the last event in the batch
await this._checkpointManager.updateCheckpoint(events[events.length - 1]);
console.log(
"Successfully checkpointed event: '%s' from partition: '%s'",
events[events.length - 1].body,
this._context.partitionId
);
} catch (err) {
console.log(
`Encountered an error while checkpointing on ${this._context.partitionId}: ${err.message}`
);
}
}
}

Expand All @@ -43,24 +63,24 @@ const eventHubName = "";
async function main() {
const client = new EventHubClient(connectionString, eventHubName);

const eventProcessorFactory = (context: PartitionContext) => {
return new SimplePartitionProcessor(context);
const eventProcessorFactory = (context: PartitionContext, checkpoint: CheckpointManager) => {
return new SimplePartitionProcessor(context, checkpoint);
};

const processor = new EventProcessor(
EventHubClient.defaultConsumerGroupName,
client,
eventProcessorFactory,
"partitionManager" as any,
new InMemoryPartitionManager(),
{
initialEventPosition: EventPosition.earliest(),
maxBatchSize: 10,
maxWaitTimeInSeconds: 20
}
);
await processor.start();
// after 2 seconds, stop processing
await delay(2000);
// after 5 seconds, stop processing
await delay(5000);

await processor.stop();
await client.close();
Expand Down
13 changes: 12 additions & 1 deletion sdk/eventhub/event-hubs/src/eventProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,20 @@ export class EventProcessor {
);

// eventually this will 1st check if the existing PartitionOwnership has a position
const eventPosition =
let eventPosition =
this._processorOptions.initialEventPosition || EventPosition.earliest();

const partitionOwnerships = await this._partitionManager.listOwnership(
this._eventHubClient.eventHubName,
this._consumerGroupName
);
for (const ownership of partitionOwnerships) {
if (ownership.partitionId === partitionId && ownership.sequenceNumber) {
eventPosition = EventPosition.fromSequenceNumber(ownership.sequenceNumber);
ShivangiReja marked this conversation as resolved.
Show resolved Hide resolved
break;
}
}

tasks.push(
this._pumpManager.createPump(
this._eventHubClient,
Expand Down
135 changes: 122 additions & 13 deletions sdk/eventhub/event-hubs/test/eventProcessor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import {
PartitionOwnership,
Checkpoint,
PartitionProcessorFactory,
CloseReason
CloseReason,
ReceivedEventData
} from "../src";
import { EnvVarKeys, getEnvVars } from "./utils/testUtils";
import { generate_uuid } from "rhea-promise";
Expand Down Expand Up @@ -81,7 +82,7 @@ describe("Event Processor", function(): void {
EventHubClient.defaultConsumerGroupName,
client,
factory,
undefined as any,
new InMemoryPartitionManager(),
{
initialEventPosition: EventPosition.fromEnqueuedTime(new Date())
}
Expand Down Expand Up @@ -139,7 +140,7 @@ describe("Event Processor", function(): void {
EventHubClient.defaultConsumerGroupName,
client,
factory,
undefined as any,
new InMemoryPartitionManager(),
{
initialEventPosition: EventPosition.fromEnqueuedTime(new Date())
}
Expand Down Expand Up @@ -187,7 +188,7 @@ describe("Event Processor", function(): void {
EventHubClient.defaultConsumerGroupName,
client,
factory,
undefined as any,
new InMemoryPartitionManager(),
{
initialEventPosition: EventPosition.fromEnqueuedTime(new Date())
}
Expand Down Expand Up @@ -283,7 +284,7 @@ describe("Event Processor", function(): void {
EventHubClient.defaultConsumerGroupName,
client,
factory,
undefined as any,
new InMemoryPartitionManager(),
{
initialEventPosition: EventPosition.fromEnqueuedTime(new Date())
}
Expand Down Expand Up @@ -346,7 +347,7 @@ describe("Event Processor", function(): void {
EventHubClient.defaultConsumerGroupName,
client,
factory,
undefined as any,
new InMemoryPartitionManager(),
{
initialEventPosition: EventPosition.fromEnqueuedTime(new Date())
}
Expand Down Expand Up @@ -405,28 +406,26 @@ describe("Event Processor", function(): void {
const eventProcessorFactory = (context: PartitionContext) => {
return new SimpleEventProcessor();
};
const partitionInfo = await client.getPartitionProperties("0");

const processor = new EventProcessor(
EventHubClient.defaultConsumerGroupName,
client,
eventProcessorFactory,
"partitionManager" as any,
new InMemoryPartitionManager(),
{
initialEventPosition: EventPosition.fromSequenceNumber(
partitionInfo.lastEnqueuedSequenceNumber
),
initialEventPosition:EventPosition.fromEnqueuedTime(new Date()),
maxBatchSize: 1,
maxWaitTimeInSeconds: 5
}
);
const producer = client.createProducer({ partitionId: "0" });
await producer.send({ body: "Hello world!!!" });
await producer.close();

await processor.start();
// after 2 seconds, stop processing
await delay(2000);
await processor.stop();
await producer.close();
isinitializeCalled.should.equal(true);
receivedEvents.length.should.equal(1);
receivedEvents[0].body.should.equal("Hello world!!!");
Expand Down Expand Up @@ -458,7 +457,6 @@ describe("Event Processor", function(): void {
partitionOwnership2
]);
partitionOwnership.length.should.equals(2);

const ownershiplist = await inMemoryPartitionManager.listOwnership(
"myEventHub",
EventHubClient.defaultConsumerGroupName
Expand All @@ -484,5 +482,116 @@ describe("Event Processor", function(): void {
partitionOwnershipList[0].sequenceNumber!.should.equals(checkpoint.sequenceNumber);
partitionOwnershipList[0].offset!.should.equals(checkpoint.offset);
});

it("should receive events from the checkpoint", async function(): Promise<void> {
ShivangiReja marked this conversation as resolved.
Show resolved Hide resolved
const partitionIds = await client.getPartitionIds();

// ensure we have at least 2 partitions
partitionIds.length.should.gte(2);

let checkpointMap = new Map<string, ReceivedEventData[]>();
partitionIds.forEach((id) => checkpointMap.set(id, []));
let didError = false;

let partionCount: { [x: string]: number } = {};
const factory: PartitionProcessorFactory = (context, checkpointManager) => {
return {
async processEvents(events: ReceivedEventData[]) {
!partionCount[context.partitionId]
? (partionCount[context.partitionId] = 1)
: partionCount[context.partitionId]++;
const existingEvents = checkpointMap.get(context.partitionId)!;
for (const event of events) {
debug("Received event: '%s' from partition: '%s'", event.body, context.partitionId);
if (partionCount[context.partitionId] <= 50) {
await checkpointManager.updateCheckpoint(event);
existingEvents.push(event);
}
}
},
async processError() {
didError = true;
}
};
};

const inMemoryPartitionManager = new InMemoryPartitionManager();
const processor1 = new EventProcessor(
EventHubClient.defaultConsumerGroupName,
client,
factory,
inMemoryPartitionManager,
{
initialEventPosition: EventPosition.fromEnqueuedTime(new Date())
}
);

// start first processor
processor1.start();

// create messages
const expectedMessagePrefix = "EventProcessor test - checkpoint - ";
const events: EventData[] = [];

for (const partitionId of partitionIds) {
const producer = client.createProducer({ partitionId });
for (let index = 1; index <= 100; index++) {
events.push({ body: `${expectedMessagePrefix} ${index} ${partitionId}` });
}
await producer.send(events);
await producer.close();
}

// set a delay to give a consumers a chance to receive a message
await delay(5000);

// shutdown the first processor
await processor1.stop();

const lastEventsReceivedFromProcessor1: ReceivedEventData[] = [];
let index = 0;

for (const partitionId of partitionIds) {
const receivedEvents = checkpointMap.get(partitionId)!;
lastEventsReceivedFromProcessor1[index++] = receivedEvents[receivedEvents.length - 1];
}

checkpointMap = new Map<string, ReceivedEventData[]>();
partitionIds.forEach((id) => checkpointMap.set(id, []));
partionCount = {};

const processor2 = new EventProcessor(
EventHubClient.defaultConsumerGroupName,
client,
factory,
inMemoryPartitionManager
);
// start second processor
processor2.start();

// set a delay to give a consumers a chance to receive a message
await delay(5000);

// shutdown the second processor
await processor2.stop();

index = 0;
const firstEventsReceivedFromProcessor2: ReceivedEventData[] = [];
for (const partitionId of partitionIds) {
const receivedEvents = checkpointMap.get(partitionId)!;
firstEventsReceivedFromProcessor2[index++] = receivedEvents[0];
}

didError.should.be.false;
index = 0;
// validate correct events captured for each partition using checkpoint
for (const partitionId of partitionIds) {
debug(`Validate events for partition: ${partitionId}`);
lastEventsReceivedFromProcessor1[index].sequenceNumber.should.equal(
firstEventsReceivedFromProcessor2[index].sequenceNumber - 1
);
index++;
}
});
});
}).timeout(90000);