diff --git a/examples/artemis-anycast-session-consumer/artemis_anycast_session_consumer.out b/examples/artemis-anycast-session-consumer/artemis_anycast_session_consumer.out index 608fe5ed1e27..ed26bab8ca85 100644 --- a/examples/artemis-anycast-session-consumer/artemis_anycast_session_consumer.out +++ b/examples/artemis-anycast-session-consumer/artemis_anycast_session_consumer.out @@ -1,7 +1,7 @@ # Make sure to have the ActiveMQ Artemis broker running. -# Navigate to the directory that contains the 'artemis_multicast_consumer.bal' file and issue the +# Navigate to the directory that contains the 'artemis_anycast_session_consumer.bal' file and execute the # 'ballerina run' command as follows. -$ ballerina run artemis_multicast_consumer.bal -# The ActiveMQ Artemis consumer runs as a Ballerina service and listens to the subscribed queue. +$ ballerina run artemis_anycast_session_consumer.bal +[ballerina/artemis] Client Consumer created for queue queue1 diff --git a/examples/artemis-anycast-session-producer/artemis_anycast_session_producer.out b/examples/artemis-anycast-session-producer/artemis_anycast_session_producer.out index 9020fe39bfee..f799b3b6ddef 100644 --- a/examples/artemis-anycast-session-producer/artemis_anycast_session_producer.out +++ b/examples/artemis-anycast-session-producer/artemis_anycast_session_producer.out @@ -1,5 +1,5 @@ # Make sure to have the ActiveMQ Artemis broker running. -# Navigate to the directory that contains the 'artemis_anycast_session_producer.bal' file and issue the +# Navigate to the directory that contains the 'artemis_anycast_session_producer.bal' file and execute the # 'ballerina run' command as follows. $ ballerina run artemis_anycast_session_producer.bal diff --git a/examples/artemis-data-binding/artemis_data_binding.bal b/examples/artemis-data-binding/artemis_data_binding.bal new file mode 100644 index 000000000000..3b535e9fd11b --- /dev/null +++ b/examples/artemis-data-binding/artemis_data_binding.bal @@ -0,0 +1,37 @@ +import ballerina/artemis; +import ballerina/http; +import ballerina/log; + +type Order record { + string description?; + int id; + float cost; +}; + +// Consumer listens to the queue (i.e., "my_queue") with the address +// (i.e., "my_address"). +@artemis:ServiceConfig { + queueConfig: { + queueName: "my_queue", + addressName: "my_address" + } +} +// Attaches the service to the listener. +service artemisConsumer on new artemis:Listener( + { host: "localhost", port: 61616 }) { + + // This resource is triggered when a valid `Order` is received. + resource function onMessage(artemis:Message message, Order orderDetails) + returns error? { + + // Posts order details to the backend and awaits response. + http:Client clientEP = new("http://www.mocky.io"); + var response = clientEP->post("/v2/5cde49ef3000005e004307f0", + untaint check json.convert(orderDetails)); + if (response is http:Response) { + log:printInfo(check response.getTextPayload()); + } else { + log:printError("Invalid response ", err = response); + } + } +} diff --git a/examples/artemis-data-binding/artemis_data_binding.description b/examples/artemis-data-binding/artemis_data_binding.description new file mode 100644 index 000000000000..fd2c18b720a6 --- /dev/null +++ b/examples/artemis-data-binding/artemis_data_binding.description @@ -0,0 +1,4 @@ +// Artemis data binding helps access payload through the last resource +// parameter. `string`, `json`, `xml`, `byte[]`, `record`, `map`, `map`, +// `map`, `map`, `map` and `map` are supported as +// parameter types. diff --git a/examples/artemis-data-binding/artemis_data_binding.out b/examples/artemis-data-binding/artemis_data_binding.out new file mode 100644 index 000000000000..acafcd3288a9 --- /dev/null +++ b/examples/artemis-data-binding/artemis_data_binding.out @@ -0,0 +1,6 @@ +# Make sure to have the ActiveMQ Artemis broker running. + +# Navigate to the directory that contains the 'artemis_data_binding.bal' file +# and execute the 'ballerina run' command as follows +$ ballerina run artemis_data_binding.bal +[ballerina/artemis] Client Consumer created for queue my_queue diff --git a/examples/artemis-multicast-consumer/artemis_multicast_consumer.out b/examples/artemis-multicast-consumer/artemis_multicast_consumer.out index 7c9a33f6e29b..ff35ec9c0ce4 100644 --- a/examples/artemis-multicast-consumer/artemis_multicast_consumer.out +++ b/examples/artemis-multicast-consumer/artemis_multicast_consumer.out @@ -1,7 +1,7 @@ # Make sure to have the ActiveMQ Artemis broker running. -# Navigate to the directory that contains the 'artemis_multicast_consumer.bal' file and issue the -# 'ballerina run' command. -$ ballerina run artemis_multicast_consumer.bal as follows. -# The ActiveMQ Artemis consumer runs as a Ballerina service and listens to the subscribed queue. +# Navigate to the directory that contains the 'artemis_multicast_consumer.bal' file and execute the +# 'ballerina run' command as follows. +$ ballerina run artemis_multicast_consumer.bal +[ballerina/artemis] Client Consumer created for queue my_queue diff --git a/examples/artemis-multicast-producer/artemis_multicast_producer.out b/examples/artemis-multicast-producer/artemis_multicast_producer.out index 1d32bc3773ba..cd482eb8488c 100644 --- a/examples/artemis-multicast-producer/artemis_multicast_producer.out +++ b/examples/artemis-multicast-producer/artemis_multicast_producer.out @@ -1,5 +1,5 @@ # Make sure to have the ActiveMQ Artemis broker running. -# Navigate to the directory that contains the 'artemis_multicast_producer.bal' file and issue the +# Navigate to the directory that contains the 'artemis_multicast_producer.bal' file and execute the # 'ballerina run' command as follows. $ ballerina run artemis_multicast_producer.bal diff --git a/examples/artemis-simple-transaction-consumer/artemis_simple_transaction_consumer.out b/examples/artemis-simple-transaction-consumer/artemis_simple_transaction_consumer.out index 5be31979f672..7cc6ff56ce09 100755 --- a/examples/artemis-simple-transaction-consumer/artemis_simple_transaction_consumer.out +++ b/examples/artemis-simple-transaction-consumer/artemis_simple_transaction_consumer.out @@ -1,9 +1,6 @@ # Make sure to have the ActiveMQ Artemis broker running. -# Navigate to the directory that contains the 'artemis_simple_transaction_consumer.bal' file and issue the +# Navigate to the directory that contains the 'artemis_simple_transaction_consumer.bal' file and execute the # 'ballerina run' command as follows. $ ballerina run --experimental artemis_simple_transaction_consumer.bal -# The ActiveMQ Artemis consumer runs as a Ballerina service and listens to the subscribed queue. - -[ballerina/http] started HTTP/WS endpoint 172.17.0.1:34985 -2019-04-18 15:19:32,475 INFO [ballerina/transactions] - Created transaction: 29787878-3686-41b3-9ab3-4bc60ad278f6 +2019-05-17 14:11:29,258 INFO [ballerina/transactions] - Created transaction: 78aabdba-3994-47cd-b083-80744961f69b diff --git a/examples/artemis-simple-transaction-producer/artemis_simple_transaction_producer.out b/examples/artemis-simple-transaction-producer/artemis_simple_transaction_producer.out index a65ce09b5a0e..3a3f8b92ada6 100755 --- a/examples/artemis-simple-transaction-producer/artemis_simple_transaction_producer.out +++ b/examples/artemis-simple-transaction-producer/artemis_simple_transaction_producer.out @@ -1,6 +1,6 @@ # Make sure to have the ActiveMQ Artemis broker running. -# Navigate to the directory that contains the 'artemis_simple_transaction_producer.bal' file and issue the +# Navigate to the directory that contains the 'artemis_simple_transaction_producer.bal' file and execute the # 'ballerina run' command as follows. $ ballerina run --experimental artemis_simple_transaction_producer.bal # The ActiveMQ Artemis producer runs as a Ballerina service and listens to the subscribed queue. diff --git a/examples/artemis-transaction-producer/artemis_transaction_producer.out b/examples/artemis-transaction-producer/artemis_transaction_producer.out index 75d6b1ea9265..d41f6674b586 100755 --- a/examples/artemis-transaction-producer/artemis_transaction_producer.out +++ b/examples/artemis-transaction-producer/artemis_transaction_producer.out @@ -1,6 +1,6 @@ # Make sure to have the ActiveMQ Artemis broker running. -# Navigate to the directory that contains the 'artemis_transaction_producer.bal' file and issue the +# Navigate to the directory that contains the 'artemis_transaction_producer.bal' file and execute the # 'ballerina run' command as follows. $ ballerina run --experimental artemis_transaction_producer.bal # The ActiveMQ Artemis producer runs as a Ballerina service and listens to the subscribed queue. diff --git a/examples/index.json b/examples/index.json index f805ee8355de..cd259b018e36 100644 --- a/examples/index.json +++ b/examples/index.json @@ -998,6 +998,10 @@ { "name": "Transaction Producer", "url": "artemis-transaction-producer" + }, + { + "name": "Data Binding", + "url": "artemis-data-binding" } ] }, diff --git a/stdlib/messaging/activemq-artemis/build.gradle b/stdlib/messaging/activemq-artemis/build.gradle index 77376a526d51..7bc096b9ca17 100644 --- a/stdlib/messaging/activemq-artemis/build.gradle +++ b/stdlib/messaging/activemq-artemis/build.gradle @@ -24,23 +24,27 @@ dependencies { // implementation project(':ballerina-lang') implementation project(':ballerina-io') implementation project(':ballerina-builtin') + implementation project(':ballerina-time') + implementation project(':ballerina-crypto') + implementation project(':ballerina-filepath') + implementation project(':ballerina-log-api') + implementation project(':ballerina-system') implementation project(':ballerina-runtime-api') implementation project(':ballerina-utils') implementation project(':ballerina-launcher') implementation 'org.apache.activemq:artemis-core-client' - baloImplementation project(path: ':ballerina-runtime-api', configuration: 'baloImplementation') - baloImplementation project(path: ':ballerina-utils', configuration: 'baloImplementation') - baloImplementation project(path: ':ballerina-builtin', configuration: 'baloImplementation') baloImplementation project(path: ':ballerina-io', configuration: 'baloImplementation') + baloImplementation project(path: ':ballerina-builtin', configuration: 'baloImplementation') baloImplementation project(path: ':ballerina-time', configuration: 'baloImplementation') baloImplementation project(path: ':ballerina-crypto', configuration: 'baloImplementation') baloImplementation project(path: ':ballerina-filepath', configuration: 'baloImplementation') - baloImplementation project(path: ':ballerina-system', configuration: 'baloImplementation') baloImplementation project(path: ':ballerina-log-api', configuration: 'baloImplementation') + baloImplementation project(path: ':ballerina-system', configuration: 'baloImplementation') + baloImplementation project(path: ':ballerina-runtime-api', configuration: 'baloImplementation') + baloImplementation project(path: ':ballerina-utils', configuration: 'baloImplementation') testCompile 'org.testng:testng' - } test { @@ -50,6 +54,11 @@ test { into "$buildDir/lib/repo/ballerina" } } + useTestNG() { + suites 'src/test/resources/testng.xml' + } + systemProperty "java.util.logging.config.file", "src/test/resources/logging.properties" + systemProperty "java.util.logging.manager", "org.ballerinalang.logging.BLogManager" } description = 'Ballerina - ActiveMQ Artemis' diff --git a/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/artemis_commons.bal b/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/artemis_commons.bal index 0528b54900c1..3f60569a3987 100644 --- a/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/artemis_commons.bal +++ b/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/artemis_commons.bal @@ -17,6 +17,7 @@ import ballerina/crypto; import ballerina/filepath; import ballerina/io; +import ballerina/time; # Constant for the artemis error code. public const ARTEMIS_ERROR_CODE = "{ballerina/artemis}ArtemisError"; diff --git a/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/message.bal b/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/message.bal index 369aa58ac871..3ffb9004dc37 100644 --- a/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/message.bal +++ b/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/message.bal @@ -107,12 +107,20 @@ public type Message client object { # + priority - the message priority (between 0 and 9 inclusive) # + durable - whether the created message is durable or not # + routingType - `RoutingType` of the message +# + groupId - used to group messages so that the same consumer receives all the messages with a particular groupId +# + groupSequence - can use to specify a sequence within the group +# + correlationId - a header for associating the current message with some previous message or application-specific ID +# + replyTo - indicates which address a JMS consumer should reply to public type MessageConfiguration record {| - int? expiration = (); - int? timeStamp = (); + int expiration = 0; + int timeStamp = time:currentTime().time; byte priority = 0; boolean durable = true; RoutingType? routingType = (); + string? groupId = (); + int groupSequence = 0; + string? correlationId = (); + string? replyTo = (); |}; # ActiveMQ Artemis message types. diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisConnectorException.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisConnectorException.java new file mode 100644 index 000000000000..dded3c5ded95 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisConnectorException.java @@ -0,0 +1,90 @@ +/* +* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +* +* WSO2 Inc. licenses this file to you under the Apache License, +* Version 2.0 (the "License"); you may not use this file except +* in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.ballerinalang.messaging.artemis; + +import org.ballerinalang.bre.Context; +import org.ballerinalang.util.exceptions.BallerinaException; + +/** + * BallerinaException that could occur in Artemis connector. + * + * @since 0.995 + */ +public class ArtemisConnectorException extends BallerinaException { + private static final long serialVersionUID = 381055783364464822L; + + /** + * Constructs a new {@link ArtemisConnectorException} with the specified detail message. + * + * @param message Error Message + */ + public ArtemisConnectorException(String message) { + super(message); + } + + /** + * Constructs a new {@link ArtemisConnectorException} with error message and ballerina context. + * + * @param message Error message + * @param context Ballerina context + */ + public ArtemisConnectorException(String message, Context context) { + super(message, context); + } + + /** + * Constructs a new {@link ArtemisConnectorException} with the specified detail message and cause. + * + * @param message Error message + * @param cause Cause + */ + public ArtemisConnectorException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Constructs a new {@link ArtemisConnectorException} with the specified detail message, + * cause and ballerina context. + * + * @param message Error message + * @param cause Cause + * @param context Ballerina context + */ + public ArtemisConnectorException(String message, Throwable cause, Context context) { + super(message, cause, context); + } + + /** + * Constructs a new {@link ArtemisConnectorException} with the cause. + * + * @param cause Throwable to be wrap by {@link ArtemisConnectorException} + */ + public ArtemisConnectorException(Throwable cause) { + super(cause); + } + + /** + * Constructs a new {@link ArtemisConnectorException} with ballerina context. + * + * @param stack Ballerina context + */ + public ArtemisConnectorException(Context stack) { + super(stack); + } +} diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisConstants.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisConstants.java index d26e3b1a4d65..206e1ed00299 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisConstants.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisConstants.java @@ -34,7 +34,7 @@ public class ArtemisConstants { public static final String PROTOCOL_PACKAGE_ARTEMIS = BALLERINA_PACKAGE_PREFIX + ARTEMIS; // Error related constants - static final String ARTEMIS_ERROR_CODE = "{ballerina/artemis}ArtemisError"; + static final String ARTEMIS_ERROR_CODE = "{" + PROTOCOL_PACKAGE_ARTEMIS + "}ArtemisError"; static final String ARTEMIS_ERROR_RECORD = "ArtemisError"; static final String ARTEMIS_ERROR_MESSAGE = "message"; @@ -79,7 +79,6 @@ public class ArtemisConstants { // Field names for Consumer public static final String FILTER = "filter"; public static final String AUTO_ACK = "autoAck"; - public static final String BROWSE_ONLY = "browseOnly"; public static final String QUEUE_CONFIG = "queueConfig"; public static final String QUEUE_NAME = "queueName"; public static final String TEMPORARY = "temporary"; @@ -95,6 +94,7 @@ public class ArtemisConstants { public static final String EXPIRATION = "expiration"; public static final String TIME_STAMP = "timeStamp"; public static final String PRIORITY = "priority"; + public static final String MESSAGE_CONFIG = "configuration"; // Field names for Producer public static final String RATE = "rate"; @@ -103,8 +103,13 @@ public class ArtemisConstants { // Common field names public static final String DURABLE = "durable"; public static final String ROUTING_TYPE = "routingType"; + public static final String GROUP_ID = "groupId"; + public static final String GROUP_SEQUENCE = "groupSequence"; + public static final String CORRELATION_ID = "correlationId"; + public static final String REPLY_TO = "replyTo"; public static final String AUTO_CREATED = "autoCreated"; - static final String MULTICAST = "MULTICAST"; + public static final String MULTICAST = "MULTICAST"; + public static final String ANYCAST = "ANYCAST"; // Field name for Session public static final String USERNAME = "username"; diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisServiceCompilerPlugin.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisServiceCompilerPlugin.java index e3c821f5564c..f905fe258730 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisServiceCompilerPlugin.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisServiceCompilerPlugin.java @@ -22,8 +22,12 @@ import org.ballerinalang.model.tree.ServiceNode; import org.ballerinalang.util.diagnostic.Diagnostic; import org.ballerinalang.util.diagnostic.DiagnosticLog; +import org.wso2.ballerinalang.compiler.semantics.model.types.BArrayType; +import org.wso2.ballerinalang.compiler.semantics.model.types.BMapType; +import org.wso2.ballerinalang.compiler.semantics.model.types.BType; import org.wso2.ballerinalang.compiler.tree.BLangFunction; import org.wso2.ballerinalang.compiler.tree.BLangSimpleVariable; +import org.wso2.ballerinalang.compiler.util.TypeTags; import org.wso2.ballerinalang.util.AbstractTransportCompilerPlugin; import java.util.List; @@ -34,12 +38,18 @@ * @since 0.995.0 */ @SupportedResourceParamTypes( - expectedListenerType = @SupportedResourceParamTypes.Type(packageName = ArtemisConstants.ARTEMIS, - name = ArtemisConstants.LISTENER_OBJ), - paramTypes = {@SupportedResourceParamTypes.Type(packageName = ArtemisConstants.ARTEMIS, - name = ArtemisConstants.MESSAGE_OBJ)}) + expectedListenerType = @SupportedResourceParamTypes.Type( + packageName = ArtemisConstants.ARTEMIS, + name = ArtemisConstants.LISTENER_OBJ + ), + paramTypes = { + @SupportedResourceParamTypes.Type( + packageName = ArtemisConstants.ARTEMIS, + name = ArtemisConstants.MESSAGE_OBJ + )}) public class ArtemisServiceCompilerPlugin extends AbstractTransportCompilerPlugin { + private static final String INVALID_RESOURCE_SIGNATURE_FOR = "Invalid resource signature for "; private DiagnosticLog dlog = null; @Override @@ -77,16 +87,52 @@ private void validate(BLangFunction resource) { dlog.logDiagnostic(Diagnostic.Kind.ERROR, resource.pos, "Invalid return type: expected error?"); } List paramDetails = resource.getParameters(); - if (paramDetails == null || paramDetails.size() != 1) { - dlog.logDiagnostic(Diagnostic.Kind.ERROR, resource.pos, "Invalid resource signature for " - + resource.getName().getValue() + - " resource: Unexpected parameter count(expected parameter count = 1)"); + if (paramDetails == null || paramDetails.isEmpty() || paramDetails.size() > 2) { + dlog.logDiagnostic(Diagnostic.Kind.ERROR, resource.pos, + INVALID_RESOURCE_SIGNATURE_FOR + resource.getName().getValue() + + " resource: Unexpected parameter count(expected parameter count 1 or 2)"); return; } if (!ArtemisConstants.MESSAGE_OBJ_FULL_NAME.equals(paramDetails.get(0).type.toString())) { - dlog.logDiagnostic(Diagnostic.Kind.ERROR, resource.pos, "Invalid resource signature for " + dlog.logDiagnostic(Diagnostic.Kind.ERROR, resource.pos, INVALID_RESOURCE_SIGNATURE_FOR + resource.getName().getValue() + " resource: The first parameter should be an artemis:Message"); } + if (paramDetails.size() == 2) { + validateSecondParam(resource, paramDetails); + } + } + + private void validateSecondParam(BLangFunction resource, List paramDetails) { + BType secondParamType = paramDetails.get(1).type; + int secondParamTypeTag = secondParamType.tag; + if (secondParamTypeTag != TypeTags.STRING && secondParamTypeTag != TypeTags.JSON && + secondParamTypeTag != TypeTags.XML && secondParamTypeTag != TypeTags.RECORD && + secondParamTypeTag != TypeTags.MAP && checkArrayType(secondParamType)) { + dlog.logDiagnostic(Diagnostic.Kind.ERROR, resource.pos, INVALID_RESOURCE_SIGNATURE_FOR + + resource.getName().getValue() + " resource in service " + + ": The second parameter should be a string, json, xml, byte[], map or a record type"); + } + if (secondParamTypeTag == TypeTags.MAP && checkMapConstraint(secondParamType)) { + dlog.logDiagnostic(Diagnostic.Kind.ERROR, resource.pos, INVALID_RESOURCE_SIGNATURE_FOR + + resource.getName().getValue() + " resource in service " + + ": The second parameter should be a map of string, int, float, byte, boolean or byte[]"); + } + } + + private boolean checkArrayType(BType secondParamType) { + return secondParamType.tag != TypeTags.ARRAY || (secondParamType instanceof BArrayType && + ((BArrayType) secondParamType).getElementType().tag != org.ballerinalang.model.types.TypeTags.BYTE_TAG); + } + + private boolean checkMapConstraint(BType paramType) { + if (paramType instanceof BMapType) { + BType constraintType = ((BMapType) paramType).constraint; + int constraintTypeTag = constraintType.tag; + return constraintTypeTag != TypeTags.STRING && constraintTypeTag != TypeTags.INT && + constraintTypeTag != TypeTags.FLOAT && constraintTypeTag != TypeTags.BYTE && + constraintTypeTag != TypeTags.BOOLEAN && checkArrayType(constraintType); + } + return false; } } diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisTransactionContext.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisTransactionContext.java index 48b7b03d8215..6f3beacdafbd 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisTransactionContext.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisTransactionContext.java @@ -84,14 +84,15 @@ public void handleTransactionBlock(Context context, String objectType) { session.commit(); return; } catch (ActiveMQException e) { - context.setError(ArtemisUtils.getError(context, e)); + throw new ArtemisConnectorException("Session commit failed: " + e.getMessage(), e, context); } } } else { if (!context.isInTransaction()) { - context.setError(ArtemisUtils.getError(context, "The Session used by the Artemis " + objectType + - " object is transacted. Hence " + objectType + - " transacted actions cannot be used outside a transaction block")); + throw new ArtemisConnectorException("The Session used by the Artemis " + objectType + + " object is transacted. Hence " + objectType + + " transacted actions cannot be used outside a transaction" + + " block", context); } } TransactionLocalContext transactionLocalContext = context.getLocalTransactionInfo(); diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisUtils.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisUtils.java index 074e5fdac768..fbad311964cb 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisUtils.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisUtils.java @@ -19,11 +19,14 @@ package org.ballerinalang.messaging.artemis; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.reader.BytesMessageUtil; import org.ballerinalang.bre.Context; import org.ballerinalang.bre.bvm.BLangVMErrors; import org.ballerinalang.connector.api.BLangConnectorSPIUtil; @@ -53,11 +56,11 @@ public class ArtemisUtils { * @param message the error message * @param context the Ballerina context * @param exception the exception to be propagated - * @param logger the logger to log errors + * @param logger the logger to log errors */ - public static void logAndSetError(String message, Context context, Exception exception, Logger logger) { + public static void throwException(String message, Context context, Exception exception, Logger logger) { logger.error(message, exception); - context.setError(getError(context, message)); + throw new ArtemisConnectorException(message, exception, context); } /** @@ -252,6 +255,51 @@ public static boolean isAnonymousSession(BMap sessionObj) { return ((BBoolean) sessionObj.get("anonymousSession")).booleanValue(); } + public static BValue getBArrayValue(ClientMessage message) { + ActiveMQBuffer msgBuffer = message.getBodyBuffer(); + byte[] bytes = new byte[msgBuffer.readableBytes()]; + BytesMessageUtil.bytesReadBytes(msgBuffer, bytes); + return new BValueArray(bytes); + } + + public static void populateMessageObj(ClientMessage clientMessage, Object transactionContext, + BMap messageObj) { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap messageConfigObj = (BMap) messageObj.get(ArtemisConstants.MESSAGE_CONFIG); + populateMessageConfigObj(clientMessage, messageConfigObj); + + messageObj.addNativeData(ArtemisConstants.ARTEMIS_TRANSACTION_CONTEXT, transactionContext); + messageObj.addNativeData(ArtemisConstants.ARTEMIS_MESSAGE, clientMessage); + } + + private static void populateMessageConfigObj(ClientMessage clientMessage, BMap messageConfigObj) { + messageConfigObj.put(ArtemisConstants.EXPIRATION, new BInteger(clientMessage.getExpiration())); + messageConfigObj.put(ArtemisConstants.TIME_STAMP, new BInteger(clientMessage.getTimestamp())); + messageConfigObj.put(ArtemisConstants.PRIORITY, new BByte(clientMessage.getPriority())); + messageConfigObj.put(ArtemisConstants.DURABLE, new BBoolean(clientMessage.isDurable())); + setRoutingTypeToConfig(messageConfigObj, clientMessage); + if (clientMessage.getGroupID() != null) { + messageConfigObj.put(ArtemisConstants.GROUP_ID, new BString(clientMessage.getGroupID().toString())); + } + messageConfigObj.put(ArtemisConstants.GROUP_SEQUENCE, new BInteger(clientMessage.getGroupSequence())); + if (clientMessage.getCorrelationID() != null) { + messageConfigObj.put(ArtemisConstants.CORRELATION_ID, + new BString(clientMessage.getCorrelationID().toString())); + } + if (clientMessage.getReplyTo() != null) { + messageConfigObj.put(ArtemisConstants.REPLY_TO, new BString(clientMessage.getReplyTo().toString())); + } + } + + public static void setRoutingTypeToConfig(BMap msgConfigObj, ClientMessage message) { + byte routingType = message.getType(); + if (routingType == RoutingType.MULTICAST.getType()) { + msgConfigObj.put(ArtemisConstants.ROUTING_TYPE, new BString(ArtemisConstants.MULTICAST)); + } else if (routingType == RoutingType.ANYCAST.getType()) { + msgConfigObj.put(ArtemisConstants.ROUTING_TYPE, new BString(ArtemisConstants.ANYCAST)); + } + } + private ArtemisUtils() { } diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/connection/CreateConnection.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/connection/CreateConnection.java index 2193296f1b00..ea6cfb07f775 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/connection/CreateConnection.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/connection/CreateConnection.java @@ -105,7 +105,7 @@ public void execute(Context context) { connection.addNativeData(ArtemisConstants.ARTEMIS_SESSION_FACTORY, factory); } catch (Exception e) { //catching Exception as it is thrown by the createSessionFactory method - ArtemisUtils.logAndSetError("Error occurred while starting connection.", context, e, logger); + ArtemisUtils.throwException("Error occurred while starting connection.", context, e, logger); } } } diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/ArtemisMessageHandler.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/ArtemisMessageHandler.java new file mode 100644 index 000000000000..d61be787a9a1 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/ArtemisMessageHandler.java @@ -0,0 +1,273 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.ballerinalang.messaging.artemis.externimpl.consumer; + + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.MessageHandler; +import org.apache.activemq.artemis.reader.MapMessageUtil; +import org.apache.activemq.artemis.reader.TextMessageUtil; +import org.ballerinalang.connector.api.BLangConnectorSPIUtil; +import org.ballerinalang.connector.api.Executor; +import org.ballerinalang.connector.api.ParamDetail; +import org.ballerinalang.connector.api.Resource; +import org.ballerinalang.messaging.artemis.ArtemisConnectorException; +import org.ballerinalang.messaging.artemis.ArtemisConstants; +import org.ballerinalang.messaging.artemis.ArtemisUtils; +import org.ballerinalang.model.types.BArrayType; +import org.ballerinalang.model.types.BMapType; +import org.ballerinalang.model.types.BStructureType; +import org.ballerinalang.model.types.BType; +import org.ballerinalang.model.types.BTypes; +import org.ballerinalang.model.types.TypeTags; +import org.ballerinalang.model.util.JSONUtils; +import org.ballerinalang.model.util.JsonParser; +import org.ballerinalang.model.util.XMLNodeType; +import org.ballerinalang.model.util.XMLUtils; +import org.ballerinalang.model.values.BBoolean; +import org.ballerinalang.model.values.BByte; +import org.ballerinalang.model.values.BFloat; +import org.ballerinalang.model.values.BInteger; +import org.ballerinalang.model.values.BMap; +import org.ballerinalang.model.values.BString; +import org.ballerinalang.model.values.BValue; +import org.ballerinalang.model.values.BValueArray; +import org.ballerinalang.model.values.BXML; +import org.ballerinalang.util.codegen.ProgramFile; +import org.ballerinalang.util.exceptions.BallerinaException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +/** + * Handler for Artemis messages. + * + * @since 0.995 + */ +class ArtemisMessageHandler implements MessageHandler { + private static final Logger logger = LoggerFactory.getLogger(ArtemisMessageHandler.class); + + private Resource onMessageResource; + private BMap sessionObj; + private boolean autoAck; + + ArtemisMessageHandler(Resource onMessageResource, BMap sessionObj, boolean autoAck) { + this.onMessageResource = onMessageResource; + this.sessionObj = sessionObj; + this.autoAck = autoAck; + } + + @Override + public void onMessage(ClientMessage clientMessage) { + List paramDetails = onMessageResource.getParamDetails(); + int paramSize = paramDetails.size(); + if (paramSize > 1) { + dispatchResourceWithDataBinding(clientMessage, paramDetails); + } else { + dispatchResource(clientMessage, createAndGetMessageObj(onMessageResource, clientMessage, sessionObj)); + } + + } + + private void dispatchResourceWithDataBinding(ClientMessage clientMessage, List paramDetails) { + BValue[] bValues = new BValue[paramDetails.size()]; + try { + bValues[1] = getContentForType(clientMessage, paramDetails.get(1).getVarType()); + bValues[0] = createAndGetMessageObj(onMessageResource, clientMessage, sessionObj); + dispatchResource(clientMessage, bValues); + } catch (BallerinaException ex) { + logger.error("The message received do not match the resource signature", ex); + } + } + + private void dispatchResource(ClientMessage clientMessage, BValue... bValues) { + // A CountDownLatch is used to prevent multiple resources executing in parallel and hence preventing the use + // of the same session in multiple threads concurrently (Error AMQ212051). + CountDownLatch countDownLatch = new CountDownLatch(1); + Executor.submit(onMessageResource, new ArtemisResourceCallback(clientMessage, autoAck, sessionObj, + countDownLatch), null, null, bValues); + try { + countDownLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + private BValue createAndGetMessageObj(Resource onMessageResource, ClientMessage clientMessage, + BMap sessionObj) { + ProgramFile programFile = onMessageResource.getResourceInfo().getPackageInfo().getProgramFile(); + BMap messageObj = BLangConnectorSPIUtil.createBStruct( + programFile, ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS, ArtemisConstants.MESSAGE_OBJ); + ArtemisUtils.populateMessageObj(clientMessage, + sessionObj.getNativeData(ArtemisConstants.ARTEMIS_TRANSACTION_CONTEXT), + messageObj); + return messageObj; + } + + private BValue getContentForType(ClientMessage message, BType dataType) { + int dataTypeTag = dataType.getTag(); + byte messageType = message.getType(); + switch (dataTypeTag) { + case TypeTags.STRING_TAG: + return new BString(getBodyText(message, messageType)); + case TypeTags.JSON_TAG: + return JsonParser.parse(getBodyText(message, messageType)); + case TypeTags.XML_TAG: + BXML bxml = XMLUtils.parse(getBodyText(message, messageType)); + if (bxml.getNodeType() != XMLNodeType.ELEMENT) { + throw new ArtemisConnectorException("Invalid XML data"); + } + return bxml; + case TypeTags.RECORD_TYPE_TAG: + return JSONUtils.convertJSONToStruct(JsonParser.parse(getBodyText(message, messageType)), + (BStructureType) dataType); + case TypeTags.ARRAY_TAG: + if (((BArrayType) dataType).getElementType().getTag() == TypeTags.BYTE_TAG) { + validateBytesContentType(messageType); + return ArtemisUtils.getBArrayValue(message); + } else { + throw new ArtemisConnectorException("Only byte[] is supported."); + } + case TypeTags.MAP_TAG: + validateMapContentType(messageType); + int constrainedType = ((BMapType) dataType).getConstrainedType().getTag(); + return createAndGetMapContent(message, constrainedType); + default: + throw new ArtemisConnectorException( + "The content type of the message received does not match the resource signature type."); + } + } + + private BValue createAndGetMapContent(ClientMessage message, int constrainedType) { + Map map; + BMap mapObj; + switch (constrainedType) { + case TypeTags.STRING_TAG: + map = getStringObjectMap(message); + mapObj = new BMap<>(new BMapType(BTypes.typeString)); + for (Map.Entry entry : map.entrySet()) { + Object value = entry.getValue(); + if (value instanceof String) { + mapObj.put(entry.getKey(), new BString((String) value)); + } else { + throw new ArtemisConnectorException("The map has other than string data"); + } + } + return mapObj; + case TypeTags.INT_TAG: + map = getStringObjectMap(message); + mapObj = new BMap<>(new BMapType(BTypes.typeInt)); + for (Map.Entry entry : map.entrySet()) { + Object value = entry.getValue(); + if (value instanceof Integer || value instanceof Long || value instanceof Short) { + mapObj.put(entry.getKey(), new BInteger((long) value)); + } else { + throw new ArtemisConnectorException("The map has other than int data"); + } + } + return mapObj; + case TypeTags.FLOAT_TAG: + map = getStringObjectMap(message); + mapObj = new BMap<>(new BMapType(BTypes.typeFloat)); + for (Map.Entry entry : map.entrySet()) { + Object value = entry.getValue(); + if (value instanceof Float || value instanceof Double) { + mapObj.put(entry.getKey(), new BFloat((double) value)); + } else { + throw new ArtemisConnectorException("The map has other than float data"); + } + } + return mapObj; + case TypeTags.BYTE_TAG: + map = getStringObjectMap(message); + mapObj = new BMap<>(new BMapType(BTypes.typeByte)); + for (Map.Entry entry : map.entrySet()) { + Object value = entry.getValue(); + if (value instanceof Byte) { + mapObj.put(entry.getKey(), new BByte((byte) value)); + } else { + throw new ArtemisConnectorException("The map has other than byte data"); + } + } + return mapObj; + case TypeTags.ARRAY_TAG: + map = getStringObjectMap(message); + mapObj = new BMap<>(new BMapType(BTypes.fromString("byte[]"))); + for (Map.Entry entry : map.entrySet()) { + Object value = entry.getValue(); + if (value instanceof byte[]) { + mapObj.put(entry.getKey(), new BValueArray((byte[]) value)); + } else { + throw new ArtemisConnectorException("The map has other than byte[] data"); + } + } + return mapObj; + case TypeTags.BOOLEAN_TAG: + map = getStringObjectMap(message); + mapObj = new BMap<>(new BMapType(BTypes.typeBoolean)); + for (Map.Entry entry : map.entrySet()) { + Object value = entry.getValue(); + if (value instanceof Boolean) { + mapObj.put(entry.getKey(), new BBoolean((boolean) value)); + } else { + throw new ArtemisConnectorException("The map has other than boolean data"); + } + } + return mapObj; + default: + throw new ArtemisConnectorException( + "The content type of the message received does not match the provided type."); + } + } + + private Map getStringObjectMap(ClientMessage message) { + return MapMessageUtil.readBodyMap(message.getBodyBuffer()).getMap(); + } + + private String getBodyText(ClientMessage message, byte messageType) { + validateTextContentType(messageType); + return TextMessageUtil.readBodyText(message.getBodyBuffer()).toString(); + } + + private void validateTextContentType(byte messageType) { + if (messageType != Message.TEXT_TYPE) { + throw new ArtemisConnectorException( + "Invalid resource signature. Message received does not have text content"); + } + } + + private void validateMapContentType(byte messageType) { + if (messageType != Message.MAP_TYPE) { + throw new ArtemisConnectorException( + "Invalid resource signature. Message received does not have map content"); + } + } + + private void validateBytesContentType(byte messageType) { + if (messageType != Message.BYTES_TYPE && messageType != Message.DEFAULT_TYPE) { + throw new ArtemisConnectorException( + "Invalid resource signature. Message received does not have map content"); + } + } +} diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/ArtemisResourceCallback.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/ArtemisResourceCallback.java new file mode 100644 index 000000000000..d660e33d20d4 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/ArtemisResourceCallback.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.ballerinalang.messaging.artemis.externimpl.consumer; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.ballerinalang.bre.bvm.BLangVMErrors; +import org.ballerinalang.bre.bvm.CallableUnitCallback; +import org.ballerinalang.messaging.artemis.ArtemisConnectorException; +import org.ballerinalang.messaging.artemis.ArtemisConstants; +import org.ballerinalang.messaging.artemis.ArtemisUtils; +import org.ballerinalang.model.values.BError; +import org.ballerinalang.model.values.BMap; +import org.ballerinalang.model.values.BValue; +import org.ballerinalang.services.ErrorHandlerUtils; + +import java.util.concurrent.CountDownLatch; + +/** + * The resource call back implementation for Artemis async consumer. + * + * @since 0.995 + */ +public class ArtemisResourceCallback implements CallableUnitCallback { + private ClientMessage message; + private boolean autoAck; + private BMap sessionObj; + private CountDownLatch countDownLatch; + + ArtemisResourceCallback(ClientMessage message, boolean autoAck, BMap sessionObj, + CountDownLatch countDownLatch) { + this.message = message; + this.autoAck = autoAck; + this.sessionObj = sessionObj; + this.countDownLatch = countDownLatch; + } + + @Override + public void notifySuccess() { + try { + if (autoAck) { + try { + message.acknowledge(); + if (ArtemisUtils.isAnonymousSession(sessionObj)) { + ((ClientSession) sessionObj.getNativeData(ArtemisConstants.ARTEMIS_SESSION)).commit(); + } + } catch (ActiveMQException e) { + throw new ArtemisConnectorException("Failure during acknowledging the message", e); + } + } + } finally { + countDownLatch.countDown(); + } + } + + @Override + public void notifyFailure(BError error) { + countDownLatch.countDown(); + ErrorHandlerUtils.printError("error: " + BLangVMErrors.getPrintableStackTrace(error)); + } +} diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/CreateConsumer.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/CreateConsumer.java index b0f9d64b6abd..5c5b6287b0e0 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/CreateConsumer.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/CreateConsumer.java @@ -42,10 +42,14 @@ */ @BallerinaFunction( - orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + orgName = ArtemisConstants.BALLERINA, + packageName = ArtemisConstants.ARTEMIS, functionName = "createConsumer", - receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.CONSUMER_OBJ, - structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS) + receiver = @Receiver( + type = TypeKind.OBJECT, + structType = ArtemisConstants.CONSUMER_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS + ) ) public class CreateConsumer extends BlockingNativeCallableUnit { private static final Logger logger = LoggerFactory.getLogger(CreateConsumer.class); diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/CreateServiceConsumer.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/CreateServiceConsumer.java index fe951e3de1fa..2192a4b4628b 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/CreateServiceConsumer.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/CreateServiceConsumer.java @@ -22,16 +22,11 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; -import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.ballerinalang.bre.Context; -import org.ballerinalang.bre.bvm.BLangVMErrors; import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit; -import org.ballerinalang.bre.bvm.CallableUnitCallback; import org.ballerinalang.connector.api.Annotation; import org.ballerinalang.connector.api.BLangConnectorSPIUtil; -import org.ballerinalang.connector.api.BallerinaConnectorException; -import org.ballerinalang.connector.api.Executor; import org.ballerinalang.connector.api.Resource; import org.ballerinalang.connector.api.Service; import org.ballerinalang.connector.api.Struct; @@ -39,16 +34,14 @@ import org.ballerinalang.messaging.artemis.ArtemisConstants; import org.ballerinalang.messaging.artemis.ArtemisUtils; import org.ballerinalang.model.types.TypeKind; -import org.ballerinalang.model.values.BError; import org.ballerinalang.model.values.BMap; import org.ballerinalang.model.values.BValue; import org.ballerinalang.natives.annotations.BallerinaFunction; import org.ballerinalang.natives.annotations.Receiver; -import org.ballerinalang.services.ErrorHandlerUtils; -import org.ballerinalang.util.codegen.ProgramFile; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.PrintStream; import java.util.List; import java.util.Map; @@ -59,13 +52,18 @@ */ @BallerinaFunction( - orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + orgName = ArtemisConstants.BALLERINA, + packageName = ArtemisConstants.ARTEMIS, functionName = "createConsumer", - receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.LISTENER_OBJ, - structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS) + receiver = @Receiver( + type = TypeKind.OBJECT, + structType = ArtemisConstants.LISTENER_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS + ) ) public class CreateServiceConsumer extends BlockingNativeCallableUnit { private static final Logger logger = LoggerFactory.getLogger(CreateServiceConsumer.class); + private static final PrintStream console = System.out; @Override public void execute(Context context) { @@ -101,14 +99,12 @@ public void execute(Context context) { addressName, autoCreated, routingType, temporary, queueFilter, durable, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, logger); + console.println("[" + ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS + "] Client Consumer created for queue " + + queueName); Resource onMessageResource = service.getResources()[0]; if (onMessageResource != null) { - consumer.setMessageHandler( - clientMessage -> Executor - .execute(onMessageResource, new ResponseCallback(clientMessage, autoAck, sessionObj), - null, null, - getSignatureParameters(onMessageResource, clientMessage, sessionObj))); + consumer.setMessageHandler(new ArtemisMessageHandler(onMessageResource, sessionObj, autoAck)); } } catch (ActiveMQException e) { context.setReturnValues(ArtemisUtils.getError(context, e)); @@ -134,46 +130,4 @@ private Annotation getServiceConfigAnnotation(Service service) { } return annotationList.isEmpty() ? null : annotationList.get(0); } - - private static class ResponseCallback implements CallableUnitCallback { - private ClientMessage message; - private boolean autoAck; - private BMap sessionObj; - - ResponseCallback(ClientMessage message, boolean autoAck, BMap sessionObj) { - this.message = message; - this.autoAck = autoAck; - this.sessionObj = sessionObj; - } - - @Override - public void notifySuccess() { - if (autoAck) { - try { - message.acknowledge(); - if (ArtemisUtils.isAnonymousSession(sessionObj)) { - ((ClientSession) sessionObj.getNativeData(ArtemisConstants.ARTEMIS_SESSION)).commit(); - } - } catch (ActiveMQException e) { - throw new BallerinaConnectorException("Failure during acknowledging the message", e); - } - } - } - - @Override - public void notifyFailure(BError error) { - ErrorHandlerUtils.printError("error: " + BLangVMErrors.getPrintableStackTrace(error)); - } - } - - private BValue getSignatureParameters(Resource onMessageResource, ClientMessage clientMessage, - BMap sessionObj) { - ProgramFile programFile = onMessageResource.getResourceInfo().getPackageInfo().getProgramFile(); - BMap messageObj = BLangConnectorSPIUtil.createBStruct( - programFile, ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS, ArtemisConstants.MESSAGE_OBJ); - messageObj.addNativeData(ArtemisConstants.ARTEMIS_TRANSACTION_CONTEXT, - sessionObj.getNativeData(ArtemisConstants.ARTEMIS_TRANSACTION_CONTEXT)); - messageObj.addNativeData(ArtemisConstants.ARTEMIS_MESSAGE, clientMessage); - return messageObj; - } } diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/Receive.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/Receive.java index 83590fcd79b5..188280470ad3 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/Receive.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/Receive.java @@ -73,7 +73,7 @@ public void execute(Context context) { ClientMessage clientMessage = consumer.receive(timeInMilliSeconds); BMap messageObj = BLangConnectorSPIUtil.createBStruct( context, ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS, ArtemisConstants.MESSAGE_OBJ); - messageObj.addNativeData(ArtemisConstants.ARTEMIS_MESSAGE, clientMessage); + ArtemisUtils.populateMessageObj(clientMessage, transactionContext, messageObj); if (autoAck) { clientMessage.acknowledge(); if (transactionContext != null) { diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/Start.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/Start.java index 21b5edfd3d31..32eb52fea31c 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/Start.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/Start.java @@ -40,10 +40,14 @@ */ @BallerinaFunction( - orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + orgName = ArtemisConstants.BALLERINA, + packageName = ArtemisConstants.ARTEMIS, functionName = "start", - receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.LISTENER_OBJ, - structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS) + receiver = @Receiver( + type = TypeKind.OBJECT, + structType = ArtemisConstants.LISTENER_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS + ) ) public class Start extends BlockingNativeCallableUnit { private CountDownLatch countDownLatch = new CountDownLatch(1); diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/Stop.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/Stop.java index 025cfcdc0891..a97dbce44475 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/Stop.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/Stop.java @@ -40,10 +40,14 @@ */ @BallerinaFunction( - orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + orgName = ArtemisConstants.BALLERINA, + packageName = ArtemisConstants.ARTEMIS, functionName = "stop", - receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.LISTENER_OBJ, - structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS) + receiver = @Receiver( + type = TypeKind.OBJECT, + structType = ArtemisConstants.LISTENER_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS + ) ) public class Stop extends BlockingNativeCallableUnit { diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/CreateMessage.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/CreateMessage.java index ea1fe095f466..a6163ecae1a8 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/CreateMessage.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/CreateMessage.java @@ -45,6 +45,8 @@ import org.ballerinalang.natives.annotations.Receiver; import org.ballerinalang.stdlib.io.channels.base.Channel; import org.ballerinalang.stdlib.io.utils.IOConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Map; @@ -55,17 +57,33 @@ */ @BallerinaFunction( - orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + orgName = ArtemisConstants.BALLERINA, + packageName = ArtemisConstants.ARTEMIS, functionName = "createMessage", - receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ, - structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + receiver = @Receiver( + type = TypeKind.OBJECT, + structType = ArtemisConstants.MESSAGE_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS + ), args = { - @Argument(name = "session", type = TypeKind.OBJECT, structType = ArtemisConstants.SESSION_OBJ), - @Argument(name = "data", type = TypeKind.UNION), - @Argument(name = "config", type = TypeKind.RECORD, structType = "ConnectionConfiguration") + @Argument( + name = "session", + type = TypeKind.OBJECT, + structType = ArtemisConstants.SESSION_OBJ + ), + @Argument( + name = "data", + type = TypeKind.UNION + ), + @Argument( + name = "config", + type = TypeKind.RECORD, + structType = "ConnectionConfiguration" + ) } ) public class CreateMessage extends BlockingNativeCallableUnit { + private static final Logger logger = LoggerFactory.getLogger(CreateMessage.class); @Override public void execute(Context context) { @@ -79,11 +97,15 @@ public void execute(Context context) { @SuppressWarnings(ArtemisConstants.UNCHECKED) BMap configObj = (BMap) context.getRefArgument(3); - long expiration = getIntFromIntOrNil(configObj.get(ArtemisConstants.EXPIRATION), 0); - long timeStamp = getIntFromIntOrNil(configObj.get(ArtemisConstants.TIME_STAMP), System.currentTimeMillis()); + long expiration = ((BInteger) configObj.get(ArtemisConstants.EXPIRATION)).intValue(); + long timeStamp = ((BInteger) configObj.get(ArtemisConstants.TIME_STAMP)).intValue(); byte priority = (byte) ((BByte) configObj.get(ArtemisConstants.PRIORITY)).byteValue(); boolean durable = ((BBoolean) configObj.get(ArtemisConstants.DURABLE)).booleanValue(); BValue routingType = configObj.get(ArtemisConstants.ROUTING_TYPE); + BValue groupId = configObj.get(ArtemisConstants.GROUP_ID); + int groupSequence = ArtemisUtils.getIntFromConfig(configObj, ArtemisConstants.GROUP_SEQUENCE, logger); + BValue correlationId = configObj.get(ArtemisConstants.CORRELATION_ID); + BValue replyTo = configObj.get(ArtemisConstants.REPLY_TO); ClientSession session = (ClientSession) sessionObj.getNativeData(ArtemisConstants.ARTEMIS_SESSION); @@ -92,7 +114,16 @@ public void execute(Context context) { if (routingType instanceof BString) { message.setRoutingType(ArtemisUtils.getRoutingTypeFromString(routingType.stringValue())); } - + if (groupId instanceof BString) { + message.setGroupID(groupId.stringValue()); + } + message.setGroupSequence(groupSequence); + if (correlationId instanceof BString) { + message.setCorrelationID(correlationId.stringValue()); + } + if (replyTo instanceof BString) { + message.setReplyTo(new SimpleString(replyTo.stringValue())); + } if (messageType == Message.TEXT_TYPE) { TextMessageUtil.writeBodyText(message.getBodyBuffer(), new SimpleString(dataVal.stringValue())); } else if (messageType == Message.BYTES_TYPE) { @@ -144,11 +175,4 @@ private byte getMessageType(String type) { return Message.DEFAULT_TYPE; } } - - private long getIntFromIntOrNil(BValue value, long defaultVal) { - if (value instanceof BInteger) { - return ((BInteger) value).intValue(); - } - return defaultVal; - } } diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetBodySize.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetBodySize.java index a4fc0da430b2..e04d395db19a 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetBodySize.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetBodySize.java @@ -37,10 +37,14 @@ */ @BallerinaFunction( - orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + orgName = ArtemisConstants.BALLERINA, + packageName = ArtemisConstants.ARTEMIS, functionName = "getBodySize", - receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ, - structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + receiver = @Receiver( + type = TypeKind.OBJECT, + structType = ArtemisConstants.MESSAGE_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS + ), isPublic = true ) public class GetBodySize extends BlockingNativeCallableUnit { diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetPayload.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetPayload.java index 00ee4d3d030c..166d5da89d7c 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetPayload.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetPayload.java @@ -22,7 +22,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.client.ClientMessage; -import org.apache.activemq.artemis.reader.BytesMessageUtil; import org.apache.activemq.artemis.reader.MapMessageUtil; import org.apache.activemq.artemis.reader.TextMessageUtil; import org.apache.activemq.artemis.utils.collections.TypedProperties; @@ -36,7 +35,6 @@ import org.ballerinalang.model.values.BMap; import org.ballerinalang.model.values.BString; import org.ballerinalang.model.values.BValue; -import org.ballerinalang.model.values.BValueArray; import org.ballerinalang.natives.annotations.Argument; import org.ballerinalang.natives.annotations.BallerinaFunction; import org.ballerinalang.natives.annotations.Receiver; @@ -50,12 +48,19 @@ */ @BallerinaFunction( - orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + orgName = ArtemisConstants.BALLERINA, + packageName = ArtemisConstants.ARTEMIS, functionName = "getPayload", - receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ, - structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + receiver = @Receiver( + type = TypeKind.OBJECT, + structType = ArtemisConstants.MESSAGE_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS + ), args = { - @Argument(name = "key", type = TypeKind.STRING) + @Argument( + name = "key", + type = TypeKind.STRING + ) } ) public class GetPayload extends BlockingNativeCallableUnit { @@ -70,10 +75,7 @@ public void execute(Context context) { ActiveMQBuffer msgBuffer = message.getBodyBuffer(); context.setReturnValues(new BString(TextMessageUtil.readBodyText(msgBuffer).toString())); } else if (messageType == Message.BYTES_TYPE || messageType == Message.DEFAULT_TYPE) { - ActiveMQBuffer msgBuffer = message.getBodyBuffer(); - byte[] bytes = new byte[msgBuffer.readableBytes()]; - BytesMessageUtil.bytesReadBytes(msgBuffer, bytes); - context.setReturnValues(new BValueArray(bytes)); + context.setReturnValues(ArtemisUtils.getBArrayValue(message)); } else if (messageType == Message.MAP_TYPE) { ActiveMQBuffer msgBuffer = message.getBodyBuffer(); TypedProperties properties = MapMessageUtil.readBodyMap(msgBuffer); diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetProperty.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetProperty.java index 6d09d269126e..4aa6723b59d8 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetProperty.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetProperty.java @@ -38,12 +38,19 @@ */ @BallerinaFunction( - orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + orgName = ArtemisConstants.BALLERINA, + packageName = ArtemisConstants.ARTEMIS, functionName = "getProperty", - receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ, - structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + receiver = @Receiver( + type = TypeKind.OBJECT, + structType = ArtemisConstants.MESSAGE_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS + ), args = { - @Argument(name = "key", type = TypeKind.STRING) + @Argument( + name = "key", + type = TypeKind.STRING + ) } ) public class GetProperty extends BlockingNativeCallableUnit { diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetType.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetType.java index 998e9384901d..dba6486b2410 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetType.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetType.java @@ -28,7 +28,6 @@ import org.ballerinalang.model.values.BMap; import org.ballerinalang.model.values.BString; import org.ballerinalang.model.values.BValue; -import org.ballerinalang.natives.annotations.Argument; import org.ballerinalang.natives.annotations.BallerinaFunction; import org.ballerinalang.natives.annotations.Receiver; @@ -39,13 +38,14 @@ */ @BallerinaFunction( - orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + orgName = ArtemisConstants.BALLERINA, + packageName = ArtemisConstants.ARTEMIS, functionName = "getType", - receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ, - structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), - args = { - @Argument(name = "key", type = TypeKind.STRING) - } + receiver = @Receiver( + type = TypeKind.OBJECT, + structType = ArtemisConstants.MESSAGE_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS + ) ) public class GetType extends BlockingNativeCallableUnit { diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/PutProperty.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/PutProperty.java index 057492b3d594..d884655f28c0 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/PutProperty.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/PutProperty.java @@ -44,13 +44,23 @@ */ @BallerinaFunction( - orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + orgName = ArtemisConstants.BALLERINA, + packageName = ArtemisConstants.ARTEMIS, functionName = "putProperty", - receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ, - structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + receiver = @Receiver( + type = TypeKind.OBJECT, + structType = ArtemisConstants.MESSAGE_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS + ), args = { - @Argument(name = "key", type = TypeKind.STRING), - @Argument(name = "value", type = TypeKind.UNION) + @Argument( + name = "key", + type = TypeKind.STRING + ), + @Argument( + name = "value", + type = TypeKind.UNION + ) } ) public class PutProperty extends BlockingNativeCallableUnit { diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/SaveToWritableByteChannel.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/SaveToWritableByteChannel.java index 9adf477ca933..8b081dd2a7e0 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/SaveToWritableByteChannel.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/SaveToWritableByteChannel.java @@ -44,12 +44,20 @@ */ @BallerinaFunction( - orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + orgName = ArtemisConstants.BALLERINA, + packageName = ArtemisConstants.ARTEMIS, functionName = "saveToWritableByteChannel", - receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ, - structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + receiver = @Receiver( + type = TypeKind.OBJECT, + structType = ArtemisConstants.MESSAGE_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS + ), args = { - @Argument(name = "ch", type = TypeKind.OBJECT, structType = "WritableByteChannel") + @Argument( + name = "ch", + type = TypeKind.OBJECT, + structType = "WritableByteChannel" + ) } ) public class SaveToWritableByteChannel extends BlockingNativeCallableUnit { diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/producer/CreateProducer.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/producer/CreateProducer.java index 202be66e1af6..846df18ba236 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/producer/CreateProducer.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/producer/CreateProducer.java @@ -45,14 +45,28 @@ */ @BallerinaFunction( - orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + orgName = ArtemisConstants.BALLERINA, + packageName = ArtemisConstants.ARTEMIS, functionName = "createProducer", - receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.PRODUCER_OBJ, - structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + receiver = @Receiver( + type = TypeKind.OBJECT, + structType = ArtemisConstants.PRODUCER_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS + ), args = { - @Argument(name = "addressName", type = TypeKind.STRING), - @Argument(name = "config", type = TypeKind.RECORD, structType = "AddressConfiguration"), - @Argument(name = "rate", type = TypeKind.INT) + @Argument( + name = "addressName", + type = TypeKind.STRING + ), + @Argument( + name = "config", + type = TypeKind.RECORD, + structType = "AddressConfiguration" + ), + @Argument( + name = "rate", + type = TypeKind.INT + ) } ) public class CreateProducer implements NativeCallableUnit { @@ -92,7 +106,7 @@ public void execute(Context context, CallableUnitCallback callableUnitCallback) producerObj.addNativeData(ArtemisConstants.ARTEMIS_PRODUCER, producer); } catch (ActiveMQException ex) { - ArtemisUtils.logAndSetError("Error occurred while creating the producer.", context, ex, logger); + ArtemisUtils.throwException("Error occurred while creating the producer.", context, ex, logger); } } diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/producer/IsClosed.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/producer/IsClosed.java index a60dcea7fe0c..f3ce81887d89 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/producer/IsClosed.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/producer/IsClosed.java @@ -37,10 +37,14 @@ */ @BallerinaFunction( - orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + orgName = ArtemisConstants.BALLERINA, + packageName = ArtemisConstants.ARTEMIS, functionName = "isClosed", - receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.PRODUCER_OBJ, - structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + receiver = @Receiver( + type = TypeKind.OBJECT, + structType = ArtemisConstants.PRODUCER_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS + ), isPublic = true ) public class IsClosed extends BlockingNativeCallableUnit { diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/session/CreateSession.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/session/CreateSession.java index 6116938e9a99..165f8068f64a 100644 --- a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/session/CreateSession.java +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/session/CreateSession.java @@ -46,13 +46,25 @@ */ @BallerinaFunction( - orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + orgName = ArtemisConstants.BALLERINA, + packageName = ArtemisConstants.ARTEMIS, functionName = "createSession", - receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.SESSION_OBJ, - structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + receiver = @Receiver( + type = TypeKind.OBJECT, + structType = ArtemisConstants.SESSION_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS + ), args = { - @Argument(name = "con", type = TypeKind.OBJECT, structType = ArtemisConstants.CONNECTION_OBJ), - @Argument(name = "config", type = TypeKind.RECORD, structType = "SessionConfiguration") + @Argument( + name = "con", + type = TypeKind.OBJECT, + structType = ArtemisConstants.CONNECTION_OBJ + ), + @Argument( + name = "config", + type = TypeKind.RECORD, + structType = "SessionConfiguration" + ) } ) public class CreateSession extends BlockingNativeCallableUnit { @@ -93,7 +105,7 @@ public void execute(Context context) { new ArtemisTransactionContext(sessionObj)); } } catch (ActiveMQException e) { - ArtemisUtils.logAndSetError("Error occurred while starting session", context, e, logger); + ArtemisUtils.throwException("Error occurred while starting session", context, e, logger); } } } diff --git a/stdlib/messaging/activemq-artemis/src/test/java/org/ballerinalang/messaging/artemis/ArtemisCompilationTest.java b/stdlib/messaging/activemq-artemis/src/test/java/org/ballerinalang/messaging/artemis/ArtemisCompilationTest.java index 5cb63f93bc93..66c8251df19d 100644 --- a/stdlib/messaging/activemq-artemis/src/test/java/org/ballerinalang/messaging/artemis/ArtemisCompilationTest.java +++ b/stdlib/messaging/activemq-artemis/src/test/java/org/ballerinalang/messaging/artemis/ArtemisCompilationTest.java @@ -37,16 +37,14 @@ public class ArtemisCompilationTest { @Test(description = "Successfully compiling Artemis service") public void testValidService() { - CompileResult compileResult = BCompileUtil.compile(TEST_PATH.resolve("artemis_success.bal").toAbsolutePath() - .toString()); + CompileResult compileResult = getCompileResult("artemis_success.bal"); Assert.assertEquals(compileResult.toString(), "Compilation Successful"); } @Test(description = "More than expected number of resources in the service") public void testMoreResourcesInService() { - CompileResult compileResult = BCompileUtil.compile(TEST_PATH.resolve("artemis_more_resources.bal") - .toAbsolutePath().toString()); + CompileResult compileResult = getCompileResult("artemis_more_resources.bal"); assertExpectedDiagnosticsLength(compileResult); BAssertUtil.validateError(compileResult, 0, "Only one resource is allowed in the service", 24, 1); @@ -54,8 +52,7 @@ public void testMoreResourcesInService() { @Test(description = "More than expected number of annotations for the service") public void testMoreServiceAnnotationsForService() { - CompileResult compileResult = BCompileUtil.compile(TEST_PATH.resolve("artemis_more_service_annotation.bal") - .toAbsolutePath().toString()); + CompileResult compileResult = getCompileResult("artemis_multiple_service_annotation.bal"); assertExpectedDiagnosticsLength(compileResult); BAssertUtil.validateError(compileResult, 0, "There cannot be more than one Artemis service annotations", 29, 1); @@ -63,8 +60,7 @@ public void testMoreServiceAnnotationsForService() { @Test(description = "One service annotation is mandatory") public void testMandatoryServiceAnnotation() { - CompileResult compileResult = BCompileUtil.compile(TEST_PATH.resolve("artemis_mandatory_service_annotation.bal") - .toAbsolutePath().toString()); + CompileResult compileResult = getCompileResult("artemis_mandatory_service_annotation.bal"); assertExpectedDiagnosticsLength(compileResult); BAssertUtil.validateError(compileResult, 0, @@ -73,8 +69,7 @@ public void testMandatoryServiceAnnotation() { @Test(description = "Resource returns can only be error or nil") public void testResourceReturn() { - CompileResult compileResult = BCompileUtil.compile(TEST_PATH.resolve("artemis_resource_return.bal") - .toAbsolutePath().toString()); + CompileResult compileResult = getCompileResult("artemis_resource_return.bal"); assertExpectedDiagnosticsLength(compileResult); BAssertUtil.validateError(compileResult, 0, "Invalid return type: expected error?", 25, 5); @@ -82,30 +77,27 @@ public void testResourceReturn() { @Test(description = "Resource without resource parameters") public void testNoResourceParams() { - CompileResult compileResult = BCompileUtil.compile( - TEST_PATH.resolve("artemis_no_resource_params.bal").toAbsolutePath().toString()); + CompileResult compileResult = getCompileResult("artemis_no_resource_params.bal"); assertExpectedDiagnosticsLength(compileResult); BAssertUtil.validateError(compileResult, 0, "Invalid resource signature for onMsg resource: Unexpected parameter count(expected" + - " parameter count = 1)", 25, 5); + " parameter count 1 or 2)", 25, 5); } @Test(description = "Resource with multiple resource parameters") public void testMultipleResourceParams() { - CompileResult compileResult = BCompileUtil.compile( - TEST_PATH.resolve("artemis_more_resource_params.bal").toAbsolutePath().toString()); + CompileResult compileResult = getCompileResult("artemis_more_resource_params.bal"); assertExpectedDiagnosticsLength(compileResult); BAssertUtil.validateError(compileResult, 0, "Invalid resource signature for onMsg resource: Unexpected parameter count(expected" + - " parameter count = 1)", 25, 5); + " parameter count 1 or 2)", 25, 5); } @Test(description = "Invalid resource parameters") public void testDifferentResourceParams() { - CompileResult compileResult = BCompileUtil.compile( - TEST_PATH.resolve("artemis_different_resource_param.bal").toAbsolutePath().toString()); + CompileResult compileResult = getCompileResult("artemis_different_resource_param.bal"); assertExpectedDiagnosticsLength(compileResult); BAssertUtil.validateError( @@ -113,6 +105,10 @@ public void testDifferentResourceParams() { "Invalid resource signature for xyz resource: The first parameter should be an artemis:Message", 25, 5); } + private CompileResult getCompileResult(String s) { + return BCompileUtil.compile(TEST_PATH.resolve(s).toAbsolutePath().toString()); + } + private void assertExpectedDiagnosticsLength(CompileResult compileResult) { Assert.assertEquals(compileResult.getDiagnostics().length, 1); } diff --git a/stdlib/messaging/activemq-artemis/src/test/java/org/ballerinalang/messaging/artemis/ArtemisDataBindingCompilationTest.java b/stdlib/messaging/activemq-artemis/src/test/java/org/ballerinalang/messaging/artemis/ArtemisDataBindingCompilationTest.java new file mode 100644 index 000000000000..1b945bd76ce6 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/test/java/org/ballerinalang/messaging/artemis/ArtemisDataBindingCompilationTest.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.ballerinalang.messaging.artemis; + +import org.ballerinalang.launcher.util.BAssertUtil; +import org.ballerinalang.launcher.util.BCompileUtil; +import org.ballerinalang.launcher.util.CompileResult; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.nio.file.Path; +import java.nio.file.Paths; + +/** + * Test Artemis Listener Service Compilation. + */ +public class ArtemisDataBindingCompilationTest { + private static final Path TEST_PATH = Paths.get("src", "test", "resources", "test-src", "data-binding"); + + @Test(description = "Successfully compiling Artemis services") + public void testValidService() { + CompileResult compileResult = BCompileUtil.compile(TEST_PATH.resolve("artemis_success.bal").toAbsolutePath() + .toString()); + + Assert.assertEquals(compileResult.toString(), "Compilation Successful"); + } + + @Test(description = "Unsupported type int[]") + public void testUnsupportedTypeInResource() { + CompileResult compileResult = BCompileUtil.compile(TEST_PATH.resolve("unsupported_type.bal") + .toAbsolutePath().toString()); + + assertExpectedDiagnosticsLength(compileResult); + BAssertUtil.validateError(compileResult, 0, + "Invalid resource signature for xyz resource in service : The second parameter " + + "should be a string, json, xml, byte[], map or a record type", + 27, 5); + } + + @Test(description = "Unsupported map type map") + public void testUnsupportedMapTypeInResource() { + CompileResult compileResult = BCompileUtil.compile(TEST_PATH.resolve("unsupported_map_type.bal") + .toAbsolutePath().toString()); + + assertExpectedDiagnosticsLength(compileResult); + BAssertUtil.validateError(compileResult, 0, + "Invalid resource signature for xyz resource in service : The second parameter " + + "should be a map of string, int, float, byte, boolean or byte[]", + 27, 5); + } + + private void assertExpectedDiagnosticsLength(CompileResult compileResult) { + Assert.assertEquals(compileResult.getDiagnostics().length, 1); + } +} diff --git a/stdlib/messaging/activemq-artemis/src/test/resources/test-src/artemis_more_resource_params.bal b/stdlib/messaging/activemq-artemis/src/test/resources/test-src/artemis_more_resource_params.bal index b709de7883e7..b2abf3b97357 100644 --- a/stdlib/messaging/activemq-artemis/src/test/resources/test-src/artemis_more_resource_params.bal +++ b/stdlib/messaging/activemq-artemis/src/test/resources/test-src/artemis_more_resource_params.bal @@ -22,6 +22,6 @@ import ballerina/artemis; } } service artemisConsumer on new artemis:Listener({host:"localhost", port:61616}) { - resource function onMsg(artemis:Message message, string text) returns error? { + resource function onMsg(artemis:Message message, string text, boolean val) returns error? { } } diff --git a/stdlib/messaging/activemq-artemis/src/test/resources/test-src/artemis_more_service_annotation.bal b/stdlib/messaging/activemq-artemis/src/test/resources/test-src/artemis_multiple_service_annotation.bal similarity index 100% rename from stdlib/messaging/activemq-artemis/src/test/resources/test-src/artemis_more_service_annotation.bal rename to stdlib/messaging/activemq-artemis/src/test/resources/test-src/artemis_multiple_service_annotation.bal diff --git a/stdlib/messaging/activemq-artemis/src/test/resources/test-src/data-binding/artemis_success.bal b/stdlib/messaging/activemq-artemis/src/test/resources/test-src/data-binding/artemis_success.bal new file mode 100644 index 000000000000..e6ca7dfc12f2 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/test/resources/test-src/data-binding/artemis_success.bal @@ -0,0 +1,144 @@ +// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/artemis; + +listener artemis:Listener artemisListener = new({host:"localhost", port:61616}); + +@artemis:ServiceConfig { + queueConfig: { + queueName: "queue1" + } +} +service artemisConsumer on artemisListener { + resource function xyz(artemis:Message message, string data) returns error? { + } +} + +@artemis:ServiceConfig { + queueConfig: { + queueName: "queue2" + } +} +service artemisConsumer2 on artemisListener { + resource function xyz(artemis:Message message, json data) returns error? { + } +} + +@artemis:ServiceConfig { + queueConfig: { + queueName: "queue3" + } +} +service artemisConsumer3 on artemisListener { + resource function xyz(artemis:Message message, xml data) returns error? { + } +} + +type Person record { + string name; + int age; +}; + +@artemis:ServiceConfig { + queueConfig: { + queueName: "queue4" + } +} +service artemisConsumer4 on artemisListener { + resource function xyz(artemis:Message message, Person data) returns error? { + } +} + +@artemis:ServiceConfig { + queueConfig: { + queueName: "queue6" + } +} +service artemisConsumer6 on artemisListener { + resource function xyz(artemis:Message message, byte[] data) returns error? { + } +} + +@artemis:ServiceConfig { + queueConfig: { + queueName: "queue7" + } +} +service artemisConsumer7 on artemisListener { + resource function xyz(artemis:Message message, map data) returns error? { + } +} + +@artemis:ServiceConfig { + queueConfig: { + queueName: "queue8" + } +} +service artemisConsumer8 on artemisListener { + resource function xyz(artemis:Message message, map data) returns error? { + } +} + +@artemis:ServiceConfig { + queueConfig: { + queueName: "queue9" + } +} +service artemisConsumer9 on artemisListener { + resource function xyz(artemis:Message message, map data) returns error? { + } +} + +@artemis:ServiceConfig { + queueConfig: { + queueName: "queue10" + } +} +service artemisConsumer10 on artemisListener { + resource function xyz(artemis:Message message, map data) returns error? { + } +} + +@artemis:ServiceConfig { + queueConfig: { + queueName: "queue11" + } +} +service artemisConsumer11 on artemisListener { + resource function xyz(artemis:Message message, map data) returns error? { + } +} + +@artemis:ServiceConfig { + queueConfig: { + queueName: "queue12" + } +} +service artemisConsumer12 on artemisListener { + resource function xyz(artemis:Message message, map data) returns error? { + } +} + +@artemis:ServiceConfig { + queueConfig: { + queueName: "queue13" + } +} +service artemisConsumer13 on artemisListener { + resource function xyz(artemis:Message message) returns error? { + } +} diff --git a/stdlib/messaging/activemq-artemis/src/test/resources/test-src/data-binding/unsupported_map_type.bal b/stdlib/messaging/activemq-artemis/src/test/resources/test-src/data-binding/unsupported_map_type.bal new file mode 100644 index 000000000000..54ac9c04c076 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/test/resources/test-src/data-binding/unsupported_map_type.bal @@ -0,0 +1,29 @@ +// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/artemis; + +listener artemis:Listener artemisListener = new({host:"localhost", port:61616}); + +@artemis:ServiceConfig { + queueConfig: { + queueName: "queue1" + } +} +service artemisConsumer on artemisListener { + resource function xyz(artemis:Message message, map data) returns error? { + } +} diff --git a/stdlib/messaging/activemq-artemis/src/test/resources/test-src/data-binding/unsupported_type.bal b/stdlib/messaging/activemq-artemis/src/test/resources/test-src/data-binding/unsupported_type.bal new file mode 100644 index 000000000000..e38d0f7f7afc --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/test/resources/test-src/data-binding/unsupported_type.bal @@ -0,0 +1,29 @@ +// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/artemis; + +listener artemis:Listener artemisListener = new({host:"localhost", port:61616}); + +@artemis:ServiceConfig { + queueConfig: { + queueName: "queue1" + } +} +service artemisConsumer on artemisListener { + resource function xyz(artemis:Message message, int[] data) returns error? { + } +} diff --git a/tests/ballerina-integration-test/src/test/java/org/ballerinalang/test/messaging/artemis/DataBindingTest.java b/tests/ballerina-integration-test/src/test/java/org/ballerinalang/test/messaging/artemis/DataBindingTest.java new file mode 100644 index 000000000000..57840002093f --- /dev/null +++ b/tests/ballerina-integration-test/src/test/java/org/ballerinalang/test/messaging/artemis/DataBindingTest.java @@ -0,0 +1,125 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.ballerinalang.test.messaging.artemis; + +import org.ballerinalang.launcher.util.BCompileUtil; +import org.ballerinalang.launcher.util.CompileResult; +import org.ballerinalang.test.util.TestUtils; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.nio.file.Paths; + +import static org.ballerinalang.test.messaging.artemis.ArtemisTestUtils.testSend; + +/** + * Includes tests for different payload types for ANYCAST and MULTICAST queues. + */ +@Test(groups = {"artemis-test"}) +public class DataBindingTest extends ArtemisTestCommons { + private CompileResult anyCastResult; + + @BeforeClass + public void setup() throws URISyntaxException { + TestUtils.prepareBalo(this); + Path sourcePath = Paths.get("src", "test", "resources", "messaging", "artemis", "producers"); + anyCastResult = BCompileUtil.compile(sourcePath.resolve("data_binding_message.bal").toAbsolutePath() + .toString()); + } + + @Test(description = "Tests the sending of a string message to a queue") + public void testSendString() { + String log = "string data Hello World"; + String functionName = "testSendString"; + testSend(anyCastResult, log, functionName, serverInstance); + } + + @Test(description = "Tests the sending of a json message to a queue") + public void testSendJson() { + String log = "json data {\"name\":\"Riyafa\"}"; + String functionName = "testSendJson"; + testSend(anyCastResult, log, functionName, serverInstance); + } + + @Test(description = "Tests the sending of a xml message to a queue") + public void testSendXml() { + String log = "The Lost World"; + String functionName = "testSendXml"; + testSend(anyCastResult, log, functionName, serverInstance); + } + + @Test(description = "Tests the sending of a record message to a queue") + public void testSendRecord() { + String log = "person data {name:\"John\", age:20}"; + String functionName = "testSendRecord"; + testSend(anyCastResult, log, functionName, serverInstance); + } + + @Test(description = "Tests the sending of a byte[] message to a queue") + public void testSendByteArray() { + String log = "byte[] data [1, 2, 2, 3, 3, 2]"; + String functionName = "testSendByteArray"; + testSend(anyCastResult, log, functionName, serverInstance); + } + + @Test(description = "Tests the sending of a map message to a queue") + public void testSendMapString() { + String log = "map data {\"name\":\"Riyafa\", \"hello\":\"world\"}"; + String functionName = "testSendMapString"; + testSend(anyCastResult, log, functionName, serverInstance); + } + + @Test(description = "Tests the sending of a map message to a queue") + public void testSendMapInt() { + String log = "map data {\"num\":1, \"num2\":2}"; + String functionName = "testSendMapInt"; + testSend(anyCastResult, log, functionName, serverInstance); + } + + + @Test(description = "Tests the sending of a map message to a queue") + public void testSendMapFloat() { + String log = "map data {\"numf1\":1.1, \"numf2\":1.2}"; + String functionName = "testSendMapFloat"; + testSend(anyCastResult, log, functionName, serverInstance); + } + + @Test(description = "Tests the sending of a map message to a queue") + public void testSendMapByte() { + String log = "map data {\"byte1\":1, \"byte2\":7}"; + String functionName = "testSendMapByte"; + testSend(anyCastResult, log, functionName, serverInstance); + } + + @Test(description = "Tests the sending of a map message to a queue") + public void testSendMapByteArray() { + String log = "map data {\"array2\":[5], \"array1\":[1, 2, 3]}"; + String functionName = "testSendMapByteArray"; + testSend(anyCastResult, log, functionName, serverInstance); + } + + @Test(description = "Tests the sending of a map message to a queue") + public void testSendMapBoolean() { + String log = "map data {\"first\":true, \"second\":false}"; + String functionName = "testSendMapBoolean"; + testSend(anyCastResult, log, functionName, serverInstance); + } +} diff --git a/tests/ballerina-integration-test/src/test/java/org/ballerinalang/test/messaging/artemis/LocalTransactionTest.java b/tests/ballerina-integration-test/src/test/java/org/ballerinalang/test/messaging/artemis/LocalTransactionTest.java index b2ea8b547c5c..19ac401f49d3 100644 --- a/tests/ballerina-integration-test/src/test/java/org/ballerinalang/test/messaging/artemis/LocalTransactionTest.java +++ b/tests/ballerina-integration-test/src/test/java/org/ballerinalang/test/messaging/artemis/LocalTransactionTest.java @@ -23,6 +23,7 @@ import org.ballerinalang.launcher.util.CompileResult; import org.ballerinalang.model.values.BValue; import org.ballerinalang.test.util.TestUtils; +import org.ballerinalang.util.exceptions.BLangRuntimeException; import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -66,4 +67,17 @@ public void testSend() { String returnVal = BRunUtil.invoke(consumerResult, "transactionConsumerReceive", consumerVal)[0].stringValue(); Assert.assertEquals(returnVal, "Example Example ", "Invalid message received"); } + + @Test(description = "Tests transaction erring") + public void testErringSend() { + //Invoking this function is needed to make sure the queue is created before the sending + BValue[] consumerVal = BRunUtil.invoke(consumerResult, "createErringConsumer"); + try { + BRunUtil.invoke(producerResult, "testErringSend"); + } catch (BLangRuntimeException ex) { + // Ignore + } + String returnVal = BRunUtil.invoke(consumerResult, "receiveAndGetText", consumerVal)[0].stringValue(); + Assert.assertEquals(returnVal, "Example ", "Invalid message received"); + } } diff --git a/tests/ballerina-integration-test/src/test/resources/messaging/artemis/consumers/data_binding.bal b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/consumers/data_binding.bal new file mode 100644 index 000000000000..978048bf34d4 --- /dev/null +++ b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/consumers/data_binding.bal @@ -0,0 +1,154 @@ +// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/artemis; +import ballerina/io; + +@artemis:ServiceConfig { + queueConfig: { + queueName: "queue1" + } +} +service artemisConsumer on artemisListener { + resource function xyz(artemis:Message message, string data) returns error? { + io:print("string data "); + io:println(data); + } +} + +@artemis:ServiceConfig { + queueConfig: { + queueName: "queue2" + } +} +service artemisConsumer2 on artemisListener { + resource function xyz(artemis:Message message, json data) returns error? { + io:print("json data "); + io:println(data); + } +} + +@artemis:ServiceConfig { + queueConfig: { + queueName: "queue3" + } +} +service artemisConsumer3 on artemisListener { + resource function xyz(artemis:Message message, xml data) returns error? { + io:println(data); + } +} + +type Person record {| + string name; + int age; +|}; + +@artemis:ServiceConfig { + queueConfig: { + queueName: "queue4" + } +} +service artemisConsumer4 on artemisListener { + resource function xyz(artemis:Message message, Person data) returns error? { + io:print("person data "); + io:println(data); + } +} + +@artemis:ServiceConfig { + queueConfig: { + queueName: "queue6" + } +} +service artemisConsumer6 on artemisListener { + resource function xyz(artemis:Message message, byte[] data) returns error? { + io:print("byte[] data "); + io:println(data); + } +} + +@artemis:ServiceConfig { + queueConfig: { + queueName: "queue7" + } +} +service artemisConsumer7 on artemisListener { + resource function xyz(artemis:Message message, map data) returns error? { + io:print("map data "); + io:println(data); + } +} + +@artemis:ServiceConfig { + queueConfig: { + queueName: "queue8" + } +} +service artemisConsumer8 on artemisListener { + resource function xyz(artemis:Message message, map data) returns error? { + io:print("map data "); + io:println(data); + } +} + +@artemis:ServiceConfig { + queueConfig: { + queueName: "queue9" + } +} +service artemisConsumer9 on artemisListener { + resource function xyz(artemis:Message message, map data) returns error? { + io:print("map data "); + io:println(data); + } +} + +@artemis:ServiceConfig { + queueConfig: { + queueName: "queue10" + } +} +service artemisConsumer10 on artemisListener { + resource function xyz(artemis:Message message, map data) returns error? { + io:print("map data "); + io:println(data); + } +} + +@artemis:ServiceConfig { + queueConfig: { + queueName: "queue11" + } +} +service artemisConsumer11 on artemisListener { + resource function xyz(artemis:Message message, map data) returns error? { + io:print("map data "); + io:println(data); + } +} + +@artemis:ServiceConfig { + queueConfig: { + queueName: "queue12" + } +} +service artemisConsumer12 on artemisListener { + resource function xyz(artemis:Message message, map data) returns error? { + io:print("map data "); + io:println(data); + } +} diff --git a/tests/ballerina-integration-test/src/test/resources/messaging/artemis/consumers/transaction_consumer.bal b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/consumers/transaction_consumer.bal index eec54a87f2d0..646a4a117f0f 100644 --- a/tests/ballerina-integration-test/src/test/resources/messaging/artemis/consumers/transaction_consumer.bal +++ b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/consumers/transaction_consumer.bal @@ -17,7 +17,7 @@ import ballerina/artemis; public function createSimpleConsumer() returns artemis:Consumer|error { - artemis:Listener lis = new artemis:Listener({host:"localhost", port:61616}); + artemis:Listener lis = new artemis:Listener({host: "localhost", port: 61616}); return lis.createAndGetConsumer({queueName: "example"}); } @@ -46,7 +46,12 @@ public function transactionConsumerReceive(artemis:Consumer consumer) returns st return msgTxt; } -function receiveAndGetText(artemis:Consumer consumer) returns string { +public function createErringConsumer() returns artemis:Consumer|error { + artemis:Listener lis = new artemis:Listener({host: "localhost", port: 61616}); + return lis.createAndGetConsumer({queueName: "example3"}); +} + +public function receiveAndGetText(artemis:Consumer consumer) returns string { string msgTxt = ""; var msg = consumer->receive(); if(msg is artemis:Message) { diff --git a/tests/ballerina-integration-test/src/test/resources/messaging/artemis/producers/data_binding_message.bal b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/producers/data_binding_message.bal new file mode 100644 index 000000000000..abc8b7e886dd --- /dev/null +++ b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/producers/data_binding_message.bal @@ -0,0 +1,105 @@ +// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/artemis; + +artemis:EndpointConfiguration config = {host: "localhost", port: 61616}; +artemis:AddressConfiguration addressConfig = {autoCreated:false}; + +public function testSendString() { + artemis:Producer prod = new(config , "queue1", addressConfig = addressConfig); + var err = prod->send("Hello World"); +} + +public function testSendJson() { + artemis:Producer prod = new(config, "queue2", addressConfig = addressConfig); + json name = {"name": "Riyafa"}; + var err = prod->send(name); +} + +public function testSendXml() { + artemis:Producer prod = new(config, "queue3", addressConfig = addressConfig); + xml book = xml `The Lost World`; + var err = prod->send(book); +} + +public function testSendRecord() { + artemis:Producer prod = new(config, "queue4", addressConfig = addressConfig); + json person = {"name": "John", "age":20}; + var err = prod->send(person); +} + +public function testSendByteArray() { + artemis:Producer prod = new(config, "queue6", addressConfig = addressConfig); + byte[6] msg = [1, 2, 2, 3, 3, 2]; + var err = prod->send(msg); +} + +public function testSendMapString() { + artemis:Producer prod = new(config, "queue7", addressConfig = addressConfig); + map msg = { + "name": "Riyafa", + "hello": "world" + }; + var err = prod->send(msg); +} + +public function testSendMapInt() { + artemis:Producer prod = new(config, "queue8", addressConfig = addressConfig); + map msg = { + "num": 1, + "num2": 2 + }; + var err = prod->send(msg); +} + +public function testSendMapFloat() { + artemis:Producer prod = new(config, "queue9", addressConfig = addressConfig); + map msg = { + "numf1": 1.1, + "numf2": 1.2 + }; + var err = prod->send(msg); +} + +public function testSendMapByte() { + artemis:Producer prod = new(config, "queue10", addressConfig = addressConfig); + map msg = { + "byte1": 1, + "byte2": 7 + }; + var err = prod->send(msg); +} + +public function testSendMapByteArray() { + artemis:Producer prod = new(config, "queue11", addressConfig = addressConfig); + byte[3] array1 = [1, 2, 3]; + byte[1] array2 = [5]; + map msg = { + "array1": array1, + "array2": array2 + }; + var err = prod->send(msg); +} + +public function testSendMapBoolean() { + artemis:Producer prod = new(config, "queue12", addressConfig = addressConfig); + map msg = { + "first": true, + "second": false + }; + var err = prod->send(msg); +} diff --git a/tests/ballerina-integration-test/src/test/resources/messaging/artemis/producers/transaction_producer.bal b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/producers/transaction_producer.bal index d167a9acf389..5c11688f23c5 100644 --- a/tests/ballerina-integration-test/src/test/resources/messaging/artemis/producers/transaction_producer.bal +++ b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/producers/transaction_producer.bal @@ -18,26 +18,36 @@ import ballerina/artemis; import ballerina/io; public function testSimpleTransactionSend() { - artemis:Producer prod = new({host:"localhost", port:61616}, "example"); + artemis:Producer prod = new({host: "localhost", port: 61616}, "example"); send(prod); transaction { send(prod); } } +public function testErringSend() { + artemis:Producer prod = new({host: "localhost", port: 61616}, "example3"); + send(prod); + transaction { + send(prod); + error err = error("Failed during send"); + panic err; + } +} + public function testTransactionSend() { artemis:Connection con = new("tcp://localhost:61616"); artemis:Session session = new(con); artemis:Producer prod = new(session, "example2"); - send(prod); transaction { send(prod); + send(prod); } } function send(artemis:Producer prod) { var err = prod->send("Example "); - if(err is error) { + if (err is error) { io:println("Error occurred sending message"); } } diff --git a/tests/ballerina-integration-test/src/test/resources/testng.xml b/tests/ballerina-integration-test/src/test/resources/testng.xml index 133ced2ca004..5762e34bf8a3 100644 --- a/tests/ballerina-integration-test/src/test/resources/testng.xml +++ b/tests/ballerina-integration-test/src/test/resources/testng.xml @@ -281,6 +281,7 @@ +