diff --git a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts index 0a779d9ad59b..dc314b45fd97 100644 --- a/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventProcessor.spec.ts @@ -24,13 +24,13 @@ import { EnvVarKeys, getEnvVars } from "./utils/testUtils"; import { generate_uuid } from "rhea-promise"; const env = getEnvVars(); -describe("Event Processor", function(): void { +describe("Event Processor", function (): void { const service = { connectionString: env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], path: env[EnvVarKeys.EVENTHUB_NAME] }; const client: EventHubClient = new EventHubClient(service.connectionString, service.path); - before("validate environment", async function(): Promise { + before("validate environment", async function (): Promise { should.exist( env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], "define EVENTHUB_CONNECTION_STRING in your environment before running integration tests." @@ -41,11 +41,11 @@ describe("Event Processor", function(): void { ); }); - after("close the connection", async function(): Promise { + after("close the connection", async function (): Promise { await client.close(); }); - it("should treat consecutive start invocations as idempotent", async function(): Promise { + it("should treat consecutive start invocations as idempotent", async function (): Promise { const partitionIds = await client.getPartitionIds(); // ensure we have at least 2 partitions @@ -114,7 +114,7 @@ describe("Event Processor", function(): void { } }); - it("should not throw if stop is called without start", async function(): Promise { + it("should not throw if stop is called without start", async function (): Promise { let didPartitionProcessorStart = false; // The partitionProcess will need to add events to the partitionResultsMap as they are received @@ -151,7 +151,7 @@ describe("Event Processor", function(): void { didPartitionProcessorStart.should.be.false; }); - it("should support start after stopping", async function(): Promise { + it("should support start after stopping", async function (): Promise { const partitionIds = await client.getPartitionIds(); // ensure we have at least 2 partitions @@ -204,7 +204,7 @@ describe("Event Processor", function(): void { } // set a delay to give a consumers a chance to receive a message - await delay(1000); + await delay(3000); // shutdown the processor await processor.stop(); @@ -230,24 +230,22 @@ describe("Event Processor", function(): void { processor.start(); // set a delay to give a consumers a chance to receive a message - await delay(1000); + await delay(3000); await processor.stop(); didError.should.be.false; - // validate correct events captured for each partition + // validate that partitionProcessor methods were called + // do not check events until checkpointing is implemented for (const partitionId of partitionIds) { const results = partitionResultsMap.get(partitionId)!; - const events = results.events; - events.length.should.equal(1); - events[0].should.equal(expectedMessagePrefix + partitionId); results.initialized.should.be.true; (results.closeReason === CloseReason.Shutdown).should.be.true; } }); - describe("Partition processor", function(): void { - it("should support processing events across multiple partitions", async function(): Promise< + describe("Partition processor", function (): void { + it("should support processing events across multiple partitions", async function (): Promise< void > { const partitionIds = await client.getPartitionIds(); @@ -319,7 +317,7 @@ describe("Event Processor", function(): void { } }); - it("should support processing events across multiple partitions without initialize or close", async function(): Promise< + it("should support processing events across multiple partitions without initialize or close", async function (): Promise< void > { const partitionIds = await client.getPartitionIds(); @@ -379,7 +377,7 @@ describe("Event Processor", function(): void { } }); - it("should call methods on a PartitionProcessor ", async function(): Promise { + it("should call methods on a PartitionProcessor ", async function (): Promise { const receivedEvents: EventData[] = []; let isinitializeCalled = false; let isCloseCalled = false; @@ -436,8 +434,8 @@ describe("Event Processor", function(): void { }); }); - describe("InMemory Partition Manager", function(): void { - it("should claim ownership, get a list of ownership and update checkpoint", async function(): Promise { + describe("InMemory Partition Manager", function (): void { + it("should claim ownership, get a list of ownership and update checkpoint", async function (): Promise { const inMemoryPartitionManager = new InMemoryPartitionManager(); const partitionOwnership1: PartitionOwnership = { eventHubName: "myEventHub", @@ -458,13 +456,13 @@ describe("Event Processor", function(): void { partitionOwnership2 ]); partitionOwnership.length.should.equals(2); - + const ownershipslist = await inMemoryPartitionManager.listOwnerships( "myEventHub", EventHubClient.defaultConsumerGroupName ); ownershipslist.length.should.equals(2); - + const checkpoint: Checkpoint = { eventHubName: "myEventHub", consumerGroupName: EventHubClient.defaultConsumerGroupName,