Skip to content

Commit

Permalink
Merge pull request #6500 from HenrikJannsen/improve_broadcast_handler
Browse files Browse the repository at this point in the history
Improve broadcast handler
  • Loading branch information
alejandrogarcia83 authored Jan 6, 2023
2 parents 803a58e + 9fe3d22 commit ec411a4
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ public String getAddress() {
// cappedBurnAmountShare is a % value represented as double. Smallest supported value is 0.01% -> 0.0001.
// By multiplying it with 10000 and using Math.floor we limit the candidate to 0.01%.
// Entries with 0 will be ignored in the selection method, so we do not need to filter them out.
// List<BurningManCandidate> burningManCandidates = new ArrayList<>(burningManCandidatesByName.values());
int ceiling = 10000;
List<Long> amountList = activeBurningManCandidates.stream()
.map(BurningManCandidate::getCappedBurnAmountShare)
Expand Down
5 changes: 4 additions & 1 deletion core/src/main/java/bisq/core/provider/fee/FeeRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ public void onSuccess(Tuple2<Map<String, Long>, Map<String, Long>> feeData) {
}

public void onFailure(@NotNull Throwable throwable) {
resultFuture.setException(throwable);
if (!resultFuture.setException(throwable)) {
// In case the setException returns false we need to cancel the future.
resultFuture.cancel(true);
}
}
}, MoreExecutors.directExecutor());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ public void onSuccess(String mempoolData) {
}

public void onFailure(@NotNull Throwable throwable) {
mempoolServiceCallback.setException(throwable);
if (!mempoolServiceCallback.setException(throwable)) {
// In case the setException returns false we need to cancel the future.
mempoolServiceCallback.cancel(true);
}
}
}, MoreExecutors.directExecutor());
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/bisq/core/provider/price/PriceRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ public void onSuccess(Tuple2<Map<String, Long>, Map<String, MarketPrice>> market
if (!shutDownRequested) {
resultFuture.set(marketPriceTuple);
}

}

public void onFailure(@NotNull Throwable throwable) {
if (!shutDownRequested) {
resultFuture.setException(new PriceRequestException(throwable, baseUrl));
if (!shutDownRequested && !resultFuture.setException(new PriceRequestException(throwable, baseUrl))) {
// In case the setException returns false we need to cancel the future.
resultFuture.cancel(true);
}
}
}, MoreExecutors.directExecutor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public enum CloseConnectionReason {

// illegal requests
RULE_VIOLATION(true, false),
PEER_BANNED(true, false),
PEER_BANNED(false, false),
INVALID_CLASS_RECEIVED(false, false),
MANDATORY_CAPABILITIES_NOT_SUPPORTED(false, false);

Expand Down
21 changes: 18 additions & 3 deletions p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,12 @@ public void onSuccess(Connection connection) {

public void onFailure(@NotNull Throwable throwable) {
log.debug("onFailure at sendMessage: peersNodeAddress={}\n\tmessage={}\n\tthrowable={}", peersNodeAddress, networkEnvelope.getClass().getSimpleName(), throwable.toString());
UserThread.execute(() -> resultFuture.setException(throwable));
UserThread.execute(() -> {
if (!resultFuture.setException(throwable)) {
// In case the setException returns false we need to cancel the future.
resultFuture.cancel(true);
}
});
}
}, MoreExecutors.directExecutor());

Expand Down Expand Up @@ -311,13 +316,23 @@ public void onSuccess(Connection connection) {
}

public void onFailure(@NotNull Throwable throwable) {
UserThread.execute(() -> resultFuture.setException(throwable));
UserThread.execute(() -> {
if (!resultFuture.setException(throwable)) {
// In case the setException returns false we need to cancel the future.
resultFuture.cancel(true);
}
});
}
}, MoreExecutors.directExecutor());

} catch (RejectedExecutionException exception) {
log.error("RejectedExecutionException at sendMessage: ", exception);
resultFuture.setException(exception);
UserThread.execute(() -> {
if (!resultFuture.setException(exception)) {
// In case the setException returns false we need to cancel the future.
resultFuture.cancel(true);
}
});
}
return resultFuture;
}
Expand Down
95 changes: 63 additions & 32 deletions p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,20 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

