Skip to content

Commit

Permalink
Add suggestions from code review
Browse files Browse the repository at this point in the history
  • Loading branch information
ravinperera00 committed Nov 4, 2024
1 parent e061407 commit 37f710b
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.jms.Destination;
import javax.jms.JMSException;
Expand All @@ -54,8 +52,6 @@
* Represents {@link javax.jms.MessageConsumer} related utility functions.
*/
public class Actions {
private static final ExecutorService executorService = Executors.newCachedThreadPool(new ConsumerThreadFactory());

private static final BString CONSUMER_TYPE = StringUtils.fromString("type");
private static final BString MESSAGE_SELECTOR = StringUtils.fromString("messageSelector");
private static final BString NO_LOCAL = StringUtils.fromString("noLocal");
Expand Down Expand Up @@ -129,33 +125,31 @@ private static MessageConsumer createConsumer(Session session, BMap<BString, Obj
*/
public static Object receive(Environment env, BObject consumer, long timeout) {
MessageConsumer nativeConsumer = (MessageConsumer) consumer.getNativeData(NATIVE_CONSUMER);
return env.yieldAndRun(() -> {
CompletableFuture<Object> balFuture = new CompletableFuture<>();
executorService.execute(() -> {
try {
Message message = nativeConsumer.receive(timeout);
if (Objects.isNull(message)) {
balFuture.complete(null);
} else {
BMap<BString, Object> ballerinaMsg = getBallerinaMessage(message);
balFuture.complete(ballerinaMsg);
}
} catch (JMSException exception) {
BError bError = createError(JMS_ERROR,
String.format("Error occurred while receiving messages: %s", exception.getMessage()),
exception);
balFuture.complete(bError);
} catch (BallerinaJmsException exception) {
balFuture.complete(createError(JMS_ERROR, exception.getMessage(), exception));
} catch (Exception exception) {
BError bError = createError(JMS_ERROR,
String.format("Unknown error occurred while processing the received messages: %s",
exception.getMessage()), exception);
balFuture.complete(bError);
CompletableFuture<Object> balFuture = new CompletableFuture<>();
Thread.startVirtualThread(() -> {
try {
Message message = nativeConsumer.receive(timeout);
if (Objects.isNull(message)) {
balFuture.complete(null);
} else {
BMap<BString, Object> ballerinaMsg = getBallerinaMessage(message);
balFuture.complete(ballerinaMsg);
}
});
return Util.getResult(balFuture);
} catch (JMSException exception) {
BError bError = createError(JMS_ERROR,
String.format("Error occurred while receiving messages: %s", exception.getMessage()),
exception);
balFuture.complete(bError);
} catch (BallerinaJmsException exception) {
balFuture.complete(createError(JMS_ERROR, exception.getMessage(), exception));
} catch (Exception exception) {
BError bError = createError(JMS_ERROR,
String.format("Unknown error occurred while processing the received messages: %s",
exception.getMessage()), exception);
balFuture.complete(bError);
}
});
return Util.getResult(balFuture);
}

/**
Expand All @@ -168,33 +162,31 @@ public static Object receive(Environment env, BObject consumer, long timeout) {
*/
public static Object receiveNoWait(Environment env, BObject consumer) {
MessageConsumer nativeConsumer = (MessageConsumer) consumer.getNativeData(NATIVE_CONSUMER);
return env.yieldAndRun(() -> {
CompletableFuture<Object> balFuture = new CompletableFuture<>();
executorService.execute(() -> {
try {
Message message = nativeConsumer.receiveNoWait();
if (Objects.isNull(message)) {
balFuture.complete(null);
} else {
BMap<BString, Object> ballerinaMsg = getBallerinaMessage(message);
balFuture.complete(ballerinaMsg);
}
} catch (JMSException exception) {
BError bError = createError(JMS_ERROR,
String.format("Error occurred while receiving messages: %s", exception.getMessage()),
exception);
balFuture.complete(bError);
} catch (BallerinaJmsException exception) {
balFuture.complete(createError(JMS_ERROR, exception.getMessage(), exception));
} catch (Exception exception) {
BError bError = createError(JMS_ERROR,
String.format("Unknown error occurred while processing the received messages: %s",
exception.getMessage()), exception);
balFuture.complete(bError);
CompletableFuture<Object> balFuture = new CompletableFuture<>();
Thread.startVirtualThread(() -> {
try {
Message message = nativeConsumer.receiveNoWait();
if (Objects.isNull(message)) {
balFuture.complete(null);
} else {
BMap<BString, Object> ballerinaMsg = getBallerinaMessage(message);
balFuture.complete(ballerinaMsg);
}
});
return Util.getResult(balFuture);
} catch (JMSException exception) {
BError bError = createError(JMS_ERROR,
String.format("Error occurred while receiving messages: %s", exception.getMessage()),
exception);
balFuture.complete(bError);
} catch (BallerinaJmsException exception) {
balFuture.complete(createError(JMS_ERROR, exception.getMessage(), exception));
} catch (Exception exception) {
BError bError = createError(JMS_ERROR,
String.format("Unknown error occurred while processing the received messages: %s",
exception.getMessage()), exception);
balFuture.complete(bError);
}
});
return Util.getResult(balFuture);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,26 +64,28 @@ public ListenerImpl(BObject consumerService, Runtime ballerinaRuntime) {

@Override
public void onMessage(Message message) {
try {
Module module = ModuleUtils.getModule();
StrandMetadata metadata = new StrandMetadata(
module.getOrg(), module.getName(), module.getVersion(), SERVICE_RESOURCE_ON_MESSAGE);
ObjectType serviceType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(consumerService));
Object[] params = methodParameters(serviceType, message);
Object result;
if (serviceType.isIsolated() && serviceType.isIsolated(SERVICE_RESOURCE_ON_MESSAGE)) {
result = ballerinaRuntime.startIsolatedWorker(
Thread.startVirtualThread(() -> {
try {
Module module = ModuleUtils.getModule();
StrandMetadata metadata = new StrandMetadata(
module.getOrg(), module.getName(), module.getVersion(), SERVICE_RESOURCE_ON_MESSAGE);
ObjectType serviceType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(consumerService));
Object[] params = methodParameters(serviceType, message);
Object result;
if (serviceType.isIsolated() && serviceType.isIsolated(SERVICE_RESOURCE_ON_MESSAGE)) {
result = ballerinaRuntime.startIsolatedWorker(
consumerService, SERVICE_RESOURCE_ON_MESSAGE, null, metadata, null, params).get();
} else {
result = ballerinaRuntime.startNonIsolatedWorker(
consumerService, SERVICE_RESOURCE_ON_MESSAGE, null, metadata, null, params);
} else {
result = ballerinaRuntime.startNonIsolatedWorker(
consumerService, SERVICE_RESOURCE_ON_MESSAGE, null, metadata, null, params);
}
Util.notifySuccess(result);
} catch (JMSException | BallerinaJmsException e) {
LOGGER.error("Unexpected error occurred while async message processing", e);
} catch (BError bError) {
Util.notifyFailure(bError);
}
Util.notifySuccess(result);
} catch (JMSException | BallerinaJmsException e) {
LOGGER.error("Unexpected error occurred while async message processing", e);
} catch (BError bError) {
Util.notifyFailure(bError);
}
});
}

private Object[] methodParameters(ObjectType serviceType, Message message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import io.ballerina.stdlib.java.jms.Util;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.jms.Destination;
import javax.jms.JMSException;
Expand All @@ -47,8 +45,6 @@
* Representation of {@link javax.jms.MessageProducer} with utility methods to invoke as inter-op functions.
*/
public class Actions {
private static final ExecutorService executorService = Executors.newCachedThreadPool(new ProducerThreadFactory());

/**
* Creates a {@link javax.jms.MessageProducer} object with given {@link javax.jms.Session}.
*
Expand Down Expand Up @@ -84,21 +80,19 @@ public static Object init(BObject producer, BObject session, Object destination)
*/
public static Object send(Environment env, BObject producer, Message message) {
MessageProducer nativeProducer = (MessageProducer) producer.getNativeData(NATIVE_PRODUCER);
return env.yieldAndRun(() -> {
CompletableFuture<Object> balFuture = new CompletableFuture<>();
executorService.execute(() -> {
try {
nativeProducer.send(message);
balFuture.complete(null);
} catch (UnsupportedOperationException | JMSException exception) {
BError bError = createError(JMS_ERROR,
String.format("Error occurred while sending a message to the JMS provider: %s",
exception.getMessage()), exception);
balFuture.complete(bError);
}
});
return Util.getResult(balFuture);
CompletableFuture<Object> balFuture = new CompletableFuture<>();
Thread.startVirtualThread(() -> {
try {
nativeProducer.send(message);
balFuture.complete(null);
} catch (UnsupportedOperationException | JMSException exception) {
BError bError = createError(JMS_ERROR,
String.format("Error occurred while sending a message to the JMS provider: %s",
exception.getMessage()), exception);
balFuture.complete(bError);
}
});
return Util.getResult(balFuture);
}

/**
Expand All @@ -116,25 +110,23 @@ public static Object sendTo(Environment env, BObject producer, BObject session,
Message message) {
MessageProducer nativeProducer = (MessageProducer) producer.getNativeData(NATIVE_PRODUCER);
Session nativeSession = (Session) session.getNativeData(NATIVE_SESSION);
return env.yieldAndRun(() -> {
CompletableFuture<Object> balFuture = new CompletableFuture<>();
executorService.execute(() -> {
try {
Destination jmsDestination = getDestination(nativeSession, destination);
nativeProducer.send(jmsDestination, message);
balFuture.complete(null);
} catch (BallerinaJmsException exception) {
BError bError = createError(JMS_ERROR, exception.getMessage(), exception);
balFuture.complete(bError);
} catch (UnsupportedOperationException | JMSException exception) {
BError bError = createError(JMS_ERROR,
String.format("Error occurred while sending a message to the JMS provider: %s",
exception.getMessage()), exception);
balFuture.complete(bError);
}
});
return Util.getResult(balFuture);
CompletableFuture<Object> balFuture = new CompletableFuture<>();
Thread.startVirtualThread(() -> {
try {
Destination jmsDestination = getDestination(nativeSession, destination);
nativeProducer.send(jmsDestination, message);
balFuture.complete(null);
} catch (BallerinaJmsException exception) {
BError bError = createError(JMS_ERROR, exception.getMessage(), exception);
balFuture.complete(bError);
} catch (UnsupportedOperationException | JMSException exception) {
BError bError = createError(JMS_ERROR,
String.format("Error occurred while sending a message to the JMS provider: %s",
exception.getMessage()), exception);
balFuture.complete(bError);
}
});
return Util.getResult(balFuture);
}

/**
Expand Down

0 comments on commit 37f710b

Please sign in to comment.