diff --git a/bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/tools/DelayedBatchProcessing.java b/bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/tools/DelayedBatchProcessing.java index e85fec7180d06..d358d329402e4 100644 --- a/bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/tools/DelayedBatchProcessing.java +++ b/bundles/org.openhab.binding.mqtt.generic/src/main/java/org/openhab/binding/mqtt/generic/tools/DelayedBatchProcessing.java @@ -18,6 +18,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import org.eclipse.jdt.annotation.NonNullByDefault; @@ -37,7 +38,7 @@ public class DelayedBatchProcessing implements Consumer { private final Consumer> consumer; private final List queue = Collections.synchronizedList(new ArrayList<>()); private final ScheduledExecutorService executor; - protected @Nullable ScheduledFuture future; + protected final AtomicReference<@Nullable ScheduledFuture> futureRef = new AtomicReference<>(); /** * Creates a {@link DelayedBatchProcessing}. @@ -56,18 +57,15 @@ public DelayedBatchProcessing(int delay, Consumer> consumer, ScheduledEx } /** - * Add new object to the batch process list. If the list was empty, the delay timer - * is armed and all successive objects are accumulated from here on. + * Add new object to the batch process list. Every time a new object is received, + * the delay timer is rescheduled. * * @param t An object */ @Override public void accept(T t) { queue.add(t); - final ScheduledFuture scheduledFuture = this.future; - if (scheduledFuture == null || scheduledFuture.isDone()) { - this.future = executor.schedule(this::run, delay, TimeUnit.MILLISECONDS); - } + cancel(futureRef.getAndSet(executor.schedule(this::run, delay, TimeUnit.MILLISECONDS))); } /** @@ -76,10 +74,7 @@ public void accept(T t) { * @return A list of accumulated objects */ public List join() { - ScheduledFuture scheduledFuture = this.future; - if (scheduledFuture != null && !scheduledFuture.isDone()) { - scheduledFuture.cancel(false); - } + cancel(futureRef.getAndSet(null)); List lqueue = new ArrayList<>(); synchronized (queue) { lqueue.addAll(queue); @@ -92,7 +87,7 @@ public List join() { * Return true if there is a delayed processing going on. */ public boolean isArmed() { - ScheduledFuture scheduledFuture = this.future; + ScheduledFuture scheduledFuture = this.futureRef.get(); return scheduledFuture != null && !scheduledFuture.isDone(); } @@ -100,10 +95,7 @@ public boolean isArmed() { * Deliver queued items now to the target consumer. */ public void forceProcessNow() { - ScheduledFuture scheduledFuture = this.future; - if (scheduledFuture != null && !scheduledFuture.isDone()) { - scheduledFuture.cancel(false); - } + cancel(futureRef.getAndSet(null)); run(); } @@ -118,4 +110,10 @@ private void run() { consumer.accept(lqueue); } } + + private static void cancel(@Nullable ScheduledFuture future) { + if (future != null) { + future.cancel(false); + } + } } diff --git a/bundles/org.openhab.binding.mqtt.homie/src/main/java/org/openhab/binding/mqtt/homie/generic/internal/MqttThingHandlerFactory.java b/bundles/org.openhab.binding.mqtt.homie/src/main/java/org/openhab/binding/mqtt/homie/generic/internal/MqttThingHandlerFactory.java index bd8ca41ba412e..45df573aed3fd 100644 --- a/bundles/org.openhab.binding.mqtt.homie/src/main/java/org/openhab/binding/mqtt/homie/generic/internal/MqttThingHandlerFactory.java +++ b/bundles/org.openhab.binding.mqtt.homie/src/main/java/org/openhab/binding/mqtt/homie/generic/internal/MqttThingHandlerFactory.java @@ -89,7 +89,7 @@ protected void unsetChannelProvider(MqttChannelTypeProvider provider) { ThingTypeUID thingTypeUID = thing.getThingTypeUID(); if (thingTypeUID.equals(MqttBindingConstants.HOMIE300_MQTT_THING)) { - return new HomieThingHandler(thing, typeProvider, 15000, 2000); + return new HomieThingHandler(thing, typeProvider, 1000, 500); } return null; }