Skip to content

Commit

Permalink
Merge pull request bisq-network#1536 from djing-chan/fix-network-issues
Browse files Browse the repository at this point in the history
Fix network issues [3]
  • Loading branch information
alvasw authored Dec 19, 2023
2 parents 0a8167b + f811f28 commit 0bdbb77
Show file tree
Hide file tree
Showing 15 changed files with 130 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public void onActivate() {
networkService.getServiceNodesByTransport().findServiceNode(type).ifPresent(serviceNode -> {
serviceNode.getPeerGroupManager().ifPresent(peerGroupManager -> {
PeerGroupService peerGroupService = peerGroupManager.getPeerGroupService();
Node node = peerGroupManager.getNode();
String numTargetConnections = String.valueOf(peerGroupService.getTargetNumConnectedPeers());
switch (type) {
case CLEAR:
Expand All @@ -87,16 +88,16 @@ public void onMessage(EnvelopePayloadMessage envelopePayloadMessage, Connection

@Override
public void onConnection(Connection connection) {
onNumConnectionsChanged(type, peerGroupService);
onNumConnectionsChanged(type, node);
}

@Override
public void onDisconnect(Connection connection, CloseReason closeReason) {
onNumConnectionsChanged(type, peerGroupService);
onNumConnectionsChanged(type, node);
}
});

onNumConnectionsChanged(type, peerGroupService);
onNumConnectionsChanged(type, node);
});
})
);
Expand All @@ -112,9 +113,9 @@ private void onNavigateToNetworkInfo() {
onNavigationTargetSelectedHandler.accept(NavigationTarget.NETWORK_INFO);
}

