Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit max. nr. of PersistableNetworkPayload and ProtectedStorageEntries #3562

Merged
merged 6 commits into from
Nov 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@ private GetBlocksResponse(List<RawBlock> blocks, int requestNonce, int messageVe

@Override
public protobuf.NetworkEnvelope toProtoNetworkEnvelope() {
return getNetworkEnvelopeBuilder()
protobuf.NetworkEnvelope proto = getNetworkEnvelopeBuilder()
.setGetBlocksResponse(protobuf.GetBlocksResponse.newBuilder()
.addAllRawBlocks(blocks.stream()
.map(RawBlock::toProtoMessage)
.collect(Collectors.toList()))
.setRequestNonce(requestNonce))
.build();
log.info("Sending a GetBlocksResponse with {} kB", proto.getSerializedSize() / 1000d);
return proto;
}

public static NetworkEnvelope fromProto(protobuf.GetBlocksResponse proto, int messageVersion) {
Expand Down
10 changes: 8 additions & 2 deletions core/src/main/java/bisq/core/filter/FilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ public void onAllServicesInitialized() {
public void onAdded(ProtectedStorageEntry data) {
if (data.getProtectedStoragePayload() instanceof Filter) {
Filter filter = (Filter) data.getProtectedStoragePayload();
addFilter(filter);
boolean wasValid = addFilter(filter);
if (!wasValid) {
UserThread.runAfter(() -> p2PService.getP2PDataStorage().removeInvalidProtectedStorageEntry(data), 1);
}
}
}

Expand Down Expand Up @@ -203,7 +206,7 @@ private void resetFilters() {
filterProperty.set(null);
}

private void addFilter(Filter filter) {
private boolean addFilter(Filter filter) {
if (verifySignature(filter)) {
// Seed nodes are requested at startup before we get the filter so we only apply the banned
// nodes at the next startup and don't update the list in the P2P network domain.
Expand All @@ -223,6 +226,9 @@ private void addFilter(Filter filter) {
if (filter.isPreventPublicBtcNetwork() &&
preferences.getBitcoinNodesOptionOrdinal() == BtcNodes.BitcoinNodesOption.PUBLIC.ordinal())
preferences.setBitcoinNodesOptionOrdinal(BtcNodes.BitcoinNodesOption.PROVIDED.ordinal());
return true;
} else {
return false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -50,6 +51,7 @@
@Slf4j
public class GetDataRequestHandler {
private static final long TIMEOUT = 90;
private static final int MAX_ENTRIES = 10000;


///////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -136,27 +138,54 @@ public void onFailure(@NotNull Throwable throwable) {

private Set<PersistableNetworkPayload> getFilteredPersistableNetworkPayload(GetDataRequest getDataRequest,
Connection connection) {
final Set<P2PDataStorage.ByteArray> tempLookupSet = new HashSet<>();
Set<P2PDataStorage.ByteArray> excludedKeysAsByteArray = P2PDataStorage.ByteArray.convertBytesSetToByteArraySet(getDataRequest.getExcludedKeys());
Set<P2PDataStorage.ByteArray> tempLookupSet = new HashSet<>();
String connectionInfo = "connectionInfo" + connection.getPeersNodeAddressOptional()
.map(e -> "node address " + e.getFullAddress())
.orElseGet(() -> "connection UID " + connection.getUid());

return dataStorage.getAppendOnlyDataStoreMap().entrySet().stream()
Set<P2PDataStorage.ByteArray> excludedKeysAsByteArray = P2PDataStorage.ByteArray.convertBytesSetToByteArraySet(getDataRequest.getExcludedKeys());
AtomicInteger maxSize = new AtomicInteger(MAX_ENTRIES);
Set<PersistableNetworkPayload> result = dataStorage.getAppendOnlyDataStoreMap().entrySet().stream()
.filter(e -> !excludedKeysAsByteArray.contains(e.getKey()))
.filter(e -> maxSize.decrementAndGet() >= 0)
.map(Map.Entry::getValue)
.filter(payload -> (connection.noCapabilityRequiredOrCapabilityIsSupported(payload)))
.filter(payload -> tempLookupSet.add(new P2PDataStorage.ByteArray(payload.getHash())))
.filter(connection::noCapabilityRequiredOrCapabilityIsSupported)
.filter(payload -> {
boolean notContained = tempLookupSet.add(new P2PDataStorage.ByteArray(payload.getHash()));
return notContained;
})
.collect(Collectors.toSet());
if (maxSize.get() <= 0) {
log.warn("The getData request from peer with {} caused too much PersistableNetworkPayload " +
"entries to get delivered. We limited the entries for the response to {} entries",
connectionInfo, MAX_ENTRIES);
}
log.info("The getData request from peer with {} contains {} PersistableNetworkPayload entries ",
connectionInfo, result.size());
return result;
}

private Set<ProtectedStorageEntry> getFilteredProtectedStorageEntries(GetDataRequest getDataRequest,
Connection connection) {
final Set<ProtectedStorageEntry> filteredDataSet = new HashSet<>();
final Set<Integer> lookupSet = new HashSet<>();
Set<ProtectedStorageEntry> filteredDataSet = new HashSet<>();
Set<Integer> lookupSet = new HashSet<>();
String connectionInfo = "connectionInfo" + connection.getPeersNodeAddressOptional()
.map(e -> "node address " + e.getFullAddress())
.orElseGet(() -> "connection UID " + connection.getUid());

AtomicInteger maxSize = new AtomicInteger(MAX_ENTRIES);
Set<P2PDataStorage.ByteArray> excludedKeysAsByteArray = P2PDataStorage.ByteArray.convertBytesSetToByteArraySet(getDataRequest.getExcludedKeys());
Set<ProtectedStorageEntry> filteredSet = dataStorage.getMap().entrySet().stream()
.filter(e -> !excludedKeysAsByteArray.contains(e.getKey()))
.filter(e -> maxSize.decrementAndGet() >= 0)
.map(Map.Entry::getValue)
.collect(Collectors.toSet());
if (maxSize.get() <= 0) {
log.warn("The getData request from peer with {} caused too much ProtectedStorageEntry " +
"entries to get delivered. We limited the entries for the response to {} entries",
connectionInfo, MAX_ENTRIES);
}
log.info("getFilteredProtectedStorageEntries " + filteredSet.size());

for (ProtectedStorageEntry protectedStorageEntry : filteredSet) {
final ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
Expand All @@ -171,11 +200,14 @@ private Set<ProtectedStorageEntry> getFilteredProtectedStorageEntries(GetDataReq
doAdd = true;
}
if (doAdd) {
if (lookupSet.add(protectedStoragePayload.hashCode()))
boolean notContained = lookupSet.add(protectedStoragePayload.hashCode());
if (notContained)
filteredDataSet.add(protectedStorageEntry);
}
}

log.info("The getData request from peer with {} contains {} ProtectedStorageEntry entries ",
connectionInfo, filteredDataSet.size());
return filteredDataSet;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,14 @@ public protobuf.NetworkEnvelope toProtoNetworkEnvelope() {
protobuf.NetworkEnvelope proto = getNetworkEnvelopeBuilder()
.setGetDataResponse(builder)
.build();
log.info("Sending a GetDataResponse with size = {} bytes", proto.toByteArray().length);
log.info("Sending a GetDataResponse with {} kB", proto.getSerializedSize() / 1000d);
return proto;
}

public static GetDataResponse fromProto(protobuf.GetDataResponse proto,
NetworkProtoResolver resolver,
int messageVersion) {
log.info("Received a GetDataResponse with size = {} bytes", proto.toByteArray().length);
log.info("Received a GetDataResponse with {} kB", proto.getSerializedSize() / 1000d);
Set<ProtectedStorageEntry> dataSet = new HashSet<>(
proto.getDataSetList().stream()
.map(entry -> (ProtectedStorageEntry) resolver.fromProto(entry))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ public protobuf.NetworkEnvelope toProtoNetworkEnvelope() {
NetworkEnvelope proto = getNetworkEnvelopeBuilder()
.setGetUpdatedDataRequest(builder)
.build();
log.info("Sending a GetUpdatedDataRequest with size = {} bytes", proto.toByteArray().length);
log.info("Sending a GetUpdatedDataRequest with {} kB", proto.getSerializedSize() / 1000d);
return proto;
}

public static GetUpdatedDataRequest fromProto(protobuf.GetUpdatedDataRequest proto, int messageVersion) {
log.info("Received a GetUpdatedDataRequest with size = {} bytes", proto.toByteArray().length);
log.info("Received a GetUpdatedDataRequest with {} kB", proto.getSerializedSize() / 1000d);
return new GetUpdatedDataRequest(NodeAddress.fromProto(proto.getSenderNodeAddress()),
proto.getNonce(),
ProtoUtil.byteSetFromProtoByteStringList(proto.getExcludedKeysList()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ public protobuf.NetworkEnvelope toProtoNetworkEnvelope() {
NetworkEnvelope proto = getNetworkEnvelopeBuilder()
.setPreliminaryGetDataRequest(builder)
.build();
log.info("Sending a PreliminaryGetDataRequest with size = {} bytes", proto.toByteArray().length);
log.info("Sending a PreliminaryGetDataRequest with {} kB", proto.getSerializedSize() / 1000d);
return proto;
}

public static PreliminaryGetDataRequest fromProto(protobuf.PreliminaryGetDataRequest proto, int messageVersion) {
log.info("Received a PreliminaryGetDataRequest with size = {} bytes", proto.toByteArray().length);
log.info("Received a PreliminaryGetDataRequest with {} kB", proto.getSerializedSize() / 1000d);
Capabilities supportedCapabilities = proto.getSupportedCapabilitiesList().isEmpty() ?
null :
Capabilities.fromIntList(proto.getSupportedCapabilitiesList());
Expand Down
32 changes: 32 additions & 0 deletions p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,38 @@ && checkSignature(protectedStorageEntry)
return result;
}


/**
* This method must be called only from client code not from network messages! We omit the ownership checks
* so we must apply it only if it comes from our trusted application code. It is used from client code which detects
* that the domain object violates specific domain rules.
* We could make it more generic by adding an Interface with a generic validation method.
*
* @param protectedStorageEntry The entry to be removed
*/
public void removeInvalidProtectedStorageEntry(ProtectedStorageEntry protectedStorageEntry) {
Copy link
Contributor

@julianknutsen julianknutsen Nov 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This leads to data structure inconsistency in the event that two consumers listen for the same Payload type and one consumer calls into this function to remove the data. It also breaks encapsulation having the consumers know about the internals of the P2PDataStore. This isn't perfect now, anyway, but moving towards a model where P2PDataStore is private to P2PService may be a good goal.

It seems like a cleaner way to do the same thing would be to have the P2PDataStore validate each stored entry prior to becoming "active" or signaling listeners. The ProtectedStorageEntry and/or payload could implement an interface that would do this check and remove the data before it was every available for consumers.

Is that what you were thinking with the "generic validation method"? Doing this would dovetail well with my existing refactoring and I can probably try it out and see how it looks. I think it would give a lot more flexibility for the future if there are certain Entrys or Payloads that we need to purge but made it past prior validation. Here is just one example I found while testing:

// TESTCASE: validForAddOperation() should fail if Entry.receiversPubKey and Payload.ownerPubKey don't match
// XXXBUGXXX: The current code doesn't validate this mismatch, but it would create an added payload that could never
// be removed since the remove code requires Entry.receiversPubKey == Payload.ownerPubKey
@Test
public void isValidForAddOperation_EntryReceiverPayloadReceiverMismatch() throws NoSuchAlgorithmException, CryptoException {
KeyPair senderKeys = TestUtils.generateKeyPair();
KeyPair receiverKeys = TestUtils.generateKeyPair();
MailboxStoragePayload mailboxStoragePayload = buildMailboxStoragePayload(senderKeys.getPublic(), receiverKeys.getPublic());
ProtectedStorageEntry protectedStorageEntry = buildProtectedMailboxStorageEntry(mailboxStoragePayload, senderKeys, senderKeys.getPublic(), 1);
// should be assertFalse
Assert.assertTrue(protectedStorageEntry.isValidForAddOperation());
}

log.warn("We remove an invalid protectedStorageEntry: {}", protectedStorageEntry);
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
ByteArray hashOfPayload = get32ByteHashAsByteArray(protectedStoragePayload);

if (!map.containsKey(hashOfPayload)) {
return;
}

doRemoveProtectedExpirableData(protectedStorageEntry, hashOfPayload);
removeFromProtectedDataStore(protectedStorageEntry);

// We do not update the sequence number as that method is only called if we have received an invalid
// protectedStorageEntry from a previous add operation.

// We do not call maybeAddToRemoveAddOncePayloads to avoid that an invalid object might block a valid object
// which we might receive in future (could be potential attack).

// We do not broadcast as this is a local operation only to avoid our maps get polluted with invalid objects
// and as we do not check for ownership a node would not accept such a procedure if it would come from untrusted
// source (network).
}

private void removeFromProtectedDataStore(ProtectedStorageEntry protectedStorageEntry) {
ProtectedStoragePayload protectedStoragePayload = protectedStorageEntry.getProtectedStoragePayload();
if (protectedStoragePayload instanceof PersistablePayload) {
Expand Down