Skip to content

Commit

Permalink
[service-bus] Track 2 - port sender.open() (Azure#8636)
Browse files Browse the repository at this point in the history
* Open a sender when you create it from servicebusclient

* Update test message

* Updating test file to be a bit more accurate in our new world.

* Somehow missed this one.

* Need to await on createSender now

* Fix merge failures.

* - Adding in retries and abortSignal support into sender.open()/createSender().
- Updating migration guide to mention the change to async for createSender() and createSessionReceiver()

* Updating inline snippets.

* Updating samples for the async sender and session receivers.

* Fixing doc comment for createSender() and properly export the CreateSenderOptions type.

* Internals have changed so that the sender is actually initialized before we've had a chance to mock anything.

Changing to simulate the connection being closed so open() will get called (and fail properly in our mock)

* Simplify the 'send*' style tests for non-existent namespaces to just testing when createSender() is called.

* Same deal here - createSender() opens the connection and the errors are discovered earlier.

* Fixed an issue where I wasn't await'ing on the retry() itself, bypassing the error handling already set up in onDetached.
  • Loading branch information
richardpark-msft authored May 1, 2020
1 parent 19c1a04 commit d0ad83c
Show file tree
Hide file tree
Showing 45 changed files with 458 additions and 381 deletions.
8 changes: 4 additions & 4 deletions sdk/servicebus/service-bus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ using the [createSender][sbclient_createsender] method.
This gives you a sender which you can use to [send][sender_send] messages.

```javascript
const sender = serviceBusClient.createSender("my-queue");
const sender = await serviceBusClient.createSender("my-queue");

// sending a single message
await sender.send({
Expand Down Expand Up @@ -189,7 +189,7 @@ When sending the message, set the `sessionId` property in the message to ensure
your message lands in the right session.
```javascript
const sender = serviceBusClient.createSender("my-session-queue");
const sender = await serviceBusClient.createSender("my-session-queue");
await sender.send({
body: "my-message-body",
sessionId: "my-session"
Expand All @@ -216,7 +216,7 @@ There are two ways of choosing which session to open:
1. Specify a `sessionId`, which locks a named session.
```javascript
const receiver = serviceBusClient.createSessionReceiver("my-session-queue", "peekLock", {
const receiver = await serviceBusClient.createSessionReceiver("my-session-queue", "peekLock", {
sessionId: "my-session"
});
```
Expand All @@ -225,7 +225,7 @@ There are two ways of choosing which session to open:
that is not already locked.
```javascript
const receiver = serviceBusClient.createSessionReceiver("my-session-queue", "peekLock");
const receiver = await serviceBusClient.createSessionReceiver("my-session-queue", "peekLock");
```
You can find the name of the session via the `sessionId` property on the `SessionReceiver`.
Expand Down
9 changes: 8 additions & 1 deletion sdk/servicebus/service-bus/migrationguide.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Guide to migrate from @azure/service-bus v1 to v7.preview.1
# Guide to migrate from @azure/service-bus v1 to v7.preview.2

This document is intended for users that would like to try out preview 7
for @azure/service-bus. As the package is in preview, these details might
Expand Down Expand Up @@ -108,4 +108,11 @@ brings this package in line with the [Azure SDK Design Guidelines for Typescript
ruleManager.removeRule();
```

* createSender() and createSessionReceiver() are now async methods and initialize the connection

Prior to v7 `createSender()` and `createSessionReceiver()` worked using lazy-initialization, where the
AMQP connection would only be initialized on first send or receiving of a message.

The connection and link are now initialized after calling either method.

![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-js%2Fsdk%2Fservicebus%2Fservice-bus%2FREADME.png)
7 changes: 6 additions & 1 deletion sdk/servicebus/service-bus/review/service-bus.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ export interface CreateBatchOptions extends OperationOptions {
maxSizeInBytes?: number;
}

// @public
export interface CreateSenderOptions {
abortSignal?: AbortSignalLike;
}

// @public
export interface CreateSessionReceiverOptions extends SessionReceiverOptions, OperationOptions {
}
Expand Down Expand Up @@ -162,7 +167,7 @@ export class ServiceBusClient {
createReceiver(queueName: string, receiveMode: "receiveAndDelete"): Receiver<ReceivedMessage>;
createReceiver(topicName: string, subscriptionName: string, receiveMode: "peekLock"): Receiver<ReceivedMessageWithLock>;
createReceiver(topicName: string, subscriptionName: string, receiveMode: "receiveAndDelete"): Receiver<ReceivedMessage>;
createSender(queueOrTopicName: string): Sender;
createSender(queueOrTopicName: string, options?: CreateSenderOptions): Promise<Sender>;
createSessionReceiver(queueName: string, receiveMode: "peekLock", options?: CreateSessionReceiverOptions): Promise<SessionReceiver<ReceivedMessageWithLock>>;
createSessionReceiver(queueName: string, receiveMode: "receiveAndDelete", options?: CreateSessionReceiverOptions): Promise<SessionReceiver<ReceivedMessage>>;
createSessionReceiver(topicName: string, subscriptionName: string, receiveMode: "peekLock", options?: CreateSessionReceiverOptions): Promise<SessionReceiver<ReceivedMessageWithLock>>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ const { ServiceBusClient, delay } = require("@azure/service-bus");
require("dotenv").config();

// Define connection string and related Service Bus entity names here
const connectionString =
process.env.SERVICE_BUS_CONNECTION_STRING || "<connection string>";
const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || "<connection string>";
const queueName = process.env.QUEUE_NAME || "<queue name>";

async function main() {
Expand All @@ -33,7 +32,7 @@ async function main() {
async function sendMessages() {
const sbClient = new ServiceBusClient(connectionString);
// createSender() can also be used to create a sender for a topic.
const sender = sbClient.createSender(queueName);
const sender = await sbClient.createSender(queueName);

const data = [
{ step: 1, title: "Shop" },
Expand Down Expand Up @@ -76,7 +75,7 @@ async function receiveMessage() {
const deferredSteps = new Map();
let lastProcessedRecipeStep = 0;
try {
const processMessage = async brokeredMessage => {
const processMessage = async (brokeredMessage) => {
if (
brokeredMessage.label === "RecipeStep" &&
brokeredMessage.contentType === "application/json"
Expand Down Expand Up @@ -104,7 +103,7 @@ async function receiveMessage() {
await brokeredMessage.deadLetter();
}
};
const processError = async err => {
const processError = async (err) => {
console.log(">>>>> Error occurred: ", err);
};
receiver.subscribe(
Expand Down Expand Up @@ -138,6 +137,6 @@ async function receiveMessage() {
}
}

main().catch(err => {
main().catch((err) => {
console.log("Error occurred: ", err);
});
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async function main() {

async function sendMessage() {
// createSender() can also be used to create a sender for a topic.
const sender = sbClient.createSender(queueName);
const sender = await sbClient.createSender(queueName);

const message = {
body: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ const { ServiceBusClient } = require("@azure/service-bus");
require("dotenv").config();

// Define connection string and related Service Bus entity names here
const connectionString =
process.env.SERVICE_BUS_CONNECTION_STRING || "<connection string>";
const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || "<connection string>";
const queueName = process.env.QUEUE_NAME || "<queue name>";

const sbClient = new ServiceBusClient(connectionString);
Expand Down Expand Up @@ -56,20 +55,17 @@ async function processDeadletterMessageQueue() {
// Send repaired message back to the current queue / topic
async function fixAndResendMessage(oldMessage) {
// createSender() can also be used to create a sender for a topic.
const sender = sbClient.createSender(queueName);
const sender = await sbClient.createSender(queueName);

// Inspect given message and make any changes if necessary
const repairedMessage = { ...oldMessage };

console.log(
">>>>> Cloning the message from DLQ and resending it - ",
oldMessage.body
);
console.log(">>>>> Cloning the message from DLQ and resending it - ", oldMessage.body);

await sender.send(repairedMessage);
await sender.close();
}

main().catch(err => {
main().catch((err) => {
console.log("Error occurred: ", err);
});
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,8 @@ const { ServiceBusClient } = require("@azure/service-bus");
require("dotenv").config();

// Define connection string and related Service Bus entity names here
const connectionString =
process.env.SERVICE_BUS_CONNECTION_STRING || "<connection string>";
const userEventsQueueName =
process.env.QUEUE_NAME_WITH_SESSIONS || "<queue name>";
const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || "<connection string>";
const userEventsQueueName = process.env.QUEUE_NAME_WITH_SESSIONS || "<queue name>";
const sbClient = new ServiceBusClient(connectionString);
async function main() {
try {
Expand Down Expand Up @@ -74,13 +72,9 @@ async function runScenario() {
}
async function getSessionState(sessionId) {
// If receiving from a subscription you can use the createSessionReceiver(topic, subscription) overload
const sessionReceiver = sbClient.createSessionReceiver(
userEventsQueueName,
"peekLock",
{
sessionId: sessionId
}
);
const sessionReceiver = await sbClient.createSessionReceiver(userEventsQueueName, "peekLock", {
sessionId: sessionId
});
const sessionState = await sessionReceiver.getState();
if (sessionState) {
// Get list of items
Expand All @@ -92,7 +86,7 @@ async function getSessionState(sessionId) {
}
async function sendMessagesForSession(shoppingEvents, sessionId) {
// createSender() can also be used to create a sender for a topic.
const sender = sbClient.createSender(userEventsQueueName);
const sender = await sbClient.createSender(userEventsQueueName);
for (let index = 0; index < shoppingEvents.length; index++) {
const message = {
sessionId: sessionId,
Expand All @@ -105,13 +99,9 @@ async function sendMessagesForSession(shoppingEvents, sessionId) {
}
async function processMessageFromSession(sessionId) {
// If receiving from a subscription you can use the createSessionReceiver(topic, subscription) overload
const sessionReceiver = sbClient.createSessionReceiver(
userEventsQueueName,
"peekLock",
{
sessionId
}
);
const sessionReceiver = await sbClient.createSessionReceiver(userEventsQueueName, "peekLock", {
sessionId
});

const messages = await sessionReceiver.receiveBatch(1, {
maxWaitTimeSeconds: 10
Expand Down Expand Up @@ -143,6 +133,6 @@ async function processMessageFromSession(sessionId) {

await sessionReceiver.close();
}
main().catch(err => {
main().catch((err) => {
console.log("Error occurred: ", err);
});
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,11 @@ const { ServiceBusClient } = require("@azure/service-bus");
// Load the .env file if it exists
require("dotenv").config();
// Define connection string and related Service Bus entity names here
const connectionString =
process.env.SERVICE_BUS_CONNECTION_STRING || "<connection string>";
const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || "<connection string>";
const topicName = process.env.TOPIC_NAME || "<topic name>";
const subscriptionName1 =
process.env.TOPIC_FILTER_SUBSCRIPTION_1 || "<subscription name>";
const subscriptionName2 =
process.env.TOPIC_FILTER_SUBSCRIPTION_2 || "<subscription name>";
const subscriptionName3 =
process.env.TOPIC_FILTER_SUBSCRIPTION_3 || "<subscription name>";
const subscriptionName1 = process.env.TOPIC_FILTER_SUBSCRIPTION_1 || "<subscription name>";
const subscriptionName2 = process.env.TOPIC_FILTER_SUBSCRIPTION_2 || "<subscription name>";
const subscriptionName3 = process.env.TOPIC_FILTER_SUBSCRIPTION_3 || "<subscription name>";
async function main() {
const sbClient = new ServiceBusClient(connectionString);
try {
Expand All @@ -44,18 +40,9 @@ async function main() {

// Adds Rules on subscriptions to route messages from a topic to different subscriptions
async function addRules(sbClient) {
const subscription1Client = sbClient.getSubscriptionRuleManager(
topicName,
subscriptionName1
);
const subscription2Client = sbClient.getSubscriptionRuleManager(
topicName,
subscriptionName2
);
const subscription3Client = sbClient.getSubscriptionRuleManager(
topicName,
subscriptionName3
);
const subscription1Client = sbClient.getSubscriptionRuleManager(topicName, subscriptionName1);
const subscription2Client = sbClient.getSubscriptionRuleManager(topicName, subscriptionName2);
const subscription3Client = sbClient.getSubscriptionRuleManager(topicName, subscriptionName3);
// The default rule on the subscription allows all messages in.
// So, remove existing rules before adding new ones
await removeAllRules(subscription1Client);
Expand All @@ -69,7 +56,7 @@ async function addRules(sbClient) {

// Sends 100 messages with a user property called "priority" whose value is between 1 and 4
async function sendMessages(sbClient) {
const sender = sbClient.createSender(topicName);
const sender = await sbClient.createSender(topicName);
for (let index = 0; index < 10; index++) {
const priority = Math.ceil(Math.random() * 4);
const message = {
Expand All @@ -82,21 +69,9 @@ async function sendMessages(sbClient) {
}
// Prints messages from the 3 subscriptions
async function receiveMessages(sbClient) {
const subscription1 = sbClient.createReceiver(
topicName,
subscriptionName1,
"peekLock"
);
const subscription2 = sbClient.createReceiver(
topicName,
subscriptionName2,
"peekLock"
);
const subscription3 = sbClient.createReceiver(
topicName,
subscriptionName3,
"peekLock"
);
const subscription1 = sbClient.createReceiver(topicName, subscriptionName1, "peekLock");
const subscription2 = sbClient.createReceiver(topicName, subscriptionName2, "peekLock");
const subscription3 = sbClient.createReceiver(topicName, subscriptionName3, "peekLock");

const messagesFromSubscription1 = await subscription1.receiveBatch(10, {
maxWaitTimeSeconds: 5
Expand Down Expand Up @@ -136,6 +111,6 @@ async function removeAllRules(client) {
}
}

main().catch(err => {
main().catch((err) => {
console.log("Error occurred: ", err);
});
17 changes: 7 additions & 10 deletions sdk/servicebus/service-bus/samples/javascript/scheduledMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ const { delay, ServiceBusClient } = require("@azure/service-bus");
require("dotenv").config();

// Define connection string and related Service Bus entity names here
const connectionString =
process.env.SERVICE_BUS_CONNECTION_STRING || "<connection string>";
const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || "<connection string>";
const queueName = process.env.QUEUE_NAME || "<queue name>";
const listOfScientists = [
{ lastName: "Einstein", firstName: "Albert" },
Expand Down Expand Up @@ -48,9 +47,9 @@ async function main() {
// Scheduling messages to be sent after 10 seconds from now
async function sendScheduledMessages(sbClient) {
// createSender() handles sending to a queue or a topic
const sender = sbClient.createSender(queueName);
const sender = await sbClient.createSender(queueName);

const messages = listOfScientists.map(scientist => ({
const messages = listOfScientists.map((scientist) => ({
body: `${scientist.firstName} ${scientist.lastName}`,
label: "Scientist"
}));
Expand All @@ -71,14 +70,12 @@ async function receiveMessages(sbClient) {
let queueReceiver = sbClient.createReceiver(queueName, "peekLock");

let numOfMessagesReceived = 0;
const processMessage = async brokeredMessage => {
const processMessage = async (brokeredMessage) => {
numOfMessagesReceived++;
console.log(
`Received message: ${brokeredMessage.body} - ${brokeredMessage.label}`
);
console.log(`Received message: ${brokeredMessage.body} - ${brokeredMessage.label}`);
await brokeredMessage.complete();
};
const processError = async err => {
const processError = async (err) => {
console.log("Error occurred: ", err);
};

Expand Down Expand Up @@ -106,6 +103,6 @@ async function receiveMessages(sbClient) {
await sbClient.close();
}

main().catch(err => {
main().catch((err) => {
console.log("Error occurred: ", err);
});
7 changes: 3 additions & 4 deletions sdk/servicebus/service-bus/samples/javascript/sendMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ const { ServiceBusClient } = require("@azure/service-bus");
require("dotenv").config();

// Define connection string and related Service Bus entity names here
const connectionString =
process.env.SERVICE_BUS_CONNECTION_STRING || "<connection string>";
const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || "<connection string>";
const queueName = process.env.QUEUE_NAME || "<queue name>";

const listOfScientists = [
Expand All @@ -38,7 +37,7 @@ async function main() {
const sbClient = new ServiceBusClient(connectionString);

// createSender() can also be used to create a sender for a topic.
const sender = sbClient.createSender(queueName);
const sender = await sbClient.createSender(queueName);

try {
for (let index = 0; index < listOfScientists.length; index++) {
Expand All @@ -58,6 +57,6 @@ async function main() {
}
}

main().catch(err => {
main().catch((err) => {
console.log("Error occurred: ", err);
});
Loading

0 comments on commit d0ad83c

Please sign in to comment.