From 2bc2ab864caa08624b3774d14d824c23397cbbee Mon Sep 17 00:00:00 2001 From: liningrui Date: Thu, 8 Apr 2021 17:59:47 +0800 Subject: [PATCH] Add a callback 'onBusy' used to adaptive rate limit Change-Id: I2a1139f6e436744ee6b20557ec18f2181b2e63be --- .../baidu/hugegraph/backend/store/raft/RaftNode.java | 7 +++++++ .../backend/store/raft/RaftSharedContext.java | 4 ++-- .../backend/store/raft/StoreStateMachine.java | 12 ++++++++++-- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java index 9a7b3ee978..7c64e1fef0 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java @@ -285,6 +285,13 @@ public void onError(PeerId peer, Status status) { } } + // NOTE: Jraft itself doesn't have this callback, it's added by us + // @Override + public void onBusy(PeerId peer, Status status) { + int count = RaftNode.this.busyCounter.incrementAndGet(); + LOG.info("Increase busy counter: [{}]", count); + } + private boolean isWriteBufferOverflow(Status status) { String expectMsg = "maybe write overflow"; return RaftError.EINTERNAL == status.getRaftError() && diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java index e4c7f44385..56ee6c0548 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java @@ -25,9 +25,9 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.io.FileUtils; @@ -362,7 +362,7 @@ private ExecutorService createBackendExecutor(int threads) { private static ExecutorService newPool(int coreThreads, int maxThreads, String name, RejectedExecutionHandler handler) { - BlockingQueue workQueue = new LinkedBlockingQueue<>(); + BlockingQueue workQueue = new SynchronousQueue<>(); return ThreadPoolUtil.newBuilder() .poolName(name) .enableMetric(false) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java index 2b8981015c..07caa54fa9 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Future; import org.slf4j.Logger; @@ -103,6 +104,7 @@ public void onApply(Iterator iter) { LOG.debug("Node role: {}", this.node().selfIsLeader() ? "leader" : "follower"); StoreClosure closure = null; + List> futures = new ArrayList<>(); try { while (iter.hasNext()) { closure = (StoreClosure) iter.done(); @@ -124,7 +126,7 @@ public void onApply(Iterator iter) { byte[] bytes = iter.getData().array(); // Follower seems no way to wait future // Let the backend thread do it directly - this.context.backendExecutor().submit(() -> { + futures.add(this.context.backendExecutor().submit(() -> { BytesBuffer buffer = LZ4Util.decompress(bytes, RaftSharedContext.BLOCK_SIZE); buffer.forReadWritten(); @@ -137,10 +139,14 @@ public void onApply(Iterator iter) { action, e); throw new BackendException("Backend error", e); } - }); + })); } iter.next(); } + // Follower wait tasks finished + for (Future future : futures) { + future.get(); + } } catch (Throwable e) { LOG.error("StateMachine occured critical error", e); Status status = new Status(RaftError.ESTATEMACHINE, @@ -150,6 +156,7 @@ public void onApply(Iterator iter) { closure.failure(status, e); } // Will cause current node inactive + // TODO: rollback to correct index iter.setErrorAndRollback(1L, status); } } @@ -253,6 +260,7 @@ public void onConfigurationCommitted(Configuration conf) { @Override public void onError(final RaftException e) { + // If busy, spin and wait a moment LOG.error("Raft error: {}", e.getMessage(), e); } }