Commit d10a68c
chore: sync adapted codes to internal 4.0 version
VGalaxies committed Feb 3, 2024
1 parent ec3779c commit d10a68c
Showing 4 changed files with 43 additions and 47 deletions.
@@ -293,13 +293,9 @@ protected <T extends SchemaElement> T getSchema(HugeType type,
         if (value == null) {
             value = super.getSchema(type, name);
             if (value != null) {
-                this.resetCachedAllIfReachedCapacity();
-
-                this.nameCache.update(prefixedName, value);
-
-                SchemaElement schema = (SchemaElement) value;
-                Id prefixedId = generateId(schema.type(), schema.id());
-                this.idCache.update(prefixedId, schema);
+                // Note: reload all schema if the cache is inconsistent with storage layer
+                this.clearCache(false);
+                this.loadAllSchema();
             }
         }
         return (T) value;
@@ -338,6 +334,13 @@ protected <T extends SchemaElement> List<T> getAllSchema(HugeType type) {
         }
     }
 
+    private void loadAllSchema() {
+        getAllSchema(HugeType.PROPERTY_KEY);
+        getAllSchema(HugeType.VERTEX_LABEL);
+        getAllSchema(HugeType.EDGE_LABEL);
+        getAllSchema(HugeType.INDEX_LABEL);
+    }
+
     @Override
     public void clear() {
         // Clear schema info firstly
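The hunks above replace per-entry cache patching on a miss with a full drop-and-reload. A minimal sketch of that reload-on-miss pattern, using hypothetical stand-in types rather than HugeGraph's cache classes:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: CacheBackedRegistry and its string maps are stand-ins,
// not the transaction/cache types touched by this commit.
class CacheBackedRegistry {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final Map<String, String> storage; // pretend backend store

    CacheBackedRegistry(Map<String, String> storage) {
        this.storage = storage;
    }

    String get(String key) {
        String value = this.cache.get(key);
        if (value == null) {
            value = this.storage.get(key);
            if (value != null) {
                // A miss that storage can answer means the cache is out of
                // sync, so rebuild it wholesale instead of patching one entry.
                this.cache.clear();
                this.cache.putAll(this.storage);
            }
        }
        return value;
    }
}

The wholesale reload trades a little extra I/O on a rare miss for the guarantee that no other stale entry survives, which is why loadAllSchema() walks all four schema types.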
@@ -297,6 +297,14 @@ public static synchronized CoreOptions instance() {
             0
     );
 
+    public static final ConfigOption<Long> TASK_SCHEDULE_PERIOD =
+            new ConfigOption<>(
+                    "task.schedule_period",
+                    "Period time when scheduler to schedule task",
+                    rangeInt(0L, Long.MAX_VALUE),
+                    10L
+            );
+
     public static final ConfigOption<Long> TASK_WAIT_TIMEOUT =
             new ConfigOption<>(
                     "task.wait_timeout",
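Only the option name task.schedule_period, the rangeInt(0L, Long.MAX_VALUE) bound, and the 10L default come from the hunk above. As a hedged illustration of what such a range-validated option does, a self-contained analogue (not the real ConfigOption class):

// Sketch of a range-checked long option with a default, mirroring the
// semantics of rangeInt(0L, Long.MAX_VALUE) plus a default of 10 (seconds).
final class LongOption {
    private final String name;
    private final long min, max, defaultValue;

    LongOption(String name, long min, long max, long defaultValue) {
        this.name = name;
        this.min = min;
        this.max = max;
        this.defaultValue = defaultValue;
    }

    long parse(String raw) {
        if (raw == null) {
            return this.defaultValue;   // unset -> default (10 seconds)
        }
        long value = Long.parseLong(raw);
        if (value < this.min || value > this.max) {
            throw new IllegalArgumentException(String.format(
                  "Invalid value %d for %s, expect [%d, %d]",
                  value, this.name, this.min, this.max));
        }
        return value;
    }
}

DistributedTaskScheduler reads this value once in its constructor (see the later hunk in this commit) and uses it as the delay, in seconds, between cron passes.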
@@ -44,7 +44,7 @@ public PdDistributedLock(KvClient<?> client) {
     public LockResult lock(String key, long second) {
         long ttl = second * 1000L;
         try {
-            LockResponse response = this.client.lock(key, ttl);
+            LockResponse response = this.client.lockWithoutReentrant(key, ttl);
             boolean succeed = response.getSucceed();
             LockResult result = new LockResult();
             if (succeed) {
@@ -56,7 +56,7 @@ public LockResult lock(String key, long second) {
                 synchronized (result) {
                     keepAlive(key);
                 }
-            }, 10, period, TimeUnit.MILLISECONDS);
+            }, period, period, TimeUnit.MILLISECONDS);
                 result.setFuture(future);
             }
             return result;
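The second hunk delays the first keep-alive tick from a hard-coded 10 ms to one full period. Assuming, as the hunk's use of a shared period for both arguments suggests, that the keep-alive period is tied to the lock TTL (for instance half of it), a freshly granted lease needs no immediate renewal. A runnable sketch of the pattern with standard executors; renewLease is a hypothetical stand-in for the PD keep-alive call:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class LeaseKeeper {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    ScheduledFuture<?> keepAlive(String key, long ttlMillis) {
        long period = ttlMillis / 2;  // assumption: renew twice per TTL window
        // Initial delay equals the period: the lease was just granted for a
        // full TTL, so renewing after 10 ms (the old behavior) is wasted work.
        return this.scheduler.scheduleAtFixedRate(
                () -> renewLease(key), period, period, TimeUnit.MILLISECONDS);
    }

    private void renewLease(String key) {
        System.out.println("renew lease for " + key);
    }
}

Renewing at half the TTL leaves a full period of slack: even if one renewal runs late, the lease survives until the next tick.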
@@ -17,7 +17,6 @@
 
 package org.apache.hugegraph.task;
 
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
@@ -30,16 +29,12 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
 
 import org.apache.hugegraph.HugeException;
 import org.apache.hugegraph.HugeGraph;
 import org.apache.hugegraph.HugeGraphParams;
 import org.apache.hugegraph.backend.id.Id;
 import org.apache.hugegraph.backend.page.PageInfo;
 import org.apache.hugegraph.backend.query.QueryResults;
-import org.apache.hugegraph.concurrent.LockGroup;
-import org.apache.hugegraph.concurrent.LockManager;
 import org.apache.hugegraph.config.CoreOptions;
 import org.apache.hugegraph.exception.ConnectionException;
 import org.apache.hugegraph.exception.NotFoundException;
@@ -53,7 +48,7 @@
 import org.slf4j.Logger;
 
 public class DistributedTaskScheduler extends TaskAndResultScheduler {
-    protected static final int SCHEDULE_PERIOD = 10;
+    private final long schedulePeriod;
     private static final Logger LOG = Log.logger(DistributedTaskScheduler.class);
     private final ExecutorService taskDbExecutor;
     private final ExecutorService schemaTaskExecutor;
@@ -63,8 +58,6 @@ public class DistributedTaskScheduler extends TaskAndResultScheduler {
     private final ScheduledThreadPoolExecutor schedulerExecutor;
    private final ScheduledFuture<?> cronFuture;
 
-    private final String lockGroupName;
-
     /**
      * the status of scheduler
      */
@@ -90,11 +83,11 @@ public DistributedTaskScheduler(HugeGraphParams graph,
 
         this.schedulerExecutor = schedulerExecutor;
 
-        lockGroupName = String.format("%s_%s_distributed", graphSpace, graph);
-        LockManager.instance().create(lockGroupName);
-
         this.closed.set(false);
 
+        this.schedulePeriod = this.graph.configuration()
+                                        .get(CoreOptions.TASK_SCHEDULE_PERIOD);
+
         this.cronFuture = this.schedulerExecutor.scheduleWithFixedDelay(
             () -> {
                 // TODO: uncomment later - graph space
@@ -105,14 +98,15 @@
                     // TaskManager.useAdmin();
                     this.cronSchedule();
                 } catch (Throwable t) {
-                    LOG.info("cronScheduler exception ", t);
+                    // TODO: log with graph space
+                    LOG.info("cronScheduler exception graph: {}", this.graphName(), t);
                 } finally {
                     // TODO: uncomment later - graph space
                     LockUtil.unlock("", LockUtil.GRAPH_LOCK);
                     // LockUtil.unlock(this.graph().spaceGraphName(), LockUtil.GRAPH_LOCK);
                 }
             },
-            10L, SCHEDULE_PERIOD,
+            10L, schedulePeriod,
             TimeUnit.SECONDS);
     }
 
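The constructor now pulls the cron period from task.schedule_period instead of the removed SCHEDULE_PERIOD constant. A runnable sketch of the scheduling pattern it relies on, where tick() is a hypothetical stand-in for cronSchedule(); the catch-all matters because ScheduledThreadPoolExecutor silently cancels a periodic task whose body throws:

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CronLoop {
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        long schedulePeriod = 10L;  // would come from task.schedule_period
        executor.scheduleWithFixedDelay(() -> {
            try {
                tick();
            } catch (Throwable t) {
                // log and swallow: the next tick still runs
                System.err.println("cron tick failed: " + t);
            }
        }, 10L, schedulePeriod, TimeUnit.SECONDS);
    }

    private static void tick() {
        System.out.println("schedule pending tasks");
    }
}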
@@ -158,9 +152,14 @@ public void cronSchedule() {
                 LOG.info("Try to update task({})@({}/{}) status" +
                          "(RUNNING->FAILED)", running.id(), this.graphSpace,
                          this.graphName);
-                updateStatusWithLock(running.id(), TaskStatus.RUNNING,
-                                     TaskStatus.FAILED);
-                runningTasks.remove(running.id());
+                if (updateStatusWithLock(running.id(), TaskStatus.RUNNING,
+                                         TaskStatus.FAILED)) {
+                    runningTasks.remove(running.id());
+                } else {
+                    LOG.warn("Update task({})@({}/{}) status" +
+                             "(RUNNING->FAILED) failed",
+                             running.id(), this.graphSpace, this.graphName);
+                }
             }
         }
 
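The guard added above only clears the task from runningTasks when the RUNNING to FAILED transition actually took effect; updateStatusWithLock is effectively a compare-and-set. A minimal, self-contained analogue (the enum and ids are illustrative, not HugeGraph types):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

class StatusCasDemo {
    enum TaskStatus { RUNNING, FAILED }

    public static void main(String[] args) {
        AtomicReference<TaskStatus> status =
                new AtomicReference<>(TaskStatus.RUNNING);
        Set<Long> runningTasks = ConcurrentHashMap.newKeySet();
        long taskId = 42L;
        runningTasks.add(taskId);

        // Analogue of updateStatusWithLock(id, RUNNING, FAILED): only act on
        // the side effect if the transition really happened.
        if (status.compareAndSet(TaskStatus.RUNNING, TaskStatus.FAILED)) {
            runningTasks.remove(taskId);
        } else {
            System.err.println("status changed concurrently, keep task " + taskId);
        }
    }
}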
@@ -484,7 +483,7 @@ protected boolean updateStatus(Id id, TaskStatus prestatus,
 
             return true;
         } else {
-            LOG.info("Update task({}) status conflict: current({}), " +
+            LOG.warn("Update task({}) status conflict: current({}), " +
                      "pre({}), status({})", id, task.status(),
                      prestatus, status);
             return false;
@@ -542,7 +541,7 @@ private boolean tryStartHugeTask(HugeTask<?> task) {
         if (executor.getActiveCount() < executor.getMaximumPoolSize()) {
             TaskRunner<?> runner = new TaskRunner<>(task);
             chosenExecutor.submit(runner);
-            LOG.info("Start task({})@({}/{})", task.id(),
+            LOG.info("Submit task({})@({}/{})", task.id(),
                      this.graphSpace, this.graphName);
 
             return true;
@@ -570,23 +569,12 @@ private LockResult tryLockTask(String taskId) {
 
         LockResult lockResult = new LockResult();
 
-        LockGroup lockGroup = LockManager.instance().get(lockGroupName);
-        Lock localLock = lockGroup.lock(taskId);
-
-        if (!localLock.tryLock()) {
-            return new LockResult();
-        }
-
         try {
             lockResult =
                     MetaManager.instance().tryLockTask(graphSpace, graphName,
                                                        taskId);
         } catch (Throwable t) {
-            LOG.info(String.format("try to lock task(%s) error", taskId), t);
-        }
-
-        if (!lockResult.lockSuccess()) {
-            localLock.unlock();
+            LOG.warn(String.format("try to lock task(%s) error", taskId), t);
         }
 
         return lockResult;
@@ -598,14 +586,9 @@ private void unlockTask(String taskId, LockResult lockResult) {
             MetaManager.instance().unlockTask(graphSpace, graphName, taskId,
                                               lockResult);
         } catch (Throwable t) {
-            LOG.info(String.format("try to unlock task(%s) error",
+            LOG.warn(String.format("try to unlock task(%s) error",
                                    taskId), t);
         }
-
-        LockGroup lockGroup = LockManager.instance().get(lockGroupName);
-        Lock localLock = lockGroup.lock(taskId);
-
-        localLock.unlock();
     }
 
     private boolean isLockedTask(String taskId) {
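Both tryLockTask and unlockTask now rely on the PD-backed lock alone. A sketch of what was removed, under hypothetical interfaces: stacking a JVM-local lock beneath a distributed one buys no extra exclusion, but demands a rollback path (release the local lock whenever the distributed acquire fails) that is easy to get wrong:

import java.util.concurrent.locks.ReentrantLock;

class LayeredLockDemo {
    private final ReentrantLock localLock = new ReentrantLock();

    interface DistLock {
        boolean tryLock(String key);
        void unlock(String key);
    }

    // Old shape: local lock first, rolled back if the distributed lock fails.
    boolean lockLayered(DistLock dist, String key) {
        if (!this.localLock.tryLock()) {
            return false;
        }
        boolean ok = false;
        try {
            ok = dist.tryLock(key);
        } finally {
            if (!ok) {
                this.localLock.unlock();  // the rollback path this commit deletes
            }
        }
        return ok;
    }

    // New shape: a cluster-wide lock already excludes local threads too.
    boolean lockFlat(DistLock dist, String key) {
        return dist.tryLock(key);
    }
}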
@@ -626,7 +609,9 @@ public void run() {
         LockResult lockResult = tryLockTask(task.id().asString());
 
         initTaskParams(task);
-        if (lockResult.lockSuccess()) {
+        if (lockResult.lockSuccess() && !task.completed()) {
+
+            LOG.info("Start task({})", task.id());
 
             TaskManager.setContext(task.context());
             try {
@@ -644,7 +629,7 @@
                 // Task execution won't throw: HugeTask catches exceptions while running and stores them in the DB
                 task.run();
             } catch (Throwable t) {
-                LOG.info("exception when execute task", t);
+                LOG.warn("exception when execute task", t);
             } finally {
                 runningTasks.remove(task.id());
                 unlockTask(task.id().asString(), lockResult);
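The run() change only executes a task when the distributed lock is held and the task is not already completed, and always releases the lock in finally. A compact sketch of that guard, with a hypothetical DistLock interface standing in for the MetaManager-based lock:

class TaskGuardDemo {
    interface DistLock {
        boolean tryLock(String taskId);
        void unlock(String taskId);
    }

    static void runGuarded(DistLock lock, String taskId, boolean completed,
                           Runnable task) {
        if (!lock.tryLock(taskId)) {
            return;  // another scheduler instance owns this task
        }
        try {
            if (!completed) {
                task.run();  // the real task catches its own exceptions
            }
        } catch (Throwable t) {
            System.err.println("exception when executing task " + taskId);
        } finally {
            lock.unlock(taskId);  // never leak the distributed lock
        }
    }
}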
