Skip to content

Commit

Permalink
Create PeerGroupService at startup as it is used for persistence and …
Browse files Browse the repository at this point in the history
…the framework requires that persistence service classes are available when the persistence framework read in all data.

As we do not want to pass the default node which is created later via a setter to PeerGroupService, it got refactored so that clients use node instead. This also reduced the usage of PeerGroupService where not needed. The peer group management could need more refactoring as it's a bit messy as it is now.
  • Loading branch information
djing-chan committed Dec 19, 2023
1 parent d0ac717 commit 189e94a
Show file tree
Hide file tree
Showing 14 changed files with 100 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public void onActivate() {
networkService.getServiceNodesByTransport().findServiceNode(type).ifPresent(serviceNode -> {
serviceNode.getPeerGroupManager().ifPresent(peerGroupManager -> {
PeerGroupService peerGroupService = peerGroupManager.getPeerGroupService();
Node node = peerGroupManager.getNode();
String numTargetConnections = String.valueOf(peerGroupService.getTargetNumConnectedPeers());
switch (type) {
case CLEAR:
Expand All @@ -87,16 +88,16 @@ public void onMessage(EnvelopePayloadMessage envelopePayloadMessage, Connection

@Override
public void onConnection(Connection connection) {
onNumConnectionsChanged(type, peerGroupService);
onNumConnectionsChanged(type, node);
}

@Override
public void onDisconnect(Connection connection, CloseReason closeReason) {
onNumConnectionsChanged(type, peerGroupService);
onNumConnectionsChanged(type, node);
}
});

onNumConnectionsChanged(type, peerGroupService);
onNumConnectionsChanged(type, node);
});
})
);
Expand All @@ -112,9 +113,9 @@ private void onNavigateToNetworkInfo() {
onNavigationTargetSelectedHandler.accept(NavigationTarget.NETWORK_INFO);
}

