Skip to content

Commit

Permalink
Clean up mailbox messages for closed trades
Browse files Browse the repository at this point in the history
Util for removing pending mailbox messages in case the
trade has been closed by the seller after confirming receipt
and a AckMessage as mailbox message will be sent by the
buyer once they go online. In that case the seller's trade
is closed already and the TradeProtocol is not executing
the message processing, thus the mailbox message would not
be removed. To ensure that in such cases (as well other
potential cases in failure scenarios) the mailbox message
gets removed from the network we use that util.
  • Loading branch information
chimp1984 committed Nov 4, 2020
1 parent 63cae1c commit 5c79069
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 0 deletions.
10 changes: 10 additions & 0 deletions core/src/main/java/bisq/core/app/DomainInitialisation.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import bisq.core.support.dispute.refund.refundagent.RefundAgentManager;
import bisq.core.support.traderchat.TraderChatManager;
import bisq.core.trade.TradeManager;
import bisq.core.trade.closed.ClosedTradableManager;
import bisq.core.trade.failed.FailedTradesManager;
import bisq.core.trade.statistics.TradeStatisticsManager;
import bisq.core.trade.txproof.xmr.XmrTxProofService;
import bisq.core.user.User;
Expand Down Expand Up @@ -76,6 +78,8 @@ public class DomainInitialisation {
private final RefundManager refundManager;
private final TraderChatManager traderChatManager;
private final TradeManager tradeManager;
private final ClosedTradableManager closedTradableManager;
private final FailedTradesManager failedTradesManager;
private final XmrTxProofService xmrTxProofService;
private final OpenOfferManager openOfferManager;
private final Balances balances;
Expand Down Expand Up @@ -109,6 +113,8 @@ public DomainInitialisation(ClockWatcher clockWatcher,
RefundManager refundManager,
TraderChatManager traderChatManager,
TradeManager tradeManager,
ClosedTradableManager closedTradableManager,
FailedTradesManager failedTradesManager,
XmrTxProofService xmrTxProofService,
OpenOfferManager openOfferManager,
Balances balances,
Expand Down Expand Up @@ -140,6 +146,8 @@ public DomainInitialisation(ClockWatcher clockWatcher,
this.refundManager = refundManager;
this.traderChatManager = traderChatManager;
this.tradeManager = tradeManager;
this.closedTradableManager = closedTradableManager;
this.failedTradesManager = failedTradesManager;
this.xmrTxProofService = xmrTxProofService;
this.openOfferManager = openOfferManager;
this.balances = balances;
Expand Down Expand Up @@ -183,6 +191,8 @@ public void initDomainServices(Consumer<String> rejectedTxErrorMessageHandler,
traderChatManager.onAllServicesInitialized();

tradeManager.onAllServicesInitialized();
closedTradableManager.onAllServicesInitialized();
failedTradesManager.onAllServicesInitialized();
xmrTxProofService.onAllServicesInitialized();

openOfferManager.onAllServicesInitialized();
Expand Down
129 changes: 129 additions & 0 deletions core/src/main/java/bisq/core/trade/closed/CleanupMailboxMessages.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/

package bisq.core.trade.closed;

import bisq.core.trade.Trade;
import bisq.core.trade.messages.TradeMessage;

import bisq.network.p2p.AckMessage;
import bisq.network.p2p.AckMessageSourceType;
import bisq.network.p2p.BootstrapListener;
import bisq.network.p2p.DecryptedMessageWithPubKey;
import bisq.network.p2p.P2PService;

import bisq.common.crypto.PubKeyRing;
import bisq.common.proto.network.NetworkEnvelope;

import javax.inject.Inject;

import java.util.List;

import lombok.extern.slf4j.Slf4j;

/**
* Util for removing pending mailbox messages in case the trade has been closed by the seller after confirming receipt
* and a AckMessage as mailbox message will be sent by the buyer once they go online. In that case the seller's trade
* is closed already and the TradeProtocol is not executing the message processing, thus the mailbox message would not
* be removed. To ensure that in such cases (as well other potential cases in failure scenarios) the mailbox message
* gets removed from the network we use that util.
*
* This class must not be injected as a singleton!
*/
@Slf4j
public class CleanupMailboxMessages {
private final P2PService p2PService;

@Inject
public CleanupMailboxMessages(P2PService p2PService) {
this.p2PService = p2PService;
}

public void handleTrades(List<Trade> trades) {
// We wrap in a try catch as in failed trades we cannot be sure if expected data is set, so we could get
// a NullPointer and do not want that this escalate to the user.
try {
if (p2PService.isBootstrapped()) {
cleanupMailboxMessages(trades);
} else {
p2PService.addP2PServiceListener(new BootstrapListener() {
@Override
public void onUpdatedDataReceived() {
cleanupMailboxMessages(trades);
}
});
}
} catch (Throwable t) {
log.error("Cleanup mailbox messages failed. {}", t.toString());
}
}

private void cleanupMailboxMessages(List<Trade> trades) {
p2PService.getMailboxItemsByUid().values()
.stream().map(P2PService.MailboxItem::getDecryptedMessageWithPubKey)
.forEach(message -> handleDecryptedMessageWithPubKey(message, trades));
}

private void handleDecryptedMessageWithPubKey(DecryptedMessageWithPubKey decryptedMessageWithPubKey,
List<Trade> trades) {
trades.forEach(trade -> handleDecryptedMessageWithPubKey(decryptedMessageWithPubKey, trade));
}

private void handleDecryptedMessageWithPubKey(DecryptedMessageWithPubKey decryptedMessageWithPubKey,
Trade trade) {
NetworkEnvelope networkEnvelope = decryptedMessageWithPubKey.getNetworkEnvelope();
if (!isPubKeyValid(decryptedMessageWithPubKey, trade)) {
return;
}

if (networkEnvelope instanceof TradeMessage &&
isMyMessage((TradeMessage) networkEnvelope, trade)) {
removeEntryFromMailbox(decryptedMessageWithPubKey, trade);
} else if (networkEnvelope instanceof AckMessage &&
isMyMessage((AckMessage) networkEnvelope, trade)) {
removeEntryFromMailbox(decryptedMessageWithPubKey, trade);
}
}

private void removeEntryFromMailbox(DecryptedMessageWithPubKey decryptedMessageWithPubKey, Trade trade) {
log.info("We found a pending mailbox message ({}) for trade {}. As the trade is closed we remove the mailbox message.",
decryptedMessageWithPubKey.getNetworkEnvelope().getClass().getSimpleName(), trade.getId());
p2PService.removeEntryFromMailbox(decryptedMessageWithPubKey);
}

private boolean isMyMessage(TradeMessage message, Trade trade) {
return message.getTradeId().equals(trade.getId());
}

private boolean isMyMessage(AckMessage ackMessage, Trade trade) {
return ackMessage.getSourceType() == AckMessageSourceType.TRADE_MESSAGE &&
ackMessage.getSourceId().equals(trade.getId());
}

private boolean isPubKeyValid(DecryptedMessageWithPubKey message, Trade trade) {
// We can only validate the peers pubKey if we have it already. If we are the taker we get it from the offer
// Otherwise it depends on the state of the trade protocol if we have received the peers pubKeyRing already.
PubKeyRing peersPubKeyRing = trade.getProcessModel().getTradingPeer().getPubKeyRing();
boolean isValid = true;
if (peersPubKeyRing != null &&
!message.getSignaturePubKey().equals(peersPubKeyRing.getSignaturePubKey())) {
isValid = false;
log.error("SignaturePubKey in message does not match the SignaturePubKey we have set for our trading peer.");
}
return isValid;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,26 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ClosedTradableManager implements PersistedDataHost {
private final PersistenceManager<TradableList<Tradable>> persistenceManager;
private final TradableList<Tradable> closedTradables = new TradableList<>();
private final KeyRing keyRing;
private final PriceFeedService priceFeedService;
private final CleanupMailboxMessages cleanupMailboxMessages;
private final DumpDelayedPayoutTx dumpDelayedPayoutTx;

@Inject
public ClosedTradableManager(KeyRing keyRing,
PriceFeedService priceFeedService,
PersistenceManager<TradableList<Tradable>> persistenceManager,
CleanupMailboxMessages cleanupMailboxMessages,
DumpDelayedPayoutTx dumpDelayedPayoutTx) {
this.keyRing = keyRing;
this.priceFeedService = priceFeedService;
this.cleanupMailboxMessages = cleanupMailboxMessages;
this.dumpDelayedPayoutTx = dumpDelayedPayoutTx;
this.persistenceManager = persistenceManager;

Expand All @@ -72,6 +78,10 @@ public void readPersisted(Runnable completeHandler) {
completeHandler);
}

public void onAllServicesInitialized() {
cleanupMailboxMessages.handleTrades(getClosedTrades());
}

public void add(Tradable tradable) {
if (closedTradables.add(tradable)) {
persistenceManager.requestPersistence();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import bisq.core.trade.TradableList;
import bisq.core.trade.Trade;
import bisq.core.trade.TradeUtil;
import bisq.core.trade.closed.CleanupMailboxMessages;

import bisq.common.crypto.KeyRing;
import bisq.common.persistence.PersistenceManager;
Expand All @@ -49,6 +50,7 @@ public class FailedTradesManager implements PersistedDataHost {
private final KeyRing keyRing;
private final PriceFeedService priceFeedService;
private final BtcWalletService btcWalletService;
private final CleanupMailboxMessages cleanupMailboxMessages;
private final PersistenceManager<TradableList<Trade>> persistenceManager;
private final TradeUtil tradeUtil;
private final DumpDelayedPayoutTx dumpDelayedPayoutTx;
Expand All @@ -61,10 +63,12 @@ public FailedTradesManager(KeyRing keyRing,
BtcWalletService btcWalletService,
PersistenceManager<TradableList<Trade>> persistenceManager,
TradeUtil tradeUtil,
CleanupMailboxMessages cleanupMailboxMessages,
DumpDelayedPayoutTx dumpDelayedPayoutTx) {
this.keyRing = keyRing;
this.priceFeedService = priceFeedService;
this.btcWalletService = btcWalletService;
this.cleanupMailboxMessages = cleanupMailboxMessages;
this.dumpDelayedPayoutTx = dumpDelayedPayoutTx;
this.persistenceManager = persistenceManager;
this.tradeUtil = tradeUtil;
Expand All @@ -85,6 +89,10 @@ public void readPersisted(Runnable completeHandler) {
completeHandler);
}

public void onAllServicesInitialized() {
cleanupMailboxMessages.handleTrades(failedTrades.getList());
}

public void add(Trade trade) {
if (failedTrades.add(trade)) {
persistenceManager.requestPersistence();
Expand Down

0 comments on commit 5c79069

Please sign in to comment.