Skip to content

Commit

Permalink
Remove executor service
Browse files Browse the repository at this point in the history
  • Loading branch information
HindujaB committed Nov 19, 2024
1 parent 35a8b6d commit 4a534ae
Showing 1 changed file with 18 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.ballerina.runtime.api.values.BTypedesc;
import io.ballerina.stdlib.mqtt.utils.MqttConstants;
import io.ballerina.stdlib.mqtt.utils.MqttUtils;
import io.ballerina.stdlib.mqtt.utils.Util;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
Expand All @@ -41,7 +40,6 @@

import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -109,25 +107,19 @@ public static Object externPublish(Environment env, BObject clientObject, BStrin
MqttClient publisher = (MqttClient) clientObject.getNativeData(MqttConstants.MQTT_CLIENT);
MqttMessage mqttMessage = generateMqttMessage(message);
return env.yieldAndRun(() -> {
CompletableFuture<Object> future = new CompletableFuture<>();
try {
publisher.publish(topic.getValue(), mqttMessage);
LinkedBlockingQueue deliveryTokenQueue = (LinkedBlockingQueue) clientObject
.getNativeData(DELIVERY_TOKEN_QUEUE);
publishExecutorService.execute(() -> {
try {
future.complete(deliveryTokenQueue.take());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
future.complete(MqttUtils.createMqttError(e));
}
});
} catch (MqttException e) {
publisher.publish(topic.getValue(), mqttMessage);
LinkedBlockingQueue deliveryTokenQueue = (LinkedBlockingQueue) clientObject
.getNativeData(DELIVERY_TOKEN_QUEUE);
return deliveryTokenQueue.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return MqttUtils.createMqttError(e);
}
return Util.getResult(future);
});
}
} catch(MqttException e){
return MqttUtils.createMqttError(e);
}
});
}

public static Object externReceive(BObject clientObject, BTypedesc bTypedesc) {
LinkedBlockingQueue blockingQueue = (LinkedBlockingQueue) clientObject.getNativeData(RESPONSE_QUEUE);
Expand Down Expand Up @@ -182,19 +174,14 @@ public static Object externReconnect(BObject clientObject) {

public static Object nextResult(Environment env, BObject streamIterator) {
BlockingQueue<?> messageQueue = (BlockingQueue<?>) streamIterator.getNativeData(RESPONSE_QUEUE);
ExecutorService executor = (ExecutorService) streamIterator.getNativeData(RESPONSE_EXECUTOR_SERVICE);
return env.yieldAndRun(() -> {
CompletableFuture<Object> future = new CompletableFuture<>();
executor.execute(() -> {
try {
BMap message = (BMap) messageQueue.take();
future.complete(message);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
future.complete(MqttUtils.createMqttError(e));
}
});
return Util.getResult(future);
try {
BMap message = (BMap) messageQueue.take();
return message;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return MqttUtils.createMqttError(e);
}
});
}

Expand Down

0 comments on commit 4a534ae

Please sign in to comment.