From 41fb5e464ca37cf0dd42712c8141c4318aaea704 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 4 Jan 2023 12:58:22 -0500 Subject: [PATCH 01/11] Use AtomicBoolean for stopped and timeoutTriggered Signed-off-by: HenrikJannsen --- .../network/p2p/peers/BroadcastHandler.java | 58 ++++++++++--------- 1 file changed, 32 insertions(+), 26 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java index 5a466a5bee1..7f33f233e03 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java @@ -37,6 +37,8 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -72,8 +74,12 @@ public interface Listener { private final ResultHandler resultHandler; private final String uid; - private boolean stopped, timeoutTriggered; - private int numOfCompletedBroadcasts, numOfFailedBroadcasts, numPeersForBroadcast; + private final AtomicBoolean stopped = new AtomicBoolean(); + private final AtomicBoolean timeoutTriggered = new AtomicBoolean(); + private final AtomicInteger numOfCompletedBroadcasts = new AtomicInteger(); + private final AtomicInteger numOfFailedBroadcasts = new AtomicInteger(); + private final AtomicInteger numPeersForBroadcast = new AtomicInteger(); + private Timer timeoutTimer; @@ -105,29 +111,29 @@ public void broadcast(List broadcastRequests, if (shutDownRequested) { delay = 1; // We sent to all peers as in case we had offers we want that it gets removed with higher reliability - numPeersForBroadcast = confirmedConnections.size(); + numPeersForBroadcast.set(confirmedConnections.size()); } else { if (requestsContainOwnMessage(broadcastRequests)) { // The broadcastRequests contains at least 1 message we have originated, so we send to all peers and // with shorter delay - numPeersForBroadcast = confirmedConnections.size(); + numPeersForBroadcast.set(confirmedConnections.size()); delay = 50; } else { // Relay nodes only send to max 7 peers and with longer delay - numPeersForBroadcast = Math.min(7, confirmedConnections.size()); + numPeersForBroadcast.set(Math.min(7, confirmedConnections.size())); delay = 100; } } setupTimeoutHandler(broadcastRequests, delay, shutDownRequested); - int iterations = numPeersForBroadcast; + int iterations = numPeersForBroadcast.get(); for (int i = 0; i < iterations; i++) { long minDelay = (i + 1) * delay; long maxDelay = (i + 2) * delay; Connection connection = confirmedConnections.get(i); UserThread.runAfterRandomDelay(() -> { - if (stopped) { + if (stopped.get()) { return; } @@ -139,8 +145,8 @@ public void broadcast(List broadcastRequests, // Could be empty list... if (broadcastRequestsForConnection.isEmpty()) { // We decrease numPeers in that case for making completion checks correct. - if (numPeersForBroadcast > 0) { - numPeersForBroadcast--; + if (numPeersForBroadcast.get() > 0) { + numPeersForBroadcast.decrementAndGet(); } checkForCompletion(); return; @@ -149,8 +155,8 @@ public void broadcast(List broadcastRequests, if (connection.isStopped()) { // Connection has died in the meantime. We skip it. // We decrease numPeers in that case for making completion checks correct. - if (numPeersForBroadcast > 0) { - numPeersForBroadcast--; + if (numPeersForBroadcast.get() > 0) { + numPeersForBroadcast.decrementAndGet(); } checkForCompletion(); return; @@ -162,7 +168,7 @@ public void broadcast(List broadcastRequests, } public void cancel() { - stopped = true; + stopped.set(true); cleanup(); } @@ -203,13 +209,13 @@ private void setupTimeoutHandler(List broadcastReq boolean shutDownRequested) { // In case of shutdown we try to complete fast and set a short 1 second timeout long baseTimeoutMs = shutDownRequested ? TimeUnit.SECONDS.toMillis(1) : BASE_TIMEOUT_MS; - long timeoutDelay = baseTimeoutMs + delay * (numPeersForBroadcast + 1); // We added 1 in the loop + long timeoutDelay = baseTimeoutMs + delay * (numPeersForBroadcast.get() + 1); // We added 1 in the loop timeoutTimer = UserThread.runAfter(() -> { - if (stopped) { + if (stopped.get()) { return; } - timeoutTriggered = true; + timeoutTriggered.set(true); log.warn("Broadcast did not complete after {} sec.\n" + "numPeersForBroadcast={}\n" + @@ -248,9 +254,9 @@ private void sendToPeer(Connection connection, Futures.addCallback(future, new FutureCallback<>() { @Override public void onSuccess(Connection connection) { - numOfCompletedBroadcasts++; + numOfCompletedBroadcasts.incrementAndGet(); - if (stopped) { + if (stopped.get()) { return; } @@ -262,9 +268,9 @@ public void onSuccess(Connection connection) { public void onFailure(@NotNull Throwable throwable) { log.warn("Broadcast to {} failed. ErrorMessage={}", connection.getPeersNodeAddressOptional(), throwable.getMessage()); - numOfFailedBroadcasts++; + numOfFailedBroadcasts.incrementAndGet(); - if (stopped) { + if (stopped.get()) { return; } @@ -286,9 +292,9 @@ private BroadcastMessage getMessage(List broadcast } private void maybeNotifyListeners(List broadcastRequests) { - int numOfCompletedBroadcastsTarget = Math.max(1, Math.min(numPeersForBroadcast, 3)); + int numOfCompletedBroadcastsTarget = Math.max(1, Math.min(numPeersForBroadcast.get(), 3)); // We use equal checks to avoid duplicated listener calls as it would be the case with >= checks. - if (numOfCompletedBroadcasts == numOfCompletedBroadcastsTarget) { + if (numOfCompletedBroadcasts.get() == numOfCompletedBroadcastsTarget) { // We have heard back from 3 peers (or all peers if numPeers is lower) so we consider the message was sufficiently broadcast. broadcastRequests.stream() .filter(broadcastRequest -> broadcastRequest.getListener() != null) @@ -297,28 +303,28 @@ private void maybeNotifyListeners(List broadcastRe } else { // We check if number of open requests to peers is less than we need to reach numOfCompletedBroadcastsTarget. // Thus we never can reach required resilience as too many numOfFailedBroadcasts occurred. - int maxPossibleSuccessCases = numPeersForBroadcast - numOfFailedBroadcasts; + int maxPossibleSuccessCases = numPeersForBroadcast.get() - numOfFailedBroadcasts.get(); // We subtract 1 as we want to have it called only once, with a < comparision we would trigger repeatedly. boolean notEnoughSucceededOrOpen = maxPossibleSuccessCases == numOfCompletedBroadcastsTarget - 1; // We did not reach resilience level and timeout prevents to reach it later - boolean timeoutAndNotEnoughSucceeded = timeoutTriggered && numOfCompletedBroadcasts < numOfCompletedBroadcastsTarget; + boolean timeoutAndNotEnoughSucceeded = timeoutTriggered.get() && numOfCompletedBroadcasts.get() < numOfCompletedBroadcastsTarget; if (notEnoughSucceededOrOpen || timeoutAndNotEnoughSucceeded) { broadcastRequests.stream() .filter(broadcastRequest -> broadcastRequest.getListener() != null) .map(Broadcaster.BroadcastRequest::getListener) - .forEach(listener -> listener.onNotSufficientlyBroadcast(numOfCompletedBroadcasts, numOfFailedBroadcasts)); + .forEach(listener -> listener.onNotSufficientlyBroadcast(numOfCompletedBroadcasts.get(), numOfFailedBroadcasts.get())); } } } private void checkForCompletion() { - if (numOfCompletedBroadcasts + numOfFailedBroadcasts == numPeersForBroadcast) { + if (numOfCompletedBroadcasts.get() + numOfFailedBroadcasts.get() == numPeersForBroadcast.get()) { cleanup(); } } private void cleanup() { - stopped = true; + stopped.set(true); if (timeoutTimer != null) { timeoutTimer.stop(); timeoutTimer = null; From 20b39a4055ad1ad75d219760e97010933fcf309b Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 4 Jan 2023 13:06:24 -0500 Subject: [PATCH 02/11] Do not send close message to banned node Signed-off-by: HenrikJannsen --- .../java/bisq/network/p2p/network/CloseConnectionReason.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/src/main/java/bisq/network/p2p/network/CloseConnectionReason.java b/p2p/src/main/java/bisq/network/p2p/network/CloseConnectionReason.java index 2f715d55c52..02bac9d1008 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/CloseConnectionReason.java +++ b/p2p/src/main/java/bisq/network/p2p/network/CloseConnectionReason.java @@ -44,7 +44,7 @@ public enum CloseConnectionReason { // illegal requests RULE_VIOLATION(true, false), - PEER_BANNED(true, false), + PEER_BANNED(false, false), INVALID_CLASS_RECEIVED(false, false), MANDATORY_CAPABILITIES_NOT_SUPPORTED(false, false); From de323399268dbcd99a0de00a37b7dfefc4ae06cc Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 4 Jan 2023 13:08:53 -0500 Subject: [PATCH 03/11] Remove stop setter at cancel Signed-off-by: HenrikJannsen --- p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java index 7f33f233e03..ddcb9fa1e8a 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java @@ -168,7 +168,6 @@ public void broadcast(List broadcastRequests, } public void cancel() { - stopped.set(true); cleanup(); } From 3e48956227a0f893f891875370bc6f347ad63d23 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 4 Jan 2023 13:11:46 -0500 Subject: [PATCH 04/11] Increase numOfFailedBroadcasts at timeout Signed-off-by: HenrikJannsen --- p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java | 1 + 1 file changed, 1 insertion(+) diff --git a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java index ddcb9fa1e8a..56ae58b9584 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java @@ -215,6 +215,7 @@ private void setupTimeoutHandler(List broadcastReq } timeoutTriggered.set(true); + numOfFailedBroadcasts.incrementAndGet(); log.warn("Broadcast did not complete after {} sec.\n" + "numPeersForBroadcast={}\n" + From 5e29bfe4c21b78c783a1915754860a112edf7d98 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 4 Jan 2023 13:55:20 -0500 Subject: [PATCH 05/11] Maintain pending futures and cancel them at cleanup. Signed-off-by: HenrikJannsen --- .../network/p2p/peers/BroadcastHandler.java | 31 +++++++++++++++++-- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java index 56ae58b9584..88b25d90bdf 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java @@ -35,7 +35,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.UUID; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -44,6 +47,7 @@ import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; @Slf4j public class BroadcastHandler implements PeerManager.Listener { @@ -79,8 +83,9 @@ public interface Listener { private final AtomicInteger numOfCompletedBroadcasts = new AtomicInteger(); private final AtomicInteger numOfFailedBroadcasts = new AtomicInteger(); private final AtomicInteger numPeersForBroadcast = new AtomicInteger(); - + @Nullable private Timer timeoutTimer; + private final Set> sendMessageFutures = new CopyOnWriteArraySet<>(); /////////////////////////////////////////////////////////////////////////////////////////// @@ -104,6 +109,10 @@ public interface Listener { public void broadcast(List broadcastRequests, boolean shutDownRequested, ListeningExecutorService executor) { + if (broadcastRequests.isEmpty()) { + return; + } + List confirmedConnections = new ArrayList<>(networkNode.getConfirmedConnections()); Collections.shuffle(confirmedConnections); @@ -162,7 +171,12 @@ public void broadcast(List broadcastRequests, return; } - sendToPeer(connection, broadcastRequestsForConnection, executor); + try { + sendToPeer(connection, broadcastRequestsForConnection, executor); + } catch (RejectedExecutionException e) { + log.error("RejectedExecutionException at broadcast ", e); + cleanup(); + } }, minDelay, maxDelay, TimeUnit.MILLISECONDS); } } @@ -250,7 +264,7 @@ private void sendToPeer(Connection connection, // Can be BundleOfEnvelopes or a single BroadcastMessage BroadcastMessage broadcastMessage = getMessage(broadcastRequestsForConnection); SettableFuture future = networkNode.sendMessage(connection, broadcastMessage, executor); - + sendMessageFutures.add(future); Futures.addCallback(future, new FutureCallback<>() { @Override public void onSuccess(Connection connection) { @@ -324,11 +338,22 @@ private void checkForCompletion() { } private void cleanup() { + if (stopped.get()) { + return; + } + stopped.set(true); + if (timeoutTimer != null) { timeoutTimer.stop(); timeoutTimer = null; } + + sendMessageFutures.stream() + .filter(future -> !future.isCancelled() && !future.isDone()) + .forEach(future -> future.cancel(true)); + sendMessageFutures.clear(); + peerManager.removeListener(this); resultHandler.onCompleted(this); } From 5efd13a678b0da8a64b1a421edb0845752463e89 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 4 Jan 2023 18:09:04 -0500 Subject: [PATCH 06/11] Check if setException returns false and if so, cancel future. Signed-off-by: HenrikJannsen --- .../bisq/core/provider/fee/FeeRequest.java | 5 ++++- .../core/provider/mempool/MempoolRequest.java | 5 ++++- .../core/provider/price/PriceRequest.java | 5 ++++- .../bisq/network/p2p/network/NetworkNode.java | 21 ++++++++++++++++--- 4 files changed, 30 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/bisq/core/provider/fee/FeeRequest.java b/core/src/main/java/bisq/core/provider/fee/FeeRequest.java index 007a5d39594..da3a0d9cc20 100644 --- a/core/src/main/java/bisq/core/provider/fee/FeeRequest.java +++ b/core/src/main/java/bisq/core/provider/fee/FeeRequest.java @@ -56,7 +56,10 @@ public void onSuccess(Tuple2, Map> feeData) { } public void onFailure(@NotNull Throwable throwable) { - resultFuture.setException(throwable); + if (!resultFuture.setException(throwable)) { + // In case the setException returns false we need to cancel the future. + resultFuture.cancel(true); + } } }, MoreExecutors.directExecutor()); diff --git a/core/src/main/java/bisq/core/provider/mempool/MempoolRequest.java b/core/src/main/java/bisq/core/provider/mempool/MempoolRequest.java index 6141d2ed225..a70ee735cec 100644 --- a/core/src/main/java/bisq/core/provider/mempool/MempoolRequest.java +++ b/core/src/main/java/bisq/core/provider/mempool/MempoolRequest.java @@ -69,7 +69,10 @@ public void onSuccess(String mempoolData) { } public void onFailure(@NotNull Throwable throwable) { - mempoolServiceCallback.setException(throwable); + if (!mempoolServiceCallback.setException(throwable)) { + // In case the setException returns false we need to cancel the future. + mempoolServiceCallback.cancel(true); + } } }, MoreExecutors.directExecutor()); } diff --git a/core/src/main/java/bisq/core/provider/price/PriceRequest.java b/core/src/main/java/bisq/core/provider/price/PriceRequest.java index fa9134099c4..39f4d1a9857 100644 --- a/core/src/main/java/bisq/core/provider/price/PriceRequest.java +++ b/core/src/main/java/bisq/core/provider/price/PriceRequest.java @@ -65,7 +65,10 @@ public void onSuccess(Tuple2, Map> market public void onFailure(@NotNull Throwable throwable) { if (!shutDownRequested) { - resultFuture.setException(new PriceRequestException(throwable, baseUrl)); + if (!resultFuture.setException(new PriceRequestException(throwable, baseUrl))) { + // In case the setException returns false we need to cancel the future. + resultFuture.cancel(true); + } } } }, MoreExecutors.directExecutor()); diff --git a/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java b/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java index f4f6a5a3216..4401347f67c 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java +++ b/p2p/src/main/java/bisq/network/p2p/network/NetworkNode.java @@ -240,7 +240,12 @@ public void onSuccess(Connection connection) { public void onFailure(@NotNull Throwable throwable) { log.debug("onFailure at sendMessage: peersNodeAddress={}\n\tmessage={}\n\tthrowable={}", peersNodeAddress, networkEnvelope.getClass().getSimpleName(), throwable.toString()); - UserThread.execute(() -> resultFuture.setException(throwable)); + UserThread.execute(() -> { + if (!resultFuture.setException(throwable)) { + // In case the setException returns false we need to cancel the future. + resultFuture.cancel(true); + } + }); } }, MoreExecutors.directExecutor()); @@ -311,13 +316,23 @@ public void onSuccess(Connection connection) { } public void onFailure(@NotNull Throwable throwable) { - UserThread.execute(() -> resultFuture.setException(throwable)); + UserThread.execute(() -> { + if (!resultFuture.setException(throwable)) { + // In case the setException returns false we need to cancel the future. + resultFuture.cancel(true); + } + }); } }, MoreExecutors.directExecutor()); } catch (RejectedExecutionException exception) { log.error("RejectedExecutionException at sendMessage: ", exception); - resultFuture.setException(exception); + UserThread.execute(() -> { + if (!resultFuture.setException(exception)) { + // In case the setException returns false we need to cancel the future. + resultFuture.cancel(true); + } + }); } return resultFuture; } From 1172eec2ced5e5b828290f950c2e9e7b4662fcc3 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 4 Jan 2023 18:14:06 -0500 Subject: [PATCH 07/11] Improve log Signed-off-by: HenrikJannsen --- p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java index 88b25d90bdf..e99fbadab30 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java @@ -280,8 +280,7 @@ public void onSuccess(Connection connection) { @Override public void onFailure(@NotNull Throwable throwable) { - log.warn("Broadcast to {} failed. ErrorMessage={}", connection.getPeersNodeAddressOptional(), - throwable.getMessage()); + log.warn("Broadcast to " + connection.getPeersNodeAddressOptional() + " failed. ", throwable); numOfFailedBroadcasts.incrementAndGet(); if (stopped.get()) { From 819c6fb200675997c733478e423364431b5c6821 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Wed, 4 Jan 2023 19:30:02 -0500 Subject: [PATCH 08/11] Combine nested if statements Signed-off-by: HenrikJannsen --- .../main/java/bisq/core/provider/price/PriceRequest.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/bisq/core/provider/price/PriceRequest.java b/core/src/main/java/bisq/core/provider/price/PriceRequest.java index 39f4d1a9857..90861c035ea 100644 --- a/core/src/main/java/bisq/core/provider/price/PriceRequest.java +++ b/core/src/main/java/bisq/core/provider/price/PriceRequest.java @@ -60,15 +60,12 @@ public void onSuccess(Tuple2, Map> market if (!shutDownRequested) { resultFuture.set(marketPriceTuple); } - } public void onFailure(@NotNull Throwable throwable) { - if (!shutDownRequested) { - if (!resultFuture.setException(new PriceRequestException(throwable, baseUrl))) { - // In case the setException returns false we need to cancel the future. - resultFuture.cancel(true); - } + if (!shutDownRequested && !resultFuture.setException(new PriceRequestException(throwable, baseUrl))) { + // In case the setException returns false we need to cancel the future. + resultFuture.cancel(true); } } }, MoreExecutors.directExecutor()); From 719602358d828289fdb2d255312ea60eca8fbec7 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Thu, 5 Jan 2023 22:17:09 -0500 Subject: [PATCH 09/11] Increase pool size Signed-off-by: HenrikJannsen --- p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java index dee52c2f87f..f6a93032f83 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/Broadcaster.java @@ -71,8 +71,8 @@ public Broadcaster(NetworkNode networkNode, this.peerManager = peerManager; ThreadPoolExecutor threadPoolExecutor = Utilities.getThreadPoolExecutor("Broadcaster", - maxConnections * 2, maxConnections * 3, + maxConnections * 4, 30, 30); executor = MoreExecutors.listeningDecorator(threadPoolExecutor); From 8b0f8fbeade65a9d5d073779d66c0a4291697618 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Thu, 5 Jan 2023 22:18:10 -0500 Subject: [PATCH 10/11] Cleanup Signed-off-by: HenrikJannsen --- .../java/bisq/core/dao/burningman/BtcFeeReceiverService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/bisq/core/dao/burningman/BtcFeeReceiverService.java b/core/src/main/java/bisq/core/dao/burningman/BtcFeeReceiverService.java index e7f9cde77a7..9134fb95afc 100644 --- a/core/src/main/java/bisq/core/dao/burningman/BtcFeeReceiverService.java +++ b/core/src/main/java/bisq/core/dao/burningman/BtcFeeReceiverService.java @@ -85,7 +85,6 @@ public String getAddress() { // cappedBurnAmountShare is a % value represented as double. Smallest supported value is 0.01% -> 0.0001. // By multiplying it with 10000 and using Math.floor we limit the candidate to 0.01%. // Entries with 0 will be ignored in the selection method, so we do not need to filter them out. - // List burningManCandidates = new ArrayList<>(burningManCandidatesByName.values()); int ceiling = 10000; List amountList = activeBurningManCandidates.stream() .map(BurningManCandidate::getCappedBurnAmountShare) From 9fe3d22b24a1f268aa3b88d48eb5d3e5a096c1e3 Mon Sep 17 00:00:00 2001 From: HenrikJannsen Date: Thu, 5 Jan 2023 22:23:24 -0500 Subject: [PATCH 11/11] Cleanup Signed-off-by: HenrikJannsen --- .../main/java/bisq/network/p2p/peers/BroadcastHandler.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java index e99fbadab30..19644fb3471 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/BroadcastHandler.java @@ -35,6 +35,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.concurrent.CopyOnWriteArraySet; @@ -310,8 +311,8 @@ private void maybeNotifyListeners(List broadcastRe if (numOfCompletedBroadcasts.get() == numOfCompletedBroadcastsTarget) { // We have heard back from 3 peers (or all peers if numPeers is lower) so we consider the message was sufficiently broadcast. broadcastRequests.stream() - .filter(broadcastRequest -> broadcastRequest.getListener() != null) .map(Broadcaster.BroadcastRequest::getListener) + .filter(Objects::nonNull) .forEach(listener -> listener.onSufficientlyBroadcast(broadcastRequests)); } else { // We check if number of open requests to peers is less than we need to reach numOfCompletedBroadcastsTarget. @@ -323,8 +324,8 @@ private void maybeNotifyListeners(List broadcastRe boolean timeoutAndNotEnoughSucceeded = timeoutTriggered.get() && numOfCompletedBroadcasts.get() < numOfCompletedBroadcastsTarget; if (notEnoughSucceededOrOpen || timeoutAndNotEnoughSucceeded) { broadcastRequests.stream() - .filter(broadcastRequest -> broadcastRequest.getListener() != null) .map(Broadcaster.BroadcastRequest::getListener) + .filter(Objects::nonNull) .forEach(listener -> listener.onNotSufficientlyBroadcast(numOfCompletedBroadcasts.get(), numOfFailedBroadcasts.get())); } }