Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Rewrite monitor code
Browse files Browse the repository at this point in the history
HenrikJannsen committed Dec 3, 2022

Verified

This commit was signed with the committer’s verified signature.
dtolnay David Tolnay
1 parent c800ce0 commit bea06d9
Showing 44 changed files with 849 additions and 1,720 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -23,7 +23,6 @@ monitor/work/*
/bundles
/bisq-*
/lib
desktop.ini
*/target/*
*.class
deploy
@@ -33,3 +32,5 @@ deploy
/.run
monitor.properties

/bisq-gradle/out/*

2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ plugins {
id 'application'
alias(libs.plugins.openjfx)
id 'bisq.post-build'
id 'com.github.johnrengelman.shadow' version '7.1.2'
}

repositories {
@@ -53,6 +54,7 @@ dependencies {
annotationProcessor libs.lombok
implementation libs.slf4j.api

implementation "com.google.protobuf:protobuf-java:3.19.1"
implementation libs.spark.core
implementation "com.google.code.gson:gson:2.8.5"
implementation("com.github.bisq-network:bitcoinj:42bbae9") {
1 change: 1 addition & 0 deletions src/main/java/bisq/monitor/AppChooserMain.java
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
import bisq.monitor.dump.DataDumpMain;
import bisq.monitor.monitor.MonitorMain;
import bisq.monitor.server.ServerMain;
import bisq.monitor.utils.PropertiesUtil;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@
import bisq.core.util.FormattingUtils;
import bisq.core.util.VolumeUtil;
import bisq.monitor.dump.ReporterProvider;
import bisq.monitor.reporter.Metric;
import bisq.monitor.reporter.Metrics;
import bisq.monitor.reporter.Reporter;
import bisq.network.p2p.storage.P2PDataStorage;
import javafx.collections.SetChangeListener;
@@ -68,35 +68,35 @@ public void onAllServicesInitialized() {
tradeStatisticsManager.getObservableTradeStatisticsSet().addListener((SetChangeListener<TradeStatistics3>) change -> {
TradeStatistics3 newItem = change.getElementAdded();
if (isNotProcessed(newItem)) {
Set<Metric> reportItems = toMetrics(newItem);
Set<Metrics> reportItems = toMetrics(newItem);
sendReports(reportItems);
}
});
}

private Set<Metric> toMetrics(TradeStatistics3 tradeStatistics) {
private Set<Metrics> toMetrics(TradeStatistics3 tradeStatistics) {
alreadyProcessed.add(new P2PDataStorage.ByteArray(tradeStatistics.getHash()));
Set<Metric> metrics = new HashSet<>();
Set<Metrics> metrics = new HashSet<>();

long timeStampInSec = tradeStatistics.getDateAsLong() / 1000;
String market = CurrencyUtil.getCurrencyPair(tradeStatistics.getCurrency()).replace("/", "_");

String path = PREFIX + "." + "price" + "." + market;
String value = FormattingUtils.formatPrice(tradeStatistics.getTradePrice());
metrics.add(new Metric(path, value, timeStampInSec));
metrics.add(new Metrics(path, value, timeStampInSec));

path = PREFIX + "." + "amount" + "." + market;
value = new MainNetParams().getMonetaryFormat().noCode().format(tradeStatistics.getTradeAmount()).toString();
metrics.add(new Metric(path, value, timeStampInSec));
metrics.add(new Metrics(path, value, timeStampInSec));

path = PREFIX + "." + "volume" + "." + market;
value = VolumeUtil.formatVolume(tradeStatistics.getTradeVolume());
metrics.add(new Metric(path, value, timeStampInSec));
metrics.add(new Metrics(path, value, timeStampInSec));

return metrics;
}

private void sendReports(Set<Metric> reportItems) {
private void sendReports(Set<Metrics> reportItems) {
log.error(reportItems.toString());
reportItems.forEach(reporter::report);
}
57 changes: 57 additions & 0 deletions src/main/java/bisq/monitor/monitor/CompletableFutureUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/

package bisq.monitor.monitor;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

//todo
public class CompletableFutureUtil {
public static <T> CompletableFuture<List<T>> allOf(Collection<CompletableFuture<T>> collection) {
//noinspection unchecked
return allOf(collection.toArray(new CompletableFuture[0]));
}

public static <T> CompletableFuture<List<T>> allOf(Stream<CompletableFuture<T>> stream) {
return allOf(stream.collect(Collectors.toList()));
}

public static <T> CompletableFuture<List<T>> allOf(CompletableFuture<T>... list) {
CompletableFuture<List<T>> result = CompletableFuture.allOf(list).thenApply(v ->
Stream.of(list)
.map(future -> {
// We want to return the results in list, not the futures. Once allOf call is complete
// we know that all futures have completed (normally, exceptional or cancelled).
// For exceptional and canceled cases we throw an exception.
T res = future.join();
if (future.isCompletedExceptionally()) {
throw new RuntimeException((future.handle((r, throwable) -> throwable).join()));
}
if (future.isCancelled()) {
throw new RuntimeException("Future got canceled");
}
return res;
})
.collect(Collectors.<T>toList())
);
return result;
}
}
45 changes: 21 additions & 24 deletions src/main/java/bisq/monitor/monitor/Monitor.java
Original file line number Diff line number Diff line change
@@ -17,40 +17,37 @@

package bisq.monitor.monitor;

import bisq.monitor.monitor.tasks.tornetwork.TorStartupTime;
import bisq.monitor.monitor.tasks.TorConnectionTime;
import bisq.monitor.reporter.Reporter;
import lombok.extern.slf4j.Slf4j;
import org.berndpruenster.netlayer.tor.Tor;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;

/**
* Monitor executable for the Bisq network.
*
* @author Florian Reimair
*/
@Slf4j
public class Monitor {
private final MonitorTaskRunner monitorTaskRunner;

public Monitor(File appDir, Properties properties, Reporter reporter) {
List<MonitorTask> monitorTasks = new ArrayList<>();
monitorTasks.add(new TorStartupTime(properties, appDir, reporter));
monitorTasks.forEach(MonitorTask::init);
}
public Monitor(Properties properties, Reporter reporter, File appDir) {
monitorTaskRunner = new MonitorTaskRunner();

public CompletableFuture<Void> shutDown() {
return CompletableFuture.runAsync(() -> {
MonitorTask.haltAllMetrics();
monitorTaskRunner.add(new TorConnectionTime(properties, reporter, appDir, false));
monitorTaskRunner.add(new TorConnectionTime(properties, reporter, appDir, true));

/* monitorTaskRunner.add(new TorStartupTime(properties, reporter, appDir));
monitorTaskRunner.add(new TorHiddenServiceStartupTime(properties, reporter));
monitorTaskRunner.add(new PriceNodeData(properties, reporter));
monitorTaskRunner.add(new TorConnectionTime(properties, reporter, appDir, false));
monitorTaskRunner.add(new SeedNodeRoundTripTime(properties, reporter, false));
// We use restart tor to get for the next SeedNodeRoundTripTime again new connections with a new tor instance
monitorTaskRunner.add(new TorConnectionTime(properties, reporter, appDir, true));
monitorTaskRunner.add(new SeedNodeRoundTripTime(properties, reporter, true));*/

monitorTaskRunner.start();
}

log.info("shutting down tor...");
Tor tor = Tor.getDefault();
if (tor != null) {
tor.shutdown();
}
});
public void shutDown() {
monitorTaskRunner.shutDown();
}
}
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/

package bisq.monitor.monitor.utils;
package bisq.monitor.monitor;

import com.runjva.sourceforge.jsocks.protocol.Socks5Proxy;
import com.runjva.sourceforge.jsocks.protocol.SocksException;
101 changes: 48 additions & 53 deletions src/main/java/bisq/monitor/monitor/MonitorMain.java
Original file line number Diff line number Diff line change
@@ -20,16 +20,16 @@

import bisq.common.UserThread;
import bisq.common.app.Log;
import bisq.common.app.Version;
import bisq.common.config.BaseCurrencyNetwork;
import bisq.common.config.Config;
import bisq.common.util.Utilities;
import bisq.core.locale.Res;
import bisq.core.setup.CoreNetworkCapabilities;
import bisq.monitor.PropertiesUtil;
import bisq.monitor.monitor.tor.AvailableTor;
import bisq.monitor.monitor.tor.TorNode;
import bisq.monitor.reporter.ConsoleReporter;
import bisq.monitor.reporter.GraphiteReporter;
import bisq.monitor.reporter.Reporter;
import bisq.monitor.server.Server;
import bisq.monitor.utils.PropertiesUtil;
import ch.qos.logback.classic.Level;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
@@ -46,13 +46,10 @@
@Slf4j
public class MonitorMain {
private static boolean stopped;
private static Monitor monitor;
@Nullable
private static Server server;
@Nullable
private static TorNode torNode;
@Nullable
private static TorBasedMonitor torBasedMonitor;
private static Monitor monitor;
private static File appDir;

/**
* @param args Can be empty or is property file path
@@ -65,90 +62,88 @@ public static void main(String[] args) {
properties = PropertiesUtil.getProperties(args[0].replace("--config=", ""));
}

String appName = properties.getProperty("appDir");
File appDir = new File(Utilities.getUserDataDir(), appName);
if (!appDir.exists() && !appDir.mkdir()) {
log.warn("make appDir failed");
}
setup(appDir);

CoreNetworkCapabilities.setSupportedCapabilities(new Config());
setup(properties);

Reporter reporter = "true".equals(properties.getProperty("GraphiteReporter.enabled", "false")) ?
new GraphiteReporter(properties) : new ConsoleReporter();
boolean useTor = properties.getProperty("useTor").equals("true");
boolean useServer = properties.getProperty("useServer").equals("true");

boolean useLocalhost = "true".equals(properties.getProperty("useLocalhost"));
boolean useTor = "true".equals(properties.getProperty("useTor"));
CompletableFuture.runAsync(() -> {
monitor = new Monitor(appDir, properties, reporter);
}, Utilities.getSingleThreadExecutor("Monitor"))
.thenRunAsync(() -> {
if (useServer) {
server = new Server(properties, reporter);
}
}, Utilities.getSingleThreadExecutor("Server"))
.thenRunAsync(() -> {
if (useTor) {
AvailableTor.setAppDir(appDir);
torNode = new TorNode(appDir);
}
}, Utilities.getSingleThreadExecutor("TorNode"))
.thenRunAsync(() -> {
if (useTor) {
torBasedMonitor = new TorBasedMonitor(properties, reporter);
}
}, Utilities.getSingleThreadExecutor("TorBasedMonitor"));
if (useTor && !useLocalhost) {
long ts = System.currentTimeMillis();
torNode = new TorNode(appDir);
log.info("Starting tor took {} ms", System.currentTimeMillis() - ts);
}
monitor = new Monitor(properties, reporter, appDir);
}, Utilities.getSingleThreadExecutor("Monitor"));

keepRunning();
}

public static void setup(File appDir) {
public static void setup(Properties properties) {
String appName = properties.getProperty("appDir");
appDir = new File(Utilities.getUserDataDir(), appName);
if (!appDir.exists() && !appDir.mkdir()) {
log.warn("make appDir failed");
}

String logPath = Paths.get(appDir.getPath(), "bisq").toString();
Log.setup(logPath);
Log.setLevel(Level.INFO);

Config config = new Config();
CoreNetworkCapabilities.setSupportedCapabilities(config);
BaseCurrencyNetwork baseCurrencyNetwork = BaseCurrencyNetwork.valueOf(properties.getProperty("baseCurrencyNetwork", "BTC_REGTEST"));
Version.setBaseCryptoNetworkId(baseCurrencyNetwork.ordinal());
Res.setup();

ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(MonitorMain.class.getSimpleName())
.setDaemon(true)
.build();
UserThread.setExecutor(Executors.newSingleThreadExecutor(threadFactory));

Signal.handle(new Signal("INT"), signal -> UserThread.execute(MonitorMain::shutDown));
Signal.handle(new Signal("TERM"), signal -> UserThread.execute(MonitorMain::shutDown));
Signal.handle(new Signal("INT"), signal -> {
UserThread.execute(MonitorMain::shutDown);
});

Signal.handle(new Signal("TERM"), signal -> {
UserThread.execute(MonitorMain::shutDown);
});

Runtime.getRuntime().addShutdownHook(new Thread(MonitorMain::shutDown, "Shutdown Hook"));
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
UserThread.execute(MonitorMain::shutDown);
}, "Shutdown Hook"));
}

public static void shutDown() {
stopped = true;
if (monitor == null) {
System.exit(0);
return;
}

monitor.shutDown()
CompletableFuture.runAsync(() -> {
})
.thenRun(() -> {
if (server != null) {
server.shutDown();
}
}).thenRun(() -> {
if (torBasedMonitor != null) {
torBasedMonitor.shutDown();
if (monitor != null) {
monitor.shutDown();
}
})
.thenRun(() -> {
if (torNode != null) {
torNode.shutDown();
}
})
.thenRun(() -> System.exit(0));
.thenRun(() -> {
System.exit(0);
});
}

public static void keepRunning() {
while (!stopped) {
try {
Thread.sleep(Long.MAX_VALUE);
log.error("saf");
} catch (InterruptedException ignore) {
log.error("");
}
}
}
Loading

0 comments on commit bea06d9

Please sign in to comment.