Skip to content

Commit

Permalink
Merge pull request bisq-network#4746 from chimp1984/cleanup-mailbox-m…
Browse files Browse the repository at this point in the history
…essages-at-closed-trades

Clean up mailbox messages for closed trades
  • Loading branch information
sqrrm authored Nov 4, 2020
2 parents 63cae1c + 5c79069 commit 83ee1fe
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 0 deletions.
10 changes: 10 additions & 0 deletions core/src/main/java/bisq/core/app/DomainInitialisation.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import bisq.core.support.dispute.refund.refundagent.RefundAgentManager;
import bisq.core.support.traderchat.TraderChatManager;
import bisq.core.trade.TradeManager;
import bisq.core.trade.closed.ClosedTradableManager;
import bisq.core.trade.failed.FailedTradesManager;
import bisq.core.trade.statistics.TradeStatisticsManager;
import bisq.core.trade.txproof.xmr.XmrTxProofService;
import bisq.core.user.User;
Expand Down Expand Up @@ -76,6 +78,8 @@ public class DomainInitialisation {
private final RefundManager refundManager;
private final TraderChatManager traderChatManager;
private final TradeManager tradeManager;
private final ClosedTradableManager closedTradableManager;
private final FailedTradesManager failedTradesManager;
private final XmrTxProofService xmrTxProofService;
private final OpenOfferManager openOfferManager;
private final Balances balances;
Expand Down Expand Up @@ -109,6 +113,8 @@ public DomainInitialisation(ClockWatcher clockWatcher,
RefundManager refundManager,
TraderChatManager traderChatManager,
TradeManager tradeManager,
ClosedTradableManager closedTradableManager,
FailedTradesManager failedTradesManager,
XmrTxProofService xmrTxProofService,
OpenOfferManager openOfferManager,
Balances balances,
Expand Down Expand Up @@ -140,6 +146,8 @@ public DomainInitialisation(ClockWatcher clockWatcher,
this.refundManager = refundManager;
this.traderChatManager = traderChatManager;
this.tradeManager = tradeManager;
this.closedTradableManager = closedTradableManager;
this.failedTradesManager = failedTradesManager;
this.xmrTxProofService = xmrTxProofService;
this.openOfferManager = openOfferManager;
this.balances = balances;
Expand Down Expand Up @@ -183,6 +191,8 @@ public void initDomainServices(Consumer<String> rejectedTxErrorMessageHandler,
traderChatManager.onAllServicesInitialized();

tradeManager.onAllServicesInitialized();
closedTradableManager.onAllServicesInitialized();
failedTradesManager.onAllServicesInitialized();
xmrTxProofService.onAllServicesInitialized();

openOfferManager.onAllServicesInitialized();
Expand Down
129 changes: 129 additions & 0 deletions core/src/main/java/bisq/core/trade/closed/CleanupMailboxMessages.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/

package bisq.core.trade.closed;

import bisq.core.trade.Trade;
import bisq.core.trade.messages.TradeMessage;

import bisq.network.p2p.AckMessage;
import bisq.network.p2p.AckMessageSourceType;
import bisq.network.p2p.BootstrapListener;
import bisq.network.p2p.DecryptedMessageWithPubKey;
import bisq.network.p2p.P2PService;

import bisq.common.crypto.PubKeyRing;
import bisq.common.proto.network.NetworkEnvelope;

import javax.inject.Inject;

import java.util.List;

import lombok.extern.slf4j.Slf4j;

/**
* Util for removing pending mailbox messages in case the trade has been closed by the seller after confirming receipt
* and a AckMessage as mailbox message will be sent by the buyer once they go online. In that case the seller's trade
* is closed already and the TradeProtocol is not executing the message processing, thus the mailbox message would not
* be removed. To ensure that in such cases (as well other potential cases in failure scenarios) the mailbox message
* gets removed from the network we use that util.
*
* This class must not be injected as a singleton!
*/
@Slf4j
public class CleanupMailboxMessages {
private final P2PService p2PService;

@Inject
public CleanupMailboxMessages(P2PService p2PService) {
this.p2PService = p2PService;
}

public void handleTrades(List<Trade> trades) {
// We wrap in a try catch as in failed trades we cannot be sure if expected data is set, so we could get
// a NullPointer and do not want that this escalate to the user.
try {
if (p2PService.isBootstrapped()) {
cleanupMailboxMessages(trades);
} else {
p2PService.addP2PServiceListener(new BootstrapListener() {
@Override
public void onUpdatedDataReceived() {
cleanupMailboxMessages(trades);
}
});
}
} catch (Throwable t) {
log.error("Cleanup mailbox messages failed. {}", t.toString());
}
}

private void cleanupMailboxMessages(List<Trade> trades) {
p2PService.getMailboxItemsByUid().values()
.stream().map(P2PService.MailboxItem::getDecryptedMessageWithPubKey)
.forEach(message -> handleDecryptedMessageWithPubKey(message, trades));
}

private void handleDecryptedMessageWithPubKey(DecryptedMessageWithPubKey decryptedMessageWithPubKey,
List<Trade> trades) {
trades.forEach(trade -> handleDecryptedMessageWithPubKey(decryptedMessageWithPubKey, trade));
}

private void handleDecryptedMessageWithPubKey(DecryptedMessageWithPubKey decryptedMessageWithPubKey,
Trade trade) {
NetworkEnvelope networkEnvelope = decryptedMessageWithPubKey.getNetworkEnvelope();
if (!isPubKeyValid(decryptedMessageWithPubKey, trade)) {
return;
}

if (networkEnvelope instanceof TradeMessage &&
isMyMessage((TradeMessage) networkEnvelope, trade)) {
removeEntryFromMailbox(decryptedMessageWithPubKey, trade);
} else if (networkEnvelope instanceof AckMessage &&
isMyMessage((AckMessage) networkEnvelope, trade)) {
removeEntryFromMailbox(decryptedMessageWithPubKey, trade);
}
}

private void removeEntryFromMailbox(DecryptedMessageWithPubKey decryptedMessageWithPubKey, Trade trade) {
log.info("We found a pending mailbox message ({}) for trade {}. As the trade is closed we remove the mailbox message.",
decryptedMessageWithPubKey.getNetworkEnvelope().getClass().getSimpleName(), trade.getId());
p2PService.removeEntryFromMailbox(decryptedMessageWithPubKey);
}

private boolean isMyMessage(TradeMessage message, Trade trade) {
return message.getTradeId().equals(trade.getId());
}

private boolean isMyMessage(AckMessage ackMessage, Trade trade) {
return ackMessage.getSourceType() == AckMessageSourceType.TRADE_MESSAGE &&
ackMessage.getSourceId().equals(trade.getId());
}

private boolean isPubKeyValid(DecryptedMessageWithPubKey message, Trade trade) {
// We can only validate the peers pubKey if we have it already. If we are the taker we get it from the offer
// Otherwise it depends on the state of the trade protocol if we have received the peers pubKeyRing already.
PubKeyRing peersPubKeyRing = trade.getProcessModel().getTradingPeer().getPubKeyRing();
boolean isValid = true;
if (peersPubKeyRing != null &&
!message.getSignaturePubKey().equals(peersPubKeyRing.getSignaturePubKey())) {
isValid = false;
log.error("SignaturePubKey in message does not match the SignaturePubKey we have set for our trading peer.");
}
return isValid;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,26 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ClosedTradableManager implements PersistedDataHost {
private final PersistenceManager<TradableList<Tradable>> persistenceManager;
private final TradableList<Tradable> closedTradables = new TradableList<>();
private final KeyRing keyRing;
private final PriceFeedService priceFeedService;
private final CleanupMailboxMessages cleanupMailboxMessages;
private final DumpDelayedPayoutTx dumpDelayedPayoutTx;

@Inject
public ClosedTradableManager(KeyRing keyRing,
PriceFeedService priceFeedService,
PersistenceManager<TradableList<Tradable>> persistenceManager,
CleanupMailboxMessages cleanupMailboxMessages,
DumpDelayedPayoutTx dumpDelayedPayoutTx) {
this.keyRing = keyRing;
this.priceFeedService = priceFeedService;
this.cleanupMailboxMessages = cleanupMailboxMessages;
this.dumpDelayedPayoutTx = dumpDelayedPayoutTx;
this.persistenceManager = persistenceManager;

Expand All @@ -72,6 +78,10 @@ public void readPersisted(Runnable completeHandler) {
completeHandler);
}

public void onAllServicesInitialized() {
cleanupMailboxMessages.handleTrades(getClosedTrades());
}

public void add(Tradable tradable) {
if (closedTradables.add(tradable)) {
persistenceManager.requestPersistence();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import bisq.core.trade.TradableList;
import bisq.core.trade.Trade;
import bisq.core.trade.TradeUtil;
import bisq.core.trade.closed.CleanupMailboxMessages;

import bisq.common.crypto.KeyRing;
import bisq.common.persistence.PersistenceManager;
Expand All @@ -49,6 +50,7 @@ public class FailedTradesManager implements PersistedDataHost {
private final KeyRing keyRing;
private final PriceFeedService priceFeedService;
private final BtcWalletService btcWalletService;
private final CleanupMailboxMessages cleanupMailboxMessages;
private final PersistenceManager<TradableList<Trade>> persistenceManager;
private final TradeUtil tradeUtil;
private final DumpDelayedPayoutTx dumpDelayedPayoutTx;
Expand All @@ -61,10 +63,12 @@ public FailedTradesManager(KeyRing keyRing,
BtcWalletService btcWalletService,
PersistenceManager<TradableList<Trade>> persistenceManager,
TradeUtil tradeUtil,
CleanupMailboxMessages cleanupMailboxMessages,
DumpDelayedPayoutTx dumpDelayedPayoutTx) {
this.keyRing = keyRing;
this.priceFeedService = priceFeedService;
this.btcWalletService = btcWalletService;
this.cleanupMailboxMessages = cleanupMailboxMessages;
this.dumpDelayedPayoutTx = dumpDelayedPayoutTx;
this.persistenceManager = persistenceManager;
this.tradeUtil = tradeUtil;
Expand All @@ -85,6 +89,10 @@ public void readPersisted(Runnable completeHandler) {
completeHandler);
}

public void onAllServicesInitialized() {
cleanupMailboxMessages.handleTrades(failedTrades.getList());
}

public void add(Trade trade) {
if (failedTrades.add(trade)) {
persistenceManager.requestPersistence();
Expand Down

0 comments on commit 83ee1fe

Please sign in to comment.