Skip to content

Commit

Permalink
Some bug fixes for load task and schema management
Browse files Browse the repository at this point in the history
  • Loading branch information
Linary committed Oct 16, 2020
1 parent 331c4fe commit e794dd3
Show file tree
Hide file tree
Showing 31 changed files with 511 additions and 344 deletions.
6 changes: 1 addition & 5 deletions hubble-be/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
Expand Down Expand Up @@ -124,7 +120,7 @@
<dependency>
<groupId>com.baidu.hugegraph</groupId>
<artifactId>hugegraph-loader</artifactId>
<version>0.10.4</version>
<version>0.10.5</version>
<exclusions>
<exclusion>
<groupId>com.baidu.hugegraph</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(4);
taskExecutor.setMaxPoolSize(8);
taskExecutor.setCorePoolSize(8);
taskExecutor.setMaxPoolSize(16);
taskExecutor.setQueueCapacity(16);
taskExecutor.initialize();
return taskExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ public FileMapping fileSetting(@PathVariable("id") int id,
if (mapping == null) {
throw new ExternalException("load.file-mapping.not-exist.id", id);
}
// unescape \\t to \t
newEntity.unescapeDelimiterIfNeeded();
// change format to TEXT if needed
newEntity.changeFormatIfNeeded();
FileSetting oldEntity = mapping.getFileSetting();
FileSetting entity = this.mergeEntity(oldEntity, newEntity);
mapping.setFileSetting(entity);
Expand All @@ -140,8 +140,6 @@ public FileMapping fileSetting(@PathVariable("id") int id,
if (this.service.update(mapping) != 1) {
throw new InternalException("entity.update.failed", mapping);
}
// escape \t to \\t
mapping.getFileSetting().escapeDelimiterIfNeeded();
return mapping;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,18 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
Expand All @@ -41,6 +45,7 @@
import org.springframework.web.multipart.MultipartFile;

import com.baidu.hugegraph.common.Constant;
import com.baidu.hugegraph.common.Response;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.entity.enums.FileMappingStatus;
import com.baidu.hugegraph.entity.enums.JobManagerStatus;
Expand Down Expand Up @@ -69,15 +74,31 @@ public class FileUploadController {
@Autowired
private JobManagerService jobService;

// Per-upload coordination locks, keyed by upload token.
// Part uploads acquire the read lock; a delete request acquires the
// write lock so it waits for in-flight parts before removing the file.
private final Map<String, ReadWriteLock> uploadingTokenLocks;

public FileUploadController() {
// ConcurrentHashMap: tokens are added/removed from concurrent requests
this.uploadingTokenLocks = new ConcurrentHashMap<>();
}

@GetMapping("token")
public Response fileToken(@PathVariable("connId") int connId,
@PathVariable("jobId") int jobId,
@RequestParam("name") String fileName) {
// Issue an upload token derived from the file name, and register a
// fresh read-write lock under it so subsequent part uploads and a
// possible delete of this file can be coordinated.
String uploadToken = this.service.generateFileToken(fileName);
this.uploadingTokenLocks.put(uploadToken, new ReentrantReadWriteLock());
return Response.builder()
.status(Constant.STATUS_OK)
.data(uploadToken)
.build();
}

@PostMapping
public FileUploadResult upload(@PathVariable("connId") int connId,
@PathVariable("jobId") int jobId,
@RequestParam("file") MultipartFile file,
@RequestParam("name") String fileName,
@RequestParam("token") String token,
@RequestParam("total") int total,
@RequestParam("index") int index) {
// When the front-end uses multipart-upload mode,
// file.getOriginalFilename() is "blob", not the actual file name
this.checkTotalAndIndexValid(total, index);
this.checkFileNameMatchToken(fileName, token);
JobManager jobEntity = this.jobService.get(jobId);
this.checkFileValid(connId, jobId, jobEntity, file, fileName);

Expand All @@ -88,105 +109,137 @@ public FileUploadResult upload(@PathVariable("connId") int connId,
// Before merge: upload-files/conn-1/version_person.csv/part-1
// After merge: upload-files/conn-1/file-mapping-1/version_person.csv
String filePath = Paths.get(location, path, fileName).toString();
// Check destFile exist
// Ex.check(!destFile.exists(), "load.upload.file.existed", fileName);
FileUploadResult result = this.service.uploadFile(file, index, filePath);
if (result.getStatus() == FileUploadResult.Status.FAILURE) {
// Check this file deleted before
ReadWriteLock lock = this.uploadingTokenLocks.get(token);
if (lock == null) {
FileUploadResult result = new FileUploadResult();
// Current part saved path
String partName = file.getOriginalFilename();
result.setName(partName);
result.setSize(file.getSize());
result.setStatus(FileUploadResult.Status.FAILURE);
result.setCause("File has been deleted");
return result;
}
// Verify the existence of fragmented files
FileMapping mapping = this.service.get(connId, jobId, fileName);
if (mapping != null) {
mapping.setJobId(jobId);
mapping.setFileStatus(FileMappingStatus.UPLOADING);
mapping.setFileIndex(mapping.getFileIndex() + "," + index);
mapping.setFileTotal(total);
if (this.service.update(mapping) != 1) {
throw new InternalException("entity.update.failed", mapping);

lock.readLock().lock();
try {
FileUploadResult result = this.service.uploadFile(file, token,
index, filePath);
if (result.getStatus() == FileUploadResult.Status.FAILURE) {
return result;
}
} else {
mapping = new FileMapping(connId, fileName, filePath);
mapping.setJobId(jobId);
mapping.setFileStatus(FileMappingStatus.UPLOADING);
mapping.setFileIndex(String.valueOf(index));
mapping.setFileTotal(total);
if (this.service.save(mapping) != 1) {
throw new InternalException("entity.insert.failed", mapping);
synchronized (this.service) {
// Verify the existence of fragmented files
FileMapping mapping = this.service.get(connId, jobId, fileName);
if (mapping == null) {
mapping = new FileMapping(connId, fileName, filePath);
mapping.setJobId(jobId);
mapping.setFileStatus(FileMappingStatus.UPLOADING);
mapping.setFileIndex(String.valueOf(index));
mapping.setFileTotal(total);
if (this.service.save(mapping) != 1) {
throw new InternalException("entity.insert.failed",
mapping);
}
} else {
if (mapping.getFileStatus() == FileMappingStatus.COMPLETED) {
result.setId(mapping.getId());
return result;
}
}
Integer mappingId = mapping.getId();
// Determine whether all the parts have been uploaded, then merge them
boolean merged = this.service.tryMergePartFiles(filePath, total);
if (!merged) {
return result;
}
// Save file mapping
mapping = new FileMapping(connId, fileName, filePath);
// Read column names and values then fill it
this.service.extractColumns(mapping);
mapping.setId(mappingId);
mapping.setFileStatus(FileMappingStatus.COMPLETED);
mapping.setTotalLines(FileUtil.countLines(mapping.getPath()));
mapping.setTotalSize(FileUtils.sizeOf(new File(mapping.getPath())));
mapping.setCreateTime(HubbleUtil.nowDate());

// Move to the directory corresponding to the file mapping Id
String newPath = this.service.moveToNextLevelDir(mapping);
// Update file mapping stored path
mapping.setPath(newPath);
if (this.service.update(mapping) != 1) {
throw new InternalException("entity.update.failed", mapping);
}
// Update Job Manager size
long jobSize = jobEntity.getJobSize() + mapping.getTotalSize();
jobEntity.setJobSize(jobSize);
jobEntity.setJobStatus(JobManagerStatus.SETTING);
if (this.jobService.update(jobEntity) != 1) {
throw new InternalException("entity.update.failed", jobEntity);
}
result.setId(mapping.getId());
}
return result;
} finally {
lock.readLock().unlock();
}
Integer mapId = mapping.getId();
// Determine whether all the parts have been uploaded, then merge them
boolean merged = this.service.tryMergePartFiles(filePath, total);
if (merged) {
// Save file mapping
mapping = new FileMapping(connId, fileName, filePath);
// Read column names and values then fill it
this.service.extractColumns(mapping);
mapping.setId(mapId);
mapping.setFileStatus(FileMappingStatus.COMPLETED);
mapping.setTotalLines(FileUtil.countLines(mapping.getPath()));
mapping.setTotalSize(FileUtils.sizeOf(new File(mapping.getPath())));
mapping.setCreateTime(HubbleUtil.nowDate());
// Will generate mapping id

// Move to the directory corresponding to the file mapping Id
String newPath = this.service.moveToNextLevelDir(mapping);
// Update file mapping stored path
mapping.setPath(newPath);
if (this.service.update(mapping) != 1) {
throw new InternalException("entity.update.failed", mapping);
}

@DeleteMapping
public Boolean delete(@PathVariable("connId") int connId,
@PathVariable("jobId") int jobId,
@RequestParam("name") String fileName,
@RequestParam("token") String token) {
JobManager jobEntity = this.jobService.get(jobId);
Ex.check(jobEntity != null,
"job-manager.not-exist.id", jobId);
Ex.check(jobEntity.getJobStatus() == JobManagerStatus.DEFAULT ||
jobEntity.getJobStatus() == JobManagerStatus.SETTING,
"deleted.file.no-permission" );
FileMapping mapping = this.service.get(connId, fileName);
Ex.check(mapping != null, "load.file-mapping.not-exist.name", fileName);

ReadWriteLock lock = this.uploadingTokenLocks.get(token);
if (lock == null) {
throw new InternalException("Can't find lock of file %s with " +
"token %s", fileName, token);
}
lock.writeLock().lock();
try {
this.service.deleteDiskFile(mapping);
this.uploadingTokenLocks.remove(token);

if (this.service.remove(mapping.getId()) != 1) {
throw new InternalException("entity.delete.failed", mapping);
}
// Update Job Manager size
long jobSize = jobEntity.getJobSize() + mapping.getTotalSize();
log.info("removed file mapping {}", mapping.getId());
long jobSize = jobEntity.getJobSize() - mapping.getTotalSize();
jobEntity.setJobSize(jobSize);
jobEntity.setJobStatus(JobManagerStatus.SETTING);
if (this.jobService.update(jobEntity) != 1) {
throw new InternalException("job-manager.entity.update.failed",
jobEntity);
}
result.setId(mapping.getId());
return true;
} finally {
lock.writeLock().unlock();
}
return result;
}

@DeleteMapping
public Map<String, Boolean> delete(@PathVariable("connId") int connId,
@PathVariable("jobId") int jobId,
@RequestParam("names")
List<String> fileNames) {

JobManager jobEntity = this.jobService.get(jobId);
Ex.check(jobEntity != null,
"job-manager.not-exist.id", jobId);
Ex.check(jobEntity.getJobStatus() == JobManagerStatus.SETTING,
"deleted.file.no-permission" );
Ex.check(fileNames.size() > 0, "load.upload.files.at-least-one");
Map<String, Boolean> result = new LinkedHashMap<>();
for (String fileName : fileNames) {
FileMapping mapping = this.service.get(connId, fileName);
Ex.check(mapping != null, "load.file-mapping.not-exist.name",
fileName);
File destFile = new File(mapping.getPath());
boolean deleted = destFile.delete();
if (deleted) {
log.info("deleted file {}, prepare to remove file mapping {}",
destFile, mapping.getId());
if (this.service.remove(mapping.getId()) != 1) {
throw new InternalException("entity.delete.failed", mapping);
}
log.info("removed file mapping {}", mapping.getId());
long jobSize = jobEntity.getJobSize() - mapping.getTotalSize();
jobEntity.setJobSize(jobSize);
if (this.jobService.update(jobEntity) != 1) {
throw new InternalException("job-manager.entity.update.failed",
jobEntity);
}
}
result.put(fileName, deleted);
private void checkTotalAndIndexValid(int total, int index) {
if (total <= 0) {
throw new InternalException("The request params 'total' must > 0");
}
if (index < 0) {
throw new InternalException("The request params 'index' must >= 0");
}
return result;
}

private void checkFileNameMatchToken(String fileName, String token) {
// The upload token is expected to start with the MD5 of the file
// name; reject requests whose token does not match the given file.
String expectedPrefix = HubbleUtil.md5(fileName);
boolean matched = StringUtils.isNotEmpty(token) &&
token.startsWith(expectedPrefix);
Ex.check(matched, "load.upload.file.name-token.unmatch");
}

private void checkFileValid(int connId, int jobId, JobManager jobEntity,
MultipartFile file, String fileName) {
Expand All @@ -198,7 +251,7 @@ private void checkFileValid(int connId, int jobId, JobManager jobEntity,
// Not allowed to upload empty file
Ex.check(!file.isEmpty(), "load.upload.file.cannot-be-empty");
// Difficult: how to determine whether the file is csv or text
log.info("File content type: {}", file.getContentType());
log.debug("File content type: {}", file.getContentType());

String format = FilenameUtils.getExtension(fileName);
List<String> formatWhiteList = this.config.get(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.ArrayList;
import java.util.List;

import org.datanucleus.util.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
Expand Down Expand Up @@ -78,13 +78,15 @@ public JobManager create(@PathVariable("connId") int connId,
Ex.check(entity.getJobName().length() <= 48,
"job.manager.job-name.reached-limit");
Ex.check(entity.getJobName() != null, () ->
Constant.COMMON_NAME_PATTERN.matcher(entity.getJobName()).matches(),
"job.manager.job-name.unmatch-regex");
Constant.COMMON_NAME_PATTERN.matcher(
entity.getJobName()).matches(),
"job.manager.job-name.unmatch-regex");
Ex.check(entity.getJobRemarks().length() <= 200,
"job.manager.job-remarks.reached-limit");
Ex.check(!StringUtils.isEmpty(entity.getJobRemarks()), () ->
Constant.COMMON_REMARK_PATTERN.matcher(entity.getJobRemarks()).matches(),
"job.manager.job-remarks.unmatch-regex");
Constant.COMMON_REMARK_PATTERN.matcher(
entity.getJobRemarks()).matches(),
"job.manager.job-remarks.unmatch-regex");
Ex.check(this.service.count() < LIMIT,
"job.manager.reached-limit", LIMIT);
if (this.service.getTask(entity.getJobName(), connId) != null) {
Expand Down Expand Up @@ -153,8 +155,8 @@ public JobManager update(@PathVariable("id") int id,
Ex.check(newEntity.getJobName().length() <= 48,
"job.manager.job-name.reached-limit");
Ex.check(newEntity.getJobName() != null, () ->
Constant.COMMON_NAME_PATTERN
.matcher(newEntity.getJobName()).matches(),
Constant.COMMON_NAME_PATTERN.matcher(
newEntity.getJobName()).matches(),
"job.manager.job-name.unmatch-regex");
Ex.check(newEntity.getJobRemarks().length() <= 200,
"job.manager.job-remarks.reached-limit");
Expand All @@ -178,7 +180,7 @@ public Response reason(@PathVariable("connId") int connId,
if (job == null) {
throw new ExternalException("job.manager.not-exist.id", id);
}
List<LoadTask> tasks = taskService.batchTasks(job.getId());
List<LoadTask> tasks = this.taskService.batchTasks(job.getId());
List<JobManagerReasonResult> reasonResults = new ArrayList<>();
tasks.forEach(task -> {
JobManagerReasonResult reasonResult = new JobManagerReasonResult();
Expand Down
Loading

0 comments on commit e794dd3

Please sign in to comment.