diff --git a/sdk/core/core-amqp/package.json b/sdk/core/core-amqp/package.json index 47d3694d4777..f1930eda2b11 100644 --- a/sdk/core/core-amqp/package.json +++ b/sdk/core/core-amqp/package.json @@ -70,13 +70,12 @@ }, "dependencies": { "@azure/abort-controller": "^1.0.0", + "@azure/core-auth": "^1.2.0", "@azure/logger": "^1.0.0", "@types/async-lock": "^1.1.0", - "@types/is-buffer": "^2.0.0", "async-lock": "^1.1.3", "buffer": "^5.2.1", "events": "^3.0.0", - "is-buffer": "^2.0.3", "jssha": "^3.1.0", "process": "^0.11.10", "rhea": "^1.0.24", diff --git a/sdk/core/core-amqp/review/core-amqp.api.md b/sdk/core/core-amqp/review/core-amqp.api.md index 0d0a37d9c039..4dd05a9dade7 100644 --- a/sdk/core/core-amqp/review/core-amqp.api.md +++ b/sdk/core/core-amqp/review/core-amqp.api.md @@ -5,15 +5,18 @@ ```ts import { AbortSignalLike } from '@azure/abort-controller'; +import { AccessToken } from '@azure/core-auth'; import { AmqpError } from 'rhea-promise'; import AsyncLock from 'async-lock'; import { Connection } from 'rhea-promise'; import { Message } from 'rhea-promise'; import { MessageHeader } from 'rhea-promise'; import { MessageProperties } from 'rhea-promise'; +import { NamedKeyCredential } from '@azure/core-auth'; import { Receiver } from 'rhea-promise'; import { ReceiverOptions } from 'rhea-promise'; import { ReqResLink } from 'rhea-promise'; +import { SASCredential } from '@azure/core-auth'; import { Sender } from 'rhea-promise'; import { SenderOptions } from 'rhea-promise'; import { Session } from 'rhea-promise'; @@ -338,6 +341,14 @@ export interface CreateConnectionContextBaseParameters { operationTimeoutInMs?: number; } +// @public +export function createSasTokenProvider(data: { + sharedAccessKeyName: string; + sharedAccessKey: string; +} | { + sharedAccessSignature: string; +} | NamedKeyCredential | SASCredential): SasTokenProvider; + // @public export const defaultLock: AsyncLock; @@ -396,6 +407,9 @@ export enum ErrorNameConditionMapper { // @public export function isMessagingError(error: Error | MessagingError): error is MessagingError; +// @public +export function isSasTokenProvider(thing: unknown): thing is SasTokenProvider; + // @public export function isSystemError(err: unknown): err is NetworkSystemError; @@ -517,6 +531,12 @@ export interface RetryOptions { timeoutInMs?: number; } +// @public +export interface SasTokenProvider { + getToken(audience: string): AccessToken; + isSasTokenProvider: true; +} + // @public export interface SendRequestOptions { abortSignal?: AbortSignalLike; diff --git a/sdk/core/core-amqp/src/auth/tokenProvider.ts b/sdk/core/core-amqp/src/auth/tokenProvider.ts new file mode 100644 index 000000000000..9dcb34ab0659 --- /dev/null +++ b/sdk/core/core-amqp/src/auth/tokenProvider.ts @@ -0,0 +1,123 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { + AccessToken, + NamedKeyCredential, + SASCredential, + isNamedKeyCredential, + isSASCredential +} from "@azure/core-auth"; +import jssha from "jssha"; +import { isObjectWithProperties } from "../util/typeGuards"; + +/** + * A SasTokenProvider provides an alternative to TokenCredential for providing an `AccessToken`. + * @hidden + */ +export interface SasTokenProvider { + /** + * Property used to distinguish SasTokenProvider from TokenCredential. + */ + isSasTokenProvider: true; + /** + * Gets the token provided by this provider. + * + * This method is called automatically by Azure SDK client libraries. + * + * @param audience - The audience for which the token is desired. + */ + getToken(audience: string): AccessToken; +} + +/** + * Creates a token provider from the provided shared access data. + * @param data - The sharedAccessKeyName/sharedAccessKey pair or the sharedAccessSignature. + * @hidden + */ +export function createSasTokenProvider( + data: + | { sharedAccessKeyName: string; sharedAccessKey: string } + | { sharedAccessSignature: string } + | NamedKeyCredential + | SASCredential +): SasTokenProvider { + if (isNamedKeyCredential(data) || isSASCredential(data)) { + return new SasTokenProviderImpl(data); + } else if (isObjectWithProperties(data, ["sharedAccessKeyName", "sharedAccessKey"])) { + return new SasTokenProviderImpl({ name: data.sharedAccessKeyName, key: data.sharedAccessKey }); + } else { + return new SasTokenProviderImpl({ signature: data.sharedAccessSignature }); + } +} + +/** + * A TokenProvider that generates a Sas token: + * `SharedAccessSignature sr=&sig=&se=&skn=` + * + * @internal + */ +export class SasTokenProviderImpl implements SasTokenProvider { + /** + * Property used to distinguish TokenProvider from TokenCredential. + */ + get isSasTokenProvider(): true { + return true; + } + + /** + * The SASCredential containing the key name and secret key value. + */ + private _credential: SASCredential | NamedKeyCredential; + + /** + * Initializes a new instance of SasTokenProvider + * @param credential - The source `NamedKeyCredential` or `SASCredential`. + */ + constructor(credential: SASCredential | NamedKeyCredential) { + this._credential = credential; + } + + /** + * Gets the sas token for the specified audience + * @param audience - The audience for which the token is desired. + */ + getToken(audience: string): AccessToken { + if (isNamedKeyCredential(this._credential)) { + return createToken( + this._credential.name, + this._credential.key, + Math.floor(Date.now() / 1000) + 3600, + audience + ); + } else { + return { + token: this._credential.signature, + expiresOnTimestamp: 0 + }; + } + } +} + +/** + * Creates the sas token based on the provided information. + * @param keyName - The shared access key name. + * @param key - The shared access key. + * @param expiry - The time period in unix time after which the token will expire. + * @param audience - The audience for which the token is desired. + * @internal + */ +function createToken(keyName: string, key: string, expiry: number, audience: string): AccessToken { + audience = encodeURIComponent(audience); + keyName = encodeURIComponent(keyName); + const stringToSign = audience + "\n" + expiry; + + const shaObj = new jssha("SHA-256", "TEXT"); + shaObj.setHMACKey(key, "TEXT"); + shaObj.update(stringToSign); + const sig = encodeURIComponent(shaObj.getHMAC("B64")); + return { + token: `SharedAccessSignature sr=${audience}&sig=${sig}&se=${expiry}&skn=${keyName}`, + expiresOnTimestamp: expiry + }; +} diff --git a/sdk/core/core-amqp/src/errors.ts b/sdk/core/core-amqp/src/errors.ts index 0618bad6c8ae..26a8178d1f94 100644 --- a/sdk/core/core-amqp/src/errors.ts +++ b/sdk/core/core-amqp/src/errors.ts @@ -554,6 +554,10 @@ export const retryableErrors: string[] = [ "ServiceUnavailableError", "OperationCancelledError", + // The service may throw UnauthorizedError if credentials have been rotated. + // Attempt to retry in case the user has also rotated their credentials. + "UnauthorizedError", + // OperationTimeoutError occurs when the service fails to respond within a given timeframe. // Since reasons for such failures can be transient, this is treated as a retryable error. "OperationTimeoutError", diff --git a/sdk/core/core-amqp/src/index.ts b/sdk/core/core-amqp/src/index.ts index c987d77ce39e..5583596af4b5 100644 --- a/sdk/core/core-amqp/src/index.ts +++ b/sdk/core/core-amqp/src/index.ts @@ -40,3 +40,4 @@ export { } from "./util/utils"; export { AmqpAnnotatedMessage } from "./amqpAnnotatedMessage"; export { logger } from "./log"; +export * from "./internals"; diff --git a/sdk/core/core-amqp/src/internals.ts b/sdk/core/core-amqp/src/internals.ts new file mode 100644 index 000000000000..d0e7d58cfc9b --- /dev/null +++ b/sdk/core/core-amqp/src/internals.ts @@ -0,0 +1,7 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { SasTokenProvider, createSasTokenProvider } from "./auth/tokenProvider"; +import { isSasTokenProvider } from "./util/typeGuards"; + +export { SasTokenProvider, createSasTokenProvider, isSasTokenProvider }; diff --git a/sdk/core/core-amqp/src/util/typeGuards.ts b/sdk/core/core-amqp/src/util/typeGuards.ts index 9fc7d6d6de83..7e1c0bfd15bc 100644 --- a/sdk/core/core-amqp/src/util/typeGuards.ts +++ b/sdk/core/core-amqp/src/util/typeGuards.ts @@ -1,6 +1,8 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. +import { SasTokenProvider } from "../auth/tokenProvider"; + /** * Helper TypeGuard that checks if something is defined or not. * @param thing - Anything @@ -47,3 +49,12 @@ export function objectHasProperty { return typeof thing === "object" && property in (thing as Record); } + +/** + * Typeguard that checks if the input is a SasTokenProvider. + * @param thing - Any object. + * @hidden + */ +export function isSasTokenProvider(thing: unknown): thing is SasTokenProvider { + return isObjectWithProperties(thing, ["isSasTokenProvider"]) && thing.isSasTokenProvider === true; +} diff --git a/sdk/core/core-amqp/test/tokenProvider.spec.ts b/sdk/core/core-amqp/test/tokenProvider.spec.ts new file mode 100644 index 000000000000..06c83fb4bb4d --- /dev/null +++ b/sdk/core/core-amqp/test/tokenProvider.spec.ts @@ -0,0 +1,66 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import chai from "chai"; +const should = chai.should(); +import { AzureNamedKeyCredential, AzureSASCredential } from "@azure/core-auth"; +import { createSasTokenProvider } from "../src/index"; + +describe("SasTokenProvider", function(): void { + describe("createSasTokenProvider", () => { + it("should work as expected with AzureNamedKeyCredential", async function(): Promise { + const keyName = "myKeyName"; + const key = "importantValue"; + const tokenProvider = createSasTokenProvider(new AzureNamedKeyCredential(keyName, key)); + const now = Math.floor(Date.now() / 1000) + 3600; + const tokenInfo = tokenProvider.getToken("myaudience"); + tokenInfo.token.should.match( + /SharedAccessSignature sr=myaudience&sig=(.*)&se=\d{10}&skn=myKeyName/g + ); + tokenInfo.expiresOnTimestamp.should.equal(now); + }); + + it("should work as expected with `shareAccessKeyName` and `sharedAccessKey`", async function(): Promise< + void + > { + // This is how createSasTokenProvider will be called if SAK params are passed through a connection string. + const tokenProvider = createSasTokenProvider({ + sharedAccessKeyName: "sakName", + sharedAccessKey: "sak" + }); + const now = Math.floor(Date.now() / 1000) + 3600; + const tokenInfo = tokenProvider.getToken("sb://hostname.servicebus.windows.net/"); + tokenInfo.token.should.match( + /SharedAccessSignature sr=sb%3A%2F%2Fhostname.servicebus.windows.net%2F&sig=(.*)&se=\d{10}&skn=sakName/g + ); + tokenInfo.expiresOnTimestamp.should.equal(now); + }); + }); + + it("should work as expected with AzureSASCredential", async function(): Promise { + const sasTokenProvider = createSasTokenProvider( + new AzureSASCredential("SharedAccessSignature se=") + ); + const accessToken = sasTokenProvider.getToken("audience isn't used"); + + should.equal( + accessToken.token, + "SharedAccessSignature se=", + "SAS URI we were constructed with should just be returned verbatim without interpretation (and the audience is ignored)" + ); + + should.equal( + accessToken.expiresOnTimestamp, + 0, + "SAS URI always returns 0 for expiry (ignoring what's in the SAS token)" + ); + }); + + it("should work as expected with `sharedAccessSignature`", async function(): Promise { + // This is how createSasTokenProvider will be called if the shared access signature is passed through a connection string. + const tokenProvider = createSasTokenProvider({ sharedAccessSignature: "" }); + const tokenInfo = tokenProvider.getToken("sb://hostname.servicebus.windows.net/"); + tokenInfo.token.should.match(//g); + tokenInfo.expiresOnTimestamp.should.equal(0); + }); +}); diff --git a/sdk/core/core-auth/CHANGELOG.md b/sdk/core/core-auth/CHANGELOG.md index 6971de1660e5..9e0eb12da762 100644 --- a/sdk/core/core-auth/CHANGELOG.md +++ b/sdk/core/core-auth/CHANGELOG.md @@ -3,7 +3,7 @@ ## 1.3.0 (Unreleased) - Adds the `AzureNamedKeyCredential` class which supports credential rotation and a corresponding `NamedKeyCredential` interface to support the use of static string-based names and keys in Azure clients. -- Adds the `isNamedKeyCredential` and `isSASCredential` typeguard functions. +- Adds the `isNamedKeyCredential` and `isSASCredential` typeguard functions similar to the existing `isTokenCredential`. ## 1.2.0 (2021-02-08) diff --git a/sdk/eventhub/event-hubs/CHANGELOG.md b/sdk/eventhub/event-hubs/CHANGELOG.md index 82d3be2727ec..a5cc7142e65c 100644 --- a/sdk/eventhub/event-hubs/CHANGELOG.md +++ b/sdk/eventhub/event-hubs/CHANGELOG.md @@ -2,6 +2,9 @@ ## 5.5.0 (Unreleased) +- Allows passing `NamedKeyCredential` and `SASCredential` as the credential type to `EventHubConsumerClient` and `EventHubProducerClient`. + These credential types support rotation via their `update` methods and are an alternative to using the `SharedAccessKeyName/SharedAccessKey` or `SharedAccessSignature` properties in a connection string. + - Updates the methods on the `CheckpointStore` interface to accept an optional `options` parameter that can be used to pass in an `abortSignal` and `tracingOptions`. diff --git a/sdk/eventhub/event-hubs/review/event-hubs.api.md b/sdk/eventhub/event-hubs/review/event-hubs.api.md index 71b69e682ead..e9b1d105f026 100644 --- a/sdk/eventhub/event-hubs/review/event-hubs.api.md +++ b/sdk/eventhub/event-hubs/review/event-hubs.api.md @@ -6,9 +6,11 @@ import { AbortSignalLike } from '@azure/abort-controller'; import { MessagingError } from '@azure/core-amqp'; +import { NamedKeyCredential } from '@azure/core-auth'; import { OperationTracingOptions } from '@azure/core-tracing'; import { RetryMode } from '@azure/core-amqp'; import { RetryOptions } from '@azure/core-amqp'; +import { SASCredential } from '@azure/core-auth'; import { Span } from '@opentelemetry/api'; import { SpanContext } from '@opentelemetry/api'; import { TokenCredential } from '@azure/core-auth'; @@ -97,8 +99,8 @@ export class EventHubConsumerClient { constructor(consumerGroup: string, connectionString: string, checkpointStore: CheckpointStore, options?: EventHubConsumerClientOptions); constructor(consumerGroup: string, connectionString: string, eventHubName: string, options?: EventHubConsumerClientOptions); constructor(consumerGroup: string, connectionString: string, eventHubName: string, checkpointStore: CheckpointStore, options?: EventHubConsumerClientOptions); - constructor(consumerGroup: string, fullyQualifiedNamespace: string, eventHubName: string, credential: TokenCredential, options?: EventHubConsumerClientOptions); - constructor(consumerGroup: string, fullyQualifiedNamespace: string, eventHubName: string, credential: TokenCredential, checkpointStore: CheckpointStore, options?: EventHubConsumerClientOptions); + constructor(consumerGroup: string, fullyQualifiedNamespace: string, eventHubName: string, credential: TokenCredential | NamedKeyCredential | SASCredential, options?: EventHubConsumerClientOptions); + constructor(consumerGroup: string, fullyQualifiedNamespace: string, eventHubName: string, credential: TokenCredential | NamedKeyCredential | SASCredential, checkpointStore: CheckpointStore, options?: EventHubConsumerClientOptions); close(): Promise; static defaultConsumerGroupName: string; get eventHubName(): string; @@ -119,7 +121,7 @@ export interface EventHubConsumerClientOptions extends EventHubClientOptions { export class EventHubProducerClient { constructor(connectionString: string, options?: EventHubClientOptions); constructor(connectionString: string, eventHubName: string, options?: EventHubClientOptions); - constructor(fullyQualifiedNamespace: string, eventHubName: string, credential: TokenCredential, options?: EventHubClientOptions); + constructor(fullyQualifiedNamespace: string, eventHubName: string, credential: TokenCredential | NamedKeyCredential | SASCredential, options?: EventHubClientOptions); close(): Promise; createBatch(options?: CreateBatchOptions): Promise; get eventHubName(): string; diff --git a/sdk/eventhub/event-hubs/src/connectionContext.ts b/sdk/eventhub/event-hubs/src/connectionContext.ts index 7b54c599f91c..d462724b53cb 100644 --- a/sdk/eventhub/event-hubs/src/connectionContext.ts +++ b/sdk/eventhub/event-hubs/src/connectionContext.ts @@ -7,21 +7,32 @@ import { logger, logErrorStackTrace } from "./log"; import { getRuntimeInfo } from "./util/runtimeInfo"; import { packageJsonInfo } from "./util/constants"; -import { parseEventHubConnectionString } from "./util/connectionStringUtils"; +import { + EventHubConnectionStringProperties, + parseEventHubConnectionString +} from "./util/connectionStringUtils"; import { EventHubReceiver } from "./eventHubReceiver"; import { EventHubSender } from "./eventHubSender"; import { ConnectionContextBase, Constants, CreateConnectionContextBaseParameters, - ConnectionConfig + ConnectionConfig, + SasTokenProvider, + createSasTokenProvider } from "@azure/core-amqp"; -import { TokenCredential, isTokenCredential } from "@azure/core-auth"; +import { + TokenCredential, + NamedKeyCredential, + SASCredential, + isNamedKeyCredential, + isSASCredential +} from "@azure/core-auth"; import { ManagementClient, ManagementClientOptions } from "./managementClient"; import { EventHubClientOptions } from "./models/public"; import { Connection, ConnectionEvents, Dictionary, EventContext, OnAmqpEvent } from "rhea-promise"; import { EventHubConnectionConfig } from "./eventhubConnectionConfig"; -import { SharedKeyCredential } from "./eventhubSharedKeyCredential"; +import { isCredential } from "./util/typeGuards"; /** * @internal @@ -36,9 +47,9 @@ export interface ConnectionContext extends ConnectionContextBase { readonly config: EventHubConnectionConfig; /** * The credential to be used for Authentication. - * Default value: SharedKeyCredentials. + * Default value: SasTokenProvider. */ - tokenCredential: SharedKeyCredential | TokenCredential; + tokenCredential: SasTokenProvider | TokenCredential; /** * Indicates whether the close() method was * called on theconnection object. @@ -150,7 +161,7 @@ export namespace ConnectionContext { export function create( config: EventHubConnectionConfig, - tokenCredential: SharedKeyCredential | TokenCredential, + tokenCredential: SasTokenProvider | TokenCredential, options?: ConnectionContextOptions ): ConnectionContext { if (!options) options = {}; @@ -436,15 +447,19 @@ export namespace ConnectionContext { export function createConnectionContext( hostOrConnectionString: string, eventHubNameOrOptions?: string | EventHubClientOptions, - credentialOrOptions?: TokenCredential | EventHubClientOptions, + credentialOrOptions?: + | TokenCredential + | NamedKeyCredential + | SASCredential + | EventHubClientOptions, options?: EventHubClientOptions ): ConnectionContext { let connectionString; let config; - let credential: TokenCredential | SharedKeyCredential; + let credential: TokenCredential | SasTokenProvider; hostOrConnectionString = String(hostOrConnectionString); - if (!isTokenCredential(credentialOrOptions)) { + if (!isCredential(credentialOrOptions)) { const parsedCS = parseEventHubConnectionString(hostOrConnectionString); if ( !( @@ -480,13 +495,21 @@ export function createConnectionContext( options = credentialOrOptions; } - // Since connectionstring was passed, create a SharedKeyCredential - credential = SharedKeyCredential.fromConnectionString(connectionString); + const parsed = parseEventHubConnectionString(connectionString) as Required< + | Pick + | Pick + >; + // Since connectionString was passed, create a TokenProvider. + credential = createSasTokenProvider(parsed); } else { // host, eventHubName, a TokenCredential and/or options were passed to constructor const eventHubName = eventHubNameOrOptions; let host = hostOrConnectionString; - credential = credentialOrOptions; + if (isNamedKeyCredential(credentialOrOptions) || isSASCredential(credentialOrOptions)) { + credential = createSasTokenProvider(credentialOrOptions); + } else { + credential = credentialOrOptions; + } if (!eventHubName) { throw new TypeError(`"eventHubName" is missing`); } diff --git a/sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts b/sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts index 175c220a3869..427ff5633e2f 100644 --- a/sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts +++ b/sdk/eventhub/event-hubs/src/eventHubConsumerClient.ts @@ -19,7 +19,7 @@ import { Subscription, SubscriptionEventHandlers } from "./eventHubConsumerClientModels"; -import { TokenCredential, isTokenCredential } from "@azure/core-auth"; +import { TokenCredential, NamedKeyCredential, SASCredential } from "@azure/core-auth"; import { EventHubProperties, PartitionProperties } from "./managementClient"; import { PartitionGate } from "./impl/partitionGate"; import { v4 as uuid } from "uuid"; @@ -28,6 +28,7 @@ import { LoadBalancingStrategy } from "./loadBalancerStrategies/loadBalancingStr import { UnbalancedLoadBalancingStrategy } from "./loadBalancerStrategies/unbalancedStrategy"; import { GreedyLoadBalancingStrategy } from "./loadBalancerStrategies/greedyStrategy"; import { BalancedLoadBalancingStrategy } from "./loadBalancerStrategies/balancedStrategy"; +import { isCredential } from "./util/typeGuards"; const defaultConsumerClientOptions: Required.servicebus.windows.net * @param eventHubName - The name of the specific Event Hub to connect the client to. * @param credential - An credential object used by the client to get the token to authenticate the connection - * with the Azure Event Hubs service. See @azure/identity for creating the credentials. + * with the Azure Event Hubs service. + * See @azure/identity for creating credentials that support AAD auth. + * Use the `AzureNamedKeyCredential` from @azure/core-auth if you want to pass in a `SharedAccessKeyName` + * and `SharedAccessKey` without using a connection string. These fields map to the `name` and `key` field respectively + * in `AzureNamedKeyCredential`. + * Use the `AzureSASCredential` from @azure/core-auth if you want to pass in a `SharedAccessSignature` + * without using a connection string. This field maps to `signature` in `AzureSASCredential`. * @param options - A set of options to apply when configuring the client. * - `retryOptions` : Configures the retry policy for all the operations on the client. * For example, `{ "maxRetries": 4 }` or `{ "maxRetries": 4, "retryDelayInMs": 30000 }`. @@ -206,7 +213,7 @@ export class EventHubConsumerClient { consumerGroup: string, fullyQualifiedNamespace: string, eventHubName: string, - credential: TokenCredential, + credential: TokenCredential | NamedKeyCredential | SASCredential, options?: EventHubConsumerClientOptions ); // #3 /** @@ -217,7 +224,13 @@ export class EventHubConsumerClient { * .servicebus.windows.net * @param eventHubName - The name of the specific Event Hub to connect the client to. * @param credential - An credential object used by the client to get the token to authenticate the connection - * with the Azure Event Hubs service. See @azure/identity for creating the credentials. + * with the Azure Event Hubs service. + * See @azure/identity for creating credentials that support AAD auth. + * Use the `AzureNamedKeyCredential` from @azure/core-auth if you want to pass in a `SharedAccessKeyName` + * and `SharedAccessKey` without using a connection string. These fields map to the `name` and `key` field respectively + * in `AzureNamedKeyCredential`. + * Use the `AzureSASCredential` from @azure/core-auth if you want to pass in a `SharedAccessSignature` + * without using a connection string. This field maps to `signature` in `AzureSASCredential`. * @param checkpointStore - A checkpoint store that is used by the client to read checkpoints to determine * the position from where it should resume receiving events when your application gets restarted. * It is also used by the client to load balance multiple instances of your application. @@ -231,7 +244,7 @@ export class EventHubConsumerClient { consumerGroup: string, fullyQualifiedNamespace: string, eventHubName: string, - credential: TokenCredential, + credential: TokenCredential | NamedKeyCredential | SASCredential, checkpointStore: CheckpointStore, options?: EventHubConsumerClientOptions ); // #3.1 @@ -245,11 +258,13 @@ export class EventHubConsumerClient { checkpointStoreOrCredentialOrOptions4?: | CheckpointStore | EventHubConsumerClientOptions - | TokenCredential, + | TokenCredential + | NamedKeyCredential + | SASCredential, checkpointStoreOrOptions5?: CheckpointStore | EventHubConsumerClientOptions, options6?: EventHubConsumerClientOptions ) { - if (isTokenCredential(checkpointStoreOrCredentialOrOptions4)) { + if (isCredential(checkpointStoreOrCredentialOrOptions4)) { // #3 or 3.1 logger.info("Creating EventHubConsumerClient with TokenCredential."); diff --git a/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts b/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts index 86e31b81f625..d1ff253ed3f9 100644 --- a/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts +++ b/sdk/eventhub/event-hubs/src/eventHubProducerClient.ts @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -import { isTokenCredential, TokenCredential } from "@azure/core-auth"; +import { NamedKeyCredential, SASCredential, TokenCredential } from "@azure/core-auth"; import { CanonicalCode, Link, Span, SpanContext, SpanKind } from "@opentelemetry/api"; import { ConnectionContext, createConnectionContext } from "./connectionContext"; import { instrumentEventData, TRACEPARENT_PROPERTY } from "./diagnostics/instrumentEventData"; @@ -20,7 +20,7 @@ import { SendBatchOptions } from "./models/public"; import { throwErrorIfConnectionClosed, throwTypeErrorIfParameterMissing } from "./util/error"; -import { isDefined } from "./util/typeGuards"; +import { isCredential, isDefined } from "./util/typeGuards"; import { OperationOptions } from "./util/operationOptions"; import { createEventHubSpan } from "./diagnostics/tracing"; @@ -101,7 +101,13 @@ export class EventHubProducerClient { * .servicebus.windows.net * @param eventHubName - The name of the specific Event Hub to connect the client to. * @param credential - An credential object used by the client to get the token to authenticate the connection - * with the Azure Event Hubs service. See @azure/identity for creating the credentials. + * with the Azure Event Hubs service. + * See @azure/identity for creating credentials that support AAD auth. + * Use the `AzureNamedKeyCredential` from @azure/core-auth if you want to pass in a `SharedAccessKeyName` + * and `SharedAccessKey` without using a connection string. These fields map to the `name` and `key` field respectively + * in `AzureNamedKeyCredential`. + * Use the `AzureSASCredential` from @azure/core-auth if you want to pass in a `SharedAccessSignature` + * without using a connection string. This field maps to `signature` in `AzureSASCredential`. * @param options - A set of options to apply when configuring the client. * - `retryOptions` : Configures the retry policy for all the operations on the client. * For example, `{ "maxRetries": 4 }` or `{ "maxRetries": 4, "retryDelayInMs": 30000 }`. @@ -111,13 +117,17 @@ export class EventHubProducerClient { constructor( fullyQualifiedNamespace: string, eventHubName: string, - credential: TokenCredential, + credential: TokenCredential | NamedKeyCredential | SASCredential, options?: EventHubClientOptions // eslint-disable-line @azure/azure-sdk/ts-naming-options ); constructor( fullyQualifiedNamespaceOrConnectionString1: string, eventHubNameOrOptions2?: string | EventHubClientOptions, - credentialOrOptions3?: TokenCredential | EventHubClientOptions, + credentialOrOptions3?: + | TokenCredential + | NamedKeyCredential + | SASCredential + | EventHubClientOptions, options4?: EventHubClientOptions // eslint-disable-line @azure/azure-sdk/ts-naming-options ) { this._context = createConnectionContext( @@ -128,7 +138,7 @@ export class EventHubProducerClient { ); if (typeof eventHubNameOrOptions2 !== "string") { this._clientOptions = eventHubNameOrOptions2 || {}; - } else if (!isTokenCredential(credentialOrOptions3)) { + } else if (!isCredential(credentialOrOptions3)) { this._clientOptions = credentialOrOptions3 || {}; } else { this._clientOptions = options4 || {}; diff --git a/sdk/eventhub/event-hubs/src/eventhubSharedKeyCredential.ts b/sdk/eventhub/event-hubs/src/eventhubSharedKeyCredential.ts deleted file mode 100644 index bb04cc8df6ef..000000000000 --- a/sdk/eventhub/event-hubs/src/eventhubSharedKeyCredential.ts +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT license. - -import { parseEventHubConnectionString } from "./util/connectionStringUtils"; -import { AccessToken } from "@azure/core-auth"; -import { Buffer } from "buffer"; -import isBuffer from "is-buffer"; -import jssha from "jssha"; - -/** - * Defines the SharedKeyCredential . - */ -export class SharedKeyCredential { - /** - * The name of the EventHub/ServiceBus key. - */ - keyName: string; - - /** - * The secret value associated with the above EventHub/ServiceBus key. - */ - key: string; - - /** - * Initializes a new instance of SharedKeyCredential - * @param keyName - The name of the EventHub/ServiceBus key. - * @param key - The secret value associated with the above EventHub/ServiceBus key - */ - constructor(keyName: string, key: string) { - this.keyName = keyName; - this.key = key; - } - - /** - * Gets the sas token for the specified audience - * @param audience - The audience for which the token is desired. - */ - getToken(audience: string): AccessToken { - return this._createToken(Math.floor(Date.now() / 1000) + 3600, audience); - } - - /** - * Creates the sas token based on the provided information - * @param expiry - The time period in unix time after which the token will expire. - * @param audience - The audience for which the token is desired. - * @param hashInput - The input to be provided to hmac to create the hash. - */ - protected _createToken( - expiry: number, - audience: string, - hashInput?: string | Buffer - ): AccessToken { - audience = encodeURIComponent(audience); - const keyName = encodeURIComponent(this.keyName); - const stringToSign = audience + "\n" + expiry; - hashInput = hashInput || this.key; - let shaObj: any; - if (isBuffer(hashInput)) { - shaObj = new jssha("SHA-256", "ARRAYBUFFER"); - shaObj.setHMACKey(hashInput, "ARRAYBUFFER"); - shaObj.update(Buffer.from(stringToSign)); - } else { - shaObj = new jssha("SHA-256", "TEXT"); - shaObj.setHMACKey(hashInput, "TEXT"); - shaObj.update(stringToSign); - } - const sig = encodeURIComponent(shaObj.getHMAC("B64")); - return { - token: `SharedAccessSignature sr=${audience}&sig=${sig}&se=${expiry}&skn=${keyName}`, - expiresOnTimestamp: expiry - }; - } - - /** - * Creates a token provider from the EventHub/ServiceBus connection string; - * @param connectionString - The EventHub/ServiceBus connection string - */ - static fromConnectionString(connectionString: string): SharedKeyCredential { - const parsed = parseEventHubConnectionString(connectionString); - - if (parsed.sharedAccessSignature == null) { - return new SharedKeyCredential(parsed.sharedAccessKeyName!, parsed.sharedAccessKey!); - } else { - return new SharedAccessSignatureCredential(parsed.sharedAccessSignature); - } - } -} - -/** - * A credential that takes a SharedAccessSignature: - * `SharedAccessSignature sr=&sig=&se=&skn=` - * - * @internal - */ -export class SharedAccessSignatureCredential extends SharedKeyCredential { - private _accessToken: AccessToken; - - /** - * @param sharedAccessSignature - A shared access signature of the form - * `SharedAccessSignature sr=&sig=&se=&skn=` - */ - constructor(sharedAccessSignature: string) { - super("", ""); - - this._accessToken = { - token: sharedAccessSignature, - expiresOnTimestamp: 0 - }; - } - - /** - * Retrieve a valid token for authenticaton. - * - * @param _audience - Not applicable in SharedAccessSignatureCredential as the token is not re-generated at every invocation of the method - */ - getToken(_audience: string): AccessToken { - return this._accessToken; - } -} diff --git a/sdk/eventhub/event-hubs/src/linkEntity.ts b/sdk/eventhub/event-hubs/src/linkEntity.ts index a631d8a3dac0..c2ed9233f2e8 100644 --- a/sdk/eventhub/event-hubs/src/linkEntity.ts +++ b/sdk/eventhub/event-hubs/src/linkEntity.ts @@ -2,12 +2,11 @@ // Licensed under the MIT license. import { v4 as uuid } from "uuid"; -import { Constants, TokenType, defaultLock } from "@azure/core-amqp"; +import { Constants, TokenType, defaultLock, isSasTokenProvider } from "@azure/core-amqp"; import { AccessToken } from "@azure/core-auth"; import { ConnectionContext } from "./connectionContext"; import { AwaitableSender, Receiver } from "rhea-promise"; import { logger } from "./log"; -import { SharedKeyCredential } from "../src/eventhubSharedKeyCredential"; /** * @hidden @@ -132,16 +131,12 @@ export class LinkEntity { }); let tokenObject: AccessToken; let tokenType: TokenType; - if (this._context.tokenCredential instanceof SharedKeyCredential) { + if (isSasTokenProvider(this._context.tokenCredential)) { tokenObject = this._context.tokenCredential.getToken(this.audience); tokenType = TokenType.CbsTokenTypeSas; - // expiresOnTimestamp can be 0 if the token is not meant to be renewed - // (ie, SharedAccessSignatureCredential) - if (tokenObject.expiresOnTimestamp > 0) { - // renew sas token in every 45 minutess - this._tokenTimeoutInMs = (3600 - 900) * 1000; - } + // renew sas token in every 45 minutess + this._tokenTimeoutInMs = (3600 - 900) * 1000; } else { const aadToken = await this._context.tokenCredential.getToken(Constants.aadEventHubsScope); if (!aadToken) { diff --git a/sdk/eventhub/event-hubs/src/managementClient.ts b/sdk/eventhub/event-hubs/src/managementClient.ts index 2b7d23d5d0ae..247443937b9b 100644 --- a/sdk/eventhub/event-hubs/src/managementClient.ts +++ b/sdk/eventhub/event-hubs/src/managementClient.ts @@ -10,6 +10,7 @@ import { RetryOptions, SendRequestOptions, defaultLock, + isSasTokenProvider, retry, translate } from "@azure/core-amqp"; @@ -31,7 +32,6 @@ import { AbortSignalLike } from "@azure/abort-controller"; import { throwErrorIfConnectionClosed, throwTypeErrorIfParameterMissing } from "./util/error"; import { CanonicalCode } from "@opentelemetry/api"; import { OperationOptions } from "./util/operationOptions"; -import { SharedKeyCredential } from "../src/eventhubSharedKeyCredential"; import { createEventHubSpan } from "./diagnostics/tracing"; import { waitForTimeoutOrAbortOrResolve } from "./util/timeoutAbortSignalUtils"; @@ -138,7 +138,7 @@ export class ManagementClient extends LinkEntity { * @internal */ async getSecurityToken(): Promise { - if (this._context.tokenCredential instanceof SharedKeyCredential) { + if (isSasTokenProvider(this._context.tokenCredential)) { // the security_token has the $management address removed from the end of the audience // expected audience: sb://fully.qualified.namespace/event-hub-name/$management const audienceParts = this.audience.split("/"); diff --git a/sdk/eventhub/event-hubs/src/util/typeGuards.ts b/sdk/eventhub/event-hubs/src/util/typeGuards.ts index 10b4b0985f1d..f475bfedeadb 100644 --- a/sdk/eventhub/event-hubs/src/util/typeGuards.ts +++ b/sdk/eventhub/event-hubs/src/util/typeGuards.ts @@ -1,6 +1,15 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. +import { + isNamedKeyCredential, + isSASCredential, + isTokenCredential, + NamedKeyCredential, + SASCredential, + TokenCredential +} from "@azure/core-auth"; + /** * Helper TypeGuard that checks if something is defined or not. * @param thing - Anything @@ -45,3 +54,14 @@ export function objectHasProperty { return typeof thing === "object" && property in (thing as Record); } + +/** + * Typeguard that checks if the input is a credential type the clients accept. + * @param thing - Any object. + * @internal + */ +export function isCredential( + thing: unknown +): thing is TokenCredential | NamedKeyCredential | SASCredential { + return isTokenCredential(thing) || isNamedKeyCredential(thing) || isSASCredential(thing); +} diff --git a/sdk/eventhub/event-hubs/test/internal/auth.spec.ts b/sdk/eventhub/event-hubs/test/internal/auth.spec.ts index e1c3f9cf9d03..a72125e1133b 100644 --- a/sdk/eventhub/event-hubs/test/internal/auth.spec.ts +++ b/sdk/eventhub/event-hubs/test/internal/auth.spec.ts @@ -2,24 +2,30 @@ // Licensed under the MIT license. import { + EventHubConnectionStringProperties, EventHubConsumerClient, EventHubProducerClient, parseEventHubConnectionString } from "../../src"; import { EnvVarKeys, getEnvVars } from "../public/utils/testUtils"; import chai from "chai"; -import { SharedKeyCredential } from "../../src/eventhubSharedKeyCredential"; +import { AzureNamedKeyCredential, AzureSASCredential } from "@azure/core-auth"; +import { createSasTokenProvider } from "@azure/core-amqp"; const should = chai.should(); const env = getEnvVars(); -describe("Authentication via SAS", () => { +describe("Authentication via", () => { + const { + endpoint, + fullyQualifiedNamespace, + sharedAccessKey, + sharedAccessKeyName + } = parseEventHubConnectionString(env[EnvVarKeys.EVENTHUB_CONNECTION_STRING]); const service = { connectionString: env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], path: env[EnvVarKeys.EVENTHUB_NAME], - endpoint: parseEventHubConnectionString( - env[EnvVarKeys.EVENTHUB_CONNECTION_STRING] - ).endpoint.replace(/\/+$/, "") + endpoint: endpoint.replace(/\/+$/, "") }; before(() => { @@ -33,37 +39,145 @@ describe("Authentication via SAS", () => { ); }); - it("EventHubConsumerClient", async () => { - const sasConnectionString = getSasConnectionString(); + describe("Keys", () => { + describe("using connection string", () => { + it("EventHubConsumerClient", async () => { + const consumerClient = new EventHubConsumerClient( + "$Default", + service.connectionString, + service.path + ); - const consumerClient = new EventHubConsumerClient( - "$Default", - sasConnectionString, - service.path - ); + const properties = await consumerClient.getEventHubProperties(); + should.exist(properties); - const properties = await consumerClient.getEventHubProperties(); - should.exist(properties); + await consumerClient.close(); + }); - await consumerClient.close(); - }); + it("EventHubProducerClient", async () => { + const producerClient = new EventHubProducerClient(service.connectionString, service.path); + + const properties = await producerClient.getEventHubProperties(); + should.exist(properties); + + await producerClient.close(); + }); + }); + + describe("using NamedKeyCredential", () => { + it("EventHubConsumerClient", async () => { + const namedKeyCredential = new AzureNamedKeyCredential( + sharedAccessKeyName!, + sharedAccessKey! + ); + + const consumerClient = new EventHubConsumerClient( + "$Default", + fullyQualifiedNamespace, + service.path, + namedKeyCredential + ); + + const properties = await consumerClient.getEventHubProperties(); + should.exist(properties); - it("EventHubProducerClient", async () => { - const sasConnectionString = getSasConnectionString(); + await consumerClient.close(); + }); - const producerClient = new EventHubProducerClient(sasConnectionString, service.path); + it("EventHubProducerClient", async () => { + const namedKeyCredential = new AzureNamedKeyCredential( + sharedAccessKeyName!, + sharedAccessKey! + ); - const properties = await producerClient.getEventHubProperties(); - should.exist(properties); + const producerClient = new EventHubProducerClient( + fullyQualifiedNamespace, + service.path, + namedKeyCredential + ); - await producerClient.close(); + const properties = await producerClient.getEventHubProperties(); + should.exist(properties); + + await producerClient.close(); + }); + }); }); - function getSasConnectionString(): string { - const sas = SharedKeyCredential.fromConnectionString(service.connectionString).getToken( - `${service.endpoint}/${service.path}` - ).token; + describe("SAS", () => { + function getSas(): string { + const parsed = parseEventHubConnectionString(service.connectionString) as Required< + | Pick + | Pick + >; + return createSasTokenProvider(parsed).getToken(`${service.endpoint}/${service.path}`).token; + } + + describe("using connection string", () => { + function getSasConnectionString(): string { + const sas = getSas(); + + return `Endpoint=${service.endpoint}/;SharedAccessSignature=${sas}`; + } + + it("EventHubConsumerClient", async () => { + const sasConnectionString = getSasConnectionString(); + + const consumerClient = new EventHubConsumerClient( + "$Default", + sasConnectionString, + service.path + ); + + const properties = await consumerClient.getEventHubProperties(); + should.exist(properties); - return `Endpoint=${service.endpoint}/;SharedAccessSignature=${sas}`; - } + await consumerClient.close(); + }); + + it("EventHubProducerClient", async () => { + const sasConnectionString = getSasConnectionString(); + + const producerClient = new EventHubProducerClient(sasConnectionString, service.path); + + const properties = await producerClient.getEventHubProperties(); + should.exist(properties); + + await producerClient.close(); + }); + }); + + describe("using SASCredential", () => { + it("EventHubConsumerClient", async () => { + const sasCredential = new AzureSASCredential(getSas()); + + const consumerClient = new EventHubConsumerClient( + "$Default", + fullyQualifiedNamespace, + service.path, + sasCredential + ); + + const properties = await consumerClient.getEventHubProperties(); + should.exist(properties); + + await consumerClient.close(); + }); + + it("EventHubProducerClient", async () => { + const sasCredential = new AzureSASCredential(getSas()); + + const producerClient = new EventHubProducerClient( + fullyQualifiedNamespace, + service.path, + sasCredential + ); + + const properties = await producerClient.getEventHubProperties(); + should.exist(properties); + + await producerClient.close(); + }); + }); + }); }); diff --git a/sdk/eventhub/event-hubs/test/internal/sharedKeyCredential.spec.ts b/sdk/eventhub/event-hubs/test/internal/sharedKeyCredential.spec.ts deleted file mode 100644 index b1bf9d7efbd6..000000000000 --- a/sdk/eventhub/event-hubs/test/internal/sharedKeyCredential.spec.ts +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT license. - -import chai from "chai"; -const should = chai.should(); -import { - SharedKeyCredential, - SharedAccessSignatureCredential -} from "../../src/eventhubSharedKeyCredential"; - -describe("SharedKeyCredential", function(): void { - it("should work as expected with required parameters", async function(): Promise { - const keyName = "myKeyName"; - const key = "importantValue"; - const tokenProvider = new SharedKeyCredential(keyName, key); - const now = Math.floor(Date.now() / 1000) + 3600; - const tokenInfo = tokenProvider.getToken("myaudience"); - tokenInfo.token.should.match( - /SharedAccessSignature sr=myaudience&sig=(.*)&se=\d{10}&skn=myKeyName/g - ); - tokenInfo.expiresOnTimestamp.should.equal(now); - }); - it("should work as expected when created from a connection string", async function(): Promise< - void - > { - const cs = - "Endpoint=sb://hostname.servicebus.windows.net/;SharedAccessKeyName=sakName;SharedAccessKey=sak;EntityPath=ep"; - const tokenProvider = SharedKeyCredential.fromConnectionString(cs); - const now = Math.floor(Date.now() / 1000) + 3600; - const tokenInfo = tokenProvider.getToken("sb://hostname.servicebus.windows.net/"); - tokenInfo.token.should.match( - /SharedAccessSignature sr=sb%3A%2F%2Fhostname.servicebus.windows.net%2F&sig=(.*)&se=\d{10}&skn=sakName/g - ); - tokenInfo.expiresOnTimestamp.should.equal(now); - }); - it("SharedAccessSignatureCredential", () => { - const sasCred = new SharedAccessSignatureCredential("SharedAccessSignature se="); - const accessToken = sasCred.getToken("audience isn't used"); - - should.equal( - accessToken.token, - "SharedAccessSignature se=", - "SAS URI we were constructed with should just be returned verbatim without interpretation (and the audience is ignored)" - ); - - should.equal( - accessToken.expiresOnTimestamp, - 0, - "SAS URI always returns 0 for expiry (ignoring what's in the SAS token)" - ); - - // these just exist because we're a SharedKeyCredential but we don't currently - // parse any attributes out (they're available but we've carved out a spot so - // they're not needed.) - should.equal(sasCred.key, ""); - should.equal(sasCred.keyName, ""); - }); -}); diff --git a/sdk/eventhub/event-hubs/test/public/auth.spec.ts b/sdk/eventhub/event-hubs/test/public/auth.spec.ts new file mode 100644 index 000000000000..dd8fce06f393 --- /dev/null +++ b/sdk/eventhub/event-hubs/test/public/auth.spec.ts @@ -0,0 +1,380 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { + EventHubConsumerClient, + EventHubProducerClient, + parseEventHubConnectionString +} from "../../src/index"; +import { EnvVarKeys, getEnvVars } from "./utils/testUtils"; +import chai from "chai"; +import { AzureNamedKeyCredential, AzureSASCredential } from "@azure/core-auth"; +import { createSasTokenProvider } from "@azure/core-amqp"; +import { SinonFakeTimers, useFakeTimers } from "sinon"; + +const should = chai.should(); +const env = getEnvVars(); + +const TEST_FAILURE = "test failure"; + +describe("Authentication via", () => { + const { + endpoint, + fullyQualifiedNamespace, + sharedAccessKey, + sharedAccessKeyName + } = parseEventHubConnectionString(env[EnvVarKeys.EVENTHUB_CONNECTION_STRING]); + const service = { + connectionString: env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], + path: env[EnvVarKeys.EVENTHUB_NAME], + endpoint: endpoint.replace(/\/+$/, "") + }; + + before(() => { + should.exist( + env[EnvVarKeys.EVENTHUB_CONNECTION_STRING], + "define EVENTHUB_CONNECTION_STRING in your environment before running integration tests." + ); + should.exist( + env[EnvVarKeys.EVENTHUB_NAME], + "define EVENTHUB_NAME in your environment before running integration tests." + ); + }); + + let clock: SinonFakeTimers; + beforeEach("setup new space-time continuum", () => { + clock = useFakeTimers({ + now: new Date(), + shouldAdvanceTime: true + }); + }); + + afterEach("returning back to current space-time variant", () => { + clock.restore(); + }); + + describe("AzureNamedKeyCredential", () => { + describe("supports key rotation", () => { + it("EventHubConsumerClient $management calls", async () => { + const namedKeyCredential = new AzureNamedKeyCredential( + sharedAccessKeyName!, + sharedAccessKey! + ); + + const consumerClient = new EventHubConsumerClient( + "$Default", + fullyQualifiedNamespace, + service.path, + namedKeyCredential + ); + + const properties = await consumerClient.getEventHubProperties(); + should.exist(properties); + + // Rotate credential to invalid value. + namedKeyCredential.update("foo", "bar"); + try { + await consumerClient.getEventHubProperties(); + throw new Error(TEST_FAILURE); + } catch (err) { + should.equal(err.code, "UnauthorizedError"); + } + + // Rotate credential to valid value. + namedKeyCredential.update(sharedAccessKeyName!, sharedAccessKey!); + await consumerClient.getEventHubProperties(); + should.exist(properties); + + return consumerClient.close(); + }); + + it("EventHubConsumerClient receive calls", async () => { + const namedKeyCredential = new AzureNamedKeyCredential( + sharedAccessKeyName!, + sharedAccessKey! + ); + + const consumerClient = new EventHubConsumerClient( + "$Default", + fullyQualifiedNamespace, + service.path, + namedKeyCredential, + { + retryOptions: { + maxRetries: 0 + } + } + ); + + await new Promise((resolve, reject) => { + // My attempt at defining the order of operations I expect to see. + const steps: Array<(...args: any[]) => void> = [ + // 1: wait for a `processEvents` to be called, then rotate the credentials to an invalid value and fast forward the clock! + (events: []) => { + if (!Array.isArray(events)) { + reject(new Error("Step 1 failed. Expected to see a list of events.")); + } + // Rotate credentials to invalid values and fast forward past the token refresh. + namedKeyCredential.update("foo", "bar"); + clock.tick(1000 * 60 * 45); + }, + // 2: observe another `processEvents` call. We should see this because the maxWaitTimeInSeconds is set to 5 seconds, and we fast forwarded the clock 45 minutes. + (events: []) => { + if (!Array.isArray(events)) { + reject(new Error("Step 2 failed. Expected to see a list of events.")); + } + }, + // 3: Since the token renewal has occurred, we should start seeing `UnauthorizedError` being thrown from our `processError` handler. + // Rotate the credentials back to valid values. + (err: any) => { + if (err.code !== "UnauthorizedError") { + reject( + new Error(`Step 3 failed. Expected ${err.code} to equal "UnauthorizedError".`) + ); + } + // Rotate the credentials back to valid values. + namedKeyCredential.update(sharedAccessKeyName!, sharedAccessKey!); + }, + // 4: observe another `processEvents` call. + // If the credentials were still invalid, we'd expect to see `processError` thrown instead. + (events: []) => { + if (!Array.isArray(events)) { + reject(new Error("Step 4 failed. Expected to see a list of events.")); + } + resolve(); + } + ]; + + consumerClient.subscribe( + "0", + { + async processError(err) { + const step = steps.shift(); + if (step) step(err); + }, + async processEvents(events) { + const step = steps.shift(); + if (step) step(events); + } + }, + { + maxWaitTimeInSeconds: 5 + } + ); + }); + + return consumerClient.close(); + }); + + it("EventHubProducerClient send calls", async () => { + const namedKeyCredential = new AzureNamedKeyCredential( + sharedAccessKeyName!, + sharedAccessKey! + ); + + const producerClient = new EventHubProducerClient( + fullyQualifiedNamespace, + service.path, + namedKeyCredential, + { + retryOptions: { + maxRetries: 0 + } + } + ); + + // The 1st sendBatch is called with valid credentials, so it should succeed. + await producerClient.sendBatch([{ body: "test" }]); + + // Rotate credential to invalid value. + namedKeyCredential.update("foo", "bar"); + // Fast forward through time to after the token refresh. + clock.tick(1000 * 60 * 45); + + try { + // This sendBatch should fail because we've updated the credential to invalid values and allowed the cbs link to refresh. + await producerClient.sendBatch([{ body: "I don't have access." }]); + throw new Error(TEST_FAILURE); + } catch (err) { + should.equal(err.code, "UnauthorizedError"); + } + + // Rotate credential to valid value. + namedKeyCredential.update(sharedAccessKeyName!, sharedAccessKey!); + + // This last sendBatch should succeed because we've updated our credentials again. + // Notice that we didn't have to fast forward through time to move past a token refresh! + await producerClient.sendBatch([{ body: "test2" }]); + + return producerClient.close(); + }); + }); + }); + + describe("AzureSASCredential", () => { + function getSas(): string { + return createSasTokenProvider({ + sharedAccessKeyName: sharedAccessKeyName!, + sharedAccessKey: sharedAccessKey! + }).getToken(`${service.endpoint}/${service.path}`).token; + } + + describe("supports key rotation", () => { + it("EventHubConsumerClient $management calls", async () => { + const sasCredential = new AzureSASCredential(getSas()); + + const consumerClient = new EventHubConsumerClient( + "$Default", + fullyQualifiedNamespace, + service.path, + sasCredential, + { + retryOptions: { + maxRetries: 0 + } + } + ); + + const properties = await consumerClient.getEventHubProperties(); + should.exist(properties); + + // Rotate credential to invalid value. + sasCredential.update( + `SharedAccessSignature sr=fake&sig=foo&se=${Date.now() / 1000}&skn=FakeKey` + ); + try { + await consumerClient.getEventHubProperties(); + throw new Error(TEST_FAILURE); + } catch (err) { + should.equal(err.code, "UnauthorizedError"); + } + + // Rotate credential to valid value. + sasCredential.update(getSas()); + await consumerClient.getEventHubProperties(); + should.exist(properties); + + return consumerClient.close(); + }); + + it("EventHubConsumerClient receive calls", async () => { + const sasCredential = new AzureSASCredential(getSas()); + + const consumerClient = new EventHubConsumerClient( + "$Default", + fullyQualifiedNamespace, + service.path, + sasCredential, + { + retryOptions: { + maxRetries: 0 + } + } + ); + + await new Promise((resolve, reject) => { + // My attempt at defining the order of operations I expect to see. + const steps: Array<(...args: any[]) => void> = [ + // 1: wait for a `processEvents` to be called, then rotate the credentials to an invalid value and fast forward the clock! + (events: []) => { + if (!Array.isArray(events)) { + reject(new Error("Step 1 failed. Expected to see a list of events.")); + } + // Rotate credentials to invalid values and fast forward past the token refresh. + sasCredential.update( + `SharedAccessSignature sr=fake&sig=foo&se=${Date.now() / 1000}&skn=FakeKey` + ); + clock.tick(1000 * 60 * 45); + }, + // 2: observe another `processEvents` call. We should see this because the maxWaitTimeInSeconds is set to 5 seconds, and we fast forwarded the clock 45 minutes. + (events: []) => { + if (!Array.isArray(events)) { + reject(new Error("Step 2 failed. Expected to see a list of events.")); + } + }, + // 3: Since the token renewal has occurred, we should start seeing `UnauthorizedError` being thrown from our `processError` handler. + // Rotate the credentials back to valid values. + (err: any) => { + if (err.code !== "UnauthorizedError") { + reject( + new Error(`Step 3 failed. Expected ${err.code} to equal "UnauthorizedError".`) + ); + } + // Rotate the credentials back to valid values. + sasCredential.update(getSas()); + }, + // 4: observe another `processEvents` call. + // If the credentials were still invalid, we'd expect to see `processError` thrown instead. + (events: []) => { + if (!Array.isArray(events)) { + reject(new Error("Step 4 failed. Expected to see a list of events.")); + } + resolve(); + } + ]; + + consumerClient.subscribe( + "0", + { + async processError(err) { + const step = steps.shift(); + if (step) step(err); + }, + async processEvents(events) { + const step = steps.shift(); + if (step) step(events); + } + }, + { + maxWaitTimeInSeconds: 5 + } + ); + }); + + return consumerClient.close(); + }); + + it("EventHubProducerClient send calls", async () => { + const sasCredential = new AzureSASCredential(getSas()); + + const producerClient = new EventHubProducerClient( + fullyQualifiedNamespace, + service.path, + sasCredential, + { + retryOptions: { + maxRetries: 0 + } + } + ); + + // The 1st sendBatch is called with valid credentials, so it should succeed. + await producerClient.sendBatch([{ body: "test" }]); + + // Rotate credential to invalid value. + sasCredential.update( + `SharedAccessSignature sr=fake&sig=foo&se=${Date.now() / 1000}&skn=FakeKey` + ); + // Fast forward through time to after the token refresh. + clock.tick(1000 * 60 * 45); + + try { + // This sendBatch should fail because we've updated the credential to invalid values and allowed the cbs link to refresh. + await producerClient.sendBatch([{ body: "I don't have access." }]); + throw new Error(TEST_FAILURE); + } catch (err) { + should.equal(err.code, "UnauthorizedError"); + } + + // Rotate credential to valid value. + sasCredential.update(getSas()); + + // This last sendBatch should succeed because we've updated our credentials again. + // Notice that we didn't have to fast forward through time to move past a token refresh! + await producerClient.sendBatch([{ body: "test2" }]); + + return producerClient.close(); + }); + }); + }); +});