Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-10096][Manager] Support installing agents by SSH #10098

Merged
merged 14 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public class InlongClusterNodeEntity implements Serializable {
private String type;
private String ip;
private Integer port;
private String username;
private String password;
private Integer sshPort;
private String protocolType;
private Integer nodeLoad;
private String extParams;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
<result column="type" jdbcType="VARCHAR" property="type"/>
<result column="ip" jdbcType="VARCHAR" property="ip"/>
<result column="port" jdbcType="INTEGER" property="port"/>
<result column="username" jdbcType="VARCHAR" property="username"/>
<result column="password" jdbcType="VARCHAR" property="password"/>
<result column="ssh_port" jdbcType="INTEGER" property="sshPort"/>
<result column="protocol_type" jdbcType="VARCHAR" property="protocolType"/>
<result column="node_load" jdbcType="INTEGER" property="nodeLoad"/>
<result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
Expand All @@ -39,32 +42,34 @@
<result column="version" jdbcType="INTEGER" property="version"/>
</resultMap>
<sql id="Base_Column_List">
id, parent_id, type, ip, port, protocol_type, node_load, ext_params, description,
id, parent_id, type, ip, port, username, password, ssh_port, protocol_type, node_load, ext_params, description,
status, is_deleted, creator, modifier, create_time, modify_time, version
</sql>

<insert id="insert" useGeneratedKeys="true" keyProperty="id"
parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity">
insert into inlong_cluster_node (id, parent_id, type,
ip, port, protocol_type,
node_load, ext_params,
description, status,
creator, modifier)
ip, port, username,
password, ssh_port, protocol_type,
node_load, ext_params, description,
status, creator, modifier)
values (#{id,jdbcType=INTEGER}, #{parentId,jdbcType=INTEGER}, #{type,jdbcType=VARCHAR},
#{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{protocolType,jdbcType=VARCHAR},
#{nodeLoad,jdbcType=INTEGER}, #{extParams,jdbcType=LONGVARCHAR},
#{description, jdbcType=VARCHAR}, #{status,jdbcType=INTEGER},
#{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
#{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{username,jdbcType=VARCHAR},
#{password,jdbcType=VARCHAR}, #{sshPort,jdbcType=INTEGER}, #{protocolType,jdbcType=VARCHAR},
#{nodeLoad,jdbcType=INTEGER}, #{extParams,jdbcType=LONGVARCHAR}, #{description, jdbcType=VARCHAR},
#{status,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
</insert>

<insert id="insertOnDuplicateKeyUpdate" useGeneratedKeys="true" keyProperty="id"
parameterType="org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity">
insert into inlong_cluster_node (id, parent_id, type,
ip, port, protocol_type,
ip, port, username,
password, ssh_port, protocol_type,
node_load, ext_params, status,
creator, modifier)
values (#{id,jdbcType=INTEGER}, #{parentId,jdbcType=INTEGER}, #{type,jdbcType=VARCHAR},
#{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{protocolType,jdbcType=VARCHAR},
#{ip,jdbcType=VARCHAR}, #{port,jdbcType=INTEGER}, #{username,jdbcType=VARCHAR},
#{password,jdbcType=VARCHAR}, #{sshPort,jdbcType=INTEGER}, #{protocolType,jdbcType=VARCHAR},
#{nodeLoad,jdbcType=INTEGER}, #{extParams,jdbcType=LONGVARCHAR},
#{status,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR})
ON DUPLICATE KEY UPDATE node_load = VALUES(node_load),
Expand Down Expand Up @@ -157,6 +162,9 @@
type = #{type,jdbcType=VARCHAR},
ip = #{ip,jdbcType=VARCHAR},
port = #{port,jdbcType=INTEGER},
username = #{username,jdbcType=VARCHAR},
password = #{password,jdbcType=VARCHAR},
ssh_port = #{sshPort,jdbcType=INTEGER},
protocol_type = #{protocolType,jdbcType=VARCHAR},
node_load = #{nodeLoad,jdbcType=INTEGER},
ext_params = #{extParams,jdbcType=LONGVARCHAR},
Expand All @@ -183,6 +191,15 @@
<if test="port != null">
port = #{port,jdbcType=INTEGER},
</if>
<if test="username != null">
username = #{username,jdbcType=VARCHAR},
</if>
<if test="password != null">
password = #{password,jdbcType=VARCHAR},
</if>
<if test="sshPort != null">
ssh_port = #{sshPort,jdbcType=INTEGER},
</if>
<if test="protocolType != null">
protocol_type = #{protocolType,jdbcType=VARCHAR},
</if>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ public class ClusterNodeRequest {
@NotNull(message = "port cannot be null")
private Integer port;

@ApiModelProperty(value = "Username")
private String username;

@ApiModelProperty(value = "password")
private String password;

@ApiModelProperty(value = "SSH port")
private Integer sshPort;

@ApiModelProperty(value = "Cluster protocol type")
@Length(min = 1, max = 20, message = "length must be less than or equal to 20")
private String protocolType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ public class ClusterNodeResponse {
@ApiModelProperty(value = "Cluster port")
private Integer port;

@ApiModelProperty(value = "Username")
private String username;

@ApiModelProperty(value = "password")
private String password;

@ApiModelProperty(value = "SSH port")
private Integer sshPort;

@ApiModelProperty(value = "Cluster protocol type")
private String protocolType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,57 @@
package org.apache.inlong.manager.service.cluster.node;

import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ModuleType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.AESUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongClusterNodeEntity;
import org.apache.inlong.manager.dao.entity.ModuleConfigEntity;
import org.apache.inlong.manager.dao.entity.PackageConfigEntity;
import org.apache.inlong.manager.dao.entity.UserEntity;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.ModuleConfigEntityMapper;
import org.apache.inlong.manager.dao.mapper.PackageConfigEntityMapper;
import org.apache.inlong.manager.dao.mapper.UserEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterDTO;
import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeRequest;
import org.apache.inlong.manager.service.cmd.CommandExecutor;

import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

@Service
public class AgentClusterNodeInstallOperator implements InlongClusterNodeInstallOperator {

private static final Logger LOGGER = LoggerFactory.getLogger(AgentClusterNodeInstallOperator.class);

@Autowired
private InlongClusterEntityMapper clusterEntityMapper;
@Autowired
private CommandExecutor commandExecutor;
@Autowired
private ModuleConfigEntityMapper moduleConfigEntityMapper;
@Autowired
private PackageConfigEntityMapper packageConfigEntityMapper;
@Autowired
private UserEntityMapper userEntityMapper;

@Value("${metrics.audit.proxy.hosts:127.0.0.1:10081}")
private String auditProxyUrl;
@Value("${agent.install.path:inlong/inlong-installer/}")
private String agentInstallPath;
@Value("${manager.url:127.0.0.1:8083}")
private String managerUrl;

@Override
public Boolean accept(String clusterType) {
Expand All @@ -49,9 +82,39 @@ public String getClusterNodeType() {

@Override
public boolean install(ClusterNodeRequest clusterNodeRequest, String operator) {
// todo Provide agent installation capability
AgentClusterNodeRequest agentNodeRequest = (AgentClusterNodeRequest) clusterNodeRequest;
InlongClusterEntity clusterEntity = clusterEntityMapper.selectById(clusterNodeRequest.getParentId());
LOGGER.info("begin to insert agent inlong cluster node={}", clusterNodeRequest);
try {
InlongClusterEntity clusterEntity = clusterEntityMapper.selectById(clusterNodeRequest.getParentId());
AgentClusterDTO agentClusterDTO = AgentClusterDTO.getFromJson(clusterEntity.getExtParams());
fuweng11 marked this conversation as resolved.
Show resolved Hide resolved
AgentClusterNodeRequest request = (AgentClusterNodeRequest) clusterNodeRequest;
commandExecutor.mkdir(request, agentInstallPath);
String downLoadUrl = getInstallerDownLoadUrl(request);
String fileName = downLoadUrl.substring(downLoadUrl.lastIndexOf('/') + 1);
commandExecutor.downLoadPackage(request, agentInstallPath, downLoadUrl);
commandExecutor.tarPackage(request, fileName, agentInstallPath);
String confFile = agentInstallPath + "conf/installer.properties";
Map<String, String> configMap = new HashMap<>();
configMap.put("agent.local.ip", request.getIp());
configMap.put("agent.manager.addr", managerUrl);
UserEntity userInfo = userEntityMapper.selectByName(operator);
Preconditions.expectNotNull(userInfo, "User doesn't exist");
String secretKey =
new String(AESUtils.decryptAsString(userInfo.getSecretKey(), userInfo.getEncryptVersion()));
configMap.put("agent.manager.auth.secretId", operator);
configMap.put("agent.manager.auth.secretKey", secretKey);
configMap.put("agent.cluster.tag", clusterEntity.getClusterTags());
configMap.put("agent.cluster.name", clusterEntity.getName());
configMap.put("audit.proxys", auditProxyUrl);
commandExecutor.modifyConfig(request, configMap, confFile);
String startCmd = agentInstallPath + "bin/installer.sh start";
commandExecutor.execRemote(request, startCmd);

} catch (Exception e) {
String errMsg = String.format("install installer failed for ip=%s", clusterNodeRequest.getIp());
LOGGER.error(errMsg, e);
throw new BusinessException(errMsg);
}
LOGGER.info("success to insert agent inlong cluster node={}", clusterNodeRequest);
return true;
}

Expand All @@ -61,4 +124,22 @@ public boolean unload(InlongClusterNodeEntity clusterNodeEntity, String operator
InlongClusterEntity clusterEntity = clusterEntityMapper.selectById(clusterNodeEntity.getParentId());
return true;
}

private String getInstallerDownLoadUrl(AgentClusterNodeRequest request) {
if (CollectionUtils.isEmpty(request.getModuleIdList())) {
throw new BusinessException(
String.format("install failed when module id list is null for ip=%s, type=%s", request.getIp(),
request.getType()));
}
for (Integer moduleId : request.getModuleIdList()) {
ModuleConfigEntity moduleConfigEntity = moduleConfigEntityMapper.selectByPrimaryKey(moduleId);
if (Objects.equals(moduleConfigEntity.getType(), ModuleType.INSTALLER.name())) {
PackageConfigEntity packageConfigEntity = packageConfigEntityMapper.selectByPrimaryKey(
moduleConfigEntity.getPackageId());
return packageConfigEntity.getDownloadUrl();
}
}
throw new BusinessException(String.format("can't get installer download url for ip=%s, type=%s", request.getIp(),
request.getType()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.service.cmd;

import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeRequest;

import java.util.Map;

public interface CommandExecutor {

CommandResult exec(String cmd) throws Exception;

CommandResult execRemote(AgentClusterNodeRequest clusterNodeRequest, String cmd) throws Exception;

CommandResult modifyConfig(AgentClusterNodeRequest clusterNodeRequest, Map<String, String> configMap,
String confPath) throws Exception;

CommandResult tarPackage(AgentClusterNodeRequest clusterNodeRequest, String fileName, String tarPath)
throws Exception;

CommandResult downLoadPackage(AgentClusterNodeRequest clusterNodeRequest, String downLoadPath, String downLoadUrl)
throws Exception;

CommandResult mkdir(AgentClusterNodeRequest clusterNodeRequest, String path) throws Exception;
}
Loading
Loading