diff --git a/p2p/src/main/java/bisq/network/p2p/network/Connection.java b/p2p/src/main/java/bisq/network/p2p/network/Connection.java index c3f6f222215..0bec36f8ac5 100644 --- a/p2p/src/main/java/bisq/network/p2p/network/Connection.java +++ b/p2p/src/main/java/bisq/network/p2p/network/Connection.java @@ -28,6 +28,7 @@ import bisq.network.p2p.peers.getdata.messages.GetDataRequest; import bisq.network.p2p.peers.getdata.messages.GetDataResponse; import bisq.network.p2p.peers.keepalive.messages.KeepAliveMessage; +import bisq.network.p2p.storage.P2PDataStorage; import bisq.network.p2p.storage.messages.AddDataMessage; import bisq.network.p2p.storage.messages.AddPersistableNetworkPayloadMessage; import bisq.network.p2p.storage.payload.CapabilityRequiringPayload; @@ -425,41 +426,40 @@ private boolean violatesThrottleLimit(long now, int seconds, int messageCountLim @Override public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { checkArgument(connection.equals(this)); - int accountAgeWitnessEntries = 0; if (networkEnvelope instanceof BundleOfEnvelopes) { - Map> map = new HashMap<>(); - Set set = new HashSet<>(); - - List networkEnvelopes = ((BundleOfEnvelopes) networkEnvelope).getEnvelopes(); - for (NetworkEnvelope current : networkEnvelopes) { - String simpleName = current.getClass().getSimpleName(); - boolean isAccountAgeWitness = false; - if (current instanceof AddPersistableNetworkPayloadMessage) { - PersistableNetworkPayload persistableNetworkPayload = ((AddPersistableNetworkPayloadMessage) current).getPersistableNetworkPayload(); - simpleName = "AddPersistableNetworkPayloadMessage." + persistableNetworkPayload.getClass().getSimpleName(); - if (simpleName.equals("AddPersistableNetworkPayloadMessage.AccountAgeWitness")) { - accountAgeWitnessEntries++; - isAccountAgeWitness = true; - } - } - map.putIfAbsent(simpleName, new ArrayList<>()); - map.get(simpleName).add(current); - if (!isAccountAgeWitness || accountAgeWitnessEntries < 20) { - set.add(current); - } - } - map.forEach((key, value) -> log.info("BundleOfEnvelope with {} items of {}, from {}", - value.size(), key, connection.getPeersNodeAddressOptional())); - - log.info("We forward {} items. All received items: {}", set.size(), networkEnvelopes.size()); - - set.forEach(envelope -> UserThread.execute(() -> - messageListeners.forEach(listener -> listener.onMessage(envelope, connection)))); + onBundleOfEnvelopes((BundleOfEnvelopes) networkEnvelope, connection); } else { UserThread.execute(() -> messageListeners.forEach(e -> e.onMessage(networkEnvelope, connection))); } } + private void onBundleOfEnvelopes(BundleOfEnvelopes networkEnvelope, Connection connection) { + Map> itemsByHash = new HashMap<>(); + Set envelopesToProcess = new HashSet<>(); + List networkEnvelopes = networkEnvelope.getEnvelopes(); + for (NetworkEnvelope current : networkEnvelopes) { + if (current instanceof AddPersistableNetworkPayloadMessage) { + PersistableNetworkPayload persistableNetworkPayload = ((AddPersistableNetworkPayloadMessage) current).getPersistableNetworkPayload(); + byte[] hash = persistableNetworkPayload.getHash(); + String itemName = persistableNetworkPayload.getClass().getSimpleName(); + P2PDataStorage.ByteArray byteArray = new P2PDataStorage.ByteArray(hash); + itemsByHash.putIfAbsent(byteArray, new HashSet<>()); + Set envelopesByHash = itemsByHash.get(byteArray); + if (!envelopesByHash.contains(current)) { + envelopesByHash.add(current); + envelopesToProcess.add(current); + } else { + log.debug("We got duplicated items for {}. We ignore the duplicates. Hash: {}", + itemName, Utilities.encodeToHex(hash)); + } + } else { + envelopesToProcess.add(current); + } + } + envelopesToProcess.forEach(envelope -> UserThread.execute(() -> + messageListeners.forEach(listener -> listener.onMessage(envelope, connection)))); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Setters