Skip to content

Commit

Permalink
[Feature][dolphinscheduler-api] add queryLog and downloadTaskLog with…
Browse files Browse the repository at this point in the history
… projectCode #7153 (#7192)

* [Feature][dolphinscheduler-api] add queryLog and downloadTaskLog with projectCode #7153

* [Feature][dolphinscheduler-api] add queryLog and downloadTaskLog with projectCode #7153

* [Feature][dolphinscheduler-api] add queryLog and downloadTaskLog with projectCode #7153

Co-authored-by: honghuo.zw <[email protected]>
  • Loading branch information
zwZjut and honghuo.zw authored Dec 5, 2021
1 parent d83735a commit 05aef27
Show file tree
Hide file tree
Showing 4 changed files with 309 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
Expand All @@ -43,6 +44,7 @@
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import springfox.documentation.annotations.ApiIgnore;

/**
Expand Down Expand Up @@ -82,7 +84,6 @@ public Result<String> queryLog(@ApiIgnore @RequestAttribute(value = Constants.SE
return loggerService.queryLog(taskInstanceId, skipNum, limit);
}


/**
* download log file
*
Expand All @@ -107,4 +108,59 @@ public ResponseEntity downloadTaskLog(@ApiIgnore @RequestAttribute(value = Const
.body(logBytes);
}

/**
* query task log in specified project
*
* @param loginUser login user
* @param projectCode project code
* @param taskInstanceId task instance id
* @param skipNum skip number
* @param limit limit
* @return task log content
*/
@ApiOperation(value = "queryLogInSpecifiedProject", notes = "QUERY_TASK_INSTANCE_LOG_IN_SPECIFIED_PROJECT_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"),
@ApiImplicitParam(name = "taskInstanceId", value = "TASK_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "skipLineNum", value = "SKIP_LINE_NUM", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "limit", value = "LIMIT", required = true, dataType = "Int", example = "100")
})
@GetMapping(value = "/{projectCode}/detail")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_TASK_INSTANCE_LOG_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result<String> queryLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "taskInstanceId") int taskInstanceId,
@RequestParam(value = "skipLineNum") int skipNum,
@RequestParam(value = "limit") int limit) {
return returnDataList(loggerService.queryLog(loginUser, projectCode, taskInstanceId, skipNum, limit));
}

