Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[mqtt.homie] Improve Homie discovery time (openhab#7760)
Browse files Browse the repository at this point in the history
* Improve Homie discovery time

Signed-off-by: Aitor Iturrioz <riturrioz@gmail.com>
Signed-off-by: CSchlipp <christian@schlipp.de>
bodiroga authored and CSchlipp committed Jul 26, 2020
1 parent 8452247 commit 0514ebc
Showing 2 changed files with 15 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import org.eclipse.jdt.annotation.NonNullByDefault;
@@ -37,7 +38,7 @@ public class DelayedBatchProcessing<T> implements Consumer<T> {
private final Consumer<List<T>> consumer;
private final List<T> queue = Collections.synchronizedList(new ArrayList<>());
private final ScheduledExecutorService executor;
protected @Nullable ScheduledFuture<?> future;
protected final AtomicReference<@Nullable ScheduledFuture<?>> futureRef = new AtomicReference<>();

/**
* Creates a {@link DelayedBatchProcessing}.
@@ -56,18 +57,15 @@ public DelayedBatchProcessing(int delay, Consumer<List<T>> consumer, ScheduledEx
}

/**
* Add new object to the batch process list. If the list was empty, the delay timer
* is armed and all successive objects are accumulated from here on.
* Add new object to the batch process list. Every time a new object is received,
* the delay timer is rescheduled.
*
* @param t An object
*/
@Override
public void accept(T t) {
queue.add(t);
final ScheduledFuture<?> scheduledFuture = this.future;
if (scheduledFuture == null || scheduledFuture.isDone()) {
this.future = executor.schedule(this::run, delay, TimeUnit.MILLISECONDS);
}
cancel(futureRef.getAndSet(executor.schedule(this::run, delay, TimeUnit.MILLISECONDS)));
}

/**
@@ -76,10 +74,7 @@ public void accept(T t) {
* @return A list of accumulated objects
*/
public List<T> join() {
ScheduledFuture<?> scheduledFuture = this.future;
if (scheduledFuture != null && !scheduledFuture.isDone()) {
scheduledFuture.cancel(false);
}
cancel(futureRef.getAndSet(null));
List<T> lqueue = new ArrayList<>();
synchronized (queue) {
lqueue.addAll(queue);
@@ -92,18 +87,15 @@ public List<T> join() {
* Return true if there is a delayed processing going on.
*/
public boolean isArmed() {
ScheduledFuture<?> scheduledFuture = this.future;
ScheduledFuture<?> scheduledFuture = this.futureRef.get();
return scheduledFuture != null && !scheduledFuture.isDone();
}

/**
* Deliver queued items now to the target consumer.
*/
public void forceProcessNow() {
ScheduledFuture<?> scheduledFuture = this.future;
if (scheduledFuture != null && !scheduledFuture.isDone()) {
scheduledFuture.cancel(false);
}
cancel(futureRef.getAndSet(null));
run();
}

@@ -118,4 +110,10 @@ private void run() {
consumer.accept(lqueue);
}
}

private static void cancel(@Nullable ScheduledFuture<?> future) {
if (future != null) {
future.cancel(false);
}
}
}
Original file line number Diff line number Diff line change
@@ -89,7 +89,7 @@ protected void unsetChannelProvider(MqttChannelTypeProvider provider) {
ThingTypeUID thingTypeUID = thing.getThingTypeUID();

if (thingTypeUID.equals(MqttBindingConstants.HOMIE300_MQTT_THING)) {
return new HomieThingHandler(thing, typeProvider, 15000, 2000);
return new HomieThingHandler(thing, typeProvider, 1000, 500);
}
return null;
}

0 comments on commit 0514ebc

Please sign in to comment.