Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
VGalaxies committed Apr 7, 2024
1 parent 301e99b commit ea2b9e4
Showing 1 changed file with 162 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public final class TaskManager {
"server-info-db-worker-%d";
public static final String TASK_SCHEDULER = "task-scheduler-%d";

public static final String OLAP_TASK_WORKER = "olap-task-worker-%d";
public static final String SCHEMA_TASK_WORKER = "schema-task-worker-%d";
public static final String EPHEMERAL_TASK_WORKER = "ephemeral-task-worker-%d";
public static final String DISTRIBUTED_TASK_SCHEDULER = "distributed-scheduler-%d";

protected static final long SCHEDULE_PERIOD = 1000L; // unit ms
private static final long TX_CLOSE_TIMEOUT = 30L; // unit s
private static final int THREADS = 4;
Expand All @@ -60,6 +65,11 @@ public final class TaskManager {
private final ExecutorService serverInfoDbExecutor;
private final PausableScheduledThreadPool schedulerExecutor;

private final ExecutorService schemaTaskExecutor;
private final ExecutorService olapTaskExecutor;
private final ExecutorService ephemeralTaskExecutor;
private final PausableScheduledThreadPool distributedSchedulerExecutor;

private boolean enableRoleElected = false;

public static TaskManager instance() {
Expand All @@ -76,6 +86,17 @@ private TaskManager(int pool) {
1, TASK_DB_WORKER);
this.serverInfoDbExecutor = ExecutorUtil.newFixedThreadPool(
1, SERVER_INFO_DB_WORKER);

this.schemaTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
SCHEMA_TASK_WORKER);
this.olapTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
OLAP_TASK_WORKER);
this.ephemeralTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
EPHEMERAL_TASK_WORKER);
this.distributedSchedulerExecutor =
ExecutorUtil.newPausableScheduledThreadPool(1,
DISTRIBUTED_TASK_SCHEDULER);

