-
Notifications
You must be signed in to change notification settings - Fork 521
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a callback 'onBusy' used to adaptive rate limit #1401
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ | |
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.concurrent.Future; | ||
|
||
import org.slf4j.Logger; | ||
|
||
|
@@ -103,6 +104,7 @@ public void onApply(Iterator iter) { | |
LOG.debug("Node role: {}", this.node().selfIsLeader() ? | ||
"leader" : "follower"); | ||
StoreClosure closure = null; | ||
List<Future<?>> futures = new ArrayList<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. move to line 138 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. line 156 need use it |
||
try { | ||
while (iter.hasNext()) { | ||
closure = (StoreClosure) iter.done(); | ||
|
@@ -122,9 +124,8 @@ public void onApply(Iterator iter) { | |
} else { | ||
// Follower need readMutation data | ||
byte[] bytes = iter.getData().array(); | ||
// Follower seems no way to wait future | ||
// Let the backend thread do it directly | ||
this.context.backendExecutor().submit(() -> { | ||
futures.add(this.context.backendExecutor().submit(() -> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how many tasks in general? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. about 10 ~ 20 |
||
BytesBuffer buffer = LZ4Util.decompress(bytes, | ||
RaftSharedContext.BLOCK_SIZE); | ||
buffer.forReadWritten(); | ||
|
@@ -137,10 +138,14 @@ public void onApply(Iterator iter) { | |
action, e); | ||
throw new BackendException("Backend error", e); | ||
} | ||
}); | ||
})); | ||
} | ||
iter.next(); | ||
} | ||
// Follower wait tasks finished | ||
for (Future<?> future : futures) { | ||
future.get(); | ||
} | ||
} catch (Throwable e) { | ||
LOG.error("StateMachine occured critical error", e); | ||
Status status = new Status(RaftError.ESTATEMACHINE, | ||
|
@@ -150,6 +155,7 @@ public void onApply(Iterator iter) { | |
closure.failure(status, e); | ||
} | ||
// Will cause current node inactive | ||
// TODO: rollback to correct index | ||
iter.setErrorAndRollback(1L, status); | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seperate into 2 commits:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done