diff --git a/sdk/servicebus/service-bus/README.md b/sdk/servicebus/service-bus/README.md index d4b62b77773a..e6058ae0b784 100644 --- a/sdk/servicebus/service-bus/README.md +++ b/sdk/servicebus/service-bus/README.md @@ -102,7 +102,7 @@ using the [createSender][sbclient_createsender] method. This gives you a sender which you can use to [send][sender_send] messages. ```javascript -const sender = serviceBusClient.createSender("my-queue"); +const sender = await serviceBusClient.createSender("my-queue"); // sending a single message await sender.send({ @@ -189,7 +189,7 @@ When sending the message, set the `sessionId` property in the message to ensure your message lands in the right session. ```javascript -const sender = serviceBusClient.createSender("my-session-queue"); +const sender = await serviceBusClient.createSender("my-session-queue"); await sender.send({ body: "my-message-body", sessionId: "my-session" @@ -216,7 +216,7 @@ There are two ways of choosing which session to open: 1. Specify a `sessionId`, which locks a named session. ```javascript - const receiver = serviceBusClient.createSessionReceiver("my-session-queue", "peekLock", { + const receiver = await serviceBusClient.createSessionReceiver("my-session-queue", "peekLock", { sessionId: "my-session" }); ``` @@ -225,7 +225,7 @@ There are two ways of choosing which session to open: that is not already locked. ```javascript - const receiver = serviceBusClient.createSessionReceiver("my-session-queue", "peekLock"); + const receiver = await serviceBusClient.createSessionReceiver("my-session-queue", "peekLock"); ``` You can find the name of the session via the `sessionId` property on the `SessionReceiver`. diff --git a/sdk/servicebus/service-bus/migrationguide.md b/sdk/servicebus/service-bus/migrationguide.md index 678d1107f123..dcdcd7ff768c 100644 --- a/sdk/servicebus/service-bus/migrationguide.md +++ b/sdk/servicebus/service-bus/migrationguide.md @@ -1,4 +1,4 @@ -# Guide to migrate from @azure/service-bus v1 to v7.preview.1 +# Guide to migrate from @azure/service-bus v1 to v7.preview.2 This document is intended for users that would like to try out preview 7 for @azure/service-bus. As the package is in preview, these details might @@ -108,4 +108,11 @@ brings this package in line with the [Azure SDK Design Guidelines for Typescript ruleManager.removeRule(); ``` +* createSender() and createSessionReceiver() are now async methods and initialize the connection + +Prior to v7 `createSender()` and `createSessionReceiver()` worked using lazy-initialization, where the +AMQP connection would only be initialized on first send or receiving of a message. + +The connection and link are now initialized after calling either method. + ![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-js%2Fsdk%2Fservicebus%2Fservice-bus%2FREADME.png) diff --git a/sdk/servicebus/service-bus/review/service-bus.api.md b/sdk/servicebus/service-bus/review/service-bus.api.md index 8fc6e8040c78..3e93fbc47eb8 100644 --- a/sdk/servicebus/service-bus/review/service-bus.api.md +++ b/sdk/servicebus/service-bus/review/service-bus.api.md @@ -41,6 +41,11 @@ export interface CreateBatchOptions extends OperationOptions { maxSizeInBytes?: number; } +// @public +export interface CreateSenderOptions { + abortSignal?: AbortSignalLike; +} + // @public export interface CreateSessionReceiverOptions extends SessionReceiverOptions, OperationOptions { } @@ -162,7 +167,7 @@ export class ServiceBusClient { createReceiver(queueName: string, receiveMode: "receiveAndDelete"): Receiver; createReceiver(topicName: string, subscriptionName: string, receiveMode: "peekLock"): Receiver; createReceiver(topicName: string, subscriptionName: string, receiveMode: "receiveAndDelete"): Receiver; - createSender(queueOrTopicName: string): Sender; + createSender(queueOrTopicName: string, options?: CreateSenderOptions): Promise; createSessionReceiver(queueName: string, receiveMode: "peekLock", options?: CreateSessionReceiverOptions): Promise>; createSessionReceiver(queueName: string, receiveMode: "receiveAndDelete", options?: CreateSessionReceiverOptions): Promise>; createSessionReceiver(topicName: string, subscriptionName: string, receiveMode: "peekLock", options?: CreateSessionReceiverOptions): Promise>; diff --git a/sdk/servicebus/service-bus/samples/javascript/advanced/deferral.js b/sdk/servicebus/service-bus/samples/javascript/advanced/deferral.js index b4a4b5c58da5..7056f1d3090e 100644 --- a/sdk/servicebus/service-bus/samples/javascript/advanced/deferral.js +++ b/sdk/servicebus/service-bus/samples/javascript/advanced/deferral.js @@ -20,8 +20,7 @@ const { ServiceBusClient, delay } = require("@azure/service-bus"); require("dotenv").config(); // Define connection string and related Service Bus entity names here -const connectionString = - process.env.SERVICE_BUS_CONNECTION_STRING || ""; +const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || ""; const queueName = process.env.QUEUE_NAME || ""; async function main() { @@ -33,7 +32,7 @@ async function main() { async function sendMessages() { const sbClient = new ServiceBusClient(connectionString); // createSender() can also be used to create a sender for a topic. - const sender = sbClient.createSender(queueName); + const sender = await sbClient.createSender(queueName); const data = [ { step: 1, title: "Shop" }, @@ -76,7 +75,7 @@ async function receiveMessage() { const deferredSteps = new Map(); let lastProcessedRecipeStep = 0; try { - const processMessage = async brokeredMessage => { + const processMessage = async (brokeredMessage) => { if ( brokeredMessage.label === "RecipeStep" && brokeredMessage.contentType === "application/json" @@ -104,7 +103,7 @@ async function receiveMessage() { await brokeredMessage.deadLetter(); } }; - const processError = async err => { + const processError = async (err) => { console.log(">>>>> Error occurred: ", err); }; receiver.subscribe( @@ -138,6 +137,6 @@ async function receiveMessage() { } } -main().catch(err => { +main().catch((err) => { console.log("Error occurred: ", err); }); diff --git a/sdk/servicebus/service-bus/samples/javascript/advanced/movingMessagesToDLQ.js b/sdk/servicebus/service-bus/samples/javascript/advanced/movingMessagesToDLQ.js index ca138fe02b17..7d91bba3b092 100644 --- a/sdk/servicebus/service-bus/samples/javascript/advanced/movingMessagesToDLQ.js +++ b/sdk/servicebus/service-bus/samples/javascript/advanced/movingMessagesToDLQ.js @@ -35,7 +35,7 @@ async function main() { async function sendMessage() { // createSender() can also be used to create a sender for a topic. - const sender = sbClient.createSender(queueName); + const sender = await sbClient.createSender(queueName); const message = { body: { diff --git a/sdk/servicebus/service-bus/samples/javascript/advanced/processMessageFromDLQ.js b/sdk/servicebus/service-bus/samples/javascript/advanced/processMessageFromDLQ.js index 181d52f1e658..270680887d82 100644 --- a/sdk/servicebus/service-bus/samples/javascript/advanced/processMessageFromDLQ.js +++ b/sdk/servicebus/service-bus/samples/javascript/advanced/processMessageFromDLQ.js @@ -18,8 +18,7 @@ const { ServiceBusClient } = require("@azure/service-bus"); require("dotenv").config(); // Define connection string and related Service Bus entity names here -const connectionString = - process.env.SERVICE_BUS_CONNECTION_STRING || ""; +const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || ""; const queueName = process.env.QUEUE_NAME || ""; const sbClient = new ServiceBusClient(connectionString); @@ -56,20 +55,17 @@ async function processDeadletterMessageQueue() { // Send repaired message back to the current queue / topic async function fixAndResendMessage(oldMessage) { // createSender() can also be used to create a sender for a topic. - const sender = sbClient.createSender(queueName); + const sender = await sbClient.createSender(queueName); // Inspect given message and make any changes if necessary const repairedMessage = { ...oldMessage }; - console.log( - ">>>>> Cloning the message from DLQ and resending it - ", - oldMessage.body - ); + console.log(">>>>> Cloning the message from DLQ and resending it - ", oldMessage.body); await sender.send(repairedMessage); await sender.close(); } -main().catch(err => { +main().catch((err) => { console.log("Error occurred: ", err); }); diff --git a/sdk/servicebus/service-bus/samples/javascript/advanced/sessionState.js b/sdk/servicebus/service-bus/samples/javascript/advanced/sessionState.js index 8aedb3521771..fa84b1c6f238 100644 --- a/sdk/servicebus/service-bus/samples/javascript/advanced/sessionState.js +++ b/sdk/servicebus/service-bus/samples/javascript/advanced/sessionState.js @@ -26,10 +26,8 @@ const { ServiceBusClient } = require("@azure/service-bus"); require("dotenv").config(); // Define connection string and related Service Bus entity names here -const connectionString = - process.env.SERVICE_BUS_CONNECTION_STRING || ""; -const userEventsQueueName = - process.env.QUEUE_NAME_WITH_SESSIONS || ""; +const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || ""; +const userEventsQueueName = process.env.QUEUE_NAME_WITH_SESSIONS || ""; const sbClient = new ServiceBusClient(connectionString); async function main() { try { @@ -74,13 +72,9 @@ async function runScenario() { } async function getSessionState(sessionId) { // If receiving from a subscription you can use the createSessionReceiver(topic, subscription) overload - const sessionReceiver = sbClient.createSessionReceiver( - userEventsQueueName, - "peekLock", - { - sessionId: sessionId - } - ); + const sessionReceiver = await sbClient.createSessionReceiver(userEventsQueueName, "peekLock", { + sessionId: sessionId + }); const sessionState = await sessionReceiver.getState(); if (sessionState) { // Get list of items @@ -92,7 +86,7 @@ async function getSessionState(sessionId) { } async function sendMessagesForSession(shoppingEvents, sessionId) { // createSender() can also be used to create a sender for a topic. - const sender = sbClient.createSender(userEventsQueueName); + const sender = await sbClient.createSender(userEventsQueueName); for (let index = 0; index < shoppingEvents.length; index++) { const message = { sessionId: sessionId, @@ -105,13 +99,9 @@ async function sendMessagesForSession(shoppingEvents, sessionId) { } async function processMessageFromSession(sessionId) { // If receiving from a subscription you can use the createSessionReceiver(topic, subscription) overload - const sessionReceiver = sbClient.createSessionReceiver( - userEventsQueueName, - "peekLock", - { - sessionId - } - ); + const sessionReceiver = await sbClient.createSessionReceiver(userEventsQueueName, "peekLock", { + sessionId + }); const messages = await sessionReceiver.receiveBatch(1, { maxWaitTimeSeconds: 10 @@ -143,6 +133,6 @@ async function processMessageFromSession(sessionId) { await sessionReceiver.close(); } -main().catch(err => { +main().catch((err) => { console.log("Error occurred: ", err); }); diff --git a/sdk/servicebus/service-bus/samples/javascript/advanced/topicFilters.js b/sdk/servicebus/service-bus/samples/javascript/advanced/topicFilters.js index 4e4faecc565a..97bb5b32e66c 100644 --- a/sdk/servicebus/service-bus/samples/javascript/advanced/topicFilters.js +++ b/sdk/servicebus/service-bus/samples/javascript/advanced/topicFilters.js @@ -20,15 +20,11 @@ const { ServiceBusClient } = require("@azure/service-bus"); // Load the .env file if it exists require("dotenv").config(); // Define connection string and related Service Bus entity names here -const connectionString = - process.env.SERVICE_BUS_CONNECTION_STRING || ""; +const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || ""; const topicName = process.env.TOPIC_NAME || ""; -const subscriptionName1 = - process.env.TOPIC_FILTER_SUBSCRIPTION_1 || ""; -const subscriptionName2 = - process.env.TOPIC_FILTER_SUBSCRIPTION_2 || ""; -const subscriptionName3 = - process.env.TOPIC_FILTER_SUBSCRIPTION_3 || ""; +const subscriptionName1 = process.env.TOPIC_FILTER_SUBSCRIPTION_1 || ""; +const subscriptionName2 = process.env.TOPIC_FILTER_SUBSCRIPTION_2 || ""; +const subscriptionName3 = process.env.TOPIC_FILTER_SUBSCRIPTION_3 || ""; async function main() { const sbClient = new ServiceBusClient(connectionString); try { @@ -44,18 +40,9 @@ async function main() { // Adds Rules on subscriptions to route messages from a topic to different subscriptions async function addRules(sbClient) { - const subscription1Client = sbClient.getSubscriptionRuleManager( - topicName, - subscriptionName1 - ); - const subscription2Client = sbClient.getSubscriptionRuleManager( - topicName, - subscriptionName2 - ); - const subscription3Client = sbClient.getSubscriptionRuleManager( - topicName, - subscriptionName3 - ); + const subscription1Client = sbClient.getSubscriptionRuleManager(topicName, subscriptionName1); + const subscription2Client = sbClient.getSubscriptionRuleManager(topicName, subscriptionName2); + const subscription3Client = sbClient.getSubscriptionRuleManager(topicName, subscriptionName3); // The default rule on the subscription allows all messages in. // So, remove existing rules before adding new ones await removeAllRules(subscription1Client); @@ -69,7 +56,7 @@ async function addRules(sbClient) { // Sends 100 messages with a user property called "priority" whose value is between 1 and 4 async function sendMessages(sbClient) { - const sender = sbClient.createSender(topicName); + const sender = await sbClient.createSender(topicName); for (let index = 0; index < 10; index++) { const priority = Math.ceil(Math.random() * 4); const message = { @@ -82,21 +69,9 @@ async function sendMessages(sbClient) { } // Prints messages from the 3 subscriptions async function receiveMessages(sbClient) { - const subscription1 = sbClient.createReceiver( - topicName, - subscriptionName1, - "peekLock" - ); - const subscription2 = sbClient.createReceiver( - topicName, - subscriptionName2, - "peekLock" - ); - const subscription3 = sbClient.createReceiver( - topicName, - subscriptionName3, - "peekLock" - ); + const subscription1 = sbClient.createReceiver(topicName, subscriptionName1, "peekLock"); + const subscription2 = sbClient.createReceiver(topicName, subscriptionName2, "peekLock"); + const subscription3 = sbClient.createReceiver(topicName, subscriptionName3, "peekLock"); const messagesFromSubscription1 = await subscription1.receiveBatch(10, { maxWaitTimeSeconds: 5 @@ -136,6 +111,6 @@ async function removeAllRules(client) { } } -main().catch(err => { +main().catch((err) => { console.log("Error occurred: ", err); }); diff --git a/sdk/servicebus/service-bus/samples/javascript/scheduledMessages.js b/sdk/servicebus/service-bus/samples/javascript/scheduledMessages.js index 4b9c7905d6c5..80816ad5919e 100644 --- a/sdk/servicebus/service-bus/samples/javascript/scheduledMessages.js +++ b/sdk/servicebus/service-bus/samples/javascript/scheduledMessages.js @@ -18,8 +18,7 @@ const { delay, ServiceBusClient } = require("@azure/service-bus"); require("dotenv").config(); // Define connection string and related Service Bus entity names here -const connectionString = - process.env.SERVICE_BUS_CONNECTION_STRING || ""; +const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || ""; const queueName = process.env.QUEUE_NAME || ""; const listOfScientists = [ { lastName: "Einstein", firstName: "Albert" }, @@ -48,9 +47,9 @@ async function main() { // Scheduling messages to be sent after 10 seconds from now async function sendScheduledMessages(sbClient) { // createSender() handles sending to a queue or a topic - const sender = sbClient.createSender(queueName); + const sender = await sbClient.createSender(queueName); - const messages = listOfScientists.map(scientist => ({ + const messages = listOfScientists.map((scientist) => ({ body: `${scientist.firstName} ${scientist.lastName}`, label: "Scientist" })); @@ -71,14 +70,12 @@ async function receiveMessages(sbClient) { let queueReceiver = sbClient.createReceiver(queueName, "peekLock"); let numOfMessagesReceived = 0; - const processMessage = async brokeredMessage => { + const processMessage = async (brokeredMessage) => { numOfMessagesReceived++; - console.log( - `Received message: ${brokeredMessage.body} - ${brokeredMessage.label}` - ); + console.log(`Received message: ${brokeredMessage.body} - ${brokeredMessage.label}`); await brokeredMessage.complete(); }; - const processError = async err => { + const processError = async (err) => { console.log("Error occurred: ", err); }; @@ -106,6 +103,6 @@ async function receiveMessages(sbClient) { await sbClient.close(); } -main().catch(err => { +main().catch((err) => { console.log("Error occurred: ", err); }); diff --git a/sdk/servicebus/service-bus/samples/javascript/sendMessages.js b/sdk/servicebus/service-bus/samples/javascript/sendMessages.js index 0567a3a09c7d..e656111ece1f 100644 --- a/sdk/servicebus/service-bus/samples/javascript/sendMessages.js +++ b/sdk/servicebus/service-bus/samples/javascript/sendMessages.js @@ -18,8 +18,7 @@ const { ServiceBusClient } = require("@azure/service-bus"); require("dotenv").config(); // Define connection string and related Service Bus entity names here -const connectionString = - process.env.SERVICE_BUS_CONNECTION_STRING || ""; +const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || ""; const queueName = process.env.QUEUE_NAME || ""; const listOfScientists = [ @@ -38,7 +37,7 @@ async function main() { const sbClient = new ServiceBusClient(connectionString); // createSender() can also be used to create a sender for a topic. - const sender = sbClient.createSender(queueName); + const sender = await sbClient.createSender(queueName); try { for (let index = 0; index < listOfScientists.length; index++) { @@ -58,6 +57,6 @@ async function main() { } } -main().catch(err => { +main().catch((err) => { console.log("Error occurred: ", err); }); diff --git a/sdk/servicebus/service-bus/samples/javascript/session.js b/sdk/servicebus/service-bus/samples/javascript/session.js index 0066c7732e1e..684c3dfecee5 100644 --- a/sdk/servicebus/service-bus/samples/javascript/session.js +++ b/sdk/servicebus/service-bus/samples/javascript/session.js @@ -20,8 +20,7 @@ require("dotenv").config(); // Define connection string and related Service Bus entity names here // Ensure on portal.azure.com that queue/topic has Sessions feature enabled -const connectionString = - process.env.SERVICE_BUS_CONNECTION_STRING || ""; +const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || ""; const queueName = process.env.QUEUE_NAME_WITH_SESSIONS || ""; const listOfScientists = [ @@ -61,7 +60,7 @@ async function main() { async function sendMessage(sbClient, scientist, sessionId) { // createSender() also works with topics - const sender = sbClient.createSender(queueName); + const sender = await sbClient.createSender(queueName); const message = { body: `${scientist.firstName} ${scientist.lastName}`, @@ -81,10 +80,10 @@ async function receiveMessages(sbClient, sessionId) { sessionId: sessionId }); - const processMessage = async message => { + const processMessage = async (message) => { console.log(`Received: ${message.sessionId} - ${message.body} `); }; - const processError = async err => { + const processError = async (err) => { console.log(">>>>> Error occurred: ", err); }; @@ -98,6 +97,6 @@ async function receiveMessages(sbClient, sessionId) { await receiver.close(); } -main().catch(err => { +main().catch((err) => { console.log("Error occurred: ", err); }); diff --git a/sdk/servicebus/service-bus/samples/typescript/src/advanced/deferral.ts b/sdk/servicebus/service-bus/samples/typescript/src/advanced/deferral.ts index aba8e144d5f0..ecf8ea98cc99 100644 --- a/sdk/servicebus/service-bus/samples/typescript/src/advanced/deferral.ts +++ b/sdk/servicebus/service-bus/samples/typescript/src/advanced/deferral.ts @@ -22,8 +22,7 @@ import * as dotenv from "dotenv"; dotenv.config(); // Define connection string and related Service Bus entity names here -const connectionString = - process.env.SERVICE_BUS_CONNECTION_STRING || ""; +const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || ""; const queueName = process.env.QUEUE_NAME || ""; export async function main() { @@ -35,7 +34,7 @@ export async function main() { async function sendMessages() { const sbClient = new ServiceBusClient(connectionString); // createSender() can also be used to create a sender for a topic. - const sender = sbClient.createSender(queueName); + const sender = await sbClient.createSender(queueName); const data = [ { step: 1, title: "Shop" }, @@ -78,7 +77,7 @@ async function receiveMessage() { const deferredSteps = new Map(); let lastProcessedRecipeStep = 0; try { - const processMessage = async brokeredMessage => { + const processMessage = async (brokeredMessage) => { if ( brokeredMessage.label === "RecipeStep" && brokeredMessage.contentType === "application/json" @@ -106,7 +105,7 @@ async function receiveMessage() { await brokeredMessage.deadLetter(); } }; - const processError = async err => { + const processError = async (err) => { console.log(">>>>> Error occurred: ", err); }; @@ -141,6 +140,6 @@ async function receiveMessage() { } } -main().catch(err => { +main().catch((err) => { console.log("Error occurred: ", err); }); diff --git a/sdk/servicebus/service-bus/samples/typescript/src/advanced/movingMessagesToDLQ.ts b/sdk/servicebus/service-bus/samples/typescript/src/advanced/movingMessagesToDLQ.ts index f3b8d34b60ab..17cda9b4e6c4 100644 --- a/sdk/servicebus/service-bus/samples/typescript/src/advanced/movingMessagesToDLQ.ts +++ b/sdk/servicebus/service-bus/samples/typescript/src/advanced/movingMessagesToDLQ.ts @@ -37,7 +37,7 @@ export async function main() { async function sendMessage() { // createSender() can also be used to create a sender for a topic. - const sender = sbClient.createSender(queueName); + const sender = await sbClient.createSender(queueName); const message = { body: { diff --git a/sdk/servicebus/service-bus/samples/typescript/src/advanced/processMessageFromDLQ.ts b/sdk/servicebus/service-bus/samples/typescript/src/advanced/processMessageFromDLQ.ts index 32062050bd79..f60c2af4b703 100644 --- a/sdk/servicebus/service-bus/samples/typescript/src/advanced/processMessageFromDLQ.ts +++ b/sdk/servicebus/service-bus/samples/typescript/src/advanced/processMessageFromDLQ.ts @@ -20,8 +20,7 @@ import * as dotenv from "dotenv"; dotenv.config(); // Define connection string and related Service Bus entity names here -const connectionString = - process.env.SERVICE_BUS_CONNECTION_STRING || ""; +const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || ""; const queueName = process.env.QUEUE_NAME || ""; const sbClient: ServiceBusClient = new ServiceBusClient(connectionString); @@ -58,20 +57,17 @@ async function processDeadletterMessageQueue() { // Send repaired message back to the current queue / topic async function fixAndResendMessage(oldMessage: ServiceBusMessage) { // createSender() can also be used to create a sender for a topic. - const sender = sbClient.createSender(queueName); + const sender = await sbClient.createSender(queueName); // Inspect given message and make any changes if necessary const repairedMessage = { ...oldMessage }; - console.log( - ">>>>> Cloning the message from DLQ and resending it - ", - oldMessage.body - ); + console.log(">>>>> Cloning the message from DLQ and resending it - ", oldMessage.body); await sender.send(repairedMessage); await sender.close(); } -main().catch(err => { +main().catch((err) => { console.log("Error occurred: ", err); }); diff --git a/sdk/servicebus/service-bus/samples/typescript/src/advanced/sessionState.ts b/sdk/servicebus/service-bus/samples/typescript/src/advanced/sessionState.ts index 46e6a4d6967f..c11c0decb995 100644 --- a/sdk/servicebus/service-bus/samples/typescript/src/advanced/sessionState.ts +++ b/sdk/servicebus/service-bus/samples/typescript/src/advanced/sessionState.ts @@ -30,10 +30,8 @@ import * as dotenv from "dotenv"; dotenv.config(); // Define connection string and related Service Bus entity names here -const connectionString = - process.env.SERVICE_BUS_CONNECTION_STRING || ""; -const userEventsQueueName = - process.env.QUEUE_NAME_WITH_SESSIONS || ""; +const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || ""; +const userEventsQueueName = process.env.QUEUE_NAME_WITH_SESSIONS || ""; const sbClient = new ServiceBusClient(connectionString); export async function main() { @@ -86,13 +84,9 @@ async function runScenario() { async function getSessionState(sessionId: string) { // If receiving from a subscription you can use the createSessionReceiver(topic, subscription) overload - const sessionReceiver = sbClient.createSessionReceiver( - userEventsQueueName, - "peekLock", - { - sessionId: sessionId - } - ); + const sessionReceiver = await sbClient.createSessionReceiver(userEventsQueueName, "peekLock", { + sessionId: sessionId + }); const sessionState = await sessionReceiver.getState(); if (sessionState) { @@ -105,12 +99,9 @@ async function getSessionState(sessionId: string) { await sessionReceiver.close(); } -async function sendMessagesForSession( - shoppingEvents: any[], - sessionId: string -) { +async function sendMessagesForSession(shoppingEvents: any[], sessionId: string) { // createSender() can also be used to create a sender for a topic. - const sender = sbClient.createSender(userEventsQueueName); + const sender = await sbClient.createSender(userEventsQueueName); for (let index = 0; index < shoppingEvents.length; index++) { const message = { @@ -125,13 +116,9 @@ async function sendMessagesForSession( async function processMessageFromSession(sessionId: string) { // If receiving from a subscription you can use the createSessionReceiver(topic, subscription) overload - const sessionReceiver = sbClient.createSessionReceiver( - userEventsQueueName, - "peekLock", - { - sessionId - } - ); + const sessionReceiver = await sbClient.createSessionReceiver(userEventsQueueName, "peekLock", { + sessionId + }); const messages = await sessionReceiver.receiveBatch(1, { maxWaitTimeInMs: 10000 @@ -165,6 +152,6 @@ async function processMessageFromSession(sessionId: string) { await sessionReceiver.close(); } -main().catch(err => { +main().catch((err) => { console.log("Error occurred: ", err); }); diff --git a/sdk/servicebus/service-bus/samples/typescript/src/advanced/topicFilters.ts b/sdk/servicebus/service-bus/samples/typescript/src/advanced/topicFilters.ts index e074458944a6..e7cf774bab95 100644 --- a/sdk/servicebus/service-bus/samples/typescript/src/advanced/topicFilters.ts +++ b/sdk/servicebus/service-bus/samples/typescript/src/advanced/topicFilters.ts @@ -18,26 +18,18 @@ Topic filters and actions. */ -import { - ServiceBusClient, - ServiceBusMessage, - SubscriptionRuleManager -} from "@azure/service-bus"; +import { ServiceBusClient, ServiceBusMessage, SubscriptionRuleManager } from "@azure/service-bus"; // Load the .env file if it exists import * as dotenv from "dotenv"; dotenv.config(); // Define connection string and related Service Bus entity names here -const connectionString = - process.env.SERVICE_BUS_CONNECTION_STRING || ""; +const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || ""; const topicName = process.env.TOPIC_NAME || ""; -const subscriptionName1 = - process.env.TOPIC_FILTER_SUBSCRIPTION_1 || ""; -const subscriptionName2 = - process.env.TOPIC_FILTER_SUBSCRIPTION_2 || ""; -const subscriptionName3 = - process.env.TOPIC_FILTER_SUBSCRIPTION_3 || ""; +const subscriptionName1 = process.env.TOPIC_FILTER_SUBSCRIPTION_1 || ""; +const subscriptionName2 = process.env.TOPIC_FILTER_SUBSCRIPTION_2 || ""; +const subscriptionName3 = process.env.TOPIC_FILTER_SUBSCRIPTION_3 || ""; export async function main() { const sbClient = new ServiceBusClient(connectionString); @@ -54,18 +46,9 @@ export async function main() { // Adds Rules on subscriptions to route messages from a topic to different subscriptions async function addRules(sbClient: ServiceBusClient) { - const subscription1Client = sbClient.getSubscriptionRuleManager( - topicName, - subscriptionName1 - ); - const subscription2Client = sbClient.getSubscriptionRuleManager( - topicName, - subscriptionName2 - ); - const subscription3Client = sbClient.getSubscriptionRuleManager( - topicName, - subscriptionName3 - ); + const subscription1Client = sbClient.getSubscriptionRuleManager(topicName, subscriptionName1); + const subscription2Client = sbClient.getSubscriptionRuleManager(topicName, subscriptionName2); + const subscription3Client = sbClient.getSubscriptionRuleManager(topicName, subscriptionName3); // The default rule on the subscription allows all messages in. // So, remove existing rules before adding new ones @@ -80,7 +63,7 @@ async function addRules(sbClient: ServiceBusClient) { // Sends 100 messages with a user property called "priority" whose value is between 1 and 4 async function sendMessages(sbClient: ServiceBusClient) { - const sender = sbClient.createSender(topicName); + const sender = await sbClient.createSender(topicName); for (let index = 0; index < 10; index++) { const priority = Math.ceil(Math.random() * 4); const message: ServiceBusMessage = { @@ -95,21 +78,9 @@ async function sendMessages(sbClient: ServiceBusClient) { // Prints messages from the 3 subscriptions async function receiveMessages(sbClient: ServiceBusClient) { - const subscription1 = sbClient.createReceiver( - topicName, - subscriptionName1, - "peekLock" - ); - const subscription2 = sbClient.createReceiver( - topicName, - subscriptionName2, - "peekLock" - ); - const subscription3 = sbClient.createReceiver( - topicName, - subscriptionName3, - "peekLock" - ); + const subscription1 = sbClient.createReceiver(topicName, subscriptionName1, "peekLock"); + const subscription2 = sbClient.createReceiver(topicName, subscriptionName2, "peekLock"); + const subscription3 = sbClient.createReceiver(topicName, subscriptionName3, "peekLock"); const messagesFromSubscription1 = await subscription1.receiveBatch(10, { maxWaitTimeInMs: 5000 @@ -149,6 +120,6 @@ async function removeAllRules(client: SubscriptionRuleManager) { } } -main().catch(err => { +main().catch((err) => { console.log("Error occurred: ", err); }); diff --git a/sdk/servicebus/service-bus/samples/typescript/src/scheduledMessages.ts b/sdk/servicebus/service-bus/samples/typescript/src/scheduledMessages.ts index 1f1991e148bf..96dafbac402a 100644 --- a/sdk/servicebus/service-bus/samples/typescript/src/scheduledMessages.ts +++ b/sdk/servicebus/service-bus/samples/typescript/src/scheduledMessages.ts @@ -20,8 +20,7 @@ import * as dotenv from "dotenv"; dotenv.config(); // Define connection string and related Service Bus entity names here -const connectionString = - process.env.SERVICE_BUS_CONNECTION_STRING || ""; +const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || ""; const queueName = process.env.QUEUE_NAME || ""; const listOfScientists = [ @@ -51,9 +50,9 @@ export async function main() { // Scheduling messages to be sent after 10 seconds from now async function sendScheduledMessages(sbClient: ServiceBusClient) { // createSender() handles sending to a queue or a topic - const sender = sbClient.createSender(queueName); + const sender = await sbClient.createSender(queueName); - const messages: ServiceBusMessage[] = listOfScientists.map(scientist => ({ + const messages: ServiceBusMessage[] = listOfScientists.map((scientist) => ({ body: `${scientist.firstName} ${scientist.lastName}`, label: "Scientist" })); @@ -74,14 +73,12 @@ async function receiveMessages(sbClient: ServiceBusClient) { let queueReceiver = sbClient.createReceiver(queueName, "peekLock"); let numOfMessagesReceived = 0; - const processMessage = async brokeredMessage => { + const processMessage = async (brokeredMessage) => { numOfMessagesReceived++; - console.log( - `Received message: ${brokeredMessage.body} - ${brokeredMessage.label}` - ); + console.log(`Received message: ${brokeredMessage.body} - ${brokeredMessage.label}`); await brokeredMessage.complete(); }; - const processError = async err => { + const processError = async (err) => { console.log("Error occurred: ", err); }; @@ -112,6 +109,6 @@ async function receiveMessages(sbClient: ServiceBusClient) { await sbClient.close(); } -main().catch(err => { +main().catch((err) => { console.log("Error occurred: ", err); }); diff --git a/sdk/servicebus/service-bus/samples/typescript/src/sendMessages.ts b/sdk/servicebus/service-bus/samples/typescript/src/sendMessages.ts index cc7f2f1b4a2f..61001381a710 100644 --- a/sdk/servicebus/service-bus/samples/typescript/src/sendMessages.ts +++ b/sdk/servicebus/service-bus/samples/typescript/src/sendMessages.ts @@ -20,8 +20,7 @@ import * as dotenv from "dotenv"; dotenv.config(); // Define connection string and related Service Bus entity names here -const connectionString = - process.env.SERVICE_BUS_CONNECTION_STRING || ""; +const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || ""; const queueName = process.env.QUEUE_NAME || ""; const listOfScientists = [ @@ -41,7 +40,7 @@ export async function main() { const sbClient = new ServiceBusClient(connectionString); // createSender() can also be used to create a sender for a topic. - const sender = sbClient.createSender(queueName); + const sender = await sbClient.createSender(queueName); try { for (let index = 0; index < listOfScientists.length; index++) { @@ -61,6 +60,6 @@ export async function main() { } } -main().catch(err => { +main().catch((err) => { console.log("Error occurred: ", err); }); diff --git a/sdk/servicebus/service-bus/samples/typescript/src/session.ts b/sdk/servicebus/service-bus/samples/typescript/src/session.ts index 6163af1f2bc8..6f62d42455e7 100644 --- a/sdk/servicebus/service-bus/samples/typescript/src/session.ts +++ b/sdk/servicebus/service-bus/samples/typescript/src/session.ts @@ -23,8 +23,7 @@ dotenv.config(); // Define connection string and related Service Bus entity names here // Ensure on portal.azure.com that queue/topic has Sessions feature enabled -const connectionString = - process.env.SERVICE_BUS_CONNECTION_STRING || ""; +const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || ""; const queueName = process.env.QUEUE_NAME_WITH_SESSIONS || ""; const listOfScientists = [ @@ -63,13 +62,9 @@ export async function main() { } } -async function sendMessage( - sbClient: ServiceBusClient, - scientist: any, - sessionId: string -) { +async function sendMessage(sbClient: ServiceBusClient, scientist: any, sessionId: string) { // createSender() also works with topics - const sender = sbClient.createSender(queueName); + const sender = await sbClient.createSender(queueName); const message = { body: `${scientist.firstName} ${scientist.lastName}`, @@ -85,14 +80,14 @@ async function sendMessage( async function receiveMessages(sbClient: ServiceBusClient, sessionId: string) { // If receiving from a subscription you can use the createSessionReceiver(topic, subscription) overload - const receiver = sbClient.createSessionReceiver(queueName, "peekLock", { + const receiver = await sbClient.createSessionReceiver(queueName, "peekLock", { sessionId: sessionId }); const processMessage = async (message: ServiceBusMessage) => { console.log(`Received: ${message.sessionId} - ${message.body} `); }; - const processError = async err => { + const processError = async (err) => { console.log(">>>>> Error occurred: ", err); }; receiver.subscribe({ @@ -105,6 +100,6 @@ async function receiveMessages(sbClient: ServiceBusClient, sessionId: string) { await receiver.close(); } -main().catch(err => { +main().catch((err) => { console.log("Error occurred: ", err); }); diff --git a/sdk/servicebus/service-bus/src/core/messageSender.ts b/sdk/servicebus/service-bus/src/core/messageSender.ts index de01d7d6974b..ffd56ed9985d 100644 --- a/sdk/servicebus/service-bus/src/core/messageSender.ts +++ b/sdk/servicebus/service-bus/src/core/messageSender.ts @@ -46,11 +46,11 @@ import { AbortError, AbortSignalLike } from "@azure/abort-controller"; */ export class MessageSender extends LinkEntity { /** - * @property {string} senderLock The unique lock name per connection that is used to acquire the + * @property {string} openLock The unique lock name per connection that is used to acquire the * lock for establishing a sender link by an entity on that connection. * @readonly */ - readonly senderLock: string = `sender-${generate_uuid()}`; + readonly openLock: string = `sender-${generate_uuid()}`; /** * @property {OnAmqpEvent} _onAmqpError The handler function to handle errors that happen on the * underlying sender. @@ -267,7 +267,7 @@ export class MessageSender extends LinkEntity { const initStartTime = Date.now(); if (!this.isOpen()) { - const initTimeoutPromise = new Promise((res, rejectInitTimeoutPromise) => { + const initTimeoutPromise = new Promise((_res, rejectInitTimeoutPromise) => { initTimeoutTimer = setTimeout(() => { const desc: string = `[${this._context.namespace.connectionId}] Sender "${this.name}" ` + @@ -283,17 +283,7 @@ export class MessageSender extends LinkEntity { }); try { - log.sender( - "Acquiring lock %s for initializing the session, sender and " + - "possibly the connection.", - this.senderLock - ); - - const initPromise = defaultLock.acquire(this.senderLock, () => { - return this._init(); - }); - - await Promise.race([initPromise, initTimeoutPromise]); + await Promise.race([this.open(), initTimeoutPromise]); } catch (err) { err = translate(err); log.warning( @@ -387,66 +377,77 @@ export class MessageSender extends LinkEntity { /** * Initializes the sender session on the connection. */ - private async _init(options?: AwaitableSenderOptions): Promise { - try { - // isOpen isConnecting Should establish - // true false No - // true true No - // false true No - // false false Yes - if (!this.isOpen()) { - log.error( - "[%s] The sender '%s' with address '%s' is not open and is not currently " + - "establishing itself. Hence let's try to connect.", - this._context.namespace.connectionId, - this.name, - this.address - ); - this.isConnecting = true; - await this._negotiateClaim(); - log.error( - "[%s] Trying to create sender '%s'...", - this._context.namespace.connectionId, - this.name - ); - if (!options) { - options = this._createSenderOptions(Constants.defaultOperationTimeoutInMs); + public async open(options?: AwaitableSenderOptions): Promise { + if (this.isOpen()) { + return; + } + + log.sender( + "Acquiring lock %s for initializing the session, sender and possibly the connection.", + this.openLock + ); + + return await defaultLock.acquire(this.openLock, async () => { + try { + // isOpen isConnecting Should establish + // true false No + // true true No + // false true No + // false false Yes + if (!this.isOpen()) { + log.error( + "[%s] The sender '%s' with address '%s' is not open and is not currently " + + "establishing itself. Hence let's try to connect.", + this._context.namespace.connectionId, + this.name, + this.address + ); + this.isConnecting = true; + await this._negotiateClaim(); + log.error( + "[%s] Trying to create sender '%s'...", + this._context.namespace.connectionId, + this.name + ); + if (!options) { + options = this._createSenderOptions(Constants.defaultOperationTimeoutInMs); + } + this._sender = await this._context.namespace.connection.createAwaitableSender(options); + this.isConnecting = false; + log.error( + "[%s] Sender '%s' with address '%s' has established itself.", + this._context.namespace.connectionId, + this.name, + this.address + ); + this._sender.setMaxListeners(1000); + log.error( + "[%s] Promise to create the sender resolved. Created sender with name: %s", + this._context.namespace.connectionId, + this.name + ); + log.error( + "[%s] Sender '%s' created with sender options: %O", + this._context.namespace.connectionId, + this.name, + options + ); + // It is possible for someone to close the sender and then start it again. + // Thus make sure that the sender is present in the client cache. + if (!this._sender) this._context.sender = this; + await this._ensureTokenRenewal(); } - this._sender = await this._context.namespace.connection.createAwaitableSender(options); - this.isConnecting = false; - log.error( - "[%s] Sender '%s' with address '%s' has established itself.", - this._context.namespace.connectionId, - this.name, - this.address - ); - this._sender.setMaxListeners(1000); - log.error( - "[%s] Promise to create the sender resolved. Created sender with name: %s", - this._context.namespace.connectionId, - this.name - ); + } catch (err) { + err = translate(err); log.error( - "[%s] Sender '%s' created with sender options: %O", + "[%s] An error occurred while creating the sender %s", this._context.namespace.connectionId, this.name, - options + err ); - // It is possible for someone to close the sender and then start it again. - // Thus make sure that the sender is present in the client cache. - if (!this._sender) this._context.sender = this; - await this._ensureTokenRenewal(); + throw err; } - } catch (err) { - err = translate(err); - log.error( - "[%s] An error occurred while creating the sender %s", - this._context.namespace.connectionId, - this.name, - err - ); - throw err; - } + }); } /** @@ -509,22 +510,20 @@ export class MessageSender extends LinkEntity { ); } if (shouldReopen) { - await defaultLock.acquire(this.senderLock, () => { - const senderOptions = this._createSenderOptions( - Constants.defaultOperationTimeoutInMs, - true - ); - // shall retry as per the provided retryOptions if the error is a retryable error - // else bail out when the error is not retryable or the operation succeeds. - const config: RetryConfig = { - operation: () => this._init(senderOptions), - connectionId: this._context.namespace.connectionId!, - operationType: RetryOperationType.senderLink, - retryOptions: this._retryOptions, - connectionHost: this._context.namespace.config.host - }; - return retry(config); - }); + const senderOptions = this._createSenderOptions( + Constants.defaultOperationTimeoutInMs, + true + ); + // shall retry as per the provided retryOptions if the error is a retryable error + // else bail out when the error is not retryable or the operation succeeds. + const config: RetryConfig = { + operation: () => this.open(senderOptions), + connectionId: this._context.namespace.connectionId!, + operationType: RetryOperationType.senderLink, + retryOptions: this._retryOptions, + connectionHost: this._context.namespace.config.host + }; + return await retry(config); } } catch (err) { log.error( @@ -723,16 +722,16 @@ export class MessageSender extends LinkEntity { } return new Promise(async (resolve, reject) => { try { - await defaultLock.acquire(this.senderLock, () => { - const config: RetryConfig = { - operation: () => this._init(), - connectionId: this._context.namespace.connectionId, - operationType: RetryOperationType.senderLink, - retryOptions: retryOptions - }; - return retry(config); - }); - resolve(this._sender!.maxMessageSize); + const config: RetryConfig = { + operation: () => this.open(), + connectionId: this._context.namespace.connectionId, + operationType: RetryOperationType.senderLink, + retryOptions: retryOptions + }; + + await retry(config); + + return resolve(this._sender!.maxMessageSize); } catch (err) { reject(err); } diff --git a/sdk/servicebus/service-bus/src/index.ts b/sdk/servicebus/service-bus/src/index.ts index 3ad93a96af04..4bb1702fdecf 100644 --- a/sdk/servicebus/service-bus/src/index.ts +++ b/sdk/servicebus/service-bus/src/index.ts @@ -31,6 +31,7 @@ export { Delivery, WebSocketImpl } from "rhea-promise"; export { GetMessageIteratorOptions, CreateSessionReceiverOptions, + CreateSenderOptions, MessageHandlerOptions, MessageHandlers, ReceiveBatchOptions, diff --git a/sdk/servicebus/service-bus/src/models.ts b/sdk/servicebus/service-bus/src/models.ts index d804f33c8773..6612c02205a7 100644 --- a/sdk/servicebus/service-bus/src/models.ts +++ b/sdk/servicebus/service-bus/src/models.ts @@ -4,6 +4,7 @@ import { OperationOptions } from "./modelsToBeSharedWithEventHubs"; import { SessionReceiverOptions } from "./session/messageSession"; import Long from "long"; +import { AbortSignalLike } from "@azure/abort-controller"; /** * The general message handler interface (used for streamMessages). @@ -103,6 +104,16 @@ export interface MessageHandlerOptions { */ export interface CreateSessionReceiverOptions extends SessionReceiverOptions, OperationOptions {} +/** + * Describes the options passed to the `createSender` method on `ServiceBusClient`. + */ +export interface CreateSenderOptions { + /** + * The signal which can be used to abort requests. + */ + abortSignal?: AbortSignalLike; +} + /** * Describes the options passed to the `browseMessages` method on a receiver. */ diff --git a/sdk/servicebus/service-bus/src/sender.ts b/sdk/servicebus/service-bus/src/sender.ts index fb7651746064..e5c01c99061d 100644 --- a/sdk/servicebus/service-bus/src/sender.ts +++ b/sdk/servicebus/service-bus/src/sender.ts @@ -14,7 +14,7 @@ import { throwTypeErrorIfParameterNotLongArray } from "./util/errors"; import { ServiceBusMessageBatch } from "./serviceBusMessageBatch"; -import { CreateBatchOptions } from "./models"; +import { CreateBatchOptions, CreateSenderOptions } from "./models"; import { retry, RetryOperationType, @@ -400,6 +400,20 @@ export class SenderImpl implements Sender { return retry(config); } + async open(options?: CreateSenderOptions): Promise { + this._throwIfSenderOrConnectionClosed(); + + const config: RetryConfig = { + operation: () => this._sender.open(), + connectionId: this._context.namespace.connectionId, + operationType: RetryOperationType.senderLink, + retryOptions: this._retryOptions, + abortSignal: options?.abortSignal + }; + + return retry(config); + } + async close(): Promise { try { this._isClosed = true; diff --git a/sdk/servicebus/service-bus/src/serviceBusClient.ts b/sdk/servicebus/service-bus/src/serviceBusClient.ts index c29e924bf05a..3614bbd81cee 100644 --- a/sdk/servicebus/service-bus/src/serviceBusClient.ts +++ b/sdk/servicebus/service-bus/src/serviceBusClient.ts @@ -11,7 +11,7 @@ import { import { ConnectionContext } from "./connectionContext"; import { ClientEntityContext } from "./clientEntityContext"; import { SenderImpl, Sender } from "./sender"; -import { CreateSessionReceiverOptions } from "./models"; +import { CreateSessionReceiverOptions, CreateSenderOptions } from "./models"; import { Receiver, ReceiverImpl } from "./receivers/receiver"; import { SessionReceiver, SessionReceiverImpl } from "./receivers/sessionReceiver"; import { ReceivedMessageWithLock, ReceivedMessage } from "./serviceBusMessage"; @@ -239,10 +239,12 @@ export class ServiceBusClient { } /** - * Creates a Sender which can be used to send messages, schedule messages to be sent at a later time - * and cancel such scheduled messages. + * Creates a Sender which can be used to send messages, schedule messages to be + * sent at a later time and cancel such scheduled messages. + * @param queueOrTopicName The name of a queue or topic to send messages to. + * @param options Options for creating a sender. */ - createSender(queueOrTopicName: string): Sender { + async createSender(queueOrTopicName: string, options?: CreateSenderOptions): Promise { validateEntityNamesMatch(this._connectionContext.config.entityPath, queueOrTopicName, "sender"); const clientEntityContext = ClientEntityContext.create( @@ -250,7 +252,9 @@ export class ServiceBusClient { this._connectionContext, `${queueOrTopicName}/${generate_uuid()}` ); - return new SenderImpl(clientEntityContext, this._clientOptions.retryOptions); + const sender = new SenderImpl(clientEntityContext, this._clientOptions.retryOptions); + await sender.open(options); + return sender; } /** diff --git a/sdk/servicebus/service-bus/test/abortSignal.spec.ts b/sdk/servicebus/service-bus/test/abortSignal.spec.ts index 0dd851835b99..c25849e54174 100644 --- a/sdk/servicebus/service-bus/test/abortSignal.spec.ts +++ b/sdk/servicebus/service-bus/test/abortSignal.spec.ts @@ -57,7 +57,7 @@ describe("AbortSignal", () => { it("_trySend with an already aborted AbortSignal", async () => { const sender = new MessageSender(clientEntityContext, { timeoutInMs: 1 }); - sender["_init"] = async () => { + sender["open"] = async () => { throw new Error("INIT SHOULD NEVER HAVE BEEN CALLED"); }; @@ -100,7 +100,7 @@ describe("AbortSignal", () => { let initWasCalled = true; - sender["_init"] = async () => { + sender["open"] = async () => { initWasCalled = true; // long enough to let the init timeout expiration code to run. await delay(1000); diff --git a/sdk/servicebus/service-bus/test/backupMessageSettlement.spec.ts b/sdk/servicebus/service-bus/test/backupMessageSettlement.spec.ts index 1a7f2fe1a9fc..4a81ae449b43 100644 --- a/sdk/servicebus/service-bus/test/backupMessageSettlement.spec.ts +++ b/sdk/servicebus/service-bus/test/backupMessageSettlement.spec.ts @@ -39,7 +39,7 @@ describe("Backup message settlement - Through ManagementLink", () => { receiverClient = await serviceBusClient.test.getPeekLockReceiver(entityNames); senderClient = serviceBusClient.test.addToCleanup( - serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) + await serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) ); deadLetterClient = serviceBusClient.test.createDeadLetterReceiver(entityNames); diff --git a/sdk/servicebus/service-bus/test/batchReceiver.spec.ts b/sdk/servicebus/service-bus/test/batchReceiver.spec.ts index 631f017a5671..035c1eee5f49 100644 --- a/sdk/servicebus/service-bus/test/batchReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/batchReceiver.spec.ts @@ -42,7 +42,7 @@ describe("batchReceiver", () => { receiverClient = await serviceBusClient.test.getPeekLockReceiver(entityNames); senderClient = serviceBusClient.test.addToCleanup( - serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) + await serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) ); deadLetterClient = serviceBusClient.test.createDeadLetterReceiver(entityNames); diff --git a/sdk/servicebus/service-bus/test/connectionManagement.spec.ts b/sdk/servicebus/service-bus/test/connectionManagement.spec.ts new file mode 100644 index 000000000000..73744ad28a16 --- /dev/null +++ b/sdk/servicebus/service-bus/test/connectionManagement.spec.ts @@ -0,0 +1,161 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import chai from "chai"; +const assert = chai.assert; +import chaiAsPromised from "chai-as-promised"; +import { delay } from "../src"; +import { createServiceBusClientForTests, ServiceBusClientForTests } from "./utils/testutils2"; +import { defaultLock } from "@azure/core-amqp"; +import { TestClientType } from "./utils/testUtils"; +import { SenderImpl } from "../src/sender"; +import { AbortController } from "@azure/abort-controller"; + +const should = chai.should(); +chai.use(chaiAsPromised); + +describe("controlled connection initialization", () => { + let sender: SenderImpl; + let senderEntityPath: string; + let serviceBusClient: ServiceBusClientForTests; + + beforeEach(async () => { + serviceBusClient = createServiceBusClientForTests(); + + // there's nothing entity specific about what I'm doing here so it can be any + // entity... + const { queue } = await serviceBusClient.test.createTestEntities( + TestClientType.UnpartitionedQueue + ); + + if (queue == null) { + throw new Error("queue name should not be null"); + } + + // casting because I need access to 'open' and the return type of createSender() is an + // interface. + sender = (await serviceBusClient.createSender(queue!)) as SenderImpl; + senderEntityPath = queue!; + }); + + afterEach(() => { + return serviceBusClient.test.afterEach(); + }); + + it("createSender() is no longer lazy", async () => { + assert.isTrue(sender["_context"].sender?.isOpen()); + await checkThatInitializationDoesntReoccur(sender); + }); + + it("open() early exits if the connection is already open (avoid taking unnecessary lock)", async () => { + // open uses a lock (at the sender level) that helps us not to have overlapping open() calls. + await defaultLock.acquire(sender["_context"]!.sender!["openLock"], async () => { + // the connection is _already_ open so it doesn't attempt to take a lock + // or actually try to open the connection (we have an early exit) + const secondOpenCallPromise = sender.open(); + + sender["_context"].sender!["_negotiateClaim"] = async () => { + // this is a decent way to tell if we tried to open the connection + throw new Error( + "We won't get here at all - the connection is already open so we'll early exit." + ); + }; + + const ret = await Promise.race([delayThatReturns999(), secondOpenCallPromise]); + + // ie, the Promise from sender.open() 'won' because we don't + // acquire the lock when we early-exit. + assert.notExists(ret); + }); + }); + + it("open() properly locks to prevent multiple in-flight open() calls", async () => { + // open uses a lock (at the sender level) that helps us not to have overlapping open() calls. + let secondOpenCallPromise: Promise | undefined; + + // acquire the same lock that open() uses and then, while it's 100% locked, + // attempt to call .open() and see that it just blocks... + await defaultLock.acquire(sender["_context"]!.sender!["openLock"], async () => { + // we need to fake the connection being closed or else `open()` won't attempt to acquire + // the lock. + sender["_context"].sender!["isOpen"] = () => false; + + sender["_context"].sender!["_negotiateClaim"] = async () => { + // this is a decent way to tell that we tried to open the connection + throw new Error("We won't get here until _after_ the lock has been released"); + }; + + secondOpenCallPromise = sender.open(); + const ret = await Promise.race([delayThatReturns999(), secondOpenCallPromise]); + + // this time the delay() call wins since our open() call is blocked on the lock internally + assert.equal(typeof ret, "number"); + }); + + // now that we're outside of the lock we can await on the Promise and it should proceed + try { + await secondOpenCallPromise; + assert.fail("Should have thrown once we reached our stubbed out _negotiateClaim() call"); + } catch (err) { + assert.equal(err.message, "We won't get here until _after_ the lock has been released"); + } + }); + + it("open() doesn't re-open a sender when it's been close()'d", async () => { + // we can't revive a sender. + await sender.close(); + + try { + await sender.open(); + assert.fail("Should have thrown once we reached our stubbed out _negotiateClaim() call"); + } catch (err) { + assert.equal( + err.message, + `The sender for "${senderEntityPath}" has been closed and can no longer be used. Please create a new sender using the "getSender" method on the ServiceBusClient.` + ); + } + }); +}); + +function delayThatReturns999(): Promise | Promise { + const ac = new AbortController(); + return delay(1000, ac.signal, "ignored", 999); +} + +/** + * Checks that calling open() on the sender at this point doesn't reopen it + * NOTE: this does change the underlying sender so you won't be able to use it + * again afterwards. + */ +async function checkThatInitializationDoesntReoccur(sender: SenderImpl) { + // make sure the private details haven't shifted out from underneath me. + should.exist(sender["_context"].sender!["_negotiateClaim"]); + assert.isTrue(sender["_context"].sender!["isOpen"](), "The connection is actually open()"); + + // stub out the `MessageSender` methods that handle initializing the + // connection - now that everything is up we should always see that it + // takes the "early exit" path when it sees that the connection is open + let negotiateClaimWasCalled = false; + + // now we'll just fake the rest + let isOpenWasCalled = false; + + sender["_context"].sender!["isOpen"] = () => { + isOpenWasCalled = true; + return true; + }; + + sender["_context"].sender!["_negotiateClaim"] = async () => { + negotiateClaimWasCalled = true; + }; + + await sender.send({ + body: "sending another message just to prove the connection checks work" + }); + + assert.isTrue(isOpenWasCalled, "we should have checked that the connection was open"); + assert.isFalse( + negotiateClaimWasCalled, + "we should NOT have tried to _negotiateClaim since the connection was open" + ); +} diff --git a/sdk/servicebus/service-bus/test/deferredMessage.spec.ts b/sdk/servicebus/service-bus/test/deferredMessage.spec.ts index cf8ebc81b7fa..124acf60d5ee 100644 --- a/sdk/servicebus/service-bus/test/deferredMessage.spec.ts +++ b/sdk/servicebus/service-bus/test/deferredMessage.spec.ts @@ -32,7 +32,7 @@ describe("deferred messages", () => { receiverClient = await serviceBusClient.test.getPeekLockReceiver(entityNames); senderClient = serviceBusClient.test.addToCleanup( - serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) + await serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) ); deadLetterClient = serviceBusClient.test.createDeadLetterReceiver(entityNames); diff --git a/sdk/servicebus/service-bus/test/invalidParameters.spec.ts b/sdk/servicebus/service-bus/test/invalidParameters.spec.ts index 57f2bbd788ee..b083bf1c010b 100644 --- a/sdk/servicebus/service-bus/test/invalidParameters.spec.ts +++ b/sdk/servicebus/service-bus/test/invalidParameters.spec.ts @@ -310,7 +310,7 @@ describe("invalid parameters", () => { ); sender = serviceBusClient.test.addToCleanup( - serviceBusClient.createSender(entityNames.queue!) + await serviceBusClient.createSender(entityNames.queue!) ); receiver = await serviceBusClient.test.getSessionPeekLockReceiver(entityNames, { @@ -527,7 +527,7 @@ describe("invalid parameters", () => { ); sender = serviceBusClient.test.addToCleanup( - serviceBusClient.createSender(entityNames.queue!) + await serviceBusClient.createSender(entityNames.queue!) ); receiver = await serviceBusClient.test.getPeekLockReceiver(entityNames); @@ -669,7 +669,7 @@ describe("invalid parameters", () => { ); //const clients = await getSenderReceiverClients(TestClientType.PartitionedQueue, "peekLock"); - sender = serviceBusClient.test.addToCleanup(serviceBusClient.createSender(queue!)); + sender = serviceBusClient.test.addToCleanup(await serviceBusClient.createSender(queue!)); }); after(() => { diff --git a/sdk/servicebus/service-bus/test/propsToModify.spec.ts b/sdk/servicebus/service-bus/test/propsToModify.spec.ts index fcfe78b89848..ef91336e0de9 100644 --- a/sdk/servicebus/service-bus/test/propsToModify.spec.ts +++ b/sdk/servicebus/service-bus/test/propsToModify.spec.ts @@ -31,7 +31,7 @@ describe("dead lettering", () => { // send a test message with the body being the title of the test (for something unique) const sender = serviceBusClient.test.addToCleanup( - serviceBusClient.createSender(entityNames.queue) + await serviceBusClient.createSender(entityNames.queue) ); await sender.send({ @@ -181,7 +181,7 @@ describe("abandoning", () => { // send a test message with the body being the title of the test (for something unique) const sender = serviceBusClient.test.addToCleanup( - serviceBusClient.createSender(entityNames.queue) + await serviceBusClient.createSender(entityNames.queue) ); await sender.send({ @@ -302,7 +302,7 @@ describe("deferring", () => { // send a test message with the body being the title of the test (for something unique) const sender = serviceBusClient.test.addToCleanup( - serviceBusClient.createSender(entityNames.queue) + await serviceBusClient.createSender(entityNames.queue) ); await sender.send({ diff --git a/sdk/servicebus/service-bus/test/receiveAndDeleteMode.spec.ts b/sdk/servicebus/service-bus/test/receiveAndDeleteMode.spec.ts index efe3834b6419..584a6a975f80 100644 --- a/sdk/servicebus/service-bus/test/receiveAndDeleteMode.spec.ts +++ b/sdk/servicebus/service-bus/test/receiveAndDeleteMode.spec.ts @@ -42,7 +42,7 @@ describe("receive and delete", () => { const entityNames = await serviceBusClient.test.createTestEntities(entityType); senderClient = serviceBusClient.test.addToCleanup( - serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) + await serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) ); if (receiveMode === "peekLock") { receiverClient = await serviceBusClient.test.getPeekLockReceiver(entityNames); diff --git a/sdk/servicebus/service-bus/test/renewLock.spec.ts b/sdk/servicebus/service-bus/test/renewLock.spec.ts index e580402a98be..65681e9d8b2c 100644 --- a/sdk/servicebus/service-bus/test/renewLock.spec.ts +++ b/sdk/servicebus/service-bus/test/renewLock.spec.ts @@ -30,7 +30,7 @@ describe("renew lock", () => { receiverClient = await serviceBusClient.test.getPeekLockReceiver(entityNames); senderClient = serviceBusClient.test.addToCleanup( - serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) + await serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) ); } diff --git a/sdk/servicebus/service-bus/test/renewLockSessions.spec.ts b/sdk/servicebus/service-bus/test/renewLockSessions.spec.ts index ae9e0b3069e9..7f943e9a128a 100644 --- a/sdk/servicebus/service-bus/test/renewLockSessions.spec.ts +++ b/sdk/servicebus/service-bus/test/renewLockSessions.spec.ts @@ -35,7 +35,7 @@ describe("renew lock sessions", () => { const entityNames = await serviceBusClient.test.createTestEntities(entityType); sender = serviceBusClient.test.addToCleanup( - serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) + await serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) ); sessionId = Date.now().toString(); diff --git a/sdk/servicebus/service-bus/test/retries.spec.ts b/sdk/servicebus/service-bus/test/retries.spec.ts index c769d83ed74e..5c9f132ffaa4 100644 --- a/sdk/servicebus/service-bus/test/retries.spec.ts +++ b/sdk/servicebus/service-bus/test/retries.spec.ts @@ -13,7 +13,7 @@ import { } from "../src"; import { TestClientType, TestMessage } from "./utils/testUtils"; import { ServiceBusClientForTests, createServiceBusClientForTests } from "./utils/testutils2"; -import { Sender } from "../src/sender"; +import { Sender, SenderImpl } from "../src/sender"; import { MessagingError } from "@azure/core-amqp"; import Long from "long"; import { BatchingReceiver } from "../src/core/batchingReceiver"; @@ -46,7 +46,7 @@ describe("Retries - ManagementClient", () => { const entityNames = await serviceBusClient.test.createTestEntities(entityType); senderClient = serviceBusClient.test.addToCleanup( - serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) + await serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) ); receiverClient = await serviceBusClient.test.getPeekLockReceiver(entityNames); subscriptionRuleManager = serviceBusClient.test.addToCleanup( @@ -257,7 +257,7 @@ describe("Retries - MessageSender", () => { const entityNames = await serviceBusClient.test.createTestEntities(entityType); senderClient = serviceBusClient.test.addToCleanup( - serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) + await serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) ); } @@ -266,11 +266,13 @@ describe("Retries - MessageSender", () => { } function mockInitToThrowError() { - const fakeFunction = async function() { + const fakeFunction = function() { numberOfTimesInitInvoked++; throw new MessagingError("Hello there, I'm an error"); }; - (senderClient as any)._sender._negotiateClaim = fakeFunction; + + (senderClient as SenderImpl)["_sender"]["isOpen"] = () => false; + (senderClient as SenderImpl)["_sender"]["open"] = fakeFunction; } async function mockInitAndVerifyRetries(func: Function) { @@ -456,7 +458,7 @@ describe("Retries - onDetached", () => { const entityNames = await serviceBusClient.test.createTestEntities(entityType); senderClient = serviceBusClient.test.addToCleanup( - serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) + await serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) ); receiverClient = await serviceBusClient.test.getPeekLockReceiver(entityNames); } @@ -507,8 +509,9 @@ describe("Retries - onDetached", () => { it("Unpartitioned Queue: sender", async function(): Promise { await beforeEachTest(TestClientType.UnpartitionedQueue); await mockOnDetachedAndVerifyRetries(async () => { - (senderClient as any)._sender._init = fakeFunction; - await (senderClient as any)._sender.onDetached( + (senderClient as SenderImpl)["_sender"]["open"] = fakeFunction; + + await (senderClient as SenderImpl)["_sender"].onDetached( new MessagingError("Hello there, I'm an error") ); }); diff --git a/sdk/servicebus/service-bus/test/sendAndSchedule.spec.ts b/sdk/servicebus/service-bus/test/sendAndSchedule.spec.ts index ecd4d72be191..d45da05c1fc3 100644 --- a/sdk/servicebus/service-bus/test/sendAndSchedule.spec.ts +++ b/sdk/servicebus/service-bus/test/sendAndSchedule.spec.ts @@ -36,7 +36,7 @@ describe("send scheduled messages", () => { receiverClient = await serviceBusClient.test.getPeekLockReceiver(entityNames); senderClient = serviceBusClient.test.addToCleanup( - serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) + await serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) ); } diff --git a/sdk/servicebus/service-bus/test/sendBatch.spec.ts b/sdk/servicebus/service-bus/test/sendBatch.spec.ts index 6d6b5d4aea06..dd1d942ab658 100644 --- a/sdk/servicebus/service-bus/test/sendBatch.spec.ts +++ b/sdk/servicebus/service-bus/test/sendBatch.spec.ts @@ -33,7 +33,7 @@ describe("Send Batch", () => { entityNames = await serviceBusClient.test.createTestEntities(entityType); senderClient = serviceBusClient.test.addToCleanup( - serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) + await serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) ); } diff --git a/sdk/servicebus/service-bus/test/serviceBusClient.spec.ts b/sdk/servicebus/service-bus/test/serviceBusClient.spec.ts index 9dcb7f439e79..160ca94bae9d 100644 --- a/sdk/servicebus/service-bus/test/serviceBusClient.spec.ts +++ b/sdk/servicebus/service-bus/test/serviceBusClient.spec.ts @@ -81,7 +81,7 @@ describe("Random scheme in the endpoint from connection string", function(): voi sbClientWithRelaxedEndPoint = new ServiceBusClient( getEnvVars().SERVICEBUS_CONNECTION_STRING.replace("sb://", "CheeseBurger://") ); - senderClient = sbClientWithRelaxedEndPoint.createSender(entities.queue!); + senderClient = await sbClientWithRelaxedEndPoint.createSender(entities.queue!); receiverClient = !entities.usesSessions ? sbClientWithRelaxedEndPoint.createReceiver(entities.queue!, "peekLock") : await sbClientWithRelaxedEndPoint.createSessionReceiver(entities.queue!, "peekLock", { @@ -159,28 +159,16 @@ describe("Errors with non existing Namespace", function(): void { } }; - it("throws error when sending data to a non existing namespace", async function(): Promise { - await sbClient - .createSender("some-queue") - .send({ body: "hello" }) - .catch(testError); - - should.equal(errorWasThrown, true, "Error thrown flag must be true"); - }); - - it("throws error when creating batch data to a non existing namespace", async function(): Promise< + it("throws error when create a sender for a non existing namespace", async function(): Promise< void > { - const sender = sbClient.createSender("some-queue"); - await sender.createBatch().catch(testError); - should.equal(errorWasThrown, true, "Error thrown flag must be true"); - }); + try { + await sbClient.createSender("some-queue"); + should.fail("Should have thrown"); + } catch (err) { + testError(err); + } - it("throws error when sending batch data to a non existing namespace", async function(): Promise< - void - > { - const sender = sbClient.createSender("some-queue"); - await sender.send(1 as any).catch(testError); should.equal(errorWasThrown, true, "Error thrown flag must be true"); }); @@ -245,31 +233,12 @@ describe("Errors with non existing Queue/Topic/Subscription", async function(): } }; - it("throws error when sending data to a non existing queue", async function(): Promise { - await sbClient - .createSender("some-name") - .send({ body: "hello" }) - .catch((err) => testError(err, "some-name")); + it("throws error when opening a sender to a non-existent queue", async function(): Promise { + await sbClient.createSender("some-name").catch((err) => testError(err, "some-name")); should.equal(errorWasThrown, true, "Error thrown flag must be true"); }); - it("throws error when creating batch data to a non existing queue", async function(): Promise< - void - > { - const sender = sbClient.createSender("some-queue"); - await sender.createBatch().catch((err) => testError(err, "some-queue")); - should.equal(errorWasThrown, true, "Error thrown flag must be true"); - }); - - it("throws error when sending batch data to a non existing queue", async function(): Promise< - void - > { - const sender = sbClient.createSender("some-queue"); - await sender.send(1 as any).catch((err) => testError(err, "some-queue")); - should.equal(errorWasThrown, true, "Error thrown flag must be true"); - }); - it("throws error when receiving batch data from a non existing queue", async function(): Promise< void > { @@ -454,7 +423,7 @@ describe("Errors after close()", function(): void { entityName = await sbClient.test.createTestEntities(entityType); sender = sbClient.test.addToCleanup( - sbClient.createSender(entityName.queue ?? entityName.topic!) + await sbClient.createSender(entityName.queue ?? entityName.topic!) ); receiver = await sbClient.test.getPeekLockReceiver(entityName); @@ -601,7 +570,7 @@ describe("Errors after close()", function(): void { async function testCreateSender(expectedErrorMsg: string): Promise { let errorNewSender: string = ""; try { - sbClient.createSender(entityName.queue ?? entityName.topic!); + await sbClient.createSender(entityName.queue ?? entityName.topic!); } catch (err) { errorNewSender = err.message; } diff --git a/sdk/servicebus/service-bus/test/sessionsRequiredCleanEntityTests.spec.ts b/sdk/servicebus/service-bus/test/sessionsRequiredCleanEntityTests.spec.ts index ed2c0c89fe6e..04ed8b6d0aa6 100644 --- a/sdk/servicebus/service-bus/test/sessionsRequiredCleanEntityTests.spec.ts +++ b/sdk/servicebus/service-bus/test/sessionsRequiredCleanEntityTests.spec.ts @@ -29,7 +29,7 @@ describe("sessions tests - requires completely clean entity for each test", () const entityNames = await serviceBusClient.test.createTestEntities(testClientType); sender = serviceBusClient.test.addToCleanup( - serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) + await serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) ); // Observation - diff --git a/sdk/servicebus/service-bus/test/sessionsTests.spec.ts b/sdk/servicebus/service-bus/test/sessionsTests.spec.ts index 1223075a0561..e6fa094964e7 100644 --- a/sdk/servicebus/service-bus/test/sessionsTests.spec.ts +++ b/sdk/servicebus/service-bus/test/sessionsTests.spec.ts @@ -49,7 +49,7 @@ describe("session tests", () => { }); sender = serviceBusClient.test.addToCleanup( - serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) + await serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) ); // Observation - diff --git a/sdk/servicebus/service-bus/test/smoketest.spec.ts b/sdk/servicebus/service-bus/test/smoketest.spec.ts index 53be195234bd..b06d752dfbb2 100644 --- a/sdk/servicebus/service-bus/test/smoketest.spec.ts +++ b/sdk/servicebus/service-bus/test/smoketest.spec.ts @@ -34,8 +34,8 @@ describe("Sample scenarios for track 2", () => { queueName = queue!; }); - beforeEach(() => { - sender = serviceBusClient.test.addToCleanup(serviceBusClient.createSender(queueName)); + beforeEach(async () => { + sender = serviceBusClient.test.addToCleanup(await serviceBusClient.createSender(queueName)); }); afterEach(async () => { @@ -191,8 +191,8 @@ describe("Sample scenarios for track 2", () => { subscription = entity.subscription!; }); - beforeEach(() => { - sender = serviceBusClient.test.addToCleanup(serviceBusClient.createSender(topic)); + beforeEach(async () => { + sender = serviceBusClient.test.addToCleanup(await serviceBusClient.createSender(topic)); }); afterEach(async () => { @@ -332,7 +332,7 @@ describe("Sample scenarios for track 2", () => { TestClientType.UnpartitionedQueueWithSessions ); queue = entities.queue!; - sender = serviceBusClient.test.addToCleanup(serviceBusClient.createSender(queue)); + sender = serviceBusClient.test.addToCleanup(await serviceBusClient.createSender(queue)); }); it("Queue, next unlocked session, sessions", async () => { diff --git a/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts b/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts index 639d6753cb29..1dcc52caec72 100644 --- a/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/streamingReceiver.spec.ts @@ -62,7 +62,7 @@ describe("Streaming", () => { receiverClient = await serviceBusClient.test.getPeekLockReceiver(entityNames); } senderClient = serviceBusClient.test.addToCleanup( - serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) + await serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) ); deadLetterClient = serviceBusClient.test.createDeadLetterReceiver(entityNames); diff --git a/sdk/servicebus/service-bus/test/streamingReceiverSessions.spec.ts b/sdk/servicebus/service-bus/test/streamingReceiverSessions.spec.ts index bc17d02b9de1..b743ab4eebff 100644 --- a/sdk/servicebus/service-bus/test/streamingReceiverSessions.spec.ts +++ b/sdk/servicebus/service-bus/test/streamingReceiverSessions.spec.ts @@ -52,7 +52,7 @@ describe("Streaming with sessions", () => { const entityNames = await createReceiverForTests(testClientType, receiveMode); senderClient = serviceBusClient.test.addToCleanup( - serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) + await serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!) ); deadLetterClient = serviceBusClient.test.createDeadLetterReceiver(entityNames); diff --git a/sdk/servicebus/service-bus/test/topicFilters.spec.ts b/sdk/servicebus/service-bus/test/topicFilters.spec.ts index 72cb8e01fb35..35dea461e8f6 100644 --- a/sdk/servicebus/service-bus/test/topicFilters.spec.ts +++ b/sdk/servicebus/service-bus/test/topicFilters.spec.ts @@ -47,7 +47,7 @@ describe("topic filters", () => { subscriptionClient = await serviceBusClient.test.getPeekLockReceiver(entityNames); topicClient = serviceBusClient.test.addToCleanup( - serviceBusClient.createSender(entityNames.topic!) + await serviceBusClient.createSender(entityNames.topic!) ); subscriptionRuleManager = subscriptionRuleManager = serviceBusClient.test.addToCleanup( diff --git a/sdk/servicebus/service-bus/test/utils/testutils2.ts b/sdk/servicebus/service-bus/test/utils/testutils2.ts index 9f7f5cfc0eb3..37b5b680d11b 100644 --- a/sdk/servicebus/service-bus/test/utils/testutils2.ts +++ b/sdk/servicebus/service-bus/test/utils/testutils2.ts @@ -129,6 +129,10 @@ export async function drainAllMessages(receiver: Receiver<{}>): Promise { export type EntityName = ReturnType; +/** + * A ServiceBusClient with an additional `test` property with useful methods + * to create receivers and cleanup resources. + */ export interface ServiceBusClientForTests extends ServiceBusClient { test: ServiceBusTestHelpers; }