Add a callback 'onBusy' used for adaptive rate limiting #1401

Merged: 4 commits, Apr 14, 2021
@@ -33,6 +33,7 @@
import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.auth.HugeGraphAuthProxy.Context;
import com.baidu.hugegraph.auth.HugeGraphAuthProxy.ContextThreadPoolExecutor;
import com.baidu.hugegraph.config.CoreOptions;

/**
* GremlinServer with custom ServerGremlinExecutor, which can pass Context
@@ -76,7 +77,7 @@ public void injectTraversalSource(String prefix) {

static ExecutorService newGremlinExecutorService(Settings settings) {
if (settings.gremlinPool == 0) {
settings.gremlinPool = Runtime.getRuntime().availableProcessors();
settings.gremlinPool = CoreOptions.CPUS;
}
int size = settings.gremlinPool;
ThreadFactory factory = ThreadFactoryUtil.create("exec-%d");
@@ -71,7 +71,7 @@ public static synchronized ServerOptions instance() {
"restserver.max_worker_threads",
"The maxmium worker threads of rest server.",
rangeInt(2, Integer.MAX_VALUE),
2 * Runtime.getRuntime().availableProcessors()
2 * CoreOptions.CPUS
);

public static final ConfigOption<Integer> MIN_FREE_MEMORY =
@@ -132,7 +132,7 @@ public static synchronized ServerOptions instance() {
"gremlinserver.max_route",
"The max route number for gremlin server.",
positiveInt(),
2 * Runtime.getRuntime().availableProcessors()
2 * CoreOptions.CPUS
);

public static final ConfigListOption<String> GRAPHS =
@@ -29,6 +29,7 @@
import org.slf4j.Logger;

import com.baidu.hugegraph.HugeException;
import com.baidu.hugegraph.config.CoreOptions;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.config.ServerOptions;
import com.baidu.hugegraph.core.GraphManager;
@@ -218,7 +219,7 @@ private void checkCpu(ExtraParam param) {
if (expectCpus == NO_LIMIT) {
return;
}
int actualCpus = Runtime.getRuntime().availableProcessors();
int actualCpus = CoreOptions.CPUS;
if (actualCpus > expectCpus) {
throw newLicenseException(
"The server's cpus '%s' exceeded the limit '%s'",
@@ -285,6 +285,16 @@ public void onError(PeerId peer, Status status) {
}
}

// NOTE: Jraft itself doesn't have this callback, it's added by us
public void onBusy(PeerId peer, Status status) {
Contributor:
Separate into 2 commits:
  1. raft commit for: raft/* and raft-tools.sh
  2. task commit for: TaskManager and rest-server.properties

Contributor Author:
Done.

/*
* If follower is busy then increase busy counter,
* it will lead to submit thread wait more time
*/
int count = RaftNode.this.busyCounter.incrementAndGet();
LOG.info("Increase busy counter: [{}]", count);
}
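
This hunk only shows the producer side of the busy counter. A minimal sketch of how the consumer side could use it for adaptive backoff on the submit path, assuming hypothetical names (`waitIfBusy`, `BUSY_BACKOFF_MS`) that are not part of this PR:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class BusyBackoffSketch {

    // Incremented by onBusy() when a follower reports it is overloaded
    private final AtomicInteger busyCounter = new AtomicInteger();

    // Hypothetical base delay, not a value taken from this PR
    private static final long BUSY_BACKOFF_MS = 100L;

    public void onBusy() {
        this.busyCounter.incrementAndGet();
    }

    /*
     * Called on the submit path before forwarding data: the more often
     * followers reported "busy", the longer the writer sleeps, which
     * adaptively throttles the write rate.
     */
    public void waitIfBusy() throws InterruptedException {
        int count = this.busyCounter.get();
        if (count > 0) {
            Thread.sleep(count * BUSY_BACKOFF_MS);
            // Decay the counter once we have backed off
            this.busyCounter.decrementAndGet();
        }
    }
}
```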

private boolean isWriteBufferOverflow(Status status) {
String expectMsg = "maybe write overflow";
return RaftError.EINTERNAL == status.getRaftError() &&
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -73,6 +74,7 @@ public final class RaftSharedContext {
public static final int WAIT_RPC_TIMEOUT = 30 * 60 * 1000;
// compress block size
public static final int BLOCK_SIZE = 8192;
public static final int QUEUE_SIZE = CoreOptions.CPUS;

public static final String DEFAULT_GROUP = "default";

@@ -362,7 +364,7 @@ private ExecutorService createBackendExecutor(int threads) {
private static ExecutorService newPool(int coreThreads, int maxThreads,
String name,
RejectedExecutionHandler handler) {
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
return ThreadPoolUtil.newBuilder()
.poolName(name)
.enableMetric(false)
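
Replacing the unbounded LinkedBlockingQueue with an ArrayBlockingQueue of size QUEUE_SIZE (CoreOptions.CPUS) means the pool's RejectedExecutionHandler fires as soon as CPUS tasks are queued, instead of letting the backlog grow without bound. A rough sketch of the same backpressure idea with a plain ThreadPoolExecutor; the CallerRunsPolicy here is illustrative and not necessarily the handler passed to newPool:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolSketch {

    public static ThreadPoolExecutor newBoundedPool(int coreThreads,
                                                    int maxThreads,
                                                    int queueSize) {
        // Bounded queue: once it fills up, the rejection handler is invoked,
        // which is what provides backpressure toward the submitting thread.
        ArrayBlockingQueue<Runnable> workQueue =
                new ArrayBlockingQueue<>(queueSize);

        // Illustrative policy: let the caller run the task itself,
        // slowing down producers when consumers fall behind.
        RejectedExecutionHandler handler =
                new ThreadPoolExecutor.CallerRunsPolicy();

        return new ThreadPoolExecutor(coreThreads, maxThreads,
                                      60L, TimeUnit.SECONDS,
                                      workQueue, handler);
    }
}
```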
@@ -21,6 +21,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

import org.slf4j.Logger;

@@ -103,6 +104,7 @@ public void onApply(Iterator iter) {
LOG.debug("Node role: {}", this.node().selfIsLeader() ?
"leader" : "follower");
StoreClosure closure = null;
List<Future<?>> futures = new ArrayList<>();
Contributor:
Move to line 138.

Contributor Author:
Line 156 needs to use it.

try {
while (iter.hasNext()) {
closure = (StoreClosure) iter.done();
@@ -122,9 +124,8 @@
} else {
// Follower need readMutation data
byte[] bytes = iter.getData().array();
// Follower seems no way to wait future
// Let the backend thread do it directly
this.context.backendExecutor().submit(() -> {
futures.add(this.context.backendExecutor().submit(() -> {
Contributor:
How many tasks in general?

Contributor Author (@Linary, Apr 2, 2021):
About 10 ~ 20.

BytesBuffer buffer = LZ4Util.decompress(bytes,
RaftSharedContext.BLOCK_SIZE);
buffer.forReadWritten();
@@ -137,10 +138,14 @@
action, e);
throw new BackendException("Backend error", e);
}
});
}));
}
iter.next();
}
// Follower wait tasks finished
for (Future<?> future : futures) {
future.get();
}
} catch (Throwable e) {
LOG.error("StateMachine occured critical error", e);
Status status = new Status(RaftError.ESTATEMACHINE,
@@ -150,6 +155,7 @@
closure.failure(status, e);
}
// Will cause current node inactive
// TODO: rollback to correct index
iter.setErrorAndRollback(1L, status);
}
}
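
Collecting the follower's backend writes into futures and then calling get() on each turns the previously fire-and-forget submits into a barrier: onApply only moves on once every write has finished, and a failed write resurfaces from get() as an ExecutionException inside the surrounding catch block. A standalone sketch of that pattern (not the HugeGraph classes themselves):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ApplyBarrierSketch {

    public static void applyAll(List<Runnable> mutations) throws Exception {
        ExecutorService backend = Executors.newFixedThreadPool(4);
        List<Future<?>> futures = new ArrayList<>();
        try {
            // Submit every mutation in parallel, keeping the handles
            for (Runnable mutation : mutations) {
                futures.add(backend.submit(mutation));
            }
            // Barrier: wait for all writes; a failed task re-throws here
            for (Future<?> future : futures) {
                try {
                    future.get();
                } catch (ExecutionException e) {
                    // Unwrap so the caller sees the original backend error
                    throw new RuntimeException("Backend write failed",
                                               e.getCause());
                }
            }
        } finally {
            backend.shutdown();
        }
    }
}
```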
@@ -30,14 +30,14 @@

public class CoreOptions extends OptionHolder {

public static final int CPUS = Runtime.getRuntime().availableProcessors();

private CoreOptions() {
super();
}

private static volatile CoreOptions instance;

private static final int CPUS = Runtime.getRuntime().availableProcessors();

public static synchronized CoreOptions instance() {
if (instance == null) {
instance = new CoreOptions();
@@ -37,12 +37,12 @@
import org.slf4j.Logger;

import com.baidu.hugegraph.HugeException;
import com.baidu.hugegraph.config.CoreOptions;
import com.baidu.hugegraph.task.TaskManager.ContextCallable;

public final class Consumers<V> {

public static final int CPUS = Runtime.getRuntime().availableProcessors();
public static final int THREADS = 4 + CPUS / 4;
public static final int THREADS = 4 + CoreOptions.CPUS / 4;
public static final int QUEUE_WORKER_SIZE = 1000;
public static final long CONSUMER_WAKE_PERIOD = 1;

@@ -240,8 +240,8 @@ public static ExecutorService newThreadPool(String prefix, int workers) {
if (workers < 0) {
assert workers == -1;
workers = Consumers.THREADS;
} else if (workers > Consumers.CPUS * 2) {
workers = Consumers.CPUS * 2;
} else if (workers > CoreOptions.CPUS * 2) {
workers = CoreOptions.CPUS * 2;
}
String name = prefix + "-worker-%d";
return ExecutorUtil.newFixedThreadPool(workers, name);
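
The pool sizes here are all derived from the CPU count: on an 8-core machine THREADS works out to 4 + 8 / 4 = 6, and the cap applied by newThreadPool (as well as POOL_CAPACITY below) is 2 * 8 = 16. A small standalone sketch of that arithmetic, separate from the Consumers class:

```java
public class PoolSizingSketch {

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();

        // Default consumer threads: 4 plus a quarter of the cores
        int threads = 4 + cpus / 4;

        // Upper bound on requested workers: at most twice the cores
        int maxWorkers = 2 * cpus;

        System.out.println("cpus=" + cpus +
                           " threads=" + threads +
                           " maxWorkers=" + maxWorkers);
    }
}
```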
@@ -262,7 +262,7 @@ public static RuntimeException wrapException(Throwable e) {

public static class ExecutorPool {

private final static int POOL_CAPACITY = 2 * CPUS;
private final static int POOL_CAPACITY = 2 * CoreOptions.CPUS;

private final String threadNamePrefix;
private final int executorWorkers;
@@ -493,7 +493,7 @@ public static void initOptions(HugeConfig conf,

// Optimize RocksDB
if (optimize) {
int processors = Runtime.getRuntime().availableProcessors();
int processors = CoreOptions.CPUS;
db.setIncreaseParallelism(Math.max(processors / 2, 1));

db.setAllowConcurrentMemtableWrite(true);
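
The RocksDB parallelism is set to half the CPU count with a floor of one, so a 16-core host gets 8 background threads. A minimal sketch with the RocksDB Java API, assuming a plain org.rocksdb.Options object stands in for the options configured here:

```java
import org.rocksdb.Options;
import org.rocksdb.RocksDB;

public class RocksDBTuningSketch {

    public static Options optimizedOptions(int cpus) {
        RocksDB.loadLibrary();
        Options options = new Options();
        // Background thread pool sized to half the cores, at least one
        options.setIncreaseParallelism(Math.max(cpus / 2, 1));
        // Allow concurrent writes into the memtable
        options.setAllowConcurrentMemtableWrite(true);
        return options;
    }
}
```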