Skip to content

Commit

Permalink
Add data binding support for Artemis Connector
Browse files Browse the repository at this point in the history
  • Loading branch information
riyafa committed May 21, 2019
1 parent a4c6e39 commit d474d2c
Show file tree
Hide file tree
Showing 45 changed files with 1,438 additions and 176 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Make sure to have the ActiveMQ Artemis broker running.

# Navigate to the directory that contains the 'artemis_multicast_consumer.bal' file and issue the
# Navigate to the directory that contains the 'artemis_anycast_session_consumer.bal' file and execute the
# 'ballerina run' command as follows.
$ ballerina run artemis_multicast_consumer.bal
# The ActiveMQ Artemis consumer runs as a Ballerina service and listens to the subscribed queue.
$ ballerina run artemis_anycast_session_consumer.bal
[ballerina/artemis] Client Consumer created for queue queue1

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Make sure to have the ActiveMQ Artemis broker running.

# Navigate to the directory that contains the 'artemis_anycast_session_producer.bal' file and issue the
# Navigate to the directory that contains the 'artemis_anycast_session_producer.bal' file and execute the
# 'ballerina run' command as follows.
$ ballerina run artemis_anycast_session_producer.bal
37 changes: 37 additions & 0 deletions examples/artemis-data-binding/artemis_data_binding.bal
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import ballerina/artemis;
import ballerina/http;
import ballerina/log;

type Order record {
string description?;
int id;
float cost;
};

