diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskCallable.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskCallable.java index ce3d26c644..bb21ac8554 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskCallable.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskCallable.java @@ -20,6 +20,7 @@ package com.baidu.hugegraph.task; import java.util.Date; +import java.util.Set; import java.util.concurrent.Callable; import org.apache.tinkerpop.gremlin.structure.Transaction; @@ -30,13 +31,29 @@ import com.baidu.hugegraph.HugeGraphParams; import com.baidu.hugegraph.util.E; import com.baidu.hugegraph.util.Log; +import com.google.common.collect.ImmutableSet; public abstract class TaskCallable implements Callable { private static final Logger LOG = Log.logger(HugeTask.class); - private static final String ERROR_MAX_LEN = "Failed to commit changes: " + - "The max length of bytes is"; + private static final String ERROR_COMMIT = "Failed to commit changes: "; + private static final Set ERROR_MESSAGES = ImmutableSet.of( + /* + * "The max length of bytes is" exception message occurs when + * task input size exceeds TASK_INPUT_SIZE_LIMIT or task result size + * exceeds TASK_RESULT_SIZE_LIMIT + */ + "The max length of bytes is", + /* + * "Batch too large" exception message occurs when using + * cassandra store and task input size is in + * [batch_size_fail_threshold_in_kb, TASK_INPUT_SIZE_LIMIT) or + * task result size is in + * [batch_size_fail_threshold_in_kb, TASK_RESULT_SIZE_LIMIT) + */ + "Batch too large" + ); private HugeTask task = null; private HugeGraph graph = null; @@ -98,7 +115,8 @@ protected void save() { */ LOG.error("Failed to save task with error \"{}\": {}", e, task.asMap(false)); - if (e.getMessage().contains(ERROR_MAX_LEN)) { + String message = e.getMessage(); + if (message.contains(ERROR_COMMIT) && needSaveWithEx(message)) { task.failSave(e); this.graph().taskScheduler().save(task); return; @@ -138,6 +156,15 @@ public static TaskCallable fromClass(String className) { } } + private static boolean needSaveWithEx(String message) { + for (String error : ERROR_MESSAGES) { + if (message.contains(error)) { + return true; + } + } + return false; + } + public static TaskCallable empty(Exception e) { return new TaskCallable() { @Override