From 1e6caafa51b7b1ba5010c7b268754721c91c6ff9 Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Tue, 29 Mar 2022 15:14:11 +0800 Subject: [PATCH] improve raft module Change-Id: I16b3c5d47b35f584252c45843c48111c7cf5d9a5 --- .../backend/store/raft/RaftBackendStore.java | 4 +- .../backend/store/raft/RaftClosure.java | 6 +- .../backend/store/raft/RaftNode.java | 83 ++++++++++--------- .../backend/store/raft/RaftResult.java | 4 + .../backend/store/raft/RaftSharedContext.java | 4 +- .../backend/store/raft/rpc/RpcForwarder.java | 23 ++--- 6 files changed, 71 insertions(+), 53 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStore.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStore.java index 5f9dfb2d8a..c21e3f58e7 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStore.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStore.java @@ -56,7 +56,7 @@ public RaftBackendStore(BackendStore store, RaftSharedContext context) { this.store = store; this.context = context; this.mutationBatch = new ThreadLocal<>(); - this.isSafeRead = this.context.isSafeRead(); + this.isSafeRead = this.context.safeRead(); } public BackendStore originStore() { @@ -228,7 +228,7 @@ public void run(Status status, long index, byte[] reqCtx) { } } }; - this.node().node().readIndex(BytesUtil.EMPTY_BYTES, readIndexClosure); + this.node().readIndex(BytesUtil.EMPTY_BYTES, readIndexClosure); try { return future.waitFinished(); } catch (Throwable e) { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftClosure.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftClosure.java index eb337c72cc..1808bc85b6 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftClosure.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftClosure.java @@ -68,6 +68,10 @@ private RaftResult get() { } } + public void complete(Status status) { + this.future.complete(new RaftResult<>(status)); + } + public void complete(Status status, Supplier callback) { this.future.complete(new RaftResult<>(status, callback)); } @@ -79,7 +83,7 @@ public void failure(Status status, Throwable exception) { @Override public void run(Status status) { if (status.isOk()) { - this.complete(status, () -> null); + this.complete(status); } else { LOG.error("Failed to apply command: {}", status); String msg = "Failed to apply command in raft node with error: " + diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java index d3dd65b0ab..078b9382fc 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java @@ -67,11 +67,11 @@ public RaftNode(RaftSharedContext context) { this.busyCounter = new AtomicInteger(); } - public RaftSharedContext context() { + protected RaftSharedContext context() { return this.context; } - public Node node() { + protected Node node() { assert this.node != null; return this.node; } @@ -111,62 +111,55 @@ public void snapshot() { } } - private Node initRaftNode() throws IOException { - NodeOptions nodeOptions = this.context.nodeOptions(); - nodeOptions.setFsm(this.stateMachine); - // TODO: When support sharding, groupId needs to be bound to shard Id - String groupId = this.context.group(); - PeerId endpoint = this.context.endpoint(); - /* - * Start raft node with shared rpc server: - * return new RaftGroupService(groupId, endpoint, nodeOptions, - * this.context.rpcServer(), true) - * .start(false) - */ - return RaftServiceFactory.createAndInitRaftNode(groupId, endpoint, - nodeOptions); + public void readIndex(byte[] reqCtx, ReadIndexClosure done) { + this.node.readIndex(reqCtx, done); } - private void submitCommand(StoreCommand command, RaftStoreClosure closure) { + public Object submitAndWait(StoreCommand command, RaftStoreClosure future) { + // Submit command to raft node + this.submitCommand(command, future); + + try { + /* + * Here wait for the command to complete: + * 1.If on the leader, wait for the logs has been committed. + * 2.If on the follower, request command will be forwarded to the + * leader, actually it has waited in forwardToLeader(). + */ + return future.waitFinished(); + } catch (Throwable e) { + throw new BackendException("Failed to wait store command %s", + e, command); + } + } + + private void submitCommand(StoreCommand command, RaftStoreClosure future) { // Wait leader elected LeaderInfo leaderInfo = this.waitLeaderElected( RaftSharedContext.NO_TIMEOUT); + // If myself is not leader, forward to the leader if (!leaderInfo.selfIsLeader) { this.context.rpcForwarder().forwardToLeader(leaderInfo.leaderId, - command, closure); + command, future); return; } + // Sleep a while when raft node is busy this.waitIfBusy(); Task task = new Task(); - task.setDone(closure); - // compress return BytesBuffer + // Compress data, note compress() will return a BytesBuffer ByteBuffer buffer = LZ4Util.compress(command.data(), RaftSharedContext.BLOCK_SIZE) .forReadWritten() .asByteBuffer(); - LOG.debug("The bytes size of command(compressed) {} is {}", - command.action(), buffer.limit()); + LOG.debug("Submit to raft node '{}', the compressed bytes of command " + + "{} is {}", this.node, command.action(), buffer.limit()); task.setData(buffer); - LOG.debug("submit to raft node {}", this.node); + task.setDone(future); this.node.apply(task); } - public Object submitAndWait(StoreCommand command, RaftStoreClosure future) { - this.submitCommand(command, future); - try { - /* - * Here will wait future complete, actually the follower has waited - * in forwardToLeader, written like this to simplify the code - */ - return future.waitFinished(); - } catch (Throwable e) { - throw new BackendException("Failed to wait store command %s", - e, command); - } - } - protected LeaderInfo waitLeaderElected(int timeout) { String group = this.context.group(); LeaderInfo leaderInfo = this.leaderInfo.get(); @@ -250,6 +243,22 @@ private void waitIfBusy() { } } + private Node initRaftNode() throws IOException { + NodeOptions nodeOptions = this.context.nodeOptions(); + nodeOptions.setFsm(this.stateMachine); + // TODO: When support sharding, groupId needs to be bound to shard Id + String groupId = this.context.group(); + PeerId endpoint = this.context.endpoint(); + /* + * Start raft node with shared rpc server: + * return new RaftGroupService(groupId, endpoint, nodeOptions, + * this.context.rpcServer(), true) + * .start(false) + */ + return RaftServiceFactory.createAndInitRaftNode(groupId, endpoint, + nodeOptions); + } + @Override public String toString() { return String.format("[%s-%s]", this.context.group(), this.nodeId()); diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftResult.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftResult.java index 5bee760edb..5f26993ffa 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftResult.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftResult.java @@ -30,6 +30,10 @@ public final class RaftResult { private final Supplier callback; private final Throwable exception; + public RaftResult(Status status) { + this(status, () -> null, null); + } + public RaftResult(Status status, Supplier callback) { this(status, callback, null); } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java index 8342d08a0e..3176cafc8d 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java @@ -136,7 +136,7 @@ public RaftSharedContext(HugeGraphParams params) { public void initRaftNode() { this.raftNode = new RaftNode(this); - this.rpcForwarder = new RpcForwarder(this.raftNode); + this.rpcForwarder = new RpcForwarder(this.raftNode.node()); this.raftGroupManager = new RaftGroupManagerImpl(this); } @@ -337,7 +337,7 @@ public PeerId endpoint() { return endpoint; } - public boolean isSafeRead() { + public boolean safeRead() { return this.config().get(CoreOptions.RAFT_SAFE_READ); } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/RpcForwarder.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/RpcForwarder.java index 0151f5040d..2c93d113ee 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/RpcForwarder.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/RpcForwarder.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; +import com.alipay.sofa.jraft.Node; import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.core.NodeImpl; import com.alipay.sofa.jraft.entity.PeerId; @@ -34,7 +35,6 @@ import com.alipay.sofa.jraft.util.Endpoint; import com.baidu.hugegraph.backend.BackendException; import com.baidu.hugegraph.backend.store.raft.RaftClosure; -import com.baidu.hugegraph.backend.store.raft.RaftNode; import com.baidu.hugegraph.backend.store.raft.RaftStoreClosure; import com.baidu.hugegraph.backend.store.raft.StoreCommand; import com.baidu.hugegraph.backend.store.raft.rpc.RaftRequests.CommonResponse; @@ -53,14 +53,14 @@ public class RpcForwarder { private final PeerId nodeId; private final RaftClientService rpcClient; - public RpcForwarder(RaftNode node) { - this.nodeId = node.node().getNodeId().getPeerId(); - this.rpcClient = ((NodeImpl) node.node()).getRpcService(); + public RpcForwarder(Node node) { + this.nodeId = node.getNodeId().getPeerId(); + this.rpcClient = ((NodeImpl) node).getRpcService(); E.checkNotNull(this.rpcClient, "rpc client"); } public void forwardToLeader(PeerId leaderId, StoreCommand command, - RaftStoreClosure closure) { + RaftStoreClosure future) { E.checkNotNull(leaderId, "leader id"); E.checkState(!leaderId.equals(this.nodeId), "Invalid state: current node is the leader, there is " + @@ -80,7 +80,7 @@ public void forwardToLeader(PeerId leaderId, StoreCommand command, public void setResponse(StoreCommandResponse response) { if (response.getStatus()) { LOG.debug("StoreCommandResponse status ok"); - closure.complete(Status.OK(), () -> null); + future.complete(Status.OK(), () -> null); } else { LOG.debug("StoreCommandResponse status error"); Status status = new Status(RaftError.UNKNOWN, @@ -90,13 +90,13 @@ public void setResponse(StoreCommandResponse response) { "is [%s], failed to forward request " + "to leader: %s", leaderId, response.getMessage()); - closure.failure(status, e); + future.failure(status, e); } } @Override public void run(Status status) { - closure.run(status); + future.run(status); } }; this.waitRpc(leaderId.getEndpoint(), request, responseClosure); @@ -112,7 +112,7 @@ public RaftClosure forwardToLeader(PeerId leaderId, this.nodeId, leaderId); RaftClosure future = new RaftClosure<>(); - RpcResponseClosure responseClosure = new RpcResponseClosure() { + RpcResponseClosure responseDone = new RpcResponseClosure() { @Override public void setResponse(T response) { FieldDescriptor fd = response.getDescriptorForType() @@ -142,7 +142,7 @@ public void run(Status status) { future.run(status); } }; - this.waitRpc(leaderId.getEndpoint(), request, responseClosure); + this.waitRpc(leaderId.getEndpoint(), request, responseDone); return future; } @@ -151,7 +151,8 @@ private void waitRpc(Endpoint endpoint, Message request, E.checkNotNull(endpoint, "leader endpoint"); try { this.rpcClient.invokeWithDone(endpoint, request, done, - WAIT_RPC_TIMEOUT).get(); + WAIT_RPC_TIMEOUT) + .get(); } catch (InterruptedException e) { throw new BackendException("Invoke rpc request was interrupted, " + "please try again later", e);