Skip to content

Commit

Permalink
feat(all): tune single Thread into SingleThreadExecutor (#5410)
Browse files Browse the repository at this point in the history
  • Loading branch information
halibobo1205 authored Aug 15, 2023
1 parent f33c169 commit a9c4f43
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.tron.common.es;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import lombok.extern.slf4j.Slf4j;

@Slf4j(topic = "common")
public class ExecutorServiceManager {

public static ExecutorService newSingleThreadExecutor(String name) {
return newSingleThreadExecutor(name, false);
}

public static ExecutorService newSingleThreadExecutor(String name, boolean isDaemon) {
return Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat(name).setDaemon(isDaemon).build());
}


public static ScheduledExecutorService newSingleThreadScheduledExecutor(String name) {
return newSingleThreadScheduledExecutor(name, false);
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(String name,
boolean isDaemon) {
return Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(name).setDaemon(isDaemon).build());
}

public static void shutdownAndAwaitTermination(ExecutorService pool, String name) {
if (pool == null) {
return;
}
logger.info("Pool {} shutdown...", name);
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS)) {
logger.warn("Pool {} did not terminate", name);
}
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
logger.info("Pool {} shutdown done", name);
}
}
20 changes: 10 additions & 10 deletions consensus/src/main/java/org/tron/consensus/dpos/DposTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import static org.tron.core.config.Parameter.ChainConstant.BLOCK_PRODUCED_INTERVAL;

