Skip to content

Commit

Permalink
[ISSUE#525]add aclClient PRCHook for message track
Browse files Browse the repository at this point in the history
  • Loading branch information
zongtanghu committed Dec 27, 2018
1 parent 094b5dc commit 2eef6eb
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
} else {
tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC);
}
AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties);
AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, rpcHook);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean ms
} else {
tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC);
}
AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties);
AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, rpcHook);
dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
traceDispatcher = dispatcher;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.remoting.RPCHook;

/**
* Created by zongtanghu on 2018/11/6.
Expand All @@ -74,7 +75,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
private String dispatcherId = UUID.randomUUID().toString();
private String traceTopicName;

public AsyncArrayDispatcher(Properties properties) throws MQClientException {
public AsyncArrayDispatcher(Properties properties, RPCHook rpcHook) throws MQClientException {
dispatcherType = properties.getProperty(TrackTraceConstants.TRACE_DISPATCHER_TYPE);
int queueSize = Integer.parseInt(properties.getProperty(TrackTraceConstants.ASYNC_BUFFER_SIZE, "2048"));
// queueSize is greater than or equal to the n power of 2 of value
Expand All @@ -92,7 +93,7 @@ public AsyncArrayDispatcher(Properties properties) throws MQClientException {
TimeUnit.MILLISECONDS, //
this.appenderQueue, //
new ThreadFactoryImpl("MQTraceSendThread_"));
traceProducer = TrackTraceProducerFactory.getTraceDispatcherProducer(properties);
traceProducer = TrackTraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook);
}

public String getTraceTopicName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.remoting.RPCHook;

public class TrackTraceProducerFactory {

Expand All @@ -33,10 +34,10 @@ public class TrackTraceProducerFactory {
private static DefaultMQProducer traceProducer;


public static DefaultMQProducer getTraceDispatcherProducer(Properties properties) {
public static DefaultMQProducer getTraceDispatcherProducer(Properties properties, RPCHook rpcHook) {
if (traceProducer == null) {

traceProducer = new DefaultMQProducer();
traceProducer = new DefaultMQProducer(rpcHook);
traceProducer.setProducerGroup(TrackTraceConstants.GROUP_NAME);
traceProducer.setSendMsgTimeout(5000);
traceProducer.setInstanceName(properties.getProperty(TrackTraceConstants.INSTANCE_NAME, String.valueOf(System.currentTimeMillis())));
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@
<exclude>src/test/resources/certs/*</exclude>
<exclude>src/test/**/*.log</exclude>
<exclude>src/test/resources/META-INF/service/*</exclude>
<exclude>src/main/resources/META-INF/service/*</exclude>
<exclude>*/target/**</exclude>
<exclude>*/*.iml</exclude>
</excludes>
Expand Down

0 comments on commit 2eef6eb

Please sign in to comment.