private void onNumConnectionsChanged(TransportType transportType, PeerGroupService peerGroupService) {
private void onNumConnectionsChanged(TransportType transportType, Node node) {
UIThread.run(() -> {
String value = String.valueOf(peerGroupService.getNumConnections());
String value = String.valueOf(node.getNumConnections());
switch (transportType) {
case CLEAR:
model.getClearNetNumConnections().set(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;

@Slf4j
public abstract class BaseNodesByIdTest extends BaseNetworkTest {
Expand All @@ -44,7 +45,7 @@ void test_messageRoundTrip(Node.Config nodeConfig) throws InterruptedException {
BanList banList = new BanList();
TransportService transportService = TransportService.create(nodeConfig.getTransportType(), nodeConfig.getTransportConfig());
PersistenceService persistenceService = new PersistenceService("");
KeyBundleService keyBundleService = new KeyBundleService(persistenceService);
KeyBundleService keyBundleService = new KeyBundleService(persistenceService, mock(KeyBundleService.Config.class));

NodesById nodesById = new NodesById(banList, nodeConfig, keyBundleService, transportService, new NetworkLoadService(), new AuthorizationService(new HashCashService()));
long ts = System.currentTimeMillis();
Expand Down Expand Up @@ -135,7 +136,7 @@ void test_initializeServer(Node.Config nodeConfig) {
BanList banList = new BanList();
TransportService transportService = TransportService.create(nodeConfig.getTransportType(), nodeConfig.getTransportConfig());
PersistenceService persistenceService = new PersistenceService("");
KeyBundleService keyBundleService = new KeyBundleService(persistenceService);
KeyBundleService keyBundleService = new KeyBundleService(persistenceService, mock(KeyBundleService.Config.class));
NodesById nodesById = new NodesById(banList, nodeConfig, keyBundleService, transportService, new NetworkLoadService(), new AuthorizationService(new HashCashService()));
initializeServers(2, nodesById);
nodesById.shutdown().join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package bisq.network.p2p.services.peergroup.exchange;

import bisq.network.common.Address;
import bisq.network.common.TransportType;
import bisq.network.p2p.BaseNetworkTest;
import bisq.network.p2p.node.Node;
import bisq.network.p2p.node.transport.TransportService;
Expand All @@ -43,7 +44,8 @@ void test_peerExchange(Node.Config nodeConfig) throws InterruptedException, Exec
int numSeeds = 2;
int numNodes = 2;
BanList banList = new BanList();
TransportService transportService = TransportService.create(nodeConfig.getTransportType(), nodeConfig.getTransportConfig());
TransportType transportType = nodeConfig.getTransportType();
TransportService transportService = TransportService.create(transportType, nodeConfig.getTransportConfig());
PersistenceService persistenceService = new PersistenceService(getBaseDir().toAbsolutePath().toString());
PeerGroupManager.Config peerGroupServiceConfig = new PeerGroupManager.Config(
null, null, null,
Expand Down Expand Up @@ -99,11 +101,11 @@ void test_peerExchange(Node.Config nodeConfig) throws InterruptedException, Exec
for (int i = 0; i < numNodes; i++) {
Node node = nodes.get(i);
PeerGroupService peerGroupService = new PeerGroupService(persistenceService,
node,
transportType,
new PeerGroupService.Config(),
new HashSet<>(seedNodeAddresses),
banList);
PeerExchangeStrategy peerExchangeStrategy = new PeerExchangeStrategy(peerGroupService, new PeerExchangeStrategy.Config());
PeerExchangeStrategy peerExchangeStrategy = new PeerExchangeStrategy(peerGroupService, node, new PeerExchangeStrategy.Config());
PeerExchangeService peerExchangeService = new PeerExchangeService(node, peerExchangeStrategy);
peerExchangeService.startInitialPeerExchange().whenComplete((result, throwable) -> {
assertNull(throwable);
Expand All @@ -120,11 +122,11 @@ void test_peerExchange(Node.Config nodeConfig) throws InterruptedException, Exec
for (int i = 0; i < numNodes; i++) {
Node node = nodes.get(i);
PeerGroupService peerGroupService = new PeerGroupService(persistenceService,
node,
transportType,
new PeerGroupService.Config(),
new HashSet<>(seedNodeAddresses),
banList);
PeerExchangeStrategy peerExchangeStrategy = new PeerExchangeStrategy(peerGroupService, new PeerExchangeStrategy.Config());
PeerExchangeStrategy peerExchangeStrategy = new PeerExchangeStrategy(peerGroupService, node, new PeerExchangeStrategy.Config());
PeerExchangeService peerExchangeService = new PeerExchangeService(node, peerExchangeStrategy);
peerExchangeService.startInitialPeerExchange().whenComplete((result, throwable) -> {
assertNull(throwable);
Expand Down
25 changes: 13 additions & 12 deletions network/network/src/main/java/bisq/network/p2p/ServiceNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import bisq.network.p2p.services.data.inventory.InventoryService;
import bisq.network.p2p.services.peergroup.BanList;
import bisq.network.p2p.services.peergroup.PeerGroupManager;
import bisq.network.p2p.services.peergroup.PeerGroupService;
import bisq.persistence.PersistenceService;
import bisq.security.keys.KeyBundleService;
import bisq.security.keys.PubKey;
Expand Down Expand Up @@ -103,10 +104,10 @@ public enum SupportedService {
private final Config config;
private final PeerGroupManager.Config peerGroupServiceConfig;
private final Optional<DataService> dataService;
private final PeerGroupService peerGroupService;
private final InventoryService.Config inventoryServiceConfig;
private final Optional<MessageDeliveryStatusService> messageDeliveryStatusService;
private final KeyBundleService keyBundleService;
private final PersistenceService persistenceService;
private final Set<Address> seedNodeAddresses;

@Getter
Expand Down Expand Up @@ -147,11 +148,12 @@ public enum SupportedService {
this.messageDeliveryStatusService = messageDeliveryStatusService;
this.dataService = dataService;
this.keyBundleService = keyBundleService;
this.persistenceService = persistenceService;
this.seedNodeAddresses = seedNodeAddresses;

transportService = TransportService.create(transportType, nodeConfig.getTransportConfig());
nodesById = new NodesById(banList, nodeConfig, keyBundleService, transportService, networkLoadService, authorizationService);

peerGroupService = new PeerGroupService(persistenceService, transportType, peerGroupServiceConfig.getPeerGroupConfig(), seedNodeAddresses, banList);
}


Expand All @@ -164,18 +166,17 @@ Node getInitializedDefaultNode(NetworkId defaultNetworkId) {

Set<SupportedService> supportedServices = config.getSupportedServices();
peerGroupManager = supportedServices.contains(SupportedService.PEER_GROUP) ?
Optional.of(new PeerGroupManager(persistenceService,
defaultNode,
Optional.of(new PeerGroupManager(defaultNode,
peerGroupService,
banList,
peerGroupServiceConfig,
seedNodeAddresses)) :
peerGroupServiceConfig)) :
Optional.empty();

boolean dataServiceEnabled = supportedServices.contains(SupportedService.PEER_GROUP) &&
supportedServices.contains(SupportedService.DATA);

dataNetworkService = dataServiceEnabled ?
Optional.of(new DataNetworkService(defaultNode, peerGroupManager.orElseThrow(), dataService.orElseThrow())) :
Optional.of(new DataNetworkService(defaultNode, dataService.orElseThrow())) :
Optional.empty();

inventoryService = dataServiceEnabled ?
Expand All @@ -192,8 +193,8 @@ Node getInitializedDefaultNode(NetworkId defaultNetworkId) {
setState(State.INITIALIZING);
transportService.initialize();// blocking
defaultNode.initialize();// blocking
peerGroupManager.ifPresentOrElse(peerGroupService -> {
peerGroupService.initialize();// blocking
peerGroupManager.ifPresentOrElse(peerGroupManager -> {
peerGroupManager.initialize();// blocking
setState(State.INITIALIZED);
},
() -> setState(State.INITIALIZED));
Expand Down Expand Up @@ -225,19 +226,19 @@ boolean isNodeInitialized(NetworkId networkId) {

void addSeedNodeAddresses(Set<Address> seedNodeAddresses) {
this.seedNodeAddresses.addAll(seedNodeAddresses);
peerGroupManager.ifPresent(peerGroupService -> peerGroupService.addSeedNodeAddresses(seedNodeAddresses));
peerGroupManager.ifPresent(peerGroupManager -> peerGroupManager.addSeedNodeAddresses(seedNodeAddresses));
}

void addSeedNodeAddress(Address seedNodeAddress) {
// In case we would get called before peerGroupManager is created we add the seedNodeAddress to the
// seedNodeAddresses field
seedNodeAddresses.add(seedNodeAddress);
peerGroupManager.ifPresent(peerGroupService -> peerGroupService.addSeedNodeAddress(seedNodeAddress));
peerGroupManager.ifPresent(peerGroupManager -> peerGroupManager.addSeedNodeAddress(seedNodeAddress));
}

void removeSeedNodeAddress(Address seedNodeAddress) {
seedNodeAddresses.remove(seedNodeAddress);
peerGroupManager.ifPresent(peerGroupService -> peerGroupService.removeSeedNodeAddress(seedNodeAddress));
peerGroupManager.ifPresent(peerGroupManager -> peerGroupManager.removeSeedNodeAddress(seedNodeAddress));
}

SendConfidentialMessageResult confidentialSend(EnvelopePayloadMessage envelopePayloadMessage,
Expand Down
19 changes: 17 additions & 2 deletions network/network/src/main/java/bisq/network/p2p/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,21 @@ public Stream<Connection> getAllConnections() {
return Stream.concat(inboundConnectionsByAddress.values().stream(), outboundConnectionsByAddress.values().stream());
}

public Stream<Connection> getAllActiveConnections() {
return getAllConnections().filter(Connection::isRunning);
}

public Stream<OutboundConnection> getActiveOutboundConnections() {
return getOutboundConnectionsByAddress().values().stream().filter(Connection::isRunning);
}

public Stream<InboundConnection> getActiveInboundConnections() {
return getInboundConnectionsByAddress().values().stream().filter(Connection::isRunning);
}

public int getNumConnections() {
return (int) getAllActiveConnections().count();
}

///////////////////////////////////////////////////////////////////////////////////////////////////
// Connection.Handler
Expand Down Expand Up @@ -587,8 +602,8 @@ public Optional<Address> findMyAddress() {
return server.map(Server::getAddress);
}

public int getNumConnections() {
return inboundConnectionsByAddress.size() + outboundConnectionsByAddress.size();
public boolean notMyself(Address address) {
return findMyAddress().stream().noneMatch(myAddress -> myAddress.equals(address));
}

public boolean isInitialized() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,11 @@ private void processAckMessage(AckMessage ackMessage) {
return;
}

if (observableStatus.get() == MessageDeliveryStatus.ADDED_TO_MAILBOX) {
observableStatus.set(MessageDeliveryStatus.MAILBOX_MSG_RECEIVED);
} else {
if (observableStatus.get() == MessageDeliveryStatus.START_SENDING) {
observableStatus.set(MessageDeliveryStatus.ARRIVED);
} else {
// Covers ADDED_TO_MAILBOX, TRY_ADD_TO_MAILBOX and FAILED
observableStatus.set(MessageDeliveryStatus.MAILBOX_MSG_RECEIVED);
}
} else {
messageDeliveryStatusByMessageId.put(messageId, new Observable<>(MessageDeliveryStatus.ARRIVED));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import bisq.network.p2p.node.Connection;
import bisq.network.p2p.node.Node;
import bisq.network.p2p.services.data.broadcast.Broadcaster;
import bisq.network.p2p.services.peergroup.PeerGroupManager;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -36,10 +35,10 @@ public class DataNetworkService implements Node.Listener {
private final DataService dataService;
private final Broadcaster broadcaster;

public DataNetworkService(Node node, PeerGroupManager peerGroupManager, DataService dataService) {
public DataNetworkService(Node node, DataService dataService) {
this.node = node;
this.dataService = dataService;
broadcaster = new Broadcaster(node, peerGroupManager.getPeerGroupService());
broadcaster = new Broadcaster(node);
node.addListener(this);
dataService.addBroadcaster(broadcaster);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import bisq.network.NetworkService;
import bisq.network.p2p.node.Connection;
import bisq.network.p2p.node.Node;
import bisq.network.p2p.services.peergroup.PeerGroupService;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -39,12 +38,10 @@ public class Broadcaster {
private static final long RE_BROADCAST_DELAY_MS = 100;

private final Node node;
private final PeerGroupService peerGroupService;
private final RetryPolicy<BroadcastResult> retryPolicy;

public Broadcaster(Node node, PeerGroupService peerGroupService) {
public Broadcaster(Node node) {
this.node = node;
this.peerGroupService = peerGroupService;

retryPolicy = RetryPolicy.<BroadcastResult>builder()
.handle(IllegalStateException.class)
Expand Down Expand Up @@ -81,11 +78,11 @@ public CompletableFuture<BroadcastResult> doBroadcast(BroadcastMessage broadcast
.orTimeout(BROADCAST_TIMEOUT, TimeUnit.SECONDS);
AtomicInteger numSuccess = new AtomicInteger(0);
AtomicInteger numFaults = new AtomicInteger(0);
long numConnections = peerGroupService.getAllConnections().count();
long numConnections = node.getAllActiveConnections().count();
long numBroadcasts = Math.min(numConnections, Math.round(numConnections * distributionFactor));
log.debug("Broadcast {} to {} out of {} peers. distributionFactor={}",
broadcastMessage.getClass().getSimpleName(), numBroadcasts, numConnections, distributionFactor);
List<Connection> allConnections = peerGroupService.getAllConnections().collect(Collectors.toList());
List<Connection> allConnections = node.getAllActiveConnections().collect(Collectors.toList());
Collections.shuffle(allConnections);
NetworkService.NETWORK_IO_POOL.submit(() -> {
allConnections.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void onMessage(EnvelopePayloadMessage envelopePayloadMessage, Connection
public void onConnection(Connection connection) {
if (sufficientConnections()) {
log.info("We are sufficiently connected to start the inventory request. numConnections={}",
peerGroupService.getNumConnections());
node.getNumConnections());
doRequest();
}
}
Expand Down Expand Up @@ -187,16 +187,16 @@ private void doRequest() {
private List<CompletableFuture<Inventory>> request(DataFilter dataFilter) {
int maxSeeds = 2;
int maxCandidates = 4;
List<Address> candidates = peerGroupService.getAllConnectedPeers()
List<Address> candidates = peerGroupService.getAllConnectedPeers(node)
.filter(peerGroupService::isSeed)
.limit(maxSeeds)
.map(Peer::getAddress)
.collect(Collectors.toList());
candidates.addAll(peerGroupService.getAllConnectedPeers()
candidates.addAll(peerGroupService.getAllConnectedPeers(node)
.filter(peerGroupService::notASeed)
.map(Peer::getAddress)
.collect(Collectors.toList()));
return peerGroupService.getAllConnections()
return node.getAllActiveConnections()
.filter(connection -> !requestHandlerMap.containsKey(connection.getId()))
.filter(connection -> candidates.contains(connection.getPeerAddress()))
.limit(maxCandidates)
Expand Down Expand Up @@ -224,7 +224,7 @@ private List<CompletableFuture<Inventory>> request(DataFilter dataFilter) {
}

private boolean sufficientConnections() {
return peerGroupService.getNumConnections() > peerGroupService.getTargetNumConnectedPeers() / 2;
return node.getNumConnections() > peerGroupService.getTargetNumConnectedPeers() / 2;
}

private FilterEntry toFilterEntry(Map.Entry<ByteArray, ? extends DataRequest> mapEntry) {
Expand Down
Loading

0 comments on commit 189e94a

Please sign in to comment.