// For schedule task to run, just one thread is ok
this.schedulerExecutor = ExecutorUtil.newPausableScheduledThreadPool(
1, TASK_SCHEDULER);
Expand All @@ -88,11 +109,36 @@ private TaskManager(int pool) {

public void addScheduler(HugeGraphParams graph) {
E.checkArgumentNotNull(graph, "The graph can't be null");

TaskScheduler scheduler = new StandardTaskScheduler(graph,
this.taskExecutor, this.taskDbExecutor,
this.serverInfoDbExecutor);
this.schedulers.put(graph, scheduler);
LOG.info("Use {} as the scheduler of graph ({})",
graph.schedulerType(), graph.name());
// TODO: 如当前服务绑定到指定的非 DEFAULT 图空间,非当前图空间的图不再创建任务调度器 (graph space)
switch (graph.schedulerType()) {
case "distributed": {
TaskScheduler scheduler =
new DistributedTaskScheduler(
graph,
distributedSchedulerExecutor,
taskDbExecutor,
schemaTaskExecutor,
olapTaskExecutor,
taskExecutor, /* gremlinTaskExecutor */
ephemeralTaskExecutor,
serverInfoDbExecutor);
this.schedulers.put(graph, scheduler);
break;
}
case "local":
default: {
TaskScheduler scheduler =
new StandardTaskScheduler(
graph,
this.taskExecutor,
this.taskDbExecutor,
this.serverInfoDbExecutor);
this.schedulers.put(graph, scheduler);
break;
}
}
}

public void closeScheduler(HugeGraphParams graph) {
Expand Down Expand Up @@ -123,6 +169,10 @@ public void closeScheduler(HugeGraphParams graph) {
if (!this.schedulerExecutor.isTerminated()) {
this.closeSchedulerTx(graph);
}

if (!this.distributedSchedulerExecutor.isTerminated()) {
this.closeDistributedSchedulerTx(graph);
}
}

private void closeTaskTx(HugeGraphParams graph) {
Expand Down Expand Up @@ -157,6 +207,21 @@ private void closeSchedulerTx(HugeGraphParams graph) {
}
}

/**
 * Close the graph transaction bound to the distributed-scheduler thread.
 *
 * Submits a close-tx task to {@code distributedSchedulerExecutor} and waits
 * for it to finish, so the tx held by that worker thread is released before
 * the scheduler for {@code graph} is torn down.
 *
 * @param graph the graph whose thread-bound tx should be closed
 * @throws HugeException if submitting or waiting for the close task fails
 */
private void closeDistributedSchedulerTx(HugeGraphParams graph) {
    final Callable<Void> closeTx = () -> {
        // Do close-tx for current thread
        graph.closeTx();
        // Let other threads run
        Thread.yield();
        return null;
    };
    try {
        this.distributedSchedulerExecutor.submit(closeTx).get();
    } catch (InterruptedException e) {
        // Restore the interrupt status instead of silently swallowing it,
        // so callers up the stack can observe the interruption
        Thread.currentThread().interrupt();
        throw new HugeException("Exception when closing scheduler tx", e);
    } catch (Exception e) {
        throw new HugeException("Exception when closing scheduler tx", e);
    }
}

/**
 * Pause the local task-scheduler timer pool (delegates to
 * {@code PausableScheduledThreadPool#pauseSchedule()}).
 *
 * NOTE(review): the distributedSchedulerExecutor (also a pausable pool)
 * is not paused here — confirm whether distributed scheduling should
 * pause as well.
 */
public void pauseScheduledThreadPool() {
this.schedulerExecutor.pauseSchedule();
}
Expand All @@ -170,8 +235,7 @@ public TaskScheduler getScheduler(HugeGraphParams graph) {
}

public ServerInfoManager getServerInfoManager(HugeGraphParams graph) {
StandardTaskScheduler scheduler = (StandardTaskScheduler)
this.getScheduler(graph);
TaskScheduler scheduler = this.getScheduler(graph);
if (scheduler == null) {
return null;
}
Expand All @@ -195,10 +259,21 @@ public void shutdown(long timeout) {
}
}

if (terminated && !this.distributedSchedulerExecutor.isShutdown()) {
this.distributedSchedulerExecutor.shutdown();
try {
terminated = this.distributedSchedulerExecutor.awaitTermination(timeout,
unit);
} catch (Throwable e) {
ex = e;
}
}

if (terminated && !this.taskExecutor.isShutdown()) {
this.taskExecutor.shutdown();
try {
terminated = this.taskExecutor.awaitTermination(timeout, unit);
terminated = this.taskExecutor.awaitTermination(timeout,
unit);
} catch (Throwable e) {
ex = e;
}
Expand All @@ -217,7 +292,38 @@ public void shutdown(long timeout) {
if (terminated && !this.taskDbExecutor.isShutdown()) {
this.taskDbExecutor.shutdown();
try {
terminated = this.taskDbExecutor.awaitTermination(timeout, unit);
terminated = this.taskDbExecutor.awaitTermination(timeout,
unit);
} catch (Throwable e) {
ex = e;
}
}

if (terminated && !this.ephemeralTaskExecutor.isShutdown()) {
this.ephemeralTaskExecutor.shutdown();
try {
terminated = this.ephemeralTaskExecutor.awaitTermination(timeout,
unit);
} catch (Throwable e) {
ex = e;
}
}

if (terminated && !this.schemaTaskExecutor.isShutdown()) {
this.schemaTaskExecutor.shutdown();
try {
terminated = this.schemaTaskExecutor.awaitTermination(timeout,
unit);
} catch (Throwable e) {
ex = e;
}
}

if (terminated && !this.olapTaskExecutor.isShutdown()) {
this.olapTaskExecutor.shutdown();
try {
terminated = this.olapTaskExecutor.awaitTermination(timeout,
unit);
} catch (Throwable e) {
ex = e;
}
Expand Down Expand Up @@ -292,7 +398,7 @@ private void scheduleOrExecuteJob() {
// Called by scheduler timer
try {
for (TaskScheduler entry : this.schedulers.values()) {
StandardTaskScheduler scheduler = (StandardTaskScheduler) entry;
TaskScheduler scheduler = entry;
// Maybe other thread close&remove scheduler at the same time
synchronized (scheduler) {
this.scheduleOrExecuteJobForGraph(scheduler);
Expand All @@ -303,56 +409,59 @@ private void scheduleOrExecuteJob() {
}
}

private void scheduleOrExecuteJobForGraph(StandardTaskScheduler scheduler) {
private void scheduleOrExecuteJobForGraph(TaskScheduler scheduler) {
E.checkNotNull(scheduler, "scheduler");

ServerInfoManager serverManager = scheduler.serverManager();
String graph = scheduler.graphName();

LockUtil.lock(graph, LockUtil.GRAPH_LOCK);
try {
/*
* Skip if:
* graph is closed (iterate schedulers before graph is closing)
* or
* graph is not initialized(maybe truncated or cleared).
*
* If graph is closing by other thread, current thread get
* serverManager and try lock graph, at the same time other
* thread deleted the lock-group, current thread would get
* exception 'LockGroup xx does not exists'.
* If graph is closed, don't call serverManager.initialized()
* due to it will reopen graph tx.
*/
if (!serverManager.graphIsReady()) {
return;
}

// Update server heartbeat
serverManager.heartbeat();
if (scheduler instanceof StandardTaskScheduler) {
StandardTaskScheduler standardTaskScheduler = (StandardTaskScheduler) (scheduler);
ServerInfoManager serverManager = scheduler.serverManager();
String graph = scheduler.graphName();

/*
* Master will schedule tasks to suitable servers.
* Note a Worker may become to a Master, so elected-Master also needs to
* execute tasks assigned by previous Master when enableRoleElected=true.
* However, when enableRoleElected=false, a Master is only set by the
* config assignment, assigned-Master always stays the same state.
*/
if (serverManager.selfIsMaster()) {
scheduler.scheduleTasksOnMaster();
if (!this.enableRoleElected && !serverManager.onlySingleNode()) {
// assigned-Master + non-single-node don't need to execute tasks
LockUtil.lock(graph, LockUtil.GRAPH_LOCK);
try {
/*
* Skip if:
* graph is closed (iterate schedulers before graph is closing)
* or
* graph is not initialized(maybe truncated or cleared).
*
* If graph is closing by other thread, current thread get
* serverManager and try lock graph, at the same time other
* thread deleted the lock-group, current thread would get
* exception 'LockGroup xx does not exists'.
* If graph is closed, don't call serverManager.initialized()
* due to it will reopen graph tx.
*/
if (!serverManager.graphIsReady()) {
return;
}
}

// Execute queued tasks scheduled to current server
scheduler.executeTasksOnWorker(serverManager.selfNodeId());
// Update server heartbeat
serverManager.heartbeat();

/*
* Master will schedule tasks to suitable servers.
* Note a Worker may become to a Master, so elected-Master also needs to
* execute tasks assigned by previous Master when enableRoleElected=true.
* However, when enableRoleElected=false, a Master is only set by the
* config assignment, assigned-Master always stays the same state.
*/
if (serverManager.selfIsMaster()) {
standardTaskScheduler.scheduleTasksOnMaster();
if (!this.enableRoleElected && !serverManager.onlySingleNode()) {
// assigned-Master + non-single-node don't need to execute tasks
return;
}
}

// Cancel tasks scheduled to current server
scheduler.cancelTasksOnWorker(serverManager.selfNodeId());
} finally {
LockUtil.unlock(graph, LockUtil.GRAPH_LOCK);
// Execute queued tasks scheduled to current server
standardTaskScheduler.executeTasksOnWorker(serverManager.selfNodeId());

// Cancel tasks scheduled to current server
standardTaskScheduler.cancelTasksOnWorker(serverManager.selfNodeId());
} finally {
LockUtil.unlock(graph, LockUtil.GRAPH_LOCK);
}
}
}

Expand Down

0 comments on commit ea2b9e4

Please sign in to comment.