Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ServiceBus] add state property to ServiceBusReceivedMessage #18938

Merged
merged 9 commits into from
Dec 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 29 additions & 6 deletions common/config/rush/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sdk/core/core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- Changed TS compilation target to ES2017 in order to produce smaller bundles and use more native platform features
- With the dropping of support for Node.js versions that are no longer in LTS, the dependency on `@types/node` has been updated to version 12. Read our [support policy](https://github.com/Azure/azure-sdk-for-js/blob/main/SUPPORT.md) for more details.
- Updated to use the latest version of the `rhea` package.
- Add a constant `messageState` with value of `"x-opt-message-state"` which is the name for the new message state property in the message annotations. [PR 18938](https://github.com/Azure/azure-sdk-for-js/pull/18938)

## 3.0.0 (2021-06-09)

Expand Down
1 change: 1 addition & 0 deletions sdk/core/core-amqp/review/core-amqp.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ export const Constants: {
readonly publisher: "x-opt-publisher-name";
readonly viaPartitionKey: "x-opt-via-partition-key";
readonly deadLetterSource: "x-opt-deadletter-source";
readonly messageState: "x-opt-message-state";
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved
readonly enqueuedTimeAnnotation: "amqp.annotation.x-opt-enqueued-time";
readonly offsetAnnotation: "amqp.annotation.x-opt-offset";
readonly sequenceNumberAnnotation: "amqp.annotation.x-opt-sequence-number";
Expand Down
1 change: 1 addition & 0 deletions sdk/core/core-amqp/src/util/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export const Constants = {
publisher: "x-opt-publisher-name",
viaPartitionKey: "x-opt-via-partition-key",
deadLetterSource: "x-opt-deadletter-source",
messageState: "x-opt-message-state",
enqueuedTimeAnnotation: `amqp.annotation.x-opt-enqueued-time`,
offsetAnnotation: `amqp.annotation.x-opt-offset`,
sequenceNumberAnnotation: `amqp.annotation.x-opt-sequence-number`,
Expand Down
4 changes: 3 additions & 1 deletion sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# Release History

## 7.4.1 (Unreleased)
## 7.5.0-beta.1 (Unreleased)

### Features Added

- Add `state` property to `ServiceBusReceivedMessage`. Its value is one of `"active"`, `"deferred"`, or `"scheduled"`. [PR 18938](https://github.com/Azure/azure-sdk-for-js/pull/18938)

### Breaking Changes

### Bugs Fixed
Expand Down
4 changes: 2 additions & 2 deletions sdk/servicebus/service-bus/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "@azure/service-bus",
"sdk-type": "client",
"author": "Microsoft Corporation",
"version": "7.4.1",
"version": "7.5.0-beta.1",
"license": "MIT",
"description": "Azure Service Bus SDK for JavaScript",
"homepage": "https://github.com/Azure/azure-sdk-for-js/tree/main/sdk/servicebus/service-bus/",
Expand Down Expand Up @@ -107,7 +107,7 @@
},
"dependencies": {
"@azure/abort-controller": "^1.0.0",
"@azure/core-amqp": "^3.0.0",
"@azure/core-amqp": "^3.1.0",
"@azure/core-asynciterator-polyfill": "^1.0.0",
"@azure/core-http": "^2.0.0",
"@azure/core-tracing": "1.0.0-preview.13",
Expand Down
1 change: 1 addition & 0 deletions sdk/servicebus/service-bus/review/service-bus.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ export interface ServiceBusReceivedMessage extends ServiceBusMessage {
readonly lockToken?: string;
readonly _rawAmqpMessage: AmqpAnnotatedMessage;
readonly sequenceNumber?: Long_2;
readonly state: "active" | "deferred" | "scheduled";
}

// @public
Expand Down
26 changes: 25 additions & 1 deletion sdk/servicebus/service-bus/src/serviceBusMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ export interface ServiceBusMessageAnnotations extends MessageAnnotations {
* Annotation for the message being locked until.
*/
"x-opt-locked-until"?: Date | number;
/**
* Annotation for the message state.
*/
"x-opt-message-state"?: number;
}

/**
Expand Down Expand Up @@ -494,6 +498,11 @@ export interface ServiceBusReceivedMessage extends ServiceBusMessage {
* @readonly
*/
readonly deadLetterSource?: string;
/**
* State of the message can be active, deferred or scheduled. Deferred messages have deferred state,
* scheduled messages have scheduled state, all other messages have active state.
*/
readonly state: "active" | "deferred" | "scheduled";
/**
* The underlying raw amqp message.
* @readonly
Expand Down Expand Up @@ -574,11 +583,19 @@ export function fromRheaMessage(
-readonly [P in keyof T]: T[P];
}
>;
const props: PartialWritable<ServiceBusReceivedMessage> = {};
const props: PartialWritable<ServiceBusReceivedMessage> & {
state: "active" | "deferred" | "scheduled";
} = { state: "active" };
if (rheaMessage.message_annotations != null) {
if (rheaMessage.message_annotations[Constants.deadLetterSource] != null) {
props.deadLetterSource = rheaMessage.message_annotations[Constants.deadLetterSource];
}
const messageState = rheaMessage.message_annotations[Constants.messageState];
if (messageState === 1) {
props.state = "deferred";
} else if (messageState === 2) {
props.state = "scheduled";
}
if (rheaMessage.message_annotations[Constants.enqueueSequenceNumber] != null) {
props.enqueuedSequenceNumber =
rheaMessage.message_annotations[Constants.enqueueSequenceNumber];
Expand Down Expand Up @@ -848,6 +865,11 @@ export class ServiceBusMessageImpl implements ServiceBusReceivedMessage {
* @readonly
*/
readonly deadLetterSource?: string;
/**
* State of the message can be active, deferred or scheduled. Deferred messages have deferred state,
* scheduled messages have scheduled state, all other messages have active state.
*/
readonly state: "active" | "deferred" | "scheduled";
/**
* The associated delivery of the received message.
*/
Expand Down Expand Up @@ -882,6 +904,8 @@ export class ServiceBusMessageImpl implements ServiceBusReceivedMessage {
shouldReorderLockToken
);
Object.assign(this, restOfMessageProps);
this.state = restOfMessageProps.state; // to suppress error TS2564: Property 'state' has no initializer and is not definitely assigned in the constructor.

// Lock on a message is applicable only in peekLock mode, but the service sets
// the lock token even in receiveAndDelete mode if the entity in question is partitioned.
if (receiveMode === "receiveAndDelete") {
Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/src/util/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/
export const packageJsonInfo = {
name: "@azure/service-bus",
version: "7.4.1"
version: "7.5.0-beta.1"
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ describe("ServiceBusClient live tests", () => {
should.equal(Array.isArray(msgs), true, "`ReceivedMessages` is not an array");
should.equal(msgs[0].body, testMessages.body, "MessageBody is different than expected");
should.equal(msgs.length, 1, "Unexpected number of messages");
should.equal(msgs[0].state, "active");
} finally {
await sbClient.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ import { createServiceBusLogger, ServiceBusLogger } from "../../../src/log";
import { ProcessErrorArgs } from "../../../src/models";
import { ServiceBusError, translateServiceBusError } from "../../../src/serviceBusError";
import { MessagingError, RetryOperationType } from "@azure/core-amqp";
import { DispositionType, ServiceBusMessageImpl } from "../../../src/serviceBusMessage";
import {
DispositionType,
ServiceBusMessageImpl,
ServiceBusReceivedMessage
} from "../../../src/serviceBusMessage";
import { ConnectionContext } from "../../../src/connectionContext";
import { DispositionStatusOptions } from "../../../src/core/managementClient";
import { Delivery } from "rhea-promise";
Expand Down Expand Up @@ -343,12 +347,13 @@ it("error handler wrapper", () => {
});

it("getMessageIterator doesn't yield empty responses", async () => {
const messages = [
const messages: ServiceBusReceivedMessage[][] = [
[],
[
{
body: "hello",
_rawAmqpMessage: { body: "hello" }
_rawAmqpMessage: { body: "hello" },
state: "active"
}
]
];
Expand Down Expand Up @@ -380,7 +385,8 @@ it("getMessageIterator doesn't yield empty responses", async () => {
[
{
body: "hello",
_rawAmqpMessage: { body: "hello" }
_rawAmqpMessage: { body: "hello" },
state: "active"
}
],
allReceivedMessages,
Expand Down
3 changes: 3 additions & 0 deletions sdk/servicebus/service-bus/test/internal/unit/utils.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ describe("utils", () => {
const flags = "00";
const receivedMessage: ServiceBusReceivedMessage = {
body: "This is a test.",
state: "active",
enqueuedTimeUtc: new Date(),
applicationProperties: {
[TRACEPARENT_PROPERTY]: `00-${traceId}-${spanId}-${flags}`
Expand All @@ -356,6 +357,7 @@ describe("utils", () => {
const flags = "00";
const receivedMessage: ServiceBusReceivedMessage = {
body: "This is a test.",
state: "active",
enqueuedTimeUtc: new Date(),
applicationProperties: {
[TRACEPARENT_PROPERTY]: `99-${traceId}-${spanId}-${flags}`
Expand All @@ -374,6 +376,7 @@ describe("utils", () => {
it("should return undefined when ServiceBusMessage is not instrumented", function() {
const receivedMessage: ServiceBusReceivedMessage = {
body: "This is a test.",
state: "active",
enqueuedTimeUtc: new Date(),
_rawAmqpMessage: { body: "This is a test." }
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ describe("Deferred Messages", () => {
if (!deferredMsg) {
throw "No message received for sequence number";
}
should.equal(deferredMsg.state, "deferred");
should.equal(
!!(deferredMsg as any)["delivery"],
true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ describe("Sender Tests", () => {
should.equal(Array.isArray(msgs), true, "`ReceivedMessages` is not an array");
should.equal(msgs.length, 1, "Unexpected number of messages");
should.equal(msgs[0].deliveryCount, 0, "DeliveryCount is different than expected");
should.equal(msgs[0].state, "active");

TestMessage.checkMessageContents(
testMessage,
Expand Down Expand Up @@ -158,6 +159,7 @@ describe("Sender Tests", () => {
const msgs = await receiver.receiveMessages(1);
const msgEnqueueTime = msgs[0].enqueuedTimeUtc ? msgs[0].enqueuedTimeUtc.valueOf() : 0;

should.equal(msgs[0].state, "scheduled");
should.equal(Array.isArray(msgs), true, "`ReceivedMessages` is not an array");
should.equal(msgs.length, 1, "Unexpected number of messages");
should.equal(
Expand Down