Skip to content

Commit

Permalink
[network] Improve threading
Browse files Browse the repository at this point in the history
* Use timeouts with CompletableFutures
* Use seperate executor when waiting for results
* Catch exceptions when joining CompletableFutures
* Stop previous detection when starting a new one

Fixes openhab#16305

Signed-off-by: Wouter Born <[email protected]>
  • Loading branch information
wborn committed Jan 22, 2024
1 parent 3d1c6ba commit 0629a30
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -92,6 +94,7 @@ public class PresenceDetection implements IPRequestReceivedCallback {
private Set<String> networkInterfaceNames = Set.of();
private @Nullable ScheduledFuture<?> refreshJob;
protected @Nullable ExecutorService detectionExecutorService;
protected @Nullable ExecutorService waitForResultExecutorService;
private String dhcpState = "off";
int detectionChecks;
private String lastReachableNetworkInterfaceName = "";
Expand Down Expand Up @@ -295,6 +298,21 @@ private void withDestinationAddress(Consumer<InetAddress> consumer) {
}
}

private void stopDetection() {
ExecutorService detectionExecutorService = this.detectionExecutorService;
if (detectionExecutorService != null) {
logger.debug("Shutting down detectionExecutorService");
detectionExecutorService.shutdownNow();
this.detectionExecutorService = null;
}
ExecutorService waitForResultExecutorService = this.waitForResultExecutorService;
if (waitForResultExecutorService != null) {
logger.debug("Shutting down waitForResultExecutorService");
waitForResultExecutorService.shutdownNow();
this.waitForResultExecutorService = null;
}
}

