Skip to content

Commit

Permalink
Add thread pool
Browse files Browse the repository at this point in the history
Signed-off-by: HenrikJannsen <[email protected]>
  • Loading branch information
HenrikJannsen committed Dec 5, 2022
1 parent fa204bb commit 3127649
Showing 1 changed file with 29 additions and 8 deletions.
37 changes: 29 additions & 8 deletions seednode/src/main/java/bisq/seednode/SeedNodeReportingService.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

Expand All @@ -72,7 +73,7 @@
@Singleton
public class SeedNodeReportingService {
private final static long REPORT_DELAY_SEC = TimeUnit.MINUTES.toSeconds(5);
private final static long HEART_BEAT_DELAY_SEC = TimeUnit.SECONDS.toSeconds(10);
private final static long HEART_BEAT_DELAY_SEC = TimeUnit.SECONDS.toSeconds(60);

private final P2PService p2PService;
private final NetworkNode networkNode;
Expand All @@ -90,6 +91,7 @@ public class SeedNodeReportingService {
private Timer reportTimer;
private Timer heartBeatTimer;
private boolean initialDataSent;
private final ThreadPoolExecutor executor;

@Inject
public SeedNodeReportingService(P2PService p2PService,
Expand All @@ -114,6 +116,7 @@ public SeedNodeReportingService(P2PService p2PService,
this.maxConnections = maxConnections;
this.seedNodeReportingServerUrl = seedNodeReportingServerUrl;

executor = Utilities.getThreadPoolExecutor("SeedNodeReportingService", 2, 4, 30);
httpClient = HttpClient.newHttpClient();
daoStateMonitoringService.addListener(new DaoStateMonitoringService.Listener() {
@Override
Expand Down Expand Up @@ -146,10 +149,17 @@ public void shutDown() {
if (reportTimer != null) {
reportTimer.stop();
}

Utilities.shutdownAndAwaitTermination(executor, 1, TimeUnit.SECONDS);
}

private void sendHeartBeat() {
// Without the address we cannot report
if (p2PService.getAddress() == null) {
return;
}
ReportingItems reportingItems = new ReportingItems();
reportingItems.add(StringValueItem.address.withValue(p2PService.getAddress().getFullAddress()));
reportingItems.add(IntegerValueItem.usedMemoryInMB.withValue((int) Profiler.getUsedMemoryInMB()));
sendReportingItems(reportingItems);
}
Expand All @@ -159,8 +169,10 @@ private void sendReport() {
}

private void sendReportingItems(ReportingItems reportingItems) {
log.error("sendReportingItems {}", reportingItems.size());
CompletableFuture.runAsync(() -> {
log.info("Send reportingItems to monitor: {}", reportingItems);
log.error("sendReportingItems START {}", reportingItems.size());
// log.info("Send reportingItems to monitor: {}", reportingItems.size());

// We send the data as hex encoded protobuf data. We do not use the envelope as it is not part of the p2p system.
String protobufAsHex = Hex.encode(reportingItems.toProtoMessageAsBytes());
Expand All @@ -170,19 +182,29 @@ private void sendReportingItems(ReportingItems reportingItems) {
.POST(HttpRequest.BodyPublishers.ofString(protobufAsHex))
.header("User-Agent", "bisq-seed-node/" + Version.VERSION)
.build();

Thread.sleep(5000);
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() != 200) {
log.error("Response error message: {}", response);
}
} catch (IOException e) {
log.error("IOException at sending reporting", e);
log.warn("IOException at sending reporting. {}", e.getMessage());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
log.error("sendReportingItems COMP {}", reportingItems.size());

}
}, Utilities.getSingleThreadExecutor("SeedNodeReportingService"));
}, executor);
}

private ReportingItems getReportingItems() {
// Without the address we cannot report
if (p2PService.getAddress() == null) {
return new ReportingItems();
}

// Data
Map<String, Integer> numItemsByType = new HashMap<>();
Stream.concat(p2PDataStorage.getPersistableNetworkPayloadCollection().stream()
Expand Down Expand Up @@ -239,13 +261,12 @@ private ReportingItems getReportingItems() {
if (!initialDataSent) {
initialDataSent = true;
reportingItems.add(IntegerValueItem.maxConnections.withValue(maxConnections));

if (p2PService.getAddress() != null) {
reportingItems.add(StringValueItem.address.withValue(p2PService.getAddress().getFullAddress()));
}
reportingItems.add(StringValueItem.version.withValue(Version.VERSION));
Version.findCommitHash().ifPresent(commitHash -> reportingItems.add(StringValueItem.commitHash.withValue(commitHash)));
}

// We send the address always as it's needed for assigning the data on the server
reportingItems.add(StringValueItem.address.withValue(p2PService.getAddress().getFullAddress()));
return reportingItems;
}
}

0 comments on commit 3127649

Please sign in to comment.