diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java index fccc8c06ff5..31d4801f11a 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java @@ -24,6 +24,7 @@ import org.hibernate.validator.constraints.Length; import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; import java.util.List; @@ -39,7 +40,7 @@ public class DataAddTaskRequest { @NotBlank(message = "inlongGroupId cannot be blank") private String groupId; - @ApiModelProperty(value = "Source ID", hidden = true) + @ApiModelProperty(value = "Source ID, When the source ID is not null, data add task entries based on the source ID") private Integer sourceId; @ApiModelProperty(value = "Agent ip List") @@ -53,4 +54,14 @@ public class DataAddTaskRequest { @ApiModelProperty(value = "Audit version", hidden = true) private String auditVersion; + @ApiModelProperty(value = "Data start time") + private String dataTimeFrom; + + @ApiModelProperty(value = "Data end time") + private String dataTimeTo; + + @ApiModelProperty(value = "Increase Audit Version") + @NotNull(message = "IncreaseAuditVersion cannot be null") + private Boolean increaseAuditVersion; + } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileDataAddTaskRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileDataAddTaskRequest.java index 6cb5fe1e42b..20bcf08e9ff 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileDataAddTaskRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileDataAddTaskRequest.java @@ -39,10 +39,4 @@ public class FileDataAddTaskRequest extends DataAddTaskRequest { @ApiModelProperty("filterStreams") private List filterStreams; - @ApiModelProperty("Start time") - private Long startTime; - - @ApiModelProperty("End time") - private Long endTime; - } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java index 6b3cc382dbc..1be07993a9d 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java @@ -89,11 +89,11 @@ public class FileSource extends StreamSource { @ApiModelProperty("Whether retry") private Boolean retry; - @ApiModelProperty("Start time") - private Long startTime; + @ApiModelProperty(value = "Data start time") + private String dataTimeFrom; - @ApiModelProperty("End time") - private Long endTime; + @ApiModelProperty(value = "Data end time") + private String dataTimeTo; @ApiModelProperty("filterStreams") private List filterStreams; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java index db4ba25c12b..f23622d0609 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java @@ -92,11 +92,11 @@ public class FileSourceDTO { @ApiModelProperty("Whether retry") private Boolean retry = false; - @ApiModelProperty("Start time") - private Long startTime = 0L; + @ApiModelProperty(value = "Data start time") + private String dataTimeFrom; - @ApiModelProperty("End time") - private Long endTime = 0L; + @ApiModelProperty(value = "Data end time") + private String dataTimeTo; @ApiModelProperty(value = "Audit version") private String auditVersion; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java index d0100ecd1b8..777b231629c 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceRequest.java @@ -84,12 +84,11 @@ public class FileSourceRequest extends SourceRequest { @ApiModelProperty("Whether retry") private Boolean retry; - @ApiModelProperty("Start time") - private Long startTime; - - @ApiModelProperty("End time") - private Long endTime; + @ApiModelProperty(value = "Data start time") + private String dataTimeFrom; + @ApiModelProperty(value = "Data end time") + private String dataTimeTo; @ApiModelProperty("filterStreams") private List filterStreams; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java index 49ee5dd6698..4d8e19a9acb 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java @@ -59,6 +59,7 @@ public class AgentClusterNodeInstallOperator implements InlongClusterNodeInstall public static final String INSTALLER_CONF_PATH = "/conf/installer.properties"; public static final String INSTALLER_START_CMD = "/bin/installer.sh start"; + public static final String CRONTAB_START_CMD = "/bin/crontab.sh"; public static final String INSTALLER_RESTART_CMD = "/bin/installer.sh restart"; public static final String INSTALLER_STOP_CMD = "/bin/installer.sh restart"; public static final String AGENT_MANAGER_AUTH_SECRET_ID = "agent.manager.auth.secretId"; @@ -115,6 +116,8 @@ public boolean install(ClusterNodeRequest clusterNodeRequest, String operator) { deployInstaller(request, operator); String startCmd = agentInstallPath + INSTALLER_START_CMD; commandExecutor.execRemote(request, startCmd); + String crontabStartCmd = agentInstallPath + CRONTAB_START_CMD; + commandExecutor.execRemote(request, crontabStartCmd); clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(), NodeStatus.INSTALL_SUCCESS.getStatus(), currentTime + InlongConstants.BLANK + "success to install"); } catch (Exception e) { @@ -149,6 +152,8 @@ public boolean reInstall(ClusterNodeRequest clusterNodeRequest, String operator) commandExecutor.cpDir(request, agentInstallTempPath + "/modules.json", agentInstallPath + "/conf"); String reStartCmd = agentInstallPath + INSTALLER_RESTART_CMD; commandExecutor.execRemote(request, reStartCmd); + String crontabStartCmd = agentInstallPath + CRONTAB_START_CMD; + commandExecutor.execRemote(request, crontabStartCmd); clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(), NodeStatus.NORMAL.getStatus(), currentTime + InlongConstants.BLANK + "success to reinstall"); } catch (Exception e) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutorImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutorImpl.java index cb44988bbf1..85356195b57 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutorImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cmd/CommandExecutorImpl.java @@ -147,7 +147,8 @@ public CommandResult rmDir(AgentClusterNodeRequest clusterNodeRequest, String pa @Override public CommandResult cpDir(AgentClusterNodeRequest clusterNodeRequest, String sourcePath, String targetPath) throws Exception { - return execRemote(clusterNodeRequest, "cp " + sourcePath + " " + targetPath); + return execRemote(clusterNodeRequest, + "if [ -e " + sourcePath + " ]; then cp " + sourcePath + " " + targetPath + "; fi"); } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java index e82d9d2aeba..324c9969103 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java @@ -545,12 +545,21 @@ public List addDataAddTask(DataAddTaskRequest request, String operator) List agentIpList = request.getAgentIpList(); List entityList = new ArrayList<>(); List resultIdList = new ArrayList<>(); - if (CollectionUtils.isEmpty(agentIpList)) { - entityList = sourceMapper.selectByRelatedId(request.getGroupId(), null, null); + if (request.getSourceId() != null) { + StreamSourceEntity entity = sourceMapper.selectById(request.getSourceId()); + Preconditions.expectNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND); + entityList.add(entity); } else { - for (String agentIp : agentIpList) { - List sourceEntityList = sourceMapper.selectByAgentIp(agentIp); - entityList.addAll(sourceEntityList); + if (agentIpList == null) { + throw new BusinessException("Agent ip list can not null"); + } + if (CollectionUtils.isEmpty(agentIpList)) { + entityList = sourceMapper.selectByRelatedId(request.getGroupId(), null, null); + } else { + for (String agentIp : agentIpList) { + List sourceEntityList = sourceMapper.selectByAgentIp(agentIp); + entityList.addAll(sourceEntityList); + } } } for (StreamSourceEntity sourceEntity : entityList) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java index fa10f8081ff..faf797dc7aa 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/file/FileSourceOperator.java @@ -114,10 +114,12 @@ public Integer addDataAddTask(DataAddTaskRequest request, String operator) { try { List dataAddTaskList = sourceMapper.selectByTaskMapId(sourceEntity.getId()); FileSourceDTO dto = FileSourceDTO.getFromJson(sourceEntity.getExtParams()); - dto.setStartTime(sourceRequest.getStartTime()); - dto.setEndTime(sourceRequest.getEndTime()); + dto.setDataTimeFrom(sourceRequest.getDataTimeFrom()); + dto.setDataTimeTo(sourceRequest.getDataTimeTo()); dto.setRetry(true); - dto.setAuditVersion(request.getAuditVersion()); + if (request.getIncreaseAuditVersion()) { + dto.setAuditVersion(request.getAuditVersion()); + } dto.setFilterStreams(sourceRequest.getFilterStreams()); StreamSourceEntity dataAddTaskEntity = CommonBeanUtils.copyProperties(sourceEntity, StreamSourceEntity::new);