Skip to content

Commit

Permalink
Merge pull request #5072 from chimp1984/persist-and-republish-mailbox…
Browse files Browse the repository at this point in the history
…-messages

Persist and republish mailbox messages
  • Loading branch information
sqrrm authored Jan 16, 2021
2 parents 44cbea8 + 5d13fdc commit d3971ef
Show file tree
Hide file tree
Showing 69 changed files with 1,486 additions and 722 deletions.
10 changes: 10 additions & 0 deletions common/src/main/java/bisq/common/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public class Config {
public static final String API_PASSWORD = "apiPassword";
public static final String API_PORT = "apiPort";
public static final String PREVENT_PERIODIC_SHUTDOWN_AT_SEED_NODE = "preventPeriodicShutdownAtSeedNode";
public static final String REPUBLISH_MAILBOX_ENTRIES = "republishMailboxEntries";

// Default values for certain options
public static final int UNSPECIFIED_PORT = -1;
Expand Down Expand Up @@ -207,6 +208,7 @@ public class Config {
public final String apiPassword;
public final int apiPort;
public final boolean preventPeriodicShutdownAtSeedNode;
public final boolean republishMailboxEntries;

// Properties derived from options but not exposed as options themselves
public final File torDir;
Expand Down Expand Up @@ -648,6 +650,13 @@ public Config(String defaultAppName, File defaultUserDataDir, String... args) {
.ofType(boolean.class)
.defaultsTo(false);

ArgumentAcceptingOptionSpec<Boolean> republishMailboxEntriesOpt =
parser.accepts(REPUBLISH_MAILBOX_ENTRIES,
"Republish mailbox messages at startup")
.withRequiredArg()
.ofType(boolean.class)
.defaultsTo(false);

try {
CompositeOptionSet options = new CompositeOptionSet();

Expand Down Expand Up @@ -764,6 +773,7 @@ public Config(String defaultAppName, File defaultUserDataDir, String... args) {
this.apiPassword = options.valueOf(apiPasswordOpt);
this.apiPort = options.valueOf(apiPortOpt);
this.preventPeriodicShutdownAtSeedNode = options.valueOf(preventPeriodicShutdownAtSeedNodeOpt);
this.republishMailboxEntries = options.valueOf(republishMailboxEntriesOpt);
} catch (OptionException ex) {
throw new ConfigException("problem parsing option '%s': %s",
ex.options().get(0),
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/bisq/core/alert/Alert.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
@ToString
@Slf4j
public final class Alert implements ProtectedStoragePayload, ExpirablePayload {
public static final long TTL = TimeUnit.DAYS.toMillis(90);

private final String message;
private final boolean isUpdateInfo;
private final String version;
Expand Down Expand Up @@ -131,7 +133,7 @@ public static Alert fromProto(protobuf.Alert proto) {

@Override
public long getTTL() {
return TimeUnit.DAYS.toMillis(90);
return TTL;
}

public void setSigAndPubKey(String signatureAsBase64, PublicKey ownerPubKey) {
Expand Down
27 changes: 19 additions & 8 deletions core/src/main/java/bisq/core/alert/PrivateNotificationManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.P2PService;
import bisq.network.p2p.SendMailboxMessageListener;
import bisq.network.p2p.mailbox.MailboxMessageService;

import bisq.common.app.DevEnv;
import bisq.common.config.Config;
Expand Down Expand Up @@ -49,20 +50,24 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import static org.bitcoinj.core.Utils.HEX;

public class PrivateNotificationManager {
private static final Logger log = LoggerFactory.getLogger(PrivateNotificationManager.class);

private final P2PService p2PService;
private final MailboxMessageService mailboxMessageService;
private final KeyRing keyRing;
private final ObjectProperty<PrivateNotificationPayload> privateNotificationMessageProperty = new SimpleObjectProperty<>();

// Pub key for developer global privateNotification message
private final String pubKeyAsHex;

private ECKey privateNotificationSigningKey;
private DecryptedMessageWithPubKey decryptedMessageWithPubKey;
@Nullable
private PrivateNotificationMessage privateNotificationMessage;


///////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -71,26 +76,27 @@ public class PrivateNotificationManager {

@Inject
public PrivateNotificationManager(P2PService p2PService,
MailboxMessageService mailboxMessageService,
KeyRing keyRing,
@Named(Config.IGNORE_DEV_MSG) boolean ignoreDevMsg,
@Named(Config.USE_DEV_PRIVILEGE_KEYS) boolean useDevPrivilegeKeys) {
this.p2PService = p2PService;
this.mailboxMessageService = mailboxMessageService;
this.keyRing = keyRing;

if (!ignoreDevMsg) {
this.p2PService.addDecryptedDirectMessageListener(this::handleMessage);
this.p2PService.addDecryptedMailboxListener(this::handleMessage);
this.mailboxMessageService.addDecryptedMailboxListener(this::handleMessage);
}
pubKeyAsHex = useDevPrivilegeKeys ?
DevEnv.DEV_PRIVILEGE_PUB_KEY :
"02ba7c5de295adfe57b60029f3637a2c6b1d0e969a8aaefb9e0ddc3a7963f26925";
}

private void handleMessage(DecryptedMessageWithPubKey decryptedMessageWithPubKey, NodeAddress senderNodeAddress) {
this.decryptedMessageWithPubKey = decryptedMessageWithPubKey;
NetworkEnvelope networkEnvelope = decryptedMessageWithPubKey.getNetworkEnvelope();
if (networkEnvelope instanceof PrivateNotificationMessage) {
PrivateNotificationMessage privateNotificationMessage = (PrivateNotificationMessage) networkEnvelope;
privateNotificationMessage = (PrivateNotificationMessage) networkEnvelope;
log.info("Received PrivateNotificationMessage from {} with uid={}",
senderNodeAddress, privateNotificationMessage.getUid());
if (privateNotificationMessage.getSenderNodeAddress().equals(senderNodeAddress)) {
Expand All @@ -112,8 +118,11 @@ public ReadOnlyObjectProperty<PrivateNotificationPayload> privateNotificationPro
return privateNotificationMessageProperty;
}

public boolean sendPrivateNotificationMessageIfKeyIsValid(PrivateNotificationPayload privateNotification, PubKeyRing pubKeyRing, NodeAddress peersNodeAddress,
String privKeyString, SendMailboxMessageListener sendMailboxMessageListener) {
public boolean sendPrivateNotificationMessageIfKeyIsValid(PrivateNotificationPayload privateNotification,
PubKeyRing pubKeyRing,
NodeAddress peersNodeAddress,
String privKeyString,
SendMailboxMessageListener sendMailboxMessageListener) {
boolean isKeyValid = isKeyValid(privKeyString);
if (isKeyValid) {
signAndAddSignatureToPrivateNotificationMessage(privateNotification);
Expand All @@ -123,7 +132,7 @@ public boolean sendPrivateNotificationMessageIfKeyIsValid(PrivateNotificationPay
UUID.randomUUID().toString());
log.info("Send {} to peer {}. uid={}",
message.getClass().getSimpleName(), peersNodeAddress, message.getUid());
p2PService.sendEncryptedMailboxMessage(peersNodeAddress,
mailboxMessageService.sendEncryptedMailboxMessage(peersNodeAddress,
pubKeyRing,
message,
sendMailboxMessageListener);
Expand All @@ -133,7 +142,9 @@ public boolean sendPrivateNotificationMessageIfKeyIsValid(PrivateNotificationPay
}

public void removePrivateNotification() {
p2PService.removeMailboxMsg(decryptedMessageWithPubKey);
if (privateNotificationMessage != null) {
mailboxMessageService.removeMailboxMsg(privateNotificationMessage);
}
}

private boolean isKeyValid(String privKeyString) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@

package bisq.core.alert;

import bisq.network.p2p.MailboxMessage;
import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.mailbox.MailboxMessage;

import bisq.common.app.Version;
import bisq.common.proto.network.NetworkEnvelope;

import java.util.concurrent.TimeUnit;

import lombok.EqualsAndHashCode;
import lombok.Value;

@EqualsAndHashCode(callSuper = true)
@Value
public class PrivateNotificationMessage extends NetworkEnvelope implements MailboxMessage {
public static final long TTL = TimeUnit.DAYS.toMillis(30);

private final PrivateNotificationPayload privateNotificationPayload;
private final NodeAddress senderNodeAddress;
private final String uid;
Expand Down Expand Up @@ -70,4 +74,9 @@ public static PrivateNotificationMessage fromProto(protobuf.PrivateNotificationM
proto.getUid(),
messageVersion);
}

@Override
public long getTTL() {
return TTL;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@
@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE)
public class TempProposalPayload implements ProcessOncePersistableNetworkPayload, ProtectedStoragePayload,
ExpirablePayload, PersistablePayload {
// We keep data 2 months to be safe if we increase durations of cycle. Also give a bit more resilience in case
// of any issues with the append-only data store
public static final long TTL = TimeUnit.DAYS.toMillis(60);

protected final Proposal proposal;
protected final byte[] ownerPubKeyEncoded;
Expand Down Expand Up @@ -124,8 +127,6 @@ public PublicKey getOwnerPubKey() {

@Override
public long getTTL() {
// We keep data 2 months to be safe if we increase durations of cycle. Also give a bit more resilience in case
// of any issues with the append-only data store
return TimeUnit.DAYS.toMillis(60);
return TTL;
}
}
4 changes: 2 additions & 2 deletions core/src/main/java/bisq/core/dao/node/full/RpcService.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ void setup(ResultHandler resultHandler, Consumer<Throwable> errorHandler) {
nodeConfig.setProperty("node.bitcoind.rpc.port", Integer.toString(rpcPort));
nodeConfig.setProperty("node.bitcoind.notification.block.port", Integer.toString(rpcBlockPort));
nodeConfig.setProperty("node.bitcoind.notification.block.host", rpcBlockHost);
nodeConfig.setProperty("node.bitcoind.notification.alert.port", Integer.toString(bisq.network.p2p.Utils.findFreeSystemPort()));
nodeConfig.setProperty("node.bitcoind.notification.wallet.port", Integer.toString(bisq.network.p2p.Utils.findFreeSystemPort()));
nodeConfig.setProperty("node.bitcoind.notification.alert.port", Integer.toString(bisq.network.utils.Utils.findFreeSystemPort()));
nodeConfig.setProperty("node.bitcoind.notification.wallet.port", Integer.toString(bisq.network.utils.Utils.findFreeSystemPort()));

nodeConfig.setProperty("node.bitcoind.http.auth_scheme", "Basic");
BtcdClientImpl client = new BtcdClientImpl(httpProvider, nodeConfig);
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/bisq/core/filter/Filter.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
@Slf4j
@Value
public final class Filter implements ProtectedStoragePayload, ExpirablePayload {
public static final long TTL = TimeUnit.DAYS.toMillis(180);

private final List<String> bannedOfferIds;
private final List<String> nodeAddressesBannedFromTrading;
private final List<String> bannedAutoConfExplorers;
Expand Down Expand Up @@ -361,7 +363,7 @@ public static Filter fromProto(protobuf.Filter proto) {

@Override
public long getTTL() {
return TimeUnit.DAYS.toMillis(180);
return TTL;
}

@Override
Expand Down
10 changes: 2 additions & 8 deletions core/src/main/java/bisq/core/filter/FilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -490,19 +490,13 @@ private void onFilterAddedFromNetwork(Filter newFilter) {
if (currentFilter != null) {
if (currentFilter.getCreationDate() > newFilter.getCreationDate()) {
log.warn("We received a new filter from the network but the creation date is older than the " +
"filter we have already. We ignore the new filter.\n" +
"New filer={}\n" +
"Old filter={}",
newFilter, filterProperty.get());
"filter we have already. We ignore the new filter.");

addToInvalidFilters(newFilter);
return;
} else {
log.warn("We received a new filter from the network and the creation date is newer than the " +
"filter we have already. We ignore the old filter.\n" +
"New filer={}\n" +
"Old filter={}",
newFilter, filterProperty.get());
"filter we have already. We ignore the old filter.");
addToInvalidFilters(currentFilter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

@Slf4j
public class GetInventoryRequester implements MessageListener, ConnectionListener {
private final static int TIMEOUT_SEC = 90;
private final static int TIMEOUT_SEC = 180;

private final NetworkNode networkNode;
private final NodeAddress nodeAddress;
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/bisq/core/offer/OfferPayload.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
@Getter
@Slf4j
public final class OfferPayload implements ProtectedStoragePayload, ExpirablePayload, RequiresOwnerIsOnlinePayload {
public static final long TTL = TimeUnit.MINUTES.toMillis(9);

///////////////////////////////////////////////////////////////////////////////////////////
// Enum
Expand Down Expand Up @@ -373,7 +374,7 @@ public static OfferPayload fromProto(protobuf.OfferPayload proto) {

@Override
public long getTTL() {
return TimeUnit.MINUTES.toMillis(9);
return TTL;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@
import bisq.core.user.PreferencesPayload;
import bisq.core.user.UserPayload;

import bisq.network.p2p.MailboxMessageList;
import bisq.network.p2p.mailbox.IgnoredMailboxMap;
import bisq.network.p2p.mailbox.MailboxMessageList;
import bisq.network.p2p.peers.peerexchange.PeerList;
import bisq.network.p2p.storage.persistence.RemovedPayloadsMap;
import bisq.network.p2p.storage.persistence.SequenceNumberMap;

import bisq.common.proto.ProtobufferRuntimeException;
Expand Down Expand Up @@ -135,6 +136,8 @@ public PersistableEnvelope fromProto(protobuf.PersistableEnvelope proto) {
return MailboxMessageList.fromProto(proto.getMailboxMessageList(), networkProtoResolver);
case IGNORED_MAILBOX_MAP:
return IgnoredMailboxMap.fromProto(proto.getIgnoredMailboxMap());
case REMOVED_PAYLOADS_MAP:
return RemovedPayloadsMap.fromProto(proto.getRemovedPayloadsMap());
default:
throw new ProtobufferRuntimeException("Unknown proto message case(PB.PersistableEnvelope). " +
"messageCase=" + proto.getMessageCase() + "; proto raw data=" + proto.toString());
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/java/bisq/core/setup/CorePersistedDataHost.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@
import bisq.core.user.Preferences;
import bisq.core.user.User;

import bisq.network.p2p.P2PService;
import bisq.network.p2p.mailbox.IgnoredMailboxService;
import bisq.network.p2p.mailbox.MailboxMessageService;
import bisq.network.p2p.peers.PeerManager;
import bisq.network.p2p.storage.P2PDataStorage;
import bisq.network.p2p.storage.persistence.RemovedPayloadsService;

import bisq.common.config.Config;
import bisq.common.proto.persistable.PersistedDataHost;
Expand Down Expand Up @@ -68,8 +69,9 @@ public static List<PersistedDataHost> getPersistedDataHosts(Injector injector) {
persistedDataHosts.add(injector.getInstance(RefundDisputeListService.class));
persistedDataHosts.add(injector.getInstance(P2PDataStorage.class));
persistedDataHosts.add(injector.getInstance(PeerManager.class));
persistedDataHosts.add(injector.getInstance(P2PService.class));
persistedDataHosts.add(injector.getInstance(MailboxMessageService.class));
persistedDataHosts.add(injector.getInstance(IgnoredMailboxService.class));
persistedDataHosts.add(injector.getInstance(RemovedPayloadsService.class));

if (injector.getInstance(Config.class).daoActivated) {
persistedDataHosts.add(injector.getInstance(BallotListService.class));
Expand Down
Loading

0 comments on commit d3971ef

Please sign in to comment.