Skip to content

Commit

Permalink
improve
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxxoo committed May 10, 2023
1 parent d7ba63a commit 79c00ea
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.apache.hugegraph.backend.store.raft.RaftBackendStoreProvider;
import org.apache.hugegraph.backend.store.raft.RaftGroupManager;
import org.apache.hugegraph.backend.store.ram.RamTable;
import org.apache.hugegraph.backend.tx.EphemeralJobQueue;
import org.apache.hugegraph.task.EphemeralJobQueue;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
import org.apache.hugegraph.config.CoreOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.hugegraph.backend.page.PageState;
import org.apache.hugegraph.backend.store.BackendEntry;
import org.apache.hugegraph.backend.store.BackendStore;
import org.apache.hugegraph.task.EphemeralJobQueue;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ protected void reset() {
}

@Override
protected GraphIndexTransaction indexTransaction() {
public GraphIndexTransaction indexTransaction() {
return this.indexTx;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,5 +82,5 @@ public void close() {
}
}

protected abstract AbstractTransaction indexTransaction();
public abstract AbstractTransaction indexTransaction();
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public SchemaTransaction(HugeGraphParams graph, BackendStore store) {
}

@Override
protected AbstractTransaction indexTransaction() {
public AbstractTransaction indexTransaction() {
return this.indexTx;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,31 +219,31 @@ public void checkExec(String cmd) {

@Override
public void checkRead(FileDescriptor fd) {
// if (callFromGremlin() && !callFromBackendSocket() &&
// !callFromRaft() && !callFromSofaRpc()) {
// throw newSecurityException("Not allowed to read fd via Gremlin");
// }
if (callFromGremlin() && !callFromBackendSocket() &&
!callFromRaft() && !callFromSofaRpc()) {
throw newSecurityException("Not allowed to read fd via Gremlin");
}
super.checkRead(fd);
}

@Override
public void checkRead(String file) {
// if (callFromGremlin() && !callFromCaffeine() &&
// !readGroovyInCurrentDir(file) && !callFromBackendHbase() &&
// !callFromSnapshot() && !callFromRaft() &&
// !callFromSofaRpc()) {
// throw newSecurityException(
// "Not allowed to read file via Gremlin: %s", file);
// }
if (callFromGremlin() && !callFromCaffeine() &&
!readGroovyInCurrentDir(file) && !callFromBackendHbase() &&
!callFromSnapshot() && !callFromRaft() &&
!callFromSofaRpc()) {
throw newSecurityException(
"Not allowed to read file via Gremlin: %s", file);
}
super.checkRead(file);
}

@Override
public void checkRead(String file, Object context) {
// if (callFromGremlin() && !callFromRaft() && !callFromSofaRpc()) {
// throw newSecurityException(
// "Not allowed to read file via Gremlin: %s", file);
// }
if (callFromGremlin() && !callFromRaft() && !callFromSofaRpc()) {
throw newSecurityException(
"Not allowed to read file via Gremlin: %s", file);
}
super.checkRead(file, context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@
* under the License.
*/

package org.apache.hugegraph.backend.tx;
package org.apache.hugegraph.task;

import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.hugegraph.HugeGraphParams;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.tx.GraphIndexTransaction;
import org.apache.hugegraph.job.EphemeralJob;
import org.apache.hugegraph.job.EphemeralJobBuilder;
import org.apache.hugegraph.util.Log;
Expand Down Expand Up @@ -63,8 +63,6 @@ public void add(EphemeralJob<?> job) {
if (!this.pendingQueue.offer(job)) {
LOG.warn("The pending queue of EphemeralJobQueue is full, {} job " +
"will be ignored", job.type());
this.reScheduleIfNeeded();
return;
}

this.reScheduleIfNeeded();
Expand All @@ -74,8 +72,12 @@ protected HugeGraphParams params() {
return this.graph;
}

protected Queue<EphemeralJob<?>> queue() {
return this.pendingQueue;
protected void clear() {
this.pendingQueue.clear();
}

protected EphemeralJob<?> poll() {
return this.pendingQueue.poll();
}

public void consumeComplete() {
Expand All @@ -93,7 +95,7 @@ public void reScheduleIfNeeded() {
} catch (Throwable e) {
// Maybe if it fails, consider clearing all the data in the pendingQueue,
// or start a scheduled retry task to retry until success.
LOG.warn("Failed to schedule RemoveLeftIndexJob", e);
LOG.warn("Failed to schedule BatchEphemeralJob", e);
this.pendingQueue.clear();
this.state.compareAndSet(State.EXECUTE, State.INIT);
}
Expand Down Expand Up @@ -124,7 +126,7 @@ public String type() {
@Override
public Object execute() throws Exception {
boolean stop = false;
Object ret = null;
Object result = null;
int consumeCount = 0;
InterruptedException interruptedException = null;
EphemeralJobQueue queue;
Expand Down Expand Up @@ -152,19 +154,19 @@ public Object execute() throws Exception {

try {
while (!queue.isEmpty() && batchJobs.size() < PAGE_SIZE) {
EphemeralJob<?> job = queue.queue().poll();
EphemeralJob<?> job = queue.poll();
if (job == null) {
continue;
}
batchJobs.add(job);
consumeCount++;
}

if (batchJobs.isEmpty()) {
continue;
}

ret = this.executeBatchJob(batchJobs, ret);
consumeCount += batchJobs.size();
result = this.executeBatchJob(batchJobs, result);

} catch (InterruptedException e) {
interruptedException = e;
Expand All @@ -178,7 +180,7 @@ public Object execute() throws Exception {
throw interruptedException;
}

return ret;
return result;
}

private Object executeBatchJob(List<EphemeralJob<?>> jobs, Object prevResult) throws Exception {
Expand Down Expand Up @@ -214,7 +216,7 @@ public Object call() throws Exception {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
if (queue != null) {
queue.queue().clear();
queue.clear();
queue.consumeComplete();
}
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ protected void save() {
}
}

public void graph(HugeGraph graph) {
protected void graph(HugeGraph graph) {
this.graph = graph;
}

Expand Down Expand Up @@ -176,7 +176,7 @@ public abstract static class SysTaskCallable<V> extends TaskCallable<V> {

private HugeGraphParams params = null;

public void params(HugeGraphParams params) {
protected void params(HugeGraphParams params) {
this.params = params;
}

Expand Down

0 comments on commit 79c00ea

Please sign in to comment.