Skip to content

Commit

Permalink
add mq gray plugin declare/service
Browse files Browse the repository at this point in the history
Signed-off-by: chengyouling <[email protected]>
  • Loading branch information
chengyouling committed Sep 4, 2024
1 parent a61202c commit 76ed33b
Show file tree
Hide file tree
Showing 21 changed files with 1,099 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.mq.grayscale.declarer;

import io.sermant.core.plugin.agent.declarer.PluginDeclarer;
import io.sermant.core.plugin.agent.declarer.SuperTypeDeclarer;

/**
* abstract declarer
*
* @author chengyouling
* @since 2024-05-27
**/
public abstract class MqAbstractDeclarer implements PluginDeclarer {
@Override
public SuperTypeDeclarer[] getSuperTypeDeclarers() {
return new SuperTypeDeclarer[0];
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.mq.grayscale.declarer;

import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import io.sermant.core.plugin.agent.matcher.ClassMatcher;
import io.sermant.core.plugin.agent.matcher.MethodMatcher;
import io.sermant.mq.grayscale.interceptor.MqLitePullConsumerSubscribeInterceptor;

/**
* lite pull consumer build consumer client config declarer
*
* @author chengyouling
* @since 2024-05-27
**/
public class MqLitePullConsumerSubscribeDeclarer extends MqAbstractDeclarer {
private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl";

private static final String METHOD_NAME = "subscribe";

private static final String METHOD_FETCH_NAME = "fetchMessageQueues";

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(
MethodMatcher.nameEquals(METHOD_NAME).or(MethodMatcher.nameEquals(METHOD_FETCH_NAME)),
new MqLitePullConsumerSubscribeInterceptor())
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.mq.grayscale.declarer;

import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import io.sermant.core.plugin.agent.matcher.ClassMatcher;
import io.sermant.core.plugin.agent.matcher.MethodMatcher;
import io.sermant.mq.grayscale.interceptor.MqPullConsumerFetchInterceptor;

/**
* pull consumer build consumer client config declarer
*
* @author chengyouling
* @since 2024-05-27
**/
public class MqPullConsumerFetchDeclarer extends MqAbstractDeclarer {
private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl";

private static final String METHOD_NAME_SUB = "fetchSubscribeMessageQueues";

private static final String METHOD_NAME_BALANCE = "fetchMessageQueuesInBalance";

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME_BALANCE)
.or(MethodMatcher.nameEquals(METHOD_NAME_SUB)), new MqPullConsumerFetchInterceptor())
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.mq.grayscale.declarer;

import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import io.sermant.core.plugin.agent.matcher.ClassMatcher;
import io.sermant.core.plugin.agent.matcher.MethodMatcher;
import io.sermant.mq.grayscale.interceptor.MqPushConsumerSubscribeFetchInterceptor;

/**
* push consumer build consumer client config declarer
*
* @author chengyouling
* @since 2024-05-27
**/
public class MqPushConsumerSubscribeFetchDeclarer extends MqAbstractDeclarer {
private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl";

private static final String METHOD_NAME_SUB = "fetchSubscribeMessageQueues";

private static final String METHOD_NAME_BALANCE = "subscribe";

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME_BALANCE)
.or(MethodMatcher.nameEquals(METHOD_NAME_SUB)), new MqPushConsumerSubscribeFetchInterceptor())
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.mq.grayscale.declarer;

import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import io.sermant.core.plugin.agent.matcher.ClassMatcher;
import io.sermant.core.plugin.agent.matcher.MethodMatcher;
import io.sermant.mq.grayscale.interceptor.MqSchedulerRebuildSubscriptionInterceptor;

/**
* scheduler update SQL92 query message statement declarer
*
* @author chengyouling
* @since 2024-05-27
**/
public class MqSchedulerRebuildSubscriptionDeclarer extends MqAbstractDeclarer {
private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.consumer.RebalanceImpl";

private static final String METHOD_NAME = "getSubscriptionInner";

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME),
new MqSchedulerRebuildSubscriptionInterceptor())
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.mq.grayscale.declarer;

import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import io.sermant.core.plugin.agent.matcher.ClassMatcher;
import io.sermant.core.plugin.agent.matcher.MethodMatcher;
import io.sermant.mq.grayscale.interceptor.PullConsumerSubscriptionUpdateInterceptor;

/**
* pull consumer TAG/SQL92 query message statement build declarer
*
* @author chengyouling
* @since 2024-05-27
**/
public class PullConsumerSubscriptionUpdateDeclarer extends MqAbstractDeclarer {
private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl";

private static final String METHOD_NAME = "getSubscriptionData";

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.nameEquals(METHOD_NAME),
new PullConsumerSubscriptionUpdateInterceptor())
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.sermant.mq.grayscale.declarer;

import io.sermant.core.plugin.agent.declarer.InterceptDeclarer;
import io.sermant.core.plugin.agent.matcher.ClassMatcher;
import io.sermant.core.plugin.agent.matcher.MethodMatcher;
import io.sermant.mq.grayscale.interceptor.MqProducerGrayMessageHookInterceptor;

/**
* SendMessageHook builder declarer
*
* @author chengyouling
* @since 2024-05-27
**/
public class RocketMqProducerGrayMessageHookDeclarer extends MqAbstractDeclarer {
private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl";

private static final String[] METHOD_PARAM_TYPES = {
"org.apache.rocketmq.client.producer.DefaultMQProducer",
"org.apache.rocketmq.remoting.RPCHook"
};

@Override
public ClassMatcher getClassMatcher() {
return ClassMatcher.nameEquals(ENHANCE_CLASS);
}

@Override
public InterceptDeclarer[] getInterceptDeclarers(ClassLoader classLoader) {
return new InterceptDeclarer[]{
InterceptDeclarer.build(MethodMatcher.isConstructor()
.and(MethodMatcher.paramTypesEqual(METHOD_PARAM_TYPES)), new MqProducerGrayMessageHookInterceptor())
};
}
}
Loading

0 comments on commit 76ed33b

Please sign in to comment.