Skip to content

Commit

Permalink
Merge pull request #608 from HenrikJannsen/fix_peer_exchange_handling
Browse files Browse the repository at this point in the history
Fix peer exchange handling
  • Loading branch information
alvasw authored Dec 21, 2022
2 parents 1a61c19 + 15a4e52 commit f0fe962
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package bisq.network.p2p.services.peergroup.exchange;

import bisq.common.timer.Scheduler;
import bisq.common.util.CompletableFutureUtils;
import bisq.common.util.StringUtils;
import bisq.network.p2p.message.NetworkMessage;
import bisq.network.p2p.node.Address;
Expand All @@ -30,13 +29,13 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static bisq.network.NetworkService.NETWORK_IO_POOL;
Expand Down Expand Up @@ -87,37 +86,52 @@ private CompletableFuture<Void> doPeerExchange(Set<Address> candidates) {
.map(Address::toString)
.collect(Collectors.toList())
.toString()));
List<CompletableFuture<Boolean>> allFutures = candidates.stream()

CompletableFuture<Void> resultFuture = new CompletableFuture<>();
AtomicInteger numSuccess = new AtomicInteger();
AtomicInteger numFailures = new AtomicInteger();
candidates.stream()
.map(this::doPeerExchangeAsync)
.collect(Collectors.toList());

// When all futures complete successfully,
// then consider peer exchange complete and decide whether it should be re-done, in case of too few peers
CompletableFutureUtils.allOf(allFutures)
.thenApply(resultList -> {
int numSuccess = (int) resultList.stream().filter(e -> e).count();
log.info("Node {} completed peer exchange to {} candidates. {} requests successfully completed.",
node, candidates.size(), numSuccess);
if (peerExchangeStrategy.redoInitialPeerExchange(numSuccess, candidates.size())) {
log.info("Node {} repeats the initial peer exchange after {} sec as it has not reached sufficient connections " +
"or received sufficient peers", node, doInitialPeerExchangeDelaySec);
scheduler.ifPresent(Scheduler::stop);
scheduler = Optional.of(Scheduler.run(this::doInitialPeerExchange)
.after(doInitialPeerExchangeDelaySec, TimeUnit.SECONDS)
.name("PeerExchangeService.scheduler-" + node));
doInitialPeerExchangeDelaySec = Math.min(60, doInitialPeerExchangeDelaySec * 2);
} else {
scheduler.ifPresent(Scheduler::stop);
}
return null;
.forEach(future -> {
future.whenComplete((result, throwable) -> {
if (throwable == null) {
if (result) {
numSuccess.incrementAndGet();
if (!resultFuture.isDone()) {
log.info("We got at least one peerExchange future completed.");
resultFuture.complete(null);
}
} else {
numFailures.incrementAndGet();
}
} else {
numFailures.incrementAndGet();
}

if (numFailures.get() + numSuccess.get() == candidates.size()) {
if (!resultFuture.isDone()) {
log.info("We got all peerExchange futures completed but none was successful. This is expected when the first node bootstraps");
resultFuture.complete(null);
}

log.info("Node {} completed peer exchange to {} candidates. {} requests successfully completed.",
node, candidates.size(), numSuccess);
if (peerExchangeStrategy.shouldRedoInitialPeerExchange(numSuccess.get(), candidates.size())) {
log.info("Node {} repeats the initial peer exchange after {} sec as it has not reached sufficient connections " +
"or received sufficient peers", node, doInitialPeerExchangeDelaySec);
scheduler.ifPresent(Scheduler::stop);
scheduler = Optional.of(Scheduler.run(this::doInitialPeerExchange)
.after(doInitialPeerExchangeDelaySec, TimeUnit.SECONDS)
.name("PeerExchangeService.scheduler-" + node));
doInitialPeerExchangeDelaySec = Math.min(60, doInitialPeerExchangeDelaySec * 2);
} else {
scheduler.ifPresent(Scheduler::stop);
}
}
});
});

// Complete when any peer exchange succeeds, or when all fail.
return CompletableFutureUtils.anyOf(allFutures)
.thenApply(result -> {
log.info("Node {} completed peer exchange to at least one candidate", node);
return null;
});
return resultFuture;
}

private CompletableFuture<Boolean> doPeerExchangeAsync(Address peerAddress) {
Expand All @@ -140,14 +154,15 @@ private boolean doPeerExchange(Address peerAddress) {
Set<Peer> myPeers = peerExchangeStrategy.getPeers(peerAddress);

Set<Peer> peers = handler.request(myPeers).join();
log.info("Node {} completed peer exchange with {} and received {} peers.", node, peerAddress, peers.size());
peerExchangeStrategy.addReportedPeers(peers, peerAddress);
requestHandlerMap.remove(key);
return true;
} catch (Throwable throwable) {
if (key != null) {
requestHandlerMap.remove(key);
}
// Expect ConnectException if peer is not available
log.info("Node {} failed to do a peer exchange with {} because of: {}", node, peerAddress, throwable.getMessage());
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ void addReportedPeers(Set<Peer> peers, Address peerAddress) {
peerGroup.addReportedPeers(filtered);
}

boolean redoInitialPeerExchange(long numSuccess, int numRequests) {
boolean shouldRedoInitialPeerExchange(int numSuccess, int numRequests) {
boolean moreThenHalfFailed = numRequests - numSuccess > numRequests / 2;
return moreThenHalfFailed ||
!sufficientConnections() ||
Expand Down

0 comments on commit f0fe962

Please sign in to comment.