Skip to content

Commit

Permalink
Add reset method to trade protocol.
Browse files Browse the repository at this point in the history
Add map for maintaining pending protocols by offerID.

Fixes a bug when the same taker got an early failure in a take offer attempt before the maker removes the offer and the taker did not persist the failed trade, thus the protection to not be permitted to take the same offer after a failure does not prevent another take offer attempt of the same taker. In such a case the maker has the old pending protocol listening for the trade messages and both the old and new protocol instance process those messages. In case the model data has changes (e.g. diff. inputs) this can cause a failed trade.
We do not remove the message listeners on failures to tolerate minor failures which can be recovered by resend routines.
There could be more solid fixes for that issue but this approach seems to carry less risks.

Signed-off-by: HenrikJannsen <[email protected]>
  • Loading branch information
HenrikJannsen committed Dec 26, 2022
1 parent 72b55f4 commit 8c9c3c7
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 6 deletions.
41 changes: 35 additions & 6 deletions core/src/main/java/bisq/core/trade/TradeManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,18 @@ public class TradeManager implements PersistedDataHost, DecryptedDirectMessageLi
private final Provider provider;
private final ClockWatcher clockWatcher;

private final Map<String, TradeProtocol> tradeProtocolByTradeId = new HashMap<>();
// We use uid for that map not the trade ID
private final Map<String, TradeProtocol> tradeProtocolByTradeUid = new HashMap<>();

// We maintain a map with trade (offer) ID to reset a pending trade protocol for the same offer.
// Pending trade protocol could happen in edge cases when an early error did not cause a removal of the
// offer and the same peer takes the offer later again. Usually it is prevented for the taker to take again after a
// failure but that is only based on failed trades state and it can be that either the taker deletes the failed trades
// file or it was not persisted. Such rare cases could lead to a pending protocol and when taker takes again the
// offer the message listener from the old pending protocol gets invoked and processes the messages based on
// potentially outdated model data (e.g. old inputs).
private final Map<String, TradeProtocol> pendingTradeProtocolByTradeId = new HashMap<>();

private final PersistenceManager<TradableList<Trade>> persistenceManager;
private final TradableList<Trade> tradableList = new TradableList<>();
@Getter
Expand Down Expand Up @@ -408,15 +419,20 @@ public void onUpdatedDataReceived() {

public TradeProtocol getTradeProtocol(TradeModel trade) {
String uid = trade.getUid();
if (tradeProtocolByTradeId.containsKey(uid)) {
return tradeProtocolByTradeId.get(uid);
if (tradeProtocolByTradeUid.containsKey(uid)) {
return tradeProtocolByTradeUid.get(uid);
} else {
TradeProtocol tradeProtocol = TradeProtocolFactory.getNewTradeProtocol(trade);
TradeProtocol prev = tradeProtocolByTradeId.put(uid, tradeProtocol);
TradeProtocol prev = tradeProtocolByTradeUid.put(uid, tradeProtocol);
if (prev != null) {
log.error("We had already an entry with uid {}", trade.getUid());
}

TradeProtocol pending = pendingTradeProtocolByTradeId.put(trade.getId(), tradeProtocol);
if (pending != null) {
pending.reset();
}

return tradeProtocol;
}
}
Expand Down Expand Up @@ -618,14 +634,19 @@ public void onTakeBsqSwapOffer(Offer offer,

private TradeProtocol createTradeProtocol(TradeModel tradeModel) {
TradeProtocol tradeProtocol = TradeProtocolFactory.getNewTradeProtocol(tradeModel);
TradeProtocol prev = tradeProtocolByTradeId.put(tradeModel.getUid(), tradeProtocol);
TradeProtocol prev = tradeProtocolByTradeUid.put(tradeModel.getUid(), tradeProtocol);
if (prev != null) {
log.error("We had already an entry with uid {}", tradeModel.getUid());
}
if (tradeModel instanceof Trade) {
tradableList.add((Trade) tradeModel);
}

TradeProtocol pending = pendingTradeProtocolByTradeId.put(tradeModel.getId(), tradeProtocol);
if (pending != null) {
pending.reset();
}

// For BsqTrades we only store the trade at completion

return tradeProtocol;
Expand Down Expand Up @@ -866,7 +887,15 @@ public boolean wasOfferAlreadyUsedInTrade(String offerId) {
combinedStream = Stream.concat(combinedStream,
closedTradableManager.getObservableList().stream());

return combinedStream.anyMatch(t -> t.getOffer().getId().equals(offerId));
List<Tradable> list1 = getPendingAndBsqSwapTrades().collect(Collectors.toList());
List<Tradable> list2 = failedTradesManager.getObservableList().stream().collect(Collectors.toList());
List<Tradable> list3 = closedTradableManager.getObservableList().stream().collect(Collectors.toList());


List<Tradable> list = combinedStream.collect(Collectors.toList());

boolean b = list.stream().anyMatch(t -> t.getOffer().getId().equals(offerId));
return b;
}

public boolean isBuyer(Offer offer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ public void onWithdrawCompleted() {
cleanup();
}

// Resets a potentially pending protocol
public void reset() {
tradeModel.setErrorMessage("Outdated pending protocol got reset.");
protocolModel.getP2PService().removeDecryptedDirectMessageListener(this);
}

protected void onMailboxMessage(TradeMessage message, NodeAddress peerNodeAddress) {
log.info("Received {} as MailboxMessage from {} with tradeId {} and uid {}",
message.getClass().getSimpleName(), peerNodeAddress, message.getTradeId(), message.getUid());
Expand Down

0 comments on commit 8c9c3c7

Please sign in to comment.