-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
receiveMessagesStreaming.ts
93 lines (82 loc) · 3.67 KB
/
receiveMessagesStreaming.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT Licence.
/**
* This sample demonstrates how the receive() function can be used to receive Service Bus messages
* in a stream.
*
* Setup: Please run "sendMessages.ts" sample before running this to populate the queue/topic
*
* @summary Demonstrates how to receive Service Bus messages in a stream
* @azsdk-weight 90
*/
import {
delay,
isServiceBusError,
ProcessErrorArgs,
ServiceBusClient,
ServiceBusReceivedMessage,
} from "@azure/service-bus";
// Load the .env file if it exists
import * as dotenv from "dotenv";
dotenv.config();
// Define connection string and related Service Bus entity names here
const connectionString = process.env.SERVICEBUS_CONNECTION_STRING || "<connection string>";
const queueName = process.env.QUEUE_NAME || "<queue name>";
export async function main() {
const sbClient = new ServiceBusClient(connectionString);
// - If receiving from a subscription you can use the createReceiver(topicName, subscriptionName) overload
// instead.
// - See session.ts for how to receive using sessions.
const receiver = sbClient.createReceiver(queueName);
try {
const subscription = receiver.subscribe({
// After executing this callback you provide, the receiver will remove the message from the queue if you
// have not already settled the message in your callback.
// You can disable this by passing `false` to the `autoCompleteMessages` option in the `subscribe()` method.
// If your callback _does_ throw an error before the message is settled, then it will be abandoned.
processMessage: async (brokeredMessage: ServiceBusReceivedMessage) => {
console.log(`Received message: ${brokeredMessage.body}`);
},
// This callback will be called for any error that occurs when either in the receiver when receiving the message
// or when executing your `processMessage` callback or when the receiver automatically completes or abandons the message.
processError: async (args: ProcessErrorArgs) => {
console.log(`Error from source ${args.errorSource} occurred: `, args.error);
// the `subscribe() call will not stop trying to receive messages without explicit intervention from you.
if (isServiceBusError(args.error)) {
switch (args.error.code) {
case "MessagingEntityDisabled":
case "MessagingEntityNotFound":
case "UnauthorizedAccess":
// It's possible you have a temporary infrastructure change (for instance, the entity being
// temporarily disabled). The handler will continue to retry if `close()` is not called on the subscription - it is completely up to you
// what is considered fatal for your program.
console.log(
`An unrecoverable error occurred. Stopping processing. ${args.error.code}`,
args.error
);
await subscription.close();
break;
case "MessageLockLost":
console.log(`Message lock lost for message`, args.error);
break;
case "ServiceBusy":
// choosing an arbitrary amount of time to wait.
await delay(1000);
break;
}
}
},
});
// Waiting long enough before closing the receiver to receive messages
console.log(`Receiving messages for 20 seconds before exiting...`);
await delay(20000);
console.log(`Closing...`);
await receiver.close();
} finally {
await sbClient.close();
}
}
main().catch((err) => {
console.log("ReceiveMessagesStreaming - Error occurred: ", err);
process.exit(1);
});