diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 179a80daa42..2149d675681 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -309,7 +309,7 @@ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, } else { tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC); } - AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties); + AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, rpcHook); dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl()); traceDispatcher = dispatcher; diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 3c33d2eed53..22c676055fc 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -178,7 +178,7 @@ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean ms } else { tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC); } - AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties); + AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, rpcHook); dispatcher.setHostProducer(this.getDefaultMQProducerImpl()); traceDispatcher = dispatcher; diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java index 90b00d414c9..496d290f5cf 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java @@ -49,6 +49,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.apache.rocketmq.remoting.RPCHook; /** * Created by zongtanghu on 2018/11/6. @@ -74,7 +75,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { private String dispatcherId = UUID.randomUUID().toString(); private String traceTopicName; - public AsyncArrayDispatcher(Properties properties) throws MQClientException { + public AsyncArrayDispatcher(Properties properties, RPCHook rpcHook) throws MQClientException { dispatcherType = properties.getProperty(TrackTraceConstants.TRACE_DISPATCHER_TYPE); int queueSize = Integer.parseInt(properties.getProperty(TrackTraceConstants.ASYNC_BUFFER_SIZE, "2048")); // queueSize is greater than or equal to the n power of 2 of value @@ -92,7 +93,7 @@ public AsyncArrayDispatcher(Properties properties) throws MQClientException { TimeUnit.MILLISECONDS, // this.appenderQueue, // new ThreadFactoryImpl("MQTraceSendThread_")); - traceProducer = TrackTraceProducerFactory.getTraceDispatcherProducer(properties); + traceProducer = TrackTraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook); } public String getTraceTopicName() { diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java index 27447df7340..37c39a14e55 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java @@ -25,6 +25,7 @@ import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.rocketmq.remoting.RPCHook; public class TrackTraceProducerFactory { @@ -33,10 +34,10 @@ public class TrackTraceProducerFactory { private static DefaultMQProducer traceProducer; - public static DefaultMQProducer getTraceDispatcherProducer(Properties properties) { + public static DefaultMQProducer getTraceDispatcherProducer(Properties properties, RPCHook rpcHook) { if (traceProducer == null) { - traceProducer = new DefaultMQProducer(); + traceProducer = new DefaultMQProducer(rpcHook); traceProducer.setProducerGroup(TrackTraceConstants.GROUP_NAME); traceProducer.setSendMsgTimeout(5000); traceProducer.setInstanceName(properties.getProperty(TrackTraceConstants.INSTANCE_NAME, String.valueOf(System.currentTimeMillis()))); diff --git a/pom.xml b/pom.xml index c20b04cb366..1e3c4bcc859 100644 --- a/pom.xml +++ b/pom.xml @@ -259,6 +259,7 @@ src/test/resources/certs/* src/test/**/*.log src/test/resources/META-INF/service/* + src/main/resources/META-INF/service/* */target/** */*.iml