Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): optimize batch removal of remaining indices consumed by a single consumer #2203

Merged
merged 12 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hugegraph.backend.store.ram.RamTable;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
import org.apache.hugegraph.job.EphemeralJob;
import org.apache.hugegraph.task.ServerInfoManager;
import org.apache.hugegraph.type.define.GraphMode;
import org.apache.hugegraph.type.define.GraphReadMode;
Expand Down Expand Up @@ -90,4 +91,6 @@ public interface HugeGraphParams {
RateLimiter readRateLimiter();

RamTable ramtable();

<T> void submitEphemeralJob(EphemeralJob<T> job);
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.hugegraph.backend.store.raft.RaftBackendStoreProvider;
import org.apache.hugegraph.backend.store.raft.RaftGroupManager;
import org.apache.hugegraph.backend.store.ram.RamTable;
import org.apache.hugegraph.task.EphemeralJobQueue;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
import org.apache.hugegraph.config.CoreOptions;
Expand All @@ -60,6 +61,7 @@
import org.apache.hugegraph.event.EventListener;
import org.apache.hugegraph.exception.NotAllowException;
import org.apache.hugegraph.io.HugeGraphIoRegistry;
import org.apache.hugegraph.job.EphemeralJob;
import org.apache.hugegraph.masterelection.ClusterRoleStore;
import org.apache.hugegraph.masterelection.Config;
import org.apache.hugegraph.masterelection.RoleElectionConfig;
Expand Down Expand Up @@ -1163,6 +1165,7 @@ private void waitUntilAllTasksCompleted() {
private class StandardHugeGraphParams implements HugeGraphParams {

private HugeGraph graph = StandardHugeGraph.this;
private final EphemeralJobQueue ephemeralJobQueue = new EphemeralJobQueue(this);

private void graph(HugeGraph graph) {
this.graph = graph;
Expand Down Expand Up @@ -1304,6 +1307,11 @@ public RateLimiter readRateLimiter() {
public RamTable ramtable() {
return StandardHugeGraph.this.ramtable;
}

@Override
public <T> void submitEphemeralJob(EphemeralJob<T> job) {
// Enqueue rather than schedule directly: the per-graph EphemeralJobQueue
// (created above as a field of StandardHugeGraphParams) batches ephemeral
// jobs submitted for this graph instance.
this.ephemeralJobQueue.add(job);
}
}

private class TinkerPopTransaction extends AbstractThreadLocalTransaction {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private void compressSnapshotDir(SnapshotWriter writer, Map<String, String> snap
try {
LOG.info("Prepare to compress dir '{}' to '{}'", snapshotDir, outputFile);
long begin = System.currentTimeMillis();
String rootDir = Paths.get(snapshotDir).getParent().toString();
String rootDir = Paths.get(snapshotDir).toAbsolutePath().getParent().toString();
String sourceDir = Paths.get(snapshotDir).getFileName().toString();
CompressStrategyManager.getDefault()
.compressZip(rootDir, sourceDir, outputFile, checksum);
Expand Down Expand Up @@ -200,7 +200,7 @@ private String decompressSnapshot(SnapshotReader reader,
E.checkArgument(this.dataDisks.containsKey(diskTableKey),
"The data path for '%s' should be exist", diskTableKey);
String dataPath = this.dataDisks.get(diskTableKey);
String parentPath = Paths.get(dataPath).getParent().toString();
String parentPath = Paths.get(dataPath).toAbsolutePath().getParent().toString();
String snapshotDir = Paths.get(parentPath, StringUtils.removeEnd(snapshotDirTar, TAR))
.toString();
FileUtils.deleteDirectory(new File(snapshotDir));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,14 @@ public void forwardToLeader(PeerId leaderId, StoreCommand command,
public void setResponse(StoreCommandResponse response) {
if (response.getStatus()) {
LOG.debug("StoreCommandResponse status ok");
future.complete(Status.OK(), () -> null);
// This code forwards the request to the Raft leader and considers the operation successful
// if it's forwarded successfully. It returns a RaftClosure because the calling
// logic expects a RaftClosure result. Specifically, if the current instance is the Raft leader,
// it executes the corresponding logic locally and notifies the calling logic asynchronously
// via RaftClosure. Therefore, the result is returned as a RaftClosure here.
Comment on lines +80 to +84
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe better to use /* */ next time

RaftClosure<Status> supplierFuture = new RaftClosure<>();
supplierFuture.complete(Status.OK());
future.complete(Status.OK(), () -> supplierFuture);
zyxxoo marked this conversation as resolved.
Show resolved Hide resolved
} else {
LOG.debug("StoreCommandResponse status error");
Status status = new Status(RaftError.UNKNOWN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.hugegraph.backend.page.PageState;
import org.apache.hugegraph.backend.store.BackendEntry;
import org.apache.hugegraph.backend.store.BackendStore;
import org.apache.hugegraph.task.EphemeralJobQueue;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
Expand Down Expand Up @@ -69,7 +70,6 @@
import org.apache.hugegraph.exception.NotSupportException;
import org.apache.hugegraph.iterator.Metadatable;
import org.apache.hugegraph.job.EphemeralJob;
import org.apache.hugegraph.job.EphemeralJobBuilder;
import org.apache.hugegraph.job.system.DeleteExpiredJob;
import org.apache.hugegraph.perf.PerfUtil.Watched;
import org.apache.hugegraph.schema.IndexLabel;
Expand All @@ -81,7 +81,6 @@
import org.apache.hugegraph.structure.HugeIndex.IdWithExpiredTime;
import org.apache.hugegraph.structure.HugeProperty;
import org.apache.hugegraph.structure.HugeVertex;
import org.apache.hugegraph.task.HugeTask;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.Action;
import org.apache.hugegraph.type.define.HugeKeys;
Expand Down Expand Up @@ -115,15 +114,11 @@ public GraphIndexTransaction(HugeGraphParams graph, BackendStore store) {
conf.get(CoreOptions.QUERY_INDEX_INTERSECT_THRESHOLD);
}

protected Id asyncRemoveIndexLeft(ConditionQuery query,
HugeElement element) {
protected void asyncRemoveIndexLeft(ConditionQuery query,
HugeElement element) {
LOG.info("Remove left index: {}, query: {}", element, query);
RemoveLeftIndexJob job = new RemoveLeftIndexJob(query, element);
HugeTask<?> task = EphemeralJobBuilder.of(this.graph())
.name(element.id().asString())
.job(job)
.schedule();
return task.id();
this.params().submitEphemeralJob(job);
}

@Watched(prefix = "index")
Expand Down Expand Up @@ -1717,7 +1712,8 @@ private static Query parent(Collection<Query> queries) {
}
}

public static class RemoveLeftIndexJob extends EphemeralJob<Object> {
public static class RemoveLeftIndexJob extends EphemeralJob<Long>
zyxxoo marked this conversation as resolved.
Show resolved Hide resolved
implements EphemeralJobQueue.Reduce<Long> {

private static final String REMOVE_LEFT_INDEX = "remove_left_index";

Expand All @@ -1741,7 +1737,7 @@ public String type() {
}

@Override
public Object execute() {
public Long execute() {
this.tx = this.element.schemaLabel().system() ?
this.params().systemTransaction().indexTransaction() :
this.params().graphTransaction().indexTransaction();
Expand Down Expand Up @@ -1780,7 +1776,6 @@ protected long removeIndexLeft(ConditionQuery query,
// Process secondary index or search index
sCount += this.processSecondaryOrSearchIndexLeft(cq, element);
}
this.tx.commit();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we don't support auto commit anymore?

return rCount + sCount;
}

Expand Down Expand Up @@ -1808,7 +1803,6 @@ private long processRangeIndexLeft(ConditionQuery query,
}
// Remove LeftIndex after constructing remove job
this.query.removeElementLeftIndex(element.id());
this.tx.commit();
return count;
}

Expand Down Expand Up @@ -1873,11 +1867,9 @@ private long processSecondaryOrSearchIndexLeft(ConditionQuery query,
*/
this.tx.updateIndex(il.id(), element, false);
}
this.tx.commit();
if (this.deletedByError(element, incorrectIndexFields,
incorrectPKs)) {
this.tx.updateIndex(il.id(), deletion, false);
this.tx.commit();
} else {
count++;
}
Expand Down Expand Up @@ -1949,5 +1941,18 @@ private HugeElement newestElement(HugeElement element) {
return (HugeEdge) QueryResults.one(iter);
}
}

@Override
public Long reduce(Long t1, Long t2) {
    // Null-tolerant accumulation: treat an absent partial count as
    // contributing nothing, so the other operand passes through unchanged.
    if (t1 == null || t2 == null) {
        return t1 == null ? t2 : t1;
    }
    return t1 + t2;
}
}
}
Loading