Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Service Bus] [Core AMQP] Merge AmqpMessage{Header|Properties} exported from both the libraries #12091

Merged
28 commits merged into from
Oct 29, 2020
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
4e20516
Amqp{Header|Properties} -> RheaAmqp{Header|Properties}
HarshaNalluru Oct 27, 2020
9b8b905
Api report and fix build failures
HarshaNalluru Oct 27, 2020
36ca283
reuse core-amqp types
HarshaNalluru Oct 27, 2020
7a7b701
API report - service-bus
HarshaNalluru Oct 27, 2020
8a4e6d4
fix build failures
HarshaNalluru Oct 27, 2020
bcf7a0e
API Report
HarshaNalluru Oct 27, 2020
30f16f1
Export AmqpAnnotatedMessage from core-amqp
HarshaNalluru Oct 28, 2020
11f412d
remove AMQPAnnotatedMessage from exports - API Report
HarshaNalluru Oct 28, 2020
f510cae
Do not export RheaAmqpMessageHeader and RheaAmqpMessageProps
HarshaNalluru Oct 28, 2020
ef92276
API report and fix build failures
HarshaNalluru Oct 28, 2020
779aa91
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js in…
HarshaNalluru Oct 28, 2020
132d942
API Report
HarshaNalluru Oct 28, 2020
dc691cf
RheaAmqp to Rhea
HarshaNalluru Oct 29, 2020
6392a44
changelog and version update
HarshaNalluru Oct 29, 2020
b0b94a6
remove AmqpHeader and AmqpProps from the exports in index.ts
HarshaNalluru Oct 29, 2020
35a38ec
API Report
HarshaNalluru Oct 29, 2020
43f0396
core-amqp version update in service-bus
HarshaNalluru Oct 29, 2020
ad6da86
update core-amqp for event-hubs
HarshaNalluru Oct 29, 2020
37ed73a
Move changes to v2
HarshaNalluru Oct 29, 2020
6786692
pin beta versions
HarshaNalluru Oct 29, 2020
a745549
Make _amqpAnnotatedMessage optional in the ReceivedMessage since it i…
HarshaNalluru Oct 29, 2020
e87d659
changelog for service-bus and API report and fix build failures
HarshaNalluru Oct 29, 2020
dfe570a
Changelog for core-amqp v2
HarshaNalluru Oct 29, 2020
ba70742
Amqp spec link
HarshaNalluru Oct 29, 2020
1f0cde3
Revert "Make _amqpAnnotatedMessage optional in the ReceivedMessage si…
HarshaNalluru Oct 29, 2020
8738f03
remove bang(!)
HarshaNalluru Oct 29, 2020
cf1eab8
http -> https
HarshaNalluru Oct 29, 2020
e90c03f
API Report
HarshaNalluru Oct 29, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sdk/core/core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# Release History

## 1.1.7 (Unreleased)
## 2.0.0-beta.1 (Unreleased)