/**
* download log file
*
* @param loginUser login user
* @param projectCode project code
* @param taskInstanceId task instance id
* @return log file content
*/
@ApiOperation(value = "downloadTaskLogInSpecifiedProject", notes = "DOWNLOAD_TASK_INSTANCE_LOG_IN_SPECIFIED_PROJECT_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"),
@ApiImplicitParam(name = "taskInstanceId", value = "TASK_ID", required = true, dataType = "Int", example = "100")
})
@GetMapping(value = "/{projectCode}/download-log")
@ResponseBody
@ApiException(DOWNLOAD_TASK_INSTANCE_LOG_FILE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public ResponseEntity downloadTaskLog(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "taskInstanceId") int taskInstanceId) {
byte[] logBytes = loggerService.getLogBytes(loginUser, projectCode, taskInstanceId);
return ResponseEntity
.ok()
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + System.currentTimeMillis() + ".log" + "\"")
.body(logBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package org.apache.dolphinscheduler.api.service;

import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.dao.entity.User;

import java.util.Map;

/**
* logger service
Expand All @@ -43,4 +46,25 @@ public interface LoggerService {
*/
byte[] getLogBytes(int taskInstId);

/**
* query log
*
* @param loginUser login user
* @param projectCode project code
* @param taskInstId task instance id
* @param skipLineNum skip line number
* @param limit limit
* @return log string data
*/
Map<String, Object> queryLog(User loginUser, long projectCode, int taskInstId, int skipLineNum, int limit);

/**
* get log bytes
*
* @param loginUser login user
* @param projectCode project code
* @param taskInstId task instance id
* @return log byte array
*/
byte[] getLogBytes(User loginUser, long projectCode, int taskInstId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,24 @@
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.LoggerService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.process.ProcessService;

import org.apache.commons.lang.StringUtils;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;

import javax.annotation.PostConstruct;
Expand All @@ -47,7 +54,7 @@
* logger service impl
*/
@Service
public class LoggerServiceImpl implements LoggerService {
public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService {

private static final Logger logger = LoggerFactory.getLogger(LoggerServiceImpl.class);

Expand All @@ -58,6 +65,15 @@ public class LoggerServiceImpl implements LoggerService {

private LogClientService logClient;

@Autowired
ProjectMapper projectMapper;

@Autowired
ProjectService projectService;

@Autowired
TaskDefinitionMapper taskDefinitionMapper;

@PostConstruct
public void init() {
if (Objects.isNull(this.logClient)) {
Expand Down Expand Up @@ -89,10 +105,117 @@ public Result<String> queryLog(int taskInstId, int skipLineNum, int limit) {
if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) {
return Result.error(Status.TASK_INSTANCE_NOT_FOUND);
}
Result<String> result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
String log = queryLog(taskInstance,skipLineNum,limit);
result.setData(log);
return result;
}

String host = getHost(taskInstance.getHost());

Result<String> result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
/**
* get log size
*
* @param taskInstId task instance id
* @return log byte array
*/
@Override
public byte[] getLogBytes(int taskInstId) {
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) {
throw new ServiceException("task instance is null or host is null");
}
return getLogBytes(taskInstance);
}

/**
* query log
*
* @param loginUser login user
* @param projectCode project code
* @param taskInstId task instance id
* @param skipLineNum skip line number
* @param limit limit
* @return log string data
*/
@Override
@SuppressWarnings("unchecked")
public Map<String, Object> queryLog(User loginUser, long projectCode, int taskInstId, int skipLineNum, int limit) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
// check whether the task instance can be found
TaskInstance task = processService.findTaskInstanceById(taskInstId);
if (task == null || StringUtils.isBlank(task.getHost())) {
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
return result;
}

TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(task.getTaskCode());
if (taskDefinition != null && projectCode != taskDefinition.getProjectCode()) {
putMsg(result, Status.TASK_INSTANCE_NOT_FOUND, taskInstId);
return result;
}
String log = queryLog(task, skipLineNum, limit);
result.put(Constants.DATA_LIST, log);
return result;
}

/**
* get log bytes
*
* @param loginUser login user
* @param projectCode project code
* @param taskInstId task instance id
* @return log byte array
*/
@Override
public byte[] getLogBytes(User loginUser, long projectCode, int taskInstId) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
throw new ServiceException("user has no permission");
}
// check whether the task instance can be found
TaskInstance task = processService.findTaskInstanceById(taskInstId);
if (task == null || StringUtils.isBlank(task.getHost())) {
throw new ServiceException("task instance is null or host is null");
}

TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(task.getTaskCode());
if (taskDefinition != null && projectCode != taskDefinition.getProjectCode()) {
throw new ServiceException("task instance does not exist in project");
}
return getLogBytes(task);
}

/**
* get host
*
* @param address address
* @return old version return true ,otherwise return false
*/
private String getHost(String address) {
if (Boolean.TRUE.equals(Host.isOldVersion(address))) {
return address;
}
return Host.of(address).getIp();
}

/**
* query log
*
* @param taskInstance task instance
* @param skipLineNum skip line number
* @param limit limit
* @return log string data
*/
private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) {

String host = getHost(taskInstance.getHost());

logger.info("log host : {} , logPath : {} , logServer port : {}", host, taskInstance.getLogPath(),
PropertyUtils.getInt(Constants.RPC_PORT, 50051));
Expand All @@ -109,23 +232,16 @@ public Result<String> queryLog(int taskInstId, int skipLineNum, int limit) {
log.append(logClient
.rollViewLog(host, PropertyUtils.getInt(Constants.RPC_PORT, 50051), taskInstance.getLogPath(), skipLineNum, limit));

result.setData(log.toString());
return result;
return log.toString();
}


/**
* get log size
* get log bytes
*
* @param taskInstId task instance id
* @param taskInstance task instance
* @return log byte array
*/
@Override
public byte[] getLogBytes(int taskInstId) {
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) {
throw new ServiceException("task instance is null or host is null");
}
private byte[] getLogBytes(TaskInstance taskInstance) {
String host = getHost(taskInstance.getHost());
byte[] head = String.format(LOG_HEAD_FORMAT,
taskInstance.getLogPath(),
Expand All @@ -134,17 +250,4 @@ public byte[] getLogBytes(int taskInstId) {
return Bytes.concat(head,
logClient.getLogBytes(host, PropertyUtils.getInt(Constants.RPC_PORT, 50051), taskInstance.getLogPath()));
}

/**
* get host
*
* @param address address
* @return old version return true ,otherwise return false
*/
private String getHost(String address) {
if (Boolean.TRUE.equals(Host.isOldVersion(address))) {
return address;
}
return Host.of(address).getIp();
}
}
Loading

0 comments on commit 05aef27

Please sign in to comment.