Skip to content

Commit

Permalink
Populate the config in the Message Object appropriately
Browse files Browse the repository at this point in the history
The message object config for the received Message objects were not appropriately populated. This makes sure they are populated.
  • Loading branch information
riyafa committed Jun 3, 2019
1 parent 6f8507f commit 533381c
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import ballerina/crypto;
import ballerina/filepath;
import ballerina/io;
import ballerina/time;

# Constant for the artemis error code.
public const ARTEMIS_ERROR_CODE = "{ballerina/artemis}ArtemisError";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ public type Message client object {
#
# + return - the `MessageConfiguration` of this message
public function getConfig() returns MessageConfiguration {
self.configuration.freeze();
return self.configuration;
}
};
Expand All @@ -108,12 +107,20 @@ public type Message client object {
# + priority - the message priority (between 0 and 9 inclusive)
# + durable - whether the created message is durable or not
# + routingType - `RoutingType` of the message
# + groupId - used to group messages so that the same consumer receives all the messages with a particular groupId
# + groupSequence - can use to specify a sequence within the group
# + correlationId - a header for associating the current message with some previous message or application-specific ID
# + replyTo - indicates which address a JMS consumer should reply to
public type MessageConfiguration record {|
int? expiration = ();
int? timeStamp = ();
int expiration = 0;
int timeStamp = time:currentTime().time;
byte priority = 0;
boolean durable = true;
RoutingType? routingType = ();
string? groupId = ();
int groupSequence = 0;
string? correlationId = ();
string? replyTo = ();
|};

# ActiveMQ Artemis message types.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public class ArtemisConstants {
// Field names for Consumer
public static final String FILTER = "filter";
public static final String AUTO_ACK = "autoAck";
public static final String BROWSE_ONLY = "browseOnly";
public static final String QUEUE_CONFIG = "queueConfig";
public static final String QUEUE_NAME = "queueName";
public static final String TEMPORARY = "temporary";
Expand All @@ -95,6 +94,7 @@ public class ArtemisConstants {
public static final String EXPIRATION = "expiration";
public static final String TIME_STAMP = "timeStamp";
public static final String PRIORITY = "priority";
public static final String MESSAGE_CONFIG = "configuration";

// Field names for Producer
public static final String RATE = "rate";
Expand All @@ -103,8 +103,13 @@ public class ArtemisConstants {
// Common field names
public static final String DURABLE = "durable";
public static final String ROUTING_TYPE = "routingType";
public static final String GROUP_ID = "groupId";
public static final String GROUP_SEQUENCE = "groupSequence";
public static final String CORRELATION_ID = "correlationId";
public static final String REPLY_TO = "replyTo";
public static final String AUTO_CREATED = "autoCreated";
static final String MULTICAST = "MULTICAST";
public static final String MULTICAST = "MULTICAST";
public static final String ANYCAST = "ANYCAST";

// Field name for Session
public static final String USERNAME = "username";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class ArtemisUtils {
* @param message the error message
* @param context the Ballerina context
* @param exception the exception to be propagated
* @param logger the logger to log errors
* @param logger the logger to log errors
*/
public static void throwException(String message, Context context, Exception exception, Logger logger) {
logger.error(message, exception);
Expand Down Expand Up @@ -262,6 +262,44 @@ public static BValue getBArrayValue(ClientMessage message) {
return new BValueArray(bytes);
}

public static void populateMessageObj(ClientMessage clientMessage, Object transactionContext,
BMap<String, BValue> messageObj) {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> messageConfigObj = (BMap<String, BValue>) messageObj.get(ArtemisConstants.MESSAGE_CONFIG);
populateMessageConfigObj(clientMessage, messageConfigObj);

messageObj.addNativeData(ArtemisConstants.ARTEMIS_TRANSACTION_CONTEXT, transactionContext);
messageObj.addNativeData(ArtemisConstants.ARTEMIS_MESSAGE, clientMessage);
}

private static void populateMessageConfigObj(ClientMessage clientMessage, BMap<String, BValue> messageConfigObj) {
messageConfigObj.put(ArtemisConstants.EXPIRATION, new BInteger(clientMessage.getExpiration()));
messageConfigObj.put(ArtemisConstants.TIME_STAMP, new BInteger(clientMessage.getTimestamp()));
messageConfigObj.put(ArtemisConstants.PRIORITY, new BByte(clientMessage.getPriority()));
messageConfigObj.put(ArtemisConstants.DURABLE, new BBoolean(clientMessage.isDurable()));
setRoutingTypeToConfig(messageConfigObj, clientMessage);
if (clientMessage.getGroupID() != null) {
messageConfigObj.put(ArtemisConstants.GROUP_ID, new BString(clientMessage.getGroupID().toString()));
}
messageConfigObj.put(ArtemisConstants.GROUP_SEQUENCE, new BInteger(clientMessage.getGroupSequence()));
if (clientMessage.getCorrelationID() != null) {
messageConfigObj.put(ArtemisConstants.CORRELATION_ID,
new BString(clientMessage.getCorrelationID().toString()));
}
if (clientMessage.getReplyTo() != null) {
messageConfigObj.put(ArtemisConstants.REPLY_TO, new BString(clientMessage.getReplyTo().toString()));
}
}

public static void setRoutingTypeToConfig(BMap<String, BValue> msgConfigObj, ClientMessage message) {
byte routingType = message.getType();
if (routingType == RoutingType.MULTICAST.getType()) {
msgConfigObj.put(ArtemisConstants.ROUTING_TYPE, new BString(ArtemisConstants.MULTICAST));
} else if (routingType == RoutingType.ANYCAST.getType()) {
msgConfigObj.put(ArtemisConstants.ROUTING_TYPE, new BString(ArtemisConstants.ANYCAST));
}
}

private ArtemisUtils() {

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void onMessage(ClientMessage clientMessage) {
if (paramSize > 1) {
dispatchResourceWithDataBinding(clientMessage, paramDetails);
} else {
dispatchResource(clientMessage, createAndGetMessageBMap(onMessageResource, clientMessage, sessionObj));
dispatchResource(clientMessage, createAndGetMessageObj(onMessageResource, clientMessage, sessionObj));
}

}
Expand All @@ -94,12 +94,10 @@ private void dispatchResourceWithDataBinding(ClientMessage clientMessage, List<P
BValue[] bValues = new BValue[paramDetails.size()];
try {
bValues[1] = getContentForType(clientMessage, paramDetails.get(1).getVarType());
bValues[0] = createAndGetMessageBMap(onMessageResource, clientMessage, sessionObj);
bValues[0] = createAndGetMessageObj(onMessageResource, clientMessage, sessionObj);
dispatchResource(clientMessage, bValues);
} catch (BallerinaException ex) {
if (logger.isDebugEnabled()) {
logger.debug("The message received do not match the resource signature", ex);
}
logger.error("The message received do not match the resource signature", ex);
}
}

Expand All @@ -116,14 +114,14 @@ private void dispatchResource(ClientMessage clientMessage, BValue... bValues) {
}
}

private BValue createAndGetMessageBMap(Resource onMessageResource, ClientMessage clientMessage,
BMap<String, BValue> sessionObj) {
private BValue createAndGetMessageObj(Resource onMessageResource, ClientMessage clientMessage,
BMap<String, BValue> sessionObj) {
ProgramFile programFile = onMessageResource.getResourceInfo().getPackageInfo().getProgramFile();
BMap<String, BValue> messageObj = BLangConnectorSPIUtil.createBStruct(
programFile, ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS, ArtemisConstants.MESSAGE_OBJ);
messageObj.addNativeData(ArtemisConstants.ARTEMIS_TRANSACTION_CONTEXT,
sessionObj.getNativeData(ArtemisConstants.ARTEMIS_TRANSACTION_CONTEXT));
messageObj.addNativeData(ArtemisConstants.ARTEMIS_MESSAGE, clientMessage);
ArtemisUtils.populateMessageObj(clientMessage,
sessionObj.getNativeData(ArtemisConstants.ARTEMIS_TRANSACTION_CONTEXT),
messageObj);
return messageObj;
}

Expand Down Expand Up @@ -158,7 +156,6 @@ private BValue getContentForType(ClientMessage message, BType dataType) {
default:
throw new ArtemisConnectorException(
"The content type of the message received does not match the resource signature type.");

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void execute(Context context) {
ClientMessage clientMessage = consumer.receive(timeInMilliSeconds);
BMap<String, BValue> messageObj = BLangConnectorSPIUtil.createBStruct(
context, ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS, ArtemisConstants.MESSAGE_OBJ);
messageObj.addNativeData(ArtemisConstants.ARTEMIS_MESSAGE, clientMessage);
ArtemisUtils.populateMessageObj(clientMessage, transactionContext, messageObj);
if (autoAck) {
clientMessage.acknowledge();
if (transactionContext != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.ballerinalang.natives.annotations.Receiver;
import org.ballerinalang.stdlib.io.channels.base.Channel;
import org.ballerinalang.stdlib.io.utils.IOConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

Expand Down Expand Up @@ -81,6 +83,7 @@
}
)
public class CreateMessage extends BlockingNativeCallableUnit {
private static final Logger logger = LoggerFactory.getLogger(CreateMessage.class);

@Override
public void execute(Context context) {
Expand All @@ -94,11 +97,15 @@ public void execute(Context context) {

@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> configObj = (BMap<String, BValue>) context.getRefArgument(3);
long expiration = getIntFromIntOrNil(configObj.get(ArtemisConstants.EXPIRATION), 0);
long timeStamp = getIntFromIntOrNil(configObj.get(ArtemisConstants.TIME_STAMP), System.currentTimeMillis());
long expiration = ((BInteger) configObj.get(ArtemisConstants.EXPIRATION)).intValue();
long timeStamp = ((BInteger) configObj.get(ArtemisConstants.TIME_STAMP)).intValue();
byte priority = (byte) ((BByte) configObj.get(ArtemisConstants.PRIORITY)).byteValue();
boolean durable = ((BBoolean) configObj.get(ArtemisConstants.DURABLE)).booleanValue();
BValue routingType = configObj.get(ArtemisConstants.ROUTING_TYPE);
BValue groupId = configObj.get(ArtemisConstants.GROUP_ID);
int groupSequence = ArtemisUtils.getIntFromConfig(configObj, ArtemisConstants.GROUP_SEQUENCE, logger);
BValue correlationId = configObj.get(ArtemisConstants.CORRELATION_ID);
BValue replyTo = configObj.get(ArtemisConstants.REPLY_TO);

ClientSession session = (ClientSession) sessionObj.getNativeData(ArtemisConstants.ARTEMIS_SESSION);

Expand All @@ -107,7 +114,16 @@ public void execute(Context context) {
if (routingType instanceof BString) {
message.setRoutingType(ArtemisUtils.getRoutingTypeFromString(routingType.stringValue()));
}

if (groupId instanceof BString) {
message.setGroupID(groupId.stringValue());
}
message.setGroupSequence(groupSequence);
if (correlationId instanceof BString) {
message.setCorrelationID(correlationId.stringValue());
}
if (replyTo instanceof BString) {
message.setReplyTo(new SimpleString(replyTo.stringValue()));
}
if (messageType == Message.TEXT_TYPE) {
TextMessageUtil.writeBodyText(message.getBodyBuffer(), new SimpleString(dataVal.stringValue()));
} else if (messageType == Message.BYTES_TYPE) {
Expand Down Expand Up @@ -159,11 +175,4 @@ private byte getMessageType(String type) {
return Message.DEFAULT_TYPE;
}
}

private long getIntFromIntOrNil(BValue value, long defaultVal) {
if (value instanceof BInteger) {
return ((BInteger) value).intValue();
}
return defaultVal;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BString;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.natives.annotations.Argument;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;

Expand All @@ -46,13 +45,7 @@
type = TypeKind.OBJECT,
structType = ArtemisConstants.MESSAGE_OBJ,
structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS
),
args = {
@Argument(
name = "key",
type = TypeKind.STRING
)
}
)
)
public class GetType extends BlockingNativeCallableUnit {

Expand Down

0 comments on commit 533381c

Please sign in to comment.