@Slf4j
public class BroadcastHandler implements PeerManager.Listener {
Expand Down Expand Up @@ -72,9 +79,14 @@ public interface Listener {
private final ResultHandler resultHandler;
private final String uid;

private boolean stopped, timeoutTriggered;
private int numOfCompletedBroadcasts, numOfFailedBroadcasts, numPeersForBroadcast;
private final AtomicBoolean stopped = new AtomicBoolean();
private final AtomicBoolean timeoutTriggered = new AtomicBoolean();
private final AtomicInteger numOfCompletedBroadcasts = new AtomicInteger();
private final AtomicInteger numOfFailedBroadcasts = new AtomicInteger();
private final AtomicInteger numPeersForBroadcast = new AtomicInteger();
@Nullable
private Timer timeoutTimer;
private final Set<SettableFuture<Connection>> sendMessageFutures = new CopyOnWriteArraySet<>();


///////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -98,36 +110,40 @@ public interface Listener {
public void broadcast(List<Broadcaster.BroadcastRequest> broadcastRequests,
boolean shutDownRequested,
ListeningExecutorService executor) {
if (broadcastRequests.isEmpty()) {
return;
}

List<Connection> confirmedConnections = new ArrayList<>(networkNode.getConfirmedConnections());
Collections.shuffle(confirmedConnections);

int delay;
if (shutDownRequested) {
delay = 1;
// We sent to all peers as in case we had offers we want that it gets removed with higher reliability
numPeersForBroadcast = confirmedConnections.size();
numPeersForBroadcast.set(confirmedConnections.size());
} else {
if (requestsContainOwnMessage(broadcastRequests)) {
// The broadcastRequests contains at least 1 message we have originated, so we send to all peers and
// with shorter delay
numPeersForBroadcast = confirmedConnections.size();
numPeersForBroadcast.set(confirmedConnections.size());
delay = 50;
} else {
// Relay nodes only send to max 7 peers and with longer delay
numPeersForBroadcast = Math.min(7, confirmedConnections.size());
numPeersForBroadcast.set(Math.min(7, confirmedConnections.size()));
delay = 100;
}
}

setupTimeoutHandler(broadcastRequests, delay, shutDownRequested);

int iterations = numPeersForBroadcast;
int iterations = numPeersForBroadcast.get();
for (int i = 0; i < iterations; i++) {
long minDelay = (i + 1) * delay;
long maxDelay = (i + 2) * delay;
Connection connection = confirmedConnections.get(i);
UserThread.runAfterRandomDelay(() -> {
if (stopped) {
if (stopped.get()) {
return;
}

Expand All @@ -139,8 +155,8 @@ public void broadcast(List<Broadcaster.BroadcastRequest> broadcastRequests,
// Could be empty list...
if (broadcastRequestsForConnection.isEmpty()) {
// We decrease numPeers in that case for making completion checks correct.
if (numPeersForBroadcast > 0) {
numPeersForBroadcast--;
if (numPeersForBroadcast.get() > 0) {
numPeersForBroadcast.decrementAndGet();
}
checkForCompletion();
return;
Expand All @@ -149,20 +165,24 @@ public void broadcast(List<Broadcaster.BroadcastRequest> broadcastRequests,
if (connection.isStopped()) {
// Connection has died in the meantime. We skip it.
// We decrease numPeers in that case for making completion checks correct.
if (numPeersForBroadcast > 0) {
numPeersForBroadcast--;
if (numPeersForBroadcast.get() > 0) {
numPeersForBroadcast.decrementAndGet();
}
checkForCompletion();
return;
}

sendToPeer(connection, broadcastRequestsForConnection, executor);
try {
sendToPeer(connection, broadcastRequestsForConnection, executor);
} catch (RejectedExecutionException e) {
log.error("RejectedExecutionException at broadcast ", e);
cleanup();
}
}, minDelay, maxDelay, TimeUnit.MILLISECONDS);
}
}

public void cancel() {
stopped = true;
cleanup();
}

Expand Down Expand Up @@ -203,13 +223,14 @@ private void setupTimeoutHandler(List<Broadcaster.BroadcastRequest> broadcastReq
boolean shutDownRequested) {
// In case of shutdown we try to complete fast and set a short 1 second timeout
long baseTimeoutMs = shutDownRequested ? TimeUnit.SECONDS.toMillis(1) : BASE_TIMEOUT_MS;
long timeoutDelay = baseTimeoutMs + delay * (numPeersForBroadcast + 1); // We added 1 in the loop
long timeoutDelay = baseTimeoutMs + delay * (numPeersForBroadcast.get() + 1); // We added 1 in the loop
timeoutTimer = UserThread.runAfter(() -> {
if (stopped) {
if (stopped.get()) {
return;
}

timeoutTriggered = true;
timeoutTriggered.set(true);
numOfFailedBroadcasts.incrementAndGet();

log.warn("Broadcast did not complete after {} sec.\n" +
"numPeersForBroadcast={}\n" +
Expand Down Expand Up @@ -244,13 +265,13 @@ private void sendToPeer(Connection connection,
// Can be BundleOfEnvelopes or a single BroadcastMessage
BroadcastMessage broadcastMessage = getMessage(broadcastRequestsForConnection);
SettableFuture<Connection> future = networkNode.sendMessage(connection, broadcastMessage, executor);

sendMessageFutures.add(future);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
numOfCompletedBroadcasts++;
numOfCompletedBroadcasts.incrementAndGet();

if (stopped) {
if (stopped.get()) {
return;
}

Expand All @@ -260,11 +281,10 @@ public void onSuccess(Connection connection) {

@Override
public void onFailure(@NotNull Throwable throwable) {
log.warn("Broadcast to {} failed. ErrorMessage={}", connection.getPeersNodeAddressOptional(),
throwable.getMessage());
numOfFailedBroadcasts++;
log.warn("Broadcast to " + connection.getPeersNodeAddressOptional() + " failed. ", throwable);
numOfFailedBroadcasts.incrementAndGet();

if (stopped) {
if (stopped.get()) {
return;
}

Expand All @@ -286,43 +306,54 @@ private BroadcastMessage getMessage(List<Broadcaster.BroadcastRequest> broadcast
}

private void maybeNotifyListeners(List<Broadcaster.BroadcastRequest> broadcastRequests) {
int numOfCompletedBroadcastsTarget = Math.max(1, Math.min(numPeersForBroadcast, 3));
int numOfCompletedBroadcastsTarget = Math.max(1, Math.min(numPeersForBroadcast.get(), 3));
// We use equal checks to avoid duplicated listener calls as it would be the case with >= checks.
if (numOfCompletedBroadcasts == numOfCompletedBroadcastsTarget) {
if (numOfCompletedBroadcasts.get() == numOfCompletedBroadcastsTarget) {
// We have heard back from 3 peers (or all peers if numPeers is lower) so we consider the message was sufficiently broadcast.
broadcastRequests.stream()
.filter(broadcastRequest -> broadcastRequest.getListener() != null)
.map(Broadcaster.BroadcastRequest::getListener)
.filter(Objects::nonNull)
.forEach(listener -> listener.onSufficientlyBroadcast(broadcastRequests));
} else {
// We check if number of open requests to peers is less than we need to reach numOfCompletedBroadcastsTarget.
// Thus we never can reach required resilience as too many numOfFailedBroadcasts occurred.
int maxPossibleSuccessCases = numPeersForBroadcast - numOfFailedBroadcasts;
int maxPossibleSuccessCases = numPeersForBroadcast.get() - numOfFailedBroadcasts.get();
// We subtract 1 as we want to have it called only once, with a < comparision we would trigger repeatedly.
boolean notEnoughSucceededOrOpen = maxPossibleSuccessCases == numOfCompletedBroadcastsTarget - 1;
// We did not reach resilience level and timeout prevents to reach it later
boolean timeoutAndNotEnoughSucceeded = timeoutTriggered && numOfCompletedBroadcasts < numOfCompletedBroadcastsTarget;
boolean timeoutAndNotEnoughSucceeded = timeoutTriggered.get() && numOfCompletedBroadcasts.get() < numOfCompletedBroadcastsTarget;
if (notEnoughSucceededOrOpen || timeoutAndNotEnoughSucceeded) {
broadcastRequests.stream()
.filter(broadcastRequest -> broadcastRequest.getListener() != null)
.map(Broadcaster.BroadcastRequest::getListener)
.forEach(listener -> listener.onNotSufficientlyBroadcast(numOfCompletedBroadcasts, numOfFailedBroadcasts));
.filter(Objects::nonNull)
.forEach(listener -> listener.onNotSufficientlyBroadcast(numOfCompletedBroadcasts.get(), numOfFailedBroadcasts.get()));
}
}
}

private void checkForCompletion() {
if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numPeersForBroadcast) {
if (numOfCompletedBroadcasts.get() + numOfFailedBroadcasts.get() == numPeersForBroadcast.get()) {
cleanup();
}
}

private void cleanup() {
stopped = true;
if (stopped.get()) {
return;
}

stopped.set(true);

if (timeoutTimer != null) {
timeoutTimer.stop();
timeoutTimer = null;
}

sendMessageFutures.stream()
.filter(future -> !future.isCancelled() && !future.isDone())
.forEach(future -> future.cancel(true));
sendMessageFutures.clear();

peerManager.removeListener(this);
resultHandler.onCompleted(this);
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ public Broadcaster(NetworkNode networkNode,
this.peerManager = peerManager;

ThreadPoolExecutor threadPoolExecutor = Utilities.getThreadPoolExecutor("Broadcaster",
maxConnections * 2,
maxConnections * 3,
maxConnections * 4,
30,
30);
executor = MoreExecutors.listeningDecorator(threadPoolExecutor);
Expand Down

0 comments on commit ec411a4

Please sign in to comment.