diff --git a/eng/.docsettings.yml b/eng/.docsettings.yml index dfcf5cb0e333..1d824d93c0fc 100644 --- a/eng/.docsettings.yml +++ b/eng/.docsettings.yml @@ -16,7 +16,7 @@ omitted_paths: - sdk/search/*/test/README.md - sdk/servicebus/*/test/README.md - sdk/servicebus/README.md - - sdk/servicebus/service-bus/test/stress-test-track-2/* + - sdk/servicebus/service-bus/test/stress/* - sdk/schemaregistry/README.md - sdk/storage/*/test/README.md - sdk/storage/storage-internal-avro/* diff --git a/sdk/servicebus/service-bus/test/stress-test-track-2/tsconfig.json b/sdk/servicebus/service-bus/test/stress-test-track-2/tsconfig.json deleted file mode 100644 index 2c7079025a21..000000000000 --- a/sdk/servicebus/service-bus/test/stress-test-track-2/tsconfig.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "compilerOptions": { - "target": "ES2015", - "moduleResolution": "node", - "esModuleInterop": true, - "lib": ["ESNext.AsyncIterable"], - "outDir": "dist", - "rootDir": "." - }, - "include": ["./*.ts"], - "exclude": ["node_modules"] -} diff --git a/sdk/servicebus/service-bus/test/stress-test-track-2/SetupAndGuide.md b/sdk/servicebus/service-bus/test/stress/SetupAndGuide.md similarity index 100% rename from sdk/servicebus/service-bus/test/stress-test-track-2/SetupAndGuide.md rename to sdk/servicebus/service-bus/test/stress/SetupAndGuide.md diff --git a/sdk/servicebus/service-bus/test/stress-test-track-2/package.json b/sdk/servicebus/service-bus/test/stress/package.json similarity index 100% rename from sdk/servicebus/service-bus/test/stress-test-track-2/package.json rename to sdk/servicebus/service-bus/test/stress/package.json diff --git a/sdk/servicebus/service-bus/test/stress-test-track-2/scenarioBatchReceive.ts b/sdk/servicebus/service-bus/test/stress/scenarioBatchReceive.ts similarity index 100% rename from sdk/servicebus/service-bus/test/stress-test-track-2/scenarioBatchReceive.ts rename to sdk/servicebus/service-bus/test/stress/scenarioBatchReceive.ts diff --git a/sdk/servicebus/service-bus/test/stress-test-track-2/scenarioCloseOpen.ts b/sdk/servicebus/service-bus/test/stress/scenarioCloseOpen.ts similarity index 100% rename from sdk/servicebus/service-bus/test/stress-test-track-2/scenarioCloseOpen.ts rename to sdk/servicebus/service-bus/test/stress/scenarioCloseOpen.ts diff --git a/sdk/servicebus/service-bus/test/stress-test-track-2/scenarioPeekMessages.ts b/sdk/servicebus/service-bus/test/stress/scenarioPeekMessages.ts similarity index 100% rename from sdk/servicebus/service-bus/test/stress-test-track-2/scenarioPeekMessages.ts rename to sdk/servicebus/service-bus/test/stress/scenarioPeekMessages.ts diff --git a/sdk/servicebus/service-bus/test/stress-test-track-2/scenarioRenewMessageLock.ts b/sdk/servicebus/service-bus/test/stress/scenarioRenewMessageLock.ts similarity index 100% rename from sdk/servicebus/service-bus/test/stress-test-track-2/scenarioRenewMessageLock.ts rename to sdk/servicebus/service-bus/test/stress/scenarioRenewMessageLock.ts diff --git a/sdk/servicebus/service-bus/test/stress-test-track-2/scenarioRenewSessionLock.ts b/sdk/servicebus/service-bus/test/stress/scenarioRenewSessionLock.ts similarity index 100% rename from sdk/servicebus/service-bus/test/stress-test-track-2/scenarioRenewSessionLock.ts rename to sdk/servicebus/service-bus/test/stress/scenarioRenewSessionLock.ts diff --git a/sdk/servicebus/service-bus/test/stress-test-track-2/scenarioSend.ts b/sdk/servicebus/service-bus/test/stress/scenarioSend.ts similarity index 100% rename from sdk/servicebus/service-bus/test/stress-test-track-2/scenarioSend.ts rename to sdk/servicebus/service-bus/test/stress/scenarioSend.ts diff --git a/sdk/servicebus/service-bus/test/stress-test-track-2/scenarioStreamingReceive.ts b/sdk/servicebus/service-bus/test/stress/scenarioStreamingReceive.ts similarity index 100% rename from sdk/servicebus/service-bus/test/stress-test-track-2/scenarioStreamingReceive.ts rename to sdk/servicebus/service-bus/test/stress/scenarioStreamingReceive.ts diff --git a/sdk/servicebus/service-bus/test/stress-test-track-2/stressTestsBase.ts b/sdk/servicebus/service-bus/test/stress/stressTestsBase.ts similarity index 100% rename from sdk/servicebus/service-bus/test/stress-test-track-2/stressTestsBase.ts rename to sdk/servicebus/service-bus/test/stress/stressTestsBase.ts diff --git a/sdk/servicebus/service-bus/test/stress/stress_fixedNumberOfClientsMultipleQueues.ts b/sdk/servicebus/service-bus/test/stress/stress_fixedNumberOfClientsMultipleQueues.ts deleted file mode 100644 index bc83c9e6f248..000000000000 --- a/sdk/servicebus/service-bus/test/stress/stress_fixedNumberOfClientsMultipleQueues.ts +++ /dev/null @@ -1,94 +0,0 @@ -/* -Test Scenario summary: -- Creates 10 clients against 10 different existing queues with names as queue-1, queue-2, ... queue-10. -- Sends, receives & completes messages in a loop on each client independently. -- All senders, receivers and clients are closed in the end. - -The test assumes no other process is working with the queues defined in here, -but the queues must be empty and use default configurations before running the test. - -For running this test, connection string of the Service Bus namespace must be supplied. -*/ - -import { SendableMessageInfo, ReceiveMode } from "../../src"; -import { ServiceBusClient } from "../../src/old/serviceBusClient"; -import { Sender } from "../../src/sender"; -import { InternalReceiver } from "../../src/internalReceivers"; - -const connectionString = ""; - -const testDurationInMilliseconds = 60000 * 5 * 12 * 24 * 7; // 1 week - -const numOfClients = 10; - -let msgId = 1; - -let snapshotIntervalID: any; - -let isJobDone = false; - -async function main(): Promise { - snapshotIntervalID = setInterval(snapshot, 5000); // Every 5 seconds - setTimeout(() => { - isJobDone = true; - }, testDurationInMilliseconds); - - await sendReceiveMessages(); -} - -async function sendReceiveMessages(): Promise { - const ns = new ServiceBusClient(connectionString); - - const clients = []; - const senders = []; - const receivers = []; - - const sendReceiveMessagePromises = []; - - try { - for (let i = 0; i < numOfClients; i++) { - clients[i] = ns.createQueueClient(`t0-queue-new-${i + 1}`); - senders[i] = clients[i].createSender(); - receivers[i] = clients[i].createReceiver(ReceiveMode.peekLock); - - sendReceiveMessagePromises.push(sendReceiveMessagesPerClient(senders[i], receivers[i])); - } - - await Promise.all(sendReceiveMessagePromises); - } finally { - for (let i = 0; i < numOfClients; i++) { - await senders[i].close(); - await receivers[i].close(); - await clients[i].close(); - } - await ns.close(); - clearInterval(snapshotIntervalID); - } -} - -async function sendReceiveMessagesPerClient( - sender: Sender, - receiver: InternalReceiver -): Promise { - while (!isJobDone) { - const message: SendableMessageInfo = { - messageId: msgId, - body: "test", - label: `${msgId}` - }; - msgId++; - await sender.send(message); - const messagesReceived = await receiver.receiveMessages(1); - await messagesReceived[0].complete(); - } -} - -function snapshot(): void { - console.log("Time : ", new Date()); - console.log("Number of clients opened, closed successfully so far : ", msgId); - console.log("\n"); -} - -main().catch((err) => { - console.log("Error occurred: ", err); -}); diff --git a/sdk/servicebus/service-bus/test/stress/stress_fixedNumberOfClientsSingleQueue.ts b/sdk/servicebus/service-bus/test/stress/stress_fixedNumberOfClientsSingleQueue.ts deleted file mode 100644 index a1a7c954d478..000000000000 --- a/sdk/servicebus/service-bus/test/stress/stress_fixedNumberOfClientsSingleQueue.ts +++ /dev/null @@ -1,96 +0,0 @@ -/* -Test Scenario summary: -- Creates 10 clients against a single existing queue. -- Sends, receives & completes messages in a loop on each client independently. -- All senders, receivers and clients are closed in the end. - -The test assumes no other process is working with the queues defined in here, -but the queues must be empty and use default configurations before running the test. - -For running this test, connection string of the Service Bus namespace and queue name -must be supplied. -*/ - -import { SendableMessageInfo, ReceiveMode } from "../../src"; -import { Sender } from "../../src/sender"; -import { InternalReceiver } from "../../src/internalReceivers"; -import { ServiceBusClient } from "../../src/old/serviceBusClient"; - -const connectionString = ""; -const queueName = ""; - -const testDurationInMilliseconds = 60000 * 5 * 12 * 24 * 7; // 1 week - -const numOfClients = 10; - -let msgId = 1; - -let snapshotIntervalID: any; - -let isJobDone = false; - -async function main(): Promise { - snapshotIntervalID = setInterval(snapshot, 5000); // Every 5 seconds - setTimeout(() => { - isJobDone = true; - }, testDurationInMilliseconds); - - await sendReceiveMessages(); -} - -async function sendReceiveMessages(): Promise { - const ns = new ServiceBusClient(connectionString); - - const clients = []; - const senders = []; - const receivers = []; - - const sendReceiveMessagePromises = []; - - try { - for (let i = 0; i < numOfClients; i++) { - clients[i] = ns.createQueueClient(queueName); - senders[i] = clients[i].createSender(); - receivers[i] = clients[i].createReceiver(ReceiveMode.peekLock); - - sendReceiveMessagePromises.push(sendReceiveMessagesPerClient(senders[i], receivers[i])); - } - - await Promise.all(sendReceiveMessagePromises); - } finally { - for (let i = 0; i < numOfClients; i++) { - await senders[i].close(); - await receivers[i].close(); - await clients[i].close(); - } - await ns.close(); - clearInterval(snapshotIntervalID); - } -} - -async function sendReceiveMessagesPerClient( - sender: Sender, - receiver: InternalReceiver -): Promise { - while (!isJobDone) { - const message: SendableMessageInfo = { - messageId: msgId, - body: "test", - label: `${msgId}` - }; - msgId++; - await sender.send(message); - const messagesReceived = await receiver.receiveMessages(1); - await messagesReceived[0].complete(); - } -} - -function snapshot(): void { - console.log("Time : ", new Date()); - console.log("Number of clients opened, closed successfully so far : ", msgId); - console.log("\n"); -} - -main().catch((err) => { - console.log("Error occurred: ", err); -}); diff --git a/sdk/servicebus/service-bus/test/stress/stress_messageAutolockRenewal.ts b/sdk/servicebus/service-bus/test/stress/stress_messageAutolockRenewal.ts deleted file mode 100644 index 8388a1d2eee1..000000000000 --- a/sdk/servicebus/service-bus/test/stress/stress_messageAutolockRenewal.ts +++ /dev/null @@ -1,114 +0,0 @@ -/* -Test Scenario summary: -- Creates a single sender and a single receiver on a queue. -- Receives a message and holds onto it for the duration of test. AutoLockRenewal is enabled and set to the test duration. - -The test assumes no other process is working with the queues defined in here, -but the queues must be empty and use default configurations before running the test. - -For running this test, connection string of the Service Bus namespace and queue name -must be supplied. -*/ - -import { - SendableMessageInfo, - OnMessage, - OnError, - delay, - ReceiveMode, - ServiceBusMessage -} from "../../src"; -import { ServiceBusClient } from "../../src/old/serviceBusClient"; - -const connectionString = ""; -const queueName = ""; - -let elapsedTime = 0; -const interval = 60000; -const testDurationInMilliseconds = 60000 * 5 * 12 * 24; // 24 hours - -let receivedMessage: ServiceBusMessage; - -async function main(): Promise { - await sendMessage(); - await receiveMessage(); -} - -async function sendMessage(): Promise { - const ns = new ServiceBusClient(connectionString); - const client = ns.createQueueClient(queueName); - try { - const sender = client.createSender(); - - const message: SendableMessageInfo = { - messageId: "test", - body: "test", - label: `test` - }; - - await sender.send(message); - } finally { - await client.close(); - await ns.close(); - } -} - -async function receiveMessage(): Promise { - const ns = new ServiceBusClient(connectionString); - const client = ns.createQueueClient(queueName); - - try { - const receiver = client.createReceiver(ReceiveMode.peekLock); - const onMessageHandler: OnMessage = async (brokeredMessage) => { - receivedMessage = brokeredMessage; - if (receivedMessage.messageId !== "test") { - throw new Error("Message is corrupt or unexpected"); - } - console.log("Received message: ", receivedMessage.messageId); - - while (elapsedTime < testDurationInMilliseconds) { - // simulate the user making an async call that takes time. - await delay(interval); - elapsedTime += interval; - - // log how long we've executed. - console.log(`still executing after ${elapsedTime}`); - - console.log("Time now: ", Date.now()); - console.log("Processing message:"); - console.log("MessageId: ", receivedMessage.messageId); - console.log("Delivery count: ", receivedMessage.deliveryCount); - console.log("LockedUntilUTC: ", receivedMessage.lockedUntilUtc); - console.log("Sequence number: ", receivedMessage.sequenceNumber); - console.log("TimeToLive: ", receivedMessage.timeToLive); - console.log("Message content: ", receivedMessage); - console.log("\n"); - } - - await brokeredMessage.complete(); - console.log("Completed message: ", receivedMessage.messageId); - }; - - const onErrorHandler: OnError = (err) => { - // Since implementation of onError handler in SDK is such that it is supposed to be the - // terminal executing code, any error while processing message will surface by crashing the process - console.log("Error thrown by user's OnError handler", err); - throw err; - }; - - receiver.registerMessageHandler(onMessageHandler, onErrorHandler, { - autoComplete: false, - maxAutoRenewLockDurationInMs: testDurationInMilliseconds - }); - - await delay(testDurationInMilliseconds + 5000); - await receiver.close(); - } finally { - await client.close(); - await ns.close(); - } -} - -main().catch((err) => { - console.log("Error occurred: ", err); -}); diff --git a/sdk/servicebus/service-bus/test/stress/stress_messageLockRenewalBatchingThenStreaming.ts b/sdk/servicebus/service-bus/test/stress/stress_messageLockRenewalBatchingThenStreaming.ts deleted file mode 100644 index f57f0bf16b60..000000000000 --- a/sdk/servicebus/service-bus/test/stress/stress_messageLockRenewalBatchingThenStreaming.ts +++ /dev/null @@ -1,121 +0,0 @@ -/* -Test Scenario summary: -- Creates a single sender and a single receiver on a queue. -- Send 2 messages -- Receive message using batching receiver -- Receives second message and hold onto it for the duration of test. AutoLockRenewal is enabled and set to the test duration. - -The test assumes no other process is working with the queues defined in here, -but the queues must be empty and use default configurations before running the test. - -For running this test, connection string of the Service Bus namespace and queue name -must be supplied. -*/ - -import { - SendableMessageInfo, - OnMessage, - OnError, - delay, - ReceiveMode, - ServiceBusMessage -} from "../../src"; -import { ServiceBusClient } from "../../src/old/serviceBusClient"; - -const connectionString = ""; -const queueName = ""; - -let elapsedTime = 0; -const interval = 60000; -const testDurationInMilliseconds = 60000 * 20; // 20 min - -let receivedMessage: ServiceBusMessage; - -async function main(): Promise { - await sendMessage("message-for-batching"); - await sendMessage("message-for-streaming"); - await receiveMessages(); -} - -async function sendMessage(messageId: string): Promise { - const ns = new ServiceBusClient(connectionString); - const client = ns.createQueueClient(queueName); - try { - const sender = client.createSender(); - - const message: SendableMessageInfo = { - messageId: messageId, - body: "test", - label: `test` - }; - - await sender.send(message); - } finally { - await client.close(); - await ns.close(); - } -} - -async function receiveMessages(): Promise { - const ns = new ServiceBusClient(connectionString); - const client = ns.createQueueClient(queueName); - - try { - const receiver = client.createReceiver(ReceiveMode.peekLock); - const onMessageHandler: OnMessage = async (brokeredMessage) => { - receivedMessage = brokeredMessage; - - console.log("Received message: ", receivedMessage.messageId); - - while (elapsedTime < testDurationInMilliseconds) { - // simulate the user making an async call that takes time. - await delay(interval); - elapsedTime += interval; - - // log how long we've executed. - console.log(`still executing after ${elapsedTime}`); - - console.log("Time now: ", Date.now()); - console.log("Processing message:"); - console.log("MessageId: ", receivedMessage.messageId); - console.log("Delivery count: ", receivedMessage.deliveryCount); - console.log("LockedUntilUTC: ", receivedMessage.lockedUntilUtc); - console.log("Sequence number: ", receivedMessage.sequenceNumber); - console.log("TimeToLive: ", receivedMessage.timeToLive); - console.log("Message content: ", receivedMessage); - console.log("\n"); - } - - await brokeredMessage.complete(); - console.log("Completed message: ", receivedMessage.messageId); - }; - - const onErrorHandler: OnError = (err) => { - // Since implementation of onError handler in SDK is such that it is supposed to be the - // terminal executing code, any error while processing message will surface by crashing the process - console.log("Error thrown by user's OnError handler", err); - throw err; - }; - - const receivedBatchMessages = await receiver.receiveMessages(1); - if (receivedBatchMessages.length !== 1) { - throw new Error("Test failed: Could not receive message for batching receiver as intended"); - } - await receivedBatchMessages[0].complete(); - - receiver.registerMessageHandler(onMessageHandler, onErrorHandler, { - autoComplete: false, - maxAutoRenewLockDurationInMs: testDurationInMilliseconds - }); - - await delay(testDurationInMilliseconds + 60000); - await receiver.close(); - } finally { - await client.close(); - await ns.close(); - } -} - -main().catch((err) => { - console.log("Error occurred: ", err); -}); diff --git a/sdk/servicebus/service-bus/test/stress/stress_messageManualLockRenewal.ts b/sdk/servicebus/service-bus/test/stress/stress_messageManualLockRenewal.ts deleted file mode 100644 index f8ceb814e0b2..000000000000 --- a/sdk/servicebus/service-bus/test/stress/stress_messageManualLockRenewal.ts +++ /dev/null @@ -1,116 +0,0 @@ -/* -Test Scenario summary: -- Creates a single sender and a single receiver on a queue. -- Receives a message and holds onto it for the duration of test. -- Autolockrenewal is disabled and lock renewal is manually done on the message periodically in user code. - -The test assumes no other process is working with the queues defined in here, -but the queues must be empty and use default configurations before running the test. - -For running this test, connection string of the Service Bus namespace and queue name -must be supplied. -*/ - -import { - SendableMessageInfo, - OnMessage, - OnError, - delay, - ReceiveMode, - ServiceBusMessage -} from "../../src"; -import { ServiceBusClient } from "../../src/old/serviceBusClient"; - -const connectionString = ""; -const queueName = ""; - -let elapsedTime = 0; -const interval = 10000; -const testDurationInMilliseconds = 60000 * 5 * 12 * 24; // 24 hours - -let receivedMessage: ServiceBusMessage; - -async function main(): Promise { - await sendMessage(); - await receiveMessage(); -} - -async function sendMessage(): Promise { - const ns = new ServiceBusClient(connectionString); - const client = ns.createQueueClient(queueName); - try { - const sender = client.createSender(); - - const message: SendableMessageInfo = { - messageId: "test", - body: "test", - label: `test` - }; - - await sender.send(message); - } finally { - await client.close(); - await ns.close(); - } -} - -async function receiveMessage(): Promise { - const ns = new ServiceBusClient(connectionString); - const client = ns.createQueueClient(queueName); - - try { - const receiver = client.createReceiver(ReceiveMode.peekLock); - const onMessageHandler: OnMessage = async (brokeredMessage) => { - receivedMessage = brokeredMessage; - if (receivedMessage.messageId !== "test") { - throw new Error("Message is corrupt or unexpected"); - } - console.log("Received message: ", receivedMessage.messageId); - const startTime = Date.now(); - while (elapsedTime < testDurationInMilliseconds) { - // simulate the user making an async call that takes time. - await delay(interval); - await receiver.renewMessageLock(receivedMessage); - elapsedTime = Date.now() - startTime; - - // log how long we've executed. - console.log(`still executing after ${elapsedTime}`); - - console.log("Time now: ", new Date().getUTCDate()); - console.log("Processing message:"); - console.log("MessageId: ", receivedMessage.messageId); - console.log("Delivery count: ", receivedMessage.deliveryCount); - console.log("LockedUntilUTC: ", receivedMessage.lockedUntilUtc); - console.log("Sequence number: ", receivedMessage.sequenceNumber); - console.log("TimeToLive: ", receivedMessage.timeToLive); - console.log("Message content: ", receivedMessage); - console.log("\n"); - } - - await brokeredMessage.complete(); - console.log("Completed message: ", receivedMessage.messageId); - }; - - const onErrorHandler: OnError = (err) => { - // Since implementation of onError handler in SDK is such that it is supposed to be the - // terminal executing code, any error while processing message will surface by crashing the process - console.log("Error thrown by user's OnError handler", err); - throw err; - }; - - receiver.registerMessageHandler(onMessageHandler, onErrorHandler, { - autoComplete: false, - maxAutoRenewLockDurationInMs: 0 - }); - - await delay(testDurationInMilliseconds + 30000); - await receiver.close(); - } finally { - await client.close(); - await ns.close(); - } -} - -main().catch((err) => { - console.log("Error occurred: ", err); -}); diff --git a/sdk/servicebus/service-bus/test/stress/stress_messageRandomDisposition.ts b/sdk/servicebus/service-bus/test/stress/stress_messageRandomDisposition.ts deleted file mode 100644 index 01a3062e6a7d..000000000000 --- a/sdk/servicebus/service-bus/test/stress/stress_messageRandomDisposition.ts +++ /dev/null @@ -1,158 +0,0 @@ -/* -Test Scenario summary: -- Creates a single sender and a single receiver on a queue. -- Runs following sequence of steps in a long running loop. -Sends a message -> receives a message -> performs random message disposition option - -The test assumes no other process is working with the queues defined in here, -but the queues must be empty and use default configurations before running the test. - -For running this test, connection string of the Service Bus namespace and queue name -must be supplied. -*/ - -import { SendableMessageInfo, OnMessage, OnError, delay, ReceiveMode } from "../../src"; -import { ServiceBusClient } from "../../src/old/serviceBusClient"; - -const connectionString = ""; -const queueName = ""; - -const testDurationInMilliseconds = 60000 * 5 * 12 * 24 * 7; // 1 week - -const messagesToProcess: Set = new Set(); - -let abandonAttempt = 0; -let abandonCount = 0; -let completeCount = 0; -let deadletterCount = 0; -let deferCount = 0; - -let msgId = 1; - -let snapshotIntervalID: any; - -let isJobDone = false; - -async function main(): Promise { - snapshotIntervalID = setInterval(snapshot, 5000); // Every 5 seconds - const sendPromise = sendMessages(); - const receivePromise = receiveMessages(); - await Promise.all([sendPromise, receivePromise]); -} - -async function sendMessages(): Promise { - const ns = new ServiceBusClient(connectionString); - const client = ns.createQueueClient(queueName); - try { - const sender = client.createSender(); - - while (!isJobDone) { - const message: SendableMessageInfo = { - messageId: msgId, - body: "test", - label: `${msgId}` - }; - messagesToProcess.add(msgId); - msgId++; - await sender.send(message); - await delay(2000); // Throttling send to not increase queue size - } - } finally { - await client.close(); - await ns.close(); - } -} - -async function receiveMessages(): Promise { - const ns = new ServiceBusClient(connectionString); - const client = ns.createQueueClient(queueName); - - try { - const receiver = client.createReceiver(ReceiveMode.peekLock); - const onMessageHandler: OnMessage = async (brokeredMessage) => { - const receivedMsgId = brokeredMessage.messageId; - - if (typeof receivedMsgId !== "number") { - throw new Error("MessageId is corrupt or is of unexpected type"); - } - - /* - Since there are 4 ways a message can be disposed namely abandon(), complete(), - defer() and deadletter(), the randomization factor is chosen to be 4. - */ - const seed = Math.floor((Math.random() * 10) % 4); - - switch (seed) { - case 0: { - abandonAttempt++; - if (brokeredMessage.deliveryCount === 10) { - abandonCount++; - if (messagesToProcess.has(receivedMsgId)) { - messagesToProcess.delete(receivedMsgId); - } - } - await brokeredMessage.abandon(); - break; - } - case 1: { - completeCount++; - if (messagesToProcess.has(receivedMsgId)) { - messagesToProcess.delete(receivedMsgId); - } - await brokeredMessage.complete(); - break; - } - case 2: { - deadletterCount++; - if (messagesToProcess.has(receivedMsgId)) { - messagesToProcess.delete(receivedMsgId); - } - await brokeredMessage.deadLetter(); - break; - } - case 3: { - deferCount++; - if (messagesToProcess.has(receivedMsgId)) { - messagesToProcess.delete(receivedMsgId); - } - await brokeredMessage.defer(); - break; - } - default: { - throw new Error("Unexpected seed"); - } - } - }; - - const onErrorHandler: OnError = (err) => { - throw err; - }; - - receiver.registerMessageHandler(onMessageHandler, onErrorHandler, { autoComplete: false }); - await delay(testDurationInMilliseconds); - - isJobDone = true; - - await receiver.close(); - clearInterval(snapshotIntervalID); - } finally { - await client.close(); - await ns.close(); - } -} - -function snapshot(): void { - console.log("Time : ", new Date()); - console.log("Number of messages not processed yet : ", messagesToProcess.size); - console.log("Number of messages sent so far : ", msgId); - console.log("Number of messages abandoned : ", abandonCount); - console.log("Number of messages completed : ", completeCount); - console.log("Number of messages deadlettered : ", deadletterCount); - console.log("Number of messages deferred : ", deferCount); - console.log("Number of abandon attempts on messages : ", abandonAttempt); - console.log("\n"); -} - -main().catch((err) => { - console.log("Error occurred: ", err); -}); diff --git a/sdk/servicebus/service-bus/test/stress/stress_messageRandomDispositionOnSession.ts b/sdk/servicebus/service-bus/test/stress/stress_messageRandomDispositionOnSession.ts deleted file mode 100644 index 6d54fcd0b3b5..000000000000 --- a/sdk/servicebus/service-bus/test/stress/stress_messageRandomDispositionOnSession.ts +++ /dev/null @@ -1,165 +0,0 @@ -/* -Test Scenario summary: -- Creates a single sender and a single receiver on a queue with sessions enabled. -- Runs following sequence of steps in a long running loop. -Sends a message -> receives a message -> performs random message disposition option - -The test assumes no other process is working with the queues defined in here, -but the queues must be empty and use default configurations before running the test. - -For running this test, connection string of the Service Bus namespace and queue name -must be supplied. The queue must have sessions enabled. -*/ - -import { - SendableMessageInfo, - OnMessage, - OnError, - delay, - ServiceBusMessage, - ReceiveMode -} from "../../src"; -import { ServiceBusClient } from "../../src/old/serviceBusClient"; - -const connectionString = ""; -const queueName = ""; - -const testDurationInMilliseconds = 60000 * 5 * 12 * 24 * 7; // 1 week - -const messagesToProcess: Set = new Set(); - -let abandonAttempt = 0; -let abandonCount = 0; -let completeCount = 0; -let deadletterCount = 0; -let deferCount = 0; - -let msgId = 1; - -let snapshotIntervalID: any; - -let isJobDone = false; - -async function main(): Promise { - snapshotIntervalID = setInterval(snapshot, 5000); // Every 5 seconds - const sendPromise = sendMessages(); - const receivePromise = receiveMessages(); - await Promise.all([sendPromise, receivePromise]); -} - -async function sendMessages(): Promise { - const ns = new ServiceBusClient(connectionString); - const client = ns.createQueueClient(queueName); - try { - const sender = client.createSender(); - - while (!isJobDone) { - const message: SendableMessageInfo = { - messageId: msgId, - body: "test", - label: `${msgId}`, - sessionId: "session-1" - }; - messagesToProcess.add(msgId); - msgId++; - await sender.send(message); - await delay(2000); // Throttling send to not increase queue size - } - } finally { - await client.close(); - await ns.close(); - } -} - -async function receiveMessages(): Promise { - const ns = new ServiceBusClient(connectionString); - const client = ns.createQueueClient(queueName); - - try { - const receiver = client.createReceiver(ReceiveMode.peekLock, { sessionId: "session-1" }); - const onMessageHandler: OnMessage = async (brokeredMessage: ServiceBusMessage) => { - const receivedMsgId = brokeredMessage.messageId; - - if (typeof receivedMsgId !== "number") { - throw new Error("MessageId is corrupt or is of unexpected type"); - } - - /* - Since there are 4 ways a message can be disposed namely abandon(), complete(), - defer() and deadletter(), the randomization factor is chosen to be 4. - */ - const seed = Math.floor((Math.random() * 10) % 4); - - switch (seed) { - case 0: { - abandonAttempt++; - if (brokeredMessage.deliveryCount === 10) { - abandonCount++; - if (messagesToProcess.has(receivedMsgId)) { - messagesToProcess.delete(receivedMsgId); - } - } - await brokeredMessage.abandon(); - break; - } - case 1: { - completeCount++; - if (messagesToProcess.has(receivedMsgId)) { - messagesToProcess.delete(receivedMsgId); - } - await brokeredMessage.complete(); - break; - } - case 2: { - deadletterCount++; - if (messagesToProcess.has(receivedMsgId)) { - messagesToProcess.delete(receivedMsgId); - } - await brokeredMessage.deadLetter(); - break; - } - case 3: { - deferCount++; - if (messagesToProcess.has(receivedMsgId)) { - messagesToProcess.delete(receivedMsgId); - } - await brokeredMessage.defer(); - break; - } - default: { - throw new Error("Unexpected seed"); - } - } - }; - const onErrorHandler: OnError = (err: Error) => { - throw err; - }; - - receiver.registerMessageHandler(onMessageHandler, onErrorHandler, { autoComplete: false }); - await delay(testDurationInMilliseconds); - - isJobDone = true; - - await receiver.close(); - clearInterval(snapshotIntervalID); - } finally { - await client.close(); - await ns.close(); - } -} - -function snapshot(): void { - console.log("Time : ", new Date()); - console.log("Number of messages not processed yet : ", messagesToProcess.size); - console.log("Number of messages sent so far : ", msgId); - console.log("Number of messages abandoned : ", abandonCount); - console.log("Number of messages completed : ", completeCount); - console.log("Number of messages deadlettered : ", deadletterCount); - console.log("Number of messages deferred : ", deferCount); - console.log("Number of abandon attempts on messages : ", abandonAttempt); - console.log("\n"); -} - -main().catch((err) => { - console.log("Error occurred: ", err); -}); diff --git a/sdk/servicebus/service-bus/test/stress/stress_sessionAutolockRenewal.ts b/sdk/servicebus/service-bus/test/stress/stress_sessionAutolockRenewal.ts deleted file mode 100644 index b5272259b199..000000000000 --- a/sdk/servicebus/service-bus/test/stress/stress_sessionAutolockRenewal.ts +++ /dev/null @@ -1,118 +0,0 @@ -/* -Test Scenario summary: -- Creates a single sender and a single receiver on a session enabled queue. -- Receives a message and holds onto it for the duration of test. AutoLockRenewal on session is enabled and set to the test duration. - -The test assumes no other process is working with the queues defined in here, -but the queues must be empty and use default configurations before running the test. - -For running this test, connection string of the Service Bus namespace and queue name -must be supplied. -*/ - -import { - SendableMessageInfo, - OnMessage, - OnError, - delay, - ReceiveMode, - ServiceBusMessage -} from "../../src"; -import { ServiceBusClient } from "../../src/old/serviceBusClient"; - -const connectionString = ""; -const queueName = ""; - -let elapsedTime = 0; -const interval = 60000; -const testDurationInMilliseconds = 60000 * 5 * 12 * 24; // 24 hours - -let receivedMessage: ServiceBusMessage; - -async function main(): Promise { - await sendMessage("session-1"); - await receiveMessage("session-1"); -} - -async function sendMessage(sessionId: string): Promise { - const ns = new ServiceBusClient(connectionString); - const client = ns.createQueueClient(queueName); - try { - const sender = client.createSender(); - - const message: SendableMessageInfo = { - messageId: "test", - body: "test", - label: `test`, - sessionId: sessionId - }; - - await sender.send(message); - } finally { - await client.close(); - await ns.close(); - } -} - -async function receiveMessage(sessionId: string): Promise { - const ns = new ServiceBusClient(connectionString); - const client = ns.createQueueClient(queueName); - - try { - const receiver = client.createReceiver(ReceiveMode.peekLock, { - sessionId: sessionId, - autoRenewLockDurationInMs: testDurationInMilliseconds - }); - const onMessageHandler: OnMessage = async (brokeredMessage) => { - receivedMessage = brokeredMessage; - if (receivedMessage.messageId !== "test") { - throw new Error("Message is corrupt or unexpected"); - } - console.log("Received message: ", receivedMessage.messageId); - - while (elapsedTime < testDurationInMilliseconds) { - // simulate the user making an async call that takes time. - await delay(interval); - elapsedTime += interval; - - // log how long we've executed. - console.log(`still executing after ${elapsedTime}`); - - console.log("Time now: ", Date.now()); - console.log("Processing message:"); - console.log("MessageId: ", receivedMessage.messageId); - console.log("Delivery count: ", receivedMessage.deliveryCount); - console.log("LockedUntilUTC: ", receivedMessage.lockedUntilUtc); - console.log("Sequence number: ", receivedMessage.sequenceNumber); - console.log("TimeToLive: ", receivedMessage.timeToLive); - console.log("Message content: ", receivedMessage); - console.log("\n"); - } - - console.log("Trying to complete message: ", receivedMessage.messageId); - await brokeredMessage.complete(); - console.log("Completed message: ", receivedMessage.messageId); - }; - - const onErrorHandler: OnError = (err) => { - // Since implementation of onError handler in SDK is such that it is supposed to be the - // terminal executing code, any error while processing message will surface by crashing the process - console.log("Error thrown by user's OnError handler", err); - throw err; - }; - - receiver.registerMessageHandler(onMessageHandler, onErrorHandler, { - autoComplete: false - }); - - await delay(testDurationInMilliseconds + 5000); - await receiver.close(); - } finally { - await client.close(); - await ns.close(); - } -} - -main().catch((err) => { - console.log("Error occurred: ", err); -}); diff --git a/sdk/servicebus/service-bus/test/stress/stress_sessionManualLockRenewal.ts b/sdk/servicebus/service-bus/test/stress/stress_sessionManualLockRenewal.ts deleted file mode 100644 index dc9659f6954e..000000000000 --- a/sdk/servicebus/service-bus/test/stress/stress_sessionManualLockRenewal.ts +++ /dev/null @@ -1,124 +0,0 @@ -/* -Test Scenario summary: -- Creates a single sender and a single receiver on a session enabled queue. -- Receives a message and holds onto it for the duration of test. -- Autolockrenewal is disabled on session and lock renewal is manually done on the session periodically in user code. - -The test assumes no other process is working with the queues defined in here, -but the queues must be empty and use default configurations before running the test. - -For running this test, connection string of the Service Bus namespace and queue name -must be supplied. -*/ - -import { - SendableMessageInfo, - OnMessage, - OnError, - delay, - ReceiveMode, - ServiceBusMessage -} from "../../src"; -import { InternalSessionReceiver } from "../../src/internalReceivers"; -import { ServiceBusClient } from "../../src/old/serviceBusClient"; - -const connectionString = ""; -const queueName = ""; - -let elapsedTime = 0; -const interval = 10000; -const testDurationInMilliseconds = 60000 * 5 * 12 * 24; // 24 hours - -let receivedMessage: ServiceBusMessage; - -async function main(): Promise { - await sendMessage("sessionId-1"); - await receiveMessage("sessionId-1"); -} - -async function sendMessage(sessionId: string): Promise { - const ns = new ServiceBusClient(connectionString); - const client = ns.createQueueClient(queueName); - try { - const sender = client.createSender(); - - const message: SendableMessageInfo = { - messageId: "test", - body: "test", - label: `test`, - sessionId: sessionId - }; - - await sender.send(message); - } finally { - await client.close(); - await ns.close(); - } -} - -async function receiveMessage(sessionId: string): Promise { - const ns = new ServiceBusClient(connectionString); - const client = ns.createQueueClient(queueName); - - try { - const receiver = client.createReceiver(ReceiveMode.peekLock, { - sessionId: sessionId, - autoRenewLockDurationInMs: 0 - }); - const onMessageHandler: OnMessage = async (brokeredMessage) => { - receivedMessage = brokeredMessage; - if (receivedMessage.messageId !== "test") { - throw new Error("Message is corrupt or unexpected"); - } - console.log("Received message: ", receivedMessage.messageId); - - const startTime = Date.now(); - while (elapsedTime < testDurationInMilliseconds) { - // simulate the user making an async call that takes time. - await delay(interval); - await receiver.renewSessionLock(); - elapsedTime = Date.now() - startTime; - - // log how long we've executed. - console.log(`still executing after ${elapsedTime}`); - - console.log("Time now: ", new Date().getUTCDate()); - console.log("Session LockedUntilUTC: ", receiver.sessionLockedUntilUtc); - - console.log("Processing message:"); - console.log("MessageId: ", receivedMessage.messageId); - console.log("Delivery count: ", receivedMessage.deliveryCount); - console.log("Message LockedUntilUTC: ", receivedMessage.lockedUntilUtc); - console.log("Sequence number: ", receivedMessage.sequenceNumber); - console.log("TimeToLive: ", receivedMessage.timeToLive); - console.log("Message content: ", receivedMessage); - console.log("\n"); - } - - console.log("Trying to complete message: ", receivedMessage.messageId); - await brokeredMessage.complete(); - console.log("Completed message: ", receivedMessage.messageId); - }; - - const onErrorHandler: OnError = (err) => { - // Since implementation of onError handler in SDK is such that it is supposed to be the - // terminal executing code, any error while processing message will surface by crashing the process - console.log("Error thrown by user's OnError handler", err); - throw err; - }; - - receiver.registerMessageHandler(onMessageHandler, onErrorHandler, { - autoComplete: false - }); - - await delay(testDurationInMilliseconds + 30000); - await receiver.close(); - } finally { - await client.close(); - await ns.close(); - } -} - -main().catch((err) => { - console.log("Error occurred: ", err); -}); diff --git a/sdk/servicebus/service-bus/test/stress/stress_sessionState.ts b/sdk/servicebus/service-bus/test/stress/stress_sessionState.ts deleted file mode 100644 index cfa6ee80e02f..000000000000 --- a/sdk/servicebus/service-bus/test/stress/stress_sessionState.ts +++ /dev/null @@ -1,75 +0,0 @@ -/* -Test Scenario summary: -- Creates a single session receiver on a session enabled queue. -- Autolockrenewal is enabled and set to test duration. -- Set state to value #1, wait for half test durations (12 hours), get state to retrieve value #1 -- Wait for other half test durations (12 hours), set state to value #2, get state to retrieve value #2 -- Set state to value #3 and get state to retrieve value #3 - -The test assumes no other process is working with the queues defined in here, -but the queues must be empty and use default configurations before running the test. - -For running this test, connection string of the Service Bus namespace and queue name -must be supplied. -*/ - -import { delay, ReceiveMode } from "../../src"; -import { ServiceBusClient } from "../../src/old/serviceBusClient"; - -const connectionString = ""; -const queueName = ""; - -const testDurationInMilliseconds = 60000 * 5 * 12 * 24; // 24 hours - -async function main(): Promise { - await setGetSessionState("session-1"); -} - -async function setGetSessionState(sessionId: string): Promise { - const ns = new ServiceBusClient(connectionString); - const client = ns.createQueueClient(queueName); - - try { - const receiver = client.createReceiver(ReceiveMode.peekLock, { - sessionId: sessionId, - autoRenewLockDurationInMs: testDurationInMilliseconds + 60 * 1000 - }); - - const firstState = { testKey: "testValue-a" }; - await receiver.setState(firstState); - - await delay(testDurationInMilliseconds / 2); - - const retrievedFirstState = await receiver.getState(); - console.log(`Value of first state - ${retrievedFirstState.testKey}`); - - if (retrievedFirstState.testKey !== firstState.testKey) { - throw new Error( - `Step 1 - Expected ${firstState.testKey} but got ${retrievedFirstState.testKey}` - ); - } - - await delay(testDurationInMilliseconds / 2); - - const secondState = { testKey: "testValue-b" }; - await receiver.setState(secondState); - - const retrievedSecondState = await receiver.getState(); - console.log(`Value of second state - ${retrievedSecondState.testKey}`); - - if (retrievedSecondState.testKey !== secondState.testKey) { - throw new Error( - `Step 2 - Expected ${secondState.testKey} but got ${retrievedSecondState.testKey}` - ); - } - - await receiver.close(); - } finally { - await client.close(); - await ns.close(); - } -} - -main().catch((err) => { - console.log("Error occurred: ", err); -}); diff --git a/sdk/servicebus/service-bus/test/stress/stress_singleClient.ts b/sdk/servicebus/service-bus/test/stress/stress_singleClient.ts deleted file mode 100644 index 68f912085772..000000000000 --- a/sdk/servicebus/service-bus/test/stress/stress_singleClient.ts +++ /dev/null @@ -1,76 +0,0 @@ -/* -Test Scenario summary: -Runs following sequence of steps in a long running loop: -Single client is created -> single sender is created -> -message is sent -> message received -> message completed -> -receiver closed -> client closed - -The test assumes no other process is working with the queues defined in here, -but the queues must be empty and use default configurations before running the test. - -For running this test, connection string of the Service Bus namespace and queue name -must be supplied. -*/ - -import { SendableMessageInfo, ReceiveMode } from "../../src"; -import { ServiceBusClient } from "../../src/old/serviceBusClient"; - -const connectionString = ""; -const queueName = ""; - -const testDurationInMilliseconds = 60000 * 5 * 12 * 24 * 7; // 1 week - -let msgId = 1; - -let snapshotIntervalID: any; - -let isJobDone = false; - -async function main(): Promise { - snapshotIntervalID = setInterval(snapshot, 5000); // Every 5 seconds - setTimeout(() => { - isJobDone = true; - }, testDurationInMilliseconds); - - await sendReceiveMessages(); -} - -async function sendReceiveMessages(): Promise { - const ns = new ServiceBusClient(connectionString); - - try { - while (!isJobDone) { - const client = ns.createQueueClient(queueName); - const sender = client.createSender(); - - const message: SendableMessageInfo = { - messageId: msgId, - body: "test", - label: `${msgId}` - }; - msgId++; - await sender.send(message); - await sender.close(); - - const receiver = client.createReceiver(ReceiveMode.peekLock); - const messagesReceived = await receiver.receiveMessages(1); - await messagesReceived[0].complete(); - await receiver.close(); - - await client.close(); - } - } finally { - await ns.close(); - clearInterval(snapshotIntervalID); - } -} - -function snapshot(): void { - console.log("Time : ", new Date()); - console.log("Number of clients opened, closed successfully so far : ", msgId); - console.log("\n"); -} - -main().catch((err) => { - console.log("Error occurred: ", err); -}); diff --git a/sdk/servicebus/service-bus/test/stress/stress_singleMessageComplete.ts b/sdk/servicebus/service-bus/test/stress/stress_singleMessageComplete.ts deleted file mode 100644 index 707a02f54bca..000000000000 --- a/sdk/servicebus/service-bus/test/stress/stress_singleMessageComplete.ts +++ /dev/null @@ -1,106 +0,0 @@ -/* -Test Scenario summary: -Creates a single sender and a single receiver on a queue. -Runs following sequence of steps in a long running loop. -Sends a message -> receives a message -> completes the message. - -The test assumes no other process is working with the queues defined in here, -but the queues must be empty and use default configurations before running the test. - -For running this test, connection string of the Service Bus namespace and queue name -must be supplied. -*/ - -import { SendableMessageInfo, OnMessage, OnError, delay, ReceiveMode } from "../../src"; -import { ServiceBusClient } from "../../src/old/serviceBusClient"; - -const connectionString = ""; -const queueName = ""; - -const testDurationInMilliseconds = 60000 * 5 * 12 * 24 * 7; // 1 week - -const messageMap: Set = new Set(); -let msgId = 1; - -let snapshotIntervalID: any; - -let isJobDone = false; - -async function main(): Promise { - snapshotIntervalID = setInterval(snapshot, 5000); // Every 5 seconds - const sendPromise = sendMessages(); - const receivePromise = receiveMessages(); - await Promise.all([sendPromise, receivePromise]); -} - -async function sendMessages(): Promise { - const ns = new ServiceBusClient(connectionString); - const client = ns.createQueueClient(queueName); - try { - const sender = client.createSender(); - - while (!isJobDone) { - const message: SendableMessageInfo = { - messageId: msgId, - body: "test", - label: `${msgId}` - }; - messageMap.add(msgId); - msgId++; - await sender.send(message); - await delay(2000); // Throttling send to not increase queue size - } - } finally { - await client.close(); - await ns.close(); - } -} - -async function receiveMessages(): Promise { - const ns = new ServiceBusClient(connectionString); - const client = ns.createQueueClient(queueName); - - try { - const receiver = client.createReceiver(ReceiveMode.peekLock); - const onMessageHandler: OnMessage = async (brokeredMessage) => { - const receivedMsgId = brokeredMessage.messageId; - - if (typeof receivedMsgId !== "number") { - throw new Error("MessageId is corrupt or is of unexpected type"); - } - - if (!messageMap.has(receivedMsgId)) { - throw new Error("Received message that is not recorded in internal map."); - } - - messageMap.delete(receivedMsgId); - - await brokeredMessage.complete(); - }; - const onErrorHandler: OnError = (err) => { - throw err; - }; - - receiver.registerMessageHandler(onMessageHandler, onErrorHandler, { autoComplete: false }); - await delay(testDurationInMilliseconds); - - isJobDone = true; - - await receiver.close(); - clearInterval(snapshotIntervalID); - } finally { - await client.close(); - await ns.close(); - } -} - -function snapshot(): void { - console.log("Time : ", new Date()); - console.log("Map Size : ", messageMap.size); - console.log("Number of messages sent and received successfully so far : ", msgId); - console.log("\n"); -} - -main().catch((err) => { - console.log("Error occurred: ", err); -}); diff --git a/sdk/servicebus/service-bus/test/stress/tsconfig.json b/sdk/servicebus/service-bus/test/stress/tsconfig.json index 3473b9f65ab2..2c7079025a21 100644 --- a/sdk/servicebus/service-bus/test/stress/tsconfig.json +++ b/sdk/servicebus/service-bus/test/stress/tsconfig.json @@ -1,3 +1,12 @@ { - "extends": "../../samples/tsconfig.json" -} \ No newline at end of file + "compilerOptions": { + "target": "ES2015", + "moduleResolution": "node", + "esModuleInterop": true, + "lib": ["ESNext.AsyncIterable"], + "outDir": "dist", + "rootDir": "." + }, + "include": ["./*.ts"], + "exclude": ["node_modules"] +} diff --git a/sdk/servicebus/service-bus/test/stress-test-track-2/utils.ts b/sdk/servicebus/service-bus/test/stress/utils.ts similarity index 100% rename from sdk/servicebus/service-bus/test/stress-test-track-2/utils.ts rename to sdk/servicebus/service-bus/test/stress/utils.ts