Clean up trade statistics from duplicate entries #3476

Merged
@@ -82,14 +82,16 @@ public final class TradeStatistics2 implements LazyProcessedPayload, Persistable
private final long tradeDate;
private final String depositTxId;

// hash get set in constructor from json of all the other data fields (with hash = null).
// Hash get set in constructor from json of all the other data fields (with hash = null).
@JsonExclude
private final byte[] hash;
// PB field signature_pub_key_bytes not used anymore from v0.6 on

// Should be only used in emergency case if we need to add data but do not want to break backward compatibility
// at the P2P network storage checks. The hash of the object will be used to verify if the data is valid. Any new
// field in a class would break that hash and therefore break the storage mechanism.
@Nullable
@JsonExclude
private Map<String, String> extraDataMap;

public TradeStatistics2(OfferPayload offerPayload,
@@ -152,12 +154,14 @@ public TradeStatistics2(OfferPayload.Direction direction,
this.depositTxId = depositTxId;
this.extraDataMap = ExtraDataMapValidator.getValidatedExtraDataMap(extraDataMap);

if (hash == null)
// We create hash from all fields excluding hash itself. We use json as simple data serialisation.
// tradeDate is different for both peers so we ignore it for hash.
this.hash = Hash.getSha256Ripemd160hash(Utilities.objectToJson(this).getBytes(Charsets.UTF_8));
else
this.hash = hash;
this.hash = hash == null ? createHash() : hash;
}

public byte[] createHash() {
// We create hash from all fields excluding hash itself. We use json as simple data serialisation.
// TradeDate is different for both peers so we ignore it for hash. ExtraDataMap is ignored as well as at
// software updates we might have different entries which would cause a different hash.
return Hash.getSha256Ripemd160hash(Utilities.objectToJson(this).getBytes(Charsets.UTF_8));
}

@Override
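The new `createHash()` above derives an entry's identity only from stable fields: `tradeDate` and `extraDataMap` are excluded (via `@JsonExclude`) precisely so that per-peer or per-version differences cannot produce distinct hashes for the same trade. A minimal sketch of that principle, with illustrative names (not the real Bisq classes) and plain SHA-256 over a joined string standing in for `Sha256Ripemd160hash` over a JSON dump:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// Toy model of createHash(): only stable fields feed the hash, so fields
// that legitimately differ between peers (tradeDate) or across software
// versions (extraDataMap) cannot change an entry's identity.
final class TradeStatSketch {
    final String offerId;
    final long tradePrice;
    final long tradeDate; // differs for maker and taker -> excluded from hash

    TradeStatSketch(String offerId, long tradePrice, long tradeDate) {
        this.offerId = offerId;
        this.tradePrice = tradePrice;
        this.tradeDate = tradeDate;
    }

    byte[] createHash() {
        try {
            // SHA-256 alone stands in for the real SHA-256 + RIPEMD-160 chain.
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            return digest.digest((offerId + "|" + tradePrice)
                    .getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The same trade seen by two peers with different dates -> same hash.
        TradeStatSketch maker = new TradeStatSketch("offer-1", 50_000, 1_000);
        TradeStatSketch taker = new TradeStatSketch("offer-1", 50_000, 2_000);
        System.out.println(Arrays.equals(maker.createHash(), taker.createHash())); // prints: true
    }
}
```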
@@ -29,12 +29,12 @@

import java.io.File;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;

import static com.google.common.base.Preconditions.checkArgument;

@Slf4j
public class TradeStatistics2StorageService extends MapStoreService<TradeStatistics2Store, PersistableNetworkPayload> {
private static final String FILE_NAME = "TradeStatistics2Store";
@@ -70,6 +70,19 @@ public boolean canHandle(PersistableNetworkPayload payload) {
return payload instanceof TradeStatistics2;
}

Collection<TradeStatistics2> cleanupMap(Collection<TradeStatistics2> collection) {
Map<P2PDataStorage.ByteArray, TradeStatistics2> tempMap = new HashMap<>();
// We recreate the hash as there have been duplicates from diff. extraMap entries introduced at software updates
collection.forEach(item -> tempMap.putIfAbsent(new P2PDataStorage.ByteArray(item.createHash()), item));

Map<P2PDataStorage.ByteArray, PersistableNetworkPayload> map = getMap();
map.clear();
map.putAll(tempMap);
persist();

return tempMap.values();
}


///////////////////////////////////////////////////////////////////////////////////////////
// Protected
@@ -83,8 +96,5 @@ protected TradeStatistics2Store createStore() {
@Override
protected void readStore() {
super.readStore();
checkArgument(store instanceof TradeStatistics2Store,
"Store is not instance of TradeStatistics2Store. That can happen if the ProtoBuffer " +
"file got changed. We clear the data store and recreated it again.");
}
}
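The `cleanupMap()` method above re-keys every entry by a freshly recreated hash and lets `putIfAbsent` collapse duplicates, so entries that differ only in excluded fields map onto one key and the first one wins. A generic sketch of that pattern (the key function stands in for `item -> new P2PDataStorage.ByteArray(item.createHash())`; names are illustrative):

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Deduplicate a collection by a recreated key: putIfAbsent keeps the
// first entry seen for each key and silently drops later duplicates.
final class DeduplicateSketch {
    static <K, V> Collection<V> cleanup(Collection<V> items, Function<V, K> keyFn) {
        Map<K, V> tempMap = new LinkedHashMap<>();
        items.forEach(item -> tempMap.putIfAbsent(keyFn.apply(item), item));
        return tempMap.values();
    }

    public static void main(String[] args) {
        // "Foo" and "foo" collapse onto the key "foo"; the first one wins.
        Collection<String> out = cleanup(List.of("Foo", "foo", "Bar"), String::toLowerCase);
        System.out.println(out); // prints: [Foo, Bar]
    }
}
```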
@@ -43,11 +43,13 @@
import java.io.File;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
@@ -60,9 +62,11 @@ public class TradeStatisticsManager {
private final JsonFileManager jsonFileManager;
private final P2PService p2PService;
private final PriceFeedService priceFeedService;
private final TradeStatistics2StorageService tradeStatistics2StorageService;
private final ReferralIdService referralIdService;
private final boolean dumpStatistics;
private final ObservableSet<TradeStatistics2> observableTradeStatisticsSet = FXCollections.observableSet();
private int duplicates = 0;

@Inject
public TradeStatisticsManager(P2PService p2PService,
@@ -74,6 +78,7 @@ public TradeStatisticsManager(P2PService p2PService,
@Named(AppOptionKeys.DUMP_STATISTICS) boolean dumpStatistics) {
this.p2PService = p2PService;
this.priceFeedService = priceFeedService;
this.tradeStatistics2StorageService = tradeStatistics2StorageService;
this.referralIdService = referralIdService;
this.dumpStatistics = dumpStatistics;
jsonFileManager = new JsonFileManager(storageDir);
@@ -97,16 +102,36 @@ public void onAllServicesInitialized() {

p2PService.getP2PDataStorage().addAppendOnlyDataStoreListener(payload -> {
if (payload instanceof TradeStatistics2)
addToMap((TradeStatistics2) payload, true);
addToSet((TradeStatistics2) payload);
});

Map<String, TradeStatistics2> map = new HashMap<>();
AtomicInteger origSize = new AtomicInteger();
p2PService.getP2PDataStorage().getAppendOnlyDataStoreMap().values().stream()
.filter(e -> e instanceof TradeStatistics2)
.map(e -> (TradeStatistics2) e)
.filter(TradeStatistics2::isValid)
.forEach(e -> addToMap(e, map));
observableTradeStatisticsSet.addAll(map.values());
.forEach(tradeStatistics -> {
origSize.getAndIncrement();
TradeStatistics2 prevValue = map.putIfAbsent(tradeStatistics.getOfferId(), tradeStatistics);
if (prevValue != null) {
duplicates++;
}
});

Collection<TradeStatistics2> items = map.values();
// At startup we check if we have duplicate entries. This might be the case from software updates when we
// introduced new entries to the extraMap. As that map is for flexibility in updates we keep it excluded from
// json so that it will not cause duplicates anymore. Until all users have updated we keep the cleanup code.
// Should not be needed later anymore, but will also not hurt if no duplicates exist.
if (duplicates > 0) {
long ts = System.currentTimeMillis();
items = tradeStatistics2StorageService.cleanupMap(items);
log.info("We found {} duplicate entries. Size of map entries before and after cleanup: {} / {}. Cleanup took {} ms.",
duplicates, origSize, items.size(), System.currentTimeMillis() - ts);
}

observableTradeStatisticsSet.addAll(items);

priceFeedService.applyLatestBisqMarketPrice(observableTradeStatisticsSet);

@@ -131,7 +156,7 @@ public void publishTradeStatistics(List<Trade> trades) {
trade.getDate(),
(trade.getDepositTx() != null ? trade.getDepositTx().getHashAsString() : ""),
extraDataMap);
addToMap(tradeStatistics, true);
addToSet(tradeStatistics);

// We only republish trades from last 10 days
if ((new Date().getTime() - trade.getDate().getTime()) < TimeUnit.DAYS.toMillis(10)) {
@@ -149,30 +174,22 @@ public ObservableSet<TradeStatistics2> getObservableTradeStatisticsSet() {
return observableTradeStatisticsSet;
}

private void addToMap(TradeStatistics2 tradeStatistics, boolean storeLocally) {
private void addToSet(TradeStatistics2 tradeStatistics) {
if (!observableTradeStatisticsSet.contains(tradeStatistics)) {

if (observableTradeStatisticsSet.stream()
.anyMatch(e -> (e.getOfferId().equals(tradeStatistics.getOfferId()))))
if (observableTradeStatisticsSet.stream().anyMatch(e -> e.getOfferId().equals(tradeStatistics.getOfferId()))) {
return;
}

if (!tradeStatistics.isValid())
if (!tradeStatistics.isValid()) {
return;
}

observableTradeStatisticsSet.add(tradeStatistics);
if (storeLocally) {
priceFeedService.applyLatestBisqMarketPrice(observableTradeStatisticsSet);
dump();
}
priceFeedService.applyLatestBisqMarketPrice(observableTradeStatisticsSet);
dump();
}
}

private void addToMap(TradeStatistics2 tradeStatistics, Map<String, TradeStatistics2> map) {
TradeStatistics2 prevValue = map.putIfAbsent(tradeStatistics.getOfferId(), tradeStatistics);
if (prevValue != null)
log.trace("We have already an item with the same offer ID. That might happen if both the maker and the taker published the tradeStatistics");
}

private void dump() {
if (dumpStatistics) {
// We store the statistics as json so it is easy for further processing (e.g. for web based services)
@@ -58,6 +58,8 @@
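The startup loop in `onAllServicesInitialized()` collects entries into a map keyed by offer ID and counts how many collide; a nonzero count then triggers the storage cleanup. A sketch of that counting pass, with `String[] {offerId, payload}` pairs standing in for `TradeStatistics2` objects:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Fill the map keyed by offer ID; putIfAbsent returns non-null when the
// key was already present, which is exactly a duplicate to count.
final class StartupDedupSketch {
    static int countDuplicates(List<String[]> entries, Map<String, String[]> map) {
        AtomicInteger duplicates = new AtomicInteger();
        entries.forEach(e -> {
            if (map.putIfAbsent(e[0], e) != null)
                duplicates.incrementAndGet();
        });
        return duplicates.get();
    }

    public static void main(String[] args) {
        Map<String, String[]> map = new HashMap<>();
        int duplicates = countDuplicates(List.of(
                new String[]{"offer-1", "maker"},
                new String[]{"offer-1", "taker"}, // same offer published by both peers
                new String[]{"offer-2", "maker"}), map);
        System.out.println(duplicates + " duplicate(s), " + map.size() + " kept"); // prints: 1 duplicate(s), 2 kept
    }
}
```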
@Slf4j
class RequestDataHandler implements MessageListener {
private static final long TIMEOUT = 90;
private static boolean initialRequestApplied = false;

private NodeAddress peersNodeAddress;
/*
*/
@@ -240,7 +242,12 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
// Processing 82645 items took now 61 ms compared to earlier version where it took ages (> 2min).
// Usually we only get a few hundred or at most a few thousand items. 82645 is all
// trade stats and all account age witness data.
dataStorage.addPersistableNetworkPayloadFromInitialRequest(e);

// We only apply it once from first response
if (!initialRequestApplied) {
Contributor:

Why do we only handle the first requests for the PersistableNetworkPayload objects, but all of them for the ProtectedStoragePayload objects? Do we expect more inconsistencies between the ProtectedStorageEntrys, so we use all of them? Or were the PersistableNetworkPayload handlers not dealing with duplicates well?

I haven't been able to follow the entire TradeStatistics issue, but I am just trying to understand more about what is expected/unexpected in this startup path.

Contributor (author):

The hash issue was caused by the extraMap: at certain updates data got added to it, and combined with the republishing of the objects this created duplicate entries, as the hashes differed due to the added entries in that map (the map was used for tracking usage of arbitrators and mediators for an optimized selection of them).

The optimisation for handling the PersistableNetworkPayload objects was to avoid performance spikes when processing a large set of them. For ProtectedStorageEntrys we do not expect such a high number of objects. PersistableNetworkPayloads are mainly TradeStatistics and AccountAgeWitness objects, and if a user has not been online for a longer time those can be several thousand objects, which caused long processing times when the normal method was used, where unnecessary checks (unnecessary because we receive those objects at startup, not later at runtime) caused most of the CPU cost.

Contributor:

Ok, I think I understand. So we can have up to two loops through the ProtectedStoragePayload path and only one through the PersistableNetworkPayload path. The code handles duplicates just fine, but processing 2x the PersistableNetworkPayload objects takes a lot of time and we don't expect processing them from the second seed to provide meaningful data. Thanks for the rundown.

dataStorage.addPersistableNetworkPayloadFromInitialRequest(e);
initialRequestApplied = true;
}
} else {
// We don't broadcast here as we are only connected to the seed node and would be pointless
dataStorage.addPersistableNetworkPayload(e, sender, false,
@@ -348,7 +348,6 @@ public boolean addPersistableNetworkPayload(PersistableNetworkPayload payload,
// Overwriting an entry would be also no issue. We also skip notifying listeners as we get called before the domain
// is ready so no listeners are set anyway. We might get called twice from a redundant call later, so listeners
// might be added then but as we have the data already added calling them would be irrelevant as well.
// TODO find a way to avoid the second call...
public boolean addPersistableNetworkPayloadFromInitialRequest(PersistableNetworkPayload payload) {
byte[] hash = payload.getHash();
if (payload.verifyHashSize()) {