From e794dd3e667754a3a1210bf5db4cc600d2aa5259 Mon Sep 17 00:00:00 2001
From: liningrui
Date: Tue, 29 Sep 2020 21:29:02 +0800
Subject: [PATCH] Some bug fixes for load task and schema management

---
 hubble-be/pom.xml | 6 +-
 .../baidu/hugegraph/config/AsyncConfig.java | 4 +-
 .../load/FileMappingController.java | 6 +-
 .../controller/load/FileUploadController.java | 225 +++++++++++-------
 .../controller/load/JobManagerController.java | 18 +-
 .../controller/load/LoadTaskController.java | 38 ++-
 .../{ => task}/AsyncTaskController.java | 2 +-
 .../hugegraph/entity/load/FileMapping.java | 24 ++
 .../hugegraph/entity/load/FileSetting.java | 52 +++-
 .../entity/load/JobManagerReasonResult.java | 1 -
 .../baidu/hugegraph/entity/load/LoadTask.java | 122 ++++++----
 .../entity/{algorithm => task}/AsyncTask.java | 2 +-
 .../{algorithm => task}/AsyncTaskResult.java | 2 +-
 .../hugegraph/handler/LoadTaskExecutor.java | 6 +-
 .../mapper/algorithm/AsyncTaskMapper.java | 2 +-
 .../mapper/load/JobManagerMapper.java | 7 +-
 .../hugegraph/options/HubbleOptions.java | 2 +-
 .../service/algorithm/AsyncTaskService.java | 8 +-
 .../service/algorithm/OltpAlgoService.java | 26 +-
 .../service/load/FileMappingService.java | 72 ++++--
 .../service/load/JobManagerService.java | 13 +-
 .../service/load/LoadTaskService.java | 131 +++++-----
 .../service/query/ExecuteHistoryService.java | 9 +-
 .../service/schema/EdgeLabelService.java | 25 +-
 .../service/schema/PropertyIndexService.java | 9 +-
 .../service/schema/VertexLabelService.java | 29 +--
 .../com/baidu/hugegraph/util/HubbleUtil.java | 5 +
 .../src/main/resources/database/schema.sql | 5 +-
 .../main/resources/i18n/messages.properties | 1 +
 .../resources/i18n/messages_zh_CN.properties | 1 +
 hubble-dist/pom.xml | 2 +-
 31 files changed, 511 insertions(+), 344 deletions(-)
 rename hubble-be/src/main/java/com/baidu/hugegraph/controller/{ => task}/AsyncTaskController.java (99%)
 rename hubble-be/src/main/java/com/baidu/hugegraph/entity/{algorithm => task}/AsyncTask.java (98%)
 rename hubble-be/src/main/java/com/baidu/hugegraph/entity/{algorithm => task}/AsyncTaskResult.java (97%)

diff --git a/hubble-be/pom.xml b/hubble-be/pom.xml
index ae5de8e6..dd1728db 100644
--- a/hubble-be/pom.xml
+++ b/hubble-be/pom.xml
@@ -32,10 +32,6 @@
 org.springframework.boot
 spring-boot-starter-cache
-
- org.springframework.boot
- spring-boot-starter-aop
-
 org.springframework.boot
 spring-boot-starter-test
@@ -124,7 +120,7 @@
 com.baidu.hugegraph
 hugegraph-loader
- 0.10.4
+ 0.10.5
 com.baidu.hugegraph
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/config/AsyncConfig.java b/hubble-be/src/main/java/com/baidu/hugegraph/config/AsyncConfig.java
index 64355b27..33d03c60 100644
--- a/hubble-be/src/main/java/com/baidu/hugegraph/config/AsyncConfig.java
+++ b/hubble-be/src/main/java/com/baidu/hugegraph/config/AsyncConfig.java
@@ -33,8 +33,8 @@ public class AsyncConfig implements AsyncConfigurer {
     @Override
     public Executor getAsyncExecutor() {
         ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
-        taskExecutor.setCorePoolSize(4);
-        taskExecutor.setMaxPoolSize(8);
+        taskExecutor.setCorePoolSize(8);
+        taskExecutor.setMaxPoolSize(16);
         taskExecutor.setQueueCapacity(16);
         taskExecutor.initialize();
         return taskExecutor;
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/FileMappingController.java b/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/FileMappingController.java
index 5b27215e..3fc08b0b 100644
---
a/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/FileMappingController.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/FileMappingController.java @@ -130,8 +130,8 @@ public FileMapping fileSetting(@PathVariable("id") int id, if (mapping == null) { throw new ExternalException("load.file-mapping.not-exist.id", id); } - // unescape \\t to \t - newEntity.unescapeDelimiterIfNeeded(); + // change format to TEXT if needed + newEntity.changeFormatIfNeeded(); FileSetting oldEntity = mapping.getFileSetting(); FileSetting entity = this.mergeEntity(oldEntity, newEntity); mapping.setFileSetting(entity); @@ -140,8 +140,6 @@ public FileMapping fileSetting(@PathVariable("id") int id, if (this.service.update(mapping) != 1) { throw new InternalException("entity.update.failed", mapping); } - // escape \t to \\t - mapping.getFileSetting().escapeDelimiterIfNeeded(); return mapping; } diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/FileUploadController.java b/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/FileUploadController.java index b1bea2c8..e1e7cfd9 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/FileUploadController.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/FileUploadController.java @@ -25,14 +25,18 @@ import java.io.File; import java.io.IOException; import java.nio.file.Paths; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.io.FileUtils; import org.apache.commons.io.FilenameUtils; +import org.apache.commons.lang.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; @@ -41,6 +45,7 @@ import org.springframework.web.multipart.MultipartFile; import com.baidu.hugegraph.common.Constant; +import com.baidu.hugegraph.common.Response; import com.baidu.hugegraph.config.HugeConfig; import com.baidu.hugegraph.entity.enums.FileMappingStatus; import com.baidu.hugegraph.entity.enums.JobManagerStatus; @@ -69,15 +74,31 @@ public class FileUploadController { @Autowired private JobManagerService jobService; + private final Map uploadingTokenLocks; + + public FileUploadController() { + this.uploadingTokenLocks = new ConcurrentHashMap<>(); + } + + @GetMapping("token") + public Response fileToken(@PathVariable("connId") int connId, + @PathVariable("jobId") int jobId, + @RequestParam("name") String fileName) { + String token = this.service.generateFileToken(fileName); + this.uploadingTokenLocks.put(token, new ReentrantReadWriteLock()); + return Response.builder().status(Constant.STATUS_OK).data(token).build(); + } + @PostMapping public FileUploadResult upload(@PathVariable("connId") int connId, @PathVariable("jobId") int jobId, @RequestParam("file") MultipartFile file, @RequestParam("name") String fileName, + @RequestParam("token") String token, @RequestParam("total") int total, @RequestParam("index") int index) { - // When front-end use multipart-upload mode, - // file.getOriginalFilename() is blob, not actual file name + this.checkTotalAndIndexValid(total, index); + 
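+        // The 'token' request param was issued by the GET "token" endpoint
+        // above: generateFileToken() in FileMappingService builds it as
+        // md5(fileName) + "-" + epochSeconds, and fileToken() registers a
+        // ReentrantReadWriteLock for it in uploadingTokenLocks. The check
+        // below therefore only needs the md5 prefix to tie a token back to
+        // its file name, e.g. (illustrative file name only):
+        //   generateFileToken("person.csv").startsWith(HubbleUtil.md5("person.csv"))
+        // always holds.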
this.checkFileNameMatchToken(fileName, token);
 JobManager jobEntity = this.jobService.get(jobId);
 this.checkFileValid(connId, jobId, jobEntity, file, fileName);
@@ -88,105 +109,137 @@ public FileUploadResult upload(@PathVariable("connId") int connId,
         // Before merge: upload-files/conn-1/verson_person.csv/part-1
         // After merge: upload-files/conn-1/file-mapping-1/verson_person.csv
         String filePath = Paths.get(location, path, fileName).toString();
-        // Check destFile exist
-        // Ex.check(!destFile.exists(), "load.upload.file.existed", fileName);
-        FileUploadResult result = this.service.uploadFile(file, index, filePath);
-        if (result.getStatus() == FileUploadResult.Status.FAILURE) {
+        // Check whether this file has been deleted
+        ReadWriteLock lock = this.uploadingTokenLocks.get(token);
+        if (lock == null) {
+            FileUploadResult result = new FileUploadResult();
+            // Current part saved path
+            String partName = file.getOriginalFilename();
+            result.setName(partName);
+            result.setSize(file.getSize());
+            result.setStatus(FileUploadResult.Status.FAILURE);
+            result.setCause("File has been deleted");
             return result;
         }
-        // Verify the existence of fragmented files
-        FileMapping mapping = this.service.get(connId, jobId, fileName);
-        if (mapping != null) {
-            mapping.setJobId(jobId);
-            mapping.setFileStatus(FileMappingStatus.UPLOADING);
-            mapping.setFileIndex(mapping.getFileIndex() + "," + index);
-            mapping.setFileTotal(total);
-            if (this.service.update(mapping) != 1) {
-                throw new InternalException("entity.update.failed", mapping);
+
+        lock.readLock().lock();
+        try {
+            FileUploadResult result = this.service.uploadFile(file, token,
+                                                              index, filePath);
+            if (result.getStatus() == FileUploadResult.Status.FAILURE) {
+                return result;
             }
-        } else {
-            mapping = new FileMapping(connId, fileName, filePath);
-            mapping.setJobId(jobId);
-            mapping.setFileStatus(FileMappingStatus.UPLOADING);
-            mapping.setFileIndex(String.valueOf(index));
-            mapping.setFileTotal(total);
-            if (this.service.save(mapping) != 1) {
-                throw new InternalException("entity.insert.failed", mapping);
+            synchronized (this.service) {
+                // Verify the existence of fragmented files
+                FileMapping mapping = this.service.get(connId, jobId, fileName);
+                if (mapping == null) {
+                    mapping = new FileMapping(connId, fileName, filePath);
+                    mapping.setJobId(jobId);
+                    mapping.setFileStatus(FileMappingStatus.UPLOADING);
+                    mapping.setFileIndex(String.valueOf(index));
+                    mapping.setFileTotal(total);
+                    if (this.service.save(mapping) != 1) {
+                        throw new InternalException("entity.insert.failed",
+                                                    mapping);
+                    }
+                } else {
+                    if (mapping.getFileStatus() == FileMappingStatus.COMPLETED) {
+                        result.setId(mapping.getId());
+                        return result;
+                    }
+                }
+                Integer mappingId = mapping.getId();
+                // Determine whether all the parts have been uploaded, then merge them
+                boolean merged = this.service.tryMergePartFiles(filePath, total);
+                if (!merged) {
+                    return result;
+                }
+                // Save file mapping
+                mapping = new FileMapping(connId, fileName, filePath);
+                // Read column names and values, then fill them in
+                this.service.extractColumns(mapping);
+                mapping.setId(mappingId);
+                mapping.setFileStatus(FileMappingStatus.COMPLETED);
+                mapping.setTotalLines(FileUtil.countLines(mapping.getPath()));
+                mapping.setTotalSize(FileUtils.sizeOf(new File(mapping.getPath())));
+                mapping.setCreateTime(HubbleUtil.nowDate());
+
+                // Move to the directory corresponding to the file mapping Id
+                String newPath = this.service.moveToNextLevelDir(mapping);
+                // Update file mapping stored path
+                mapping.setPath(newPath);
+                if
(this.service.update(mapping) != 1) { + throw new InternalException("entity.update.failed", mapping); + } + // Update Job Manager size + long jobSize = jobEntity.getJobSize() + mapping.getTotalSize(); + jobEntity.setJobSize(jobSize); + jobEntity.setJobStatus(JobManagerStatus.SETTING); + if (this.jobService.update(jobEntity) != 1) { + throw new InternalException("entity.update.failed", jobEntity); + } + result.setId(mapping.getId()); } + return result; + } finally { + lock.readLock().unlock(); } - Integer mapId = mapping.getId(); - // Determine whether all the parts have been uploaded, then merge them - boolean merged = this.service.tryMergePartFiles(filePath, total); - if (merged) { - // Save file mapping - mapping = new FileMapping(connId, fileName, filePath); - // Read column names and values then fill it - this.service.extractColumns(mapping); - mapping.setId(mapId); - mapping.setFileStatus(FileMappingStatus.COMPLETED); - mapping.setTotalLines(FileUtil.countLines(mapping.getPath())); - mapping.setTotalSize(FileUtils.sizeOf(new File(mapping.getPath()))); - mapping.setCreateTime(HubbleUtil.nowDate()); - // Will generate mapping id - - // Move to the directory corresponding to the file mapping Id - String newPath = this.service.moveToNextLevelDir(mapping); - // Update file mapping stored path - mapping.setPath(newPath); - if (this.service.update(mapping) != 1) { - throw new InternalException("entity.update.failed", mapping); + } + + @DeleteMapping + public Boolean delete(@PathVariable("connId") int connId, + @PathVariable("jobId") int jobId, + @RequestParam("name") String fileName, + @RequestParam("token") String token) { + JobManager jobEntity = this.jobService.get(jobId); + Ex.check(jobEntity != null, + "job-manager.not-exist.id", jobId); + Ex.check(jobEntity.getJobStatus() == JobManagerStatus.DEFAULT || + jobEntity.getJobStatus() == JobManagerStatus.SETTING, + "deleted.file.no-permission" ); + FileMapping mapping = this.service.get(connId, fileName); + Ex.check(mapping != null, "load.file-mapping.not-exist.name", fileName); + + ReadWriteLock lock = this.uploadingTokenLocks.get(token); + if (lock == null) { + throw new InternalException("Can't find lock of file %s with " + + "token %s", fileName, token); + } + lock.writeLock().lock(); + try { + this.service.deleteDiskFile(mapping); + this.uploadingTokenLocks.remove(token); + + if (this.service.remove(mapping.getId()) != 1) { + throw new InternalException("entity.delete.failed", mapping); } - // Update Job Manager size - long jobSize = jobEntity.getJobSize() + mapping.getTotalSize(); + log.info("removed file mapping {}", mapping.getId()); + long jobSize = jobEntity.getJobSize() - mapping.getTotalSize(); jobEntity.setJobSize(jobSize); - jobEntity.setJobStatus(JobManagerStatus.SETTING); if (this.jobService.update(jobEntity) != 1) { throw new InternalException("job-manager.entity.update.failed", jobEntity); } - result.setId(mapping.getId()); + return true; + } finally { + lock.writeLock().unlock(); } - return result; } - @DeleteMapping - public Map delete(@PathVariable("connId") int connId, - @PathVariable("jobId") int jobId, - @RequestParam("names") - List fileNames) { - - JobManager jobEntity = this.jobService.get(jobId); - Ex.check(jobEntity != null, - "job-manager.not-exist.id", jobId); - Ex.check(jobEntity.getJobStatus() == JobManagerStatus.SETTING, - "deleted.file.no-permission" ); - Ex.check(fileNames.size() > 0, "load.upload.files.at-least-one"); - Map result = new LinkedHashMap<>(); - for (String fileName : fileNames) { - 
FileMapping mapping = this.service.get(connId, fileName);
-            Ex.check(mapping != null, "load.file-mapping.not-exist.name",
-                     fileName);
-            File destFile = new File(mapping.getPath());
-            boolean deleted = destFile.delete();
-            if (deleted) {
-                log.info("deleted file {}, prepare to remove file mapping {}",
-                         destFile, mapping.getId());
-                if (this.service.remove(mapping.getId()) != 1) {
-                    throw new InternalException("entity.delete.failed", mapping);
-                }
-                log.info("removed file mapping {}", mapping.getId());
-                long jobSize = jobEntity.getJobSize() - mapping.getTotalSize();
-                jobEntity.setJobSize(jobSize);
-                if (this.jobService.update(jobEntity) != 1) {
-                    throw new InternalException("job-manager.entity.update.failed",
-                                                jobEntity);
-                }
-            }
-            result.put(fileName, deleted);
+    private void checkTotalAndIndexValid(int total, int index) {
+        if (total <= 0) {
+            throw new InternalException("The request param 'total' must be > 0");
+        }
+        if (index < 0) {
+            throw new InternalException("The request param 'index' must be >= 0");
         }
-        return result;
     }

+    private void checkFileNameMatchToken(String fileName, String token) {
+        String md5Prefix = HubbleUtil.md5(fileName);
+        Ex.check(StringUtils.isNotEmpty(token) && token.startsWith(md5Prefix),
+                 "load.upload.file.name-token.unmatch");
+    }

     private void checkFileValid(int connId, int jobId, JobManager jobEntity,
                                 MultipartFile file, String fileName) {
@@ -198,7 +251,7 @@ private void checkFileValid(int connId, int jobId, JobManager jobEntity,
         // Not allowed to upload empty file
         Ex.check(!file.isEmpty(), "load.upload.file.cannot-be-empty");
         // Difficult: how to determine whether the file is csv or text
-        log.info("File content type: {}", file.getContentType());
+        log.debug("File content type: {}", file.getContentType());
         String format = FilenameUtils.getExtension(fileName);
         List formatWhiteList = this.config.get(
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/JobManagerController.java b/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/JobManagerController.java
index ace5789a..5298d881 100644
--- a/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/JobManagerController.java
+++ b/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/JobManagerController.java
@@ -22,7 +22,7 @@
 import java.util.ArrayList;
 import java.util.List;
-import org.datanucleus.util.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.DeleteMapping;
 import org.springframework.web.bind.annotation.GetMapping;
@@ -78,13 +78,15 @@ public JobManager create(@PathVariable("connId") int connId,
         Ex.check(entity.getJobName().length() <= 48,
                  "job.manager.job-name.reached-limit");
         Ex.check(entity.getJobName() != null, () ->
-                 Constant.COMMON_NAME_PATTERN.matcher(entity.getJobName()).matches(),
-                 "job.manager.job-name.unmatch-regex");
+                 Constant.COMMON_NAME_PATTERN.matcher(
+                 entity.getJobName()).matches(),
+                 "job.manager.job-name.unmatch-regex");
         Ex.check(entity.getJobRemarks().length() <= 200,
                  "job.manager.job-remarks.reached-limit");
         Ex.check(!StringUtils.isEmpty(entity.getJobRemarks()), () ->
-                 Constant.COMMON_REMARK_PATTERN.matcher(entity.getJobRemarks()).matches(),
-                 "job.manager.job-remarks.unmatch-regex");
+                 Constant.COMMON_REMARK_PATTERN.matcher(
+                 entity.getJobRemarks()).matches(),
+                 "job.manager.job-remarks.unmatch-regex");
         Ex.check(this.service.count() < LIMIT,
                  "job.manager.reached-limit", LIMIT);
         if
(this.service.getTask(entity.getJobName(), connId) != null) { @@ -153,8 +155,8 @@ public JobManager update(@PathVariable("id") int id, Ex.check(newEntity.getJobName().length() <= 48, "job.manager.job-name.reached-limit"); Ex.check(newEntity.getJobName() != null, () -> - Constant.COMMON_NAME_PATTERN - .matcher(newEntity.getJobName()).matches(), + Constant.COMMON_NAME_PATTERN.matcher( + newEntity.getJobName()).matches(), "job.manager.job-name.unmatch-regex"); Ex.check(newEntity.getJobRemarks().length() <= 200, "job.manager.job-remarks.reached-limit"); @@ -178,7 +180,7 @@ public Response reason(@PathVariable("connId") int connId, if (job == null) { throw new ExternalException("job.manager.not-exist.id", id); } - List tasks = taskService.batchTasks(job.getId()); + List tasks = this.taskService.batchTasks(job.getId()); List reasonResults = new ArrayList<>(); tasks.forEach(task -> { JobManagerReasonResult reasonResult = new JobManagerReasonResult(); diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/LoadTaskController.java b/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/LoadTaskController.java index ac7c7e32..f69bdbab 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/LoadTaskController.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/LoadTaskController.java @@ -107,11 +107,10 @@ public LoadTask create(@PathVariable("connId") int connId, @PathVariable("jobId") int jobId, @RequestBody LoadTask entity) { JobManager jobEntity = this.jobService.get(jobId); - Ex.check(jobEntity != null, - "job-manager.not-exist.id", jobId); + Ex.check(jobEntity != null, "job-manager.not-exist.id", jobId); Ex.check(jobEntity.getJobStatus() == JobManagerStatus.SETTING, "load.task.create.no-permission" ); - synchronized(this.service) { + synchronized (this.service) { Ex.check(this.service.count() < LIMIT, "load.task.reached-limit", LIMIT); entity.setConnId(connId); @@ -144,11 +143,10 @@ public List start(@PathVariable("connId") int connId, throw new ExternalException("graph-connection.not-exist.id", connId); } JobManager jobEntity = this.jobService.get(jobId); - Ex.check(jobEntity != null, - "job-manager.not-exist.id", jobId); - Ex.check((jobEntity.getJobStatus() == JobManagerStatus.SETTING || - jobEntity.getJobStatus() == JobManagerStatus.IMPORTING), - "load.task.start.no-permission" ); + Ex.check(jobEntity != null, "job-manager.not-exist.id", jobId); + Ex.check(jobEntity.getJobStatus() == JobManagerStatus.SETTING || + jobEntity.getJobStatus() == JobManagerStatus.IMPORTING, + "load.task.start.no-permission"); List tasks = new ArrayList<>(); for (Integer fileId: fileIds) { @@ -176,13 +174,12 @@ public LoadTask pause(@PathVariable("connId") int connId, throw new ExternalException("graph-connection.not-exist.id", connId); } JobManager jobEntity = this.jobService.get(jobId); - Ex.check(jobEntity != null, - "job-manager.not-exist.id", jobId); + Ex.check(jobEntity != null, "job-manager.not-exist.id", jobId); Ex.check(jobEntity.getJobStatus() == JobManagerStatus.IMPORTING, "load.task.pause.no-permission"); LoadTask task = this.service.pause(taskId); jobEntity.setJobStatus(JobManagerStatus.IMPORTING); - jobEntity.setUpdateTime( HubbleUtil.nowDate()); + jobEntity.setUpdateTime(HubbleUtil.nowDate()); if (this.jobService.update(jobEntity) != 1) { throw new InternalException("job-manager.entity.update.failed", jobEntity); @@ -199,13 +196,12 @@ public LoadTask resume(@PathVariable("connId") int connId, throw new 
ExternalException("graph-connection.not-exist.id", connId); } JobManager jobEntity = this.jobService.get(jobId); - Ex.check(jobEntity != null, - "job-manager.not-exist.id", jobId); + Ex.check(jobEntity != null, "job-manager.not-exist.id", jobId); Ex.check(jobEntity.getJobStatus() == JobManagerStatus.IMPORTING, "load.task.pause.no-permission"); LoadTask task = this.service.resume(taskId); jobEntity.setJobStatus(JobManagerStatus.IMPORTING); - jobEntity.setUpdateTime( HubbleUtil.nowDate()); + jobEntity.setUpdateTime(HubbleUtil.nowDate()); if (this.jobService.update(jobEntity) != 1) { throw new InternalException("job-manager.entity.update.failed", jobEntity); @@ -222,8 +218,7 @@ public LoadTask stop(@PathVariable("connId") int connId, throw new ExternalException("graph-connection.not-exist.id", connId); } JobManager jobEntity = this.jobService.get(jobId); - Ex.check(jobEntity != null, - "job-manager.not-exist.id", jobId); + Ex.check(jobEntity != null, "job-manager.not-exist.id", jobId); Ex.check(jobEntity.getJobStatus() == JobManagerStatus.IMPORTING, "load.task.pause.no-permission"); LoadTask task = this.service.stop(taskId); @@ -245,8 +240,7 @@ public LoadTask retry(@PathVariable("connId") int connId, throw new ExternalException("graph-connection.not-exist.id", connId); } JobManager jobEntity = this.jobService.get(jobId); - Ex.check(jobEntity != null, - "job-manager.not-exist.id", jobId); + Ex.check(jobEntity != null, "job-manager.not-exist.id", jobId); Ex.check(jobEntity.getJobStatus() == JobManagerStatus.IMPORTING, "load.task.pause.no-permission"); LoadTask task = this.service.retry(taskId); @@ -268,11 +262,13 @@ public Response reason(@PathVariable("connId") int connId, throw new ExternalException("load.task.not-exist.id", id); } JobManager jobEntity = this.jobService.get(jobId); - Ex.check(jobEntity != null, - "job-manager.not-exist.id", jobId); + Ex.check(jobEntity != null, "job-manager.not-exist.id", jobId); Integer fileId = task.getFileId(); FileMapping mapping = this.fmService.get(fileId); String reason = this.service.readLoadFailedReason(mapping); - return Response.builder().status(200).data(reason).build(); + return Response.builder() + .status(Constant.STATUS_OK) + .data(reason) + .build(); } } diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/controller/AsyncTaskController.java b/hubble-be/src/main/java/com/baidu/hugegraph/controller/task/AsyncTaskController.java similarity index 99% rename from hubble-be/src/main/java/com/baidu/hugegraph/controller/AsyncTaskController.java rename to hubble-be/src/main/java/com/baidu/hugegraph/controller/task/AsyncTaskController.java index 0118d221..5cba1f0e 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/controller/AsyncTaskController.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/controller/task/AsyncTaskController.java @@ -17,7 +17,7 @@ * under the License. 
*/ -package com.baidu.hugegraph.controller; +package com.baidu.hugegraph.controller.task; import java.util.List; diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/FileMapping.java b/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/FileMapping.java index 7719b518..b635d343 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/FileMapping.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/FileMapping.java @@ -20,8 +20,10 @@ package com.baidu.hugegraph.entity.load; import java.util.Date; +import java.util.HashSet; import java.util.LinkedHashSet; import java.util.Set; +import java.util.stream.Collectors; import com.baidu.hugegraph.annotation.MergeProperty; import com.baidu.hugegraph.entity.enums.FileMappingStatus; @@ -157,4 +159,26 @@ public EdgeMapping getEdgeMapping(String emId) { } return null; } + + @JsonIgnore + public Set getVertexMappingLabels() { + if (this.getVertexMappings() != null) { + return this.getVertexMappings().stream() + .map(ElementMapping::getLabel) + .collect(Collectors.toSet()); + } else { + return new HashSet<>(); + } + } + + @JsonIgnore + public Set getEdgeMappingLabels() { + if (this.getEdgeMappings() != null) { + return this.getEdgeMappings().stream() + .map(ElementMapping::getLabel) + .collect(Collectors.toSet()); + } else { + return new HashSet<>(); + } + } } diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/FileSetting.java b/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/FileSetting.java index 137c0aa6..85febb94 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/FileSetting.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/FileSetting.java @@ -19,12 +19,21 @@ package com.baidu.hugegraph.entity.load; +import java.io.IOException; import java.util.List; import com.baidu.hugegraph.annotation.MergeProperty; import com.baidu.hugegraph.common.Constant; import com.baidu.hugegraph.common.Mergeable; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; import lombok.AllArgsConstructor; import lombok.Builder; @@ -53,6 +62,8 @@ public class FileSetting implements Mergeable { @MergeProperty @JsonProperty("delimiter") + @JsonSerialize(using = DelimiterSerializer.class) + @JsonDeserialize(using = DelimiterDeserializer.class) private String delimiter = ","; @MergeProperty @@ -75,18 +86,45 @@ public class FileSetting implements Mergeable { @JsonProperty("list_format") private ListFormat listFormat = new ListFormat(); - public void unescapeDelimiterIfNeeded() { - if ("\\t".equals(this.delimiter)) { - this.delimiter = "\t"; - } + public void changeFormatIfNeeded() { if (!",".equals(this.delimiter)) { this.format = "TEXT"; } } - public void escapeDelimiterIfNeeded() { - if ("\t".equals(this.delimiter)) { - this.delimiter = "\\t"; + public static class DelimiterSerializer extends StdSerializer { + + protected DelimiterSerializer() { + super(String.class); + } + + @Override + public void serialize(String delimiter, JsonGenerator jsonGenerator, + SerializerProvider provider) throws IOException { + 
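+            // Together with DelimiterDeserializer below, this replaces the
+            // removed escape/unescapeDelimiterIfNeeded() methods: FileSetting
+            // always stores the real tab character, renders it as the
+            // two-character sequence "\t" in JSON output, and turns an
+            // incoming "\t" back into a real tab on deserialization, so
+            // escaping now happens only at the JSON boundary.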
if ("\t".equals(delimiter)) { + jsonGenerator.writeString("\\t"); + } else { + jsonGenerator.writeString(delimiter); + } + } + } + + public static class DelimiterDeserializer extends StdDeserializer { + + protected DelimiterDeserializer() { + super(String.class); + } + + @Override + public String deserialize(JsonParser jsonParser, + DeserializationContext context) + throws IOException { + String delimiter = jsonParser.getText(); + if ("\\t".equals(delimiter)) { + return "\t"; + } else { + return delimiter; + } } } } diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/JobManagerReasonResult.java b/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/JobManagerReasonResult.java index 44011408..0cfdd7f6 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/JobManagerReasonResult.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/JobManagerReasonResult.java @@ -20,7 +20,6 @@ package com.baidu.hugegraph.entity.load; import com.baidu.hugegraph.annotation.MergeProperty; -import com.baomidou.mybatisplus.annotation.TableField; import com.fasterxml.jackson.annotation.JsonProperty; import lombok.AllArgsConstructor; diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/LoadTask.java b/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/LoadTask.java index d7c9023a..4e844c75 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/LoadTask.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/LoadTask.java @@ -20,9 +20,9 @@ package com.baidu.hugegraph.entity.load; import java.util.Date; -import java.util.HashSet; import java.util.Set; -import java.util.stream.Collectors; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import com.baidu.hugegraph.annotation.MergeProperty; import com.baidu.hugegraph.entity.GraphConnection; @@ -40,7 +40,6 @@ import com.baomidou.mybatisplus.extension.handlers.JacksonTypeHandler; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonPropertyOrder; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import lombok.AllArgsConstructor; @@ -55,14 +54,15 @@ @AllArgsConstructor @Builder @TableName(value = "load_task", autoResultMap = true) -@JsonPropertyOrder({"id", "conn_id", "task_id" , "file_id", "file_name", "vertices", - "edges", "load_rate", "load_progress", "file_total_lines", - "file_read_lines", "status", "duration", "create_time"}) public class LoadTask implements Runnable { @TableField(exist = false) @JsonIgnore - private transient HugeGraphLoader loader; + public transient final Lock lock = new ReentrantLock(); + + @TableField(exist = false) + @JsonIgnore + private transient volatile HugeGraphLoader loader; @TableId(type = IdType.AUTO) @MergeProperty(useNew = false) @@ -109,21 +109,27 @@ public class LoadTask implements Runnable { @JsonProperty("file_total_lines") private Long fileTotalLines; + @TableField("load_status") + @MergeProperty + @JsonProperty("status") + private volatile LoadStatus status; + @TableField("file_read_lines") @MergeProperty @JsonProperty("file_read_lines") private Long fileReadLines; - @TableField("load_status") + @TableField("last_duration") @MergeProperty - @JsonProperty("status") - private LoadStatus status; + @JsonProperty("last_duration") + @JsonSerialize(using = SerializeUtil.DurationSerializer.class) + private Long lastDuration; - @TableField("duration") + @TableField("curr_duration") 
@MergeProperty
-    @JsonProperty("duration")
+    @JsonProperty("curr_duration")
     @JsonSerialize(using = SerializeUtil.DurationSerializer.class)
-    private Long duration;
+    private Long currDuration;

     @MergeProperty(useNew = false)
     @JsonProperty("create_time")
@@ -138,64 +144,70 @@ public LoadTask(LoadOptions options, GraphConnection connection,
         this.fileId = mapping.getId();
         this.fileName = mapping.getName();
         this.options = options;
-        if (mapping.getVertexMappings() != null) {
-            this.vertices = mapping.getVertexMappings().stream()
-                                   .map(ElementMapping::getLabel)
-                                   .collect(Collectors.toSet());
-        } else {
-            this.vertices = new HashSet<>();
-        }
-        if (mapping.getEdgeMappings() != null) {
-            this.edges = mapping.getEdgeMappings().stream()
-                                .map(ElementMapping::getLabel)
-                                .collect(Collectors.toSet());
-        } else {
-            this.edges = new HashSet<>();
-        }
+        this.vertices = mapping.getVertexMappingLabels();
+        this.edges = mapping.getEdgeMappingLabels();
         this.fileTotalLines = mapping.getTotalLines();
-        this.fileReadLines = 0L;
         this.status = LoadStatus.RUNNING;
-        this.duration = 0L;
+        this.fileReadLines = 0L;
+        this.lastDuration = 0L;
+        this.currDuration = 0L;
         this.createTime = HubbleUtil.nowDate();
     }

     @Override
     public void run() {
-        log.info("LoadTaskMonitor is monitoring task : {}", this.id);
-        boolean succeed;
+        log.info("LoadTask {} starts running", this.id);
+        boolean noError;
         try {
-            succeed = this.loader.load();
+            noError = this.loader.load();
         } catch (Throwable e) {
-            succeed = false;
+            noError = false;
             log.error("Run task {} failed. cause: {}", this.id, e.getMessage());
         }
-        // Pay attention to whether the user stops actively or
-        // the program stops by itself
-        if (this.status.inRunning()) {
-            if (succeed) {
-                this.status = LoadStatus.SUCCEED;
-            } else {
-                this.status = LoadStatus.FAILED;
+        this.lock.lock();
+        try {
+            // Check whether the user stopped the task actively or
+            // the loader stopped by itself
+            if (this.status.inRunning()) {
+                if (noError) {
+                    this.status = LoadStatus.SUCCEED;
+                } else {
+                    this.status = LoadStatus.FAILED;
+                }
+            }
+            this.fileReadLines = this.context().newProgress().totalInputReaded();
+            this.lastDuration += this.context().summary().totalTime();
+            this.currDuration = 0L;
+        } finally {
+            this.lock.unlock();
+        }
+    }
+
+    public void stop() {
+        this.context().stopLoading();
+        while (this.status.inRunning()) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException ignored) {
+                // pass
             }
         }
-        this.fileReadLines = this.context().newProgress().totalInputReaded();
-        this.duration += this.context().summary().totalTime();
+        log.info("LoadTask {} stopped", this.id);
     }

     public void restoreContext() {
-        Ex.check(this.options != null,
-                 "The load options shouldn't be null");
+        Ex.check(this.options != null, "The load options shouldn't be null");
         this.loader = new HugeGraphLoader(this.options);
     }

     public LoadContext context() {
+        Ex.check(this.loader != null, "loader shouldn't be null");
         return this.loader.context();
     }

     @JsonProperty("load_progress")
     public int getLoadProgress() {
-        if (this.fileTotalLines == null || this.fileReadLines == null ||
-            this.fileTotalLines == 0) {
+        if (this.fileTotalLines == null || this.fileTotalLines == 0) {
             return 0;
         }
         Ex.check(this.fileTotalLines >= this.fileReadLines,
@@ -205,14 +217,26 @@ public int getLoadProgress() {
         return (int) (0.5 + (double) this.fileReadLines /
                             this.fileTotalLines * 100);
     }
+    @JsonProperty("duration")
+    @JsonSerialize(using = SerializeUtil.DurationSerializer.class)
+    public Long getDuration() {
+        this.lock.lock();
+        try {
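+            // lastDuration accumulates the time of finished runs: run() folds
+            // summary().totalTime() into it and zeroes currDuration, while
+            // updateLoadTaskProgress() keeps currDuration fresh for the
+            // in-flight run. E.g. a task that ran 8s, was paused, and has
+            // been running again for 3s reports a duration of 11s.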
return this.lastDuration + this.currDuration; + } finally { + this.lock.unlock(); + } + } + @JsonProperty("load_rate") public String getLoadRate() { + long readLines = this.fileReadLines; + long duration = this.getDuration(); float rate; - if (this.fileReadLines == null || this.duration == null || - this.duration == 0L) { + if (readLines == 0L || duration == 0L) { rate = 0; } else { - rate = this.fileReadLines * 1000.0F / this.duration; + rate = readLines * 1000.0F / duration; rate = Math.round(rate * 1000) / 1000.0F; } return String.format("%s/s", rate); diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/entity/algorithm/AsyncTask.java b/hubble-be/src/main/java/com/baidu/hugegraph/entity/task/AsyncTask.java similarity index 98% rename from hubble-be/src/main/java/com/baidu/hugegraph/entity/algorithm/AsyncTask.java rename to hubble-be/src/main/java/com/baidu/hugegraph/entity/task/AsyncTask.java index 7e7519f0..6e278e17 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/entity/algorithm/AsyncTask.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/entity/task/AsyncTask.java @@ -17,7 +17,7 @@ * under the License. */ -package com.baidu.hugegraph.entity.algorithm; +package com.baidu.hugegraph.entity.task; import java.util.Date; diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/entity/algorithm/AsyncTaskResult.java b/hubble-be/src/main/java/com/baidu/hugegraph/entity/task/AsyncTaskResult.java similarity index 97% rename from hubble-be/src/main/java/com/baidu/hugegraph/entity/algorithm/AsyncTaskResult.java rename to hubble-be/src/main/java/com/baidu/hugegraph/entity/task/AsyncTaskResult.java index e0958299..aa26fbd9 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/entity/algorithm/AsyncTaskResult.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/entity/task/AsyncTaskResult.java @@ -17,7 +17,7 @@ * under the License. 
*/ -package com.baidu.hugegraph.entity.algorithm; +package com.baidu.hugegraph.entity.task; import com.baidu.hugegraph.annotation.MergeProperty; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/handler/LoadTaskExecutor.java b/hubble-be/src/main/java/com/baidu/hugegraph/handler/LoadTaskExecutor.java index 5914db01..e8801192 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/handler/LoadTaskExecutor.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/handler/LoadTaskExecutor.java @@ -31,8 +31,10 @@ public class LoadTaskExecutor { @Async - public void execute(LoadTask task) { - log.info("LoadTask is executing run task:{}", task.getId()); + public void execute(LoadTask task, Runnable callback) { + log.info("Executing task:{}", task.getId()); task.run(); + log.info("Executed task:{}", task.getId()); + callback.run(); } } diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/mapper/algorithm/AsyncTaskMapper.java b/hubble-be/src/main/java/com/baidu/hugegraph/mapper/algorithm/AsyncTaskMapper.java index d48152e7..17775190 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/mapper/algorithm/AsyncTaskMapper.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/mapper/algorithm/AsyncTaskMapper.java @@ -22,7 +22,7 @@ import org.apache.ibatis.annotations.Mapper; import org.springframework.stereotype.Component; -import com.baidu.hugegraph.entity.algorithm.AsyncTask; +import com.baidu.hugegraph.entity.task.AsyncTask; import com.baomidou.mybatisplus.core.mapper.BaseMapper; @Mapper diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/mapper/load/JobManagerMapper.java b/hubble-be/src/main/java/com/baidu/hugegraph/mapper/load/JobManagerMapper.java index c7074f3e..b01d9301 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/mapper/load/JobManagerMapper.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/mapper/load/JobManagerMapper.java @@ -19,8 +19,6 @@ package com.baidu.hugegraph.mapper.load; -import java.util.List; - import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; @@ -34,8 +32,9 @@ @Component public interface JobManagerMapper extends BaseMapper { - @Select("SELECT ISNULL(SUM(f.total_size),0) as total_size, ISNULL(SUM(l.duration),0) " + - "as duration FROM `load_task` as l LEFT JOIN `file_mapping` as f " + + @Select("SELECT ISNULL(SUM(f.total_size),0) as total_size, " + + "ISNULL(SUM(l.duration),0) as duration " + + "FROM `load_task` as l LEFT JOIN `file_mapping` as f " + "ON l.file_id=f.id WHERE l.job_id = #{job_id}") JobManagerItem computeSizeDuration(@Param("job_id") int job_id); } diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/options/HubbleOptions.java b/hubble-be/src/main/java/com/baidu/hugegraph/options/HubbleOptions.java index 4300d545..cda0fe96 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/options/HubbleOptions.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/options/HubbleOptions.java @@ -165,7 +165,7 @@ public static synchronized HubbleOptions instance() { "upload_file.location", "The location of uploaded files.", disallowEmpty(), - "./upload-files" + "upload-files" ); public static final ConfigListOption UPLOAD_FILE_FORMAT_LIST = diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/service/algorithm/AsyncTaskService.java b/hubble-be/src/main/java/com/baidu/hugegraph/service/algorithm/AsyncTaskService.java index 1305890f..81cdfa7e 100644 --- 
a/hubble-be/src/main/java/com/baidu/hugegraph/service/algorithm/AsyncTaskService.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/service/algorithm/AsyncTaskService.java @@ -20,9 +20,7 @@ package com.baidu.hugegraph.service.algorithm; import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; -import java.util.Iterator; import java.util.List; import org.springframework.beans.factory.annotation.Autowired; @@ -65,9 +63,7 @@ public IPage list(int connId, int pageNo, int pageSize, String content, } List list = client.task().list(status); List result = new ArrayList<>(); - Iterator tasks = list.iterator(); - while (tasks.hasNext()) { - Task task = tasks.next(); + for (Task task : list) { if (!type.isEmpty() && !type.equals(task.type())) { continue; } @@ -79,7 +75,7 @@ public IPage list(int connId, int pageNo, int pageSize, String content, } result.add(task); } - Collections.sort(result, Comparator.comparing(Task::createTime).reversed()); + result.sort(Comparator.comparing(Task::createTime).reversed()); return PageUtil.page(result, pageNo, pageSize); } diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/service/algorithm/OltpAlgoService.java b/hubble-be/src/main/java/com/baidu/hugegraph/service/algorithm/OltpAlgoService.java index 0751813e..bb3fc747 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/service/algorithm/OltpAlgoService.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/service/algorithm/OltpAlgoService.java @@ -1,3 +1,22 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + package com.baidu.hugegraph.service.algorithm; import java.util.ArrayList; @@ -60,8 +79,8 @@ public GremlinResult shortestPath(int connId, ShortestPath body) { ExecuteStatus status = ExecuteStatus.SUCCESS; ExecuteHistory history; history = new ExecuteHistory(null, connId, 0L, ExecuteType.ALGORITHM, - body.toString(), status, AsyncTaskStatus.UNKNOWN, - -1L, createTime); + body.toString(), status, + AsyncTaskStatus.UNKNOWN, -1L, createTime); int rows = this.historyService.save(history); if (rows != 1) { throw new InternalException("entity.insert.failed", history); @@ -95,8 +114,7 @@ private GraphView buildPathGraphView(Path result) { Map vertices = new HashMap<>(); Map edges = new HashMap<>(); - List ids = new ArrayList<>(); - List elements = ((Path) result).objects(); + List elements = result.objects(); for (Object element : elements) { if (element instanceof Vertex) { Vertex vertex = (Vertex) element; diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/service/load/FileMappingService.java b/hubble-be/src/main/java/com/baidu/hugegraph/service/load/FileMappingService.java index 98b84e8e..7ef7bd2b 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/service/load/FileMappingService.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/service/load/FileMappingService.java @@ -48,6 +48,7 @@ import com.baidu.hugegraph.exception.InternalException; import com.baidu.hugegraph.mapper.load.FileMappingMapper; import com.baidu.hugegraph.util.Ex; +import com.baidu.hugegraph.util.HubbleUtil; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.toolkit.Wrappers; @@ -112,31 +113,38 @@ public int remove(int id) { return this.mapper.deleteById(id); } - public FileUploadResult uploadFile(MultipartFile srcFile, int index, - String dirPath) { + public String generateFileToken(String fileName) { + return HubbleUtil.md5(fileName) + "-" + + HubbleUtil.nowTime().getEpochSecond(); + } + + public FileUploadResult uploadFile(MultipartFile srcFile, String token, + int index, String dirPath) { + FileUploadResult result = new FileUploadResult(); + // Current part saved path + String partName = srcFile.getOriginalFilename(); + result.setName(partName); + result.setSize(srcFile.getSize()); + + File destFile = new File(dirPath, partName + "-" + index); // File all parts saved path File dir = new File(dirPath); if (!dir.exists()) { dir.mkdirs(); } - // Current part saved path - String fileName = srcFile.getOriginalFilename(); - File destFile = new File(dirPath, fileName + "-" + index); if (destFile.exists()) { destFile.delete(); } - log.debug("Upload file {} length {}", fileName, srcFile.getSize()); - FileUploadResult result = new FileUploadResult(); - result.setName(fileName); - result.setSize(srcFile.getSize()); + log.debug("Uploading file {} length {}", partName, srcFile.getSize()); try { // transferTo should accept absolute path srcFile.transferTo(destFile.getAbsoluteFile()); result.setStatus(FileUploadResult.Status.SUCCESS); + log.info("Uploaded file part {}", partName + "-" + index); } catch (Exception e) { - log.error("Failed to save upload file and insert file mapping " + - "record", e); + log.error("Failed to save upload file and insert " + + "file mapping record", e); result.setStatus(FileUploadResult.Status.FAILURE); result.setCause(e.getMessage()); } @@ -160,7 +168,8 @@ public boolean tryMergePartFiles(String dirPath, int total) { // Rename file to dest file FileUtils.moveFile(partFiles[0], 
newFile); } catch (IOException e) { - throw new InternalException("load.upload.move-file.failed"); + log.error(e); + throw new InternalException("load.upload.move-file.failed", e); } } else { Arrays.sort(partFiles, (o1, o2) -> { @@ -178,19 +187,22 @@ public boolean tryMergePartFiles(String dirPath, int total) { try (InputStream is = new FileInputStream(partFile)) { IOUtils.copy(is, os); } catch (IOException e) { + log.error(e); throw new InternalException( - "load.upload.merge-file.failed"); + "load.upload.merge-file.failed", e); } } } catch (IOException e) { - throw new InternalException("load.upload.merge-file.failed"); + log.error(e); + throw new InternalException("load.upload.merge-file.failed", e); } } // Delete origin directory try { FileUtils.forceDelete(dir); } catch (IOException e) { - throw new InternalException("load.upload.delete-temp-dir.failed"); + log.error(e); + throw new InternalException("load.upload.delete-temp-dir.failed", e); } // Rename file to dest file if (!newFile.renameTo(destFile)) { @@ -268,15 +280,27 @@ public String moveToNextLevelDir(FileMapping mapping) { public void deleteDiskFile(FileMapping mapping) { File file = new File(mapping.getPath()); - File parentDir = file.getParentFile(); - log.info("Prepare to delete directory {}", parentDir); - try { - FileUtils.forceDelete(parentDir); - } catch (IOException e) { - throw new InternalException("Failed to delete directory " + - "corresponded to the file id %s, " + - "please delete it manually", - e, mapping.getId()); + if (file.isDirectory()) { + log.info("Prepare to delete directory {}", file); + try { + FileUtils.forceDelete(file); + } catch (IOException e) { + throw new InternalException("Failed to delete directory " + + "corresponded to the file id %s, " + + "please delete it manually", + e, mapping.getId()); + } + } else { + File parentDir = file.getParentFile(); + log.info("Prepare to delete directory {}", parentDir); + try { + FileUtils.forceDelete(parentDir); + } catch (IOException e) { + throw new InternalException("Failed to delete parent directory " + + "corresponded to the file id %s, " + + "please delete it manually", + e, mapping.getId()); + } } } } diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/service/load/JobManagerService.java b/hubble-be/src/main/java/com/baidu/hugegraph/service/load/JobManagerService.java index 9b07a74f..fd762cbc 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/service/load/JobManagerService.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/service/load/JobManagerService.java @@ -20,7 +20,6 @@ package com.baidu.hugegraph.service.load; import java.util.Date; -import java.util.Iterator; import java.util.List; import org.springframework.beans.factory.annotation.Autowired; @@ -59,9 +58,9 @@ public JobManager get(int id) { return this.mapper.selectById(id); } - public JobManager getTask(String job_name, int connId) { + public JobManager getTask(String jobName, int connId) { QueryWrapper query = Wrappers.query(); - query.eq("job_name", job_name); + query.eq("job_name", jobName); query.eq("conn_id", connId); return this.mapper.selectOne(query); } @@ -83,9 +82,7 @@ public IPage list(int connId, int pageNo, int pageSize, String conte if (p.getJobStatus() == JobManagerStatus.IMPORTING) { List tasks = this.taskService.taskListByJob(p.getId()); JobManagerStatus status = JobManagerStatus.SUCCESS; - Iterator loadTasks = tasks.iterator(); - while (loadTasks.hasNext()) { - LoadTask loadTask = loadTasks.next(); + for (LoadTask loadTask : tasks) { if 
(loadTask.getStatus().inRunning() || loadTask.getStatus() == LoadStatus.PAUSED || loadTask.getStatus() == LoadStatus.STOPPED) { @@ -107,8 +104,8 @@ public IPage list(int connId, int pageNo, int pageSize, String conte } } } - Date endDate = (p.getJobStatus() == JobManagerStatus.FAILED || - p.getJobStatus() == JobManagerStatus.SUCCESS) ? + Date endDate = p.getJobStatus() == JobManagerStatus.FAILED || + p.getJobStatus() == JobManagerStatus.SUCCESS ? p.getUpdateTime() : HubbleUtil.nowDate(); p.setJobDuration(endDate.getTime() - p.getCreateTime().getTime()); }); diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/service/load/LoadTaskService.java b/hubble-be/src/main/java/com/baidu/hugegraph/service/load/LoadTaskService.java index 6514be60..eaf03348 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/service/load/LoadTaskService.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/service/load/LoadTaskService.java @@ -42,12 +42,10 @@ import com.baidu.hugegraph.common.Constant; import com.baidu.hugegraph.config.HugeConfig; import com.baidu.hugegraph.entity.GraphConnection; -import com.baidu.hugegraph.entity.enums.JobManagerStatus; import com.baidu.hugegraph.entity.enums.LoadStatus; import com.baidu.hugegraph.entity.load.EdgeMapping; import com.baidu.hugegraph.entity.load.FileMapping; import com.baidu.hugegraph.entity.load.FileSetting; -import com.baidu.hugegraph.entity.load.JobManager; import com.baidu.hugegraph.entity.load.ListFormat; import com.baidu.hugegraph.entity.load.LoadParameter; import com.baidu.hugegraph.entity.load.LoadTask; @@ -68,7 +66,6 @@ import com.baidu.hugegraph.service.schema.EdgeLabelService; import com.baidu.hugegraph.service.schema.VertexLabelService; import com.baidu.hugegraph.util.Ex; -import com.baidu.hugegraph.util.HubbleUtil; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.toolkit.Wrappers; @@ -101,10 +98,6 @@ public LoadTaskService() { this.taskContainer = new ConcurrentHashMap<>(); } - public Map getTaskContainer() { - return this.taskContainer; - } - public LoadTask get(int id) { return this.mapper.selectById(id); } @@ -158,23 +151,20 @@ public int remove(int id) { return this.mapper.deleteById(id); } - public List batchTasks(int job_id) { + public List batchTasks(int jobId) { QueryWrapper query = Wrappers.query(); - query.eq("job_id", job_id); + query.eq("job_id", jobId); return this.mapper.selectList(query); } public LoadTask start(GraphConnection connection, FileMapping fileMapping) { - connection = sslService.configSSL(this.config, connection); + connection = this.sslService.configSSL(this.config, connection); LoadTask task = this.buildLoadTask(connection, fileMapping); if (this.save(task) != 1) { throw new InternalException("entity.insert.failed", task); } - this.taskExecutor.execute(task); - - if (this.update(task) != 1) { - throw new InternalException("entity.update.failed", task); - } + // executed in other threads + this.taskExecutor.execute(task, () -> this.update(task)); // Save current load task this.taskContainer.put(task.getId(), task); return task; @@ -184,18 +174,19 @@ public LoadTask pause(int taskId) { LoadTask task = this.taskContainer.get(taskId); Ex.check(task.getStatus() == LoadStatus.RUNNING, "Can only pause the RUNNING task"); - LoadContext context = task.context(); - // Mark status as paused, should set before context.stopLoading() - task.setStatus(LoadStatus.PAUSED); - // Let HugeGraphLoader stop - 
context.stopLoading();
-
-        task.setFileReadLines(context.newProgress().totalInputReaded());
-        task.setDuration(context.summary().totalTime());
-        if (update(task) != 1) {
-            throw new InternalException("entity.update.failed", task);
+        task.lock.lock();
+        try {
+            // Mark status as paused, should set before context.stopLoading()
+            task.setStatus(LoadStatus.PAUSED);
+            // Let HugeGraphLoader stop
+            task.stop();
+            if (update(task) != 1) {
+                throw new InternalException("entity.update.failed", task);
+            }
+            this.taskContainer.remove(taskId);
+        } finally {
+            task.lock.unlock();
         }
-        this.taskContainer.remove(taskId);
         return task;
     }
@@ -204,16 +195,20 @@ public LoadTask resume(int taskId) {
         Ex.check(task.getStatus() == LoadStatus.PAUSED ||
                  task.getStatus() == LoadStatus.FAILED,
                  "Can only resume the PAUSED or FAILED task");
-        task.restoreContext();
-        task.setStatus(LoadStatus.RUNNING);
         // Set work mode to incremental mode, load from the last breakpoint
-        task.context().options().incrementalMode = true;
-        this.taskExecutor.execute(task);
-
-        if (this.update(task) != 1) {
-            throw new InternalException("entity.update.failed", task);
+        task.getOptions().incrementalMode = true;
+        task.restoreContext();
+        task.lock.lock();
+        try {
+            task.setStatus(LoadStatus.RUNNING);
+            if (this.update(task) != 1) {
+                throw new InternalException("entity.update.failed", task);
+            }
+            this.taskExecutor.execute(task, () -> this.update(task));
+            this.taskContainer.put(taskId, task);
+        } finally {
+            task.lock.unlock();
         }
-        this.taskContainer.put(taskId, task);
         return task;
     }
@@ -226,17 +221,18 @@ public LoadTask stop(int taskId) {
         Ex.check(task.getStatus() == LoadStatus.RUNNING ||
                  task.getStatus() == LoadStatus.PAUSED,
                  "Can only stop the RUNNING or PAUSED task");
-        LoadContext context = task.context();
-        // Mark status as stopped
-        task.setStatus(LoadStatus.STOPPED);
-        context.stopLoading();
-
-        task.setFileReadLines(context.newProgress().totalInputReaded());
-        task.setDuration(context.summary().totalTime());
-        if (update(task) != 1) {
-            throw new InternalException("entity.update.failed", task);
+        task.lock.lock();
+        try {
+            // Mark status as stopped
+            task.setStatus(LoadStatus.STOPPED);
+            task.stop();
+            if (update(task) != 1) {
+                throw new InternalException("entity.update.failed", task);
+            }
+            this.taskContainer.remove(taskId);
+        } finally {
+            task.lock.unlock();
        }
-        this.taskContainer.remove(taskId);
         return task;
     }
@@ -245,17 +241,22 @@ public LoadTask retry(int taskId) {
         Ex.check(task.getStatus() == LoadStatus.FAILED ||
                  task.getStatus() == LoadStatus.STOPPED,
                  "Can only retry the FAILED or STOPPED task");
-        task.restoreContext();
-
-        task.setStatus(LoadStatus.RUNNING);
         // Set work mode to normal mode, load from the beginning
-        task.context().options().incrementalMode = false;
-        this.taskExecutor.execute(task);
-
-        if (this.update(task) != 1) {
-            throw new InternalException("entity.update.failed", task);
+        task.getOptions().incrementalMode = false;
+        task.restoreContext();
+        task.lock.lock();
+        try {
+            task.setStatus(LoadStatus.RUNNING);
+            task.setLastDuration(0L);
+            task.setCurrDuration(0L);
+            if (this.update(task) != 1) {
+                throw new InternalException("entity.update.failed", task);
+            }
+            this.taskExecutor.execute(task, () -> this.update(task));
+            this.taskContainer.put(taskId, task);
+        } finally {
+            task.lock.unlock();
         }
-        this.taskContainer.put(taskId, task);
         return task;
     }
@@ -294,19 +295,25 @@ public void pauseAllTasks() {
      * Update progress periodically
      */
     @Async
-    @Scheduled(fixedDelay = 5 * 1000)
+    @Scheduled(fixedDelay = 1 * 1000)
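+    // Progress is now flushed every second instead of every five; each task
+    // is updated under its own lock and only while it is still RUNNING, so
+    // this periodic write cannot race with pause()/stop()/retry() or with
+    // the final state written by run().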
@Transactional(isolation = Isolation.READ_COMMITTED) public void updateLoadTaskProgress() { - Map taskContainer = this.getTaskContainer(); - Iterator> iter = taskContainer.entrySet() - .iterator(); + Iterator> iter; + iter = this.taskContainer.entrySet().iterator(); iter.forEachRemaining(entry -> { LoadTask task = entry.getValue(); - LoadContext context = task.context(); - task.setFileReadLines(context.newProgress().totalInputReaded()); - task.setDuration(context.summary().totalTime()); - if (this.update(task) != 1) { - throw new InternalException("entity.update.failed", task); + task.lock.lock(); + try { + if (task.getStatus().inRunning()) { + LoadContext context = task.context(); + task.setFileReadLines(context.newProgress().totalInputReaded()); + task.setCurrDuration(context.summary().totalTime()); + if (this.update(task) != 1) { + throw new InternalException("entity.update.failed", task); + } + } + } finally { + task.lock.unlock(); } }); } diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/service/query/ExecuteHistoryService.java b/hubble-be/src/main/java/com/baidu/hugegraph/service/query/ExecuteHistoryService.java index 5a371477..ad9e6e4f 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/service/query/ExecuteHistoryService.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/service/query/ExecuteHistoryService.java @@ -79,9 +79,11 @@ public IPage list(int connId, long current, long pageSize) { if (p.getType().equals(ExecuteType.GREMLIN_ASYNC)) { try { Task task = client.task().get(p.getAsyncId()); - long endDate = task.updateTime() > 0 ? task.updateTime() : now.getLong(ChronoField.INSTANT_SECONDS); + long endDate = task.updateTime() > 0 ? task.updateTime() : + now.getLong(ChronoField.INSTANT_SECONDS); p.setDuration(endDate - task.createTime()); - p.setAsyncStatus(AsyncTaskStatus.valueOf(task.status().toUpperCase())); + p.setAsyncStatus(AsyncTaskStatus.valueOf( + task.status().toUpperCase())); } catch (Exception e) { p.setDuration(0L); p.setAsyncStatus(AsyncTaskStatus.UNKNOWN); @@ -99,7 +101,8 @@ public ExecuteHistory get(int connId, int id) { try { Task task = client.task().get(history.getAsyncId()); history.setDuration(task.updateTime() - task.createTime()); - history.setAsyncStatus(AsyncTaskStatus.valueOf(task.status().toUpperCase())); + history.setAsyncStatus(AsyncTaskStatus.valueOf( + task.status().toUpperCase())); } catch (Exception e) { history.setDuration(0L); history.setAsyncStatus(AsyncTaskStatus.UNKNOWN); diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/service/schema/EdgeLabelService.java b/hubble-be/src/main/java/com/baidu/hugegraph/service/schema/EdgeLabelService.java index 47d280b2..ce8d97e6 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/service/schema/EdgeLabelService.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/service/schema/EdgeLabelService.java @@ -139,16 +139,14 @@ public void checkNotExist(String name, int connId) { public void add(EdgeLabelEntity entity, int connId) { HugeClient client = this.client(connId); EdgeLabel edgeLabel = convert(entity, client); - client.schema().addEdgeLabel(edgeLabel); - - List indexLabels = collectIndexLabels(entity, client); try { - this.piService.addBatch(indexLabels, client); + client.schema().addEdgeLabel(edgeLabel); } catch (Exception e) { - client.schema().removeEdgeLabel(edgeLabel.name()); throw new ExternalException("schema.edgelabel.create.failed", e, entity.getName()); } + List indexLabels = collectIndexLabels(entity, client); + this.piService.addBatch(indexLabels, client); } public 
@@ -185,27 +183,20 @@ public void update(EdgeLabelUpdateEntity entity, int connId) {
             }
         }
 
-        // NOTE: property can append but doesn't support eliminate now
-        client.schema().appendEdgeLabel(edgeLabel);
-
-        try {
-            this.piService.addBatch(addedIndexLabels, client);
-        } catch (Exception e) {
-            throw new ExternalException("schema.edgelabel.update.failed", e,
-                                        entity.getName());
-        }
-
         try {
-            this.piService.removeBatch(removedIndexLabelNames, client);
+            // NOTE: property can append but doesn't support eliminate now
+            client.schema().appendEdgeLabel(edgeLabel);
         } catch (Exception e) {
             throw new ExternalException("schema.edgelabel.update.failed", e,
                                         entity.getName());
         }
+        this.piService.addBatch(addedIndexLabels, client);
+        this.piService.removeBatch(removedIndexLabelNames, client);
     }
 
     public void remove(String name, int connId) {
         HugeClient client = this.client(connId);
-        client.schema().removeEdgeLabel(name);
+        client.schema().removeEdgeLabelAsync(name);
     }
 
     public ConflictDetail checkConflict(ConflictCheckEntity entity,
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/service/schema/PropertyIndexService.java b/hubble-be/src/main/java/com/baidu/hugegraph/service/schema/PropertyIndexService.java
index 902d66a2..684ba114 100644
--- a/hubble-be/src/main/java/com/baidu/hugegraph/service/schema/PropertyIndexService.java
+++ b/hubble-be/src/main/java/com/baidu/hugegraph/service/schema/PropertyIndexService.java
@@ -203,18 +203,19 @@ private PropertyIndex get(String name, int connId) {
         }
     }
 
-    public void addBatch(List<IndexLabel> indexLabels, HugeClient client) {
+    public List<Long> addBatch(List<IndexLabel> indexLabels, HugeClient client) {
         BiFunction<HugeClient, IndexLabel, Long> func = (hugeClient, il) -> {
             return hugeClient.schema().addIndexLabelAsync(il);
         };
-        addBatch(indexLabels, client, func, SchemaType.PROPERTY_INDEX);
+        return addBatch(indexLabels, client, func,
+                        SchemaType.PROPERTY_INDEX);
     }
 
-    public void removeBatch(List<String> indexLabels, HugeClient client) {
+    public List<Long> removeBatch(List<String> indexLabels, HugeClient client) {
         BiFunction<HugeClient, String, Long> func = (hugeClient, name) -> {
             return hugeClient.schema().removeIndexLabelAsync(name);
         };
-        removeBatch(indexLabels, client, func, SchemaType.PROPERTY_INDEX);
+        return removeBatch(indexLabels, client, func, SchemaType.PROPERTY_INDEX);
     }
 
     public void checkConflict(List<PropertyIndex> entities,
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/service/schema/VertexLabelService.java b/hubble-be/src/main/java/com/baidu/hugegraph/service/schema/VertexLabelService.java
index 61c67f1e..a675c7ee 100644
--- a/hubble-be/src/main/java/com/baidu/hugegraph/service/schema/VertexLabelService.java
+++ b/hubble-be/src/main/java/com/baidu/hugegraph/service/schema/VertexLabelService.java
@@ -149,15 +149,14 @@ public List<String> getLinkEdgeLabels(String name, int connId) {
     public void add(VertexLabelEntity entity, int connId) {
         HugeClient client = this.client(connId);
         VertexLabel vertexLabel = convert(entity, client);
-        client.schema().addVertexLabel(vertexLabel);
-
-        List<IndexLabel> indexLabels = collectIndexLabels(entity, client);
         try {
-            this.piService.addBatch(indexLabels, client);
+            client.schema().addVertexLabel(vertexLabel);
         } catch (Exception e) {
-            throw new ExternalException("schema.vertexlabel.create.failed", e,
-                                        entity.getName());
+            throw new ExternalException("schema.vertexlabel.create.failed",
+                                        e, entity.getName());
         }
+        List<IndexLabel> indexLabels = collectIndexLabels(entity, client);
+        this.piService.addBatch(indexLabels, client);
     }
 
     public void update(VertexLabelUpdateEntity entity, int connId) {
@@ -194,28 +193,20 @@ public void update(VertexLabelUpdateEntity entity, int connId) {
             }
         }
 
-        // NOTE: property can append but doesn't support eliminate now
-        client.schema().appendVertexLabel(vertexLabel);
-
-        try {
-            this.piService.addBatch(addedIndexLabels, client);
-        } catch (Exception e) {
-            // client.schema().eliminateVertexLabel(vertexLabel);
-            throw new ExternalException("schema.vertexlabel.update.failed", e,
-                                        entity.getName());
-        }
-
         try {
-            this.piService.removeBatch(removedIndexLabelNames, client);
+            // NOTE: property can append but doesn't support eliminate now
+            client.schema().appendVertexLabel(vertexLabel);
         } catch (Exception e) {
             throw new ExternalException("schema.vertexlabel.update.failed", e,
                                         entity.getName());
         }
+        this.piService.addBatch(addedIndexLabels, client);
+        this.piService.removeBatch(removedIndexLabelNames, client);
     }
 
     public void remove(String name, int connId) {
         HugeClient client = this.client(connId);
-        client.schema().removeVertexLabel(name);
+        client.schema().removeVertexLabelAsync(name);
     }
 
     public boolean checkUsing(String name, int connId) {
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/util/HubbleUtil.java b/hubble-be/src/main/java/com/baidu/hugegraph/util/HubbleUtil.java
index e5e52a87..2db09725 100644
--- a/hubble-be/src/main/java/com/baidu/hugegraph/util/HubbleUtil.java
+++ b/hubble-be/src/main/java/com/baidu/hugegraph/util/HubbleUtil.java
@@ -25,6 +25,7 @@
 import java.util.UUID;
 import java.util.regex.Pattern;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.collections.CollectionUtils;
 
 public final class HubbleUtil {
@@ -52,4 +53,8 @@ public static boolean equalCollection(Collection<?> c1, Collection<?> c2) {
     public static String generateSimpleId() {
         return UUID.randomUUID().toString();
     }
+
+    public static String md5(String rawText) {
+        return DigestUtils.md5Hex(rawText);
+    }
 }
diff --git a/hubble-be/src/main/resources/database/schema.sql b/hubble-be/src/main/resources/database/schema.sql
index 89af81bd..a9f50c41 100644
--- a/hubble-be/src/main/resources/database/schema.sql
+++ b/hubble-be/src/main/resources/database/schema.sql
@@ -80,9 +80,10 @@ CREATE TABLE IF NOT EXISTS `load_task` (
     `vertices` VARCHAR(512) NOT NULL,
     `edges` VARCHAR(512) NOT NULL,
     `file_total_lines` LONG NOT NULL,
-    `file_read_lines` LONG NOT NULL,
     `load_status` TINYINT NOT NULL,
-    `duration` LONG NOT NULL,
+    `file_read_lines` LONG NOT NULL,
+    `last_duration` LONG NOT NULL,
+    `curr_duration` LONG NOT NULL,
     `create_time` DATETIME(6) NOT NULL,
     PRIMARY KEY (`id`)
 );
diff --git a/hubble-be/src/main/resources/i18n/messages.properties b/hubble-be/src/main/resources/i18n/messages.properties
index 4c714be6..3bb435f7 100644
--- a/hubble-be/src/main/resources/i18n/messages.properties
+++ b/hubble-be/src/main/resources/i18n/messages.properties
@@ -110,6 +110,7 @@ schema.display-fields.cannot-be-nullable=The display fields of schema can't be nullable
 load.upload.files.at-least-one=Please select at least one file
 load.upload.file.cannot-be-empty=The upload file can't be empty
 load.upload.file.format.unsupported=The upload file format is unsupported
+load.upload.file.name-token.unmatch=The upload file name doesn't match the token
 load.upload.files.cannot-dup=The upload files can't contain duplicates
 load.upload.file.exceed-single-size=The upload file has exceeded single limit size {0}
 load.upload.file.exceed-total-size=The upload file has exceeded total limit size {0}
diff --git a/hubble-be/src/main/resources/i18n/messages_zh_CN.properties b/hubble-be/src/main/resources/i18n/messages_zh_CN.properties
index 3dc6bc18..5545e4b5 100644
--- a/hubble-be/src/main/resources/i18n/messages_zh_CN.properties
+++ b/hubble-be/src/main/resources/i18n/messages_zh_CN.properties
@@ -110,6 +110,7 @@ schema.display-fields.cannot-be-nullable=展示内容不能是可空属性
 load.upload.files.at-least-one=至少选择一个文件
 load.upload.file.cannot-be-empty=不能上传空文件
 load.upload.file.format.unsupported=上传文件的格式不支持
+load.upload.file.name-token.unmatch=上传文件的名称与 token 不匹配
 load.upload.files.cannot-dup=上传的文件不能包含重复的
 load.upload.file.exceed-single-size=上传文件大小超过了限制 {0}
 load.upload.file.exceed-total-size=上传文件总大小超过了限制 {0}
diff --git a/hubble-dist/pom.xml b/hubble-dist/pom.xml
index c57a7a03..66bc6e2f 100644
--- a/hubble-dist/pom.xml
+++ b/hubble-dist/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <artifactId>hugegraph-hubble</artifactId>
         <groupId>com.baidu.hugegraph</groupId>
-        <version>1.3.0</version>
+        <version>1.5.0</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>