From 3127649d5137165a3ca71fd882ef007f04f52bd1 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Mon, 5 Dec 2022 13:08:57 -0500 Subject: [PATCH] Add thread pool Signed-off-by: HenrikJannsen --- .../seednode/SeedNodeReportingService.java | 37 +++++++++++++++---- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/seednode/src/main/java/bisq/seednode/SeedNodeReportingService.java b/seednode/src/main/java/bisq/seednode/SeedNodeReportingService.java index 46f471f7554..7107f535744 100644 --- a/seednode/src/main/java/bisq/seednode/SeedNodeReportingService.java +++ b/seednode/src/main/java/bisq/seednode/SeedNodeReportingService.java @@ -61,6 +61,7 @@ import java.util.LinkedList; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; @@ -72,7 +73,7 @@ @Singleton public class SeedNodeReportingService { private final static long REPORT_DELAY_SEC = TimeUnit.MINUTES.toSeconds(5); - private final static long HEART_BEAT_DELAY_SEC = TimeUnit.SECONDS.toSeconds(10); + private final static long HEART_BEAT_DELAY_SEC = TimeUnit.SECONDS.toSeconds(60); private final P2PService p2PService; private final NetworkNode networkNode; @@ -90,6 +91,7 @@ public class SeedNodeReportingService { private Timer reportTimer; private Timer heartBeatTimer; private boolean initialDataSent; + private final ThreadPoolExecutor executor; @Inject public SeedNodeReportingService(P2PService p2PService, @@ -114,6 +116,7 @@ public SeedNodeReportingService(P2PService p2PService, this.maxConnections = maxConnections; this.seedNodeReportingServerUrl = seedNodeReportingServerUrl; + executor = Utilities.getThreadPoolExecutor("SeedNodeReportingService", 2, 4, 30); httpClient = HttpClient.newHttpClient(); daoStateMonitoringService.addListener(new DaoStateMonitoringService.Listener() { @Override @@ -146,10 +149,17 @@ public void shutDown() { if (reportTimer != null) { reportTimer.stop(); } + + Utilities.shutdownAndAwaitTermination(executor, 1, TimeUnit.SECONDS); } private void sendHeartBeat() { + // Without the address we cannot report + if (p2PService.getAddress() == null) { + return; + } ReportingItems reportingItems = new ReportingItems(); + reportingItems.add(StringValueItem.address.withValue(p2PService.getAddress().getFullAddress())); reportingItems.add(IntegerValueItem.usedMemoryInMB.withValue((int) Profiler.getUsedMemoryInMB())); sendReportingItems(reportingItems); } @@ -159,8 +169,10 @@ private void sendReport() { } private void sendReportingItems(ReportingItems reportingItems) { + log.error("sendReportingItems {}", reportingItems.size()); CompletableFuture.runAsync(() -> { - log.info("Send reportingItems to monitor: {}", reportingItems); + log.error("sendReportingItems START {}", reportingItems.size()); + // log.info("Send reportingItems to monitor: {}", reportingItems.size()); // We send the data as hex encoded protobuf data. We do not use the envelope as it is not part of the p2p system. String protobufAsHex = Hex.encode(reportingItems.toProtoMessageAsBytes()); @@ -170,19 +182,29 @@ private void sendReportingItems(ReportingItems reportingItems) { .POST(HttpRequest.BodyPublishers.ofString(protobufAsHex)) .header("User-Agent", "bisq-seed-node/" + Version.VERSION) .build(); + + Thread.sleep(5000); HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); if (response.statusCode() != 200) { log.error("Response error message: {}", response); } } catch (IOException e) { - log.error("IOException at sending reporting", e); + log.warn("IOException at sending reporting. {}", e.getMessage()); } catch (InterruptedException e) { throw new RuntimeException(e); + } finally { + log.error("sendReportingItems COMP {}", reportingItems.size()); + } - }, Utilities.getSingleThreadExecutor("SeedNodeReportingService")); + }, executor); } private ReportingItems getReportingItems() { + // Without the address we cannot report + if (p2PService.getAddress() == null) { + return new ReportingItems(); + } + // Data Map numItemsByType = new HashMap<>(); Stream.concat(p2PDataStorage.getPersistableNetworkPayloadCollection().stream() @@ -239,13 +261,12 @@ private ReportingItems getReportingItems() { if (!initialDataSent) { initialDataSent = true; reportingItems.add(IntegerValueItem.maxConnections.withValue(maxConnections)); - - if (p2PService.getAddress() != null) { - reportingItems.add(StringValueItem.address.withValue(p2PService.getAddress().getFullAddress())); - } reportingItems.add(StringValueItem.version.withValue(Version.VERSION)); Version.findCommitHash().ifPresent(commitHash -> reportingItems.add(StringValueItem.commitHash.withValue(commitHash))); } + + // We send the address always as it's needed for assigning the data on the server + reportingItems.add(StringValueItem.address.withValue(p2PService.getAddress().getFullAddress())); return reportingItems; } }