From 277fe837c34b6662a91d12f96432c359dad3c51d Mon Sep 17 00:00:00 2001 From: Noah Beard Date: Tue, 6 Jun 2023 17:42:21 -0400 Subject: [PATCH 1/4] Adjust the Greengrass Discovery client to use its own executor service --- .../awssdk/iot/discovery/DiscoveryClient.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/sdk/src/main/java/software/amazon/awssdk/iot/discovery/DiscoveryClient.java b/sdk/src/main/java/software/amazon/awssdk/iot/discovery/DiscoveryClient.java index d8c8900bd..b9c23985b 100644 --- a/sdk/src/main/java/software/amazon/awssdk/iot/discovery/DiscoveryClient.java +++ b/sdk/src/main/java/software/amazon/awssdk/iot/discovery/DiscoveryClient.java @@ -13,6 +13,8 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * Class for performing network-based discovery of the connectivity properties of registered greengrass cores @@ -35,11 +37,19 @@ public class DiscoveryClient implements AutoCloseable { private final HttpClientConnectionManager httpClientConnectionManager; + /** + * We need to use a custom executor to avoid leaving threads alive when the discovery client is closed + * It also fixes issues with the SecurityManager as well - as created ExecutorServices created via code + * inherit the permissions of the application, unlike the default common thread pool. + */ + private final ExecutorService executorService; + /** * * @param config Greengrass discovery client configuration */ public DiscoveryClient(final DiscoveryClientConfig config) { + this.executorService = Executors.newFixedThreadPool(1); this.httpClientConnectionManager = HttpClientConnectionManager.create( new HttpClientConnectionManagerOptions() .withClientBootstrap(config.getBootstrap()) @@ -102,11 +112,11 @@ public void onResponseComplete(HttpStream httpStream, int errorCode) { catch(InterruptedException | ExecutionException e) { throw new RuntimeException(e.getMessage(), e); } - }); + }, executorService); } private static String getHostname(final DiscoveryClientConfig config) { - //allow greengrass server endpoint to be manualy set for unique endpoints + //allow greengrass server endpoint to be manually set for unique endpoints if (config.getGGServerName().equals("")) { return String.format("greengrass-ats.iot.%s.%s", config.getRegion(), AWS_DOMAIN_SUFFIX_MAP.getOrDefault(config.getRegion(), AWS_DOMAIN_DEFAULT)); @@ -120,5 +130,8 @@ public void close() { if(httpClientConnectionManager != null) { httpClientConnectionManager.close(); } + if (executorService != null) { + executorService.shutdown(); + } } } From 35f3d58cbed7d967ede57eae12d9542fc7fc6bb5 Mon Sep 17 00:00:00 2001 From: Noah Beard Date: Wed, 7 Jun 2023 15:25:50 -0400 Subject: [PATCH 2/4] Instead of using supplyAsync, create a Thread manually and handle the async stuff manually that way --- .../awssdk/iot/discovery/DiscoveryClient.java | 102 +++++++++--------- 1 file changed, 48 insertions(+), 54 deletions(-) diff --git a/sdk/src/main/java/software/amazon/awssdk/iot/discovery/DiscoveryClient.java b/sdk/src/main/java/software/amazon/awssdk/iot/discovery/DiscoveryClient.java index b9c23985b..755ee1c03 100644 --- a/sdk/src/main/java/software/amazon/awssdk/iot/discovery/DiscoveryClient.java +++ b/sdk/src/main/java/software/amazon/awssdk/iot/discovery/DiscoveryClient.java @@ -12,9 +12,6 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; /** * Class for performing network-based discovery of the connectivity properties of registered greengrass cores @@ -37,19 +34,11 @@ public class DiscoveryClient implements AutoCloseable { private final HttpClientConnectionManager httpClientConnectionManager; - /** - * We need to use a custom executor to avoid leaving threads alive when the discovery client is closed - * It also fixes issues with the SecurityManager as well - as created ExecutorServices created via code - * inherit the permissions of the application, unlike the default common thread pool. - */ - private final ExecutorService executorService; - /** * * @param config Greengrass discovery client configuration */ public DiscoveryClient(final DiscoveryClientConfig config) { - this.executorService = Executors.newFixedThreadPool(1); this.httpClientConnectionManager = HttpClientConnectionManager.create( new HttpClientConnectionManagerOptions() .withClientBootstrap(config.getBootstrap()) @@ -70,49 +59,57 @@ public CompletableFuture discover(final String thingName) { if(thingName == null) { throw new IllegalArgumentException("ThingName cannot be null!"); } - return CompletableFuture.supplyAsync(() -> { - try(final HttpClientConnection connection = httpClientConnectionManager.acquireConnection().get()) { - final String requestHttpPath = "/greengrass/discover/thing/" + thingName; - final HttpHeader[] headers = new HttpHeader[] { - new HttpHeader("host", httpClientConnectionManager.getUri().getHost()) - }; - final HttpRequest request = new HttpRequest("GET", requestHttpPath, headers, null); - //we are storing everything until we get the entire response - final CompletableFuture responseComplete = new CompletableFuture<>(); - final StringBuilder jsonBodyResponseBuilder = new StringBuilder(); - final Map responseInfo = new HashMap<>(); - try(final HttpStream stream = connection.makeRequest(request, new HttpStreamResponseHandler() { - @Override - public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType, HttpHeader[] httpHeaders) { - Arrays.stream(httpHeaders).forEach(header -> { - responseInfo.put(header.getName(), header.getValue()); - }); - } - @Override - public int onResponseBody(HttpStream stream, byte bodyBytes[]) { - jsonBodyResponseBuilder.append(new String(bodyBytes, StandardCharsets.UTF_8)); - return bodyBytes.length; + + CompletableFuture result = new CompletableFuture<>(); + // We create and start a thread so it executes asynchronously and independently from the main process. The thread will set the + // result CompletableFuture when it is done or if an exception occurs. + new Thread(() -> { + try { + try (final HttpClientConnection connection = httpClientConnectionManager.acquireConnection().get()) { + final String requestHttpPath = "/greengrass/discover/thing/" + thingName; + final HttpHeader[] headers = new HttpHeader[] { + new HttpHeader("host", httpClientConnectionManager.getUri().getHost()) + }; + final HttpRequest request = new HttpRequest("GET", requestHttpPath, headers, null); + //we are storing everything until we get the entire response + final CompletableFuture responseComplete = new CompletableFuture<>(); + final StringBuilder jsonBodyResponseBuilder = new StringBuilder(); + final Map responseInfo = new HashMap<>(); + + try (final HttpStream stream = connection.makeRequest(request, new HttpStreamResponseHandler() { + @Override + public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType, HttpHeader[] httpHeaders) { + Arrays.stream(httpHeaders).forEach(header -> { + responseInfo.put(header.getName(), header.getValue()); + }); + } + @Override + public int onResponseBody(HttpStream stream, byte bodyBytes[]) { + jsonBodyResponseBuilder.append(new String(bodyBytes, StandardCharsets.UTF_8)); + return bodyBytes.length; + } + @Override + public void onResponseComplete(HttpStream httpStream, int errorCode) { + responseComplete.complete(errorCode); + } + })) + { + stream.activate(); + responseComplete.get(); + if (stream.getResponseStatusCode() != 200) { + throw new RuntimeException(String.format("Error %s(%d); RequestId: %s", + HTTP_HEADER_ERROR_TYPE, stream.getResponseStatusCode(), HTTP_HEADER_REQUEST_ID)); + } + final String responseString = jsonBodyResponseBuilder.toString(); + result.complete(GSON.fromJson(new StringReader(responseString), DiscoverResponse.class)); } - @Override - public void onResponseComplete(HttpStream httpStream, int errorCode) { - responseComplete.complete(errorCode); - }})) { - stream.activate(); - responseComplete.get(); - if (stream.getResponseStatusCode() != 200) { - throw new RuntimeException(String.format("Error %s(%d); RequestId: %s", - HTTP_HEADER_ERROR_TYPE, stream.getResponseStatusCode(), HTTP_HEADER_REQUEST_ID)); - } - final String responseString = jsonBodyResponseBuilder.toString(); - return GSON.fromJson(new StringReader(responseString), DiscoverResponse.class); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); } + } catch(Exception ex) { + result.completeExceptionally(ex); } - catch(InterruptedException | ExecutionException e) { - throw new RuntimeException(e.getMessage(), e); - } - }, executorService); + }).start(); + + return result; } private static String getHostname(final DiscoveryClientConfig config) { @@ -130,8 +127,5 @@ public void close() { if(httpClientConnectionManager != null) { httpClientConnectionManager.close(); } - if (executorService != null) { - executorService.shutdown(); - } } } From f9018521ce23a96593628ed7fc3c84411295d810 Mon Sep 17 00:00:00 2001 From: Noah Beard Date: Wed, 7 Jun 2023 17:26:26 -0400 Subject: [PATCH 3/4] Go back to using an executor, but allow the executor to be set in the config --- .../awssdk/iot/discovery/DiscoveryClient.java | 112 ++++++++++-------- .../iot/discovery/DiscoveryClientConfig.java | 28 ++++- 2 files changed, 90 insertions(+), 50 deletions(-) diff --git a/sdk/src/main/java/software/amazon/awssdk/iot/discovery/DiscoveryClient.java b/sdk/src/main/java/software/amazon/awssdk/iot/discovery/DiscoveryClient.java index 755ee1c03..73effaeca 100644 --- a/sdk/src/main/java/software/amazon/awssdk/iot/discovery/DiscoveryClient.java +++ b/sdk/src/main/java/software/amazon/awssdk/iot/discovery/DiscoveryClient.java @@ -12,6 +12,9 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * Class for performing network-based discovery of the connectivity properties of registered greengrass cores @@ -34,12 +37,27 @@ public class DiscoveryClient implements AutoCloseable { private final HttpClientConnectionManager httpClientConnectionManager; + /** + * We need to use a code defined executor to avoid leaving threads alive when the discovery client is closed + * It also fixes issues with the SecurityManager as well - as created ExecutorServices created via code + * inherit the permissions of the application, unlike the default common thread pool that is used by default + * with supplyAsync otherwise. + */ + private ExecutorService executorService = null; + private boolean cleanExecutor = false; + /** * * @param config Greengrass discovery client configuration */ public DiscoveryClient(final DiscoveryClientConfig config) { - this.httpClientConnectionManager = HttpClientConnectionManager.create( + executorService = config.getDiscoveryExecutor(); + if (executorService == null) { + // If an executor is not set, then create one and make sure to clean it when finished. + executorService = Executors.newFixedThreadPool(1); + cleanExecutor = true; + } + httpClientConnectionManager = HttpClientConnectionManager.create( new HttpClientConnectionManagerOptions() .withClientBootstrap(config.getBootstrap()) .withProxyOptions(config.getProxyOptions()) @@ -59,57 +77,49 @@ public CompletableFuture discover(final String thingName) { if(thingName == null) { throw new IllegalArgumentException("ThingName cannot be null!"); } - - CompletableFuture result = new CompletableFuture<>(); - // We create and start a thread so it executes asynchronously and independently from the main process. The thread will set the - // result CompletableFuture when it is done or if an exception occurs. - new Thread(() -> { - try { - try (final HttpClientConnection connection = httpClientConnectionManager.acquireConnection().get()) { - final String requestHttpPath = "/greengrass/discover/thing/" + thingName; - final HttpHeader[] headers = new HttpHeader[] { - new HttpHeader("host", httpClientConnectionManager.getUri().getHost()) - }; - final HttpRequest request = new HttpRequest("GET", requestHttpPath, headers, null); - //we are storing everything until we get the entire response - final CompletableFuture responseComplete = new CompletableFuture<>(); - final StringBuilder jsonBodyResponseBuilder = new StringBuilder(); - final Map responseInfo = new HashMap<>(); - - try (final HttpStream stream = connection.makeRequest(request, new HttpStreamResponseHandler() { - @Override - public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType, HttpHeader[] httpHeaders) { - Arrays.stream(httpHeaders).forEach(header -> { - responseInfo.put(header.getName(), header.getValue()); - }); - } - @Override - public int onResponseBody(HttpStream stream, byte bodyBytes[]) { - jsonBodyResponseBuilder.append(new String(bodyBytes, StandardCharsets.UTF_8)); - return bodyBytes.length; - } - @Override - public void onResponseComplete(HttpStream httpStream, int errorCode) { - responseComplete.complete(errorCode); - } - })) - { - stream.activate(); - responseComplete.get(); - if (stream.getResponseStatusCode() != 200) { - throw new RuntimeException(String.format("Error %s(%d); RequestId: %s", - HTTP_HEADER_ERROR_TYPE, stream.getResponseStatusCode(), HTTP_HEADER_REQUEST_ID)); - } - final String responseString = jsonBodyResponseBuilder.toString(); - result.complete(GSON.fromJson(new StringReader(responseString), DiscoverResponse.class)); + return CompletableFuture.supplyAsync(() -> { + try(final HttpClientConnection connection = httpClientConnectionManager.acquireConnection().get()) { + final String requestHttpPath = "/greengrass/discover/thing/" + thingName; + final HttpHeader[] headers = new HttpHeader[] { + new HttpHeader("host", httpClientConnectionManager.getUri().getHost()) + }; + final HttpRequest request = new HttpRequest("GET", requestHttpPath, headers, null); + //we are storing everything until we get the entire response + final CompletableFuture responseComplete = new CompletableFuture<>(); + final StringBuilder jsonBodyResponseBuilder = new StringBuilder(); + final Map responseInfo = new HashMap<>(); + try(final HttpStream stream = connection.makeRequest(request, new HttpStreamResponseHandler() { + @Override + public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType, HttpHeader[] httpHeaders) { + Arrays.stream(httpHeaders).forEach(header -> { + responseInfo.put(header.getName(), header.getValue()); + }); + } + @Override + public int onResponseBody(HttpStream stream, byte bodyBytes[]) { + jsonBodyResponseBuilder.append(new String(bodyBytes, StandardCharsets.UTF_8)); + return bodyBytes.length; } + @Override + public void onResponseComplete(HttpStream httpStream, int errorCode) { + responseComplete.complete(errorCode); + }})) { + stream.activate(); + responseComplete.get(); + if (stream.getResponseStatusCode() != 200) { + throw new RuntimeException(String.format("Error %s(%d); RequestId: %s", + HTTP_HEADER_ERROR_TYPE, stream.getResponseStatusCode(), HTTP_HEADER_REQUEST_ID)); + } + final String responseString = jsonBodyResponseBuilder.toString(); + return GSON.fromJson(new StringReader(responseString), DiscoverResponse.class); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); } - } catch(Exception ex) { - result.completeExceptionally(ex); } - }).start(); - - return result; + catch(InterruptedException | ExecutionException e) { + throw new RuntimeException(e.getMessage(), e); + } + }, executorService); } private static String getHostname(final DiscoveryClientConfig config) { @@ -127,5 +137,9 @@ public void close() { if(httpClientConnectionManager != null) { httpClientConnectionManager.close(); } + if (cleanExecutor == true && executorService != null) { + executorService.shutdown(); + executorService = null; + } } } diff --git a/sdk/src/main/java/software/amazon/awssdk/iot/discovery/DiscoveryClientConfig.java b/sdk/src/main/java/software/amazon/awssdk/iot/discovery/DiscoveryClientConfig.java index 56f0effa4..1365a064c 100644 --- a/sdk/src/main/java/software/amazon/awssdk/iot/discovery/DiscoveryClientConfig.java +++ b/sdk/src/main/java/software/amazon/awssdk/iot/discovery/DiscoveryClientConfig.java @@ -1,5 +1,7 @@ package software.amazon.awssdk.iot.discovery; +import java.util.concurrent.ExecutorService; + import software.amazon.awssdk.crt.http.HttpProxyOptions; import software.amazon.awssdk.crt.io.ClientBootstrap; import software.amazon.awssdk.crt.io.SocketOptions; @@ -17,6 +19,7 @@ public class DiscoveryClientConfig implements AutoCloseable { final private int maxConnections; final private HttpProxyOptions proxyOptions; final private String ggServerName; + private ExecutorService discoveryExecutor; /** * Constructor for DiscoveryClientConfig creates the correct endpoint if not in a special region @@ -44,6 +47,7 @@ public DiscoveryClientConfig( this.maxConnections = maxConnections; this.proxyOptions = proxyOptions; this.ggServerName = ""; + this.discoveryExecutor = null; } /** @@ -70,7 +74,7 @@ public DiscoveryClientConfig( /** * Default Constructor for DiscoveryClientConfig that allows the specification of a specific ggServerName to use in special regions - * + * * @param bootstrap client bootstrap to use to establish network connections * @param tlsContextOptions tls configuration for client network connections. For greengrass discovery, the * tls context must be initialized with the certificate and private key of the @@ -94,6 +98,7 @@ public DiscoveryClientConfig( this.maxConnections = maxConnections; this.proxyOptions = proxyOptions; this.ggServerName = ggServerName; + this.discoveryExecutor = null; } /** @@ -164,6 +169,27 @@ public String getGGServerName() { return ggServerName; } + /** + * @return the executor set for this discover client, if one is set. + */ + public ExecutorService getDiscoveryExecutor() { + return this.discoveryExecutor; + } + + /** + * Sets the executor that is used when calling discover(). + * If set using this function, you are expected to close/shutdown the executor when finished with it. + * + * If it is not set, the discovery client will internally manage its own executor. + * + * @param override The executor to use + * @return The client config + */ + public DiscoveryClientConfig setDiscoveryExecutor(ExecutorService override) { + this.discoveryExecutor = override; + return this; + } + @Override public void close() { if(tlsContext != null) { From df858fb7e4300a9d6497d625c0c3ebbbf3379d1f Mon Sep 17 00:00:00 2001 From: Steve Kim Date: Wed, 14 Jun 2023 10:57:55 -0700 Subject: [PATCH 4/4] comment edits and added shutdownNow for the executorService during close after a 30 sec wait --- .../awssdk/iot/discovery/DiscoveryClient.java | 16 ++++++++++++++-- .../iot/discovery/DiscoveryClientConfig.java | 2 +- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/sdk/src/main/java/software/amazon/awssdk/iot/discovery/DiscoveryClient.java b/sdk/src/main/java/software/amazon/awssdk/iot/discovery/DiscoveryClient.java index 73effaeca..d5af3f6c1 100644 --- a/sdk/src/main/java/software/amazon/awssdk/iot/discovery/DiscoveryClient.java +++ b/sdk/src/main/java/software/amazon/awssdk/iot/discovery/DiscoveryClient.java @@ -15,6 +15,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; /** * Class for performing network-based discovery of the connectivity properties of registered greengrass cores @@ -137,9 +138,20 @@ public void close() { if(httpClientConnectionManager != null) { httpClientConnectionManager.close(); } - if (cleanExecutor == true && executorService != null) { + if (cleanExecutor == true) { executorService.shutdown(); - executorService = null; + try{ + // Give the executorService 30 seconds to finish existing tasks. If it takes longer, force it to shutdown + if(!executorService.awaitTermination(30,TimeUnit.SECONDS)){ + executorService.shutdownNow(); + } + } catch (InterruptedException ie){ + // If current thread is interrupted, force executorService shutdown + executorService.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } } + executorService = null; } } diff --git a/sdk/src/main/java/software/amazon/awssdk/iot/discovery/DiscoveryClientConfig.java b/sdk/src/main/java/software/amazon/awssdk/iot/discovery/DiscoveryClientConfig.java index 1365a064c..6c05b5f21 100644 --- a/sdk/src/main/java/software/amazon/awssdk/iot/discovery/DiscoveryClientConfig.java +++ b/sdk/src/main/java/software/amazon/awssdk/iot/discovery/DiscoveryClientConfig.java @@ -170,7 +170,7 @@ public String getGGServerName() { } /** - * @return the executor set for this discover client, if one is set. + * @return the ExecutorService set for this discover client config. Returns null if one hasn't been set. */ public ExecutorService getDiscoveryExecutor() { return this.discoveryExecutor;