Skip to content

Commit

Permalink
Merge pull request #2617 from HenrikJannsen/use-blocking-calls-for-re…
Browse files Browse the repository at this point in the history
…publish-at-oracle

Use blocking calls at republish at oracle node
  • Loading branch information
HenrikJannsen authored Aug 11, 2024
2 parents 56ed432 + 6bc6bd8 commit 34c4ad5
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import bisq.bonded_roles.security_manager.alert.AuthorizedAlertData;
import bisq.common.application.Service;
import bisq.common.encoding.Hex;
import bisq.common.platform.MemoryReport;
import bisq.common.timer.Scheduler;
import bisq.common.util.CompletableFutureUtils;
import bisq.identity.Identity;
Expand Down Expand Up @@ -95,6 +96,7 @@ public static Bisq1BridgeService.Config from(com.typesafe.config.Config config)
private final PublicKey authorizedPublicKey;
private final boolean ignoreSecurityManager;
private final boolean staticPublicKeysProvided;

@Setter
private AuthorizedOracleNode authorizedOracleNode;
@Setter
Expand All @@ -103,7 +105,7 @@ public static Bisq1BridgeService.Config from(com.typesafe.config.Config config)
private Identity identity;

@Nullable
private Scheduler requestDoaDataScheduler, republishAuthorizedBondedRolesScheduler;
private Scheduler periodicRequestDoaDataScheduler, initialDelayScheduler;

