-
Notifications
You must be signed in to change notification settings - Fork 42
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Some bug fix for load task and schema manage #246
Conversation
7e582ac
to
efe4ed9
Compare
this.fileReadLines = this.context().newProgress().totalInputReaded(); | ||
this.lastDuration += this.context().summary().totalTime(); | ||
this.currDuration = 0L; | ||
// how to save db |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove it
while (this.status.inRunning()) { | ||
try { | ||
Thread.sleep(1000); | ||
} catch (InterruptedException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to ignored
log.info("LoadTask is executing run task:{}", task.getId()); | ||
task.run(); | ||
log.info("Execute callback"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update log to:
log.info("Executing task: {}", task.getId());
log.info("Executed task: {}", task.getId());
9c40dc3
to
e794dd3
Compare
try { | ||
// transferTo should accept absolute path | ||
srcFile.transferTo(destFile.getAbsoluteFile()); | ||
result.setStatus(FileUploadResult.Status.SUCCESS); | ||
log.info("Uploaded file part {}", partName + "-" + index); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Uploaded file part {}-{}",
@@ -160,7 +168,8 @@ public boolean tryMergePartFiles(String dirPath, int total) { | |||
// Rename file to dest file | |||
FileUtils.moveFile(partFiles[0], newFile); | |||
} catch (IOException e) { | |||
throw new InternalException("load.upload.move-file.failed"); | |||
log.error(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add some message for log
@@ -178,19 +187,22 @@ public boolean tryMergePartFiles(String dirPath, int total) { | |||
try (InputStream is = new FileInputStream(partFile)) { | |||
IOUtils.copy(is, os); | |||
} catch (IOException e) { | |||
log.error(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
} | ||
} | ||
// Delete origin directory | ||
try { | ||
FileUtils.forceDelete(dir); | ||
} catch (IOException e) { | ||
throw new InternalException("load.upload.delete-temp-dir.failed"); | ||
log.error(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
Iterator<Task> tasks = list.iterator(); | ||
while (tasks.hasNext()) { | ||
Task task = tasks.next(); | ||
for (Task task : list) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename list to tasks
LoadContext context = task.context(); | ||
task.setFileReadLines(context.newProgress().totalInputReaded()); | ||
task.setCurrDuration(context.summary().totalTime()); | ||
if (this.update(task) != 1) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does 1 mean, prefer boolean or enum
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1 means operate affected rows. it defined by mybatis-plus framework.
@@ -178,19 +188,23 @@ public boolean tryMergePartFiles(String dirPath, int total) { | |||
try (InputStream is = new FileInputStream(partFile)) { | |||
IOUtils.copy(is, os); | |||
} catch (IOException e) { | |||
log.error("Failed copy file stream from {} to {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Failed to copy
} | ||
} | ||
} catch (IOException e) { | ||
throw new InternalException("load.upload.merge-file.failed"); | ||
log.error(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
improve
throw new InternalException("entity.update.failed", task); | ||
task.getOptions().incrementalMode = true; | ||
task.restoreContext(); | ||
task.lock.lock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefer to add method task.lock()/unlock()
public class LoadTask implements Runnable { | ||
|
||
@TableField(exist = false) | ||
@JsonIgnore | ||
private transient HugeGraphLoader loader; | ||
public transient final Lock lock = new ReentrantLock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
set private
@PathVariable("jobId") int jobId, | ||
@RequestParam("name") String fileName) { | ||
String token = this.service.generateFileToken(fileName); | ||
this.uploadingTokenLocks.put(token, new ReentrantReadWriteLock()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check exist
Iterator<Map.Entry<Integer, LoadTask>> iter = taskContainer.entrySet() | ||
.iterator(); | ||
Iterator<Map.Entry<Integer, LoadTask>> iter; | ||
iter = this.taskContainer.entrySet().iterator(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
runningTasks
for (String fileName : fileNames) { | ||
String token = this.service.generateFileToken(fileName); | ||
Ex.check(!this.uploadingTokenLocks.containsKey(token), | ||
"load.upload.file.token.existed"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check put() return null
hubble-be/src/main/java/com/baidu/hugegraph/service/load/LoadTaskService.java
Show resolved
Hide resolved
d67f793
to
670d887
Compare
@@ -286,6 +291,17 @@ public void loadParameter(@RequestBody LoadParameter newEntity) { | |||
} | |||
} | |||
|
|||
@PutMapping("finish") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefer rename to another name, like ready-loading
@@ -240,6 +247,17 @@ public Boolean delete(@PathVariable("connId") int connId, | |||
} | |||
} | |||
|
|||
@PutMapping("finish") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
new ConfigOption<>( | ||
"upload_file.max_uploading_time", | ||
"The maximum allowable uploading time(second) for file " + | ||
"uploads, the uploaded parts will be cleared if exceed " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
uploaded file parts
try { | ||
FileUtils.forceDelete(new File(filePath)); | ||
} catch (IOException e) { | ||
log.warn("Failed to delete uploading timeout file {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
delete expired uploading files
throw new InternalException("entity.update.failed", task); | ||
} | ||
// executed in other threads | ||
this.taskExecutor.execute(task, () -> this.update(task)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should handle exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should handle exception if the update is failed?
try { | ||
if (this.update(task) != 1) { | ||
throw new InternalException("entity.update.failed", task); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we wrap these 3 lines with a method
hubble-be/src/main/java/com/baidu/hugegraph/service/load/LoadTaskService.java
Show resolved
Hide resolved
if (this.update(task) != 1) { | ||
throw new InternalException("entity.update.failed", task); | ||
task.getOptions().incrementalMode = true; | ||
task.restoreContext(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move line 200~2201 to try block
@@ -99,7 +101,8 @@ public ExecuteHistory get(int connId, int id) { | |||
try { | |||
Task task = client.task().get(history.getAsyncId()); | |||
history.setDuration(task.updateTime() - task.createTime()); | |||
history.setAsyncStatus(AsyncTaskStatus.valueOf(task.status().toUpperCase())); | |||
history.setAsyncStatus(AsyncTaskStatus.valueOf( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefer add a method setAsyncStatus(string)
} | ||
this.save(task); | ||
// executed in other threads | ||
this.taskExecutor.execute(task, () -> this.update(task)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should handle exception if the update is failed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need, it doesn't affect other tasks to use the thread pool
@@ -130,18 +132,14 @@ public FileMapping fileSetting(@PathVariable("id") int id, | |||
if (mapping == null) { | |||
throw new ExternalException("load.file-mapping.not-exist.id", id); | |||
} | |||
// unescape \\t to \t | |||
newEntity.unescapeDelimiterIfNeeded(); | |||
// change format to TEXT if needed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Change format to TEXT if needed
|
||
@JsonIgnore | ||
public Set<String> getVertexMappingLabels() { | ||
if (this.getVertexMappings() != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (this.getVertexMappings() != null) {
return new HashSet<>();
}
return this.getVertexMappings().stream()
.map(ElementMapping::getLabel)
.collect(Collectors.toSet());
|
||
@JsonIgnore | ||
public Set<String> getEdgeMappingLabels() { | ||
if (this.getEdgeMappings() != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
this.status = LoadStatus.FAILED; | ||
this.lock.lock(); | ||
try { | ||
// Pay attention to whether the user stops actively or |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/*
*/
float rate; | ||
if (this.fileReadLines == null || this.duration == null || | ||
this.duration == 0L) { | ||
if (readLines == 0L || duration == 0L) { | ||
rate = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rate = 0.0F
No description provided.