Skip to content

Commit

Permalink
[Event Hubs] fixes flaky EventProcessor test
Browse files Browse the repository at this point in the history
  • Loading branch information
chradek committed Jul 31, 2019
1 parent 4ca94eb commit ecd9fd4
Showing 1 changed file with 18 additions and 20 deletions.
38 changes: 18 additions & 20 deletions sdk/eventhub/event-hubs/test/eventProcessor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ import { EnvVarKeys, getEnvVars } from "./utils/testUtils";
import { generate_uuid } from "rhea-promise";
const env = getEnvVars();

describe("Event Processor", function(): void {
describe("Event Processor", function (): void {
const service = {
connectionString: env[EnvVarKeys.EVENTHUB_CONNECTION_STRING],
path: env[EnvVarKeys.EVENTHUB_NAME]
};
const client: EventHubClient = new EventHubClient(service.connectionString, service.path);
before("validate environment", async function(): Promise<void> {
before("validate environment", async function (): Promise<void> {
should.exist(
env[EnvVarKeys.EVENTHUB_CONNECTION_STRING],
"define EVENTHUB_CONNECTION_STRING in your environment before running integration tests."
Expand All @@ -41,11 +41,11 @@ describe("Event Processor", function(): void {
);
});

after("close the connection", async function(): Promise<void> {
after("close the connection", async function (): Promise<void> {
await client.close();
});

it("should treat consecutive start invocations as idempotent", async function(): Promise<void> {
it("should treat consecutive start invocations as idempotent", async function (): Promise<void> {
const partitionIds = await client.getPartitionIds();

// ensure we have at least 2 partitions
Expand Down Expand Up @@ -114,7 +114,7 @@ describe("Event Processor", function(): void {
}
});

it("should not throw if stop is called without start", async function(): Promise<void> {
it("should not throw if stop is called without start", async function (): Promise<void> {
let didPartitionProcessorStart = false;

// The partitionProcess will need to add events to the partitionResultsMap as they are received
Expand Down Expand Up @@ -151,7 +151,7 @@ describe("Event Processor", function(): void {
didPartitionProcessorStart.should.be.false;
});

it("should support start after stopping", async function(): Promise<void> {
it("should support start after stopping", async function (): Promise<void> {
const partitionIds = await client.getPartitionIds();

// ensure we have at least 2 partitions
Expand Down Expand Up @@ -204,7 +204,7 @@ describe("Event Processor", function(): void {
}

// set a delay to give a consumers a chance to receive a message
await delay(1000);
await delay(3000);

// shutdown the processor
await processor.stop();
Expand All @@ -230,24 +230,22 @@ describe("Event Processor", function(): void {
processor.start();

// set a delay to give a consumers a chance to receive a message
await delay(1000);
await delay(3000);

await processor.stop();

didError.should.be.false;
// validate correct events captured for each partition
// validate that partitionProcessor methods were called
// do not check events until checkpointing is implemented
for (const partitionId of partitionIds) {
const results = partitionResultsMap.get(partitionId)!;
const events = results.events;
events.length.should.equal(1);
events[0].should.equal(expectedMessagePrefix + partitionId);
results.initialized.should.be.true;
(results.closeReason === CloseReason.Shutdown).should.be.true;
}
});

describe("Partition processor", function(): void {
it("should support processing events across multiple partitions", async function(): Promise<
describe("Partition processor", function (): void {
it("should support processing events across multiple partitions", async function (): Promise<
void
> {
const partitionIds = await client.getPartitionIds();
Expand Down Expand Up @@ -319,7 +317,7 @@ describe("Event Processor", function(): void {
}
});

it("should support processing events across multiple partitions without initialize or close", async function(): Promise<
it("should support processing events across multiple partitions without initialize or close", async function (): Promise<
void
> {
const partitionIds = await client.getPartitionIds();
Expand Down Expand Up @@ -379,7 +377,7 @@ describe("Event Processor", function(): void {
}
});

it("should call methods on a PartitionProcessor ", async function(): Promise<void> {
it("should call methods on a PartitionProcessor ", async function (): Promise<void> {
const receivedEvents: EventData[] = [];
let isinitializeCalled = false;
let isCloseCalled = false;
Expand Down Expand Up @@ -436,8 +434,8 @@ describe("Event Processor", function(): void {
});
});

describe("InMemory Partition Manager", function(): void {
it("should claim ownership, get a list of ownership and update checkpoint", async function(): Promise<void> {
describe("InMemory Partition Manager", function (): void {
it("should claim ownership, get a list of ownership and update checkpoint", async function (): Promise<void> {
const inMemoryPartitionManager = new InMemoryPartitionManager();
const partitionOwnership1: PartitionOwnership = {
eventHubName: "myEventHub",
Expand All @@ -458,13 +456,13 @@ describe("Event Processor", function(): void {
partitionOwnership2
]);
partitionOwnership.length.should.equals(2);

const ownershipslist = await inMemoryPartitionManager.listOwnerships(
"myEventHub",
EventHubClient.defaultConsumerGroupName
);
ownershipslist.length.should.equals(2);

const checkpoint: Checkpoint = {
eventHubName: "myEventHub",
consumerGroupName: EventHubClient.defaultConsumerGroupName,
Expand Down

0 comments on commit ecd9fd4

Please sign in to comment.