Skip to content

Commit

Permalink
Add a callback 'onBusy' used to adaptive rate limit
Browse files Browse the repository at this point in the history
Change-Id: I2a1139f6e436744ee6b20557ec18f2181b2e63be
  • Loading branch information
Linary committed Apr 8, 2021
1 parent 39b9474 commit 2bc2ab8
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,13 @@ public void onError(PeerId peer, Status status) {
}
}

// NOTE: Jraft itself doesn't have this callback, it's added by us
// @Override
public void onBusy(PeerId peer, Status status) {
int count = RaftNode.this.busyCounter.incrementAndGet();
LOG.info("Increase busy counter: [{}]", count);
}

private boolean isWriteBufferOverflow(Status status) {
String expectMsg = "maybe write overflow";
return RaftError.EINTERNAL == status.getRaftError() &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;

import org.apache.commons.io.FileUtils;
Expand Down Expand Up @@ -362,7 +362,7 @@ private ExecutorService createBackendExecutor(int threads) {
private static ExecutorService newPool(int coreThreads, int maxThreads,
String name,
RejectedExecutionHandler handler) {
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
BlockingQueue<Runnable> workQueue = new SynchronousQueue<>();
return ThreadPoolUtil.newBuilder()
.poolName(name)
.enableMetric(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

import org.slf4j.Logger;

Expand Down Expand Up @@ -103,6 +104,7 @@ public void onApply(Iterator iter) {
LOG.debug("Node role: {}", this.node().selfIsLeader() ?
"leader" : "follower");
StoreClosure closure = null;
List<Future<?>> futures = new ArrayList<>();
try {
while (iter.hasNext()) {
closure = (StoreClosure) iter.done();
Expand All @@ -124,7 +126,7 @@ public void onApply(Iterator iter) {
byte[] bytes = iter.getData().array();
// Follower seems no way to wait future
// Let the backend thread do it directly
this.context.backendExecutor().submit(() -> {
futures.add(this.context.backendExecutor().submit(() -> {
BytesBuffer buffer = LZ4Util.decompress(bytes,
RaftSharedContext.BLOCK_SIZE);
buffer.forReadWritten();
Expand All @@ -137,10 +139,14 @@ public void onApply(Iterator iter) {
action, e);
throw new BackendException("Backend error", e);
}
});
}));
}
iter.next();
}
// Follower wait tasks finished
for (Future<?> future : futures) {
future.get();
}
} catch (Throwable e) {
LOG.error("StateMachine occured critical error", e);
Status status = new Status(RaftError.ESTATEMACHINE,
Expand All @@ -150,6 +156,7 @@ public void onApply(Iterator iter) {
closure.failure(status, e);
}
// Will cause current node inactive
// TODO: rollback to correct index
iter.setErrorAndRollback(1L, status);
}
}
Expand Down Expand Up @@ -253,6 +260,7 @@ public void onConfigurationCommitted(Configuration conf) {

@Override
public void onError(final RaftException e) {
// If busy, spin and wait a moment
LOG.error("Raft error: {}", e.getMessage(), e);
}
}

0 comments on commit 2bc2ab8

Please sign in to comment.