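After adding the rabbitmq adapter and pre-defining listeners with annotations such as @RabbitListener, registering a listener through RabbitListenerEndpointRegistry after the application has started throws [java.util.concurrent.RejectedExecutionException] #415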

Open
457352727 opened this issue Mar 18, 2024 · 2 comments
Labels
question Further information is requested

Comments

@457352727

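Version information

  • JDK version: 1.8.0_331
  • Spring Boot version: 2.1.9.RELEASE
  • DynamicTp version: 1.1.6.1
  • Configuration center version: Nacos 2.1.2.RELEASE

Problem description

After adding the rabbitmq adapter and pre-defining listeners with annotations such as @RabbitListener, registering a listener through RabbitListenerEndpointRegistry after the application has started throws [java.util.concurrent.RejectedExecutionException].

  • Configuration file: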
spring:
  cloud:
    nacos:
      discovery:
        server-addr: ${NACOS_HOST:127.0.0.1}:${NACOS_PORT:8848}
        heart-beat-timeout: 60
        watch-delay: 30000
        namespace: ${NACOS_NAMESPACE:rs10}
        group: business
        metadata:
          name: ${spring.application.name}
          version: @project.version@
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://192.168.3.111:3306/db_1
    username: root
    password: 1
  redis:
    host: ${REDIS_HOST:192.168.3.111}
    port: ${REDIS_PORT:6379}
    password: ${REDIS_PASSWORD:123456}
  rabbitmq:
    host: ${RABBIT_MQ_HOST:192.168.3.111}
    port: ${RABBIT_MQ_PORT:5672}
    username: ${RABBIT_MQ_USERNAME:server_user}
    password: ${RABBIT_MQ_PASSWORD:123456}
    virtual-host: "/"
    publisher-confirms: true
    publisher-returns: true
    cache:
      channel:
        size: 20
    template:
      retry:
        enabled: true
        initial-interval: 2000
      mandatory: true
    listener:
      type: direct
      direct:
        acknowledge-mode: manual
        retry:
          enabled: true
  mvc:
    throw-exception-if-no-handler-found: true
  dynamic:
    tp:
      enabled: true
      enabledBanner: false
      enabledCollect: true
      collectorTypes: micrometer
      monitorInterval: 5
      rabbitmqTp:
        - threadPoolName: 'rabbitMqTp#rabbitConnectionFactory'
          threadPoolAliasName: rabbitmq-sub-pool
          corePoolSize: 30
          maximumPoolSize: 60
          keepAliveTime: 60
          unit: seconds
          runTimeout: 10000
          queueTimeout: 10000
          notifyEnabled: false
        - threadPoolName: 'rabbitMqTp#rabbitConnectionFactory#publish'
          threadPoolAliasName: rabbitmq-pub-pool
          corePoolSize: 30
          maximumPoolSize: 60
          keepAliveTime: 60
          unit: seconds
          runTimeout: 10000
          queueTimeout: 10000
          notifyEnabled: false
      executors:
        - threadPoolName: 'dtpExecutor'
          threadPoolAliasName: dtp-executor
          executorType: common
          corePoolSize: 10
          maximumPoolSize: 20
          queueCapacity: 1024
          queueType: VariableLinkedBlockingQueue
          rejectedHandlerType: CallerRunsPolicy
          keepAliveTime: 60
          threadNamePrefix: dtp-exec
          allowCoreThreadTimeOut: false
          waitForTasksToCompleteOnShutdown: true
          awaitTerminationSeconds: 10
          preStartAllCoreThreads: false
          runTimeout: 10000
          queueTimeout: 10000
          taskWrapperNames:
            - "ttl"
            - "mdc"
          notifyEnabled: false
logging:
  level:
    com.rs.cloud: trace
    com.alibaba.nacos.client.config.impl.ClientWorker: warn
    org.springframework.amqp: trace
    org.springframework.retry: trace
  • Code usage steps:
  1. Set the Executor of AbstractConnectionFactory
import org.dromara.dynamictp.adapter.rabbitmq.RabbitMqDtpAdapter;
import org.dromara.dynamictp.common.entity.TpExecutorProps;
import org.dromara.dynamictp.common.properties.DtpProperties;
import org.dromara.dynamictp.core.executor.DtpExecutor;
import org.dromara.dynamictp.core.support.DtpLifecycleSupport;
import org.dromara.dynamictp.core.support.ExecutorWrapper;
import org.dromara.dynamictp.core.support.ThreadPoolExecutorProxy;
import org.dromara.dynamictp.starter.adapter.rabbitmq.autoconfigure.RabbitMqTpAutoConfiguration;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.SimplePropertyValueConnectionNameStrategy;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Configuration
@AutoConfigureAfter({RabbitAutoConfiguration.class})
@AutoConfigureBefore({RabbitMqTpAutoConfiguration.class})
@EnableRabbit
public class AmqpConfig {

    private static final AtomicInteger NUMBER = new AtomicInteger(0);
    private static final String DEFAULT_NAME = "rabbitmqPool";
    private static final String DEFAULT_ALIAS_NAME_PREFIX = "rabbitmq-pool-";
    private final Environment environment;
    private final AbstractConnectionFactory connectionFactory;
    private final RabbitTemplate rabbitTemplate;
    private final RabbitMqDtpAdapter rabbitMqDtpAdapter;
    private final DirectRabbitListenerContainerFactory listenerContainerFactory;

    public AmqpConfig(Environment environment, AbstractConnectionFactory connectionFactory,
                      RabbitTemplate rabbitTemplate, RabbitMqDtpAdapter rabbitMqDtpAdapter,
                      DirectRabbitListenerContainerFactory listenerContainerFactory) {
        this.environment = environment;
        this.connectionFactory = connectionFactory;
        this.rabbitTemplate = rabbitTemplate;
        this.rabbitMqDtpAdapter = rabbitMqDtpAdapter;
        this.listenerContainerFactory = listenerContainerFactory;
    }

    @PostConstruct
    public void afterPropertiesSet() {
        SimplePropertyValueConnectionNameStrategy cns = new SimplePropertyValueConnectionNameStrategy("spring.application.name");
        cns.setEnvironment(environment);
        connectionFactory.setConnectionNameStrategy(cns);

        ThreadPoolExecutor subscribePool = rabbitmqSubscribePool();
        System.out.println(subscribePool);

        connectionFactory.setExecutor(subscribePool);

        // set dynamic-tp proxy
        String publishTpName = generatePublishThreadPoolName();
        ThreadPoolExecutor rabbitmqPublishPool = rabbitmqPublishPool();
        ThreadPoolExecutorProxy proxy = new ThreadPoolExecutorProxy(rabbitmqPublishPool);
        ConnectionFactory publisherConnectionFactory = connectionFactory.getPublisherConnectionFactory();
        if (publisherConnectionFactory instanceof AbstractConnectionFactory) {
            ((AbstractConnectionFactory) publisherConnectionFactory).setExecutor(proxy);
        }
        Map<String, ExecutorWrapper> executorWrappers = rabbitMqDtpAdapter.getExecutorWrappers();
        executorWrappers.put(publishTpName, new ExecutorWrapper(publishTpName, proxy));
        DtpLifecycleSupport.shutdownGracefulAsync(rabbitmqPublishPool, publishTpName, 5);

        rabbitTemplate.setUsePublisherConnection(true);
        rabbitTemplate.setConfirmCallback(new AmqpConfirmCallback());
        rabbitTemplate.setReturnCallback(new AmqpReturnCallback());

        listenerContainerFactory.setConnectionFactory(connectionFactory);
    }

    private ThreadPoolExecutor rabbitmqSubscribePool() {
        return assembleThreadPoolExecutor("rabbitMqTp#rabbitConnectionFactory");
    }

    private ThreadPoolExecutor rabbitmqPublishPool() {
        return assembleThreadPoolExecutor(generatePublishThreadPoolName());
    }

    private ThreadPoolExecutor assembleThreadPoolExecutor(String name) {
        List<TpExecutorProps> propsList = DtpProperties.getInstance().getRabbitmqTp();

        if (Objects.isNull(propsList)) {
            return getDefaultThreadPoolExecutor();
        }

        TpExecutorProps props = propsList.stream()
                .filter(i -> name.equals(i.getThreadPoolName()))
                .findFirst()
                .orElse(null);

        if (Objects.isNull(props)) {
            return getDefaultThreadPoolExecutor();
        }

        DtpExecutor dtpExecutor = new DtpExecutor(props.getCorePoolSize(), props.getMaximumPoolSize(), props.getKeepAliveTime(), props.getUnit(),
                new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        dtpExecutor.setThreadPoolName(props.getThreadPoolName());
        dtpExecutor.setThreadPoolAliasName(props.getThreadPoolAliasName());
        dtpExecutor.setRunTimeout(props.getRunTimeout());
        dtpExecutor.setQueueTimeout(props.getQueueTimeout());
        dtpExecutor.setNotifyEnabled(props.isNotifyEnabled());
        return dtpExecutor;
    }

    private ThreadPoolExecutor getDefaultThreadPoolExecutor() {
        DtpExecutor dtpExecutor = new DtpExecutor(30, 60, 60, TimeUnit.MINUTES, new LinkedBlockingQueue<>(),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
        int i = NUMBER.getAndIncrement();
        dtpExecutor.setThreadPoolName(DEFAULT_NAME + i);
        dtpExecutor.setThreadPoolAliasName(DEFAULT_ALIAS_NAME_PREFIX + i);
        dtpExecutor.setRunTimeout(2000);
        dtpExecutor.setQueueTimeout(2000);
        dtpExecutor.setNotifyEnabled(false);
        return dtpExecutor;
    }

    public static String generatePublishThreadPoolName() {
        return "rabbitMqTp#rabbitConnectionFactory#publish";
    }
}
  2. Define the listener
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
@Order
@Slf4j
public class DefaultDeadLetterListener {

    @RabbitListener(queues = "#{@commonDeadLetterQueue.getName()}", containerFactory = "rabbitListenerContainerFactory")
    public void processDeadLetterPublicMessage(org.springframework.amqp.core.Message message,
                                               @Header(value = "#{T(com.rs.cloud.business.mns.constant.MessageHeaderKey).CONTENT_TYPE_ID}",
                                                       required = false) String contentClassName) {
        log.info("MESSAGE ###: {}", message);
        log.info("contentClassName: {}", contentClassName);
    }
}
  3. Register a listener dynamically with RabbitListenerEndpointRegistry
// ...
RabbitListenerEndpointRegistry.registerListenerContainer(endpoint, listenerContainerFactory, true);
// ...
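  • Error message: see the screenshot below
    (screenshot)
  • Suspected cause:
    At startup, the listener containers are started with the thread pool returned by rabbitmqSubscribePool() in AmqpConfig. Because dynamic-tp-spring-boot-starter-adapter-rabbitmq is included, that pool is later shut down and replaced by a ThreadPoolExecutorProxy. The connection used by the listeners created at startup, however, still holds the pool returned by rabbitmqSubscribePool(), not the ThreadPoolExecutorProxy.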
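
The mechanism behind the suspected cause can be illustrated with plain JDK classes. The following is a minimal sketch, not DynamicTp or Spring AMQP code: the class name ShutdownRejectionDemo and its method are hypothetical, and the pool only mirrors the LinkedBlockingQueue and AbortPolicy choices made in AmqpConfig. It shows that a ThreadPoolExecutor that has been shut down rejects any task handed to it afterwards, which is what a component still holding a reference to the original pool would run into.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it only models the behaviour of a pool after shutdown.
final class ShutdownRejectionDemo {

    /** Returns true if a task submitted after shutdown() is rejected. */
    static boolean submitAfterShutdownIsRejected() {
        // Same queue type and rejection policy as the pools assembled in AmqpConfig.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), new ThreadPoolExecutor.AbortPolicy());

        // Stands in for the original pool being shut down once a proxy replaces it.
        pool.shutdown();

        try {
            // A caller that still references the old pool tries to run a task.
            pool.execute(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            // AbortPolicy throws for every task submitted after shutdown.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected after shutdown: " + submitAfterShutdownIsRejected());
    }
}
```

Whether the listener containers in this project really keep the old pool is still the reporter's guess; the sketch only confirms that such a stale reference would produce this exception type.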

Reproduction steps

After writing the code as described above, manually call RabbitListenerEndpointRegistry.registerListenerContainer(endpoint, listenerContainerFactory, true)

Other information

@457352727 457352727 added the bug Something isn't working label Mar 18, 2024
@yanhom1314
Collaborator

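It feels like your setup is more complicated than it needs to be. Don't set the connectionFactory's executor manually; let the adapter create the Proxy and replace it automatically. Wouldn't that work?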

@457352727
Author

457352727 commented Mar 20, 2024

RabbitMqDtpAdapter only creates the Proxy replacement when the executor in the connectionFactory is not null. In Spring AMQP 2.1.9.RELEASE, however, the executor is null by default unless it is set manually on the connectionFactory.
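
The behaviour described in this comment can be modelled roughly as follows. This is a sketch under stated assumptions, not the adapter's actual source: FakeConnectionFactory, ProxyExecutor and wrapIfPresent are hypothetical stand-ins written with JDK types only. It shows why a factory whose executor was never set is left untouched by a "wrap only if non-null" check.

```java
import java.util.concurrent.Executor;

// Hypothetical model of a "wrap the executor only if one is set" guard.
final class NullExecutorGuardDemo {

    /** Stand-in for a connection factory whose executor is null unless set explicitly. */
    static final class FakeConnectionFactory {
        Executor executor;
    }

    /** Stand-in for a proxy that delegates to the wrapped executor. */
    static final class ProxyExecutor implements Executor {
        private final Executor delegate;

        ProxyExecutor(Executor delegate) {
            this.delegate = delegate;
        }

        @Override
        public void execute(Runnable command) {
            delegate.execute(command);
        }
    }

    /** Replaces the executor with a proxy only when one is present; returns whether it did. */
    static boolean wrapIfPresent(FakeConnectionFactory factory) {
        if (factory.executor == null) {
            // Nothing to wrap, so the factory is left as it is.
            return false;
        }
        factory.executor = new ProxyExecutor(factory.executor);
        return true;
    }

    public static void main(String[] args) {
        FakeConnectionFactory unset = new FakeConnectionFactory();
        System.out.println("wrapped when executor is null: " + wrapIfPresent(unset));

        FakeConnectionFactory preset = new FakeConnectionFactory();
        preset.executor = Runnable::run; // runs tasks on the calling thread
        System.out.println("wrapped when executor is set: " + wrapIfPresent(preset));
    }
}
```

Under this model, following the suggestion to skip the manual setExecutor call would leave nothing for the adapter to wrap, which is the objection raised above.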

@yanhom1314 yanhom1314 added question Further information is requested and removed bug Something isn't working labels Jan 20, 2025