From e061407340baa651d1e151442db6acf41c126746 Mon Sep 17 00:00:00 2001 From: ravinperera00 Date: Mon, 14 Oct 2024 12:15:24 +0530 Subject: [PATCH 1/2] Migrate to Java 21 --- .../workflows/build-timestamped-master.yml | 2 +- .../workflows/build-with-bal-test-graalvm.yml | 2 +- .github/workflows/central-publish.yml | 2 +- .github/workflows/publish-release.yml | 2 +- .github/workflows/pull-request.yml | 2 +- .github/workflows/trigger-load-tests.yml | 2 +- .github/workflows/trivy-scan.yml | 2 +- README.md | 2 +- ballerina/Ballerina.toml | 10 +- ballerina/Dependencies.toml | 2 +- build-config/resources/Ballerina.toml | 10 +- gradle.properties | 18 ++-- native/build.gradle | 7 +- .../io/ballerina/stdlib/java.jms/Util.java | 52 ++++++++++ .../stdlib/java.jms/consumer/Actions.java | 99 ++++++++++--------- .../java.jms/listener/ListenerCallback.java | 41 -------- .../java.jms/listener/ListenerImpl.java | 23 +++-- .../stdlib/java.jms/producer/Actions.java | 63 ++++++------ 18 files changed, 182 insertions(+), 159 deletions(-) create mode 100644 native/src/main/java/io/ballerina/stdlib/java.jms/Util.java delete mode 100644 native/src/main/java/io/ballerina/stdlib/java.jms/listener/ListenerCallback.java diff --git a/.github/workflows/build-timestamped-master.yml b/.github/workflows/build-timestamped-master.yml index 2d154ffc..f202d1aa 100644 --- a/.github/workflows/build-timestamped-master.yml +++ b/.github/workflows/build-timestamped-master.yml @@ -14,5 +14,5 @@ jobs: call_workflow: name: Run Build Workflow if: ${{ github.repository_owner == 'ballerina-platform' }} - uses: ballerina-platform/ballerina-library/.github/workflows/build-timestamp-master-template.yml@main + uses: ballerina-platform/ballerina-library/.github/workflows/build-timestamp-master-template.yml@java21 secrets: inherit diff --git a/.github/workflows/build-with-bal-test-graalvm.yml b/.github/workflows/build-with-bal-test-graalvm.yml index 9c4fd6eb..21f53242 100644 --- a/.github/workflows/build-with-bal-test-graalvm.yml +++ b/.github/workflows/build-with-bal-test-graalvm.yml @@ -30,7 +30,7 @@ jobs: call_stdlib_workflow: name: Run StdLib Workflow if: ${{ github.event_name != 'schedule' || (github.event_name == 'schedule' && github.repository_owner == 'ballerina-platform') }} - uses: ballerina-platform/ballerina-library/.github/workflows/build-with-bal-test-graalvm-template.yml@main + uses: ballerina-platform/ballerina-library/.github/workflows/build-with-bal-test-graalvm-template.yml@java21 with: lang_tag: ${{ inputs.lang_tag }} lang_version: ${{ inputs.lang_version }} diff --git a/.github/workflows/central-publish.yml b/.github/workflows/central-publish.yml index 11922b55..ebe213e7 100644 --- a/.github/workflows/central-publish.yml +++ b/.github/workflows/central-publish.yml @@ -15,7 +15,7 @@ jobs: call_workflow: name: Run Central Publish Workflow if: ${{ github.repository_owner == 'ballerina-platform' }} - uses: ballerina-platform/ballerina-library/.github/workflows/central-publish-template.yml@main + uses: ballerina-platform/ballerina-library/.github/workflows/central-publish-template.yml@java21 secrets: inherit with: environment: ${{ github.event.inputs.environment }} diff --git a/.github/workflows/publish-release.yml b/.github/workflows/publish-release.yml index 69914a55..28b3551a 100644 --- a/.github/workflows/publish-release.yml +++ b/.github/workflows/publish-release.yml @@ -9,7 +9,7 @@ jobs: call_workflow: name: Run Release Workflow if: ${{ github.repository_owner == 'ballerina-platform' }} - uses: ballerina-platform/ballerina-library/.github/workflows/release-package-template.yml@main + uses: ballerina-platform/ballerina-library/.github/workflows/release-package-template.yml@java21 secrets: inherit with: package-name: java.jms diff --git a/.github/workflows/pull-request.yml b/.github/workflows/pull-request.yml index 68b9979d..daab7eb5 100644 --- a/.github/workflows/pull-request.yml +++ b/.github/workflows/pull-request.yml @@ -10,7 +10,7 @@ jobs: call_workflow: name: Run PR Build Workflow if: ${{ github.repository_owner == 'ballerina-platform' }} - uses: ballerina-platform/ballerina-library/.github/workflows/pull-request-build-template.yml@main + uses: ballerina-platform/ballerina-library/.github/workflows/pull-request-build-template.yml@java21 with: additional-windows-test-flags: "-x test" secrets: inherit diff --git a/.github/workflows/trigger-load-tests.yml b/.github/workflows/trigger-load-tests.yml index fc0b41e9..3e61b084 100644 --- a/.github/workflows/trigger-load-tests.yml +++ b/.github/workflows/trigger-load-tests.yml @@ -22,7 +22,7 @@ jobs: call_stdlib_trigger_load_test_workflow: name: Run StdLib Load Test Workflow if: ${{ github.event_name != 'schedule' || (github.event_name == 'schedule' && github.repository_owner == 'ballerina-platform') }} - uses: ballerina-platform/ballerina-library/.github/workflows/trigger-load-tests-template.yml@main + uses: ballerina-platform/ballerina-library/.github/workflows/trigger-load-tests-template.yml@java21 with: repo_name: 'module-ballerina-java.jms' runtime_artifacts_url: 'https://api.github.com/repos/ballerina-platform/module-ballerina-java.jms/actions/artifacts' diff --git a/.github/workflows/trivy-scan.yml b/.github/workflows/trivy-scan.yml index 458aab57..d91a5f37 100644 --- a/.github/workflows/trivy-scan.yml +++ b/.github/workflows/trivy-scan.yml @@ -9,5 +9,5 @@ jobs: call_workflow: name: Run Trivy Scan Workflow if: ${{ github.repository_owner == 'ballerina-platform' }} - uses: ballerina-platform/ballerina-library/.github/workflows/trivy-scan-template.yml@main + uses: ballerina-platform/ballerina-library/.github/workflows/trivy-scan-template.yml@java21 secrets: inherit diff --git a/README.md b/README.md index 8ca25585..2cb87ac3 100644 --- a/README.md +++ b/README.md @@ -118,7 +118,7 @@ This repository only contains the source code for the library. ### Set up the prerequisites -1. Download and install Java SE Development Kit (JDK) version 17 (from one of the following locations). +1. Download and install Java SE Development Kit (JDK) version 21 (from one of the following locations). * [Oracle](https://www.oracle.com/java/technologies/downloads/) * [OpenJDK](https://adoptium.net/) diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index 0b6af721..14c30ba1 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -7,27 +7,27 @@ keywords = ["jms"] repository = "https://github.com/ballerina-platform/module-ballerina-java.jms" icon = "icon.png" license = ["Apache-2.0"] -distribution = "2201.8.0" +distribution = "2201.10.0-20241011-161100-51978649" [build-options] observabilityIncluded = true -[platform.java17] +[platform.java21] graalvmCompatible = true -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "io.ballerina.stdlib" artifactId = "java.jms-native" version = "1.0.2" path = "../native/build/libs/java.jms-native-1.0.2-SNAPSHOT.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "org.slf4j" artifactId = "slf4j-api" version = "2.0.7" path = "./lib/slf4j-api-2.0.7.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "javax.jms" artifactId = "javax.jms-api" version = "2.0.1" diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index f3894d73..82004dbc 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -5,7 +5,7 @@ [ballerina] dependencies-toml-version = "2" -distribution-version = "2201.8.0" +distribution-version = "2201.10.0-20241011-161100-51978649" [[package]] org = "ballerina" diff --git a/build-config/resources/Ballerina.toml b/build-config/resources/Ballerina.toml index bc41f0de..55263444 100644 --- a/build-config/resources/Ballerina.toml +++ b/build-config/resources/Ballerina.toml @@ -7,27 +7,27 @@ keywords = ["jms"] repository = "https://github.com/ballerina-platform/module-ballerina-java.jms" icon = "icon.png" license = ["Apache-2.0"] -distribution = "2201.8.0" +distribution = "2201.10.0-20241011-161100-51978649" [build-options] observabilityIncluded = true -[platform.java17] +[platform.java21] graalvmCompatible = true -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "io.ballerina.stdlib" artifactId = "java.jms-native" version = "@toml.version@" path = "../native/build/libs/java.jms-native-@project.version@.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "org.slf4j" artifactId = "slf4j-api" version = "@slf4j.version@" path = "./lib/slf4j-api-@slf4j.version@.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "javax.jms" artifactId = "javax.jms-api" version = "@javax.jms.version@" diff --git a/gradle.properties b/gradle.properties index 9f83dbfe..99eacf42 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,10 +1,10 @@ org.gradle.caching=true group=io.ballerina.stdlib version=1.0.2-SNAPSHOT -ballerinaLangVersion=2201.8.0 +ballerinaLangVersion=2201.10.0-20241011-161100-51978649 checkstylePluginVersion=10.12.1 -spotbugsPluginVersion=5.0.14 +spotbugsPluginVersion=6.0.18 shadowJarPluginVersion=8.1.1 downloadPluginVersion=5.4.0 releasePluginVersion=2.8.0 @@ -20,15 +20,15 @@ activeMQDriverVersion=1.0.1 #stdlib dependencies # Level 01 -stdlibTimeVersion=2.4.0 -stdlibIoVersion=1.6.0 +stdlibTimeVersion=2.5.1-20240930-120200-e59222b +stdlibIoVersion=1.6.2-20240928-084100-656404f # Level 02 -stdlibLogVersion=2.9.0 -stdlibCryptoVersion=2.5.0 +stdlibLogVersion=2.10.1-20240930-154200-5ab2aa4 +stdlibCryptoVersion=2.7.3-20240930-132000-5ecc9ab # Level 03 -stdlibUuidVersion=1.7.0 +stdlibUuidVersion=1.8.1-20241009-134600-a05012b -observeVersion=1.2.0 -observeInternalVersion=1.2.0 +observeVersion=1.3.1-20241007-161000-645452d +observeInternalVersion=1.3.1-20241007-182700-a5f77a1 diff --git a/native/build.gradle b/native/build.gradle index fe0c2691..38ca0a85 100644 --- a/native/build.gradle +++ b/native/build.gradle @@ -44,8 +44,11 @@ checkstyle { checkstyleMain.dependsOn(":checkstyle:downloadCheckstyleRuleFiles") spotbugsMain { - effort "max" - reportLevel "low" + def classLoader = plugins["com.github.spotbugs"].class.classLoader + def SpotBugsConfidence = classLoader.findLoadedClass("com.github.spotbugs.snom.Confidence") + def SpotBugsEffort = classLoader.findLoadedClass("com.github.spotbugs.snom.Effort") + effort = SpotBugsEffort.MAX + reportLevel = SpotBugsConfidence.LOW reportsDir = file("$project.buildDir/reports/spotbugs") reports { html.enabled true diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/Util.java b/native/src/main/java/io/ballerina/stdlib/java.jms/Util.java new file mode 100644 index 00000000..55c4f6c4 --- /dev/null +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/Util.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2024 WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.ballerina.stdlib.java.jms; + +import io.ballerina.runtime.api.creators.ErrorCreator; +import io.ballerina.runtime.api.values.BError; + +import java.util.concurrent.CompletableFuture; + +/** + * Util methods for java.jms operations. + */ +public class Util { + private Util() { + //private constructor + } + + public static Object getResult(CompletableFuture balFuture) { + try { + return balFuture.get(); + } catch (Throwable throwable) { + throw ErrorCreator.createError(throwable); + } + } + + public static void notifySuccess(Object o) { + if (o instanceof BError) { + ((BError) o).printStackTrace(); + } + } + + public static void notifyFailure(BError bError) { + bError.printStackTrace(); + } + +} diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/consumer/Actions.java b/native/src/main/java/io/ballerina/stdlib/java.jms/consumer/Actions.java index 20b01159..fd8e4058 100644 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/consumer/Actions.java +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/consumer/Actions.java @@ -19,16 +19,17 @@ package io.ballerina.stdlib.java.jms.consumer; import io.ballerina.runtime.api.Environment; -import io.ballerina.runtime.api.Future; import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.values.BError; import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BObject; import io.ballerina.runtime.api.values.BString; import io.ballerina.stdlib.java.jms.BallerinaJmsException; +import io.ballerina.stdlib.java.jms.Util; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -128,31 +129,33 @@ private static MessageConsumer createConsumer(Session session, BMap { - try { - Message message = nativeConsumer.receive(timeout); - if (Objects.isNull(message)) { - balFuture.complete(null); - } else { - BMap ballerinaMsg = getBallerinaMessage(message); - balFuture.complete(ballerinaMsg); + return env.yieldAndRun(() -> { + CompletableFuture balFuture = new CompletableFuture<>(); + executorService.execute(() -> { + try { + Message message = nativeConsumer.receive(timeout); + if (Objects.isNull(message)) { + balFuture.complete(null); + } else { + BMap ballerinaMsg = getBallerinaMessage(message); + balFuture.complete(ballerinaMsg); + } + } catch (JMSException exception) { + BError bError = createError(JMS_ERROR, + String.format("Error occurred while receiving messages: %s", exception.getMessage()), + exception); + balFuture.complete(bError); + } catch (BallerinaJmsException exception) { + balFuture.complete(createError(JMS_ERROR, exception.getMessage(), exception)); + } catch (Exception exception) { + BError bError = createError(JMS_ERROR, + String.format("Unknown error occurred while processing the received messages: %s", + exception.getMessage()), exception); + balFuture.complete(bError); } - } catch (JMSException exception) { - BError bError = createError(JMS_ERROR, - String.format("Error occurred while receiving messages: %s", exception.getMessage()), - exception); - balFuture.complete(bError); - } catch (BallerinaJmsException exception) { - balFuture.complete(createError(JMS_ERROR, exception.getMessage(), exception)); - } catch (Exception exception) { - BError bError = createError(JMS_ERROR, - String.format("Unknown error occurred while processing the received messages: %s", - exception.getMessage()), exception); - balFuture.complete(bError); - } + }); + return Util.getResult(balFuture); }); - return null; } /** @@ -165,31 +168,33 @@ public static Object receive(Environment env, BObject consumer, long timeout) { */ public static Object receiveNoWait(Environment env, BObject consumer) { MessageConsumer nativeConsumer = (MessageConsumer) consumer.getNativeData(NATIVE_CONSUMER); - Future balFuture = env.markAsync(); - executorService.execute(() -> { - try { - Message message = nativeConsumer.receiveNoWait(); - if (Objects.isNull(message)) { - balFuture.complete(null); - } else { - BMap ballerinaMsg = getBallerinaMessage(message); - balFuture.complete(ballerinaMsg); + return env.yieldAndRun(() -> { + CompletableFuture balFuture = new CompletableFuture<>(); + executorService.execute(() -> { + try { + Message message = nativeConsumer.receiveNoWait(); + if (Objects.isNull(message)) { + balFuture.complete(null); + } else { + BMap ballerinaMsg = getBallerinaMessage(message); + balFuture.complete(ballerinaMsg); + } + } catch (JMSException exception) { + BError bError = createError(JMS_ERROR, + String.format("Error occurred while receiving messages: %s", exception.getMessage()), + exception); + balFuture.complete(bError); + } catch (BallerinaJmsException exception) { + balFuture.complete(createError(JMS_ERROR, exception.getMessage(), exception)); + } catch (Exception exception) { + BError bError = createError(JMS_ERROR, + String.format("Unknown error occurred while processing the received messages: %s", + exception.getMessage()), exception); + balFuture.complete(bError); } - } catch (JMSException exception) { - BError bError = createError(JMS_ERROR, - String.format("Error occurred while receiving messages: %s", exception.getMessage()), - exception); - balFuture.complete(bError); - } catch (BallerinaJmsException exception) { - balFuture.complete(createError(JMS_ERROR, exception.getMessage(), exception)); - } catch (Exception exception) { - BError bError = createError(JMS_ERROR, - String.format("Unknown error occurred while processing the received messages: %s", - exception.getMessage()), exception); - balFuture.complete(bError); - } + }); + return Util.getResult(balFuture); }); - return null; } /** diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/listener/ListenerCallback.java b/native/src/main/java/io/ballerina/stdlib/java.jms/listener/ListenerCallback.java deleted file mode 100644 index e8d18041..00000000 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/listener/ListenerCallback.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. - * - * WSO2 LLC. licenses this file to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.ballerina.stdlib.java.jms.listener; - -import io.ballerina.runtime.api.async.Callback; -import io.ballerina.runtime.api.values.BError; - -/** - * Callback code to be executed when the message-listener complete a message producing cycle to the ballerina service. - */ -public class ListenerCallback implements Callback { - @Override - public void notifySuccess(Object o) { - if (o instanceof BError) { - ((BError) o).printStackTrace(); - } - } - - @Override - public void notifyFailure(BError bError) { - bError.printStackTrace(); - System.exit(1); - } - -} diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/listener/ListenerImpl.java b/native/src/main/java/io/ballerina/stdlib/java.jms/listener/ListenerImpl.java index 7033d138..15bf1601 100644 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/listener/ListenerImpl.java +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/listener/ListenerImpl.java @@ -19,9 +19,7 @@ package io.ballerina.stdlib.java.jms.listener; import io.ballerina.runtime.api.Module; -import io.ballerina.runtime.api.PredefinedTypes; import io.ballerina.runtime.api.Runtime; -import io.ballerina.runtime.api.async.Callback; import io.ballerina.runtime.api.async.StrandMetadata; import io.ballerina.runtime.api.creators.ValueCreator; import io.ballerina.runtime.api.types.MethodType; @@ -29,10 +27,12 @@ import io.ballerina.runtime.api.types.Parameter; import io.ballerina.runtime.api.types.Type; import io.ballerina.runtime.api.utils.TypeUtils; +import io.ballerina.runtime.api.values.BError; import io.ballerina.runtime.api.values.BObject; import io.ballerina.stdlib.java.jms.BallerinaJmsException; import io.ballerina.stdlib.java.jms.Constants; import io.ballerina.stdlib.java.jms.ModuleUtils; +import io.ballerina.stdlib.java.jms.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +56,6 @@ public class ListenerImpl implements MessageListener { private final BObject consumerService; private final Runtime ballerinaRuntime; - private final Callback callback = new ListenerCallback(); public ListenerImpl(BObject consumerService, Runtime ballerinaRuntime) { this.consumerService = consumerService; @@ -71,17 +70,19 @@ public void onMessage(Message message) { module.getOrg(), module.getName(), module.getVersion(), SERVICE_RESOURCE_ON_MESSAGE); ObjectType serviceType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(consumerService)); Object[] params = methodParameters(serviceType, message); + Object result; if (serviceType.isIsolated() && serviceType.isIsolated(SERVICE_RESOURCE_ON_MESSAGE)) { - ballerinaRuntime.invokeMethodAsyncConcurrently( - consumerService, SERVICE_RESOURCE_ON_MESSAGE, null, metadata, callback, - null, PredefinedTypes.TYPE_NULL, params); + result = ballerinaRuntime.startIsolatedWorker( + consumerService, SERVICE_RESOURCE_ON_MESSAGE, null, metadata, null, params).get(); } else { - ballerinaRuntime.invokeMethodAsyncSequentially( - consumerService, SERVICE_RESOURCE_ON_MESSAGE, null, metadata, callback, - null, PredefinedTypes.TYPE_NULL, params); + result = ballerinaRuntime.startNonIsolatedWorker( + consumerService, SERVICE_RESOURCE_ON_MESSAGE, null, metadata, null, params); } + Util.notifySuccess(result); } catch (JMSException | BallerinaJmsException e) { LOGGER.error("Unexpected error occurred while async message processing", e); + } catch (BError bError) { + Util.notifyFailure(bError); } } @@ -93,18 +94,16 @@ private Object[] methodParameters(ObjectType serviceType, Message message) if (onMessageFuncOpt.isPresent()) { MethodType onMessageFunction = onMessageFuncOpt.get(); Parameter[] parameters = onMessageFunction.getParameters(); - Object[] args = new Object[parameters.length * 2]; + Object[] args = new Object[parameters.length]; int idx = 0; for (Parameter param: parameters) { Type referredType = TypeUtils.getReferredType(param.type); switch (referredType.getTag()) { case OBJECT_TYPE_TAG: args[idx++] = ValueCreator.createObjectValue(ModuleUtils.getModule(), Constants.CALLER); - args[idx++] = true; break; case RECORD_TYPE_TAG: args[idx++] = getBallerinaMessage(message); - args[idx++] = true; break; default: throw new BallerinaJmsException( diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/producer/Actions.java b/native/src/main/java/io/ballerina/stdlib/java.jms/producer/Actions.java index 0a63e8de..52539c6e 100644 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/producer/Actions.java +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/producer/Actions.java @@ -19,13 +19,14 @@ package io.ballerina.stdlib.java.jms.producer; import io.ballerina.runtime.api.Environment; -import io.ballerina.runtime.api.Future; import io.ballerina.runtime.api.values.BError; import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BObject; import io.ballerina.runtime.api.values.BString; import io.ballerina.stdlib.java.jms.BallerinaJmsException; +import io.ballerina.stdlib.java.jms.Util; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -83,19 +84,21 @@ public static Object init(BObject producer, BObject session, Object destination) */ public static Object send(Environment env, BObject producer, Message message) { MessageProducer nativeProducer = (MessageProducer) producer.getNativeData(NATIVE_PRODUCER); - Future balFuture = env.markAsync(); - executorService.execute(() -> { - try { - nativeProducer.send(message); - balFuture.complete(null); - } catch (UnsupportedOperationException | JMSException exception) { - BError bError = createError(JMS_ERROR, - String.format("Error occurred while sending a message to the JMS provider: %s", - exception.getMessage()), exception); - balFuture.complete(bError); - } + return env.yieldAndRun(() -> { + CompletableFuture balFuture = new CompletableFuture<>(); + executorService.execute(() -> { + try { + nativeProducer.send(message); + balFuture.complete(null); + } catch (UnsupportedOperationException | JMSException exception) { + BError bError = createError(JMS_ERROR, + String.format("Error occurred while sending a message to the JMS provider: %s", + exception.getMessage()), exception); + balFuture.complete(bError); + } + }); + return Util.getResult(balFuture); }); - return null; } /** @@ -113,23 +116,25 @@ public static Object sendTo(Environment env, BObject producer, BObject session, Message message) { MessageProducer nativeProducer = (MessageProducer) producer.getNativeData(NATIVE_PRODUCER); Session nativeSession = (Session) session.getNativeData(NATIVE_SESSION); - Future balFuture = env.markAsync(); - executorService.execute(() -> { - try { - Destination jmsDestination = getDestination(nativeSession, destination); - nativeProducer.send(jmsDestination, message); - balFuture.complete(null); - } catch (BallerinaJmsException exception) { - BError bError = createError(JMS_ERROR, exception.getMessage(), exception); - balFuture.complete(bError); - } catch (UnsupportedOperationException | JMSException exception) { - BError bError = createError(JMS_ERROR, - String.format("Error occurred while sending a message to the JMS provider: %s", - exception.getMessage()), exception); - balFuture.complete(bError); - } + return env.yieldAndRun(() -> { + CompletableFuture balFuture = new CompletableFuture<>(); + executorService.execute(() -> { + try { + Destination jmsDestination = getDestination(nativeSession, destination); + nativeProducer.send(jmsDestination, message); + balFuture.complete(null); + } catch (BallerinaJmsException exception) { + BError bError = createError(JMS_ERROR, exception.getMessage(), exception); + balFuture.complete(bError); + } catch (UnsupportedOperationException | JMSException exception) { + BError bError = createError(JMS_ERROR, + String.format("Error occurred while sending a message to the JMS provider: %s", + exception.getMessage()), exception); + balFuture.complete(bError); + } + }); + return Util.getResult(balFuture); }); - return null; } /** From 37f710ba66f7639159f2a5eacf0703a230a79f53 Mon Sep 17 00:00:00 2001 From: ravinperera00 Date: Mon, 4 Nov 2024 13:37:11 +0530 Subject: [PATCH 2/2] Add suggestions from code review --- .../stdlib/java.jms/consumer/Actions.java | 100 ++++++++---------- .../java.jms/listener/ListenerImpl.java | 38 +++---- .../stdlib/java.jms/producer/Actions.java | 64 +++++------ 3 files changed, 94 insertions(+), 108 deletions(-) diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/consumer/Actions.java b/native/src/main/java/io/ballerina/stdlib/java.jms/consumer/Actions.java index fd8e4058..08aa90f3 100644 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/consumer/Actions.java +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/consumer/Actions.java @@ -30,8 +30,6 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import javax.jms.Destination; import javax.jms.JMSException; @@ -54,8 +52,6 @@ * Represents {@link javax.jms.MessageConsumer} related utility functions. */ public class Actions { - private static final ExecutorService executorService = Executors.newCachedThreadPool(new ConsumerThreadFactory()); - private static final BString CONSUMER_TYPE = StringUtils.fromString("type"); private static final BString MESSAGE_SELECTOR = StringUtils.fromString("messageSelector"); private static final BString NO_LOCAL = StringUtils.fromString("noLocal"); @@ -129,33 +125,31 @@ private static MessageConsumer createConsumer(Session session, BMap { - CompletableFuture balFuture = new CompletableFuture<>(); - executorService.execute(() -> { - try { - Message message = nativeConsumer.receive(timeout); - if (Objects.isNull(message)) { - balFuture.complete(null); - } else { - BMap ballerinaMsg = getBallerinaMessage(message); - balFuture.complete(ballerinaMsg); - } - } catch (JMSException exception) { - BError bError = createError(JMS_ERROR, - String.format("Error occurred while receiving messages: %s", exception.getMessage()), - exception); - balFuture.complete(bError); - } catch (BallerinaJmsException exception) { - balFuture.complete(createError(JMS_ERROR, exception.getMessage(), exception)); - } catch (Exception exception) { - BError bError = createError(JMS_ERROR, - String.format("Unknown error occurred while processing the received messages: %s", - exception.getMessage()), exception); - balFuture.complete(bError); + CompletableFuture balFuture = new CompletableFuture<>(); + Thread.startVirtualThread(() -> { + try { + Message message = nativeConsumer.receive(timeout); + if (Objects.isNull(message)) { + balFuture.complete(null); + } else { + BMap ballerinaMsg = getBallerinaMessage(message); + balFuture.complete(ballerinaMsg); } - }); - return Util.getResult(balFuture); + } catch (JMSException exception) { + BError bError = createError(JMS_ERROR, + String.format("Error occurred while receiving messages: %s", exception.getMessage()), + exception); + balFuture.complete(bError); + } catch (BallerinaJmsException exception) { + balFuture.complete(createError(JMS_ERROR, exception.getMessage(), exception)); + } catch (Exception exception) { + BError bError = createError(JMS_ERROR, + String.format("Unknown error occurred while processing the received messages: %s", + exception.getMessage()), exception); + balFuture.complete(bError); + } }); + return Util.getResult(balFuture); } /** @@ -168,33 +162,31 @@ public static Object receive(Environment env, BObject consumer, long timeout) { */ public static Object receiveNoWait(Environment env, BObject consumer) { MessageConsumer nativeConsumer = (MessageConsumer) consumer.getNativeData(NATIVE_CONSUMER); - return env.yieldAndRun(() -> { - CompletableFuture balFuture = new CompletableFuture<>(); - executorService.execute(() -> { - try { - Message message = nativeConsumer.receiveNoWait(); - if (Objects.isNull(message)) { - balFuture.complete(null); - } else { - BMap ballerinaMsg = getBallerinaMessage(message); - balFuture.complete(ballerinaMsg); - } - } catch (JMSException exception) { - BError bError = createError(JMS_ERROR, - String.format("Error occurred while receiving messages: %s", exception.getMessage()), - exception); - balFuture.complete(bError); - } catch (BallerinaJmsException exception) { - balFuture.complete(createError(JMS_ERROR, exception.getMessage(), exception)); - } catch (Exception exception) { - BError bError = createError(JMS_ERROR, - String.format("Unknown error occurred while processing the received messages: %s", - exception.getMessage()), exception); - balFuture.complete(bError); + CompletableFuture balFuture = new CompletableFuture<>(); + Thread.startVirtualThread(() -> { + try { + Message message = nativeConsumer.receiveNoWait(); + if (Objects.isNull(message)) { + balFuture.complete(null); + } else { + BMap ballerinaMsg = getBallerinaMessage(message); + balFuture.complete(ballerinaMsg); } - }); - return Util.getResult(balFuture); + } catch (JMSException exception) { + BError bError = createError(JMS_ERROR, + String.format("Error occurred while receiving messages: %s", exception.getMessage()), + exception); + balFuture.complete(bError); + } catch (BallerinaJmsException exception) { + balFuture.complete(createError(JMS_ERROR, exception.getMessage(), exception)); + } catch (Exception exception) { + BError bError = createError(JMS_ERROR, + String.format("Unknown error occurred while processing the received messages: %s", + exception.getMessage()), exception); + balFuture.complete(bError); + } }); + return Util.getResult(balFuture); } /** diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/listener/ListenerImpl.java b/native/src/main/java/io/ballerina/stdlib/java.jms/listener/ListenerImpl.java index 15bf1601..2c6b4bf6 100644 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/listener/ListenerImpl.java +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/listener/ListenerImpl.java @@ -64,26 +64,28 @@ public ListenerImpl(BObject consumerService, Runtime ballerinaRuntime) { @Override public void onMessage(Message message) { - try { - Module module = ModuleUtils.getModule(); - StrandMetadata metadata = new StrandMetadata( - module.getOrg(), module.getName(), module.getVersion(), SERVICE_RESOURCE_ON_MESSAGE); - ObjectType serviceType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(consumerService)); - Object[] params = methodParameters(serviceType, message); - Object result; - if (serviceType.isIsolated() && serviceType.isIsolated(SERVICE_RESOURCE_ON_MESSAGE)) { - result = ballerinaRuntime.startIsolatedWorker( + Thread.startVirtualThread(() -> { + try { + Module module = ModuleUtils.getModule(); + StrandMetadata metadata = new StrandMetadata( + module.getOrg(), module.getName(), module.getVersion(), SERVICE_RESOURCE_ON_MESSAGE); + ObjectType serviceType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(consumerService)); + Object[] params = methodParameters(serviceType, message); + Object result; + if (serviceType.isIsolated() && serviceType.isIsolated(SERVICE_RESOURCE_ON_MESSAGE)) { + result = ballerinaRuntime.startIsolatedWorker( consumerService, SERVICE_RESOURCE_ON_MESSAGE, null, metadata, null, params).get(); - } else { - result = ballerinaRuntime.startNonIsolatedWorker( - consumerService, SERVICE_RESOURCE_ON_MESSAGE, null, metadata, null, params); + } else { + result = ballerinaRuntime.startNonIsolatedWorker( + consumerService, SERVICE_RESOURCE_ON_MESSAGE, null, metadata, null, params); + } + Util.notifySuccess(result); + } catch (JMSException | BallerinaJmsException e) { + LOGGER.error("Unexpected error occurred while async message processing", e); + } catch (BError bError) { + Util.notifyFailure(bError); } - Util.notifySuccess(result); - } catch (JMSException | BallerinaJmsException e) { - LOGGER.error("Unexpected error occurred while async message processing", e); - } catch (BError bError) { - Util.notifyFailure(bError); - } + }); } private Object[] methodParameters(ObjectType serviceType, Message message) diff --git a/native/src/main/java/io/ballerina/stdlib/java.jms/producer/Actions.java b/native/src/main/java/io/ballerina/stdlib/java.jms/producer/Actions.java index 52539c6e..93cd5974 100644 --- a/native/src/main/java/io/ballerina/stdlib/java.jms/producer/Actions.java +++ b/native/src/main/java/io/ballerina/stdlib/java.jms/producer/Actions.java @@ -27,8 +27,6 @@ import io.ballerina.stdlib.java.jms.Util; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import javax.jms.Destination; import javax.jms.JMSException; @@ -47,8 +45,6 @@ * Representation of {@link javax.jms.MessageProducer} with utility methods to invoke as inter-op functions. */ public class Actions { - private static final ExecutorService executorService = Executors.newCachedThreadPool(new ProducerThreadFactory()); - /** * Creates a {@link javax.jms.MessageProducer} object with given {@link javax.jms.Session}. * @@ -84,21 +80,19 @@ public static Object init(BObject producer, BObject session, Object destination) */ public static Object send(Environment env, BObject producer, Message message) { MessageProducer nativeProducer = (MessageProducer) producer.getNativeData(NATIVE_PRODUCER); - return env.yieldAndRun(() -> { - CompletableFuture balFuture = new CompletableFuture<>(); - executorService.execute(() -> { - try { - nativeProducer.send(message); - balFuture.complete(null); - } catch (UnsupportedOperationException | JMSException exception) { - BError bError = createError(JMS_ERROR, - String.format("Error occurred while sending a message to the JMS provider: %s", - exception.getMessage()), exception); - balFuture.complete(bError); - } - }); - return Util.getResult(balFuture); + CompletableFuture balFuture = new CompletableFuture<>(); + Thread.startVirtualThread(() -> { + try { + nativeProducer.send(message); + balFuture.complete(null); + } catch (UnsupportedOperationException | JMSException exception) { + BError bError = createError(JMS_ERROR, + String.format("Error occurred while sending a message to the JMS provider: %s", + exception.getMessage()), exception); + balFuture.complete(bError); + } }); + return Util.getResult(balFuture); } /** @@ -116,25 +110,23 @@ public static Object sendTo(Environment env, BObject producer, BObject session, Message message) { MessageProducer nativeProducer = (MessageProducer) producer.getNativeData(NATIVE_PRODUCER); Session nativeSession = (Session) session.getNativeData(NATIVE_SESSION); - return env.yieldAndRun(() -> { - CompletableFuture balFuture = new CompletableFuture<>(); - executorService.execute(() -> { - try { - Destination jmsDestination = getDestination(nativeSession, destination); - nativeProducer.send(jmsDestination, message); - balFuture.complete(null); - } catch (BallerinaJmsException exception) { - BError bError = createError(JMS_ERROR, exception.getMessage(), exception); - balFuture.complete(bError); - } catch (UnsupportedOperationException | JMSException exception) { - BError bError = createError(JMS_ERROR, - String.format("Error occurred while sending a message to the JMS provider: %s", - exception.getMessage()), exception); - balFuture.complete(bError); - } - }); - return Util.getResult(balFuture); + CompletableFuture balFuture = new CompletableFuture<>(); + Thread.startVirtualThread(() -> { + try { + Destination jmsDestination = getDestination(nativeSession, destination); + nativeProducer.send(jmsDestination, message); + balFuture.complete(null); + } catch (BallerinaJmsException exception) { + BError bError = createError(JMS_ERROR, exception.getMessage(), exception); + balFuture.complete(bError); + } catch (UnsupportedOperationException | JMSException exception) { + BError bError = createError(JMS_ERROR, + String.format("Error occurred while sending a message to the JMS provider: %s", + exception.getMessage()), exception); + balFuture.complete(bError); + } }); + return Util.getResult(balFuture); } /**