public Bisq1BridgeService(Config config,
NetworkService networkService,
Expand Down Expand Up @@ -137,18 +139,34 @@ public CompletableFuture<Boolean> initialize() {
.whenComplete((result, throwable) -> {
networkService.addConfidentialMessageListener(this);
authorizedBondedRolesService.addListener(this);
requestDoaDataScheduler = Scheduler.run(this::requestDoaData).periodically(60, 5, TimeUnit.SECONDS);
republishAuthorizedBondedRolesScheduler = Scheduler.run(this::republishAuthorizedBondedRoles).after(60, TimeUnit.SECONDS);

initialDelayScheduler = Scheduler.run(() -> {
log.info("Start republishAuthorizedBondedRoles");
republishAuthorizedBondedRoles();
MemoryReport.logReport();
log.info("Completed republishAuthorizedBondedRoles");
log.info("Start request and publish DaoData");
requestDaoData().join();
MemoryReport.logReport();
log.info("Completed request and publish DaoData");
periodicRequestDoaDataScheduler = Scheduler.run(() -> {
log.info("periodicRequestDoaDataScheduler: Start requestDoaData");
requestDaoData().join();
;
MemoryReport.logReport();
log.info("periodicRequestDoaDataScheduler: Completed requestDoaData");
}).periodically(5, TimeUnit.SECONDS);
}).after(60, TimeUnit.SECONDS);
});
}

public CompletableFuture<Boolean> shutdown() {
log.info("shutdown");
if (requestDoaDataScheduler != null) {
requestDoaDataScheduler.stop();
if (periodicRequestDoaDataScheduler != null) {
periodicRequestDoaDataScheduler.stop();
}
if (republishAuthorizedBondedRolesScheduler != null) {
republishAuthorizedBondedRolesScheduler.stop();
if (initialDelayScheduler != null) {
initialDelayScheduler.stop();
}
networkService.removeConfidentialMessageListener(this);
authorizedBondedRolesService.removeListener(this);
Expand Down Expand Up @@ -226,65 +244,83 @@ private CompletableFuture<List<BondedReputationDto>> requestBondedReputations()
}

private CompletableFuture<Boolean> publishProofOfBurnDtoSet(List<ProofOfBurnDto> proofOfBurnList) {
// After v2.1.0 we can remove support for version 0 data
log.info("publishProofOfBurnDtoSet: proofOfBurnList={}", proofOfBurnList);
Stream<AuthorizedProofOfBurnData> oldVersions = proofOfBurnList.stream()
.map(dto -> new AuthorizedProofOfBurnData(
0,
dto.getBlockTime(),
dto.getAmount(),
Hex.decode(dto.getHash()),
dto.getBlockHeight(),
dto.getTxId(),
staticPublicKeysProvided));
Stream<AuthorizedProofOfBurnData> newVersions = proofOfBurnList.stream()
.map(dto -> new AuthorizedProofOfBurnData(
dto.getBlockTime(),
dto.getAmount(),
Hex.decode(dto.getHash()),
dto.getBlockHeight(),
dto.getTxId(),
staticPublicKeysProvided));
return CompletableFutureUtils.allOf(Stream.concat(oldVersions, newVersions)
.map(this::publishAuthorizedData)
.collect(Collectors.toList()))
.thenApply(results -> !results.contains(false));
return CompletableFuture.supplyAsync(() -> {
// After v2.1.0 we can remove support for version 0 data
log.info("publishProofOfBurnDtoSet: proofOfBurnList={}", proofOfBurnList);
Stream<AuthorizedProofOfBurnData> oldVersions = proofOfBurnList.stream()
.map(dto -> new AuthorizedProofOfBurnData(
0,
dto.getBlockTime(),
dto.getAmount(),
Hex.decode(dto.getHash()),
dto.getBlockHeight(),
dto.getTxId(),
staticPublicKeysProvided));
Stream<AuthorizedProofOfBurnData> newVersions = proofOfBurnList.stream()
.map(dto -> new AuthorizedProofOfBurnData(
dto.getBlockTime(),
dto.getAmount(),
Hex.decode(dto.getHash()),
dto.getBlockHeight(),
dto.getTxId(),
staticPublicKeysProvided));
return CompletableFutureUtils.allOf(Stream.concat(oldVersions, newVersions)
.map(this::publishAuthorizedData)
.collect(Collectors.toList()))
.thenApply(results -> !results.contains(false))
.join();
}, NetworkService.NETWORK_IO_POOL);
}

private CompletableFuture<Boolean> publishBondedReputationDtoSet(List<BondedReputationDto> bondedReputationList) {
// After v2.1.0 we can remove support for version 0 data
log.info("publishBondedReputationDtoSet: bondedReputationList={}", bondedReputationList);
Stream<AuthorizedBondedReputationData> oldVersions = bondedReputationList.stream()
.map(dto -> new AuthorizedBondedReputationData(
0,
dto.getBlockTime(),
dto.getAmount(),
Hex.decode(dto.getHash()),
dto.getLockTime(),
dto.getBlockHeight(),
dto.getTxId(),
staticPublicKeysProvided));
Stream<AuthorizedBondedReputationData> newVersions = bondedReputationList.stream()
.map(dto -> new AuthorizedBondedReputationData(
dto.getBlockTime(),
dto.getAmount(),
Hex.decode(dto.getHash()),
dto.getLockTime(),
dto.getBlockHeight(),
dto.getTxId(),
staticPublicKeysProvided));
return CompletableFutureUtils.allOf(Stream.concat(oldVersions, newVersions)
.map(this::publishAuthorizedData)
.collect(Collectors.toList()))
.thenApply(results -> !results.contains(false));
return CompletableFuture.supplyAsync(() -> {
// After v2.1.0 we can remove support for version 0 data
log.info("publishBondedReputationDtoSet: bondedReputationList={}", bondedReputationList);
Stream<AuthorizedBondedReputationData> oldVersions = bondedReputationList.stream()
.map(dto -> new AuthorizedBondedReputationData(
0,
dto.getBlockTime(),
dto.getAmount(),
Hex.decode(dto.getHash()),
dto.getLockTime(),
dto.getBlockHeight(),
dto.getTxId(),
staticPublicKeysProvided));
Stream<AuthorizedBondedReputationData> newVersions = bondedReputationList.stream()
.map(dto -> new AuthorizedBondedReputationData(
dto.getBlockTime(),
dto.getAmount(),
Hex.decode(dto.getHash()),
dto.getLockTime(),
dto.getBlockHeight(),
dto.getTxId(),
staticPublicKeysProvided));
return CompletableFutureUtils.allOf(Stream.concat(oldVersions, newVersions)
.map(this::publishAuthorizedData)
.collect(Collectors.toList()))
.thenApply(results -> !results.contains(false))
.join();
}, NetworkService.NETWORK_IO_POOL);
}

private CompletableFuture<Boolean> publishAuthorizedData(AuthorizedDistributedData data) {
return networkService.publishAuthorizedData(data,
identity.getNetworkIdWithKeyPair().getKeyPair(),
authorizedPrivateKey,
authorizedPublicKey)
.thenApply(broadCastDataResult -> true);
.thenApply(broadCastDataResult -> {
int numSuccess = broadCastDataResult.stream()
.mapToInt(e -> {
try {
e.join();
return 1;
} catch (Exception ex) {
return 0;
}
})
.sum();
return numSuccess == broadCastDataResult.size();
});
}

private CompletableFuture<Boolean> removeAuthorizedData(AuthorizedDistributedData authorizedDistributedData) {
Expand All @@ -294,35 +330,43 @@ private CompletableFuture<Boolean> removeAuthorizedData(AuthorizedDistributedDat
.thenApply(broadCastDataResult -> true);
}

private void republishAuthorizedBondedRoles() {
networkService.getDataService()
.ifPresent(dataService -> {
dataService.getAuthorizedData()
.map(AuthorizedData::getAuthorizedDistributedData)
.filter(authorizedDistributedData -> authorizedDistributedData instanceof AuthorizedBondedRole)
.map(authorizedDistributedData -> (AuthorizedBondedRole) authorizedDistributedData)
.forEach(authorizedBondedRole -> {
Optional<AuthorizedOracleNode> authorizingOracleNode = authorizedBondedRole.getAuthorizingOracleNode();
String bondUserName = authorizedBondedRole.getBondUserName();
if (authorizingOracleNode.isPresent()) {
if (authorizingOracleNode.get().getProfileId().equals(authorizedOracleNode.getProfileId())) {
log.info("Republish AuthorizedBondedRole with bondUserName {}. authorizedOracleNode={}",
bondUserName, authorizedOracleNode.getBondUserName());
publishAuthorizedData(authorizedBondedRole);
} else {
log.info("Cannot republish AuthorizedBondedRole with bondUserName {} because we are not the authorizedOracleNode for that data",
bondUserName);
private boolean republishAuthorizedBondedRoles() {
List<AuthorizedBondedRole> list = networkService.getDataService().stream()
.flatMap(dataService -> dataService.getAuthorizedData()
.map(AuthorizedData::getAuthorizedDistributedData)
.filter(authorizedDistributedData -> authorizedDistributedData instanceof AuthorizedBondedRole)
.map(authorizedDistributedData -> (AuthorizedBondedRole) authorizedDistributedData))
.toList();
int numSuccess = list.stream()
.map(authorizedBondedRole -> {
Optional<AuthorizedOracleNode> authorizingOracleNode = authorizedBondedRole.getAuthorizingOracleNode();
String bondUserName = authorizedBondedRole.getBondUserName();
if (authorizingOracleNode.isPresent()) {
if (authorizingOracleNode.get().getProfileId().equals(authorizedOracleNode.getProfileId())) {
log.info("Republish AuthorizedBondedRole with bondUserName {}. authorizedOracleNode={}",
bondUserName, authorizedOracleNode.getBondUserName());
try {
return publishAuthorizedData(authorizedBondedRole).get();
} catch (Exception e) {
return false;
}
} else {
log.info("Cannot republish AuthorizedBondedRole with bondUserName {} because authorizedOracleNode is missing",
log.info("Cannot republish AuthorizedBondedRole with bondUserName {} because we are not the authorizedOracleNode for that data",
bondUserName);
}
});
});
} else {
log.info("Cannot republish AuthorizedBondedRole with bondUserName {} because authorizedOracleNode is missing",
bondUserName);
}
return false;
}
).mapToInt(success -> success ? 1 : 0)
.sum();
return list.size() == numSuccess;
}

private CompletableFuture<Boolean> requestDoaData() {
log.info("requestDoaData");
private CompletableFuture<Boolean> requestDaoData() {
log.info("requestDaoData");
return requestProofOfBurnTxs()
.thenCompose(this::publishProofOfBurnDtoSet)
.thenCompose(result -> requestBondedReputations())
Expand Down Expand Up @@ -438,7 +482,8 @@ private void processAuthorizeSignedWitnessRequest(AuthorizeSignedWitnessRequest
}
}

private void processBondedRoleRegistrationRequest(BondedRoleRegistrationRequest request, PublicKey senderPublicKey) {
private void processBondedRoleRegistrationRequest(BondedRoleRegistrationRequest request,
PublicKey senderPublicKey) {
log.info("processBondedRoleRegistrationRequest {}", request);
String profileId = request.getProfileId();

Expand Down
3 changes: 2 additions & 1 deletion common/src/main/java/bisq/common/timer/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package bisq.common.timer;

import bisq.common.threading.ExecutorFactory;
import bisq.common.util.StringUtils;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -48,7 +49,7 @@ public static Scheduler run(Runnable task) {
}

public Scheduler name(String threadName) {
this.threadName = Optional.of(threadName);
this.threadName = Optional.of(StringUtils.truncate(threadName, 10));
return this;
}

Expand Down

0 comments on commit 34c4ad5

Please sign in to comment.