Skip to content

Commit

Permalink
refactor(server): optimize the server-node info (#2671)
Browse files Browse the repository at this point in the history
Co-authored-by: imbajin <[email protected]>
  • Loading branch information
zyxxoo and imbajin authored Oct 10, 2024
1 parent f6f3708 commit 861a100
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,7 @@ private void initRoleStateMachine(Id serverId) {
conf.get(
RoleElectionOptions.BASE_TIMEOUT_MILLISECOND));
ClusterRoleStore roleStore = new StandardClusterRoleStore(this.params);
this.roleElectionStateMachine = new StandardRoleElectionStateMachine(roleConfig,
roleStore);
this.roleElectionStateMachine = new StandardRoleElectionStateMachine(roleConfig, roleStore);
}

@Override
Expand Down Expand Up @@ -1007,7 +1006,7 @@ public void create(String configPath, GlobalMasterInfo nodeInfo) {
this.initBackend();
this.serverStarted(nodeInfo);

// Write config to disk file
// Write config to the disk file
String confPath = ConfigUtil.writeToFile(configPath, this.name(),
this.configuration());
this.configuration.file(confPath);
Expand Down Expand Up @@ -1349,7 +1348,7 @@ public String schedulerType() {

private class TinkerPopTransaction extends AbstractThreadLocalTransaction {

// Times opened from upper layer
// Times opened from the upper layer
private final AtomicInteger refs;
// Flag opened of each thread
private final ThreadLocal<Boolean> opened;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

Expand All @@ -47,14 +46,13 @@
public class HugeServerInfo {

// Unit millisecond
private static final long EXPIRED_INTERVAL =
TaskManager.SCHEDULE_PERIOD * 10;
private static final long EXPIRED_INTERVAL = TaskManager.SCHEDULE_PERIOD * 10;

private Id id;
private NodeRole role;
private Date updateTime;
private int maxLoad;
private int load;
private Date updateTime;
private final Id id;

private transient boolean updated = false;

Expand Down Expand Up @@ -114,6 +112,10 @@ public void increaseLoad(int delta) {
this.updated = true;
}

public long expireTime() {
return this.updateTime.getTime() + EXPIRED_INTERVAL;
}

public Date updateTime() {
return this.updateTime;
}
Expand Down Expand Up @@ -200,8 +202,7 @@ public Map<String, Object> asMap() {

public static HugeServerInfo fromVertex(Vertex vertex) {
HugeServerInfo serverInfo = new HugeServerInfo((Id) vertex.id());
for (Iterator<VertexProperty<Object>> iter = vertex.properties();
iter.hasNext(); ) {
for (var iter = vertex.properties(); iter.hasNext(); ) {
VertexProperty<Object> prop = iter.next();
serverInfo.property(prop.key(), prop.value());
}
Expand Down Expand Up @@ -246,7 +247,7 @@ public static final class Schema {

public static final String SERVER = P.SERVER;

protected final HugeGraphParams graph;
private final HugeGraphParams graph;

public Schema(HugeGraphParams graph) {
this.graph = graph;
Expand All @@ -264,16 +265,14 @@ public void initSchemaIfNeeded() {
VertexLabel label = graph.schema().vertexLabel(SERVER)
.properties(properties)
.useCustomizeStringId()
.nullableKeys(P.ROLE, P.MAX_LOAD,
P.LOAD, P.UPDATE_TIME)
.nullableKeys(P.ROLE, P.MAX_LOAD, P.LOAD, P.UPDATE_TIME)
.enableLabelIndex(true)
.build();
this.graph.schemaTransaction().addVertexLabel(label);
}

private String[] initProperties() {
List<String> props = new ArrayList<>();

props.add(createPropertyKey(P.ROLE, DataType.BYTE));
props.add(createPropertyKey(P.MAX_LOAD, DataType.INT));
props.add(createPropertyKey(P.LOAD, DataType.INT));
Expand All @@ -283,8 +282,7 @@ private String[] initProperties() {
}

public boolean existVertexLabel(String label) {
return this.graph.schemaTransaction()
.getVertexLabel(label) != null;
return this.graph.schemaTransaction().getVertexLabel(label) != null;
}

@SuppressWarnings("unused")
Expand All @@ -296,8 +294,7 @@ private String createPropertyKey(String name, DataType dataType) {
return this.createPropertyKey(name, dataType, Cardinality.SINGLE);
}

private String createPropertyKey(String name, DataType dataType,
Cardinality cardinality) {
private String createPropertyKey(String name, DataType dataType, Cardinality cardinality) {
SchemaManager schema = this.graph.graph().schema();
PropertyKey propertyKey = schema.propertyKey(name)
.dataType(dataType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ public class ServerInfoManager {
private volatile boolean onlySingleNode;
private volatile boolean closed;

public ServerInfoManager(HugeGraphParams graph,
ExecutorService dbExecutor) {
public ServerInfoManager(HugeGraphParams graph, ExecutorService dbExecutor) {
E.checkNotNull(graph, "graph");
E.checkNotNull(dbExecutor, "db executor");

Expand Down Expand Up @@ -107,9 +106,20 @@ public synchronized void initServerInfo(GlobalMasterInfo nodeInfo) {

Id serverId = nodeInfo.nodeId();
HugeServerInfo existed = this.serverInfo(serverId);
if (existed != null && existed.alive()) {
final long now = DateUtil.now().getTime();
if (existed.expireTime() > now + 30 * 1000) {
LOG.info("The node time maybe skew very much: {}", existed);
throw new HugeException("The server with name '%s' maybe skew very much", serverId);
}
try {
Thread.sleep(existed.expireTime() - now + 1);
} catch (InterruptedException e) {
throw new HugeException("Interrupted when waiting for server info expired", e);
}
}
E.checkArgument(existed == null || !existed.alive(),
"The server with name '%s' already in cluster",
serverId);
"The server with name '%s' already in cluster", serverId);

if (nodeInfo.nodeRole().master()) {
String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
Expand Down Expand Up @@ -185,13 +195,12 @@ public synchronized void heartbeat() {
/* ServerInfo is missing */
if (this.selfNodeId() == null) {
// Ignore if ServerInfo is not initialized
LOG.info("ServerInfo is missing: {}, may not be initialized yet");
LOG.info("ServerInfo is missing: {}, may not be initialized yet", this.selfNodeId());
return;
}
if (this.selfIsMaster()) {
// On master node, just wait for ServerInfo re-init
LOG.warn("ServerInfo is missing: {}, may be cleared before",
this.selfNodeId());
// On the master node, just wait for ServerInfo re-init
LOG.warn("ServerInfo is missing: {}, may be cleared before", this.selfNodeId());
return;
}
/*
Expand Down Expand Up @@ -232,12 +241,10 @@ protected synchronized HugeServerInfo pickWorkerNode(Collection<HugeServerInfo>
if (!server.alive()) {
continue;
}

if (server.role().master()) {
master = server;
continue;
}

hasWorkerNode = true;
if (!server.suitableFor(task, now)) {
continue;
Expand All @@ -254,13 +261,12 @@ protected synchronized HugeServerInfo pickWorkerNode(Collection<HugeServerInfo>
this.onlySingleNode = singleNode;
}

// Only schedule to master if there is no workers and master is suitable
// Only schedule to master if there are no workers and master are suitable
if (!hasWorkerNode) {
if (master != null && master.suitableFor(task, now)) {
serverWithMinLoad = master;
}
}

return serverWithMinLoad;
}

Expand All @@ -286,8 +292,7 @@ private Id save(HugeServerInfo serverInfo) {
throw new HugeException("Schema is missing for %s '%s'",
HugeServerInfo.P.SERVER, serverInfo);
}
HugeVertex vertex = this.tx().constructVertex(false,
serverInfo.asArray());
HugeVertex vertex = this.tx().constructVertex(false, serverInfo.asArray());
// Add or update server info in backend store
vertex = this.tx().addVertex(vertex);
return vertex.id();
Expand All @@ -301,8 +306,7 @@ private int save(Collection<HugeServerInfo> serverInfos) {
}
HugeServerInfo.Schema schema = HugeServerInfo.schema(this.graph);
if (!schema.existVertexLabel(HugeServerInfo.P.SERVER)) {
throw new HugeException("Schema is missing for %s",
HugeServerInfo.P.SERVER);
throw new HugeException("Schema is missing for %s", HugeServerInfo.P.SERVER);
}
// Save server info in batch
GraphTransaction tx = this.tx();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private TaskTransaction tx() {
if (this.taskTx == null) {
BackendStore store = this.graph.loadSystemStore();
TaskTransaction tx = new TaskTransaction(this.graph, store);
assert this.taskTx == null; // may be reentrant?
assert this.taskTx == null; // maybe reentrant?
this.taskTx = tx;
}
}
Expand Down Expand Up @@ -196,7 +196,7 @@ public <V> Future<?> schedule(HugeTask<V> task) {

if (this.serverManager().onlySingleNode() && !task.computer()) {
/*
* Speed up for single node, submit task immediately,
* Speed up for single node, submit the task immediately,
* this code can be removed without affecting code logic
*/
task.status(TaskStatus.QUEUED);
Expand All @@ -205,7 +205,7 @@ public <V> Future<?> schedule(HugeTask<V> task) {
return this.submitTask(task);
} else {
/*
* Just set SCHEDULING status and save task,
* Just set the SCHEDULING status and save the task,
* it will be scheduled by periodic scheduler worker
*/
task.status(TaskStatus.SCHEDULING);
Expand Down Expand Up @@ -276,11 +276,11 @@ public synchronized <V> void cancel(HugeTask<V> task) {
assert this.serverManager().selfIsMaster();
if (!task.server().equals(this.serverManager().selfNodeId())) {
/*
* Remove task from memory if it's running on worker node,
* but keep task in memory if it's running on master node.
* cancel-scheduling will read task from backend store, if
* Remove the task from memory if it's running on worker node,
* but keep the task in memory if it's running on master node.
* Cancel-scheduling will read the task from backend store, if
* removed this instance from memory, there will be two task
* instances with same id, and can't cancel the real task that
* instances with the same id, and can't cancel the real task that
* is running but removed from memory.
*/
this.remove(task);
Expand All @@ -301,12 +301,10 @@ public ServerInfoManager serverManager() {

protected synchronized void scheduleTasksOnMaster() {
// Master server schedule all scheduling tasks to suitable worker nodes
Collection<HugeServerInfo> serverInfos = this.serverManager()
.allServerInfos();
Collection<HugeServerInfo> serverInfos = this.serverManager().allServerInfos();
String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
do {
Iterator<HugeTask<Object>> tasks = this.tasks(TaskStatus.SCHEDULING,
PAGE_SIZE, page);
Iterator<HugeTask<Object>> tasks = this.tasks(TaskStatus.SCHEDULING, PAGE_SIZE, page);
while (tasks.hasNext()) {
HugeTask<?> task = tasks.next();
if (task.server() != null) {
Expand All @@ -318,12 +316,10 @@ protected synchronized void scheduleTasksOnMaster() {
return;
}

HugeServerInfo server = this.serverManager().pickWorkerNode(
serverInfos, task);
HugeServerInfo server = this.serverManager().pickWorkerNode(serverInfos, task);
if (server == null) {
LOG.info("The master can't find suitable servers to " +
"execute task '{}', wait for next schedule",
task.id());
"execute task '{}', wait for next schedule", task.id());
continue;
}

Expand All @@ -336,8 +332,7 @@ protected synchronized void scheduleTasksOnMaster() {
// Update server load in memory, it will be saved at the ending
server.increaseLoad(task.load());

LOG.info("Scheduled task '{}' to server '{}'",
task.id(), server.id());
LOG.info("Scheduled task '{}' to server '{}'", task.id(), server.id());
}
if (page != null) {
page = PageInfo.pageInfo(tasks);
Expand All @@ -351,8 +346,7 @@ protected synchronized void scheduleTasksOnMaster() {
protected void executeTasksOnWorker(Id server) {
String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
do {
Iterator<HugeTask<Object>> tasks = this.tasks(TaskStatus.SCHEDULED,
PAGE_SIZE, page);
Iterator<HugeTask<Object>> tasks = this.tasks(TaskStatus.SCHEDULED, PAGE_SIZE, page);
while (tasks.hasNext()) {
HugeTask<?> task = tasks.next();
this.initTaskCallable(task);
Expand Down Expand Up @@ -381,8 +375,7 @@ protected void executeTasksOnWorker(Id server) {
protected void cancelTasksOnWorker(Id server) {
String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
do {
Iterator<HugeTask<Object>> tasks = this.tasks(TaskStatus.CANCELLING,
PAGE_SIZE, page);
Iterator<HugeTask<Object>> tasks = this.tasks(TaskStatus.CANCELLING, PAGE_SIZE, page);
while (tasks.hasNext()) {
HugeTask<?> task = tasks.next();
Id taskServer = task.server();
Expand Down Expand Up @@ -557,10 +550,10 @@ public <V> HugeTask<V> delete(Id id, boolean force) {

HugeTask<?> task = this.task(id);
/*
* The following is out of date when task running on worker node:
* The following is out of date when the task running on worker node:
* HugeTask<?> task = this.tasks.get(id);
* Tasks are removed from memory after completed at most time,
* but there is a tiny gap between tasks are completed and
* but there is a tiny gap between tasks is completed and
* removed from memory.
* We assume tasks only in memory may be incomplete status,
* in fact, it is also possible to appear on the backend tasks
Expand Down Expand Up @@ -621,7 +614,7 @@ private <V> HugeTask<V> waitUntilTaskCompleted(Id id, long seconds,
throw e;
}
if (task.completed()) {
// Wait for task result being set after status is completed
// Wait for the task result being set after the status is completed
sleep(intervalMs);
return task;
}
Expand Down

0 comments on commit 861a100

Please sign in to comment.