diff --git a/.github/workflows/build-with-ballerina-lang.yml b/.github/workflows/build-with-ballerina-lang.yml index a2d18d67..eb020988 100644 --- a/.github/workflows/build-with-ballerina-lang.yml +++ b/.github/workflows/build-with-ballerina-lang.yml @@ -30,11 +30,11 @@ jobs: repository: 'ballerina-platform/ballerina-lang' ref: ${{ inputs.lang_tag || 'master' }} - - name: Set up JDK 17 + - name: Set up JDK 21 uses: actions/setup-java@v3 with: distribution: 'temurin' - java-version: 17.0.7 + java-version: 21.0.3 - name: Set Ballerina Lang version run: | @@ -77,11 +77,11 @@ jobs: repository: 'ballerina-platform/ballerina-lang' ref: ${{ inputs.lang_tag || 'master' }} - - name: Set up JDK 17 + - name: Set up JDK 21 uses: actions/setup-java@v3 with: distribution: 'temurin' - java-version: 17.0.7 + java-version: 21.0.3 - name: Set Ballerina Lang version run: | diff --git a/.github/workflows/daily-build.yml b/.github/workflows/daily-build.yml index 66b7fa7e..1e1bb745 100644 --- a/.github/workflows/daily-build.yml +++ b/.github/workflows/daily-build.yml @@ -12,11 +12,11 @@ jobs: steps: - uses: actions/checkout@v3 - - name: Set up JDK 17 + - name: Set up JDK 21 uses: actions/setup-java@v3 with: distribution: 'temurin' - java-version: 17.0.7 + java-version: 21.0.3 - name: Set environment variable if: github.event.action == 'check_connector_for_breaking_changes' diff --git a/README.md b/README.md index f704fdd1..c56db00a 100644 --- a/README.md +++ b/README.md @@ -131,7 +131,7 @@ This repository only contains the source code for the library. ### Set up the prerequisites -* Download and install Java SE Development Kit (JDK) version 17 (from one of the following locations). +* Download and install Java SE Development Kit (JDK) version 21 (from one of the following locations). * [Oracle](https://www.oracle.com/java/technologies/downloads/) diff --git a/ballerina/Ballerina.toml b/ballerina/Ballerina.toml index d57a8c10..1a7291a5 100644 --- a/ballerina/Ballerina.toml +++ b/ballerina/Ballerina.toml @@ -1,40 +1,40 @@ [package] org = "ballerinax" name = "kafka" -version = "4.2.0" +version = "4.3.0" authors = ["Ballerina"] keywords = ["kafka", "event streaming", "network", "messaging"] repository = "https://github.com/ballerina-platform/module-ballerinax-kafka" icon = "icon.png" license = ["Apache-2.0"] -distribution = "2201.10.0" +distribution = "2201.10.0-20241025-103700-5c9e6a27" -[platform.java17] +[platform.java21] graalvmCompatible = true -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "io.ballerina.stdlib" artifactId = "kafka-native" -version = "4.2.0" -path = "../native/build/libs/kafka-native-4.2.0.jar" +version = "4.3.0" +path = "../native/build/libs/kafka-native-4.3.0-SNAPSHOT.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "org.apache.kafka" artifactId = "kafka-clients" version = "3.5.1" path = "./lib/kafka-clients-3.5.1.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "org.apache.kafka" artifactId = "kafka_2.12" version = "3.5.1" path = "./lib/kafka_2.12-3.5.1.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "io.ballerina.stdlib" artifactId = "constraint-native" -version = "1.5.0" -path = "./lib/constraint-native-1.5.0.jar" +version = "1.6.0" +path = "./lib/constraint-native-1.6.0-20241113-090900-d276ad5.jar" [build-options] observabilityIncluded=true diff --git a/ballerina/CompilerPlugin.toml b/ballerina/CompilerPlugin.toml index 250ecf94..3c5e476a 100644 --- a/ballerina/CompilerPlugin.toml +++ b/ballerina/CompilerPlugin.toml @@ -3,4 +3,4 @@ id = "kafka-compiler-plugin" class = "io.ballerina.stdlib.kafka.plugin.KafkaCompilerPlugin" [[dependency]] -path = "../compiler-plugin/build/libs/kafka-compiler-plugin-4.2.0.jar" +path = "../compiler-plugin/build/libs/kafka-compiler-plugin-4.3.0-SNAPSHOT.jar" diff --git a/ballerina/Dependencies.toml b/ballerina/Dependencies.toml index 6fe4c66f..f7987bbb 100644 --- a/ballerina/Dependencies.toml +++ b/ballerina/Dependencies.toml @@ -5,12 +5,12 @@ [ballerina] dependencies-toml-version = "2" -distribution-version = "2201.10.0" +distribution-version = "2201.11.0-20241112-214900-6b80ab87" [[package]] org = "ballerina" name = "auth" -version = "2.12.0" +version = "2.12.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "crypto"}, @@ -23,7 +23,7 @@ dependencies = [ [[package]] org = "ballerina" name = "cache" -version = "3.8.0" +version = "3.8.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "constraint"}, @@ -35,7 +35,7 @@ dependencies = [ [[package]] org = "ballerina" name = "constraint" -version = "1.5.0" +version = "1.6.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "jballerina.java"} @@ -47,7 +47,7 @@ modules = [ [[package]] org = "ballerina" name = "crypto" -version = "2.7.2" +version = "2.7.3" dependencies = [ {org = "ballerina", name = "jballerina.java"}, {org = "ballerina", name = "time"} @@ -56,10 +56,20 @@ modules = [ {org = "ballerina", packageName = "crypto", moduleName = "crypto"} ] +[[package]] +org = "ballerina" +name = "data.jsondata" +version = "0.3.0" +scope = "testOnly" +dependencies = [ + {org = "ballerina", name = "jballerina.java"}, + {org = "ballerina", name = "lang.object"} +] + [[package]] org = "ballerina" name = "file" -version = "1.10.0" +version = "1.10.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "io"}, @@ -71,13 +81,14 @@ dependencies = [ [[package]] org = "ballerina" name = "http" -version = "2.12.0" +version = "2.13.0" scope = "testOnly" dependencies = [ {org = "ballerina", name = "auth"}, {org = "ballerina", name = "cache"}, {org = "ballerina", name = "constraint"}, {org = "ballerina", name = "crypto"}, + {org = "ballerina", name = "data.jsondata"}, {org = "ballerina", name = "file"}, {org = "ballerina", name = "io"}, {org = "ballerina", name = "jballerina.java"}, @@ -100,7 +111,7 @@ dependencies = [ [[package]] org = "ballerina" name = "io" -version = "1.6.1" +version = "1.6.2" scope = "testOnly" dependencies = [ {org = "ballerina", name = "jballerina.java"}, @@ -121,7 +132,7 @@ modules = [ [[package]] org = "ballerina" name = "jwt" -version = "2.13.0" +version = "2.13.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "cache"}, @@ -244,7 +255,7 @@ modules = [ [[package]] org = "ballerina" name = "log" -version = "2.10.0" +version = "2.10.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "io"}, @@ -259,7 +270,7 @@ modules = [ [[package]] org = "ballerina" name = "mime" -version = "2.10.0" +version = "2.10.2" scope = "testOnly" dependencies = [ {org = "ballerina", name = "io"}, @@ -271,7 +282,7 @@ dependencies = [ [[package]] org = "ballerina" name = "oauth2" -version = "2.12.0" +version = "2.12.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "cache"}, @@ -285,7 +296,7 @@ dependencies = [ [[package]] org = "ballerina" name = "observe" -version = "1.3.0" +version = "1.4.0" dependencies = [ {org = "ballerina", name = "jballerina.java"} ] @@ -293,7 +304,7 @@ dependencies = [ [[package]] org = "ballerina" name = "os" -version = "1.8.0" +version = "1.8.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "io"}, @@ -303,7 +314,7 @@ dependencies = [ [[package]] org = "ballerina" name = "task" -version = "2.5.0" +version = "2.5.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "jballerina.java"}, @@ -327,7 +338,7 @@ modules = [ [[package]] org = "ballerina" name = "time" -version = "2.4.0" +version = "2.6.0" dependencies = [ {org = "ballerina", name = "jballerina.java"} ] @@ -335,7 +346,7 @@ dependencies = [ [[package]] org = "ballerina" name = "url" -version = "2.4.0" +version = "2.4.1" scope = "testOnly" dependencies = [ {org = "ballerina", name = "jballerina.java"} @@ -344,7 +355,7 @@ dependencies = [ [[package]] org = "ballerina" name = "uuid" -version = "1.8.0" +version = "1.8.1" dependencies = [ {org = "ballerina", name = "crypto"}, {org = "ballerina", name = "jballerina.java"}, @@ -392,7 +403,7 @@ modules = [ [[package]] org = "ballerinax" name = "kafka" -version = "4.2.0" +version = "4.3.0" dependencies = [ {org = "ballerina", name = "constraint"}, {org = "ballerina", name = "crypto"}, diff --git a/ballerina/tests/listener_client_tests.bal b/ballerina/tests/listener_client_tests.bal index 3e025d22..21ed9649 100644 --- a/ballerina/tests/listener_client_tests.bal +++ b/ballerina/tests/listener_client_tests.bal @@ -271,7 +271,7 @@ function listenerConfigErrorTest() returns error? { } @test:Config { - enable: true, + enable: false, dependsOn: [consumerServiceCommitTest] } function consumerServiceCommitOffsetTest() returns error? { @@ -309,7 +309,7 @@ function consumerServiceCommitOffsetTest() returns error? { check serviceConsumer.gracefulStop(); } -@test:Config {enable: true} +@test:Config {enable: false} function consumerServiceCommitTest() returns error? { string topic = "listener-commit-test-topic"; kafkaTopics.push(topic); diff --git a/ballerina/tests/producer_client_tests.bal b/ballerina/tests/producer_client_tests.bal index 049999da..932748e9 100644 --- a/ballerina/tests/producer_client_tests.bal +++ b/ballerina/tests/producer_client_tests.bal @@ -180,7 +180,7 @@ function producerGetTopicPartitionsErrorTest() returns error? { check producer->close(); } -@test:Config {enable: true} +@test:Config {enable: false} function transactionalProducerTest() returns error? { string topic = "transactional-producer-test-topic"; kafkaTopics.push(topic); @@ -217,7 +217,7 @@ function transactionalProducerTest() returns error? { check consumer->close(); } -@test:Config {enable: true} +@test:Config {enable: false} function transactionalProducerWithAbortTest() returns error? { string topic = "rollback-producer-test-topic"; kafkaTopics.push(topic); diff --git a/build-config/resources/Ballerina.toml b/build-config/resources/Ballerina.toml index 2e134c70..2221c229 100644 --- a/build-config/resources/Ballerina.toml +++ b/build-config/resources/Ballerina.toml @@ -7,30 +7,30 @@ keywords = ["kafka", "event streaming", "network", "messaging"] repository = "https://github.com/ballerina-platform/module-ballerinax-kafka" icon = "icon.png" license = ["Apache-2.0"] -distribution = "2201.11.0" +distribution = "2201.10.0-20241025-103700-5c9e6a27" -[platform.java17] +[platform.java21] graalvmCompatible = true -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "io.ballerina.stdlib" artifactId = "kafka-native" version = "@toml.version@" path = "../native/build/libs/kafka-native-@project.version@.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "org.apache.kafka" artifactId = "kafka-clients" version = "@kafka.version@" path = "./lib/kafka-clients-@kafka.version@.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "org.apache.kafka" artifactId = "kafka_2.12" version = "@kafka.version@" path = "./lib/kafka_2.12-@kafka.version@.jar" -[[platform.java17.dependency]] +[[platform.java21.dependency]] groupId = "io.ballerina.stdlib" artifactId = "constraint-native" version = "@constraint.version@" diff --git a/build.gradle b/build.gradle index aaadd90f..745d77da 100644 --- a/build.gradle +++ b/build.gradle @@ -16,10 +16,10 @@ */ plugins { - id "com.github.spotbugs" version "5.0.14" - id "com.github.johnrengelman.shadow" version "8.1.1" - id "de.undercouch.download" version "5.4.0" - id "net.researchgate.release" version "2.8.0" + id "com.github.spotbugs" version "${spotbugsPluginVersion}" + id "com.github.johnrengelman.shadow" version "${shadowJarPluginVersion}" + id "de.undercouch.download" version "${downloadPluginVersion}" + id "net.researchgate.release" version "${releasePluginVersion}" } allprojects { diff --git a/compiler-plugin-tests/build.gradle b/compiler-plugin-tests/build.gradle index 191319ae..de132d3c 100644 --- a/compiler-plugin-tests/build.gradle +++ b/compiler-plugin-tests/build.gradle @@ -50,8 +50,11 @@ checkstyle { checkstyleTest.dependsOn(":checkstyle:downloadCheckstyleRuleFiles") spotbugsTest { - effort "max" - reportLevel "low" + def classLoader = plugins["com.github.spotbugs"].class.classLoader + def SpotBugsConfidence = classLoader.findLoadedClass("com.github.spotbugs.snom.Confidence") + def SpotBugsEffort = classLoader.findLoadedClass("com.github.spotbugs.snom.Effort") + effort = SpotBugsEffort.MAX + reportLevel = SpotBugsConfidence.LOW reportsDir = file("$project.buildDir/reports/spotbugs") reports { html.enabled true diff --git a/compiler-plugin/build.gradle b/compiler-plugin/build.gradle index 953258dc..b29edc83 100644 --- a/compiler-plugin/build.gradle +++ b/compiler-plugin/build.gradle @@ -45,8 +45,11 @@ tasks.withType(Checkstyle) { checkstyleMain.dependsOn(":checkstyle:downloadCheckstyleRuleFiles") spotbugsMain { - effort "max" - reportLevel "low" + def classLoader = plugins["com.github.spotbugs"].class.classLoader + def SpotBugsConfidence = classLoader.findLoadedClass("com.github.spotbugs.snom.Confidence") + def SpotBugsEffort = classLoader.findLoadedClass("com.github.spotbugs.snom.Effort") + effort = SpotBugsEffort.MAX + reportLevel = SpotBugsConfidence.LOW reportsDir = file("$project.buildDir/reports/spotbugs") reports { html.enabled true diff --git a/gradle.properties b/gradle.properties index ee4bee0a..e2ae873f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -2,41 +2,45 @@ org.gradle.caching=true group=io.ballerina.stdlib version=4.3.0-SNAPSHOT kafkaVersion=3.5.1 -ballerinaLangVersion=2201.11.0-20241008-112400-81975006 +ballerinaLangVersion=2201.11.0-20241112-214900-6b80ab87 ballerinaGradlePluginVersion=2.0.1 puppycrawlCheckstyleVersion=10.12.1 -awaitalityVersion=3.1.6 +awaitalityVersion=4.2.2 slf4jVersion=1.7.30 testngVersion=7.6.1 gsonVersion=2.8.8 jacocoVersion=0.8.10 +spotbugsPluginVersion=6.0.18 +shadowJarPluginVersion=8.1.1 +downloadPluginVersion=5.4.0 +releasePluginVersion=2.8.0 -stdlibIoVersion=1.6.1 -stdlibTimeVersion=2.4.0 -stdlibUrlVersion=2.4.0 +stdlibIoVersion=1.6.2-20241112-233100-995cf5f +stdlibTimeVersion=2.6.0-20241113-073800-201b904 +stdlibUrlVersion=2.4.1-20241113-073900-335ff51 -stdlibConstraintVersion=1.5.0 -stdlibCryptoVersion=2.7.2 -stdlibLogVersion=2.10.0 -stdlibOsVersion=1.8.0 -stdlibRandomVersion=1.5.0 -stdlibTaskVersion=2.5.0 +stdlibConstraintVersion=1.6.0-20241113-090900-d276ad5 +stdlibCryptoVersion=2.7.3-20241113-081400-d015a39 +stdlibLogVersion=2.10.1-20241113-120000-4577868 +stdlibOsVersion=1.8.1-20241113-122000-cca973b +stdlibRandomVersion=1.5.1-20241113-122300-1bc770e +stdlibTaskVersion=2.5.1-20241113-123500-f905281 -stdlibCacheVersion=3.8.0 -stdlibFileVersion=1.10.0 -stdlibMimeVersion=2.10.0 -stdlibUuidVersion=1.8.0 +stdlibCacheVersion=3.8.1-20241113-125700-b75a1bf +stdlibFileVersion=1.10.1-20241113-151700-e1a2e38 +stdlibMimeVersion=2.10.2-20241113-154200-d953747 +stdlibUuidVersion=1.8.1-20241113-154400-443c67b -stdlibAuthVersion=2.12.0 -stdlibDataJsonDataVersion=0.3.0-20241105-101100-661d11f -stdlibJwtVersion=2.13.0 -stdlibOAuth2Version=2.12.0 +stdlibAuthVersion=2.12.1-20241113-162300-ded40eb +stdlibDataJsonDataVersion=0.3.0-20241114-143900-285d739 +stdlibJwtVersion=2.13.1-20241113-162400-b59ccfa +stdlibOAuth2Version=2.12.1-20241113-162400-4c6ddfe -stdlibHttpVersion=2.13.0-20241106-120000-d375c3b +stdlibHttpVersion=2.13.0-20241114-182900-7e9f66a -stdlibTransactionVersion=1.10.0 +stdlibTransactionVersion=1.10.1-20241116-112500-189a4e5 # Ballerinax Observer -observeVersion=1.3.0 -observeInternalVersion=1.3.0 +observeVersion=1.4.0-20241113-092000-b83ae74 +observeInternalVersion=1.3.1-20241113-101700-265054d diff --git a/native/build.gradle b/native/build.gradle index f0dcc298..66c50820 100644 --- a/native/build.gradle +++ b/native/build.gradle @@ -48,8 +48,11 @@ tasks.withType(Checkstyle) { checkstyleMain.dependsOn(":checkstyle:downloadCheckstyleRuleFiles") spotbugsMain { - effort "max" - reportLevel "low" + def classLoader = plugins["com.github.spotbugs"].class.classLoader + def SpotBugsConfidence = classLoader.findLoadedClass("com.github.spotbugs.snom.Confidence") + def SpotBugsEffort = classLoader.findLoadedClass("com.github.spotbugs.snom.Effort") + effort = SpotBugsEffort.MAX + reportLevel = SpotBugsConfidence.LOW reportsDir = file("$project.buildDir/reports/spotbugs") reports { html.enabled true diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaListenerImpl.java b/native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaListenerImpl.java index d67b028d..540bb7bb 100644 --- a/native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaListenerImpl.java +++ b/native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaListenerImpl.java @@ -18,10 +18,8 @@ package io.ballerina.stdlib.kafka.impl; -import io.ballerina.runtime.api.PredefinedTypes; -import io.ballerina.runtime.api.Runtime; -import io.ballerina.runtime.api.async.Callback; -import io.ballerina.runtime.api.async.StrandMetadata; +import io.ballerina.runtime.api.Environment; +import io.ballerina.runtime.api.concurrent.StrandMetadata; import io.ballerina.runtime.api.creators.ValueCreator; import io.ballerina.runtime.api.types.ArrayType; import io.ballerina.runtime.api.types.IntersectionType; @@ -56,9 +54,9 @@ import java.util.concurrent.Semaphore; import java.util.stream.Stream; -import static io.ballerina.runtime.api.TypeTags.ARRAY_TAG; -import static io.ballerina.runtime.api.TypeTags.INTERSECTION_TAG; -import static io.ballerina.runtime.api.TypeTags.OBJECT_TYPE_TAG; +import static io.ballerina.runtime.api.types.TypeTags.ARRAY_TAG; +import static io.ballerina.runtime.api.types.TypeTags.INTERSECTION_TAG; +import static io.ballerina.runtime.api.types.TypeTags.OBJECT_TYPE_TAG; import static io.ballerina.runtime.api.utils.TypeUtils.getReferredType; import static io.ballerina.stdlib.kafka.utils.KafkaConstants.CONSTRAINT_VALIDATION; import static io.ballerina.stdlib.kafka.utils.KafkaConstants.CONSUMER_CONFIG_FIELD_NAME; @@ -71,7 +69,6 @@ import static io.ballerina.stdlib.kafka.utils.KafkaConstants.PARAM_PAYLOAD_ANNOTATION_NAME; import static io.ballerina.stdlib.kafka.utils.KafkaConstants.TYPE_CHECKER_OBJECT_NAME; import static io.ballerina.stdlib.kafka.utils.KafkaUtils.createKafkaError; -import static io.ballerina.stdlib.kafka.utils.KafkaUtils.getAttachedFunctionReturnType; import static io.ballerina.stdlib.kafka.utils.KafkaUtils.getAutoCommitConfig; import static io.ballerina.stdlib.kafka.utils.KafkaUtils.getAutoSeekOnErrorConfig; import static io.ballerina.stdlib.kafka.utils.KafkaUtils.getConsumerRecords; @@ -84,10 +81,10 @@ public class KafkaListenerImpl implements KafkaListener { private final BObject service; private final BObject listener; - private final Runtime bRuntime; + private final Environment env; - public KafkaListenerImpl(BObject listener, BObject service, Runtime bRuntime) { - this.bRuntime = bRuntime; + public KafkaListenerImpl(BObject listener, BObject service, Environment env) { + this.env = env; this.listener = listener; this.service = service; } @@ -118,51 +115,51 @@ public void onError(Throwable t) { } private void executeResource(BObject listener, KafkaPollCycleFutureListener consumer, ConsumerRecords records) { - StrandMetadata metadata = getStrandMetadata(KAFKA_RESOURCE_ON_RECORD); - Map properties = null; - Type returnType = null; KafkaConsumer kafkaConsumer = (KafkaConsumer) listener.getNativeData(NATIVE_CONSUMER); - if (ObserveUtils.isTracingEnabled()) { - properties = getNewObserverContextInProperties(listener); - returnType = getAttachedFunctionReturnType(service, KAFKA_RESOURCE_ON_RECORD); - } ObjectType serviceType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(service)); - if (serviceType.isIsolated() && serviceType.isIsolated(KAFKA_RESOURCE_ON_RECORD)) { - bRuntime.invokeMethodAsyncConcurrently(service, KAFKA_RESOURCE_ON_RECORD, null, metadata, - consumer, properties, returnType == null ? PredefinedTypes.TYPE_NULL : returnType, - getResourceParameters(service, this.listener, records, kafkaConsumer)); - } else { - bRuntime.invokeMethodAsyncSequentially(service, KAFKA_RESOURCE_ON_RECORD, null, metadata, - consumer, properties, returnType == null ? PredefinedTypes.TYPE_NULL : returnType, - getResourceParameters(service, this.listener, records, kafkaConsumer)); - } + Thread.startVirtualThread(() -> { + Map properties = null; + if (ObserveUtils.isTracingEnabled()) { + properties = getNewObserverContextInProperties(listener); + } + try { + boolean isIsolated = serviceType.isIsolated() && serviceType.isIsolated(KAFKA_RESOURCE_ON_RECORD); + StrandMetadata metadata = new StrandMetadata(isIsolated, properties); + Object result = env.getRuntime().callMethod(service, KAFKA_RESOURCE_ON_RECORD, metadata, + getResourceParameters(service, this.listener, records, kafkaConsumer)); + consumer.notifySuccess(result); + } catch (BError bError) { + consumer.notifyFailure(bError); + onError(bError); + } + }); } private void executeOnError(MethodType onErrorMethod, Throwable throwable) { - StrandMetadata metadata = getStrandMetadata(KAFKA_RESOURCE_ON_ERROR); - Map properties = null; - if (ObserveUtils.isTracingEnabled()) { - properties = getNewObserverContextInProperties(listener); - } - Object[] arguments = new Object[onErrorMethod.getParameters().length * 2]; + Object[] arguments = new Object[onErrorMethod.getParameters().length]; if (throwable instanceof BError) { arguments[0] = throwable; } else { - arguments[0] = KafkaUtils.createKafkaError(throwable.getMessage()); + arguments[0] = createKafkaError(throwable.getMessage()); } - arguments[1] = true; - if (arguments.length == 4) { - arguments[2] = createCaller(this.listener); - arguments[3] = true; + if (arguments.length == 2) { + arguments[1] = createCaller(this.listener); } ObjectType serviceType = (ObjectType) TypeUtils.getReferredType(TypeUtils.getType(service)); - if (serviceType.isIsolated() && serviceType.isIsolated(KAFKA_RESOURCE_ON_ERROR)) { - bRuntime.invokeMethodAsyncConcurrently(service, KAFKA_RESOURCE_ON_ERROR, null, metadata, - new KafkaOnErrorCallback(), properties, PredefinedTypes.TYPE_NULL, arguments); - } else { - bRuntime.invokeMethodAsyncSequentially(service, KAFKA_RESOURCE_ON_ERROR, null, metadata, - new KafkaOnErrorCallback(), properties, PredefinedTypes.TYPE_NULL, arguments); - } + Thread.startVirtualThread(() -> { + Map properties = null; + if (ObserveUtils.isTracingEnabled()) { + properties = getNewObserverContextInProperties(listener); + } + try { + boolean isIsolated = serviceType.isIsolated() && serviceType.isIsolated(KAFKA_RESOURCE_ON_ERROR); + StrandMetadata metadata = new StrandMetadata(isIsolated, properties); + Object result = env.getRuntime().callMethod(service, KAFKA_RESOURCE_ON_ERROR, metadata, arguments); + (new KafkaOnErrorCallback()).notifySuccess(result); + } catch (BError bError) { + (new KafkaOnErrorCallback()).notifyFailure(bError); + } + }); } private Map getNewObserverContextInProperties(BObject listener) { @@ -181,18 +178,17 @@ public Object[] getResourceParameters(BObject service, BObject listener, Consume boolean callerExists = false; boolean consumerRecordsExists = false; boolean payloadExists = false; - Object[] arguments = new Object[parameters.length * 2]; + Object[] arguments = new Object[parameters.length]; int index = 0; for (Parameter parameter : parameters) { Type referredType = getReferredType(parameter.type); switch (referredType.getTag()) { case OBJECT_TYPE_TAG: if (callerExists) { - throw KafkaUtils.createKafkaError("Invalid remote function signature"); + throw createKafkaError("Invalid remote function signature"); } callerExists = true; arguments[index++] = createCaller(listener); - arguments[index++] = true; break; case INTERSECTION_TAG: case ARRAY_TAG: @@ -202,27 +198,25 @@ public Object[] getResourceParameters(BObject service, BObject listener, Consume boolean autoSeek = getAutoSeekOnErrorConfig(listener); if (isConsumerRecordsType(parameter, consumerRecordMethodType.getAnnotations())) { if (consumerRecordsExists) { - throw KafkaUtils.createKafkaError("Invalid remote function signature"); + throw createKafkaError("Invalid remote function signature"); } consumerRecordsExists = true; BArray consumerRecords = getConsumerRecords(records, (RecordType) getIntendedType(referredType), referredType.isReadOnly(), constraintValidation, autoCommit, kafkaConsumer, autoSeek); arguments[index++] = consumerRecords; - arguments[index++] = true; } else { if (payloadExists) { - throw KafkaUtils.createKafkaError("Invalid remote function signature"); + throw createKafkaError("Invalid remote function signature"); } payloadExists = true; BArray payload = getValuesWithIntendedType(referredType, kafkaConsumer, records, constraintValidation, autoCommit, autoSeek); arguments[index++] = payload; - arguments[index++] = true; } break; default: - throw KafkaUtils.createKafkaError("Invalid remote function signature"); + throw createKafkaError("Invalid remote function signature"); } } return arguments; @@ -274,10 +268,16 @@ private boolean invokeIsAnydataConsumerRecordTypeMethod(Type paramType) { BObject client = ValueCreator.createObjectValue(ModuleUtils.getModule(), TYPE_CHECKER_OBJECT_NAME); Semaphore sem = new Semaphore(0); KafkaRecordTypeCheckCallback recordTypeCheckCallback = new KafkaRecordTypeCheckCallback(sem); - StrandMetadata metadata = getStrandMetadata(KAFKA_RESOURCE_IS_ANYDATA_CONSUMER_RECORD); - bRuntime.invokeMethodAsyncSequentially(client, KAFKA_RESOURCE_IS_ANYDATA_CONSUMER_RECORD, null, metadata, - recordTypeCheckCallback, null, PredefinedTypes.TYPE_BOOLEAN, - ValueCreator.createTypedescValue(paramType), true); + Thread.startVirtualThread(() -> { + try { + Object result = env.getRuntime().callMethod(client, + KAFKA_RESOURCE_IS_ANYDATA_CONSUMER_RECORD, new StrandMetadata(false, null), + ValueCreator.createTypedescValue(paramType)); + recordTypeCheckCallback.notifySuccess(result); + } catch (BError bError) { + recordTypeCheckCallback.notifyFailure(bError); + } + }); try { sem.acquire(); } catch (InterruptedException e) { @@ -286,20 +286,13 @@ private boolean invokeIsAnydataConsumerRecordTypeMethod(Type paramType) { return recordTypeCheckCallback.getIsConsumerRecordType(); } - private StrandMetadata getStrandMetadata(String parentFunctionName) { - return new StrandMetadata(ModuleUtils.getModule().getOrg(), - ModuleUtils.getModule().getName(), ModuleUtils.getModule().getMajorVersion(), parentFunctionName); - } - - static class KafkaOnErrorCallback implements Callback { - @Override + static class KafkaOnErrorCallback { public void notifySuccess(Object result) { if (result instanceof BError) { ((BError) result).printStackTrace(); } } - @Override public void notifyFailure(BError bError) { bError.printStackTrace(); System.exit(1); diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaPollCycleFutureListener.java b/native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaPollCycleFutureListener.java index 340da3da..3c536b73 100644 --- a/native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaPollCycleFutureListener.java +++ b/native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaPollCycleFutureListener.java @@ -18,7 +18,6 @@ package io.ballerina.stdlib.kafka.impl; -import io.ballerina.runtime.api.async.Callback; import io.ballerina.runtime.api.values.BError; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,7 +28,7 @@ * {@code KafkaPollCycleFutureListener} listener provides ability control poll cycle flow by notifications received from * Ballerina side. */ -public class KafkaPollCycleFutureListener implements Callback { +public class KafkaPollCycleFutureListener { private static final Logger logger = LoggerFactory.getLogger(KafkaPollCycleFutureListener.class); @@ -54,7 +53,6 @@ public KafkaPollCycleFutureListener(Semaphore sem, String serviceId) { /** * {@inheritDoc} */ - @Override public void notifySuccess(Object obj) { sem.release(); if (obj instanceof BError) { @@ -70,7 +68,6 @@ public void notifySuccess(Object obj) { /** * {@inheritDoc} */ - @Override public void notifyFailure(BError error) { sem.release(); if (logger.isDebugEnabled()) { @@ -78,7 +75,6 @@ public void notifyFailure(BError error) { ". Semaphore is released to continue next polling cycle.", error.toString()); } error.printStackTrace(); - System.exit(1); } } diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaRecordTypeCheckCallback.java b/native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaRecordTypeCheckCallback.java index 224b53d4..d635da89 100644 --- a/native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaRecordTypeCheckCallback.java +++ b/native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaRecordTypeCheckCallback.java @@ -18,15 +18,13 @@ package io.ballerina.stdlib.kafka.impl; -import io.ballerina.runtime.api.async.Callback; - import java.util.concurrent.Semaphore; /** * {@code KafkaRecordTypeCheckCallback} provides ability to check whether a given type is a subtype of * kafka:AnydataConsumerRecord. */ -public class KafkaRecordTypeCheckCallback implements Callback { +public class KafkaRecordTypeCheckCallback { private final Semaphore semaphore; private Boolean isConsumerRecordType = false; @@ -38,7 +36,6 @@ public KafkaRecordTypeCheckCallback(Semaphore semaphore) { /** * {@inheritDoc} */ - @Override public void notifySuccess(Object obj) { isConsumerRecordType = (Boolean) obj; semaphore.release(); @@ -47,7 +44,6 @@ public void notifySuccess(Object obj) { /** * {@inheritDoc} */ - @Override public void notifyFailure(io.ballerina.runtime.api.values.BError error) { semaphore.release(); error.printStackTrace(); diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/consumer/ConsumerInformationHandler.java b/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/consumer/ConsumerInformationHandler.java index ac746dd7..1ac16c2f 100644 --- a/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/consumer/ConsumerInformationHandler.java +++ b/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/consumer/ConsumerInformationHandler.java @@ -19,10 +19,10 @@ package io.ballerina.stdlib.kafka.nativeimpl.consumer; import io.ballerina.runtime.api.Environment; -import io.ballerina.runtime.api.PredefinedTypes; import io.ballerina.runtime.api.creators.TypeCreator; import io.ballerina.runtime.api.creators.ValueCreator; import io.ballerina.runtime.api.types.ArrayType; +import io.ballerina.runtime.api.types.PredefinedTypes; import io.ballerina.runtime.api.utils.StringUtils; import io.ballerina.runtime.api.values.BArray; import io.ballerina.runtime.api.values.BDecimal; diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/consumer/Poll.java b/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/consumer/Poll.java index ad463e2a..562b3f5a 100644 --- a/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/consumer/Poll.java +++ b/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/consumer/Poll.java @@ -19,7 +19,6 @@ package io.ballerina.stdlib.kafka.nativeimpl.consumer; import io.ballerina.runtime.api.Environment; -import io.ballerina.runtime.api.Future; import io.ballerina.runtime.api.creators.ValueCreator; import io.ballerina.runtime.api.types.ArrayType; import io.ballerina.runtime.api.types.IntersectionType; @@ -33,11 +32,13 @@ import io.ballerina.stdlib.kafka.observability.KafkaMetricsUtil; import io.ballerina.stdlib.kafka.observability.KafkaObservabilityConstants; import io.ballerina.stdlib.kafka.observability.KafkaTracingUtil; +import io.ballerina.stdlib.kafka.utils.ModuleUtils; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.KafkaException; import java.time.Duration; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -63,10 +64,10 @@ public class Poll { public static Object poll(Environment env, BObject consumerObject, BDecimal timeout, BTypedesc bTypedesc) { KafkaTracingUtil.traceResourceInvocation(env, consumerObject); - Future balFuture = env.markAsync(); + CompletableFuture balFuture = new CompletableFuture<>(); KafkaConsumer kafkaConsumer = (KafkaConsumer) consumerObject.getNativeData(NATIVE_CONSUMER); RecordType recordType = getRecordType(bTypedesc); - executorService.execute(()-> { + Thread.startVirtualThread(() -> { try { Duration duration = Duration.ofMillis(getMilliSeconds(timeout)); boolean constraintValidation = (boolean) consumerObject.getMapValue(CONSUMER_CONFIG_FIELD_NAME) @@ -89,14 +90,14 @@ public static Object poll(Environment env, BObject consumerObject, BDecimal time balFuture.complete(e); } }); - return null; + return ModuleUtils.getResult(balFuture); } public static Object pollPayload(Environment env, BObject consumerObject, BDecimal timeout, BTypedesc bTypedesc) { KafkaTracingUtil.traceResourceInvocation(env, consumerObject); - Future balFuture = env.markAsync(); + CompletableFuture balFuture = new CompletableFuture<>(); KafkaConsumer kafkaConsumer = (KafkaConsumer) consumerObject.getNativeData(NATIVE_CONSUMER); - executorService.execute(()-> { + Thread.startVirtualThread(() -> { try { Duration duration = Duration.ofMillis(getMilliSeconds(timeout)); ArrayType arrayType = (ArrayType) TypeUtils.getImpliedType(bTypedesc.getDescribingType()); @@ -122,7 +123,7 @@ public static Object pollPayload(Environment env, BObject consumerObject, BDecim balFuture.complete(createKafkaError("Failed to poll from the Kafka server: " + e.getMessage())); } }); - return null; + return ModuleUtils.getResult(balFuture); } private static RecordType getRecordType(BTypedesc bTypedesc) { diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/producer/Send.java b/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/producer/Send.java index 837ef2a4..62b2bc26 100644 --- a/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/producer/Send.java +++ b/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/producer/Send.java @@ -19,17 +19,18 @@ package io.ballerina.stdlib.kafka.nativeimpl.producer; import io.ballerina.runtime.api.Environment; -import io.ballerina.runtime.api.Future; import io.ballerina.runtime.api.values.BObject; import io.ballerina.runtime.transactions.TransactionResourceManager; import io.ballerina.stdlib.kafka.observability.KafkaMetricsUtil; import io.ballerina.stdlib.kafka.observability.KafkaObservabilityConstants; import io.ballerina.stdlib.kafka.observability.KafkaTracingUtil; +import io.ballerina.stdlib.kafka.utils.ModuleUtils; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.KafkaException; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -49,12 +50,12 @@ public class Send { @SuppressWarnings(UNCHECKED) protected static Object sendKafkaRecord(Environment env, ProducerRecord record, BObject producerObject) { KafkaTracingUtil.traceResourceInvocation(env, producerObject, record.topic()); - final Future balFuture = env.markAsync(); + final CompletableFuture balFuture = new CompletableFuture<>(); KafkaProducer producer = (KafkaProducer) producerObject.getNativeData(NATIVE_PRODUCER); if (TransactionResourceManager.getInstance().isInTransaction()) { handleTransactions(producerObject); } - executorService.execute(() -> { + Thread.startVirtualThread(() -> { try { producer.send(record, (metadata, e) -> { if (Objects.nonNull(e)) { @@ -71,7 +72,7 @@ protected static Object sendKafkaRecord(Environment env, ProducerRecord record, balFuture.complete(createKafkaError("Failed to send data to Kafka server: " + e.getMessage())); } }); - return null; + return ModuleUtils.getResult(balFuture); } static class KafkaThreadFactory implements ThreadFactory { diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/service/Register.java b/native/src/main/java/io/ballerina/stdlib/kafka/service/Register.java index 07c3bd0d..1543272a 100644 --- a/native/src/main/java/io/ballerina/stdlib/kafka/service/Register.java +++ b/native/src/main/java/io/ballerina/stdlib/kafka/service/Register.java @@ -19,7 +19,6 @@ package io.ballerina.stdlib.kafka.service; import io.ballerina.runtime.api.Environment; -import io.ballerina.runtime.api.Runtime; import io.ballerina.runtime.api.utils.TypeUtils; import io.ballerina.runtime.api.values.BMap; import io.ballerina.runtime.api.values.BObject; @@ -51,14 +50,13 @@ public static Object register(Environment env, BObject listener, BObject service Object bootStrapServer = listener.get(CONSUMER_BOOTSTRAP_SERVERS_CONFIG); BMap listenerConfigurations = listener.getMapValue(CONSUMER_CONFIG_FIELD_NAME); Properties configs = KafkaUtils.processKafkaConsumerConfig(bootStrapServer, listenerConfigurations); - Runtime runtime = env.getRuntime(); try { KafkaConsumer kafkaConsumer = null; if (Objects.nonNull(listener.getNativeData(NATIVE_CONSUMER))) { kafkaConsumer = (KafkaConsumer) listener.getNativeData(NATIVE_CONSUMER); } - KafkaListener kafkaListener = new KafkaListenerImpl(listener, service, runtime); + KafkaListener kafkaListener = new KafkaListenerImpl(listener, service, env); String serviceId = TypeUtils.getType(service).getQualifiedName(); KafkaServerConnector serverConnector = new KafkaServerConnectorImpl(serviceId, configs, kafkaListener, kafkaConsumer); diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java b/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java index 67e4411f..7b3460a8 100644 --- a/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java +++ b/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java @@ -18,8 +18,6 @@ package io.ballerina.stdlib.kafka.utils; -import io.ballerina.runtime.api.PredefinedTypes; -import io.ballerina.runtime.api.TypeTags; import io.ballerina.runtime.api.creators.ErrorCreator; import io.ballerina.runtime.api.creators.TypeCreator; import io.ballerina.runtime.api.creators.ValueCreator; @@ -29,8 +27,10 @@ import io.ballerina.runtime.api.types.MapType; import io.ballerina.runtime.api.types.MethodType; import io.ballerina.runtime.api.types.ObjectType; +import io.ballerina.runtime.api.types.PredefinedTypes; import io.ballerina.runtime.api.types.RecordType; import io.ballerina.runtime.api.types.Type; +import io.ballerina.runtime.api.types.TypeTags; import io.ballerina.runtime.api.types.UnionType; import io.ballerina.runtime.api.utils.JsonUtils; import io.ballerina.runtime.api.utils.StringUtils; @@ -74,13 +74,13 @@ import java.util.Objects; import java.util.Properties; -import static io.ballerina.runtime.api.TypeTags.ANYDATA_TAG; -import static io.ballerina.runtime.api.TypeTags.ARRAY_TAG; -import static io.ballerina.runtime.api.TypeTags.BYTE_TAG; -import static io.ballerina.runtime.api.TypeTags.INTERSECTION_TAG; -import static io.ballerina.runtime.api.TypeTags.STRING_TAG; -import static io.ballerina.runtime.api.TypeTags.UNION_TAG; -import static io.ballerina.runtime.api.TypeTags.XML_TAG; +import static io.ballerina.runtime.api.types.TypeTags.ANYDATA_TAG; +import static io.ballerina.runtime.api.types.TypeTags.ARRAY_TAG; +import static io.ballerina.runtime.api.types.TypeTags.BYTE_TAG; +import static io.ballerina.runtime.api.types.TypeTags.INTERSECTION_TAG; +import static io.ballerina.runtime.api.types.TypeTags.STRING_TAG; +import static io.ballerina.runtime.api.types.TypeTags.UNION_TAG; +import static io.ballerina.runtime.api.types.TypeTags.XML_TAG; import static io.ballerina.runtime.api.utils.TypeUtils.getReferredType; import static io.ballerina.stdlib.kafka.utils.KafkaConstants.ADDITIONAL_PROPERTIES_MAP_FIELD; import static io.ballerina.stdlib.kafka.utils.KafkaConstants.CONSUMER_CONFIG_FIELD_NAME; diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/utils/ModuleUtils.java b/native/src/main/java/io/ballerina/stdlib/kafka/utils/ModuleUtils.java index 8d948eeb..1ba9c254 100644 --- a/native/src/main/java/io/ballerina/stdlib/kafka/utils/ModuleUtils.java +++ b/native/src/main/java/io/ballerina/stdlib/kafka/utils/ModuleUtils.java @@ -20,8 +20,11 @@ import io.ballerina.runtime.api.Environment; import io.ballerina.runtime.api.Module; +import io.ballerina.runtime.api.creators.ErrorCreator; +import io.ballerina.runtime.api.values.BError; import java.util.Date; +import java.util.concurrent.CompletableFuture; import java.util.logging.ConsoleHandler; import java.util.logging.Level; import java.util.logging.LogManager; @@ -81,4 +84,17 @@ public synchronized String format(LogRecord lr) { LogManager.getLogManager().addLogger(apacheKafkaLogger); LogManager.getLogManager().addLogger(balKafkaLogger); } + + public static Object getResult(CompletableFuture balFuture) { + try { + return balFuture.get(); + } catch (BError error) { + throw error; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw ErrorCreator.createError(e); + } catch (Throwable throwable) { + throw ErrorCreator.createError(throwable); + } + } } diff --git a/spotbugs-exclude.xml b/spotbugs-exclude.xml index ef48f543..cacc0aad 100644 --- a/spotbugs-exclude.xml +++ b/spotbugs-exclude.xml @@ -65,4 +65,12 @@ + + + + + + + +