Skip to content

Commit

Permalink
fix leader not wait for apply-task
Browse files Browse the repository at this point in the history
Change-Id: Ib2a87081f7411d4ebfaaef2f8fff0140bb4d3dc0
  • Loading branch information
javeme committed Jan 6, 2022
1 parent 39a750f commit 9204ab1
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -223,17 +223,17 @@ public void run(Status status, long index, byte[] reqCtx) {
future.complete(status, () -> func.apply(query));
} else {
future.failure(status, new BackendException(
"Failed to execute query '%s' with read-index: %s",
query, status));
"Failed to do raft read-index: %s",
status));
}
}
};
this.node().node().readIndex(BytesUtil.EMPTY_BYTES, readIndexClosure);
try {
return future.waitFinished();
} catch (Throwable e) {
LOG.warn("Failed to execute query '{}' with read-index: {}",
query, future.status());
LOG.warn("Failed to execute query '{}': {}",
query, future.status(), e);
throw new BackendException("Failed to execute query: %s", e, query);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

package com.baidu.hugegraph.backend.store.raft;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

import org.slf4j.Logger;
Expand Down Expand Up @@ -70,46 +72,20 @@ private RaftNode node() {
public void onApply(Iterator iter) {
LOG.debug("Node role: {}", this.node().selfIsLeader() ?
"leader" : "follower");
RaftStoreClosure closure = null;
List<Future<?>> futures = new ArrayList<>();
try {
// Apply all the logs
while (iter.hasNext()) {
closure = (RaftStoreClosure) iter.done();
RaftStoreClosure closure = (RaftStoreClosure) iter.done();
if (closure != null) {
// Leader just take it out from the closure
StoreCommand command = closure.command();
BytesBuffer buffer = BytesBuffer.wrap(command.data());
// The first two bytes are StoreType and StoreAction
StoreType type = StoreType.valueOf(buffer.read());
StoreAction action = StoreAction.valueOf(buffer.read());
boolean forwarded = command.forwarded();
// Let the producer thread to handle it
closure.complete(Status.OK(), () -> {
this.applyCommand(type, action, buffer, forwarded);
return null;
});
futures.add(this.onApplyLeader(closure));
} else {
// Follower need readMutation data
byte[] bytes = iter.getData().array();
// Let the backend thread do it directly
futures.add(this.context.backendExecutor().submit(() -> {
BytesBuffer buffer = LZ4Util.decompress(bytes,
RaftSharedContext.BLOCK_SIZE);
buffer.forReadWritten();
StoreType type = StoreType.valueOf(buffer.read());
StoreAction action = StoreAction.valueOf(buffer.read());
try {
this.applyCommand(type, action, buffer, false);
} catch (Throwable e) {
String title = "Failed to execute backend command";
LOG.error("{}: {}", title, action, e);
throw new BackendException(title, e);
}
}));
futures.add(this.onApplyFollower(iter.getData()));
}
iter.next();
}
// Follower wait tasks finished

// Wait for all tasks finished
for (Future<?> future : futures) {
future.get();
}
Expand All @@ -118,17 +94,58 @@ public void onApply(Iterator iter) {
LOG.error("{}", title, e);
Status status = new Status(RaftError.ESTATEMACHINE,
"%s: %s", title, e.getMessage());
if (closure != null) {
closure.failure(status, e);
}
// Will cause current node inactive
// TODO: rollback to correct index
iter.setErrorAndRollback(1L, status);
}
}

private void applyCommand(StoreType type, StoreAction action,
BytesBuffer buffer, boolean forwarded) {
private Future<?> onApplyLeader(RaftStoreClosure closure) {
// Leader just take the command out from the closure
StoreCommand command = closure.command();
BytesBuffer buffer = BytesBuffer.wrap(command.data());
// The first two bytes are StoreType and StoreAction
StoreType type = StoreType.valueOf(buffer.read());
StoreAction action = StoreAction.valueOf(buffer.read());
boolean forwarded = command.forwarded();
// Let the producer thread to handle it, and wait for it
CompletableFuture<Object> future = new CompletableFuture<>();
closure.complete(Status.OK(), () -> {
Object result;
try {
result = this.applyCommand(type, action, buffer, forwarded);
} catch (Throwable e) {
future.completeExceptionally(e);
throw e;
}
future.complete(result);
return result;
});
return future;
}

private Future<?> onApplyFollower(ByteBuffer data) {
// Follower need to read mutation data
byte[] bytes = data.array();
// Let the backend thread do it directly
return this.context.backendExecutor().submit(() -> {
BytesBuffer buffer = LZ4Util.decompress(bytes,
RaftSharedContext.BLOCK_SIZE);
buffer.forReadWritten();
StoreType type = StoreType.valueOf(buffer.read());
StoreAction action = StoreAction.valueOf(buffer.read());
try {
return this.applyCommand(type, action, buffer, false);
} catch (Throwable e) {
String title = "Failed to execute backend command";
LOG.error("{}: {}", title, action, e);
throw new BackendException(title, e);
}
});
}

private Object applyCommand(StoreType type, StoreAction action,
BytesBuffer buffer, boolean forwarded) {
E.checkState(type != StoreType.ALL,
"Can't apply command for all store at one time");
BackendStore store = this.store(type);
Expand Down Expand Up @@ -171,6 +188,7 @@ private void applyCommand(StoreType type, StoreAction action,
default:
throw new IllegalArgumentException("Invalid action " + action);
}
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,20 @@ public StoreCommandProcessor(RaftSharedContext context) {
@Override
public Message processRequest(StoreCommandRequest request,
RpcRequestClosure done) {
LOG.debug("Processing StoreCommandRequest");
LOG.debug("Processing StoreCommandRequest: {}", request.getAction());
RaftNode node = this.context.node();
try {
StoreCommand command = this.parseStoreCommand(request);
RaftStoreClosure closure = new RaftStoreClosure(command);
node.submitAndWait(command, closure);
// TODO: return the submitAndWait() result to rpc client
return StoreCommandResponse.newBuilder().setStatus(true).build();
} catch (Throwable e) {
StoreCommandResponse.Builder builder;
builder = StoreCommandResponse.newBuilder().setStatus(false);
LOG.warn("Failed to process StoreCommandRequest: {}",
request.getAction(), e);
StoreCommandResponse.Builder builder = StoreCommandResponse
.newBuilder()
.setStatus(false);
if (e.getMessage() != null) {
builder.setMessage(e.getMessage());
}
Expand Down

0 comments on commit 9204ab1

Please sign in to comment.