From 189e94a1f1b0065acd1b5e1c86c8d9df62a92591 Mon Sep 17 00:00:00 2001 From: djing chan Date: Tue, 19 Dec 2023 14:17:29 +0700 Subject: [PATCH 1/2] Create PeerGroupService at startup as it is used for persistence and the framework requires that persistence service classes are available when the persistence framework read in all data. As we do not want to pass the default node which is created later via a setter to PeerGroupService, it got refactored so that clients use node instead. This also reduced the usage of PeerGroupService where not needed. The peer group management could need more refactoring as it's a bit messy as it is now. --- .../bisq/desktop/main/left/NetworkInfo.java | 11 ++-- .../network/p2p/node/BaseNodesByIdTest.java | 5 +- .../exchange/BasePeerExchangeServiceTest.java | 12 +++-- .../java/bisq/network/p2p/ServiceNode.java | 25 ++++----- .../main/java/bisq/network/p2p/node/Node.java | 19 ++++++- .../ack/MessageDeliveryStatusService.java | 7 +-- .../p2p/services/data/DataNetworkService.java | 5 +- .../services/data/broadcast/Broadcaster.java | 9 ++-- .../data/inventory/InventoryService.java | 10 ++-- .../services/peergroup/PeerGroupManager.java | 33 ++++++------ .../services/peergroup/PeerGroupService.java | 53 +++++-------------- .../exchange/PeerExchangeStrategy.java | 15 +++--- .../peergroup/keepalive/KeepAliveService.java | 7 +-- .../NetworkLoadExchangeService.java | 3 +- 14 files changed, 100 insertions(+), 114 deletions(-) diff --git a/apps/desktop/desktop/src/main/java/bisq/desktop/main/left/NetworkInfo.java b/apps/desktop/desktop/src/main/java/bisq/desktop/main/left/NetworkInfo.java index 8e14d67223..a313ab40d8 100644 --- a/apps/desktop/desktop/src/main/java/bisq/desktop/main/left/NetworkInfo.java +++ b/apps/desktop/desktop/src/main/java/bisq/desktop/main/left/NetworkInfo.java @@ -66,6 +66,7 @@ public void onActivate() { networkService.getServiceNodesByTransport().findServiceNode(type).ifPresent(serviceNode -> { serviceNode.getPeerGroupManager().ifPresent(peerGroupManager -> { PeerGroupService peerGroupService = peerGroupManager.getPeerGroupService(); + Node node = peerGroupManager.getNode(); String numTargetConnections = String.valueOf(peerGroupService.getTargetNumConnectedPeers()); switch (type) { case CLEAR: @@ -87,16 +88,16 @@ public void onMessage(EnvelopePayloadMessage envelopePayloadMessage, Connection @Override public void onConnection(Connection connection) { - onNumConnectionsChanged(type, peerGroupService); + onNumConnectionsChanged(type, node); } @Override public void onDisconnect(Connection connection, CloseReason closeReason) { - onNumConnectionsChanged(type, peerGroupService); + onNumConnectionsChanged(type, node); } }); - onNumConnectionsChanged(type, peerGroupService); + onNumConnectionsChanged(type, node); }); }) ); @@ -112,9 +113,9 @@ private void onNavigateToNetworkInfo() { onNavigationTargetSelectedHandler.accept(NavigationTarget.NETWORK_INFO); } - private void onNumConnectionsChanged(TransportType transportType, PeerGroupService peerGroupService) { + private void onNumConnectionsChanged(TransportType transportType, Node node) { UIThread.run(() -> { - String value = String.valueOf(peerGroupService.getNumConnections()); + String value = String.valueOf(node.getNumConnections()); switch (transportType) { case CLEAR: model.getClearNetNumConnections().set(value); diff --git a/network/network/src/integrationTest/java/bisq/network/p2p/node/BaseNodesByIdTest.java b/network/network/src/integrationTest/java/bisq/network/p2p/node/BaseNodesByIdTest.java index 81245a4529..eac57427f2 100644 --- a/network/network/src/integrationTest/java/bisq/network/p2p/node/BaseNodesByIdTest.java +++ b/network/network/src/integrationTest/java/bisq/network/p2p/node/BaseNodesByIdTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; @Slf4j public abstract class BaseNodesByIdTest extends BaseNetworkTest { @@ -44,7 +45,7 @@ void test_messageRoundTrip(Node.Config nodeConfig) throws InterruptedException { BanList banList = new BanList(); TransportService transportService = TransportService.create(nodeConfig.getTransportType(), nodeConfig.getTransportConfig()); PersistenceService persistenceService = new PersistenceService(""); - KeyBundleService keyBundleService = new KeyBundleService(persistenceService); + KeyBundleService keyBundleService = new KeyBundleService(persistenceService, mock(KeyBundleService.Config.class)); NodesById nodesById = new NodesById(banList, nodeConfig, keyBundleService, transportService, new NetworkLoadService(), new AuthorizationService(new HashCashService())); long ts = System.currentTimeMillis(); @@ -135,7 +136,7 @@ void test_initializeServer(Node.Config nodeConfig) { BanList banList = new BanList(); TransportService transportService = TransportService.create(nodeConfig.getTransportType(), nodeConfig.getTransportConfig()); PersistenceService persistenceService = new PersistenceService(""); - KeyBundleService keyBundleService = new KeyBundleService(persistenceService); + KeyBundleService keyBundleService = new KeyBundleService(persistenceService, mock(KeyBundleService.Config.class)); NodesById nodesById = new NodesById(banList, nodeConfig, keyBundleService, transportService, new NetworkLoadService(), new AuthorizationService(new HashCashService())); initializeServers(2, nodesById); nodesById.shutdown().join(); diff --git a/network/network/src/integrationTest/java/bisq/network/p2p/services/peergroup/exchange/BasePeerExchangeServiceTest.java b/network/network/src/integrationTest/java/bisq/network/p2p/services/peergroup/exchange/BasePeerExchangeServiceTest.java index 7b1335e7cf..f5f8bbd0cb 100644 --- a/network/network/src/integrationTest/java/bisq/network/p2p/services/peergroup/exchange/BasePeerExchangeServiceTest.java +++ b/network/network/src/integrationTest/java/bisq/network/p2p/services/peergroup/exchange/BasePeerExchangeServiceTest.java @@ -18,6 +18,7 @@ package bisq.network.p2p.services.peergroup.exchange; import bisq.network.common.Address; +import bisq.network.common.TransportType; import bisq.network.p2p.BaseNetworkTest; import bisq.network.p2p.node.Node; import bisq.network.p2p.node.transport.TransportService; @@ -43,7 +44,8 @@ void test_peerExchange(Node.Config nodeConfig) throws InterruptedException, Exec int numSeeds = 2; int numNodes = 2; BanList banList = new BanList(); - TransportService transportService = TransportService.create(nodeConfig.getTransportType(), nodeConfig.getTransportConfig()); + TransportType transportType = nodeConfig.getTransportType(); + TransportService transportService = TransportService.create(transportType, nodeConfig.getTransportConfig()); PersistenceService persistenceService = new PersistenceService(getBaseDir().toAbsolutePath().toString()); PeerGroupManager.Config peerGroupServiceConfig = new PeerGroupManager.Config( null, null, null, @@ -99,11 +101,11 @@ void test_peerExchange(Node.Config nodeConfig) throws InterruptedException, Exec for (int i = 0; i < numNodes; i++) { Node node = nodes.get(i); PeerGroupService peerGroupService = new PeerGroupService(persistenceService, - node, + transportType, new PeerGroupService.Config(), new HashSet<>(seedNodeAddresses), banList); - PeerExchangeStrategy peerExchangeStrategy = new PeerExchangeStrategy(peerGroupService, new PeerExchangeStrategy.Config()); + PeerExchangeStrategy peerExchangeStrategy = new PeerExchangeStrategy(peerGroupService, node, new PeerExchangeStrategy.Config()); PeerExchangeService peerExchangeService = new PeerExchangeService(node, peerExchangeStrategy); peerExchangeService.startInitialPeerExchange().whenComplete((result, throwable) -> { assertNull(throwable); @@ -120,11 +122,11 @@ void test_peerExchange(Node.Config nodeConfig) throws InterruptedException, Exec for (int i = 0; i < numNodes; i++) { Node node = nodes.get(i); PeerGroupService peerGroupService = new PeerGroupService(persistenceService, - node, + transportType, new PeerGroupService.Config(), new HashSet<>(seedNodeAddresses), banList); - PeerExchangeStrategy peerExchangeStrategy = new PeerExchangeStrategy(peerGroupService, new PeerExchangeStrategy.Config()); + PeerExchangeStrategy peerExchangeStrategy = new PeerExchangeStrategy(peerGroupService, node, new PeerExchangeStrategy.Config()); PeerExchangeService peerExchangeService = new PeerExchangeService(node, peerExchangeStrategy); peerExchangeService.startInitialPeerExchange().whenComplete((result, throwable) -> { assertNull(throwable); diff --git a/network/network/src/main/java/bisq/network/p2p/ServiceNode.java b/network/network/src/main/java/bisq/network/p2p/ServiceNode.java index 601f029703..309db1cfc3 100644 --- a/network/network/src/main/java/bisq/network/p2p/ServiceNode.java +++ b/network/network/src/main/java/bisq/network/p2p/ServiceNode.java @@ -41,6 +41,7 @@ import bisq.network.p2p.services.data.inventory.InventoryService; import bisq.network.p2p.services.peergroup.BanList; import bisq.network.p2p.services.peergroup.PeerGroupManager; +import bisq.network.p2p.services.peergroup.PeerGroupService; import bisq.persistence.PersistenceService; import bisq.security.keys.KeyBundleService; import bisq.security.keys.PubKey; @@ -103,10 +104,10 @@ public enum SupportedService { private final Config config; private final PeerGroupManager.Config peerGroupServiceConfig; private final Optional dataService; + private final PeerGroupService peerGroupService; private final InventoryService.Config inventoryServiceConfig; private final Optional messageDeliveryStatusService; private final KeyBundleService keyBundleService; - private final PersistenceService persistenceService; private final Set
seedNodeAddresses; @Getter @@ -147,11 +148,12 @@ public enum SupportedService { this.messageDeliveryStatusService = messageDeliveryStatusService; this.dataService = dataService; this.keyBundleService = keyBundleService; - this.persistenceService = persistenceService; this.seedNodeAddresses = seedNodeAddresses; transportService = TransportService.create(transportType, nodeConfig.getTransportConfig()); nodesById = new NodesById(banList, nodeConfig, keyBundleService, transportService, networkLoadService, authorizationService); + + peerGroupService = new PeerGroupService(persistenceService, transportType, peerGroupServiceConfig.getPeerGroupConfig(), seedNodeAddresses, banList); } @@ -164,18 +166,17 @@ Node getInitializedDefaultNode(NetworkId defaultNetworkId) { Set supportedServices = config.getSupportedServices(); peerGroupManager = supportedServices.contains(SupportedService.PEER_GROUP) ? - Optional.of(new PeerGroupManager(persistenceService, - defaultNode, + Optional.of(new PeerGroupManager(defaultNode, + peerGroupService, banList, - peerGroupServiceConfig, - seedNodeAddresses)) : + peerGroupServiceConfig)) : Optional.empty(); boolean dataServiceEnabled = supportedServices.contains(SupportedService.PEER_GROUP) && supportedServices.contains(SupportedService.DATA); dataNetworkService = dataServiceEnabled ? - Optional.of(new DataNetworkService(defaultNode, peerGroupManager.orElseThrow(), dataService.orElseThrow())) : + Optional.of(new DataNetworkService(defaultNode, dataService.orElseThrow())) : Optional.empty(); inventoryService = dataServiceEnabled ? @@ -192,8 +193,8 @@ Node getInitializedDefaultNode(NetworkId defaultNetworkId) { setState(State.INITIALIZING); transportService.initialize();// blocking defaultNode.initialize();// blocking - peerGroupManager.ifPresentOrElse(peerGroupService -> { - peerGroupService.initialize();// blocking + peerGroupManager.ifPresentOrElse(peerGroupManager -> { + peerGroupManager.initialize();// blocking setState(State.INITIALIZED); }, () -> setState(State.INITIALIZED)); @@ -225,19 +226,19 @@ boolean isNodeInitialized(NetworkId networkId) { void addSeedNodeAddresses(Set
seedNodeAddresses) { this.seedNodeAddresses.addAll(seedNodeAddresses); - peerGroupManager.ifPresent(peerGroupService -> peerGroupService.addSeedNodeAddresses(seedNodeAddresses)); + peerGroupManager.ifPresent(peerGroupManager -> peerGroupManager.addSeedNodeAddresses(seedNodeAddresses)); } void addSeedNodeAddress(Address seedNodeAddress) { // In case we would get called before peerGroupManager is created we add the seedNodeAddress to the // seedNodeAddresses field seedNodeAddresses.add(seedNodeAddress); - peerGroupManager.ifPresent(peerGroupService -> peerGroupService.addSeedNodeAddress(seedNodeAddress)); + peerGroupManager.ifPresent(peerGroupManager -> peerGroupManager.addSeedNodeAddress(seedNodeAddress)); } void removeSeedNodeAddress(Address seedNodeAddress) { seedNodeAddresses.remove(seedNodeAddress); - peerGroupManager.ifPresent(peerGroupService -> peerGroupService.removeSeedNodeAddress(seedNodeAddress)); + peerGroupManager.ifPresent(peerGroupManager -> peerGroupManager.removeSeedNodeAddress(seedNodeAddress)); } SendConfidentialMessageResult confidentialSend(EnvelopePayloadMessage envelopePayloadMessage, diff --git a/network/network/src/main/java/bisq/network/p2p/node/Node.java b/network/network/src/main/java/bisq/network/p2p/node/Node.java index b399e11081..a477cb2ea7 100644 --- a/network/network/src/main/java/bisq/network/p2p/node/Node.java +++ b/network/network/src/main/java/bisq/network/p2p/node/Node.java @@ -438,6 +438,21 @@ public Stream getAllConnections() { return Stream.concat(inboundConnectionsByAddress.values().stream(), outboundConnectionsByAddress.values().stream()); } + public Stream getAllActiveConnections() { + return getAllConnections().filter(Connection::isRunning); + } + + public Stream getActiveOutboundConnections() { + return getOutboundConnectionsByAddress().values().stream().filter(Connection::isRunning); + } + + public Stream getActiveInboundConnections() { + return getInboundConnectionsByAddress().values().stream().filter(Connection::isRunning); + } + + public int getNumConnections() { + return (int) getAllActiveConnections().count(); + } /////////////////////////////////////////////////////////////////////////////////////////////////// // Connection.Handler @@ -587,8 +602,8 @@ public Optional
findMyAddress() { return server.map(Server::getAddress); } - public int getNumConnections() { - return inboundConnectionsByAddress.size() + outboundConnectionsByAddress.size(); + public boolean notMyself(Address address) { + return findMyAddress().stream().noneMatch(myAddress -> myAddress.equals(address)); } public boolean isInitialized() { diff --git a/network/network/src/main/java/bisq/network/p2p/services/confidential/ack/MessageDeliveryStatusService.java b/network/network/src/main/java/bisq/network/p2p/services/confidential/ack/MessageDeliveryStatusService.java index 8cd8d583ce..42da755994 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/confidential/ack/MessageDeliveryStatusService.java +++ b/network/network/src/main/java/bisq/network/p2p/services/confidential/ack/MessageDeliveryStatusService.java @@ -123,10 +123,11 @@ private void processAckMessage(AckMessage ackMessage) { return; } - if (observableStatus.get() == MessageDeliveryStatus.ADDED_TO_MAILBOX) { - observableStatus.set(MessageDeliveryStatus.MAILBOX_MSG_RECEIVED); - } else { + if (observableStatus.get() == MessageDeliveryStatus.START_SENDING) { observableStatus.set(MessageDeliveryStatus.ARRIVED); + } else { + // Covers ADDED_TO_MAILBOX, TRY_ADD_TO_MAILBOX and FAILED + observableStatus.set(MessageDeliveryStatus.MAILBOX_MSG_RECEIVED); } } else { messageDeliveryStatusByMessageId.put(messageId, new Observable<>(MessageDeliveryStatus.ARRIVED)); diff --git a/network/network/src/main/java/bisq/network/p2p/services/data/DataNetworkService.java b/network/network/src/main/java/bisq/network/p2p/services/data/DataNetworkService.java index 6e580fd06b..ae54356922 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/data/DataNetworkService.java +++ b/network/network/src/main/java/bisq/network/p2p/services/data/DataNetworkService.java @@ -23,7 +23,6 @@ import bisq.network.p2p.node.Connection; import bisq.network.p2p.node.Node; import bisq.network.p2p.services.data.broadcast.Broadcaster; -import bisq.network.p2p.services.peergroup.PeerGroupManager; import lombok.extern.slf4j.Slf4j; /** @@ -36,10 +35,10 @@ public class DataNetworkService implements Node.Listener { private final DataService dataService; private final Broadcaster broadcaster; - public DataNetworkService(Node node, PeerGroupManager peerGroupManager, DataService dataService) { + public DataNetworkService(Node node, DataService dataService) { this.node = node; this.dataService = dataService; - broadcaster = new Broadcaster(node, peerGroupManager.getPeerGroupService()); + broadcaster = new Broadcaster(node); node.addListener(this); dataService.addBroadcaster(broadcaster); } diff --git a/network/network/src/main/java/bisq/network/p2p/services/data/broadcast/Broadcaster.java b/network/network/src/main/java/bisq/network/p2p/services/data/broadcast/Broadcaster.java index 7394228968..1e4a031f09 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/data/broadcast/Broadcaster.java +++ b/network/network/src/main/java/bisq/network/p2p/services/data/broadcast/Broadcaster.java @@ -20,7 +20,6 @@ import bisq.network.NetworkService; import bisq.network.p2p.node.Connection; import bisq.network.p2p.node.Node; -import bisq.network.p2p.services.peergroup.PeerGroupService; import dev.failsafe.Failsafe; import dev.failsafe.RetryPolicy; import lombok.extern.slf4j.Slf4j; @@ -39,12 +38,10 @@ public class Broadcaster { private static final long RE_BROADCAST_DELAY_MS = 100; private final Node node; - private final PeerGroupService peerGroupService; private final RetryPolicy retryPolicy; - public Broadcaster(Node node, PeerGroupService peerGroupService) { + public Broadcaster(Node node) { this.node = node; - this.peerGroupService = peerGroupService; retryPolicy = RetryPolicy.builder() .handle(IllegalStateException.class) @@ -81,11 +78,11 @@ public CompletableFuture doBroadcast(BroadcastMessage broadcast .orTimeout(BROADCAST_TIMEOUT, TimeUnit.SECONDS); AtomicInteger numSuccess = new AtomicInteger(0); AtomicInteger numFaults = new AtomicInteger(0); - long numConnections = peerGroupService.getAllConnections().count(); + long numConnections = node.getAllActiveConnections().count(); long numBroadcasts = Math.min(numConnections, Math.round(numConnections * distributionFactor)); log.debug("Broadcast {} to {} out of {} peers. distributionFactor={}", broadcastMessage.getClass().getSimpleName(), numBroadcasts, numConnections, distributionFactor); - List allConnections = peerGroupService.getAllConnections().collect(Collectors.toList()); + List allConnections = node.getAllActiveConnections().collect(Collectors.toList()); Collections.shuffle(allConnections); NetworkService.NETWORK_IO_POOL.submit(() -> { allConnections.stream() diff --git a/network/network/src/main/java/bisq/network/p2p/services/data/inventory/InventoryService.java b/network/network/src/main/java/bisq/network/p2p/services/data/inventory/InventoryService.java index d8acf260d6..ebb4abd0d7 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/data/inventory/InventoryService.java +++ b/network/network/src/main/java/bisq/network/p2p/services/data/inventory/InventoryService.java @@ -131,7 +131,7 @@ public void onMessage(EnvelopePayloadMessage envelopePayloadMessage, Connection public void onConnection(Connection connection) { if (sufficientConnections()) { log.info("We are sufficiently connected to start the inventory request. numConnections={}", - peerGroupService.getNumConnections()); + node.getNumConnections()); doRequest(); } } @@ -187,16 +187,16 @@ private void doRequest() { private List> request(DataFilter dataFilter) { int maxSeeds = 2; int maxCandidates = 4; - List
candidates = peerGroupService.getAllConnectedPeers() + List
candidates = peerGroupService.getAllConnectedPeers(node) .filter(peerGroupService::isSeed) .limit(maxSeeds) .map(Peer::getAddress) .collect(Collectors.toList()); - candidates.addAll(peerGroupService.getAllConnectedPeers() + candidates.addAll(peerGroupService.getAllConnectedPeers(node) .filter(peerGroupService::notASeed) .map(Peer::getAddress) .collect(Collectors.toList())); - return peerGroupService.getAllConnections() + return node.getAllActiveConnections() .filter(connection -> !requestHandlerMap.containsKey(connection.getId())) .filter(connection -> candidates.contains(connection.getPeerAddress())) .limit(maxCandidates) @@ -224,7 +224,7 @@ private List> request(DataFilter dataFilter) { } private boolean sufficientConnections() { - return peerGroupService.getNumConnections() > peerGroupService.getTargetNumConnectedPeers() / 2; + return node.getNumConnections() > peerGroupService.getTargetNumConnectedPeers() / 2; } private FilterEntry toFilterEntry(Map.Entry mapEntry) { diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java index 2621c9b493..245e93c404 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupManager.java @@ -27,7 +27,6 @@ import bisq.network.p2p.services.peergroup.exchange.PeerExchangeStrategy; import bisq.network.p2p.services.peergroup.keepalive.KeepAliveService; import bisq.network.p2p.services.peergroup.network_load.NetworkLoadExchangeService; -import bisq.persistence.PersistenceService; import dev.failsafe.Failsafe; import dev.failsafe.RetryPolicy; import lombok.Getter; @@ -114,6 +113,7 @@ public static Config from(PeerGroupService.Config peerGroupConfig, } } + @Getter private final Node node; private final BanList banList; private final Config config; @@ -131,19 +131,19 @@ public static Config from(PeerGroupService.Config peerGroupConfig, private final RetryPolicy retryPolicy; - public PeerGroupManager(PersistenceService persistenceService, - Node node, + public PeerGroupManager(Node node, + PeerGroupService peerGroupService, BanList banList, - Config config, - Set
seedNodeAddresses) { + Config config) { this.node = node; this.banList = banList; this.config = config; - peerGroupService = new PeerGroupService(persistenceService, node, config.peerGroupConfig, seedNodeAddresses, banList); + this.peerGroupService = peerGroupService; PeerExchangeStrategy peerExchangeStrategy = new PeerExchangeStrategy(peerGroupService, + node, config.getPeerExchangeConfig()); peerExchangeService = new PeerExchangeService(node, peerExchangeStrategy); - keepAliveService = new KeepAliveService(node, peerGroupService, config.getKeepAliveServiceConfig()); + keepAliveService = new KeepAliveService(node, config.getKeepAliveServiceConfig()); networkLoadExchangeService = new NetworkLoadExchangeService(node, peerGroupService); retryPolicy = RetryPolicy.builder() @@ -246,7 +246,7 @@ private void runBlockingTasks() { private void closeBanned() { log.debug("Node {} called closeBanned", node); - peerGroupService.getAllConnections() + node.getAllActiveConnections() .filter(Connection::isRunning) .filter(connection -> banList.isBanned(connection.getPeerAddress())) .peek(connection -> log.info("Close connection to banned node. connection={} ", connection.getPeerAddress())) @@ -258,10 +258,10 @@ private void closeBanned() { */ private void maybeCloseDuplicateConnections() { log.debug("Node {} called maybeCloseDuplicateConnections", node); - Set
outboundAddresses = peerGroupService.getOutboundConnections() + Set
outboundAddresses = node.getActiveOutboundConnections() .map(Connection::getPeerAddress) .collect(Collectors.toSet()); - peerGroupService.getInboundConnections() + node.getActiveInboundConnections() .filter(this::mayDisconnect) .filter(inbound -> outboundAddresses.contains(inbound.getPeerAddress())) .peek(inbound -> log.info("{} -> {}: Send CloseConnectionMessage as we have an " + @@ -270,11 +270,10 @@ private void maybeCloseDuplicateConnections() { .forEach(inbound -> node.closeConnectionGracefully(inbound, CloseReason.DUPLICATE_CONNECTION)); } - private void maybeCloseConnectionsToSeeds() { log.debug("Node {} called maybeCloseConnectionsToSeeds", node); Comparator comparator = peerGroupService.getConnectionAgeComparator().reversed(); // reversed as we use skip - peerGroupService.getAllConnections() + node.getAllActiveConnections() .filter(this::mayDisconnect) .filter(peerGroupService::isSeed) .sorted(comparator) @@ -287,7 +286,7 @@ private void maybeCloseConnectionsToSeeds() { private void maybeCloseAgedConnections() { log.debug("Node {} called maybeCloseAgedConnections", node); - peerGroupService.getAllConnections() + node.getAllActiveConnections() .filter(this::mayDisconnect) .filter(connection -> connection.getConnectionMetrics().getAge() > config.getMaxAge()) .peek(connection -> log.info("{} -> {}: Send CloseConnectionMessage as the connection age " + @@ -300,7 +299,7 @@ private void maybeCloseAgedConnections() { private void maybeCloseExceedingInboundConnections() { log.debug("Node {} called maybeCloseExceedingInboundConnections", node); Comparator comparator = peerGroupService.getConnectionAgeComparator().reversed(); - peerGroupService.getInboundConnections() + node.getActiveInboundConnections() .filter(this::mayDisconnect) .sorted(comparator) .skip(peerGroupService.getMaxInboundConnections()) @@ -313,7 +312,7 @@ private void maybeCloseExceedingInboundConnections() { private void maybeCloseExceedingConnections() { log.debug("Node {} called maybeCloseExceedingConnections", node); Comparator comparator = peerGroupService.getConnectionAgeComparator().reversed(); - peerGroupService.getAllConnections() + node.getAllActiveConnections() .filter(this::mayDisconnect) .sorted(comparator) .skip(peerGroupService.getMaxNumConnectedPeers()) @@ -329,7 +328,7 @@ private void maybeCreateConnections() { // We want to have at least 40% of our minNumConnectedPeers as outbound connections if (getMissingOutboundConnections() <= 0) { // We have enough outbound connections, lets check if we have sufficient connections in total - if (peerGroupService.getNumConnections() >= minNumConnectedPeers) { + if (node.getNumConnections() >= minNumConnectedPeers) { log.debug("Node {} has sufficient connections", node); CompletableFuture.completedFuture(null); return; @@ -396,7 +395,7 @@ private boolean notBootstrapping(Connection connection) { } private int getMissingOutboundConnections() { - return peerGroupService.getMinOutboundConnections() - (int) peerGroupService.getOutboundConnections().count(); + return peerGroupService.getMinOutboundConnections() - (int) node.getActiveOutboundConnections().count(); } } \ No newline at end of file diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupService.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupService.java index 63990e49c1..afad2e41d9 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupService.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/PeerGroupService.java @@ -20,10 +20,9 @@ import bisq.common.util.MathUtils; import bisq.network.NetworkService; import bisq.network.common.Address; +import bisq.network.common.TransportType; import bisq.network.p2p.node.Connection; -import bisq.network.p2p.node.InboundConnection; import bisq.network.p2p.node.Node; -import bisq.network.p2p.node.OutboundConnection; import bisq.persistence.Persistence; import bisq.persistence.PersistenceClient; import bisq.persistence.PersistenceService; @@ -72,7 +71,6 @@ public static Config from(com.typesafe.config.Config typesafeConfig) { private final Persistence persistence; @Getter private final PeerGroupStore persistableStore = new PeerGroupStore(); - private final Node node; private final Config config; @Getter private final Set
seedNodeAddresses; @@ -81,18 +79,17 @@ public static Config from(com.typesafe.config.Config typesafeConfig) { private final Set reportedPeers = new CopyOnWriteArraySet<>(); public PeerGroupService(PersistenceService persistenceService, - Node node, + TransportType transportType, Config config, Set
seedNodeAddresses, BanList banList) { - this.node = node; this.config = config; this.seedNodeAddresses = seedNodeAddresses; this.banList = banList; persistence = persistenceService.getOrCreatePersistence(this, NetworkService.NETWORK_DB_PATH, - node.getTransportType().name().toLowerCase() + persistableStore.getClass().getSimpleName(), + transportType.name().toLowerCase() + persistableStore.getClass().getSimpleName(), persistableStore); } @@ -105,13 +102,15 @@ public Set getPersistedPeers() { } public void addPersistedPeers(Set peers) { - getPersistedPeers().addAll(peers); - persist(); + if (getPersistedPeers().addAll(peers)) { + persist(); + } } - public void removePersistedPeers(Collection candidates) { - getPersistedPeers().removeAll(candidates); - persist(); + public void removePersistedPeers(Collection peers) { + if (getPersistedPeers().removeAll(peers)) { + persist(); + } } @@ -132,22 +131,6 @@ public void removeReportedPeers(Collection peers) { // Connections /////////////////////////////////////////////////////////////////////////////////////////////////// - public Stream getOutboundConnections() { - return node.getOutboundConnectionsByAddress().values().stream().filter(Connection::isRunning); - } - - public Stream getInboundConnections() { - return node.getInboundConnectionsByAddress().values().stream().filter(Connection::isRunning); - } - - public Stream getAllConnections() { - return node.getAllConnections().filter(Connection::isRunning); - } - - public int getNumConnections() { - return (int) getAllConnections().count(); - } - public boolean isSeed(Connection connection) { return seedNodeAddresses.stream().anyMatch(seedAddress -> seedAddress.equals(connection.getPeerAddress())); } @@ -169,8 +152,8 @@ public Comparator getConnectionAgeComparator() { // Peers /////////////////////////////////////////////////////////////////////////////////////////////////// - public Stream getAllConnectedPeers() { - return getAllConnections().map(connection -> + public Stream getAllConnectedPeers(Node node) { + return node.getAllActiveConnections().map(connection -> new Peer(connection.getPeersCapability(), connection.getPeersNetworkLoadService().getCurrentNetworkLoad(), connection.isOutboundConnection())); @@ -184,10 +167,6 @@ public boolean isNotBanned(Address address) { return banList.isNotBanned(address); } - public boolean notMyself(Peer peer) { - return notMyself(peer.getAddress()); - } - public int getMinNumReportedPeers() { return config.getMinNumReportedPeers(); } @@ -217,14 +196,6 @@ public void removeSeedNodeAddress(Address seedNodeAddress) { this.seedNodeAddresses.remove(seedNodeAddress); } - public Stream
getAllConnectedPeerAddresses() { - return getAllConnectedPeers().map(Peer::getAddress); - } - - public boolean notMyself(Address address) { - return node.findMyAddress().stream().noneMatch(myAddress -> myAddress.equals(address)); - } - public boolean isSeed(Address address) { return seedNodeAddresses.stream().anyMatch(seedAddress -> seedAddress.equals(address)); } diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java index d77381bda8..4a5a503d34 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/exchange/PeerExchangeStrategy.java @@ -18,6 +18,7 @@ package bisq.network.p2p.services.peergroup.exchange; import bisq.network.common.Address; +import bisq.network.p2p.node.Node; import bisq.network.p2p.services.peergroup.Peer; import bisq.network.p2p.services.peergroup.PeerGroupService; import lombok.Getter; @@ -61,11 +62,13 @@ public static Config from(com.typesafe.config.Config typesafeConfig) { } private final PeerGroupService peerGroupService; + private final Node node; private final Config config; private final Set
usedAddresses = new CopyOnWriteArraySet<>(); - public PeerExchangeStrategy(PeerGroupService peerGroupService, Config config) { + public PeerExchangeStrategy(PeerGroupService peerGroupService, Node node, Config config) { this.peerGroupService = peerGroupService; + this.node = node; this.config = config; } @@ -107,7 +110,7 @@ List
getAddressesForFurtherPeerExchange() { boolean shouldRedoInitialPeerExchange(int numSuccess, int numRequests) { int numFailed = numRequests - numSuccess; return numFailed > numRequests / 2 || - peerGroupService.getAllConnectedPeers().count() < peerGroupService.getTargetNumConnectedPeers() || + peerGroupService.getAllConnectedPeers(node).count() < peerGroupService.getTargetNumConnectedPeers() || peerGroupService.getReportedPeers().size() < peerGroupService.getMinNumReportedPeers(); } @@ -123,7 +126,7 @@ private int getPeerExchangeLimit() { int minNumConnectedPeers = peerGroupService.getMinNumConnectedPeers(); // default 8 // We want at least 25% of minNumConnectedPeers int minValue = minNumConnectedPeers / 4; - int missing = Math.max(0, peerGroupService.getTargetNumConnectedPeers() - peerGroupService.getNumConnections()); + int missing = Math.max(0, peerGroupService.getTargetNumConnectedPeers() - node.getNumConnections()); int limit = Math.max(minValue, missing); // In case we have enough connections but do not have received at least 25% of our @@ -150,7 +153,7 @@ private List
getPriorityListForFurtherPeerExchange() { private List
getSeedAddresses() { return getShuffled(peerGroupService.getSeedNodeAddresses()).stream() - .filter(peerGroupService::notMyself) + .filter(node::notMyself) .filter(peerGroupService::isNotBanned) .limit(config.getNumSeedNodesAtBoostrap()) .collect(Collectors.toList()); @@ -229,7 +232,7 @@ private boolean notASeed(Peer peer) { private boolean isValidNonSeedPeer(Address address) { return notASeed(address) && peerGroupService.isNotBanned(address) && - peerGroupService.notMyself(address); + node.notMyself(address); } private boolean isValidNonSeedPeer(Peer peer) { @@ -253,7 +256,7 @@ private boolean notSameAddress(Address address, Peer peer) { } private Stream getAllConnectedPeers() { - return peerGroupService.getAllConnectedPeers() + return peerGroupService.getAllConnectedPeers(node) .filter(this::isValidNonSeedPeer) .sorted(Comparator.comparing(Peer::getDate).reversed()); } diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/keepalive/KeepAliveService.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/keepalive/KeepAliveService.java index a94baad095..1329ec7edf 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/keepalive/KeepAliveService.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/keepalive/KeepAliveService.java @@ -24,7 +24,6 @@ import bisq.network.p2p.node.CloseReason; import bisq.network.p2p.node.Connection; import bisq.network.p2p.node.Node; -import bisq.network.p2p.services.peergroup.PeerGroupService; import lombok.Getter; import lombok.ToString; import lombok.extern.slf4j.Slf4j; @@ -60,14 +59,12 @@ public static Config from(com.typesafe.config.Config typesafeConfig) { } private final Node node; - private final PeerGroupService peerGroupService; private final Config config; private final Map requestHandlerMap = new ConcurrentHashMap<>(); private Optional scheduler = Optional.empty(); - public KeepAliveService(Node node, PeerGroupService peerGroupService, Config config) { + public KeepAliveService(Node node, Config config) { this.node = node; - this.peerGroupService = peerGroupService; this.config = config; this.node.addListener(this); } @@ -124,7 +121,7 @@ public void onDisconnect(Connection connection, CloseReason closeReason) { } private void sendPingIfRequired() { - peerGroupService.getAllConnections() + node.getAllActiveConnections() .filter(this::isRequired) .forEach(this::sendPing); } diff --git a/network/network/src/main/java/bisq/network/p2p/services/peergroup/network_load/NetworkLoadExchangeService.java b/network/network/src/main/java/bisq/network/p2p/services/peergroup/network_load/NetworkLoadExchangeService.java index 53dc5b2d8c..ef29a60454 100644 --- a/network/network/src/main/java/bisq/network/p2p/services/peergroup/network_load/NetworkLoadExchangeService.java +++ b/network/network/src/main/java/bisq/network/p2p/services/peergroup/network_load/NetworkLoadExchangeService.java @@ -67,9 +67,8 @@ public void shutdown() { requestHandlerMap.clear(); } - private void requestFromAll() { - peerGroupService.getAllConnections() + node.getAllActiveConnections() .filter(this::needsUpdate) .forEach(this::request); } From f811f287bcfe011f186e9d1218eabaacefbf3772 Mon Sep 17 00:00:00 2001 From: djing chan Date: Tue, 19 Dec 2023 14:50:32 +0700 Subject: [PATCH 2/2] Improve logs and comments --- .../bisq/network/p2p/node/Connection.java | 13 +++--- .../main/java/bisq/network/p2p/node/Node.java | 41 ++++++++++--------- 2 files changed, 30 insertions(+), 24 deletions(-) diff --git a/network/network/src/main/java/bisq/network/p2p/node/Connection.java b/network/network/src/main/java/bisq/network/p2p/node/Connection.java index a7d713b0df..4d7c283a39 100644 --- a/network/network/src/main/java/bisq/network/p2p/node/Connection.java +++ b/network/network/src/main/java/bisq/network/p2p/node/Connection.java @@ -126,9 +126,11 @@ protected Connection(Socket socket, } catch (Exception exception) { //todo StreamCorruptedException from i2p at shutdown. prob it send some text data at shut down if (isInputStreamActive()) { - log.debug("Call shutdown from startListen read handler {} due exception={}", this, exception.toString()); + log.debug("Exception at input handler on {}", this, exception); close(CloseReason.EXCEPTION.exception(exception)); - // EOFException expected if connection got closed + + // EOFException expected if connection got closed (Socket closed message) + // TODO maybe also filter SocketException if (!(exception instanceof EOFException)) { errorHandler.accept(this, exception); } @@ -191,6 +193,8 @@ Connection send(EnvelopePayloadMessage envelopePayloadMessage, AuthorizationToke } catch (Throwable throwable) { if (!isStopped) { throw throwable; + } else { + log.warn("Send message failed at stopped connection", throwable); } } } @@ -207,11 +211,10 @@ Connection send(EnvelopePayloadMessage envelopePayloadMessage, AuthorizationToke return this; } catch (IOException exception) { if (!isStopped) { - log.error("Call shutdown from send {} due exception={}", this, exception.toString()); + log.error("Send message failed. {}", this, exception); close(CloseReason.EXCEPTION.exception(exception)); } - // We wrap any exception (also expected EOFException in case of connection close), to inform the caller - // that the "send proto" intent failed. + // We wrap any exception (also expected EOFException in case of connection close), to leave handling of the exception to the caller. throw new ConnectionException(exception); } } diff --git a/network/network/src/main/java/bisq/network/p2p/node/Node.java b/network/network/src/main/java/bisq/network/p2p/node/Node.java index a477cb2ea7..5cd4bf91f4 100644 --- a/network/network/src/main/java/bisq/network/p2p/node/Node.java +++ b/network/network/src/main/java/bisq/network/p2p/node/Node.java @@ -46,10 +46,7 @@ import java.io.EOFException; import java.io.IOException; -import java.net.Socket; -import java.net.SocketException; -import java.net.SocketTimeoutException; -import java.net.UnknownHostException; +import java.net.*; import java.time.Duration; import java.util.ArrayList; import java.util.Map; @@ -311,6 +308,7 @@ public Connection send(EnvelopePayloadMessage envelopePayloadMessage, Address ad public Connection send(EnvelopePayloadMessage envelopePayloadMessage, Connection connection) { if (connection.isStopped()) { + log.debug("Send message failed as connection is already stopped {}", this); throw new ConnectionClosedException(connection); } try { @@ -322,6 +320,7 @@ public Connection send(EnvelopePayloadMessage envelopePayloadMessage, Connection } catch (Throwable throwable) { if (connection.isRunning()) { handleException(connection, throwable); + log.debug("Send message failed on {}", this, throwable); closeConnection(connection, CloseReason.EXCEPTION.exception(throwable)); } throw new ConnectionClosedException(connection); @@ -349,15 +348,17 @@ public Connection getConnection(Address address) { /////////////////////////////////////////////////////////////////////////////////////////////////// private Connection createOutboundConnection(Address address) { - return myCapability.map(capability -> createOutboundConnection(address, capability)).orElseGet(() -> { - int port = networkId.getAddressByTransportTypeMap().get(transportType).getPort(); - log.warn("We create an outbound connection but we have not initialized our server. " + - "We create a server on port {} now but clients better control node " + - "life cycle themselves.", port); - initialize(); - checkArgument(myCapability.isPresent(), "myCapability must be present after initializeServer got called"); - return createOutboundConnection(address, myCapability.get()); - }); + log.debug("Create outbound connection to {}", address); + return myCapability.map(capability -> createOutboundConnection(address, capability)) + .orElseGet(() -> { + int port = networkId.getAddressByTransportTypeMap().get(transportType).getPort(); + log.warn("We create an outbound connection but we have not initialized our server. " + + "We create a server on port {} now but clients better control node " + + "life cycle themselves.", port); + initialize(); + checkArgument(myCapability.isPresent(), "myCapability must be present after initializeServer got called"); + return createOutboundConnection(address, myCapability.get()); + }); } private Connection createOutboundConnection(Address address, Capability myCapability) { @@ -640,17 +641,19 @@ private void handleException(Throwable exception) { if (isShutdown()) { return; } + String msg = "Exception: "; if (exception instanceof EOFException) { - log.debug(exception.toString(), exception); + log.info(msg, exception); + } else if (exception instanceof ConnectException) { + log.debug(msg, exception); } else if (exception instanceof SocketException) { - log.debug(exception.toString(), exception); + log.debug(msg, exception); } else if (exception instanceof UnknownHostException) { - log.warn("UnknownHostException. Might happen if we try to connect to wrong network type"); - log.warn(exception.toString(), exception); + log.warn("UnknownHostException. Might happen if we try to connect to wrong network type.", exception); } else if (exception instanceof SocketTimeoutException) { - log.warn(exception.toString(), exception); + log.info(msg, exception); } else { - log.error(exception.toString(), exception); + log.error(msg, exception); } }