diff --git a/alerter/src/main/java/org/apache/hertzbeat/alert/AlerterWorkerPool.java b/alerter/src/main/java/org/apache/hertzbeat/alert/AlerterWorkerPool.java index 78afb145767..560d0236204 100644 --- a/alerter/src/main/java/org/apache/hertzbeat/alert/AlerterWorkerPool.java +++ b/alerter/src/main/java/org/apache/hertzbeat/alert/AlerterWorkerPool.java @@ -18,8 +18,8 @@ package org.apache.hertzbeat.alert; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -50,11 +50,11 @@ private void initWorkExecutor() { .setDaemon(true) .setNameFormat("alerter-worker-%d") .build(); - workerExecutor = new ThreadPoolExecutor(6, + workerExecutor = new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS, - new SynchronousQueue<>(), + new LinkedBlockingQueue<>(), threadFactory, new ThreadPoolExecutor.AbortPolicy()); } @@ -69,10 +69,10 @@ private void initNotifyExecutor() { .setNameFormat("notify-worker-%d") .build(); notifyExecutor = new ThreadPoolExecutor(6, - 10, + 6, 10, TimeUnit.SECONDS, - new SynchronousQueue<>(), + new LinkedBlockingQueue<>(), threadFactory, new ThreadPoolExecutor.AbortPolicy()); } diff --git a/alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CalculateAlarm.java b/alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CalculateAlarm.java index ec75b4c5146..858c9e20b18 100644 --- a/alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CalculateAlarm.java +++ b/alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CalculateAlarm.java @@ -69,6 +69,8 @@ public class CalculateAlarm { private static final String SYSTEM_VALUE_ROW_COUNT = "system_value_row_count"; + + private static final int CALCULATE_THREADS = 3; /** * The alarm in the process is triggered @@ -129,9 +131,9 @@ private void startCalculate() { } } }; - workerPool.executeJob(runnable); - workerPool.executeJob(runnable); - workerPool.executeJob(runnable); + for (int i = 0; i < CALCULATE_THREADS; i++) { + workerPool.executeJob(runnable); + } } private void calculate(CollectRep.MetricsData metricsData) { diff --git a/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/WorkerPool.java b/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/WorkerPool.java index 68c50f08775..73fb391ba04 100644 --- a/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/WorkerPool.java +++ b/collector/src/main/java/org/apache/hertzbeat/collector/dispatch/WorkerPool.java @@ -50,8 +50,10 @@ private void initWorkExecutor() { .setDaemon(true) .setNameFormat("collect-worker-%d") .build(); - workerExecutor = new ThreadPoolExecutor(100, - 1024, + int coreSize = Math.max(2, Runtime.getRuntime().availableProcessors()); + int maxSize = Runtime.getRuntime().availableProcessors() * 16; + workerExecutor = new ThreadPoolExecutor(coreSize, + maxSize, 10, TimeUnit.SECONDS, new SynchronousQueue<>(), diff --git a/common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java b/common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java index e9d3a75ef59..166d812d74c 100644 --- a/common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java +++ b/common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java @@ -55,7 +55,7 @@ public interface CommonConstants { /** * Response status code: Incorrect login account password */ - byte MONITOR_LOGIN_FAILED_CODE = 0x05; + byte LOGIN_FAILED_CODE = 0x05; /** * Monitoring status 0: Paused, 1: Up, 2: Down diff --git a/common/src/main/java/org/apache/hertzbeat/common/support/CommonThreadPool.java b/common/src/main/java/org/apache/hertzbeat/common/support/CommonThreadPool.java index 159d048b353..4a231230f4f 100644 --- a/common/src/main/java/org/apache/hertzbeat/common/support/CommonThreadPool.java +++ b/common/src/main/java/org/apache/hertzbeat/common/support/CommonThreadPool.java @@ -49,7 +49,7 @@ private void initWorkExecutor() { .setDaemon(true) .setNameFormat("common-worker-%d") .build(); - workerExecutor = new ThreadPoolExecutor(2, + workerExecutor = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, diff --git a/common/src/test/java/org/apache/hertzbeat/common/support/CommonThreadPoolTest.java b/common/src/test/java/org/apache/hertzbeat/common/support/CommonThreadPoolTest.java index 211b3ed280b..daa09493a88 100644 --- a/common/src/test/java/org/apache/hertzbeat/common/support/CommonThreadPoolTest.java +++ b/common/src/test/java/org/apache/hertzbeat/common/support/CommonThreadPoolTest.java @@ -17,15 +17,6 @@ package org.apache.hertzbeat.common.support; -import java.lang.reflect.Field; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -33,6 +24,13 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import java.lang.reflect.Field; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; /** * test for {@link CommonThreadPool} @@ -101,7 +99,7 @@ public void testInitialization() throws Exception { ThreadPoolExecutor workerExecutor = (ThreadPoolExecutor) workerExecutorField.get(pool); assertNotNull(workerExecutor); - assertEquals(2, workerExecutor.getCorePoolSize()); + assertEquals(1, workerExecutor.getCorePoolSize()); assertEquals(Integer.MAX_VALUE, workerExecutor.getMaximumPoolSize()); assertEquals(10, workerExecutor.getKeepAliveTime(TimeUnit.SECONDS)); assertTrue(workerExecutor.getQueue() instanceof SynchronousQueue); diff --git a/manager/src/main/java/org/apache/hertzbeat/manager/controller/AccountController.java b/manager/src/main/java/org/apache/hertzbeat/manager/controller/AccountController.java index f01731cab0a..23ae16d77bc 100644 --- a/manager/src/main/java/org/apache/hertzbeat/manager/controller/AccountController.java +++ b/manager/src/main/java/org/apache/hertzbeat/manager/controller/AccountController.java @@ -17,8 +17,9 @@ package org.apache.hertzbeat.manager.controller; -import static org.apache.hertzbeat.common.constants.CommonConstants.MONITOR_LOGIN_FAILED_CODE; +import static org.apache.hertzbeat.common.constants.CommonConstants.LOGIN_FAILED_CODE; import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE; +import io.jsonwebtoken.ExpiredJwtException; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.tags.Tag; @@ -58,7 +59,7 @@ public ResponseEntity>> authGetToken(@Valid @Request try { return ResponseEntity.ok(Message.success(accountService.authGetToken(loginDto))); } catch (AuthenticationException e) { - return ResponseEntity.ok(Message.fail(MONITOR_LOGIN_FAILED_CODE, e.getMessage())); + return ResponseEntity.ok(Message.fail(LOGIN_FAILED_CODE, e.getMessage())); } } @@ -70,10 +71,13 @@ public ResponseEntity> refreshToken( try { return ResponseEntity.ok(Message.success(accountService.refreshToken(refreshToken))); } catch (AuthenticationException e) { - return ResponseEntity.ok(Message.fail(MONITOR_LOGIN_FAILED_CODE, e.getMessage())); + return ResponseEntity.ok(Message.fail(LOGIN_FAILED_CODE, e.getMessage())); + } catch (ExpiredJwtException expiredJwtException) { + log.warn("{}", expiredJwtException.getMessage()); + return ResponseEntity.ok(Message.fail(LOGIN_FAILED_CODE, "Refresh Token Expired")); } catch (Exception e) { log.error("Exception occurred during token refresh: {}", e.getClass().getName(), e); - return ResponseEntity.ok(Message.fail(MONITOR_LOGIN_FAILED_CODE, "Refresh Token Expired or Error")); + return ResponseEntity.ok(Message.fail(LOGIN_FAILED_CODE, "Refresh Token Error")); } } } diff --git a/manager/src/test/java/org/apache/hertzbeat/manager/controller/AccountControllerTest.java b/manager/src/test/java/org/apache/hertzbeat/manager/controller/AccountControllerTest.java index cb50ea0e18b..6a3befb0b7d 100644 --- a/manager/src/test/java/org/apache/hertzbeat/manager/controller/AccountControllerTest.java +++ b/manager/src/test/java/org/apache/hertzbeat/manager/controller/AccountControllerTest.java @@ -85,7 +85,7 @@ void authGetToken() throws Exception { this.mockMvc.perform(MockMvcRequestBuilders.post("/api/account/auth/form") .contentType(MediaType.APPLICATION_JSON) .content(JsonUtil.toJson(loginDto))) - .andExpect(jsonPath("$.code").value((int) CommonConstants.MONITOR_LOGIN_FAILED_CODE)) + .andExpect(jsonPath("$.code").value((int) CommonConstants.LOGIN_FAILED_CODE)) .andReturn(); } @@ -95,7 +95,7 @@ void refreshToken() throws Exception { Mockito.when(accountService.refreshToken(refreshToken)).thenThrow(new AuthenticationException()); this.mockMvc.perform(MockMvcRequestBuilders.get("/api/account/auth/refresh/{refreshToken}", refreshToken)) - .andExpect(jsonPath("$.code").value((int) CommonConstants.MONITOR_LOGIN_FAILED_CODE)) + .andExpect(jsonPath("$.code").value((int) CommonConstants.LOGIN_FAILED_CODE)) .andReturn(); } } diff --git a/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java index 779be270f62..a01c7fab804 100644 --- a/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingClient.java @@ -50,6 +50,8 @@ @Slf4j public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient { + private static final int DEFAULT_WORKER_THREAD_NUM = Math.min(4, Runtime.getRuntime().availableProcessors()); + private final NettyClientConfig nettyClientConfig; private final CommonThreadPool threadPool; @@ -79,7 +81,9 @@ public void start() { .setDaemon(true) .setNameFormat("netty-client-worker-%d") .build(); - this.workerGroup = new NioEventLoopGroup(threadFactory); + String envThreadNum = System.getProperty("hertzbeat.client.worker.thread.num"); + int workerThreadNum = envThreadNum != null ? Integer.parseInt(envThreadNum) : DEFAULT_WORKER_THREAD_NUM; + this.workerGroup = new NioEventLoopGroup(workerThreadNum, threadFactory); this.bootstrap.group(workerGroup) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.nettyClientConfig.getConnectTimeoutMillis()) .channel(NioSocketChannel.class) diff --git a/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java index a25b9b90157..48062fadb9b 100644 --- a/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/hertzbeat/remoting/netty/NettyRemotingServer.java @@ -77,7 +77,6 @@ public NettyRemotingServer(final NettyServerConfig nettyServerConfig, @Override public void start() { - this.threadPool.execute(() -> { int port = this.nettyServerConfig.getPort(); ThreadFactory bossThreadFactory = new ThreadFactoryBuilder() diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/WarehouseWorkerPool.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/WarehouseWorkerPool.java index 1c302455456..14b77f9ba03 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/WarehouseWorkerPool.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/WarehouseWorkerPool.java @@ -48,8 +48,8 @@ private void initWorkExecutor() { .setDaemon(true) .setNameFormat("warehouse-worker-%d") .build(); - workerExecutor = new ThreadPoolExecutor(6, - 10, + workerExecutor = new ThreadPoolExecutor(2, + Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<>(),