From 842b315476385795014035d52fc372a2f8960c9a Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Tue, 4 Jan 2022 10:29:05 +0800 Subject: [PATCH] improve raft moddule Change-Id: I016d3fcc4ab50614afdf452e7c9691ee3cc3c70b --- hugegraph-core/pom.xml | 2 +- .../backend/store/raft/RaftNode.java | 20 +++++---- .../backend/store/raft/RaftSharedContext.java | 45 +++++++++++-------- 3 files changed, 39 insertions(+), 28 deletions(-) diff --git a/hugegraph-core/pom.xml b/hugegraph-core/pom.xml index ca06fc3120..1ca2e0413c 100644 --- a/hugegraph-core/pom.xml +++ b/hugegraph-core/pom.xml @@ -54,7 +54,7 @@ com.alipay.sofa jraft-core - 1.3.5 + 1.3.9 org.slf4j diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java index 74f8e14db3..d3dd65b0ab 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java @@ -28,7 +28,7 @@ import org.slf4j.Logger; import com.alipay.sofa.jraft.Node; -import com.alipay.sofa.jraft.RaftGroupService; +import com.alipay.sofa.jraft.RaftServiceFactory; import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.closure.ReadIndexClosure; import com.alipay.sofa.jraft.core.Replicator.ReplicatorStateListener; @@ -36,7 +36,6 @@ import com.alipay.sofa.jraft.entity.Task; import com.alipay.sofa.jraft.error.RaftError; import com.alipay.sofa.jraft.option.NodeOptions; -import com.alipay.sofa.jraft.rpc.RpcServer; import com.alipay.sofa.jraft.util.BytesUtil; import com.baidu.hugegraph.backend.BackendException; import com.baidu.hugegraph.util.LZ4Util; @@ -58,6 +57,7 @@ public RaftNode(RaftSharedContext context) { this.stateMachine = new StoreStateMachine(context); try { this.node = this.initRaftNode(); + LOG.info("Start raft node: {}", this); } catch (IOException e) { throw new BackendException("Failed to init raft node", e); } @@ -94,6 +94,7 @@ public void onLeaderInfoChange(PeerId leaderId, boolean selfIsLeader) { } public void shutdown() { + LOG.info("Shutdown raft node: {}", this); this.node.shutdown(); } @@ -116,13 +117,14 @@ private Node initRaftNode() throws IOException { // TODO: When support sharding, groupId needs to be bound to shard Id String groupId = this.context.group(); PeerId endpoint = this.context.endpoint(); - RpcServer rpcServer = this.context.rpcServer(); - RaftGroupService raftGroupService; - // Shared rpc server - raftGroupService = new RaftGroupService(groupId, endpoint, nodeOptions, - rpcServer, true); - // Start node - return raftGroupService.start(false); + /* + * Start raft node with shared rpc server: + * return new RaftGroupService(groupId, endpoint, nodeOptions, + * this.context.rpcServer(), true) + * .start(false) + */ + return RaftServiceFactory.createAndInitRaftNode(groupId, endpoint, + nodeOptions); } private void submitCommand(StoreCommand command, RaftStoreClosure closure) { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java index 75a512da41..7858bca8d0 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java @@ -30,14 +30,15 @@ import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; -import com.alipay.sofa.jraft.option.ReadOnlyOption; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; +import com.alipay.sofa.jraft.NodeManager; import com.alipay.sofa.jraft.conf.Configuration; import com.alipay.sofa.jraft.entity.PeerId; import com.alipay.sofa.jraft.option.NodeOptions; import com.alipay.sofa.jraft.option.RaftOptions; +import com.alipay.sofa.jraft.option.ReadOnlyOption; import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory; import com.alipay.sofa.jraft.rpc.RpcServer; import com.alipay.sofa.jraft.util.NamedThreadFactory; @@ -101,7 +102,7 @@ public final class RaftSharedContext { public RaftSharedContext(HugeGraphParams params) { this.params = params; - HugeConfig config = params.configuration(); + HugeConfig config = this.config(); this.schemaStoreName = config.get(CoreOptions.STORE_SCHEMA); this.graphStoreName = config.get(CoreOptions.STORE_GRAPH); @@ -126,12 +127,7 @@ public RaftSharedContext(HugeGraphParams params) { this.raftGroupManager = null; this.rpcForwarder = null; this.registerRpcRequestProcessors(); - } - - private void registerRpcRequestProcessors() { - this.rpcServer.registerProcessor(new StoreCommandProcessor(this)); - this.rpcServer.registerProcessor(new SetLeaderProcessor(this)); - this.rpcServer.registerProcessor(new ListPeersProcessor(this)); + LOG.info("Start raft server successfully: {}", this.endpoint()); } public void initRaftNode() { @@ -149,8 +145,14 @@ public void waitRaftNodeStarted() { } public void close() { - LOG.info("Stopping raft nodes"); - this.rpcServer.shutdown(); + LOG.info("Stop raft server: {}", this.endpoint()); + + RaftNode node = this.node(); + if (node != null) { + node.shutdown(); + } + + this.shutdownRpcServer(); } public RaftNode node() { @@ -168,10 +170,6 @@ public RaftGroupManager raftNodeManager(String group) { return this.raftGroupManager; } - public RpcServer rpcServer() { - return this.rpcServer; - } - public String group() { return DEFAULT_GROUP; } @@ -340,14 +338,25 @@ private RpcServer initAndStartRpcServer() { System.setProperty("bolt.channel_write_buf_high_water_mark", String.valueOf(highWaterMark)); - PeerId serverId = new PeerId(); - serverId.parse(this.config().get(CoreOptions.RAFT_ENDPOINT)); + PeerId endpoint = this.endpoint(); + NodeManager.getInstance().addAddress(endpoint.getEndpoint()); RpcServer rpcServer = RaftRpcServerFactory.createAndStartRaftRpcServer( - serverId.getEndpoint()); - LOG.info("RPC server is started successfully"); + endpoint.getEndpoint()); return rpcServer; } + private void shutdownRpcServer() { + this.rpcServer.shutdown(); + PeerId endpoint = this.endpoint(); + NodeManager.getInstance().removeAddress(endpoint.getEndpoint()); + } + + private void registerRpcRequestProcessors() { + this.rpcServer.registerProcessor(new StoreCommandProcessor(this)); + this.rpcServer.registerProcessor(new SetLeaderProcessor(this)); + this.rpcServer.registerProcessor(new ListPeersProcessor(this)); + } + private ExecutorService createReadIndexExecutor(int coreThreads) { int maxThreads = coreThreads << 2; String name = "store-read-index-callback";