// Consumer listens to the queue (i.e., "my_queue") with the address
// (i.e., "my_address").
@artemis:ServiceConfig {
queueConfig: {
queueName: "my_queue",
addressName: "my_address"
}
}
// Attaches the service to the listener.
service artemisConsumer on new artemis:Listener(
{ host: "localhost", port: 61616 }) {

// This resource is triggered when a valid `Order` is received.
resource function onMessage(artemis:Message message, Order orderDetails)
returns error? {

// Posts order details to the backend and awaits response.
http:Client clientEP = new("http://www.mocky.io");
var response = clientEP->post("/v2/5cde49ef3000005e004307f0",
untaint check json.convert(orderDetails));
if (response is http:Response) {
log:printInfo(check response.getTextPayload());
} else {
log:printError("Invalid response ", err = response);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Artemis data binding helps access payload through the last resource
// parameter. `string`, `json`, `xml`, `byte[]`, `record`, `map<string>`, `map<int>`,
// `map<float>`, `map<byte>`, `map<byte[]>` and `map<boolean>` are supported as
// parameter types.
6 changes: 6 additions & 0 deletions examples/artemis-data-binding/artemis_data_binding.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Make sure to have the ActiveMQ Artemis broker running.

# Navigate to the directory that contains the 'artemis_data_binding.bal' file
# and execute the 'ballerina run' command as follows
$ ballerina run artemis_data_binding.bal
[ballerina/artemis] Client Consumer created for queue my_queue
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Make sure to have the ActiveMQ Artemis broker running.

# Navigate to the directory that contains the 'artemis_multicast_consumer.bal' file and issue the
# 'ballerina run' command.
$ ballerina run artemis_multicast_consumer.bal as follows.
# The ActiveMQ Artemis consumer runs as a Ballerina service and listens to the subscribed queue.
# Navigate to the directory that contains the 'artemis_multicast_consumer.bal' file and execute the
# 'ballerina run' command as follows.
$ ballerina run artemis_multicast_consumer.bal
[ballerina/artemis] Client Consumer created for queue my_queue

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Make sure to have the ActiveMQ Artemis broker running.

# Navigate to the directory that contains the 'artemis_multicast_producer.bal' file and issue the
# Navigate to the directory that contains the 'artemis_multicast_producer.bal' file and execute the
# 'ballerina run' command as follows.
$ ballerina run artemis_multicast_producer.bal
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
# Make sure to have the ActiveMQ Artemis broker running.

# Navigate to the directory that contains the 'artemis_simple_transaction_consumer.bal' file and issue the
# Navigate to the directory that contains the 'artemis_simple_transaction_consumer.bal' file and execute the
# 'ballerina run' command as follows.
$ ballerina run --experimental artemis_simple_transaction_consumer.bal
# The ActiveMQ Artemis consumer runs as a Ballerina service and listens to the subscribed queue.

[ballerina/http] started HTTP/WS endpoint 172.17.0.1:34985
2019-04-18 15:19:32,475 INFO [ballerina/transactions] - Created transaction: 29787878-3686-41b3-9ab3-4bc60ad278f6
2019-05-17 14:11:29,258 INFO [ballerina/transactions] - Created transaction: 78aabdba-3994-47cd-b083-80744961f69b
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Make sure to have the ActiveMQ Artemis broker running.

# Navigate to the directory that contains the 'artemis_simple_transaction_producer.bal' file and issue the
# Navigate to the directory that contains the 'artemis_simple_transaction_producer.bal' file and execute the
# 'ballerina run' command as follows.
$ ballerina run --experimental artemis_simple_transaction_producer.bal
# The ActiveMQ Artemis producer runs as a Ballerina service and listens to the subscribed queue.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Make sure to have the ActiveMQ Artemis broker running.

# Navigate to the directory that contains the 'artemis_transaction_producer.bal' file and issue the
# Navigate to the directory that contains the 'artemis_transaction_producer.bal' file and execute the
# 'ballerina run' command as follows.
$ ballerina run --experimental artemis_transaction_producer.bal
# The ActiveMQ Artemis producer runs as a Ballerina service and listens to the subscribed queue.
Expand Down
4 changes: 4 additions & 0 deletions examples/index.json
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,10 @@
{
"name": "Transaction Producer",
"url": "artemis-transaction-producer"
},
{
"name": "Data Binding",
"url": "artemis-data-binding"
}
]
},
Expand Down
19 changes: 14 additions & 5 deletions stdlib/messaging/activemq-artemis/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,27 @@ dependencies {
// implementation project(':ballerina-lang')
implementation project(':ballerina-io')
implementation project(':ballerina-builtin')
implementation project(':ballerina-time')
implementation project(':ballerina-crypto')
implementation project(':ballerina-filepath')
implementation project(':ballerina-log-api')
implementation project(':ballerina-system')
implementation project(':ballerina-runtime-api')
implementation project(':ballerina-utils')
implementation project(':ballerina-launcher')
implementation 'org.apache.activemq:artemis-core-client'

baloImplementation project(path: ':ballerina-runtime-api', configuration: 'baloImplementation')
baloImplementation project(path: ':ballerina-utils', configuration: 'baloImplementation')
baloImplementation project(path: ':ballerina-builtin', configuration: 'baloImplementation')
baloImplementation project(path: ':ballerina-io', configuration: 'baloImplementation')
baloImplementation project(path: ':ballerina-builtin', configuration: 'baloImplementation')
baloImplementation project(path: ':ballerina-time', configuration: 'baloImplementation')
baloImplementation project(path: ':ballerina-crypto', configuration: 'baloImplementation')
baloImplementation project(path: ':ballerina-filepath', configuration: 'baloImplementation')
baloImplementation project(path: ':ballerina-system', configuration: 'baloImplementation')
baloImplementation project(path: ':ballerina-log-api', configuration: 'baloImplementation')
baloImplementation project(path: ':ballerina-system', configuration: 'baloImplementation')
baloImplementation project(path: ':ballerina-runtime-api', configuration: 'baloImplementation')
baloImplementation project(path: ':ballerina-utils', configuration: 'baloImplementation')

testCompile 'org.testng:testng'

}

test {
Expand All @@ -50,6 +54,11 @@ test {
into "$buildDir/lib/repo/ballerina"
}
}
useTestNG() {
suites 'src/test/resources/testng.xml'
}
systemProperty "java.util.logging.config.file", "src/test/resources/logging.properties"
systemProperty "java.util.logging.manager", "org.ballerinalang.logging.BLogManager"
}

description = 'Ballerina - ActiveMQ Artemis'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright (c) 2019, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.ballerinalang.messaging.artemis;

import org.ballerinalang.bre.Context;
import org.ballerinalang.util.exceptions.BallerinaException;

