diff --git a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/PresenceDetection.java b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/PresenceDetection.java index f6b4cd2b60e75..1f1e657cc5206 100644 --- a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/PresenceDetection.java +++ b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/PresenceDetection.java @@ -25,7 +25,9 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -92,6 +94,7 @@ public class PresenceDetection implements IPRequestReceivedCallback { private Set networkInterfaceNames = Set.of(); private @Nullable ScheduledFuture refreshJob; protected @Nullable ExecutorService detectionExecutorService; + protected @Nullable ExecutorService waitForResultExecutorService; private String dhcpState = "off"; int detectionChecks; private String lastReachableNetworkInterfaceName = ""; @@ -295,6 +298,21 @@ private void withDestinationAddress(Consumer consumer) { } } + private void stopDetection() { + ExecutorService detectionExecutorService = this.detectionExecutorService; + if (detectionExecutorService != null) { + logger.debug("Shutting down detectionExecutorService"); + detectionExecutorService.shutdownNow(); + this.detectionExecutorService = null; + } + ExecutorService waitForResultExecutorService = this.waitForResultExecutorService; + if (waitForResultExecutorService != null) { + logger.debug("Shutting down waitForResultExecutorService"); + waitForResultExecutorService.shutdownNow(); + this.waitForResultExecutorService = null; + } + } + /** * Perform a presence detection with ICMP-, ARP ping and TCP connection attempts simultaneously. * A fixed thread pool will be created with as many threads as necessary to perform all tests at once. @@ -333,50 +351,60 @@ public CompletableFuture performPresenceDetection() { return CompletableFuture.completedFuture(pdv); } + stopDetection(); + ExecutorService detectionExecutorService = getThreadsFor(detectionChecks); this.detectionExecutorService = detectionExecutorService; + ExecutorService waitForResultExecutorService = getThreadsFor(1); + this.waitForResultExecutorService = waitForResultExecutorService; List> completableFutures = new ArrayList<>(); for (Integer tcpPort : tcpPorts) { - completableFutures.add(CompletableFuture.runAsync(() -> { + addAsyncDetection(completableFutures, () -> { Thread.currentThread().setName("presenceDetectionTCP_" + hostname + " " + tcpPort); performServicePing(pdv, tcpPort); - }, detectionExecutorService)); + }, detectionExecutorService); } // ARP ping for IPv4 addresses. Use single executor for Windows tool and // each own executor for each network interface for other tools if (arpPingMethod == ArpPingUtilEnum.ELI_FULKERSON_ARP_PING_FOR_WINDOWS) { - completableFutures.add(CompletableFuture.runAsync(() -> { + addAsyncDetection(completableFutures, () -> { Thread.currentThread().setName("presenceDetectionARP_" + hostname + " "); // arp-ping.exe tool capable of handling multiple interfaces by itself performArpPing(pdv, ""); - }, detectionExecutorService)); + }, detectionExecutorService); } else if (interfaceNames != null) { for (final String interfaceName : interfaceNames) { - completableFutures.add(CompletableFuture.runAsync(() -> { + addAsyncDetection(completableFutures, () -> { Thread.currentThread().setName("presenceDetectionARP_" + hostname + " " + interfaceName); performArpPing(pdv, interfaceName); - }, detectionExecutorService)); + }, detectionExecutorService); } } // ICMP ping if (pingMethod != null) { - completableFutures.add(CompletableFuture.runAsync(() -> { + addAsyncDetection(completableFutures, () -> { Thread.currentThread().setName("presenceDetectionICMP_" + hostname); if (pingMethod == IpPingMethodEnum.JAVA_PING) { performJavaPing(pdv); } else { performSystemPing(pdv); } - }, detectionExecutorService)); + }, detectionExecutorService); } return CompletableFuture.supplyAsync(() -> { logger.debug("Waiting for {} detection futures for {} to complete", completableFutures.size(), hostname); - completableFutures.forEach(CompletableFuture::join); + completableFutures.forEach(completableFuture -> { + try { + completableFuture.join(); + } catch (CancellationException | CompletionException e) { + logger.debug("Detection future failed to complete", e); + } + }); logger.debug("All {} detection futures for {} have completed", completableFutures.size(), hostname); if (!pdv.isReachable()) { @@ -392,7 +420,13 @@ public CompletableFuture performPresenceDetection() { detectionChecks = 0; return pdv; - }, scheduledExecutorService); + }, waitForResultExecutorService); + } + + private void addAsyncDetection(List> completableFutures, Runnable detectionRunnable, + ExecutorService executorService) { + completableFutures.add(CompletableFuture.runAsync(detectionRunnable, executorService) + .orTimeout(timeout.plusSeconds(3).toMillis(), TimeUnit.MILLISECONDS)); } /** diff --git a/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/PresenceDetectionTest.java b/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/PresenceDetectionTest.java index bf9d09fc6a8e7..0f770e9157a21 100644 --- a/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/PresenceDetectionTest.java +++ b/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/PresenceDetectionTest.java @@ -53,6 +53,7 @@ public class PresenceDetectionTest { private @Mock @NonNullByDefault({}) Consumer callback; private @Mock @NonNullByDefault({}) ExecutorService detectionExecutorService; + private @Mock @NonNullByDefault({}) ExecutorService waitForResultExecutorService; private @Mock @NonNullByDefault({}) ScheduledExecutorService scheduledExecutorService; private @Mock @NonNullByDefault({}) PresenceDetectionListener listener; private @Mock @NonNullByDefault({}) NetworkUtils networkUtils; @@ -90,6 +91,8 @@ public void threadCountTest() { doNothing().when(subject).performSystemPing(any()); doNothing().when(subject).performServicePing(any(), anyInt()); + doReturn(waitForResultExecutorService).when(subject).getThreadsFor(1); + subject.getValue(callback -> { }); @@ -99,7 +102,7 @@ public void threadCountTest() { // "Wait" for the presence detection to finish ArgumentCaptor runnableCapture = ArgumentCaptor.forClass(Runnable.class); - verify(scheduledExecutorService, times(1)).execute(runnableCapture.capture()); + verify(waitForResultExecutorService, times(1)).execute(runnableCapture.capture()); runnableCapture.getValue().run(); assertThat(subject.detectionChecks, is(0)); @@ -114,7 +117,8 @@ public void partialAndFinalCallbackTests() throws InterruptedException, IOExcept anyString(), any(), any()); doReturn(pingResult).when(networkUtils).servicePing(anyString(), anyInt(), any()); - doReturn(detectionExecutorService).when(subject).getThreadsFor(anyInt()); + doReturn(detectionExecutorService).when(subject).getThreadsFor(3); + doReturn(waitForResultExecutorService).when(subject).getThreadsFor(1); subject.performPresenceDetection(); @@ -129,7 +133,7 @@ public void partialAndFinalCallbackTests() throws InterruptedException, IOExcept // "Wait" for the presence detection to finish ArgumentCaptor runnableCapture = ArgumentCaptor.forClass(Runnable.class); - verify(scheduledExecutorService, times(1)).execute(runnableCapture.capture()); + verify(waitForResultExecutorService, times(1)).execute(runnableCapture.capture()); runnableCapture.getValue().run(); assertThat(subject.detectionChecks, is(0)); @@ -154,7 +158,8 @@ public void cacheTest() throws InterruptedException, IOException { anyString(), any(), any()); doReturn(pingResult).when(networkUtils).servicePing(anyString(), anyInt(), any()); - doReturn(detectionExecutorService).when(subject).getThreadsFor(anyInt()); + doReturn(detectionExecutorService).when(subject).getThreadsFor(3); + doReturn(waitForResultExecutorService).when(subject).getThreadsFor(1); // We expect no valid value assertTrue(subject.cache.isExpired()); @@ -174,7 +179,7 @@ public void cacheTest() throws InterruptedException, IOException { // "Wait" for the presence detection to finish capture = ArgumentCaptor.forClass(Runnable.class); - verify(scheduledExecutorService, times(1)).execute(capture.capture()); + verify(waitForResultExecutorService, times(1)).execute(capture.capture()); capture.getValue().run(); // Although there are multiple partial results and a final result,