Add a callback 'onBusy' used for adaptive rate limiting (#1401)
* Add a callback 'onBusy' used for adaptive rate limiting

Change-Id: I2a1139f6e436744ee6b20557ec18f2181b2e63be
Linary authored Apr 14, 2021
1 parent 39b9474 commit ec974d0
Showing 9 changed files with 36 additions and 16 deletions.
@@ -33,6 +33,7 @@
import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.auth.HugeGraphAuthProxy.Context;
import com.baidu.hugegraph.auth.HugeGraphAuthProxy.ContextThreadPoolExecutor;
import com.baidu.hugegraph.config.CoreOptions;

/**
* GremlinServer with custom ServerGremlinExecutor, which can pass Context
@@ -76,7 +77,7 @@ public void injectTraversalSource(String prefix) {

static ExecutorService newGremlinExecutorService(Settings settings) {
if (settings.gremlinPool == 0) {
settings.gremlinPool = Runtime.getRuntime().availableProcessors();
settings.gremlinPool = CoreOptions.CPUS;
}
int size = settings.gremlinPool;
ThreadFactory factory = ThreadFactoryUtil.create("exec-%d");
@@ -71,7 +71,7 @@ public static synchronized ServerOptions instance() {
"restserver.max_worker_threads",
"The maxmium worker threads of rest server.",
rangeInt(2, Integer.MAX_VALUE),
2 * Runtime.getRuntime().availableProcessors()
2 * CoreOptions.CPUS
);

public static final ConfigOption<Integer> MIN_FREE_MEMORY =
@@ -132,7 +132,7 @@ public static synchronized ServerOptions instance() {
"gremlinserver.max_route",
"The max route number for gremlin server.",
positiveInt(),
2 * Runtime.getRuntime().availableProcessors()
2 * CoreOptions.CPUS
);

public static final ConfigListOption<String> GRAPHS =
@@ -29,6 +29,7 @@
import org.slf4j.Logger;

import com.baidu.hugegraph.HugeException;
import com.baidu.hugegraph.config.CoreOptions;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.config.ServerOptions;
import com.baidu.hugegraph.core.GraphManager;
@@ -218,7 +219,7 @@ private void checkCpu(ExtraParam param) {
if (expectCpus == NO_LIMIT) {
return;
}
int actualCpus = Runtime.getRuntime().availableProcessors();
int actualCpus = CoreOptions.CPUS;
if (actualCpus > expectCpus) {
throw newLicenseException(
"The server's cpus '%s' exceeded the limit '%s'",
@@ -285,6 +285,16 @@ public void onError(PeerId peer, Status status) {
}
}

// NOTE: Jraft itself doesn't have this callback; it's added by us
public void onBusy(PeerId peer, Status status) {
/*
 * If the follower is busy, increase the busy counter;
 * this makes the submitting thread wait longer
 */
int count = RaftNode.this.busyCounter.incrementAndGet();
LOG.info("Increase busy counter: [{}]", count);
}

private boolean isWriteBufferOverflow(Status status) {
String expectMsg = "maybe write overflow";
return RaftError.EINTERNAL == status.getRaftError() &&
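
The onBusy callback above only increments RaftNode's busyCounter; the throttling itself happens on the submitting side, which this hunk does not show. A minimal standalone sketch of how a submitter could consume such a counter to back off adaptively (the class, the waitIfBusy method and the one-second step are illustrative assumptions, not code from this commit):

import java.util.concurrent.atomic.AtomicInteger;

public class BusyBackoffExample {

    private final AtomicInteger busyCounter = new AtomicInteger();

    // Called from the RPC layer when a follower reports that it is busy
    public void onBusy() {
        this.busyCounter.incrementAndGet();
    }

    // Called by the submitting thread before sending the next batch
    public void waitIfBusy() throws InterruptedException {
        int count = this.busyCounter.get();
        if (count <= 0) {
            return;
        }
        // Wait longer the more busy signals have accumulated,
        // then decay the counter so the rate can recover
        Thread.sleep(count * 1000L);
        this.busyCounter.decrementAndGet();
    }
}
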
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -73,6 +74,7 @@ public final class RaftSharedContext {
public static final int WAIT_RPC_TIMEOUT = 30 * 60 * 1000;
// compress block size
public static final int BLOCK_SIZE = 8192;
public static final int QUEUE_SIZE = CoreOptions.CPUS;

public static final String DEFAULT_GROUP = "default";

@@ -362,7 +364,7 @@ private ExecutorService createBackendExecutor(int threads) {
private static ExecutorService newPool(int coreThreads, int maxThreads,
String name,
RejectedExecutionHandler handler) {
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
return ThreadPoolUtil.newBuilder()
.poolName(name)
.enableMetric(false)
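
Swapping the unbounded LinkedBlockingQueue for an ArrayBlockingQueue of QUEUE_SIZE (= CoreOptions.CPUS) entries bounds the backend work queue, so once it fills up the pool's RejectedExecutionHandler takes over instead of tasks piling up without limit. A minimal sketch of the same back-pressure pattern in plain java.util.concurrent (CallerRunsPolicy here is an assumption; the handler actually passed to newPool is not visible in this hunk):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolExample {

    public static ExecutorService newBoundedPool(int threads, int queueSize) {
        // A bounded queue makes the pool push back on producers:
        // once 'queueSize' tasks are waiting, the rejection policy kicks in
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueSize),
                // CallerRunsPolicy throttles submitters by making them
                // execute the task themselves when the queue is full
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
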
@@ -21,6 +21,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

import org.slf4j.Logger;

@@ -103,6 +104,7 @@ public void onApply(Iterator iter) {
LOG.debug("Node role: {}", this.node().selfIsLeader() ?
"leader" : "follower");
StoreClosure closure = null;
List<Future<?>> futures = new ArrayList<>();
try {
while (iter.hasNext()) {
closure = (StoreClosure) iter.done();
@@ -122,9 +124,8 @@
} else {
// Follower needs to read mutation data
byte[] bytes = iter.getData().array();
// Follower seems no way to wait future
// Let the backend thread do it directly
this.context.backendExecutor().submit(() -> {
futures.add(this.context.backendExecutor().submit(() -> {
BytesBuffer buffer = LZ4Util.decompress(bytes,
RaftSharedContext.BLOCK_SIZE);
buffer.forReadWritten();
@@ -137,10 +138,14 @@
action, e);
throw new BackendException("Backend error", e);
}
});
}));
}
iter.next();
}
// Follower waits for all tasks to finish
for (Future<?> future : futures) {
future.get();
}
} catch (Throwable e) {
LOG.error("StateMachine occured critical error", e);
Status status = new Status(RaftError.ESTATEMACHINE,
@@ -150,6 +155,7 @@
closure.failure(status, e);
}
// Will cause current node inactive
// TODO: rollback to correct index
iter.setErrorAndRollback(1L, status);
}
}
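
With this change a follower no longer fires and forgets the backend write: every submitted mutation's Future is collected and joined with future.get() before the apply loop finishes, so the state machine only moves on once the data has really been applied. A small standalone sketch of that submit-then-join pattern (class and method names are illustrative, not taken from the commit):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitThenJoinExample {

    public static void applyBatch(List<Runnable> mutations) throws Exception {
        ExecutorService backend = Executors.newFixedThreadPool(4);
        List<Future<?>> futures = new ArrayList<>();
        for (Runnable mutation : mutations) {
            // Hand each mutation to the backend pool and keep its Future
            futures.add(backend.submit(mutation));
        }
        // Block until every mutation has been applied; an exception in any
        // task surfaces here instead of being silently lost
        for (Future<?> future : futures) {
            future.get();
        }
        backend.shutdown();
    }
}
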
@@ -30,14 +30,14 @@

public class CoreOptions extends OptionHolder {

public static final int CPUS = Runtime.getRuntime().availableProcessors();

private CoreOptions() {
super();
}

private static volatile CoreOptions instance;

private static final int CPUS = Runtime.getRuntime().availableProcessors();

public static synchronized CoreOptions instance() {
if (instance == null) {
instance = new CoreOptions();
@@ -37,12 +37,12 @@
import org.slf4j.Logger;

import com.baidu.hugegraph.HugeException;
import com.baidu.hugegraph.config.CoreOptions;
import com.baidu.hugegraph.task.TaskManager.ContextCallable;

public final class Consumers<V> {

public static final int CPUS = Runtime.getRuntime().availableProcessors();
public static final int THREADS = 4 + CPUS / 4;
public static final int THREADS = 4 + CoreOptions.CPUS / 4;
public static final int QUEUE_WORKER_SIZE = 1000;
public static final long CONSUMER_WAKE_PERIOD = 1;

@@ -240,8 +240,8 @@ public static ExecutorService newThreadPool(String prefix, int workers) {
if (workers < 0) {
assert workers == -1;
workers = Consumers.THREADS;
} else if (workers > Consumers.CPUS * 2) {
workers = Consumers.CPUS * 2;
} else if (workers > CoreOptions.CPUS * 2) {
workers = CoreOptions.CPUS * 2;
}
String name = prefix + "-worker-%d";
return ExecutorUtil.newFixedThreadPool(workers, name);
@@ -262,7 +262,7 @@ public static RuntimeException wrapException(Throwable e) {

public static class ExecutorPool {

private final static int POOL_CAPACITY = 2 * CPUS;
private final static int POOL_CAPACITY = 2 * CoreOptions.CPUS;

private final String threadNamePrefix;
private final int executorWorkers;
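
With the CPU count now read from CoreOptions.CPUS, the sizing rules in Consumers are unchanged: THREADS = 4 + CPUS / 4, and newThreadPool clamps an explicit worker count to 2 * CPUS (for example, on a 16-core machine THREADS is 8 and an oversized request is capped at 32). A compact sketch of that clamping logic, standalone rather than the actual class:

public class WorkerSizingExample {

    static final int CPUS = Runtime.getRuntime().availableProcessors();
    static final int THREADS = 4 + CPUS / 4;

    static int resolveWorkers(int workers) {
        if (workers < 0) {
            // A negative value means "use the default derived from the CPU count"
            return THREADS;
        }
        // Never run more than twice as many workers as cores
        return Math.min(workers, CPUS * 2);
    }
}
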
@@ -493,7 +493,7 @@ public static void initOptions(HugeConfig conf,

// Optimize RocksDB
if (optimize) {
int processors = Runtime.getRuntime().availableProcessors();
int processors = CoreOptions.CPUS;
db.setIncreaseParallelism(Math.max(processors / 2, 1));

db.setAllowConcurrentMemtableWrite(true);