/**
* BallerinaException that could occur in Artemis connector.
*
* @since 0.995
*/
public class ArtemisConnectorException extends BallerinaException {
private static final long serialVersionUID = 381055783364464822L;

/**
* Constructs a new {@link ArtemisConnectorException} with the specified detail message.
*
* @param message Error Message
*/
public ArtemisConnectorException(String message) {
super(message);
}

/**
* Constructs a new {@link ArtemisConnectorException} with error message and ballerina context.
*
* @param message Error message
* @param context Ballerina context
*/
public ArtemisConnectorException(String message, Context context) {
super(message, context);
}

/**
* Constructs a new {@link ArtemisConnectorException} with the specified detail message and cause.
*
* @param message Error message
* @param cause Cause
*/
public ArtemisConnectorException(String message, Throwable cause) {
super(message, cause);
}

/**
* Constructs a new {@link ArtemisConnectorException} with the specified detail message,
* cause and ballerina context.
*
* @param message Error message
* @param cause Cause
* @param context Ballerina context
*/
public ArtemisConnectorException(String message, Throwable cause, Context context) {
super(message, cause, context);
}

/**
* Constructs a new {@link ArtemisConnectorException} with the cause.
*
* @param cause Throwable to be wrap by {@link ArtemisConnectorException}
*/
public ArtemisConnectorException(Throwable cause) {
super(cause);
}

/**
* Constructs a new {@link ArtemisConnectorException} with ballerina context.
*
* @param stack Ballerina context
*/
public ArtemisConnectorException(Context stack) {
super(stack);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class ArtemisConstants {
public static final String PROTOCOL_PACKAGE_ARTEMIS = BALLERINA_PACKAGE_PREFIX + ARTEMIS;

// Error related constants
static final String ARTEMIS_ERROR_CODE = "{ballerina/artemis}ArtemisError";
static final String ARTEMIS_ERROR_CODE = "{" + PROTOCOL_PACKAGE_ARTEMIS + "}ArtemisError";
static final String ARTEMIS_ERROR_RECORD = "ArtemisError";
static final String ARTEMIS_ERROR_MESSAGE = "message";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@
import org.ballerinalang.model.tree.ServiceNode;
import org.ballerinalang.util.diagnostic.Diagnostic;
import org.ballerinalang.util.diagnostic.DiagnosticLog;
import org.wso2.ballerinalang.compiler.semantics.model.types.BArrayType;
import org.wso2.ballerinalang.compiler.semantics.model.types.BMapType;
import org.wso2.ballerinalang.compiler.semantics.model.types.BType;
import org.wso2.ballerinalang.compiler.tree.BLangFunction;
import org.wso2.ballerinalang.compiler.tree.BLangSimpleVariable;
import org.wso2.ballerinalang.compiler.util.TypeTags;
import org.wso2.ballerinalang.util.AbstractTransportCompilerPlugin;

import java.util.List;
Expand All @@ -34,12 +38,18 @@
* @since 0.995.0
*/
@SupportedResourceParamTypes(
expectedListenerType = @SupportedResourceParamTypes.Type(packageName = ArtemisConstants.ARTEMIS,
name = ArtemisConstants.LISTENER_OBJ),
paramTypes = {@SupportedResourceParamTypes.Type(packageName = ArtemisConstants.ARTEMIS,
name = ArtemisConstants.MESSAGE_OBJ)})
expectedListenerType = @SupportedResourceParamTypes.Type(
packageName = ArtemisConstants.ARTEMIS,
name = ArtemisConstants.LISTENER_OBJ
),
paramTypes = {
@SupportedResourceParamTypes.Type(
packageName = ArtemisConstants.ARTEMIS,
name = ArtemisConstants.MESSAGE_OBJ
)})
public class ArtemisServiceCompilerPlugin extends AbstractTransportCompilerPlugin {

private static final String INVALID_RESOURCE_SIGNATURE_FOR = "Invalid resource signature for ";
private DiagnosticLog dlog = null;

@Override
Expand Down Expand Up @@ -77,16 +87,52 @@ private void validate(BLangFunction resource) {
dlog.logDiagnostic(Diagnostic.Kind.ERROR, resource.pos, "Invalid return type: expected error?");
}
List<BLangSimpleVariable> paramDetails = resource.getParameters();
if (paramDetails == null || paramDetails.size() != 1) {
dlog.logDiagnostic(Diagnostic.Kind.ERROR, resource.pos, "Invalid resource signature for "
+ resource.getName().getValue() +
" resource: Unexpected parameter count(expected parameter count = 1)");
if (paramDetails == null || paramDetails.isEmpty() || paramDetails.size() > 2) {
dlog.logDiagnostic(Diagnostic.Kind.ERROR, resource.pos,
INVALID_RESOURCE_SIGNATURE_FOR + resource.getName().getValue() +
" resource: Unexpected parameter count(expected parameter count 1 or 2)");
return;
}
if (!ArtemisConstants.MESSAGE_OBJ_FULL_NAME.equals(paramDetails.get(0).type.toString())) {
dlog.logDiagnostic(Diagnostic.Kind.ERROR, resource.pos, "Invalid resource signature for "
dlog.logDiagnostic(Diagnostic.Kind.ERROR, resource.pos, INVALID_RESOURCE_SIGNATURE_FOR
+ resource.getName().getValue() + " resource: The first parameter should be an artemis:Message");
}
if (paramDetails.size() == 2) {
validateSecondParam(resource, paramDetails);
}
}

private void validateSecondParam(BLangFunction resource, List<BLangSimpleVariable> paramDetails) {
BType secondParamType = paramDetails.get(1).type;
int secondParamTypeTag = secondParamType.tag;
if (secondParamTypeTag != TypeTags.STRING && secondParamTypeTag != TypeTags.JSON &&
secondParamTypeTag != TypeTags.XML && secondParamTypeTag != TypeTags.RECORD &&
secondParamTypeTag != TypeTags.MAP && checkArrayType(secondParamType)) {
dlog.logDiagnostic(Diagnostic.Kind.ERROR, resource.pos, INVALID_RESOURCE_SIGNATURE_FOR
+ resource.getName().getValue() + " resource in service " +
": The second parameter should be a string, json, xml, byte[], map or a record type");
}
if (secondParamTypeTag == TypeTags.MAP && checkMapConstraint(secondParamType)) {
dlog.logDiagnostic(Diagnostic.Kind.ERROR, resource.pos, INVALID_RESOURCE_SIGNATURE_FOR
+ resource.getName().getValue() + " resource in service " +
": The second parameter should be a map of string, int, float, byte, boolean or byte[]");
}
}

private boolean checkArrayType(BType secondParamType) {
return secondParamType.tag != TypeTags.ARRAY || (secondParamType instanceof BArrayType &&
((BArrayType) secondParamType).getElementType().tag != org.ballerinalang.model.types.TypeTags.BYTE_TAG);
}

private boolean checkMapConstraint(BType paramType) {
if (paramType instanceof BMapType) {
BType constraintType = ((BMapType) paramType).constraint;
int constraintTypeTag = constraintType.tag;
return constraintTypeTag != TypeTags.STRING && constraintTypeTag != TypeTags.INT &&
constraintTypeTag != TypeTags.FLOAT && constraintTypeTag != TypeTags.BYTE &&
constraintTypeTag != TypeTags.BOOLEAN && checkArrayType(constraintType);
}
return false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,15 @@ public void handleTransactionBlock(Context context, String objectType) {
session.commit();
return;
} catch (ActiveMQException e) {
context.setError(ArtemisUtils.getError(context, e));
throw new ArtemisConnectorException("Session commit failed: " + e.getMessage(), e, context);
}
}
} else {
if (!context.isInTransaction()) {
context.setError(ArtemisUtils.getError(context, "The Session used by the Artemis " + objectType +
" object is transacted. Hence " + objectType +
" transacted actions cannot be used outside a transaction block"));
throw new ArtemisConnectorException("The Session used by the Artemis " + objectType +
" object is transacted. Hence " + objectType +
" transacted actions cannot be used outside a transaction" +
" block", context);
}
}
TransactionLocalContext transactionLocalContext = context.getLocalTransactionInfo();
Expand Down
Loading

0 comments on commit d474d2c

Please sign in to comment.