From 79c00ea91cdb3d2d0c212d69ff7e368b203a8b2f Mon Sep 17 00:00:00 2001 From: vaughn Date: Wed, 10 May 2023 09:33:07 +0800 Subject: [PATCH] improve --- .../apache/hugegraph/StandardHugeGraph.java | 2 +- .../backend/tx/GraphIndexTransaction.java | 1 + .../backend/tx/GraphTransaction.java | 2 +- .../backend/tx/IndexableTransaction.java | 2 +- .../backend/tx/SchemaTransaction.java | 2 +- .../security/HugeSecurityManager.java | 30 +++++++++---------- .../tx => task}/EphemeralJobQueue.java | 28 +++++++++-------- .../apache/hugegraph/task/TaskCallable.java | 4 +-- 8 files changed, 37 insertions(+), 34 deletions(-) rename hugegraph-core/src/main/java/org/apache/hugegraph/{backend/tx => task}/EphemeralJobQueue.java (92%) diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java b/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java index c896618601..42a67158ed 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java @@ -51,7 +51,7 @@ import org.apache.hugegraph.backend.store.raft.RaftBackendStoreProvider; import org.apache.hugegraph.backend.store.raft.RaftGroupManager; import org.apache.hugegraph.backend.store.ram.RamTable; -import org.apache.hugegraph.backend.tx.EphemeralJobQueue; +import org.apache.hugegraph.task.EphemeralJobQueue; import org.apache.hugegraph.backend.tx.GraphTransaction; import org.apache.hugegraph.backend.tx.SchemaTransaction; import org.apache.hugegraph.config.CoreOptions; diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java index f7d1038427..f6739c5345 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java @@ -36,6 +36,7 @@ import org.apache.hugegraph.backend.page.PageState; import org.apache.hugegraph.backend.store.BackendEntry; import org.apache.hugegraph.backend.store.BackendStore; +import org.apache.hugegraph.task.EphemeralJobQueue; import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator; diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java index cca5e2240f..0cb12f8db6 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java @@ -251,7 +251,7 @@ protected void reset() { } @Override - protected GraphIndexTransaction indexTransaction() { + public GraphIndexTransaction indexTransaction() { return this.indexTx; } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/IndexableTransaction.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/IndexableTransaction.java index a7a813aaf7..e5ca2d0463 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/IndexableTransaction.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/IndexableTransaction.java @@ -82,5 +82,5 @@ public void close() { } } - protected abstract AbstractTransaction indexTransaction(); + public abstract AbstractTransaction indexTransaction(); } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/SchemaTransaction.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/SchemaTransaction.java index 82ec7a20e2..4e25ba2488 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/SchemaTransaction.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/SchemaTransaction.java @@ -85,7 +85,7 @@ public SchemaTransaction(HugeGraphParams graph, BackendStore store) { } @Override - protected AbstractTransaction indexTransaction() { + public AbstractTransaction indexTransaction() { return this.indexTx; } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/security/HugeSecurityManager.java b/hugegraph-core/src/main/java/org/apache/hugegraph/security/HugeSecurityManager.java index ad4e78212e..af168a53b5 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/security/HugeSecurityManager.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/security/HugeSecurityManager.java @@ -219,31 +219,31 @@ public void checkExec(String cmd) { @Override public void checkRead(FileDescriptor fd) { -// if (callFromGremlin() && !callFromBackendSocket() && -// !callFromRaft() && !callFromSofaRpc()) { -// throw newSecurityException("Not allowed to read fd via Gremlin"); -// } + if (callFromGremlin() && !callFromBackendSocket() && + !callFromRaft() && !callFromSofaRpc()) { + throw newSecurityException("Not allowed to read fd via Gremlin"); + } super.checkRead(fd); } @Override public void checkRead(String file) { -// if (callFromGremlin() && !callFromCaffeine() && -// !readGroovyInCurrentDir(file) && !callFromBackendHbase() && -// !callFromSnapshot() && !callFromRaft() && -// !callFromSofaRpc()) { -// throw newSecurityException( -// "Not allowed to read file via Gremlin: %s", file); -// } + if (callFromGremlin() && !callFromCaffeine() && + !readGroovyInCurrentDir(file) && !callFromBackendHbase() && + !callFromSnapshot() && !callFromRaft() && + !callFromSofaRpc()) { + throw newSecurityException( + "Not allowed to read file via Gremlin: %s", file); + } super.checkRead(file); } @Override public void checkRead(String file, Object context) { -// if (callFromGremlin() && !callFromRaft() && !callFromSofaRpc()) { -// throw newSecurityException( -// "Not allowed to read file via Gremlin: %s", file); -// } + if (callFromGremlin() && !callFromRaft() && !callFromSofaRpc()) { + throw newSecurityException( + "Not allowed to read file via Gremlin: %s", file); + } super.checkRead(file, context); } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/EphemeralJobQueue.java b/hugegraph-core/src/main/java/org/apache/hugegraph/task/EphemeralJobQueue.java similarity index 92% rename from hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/EphemeralJobQueue.java rename to hugegraph-core/src/main/java/org/apache/hugegraph/task/EphemeralJobQueue.java index d94112df23..95a45d8e0f 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/EphemeralJobQueue.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/task/EphemeralJobQueue.java @@ -15,18 +15,18 @@ * under the License. */ -package org.apache.hugegraph.backend.tx; +package org.apache.hugegraph.task; import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.List; -import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicReference; import org.apache.hugegraph.HugeGraphParams; import org.apache.hugegraph.backend.query.Query; +import org.apache.hugegraph.backend.tx.GraphIndexTransaction; import org.apache.hugegraph.job.EphemeralJob; import org.apache.hugegraph.job.EphemeralJobBuilder; import org.apache.hugegraph.util.Log; @@ -63,8 +63,6 @@ public void add(EphemeralJob job) { if (!this.pendingQueue.offer(job)) { LOG.warn("The pending queue of EphemeralJobQueue is full, {} job " + "will be ignored", job.type()); - this.reScheduleIfNeeded(); - return; } this.reScheduleIfNeeded(); @@ -74,8 +72,12 @@ protected HugeGraphParams params() { return this.graph; } - protected Queue> queue() { - return this.pendingQueue; + protected void clear() { + this.pendingQueue.clear(); + } + + protected EphemeralJob poll() { + return this.pendingQueue.poll(); } public void consumeComplete() { @@ -93,7 +95,7 @@ public void reScheduleIfNeeded() { } catch (Throwable e) { // Maybe if it fails, consider clearing all the data in the pendingQueue, // or start a scheduled retry task to retry until success. - LOG.warn("Failed to schedule RemoveLeftIndexJob", e); + LOG.warn("Failed to schedule BatchEphemeralJob", e); this.pendingQueue.clear(); this.state.compareAndSet(State.EXECUTE, State.INIT); } @@ -124,7 +126,7 @@ public String type() { @Override public Object execute() throws Exception { boolean stop = false; - Object ret = null; + Object result = null; int consumeCount = 0; InterruptedException interruptedException = null; EphemeralJobQueue queue; @@ -152,19 +154,19 @@ public Object execute() throws Exception { try { while (!queue.isEmpty() && batchJobs.size() < PAGE_SIZE) { - EphemeralJob job = queue.queue().poll(); + EphemeralJob job = queue.poll(); if (job == null) { continue; } batchJobs.add(job); - consumeCount++; } if (batchJobs.isEmpty()) { continue; } - ret = this.executeBatchJob(batchJobs, ret); + consumeCount += batchJobs.size(); + result = this.executeBatchJob(batchJobs, result); } catch (InterruptedException e) { interruptedException = e; @@ -178,7 +180,7 @@ public Object execute() throws Exception { throw interruptedException; } - return ret; + return result; } private Object executeBatchJob(List> jobs, Object prevResult) throws Exception { @@ -214,7 +216,7 @@ public Object call() throws Exception { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); if (queue != null) { - queue.queue().clear(); + queue.clear(); queue.consumeComplete(); } throw e; diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskCallable.java b/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskCallable.java index d73a6d71e3..117613c731 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskCallable.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskCallable.java @@ -124,7 +124,7 @@ protected void save() { } } - public void graph(HugeGraph graph) { + protected void graph(HugeGraph graph) { this.graph = graph; } @@ -176,7 +176,7 @@ public abstract static class SysTaskCallable extends TaskCallable { private HugeGraphParams params = null; - public void params(HugeGraphParams params) { + protected void params(HugeGraphParams params) { this.params = params; }