- Internal improvement - Previously, each `RequestResponseLink.sendRequest` call adds an "onMessage" listener to the `ReceiverEvents.message` event and keeps discarding the responses that did not match the request-id and returns the response if matched. Adding many listeners would also result in a warning such as `MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 message listeners added to [Receiver]. Use emittr.setMaxListeners() to increase limit`.
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved
This has been improved to reuse a single listener for all the requests by maintaining a map of deferred promises that would be resolved(or rejected) upon receiving a message event.
[PR 11749](https://github.com/Azure/azure-sdk-for-js/pull/11749)
- TODO: Mention the changes to AmqpMessageHeaders, Props, and AnnotatedMessage addition

## 1.1.6 (2020-09-08)

Expand Down
2 changes: 1 addition & 1 deletion sdk/core/core-amqp/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@azure/core-amqp",
"sdk-type": "client",
"version": "1.1.7",
"version": "2.0.0-beta.1",
"description": "Common library for amqp based azure sdks like @azure/event-hubs.",
"author": "Microsoft Corporation",
"license": "MIT",
Expand Down
104 changes: 62 additions & 42 deletions sdk/core/core-amqp/review/core-amqp.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import { AbortSignalLike } from '@azure/abort-controller';
import { AccessToken } from '@azure/core-auth';
import { AmqpError } from 'rhea-promise';
import { Message as AmqpMessage } from 'rhea-promise';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Can we keep this as Message and not use the Amqp prefix here? Everywhere else where we use the Amqp prefix, we are referring to the type/interface defined in core-amqp which has camel casing. Where as this AmqpMessage is from rhea-promise. To be consistent with MessageHeader and MessageProperties from rhea-promise, this should just Message as well

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We either do that or use the Rhea prefix

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking it, but refrained from doing since it would change service-bus and event-hubs.
But it is probably worth doing it.. will do!

import { MessageHeader as AmqpMessageHeader } from 'rhea-promise';
import { MessageProperties as AmqpMessageProperties } from 'rhea-promise';
import AsyncLock from 'async-lock';
import { Connection } from 'rhea-promise';
import { Dictionary } from 'rhea-promise';
import { isAmqpError } from 'rhea-promise';
import { isTokenCredential } from '@azure/core-auth';
import { MessageHeader } from 'rhea-promise';
import { MessageProperties } from 'rhea-promise';
import { Receiver } from 'rhea-promise';
import { ReceiverOptions } from 'rhea-promise';
import { ReqResLink } from 'rhea-promise';
Expand All @@ -26,11 +26,69 @@ import { WebSocketImpl } from 'rhea-promise';

export { AccessToken }

// @public
export interface AmqpAnnotatedMessage {
applicationProperties?: {
[key: string]: any;
};
body: any;
deliveryAnnotations?: {
[key: string]: any;
};
footer?: {
[key: string]: any;
};
header?: AmqpMessageHeader;
messageAnnotations?: {
[key: string]: any;
};
properties?: AmqpMessageProperties;
}

// @public
export const AmqpAnnotatedMessage: {
fromRheaMessage(msg: AmqpMessage): AmqpAnnotatedMessage;
};

export { AmqpMessage }

export { AmqpMessageHeader }
// @public
export interface AmqpMessageHeader {
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved
deliveryCount?: number;
durable?: boolean;
firstAcquirer?: boolean;
priority?: number;
timeToLive?: number;
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved
}

// @public
export const AmqpMessageHeader: {
toRheaMessageHeader(props: AmqpMessageHeader): MessageHeader;
fromRheaMessageHeader(props: MessageHeader): AmqpMessageHeader;
};

// @public
export interface AmqpMessageProperties {
absoluteExpiryTime?: number;
contentEncoding?: string;
contentType?: string;
correlationId?: string | number | Buffer;
creationTime?: number;
groupId?: string;
groupSequence?: number;
messageId?: string | number | Buffer;
replyTo?: string;
replyToGroupId?: string;
subject?: string;
to?: string;
userId?: string;
}

export { AmqpMessageProperties }
// @public
export const AmqpMessageProperties: {
toRheaMessageProperties(props: AmqpMessageProperties): MessageProperties;
fromRheaMessageProperties(props: MessageProperties): AmqpMessageProperties;
};

export { AsyncLock }

Expand Down Expand Up @@ -507,44 +565,6 @@ export const logger: import("@azure/logger").AzureLogger;
// @public
export type MessageErrorCodes = "AddressAlreadyInUseError" | "StoreLockLostError" | "NoMatchingSubscriptionError" | "PartitionNotOwnedError" | "PublisherRevokedError" | "MessagingEntityAlreadyExistsError" | "MessagingEntityDisabledError" | "MessageLockLostError" | "SessionLockLostError" | "SessionCannotBeLockedError" | "InternalServerError" | "ServiceCommunicationError" | "MessageNotFoundError" | "RelayNotFoundError" | "NotImplementedError" | "InvalidOperationError" | "QuotaExceededError" | "UnauthorizedError" | "ServiceUnavailableError" | "MessageWaitTimeout" | "ArgumentOutOfRangeError" | "PreconditionFailedError" | "DecodeError" | "InvalidFieldError" | "ResourceLockedError" | "ResourceDeletedError" | "IllegalStateError" | "FrameSizeTooSmallError" | "DetachForcedError" | "TransferLimitExceededError" | "MessageTooLargeError" | "LinkRedirectError" | "ReceiverDisconnectedError" | "SessionWindowViolationError" | "ErrantLinkError" | "HandleInUseError" | "UnattachedHandleError" | "ConnectionForcedError" | "FramingError" | "ConnectionRedirectError" | "ServerBusyError" | "ArgumentError" | "OperationCancelledError" | "SenderBusyError" | "SystemError";

// @public
export interface MessageHeader {
deliveryCount?: number;
durable?: boolean;
firstAcquirer?: boolean;
priority?: number;
ttl?: number;
}

// @public
export const MessageHeader: {
toAmqpMessageHeader(props: MessageHeader): AmqpMessageHeader;
fromAmqpMessageHeader(props: AmqpMessageHeader): MessageHeader;
};

// @public
export interface MessageProperties {
absoluteExpiryTime?: number;
contentEncoding?: string;
contentType?: string;
correlationId?: string | number | Buffer;
creationTime?: number;
groupId?: string;
groupSequence?: number;
messageId?: string | number | Buffer;
replyTo?: string;
replyToGroupId?: string;
subject?: string;
to?: string;
userId?: string;
}

// @public
export const MessageProperties: {
toAmqpMessageProperties(props: MessageProperties): AmqpMessageProperties;
fromAmqpMessageProperties(props: AmqpMessageProperties): MessageProperties;
};

// @public
export class MessagingError extends Error {
constructor(message: string, originalError?: Error);
Expand Down
63 changes: 63 additions & 0 deletions sdk/core/core-amqp/src/amqpAnnotatedMessage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
/* eslint-disable eqeqeq */
import { AmqpMessageHeader } from "./messageHeader";
import { AmqpMessageProperties } from "./messageProperties";
import { Message as AmqpMessage } from "rhea-promise";

/**
* Describes the AmqpAnnotatedMessage, part of the ServiceBusReceivedMessage(as `amqpAnnotatedMessage` property).
*/
export interface AmqpAnnotatedMessage {
/**
* Describes the defined set of standard header properties of the message.
*/
header?: AmqpMessageHeader;
/**
* Describes set of footer properties of the message.
*/
footer?: { [key: string]: any };
/**
* A dictionary containing message attributes that will be held in the message header
*/
messageAnnotations?: { [key: string]: any };
/**
* A dictionary used for delivery-specific
* non-standard properties at the head of the message.
*/
deliveryAnnotations?: { [key: string]: any };
/**
* A dictionary containing application specific message properties.
*/
applicationProperties?: { [key: string]: any };
/**
* Describes the defined set of standard properties of the message.
*/
properties?: AmqpMessageProperties;
/**
* The message body.
*/
body: any;
}

/**
* Describes the operations that can be performed on(or to get) the AmqpAnnotatedMessage.
*/
export const AmqpAnnotatedMessage = {
/**
* Takes AmqpMessage(`Message` type from "rhea") and returns it in the AmqpAnnotatedMessage format.
*
* @param {AmqpMessage} msg
*/
fromRheaMessage(msg: AmqpMessage): AmqpAnnotatedMessage {
return {
header: AmqpMessageHeader.fromRheaMessageHeader(msg),
footer: (msg as any).footer,
messageAnnotations: msg.message_annotations,
deliveryAnnotations: msg.delivery_annotations,
applicationProperties: msg.application_properties,
properties: AmqpMessageProperties.fromRheaMessageProperties(msg),
body: msg.body
};
}
};
13 changes: 4 additions & 9 deletions sdk/core/core-amqp/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,14 @@ export { IotHubConnectionConfig } from "./connectionConfig/iothubConnectionConfi

export { CbsClient, CbsResponse } from "./cbs";
export { Constants } from "./util/constants";
export { MessageHeader } from "./messageHeader";
export { MessageProperties } from "./messageProperties";
export { AmqpMessageHeader } from "./messageHeader";
export { AmqpMessageProperties } from "./messageProperties";
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved
export {
ConnectionContextBase,
ConnectionProperties,
CreateConnectionContextBaseParameters
} from "./ConnectionContextBase";
export {
Dictionary,
Message as AmqpMessage,
isAmqpError,
MessageHeader as AmqpMessageHeader,
MessageProperties as AmqpMessageProperties
} from "rhea-promise";
export { Dictionary, Message as AmqpMessage, isAmqpError } from "rhea-promise";
export {
MessagingError,
MessageErrorCodes,
Expand Down Expand Up @@ -64,4 +58,5 @@ export {
isNode,
WebSocketOptions
} from "./util/utils";
export { AmqpAnnotatedMessage } from "./amqpAnnotatedMessage";
export { logger } from "./log";
38 changes: 19 additions & 19 deletions sdk/core/core-amqp/src/messageHeader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
// Licensed under the MIT license.
/* eslint-disable eqeqeq */

import { MessageHeader as AmqpMessageHeader } from "rhea-promise";
import { MessageHeader as RheaMessageHeader } from "rhea-promise";
import { logger } from "./log";

/**
* Describes the defined set of standard header properties of the message.
*/
export interface MessageHeader {
export interface AmqpMessageHeader {
/**
* @property {boolean} [firstAcquirer] If this value is true, then this message has not been
* acquired by any other link. If this value is false, then this message MAY have previously
Expand All @@ -20,9 +20,9 @@ export interface MessageHeader {
*/
deliveryCount?: number;
/**
* @property {number} [ttl] time to live in ms.
* @property {number} [timeToLive] time to live in ms.
*/
ttl?: number;
timeToLive?: number;
/**
* @property {boolean} [durable] Specifies durability requirements.
*/
Expand All @@ -38,15 +38,15 @@ export interface MessageHeader {
* Describes the operations that can be performed on the message header.
* @module MessageHeader
*/
export const MessageHeader = {
export const AmqpMessageHeader = {
/**
* Converts MessageHeader to AmqpMessageHeader.
* Converts MessageHeader to RheaMessageHeader.
*
* @param {MessageHeader} props Message header.
* @returns {AmqpMessageHeader} AmqpMessageHeader
* @returns {RheaMessageHeader} RheaMessageHeader
*/
toAmqpMessageHeader(props: MessageHeader): AmqpMessageHeader {
const amqpHeader: AmqpMessageHeader = {};
toRheaMessageHeader(props: AmqpMessageHeader): RheaMessageHeader {
const amqpHeader: RheaMessageHeader = {};
if (props.deliveryCount != undefined) {
amqpHeader.delivery_count = props.deliveryCount;
}
Expand All @@ -57,21 +57,21 @@ export const MessageHeader = {
if (props.priority != undefined) {
amqpHeader.priority = props.priority;
}
if (props.ttl != undefined) {
amqpHeader.ttl = props.ttl;
if (props.timeToLive != undefined) {
amqpHeader.ttl = props.timeToLive;
}
logger.verbose("To AmqpMessageHeader: %O", amqpHeader);
logger.verbose("To RheaMessageHeader: %O", amqpHeader);
return amqpHeader;
},

/**
* Converts AmqpMessageHeader to MessageHeader.
* Converts RheaMessageHeader to MessageHeader.
*
* @param {AmqpMessageHeader} props Amqp Message Header
* @returns {MessageHeader} MessageHeader.
* @param {RheaMessageHeader} props Amqp Message Header
* @returns {AmqpMessageHeader} MessageHeader.
*/
fromAmqpMessageHeader(props: AmqpMessageHeader): MessageHeader {
const msgHeader: MessageHeader = {};
fromRheaMessageHeader(props: RheaMessageHeader): AmqpMessageHeader {
const msgHeader: AmqpMessageHeader = {};
if (props.delivery_count != undefined) {
msgHeader.deliveryCount = props.delivery_count;
}
Expand All @@ -85,9 +85,9 @@ export const MessageHeader = {
msgHeader.priority = props.priority;
}
if (props.ttl != undefined) {
msgHeader.ttl = props.ttl;
msgHeader.timeToLive = props.ttl;
}
logger.verbose("From AmqpMessageHeader: %O", msgHeader);
logger.verbose("From RheaMessageHeader: %O", msgHeader);
return msgHeader;
}
};
Loading