Skip to content

Commit

Permalink
Clean up trade statistics from duplicate entries (#3476)
Browse files Browse the repository at this point in the history
* Clean up trade statistics from duplicate entries

At software updates we added new entries to the extraMap which caused
duplicate entries (if one of the traders was on the new and the other on
the old version, or at republishing). We now mark the map as json-excluded
to avoid that in the future, and we clean up the existing map entries.

* Avoid repeated calls to addPersistableNetworkPayloadFromInitialRequest

For the trade stat cleanup we don't want to apply it multiple times as it
is a bit expensive. We receive an initial data response from each seed node,
and the second response would pollute our map again. If our node is a seed
node, it could then not get into a clean state and would continue polluting
other nodes.

* Refactor

Remove not used param
Rename method
Inline method
Cleanups
  • Loading branch information
chimp1984 authored and sqrrm committed Oct 28, 2019
1 parent bc2ca8d commit 16dda95
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,16 @@ public final class TradeStatistics2 implements LazyProcessedPayload, Persistable
private final long tradeDate;
private final String depositTxId;

// hash get set in constructor from json of all the other data fields (with hash = null).
// Hash get set in constructor from json of all the other data fields (with hash = null).
@JsonExclude
private final byte[] hash;
// PB field signature_pub_key_bytes not used anymore from v0.6 on

// Should be only used in emergency case if we need to add data but do not want to break backward compatibility
// at the P2P network storage checks. The hash of the object will be used to verify if the data is valid. Any new
// field in a class would break that hash and therefore break the storage mechanism.
@Nullable
@JsonExclude
private Map<String, String> extraDataMap;

public TradeStatistics2(OfferPayload offerPayload,
Expand Down Expand Up @@ -152,12 +154,14 @@ public TradeStatistics2(OfferPayload.Direction direction,
this.depositTxId = depositTxId;
this.extraDataMap = ExtraDataMapValidator.getValidatedExtraDataMap(extraDataMap);

if (hash == null)
// We create hash from all fields excluding hash itself. We use json as simple data serialisation.
// tradeDate is different for both peers so we ignore it for hash.
this.hash = Hash.getSha256Ripemd160hash(Utilities.objectToJson(this).getBytes(Charsets.UTF_8));
else
this.hash = hash;
this.hash = hash == null ? createHash() : hash;
}

/**
 * Builds the identifying hash for this trade statistics entry.
 * <p>
 * The hash covers all fields except the hash itself; json is used as a simple, stable
 * serialisation format. TradeDate is different for both peers so we ignore it for the hash.
 * ExtraDataMap is ignored as well, as at software updates we might have different entries
 * which would cause a different hash.
 *
 * @return SHA-256/RIPEMD-160 hash of the json representation of this object
 */
public byte[] createHash() {
    String json = Utilities.objectToJson(this);
    return Hash.getSha256Ripemd160hash(json.getBytes(Charsets.UTF_8));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@

import java.io.File;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;

import static com.google.common.base.Preconditions.checkArgument;

@Slf4j
public class TradeStatistics2StorageService extends MapStoreService<TradeStatistics2Store, PersistableNetworkPayload> {
private static final String FILE_NAME = "TradeStatistics2Store";
Expand Down Expand Up @@ -70,6 +70,19 @@ public boolean canHandle(PersistableNetworkPayload payload) {
return payload instanceof TradeStatistics2;
}

/**
 * Removes duplicate trade statistics entries from the persisted map.
 * <p>
 * We recreate the hash of each entry as there have been duplicates from differing
 * extraMap entries introduced at software updates. The persisted map is replaced by
 * the de-duplicated entries and written to disk.
 *
 * @param collection the trade statistics entries to de-duplicate
 * @return the remaining entries after duplicates have been dropped
 */
Collection<TradeStatistics2> cleanupMap(Collection<TradeStatistics2> collection) {
    Map<P2PDataStorage.ByteArray, TradeStatistics2> deduplicated = new HashMap<>();
    for (TradeStatistics2 item : collection) {
        // First entry per recreated hash wins; later duplicates are dropped.
        deduplicated.putIfAbsent(new P2PDataStorage.ByteArray(item.createHash()), item);
    }

    Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> map = getMap();
    map.clear();
    map.putAll(deduplicated);
    persist();

    return deduplicated.values();
}


///////////////////////////////////////////////////////////////////////////////////////////
// Protected
Expand All @@ -83,8 +96,5 @@ protected TradeStatistics2Store createStore() {
@Override
protected void readStore() {
    super.readStore();
    // Fail fast if the persisted store deserialized into an unexpected type. That can happen
    // if the ProtoBuffer file definition got changed.
    if (!(store instanceof TradeStatistics2Store)) {
        throw new IllegalArgumentException(
                "Store is not instance of TradeStatistics2Store. That can happen if the ProtoBuffer " +
                        "file got changed. We clear the data store and recreated it again.");
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@
import java.io.File;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -60,9 +62,11 @@ public class TradeStatisticsManager {
private final JsonFileManager jsonFileManager;
private final P2PService p2PService;
private final PriceFeedService priceFeedService;
private final TradeStatistics2StorageService tradeStatistics2StorageService;
private final ReferralIdService referralIdService;
private final boolean dumpStatistics;
private final ObservableSet<TradeStatistics2> observableTradeStatisticsSet = FXCollections.observableSet();
private int duplicates = 0;

@Inject
public TradeStatisticsManager(P2PService p2PService,
Expand All @@ -74,6 +78,7 @@ public TradeStatisticsManager(P2PService p2PService,
@Named(AppOptionKeys.DUMP_STATISTICS) boolean dumpStatistics) {
this.p2PService = p2PService;
this.priceFeedService = priceFeedService;
this.tradeStatistics2StorageService = tradeStatistics2StorageService;
this.referralIdService = referralIdService;
this.dumpStatistics = dumpStatistics;
jsonFileManager = new JsonFileManager(storageDir);
Expand All @@ -97,16 +102,36 @@ public void onAllServicesInitialized() {

p2PService.getP2PDataStorage().addAppendOnlyDataStoreListener(payload -> {
if (payload instanceof TradeStatistics2)
addToMap((TradeStatistics2) payload, true);
addToSet((TradeStatistics2) payload);
});

Map<String, TradeStatistics2> map = new HashMap<>();
AtomicInteger origSize = new AtomicInteger();
p2PService.getP2PDataStorage().getAppendOnlyDataStoreMap().values().stream()
.filter(e -> e instanceof TradeStatistics2)
.map(e -> (TradeStatistics2) e)
.filter(TradeStatistics2::isValid)
.forEach(e -> addToMap(e, map));
observableTradeStatisticsSet.addAll(map.values());
.forEach(tradeStatistics -> {
origSize.getAndIncrement();
TradeStatistics2 prevValue = map.putIfAbsent(tradeStatistics.getOfferId(), tradeStatistics);
if (prevValue != null) {
duplicates++;
}
});

Collection<TradeStatistics2> items = map.values();
// At startup we check if we have duplicate entries. This might be the case from software updates when we
// introduced new entries to the extraMap. As that map is for flexibility in updates we keep it excluded from
// json so that it will not cause duplicates anymore. Until all users have updated we keep the cleanup code.
// Should not be needed later anymore, but will also not hurt if no duplicates exist.
if (duplicates > 0) {
long ts = System.currentTimeMillis();
items = tradeStatistics2StorageService.cleanupMap(items);
log.info("We found {} duplicate entries. Size of map entries before and after cleanup: {} / {}. Cleanup took {} ms.",
duplicates, origSize, items.size(), System.currentTimeMillis() - ts);
}

observableTradeStatisticsSet.addAll(items);

priceFeedService.applyLatestBisqMarketPrice(observableTradeStatisticsSet);

Expand All @@ -131,7 +156,7 @@ public void publishTradeStatistics(List<Trade> trades) {
trade.getDate(),
(trade.getDepositTx() != null ? trade.getDepositTx().getHashAsString() : ""),
extraDataMap);
addToMap(tradeStatistics, true);
addToSet(tradeStatistics);

// We only republish trades from last 10 days
if ((new Date().getTime() - trade.getDate().getTime()) < TimeUnit.DAYS.toMillis(10)) {
Expand All @@ -149,30 +174,22 @@ public ObservableSet<TradeStatistics2> getObservableTradeStatisticsSet() {
return observableTradeStatisticsSet;
}

private void addToMap(TradeStatistics2 tradeStatistics, boolean storeLocally) {
private void addToSet(TradeStatistics2 tradeStatistics) {
if (!observableTradeStatisticsSet.contains(tradeStatistics)) {

if (observableTradeStatisticsSet.stream()
.anyMatch(e -> (e.getOfferId().equals(tradeStatistics.getOfferId()))))
if (observableTradeStatisticsSet.stream().anyMatch(e -> e.getOfferId().equals(tradeStatistics.getOfferId()))) {
return;
}

if (!tradeStatistics.isValid())
if (!tradeStatistics.isValid()) {
return;
}

observableTradeStatisticsSet.add(tradeStatistics);
if (storeLocally) {
priceFeedService.applyLatestBisqMarketPrice(observableTradeStatisticsSet);
dump();
}
priceFeedService.applyLatestBisqMarketPrice(observableTradeStatisticsSet);
dump();
}
}

/**
 * Inserts the entry into the map keyed by offer ID unless an entry for that offer already exists.
 *
 * @param tradeStatistics the entry to insert
 * @param map             accumulator map keyed by offer ID
 */
private void addToMap(TradeStatistics2 tradeStatistics, Map<String, TradeStatistics2> map) {
    TradeStatistics2 existing = map.putIfAbsent(tradeStatistics.getOfferId(), tradeStatistics);
    if (existing != null) {
        log.trace("We have already an item with the same offer ID. That might happen if both the maker and the taker published the tradeStatistics");
    }
}

private void dump() {
if (dumpStatistics) {
// We store the statistics as json so it is easy for further processing (e.g. for web based services)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
@Slf4j
class RequestDataHandler implements MessageListener {
private static final long TIMEOUT = 90;
private static boolean initialRequestApplied = false;

private NodeAddress peersNodeAddress;
/*
*/
Expand Down Expand Up @@ -240,7 +242,12 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
// Processing 82645 items now takes 61 ms compared to the earlier version where it took ages (> 2 min).
// Usually we only get a few hundred or at most a few thousand items. 82645 is all
// trade stats and all account age witness data.
dataStorage.addPersistableNetworkPayloadFromInitialRequest(e);

// We only apply it once from first response
if (!initialRequestApplied) {
dataStorage.addPersistableNetworkPayloadFromInitialRequest(e);
initialRequestApplied = true;
}
} else {
// We don't broadcast here as we are only connected to the seed node and would be pointless
dataStorage.addPersistableNetworkPayload(e, sender, false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,6 @@ public boolean addPersistableNetworkPayload(PersistableNetworkPayload payload,
// Overwriting an entry would be also no issue. We also skip notifying listeners as we get called before the domain
// is ready so no listeners are set anyway. We might get called twice from a redundant call later, so listeners
// might be added then but as we have the data already added calling them would be irrelevant as well.
// TODO find a way to avoid the second call...
public boolean addPersistableNetworkPayloadFromInitialRequest(PersistableNetworkPayload payload) {
byte[] hash = payload.getHash();
if (payload.verifyHashSize()) {
Expand Down

0 comments on commit 16dda95

Please sign in to comment.