diff --git a/hubble-be/pom.xml b/hubble-be/pom.xml
index ae5de8e6..957a9e10 100644
--- a/hubble-be/pom.xml
+++ b/hubble-be/pom.xml
@@ -32,10 +32,6 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-cache</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-aop</artifactId>
-        </dependency>
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-test</artifactId>
@@ -98,38 +94,11 @@
             </exclusions>
         </dependency>
-        <dependency>
-            <groupId>com.baidu.hugegraph</groupId>
-            <artifactId>hugegraph-client</artifactId>
-            <version>1.8.10</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-slf4j-impl</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.baidu.hugegraph</groupId>
-                    <artifactId>hugegraph-common</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
         <dependency>
             <groupId>com.baidu.hugegraph</groupId>
             <artifactId>hugegraph-loader</artifactId>
-            <version>0.10.4</version>
+            <version>0.10.5</version>
             <exclusions>
-                <exclusion>
-                    <groupId>com.baidu.hugegraph</groupId>
-                    <artifactId>hugegraph-client</artifactId>
-                </exclusion>
                 <exclusion>
                     <groupId>org.apache.logging.log4j</groupId>
                     <artifactId>log4j-api</artifactId>
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/config/AsyncConfig.java b/hubble-be/src/main/java/com/baidu/hugegraph/config/AsyncConfig.java
index 64355b27..28136b3d 100644
--- a/hubble-be/src/main/java/com/baidu/hugegraph/config/AsyncConfig.java
+++ b/hubble-be/src/main/java/com/baidu/hugegraph/config/AsyncConfig.java
@@ -33,9 +33,9 @@ public class AsyncConfig implements AsyncConfigurer {
     @Override
     public Executor getAsyncExecutor() {
         ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
-        taskExecutor.setCorePoolSize(4);
-        taskExecutor.setMaxPoolSize(8);
-        taskExecutor.setQueueCapacity(16);
+        taskExecutor.setCorePoolSize(8);
+        taskExecutor.setMaxPoolSize(16);
+        taskExecutor.setQueueCapacity(1024);
         taskExecutor.initialize();
         return taskExecutor;
     }
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/controller/GraphConnectionController.java b/hubble-be/src/main/java/com/baidu/hugegraph/controller/GraphConnectionController.java
index aec9cd37..75413cd0 100644
--- a/hubble-be/src/main/java/com/baidu/hugegraph/controller/GraphConnectionController.java
+++ b/hubble-be/src/main/java/com/baidu/hugegraph/controller/GraphConnectionController.java
@@ -42,7 +42,6 @@
 import com.baidu.hugegraph.driver.HugeClient;
 import com.baidu.hugegraph.entity.GraphConnection;
 import com.baidu.hugegraph.exception.ExternalException;
-import com.baidu.hugegraph.exception.InternalException;
 import com.baidu.hugegraph.options.HubbleOptions;
 import com.baidu.hugegraph.service.GraphConnectionService;
 import com.baidu.hugegraph.service.HugeClientPoolService;
@@ -149,9 +148,7 @@ public GraphConnection create(@RequestBody GraphConnection newEntity) {
         Ex.check(verifyResult.isEnabled(), Constant.STATUS_UNAUTHORIZED,
                  verifyResult.getMessage());
-        if (this.connService.save(newEntity) != 1) {
-            throw new InternalException("entity.insert.failed", newEntity);
-        }
+        this.connService.save(newEntity);
         this.poolService.put(newEntity, client);
         return newEntity;
     }
@@ -180,9 +177,7 @@ public GraphConnection update(@PathVariable("id") int id,
         Ex.check(verifyResult.isEnabled(), Constant.STATUS_UNAUTHORIZED,
                  verifyResult.getMessage());
-        if (this.connService.update(entity) != 1) {
-            throw new InternalException("entity.update.failed", entity);
-        }
+        this.connService.update(entity);
         this.poolService.put(entity, client);
         return entity;
     }
@@ -193,10 +188,7 @@ public GraphConnection delete(@PathVariable("id") int id) {
         if (oldEntity == null) {
             throw new ExternalException("graph-connection.not-exist.id", id);
         }
-        int rows = this.connService.remove(id);
-        if (rows != 1) {
-            throw new InternalException("entity.delete.failed", oldEntity);
-        }
+        this.connService.remove(id);
         this.poolService.remove(oldEntity);
         this.licenseService.updateAllGraphStatus();
         return oldEntity;
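Note on the controller hunks above and below: this PR moves the repeated `rows != 1` checks out of every controller and into the service layer, which now exposes `void` save/update/remove methods that throw on failure (see `GraphConnectionService` at the end of this patch). A minimal sketch of the resulting split, using hypothetical `Entity`/`EntityMapper` stand-ins rather than the real MyBatis-Plus types:

```java
// Sketch only: Entity, EntityMapper and the exception are stand-ins for
// the project's MyBatis-Plus entities, mappers and InternalException.
class Entity {}

interface EntityMapper {
    int insert(Entity entity); // returns the number of affected rows
}

class EntityService {

    private final EntityMapper mapper;

    EntityService(EntityMapper mapper) {
        this.mapper = mapper;
    }

    // The affected-rows check lives here now, so controllers can simply
    // call save(entity) without the old rows != 1 boilerplate
    void save(Entity entity) {
        if (this.mapper.insert(entity) != 1) {
            throw new RuntimeException("entity.insert.failed");
        }
    }
}
```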
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/controller/SettingController.java b/hubble-be/src/main/java/com/baidu/hugegraph/controller/SettingController.java
index a3563a68..2de06784 100644
--- a/hubble-be/src/main/java/com/baidu/hugegraph/controller/SettingController.java
+++ b/hubble-be/src/main/java/com/baidu/hugegraph/controller/SettingController.java
@@ -33,7 +33,6 @@
 import com.baidu.hugegraph.common.Constant;
 import com.baidu.hugegraph.entity.UserInfo;
-import com.baidu.hugegraph.exception.InternalException;
 import com.baidu.hugegraph.service.UserInfoService;
 import com.baidu.hugegraph.util.E;
@@ -62,16 +61,10 @@ public UserInfo config(@RequestParam(value = "locale",
                                    .username(username)
                                    .locale(locale)
                                    .build();
-            int rows = this.service.save(userInfo);
-            if (rows == 0) {
-                throw new InternalException("entity.insert.failed");
-            }
+            this.service.save(userInfo);
         } else {
             userInfo.setLocale(locale);
-            int rows = this.service.update(userInfo);
-            if (rows == 0) {
-                throw new InternalException("entity.update.failed");
-            }
+            this.service.update(userInfo);
         }
         Cookie cookie = new Cookie(Constant.COOKIE_USER, userInfo.getUsername());
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/FileMappingController.java b/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/FileMappingController.java
index 5b27215e..bcfed8b6 100644
--- a/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/FileMappingController.java
+++ b/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/FileMappingController.java
@@ -37,17 +37,19 @@
 import com.baidu.hugegraph.common.Constant;
 import com.baidu.hugegraph.controller.BaseController;
+import com.baidu.hugegraph.entity.enums.JobStatus;
 import com.baidu.hugegraph.entity.load.EdgeMapping;
 import com.baidu.hugegraph.entity.load.ElementMapping;
 import com.baidu.hugegraph.entity.load.FileMapping;
 import com.baidu.hugegraph.entity.load.FileSetting;
+import com.baidu.hugegraph.entity.load.JobManager;
 import com.baidu.hugegraph.entity.load.LoadParameter;
 import com.baidu.hugegraph.entity.load.VertexMapping;
 import com.baidu.hugegraph.entity.schema.EdgeLabelEntity;
 import com.baidu.hugegraph.entity.schema.VertexLabelEntity;
 import com.baidu.hugegraph.exception.ExternalException;
-import com.baidu.hugegraph.exception.InternalException;
 import com.baidu.hugegraph.service.load.FileMappingService;
+import com.baidu.hugegraph.service.load.JobManagerService;
 import com.baidu.hugegraph.service.schema.EdgeLabelService;
 import com.baidu.hugegraph.service.schema.VertexLabelService;
 import com.baidu.hugegraph.util.Ex;
@@ -67,6 +69,8 @@ public class FileMappingController extends BaseController {
     private EdgeLabelService elService;
     @Autowired
     private FileMappingService service;
+    @Autowired
+    private JobManagerService jobService;

     @GetMapping
     public IPage<FileMapping> list(@PathVariable("connId") int connId,
@@ -99,9 +103,7 @@ public void delete(@PathVariable("id") int id) {
         }
         this.service.deleteDiskFile(mapping);
-        if (this.service.remove(id) != 1) {
-            throw new InternalException("entity.delete.failed", mapping);
-        }
+        this.service.remove(id);
     }

     @DeleteMapping
@@ -130,18 +132,14 @@ public FileMapping fileSetting(@PathVariable("id") int id,
         if (mapping == null) {
             throw new ExternalException("load.file-mapping.not-exist.id", id);
         }
-        // unescape \\t to \t
-        newEntity.unescapeDelimiterIfNeeded();
+        // Change format to TEXT if needed
+        newEntity.changeFormatIfNeeded();
         FileSetting oldEntity = mapping.getFileSetting();
         FileSetting entity = this.mergeEntity(oldEntity,
                                               newEntity);
         mapping.setFileSetting(entity);
         // Read column names and values then fill it
         this.service.extractColumns(mapping);
-        if (this.service.update(mapping) != 1) {
-            throw new InternalException("entity.update.failed", mapping);
-        }
-        // escape \t to \\t
-        mapping.getFileSetting().escapeDelimiterIfNeeded();
+        this.service.update(mapping);
         return mapping;
     }
@@ -157,9 +155,7 @@ public FileMapping addVertexMapping(@PathVariable("connId") int connId,
         newEntity.setId(HubbleUtil.generateSimpleId());
         mapping.getVertexMappings().add(newEntity);
-        if (this.service.update(mapping) != 1) {
-            throw new InternalException("entity.update.failed", mapping);
-        }
+        this.service.update(mapping);
         return mapping;
     }
@@ -182,9 +178,7 @@ public FileMapping updateVertexMapping(@PathVariable("connId") int connId,
         Set<VertexMapping> vertexMappings = mapping.getVertexMappings();
         vertexMappings.remove(vertexMapping);
         vertexMappings.add(newEntity);
-        if (this.service.update(mapping) != 1) {
-            throw new InternalException("entity.update.failed", mapping);
-        }
+        this.service.update(mapping);
         return mapping;
     }
@@ -202,9 +196,7 @@ public FileMapping deleteVertexMapping(@PathVariable("id") int id,
             throw new ExternalException(
                       "load.file-mapping.vertex-mapping.not-exist.id", vmid);
         }
-        if (this.service.update(mapping) != 1) {
-            throw new InternalException("entity.update.failed", mapping);
-        }
+        this.service.update(mapping);
         return mapping;
     }
@@ -220,9 +212,7 @@ public FileMapping addEdgeMapping(@PathVariable("connId") int connId,
         newEntity.setId(HubbleUtil.generateSimpleId());
         mapping.getEdgeMappings().add(newEntity);
-        if (this.service.update(mapping) != 1) {
-            throw new InternalException("entity.update.failed", mapping);
-        }
+        this.service.update(mapping);
         return mapping;
     }
@@ -245,9 +235,7 @@ public FileMapping updateEdgeMapping(@PathVariable("connId") int connId,
         Set<EdgeMapping> edgeMappings = mapping.getEdgeMappings();
         edgeMappings.remove(edgeMapping);
         edgeMappings.add(newEntity);
-        if (this.service.update(mapping) != 1) {
-            throw new InternalException("entity.update.failed", mapping);
-        }
+        this.service.update(mapping);
         return mapping;
     }
@@ -265,9 +253,7 @@ public FileMapping deleteEdgeMapping(@PathVariable("id") int id,
             throw new ExternalException(
                       "load.file-mapping.edge-mapping.not-exist.id", emid);
         }
-        if (this.service.update(mapping) != 1) {
-            throw new InternalException("entity.update.failed", mapping);
-        }
+        this.service.update(mapping);
         return mapping;
     }
@@ -282,12 +268,22 @@ public void loadParameter(@RequestBody LoadParameter newEntity) {
             LoadParameter oldEntity = mapping.getLoadParameter();
             LoadParameter entity = this.mergeEntity(oldEntity, newEntity);
             mapping.setLoadParameter(entity);
-            if (this.service.update(mapping) != 1) {
-                throw new InternalException("entity.update.failed", mapping);
-            }
+            this.service.update(mapping);
         }
     }

+    @PutMapping("next-step")
+    public JobManager nextStep(@PathVariable("jobId") int jobId) {
+        JobManager jobEntity = this.jobService.get(jobId);
+        Ex.check(jobEntity != null, "job-manager.not-exist.id", jobId);
+        Ex.check(jobEntity.getJobStatus() == JobStatus.MAPPING,
+                 "job.manager.status.unexpected",
+                 JobStatus.MAPPING, jobEntity.getJobStatus());
+        jobEntity.setJobStatus(JobStatus.SETTING);
+        this.jobService.update(jobEntity);
+        return jobEntity;
+    }
+
     private void checkVertexMappingValid(int connId, VertexMapping vertexMapping,
                                          FileMapping fileMapping) {
         VertexLabelEntity vl = this.vlService.get(vertexMapping.getLabel(),
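The new `next-step` endpoint above only permits the MAPPING → SETTING transition; `FileUploadController` below adds the matching UPLOADING → MAPPING step. A hedged sketch of the shared shape these endpoints inline (the `advance` helper itself is hypothetical, not part of this PR):

```java
// Hypothetical helper showing the guarded transition that each
// next-step endpoint implements inline against JobStatus
private JobManager advance(int jobId, JobStatus expected, JobStatus next) {
    JobManager jobEntity = this.jobService.get(jobId);
    Ex.check(jobEntity != null, "job-manager.not-exist.id", jobId);
    // Reject the request unless the job sits in the expected stage
    Ex.check(jobEntity.getJobStatus() == expected,
             "job.manager.status.unexpected",
             expected, jobEntity.getJobStatus());
    jobEntity.setJobStatus(next);
    this.jobService.update(jobEntity);
    return jobEntity;
}
```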
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/FileUploadController.java b/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/FileUploadController.java
index b1bea2c8..97dfb8dd 100644
--- a/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/FileUploadController.java
+++ b/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/FileUploadController.java
@@ -25,16 +25,21 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
-import java.util.LinkedHashMap;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;

 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
@@ -43,7 +48,7 @@
 import com.baidu.hugegraph.common.Constant;
 import com.baidu.hugegraph.config.HugeConfig;
 import com.baidu.hugegraph.entity.enums.FileMappingStatus;
-import com.baidu.hugegraph.entity.enums.JobManagerStatus;
+import com.baidu.hugegraph.entity.enums.JobStatus;
 import com.baidu.hugegraph.entity.load.FileMapping;
 import com.baidu.hugegraph.entity.load.FileUploadResult;
 import com.baidu.hugegraph.entity.load.JobManager;
@@ -51,6 +56,7 @@
 import com.baidu.hugegraph.options.HubbleOptions;
 import com.baidu.hugegraph.service.load.FileMappingService;
 import com.baidu.hugegraph.service.load.JobManagerService;
+import com.baidu.hugegraph.util.CollectionUtil;
 import com.baidu.hugegraph.util.Ex;
 import com.baidu.hugegraph.util.FileUtil;
 import com.baidu.hugegraph.util.HubbleUtil;
@@ -69,136 +75,188 @@ public class FileUploadController {

     @Autowired
     private JobManagerService jobService;

+    @GetMapping("token")
+    public Map<String, String> fileToken(@PathVariable("connId") int connId,
+                                         @PathVariable("jobId") int jobId,
+                                         @RequestParam("names")
+                                         List<String> fileNames) {
+        Ex.check(CollectionUtil.allUnique(fileNames),
+                 "load.upload.file.duplicate-name");
+        Map<String, String> tokens = new HashMap<>();
+        for (String fileName : fileNames) {
+            String token = this.service.generateFileToken(fileName);
+            Ex.check(!this.uploadingTokenLocks().containsKey(token),
+                     "load.upload.file.token.existed");
+            this.uploadingTokenLocks().put(token, new ReentrantReadWriteLock());
+            tokens.put(fileName, token);
+        }
+        return tokens;
+    }
+
     @PostMapping
     public FileUploadResult upload(@PathVariable("connId") int connId,
                                    @PathVariable("jobId") int jobId,
                                    @RequestParam("file") MultipartFile file,
                                    @RequestParam("name") String fileName,
+                                   @RequestParam("token") String token,
                                    @RequestParam("total") int total,
                                    @RequestParam("index") int index) {
-        // When front-end use multipart-upload mode,
-        // file.getOriginalFilename() is blob, not actual file name
+        this.checkTotalAndIndexValid(total, index);
+        this.checkFileNameMatchToken(fileName, token);
         JobManager jobEntity = this.jobService.get(jobId);
         this.checkFileValid(connId, jobId, jobEntity, file, fileName);
-        String location = this.config.get(HubbleOptions.UPLOAD_FILE_LOCATION);
-        String path = Paths.get(CONN_PREIFX + connId, JOB_PREIFX + jobId)
-                           .toString();
-        this.ensureLocationExist(location, path);
-        // Before merge: upload-files/conn-1/verson_person.csv/part-1
-        // After merge: upload-files/conn-1/file-mapping-1/verson_person.csv
-        String filePath = Paths.get(location, path, fileName).toString();
-        // Check destFile exist
-        // Ex.check(!destFile.exists(), "load.upload.file.existed", fileName);
-        FileUploadResult result = this.service.uploadFile(file, index, filePath);
-        if (result.getStatus() == FileUploadResult.Status.FAILURE) {
+        if (jobEntity.getJobStatus() == JobStatus.DEFAULT) {
+            jobEntity.setJobStatus(JobStatus.UPLOADING);
+            this.jobService.update(jobEntity);
+        }
+        // Ensure location exist and generate file path
+        String filePath = this.generateFilePath(connId, jobId, fileName);
+        // Check this file deleted before
+        ReadWriteLock lock = this.uploadingTokenLocks().get(token);
+        FileUploadResult result;
+        if (lock == null) {
+            result = new FileUploadResult();
+            result.setName(file.getOriginalFilename());
+            result.setSize(file.getSize());
+            result.setStatus(FileUploadResult.Status.FAILURE);
+            result.setCause("File has been deleted");
             return result;
         }
-        // Verify the existence of fragmented files
-        FileMapping mapping = this.service.get(connId, jobId, fileName);
-        if (mapping != null) {
-            mapping.setJobId(jobId);
-            mapping.setFileStatus(FileMappingStatus.UPLOADING);
-            mapping.setFileIndex(mapping.getFileIndex() + "," + index);
-            mapping.setFileTotal(total);
-            if (this.service.update(mapping) != 1) {
-                throw new InternalException("entity.update.failed", mapping);
+
+        lock.readLock().lock();
+        try {
+            result = this.service.uploadFile(file, index, filePath);
+            if (result.getStatus() == FileUploadResult.Status.FAILURE) {
+                return result;
             }
-        } else {
-            mapping = new FileMapping(connId, fileName, filePath);
-            mapping.setJobId(jobId);
-            mapping.setFileStatus(FileMappingStatus.UPLOADING);
-            mapping.setFileIndex(String.valueOf(index));
-            mapping.setFileTotal(total);
-            if (this.service.save(mapping) != 1) {
-                throw new InternalException("entity.insert.failed", mapping);
+            synchronized (this.service) {
+                // Verify the existence of fragmented files
+                FileMapping mapping = this.service.get(connId, jobId, fileName);
+                if (mapping == null) {
+                    mapping = new FileMapping(connId, fileName, filePath);
+                    mapping.setJobId(jobId);
+                    mapping.setFileStatus(FileMappingStatus.UPLOADING);
+                    this.service.save(mapping);
+                } else {
+                    if (mapping.getFileStatus() == FileMappingStatus.COMPLETED) {
+                        result.setId(mapping.getId());
+                        // Remove uploading file token
+                        this.uploadingTokenLocks().remove(token);
+                        return result;
+                    } else {
+                        mapping.setUpdateTime(HubbleUtil.nowDate());
+                    }
+                }
+                // Determine whether all the parts have been uploaded, then merge them
+                boolean merged = this.service.tryMergePartFiles(filePath, total);
+                if (!merged) {
+                    this.service.update(mapping);
+                    return result;
+                }
+                // Read column names and values then fill it
+                this.service.extractColumns(mapping);
+                mapping.setFileStatus(FileMappingStatus.COMPLETED);
+                mapping.setTotalLines(FileUtil.countLines(mapping.getPath()));
+                mapping.setTotalSize(FileUtils.sizeOf(new File(mapping.getPath())));

+                // Move to the directory corresponding to the file mapping Id
+                String newPath = this.service.moveToNextLevelDir(mapping);
+                // Update file mapping stored path
+                mapping.setPath(newPath);
+                this.service.update(mapping);
+                // Update Job Manager size
+                long jobSize = jobEntity.getJobSize() + mapping.getTotalSize();
+                jobEntity.setJobSize(jobSize);
+                this.jobService.update(jobEntity);
+                result.setId(mapping.getId());
+                // Remove uploading file token
+                this.uploadingTokenLocks().remove(token);
             }
+            return result;
+        } finally {
+            lock.readLock().unlock();
         }
-        Integer mapId = mapping.getId();
-        // Determine whether all the parts have been uploaded, then merge them
-        boolean merged = this.service.tryMergePartFiles(filePath, total);
-        if (merged) {
-            // Save file mapping
-            mapping = new FileMapping(connId, fileName, filePath);
-            // Read column names and values then fill it
-            this.service.extractColumns(mapping);
-            mapping.setId(mapId);
-            mapping.setFileStatus(FileMappingStatus.COMPLETED);
-            mapping.setTotalLines(FileUtil.countLines(mapping.getPath()));
-            mapping.setTotalSize(FileUtils.sizeOf(new File(mapping.getPath())));
-            mapping.setCreateTime(HubbleUtil.nowDate());
-            // Will generate mapping id
-
-            // Move to the directory corresponding to the file mapping Id
-            String newPath = this.service.moveToNextLevelDir(mapping);
-            // Update file mapping stored path
-            mapping.setPath(newPath);
-            if (this.service.update(mapping) != 1) {
-                throw new InternalException("entity.update.failed", mapping);
-            }
-            // Update Job Manager size
-            long jobSize = jobEntity.getJobSize() + mapping.getTotalSize();
+    }
+
+    @DeleteMapping
+    public Boolean delete(@PathVariable("connId") int connId,
+                          @PathVariable("jobId") int jobId,
+                          @RequestParam("name") String fileName,
+                          @RequestParam("token") String token) {
+        JobManager jobEntity = this.jobService.get(jobId);
+        Ex.check(jobEntity != null, "job-manager.not-exist.id", jobId);
+        Ex.check(jobEntity.getJobStatus() == JobStatus.UPLOADING ||
+                 jobEntity.getJobStatus() == JobStatus.MAPPING ||
+                 jobEntity.getJobStatus() == JobStatus.SETTING,
+                 "deleted.file.no-permission");
+        FileMapping mapping = this.service.get(connId, jobId, fileName);
+        Ex.check(mapping != null, "load.file-mapping.not-exist.name", fileName);
+
+        ReadWriteLock lock = this.uploadingTokenLocks().get(token);
+        if (lock != null) {
+            lock.writeLock().lock();
+        }
+        try {
+            this.service.deleteDiskFile(mapping);
+            log.info("Prepare to remove file mapping {}", mapping.getId());
+            this.service.remove(mapping.getId());
+            long jobSize = jobEntity.getJobSize() - mapping.getTotalSize();
             jobEntity.setJobSize(jobSize);
-            jobEntity.setJobStatus(JobManagerStatus.SETTING);
-            if (this.jobService.update(jobEntity) != 1) {
-                throw new InternalException("job-manager.entity.update.failed",
-                                            jobEntity);
+            this.jobService.update(jobEntity);
+            if (lock != null) {
+                this.uploadingTokenLocks().remove(token);
+            }
+            return true;
+        } finally {
+            if (lock != null) {
+                lock.writeLock().unlock();
             }
-            result.setId(mapping.getId());
         }
-        return result;
     }

-    @DeleteMapping
-    public Map<String, Boolean> delete(@PathVariable("connId") int connId,
-                                       @PathVariable("jobId") int jobId,
-                                       @RequestParam("names")
-                                       List<String> fileNames) {
-
+    @PutMapping("next-step")
+    public JobManager nextStep(@PathVariable("jobId") int jobId) {
         JobManager jobEntity = this.jobService.get(jobId);
-        Ex.check(jobEntity != null,
-                 "job-manager.not-exist.id", jobId);
-        Ex.check(jobEntity.getJobStatus() == JobManagerStatus.SETTING,
-                 "deleted.file.no-permission" );
-        Ex.check(fileNames.size() > 0, "load.upload.files.at-least-one");
-        Map<String, Boolean> result = new LinkedHashMap<>();
-        for (String fileName : fileNames) {
-            FileMapping mapping = this.service.get(connId, fileName);
-            Ex.check(mapping != null, "load.file-mapping.not-exist.name",
-                     fileName);
-            File destFile = new File(mapping.getPath());
-            boolean deleted = destFile.delete();
-            if (deleted) {
-                log.info("deleted file {}, prepare to remove file mapping {}",
-                         destFile, mapping.getId());
-                if (this.service.remove(mapping.getId()) != 1) {
-                    throw new InternalException("entity.delete.failed", mapping);
-                }
-                log.info("removed file mapping {}", mapping.getId());
-                long jobSize = jobEntity.getJobSize() - mapping.getTotalSize();
-                jobEntity.setJobSize(jobSize);
-                if (this.jobService.update(jobEntity) != 1) {
-                    throw new InternalException("job-manager.entity.update.failed",
-                                                jobEntity);
-                }
-            }
-            result.put(fileName, deleted);
+        Ex.check(jobEntity != null, "job-manager.not-exist.id", jobId);
+        Ex.check(jobEntity.getJobStatus() == JobStatus.UPLOADING,
+                 "job.manager.status.unexpected",
+                 JobStatus.UPLOADING, jobEntity.getJobStatus());
+        jobEntity.setJobStatus(JobStatus.MAPPING);
+        this.jobService.update(jobEntity);
+        return jobEntity;
+    }
+
+    private Map<String, ReadWriteLock> uploadingTokenLocks() {
+        return this.service.getUploadingTokenLocks();
+    }
+
+    private void checkTotalAndIndexValid(int total, int index) {
+        if (total <= 0) {
+            throw new InternalException("The request params 'total' must > 0");
+        }
+        if (index < 0) {
+            throw new InternalException("The request params 'index' must >= 0");
         }
-        return result;
     }

+    private void checkFileNameMatchToken(String fileName, String token) {
+        String md5Prefix = HubbleUtil.md5(fileName);
+        Ex.check(StringUtils.isNotEmpty(token) && token.startsWith(md5Prefix),
+                 "load.upload.file.name-token.unmatch");
+    }

     private void checkFileValid(int connId, int jobId, JobManager jobEntity,
                                 MultipartFile file, String fileName) {
-        Ex.check(jobEntity != null,
-                 "job-manager.not-exist.id", jobId);
-        Ex.check(jobEntity.getJobStatus() == JobManagerStatus.DEFAULT ||
-                 jobEntity.getJobStatus() == JobManagerStatus.SETTING,
-                 "load.upload.file.no-permission" );
+        Ex.check(jobEntity != null, "job-manager.not-exist.id", jobId);
+        Ex.check(jobEntity.getJobStatus() == JobStatus.DEFAULT ||
+                 jobEntity.getJobStatus() == JobStatus.UPLOADING ||
+                 jobEntity.getJobStatus() == JobStatus.MAPPING ||
+                 jobEntity.getJobStatus() == JobStatus.SETTING,
+                 "load.upload.file.no-permission");
         // Now allowed to upload empty file
         Ex.check(!file.isEmpty(), "load.upload.file.cannot-be-empty");
         // Difficult: how to determine whether the file is csv or text
-        log.info("File content type: {}", file.getContentType());
+        log.debug("File content type: {}", file.getContentType());
         String format = FilenameUtils.getExtension(fileName);
         List<String> formatWhiteList = this.config.get(
@@ -230,6 +288,16 @@ private void checkFileValid(int connId, int jobId, JobManager jobEntity,
                  FileUtils.byteCountToDisplaySize(totalFileSizeLimit));
     }

+    private String generateFilePath(int connId, int jobId, String fileName) {
+        String location = this.config.get(HubbleOptions.UPLOAD_FILE_LOCATION);
+        String path = Paths.get(CONN_PREIFX + connId, JOB_PREIFX + jobId)
+                           .toString();
+        this.ensureLocationExist(location, path);
+        // Before merge: upload-files/conn-1/verson_person.csv/part-1
+        // After merge: upload-files/conn-1/file-mapping-1/verson_person.csv
+        return Paths.get(location, path, fileName).toString();
+    }
+
     private void ensureLocationExist(String location, String connPath) {
         String path = Paths.get(location, connPath).toString();
         File locationDir = new File(path);
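The token map introduced above doubles as a liveness marker and a lock registry: concurrent part uploads of one file share the token's read lock, while delete takes the write lock, so it waits out in-flight parts and then removes the token to fail later parts fast. A condensed, self-contained model of that protocol (class and method names here are illustrative, not from the PR):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Condensed model of the upload/delete locking protocol sketched above
public class UploadTokens {

    private final Map<String, ReadWriteLock> locks = new ConcurrentHashMap<>();

    public void register(String token) {
        this.locks.put(token, new ReentrantReadWriteLock());
    }

    // Part uploads share the read lock, so parts of one file run in parallel
    public boolean uploadPart(String token, Runnable writePart) {
        ReadWriteLock lock = this.locks.get(token);
        if (lock == null) {
            return false; // token removed: the file was deleted meanwhile
        }
        lock.readLock().lock();
        try {
            writePart.run();
            return true;
        } finally {
            lock.readLock().unlock();
        }
    }

    // Delete takes the write lock: it waits for in-flight parts, then
    // removes the token so later parts fail fast
    public void delete(String token, Runnable removeFile) {
        ReadWriteLock lock = this.locks.get(token);
        if (lock != null) {
            lock.writeLock().lock();
        }
        try {
            removeFile.run();
            this.locks.remove(token);
        } finally {
            if (lock != null) {
                lock.writeLock().unlock();
            }
        }
    }
}
```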
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/JobManagerController.java b/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/JobManagerController.java
index ace5789a..c68189f2 100644
--- a/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/JobManagerController.java
+++ b/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/JobManagerController.java
@@ -22,7 +22,7 @@
 import java.util.ArrayList;
 import java.util.List;

-import org.datanucleus.util.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.DeleteMapping;
 import org.springframework.web.bind.annotation.GetMapping;
@@ -36,7 +36,7 @@
 import com.baidu.hugegraph.common.Constant;
 import com.baidu.hugegraph.common.Response;
-import com.baidu.hugegraph.entity.enums.JobManagerStatus;
+import com.baidu.hugegraph.entity.enums.JobStatus;
 import com.baidu.hugegraph.entity.enums.LoadStatus;
 import com.baidu.hugegraph.entity.load.FileMapping;
 import com.baidu.hugegraph.entity.load.JobManager;
@@ -78,28 +78,27 @@ public JobManager create(@PathVariable("connId") int connId,
             Ex.check(entity.getJobName().length() <= 48,
                      "job.manager.job-name.reached-limit");
             Ex.check(entity.getJobName() != null, () ->
-                     Constant.COMMON_NAME_PATTERN.matcher(entity.getJobName()).matches(),
-                     "job.manager.job-name.unmatch-regex");
+                     Constant.COMMON_NAME_PATTERN.matcher(
+                     entity.getJobName()).matches(),
+                     "job.manager.job-name.unmatch-regex");
             Ex.check(entity.getJobRemarks().length() <= 200,
                      "job.manager.job-remarks.reached-limit");
             Ex.check(!StringUtils.isEmpty(entity.getJobRemarks()), () ->
-                     Constant.COMMON_REMARK_PATTERN.matcher(entity.getJobRemarks()).matches(),
-                     "job.manager.job-remarks.unmatch-regex");
+                     Constant.COMMON_REMARK_PATTERN.matcher(
+                     entity.getJobRemarks()).matches(),
+                     "job.manager.job-remarks.unmatch-regex");
             Ex.check(this.service.count() < LIMIT,
                      "job.manager.reached-limit", LIMIT);
             if (this.service.getTask(entity.getJobName(), connId) != null) {
                 throw new InternalException("job.manager.job-name.repeated");
             }
             entity.setConnId(connId);
-            entity.setJobStatus(JobManagerStatus.DEFAULT);
+            entity.setJobStatus(JobStatus.DEFAULT);
             entity.setJobDuration(0L);
             entity.setJobSize(0L);
             entity.setUpdateTime(HubbleUtil.nowDate());
             entity.setCreateTime(HubbleUtil.nowDate());
-            int rows = this.service.save(entity);
-            if (rows != 1) {
-                throw new InternalException("entity.insert.failed", entity);
-            }
+            this.service.save(entity);
         }
         return entity;
     }
@@ -110,9 +109,7 @@ public void delete(@PathVariable("id") int id) {
         if (task == null) {
             throw new ExternalException("job.manager.not-exist.id", id);
         }
-        if (this.service.remove(id) != 1) {
-            throw new InternalException("entity.delete.failed", task);
-        }
+        this.service.remove(id);
     }

     @GetMapping("{id}")
@@ -153,8 +150,8 @@ public JobManager update(@PathVariable("id") int id,
         Ex.check(newEntity.getJobName().length() <= 48,
                  "job.manager.job-name.reached-limit");
         Ex.check(newEntity.getJobName() != null, () ->
-                 Constant.COMMON_NAME_PATTERN
-                         .matcher(newEntity.getJobName()).matches(),
+                 Constant.COMMON_NAME_PATTERN.matcher(
+                 newEntity.getJobName()).matches(),
                  "job.manager.job-name.unmatch-regex");
         Ex.check(newEntity.getJobRemarks().length() <= 200,
                  "job.manager.job-remarks.reached-limit");
@@ -165,9 +162,7 @@
         }
         entity.setJobName(newEntity.getJobName());
         entity.setJobRemarks(newEntity.getJobRemarks());
-        if (this.service.update(entity) != 1) {
-            throw new InternalException("entity.update.failed", entity);
-        }
+        this.service.update(entity);
         return entity;
     }
@@ -178,7 +173,7 @@ public Response reason(@PathVariable("connId") int connId,
         if (job == null) {
             throw new ExternalException("job.manager.not-exist.id", id);
         }
-        List<LoadTask> tasks = taskService.batchTasks(job.getId());
+        List<LoadTask> tasks = this.taskService.batchTasks(job.getId());
         List<JobManagerReasonResult> reasonResults = new ArrayList<>();
         tasks.forEach(task -> {
             JobManagerReasonResult reasonResult = new JobManagerReasonResult();
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/LoadTaskController.java b/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/LoadTaskController.java
index ac7c7e32..6ae2585f 100644
--- a/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/LoadTaskController.java
+++ b/hubble-be/src/main/java/com/baidu/hugegraph/controller/load/LoadTaskController.java
@@ -36,12 +36,11 @@
 import com.baidu.hugegraph.common.Response;
 import com.baidu.hugegraph.controller.BaseController;
 import com.baidu.hugegraph.entity.GraphConnection;
-import com.baidu.hugegraph.entity.enums.JobManagerStatus;
+import com.baidu.hugegraph.entity.enums.JobStatus;
 import com.baidu.hugegraph.entity.load.FileMapping;
 import com.baidu.hugegraph.entity.load.JobManager;
 import com.baidu.hugegraph.entity.load.LoadTask;
 import com.baidu.hugegraph.exception.ExternalException;
-import com.baidu.hugegraph.exception.InternalException;
 import com.baidu.hugegraph.service.GraphConnectionService;
 import com.baidu.hugegraph.service.load.FileMappingService;
 import com.baidu.hugegraph.service.load.JobManagerService;
@@ -107,18 +106,14 @@ public LoadTask create(@PathVariable("connId") int connId,
                            @PathVariable("jobId") int jobId,
                            @RequestBody LoadTask entity) {
         JobManager jobEntity = this.jobService.get(jobId);
-        Ex.check(jobEntity != null,
-                 "job-manager.not-exist.id", jobId);
-        Ex.check(jobEntity.getJobStatus() == JobManagerStatus.SETTING,
-                 "load.task.create.no-permission" );
-        synchronized(this.service) {
+        Ex.check(jobEntity != null, "job-manager.not-exist.id", jobId);
+        Ex.check(jobEntity.getJobStatus() == JobStatus.SETTING,
+                 "load.task.create.no-permission");
+        synchronized (this.service) {
             Ex.check(this.service.count() < LIMIT,
                      "load.task.reached-limit", LIMIT);
             entity.setConnId(connId);
-            int rows = this.service.save(entity);
-            if (rows != 1) {
-                throw new InternalException("entity.insert.failed", entity);
-            }
+            this.service.save(entity);
         }
         return entity;
     }
@@ -129,9 +124,7 @@ public void delete(@PathVariable("id") int id) {
         if (task == null) {
             throw new ExternalException("load.task.not-exist.id", id);
         }
-        if (this.service.remove(id) != 1) {
-            throw new InternalException("entity.delete.failed", task);
-        }
+        this.service.remove(id);
     }

     @PostMapping("start")
@@ -144,11 +137,9 @@ public List<LoadTask> start(@PathVariable("connId") int connId,
             throw new ExternalException("graph-connection.not-exist.id", connId);
         }
         JobManager jobEntity = this.jobService.get(jobId);
-        Ex.check(jobEntity != null,
-                 "job-manager.not-exist.id", jobId);
-        Ex.check((jobEntity.getJobStatus() == JobManagerStatus.SETTING ||
-                  jobEntity.getJobStatus() == JobManagerStatus.IMPORTING),
-                 "load.task.start.no-permission" );
+        Ex.check(jobEntity != null, "job-manager.not-exist.id", jobId);
+        Ex.check(jobEntity.getJobStatus() == JobStatus.SETTING,
+                 "load.task.start.no-permission");

         List<LoadTask> tasks = new ArrayList<>();
         for (Integer fileId: fileIds) {
@@ -158,12 +149,9 @@
             }
             tasks.add(this.service.start(connection, fileMapping));
         }
-        jobEntity.setJobStatus(JobManagerStatus.IMPORTING);
-        jobEntity.setUpdateTime( HubbleUtil.nowDate());
-        if (this.jobService.update(jobEntity) != 1) {
-            throw new InternalException("job-manager.entity.update.failed",
-                                        jobEntity);
-        }
+        jobEntity.setJobStatus(JobStatus.LOADING);
+        jobEntity.setUpdateTime(HubbleUtil.nowDate());
+        this.jobService.update(jobEntity);
         return tasks;
     }
@@ -176,17 +164,13 @@ public LoadTask pause(@PathVariable("connId") int connId,
             throw new ExternalException("graph-connection.not-exist.id", connId);
         }
         JobManager jobEntity = this.jobService.get(jobId);
-        Ex.check(jobEntity != null,
-                 "job-manager.not-exist.id", jobId);
-        Ex.check(jobEntity.getJobStatus() == JobManagerStatus.IMPORTING,
+        Ex.check(jobEntity != null, "job-manager.not-exist.id", jobId);
+        Ex.check(jobEntity.getJobStatus() == JobStatus.LOADING,
                  "load.task.pause.no-permission");
         LoadTask task = this.service.pause(taskId);
-        jobEntity.setJobStatus(JobManagerStatus.IMPORTING);
-        jobEntity.setUpdateTime( HubbleUtil.nowDate());
-        if (this.jobService.update(jobEntity) != 1) {
-            throw new InternalException("job-manager.entity.update.failed",
-                                        jobEntity);
-        }
+        jobEntity.setJobStatus(JobStatus.LOADING);
+        jobEntity.setUpdateTime(HubbleUtil.nowDate());
+        this.jobService.update(jobEntity);
         return task;
     }
@@ -199,17 +183,13 @@ public LoadTask resume(@PathVariable("connId") int connId,
             throw new ExternalException("graph-connection.not-exist.id", connId);
         }
         JobManager jobEntity = this.jobService.get(jobId);
-        Ex.check(jobEntity != null,
-                 "job-manager.not-exist.id", jobId);
-        Ex.check(jobEntity.getJobStatus() == JobManagerStatus.IMPORTING,
+        Ex.check(jobEntity != null, "job-manager.not-exist.id", jobId);
+        Ex.check(jobEntity.getJobStatus() == JobStatus.LOADING,
                  "load.task.pause.no-permission");
         LoadTask task = this.service.resume(taskId);
-        jobEntity.setJobStatus(JobManagerStatus.IMPORTING);
-        jobEntity.setUpdateTime( HubbleUtil.nowDate());
-        if (this.jobService.update(jobEntity) != 1) {
-            throw new InternalException("job-manager.entity.update.failed",
-                                        jobEntity);
-        }
+        jobEntity.setJobStatus(JobStatus.LOADING);
+        jobEntity.setUpdateTime(HubbleUtil.nowDate());
+        this.jobService.update(jobEntity);
         return task;
     }
@@ -222,17 +202,13 @@ public LoadTask stop(@PathVariable("connId") int connId,
             throw new ExternalException("graph-connection.not-exist.id", connId);
         }
         JobManager jobEntity = this.jobService.get(jobId);
-        Ex.check(jobEntity != null,
-                 "job-manager.not-exist.id", jobId);
-        Ex.check(jobEntity.getJobStatus() == JobManagerStatus.IMPORTING,
+        Ex.check(jobEntity != null, "job-manager.not-exist.id", jobId);
+        Ex.check(jobEntity.getJobStatus() == JobStatus.LOADING,
                  "load.task.pause.no-permission");
         LoadTask task = this.service.stop(taskId);
-        jobEntity.setJobStatus(JobManagerStatus.IMPORTING);
-        jobEntity.setUpdateTime( HubbleUtil.nowDate());
-        if (this.jobService.update(jobEntity) != 1) {
-            throw new InternalException("job-manager.entity.update.failed",
-                                        jobEntity);
-        }
+        jobEntity.setJobStatus(JobStatus.LOADING);
+        jobEntity.setUpdateTime(HubbleUtil.nowDate());
+        this.jobService.update(jobEntity);
         return task;
     }
@@ -245,17 +221,13 @@ public LoadTask retry(@PathVariable("connId") int connId,
             throw new ExternalException("graph-connection.not-exist.id", connId);
         }
         JobManager jobEntity = this.jobService.get(jobId);
-        Ex.check(jobEntity != null,
-                 "job-manager.not-exist.id", jobId);
-        Ex.check(jobEntity.getJobStatus() == JobManagerStatus.IMPORTING,
+        Ex.check(jobEntity != null, "job-manager.not-exist.id", jobId);
+        Ex.check(jobEntity.getJobStatus() == JobStatus.LOADING,
                  "load.task.pause.no-permission");
         LoadTask task = this.service.retry(taskId);
-        jobEntity.setJobStatus(JobManagerStatus.IMPORTING);
+        jobEntity.setJobStatus(JobStatus.LOADING);
         jobEntity.setUpdateTime( HubbleUtil.nowDate());
-        if (this.jobService.update(jobEntity) != 1) {
-            throw new InternalException("job-manager.entity.update.failed",
-                                        jobEntity);
-        }
+        this.jobService.update(jobEntity);
         return task;
     }
@@ -268,11 +240,13 @@ public Response reason(@PathVariable("connId") int connId,
             throw new ExternalException("load.task.not-exist.id", id);
         }
         JobManager jobEntity = this.jobService.get(jobId);
-        Ex.check(jobEntity != null,
-                 "job-manager.not-exist.id", jobId);
+        Ex.check(jobEntity != null, "job-manager.not-exist.id", jobId);
         Integer fileId = task.getFileId();
         FileMapping mapping = this.fmService.get(fileId);
         String reason = this.service.readLoadFailedReason(mapping);
-        return Response.builder().status(200).data(reason).build();
+        return Response.builder()
+                       .status(Constant.STATUS_OK)
+                       .data(reason)
+                       .build();
     }
 }
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/controller/query/ExecuteHistoryController.java b/hubble-be/src/main/java/com/baidu/hugegraph/controller/query/ExecuteHistoryController.java
index e4378b66..a23cb681 100644
--- a/hubble-be/src/main/java/com/baidu/hugegraph/controller/query/ExecuteHistoryController.java
+++ b/hubble-be/src/main/java/com/baidu/hugegraph/controller/query/ExecuteHistoryController.java
@@ -30,7 +30,6 @@
 import com.baidu.hugegraph.common.Constant;
 import com.baidu.hugegraph.entity.query.ExecuteHistory;
 import com.baidu.hugegraph.exception.ExternalException;
-import com.baidu.hugegraph.exception.InternalException;
 import com.baidu.hugegraph.service.query.ExecuteHistoryService;
 import com.baomidou.mybatisplus.core.metadata.IPage;
@@ -67,9 +66,7 @@ public ExecuteHistory delete(@PathVariable("connId") int connId,
         if (oldEntity == null) {
             throw new ExternalException("execute-history.not-exist.id", id);
         }
-        if (this.service.remove(connId, id) != 1) {
-            throw new InternalException("entity.delete.failed", oldEntity);
-        }
+        this.service.remove(connId, id);
         return oldEntity;
     }
 }
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/controller/query/GremlinCollectionController.java b/hubble-be/src/main/java/com/baidu/hugegraph/controller/query/GremlinCollectionController.java
index eb961bd0..d67c5136 100644
--- a/hubble-be/src/main/java/com/baidu/hugegraph/controller/query/GremlinCollectionController.java
+++ b/hubble-be/src/main/java/com/baidu/hugegraph/controller/query/GremlinCollectionController.java
@@ -34,10 +34,9 @@
 import com.baidu.hugegraph.common.Constant;
 import com.baidu.hugegraph.entity.query.GremlinCollection;
 import com.baidu.hugegraph.exception.ExternalException;
-import com.baidu.hugegraph.exception.InternalException;
 import com.baidu.hugegraph.service.query.GremlinCollectionService;
-import com.baidu.hugegraph.util.HubbleUtil;
 import com.baidu.hugegraph.util.Ex;
+import com.baidu.hugegraph.util.HubbleUtil;
 import com.baomidou.mybatisplus.core.metadata.IPage;

 @RestController
@@ -107,10 +106,7 @@ public GremlinCollection create(@PathVariable("connId") int connId,
         synchronized(this.service) {
             Ex.check(this.service.count() < LIMIT,
                      "gremlin-collection.reached-limit", LIMIT);
-            int rows = this.service.save(newEntity);
-            if (rows != 1) {
-                throw new InternalException("entity.insert.failed", newEntity);
-            }
+            this.service.save(newEntity);
         }
         return newEntity;
     }
@@ -128,10 +124,7 @@ public GremlinCollection update(@PathVariable("id") int id,
         GremlinCollection entity = this.mergeEntity(oldEntity, newEntity);
         this.checkEntityUnique(entity, false);
-        int rows = this.service.update(entity);
-        if (rows != 1) {
-            throw new InternalException("entity.update.failed", entity);
-        }
+        this.service.update(entity);
         return entity;
     }
@@ -141,10 +134,7 @@ public GremlinCollection delete(@PathVariable("id") int id) {
         if (oldEntity == null) {
             throw new ExternalException("gremlin-collection.not-exist.id", id);
         }
-        int rows = this.service.remove(id);
-        if (rows != 1) {
-            throw new InternalException("entity.delete.failed", oldEntity);
-        }
+        this.service.remove(id);
         return oldEntity;
     }
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/controller/query/GremlinQueryController.java b/hubble-be/src/main/java/com/baidu/hugegraph/controller/query/GremlinQueryController.java
index 6720814e..60c03c14 100644
--- a/hubble-be/src/main/java/com/baidu/hugegraph/controller/query/GremlinQueryController.java
+++ b/hubble-be/src/main/java/com/baidu/hugegraph/controller/query/GremlinQueryController.java
@@ -41,7 +41,6 @@
 import com.baidu.hugegraph.entity.query.ExecuteHistory;
 import com.baidu.hugegraph.entity.query.GremlinQuery;
 import com.baidu.hugegraph.entity.query.GremlinResult;
-import com.baidu.hugegraph.exception.InternalException;
 import com.baidu.hugegraph.service.query.ExecuteHistoryService;
 import com.baidu.hugegraph.service.query.GremlinQueryService;
 import com.baidu.hugegraph.util.Ex;
@@ -76,10 +75,7 @@ public GremlinResult execute(@PathVariable("connId") int connId,
         history = new ExecuteHistory(null, connId, 0L, ExecuteType.GREMLIN,
                                      query.getContent(), status,
                                      AsyncTaskStatus.UNKNOWN, -1L, createTime);
-        int rows = this.historyService.save(history);
-        if (rows != 1) {
-            throw new InternalException("entity.insert.failed", history);
-        }
+        this.historyService.save(history);

         StopWatch timer = StopWatch.createStarted();
         try {
@@ -94,10 +90,7 @@
             long duration = timer.getTime(TimeUnit.MILLISECONDS);
             history.setStatus(status);
             history.setDuration(duration);
-            rows = this.historyService.update(history);
-            if (rows != 1) {
-                log.error("Failed to save execute history entity {}", history);
-            }
+            this.historyService.update(history);
         }
     }

@@ -113,10 +106,7 @@ public ExecuteStatus executeAsyncTask(@PathVariable("connId") int connId,
         history = new ExecuteHistory(null, connId, 0L, ExecuteType.GREMLIN_ASYNC,
                                      query.getContent(), status,
                                      AsyncTaskStatus.UNKNOWN, -1L, createTime);
-        int rows = this.historyService.save(history);
-        if (rows != 1) {
-            throw new InternalException("entity.insert.failed", history);
-        }
+        this.historyService.save(history);

         StopWatch timer = StopWatch.createStarted();
         long asyncId = 0L;
@@ -133,10 +123,7 @@
             history.setStatus(status);
             history.setDuration(duration);
             history.setAsyncId(asyncId);
-            rows = this.historyService.update(history);
-            if (rows != 1) {
-                log.error("Failed to save execute history entity {}", history);
-            }
+            this.historyService.update(history);
         }
     }
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/controller/AsyncTaskController.java b/hubble-be/src/main/java/com/baidu/hugegraph/controller/task/AsyncTaskController.java
similarity index 99%
rename from hubble-be/src/main/java/com/baidu/hugegraph/controller/AsyncTaskController.java
rename to hubble-be/src/main/java/com/baidu/hugegraph/controller/task/AsyncTaskController.java
index 0118d221..5cba1f0e 100644
--- a/hubble-be/src/main/java/com/baidu/hugegraph/controller/AsyncTaskController.java
+++ b/hubble-be/src/main/java/com/baidu/hugegraph/controller/task/AsyncTaskController.java
@@ -17,7 +17,7 @@
  * under the License.
  */

-package com.baidu.hugegraph.controller;
+package com.baidu.hugegraph.controller.task;

 import java.util.List;
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/entity/enums/AsyncTaskStatus.java b/hubble-be/src/main/java/com/baidu/hugegraph/entity/enums/AsyncTaskStatus.java
index 6bbde538..55ad62a6 100644
--- a/hubble-be/src/main/java/com/baidu/hugegraph/entity/enums/AsyncTaskStatus.java
+++ b/hubble-be/src/main/java/com/baidu/hugegraph/entity/enums/AsyncTaskStatus.java
@@ -1,3 +1,22 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
 package com.baidu.hugegraph.entity.enums;

 import com.baomidou.mybatisplus.core.enums.IEnum;
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/entity/enums/FileMappingStatus.java b/hubble-be/src/main/java/com/baidu/hugegraph/entity/enums/FileMappingStatus.java
index e30c1a4f..785e1285 100644
--- a/hubble-be/src/main/java/com/baidu/hugegraph/entity/enums/FileMappingStatus.java
+++ b/hubble-be/src/main/java/com/baidu/hugegraph/entity/enums/FileMappingStatus.java
@@ -25,7 +25,9 @@ public enum FileMappingStatus implements IEnum<Byte> {

     UPLOADING(0),

-    COMPLETED(1);
+    COMPLETED(1),
+
+    FAILURE(2);

     private byte code;
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/entity/enums/JobManagerStatus.java b/hubble-be/src/main/java/com/baidu/hugegraph/entity/enums/JobStatus.java
similarity index 84%
rename from hubble-be/src/main/java/com/baidu/hugegraph/entity/enums/JobManagerStatus.java
rename to hubble-be/src/main/java/com/baidu/hugegraph/entity/enums/JobStatus.java
index 0c49c3d1..d737d321 100644
--- a/hubble-be/src/main/java/com/baidu/hugegraph/entity/enums/JobManagerStatus.java
+++ b/hubble-be/src/main/java/com/baidu/hugegraph/entity/enums/JobStatus.java
@@ -21,21 +21,25 @@

 import com.baomidou.mybatisplus.core.enums.IEnum;

-public enum JobManagerStatus implements IEnum<Byte> {
+public enum JobStatus implements IEnum<Byte> {

     DEFAULT(0),

-    SUCCESS(1),
+    UPLOADING(1),

-    FAILED(2),
+    MAPPING(2),

     SETTING(3),

-    IMPORTING(4);
+    LOADING(4),
+
+    SUCCESS(5),
+
+    FAILED(6);

     private byte code;

-    JobManagerStatus(int code) {
+    JobStatus(int code) {
         assert code < 256;
         this.code = (byte) code;
     }
@@ -45,11 +49,11 @@ public Byte getValue() {
         return this.code;
     }

-    public boolean isImporting() {
-        return this == IMPORTING;
-    }
-
     public boolean isSetting() {
         return this == SETTING;
     }
+
+    public boolean isLoading() {
+        return this == LOADING;
+    }
 }
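The rename from `JobManagerStatus` to `JobStatus` also renumbers the codes into a linear lifecycle: DEFAULT(0) → UPLOADING(1) → MAPPING(2) → SETTING(3) → LOADING(4), ending in SUCCESS(5) or FAILED(6). Since `IEnum#getValue()` is what MyBatis-Plus persists, code 1 changes meaning from SUCCESS to UPLOADING. A tiny check of the new codes (run with `-ea`); the migration concern is my inference, not something stated in the patch:

```java
public class JobStatusCodes {
    public static void main(String[] args) {
        // The byte code is what gets persisted via IEnum#getValue().
        // Under the old JobManagerStatus scheme, 1 meant SUCCESS and
        // 2 meant FAILED, so job rows written before this PR would
        // presumably need a data migration.
        assert JobStatus.UPLOADING.getValue() == (byte) 1;
        assert JobStatus.MAPPING.getValue() == (byte) 2;
        assert JobStatus.SUCCESS.getValue() == (byte) 5;
    }
}
```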
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/FileMapping.java b/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/FileMapping.java
index 7719b518..7585c54c 100644
--- a/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/FileMapping.java
+++ b/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/FileMapping.java
@@ -20,8 +20,10 @@
 package com.baidu.hugegraph.entity.load;

 import java.util.Date;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.Set;
+import java.util.stream.Collectors;

 import com.baidu.hugegraph.annotation.MergeProperty;
 import com.baidu.hugegraph.entity.enums.FileMappingStatus;
@@ -87,14 +89,6 @@ public class FileMapping {
     @JsonProperty("file_status")
     private FileMappingStatus fileStatus;

-    @TableField("file_total")
-    @JsonProperty("file_total")
-    private long fileTotal;
-
-    @TableField("file_index")
-    @JsonProperty("file_index")
-    private String fileIndex;
-
     @TableField(value = "file_setting", typeHandler = JacksonTypeHandler.class)
     @JsonProperty("file_setting")
     private FileSetting fileSetting;
@@ -114,14 +108,14 @@ public class FileMapping {
     @JsonProperty("load_parameter")
     private LoadParameter loadParameter;

-    @TableField("last_access_time")
-    @JsonProperty("last_access_time")
-    private Date lastAccessTime;
-
     @MergeProperty(useNew = false)
     @JsonProperty("create_time")
     private Date createTime;

+    @MergeProperty(useNew = false)
+    @JsonProperty("update_time")
+    private Date updateTime;
+
     public FileMapping(int connId, String name, String path) {
         this(connId, name, path, HubbleUtil.nowDate());
     }
@@ -136,8 +130,8 @@ public FileMapping(int connId, String name, String path,
         this.vertexMappings = new LinkedHashSet<>();
         this.edgeMappings = new LinkedHashSet<>();
         this.loadParameter = new LoadParameter();
-        this.lastAccessTime = lastAccessTime;
-        this.createTime = HubbleUtil.nowDate();
+        this.createTime = lastAccessTime;
+        this.updateTime = lastAccessTime;
     }

     public VertexMapping getVertexMapping(String vmId) {
@@ -157,4 +151,24 @@ public EdgeMapping getEdgeMapping(String emId) {
         }
         return null;
     }
+
+    @JsonIgnore
+    public Set<String> getVertexMappingLabels() {
+        if (this.getVertexMappings() == null) {
+            return new HashSet<>();
+        }
+        return this.getVertexMappings().stream()
+                   .map(ElementMapping::getLabel)
+                   .collect(Collectors.toSet());
+    }
+
+    @JsonIgnore
+    public Set<String> getEdgeMappingLabels() {
+        if (this.getEdgeMappings() == null) {
+            return new HashSet<>();
+        }
+        return this.getEdgeMappings().stream()
+                   .map(ElementMapping::getLabel)
+                   .collect(Collectors.toSet());
+    }
 }
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/FileSetting.java b/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/FileSetting.java
index 137c0aa6..85febb94 100644
--- a/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/FileSetting.java
+++ b/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/FileSetting.java
@@ -19,12 +19,21 @@
 package com.baidu.hugegraph.entity.load;

+import java.io.IOException;
 import java.util.List;

 import com.baidu.hugegraph.annotation.MergeProperty;
 import com.baidu.hugegraph.common.Constant;
 import com.baidu.hugegraph.common.Mergeable;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;

 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -53,6 +62,8 @@ public class FileSetting implements Mergeable {

     @MergeProperty
     @JsonProperty("delimiter")
+    @JsonSerialize(using = DelimiterSerializer.class)
+    @JsonDeserialize(using = DelimiterDeserializer.class)
     private String delimiter = ",";

     @MergeProperty
@@ -75,18 +86,45 @@ public class FileSetting implements Mergeable {
     @JsonProperty("list_format")
     private ListFormat listFormat = new ListFormat();

-    public void unescapeDelimiterIfNeeded() {
-        if ("\\t".equals(this.delimiter)) {
-            this.delimiter = "\t";
-        }
+    public void changeFormatIfNeeded() {
         if (!",".equals(this.delimiter)) {
             this.format = "TEXT";
         }
     }

-    public void escapeDelimiterIfNeeded() {
-        if ("\t".equals(this.delimiter)) {
-            this.delimiter = "\\t";
+    public static class DelimiterSerializer extends StdSerializer<String> {
+
+        protected DelimiterSerializer() {
+            super(String.class);
+        }
+
+        @Override
+        public void serialize(String delimiter, JsonGenerator jsonGenerator,
+                              SerializerProvider provider) throws IOException {
+            if ("\t".equals(delimiter)) {
+                jsonGenerator.writeString("\\t");
+            } else {
+                jsonGenerator.writeString(delimiter);
+            }
+        }
+    }
+
+    public static class DelimiterDeserializer extends StdDeserializer<String> {
+
+        protected DelimiterDeserializer() {
+            super(String.class);
+        }
+
+        @Override
+        public String deserialize(JsonParser jsonParser,
+                                  DeserializationContext context)
+                                  throws IOException {
+            String delimiter = jsonParser.getText();
+            if ("\\t".equals(delimiter)) {
+                return "\t";
+            } else {
+                return delimiter;
+            }
         }
     }
 }
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/JobManager.java b/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/JobManager.java
index 53c1a783..40c8c909 100644
--- a/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/JobManager.java
+++ b/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/JobManager.java
@@ -22,8 +22,7 @@
 import java.util.Date;

 import com.baidu.hugegraph.annotation.MergeProperty;
-import com.baidu.hugegraph.entity.enums.JobManagerStatus;
-import com.baidu.hugegraph.entity.enums.LoadStatus;
+import com.baidu.hugegraph.entity.enums.JobStatus;
 import com.baidu.hugegraph.util.SerializeUtil;
 import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableField;
@@ -75,7 +74,7 @@ public class JobManager {
     @TableField("job_status")
     @MergeProperty
     @JsonProperty("job_status")
-    private JobManagerStatus jobStatus;
+    private JobStatus jobStatus;

     @TableField("job_duration")
     @MergeProperty
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/JobManagerReasonResult.java b/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/JobManagerReasonResult.java
index 44011408..0cfdd7f6 100644
--- a/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/JobManagerReasonResult.java
+++ b/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/JobManagerReasonResult.java
@@ -20,7 +20,6 @@
 package com.baidu.hugegraph.entity.load;

 import com.baidu.hugegraph.annotation.MergeProperty;
-import com.baomidou.mybatisplus.annotation.TableField;
 import com.fasterxml.jackson.annotation.JsonProperty;

 import lombok.AllArgsConstructor;
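With the escape logic moved into the Jackson pair above, the stored `FileSetting` always holds the real tab character while the JSON wire format carries the readable two-character `\t` escape, replacing the manual escape/unescape calls deleted from `FileMappingController`. A quick round-trip sketch, assuming Lombok generates the usual no-arg constructor and accessors for `FileSetting`:

```java
import com.fasterxml.jackson.databind.ObjectMapper;

public class DelimiterRoundTrip {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        FileSetting setting = new FileSetting();
        setting.setDelimiter("\t");                 // a real tab in memory
        String json = mapper.writeValueAsString(setting);
        // The JSON text now contains "delimiter":"\\t" (backslash + 't',
        // not a raw tab), which is what the front-end displays and echoes
        FileSetting parsed = mapper.readValue(json, FileSetting.class);
        assert "\t".equals(parsed.getDelimiter()); // back to a real tab
    }
}
```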
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/LoadTask.java b/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/LoadTask.java
index d7c9023a..ce54ef79 100644
--- a/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/LoadTask.java
+++ b/hubble-be/src/main/java/com/baidu/hugegraph/entity/load/LoadTask.java
@@ -20,9 +20,9 @@
 package com.baidu.hugegraph.entity.load;

 import java.util.Date;
-import java.util.HashSet;
 import java.util.Set;
-import java.util.stream.Collectors;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;

 import com.baidu.hugegraph.annotation.MergeProperty;
 import com.baidu.hugegraph.entity.GraphConnection;
@@ -40,7 +40,6 @@
 import com.baomidou.mybatisplus.extension.handlers.JacksonTypeHandler;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;

 import lombok.AllArgsConstructor;
@@ -55,14 +54,19 @@
 @AllArgsConstructor
 @Builder
 @TableName(value = "load_task", autoResultMap = true)
-@JsonPropertyOrder({"id", "conn_id", "task_id" , "file_id", "file_name", "vertices",
-                    "edges", "load_rate", "load_progress", "file_total_lines",
-                    "file_read_lines", "status", "duration", "create_time"})
 public class LoadTask implements Runnable {

     @TableField(exist = false)
     @JsonIgnore
-    private transient HugeGraphLoader loader;
+    private transient final Lock lock = new ReentrantLock();
+
+    @TableField(exist = false)
+    @JsonIgnore
+    private transient volatile HugeGraphLoader loader;
+
+    @TableField(exist = false)
+    @JsonIgnore
+    private transient volatile boolean finished;

     @TableId(type = IdType.AUTO)
     @MergeProperty(useNew = false)
@@ -109,21 +113,27 @@ public class LoadTask implements Runnable {
     @JsonProperty("file_total_lines")
     private Long fileTotalLines;

+    @TableField("load_status")
+    @MergeProperty
+    @JsonProperty("status")
+    private volatile LoadStatus status;
+
     @TableField("file_read_lines")
     @MergeProperty
     @JsonProperty("file_read_lines")
     private Long fileReadLines;

-    @TableField("load_status")
+    @TableField("last_duration")
     @MergeProperty
-    @JsonProperty("status")
-    private LoadStatus status;
+    @JsonProperty("last_duration")
+    @JsonSerialize(using = SerializeUtil.DurationSerializer.class)
+    private Long lastDuration;

-    @TableField("duration")
+    @TableField("curr_duration")
     @MergeProperty
-    @JsonProperty("duration")
+    @JsonProperty("curr_duration")
     @JsonSerialize(using = SerializeUtil.DurationSerializer.class)
-    private Long duration;
+    private Long currDuration;

     @MergeProperty(useNew = false)
     @JsonProperty("create_time")
@@ -132,70 +142,87 @@ public class LoadTask implements Runnable {
     public LoadTask(LoadOptions options, GraphConnection connection,
                     FileMapping mapping) {
         this.loader = new HugeGraphLoader(options);
+        this.finished = false;
         this.id = null;
         this.connId = connection.getId();
         this.jobId = mapping.getJobId();
         this.fileId = mapping.getId();
         this.fileName = mapping.getName();
         this.options = options;
-        if (mapping.getVertexMappings() != null) {
-            this.vertices = mapping.getVertexMappings().stream()
-                                   .map(ElementMapping::getLabel)
-                                   .collect(Collectors.toSet());
-        } else {
-            this.vertices = new HashSet<>();
-        }
-        if (mapping.getEdgeMappings() != null) {
-            this.edges = mapping.getEdgeMappings().stream()
-                                .map(ElementMapping::getLabel)
-                                .collect(Collectors.toSet());
-        } else {
-            this.edges = new HashSet<>();
-        }
+        this.vertices = mapping.getVertexMappingLabels();
+        this.edges = mapping.getEdgeMappingLabels();
         this.fileTotalLines = mapping.getTotalLines();
-        this.fileReadLines = 0L;
         this.status = LoadStatus.RUNNING;
-        this.duration = 0L;
+        this.fileReadLines = 0L;
+        this.lastDuration = 0L;
+        this.currDuration = 0L;
         this.createTime = HubbleUtil.nowDate();
     }

     @Override
     public void run() {
-        log.info("LoadTaskMonitor is monitoring task : {}", this.id);
-        boolean succeed;
+        log.info("LoadTask is start running : {}", this.id);
+        boolean noError;
         try {
-            succeed = this.loader.load();
+            noError = this.loader.load();
         } catch (Throwable e) {
-            succeed = false;
+            noError = false;
             log.error("Run task {} failed. cause: {}", this.id, e.getMessage());
         }
-        // Pay attention to whether the user stops actively or
-        // the program stops by itself
-        if (this.status.inRunning()) {
-            if (succeed) {
-                this.status = LoadStatus.SUCCEED;
-            } else {
-                this.status = LoadStatus.FAILED;
+        this.lock.lock();
+        try {
+            // Pay attention to whether the user stops actively or
+            // the program stops by itself
+            if (this.status.inRunning()) {
+                if (noError) {
+                    this.status = LoadStatus.SUCCEED;
+                } else {
+                    this.status = LoadStatus.FAILED;
+                }
             }
+            this.fileReadLines = this.context().newProgress().totalInputReaded();
+            this.lastDuration += this.context().summary().totalTime();
+            this.currDuration = 0L;
+        } finally {
+            this.finished = true;
+            this.lock.unlock();
         }
-        this.fileReadLines = this.context().newProgress().totalInputReaded();
-        this.duration += this.context().summary().totalTime();
+    }
+
+    public void lock() {
+        this.lock.lock();
+    }
+
+    public void unlock() {
+        this.lock.unlock();
+    }
+
+    public void stop() {
+        this.context().stopLoading();
+        while (!this.finished) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException ignored) {
+                // pass
+            }
+        }
+        this.loader = null;
+        log.info("LoadTask {} stopped", this.id);
     }

     public void restoreContext() {
-        Ex.check(this.options != null,
-                 "The load options shouldn't be null");
+        Ex.check(this.options != null, "The load options shouldn't be null");
         this.loader = new HugeGraphLoader(this.options);
     }

     public LoadContext context() {
+        Ex.check(this.loader != null, "loader shouldn't be null");
         return this.loader.context();
     }

     @JsonProperty("load_progress")
     public int getLoadProgress() {
-        if (this.fileTotalLines == null || this.fileReadLines == null ||
-            this.fileTotalLines == 0) {
+        if (this.fileTotalLines == null || this.fileTotalLines == 0) {
             return 0;
         }
         Ex.check(this.fileTotalLines >= this.fileReadLines,
@@ -205,14 +232,26 @@ public int getLoadProgress() {
         return (int) (0.5 + (double) this.fileReadLines /
                             this.fileTotalLines * 100);
     }

+    @JsonProperty("duration")
+    @JsonSerialize(using = SerializeUtil.DurationSerializer.class)
+    public Long getDuration() {
+        this.lock.lock();
+        try {
+            return this.lastDuration + this.currDuration;
+        } finally {
+            this.lock.unlock();
+        }
+    }
+
     @JsonProperty("load_rate")
     public String getLoadRate() {
+        long readLines = this.fileReadLines;
+        long duration = this.getDuration();
         float rate;
-        if (this.fileReadLines == null || this.duration == null ||
-            this.duration == 0L) {
-            rate = 0;
+        if (readLines == 0L || duration == 0L) {
+            rate = 0.0F;
         } else {
-            rate = this.fileReadLines * 1000.0F / this.duration;
+            rate = readLines * 1000.0F / duration;
             rate = Math.round(rate * 1000) / 1000.0F;
         }
         return String.format("%s/s", rate);
     }
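Splitting the old single `duration` into `lastDuration` (folded in when a run finishes, see `run()` above) plus `currDuration` (the in-flight run) lets `getDuration()` report a total that survives pause/retry cycles. A self-contained sketch of the accounting; the periodic `tick()` caller is my assumption, since whatever refreshes `currDuration` lives outside this hunk:

```java
// Accounting model for lastDuration + currDuration in LoadTask
public class DurationAccounting {

    private long lastDuration;  // sum over finished runs
    private long currDuration;  // live value for the in-flight run
    private long runStart;

    public synchronized void beginRun() {
        this.runStart = System.currentTimeMillis();
        this.currDuration = 0L;
    }

    // A monitor would call this periodically while the task runs
    public synchronized void tick() {
        this.currDuration = System.currentTimeMillis() - this.runStart;
    }

    // Mirrors LoadTask.run(): fold the finished run into the total
    public synchronized void endRun() {
        this.lastDuration += System.currentTimeMillis() - this.runStart;
        this.currDuration = 0L;
    }

    // Mirrors LoadTask.getDuration(): monotonic total across runs
    public synchronized long duration() {
        return this.lastDuration + this.currDuration;
    }
}
```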
b/hubble-be/src/main/java/com/baidu/hugegraph/entity/query/ExecuteHistory.java @@ -89,4 +89,12 @@ public class ExecuteHistory implements Identifiable, Mergeable { @MergeProperty(useNew = false) @JsonProperty("create_time") private Date createTime; + + public void setAsyncStatus(AsyncTaskStatus status) { + this.asyncStatus = status; + } + + public void setAsyncStatus(String status) { + this.asyncStatus = AsyncTaskStatus.valueOf(status); + } } diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/entity/algorithm/AsyncTask.java b/hubble-be/src/main/java/com/baidu/hugegraph/entity/task/AsyncTask.java similarity index 98% rename from hubble-be/src/main/java/com/baidu/hugegraph/entity/algorithm/AsyncTask.java rename to hubble-be/src/main/java/com/baidu/hugegraph/entity/task/AsyncTask.java index 7e7519f0..6e278e17 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/entity/algorithm/AsyncTask.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/entity/task/AsyncTask.java @@ -17,7 +17,7 @@ * under the License. */ -package com.baidu.hugegraph.entity.algorithm; +package com.baidu.hugegraph.entity.task; import java.util.Date; diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/entity/algorithm/AsyncTaskResult.java b/hubble-be/src/main/java/com/baidu/hugegraph/entity/task/AsyncTaskResult.java similarity index 97% rename from hubble-be/src/main/java/com/baidu/hugegraph/entity/algorithm/AsyncTaskResult.java rename to hubble-be/src/main/java/com/baidu/hugegraph/entity/task/AsyncTaskResult.java index e0958299..aa26fbd9 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/entity/algorithm/AsyncTaskResult.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/entity/task/AsyncTaskResult.java @@ -17,7 +17,7 @@ * under the License. */ -package com.baidu.hugegraph.entity.algorithm; +package com.baidu.hugegraph.entity.task; import com.baidu.hugegraph.annotation.MergeProperty; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/handler/LoadTaskExecutor.java b/hubble-be/src/main/java/com/baidu/hugegraph/handler/LoadTaskExecutor.java index 5914db01..febac01e 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/handler/LoadTaskExecutor.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/handler/LoadTaskExecutor.java @@ -31,8 +31,10 @@ public class LoadTaskExecutor { @Async - public void execute(LoadTask task) { - log.info("LoadTask is executing run task:{}", task.getId()); + public void execute(LoadTask task, Runnable callback) { + log.info("Executing task: {}", task.getId()); task.run(); + log.info("Executed task: {}, update status to db", task.getId()); + callback.run(); } } diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/mapper/algorithm/AsyncTaskMapper.java b/hubble-be/src/main/java/com/baidu/hugegraph/mapper/algorithm/AsyncTaskMapper.java index d48152e7..17775190 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/mapper/algorithm/AsyncTaskMapper.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/mapper/algorithm/AsyncTaskMapper.java @@ -22,7 +22,7 @@ import org.apache.ibatis.annotations.Mapper; import org.springframework.stereotype.Component; -import com.baidu.hugegraph.entity.algorithm.AsyncTask; +import com.baidu.hugegraph.entity.task.AsyncTask; import com.baomidou.mybatisplus.core.mapper.BaseMapper; @Mapper diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/mapper/load/JobManagerMapper.java b/hubble-be/src/main/java/com/baidu/hugegraph/mapper/load/JobManagerMapper.java index c7074f3e..b01d9301 
100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/mapper/load/JobManagerMapper.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/mapper/load/JobManagerMapper.java @@ -19,8 +19,6 @@ package com.baidu.hugegraph.mapper.load; -import java.util.List; - import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; @@ -34,8 +32,9 @@ @Component public interface JobManagerMapper extends BaseMapper<JobManager> { - @Select("SELECT ISNULL(SUM(f.total_size),0) as total_size, ISNULL(SUM(l.duration),0) " + - "as duration FROM `load_task` as l LEFT JOIN `file_mapping` as f " + + @Select("SELECT ISNULL(SUM(f.total_size),0) as total_size, " + + "ISNULL(SUM(l.last_duration + l.curr_duration),0) as duration " + + "FROM `load_task` as l LEFT JOIN `file_mapping` as f " + "ON l.file_id=f.id WHERE l.job_id = #{job_id}") JobManagerItem computeSizeDuration(@Param("job_id") int job_id); } diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/options/HubbleOptions.java b/hubble-be/src/main/java/com/baidu/hugegraph/options/HubbleOptions.java index 4300d545..5377986b 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/options/HubbleOptions.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/options/HubbleOptions.java @@ -165,7 +165,7 @@ public static synchronized HubbleOptions instance() { "upload_file.location", "The location of uploaded files.", disallowEmpty(), - "./upload-files" + "upload-files" ); public static final ConfigListOption<String> UPLOAD_FILE_FORMAT_LIST = @@ -192,6 +192,16 @@ public static synchronized HubbleOptions instance() { 10 * Bytes.GB ); + public static final ConfigOption<Long> UPLOAD_FILE_MAX_TIME_CONSUMING = + new ConfigOption<>( + "upload_file.max_uploading_time", + "The maximum allowed uploading time (in seconds) for file " + + "uploads; the uploaded file parts will be cleared if " + + "this time is exceeded", + positiveInt(), + 12L * 60 * 60 + ); + public static final ConfigOption<String> SERVER_PROTOCOL = new ConfigOption<>( "server.protocol", diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/service/GraphConnectionService.java b/hubble-be/src/main/java/com/baidu/hugegraph/service/GraphConnectionService.java index 1652b612..3073acf3 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/service/GraphConnectionService.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/service/GraphConnectionService.java @@ -28,6 +28,7 @@ import org.springframework.util.StringUtils; import com.baidu.hugegraph.entity.GraphConnection; +import com.baidu.hugegraph.exception.InternalException; import com.baidu.hugegraph.mapper.GraphConnectionMapper; import com.baidu.hugegraph.util.SQLUtil; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; @@ -67,17 +68,23 @@ public int count() { } @Transactional(isolation = Isolation.READ_COMMITTED) - public int save(GraphConnection connection) { - return this.mapper.insert(connection); + public void save(GraphConnection connection) { + if (this.mapper.insert(connection) != 1) { + throw new InternalException("entity.insert.failed", connection); + } } @Transactional(isolation = Isolation.READ_COMMITTED) - public int update(GraphConnection connection) { - return this.mapper.updateById(connection); + public void update(GraphConnection connection) { + if (this.mapper.updateById(connection) != 1) { + throw new InternalException("entity.update.failed", connection); + } }
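
Each save/update/remove now returns void and throws InternalException when the affected row count isn't 1; the same three-line check recurs in every service touched by this patch. A hedged sketch of one way it could be factored out (MapperChecks and ensureOneRowAffected are hypothetical names, not part of the patch):

    import com.baidu.hugegraph.exception.InternalException;

    // Hypothetical helper, not in this patch: centralizes the repeated
    // rows-affected check used by the save/update/remove methods.
    public final class MapperChecks {

        private MapperChecks() {
        }

        public static void ensureOneRowAffected(int rows, String message,
                                                Object arg) {
            if (rows != 1) {
                throw new InternalException(message, arg);
            }
        }
    }

    // Example call site inside a service:
    // MapperChecks.ensureOneRowAffected(this.mapper.insert(connection),
    //                                   "entity.insert.failed", connection);

@Transactional(isolation = Isolation.READ_COMMITTED) - public int remove(int id) { - return this.mapper.deleteById(id); + public void remove(int id) { + if 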
(this.mapper.deleteById(id) != 1) { + throw new InternalException("entity.delete.failed", id); + } } } diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/service/UserInfoService.java b/hubble-be/src/main/java/com/baidu/hugegraph/service/UserInfoService.java index 1c2db392..5c79e0c7 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/service/UserInfoService.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/service/UserInfoService.java @@ -23,6 +23,7 @@ import org.springframework.stereotype.Service; import com.baidu.hugegraph.entity.UserInfo; +import com.baidu.hugegraph.exception.InternalException; import com.baidu.hugegraph.mapper.UserInfoMapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; @@ -39,11 +40,15 @@ public UserInfo getByName(String name) { return this.mapper.selectOne(query); } - public int save(UserInfo userInfo) { - return this.mapper.insert(userInfo); + public void save(UserInfo userInfo) { + if (this.mapper.insert(userInfo) != 1) { + throw new InternalException("entity.insert.failed", userInfo); + } } - public int update(UserInfo userInfo) { - return this.mapper.updateById(userInfo); + public void update(UserInfo userInfo) { + if (this.mapper.updateById(userInfo) != 1) { + throw new InternalException("entity.update.failed", userInfo); + } } } diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/service/algorithm/AsyncTaskService.java b/hubble-be/src/main/java/com/baidu/hugegraph/service/algorithm/AsyncTaskService.java index 1305890f..76cb1a99 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/service/algorithm/AsyncTaskService.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/service/algorithm/AsyncTaskService.java @@ -20,9 +20,7 @@ package com.baidu.hugegraph.service.algorithm; import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; -import java.util.Iterator; import java.util.List; import org.springframework.beans.factory.annotation.Autowired; @@ -63,11 +61,9 @@ public IPage list(int connId, int pageNo, int pageSize, String content, if (status.isEmpty()) { status = null; } - List list = client.task().list(status); + List tasks = client.task().list(status); List result = new ArrayList<>(); - Iterator tasks = list.iterator(); - while (tasks.hasNext()) { - Task task = tasks.next(); + for (Task task : tasks) { if (!type.isEmpty() && !type.equals(task.type())) { continue; } @@ -79,7 +75,7 @@ public IPage list(int connId, int pageNo, int pageSize, String content, } result.add(task); } - Collections.sort(result, Comparator.comparing(Task::createTime).reversed()); + result.sort(Comparator.comparing(Task::createTime).reversed()); return PageUtil.page(result, pageNo, pageSize); } diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/service/algorithm/OltpAlgoService.java b/hubble-be/src/main/java/com/baidu/hugegraph/service/algorithm/OltpAlgoService.java index 0751813e..865525cc 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/service/algorithm/OltpAlgoService.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/service/algorithm/OltpAlgoService.java @@ -1,3 +1,22 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + package com.baidu.hugegraph.service.algorithm; import java.util.ArrayList; @@ -20,7 +39,6 @@ import com.baidu.hugegraph.entity.query.GremlinResult; import com.baidu.hugegraph.entity.query.JsonView; import com.baidu.hugegraph.entity.query.TableView; -import com.baidu.hugegraph.exception.InternalException; import com.baidu.hugegraph.service.HugeClientPoolService; import com.baidu.hugegraph.service.query.ExecuteHistoryService; import com.baidu.hugegraph.structure.graph.Edge; @@ -60,12 +78,9 @@ public GremlinResult shortestPath(int connId, ShortestPath body) { ExecuteStatus status = ExecuteStatus.SUCCESS; ExecuteHistory history; history = new ExecuteHistory(null, connId, 0L, ExecuteType.ALGORITHM, - body.toString(), status, AsyncTaskStatus.UNKNOWN, - -1L, createTime); - int rows = this.historyService.save(history); - if (rows != 1) { - throw new InternalException("entity.insert.failed", history); - } + body.toString(), status, + AsyncTaskStatus.UNKNOWN, -1L, createTime); + this.historyService.save(history); return GremlinResult.builder() .type(GremlinResult.Type.PATH) .jsonView(jsonView) @@ -95,8 +110,7 @@ private GraphView buildPathGraphView(Path result) { Map vertices = new HashMap<>(); Map edges = new HashMap<>(); - List ids = new ArrayList<>(); - List elements = ((Path) result).objects(); + List elements = result.objects(); for (Object element : elements) { if (element instanceof Vertex) { Vertex vertex = (Vertex) element; diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/service/graph/GraphService.java b/hubble-be/src/main/java/com/baidu/hugegraph/service/graph/GraphService.java index 86d2f3d3..21bf7a95 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/service/graph/GraphService.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/service/graph/GraphService.java @@ -193,7 +193,7 @@ private void fillProperties(int connId, SchemaLabelEntity schema, } } - private class EdgeHolder { + private static class EdgeHolder { private Edge edge; private Vertex source; diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/service/load/FileMappingService.java b/hubble-be/src/main/java/com/baidu/hugegraph/service/load/FileMappingService.java index 98b84e8e..cdd14e92 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/service/load/FileMappingService.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/service/load/FileMappingService.java @@ -30,24 +30,35 @@ import java.io.OutputStream; import java.nio.file.Paths; import java.util.Arrays; +import java.util.Date; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; import java.util.regex.Pattern; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; +import 
org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Isolation; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.multipart.MultipartFile; +import com.baidu.hugegraph.config.HugeConfig; +import com.baidu.hugegraph.entity.enums.FileMappingStatus; import com.baidu.hugegraph.entity.load.FileMapping; import com.baidu.hugegraph.entity.load.FileSetting; import com.baidu.hugegraph.entity.load.FileUploadResult; import com.baidu.hugegraph.exception.InternalException; import com.baidu.hugegraph.mapper.load.FileMappingMapper; +import com.baidu.hugegraph.options.HubbleOptions; import com.baidu.hugegraph.util.Ex; +import com.baidu.hugegraph.util.HubbleUtil; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.toolkit.Wrappers; @@ -63,17 +74,23 @@ public class FileMappingService { public static final String JOB_PREIFX = "job-"; public static final String FILE_PREIFX = "file-mapping-"; + @Autowired + private HugeConfig config; @Autowired private FileMappingMapper mapper; - public FileMapping get(int id) { - return this.mapper.selectById(id); + private final Map<String, ReadWriteLock> uploadingTokenLocks; + + public FileMappingService() { + this.uploadingTokenLocks = new ConcurrentHashMap<>(); } - public FileMapping get(int connId, String fileName) { - QueryWrapper<FileMapping> query = Wrappers.query(); - query.eq("conn_id", connId).eq("name", fileName); - return this.mapper.selectOne(query); + public Map<String, ReadWriteLock> getUploadingTokenLocks() { + return this.uploadingTokenLocks; + } + + public FileMapping get(int id) { + return this.mapper.selectById(id); } public FileMapping get(int connId, int jobId, String fileName) { @@ -92,51 +109,65 @@ public IPage<FileMapping> list(int connId, int jobId, int pageNo, int pageSize) QueryWrapper<FileMapping> query = Wrappers.query(); query.eq("conn_id", connId); query.eq("job_id", jobId); + query.eq("file_status", FileMappingStatus.COMPLETED.getValue()); query.orderByDesc("create_time"); Page<FileMapping> page = new Page<>(pageNo, pageSize); return this.mapper.selectPage(page, query); } @Transactional(isolation = Isolation.READ_COMMITTED) - public int save(FileMapping mapping) { - return this.mapper.insert(mapping); + public void save(FileMapping mapping) { + if (this.mapper.insert(mapping) != 1) { + throw new InternalException("entity.insert.failed", mapping); + } } @Transactional(isolation = Isolation.READ_COMMITTED) - public int update(FileMapping mapping) { - return this.mapper.updateById(mapping); + public void update(FileMapping mapping) { + if (this.mapper.updateById(mapping) != 1) { + throw new InternalException("entity.update.failed", mapping); + } } @Transactional(isolation = Isolation.READ_COMMITTED) - public int remove(int id) { - return this.mapper.deleteById(id); + public void remove(int id) { + if (this.mapper.deleteById(id) != 1) { + throw new InternalException("entity.delete.failed", id); + } + } + + public String generateFileToken(String fileName) { + return HubbleUtil.md5(fileName) + "-" + + HubbleUtil.nowTime().getEpochSecond(); }
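
generateFileToken() keys an upload by md5(fileName) plus the epoch second, and uploadingTokenLocks maps each token to a ReadWriteLock. A hedged sketch of how such a per-token lock map is commonly used, with part uploads sharing the read lock and merge/cleanup taking the write lock (class and method names here are illustrative assumptions, not this service's API):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class TokenLockSketch {

        private final Map<String, ReadWriteLock> locks =
                                                 new ConcurrentHashMap<>();

        // Parts of the same file share the read lock, so they can be
        // written concurrently.
        public void uploadPart(String token, Runnable savePart) {
            ReadWriteLock lock = this.locks.computeIfAbsent(
                                 token, t -> new ReentrantReadWriteLock());
            lock.readLock().lock();
            try {
                savePart.run();
            } finally {
                lock.readLock().unlock();
            }
        }

        // Merge or cleanup takes the write lock, so it only runs once no
        // part of that file is still being written.
        public void mergeParts(String token, Runnable merge) {
            ReadWriteLock lock = this.locks.computeIfAbsent(
                                 token, t -> new ReentrantReadWriteLock());
            lock.writeLock().lock();
            try {
                merge.run();
            } finally {
                lock.writeLock().unlock();
            }
        }
    }

public FileUploadResult uploadFile(MultipartFile srcFile, int index, String dirPath) { + FileUploadResult result = new FileUploadResult(); + // Current part saved path + String partName = srcFile.getOriginalFilename(); + result.setName(partName); + result.setSize(srcFile.getSize()); + + File destFile = new 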
File(dirPath, partName + "-" + index); // File all parts saved path File dir = new File(dirPath); if (!dir.exists()) { dir.mkdirs(); } - // Current part saved path - String fileName = srcFile.getOriginalFilename(); - File destFile = new File(dirPath, fileName + "-" + index); if (destFile.exists()) { destFile.delete(); } - log.debug("Upload file {} length {}", fileName, srcFile.getSize()); - FileUploadResult result = new FileUploadResult(); - result.setName(fileName); - result.setSize(srcFile.getSize()); + log.debug("Uploading file {} length {}", partName, srcFile.getSize()); try { // transferTo should accept absolute path srcFile.transferTo(destFile.getAbsoluteFile()); result.setStatus(FileUploadResult.Status.SUCCESS); + log.debug("Uploaded file part {}-{}", partName, index); } catch (Exception e) { - log.error("Failed to save upload file and insert file mapping " + - "record", e); + log.error("Failed to save upload file and insert " + + "file mapping record", e); result.setStatus(FileUploadResult.Status.FAILURE); result.setCause(e.getMessage()); } @@ -160,7 +191,9 @@ public boolean tryMergePartFiles(String dirPath, int total) { // Rename file to dest file FileUtils.moveFile(partFiles[0], newFile); } catch (IOException e) { - throw new InternalException("load.upload.move-file.failed"); + log.error("Failed to rename file from {} to {}", + partFiles[0], newFile, e); + throw new InternalException("load.upload.move-file.failed", e); } } else { Arrays.sort(partFiles, (o1, o2) -> { @@ -178,19 +211,24 @@ public boolean tryMergePartFiles(String dirPath, int total) { try (InputStream is = new FileInputStream(partFile)) { IOUtils.copy(is, os); } catch (IOException e) { + log.error("Failed to copy file stream from {} to {}", + partFile, newFile, e); throw new InternalException( - "load.upload.merge-file.failed"); + "load.upload.merge-file.failed", e); } } } catch (IOException e) { - throw new InternalException("load.upload.merge-file.failed"); + log.error("Failed to copy all file-parts stream to {}", + newFile, e); + throw new InternalException("load.upload.merge-file.failed", e); } } // Delete origin directory try { FileUtils.forceDelete(dir); } catch (IOException e) { - throw new InternalException("load.upload.delete-temp-dir.failed"); + log.error("Failed to force delete file {}", dir, e); + throw new InternalException("load.upload.delete-temp-dir.failed", e); } // Rename file to dest file if (!newFile.renameTo(destFile)) { @@ -268,15 +306,61 @@ public String moveToNextLevelDir(FileMapping mapping) { public void deleteDiskFile(FileMapping mapping) { File file = new File(mapping.getPath()); - File parentDir = file.getParentFile(); - log.info("Prepare to delete directory {}", parentDir); - try { - FileUtils.forceDelete(parentDir); - } catch (IOException e) { - throw new InternalException("Failed to delete directory " + - "corresponded to the file id %s, " + - "please delete it manually", - e, mapping.getId()); + if (file.isDirectory()) { + log.info("Prepare to delete directory {}", file); + try { + FileUtils.forceDelete(file); + } catch (IOException e) { + throw new InternalException("Failed to delete directory " + + "corresponded to the file id %s, " + + "please delete it manually", + e, mapping.getId()); + } + } else { + File parentDir = file.getParentFile(); + log.info("Prepare to delete directory {}", parentDir); + try { + FileUtils.forceDelete(parentDir); + } catch (IOException e) { + throw new InternalException("Failed to delete parent directory " + + "corresponded to the file id %s, " + + 
"please delete it manually", + e, mapping.getId()); + } + } + } + + @Async + @Scheduled(fixedRate = 10 * 60 * 1000) + public void deleteUnfinishedFile() { + QueryWrapper query = Wrappers.query(); + query.in("file_status", FileMappingStatus.UPLOADING.getValue()); + List mappings = this.mapper.selectList(query); + long threshold = this.config.get( + HubbleOptions.UPLOAD_FILE_MAX_TIME_CONSUMING) * 1000; + Date now = HubbleUtil.nowDate(); + for (FileMapping mapping : mappings) { + Date updateTime = mapping.getUpdateTime(); + long duration = now.getTime() - updateTime.getTime(); + if (duration > threshold) { + String filePath = mapping.getPath(); + try { + FileUtils.forceDelete(new File(filePath)); + } catch (IOException e) { + log.warn("Failed to delete expired uploading file {}", + filePath, e); + } + this.remove(mapping.getId()); + // Delete corresponding uploading tokens + Iterator> iter; + iter = this.uploadingTokenLocks.entrySet().iterator(); + iter.forEachRemaining(entry -> { + String token = entry.getKey(); + if (token.startsWith(mapping.getName())) { + iter.remove(); + } + }); + } } } } diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/service/load/JobManagerService.java b/hubble-be/src/main/java/com/baidu/hugegraph/service/load/JobManagerService.java index 9b07a74f..c77aa908 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/service/load/JobManagerService.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/service/load/JobManagerService.java @@ -20,7 +20,6 @@ package com.baidu.hugegraph.service.load; import java.util.Date; -import java.util.Iterator; import java.util.List; import org.springframework.beans.factory.annotation.Autowired; @@ -28,7 +27,7 @@ import org.springframework.transaction.annotation.Isolation; import org.springframework.transaction.annotation.Transactional; -import com.baidu.hugegraph.entity.enums.JobManagerStatus; +import com.baidu.hugegraph.entity.enums.JobStatus; import com.baidu.hugegraph.entity.enums.LoadStatus; import com.baidu.hugegraph.entity.load.JobManager; import com.baidu.hugegraph.entity.load.LoadTask; @@ -59,9 +58,9 @@ public JobManager get(int id) { return this.mapper.selectById(id); } - public JobManager getTask(String job_name, int connId) { + public JobManager getTask(String jobName, int connId) { QueryWrapper query = Wrappers.query(); - query.eq("job_name", job_name); + query.eq("job_name", jobName); query.eq("conn_id", connId); return this.mapper.selectOne(query); } @@ -79,38 +78,33 @@ public IPage list(int connId, int pageNo, int pageSize, String conte query.orderByDesc("create_time"); Page page = new Page<>(pageNo, pageSize); IPage list = this.mapper.selectPage(page, query); - list.getRecords().forEach((p) -> { - if (p.getJobStatus() == JobManagerStatus.IMPORTING) { - List tasks = this.taskService.taskListByJob(p.getId()); - JobManagerStatus status = JobManagerStatus.SUCCESS; - Iterator loadTasks = tasks.iterator(); - while (loadTasks.hasNext()) { - LoadTask loadTask = loadTasks.next(); + list.getRecords().forEach(task -> { + if (task.getJobStatus() == JobStatus.LOADING) { + List tasks = this.taskService.taskListByJob(task.getId()); + JobStatus status = JobStatus.SUCCESS; + for (LoadTask loadTask : tasks) { if (loadTask.getStatus().inRunning() || loadTask.getStatus() == LoadStatus.PAUSED || loadTask.getStatus() == LoadStatus.STOPPED) { - status = JobManagerStatus.IMPORTING; + status = JobStatus.LOADING; break; } if (loadTask.getStatus() == LoadStatus.FAILED) { - status = JobManagerStatus.FAILED; + status = JobStatus.FAILED; 
diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/service/load/JobManagerService.java b/hubble-be/src/main/java/com/baidu/hugegraph/service/load/JobManagerService.java index 9b07a74f..c77aa908 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/service/load/JobManagerService.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/service/load/JobManagerService.java @@ -20,7 +20,6 @@ package com.baidu.hugegraph.service.load; import java.util.Date; -import java.util.Iterator; import java.util.List; import org.springframework.beans.factory.annotation.Autowired; @@ -28,7 +27,7 @@ import org.springframework.transaction.annotation.Isolation; import org.springframework.transaction.annotation.Transactional; -import com.baidu.hugegraph.entity.enums.JobManagerStatus; +import com.baidu.hugegraph.entity.enums.JobStatus; import com.baidu.hugegraph.entity.enums.LoadStatus; import com.baidu.hugegraph.entity.load.JobManager; import com.baidu.hugegraph.entity.load.LoadTask; @@ -59,9 +58,9 @@ public JobManager get(int id) { return this.mapper.selectById(id); } - public JobManager getTask(String job_name, int connId) { + public JobManager getTask(String jobName, int connId) { QueryWrapper<JobManager> query = Wrappers.query(); - query.eq("job_name", job_name); + query.eq("job_name", jobName); query.eq("conn_id", connId); return this.mapper.selectOne(query); } @@ -79,38 +78,33 @@ public IPage<JobManager> list(int connId, int pageNo, int pageSize, String conte query.orderByDesc("create_time"); Page<JobManager> page = new Page<>(pageNo, pageSize); IPage<JobManager> list = this.mapper.selectPage(page, query); - list.getRecords().forEach((p) -> { - if (p.getJobStatus() == JobManagerStatus.IMPORTING) { - List<LoadTask> tasks = this.taskService.taskListByJob(p.getId()); - JobManagerStatus status = JobManagerStatus.SUCCESS; - Iterator<LoadTask> loadTasks = tasks.iterator(); - while (loadTasks.hasNext()) { - LoadTask loadTask = loadTasks.next(); + list.getRecords().forEach(task -> { + if (task.getJobStatus() == JobStatus.LOADING) { + List<LoadTask> tasks = this.taskService.taskListByJob(task.getId()); + JobStatus status = JobStatus.SUCCESS; + for (LoadTask loadTask : tasks) { if (loadTask.getStatus().inRunning() || loadTask.getStatus() == LoadStatus.PAUSED || loadTask.getStatus() == LoadStatus.STOPPED) { - status = JobManagerStatus.IMPORTING; + status = JobStatus.LOADING; break; } if (loadTask.getStatus() == LoadStatus.FAILED) { - status = JobManagerStatus.FAILED; + status = JobStatus.FAILED; break; } } - if (status == JobManagerStatus.SUCCESS || - status == JobManagerStatus.FAILED) { - p.setJobStatus(status); - if (this.update(p) != 1) { - throw new InternalException("job-manager.entity.update.failed", - p); - } + if (status == JobStatus.SUCCESS || + status == JobStatus.FAILED) { + task.setJobStatus(status); + this.update(task); } } - Date endDate = (p.getJobStatus() == JobManagerStatus.FAILED || - p.getJobStatus() == JobManagerStatus.SUCCESS) ? - p.getUpdateTime() : HubbleUtil.nowDate(); - p.setJobDuration(endDate.getTime() - p.getCreateTime().getTime()); + Date endDate = task.getJobStatus() == JobStatus.FAILED || + task.getJobStatus() == JobStatus.SUCCESS ? + task.getUpdateTime() : HubbleUtil.nowDate(); + task.setJobDuration(endDate.getTime() - task.getCreateTime().getTime()); }); return list; } @@ -120,17 +114,23 @@ public List<JobManager> listAll() { } @Transactional(isolation = Isolation.READ_COMMITTED) - public int remove(int id) { - return this.mapper.deleteById(id); + public void save(JobManager entity) { + if (this.mapper.insert(entity) != 1) { + throw new InternalException("entity.insert.failed", entity); + } } @Transactional(isolation = Isolation.READ_COMMITTED) - public int save(JobManager entity) { - return this.mapper.insert(entity); + public void update(JobManager entity) { + if (this.mapper.updateById(entity) != 1) { + throw new InternalException("entity.update.failed", entity); + } } @Transactional(isolation = Isolation.READ_COMMITTED) - public int update(JobManager entity) { - return this.mapper.updateById(entity); + public void remove(int id) { + if (this.mapper.deleteById(id) != 1) { + throw new InternalException("entity.delete.failed", id); + } } } diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/service/load/LoadTaskService.java b/hubble-be/src/main/java/com/baidu/hugegraph/service/load/LoadTaskService.java index 6514be60..b4ce9833 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/service/load/LoadTaskService.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/service/load/LoadTaskService.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -42,12 +41,10 @@ import com.baidu.hugegraph.common.Constant; import com.baidu.hugegraph.config.HugeConfig; import com.baidu.hugegraph.entity.GraphConnection; -import com.baidu.hugegraph.entity.enums.JobManagerStatus; import com.baidu.hugegraph.entity.enums.LoadStatus; import com.baidu.hugegraph.entity.load.EdgeMapping; import com.baidu.hugegraph.entity.load.FileMapping; import com.baidu.hugegraph.entity.load.FileSetting; -import com.baidu.hugegraph.entity.load.JobManager; import com.baidu.hugegraph.entity.load.ListFormat; import com.baidu.hugegraph.entity.load.LoadParameter; import com.baidu.hugegraph.entity.load.LoadTask; @@ -68,7 +65,6 @@ import com.baidu.hugegraph.service.schema.EdgeLabelService; import com.baidu.hugegraph.service.schema.VertexLabelService; import com.baidu.hugegraph.util.Ex; -import com.baidu.hugegraph.util.HubbleUtil; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.toolkit.Wrappers; @@ -95,14 +91,10 @@ public class LoadTaskService { @Autowired private HugeConfig config; - private Map<Integer, LoadTask> taskContainer; + private Map<Integer, LoadTask> runningTaskContainer; public LoadTaskService() { - this.taskContainer = new ConcurrentHashMap<>(); - } - - public Map<Integer, LoadTask> 
getTaskContainer() { - return this.taskContainer; + this.runningTaskContainer = new ConcurrentHashMap<>(); } public LoadTask get(int id) { @@ -143,59 +135,60 @@ public List<LoadTask> taskListByJob(int jobId) { } @Transactional(isolation = Isolation.READ_COMMITTED) - public int save(LoadTask entity) { - return this.mapper.insert(entity); + public void save(LoadTask entity) { + if (this.mapper.insert(entity) != 1) { + throw new InternalException("entity.insert.failed", entity); + } } @Transactional(isolation = Isolation.READ_COMMITTED) - public int update(LoadTask entity) { - return this.mapper.updateById(entity); + public void update(LoadTask entity) { + if (this.mapper.updateById(entity) != 1) { + throw new InternalException("entity.update.failed", entity); + } } @Transactional(isolation = Isolation.READ_COMMITTED) - public int remove(int id) { - this.taskContainer.remove(id); - return this.mapper.deleteById(id); + public void remove(int id) { + this.runningTaskContainer.remove(id); + if (this.mapper.deleteById(id) != 1) { + throw new InternalException("entity.delete.failed", id); + } } - public List<LoadTask> batchTasks(int job_id) { + public List<LoadTask> batchTasks(int jobId) { QueryWrapper<LoadTask> query = Wrappers.query(); - query.eq("job_id", job_id); + query.eq("job_id", jobId); return this.mapper.selectList(query); } public LoadTask start(GraphConnection connection, FileMapping fileMapping) { - connection = sslService.configSSL(this.config, connection); + connection = this.sslService.configSSL(this.config, connection); LoadTask task = this.buildLoadTask(connection, fileMapping); - if (this.save(task) != 1) { - throw new InternalException("entity.insert.failed", task); - } - this.taskExecutor.execute(task); - - if (this.update(task) != 1) { - throw new InternalException("entity.update.failed", task); - } + this.save(task); + // Executed in other threads + this.taskExecutor.execute(task, () -> this.update(task)); // Save current load task - this.taskContainer.put(task.getId(), task); + this.runningTaskContainer.put(task.getId(), task); return task; } public LoadTask pause(int taskId) { - LoadTask task = this.taskContainer.get(taskId); + LoadTask task = this.runningTaskContainer.get(taskId); Ex.check(task.getStatus() == LoadStatus.RUNNING, "Can only pause the RUNNING task"); - LoadContext context = task.context(); // Mark status as paused, should set before context.stopLoading() task.setStatus(LoadStatus.PAUSED); // Let HugeGraphLoader stop - context.stopLoading(); + task.stop(); - task.setFileReadLines(context.newProgress().totalInputReaded()); - task.setDuration(context.summary().totalTime()); - if (update(task) != 1) { - throw new InternalException("entity.update.failed", task); + task.lock(); + try { + this.update(task); + this.runningTaskContainer.remove(taskId); + } finally { + task.unlock(); } - this.taskContainer.remove(taskId); return task; } @@ -204,21 +197,23 @@ public LoadTask resume(int taskId) { Ex.check(task.getStatus() == LoadStatus.PAUSED || task.getStatus() == LoadStatus.FAILED, "Can only resume the PAUSED or FAILED task"); - task.restoreContext(); - task.setStatus(LoadStatus.RUNNING); - // Set work mode in incrental mode, load from last breakpoint - task.context().options().incrementalMode = true; - this.taskExecutor.execute(task); - - if (this.update(task) != 1) { - throw new InternalException("entity.update.failed", task); + task.lock(); + try { + // Set work mode in incremental mode, load from the last breakpoint + task.getOptions().incrementalMode = true; + task.restoreContext(); + 
task.setStatus(LoadStatus.RUNNING); + this.update(task); + this.taskExecutor.execute(task, () -> this.update(task)); + this.runningTaskContainer.put(taskId, task); + } finally { + task.unlock(); } - this.taskContainer.put(taskId, task); return task; } public LoadTask stop(int taskId) { - LoadTask task = this.taskContainer.get(taskId); + LoadTask task = this.runningTaskContainer.get(taskId); if (task == null) { task = this.get(taskId); task.restoreContext(); @@ -226,17 +221,17 @@ Ex.check(task.getStatus() == LoadStatus.RUNNING || task.getStatus() == LoadStatus.PAUSED, "Can only stop the RUNNING or PAUSED task"); - LoadContext context = task.context(); // Mark status as stopped task.setStatus(LoadStatus.STOPPED); - context.stopLoading(); + task.stop(); - task.setFileReadLines(context.newProgress().totalInputReaded()); - task.setDuration(context.summary().totalTime()); - if (update(task) != 1) { - throw new InternalException("entity.update.failed", task); + task.lock(); + try { + this.update(task); + this.runningTaskContainer.remove(taskId); + } finally { + task.unlock(); } - this.taskContainer.remove(taskId); return task; } @@ -245,17 +240,20 @@ public LoadTask retry(int taskId) { Ex.check(task.getStatus() == LoadStatus.FAILED || task.getStatus() == LoadStatus.STOPPED, "Can only retry the FAILED or STOPPED task"); - task.restoreContext(); - - task.setStatus(LoadStatus.RUNNING); - // Set work mode in normal mode, load from begin - task.context().options().incrementalMode = false; - this.taskExecutor.execute(task); - - if (this.update(task) != 1) { - throw new InternalException("entity.update.failed", task); + task.lock(); + try { + // Set work mode in normal mode, load from the beginning + task.getOptions().incrementalMode = false; + task.restoreContext(); + task.setStatus(LoadStatus.RUNNING); + task.setLastDuration(0L); + task.setCurrDuration(0L); + this.update(task); + this.taskExecutor.execute(task, () -> this.update(task)); + this.runningTaskContainer.put(taskId, task); + } finally { + task.unlock(); } - this.taskContainer.put(taskId, task); return task; } @@ -294,21 +292,36 @@ public void pauseAllTasks() { * Update progress periodically */ @Async - @Scheduled(fixedDelay = 5 * 1000) + @Scheduled(fixedDelay = 1 * 1000) @Transactional(isolation = Isolation.READ_COMMITTED) public void updateLoadTaskProgress() { - Map<Integer, LoadTask> taskContainer = this.getTaskContainer(); - Iterator<Map.Entry<Integer, LoadTask>> iter = taskContainer.entrySet() - .iterator(); - iter.forEachRemaining(entry -> { - LoadTask task = entry.getValue(); - LoadContext context = task.context(); - task.setFileReadLines(context.newProgress().totalInputReaded()); - task.setDuration(context.summary().totalTime()); - if (this.update(task) != 1) { - throw new InternalException("entity.update.failed", task); + for (LoadTask task : this.runningTaskContainer.values()) { + if (!task.getStatus().inRunning()) { + continue; } - }); + task.lock(); + try { + if (task.getStatus().inRunning()) { + LoadContext context = task.context(); + long readLines = context.newProgress().totalInputReaded(); + if (readLines == 0L) { + /* + * When the Context has just been constructed, newProgress
+ * is empty; only after parsing starts does the loader use + * oldProgress and incrementally update newProgress. If + * totalInputReaded is fetched during this window it + * returns 0, so read it from oldProgress instead + */ + readLines = context.oldProgress().totalInputReaded(); + } + task.setFileReadLines(readLines); + task.setCurrDuration(context.summary().totalTime()); + this.update(task); + } + } finally { + task.unlock(); + } + } } private LoadTask buildLoadTask(GraphConnection connection, diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/service/query/ExecuteHistoryService.java b/hubble-be/src/main/java/com/baidu/hugegraph/service/query/ExecuteHistoryService.java index 5a371477..bdc18a29 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/service/query/ExecuteHistoryService.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/service/query/ExecuteHistoryService.java @@ -34,6 +34,7 @@ import com.baidu.hugegraph.entity.enums.AsyncTaskStatus; import com.baidu.hugegraph.entity.enums.ExecuteType; import com.baidu.hugegraph.entity.query.ExecuteHistory; +import com.baidu.hugegraph.exception.InternalException; import com.baidu.hugegraph.mapper.query.ExecuteHistoryMapper; import com.baidu.hugegraph.options.HubbleOptions; import com.baidu.hugegraph.service.HugeClientPoolService; @@ -79,9 +80,10 @@ public IPage<ExecuteHistory> list(int connId, long current, long pageSize) { if (p.getType().equals(ExecuteType.GREMLIN_ASYNC)) { try { Task task = client.task().get(p.getAsyncId()); - long endDate = task.updateTime() > 0 ? task.updateTime() : now.getLong(ChronoField.INSTANT_SECONDS); + long endDate = task.updateTime() > 0 ? task.updateTime() : + now.getLong(ChronoField.INSTANT_SECONDS); p.setDuration(endDate - task.createTime()); - p.setAsyncStatus(AsyncTaskStatus.valueOf(task.status().toUpperCase())); + p.setAsyncStatus(task.status().toUpperCase()); } catch (Exception e) { p.setDuration(0L); p.setAsyncStatus(AsyncTaskStatus.UNKNOWN); @@ -99,7 +101,7 @@ public ExecuteHistory get(int connId, int id) { try { Task task = client.task().get(history.getAsyncId()); history.setDuration(task.updateTime() - task.createTime()); - history.setAsyncStatus(AsyncTaskStatus.valueOf(task.status().toUpperCase())); + history.setAsyncStatus(task.status().toUpperCase()); } catch (Exception e) { history.setDuration(0L); history.setAsyncStatus(AsyncTaskStatus.UNKNOWN); @@ -109,23 +111,29 @@ } @Transactional(isolation = Isolation.READ_COMMITTED) - public int save(ExecuteHistory history) { - return this.mapper.insert(history); + public void save(ExecuteHistory history) { + if (this.mapper.insert(history) != 1) { + throw new InternalException("entity.insert.failed", history); + } } @Transactional(isolation = Isolation.READ_COMMITTED) - public int update(ExecuteHistory history) { - return this.mapper.updateById(history); + public void update(ExecuteHistory history) { + if (this.mapper.updateById(history) != 1) { + throw new InternalException("entity.update.failed", history); + } } @Transactional(isolation = Isolation.READ_COMMITTED) - public int remove(int connId, int id) { + public void remove(int connId, int id) { ExecuteHistory history = this.mapper.selectById(id); HugeClient client = this.getClient(connId); if (history.getType().equals(ExecuteType.GREMLIN_ASYNC)) { client.task().delete(history.getAsyncId()); } - return this.mapper.deleteById(id); + if (this.mapper.deleteById(id) != 1) { + throw new InternalException("entity.delete.failed", history); + } } @Async
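
Both call sites above now pass the raw status string and let the overloaded setAsyncStatus(String) added to ExecuteHistory earlier in this patch do the enum conversion. AsyncTaskStatus.valueOf still throws IllegalArgumentException for any status the server introduces later, which currently relies on the surrounding catch blocks to fall back to UNKNOWN. A hedged sketch of a more defensive variant of that setter (illustrative only, not the patch's code):

    // Illustrative variant of ExecuteHistory.setAsyncStatus(String):
    // fall back to UNKNOWN instead of leaning on the caller's catch block.
    public void setAsyncStatus(String status) {
        try {
            this.asyncStatus = AsyncTaskStatus.valueOf(status);
        } catch (IllegalArgumentException e) {
            this.asyncStatus = AsyncTaskStatus.UNKNOWN;
        }
    }

diff --git 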
a/hubble-be/src/main/java/com/baidu/hugegraph/service/query/GremlinCollectionService.java b/hubble-be/src/main/java/com/baidu/hugegraph/service/query/GremlinCollectionService.java index d30ca0b0..8e9aa019 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/service/query/GremlinCollectionService.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/service/query/GremlinCollectionService.java @@ -26,6 +26,7 @@ import org.springframework.util.StringUtils; import com.baidu.hugegraph.entity.query.GremlinCollection; +import com.baidu.hugegraph.exception.InternalException; import com.baidu.hugegraph.mapper.query.GremlinCollectionMapper; import com.baidu.hugegraph.util.SQLUtil; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; @@ -101,17 +102,23 @@ public int count() { } @Transactional(isolation = Isolation.READ_COMMITTED) - public int save(GremlinCollection collection) { - return this.mapper.insert(collection); + public void save(GremlinCollection collection) { + if (this.mapper.insert(collection) != 1) { + throw new InternalException("entity.insert.failed", collection); + } } @Transactional(isolation = Isolation.READ_COMMITTED) - public int update(GremlinCollection collection) { - return this.mapper.updateById(collection); + public void update(GremlinCollection collection) { + if (this.mapper.updateById(collection) != 1) { + throw new InternalException("entity.update.failed", collection); + } } @Transactional(isolation = Isolation.READ_COMMITTED) - public int remove(int id) { - return this.mapper.deleteById(id); + public void remove(int id) { + if (this.mapper.deleteById(id) != 1) { + throw new InternalException("entity.delete.failed", id); + } } } diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/service/schema/EdgeLabelService.java b/hubble-be/src/main/java/com/baidu/hugegraph/service/schema/EdgeLabelService.java index 47d280b2..1aa2f713 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/service/schema/EdgeLabelService.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/service/schema/EdgeLabelService.java @@ -139,16 +139,14 @@ public void checkNotExist(String name, int connId) { public void add(EdgeLabelEntity entity, int connId) { HugeClient client = this.client(connId); EdgeLabel edgeLabel = convert(entity, client); - client.schema().addEdgeLabel(edgeLabel); - - List indexLabels = collectIndexLabels(entity, client); try { - this.piService.addBatch(indexLabels, client); + client.schema().addEdgeLabel(edgeLabel); } catch (Exception e) { - client.schema().removeEdgeLabel(edgeLabel.name()); throw new ExternalException("schema.edgelabel.create.failed", e, entity.getName()); } + List indexLabels = collectIndexLabels(entity, client); + this.piService.addBatch(indexLabels, client); } public void update(EdgeLabelUpdateEntity entity, int connId) { @@ -185,27 +183,20 @@ public void update(EdgeLabelUpdateEntity entity, int connId) { } } - // NOTE: property can append but doesn't support eliminate now - client.schema().appendEdgeLabel(edgeLabel); - - try { - this.piService.addBatch(addedIndexLabels, client); - } catch (Exception e) { - throw new ExternalException("schema.edgelabel.update.failed", e, - entity.getName()); - } - try { - this.piService.removeBatch(removedIndexLabelNames, client); + // NOTE: property can append but doesn't support eliminate now + client.schema().appendEdgeLabel(edgeLabel); } catch (Exception e) { throw new ExternalException("schema.edgelabel.update.failed", e, entity.getName()); } + this.piService.addBatch(addedIndexLabels, client); + 
this.piService.removeBatch(removedIndexLabelNames, client); } public void remove(String name, int connId) { HugeClient client = this.client(connId); - client.schema().removeEdgeLabel(name); + client.schema().removeEdgeLabelAsync(name); } public ConflictDetail checkConflict(ConflictCheckEntity entity, @@ -349,7 +340,7 @@ private static EdgeLabelEntity convert(EdgeLabel edgeLabel, .build(); } - public static EdgeLabelStyle getStyle(SchemaElement element) { + private static EdgeLabelStyle getStyle(SchemaElement element) { String styleValue = (String) element.userdata().get(USER_KEY_STYLE); if (styleValue != null) { return JsonUtil.fromJson(styleValue, EdgeLabelStyle.class); diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/service/schema/PropertyIndexService.java b/hubble-be/src/main/java/com/baidu/hugegraph/service/schema/PropertyIndexService.java index 902d66a2..684ba114 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/service/schema/PropertyIndexService.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/service/schema/PropertyIndexService.java @@ -203,18 +203,19 @@ private PropertyIndex get(String name, int connId) { } } - public void addBatch(List indexLabels, HugeClient client) { + public List addBatch(List indexLabels, HugeClient client) { BiFunction func = (hugeClient, il) -> { return hugeClient.schema().addIndexLabelAsync(il); }; - addBatch(indexLabels, client, func, SchemaType.PROPERTY_INDEX); + return addBatch(indexLabels, client, func, + SchemaType.PROPERTY_INDEX); } - public void removeBatch(List indexLabels, HugeClient client) { + public List removeBatch(List indexLabels, HugeClient client) { BiFunction func = (hugeClient, name) -> { return hugeClient.schema().removeIndexLabelAsync(name); }; - removeBatch(indexLabels, client, func, SchemaType.PROPERTY_INDEX); + return removeBatch(indexLabels, client, func, SchemaType.PROPERTY_INDEX); } public void checkConflict(List entities, diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/service/schema/VertexLabelService.java b/hubble-be/src/main/java/com/baidu/hugegraph/service/schema/VertexLabelService.java index 61c67f1e..dbea72df 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/service/schema/VertexLabelService.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/service/schema/VertexLabelService.java @@ -149,15 +149,14 @@ public List getLinkEdgeLabels(String name, int connId) { public void add(VertexLabelEntity entity, int connId) { HugeClient client = this.client(connId); VertexLabel vertexLabel = convert(entity, client); - client.schema().addVertexLabel(vertexLabel); - - List indexLabels = collectIndexLabels(entity, client); try { - this.piService.addBatch(indexLabels, client); + client.schema().addVertexLabel(vertexLabel); } catch (Exception e) { - throw new ExternalException("schema.vertexlabel.create.failed", e, - entity.getName()); + throw new ExternalException("schema.vertexlabel.create.failed", + e, entity.getName()); } + List indexLabels = collectIndexLabels(entity, client); + this.piService.addBatch(indexLabels, client); } public void update(VertexLabelUpdateEntity entity, int connId) { @@ -194,28 +193,20 @@ public void update(VertexLabelUpdateEntity entity, int connId) { } } - // NOTE: property can append but doesn't support eliminate now - client.schema().appendVertexLabel(vertexLabel); - - try { - this.piService.addBatch(addedIndexLabels, client); - } catch (Exception e) { - // client.schema().eliminateVertexLabel(vertexLabel); - throw new 
ExternalException("schema.vertexlabel.update.failed", e, - entity.getName()); - } - try { - this.piService.removeBatch(removedIndexLabelNames, client); + // NOTE: property can append but doesn't support eliminate now + client.schema().appendVertexLabel(vertexLabel); } catch (Exception e) { throw new ExternalException("schema.vertexlabel.update.failed", e, entity.getName()); } + this.piService.addBatch(addedIndexLabels, client); + this.piService.removeBatch(removedIndexLabelNames, client); } public void remove(String name, int connId) { HugeClient client = this.client(connId); - client.schema().removeVertexLabel(name); + client.schema().removeVertexLabelAsync(name); } public boolean checkUsing(String name, int connId) { @@ -355,7 +346,7 @@ private static VertexLabelEntity join(VertexLabel vertexLabel, .build(); } - public static VertexLabelStyle getStyle(SchemaElement element) { + private static VertexLabelStyle getStyle(SchemaElement element) { String styleValue = (String) element.userdata().get(USER_KEY_STYLE); if (styleValue != null) { return JsonUtil.fromJson(styleValue, VertexLabelStyle.class); diff --git a/hubble-be/src/main/java/com/baidu/hugegraph/util/HubbleUtil.java b/hubble-be/src/main/java/com/baidu/hugegraph/util/HubbleUtil.java index e5e52a87..2db09725 100644 --- a/hubble-be/src/main/java/com/baidu/hugegraph/util/HubbleUtil.java +++ b/hubble-be/src/main/java/com/baidu/hugegraph/util/HubbleUtil.java @@ -25,6 +25,7 @@ import java.util.UUID; import java.util.regex.Pattern; +import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.collections.CollectionUtils; public final class HubbleUtil { @@ -52,4 +53,8 @@ public static boolean equalCollection(Collection c1, Collection c2) { public static String generateSimpleId() { return UUID.randomUUID().toString(); } + + public static String md5(String rawText) { + return DigestUtils.md5Hex(rawText); + } } diff --git a/hubble-be/src/main/resources/database/schema.sql b/hubble-be/src/main/resources/database/schema.sql index 89af81bd..cc03c3e7 100644 --- a/hubble-be/src/main/resources/database/schema.sql +++ b/hubble-be/src/main/resources/database/schema.sql @@ -57,14 +57,12 @@ CREATE TABLE IF NOT EXISTS `file_mapping` ( `total_lines` LONG NOT NULL, `total_size` LONG NOT NULL, `file_status` TINYINT NOT NULL DEFAULT 0, - `file_total` LONG NOT NULL DEFAULT 0, - `file_index` VARCHAR(5000) NOT NULL DEFAULT '', `file_setting` VARCHAR(65535) NOT NULL, `vertex_mappings` VARCHAR(65535) NOT NULL, `edge_mappings` VARCHAR(65535) NOT NULL, `load_parameter` VARCHAR(65535) NOT NULL, - `last_access_time` DATETIME(6) NOT NULL, `create_time` DATETIME(6) NOT NULL, + `update_time` DATETIME(6) NOT NULL, PRIMARY KEY (`id`), UNIQUE (`conn_id`, `job_id`, `name`) ); @@ -80,9 +78,10 @@ CREATE TABLE IF NOT EXISTS `load_task` ( `vertices` VARCHAR(512) NOT NULL, `edges` VARCHAR(512) NOT NULL, `file_total_lines` LONG NOT NULL, - `file_read_lines` LONG NOT NULL, `load_status` TINYINT NOT NULL, - `duration` LONG NOT NULL, + `file_read_lines` LONG NOT NULL, + `last_duration` LONG NOT NULL, + `curr_duration` LONG NOT NULL, `create_time` DATETIME(6) NOT NULL, PRIMARY KEY (`id`) ); diff --git a/hubble-be/src/main/resources/i18n/messages.properties b/hubble-be/src/main/resources/i18n/messages.properties index 4c714be6..aca95bf1 100644 --- a/hubble-be/src/main/resources/i18n/messages.properties +++ b/hubble-be/src/main/resources/i18n/messages.properties @@ -110,6 +110,9 @@ schema.display-fields.cannot-be-nullable=The display fields of schema can't be n 
load.upload.files.at-least-one=Please select at least one file load.upload.file.cannot-be-empty=The upload file can't be empty load.upload.file.format.unsupported=The upload file format is unsupported +load.upload.file.token.existed=The same token already exists +load.upload.file.duplicate-name=Duplicate file names can't obtain tokens at the same time +load.upload.file.name-token.unmatch=The upload file name doesn't match the token load.upload.files.cannot-dup=The upload files can't contain duplicates load.upload.file.exceed-single-size=The upload file has exceeded single limit size {0} load.upload.file.exceed-total-size=The upload file has exceeded total limit size {0} @@ -129,6 +132,14 @@ load.file-mapping.edge.not-exist.label=The edge mapping doesn't exist with label load.file-mapping.edge.source-fields-cannot-be-empty=The source fields can't be empty load.file-mapping.edge.target-fields-cannot-be-empty=The target fields can't be empty +# job manager +job.manager.job-name.repeated=The job name can't be repeated +job.manager.job-name.reached-limit=The job name has exceeded the length limit +job.manager.job-name.unmatch-regex=Invalid job name, a valid name allows letters, digits, Chinese characters and underscores, up to 48 characters +job.manager.job-remarks.reached-limit=The job remarks have exceeded the length limit +job.manager.job-remarks.unmatch-regex=Invalid job remarks, valid remarks allow letters, digits, Chinese characters and underscores, up to 200 characters +job.manager.status.unexpected=The job status doesn't meet expectations, it should be {0} but is actually {1} + # Internal Exceptions common.unknown.enum.type=Unknown type {0} for enum {1} @@ -156,10 +167,3 @@ license.datasize.no-limit=no limit license.verify.ip.unauthorized=The hugegraph-hubble's IP {0} doesn't match the authorized {1} license.verfiy.mac-unmatch-ip=Failed to get mac address for IP {0} license.verify.mac.unauthorized=The hugegraph-hubble's mac {0} doesn't match the authorized {1} - -# job manager -job.manager.job-name.repeated=Job Manager name can't be repeated -job.manager.job-name.reached-limit=Can't add job name because has length limit -job.manager.job-name.unmatch-regex=Invalid job name, valid name is up to 48 alpha-numeric characters and underscores -job.manager.job-remarks.reached-limit=Can't add job remarks because has length limit -job.manager.job-remarks.unmatch-regex=Invalid job name, valid name is up to 200 alpha-numeric characters and underscores diff --git a/hubble-be/src/main/resources/i18n/messages_zh_CN.properties b/hubble-be/src/main/resources/i18n/messages_zh_CN.properties index 3dc6bc18..7feb0f60 100644 --- a/hubble-be/src/main/resources/i18n/messages_zh_CN.properties +++ b/hubble-be/src/main/resources/i18n/messages_zh_CN.properties @@ -110,6 +110,9 @@ schema.display-fields.cannot-be-nullable=展示内容不能是可空属性 load.upload.files.at-least-one=至少选择一个文件 load.upload.file.cannot-be-empty=不能上传空文件 load.upload.file.format.unsupported=上传文件的格式不支持 +load.upload.file.token.existed=已存在相同的 token +load.upload.file.duplicate-name=不允许重复的文件名同时获取 token +load.upload.file.name-token.unmatch=上传文件的名称与 token 不匹配 load.upload.files.cannot-dup=上传的文件不能包含重复的 load.upload.file.exceed-single-size=上传文件大小超过了限制 {0} load.upload.file.exceed-total-size=上传文件总大小超过了限制 {0} @@ -129,6 +132,14 @@ load.file-mapping.edge.not-exist.label=不存在 label 为 {0} 的边映射 load.file-mapping.edge.source-fields-cannot-be-empty=起点ID列不能为空 load.file-mapping.edge.target-fields-cannot-be-empty=终点ID列不能为空 +# job manager +job.manager.job-name.repeated=任务名称不能重复 +job.manager.job-name.reached-limit=任务名称长度限制48个字符 
+job.manager.job-name.unmatch-regex=任务名不合法,任务名允许字母、数字、中文、下划线,最多 48 个字符 +job.manager.job-remarks.reached-limit=任务描述长度限制48个字符 +job.manager.job-remarks.unmatch-regex=任务描述不合法,任务描述允许字母、数字、中文、下划线,最多 200 个字符 +job.manager.status.unexpected=当前的任务状态不符合预期,应该是 {0},实际是 {1} + # Internal Exceptions common.unknown.enum.type=枚举 {1} 的值 {0} 未知 @@ -156,10 +167,3 @@ license.datasize.no-limit=无限制 license.verify.ip.unauthorized=hugegraph-hubble 机器的 IP {0} 不在已授权范围内 {1} license.verfiy.mac-unmatch-ip=hugegraph-hubble 机器的 MAC 与 IP {0} 不匹配 license.verify.mac.unauthorized=hugegraph-hubble 机器的 MAC {0} 不在已授权范围内 {1} - -# job manager -job.manager.job-name.repeated=任务名称不能重复 -job.manager.job-name.reached-limit=任务名称长度限制48个字符 -job.manager.job-name.unmatch-regex=任务名不合法,任务名允许字母、数字、中文、下划线,最多 48 个字符 -job.manager.job-remarks.reached-limit=任务描述长度限制48个字符 -job.manager.job-remarks.unmatch-regex=任务描述不合法,任务描述允许字母、数字、中文、下划线,最多 200 个字符 diff --git a/hubble-dist/pom.xml b/hubble-dist/pom.xml index c57a7a03..66bc6e2f 100644 --- a/hubble-dist/pom.xml +++ b/hubble-dist/pom.xml @@ -5,7 +5,7 @@ hugegraph-hubble com.baidu.hugegraph - 1.3.0 + 1.5.0 4.0.0