Skip to content

Commit

Permalink
[improve] update thread pool nums policy (apache#2606)
Browse files Browse the repository at this point in the history
Signed-off-by: tomsun28 <[email protected]>
Co-authored-by: shown <[email protected]>
Co-authored-by: Calvin <[email protected]>
  • Loading branch information
3 people authored Aug 31, 2024
1 parent 2763ee4 commit 049c093
Show file tree
Hide file tree
Showing 11 changed files with 41 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.hertzbeat.alert;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -50,11 +50,11 @@ private void initWorkExecutor() {
.setDaemon(true)
.setNameFormat("alerter-worker-%d")
.build();
workerExecutor = new ThreadPoolExecutor(6,
workerExecutor = new ThreadPoolExecutor(10,
10,
10,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new LinkedBlockingQueue<>(),
threadFactory,
new ThreadPoolExecutor.AbortPolicy());
}
Expand All @@ -69,10 +69,10 @@ private void initNotifyExecutor() {
.setNameFormat("notify-worker-%d")
.build();
notifyExecutor = new ThreadPoolExecutor(6,
10,
6,
10,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new LinkedBlockingQueue<>(),
threadFactory,
new ThreadPoolExecutor.AbortPolicy());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
public class CalculateAlarm {

private static final String SYSTEM_VALUE_ROW_COUNT = "system_value_row_count";

private static final int CALCULATE_THREADS = 3;

/**
* The alarm in the process is triggered
Expand Down Expand Up @@ -129,9 +131,9 @@ private void startCalculate() {
}
}
};
workerPool.executeJob(runnable);
workerPool.executeJob(runnable);
workerPool.executeJob(runnable);
for (int i = 0; i < CALCULATE_THREADS; i++) {
workerPool.executeJob(runnable);
}
}

private void calculate(CollectRep.MetricsData metricsData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ private void initWorkExecutor() {
.setDaemon(true)
.setNameFormat("collect-worker-%d")
.build();
workerExecutor = new ThreadPoolExecutor(100,
1024,
int coreSize = Math.max(2, Runtime.getRuntime().availableProcessors());
int maxSize = Runtime.getRuntime().availableProcessors() * 16;
workerExecutor = new ThreadPoolExecutor(coreSize,
maxSize,
10,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public interface CommonConstants {
/**
* Response status code: Incorrect login account password
*/
byte MONITOR_LOGIN_FAILED_CODE = 0x05;
byte LOGIN_FAILED_CODE = 0x05;

/**
* Monitoring status 0: Paused, 1: Up, 2: Down
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ private void initWorkExecutor() {
.setDaemon(true)
.setNameFormat("common-worker-%d")
.build();
workerExecutor = new ThreadPoolExecutor(2,
workerExecutor = new ThreadPoolExecutor(1,
Integer.MAX_VALUE,
10,
TimeUnit.SECONDS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,20 @@

package org.apache.hertzbeat.common.support;

import java.lang.reflect.Field;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import java.lang.reflect.Field;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
* test for {@link CommonThreadPool}
Expand Down Expand Up @@ -101,7 +99,7 @@ public void testInitialization() throws Exception {
ThreadPoolExecutor workerExecutor = (ThreadPoolExecutor) workerExecutorField.get(pool);

assertNotNull(workerExecutor);
assertEquals(2, workerExecutor.getCorePoolSize());
assertEquals(1, workerExecutor.getCorePoolSize());
assertEquals(Integer.MAX_VALUE, workerExecutor.getMaximumPoolSize());
assertEquals(10, workerExecutor.getKeepAliveTime(TimeUnit.SECONDS));
assertTrue(workerExecutor.getQueue() instanceof SynchronousQueue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@

package org.apache.hertzbeat.manager.controller;

import static org.apache.hertzbeat.common.constants.CommonConstants.MONITOR_LOGIN_FAILED_CODE;
import static org.apache.hertzbeat.common.constants.CommonConstants.LOGIN_FAILED_CODE;
import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
import io.jsonwebtoken.ExpiredJwtException;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
Expand Down Expand Up @@ -58,7 +59,7 @@ public ResponseEntity<Message<Map<String, String>>> authGetToken(@Valid @Request
try {
return ResponseEntity.ok(Message.success(accountService.authGetToken(loginDto)));
} catch (AuthenticationException e) {
return ResponseEntity.ok(Message.fail(MONITOR_LOGIN_FAILED_CODE, e.getMessage()));
return ResponseEntity.ok(Message.fail(LOGIN_FAILED_CODE, e.getMessage()));
}
}

Expand All @@ -70,10 +71,13 @@ public ResponseEntity<Message<RefreshTokenResponse>> refreshToken(
try {
return ResponseEntity.ok(Message.success(accountService.refreshToken(refreshToken)));
} catch (AuthenticationException e) {
return ResponseEntity.ok(Message.fail(MONITOR_LOGIN_FAILED_CODE, e.getMessage()));
return ResponseEntity.ok(Message.fail(LOGIN_FAILED_CODE, e.getMessage()));
} catch (ExpiredJwtException expiredJwtException) {
log.warn("{}", expiredJwtException.getMessage());
return ResponseEntity.ok(Message.fail(LOGIN_FAILED_CODE, "Refresh Token Expired"));
} catch (Exception e) {
log.error("Exception occurred during token refresh: {}", e.getClass().getName(), e);
return ResponseEntity.ok(Message.fail(MONITOR_LOGIN_FAILED_CODE, "Refresh Token Expired or Error"));
return ResponseEntity.ok(Message.fail(LOGIN_FAILED_CODE, "Refresh Token Error"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void authGetToken() throws Exception {
this.mockMvc.perform(MockMvcRequestBuilders.post("/api/account/auth/form")
.contentType(MediaType.APPLICATION_JSON)
.content(JsonUtil.toJson(loginDto)))
.andExpect(jsonPath("$.code").value((int) CommonConstants.MONITOR_LOGIN_FAILED_CODE))
.andExpect(jsonPath("$.code").value((int) CommonConstants.LOGIN_FAILED_CODE))
.andReturn();
}

Expand All @@ -95,7 +95,7 @@ void refreshToken() throws Exception {
Mockito.when(accountService.refreshToken(refreshToken)).thenThrow(new AuthenticationException());
this.mockMvc.perform(MockMvcRequestBuilders.get("/api/account/auth/refresh/{refreshToken}",
refreshToken))
.andExpect(jsonPath("$.code").value((int) CommonConstants.MONITOR_LOGIN_FAILED_CODE))
.andExpect(jsonPath("$.code").value((int) CommonConstants.LOGIN_FAILED_CODE))
.andReturn();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
@Slf4j
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {

private static final int DEFAULT_WORKER_THREAD_NUM = Math.min(4, Runtime.getRuntime().availableProcessors());

private final NettyClientConfig nettyClientConfig;

private final CommonThreadPool threadPool;
Expand Down Expand Up @@ -79,7 +81,9 @@ public void start() {
.setDaemon(true)
.setNameFormat("netty-client-worker-%d")
.build();
this.workerGroup = new NioEventLoopGroup(threadFactory);
String envThreadNum = System.getProperty("hertzbeat.client.worker.thread.num");
int workerThreadNum = envThreadNum != null ? Integer.parseInt(envThreadNum) : DEFAULT_WORKER_THREAD_NUM;
this.workerGroup = new NioEventLoopGroup(workerThreadNum, threadFactory);
this.bootstrap.group(workerGroup)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.nettyClientConfig.getConnectTimeoutMillis())
.channel(NioSocketChannel.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public NettyRemotingServer(final NettyServerConfig nettyServerConfig,

@Override
public void start() {

this.threadPool.execute(() -> {
int port = this.nettyServerConfig.getPort();
ThreadFactory bossThreadFactory = new ThreadFactoryBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ private void initWorkExecutor() {
.setDaemon(true)
.setNameFormat("warehouse-worker-%d")
.build();
workerExecutor = new ThreadPoolExecutor(6,
10,
workerExecutor = new ThreadPoolExecutor(2,
Integer.MAX_VALUE,
10,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
Expand Down

0 comments on commit 049c093

Please sign in to comment.