diff --git a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/filter/RedirectFilter.java b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/filter/RedirectFilter.java index e675dd9554..3fdfd7689c 100644 --- a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/filter/RedirectFilter.java +++ b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/filter/RedirectFilter.java @@ -82,16 +82,16 @@ public void filter(ContainerRequestContext context) throws IOException { return; } - GlobalMasterInfo globalMasterInfo = manager.globalMasterInfo(); - if (globalMasterInfo == null || !globalMasterInfo.isFeatureSupport()) { + GlobalMasterInfo globalNodeInfo = manager.globalNodeRoleInfo(); + if (globalNodeInfo == null || !globalNodeInfo.supportElection()) { return; } - GlobalMasterInfo.NodeInfo masterNodeInfo = globalMasterInfo.nodeInfo(); - if (masterNodeInfo == null || masterNodeInfo.isMaster() || - StringUtils.isEmpty(masterNodeInfo.url())) { + GlobalMasterInfo.NodeInfo masterInfo = globalNodeInfo.masterInfo(); + if (masterInfo == null || masterInfo.isMaster() || + StringUtils.isEmpty(masterInfo.nodeUrl())) { return; } - String url = masterNodeInfo.url(); + String url = masterInfo.nodeUrl(); URI redirectUri; try { diff --git a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/auth/HugeGraphAuthProxy.java b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/auth/HugeGraphAuthProxy.java index 2435e2667a..d22b86bab4 100644 --- a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/auth/HugeGraphAuthProxy.java +++ b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/auth/HugeGraphAuthProxy.java @@ -55,6 +55,7 @@ import org.apache.hugegraph.exception.NotSupportException; import org.apache.hugegraph.iterator.FilterIterator; import org.apache.hugegraph.iterator.MapperIterator; +import org.apache.hugegraph.masterelection.GlobalMasterInfo; import org.apache.hugegraph.masterelection.RoleElectionStateMachine; import org.apache.hugegraph.rpc.RpcServiceConfig4Client; import org.apache.hugegraph.rpc.RpcServiceConfig4Server; @@ -78,7 +79,6 @@ import org.apache.hugegraph.type.Nameable; import org.apache.hugegraph.type.define.GraphMode; import org.apache.hugegraph.type.define.GraphReadMode; -import org.apache.hugegraph.type.define.NodeRole; import org.apache.hugegraph.util.E; import org.apache.hugegraph.util.Log; import org.apache.hugegraph.util.RateLimiter; @@ -669,9 +669,9 @@ public void waitReady(RpcServer rpcServer) { } @Override - public void serverStarted(Id serverId, NodeRole serverRole) { + public void serverStarted(GlobalMasterInfo nodeInfo) { this.verifyAdminPermission(); - this.hugegraph.serverStarted(serverId, serverRole); + this.hugegraph.serverStarted(nodeInfo); } @Override @@ -776,9 +776,9 @@ public void resumeSnapshot() { } @Override - public void create(String configPath, Id server, NodeRole role) { + public void create(String configPath, GlobalMasterInfo nodeInfo) { this.verifyPermission(HugePermission.WRITE, ResourceType.STATUS); - this.hugegraph.create(configPath, server, role); + this.hugegraph.create(configPath, nodeInfo); } @Override diff --git a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java index 2c73b5ee93..b8770ca7df 100644 --- a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java +++ b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java @@ -23,8 +23,6 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -41,7 +39,6 @@ import org.apache.hugegraph.backend.BackendException; import org.apache.hugegraph.backend.cache.Cache; import org.apache.hugegraph.backend.cache.CacheManager; -import org.apache.hugegraph.backend.id.Id; import org.apache.hugegraph.backend.id.IdGenerator; import org.apache.hugegraph.backend.store.BackendStoreInfo; import org.apache.hugegraph.config.CoreOptions; @@ -53,7 +50,7 @@ import org.apache.hugegraph.masterelection.GlobalMasterInfo; import org.apache.hugegraph.masterelection.RoleElectionOptions; import org.apache.hugegraph.masterelection.RoleElectionStateMachine; -import org.apache.hugegraph.masterelection.StandardStateMachineCallback; +import org.apache.hugegraph.masterelection.StandardRoleListener; import org.apache.hugegraph.metrics.MetricsUtil; import org.apache.hugegraph.metrics.ServerReporter; import org.apache.hugegraph.rpc.RpcClientProvider; @@ -88,14 +85,11 @@ public final class GraphManager { private final HugeAuthenticator authenticator; private final RpcServer rpcServer; private final RpcClientProvider rpcClient; - private final HugeConfig conf; - - private RoleElectionStateMachine roleStateWorker; - private GlobalMasterInfo globalMasterInfo; - private Id server; - private NodeRole role; + private RoleElectionStateMachine roleStateMachine; + private GlobalMasterInfo globalNodeRoleInfo; + private final HugeConfig conf; private final EventHub eventHub; public GraphManager(HugeConfig conf, EventHub hub) { @@ -104,6 +98,10 @@ public GraphManager(HugeConfig conf, EventHub hub) { this.authenticator = HugeAuthenticator.loadAuthenticator(conf); this.rpcServer = new RpcServer(conf); this.rpcClient = new RpcClientProvider(conf); + + this.roleStateMachine = null; + this.globalNodeRoleInfo = new GlobalMasterInfo(); + this.eventHub = hub; this.conf = conf; } @@ -141,8 +139,7 @@ public void loadGraphs(Map graphConfs) { } } - public HugeGraph cloneGraph(String name, String newName, - String configText) { + public HugeGraph cloneGraph(String name, String newName, String configText) { /* * 0. check and modify params * 1. create graph instance @@ -270,6 +267,10 @@ public AuthManager authManager() { return this.authenticator().authManager(); } + public GlobalMasterInfo globalNodeRoleInfo() { + return this.globalNodeRoleInfo; + } + public void close() { for (Graph graph : this.graphs.values()) { try { @@ -280,8 +281,8 @@ public void close() { } this.destroyRpcServer(); this.unlistenChanges(); - if (this.roleStateWorker != null) { - this.roleStateWorker.shutdown(); + if (this.roleStateMachine != null) { + this.roleStateMachine.shutdown(); } } @@ -414,8 +415,7 @@ private void waitGraphsReady() { LOG.info("RpcServer is not enabled, skip wait graphs ready"); return; } - com.alipay.remoting.rpc.RpcServer remotingRpcServer = - this.remotingRpcServer(); + com.alipay.remoting.rpc.RpcServer remotingRpcServer = this.remotingRpcServer(); for (String graphName : this.graphs.keySet()) { HugeGraph graph = this.graph(graphName); graph.waitReady(remotingRpcServer); @@ -433,7 +433,7 @@ private void checkBackendVersionOrExit(HugeConfig config) { if (this.requireAuthentication()) { String token = config.get(ServerOptions.AUTH_ADMIN_TOKEN); try { - this.authenticator.initAdminUser(token); + this.authenticator().initAdminUser(token); } catch (Exception e) { throw new BackendException( "The backend store of '%s' can't " + @@ -455,65 +455,57 @@ private void checkBackendVersionOrExit(HugeConfig config) { } private void serverStarted(HugeConfig config) { - String server = config.get(ServerOptions.SERVER_ID); + String id = config.get(ServerOptions.SERVER_ID); String role = config.get(ServerOptions.SERVER_ROLE); - E.checkArgument(StringUtils.isNotEmpty(server), + E.checkArgument(StringUtils.isNotEmpty(id), "The server name can't be null or empty"); E.checkArgument(StringUtils.isNotEmpty(role), "The server role can't be null or empty"); - this.server = IdGenerator.of(server); - this.role = NodeRole.valueOf(role.toUpperCase()); - boolean supportRoleStateWorker = this.supportRoleStateWorker(); - if (supportRoleStateWorker) { - this.role = NodeRole.WORKER; + NodeRole nodeRole = NodeRole.valueOf(role.toUpperCase()); + boolean supportRoleElection = !nodeRole.computer() && + this.supportRoleElection(); + if (supportRoleElection) { + // Init any server as Worker role, then do role election + nodeRole = NodeRole.WORKER; } + this.globalNodeRoleInfo.initNodeId(IdGenerator.of(id)); + this.globalNodeRoleInfo.initNodeRole(nodeRole); + for (String graph : this.graphs()) { HugeGraph hugegraph = this.graph(graph); assert hugegraph != null; - hugegraph.serverStarted(this.server, this.role); + hugegraph.serverStarted(this.globalNodeRoleInfo); } - if (supportRoleStateWorker) { - this.initRoleStateWorker(); + if (supportRoleElection) { + this.initRoleStateMachine(); } } - private void initRoleStateWorker() { - E.checkArgument(this.roleStateWorker == null, "Repetition init"); - Executor applyThread = Executors.newSingleThreadExecutor(); - this.roleStateWorker = this.authenticator().graph().roleElectionStateMachine(); - this.globalMasterInfo = new GlobalMasterInfo(); - StandardStateMachineCallback stateMachineCallback = new StandardStateMachineCallback( - TaskManager.instance(), - this.globalMasterInfo); - applyThread.execute(() -> { - this.roleStateWorker.apply(stateMachineCallback); - }); - } - - public GlobalMasterInfo globalMasterInfo() { - return this.globalMasterInfo; + private void initRoleStateMachine() { + E.checkArgument(this.roleStateMachine == null, + "Repeated initialization of role state worker"); + this.globalNodeRoleInfo.supportElection(true); + this.roleStateMachine = this.authenticator().graph().roleElectionStateMachine(); + StandardRoleListener listener = new StandardRoleListener(TaskManager.instance(), + this.globalNodeRoleInfo); + this.roleStateMachine.start(listener); } - private boolean supportRoleStateWorker() { - if (this.role.computer()) { - return false; - } - + private boolean supportRoleElection() { try { if (!(this.authenticator() instanceof StandardAuthenticator)) { LOG.info("{} authenticator does not support role election currently", this.authenticator().getClass().getSimpleName()); return false; } + return true; } catch (IllegalStateException e) { - LOG.info("Unconfigured StandardAuthenticator, not support role election currently"); + LOG.info("{}, does not support role election currently", e.getMessage()); return false; } - - return true; } private void addMetrics(HugeConfig config) { @@ -591,7 +583,7 @@ private HugeGraph createGraph(HugeConfig config, String name) { graph = (HugeGraph) GraphFactory.open(config); // Init graph and start it - graph.create(this.graphsDir, this.server, this.role); + graph.create(this.graphsDir, this.globalNodeRoleInfo); } catch (Throwable e) { LOG.error("Failed to create graph '{}' due to: {}", name, e.getMessage(), e); diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraph.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraph.java index cd287c47be..85c093e59e 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraph.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraph.java @@ -30,6 +30,7 @@ import org.apache.hugegraph.backend.store.raft.RaftGroupManager; import org.apache.hugegraph.config.HugeConfig; import org.apache.hugegraph.config.TypedOption; +import org.apache.hugegraph.masterelection.GlobalMasterInfo; import org.apache.hugegraph.masterelection.RoleElectionStateMachine; import org.apache.hugegraph.rpc.RpcServiceConfig4Client; import org.apache.hugegraph.rpc.RpcServiceConfig4Server; @@ -44,12 +45,11 @@ import org.apache.hugegraph.task.TaskScheduler; import org.apache.hugegraph.traversal.optimize.HugeCountStepStrategy; import org.apache.hugegraph.traversal.optimize.HugeGraphStepStrategy; -import org.apache.hugegraph.traversal.optimize.HugeVertexStepStrategy; import org.apache.hugegraph.traversal.optimize.HugePrimaryKeyStrategy; +import org.apache.hugegraph.traversal.optimize.HugeVertexStepStrategy; import org.apache.hugegraph.type.HugeType; import org.apache.hugegraph.type.define.GraphMode; import org.apache.hugegraph.type.define.GraphReadMode; -import org.apache.hugegraph.type.define.NodeRole; import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies; import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Graph; @@ -201,7 +201,7 @@ public interface HugeGraph extends Graph { void waitReady(RpcServer rpcServer); - void serverStarted(Id serverId, NodeRole serverRole); + void serverStarted(GlobalMasterInfo nodeInfo); boolean started(); @@ -221,7 +221,7 @@ public interface HugeGraph extends Graph { void resumeSnapshot(); - void create(String configPath, Id server, NodeRole role); + void create(String configPath, GlobalMasterInfo nodeInfo); void drop(); diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java index db37d0a4bd..c671056e04 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java @@ -51,7 +51,6 @@ import org.apache.hugegraph.backend.store.raft.RaftBackendStoreProvider; import org.apache.hugegraph.backend.store.raft.RaftGroupManager; import org.apache.hugegraph.backend.store.ram.RamTable; -import org.apache.hugegraph.task.EphemeralJobQueue; import org.apache.hugegraph.backend.tx.GraphTransaction; import org.apache.hugegraph.backend.tx.SchemaTransaction; import org.apache.hugegraph.config.CoreOptions; @@ -64,6 +63,7 @@ import org.apache.hugegraph.job.EphemeralJob; import org.apache.hugegraph.masterelection.ClusterRoleStore; import org.apache.hugegraph.masterelection.Config; +import org.apache.hugegraph.masterelection.GlobalMasterInfo; import org.apache.hugegraph.masterelection.RoleElectionConfig; import org.apache.hugegraph.masterelection.RoleElectionOptions; import org.apache.hugegraph.masterelection.RoleElectionStateMachine; @@ -84,13 +84,13 @@ import org.apache.hugegraph.structure.HugeFeatures; import org.apache.hugegraph.structure.HugeVertex; import org.apache.hugegraph.structure.HugeVertexProperty; +import org.apache.hugegraph.task.EphemeralJobQueue; import org.apache.hugegraph.task.ServerInfoManager; import org.apache.hugegraph.task.TaskManager; import org.apache.hugegraph.task.TaskScheduler; import org.apache.hugegraph.type.HugeType; import org.apache.hugegraph.type.define.GraphMode; import org.apache.hugegraph.type.define.GraphReadMode; -import org.apache.hugegraph.type.define.NodeRole; import org.apache.hugegraph.util.ConfigUtil; import org.apache.hugegraph.util.DateUtil; import org.apache.hugegraph.util.E; @@ -267,15 +267,15 @@ public BackendFeatures backendStoreFeatures() { } @Override - public void serverStarted(Id serverId, NodeRole serverRole) { + public void serverStarted(GlobalMasterInfo nodeInfo) { LOG.info("Init system info for graph '{}'", this.name); this.initSystemInfo(); LOG.info("Init server info [{}-{}] for graph '{}'...", - serverId, serverRole, this.name); - this.serverInfoManager().initServerInfo(serverId, serverRole); + nodeInfo.nodeId(), nodeInfo.nodeRole(), this.name); + this.serverInfoManager().initServerInfo(nodeInfo); - this.initRoleStateWorker(serverId); + this.initRoleStateMachine(nodeInfo.nodeId()); // TODO: check necessary? LOG.info("Check olap property-key tables for graph '{}'", this.name); @@ -291,16 +291,18 @@ public void serverStarted(Id serverId, NodeRole serverRole) { this.started = true; } - private void initRoleStateWorker(Id serverId) { - Config roleStateMachineConfig = new RoleElectionConfig(serverId.toString(), - this.configuration.get(RoleElectionOptions.NODE_EXTERNAL_URL), - this.configuration.get(RoleElectionOptions.EXCEEDS_FAIL_COUNT), - this.configuration.get(RoleElectionOptions.RANDOM_TIMEOUT_MILLISECOND), - this.configuration.get(RoleElectionOptions.HEARTBEAT_INTERVAL_SECOND), - this.configuration.get(RoleElectionOptions.MASTER_DEAD_TIMES), - this.configuration.get(RoleElectionOptions.BASE_TIMEOUT_MILLISECOND)); - ClusterRoleStore clusterRoleStore = new StandardClusterRoleStore(this.params); - this.roleElectionStateMachine = new StandardRoleElectionStateMachine(roleStateMachineConfig, clusterRoleStore); + private void initRoleStateMachine(Id serverId) { + HugeConfig conf = this.configuration; + Config roleConfig = new RoleElectionConfig(serverId.toString(), + conf.get(RoleElectionOptions.NODE_EXTERNAL_URL), + conf.get(RoleElectionOptions.EXCEEDS_FAIL_COUNT), + conf.get(RoleElectionOptions.RANDOM_TIMEOUT_MILLISECOND), + conf.get(RoleElectionOptions.HEARTBEAT_INTERVAL_SECOND), + conf.get(RoleElectionOptions.MASTER_DEAD_TIMES), + conf.get(RoleElectionOptions.BASE_TIMEOUT_MILLISECOND)); + ClusterRoleStore roleStore = new StandardClusterRoleStore(this.params); + this.roleElectionStateMachine = new StandardRoleElectionStateMachine(roleConfig, + roleStore); } @Override @@ -399,8 +401,7 @@ public void truncateBackend() { try { this.storeProvider.truncate(); // TODO: remove this after serverinfo saved in etcd - this.serverStarted(this.serverInfoManager().selfServerId(), - this.serverInfoManager().selfServerRole()); + this.serverStarted(this.serverInfoManager().globalNodeRoleInfo()); } finally { LockUtil.unlock(this.name, LockUtil.GRAPH_LOCK); } @@ -974,9 +975,9 @@ public synchronized void close() throws Exception { } @Override - public void create(String configPath, Id server, NodeRole role) { + public void create(String configPath, GlobalMasterInfo nodeInfo) { this.initBackend(); - this.serverStarted(server, role); + this.serverStarted(nodeInfo); // Write config to disk file String confPath = ConfigUtil.writeToFile(configPath, this.name(), @@ -1052,8 +1053,7 @@ public TaskScheduler taskScheduler() { } private ServerInfoManager serverInfoManager() { - ServerInfoManager manager = this.taskManager - .getServerInfoManager(this.params); + ServerInfoManager manager = this.taskManager.getServerInfoManager(this.params); E.checkState(manager != null, "Can't find server info manager for graph '%s'", this); return manager; diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/job/computer/AbstractComputer.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/job/computer/AbstractComputer.java index a40d0001b4..11207ebae5 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/job/computer/AbstractComputer.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/job/computer/AbstractComputer.java @@ -32,16 +32,15 @@ import org.apache.commons.configuration2.tree.ImmutableNode; import org.apache.commons.configuration2.tree.NodeHandler; import org.apache.commons.configuration2.tree.NodeModel; -import org.apache.hugegraph.type.define.Directions; -import org.apache.hugegraph.util.ParameterUtil; -import org.slf4j.Logger; - import org.apache.hugegraph.HugeException; import org.apache.hugegraph.job.ComputerJob; import org.apache.hugegraph.job.Job; import org.apache.hugegraph.traversal.algorithm.HugeTraverser; +import org.apache.hugegraph.type.define.Directions; import org.apache.hugegraph.util.E; import org.apache.hugegraph.util.Log; +import org.apache.hugegraph.util.ParameterUtil; +import org.slf4j.Logger; public abstract class AbstractComputer implements Computer { @@ -85,7 +84,6 @@ public void checkParameters(Map parameters) { @Override public Object call(Job job, Map parameters) { - this.checkAndCollectParameters(parameters); // Read configuration try { diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/GlobalMasterInfo.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/GlobalMasterInfo.java index d7dd127af5..9124fcd0b6 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/GlobalMasterInfo.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/GlobalMasterInfo.java @@ -17,50 +17,103 @@ package org.apache.hugegraph.masterelection; -public class GlobalMasterInfo { +import org.apache.hugegraph.backend.id.Id; +import org.apache.hugegraph.backend.id.IdGenerator; +import org.apache.hugegraph.type.define.NodeRole; +import org.apache.hugegraph.util.E; - private NodeInfo nodeInfo; - private volatile boolean featureSupport; +// TODO: rename to GlobalNodeRoleInfo +public final class GlobalMasterInfo { + + private final static NodeInfo NO_MASTER = new NodeInfo(false, ""); + + private volatile boolean supportElection; + private volatile NodeInfo masterNodeInfo; + + private volatile Id nodeId; + private volatile NodeRole nodeRole; public GlobalMasterInfo() { - this.featureSupport = false; - this.nodeInfo = new NodeInfo(false, ""); + this(NO_MASTER); + } + + public GlobalMasterInfo(NodeInfo masterInfo) { + this.supportElection = false; + this.masterNodeInfo = masterInfo; + + this.nodeId = null; + this.nodeRole = null; + } + + public void supportElection(boolean featureSupport) { + this.supportElection = featureSupport; + } + + public boolean supportElection() { + return this.supportElection; } - public final void nodeInfo(boolean isMaster, String url) { + public void resetMasterInfo() { + this.masterNodeInfo = NO_MASTER; + } + + public void masterInfo(boolean isMaster, String nodeUrl) { // final can avoid instruction rearrangement, visibility can be ignored - final NodeInfo tmp = new NodeInfo(isMaster, url); - this.nodeInfo = tmp; + this.masterNodeInfo = new NodeInfo(isMaster, nodeUrl); + } + + public NodeInfo masterInfo() { + return this.masterNodeInfo; + } + + public Id nodeId() { + return this.nodeId; + } + + public NodeRole nodeRole() { + return this.nodeRole; + } + + public void initNodeId(Id id) { + this.nodeId = id; } - public final NodeInfo nodeInfo() { - return this.nodeInfo; + public void initNodeRole(NodeRole role) { + E.checkArgument(role != null, "The server role can't be null"); + E.checkArgument(this.nodeRole == null, + "The server role can't be init twice"); + this.nodeRole = role; } - public void isFeatureSupport(boolean featureSupport) { - this.featureSupport = featureSupport; + public void changeNodeRole(NodeRole role) { + E.checkArgument(role != null, "The server role can't be null"); + this.nodeRole = role; } - public boolean isFeatureSupport() { - return this.featureSupport; + public static GlobalMasterInfo master(String nodeId) { + NodeInfo masterInfo = new NodeInfo(true, nodeId); + GlobalMasterInfo nodeInfo = new GlobalMasterInfo(masterInfo); + nodeInfo.nodeId = IdGenerator.of(nodeId); + nodeInfo.nodeRole = NodeRole.MASTER; + return nodeInfo; } public static class NodeInfo { private final boolean isMaster; - private final String url; + private final String nodeUrl; public NodeInfo(boolean isMaster, String url) { this.isMaster = isMaster; - this.url = url; + this.nodeUrl = url; } public boolean isMaster() { return this.isMaster; } - public String url() { - return this.url; + public String nodeUrl() { + return this.nodeUrl; } } } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/RoleElectionStateMachine.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/RoleElectionStateMachine.java index 920bc104f1..2a33d1bf6c 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/RoleElectionStateMachine.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/RoleElectionStateMachine.java @@ -21,5 +21,5 @@ public interface RoleElectionStateMachine { void shutdown(); - void apply(StateMachineCallback stateMachineCallback); + void start(RoleListener callback); } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StateMachineCallback.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/RoleListener.java similarity index 96% rename from hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StateMachineCallback.java rename to hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/RoleListener.java index 35abbe3402..e99db4c16f 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StateMachineCallback.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/RoleListener.java @@ -17,7 +17,7 @@ package org.apache.hugegraph.masterelection; -public interface StateMachineCallback { +public interface RoleListener { void onAsRoleMaster(StateMachineContext context); diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardRoleElectionStateMachine.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardRoleElectionStateMachine.java index aa284def60..a0e2601d49 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardRoleElectionStateMachine.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardRoleElectionStateMachine.java @@ -19,6 +19,8 @@ import java.security.SecureRandom; import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.locks.LockSupport; import org.apache.hugegraph.util.E; @@ -29,25 +31,36 @@ public class StandardRoleElectionStateMachine implements RoleElectionStateMachin private static final Logger LOG = Log.logger(StandardRoleElectionStateMachine.class); - private volatile boolean shutdown; private final Config config; + private final ClusterRoleStore roleStore; + private final ExecutorService applyThread; + + private volatile boolean shutdown; private volatile RoleState state; - private final ClusterRoleStore clusterRoleStore; - public StandardRoleElectionStateMachine(Config config, ClusterRoleStore clusterRoleStore) { + public StandardRoleElectionStateMachine(Config config, ClusterRoleStore roleStore) { this.config = config; - this.clusterRoleStore = clusterRoleStore; + this.roleStore = roleStore; + this.applyThread = Executors.newSingleThreadExecutor(); this.state = new UnknownState(null); this.shutdown = false; } @Override public void shutdown() { + if (this.shutdown) { + return; + } this.shutdown = true; + this.applyThread.shutdown(); } @Override - public void apply(StateMachineCallback stateMachineCallback) { + public void start(RoleListener stateMachineCallback) { + this.applyThread.execute(() -> this.apply(stateMachineCallback)); + } + + private void apply(RoleListener stateMachineCallback) { int failCount = 0; StateMachineContextImpl context = new StateMachineContextImpl(this); while (!this.shutdown) { @@ -73,13 +86,17 @@ public void apply(StateMachineCallback stateMachineCallback) { } } + protected ClusterRoleStore roleStore() { + return this.roleStore; + } + private interface RoleState { SecureRandom SECURE_RANDOM = new SecureRandom(); RoleState transform(StateMachineContext context); - Callback callback(StateMachineCallback callback); + Callback callback(RoleListener callback); static void heartBeatPark(StateMachineContext context) { long heartBeatIntervalSecond = context.config().heartBeatIntervalSecond(); @@ -110,7 +127,7 @@ public UnknownState(Integer epoch) { @Override public RoleState transform(StateMachineContext context) { - ClusterRoleStore adapter = context.adapter(); + ClusterRoleStore adapter = context.roleStore(); Optional clusterRoleOpt = adapter.query(); if (!clusterRoleOpt.isPresent()) { context.reset(); @@ -137,7 +154,7 @@ public RoleState transform(StateMachineContext context) { } @Override - public Callback callback(StateMachineCallback callback) { + public Callback callback(RoleListener callback) { return callback::unknown; } } @@ -158,7 +175,7 @@ public RoleState transform(StateMachineContext context) { } @Override - public Callback callback(StateMachineCallback callback) { + public Callback callback(RoleListener callback) { return callback::onAsRoleAbdication; } } @@ -175,7 +192,7 @@ public MasterState(ClusterRole clusterRole) { public RoleState transform(StateMachineContext context) { this.clusterRole.increaseClock(); RoleState.heartBeatPark(context); - if (context.adapter().updateIfNodePresent(this.clusterRole)) { + if (context.roleStore().updateIfNodePresent(this.clusterRole)) { return this; } context.reset(); @@ -184,7 +201,7 @@ public RoleState transform(StateMachineContext context) { } @Override - public Callback callback(StateMachineCallback callback) { + public Callback callback(RoleListener callback) { return callback::onAsRoleMaster; } } @@ -216,7 +233,7 @@ public RoleState transform(StateMachineContext context) { } @Override - public Callback callback(StateMachineCallback callback) { + public Callback callback(RoleListener callback) { return callback::onAsRoleWorker; } @@ -255,7 +272,7 @@ public RoleState transform(StateMachineContext context) { context.config().url(), epoch); // The master failover completed context.epoch(clusterRole.epoch()); - if (context.adapter().updateIfNodePresent(clusterRole)) { + if (context.roleStore().updateIfNodePresent(clusterRole)) { context.master(new MasterServerInfoImpl(clusterRole.node(), clusterRole.url())); return new MasterState(clusterRole); } else { @@ -264,7 +281,7 @@ public RoleState transform(StateMachineContext context) { } @Override - public Callback callback(StateMachineCallback callback) { + public Callback callback(RoleListener callback) { return callback::onAsRoleCandidate; } } @@ -303,8 +320,8 @@ public void epoch(Integer epoch) { } @Override - public ClusterRoleStore adapter() { - return this.machine.adapter(); + public ClusterRoleStore roleStore() { + return this.machine.roleStore(); } @Override @@ -348,8 +365,4 @@ public String node() { return this.node; } } - - protected ClusterRoleStore adapter() { - return this.clusterRoleStore; - } } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardStateMachineCallback.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardRoleListener.java similarity index 67% rename from hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardStateMachineCallback.java rename to hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardRoleListener.java index 28e01d2913..f2bb94f521 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardStateMachineCallback.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardRoleListener.java @@ -23,84 +23,83 @@ import org.apache.hugegraph.util.Log; import org.slf4j.Logger; -public class StandardStateMachineCallback implements StateMachineCallback { +public class StandardRoleListener implements RoleListener { - private static final Logger LOG = Log.logger(StandardStateMachineCallback.class); + private static final Logger LOG = Log.logger(StandardRoleListener.class); private final TaskManager taskManager; - private final GlobalMasterInfo globalMasterInfo; + private final GlobalMasterInfo roleInfo; - private boolean isMaster = false; + private volatile boolean selfIsMaster; - public StandardStateMachineCallback(TaskManager taskManager, - GlobalMasterInfo globalMasterInfo) { + public StandardRoleListener(TaskManager taskManager, + GlobalMasterInfo roleInfo) { this.taskManager = taskManager; - this.taskManager.enableRoleElected(true); - this.globalMasterInfo = globalMasterInfo; - this.globalMasterInfo.isFeatureSupport(true); + this.taskManager.enableRoleElection(); + this.roleInfo = roleInfo; + this.selfIsMaster = false; } @Override public void onAsRoleMaster(StateMachineContext context) { - if (!isMaster) { + if (!selfIsMaster) { this.taskManager.onAsRoleMaster(); LOG.info("Server {} change to master role", context.config().node()); } - this.initGlobalMasterInfo(context); - this.isMaster = true; + this.updateMasterInfo(context); + this.selfIsMaster = true; } @Override public void onAsRoleWorker(StateMachineContext context) { - if (isMaster) { + if (this.selfIsMaster) { this.taskManager.onAsRoleWorker(); LOG.info("Server {} change to worker role", context.config().node()); } - this.initGlobalMasterInfo(context); - this.isMaster = false; + this.updateMasterInfo(context); + this.selfIsMaster = false; } @Override public void onAsRoleCandidate(StateMachineContext context) { + // pass } @Override - public void unknown(StateMachineContext context) { - if (isMaster) { + public void onAsRoleAbdication(StateMachineContext context) { + if (this.selfIsMaster) { this.taskManager.onAsRoleWorker(); LOG.info("Server {} change to worker role", context.config().node()); } - this.initGlobalMasterInfo(context); + this.updateMasterInfo(context); + this.selfIsMaster = false; + } - isMaster = false; + @Override + public void error(StateMachineContext context, Throwable e) { + LOG.error("Server {} exception occurred", context.config().node(), e); } @Override - public void onAsRoleAbdication(StateMachineContext context) { - if (isMaster) { + public void unknown(StateMachineContext context) { + if (this.selfIsMaster) { this.taskManager.onAsRoleWorker(); LOG.info("Server {} change to worker role", context.config().node()); } - this.initGlobalMasterInfo(context); - - isMaster = false; - } + this.updateMasterInfo(context); - @Override - public void error(StateMachineContext context, Throwable e) { - LOG.error("Server {} exception occurred", context.config().node(), e); + this.selfIsMaster = false; } - public void initGlobalMasterInfo(StateMachineContext context) { + public void updateMasterInfo(StateMachineContext context) { StateMachineContext.MasterServerInfo master = context.master(); if (master == null) { - this.globalMasterInfo.nodeInfo(false, ""); + this.roleInfo.resetMasterInfo(); return; } boolean isMaster = Objects.equals(context.node(), master.node()); - String url = master.url(); - this.globalMasterInfo.nodeInfo(isMaster, url); + this.roleInfo.masterInfo(isMaster, master.url()); } } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StateMachineContext.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StateMachineContext.java index a3eaf89626..587cd417fc 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StateMachineContext.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StateMachineContext.java @@ -33,7 +33,7 @@ public interface StateMachineContext { void master(MasterServerInfo info); - ClusterRoleStore adapter(); + ClusterRoleStore roleStore(); void reset(); diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java index e8cccf88e3..ee14b4ceb2 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java @@ -37,6 +37,7 @@ import org.apache.hugegraph.exception.ConnectionException; import org.apache.hugegraph.iterator.ListIterator; import org.apache.hugegraph.iterator.MapperIterator; +import org.apache.hugegraph.masterelection.GlobalMasterInfo; import org.apache.hugegraph.schema.PropertyKey; import org.apache.hugegraph.schema.VertexLabel; import org.apache.hugegraph.structure.HugeVertex; @@ -61,8 +62,7 @@ public class ServerInfoManager { private final HugeGraphParams graph; private final ExecutorService dbExecutor; - private Id selfServerId; - private NodeRole selfServerRole; + private volatile GlobalMasterInfo globalNodeInfo; private volatile boolean onlySingleNode; private volatile boolean closed; @@ -75,8 +75,7 @@ public ServerInfoManager(HugeGraphParams graph, this.graph = graph; this.dbExecutor = dbExecutor; - this.selfServerId = null; - this.selfServerRole = NodeRole.MASTER; + this.globalNodeInfo = null; this.onlySingleNode = false; this.closed = false; @@ -86,7 +85,7 @@ public void init() { HugeServerInfo.schema(this.graph).initSchemaIfNeeded(); } - public boolean close() { + public synchronized boolean close() { this.closed = true; if (!this.dbExecutor.isShutdown()) { this.removeSelfServerInfo(); @@ -103,40 +102,24 @@ public boolean close() { return true; } - public synchronized void forceInitServerInfo(Id server, NodeRole role) { - if (this.closed) { - return; - } - - E.checkArgument(server != null && role != null, - "The server id or role can't be null"); - this.selfServerId = server; - this.selfServerRole = role; + public synchronized void initServerInfo(GlobalMasterInfo nodeInfo) { + E.checkArgument(nodeInfo != null, "The global node info can't be null"); - this.saveServerInfo(this.selfServerId, this.selfServerRole); - } - - public synchronized void initServerInfo(Id server, NodeRole role) { - E.checkArgument(server != null && role != null, - "The server id or role can't be null"); - this.selfServerId = server; - this.selfServerRole = role; - - HugeServerInfo existed = this.serverInfo(server); + Id serverId = nodeInfo.nodeId(); + HugeServerInfo existed = this.serverInfo(serverId); E.checkArgument(existed == null || !existed.alive(), "The server with name '%s' already in cluster", - server); - if (role.master()) { + serverId); + + if (nodeInfo.nodeRole().master()) { String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null; do { - Iterator servers = this.serverInfos(PAGE_SIZE, - page); + Iterator servers = this.serverInfos(PAGE_SIZE, page); while (servers.hasNext()) { existed = servers.next(); - E.checkArgument(!existed.role().master() || - !existed.alive(), - "Already existed master '%s' in current " + - "cluster", existed.id()); + E.checkArgument(!existed.role().master() || !existed.alive(), + "Already existed master '%s' in current cluster", + existed.id()); } if (page != null) { page = PageInfo.pageInfo(servers); @@ -144,36 +127,80 @@ public synchronized void initServerInfo(Id server, NodeRole role) { } while (page != null); } - // TODO: save ServerInfo at AuthServer - this.saveServerInfo(this.selfServerId, this.selfServerRole); + this.globalNodeInfo = nodeInfo; + + // TODO: save ServerInfo to AuthServer + this.saveServerInfo(this.selfNodeId(), this.selfNodeRole()); + } + + public synchronized void changeServerRole(NodeRole nodeRole) { + if (this.closed) { + return; + } + + this.globalNodeInfo.changeNodeRole(nodeRole); + + // TODO: save ServerInfo to AuthServer + this.saveServerInfo(this.selfNodeId(), this.selfNodeRole()); } - public Id selfServerId() { - return this.selfServerId; + public GlobalMasterInfo globalNodeRoleInfo() { + return this.globalNodeInfo; } - public NodeRole selfServerRole() { - return this.selfServerRole; + public Id selfNodeId() { + if (this.globalNodeInfo == null) { + return null; + } + return this.globalNodeInfo.nodeId(); + } + + public NodeRole selfNodeRole() { + if (this.globalNodeInfo == null) { + return null; + } + return this.globalNodeInfo.nodeRole(); } - public boolean master() { - return this.selfServerRole != null && this.selfServerRole.master(); + public boolean selfIsMaster() { + return this.selfNodeRole() != null && this.selfNodeRole().master(); } public boolean onlySingleNode() { - // Only has one master node + // Only exists one node in the whole master return this.onlySingleNode; } - public void heartbeat() { + public synchronized void heartbeat() { + assert this.graphIsReady(); + HugeServerInfo serverInfo = this.selfServerInfo(); - if (serverInfo == null && this.selfServerId != null && - this.selfServerRole != NodeRole.MASTER) { - serverInfo = this.saveServerInfo(this.selfServerId, - this.selfServerRole); + if (serverInfo != null) { + // Update heartbeat time for this server + serverInfo.updateTime(DateUtil.now()); + this.save(serverInfo); + return; } - serverInfo.updateTime(DateUtil.now()); - this.save(serverInfo); + + /* ServerInfo is missing */ + if (this.selfNodeId() == null) { + // Ignore if ServerInfo is not initialized + LOG.info("ServerInfo is missing: {}, may not be initialized yet"); + return; + } + if (this.selfIsMaster()) { + // On master node, just wait for ServerInfo re-init + LOG.warn("ServerInfo is missing: {}, may be cleared before", + this.selfNodeId()); + return; + } + /* + * Missing server info on non-master node, may be caused by graph + * truncated on master node then synced by raft. + * TODO: we just patch it here currently, to be improved. + */ + serverInfo = this.saveServerInfo(this.selfNodeId(), this.selfNodeRole()); + assert serverInfo != null; } public synchronized void decreaseLoad(int load) { @@ -188,7 +215,7 @@ public int calcMaxLoad() { return 10000; } - protected boolean graphReady() { + protected boolean graphIsReady() { return !this.closed && this.graph.started() && this.graph.initialized(); } @@ -242,8 +269,8 @@ private GraphTransaction tx() { return this.graph.systemTransaction(); } - private HugeServerInfo saveServerInfo(Id server, NodeRole role) { - HugeServerInfo serverInfo = new HugeServerInfo(server, role); + private HugeServerInfo saveServerInfo(Id serverId, NodeRole serverRole) { + HugeServerInfo serverInfo = new HugeServerInfo(serverId, serverRole); serverInfo.maxLoad(this.calcMaxLoad()); this.save(serverInfo); @@ -310,16 +337,16 @@ private V call(Callable callable) { } private HugeServerInfo selfServerInfo() { - HugeServerInfo selfServerInfo = this.serverInfo(this.selfServerId); - if (selfServerInfo == null) { - LOG.warn("ServerInfo is missing: {}", this.selfServerId); + HugeServerInfo selfServerInfo = this.serverInfo(this.selfNodeId()); + if (selfServerInfo == null && this.selfNodeId() != null) { + LOG.warn("ServerInfo is missing: {}", this.selfNodeId()); } return selfServerInfo; } - private HugeServerInfo serverInfo(Id server) { + private HugeServerInfo serverInfo(Id serverId) { return this.call(() -> { - Iterator vertices = this.tx().queryVertices(server); + Iterator vertices = this.tx().queryVertices(serverId); Vertex vertex = QueryResults.one(vertices); if (vertex == null) { return null; @@ -335,19 +362,19 @@ private HugeServerInfo removeSelfServerInfo() { * backend store, initServerInfo() is not called in this case, so * this.selfServerId is null at this time. */ - if (this.selfServerId != null && this.graph.initialized()) { - return this.removeServerInfo(this.selfServerId); + if (this.selfNodeId() != null && this.graph.initialized()) { + return this.removeServerInfo(this.selfNodeId()); } return null; } - private HugeServerInfo removeServerInfo(Id server) { - if (server == null) { + private HugeServerInfo removeServerInfo(Id serverId) { + if (serverId == null) { return null; } - LOG.info("Remove server info: {}", server); + LOG.info("Remove server info: {}", serverId); return this.call(() -> { - Iterator vertices = this.tx().queryVertices(server); + Iterator vertices = this.tx().queryVertices(serverId); Vertex vertex = QueryResults.one(vertices); if (vertex == null) { return null; diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java index 9eda3f6b02..120aeb0d66 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java @@ -141,7 +141,7 @@ private TaskTransaction tx() { @Override public void restoreTasks() { - Id selfServer = this.serverManager().selfServerId(); + Id selfServer = this.serverManager().selfNodeId(); // Restore 'RESTORING', 'RUNNING' and 'QUEUED' tasks in order. for (TaskStatus status : TaskStatus.PENDING_STATUSES) { String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null; @@ -177,35 +177,35 @@ private Future restore(HugeTask task) { public Future schedule(HugeTask task) { E.checkArgumentNotNull(task, "Task can't be null"); + /* + * Just submit to queue if status=QUEUED (means re-schedule task) + * NOTE: schedule() method may be called multi times by + * HugeTask.checkDependenciesSuccess() method + */ if (task.status() == TaskStatus.QUEUED) { - /* - * Just submit to queue if status=QUEUED (means re-schedule task) - * NOTE: schedule() method may be called multi times by - * HugeTask.checkDependenciesSuccess() method - */ return this.resubmitTask(task); } + /* + * Due to EphemeralJob won't be serialized and deserialized through + * shared storage, submit EphemeralJob immediately on any node + */ if (task.callable() instanceof EphemeralJob) { - /* - * Due to EphemeralJob won't be serialized and deserialized through - * shared storage, submit EphemeralJob immediately on master - * NOTE: don't need to save EphemeralJob task - */ + // NOTE: we don't need to save EphemeralJob task task.status(TaskStatus.QUEUED); return this.submitTask(task); } - // Only check if not EphemeralJob + // Check this is on master for normal task schedule this.checkOnMasterNode("schedule"); if (this.serverManager().onlySingleNode() && !task.computer()) { /* * Speed up for single node, submit task immediately, - * this code can be removed without affecting logic + * this code can be removed without affecting code logic */ task.status(TaskStatus.QUEUED); - task.server(this.serverManager().selfServerId()); + task.server(this.serverManager().selfNodeId()); this.save(task); return this.submitTask(task); } else { @@ -278,8 +278,8 @@ public synchronized void cancel(HugeTask task) { // The task scheduled to workers, let the worker node to cancel this.save(task); assert task.server() != null : task; - assert this.serverManager().master(); - if (!task.server().equals(this.serverManager().selfServerId())) { + assert this.serverManager().selfIsMaster(); + if (!task.server().equals(this.serverManager().selfNodeId())) { /* * Remove task from memory if it's running on worker node, * but keep task in memory if it's running on master node. @@ -303,10 +303,10 @@ protected ServerInfoManager serverManager() { return this.serverManager; } - protected synchronized void scheduleTasks() { + protected synchronized void scheduleTasksOnMaster() { // Master server schedule all scheduling tasks to suitable worker nodes - Collection scheduleInfos = this.serverManager() - .allServerInfos(); + Collection serverInfos = this.serverManager() + .allServerInfos(); String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null; do { Iterator> tasks = this.tasks(TaskStatus.SCHEDULING, @@ -318,12 +318,12 @@ protected synchronized void scheduleTasks() { continue; } - if (!this.serverManager.master()) { + if (!this.serverManager.selfIsMaster()) { return; } HugeServerInfo server = this.serverManager().pickWorkerNode( - scheduleInfos, task); + serverInfos, task); if (server == null) { LOG.info("The master can't find suitable servers to " + "execute task '{}', wait for next schedule", @@ -348,7 +348,8 @@ protected synchronized void scheduleTasks() { } } while (page != null); - this.serverManager().updateServerInfos(scheduleInfos); + // Save to store + this.serverManager().updateServerInfos(serverInfos); } protected void executeTasksOnWorker(Id server) { @@ -422,7 +423,7 @@ protected void cancelTasksOnWorker(Id server) { protected void taskDone(HugeTask task) { this.remove(task); - Id selfServerId = this.serverManager().selfServerId(); + Id selfServerId = this.serverManager().selfNodeId(); try { this.serverManager().decreaseLoad(task.load()); } catch (Throwable e) { @@ -718,7 +719,7 @@ private V call(Callable callable) { } private void checkOnMasterNode(String op) { - if (!this.serverManager().master()) { + if (!this.serverManager().selfIsMaster()) { throw new HugeException("Can't %s task on non-master server", op); } } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java index 0ad96f443c..b3726d3830 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java @@ -243,19 +243,8 @@ public int pendingTasks() { return size; } - protected void notifyNewTask(HugeTask task) { - Queue queue = ((ThreadPoolExecutor) this.schedulerExecutor) - .getQueue(); - if (queue.size() <= 1) { - /* - * Notify to schedule tasks initiatively when have new task - * It's OK to not notify again if there are more than one task in - * queue(like two, one is timer task, one is immediate task), - * we don't want too many immediate tasks to be inserted into queue, - * one notify will cause all the tasks to be processed. - */ - this.schedulerExecutor.submit(this::scheduleOrExecuteJob); - } + public void enableRoleElection() { + this.enableRoleElected = true; } public void onAsRoleMaster() { @@ -263,7 +252,7 @@ public void onAsRoleMaster() { for (TaskScheduler entry : this.schedulers.values()) { StandardTaskScheduler scheduler = (StandardTaskScheduler) entry; ServerInfoManager serverInfoManager = scheduler.serverManager(); - serverInfoManager.forceInitServerInfo(serverInfoManager.selfServerId(), NodeRole.MASTER); + serverInfoManager.changeServerRole(NodeRole.MASTER); } } catch (Throwable e) { LOG.error("Exception occurred when change to master role", e); @@ -276,7 +265,7 @@ public void onAsRoleWorker() { for (TaskScheduler entry : this.schedulers.values()) { StandardTaskScheduler scheduler = (StandardTaskScheduler) entry; ServerInfoManager serverInfoManager = scheduler.serverManager(); - serverInfoManager.forceInitServerInfo(serverInfoManager.selfServerId(), NodeRole.WORKER); + serverInfoManager.changeServerRole(NodeRole.WORKER); } } catch (Throwable e) { LOG.error("Exception occurred when change to worker role", e); @@ -284,8 +273,19 @@ public void onAsRoleWorker() { } } - public void enableRoleElected(boolean enableRoleElected) { - this.enableRoleElected = enableRoleElected; + protected void notifyNewTask(HugeTask task) { + Queue queue = ((ThreadPoolExecutor) this.schedulerExecutor) + .getQueue(); + if (queue.size() <= 1) { + /* + * Notify to schedule tasks initiatively when have new task + * It's OK to not notify again if there are more than one task in + * queue(like two, one is timer task, one is immediate task), + * we don't want too many immediate tasks to be inserted into queue, + * one notify will cause all the tasks to be processed. + */ + this.schedulerExecutor.submit(this::scheduleOrExecuteJob); + } } private void scheduleOrExecuteJob() { @@ -324,7 +324,7 @@ private void scheduleOrExecuteJobForGraph(StandardTaskScheduler scheduler) { * If graph is closed, don't call serverManager.initialized() * due to it will reopen graph tx. */ - if (!serverManager.graphReady()) { + if (!serverManager.graphIsReady()) { return; } @@ -332,25 +332,25 @@ private void scheduleOrExecuteJobForGraph(StandardTaskScheduler scheduler) { serverManager.heartbeat(); /* - * Master schedule tasks to suitable servers. - * Worker maybe become Master, so Master also need perform tasks assigned by - * previous Master when enableRoleElected is true. - * However, the master only needs to take the assignment, - * because the master stays the same when enableRoleElected is false. - * There is no suitable server when these tasks are created + * Master will schedule tasks to suitable servers. + * Note a Worker may become to a Master, so elected-Master also needs to + * execute tasks assigned by previous Master when enableRoleElected=true. + * However, when enableRoleElected=false, a Master is only set by the + * config assignment, assigned-Master always stays the same state. */ - if (serverManager.master()) { - scheduler.scheduleTasks(); + if (serverManager.selfIsMaster()) { + scheduler.scheduleTasksOnMaster(); if (!this.enableRoleElected && !serverManager.onlySingleNode()) { + // assigned-Master + non-single-node don't need to execute tasks return; } } - // Schedule queued tasks scheduled to current server - scheduler.executeTasksOnWorker(serverManager.selfServerId()); + // Execute queued tasks scheduled to current server + scheduler.executeTasksOnWorker(serverManager.selfNodeId()); // Cancel tasks scheduled to current server - scheduler.cancelTasksOnWorker(serverManager.selfServerId()); + scheduler.cancelTasksOnWorker(serverManager.selfNodeId()); } finally { LockUtil.unlock(graph, LockUtil.GRAPH_LOCK); } diff --git a/hugegraph-server/hugegraph-dist/src/assembly/static/conf/gremlin-server.yaml b/hugegraph-server/hugegraph-dist/src/assembly/static/conf/gremlin-server.yaml index dff43cb04b..5946779828 100644 --- a/hugegraph-server/hugegraph-dist/src/assembly/static/conf/gremlin-server.yaml +++ b/hugegraph-server/hugegraph-dist/src/assembly/static/conf/gremlin-server.yaml @@ -40,6 +40,8 @@ scriptEngines: { org.apache.hugegraph.backend.id.IdGenerator, org.apache.hugegraph.type.define.Directions, org.apache.hugegraph.type.define.NodeRole, + org.apache.hugegraph.masterelection.GlobalMasterInfo, + org.apache.hugegraph.util.DateUtil, org.apache.hugegraph.traversal.algorithm.CollectionPathsTraverser, org.apache.hugegraph.traversal.algorithm.CountTraverser, org.apache.hugegraph.traversal.algorithm.CustomizedCrosspointsTraverser, @@ -64,7 +66,6 @@ scriptEngines: { org.apache.hugegraph.traversal.optimize.ConditionP, org.apache.hugegraph.traversal.optimize.Text, org.apache.hugegraph.traversal.optimize.TraversalUtil, - org.apache.hugegraph.util.DateUtil, org.opencypher.gremlin.traversal.CustomFunctions, org.opencypher.gremlin.traversal.CustomPredicate ], diff --git a/hugegraph-server/hugegraph-dist/src/assembly/travis/conf-raft1/gremlin-server.yaml b/hugegraph-server/hugegraph-dist/src/assembly/travis/conf-raft1/gremlin-server.yaml index 517c0a5fcc..55a8c0bb34 100644 --- a/hugegraph-server/hugegraph-dist/src/assembly/travis/conf-raft1/gremlin-server.yaml +++ b/hugegraph-server/hugegraph-dist/src/assembly/travis/conf-raft1/gremlin-server.yaml @@ -36,6 +36,8 @@ scriptEngines: { org.apache.hugegraph.backend.id.IdGenerator, org.apache.hugegraph.type.define.Directions, org.apache.hugegraph.type.define.NodeRole, + org.apache.hugegraph.masterelection.GlobalMasterInfo, + org.apache.hugegraph.util.DateUtil, org.apache.hugegraph.traversal.algorithm.CollectionPathsTraverser, org.apache.hugegraph.traversal.algorithm.CountTraverser, org.apache.hugegraph.traversal.algorithm.CustomizedCrosspointsTraverser, @@ -57,9 +59,11 @@ scriptEngines: { org.apache.hugegraph.traversal.algorithm.steps.EdgeStep, org.apache.hugegraph.traversal.algorithm.steps.RepeatEdgeStep, org.apache.hugegraph.traversal.algorithm.steps.WeightedEdgeStep, + org.apache.hugegraph.traversal.optimize.ConditionP, org.apache.hugegraph.traversal.optimize.Text, org.apache.hugegraph.traversal.optimize.TraversalUtil, - org.apache.hugegraph.util.DateUtil + org.opencypher.gremlin.traversal.CustomFunctions, + org.opencypher.gremlin.traversal.CustomPredicate ], methodImports: [java.lang.Math#*] }, diff --git a/hugegraph-server/hugegraph-dist/src/assembly/travis/conf-raft2/gremlin-server.yaml b/hugegraph-server/hugegraph-dist/src/assembly/travis/conf-raft2/gremlin-server.yaml index bd37369c83..54d9a6ddec 100644 --- a/hugegraph-server/hugegraph-dist/src/assembly/travis/conf-raft2/gremlin-server.yaml +++ b/hugegraph-server/hugegraph-dist/src/assembly/travis/conf-raft2/gremlin-server.yaml @@ -36,6 +36,8 @@ scriptEngines: { org.apache.hugegraph.backend.id.IdGenerator, org.apache.hugegraph.type.define.Directions, org.apache.hugegraph.type.define.NodeRole, + org.apache.hugegraph.masterelection.GlobalMasterInfo, + org.apache.hugegraph.util.DateUtil, org.apache.hugegraph.traversal.algorithm.CollectionPathsTraverser, org.apache.hugegraph.traversal.algorithm.CountTraverser, org.apache.hugegraph.traversal.algorithm.CustomizedCrosspointsTraverser, @@ -57,9 +59,11 @@ scriptEngines: { org.apache.hugegraph.traversal.algorithm.steps.EdgeStep, org.apache.hugegraph.traversal.algorithm.steps.RepeatEdgeStep, org.apache.hugegraph.traversal.algorithm.steps.WeightedEdgeStep, + org.apache.hugegraph.traversal.optimize.ConditionP, org.apache.hugegraph.traversal.optimize.Text, org.apache.hugegraph.traversal.optimize.TraversalUtil, - org.apache.hugegraph.util.DateUtil + org.opencypher.gremlin.traversal.CustomFunctions, + org.opencypher.gremlin.traversal.CustomPredicate ], methodImports: [java.lang.Math#*] }, diff --git a/hugegraph-server/hugegraph-dist/src/assembly/travis/conf-raft3/gremlin-server.yaml b/hugegraph-server/hugegraph-dist/src/assembly/travis/conf-raft3/gremlin-server.yaml index a034d63520..508abef354 100644 --- a/hugegraph-server/hugegraph-dist/src/assembly/travis/conf-raft3/gremlin-server.yaml +++ b/hugegraph-server/hugegraph-dist/src/assembly/travis/conf-raft3/gremlin-server.yaml @@ -36,6 +36,8 @@ scriptEngines: { org.apache.hugegraph.backend.id.IdGenerator, org.apache.hugegraph.type.define.Directions, org.apache.hugegraph.type.define.NodeRole, + org.apache.hugegraph.masterelection.GlobalMasterInfo, + org.apache.hugegraph.util.DateUtil, org.apache.hugegraph.traversal.algorithm.CollectionPathsTraverser, org.apache.hugegraph.traversal.algorithm.CountTraverser, org.apache.hugegraph.traversal.algorithm.CustomizedCrosspointsTraverser, @@ -57,9 +59,11 @@ scriptEngines: { org.apache.hugegraph.traversal.algorithm.steps.EdgeStep, org.apache.hugegraph.traversal.algorithm.steps.RepeatEdgeStep, org.apache.hugegraph.traversal.algorithm.steps.WeightedEdgeStep, + org.apache.hugegraph.traversal.optimize.ConditionP, org.apache.hugegraph.traversal.optimize.Text, org.apache.hugegraph.traversal.optimize.TraversalUtil, - org.apache.hugegraph.util.DateUtil + org.opencypher.gremlin.traversal.CustomFunctions, + org.opencypher.gremlin.traversal.CustomPredicate ], methodImports: [java.lang.Math#*] }, diff --git a/hugegraph-server/hugegraph-example/src/main/java/org/apache/hugegraph/example/ExampleUtil.java b/hugegraph-server/hugegraph-example/src/main/java/org/apache/hugegraph/example/ExampleUtil.java index 7cb148262a..8317f9ba7f 100644 --- a/hugegraph-server/hugegraph-example/src/main/java/org/apache/hugegraph/example/ExampleUtil.java +++ b/hugegraph-server/hugegraph-example/src/main/java/org/apache/hugegraph/example/ExampleUtil.java @@ -20,18 +20,17 @@ import java.io.File; import java.util.Iterator; import java.util.concurrent.TimeoutException; -import org.slf4j.Logger; import org.apache.hugegraph.HugeException; import org.apache.hugegraph.HugeFactory; import org.apache.hugegraph.HugeGraph; -import org.apache.hugegraph.backend.id.IdGenerator; import org.apache.hugegraph.dist.RegisterUtil; +import org.apache.hugegraph.masterelection.GlobalMasterInfo; import org.apache.hugegraph.perf.PerfUtil; import org.apache.hugegraph.task.HugeTask; import org.apache.hugegraph.task.TaskScheduler; -import org.apache.hugegraph.type.define.NodeRole; import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; public class ExampleUtil { private static final Logger LOG = Log.logger(ExampleUtil.class); @@ -81,7 +80,7 @@ public static HugeGraph loadGraph(boolean needClear, boolean needProfile) { graph.clearBackend(); } graph.initBackend(); - graph.serverStarted(IdGenerator.of("server1"), NodeRole.MASTER); + graph.serverStarted(GlobalMasterInfo.master("server1")); return graph; } diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/api/GremlinApiTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/api/GremlinApiTest.java index 57aefeb9be..b065270871 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/api/GremlinApiTest.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/api/GremlinApiTest.java @@ -121,7 +121,7 @@ public void testClearAndInit() { body = "{" + "\"gremlin\":\"hugegraph.serverStarted(" + - " IdGenerator.of('server1'), NodeRole.MASTER)\"," + + " GlobalMasterInfo.master('server1'))\"," + "\"bindings\":{}," + "\"language\":\"gremlin-groovy\"," + "\"aliases\":{\"g\":\"__g_hugegraph\"}}"; diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/BaseCoreTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/BaseCoreTest.java index d51e1b5951..df9932ab8e 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/BaseCoreTest.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/BaseCoreTest.java @@ -21,13 +21,12 @@ import org.apache.hugegraph.HugeGraph; import org.apache.hugegraph.HugeGraphParams; -import org.apache.hugegraph.backend.id.IdGenerator; import org.apache.hugegraph.backend.store.BackendFeatures; import org.apache.hugegraph.dist.RegisterUtil; +import org.apache.hugegraph.masterelection.GlobalMasterInfo; import org.apache.hugegraph.schema.SchemaManager; import org.apache.hugegraph.testutil.Utils; import org.apache.hugegraph.testutil.Whitebox; -import org.apache.hugegraph.type.define.NodeRole; import org.apache.hugegraph.util.Log; import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Vertex; @@ -66,7 +65,7 @@ public static void init() { graph = Utils.open(); graph.clearBackend(); graph.initBackend(); - graph.serverStarted(IdGenerator.of("server1"), NodeRole.MASTER); + graph.serverStarted(GlobalMasterInfo.master("server-test")); } @AfterClass diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/CoreTestSuite.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/CoreTestSuite.java index e140474b40..beafedf863 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/CoreTestSuite.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/CoreTestSuite.java @@ -36,9 +36,7 @@ TaskCoreTest.class, AuthTest.class, MultiGraphsTest.class, - RamTableTest.class, - RoleElectionStateMachineTest.class + RamTableTest.class }) public class CoreTestSuite { - } diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/MultiGraphsTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/MultiGraphsTest.java index 23fb122f44..3b468ba458 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/MultiGraphsTest.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/MultiGraphsTest.java @@ -31,8 +31,8 @@ import org.apache.hugegraph.backend.store.BackendStoreInfo; import org.apache.hugegraph.backend.store.rocksdb.RocksDBOptions; import org.apache.hugegraph.config.CoreOptions; -import org.apache.hugegraph.exception.ConnectionException; import org.apache.hugegraph.exception.ExistedException; +import org.apache.hugegraph.masterelection.GlobalMasterInfo; import org.apache.hugegraph.schema.EdgeLabel; import org.apache.hugegraph.schema.IndexLabel; import org.apache.hugegraph.schema.PropertyKey; @@ -40,7 +40,6 @@ import org.apache.hugegraph.schema.VertexLabel; import org.apache.hugegraph.testutil.Assert; import org.apache.hugegraph.testutil.Utils; -import org.apache.hugegraph.type.define.NodeRole; import org.apache.tinkerpop.gremlin.structure.T; import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.util.GraphFactory; @@ -87,9 +86,9 @@ public void testCopySchemaWithMultiGraphs() { graph.initBackend(); } HugeGraph g1 = graphs.get(0); - g1.serverStarted(IdGenerator.of("server-g2"), NodeRole.MASTER); + g1.serverStarted(GlobalMasterInfo.master("server-g2")); HugeGraph g2 = graphs.get(1); - g2.serverStarted(IdGenerator.of("server-g3"), NodeRole.MASTER); + g2.serverStarted(GlobalMasterInfo.master("server-g3")); SchemaManager schema = g1.schema(); @@ -209,8 +208,8 @@ public void testCopySchemaWithMultiGraphsWithConflict() { } HugeGraph g1 = graphs.get(0); HugeGraph g2 = graphs.get(1); - g1.serverStarted(IdGenerator.of("server-g1c"), NodeRole.MASTER); - g2.serverStarted(IdGenerator.of("server-g2c"), NodeRole.MASTER); + g1.serverStarted(GlobalMasterInfo.master("server-g1c")); + g2.serverStarted(GlobalMasterInfo.master("server-g2c")); g1.schema().propertyKey("id").asInt().create(); g2.schema().propertyKey("id").asText().create(); diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/RoleElectionStateMachineTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/RoleElectionStateMachineTest.java index 9fbbb5e628..dd73821661 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/RoleElectionStateMachineTest.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/RoleElectionStateMachineTest.java @@ -34,8 +34,8 @@ import org.apache.hugegraph.masterelection.ClusterRoleStore; import org.apache.hugegraph.masterelection.Config; import org.apache.hugegraph.masterelection.RoleElectionStateMachine; +import org.apache.hugegraph.masterelection.RoleListener; import org.apache.hugegraph.masterelection.StandardRoleElectionStateMachine; -import org.apache.hugegraph.masterelection.StateMachineCallback; import org.apache.hugegraph.masterelection.StateMachineContext; import org.apache.hugegraph.testutil.Assert; import org.apache.hugegraph.testutil.Utils; @@ -43,13 +43,13 @@ public class RoleElectionStateMachineTest { - public static class LogEntry { + private static class LogEntry { - Integer epoch; + private final Integer epoch; - String node; + private final String node; - Role role; + private final Role role; enum Role { master, @@ -74,28 +74,29 @@ public boolean equals(Object obj) { return false; } LogEntry logEntry = (LogEntry) obj; - return Objects.equals(epoch, logEntry.epoch) && - Objects.equals(node, logEntry.node) && role == logEntry.role; + return Objects.equals(this.epoch, logEntry.epoch) && + Objects.equals(this.node, logEntry.node) && + this.role == logEntry.role; } @Override public int hashCode() { - return Objects.hash(epoch, node, role); + return Objects.hash(this.epoch, this.node, this.role); } @Override public String toString() { return "LogEntry{" + - "epoch=" + epoch + - ", node='" + node + '\'' + - ", role=" + role + + "epoch=" + this.epoch + + ", node='" + this.node + '\'' + + ", role=" + this.role + '}'; } } private static class TestConfig implements Config { - String node; + private final String node; public TestConfig(String node) { this.node = node; @@ -139,11 +140,11 @@ public long baseTimeoutMillisecond() { @Test public void testStateMachine() throws InterruptedException { - final CountDownLatch stop = new CountDownLatch(4); final int MAX_COUNT = 200; - final List logRecords = Collections.synchronizedList(new ArrayList<>(MAX_COUNT)); - final List masterNodes = Collections.synchronizedList(new ArrayList<>(MAX_COUNT)); - final StateMachineCallback callback = new StateMachineCallback() { + CountDownLatch stop = new CountDownLatch(4); + List logRecords = Collections.synchronizedList(new ArrayList<>(MAX_COUNT)); + List masterNodes = Collections.synchronizedList(new ArrayList<>(MAX_COUNT)); + RoleListener callback = new RoleListener() { @Override public void onAsRoleMaster(StateMachineContext context) { @@ -200,12 +201,13 @@ public void onAsRoleAbdication(StateMachineContext context) { @Override public void error(StateMachineContext context, Throwable e) { Utils.println("state machine error: node " + - context.node() + - " message " + e.getMessage()); + context.node() + " message " + e.getMessage()); } }; - final List clusterRoleLogs = Collections.synchronizedList(new ArrayList<>(100)); + final List clusterRoleLogs = Collections.synchronizedList( + new ArrayList<>(100)); + final ClusterRoleStore clusterRoleStore = new ClusterRoleStore() { volatile int epoch = 0; @@ -227,7 +229,7 @@ public boolean updateIfNodePresent(ClusterRole clusterRole) { } ClusterRole copy = this.copy(clusterRole); - ClusterRole newClusterRole = data.compute(copy.epoch(), (key, value) -> { + ClusterRole newClusterRole = this.data.compute(copy.epoch(), (key, value) -> { if (copy.epoch() > this.epoch) { this.epoch = copy.epoch(); Assert.assertNull(value); @@ -262,27 +264,27 @@ public Optional query() { Thread node1 = new Thread(() -> { Config config = new TestConfig("1"); RoleElectionStateMachine stateMachine = - new StandardRoleElectionStateMachine(config, clusterRoleStore); + new StandardRoleElectionStateMachine(config, clusterRoleStore); machines[1] = stateMachine; - stateMachine.apply(callback); + stateMachine.start(callback); stop.countDown(); }); Thread node2 = new Thread(() -> { Config config = new TestConfig("2"); RoleElectionStateMachine stateMachine = - new StandardRoleElectionStateMachine(config, clusterRoleStore); + new StandardRoleElectionStateMachine(config, clusterRoleStore); machines[2] = stateMachine; - stateMachine.apply(callback); + stateMachine.start(callback); stop.countDown(); }); Thread node3 = new Thread(() -> { Config config = new TestConfig("3"); RoleElectionStateMachine stateMachine = - new StandardRoleElectionStateMachine(config, clusterRoleStore); + new StandardRoleElectionStateMachine(config, clusterRoleStore); machines[3] = stateMachine; - stateMachine.apply(callback); + stateMachine.start(callback); stop.countDown(); }); diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/tinkerpop/TestGraph.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/tinkerpop/TestGraph.java index c6dcff4a87..415e804626 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/tinkerpop/TestGraph.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/tinkerpop/TestGraph.java @@ -24,27 +24,24 @@ import java.util.Set; import org.apache.commons.configuration2.Configuration; -import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; -import org.apache.tinkerpop.gremlin.structure.Edge; -import org.apache.tinkerpop.gremlin.structure.Graph; -import org.apache.tinkerpop.gremlin.structure.T; -import org.apache.tinkerpop.gremlin.structure.Transaction; -import org.apache.tinkerpop.gremlin.structure.Vertex; -import org.apache.tinkerpop.gremlin.structure.io.Io; - import org.apache.hugegraph.HugeGraph; -import org.apache.hugegraph.backend.id.Id; -import org.apache.hugegraph.backend.id.IdGenerator; import org.apache.hugegraph.backend.store.BackendStoreInfo; import org.apache.hugegraph.io.HugeGraphIoRegistry; import org.apache.hugegraph.io.HugeGraphSONModule; +import org.apache.hugegraph.masterelection.GlobalMasterInfo; import org.apache.hugegraph.perf.PerfUtil.Watched; import org.apache.hugegraph.schema.PropertyKey; import org.apache.hugegraph.schema.SchemaManager; import org.apache.hugegraph.task.TaskScheduler; import org.apache.hugegraph.testutil.Whitebox; import org.apache.hugegraph.type.define.IdStrategy; -import org.apache.hugegraph.type.define.NodeRole; +import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; +import org.apache.tinkerpop.gremlin.structure.Edge; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.T; +import org.apache.tinkerpop.gremlin.structure.Transaction; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.io.Io; import com.google.common.collect.ImmutableSet; @@ -85,8 +82,7 @@ protected void initBackend() { assert sysInfo.exists() && !this.graph.closed(); } - Id id = IdGenerator.of("server-tinkerpop"); - this.graph.serverStarted(id, NodeRole.MASTER); + this.graph.serverStarted(GlobalMasterInfo.master("server-tinkerpop")); this.initedBackend = true; } diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java index eb1fb6ad3b..458c2d8a9e 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java @@ -17,6 +17,7 @@ package org.apache.hugegraph.unit; +import org.apache.hugegraph.core.RoleElectionStateMachineTest; import org.apache.hugegraph.unit.cache.CacheManagerTest; import org.apache.hugegraph.unit.cache.CacheTest; import org.apache.hugegraph.unit.cache.CachedGraphTransactionTest; @@ -111,6 +112,7 @@ TraversalUtilTest.class, PageStateTest.class, SystemSchemaStoreTest.class, + RoleElectionStateMachineTest.class, /* serializer */ BytesBufferTest.class, diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/core/SecurityManagerTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/core/SecurityManagerTest.java index 403bc62e99..ae431480c4 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/core/SecurityManagerTest.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/core/SecurityManagerTest.java @@ -32,23 +32,22 @@ import java.util.Map; import java.util.concurrent.TimeoutException; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - import org.apache.hugegraph.HugeException; import org.apache.hugegraph.HugeFactory; import org.apache.hugegraph.HugeGraph; -import org.apache.hugegraph.backend.id.IdGenerator; import org.apache.hugegraph.config.HugeConfig; import org.apache.hugegraph.job.GremlinJob; import org.apache.hugegraph.job.JobBuilder; +import org.apache.hugegraph.masterelection.GlobalMasterInfo; import org.apache.hugegraph.security.HugeSecurityManager; import org.apache.hugegraph.task.HugeTask; import org.apache.hugegraph.testutil.Assert; -import org.apache.hugegraph.type.define.NodeRole; import org.apache.hugegraph.unit.FakeObjects; import org.apache.hugegraph.util.JsonUtil; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + import com.google.common.collect.ImmutableMap; public class SecurityManagerTest { @@ -319,7 +318,7 @@ private static HugeGraph loadGraph(boolean needClear) { graph.clearBackend(); } graph.initBackend(); - graph.serverStarted(IdGenerator.of("server1"), NodeRole.MASTER); + graph.serverStarted(GlobalMasterInfo.master("server1")); return graph; }