private void onNumConnectionsChanged(TransportType transportType, PeerGroupService peerGroupService) {
private void onNumConnectionsChanged(TransportType transportType, Node node) {
UIThread.run(() -> {
String value = String.valueOf(peerGroupService.getNumConnections());
String value = String.valueOf(node.getNumConnections());
switch (transportType) {
case CLEAR:
model.getClearNetNumConnections().set(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;

@Slf4j
public abstract class BaseNodesByIdTest extends BaseNetworkTest {
Expand All @@ -44,7 +45,7 @@ void test_messageRoundTrip(Node.Config nodeConfig) throws InterruptedException {
BanList banList = new BanList();
TransportService transportService = TransportService.create(nodeConfig.getTransportType(), nodeConfig.getTransportConfig());
PersistenceService persistenceService = new PersistenceService("");
KeyBundleService keyBundleService = new KeyBundleService(persistenceService);
KeyBundleService keyBundleService = new KeyBundleService(persistenceService, mock(KeyBundleService.Config.class));

NodesById nodesById = new NodesById(banList, nodeConfig, keyBundleService, transportService, new NetworkLoadService(), new AuthorizationService(new HashCashService()));
long ts = System.currentTimeMillis();
Expand Down Expand Up @@ -135,7 +136,7 @@ void test_initializeServer(Node.Config nodeConfig) {
BanList banList = new BanList();
TransportService transportService = TransportService.create(nodeConfig.getTransportType(), nodeConfig.getTransportConfig());
PersistenceService persistenceService = new PersistenceService("");
KeyBundleService keyBundleService = new KeyBundleService(persistenceService);
KeyBundleService keyBundleService = new KeyBundleService(persistenceService, mock(KeyBundleService.Config.class));
NodesById nodesById = new NodesById(banList, nodeConfig, keyBundleService, transportService, new NetworkLoadService(), new AuthorizationService(new HashCashService()));
initializeServers(2, nodesById);
nodesById.shutdown().join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package bisq.network.p2p.services.peergroup.exchange;

import bisq.network.common.Address;
import bisq.network.common.TransportType;
import bisq.network.p2p.BaseNetworkTest;
import bisq.network.p2p.node.Node;
import bisq.network.p2p.node.transport.TransportService;
Expand All @@ -43,7 +44,8 @@ void test_peerExchange(Node.Config nodeConfig) throws InterruptedException, Exec
int numSeeds = 2;
int numNodes = 2;
BanList banList = new BanList();
TransportService transportService = TransportService.create(nodeConfig.getTransportType(), nodeConfig.getTransportConfig());
TransportType transportType = nodeConfig.getTransportType();
TransportService transportService = TransportService.create(transportType, nodeConfig.getTransportConfig());
PersistenceService persistenceService = new PersistenceService(getBaseDir().toAbsolutePath().toString());
PeerGroupManager.Config peerGroupServiceConfig = new PeerGroupManager.Config(
null, null, null,
Expand Down Expand Up @@ -99,11 +101,11 @@ void test_peerExchange(Node.Config nodeConfig) throws InterruptedException, Exec
for (int i = 0; i < numNodes; i++) {
Node node = nodes.get(i);
PeerGroupService peerGroupService = new PeerGroupService(persistenceService,
node,
transportType,
new PeerGroupService.Config(),
new HashSet<>(seedNodeAddresses),
banList);
PeerExchangeStrategy peerExchangeStrategy = new PeerExchangeStrategy(peerGroupService, new PeerExchangeStrategy.Config());
PeerExchangeStrategy peerExchangeStrategy = new PeerExchangeStrategy(peerGroupService, node, new PeerExchangeStrategy.Config());
PeerExchangeService peerExchangeService = new PeerExchangeService(node, peerExchangeStrategy);
peerExchangeService.startInitialPeerExchange().whenComplete((result, throwable) -> {
assertNull(throwable);
Expand All @@ -120,11 +122,11 @@ void test_peerExchange(Node.Config nodeConfig) throws InterruptedException, Exec
for (int i = 0; i < numNodes; i++) {
Node node = nodes.get(i);
PeerGroupService peerGroupService = new PeerGroupService(persistenceService,
node,
transportType,
new PeerGroupService.Config(),
new HashSet<>(seedNodeAddresses),
banList);
PeerExchangeStrategy peerExchangeStrategy = new PeerExchangeStrategy(peerGroupService, new PeerExchangeStrategy.Config());
PeerExchangeStrategy peerExchangeStrategy = new PeerExchangeStrategy(peerGroupService, node, new PeerExchangeStrategy.Config());
PeerExchangeService peerExchangeService = new PeerExchangeService(node, peerExchangeStrategy);
peerExchangeService.startInitialPeerExchange().whenComplete((result, throwable) -> {
assertNull(throwable);
Expand Down
25 changes: 13 additions & 12 deletions network/network/src/main/java/bisq/network/p2p/ServiceNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import bisq.network.p2p.services.data.inventory.InventoryService;
import bisq.network.p2p.services.peergroup.BanList;
import bisq.network.p2p.services.peergroup.PeerGroupManager;
import bisq.network.p2p.services.peergroup.PeerGroupService;
import bisq.persistence.PersistenceService;
import bisq.security.keys.KeyBundleService;
import bisq.security.keys.PubKey;
Expand Down Expand Up @@ -103,10 +104,10 @@ public enum SupportedService {
private final Config config;
private final PeerGroupManager.Config peerGroupServiceConfig;
private final Optional<DataService> dataService;
private final PeerGroupService peerGroupService;
private final InventoryService.Config inventoryServiceConfig;
private final Optional<MessageDeliveryStatusService> messageDeliveryStatusService;
private final KeyBundleService keyBundleService;
private final PersistenceService persistenceService;
private final Set<Address> seedNodeAddresses;

@Getter
Expand Down Expand Up @@ -147,11 +148,12 @@ public enum SupportedService {
this.messageDeliveryStatusService = messageDeliveryStatusService;
this.dataService = dataService;
this.keyBundleService = keyBundleService;
this.persistenceService = persistenceService;
this.seedNodeAddresses = seedNodeAddresses;

transportService = TransportService.create(transportType, nodeConfig.getTransportConfig());
nodesById = new NodesById(banList, nodeConfig, keyBundleService, transportService, networkLoadService, authorizationService);

peerGroupService = new PeerGroupService(persistenceService, transportType, peerGroupServiceConfig.getPeerGroupConfig(), seedNodeAddresses, banList);
}


Expand All @@ -164,18 +166,17 @@ Node getInitializedDefaultNode(NetworkId defaultNetworkId) {

Set<SupportedService> supportedServices = config.getSupportedServices();
peerGroupManager = supportedServices.contains(SupportedService.PEER_GROUP) ?
Optional.of(new PeerGroupManager(persistenceService,
defaultNode,
Optional.of(new PeerGroupManager(defaultNode,
peerGroupService,
banList,
peerGroupServiceConfig,
seedNodeAddresses)) :
peerGroupServiceConfig)) :
Optional.empty();

boolean dataServiceEnabled = supportedServices.contains(SupportedService.PEER_GROUP) &&
supportedServices.contains(SupportedService.DATA);

dataNetworkService = dataServiceEnabled ?
Optional.of(new DataNetworkService(defaultNode, peerGroupManager.orElseThrow(), dataService.orElseThrow())) :
Optional.of(new DataNetworkService(defaultNode, dataService.orElseThrow())) :
Optional.empty();

inventoryService = dataServiceEnabled ?
Expand All @@ -192,8 +193,8 @@ Node getInitializedDefaultNode(NetworkId defaultNetworkId) {
setState(State.INITIALIZING);
transportService.initialize();// blocking
defaultNode.initialize();// blocking
peerGroupManager.ifPresentOrElse(peerGroupService -> {
peerGroupService.initialize();// blocking
peerGroupManager.ifPresentOrElse(peerGroupManager -> {
peerGroupManager.initialize();// blocking
setState(State.INITIALIZED);
},
() -> setState(State.INITIALIZED));
Expand Down Expand Up @@ -225,19 +226,19 @@ boolean isNodeInitialized(NetworkId networkId) {

void addSeedNodeAddresses(Set<Address> seedNodeAddresses) {
this.seedNodeAddresses.addAll(seedNodeAddresses);
peerGroupManager.ifPresent(peerGroupService -> peerGroupService.addSeedNodeAddresses(seedNodeAddresses));
peerGroupManager.ifPresent(peerGroupManager -> peerGroupManager.addSeedNodeAddresses(seedNodeAddresses));
}

void addSeedNodeAddress(Address seedNodeAddress) {
// In case we would get called before peerGroupManager is created we add the seedNodeAddress to the
// seedNodeAddresses field
seedNodeAddresses.add(seedNodeAddress);
peerGroupManager.ifPresent(peerGroupService -> peerGroupService.addSeedNodeAddress(seedNodeAddress));
peerGroupManager.ifPresent(peerGroupManager -> peerGroupManager.addSeedNodeAddress(seedNodeAddress));
}

void removeSeedNodeAddress(Address seedNodeAddress) {
seedNodeAddresses.remove(seedNodeAddress);
peerGroupManager.ifPresent(peerGroupService -> peerGroupService.removeSeedNodeAddress(seedNodeAddress));
peerGroupManager.ifPresent(peerGroupManager -> peerGroupManager.removeSeedNodeAddress(seedNodeAddress));
}

SendConfidentialMessageResult confidentialSend(EnvelopePayloadMessage envelopePayloadMessage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,11 @@ protected Connection(Socket socket,
} catch (Exception exception) {
//todo StreamCorruptedException from i2p at shutdown. prob it send some text data at shut down
if (isInputStreamActive()) {
log.debug("Call shutdown from startListen read handler {} due exception={}", this, exception.toString());
log.debug("Exception at input handler on {}", this, exception);
close(CloseReason.EXCEPTION.exception(exception));
// EOFException expected if connection got closed

// EOFException expected if connection got closed (Socket closed message)
// TODO maybe also filter SocketException
if (!(exception instanceof EOFException)) {
errorHandler.accept(this, exception);
}
Expand Down Expand Up @@ -191,6 +193,8 @@ Connection send(EnvelopePayloadMessage envelopePayloadMessage, AuthorizationToke
} catch (Throwable throwable) {
if (!isStopped) {
throw throwable;
} else {
log.warn("Send message failed at stopped connection", throwable);
}
}
}
Expand All @@ -207,11 +211,10 @@ Connection send(EnvelopePayloadMessage envelopePayloadMessage, AuthorizationToke
return this;
} catch (IOException exception) {
if (!isStopped) {
log.error("Call shutdown from send {} due exception={}", this, exception.toString());
log.error("Send message failed. {}", this, exception);
close(CloseReason.EXCEPTION.exception(exception));
}
// We wrap any exception (also expected EOFException in case of connection close), to inform the caller
// that the "send proto" intent failed.
// We wrap any exception (also expected EOFException in case of connection close), to leave handling of the exception to the caller.
throw new ConnectionException(exception);
}
}
Expand Down
60 changes: 39 additions & 21 deletions network/network/src/main/java/bisq/network/p2p/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,7 @@

import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.net.*;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
Expand Down Expand Up @@ -311,6 +308,7 @@ public Connection send(EnvelopePayloadMessage envelopePayloadMessage, Address ad

public Connection send(EnvelopePayloadMessage envelopePayloadMessage, Connection connection) {
if (connection.isStopped()) {
log.debug("Send message failed as connection is already stopped {}", this);
throw new ConnectionClosedException(connection);
}
try {
Expand All @@ -322,6 +320,7 @@ public Connection send(EnvelopePayloadMessage envelopePayloadMessage, Connection
} catch (Throwable throwable) {
if (connection.isRunning()) {
handleException(connection, throwable);
log.debug("Send message failed on {}", this, throwable);
closeConnection(connection, CloseReason.EXCEPTION.exception(throwable));
}
throw new ConnectionClosedException(connection);
Expand Down Expand Up @@ -349,15 +348,17 @@ public Connection getConnection(Address address) {
///////////////////////////////////////////////////////////////////////////////////////////////////

private Connection createOutboundConnection(Address address) {
return myCapability.map(capability -> createOutboundConnection(address, capability)).orElseGet(() -> {
int port = networkId.getAddressByTransportTypeMap().get(transportType).getPort();
log.warn("We create an outbound connection but we have not initialized our server. " +
"We create a server on port {} now but clients better control node " +
"life cycle themselves.", port);
initialize();
checkArgument(myCapability.isPresent(), "myCapability must be present after initializeServer got called");
return createOutboundConnection(address, myCapability.get());
});
log.debug("Create outbound connection to {}", address);
return myCapability.map(capability -> createOutboundConnection(address, capability))
.orElseGet(() -> {
int port = networkId.getAddressByTransportTypeMap().get(transportType).getPort();
log.warn("We create an outbound connection but we have not initialized our server. " +
"We create a server on port {} now but clients better control node " +
"life cycle themselves.", port);
initialize();
checkArgument(myCapability.isPresent(), "myCapability must be present after initializeServer got called");
return createOutboundConnection(address, myCapability.get());
});
}

private Connection createOutboundConnection(Address address, Capability myCapability) {
Expand Down Expand Up @@ -438,6 +439,21 @@ public Stream<Connection> getAllConnections() {
return Stream.concat(inboundConnectionsByAddress.values().stream(), outboundConnectionsByAddress.values().stream());
}

public Stream<Connection> getAllActiveConnections() {
return getAllConnections().filter(Connection::isRunning);
}

public Stream<OutboundConnection> getActiveOutboundConnections() {
return getOutboundConnectionsByAddress().values().stream().filter(Connection::isRunning);
}

public Stream<InboundConnection> getActiveInboundConnections() {
return getInboundConnectionsByAddress().values().stream().filter(Connection::isRunning);
}

public int getNumConnections() {
return (int) getAllActiveConnections().count();
}

///////////////////////////////////////////////////////////////////////////////////////////////////
// Connection.Handler
Expand Down Expand Up @@ -587,8 +603,8 @@ public Optional<Address> findMyAddress() {
return server.map(Server::getAddress);
}

public int getNumConnections() {
return inboundConnectionsByAddress.size() + outboundConnectionsByAddress.size();
public boolean notMyself(Address address) {
return findMyAddress().stream().noneMatch(myAddress -> myAddress.equals(address));
}

public boolean isInitialized() {
Expand Down Expand Up @@ -625,17 +641,19 @@ private void handleException(Throwable exception) {
if (isShutdown()) {
return;
}
String msg = "Exception: ";
if (exception instanceof EOFException) {
log.debug(exception.toString(), exception);
log.info(msg, exception);
} else if (exception instanceof ConnectException) {
log.debug(msg, exception);
} else if (exception instanceof SocketException) {
log.debug(exception.toString(), exception);
log.debug(msg, exception);
} else if (exception instanceof UnknownHostException) {
log.warn("UnknownHostException. Might happen if we try to connect to wrong network type");
log.warn(exception.toString(), exception);
log.warn("UnknownHostException. Might happen if we try to connect to wrong network type.", exception);
} else if (exception instanceof SocketTimeoutException) {
log.warn(exception.toString(), exception);
log.info(msg, exception);
} else {
log.error(exception.toString(), exception);
log.error(msg, exception);
}
}

Expand Down
Loading

0 comments on commit 0bdbb77

Please sign in to comment.