/**
* Perform a presence detection with ICMP-, ARP ping and TCP connection attempts simultaneously.
* A fixed thread pool will be created with as many threads as necessary to perform all tests at once.
Expand Down Expand Up @@ -333,50 +351,60 @@ public CompletableFuture<PresenceDetectionValue> performPresenceDetection() {
return CompletableFuture.completedFuture(pdv);
}

stopDetection();

ExecutorService detectionExecutorService = getThreadsFor(detectionChecks);
this.detectionExecutorService = detectionExecutorService;
ExecutorService waitForResultExecutorService = getThreadsFor(1);
this.waitForResultExecutorService = waitForResultExecutorService;

List<CompletableFuture<Void>> completableFutures = new ArrayList<>();

for (Integer tcpPort : tcpPorts) {
completableFutures.add(CompletableFuture.runAsync(() -> {
addAsyncDetection(completableFutures, () -> {
Thread.currentThread().setName("presenceDetectionTCP_" + hostname + " " + tcpPort);
performServicePing(pdv, tcpPort);
}, detectionExecutorService));
}, detectionExecutorService);
}

// ARP ping for IPv4 addresses. Use single executor for Windows tool and
// each own executor for each network interface for other tools
if (arpPingMethod == ArpPingUtilEnum.ELI_FULKERSON_ARP_PING_FOR_WINDOWS) {
completableFutures.add(CompletableFuture.runAsync(() -> {
addAsyncDetection(completableFutures, () -> {
Thread.currentThread().setName("presenceDetectionARP_" + hostname + " ");
// arp-ping.exe tool capable of handling multiple interfaces by itself
performArpPing(pdv, "");
}, detectionExecutorService));
}, detectionExecutorService);
} else if (interfaceNames != null) {
for (final String interfaceName : interfaceNames) {
completableFutures.add(CompletableFuture.runAsync(() -> {
addAsyncDetection(completableFutures, () -> {
Thread.currentThread().setName("presenceDetectionARP_" + hostname + " " + interfaceName);
performArpPing(pdv, interfaceName);
}, detectionExecutorService));
}, detectionExecutorService);
}
}

// ICMP ping
if (pingMethod != null) {
completableFutures.add(CompletableFuture.runAsync(() -> {
addAsyncDetection(completableFutures, () -> {
Thread.currentThread().setName("presenceDetectionICMP_" + hostname);
if (pingMethod == IpPingMethodEnum.JAVA_PING) {
performJavaPing(pdv);
} else {
performSystemPing(pdv);
}
}, detectionExecutorService));
}, detectionExecutorService);
}

return CompletableFuture.supplyAsync(() -> {
logger.debug("Waiting for {} detection futures for {} to complete", completableFutures.size(), hostname);
completableFutures.forEach(CompletableFuture::join);
completableFutures.forEach(completableFuture -> {
try {
completableFuture.join();
} catch (CancellationException | CompletionException e) {
logger.debug("Detection future failed to complete", e);
}
});
logger.debug("All {} detection futures for {} have completed", completableFutures.size(), hostname);

if (!pdv.isReachable()) {
Expand All @@ -392,7 +420,13 @@ public CompletableFuture<PresenceDetectionValue> performPresenceDetection() {
detectionChecks = 0;

return pdv;
}, scheduledExecutorService);
}, waitForResultExecutorService);
}

private void addAsyncDetection(List<CompletableFuture<Void>> completableFutures, Runnable detectionRunnable,
ExecutorService executorService) {
completableFutures.add(CompletableFuture.runAsync(detectionRunnable, executorService)
.orTimeout(timeout.plusSeconds(3).toMillis(), TimeUnit.MILLISECONDS));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class PresenceDetectionTest {

private @Mock @NonNullByDefault({}) Consumer<PresenceDetectionValue> callback;
private @Mock @NonNullByDefault({}) ExecutorService detectionExecutorService;
private @Mock @NonNullByDefault({}) ExecutorService waitForResultExecutorService;
private @Mock @NonNullByDefault({}) ScheduledExecutorService scheduledExecutorService;
private @Mock @NonNullByDefault({}) PresenceDetectionListener listener;
private @Mock @NonNullByDefault({}) NetworkUtils networkUtils;
Expand Down Expand Up @@ -90,6 +91,8 @@ public void threadCountTest() {
doNothing().when(subject).performSystemPing(any());
doNothing().when(subject).performServicePing(any(), anyInt());

doReturn(waitForResultExecutorService).when(subject).getThreadsFor(1);

subject.getValue(callback -> {
});

Expand All @@ -99,7 +102,7 @@ public void threadCountTest() {

// "Wait" for the presence detection to finish
ArgumentCaptor<Runnable> runnableCapture = ArgumentCaptor.forClass(Runnable.class);
verify(scheduledExecutorService, times(1)).execute(runnableCapture.capture());
verify(waitForResultExecutorService, times(1)).execute(runnableCapture.capture());
runnableCapture.getValue().run();

assertThat(subject.detectionChecks, is(0));
Expand All @@ -114,7 +117,8 @@ public void partialAndFinalCallbackTests() throws InterruptedException, IOExcept
anyString(), any(), any());
doReturn(pingResult).when(networkUtils).servicePing(anyString(), anyInt(), any());

doReturn(detectionExecutorService).when(subject).getThreadsFor(anyInt());
doReturn(detectionExecutorService).when(subject).getThreadsFor(3);
doReturn(waitForResultExecutorService).when(subject).getThreadsFor(1);

subject.performPresenceDetection();

Expand All @@ -129,7 +133,7 @@ public void partialAndFinalCallbackTests() throws InterruptedException, IOExcept

// "Wait" for the presence detection to finish
ArgumentCaptor<Runnable> runnableCapture = ArgumentCaptor.forClass(Runnable.class);
verify(scheduledExecutorService, times(1)).execute(runnableCapture.capture());
verify(waitForResultExecutorService, times(1)).execute(runnableCapture.capture());
runnableCapture.getValue().run();

assertThat(subject.detectionChecks, is(0));
Expand All @@ -154,7 +158,8 @@ public void cacheTest() throws InterruptedException, IOException {
anyString(), any(), any());
doReturn(pingResult).when(networkUtils).servicePing(anyString(), anyInt(), any());

doReturn(detectionExecutorService).when(subject).getThreadsFor(anyInt());
doReturn(detectionExecutorService).when(subject).getThreadsFor(3);
doReturn(waitForResultExecutorService).when(subject).getThreadsFor(1);

// We expect no valid value
assertTrue(subject.cache.isExpired());
Expand All @@ -174,7 +179,7 @@ public void cacheTest() throws InterruptedException, IOException {

// "Wait" for the presence detection to finish
capture = ArgumentCaptor.forClass(Runnable.class);
verify(scheduledExecutorService, times(1)).execute(capture.capture());
verify(waitForResultExecutorService, times(1)).execute(capture.capture());
capture.getValue().run();

// Although there are multiple partial results and a final result,
Expand Down

0 comments on commit 0629a30

Please sign in to comment.