diff --git a/distribution/zip/ballerina-tools/pom.xml b/distribution/zip/ballerina-tools/pom.xml index 8f4f6943471b..989d08a199c4 100644 --- a/distribution/zip/ballerina-tools/pom.xml +++ b/distribution/zip/ballerina-tools/pom.xml @@ -116,6 +116,12 @@ zip ballerina-sources + + org.ballerinalang + ballerina-activemq-artemis + zip + ballerina-sources + org.ballerinalang ballerina-log-api diff --git a/distribution/zip/ballerina/build.gradle b/distribution/zip/ballerina/build.gradle index c255d5e722fa..0f90b3d58c29 100644 --- a/distribution/zip/ballerina/build.gradle +++ b/distribution/zip/ballerina/build.gradle @@ -80,6 +80,15 @@ dependencies { dist 'com.google.protobuf:protobuf-java:3.5.1' dist 'org.wso2.orbit.org.yaml:snakeyaml:1.16.0.wso2v1' dist 'org.wso2.staxon:staxon-core:1.2.0.wso2v2' + dist 'com.jcraft:jzlib:1.1.3' + dist 'org.apache.activemq:artemis-core-client:2.6.3' + dist 'org.apache.activemq:artemis-commons:2.6.3' + dist 'commons-beanutils:commons-beanutils:1.9.3' + dist 'org.jboss.logging:jboss-logging:3.3.1.Final' + dist 'commons-collections:commons-collections:3.2.2' + dist 'org.apache.geronimo.specs:geronimo-json_1.0_spec:1.0-alpha-1' + dist 'io.netty:netty-transport-native-epoll:4.1.34.Final' + dist 'io.netty:netty-transport-native-kqueue:4.1.34.Final' distBal project(path: ':ballerina-auth', configuration: 'baloImplementation') @@ -110,6 +119,7 @@ dependencies { distBal project(path: ':ballerina-time', configuration: 'baloImplementation') distBal project(path: ':ballerina-transactions', configuration: 'baloImplementation') distBal project(path: ':ballerina-websub', configuration: 'baloImplementation') + distBal project(path: ':ballerina-activemq-artemis', configuration: 'baloImplementation') balSource project(path: ':ballerina-auth', configuration: 'balSource') balSource project(path: ':ballerina-builtin', configuration: 'balSource') @@ -139,6 +149,7 @@ dependencies { balSource project(path: ':ballerina-time', configuration: 'balSource') balSource project(path: ':ballerina-transactions', configuration: 'balSource') balSource project(path: ':ballerina-websub', configuration: 'balSource') + balSource project(path: ':ballerina-activemq-artemis', configuration: 'balSource') dist project(':ballerina-auth') dist project(':ballerina-builtin') @@ -181,6 +192,7 @@ dependencies { dist project(':strip-bouncycastle') dist project(':toml-parser') dist project(':tracing-extensions:ballerina-jaeger-extension') + dist project(':ballerina-activemq-artemis') } diff --git a/distribution/zip/ballerina/pom.xml b/distribution/zip/ballerina/pom.xml index f3a6073528bc..6836d615b8f3 100644 --- a/distribution/zip/ballerina/pom.xml +++ b/distribution/zip/ballerina/pom.xml @@ -102,6 +102,10 @@ org.ballerinalang ballerina-jms + + org.ballerinalang + ballerina-activemq-artemis + org.ballerinalang ballerina-log-api @@ -162,6 +166,42 @@ org.ballerinalang ballerina-privacy + + com.jcraft + jzlib + + + org.apache.activemq + artemis-core-client + + + org.apache.activemq + artemis-commons + + + commons-beanutils + commons-beanutils + + + org.jboss.logging + jboss-logging + + + commons-collections + commons-collections + + + org.apache.geronimo.specs + geronimo-json_1.0_spec + + + io.netty + netty-transport-native-epoll + + + io.netty + netty-transport-native-kqueue + @@ -245,10 +285,6 @@ com.h2database h2 - - com.jcraft - jzlib - @@ -403,6 +439,12 @@ zip ballerina-binary-repo + + org.ballerinalang + ballerina-activemq-artemis + zip + ballerina-binary-repo + org.ballerinalang ballerina-jms @@ -598,6 +640,12 @@ zip ballerina-sources + + org.ballerinalang + ballerina-activemq-artemis + zip + ballerina-sources + org.ballerinalang ballerina-log-api @@ -764,6 +812,7 @@ ballerina-socket, ballerina-internal, ballerina-jms, + ballerina-activemq-artemis, ballerina-log-api, ballerina-math, ballerina-mime, diff --git a/distribution/zip/ballerina/src/assembly/bin.xml b/distribution/zip/ballerina/src/assembly/bin.xml index 9c87903c4fb7..8d2d2e06afa8 100644 --- a/distribution/zip/ballerina/src/assembly/bin.xml +++ b/distribution/zip/ballerina/src/assembly/bin.xml @@ -112,6 +112,7 @@ org.ballerinalang:ballerina-io:jar org.ballerinalang:ballerina-socket:jar org.ballerinalang:ballerina-jms:jar + org.ballerinalang:ballerina-activemq-artemis:jar org.ballerinalang:ballerina-log-api:jar org.ballerinalang:ballerina-math:jar org.ballerinalang:ballerina-mime:jar @@ -185,6 +186,17 @@ io.netty:netty-tcnative-boringssl-static com.jcraft:jzlib + + org.apache.activemq:artemis-core-client + org.apache.activemq:artemis-commons + commons-beanutils:commons-beanutils + org.apache.activemq:artemis-commons + org.jboss.logging:jboss-logging + commons-collections:commons-collections + org.apache.geronimo.specs:geronimo-json_1.0_spec + io.netty:netty-transport-native-epoll + io.netty:netty-transport-native-kqueue + org.ballerinalang:ballerina-jaeger-extension:jar io.opentracing:opentracing-api diff --git a/language-server/modules/langserver-compiler/build.gradle b/language-server/modules/langserver-compiler/build.gradle index 9252de03e78e..d01ee3bed682 100644 --- a/language-server/modules/langserver-compiler/build.gradle +++ b/language-server/modules/langserver-compiler/build.gradle @@ -20,6 +20,7 @@ dependencies { implementation project(':ballerina-websub') implementation project(':ballerina-jms') implementation project(':ballerina-grpc') + implementation project(':ballerina-activemq-artemis') testCompile 'org.testng:testng:6.13.1' } diff --git a/language-server/modules/langserver-compiler/pom.xml b/language-server/modules/langserver-compiler/pom.xml index e3a7e88ae474..6684289538d5 100644 --- a/language-server/modules/langserver-compiler/pom.xml +++ b/language-server/modules/langserver-compiler/pom.xml @@ -94,6 +94,12 @@ zip ballerina-binary-repo + + org.ballerinalang + ballerina-activemq-artemis + zip + ballerina-binary-repo + org.ballerinalang ballerina-grpc diff --git a/language-server/modules/langserver-core/build.gradle b/language-server/modules/langserver-core/build.gradle index fda3ad3e401e..c352082878b5 100644 --- a/language-server/modules/langserver-core/build.gradle +++ b/language-server/modules/langserver-core/build.gradle @@ -31,6 +31,7 @@ dependencies { implementation project(':ballerina-jms') implementation project(':ballerina-grpc') implementation project(':testerina:testerina-core') + implementation project(':ballerina-activemq-artemis') implementation 'com.fasterxml.jackson.core:jackson-databind:2.9.1' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.9.1' implementation 'io.netty:netty-buffer:4.1.19.Final' diff --git a/language-server/modules/langserver-core/pom.xml b/language-server/modules/langserver-core/pom.xml index 325aee213b32..65aeea0160d3 100644 --- a/language-server/modules/langserver-core/pom.xml +++ b/language-server/modules/langserver-core/pom.xml @@ -102,6 +102,12 @@ zip ballerina-binary-repo + + org.ballerinalang + ballerina-activemq-artemis + zip + ballerina-binary-repo + org.ballerinalang ballerina-grpc diff --git a/pom.xml b/pom.xml index 729da34a23bc..1d689db2f78e 100644 --- a/pom.xml +++ b/pom.xml @@ -557,6 +557,27 @@ ballerina-binary-repo + + + org.ballerinalang + ballerina-activemq-artemis + ${ballerina.version} + + + org.ballerinalang + ballerina-activemq-artemis + ${ballerina.version} + zip + ballerina-sources + + + org.ballerinalang + ballerina-activemq-artemis + ${ballerina.version} + zip + ballerina-binary-repo + + org.ballerinalang @@ -1603,6 +1624,48 @@ asm ${asm.version} + + + + org.apache.activemq + artemis-core-client + ${artemis.version} + + + org.apache.activemq + artemis-commons + ${artemis.version} + + + commons-beanutils + commons-beanutils + ${beanutils.version} + + + org.jboss.logging + jboss-logging + ${jboss.version} + + + commons-collections + commons-collections + ${commons-collections.version} + + + org.apache.geronimo.specs + geronimo-json_1.0_spec + ${geronimo-json.version} + + + io.netty + netty-transport-native-epoll + ${netty.version} + + + io.netty + netty-transport-native-kqueue + ${netty.version} + @@ -1725,6 +1788,7 @@ stdlib/database/sql stdlib/streams stdlib/privacy + stdlib/messaging/activemq-artemis misc/lib-creator @@ -1815,6 +1879,7 @@ stdlib/database/sql stdlib/streams stdlib/privacy + stdlib/messaging/activemq-artemis misc/lib-creator @@ -1902,6 +1967,7 @@ stdlib/database/sql stdlib/streams stdlib/privacy + stdlib/messaging/activemq-artemis misc/lib-creator @@ -2402,6 +2468,13 @@ 3.2.87 1.0.0-wso2v2 + + 2.6.3 + 1.9.3 + 3.3.1.Final + 3.2.2 + 1.0-alpha-1 + 6.0.55 1.35 diff --git a/settings.gradle b/settings.gradle index f6ca40a1afa6..da204fe94f6f 100644 --- a/settings.gradle +++ b/settings.gradle @@ -87,6 +87,7 @@ include(':ballerina-tools-integration-test') include(':examples-test') include(':plugin-vscode') include(':benchmarks') +include(':ballerina-activemq-artemis') include(':build-config:checkstyle') project(':ballerina-lang').projectDir = file('compiler/ballerina-lang') project(':ballerina-utils').projectDir = file('stdlib/utils') @@ -172,6 +173,7 @@ project(':ballerina-integration-test').projectDir = file('tests/ballerina-integr project(':ballerina-tools-integration-test').projectDir = file('tests/ballerina-tools-integration-test') project(':examples-test').projectDir = file('tests/ballerina-examples-test') project(':plugin-vscode').projectDir = file('tool-plugins/vscode') +project(':ballerina-activemq-artemis').projectDir = file('stdlib/messaging/activemq-artemis') buildCache { remote(HttpBuildCache) { diff --git a/stdlib/messaging/activemq-artemis/assembly/balo.xml b/stdlib/messaging/activemq-artemis/assembly/balo.xml new file mode 100644 index 000000000000..ecb179592764 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/assembly/balo.xml @@ -0,0 +1,35 @@ + + + true + / + ballerina-binary-repo + + zip + + + + + ${project.build.directory}/generated-balo + / + + ** + + + + diff --git a/stdlib/messaging/activemq-artemis/assembly/source.xml b/stdlib/messaging/activemq-artemis/assembly/source.xml new file mode 100644 index 000000000000..a8f89e728fa3 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/assembly/source.xml @@ -0,0 +1,35 @@ + + + true + ballerina + ballerina-sources + + zip + + + + + src/main/ballerina + / + + ** + + + + diff --git a/stdlib/messaging/activemq-artemis/build.gradle b/stdlib/messaging/activemq-artemis/build.gradle new file mode 100644 index 000000000000..b073ab665a66 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/build.gradle @@ -0,0 +1,27 @@ +apply from: "$rootDir/gradle/balNativeLibProject.gradle" + +dependencies { + implementation project(':ballerina-core') + implementation project(':ballerina-io') + implementation project(':ballerina-lang') + implementation project(':ballerina-builtin') + implementation project(':ballerina-utils') + implementation 'org.apache.activemq:artemis-core-client:2.6.3' + + baloImplementation project(path: ':ballerina-builtin', configuration: 'baloImplementation') + baloImplementation project(path: ':ballerina-io', configuration: 'baloImplementation') + baloImplementation project(path: ':ballerina-utils', configuration: 'baloImplementation') +} + +description = 'Ballerina - ActiveMQ Artemis' + +test { + doFirst { + copy { + from "$buildDir/generated-balo/repo/ballerina" + into "$buildDir/lib/repo/ballerina" + } + } + systemProperty "java.util.logging.config.file", "$buildDir/logging.properties" + systemProperty "java.util.logging.manager", "org.ballerinalang.logging.BLogManager" +} diff --git a/stdlib/messaging/activemq-artemis/pom.xml b/stdlib/messaging/activemq-artemis/pom.xml new file mode 100644 index 000000000000..b1c481b93518 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/pom.xml @@ -0,0 +1,247 @@ + + + + + ballerina-parent + org.ballerinalang + 0.990.4-SNAPSHOT + ../../../pom.xml + + 4.0.0 + ballerina-activemq-artemis + jar + Ballerina - ActiveMQ Artemis + https://ballerina.io + + + + org.ballerinalang + ballerina-core + + + org.ballerinalang + ballerina-io + + + org.ballerinalang + ballerina-lang + + + org.ballerinalang + ballerina-builtin + zip + ballerina-binary-repo + + + org.ballerinalang + ballerina-utils + zip + ballerina-binary-repo + + + org.ballerinalang + ballerina-io + zip + ballerina-binary-repo + + + org.apache.activemq + artemis-core-client + + + + + + + src/main/resources + + ballerina/** + + + + + + org.codehaus.mojo + exec-maven-plugin + + + gen-balo + + java + + compile + + + + BALLERINA_DEV_MODE_COMPILE + true + + + + false + ${basedir}/src/main/ballerina/ + ${project.build.directory}/generated-balo/repo/ballerina + ${project.build.directory} + ${project.version} + + + + + + org.ballerinalang.stdlib.utils.GenerateBalo + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + unpack-dependencies + generate-resources + + unpack-dependencies + + + ballerina-binary-repo + ${project.build.directory}/lib + + + + + + org.apache.maven.plugins + maven-compiler-plugin + ${maven.compiler.plugin.version} + + -proc:none + + + + org.apache.maven.plugins + maven-assembly-plugin + + + distribution + package + + attached + + + assembly + + + + + + + org.bsc.maven + maven-processor-plugin + ${mvn.processor.plugin.version} + + + org.ballerinalang.codegen.BallerinaAnnotationProcessor + + + org.ballerinalang.stdlib.artemis.generated.providers + + StandardNativeElementProvider + + + + + process + + process + + generate-sources + + + + + org.jacoco + jacoco-maven-plugin + + + prepare-it-test-agent + + prepare-agent + + + true + true + + org/wso2/ballerinalang/compiler/parser/antlr4/** + + jacoco.agent.argLine + ${project.build.directory}/coverage-reports/jacoco.exec + + + + it-report + verify + + report-aggregate + + + + **/coverage-reports/jacoco.exec + + + org/wso2/ballerinalang/compiler/parser/antlr4/** + + ${project.build.directory}/coverage-reports/site + + + + + + org.apache.maven.plugins + maven-resources-plugin + + + + copy-file-balo + compile + + copy-resources + + + ${project.build.directory}/lib/repo + + + ${project.build.directory}/generated-balo/repo/ + + + + + + + + + + + spotbugs-exclude.xml + **/generated/** + + + diff --git a/stdlib/messaging/activemq-artemis/spotbugs-exclude.xml b/stdlib/messaging/activemq-artemis/spotbugs-exclude.xml new file mode 100644 index 000000000000..1c56b1fc808a --- /dev/null +++ b/stdlib/messaging/activemq-artemis/spotbugs-exclude.xml @@ -0,0 +1,19 @@ + + + diff --git a/stdlib/messaging/activemq-artemis/src/main/ballerina/Ballerina.toml b/stdlib/messaging/activemq-artemis/src/main/ballerina/Ballerina.toml new file mode 100644 index 000000000000..d8f883b63bd5 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/ballerina/Ballerina.toml @@ -0,0 +1,3 @@ +[project] +org-name = "ballerina" +version = "0.0.0" diff --git a/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/artemis_commons.bal b/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/artemis_commons.bal new file mode 100644 index 000000000000..ffaa4d0cc935 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/artemis_commons.bal @@ -0,0 +1,50 @@ +// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/io; + +# Constant for the artemis error code. +public const ARTEMIS_ERROR_CODE = "{ballerina/artemis}ArtemisError"; + +# The Artemis error record. +# +# + message - the error message. +public type ArtemisError record { + string message?; + !...; +}; + +# The url configuration for `Producer` and `Consumer`. +# +# + host - The host +# + port - The port +# + username - The username +# + password - The password +public type URLConfiguration record { + string host; + int port; + string username?; + string password?; + !...; +}; + +# Determines how messages are sent to the queues associated with an address. +public type RoutingType MULTICAST | ANYCAST; + +# If you want your messages routed to every queue within the matching address, in a publish-subscribe manner. +public const MULTICAST = "MULTICAST"; +# If you want your messages routed to a single queue within the matching address, in a point-to-point manner. +public const ANYCAST = "ANYCAST"; diff --git a/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/connection.bal b/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/connection.bal new file mode 100644 index 000000000000..9314473ffecf --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/connection.bal @@ -0,0 +1,70 @@ +// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +# Represents ActiveMQ Artemis Connection. +public type Connection client object { + + # Creates an ActiveMQ Artemis Connection object. + # + # + url - The connection url to the broker + # + config - The connection configuration + public function __init(string url, ConnectionConfiguration? config = ()) { + ConnectionConfiguration configuration = {}; + if (config is ConnectionConfiguration) { + configuration = config; + } + self.createConnection(url, configuration); + } + + extern function createConnection(string url, ConnectionConfiguration config); + + # Returns true if close was already called. + # + # + return - `true` if closed, `false` otherwise + public extern function isClosed() returns boolean; + + # Closes the connection and release all its resources. + public remote extern function close(); +}; + +# Configurations related to a Artemis `Connection`. +# +# + timeToLive - Connection's time-to-live. negative to disable or greater or equals to 0 +# + callTimeout - The blocking calls timeout in milliseconds +# + consumerWindowSize - Window size in bytes for flow control of the consumers created through this `Connection` +# + consumerMaxRate - Maximum rate of message consumption for consumers created through this `Connection` +# + producerWindowSize - Window size for flow control of the producers created through this `Connection` +# + producerMaxRate - The maximum rate of message production for producers created through this `Connection` +# + retryInterval - The time in milliseconds to retry connection +# + retryIntervalMultiplier - Multiplier to apply to successive retry intervals +# + maxRetryInterval - The maximum retry interval (in the case a retry interval multiplier has been specified) +# + reconnectAttempts - The maximum number of attempts to retry connection in case of failure +# + initialConnectAttempts - The maximum number of attempts to establish an initial connection +public type ConnectionConfiguration record { + //Add this once working + int timeToLive = 60000; + int callTimeout = 30000; + int consumerWindowSize = 1024 * 1024; + int consumerMaxRate = -1; + int producerWindowSize = 64 * 1024; + int producerMaxRate = -1; + int retryInterval = 2000; + float retryIntervalMultiplier = 1; + int maxRetryInterval = 2000; + int reconnectAttempts = 0; + int initialConnectAttempts = 1; + !...; +}; diff --git a/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/listener.bal b/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/listener.bal new file mode 100644 index 000000000000..44a9923a7604 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/listener.bal @@ -0,0 +1,94 @@ +// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +# Represents ActiveMQ Artemis Listener. This is an abstraction for that includes the connection and session. +# Consumers are represented by the service attaching to this listener. +public type Listener object { + *AbstractListener; + private Session session; + private boolean anonymousSession; + + public function __init(Session | URLConfiguration sesssionOrURLConfig) { + if (sesssionOrURLConfig is Session) { + self.session = sesssionOrURLConfig; + } else { + Connection connection = new("tcp://" + sesssionOrURLConfig.host + ":" + sesssionOrURLConfig.port); + self.session = new(connection, config = { username: sesssionOrURLConfig["username"], + password: sesssionOrURLConfig["password"] }); + self.anonymousSession = true; + } + } + public function __start() returns error? { + return self.start(); + } + public function __stop() returns error? { + return self.stop(); + } + public function __attach(service serviceType, map annotationData) returns error? { + return self.createConsumer(serviceType); + } + + extern function start() returns error?; + extern function createConsumer(service serviceType) returns error?; + extern function stop() returns error?; +}; + +# The configuration for an Artemis consumer service. +# +# + autoAck - whether to automatically acknowledge a service when a resource completes +# + queueConfig - the configuration for the queue to consume from +# + filter - only messages which match this filter will be consumed +# + browseOnly - whether the ClientConsumer will only browse the queue or consume messages +public type ArtemisServiceConfig record { + boolean autoAck = true; + QueueConfiguration queueConfig; + string? filter = (); + boolean browseOnly = false; + !...; +}; + +public annotation ServiceConfig ArtemisServiceConfig; + +# ActiveMQ Artemis Queue configuration. +# If the `autoCreated` is `false` an error will be thrown if the queue does not exist. +# If `autocreated` is `true` and the queue already exists then the other configurations would be ignored. +# +# + queueName - the name of the queue +# + addressName - the address queue is bound to. If the value is `nil` and `autoCreated` is true and the +# queue does not already exist then the address would take the name of the queue. +# + autoCreated - whether to automatically create the queue +# + routingType - the routing type for the queue, MULTICAST or ANYCAST +# + temporary - whether the queue is temporary. If this value is set to true the `durable` property value shall be ignored. +# + filter - messages which match this filter will be put in the queue +# + durable - whether the queue is durable or not. If `temporary` property value is true this value +# + maxConsumers - how many concurrent consumers will be allowed on this queue +# + purgeOnNoConsumers - whether to delete the contents of the queue when the last consumer disconnects +# + exclusive - whether the queue should be exclusive +# + lastValue - whether the queue should be lastValue +public type QueueConfiguration record { + string queueName; + string? addressName = (); + boolean autoCreated = true; + RoutingType routingType = ANYCAST; + boolean temporary = true; + string? filter = (); + boolean durable = false; + int maxConsumers = -1; + boolean purgeOnNoConsumers = false; + boolean exclusive = false; + boolean lastValue = false; + !...; +}; diff --git a/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/message.bal b/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/message.bal new file mode 100644 index 000000000000..db7e59028ae5 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/message.bal @@ -0,0 +1,133 @@ +// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +# Represents ActiveMQ Artemis Message. +public type Message client object { + + private MessageType messageType = TEXT; + private MessageConfiguration configuration; + + public function __init(Session session, io:ReadableByteChannel | int | float | byte | boolean | string | + map | xml | json | byte[] data, + MessageConfiguration? config = ()) { + if (config is MessageConfiguration) { + self.configuration = config; + } else { + self.configuration = {}; + } + + if (data is (string | json | xml | int | float | byte | boolean)) { + self.messageType = TEXT; + } + if (data is io:ReadableByteChannel) { + self.messageType = STREAM; + self.createMessage(session, data, self.configuration); + } else if (data is byte) { + self.createMessage(session, string.convert(int.convert(data)), self.configuration); + } else if (data is map) { + self.messageType = MAP; + self.createMessage(session, data, self.configuration); + } else if (data is xml) { + self.createMessage(session, string.convert(data), self.configuration); + } else if (data is json) { + self.createMessage(session, data.toString(), self.configuration); + } else if (data is byte[]) { + self.messageType = BYTES; + self.createMessage(session, data, self.configuration); + } + } + + extern function createMessage(Session session, string | byte[] | map + | io:ReadableByteChannel data, MessageConfiguration config); + + # Acknowledges reception of this message. + # + # + return - If an error occurred while acknowledging the message + public remote extern function acknowledge() returns error?; + + # Returns the size (in bytes) of this message's body. + # + # + return - the size of the message body + public extern function getBodySize() returns int; + + # Add message property. + # + # + key - The name of the property + # + value - The value of the property + # + return - If an error occures while setting the property + public extern function putProperty(string key, string | int | float | boolean | byte | byte[] value); + + # Get a message property. + # + # + key - The name of the property + # + return - The value of the property or nil if not found + public extern function getProperty(string key) returns string | int | float | boolean | byte | byte[] | () | error; + + # The type of the message. + # + # + return - The `MessageType` of the message + public extern function getType() returns MessageType; + + # The message payload. + # + # + return - The message payload or error on failure to retrieve payload or if the type is unsupported. + # A map payload can contain an error if the type is unsupported. + public extern function getPayload() returns string | byte[] | map | +error | (); + + # Call this function to save to a WritableByteChannel if the message is `STREAM` type. + # + # + ch - The byte channel to save to + # + return - will return an `error` if the message is not of type `STREAM` or on failure + public extern function saveToWritableByteChannel(io:WritableByteChannel ch) returns error?; + + # Get the message configuration. + # + # + return - the `MessageConfiguration` of this message + public function getConfig() returns MessageConfiguration { + return self.configuration; + } +}; + +# Represents a message sent and/or received by ActiveMQ Artemis. +# +# + expiration - The expiration time of this message +# + timeStamp - The message timestamp +# + priority - the message priority (between 0 and 9 inclusive) +# + durable - whether the created message is durable or not +# + routingType - `RoutingType` of the message +public type MessageConfiguration record { + int? expiration = (); + int? timeStamp = (); + byte priority = 0; + boolean durable = true; + RoutingType? routingType = (); + !...; +}; + +# ActiveMQ Artemis message types. +public type MessageType TEXT | BYTES | MAP | STREAM | UNSUPPORTED; + +# The text message type. +public const TEXT = "TEXT"; +# The bytes message type. +public const BYTES = "BYTES"; +# The map message type. +public const MAP = "MAP"; +# The stream message type. +public const STREAM = "STREAM"; +# If the message recieved is not of the supported message type in Ballerina it will have the type as UNSUPPORTED. +public const UNSUPPORTED = "UNSUPPORTED"; diff --git a/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/producer.bal b/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/producer.bal new file mode 100644 index 000000000000..7dcf164a33a9 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/producer.bal @@ -0,0 +1,75 @@ +// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +# Represents ActiveMQ Artemis Producer. +public type Producer client object { + private Session session; + private boolean anonymousSession = false; + + public function __init(Session | URLConfiguration sesssionOrURLConfig, string addressName, + AddressConfiguration? addressConfig = (), int rate = -1) { + if (sesssionOrURLConfig is Session) { + self.session = sesssionOrURLConfig; + } else { + Connection connection = new("tcp://" + sesssionOrURLConfig.host + ":" + sesssionOrURLConfig.port); + self.session = new(connection, config = { username: sesssionOrURLConfig["username"], + password: sesssionOrURLConfig["password"] }); + self.anonymousSession = true; + } + AddressConfiguration configuration = { + + }; + if (addressConfig is AddressConfiguration) { + configuration = addressConfig; + } + self.createProducer(addressName, configuration, rate); + } + + extern function createProducer(string addressName, AddressConfiguration addressConfig, int rate); + + # Sends a message to the producer's address. + # + # + data - the `Message` or data to send + # + return - `error` on failure + public remote function send(int | float | string | json | xml | byte | byte[] | map | io:ReadableByteChannel | Message data) returns error? { + return self.externSend(data is Message ? data : new(self.session, data)); + } + + # Returns whether the producer is closed or not + # + # + return - `true` if the producer is closed and `false` otherwise + public extern function isClosed() returns boolean; + + # Closes the ClientProducer. If already closed nothing is done. + # + # + return - `error` on failure to close. + public remote extern function close() returns error?; + + extern function externSend(Message data) returns error?; +}; + +# The ActiveMQ Artemis address related configuration. +# If the `autoCreated` is `false` an error will be thrown if the address does not exist. +# If `autocreated` is `true` and the address already exists then the `routingType` configuration would be ignored. +# +# + routingType - the routing type for the address, MULTICAST or ANYCAST +# + autoCreated - whether the address has to be auto created. +public type AddressConfiguration record { + RoutingType routingType = ANYCAST; + boolean autoCreated = true; + !...; +}; diff --git a/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/session.bal b/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/session.bal new file mode 100644 index 000000000000..b03a4c87dbf3 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/ballerina/artemis/session.bal @@ -0,0 +1,50 @@ +// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +# Represents ActiveMQ Artemis Session. +public type Session client object { + + public function __init(Connection con, SessionConfiguration? config = ()) { + SessionConfiguration configuration = {}; + if (config is SessionConfiguration) { + configuration = config; + } + self.createSession(con, configuration); + } + + extern function createSession(Connection con, SessionConfiguration config); + + # Returns true if close was already called + # + # + return - `true` if closed, `false` otherwise. + public extern function isClosed() returns boolean; + + # Closes the connection and release all its resources + # + # + return - `error` if an error occurs closing the connection or nil + public remote extern function close() returns error?; +}; + +# Configurations related to a Artemis Session. +# +# + username - The username +# + password - The password +public type SessionConfiguration record { + string? username = (); + string? password = (); + !...; +}; + diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisConstants.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisConstants.java new file mode 100644 index 000000000000..1164d896690e --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisConstants.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.ballerinalang.messaging.artemis; + +import static org.ballerinalang.util.BLangConstants.ORG_NAME_SEPARATOR; + +/** + * Constants related to Artemis connector. + * + * @since 0.995 + */ +public class ArtemisConstants { + + public static final String BALLERINA = "ballerina"; + public static final String ARTEMIS = "artemis"; + public static final String PROTOCOL_PACKAGE_ARTEMIS = BALLERINA + ORG_NAME_SEPARATOR + ARTEMIS; + + // Error related constants + static final String ARTEMIS_ERROR_CODE = "{ballerina/artemis}ArtemisError"; + static final String ARTEMIS_ERROR_RECORD = "ArtemisError"; + static final String ARTEMIS_ERROR_MESSAGE = "message"; + + // Native objects + public static final String ARTEMIS_CONNECTION_POOL = "artemis-connection-pool"; + public static final String ARTEMIS_SESSION_FACTORY = "artemis-session-factory"; + public static final String ARTEMIS_SESSION = "artemis-session"; + public static final String ARTEMIS_MESSAGE = "artemis-message"; + public static final String ARTEMIS_PRODUCER = "artemis-producer"; + public static final String ARTEMIS_CONSUMER = "artemis-consumer"; + + // The object types + public static final String MESSAGE_OBJ = "Message"; + public static final String CONNECTION_OBJ = "Connection"; + public static final String SESSION_OBJ = "Session"; + public static final String PRODUCER_OBJ = "Producer"; + public static final String LISTENER_OBJ = "Listener"; + + // Config related + static final String MULTICAST = "MULTICAST"; + + // Warning suppression + public static final String UNCHECKED = "unchecked"; + + public static final String COUNTDOWN_LATCH = "countdown-latch"; + + // Field names for Connection + public static final String TIME_TO_LIVE = "timeToLive"; + public static final String CALL_TIMEOUT = "callTimeout"; + public static final String CONSUMER_WINDOW_SIZE = "consumerWindowSize"; + public static final String CONSUMER_MAX_RATE = "consumerMaxRate"; + public static final String PRODUCER_WINDOW_SIZE = "producerWindowSize"; + public static final String PRODUCER_MAX_RATE = "producerMaxRate"; + public static final String RETRY_INTERVAL = "retryInterval"; + public static final String RETRY_INTERVAL_MULTIPLIER = "retryIntervalMultiplier"; + public static final String MAX_RETRY_INTERVAL = "maxRetryInterval"; + public static final String RECONNECT_ATTEMPTS = "reconnectAttempts"; + public static final String INITIAL_CONNECT_ATTEMPTS = "initialConnectAttempts"; + + + // Field names for Consumer + public static final String FILTER = "filter"; + public static final String AUTO_ACK = "autoAck"; + public static final String BROWSE_ONLY = "browseOnly"; + public static final String QUEUE_CONFIG = "queueConfig"; + public static final String QUEUE_NAME = "queueName"; + public static final String TEMPORARY = "temporary"; + public static final String MAX_CONSUMERS = "maxConsumers"; + public static final String PURGE_ON_NO_CONSUMERS = "purgeOnNoConsumers"; + public static final String EXCLUSIVE = "exclusive"; + public static final String LAST_VALUE = "lastValue"; + + // Field names for Message + public static final String MESSAGE_TYPE = "messageType"; + public static final String EXPIRATION = "expiration"; + public static final String TIME_STAMP = "timeStamp"; + public static final String PRIORITY = "priority"; + + // Field names for Producer + public static final String RATE = "rate"; + + // Common field names + public static final String DURABLE = "durable"; + public static final String ROUTING_TYPE = "routingType"; + public static final String AUTO_CREATED = "autoCreated"; + + // Field name for Session + public static final String USERNAME = "username"; + public static final String PASSWORD = "password"; + + private ArtemisConstants() { + + } +} diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisUtils.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisUtils.java new file mode 100644 index 000000000000..49fbbca13bc1 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/ArtemisUtils.java @@ -0,0 +1,203 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.ballerinalang.messaging.artemis; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.ballerinalang.bre.Context; +import org.ballerinalang.bre.bvm.BLangVMErrors; +import org.ballerinalang.connector.api.BLangConnectorSPIUtil; +import org.ballerinalang.model.types.BTypes; +import org.ballerinalang.model.values.BBoolean; +import org.ballerinalang.model.values.BByte; +import org.ballerinalang.model.values.BError; +import org.ballerinalang.model.values.BFloat; +import org.ballerinalang.model.values.BInteger; +import org.ballerinalang.model.values.BMap; +import org.ballerinalang.model.values.BString; +import org.ballerinalang.model.values.BValue; +import org.ballerinalang.model.values.BValueArray; +import org.ballerinalang.util.exceptions.BallerinaException; +import org.slf4j.Logger; + +/** + * Utility class for Artemis. + */ +public class ArtemisUtils { + + /** + * Util function to throw a {@link BallerinaException}. + * + * @param message the error message + * @param context the Ballerina context + * @param exception the exception to be propagated + * @param logger the logger to log errors + */ + public static void throwBallerinaException(String message, Context context, Exception exception, Logger logger) { + logger.error(message, exception); + throw new BallerinaException(message, exception, context); + } + + /** + * Get error struct. + * + * @param context Represent ballerina context + * @param errMsg Error message + * @return Error struct + */ + public static BError getError(Context context, String errMsg) { + BMap artemisErrorRecord = createArtemisErrorRecord(context); + artemisErrorRecord.put(ArtemisConstants.ARTEMIS_ERROR_MESSAGE, new BString(errMsg)); + return BLangVMErrors.createError(context, true, BTypes.typeError, ArtemisConstants.ARTEMIS_ERROR_CODE, + artemisErrorRecord); + } + + private static BMap createArtemisErrorRecord(Context context) { + return BLangConnectorSPIUtil.createBStruct(context, ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS, + ArtemisConstants.ARTEMIS_ERROR_RECORD); + } + + /** + * Get error struct from throwable. + * + * @param context Represent ballerina context + * @param exception Throwable representing the error. + * @return Error struct + */ + public static BError getError(Context context, Exception exception) { + if (exception.getMessage() == null) { + return getError(context, "Artemis connector error"); + } else { + return getError(context, exception.getMessage()); + } + } + + /** + * Gets an int from the {@link BMap} config. + * + * @param config the BMap config + * @param key the key that has an integer value + * @param logger the logger to log errors + * @return the relevant int value from the config + */ + public static int getIntFromConfig(BMap config, String key, Logger logger) { + return getIntFromLong(((BInteger) config.get(key)).intValue(), key, logger); + } + + /** + * Gets an integer from a long value. Handles errors appropriately. + * + * @param longVal the long value. + * @param name the name of the long value: useful for logging the error. + * @param logger the logger to log errors + * @return the int value from the given long value + */ + public static int getIntFromLong(long longVal, String name, Logger logger) { + if (longVal <= 0) { + return -1; + } + try { + return Math.toIntExact(longVal); + } catch (ArithmeticException e) { + logger.warn("The value set for {} needs to be less than {}. The {} value is set to {}", name, + Integer.MAX_VALUE, name, Integer.MAX_VALUE); + return Integer.MAX_VALUE; + } + } + + /** + * Get the relevant BValure for an Object. + * + * @param obj the Object + * @param context the Ballerina context to to be used in case of errors + * @return the relevant BValue for the object or error + */ + public static BValue getBValueFromObj(Object obj, Context context) { + if (obj instanceof String) { + return new BString((String) obj); + } else if (obj instanceof SimpleString) { + return new BString(((SimpleString) obj).toString()); + } else if (obj instanceof Integer) { + return new BInteger((int) obj); + } else if (obj instanceof Long) { + return new BInteger((long) obj); + } else if (obj instanceof Short) { + return new BInteger((short) obj); + } else if (obj instanceof Float) { + return new BFloat((float) obj); + } else if (obj instanceof Double) { + return new BFloat((double) obj); + } else if (obj instanceof Boolean) { + return new BBoolean((boolean) obj); + } else if (obj instanceof Byte) { + return new BByte((byte) obj); + } else if (obj instanceof byte[]) { + return new BValueArray((byte[]) obj); + } else { + return ArtemisUtils.getError(context, "Unsupported type"); + } + } + + /** + * Gets the {@link RoutingType} from the String type. + * + * @param routingType the string routing type + * @return the relevant {@link RoutingType} + */ + public static RoutingType getRoutingTypeFromString(String routingType) { + return ArtemisConstants.MULTICAST.equals(routingType) ? RoutingType.ANYCAST : + RoutingType.MULTICAST; + } + + /** + * Get the natively stored {@link ClientSession} from the BMap. + * + * @param obj the Ballerina object as a BMap + * @return the natively stored {@link ClientSession} + */ + public static ClientSession getClientSessionFromBMap(BMap obj) { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap sessionObj = (BMap) obj.get("session"); + return (ClientSession) sessionObj.getNativeData(ArtemisConstants.ARTEMIS_SESSION); + } + + /** + * Close the session if it has been created implicitly identified by the anonymousSession field in the Ballerina + * object. + * + * @param obj the Ballerina object as a BMap + * @throws ActiveMQException on session closure failure + */ + public static void closeIfAnonymousSession(BMap obj) throws ActiveMQException { + boolean anonymousSession = ((BBoolean) obj.get("anonymousSession")).booleanValue(); + if (anonymousSession) { + ClientSession session = ArtemisUtils.getClientSessionFromBMap(obj); + if (!session.isClosed()) { + session.close(); + } + } + } + + private ArtemisUtils() { + + } +} diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/connection/Close.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/connection/Close.java new file mode 100644 index 000000000000..b6a5bed9e68e --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/connection/Close.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.ballerinalang.messaging.artemis.externimpl.connection; + +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.ballerinalang.bre.Context; +import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit; +import org.ballerinalang.messaging.artemis.ArtemisConstants; +import org.ballerinalang.model.types.TypeKind; +import org.ballerinalang.model.values.BMap; +import org.ballerinalang.model.values.BValue; +import org.ballerinalang.natives.annotations.BallerinaFunction; +import org.ballerinalang.natives.annotations.Receiver; + +/** + * Extern function to close Artemis connection. + * + * @since 0.995 + */ + +@BallerinaFunction( + orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + functionName = "close", + receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.CONNECTION_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + isPublic = true +) +public class Close extends BlockingNativeCallableUnit { + + @Override + public void execute(Context context) { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap connection = (BMap) context.getRefArgument(0); + ServerLocator connectionPool = (ServerLocator) connection.getNativeData( + ArtemisConstants.ARTEMIS_CONNECTION_POOL); + ClientSessionFactory sessionFactory = + (ClientSessionFactory) connection.getNativeData(ArtemisConstants.ARTEMIS_SESSION_FACTORY); + connectionPool.close(); + sessionFactory.close(); + } +} diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/connection/CreateConnection.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/connection/CreateConnection.java new file mode 100644 index 000000000000..a2b7d71f4128 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/connection/CreateConnection.java @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.ballerinalang.messaging.artemis.externimpl.connection; + +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.ballerinalang.bre.Context; +import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit; +import org.ballerinalang.messaging.artemis.ArtemisConstants; +import org.ballerinalang.messaging.artemis.ArtemisUtils; +import org.ballerinalang.model.types.TypeKind; +import org.ballerinalang.model.values.BFloat; +import org.ballerinalang.model.values.BInteger; +import org.ballerinalang.model.values.BMap; +import org.ballerinalang.model.values.BValue; +import org.ballerinalang.natives.annotations.Argument; +import org.ballerinalang.natives.annotations.BallerinaFunction; +import org.ballerinalang.natives.annotations.Receiver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Extern function for Artemis connection creation. + * + * @since 0.995 + */ + +@BallerinaFunction( + orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + functionName = "createConnection", + receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.CONNECTION_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + args = { + @Argument(name = "url", type = TypeKind.STRING), + @Argument(name = "config", type = TypeKind.RECORD, structType = "ConnectionConfiguration") + } +) +public class CreateConnection extends BlockingNativeCallableUnit { + private static final Logger logger = LoggerFactory.getLogger(CreateConnection.class); + + @Override + public void execute(Context context) { + try { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap connection = (BMap) context.getRefArgument(0); + + String url = context.getStringArgument(0); + + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap configObj = (BMap) context.getRefArgument(1); + long connectionTTL = ((BInteger) configObj.get(ArtemisConstants.TIME_TO_LIVE)).intValue(); + long callTimeout = ((BInteger) configObj.get(ArtemisConstants.CALL_TIMEOUT)).intValue(); + int consumerWindowSize = ArtemisUtils.getIntFromConfig( + configObj, ArtemisConstants.CONSUMER_WINDOW_SIZE, logger); + int consumerMaxRate = ArtemisUtils.getIntFromConfig(configObj, ArtemisConstants.CONSUMER_MAX_RATE, logger); + int producerWindowSize = ArtemisUtils.getIntFromConfig( + configObj, ArtemisConstants.PRODUCER_WINDOW_SIZE, logger); + int producerMaxRate = ArtemisUtils.getIntFromConfig(configObj, ArtemisConstants.PRODUCER_MAX_RATE, logger); + long retryInterval = ((BInteger) configObj.get(ArtemisConstants.RETRY_INTERVAL)).intValue(); + double retryIntervalMultiplier = ((BFloat) configObj.get( + ArtemisConstants.RETRY_INTERVAL_MULTIPLIER)).floatValue(); + long maxRetryInterval = ((BInteger) configObj.get(ArtemisConstants.MAX_RETRY_INTERVAL)).intValue(); + int reconnectAttempts = ArtemisUtils.getIntFromConfig( + configObj, ArtemisConstants.RECONNECT_ATTEMPTS, logger); + int initialConnectAttempts = ArtemisUtils.getIntFromConfig( + configObj, ArtemisConstants.INITIAL_CONNECT_ATTEMPTS, logger); + + ServerLocator connectionPool = ActiveMQClient.createServerLocator(url); + + //Add config values to the serverLocator before creating the sessionFactory + connectionPool.setConnectionTTL(connectionTTL); + connectionPool.setCallTimeout(callTimeout); + connectionPool.setConsumerWindowSize(consumerWindowSize); + connectionPool.setConsumerMaxRate(consumerMaxRate); + connectionPool.setProducerWindowSize(producerWindowSize); + connectionPool.setProducerMaxRate(producerMaxRate); + connectionPool.setRetryInterval(retryInterval); + connectionPool.setRetryIntervalMultiplier(retryIntervalMultiplier); + connectionPool.setMaxRetryInterval(maxRetryInterval); + connectionPool.setReconnectAttempts(reconnectAttempts); + connectionPool.setInitialConnectAttempts(initialConnectAttempts); + connectionPool.setConfirmationWindowSize(1024); + + ClientSessionFactory factory = connectionPool.createSessionFactory(); + + connection.addNativeData(ArtemisConstants.ARTEMIS_CONNECTION_POOL, connectionPool); + connection.addNativeData(ArtemisConstants.ARTEMIS_SESSION_FACTORY, factory); + + } catch (Exception e) { //catching Exception as it is thrown by the createSessionFactory method + ArtemisUtils.throwBallerinaException("Error occurred while starting connection.", context, e, logger); + } + } +} diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/connection/IsClosed.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/connection/IsClosed.java new file mode 100644 index 000000000000..92a23646bcbb --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/connection/IsClosed.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.ballerinalang.messaging.artemis.externimpl.connection; + +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.ballerinalang.bre.Context; +import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit; +import org.ballerinalang.messaging.artemis.ArtemisConstants; +import org.ballerinalang.model.types.TypeKind; +import org.ballerinalang.model.values.BBoolean; +import org.ballerinalang.model.values.BMap; +import org.ballerinalang.model.values.BValue; +import org.ballerinalang.natives.annotations.BallerinaFunction; +import org.ballerinalang.natives.annotations.Receiver; + +/** + * Extern function to check Artemis connection closure. + * + * @since 0.995 + */ + +@BallerinaFunction( + orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + functionName = "isClosed", + receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.CONNECTION_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + isPublic = true +) +public class IsClosed extends BlockingNativeCallableUnit { + + @Override + public void execute(Context context) { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap connection = (BMap) context.getRefArgument(0); + ClientSessionFactory sessionFactory = + (ClientSessionFactory) connection.getNativeData(ArtemisConstants.ARTEMIS_SESSION_FACTORY); + context.setReturnValues(new BBoolean(sessionFactory.isClosed())); + } +} diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/CreateConsumer.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/CreateConsumer.java new file mode 100644 index 000000000000..40b2f60cabfd --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/CreateConsumer.java @@ -0,0 +1,192 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.ballerinalang.messaging.artemis.externimpl.consumer; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.ballerinalang.bre.Context; +import org.ballerinalang.bre.bvm.BLangVMErrors; +import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit; +import org.ballerinalang.bre.bvm.CallableUnitCallback; +import org.ballerinalang.connector.api.Annotation; +import org.ballerinalang.connector.api.BLangConnectorSPIUtil; +import org.ballerinalang.connector.api.BallerinaConnectorException; +import org.ballerinalang.connector.api.Executor; +import org.ballerinalang.connector.api.Resource; +import org.ballerinalang.connector.api.Service; +import org.ballerinalang.connector.api.Struct; +import org.ballerinalang.connector.api.Value; +import org.ballerinalang.messaging.artemis.ArtemisConstants; +import org.ballerinalang.messaging.artemis.ArtemisUtils; +import org.ballerinalang.model.types.TypeKind; +import org.ballerinalang.model.values.BError; +import org.ballerinalang.model.values.BMap; +import org.ballerinalang.model.values.BValue; +import org.ballerinalang.natives.annotations.BallerinaFunction; +import org.ballerinalang.natives.annotations.Receiver; +import org.ballerinalang.services.ErrorHandlerUtils; +import org.ballerinalang.util.codegen.ProgramFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +/** + * Extern function to start the Artemis consumer. + * + * @since 0.995 + */ + +@BallerinaFunction( + orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + functionName = "createConsumer", + receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.LISTENER_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS) +) +public class CreateConsumer extends BlockingNativeCallableUnit { + private static final Logger logger = LoggerFactory.getLogger(CreateConsumer.class); + + @Override + public void execute(Context context) { + try { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap consumerObj = (BMap) context.getRefArgument(0); + + ClientSession session = ArtemisUtils.getClientSessionFromBMap(consumerObj); + + Service service = BLangConnectorSPIUtil.getServiceRegistered(context); + + Annotation serviceAnnotation = getServiceConfigAnnotation(service); + Struct annotationValue = serviceAnnotation.getValue(); + boolean autoAck = annotationValue.getBooleanField(ArtemisConstants.AUTO_ACK); + String consumerFilter = getStringFromValueOrNull(annotationValue.getRefField(ArtemisConstants.FILTER)); + boolean browseOnly = annotationValue.getBooleanField(ArtemisConstants.BROWSE_ONLY); + + Map queueConfig = annotationValue.getMapField(ArtemisConstants.QUEUE_CONFIG); + String queueName = queueConfig.get(ArtemisConstants.QUEUE_NAME).getStringValue(); + SimpleString addressName = new SimpleString(getAddressName(queueConfig, queueName)); + boolean autoCreated = queueConfig.get(ArtemisConstants.AUTO_CREATED).getBooleanValue(); + String routingType = queueConfig.get(ArtemisConstants.ROUTING_TYPE).getStringValue(); + boolean temporary = queueConfig.get(ArtemisConstants.TEMPORARY).getBooleanValue(); + String queueFilter = getStringFromValueOrNull(queueConfig.get(ArtemisConstants.FILTER)); + boolean durable = queueConfig.get(ArtemisConstants.DURABLE).getBooleanValue(); + int maxConsumers = ArtemisUtils.getIntFromLong(queueConfig. + get(ArtemisConstants.MAX_CONSUMERS).getIntValue(), ArtemisConstants.MAX_CONSUMERS, logger); + boolean purgeOnNoConsumers = queueConfig.get(ArtemisConstants.PURGE_ON_NO_CONSUMERS).getBooleanValue(); + boolean exclusive = queueConfig.get(ArtemisConstants.EXCLUSIVE).getBooleanValue(); + boolean lastValue = queueConfig.get(ArtemisConstants.LAST_VALUE).getBooleanValue(); + + if (autoCreated) { + SimpleString simpleQueueName = new SimpleString(queueName); + SimpleString simpleQueueFilter = queueFilter != null ? new SimpleString(queueFilter) : null; + ClientSession.QueueQuery queueQuery = session.queueQuery(simpleQueueName); + if (!queueQuery.isExists()) { + if (!temporary) { + session.createQueue(addressName, ArtemisUtils.getRoutingTypeFromString(routingType), + simpleQueueName, simpleQueueFilter, durable, true, maxConsumers, + purgeOnNoConsumers, exclusive, lastValue); + } else { + session.createTemporaryQueue(addressName, ArtemisUtils.getRoutingTypeFromString(routingType), + simpleQueueName, simpleQueueFilter, maxConsumers, + purgeOnNoConsumers, exclusive, lastValue); + } + } else { + logger.warn( + "Queue with the name {} already exists with routingType: {}, durable: {}, temporary: {}, " + + "filter: {}, purgeOnNoConsumers: {}, exclusive: {}, lastValue: {}", + queueName, queueQuery.getRoutingType(), queueQuery.isDurable(), queueQuery.isTemporary(), + queueQuery.getFilterString(), queueQuery.isPurgeOnNoConsumers(), queueQuery.isExclusive(), + queueQuery.isLastValue()); + } + } + + Resource onMessageResource = service.getResources()[0]; + + ClientConsumer consumer = session.createConsumer(queueName, consumerFilter, browseOnly); + consumerObj.addNativeData(ArtemisConstants.ARTEMIS_CONSUMER, consumer); + if (onMessageResource != null) { + consumer.setMessageHandler( + clientMessage -> Executor + .submit(onMessageResource, new ResponseCallback(clientMessage, autoAck), null, null, + getSignatureParameters(onMessageResource, clientMessage))); + } + } catch (ActiveMQException e) { + context.setReturnValues(ArtemisUtils.getError(context, e)); + } + } + + private String getAddressName(Map queueConfig, String queueName) { + Value addressName = queueConfig.get("addressName"); + return addressName != null ? addressName.getStringValue() : queueName; + } + + private String getStringFromValueOrNull(Value filterVal) { + return filterVal != null ? filterVal.getStringValue() : null; + } + + private Annotation getServiceConfigAnnotation(Service service) { + List annotationList = service + .getAnnotationList(ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS, + "ServiceConfig"); + + if (annotationList == null) { + return null; + } + return annotationList.isEmpty() ? null : annotationList.get(0); + } + + private static class ResponseCallback implements CallableUnitCallback { + private ClientMessage message; + private boolean autoAck; + + ResponseCallback(ClientMessage message, boolean autoAck) { + this.message = message; + this.autoAck = autoAck; + } + + @Override + public void notifySuccess() { + if (autoAck) { + try { + message.acknowledge(); + } catch (ActiveMQException e) { + throw new BallerinaConnectorException("Failure during acknowledging the message", e); + } + } + } + + @Override + public void notifyFailure(BError error) { + ErrorHandlerUtils.printError("error: " + BLangVMErrors.getPrintableStackTrace(error)); + } + } + + private BValue getSignatureParameters(Resource onMessageResource, ClientMessage clientMessage) { + ProgramFile programFile = onMessageResource.getResourceInfo().getPackageInfo().getProgramFile(); + BMap messageObj = BLangConnectorSPIUtil.createBStruct( + programFile, ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS, ArtemisConstants.MESSAGE_OBJ); + messageObj.addNativeData(ArtemisConstants.ARTEMIS_MESSAGE, clientMessage); + return messageObj; + } +} diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/Start.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/Start.java new file mode 100644 index 000000000000..21b5edfd3d31 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/Start.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.ballerinalang.messaging.artemis.externimpl.consumer; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.ballerinalang.bre.Context; +import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit; +import org.ballerinalang.messaging.artemis.ArtemisConstants; +import org.ballerinalang.messaging.artemis.ArtemisUtils; +import org.ballerinalang.model.types.TypeKind; +import org.ballerinalang.model.values.BMap; +import org.ballerinalang.model.values.BValue; +import org.ballerinalang.natives.annotations.BallerinaFunction; +import org.ballerinalang.natives.annotations.Receiver; + +import java.util.concurrent.CountDownLatch; + +/** + * Extern function to start the Artemis consumer. + * + * @since 0.995 + */ + +@BallerinaFunction( + orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + functionName = "start", + receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.LISTENER_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS) +) +public class Start extends BlockingNativeCallableUnit { + private CountDownLatch countDownLatch = new CountDownLatch(1); + + @Override + public void execute(Context context) { + try { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap listenerObj = (BMap) context.getRefArgument(0); + listenerObj.addNativeData(ArtemisConstants.COUNTDOWN_LATCH, countDownLatch); + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap sessionObj = (BMap) listenerObj.get("session"); + ClientSession session = (ClientSession) sessionObj.getNativeData(ArtemisConstants.ARTEMIS_SESSION); + session.start(); + // It is essential to keep a non-daemon thread running in order to avoid the java program or the + // Ballerina service from exiting + new Thread(() -> { + try { + countDownLatch.await(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + }).start(); + } catch (ActiveMQException e) { + context.setReturnValues(ArtemisUtils.getError(context, "Error on starting the session")); + } + } +} diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/Stop.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/Stop.java new file mode 100644 index 000000000000..025cfcdc0891 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/consumer/Stop.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.ballerinalang.messaging.artemis.externimpl.consumer; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.ballerinalang.bre.Context; +import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit; +import org.ballerinalang.messaging.artemis.ArtemisConstants; +import org.ballerinalang.messaging.artemis.ArtemisUtils; +import org.ballerinalang.model.types.TypeKind; +import org.ballerinalang.model.values.BMap; +import org.ballerinalang.model.values.BValue; +import org.ballerinalang.natives.annotations.BallerinaFunction; +import org.ballerinalang.natives.annotations.Receiver; + +import java.util.concurrent.CountDownLatch; + +/** + * Extern function to stop the Artemis consumer. + * + * @since 0.995 + */ + +@BallerinaFunction( + orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + functionName = "stop", + receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.LISTENER_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS) +) +public class Stop extends BlockingNativeCallableUnit { + + @Override + public void execute(Context context) { + try { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap listenerObj = (BMap) context.getRefArgument(0); + ClientConsumer consumer = (ClientConsumer) listenerObj.getNativeData(ArtemisConstants.ARTEMIS_CONSUMER); + consumer.close(); + ArtemisUtils.closeIfAnonymousSession(listenerObj); + CountDownLatch countDownLatch = + (CountDownLatch) listenerObj.getNativeData(ArtemisConstants.COUNTDOWN_LATCH); + if (countDownLatch != null) { + countDownLatch.countDown(); + } + } catch (ActiveMQException e) { + context.setReturnValues(ArtemisUtils.getError(context, "Error when closing the consumer")); + } + } +} diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/Acknowledge.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/Acknowledge.java new file mode 100644 index 000000000000..751e8ca15220 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/Acknowledge.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.ballerinalang.messaging.artemis.externimpl.message; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.ballerinalang.bre.Context; +import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit; +import org.ballerinalang.messaging.artemis.ArtemisConstants; +import org.ballerinalang.messaging.artemis.ArtemisUtils; +import org.ballerinalang.model.types.TypeKind; +import org.ballerinalang.model.values.BMap; +import org.ballerinalang.model.values.BValue; +import org.ballerinalang.natives.annotations.BallerinaFunction; +import org.ballerinalang.natives.annotations.Receiver; + +/** + * Extern function to acknowledge an Artemis message. + * + * @since 0.995 + */ + +@BallerinaFunction( + orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + functionName = "acknowledge", + receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + isPublic = true +) +public class Acknowledge extends BlockingNativeCallableUnit { + + @Override + public void execute(Context context) { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap messageObj = (BMap) context.getRefArgument(0); + ClientMessage message = (ClientMessage) messageObj.getNativeData(ArtemisConstants.ARTEMIS_MESSAGE); + try { + message.acknowledge(); + } catch (ActiveMQException e) { + context.setReturnValues(ArtemisUtils.getError(context, "Error on acknowledging the message")); + } + } +} diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/CreateMessage.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/CreateMessage.java new file mode 100644 index 000000000000..1f6ed7f85f00 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/CreateMessage.java @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.ballerinalang.messaging.artemis.externimpl.message; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.reader.BytesMessageUtil; +import org.apache.activemq.artemis.reader.MapMessageUtil; +import org.apache.activemq.artemis.reader.TextMessageUtil; +import org.apache.activemq.artemis.utils.collections.TypedProperties; +import org.ballerinalang.bre.Context; +import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit; +import org.ballerinalang.messaging.artemis.ArtemisConstants; +import org.ballerinalang.messaging.artemis.ArtemisUtils; +import org.ballerinalang.model.types.TypeKind; +import org.ballerinalang.model.values.BBoolean; +import org.ballerinalang.model.values.BByte; +import org.ballerinalang.model.values.BFloat; +import org.ballerinalang.model.values.BInteger; +import org.ballerinalang.model.values.BMap; +import org.ballerinalang.model.values.BString; +import org.ballerinalang.model.values.BValue; +import org.ballerinalang.model.values.BValueArray; +import org.ballerinalang.natives.annotations.Argument; +import org.ballerinalang.natives.annotations.BallerinaFunction; +import org.ballerinalang.natives.annotations.Receiver; +import org.ballerinalang.stdlib.io.channels.base.Channel; +import org.ballerinalang.stdlib.io.utils.IOConstants; + +import java.util.Map; + +/** + * Extern function to create an ActiveMQ Artemis message. + * + * @since 0.995 + */ + +@BallerinaFunction( + orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + functionName = "createMessage", + receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + args = { + @Argument(name = "session", type = TypeKind.OBJECT, structType = ArtemisConstants.SESSION_OBJ), + @Argument(name = "data", type = TypeKind.UNION), + @Argument(name = "config", type = TypeKind.RECORD, structType = "ConnectionConfiguration") + } +) +public class CreateMessage extends BlockingNativeCallableUnit { + + @Override + public void execute(Context context) { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap messageObj = (BMap) context.getRefArgument(0); + String type = messageObj.get(ArtemisConstants.MESSAGE_TYPE).stringValue(); + + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap sessionObj = (BMap) context.getRefArgument(1); + BValue dataVal = context.getRefArgument(2); + + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap configObj = (BMap) context.getRefArgument(3); + long expiration = getIntFromIntOrNil(configObj.get(ArtemisConstants.EXPIRATION), 0); + long timeStamp = getIntFromIntOrNil(configObj.get(ArtemisConstants.TIME_STAMP), System.currentTimeMillis()); + byte priority = (byte) ((BByte) configObj.get(ArtemisConstants.PRIORITY)).byteValue(); + boolean durable = ((BBoolean) configObj.get(ArtemisConstants.DURABLE)).booleanValue(); + BValue routingType = configObj.get(ArtemisConstants.ROUTING_TYPE); + + ClientSession session = (ClientSession) sessionObj.getNativeData(ArtemisConstants.ARTEMIS_SESSION); + + byte messageType = getMessageType(type); + ClientMessage message = session.createMessage(messageType, durable, expiration, timeStamp, priority); + if (routingType instanceof BString) { + message.setRoutingType(ArtemisUtils.getRoutingTypeFromString(routingType.stringValue())); + } + + if (messageType == Message.TEXT_TYPE) { + TextMessageUtil.writeBodyText(message.getBodyBuffer(), new SimpleString(dataVal.stringValue())); + } else if (messageType == Message.BYTES_TYPE) { + BytesMessageUtil.bytesWriteBytes(message.getBodyBuffer(), ((BValueArray) dataVal).getBytes()); + } else if (messageType == Message.MAP_TYPE) { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + Map mapObj = ((BMap) dataVal).getMap(); + TypedProperties map = new TypedProperties(); + for (Map.Entry entry : mapObj.entrySet()) { + SimpleString key = new SimpleString(entry.getKey()); + BValue value = entry.getValue(); + if (value instanceof BString) { + map.putSimpleStringProperty(key, new SimpleString(value.stringValue())); + } else if (value instanceof BInteger) { + map.putLongProperty(key, ((BInteger) value).intValue()); + } else if (value instanceof BFloat) { + map.putDoubleProperty(key, ((BFloat) value).floatValue()); + } else if (value instanceof BByte) { + map.putByteProperty(key, (byte) ((BByte) value).byteValue()); + } else if (value instanceof BBoolean) { + map.putBooleanProperty(key, ((BBoolean) value).booleanValue()); + } else if (value instanceof BValueArray) { + map.putBytesProperty(key, ((BValueArray) value).getBytes()); + } + MapMessageUtil.writeBodyMap(message.getBodyBuffer(), map); + } + } else if (messageType == Message.STREAM_TYPE) { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap streamObj = (BMap) dataVal; + Channel channel = (Channel) streamObj.getNativeData(IOConstants.BYTE_CHANNEL_NAME); + message.setBodyInputStream(channel.getInputStream()); + } + + messageObj.addNativeData(ArtemisConstants.ARTEMIS_MESSAGE, message); + } + + private byte getMessageType(String type) { + switch (type) { + case "TEXT": + return Message.TEXT_TYPE; + case "BYTES": + return Message.BYTES_TYPE; + case "MAP": + return Message.MAP_TYPE; + case "STREAM": + return Message.STREAM_TYPE; + default: + return Message.DEFAULT_TYPE; + } + } + + private long getIntFromIntOrNil(BValue value, long defaultVal) { + if (value instanceof BInteger) { + return ((BInteger) value).intValue(); + } + return defaultVal; + } +} diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetBodySize.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetBodySize.java new file mode 100644 index 000000000000..a4fc0da430b2 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetBodySize.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.ballerinalang.messaging.artemis.externimpl.message; + +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.ballerinalang.bre.Context; +import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit; +import org.ballerinalang.messaging.artemis.ArtemisConstants; +import org.ballerinalang.model.types.TypeKind; +import org.ballerinalang.model.values.BInteger; +import org.ballerinalang.model.values.BMap; +import org.ballerinalang.model.values.BValue; +import org.ballerinalang.natives.annotations.BallerinaFunction; +import org.ballerinalang.natives.annotations.Receiver; + +/** + * Extern function to get the body size of an artemis message. + * + * @since 0.995 + */ + +@BallerinaFunction( + orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + functionName = "getBodySize", + receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + isPublic = true +) +public class GetBodySize extends BlockingNativeCallableUnit { + + @Override + public void execute(Context context) { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap messageObj = (BMap) context.getRefArgument(0); + ClientMessage message = (ClientMessage) messageObj.getNativeData(ArtemisConstants.ARTEMIS_MESSAGE); + context.setReturnValues(new BInteger(message.getBodySize())); + } +} diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetPayload.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetPayload.java new file mode 100644 index 000000000000..eb3d5f5ebd6e --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetPayload.java @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.ballerinalang.messaging.artemis.externimpl.message; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.reader.BytesMessageUtil; +import org.apache.activemq.artemis.reader.MapMessageUtil; +import org.apache.activemq.artemis.reader.TextMessageUtil; +import org.apache.activemq.artemis.utils.collections.TypedProperties; +import org.ballerinalang.bre.Context; +import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit; +import org.ballerinalang.messaging.artemis.ArtemisConstants; +import org.ballerinalang.messaging.artemis.ArtemisUtils; +import org.ballerinalang.model.types.BMapType; +import org.ballerinalang.model.types.BTypes; +import org.ballerinalang.model.types.TypeKind; +import org.ballerinalang.model.values.BMap; +import org.ballerinalang.model.values.BString; +import org.ballerinalang.model.values.BValue; +import org.ballerinalang.model.values.BValueArray; +import org.ballerinalang.natives.annotations.Argument; +import org.ballerinalang.natives.annotations.BallerinaFunction; +import org.ballerinalang.natives.annotations.Receiver; + +import java.util.Map; + +/** + * Extern function to get the payload from a message. + * + * @since 0.995 + */ + +@BallerinaFunction( + orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + functionName = "getPayload", + receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + args = { + @Argument(name = "key", type = TypeKind.STRING) + } +) +public class GetPayload extends BlockingNativeCallableUnit { + + @Override + public void execute(Context context) { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap messageObj = (BMap) context.getRefArgument(0); + ClientMessage message = (ClientMessage) messageObj.getNativeData(ArtemisConstants.ARTEMIS_MESSAGE); + byte messageType = message.getType(); + if (messageType == Message.TEXT_TYPE) { + ActiveMQBuffer msgBuffer = message.getBodyBuffer(); + context.setReturnValues(new BString(TextMessageUtil.readBodyText(msgBuffer).toString())); + } else if (messageType == Message.BYTES_TYPE) { + ActiveMQBuffer msgBuffer = message.getBodyBuffer(); + byte[] bytes = new byte[msgBuffer.readableBytes()]; + BytesMessageUtil.bytesReadBytes(msgBuffer, bytes); + context.setReturnValues(new BValueArray(bytes)); + } else if (messageType == Message.MAP_TYPE) { + ActiveMQBuffer msgBuffer = message.getBodyBuffer(); + TypedProperties properties = MapMessageUtil.readBodyMap(msgBuffer); + Map map = properties.getMap(); + BMap mapObj = getMapObj(map.entrySet().iterator().next().getValue()); + if (mapObj != null) { + for (Map.Entry entry : map.entrySet()) { + mapObj.put(entry.getKey(), ArtemisUtils.getBValueFromObj(entry.getValue(), context)); + } + context.setReturnValues(mapObj); + } else { + context.setReturnValues(ArtemisUtils.getError(context, "Unsupported type")); + } + } else if (messageType == Message.STREAM_TYPE) { + context.setReturnValues( + ArtemisUtils.getError(context, "Use the saveToFile function for STREAM type message")); + } else { + context.setReturnValues(ArtemisUtils.getError(context, "Unsupported type")); + } + } + + private BMap getMapObj(Object val) { + if (val instanceof String) { + return new BMap<>(new BMapType(BTypes.typeString)); + } else if (val instanceof Long || val instanceof Integer || val instanceof Short) { + return new BMap<>(new BMapType(BTypes.typeInt)); + } else if (val instanceof Float || val instanceof Double) { + return new BMap<>(new BMapType(BTypes.typeFloat)); + } else if (val instanceof Byte) { + return new BMap<>(new BMapType(BTypes.typeByte)); + } else if (val instanceof byte[]) { + return new BMap<>(new BMapType(BTypes.fromString("byte[]"))); + } else if (val instanceof Boolean) { + return new BMap<>(new BMapType(BTypes.typeBoolean)); + } + return null; + } +} diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetProperty.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetProperty.java new file mode 100644 index 000000000000..6d09d269126e --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetProperty.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.ballerinalang.messaging.artemis.externimpl.message; + +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.ballerinalang.bre.Context; +import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit; +import org.ballerinalang.messaging.artemis.ArtemisConstants; +import org.ballerinalang.messaging.artemis.ArtemisUtils; +import org.ballerinalang.model.types.TypeKind; +import org.ballerinalang.model.values.BMap; +import org.ballerinalang.model.values.BValue; +import org.ballerinalang.natives.annotations.Argument; +import org.ballerinalang.natives.annotations.BallerinaFunction; +import org.ballerinalang.natives.annotations.Receiver; + +/** + * Extern function for getting a message property. + * + * @since 0.995.0 + */ + +@BallerinaFunction( + orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + functionName = "getProperty", + receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + args = { + @Argument(name = "key", type = TypeKind.STRING) + } +) +public class GetProperty extends BlockingNativeCallableUnit { + + @Override + public void execute(Context context) { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap messageObj = (BMap) context.getRefArgument(0); + ClientMessage message = (ClientMessage) messageObj.getNativeData(ArtemisConstants.ARTEMIS_MESSAGE); + + String key = context.getStringArgument(0); + Object property = message.getObjectProperty(key); + if (property != null) { + context.setReturnValues(ArtemisUtils.getBValueFromObj(property, context)); + } else { + context.setReturnValues(); + } + } +} diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetType.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetType.java new file mode 100644 index 000000000000..998e9384901d --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/GetType.java @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.ballerinalang.messaging.artemis.externimpl.message; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.ballerinalang.bre.Context; +import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit; +import org.ballerinalang.messaging.artemis.ArtemisConstants; +import org.ballerinalang.model.types.TypeKind; +import org.ballerinalang.model.values.BMap; +import org.ballerinalang.model.values.BString; +import org.ballerinalang.model.values.BValue; +import org.ballerinalang.natives.annotations.Argument; +import org.ballerinalang.natives.annotations.BallerinaFunction; +import org.ballerinalang.natives.annotations.Receiver; + +/** + * Extern function for getting the type of a message. + * + * @since 0.995 + */ + +@BallerinaFunction( + orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + functionName = "getType", + receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + args = { + @Argument(name = "key", type = TypeKind.STRING) + } +) +public class GetType extends BlockingNativeCallableUnit { + + @Override + public void execute(Context context) { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap messageObj = (BMap) context.getRefArgument(0); + ClientMessage message = (ClientMessage) messageObj.getNativeData(ArtemisConstants.ARTEMIS_MESSAGE); + byte messageType = message.getType(); + BString type; + switch (messageType) { + case Message.TEXT_TYPE: + type = new BString("TEXT"); + break; + case Message.BYTES_TYPE: + type = new BString("BYTES"); + break; + case Message.MAP_TYPE: + type = new BString("MAP"); + break; + case Message.STREAM_TYPE: + type = new BString("STREAM"); + break; + default: + type = new BString("UNSUPPORTED"); + + } + context.setReturnValues(type); + } +} diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/PutProperty.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/PutProperty.java new file mode 100644 index 000000000000..d41f0d95f5b1 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/PutProperty.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.ballerinalang.messaging.artemis.externimpl.message; + +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.ballerinalang.bre.Context; +import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit; +import org.ballerinalang.messaging.artemis.ArtemisConstants; +import org.ballerinalang.model.types.TypeKind; +import org.ballerinalang.model.values.BBoolean; +import org.ballerinalang.model.values.BByte; +import org.ballerinalang.model.values.BFloat; +import org.ballerinalang.model.values.BInteger; +import org.ballerinalang.model.values.BMap; +import org.ballerinalang.model.values.BString; +import org.ballerinalang.model.values.BValue; +import org.ballerinalang.model.values.BValueArray; +import org.ballerinalang.natives.annotations.Argument; +import org.ballerinalang.natives.annotations.BallerinaFunction; +import org.ballerinalang.natives.annotations.Receiver; + +/** + * Extern function for setting a property to a Artemis message. + * + * @since 0.995 + */ + +@BallerinaFunction( + orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + functionName = "putProperty", + receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + args = { + @Argument(name = "key", type = TypeKind.STRING), + @Argument(name = "value", type = TypeKind.UNION) + } +) +public class PutProperty extends BlockingNativeCallableUnit { + + @Override + public void execute(Context context) { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap messageObj = (BMap) context.getRefArgument(0); + ClientMessage message = (ClientMessage) messageObj.getNativeData(ArtemisConstants.ARTEMIS_MESSAGE); + + String key = context.getStringArgument(0); + BValue valObj = context.getRefArgument(1); + if (valObj instanceof BString) { + message.putStringProperty(key, valObj.stringValue()); + } else if (valObj instanceof BInteger) { + message.putLongProperty(key, ((BInteger) valObj).intValue()); + } else if (valObj instanceof BFloat) { + message.putDoubleProperty(key, ((BFloat) valObj).floatValue()); + } else if (valObj instanceof BBoolean) { + message.putBooleanProperty(key, ((BBoolean) valObj).booleanValue()); + } else if (valObj instanceof BByte) { + message.putByteProperty(key, (byte) ((BByte) valObj).byteValue()); + } else if (valObj instanceof BValueArray) { + message.putBytesProperty(key, ((BValueArray) valObj).getBytes()); + }//else is not needed because these are the only values supported by the Ballerina the method + } +} diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/SaveToWritableByteChannel.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/SaveToWritableByteChannel.java new file mode 100644 index 000000000000..e711922208eb --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/message/SaveToWritableByteChannel.java @@ -0,0 +1,78 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.ballerinalang.messaging.artemis.externimpl.message; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.ballerinalang.bre.Context; +import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit; +import org.ballerinalang.messaging.artemis.ArtemisConstants; +import org.ballerinalang.messaging.artemis.ArtemisUtils; +import org.ballerinalang.model.types.TypeKind; +import org.ballerinalang.model.values.BMap; +import org.ballerinalang.model.values.BValue; +import org.ballerinalang.natives.annotations.Argument; +import org.ballerinalang.natives.annotations.BallerinaFunction; +import org.ballerinalang.natives.annotations.Receiver; +import org.ballerinalang.stdlib.io.channels.base.Channel; +import org.ballerinalang.stdlib.io.utils.IOConstants; +import org.ballerinalang.util.exceptions.BallerinaException; + +import java.nio.channels.Channels; + +/** + * Extern function to save a stream message to a byte channel. + * + * @since 0.995 + */ + +@BallerinaFunction( + orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + functionName = "saveToWritableByteChannel", + receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + args = { + @Argument(name = "ch", type = TypeKind.OBJECT, structType = "WritableByteChannel") + } +) +public class SaveToWritableByteChannel extends BlockingNativeCallableUnit { + + @Override + public void execute(Context context) { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap messageObj = (BMap) context.getRefArgument(0); + ClientMessage message = (ClientMessage) messageObj.getNativeData(ArtemisConstants.ARTEMIS_MESSAGE); + if (message.getType() == Message.STREAM_TYPE) { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap byteChannelObj = (BMap) context.getRefArgument(1); + Channel channel = (Channel) byteChannelObj.getNativeData(IOConstants.BYTE_CHANNEL_NAME); + try { + message.saveToOutputStream(Channels.newOutputStream(channel.getByteChannel())); + } catch (ActiveMQException e) { + context.setReturnValues(ArtemisUtils.getError(context, new BallerinaException( + "Error while writing to WritableByteChannel"))); + } + + } else { + context.setReturnValues(ArtemisUtils.getError(context, "Unsupported type")); + } + } +} diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/producer/Close.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/producer/Close.java new file mode 100644 index 000000000000..713e73da20b4 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/producer/Close.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.ballerinalang.messaging.artemis.externimpl.producer; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.ballerinalang.bre.Context; +import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit; +import org.ballerinalang.messaging.artemis.ArtemisConstants; +import org.ballerinalang.messaging.artemis.ArtemisUtils; +import org.ballerinalang.model.types.TypeKind; +import org.ballerinalang.model.values.BMap; +import org.ballerinalang.model.values.BValue; +import org.ballerinalang.natives.annotations.BallerinaFunction; +import org.ballerinalang.natives.annotations.Receiver; + +/** + * Extern function to close Artemis producer. + * + * @since 0.995 + */ + +@BallerinaFunction( + orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + functionName = "close", + receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.PRODUCER_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + isPublic = true +) +public class Close extends BlockingNativeCallableUnit { + + @Override + public void execute(Context context) { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap producerObj = (BMap) context.getRefArgument(0); + ClientProducer producer = (ClientProducer) producerObj.getNativeData(ArtemisConstants.ARTEMIS_PRODUCER); + try { + producer.close(); + ArtemisUtils.closeIfAnonymousSession(producerObj); + } catch (ActiveMQException e) { + context.setReturnValues(ArtemisUtils.getError(context, "Error when closing the producer")); + } + } +} diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/producer/CreateProducer.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/producer/CreateProducer.java new file mode 100644 index 000000000000..175711ff610f --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/producer/CreateProducer.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.ballerinalang.messaging.artemis.externimpl.producer; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.ballerinalang.bre.Context; +import org.ballerinalang.bre.bvm.CallableUnitCallback; +import org.ballerinalang.messaging.artemis.ArtemisConstants; +import org.ballerinalang.messaging.artemis.ArtemisUtils; +import org.ballerinalang.model.NativeCallableUnit; +import org.ballerinalang.model.types.TypeKind; +import org.ballerinalang.model.values.BBoolean; +import org.ballerinalang.model.values.BMap; +import org.ballerinalang.model.values.BValue; +import org.ballerinalang.natives.annotations.Argument; +import org.ballerinalang.natives.annotations.BallerinaFunction; +import org.ballerinalang.natives.annotations.Receiver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Extern function for creating an ActiveMQ Artemis producer. + * + * @since 0.995 + */ + +@BallerinaFunction( + orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + functionName = "createProducer", + receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.PRODUCER_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + args = { + @Argument(name = "addressName", type = TypeKind.STRING), + @Argument(name = "config", type = TypeKind.RECORD, structType = "AddressConfiguration"), + @Argument(name = "rate", type = TypeKind.INT) + } +) +public class CreateProducer implements NativeCallableUnit { + + private static final Logger logger = LoggerFactory.getLogger(CreateProducer.class); + + @Override + public void execute(Context context, CallableUnitCallback callableUnitCallback) { + try { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap producerObj = (BMap) context.getRefArgument(0); + + SimpleString addressName = new SimpleString(context.getStringArgument(0)); + + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap configObj = (BMap) context.getRefArgument(1); + + String routingType = configObj.get(ArtemisConstants.ROUTING_TYPE).stringValue(); + boolean autoCreated = ((BBoolean) configObj.get(ArtemisConstants.AUTO_CREATED)).booleanValue(); + + int rate = ArtemisUtils.getIntFromLong(context.getIntArgument(0), ArtemisConstants.RATE, logger); + ClientSession session = ArtemisUtils.getClientSessionFromBMap(producerObj); + + if (autoCreated) { + ClientSession.AddressQuery addressQuery = session.addressQuery(addressName); + if (!addressQuery.isExists()) { + session.createAddress(addressName, ArtemisUtils.getRoutingTypeFromString(routingType), true); + } else { + logger.warn("Address with the name {} already exists. ", addressName); + } + } + ClientProducer producer = session.createProducer(addressName, rate); + producerObj.addNativeData(ArtemisConstants.ARTEMIS_PRODUCER, producer); + + } catch (ActiveMQException ex) { + ArtemisUtils.throwBallerinaException("Error occurred while creating the producer.", context, ex, logger); + } + } + + @Override + public boolean isBlocking() { + return true; + } +} diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/producer/IsClosed.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/producer/IsClosed.java new file mode 100644 index 000000000000..a60dcea7fe0c --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/producer/IsClosed.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.ballerinalang.messaging.artemis.externimpl.producer; + +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.ballerinalang.bre.Context; +import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit; +import org.ballerinalang.messaging.artemis.ArtemisConstants; +import org.ballerinalang.model.types.TypeKind; +import org.ballerinalang.model.values.BBoolean; +import org.ballerinalang.model.values.BMap; +import org.ballerinalang.model.values.BValue; +import org.ballerinalang.natives.annotations.BallerinaFunction; +import org.ballerinalang.natives.annotations.Receiver; + +/** + * Extern function to check Artemis producer closure. + * + * @since 0.995 + */ + +@BallerinaFunction( + orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + functionName = "isClosed", + receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.PRODUCER_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + isPublic = true +) +public class IsClosed extends BlockingNativeCallableUnit { + + @Override + public void execute(Context context) { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap producerObj = (BMap) context.getRefArgument(0); + ClientProducer producer = + (ClientProducer) producerObj.getNativeData(ArtemisConstants.ARTEMIS_PRODUCER); + context.setReturnValues(new BBoolean(producer.isClosed())); + } +} diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/producer/Send.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/producer/Send.java new file mode 100644 index 000000000000..c0b56ae7463e --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/producer/Send.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.ballerinalang.messaging.artemis.externimpl.producer; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.ballerinalang.bre.Context; +import org.ballerinalang.bre.bvm.CallableUnitCallback; +import org.ballerinalang.messaging.artemis.ArtemisConstants; +import org.ballerinalang.messaging.artemis.ArtemisUtils; +import org.ballerinalang.model.NativeCallableUnit; +import org.ballerinalang.model.types.TypeKind; +import org.ballerinalang.model.values.BMap; +import org.ballerinalang.model.values.BValue; +import org.ballerinalang.natives.annotations.Argument; +import org.ballerinalang.natives.annotations.BallerinaFunction; +import org.ballerinalang.natives.annotations.Receiver; + +/** + * Extern function of the producer to send a message. + * + * @since 0.995 + */ + +@BallerinaFunction( + orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + functionName = "externSend", + receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.PRODUCER_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + args = { + @Argument(name = "data", type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ) + } +) +public class Send implements NativeCallableUnit { + + @Override + public void execute(Context context, CallableUnitCallback callableUnitCallback) { + try { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap producerObj = (BMap) context.getRefArgument(0); + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap data = (BMap) context.getRefArgument(1); + ClientProducer producer = (ClientProducer) producerObj.getNativeData(ArtemisConstants.ARTEMIS_PRODUCER); + ClientMessage message = (ClientMessage) data.getNativeData(ArtemisConstants.ARTEMIS_MESSAGE); + producer.send(message, message1 -> callableUnitCallback.notifySuccess()); + } catch (ActiveMQException e) { + context.setReturnValues(ArtemisUtils.getError(context, e)); + callableUnitCallback.notifySuccess(); + } + } + + @Override + public boolean isBlocking() { + return false; + } +} diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/session/Close.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/session/Close.java new file mode 100644 index 000000000000..4f0378ebcfc6 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/session/Close.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.ballerinalang.messaging.artemis.externimpl.session; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.ballerinalang.bre.Context; +import org.ballerinalang.bre.bvm.CallableUnitCallback; +import org.ballerinalang.messaging.artemis.ArtemisConstants; +import org.ballerinalang.messaging.artemis.ArtemisUtils; +import org.ballerinalang.model.NativeCallableUnit; +import org.ballerinalang.model.types.TypeKind; +import org.ballerinalang.model.values.BMap; +import org.ballerinalang.model.values.BValue; +import org.ballerinalang.natives.annotations.BallerinaFunction; +import org.ballerinalang.natives.annotations.Receiver; + +/** + * Extern function to close Artemis session. + * + * @since 0.995 + */ + +@BallerinaFunction( + orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + functionName = "close", + receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.SESSION_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + isPublic = true +) +public class Close implements NativeCallableUnit { + + @Override + public void execute(Context context, CallableUnitCallback callableUnitCallback) { + try { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap sessionObj = (BMap) context.getRefArgument(0); + ClientSession session = (ClientSession) sessionObj.getNativeData(ArtemisConstants.ARTEMIS_SESSION); + session.close(); + } catch (ActiveMQException e) { + context.setReturnValues(ArtemisUtils.getError(context, "Error when closing the Session")); + } + } + + @Override + public boolean isBlocking() { + return true; + } +} diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/session/CreateSession.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/session/CreateSession.java new file mode 100644 index 000000000000..51856fa49430 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/session/CreateSession.java @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.ballerinalang.messaging.artemis.externimpl.session; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.ballerinalang.bre.Context; +import org.ballerinalang.bre.bvm.CallableUnitCallback; +import org.ballerinalang.messaging.artemis.ArtemisConstants; +import org.ballerinalang.messaging.artemis.ArtemisUtils; +import org.ballerinalang.model.NativeCallableUnit; +import org.ballerinalang.model.types.TypeKind; +import org.ballerinalang.model.values.BMap; +import org.ballerinalang.model.values.BString; +import org.ballerinalang.model.values.BValue; +import org.ballerinalang.natives.annotations.Argument; +import org.ballerinalang.natives.annotations.BallerinaFunction; +import org.ballerinalang.natives.annotations.Receiver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Extern function for Artemis session creation. + * + * @since 0.995 + */ + +@BallerinaFunction( + orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + functionName = "createSession", + receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.SESSION_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + args = { + @Argument(name = "con", type = TypeKind.OBJECT, structType = ArtemisConstants.CONNECTION_OBJ), + @Argument(name = "config", type = TypeKind.RECORD, structType = "SessionConfiguration") + } +) +public class CreateSession implements NativeCallableUnit { + private static final Logger logger = LoggerFactory.getLogger(CreateSession.class); + + @Override + public void execute(Context context, CallableUnitCallback callableUnitCallback) { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap sessionObj = (BMap) context.getRefArgument(0); + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap connection = (BMap) context.getRefArgument(1); + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap config = (BMap) context.getRefArgument(2); + + ServerLocator serverLocator = (ServerLocator) connection.getNativeData( + ArtemisConstants.ARTEMIS_CONNECTION_POOL); + ClientSessionFactory sessionFactory = + (ClientSessionFactory) connection.getNativeData(ArtemisConstants.ARTEMIS_SESSION_FACTORY); + try { + String username = null; + String password = null; + BValue userValue = config.get(ArtemisConstants.USERNAME); + if (userValue instanceof BString) { + username = userValue.stringValue(); + } + BValue passValue = config.get(ArtemisConstants.PASSWORD); + if (passValue instanceof BString) { + password = passValue.stringValue(); + } + ClientSession session = sessionFactory.createSession(username, password, false, true, true, + serverLocator.isPreAcknowledge(), + serverLocator.getAckBatchSize()); + sessionObj.addNativeData(ArtemisConstants.ARTEMIS_SESSION, session); + } catch (ActiveMQException e) { + ArtemisUtils.throwBallerinaException("Error occurred while starting session", context, e, logger); + } + } + + @Override + public boolean isBlocking() { + return true; + } +} diff --git a/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/session/IsClosed.java b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/session/IsClosed.java new file mode 100644 index 000000000000..c77f958725d7 --- /dev/null +++ b/stdlib/messaging/activemq-artemis/src/main/java/org/ballerinalang/messaging/artemis/externimpl/session/IsClosed.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.ballerinalang.messaging.artemis.externimpl.session; + +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.ballerinalang.bre.Context; +import org.ballerinalang.bre.bvm.CallableUnitCallback; +import org.ballerinalang.messaging.artemis.ArtemisConstants; +import org.ballerinalang.model.NativeCallableUnit; +import org.ballerinalang.model.types.TypeKind; +import org.ballerinalang.model.values.BBoolean; +import org.ballerinalang.model.values.BMap; +import org.ballerinalang.model.values.BValue; +import org.ballerinalang.natives.annotations.BallerinaFunction; +import org.ballerinalang.natives.annotations.Receiver; + +/** + * Extern function to check Artemis session closure. + * + * @since 0.995 + */ + +@BallerinaFunction( + orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS, + functionName = "isClosed", + receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.SESSION_OBJ, + structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS), + isPublic = true +) +public class IsClosed implements NativeCallableUnit { + + @Override + public void execute(Context context, CallableUnitCallback callableUnitCallback) { + @SuppressWarnings(ArtemisConstants.UNCHECKED) + BMap sessionObj = (BMap) context.getRefArgument(0); + ClientSession session = + (ClientSession) sessionObj.getNativeData(ArtemisConstants.ARTEMIS_SESSION); + context.setReturnValues(new BBoolean(session.isClosed())); + } + + @Override + public boolean isBlocking() { + return true; + } +} diff --git a/tests/ballerina-integration-test/build.gradle b/tests/ballerina-integration-test/build.gradle index 11c9c4c6f1dc..3cf3bb23638f 100644 --- a/tests/ballerina-integration-test/build.gradle +++ b/tests/ballerina-integration-test/build.gradle @@ -1,6 +1,7 @@ apply from: "$rootDir/gradle/javaProject.gradle" dependencies { + testCompile 'org.apache.activemq:artemis-server:2.6.3' implementation project(':ballerina-integration-test-utils') implementation project(':ballerina-config') implementation project(':ballerina-core') @@ -21,14 +22,9 @@ dependencies { implementation project(':ballerina-grpc') implementation project(':protobuf-ballerina') implementation 'com.google.protobuf:protobuf-java:3.5.1' - implementation project(':ballerina-http') - implementation project(':ballerina-h2') - implementation project(':ballerina-mysql') - implementation project(':ballerina-sql') - implementation project(':ballerina-transactions') + implementation project(':ballerina-activemq-artemis') implementation project(':ballerina-websub') implementation project(':ballerina-jms') - implementation project(':ballerina-grpc') implementation project(':ballerina-socket') implementation project(':ballerina-observability') implementation project(':observability-test-utils') diff --git a/tests/ballerina-integration-test/pom.xml b/tests/ballerina-integration-test/pom.xml index 0045a81dd62a..edb554214c8c 100644 --- a/tests/ballerina-integration-test/pom.xml +++ b/tests/ballerina-integration-test/pom.xml @@ -13,6 +13,12 @@ Ballerina - Integration Test + + org.apache.activemq + artemis-server + ${artemis.version} + test + org.ballerinalang ballerina-test-utils @@ -58,6 +64,10 @@ org.ballerinalang ballerina-http + + org.ballerinalang + ballerina-activemq-artemis + org.ballerinalang ballerina-transactions @@ -153,6 +163,11 @@ observability-test-utils test + + org.apache.activemq + artemis-core-client + test + @@ -232,6 +247,12 @@ apacheds-all test + + org.ballerinalang + ballerina-activemq-artemis + zip + ballerina-binary-repo + diff --git a/tests/ballerina-integration-test/src/test/java/org/ballerinalang/test/messaging/artemis/ArtemisTestCommons.java b/tests/ballerina-integration-test/src/test/java/org/ballerinalang/test/messaging/artemis/ArtemisTestCommons.java new file mode 100644 index 000000000000..c95470929d92 --- /dev/null +++ b/tests/ballerina-integration-test/src/test/java/org/ballerinalang/test/messaging/artemis/ArtemisTestCommons.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.ballerinalang.test.messaging.artemis; + +import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ; +import org.ballerinalang.test.BaseTest; +import org.ballerinalang.test.context.BServerInstance; +import org.ballerinalang.test.context.BallerinaTestException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterGroups; +import org.testng.annotations.BeforeGroups; + +import java.nio.file.Path; +import java.nio.file.Paths; + +/** + * Includes common functionality for Artemis test cases. + */ +public class ArtemisTestCommons extends BaseTest { + private static final Logger log = LoggerFactory.getLogger(ArtemisTestCommons.class); + + protected static final int TIMEOUT_IN_SECS = 10; + + private EmbeddedActiveMQ embeddedBroker; + + protected static BServerInstance serverInstance; + + @BeforeGroups(value = "artemis-test", alwaysRun = true) + public void start() throws BallerinaTestException { + Path path = Paths.get("src", "test", "resources", "messaging", "artemis"); + + // Start broker + embeddedBroker = new EmbeddedActiveMQ(); + String brokerXML = path.resolve("configfiles").resolve("broker.xml").toUri().toString(); + embeddedBroker.setConfigResourcePath(brokerXML); + try { + embeddedBroker.start(); + } catch (Exception ex) { + log.error("Cannot start ActiveMQ Artemis broker " + ex.getMessage(), ex); + } + + // Start Ballerina server + serverInstance = new BServerInstance(balServer); + serverInstance.startServer(path.toAbsolutePath().toString(), "consumers"); + } + + @AfterGroups(value = "artemis-test", alwaysRun = true) + public void stop() throws Exception { + serverInstance.removeAllLeechers(); + serverInstance.shutdownServer(); + } + +} diff --git a/tests/ballerina-integration-test/src/test/java/org/ballerinalang/test/messaging/artemis/MessagePayloadTest.java b/tests/ballerina-integration-test/src/test/java/org/ballerinalang/test/messaging/artemis/MessagePayloadTest.java new file mode 100644 index 000000000000..2713c88833c4 --- /dev/null +++ b/tests/ballerina-integration-test/src/test/java/org/ballerinalang/test/messaging/artemis/MessagePayloadTest.java @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.ballerinalang.test.messaging.artemis; + +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.ballerinalang.launcher.util.BCompileUtil; +import org.ballerinalang.launcher.util.BRunUtil; +import org.ballerinalang.launcher.util.CompileResult; +import org.ballerinalang.model.values.BMap; +import org.ballerinalang.model.values.BValue; +import org.ballerinalang.model.values.BValueArray; +import org.ballerinalang.test.context.BallerinaTestException; +import org.ballerinalang.test.context.LogLeecher; +import org.ballerinalang.test.util.TestUtils; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.nio.file.Paths; + +/** + * Includes tests for different payload types for ANYCAST and MULTICAST queues. + */ +@Test(groups = {"artemis-test"}) +public class MessagePayloadTest extends ArtemisTestCommons { + private CompileResult anyCastResult; + private CompileResult multiCastResult; + private static final String MULTICAST_MSG = " multicast "; + private Path sourcePath; + + @BeforeClass + public void setup() throws URISyntaxException { + TestUtils.prepareBalo(this); + sourcePath = Paths.get("src", "test", "resources", "messaging", "artemis", "producers"); + anyCastResult = BCompileUtil.compile(sourcePath.resolve("anycast_message.bal").toAbsolutePath().toString()); + multiCastResult = BCompileUtil.compile(sourcePath.resolve("multicast_message.bal").toAbsolutePath().toString()); + } + + @Test(description = "Tests the sending of a string message to a queue") + public void testSendString() { + String errorLog = "string message Hello World"; + String functionName = "testSendString"; + testSend(anyCastResult, errorLog, functionName); + testSend(multiCastResult, errorLog + MULTICAST_MSG, functionName); + + } + + @Test(description = "Tests the sending of a byte[] message to a queue") + public void testSendByteArray() { + String errorLog = "byte[] message [1, 2, 2, 3, 3, 2]"; + String functionName = "testSendByteArray"; + testSend(anyCastResult, errorLog, functionName); + testSend(multiCastResult, errorLog + MULTICAST_MSG, functionName); + } + + @Test(description = "Tests the sending of a map message to a queue") + public void testSendMapString() { + String errorLog = "map message {\"name\":\"Riyafa\", \"hello\":\"world\"}"; + String functionName = "testSendMapString"; + testSend(anyCastResult, errorLog, functionName); + testSend(multiCastResult, errorLog + MULTICAST_MSG, functionName); + } + + @Test(description = "Tests the sending of a map message to a queue") + public void testSendMapInt() { + String errorLog = "map message {\"num\":1, \"num2\":2}"; + String functionName = "testSendMapInt"; + testSend(anyCastResult, errorLog, functionName); + testSend(multiCastResult, errorLog + MULTICAST_MSG, functionName); + } + + + @Test(description = "Tests the sending of a map message to a queue") + public void testSendMapFloat() { + String errorLog = "map message {\"numf1\":1.1, \"numf2\":1.2}"; + String functionName = "testSendMapFloat"; + testSend(anyCastResult, errorLog, functionName); + testSend(multiCastResult, errorLog + MULTICAST_MSG, functionName); + } + + @Test(description = "Tests the sending of a map message to a queue") + public void testSendMapByte() { + String errorLog = "map message {\"byte1\":1, \"byte2\":7}"; + String functionName = "testSendMapByte"; + testSend(anyCastResult, errorLog, functionName); + testSend(multiCastResult, errorLog + MULTICAST_MSG, functionName); + } + + @Test(description = "Tests the sending of a map message to a queue") + public void testSendMapBoolean() { + String errorLog = "map message {\"first\":true, \"second\":false}"; + String functionName = "testSendMapBoolean"; + testSend(anyCastResult, errorLog, functionName); + testSend(multiCastResult, errorLog + MULTICAST_MSG, functionName); + } + + @Test(description = "Tests the sending of a map message to a queue") + public void testSendMapByteArray() { + String errorLog = "map message {\"array2\":[5], \"array1\":[1, 2, 3]}"; + String functionName = "testSendMapByteArray"; + testSend(anyCastResult, errorLog, functionName); + testSend(multiCastResult, errorLog + MULTICAST_MSG, functionName); + } + + @Test(description = "Tests the sending of different types of properties with a message to a queue") + public void testSendProperties() { + String errorLog = + "string property Hello again!, int property 123, float property 1.111, boolean property true, byte " + + "property 1, byteArray property [1, 1, 0, 0] message Properties' test"; + String functionName = "testSendProperties"; + testSend(anyCastResult, errorLog, functionName); + testSend(multiCastResult, errorLog + MULTICAST_MSG, functionName); + } + + @Test(description = "Tests the closing of the producer, session and connection") + public void testClose() { + String functionName = "testClose"; + CompileResult result = BCompileUtil.compile(sourcePath.resolve("close.bal").toAbsolutePath().toString()); + BValue[] val = ((BValueArray) BRunUtil.invokeFunction(result, functionName)[0]).getValues(); + Assert.assertTrue(((ClientProducer) ((BMap) val[0]).getNativeData("artemis-producer")).isClosed()); + Assert.assertTrue(((ClientSession) ((BMap) val[1]).getNativeData("artemis-session")).isClosed()); + Assert.assertTrue(((ServerLocator) ((BMap) val[2]).getNativeData("artemis-connection-pool")).isClosed()); + } + + private void testSend(CompileResult result, String expectedErrorLog, String functionName) { + LogLeecher logLeecher = new LogLeecher(expectedErrorLog); + serverInstance.addLogLeecher(logLeecher); + BRunUtil.invoke(result, functionName); + try { + logLeecher.waitForText(TIMEOUT_IN_SECS * 1000); + } catch (BallerinaTestException e) { + Assert.fail(); + } + } +} diff --git a/tests/ballerina-integration-test/src/test/java/org/ballerinalang/test/messaging/artemis/SimpleConsumerTest.java b/tests/ballerina-integration-test/src/test/java/org/ballerinalang/test/messaging/artemis/SimpleConsumerTest.java new file mode 100644 index 000000000000..2c6864b115db --- /dev/null +++ b/tests/ballerina-integration-test/src/test/java/org/ballerinalang/test/messaging/artemis/SimpleConsumerTest.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.ballerinalang.test.messaging.artemis; + +import org.ballerinalang.launcher.util.BCompileUtil; +import org.ballerinalang.launcher.util.BRunUtil; +import org.ballerinalang.launcher.util.CompileResult; +import org.ballerinalang.test.context.BallerinaTestException; +import org.ballerinalang.test.context.LogLeecher; +import org.ballerinalang.test.util.TestUtils; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.nio.file.Paths; + +/** + * Includes tests for a simple consumer and producer. + */ +@Test(groups = {"artemis-test"}) +public class SimpleConsumerTest extends ArtemisTestCommons { + private CompileResult result; + + @BeforeClass + public void setup() throws URISyntaxException { + TestUtils.prepareBalo(this); + Path sourcePath = Paths.get("src", "test", "resources", "messaging", "artemis", "producers"); + result = BCompileUtil.compile(sourcePath.resolve("simple_producer.bal").toAbsolutePath().toString()); + } + + @Test(description = "Tests the sending of a string message to a queue") + public void testSimpleSend() { + String errorLog = "received: Hello World"; + String functionName = "testSimpleSend"; + testSend(result, errorLog, functionName); + + } + + private void testSend(CompileResult result, String expectedErrorLog, String functionName) { + LogLeecher logLeecher = new LogLeecher(expectedErrorLog); + serverInstance.addLogLeecher(logLeecher); + BRunUtil.invoke(result, functionName); + try { + logLeecher.waitForText(TIMEOUT_IN_SECS * 1000); + } catch (BallerinaTestException e) { + Assert.fail(); + } + } +} diff --git a/tests/ballerina-integration-test/src/test/resources/messaging/artemis/.ballerina/.gitignore b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/.ballerina/.gitignore new file mode 100644 index 000000000000..d6b7ef32c847 --- /dev/null +++ b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/.ballerina/.gitignore @@ -0,0 +1,2 @@ +* +!.gitignore diff --git a/tests/ballerina-integration-test/src/test/resources/messaging/artemis/.gitignore b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/.gitignore new file mode 100644 index 000000000000..2f7896d1d136 --- /dev/null +++ b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/.gitignore @@ -0,0 +1 @@ +target/ diff --git a/tests/ballerina-integration-test/src/test/resources/messaging/artemis/Ballerina.toml b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/Ballerina.toml new file mode 100644 index 000000000000..12311c42d65d --- /dev/null +++ b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/Ballerina.toml @@ -0,0 +1,4 @@ +[project] +org-name = "ballerina-test" +version = "0.0.1" + diff --git a/tests/ballerina-integration-test/src/test/resources/messaging/artemis/configfiles/broker.xml b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/configfiles/broker.xml new file mode 100644 index 000000000000..c9434dc9bfa5 --- /dev/null +++ b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/configfiles/broker.xml @@ -0,0 +1,202 @@ + + + + + + + + false + 0.0.0.0 + + + true + + + ASYNCIO + + data/paging + + data/bindings + + data/journal + + data/large-messages + + true + + 2 + + 10 + + 10M + + + 24000 + + + + 4096 + + + + + + + + + + + + + + + + + + + + + 5000 + + + 90 + + + true + + 120000 + + 60000 + + HALT + + + + + + + + + + + + + tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300 + + + + + + + + + + + + + + + + + + + + + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + + + + DLQ + ExpiryQueue + 0 + + -1 + 10 + PAGE + true + true + + + + +
+ + + +
+
+ + + +
+
+ +
+
diff --git a/tests/ballerina-integration-test/src/test/resources/messaging/artemis/consumers/anycast_message.bal b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/consumers/anycast_message.bal new file mode 100644 index 000000000000..e5c00f448643 --- /dev/null +++ b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/consumers/anycast_message.bal @@ -0,0 +1,90 @@ +// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/artemis; +import ballerina/io; + +artemis:Connection con = new("tcp://localhost:61616"); +artemis:Session session = new(con); +listener artemis:Listener artemisListener = new(session); + +@artemis:ServiceConfig { + queueConfig: { + queueName: "anycast_queue" + } +} +service anyCastConsumer on artemisListener { + resource function onMessage(artemis:Message message) returns error? { + var payload = message.getPayload(); + if (payload is byte[]) { + io:print("byte[] "); + } else if (payload is map) { + io:print("map "); + } else if (payload is map) { + io:print("map "); + } else if (payload is map) { + io:print("map "); + } else if (payload is map) { + io:print("map "); + } else if (payload is map) { + io:print("map "); + } else if (payload is map) { + io:print("map "); + } else if (payload is string) { + if (payload == "Properties' test") { + printProperty(message); + } else { + io:print("string "); + } + } + io:print("message "); + io:println(payload); + } +} + +function printProperty(artemis:Message message) { + var stringProperty = message.getProperty("string"); + if (stringProperty is string) { + io:print("string property "); + io:print(stringProperty); + } + var intProperty = message.getProperty("int"); + if (intProperty is int) { + io:print(", int property "); + io:print(intProperty); + } + var floatProperty = message.getProperty("float"); + if (floatProperty is float) { + io:print(", float property "); + io:print(floatProperty); + } + var booleanProperty = message.getProperty("boolean"); + if (booleanProperty is boolean) { + io:print(", boolean property "); + io:print(booleanProperty); + } + var byteProperty = message.getProperty("byte"); + if (byteProperty is byte) { + io:print(", byte property "); + io:print(byteProperty); + } + var byteArrayProperty = message.getProperty("byteArray"); + if (byteArrayProperty is byte[]) { + io:print(", byteArray property "); + io:print(byteArrayProperty); + } + io:print(" "); +} \ No newline at end of file diff --git a/tests/ballerina-integration-test/src/test/resources/messaging/artemis/consumers/multicast_message.bal b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/consumers/multicast_message.bal new file mode 100644 index 000000000000..c92c3abc5731 --- /dev/null +++ b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/consumers/multicast_message.bal @@ -0,0 +1,54 @@ +// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import ballerina/io; +@artemis:ServiceConfig { + queueConfig: { + queueName: "multicast_queue", + addressName: "multicast_address", + routingType: artemis:MULTICAST + } +} +service multiCastConsumer on artemisListener { + resource function onMessage(artemis:Message message) returns error? { + var payload = message.getPayload(); + if (payload is byte[]) { + io:print("byte[] "); + } else if (payload is map) { + io:print("map "); + } else if (payload is map) { + io:print("map "); + } else if (payload is map) { + io:print("map "); + } else if (payload is map) { + io:print("map "); + } else if (payload is map) { + io:print("map "); + } else if (payload is map) { + io:print("map "); + } else if (payload is string) { + if (payload == "Properties' test") { + printProperty(message); + } else { + io:print("string "); + } + } + io:print("message "); + io:print(payload); + io:println(" multicast "); + } +} diff --git a/tests/ballerina-integration-test/src/test/resources/messaging/artemis/consumers/simple_consumer.bal b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/consumers/simple_consumer.bal new file mode 100644 index 000000000000..edf40cf67cb8 --- /dev/null +++ b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/consumers/simple_consumer.bal @@ -0,0 +1,32 @@ +// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/artemis; +import ballerina/io; + +@artemis:ServiceConfig { + queueConfig: { + queueName: "simple_queue" + } +} +service simpleConsumer on new artemis:Listener({host: "localhost", port: 61616}) { + resource function onMessage(artemis:Message message) returns error? { + var payload = message.getPayload(); + if (payload is string) { + io:println("received: " + payload); + } + } +} diff --git a/tests/ballerina-integration-test/src/test/resources/messaging/artemis/producers/anycast_message.bal b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/producers/anycast_message.bal new file mode 100644 index 000000000000..dff8fb43cd73 --- /dev/null +++ b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/producers/anycast_message.bal @@ -0,0 +1,94 @@ +// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/artemis; + +artemis:Connection con = new("tcp://localhost:61616"); +artemis:Session session = new(con); +artemis:Producer prod = new(session, "anycast_queue", addressConfig = {autoCreated:false}); + +public function testSendString() { + var err = prod->send("Hello World"); +} + +public function testSendByteArray() { + byte[6] msg = [1, 2, 2, 3, 3, 2]; + var err = prod->send(msg); +} + +public function testSendMapString() { + map msg = { + "name": "Riyafa", + "hello": "world" + }; + var err = prod->send(msg); +} + +public function testSendMapInt() { + map msg = { + "num": 1, + "num2": 2 + }; + var err = prod->send(msg); +} + +public function testSendMapFloat() { + map msg = { + "numf1": 1.1, + "numf2": 1.2 + }; + var err = prod->send(msg); +} + +public function testSendMapByte() { + map msg = { + "byte1": 1, + "byte2": 7 + }; + var err = prod->send(msg); +} + +public function testSendMapBoolean() { + map msg = { + "first": true, + "second": false + }; + var err = prod->send(msg); +} + +public function testSendMapByteArray() { + byte[3] array1 = [1, 2, 3]; + byte[1] array2 = [5]; + map msg = { + "array1": array1, + "array2": array2 + }; + var err = prod->send(msg); +} + +public function testSendProperties() { + artemis:Message msg = new(session, "Properties' test"); + msg.putProperty("string", "Hello again!"); + msg.putProperty("int", 123); + msg.putProperty("float", 1.111); + msg.putProperty("boolean", true); + byte byteVal = 1; + msg.putProperty("byte", byteVal); + byte[4] arr = [1,1,0,0]; + msg.putProperty("byteArray", arr); + + var err = prod->send(msg); +} diff --git a/tests/ballerina-integration-test/src/test/resources/messaging/artemis/producers/close.bal b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/producers/close.bal new file mode 100644 index 000000000000..e4e757781c12 --- /dev/null +++ b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/producers/close.bal @@ -0,0 +1,29 @@ +// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/artemis; +import ballerina/io; + +artemis:Connection con = new("tcp://localhost:61616"); +artemis:Session session = new(con); +artemis:Producer prod = new(session, "close_address"); + +public function testClose() returns (artemis:Producer, artemis:Session, artemis:Connection) { + var err = prod->close(); + err = session->close(); + err = con->close(); + return (prod, session, con); +} diff --git a/tests/ballerina-integration-test/src/test/resources/messaging/artemis/producers/multicast_message.bal b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/producers/multicast_message.bal new file mode 100644 index 000000000000..1e9af3c4b947 --- /dev/null +++ b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/producers/multicast_message.bal @@ -0,0 +1,94 @@ +// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/artemis; + +artemis:Connection con = new("tcp://localhost:61616"); +artemis:Session session = new(con); +artemis:Producer prod = new(session, "multicast_address", addressConfig = {autoCreated:false}); + +public function testSendString() { + var err = prod->send("Hello World"); +} + +public function testSendByteArray() { + byte[6] msg = [1, 2, 2, 3, 3, 2]; + var err = prod->send(msg); +} + +public function testSendMapString() { + map msg = { + "name": "Riyafa", + "hello": "world" + }; + var err = prod->send(msg); +} + +public function testSendMapInt() { + map msg = { + "num": 1, + "num2": 2 + }; + var err = prod->send(msg); +} + +public function testSendMapFloat() { + map msg = { + "numf1": 1.1, + "numf2": 1.2 + }; + var err = prod->send(msg); +} + +public function testSendMapByte() { + map msg = { + "byte1": 1, + "byte2": 7 + }; + var err = prod->send(msg); +} + +public function testSendMapBoolean() { + map msg = { + "first": true, + "second": false + }; + var err = prod->send(msg); +} + +public function testSendMapByteArray() { + byte[3] array1 = [1, 2, 3]; + byte[1] array2 = [5]; + map msg = { + "array1": array1, + "array2": array2 + }; + var err = prod->send(msg); +} + +public function testSendProperties() { + artemis:Message msg = new(session, "Properties' test"); + msg.putProperty("string", "Hello again!"); + msg.putProperty("int", 123); + msg.putProperty("float", 1.111); + msg.putProperty("boolean", true); + byte byteVal = 1; + msg.putProperty("byte", byteVal); + byte[4] arr = [1,1,0,0]; + msg.putProperty("byteArray", arr); + + var err = prod->send(msg); +} diff --git a/tests/ballerina-integration-test/src/test/resources/messaging/artemis/producers/simple_producer.bal b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/producers/simple_producer.bal new file mode 100644 index 000000000000..ac4930585012 --- /dev/null +++ b/tests/ballerina-integration-test/src/test/resources/messaging/artemis/producers/simple_producer.bal @@ -0,0 +1,22 @@ +// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. +// +// WSO2 Inc. licenses this file to you under the Apache License, +// Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import ballerina/artemis; + +public function testSimpleSend() { + artemis:Producer prod = new({host: "localhost", port: 61616}, "simple_queue", addressConfig = {autoCreated:false}); + var err = prod->send("Hello World"); +} diff --git a/tests/ballerina-integration-test/src/test/resources/testng.xml b/tests/ballerina-integration-test/src/test/resources/testng.xml index 277c24107ac3..55c32a857be0 100644 --- a/tests/ballerina-integration-test/src/test/resources/testng.xml +++ b/tests/ballerina-integration-test/src/test/resources/testng.xml @@ -270,4 +270,12 @@ + + + + + + + +