From 4a534ae492028377046156d8342ddc72703edbe3 Mon Sep 17 00:00:00 2001 From: hindujaB Date: Tue, 19 Nov 2024 11:52:31 +0530 Subject: [PATCH] Remove executor service --- .../stdlib/mqtt/client/ClientActions.java | 49 +++++++------------ 1 file changed, 18 insertions(+), 31 deletions(-) diff --git a/native/src/main/java/io/ballerina/stdlib/mqtt/client/ClientActions.java b/native/src/main/java/io/ballerina/stdlib/mqtt/client/ClientActions.java index 97d74f4..f2ff86d 100644 --- a/native/src/main/java/io/ballerina/stdlib/mqtt/client/ClientActions.java +++ b/native/src/main/java/io/ballerina/stdlib/mqtt/client/ClientActions.java @@ -31,7 +31,6 @@ import io.ballerina.runtime.api.values.BTypedesc; import io.ballerina.stdlib.mqtt.utils.MqttConstants; import io.ballerina.stdlib.mqtt.utils.MqttUtils; -import io.ballerina.stdlib.mqtt.utils.Util; import org.eclipse.paho.mqttv5.client.MqttClient; import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence; @@ -41,7 +40,6 @@ import java.util.ArrayList; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -109,25 +107,19 @@ public static Object externPublish(Environment env, BObject clientObject, BStrin MqttClient publisher = (MqttClient) clientObject.getNativeData(MqttConstants.MQTT_CLIENT); MqttMessage mqttMessage = generateMqttMessage(message); return env.yieldAndRun(() -> { - CompletableFuture future = new CompletableFuture<>(); try { - publisher.publish(topic.getValue(), mqttMessage); - LinkedBlockingQueue deliveryTokenQueue = (LinkedBlockingQueue) clientObject - .getNativeData(DELIVERY_TOKEN_QUEUE); - publishExecutorService.execute(() -> { - try { - future.complete(deliveryTokenQueue.take()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - future.complete(MqttUtils.createMqttError(e)); - } - }); - } catch (MqttException e) { + publisher.publish(topic.getValue(), mqttMessage); + LinkedBlockingQueue deliveryTokenQueue = (LinkedBlockingQueue) clientObject + .getNativeData(DELIVERY_TOKEN_QUEUE); + return deliveryTokenQueue.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); return MqttUtils.createMqttError(e); - } - return Util.getResult(future); - }); - } + } catch(MqttException e){ + return MqttUtils.createMqttError(e); + } + }); +} public static Object externReceive(BObject clientObject, BTypedesc bTypedesc) { LinkedBlockingQueue blockingQueue = (LinkedBlockingQueue) clientObject.getNativeData(RESPONSE_QUEUE); @@ -182,19 +174,14 @@ public static Object externReconnect(BObject clientObject) { public static Object nextResult(Environment env, BObject streamIterator) { BlockingQueue messageQueue = (BlockingQueue) streamIterator.getNativeData(RESPONSE_QUEUE); - ExecutorService executor = (ExecutorService) streamIterator.getNativeData(RESPONSE_EXECUTOR_SERVICE); return env.yieldAndRun(() -> { - CompletableFuture future = new CompletableFuture<>(); - executor.execute(() -> { - try { - BMap message = (BMap) messageQueue.take(); - future.complete(message); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - future.complete(MqttUtils.createMqttError(e)); - } - }); - return Util.getResult(future); + try { + BMap message = (BMap) messageQueue.take(); + return message; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return MqttUtils.createMqttError(e); + } }); }