Skip to content

Commit

Permalink
Add republishExistingProtectedMailboxStorageEntry method
Browse files Browse the repository at this point in the history
Seed nodes republish the persisted mailbox messages after
startup in chunks of 50 items with a 2 minute delay.

Move check for hasSequenceNrIncreased in addProtectedStorageEntry
earlier so we return earlier. This is a very common case for return
if we receive outdated data (like republished mailbox data we have
already received).

Mark logs with ## for easier find/replace for dev testing...
  • Loading branch information
chimp1984 committed Jan 12, 2021
1 parent e0c4255 commit b63d632
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import bisq.network.p2p.peers.Broadcaster;
import bisq.network.p2p.peers.PeerManager;
import bisq.network.p2p.peers.getdata.RequestDataManager;
import bisq.network.p2p.seed.SeedNodeRepository;
import bisq.network.p2p.storage.HashMapChangedListener;
import bisq.network.p2p.storage.P2PDataStorage;
import bisq.network.p2p.storage.messages.AddDataMessage;
Expand Down Expand Up @@ -66,15 +65,18 @@

import java.time.Clock;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -102,7 +104,8 @@
@Slf4j
public class MailboxMessageService implements SetupListener, RequestDataManager.Listener, HashMapChangedListener,
PersistedDataHost {
private final SeedNodeRepository seedNodeRepository;
private static final long REPUBLISH_DELAY_SEC = TimeUnit.SECONDS.toSeconds(2);

private final EncryptionService encryptionService;
private final IgnoredMailboxService ignoredMailboxService;
private final PersistenceManager<MailboxMessageList> persistenceManager;
Expand All @@ -123,7 +126,6 @@ public MailboxMessageService(NetworkNode networkNode,
PeerManager peerManager,
P2PDataStorage p2PDataStorage,
RequestDataManager requestDataManager,
SeedNodeRepository seedNodeRepository,
EncryptionService encryptionService,
IgnoredMailboxService ignoredMailboxService,
PersistenceManager<MailboxMessageList> persistenceManager,
Expand All @@ -133,7 +135,6 @@ public MailboxMessageService(NetworkNode networkNode,
this.peerManager = peerManager;
this.p2PDataStorage = p2PDataStorage;
this.requestDataManager = requestDataManager;
this.seedNodeRepository = seedNodeRepository;
this.encryptionService = encryptionService;
this.ignoredMailboxService = ignoredMailboxService;
this.persistenceManager = persistenceManager;
Expand Down Expand Up @@ -161,7 +162,7 @@ public void readPersisted(Runnable completeHandler) {
String uid = mailboxItem.getUid();
mailboxItemsByUid.put(uid, mailboxItem);
mailboxMessageList.add(mailboxItem);
log.trace("readPersisted uid={}\nhash={}\nmailboxItemsByUid={}",
log.trace("## readPersisted uid={}\nhash={}\nmailboxItemsByUid={}",
uid,
P2PDataStorage.get32ByteHashAsByteArray(mailboxItem.getProtectedMailboxStorageEntry().getProtectedStoragePayload()),
mailboxItemsByUid);
Expand All @@ -188,7 +189,7 @@ public void sendEncryptedMailboxMessage(NodeAddress peer,
NetworkEnvelope message,
SendMailboxMessageListener sendMailboxMessageListener) {
if (peersPubKeyRing == null) {
log.error("sendEncryptedMailboxMessage: peersPubKeyRing is null. We ignore the call.");
log.trace("## sendEncryptedMailboxMessage: peersPubKeyRing is null. We ignore the call.");
return;
}

Expand Down Expand Up @@ -230,10 +231,10 @@ public void onFailure(@NotNull Throwable throwable) {
long ttl;
if (message instanceof ExpirablePayload) {
ttl = ((ExpirablePayload) message).getTTL();
log.trace("We take TTL from {}. ttl={}", message.getClass().getSimpleName(), ttl);
log.trace("## We take TTL from {}. ttl={}", message.getClass().getSimpleName(), ttl);
} else {
ttl = MailboxStoragePayload.TTL;
log.trace("Message is not of type ExpirablePayload. " +
log.trace("## Message is not of type ExpirablePayload. " +
"We use the default TTL from MailboxStoragePayload. ttl={}; message={}",
ttl, message.getClass().getSimpleName());
}
Expand Down Expand Up @@ -274,7 +275,7 @@ public void removeMailboxMsg(DecryptedMessageWithPubKey decryptedMessageWithPubK
// We will get called the onRemoved handler which triggers removeMailboxItemFromMap as well.
// But as we use the uid from the decrypted data which is not available at onRemoved we need to
// call removeMailboxItemFromMap here. The onRemoved only removes foreign mailBoxMessages.
log.trace("removeMailboxMsg uid={}", uid);
log.trace("## removeMailboxMsg uid={}", uid);
removeMailboxItemFromLocalStore(uid);
});
} else {
Expand All @@ -284,7 +285,7 @@ public void removeMailboxMsg(DecryptedMessageWithPubKey decryptedMessageWithPubK
}

public Set<DecryptedMessageWithPubKey> getMyDecryptedMessages() {
log.trace("getMyMailBoxMessages mailboxItemsByUid={}", mailboxItemsByUid);
log.trace("## getMyMailBoxMessages mailboxItemsByUid={}", mailboxItemsByUid);
return mailboxItemsByUid.values().stream()
.filter(MailboxItem::isMine)
.map(MailboxItem::getDecryptedMessageWithPubKey)
Expand All @@ -306,7 +307,7 @@ public void onTorNodeReady() {
isBootstrapped = true;
// As we do not expect a updated data request response we start here with addHashMapChangedListenerAndApply
addHashMapChangedListenerAndApply();
UserThread.runAfter(this::republishMailBoxMessages, 20);
UserThread.runAfter(this::maybeRepublishMailBoxMessages, REPUBLISH_DELAY_SEC);
}
}

Expand All @@ -330,7 +331,7 @@ public void onUpdatedDataReceived() {
// Only now we start listening and processing. The p2PDataStorage is our cache for data we have received
// after the hidden service was ready.
addHashMapChangedListenerAndApply();
UserThread.runAfter(this::republishMailBoxMessages, 20);
UserThread.runAfter(this::maybeRepublishMailBoxMessages, REPUBLISH_DELAY_SEC);
}
}

Expand All @@ -349,7 +350,6 @@ public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {
.filter(e -> e instanceof ProtectedMailboxStorageEntry)
.map(e -> (ProtectedMailboxStorageEntry) e)
.filter(e -> networkNode.getNodeAddress() != null)
.filter(e -> !seedNodeRepository.isSeedNode(networkNode.getNodeAddress())) // Seed nodes don't expect mailbox messages
.collect(Collectors.toSet());
if (entries.size() > 1) {
threadedBatchProcessMailboxEntries(entries);
Expand All @@ -360,15 +360,15 @@ public void onAdded(Collection<ProtectedStorageEntry> protectedStorageEntries) {

@Override
public void onRemoved(Collection<ProtectedStorageEntry> protectedStorageEntries) {
log.trace("onRemoved");
log.trace("## onRemoved");
// We can only remove the foreign mailbox messages as for our own we use the uid from the decrypted
// payload which is not available here. But own mailbox messages get removed anyway after processing
// at the removeMailboxMsg method.
protectedStorageEntries.stream()
.filter(protectedStorageEntry -> protectedStorageEntry instanceof ProtectedMailboxStorageEntry)
.map(protectedStorageEntry -> (ProtectedMailboxStorageEntry) protectedStorageEntry)
.map(e -> e.getMailboxStoragePayload().getPrefixedSealedAndSignedMessage().getUid())
.forEach(uid -> removeMailboxItemFromLocalStore(uid));
.forEach(this::removeMailboxItemFromLocalStore);
}


Expand Down Expand Up @@ -453,7 +453,7 @@ private void handleMailboxItem(MailboxItem mailboxItem) {
if (!mailboxItemsByUid.containsKey(uid)) {
mailboxItemsByUid.put(uid, mailboxItem);
mailboxMessageList.add(mailboxItem);
log.trace("handleMailboxItem uid={}\nhash={}\nmailboxMessageList={}",
log.trace("## handleMailboxItem uid={}\nhash={}\nmailboxMessageList={}",
uid,
P2PDataStorage.get32ByteHashAsByteArray(mailboxItem.getProtectedMailboxStorageEntry().getProtectedStoragePayload()),
mailboxItemsByUid);
Expand Down Expand Up @@ -567,25 +567,49 @@ private void removeMailboxEntryFromNetwork(ProtectedMailboxStorageEntry protecte
}
}

private void republishMailBoxMessages() {
log.trace("republishMailBoxMessages mailboxItemsByUid={}", mailboxItemsByUid);
private void maybeRepublishMailBoxMessages() {
// We only do the republishing at seed nodes to avoid that the network gets too much traffic
// 1000 mailbox messages are about 3 MB, so that would cause quite some load if all nodes would do that.
if (!peerManager.isSeedNode(networkNode.getNodeAddress())) {
return;
}

log.trace("## republishMailBoxMessages mailboxItemsByUid={}", mailboxItemsByUid);

// In addProtectedStorageEntry we break early if we have already received a remove message for that entry.
mailboxItemsByUid.values().stream()
republishInChunks(mailboxItemsByUid.values().stream()
.filter(e -> !e.isExpired(clock))
.map(MailboxItem::getProtectedMailboxStorageEntry)
.forEach(protectedMailboxStorageEntry ->
p2PDataStorage.addProtectedStorageEntry(protectedMailboxStorageEntry,
networkNode.getNodeAddress(),
null));
.collect(Collectors.toCollection(ArrayDeque::new)));
}

// We republish buckets of 50 items which is about 200 kb. With 20 connections at a seed node that results in
// 4 MB in total. For 1000 messages it takes 40 min with a 2 min delay. We do that republishing just for
// additional resilience and as a backup in case all seed nodes would fail to prevent that mailbox messages would
// get lost. A long delay for republishing is preferred over too much network load.
private void republishInChunks(Queue<ProtectedMailboxStorageEntry> queue) {
log.trace("## republishInChunks queue={}", queue.size());
int i = 0;
while (!queue.isEmpty() && i < 50) {
ProtectedMailboxStorageEntry protectedMailboxStorageEntry = queue.poll();
i++;
// Broadcaster will accumulate messages in a BundleOfEnvelopes
p2PDataStorage.republishExistingProtectedMailboxStorageEntry(protectedMailboxStorageEntry,
networkNode.getNodeAddress(),
null);
}
if (!queue.isEmpty()) {
// We delay 2 minutes to not overload the network
UserThread.runAfter(() -> republishInChunks(queue), REPUBLISH_DELAY_SEC);
}
}

private void removeMailboxItemFromLocalStore(String uid) {
if (mailboxItemsByUid.containsKey(uid)) {
MailboxItem mailboxItem = mailboxItemsByUid.get(uid);
mailboxItemsByUid.remove(uid);
mailboxMessageList.remove(mailboxItem);
log.trace("removeMailboxItemFromMap uid={}\nhash={}\nmailboxItemsByUid={}",
log.trace("## removeMailboxItemFromMap uid={}\nhash={}\nmailboxItemsByUid={}",
uid,
P2PDataStorage.get32ByteHashAsByteArray(mailboxItem.getProtectedMailboxStorageEntry().getProtectedStoragePayload()),
mailboxItemsByUid
Expand Down
Loading

0 comments on commit b63d632

Please sign in to comment.