diff --git a/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/artemis_commons.bal b/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/artemis_commons.bal index 0528b54900c1..3f60569a3987 100644 --- a/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/artemis_commons.bal +++ b/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/artemis_commons.bal @@ -17,6 +17,7 @@ import ballerina/crypto; import ballerina/filepath; import ballerina/io; +import ballerina/time; # Constant for the artemis error code. public const ARTEMIS_ERROR_CODE = "{ballerina/artemis}ArtemisError"; diff --git a/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/message.bal b/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/message.bal index 889f30a5cdba..3ffb9004dc37 100644 --- a/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/message.bal +++ b/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/message.bal @@ -96,7 +96,6 @@ public type Message client object { # # + return - the `MessageConfiguration` of this message public function getConfig() returns MessageConfiguration { - self.configuration.freeze(); return self.configuration; } }; @@ -108,12 +107,20 @@ public type Message client object { # + priority - the message priority (between 0 and 9 inclusive) # + durable - whether the created message is durable or not # + routingType - `RoutingType` of the message +# + groupId - used to group messages so that the same consumer receives all the messages with a particular groupId +# + groupSequence - can use to specify a sequence within the group +# + correlationId - a header for associating the current message with some previous message or application-specific ID +# + replyTo - indicates which address a JMS consumer should reply to public type MessageConfiguration record {| - int? expiration = (); - int? timeStamp = (); + int expiration = 0; + int timeStamp = time:currentTime().time; byte priority = 0; boolean durable = true; RoutingType? routingType = (); + string? groupId = (); + int groupSequence = 0; + string? correlationId = (); + string? replyTo = (); |}; # ActiveMQ Artemis message types. diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisConstants.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisConstants.java index f433471fe02a..206e1ed00299 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisConstants.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisConstants.java @@ -79,7 +79,6 @@ public class ArtemisConstants { // Field names for Consumer public static final String FILTER = "filter"; public static final String AUTO_ACK = "autoAck"; - public static final String BROWSE_ONLY = "browseOnly"; public static final String QUEUE_CONFIG = "queueConfig"; public static final String QUEUE_NAME = "queueName"; public static final String TEMPORARY = "temporary"; @@ -95,6 +94,7 @@ public class ArtemisConstants { public static final String EXPIRATION = "expiration"; public static final String TIME_STAMP = "timeStamp"; public static final String PRIORITY = "priority"; + public static final String MESSAGE_CONFIG = "configuration"; // Field names for Producer public static final String RATE = "rate"; @@ -103,8 +103,13 @@ public class ArtemisConstants { // Common field names public static final String DURABLE = "durable"; public static final String ROUTING_TYPE = "routingType"; + public static final String GROUP_ID = "groupId"; + public static final String GROUP_SEQUENCE = "groupSequence"; + public static final String CORRELATION_ID = "correlationId"; + public static final String REPLY_TO = "replyTo"; public static final String AUTO_CREATED = "autoCreated"; - static final String MULTICAST = "MULTICAST"; + public static final String MULTICAST = "MULTICAST"; + public static final String ANYCAST = "ANYCAST"; // Field name for Session public static final String USERNAME = "username"; diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisUtils.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisUtils.java index 83e0db47a7bd..fbad311964cb 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisUtils.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisUtils.java @@ -56,7 +56,7 @@ public class ArtemisUtils { * @param message the error message * @param context the Ballerina context * @param exception the exception to be propagated - * @param logger the logger to log errors + * @param logger the logger to log errors */ public static void throwException(String message, Context context, Exception exception, Logger logger) { logger.error(message, exception); @@ -262,6 +262,44 @@ public static BValue getBArrayValue(ClientMessage message) { return new BValueArray(bytes); } + public static void populateMessageObj(ClientMessage clientMessage, Object transactionContext, + BMap messageObj) { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap messageConfigObj = (BMap) messageObj.get(ArtemisConstants.MESSAGE_CONFIG); + populateMessageConfigObj(clientMessage, messageConfigObj); + + messageObj.addNativeData(ArtemisConstants.ARTEMIS_TRANSACTION_CONTEXT, transactionContext); + messageObj.addNativeData(ArtemisConstants.ARTEMIS_MESSAGE, clientMessage); + } + + private static void populateMessageConfigObj(ClientMessage clientMessage, BMap messageConfigObj) { + messageConfigObj.put(ArtemisConstants.EXPIRATION, new BInteger(clientMessage.getExpiration())); + messageConfigObj.put(ArtemisConstants.TIME_STAMP, new BInteger(clientMessage.getTimestamp())); + messageConfigObj.put(ArtemisConstants.PRIORITY, new BByte(clientMessage.getPriority())); + messageConfigObj.put(ArtemisConstants.DURABLE, new BBoolean(clientMessage.isDurable())); + setRoutingTypeToConfig(messageConfigObj, clientMessage); + if (clientMessage.getGroupID() != null) { + messageConfigObj.put(ArtemisConstants.GROUP_ID, new BString(clientMessage.getGroupID().toString())); + } + messageConfigObj.put(ArtemisConstants.GROUP_SEQUENCE, new BInteger(clientMessage.getGroupSequence())); + if (clientMessage.getCorrelationID() != null) { + messageConfigObj.put(ArtemisConstants.CORRELATION_ID, + new BString(clientMessage.getCorrelationID().toString())); + } + if (clientMessage.getReplyTo() != null) { + messageConfigObj.put(ArtemisConstants.REPLY_TO, new BString(clientMessage.getReplyTo().toString())); + } + } + + public static void setRoutingTypeToConfig(BMap msgConfigObj, ClientMessage message) { + byte routingType = message.getType(); + if (routingType == RoutingType.MULTICAST.getType()) { + msgConfigObj.put(ArtemisConstants.ROUTING_TYPE, new BString(ArtemisConstants.MULTICAST)); + } else if (routingType == RoutingType.ANYCAST.getType()) { + msgConfigObj.put(ArtemisConstants.ROUTING_TYPE, new BString(ArtemisConstants.ANYCAST)); + } + } + private ArtemisUtils() { } diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/ArtemisMessageHandler.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/ArtemisMessageHandler.java index 6dfab63452cf..d61be787a9a1 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/ArtemisMessageHandler.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/ArtemisMessageHandler.java @@ -85,7 +85,7 @@ public void onMessage(ClientMessage clientMessage) { if (paramSize > 1) { dispatchResourceWithDataBinding(clientMessage, paramDetails); } else { - dispatchResource(clientMessage, createAndGetMessageBMap(onMessageResource, clientMessage, sessionObj)); + dispatchResource(clientMessage, createAndGetMessageObj(onMessageResource, clientMessage, sessionObj)); } } @@ -94,12 +94,10 @@ private void dispatchResourceWithDataBinding(ClientMessage clientMessage, List

