From c4c5becbbe88d171f2bbe2f8f63051d5855c5e1f Mon Sep 17 00:00:00 2001 From: imbajin Date: Tue, 8 Oct 2024 22:43:14 +0800 Subject: [PATCH] tiny improve --- .../apache/hugegraph/StandardHugeGraph.java | 7 ++-- .../apache/hugegraph/task/HugeServerInfo.java | 38 +++++++++-------- .../hugegraph/task/ServerInfoManager.java | 27 ++++-------- .../hugegraph/task/StandardTaskScheduler.java | 41 ++++++++----------- 4 files changed, 49 insertions(+), 64 deletions(-) diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java index 4e9263a488..6480d7f288 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java @@ -316,8 +316,7 @@ private void initRoleStateMachine(Id serverId) { conf.get( RoleElectionOptions.BASE_TIMEOUT_MILLISECOND)); ClusterRoleStore roleStore = new StandardClusterRoleStore(this.params); - this.roleElectionStateMachine = new StandardRoleElectionStateMachine(roleConfig, - roleStore); + this.roleElectionStateMachine = new StandardRoleElectionStateMachine(roleConfig, roleStore); } @Override @@ -1007,7 +1006,7 @@ public void create(String configPath, GlobalMasterInfo nodeInfo) { this.initBackend(); this.serverStarted(nodeInfo); - // Write config to disk file + // Write config to the disk file String confPath = ConfigUtil.writeToFile(configPath, this.name(), this.configuration()); this.configuration.file(confPath); @@ -1349,7 +1348,7 @@ public String schedulerType() { private class TinkerPopTransaction extends AbstractThreadLocalTransaction { - // Times opened from upper layer + // Times opened from the upper layer private final AtomicInteger refs; // Flag opened of each thread private final ThreadLocal opened; diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java index 3b3cf1ad88..32c21cd730 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; +import org.apache.hugegraph.HugeException; import org.apache.hugegraph.HugeGraph; import org.apache.hugegraph.HugeGraphParams; import org.apache.hugegraph.backend.id.Id; @@ -47,14 +48,13 @@ public class HugeServerInfo { // Unit millisecond - private static final long EXPIRED_INTERVAL = - TaskManager.SCHEDULE_PERIOD * 10; + private static final long EXPIRED_INTERVAL = TaskManager.SCHEDULE_PERIOD * 10; - private Id id; private NodeRole role; + private Date updateTime; private int maxLoad; private int load; - private Date updateTime; + private final Id id; private transient boolean updated = false; @@ -204,8 +204,7 @@ public Map asMap() { public static HugeServerInfo fromVertex(Vertex vertex) { HugeServerInfo serverInfo = new HugeServerInfo((Id) vertex.id()); - for (Iterator> iter = vertex.properties(); - iter.hasNext(); ) { + for (Iterator> iter = vertex.properties(); iter.hasNext(); ) { VertexProperty prop = iter.next(); serverInfo.property(prop.key(), prop.value()); } @@ -250,7 +249,7 @@ public static final class Schema { public static final String SERVER = P.SERVER; - protected final HugeGraphParams graph; + private final HugeGraphParams graph; public Schema(HugeGraphParams graph) { this.graph = graph; @@ -261,17 +260,20 @@ public void initSchemaIfNeeded() { return; } - HugeGraph graph = this.graph.graph(); - String[] properties = this.initProperties(); - - // Create vertex label '~server' - VertexLabel label = graph.schema().vertexLabel(SERVER) - .properties(properties) - .useCustomizeStringId() - .nullableKeys(P.ROLE, P.MAX_LOAD, - P.LOAD, P.UPDATE_TIME) - .enableLabelIndex(true) - .build(); + VertexLabel label; + try (HugeGraph graph = this.graph.graph()) { + String[] properties = this.initProperties(); + + // Create vertex label '~server' + label = graph.schema().vertexLabel(SERVER) + .properties(properties) + .useCustomizeStringId() + .nullableKeys(P.ROLE, P.MAX_LOAD, P.LOAD, P.UPDATE_TIME) + .enableLabelIndex(true) + .build(); + } catch (Exception e) { + throw new HugeException("Failed to init schema for '%s'", e, SERVER); + } this.graph.schemaTransaction().addVertexLabel(label); } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java index 3bcd48ac4c..bcef869017 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java @@ -67,8 +67,7 @@ public class ServerInfoManager { private volatile boolean onlySingleNode; private volatile boolean closed; - public ServerInfoManager(HugeGraphParams graph, - ExecutorService dbExecutor) { + public ServerInfoManager(HugeGraphParams graph, ExecutorService dbExecutor) { E.checkNotNull(graph, "graph"); E.checkNotNull(dbExecutor, "db executor"); @@ -116,13 +115,11 @@ public synchronized void initServerInfo(GlobalMasterInfo nodeInfo) { try { Thread.sleep(existed.expireTime() - now + 1); } catch (InterruptedException e) { - throw new HugeException("Interrupted when waiting for server " + - "info expired", e); + throw new HugeException("Interrupted when waiting for server info expired", e); } } E.checkArgument(existed == null || !existed.alive(), - "The server with name '%s' already in cluster", - serverId); + "The server with name '%s' already in cluster", serverId); if (nodeInfo.nodeRole().master()) { String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null; @@ -198,13 +195,12 @@ public synchronized void heartbeat() { /* ServerInfo is missing */ if (this.selfNodeId() == null) { // Ignore if ServerInfo is not initialized - LOG.info("ServerInfo is missing: {}, may not be initialized yet"); + LOG.info("ServerInfo is missing: {}, may not be initialized yet", this.selfNodeId()); return; } if (this.selfIsMaster()) { - // On master node, just wait for ServerInfo re-init - LOG.warn("ServerInfo is missing: {}, may be cleared before", - this.selfNodeId()); + // On the master node, just wait for ServerInfo re-init + LOG.warn("ServerInfo is missing: {}, may be cleared before", this.selfNodeId()); return; } /* @@ -245,12 +241,10 @@ protected synchronized HugeServerInfo pickWorkerNode(Collection if (!server.alive()) { continue; } - if (server.role().master()) { master = server; continue; } - hasWorkerNode = true; if (!server.suitableFor(task, now)) { continue; @@ -267,13 +261,12 @@ protected synchronized HugeServerInfo pickWorkerNode(Collection this.onlySingleNode = singleNode; } - // Only schedule to master if there is no workers and master is suitable + // Only schedule to master if there are no workers and master are suitable if (!hasWorkerNode) { if (master != null && master.suitableFor(task, now)) { serverWithMinLoad = master; } } - return serverWithMinLoad; } @@ -299,8 +292,7 @@ private Id save(HugeServerInfo serverInfo) { throw new HugeException("Schema is missing for %s '%s'", HugeServerInfo.P.SERVER, serverInfo); } - HugeVertex vertex = this.tx().constructVertex(false, - serverInfo.asArray()); + HugeVertex vertex = this.tx().constructVertex(false, serverInfo.asArray()); // Add or update server info in backend store vertex = this.tx().addVertex(vertex); return vertex.id(); @@ -314,8 +306,7 @@ private int save(Collection serverInfos) { } HugeServerInfo.Schema schema = HugeServerInfo.schema(this.graph); if (!schema.existVertexLabel(HugeServerInfo.P.SERVER)) { - throw new HugeException("Schema is missing for %s", - HugeServerInfo.P.SERVER); + throw new HugeException("Schema is missing for %s", HugeServerInfo.P.SERVER); } // Save server info in batch GraphTransaction tx = this.tx(); diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java index 8afe11dff2..1395888611 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java @@ -120,7 +120,7 @@ private TaskTransaction tx() { if (this.taskTx == null) { BackendStore store = this.graph.loadSystemStore(); TaskTransaction tx = new TaskTransaction(this.graph, store); - assert this.taskTx == null; // may be reentrant? + assert this.taskTx == null; // maybe reentrant? this.taskTx = tx; } } @@ -196,7 +196,7 @@ public Future schedule(HugeTask task) { if (this.serverManager().onlySingleNode() && !task.computer()) { /* - * Speed up for single node, submit task immediately, + * Speed up for single node, submit the task immediately, * this code can be removed without affecting code logic */ task.status(TaskStatus.QUEUED); @@ -205,7 +205,7 @@ public Future schedule(HugeTask task) { return this.submitTask(task); } else { /* - * Just set SCHEDULING status and save task, + * Just set the SCHEDULING status and save the task, * it will be scheduled by periodic scheduler worker */ task.status(TaskStatus.SCHEDULING); @@ -276,11 +276,11 @@ public synchronized void cancel(HugeTask task) { assert this.serverManager().selfIsMaster(); if (!task.server().equals(this.serverManager().selfNodeId())) { /* - * Remove task from memory if it's running on worker node, - * but keep task in memory if it's running on master node. - * cancel-scheduling will read task from backend store, if + * Remove the task from memory if it's running on worker node, + * but keep the task in memory if it's running on master node. + * Cancel-scheduling will read the task from backend store, if * removed this instance from memory, there will be two task - * instances with same id, and can't cancel the real task that + * instances with the same id, and can't cancel the real task that * is running but removed from memory. */ this.remove(task); @@ -301,12 +301,10 @@ public ServerInfoManager serverManager() { protected synchronized void scheduleTasksOnMaster() { // Master server schedule all scheduling tasks to suitable worker nodes - Collection serverInfos = this.serverManager() - .allServerInfos(); + Collection serverInfos = this.serverManager().allServerInfos(); String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null; do { - Iterator> tasks = this.tasks(TaskStatus.SCHEDULING, - PAGE_SIZE, page); + Iterator> tasks = this.tasks(TaskStatus.SCHEDULING, PAGE_SIZE, page); while (tasks.hasNext()) { HugeTask task = tasks.next(); if (task.server() != null) { @@ -318,12 +316,10 @@ protected synchronized void scheduleTasksOnMaster() { return; } - HugeServerInfo server = this.serverManager().pickWorkerNode( - serverInfos, task); + HugeServerInfo server = this.serverManager().pickWorkerNode(serverInfos, task); if (server == null) { LOG.info("The master can't find suitable servers to " + - "execute task '{}', wait for next schedule", - task.id()); + "execute task '{}', wait for next schedule", task.id()); continue; } @@ -336,8 +332,7 @@ protected synchronized void scheduleTasksOnMaster() { // Update server load in memory, it will be saved at the ending server.increaseLoad(task.load()); - LOG.info("Scheduled task '{}' to server '{}'", - task.id(), server.id()); + LOG.info("Scheduled task '{}' to server '{}'", task.id(), server.id()); } if (page != null) { page = PageInfo.pageInfo(tasks); @@ -351,8 +346,7 @@ protected synchronized void scheduleTasksOnMaster() { protected void executeTasksOnWorker(Id server) { String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null; do { - Iterator> tasks = this.tasks(TaskStatus.SCHEDULED, - PAGE_SIZE, page); + Iterator> tasks = this.tasks(TaskStatus.SCHEDULED, PAGE_SIZE, page); while (tasks.hasNext()) { HugeTask task = tasks.next(); this.initTaskCallable(task); @@ -381,8 +375,7 @@ protected void executeTasksOnWorker(Id server) { protected void cancelTasksOnWorker(Id server) { String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null; do { - Iterator> tasks = this.tasks(TaskStatus.CANCELLING, - PAGE_SIZE, page); + Iterator> tasks = this.tasks(TaskStatus.CANCELLING, PAGE_SIZE, page); while (tasks.hasNext()) { HugeTask task = tasks.next(); Id taskServer = task.server(); @@ -557,10 +550,10 @@ public HugeTask delete(Id id, boolean force) { HugeTask task = this.task(id); /* - * The following is out of date when task running on worker node: + * The following is out of date when the task running on worker node: * HugeTask task = this.tasks.get(id); * Tasks are removed from memory after completed at most time, - * but there is a tiny gap between tasks are completed and + * but there is a tiny gap between tasks is completed and * removed from memory. * We assume tasks only in memory may be incomplete status, * in fact, it is also possible to appear on the backend tasks @@ -621,7 +614,7 @@ private HugeTask waitUntilTaskCompleted(Id id, long seconds, throw e; } if (task.completed()) { - // Wait for task result being set after status is completed + // Wait for the task result being set after the status is completed sleep(intervalMs); return task; }