import com.google.protobuf.ByteString;
import java.util.concurrent.ExecutorService;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.util.ObjectUtils;
import org.tron.common.es.ExecutorServiceManager;
import org.tron.common.parameter.CommonParameter;
import org.tron.common.utils.ByteArray;
import org.tron.common.utils.Sha256Hash;
Expand All @@ -34,16 +36,18 @@ public class DposTask {
@Setter
private DposService dposService;

private Thread produceThread;
private ExecutorService produceExecutor;

private final String name = "DPosMiner";

private volatile boolean isRunning = true;

public void init() {

if (!dposService.isEnable() || StringUtils.isEmpty(dposService.getMiners())) {
if (!dposService.isEnable() || ObjectUtils.isEmpty(dposService.getMiners())) {
return;
}

produceExecutor = ExecutorServiceManager.newSingleThreadExecutor(name);
Runnable runnable = () -> {
while (isRunning) {
try {
Expand All @@ -67,17 +71,13 @@ public void init() {
}
}
};
produceThread = new Thread(runnable, "DPosMiner");
produceThread.start();
produceExecutor.submit(runnable);
logger.info("DPoS task started.");
}

public void stop() {
isRunning = false;
if (produceThread != null) {
produceThread.interrupt();
}
logger.info("DPoS task stopped.");
ExecutorServiceManager.shutdownAndAwaitTermination(produceExecutor, name);
}

private State produceBlock() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tron.common.backup.BackupManager;
import org.tron.common.es.ExecutorServiceManager;
import org.tron.common.parameter.CommonParameter;
import org.tron.p2p.stats.TrafficStats;

Expand All @@ -29,20 +31,24 @@ public class BackupServer {

private volatile boolean shutdown = false;

private final String name = "BackupServer";
private ExecutorService executor;

@Autowired
public BackupServer(final BackupManager backupManager) {
this.backupManager = backupManager;
}

public void initServer() {
if (port > 0 && commonParameter.getBackupMembers().size() > 0) {
new Thread(() -> {
executor = ExecutorServiceManager.newSingleThreadExecutor(name);
executor.submit(() -> {
try {
start();
} catch (Exception e) {
logger.error("Start backup server failed, {}", e);
}
}, "BackupServer").start();
});
}
}

Expand Down Expand Up @@ -88,12 +94,14 @@ public void initChannel(NioDatagramChannel ch)
public void close() {
logger.info("Closing backup server...");
shutdown = true;
ExecutorServiceManager.shutdownAndAwaitTermination(executor, name);
if (channel != null) {
try {
channel.close().await(10, TimeUnit.SECONDS);
} catch (Exception e) {
logger.warn("Closing backup server failed.", e);
}
}
logger.info("Backup server closed.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ public class TronLogShutdownHook extends ShutdownHookBase {
private static final Duration CHECK_SHUTDOWN_DELAY = Duration.buildByMilliseconds(100);

/**
* The check times before shutdown. default is 50
* The check times before shutdown. default is 60000/100 = 600 times.
*/
private Integer check_times = 50;
private final long check_times = 60 * 1000 / CHECK_SHUTDOWN_DELAY.getMilliseconds();

public TronLogShutdownHook() {
}
Expand Down
25 changes: 17 additions & 8 deletions framework/src/main/java/org/tron/core/db/Manager.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.tron.api.GrpcAPI.TransactionInfoList;
import org.tron.common.args.GenesisBlock;
import org.tron.common.bloom.Bloom;
import org.tron.common.es.ExecutorServiceManager;
import org.tron.common.logsfilter.EventPluginLoader;
import org.tron.common.logsfilter.FilterQuery;
import org.tron.common.logsfilter.capsule.BlockFilterCapsule;
Expand Down Expand Up @@ -253,6 +254,13 @@ public class Manager {
private AtomicInteger blockWaitLock = new AtomicInteger(0);
private Object transactionLock = new Object();

private ExecutorService rePushEs;
private static final String rePushEsName = "repush";
private ExecutorService triggerEs;
private static final String triggerEsName = "event-trigger";
private ExecutorService filterEs;
private static final String filterEsName = "filter";

/**
* Cycle thread to rePush Transactions
*/
Expand Down Expand Up @@ -429,14 +437,17 @@ public BlockingQueue<TransactionCapsule> getRePushTransactions() {

public void stopRePushThread() {
isRunRePushThread = false;
ExecutorServiceManager.shutdownAndAwaitTermination(rePushEs, rePushEsName);
}

public void stopRePushTriggerThread() {
isRunTriggerCapsuleProcessThread = false;
ExecutorServiceManager.shutdownAndAwaitTermination(triggerEs, triggerEsName);
}

public void stopFilterProcessThread() {
isRunFilterProcessThread = false;
ExecutorServiceManager.shutdownAndAwaitTermination(filterEs, filterEsName);
}

@PostConstruct
Expand Down Expand Up @@ -524,21 +535,19 @@ public void init() {
revokingStore.enable();
validateSignService = Executors
.newFixedThreadPool(Args.getInstance().getValidateSignThreadNum());
Thread rePushThread = new Thread(rePushLoop);
rePushThread.setDaemon(true);
rePushThread.start();
rePushEs = ExecutorServiceManager.newSingleThreadExecutor(rePushEsName, true);
rePushEs.submit(rePushLoop);
// add contract event listener for subscribing
if (Args.getInstance().isEventSubscribe()) {
startEventSubscribing();
Thread triggerCapsuleProcessThread = new Thread(triggerCapsuleProcessLoop);
triggerCapsuleProcessThread.setDaemon(true);
triggerCapsuleProcessThread.start();
triggerEs = ExecutorServiceManager.newSingleThreadExecutor(triggerEsName, true);
triggerEs.submit(triggerCapsuleProcessLoop);
}

// start json rpc filter process
if (CommonParameter.getInstance().isJsonRpcFilterEnabled()) {
Thread filterProcessThread = new Thread(filterProcessLoop);
filterProcessThread.start();
filterEs = ExecutorServiceManager.newSingleThreadExecutor(filterEsName);
filterEs.submit(filterProcessLoop);
}

//initStoreFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.tron.common.es.ExecutorServiceManager;
import org.tron.common.parameter.CommonParameter;
import org.tron.common.utils.ByteArray;
import org.tron.common.utils.JsonUtil;
Expand All @@ -27,28 +27,22 @@ public class NodePersistService {
private final boolean isNodePersist = CommonParameter.getInstance().isNodeDiscoveryPersist();
@Autowired
private CommonStore commonStore;
private Timer nodePersistTaskTimer;

private ScheduledExecutorService nodePersistExecutor;

private final String name = "NodePersistTask";

public void init() {
if (isNodePersist) {
nodePersistTaskTimer = new Timer("NodePersistTaskTimer");
nodePersistTaskTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
dbWrite();
}
}, DB_COMMIT_RATE, DB_COMMIT_RATE);
nodePersistExecutor = ExecutorServiceManager.newSingleThreadScheduledExecutor(name);
nodePersistExecutor.scheduleAtFixedRate(this::dbWrite, DB_COMMIT_RATE, DB_COMMIT_RATE,
TimeUnit.MILLISECONDS);
}
}

public void close() {
if (Objects.isNull(nodePersistTaskTimer)) {
return;
}
try {
nodePersistTaskTimer.cancel();
} catch (Exception e) {
logger.error("Close nodePersistTaskTimer failed", e);
if (isNodePersist) {
ExecutorServiceManager.shutdownAndAwaitTermination(nodePersistExecutor, name);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.tron.common.backup;

import java.util.ArrayList;
import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.tron.common.backup.socket.BackupServer;
import org.tron.common.parameter.CommonParameter;
import org.tron.core.Constant;
import org.tron.core.config.args.Args;


public class BackupServerTest {

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private BackupServer backupServer;

@Before
public void setUp() throws Exception {
Args.setParam(new String[]{"-d", temporaryFolder.newFolder().toString()}, Constant.TEST_CONF);
CommonParameter.getInstance().setBackupPort(80);
List<String> members = new ArrayList<>();
members.add("127.0.0.2");
CommonParameter.getInstance().setBackupMembers(members);
BackupManager backupManager = new BackupManager();
backupManager.init();
backupServer = new BackupServer(backupManager);
}

@After
public void tearDown() {
backupServer.close();
Args.clearParam();
}

@Test
public void test() throws InterruptedException {
backupServer.initServer();
}
}

0 comments on commit a9c4f43

Please sign in to comment.