Fix peer exchange handling #608

Merged · 2 commits · Dec 21, 2022
@@ -18,7 +18,6 @@
package bisq.network.p2p.services.peergroup.exchange;

import bisq.common.timer.Scheduler;
import bisq.common.util.CompletableFutureUtils;
import bisq.common.util.StringUtils;
import bisq.network.p2p.message.NetworkMessage;
import bisq.network.p2p.node.Address;
@@ -30,13 +29,13 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static bisq.network.NetworkService.NETWORK_IO_POOL;
@@ -87,37 +86,52 @@ private CompletableFuture<Void> doPeerExchange(Set<Address> candidates) {
.map(Address::toString)
.collect(Collectors.toList())
.toString()));
List<CompletableFuture<Boolean>> allFutures = candidates.stream()

CompletableFuture<Void> resultFuture = new CompletableFuture<>();
AtomicInteger numSuccess = new AtomicInteger();
AtomicInteger numFailures = new AtomicInteger();
candidates.stream()
.map(this::doPeerExchangeAsync)
.collect(Collectors.toList());

// When all futures complete successfully,
// then consider peer exchange complete and decide whether it should be re-done, in case of too few peers
CompletableFutureUtils.allOf(allFutures)
.thenApply(resultList -> {
int numSuccess = (int) resultList.stream().filter(e -> e).count();
log.info("Node {} completed peer exchange to {} candidates. {} requests successfully completed.",
node, candidates.size(), numSuccess);
if (peerExchangeStrategy.redoInitialPeerExchange(numSuccess, candidates.size())) {
log.info("Node {} repeats the initial peer exchange after {} sec as it has not reached sufficient connections " +
"or received sufficient peers", node, doInitialPeerExchangeDelaySec);
scheduler.ifPresent(Scheduler::stop);
scheduler = Optional.of(Scheduler.run(this::doInitialPeerExchange)
.after(doInitialPeerExchangeDelaySec, TimeUnit.SECONDS)
.name("PeerExchangeService.scheduler-" + node));
doInitialPeerExchangeDelaySec = Math.min(60, doInitialPeerExchangeDelaySec * 2);
} else {
scheduler.ifPresent(Scheduler::stop);
}
return null;
.forEach(future -> {
future.whenComplete((result, throwable) -> {
if (throwable == null) {
if (result) {
numSuccess.incrementAndGet();
if (!resultFuture.isDone()) {
log.info("We got at least one peerExchange future completed.");
resultFuture.complete(null);
}
} else {
numFailures.incrementAndGet();
}
} else {
numFailures.incrementAndGet();
}

if (numFailures.get() + numSuccess.get() == candidates.size()) {
if (!resultFuture.isDone()) {
log.info("We got all peerExchange futures completed but none was successful. This is expected when the first node bootstraps");
resultFuture.complete(null);
}

log.info("Node {} completed peer exchange to {} candidates. {} requests successfully completed.",
node, candidates.size(), numSuccess);
if (peerExchangeStrategy.shouldRedoInitialPeerExchange(numSuccess.get(), candidates.size())) {
log.info("Node {} repeats the initial peer exchange after {} sec as it has not reached sufficient connections " +
"or received sufficient peers", node, doInitialPeerExchangeDelaySec);
scheduler.ifPresent(Scheduler::stop);
scheduler = Optional.of(Scheduler.run(this::doInitialPeerExchange)
.after(doInitialPeerExchangeDelaySec, TimeUnit.SECONDS)
.name("PeerExchangeService.scheduler-" + node));
doInitialPeerExchangeDelaySec = Math.min(60, doInitialPeerExchangeDelaySec * 2);
} else {
scheduler.ifPresent(Scheduler::stop);
}
}
});
});

// Complete when any peer exchange succeeds, or when all fail.
return CompletableFutureUtils.anyOf(allFutures)
.thenApply(result -> {
log.info("Node {} completed peer exchange to at least one candidate", node);
return null;
});
return resultFuture;
}

private CompletableFuture<Boolean> doPeerExchangeAsync(Address peerAddress) {
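
For reference, the replacement logic above boils down to a "complete on first success, or once everything has finished" pattern around a manually completed CompletableFuture, instead of the earlier CompletableFutureUtils.allOf/anyOf combination. A minimal, self-contained sketch of that pattern (the class name, exchangeWithPeerAsync, and the candidate strings are made up for illustration and are not part of this PR):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

public class AnySuccessOrAllDoneDemo {
    public static void main(String[] args) {
        List<String> candidates = List.of("peerA", "peerB", "peerC");

        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
        AtomicInteger numSuccess = new AtomicInteger();
        AtomicInteger numFailures = new AtomicInteger();

        candidates.stream()
                .map(AnySuccessOrAllDoneDemo::exchangeWithPeerAsync)
                .forEach(future -> future.whenComplete((success, throwable) -> {
                    if (throwable == null && success) {
                        numSuccess.incrementAndGet();
                        // First success unblocks the caller right away.
                        resultFuture.complete(null);
                    } else {
                        numFailures.incrementAndGet();
                    }
                    // Once every request has finished, complete even without any success,
                    // so the caller never waits forever (e.g. the first node bootstrapping).
                    if (numSuccess.get() + numFailures.get() == candidates.size()) {
                        resultFuture.complete(null);
                    }
                }));

        resultFuture.join();
        System.out.printf("Unblocked with %d successes and %d failures so far%n",
                numSuccess.get(), numFailures.get());
    }

    // Stand-in for the real async peer exchange request; randomly succeeds or fails.
    private static CompletableFuture<Boolean> exchangeWithPeerAsync(String peer) {
        return CompletableFuture.supplyAsync(() -> ThreadLocalRandom.current().nextBoolean());
    }
}
```

The retry branch in the diff then doubles doInitialPeerExchangeDelaySec up to a 60 second cap (a capped exponential backoff) and otherwise stops the scheduler.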
@@ -140,14 +154,15 @@ private boolean doPeerExchange(Address peerAddress) {
Set<Peer> myPeers = peerExchangeStrategy.getPeers(peerAddress);

Set<Peer> peers = handler.request(myPeers).join();
log.info("Node {} completed peer exchange with {} and received {} peers.", node, peerAddress, peers.size());
peerExchangeStrategy.addReportedPeers(peers, peerAddress);
requestHandlerMap.remove(key);
return true;
} catch (Throwable throwable) {
if (key != null) {
requestHandlerMap.remove(key);
}
// Expect ConnectException if peer is not available
log.info("Node {} failed to do a peer exchange with {} because of: {}", node, peerAddress, throwable.getMessage());
return false;
}
}
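
The blocking doPeerExchange(Address) path above now logs the success case and the failure case instead of silently swallowing the expected ConnectException. A rough, hedged sketch of that "block on the request and map any throwable to false" shape (the request method and the address string are placeholders, not the real handler API):

```java
import java.net.ConnectException;
import java.util.concurrent.CompletableFuture;

public class BlockingExchangeDemo {
    // Hedged stand-in for the real request handler; fails as an unreachable peer would.
    static CompletableFuture<Void> request(String peerAddress) {
        return CompletableFuture.failedFuture(new ConnectException("peer not available: " + peerAddress));
    }

    // Mirrors the pattern above: block on the request and treat any throwable
    // (a ConnectException is expected for offline peers) as a plain false.
    static boolean doPeerExchange(String peerAddress) {
        try {
            request(peerAddress).join();
            return true;
        } catch (Throwable throwable) {
            System.out.printf("Peer exchange with %s failed: %s%n", peerAddress, throwable.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(doPeerExchange("some-onion-address")); // false
    }
}
```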
@@ -120,7 +120,7 @@ void addReportedPeers(Set<Peer> peers, Address peerAddress) {
peerGroup.addReportedPeers(filtered);
}

boolean redoInitialPeerExchange(long numSuccess, int numRequests) {
boolean shouldRedoInitialPeerExchange(int numSuccess, int numRequests) {
boolean moreThenHalfFailed = numRequests - numSuccess > numRequests / 2;
return moreThenHalfFailed ||
!sufficientConnections() ||
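
In the strategy class, redoInitialPeerExchange is renamed to shouldRedoInitialPeerExchange and its numSuccess parameter becomes an int to match the AtomicInteger-based counting above. Its first condition retries whenever more than half of the requests failed; a small hedged sketch of just that check, with the connection and reported-peer conditions omitted:

```java
public class RetryThresholdDemo {
    // Hedged sketch of the "more than half failed" check from shouldRedoInitialPeerExchange;
    // the sufficient-connections and sufficient-reported-peers conditions are left out.
    static boolean moreThanHalfFailed(int numSuccess, int numRequests) {
        return numRequests - numSuccess > numRequests / 2;
    }

    public static void main(String[] args) {
        // With 5 candidates: 2 successes leave 3 failures (> 5/2 = 2), so a retry is scheduled.
        System.out.println(moreThanHalfFailed(2, 5)); // true
        // 3 successes leave 2 failures (not > 2), so this condition alone triggers no retry.
        System.out.println(moreThanHalfFailed(3, 5)); // false
    }
}
```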