Skip to content

Commit

Permalink
[DS-5229][feat] Implement server port custom config (#6947)
Browse files Browse the repository at this point in the history
* [DS-5229][fix] server port custom config
This closes #5229

* [DS-5229][feat] Implement server port custom config
This closes #5229

* [Bug] [readme] Error link to Docker and k8s in readme #6802 (#6862)

* [Bug] [readme] Error link to Docker and k8s in readme #6802
1、modify the error link

* [Bug] [readme] Error link to Docker and k8s in readme #6802
1、modify the error link in readme_zh_cn.md

* [DS-6829][WorkerServer] skip create log dir and print log in dryRun model (#6852)

Co-authored-by: caishunfeng <[email protected]>

* [DS-5229][feat] Implement server port custom config
Modify review suggestion
This closes #5229

Co-authored-by: GaoTianDuo <[email protected]>
Co-authored-by: wind <[email protected]>
Co-authored-by: caishunfeng <[email protected]>
  • Loading branch information
4 people authored Nov 30, 2021
1 parent 100a9ba commit 0dcff14
Show file tree
Hide file tree
Showing 10 changed files with 27 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.dolphinscheduler.common.Constants.ALERT_RPC_PORT;

import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
Expand Down Expand Up @@ -100,7 +101,7 @@ private void checkTable() {

private void startServer() {
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(ALERT_RPC_PORT);
serverConfig.setListenPort(PropertyUtils.getInt(ALERT_RPC_PORT, 50052));

server = new NettyRemotingServer(serverConfig);
server.registerProcessor(CommandType.ALERT_SEND_REQUEST, alertRequestProcessor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.dolphinscheduler.api.service.LoggerService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService;
Expand Down Expand Up @@ -94,7 +95,7 @@ public Result<String> queryLog(int taskInstId, int skipLineNum, int limit) {
Result<String> result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());

logger.info("log host : {} , logPath : {} , logServer port : {}", host, taskInstance.getLogPath(),
Constants.RPC_PORT);
PropertyUtils.getInt(Constants.RPC_PORT, 50051));

StringBuilder log = new StringBuilder();
if (skipLineNum == 0) {
Expand All @@ -106,7 +107,7 @@ public Result<String> queryLog(int taskInstId, int skipLineNum, int limit) {
}

log.append(logClient
.rollViewLog(host, Constants.RPC_PORT, taskInstance.getLogPath(), skipLineNum, limit));
.rollViewLog(host, PropertyUtils.getInt(Constants.RPC_PORT, 50051), taskInstance.getLogPath(), skipLineNum, limit));

result.setData(log.toString());
return result;
Expand All @@ -131,7 +132,7 @@ public byte[] getLogBytes(int taskInstId) {
host,
Constants.SYSTEM_LINE_SEPARATOR).getBytes(StandardCharsets.UTF_8);
return Bytes.concat(head,
logClient.getLogBytes(host, Constants.RPC_PORT, taskInstance.getLogPath()));
logClient.getLogBytes(host, PropertyUtils.getInt(Constants.RPC_PORT, 50051), taskInstance.getLogPath()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ private Constants() {
public static final String COMMON_PROPERTIES_PATH = "/common.properties";

/**
* alter properties
* alert properties
*/
public static final int ALERT_RPC_PORT = 50052;
public static final String ALERT_RPC_PORT = "alert.rpc.port";

/**
* registry properties
Expand Down Expand Up @@ -291,7 +291,7 @@ private Constants() {
*
* rpc port
*/
public static final int RPC_PORT = 50051;
public static final String RPC_PORT = "rpc.port";

/**
* forbid running task
Expand Down
4 changes: 4 additions & 0 deletions dolphinscheduler-common/src/main/resources/common.properties
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,7 @@ sudo.enable=true

# development state
development.state=false

# rpc port
rpc.port=50051
alert.rpc.port=50052
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.log;

import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
Expand Down Expand Up @@ -49,7 +50,7 @@ public class LoggerServer {

public LoggerServer() {
this.serverConfig = new NettyServerConfig();
this.serverConfig.setListenPort(Constants.RPC_PORT);
this.serverConfig.setListenPort(PropertyUtils.getInt(Constants.RPC_PORT, 50051));
this.server = new NettyRemotingServer(serverConfig);
this.requestProcessor = new LoggerRequestProcessor();
this.server.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, requestProcessor, requestProcessor.getExecutor());
Expand All @@ -72,7 +73,7 @@ public static void main(String[] args) {
*/
public void start() {
this.server.start();
logger.info("logger server started, listening on port : {}", Constants.RPC_PORT);
logger.info("logger server started, listening on port : {}", PropertyUtils.getInt(Constants.RPC_PORT, 50051));
Runtime.getRuntime().addShutdownHook(new Thread(LoggerServer.this::stop));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public static List<String> killYarnJob(TaskExecutionContext taskExecutionContext
String log;
try (LogClientService logClient = new LogClientService()) {
log = logClient.viewLog(Host.of(taskExecutionContext.getHost()).getIp(),
Constants.RPC_PORT,
PropertyUtils.getInt(Constants.RPC_PORT, 50051),
taskExecutionContext.getLogPath());
}
if (!StringUtils.isEmpty(log)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
Expand Down Expand Up @@ -124,7 +125,7 @@ public static void main(String[] args) {
@PostConstruct
public void run() {
// alert-server client registry
alertClientService = new AlertClientService(workerConfig.getAlertListenHost(), Constants.ALERT_RPC_PORT);
alertClientService = new AlertClientService(workerConfig.getAlertListenHost(), PropertyUtils.getInt(Constants.ALERT_RPC_PORT, 50052));

// init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
Expand Down Expand Up @@ -179,7 +180,7 @@ private TaskKillResponseCommand buildKillTaskResponseCommand(TaskKillRequestComm
private Pair<Boolean, List<String>> killYarnJob(String host, String logPath, String executePath, String tenantCode) {
try (LogClientService logClient = new LogClientService();) {
logger.info("view log host : {},logPath : {}", host, logPath);
String log = logClient.viewLog(host, Constants.RPC_PORT, logPath);
String log = logClient.viewLog(host, PropertyUtils.getInt(Constants.RPC_PORT, 50051), logPath);
List<String> appIds = Collections.emptyList();
if (!StringUtils.isEmpty(log)) {
appIds = LoggerUtils.getAppIds(log, logger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.service.log.LogClientService;

import org.apache.commons.lang.StringUtils;
Expand Down Expand Up @@ -51,7 +52,7 @@ public void testRollViewLog() throws IOException {
org.apache.commons.io.FileUtils.writeStringToFile(new File("/tmp/demo.txt"), expectedTmpDemoString, Charset.defaultCharset());

String resultTmpDemoString = this.logClientService.rollViewLog(
"localhost", Constants.RPC_PORT,"/tmp/demo.txt", 0, 1000);
"localhost", PropertyUtils.getInt(Constants.RPC_PORT, 50051), "/tmp/demo.txt", 0, 1000);

Assert.assertEquals(expectedTmpDemoString, resultTmpDemoString.replaceAll("[\r|\n|\t]", StringUtils.EMPTY));

Expand All @@ -63,11 +64,11 @@ public void testRemoveTaskLog() throws IOException {
String expectedTmpRemoveString = "testRemoveTaskLog";
org.apache.commons.io.FileUtils.writeStringToFile(new File("/tmp/remove.txt"), expectedTmpRemoveString, Charset.defaultCharset());

Boolean b = this.logClientService.removeTaskLog("localhost", Constants.RPC_PORT,"/tmp/remove.txt");
Boolean b = this.logClientService.removeTaskLog("localhost", PropertyUtils.getInt(Constants.RPC_PORT, 50051),"/tmp/remove.txt");

Assert.assertTrue(b);

String result = this.logClientService.viewLog("localhost", Constants.RPC_PORT,"/tmp/demo.txt");
String result = this.logClientService.viewLog("localhost", PropertyUtils.getInt(Constants.RPC_PORT, 50051),"/tmp/demo.txt");

Assert.assertEquals(StringUtils.EMPTY, result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.Command;
import org.apache.dolphinscheduler.dao.entity.DagData;
Expand Down Expand Up @@ -507,7 +508,7 @@ public void removeTaskLogFile(Integer processInstanceId) {
if (StringUtils.isEmpty(taskInstance.getHost())) {
continue;
}
int port = Constants.RPC_PORT;
int port = PropertyUtils.getInt(Constants.RPC_PORT, 50051);
String ip = "";
try {
ip = Host.of(taskInstance.getHost()).getIp();
Expand Down

0 comments on commit 0dcff14

Please sign in to comment.