From b63d6320c4800f52a20fbfb03aa3388e58e27030 Mon Sep 17 00:00:00 2001 From: chimp1984 Date: Mon, 11 Jan 2021 21:13:25 -0500 Subject: [PATCH] Add republishExistingProtectedMailboxStorageEntry method Seed nodes republish the persisted mailbox messages after startup in chunks of 50 items with a 2 minute delay. Move check for hasSequenceNrIncreased in addProtectedStorageEntry earlier so we return earlier. This is a very common case for return if we receive outdated data (like republished mailbox data we have already received). Mark logs with ## for easier find/replace for dev testing... --- .../p2p/mailbox/MailboxMessageService.java | 72 ++++++++---- .../network/p2p/storage/P2PDataStorage.java | 110 +++++++++++------- 2 files changed, 119 insertions(+), 63 deletions(-) diff --git a/p2p/src/main/java/bisq/network/p2p/mailbox/MailboxMessageService.java b/p2p/src/main/java/bisq/network/p2p/mailbox/MailboxMessageService.java index 6c3e8978248..4a82ba759fe 100644 --- a/p2p/src/main/java/bisq/network/p2p/mailbox/MailboxMessageService.java +++ b/p2p/src/main/java/bisq/network/p2p/mailbox/MailboxMessageService.java @@ -31,7 +31,6 @@ import bisq.network.p2p.peers.Broadcaster; import bisq.network.p2p.peers.PeerManager; import bisq.network.p2p.peers.getdata.RequestDataManager; -import bisq.network.p2p.seed.SeedNodeRepository; import bisq.network.p2p.storage.HashMapChangedListener; import bisq.network.p2p.storage.P2PDataStorage; import bisq.network.p2p.storage.messages.AddDataMessage; @@ -66,15 +65,18 @@ import java.time.Clock; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Random; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -102,7 +104,8 @@ @Slf4j public class MailboxMessageService implements SetupListener, RequestDataManager.Listener, HashMapChangedListener, PersistedDataHost { - private final SeedNodeRepository seedNodeRepository; + private static final long REPUBLISH_DELAY_SEC = TimeUnit.SECONDS.toSeconds(2); + private final EncryptionService encryptionService; private final IgnoredMailboxService ignoredMailboxService; private final PersistenceManager persistenceManager; @@ -123,7 +126,6 @@ public MailboxMessageService(NetworkNode networkNode, PeerManager peerManager, P2PDataStorage p2PDataStorage, RequestDataManager requestDataManager, - SeedNodeRepository seedNodeRepository, EncryptionService encryptionService, IgnoredMailboxService ignoredMailboxService, PersistenceManager persistenceManager, @@ -133,7 +135,6 @@ public MailboxMessageService(NetworkNode networkNode, this.peerManager = peerManager; this.p2PDataStorage = p2PDataStorage; this.requestDataManager = requestDataManager; - this.seedNodeRepository = seedNodeRepository; this.encryptionService = encryptionService; this.ignoredMailboxService = ignoredMailboxService; this.persistenceManager = persistenceManager; @@ -161,7 +162,7 @@ public void readPersisted(Runnable completeHandler) { String uid = mailboxItem.getUid(); mailboxItemsByUid.put(uid, mailboxItem); mailboxMessageList.add(mailboxItem); - log.trace("readPersisted uid={}\nhash={}\nmailboxItemsByUid={}", + log.trace("## readPersisted uid={}\nhash={}\nmailboxItemsByUid={}", uid, P2PDataStorage.get32ByteHashAsByteArray(mailboxItem.getProtectedMailboxStorageEntry().getProtectedStoragePayload()), mailboxItemsByUid); @@ -188,7 +189,7 @@ public void sendEncryptedMailboxMessage(NodeAddress peer, NetworkEnvelope message, SendMailboxMessageListener sendMailboxMessageListener) { if (peersPubKeyRing == null) { - log.error("sendEncryptedMailboxMessage: peersPubKeyRing is null. We ignore the call."); + log.trace("## sendEncryptedMailboxMessage: peersPubKeyRing is null. We ignore the call."); return; } @@ -230,10 +231,10 @@ public void onFailure(@NotNull Throwable throwable) { long ttl; if (message instanceof ExpirablePayload) { ttl = ((ExpirablePayload) message).getTTL(); - log.trace("We take TTL from {}. ttl={}", message.getClass().getSimpleName(), ttl); + log.trace("## We take TTL from {}. ttl={}", message.getClass().getSimpleName(), ttl); } else { ttl = MailboxStoragePayload.TTL; - log.trace("Message is not of type ExpirablePayload. " + + log.trace("## Message is not of type ExpirablePayload. " + "We use the default TTL from MailboxStoragePayload. ttl={}; message={}", ttl, message.getClass().getSimpleName()); } @@ -274,7 +275,7 @@ public void removeMailboxMsg(DecryptedMessageWithPubKey decryptedMessageWithPubK // We will get called the onRemoved handler which triggers removeMailboxItemFromMap as well. // But as we use the uid from the decrypted data which is not available at onRemoved we need to // call removeMailboxItemFromMap here. The onRemoved only removes foreign mailBoxMessages. - log.trace("removeMailboxMsg uid={}", uid); + log.trace("## removeMailboxMsg uid={}", uid); removeMailboxItemFromLocalStore(uid); }); } else { @@ -284,7 +285,7 @@ public void removeMailboxMsg(DecryptedMessageWithPubKey decryptedMessageWithPubK } public Set getMyDecryptedMessages() { - log.trace("getMyMailBoxMessages mailboxItemsByUid={}", mailboxItemsByUid); + log.trace("## getMyMailBoxMessages mailboxItemsByUid={}", mailboxItemsByUid); return mailboxItemsByUid.values().stream() .filter(MailboxItem::isMine) .map(MailboxItem::getDecryptedMessageWithPubKey) @@ -306,7 +307,7 @@ public void onTorNodeReady() { isBootstrapped = true; // As we do not expect a updated data request response we start here with addHashMapChangedListenerAndApply addHashMapChangedListenerAndApply(); - UserThread.runAfter(this::republishMailBoxMessages, 20); + UserThread.runAfter(this::maybeRepublishMailBoxMessages, REPUBLISH_DELAY_SEC); } } @@ -330,7 +331,7 @@ public void onUpdatedDataReceived() { // Only now we start listening and processing. The p2PDataStorage is our cache for data we have received // after the hidden service was ready. addHashMapChangedListenerAndApply(); - UserThread.runAfter(this::republishMailBoxMessages, 20); + UserThread.runAfter(this::maybeRepublishMailBoxMessages, REPUBLISH_DELAY_SEC); } } @@ -349,7 +350,6 @@ public void onAdded(Collection protectedStorageEntries) { .filter(e -> e instanceof ProtectedMailboxStorageEntry) .map(e -> (ProtectedMailboxStorageEntry) e) .filter(e -> networkNode.getNodeAddress() != null) - .filter(e -> !seedNodeRepository.isSeedNode(networkNode.getNodeAddress())) // Seed nodes don't expect mailbox messages .collect(Collectors.toSet()); if (entries.size() > 1) { threadedBatchProcessMailboxEntries(entries); @@ -360,7 +360,7 @@ public void onAdded(Collection protectedStorageEntries) { @Override public void onRemoved(Collection protectedStorageEntries) { - log.trace("onRemoved"); + log.trace("## onRemoved"); // We can only remove the foreign mailbox messages as for our own we use the uid from the decrypted // payload which is not available here. But own mailbox messages get removed anyway after processing // at the removeMailboxMsg method. @@ -368,7 +368,7 @@ public void onRemoved(Collection protectedStorageEntries) .filter(protectedStorageEntry -> protectedStorageEntry instanceof ProtectedMailboxStorageEntry) .map(protectedStorageEntry -> (ProtectedMailboxStorageEntry) protectedStorageEntry) .map(e -> e.getMailboxStoragePayload().getPrefixedSealedAndSignedMessage().getUid()) - .forEach(uid -> removeMailboxItemFromLocalStore(uid)); + .forEach(this::removeMailboxItemFromLocalStore); } @@ -453,7 +453,7 @@ private void handleMailboxItem(MailboxItem mailboxItem) { if (!mailboxItemsByUid.containsKey(uid)) { mailboxItemsByUid.put(uid, mailboxItem); mailboxMessageList.add(mailboxItem); - log.trace("handleMailboxItem uid={}\nhash={}\nmailboxMessageList={}", + log.trace("## handleMailboxItem uid={}\nhash={}\nmailboxMessageList={}", uid, P2PDataStorage.get32ByteHashAsByteArray(mailboxItem.getProtectedMailboxStorageEntry().getProtectedStoragePayload()), mailboxItemsByUid); @@ -567,17 +567,41 @@ private void removeMailboxEntryFromNetwork(ProtectedMailboxStorageEntry protecte } } - private void republishMailBoxMessages() { - log.trace("republishMailBoxMessages mailboxItemsByUid={}", mailboxItemsByUid); + private void maybeRepublishMailBoxMessages() { + // We only do the republishing at seed nodes to avoid that the network gets too much traffic + // 1000 mailbox messages are about 3 MB, so that would cause quite some load if all nodes would do that. + if (!peerManager.isSeedNode(networkNode.getNodeAddress())) { + return; + } + + log.trace("## republishMailBoxMessages mailboxItemsByUid={}", mailboxItemsByUid); // In addProtectedStorageEntry we break early if we have already received a remove message for that entry. - mailboxItemsByUid.values().stream() + republishInChunks(mailboxItemsByUid.values().stream() .filter(e -> !e.isExpired(clock)) .map(MailboxItem::getProtectedMailboxStorageEntry) - .forEach(protectedMailboxStorageEntry -> - p2PDataStorage.addProtectedStorageEntry(protectedMailboxStorageEntry, - networkNode.getNodeAddress(), - null)); + .collect(Collectors.toCollection(ArrayDeque::new))); + } + + // We republish buckets of 50 items which is about 200 kb. With 20 connections at a seed node that results in + // 4 MB in total. For 1000 messages it takes 40 min with a 2 min delay. We do that republishing just for + // additional resilience and as a backup in case all seed nodes would fail to prevent that mailbox messages would + // get lost. A long delay for republishing is preferred over too much network load. + private void republishInChunks(Queue queue) { + log.trace("## republishInChunks queue={}", queue.size()); + int i = 0; + while (!queue.isEmpty() && i < 50) { + ProtectedMailboxStorageEntry protectedMailboxStorageEntry = queue.poll(); + i++; + // Broadcaster will accumulate messages in a BundleOfEnvelopes + p2PDataStorage.republishExistingProtectedMailboxStorageEntry(protectedMailboxStorageEntry, + networkNode.getNodeAddress(), + null); + } + if (!queue.isEmpty()) { + // We delay 2 minutes to not overload the network + UserThread.runAfter(() -> republishInChunks(queue), REPUBLISH_DELAY_SEC); + } } private void removeMailboxItemFromLocalStore(String uid) { @@ -585,7 +609,7 @@ private void removeMailboxItemFromLocalStore(String uid) { MailboxItem mailboxItem = mailboxItemsByUid.get(uid); mailboxItemsByUid.remove(uid); mailboxMessageList.remove(mailboxItem); - log.trace("removeMailboxItemFromMap uid={}\nhash={}\nmailboxItemsByUid={}", + log.trace("## removeMailboxItemFromMap uid={}\nhash={}\nmailboxItemsByUid={}", uid, P2PDataStorage.get32ByteHashAsByteArray(mailboxItem.getProtectedMailboxStorageEntry().getProtectedStoragePayload()), mailboxItemsByUid diff --git a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java index 1c39e88bb77..4607ccc9429 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java @@ -247,7 +247,7 @@ public void addProtectedMailboxStorageEntryToMap(ProtectedStorageEntry protected ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload); map.put(hashOfPayload, protectedStorageEntry); - log.trace("addProtectedMailboxStorageEntryToMap hashOfPayload={}, map={}", hashOfPayload, printMap()); + log.trace("## addProtectedMailboxStorageEntryToMap hashOfPayload={}, map={}", hashOfPayload, printMap()); } @@ -280,14 +280,14 @@ private Set getKnownPayloadHashes() { Map mapForDataRequest = getMapForDataRequest(); Set excludedKeys = getKeysAsByteSet(mapForDataRequest); - log.trace("getKnownPayloadHashes map of PersistableNetworkPayloads={}, excludedKeys={}", + log.trace("## getKnownPayloadHashes map of PersistableNetworkPayloads={}, excludedKeys={}", printPersistableNetworkPayloadMap(mapForDataRequest), - excludedKeys.stream().map(e -> Utilities.encodeToHex(e)).toArray()); + excludedKeys.stream().map(Utilities::encodeToHex).toArray()); Set excludedKeysFromProtectedStorageEntryMap = getKeysAsByteSet(map); - log.trace("getKnownPayloadHashes map of ProtectedStorageEntrys={}, excludedKeys={}", + log.trace("## getKnownPayloadHashes map of ProtectedStorageEntrys={}, excludedKeys={}", printMap(), - excludedKeysFromProtectedStorageEntryMap.stream().map(e -> Utilities.encodeToHex(e)).toArray()); + excludedKeysFromProtectedStorageEntryMap.stream().map(Utilities::encodeToHex).toArray()); excludedKeys.addAll(excludedKeysFromProtectedStorageEntryMap); return excludedKeys; @@ -322,7 +322,7 @@ public GetDataResponse buildGetDataResponse( log.info("{} PersistableNetworkPayload entries remained after filtered by excluded keys. " + "Original map had {} entries.", filteredPersistableNetworkPayloads.size(), mapForDataResponse.size()); - log.trace("buildGetDataResponse filteredPersistableNetworkPayloadHashes={}", + log.trace("## buildGetDataResponse filteredPersistableNetworkPayloadHashes={}", filteredPersistableNetworkPayloads.stream() .map(e -> Utilities.encodeToHex(e.getHash())) .toArray()); @@ -338,7 +338,7 @@ public GetDataResponse buildGetDataResponse( log.info("{} ProtectedStorageEntry entries remained after filtered by excluded keys. " + "Original map had {} entries.", filteredProtectedStorageEntries.size(), map.size()); - log.trace("buildGetDataResponse filteredProtectedStorageEntryHashes={}", + log.trace("## buildGetDataResponse filteredProtectedStorageEntryHashes={}", filteredProtectedStorageEntries.stream() .map(e -> get32ByteHashAsByteArray((e.getProtectedStoragePayload()))) .toArray()); @@ -667,7 +667,7 @@ private boolean addPersistableNetworkPayload(PersistableNetworkPayload payload, boolean allowBroadcast, boolean reBroadcast, boolean checkDate) { - log.trace("addPersistableNetworkPayload payload={}", payload); + log.debug("addPersistableNetworkPayload payload={}", payload); // Payload hash size does not match expectation for that type of message. if (!payload.verifyHashSize()) { @@ -680,7 +680,7 @@ private boolean addPersistableNetworkPayload(PersistableNetworkPayload payload, // Store already knows about this payload. Ignore it unless the caller specifically requests a republish. if (payloadHashAlreadyInStore && !reBroadcast) { - log.trace("addPersistableNetworkPayload failed due to duplicate payload"); + log.debug("addPersistableNetworkPayload failed due to duplicate payload"); return false; } @@ -720,20 +720,21 @@ private void addPersistableNetworkPayloadFromInitialRequest(PersistableNetworkPa } } + public boolean addProtectedStorageEntry(ProtectedStorageEntry protectedStorageEntry, + @Nullable NodeAddress sender, + @Nullable BroadcastHandler.Listener listener) { + return addProtectedStorageEntry(protectedStorageEntry, sender, listener, true); + } + /** - * Adds a ProtectedStorageEntry to the local P2P data storage. If it does not already exist locally, it will be - * broadcast to the P2P network. + * Adds a ProtectedStorageEntry to the local P2P data storage and broadcast if all checks have been successful. * * @param protectedStorageEntry ProtectedStorageEntry to add to the network - * @param sender local NodeAddress, if available + * @param sender Senders nodeAddress, if available * @param listener optional listener that can be used to receive events on broadcast - * @return true if the ProtectedStorageEntry was added to the local P2P data storage and broadcast + * @param allowBroadcast Flag to allow broadcast + * @return true if the ProtectedStorageEntry was added to the local P2P data storage */ - public boolean addProtectedStorageEntry(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender, - @Nullable BroadcastHandler.Listener listener) { - return addProtectedStorageEntry(protectedStorageEntry, sender, listener, true); - } - private boolean addProtectedStorageEntry(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender, @Nullable BroadcastHandler.Listener listener, @@ -741,45 +742,50 @@ private boolean addProtectedStorageEntry(ProtectedStorageEntry protectedStorageE ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload(); ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload); - log.trace("call addProtectedStorageEntry hash={}, map={}", hashOfPayload, printMap()); + log.trace("## call addProtectedStorageEntry hash={}, map={}", hashOfPayload, printMap()); + + // We do that check early as it is a very common case for returning, so we return early + // If we have seen a more recent operation for this payload and we have a payload locally, ignore it + ProtectedStorageEntry storedEntry = map.get(hashOfPayload); + if (storedEntry != null && !hasSequenceNrIncreased(protectedStorageEntry.getSequenceNumber(), hashOfPayload)) { + log.trace("## hasSequenceNrIncreased is false. hash={}", hashOfPayload); + return false; + } if (hasAlreadyRemovedAddOncePayload(protectedStoragePayload, hashOfPayload)) { - log.warn("We have already removed that AddOncePayload by a previous removeDataMessage. " + + log.trace("## We have already removed that AddOncePayload by a previous removeDataMessage. " + "We ignore that message. ProtectedStoragePayload: {}", protectedStoragePayload.toString()); return false; } - // To avoid that expired data get stored and broadcast we check early for expire date. + // To avoid that expired data get stored and broadcast we check for expire date. if (protectedStorageEntry.isExpired(clock)) { String peer = sender != null ? sender.getFullAddress() : "sender is null"; - log.debug("We received an expired protectedStorageEntry from peer {}. ProtectedStoragePayload={}", + log.trace("## We received an expired protectedStorageEntry from peer {}. ProtectedStoragePayload={}", peer, protectedStorageEntry.getProtectedStoragePayload().getClass().getSimpleName()); return false; } - ProtectedStorageEntry storedEntry = map.get(hashOfPayload); - - // If we have seen a more recent operation for this payload and we have a payload locally, ignore it - if (storedEntry != null && - !hasSequenceNrIncreased(protectedStorageEntry.getSequenceNumber(), hashOfPayload)) { - return false; - } - // We want to allow add operations for equal sequence numbers if we don't have the payload locally. This is // the case for non-persistent Payloads that need to be reconstructed from peer and seed nodes each startup. MapValue sequenceNumberMapValue = sequenceNumberMap.get(hashOfPayload); if (sequenceNumberMapValue != null && protectedStorageEntry.getSequenceNumber() < sequenceNumberMapValue.sequenceNr) { + log.trace("## sequenceNr too low hash={}", hashOfPayload); return false; } // Verify the ProtectedStorageEntry is well formed and valid for the add operation - if (!protectedStorageEntry.isValidForAddOperation()) + if (!protectedStorageEntry.isValidForAddOperation()) { + log.trace("## !isValidForAddOperation hash={}", hashOfPayload); return false; + } // If we have already seen an Entry with the same hash, verify the metadata is equal - if (storedEntry != null && !protectedStorageEntry.matchesRelevantPubKey(storedEntry)) + if (storedEntry != null && !protectedStorageEntry.matchesRelevantPubKey(storedEntry)) { + log.trace("## !matchesRelevantPubKey hash={}", hashOfPayload); return false; + } // This is an updated entry. Record it and signal listeners. map.put(hashOfPayload, protectedStorageEntry); @@ -789,12 +795,12 @@ private boolean addProtectedStorageEntry(ProtectedStorageEntry protectedStorageE sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.getSequenceNumber(), this.clock.millis())); requestPersistence(); - log.trace("ProtectedStorageEntry added to map. hash={}, map={}", hashOfPayload, printMap()); + log.trace("## ProtectedStorageEntry added to map. hash={}, map={}", hashOfPayload, printMap()); // Optionally, broadcast the add/update depending on the calling environment if (allowBroadcast) { broadcaster.broadcast(new AddDataMessage(protectedStorageEntry), sender, listener); - log.trace("broadcasted ProtectedStorageEntry. hash={}", hashOfPayload); + log.trace("## broadcasted ProtectedStorageEntry. hash={}", hashOfPayload); } // Persist ProtectedStorageEntries carrying PersistablePayload payloads if (protectedStoragePayload instanceof PersistablePayload) @@ -803,6 +809,32 @@ private boolean addProtectedStorageEntry(ProtectedStorageEntry protectedStorageE return true; } + /** + * We do not do all checks as it is used for republishing existing mailbox messages from seed nodes which + * only got stored if they had been valid when we received them. + * + * @param protectedMailboxStorageEntry ProtectedMailboxStorageEntry to add to the network + * @param sender Senders nodeAddress, if available + * @param listener optional listener that can be used to receive events on broadcast + */ + public void republishExistingProtectedMailboxStorageEntry(ProtectedMailboxStorageEntry protectedMailboxStorageEntry, + @Nullable NodeAddress sender, + @Nullable BroadcastHandler.Listener listener) { + ProtectedStoragePayload protectedStoragePayload = protectedMailboxStorageEntry.getProtectedStoragePayload(); + ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload); + + log.trace("## call republishProtectedStorageEntry hash={}, map={}", hashOfPayload, printMap()); + + if (hasAlreadyRemovedAddOncePayload(protectedStoragePayload, hashOfPayload)) { + log.trace("## We have already removed that AddOncePayload by a previous removeDataMessage. " + + "We ignore that message. ProtectedStoragePayload: {}", protectedStoragePayload.toString()); + return; + } + + broadcaster.broadcast(new AddDataMessage(protectedMailboxStorageEntry), sender, listener); + log.trace("## broadcasted ProtectedStorageEntry. hash={}", hashOfPayload); + } + public boolean hasAlreadyRemovedAddOncePayload(ProtectedStoragePayload protectedStoragePayload, ByteArray hashOfPayload) { return protectedStoragePayload instanceof AddOncePayload && removedPayloadsService.wasRemoved(hashOfPayload); @@ -991,9 +1023,9 @@ private void removeFromMapAndDataStore(Collection storedSequenceNumber) { - /*log.trace("Sequence number has increased (>). sequenceNumber = " + /*log.debug("Sequence number has increased (>). sequenceNumber = " + newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber + " / hashOfData=" + hashOfData.toString());*/ return true; } else if (newSequenceNumber == storedSequenceNumber) { @@ -1025,7 +1057,7 @@ private boolean hasSequenceNrIncreased(int newSequenceNumber, ByteArray hashOfDa msg = "Sequence number is equal to the stored one. sequenceNumber = " + newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber; } - log.trace(msg); + log.debug(msg); return false; } else { log.debug("Sequence number is invalid. sequenceNumber = " @@ -1087,7 +1119,7 @@ private void printData(String info) { .append(Utilities.toTruncatedString(protectedStoragePayload)); }); sb.append("\n------------------------------------------------------------\n"); - log.trace(sb.toString()); + log.debug(sb.toString()); //log.debug("Data set " + info + " operation: size=" + map.values().size()); } }