diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/ContextGremlinServer.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/ContextGremlinServer.java
index 1d62def94c..271d599343 100644
--- a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/ContextGremlinServer.java
+++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/ContextGremlinServer.java
@@ -33,6 +33,7 @@
 import com.baidu.hugegraph.HugeGraph;
 import com.baidu.hugegraph.auth.HugeGraphAuthProxy.Context;
 import com.baidu.hugegraph.auth.HugeGraphAuthProxy.ContextThreadPoolExecutor;
+import com.baidu.hugegraph.config.CoreOptions;
 
 /**
  * GremlinServer with custom ServerGremlinExecutor, which can pass Context
@@ -76,7 +77,7 @@ public void injectTraversalSource(String prefix) {
 
     static ExecutorService newGremlinExecutorService(Settings settings) {
         if (settings.gremlinPool == 0) {
-            settings.gremlinPool = Runtime.getRuntime().availableProcessors();
+            settings.gremlinPool = CoreOptions.CPUS;
         }
         int size = settings.gremlinPool;
         ThreadFactory factory = ThreadFactoryUtil.create("exec-%d");
diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java
index 8b41ea3345..b15b0030ca 100644
--- a/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java
+++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java
@@ -71,7 +71,7 @@ public static synchronized ServerOptions instance() {
                     "restserver.max_worker_threads",
                     "The maxmium worker threads of rest server.",
                     rangeInt(2, Integer.MAX_VALUE),
-                    2 * Runtime.getRuntime().availableProcessors()
+                    2 * CoreOptions.CPUS
             );
 
     public static final ConfigOption<Integer> MIN_FREE_MEMORY =
@@ -132,7 +132,7 @@ public static synchronized ServerOptions instance() {
                     "gremlinserver.max_route",
                     "The max route number for gremlin server.",
                     positiveInt(),
-                    2 * Runtime.getRuntime().availableProcessors()
+                    2 * CoreOptions.CPUS
             );
 
     public static final ConfigListOption<String> GRAPHS =
diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/license/LicenseVerifyManager.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/license/LicenseVerifyManager.java
index 5e0d9e67a5..881abfebda 100644
--- a/hugegraph-api/src/main/java/com/baidu/hugegraph/license/LicenseVerifyManager.java
+++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/license/LicenseVerifyManager.java
@@ -29,6 +29,7 @@
 import org.slf4j.Logger;
 
 import com.baidu.hugegraph.HugeException;
+import com.baidu.hugegraph.config.CoreOptions;
 import com.baidu.hugegraph.config.HugeConfig;
 import com.baidu.hugegraph.config.ServerOptions;
 import com.baidu.hugegraph.core.GraphManager;
@@ -218,7 +219,7 @@ private void checkCpu(ExtraParam param) {
         if (expectCpus == NO_LIMIT) {
             return;
         }
-        int actualCpus = Runtime.getRuntime().availableProcessors();
+        int actualCpus = CoreOptions.CPUS;
         if (actualCpus > expectCpus) {
             throw newLicenseException(
                   "The server's cpus '%s' exceeded the limit '%s'",
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java
index 9a7b3ee978..5fbbdf10d2 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java
@@ -285,6 +285,16 @@ public void onError(PeerId peer, Status status) {
             }
         }
 
+        // NOTE: Jraft itself doesn't have this callback, it's added by us
+        public void onBusy(PeerId peer, Status status) {
+            /*
+             * If follower is busy then increase busy counter,
+             * it will lead to submit thread wait more time
+             */
+            int count = RaftNode.this.busyCounter.incrementAndGet();
+            LOG.info("Increase busy counter: [{}]", count);
+        }
+
         private boolean isWriteBufferOverflow(Status status) {
             String expectMsg = "maybe write overflow";
             return RaftError.EINTERNAL == status.getRaftError() &&
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java
index e4c7f44385..92fa81a9e3 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -73,6 +74,7 @@ public final class RaftSharedContext {
     public static final int WAIT_RPC_TIMEOUT = 30 * 60 * 1000;
     // compress block size
     public static final int BLOCK_SIZE = 8192;
+    public static final int QUEUE_SIZE = CoreOptions.CPUS;
 
     public static final String DEFAULT_GROUP = "default";
 
@@ -362,7 +364,7 @@ private ExecutorService createBackendExecutor(int threads) {
     private static ExecutorService newPool(int coreThreads, int maxThreads,
                                            String name,
                                            RejectedExecutionHandler handler) {
-        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
+        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
         return ThreadPoolUtil.newBuilder()
                              .poolName(name)
                              .enableMetric(false)
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java
index 2b8981015c..e450a68979 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java
@@ -21,6 +21,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Future;
 
 import org.slf4j.Logger;
 
@@ -103,6 +104,7 @@ public void onApply(Iterator iter) {
         LOG.debug("Node role: {}", this.node().selfIsLeader() ?
"leader" : "follower"); StoreClosure closure = null; + List> futures = new ArrayList<>(); try { while (iter.hasNext()) { closure = (StoreClosure) iter.done(); @@ -122,9 +124,8 @@ public void onApply(Iterator iter) { } else { // Follower need readMutation data byte[] bytes = iter.getData().array(); - // Follower seems no way to wait future // Let the backend thread do it directly - this.context.backendExecutor().submit(() -> { + futures.add(this.context.backendExecutor().submit(() -> { BytesBuffer buffer = LZ4Util.decompress(bytes, RaftSharedContext.BLOCK_SIZE); buffer.forReadWritten(); @@ -137,10 +138,14 @@ public void onApply(Iterator iter) { action, e); throw new BackendException("Backend error", e); } - }); + })); } iter.next(); } + // Follower wait tasks finished + for (Future future : futures) { + future.get(); + } } catch (Throwable e) { LOG.error("StateMachine occured critical error", e); Status status = new Status(RaftError.ESTATEMACHINE, @@ -150,6 +155,7 @@ public void onApply(Iterator iter) { closure.failure(status, e); } // Will cause current node inactive + // TODO: rollback to correct index iter.setErrorAndRollback(1L, status); } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java index 9d87993c08..24fcf606ae 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/config/CoreOptions.java @@ -30,14 +30,14 @@ public class CoreOptions extends OptionHolder { + public static final int CPUS = Runtime.getRuntime().availableProcessors(); + private CoreOptions() { super(); } private static volatile CoreOptions instance; - private static final int CPUS = Runtime.getRuntime().availableProcessors(); - public static synchronized CoreOptions instance() { if (instance == null) { instance = new CoreOptions(); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/util/Consumers.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/util/Consumers.java index 39d43a5627..69e96ec43c 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/util/Consumers.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/util/Consumers.java @@ -37,12 +37,12 @@ import org.slf4j.Logger; import com.baidu.hugegraph.HugeException; +import com.baidu.hugegraph.config.CoreOptions; import com.baidu.hugegraph.task.TaskManager.ContextCallable; public final class Consumers { - public static final int CPUS = Runtime.getRuntime().availableProcessors(); - public static final int THREADS = 4 + CPUS / 4; + public static final int THREADS = 4 + CoreOptions.CPUS / 4; public static final int QUEUE_WORKER_SIZE = 1000; public static final long CONSUMER_WAKE_PERIOD = 1; @@ -240,8 +240,8 @@ public static ExecutorService newThreadPool(String prefix, int workers) { if (workers < 0) { assert workers == -1; workers = Consumers.THREADS; - } else if (workers > Consumers.CPUS * 2) { - workers = Consumers.CPUS * 2; + } else if (workers > CoreOptions.CPUS * 2) { + workers = CoreOptions.CPUS * 2; } String name = prefix + "-worker-%d"; return ExecutorUtil.newFixedThreadPool(workers, name); @@ -262,7 +262,7 @@ public static RuntimeException wrapException(Throwable e) { public static class ExecutorPool { - private final static int POOL_CAPACITY = 2 * CPUS; + private final static int POOL_CAPACITY = 2 * CoreOptions.CPUS; private final String threadNamePrefix; private final int executorWorkers; diff --git 
a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java index 3e98cda349..c8dbfbcbcb 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java @@ -493,7 +493,7 @@ public static void initOptions(HugeConfig conf, // Optimize RocksDB if (optimize) { - int processors = Runtime.getRuntime().availableProcessors(); + int processors = CoreOptions.CPUS; db.setIncreaseParallelism(Math.max(processors / 2, 1)); db.setAllowConcurrentMemtableWrite(true);
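
Illustrative sketch (not part of the patch above): the raft changes pair a backend executor fed by a bounded queue sized to CoreOptions.CPUS with a follower that keeps the Futures of submitted tasks and blocks on them before the state machine advances. The standalone class below shows that pattern with plain java.util.concurrent types; the class name, pool sizes and the CallerRunsPolicy choice are assumptions standing in for the patch's ThreadPoolUtil builder and the RejectedExecutionHandler it is given.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedBackendExecutorSketch {

    // Same idea as CoreOptions.CPUS: read the processor count once and share it
    private static final int CPUS = Runtime.getRuntime().availableProcessors();

    public static void main(String[] args)
                       throws ExecutionException, InterruptedException {
        // Bounded queue (like ArrayBlockingQueue<>(QUEUE_SIZE)) gives backpressure;
        // CallerRunsPolicy makes a saturated pool push work back onto the submitter
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CPUS, CPUS, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(CPUS),
                new ThreadPoolExecutor.CallerRunsPolicy());

        // Submit the "apply log entry" work and keep the futures,
        // as StoreStateMachine#onApply now does on followers
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < 4 * CPUS; i++) {
            final int entry = i;
            futures.add(executor.submit(() -> {
                // Stand-in for decompressing and applying one raft log entry
                System.out.println("apply entry " + entry);
            }));
        }

        // Wait until every submitted task has finished before moving on
        for (Future<?> future : futures) {
            future.get();
        }
        executor.shutdown();
    }
}

When the queue fills, the submitting thread ends up running tasks itself, which slows submission down; that is a similar backpressure effect to what the busy counter added in RaftNode is after, just expressed with stock JDK pieces.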