Skip to content

Commit

Permalink
Limit max. nr. of PersistableNetworkPayload and ProtectedStorageEntri…
Browse files Browse the repository at this point in the history
…es (#3562)

* Limit max. nr. of PersistableNetworkPayload and ProtectedStorageEntry to 10000

To avoid that seed nodes get overloaded with requests for too many
PersistableNetworkPayload and ProtectedStorageEntry data we limit nr. of
entries to max 10000.

* Add peers node address to logs

* Improve logs

- Add log of size to GetBlocksResponse.toProtoNetworkEnvelope method
- Log in kb

* Log connection UID if not peer address available

* Add cleanup code or invalid objects

We have an invalid Filter object in the live network (prob. some dev
made some mistake). This code helps so clean that up.

* Add log
  • Loading branch information
chimp1984 authored and ripcurlx committed Nov 5, 2019
1 parent 5a97683 commit b976bec
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@ private GetBlocksResponse(List<RawBlock> blocks, int requestNonce, int messageVe

@Override
public protobuf.NetworkEnvelope toProtoNetworkEnvelope() {
return getNetworkEnvelopeBuilder()
protobuf.NetworkEnvelope proto = getNetworkEnvelopeBuilder()
.setGetBlocksResponse(protobuf.GetBlocksResponse.newBuilder()
.addAllRawBlocks(blocks.stream()
.map(RawBlock::toProtoMessage)
.collect(Collectors.toList()))
.setRequestNonce(requestNonce))
.build();
log.info("Sending a GetBlocksResponse with {} kB", proto.getSerializedSize() / 1000d);
return proto;
}

public static NetworkEnvelope fromProto(protobuf.GetBlocksResponse proto, int messageVersion) {
Expand Down
10 changes: 8 additions & 2 deletions core/src/main/java/bisq/core/filter/FilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ public void onAllServicesInitialized() {
public void onAdded(ProtectedStorageEntry data) {
if (data.getProtectedStoragePayload() instanceof Filter) {
Filter filter = (Filter) data.getProtectedStoragePayload();
addFilter(filter);
boolean wasValid = addFilter(filter);
if (!wasValid) {
UserThread.runAfter(() -> p2PService.getP2PDataStorage().removeInvalidProtectedStorageEntry(data), 1);
}
}
}

Expand Down Expand Up @@ -203,7 +206,7 @@ private void resetFilters() {
filterProperty.set(null);
}

private void addFilter(Filter filter) {
private boolean addFilter(Filter filter) {
if (verifySignature(filter)) {
// Seed nodes are requested at startup before we get the filter so we only apply the banned
// nodes at the next startup and don't update the list in the P2P network domain.
Expand All @@ -223,6 +226,9 @@ private void addFilter(Filter filter) {
if (filter.isPreventPublicBtcNetwork() &&
preferences.getBitcoinNodesOptionOrdinal() == BtcNodes.BitcoinNodesOption.PUBLIC.ordinal())
preferences.setBitcoinNodesOptionOrdinal(BtcNodes.BitcoinNodesOption.PROVIDED.ordinal());
return true;
} else {
return false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -50,6 +51,7 @@
@Slf4j
public class GetDataRequestHandler {
private static final long TIMEOUT = 90;
private static final int MAX_ENTRIES = 10000;


///////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -136,27 +138,54 @@ public void onFailure(@NotNull Throwable throwable) {

private Set<PersistableNetworkPayload> getFilteredPersistableNetworkPayload(GetDataRequest getDataRequest,
Connection connection) {
final Set<P2PDataStorage.ByteArray> tempLookupSet = new HashSet<>();
Set<P2PDataStorage.ByteArray> excludedKeysAsByteArray = P2PDataStorage.ByteArray.convertBytesSetToByteArraySet(getDataRequest.getExcludedKeys());
Set<P2PDataStorage.ByteArray> tempLookupSet = new HashSet<>();
String connectionInfo = "connectionInfo" + connection.getPeersNodeAddressOptional()
.map(e -> "node address " + e.getFullAddress())
.orElseGet(() -> "connection UID " + connection.getUid());

return dataStorage.getAppendOnlyDataStoreMap().entrySet().stream()
Set<P2PDataStorage.ByteArray> excludedKeysAsByteArray = P2PDataStorage.ByteArray.convertBytesSetToByteArraySet(getDataRequest.getExcludedKeys());
AtomicInteger maxSize = new AtomicInteger(MAX_ENTRIES);
Set<PersistableNetworkPayload> result = dataStorage.getAppendOnlyDataStoreMap().entrySet().stream()
.filter(e -> !excludedKeysAsByteArray.contains(e.getKey()))
.filter(e -> maxSize.decrementAndGet() >= 0)
.map(Map.Entry::getValue)
.filter(payload -> (connection.noCapabilityRequiredOrCapabilityIsSupported(payload)))
.filter(payload -> tempLookupSet.add(new P2PDataStorage.ByteArray(payload.getHash())))
.filter(connection::noCapabilityRequiredOrCapabilityIsSupported)
.filter(payload -> {
boolean notContained = tempLookupSet.add(new P2PDataStorage.ByteArray(payload.getHash()));
return notContained;
})
.collect(Collectors.toSet());
if (maxSize.get() <= 0) {
log.warn("The getData request from peer with {} caused too much PersistableNetworkPayload " +
"entries to get delivered. We limited the entries for the response to {} entries",
connectionInfo, MAX_ENTRIES);
}
log.info("The getData request from peer with {} contains {} PersistableNetworkPayload entries ",
connectionInfo, result.size());
return result;
}

private Set<ProtectedStorageEntry> getFilteredProtectedStorageEntries(GetDataRequest getDataRequest,
Connection connection) {
final Set<ProtectedStorageEntry> filteredDataSet = new HashSet<>();
final Set<Integer> lookupSet = new HashSet<>();
Set<ProtectedStorageEntry> filteredDataSet = new HashSet<>();
Set<Integer> lookupSet = new HashSet<>();
String connectionInfo = "connectionInfo" + connection.getPeersNodeAddressOptional()
.map(e -> "node address " + e.getFullAddress())
.orElseGet(() -> "connection UID " + connection.getUid());

AtomicInteger maxSize = new AtomicInteger(MAX_ENTRIES);
Set<P2PDataStorage.ByteArray> excludedKeysAsByteArray = P2PDataStorage.ByteArray.convertBytesSetToByteArraySet(getDataRequest.getExcludedKeys());
Set<ProtectedStorageEntry> filteredSet = dataStorage.getMap().entrySet().stream()
.filter(e -> !excludedKeysAsByteArray.contains(e.getKey()))
.filter(e -> maxSize.decrementAndGet() >= 0)
.map(Map.Entry::getValue)
.collect(Collectors.toSet());
if (maxSize.get() <= 0) {
log.warn("The getData request from peer with {} caused too much ProtectedStorageEntry " +
"entries to get delivered. We limited the entries for the response to {} entries",
connectionInfo, MAX_ENTRIES);
}
log.info("getFilteredProtectedStorageEntries " + filteredSet.size());

for (ProtectedStorageEntry protectedStorageEntry : filteredSet) {
final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
Expand All @@ -171,11 +200,14 @@ private Set<ProtectedStorageEntry> getFilteredProtectedStorageEntries(GetDataReq
doAdd = true;
}
if (doAdd) {
if (lookupSet.add(protectedStoragePayload.hashCode()))
boolean notContained = lookupSet.add(protectedStoragePayload.hashCode());
if (notContained)
filteredDataSet.add(protectedStorageEntry);
}
}

log.info("The getData request from peer with {} contains {} ProtectedStorageEntry entries ",
connectionInfo, filteredDataSet.size());
return filteredDataSet;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,14 @@ public protobuf.NetworkEnvelope toProtoNetworkEnvelope() {
protobuf.NetworkEnvelope proto = getNetworkEnvelopeBuilder()
.setGetDataResponse(builder)
.build();
log.info("Sending a GetDataResponse with size = {} bytes", proto.toByteArray().length);
log.info("Sending a GetDataResponse with {} kB", proto.getSerializedSize() / 1000d);
return proto;
}

public static GetDataResponse fromProto(protobuf.GetDataResponse proto,
NetworkProtoResolver resolver,
int messageVersion) {
log.info("Received a GetDataResponse with size = {} bytes", proto.toByteArray().length);
log.info("Received a GetDataResponse with {} kB", proto.getSerializedSize() / 1000d);
Set<ProtectedStorageEntry> dataSet = new HashSet<>(
proto.getDataSetList().stream()
.map(entry -> (ProtectedStorageEntry) resolver.fromProto(entry))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ public protobuf.NetworkEnvelope toProtoNetworkEnvelope() {
NetworkEnvelope proto = getNetworkEnvelopeBuilder()
.setGetUpdatedDataRequest(builder)
.build();
log.info("Sending a GetUpdatedDataRequest with size = {} bytes", proto.toByteArray().length);
log.info("Sending a GetUpdatedDataRequest with {} kB", proto.getSerializedSize() / 1000d);
return proto;
}

public static GetUpdatedDataRequest fromProto(protobuf.GetUpdatedDataRequest proto, int messageVersion) {
log.info("Received a GetUpdatedDataRequest with size = {} bytes", proto.toByteArray().length);
log.info("Received a GetUpdatedDataRequest with {} kB", proto.getSerializedSize() / 1000d);
return new GetUpdatedDataRequest(NodeAddress.fromProto(proto.getSenderNodeAddress()),
proto.getNonce(),
ProtoUtil.byteSetFromProtoByteStringList(proto.getExcludedKeysList()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ public protobuf.NetworkEnvelope toProtoNetworkEnvelope() {
NetworkEnvelope proto = getNetworkEnvelopeBuilder()
.setPreliminaryGetDataRequest(builder)
.build();
log.info("Sending a PreliminaryGetDataRequest with size = {} bytes", proto.toByteArray().length);
log.info("Sending a PreliminaryGetDataRequest with {} kB", proto.getSerializedSize() / 1000d);
return proto;
}

public static PreliminaryGetDataRequest fromProto(protobuf.PreliminaryGetDataRequest proto, int messageVersion) {
log.info("Received a PreliminaryGetDataRequest with size = {} bytes", proto.toByteArray().length);
log.info("Received a PreliminaryGetDataRequest with {} kB", proto.getSerializedSize() / 1000d);
Capabilities supportedCapabilities = proto.getSupportedCapabilitiesList().isEmpty() ?
null :
Capabilities.fromIntList(proto.getSupportedCapabilitiesList());
Expand Down
32 changes: 32 additions & 0 deletions p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,38 @@ && checkSignature(protectedStorageEntry)
return result;
}


/**
* This method must be called only from client code not from network messages! We omit the ownership checks
* so we must apply it only if it comes from our trusted application code. It is used from client code which detects
* that the domain object violates specific domain rules.
* We could make it more generic by adding an Interface with a generic validation method.
*
* @param protectedStorageEntry The entry to be removed
*/
public void removeInvalidProtectedStorageEntry(ProtectedStorageEntry protectedStorageEntry) {
log.warn("We remove an invalid protectedStorageEntry: {}", protectedStorageEntry);
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload);

if (!map.containsKey(hashOfPayload)) {
return;
}

doRemoveProtectedExpirableData(protectedStorageEntry, hashOfPayload);
removeFromProtectedDataStore(protectedStorageEntry);

// We do not update the sequence number as that method is only called if we have received an invalid
// protectedStorageEntry from a previous add operation.

// We do not call maybeAddToRemoveAddOncePayloads to avoid that an invalid object might block a valid object
// which we might receive in future (could be potential attack).

// We do not broadcast as this is a local operation only to avoid our maps get polluted with invalid objects
// and as we do not check for ownership a node would not accept such a procedure if it would come from untrusted
// source (network).
}

private void removeFromProtectedDataStore(ProtectedStorageEntry protectedStorageEntry) {
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof PersistablePayload) {
Expand Down

0 comments on commit b976bec

Please sign in to comment.