Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[service-bus] Track 2 - port sender.open() #8636

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions sdk/servicebus/service-bus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ using the [createSender][sbclient_createsender] method.
This gives you a sender which you can use to [send][sender_send] messages.

```javascript
const sender = serviceBusClient.createSender("my-queue");
const sender = await serviceBusClient.createSender("my-queue");

// sending a single message
await sender.send({
Expand Down Expand Up @@ -189,7 +189,7 @@ When sending the message, set the `sessionId` property in the message to ensure
your message lands in the right session.

```javascript
const sender = serviceBusClient.createSender("my-session-queue");
const sender = await serviceBusClient.createSender("my-session-queue");
await sender.send({
body: "my-message-body",
sessionId: "my-session"
Expand All @@ -216,7 +216,7 @@ There are two ways of choosing which session to open:
1. Specify a `sessionId`, which locks a named session.

```javascript
const receiver = serviceBusClient.createSessionReceiver("my-session-queue", "peekLock", {
const receiver = await serviceBusClient.createSessionReceiver("my-session-queue", "peekLock", {
sessionId: "my-session"
});
```
Expand All @@ -225,7 +225,7 @@ There are two ways of choosing which session to open:
that is not already locked.

```javascript
const receiver = serviceBusClient.createSessionReceiver("my-session-queue", "peekLock");
const receiver = await serviceBusClient.createSessionReceiver("my-session-queue", "peekLock");
```

You can find the name of the session via the `sessionId` property on the `SessionReceiver`.
Expand Down
9 changes: 8 additions & 1 deletion sdk/servicebus/service-bus/migrationguide.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Guide to migrate from @azure/service-bus v1 to v7.preview.1
# Guide to migrate from @azure/service-bus v1 to v7.preview.2

This document is intended for users that would like to try out preview 7
for @azure/service-bus. As the package is in preview, these details might
Expand Down Expand Up @@ -108,4 +108,11 @@ brings this package in line with the [Azure SDK Design Guidelines for Typescript
ruleManager.removeRule();
```

* createSender() and createSessionReceiver() are now async methods and initialize the connection

Prior to v7 `createSender()` and `createSessionReceiver()` worked using lazy-initialization, where the
AMQP connection would only be initialized on first send or receiving of a message.

The connection and link are now initialized after calling either method.

![Impressions](https://azure-sdk-impressions.azurewebsites.net/api/impressions/azure-sdk-for-js%2Fsdk%2Fservicebus%2Fservice-bus%2FREADME.png)
7 changes: 6 additions & 1 deletion sdk/servicebus/service-bus/review/service-bus.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ export interface CreateBatchOptions extends OperationOptions {
maxSizeInBytes?: number;
}

// @public
export interface CreateSenderOptions {
abortSignal?: AbortSignalLike;
}

// @public
export interface CreateSessionReceiverOptions extends SessionReceiverOptions, OperationOptions {
}
Expand Down Expand Up @@ -162,7 +167,7 @@ export class ServiceBusClient {
createReceiver(queueName: string, receiveMode: "receiveAndDelete"): Receiver<ReceivedMessage>;
createReceiver(topicName: string, subscriptionName: string, receiveMode: "peekLock"): Receiver<ReceivedMessageWithLock>;
createReceiver(topicName: string, subscriptionName: string, receiveMode: "receiveAndDelete"): Receiver<ReceivedMessage>;
createSender(queueOrTopicName: string): Sender;
createSender(queueOrTopicName: string, options?: CreateSenderOptions): Promise<Sender>;
createSessionReceiver(queueName: string, receiveMode: "peekLock", options?: CreateSessionReceiverOptions): Promise<SessionReceiver<ReceivedMessageWithLock>>;
createSessionReceiver(queueName: string, receiveMode: "receiveAndDelete", options?: CreateSessionReceiverOptions): Promise<SessionReceiver<ReceivedMessage>>;
createSessionReceiver(topicName: string, subscriptionName: string, receiveMode: "peekLock", options?: CreateSessionReceiverOptions): Promise<SessionReceiver<ReceivedMessageWithLock>>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ const { ServiceBusClient, delay } = require("@azure/service-bus");
require("dotenv").config();

// Define connection string and related Service Bus entity names here
const connectionString =
process.env.SERVICE_BUS_CONNECTION_STRING || "<connection string>";
const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || "<connection string>";
const queueName = process.env.QUEUE_NAME || "<queue name>";

async function main() {
Expand All @@ -33,7 +32,7 @@ async function main() {
async function sendMessages() {
const sbClient = new ServiceBusClient(connectionString);
// createSender() can also be used to create a sender for a topic.
const sender = sbClient.createSender(queueName);
const sender = await sbClient.createSender(queueName);

const data = [
{ step: 1, title: "Shop" },
Expand Down Expand Up @@ -76,7 +75,7 @@ async function receiveMessage() {
const deferredSteps = new Map();
let lastProcessedRecipeStep = 0;
try {
const processMessage = async brokeredMessage => {
const processMessage = async (brokeredMessage) => {
if (
brokeredMessage.label === "RecipeStep" &&
brokeredMessage.contentType === "application/json"
Expand Down Expand Up @@ -104,7 +103,7 @@ async function receiveMessage() {
await brokeredMessage.deadLetter();
}
};
const processError = async err => {
const processError = async (err) => {
console.log(">>>>> Error occurred: ", err);
};
receiver.subscribe(
Expand Down Expand Up @@ -138,6 +137,6 @@ async function receiveMessage() {
}
}

main().catch(err => {
main().catch((err) => {
console.log("Error occurred: ", err);
});
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async function main() {

async function sendMessage() {
// createSender() can also be used to create a sender for a topic.
const sender = sbClient.createSender(queueName);
const sender = await sbClient.createSender(queueName);

const message = {
body: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ const { ServiceBusClient } = require("@azure/service-bus");
require("dotenv").config();

// Define connection string and related Service Bus entity names here
const connectionString =
process.env.SERVICE_BUS_CONNECTION_STRING || "<connection string>";
const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || "<connection string>";
const queueName = process.env.QUEUE_NAME || "<queue name>";

const sbClient = new ServiceBusClient(connectionString);
Expand Down Expand Up @@ -56,20 +55,17 @@ async function processDeadletterMessageQueue() {
// Send repaired message back to the current queue / topic
async function fixAndResendMessage(oldMessage) {
// createSender() can also be used to create a sender for a topic.
const sender = sbClient.createSender(queueName);
const sender = await sbClient.createSender(queueName);

// Inspect given message and make any changes if necessary
const repairedMessage = { ...oldMessage };

console.log(
">>>>> Cloning the message from DLQ and resending it - ",
oldMessage.body
);
console.log(">>>>> Cloning the message from DLQ and resending it - ", oldMessage.body);

await sender.send(repairedMessage);
await sender.close();
}

main().catch(err => {
main().catch((err) => {
console.log("Error occurred: ", err);
});
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,8 @@ const { ServiceBusClient } = require("@azure/service-bus");
require("dotenv").config();

// Define connection string and related Service Bus entity names here
const connectionString =
process.env.SERVICE_BUS_CONNECTION_STRING || "<connection string>";
const userEventsQueueName =
process.env.QUEUE_NAME_WITH_SESSIONS || "<queue name>";
const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || "<connection string>";
const userEventsQueueName = process.env.QUEUE_NAME_WITH_SESSIONS || "<queue name>";
const sbClient = new ServiceBusClient(connectionString);
async function main() {
try {
Expand Down Expand Up @@ -74,13 +72,9 @@ async function runScenario() {
}
async function getSessionState(sessionId) {
// If receiving from a subscription you can use the createSessionReceiver(topic, subscription) overload
const sessionReceiver = sbClient.createSessionReceiver(
userEventsQueueName,
"peekLock",
{
sessionId: sessionId
}
);
const sessionReceiver = await sbClient.createSessionReceiver(userEventsQueueName, "peekLock", {
sessionId: sessionId
});
const sessionState = await sessionReceiver.getState();
if (sessionState) {
// Get list of items
Expand All @@ -92,7 +86,7 @@ async function getSessionState(sessionId) {
}
async function sendMessagesForSession(shoppingEvents, sessionId) {
// createSender() can also be used to create a sender for a topic.
const sender = sbClient.createSender(userEventsQueueName);
const sender = await sbClient.createSender(userEventsQueueName);
for (let index = 0; index < shoppingEvents.length; index++) {
const message = {
sessionId: sessionId,
Expand All @@ -105,13 +99,9 @@ async function sendMessagesForSession(shoppingEvents, sessionId) {
}
async function processMessageFromSession(sessionId) {
// If receiving from a subscription you can use the createSessionReceiver(topic, subscription) overload
const sessionReceiver = sbClient.createSessionReceiver(
userEventsQueueName,
"peekLock",
{
sessionId
}
);
const sessionReceiver = await sbClient.createSessionReceiver(userEventsQueueName, "peekLock", {
sessionId
});

const messages = await sessionReceiver.receiveBatch(1, {
maxWaitTimeSeconds: 10
Expand Down Expand Up @@ -143,6 +133,6 @@ async function processMessageFromSession(sessionId) {

await sessionReceiver.close();
}
main().catch(err => {
main().catch((err) => {
console.log("Error occurred: ", err);
});
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,11 @@ const { ServiceBusClient } = require("@azure/service-bus");
// Load the .env file if it exists
require("dotenv").config();
// Define connection string and related Service Bus entity names here
const connectionString =
process.env.SERVICE_BUS_CONNECTION_STRING || "<connection string>";
const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || "<connection string>";
const topicName = process.env.TOPIC_NAME || "<topic name>";
const subscriptionName1 =
process.env.TOPIC_FILTER_SUBSCRIPTION_1 || "<subscription name>";
const subscriptionName2 =
process.env.TOPIC_FILTER_SUBSCRIPTION_2 || "<subscription name>";
const subscriptionName3 =
process.env.TOPIC_FILTER_SUBSCRIPTION_3 || "<subscription name>";
const subscriptionName1 = process.env.TOPIC_FILTER_SUBSCRIPTION_1 || "<subscription name>";
const subscriptionName2 = process.env.TOPIC_FILTER_SUBSCRIPTION_2 || "<subscription name>";
const subscriptionName3 = process.env.TOPIC_FILTER_SUBSCRIPTION_3 || "<subscription name>";
async function main() {
const sbClient = new ServiceBusClient(connectionString);
try {
Expand All @@ -44,18 +40,9 @@ async function main() {

// Adds Rules on subscriptions to route messages from a topic to different subscriptions
async function addRules(sbClient) {
const subscription1Client = sbClient.getSubscriptionRuleManager(
topicName,
subscriptionName1
);
const subscription2Client = sbClient.getSubscriptionRuleManager(
topicName,
subscriptionName2
);
const subscription3Client = sbClient.getSubscriptionRuleManager(
topicName,
subscriptionName3
);
const subscription1Client = sbClient.getSubscriptionRuleManager(topicName, subscriptionName1);
const subscription2Client = sbClient.getSubscriptionRuleManager(topicName, subscriptionName2);
const subscription3Client = sbClient.getSubscriptionRuleManager(topicName, subscriptionName3);
// The default rule on the subscription allows all messages in.
// So, remove existing rules before adding new ones
await removeAllRules(subscription1Client);
Expand All @@ -69,7 +56,7 @@ async function addRules(sbClient) {

// Sends 100 messages with a user property called "priority" whose value is between 1 and 4
async function sendMessages(sbClient) {
const sender = sbClient.createSender(topicName);
const sender = await sbClient.createSender(topicName);
for (let index = 0; index < 10; index++) {
const priority = Math.ceil(Math.random() * 4);
const message = {
Expand All @@ -82,21 +69,9 @@ async function sendMessages(sbClient) {
}
// Prints messages from the 3 subscriptions
async function receiveMessages(sbClient) {
const subscription1 = sbClient.createReceiver(
topicName,
subscriptionName1,
"peekLock"
);
const subscription2 = sbClient.createReceiver(
topicName,
subscriptionName2,
"peekLock"
);
const subscription3 = sbClient.createReceiver(
topicName,
subscriptionName3,
"peekLock"
);
const subscription1 = sbClient.createReceiver(topicName, subscriptionName1, "peekLock");
const subscription2 = sbClient.createReceiver(topicName, subscriptionName2, "peekLock");
const subscription3 = sbClient.createReceiver(topicName, subscriptionName3, "peekLock");

const messagesFromSubscription1 = await subscription1.receiveBatch(10, {
maxWaitTimeSeconds: 5
Expand Down Expand Up @@ -136,6 +111,6 @@ async function removeAllRules(client) {
}
}

main().catch(err => {
main().catch((err) => {
console.log("Error occurred: ", err);
});
17 changes: 7 additions & 10 deletions sdk/servicebus/service-bus/samples/javascript/scheduledMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ const { delay, ServiceBusClient } = require("@azure/service-bus");
require("dotenv").config();

// Define connection string and related Service Bus entity names here
const connectionString =
process.env.SERVICE_BUS_CONNECTION_STRING || "<connection string>";
const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || "<connection string>";
const queueName = process.env.QUEUE_NAME || "<queue name>";
const listOfScientists = [
{ lastName: "Einstein", firstName: "Albert" },
Expand Down Expand Up @@ -48,9 +47,9 @@ async function main() {
// Scheduling messages to be sent after 10 seconds from now
async function sendScheduledMessages(sbClient) {
// createSender() handles sending to a queue or a topic
const sender = sbClient.createSender(queueName);
const sender = await sbClient.createSender(queueName);

const messages = listOfScientists.map(scientist => ({
const messages = listOfScientists.map((scientist) => ({
body: `${scientist.firstName} ${scientist.lastName}`,
label: "Scientist"
}));
Expand All @@ -71,14 +70,12 @@ async function receiveMessages(sbClient) {
let queueReceiver = sbClient.createReceiver(queueName, "peekLock");

let numOfMessagesReceived = 0;
const processMessage = async brokeredMessage => {
const processMessage = async (brokeredMessage) => {
numOfMessagesReceived++;
console.log(
`Received message: ${brokeredMessage.body} - ${brokeredMessage.label}`
);
console.log(`Received message: ${brokeredMessage.body} - ${brokeredMessage.label}`);
await brokeredMessage.complete();
};
const processError = async err => {
const processError = async (err) => {
console.log("Error occurred: ", err);
};

Expand Down Expand Up @@ -106,6 +103,6 @@ async function receiveMessages(sbClient) {
await sbClient.close();
}

main().catch(err => {
main().catch((err) => {
console.log("Error occurred: ", err);
});
7 changes: 3 additions & 4 deletions sdk/servicebus/service-bus/samples/javascript/sendMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ const { ServiceBusClient } = require("@azure/service-bus");
require("dotenv").config();

// Define connection string and related Service Bus entity names here
const connectionString =
process.env.SERVICE_BUS_CONNECTION_STRING || "<connection string>";
const connectionString = process.env.SERVICE_BUS_CONNECTION_STRING || "<connection string>";
const queueName = process.env.QUEUE_NAME || "<queue name>";

const listOfScientists = [
Expand All @@ -38,7 +37,7 @@ async function main() {
const sbClient = new ServiceBusClient(connectionString);

// createSender() can also be used to create a sender for a topic.
const sender = sbClient.createSender(queueName);
const sender = await sbClient.createSender(queueName);

try {
for (let index = 0; index < listOfScientists.length; index++) {
Expand All @@ -58,6 +57,6 @@ async function main() {
}
}

main().catch(err => {
main().catch((err) => {
console.log("Error occurred: ", err);
});
Loading