diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/consumer/Actions.java b/native/src/main/java/io/ballerina/stdlib/java.jms/consumer/Actions.java index fd8e405..08aa90f 100644 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/consumer/Actions.java +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/consumer/Actions.java @@ -30,8 +30,6 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import javax.jms.Destination; import javax.jms.JMSException; @@ -54,8 +52,6 @@ * Represents {@link javax.jms.MessageConsumer} related utility functions. */ public class Actions { - private static final ExecutorService executorService = Executors.newCachedThreadPool(new ConsumerThreadFactory()); - private static final BString CONSUMER_TYPE = StringUtils.fromString("type"); private static final BString MESSAGE_SELECTOR = StringUtils.fromString("messageSelector"); private static final BString NO_LOCAL = StringUtils.fromString("noLocal"); @@ -129,33 +125,31 @@ private static MessageConsumer createConsumer(Session session, BMap { - CompletableFuture balFuture = new CompletableFuture<>(); - executorService.execute(() -> { - try { - Message message = nativeConsumer.receive(timeout); - if (Objects.isNull(message)) { - balFuture.complete(null); - } else { - BMap ballerinaMsg = getBallerinaMessage(message); - balFuture.complete(ballerinaMsg); - } - } catch (JMSException exception) { - BError bError = createError(JMS_ERROR, - String.format("Error occurred while receiving messages: %s", exception.getMessage()), - exception); - balFuture.complete(bError); - } catch (BallerinaJmsException exception) { - balFuture.complete(createError(JMS_ERROR, exception.getMessage(), exception)); - } catch (Exception exception) { - BError bError = createError(JMS_ERROR, - String.format("Unknown error occurred while processing the received messages: %s", - exception.getMessage()), exception); - balFuture.complete(bError); + CompletableFuture balFuture = new CompletableFuture<>(); + Thread.startVirtualThread(() -> { + try { + Message message = nativeConsumer.receive(timeout); + if (Objects.isNull(message)) { + balFuture.complete(null); + } else { + BMap ballerinaMsg = getBallerinaMessage(message); + balFuture.complete(ballerinaMsg); } - }); - return Util.getResult(balFuture); + } catch (JMSException exception) { + BError bError = createError(JMS_ERROR, + String.format("Error occurred while receiving messages: %s", exception.getMessage()), + exception); + balFuture.complete(bError); + } catch (BallerinaJmsException exception) { + balFuture.complete(createError(JMS_ERROR, exception.getMessage(), exception)); + } catch (Exception exception) { + BError bError = createError(JMS_ERROR, + String.format("Unknown error occurred while processing the received messages: %s", + exception.getMessage()), exception); + balFuture.complete(bError); + } }); + return Util.getResult(balFuture); } /** @@ -168,33 +162,31 @@ public static Object receive(Environment env, BObject consumer, long timeout) { */ public static Object receiveNoWait(Environment env, BObject consumer) { MessageConsumer nativeConsumer = (MessageConsumer) consumer.getNativeData(NATIVE_CONSUMER); - return env.yieldAndRun(() -> { - CompletableFuture balFuture = new CompletableFuture<>(); - executorService.execute(() -> { - try { - Message message = nativeConsumer.receiveNoWait(); - if (Objects.isNull(message)) { - balFuture.complete(null); - } else { - BMap ballerinaMsg = getBallerinaMessage(message); - balFuture.complete(ballerinaMsg); - } - } catch (JMSException exception) { - BError bError = createError(JMS_ERROR, - String.format("Error occurred while receiving messages: %s", exception.getMessage()), - exception); - balFuture.complete(bError); - } catch (BallerinaJmsException exception) { - balFuture.complete(createError(JMS_ERROR, exception.getMessage(), exception)); - } catch (Exception exception) { - BError bError = createError(JMS_ERROR, - String.format("Unknown error occurred while processing the received messages: %s", - exception.getMessage()), exception); - balFuture.complete(bError); + CompletableFuture balFuture = new CompletableFuture<>(); + Thread.startVirtualThread(() -> { + try { + Message message = nativeConsumer.receiveNoWait(); + if (Objects.isNull(message)) { + balFuture.complete(null); + } else { + BMap ballerinaMsg = getBallerinaMessage(message); + balFuture.complete(ballerinaMsg); } - }); - return Util.getResult(balFuture); + } catch (JMSException exception) { + BError bError = createError(JMS_ERROR, + String.format("Error occurred while receiving messages: %s", exception.getMessage()), + exception); + balFuture.complete(bError); + } catch (BallerinaJmsException exception) { + balFuture.complete(createError(JMS_ERROR, exception.getMessage(), exception)); + } catch (Exception exception) { + BError bError = createError(JMS_ERROR, + String.format("Unknown error occurred while processing the received messages: %s", + exception.getMessage()), exception); + balFuture.complete(bError); + } }); + return Util.getResult(balFuture); } /** diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/listener/ListenerImpl.java b/native/src/main/java/io/ballerina/stdlib/java.jms/listener/ListenerImpl.java index 15bf160..2c6b4bf 100644 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/listener/ListenerImpl.java +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/listener/ListenerImpl.java @@ -64,26 +64,28 @@ public ListenerImpl(BObject consumerService, Runtime ballerinaRuntime) { @Override public void onMessage(Message message) { - try { - Module module = ModuleUtils.getModule(); - StrandMetadata metadata = new StrandMetadata( - module.getOrg(), module.getName(), module.getVersion(), SERVICE_RESOURCE_ON_MESSAGE); - ObjectType serviceType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(consumerService)); - Object[] params = methodParameters(serviceType, message); - Object result; - if (serviceType.isIsolated() && serviceType.isIsolated(SERVICE_RESOURCE_ON_MESSAGE)) { - result = ballerinaRuntime.startIsolatedWorker( + Thread.startVirtualThread(() -> { + try { + Module module = ModuleUtils.getModule(); + StrandMetadata metadata = new StrandMetadata( + module.getOrg(), module.getName(), module.getVersion(), SERVICE_RESOURCE_ON_MESSAGE); + ObjectType serviceType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(consumerService)); + Object[] params = methodParameters(serviceType, message); + Object result; + if (serviceType.isIsolated() && serviceType.isIsolated(SERVICE_RESOURCE_ON_MESSAGE)) { + result = ballerinaRuntime.startIsolatedWorker( consumerService, SERVICE_RESOURCE_ON_MESSAGE, null, metadata, null, params).get(); - } else { - result = ballerinaRuntime.startNonIsolatedWorker( - consumerService, SERVICE_RESOURCE_ON_MESSAGE, null, metadata, null, params); + } else { + result = ballerinaRuntime.startNonIsolatedWorker( + consumerService, SERVICE_RESOURCE_ON_MESSAGE, null, metadata, null, params); + } + Util.notifySuccess(result); + } catch (JMSException | BallerinaJmsException e) { + LOGGER.error("Unexpected error occurred while async message processing", e); + } catch (BError bError) { + Util.notifyFailure(bError); } - Util.notifySuccess(result); - } catch (JMSException | BallerinaJmsException e) { - LOGGER.error("Unexpected error occurred while async message processing", e); - } catch (BError bError) { - Util.notifyFailure(bError); - } + }); } private Object[] methodParameters(ObjectType serviceType, Message message) diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/producer/Actions.java b/native/src/main/java/io/ballerina/stdlib/java.jms/producer/Actions.java index 52539c6..93cd597 100644 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/producer/Actions.java +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/producer/Actions.java @@ -27,8 +27,6 @@ import io.ballerina.stdlib.java.jms.Util; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import javax.jms.Destination; import javax.jms.JMSException; @@ -47,8 +45,6 @@ * Representation of {@link javax.jms.MessageProducer} with utility methods to invoke as inter-op functions. */ public class Actions { - private static final ExecutorService executorService = Executors.newCachedThreadPool(new ProducerThreadFactory()); - /** * Creates a {@link javax.jms.MessageProducer} object with given {@link javax.jms.Session}. * @@ -84,21 +80,19 @@ public static Object init(BObject producer, BObject session, Object destination) */ public static Object send(Environment env, BObject producer, Message message) { MessageProducer nativeProducer = (MessageProducer) producer.getNativeData(NATIVE_PRODUCER); - return env.yieldAndRun(() -> { - CompletableFuture balFuture = new CompletableFuture<>(); - executorService.execute(() -> { - try { - nativeProducer.send(message); - balFuture.complete(null); - } catch (UnsupportedOperationException | JMSException exception) { - BError bError = createError(JMS_ERROR, - String.format("Error occurred while sending a message to the JMS provider: %s", - exception.getMessage()), exception); - balFuture.complete(bError); - } - }); - return Util.getResult(balFuture); + CompletableFuture balFuture = new CompletableFuture<>(); + Thread.startVirtualThread(() -> { + try { + nativeProducer.send(message); + balFuture.complete(null); + } catch (UnsupportedOperationException | JMSException exception) { + BError bError = createError(JMS_ERROR, + String.format("Error occurred while sending a message to the JMS provider: %s", + exception.getMessage()), exception); + balFuture.complete(bError); + } }); + return Util.getResult(balFuture); } /** @@ -116,25 +110,23 @@ public static Object sendTo(Environment env, BObject producer, BObject session, Message message) { MessageProducer nativeProducer = (MessageProducer) producer.getNativeData(NATIVE_PRODUCER); Session nativeSession = (Session) session.getNativeData(NATIVE_SESSION); - return env.yieldAndRun(() -> { - CompletableFuture balFuture = new CompletableFuture<>(); - executorService.execute(() -> { - try { - Destination jmsDestination = getDestination(nativeSession, destination); - nativeProducer.send(jmsDestination, message); - balFuture.complete(null); - } catch (BallerinaJmsException exception) { - BError bError = createError(JMS_ERROR, exception.getMessage(), exception); - balFuture.complete(bError); - } catch (UnsupportedOperationException | JMSException exception) { - BError bError = createError(JMS_ERROR, - String.format("Error occurred while sending a message to the JMS provider: %s", - exception.getMessage()), exception); - balFuture.complete(bError); - } - }); - return Util.getResult(balFuture); + CompletableFuture balFuture = new CompletableFuture<>(); + Thread.startVirtualThread(() -> { + try { + Destination jmsDestination = getDestination(nativeSession, destination); + nativeProducer.send(jmsDestination, message); + balFuture.complete(null); + } catch (BallerinaJmsException exception) { + BError bError = createError(JMS_ERROR, exception.getMessage(), exception); + balFuture.complete(bError); + } catch (UnsupportedOperationException | JMSException exception) { + BError bError = createError(JMS_ERROR, + String.format("Error occurred while sending a message to the JMS provider: %s", + exception.getMessage()), exception); + balFuture.complete(bError); + } }); + return Util.getResult(balFuture); } /**