Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve shutdown routine #4927

Merged
merged 3 commits into from
Dec 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion core/src/main/java/bisq/core/app/BisqExecutable.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import bisq.core.dao.DaoSetup;
import bisq.core.dao.node.full.RpcService;
import bisq.core.offer.OpenOfferManager;
import bisq.core.provider.price.PriceFeedService;
import bisq.core.setup.CorePersistedDataHost;
import bisq.core.setup.CoreSetup;
import bisq.core.support.dispute.arbitration.arbitrator.ArbitratorManager;
Expand Down Expand Up @@ -227,12 +228,14 @@ public void gracefulShutDown(ResultHandler resultHandler) {
}

try {
injector.getInstance(PriceFeedService.class).shutDown();
injector.getInstance(ArbitratorManager.class).shutDown();
injector.getInstance(TradeStatisticsManager.class).shutDown();
injector.getInstance(XmrTxProofService.class).shutDown();
injector.getInstance(RpcService.class).shutDown();
injector.getInstance(DaoSetup.class).shutDown();
injector.getInstance(AvoidStandbyModeService.class).shutDown();
log.info("OpenOfferManager shutdown started");
injector.getInstance(OpenOfferManager.class).shutDown(() -> {
log.info("OpenOfferManager shutdown completed");

Expand Down Expand Up @@ -265,7 +268,7 @@ public void gracefulShutDown(ResultHandler resultHandler) {

// Wait max 20 sec.
UserThread.runAfter(() -> {
log.warn("Timeout triggered resultHandler");
log.warn("Graceful shut down not completed in 20 sec. We trigger our timeout handler.");
if (!hasDowngraded) {
// If user tried to downgrade we do not write the persistable data to avoid data corruption
PersistenceManager.flushAllDataToDisk(() -> {
Expand Down
1 change: 0 additions & 1 deletion core/src/main/java/bisq/core/offer/OfferBookService.java
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ public List<Offer> getOffers() {
}

public void removeOfferAtShutDown(OfferPayload offerPayload) {
log.debug("removeOfferAtShutDown " + offerPayload);
removeOffer(offerPayload, null, null);
}

Expand Down
8 changes: 6 additions & 2 deletions core/src/main/java/bisq/core/offer/OpenOfferManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,12 @@ public void shutDown(@Nullable Runnable completeHandler) {
UserThread.execute(() -> openOffers.forEach(
openOffer -> offerBookService.removeOfferAtShutDown(openOffer.getOffer().getOfferPayload())
));
if (completeHandler != null)
UserThread.runAfter(completeHandler, size * 200 + 500, TimeUnit.MILLISECONDS);
if (completeHandler != null) {
// For typical number of offers we are tolerant with delay to give enough time to broadcast.
// If number of offers is very high we limit to 3 sec. to not delay other shutdown routines.
int delay = Math.min(3000, size * 200 + 500);
UserThread.runAfter(completeHandler, delay, TimeUnit.MILLISECONDS);
}
} else {
if (completeHandler != null)
completeHandler.run();
Expand Down
28 changes: 20 additions & 8 deletions core/src/main/java/bisq/core/provider/price/PriceFeedService.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public class PriceFeedService {
private String baseUrlOfRespondingProvider;
@Nullable
private Timer requestTimer;
@Nullable
private PriceRequest priceRequest;


///////////////////////////////////////////////////////////////////////////////////////////
Expand All @@ -115,10 +117,20 @@ public PriceFeedService(@SuppressWarnings("SameParameterValue") PriceNodeHttpCli
// API
///////////////////////////////////////////////////////////////////////////////////////////

public void shutDown() {
if (requestTimer != null) {
requestTimer.stop();
requestTimer = null;
}
if (priceRequest != null) {
priceRequest.shutDown();
}
}

public void setCurrencyCodeOnInit() {
if (getCurrencyCode() == null) {
final TradeCurrency preferredTradeCurrency = preferences.getPreferredTradeCurrency();
final String code = preferredTradeCurrency != null ? preferredTradeCurrency.getCode() : "USD";
TradeCurrency preferredTradeCurrency = preferences.getPreferredTradeCurrency();
String code = preferredTradeCurrency != null ? preferredTradeCurrency.getCode() : "USD";
setCurrencyCode(code);
}
}
Expand Down Expand Up @@ -180,8 +192,8 @@ private void request(boolean repeatRequests) {
}
}, (errorMessage, throwable) -> {
if (throwable instanceof PriceRequestException) {
final String baseUrlOfFaultyRequest = ((PriceRequestException) throwable).priceProviderBaseUrl;
final String baseUrlOfCurrentRequest = priceProvider.getBaseUrl();
String baseUrlOfFaultyRequest = ((PriceRequestException) throwable).priceProviderBaseUrl;
String baseUrlOfCurrentRequest = priceProvider.getBaseUrl();
if (baseUrlOfFaultyRequest != null && baseUrlOfCurrentRequest.equals(baseUrlOfFaultyRequest)) {
log.warn("We received an error: baseUrlOfCurrentRequest={}, baseUrlOfFaultyRequest={}",
baseUrlOfCurrentRequest, baseUrlOfFaultyRequest);
Expand Down Expand Up @@ -223,7 +235,7 @@ private void retryWithNewProvider() {
UserThread.runAfter(() -> {
retryDelay = Math.min(retryDelay + 5, PERIOD_SEC);

final String oldBaseUrl = priceProvider.getBaseUrl();
String oldBaseUrl = priceProvider.getBaseUrl();
setNewPriceProvider();
log.warn("We received an error at the request from provider {}. " +
"We select the new provider {} and use that for a new request. retryDelay was {} sec.", oldBaseUrl, priceProvider.getBaseUrl(), retryDelay);
Expand Down Expand Up @@ -381,15 +393,15 @@ private boolean applyPriceToConsumer() {
}

private void requestAllPrices(PriceProvider provider, Runnable resultHandler, FaultHandler faultHandler) {
PriceRequest priceRequest = new PriceRequest();
priceRequest = new PriceRequest();
SettableFuture<Tuple2<Map<String, Long>, Map<String, MarketPrice>>> future = priceRequest.requestAllPrices(provider);
Futures.addCallback(future, new FutureCallback<Tuple2<Map<String, Long>, Map<String, MarketPrice>>>() {
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Tuple2<Map<String, Long>, Map<String, MarketPrice>> result) {
UserThread.execute(() -> {
checkNotNull(result, "Result must not be null at requestAllPrices");
// Each currency rate has a different timestamp, depending on when
// the pricenode aggregate rate was calculated
// the priceNode aggregate rate was calculated
// However, the request timestamp is when the pricenode was queried
epochInMillisAtLastRequest = System.currentTimeMillis();

Expand Down
17 changes: 14 additions & 3 deletions core/src/main/java/bisq/core/provider/price/PriceProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,18 @@
@Slf4j
public class PriceProvider extends HttpClientProvider {

private boolean shutDownRequested;

// Do not use Guice here as we might create multiple instances
public PriceProvider(HttpClient httpClient, String baseUrl) {
super(httpClient, baseUrl, false);
}

public Tuple2<Map<String, Long>, Map<String, MarketPrice>> getAll() throws IOException {
if (shutDownRequested) {
return new Tuple2<>(new HashMap<>(), new HashMap<>());
}

Map<String, MarketPrice> marketPriceMap = new HashMap<>();
String hsVersion = "";
if (P2PService.getMyNodeAddress() != null)
Expand All @@ -66,10 +72,10 @@ public Tuple2<Map<String, Long>, Map<String, MarketPrice>> getAll() throws IOExc
list.forEach(obj -> {
try {
LinkedTreeMap<?, ?> treeMap = (LinkedTreeMap<?, ?>) obj;
final String currencyCode = (String) treeMap.get("currencyCode");
final double price = (Double) treeMap.get("price");
String currencyCode = (String) treeMap.get("currencyCode");
double price = (Double) treeMap.get("price");
// json uses double for our timestampSec long value...
final long timestampSec = MathUtils.doubleToLong((Double) treeMap.get("timestampSec"));
long timestampSec = MathUtils.doubleToLong((Double) treeMap.get("timestampSec"));
marketPriceMap.put(currencyCode, new MarketPrice(currencyCode, price, timestampSec, true));
} catch (Throwable t) {
log.error(t.toString());
Expand All @@ -83,4 +89,9 @@ public Tuple2<Map<String, Long>, Map<String, MarketPrice>> getAll() throws IOExc
public String getBaseUrl() {
return httpClient.getBaseUrl();
}

public void shutDown() {
shutDownRequested = true;
httpClient.shutDown();
}
}
39 changes: 33 additions & 6 deletions core/src/main/java/bisq/core/provider/price/PriceRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,37 +28,64 @@
import com.google.common.util.concurrent.SettableFuture;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

@Slf4j
public class PriceRequest {
private static final ListeningExecutorService executorService = Utilities.getListeningExecutorService("PriceRequest", 3, 5, 10 * 60);
@Nullable
private PriceProvider provider;
private boolean shutDownRequested;

public PriceRequest() {
}

public SettableFuture<Tuple2<Map<String, Long>, Map<String, MarketPrice>>> requestAllPrices(PriceProvider provider) {
final String baseUrl = provider.getBaseUrl();
final SettableFuture<Tuple2<Map<String, Long>, Map<String, MarketPrice>>> resultFuture = SettableFuture.create();
this.provider = provider;
String baseUrl = provider.getBaseUrl();
SettableFuture<Tuple2<Map<String, Long>, Map<String, MarketPrice>>> resultFuture = SettableFuture.create();
ListenableFuture<Tuple2<Map<String, Long>, Map<String, MarketPrice>>> future = executorService.submit(() -> {
Thread.currentThread().setName("PriceRequest-" + provider.getBaseUrl());
Thread.currentThread().setName("PriceRequest-" + baseUrl);
return provider.getAll();
});

Futures.addCallback(future, new FutureCallback<Tuple2<Map<String, Long>, Map<String, MarketPrice>>>() {
Futures.addCallback(future, new FutureCallback<>() {
public void onSuccess(Tuple2<Map<String, Long>, Map<String, MarketPrice>> marketPriceTuple) {
log.trace("Received marketPriceTuple of {}\nfrom provider {}", marketPriceTuple, provider);
resultFuture.set(marketPriceTuple);
if (!shutDownRequested) {
resultFuture.set(marketPriceTuple);
}

}

public void onFailure(@NotNull Throwable throwable) {
resultFuture.setException(new PriceRequestException(throwable, baseUrl));
if (!shutDownRequested) {
resultFuture.setException(new PriceRequestException(throwable, baseUrl));
}
}
}, MoreExecutors.directExecutor());

return resultFuture;
}

public void shutDown() {
shutDownRequested = true;
if (provider != null) {
provider.shutDown();
}

executorService.shutdown();
try {
if (!executorService.awaitTermination(1, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
}
}
2 changes: 2 additions & 0 deletions p2p/src/main/java/bisq/network/http/HttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ String requestWithGETNoProxy(String param,
String getUid();

String getBaseUrl();

void shutDown();
}
27 changes: 24 additions & 3 deletions p2p/src/main/java/bisq/network/http/HttpClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ public class HttpClientImpl implements HttpClient {
private String baseUrl;
private boolean ignoreSocks5Proxy;
private final String uid;
@Nullable
private HttpURLConnection connection;
@Nullable
private CloseableHttpClient httpclient;

@Inject
public HttpClientImpl(@Nullable Socks5ProxyProvider socks5ProxyProvider) {
Expand All @@ -76,6 +80,19 @@ public HttpClientImpl(String baseUrl) {
uid = UUID.randomUUID().toString();
}

@Override
public void shutDown() {
if (connection != null) {
connection.disconnect();
}
if (httpclient != null) {
try {
httpclient.close();
} catch (IOException ignore) {
}
}
}

@Override
public void setBaseUrl(String baseUrl) {
this.baseUrl = baseUrl;
Expand Down Expand Up @@ -117,7 +134,6 @@ public String requestWithGET(String param,
public String requestWithGETNoProxy(String param,
@Nullable String headerKey,
@Nullable String headerValue) throws IOException {
HttpURLConnection connection = null;
log.debug("Executing HTTP request " + baseUrl + param + " proxy: none.");
URL url = new URL(baseUrl + param);
try {
Expand Down Expand Up @@ -177,7 +193,8 @@ private String doRequestWithGETProxy(String param,
PoolingHttpClientConnectionManager cm = socks5Proxy.resolveAddrLocally() ?
new PoolingHttpClientConnectionManager(reg) :
new PoolingHttpClientConnectionManager(reg, new FakeDnsResolver());
try (CloseableHttpClient httpclient = HttpClients.custom().setConnectionManager(cm).build()) {
try {
httpclient = HttpClients.custom().setConnectionManager(cm).build();
InetSocketAddress socksAddress = new InetSocketAddress(socks5Proxy.getInetAddress(), socks5Proxy.getPort());

// remove me: Use this to test with system-wide Tor proxy, or change port for another proxy.
Expand All @@ -191,11 +208,15 @@ private String doRequestWithGETProxy(String param,
request.setHeader(headerKey, headerValue);

log.debug("Executing request " + request + " proxy: " + socksAddress);
try (CloseableHttpResponse response = httpclient.execute(request, context)) {
try (CloseableHttpResponse response = checkNotNull(httpclient).execute(request, context)) {
return convertInputStreamToString(response.getEntity().getContent());
}
} catch (Throwable t) {
throw new IOException("Error at requestWithGETProxy with URL: " + (baseUrl + param) + ". Throwable=" + t.getMessage());
} finally {
if (httpclient != null) {
httpclient.close();
}
}
}

Expand Down