Skip to content

Commit

Permalink
消息队列禁止消费插件统一RocketMq名称
Browse files Browse the repository at this point in the history
Signed-off-by: daizhenyu <[email protected]>
  • Loading branch information
daizhenyu committed Jan 26, 2024
1 parent 9cc6a63 commit 53b5f9e
Show file tree
Hide file tree
Showing 12 changed files with 46 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.huaweicloud.sermant.core.utils.ReflectUtils;
import com.huaweicloud.sermant.rocketmq.cache.RocketMqConsumerCache;
import com.huaweicloud.sermant.rocketmq.wrapper.DefaultLitePullConsumerWrapper;
import com.huaweicloud.sermant.utils.RocketmqWrapperUtils;
import com.huaweicloud.sermant.utils.RocketMqWrapperUtils;

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.impl.consumer.AssignedMessageQueue;
Expand Down Expand Up @@ -115,7 +115,7 @@ private static void suspendSubscriptiveConsumer(DefaultLitePullConsumerWrapper w
private static void suspendAssignedConsumer(DefaultLitePullConsumerWrapper wrapper) {
DefaultLitePullConsumerImpl pullConsumerImpl = wrapper.getPullConsumerImpl();
if (wrapper.getAssignedMessageQueue() == null) {
Optional<AssignedMessageQueue> assignedMessageQueueOptional = RocketmqWrapperUtils
Optional<AssignedMessageQueue> assignedMessageQueueOptional = RocketMqWrapperUtils
.getAssignedMessageQueue(pullConsumerImpl);
if (assignedMessageQueueOptional.isPresent()) {
wrapper.setAssignedMessageQueue(assignedMessageQueueOptional.get());
Expand Down Expand Up @@ -255,7 +255,7 @@ private static void resumeAssignedConsumer(DefaultLitePullConsumerWrapper wrappe
* @param pullConsumer pullConsumer实例
*/
public static void cachePullConsumer(DefaultLitePullConsumer pullConsumer) {
Optional<DefaultLitePullConsumerWrapper> pullConsumerWrapperOptional = RocketmqWrapperUtils
Optional<DefaultLitePullConsumerWrapper> pullConsumerWrapperOptional = RocketMqWrapperUtils
.wrapPullConsumer(pullConsumer);
if (pullConsumerWrapperOptional.isPresent()) {
RocketMqConsumerCache.PULL_CONSUMERS_CACHE.put(pullConsumer.hashCode(), pullConsumerWrapperOptional.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.huaweicloud.sermant.core.common.LoggerFactory;
import com.huaweicloud.sermant.rocketmq.cache.RocketMqConsumerCache;
import com.huaweicloud.sermant.rocketmq.wrapper.DefaultMqPushConsumerWrapper;
import com.huaweicloud.sermant.utils.RocketmqWrapperUtils;
import com.huaweicloud.sermant.utils.RocketMqWrapperUtils;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
Expand Down Expand Up @@ -105,7 +105,7 @@ private static void resumePushConsumer(DefaultMqPushConsumerWrapper wrapper) {
* @param pushConsumer pushConsumer实例
*/
public static void cachePushConsumer(DefaultMQPushConsumer pushConsumer) {
Optional<DefaultMqPushConsumerWrapper> pushConsumerWrapperOptional = RocketmqWrapperUtils
Optional<DefaultMqPushConsumerWrapper> pushConsumerWrapperOptional = RocketMqWrapperUtils
.wrapPushConsumer(pushConsumer);
if (pushConsumerWrapperOptional.isPresent()) {
RocketMqConsumerCache.PUSH_CONSUMERS_CACHE.put(pushConsumer.hashCode(), pushConsumerWrapperOptional.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
* @author daizhenyu
* @since 2023-12-14
**/
public class RocketmqWrapperUtils {
private RocketmqWrapperUtils() {
public class RocketMqWrapperUtils {
private RocketMqWrapperUtils() {
}

/**
Expand Down Expand Up @@ -105,7 +105,7 @@ public static Optional<DefaultMqPushConsumerWrapper> wrapPushConsumer(DefaultMQP
*/
public static Optional<AssignedMessageQueue> getAssignedMessageQueue(DefaultLitePullConsumerImpl pullConsumerImpl) {
// 设置插件类加载器的局部类加载器为宿主类加载器
((PluginClassLoader) RocketmqWrapperUtils.class.getClassLoader()).setLocalLoader(
((PluginClassLoader) RocketMqWrapperUtils.class.getClassLoader()).setLocalLoader(
pullConsumerImpl.getClass().getClassLoader());

Optional<Object> assignedMessageQueueOptional = ReflectUtils
Expand All @@ -116,7 +116,7 @@ public static Optional<AssignedMessageQueue> getAssignedMessageQueue(DefaultLite
}

// 移除插件类加载器的局部类加载器
((PluginClassLoader) RocketmqWrapperUtils.class.getClassLoader()).removeLocalLoader();
((PluginClassLoader) RocketMqWrapperUtils.class.getClassLoader()).removeLocalLoader();
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.huaweicloud.sermant.rocketmq.cache.RocketMqConsumerCache;
import com.huaweicloud.sermant.rocketmq.constant.SubscriptionType;
import com.huaweicloud.sermant.rocketmq.wrapper.DefaultLitePullConsumerWrapper;
import com.huaweicloud.sermant.utils.RocketmqWrapperUtils;
import com.huaweicloud.sermant.utils.RocketMqWrapperUtils;

import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
Expand Down Expand Up @@ -78,9 +78,9 @@ public static void setUp() throws MQClientException {
pullConsumerWrapper = new DefaultLitePullConsumerWrapper(pullConsumer, pullConsumerImpl, rebalance, instance);
pullConsumerWrapper.setConsumerGroup("test-group");

MockedStatic<RocketmqWrapperUtils> wrapperUtilsMockedStatic = Mockito
.mockStatic(RocketmqWrapperUtils.class);
wrapperUtilsMockedStatic.when(() -> RocketmqWrapperUtils.wrapPullConsumer(pullConsumer))
MockedStatic<RocketMqWrapperUtils> wrapperUtilsMockedStatic = Mockito
.mockStatic(RocketMqWrapperUtils.class);
wrapperUtilsMockedStatic.when(() -> RocketMqWrapperUtils.wrapPullConsumer(pullConsumer))
.thenReturn(Optional.of(pullConsumerWrapper));

prohibitionTopics = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.huaweicloud.sermant.core.plugin.config.ServiceMeta;
import com.huaweicloud.sermant.rocketmq.cache.RocketMqConsumerCache;
import com.huaweicloud.sermant.rocketmq.wrapper.DefaultMqPushConsumerWrapper;
import com.huaweicloud.sermant.utils.RocketmqWrapperUtils;
import com.huaweicloud.sermant.utils.RocketMqWrapperUtils;

import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
Expand Down Expand Up @@ -66,9 +66,9 @@ public static void setUp() throws MQClientException {
pushConsumerWrapper = new DefaultMqPushConsumerWrapper(pushConsumer, pushConsumerImpl, instance);
pushConsumerWrapper.setConsumerGroup("test-group");

MockedStatic<RocketmqWrapperUtils> wrapperUtilsMockedStatic = Mockito
.mockStatic(RocketmqWrapperUtils.class);
wrapperUtilsMockedStatic.when(() -> RocketmqWrapperUtils.wrapPushConsumer(pushConsumer))
MockedStatic<RocketMqWrapperUtils> wrapperUtilsMockedStatic = Mockito
.mockStatic(RocketMqWrapperUtils.class);
wrapperUtilsMockedStatic.when(() -> RocketMqWrapperUtils.wrapPushConsumer(pushConsumer))
.thenReturn(Optional.of(pushConsumerWrapper));

prohibitionTopics = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,15 @@ public static void setUp() throws MQClientException {

@Test
public void testWrapPullConsumer() {
Optional<DefaultLitePullConsumerWrapper> pullConsumerWrapperOptional = RocketmqWrapperUtils
Optional<DefaultLitePullConsumerWrapper> pullConsumerWrapperOptional = RocketMqWrapperUtils
.wrapPullConsumer(pullConsumer);
Assert.assertTrue(pullConsumerWrapperOptional.isPresent());
Assert.assertEquals(pullConsumerWrapperOptional.get().getPullConsumer(), pullConsumer);
}

@Test
public void testWrapPushConsumer() {
Optional<DefaultMqPushConsumerWrapper> pushConsumerWrapperOptional = RocketmqWrapperUtils
Optional<DefaultMqPushConsumerWrapper> pushConsumerWrapperOptional = RocketMqWrapperUtils
.wrapPushConsumer(pushConsumer);
Assert.assertTrue(pushConsumerWrapperOptional.isPresent());
Assert.assertEquals(pushConsumerWrapperOptional.get().getPushConsumer(), pushConsumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,28 @@
import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher;
import com.huaweicloud.sermant.mq.prohibition.rocketmq.utils.RocketmqEnhancementHelper;
import com.huaweicloud.sermant.mq.prohibition.rocketmq.utils.RocketMqEnhancementHelper;

/**
* pullConsumer拦截声明器,支持rocketmq4.8+版本
*
* @author daizhenyu
* @since 2023-12-04
**/
public class RocketmqPullConsumerDeclarer extends AbstractPluginDeclarer {
public class RocketMqPullConsumerDeclarer extends AbstractPluginDeclarer {
@Override
public ClassMatcher getClassMatcher() {
return RocketmqEnhancementHelper.getPullConsumerClassMatcher();
return RocketMqEnhancementHelper.getPullConsumerClassMatcher();
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
RocketmqEnhancementHelper.getPullConsumerStartInterceptDeclarers(),
RocketmqEnhancementHelper.getPullConsumerSubscribeInterceptDeclarers(),
RocketmqEnhancementHelper.getPullConsumerUnsubscribeInterceptDeclarers(),
RocketmqEnhancementHelper.getPullConsumerAssignInterceptDeclarers(),
RocketmqEnhancementHelper.getPullConsumerShutdownInterceptDeclarers()
RocketMqEnhancementHelper.getPullConsumerStartInterceptDeclarers(),
RocketMqEnhancementHelper.getPullConsumerSubscribeInterceptDeclarers(),
RocketMqEnhancementHelper.getPullConsumerUnsubscribeInterceptDeclarers(),
RocketMqEnhancementHelper.getPullConsumerAssignInterceptDeclarers(),
RocketMqEnhancementHelper.getPullConsumerShutdownInterceptDeclarers()
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,27 @@
import com.huaweicloud.sermant.core.plugin.agent.declarer.AbstractPluginDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import com.huaweicloud.sermant.core.plugin.agent.matcher.ClassMatcher;
import com.huaweicloud.sermant.mq.prohibition.rocketmq.utils.RocketmqEnhancementHelper;
import com.huaweicloud.sermant.mq.prohibition.rocketmq.utils.RocketMqEnhancementHelper;

/**
* pushConsumer拦截声明器,支持rocketmq4.8+版本
*
* @author daizhenyu
* @since 2023-12-04
**/
public class RocketmqPushConsumerDeclarer extends AbstractPluginDeclarer {
public class RocketMqPushConsumerDeclarer extends AbstractPluginDeclarer {
@Override
public ClassMatcher getClassMatcher() {
return RocketmqEnhancementHelper.getPushConsumerClassMatcher();
return RocketMqEnhancementHelper.getPushConsumerClassMatcher();
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
RocketmqEnhancementHelper.getPushConsumerStartInterceptDeclarers(),
RocketmqEnhancementHelper.getPushConsumerSubscribeInterceptDeclarers(),
RocketmqEnhancementHelper.getPushConsumerUnsubscribeInterceptDeclarers(),
RocketmqEnhancementHelper.getPushConsumerShutdownInterceptDeclarers()
RocketMqEnhancementHelper.getPushConsumerStartInterceptDeclarers(),
RocketMqEnhancementHelper.getPushConsumerSubscribeInterceptDeclarers(),
RocketMqEnhancementHelper.getPushConsumerUnsubscribeInterceptDeclarers(),
RocketMqEnhancementHelper.getPushConsumerShutdownInterceptDeclarers()
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
* @author daizhenyu
* @since 2023-12-13
**/
public class RocketmqEnhancementHelper {
public class RocketMqEnhancementHelper {
private static final String ENHANCE_PUSH_CONSUMER_CLASS =
"org.apache.rocketmq.client.consumer.DefaultMQPushConsumer";

Expand All @@ -53,7 +53,7 @@ public class RocketmqEnhancementHelper {

private static final String ASSIGN_METHOD_NAME = "assign";

private RocketmqEnhancementHelper() {
private RocketMqEnhancementHelper() {
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
com.huaweicloud.sermant.mq.prohibition.rocketmq.declarer.RocketmqPullConsumerDeclarer
com.huaweicloud.sermant.mq.prohibition.rocketmq.declarer.RocketmqPushConsumerDeclarer
com.huaweicloud.sermant.mq.prohibition.rocketmq.declarer.RocketMqPullConsumerDeclarer
com.huaweicloud.sermant.mq.prohibition.rocketmq.declarer.RocketMqPushConsumerDeclarer
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.huaweicloud.sermant.mq.prohibition.rocketmq.interceptor;

import com.huaweicloud.sermant.rocketmq.wrapper.DefaultLitePullConsumerWrapper;
import com.huaweicloud.sermant.utils.RocketmqWrapperUtils;
import com.huaweicloud.sermant.utils.RocketMqWrapperUtils;

import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
Expand Down Expand Up @@ -46,9 +46,9 @@ public class BasePullConsumerInterceptorTest {
public void before() throws MQClientException {
pullConsumer = new DefaultLitePullConsumer("test-group");
pullConsumerWrapper = createPullConsumerWrapper();
MockedStatic<RocketmqWrapperUtils> wrapperUtilsMockedStatic = Mockito
.mockStatic(RocketmqWrapperUtils.class);
wrapperUtilsMockedStatic.when(() -> RocketmqWrapperUtils.wrapPullConsumer(pullConsumer))
MockedStatic<RocketMqWrapperUtils> wrapperUtilsMockedStatic = Mockito
.mockStatic(RocketMqWrapperUtils.class);
wrapperUtilsMockedStatic.when(() -> RocketMqWrapperUtils.wrapPullConsumer(pullConsumer))
.thenReturn(Optional.of(pullConsumerWrapper));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.huaweicloud.sermant.mq.prohibition.rocketmq.interceptor;

import com.huaweicloud.sermant.rocketmq.wrapper.DefaultMqPushConsumerWrapper;
import com.huaweicloud.sermant.utils.RocketmqWrapperUtils;
import com.huaweicloud.sermant.utils.RocketMqWrapperUtils;

import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
Expand All @@ -44,9 +44,9 @@ public class BasePushConsumerInterceptorTest {
public void before() {
pushConsumer = new DefaultMQPushConsumer("test-group");
pushConsumerWrapper = createPushConsumerWrapper();
MockedStatic<RocketmqWrapperUtils> wrapperUtilsMockedStatic = Mockito
.mockStatic(RocketmqWrapperUtils.class);
wrapperUtilsMockedStatic.when(() -> RocketmqWrapperUtils.wrapPushConsumer(pushConsumer))
MockedStatic<RocketMqWrapperUtils> wrapperUtilsMockedStatic = Mockito
.mockStatic(RocketMqWrapperUtils.class);
wrapperUtilsMockedStatic.when(() -> RocketMqWrapperUtils.wrapPushConsumer(pushConsumer))
.thenReturn(Optional.of(pushConsumerWrapper));
}

Expand Down

0 comments on commit 53b5f9e

Please sign in to comment.