diff --git a/.gitignore b/.gitignore index 41e974a..3db7f57 100644 --- a/.gitignore +++ b/.gitignore @@ -23,7 +23,6 @@ monitor/work/* /bundles /bisq-* /lib -desktop.ini */target/* *.class deploy @@ -33,3 +32,5 @@ deploy /.run monitor.properties +/bisq-gradle/out/* + diff --git a/build.gradle b/build.gradle index def2aec..edfcba7 100644 --- a/build.gradle +++ b/build.gradle @@ -2,6 +2,7 @@ plugins { id 'application' alias(libs.plugins.openjfx) id 'bisq.post-build' + id 'com.github.johnrengelman.shadow' version '7.1.2' } repositories { @@ -53,6 +54,7 @@ dependencies { annotationProcessor libs.lombok implementation libs.slf4j.api + implementation "com.google.protobuf:protobuf-java:3.19.1" implementation libs.spark.core implementation "com.google.code.gson:gson:2.8.5" implementation("com.github.bisq-network:bitcoinj:42bbae9") { diff --git a/src/main/java/bisq/monitor/AppChooserMain.java b/src/main/java/bisq/monitor/AppChooserMain.java index 0c655dc..9d33a06 100644 --- a/src/main/java/bisq/monitor/AppChooserMain.java +++ b/src/main/java/bisq/monitor/AppChooserMain.java @@ -21,6 +21,7 @@ import bisq.monitor.dump.DataDumpMain; import bisq.monitor.monitor.MonitorMain; import bisq.monitor.server.ServerMain; +import bisq.monitor.utils.PropertiesUtil; import lombok.extern.slf4j.Slf4j; import java.util.List; diff --git a/src/main/java/bisq/monitor/dump/handlers/TradeStatisticsHandler.java b/src/main/java/bisq/monitor/dump/handlers/TradeStatisticsHandler.java index 8e98fdd..37533d3 100644 --- a/src/main/java/bisq/monitor/dump/handlers/TradeStatisticsHandler.java +++ b/src/main/java/bisq/monitor/dump/handlers/TradeStatisticsHandler.java @@ -24,7 +24,7 @@ import bisq.core.util.FormattingUtils; import bisq.core.util.VolumeUtil; import bisq.monitor.dump.ReporterProvider; -import bisq.monitor.reporter.Metric; +import bisq.monitor.reporter.Metrics; import bisq.monitor.reporter.Reporter; import bisq.network.p2p.storage.P2PDataStorage; import javafx.collections.SetChangeListener; @@ -68,35 +68,35 @@ public void onAllServicesInitialized() { tradeStatisticsManager.getObservableTradeStatisticsSet().addListener((SetChangeListener) change -> { TradeStatistics3 newItem = change.getElementAdded(); if (isNotProcessed(newItem)) { - Set reportItems = toMetrics(newItem); + Set reportItems = toMetrics(newItem); sendReports(reportItems); } }); } - private Set toMetrics(TradeStatistics3 tradeStatistics) { + private Set toMetrics(TradeStatistics3 tradeStatistics) { alreadyProcessed.add(new P2PDataStorage.ByteArray(tradeStatistics.getHash())); - Set metrics = new HashSet<>(); + Set metrics = new HashSet<>(); long timeStampInSec = tradeStatistics.getDateAsLong() / 1000; String market = CurrencyUtil.getCurrencyPair(tradeStatistics.getCurrency()).replace("/", "_"); String path = PREFIX + "." + "price" + "." + market; String value = FormattingUtils.formatPrice(tradeStatistics.getTradePrice()); - metrics.add(new Metric(path, value, timeStampInSec)); + metrics.add(new Metrics(path, value, timeStampInSec)); path = PREFIX + "." + "amount" + "." + market; value = new MainNetParams().getMonetaryFormat().noCode().format(tradeStatistics.getTradeAmount()).toString(); - metrics.add(new Metric(path, value, timeStampInSec)); + metrics.add(new Metrics(path, value, timeStampInSec)); path = PREFIX + "." + "volume" + "." + market; value = VolumeUtil.formatVolume(tradeStatistics.getTradeVolume()); - metrics.add(new Metric(path, value, timeStampInSec)); + metrics.add(new Metrics(path, value, timeStampInSec)); return metrics; } - private void sendReports(Set reportItems) { + private void sendReports(Set reportItems) { log.error(reportItems.toString()); reportItems.forEach(reporter::report); } diff --git a/src/main/java/bisq/monitor/monitor/CompletableFutureUtil.java b/src/main/java/bisq/monitor/monitor/CompletableFutureUtil.java new file mode 100644 index 0000000..1e7a840 --- /dev/null +++ b/src/main/java/bisq/monitor/monitor/CompletableFutureUtil.java @@ -0,0 +1,57 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.monitor.monitor; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +//todo +public class CompletableFutureUtil { + public static CompletableFuture> allOf(Collection> collection) { + //noinspection unchecked + return allOf(collection.toArray(new CompletableFuture[0])); + } + + public static CompletableFuture> allOf(Stream> stream) { + return allOf(stream.collect(Collectors.toList())); + } + + public static CompletableFuture> allOf(CompletableFuture... list) { + CompletableFuture> result = CompletableFuture.allOf(list).thenApply(v -> + Stream.of(list) + .map(future -> { + // We want to return the results in list, not the futures. Once allOf call is complete + // we know that all futures have completed (normally, exceptional or cancelled). + // For exceptional and canceled cases we throw an exception. + T res = future.join(); + if (future.isCompletedExceptionally()) { + throw new RuntimeException((future.handle((r, throwable) -> throwable).join())); + } + if (future.isCancelled()) { + throw new RuntimeException("Future got canceled"); + } + return res; + }) + .collect(Collectors.toList()) + ); + return result; + } +} \ No newline at end of file diff --git a/src/main/java/bisq/monitor/monitor/Monitor.java b/src/main/java/bisq/monitor/monitor/Monitor.java index eec86d9..949e3e4 100644 --- a/src/main/java/bisq/monitor/monitor/Monitor.java +++ b/src/main/java/bisq/monitor/monitor/Monitor.java @@ -17,40 +17,37 @@ package bisq.monitor.monitor; -import bisq.monitor.monitor.tasks.tornetwork.TorStartupTime; +import bisq.monitor.monitor.tasks.TorConnectionTime; import bisq.monitor.reporter.Reporter; import lombok.extern.slf4j.Slf4j; -import org.berndpruenster.netlayer.tor.Tor; import java.io.File; -import java.util.ArrayList; -import java.util.List; import java.util.Properties; -import java.util.concurrent.CompletableFuture; -/** - * Monitor executable for the Bisq network. - * - * @author Florian Reimair - */ @Slf4j public class Monitor { + private final MonitorTaskRunner monitorTaskRunner; - public Monitor(File appDir, Properties properties, Reporter reporter) { - List monitorTasks = new ArrayList<>(); - monitorTasks.add(new TorStartupTime(properties, appDir, reporter)); - monitorTasks.forEach(MonitorTask::init); - } + public Monitor(Properties properties, Reporter reporter, File appDir) { + monitorTaskRunner = new MonitorTaskRunner(); - public CompletableFuture shutDown() { - return CompletableFuture.runAsync(() -> { - MonitorTask.haltAllMetrics(); + monitorTaskRunner.add(new TorConnectionTime(properties, reporter, appDir, false)); + monitorTaskRunner.add(new TorConnectionTime(properties, reporter, appDir, true)); + + /* monitorTaskRunner.add(new TorStartupTime(properties, reporter, appDir)); + monitorTaskRunner.add(new TorHiddenServiceStartupTime(properties, reporter)); + monitorTaskRunner.add(new PriceNodeData(properties, reporter)); + monitorTaskRunner.add(new TorConnectionTime(properties, reporter, appDir, false)); + monitorTaskRunner.add(new SeedNodeRoundTripTime(properties, reporter, false)); + + // We use restart tor to get for the next SeedNodeRoundTripTime again new connections with a new tor instance + monitorTaskRunner.add(new TorConnectionTime(properties, reporter, appDir, true)); + monitorTaskRunner.add(new SeedNodeRoundTripTime(properties, reporter, true));*/ + + monitorTaskRunner.start(); + } - log.info("shutting down tor..."); - Tor tor = Tor.getDefault(); - if (tor != null) { - tor.shutdown(); - } - }); + public void shutDown() { + monitorTaskRunner.shutDown(); } } diff --git a/src/main/java/bisq/monitor/monitor/utils/MonitorHttpClient.java b/src/main/java/bisq/monitor/monitor/MonitorHttpClient.java similarity index 99% rename from src/main/java/bisq/monitor/monitor/utils/MonitorHttpClient.java rename to src/main/java/bisq/monitor/monitor/MonitorHttpClient.java index 70cbbe9..ff45b47 100644 --- a/src/main/java/bisq/monitor/monitor/utils/MonitorHttpClient.java +++ b/src/main/java/bisq/monitor/monitor/MonitorHttpClient.java @@ -15,7 +15,7 @@ * along with Bisq. If not, see . */ -package bisq.monitor.monitor.utils; +package bisq.monitor.monitor; import com.runjva.sourceforge.jsocks.protocol.Socks5Proxy; import com.runjva.sourceforge.jsocks.protocol.SocksException; diff --git a/src/main/java/bisq/monitor/monitor/MonitorMain.java b/src/main/java/bisq/monitor/monitor/MonitorMain.java index 9ccba6b..79cd22b 100644 --- a/src/main/java/bisq/monitor/monitor/MonitorMain.java +++ b/src/main/java/bisq/monitor/monitor/MonitorMain.java @@ -20,16 +20,16 @@ import bisq.common.UserThread; import bisq.common.app.Log; +import bisq.common.app.Version; +import bisq.common.config.BaseCurrencyNetwork; import bisq.common.config.Config; import bisq.common.util.Utilities; +import bisq.core.locale.Res; import bisq.core.setup.CoreNetworkCapabilities; -import bisq.monitor.PropertiesUtil; -import bisq.monitor.monitor.tor.AvailableTor; -import bisq.monitor.monitor.tor.TorNode; import bisq.monitor.reporter.ConsoleReporter; import bisq.monitor.reporter.GraphiteReporter; import bisq.monitor.reporter.Reporter; -import bisq.monitor.server.Server; +import bisq.monitor.utils.PropertiesUtil; import ch.qos.logback.classic.Level; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; @@ -46,13 +46,10 @@ @Slf4j public class MonitorMain { private static boolean stopped; - private static Monitor monitor; - @Nullable - private static Server server; @Nullable private static TorNode torNode; - @Nullable - private static TorBasedMonitor torBasedMonitor; + private static Monitor monitor; + private static File appDir; /** * @param args Can be empty or is property file path @@ -65,75 +62,69 @@ public static void main(String[] args) { properties = PropertiesUtil.getProperties(args[0].replace("--config=", "")); } - String appName = properties.getProperty("appDir"); - File appDir = new File(Utilities.getUserDataDir(), appName); - if (!appDir.exists() && !appDir.mkdir()) { - log.warn("make appDir failed"); - } - setup(appDir); - CoreNetworkCapabilities.setSupportedCapabilities(new Config()); + setup(properties); Reporter reporter = "true".equals(properties.getProperty("GraphiteReporter.enabled", "false")) ? new GraphiteReporter(properties) : new ConsoleReporter(); - boolean useTor = properties.getProperty("useTor").equals("true"); - boolean useServer = properties.getProperty("useServer").equals("true"); + boolean useLocalhost = "true".equals(properties.getProperty("useLocalhost")); + boolean useTor = "true".equals(properties.getProperty("useTor")); CompletableFuture.runAsync(() -> { - monitor = new Monitor(appDir, properties, reporter); - }, Utilities.getSingleThreadExecutor("Monitor")) - .thenRunAsync(() -> { - if (useServer) { - server = new Server(properties, reporter); - } - }, Utilities.getSingleThreadExecutor("Server")) - .thenRunAsync(() -> { - if (useTor) { - AvailableTor.setAppDir(appDir); - torNode = new TorNode(appDir); - } - }, Utilities.getSingleThreadExecutor("TorNode")) - .thenRunAsync(() -> { - if (useTor) { - torBasedMonitor = new TorBasedMonitor(properties, reporter); - } - }, Utilities.getSingleThreadExecutor("TorBasedMonitor")); + if (useTor && !useLocalhost) { + long ts = System.currentTimeMillis(); + torNode = new TorNode(appDir); + log.info("Starting tor took {} ms", System.currentTimeMillis() - ts); + } + monitor = new Monitor(properties, reporter, appDir); + }, Utilities.getSingleThreadExecutor("Monitor")); keepRunning(); } - public static void setup(File appDir) { + public static void setup(Properties properties) { + String appName = properties.getProperty("appDir"); + appDir = new File(Utilities.getUserDataDir(), appName); + if (!appDir.exists() && !appDir.mkdir()) { + log.warn("make appDir failed"); + } + String logPath = Paths.get(appDir.getPath(), "bisq").toString(); Log.setup(logPath); Log.setLevel(Level.INFO); + Config config = new Config(); + CoreNetworkCapabilities.setSupportedCapabilities(config); + BaseCurrencyNetwork baseCurrencyNetwork = BaseCurrencyNetwork.valueOf(properties.getProperty("baseCurrencyNetwork", "BTC_REGTEST")); + Version.setBaseCryptoNetworkId(baseCurrencyNetwork.ordinal()); + Res.setup(); + ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat(MonitorMain.class.getSimpleName()) .setDaemon(true) .build(); UserThread.setExecutor(Executors.newSingleThreadExecutor(threadFactory)); - Signal.handle(new Signal("INT"), signal -> UserThread.execute(MonitorMain::shutDown)); - Signal.handle(new Signal("TERM"), signal -> UserThread.execute(MonitorMain::shutDown)); + Signal.handle(new Signal("INT"), signal -> { + UserThread.execute(MonitorMain::shutDown); + }); + + Signal.handle(new Signal("TERM"), signal -> { + UserThread.execute(MonitorMain::shutDown); + }); - Runtime.getRuntime().addShutdownHook(new Thread(MonitorMain::shutDown, "Shutdown Hook")); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + UserThread.execute(MonitorMain::shutDown); + }, "Shutdown Hook")); } public static void shutDown() { stopped = true; - if (monitor == null) { - System.exit(0); - return; - } - - monitor.shutDown() + CompletableFuture.runAsync(() -> { + }) .thenRun(() -> { - if (server != null) { - server.shutDown(); - } - }).thenRun(() -> { - if (torBasedMonitor != null) { - torBasedMonitor.shutDown(); + if (monitor != null) { + monitor.shutDown(); } }) .thenRun(() -> { @@ -141,14 +132,18 @@ public static void shutDown() { torNode.shutDown(); } }) - .thenRun(() -> System.exit(0)); + .thenRun(() -> { + System.exit(0); + }); } public static void keepRunning() { while (!stopped) { try { Thread.sleep(Long.MAX_VALUE); + log.error("saf"); } catch (InterruptedException ignore) { + log.error(""); } } } diff --git a/src/main/java/bisq/monitor/monitor/MonitorTask.java b/src/main/java/bisq/monitor/monitor/MonitorTask.java index 6b63e83..f84704f 100644 --- a/src/main/java/bisq/monitor/monitor/MonitorTask.java +++ b/src/main/java/bisq/monitor/monitor/MonitorTask.java @@ -17,108 +17,78 @@ package bisq.monitor.monitor; -import bisq.common.app.Version; -import bisq.common.config.BaseCurrencyNetwork; -import bisq.common.util.Utilities; -import bisq.core.locale.Res; -import bisq.monitor.monitor.utils.Configurable; import bisq.monitor.reporter.Reporter; +import bisq.network.p2p.NodeAddress; import lombok.extern.slf4j.Slf4j; +import org.berndpruenster.netlayer.tor.TorSocket; +import java.io.IOException; +import java.net.Socket; import java.util.Properties; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; @Slf4j -public abstract class MonitorTask extends Configurable implements Runnable { - private static final String INTERVAL = "interval"; - private static ScheduledExecutorService executor; +public abstract class MonitorTask { + protected final Properties properties; protected final Reporter reporter; - private ScheduledFuture scheduler; - - protected MonitorTask(Properties properties, Reporter reporter) { - super.configure(properties); + protected final boolean runSerial; + protected final long interval; + private final boolean enabled; + protected long lastRunTs; + public MonitorTask(Properties properties, Reporter reporter, boolean runSerial) { + this.properties = properties; this.reporter = reporter; - reporter.configure(properties); - if (executor == null) { - executor = new ScheduledThreadPoolExecutor(6); + String className = getClass().getSimpleName(); + interval = Integer.parseInt(properties.getProperty(className + ".interval", "600")) * 1000L; + String runSerialFromProperties = properties.getProperty(className + ".runSerial", ""); + if ("".equals(runSerialFromProperties)) { + this.runSerial = runSerial; + } else { + this.runSerial = "true".equals(runSerialFromProperties); } - BaseCurrencyNetwork baseCurrencyNetwork = BaseCurrencyNetwork.valueOf(properties.getProperty("baseCurrencyNetwork", "BTC_REGTEST")); - Version.setBaseCryptoNetworkId(baseCurrencyNetwork.ordinal()); - Res.setup(); - } - - public void init() { - // decide whether to enable or disable the task - boolean isEnabled = configuration.getProperty("enabled", "false").equals("true"); - if (configuration.isEmpty() || !isEnabled || !configuration.containsKey(INTERVAL)) { - stop(); - - // some informative log output - if (configuration.isEmpty()) - log.error("{} is not configured at all. Will not run.", getName()); - else if (!isEnabled) - log.debug("{} is deactivated. Will not run.", getName()); - else if (!configuration.containsKey(INTERVAL)) - log.error("{} is missing mandatory '" + INTERVAL + "' property. Will not run.", getName()); - else - log.error("{} is mis-configured. Will not run.", getName()); - } else if (!started()) { - // check if this Metric got activated after being disabled. - // if so, resume execution - start(); - log.info("{} started", getName()); - } + enabled = "true".equals(properties.getProperty(getClass().getSimpleName() + ".enabled", "false")); } - private void stop() { - if (scheduler != null) - scheduler.cancel(false); + protected String getName() { + String postFix = runSerial ? ".serial" : ".parallel"; + return getClass().getSimpleName() + postFix; } - private void start() { - scheduler = executor.scheduleWithFixedDelay(this, 0, - Long.parseLong(configuration.getProperty(INTERVAL)), TimeUnit.SECONDS); - } + public abstract void run(); - boolean started() { - if (scheduler != null) - return !scheduler.isCancelled(); - else + public boolean canRun() { + if (!enabled) { return false; + } + if (lastRunTs > System.currentTimeMillis() - interval) { + log.info("Skip {} because we have not passed our interval time", getName()); + return false; + } + + lastRunTs = System.currentTimeMillis(); + log.info("Run task '{}'", getName()); + return true; } - @Override - public void run() { - try { - Thread.currentThread().setName("MonitorTask: " + getName()); + protected String getAddressForMetric(NodeAddress nodeAddress) { + return nodeAddress.getHostName().contains(".onion") ? + nodeAddress.getHostNameWithoutPostFix() : + nodeAddress.getFullAddress() + .replace("http://", "") + .replace("https://", ""); + } - // execute all the things - synchronized (this) { - log.info("{} started", getName()); - execute(); - log.info("{} done", getName()); - } - } catch (Throwable e) { - log.error("Error at executing monitor task", e); + protected Socket getSocket(NodeAddress nodeAddress) throws IOException { + String hostName = nodeAddress.getHostName(); + if (hostName.contains(".onion")) { + return new TorSocket(hostName, nodeAddress.getPort(), null); + } else { + return new Socket(hostName, nodeAddress.getPort()); } } - /** - * Gets scheduled repeatedly. - */ - protected abstract void execute(); - - /** - * initiate an orderly shutdown on all metrics. Blocks until all metrics are - * shut down or after one minute. - */ - public static void haltAllMetrics() { - Utilities.shutdownAndAwaitTermination(executor, 5, TimeUnit.SECONDS); + public void shutDown() { } -} +} \ No newline at end of file diff --git a/src/main/java/bisq/monitor/monitor/MonitorTaskRunner.java b/src/main/java/bisq/monitor/monitor/MonitorTaskRunner.java new file mode 100644 index 0000000..4c26916 --- /dev/null +++ b/src/main/java/bisq/monitor/monitor/MonitorTaskRunner.java @@ -0,0 +1,85 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.monitor.monitor; + +import bisq.common.util.Utilities; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * We execute all tasks serial. Once all tasks are completed we check if we have passed the min. interval time and if + * so we repeat running all tasks. It can be that tasks have their own interval time not passed and skip execution. + * If we are too fast we put the thread on sleep for the interval time to not execute the while loop too often. + * There is no guarantee for tasks that their interval gets met if the sum of all other tasks take longer. + * But there is a guarantee that the task will not execute faster as the defined interval. + * The reason why we do not want to have parallel execution here is that tasks could interfere each other's results + * if Tor network get more load from parallel tasks. We prefer to have the tasks in isolated conditions to get more + * reliable results. + */ +@Slf4j +public class MonitorTaskRunner { + private final List monitorTasks = new ArrayList<>(); + private final long interval = 5000; + private long lastRunTs; + private volatile boolean stopped; + + public MonitorTaskRunner() { + } + + public void add(MonitorTask monitorTask) { + monitorTasks.add(monitorTask); + } + + public void start() { + CompletableFuture.runAsync(() -> { + while (!stopped) { + if (lastRunTs > System.currentTimeMillis() - interval) { + log.info("We iterate the loop too fast. We put the thread on sleep and try again to see if any " + + "task is ready to run again"); + try { + Thread.sleep(interval); + } catch (InterruptedException ignore) { + } + } + + lastRunTs = System.currentTimeMillis(); + log.info("Start to run all tasks"); + // Each task run call is blocking until completed. + // Once a task is completed we are ready for the next task + monitorTasks.forEach(task -> { + try { + if (task.canRun()) { + task.run(); + } + } catch (Throwable t) { + log.error("Error at task {}. Error: {}", task.getClass().getSimpleName(), t.getMessage()); + } + }); + log.info("All tasks have been completed"); + } + }, Utilities.getSingleThreadExecutor("MonitorTaskRunner-loop")); + } + + public void shutDown() { + stopped = true; + monitorTasks.forEach(MonitorTask::shutDown); + } +} \ No newline at end of file diff --git a/src/main/java/bisq/monitor/monitor/TorBasedMonitor.java b/src/main/java/bisq/monitor/monitor/TorBasedMonitor.java deleted file mode 100644 index cbf7568..0000000 --- a/src/main/java/bisq/monitor/monitor/TorBasedMonitor.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * This file is part of Bisq. - * - * Bisq is free software: you can redistribute it and/or modify it - * under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or (at - * your option) any later version. - * - * Bisq is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public - * License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with Bisq. If not, see . - */ - -package bisq.monitor.monitor; - -import bisq.monitor.monitor.tasks.bisqnetwork.P2PMarketStats; -import bisq.monitor.monitor.tasks.bisqnetwork.P2PNetworkLoad; -import bisq.monitor.monitor.tasks.pricenode.PriceNodeData; -import bisq.monitor.monitor.tasks.seed.SeedNodeRoundTripTime; -import bisq.monitor.monitor.tasks.tornetwork.TorConnectionTime; -import bisq.monitor.monitor.tasks.tornetwork.TorHiddenServiceStartupTime; -import bisq.monitor.reporter.Reporter; -import bisq.monitor.server.Util; -import lombok.extern.slf4j.Slf4j; -import org.berndpruenster.netlayer.tor.Tor; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -@Slf4j -public class TorBasedMonitor { - - public TorBasedMonitor(Properties properties, Reporter reporter) { - Map seedNodeOperatorByAddress = Util.getOperatorByNodeAddress(properties.getProperty("baseCurrencyNetwork")); - List monitorTasks = new ArrayList<>(); - monitorTasks.add(new TorConnectionTime(properties, reporter)); - monitorTasks.add(new TorHiddenServiceStartupTime(properties, reporter)); - monitorTasks.add(new PriceNodeData(properties, reporter)); - monitorTasks.add(new SeedNodeRoundTripTime(properties, reporter, seedNodeOperatorByAddress)); - monitorTasks.add(new P2PNetworkLoad(properties, reporter)); - monitorTasks.add(new P2PMarketStats(properties, reporter, seedNodeOperatorByAddress)); - monitorTasks.forEach(MonitorTask::init); - } - - public void shutDown() { - MonitorTask.haltAllMetrics(); - - log.info("shutting down tor..."); - Tor tor = Tor.getDefault(); - if (tor != null) { - tor.shutdown(); - } - } -} diff --git a/src/main/java/bisq/monitor/monitor/tor/TorNode.java b/src/main/java/bisq/monitor/monitor/TorNode.java similarity index 79% rename from src/main/java/bisq/monitor/monitor/tor/TorNode.java rename to src/main/java/bisq/monitor/monitor/TorNode.java index 5825b38..ab74c41 100644 --- a/src/main/java/bisq/monitor/monitor/tor/TorNode.java +++ b/src/main/java/bisq/monitor/monitor/TorNode.java @@ -15,7 +15,7 @@ * along with Bisq. If not, see . */ -package bisq.monitor.monitor.tor; +package bisq.monitor.monitor; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -24,6 +24,7 @@ import org.berndpruenster.netlayer.tor.TorCtlException; import java.io.File; +import java.util.concurrent.CompletableFuture; @Slf4j public class TorNode { @@ -41,10 +42,13 @@ public TorNode(File appDir) { } } - public void shutDown() { - Tor tor = Tor.getDefault(); - if (tor != null) { - tor.shutdown(); - } + public CompletableFuture shutDown() { + return CompletableFuture.runAsync(() -> { + log.info("shutting down tor..."); + Tor tor = Tor.getDefault(); + if (tor != null) { + tor.shutdown(); + } + }); } } \ No newline at end of file diff --git a/src/main/java/bisq/monitor/monitor/tasks/pricenode/PriceNodeData.java b/src/main/java/bisq/monitor/monitor/tasks/PriceNodeData.java similarity index 62% rename from src/main/java/bisq/monitor/monitor/tasks/pricenode/PriceNodeData.java rename to src/main/java/bisq/monitor/monitor/tasks/PriceNodeData.java index 339afe1..64dcc27 100644 --- a/src/main/java/bisq/monitor/monitor/tasks/pricenode/PriceNodeData.java +++ b/src/main/java/bisq/monitor/monitor/tasks/PriceNodeData.java @@ -15,15 +15,13 @@ * along with bisq. If not, see . */ -package bisq.monitor.monitor.tasks.pricenode; +package bisq.monitor.monitor.tasks; import bisq.core.provider.ProvidersRepository; +import bisq.monitor.monitor.MonitorHttpClient; import bisq.monitor.monitor.MonitorTask; -import bisq.monitor.monitor.tor.OnionParser; -import bisq.monitor.monitor.utils.MonitorHttpClient; -import bisq.monitor.reporter.Metric; +import bisq.monitor.reporter.Metrics; import bisq.monitor.reporter.Reporter; -import bisq.network.p2p.NodeAddress; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.runjva.sourceforge.jsocks.protocol.Socks5Proxy; @@ -44,65 +42,88 @@ public class PriceNodeData extends MonitorTask { private final Set priceNodes = new HashSet<>(); public PriceNodeData(Properties properties, Reporter reporter) { - super(properties, reporter); + super(properties, reporter, true); - String hosts = configuration.getProperty("hosts", ""); + String hosts = properties.getProperty("PriceNodeData.hosts", ""); if (hosts == null || hosts.isEmpty()) { - priceNodes.addAll(ProvidersRepository.DEFAULT_NODES.stream().map(e -> e.replace("/", "")).collect(Collectors.toSet())); + priceNodes.addAll(ProvidersRepository.DEFAULT_NODES.stream() + .map(address -> { + if (address.endsWith("/")) { + return address.substring(0, address.length() - 1); + } else { + return address; + } + }) + .collect(Collectors.toSet())); } else { priceNodes.addAll(List.of(hosts.split(","))); } priceNodes.add("https://price.bisq.wiz.biz"); excluded.add("NON_EXISTING_SYMBOL"); - excluded.addAll(Set.of(configuration.getProperty("excluded", "").split(","))); + excluded.addAll(Set.of(properties.getProperty("PriceNodeData.excluded", "").split(","))); } @Override - protected void execute() { + protected String getName() { + return getClass().getSimpleName(); + } + + @Override + public void run() { try { Socks5Proxy proxy = priceNodes.toString().contains(".onion") ? Objects.requireNonNull(Tor.getDefault()).getProxy() : null; - for (String address : priceNodes) { + priceNodes.forEach(address -> { + log.info("Send request to '{}'", address); try { if (address.contains(".onion")) { - NodeAddress nodeAddress = OnionParser.getNodeAddress(address); - MonitorHttpClient httpClient = MonitorHttpClient.config(nodeAddress.getHostName(), nodeAddress.getPort(), proxy); - String host = nodeAddress.getHostNameWithoutPostFix(); + address = getAddressWithoutProtocol(address); + MonitorHttpClient httpClient = MonitorHttpClient.config(address, 80, proxy); + String host = address.replace(".onion", ""); reportFees(host, httpClient.getWithTor("/getFees/")); reportPrices(host, httpClient.getWithTor("/getAllMarketPrices/")); } else { MonitorHttpClient httpClient = MonitorHttpClient.config(address); - NodeAddress nodeAddress = new NodeAddress(address); - String host = nodeAddress.getHostNameWithoutPostFix(); - reportFees(host, httpClient.get("/getFees")); - reportPrices(host, httpClient.get("/getAllMarketPrices")); + address = getAddressWithoutProtocol(address); + reportFees(address, httpClient.get("/getFees")); + reportPrices(address, httpClient.get("/getAllMarketPrices")); } } catch (IOException e) { log.error(e.toString()); + reporter.report(new Metrics(getName() + ".price.error." + getAddressWithoutProtocol(address), -1)); } - } + }); } catch (TorCtlException e) { - e.printStackTrace(); + log.error(e.toString()); } } + @Override + public void shutDown() { + super.shutDown(); + } + private void reportFees(String address, String json) { String btcTxFee = new JsonParser().parse(json).getAsJsonObject() .get("dataMap").getAsJsonObject() .get("btcTxFee").getAsString(); - reporter.report(new Metric(getName(), "fee." + address, btcTxFee)); + reporter.report(new Metrics(getName(), "fee." + address, btcTxFee)); } - private void reportPrices(String host, String json) { + private void reportPrices(String address, String json) { new JsonParser().parse(json).getAsJsonObject().get("data").getAsJsonArray() .forEach(item -> { JsonObject priceItem = item.getAsJsonObject(); String currencyCode = priceItem.get("currencyCode").getAsString(); if (!excluded.contains(currencyCode)) { String price = String.format("%.12f", priceItem.get("price").getAsDouble()); - reporter.report(new Metric(getName() + ".price." + host, currencyCode, price)); + reporter.report(new Metrics(getName() + ".price." + address, currencyCode, price)); } }); } + + private String getAddressWithoutProtocol(String address) { + return address.replace("http://", "").replace("https://", ""); + } } diff --git a/src/main/java/bisq/monitor/monitor/tasks/SeedNodeRoundTripTime.java b/src/main/java/bisq/monitor/monitor/tasks/SeedNodeRoundTripTime.java new file mode 100644 index 0000000..3283ad8 --- /dev/null +++ b/src/main/java/bisq/monitor/monitor/tasks/SeedNodeRoundTripTime.java @@ -0,0 +1,134 @@ +/* + * This file is part of Bisq. + * + * bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with bisq. If not, see . + */ + +package bisq.monitor.monitor.tasks; + +import bisq.common.proto.network.NetworkEnvelope; +import bisq.core.proto.network.CoreNetworkProtoResolver; +import bisq.monitor.monitor.CompletableFutureUtil; +import bisq.monitor.monitor.MonitorTask; +import bisq.monitor.reporter.Metrics; +import bisq.monitor.reporter.Reporter; +import bisq.monitor.utils.Util; +import bisq.network.p2p.NodeAddress; +import bisq.network.p2p.peers.keepalive.messages.Ping; +import bisq.network.p2p.peers.keepalive.messages.Pong; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import java.time.Clock; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +/** + * Opens socket connections to all seed nodes sends a Ping message and report the round trip time when we get back the Pong. + */ +@Slf4j +public class SeedNodeRoundTripTime extends MonitorTask { + private static final int SOCKET_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(60); + + private final Map startTimeByNonce = new HashMap<>(); + private final CoreNetworkProtoResolver networkProtoResolver; + private final List> allFutures = new ArrayList<>(); + private final Set addresses; + + public SeedNodeRoundTripTime(Properties properties, Reporter reporter, boolean runSerial) { + super(properties, reporter, runSerial); + + addresses = Util.getOperatorByNodeAddress(properties.getProperty("baseCurrencyNetwork")).keySet(); + networkProtoResolver = new CoreNetworkProtoResolver(Clock.systemDefaultZone()); + } + + @Override + public void run() { + allFutures.clear(); + addresses.forEach(address -> { + log.info("Send request to '{}'", address); + NodeAddress nodeAddress = new NodeAddress(address); + int nonce = new Random().nextInt(); + startTimeByNonce.put(nodeAddress.getFullAddress() + nonce, System.currentTimeMillis()); + NetworkEnvelope request = new Ping(nonce, 0); + + CompletableFuture future = sendMessage(nodeAddress, request) + .whenComplete((response, throwable) -> { + if (throwable == null) { + log.info("Received '{}' from '{}' for request '{}'", response, address, request); + if (response instanceof Pong) { + Pong pong = (Pong) response; + String key = nodeAddress.getFullAddress() + pong.getRequestNonce(); + if (startTimeByNonce.containsKey(key)) { + long startTime = startTimeByNonce.get(key); + long rrt = System.currentTimeMillis() - startTime; + reporter.report(new Metrics(getName() + "." + getAddressForMetric(nodeAddress), rrt)); + } + } + } else { + reporter.report(new Metrics(getName() + "." + getAddressForMetric(nodeAddress), -1)); + } + }); + if (runSerial) { + try { + // join throws an (unchecked) exception if completed exceptionally. + // We use it to enforce completing in serial use case and ignore the exception as it got + // handled already at whenComplete. + future.join(); + } catch (Throwable ignore) { + } + } else { + allFutures.add(future); + } + }); + + // If we used parallel mode we wait until all futures have completed before we return to caller to + // execute next task. + if (!runSerial) { + try { + CompletableFutureUtil.allOf(allFutures).join(); + } catch (Throwable ignore) { + } + } + } + + + @Override + public void shutDown() { + super.shutDown(); + allFutures.forEach(future -> future.cancel(true)); + } + + private CompletableFuture sendMessage(NodeAddress nodeAddress, NetworkEnvelope request) { + return CompletableFuture.supplyAsync(() -> { + try (Socket socket = getSocket(nodeAddress)) { + socket.setSoTimeout(SOCKET_TIMEOUT); + OutputStream outputStream = socket.getOutputStream(); + protobuf.NetworkEnvelope requestProto = request.toProtoNetworkEnvelope(); + requestProto.writeDelimitedTo(outputStream); + outputStream.flush(); + + // Wait blocking for response + protobuf.NetworkEnvelope responseProto = protobuf.NetworkEnvelope.parseDelimitedFrom(socket.getInputStream()); + return networkProtoResolver.fromProto(responseProto); + } catch (IOException e) { + log.error("Error when sending {} to {}. Exception: {}", request, nodeAddress, e.getMessage()); + throw new RuntimeException(e); + } + }); + } +} diff --git a/src/main/java/bisq/monitor/monitor/tasks/TorBasedMonitorTask.java b/src/main/java/bisq/monitor/monitor/tasks/TorBasedMonitorTask.java deleted file mode 100644 index a23faf1..0000000 --- a/src/main/java/bisq/monitor/monitor/tasks/TorBasedMonitorTask.java +++ /dev/null @@ -1,221 +0,0 @@ -/* - * This file is part of Bisq. - * - * bisq is free software: you can redistribute it and/or modify it - * under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or (at - * your option) any later version. - * - * bisq is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public - * License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with bisq. If not, see . - */ - -package bisq.monitor.monitor.tasks; - -import bisq.common.app.Version; -import bisq.common.config.BaseCurrencyNetwork; -import bisq.common.persistence.PersistenceManager; -import bisq.common.proto.network.NetworkEnvelope; -import bisq.core.account.witness.AccountAgeWitnessStore; -import bisq.core.proto.network.CoreNetworkProtoResolver; -import bisq.core.proto.persistable.CorePersistenceProtoResolver; -import bisq.core.trade.statistics.TradeStatistics3Store; -import bisq.monitor.monitor.MonitorTask; -import bisq.monitor.monitor.tor.AvailableTor; -import bisq.monitor.monitor.tor.OnionParser; -import bisq.monitor.monitor.utils.ThreadGate; -import bisq.monitor.reporter.Reporter; -import bisq.network.p2p.CloseConnectionMessage; -import bisq.network.p2p.NodeAddress; -import bisq.network.p2p.network.Connection; -import bisq.network.p2p.network.MessageListener; -import bisq.network.p2p.network.NetworkNode; -import bisq.network.p2p.network.TorNetworkNode; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.SettableFuture; -import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; - -import java.io.File; -import java.time.Clock; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; - -/** - * Contacts a list of hosts and asks them for all the data excluding persisted messages. The - * answers are then compiled into buckets of message types. Based on these - * buckets, the Metric reports (for each host) the message types observed and - * their number. - * - * @author Florian Reimair - */ -@Slf4j -public abstract class TorBasedMonitorTask extends MonitorTask implements MessageListener { - private static final String HOSTS = "hosts"; - private static final String TOR_PROXY_PORT = "run.torProxyPort"; - private static final String DATABASE_DIR = "run.dbDir"; - - protected final Map> bucketsPerHost = new ConcurrentHashMap<>(); - private final ThreadGate gate = new ThreadGate(); - protected final Set hashes = new TreeSet<>(Arrays::compare); - protected final Map seedNodeOperatorByAddress; - private final List seedNodes; - - /** - * Statistics Interface for use with derived classes. - * - * @param the value type of the statistics implementation - */ - public abstract static class Statistics { - protected final Map buckets = new HashMap<>(); - - abstract public void log(Object message); - - public Map values() { - return buckets; - } - - public void reset() { - buckets.clear(); - } - } - - public TorBasedMonitorTask(Properties properties, Reporter reporter, Map seedNodeOperatorByAddress) { - super(properties, reporter); - this.seedNodeOperatorByAddress = seedNodeOperatorByAddress; - seedNodes = new ArrayList<>(seedNodeOperatorByAddress.keySet()); - } - - @Override - public void configure(Properties properties) { - super.configure(properties); - - if (hashes.isEmpty() && configuration.getProperty(DATABASE_DIR) != null) { - File dir = new File(configuration.getProperty(DATABASE_DIR)); - String networkPostfix = "_" + BaseCurrencyNetwork.values()[Version.getBaseCurrencyNetwork()].toString(); - try { - CorePersistenceProtoResolver persistenceProtoResolver = new CorePersistenceProtoResolver(null, null); - - //TODO will not work with historical data... should be refactored to re-use code for reading resource files - TradeStatistics3Store tradeStatistics3Store = new TradeStatistics3Store(); - PersistenceManager tradeStatistics3PersistenceManager = new PersistenceManager<>(dir, - persistenceProtoResolver, null); - tradeStatistics3PersistenceManager.initialize(tradeStatistics3Store, - tradeStatistics3Store.getDefaultStorageFileName() + networkPostfix, - PersistenceManager.Source.NETWORK); - TradeStatistics3Store persistedTradeStatistics3Store = tradeStatistics3PersistenceManager.getPersisted(); - if (persistedTradeStatistics3Store != null) { - tradeStatistics3Store.getMap().putAll(persistedTradeStatistics3Store.getMap()); - } - hashes.addAll(tradeStatistics3Store.getMap().keySet().stream() - .map(byteArray -> byteArray.bytes).collect(Collectors.toSet())); - - AccountAgeWitnessStore accountAgeWitnessStore = new AccountAgeWitnessStore(); - PersistenceManager accountAgeWitnessPersistenceManager = new PersistenceManager<>(dir, - persistenceProtoResolver, null); - accountAgeWitnessPersistenceManager.initialize(accountAgeWitnessStore, - accountAgeWitnessStore.getDefaultStorageFileName() + networkPostfix, - PersistenceManager.Source.NETWORK); - AccountAgeWitnessStore persistedAccountAgeWitnessStore = accountAgeWitnessPersistenceManager.getPersisted(); - if (persistedAccountAgeWitnessStore != null) { - accountAgeWitnessStore.getMap().putAll(persistedAccountAgeWitnessStore.getMap()); - } - hashes.addAll(accountAgeWitnessStore.getMap().keySet().stream() - .map(byteArray -> byteArray.bytes).collect(Collectors.toSet())); - } catch (NullPointerException e) { - // in case there is no store file - log.error("There is no storage file where there should be one: {}", dir.getAbsolutePath()); - } - } - } - - @Override - protected void execute() { - // start the network node - NetworkNode networkNode = new TorNetworkNode(Integer.parseInt(configuration.getProperty(TOR_PROXY_PORT, "9054")), - new CoreNetworkProtoResolver(Clock.systemDefaultZone()), false, - new AvailableTor("unused"), null); - // we do not need to start the networkNode, as we do not need the HS - //networkNode.start(this); - - // clear our buckets - bucketsPerHost.clear(); - - getRequests().forEach(request -> send(networkNode, request)); - - report(); - } - - protected abstract List getRequests(); - - protected void send(NetworkNode networkNode, NetworkEnvelope message) { - ArrayList threadList = new ArrayList<>(); - // for each configured host - String hosts = configuration.getProperty(HOSTS, ""); - List nodes = hosts != null && !hosts.isEmpty() ? List.of(hosts.split(",")) : seedNodes; - for (String current : nodes) { - threadList.add(new Thread(() -> { - try { - NodeAddress node = OnionParser.getNodeAddress(current); - aboutToSend(message); - SettableFuture future = networkNode.sendMessage(node, message); - Futures.addCallback(future, new FutureCallback<>() { - @Override - public void onSuccess(Connection connection) { - connection.addMessageListener(TorBasedMonitorTask.this); - } - - @Override - public void onFailure(@NotNull Throwable throwable) { - gate.proceed(); - log.error("Sending {} failed. That is expected if the peer is offline.\n\tException={}", - message.getClass().getSimpleName(), throwable.getMessage()); - } - }, MoreExecutors.directExecutor()); - - } catch (Exception e) { - gate.proceed(); // release the gate on error - e.printStackTrace(); - } - }, "Thread-" + current)); - } - - gate.engage(threadList.size()); - - // start all threads and wait until they all finished. We do that so we can - // minimize the time between querying the hosts and therefore the chance of - // inconsistencies. - threadList.forEach(Thread::start); - - gate.await(); - } - - @Override - public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { - if (processMessage(networkEnvelope, connection)) { - gate.proceed(); - } else if (networkEnvelope instanceof CloseConnectionMessage) { - gate.unlock(); - } else { - log.warn("Got an unexpected message of type <{}>", - networkEnvelope.getClass().getSimpleName()); - } - connection.removeMessageListener(this); - } - - protected abstract boolean processMessage(NetworkEnvelope networkEnvelope, Connection connection); - - protected void aboutToSend(NetworkEnvelope message) { - } - - public abstract void report(); - -} diff --git a/src/main/java/bisq/monitor/monitor/tasks/TorConnectionTime.java b/src/main/java/bisq/monitor/monitor/tasks/TorConnectionTime.java new file mode 100644 index 0000000..55812be --- /dev/null +++ b/src/main/java/bisq/monitor/monitor/tasks/TorConnectionTime.java @@ -0,0 +1,134 @@ +/* + * This file is part of Bisq. + * + * bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with bisq. If not, see . + */ + +package bisq.monitor.monitor.tasks; + +import bisq.monitor.monitor.CompletableFutureUtil; +import bisq.monitor.monitor.MonitorTask; +import bisq.monitor.reporter.Metrics; +import bisq.monitor.reporter.Reporter; +import bisq.network.p2p.NodeAddress; +import lombok.extern.slf4j.Slf4j; +import org.berndpruenster.netlayer.tor.NativeTor; +import org.berndpruenster.netlayer.tor.Tor; + +import java.io.File; +import java.io.IOException; +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; + +/** + * Opens socket connections to the given hosts and report time how long it took. + * If we have a connection once established and the socket closed, and later re-connect it is much faster. + */ +@Slf4j +public class TorConnectionTime extends MonitorTask { + private final List addresses; + private final boolean restartTor; + private final List> allFutures = new ArrayList<>(); + private final File torDir; + private boolean firstRunCompleted; + + public TorConnectionTime(Properties properties, Reporter reporter, File appDir, boolean restartTor) { + super(properties, reporter, false); + + this.restartTor = restartTor; + torDir = new File(appDir, "TorConnectionTime"); + addresses = List.of(properties.getProperty("TorConnectionTime.hosts", "").split(",")); + } + + @Override + protected String getName() { + String postFix = restartTor ? ".restartTor" : ".reconnect"; + return getClass().getSimpleName() + postFix; + } + + @Override + public void run() { + addresses.forEach(address -> { + if (restartTor) { + try { + Tor.getDefault().shutdown(); + Tor.setDefault(new NativeTor(torDir, null, null)); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + log.info("Connect to '{}'", address); + NodeAddress nodeAddress = new NodeAddress(address); + CompletableFuture future = connect(nodeAddress) + .whenComplete((duration, throwable) -> { + String path = getName() + "." + getAddressForMetric(nodeAddress); + if (throwable == null) { + if (restartTor || firstRunCompleted) { + reporter.report(new Metrics(path, duration)); + } else { + log.info("We do not report the first connections if restartTor=false because the " + + "first connection takes much longer and we do not want to pollute our metrics."); + } + } else { + reporter.report(new Metrics(path, -1)); + } + }); + if (runSerial) { + try { + // join throws an (unchecked) exception if completed exceptionally. + // We use it to enforce completing in serial use case and ignore the exception as it got + // handled already at whenComplete. + future.join(); + } catch (Throwable ignore) { + } + } else { + allFutures.add(future); + } + }); + + // If we used parallel mode we wait until all futures have completed before we return to caller to + // execute next task. + if (!runSerial) { + try { + CompletableFutureUtil.allOf(allFutures).join(); + } catch (Throwable ignore) { + } + } + firstRunCompleted = true; + } + + @Override + public void shutDown() { + super.shutDown(); + allFutures.forEach(future -> future.cancel(true)); + } + + private CompletableFuture connect(NodeAddress nodeAddress) { + return CompletableFuture.supplyAsync(() -> { + long ts = System.currentTimeMillis(); + try (Socket socket = getSocket(nodeAddress)) { + log.info("Connection established to '{}'. socket={}", nodeAddress, socket); + return System.currentTimeMillis() - ts; + } catch (IOException e) { + log.error("Error when connecting to {}. Exception: {}", nodeAddress, e.getMessage()); + throw new RuntimeException(e); + } + }); + } + +} diff --git a/src/main/java/bisq/monitor/monitor/tasks/TorHiddenServiceStartupTime.java b/src/main/java/bisq/monitor/monitor/tasks/TorHiddenServiceStartupTime.java new file mode 100644 index 0000000..857075a --- /dev/null +++ b/src/main/java/bisq/monitor/monitor/tasks/TorHiddenServiceStartupTime.java @@ -0,0 +1,80 @@ +/* + * This file is part of Bisq. + * + * bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with bisq. If not, see . + */ + +package bisq.monitor.monitor.tasks; + +import bisq.monitor.monitor.MonitorTask; +import bisq.monitor.monitor.TorNode; +import bisq.monitor.reporter.Metrics; +import bisq.monitor.reporter.Reporter; +import lombok.extern.slf4j.Slf4j; +import org.berndpruenster.netlayer.tor.HiddenServiceSocket; + +import java.io.File; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; + +/** + * Opens socket connections to the given hosts and report time how long it took. + * If we have a connection once established and the socket closed, and later re-connect it is much faster. + */ +@Slf4j +public class TorHiddenServiceStartupTime extends MonitorTask { + private static final String HS_DIR = "hiddenService_temp"; + private CompletableFuture future; + + public TorHiddenServiceStartupTime(Properties properties, Reporter reporter) { + super(properties, reporter, true); + } + + @Override + public void run() { + HiddenServiceSocket hiddenServiceSocket = null; + try { + long ts = System.currentTimeMillis(); + new File(TorNode.getTorDir() + "/" + HS_DIR).delete(); + hiddenServiceSocket = new HiddenServiceSocket(9998, HS_DIR, 9999); + future = new CompletableFuture<>(); + hiddenServiceSocket.addReadyListener(socket -> { + future.complete(socket); + return null; + }); + future.join(); + + reporter.report(new Metrics(getName(), System.currentTimeMillis() - ts)); + } catch (Throwable e) { + throw new RuntimeException(e); + } finally { + if (hiddenServiceSocket != null) { + hiddenServiceSocket.close(); + } + } + } + + @Override + public void shutDown() { + super.shutDown(); + if (future != null) { + future.cancel(true); + } + } + + @Override + protected String getName() { + return getClass().getSimpleName(); + } +} diff --git a/src/main/java/bisq/monitor/monitor/tasks/TorStartupTime.java b/src/main/java/bisq/monitor/monitor/tasks/TorStartupTime.java new file mode 100644 index 0000000..b650cd5 --- /dev/null +++ b/src/main/java/bisq/monitor/monitor/tasks/TorStartupTime.java @@ -0,0 +1,69 @@ +/* + * This file is part of Bisq. + * + * bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with bisq. If not, see . + */ + +package bisq.monitor.monitor.tasks; + +import bisq.monitor.monitor.MonitorTask; +import bisq.monitor.reporter.Metrics; +import bisq.monitor.reporter.Reporter; +import lombok.extern.slf4j.Slf4j; +import org.berndpruenster.netlayer.tor.NativeTor; + +import java.io.File; +import java.util.Properties; + +/** + * Opens socket connections to the given hosts and report time how long it took. + * If we have a connection once established and the socket closed, and later re-connect it is much faster. + */ +@Slf4j +public class TorStartupTime extends MonitorTask { + private final File torDir; + private NativeTor tor; + + public TorStartupTime(Properties properties, Reporter reporter, File appDir) { + super(properties, reporter, true); + + torDir = new File(appDir, "TorStartupTime"); + } + + @Override + public void run() { + try { + torDir.delete(); + long ts = System.currentTimeMillis(); + tor = new NativeTor(torDir, null, null); + reporter.report(new Metrics(getName(), System.currentTimeMillis() - ts)); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + @Override + public void shutDown() { + super.shutDown(); + + if (tor != null) { + tor.shutdown(); + } + } + + @Override + protected String getName() { + return getClass().getSimpleName(); + } +} diff --git a/src/main/java/bisq/monitor/monitor/tasks/bisqnetwork/P2PMarketStats.java b/src/main/java/bisq/monitor/monitor/tasks/bisqnetwork/P2PMarketStats.java deleted file mode 100644 index 0b69b5a..0000000 --- a/src/main/java/bisq/monitor/monitor/tasks/bisqnetwork/P2PMarketStats.java +++ /dev/null @@ -1,275 +0,0 @@ -/* - * This file is part of Bisq. - * - * bisq is free software: you can redistribute it and/or modify it - * under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or (at - * your option) any later version. - * - * bisq is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public - * License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with bisq. If not, see . - */ - -package bisq.monitor.monitor.tasks.bisqnetwork; - -import bisq.common.proto.network.NetworkEnvelope; -import bisq.core.offer.OfferUtil; -import bisq.core.offer.bisq_v1.OfferPayload; -import bisq.monitor.monitor.tasks.TorBasedMonitorTask; -import bisq.monitor.reporter.Reporter; -import bisq.network.p2p.NodeAddress; -import bisq.network.p2p.network.Connection; -import bisq.network.p2p.peers.getdata.messages.GetDataResponse; -import bisq.network.p2p.peers.getdata.messages.PreliminaryGetDataRequest; -import bisq.network.p2p.storage.payload.ProtectedStorageEntry; -import bisq.network.p2p.storage.payload.ProtectedStoragePayload; -import lombok.extern.slf4j.Slf4j; - -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; - -import static com.google.common.base.Preconditions.checkNotNull; - -//todo remove - -/** - * Demo Stats metric derived from the OfferPayload messages we get from the seed nodes - * - * @author Florian Reimair - */ -@Slf4j -public class P2PMarketStats extends TorBasedMonitorTask { - final Map> versionBucketsPerHost = new ConcurrentHashMap<>(); - final Map> offerVolumeBucketsPerHost = new ConcurrentHashMap<>(); - final Map>> offerVolumeDistributionBucketsPerHost = new ConcurrentHashMap<>(); - final Map>> offersPerTraderBucketsPerHost = new ConcurrentHashMap<>(); - final Map>> volumePerTraderBucketsPerHost = new ConcurrentHashMap<>(); - - /** - * Efficient way to aggregate numbers. - */ - private static class Aggregator { - private long value = 0; - - synchronized long value() { - return value; - } - - synchronized void increment() { - value++; - } - - synchronized void add(long amount) { - value += amount; - } - } - - private abstract static class OfferStatistics extends Statistics { - @Override - public synchronized void log(Object message) { - if (message instanceof OfferPayload) { - OfferPayload currentMessage = (OfferPayload) message; - // For logging different data types - String market = currentMessage.getDirection() + "." + currentMessage.getBaseCurrencyCode() + "_" + currentMessage.getCounterCurrencyCode(); - - process(market, currentMessage); - } - } - - abstract void process(String market, OfferPayload currentMessage); - } - - private class OfferCountStatistics extends OfferStatistics { - - @Override - void process(String market, OfferPayload currentMessage) { - buckets.putIfAbsent(market, new Aggregator()); - buckets.get(market).increment(); - } - } - - private class OfferVolumeStatistics extends OfferStatistics { - - @Override - void process(String market, OfferPayload currentMessage) { - buckets.putIfAbsent(market, new Aggregator()); - buckets.get(market).add(currentMessage.getAmount()); - } - } - - private class OfferVolumeDistributionStatistics extends OfferStatistics> { - - @Override - void process(String market, OfferPayload currentMessage) { - buckets.putIfAbsent(market, new ArrayList<>()); - buckets.get(market).add(currentMessage.getAmount()); - } - } - - private class OffersPerTraderStatistics extends OfferStatistics> { - - @Override - void process(String market, OfferPayload currentMessage) { - buckets.putIfAbsent(market, new HashMap<>()); - buckets.get(market).putIfAbsent(currentMessage.getOwnerNodeAddress(), new Aggregator()); - buckets.get(market).get(currentMessage.getOwnerNodeAddress()).increment(); - } - } - - private class VolumePerTraderStatistics extends OfferStatistics> { - - @Override - void process(String market, OfferPayload currentMessage) { - buckets.putIfAbsent(market, new HashMap<>()); - buckets.get(market).putIfAbsent(currentMessage.getOwnerNodeAddress(), new Aggregator()); - buckets.get(market).get(currentMessage.getOwnerNodeAddress()).add(currentMessage.getAmount()); - } - } - - private class VersionsStatistics extends Statistics { - - @Override - public void log(Object message) { - - if (message instanceof OfferPayload) { - OfferPayload offerPayload = (OfferPayload) message; - String version = "v" + OfferUtil.getVersionFromId(offerPayload.getId()); - buckets.putIfAbsent(version, new Aggregator()); - buckets.get(version).increment(); - } - } - } - - public P2PMarketStats(Properties properties, Reporter graphiteReporter, Map seedNodeOperatorByAddress) { - super(properties, graphiteReporter, seedNodeOperatorByAddress); - } - - @Override - protected List getRequests() { - List result = new ArrayList<>(); - - Random random = new Random(); - result.add(new PreliminaryGetDataRequest(random.nextInt(), hashes)); - - return result; - } - - protected void createHistogram(List input, String market, Map report) { - int numberOfBins = 5; - - // - get biggest offer - double max = input.stream().max(Long::compareTo).map(value -> value * 1.01).orElse(0.0); - - // - create histogram - input.stream().collect( - Collectors.groupingBy(aLong -> aLong == max ? numberOfBins - 1 : (int) Math.floor(aLong / (max / numberOfBins)), Collectors.counting())). - forEach((integer, integer2) -> report.put(market + ".bin_" + integer, String.valueOf(integer2))); - - report.put(market + ".number_of_bins", String.valueOf(numberOfBins)); - report.put(market + ".max", String.valueOf((int) max)); - } - - @Override - public void report() { - Map report = new HashMap<>(); - bucketsPerHost.values().stream().findFirst().ifPresent(nodeAddressStatisticsEntry -> nodeAddressStatisticsEntry.values().forEach((market, numberOfOffers) -> report.put(market, String.valueOf(((Aggregator) numberOfOffers).value())))); - reporter.report(report, getName() + ".offerCount"); - - // do offerbook volume statistics - report.clear(); - offerVolumeBucketsPerHost.values().stream().findFirst().ifPresent(aggregatorStatistics -> aggregatorStatistics.values().forEach((market, numberOfOffers) -> report.put(market, String.valueOf(numberOfOffers.value())))); - reporter.report(report, getName() + ".volume"); - - // do the offer vs volume histogram - report.clear(); - // - get a data set - offerVolumeDistributionBucketsPerHost.values().stream().findFirst().ifPresent(listStatistics -> listStatistics.values().forEach((market, offers) -> { - createHistogram(offers, market, report); - })); - reporter.report(report, getName() + ".volume-per-offer-distribution"); - - // do offers per trader - report.clear(); - // - get a data set - offersPerTraderBucketsPerHost.values().stream().findFirst().ifPresent(mapStatistics -> mapStatistics.values().forEach((market, stuff) -> { - List offerPerTrader = stuff.values().stream().map(Aggregator::value).collect(Collectors.toList()); - - createHistogram(offerPerTrader, market, report); - })); - reporter.report(report, getName() + ".traders_by_number_of_offers"); - - // do volume per trader - report.clear(); - // - get a data set - volumePerTraderBucketsPerHost.values().stream().findFirst().ifPresent(mapStatistics -> mapStatistics.values().forEach((market, stuff) -> { - List volumePerTrader = stuff.values().stream().map(Aggregator::value).collect(Collectors.toList()); - - createHistogram(volumePerTrader, market, report); - })); - reporter.report(report, getName() + ".traders_by_volume"); - - // do version statistics - report.clear(); - Optional> optionalStatistics = versionBucketsPerHost.values().stream().findAny(); - optionalStatistics.ifPresent(aggregatorStatistics -> aggregatorStatistics.values() - .forEach((version, numberOfOccurrences) -> report.put(version, String.valueOf(numberOfOccurrences.value())))); - reporter.report(report, "versions"); - } - - protected boolean processMessage(NetworkEnvelope networkEnvelope, Connection connection) { - checkNotNull(connection.getPeersNodeAddressProperty(), - "although the property is nullable, we need it to not be null"); - - if (networkEnvelope instanceof GetDataResponse) { - - Statistics offerCount = new OfferCountStatistics(); - Statistics offerVolume = new OfferVolumeStatistics(); - Statistics offerVolumeDistribution = new OfferVolumeDistributionStatistics(); - Statistics offersPerTrader = new OffersPerTraderStatistics(); - Statistics volumePerTrader = new VolumePerTraderStatistics(); - Statistics versions = new VersionsStatistics(); - - GetDataResponse dataResponse = (GetDataResponse) networkEnvelope; - final Set dataSet = dataResponse.getDataSet(); - dataSet.forEach(e -> { - final ProtectedStoragePayload protectedStoragePayload = e.getProtectedStoragePayload(); - if (protectedStoragePayload == null) { - log.warn("StoragePayload was null: {}", networkEnvelope.toString()); - return; - } - - offerCount.log(protectedStoragePayload); - offerVolume.log(protectedStoragePayload); - offerVolumeDistribution.log(protectedStoragePayload); - offersPerTrader.log(protectedStoragePayload); - volumePerTrader.log(protectedStoragePayload); - versions.log(protectedStoragePayload); - }); - - dataResponse.getPersistableNetworkPayloadSet().forEach(persistableNetworkPayload -> { - // memorize message hashes - //Byte[] bytes = new Byte[persistableNetworkPayload.getHash().length]; - //Arrays.setAll(bytes, n -> persistableNetworkPayload.getHash()[n]); - - //hashes.add(bytes); - - hashes.add(persistableNetworkPayload.getHash()); - }); - - bucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), offerCount); - offerVolumeBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), offerVolume); - offerVolumeDistributionBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), offerVolumeDistribution); - offersPerTraderBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), offersPerTrader); - volumePerTraderBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), volumePerTrader); - versionBucketsPerHost.put(connection.getPeersNodeAddressProperty().getValue(), versions); - return true; - } - return false; - } -} diff --git a/src/main/java/bisq/monitor/monitor/tasks/bisqnetwork/P2PNetworkLoad.java b/src/main/java/bisq/monitor/monitor/tasks/bisqnetwork/P2PNetworkLoad.java deleted file mode 100644 index e4ab5f4..0000000 --- a/src/main/java/bisq/monitor/monitor/tasks/bisqnetwork/P2PNetworkLoad.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * This file is part of Bisq. - * - * Bisq is free software: you can redistribute it and/or modify it - * under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or (at - * your option) any later version. - * - * Bisq is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public - * License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with Bisq. If not, see . - */ - -package bisq.monitor.monitor.tasks.bisqnetwork; - -import bisq.common.ClockWatcher; -import bisq.common.config.Config; -import bisq.common.file.CorruptedStorageFileHandler; -import bisq.common.persistence.PersistenceManager; -import bisq.common.proto.network.NetworkEnvelope; -import bisq.common.proto.network.NetworkProtoResolver; -import bisq.core.network.p2p.seed.DefaultSeedNodeRepository; -import bisq.core.proto.network.CoreNetworkProtoResolver; -import bisq.core.proto.persistable.CorePersistenceProtoResolver; -import bisq.monitor.monitor.MonitorTask; -import bisq.monitor.monitor.tor.AvailableTor; -import bisq.monitor.monitor.utils.ThreadGate; -import bisq.monitor.reporter.Reporter; -import bisq.network.p2p.network.*; -import bisq.network.p2p.peers.PeerManager; -import bisq.network.p2p.peers.keepalive.KeepAliveManager; -import bisq.network.p2p.peers.peerexchange.PeerExchangeManager; -import bisq.network.p2p.storage.messages.BroadcastMessage; -import lombok.extern.slf4j.Slf4j; - -import java.io.File; -import java.time.Clock; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; - -//todo remove - -/** - * Contacts a list of hosts and asks them for all the data we do not have. The - * answers are then compiled into buckets of message types. Based on these - * buckets, the Metric reports (for each host) the message types observed and - * their number along with a relative comparison between all hosts. - * - * @author Florian Reimair - */ -@Slf4j -public class P2PNetworkLoad extends MonitorTask implements MessageListener, SetupListener { - private static final String TOR_PROXY_PORT = "run.torProxyPort"; - private static final String MAX_CONNECTIONS = "run.maxConnections"; - private static final String HISTORY_SIZE = "run.historySize"; - - private NetworkNode networkNode; - private final File torHiddenServiceDir = new File("metric_" + getName()); - private final ThreadGate hsReady = new ThreadGate(); - private final Map buckets = new ConcurrentHashMap<>(); - - /** - * Buffers the last X message we received. New messages will only be logged in case - * the message isn't already in the history. Note that the oldest message hashes are - * dropped to record newer hashes. - */ - private Map history; - private long lastRun = 0; - - public P2PNetworkLoad(Properties properties, Reporter reporter) { - super(properties, reporter); - } - - @Override - public void configure(Properties properties) { - super.configure(properties); - history = Collections.synchronizedMap(new FixedSizeHistoryTracker<>(Integer.parseInt(configuration.getProperty(HISTORY_SIZE, "200")))); - } - - @Override - public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { - if (networkEnvelope instanceof BroadcastMessage) { - try { - if (history.get(networkEnvelope.hashCode()) == null) { - history.put(networkEnvelope.hashCode(), null); - buckets.get(networkEnvelope.getClass().getSimpleName()).increment(); - } - } catch (NullPointerException e) { - // use exception handling because we hardly ever need to add a fresh bucket - buckets.put(networkEnvelope.getClass().getSimpleName(), new Counter()); - } - } - } - - @Override - protected void execute() { - // in case we do not have a NetworkNode up and running, we create one - if (null == networkNode) { - // prepare the gate - hsReady.engage(); - - // start the network node - int port = Integer.parseInt(configuration.getProperty(TOR_PROXY_PORT, "9053")); - AvailableTor availableTor = new AvailableTor(torHiddenServiceDir.getName()); - networkNode = new TorNetworkNode(port, - new CoreNetworkProtoResolver(Clock.systemDefaultZone()), - false, - availableTor, - null); - networkNode.start(this); - - // wait for the HS to be published - hsReady.await(); - - // boot up P2P node - try { - Config config = new Config(); - CorruptedStorageFileHandler corruptedStorageFileHandler = new CorruptedStorageFileHandler(); - int maxConnections = Integer.parseInt(configuration.getProperty(MAX_CONNECTIONS, "12")); - NetworkProtoResolver networkProtoResolver = new CoreNetworkProtoResolver(Clock.systemDefaultZone()); - CorePersistenceProtoResolver persistenceProtoResolver = new CorePersistenceProtoResolver(null, - networkProtoResolver); - DefaultSeedNodeRepository seedNodeRepository = new DefaultSeedNodeRepository(config); - PeerManager peerManager = new PeerManager(networkNode, seedNodeRepository, new ClockWatcher(), - new PersistenceManager<>(torHiddenServiceDir, persistenceProtoResolver, corruptedStorageFileHandler), maxConnections); - - // init file storage - peerManager.readPersisted(() -> { - }); - - PeerExchangeManager peerExchangeManager = new PeerExchangeManager(networkNode, seedNodeRepository, - peerManager); - // updates the peer list every now and then as well - peerExchangeManager.requestReportedPeersFromSeedNodes(seedNodeRepository.getSeedNodeAddresses().iterator().next()); - - KeepAliveManager keepAliveManager = new KeepAliveManager(networkNode, peerManager); - keepAliveManager.start(); - - networkNode.addMessageListener(this); - } catch (Throwable e) { - e.printStackTrace(); - } - } - - // report - Map report = new HashMap<>(); - - if (lastRun != 0 && System.currentTimeMillis() - lastRun != 0) { - // - normalize to data/minute - double perMinuteFactor = 60000.0 / (System.currentTimeMillis() - lastRun); - - - // - get snapshot so we do not loose data - Set keys = new HashSet<>(buckets.keySet()); - - // - transfer values to report - keys.forEach(key -> { - int value = buckets.get(key).getAndReset(); - if (value != 0) { - String valueAsString = String.format("%.2f", value * perMinuteFactor); - report.put(key, valueAsString); - } - }); - - // - report - reporter.report(report, getName()); - } - - // - reset last run - lastRun = System.currentTimeMillis(); - } - - - @Override - public void onTorNodeReady() { - } - - @Override - public void onHiddenServicePublished() { - // open the gate - hsReady.proceed(); - } - - @Override - public void onSetupFailed(Throwable throwable) { - } - - @Override - public void onRequestCustomBridges() { - } - - - /** - * History implementation using a {@link LinkedHashMap} and its - * {@link LinkedHashMap#removeEldestEntry(Map.Entry)} option. - */ - private static class FixedSizeHistoryTracker extends LinkedHashMap { - final int historySize; - - FixedSizeHistoryTracker(int historySize) { - super(historySize, 10, true); - this.historySize = historySize; - } - - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - return size() > historySize; - } - } - - /** - * Efficient way to count message occurrences. - */ - private static class Counter { - private int value = 1; - - synchronized int getAndReset() { - try { - return value; - } finally { - value = 0; - } - } - - synchronized void increment() { - value++; - } - } -} diff --git a/src/main/java/bisq/monitor/monitor/tasks/seed/SeedNodeRoundTripTime.java b/src/main/java/bisq/monitor/monitor/tasks/seed/SeedNodeRoundTripTime.java deleted file mode 100644 index ddfefcd..0000000 --- a/src/main/java/bisq/monitor/monitor/tasks/seed/SeedNodeRoundTripTime.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * This file is part of Bisq. - * - * bisq is free software: you can redistribute it and/or modify it - * under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or (at - * your option) any later version. - * - * bisq is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public - * License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with bisq. If not, see . - */ - -package bisq.monitor.monitor.tasks.seed; - -import bisq.common.proto.network.NetworkEnvelope; -import bisq.monitor.monitor.tasks.TorBasedMonitorTask; -import bisq.monitor.reporter.Metric; -import bisq.monitor.reporter.Reporter; -import bisq.network.p2p.network.CloseConnectionReason; -import bisq.network.p2p.network.Connection; -import bisq.network.p2p.peers.keepalive.messages.Ping; -import bisq.network.p2p.peers.keepalive.messages.Pong; - -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; - -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * Sends a Ping message to all seed nodes and get the round trip time when we get back the Pong. - */ -public class SeedNodeRoundTripTime extends TorBasedMonitorTask { - private final Map rrtByAddress = new ConcurrentHashMap<>(); - - public SeedNodeRoundTripTime(Properties properties, Reporter reporter, Map seedNodeOperatorByAddress) { - super(properties, reporter, seedNodeOperatorByAddress); - } - - @Override - protected List getRequests() { - // We use the request timestamp as nonce and take it out from the Pong later - return List.of(new Ping((int) System.currentTimeMillis() / 1000, 0)); - } - - @Override - protected boolean processMessage(NetworkEnvelope networkEnvelope, Connection connection) { - if (networkEnvelope instanceof Pong) { - checkNotNull(connection.getPeersNodeAddressProperty(), "nodeAddress must not be null at that moment"); - String address = connection.getPeersNodeAddressProperty().get().getHostNameWithoutPostFix(); - Pong pong = (Pong) networkEnvelope; - int rrt = (int) System.currentTimeMillis() / 1000 - pong.getRequestNonce(); - rrtByAddress.put(address, rrt); - connection.shutDown(CloseConnectionReason.CLOSE_REQUESTED_BY_PEER); - return true; - } - return false; - } - - @Override - public void report() { - rrtByAddress.forEach((key, value) -> reporter.report(new Metric(getName() + "." + key, value))); - rrtByAddress.clear(); - } -} diff --git a/src/main/java/bisq/monitor/monitor/tasks/tornetwork/TorConnectionTime.java b/src/main/java/bisq/monitor/monitor/tasks/tornetwork/TorConnectionTime.java deleted file mode 100644 index 4045e78..0000000 --- a/src/main/java/bisq/monitor/monitor/tasks/tornetwork/TorConnectionTime.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * This file is part of Bisq. - * - * Bisq is free software: you can redistribute it and/or modify it - * under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or (at - * your option) any later version. - * - * Bisq is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public - * License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with Bisq. If not, see . - */ - -package bisq.monitor.monitor.tasks.tornetwork; - -import bisq.monitor.monitor.MonitorTask; -import bisq.monitor.monitor.tor.OnionParser; -import bisq.monitor.reporter.Metric; -import bisq.monitor.reporter.Reporter; -import bisq.network.p2p.NodeAddress; -import com.runjva.sourceforge.jsocks.protocol.Socks5Proxy; -import com.runjva.sourceforge.jsocks.protocol.SocksSocket; -import lombok.extern.slf4j.Slf4j; -import org.berndpruenster.netlayer.tor.Tor; -import org.berndpruenster.netlayer.tor.TorCtlException; - -import java.io.IOException; -import java.util.Properties; - -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * Open a tor socket connection to the given hosts and measure the time how long it took for establishing the connection. - */ -@Slf4j -public class TorConnectionTime extends MonitorTask { - public TorConnectionTime(Properties properties, Reporter reporter) { - super(properties, reporter); - } - - @Override - protected void execute() { - SocksSocket socket; - NodeAddress nodeAddress = null; - try { - Tor tor = Tor.getDefault(); - checkNotNull(tor, "tor must not be null"); - Socks5Proxy proxy = tor.getProxy(); - for (String host : configuration.getProperty("hosts", "").split(",")) { - nodeAddress = OnionParser.getNodeAddress(host); - long start = System.currentTimeMillis(); - // Connect to node - socket = new SocksSocket(proxy, nodeAddress.getHostName(), nodeAddress.getPort()); - reporter.report(new Metric(getName(), nodeAddress.getHostNameWithoutPostFix(), String.valueOf(System.currentTimeMillis() - start))); - socket.close(); - } - } catch (TorCtlException | IOException e) { - if (nodeAddress != null) { - log.error("Error while connecting to {}", nodeAddress); - } - log.error("Error at connection to host", e); - } - } -} diff --git a/src/main/java/bisq/monitor/monitor/tasks/tornetwork/TorHiddenServiceStartupTime.java b/src/main/java/bisq/monitor/monitor/tasks/tornetwork/TorHiddenServiceStartupTime.java deleted file mode 100644 index 17574b2..0000000 --- a/src/main/java/bisq/monitor/monitor/tasks/tornetwork/TorHiddenServiceStartupTime.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * This file is part of Bisq. - * - * Bisq is free software: you can redistribute it and/or modify it - * under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or (at - * your option) any later version. - * - * Bisq is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public - * License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with Bisq. If not, see . - */ - -package bisq.monitor.monitor.tasks.tornetwork; - -import bisq.monitor.monitor.MonitorTask; -import bisq.monitor.monitor.tor.TorNode; -import bisq.monitor.monitor.utils.ThreadGate; -import bisq.monitor.reporter.Metric; -import bisq.monitor.reporter.Reporter; -import lombok.extern.slf4j.Slf4j; -import org.berndpruenster.netlayer.tor.HiddenServiceSocket; - -import java.io.File; -import java.util.Properties; - -/** - * A Metric to measure the startup time of a Tor Hidden Service on a already - * running Tor. - * - * @author Florian Reimair - */ -@Slf4j -public class TorHiddenServiceStartupTime extends MonitorTask { - private final String hiddenServiceDirectory = getName(); - private final ThreadGate gate = new ThreadGate(); - - public TorHiddenServiceStartupTime(Properties properties, Reporter reporter) { - super(properties, reporter); - } - - @Override - protected void execute() { - int localPort = Integer.parseInt(configuration.getProperty("localPort", "9998")); - int servicePort = Integer.parseInt(configuration.getProperty("servicePort", "9999")); - - // clear directory, so we get a new onion address every time - new File(TorNode.getTorDir() + "/" + hiddenServiceDirectory).delete(); - gate.engage(); - long start = System.currentTimeMillis(); - HiddenServiceSocket hiddenServiceSocket = new HiddenServiceSocket(localPort, hiddenServiceDirectory, servicePort); - hiddenServiceSocket.addReadyListener(socket -> { - reporter.report(new Metric(getName(), System.currentTimeMillis() - start)); - gate.proceed(); - return null; - }); - - gate.await(); - hiddenServiceSocket.close(); - } -} diff --git a/src/main/java/bisq/monitor/monitor/tasks/tornetwork/TorStartupTime.java b/src/main/java/bisq/monitor/monitor/tasks/tornetwork/TorStartupTime.java deleted file mode 100644 index 7da8e01..0000000 --- a/src/main/java/bisq/monitor/monitor/tasks/tornetwork/TorStartupTime.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * This file is part of Bisq. - * - * Bisq is free software: you can redistribute it and/or modify it - * under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or (at - * your option) any later version. - * - * Bisq is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public - * License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with Bisq. If not, see . - */ - -package bisq.monitor.monitor.tasks.tornetwork; - -import bisq.monitor.monitor.MonitorTask; -import bisq.monitor.reporter.Metric; -import bisq.monitor.reporter.Reporter; -import lombok.extern.slf4j.Slf4j; -import org.berndpruenster.netlayer.tor.NativeTor; -import org.berndpruenster.netlayer.tor.Tor; -import org.berndpruenster.netlayer.tor.TorCtlException; -import org.berndpruenster.netlayer.tor.Torrc; - -import java.io.File; -import java.io.IOException; -import java.util.LinkedHashMap; -import java.util.Properties; - -/** - * A Metric to measure the deployment and startup time of the packaged Tor - * binaries. - * - * @author Florian Reimair - */ -@Slf4j -public class TorStartupTime extends MonitorTask { - private final File torDir; - private Torrc torOverrides; - - public TorStartupTime(Properties properties, File appDir, Reporter reporter) { - super(properties, reporter); - this.torDir = new File(appDir, "TorStartupTime"); - } - - @Override - public void configure(Properties properties) { - super.configure(properties); - - synchronized (this) { - LinkedHashMap overrides = new LinkedHashMap<>(); - //todo why? - overrides.put("SOCKSPort", configuration.getProperty("socksPort", "90500")); - - try { - torOverrides = new Torrc(overrides); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - @Override - protected void execute() { - torDir.delete(); - Tor tor = null; - long start = System.currentTimeMillis(); - try { - tor = new NativeTor(torDir, null, torOverrides); - reporter.report(new Metric(getName(), System.currentTimeMillis() - start)); - } catch (TorCtlException e) { - log.error("Error at starting tor", e); - } finally { - if (tor != null) { - tor.shutdown(); - } - } - } -} diff --git a/src/main/java/bisq/monitor/monitor/tor/AvailableTor.java b/src/main/java/bisq/monitor/monitor/tor/AvailableTor.java deleted file mode 100644 index b3354b7..0000000 --- a/src/main/java/bisq/monitor/monitor/tor/AvailableTor.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * This file is part of Bisq. - * - * Bisq is free software: you can redistribute it and/or modify it - * under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or (at - * your option) any later version. - * - * Bisq is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public - * License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with Bisq. If not, see . - */ - -package bisq.monitor.monitor.tor; - -import bisq.network.p2p.network.TorMode; -import lombok.Setter; -import org.berndpruenster.netlayer.tor.Tor; - -import java.io.File; - -/** - * This class uses an already running Tor instance via Tor.getDefault() - * - * @author Florian Reimair - */ -public class AvailableTor extends TorMode { - @Setter - private static File appDir; - private final String hiddenServiceDirectory; - - public AvailableTor(String hiddenServiceDirectory) { - super(new File(appDir + "/tor")); - - this.hiddenServiceDirectory = hiddenServiceDirectory; - } - - @Override - public Tor getTor() { - return Tor.getDefault(); - } - - @Override - public String getHiddenServiceDirectory() { - return hiddenServiceDirectory; - } - -} diff --git a/src/main/java/bisq/monitor/monitor/tor/OnionParser.java b/src/main/java/bisq/monitor/monitor/tor/OnionParser.java deleted file mode 100644 index d7a28fd..0000000 --- a/src/main/java/bisq/monitor/monitor/tor/OnionParser.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * This file is part of Bisq. - * - * Bisq is free software: you can redistribute it and/or modify it - * under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or (at - * your option) any later version. - * - * Bisq is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public - * License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with Bisq. If not, see . - */ - -package bisq.monitor.monitor.tor; - -import bisq.network.p2p.NodeAddress; - -import java.net.MalformedURLException; -import java.net.URL; - -/** - * Helper for parsing and pretty printing onion addresses. - * - * @author Florian Reimair - */ -public class OnionParser { - - public static NodeAddress getNodeAddress(final String current) throws MalformedURLException { - String nodeAddress = current.trim(); - if (!nodeAddress.startsWith("http://")) - nodeAddress = "http://" + nodeAddress; - URL tmp = new URL(nodeAddress); - return new NodeAddress(tmp.getHost(), tmp.getPort() > 0 ? tmp.getPort() : 80); - } - - public static String prettyPrint(final NodeAddress host) { - return host.getHostNameWithoutPostFix(); - } - - public static String prettyPrint(String host) throws MalformedURLException { - return prettyPrint(getNodeAddress(host)); - } -} diff --git a/src/main/java/bisq/monitor/monitor/utils/Configurable.java b/src/main/java/bisq/monitor/monitor/utils/Configurable.java deleted file mode 100644 index dc0c771..0000000 --- a/src/main/java/bisq/monitor/monitor/utils/Configurable.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * This file is part of Bisq. - * - * Bisq is free software: you can redistribute it and/or modify it - * under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or (at - * your option) any later version. - * - * Bisq is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public - * License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with Bisq. If not, see . - */ - -package bisq.monitor.monitor.utils; - -import java.util.Properties; - -public abstract class Configurable { - protected Properties configuration = new Properties(); - private String name = getClass().getSimpleName(); - - public void configure(final Properties properties) { - Properties myProperties = new Properties(); - properties.forEach((k, v) -> { - String key = (String) k; - if (key.startsWith(getName())) - myProperties.put(key.substring(key.indexOf(".") + 1), v); - }); - - this.configuration = myProperties; - } - - protected String getName() { - return name; - } -} diff --git a/src/main/java/bisq/monitor/monitor/utils/StatisticsHelper.java b/src/main/java/bisq/monitor/monitor/utils/StatisticsHelper.java deleted file mode 100644 index 36e5aaa..0000000 --- a/src/main/java/bisq/monitor/monitor/utils/StatisticsHelper.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * This file is part of Bisq. - * - * Bisq is free software: you can redistribute it and/or modify it - * under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or (at - * your option) any later version. - * - * Bisq is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public - * License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with Bisq. If not, see . - */ - -package bisq.monitor.monitor.utils; - -import java.util.*; - -/** - * Calculates average, max, min, p25, p50, p75 off of a list of samples and - * throws in the sample size for good measure. - * - * @author Florian Reimair - */ -public class StatisticsHelper { - - public static Map process(Collection input) { - if (input.isEmpty()) { - return new HashMap<>(); - } - - List samples = new ArrayList<>(input); - - // aftermath - Collections.sort(samples); - - // - average, max, min , sample size - LongSummaryStatistics statistics = samples.stream().mapToLong(val -> val).summaryStatistics(); - - Map results = new HashMap<>(); - results.put("average", String.valueOf(Math.round(statistics.getAverage()))); - results.put("max", String.valueOf(statistics.getMax())); - results.put("min", String.valueOf(statistics.getMin())); - results.put("sampleSize", String.valueOf(statistics.getCount())); - - // - p25, median, p75 - Integer[] percentiles = new Integer[]{25, 50, 75}; - for (Integer percentile : percentiles) { - double rank = statistics.getCount() * percentile / 100.0; - Long percentileValue; - if (samples.size() <= rank + 1) - percentileValue = samples.get(samples.size() - 1); - else if (Math.floor(rank) == rank) - percentileValue = samples.get((int) rank); - else - percentileValue = Math.round(samples.get((int) Math.floor(rank)) - + (samples.get((int) (Math.floor(rank) + 1)) - samples.get((int) Math.floor(rank))) - / (rank - Math.floor(rank))); - results.put("p" + percentile, String.valueOf(percentileValue)); - } - - return results; - } -} diff --git a/src/main/java/bisq/monitor/monitor/utils/ThreadGate.java b/src/main/java/bisq/monitor/monitor/utils/ThreadGate.java deleted file mode 100644 index eaa76cc..0000000 --- a/src/main/java/bisq/monitor/monitor/utils/ThreadGate.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * This file is part of Bisq. - * - * Bisq is free software: you can redistribute it and/or modify it - * under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or (at - * your option) any later version. - * - * Bisq is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public - * License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with Bisq. If not, see . - */ - -package bisq.monitor.monitor.utils; - -import lombok.extern.slf4j.Slf4j; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -/** - * Gate pattern to help with thread synchronization - * - * @author Florian Reimair - */ -@Slf4j -public class ThreadGate { - - private CountDownLatch lock = new CountDownLatch(0); - - /** - * Make everyone wait until the gate is open again. - */ - public void engage() { - lock = new CountDownLatch(1); - } - - /** - * Make everyone wait until the gate is open again. - * - * @param numberOfLocks how often the gate has to be unlocked until the gate - * opens. - */ - public void engage(int numberOfLocks) { - lock = new CountDownLatch(numberOfLocks); - } - - /** - * Wait for the gate to be opened. Blocks until the gate is open again. Returns - * immediately if the gate is already open. - */ - public synchronized void await() { - while (lock.getCount() > 0) - try { - if (!lock.await(60, TimeUnit.SECONDS)) { - log.warn("timeout occurred!"); - break; // break the loop - } - } catch (InterruptedException ignore) { - } - } - - /** - * Open the gate and let everyone proceed with their execution. - */ - public void proceed() { - lock.countDown(); - } - - /** - * Open the gate with no regards on how many locks are still in place. - */ - public void unlock() { - while (lock.getCount() > 0) - lock.countDown(); - } -} diff --git a/src/main/java/bisq/monitor/reporter/BatchWriter.java b/src/main/java/bisq/monitor/reporter/BatchWriter.java index b9bb6b9..f2a5c9d 100644 --- a/src/main/java/bisq/monitor/reporter/BatchWriter.java +++ b/src/main/java/bisq/monitor/reporter/BatchWriter.java @@ -58,7 +58,7 @@ public BatchWriter(Properties properties) { } // https://graphite.readthedocs.io/en/latest/feeding-carbon.html - public CompletableFuture report(Collection metrics) { + public CompletableFuture report(Collection metrics) { return CompletableFuture.supplyAsync(() -> { try (Socket socket = new Socket(host, port); OutputStream outputStream = socket.getOutputStream()) { @@ -77,11 +77,11 @@ public CompletableFuture report(Collection metrics) { }); } - private static String serializeMetricItems(Collection metrics) { + private static String serializeMetricItems(Collection metrics) { StringBuilder pickled = new StringBuilder(metrics.size() * 75); pickled.append(MARK).append(LIST); - for (Metric tuple : metrics) { + for (Metrics tuple : metrics) { // begin outer tuple pickled.append(MARK); diff --git a/src/main/java/bisq/monitor/reporter/ConsoleReporter.java b/src/main/java/bisq/monitor/reporter/ConsoleReporter.java index 2ef15ac..9a7ba81 100644 --- a/src/main/java/bisq/monitor/reporter/ConsoleReporter.java +++ b/src/main/java/bisq/monitor/reporter/ConsoleReporter.java @@ -24,12 +24,12 @@ @Slf4j public class ConsoleReporter extends Reporter { @Override - public void report(Metric metric) { - System.out.println(metric.toString()); + public void report(Metrics metrics) { + System.out.println(metrics.toString()); } @Override - public void report(Set metrics) { + public void report(Set metrics) { metrics.forEach(this::report); } } diff --git a/src/main/java/bisq/monitor/reporter/GraphiteReporter.java b/src/main/java/bisq/monitor/reporter/GraphiteReporter.java index fa15893..1c32559 100644 --- a/src/main/java/bisq/monitor/reporter/GraphiteReporter.java +++ b/src/main/java/bisq/monitor/reporter/GraphiteReporter.java @@ -26,16 +26,11 @@ import java.util.Properties; import java.util.Set; -/** - * Reports our findings to a graphite service. - * - * @author Florian Reimair - */ @Slf4j public class GraphiteReporter extends Reporter { private final LineWriter lineWriter; private final BatchWriter batchWriter; - private final Set pending = new ConcurrentHashSet<>(); + private final Set pending = new ConcurrentHashSet<>(); private final int delayForBatchingSec; private final int minItemsForBatching; private Timer timer; @@ -48,8 +43,9 @@ public GraphiteReporter(Properties properties) { minItemsForBatching = Integer.parseInt(properties.getProperty("GraphiteReporter.minItemsForBatching", "5")); } - public void report(Metric metric) { - pending.add(metric); + public void report(Metrics metrics) { + System.out.println(metrics.toString()); + pending.add(metrics); if (timer == null) { // We wait a bit if more items arrive, so we can batch them @@ -58,14 +54,15 @@ public void report(Metric metric) { } @Override - public void report(Set metrics) { + public void report(Set metrics) { + metrics.forEach(metric -> System.out.println(metric.toString())); pending.addAll(metrics); sendPending(); } private void sendPending() { - Set clone = new HashSet<>(pending); + Set clone = new HashSet<>(pending); pending.clear(); if (clone.size() >= minItemsForBatching) { batchWriter.report(clone); diff --git a/src/main/java/bisq/monitor/reporter/LineWriter.java b/src/main/java/bisq/monitor/reporter/LineWriter.java index 4f509a5..c45c3ac 100644 --- a/src/main/java/bisq/monitor/reporter/LineWriter.java +++ b/src/main/java/bisq/monitor/reporter/LineWriter.java @@ -40,9 +40,9 @@ public LineWriter(Properties properties) { port = Integer.parseInt(tokens[1]); } - public CompletableFuture report(Metric metric) { + public CompletableFuture report(Metrics metrics) { // trailing line break is needed - String payload = metric.getPath() + " " + metric.getValue() + " " + metric.getTimeStampInSec() + "\n"; + String payload = metrics.getPath() + " " + metrics.getValue() + " " + metrics.getTimeStampInSec() + "\n"; return CompletableFuture.supplyAsync(() -> { try (Socket socket = new Socket(host, port)) { socket.getOutputStream().write(payload.getBytes(Charsets.UTF_8)); diff --git a/src/main/java/bisq/monitor/reporter/Metric.java b/src/main/java/bisq/monitor/reporter/Metrics.java similarity index 72% rename from src/main/java/bisq/monitor/reporter/Metric.java rename to src/main/java/bisq/monitor/reporter/Metrics.java index 33e13de..1ea876a 100644 --- a/src/main/java/bisq/monitor/reporter/Metric.java +++ b/src/main/java/bisq/monitor/reporter/Metrics.java @@ -20,48 +20,52 @@ import lombok.Value; @Value -public class Metric { +public class Metrics { public static final String ROOT = "bisq_v2"; String path; String value; long timeStampInSec; - public Metric(String path, String value, long timeStampInSec) { + public Metrics(String path, String value, long timeStampInSec) { this.path = ROOT + "." + path; this.value = value; this.timeStampInSec = timeStampInSec; } - public Metric(String path, String value) { + public Metrics(String path, String value) { this(path, value, System.currentTimeMillis() / 1000); } - public Metric(String prefix, String key, String value) { + public Metrics(String prefix, String key, String value) { this(prefix.isEmpty() ? key : prefix + "." + key, value); } - public Metric(String prefix, String key, String value, long timeStampInSec) { + public Metrics(String prefix, String key, String value, long timeStampInSec) { this(prefix.isEmpty() ? key : prefix + "." + key, value, timeStampInSec); } - public Metric(String path, int value, long timeStampInSec) { + public Metrics(String path, int value, long timeStampInSec) { this(path, String.valueOf(value), timeStampInSec); } - public Metric(String path, int value) { + public Metrics(String path, int value) { this(path, String.valueOf(value)); } - public Metric(String path, long value, long timeStampInSec) { + public Metrics(String path, long value, long timeStampInSec) { this(path, String.valueOf(value), timeStampInSec); } - public Metric(String path, double value) { + public Metrics(String path, long value) { this(path, String.valueOf(value)); } - public Metric(String path, double value, long timeStampInSec) { + public Metrics(String path, double value) { + this(path, String.valueOf(value)); + } + + public Metrics(String path, double value, long timeStampInSec) { this(path, String.valueOf(value), timeStampInSec); } diff --git a/src/main/java/bisq/monitor/reporter/Reporter.java b/src/main/java/bisq/monitor/reporter/Reporter.java index 97e6382..df361d0 100644 --- a/src/main/java/bisq/monitor/reporter/Reporter.java +++ b/src/main/java/bisq/monitor/reporter/Reporter.java @@ -17,32 +17,24 @@ package bisq.monitor.reporter; -import bisq.monitor.monitor.utils.Configurable; - import java.util.Map; import java.util.Set; -/** - * Reports findings to a specific service/file/place using the proper means to - * do so. - * - * @author Florian Reimair - */ -public abstract class Reporter extends Configurable { +public abstract class Reporter { protected Reporter() { } - abstract public void report(Metric metric); + abstract public void report(Metrics metrics); - abstract public void report(Set metrics); + abstract public void report(Set metrics); public void shutDown() { } public void report(Map map, String prefix) { map.entrySet().stream() - .map(entry -> new Metric(prefix, entry.getKey(), entry.getValue())) + .map(entry -> new Metrics(prefix, entry.getKey(), entry.getValue())) .forEach(this::report); } } diff --git a/src/main/java/bisq/monitor/server/RequestHandler.java b/src/main/java/bisq/monitor/server/RequestHandler.java index 280270e..0022d3e 100644 --- a/src/main/java/bisq/monitor/server/RequestHandler.java +++ b/src/main/java/bisq/monitor/server/RequestHandler.java @@ -21,6 +21,7 @@ import bisq.core.monitor.ReportingItems; import bisq.monitor.reporter.Reporter; import bisq.monitor.server.handlers.*; +import bisq.monitor.utils.Util; import lombok.extern.slf4j.Slf4j; import spark.Request; import spark.Response; @@ -46,9 +47,6 @@ public RequestHandler(Properties properties, Reporter reporter) { reportingHandlers.add(new NetworkLoadHandler(reporter, seedNodeOperatorByAddress)); } - public void shutdown() { - } - public String onRequest(Request request, Response response) { String hex = request.body(); checkArgument(hex != null && !hex.trim().isEmpty()); diff --git a/src/main/java/bisq/monitor/server/Server.java b/src/main/java/bisq/monitor/server/Server.java index 92d64cc..6d87adc 100644 --- a/src/main/java/bisq/monitor/server/Server.java +++ b/src/main/java/bisq/monitor/server/Server.java @@ -17,31 +17,20 @@ package bisq.monitor.server; -import bisq.monitor.reporter.Reporter; import lombok.extern.slf4j.Slf4j; import spark.Spark; -import java.util.Properties; import java.util.concurrent.CompletableFuture; @Slf4j public class Server { - private final RequestHandler requestHandler; - public Server(Properties properties, Reporter reporter) { - requestHandler = new RequestHandler(properties, reporter); + public Server() { + } + + public void start(int port, RequestHandler requestHandler) { try { - int port = Integer.parseInt(properties.getProperty("Server.port")); Spark.port(port); - - boolean useTLS = "true".equals(properties.getProperty("Server.useTLS")); - if (useTLS) { - String keystoreFile = properties.getProperty("Server.keystoreFile"); - String keystorePassword = properties.getProperty("Server.keystorePassword"); - String truststoreFile = properties.getProperty("Server.truststoreFile"); - String truststorePassword = properties.getProperty("Server.truststorePassword"); - Spark.secure(keystoreFile, keystorePassword, truststoreFile, truststorePassword); - } Spark.post("/", requestHandler::onRequest); log.info("Server setup for listening on port {}", port); } catch (Throwable t) { @@ -51,9 +40,6 @@ public Server(Properties properties, Reporter reporter) { } public CompletableFuture shutDown() { - return CompletableFuture.runAsync(() -> { - Spark.stop(); - requestHandler.shutdown(); - }); + return CompletableFuture.runAsync(Spark::stop); } } diff --git a/src/main/java/bisq/monitor/server/ServerMain.java b/src/main/java/bisq/monitor/server/ServerMain.java index d31c21f..d2c1783 100644 --- a/src/main/java/bisq/monitor/server/ServerMain.java +++ b/src/main/java/bisq/monitor/server/ServerMain.java @@ -18,28 +18,23 @@ package bisq.monitor.server; -import bisq.common.UserThread; import bisq.common.app.Log; import bisq.common.util.Utilities; -import bisq.monitor.PropertiesUtil; import bisq.monitor.reporter.ConsoleReporter; import bisq.monitor.reporter.GraphiteReporter; import bisq.monitor.reporter.Reporter; +import bisq.monitor.utils.PropertiesUtil; import ch.qos.logback.classic.Level; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import sun.misc.Signal; import java.io.File; import java.nio.file.Paths; import java.util.Properties; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; @Slf4j public class ServerMain { - private static boolean stopped; + private static volatile boolean stopped; private static Server server; /** @@ -53,35 +48,33 @@ public static void main(String[] args) { properties = PropertiesUtil.getProperties(args[0].replace("--config=", "")); } - String appName = properties.getProperty("Server.appDir"); - File appDir = new File(Utilities.getUserDataDir(), appName); - if (!appDir.exists() && !appDir.mkdir()) { - log.warn("make appDir failed"); - } - setup(appDir); + setup(properties); Reporter reporter = "true".equals(properties.getProperty("GraphiteReporter.enabled", "false")) ? new GraphiteReporter(properties) : new ConsoleReporter(); - - CompletableFuture.runAsync(() -> server = new Server(properties, reporter), - Utilities.getSingleThreadExecutor("Server")); + RequestHandler requestHandler = new RequestHandler(properties, reporter); + int port = Integer.parseInt(properties.getProperty("Server.port", "13003")); + server = new Server(); + server.start(port, requestHandler); keepRunning(); } - public static void setup(File appDir) { + public static void setup(Properties properties) { + Thread.currentThread().setName("Server"); + + String appName = properties.getProperty("Server.appDir", "bisq-monitor-server"); + File appDir = new File(Utilities.getUserDataDir(), appName); + if (!appDir.exists() && !appDir.mkdir()) { + log.warn("make appDir failed"); + } + String logPath = Paths.get(appDir.getPath(), "bisq").toString(); Log.setup(logPath); Log.setLevel(Level.INFO); - ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setNameFormat(ServerMain.class.getSimpleName()) - .setDaemon(true) - .build(); - UserThread.setExecutor(Executors.newSingleThreadExecutor(threadFactory)); - - Signal.handle(new Signal("INT"), signal -> UserThread.execute(ServerMain::shutDown)); - Signal.handle(new Signal("TERM"), signal -> UserThread.execute(ServerMain::shutDown)); + Signal.handle(new Signal("INT"), signal -> ServerMain.shutDown()); + Signal.handle(new Signal("TERM"), signal -> ServerMain.shutDown()); Runtime.getRuntime().addShutdownHook(new Thread(ServerMain::shutDown, "Shutdown Hook")); } diff --git a/src/main/java/bisq/monitor/server/handlers/DaoStateHandler.java b/src/main/java/bisq/monitor/server/handlers/DaoStateHandler.java index c94af3e..c678572 100644 --- a/src/main/java/bisq/monitor/server/handlers/DaoStateHandler.java +++ b/src/main/java/bisq/monitor/server/handlers/DaoStateHandler.java @@ -19,9 +19,9 @@ import bisq.common.util.Tuple2; import bisq.core.monitor.ReportingItems; -import bisq.monitor.reporter.Metric; +import bisq.monitor.reporter.Metrics; import bisq.monitor.reporter.Reporter; -import bisq.monitor.server.Util; +import bisq.monitor.utils.Util; import lombok.extern.slf4j.Slf4j; import java.util.*; @@ -30,7 +30,7 @@ @Slf4j public class DaoStateHandler extends ReportingHandler { - private final Map, Map>>> map = new ConcurrentHashMap<>(); + private final Map, Map>>> map = new ConcurrentHashMap<>(); public DaoStateHandler(Reporter reporter, Map seedNodeOperatorByAddress) { super(reporter, seedNodeOperatorByAddress); @@ -55,14 +55,14 @@ public void report(ReportingItems reportingItems) { String blindVoteHash = Util.findStringValue(reportingItems, "dao.blindVoteHash").orElseThrow(); fillHashValue(map, nodeId, height, blockTimeIsSec, "blindVoteHash", blindVoteHash); - Set metrics = getMetricItems(map); - metrics.add(new Metric("dao.height." + nodeId, height, blockTimeIsSec)); + Set metrics = getMetricItems(map); + metrics.add(new Metrics("dao.height." + nodeId, height, blockTimeIsSec)); metrics.forEach(this::sendReport); } catch (Throwable ignore) { } } - private static void fillHashValue(Map, Map>>> map, + private static void fillHashValue(Map, Map>>> map, String nodeId, int height, int blockTimeIsSec, @@ -70,20 +70,20 @@ private static void fillHashValue(Map, Map blockHeightTuple = new Tuple2<>(height, blockTimeIsSec); map.putIfAbsent(blockHeightTuple, new HashMap<>()); - Map>> mapByHashType = map.get(blockHeightTuple); + Map>> mapByHashType = map.get(blockHeightTuple); mapByHashType.putIfAbsent(hashType, new HashMap<>()); - Map> setByHashValue = mapByHashType.get(hashType); + Map> setByHashValue = mapByHashType.get(hashType); setByHashValue.putIfAbsent(hashValue, new HashMap<>()); - Map metricItemByNodeId = setByHashValue.get(hashValue); + Map metricItemByNodeId = setByHashValue.get(hashValue); metricItemByNodeId.putIfAbsent(nodeId, null); } - private static Set getMetricItems(Map, Map>>> map) { - Set metrics = new HashSet<>(); + private static Set getMetricItems(Map, Map>>> map) { + Set metrics = new HashSet<>(); map.forEach((blockHeightTuple, mapByHashType) -> { int blockTimeIsSec = blockHeightTuple.second; mapByHashType.forEach((hashType, byHashValue) -> { - Comparator>> entryComparator = Comparator.comparing(e -> e.getValue().size()); + Comparator>> entryComparator = Comparator.comparing(e -> e.getValue().size()); List rankedHashBuckets = byHashValue.entrySet().stream() .sorted(entryComparator.reversed()) .map(Map.Entry::getKey) @@ -103,9 +103,9 @@ private static Set getMetricItems(Map, Map updated = metricItemByNodeId.entrySet().stream() + Map updated = metricItemByNodeId.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, - entry -> new Metric("dao." + hashType + "." + entry.getKey(), finalIndex, blockTimeIsSec))); + entry -> new Metrics("dao." + hashType + "." + entry.getKey(), finalIndex, blockTimeIsSec))); metricItemByNodeId.putAll(updated); metrics.addAll(updated.values()); }); @@ -114,7 +114,7 @@ private static Set getMetricItems(Map, Map, Map>>> map, int height) { + private void pruneMap(Map, Map>>> map, int height) { int minHeight = height - 2; var pruned = map.entrySet().stream() .filter(e -> e.getKey().first > minHeight) diff --git a/src/main/java/bisq/monitor/server/handlers/NodeLoadHandler.java b/src/main/java/bisq/monitor/server/handlers/NodeLoadHandler.java index fa95699..dda542b 100644 --- a/src/main/java/bisq/monitor/server/handlers/NodeLoadHandler.java +++ b/src/main/java/bisq/monitor/server/handlers/NodeLoadHandler.java @@ -19,9 +19,9 @@ import bisq.common.util.Hex; import bisq.core.monitor.ReportingItems; -import bisq.monitor.reporter.Metric; +import bisq.monitor.reporter.Metrics; import bisq.monitor.reporter.Reporter; -import bisq.monitor.server.Util; +import bisq.monitor.utils.Util; import lombok.extern.slf4j.Slf4j; import java.math.BigInteger; @@ -41,20 +41,20 @@ public void report(ReportingItems reportingItems) { Util.findIntegerValue(reportingItems, "node.jvmStartTimeInSec") .ifPresent(jvmStartTime -> { long running = System.currentTimeMillis() / 1000 - jvmStartTime; - sendReport(new Metric("node.jvmRunningInSec." + nodeId, running)); + sendReport(new Metrics("node.jvmRunningInSec." + nodeId, running)); }); Util.findStringValue(reportingItems, "node.version").ifPresent(version -> { try { int versionAsInt = Integer.parseInt(version.replace(".", "")); - sendReport(new Metric("node.versionAsInt." + nodeId, versionAsInt)); + sendReport(new Metrics("node.versionAsInt." + nodeId, versionAsInt)); } catch (Throwable ignore) { } }); Util.findStringValue(reportingItems, "node.commitHash").ifPresent(commitHash -> { try { int commitHashAsInt = new BigInteger(Hex.decode(commitHash)).intValue(); - sendReport(new Metric("node.commitHashAsInt." + nodeId, commitHashAsInt)); + sendReport(new Metrics("node.commitHashAsInt." + nodeId, commitHashAsInt)); } catch (Throwable ignore) { } }); diff --git a/src/main/java/bisq/monitor/server/handlers/ReportingHandler.java b/src/main/java/bisq/monitor/server/handlers/ReportingHandler.java index 10d365f..cc9dcb8 100644 --- a/src/main/java/bisq/monitor/server/handlers/ReportingHandler.java +++ b/src/main/java/bisq/monitor/server/handlers/ReportingHandler.java @@ -21,9 +21,9 @@ import bisq.core.monitor.IntegerValueItem; import bisq.core.monitor.ReportingItems; import bisq.core.monitor.StringValueItem; -import bisq.monitor.reporter.Metric; +import bisq.monitor.reporter.Metrics; import bisq.monitor.reporter.Reporter; -import bisq.monitor.server.Util; +import bisq.monitor.utils.Util; import java.util.HashSet; import java.util.Map; @@ -32,7 +32,7 @@ public abstract class ReportingHandler { protected final Reporter reporter; protected final Map seedNodeOperatorByAddress; - private final Set sentReports = new HashSet<>(); + private final Set sentReports = new HashSet<>(); public ReportingHandler(Reporter reporter, Map seedNodeOperatorByAddress) { this.reporter = reporter; @@ -51,14 +51,14 @@ public void report(ReportingItems reportingItems, String group, Set excl .filter(item -> item.getGroup().equals(group)) .filter(item -> !excludedKeys.contains(item.getKey())) .map(item -> { - String path = item.getPath() + "." + nodeId; + String path = "seedReport." + item.getPath() + "." + nodeId; if (item instanceof IntegerValueItem) { - return new Metric(path, ((IntegerValueItem) item).getValue()); + return new Metrics(path, ((IntegerValueItem) item).getValue()); } else if (item instanceof DoubleValueItem) { - return new Metric(path, ((DoubleValueItem) item).getValue()); + return new Metrics(path, ((DoubleValueItem) item).getValue()); } if (item instanceof StringValueItem) { - return new Metric(path, ((StringValueItem) item).getValue()); + return new Metrics(path, ((StringValueItem) item).getValue()); } else { return null; } @@ -66,14 +66,14 @@ public void report(ReportingItems reportingItems, String group, Set excl .forEach(this::sendReport); } - protected void sendReport(Metric reportItem) { + protected void sendReport(Metrics reportItem) { if (notYetSent(reportItem)) { sentReports.add(reportItem); reporter.report(reportItem); } } - private boolean notYetSent(Metric reportItem) { + private boolean notYetSent(Metrics reportItem) { return !sentReports.contains(reportItem); } } diff --git a/src/main/java/bisq/monitor/PropertiesUtil.java b/src/main/java/bisq/monitor/utils/PropertiesUtil.java similarity index 98% rename from src/main/java/bisq/monitor/PropertiesUtil.java rename to src/main/java/bisq/monitor/utils/PropertiesUtil.java index c5f3db8..88347a1 100644 --- a/src/main/java/bisq/monitor/PropertiesUtil.java +++ b/src/main/java/bisq/monitor/utils/PropertiesUtil.java @@ -15,7 +15,7 @@ * along with Bisq. If not, see . */ -package bisq.monitor; +package bisq.monitor.utils; import lombok.extern.slf4j.Slf4j; diff --git a/src/main/java/bisq/monitor/server/Util.java b/src/main/java/bisq/monitor/utils/Util.java similarity index 99% rename from src/main/java/bisq/monitor/server/Util.java rename to src/main/java/bisq/monitor/utils/Util.java index 5fe34ed..08a1d4f 100644 --- a/src/main/java/bisq/monitor/server/Util.java +++ b/src/main/java/bisq/monitor/utils/Util.java @@ -1,4 +1,4 @@ -package bisq.monitor.server;/* +package bisq.monitor.utils;/* * This file is part of Bisq. * * Bisq is free software: you can redistribute it and/or modify it diff --git a/src/main/resources/example_monitor.properties b/src/main/resources/example_monitor.properties index 83c59bd..1d44c4b 100644 --- a/src/main/resources/example_monitor.properties +++ b/src/main/resources/example_monitor.properties @@ -7,7 +7,8 @@ appDir=bisq-monitor # Using enums from bisq.common.config.BaseCurrencyNetwork (BTC_MAINNET, BTC_REGTEST,...) baseCurrencyNetwork=BTC_MAINNET -useTor=false +useTor=true +useLocalhost=false ### GraphiteReporter @@ -15,7 +16,7 @@ GraphiteReporter.enabled=true GraphiteReporter.serviceUrl=127.0.0.1:2003 GraphiteReporter.picklePort=2004 GraphiteReporter.delayForBatchingSec=1 -GraphiteReporter.minItemsForBatching=50000 +GraphiteReporter.minItemsForBatching=5000 ############################################################################## @@ -24,13 +25,7 @@ GraphiteReporter.minItemsForBatching=50000 ### Listening for clear-net reporting from seed nodes Server.appDir=bisq-monitor-server -Server.port=8082 -Server.useTLS=false -Server.keystoreFile=/Users/user/Documents/cert/server/keystore.jks -Server.keystorePassword=/password -Server.truststoreFile=/Users/user/Documents/cert/server/cacerts.jks -Server.truststorePassword=password - +Server.port=13003 ############################################################################## ### Dump Bisq network data to Grafana @@ -44,8 +39,8 @@ Server.truststorePassword=password ### Seed nodes ############################################################################## -SeedNodeRoundTripTime.enabled=false -SeedNodeRoundTripTime.interval=300 +SeedNodeRoundTripTime.enabled=true +SeedNodeRoundTripTime.interval=10 SeedNodeRoundTripTime.hosts= @@ -53,53 +48,30 @@ SeedNodeRoundTripTime.hosts= ### Tor network ############################################################################## -TorStartupTime.enabled=false -TorStartupTime.interval=300 +TorStartupTime.enabled=true +TorStartupTime.interval=10 TorStartupTime.socksPort=90500 -TorConnectionTime.enabled=false -TorConnectionTime.interval=300 -TorConnectionTime.hosts=duckduckgogg42xjoc72x3sjasowoarfbgcmvfimaftt6twagswzczad.onion:80,http://xmh57jrknzkhv6y3ls3ubitzfqnkrwxhopf5aygthi7d6rplyvk3noyd.onion:80,http://p53lf57qovyuvwsc6xnrppyply3vtqm7l6pcobkmyqsiofyeznfu5uqd.onion:80,http://archiveiya74codqgiixo33q62qlrqtkgmcitqx5u2oeqnmn5bpcbiyd.onion +TorConnectionTime.enabled=true +TorConnectionTime.interval=10 +TorConnectionTime.hosts=duckduckgogg42xjoc72x3sjasowoarfbgcmvfimaftt6twagswzczad.onion:80,p53lf57qovyuvwsc6xnrppyply3vtqm7l6pcobkmyqsiofyeznfu5uqd.onion:80,archiveiya74codqgiixo33q62qlrqtkgmcitqx5u2oeqnmn5bpcbiyd.onion:80 -TorHiddenServiceStartupTime.enabled=false -TorHiddenServiceStartupTime.interval=300 +TorHiddenServiceStartupTime.enabled=true +TorHiddenServiceStartupTime.interval=10 TorHiddenServiceStartupTime.localPort=1234 TorHiddenServiceStartupTime.servicePort=1235 -############################################################################## -### Bisq markets -############################################################################## - -### MarketStats Metric -MarketStats.enabled=false -MarketStats.interval=600 -MarketStats.url=https://bisq.markets - - ############################################################################## ### Price nodes ############################################################################## -PriceNodeData.enabled=false -PriceNodeData.interval=300 +PriceNodeData.enabled=true +PriceNodeData.interval=10 # clearnet or tor addresses of price nodes. If empty the default nodes are used (incl. one clear-net) PriceNodeData.hosts= # comma separated list of currency codes to exclude from reporting PriceNodeData.excluded= -############################################################################## -### Bisq network -############################################################################## - -P2PMarketStats.enabled=false -P2PMarketStats.interval=300 -P2PMarketStats.hosts=jmacxxto7g7welbgcjwfpquzqaehmvp5bf6mqlz5nciho2sc6v7hbyid.onion:8000 -P2PMarketStats.run.torProxyPort=9063 - -### P2PNetworkLoad Metric -P2PNetworkLoad.enabled=300 -P2PNetworkLoad.interval=211 -P2PNetworkLoad.hosts=jmacxxto7g7welbgcjwfpquzqaehmvp5bf6mqlz5nciho2sc6v7hbyid.onion:8000,devinsn3xuzxhj6pmammrxpydhwwmwp75qkksedo5dn2tlmu7jggo7id.onion:8000