Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit getDataResponse size #6428

Merged
merged 7 commits into from
Nov 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import bisq.common.proto.ProtobufferException;
import bisq.common.proto.network.NetworkEnvelope;
import bisq.common.proto.persistable.PersistedDataHost;
import bisq.common.util.Tuple2;
import bisq.common.util.Utilities;

import javax.inject.Inject;
Expand Down Expand Up @@ -79,6 +80,7 @@
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -160,50 +162,69 @@ public MailboxMessageService(NetworkNode networkNode,
@Override
public void readPersisted(Runnable completeHandler) {
persistenceManager.readPersisted(persisted -> {
log.trace("## readPersisted persisted {}", persisted.size());
Map<String, Long> numItemsPerDay = new HashMap<>();
// We sort by creation date and limit to max 3000 entries, so oldest items get skipped even if TTL
// is not reached to cap the memory footprint. 3000 items is about 10 MB.
Map<String, Tuple2<AtomicLong, List<Integer>>> numItemsPerDay = new HashMap<>();
AtomicLong totalSize = new AtomicLong();
// We sort by creation date and limit to max 3000 entries, so the oldest items get skipped even if TTL
// is not reached. 3000 items is about 60 MB with max size of 20kb supported for storage.
persisted.stream()
.sorted(Comparator.comparingLong(o -> ((MailboxItem) o).getProtectedMailboxStorageEntry().getCreationTimeStamp()).reversed())
.limit(3000)
.filter(e -> !e.isExpired(clock))
.filter(e -> !mailboxItemsByUid.containsKey(e.getUid()))
.limit(3000)
.forEach(mailboxItem -> {
ProtectedMailboxStorageEntry protectedMailboxStorageEntry = mailboxItem.getProtectedMailboxStorageEntry();
int serializedSize = protectedMailboxStorageEntry.toProtoMessage().getSerializedSize();
// Usual size is 3-4kb. A few are about 15kb and very few are larger and about 100kb or
// more (probably attachments in disputes)
// We ignore those large data to reduce memory footprint.
String date = new Date(protectedMailboxStorageEntry.getCreationTimeStamp()).toString();
String day = date.substring(4, 10);
numItemsPerDay.putIfAbsent(day, new Tuple2<>(new AtomicLong(0), new ArrayList<>()));
Tuple2<AtomicLong, List<Integer>> tuple = numItemsPerDay.get(day);
tuple.first.getAndIncrement();
tuple.second.add(serializedSize);

// We only keep small items, to reduce the potential impact of missed remove messages.
// E.g. if a seed at a longer restart period missed the remove messages, then when loading from
// persisted data the messages, they would add those again and distribute then later at requests to peers.
// Those outdated messages would then stay in the network until TTL triggers removal.
// By not applying large messages we reduce the impact of such cases at costs of extra loading costs if the message is still alive.
if (serializedSize < 20000) {
String date = new Date(protectedMailboxStorageEntry.getCreationTimeStamp()).toString();
String day = date.substring(4, 10);
numItemsPerDay.putIfAbsent(day, 0L);
numItemsPerDay.put(day, numItemsPerDay.get(day) + 1);

String uid = mailboxItem.getUid();
mailboxItemsByUid.put(uid, mailboxItem);
mailboxItemsByUid.put(mailboxItem.getUid(), mailboxItem);
mailboxMessageList.add(mailboxItem);
totalSize.getAndAdd(serializedSize);

// We add it to our map so that it get added to the excluded key set we send for
// the initial data requests. So that helps to lower the load for mailbox messages at
// initial data requests.
//todo check if listeners are called too early
p2PDataStorage.addProtectedMailboxStorageEntryToMap(protectedMailboxStorageEntry);

log.trace("## readPersisted uid={}\nhash={}\nisMine={}\ndate={}\nsize={}",
uid,
P2PDataStorage.get32ByteHashAsByteArray(protectedMailboxStorageEntry.getProtectedStoragePayload()),
mailboxItem.isMine(),
date,
serializedSize);
} else {
log.info("We ignore this large persisted mailboxItem. If still valid we will reload it from seed nodes at getData requests.\n" +
"Size={}; date={}; sender={}", Utilities.readableFileSize(serializedSize), date,
mailboxItem.getProtectedMailboxStorageEntry().getMailboxStoragePayload().getPrefixedSealedAndSignedMessage().getSenderNodeAddress());
}
});

List<Map.Entry<String, Long>> perDay = numItemsPerDay.entrySet().stream()
List<String> perDay = numItemsPerDay.entrySet().stream()
.sorted(Map.Entry.comparingByKey())
.map(entry -> {
Tuple2<AtomicLong, List<Integer>> tuple = entry.getValue();
List<Integer> sizes = tuple.second;
long sum = sizes.stream().mapToLong(s -> s).sum();
List<String> largeItems = sizes.stream()
.filter(s -> s > 20000)
.map(Utilities::readableFileSize)
.collect(Collectors.toList());
String largeMsgInfo = largeItems.isEmpty() ? "" : "; Large messages: " + largeItems;
return entry.getKey() + ": Num messages: " + tuple.first + "; Total size: " +
Utilities.readableFileSize(sum) + largeMsgInfo;
})
.collect(Collectors.toList());
log.info("We loaded {} persisted mailbox messages.\nPer day distribution:\n{}", mailboxMessageList.size(), Joiner.on("\n").join(perDay));

log.info("We loaded {} persisted mailbox messages with {}.\nPer day distribution:\n{}",
mailboxMessageList.size(),
Utilities.readableFileSize(totalSize.get()),
Joiner.on("\n").join(perDay));

requestPersistence();
completeHandler.run();
},
Expand Down
4 changes: 4 additions & 0 deletions p2p/src/main/java/bisq/network/p2p/network/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ public static int getPermittedMessageSize() {
return PERMITTED_MESSAGE_SIZE;
}

public static int getMaxPermittedMessageSize() {
return MAX_PERMITTED_MESSAGE_SIZE;
}


///////////////////////////////////////////////////////////////////////////////////////////
// Class fields
Expand Down
18 changes: 11 additions & 7 deletions p2p/src/main/java/bisq/network/p2p/network/ProtoOutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@
class ProtoOutputStream {
private static final Logger log = LoggerFactory.getLogger(ProtoOutputStream.class);

private final OutputStream delegate;
private final OutputStream outputStream;
private final Statistic statistic;

ProtoOutputStream(OutputStream delegate, Statistic statistic) {
this.delegate = delegate;
ProtoOutputStream(OutputStream outputStream, Statistic statistic) {
this.outputStream = outputStream;
this.statistic = statistic;
}

Expand All @@ -52,17 +52,21 @@ void writeEnvelope(NetworkEnvelope envelope) {

void onConnectionShutdown() {
try {
delegate.close();
outputStream.close();
} catch (Throwable t) {
log.error("Failed to close connection", t);
}
}

private void writeEnvelopeOrThrow(NetworkEnvelope envelope) throws IOException {
long ts = System.currentTimeMillis();
protobuf.NetworkEnvelope proto = envelope.toProtoNetworkEnvelope();
proto.writeDelimitedTo(delegate);
delegate.flush();

proto.writeDelimitedTo(outputStream);
outputStream.flush();
long duration = System.currentTimeMillis() - ts;
if (duration > 10000) {
log.info("Sending {} to peer took {} sec.", envelope.getClass().getSimpleName(), duration / 1000d);
}
statistic.addSentBytes(proto.getSerializedSize());
statistic.addSentMessage(envelope);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import bisq.common.Timer;
import bisq.common.UserThread;
import bisq.common.app.Log;
import bisq.common.proto.network.NetworkProtoResolver;
import bisq.common.util.Utilities;

Expand Down Expand Up @@ -210,7 +209,7 @@ private void createTorAndHiddenService(int localPort, int servicePort) {
torStartupFuture = executorService.submit(() -> {
try {
// temporarily switch tor to debug logging
String savedLogLevel = Log.pushCustomLogLevel("org.berndpruenster.netlayer", "DEBUG");
// String savedLogLevel = Log.pushCustomLogLevel("org.berndpruenster.netlayer", "DEBUG");
// get tor
Tor.setDefault(torMode.getTor());

Expand All @@ -226,7 +225,7 @@ private void createTorAndHiddenService(int localPort, int servicePort) {
"################################################################",
(new Date().getTime() - ts2), socket); //takes usually 30-40 sec
// tor has started, revert from debug to original log level
Log.pushCustomLogLevel("org.berndpruenster.netlayer", savedLogLevel);
// Log.pushCustomLogLevel("org.berndpruenster.netlayer", savedLogLevel);
new Thread() {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,11 @@ public void handle(GetDataRequest getDataRequest, final Connection connection) {
connection.getCapabilities());

if (wasPersistableNetworkPayloadsTruncated.get()) {
log.warn("The getData request from peer with {} caused too much PersistableNetworkPayload " +
"entries to get delivered. We limited the entries for the response to {} entries",
connectionInfo, MAX_ENTRIES);
log.warn("The getDataResponse for peer {} got truncated.", connectionInfo);
}

if (wasProtectedStorageEntriesTruncated.get()) {
log.warn("The getData request from peer with {} caused too much ProtectedStorageEntry " +
"entries to get delivered. We limited the entries for the response to {} entries",
connectionInfo, MAX_ENTRIES);
log.warn("The getDataResponse for peer {} got truncated.", connectionInfo);
}

log.info("The getDataResponse to peer with {} contains {} ProtectedStorageEntries and {} PersistableNetworkPayloads",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ public protobuf.NetworkEnvelope toProtoNetworkEnvelope() {
public static GetDataResponse fromProto(protobuf.GetDataResponse proto,
NetworkProtoResolver resolver,
int messageVersion) {
log.info("Received a GetDataResponse with {}", Utilities.readableFileSize(proto.getSerializedSize()));
boolean wasTruncated = proto.getWasTruncated();
log.info("Received a GetDataResponse with {} {}",
Utilities.readableFileSize(proto.getSerializedSize()),
wasTruncated ? " (was truncated)" : "");
Set<ProtectedStorageEntry> dataSet = proto.getDataSetList().stream()
.map(entry -> (ProtectedStorageEntry) resolver.fromProto(entry)).collect(Collectors.toSet());
Set<PersistableNetworkPayload> persistableNetworkPayloadSet = proto.getPersistableNetworkPayloadItemsList().stream()
Expand All @@ -134,7 +137,7 @@ public static GetDataResponse fromProto(protobuf.GetDataResponse proto,
persistableNetworkPayloadSet,
proto.getRequestNonce(),
proto.getIsGetUpdatedDataResponse(),
proto.getWasTruncated(),
wasTruncated,
Capabilities.fromIntList(proto.getSupportedCapabilitiesList()),
messageVersion);
}
Expand Down
Loading