Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement ActiveMQ Artemis Connector
Browse files Browse the repository at this point in the history
riyafa committed Mar 15, 2019
1 parent c29da09 commit a9b82cc
Showing 62 changed files with 3,817 additions and 10 deletions.
6 changes: 6 additions & 0 deletions distribution/zip/ballerina-tools/pom.xml
Original file line number Diff line number Diff line change
@@ -116,6 +116,12 @@
<type>zip</type>
<classifier>ballerina-sources</classifier>
</dependency>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-activemq-artemis</artifactId>
<type>zip</type>
<classifier>ballerina-sources</classifier>
</dependency>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-log-api</artifactId>
12 changes: 12 additions & 0 deletions distribution/zip/ballerina/build.gradle
Original file line number Diff line number Diff line change
@@ -80,6 +80,15 @@ dependencies {
dist 'com.google.protobuf:protobuf-java:3.5.1'
dist 'org.wso2.orbit.org.yaml:snakeyaml:1.16.0.wso2v1'
dist 'org.wso2.staxon:staxon-core:1.2.0.wso2v2'
dist 'com.jcraft:jzlib:1.1.3'
dist 'org.apache.activemq:artemis-core-client:2.6.3'
dist 'org.apache.activemq:artemis-commons:2.6.3'
dist 'commons-beanutils:commons-beanutils:1.9.3'
dist 'org.jboss.logging:jboss-logging:3.3.1.Final'
dist 'commons-collections:commons-collections:3.2.2'
dist 'org.apache.geronimo.specs:geronimo-json_1.0_spec:1.0-alpha-1'
dist 'io.netty:netty-transport-native-epoll:4.1.34.Final'
dist 'io.netty:netty-transport-native-kqueue:4.1.34.Final'


distBal project(path: ':ballerina-auth', configuration: 'baloImplementation')
@@ -110,6 +119,7 @@ dependencies {
distBal project(path: ':ballerina-time', configuration: 'baloImplementation')
distBal project(path: ':ballerina-transactions', configuration: 'baloImplementation')
distBal project(path: ':ballerina-websub', configuration: 'baloImplementation')
distBal project(path: ':ballerina-activemq-artemis', configuration: 'baloImplementation')

balSource project(path: ':ballerina-auth', configuration: 'balSource')
balSource project(path: ':ballerina-builtin', configuration: 'balSource')
@@ -139,6 +149,7 @@ dependencies {
balSource project(path: ':ballerina-time', configuration: 'balSource')
balSource project(path: ':ballerina-transactions', configuration: 'balSource')
balSource project(path: ':ballerina-websub', configuration: 'balSource')
balSource project(path: ':ballerina-activemq-artemis', configuration: 'balSource')

dist project(':ballerina-auth')
dist project(':ballerina-builtin')
@@ -181,6 +192,7 @@ dependencies {
dist project(':strip-bouncycastle')
dist project(':toml-parser')
dist project(':tracing-extensions:ballerina-jaeger-extension')
dist project(':ballerina-activemq-artemis')

}

57 changes: 53 additions & 4 deletions distribution/zip/ballerina/pom.xml
Original file line number Diff line number Diff line change
@@ -98,6 +98,10 @@
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-jms</artifactId>
</dependency>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-activemq-artemis</artifactId>
</dependency>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-log-api</artifactId>
@@ -158,6 +162,42 @@
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-privacy</artifactId>
</dependency>
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jzlib</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-core-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-commons</artifactId>
</dependency>
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
</dependency>
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-json_1.0_spec</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
</dependency>
<!-- end of stdlib jar dependencies -->

<dependency>
@@ -241,10 +281,6 @@
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jzlib</artifactId>
</dependency>

<!-- Siddhi Stream Dependencies -->
<dependency>
@@ -399,6 +435,12 @@
<type>zip</type>
<classifier>ballerina-binary-repo</classifier>
</dependency>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-activemq-artemis</artifactId>
<type>zip</type>
<classifier>ballerina-binary-repo</classifier>
</dependency>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-jms</artifactId>
@@ -594,6 +636,12 @@
<type>zip</type>
<classifier>ballerina-sources</classifier>
</dependency>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-activemq-artemis</artifactId>
<type>zip</type>
<classifier>ballerina-sources</classifier>
</dependency>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-log-api</artifactId>
@@ -760,6 +808,7 @@
ballerina-socket,
ballerina-internal,
ballerina-jms,
ballerina-activemq-artemis,
ballerina-log-api,
ballerina-math,
ballerina-mime,
12 changes: 12 additions & 0 deletions distribution/zip/ballerina/src/assembly/bin.xml
Original file line number Diff line number Diff line change
@@ -112,6 +112,7 @@
<include>org.ballerinalang:ballerina-io:jar</include>
<include>org.ballerinalang:ballerina-socket:jar</include>
<include>org.ballerinalang:ballerina-jms:jar</include>
<include>org.ballerinalang:ballerina-activemq-artemis:jar</include>
<include>org.ballerinalang:ballerina-log-api:jar</include>
<include>org.ballerinalang:ballerina-math:jar</include>
<include>org.ballerinalang:ballerina-mime:jar</include>
@@ -184,6 +185,17 @@
<include>io.netty:netty-tcnative-boringssl-static</include>
<include>com.jcraft:jzlib</include>

<!-- Artemis connector dependencies -->
<include>org.apache.activemq:artemis-core-client</include>
<include>org.apache.activemq:artemis-commons</include>
<include>commons-beanutils:commons-beanutils</include>
<include>org.apache.activemq:artemis-commons</include>
<include>org.jboss.logging:jboss-logging</include>
<include>commons-collections:commons-collections</include>
<include>org.apache.geronimo.specs:geronimo-json_1.0_spec</include>
<include>io.netty:netty-transport-native-epoll</include>
<include>io.netty:netty-transport-native-kqueue</include>

<!-- Observability Dependencies -->
<include>org.ballerinalang:ballerina-jaeger-extension:jar</include>
<include>io.opentracing:opentracing-api</include>
1 change: 1 addition & 0 deletions language-server/modules/langserver-compiler/build.gradle
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@ dependencies {
implementation project(':ballerina-websub')
implementation project(':ballerina-jms')
implementation project(':ballerina-grpc')
implementation project(':ballerina-activemq-artemis')
testCompile 'org.testng:testng:6.13.1'
}

6 changes: 6 additions & 0 deletions language-server/modules/langserver-compiler/pom.xml
Original file line number Diff line number Diff line change
@@ -94,6 +94,12 @@
<type>zip</type>
<classifier>ballerina-binary-repo</classifier>
</dependency>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-activemq-artemis</artifactId>
<type>zip</type>
<classifier>ballerina-binary-repo</classifier>
</dependency>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-grpc</artifactId>
1 change: 1 addition & 0 deletions language-server/modules/langserver-core/build.gradle
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@ dependencies {
implementation project(':ballerina-jms')
implementation project(':ballerina-grpc')
implementation project(':testerina:testerina-core')
implementation project(':ballerina-activemq-artemis')
implementation 'com.fasterxml.jackson.core:jackson-databind:2.9.1'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.9.1'
implementation 'io.netty:netty-buffer:4.1.19.Final'
6 changes: 6 additions & 0 deletions language-server/modules/langserver-core/pom.xml
Original file line number Diff line number Diff line change
@@ -102,6 +102,12 @@
<type>zip</type>
<classifier>ballerina-binary-repo</classifier>
</dependency>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-activemq-artemis</artifactId>
<type>zip</type>
<classifier>ballerina-binary-repo</classifier>
</dependency>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-grpc</artifactId>
72 changes: 72 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -531,6 +531,27 @@
<classifier>ballerina-binary-repo</classifier>
</dependency>

<!--ballerina-activemq-artemis-->
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-activemq-artemis</artifactId>
<version>${ballerina.version}</version>
</dependency>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-activemq-artemis</artifactId>
<version>${ballerina.version}</version>
<type>zip</type>
<classifier>ballerina-sources</classifier>
</dependency>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-activemq-artemis</artifactId>
<version>${ballerina.version}</version>
<type>zip</type>
<classifier>ballerina-binary-repo</classifier>
</dependency>

<!-- ballerina-log-api -->
<dependency>
<groupId>org.ballerinalang</groupId>
@@ -1557,6 +1578,48 @@
<artifactId>bcpkix-jdk15on</artifactId>
<version>${bouncycastle.version}</version>
</dependency>

<!--Artemis dependencies -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-core-client</artifactId>
<version>${artemis.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-commons</artifactId>
<version>${artemis.version}</version>
</dependency>
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>${beanutils.version}</version>
</dependency>
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
<version>${jboss.version}</version>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>${commons-collections.version}</version>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-json_1.0_spec</artifactId>
<version>${geronimo-json.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<version>${netty.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

@@ -1679,6 +1742,7 @@
<module>stdlib/database/sql</module>
<module>stdlib/streams</module>
<module>stdlib/privacy</module>
<module>stdlib/messaging/activemq-artemis</module>

<module>misc/lib-creator</module>

@@ -1767,6 +1831,7 @@
<module>stdlib/database/sql</module>
<module>stdlib/streams</module>
<module>stdlib/privacy</module>
<module>stdlib/messaging/activemq-artemis</module>

<module>misc/lib-creator</module>

@@ -2349,6 +2414,13 @@
<andes.client.version>3.2.87</andes.client.version>
<wso2.securevault.version>1.0.0-wso2v2</wso2.securevault.version>

<!-- Artemis connector -->
<artemis.version>2.6.3</artemis.version>
<beanutils.version>1.9.3</beanutils.version>
<jboss.version>3.3.1.Final</jboss.version>
<commons-collections.version>3.2.2</commons-collections.version>
<geronimo-json.version>1.0-alpha-1</geronimo-json.version>

<file.transport.version>6.0.55</file.transport.version>
<chewiebug.gcviewer.version>1.35</chewiebug.gcviewer.version>

2 changes: 2 additions & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
@@ -83,6 +83,7 @@ include(':ballerina-tools-integration-test')
include(':examples-test')
include(':plugin-vscode')
include(':benchmarks')
include(':ballerina-activemq-artemis')
include(':build-config:checkstyle')
project(':ballerina-lang').projectDir = file('compiler/ballerina-lang')
project(':ballerina-utils').projectDir = file('stdlib/utils')
@@ -164,6 +165,7 @@ project(':ballerina-integration-test').projectDir = file('tests/ballerina-integr
project(':ballerina-tools-integration-test').projectDir = file('tests/ballerina-tools-integration-test')
project(':examples-test').projectDir = file('tests/ballerina-examples-test')
project(':plugin-vscode').projectDir = file('tool-plugins/vscode')
project(':ballerina-activemq-artemis').projectDir = file('stdlib/messaging/activemq-artemis')

buildCache {
remote(HttpBuildCache) {
35 changes: 35 additions & 0 deletions stdlib/messaging/activemq-artemis/assembly/balo.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<!--
~ /*
~ * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
~ *
~ * Licensed under the Apache License, Version 2.0 (the "License");
~ * you may not use this file except in compliance with the License.
~ * You may obtain a copy of the License at
~ *
~ * http://www.apache.org/licenses/LICENSE-2.0
~ *
~ * Unless required by applicable law or agreed to in writing, software
~ * distributed under the License is distributed on an "AS IS" BASIS,
~ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ * See the License for the specific language governing permissions and
~ * limitations under the License.
~ */
-->
<assembly>
<includeBaseDirectory>true</includeBaseDirectory>
<baseDirectory>/</baseDirectory>
<id>ballerina-binary-repo</id>
<formats>
<format>zip</format>
</formats>

<fileSets>
<fileSet>
<directory>${project.build.directory}/generated-balo</directory>
<outputDirectory>/</outputDirectory>
<includes>
<include>**</include>
</includes>
</fileSet>
</fileSets>
</assembly>
35 changes: 35 additions & 0 deletions stdlib/messaging/activemq-artemis/assembly/source.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<!--
~ /*
~ * Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
~ *
~ * Licensed under the Apache License, Version 2.0 (the "License");
~ * you may not use this file except in compliance with the License.
~ * You may obtain a copy of the License at
~ *
~ * http://www.apache.org/licenses/LICENSE-2.0
~ *
~ * Unless required by applicable law or agreed to in writing, software
~ * distributed under the License is distributed on an "AS IS" BASIS,
~ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ * See the License for the specific language governing permissions and
~ * limitations under the License.
~ */
-->
<assembly>
<includeBaseDirectory>true</includeBaseDirectory>
<baseDirectory>ballerina</baseDirectory>
<id>ballerina-sources</id>
<formats>
<format>zip</format>
</formats>

<fileSets>
<fileSet>
<directory>src/main/ballerina</directory>
<outputDirectory>/</outputDirectory>
<includes>
<include>**</include>
</includes>
</fileSet>
</fileSets>
</assembly>
27 changes: 27 additions & 0 deletions stdlib/messaging/activemq-artemis/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
apply from: "$rootDir/gradle/balNativeLibProject.gradle"

dependencies {
implementation project(':ballerina-core')
implementation project(':ballerina-io')
implementation project(':ballerina-lang')
implementation project(':ballerina-builtin')
implementation project(':ballerina-utils')
implementation 'org.apache.activemq:artemis-core-client:2.6.3'

baloImplementation project(path: ':ballerina-builtin', configuration: 'baloImplementation')
baloImplementation project(path: ':ballerina-io', configuration: 'baloImplementation')
baloImplementation project(path: ':ballerina-utils', configuration: 'baloImplementation')
}

description = 'Ballerina - ActiveMQ Artemis'

test {
doFirst {
copy {
from "$buildDir/generated-balo/repo/ballerina"
into "$buildDir/lib/repo/ballerina"
}
}
systemProperty "java.util.logging.config.file", "$buildDir/logging.properties"
systemProperty "java.util.logging.manager", "org.ballerinalang.logging.BLogManager"
}
246 changes: 246 additions & 0 deletions stdlib/messaging/activemq-artemis/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
<!--
~ Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
~
~ WSO2 Inc. licenses this file to you under the Apache License,
~ Version 2.0 (the "License"); you may not use this file except
~ in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>ballerina-parent</artifactId>
<groupId>org.ballerinalang</groupId>
<version>0.990.4-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>ballerina-activemq-artemis</artifactId>
<packaging>jar</packaging>
<name>Ballerina - ActiveMQ Artemis</name>
<url>https://ballerina.io</url>

<dependencies>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-core</artifactId>
</dependency>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-io</artifactId>
</dependency>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-lang</artifactId>
</dependency>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-builtin</artifactId>
<type>zip</type>
<classifier>ballerina-binary-repo</classifier>
</dependency>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-utils</artifactId>
<type>zip</type>
<classifier>ballerina-binary-repo</classifier>
</dependency>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-io</artifactId>
<type>zip</type>
<classifier>ballerina-binary-repo</classifier>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-core-client</artifactId>
</dependency>
</dependencies>

<build>
<resources>
<resource>
<directory>src/main/resources</directory>
<excludes>
<exclude>ballerina/**</exclude>
</excludes>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<executions>
<execution>
<id>gen-balo</id>
<goals>
<goal>java</goal>
</goals>
<phase>compile</phase>
<configuration>
<systemProperties>
<systemProperty>
<key>BALLERINA_DEV_MODE_COMPILE</key>
<value>true</value>
</systemProperty>
</systemProperties>
<arguments>
<argument>false</argument>
<argument>${basedir}/src/main/ballerina/</argument>
<argument>${project.build.directory}/generated-balo/repo/ballerina</argument>
<argument>${project.build.directory}</argument>
<argument>${project.version}</argument>
</arguments>
</configuration>
</execution>
</executions>
<configuration>
<mainClass>org.ballerinalang.stdlib.utils.GenerateBalo</mainClass>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>unpack-dependencies</id>
<phase>generate-resources</phase>
<goals>
<goal>unpack-dependencies</goal>
</goals>
<configuration>
<includeClassifiers>ballerina-binary-repo</includeClassifiers>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven.compiler.plugin.version}</version>
<configuration>
<compilerArgument>-proc:none</compilerArgument>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<id>distribution</id>
<phase>package</phase>
<goals>
<goal>attached</goal>
</goals>
<configuration>
<descriptorSourceDirectory>assembly</descriptorSourceDirectory>
</configuration>
</execution>
</executions>
</plugin>
<!-- For ballerina annotation processing -->
<plugin>
<groupId>org.bsc.maven</groupId>
<artifactId>maven-processor-plugin</artifactId>
<configuration>
<processors>
<processor>org.ballerinalang.codegen.BallerinaAnnotationProcessor</processor>
</processors>
<options>
<nativeEntityProviderPackage>org.ballerinalang.stdlib.artemis.generated.providers
</nativeEntityProviderPackage>
<nativeEntityProviderClass>StandardNativeElementProvider</nativeEntityProviderClass>
</options>
</configuration>
<executions>
<execution>
<id>process</id>
<goals>
<goal>process</goal>
</goals>
<phase>generate-sources</phase>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<executions>
<execution>
<id>prepare-it-test-agent</id>
<goals>
<goal>prepare-agent</goal>
</goals>
<configuration>
<append>true</append>
<inclNoLocationClasses>true</inclNoLocationClasses>
<excludes>
<exclude>org/wso2/ballerinalang/compiler/parser/antlr4/**</exclude>
</excludes>
<propertyName>jacoco.agent.argLine</propertyName>
<destFile>${project.build.directory}/coverage-reports/jacoco.exec</destFile>
</configuration>
</execution>
<execution>
<id>it-report</id>
<phase>verify</phase>
<goals>
<goal>report-aggregate</goal>
</goals>
<configuration>
<dataFileIncludes>
<dataFileInclude>**/coverage-reports/jacoco.exec</dataFileInclude>
</dataFileIncludes>
<excludes>
<exclude>org/wso2/ballerinalang/compiler/parser/antlr4/**</exclude>
</excludes>
<outputDirectory>${project.build.directory}/coverage-reports/site</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<!-- Since balo is needed by tests, here it is copied to target/lib where other balos
are copied to -->
<execution>
<id>copy-file-balo</id>
<phase>compile</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib/repo</outputDirectory>
<resources>
<resource>
<directory>${project.build.directory}/generated-balo/repo/</directory>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

<properties>
<maven.spotbugsplugin.exclude.file>spotbugs-exclude.xml</maven.spotbugsplugin.exclude.file>
<maven.checkstyleplugin.excludes>**/generated/**</maven.checkstyleplugin.excludes>
</properties>

</project>
19 changes: 19 additions & 0 deletions stdlib/messaging/activemq-artemis/spotbugs-exclude.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<!--
~ Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
~
~ WSO2 Inc. licenses this file to you under the Apache License,
~ Version 2.0 (the "License"); you may not use this file except
~ in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<FindBugsFilter>
</FindBugsFilter>
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[project]
org-name = "ballerina"
version = "0.0.0"
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 Inc. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import ballerina/io;

# Constant for the artemis error code.
public const ARTEMIS_ERROR_CODE = "{ballerina/artemis}ArtemisError";

# The Artemis error record.
#
# + message - the error message.
public type ArtemisError record {
string message?;
!...;
};

# The url configuration for `Producer` and `Consumer`
#
# + host - The host
# + port - The port
# + username - The username
# + password - The password
public type URLConfiguration record {
string host;
int port;
string username?;
string password?;
!...;
};

# Determines how messages are sent to the queues associated with an address.
public type RoutingType MULTICAST | ANYCAST;

# If you want your messages routed to every queue within the matching address, in a publish-subscribe manner.
public const MULTICAST = "MULTICAST";
# If you want your messages routed to a single queue within the matching address, in a point-to-point manner.
public const ANYCAST = "ANYCAST";
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 Inc. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

# Represents ActiveMQ Artemis Connection
public type Connection client object {

# Creates an ActiveMQ Artemis Connection object.
#
# + url - The connection url to the broker
# + config - The connection configuration
public function __init(string url, ConnectionConfiguration? config = ()) {
ConnectionConfiguration configuration = {};
if (config is ConnectionConfiguration) {
configuration = config;
}
self.createConnection(url, configuration);
}

extern function createConnection(string url, ConnectionConfiguration config);

# Returns true if close was already called
#
# + return - `true` if closed, `false` otherwise.
public extern function isClosed() returns boolean;

# Closes the connection and release all its resources
#
# + return - `error` if an error occurs closing the connection or nil
public remote extern function close() returns error?;
};

# Configurations related to a Artemis `Connection`
#
# + timeToLive - Connection's time-to-live. negative to disable or greater or equals to 0
# + callTimeout - The blocking calls timeout in milliseconds
# + consumerWindowSize - Window size in bytes for flow control of the consumers created through this `Connection`
# + consumerMaxRate - Maximum rate of message consumption for consumers created through this `Connection`
# + producerWindowSize - Window size for flow control of the producers created through this `Connection`
# + producerMaxRate - The maximum rate of message production for producers created through this `Connection`
# + retryInterval - The time in milliseconds to retry connection
# + retryIntervalMultiplier - Multiplier to apply to successive retry intervals
# + maxRetryInterval - The maximum retry interval (in the case a retry interval multiplier has been specified)
# + reconnectAttempts - The maximum number of attempts to retry connection in case of failure
# + initialConnectAttempts - The maximum number of attempts to establish an initial connection
public type ConnectionConfiguration record {
//Add this once working
int timeToLive = 60000;
int callTimeout = 30000;
int consumerWindowSize = 1024 * 1024;
int consumerMaxRate = -1;
int producerWindowSize = 64 * 1024;
int producerMaxRate = -1;
int retryInterval = 2000;
float retryIntervalMultiplier = 1;
int maxRetryInterval = 2000;
int reconnectAttempts = 0;
int initialConnectAttempts = 1;
!...;
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 Inc. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

# Represents ActiveMQ Artemis Listener. This is an abstraction for that includes the connection and session.
# Consumers are represented by the service attaching to this listener.
public type Listener object {
*AbstractListener;
private Session session;
private boolean anonymousSession;

public function __init(Session | URLConfiguration sesssionOrURLConfig) {
if (sesssionOrURLConfig is Session) {
self.session = sesssionOrURLConfig;
} else {
Connection connection = new("tcp://" + sesssionOrURLConfig.host + ":" + sesssionOrURLConfig.port);
self.session = new(connection, config = { username: sesssionOrURLConfig.username,
password: sesssionOrURLConfig.password });
self.anonymousSession = true;
}
}
public function __start() returns error? {
return self.start();
}
public function __stop() returns error? {
return self.stop();
}
public function __attach(service serviceType, map<any> annotationData) returns error? {
return self.createConsumer(serviceType);
}

extern function start() returns error?;
extern function createConsumer(service serviceType) returns error?;
extern function stop() returns error?;
};

# The configuration for an Artemis consumer service.
#
# + autoAck - whether to automatically acknowledge a service when a resource completes
# + queueConfig - the configuration for the queue to consume from
# + filter - only messages which match this filter will be consumed
# + browseOnly - whether the ClientConsumer will only browse the queue or consume messages
public type ArtemisServiceConfig record {
boolean autoAck = true;
QueueConfiguration queueConfig;
string? filter = ();
boolean browseOnly = false;
!...;
};

public annotation<service> ServiceConfig ArtemisServiceConfig;

# ActiveMQ Artemis Queue configuration
# If the `autoCreated` is `false` an error will be thrown if the queue does not exist.
# If `autocreated` is `true` and the queue already exists then the other configurations would be ignored.
#
# + queueName - the name of the queue
# + addressName - the address queue is bound to. If the value is `nil` and `autoCreated` is true and the
# queue does not already exist then the address would take the name of the queue.
# + autoCreated - whether to automatically create the queue
# + routingType - the routing type for the queue, MULTICAST or ANYCAST
# + temporary - whether the queue is temporary. If this value is set to true the `durable` property value shall be ignored.
# + filter - messages which match this filter will be put in the queue
# + durable - whether the queue is durable or not. If `temporary` property value is true this value
# + maxConsumers - how many concurrent consumers will be allowed on this queue
# + purgeOnNoConsumers - whether to delete the contents of the queue when the last consumer disconnects
# + exclusive - whether the queue should be exclusive
# + lastValue - whether the queue should be lastValue
public type QueueConfiguration record {
string queueName;
string? addressName = ();
boolean autoCreated = true;
RoutingType routingType = ANYCAST;
boolean temporary = true;
string? filter = ();
boolean durable = false;
int maxConsumers = -1;
boolean purgeOnNoConsumers = false;
boolean exclusive = false;
boolean lastValue = false;
!...;
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 Inc. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

# Represents ActiveMQ Artemis Message
public type Message client object {

private MessageType messageType = TEXT;
private MessageConfiguration configuration;

public function __init(Session session, io:ReadableByteChannel | int | float | byte | boolean | string |
map<string | int | float | byte | boolean | byte[]> | xml | json | byte[] data,
MessageConfiguration? config = ()) {
if (config is MessageConfiguration) {
self.configuration = config;
} else {
self.configuration = {};
}

if (data is (string | json | xml | int | float | byte | boolean)) {
self.messageType = TEXT;
}
if (data is io:ReadableByteChannel) {
self.messageType = STREAM;
self.createMessage(session, data, self.configuration);
} else if (data is byte) {
self.createMessage(session, string.convert(int.convert(data)), self.configuration);
} else if (data is map<string | int | float | byte | boolean | byte[]>) {
self.messageType = MAP;
self.createMessage(session, data, self.configuration);
} else if (data is xml) {
self.createMessage(session, string.convert(data), self.configuration);
} else if (data is json) {
self.createMessage(session, data.toString(), self.configuration);
} else if (data is byte[]) {
self.messageType = BYTES;
self.createMessage(session, data, self.configuration);
}
}

extern function createMessage(Session session, string | byte[] | map<string | int | float | byte | boolean | byte[]>
| io:ReadableByteChannel data, MessageConfiguration config);

# Acknowledges reception of this message.
#
# + return - If an error occurred while acknowledging the message
public remote extern function acknowledge() returns error?;

# Returns the size (in bytes) of this message's body.
#
# + return - the size of the message body
public extern function getBodySize() returns int | error;

# Add message property
#
# + key - The name of the property
# + value - The value of the property
# + return - If an error occures while setting the property
public extern function putProperty(string key, string | int | float | boolean | byte | byte[] value) returns error?;

# Get a message property
#
# + key - The name of the property
# + return - The value of the property or nil if not found
public extern function getProperty(string key) returns string | int | float | boolean | byte | byte[] | () | error;

# The type of the message
#
# + return - The `MessageType` of the message
public extern function getType() returns MessageType;

# The message payload
#
# + return - The message payload or error on failure to retrieve payload or if the type is unsupported.
# A map payload can contain an error if the type is unsupported.
public extern function getPayload() returns string | byte[] | map<string | int | float | byte | boolean | byte[]> | error;

# Call this function to save to a WritableByteChannel if the message is `STREAM` type
#
# + ch - The byte channel to save to
# + return - will return an `error` if the message is not of type `STREAM` or on failure
public extern function saveToWritableByteChannel(io:WritableByteChannel ch) returns error?;

# Get the message configuration
#
# + return - the `MessageConfiguration` of this message
public function getConfig() returns MessageConfiguration {
return self.configuration;
}
};

# Represents a message sent and/or received by ActiveMQ Artemis
#
# + expiration - The expiration time of this message
# + timeStamp - The message timestamp
# + priority - the message priority (between 0 and 9 inclusive)
# + durable - whether the created message is durable or not
# + routingType - `RoutingType` of the message
public type MessageConfiguration record {
int? expiration = ();
int? timeStamp = ();
byte priority = 0;
boolean durable = true;
RoutingType? routingType = ();
!...;
};

# ActiveMQ Artemis message types.
public type MessageType TEXT | BYTES | MAP | STREAM | UNSUPPORTED;

# The text message type.
public const TEXT = "TEXT";
# The bytes message type.
public const BYTES = "BYTES";
# The map message type.
public const MAP = "MAP";
# The stream message type.
public const STREAM = "STREAM";
# If the message recieved is not of the supported message type in Ballerina it will have the type as UNSUPPORTED.
public const UNSUPPORTED = "UNSUPPORTED";
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 Inc. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

# Represents ActiveMQ Artemis Producer
public type Producer client object {
private Session session;
private boolean anonymousSession = false;

public function __init(Session | URLConfiguration sesssionOrURLConfig, string addressName,
AddressConfiguration? addressConfig = (), int rate = -1) {
if (sesssionOrURLConfig is Session) {
self.session = sesssionOrURLConfig;
} else {
//Todo: url parse host and the port
Connection connection = new("tcp://" + sesssionOrURLConfig.host + ":" + sesssionOrURLConfig.port);
self.session = new(connection, config = {username: sesssionOrURLConfig.username, password: sesssionOrURLConfig.password});
self.anonymousSession = true;
}
AddressConfiguration configuration = {

};
if (addressConfig is AddressConfiguration) {
configuration = addressConfig;
}
self.createProducer(addressName, configuration, rate);
}

extern function createProducer(string addressName, AddressConfiguration addressConfig, int rate);

# Sends a message to the producer's address
#
# + data - the `Message` or data to send
# + return - `error` on failure
public remote function send(int | float | string | json | xml | byte | byte[] | map<string | int | float | byte
| boolean | byte[]> | io:ReadableByteChannel | Message data) returns error? {
Message msg;
if (data is Message) {
msg = data;
} else {
msg = new(self.session, data);
}
return self.externSend(msg);
}

# Returns whether the producer is closed or not
#
# + return - `true` if the producer is closed and `false` otherwise
public extern function isClosed() returns boolean;

# Closes the ClientProducer. If already closed nothing is done.
#
# + return - `error` on failure to close.
public remote extern function close() returns error?;

extern function externSend(Message data) returns error?;
};

# The ActiveMQ Artemis address related configuration.
# If the `autoCreated` is `false` an error will be thrown if the address does not exist.
# If `autocreated` is `true` and the address already exists then the `routingType` configuration would be ignored.
#
# + routingType - the routing type for the address, MULTICAST or ANYCAST
# + autoCreated - whether the address has to be auto created.
public type AddressConfiguration record {
RoutingType routingType = ANYCAST;
boolean autoCreated = true;
!...;
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) 2019 WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
//
// WSO2 Inc. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

# Represents ActiveMQ Artemis Session
public type Session client object {

public function __init(Connection con, SessionConfiguration? config = ()) {
SessionConfiguration configuration = {};
if (config is SessionConfiguration) {
configuration = config;
}
self.createSession(con, configuration);
}

extern function createSession(Connection con, SessionConfiguration config);

# Returns true if close was already called
#
# + return - `true` if closed, `false` otherwise.
public extern function isClosed() returns boolean;

# Closes the connection and release all its resources
#
# + return - `error` if an error occurs closing the connection or nil
public remote extern function close() returns error?;
};

# Configurations related to a Artemis Session
#
# + username - The username
# + password - The password
public type SessionConfiguration record {
string? username = ();
string? password = ();
!...;
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.ballerinalang.messaging.artemis;

import static org.ballerinalang.util.BLangConstants.ORG_NAME_SEPARATOR;

/**
* Constants related to Artemis connector.
*
* @since 0.995
*/
public class ArtemisConstants {

public static final String BALLERINA = "ballerina";
public static final String ARTEMIS = "artemis";
public static final String PROTOCOL_PACKAGE_ARTEMIS = BALLERINA + ORG_NAME_SEPARATOR + ARTEMIS;

// Error related constants
static final String ARTEMIS_ERROR_CODE = "{ballerina/artemis}ArtemisError";
static final String ARTEMIS_ERROR_RECORD = "ArtemisError";
static final String ARTEMIS_ERROR_MESSAGE = "message";

// Native objects
public static final String ARTEMIS_CONNECTION_POOL = "artemis-connection-pool";
public static final String ARTEMIS_SESSION_FACTORY = "artemis-session-factory";
public static final String ARTEMIS_SESSION = "artemis-session";
public static final String ARTEMIS_MESSAGE = "artemis-message";
public static final String ARTEMIS_PRODUCER = "artemis-producer";
public static final String ARTEMIS_CONSUMER = "artemis-consumer";

// The struct types
public static final String MESSAGE_OBJ = "Message";
public static final String CONNECTION_OBJ = "Connection";
public static final String SESSION_OBJ = "Session";
public static final String PRODUCER_OBJ = "Producer";
public static final String LISTENER_OBJ = "Listener";

// Config related
public static final String ROUTING_TYPE = "routingType";
static final String MULTICAST = "MULTICAST";

// warning suppression
public static final String UNCHECKED = "unchecked";

public static final String COUNTDOWN_LATCH = "countdown-latch";

private ArtemisConstants() {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.ballerinalang.messaging.artemis;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.BLangVMErrors;
import org.ballerinalang.connector.api.BLangConnectorSPIUtil;
import org.ballerinalang.model.types.BTypes;
import org.ballerinalang.model.values.BBoolean;
import org.ballerinalang.model.values.BByte;
import org.ballerinalang.model.values.BError;
import org.ballerinalang.model.values.BFloat;
import org.ballerinalang.model.values.BInteger;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BString;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.model.values.BValueArray;
import org.ballerinalang.util.exceptions.BallerinaException;
import org.slf4j.Logger;

/**
* Utility class for Artemis.
*/
public class ArtemisUtils {

/**
* Util function to throw a {@link BallerinaException}.
*
* @param message the error message
* @param context the Ballerina context
* @param exception the exception to be propagated
* @param logger the logger to log errors
*/
public static void throwBallerinaException(String message, Context context, Exception exception, Logger logger) {
logger.error(message, exception);
throw new BallerinaException(message, exception, context);
}

/**
* Get error struct.
*
* @param context Represent ballerina context
* @param errMsg Error message
* @return Error struct
*/
public static BError getError(Context context, String errMsg) {
BMap<String, BValue> artemisErrorRecord = createArtemisErrorRecord(context);
artemisErrorRecord.put(ArtemisConstants.ARTEMIS_ERROR_MESSAGE, new BString(errMsg));
return BLangVMErrors.createError(context, true, BTypes.typeError, ArtemisConstants.ARTEMIS_ERROR_CODE,
artemisErrorRecord);
}

private static BMap<String, BValue> createArtemisErrorRecord(Context context) {
return BLangConnectorSPIUtil.createBStruct(context, ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS,
ArtemisConstants.ARTEMIS_ERROR_RECORD);
}

/**
* Get error struct from throwable.
*
* @param context Represent ballerina context
* @param exception Throwable representing the error.
* @return Error struct
*/
public static BError getError(Context context, Exception exception) {
if (exception.getMessage() == null) {
return getError(context, "Artemis connector error");
} else {
return getError(context, exception.getMessage());
}
}

/**
* Gets a String from the {@link BMap} config.
*
* @param config the BMap config
* @param key the key that has an integer value
* @param logger the logger to log errors
* @return
*/
public static int getIntFromConfig(BMap<String, BValue> config, String key, Logger logger) {
return getIntFromLong(((BInteger) config.get(key)).intValue(), key, logger);
}

/**
* Gets an integer from a long value. Handles errors appropriately.
*
* @param longVal the long value.
* @param name the name of the long value: useful for logging the error.
* @param logger the logger to log errors
* @return
*/
public static int getIntFromLong(long longVal, String name, Logger logger) {
if (longVal <= 0) {
return -1;
}
try {
return Math.toIntExact(longVal);
} catch (ArithmeticException e) {
logger.warn("The value set for {} needs to be less than {}. The {} value is set to {}", name,
Integer.MAX_VALUE, name, Integer.MAX_VALUE);
return Integer.MAX_VALUE;
}
}

/**
* Get the relevant BValure for an Object.
*
* @param obj the Object
* @param context the Ballerina context to to be used in case of errors
* @return the relevant BValue for the object or error
*/
public static BValue getBValueFromObj(Object obj, Context context) {
if (obj instanceof String) {
return new BString((String) obj);
} else if (obj instanceof Integer) {
return new BInteger((int) obj);
} else if (obj instanceof Long) {
return new BInteger((long) obj);
} else if (obj instanceof Short) {
return new BInteger((short) obj);
} else if (obj instanceof Float) {
return new BFloat((float) obj);
} else if (obj instanceof Double) {
return new BFloat((double) obj);
} else if (obj instanceof Boolean) {
return new BBoolean((boolean) obj);
} else if (obj instanceof Byte) {
return new BByte((byte) obj);
} else if (obj instanceof byte[]) {
return new BValueArray((byte[]) obj);
} else {
return ArtemisUtils.getError(context, "Unsupported type");
}
}

/**
* Gets the {@link RoutingType} from the String type.
*
* @param routingType the string routing type
* @return the relevant {@link RoutingType}
*/
public static RoutingType getRoutingTypeFromString(String routingType) {
return ArtemisConstants.MULTICAST.equals(routingType) ? RoutingType.ANYCAST :
RoutingType.MULTICAST;
}

/**
* Get the natively stored {@link ClientSession} from the BMap.
*
* @param obj the Ballerina object as a BMap
* @return the natively stored {@link ClientSession}
*/
public static ClientSession getClientSessionFromBMap(BMap<String, BValue> obj) {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> sessionObj = (BMap<String, BValue>) obj.get("session");
return (ClientSession) sessionObj.getNativeData(ArtemisConstants.ARTEMIS_SESSION);
}

/**
* Close the session if it has been created implicitly identified by the anonymousSession field in the Ballerina
* object.
*
* @param obj the Ballerina object as a BMap
* @throws ActiveMQException on session closure failure
*/
public static void closeIfAnonymousSession(BMap<String, BValue> obj) throws ActiveMQException {
boolean anonymousSession = ((BBoolean) obj.get("anonymousSession")).booleanValue();
if (anonymousSession) {
ClientSession session = ArtemisUtils.getClientSessionFromBMap(obj);
if (!session.isClosed()) {
session.close();
}
}
}

private ArtemisUtils() {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.ballerinalang.messaging.artemis.externimpl.connection;

import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit;
import org.ballerinalang.messaging.artemis.ArtemisConstants;
import org.ballerinalang.messaging.artemis.ArtemisUtils;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;

/**
* Extern function to close Artemis connection.
*
* @since 0.995
*/

@BallerinaFunction(
orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS,
functionName = "close",
receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.CONNECTION_OBJ,
structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS),
isPublic = true
)
public class Close extends BlockingNativeCallableUnit {

@Override
public void execute(Context context) {
try {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> connection = (BMap<String, BValue>) context.getRefArgument(0);
ServerLocator connectionPool = (ServerLocator) connection.getNativeData(
ArtemisConstants.ARTEMIS_CONNECTION_POOL);
ClientSessionFactory sessionFactory =
(ClientSessionFactory) connection.getNativeData(ArtemisConstants.ARTEMIS_SESSION_FACTORY);
connectionPool.close();
sessionFactory.close();
} catch (Exception e) {
context.setReturnValues(ArtemisUtils.getError(context, e));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.ballerinalang.messaging.artemis.externimpl.connection;

import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit;
import org.ballerinalang.messaging.artemis.ArtemisConstants;
import org.ballerinalang.messaging.artemis.ArtemisUtils;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BFloat;
import org.ballerinalang.model.values.BInteger;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.natives.annotations.Argument;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Extern function for Artemis connection creation.
*
* @since 0.995
*/

@BallerinaFunction(
orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS,
functionName = "createConnection",
receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.CONNECTION_OBJ,
structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS),
args = {
@Argument(name = "url", type = TypeKind.STRING),
@Argument(name = "config", type = TypeKind.RECORD, structType = "ConnectionConfiguration")
}
)
public class CreateConnection extends BlockingNativeCallableUnit {
private static final Logger logger = LoggerFactory.getLogger(CreateConnection.class);

@Override
public void execute(Context context) {
try {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> connection = (BMap<String, BValue>) context.getRefArgument(0);

String url = context.getStringArgument(0);

@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> configObj = (BMap<String, BValue>) context.getRefArgument(1);
long connectionTTL = ((BInteger) configObj.get("timeToLive")).intValue();
long callTimeout = ((BInteger) configObj.get("callTimeout")).intValue();
int consumerWindowSize = ArtemisUtils.getIntFromConfig(configObj, "consumerWindowSize", logger);
int consumerMaxRate = ArtemisUtils.getIntFromConfig(configObj, "consumerMaxRate", logger);
int producerWindowSize = ArtemisUtils.getIntFromConfig(configObj, "producerWindowSize", logger);
int producerMaxRate = ArtemisUtils.getIntFromConfig(configObj, "producerMaxRate", logger);
long retryInterval = ((BInteger) configObj.get("retryInterval")).intValue();
double retryIntervalMultiplier = ((BFloat) configObj.get("retryIntervalMultiplier")).floatValue();
long maxRetryInterval = ((BInteger) configObj.get("maxRetryInterval")).intValue();
int reconnectAttempts = ArtemisUtils.getIntFromConfig(configObj, "reconnectAttempts", logger);
int initialConnectAttempts = ArtemisUtils.getIntFromConfig(configObj, "initialConnectAttempts", logger);

ServerLocator connectionPool = ActiveMQClient.createServerLocator(url);

//Add config values to the serverLocator before creating the sessionFactory
connectionPool.setConnectionTTL(connectionTTL);
connectionPool.setCallTimeout(callTimeout);
connectionPool.setConsumerWindowSize(consumerWindowSize);
connectionPool.setConsumerMaxRate(consumerMaxRate);
connectionPool.setProducerWindowSize(producerWindowSize);
connectionPool.setProducerMaxRate(producerMaxRate);
connectionPool.setRetryInterval(retryInterval);
connectionPool.setRetryIntervalMultiplier(retryIntervalMultiplier);
connectionPool.setMaxRetryInterval(maxRetryInterval);
connectionPool.setReconnectAttempts(reconnectAttempts);
connectionPool.setInitialConnectAttempts(initialConnectAttempts);
connectionPool.setConfirmationWindowSize(1024);

ClientSessionFactory factory = connectionPool.createSessionFactory();

connection.addNativeData(ArtemisConstants.ARTEMIS_CONNECTION_POOL, connectionPool);
connection.addNativeData(ArtemisConstants.ARTEMIS_SESSION_FACTORY, factory);

} catch (Exception e) {
ArtemisUtils.throwBallerinaException("Error occurred while starting connection.", context, e, logger);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.ballerinalang.messaging.artemis.externimpl.connection;

import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit;
import org.ballerinalang.messaging.artemis.ArtemisConstants;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BBoolean;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;

/**
* Extern function to check Artemis connection closure.
*
* @since 0.995
*/

@BallerinaFunction(
orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS,
functionName = "isClosed",
receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.CONNECTION_OBJ,
structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS),
isPublic = true
)
public class IsClosed extends BlockingNativeCallableUnit {

@Override
public void execute(Context context) {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> connection = (BMap<String, BValue>) context.getRefArgument(0);
ClientSessionFactory sessionFactory =
(ClientSessionFactory) connection.getNativeData(ArtemisConstants.ARTEMIS_SESSION_FACTORY);
context.setReturnValues(new BBoolean(sessionFactory.isClosed()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.ballerinalang.messaging.artemis.externimpl.consumer;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.BLangVMErrors;
import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit;
import org.ballerinalang.bre.bvm.CallableUnitCallback;
import org.ballerinalang.connector.api.Annotation;
import org.ballerinalang.connector.api.BLangConnectorSPIUtil;
import org.ballerinalang.connector.api.BallerinaConnectorException;
import org.ballerinalang.connector.api.Executor;
import org.ballerinalang.connector.api.Resource;
import org.ballerinalang.connector.api.Service;
import org.ballerinalang.connector.api.Struct;
import org.ballerinalang.connector.api.Value;
import org.ballerinalang.messaging.artemis.ArtemisConstants;
import org.ballerinalang.messaging.artemis.ArtemisUtils;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BError;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;
import org.ballerinalang.services.ErrorHandlerUtils;
import org.ballerinalang.util.codegen.ProgramFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;

/**
* Extern function to start the Artemis consumer.
*
* @since 0.995
*/

@BallerinaFunction(
orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS,
functionName = "createConsumer",
receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.LISTENER_OBJ,
structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS)
)
public class CreateConsumer extends BlockingNativeCallableUnit {
private static final Logger logger = LoggerFactory.getLogger(CreateConsumer.class);

private static final String FILTER = "filter";

@Override
public void execute(Context context) {
try {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> consumerObj = (BMap<String, BValue>) context.getRefArgument(0);

ClientSession session = ArtemisUtils.getClientSessionFromBMap(consumerObj);

Service service = BLangConnectorSPIUtil.getServiceRegistered(context);

Annotation serviceAnnotation = getServiceConfigAnnotation(service);
Struct annotationValue = serviceAnnotation.getValue();
boolean autoAck = annotationValue.getBooleanField("autoAck");
String consumerFilter = getStringFromValueOrNull(annotationValue.getRefField(FILTER));
boolean browseOnly = annotationValue.getBooleanField("browseOnly");

Map<String, Value> queueConfig = annotationValue.getMapField("queueConfig");
String queueName = queueConfig.get("queueName").getStringValue();
SimpleString addressName = new SimpleString(getAddressName(queueConfig, queueName));
boolean autoCreated = queueConfig.get("autoCreated").getBooleanValue();
String routingType = queueConfig.get(ArtemisConstants.ROUTING_TYPE).getStringValue();
boolean temporary = queueConfig.get("temporary").getBooleanValue();
String queueFilter = getStringFromValueOrNull(queueConfig.get(FILTER));
boolean durable = queueConfig.get("durable").getBooleanValue();
int maxConsumers = ArtemisUtils.getIntFromLong(queueConfig.get("maxConsumers").getIntValue(),
"maxConsumers", logger);
boolean purgeOnNoConsumers = queueConfig.get("purgeOnNoConsumers").getBooleanValue();
boolean exclusive = queueConfig.get("exclusive").getBooleanValue();
boolean lastValue = queueConfig.get("lastValue").getBooleanValue();

if (autoCreated) {
SimpleString simpleQueueName = new SimpleString(queueName);
SimpleString simpleQueueFilter = queueFilter != null ? new SimpleString(queueFilter) : null;
ClientSession.QueueQuery queueQuery = session.queueQuery(simpleQueueName);
if (!queueQuery.isExists()) {
if (!temporary) {
session.createQueue(addressName, ArtemisUtils.getRoutingTypeFromString(routingType),
simpleQueueName, simpleQueueFilter, durable, true, maxConsumers,
purgeOnNoConsumers, exclusive, lastValue);
} else {
session.createTemporaryQueue(addressName, ArtemisUtils.getRoutingTypeFromString(routingType),
simpleQueueName, simpleQueueFilter, maxConsumers,
purgeOnNoConsumers, exclusive, lastValue);
}
} else {
logger.warn(
"Queue with the name {} already exists with routingType: {}, durable: {}, temporary: {}, " +
"filter: {}, purgeOnNoConsumers: {}, exclusive: {}, lastValue: {}",
queueName, queueQuery.getRoutingType(), queueQuery.isDurable(), queueQuery.isTemporary(),
queueQuery.getFilterString(), queueQuery.isPurgeOnNoConsumers(), queueQuery.isExclusive(),
queueQuery.isLastValue());
}
}

Resource onMessageResource = service.getResources()[0];

ClientConsumer consumer = session.createConsumer(queueName, consumerFilter, browseOnly);
consumerObj.addNativeData(ArtemisConstants.ARTEMIS_CONSUMER, consumer);
if (onMessageResource != null) {
consumer.setMessageHandler(
clientMessage -> Executor
.submit(onMessageResource, new ResponseCallback(clientMessage, autoAck), null, null,
getSignatureParameters(onMessageResource, clientMessage)));
}
} catch (ActiveMQException e) {
context.setReturnValues(ArtemisUtils.getError(context, e));
}
}

private String getAddressName(Map<String, Value> queueConfig, String queueName) {
Value addressName = queueConfig.get("addressName");
return addressName != null ? addressName.getStringValue() : queueName;
}

private String getStringFromValueOrNull(Value filterVal) {
return filterVal != null ? filterVal.getStringValue() : null;
}

private Annotation getServiceConfigAnnotation(Service service) {
List<Annotation> annotationList = service
.getAnnotationList(ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS,
"ServiceConfig");

if (annotationList == null) {
return null;
}
return annotationList.isEmpty() ? null : annotationList.get(0);
}

private static class ResponseCallback implements CallableUnitCallback {
private ClientMessage message;
private boolean autoAck;

ResponseCallback(ClientMessage message, boolean autoAck) {
this.message = message;
this.autoAck = autoAck;
}

@Override
public void notifySuccess() {
if (autoAck) {
try {
message.acknowledge();
} catch (ActiveMQException e) {
throw new BallerinaConnectorException("Failure during acknowledging the message", e);
}
}
}

@Override
public void notifyFailure(BError error) {
ErrorHandlerUtils.printError("error: " + BLangVMErrors.getPrintableStackTrace(error));
}
}

private BValue getSignatureParameters(Resource onMessageResource, ClientMessage clientMessage) {
ProgramFile programFile = onMessageResource.getResourceInfo().getPackageInfo().getProgramFile();
BMap<String, BValue> messageObj = BLangConnectorSPIUtil.createBStruct(
programFile, ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS, ArtemisConstants.MESSAGE_OBJ);
messageObj.addNativeData(ArtemisConstants.ARTEMIS_MESSAGE, clientMessage);
return messageObj;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.ballerinalang.messaging.artemis.externimpl.consumer;

import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit;
import org.ballerinalang.messaging.artemis.ArtemisConstants;
import org.ballerinalang.messaging.artemis.ArtemisUtils;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;

import java.util.concurrent.CountDownLatch;

/**
* Extern function to start the Artemis consumer.
*
* @since 0.995
*/

@BallerinaFunction(
orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS,
functionName = "start",
receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.LISTENER_OBJ,
structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS)
)
public class Start extends BlockingNativeCallableUnit {
private CountDownLatch countDownLatch = new CountDownLatch(1);

@Override
public void execute(Context context) {
try {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> listenerObj = (BMap<String, BValue>) context.getRefArgument(0);
listenerObj.addNativeData(ArtemisConstants.COUNTDOWN_LATCH, countDownLatch);
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> sessionObj = (BMap<String, BValue>) listenerObj.get("session");
ClientSession session = (ClientSession) sessionObj.getNativeData(ArtemisConstants.ARTEMIS_SESSION);
session.start();
new Thread(() -> {
try {
countDownLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
} catch (Exception e) {
context.setReturnValues(ArtemisUtils.getError(context, e));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.ballerinalang.messaging.artemis.externimpl.consumer;

import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit;
import org.ballerinalang.messaging.artemis.ArtemisConstants;
import org.ballerinalang.messaging.artemis.ArtemisUtils;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;

import java.util.concurrent.CountDownLatch;

/**
* Extern function to stop the Artemis consumer.
*
* @since 0.995
*/

@BallerinaFunction(
orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS,
functionName = "stop",
receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.LISTENER_OBJ,
structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS)
)
public class Stop extends BlockingNativeCallableUnit {

@Override
public void execute(Context context) {
try {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> listenerObj = (BMap<String, BValue>) context.getRefArgument(0);
ClientConsumer consumer = (ClientConsumer) listenerObj.getNativeData(ArtemisConstants.ARTEMIS_CONSUMER);
consumer.close();
ArtemisUtils.closeIfAnonymousSession(listenerObj);
CountDownLatch countDownLatch =
(CountDownLatch) listenerObj.getNativeData(ArtemisConstants.COUNTDOWN_LATCH);
if (countDownLatch != null) {
countDownLatch.countDown();
}
} catch (Exception e) {
context.setReturnValues(ArtemisUtils.getError(context, e));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.ballerinalang.messaging.artemis.externimpl.message;

import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit;
import org.ballerinalang.messaging.artemis.ArtemisConstants;
import org.ballerinalang.messaging.artemis.ArtemisUtils;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;

/**
* Extern function to acknowledge an Artemis message.
*
* @since 0.995
*/

@BallerinaFunction(
orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS,
functionName = "acknowledge",
receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ,
structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS),
isPublic = true
)
public class Acknowledge extends BlockingNativeCallableUnit {

@Override
public void execute(Context context) {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> messageObj = (BMap<String, BValue>) context.getRefArgument(0);
ClientMessage message = (ClientMessage) messageObj.getNativeData(ArtemisConstants.ARTEMIS_MESSAGE);
try {
message.acknowledge();
} catch (Exception e) {
context.setReturnValues(ArtemisUtils.getError(context, e));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.ballerinalang.messaging.artemis.externimpl.message;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.reader.BytesMessageUtil;
import org.apache.activemq.artemis.reader.MapMessageUtil;
import org.apache.activemq.artemis.reader.TextMessageUtil;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit;
import org.ballerinalang.messaging.artemis.ArtemisConstants;
import org.ballerinalang.messaging.artemis.ArtemisUtils;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BBoolean;
import org.ballerinalang.model.values.BByte;
import org.ballerinalang.model.values.BFloat;
import org.ballerinalang.model.values.BInteger;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BString;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.model.values.BValueArray;
import org.ballerinalang.natives.annotations.Argument;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;
import org.ballerinalang.stdlib.io.channels.base.Channel;
import org.ballerinalang.stdlib.io.utils.IOConstants;

import java.util.Map;

/**
* Extern function to create an ActiveMQ Artemis message.
*
* @since 0.995
*/

@BallerinaFunction(
orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS,
functionName = "createMessage",
receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ,
structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS),
args = {
@Argument(name = "session", type = TypeKind.OBJECT, structType = ArtemisConstants.SESSION_OBJ),
@Argument(name = "data", type = TypeKind.UNION),
@Argument(name = "config", type = TypeKind.RECORD, structType = "ConnectionConfiguration")
}
)
public class CreateMessage extends BlockingNativeCallableUnit {

@Override
public void execute(Context context) {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> messageObj = (BMap<String, BValue>) context.getRefArgument(0);
String type = messageObj.get("messageType").stringValue();

@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> sessionObj = (BMap<String, BValue>) context.getRefArgument(1);
BValue dataVal = context.getRefArgument(2);

@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> configObj = (BMap<String, BValue>) context.getRefArgument(3);
long expiration = getIntFromIntOrNil(configObj.get("expiration"), 0);
long timeStamp = getIntFromIntOrNil(configObj.get("timeStamp"), System.currentTimeMillis());
byte priority = ((BByte) configObj.get("priority")).byteValue();
boolean durable = ((BBoolean) configObj.get("durable")).booleanValue();
BValue routingType = configObj.get("routingType");

ClientSession session = (ClientSession) sessionObj.getNativeData(ArtemisConstants.ARTEMIS_SESSION);

byte messageType = getMessageType(type);
ClientMessage message = session.createMessage(messageType, durable, expiration, timeStamp, priority);
if (routingType instanceof BString) {
message.setRoutingType(ArtemisUtils.getRoutingTypeFromString(routingType.stringValue()));
}

if (messageType == Message.TEXT_TYPE) {
TextMessageUtil.writeBodyText(message.getBodyBuffer(), new SimpleString(dataVal.stringValue()));
} else if (messageType == Message.BYTES_TYPE) {
BytesMessageUtil.bytesWriteBytes(message.getBodyBuffer(), ((BValueArray) dataVal).getBytes());
} else if (messageType == Message.MAP_TYPE) {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
Map<String, BValue> mapObj = ((BMap<String, BValue>) dataVal).getMap();
TypedProperties map = new TypedProperties();
for (Map.Entry<String, BValue> entry : mapObj.entrySet()) {
SimpleString key = new SimpleString(entry.getKey());
BValue value = entry.getValue();
if (value instanceof BString) {
map.putSimpleStringProperty(key, new SimpleString(value.stringValue()));
} else if (value instanceof BInteger) {
map.putLongProperty(key, ((BInteger) value).intValue());
} else if (value instanceof BFloat) {
map.putDoubleProperty(key, ((BFloat) value).floatValue());
} else if (value instanceof BByte) {
map.putByteProperty(key, ((BByte) value).byteValue());
} else if (value instanceof BBoolean) {
map.putBooleanProperty(key, ((BBoolean) value).booleanValue());
} else if (value instanceof BValueArray) {
map.putBytesProperty(key, ((BValueArray) value).getBytes());
}
MapMessageUtil.writeBodyMap(message.getBodyBuffer(), map);
}
} else if (messageType == Message.STREAM_TYPE) {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> streamObj = (BMap<String, BValue>) dataVal;
Channel channel = (Channel) streamObj.getNativeData(IOConstants.BYTE_CHANNEL_NAME);
message.setBodyInputStream(channel.getInputStream());
}

messageObj.addNativeData(ArtemisConstants.ARTEMIS_MESSAGE, message);
}

private byte getMessageType(String type) {
switch (type) {
case "TEXT":
return Message.TEXT_TYPE;
case "BYTES":
return Message.BYTES_TYPE;
case "MAP":
return Message.MAP_TYPE;
case "STREAM":
return Message.STREAM_TYPE;
default:
return Message.DEFAULT_TYPE;
}
}

private long getIntFromIntOrNil(BValue value, long defaultVal) {
if (value instanceof BInteger) {
return ((BInteger) value).intValue();
}
return defaultVal;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.ballerinalang.messaging.artemis.externimpl.message;

import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit;
import org.ballerinalang.messaging.artemis.ArtemisConstants;
import org.ballerinalang.messaging.artemis.ArtemisUtils;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BInteger;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;

/**
* Extern function to get the body size of an artemis message.
*
* @since 0.995
*/

@BallerinaFunction(
orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS,
functionName = "getBodySize",
receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ,
structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS),
isPublic = true
)
public class GetBodySize extends BlockingNativeCallableUnit {

@Override
public void execute(Context context) {
try {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> messageObj = (BMap<String, BValue>) context.getRefArgument(0);
ClientMessage message = (ClientMessage) messageObj.getNativeData(ArtemisConstants.ARTEMIS_MESSAGE);
context.setReturnValues(new BInteger(message.getBodySize()));
} catch (Exception e) {
context.setReturnValues(ArtemisUtils.getError(context, e));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.ballerinalang.messaging.artemis.externimpl.message;

import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.reader.BytesMessageUtil;
import org.apache.activemq.artemis.reader.MapMessageUtil;
import org.apache.activemq.artemis.reader.TextMessageUtil;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit;
import org.ballerinalang.messaging.artemis.ArtemisConstants;
import org.ballerinalang.messaging.artemis.ArtemisUtils;
import org.ballerinalang.model.types.BMapType;
import org.ballerinalang.model.types.BTypes;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BString;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.model.values.BValueArray;
import org.ballerinalang.natives.annotations.Argument;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;
import org.ballerinalang.util.exceptions.BallerinaException;

import java.util.Map;

/**
* Extern function to get the payload from a message.
*
* @since 0.995
*/

@BallerinaFunction(
orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS,
functionName = "getPayload",
receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ,
structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS),
args = {
@Argument(name = "key", type = TypeKind.STRING)
}
)
public class GetPayload extends BlockingNativeCallableUnit {

@Override
public void execute(Context context) {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> messageObj = (BMap<String, BValue>) context.getRefArgument(0);
ClientMessage message = (ClientMessage) messageObj.getNativeData(ArtemisConstants.ARTEMIS_MESSAGE);
byte messageType = message.getType();
if (messageType == Message.TEXT_TYPE) {
ActiveMQBuffer msgBuffer = message.getBodyBuffer();
context.setReturnValues(new BString(TextMessageUtil.readBodyText(msgBuffer).toString()));
} else if (messageType == Message.BYTES_TYPE) {
ActiveMQBuffer msgBuffer = message.getBodyBuffer();
byte[] bytes = new byte[msgBuffer.readableBytes()];
BytesMessageUtil.bytesReadBytes(msgBuffer, bytes);
context.setReturnValues(new BValueArray(bytes));
} else if (messageType == Message.MAP_TYPE) {
ActiveMQBuffer msgBuffer = message.getBodyBuffer();
TypedProperties properties = MapMessageUtil.readBodyMap(msgBuffer);
Map<String, Object> map = properties.getMap();
BMap<String, BValue> mapObj = getMapObj(map.entrySet().iterator().next().getValue());
if (mapObj != null) {
for (Map.Entry<String, Object> entry : map.entrySet()) {
mapObj.put(entry.getKey(), ArtemisUtils.getBValueFromObj(entry.getValue(), context));
}
context.setReturnValues(mapObj);
} else {
context.setReturnValues(ArtemisUtils.getError(context, "Unsupported type"));
}
} else if (messageType == Message.STREAM_TYPE) {
context.setReturnValues(ArtemisUtils.getError(context, new BallerinaException(
"Use the saveToFile function for STREAM type message")));
} else {
context.setReturnValues(ArtemisUtils.getError(context, new BallerinaException("Unsupported type")));
}
}

private BMap<String, BValue> getMapObj(Object val) {
if (val instanceof String) {
return new BMap<>(new BMapType(BTypes.typeString));
} else if (val instanceof Long || val instanceof Integer || val instanceof Short) {
return new BMap<>(new BMapType(BTypes.typeInt));
} else if (val instanceof Float || val instanceof Double) {
return new BMap<>(new BMapType(BTypes.typeFloat));
} else if (val instanceof Byte) {
return new BMap<>(new BMapType(BTypes.typeByte));
} else if (val instanceof byte[]) {
return new BMap<>(new BMapType(BTypes.fromString("byte[]")));
} else if (val instanceof Boolean) {
return new BMap<>(new BMapType(BTypes.typeBoolean));
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.ballerinalang.messaging.artemis.externimpl.message;

import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit;
import org.ballerinalang.messaging.artemis.ArtemisConstants;
import org.ballerinalang.messaging.artemis.ArtemisUtils;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.natives.annotations.Argument;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;

/**
* Extern function for getting a message property.
*
* @since 0.995.0
*/

@BallerinaFunction(
orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS,
functionName = "getProperty",
receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ,
structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS),
args = {
@Argument(name = "key", type = TypeKind.STRING)
}
)
public class GetProperty extends BlockingNativeCallableUnit {

@Override
public void execute(Context context) {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> messageObj = (BMap<String, BValue>) context.getRefArgument(0);
ClientMessage message = (ClientMessage) messageObj.getNativeData(ArtemisConstants.ARTEMIS_MESSAGE);

String key = context.getStringArgument(0);
Object property = message.getObjectProperty(key);
if (property != null) {
context.setReturnValues(ArtemisUtils.getBValueFromObj(property, context));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.ballerinalang.messaging.artemis.externimpl.message;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit;
import org.ballerinalang.messaging.artemis.ArtemisConstants;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BString;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.natives.annotations.Argument;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;

/**
* Extern function for getting the type of a message.
*
* @since 0.995
*/

@BallerinaFunction(
orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS,
functionName = "getType",
receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ,
structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS),
args = {
@Argument(name = "key", type = TypeKind.STRING)
}
)
public class GetType extends BlockingNativeCallableUnit {

@Override
public void execute(Context context) {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> messageObj = (BMap<String, BValue>) context.getRefArgument(0);
ClientMessage message = (ClientMessage) messageObj.getNativeData(ArtemisConstants.ARTEMIS_MESSAGE);
byte messageType = message.getType();
BString type;
switch (messageType) {
case Message.TEXT_TYPE:
type = new BString("TEXT");
break;
case Message.BYTES_TYPE:
type = new BString("BYTES");
break;
case Message.MAP_TYPE:
type = new BString("MAP");
break;
case Message.STREAM_TYPE:
type = new BString("STREAM");
break;
default:
type = new BString("UNSUPPORTED");

}
context.setReturnValues(type);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.ballerinalang.messaging.artemis.externimpl.message;

import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit;
import org.ballerinalang.messaging.artemis.ArtemisConstants;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BBoolean;
import org.ballerinalang.model.values.BByte;
import org.ballerinalang.model.values.BFloat;
import org.ballerinalang.model.values.BInteger;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BString;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.model.values.BValueArray;
import org.ballerinalang.natives.annotations.Argument;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;

/**
* Extern function for setting a property to a Artemis message.
*
* @since 0.995
*/

@BallerinaFunction(
orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS,
functionName = "putProperty",
receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ,
structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS),
args = {
@Argument(name = "key", type = TypeKind.STRING),
@Argument(name = "value", type = TypeKind.UNION)
}
)
public class PutProperty extends BlockingNativeCallableUnit {

@Override
public void execute(Context context) {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> messageObj = (BMap<String, BValue>) context.getRefArgument(0);
ClientMessage message = (ClientMessage) messageObj.getNativeData(ArtemisConstants.ARTEMIS_MESSAGE);

String key = context.getStringArgument(0);
BValue valObj = context.getRefArgument(2);
if (valObj instanceof BString) {
message.putStringProperty(key, valObj.stringValue());
} else if (valObj instanceof BInteger) {
message.putLongProperty(key, ((BInteger) valObj).intValue());
} else if (valObj instanceof BFloat) {
message.putDoubleProperty(key, ((BFloat) valObj).floatValue());
} else if (valObj instanceof BBoolean) {
message.putBooleanProperty(key, ((BBoolean) valObj).booleanValue());
} else if (valObj instanceof BByte) {
message.putByteProperty(key, ((BByte) valObj).byteValue());
} else if (valObj instanceof BValueArray) {
message.putBytesProperty(key, ((BValueArray) valObj).getBytes());
}//else is not needed because these are the only values supported by the Ballerina the method
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.ballerinalang.messaging.artemis.externimpl.message;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit;
import org.ballerinalang.messaging.artemis.ArtemisConstants;
import org.ballerinalang.messaging.artemis.ArtemisUtils;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.natives.annotations.Argument;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;
import org.ballerinalang.stdlib.io.channels.base.Channel;
import org.ballerinalang.stdlib.io.utils.IOConstants;
import org.ballerinalang.util.exceptions.BallerinaException;

import java.nio.channels.Channels;

/**
* Extern function to save a stream message to a byte channel.
*
* @since 0.995
*/

@BallerinaFunction(
orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS,
functionName = "saveToWritableByteChannel",
receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ,
structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS),
args = {
@Argument(name = "ch", type = TypeKind.OBJECT, structType = "WritableByteChannel")
}
)
public class SaveToWritableByteChannel extends BlockingNativeCallableUnit {

@Override
public void execute(Context context) {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> messageObj = (BMap<String, BValue>) context.getRefArgument(0);
ClientMessage message = (ClientMessage) messageObj.getNativeData(ArtemisConstants.ARTEMIS_MESSAGE);
if (message.getType() == Message.STREAM_TYPE) {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> byteChannelObj = (BMap<String, BValue>) context.getRefArgument(1);
Channel channel = (Channel) byteChannelObj.getNativeData(IOConstants.BYTE_CHANNEL_NAME);
try {
message.saveToOutputStream(Channels.newOutputStream(channel.getByteChannel()));
} catch (ActiveMQException e) {
context.setReturnValues(ArtemisUtils.getError(context, new BallerinaException(
"Error while writing to WritableByteChannel")));
}

} else {
context.setReturnValues(ArtemisUtils.getError(context, new BallerinaException("Unsupported type")));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.ballerinalang.messaging.artemis.externimpl.producer;

import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit;
import org.ballerinalang.messaging.artemis.ArtemisConstants;
import org.ballerinalang.messaging.artemis.ArtemisUtils;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;

/**
* Extern function to close Artemis producer.
*
* @since 0.995
*/

@BallerinaFunction(
orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS,
functionName = "close",
receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.PRODUCER_OBJ,
structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS),
isPublic = true
)
public class Close extends BlockingNativeCallableUnit {

@Override
public void execute(Context context) {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> producerObj = (BMap<String, BValue>) context.getRefArgument(0);
ClientProducer producer = (ClientProducer) producerObj.getNativeData(ArtemisConstants.ARTEMIS_PRODUCER);
try {
producer.close();
ArtemisUtils.closeIfAnonymousSession(producerObj);
} catch (Exception e) {
context.setReturnValues(ArtemisUtils.getError(context, e));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.ballerinalang.messaging.artemis.externimpl.producer;

import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.CallableUnitCallback;
import org.ballerinalang.messaging.artemis.ArtemisConstants;
import org.ballerinalang.messaging.artemis.ArtemisUtils;
import org.ballerinalang.model.NativeCallableUnit;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BBoolean;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.natives.annotations.Argument;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Extern function for creating an ActiveMQ Artemis producer.
*
* @since 0.995
*/

@BallerinaFunction(
orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS,
functionName = "createProducer",
receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.PRODUCER_OBJ,
structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS),
args = {
@Argument(name = "addressName", type = TypeKind.STRING),
@Argument(name = "config", type = TypeKind.RECORD, structType = "AddressConfiguration"),
@Argument(name = "rate", type = TypeKind.INT)
}
)
public class CreateProducer implements NativeCallableUnit {

private static final Logger logger = LoggerFactory.getLogger(CreateProducer.class);

@Override
public void execute(Context context, CallableUnitCallback callableUnitCallback) {
try {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> producerObj = (BMap<String, BValue>) context.getRefArgument(0);

SimpleString addressName = new SimpleString(context.getStringArgument(0));

@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> configObj = (BMap<String, BValue>) context.getRefArgument(1);

String routingType = configObj.get(ArtemisConstants.ROUTING_TYPE).stringValue();
boolean autoCreated = ((BBoolean) configObj.get("autoCreated")).booleanValue();

int rate = ArtemisUtils.getIntFromLong(context.getIntArgument(0), "rate", logger);
ClientSession session = ArtemisUtils.getClientSessionFromBMap(producerObj);

if (autoCreated) {
ClientSession.AddressQuery addressQuery = session.addressQuery(addressName);
if (!addressQuery.isExists()) {
session.createAddress(addressName, ArtemisUtils.getRoutingTypeFromString(routingType), true);
} else {
logger.warn("Address with the name {} already exists. ", addressName);
}
}
ClientProducer producer = session.createProducer(addressName, rate);
producerObj.addNativeData(ArtemisConstants.ARTEMIS_PRODUCER, producer);

} catch (Exception ex) {
ArtemisUtils.throwBallerinaException("Error occurred while creating the producer.", context, ex, logger);
}
}

@Override
public boolean isBlocking() {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.ballerinalang.messaging.artemis.externimpl.producer;

import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.BlockingNativeCallableUnit;
import org.ballerinalang.messaging.artemis.ArtemisConstants;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BBoolean;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;

/**
* Extern function to check Artemis producer closure.
*
* @since 0.995
*/

@BallerinaFunction(
orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS,
functionName = "isClosed",
receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.PRODUCER_OBJ,
structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS),
isPublic = true
)
public class IsClosed extends BlockingNativeCallableUnit {

@Override
public void execute(Context context) {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> producerObj = (BMap<String, BValue>) context.getRefArgument(0);
ClientProducer producer =
(ClientProducer) producerObj.getNativeData(ArtemisConstants.ARTEMIS_PRODUCER);
context.setReturnValues(new BBoolean(producer.isClosed()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.ballerinalang.messaging.artemis.externimpl.producer;

import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.CallableUnitCallback;
import org.ballerinalang.messaging.artemis.ArtemisConstants;
import org.ballerinalang.messaging.artemis.ArtemisUtils;
import org.ballerinalang.model.NativeCallableUnit;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.natives.annotations.Argument;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;

/**
* Extern function of the producer to send a message.
*
* @since 0.995
*/

@BallerinaFunction(
orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS,
functionName = "externSend",
receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.PRODUCER_OBJ,
structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS),
args = {
@Argument(name = "data", type = TypeKind.OBJECT, structType = ArtemisConstants.MESSAGE_OBJ)
}
)
public class Send implements NativeCallableUnit {

@Override
public void execute(Context context, CallableUnitCallback callableUnitCallback) {
try {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> producerObj = (BMap<String, BValue>) context.getRefArgument(0);
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> data = (BMap<String, BValue>) context.getRefArgument(1);
ClientProducer producer = (ClientProducer) producerObj.getNativeData(ArtemisConstants.ARTEMIS_PRODUCER);
ClientMessage message = (ClientMessage) data.getNativeData(ArtemisConstants.ARTEMIS_MESSAGE);
producer.send(message, message1 -> callableUnitCallback.notifySuccess());
} catch (Exception e) {
context.setReturnValues(ArtemisUtils.getError(context, e));
callableUnitCallback.notifySuccess();
}
}

@Override
public boolean isBlocking() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.ballerinalang.messaging.artemis.externimpl.session;

import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.CallableUnitCallback;
import org.ballerinalang.messaging.artemis.ArtemisConstants;
import org.ballerinalang.messaging.artemis.ArtemisUtils;
import org.ballerinalang.model.NativeCallableUnit;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;

/**
* Extern function to close Artemis session.
*
* @since 0.995
*/

@BallerinaFunction(
orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS,
functionName = "close",
receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.SESSION_OBJ,
structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS),
isPublic = true
)
public class Close implements NativeCallableUnit {

@Override
public void execute(Context context, CallableUnitCallback callableUnitCallback) {
try {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> sessionObj = (BMap<String, BValue>) context.getRefArgument(0);
ClientSession session = (ClientSession) sessionObj.getNativeData(ArtemisConstants.ARTEMIS_SESSION);
session.close();
} catch (Exception e) {
context.setReturnValues(ArtemisUtils.getError(context, e));
}
}

@Override
public boolean isBlocking() {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.ballerinalang.messaging.artemis.externimpl.session;

import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.CallableUnitCallback;
import org.ballerinalang.messaging.artemis.ArtemisConstants;
import org.ballerinalang.messaging.artemis.ArtemisUtils;
import org.ballerinalang.model.NativeCallableUnit;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BString;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.natives.annotations.Argument;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Extern function for Artemis session creation.
*
* @since 0.995
*/

@BallerinaFunction(
orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS,
functionName = "createSession",
receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.SESSION_OBJ,
structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS),
args = {
@Argument(name = "con", type = TypeKind.OBJECT, structType = ArtemisConstants.CONNECTION_OBJ),
@Argument(name = "config", type = TypeKind.RECORD, structType = "SessionConfiguration")
}
)
public class CreateSession implements NativeCallableUnit {
private static final Logger logger = LoggerFactory.getLogger(CreateSession.class);

@Override
public void execute(Context context, CallableUnitCallback callableUnitCallback) {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> sessionObj = (BMap<String, BValue>) context.getRefArgument(0);
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> connection = (BMap<String, BValue>) context.getRefArgument(1);
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> config = (BMap<String, BValue>) context.getRefArgument(2);

ServerLocator serverLocator = (ServerLocator) connection.getNativeData(
ArtemisConstants.ARTEMIS_CONNECTION_POOL);
ClientSessionFactory sessionFactory =
(ClientSessionFactory) connection.getNativeData(ArtemisConstants.ARTEMIS_SESSION_FACTORY);
try {
String username = null;
String password = null;
BValue userValue = config.get("username");
if (userValue instanceof BString) {
username = userValue.stringValue();
}
BValue passValue = config.get("password");
if (passValue instanceof BString) {
password = passValue.stringValue();
}
ClientSession session = sessionFactory.createSession(username, password, false, true, true,
serverLocator.isPreAcknowledge(),
serverLocator.getAckBatchSize());
sessionObj.addNativeData(ArtemisConstants.ARTEMIS_SESSION, session);
} catch (Exception e) {
ArtemisUtils.throwBallerinaException("Error occurred while starting session", context, e, logger);
}
}

@Override
public boolean isBlocking() {
return true;
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.ballerinalang.messaging.artemis.externimpl.session;

import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.ballerinalang.bre.Context;
import org.ballerinalang.bre.bvm.CallableUnitCallback;
import org.ballerinalang.messaging.artemis.ArtemisConstants;
import org.ballerinalang.model.NativeCallableUnit;
import org.ballerinalang.model.types.TypeKind;
import org.ballerinalang.model.values.BBoolean;
import org.ballerinalang.model.values.BMap;
import org.ballerinalang.model.values.BValue;
import org.ballerinalang.natives.annotations.BallerinaFunction;
import org.ballerinalang.natives.annotations.Receiver;

/**
* Extern function to check Artemis session closure.
*
* @since 0.995
*/

@BallerinaFunction(
orgName = ArtemisConstants.BALLERINA, packageName = ArtemisConstants.ARTEMIS,
functionName = "isClosed",
receiver = @Receiver(type = TypeKind.OBJECT, structType = ArtemisConstants.SESSION_OBJ,
structPackage = ArtemisConstants.PROTOCOL_PACKAGE_ARTEMIS),
isPublic = true
)
public class IsClosed implements NativeCallableUnit {

@Override
public void execute(Context context, CallableUnitCallback callableUnitCallback) {
@SuppressWarnings(ArtemisConstants.UNCHECKED)
BMap<String, BValue> sessionObj = (BMap<String, BValue>) context.getRefArgument(0);
ClientSession session =
(ClientSession) sessionObj.getNativeData(ArtemisConstants.ARTEMIS_SESSION);
context.setReturnValues(new BBoolean(session.isClosed()));
}

@Override
public boolean isBlocking() {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#
# Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
#
# WSO2 Inc. licenses this file to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file except
# in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

####################### HANDLERS #######################
# Configurations for console logging
java.util.logging.ConsoleHandler.level=ALL
java.util.logging.ConsoleHandler.formatter=org.ballerinalang.logging.formatters.BallerinaLogFormatter
org.ballerinalang.logging.formatters.BallerinaLogFormatter.format=%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS,%1$tL %2$s [%3$s] - %4$s %n

org.ballerinalang.test.utils.TestLogHandler.level=INFO
org.ballerinalang.test.utils.TestLogHandler.formatter=org.ballerinalang.logging.formatters.DefaultLogFormatter
org.ballerinalang.logging.formatters.DefaultLogFormatter.format=[%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS,%1$tL] %2$s [%3$s] - %4$s %5$s %n

####################### LOGGERS #######################
# Ballerina user level root logger
ballerina.handlers=java.util.logging.ConsoleHandler
ballerina.level=ALL
ballerina.useParentHandlers=false

# JUL root logger
.handlers=org.ballerinalang.test.utils.TestLogHandler
.level=INFO
29 changes: 29 additions & 0 deletions stdlib/messaging/activemq-artemis/src/test/resources/testng.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
WSO2 Inc. licenses this file to you under the Apache License,
Version 2.0 (the "License"); you may not use this file except
in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >

<suite name="ballerina-test-suite">
<!-- Ballerina language Test Cases. -->
<test name="ballerina-lang-test-suite" parallel="false">
<packages>

</packages>
</test>
</suite>
8 changes: 2 additions & 6 deletions tests/ballerina-integration-test/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
apply from: "$rootDir/gradle/javaProject.gradle"

dependencies {
testCompile 'org.apache.activemq:artemis-server:2.6.3'
implementation project(':ballerina-integration-test-utils')
implementation project(':ballerina-config')
implementation project(':ballerina-core')
@@ -21,14 +22,9 @@ dependencies {
implementation project(':ballerina-grpc')
implementation project(':protobuf-ballerina')
implementation 'com.google.protobuf:protobuf-java:3.5.1'
implementation project(':ballerina-http')
implementation project(':ballerina-h2')
implementation project(':ballerina-mysql')
implementation project(':ballerina-sql')
implementation project(':ballerina-transactions')
implementation project(':ballerina-activemq-artemis')
implementation project(':ballerina-websub')
implementation project(':ballerina-jms')
implementation project(':ballerina-grpc')
implementation project(':ballerina-socket')
implementation project(':ballerina-observability')
implementation project(':observability-test-utils')
16 changes: 16 additions & 0 deletions tests/ballerina-integration-test/pom.xml
Original file line number Diff line number Diff line change
@@ -13,6 +13,12 @@
<name>Ballerina - Integration Test</name>

<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-server</artifactId>
<version>${artemis.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-integration-test-utils</artifactId>
@@ -62,6 +68,10 @@
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-http</artifactId>
</dependency>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-activemq-artemis</artifactId>
</dependency>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-transactions</artifactId>
@@ -236,6 +246,12 @@
<artifactId>apacheds-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.ballerinalang</groupId>
<artifactId>ballerina-activemq-artemis</artifactId>
<type>zip</type>
<classifier>ballerina-binary-repo</classifier>
</dependency>
</dependencies>

<build>
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.ballerinalang.test.messaging.artemis;

import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import org.ballerinalang.test.BaseTest;
import org.ballerinalang.test.context.BServerInstance;
import org.ballerinalang.test.context.BallerinaTestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterGroups;
import org.testng.annotations.BeforeGroups;

import java.nio.file.Path;
import java.nio.file.Paths;

/**
* Includes common functionality for Artemis test cases.
*/
public class ArtemisTestCommons extends BaseTest {
private static final Logger log = LoggerFactory.getLogger(ArtemisTestCommons.class);

protected static final int TIMEOUT_IN_SECS = 10;

private EmbeddedActiveMQ embeddedBroker;

protected static BServerInstance serverInstance;

@BeforeGroups(value = "artemis-test", alwaysRun = true)
public void start() throws BallerinaTestException {
Path path = Paths.get("src", "test", "resources", "messaging", "artemis");

// Start broker
embeddedBroker = new EmbeddedActiveMQ();
String brokerXML = path.resolve("configfiles").resolve("broker.xml").toUri().toString();
embeddedBroker.setConfigResourcePath(brokerXML);
try {
embeddedBroker.start();
} catch (Exception ex) {
log.error("Cannot start ActiveMQ Artemis broker " + ex.getMessage(), ex);
}

// Start Ballerina server
serverInstance = new BServerInstance(balServer);
serverInstance.startServer(path.toAbsolutePath().toString(), "consumers");
}

@AfterGroups(value = "artemis-test", alwaysRun = true)
public void stop() throws Exception {
serverInstance.removeAllLeechers();
serverInstance.shutdownServer();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.ballerinalang.test.messaging.artemis;

import org.ballerinalang.launcher.util.BCompileUtil;
import org.ballerinalang.launcher.util.BRunUtil;
import org.ballerinalang.launcher.util.CompileResult;
import org.ballerinalang.test.context.BallerinaTestException;
import org.ballerinalang.test.context.LogLeecher;
import org.ballerinalang.test.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;

/**
* Includes tests for different payload types for ANYCAST and MULTICAST queues.
*/
@Test(groups = {"artemis-test"})
public class MessagePayloadTest extends ArtemisTestCommons {
private CompileResult anyCastResult;
private CompileResult multiCastResult;
private static final String MULTICAST_MSG = " multicast ";

@BeforeClass
public void setup() throws URISyntaxException {
TestUtils.prepareBalo(this);
Path sourcePath = Paths.get("src", "test", "resources", "messaging", "artemis", "producers");
anyCastResult = BCompileUtil.compile(sourcePath.resolve("anycast_message.bal").toAbsolutePath().toString());
multiCastResult = BCompileUtil.compile(sourcePath.resolve("multicast_message.bal").toAbsolutePath().toString());
}

@Test(description = "Tests the sending of a string message to a queue")
public void testSendString() {
String errorLog = "string message Hello World";
String functionName = "testSendString";
testSend(anyCastResult, errorLog, functionName);
testSend(multiCastResult, errorLog + MULTICAST_MSG, functionName);

}

@Test(description = "Tests the sending of a byte[] message to a queue")
public void testSendByteArray() {
String errorLog = "byte[] message [1, 2, 2, 3, 3, 2]";
String functionName = "testSendByteArray";
testSend(anyCastResult, errorLog, functionName);
testSend(multiCastResult, errorLog + MULTICAST_MSG, functionName);
}

@Test(description = "Tests the sending of a map<string> message to a queue")
public void testSendMapString() {
String errorLog = "map<string> message {\"name\":\"Riyafa\", \"hello\":\"world\"}";
String functionName = "testSendMapString";
testSend(anyCastResult, errorLog, functionName);
testSend(multiCastResult, errorLog + MULTICAST_MSG, functionName);
}

@Test(description = "Tests the sending of a map<int> message to a queue")
public void testSendMapInt() {
String errorLog = "map<int> message {\"num\":1, \"num2\":2}";
String functionName = "testSendMapInt";
testSend(anyCastResult, errorLog, functionName);
testSend(multiCastResult, errorLog + MULTICAST_MSG, functionName);
}


@Test(description = "Tests the sending of a map<float> message to a queue")
public void testSendMapFloat() {
String errorLog = "map<float> message {\"numf1\":1.1, \"numf2\":1.2}";
String functionName = "testSendMapFloat";
testSend(anyCastResult, errorLog, functionName);
testSend(multiCastResult, errorLog + MULTICAST_MSG, functionName);
}

@Test(description = "Tests the sending of a map<byte> message to a queue")
public void testSendMapByte() {
String errorLog = "map<byte> message {\"byte1\":1, \"byte2\":7}";
String functionName = "testSendMapByte";
testSend(anyCastResult, errorLog, functionName);
testSend(multiCastResult, errorLog + MULTICAST_MSG, functionName);
}

@Test(description = "Tests the sending of a map<boolean> message to a queue")
public void testSendMapBoolean() {
String errorLog = "map<boolean> message {\"first\":true, \"second\":false}";
String functionName = "testSendMapBoolean";
testSend(anyCastResult, errorLog, functionName);
testSend(multiCastResult, errorLog + MULTICAST_MSG, functionName);
}

@Test(description = "Tests the sending of a map<byte[]> message to a queue")
public void testSendMapByteArray() {
String errorLog = "map<byte[]> message {\"array2\":[5], \"array1\":[1, 2, 3]}";
String functionName = "testSendMapByteArray";
testSend(anyCastResult, errorLog, functionName);
testSend(multiCastResult, errorLog + MULTICAST_MSG, functionName);
}

private void testSend(CompileResult result, String expectedErrorLog, String functionName) {
LogLeecher logLeecher = new LogLeecher(expectedErrorLog);
serverInstance.addLogLeecher(logLeecher);
BRunUtil.invoke(result, functionName);
try {
logLeecher.waitForText(TIMEOUT_IN_SECS * 1000);
} catch (BallerinaTestException e) {
Assert.fail();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.ballerinalang.test.messaging.artemis;

import org.ballerinalang.launcher.util.BCompileUtil;
import org.ballerinalang.launcher.util.BRunUtil;
import org.ballerinalang.launcher.util.CompileResult;
import org.ballerinalang.test.context.BallerinaTestException;
import org.ballerinalang.test.context.LogLeecher;
import org.ballerinalang.test.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;

/**
* Includes tests for a simple consumer and producer.
*/
@Test(groups = {"artemis-test"})
public class SimpleConsumerTest extends ArtemisTestCommons {
private CompileResult result;

@BeforeClass
public void setup() throws URISyntaxException {
TestUtils.prepareBalo(this);
Path sourcePath = Paths.get("src", "test", "resources", "messaging", "artemis", "producers");
result = BCompileUtil.compile(sourcePath.resolve("simple_producer.bal").toAbsolutePath().toString());
}

@Test(description = "Tests the sending of a string message to a queue")
public void testSimpleSend() {
String errorLog = "received: Hello World";
String functionName = "testSimpleSend";
testSend(result, errorLog, functionName);

}

private void testSend(CompileResult result, String expectedErrorLog, String functionName) {
LogLeecher logLeecher = new LogLeecher(expectedErrorLog);
serverInstance.addLogLeecher(logLeecher);
BRunUtil.invoke(result, functionName);
try {
logLeecher.waitForText(TIMEOUT_IN_SECS * 1000);
} catch (BallerinaTestException e) {
Assert.fail();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[project]
org-name = "riyafa"
version = "0.0.1"

Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:xi="http://www.w3.org/2001/XInclude"
xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq:core ">

<security-enabled>false</security-enabled>
<name>0.0.0.0</name>


<persistence-enabled>true</persistence-enabled>

<!-- this could be ASYNCIO, MAPPED, NIO
ASYNCIO: Linux Libaio
MAPPED: mmap files
NIO: Plain Java Files
-->
<journal-type>ASYNCIO</journal-type>

<paging-directory>data/paging</paging-directory>

<bindings-directory>data/bindings</bindings-directory>

<journal-directory>data/journal</journal-directory>

<large-messages-directory>data/large-messages</large-messages-directory>

<journal-datasync>true</journal-datasync>

<journal-min-files>2</journal-min-files>

<journal-pool-files>10</journal-pool-files>

<journal-file-size>10M</journal-file-size>

<!--
This value was determined through a calculation.
Your system could perform 41.67 writes per millisecond
on the current journal configuration.
That translates as a sync write every 24000 nanoseconds.
Note: If you specify 0 the system will perform writes directly to the disk.
We recommend this to be 0 if you are using journalType=MAPPED and journal-datasync=false.
-->
<journal-buffer-timeout>24000</journal-buffer-timeout>


<!--
When using ASYNCIO, this will determine the writing queue depth for libaio.
-->
<journal-max-io>4096</journal-max-io>
<!--
You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
<network-check-NIC>theNicName</network-check-NIC>
-->

<!--
Use this to use an HTTP server to validate the network
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->

<!-- <network-check-period>10000</network-check-period> -->
<!-- <network-check-timeout>1000</network-check-timeout> -->

<!-- this is a comma separated list, no spaces, just DNS or IPs
it should accept IPV6
Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
Using IPs that could eventually disappear or be partially visible may defeat the purpose.
You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
<!-- <network-check-list>10.0.0.1</network-check-list> -->

<!-- use this to customize the ping used for ipv4 addresses -->
<!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->

<!-- use this to customize the ping used for ipv6 addresses -->
<!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->




<!-- how often we are looking for how many bytes are being used on the disk in ms -->
<disk-scan-period>5000</disk-scan-period>

<!-- once the disk hits this limit the system will block, or close the connection in certain protocols
that won't support flow control. -->
<max-disk-usage>90</max-disk-usage>

<!-- should the broker detect dead locks and other issues -->
<critical-analyzer>true</critical-analyzer>

<critical-analyzer-timeout>120000</critical-analyzer-timeout>

<critical-analyzer-check-period>60000</critical-analyzer-check-period>

<critical-analyzer-policy>HALT</critical-analyzer-policy>

<!-- the system will enter into page mode once you hit this limit.
This is an estimate in bytes of how much the messages are using in memory
The system will use half of the available memory (-Xmx) by default for the global-max-size.
You may specify a different value here if you need to customize it to your needs.
<global-max-size>100Mb</global-max-size>
-->

<acceptors>

<!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
<!-- amqpCredits: The number of credits sent to AMQP producers -->
<!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->

<!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
"anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->

<!-- Acceptor for every supported protocol -->
<acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>

</acceptors>


<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<!-- we need this otherwise ./artemis data imp wouldn't work -->
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>

<address-settings>
<!-- if you define auto-create on certain queues, management has to be auto-create -->
<address-setting match="activemq.management#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
</address-setting>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
</address-setting>
</address-settings>

<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ" />
</anycast>
</address>
<address name="ExpiryQueue">
<anycast>
<queue name="ExpiryQueue" />
</anycast>
</address>
</addresses>

</core>
</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import ballerina/artemis;
import ballerina/io;

artemis:Connection con = new("tcp://localhost:61616");
artemis:Session session = new(con);
listener artemis:Listener artemisListener = new(session);

@artemis:ServiceConfig {
queueConfig: {
queueName: "anycast_queue"
}
}
service anyCastConsumer on artemisListener {
resource function onMessage(artemis:Message message) returns error? {
var payload = message.getPayload();
if (payload is byte[]) {
io:print("byte[] ");
} else if (payload is map<string>) {
io:print("map<string> ");
} else if (payload is map<int>) {
io:print("map<int> ");
} else if (payload is map<float>) {
io:print("map<float> ");
} else if (payload is map<byte>) {
io:print("map<byte> ");
} else if (payload is map<boolean>) {
io:print("map<boolean> ");
} else if (payload is map<byte[]>) {
io:print("map<byte[]> ");
} else if (payload is string) {
io:print("string ");
}
io:print("message ");
io:println(payload);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
@artemis:ServiceConfig {
queueConfig: {
queueName: "multicast_queue",
addressName: "multicast_address",
routingType: artemis:MULTICAST
}
}
service multiCastConsumer on artemisListener {
resource function onMessage(artemis:Message message) returns error? {
var payload = message.getPayload();
if (payload is byte[]) {
io:print("byte[] ");
} else if (payload is map<string>) {
io:print("map<string> ");
} else if (payload is map<int>) {
io:print("map<int> ");
} else if (payload is map<float>) {
io:print("map<float> ");
} else if (payload is map<byte>) {
io:print("map<byte> ");
} else if (payload is map<boolean>) {
io:print("map<boolean> ");
} else if (payload is map<byte[]>) {
io:print("map<byte[]> ");
} else if (payload is string) {
io:print("string ");
}
io:print("message ");
io:print(payload);
io:println(" multicast ");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import ballerina/artemis;
import ballerina/io;

@artemis:ServiceConfig {
queueConfig: {
queueName: "simple_queue"
}
}
service simpleConsumer on new artemis:Listener({host: "localhost", port: 61616}) {
resource function onMessage(artemis:Message message) returns error? {
var payload = message.getPayload();
if (payload is string) {
io:println("received: " + payload);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import ballerina/artemis;

artemis:Connection con = new("tcp://localhost:61616");
artemis:Session session = new(con);
artemis:Producer prod = new(session, "anycast_queue", addressConfig = {autoCreated:false});

public function testSendString() {
var err = prod->send("Hello World");
}

public function testSendByteArray() {
byte[6] msg = [1, 2, 2, 3, 3, 2];
var err = prod->send(msg);
}

public function testSendMapString() {
map<string> msg = {
"name": "Riyafa",
"hello": "world"
};
var err = prod->send(msg);
}

public function testSendMapInt() {
map<int> msg = {
"num": 1,
"num2": 2
};
var err = prod->send(msg);
}

public function testSendMapFloat() {
map<float> msg = {
"numf1": 1.1,
"numf2": 1.2
};
var err = prod->send(msg);
}

public function testSendMapByte() {
map<byte> msg = {
"byte1": 1,
"byte2": 7
};
var err = prod->send(msg);
}

public function testSendMapBoolean() {
map<boolean> msg = {
"first": true,
"second": false
};
var err = prod->send(msg);
}

public function testSendMapByteArray() {
byte[3] array1 = [1, 2, 3];
byte[1] array2 = [5];
map<byte[]> msg = {
"array1": array1,
"array2": array2
};
var err = prod->send(msg);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import ballerina/artemis;

artemis:Connection con = new("tcp://localhost:61616");
artemis:Session session = new(con);
artemis:Producer prod = new(session, "multicast_address", addressConfig = {autoCreated:false});

public function testSendString() {
var err = prod->send("Hello World");
}

public function testSendByteArray() {
byte[6] msg = [1, 2, 2, 3, 3, 2];
var err = prod->send(msg);
}

public function testSendMapString() {
map<string> msg = {
"name": "Riyafa",
"hello": "world"
};
var err = prod->send(msg);
}

public function testSendMapInt() {
map<int> msg = {
"num": 1,
"num2": 2
};
var err = prod->send(msg);
}

public function testSendMapFloat() {
map<float> msg = {
"numf1": 1.1,
"numf2": 1.2
};
var err = prod->send(msg);
}

public function testSendMapByte() {
map<byte> msg = {
"byte1": 1,
"byte2": 7
};
var err = prod->send(msg);
}

public function testSendMapBoolean() {
map<boolean> msg = {
"first": true,
"second": false
};
var err = prod->send(msg);
}

public function testSendMapByteArray() {
byte[3] array1 = [1, 2, 3];
byte[1] array2 = [5];
map<byte[]> msg = {
"array1": array1,
"array2": array2
};
var err = prod->send(msg);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import ballerina/artemis;

public function testSimpleSend() {
artemis:Producer prod = new({host: "localhost", port: 61616}, "simple_queue", addressConfig = {autoCreated:false});
var err = prod->send("Hello World");
}
Original file line number Diff line number Diff line change
@@ -270,4 +270,12 @@
<class name="org.ballerinalang.test.socket.SocketClientCallbackServiceTestCase"/>
</classes>
</test>

<test name="artemis-test" parallel="false">
<classes>
<class name="org.ballerinalang.test.messaging.artemis.ArtemisTestCommons" />
<class name="org.ballerinalang.test.messaging.artemis.MessagePayloadTest" />
<class name="org.ballerinalang.test.messaging.artemis.SimpleConsumerTest" />
</classes>
</test>
</suite>

0 comments on commit a9b82cc

Please sign in to comment.