From a8324f96b18d56dc6f86005802758a33ef071fc9 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Mon, 10 Jul 2023 21:07:19 +0800 Subject: [PATCH] Polish config, expose rpc config in application.yml --- docs/docs/en/architecture/configuration.md | 10 +-- docs/docs/zh/architecture/configuration.md | 10 +-- .../api/audit/AuditPublishService.java | 10 +-- .../api/configuration/ApiConfig.java | 90 +++++++++++++++++++ .../api/configuration/AppConfiguration.java | 7 +- .../api/configuration/AuditConfiguration.java | 38 -------- .../PythonGatewayConfiguration.java | 38 -------- .../configuration/TrafficConfiguration.java | 38 -------- .../api/interceptor/RateLimitInterceptor.java | 6 +- .../api/python/PythonGateway.java | 7 +- .../src/main/resources/application.yaml | 76 ++++++++-------- ...figurationTest.java => ApiConfigTest.java} | 32 +++---- .../configuration/AuditConfigurationTest.java | 37 -------- .../interceptor/RateLimitInterceptorTest.java | 8 +- .../src/test/resources/application.yaml | 38 +++++++- .../server/master/MasterServer.java | 6 ++ .../server/master/config/MasterConfig.java | 8 ++ .../server/master/rpc/MasterRPCServer.java | 2 +- .../server/master/rpc/MasterRpcClient.java | 25 ++++-- .../runner/WorkflowExecuteRunnable.java | 33 +++---- .../src/main/resources/application.yaml | 61 +++++++------ .../server/worker/config/WorkerConfig.java | 5 ++ .../server/worker/rpc/WorkerRpcClient.java | 9 +- .../server/worker/rpc/WorkerRpcServer.java | 2 +- 24 files changed, 298 insertions(+), 298 deletions(-) create mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/ApiConfig.java delete mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/AuditConfiguration.java delete mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/PythonGatewayConfiguration.java delete mode 100644 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/TrafficConfiguration.java rename dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/{TrafficConfigurationTest.java => ApiConfigTest.java} (56%) delete mode 100644 dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/AuditConfigurationTest.java diff --git a/docs/docs/en/architecture/configuration.md b/docs/docs/en/architecture/configuration.md index 05d9e99ebc45..acbd4c87b0e0 100644 --- a/docs/docs/en/architecture/configuration.md +++ b/docs/docs/en/architecture/configuration.md @@ -259,11 +259,11 @@ Location: `api-server/conf/application.yaml` |security.authentication.ldap.ssl.enable|false|LDAP switch| |security.authentication.ldap.ssl.trust-store|ldapkeystore.jks|LDAP jks file absolute path| |security.authentication.ldap.ssl.trust-store-password|password|LDAP jks password| -|traffic.control.global.switch|false|traffic control global switch| -|traffic.control.max-global-qps-rate|300|global max request number per second| -|traffic.control.tenant-switch|false|traffic control tenant switch| -|traffic.control.default-tenant-qps-rate|10|default tenant max request number per second| -|traffic.control.customize-tenant-qps-rate||customize tenant max request number per second| +|api.traffic.control.global.switch|false|traffic control global switch| +|api.traffic.control.max-global-qps-rate|300|global max request number per second| +|api.traffic.control.tenant-switch|false|traffic control tenant switch| +|api.traffic.control.default-tenant-qps-rate|10|default tenant max request number per second| +|api.traffic.control.customize-tenant-qps-rate||customize tenant max request number per second| ### Master Server related configuration diff --git a/docs/docs/zh/architecture/configuration.md b/docs/docs/zh/architecture/configuration.md index 3d91a8797dcf..024a037ce009 100644 --- a/docs/docs/zh/architecture/configuration.md +++ b/docs/docs/zh/architecture/configuration.md @@ -257,11 +257,11 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId |security.authentication.ldap.ssl.enable|false|LDAP ssl开关| |security.authentication.ldap.ssl.trust-store|ldapkeystore.jks|LDAP jks文件绝对路径| |security.authentication.ldap.ssl.trust-store-password|password|LDAP jks密码| -|traffic.control.global.switch|false|流量控制全局开关| -|traffic.control.max-global-qps-rate|300|全局最大请求数/秒| -|traffic.control.tenant-switch|false|流量控制租户开关| -|traffic.control.default-tenant-qps-rate|10|默认租户最大请求数/秒限制| -|traffic.control.customize-tenant-qps-rate||自定义租户最大请求数/秒限制| +|api.traffic.control.global.switch|false|流量控制全局开关| +|api.traffic.control.max-global-qps-rate|300|全局最大请求数/秒| +|api.traffic.control.tenant-switch|false|流量控制租户开关| +|api.traffic.control.default-tenant-qps-rate|10|默认租户最大请求数/秒限制| +|api.traffic.control.customize-tenant-qps-rate||自定义租户最大请求数/秒限制| ## Master Server相关配置 diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/AuditPublishService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/AuditPublishService.java index 6e47f871458c..34a5cd8ac0b9 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/AuditPublishService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/audit/AuditPublishService.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.api.audit; -import org.apache.dolphinscheduler.api.configuration.AuditConfiguration; +import org.apache.dolphinscheduler.api.configuration.ApiConfig; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -34,20 +34,20 @@ @Slf4j public class AuditPublishService { - private BlockingQueue auditMessageQueue = new LinkedBlockingQueue<>(); + private final BlockingQueue auditMessageQueue = new LinkedBlockingQueue<>(); @Autowired private List subscribers; @Autowired - private AuditConfiguration auditConfiguration; + private ApiConfig apiConfig; /** * create a daemon thread to process the message queue */ @PostConstruct private void init() { - if (auditConfiguration.getEnabled()) { + if (apiConfig.isAuditEnable()) { Thread thread = new Thread(this::doPublish); thread.setDaemon(true); thread.setName("Audit-Log-Consume-Thread"); @@ -61,7 +61,7 @@ private void init() { * @param message audit message */ public void publish(AuditMessage message) { - if (auditConfiguration.getEnabled() && !auditMessageQueue.offer(message)) { + if (apiConfig.isAuditEnable() && !auditMessageQueue.offer(message)) { log.error("Publish audit message failed, message:{}", message); } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/ApiConfig.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/ApiConfig.java new file mode 100644 index 000000000000..3a33ad2d5887 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/ApiConfig.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.configuration; + +import java.util.HashMap; +import java.util.Map; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; +import org.springframework.validation.Errors; +import org.springframework.validation.Validator; +import org.springframework.validation.annotation.Validated; + +@Slf4j +@Data +@Validated +@Configuration +@ConfigurationProperties(value = "api") +public class ApiConfig implements Validator { + + private boolean auditEnable = false; + + private TrafficConfiguration trafficControl = new TrafficConfiguration(); + + private PythonGatewayConfiguration pythonGateway = new PythonGatewayConfiguration(); + + @Override + public boolean supports(Class clazz) { + return ApiConfig.class.isAssignableFrom(clazz); + } + + @Override + public void validate(Object target, Errors errors) { + printConfig(); + } + + private void printConfig() { + log.info("API config: auditEnable -> {} ", auditEnable); + log.info("API config: trafficControl -> {} ", trafficControl); + log.info("API config: pythonGateway -> {} ", pythonGateway); + } + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class TrafficConfiguration { + + private boolean globalSwitch = false; + private Integer maxGlobalQpsRate = 300; + private boolean tenantSwitch = false; + private Integer defaultTenantQpsRate = 10; + private Map customizeTenantQpsRate = new HashMap<>(); + } + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class PythonGatewayConfiguration { + + private boolean enabled = true; + private String gatewayServerAddress = "0.0.0.0"; + private int gatewayServerPort = 25333; + private String pythonAddress = "127.0.0.1"; + private int pythonPort = 25334; + private int connectTimeout = 0; + private int readTimeout = 0; + private String authToken = "jwUDzpLsNKEFER4*a8gruBH_GsAurNxU7A@Xc"; + } + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/AppConfiguration.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/AppConfiguration.java index 2d6b5f7bdd60..9fde9eec17c0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/AppConfiguration.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/AppConfiguration.java @@ -50,7 +50,7 @@ public class AppConfiguration implements WebMvcConfigurer { public static final String LOCALE_LANGUAGE_COOKIE = "language"; @Autowired - private TrafficConfiguration trafficConfiguration; + private ApiConfig apiConfig; @Bean public CorsFilter corsFilter() { @@ -90,14 +90,15 @@ public LocaleChangeInterceptor localeChangeInterceptor() { @Bean public RateLimitInterceptor createRateLimitInterceptor() { - return new RateLimitInterceptor(trafficConfiguration); + return new RateLimitInterceptor(apiConfig.getTrafficControl()); } @Override public void addInterceptors(InterceptorRegistry registry) { // i18n registry.addInterceptor(localeChangeInterceptor()); - if (trafficConfiguration.isGlobalSwitch() || trafficConfiguration.isTenantSwitch()) { + ApiConfig.TrafficConfiguration trafficControl = apiConfig.getTrafficControl(); + if (trafficControl.isGlobalSwitch() || trafficControl.isTenantSwitch()) { registry.addInterceptor(createRateLimitInterceptor()); } registry.addInterceptor(loginInterceptor()) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/AuditConfiguration.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/AuditConfiguration.java deleted file mode 100644 index 451fcbc40ba9..000000000000 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/AuditConfiguration.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.api.configuration; - -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.stereotype.Component; - -@Component -@EnableConfigurationProperties -@ConfigurationProperties(value = "audit", ignoreUnknownFields = false) -public class AuditConfiguration { - - private boolean enabled; - - public boolean getEnabled() { - return enabled; - } - - public void setEnabled(boolean enabled) { - this.enabled = enabled; - } -} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/PythonGatewayConfiguration.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/PythonGatewayConfiguration.java deleted file mode 100644 index a3f5f0dad7f1..000000000000 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/PythonGatewayConfiguration.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.api.configuration; - -import lombok.Data; - -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.context.annotation.Configuration; - -@Data -@Configuration -@ConfigurationProperties(value = "python-gateway") -public class PythonGatewayConfiguration { - - private boolean enabled; - private String gatewayServerAddress; - private int gatewayServerPort; - private String pythonAddress; - private int pythonPort; - private int connectTimeout; - private int readTimeout; - private String authToken; -} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/TrafficConfiguration.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/TrafficConfiguration.java deleted file mode 100644 index 47cb5230f393..000000000000 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/configuration/TrafficConfiguration.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.api.configuration; - -import java.util.HashMap; -import java.util.Map; - -import lombok.Data; - -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.context.annotation.Configuration; - -@Data -@Configuration -@ConfigurationProperties(prefix = "traffic.control") -public class TrafficConfiguration { - - private boolean globalSwitch; - private Integer maxGlobalQpsRate = 300; - private boolean tenantSwitch; - private Integer defaultTenantQpsRate = 10; - private Map customizeTenantQpsRate = new HashMap<>(); -} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptor.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptor.java index 22dcafcbdc02..1f0ec80ac868 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptor.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptor.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.api.interceptor; -import org.apache.dolphinscheduler.api.configuration.TrafficConfiguration; +import org.apache.dolphinscheduler.api.configuration.ApiConfig; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; @@ -47,7 +47,7 @@ @Slf4j public class RateLimitInterceptor implements HandlerInterceptor { - private TrafficConfiguration trafficConfiguration; + private ApiConfig.TrafficConfiguration trafficConfiguration; private RateLimiter globalRateLimiter; @@ -98,7 +98,7 @@ public boolean preHandle(HttpServletRequest request, HttpServletResponse respons return true; } - public RateLimitInterceptor(TrafficConfiguration trafficConfiguration) { + public RateLimitInterceptor(ApiConfig.TrafficConfiguration trafficConfiguration) { this.trafficConfiguration = trafficConfiguration; if (trafficConfiguration.isGlobalSwitch()) { this.globalRateLimiter = diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java index fe57f12d16fb..2451a9de6e01 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.api.python; -import org.apache.dolphinscheduler.api.configuration.PythonGatewayConfiguration; +import org.apache.dolphinscheduler.api.configuration.ApiConfig; import org.apache.dolphinscheduler.api.dto.EnvironmentDto; import org.apache.dolphinscheduler.api.dto.resources.ResourceComponent; import org.apache.dolphinscheduler.api.enums.Status; @@ -145,7 +145,7 @@ public class PythonGateway { private DataSourceMapper dataSourceMapper; @Autowired - private PythonGatewayConfiguration pythonGatewayConfiguration; + private ApiConfig apiConfig; @Autowired private ProjectUserMapper projectUserMapper; @@ -689,13 +689,14 @@ public StorageEntity createOrUpdateResource(String userName, String fullName, @PostConstruct public void init() { - if (pythonGatewayConfiguration.isEnabled()) { + if (apiConfig.getPythonGateway().isEnabled()) { this.start(); } } private void start() { try { + ApiConfig.PythonGatewayConfiguration pythonGatewayConfiguration = apiConfig.getPythonGateway(); InetAddress gatewayHost = InetAddress.getByName(pythonGatewayConfiguration.getGatewayServerAddress()); GatewayServerBuilder serverBuilder = new GatewayServer.GatewayServerBuilder() .entryPoint(this) diff --git a/dolphinscheduler-api/src/main/resources/application.yaml b/dolphinscheduler-api/src/main/resources/application.yaml index 5d3154c6f593..1165437972ce 100644 --- a/dolphinscheduler-api/src/main/resources/application.yaml +++ b/dolphinscheduler-api/src/main/resources/application.yaml @@ -115,35 +115,46 @@ registry: block-until-connected: 600ms digest: ~ -audit: - enabled: false +api: + audit-enable: false + # Traffic control, if you turn on this config, the maximum number of request/s will be limited. + # global max request number per second + # default tenant-level max request number + traffic-control: + global-switch: false + max-global-qps-rate: 300 + tenant-switch: false + default-tenant-qps-rate: 10 + #customize-tenant-qps-rate: + # eg. + #tenant1: 11 + #tenant2: 20 + python-gateway: + # Weather enable python gateway server or not. The default value is true. + enabled: true + # Authentication token for connection from python api to python gateway server. Should be changed the default value + # when you deploy in public network. + auth-token: jwUDzpLsNKEFER4*a8gruBH_GsAurNxU7A@Xc + # The address of Python gateway server start. Set its value to `0.0.0.0` if your Python API run in different + # between Python gateway server. It could be be specific to other address like `127.0.0.1` or `localhost` + gateway-server-address: 0.0.0.0 + # The port of Python gateway server start. Define which port you could connect to Python gateway server from + # Python API side. + gateway-server-port: 25333 + # The address of Python callback client. + python-address: 127.0.0.1 + # The port of Python callback client. + python-port: 25334 + # Close connection of socket server if no other request accept after x milliseconds. Define value is (0 = infinite), + # and socket server would never close even though no requests accept + connect-timeout: 0 + # Close each active connection of socket server if python program not active after x milliseconds. Define value is + # (0 = infinite), and socket server would never close even though no requests accept + read-timeout: 0 metrics: enabled: true -python-gateway: - # Weather enable python gateway server or not. The default value is true. - enabled: true - # Authentication token for connection from python api to python gateway server. Should be changed the default value - # when you deploy in public network. - auth-token: jwUDzpLsNKEFER4*a8gruBH_GsAurNxU7A@Xc - # The address of Python gateway server start. Set its value to `0.0.0.0` if your Python API run in different - # between Python gateway server. It could be be specific to other address like `127.0.0.1` or `localhost` - gateway-server-address: 0.0.0.0 - # The port of Python gateway server start. Define which port you could connect to Python gateway server from - # Python API side. - gateway-server-port: 25333 - # The address of Python callback client. - python-address: 127.0.0.1 - # The port of Python callback client. - python-port: 25334 - # Close connection of socket server if no other request accept after x milliseconds. Define value is (0 = infinite), - # and socket server would never close even though no requests accept - connect-timeout: 0 - # Close each active connection of socket server if python program not active after x milliseconds. Define value is - # (0 = infinite), and socket server would never close even though no requests accept - read-timeout: 0 - security: authentication: # Authentication types (supported types: PASSWORD,LDAP,CASDOOR_SSO) @@ -168,21 +179,6 @@ security: trust-store: "/ldapkeystore.jks" trust-store-password: "password" -# Traffic control, if you turn on this config, the maximum number of request/s will be limited. -# global max request number per second -# default tenant-level max request number -traffic: - control: - global-switch: false - max-global-qps-rate: 300 - tenant-switch: false - default-tenant-qps-rate: 10 - #customize-tenant-qps-rate: - # eg. - #tenant1: 11 - #tenant2: 20 - - # Override by profile --- diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/TrafficConfigurationTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/ApiConfigTest.java similarity index 56% rename from dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/TrafficConfigurationTest.java rename to dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/ApiConfigTest.java index d730f98afedf..35650fea1e0d 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/TrafficConfigurationTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/ApiConfigTest.java @@ -25,33 +25,33 @@ import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; -public class TrafficConfigurationTest extends AbstractControllerTest { +public class ApiConfigTest extends AbstractControllerTest { @Autowired - private TrafficConfiguration trafficConfiguration; + private ApiConfig apiConfig; @Test - public void isTrafficGlobalControlSwitch() { - Assertions.assertFalse(trafficConfiguration.isGlobalSwitch()); + public void testIsAuditEnable() { + Assertions.assertTrue(apiConfig.isAuditEnable()); } @Test - public void getMaxGlobalQpsLimit() { - Assertions.assertEquals(300, (int) trafficConfiguration.getMaxGlobalQpsRate()); - } + public void testGetTrafficControlConfig() { + ApiConfig.TrafficConfiguration trafficControl = apiConfig.getTrafficControl(); - @Test - public void isTrafficTenantControlSwitch() { - Assertions.assertFalse(trafficConfiguration.isTenantSwitch()); - } + Assertions.assertFalse(trafficControl.isGlobalSwitch()); + Assertions.assertEquals(299, (int) trafficControl.getMaxGlobalQpsRate()); + Assertions.assertFalse(trafficControl.isTenantSwitch()); + Assertions.assertEquals(9, (int) trafficControl.getDefaultTenantQpsRate()); + Assertions.assertTrue(MapUtils.isEmpty(trafficControl.getCustomizeTenantQpsRate())); - @Test - public void getDefaultTenantQpsLimit() { - Assertions.assertEquals(10, (int) trafficConfiguration.getDefaultTenantQpsRate()); } @Test - public void getCustomizeTenantQpsRate() { - Assertions.assertTrue(MapUtils.isEmpty(trafficConfiguration.getCustomizeTenantQpsRate())); + public void testGetPythonGateway() { + ApiConfig.PythonGatewayConfiguration pythonGateway = apiConfig.getPythonGateway(); + + Assertions.assertFalse(pythonGateway.isEnabled()); } + } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/AuditConfigurationTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/AuditConfigurationTest.java deleted file mode 100644 index a2a3759c0424..000000000000 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/configuration/AuditConfigurationTest.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.api.configuration; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.ActiveProfiles; - -@ActiveProfiles("audit") -@SpringBootTest(classes = AuditConfiguration.class) -public class AuditConfigurationTest { - - @Autowired - private AuditConfiguration auditConfiguration; - - @Test - public void isAuditGlobalControlSwitch() { - Assertions.assertTrue(auditConfiguration.getEnabled()); - } -} diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptorTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptorTest.java index 9dcd1111e287..a584c41eaf5c 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptorTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/interceptor/RateLimitInterceptorTest.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.api.interceptor; -import org.apache.dolphinscheduler.api.configuration.TrafficConfiguration; +import org.apache.dolphinscheduler.api.configuration.ApiConfig; import java.util.HashMap; import java.util.Map; @@ -39,14 +39,14 @@ public class RateLimitInterceptorTest { public void testPreHandleWithoutControl() throws ExecutionException { HttpServletRequest request = Mockito.mock(HttpServletRequest.class); HttpServletResponse response = Mockito.mock(HttpServletResponse.class); - RateLimitInterceptor rateLimitInterceptor = new RateLimitInterceptor(new TrafficConfiguration()); + RateLimitInterceptor rateLimitInterceptor = new RateLimitInterceptor(new ApiConfig.TrafficConfiguration()); Assertions.assertTrue(rateLimitInterceptor.preHandle(request, response, null)); Assertions.assertTrue(rateLimitInterceptor.preHandle(request, response, null)); } @Test public void testPreHandleWithTenantLevenControl() throws ExecutionException { - TrafficConfiguration trafficConfiguration = new TrafficConfiguration(); + ApiConfig.TrafficConfiguration trafficConfiguration = new ApiConfig.TrafficConfiguration(); trafficConfiguration.setTenantSwitch(true); Map map = new HashMap<>(); map.put("tenant1", 2); @@ -70,7 +70,7 @@ public void testPreHandleWithTenantLevenControl() throws ExecutionException { @Test public void testPreHandleWithGlobalControl() throws ExecutionException { - TrafficConfiguration trafficConfiguration = new TrafficConfiguration(); + ApiConfig.TrafficConfiguration trafficConfiguration = new ApiConfig.TrafficConfiguration(); trafficConfiguration.setTenantSwitch(true); trafficConfiguration.setGlobalSwitch(true); trafficConfiguration.setMaxGlobalQpsRate(3); diff --git a/dolphinscheduler-api/src/test/resources/application.yaml b/dolphinscheduler-api/src/test/resources/application.yaml index b071412a6f6a..fda37e4ea477 100644 --- a/dolphinscheduler-api/src/test/resources/application.yaml +++ b/dolphinscheduler-api/src/test/resources/application.yaml @@ -27,5 +27,39 @@ spring: registry: type: zookeeper -audit: - enabled: true \ No newline at end of file +api: + audit-enable: true + # Traffic control, if you turn on this config, the maximum number of request/s will be limited. + # global max request number per second + # default tenant-level max request number + traffic-control: + global-switch: false + max-global-qps-rate: 299 + tenant-switch: false + default-tenant-qps-rate: 9 + #customize-tenant-qps-rate: + # eg. + #tenant1: 11 + #tenant2: 20 + python-gateway: + # Weather enable python gateway server or not. The default value is true. + enabled: false + # Authentication token for connection from python api to python gateway server. Should be changed the default value + # when you deploy in public network. + auth-token: jwUDzpLsNKEFER4*a8gruBH_GsAurNxU7A@Xc + # The address of Python gateway server start. Set its value to `0.0.0.0` if your Python API run in different + # between Python gateway server. It could be be specific to other address like `127.0.0.1` or `localhost` + gateway-server-address: 0.0.0.0 + # The port of Python gateway server start. Define which port you could connect to Python gateway server from + # Python API side. + gateway-server-port: 25333 + # The address of Python callback client. + python-address: 127.0.0.1 + # The port of Python callback client. + python-port: 25334 + # Close connection of socket server if no other request accept after x milliseconds. Define value is (0 = infinite), + # and socket server would never close even though no requests accept + connect-timeout: 0 + # Close each active connection of socket server if python program not active after x milliseconds. Define value is + # (0 = infinite), and socket server would never close even though no requests accept + read-timeout: 0 \ No newline at end of file diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index dadd02fbf0ca..1bbe91935373 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.scheduler.api.SchedulerApi; import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer; +import org.apache.dolphinscheduler.server.master.rpc.MasterRpcClient; import org.apache.dolphinscheduler.server.master.runner.EventExecuteService; import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap; @@ -73,6 +74,9 @@ public class MasterServer implements IStoppable { @Autowired private MasterRPCServer masterRPCServer; + @Autowired + private MasterRpcClient masterRpcClient; + public static void main(String[] args) { Thread.currentThread().setName(Constants.THREAD_NAME_MASTER_SERVER); SpringApplication.run(MasterServer.class); @@ -85,6 +89,7 @@ public static void main(String[] args) { public void run() throws SchedulerException { // init rpc server this.masterRPCServer.start(); + this.masterRpcClient.start(); // install task plugin this.taskPluginManager.loadPlugin(); @@ -125,6 +130,7 @@ public void close(String cause) { SchedulerApi closedSchedulerApi = schedulerApi; MasterSchedulerBootstrap closedSchedulerBootstrap = masterSchedulerBootstrap; MasterRPCServer closedRpcServer = masterRPCServer; + MasterRpcClient closedRpcClient = masterRpcClient; MasterRegistryClient closedMasterRegistryClient = masterRegistryClient; // close spring Context and will invoke method with @PreDestroy annotation to destroy beans. // like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index d65a3fe52aef..05a79d1170a6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -20,6 +20,8 @@ import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; +import org.apache.dolphinscheduler.remote.config.NettyClientConfig; +import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelector; import org.apache.dolphinscheduler.server.master.processor.queue.TaskExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; @@ -97,6 +99,10 @@ public class MasterConfig implements Validator { private Duration workerGroupRefreshInterval = Duration.ofSeconds(10L); + private NettyClientConfig masterRpcClientConfig = new NettyClientConfig(); + + private NettyServerConfig masterRpcServerConfig = new NettyServerConfig(); + // ip:listenPort private String masterAddress; @@ -177,5 +183,7 @@ private void printConfig() { log.info("Master config: masterAddress -> {} ", masterAddress); log.info("Master config: masterRegistryPath -> {} ", masterRegistryPath); log.info("Master config: workerGroupRefreshInterval -> {} ", workerGroupRefreshInterval); + log.info("Master config: masterRpcServerConfig -> {} ", masterRpcServerConfig); + log.info("Master config: masterRpcClientConfig -> {} ", masterRpcClientConfig); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java index ac9d4da19d2a..f5cadb7339e7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRPCServer.java @@ -47,7 +47,7 @@ public class MasterRPCServer implements AutoCloseable { public void start() { log.info("Starting Master RPC Server..."); // init remoting server - NettyServerConfig serverConfig = new NettyServerConfig(); + NettyServerConfig serverConfig = masterConfig.getMasterRpcServerConfig(); serverConfig.setListenPort(masterConfig.getListenPort()); this.nettyRemotingServer = new NettyRemotingServer(serverConfig); for (MasterRpcProcessor masterRpcProcessor : masterRpcProcessors) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcClient.java index 225b27b0601a..15d976cbee15 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcClient.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterRpcClient.java @@ -19,25 +19,28 @@ import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Message; -import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Slf4j @Component -public class MasterRpcClient { +public class MasterRpcClient implements AutoCloseable { - private final NettyRemotingClient client; + @Autowired + private MasterConfig masterConfig; private static final long DEFAULT_TIME_OUT_MILLS = 10_000L; + private NettyRemotingClient client; - public MasterRpcClient() { - client = new NettyRemotingClient(new NettyClientConfig()); + public void start() { + client = new NettyRemotingClient(masterConfig.getMasterRpcClientConfig()); log.info("Success initialized MasterRPCClient..."); } @@ -46,7 +49,15 @@ public Message sendSyncCommand(@NonNull Host host, return client.sendSync(host, rpcMessage, DEFAULT_TIME_OUT_MILLS); } - public void send(Host of, Message message) throws RemotingException { - client.send(of, message); + public void send(@NonNull Host host, @NonNull Message message) throws RemotingException { + client.send(host, message); } + + @Override + public void close() { + if (client != null) { + client.close(); + } + } + } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index e60da71472c2..f7051dbabc59 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -47,7 +47,6 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.Environment; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; @@ -244,19 +243,10 @@ public class WorkflowExecuteRunnable implements Callable { private final CuringParamsService curingParamsService; - private final String masterAddress; - private final DefaultTaskExecuteRunnableFactory defaultTaskExecuteRunnableFactory; - /** - * @param processInstance processInstance - * @param processService processService - * @param processInstanceDao processInstanceDao - * @param masterRpcClient masterRpcClient - * @param processAlertManager processAlertManager - * @param masterConfig masterConfig - * @param stateWheelExecuteThread stateWheelExecuteThread - */ + private final MasterConfig masterConfig; + public WorkflowExecuteRunnable( @NonNull ProcessInstance processInstance, @NonNull CommandService commandService, @@ -275,12 +265,12 @@ public WorkflowExecuteRunnable( this.processInstanceDao = processInstanceDao; this.processInstance = processInstance; this.masterRpcClient = masterRpcClient; + this.masterConfig = masterConfig; this.processAlertManager = processAlertManager; this.stateWheelExecuteThread = stateWheelExecuteThread; this.curingParamsService = curingParamsService; this.taskInstanceDao = taskInstanceDao; this.taskDefinitionLogDao = taskDefinitionLogDao; - this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort()); this.defaultTaskExecuteRunnableFactory = defaultTaskExecuteRunnableFactory; this.processDefinition = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion()); @@ -480,7 +470,7 @@ public void taskFinished(TaskInstance taskInstance) throws StateEventHandleExcep taskInstance.getState()); this.updateProcessInstanceState(); - sendTaskLogOnMasterToRemoteIfNeeded(taskInstance.getLogPath(), taskInstance.getHost()); + sendTaskLogOnMasterToRemoteIfNeeded(taskInstance); } catch (Exception ex) { log.error("Task finish failed, get a exception, will remove this taskInstance from completeTaskSet", ex); // remove the task from complete map, so that we can finish in the next time. @@ -1424,7 +1414,8 @@ private boolean tryToTakeOverTaskInstance(TaskInstance taskInstance) { try { Message message = masterRpcClient.sendSyncCommand(Host.of(taskInstance.getHost()), - new WorkflowHostChangeRequest(taskInstance.getId(), masterAddress).convert2Command()); + new WorkflowHostChangeRequest(taskInstance.getId(), masterConfig.getMasterAddress()) + .convert2Command()); if (message == null) { log.error( "Takeover task instance failed, the worker {} might not be alive, will try to create a new task instance", @@ -2237,17 +2228,13 @@ private enum WorkflowRunnableStatus { } - private void sendTaskLogOnMasterToRemoteIfNeeded(String logPath, String host) { - if (RemoteLogUtils.isRemoteLoggingEnable() && isExecutedOnMaster(host)) { - RemoteLogUtils.sendRemoteLog(logPath); - log.info("Master sends task log {} to remote storage asynchronously.", logPath); + private void sendTaskLogOnMasterToRemoteIfNeeded(TaskInstance taskInstance) { + if (RemoteLogUtils.isRemoteLoggingEnable() && TaskUtils.isMasterTask(taskInstance.getTaskType())) { + RemoteLogUtils.sendRemoteLog(taskInstance.getLogPath()); + log.info("Master sends task log {} to remote storage asynchronously.", taskInstance.getLogPath()); } } - private boolean isExecutedOnMaster(String host) { - return host.endsWith(masterAddress.split(Constants.COLON)[1]); - } - private void mergeTaskInstanceVarPool(TaskInstance taskInstance) { String taskVarPoolJson = taskInstance.getVarPool(); if (StringUtils.isEmpty(taskVarPoolJson)) { diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml index 27522f3937b2..324725351139 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml +++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml @@ -181,28 +181,42 @@ alert: wait-timeout: 0 heartbeat-interval: 60s -python-gateway: - # Weather enable python gateway server or not. The default value is true. - enabled: true - # Authentication token for connection from python api to python gateway server. Should be changed the default value - # when you deploy in public network. - auth-token: jwUDzpLsNKEFER4*a8gruBH_GsAurNxU7A@Xc - # The address of Python gateway server start. Set its value to `0.0.0.0` if your Python API run in different - # between Python gateway server. It could be be specific to other address like `127.0.0.1` or `localhost` - gateway-server-address: 0.0.0.0 - # The port of Python gateway server start. Define which port you could connect to Python gateway server from - # Python API side. - gateway-server-port: 25333 - # The address of Python callback client. - python-address: 127.0.0.1 - # The port of Python callback client. - python-port: 25334 - # Close connection of socket server if no other request accept after x milliseconds. Define value is (0 = infinite), - # and socket server would never close even though no requests accept - connect-timeout: 0 - # Close each active connection of socket server if python program not active after x milliseconds. Define value is - # (0 = infinite), and socket server would never close even though no requests accept - read-timeout: 0 +api: + audit-enable: false + # Traffic control, if you turn on this config, the maximum number of request/s will be limited. + # global max request number per second + # default tenant-level max request number + traffic-control: + global-switch: false + max-global-qps-rate: 300 + tenant-switch: false + default-tenant-qps-rate: 10 + #customize-tenant-qps-rate: + # eg. + #tenant1: 11 + #tenant2: 20 + python-gateway: + # Weather enable python gateway server or not. The default value is true. + enabled: true + # Authentication token for connection from python api to python gateway server. Should be changed the default value + # when you deploy in public network. + auth-token: jwUDzpLsNKEFER4*a8gruBH_GsAurNxU7A@Xc + # The address of Python gateway server start. Set its value to `0.0.0.0` if your Python API run in different + # between Python gateway server. It could be be specific to other address like `127.0.0.1` or `localhost` + gateway-server-address: 0.0.0.0 + # The port of Python gateway server start. Define which port you could connect to Python gateway server from + # Python API side. + gateway-server-port: 25333 + # The address of Python callback client. + python-address: 127.0.0.1 + # The port of Python callback client. + python-port: 25334 + # Close connection of socket server if no other request accept after x milliseconds. Define value is (0 = infinite), + # and socket server would never close even though no requests accept + connect-timeout: 0 + # Close each active connection of socket server if python program not active after x milliseconds. Define value is + # (0 = infinite), and socket server would never close even though no requests accept + read-timeout: 0 server: port: 12345 @@ -234,9 +248,6 @@ management: tags: application: ${spring.application.name} -audit: - enabled: true - metrics: enabled: true diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index 6d1d776bc887..ae1ce70163ba 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -21,6 +21,8 @@ import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties; +import org.apache.dolphinscheduler.remote.config.NettyClientConfig; +import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import java.time.Duration; @@ -50,6 +52,9 @@ public class WorkerConfig implements Validator { private double reservedMemory = 0.1; private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties(); + private NettyClientConfig workerRpcClientConfig = new NettyClientConfig(); + private NettyServerConfig workerRpcServerConfig = new NettyServerConfig(); + /** * This field doesn't need to set at config file, it will be calculated by workerIp:listenPort */ diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java index adf5cbccbeb8..e7a025546ec6 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcClient.java @@ -19,10 +19,10 @@ import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Message; -import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.remote.processor.WorkerRpcProcessor; import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import java.util.List; @@ -43,16 +43,17 @@ public class WorkerRpcClient implements AutoCloseable { @Lazy private List workerRpcProcessors; + @Autowired + private WorkerConfig workerConfig; + private NettyRemotingClient nettyRemotingClient; public void start() { log.info("Worker rpc client starting"); - NettyClientConfig nettyClientConfig = new NettyClientConfig(); - this.nettyRemotingClient = new NettyRemotingClient(nettyClientConfig); + this.nettyRemotingClient = new NettyRemotingClient(workerConfig.getWorkerRpcClientConfig()); // we only use the client to handle the ack message, we can optimize this, send ack to the nettyServer. for (WorkerRpcProcessor workerRpcProcessor : workerRpcProcessors) { this.nettyRemotingClient.registerProcessor(workerRpcProcessor); - } log.info("Worker rpc client started"); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java index 94b509498a3a..cf81ad4f55d0 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java @@ -44,7 +44,7 @@ public class WorkerRpcServer implements Closeable { public void start() { log.info("Worker rpc server starting..."); - NettyServerConfig serverConfig = new NettyServerConfig(); + NettyServerConfig serverConfig = workerConfig.getWorkerRpcServerConfig(); serverConfig.setListenPort(workerConfig.getListenPort()); nettyRemotingServer = new NettyRemotingServer(serverConfig); for (WorkerRpcProcessor workerRpcProcessor : workerRpcProcessors) {