From 16dda9565c152140a30ccfa04f438fdf9fb0857b Mon Sep 17 00:00:00 2001 From: chimp1984 <54558767+chimp1984@users.noreply.github.com> Date: Mon, 28 Oct 2019 06:13:17 -0500 Subject: [PATCH] Clean up trade statistics from duplicate entries (#3476) * Clean up trade statistics from duplicate entries At software updates we added new entries to the extraMap which caused duplicate entries (if one of the traders was on the new and the other on the old version, or at republishing). We now set it to json exclude to avoid that in the future and clean up the map. * Avoid repeated calls to addPersistableNetworkPayloadFromInitialRequest For trade stat cleanup we don't want to apply it multiple times as it is a bit expensive. We get the initial data response from each seed node and would pollute our map again with the second response, and if our node is a seed node, the seed node itself could not get into a clean state and would continue polluting other nodes. * Refactor Remove unused param Rename method Inline method Cleanups --- .../trade/statistics/TradeStatistics2.java | 18 +++--- .../TradeStatistics2StorageService.java | 20 +++++-- .../statistics/TradeStatisticsManager.java | 55 ++++++++++++------- .../p2p/peers/getdata/RequestDataHandler.java | 9 ++- .../network/p2p/storage/P2PDataStorage.java | 1 - 5 files changed, 70 insertions(+), 33 deletions(-) diff --git a/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2.java b/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2.java index 2cd22ad92b4..7a12248d76e 100644 --- a/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2.java +++ b/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2.java @@ -82,7 +82,8 @@ public final class TradeStatistics2 implements LazyProcessedPayload, Persistable private final long tradeDate; private final String depositTxId; - // hash get set in constructor from json of all the other data fields (with hash = null). 
+ // Hash get set in constructor from json of all the other data fields (with hash = null). + @JsonExclude private final byte[] hash; // PB field signature_pub_key_bytes not used anymore from v0.6 on @@ -90,6 +91,7 @@ public final class TradeStatistics2 implements LazyProcessedPayload, Persistable // at the P2P network storage checks. The hash of the object will be used to verify if the data is valid. Any new // field in a class would break that hash and therefore break the storage mechanism. @Nullable + @JsonExclude private Map extraDataMap; public TradeStatistics2(OfferPayload offerPayload, @@ -152,12 +154,14 @@ public TradeStatistics2(OfferPayload.Direction direction, this.depositTxId = depositTxId; this.extraDataMap = ExtraDataMapValidator.getValidatedExtraDataMap(extraDataMap); - if (hash == null) - // We create hash from all fields excluding hash itself. We use json as simple data serialisation. - // tradeDate is different for both peers so we ignore it for hash. - this.hash = Hash.getSha256Ripemd160hash(Utilities.objectToJson(this).getBytes(Charsets.UTF_8)); - else - this.hash = hash; + this.hash = hash == null ? createHash() : hash; + } + + public byte[] createHash() { + // We create hash from all fields excluding hash itself. We use json as simple data serialisation. + // TradeDate is different for both peers so we ignore it for hash. ExtraDataMap is ignored as well as at + // software updates we might have different entries which would cause a different hash. 
+ return Hash.getSha256Ripemd160hash(Utilities.objectToJson(this).getBytes(Charsets.UTF_8)); } @Override diff --git a/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2StorageService.java b/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2StorageService.java index cb684ec9353..a06ca30c67b 100644 --- a/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2StorageService.java +++ b/core/src/main/java/bisq/core/trade/statistics/TradeStatistics2StorageService.java @@ -29,12 +29,12 @@ import java.io.File; +import java.util.Collection; +import java.util.HashMap; import java.util.Map; import lombok.extern.slf4j.Slf4j; -import static com.google.common.base.Preconditions.checkArgument; - @Slf4j public class TradeStatistics2StorageService extends MapStoreService { private static final String FILE_NAME = "TradeStatistics2Store"; @@ -70,6 +70,19 @@ public boolean canHandle(PersistableNetworkPayload payload) { return payload instanceof TradeStatistics2; } + Collection cleanupMap(Collection collection) { + Map tempMap = new HashMap<>(); + // We recreate the hash as there have been duplicates from diff. extraMap entries introduced at software updates + collection.forEach(item -> tempMap.putIfAbsent(new P2PDataStorage.ByteArray(item.createHash()), item)); + + Map map = getMap(); + map.clear(); + map.putAll(tempMap); + persist(); + + return tempMap.values(); + } + /////////////////////////////////////////////////////////////////////////////////////////// // Protected @@ -83,8 +96,5 @@ protected TradeStatistics2Store createStore() { @Override protected void readStore() { super.readStore(); - checkArgument(store instanceof TradeStatistics2Store, - "Store is not instance of TradeStatistics2Store. That can happen if the ProtoBuffer " + - "file got changed. 
We clear the data store and recreated it again."); } } diff --git a/core/src/main/java/bisq/core/trade/statistics/TradeStatisticsManager.java b/core/src/main/java/bisq/core/trade/statistics/TradeStatisticsManager.java index 3f40cd660bd..78d6b845044 100644 --- a/core/src/main/java/bisq/core/trade/statistics/TradeStatisticsManager.java +++ b/core/src/main/java/bisq/core/trade/statistics/TradeStatisticsManager.java @@ -43,11 +43,13 @@ import java.io.File; import java.util.ArrayList; +import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -60,9 +62,11 @@ public class TradeStatisticsManager { private final JsonFileManager jsonFileManager; private final P2PService p2PService; private final PriceFeedService priceFeedService; + private final TradeStatistics2StorageService tradeStatistics2StorageService; private final ReferralIdService referralIdService; private final boolean dumpStatistics; private final ObservableSet observableTradeStatisticsSet = FXCollections.observableSet(); + private int duplicates = 0; @Inject public TradeStatisticsManager(P2PService p2PService, @@ -74,6 +78,7 @@ public TradeStatisticsManager(P2PService p2PService, @Named(AppOptionKeys.DUMP_STATISTICS) boolean dumpStatistics) { this.p2PService = p2PService; this.priceFeedService = priceFeedService; + this.tradeStatistics2StorageService = tradeStatistics2StorageService; this.referralIdService = referralIdService; this.dumpStatistics = dumpStatistics; jsonFileManager = new JsonFileManager(storageDir); @@ -97,16 +102,36 @@ public void onAllServicesInitialized() { p2PService.getP2PDataStorage().addAppendOnlyDataStoreListener(payload -> { if (payload instanceof TradeStatistics2) - addToMap((TradeStatistics2) payload, true); + addToSet((TradeStatistics2) payload); }); Map map = 
new HashMap<>(); + AtomicInteger origSize = new AtomicInteger(); p2PService.getP2PDataStorage().getAppendOnlyDataStoreMap().values().stream() .filter(e -> e instanceof TradeStatistics2) .map(e -> (TradeStatistics2) e) .filter(TradeStatistics2::isValid) - .forEach(e -> addToMap(e, map)); - observableTradeStatisticsSet.addAll(map.values()); + .forEach(tradeStatistics -> { + origSize.getAndIncrement(); + TradeStatistics2 prevValue = map.putIfAbsent(tradeStatistics.getOfferId(), tradeStatistics); + if (prevValue != null) { + duplicates++; + } + }); + + Collection items = map.values(); + // At startup we check if we have duplicate entries. This might be the case from software updates when we + // introduced new entries to the extraMap. As that map is for flexibility in updates we keep it excluded from + // json so that it will not cause duplicates anymore. Until all users have updated we keep the cleanup code. + // Should not be needed later anymore, but will also not hurt if no duplicates exist. + if (duplicates > 0) { + long ts = System.currentTimeMillis(); + items = tradeStatistics2StorageService.cleanupMap(items); + log.info("We found {} duplicate entries. Size of map entries before and after cleanup: {} / {}. Cleanup took {} ms.", + duplicates, origSize, items.size(), System.currentTimeMillis() - ts); + } + + observableTradeStatisticsSet.addAll(items); priceFeedService.applyLatestBisqMarketPrice(observableTradeStatisticsSet); @@ -131,7 +156,7 @@ public void publishTradeStatistics(List trades) { trade.getDate(), (trade.getDepositTx() != null ? 
trade.getDepositTx().getHashAsString() : ""), extraDataMap); - addToMap(tradeStatistics, true); + addToSet(tradeStatistics); // We only republish trades from last 10 days if ((new Date().getTime() - trade.getDate().getTime()) < TimeUnit.DAYS.toMillis(10)) { @@ -149,30 +174,22 @@ public ObservableSet getObservableTradeStatisticsSet() { return observableTradeStatisticsSet; } - private void addToMap(TradeStatistics2 tradeStatistics, boolean storeLocally) { + private void addToSet(TradeStatistics2 tradeStatistics) { if (!observableTradeStatisticsSet.contains(tradeStatistics)) { - - if (observableTradeStatisticsSet.stream() - .anyMatch(e -> (e.getOfferId().equals(tradeStatistics.getOfferId())))) + if (observableTradeStatisticsSet.stream().anyMatch(e -> e.getOfferId().equals(tradeStatistics.getOfferId()))) { return; + } - if (!tradeStatistics.isValid()) + if (!tradeStatistics.isValid()) { return; + } observableTradeStatisticsSet.add(tradeStatistics); - if (storeLocally) { - priceFeedService.applyLatestBisqMarketPrice(observableTradeStatisticsSet); - dump(); - } + priceFeedService.applyLatestBisqMarketPrice(observableTradeStatisticsSet); + dump(); } } - private void addToMap(TradeStatistics2 tradeStatistics, Map map) { - TradeStatistics2 prevValue = map.putIfAbsent(tradeStatistics.getOfferId(), tradeStatistics); - if (prevValue != null) - log.trace("We have already an item with the same offer ID. That might happen if both the maker and the taker published the tradeStatistics"); - } - private void dump() { if (dumpStatistics) { // We store the statistics as json so it is easy for further processing (e.g. 
for web based services) diff --git a/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java b/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java index c97dae0f5ce..b699138e5f5 100644 --- a/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java +++ b/p2p/src/main/java/bisq/network/p2p/peers/getdata/RequestDataHandler.java @@ -58,6 +58,8 @@ @Slf4j class RequestDataHandler implements MessageListener { private static final long TIMEOUT = 90; + private static boolean initialRequestApplied = false; + private NodeAddress peersNodeAddress; /* */ @@ -240,7 +242,12 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) { // Processing 82645 items took now 61 ms compared to earlier version where it took ages (> 2min). // Usually we only get about a few hundred or max. a few 1000 items. 82645 is all // trade stats stats and all account age witness data. - dataStorage.addPersistableNetworkPayloadFromInitialRequest(e); + + // We only apply it once from first response + if (!initialRequestApplied) { + dataStorage.addPersistableNetworkPayloadFromInitialRequest(e); + initialRequestApplied = true; + } } else { // We don't broadcast here as we are only connected to the seed node and would be pointless dataStorage.addPersistableNetworkPayload(e, sender, false, diff --git a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java index be956d0fb04..f01157f3fcc 100644 --- a/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java +++ b/p2p/src/main/java/bisq/network/p2p/storage/P2PDataStorage.java @@ -348,7 +348,6 @@ public boolean addPersistableNetworkPayload(PersistableNetworkPayload payload, // Overwriting an entry would be also no issue. We also skip notifying listeners as we get called before the domain // is ready so no listeners are set anyway. 
We might get called twice from a redundant call later, so listeners // might be added then but as we have the data already added calling them would be irrelevant as well. - // TODO find a way to avoid the second call... public boolean addPersistableNetworkPayloadFromInitialRequest(PersistableNetworkPayload payload) { byte[] hash = payload.getHash(); if (payload.verifyHashSize()) {