diff --git a/sdk/eventhub/event-hubs/review/event-hubs.api.md b/sdk/eventhub/event-hubs/review/event-hubs.api.md index b431b061a731..ac82816b5d5f 100644 --- a/sdk/eventhub/event-hubs/review/event-hubs.api.md +++ b/sdk/eventhub/event-hubs/review/event-hubs.api.md @@ -109,7 +109,7 @@ export class EventHubBufferedProducerClient { // @public export interface EventHubBufferedProducerClientOptions extends EventHubClientOptions { - maxBufferedEventCount?: number; + maxEventBufferLengthPerPartition?: number; maxWaitTimeInMs?: number; onSendEventsErrorHandler: (ctx: OnSendEventsErrorContext) => Promise; onSendEventsSuccessHandler?: (ctx: OnSendEventsSuccessContext) => Promise; diff --git a/sdk/eventhub/event-hubs/src/batchingPartitionChannel.ts b/sdk/eventhub/event-hubs/src/batchingPartitionChannel.ts new file mode 100644 index 000000000000..b96da94a0b6f --- /dev/null +++ b/sdk/eventhub/event-hubs/src/batchingPartitionChannel.ts @@ -0,0 +1,309 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { AmqpAnnotatedMessage, delay } from "@azure/core-amqp"; +import { + EventData, + EventDataBatch, + EventHubBufferedProducerClientOptions, + EventHubProducerClient, + OperationOptions +} from "./index"; +import { AwaitableQueue } from "./impl/awaitableQueue"; +import { isDefined, isObjectWithProperties } from "./util/typeGuards"; +import { AbortSignalLike } from "@azure/abort-controller"; +import { getPromiseParts } from "./util/getPromiseParts"; + +export interface BatchingPartitionChannelProps { + loopAbortSignal: AbortSignalLike; + maxBufferSize: number; + maxWaitTimeInMs: number; + partitionId: string; + producer: EventHubProducerClient; + /** + * The handler to call once a batch has successfully published. + */ + onSendEventsSuccessHandler?: EventHubBufferedProducerClientOptions["onSendEventsSuccessHandler"]; + /** + * The handler to call when a batch fails to publish. + */ + onSendEventsErrorHandler: EventHubBufferedProducerClientOptions["onSendEventsErrorHandler"]; +} + +/** + * The `BatchingPartitionChannel` is responsible for accepting enqueued events + * and optimally batching and sending them to an Event Hub. + * @internal + */ +export class BatchingPartitionChannel { + private _eventQueue = new AwaitableQueue(); + private _batchedEvents: Array = []; + private _bufferCount: number = 0; + private _readyQueue: Array<{ + resolve: (value: void) => void; + reject: (reason?: any) => void; + }> = []; + private _flushState: + | { isFlushing: false } + | { isFlushing: true; currentPromise: Promise; resolve: () => void } = { + isFlushing: false + }; + private _isRunning: boolean = false; + private _lastBatchCreationTime: number = 0; + private _loopAbortSignal: AbortSignalLike; + private _maxBufferSize: number; + private _maxWaitTimeInMs: number; + private _onSendEventsErrorHandler: EventHubBufferedProducerClientOptions["onSendEventsErrorHandler"]; + private _onSendEventsSuccessHandler?: EventHubBufferedProducerClientOptions["onSendEventsSuccessHandler"]; + + private _partitionId: string; + private _producer: EventHubProducerClient; + + constructor({ + loopAbortSignal, + maxBufferSize, + maxWaitTimeInMs, + onSendEventsErrorHandler, + onSendEventsSuccessHandler, + partitionId, + producer + }: BatchingPartitionChannelProps) { + this._loopAbortSignal = loopAbortSignal; + this._maxBufferSize = maxBufferSize; + this._maxWaitTimeInMs = maxWaitTimeInMs; + this._onSendEventsErrorHandler = onSendEventsErrorHandler; + this._onSendEventsSuccessHandler = onSendEventsSuccessHandler; + this._partitionId = partitionId; + this._producer = producer; + } + + getCurrentBufferedCount(): number { + return this._bufferCount; + } + + async enqueueEvent(event: EventData | AmqpAnnotatedMessage): Promise { + await this._ready(); + this._eventQueue.push(event); + this._bufferCount++; + + if (!this._isRunning) { + this._isRunning = true; + this._startPublishLoop().catch(() => { + /* TODO: Log error */ + }); + } + } + + /** + * Sets the flush state so that no new events can be enqueued until + * all the currently buffered events are sent to the Event Hub. + * + * Returns a promise that resolves once flushing is complete. + */ + async flush(_options: OperationOptions = {}): Promise { + const state = this._flushState; + if (state.isFlushing) { + return state.currentPromise; + } + + if (this.getCurrentBufferedCount() === 0) { + return Promise.resolve(); + } + + const { promise, resolve } = getPromiseParts(); + this._flushState = { isFlushing: true, currentPromise: promise, resolve }; + + return promise; + } + + /** + * Returns a promise that resolves once there is room for events to be added + * to the buffer. + */ + private _ready(): Promise { + const currentBufferedCount = this.getCurrentBufferedCount(); + + // If the buffer isn't full and we don't have any pending `ready()` calls, + // then it's safe to return right away. + if ( + currentBufferedCount < this._maxBufferSize && + !this._readyQueue.length && + !this._flushState.isFlushing + ) { + return Promise.resolve(); + } + + const { promise: readyPromise, reject, resolve } = getPromiseParts(); + this._readyQueue.push({ resolve, reject }); + + return readyPromise; + } + + /** + * Starts the loop that creates batches and sends them to the Event Hub. + * + * The loop will run until the `_loopAbortSignal` is aborted. + */ + private async _startPublishLoop() { + let batch: EventDataBatch | undefined; + let futureEvent = this._eventQueue.shift(); + // `eventToAddToBatch` is used to keep track of an event that has been removed + // from the queue, but has not yet been added to a batch. + // This prevents losing an event if a `sendBatch` or `createBatch` call fails + // before the event is added to a batch. + let eventToAddToBatch: EventData | AmqpAnnotatedMessage | undefined; + while (!this._loopAbortSignal.aborted) { + try { + if (!isDefined(batch)) { + batch = await this._createBatch(); + } + const timeSinceLastBatchCreation = Date.now() - this._lastBatchCreationTime; + const maximumTimeToWaitForEvent = batch.count + ? Math.max(this._maxWaitTimeInMs - timeSinceLastBatchCreation, 0) + : this._maxWaitTimeInMs; + + const event = + eventToAddToBatch ?? + (await Promise.race([futureEvent, delay(maximumTimeToWaitForEvent)])); + + if (!event) { + // We didn't receive an event within the allotted time. + // Send the existing batch if it has events in it. + if (batch.count) { + await this._producer.sendBatch(batch); + this._reportSuccess(); + batch = await this._createBatch(); + } + continue; + } else if (!eventToAddToBatch) { + eventToAddToBatch = event; + // We received an event, so get a promise for the next one. + futureEvent = this._eventQueue.shift(); + } + + const didAdd = batch.tryAdd(event); + if (didAdd) { + // This event will definitely make it to one of the user-provided handlers + // since it was added to a batch. + // Store it so we can return it in a handler later. + this._batchedEvents.push(event); + // Clear reference to existing event since it has been added to the batch. + eventToAddToBatch = undefined; + } + + if (didAdd && batch.count >= this._maxBufferSize) { + // Whenever batch.count exceeds the max count of buffered events, send the batch. + await this._producer.sendBatch(batch); + this._reportSuccess(); + batch = await this._createBatch(); + } else if (!didAdd && batch.count) { + // If the event wasn't able to be added and the current batch isn't empty, + // attempt to send the current batch and add the event to a new batch. + await this._producer.sendBatch(batch); + this._reportSuccess(); + batch = await this._createBatch(); + } + + if (!didAdd && !batch.tryAdd(event)) { + // TODO: Report MaxMesageSizeExceeded error. Mimic service's error. + this._reportFailure(new Error("Placeholder for max message size exceeded"), event); + } else if (!didAdd) { + // Handles the case where the event _was_ successfull added to the new batch. + this._batchedEvents.push(event); + } + // Clear reference to existing event since it has been added to the batch. + eventToAddToBatch = undefined; + } catch (err) { + if (!isObjectWithProperties(err, ["name"]) || err.name !== "AbortError") { + this._reportFailure(err); + batch = undefined; + this._batchedEvents = []; + } + } + } + } + + /** + * Helper method that returns an `EventDataBatch`. + * This also has the side effects of + * - keeping track of batch creation time: needed for maxWaitTime calculations. + * - clearing reference to batched events. + * - incrementing the readiness: creating a new batch indicates the buffer + * should have room, so we can resolve some pending `ready()` calls. + */ + private async _createBatch(): Promise { + this._lastBatchCreationTime = Date.now(); + this._batchedEvents = []; + const batch = await this._producer.createBatch({ + partitionId: this._partitionId + }); + this._incrementReadiness(); + return batch; + } + + /** + * This method will resolve as many pending `ready()` calls as it can + * based on how much space remains in the buffer. + * + * If the channel is currently flushing, this is a no-op. This prevents + * `enqueueEvent` calls from adding the event to the buffer until flushing + * completes. + */ + private _incrementReadiness() { + if (this._flushState.isFlushing) { + return; + } + const currentBufferedCount = this.getCurrentBufferedCount(); + const num = Math.min(this._maxBufferSize - currentBufferedCount, this._readyQueue.length); + for (let i = 0; i < num; i++) { + this._readyQueue.shift()?.resolve(); + } + } + + /** + * Calls the user-provided `onSendEventsSuccessHandler` with the events + * that were successfully sent. + */ + private _reportSuccess() { + this._bufferCount = this._bufferCount - this._batchedEvents.length; + this._updateFlushState(); + this._onSendEventsSuccessHandler?.({ + events: this._batchedEvents, + partitionId: this._partitionId + }).catch(() => { + /* TODO: Log error */ + }); + } + + /** + * Calls the user-provided `onSendEventsErrorHandler` with an error and the events + * that were not successfully sent. + */ + private _reportFailure(err: any, event?: EventData | AmqpAnnotatedMessage) { + this._bufferCount = this._bufferCount - (event ? 1 : this._batchedEvents.length); + this._updateFlushState(); + this._onSendEventsErrorHandler({ + error: err, + events: event ? [event] : this._batchedEvents, + partitionId: this._partitionId + }).catch(() => { + /* TODO: Log error */ + }); + } + + /** + * Updates the channel's flush state once the size of the + * event buffer has decreased to 0. + */ + private _updateFlushState() { + const state = this._flushState; + if (!state.isFlushing || this.getCurrentBufferedCount() !== 0) { + return; + } + + state.resolve(); + + this._flushState = { isFlushing: false }; + this._incrementReadiness(); + } +} diff --git a/sdk/eventhub/event-hubs/src/eventHubBufferedProducerClient.ts b/sdk/eventhub/event-hubs/src/eventHubBufferedProducerClient.ts index 4da9b94997c4..de9c970e77aa 100644 --- a/sdk/eventhub/event-hubs/src/eventHubBufferedProducerClient.ts +++ b/sdk/eventhub/event-hubs/src/eventHubBufferedProducerClient.ts @@ -1,10 +1,12 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. +import { AbortController } from "@azure/abort-controller"; import { AmqpAnnotatedMessage } from "@azure/core-amqp"; import { NamedKeyCredential, SASCredential, TokenCredential } from "@azure/core-auth"; -import { EventData, OperationOptions } from "."; -import { ConnectionContext, createConnectionContext } from "./connectionContext"; +import { BatchingPartitionChannel } from "./batchingPartitionChannel"; +import { PartitionAssigner } from "./impl/partitionAssigner"; +import { EventData, EventHubProducerClient, OperationOptions } from "./index"; import { EventHubProperties, PartitionProperties } from "./managementClient"; import { EventHubClientOptions, @@ -54,11 +56,15 @@ export interface OnSendEventsErrorContext { */ export interface EventHubBufferedProducerClientOptions extends EventHubClientOptions { /** - * The maximum number of events to buffer. + * The total number of events that can be buffered for publishing at a given time for a given partition. + * + * Default: 1500 */ - maxBufferedEventCount?: number; + maxEventBufferLengthPerPartition?: number; /** - * The maximum amount of time to wait before sending a batch of messages to the Event Hub. + * The amount of time to wait for a new event to be enqueued in the buffer before publishing a partially full batch. + * + * Default: 250 milliseconds. */ maxWaitTimeInMs?: number; /** @@ -102,21 +108,47 @@ export interface EnqueueEventOptions extends SendBatchOptions {} */ export class EventHubBufferedProducerClient { /** - * Describes the amqp connection context for the client. + * Controls the `abortSignal` passed to each `BatchingPartitionChannel`. + * Used to signal when a channel should stop waiting for events. + */ + private _abortController = new AbortController(); + + /** + * Indicates whether the client has been explicitly closed. */ - private _context: ConnectionContext; + private _isClosed: boolean = false; /** - * The options passed by the user when creating the EventHubClient instance. + * Handles assigning partitions. */ - private _clientOptions: EventHubClientOptions; + private _partitionAssigner = new PartitionAssigner(); + + /** + * The known partitionIds that will be used when assigning events to partitions. + */ + private _partitionIds: string[] = []; + /** + * The EventHubProducerClient to use when creating and sending batches to the Event Hub. + */ + private _producer: EventHubProducerClient; + + /** + * Mapping of partitionIds to `BatchingPartitionChannels`. + * Each `BatchingPartitionChannel` handles buffering events and backpressure independently. + */ + private _partitionChannels = new Map(); + + /** + * The options passed by the user when creating the EventHubBufferedProducerClient instance. + */ + private _clientOptions: EventHubBufferedProducerClientOptions; /** * @readonly * The name of the Event Hub instance for which this client is created. */ get eventHubName(): string { - throw new Error("Not implemented"); + return this._producer.eventHubName; } /** @@ -125,7 +157,7 @@ export class EventHubBufferedProducerClient { * This is likely to be similar to .servicebus.windows.net. */ get fullyQualifiedNamespace(): string { - throw new Error("Not implemented"); + return this._producer.fullyQualifiedNamespace; } /** @@ -195,18 +227,27 @@ export class EventHubBufferedProducerClient { | EventHubBufferedProducerClientOptions, options4?: EventHubBufferedProducerClientOptions ) { - this._context = createConnectionContext( - fullyQualifiedNamespaceOrConnectionString1, - eventHubNameOrOptions2, - credentialOrOptions3, - options4 - ); if (typeof eventHubNameOrOptions2 !== "string") { - this._clientOptions = eventHubNameOrOptions2 || {}; + this._producer = new EventHubProducerClient( + fullyQualifiedNamespaceOrConnectionString1, + eventHubNameOrOptions2 + ); + this._clientOptions = { ...eventHubNameOrOptions2 }; } else if (!isCredential(credentialOrOptions3)) { - this._clientOptions = credentialOrOptions3 || {}; + this._producer = new EventHubProducerClient( + fullyQualifiedNamespaceOrConnectionString1, + eventHubNameOrOptions2, + credentialOrOptions3 + ); + this._clientOptions = { ...credentialOrOptions3! }; } else { - this._clientOptions = options4 || {}; + this._producer = new EventHubProducerClient( + fullyQualifiedNamespaceOrConnectionString1, + eventHubNameOrOptions2, + credentialOrOptions3, + options4 + ); + this._clientOptions = { ...options4! }; } } @@ -214,6 +255,10 @@ export class EventHubBufferedProducerClient { * Closes the AMQP connection to the Event Hub instance, * returning a promise that will be resolved when disconnection is completed. * + * This will wait for enqueued events to be flushed to the service before closing + * the connection. + * To close without flushing, set the `flush` option to `false`. + * * @param options - The set of options to apply to the operation call. * @returns Promise * @throws Error if the underlying connection encounters an error while closing. @@ -222,7 +267,10 @@ export class EventHubBufferedProducerClient { if (!isDefined(options.flush) || options.flush === true) { await this.flush(options); } - return this._context.close(); + // Calling abort signals to the BatchingPartitionChannels that they + // should stop reading/sending events. + this._abortController.abort(); + return this._producer.close(); } /** @@ -247,7 +295,27 @@ export class EventHubBufferedProducerClient { event: EventData | AmqpAnnotatedMessage, options: EnqueueEventOptions = {} ): Promise { - throw new Error(`Not implemented ${event}, ${options}`); + if (this._isClosed) { + throw new Error( + `This EventHubBufferedProducerClient has already been closed. Create a new client to enqueue events.` + ); + } + + // TODO: Start a loop that queries partition Ids. + // partition ids can be added to an Event Hub after it's been created. + if (!this._partitionIds.length) { + this._partitionIds = await this.getPartitionIds(); + this._partitionAssigner.setPartitionIds(this._partitionIds); + } + + const partitionId = this._partitionAssigner.assignPartition({ + partitionId: options.partitionId, + partitionKey: options.partitionKey + }); + + const partitionChannel = this._getPartitionChannel(partitionId); + await partitionChannel.enqueueEvent(event); + return this._getTotalBufferedEventsCount(); } /** @@ -272,7 +340,11 @@ export class EventHubBufferedProducerClient { events: EventData[] | AmqpAnnotatedMessage[], options: EnqueueEventOptions = {} ): Promise { - throw new Error(`Not implemented ${events}, ${options}`); + for (const event of events) { + await this.enqueueEvent(event, options); + } + + return this._getTotalBufferedEventsCount(); } /** @@ -284,7 +356,9 @@ export class EventHubBufferedProducerClient { * @param options - The set of options to apply to the operation call. */ async flush(options: OperationOptions = {}): Promise { - throw new Error(`Not implemented ${options}`); + await Promise.all( + Array.from(this._partitionChannels.values()).map((channel) => channel.flush(options)) + ); } /** @@ -295,10 +369,7 @@ export class EventHubBufferedProducerClient { * @throws AbortError if the operation is cancelled via the abortSignal. */ getEventHubProperties(options: GetEventHubPropertiesOptions = {}): Promise { - return this._context.managementSession!.getEventHubProperties({ - ...options, - retryOptions: this._clientOptions.retryOptions - }); + return this._producer.getEventHubProperties(options); } /** @@ -310,14 +381,7 @@ export class EventHubBufferedProducerClient { * @throws AbortError if the operation is cancelled via the abortSignal. */ getPartitionIds(options: GetPartitionIdsOptions = {}): Promise> { - return this._context - .managementSession!.getEventHubProperties({ - ...options, - retryOptions: this._clientOptions.retryOptions - }) - .then((eventHubProperties) => { - return eventHubProperties.partitionIds; - }); + return this._producer.getPartitionIds(options); } /** @@ -332,9 +396,39 @@ export class EventHubBufferedProducerClient { partitionId: string, options: GetPartitionPropertiesOptions = {} ): Promise { - return this._context.managementSession!.getPartitionProperties(partitionId, { - ...options, - retryOptions: this._clientOptions.retryOptions - }); + return this._producer.getPartitionProperties(partitionId, options); + } + + /** + * Gets the `BatchingPartitionChannel` associated with the partitionId. + * + * If one does not exist, it is created. + */ + private _getPartitionChannel(partitionId: string): BatchingPartitionChannel { + const partitionChannel = + this._partitionChannels.get(partitionId) ?? + new BatchingPartitionChannel({ + loopAbortSignal: this._abortController.signal, + maxBufferSize: this._clientOptions.maxEventBufferLengthPerPartition || 1500, + maxWaitTimeInMs: this._clientOptions.maxWaitTimeInMs || 250, + onSendEventsErrorHandler: this._clientOptions.onSendEventsErrorHandler, + onSendEventsSuccessHandler: this._clientOptions.onSendEventsSuccessHandler, + partitionId, + producer: this._producer + }); + this._partitionChannels.set(partitionId, partitionChannel); + return partitionChannel; + } + + /** + * Returns the total number of buffered events across all partitions. + */ + private _getTotalBufferedEventsCount(): number { + let total = 0; + for (const [_, channel] of this._partitionChannels) { + total += channel.getCurrentBufferedCount(); + } + + return total; } } diff --git a/sdk/eventhub/event-hubs/src/impl/awaitableQueue.ts b/sdk/eventhub/event-hubs/src/impl/awaitableQueue.ts new file mode 100644 index 000000000000..e4c8a0a6363b --- /dev/null +++ b/sdk/eventhub/event-hubs/src/impl/awaitableQueue.ts @@ -0,0 +1,55 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +/** + * `AwaitableQueue` stores items in the order that they are received. + * + * This differs from ordinary Queues in that `shift` returns a Promise for a value. + * This allows a consumer of the queue to request an item that the queue does not yet have. + * + * @hidden + */ +export class AwaitableQueue { + private readonly _items: T[]; + + private readonly _resolvers: Array<(value: T) => void> = []; + + constructor() { + this._items = []; + } + + public size(): number { + return this._items.length; + } + + /** + * Returns a Promise that will resolve with the next item in the queue. + */ + public shift(): Promise { + const item = this._items.shift(); + if (typeof item !== "undefined") { + return Promise.resolve(item); + } + + return new Promise((resolve) => this._resolvers.push(resolve)); + } + + /** + * Appends new item to the queue. + */ + public push(item: T): void { + if (!this._resolveNextItem(item)) { + this._items.push(item); + } + } + + private _resolveNextItem(item: T) { + const resolver = this._resolvers.shift(); + if (!resolver) { + return false; + } + + resolver(item); + return true; + } +} diff --git a/sdk/eventhub/event-hubs/src/impl/partitionAssigner.ts b/sdk/eventhub/event-hubs/src/impl/partitionAssigner.ts new file mode 100644 index 000000000000..f2f0297c1ef4 --- /dev/null +++ b/sdk/eventhub/event-hubs/src/impl/partitionAssigner.ts @@ -0,0 +1,76 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { isDefined } from "../util/typeGuards"; + +/** + * @internal + * Assigns a partition based on the partition ids it knows about and an optional partition id or partition key. + */ +export class PartitionAssigner { + private _partitions: string[] = []; + + private _lastRoundRobinPartitionIndex: number = -1; + + /** + * Set the partition ids that can be used when assigning a partition. + * @param partitionIds - All valid partition ids. + */ + public setPartitionIds(partitionIds: string[]): void { + this._partitions = partitionIds; + } + + /** + * Returns a partitionId from the list of partition ids set via `setPartitionIds`. + * + * If a partitionId is specified, then that will be returned directly. + * If a partitionKey is specified, then a partitionId will be calculated based on the partitionKey. + * Specifying both partitionId and partitionKey results in an error. + * + * If neither partitionId nor partitionKey are specified, then a partitionId will be selected + * based on a round-robin approach. + */ + assignPartition({ + partitionId, + partitionKey + }: { + partitionId?: string; + partitionKey?: string; + }): string { + if (isDefined(partitionId) && isDefined(partitionKey)) { + throw new Error( + `The partitionId (${partitionId}) and partitionKey (${partitionKey}) cannot both be specified.` + ); + } + + if (!this._partitions.length) { + throw new Error(`Unable to determine partitionIds, can't assign partitionId.`); + } + + if (isDefined(partitionId)) { + return partitionId; + } + + if (isDefined(partitionKey)) { + return this._assignPartitionForPartitionKey(partitionKey); + } + + return this._assignRoundRobinPartition(); + } + + private _assignPartitionForPartitionKey(partitionKey: string): string { + // TODO: Implement hashing function + return partitionKey ? this._partitions[0] : this._partitions[0]; + } + + private _assignRoundRobinPartition(): string { + const maxPartitionIndex = this._partitions.length - 1; + const proposedPartitionIndex = this._lastRoundRobinPartitionIndex + 1; + + const nextPartitionIndex = + proposedPartitionIndex > maxPartitionIndex ? 0 : proposedPartitionIndex; + + this._lastRoundRobinPartitionIndex = nextPartitionIndex; + return this._partitions[nextPartitionIndex]; + } +} diff --git a/sdk/eventhub/event-hubs/src/util/getPromiseParts.ts b/sdk/eventhub/event-hubs/src/util/getPromiseParts.ts new file mode 100644 index 000000000000..202ca3f3128e --- /dev/null +++ b/sdk/eventhub/event-hubs/src/util/getPromiseParts.ts @@ -0,0 +1,24 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +/** + * @internal + * Returns a promise and the promise's resolve and reject methods. + */ +export function getPromiseParts(): { + promise: Promise; + resolve: (value: T) => void; + reject: (reason: Error) => void; +} { + let resolver: (value: T) => void; + let rejector: (reason?: any) => void; + const promise = new Promise((resolve, reject) => { + resolver = resolve; + rejector = reject; + }); + return { + promise, + resolve: resolver!, + reject: rejector! + }; +} diff --git a/sdk/eventhub/event-hubs/test/internal/impl/awaitableQueue.spec.ts b/sdk/eventhub/event-hubs/test/internal/impl/awaitableQueue.spec.ts new file mode 100644 index 000000000000..60bcffa4b9a1 --- /dev/null +++ b/sdk/eventhub/event-hubs/test/internal/impl/awaitableQueue.spec.ts @@ -0,0 +1,72 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import chai from "chai"; +import { testWithServiceTypes } from "../../public/utils/testWithServiceTypes"; +import { AwaitableQueue } from "../../../src/impl/awaitableQueue"; +const should = chai.should(); + +testWithServiceTypes(() => { + describe("AwaitableQueue", () => { + it("can be instantiated", () => { + const queue = new AwaitableQueue(); + should.exist(queue, "queue was not defined."); + should.equal(queue.size(), 0, "Unexpected number of values in queue."); + }); + + it("supports adding and removing items", async () => { + const expectedNumberOfItems = 10; + const queue = new AwaitableQueue(); + + for (let i = 0; i < expectedNumberOfItems; i++) { + queue.push(i); + } + + should.equal(queue.size(), expectedNumberOfItems, "Unexpected number of items in queue."); + + let receivedCount = 0; + while (queue.size()) { + const value = await queue.shift(); + should.equal(value, receivedCount, "Unexpected value shifted from queue."); + receivedCount++; + } + }); + + it("shift resolves with next pushed item", async () => { + const queue = new AwaitableQueue(); + should.equal(queue.size(), 0, "Expected the queue to be empty."); + + const futureValue = queue.shift(); + should.equal(queue.size(), 0, "Expected the queue to be empty."); + + queue.push("foo"); + + const value = await futureValue; + should.equal(value, "foo", "Unexpected value"); + should.equal(queue.size(), 0, "Expected the queue to be empty."); + }); + + it("each shift call resolves with the next consecutive item that appears in the queue", async () => { + const queue = new AwaitableQueue(); + should.equal(queue.size(), 0, "Expected the queue to be empty."); + + const expectedResults = ["foo", "bar", "baz"]; + const futureValues: Promise[] = []; + + for (let i = 0; i < expectedResults.length; i++) { + futureValues.push(queue.shift()); + } + + for (const input of expectedResults) { + queue.push(input); + } + + const values = await Promise.all(futureValues); + for (let i = 0; i < values.length; i++) { + should.equal(values[i], expectedResults[i], "Unexpected value encountered."); + } + + should.equal(queue.size(), 0, "Expected the queue to be empty."); + }); + }); +}); diff --git a/sdk/eventhub/event-hubs/test/public/eventHubBufferedProducerClient.spec.ts b/sdk/eventhub/event-hubs/test/public/eventHubBufferedProducerClient.spec.ts new file mode 100644 index 000000000000..c99e4ff06ed1 --- /dev/null +++ b/sdk/eventhub/event-hubs/test/public/eventHubBufferedProducerClient.spec.ts @@ -0,0 +1,240 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import chai from "chai"; +import { EnvVarKeys, getEnvVars } from "./utils/testUtils"; +import { testWithServiceTypes } from "./utils/testWithServiceTypes"; +import { createMockServer } from "./utils/mockService"; +import { + EventHubBufferedProducerClient, + EventData, + OnSendEventsErrorContext, + OnSendEventsSuccessContext +} from "../../src/index"; +import { AmqpAnnotatedMessage } from "@azure/core-amqp"; + +const assert = chai.assert; + +type ResultError = { type: "error"; context: OnSendEventsErrorContext }; +type ResultSuccess = { type: "success"; context: OnSendEventsSuccessContext }; +type ResultEnqueue = { type: "enqueue"; event: EventData | AmqpAnnotatedMessage }; +type ResultFlush = { type: "flush" }; +type Result = ResultEnqueue | ResultError | ResultSuccess | ResultFlush; + +testWithServiceTypes((serviceVersion) => { + const env = getEnvVars(); + if (serviceVersion === "mock") { + let service: ReturnType; + before("Starting mock service", () => { + service = createMockServer(); + return service.start(); + }); + + after("Stopping mock service", () => { + return service?.stop(); + }); + } + + describe("EventHubBufferedProducerClient", () => { + const connectionString = env[EnvVarKeys.EVENTHUB_CONNECTION_STRING]; + const eventHubName = env[EnvVarKeys.EVENTHUB_NAME]; + let client: EventHubBufferedProducerClient | undefined; + + before(() => { + assert.exists( + connectionString, + "define EVENTHUB_CONNECTION_STRING in your environment before running integration tests." + ); + assert.exists( + eventHubName, + "define EVENTHUB_NAME in your environment before running integration tests." + ); + }); + + afterEach("Ensure client is closed between tests.", async () => { + if (client) { + await client.close(); + client = undefined; + } + }); + + describe("enqueueEvent", () => { + afterEach("close EventHubBufferedProducerClient", () => { + return client?.close({ flush: false }); + }); + + it("batches events targetting the same partitionId together", async () => { + const results: Result[] = []; + const expectedEventCount = 10; + const testEvents: EventData[] = []; + for (let i = 0; i < expectedEventCount; i++) { + testEvents.push({ body: `Test event ${i}` }); + } + + client = new EventHubBufferedProducerClient(connectionString, eventHubName, { + async onSendEventsErrorHandler(context) { + results.push({ type: "error", context }); + }, + async onSendEventsSuccessHandler(context) { + results.push({ type: "success", context }); + }, + maxWaitTimeInMs: 1000 + }); + + for (let i = 0; i < expectedEventCount; i++) { + const bufferedEventCount = await client.enqueueEvent(testEvents[i], { partitionId: "0" }); + assert.equal(bufferedEventCount, i + 1, "Unexpected number of events buffered."); + } + + await client.flush(); + const resultSuccess = results + .filter((r) => r.type === "success") + .map((r) => (r as ResultSuccess).context.events) + .reduce((prev, cur) => [...prev, ...cur], []); + assert.deepEqual(resultSuccess, testEvents, "Expected sent events to match test events."); + }); + + it("batches events targetting the same partitionKey together", async () => { + const results: Result[] = []; + const expectedEventCount = 10; + const testEvents: EventData[] = []; + for (let i = 0; i < expectedEventCount; i++) { + testEvents.push({ body: `Test event ${i}` }); + } + + client = new EventHubBufferedProducerClient(connectionString, eventHubName, { + async onSendEventsErrorHandler(context) { + results.push({ type: "error", context }); + }, + async onSendEventsSuccessHandler(context) { + results.push({ type: "success", context }); + }, + maxWaitTimeInMs: 1000 + }); + + for (let i = 0; i < expectedEventCount; i++) { + const bufferedEventCount = await client.enqueueEvent(testEvents[i], { + partitionKey: "foo" + }); + assert.equal(bufferedEventCount, i + 1, "Unexpected number of events buffered."); + } + + await client.flush(); + const resultSuccess = results + .filter((r) => r.type === "success") + .map((r) => (r as ResultSuccess).context.events) + .reduce((prev, cur) => [...prev, ...cur], []); + assert.deepEqual(resultSuccess, testEvents, "Expected sent events to match test events."); + }); + + it("waits until buffer has space for the event before yielding", async () => { + const results: Result[] = []; + const expectedEventCount = 5; + const testEvents: EventData[] = []; + for (let i = 0; i < expectedEventCount; i++) { + testEvents.push({ + body: `Test event ${i}` + }); + } + + client = new EventHubBufferedProducerClient(connectionString, eventHubName, { + async onSendEventsErrorHandler(context) { + results.push({ type: "error", context }); + }, + async onSendEventsSuccessHandler(context) { + results.push({ type: "success", context }); + }, + maxEventBufferLengthPerPartition: 2 + }); + + for (const testEvent of testEvents) { + await client.enqueueEvent(testEvent, { + partitionKey: "foo" + }); + results.push({ + type: "enqueue", + event: testEvent + }); + } + + await client.flush(); + const resultTypes = results.map((r) => r.type); + const resultEnqueued = results + .filter((r) => r.type === "enqueue") + .map((r) => (r as ResultEnqueue).event); + const resultSuccess = results + .filter((r) => r.type === "success") + .map((r) => (r as ResultSuccess).context.events) + .reduce((prev, cur) => [...prev, ...cur], []); + assert.deepEqual(resultTypes, [ + "enqueue", + "enqueue", + "success", + "enqueue", + "enqueue", + "success", + "enqueue", + "success" + ]); + assert.deepEqual( + resultEnqueued, + testEvents, + "Expected enqueued events to match test events." + ); + assert.deepEqual(resultSuccess, testEvents, "Expected sent events to match test events."); + }); + + it("waits until flush is complete to enqueue", async () => { + const results: Result[] = []; + + client = new EventHubBufferedProducerClient(connectionString, eventHubName, { + async onSendEventsErrorHandler(context) { + results.push({ type: "error", context }); + }, + async onSendEventsSuccessHandler(context) { + results.push({ type: "success", context }); + }, + maxEventBufferLengthPerPartition: 2 + }); + + /** + * One way to test that `enqueueEvent` waits for an in-progress `flush` + * to complete before yielding is to call `enqueueEvent` before `flush` yields. + * + * `flush` won't complete until any buffered events are either successfully sent + * or they error out. That means we can track when the `success` handler is called + * and when `flush` yields, and if there were buffered events we should see them + * one after the other. + * + * We enqueue an event to start with to ensure there's something to flush. + * + * Next, we call `flush`, and then another `enqueueEvent` without waiting for + * the `flush` to yield. + * + * Finally, we call `flush` after both methods complete to ensure there was still + * an event to send to the service. + * + * If this works properly, we should see: + * [ "success", "flush", "success", "flush" ] and each "success" should have a single event. + * + * This indicates that the 2nd `enqueueEvent` had to wait for the flush to complete before + * the event was actually accepted. + * + * If the 2nd `enqueueEvent` had not waited for the `flush` to complete, we would have seen: + * [ "success", "flush", "flush"] and the "success" would have had 2 events. + */ + + await client.enqueueEvent({ body: 1 }, { partitionId: "0" }); + await Promise.all([ + client.flush().then(() => results.push({ type: "flush" })), + client.enqueueEvent({ body: 2 }, { partitionId: "0" }) + ]); + await client.flush(); + results.push({ type: "flush" }); + + const resultTypes = results.map((r) => r.type); + assert.deepEqual(resultTypes, ["success", "flush", "success", "flush"]); + }); + }); + }); +}); diff --git a/sdk/eventhub/event-hubs/test/public/hubruntime.spec.ts b/sdk/eventhub/event-hubs/test/public/hubruntime.spec.ts index 2914c1567841..5cc0db28a135 100644 --- a/sdk/eventhub/event-hubs/test/public/hubruntime.spec.ts +++ b/sdk/eventhub/event-hubs/test/public/hubruntime.spec.ts @@ -11,10 +11,20 @@ import { EnvVarKeys, getEnvVars, setTracerForTest } from "./utils/testUtils"; import { setSpan, context } from "@azure/core-tracing"; import { SpanGraph } from "@azure/test-utils"; -import { EventHubProducerClient, EventHubConsumerClient, MessagingError } from "../../src"; +import { + EventHubBufferedProducerClient, + EventHubProducerClient, + EventHubConsumerClient, + MessagingError +} from "../../src"; import { testWithServiceTypes } from "./utils/testWithServiceTypes"; import { createMockServer } from "./utils/mockService"; +type ClientCommonMethods = Pick< + EventHubProducerClient, + "close" | "getEventHubProperties" | "getPartitionIds" | "getPartitionProperties" +>; + testWithServiceTypes((serviceVersion) => { const env = getEnvVars(); if (serviceVersion === "mock") { @@ -30,8 +40,13 @@ testWithServiceTypes((serviceVersion) => { } describe("RuntimeInformation", function(): void { - let producerClient: EventHubProducerClient; - let consumerClient: EventHubConsumerClient; + const clientTypes = [ + "EventHubBufferedProducerClient", + "EventHubConsumerClient", + "EventHubProducerClient" + ] as const; + const clientMap = new Map(); + const service = { connectionString: env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], path: env[EnvVarKeys.EVENTHUB_NAME] @@ -49,17 +64,28 @@ testWithServiceTypes((serviceVersion) => { beforeEach(async () => { debug("Creating the clients.."); - producerClient = new EventHubProducerClient(service.connectionString, service.path); - consumerClient = new EventHubConsumerClient( - EventHubConsumerClient.defaultConsumerGroupName, - service.connectionString, - service.path + clientMap.set( + "EventHubBufferedProducerClient", + new EventHubBufferedProducerClient(service.connectionString, service.path) + ); + clientMap.set( + "EventHubConsumerClient", + new EventHubConsumerClient( + EventHubConsumerClient.defaultConsumerGroupName, + service.connectionString, + service.path + ) + ); + clientMap.set( + "EventHubProducerClient", + new EventHubProducerClient(service.connectionString, service.path) ); }); afterEach("close the connection", async function(): Promise { - await producerClient.close(); - await consumerClient.close(); + for (const client of clientMap.values()) { + await client?.close(); + } }); function arrayOfIncreasingNumbersFromZero(length: any): Array { @@ -70,373 +96,186 @@ testWithServiceTypes((serviceVersion) => { return result; } - describe("getPartitionIds", function(): void { - it("EventHubProducerClient returns an array of partition IDs", async function(): Promise< - void - > { - const ids = await producerClient.getPartitionIds({}); - ids.should.have.members(arrayOfIncreasingNumbersFromZero(ids.length)); - }); - - it("EventHubConsumerClient returns an array of partition IDs", async function(): Promise< - void - > { - const ids = await consumerClient.getPartitionIds({}); - ids.should.have.members(arrayOfIncreasingNumbersFromZero(ids.length)); - }); - - it("EventHubProducerClient can be manually traced", async function(): Promise { - const { tracer, resetTracer } = setTracerForTest(); - - const rootSpan = tracer.startSpan("root"); - const ids = await producerClient.getPartitionIds({ - tracingOptions: { - tracingContext: setSpan(context.active(), rootSpan) - } + clientTypes.forEach((clientType) => { + describe(`${clientType}.getPartitionIds`, () => { + it("returns an array of partition ids", async () => { + const client = clientMap.get(clientType)!; + const ids = await client.getPartitionIds({}); + ids.should.have.members(arrayOfIncreasingNumbersFromZero(ids.length)); }); - ids.should.have.members(arrayOfIncreasingNumbersFromZero(ids.length)); - rootSpan.end(); - - const rootSpans = tracer.getRootSpans(); - rootSpans.length.should.equal(1, "Should only have one root span."); - rootSpans[0].should.equal(rootSpan, "The root span should match what was passed in."); - - const expectedGraph: SpanGraph = { - roots: [ - { - name: rootSpan.name, - children: [ - { - name: "Azure.EventHubs.getEventHubProperties", - children: [] - } - ] - } - ] - }; - tracer.getSpanGraph(rootSpan.spanContext().traceId).should.eql(expectedGraph); - tracer.getActiveSpans().length.should.equal(0, "All spans should have had end called."); - resetTracer(); - }); - - it("EventHubConsumerClient can be manually traced", async function(): Promise { - const { tracer, resetTracer } = setTracerForTest(); + it("can be manually traced", async () => { + const client = clientMap.get(clientType)!; + const { tracer, resetTracer } = setTracerForTest(); - const rootSpan = tracer.startSpan("root"); - const ids = await consumerClient.getPartitionIds({ - tracingOptions: { - tracingContext: setSpan(context.active(), rootSpan) - } - }); - ids.should.have.members(arrayOfIncreasingNumbersFromZero(ids.length)); - rootSpan.end(); - - const rootSpans = tracer.getRootSpans(); - rootSpans.length.should.equal(1, "Should only have one root span."); - rootSpans[0].should.equal(rootSpan, "The root span should match what was passed in."); - - const expectedGraph: SpanGraph = { - roots: [ - { - name: rootSpan.name, - children: [ - { - name: "Azure.EventHubs.getEventHubProperties", - children: [] - } - ] + const rootSpan = tracer.startSpan("root"); + const ids = await client.getPartitionIds({ + tracingOptions: { + tracingContext: setSpan(context.active(), rootSpan) } - ] - }; - - tracer.getSpanGraph(rootSpan.spanContext().traceId).should.eql(expectedGraph); - tracer.getActiveSpans().length.should.equal(0, "All spans should have had end called."); - resetTracer(); - }); - }); - - describe("hub runtime information", function(): void { - it("EventHubProducerClient gets the hub runtime information", async function(): Promise< - void - > { - const hubRuntimeInfo = await producerClient.getEventHubProperties(); - debug(hubRuntimeInfo); - hubRuntimeInfo.name.should.equal(service.path); - - hubRuntimeInfo.partitionIds.should.have.members( - arrayOfIncreasingNumbersFromZero(hubRuntimeInfo.partitionIds.length) - ); - hubRuntimeInfo.createdOn.should.be.instanceof(Date); - }); - - it("EventHubConsumerClient gets the hub runtime information", async function(): Promise< - void - > { - const hubRuntimeInfo = await consumerClient.getEventHubProperties(); - debug(hubRuntimeInfo); - hubRuntimeInfo.name.should.equal(service.path); - - hubRuntimeInfo.partitionIds.should.have.members( - arrayOfIncreasingNumbersFromZero(hubRuntimeInfo.partitionIds.length) - ); - hubRuntimeInfo.createdOn.should.be.instanceof(Date); - }); - - it("EventHubProducerClient can be manually traced", async function(): Promise { - const { tracer, resetTracer } = setTracerForTest(); - - const rootSpan = tracer.startSpan("root"); - const hubRuntimeInfo = await producerClient.getEventHubProperties({ - tracingOptions: { - tracingContext: setSpan(context.active(), rootSpan) - } + }); + ids.should.have.members(arrayOfIncreasingNumbersFromZero(ids.length)); + rootSpan.end(); + + const rootSpans = tracer.getRootSpans(); + rootSpans.length.should.equal(1, "Should only have one root span."); + rootSpans[0].should.equal(rootSpan, "The root span should match what was passed in."); + + const expectedGraph: SpanGraph = { + roots: [ + { + name: rootSpan.name, + children: [ + { + name: "Azure.EventHubs.getEventHubProperties", + children: [] + } + ] + } + ] + }; + + tracer.getSpanGraph(rootSpan.spanContext().traceId).should.eql(expectedGraph); + tracer.getActiveSpans().length.should.equal(0, "All spans should have had end called."); + resetTracer(); }); - hubRuntimeInfo.partitionIds.should.have.members( - arrayOfIncreasingNumbersFromZero(hubRuntimeInfo.partitionIds.length) - ); - rootSpan.end(); - - const rootSpans = tracer.getRootSpans(); - rootSpans.length.should.equal(1, "Should only have one root span."); - rootSpans[0].should.equal(rootSpan, "The root span should match what was passed in."); - - const expectedGraph: SpanGraph = { - roots: [ - { - name: rootSpan.name, - children: [ - { - name: "Azure.EventHubs.getEventHubProperties", - children: [] - } - ] - } - ] - }; - - tracer.getSpanGraph(rootSpan.spanContext().traceId).should.eql(expectedGraph); - tracer.getActiveSpans().length.should.equal(0, "All spans should have had end called."); - resetTracer(); }); - it("EventHubConsumerClient can be manually traced", async function(): Promise { - const { tracer, resetTracer } = setTracerForTest(); + describe(`${clientType}.getEventHubProperties`, () => { + it("gets the Event Hub runtime information", async () => { + const client = clientMap.get(clientType)!; + const hubRuntimeInfo = await client.getEventHubProperties(); + hubRuntimeInfo.name.should.equal(service.path); - const rootSpan = tracer.startSpan("root"); - const hubRuntimeInfo = await consumerClient.getEventHubProperties({ - tracingOptions: { - tracingContext: setSpan(context.active(), rootSpan) - } + hubRuntimeInfo.partitionIds.should.have.members( + arrayOfIncreasingNumbersFromZero(hubRuntimeInfo.partitionIds.length) + ); + hubRuntimeInfo.createdOn.should.be.instanceof(Date); }); - hubRuntimeInfo.partitionIds.should.have.members( - arrayOfIncreasingNumbersFromZero(hubRuntimeInfo.partitionIds.length) - ); - rootSpan.end(); - - const rootSpans = tracer.getRootSpans(); - rootSpans.length.should.equal(1, "Should only have one root span."); - rootSpans[0].should.equal(rootSpan, "The root span should match what was passed in."); - - const expectedGraph: SpanGraph = { - roots: [ - { - name: rootSpan.name, - children: [ - { - name: "Azure.EventHubs.getEventHubProperties", - children: [] - } - ] - } - ] - }; - - tracer.getSpanGraph(rootSpan.spanContext().traceId).should.eql(expectedGraph); - tracer.getActiveSpans().length.should.equal(0, "All spans should have had end called."); - resetTracer(); - }); - }); - describe("partition runtime information", function(): void { - it("EventHubProducerClient should throw an error if partitionId is missing", async function(): Promise< - void - > { - try { - await producerClient.getPartitionProperties(undefined as any); - throw new Error("Test failure"); - } catch (err) { - err.name.should.equal("TypeError"); - err.message.should.equal( - `getPartitionProperties called without required argument "partitionId"` - ); - } - }); + it("can be manually traced", async function(): Promise { + const client = clientMap.get(clientType)!; + const { tracer, resetTracer } = setTracerForTest(); - it("EventHubConsumerClient should throw an error if partitionId is missing", async function(): Promise< - void - > { - try { - await consumerClient.getPartitionProperties(undefined as any); - throw new Error("Test failure"); - } catch (err) { - err.name.should.equal("TypeError"); - err.message.should.equal( - `getPartitionProperties called without required argument "partitionId"` + const rootSpan = tracer.startSpan("root"); + const hubRuntimeInfo = await client.getEventHubProperties({ + tracingOptions: { + tracingContext: setSpan(context.active(), rootSpan) + } + }); + hubRuntimeInfo.partitionIds.should.have.members( + arrayOfIncreasingNumbersFromZero(hubRuntimeInfo.partitionIds.length) ); - } - }); - - it("EventHubProducerClient gets the partition runtime information with partitionId as a string", async function(): Promise< - void - > { - const partitionRuntimeInfo = await producerClient.getPartitionProperties("0"); - debug(partitionRuntimeInfo); - partitionRuntimeInfo.partitionId.should.equal("0"); - partitionRuntimeInfo.eventHubName.should.equal(service.path); - partitionRuntimeInfo.lastEnqueuedOnUtc.should.be.instanceof(Date); - should.exist(partitionRuntimeInfo.lastEnqueuedSequenceNumber); - should.exist(partitionRuntimeInfo.lastEnqueuedOffset); - }); - - it("EventHubConsumerClient gets the partition runtime information with partitionId as a string", async function(): Promise< - void - > { - const partitionRuntimeInfo = await consumerClient.getPartitionProperties("0"); - debug(partitionRuntimeInfo); - partitionRuntimeInfo.partitionId.should.equal("0"); - partitionRuntimeInfo.eventHubName.should.equal(service.path); - partitionRuntimeInfo.lastEnqueuedOnUtc.should.be.instanceof(Date); - should.exist(partitionRuntimeInfo.lastEnqueuedSequenceNumber); - should.exist(partitionRuntimeInfo.lastEnqueuedOffset); - }); - - it("EventHubProducerClient gets the partition runtime information with partitionId as a number", async function(): Promise< - void - > { - const partitionRuntimeInfo = await producerClient.getPartitionProperties(0 as any); - debug(partitionRuntimeInfo); - partitionRuntimeInfo.partitionId.should.equal("0"); - partitionRuntimeInfo.eventHubName.should.equal(service.path); - partitionRuntimeInfo.lastEnqueuedOnUtc.should.be.instanceof(Date); - should.exist(partitionRuntimeInfo.lastEnqueuedSequenceNumber); - should.exist(partitionRuntimeInfo.lastEnqueuedOffset); - }); - - it("EventHubConsumerClient gets the partition runtime information with partitionId as a number", async function(): Promise< - void - > { - const partitionRuntimeInfo = await consumerClient.getPartitionProperties(0 as any); - debug(partitionRuntimeInfo); - partitionRuntimeInfo.partitionId.should.equal("0"); - partitionRuntimeInfo.eventHubName.should.equal(service.path); - partitionRuntimeInfo.lastEnqueuedOnUtc.should.be.instanceof(Date); - should.exist(partitionRuntimeInfo.lastEnqueuedSequenceNumber); - should.exist(partitionRuntimeInfo.lastEnqueuedOffset); - }); - - it("EventHubProducerClient bubbles up error from service for invalid partitionId", async function(): Promise< - void - > { - try { - await producerClient.getPartitionProperties("boo"); - throw new Error("Test failure"); - } catch (err) { - debug(`>>>> Received error - `, err); - should.exist(err); - should.equal((err as MessagingError).code, "ArgumentOutOfRangeError"); - } - }); - - it("EventHubConsumerClient bubbles up error from service for invalid partitionId", async function(): Promise< - void - > { - try { - await consumerClient.getPartitionProperties("boo"); - throw new Error("Test failure"); - } catch (err) { - debug(`>>>> Received error - `, err); - should.exist(err); - should.equal((err as MessagingError).code, "ArgumentOutOfRangeError"); - } + rootSpan.end(); + + const rootSpans = tracer.getRootSpans(); + rootSpans.length.should.equal(1, "Should only have one root span."); + rootSpans[0].should.equal(rootSpan, "The root span should match what was passed in."); + + const expectedGraph: SpanGraph = { + roots: [ + { + name: rootSpan.name, + children: [ + { + name: "Azure.EventHubs.getEventHubProperties", + children: [] + } + ] + } + ] + }; + + tracer.getSpanGraph(rootSpan.spanContext().traceId).should.eql(expectedGraph); + tracer.getActiveSpans().length.should.equal(0, "All spans should have had end called."); + resetTracer(); + }); }); - it("EventHubProducerClient can be manually traced", async function(): Promise { - const { tracer, resetTracer } = setTracerForTest(); - - const rootSpan = tracer.startSpan("root"); - const partitionRuntimeInfo = await producerClient.getPartitionProperties("0", { - tracingOptions: { - tracingContext: setSpan(context.active(), rootSpan) + describe(`${clientType}.getPartitionProperties`, () => { + it("should throw an error if partitionId is missing", async () => { + try { + const client = clientMap.get(clientType)!; + await client.getPartitionProperties(undefined as any); + throw new Error("Test failure"); + } catch (err) { + (err as any).name.should.equal("TypeError"); + (err as any).message.should.equal( + `getPartitionProperties called without required argument "partitionId"` + ); } }); - partitionRuntimeInfo.partitionId.should.equal("0"); - partitionRuntimeInfo.eventHubName.should.equal(service.path); - partitionRuntimeInfo.lastEnqueuedOnUtc.should.be.instanceof(Date); - should.exist(partitionRuntimeInfo.lastEnqueuedSequenceNumber); - should.exist(partitionRuntimeInfo.lastEnqueuedOffset); - rootSpan.end(); - - const rootSpans = tracer.getRootSpans(); - rootSpans.length.should.equal(1, "Should only have one root span."); - rootSpans[0].should.equal(rootSpan, "The root span should match what was passed in."); - - const expectedGraph: SpanGraph = { - roots: [ - { - name: rootSpan.name, - children: [ - { - name: "Azure.EventHubs.getPartitionProperties", - children: [] - } - ] - } - ] - }; - tracer.getSpanGraph(rootSpan.spanContext().traceId).should.eql(expectedGraph); - tracer.getActiveSpans().length.should.equal(0, "All spans should have had end called."); - resetTracer(); - }); + it("gets the partition runtime information with partitionId as a string", async () => { + const client = clientMap.get(clientType)!; + const partitionRuntimeInfo = await client.getPartitionProperties("0"); + partitionRuntimeInfo.partitionId.should.equal("0"); + partitionRuntimeInfo.eventHubName.should.equal(service.path); + partitionRuntimeInfo.lastEnqueuedOnUtc.should.be.instanceof(Date); + should.exist(partitionRuntimeInfo.lastEnqueuedSequenceNumber); + should.exist(partitionRuntimeInfo.lastEnqueuedOffset); + }); - it("EventHubConsumerClient can be manually traced", async function(): Promise { - const { tracer, resetTracer } = setTracerForTest(); + it("gets the partition runtime information with partitionId as a number", async () => { + const client = clientMap.get(clientType)!; + const partitionRuntimeInfo = await client.getPartitionProperties(0 as any); + partitionRuntimeInfo.partitionId.should.equal("0"); + partitionRuntimeInfo.eventHubName.should.equal(service.path); + partitionRuntimeInfo.lastEnqueuedOnUtc.should.be.instanceof(Date); + should.exist(partitionRuntimeInfo.lastEnqueuedSequenceNumber); + should.exist(partitionRuntimeInfo.lastEnqueuedOffset); + }); - const rootSpan = tracer.startSpan("root"); - const partitionRuntimeInfo = await consumerClient.getPartitionProperties("0", { - tracingOptions: { - tracingContext: setSpan(context.active(), rootSpan) + it("bubbles up error from service for invalid partitionId", async () => { + try { + const client = clientMap.get(clientType)!; + await client.getPartitionProperties("boo"); + throw new Error("Test failure"); + } catch (err) { + should.exist(err); + should.equal((err as MessagingError).code, "ArgumentOutOfRangeError"); } }); - partitionRuntimeInfo.partitionId.should.equal("0"); - partitionRuntimeInfo.eventHubName.should.equal(service.path); - partitionRuntimeInfo.lastEnqueuedOnUtc.should.be.instanceof(Date); - should.exist(partitionRuntimeInfo.lastEnqueuedSequenceNumber); - should.exist(partitionRuntimeInfo.lastEnqueuedOffset); - rootSpan.end(); - - const rootSpans = tracer.getRootSpans(); - rootSpans.length.should.equal(1, "Should only have one root span."); - rootSpans[0].should.equal(rootSpan, "The root span should match what was passed in."); - - const expectedGraph: SpanGraph = { - roots: [ - { - name: rootSpan.name, - children: [ - { - name: "Azure.EventHubs.getPartitionProperties", - children: [] - } - ] - } - ] - }; - tracer.getSpanGraph(rootSpan.spanContext().traceId).should.eql(expectedGraph); - tracer.getActiveSpans().length.should.equal(0, "All spans should have had end called."); - resetTracer(); + it("can be manually traced", async () => { + const client = clientMap.get(clientType)!; + const { tracer, resetTracer } = setTracerForTest(); + + const rootSpan = tracer.startSpan("root"); + const partitionRuntimeInfo = await client.getPartitionProperties("0", { + tracingOptions: { + tracingContext: setSpan(context.active(), rootSpan) + } + }); + partitionRuntimeInfo.partitionId.should.equal("0"); + partitionRuntimeInfo.eventHubName.should.equal(service.path); + partitionRuntimeInfo.lastEnqueuedOnUtc.should.be.instanceof(Date); + should.exist(partitionRuntimeInfo.lastEnqueuedSequenceNumber); + should.exist(partitionRuntimeInfo.lastEnqueuedOffset); + rootSpan.end(); + + const rootSpans = tracer.getRootSpans(); + rootSpans.length.should.equal(1, "Should only have one root span."); + rootSpans[0].should.equal(rootSpan, "The root span should match what was passed in."); + + const expectedGraph: SpanGraph = { + roots: [ + { + name: rootSpan.name, + children: [ + { + name: "Azure.EventHubs.getPartitionProperties", + children: [] + } + ] + } + ] + }; + + tracer.getSpanGraph(rootSpan.spanContext().traceId).should.eql(expectedGraph); + tracer.getActiveSpans().length.should.equal(0, "All spans should have had end called."); + resetTracer(); + }); }); }); }).timeout(60000); diff --git a/sdk/eventhub/mock-hub/src/services/eventHubs.ts b/sdk/eventhub/mock-hub/src/services/eventHubs.ts index 621d67f8e5e0..1eabdad64210 100644 --- a/sdk/eventhub/mock-hub/src/services/eventHubs.ts +++ b/sdk/eventhub/mock-hub/src/services/eventHubs.ts @@ -116,6 +116,8 @@ export class MockEventHub implements IMockEventHub { private _connectionInactivityTimeoutInMs: number; private _connections: Set = new Set(); + + private _clearableTimeouts = new Set>(); /** * This provides a way to find all the partition senders for a combination * of `consumerGroup` and `partitionId`. @@ -188,10 +190,13 @@ export class MockEventHub implements IMockEventHub { }; let tid = setTimeout(forceCloseConnection, this._connectionInactivityTimeoutInMs); + this._clearableTimeouts.add(tid); const bounceTimeout = () => { clearTimeout(tid); + this._clearableTimeouts.delete(tid); tid = setTimeout(forceCloseConnection, this._connectionInactivityTimeoutInMs); + this._clearableTimeouts.add(tid); }; connection.addListener(ConnectionEvents.settled, bounceTimeout); @@ -710,6 +715,10 @@ export class MockEventHub implements IMockEventHub { * Stops the service. */ stop() { + for (const tid of this._clearableTimeouts.values()) { + clearTimeout(tid); + } + this._clearableTimeouts.clear(); return this._mockServer.stop(); }