sessionObj) { + private BValue createAndGetMessageObj(Resource onMessageResource, ClientMessage clientMessage, + BMap sessionObj) { ProgramFile programFile = onMessageResource.getResourceInfo().getPackageInfo().getProgramFile(); BMap messageObj = BLangConnectorSPIUtil.createBStruct( programFile, ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS, ArtemisConstants.MESSAGE_OBJ); - messageObj.addNativeData(ArtemisConstants.ARTEMIS_TRANSACTION_CONTEXT, - sessionObj.getNativeData(ArtemisConstants.ARTEMIS_TRANSACTION_CONTEXT)); - messageObj.addNativeData(ArtemisConstants.ARTEMIS_MESSAGE, clientMessage); + ArtemisUtils.populateMessageObj(clientMessage, + sessionObj.getNativeData(ArtemisConstants.ARTEMIS_TRANSACTION_CONTEXT), + messageObj); return messageObj; } @@ -158,7 +156,6 @@ private BValue getContentForType(ClientMessage message, BType dataType) { default: throw new ArtemisConnectorException( "The content type of the message received does not match the resource signature type."); - } } diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/Receive.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/Receive.java index 83590fcd79b5..188280470ad3 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/Receive.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/Receive.java @@ -73,7 +73,7 @@ public void execute(Context context) { ClientMessage clientMessage = consumer.receive(timeInMilliSeconds); BMap messageObj = BLangConnectorSPIUtil.createBStruct( context, ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS, ArtemisConstants.MESSAGE_OBJ); - messageObj.addNativeData(ArtemisConstants.ARTEMIS_MESSAGE, clientMessage); + ArtemisUtils.populateMessageObj(clientMessage, transactionContext, messageObj); if (autoAck) { clientMessage.acknowledge(); if (transactionContext != null) { diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/CreateMessage.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/CreateMessage.java index 67ecb7e69b7c..a6163ecae1a8 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/CreateMessage.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/CreateMessage.java @@ -45,6 +45,8 @@ import org.ballerinalang.natives.annotations.Receiver; import org.ballerinalang.stdlib.io.channels.base.Channel; import org.ballerinalang.stdlib.io.utils.IOConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Map; @@ -81,6 +83,7 @@ } ) public class CreateMessage extends BlockingNativeCallableUnit { + private static final Logger logger = LoggerFactory.getLogger(CreateMessage.class); @Override public void execute(Context context) { @@ -94,11 +97,15 @@ public void execute(Context context) { @SuppressWarnings(ArtemisConstants.UNCHECKED) BMap configObj = (BMap) context.getRefArgument(3); - long expiration = getIntFromIntOrNil(configObj.get(ArtemisConstants.EXPIRATION), 0); - long timeStamp = getIntFromIntOrNil(configObj.get(ArtemisConstants.TIME_STAMP), System.currentTimeMillis()); + long expiration = ((BInteger) configObj.get(ArtemisConstants.EXPIRATION)).intValue(); + long timeStamp = ((BInteger) configObj.get(ArtemisConstants.TIME_STAMP)).intValue(); byte priority = (byte) ((BByte) configObj.get(ArtemisConstants.PRIORITY)).byteValue(); boolean durable = ((BBoolean) configObj.get(ArtemisConstants.DURABLE)).booleanValue(); BValue routingType = configObj.get(ArtemisConstants.ROUTING_TYPE); + BValue groupId = configObj.get(ArtemisConstants.GROUP_ID); + int groupSequence = ArtemisUtils.getIntFromConfig(configObj, ArtemisConstants.GROUP_SEQUENCE, logger); + BValue correlationId = configObj.get(ArtemisConstants.CORRELATION_ID); + BValue replyTo = configObj.get(ArtemisConstants.REPLY_TO); ClientSession session = (ClientSession) sessionObj.getNativeData(ArtemisConstants.ARTEMIS_SESSION); @@ -107,7 +114,16 @@ public void execute(Context context) { if (routingType instanceof BString) { message.setRoutingType(ArtemisUtils.getRoutingTypeFromString(routingType.stringValue())); } - + if (groupId instanceof BString) { + message.setGroupID(groupId.stringValue()); + } + message.setGroupSequence(groupSequence); + if (correlationId instanceof BString) { + message.setCorrelationID(correlationId.stringValue()); + } + if (replyTo instanceof BString) { + message.setReplyTo(new SimpleString(replyTo.stringValue())); + } if (messageType == Message.TEXT_TYPE) { TextMessageUtil.writeBodyText(message.getBodyBuffer(), new SimpleString(dataVal.stringValue())); } else if (messageType == Message.BYTES_TYPE) { @@ -159,11 +175,4 @@ private byte getMessageType(String type) { return Message.DEFAULT_TYPE; } } - - private long getIntFromIntOrNil(BValue value, long defaultVal) { - if (value instanceof BInteger) { - return ((BInteger) value).intValue(); - } - return defaultVal; - } } diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetType.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetType.java index ea5a79edb48c..dba6486b2410 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetType.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetType.java @@ -28,7 +28,6 @@ import org.ballerinalang.model.values.BMap; import org.ballerinalang.model.values.BString; import org.ballerinalang.model.values.BValue; -import org.ballerinalang.natives.annotations.Argument; import org.ballerinalang.natives.annotations.BallerinaFunction; import org.ballerinalang.natives.annotations.Receiver; @@ -46,13 +45,7 @@ type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ, structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS - ), - args = { - @Argument( - name = "key", - type = TypeKind.STRING - ) - } + ) ) public class GetType extends BlockingNativeCallableUnit {