Add GetDataResponsePriority for filtering payloads for GetDataResponse.
Fix sorting when truncating DateSortedTruncatablePayloads.
Do not include items when payload.toProtoMessage() == null.

Signed-off-by: HenrikJannsen <[email protected]>
HenrikJannsen committed Jan 5, 2023
1 parent 5542ea2 commit 0822515
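
For context: the filtering below keys off the newly imported GetDataResponsePriority, which payloads expose through the getGetDataResponsePriority() accessor used in the diff. The enum itself is not part of this commit; judging from the three constants the code references, it is presumably a plain enum along these lines (a sketch, not the committed source):

package bisq.common.proto.network;

// Sketch inferred from the constants referenced in the diff below;
// the real definition lives elsewhere in the repository.
public enum GetDataResponsePriority {
    LOW,   // size-limited; date-sorted LOW payloads are also capped at maxItems
    MID,   // added first, before any size check
    HIGH   // appended last, after truncation, with no size check
}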
Showing 1 changed file with 99 additions and 38 deletions.
p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java
@@ -62,6 +62,7 @@
 import bisq.common.crypto.Hash;
 import bisq.common.crypto.Sig;
 import bisq.common.persistence.PersistenceManager;
+import bisq.common.proto.network.GetDataResponsePriority;
 import bisq.common.proto.network.NetworkEnvelope;
 import bisq.common.proto.network.NetworkPayload;
 import bisq.common.proto.persistable.PersistablePayload;
@@ -425,68 +426,128 @@ static private <T extends NetworkPayload> Set<T> filterKnownHashes(
-        // We start with the non-DateSortedTruncatablePayload as they have higher priority. In case we would exceed our
-        // size limit the following DateSortedTruncatablePayload items would not get added at all.
         Set<Map.Entry<ByteArray, T>> entries = toFilter.entrySet();
-        List<T> filteredResults = entries.stream()
-                .filter(entry -> !(entry.getValue() instanceof DateSortedTruncatablePayload))
+        List<T> resultPayloads = new ArrayList<>();
+
+        // Truncation follows these rules:
+        // 1. Add all payloads with GetDataResponsePriority.MID
+        // 2. Add all payloads with GetDataResponsePriority.LOW && !DateSortedTruncatablePayload until exceededSizeLimit is reached
+        // 3. if(!exceededSizeLimit) Add all payloads with GetDataResponsePriority.LOW && DateSortedTruncatablePayload until
+        //    exceededSizeLimit is reached and truncate by maxItems (sorted by date). We add the sublist to our resultPayloads in
+        //    reverse order so that in case we cut off at the next step we cut off the oldest items.
+        // 4. We truncate the list if the result list size > maxEntries
+        // 5. Add all payloads with GetDataResponsePriority.HIGH
+
+        // 1. Add all payloads with GetDataResponsePriority.MID
+        List<T> midPrioPayloads = entries.stream()
+                .filter(entry -> entry.getValue().getGetDataResponsePriority() == GetDataResponsePriority.MID)
                 .filter(entry -> !knownHashes.contains(entry.getKey()))
                 .map(Map.Entry::getValue)
                 .filter(payload -> shouldTransmitPayloadToPeer(peerCapabilities, objToPayload.apply(payload)))
-                .filter(payload -> {
-                    Message message = payload.toProtoMessage();
-                    if (message == null) {
-                        return true;
-                    }
-                    if (exceededSizeLimit.get() || totalSize.addAndGet(message.getSerializedSize()) > limit) {
-                        exceededSizeLimit.set(true);
-                    }
-                    return !exceededSizeLimit.get();
-                })
                 .collect(Collectors.toList());
-        log.info("Num filtered non-dateSortedTruncatablePayloads {}", filteredResults.size());
+        resultPayloads.addAll(midPrioPayloads);
+        log.info("Number of payloads with GetDataResponsePriority.MID: {}", midPrioPayloads.size());
 
-        List<T> dateSortedTruncatablePayloads = entries.stream()
-                .filter(entry -> entry.getValue() instanceof DateSortedTruncatablePayload)
+        // 2. Add all payloads with GetDataResponsePriority.LOW && !DateSortedTruncatablePayload until exceededSizeLimit is reached
+        List<T> lowPrioPayloads = entries.stream()
+                .filter(entry -> entry.getValue().getGetDataResponsePriority() == GetDataResponsePriority.LOW)
+                .filter(entry -> !(entry.getValue() instanceof DateSortedTruncatablePayload))
                 .filter(entry -> !knownHashes.contains(entry.getKey()))
                 .map(Map.Entry::getValue)
                 .filter(payload -> shouldTransmitPayloadToPeer(peerCapabilities, objToPayload.apply(payload)))
                 .filter(payload -> {
                     Message message = payload.toProtoMessage();
                     if (message == null) {
-                        return true;
+                        // todo can that be the case?
+                        log.warn("payload.toProtoMessage() is null {}", payload);
+                        return false;
                     }
-                    if (exceededSizeLimit.get() || totalSize.addAndGet(message.getSerializedSize()) > limit) {
+                    if (exceededSizeLimit.get()) {
+                        return false;
+                    }
+
+                    if (totalSize.addAndGet(message.getSerializedSize()) > limit) {
                         exceededSizeLimit.set(true);
+                        return false;
                     }
-                    return !exceededSizeLimit.get();
+
+                    return true;
                 })
-                .sorted(Comparator.comparing(payload -> ((DateSortedTruncatablePayload) payload).getDate()))
                 .collect(Collectors.toList());
-        log.info("Num filtered dateSortedTruncatablePayloads {}", dateSortedTruncatablePayloads.size());
-        if (!dateSortedTruncatablePayloads.isEmpty()) {
-            int maxItems = ((DateSortedTruncatablePayload) dateSortedTruncatablePayloads.get(0)).maxItems();
-            if (dateSortedTruncatablePayloads.size() > maxItems) {
-                int fromIndex = dateSortedTruncatablePayloads.size() - maxItems;
-                int toIndex = dateSortedTruncatablePayloads.size();
-                dateSortedTruncatablePayloads = dateSortedTruncatablePayloads.subList(fromIndex, toIndex);
-                outTruncated.set(true);
-                log.info("Num truncated dateSortedTruncatablePayloads {}", dateSortedTruncatablePayloads.size());
-            }
-        }
-
-        // The non-dateSortedTruncatablePayloads have higher prio, so we added dateSortedTruncatablePayloads
-        // after those so in case we need to truncate we first truncate the dateSortedTruncatablePayloads.
-        filteredResults.addAll(dateSortedTruncatablePayloads);
+        resultPayloads.addAll(lowPrioPayloads);
+        log.info("Number of payloads with GetDataResponsePriority.LOW and !DateSortedTruncatablePayload: {}. Exceeded size limit: {}", lowPrioPayloads.size(), exceededSizeLimit.get());
+
+        // 3. if(!exceededSizeLimit) Add all payloads with GetDataResponsePriority.LOW && DateSortedTruncatablePayload until
+        //    exceededSizeLimit is reached and truncate by maxItems (sorted by date). We add the sublist to our resultPayloads in
+        //    reverse order so that in case we cut off at the next step we cut off the oldest items.
+        if (!exceededSizeLimit.get()) {
+            List<T> dateSortedPayloads = entries.stream()
+                    .filter(entry -> entry.getValue().getGetDataResponsePriority() == GetDataResponsePriority.LOW)
+                    .filter(entry -> entry.getValue() instanceof DateSortedTruncatablePayload)
+                    .filter(entry -> !knownHashes.contains(entry.getKey()))
+                    .map(Map.Entry::getValue)
+                    .filter(payload -> shouldTransmitPayloadToPeer(peerCapabilities, objToPayload.apply(payload)))
+                    .filter(payload -> {
+                        Message message = payload.toProtoMessage();
+                        if (message == null) {
+                            // todo can that be the case?
+                            log.warn("payload.toProtoMessage() is null {}", payload);
+                            return false;
+                        }
+                        if (exceededSizeLimit.get()) {
+                            return false;
+                        }
+
+                        if (totalSize.addAndGet(message.getSerializedSize()) > limit) {
+                            exceededSizeLimit.set(true);
+                            return false;
+                        }
+
+                        return true;
+                    })
+                    .sorted(Comparator.comparing(payload -> ((DateSortedTruncatablePayload) payload).getDate()))
+                    .collect(Collectors.toList());
+            if (!dateSortedPayloads.isEmpty()) {
+                int maxItems = ((DateSortedTruncatablePayload) dateSortedPayloads.get(0)).maxItems();
+                int size = dateSortedPayloads.size();
+                if (size > maxItems) {
+                    int fromIndex = size - maxItems;
+                    dateSortedPayloads = dateSortedPayloads.subList(fromIndex, size);
+                    outTruncated.set(true);
+                    log.info("Num truncated dateSortedPayloads {}", size);
+                    log.info("Removed oldest {} dateSortedPayloads as we exceeded {}", fromIndex, maxItems);
+                }
+            }
+            log.info("Number of payloads with GetDataResponsePriority.LOW and DateSortedTruncatablePayload: {}. Was truncated: {}", dateSortedPayloads.size(), outTruncated.get());
+
+            // We reverse the sorting so that in case we get truncated we cut off the older items
+            dateSortedPayloads.sort(Comparator.comparing(payload -> ((DateSortedTruncatablePayload) payload).getDate()).reversed());
+            resultPayloads.addAll(dateSortedPayloads);
+        } else {
+            log.info("No dateSortedPayloads added as we exceeded already the exceededSizeLimit of {}", limit);
+        }
 
-        if (filteredResults.size() > maxEntries) {
-            filteredResults = filteredResults.subList(0, maxEntries);
+        // 4. We truncate the list if the result list size > maxEntries
+        int size = resultPayloads.size();
+        if (size > maxEntries) {
+            resultPayloads = resultPayloads.subList(0, maxEntries);
             outTruncated.set(true);
-            log.info("Num truncated filteredResults {}", filteredResults.size());
-        } else {
-            log.info("Num filteredResults {}", filteredResults.size());
+            log.info("Removed last {} payloads as we exceeded {}", size - maxEntries, maxEntries);
         }
 
         outTruncated.set(outTruncated.get() || exceededSizeLimit.get());
 
-        return new HashSet<>(filteredResults);
+        // 5. Add all payloads with GetDataResponsePriority.HIGH
+        List<T> highPrioPayloads = entries.stream()
+                .filter(entry -> entry.getValue().getGetDataResponsePriority() == GetDataResponsePriority.HIGH)
+                .filter(entry -> !knownHashes.contains(entry.getKey()))
+                .map(Map.Entry::getValue)
+                .filter(payload -> shouldTransmitPayloadToPeer(peerCapabilities, objToPayload.apply(payload)))
+                .collect(Collectors.toList());
+        resultPayloads.addAll(highPrioPayloads);
+        log.info("Number of payloads with GetDataResponsePriority.HIGH: {}", highPrioPayloads.size());
+        log.info("Number of result payloads we send to requester: {}", resultPayloads.size());
+        return new HashSet<>(resultPayloads);
     }
 
     public Collection<PersistableNetworkPayload> getPersistableNetworkPayloadCollection() {
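
A note on DateSortedTruncatablePayload: the diff calls exactly two methods on it, getDate() for the oldest-first sort and maxItems() for the per-type cap. The interface itself is not shown in this commit; it presumably looks roughly like this (a sketch inferred from those calls):

import java.util.Date;

// Sketch only: the real interface lives in the Bisq p2p storage payload
// package and may extend further payload interfaces.
public interface DateSortedTruncatablePayload {
    Date getDate();  // basis for the oldest-first sort before truncating
    int maxItems();  // cap on how many instances of this type are returned
}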

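The five numbered truncation rules in the new code can be exercised in isolation. The following self-contained sketch (hypothetical Payload record and limits, Java 17, not Bisq code) mirrors the order of operations: MID first, then size-limited LOW payloads, then date-sorted LOW payloads truncated to maxItems and appended newest-first, then the maxEntries cut, and finally HIGH payloads, which bypass both limits:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

public class TruncationDemo {
    enum Priority { LOW, MID, HIGH }

    // Hypothetical stand-in for the real payload types.
    record Payload(String id, Priority priority, long date, long size, boolean dateSorted) {}

    static final int MAX_ITEMS = 2; // stand-in for DateSortedTruncatablePayload.maxItems()

    static List<Payload> filter(List<Payload> all, long byteLimit, int maxEntries) {
        AtomicLong totalSize = new AtomicLong();
        AtomicBoolean exceededSizeLimit = new AtomicBoolean();
        List<Payload> result = new ArrayList<>();

        // 1. MID priority: added first, no size check.
        all.stream().filter(p -> p.priority() == Priority.MID).forEach(result::add);

        // 2. LOW priority, not date-sorted: include until the byte budget is spent.
        all.stream()
                .filter(p -> p.priority() == Priority.LOW && !p.dateSorted())
                .filter(p -> withinBudget(p, totalSize, exceededSizeLimit, byteLimit))
                .forEach(result::add);

        // 3. LOW priority, date-sorted: same byte budget, keep only the newest
        //    MAX_ITEMS, then append newest-first so a later cut drops the oldest.
        if (!exceededSizeLimit.get()) {
            List<Payload> dateSorted = all.stream()
                    .filter(p -> p.priority() == Priority.LOW && p.dateSorted())
                    .filter(p -> withinBudget(p, totalSize, exceededSizeLimit, byteLimit))
                    .sorted(Comparator.comparingLong(Payload::date))
                    .collect(Collectors.toCollection(ArrayList::new));
            if (dateSorted.size() > MAX_ITEMS) {
                dateSorted = new ArrayList<>(dateSorted.subList(dateSorted.size() - MAX_ITEMS, dateSorted.size()));
            }
            dateSorted.sort(Comparator.comparingLong(Payload::date).reversed());
            result.addAll(dateSorted);
        }

        // 4. Cap the overall list; anything past maxEntries is dropped.
        if (result.size() > maxEntries) {
            result = new ArrayList<>(result.subList(0, maxEntries));
        }

        // 5. HIGH priority: appended last, after the cap, with no size check.
        for (Payload p : all) {
            if (p.priority() == Priority.HIGH) {
                result.add(p);
            }
        }
        return result;
    }

    // Mirrors the commit's size filter, including "stop once the limit was hit".
    static boolean withinBudget(Payload p, AtomicLong total, AtomicBoolean exceeded, long limit) {
        if (exceeded.get()) {
            return false;
        }
        if (total.addAndGet(p.size()) > limit) {
            exceeded.set(true);
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        List<Payload> all = List.of(
                new Payload("mid", Priority.MID, 0, 10, false),
                new Payload("low", Priority.LOW, 0, 10, false),
                new Payload("oldest", Priority.LOW, 1, 10, true),
                new Payload("middle", Priority.LOW, 2, 10, true),
                new Payload("newest", Priority.LOW, 3, 10, true),
                new Payload("high", Priority.HIGH, 0, 10, false));
        System.out.println(filter(all, 100, 10).stream().map(Payload::id).collect(Collectors.toList()));
    }
}

Running it prints [mid, low, newest, middle, high]: the oldest date-sorted payload is dropped by the maxItems cap, and the survivors sit at the tail newest-first, so a maxEntries cut would also remove the oldest items first.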