diff --git a/.github/actions/common/mq-grayscale/rocketmq/action.yml b/.github/actions/common/mq-grayscale/rocketmq/action.yml
new file mode 100644
index 0000000000..38453038d7
--- /dev/null
+++ b/.github/actions/common/mq-grayscale/rocketmq/action.yml
@@ -0,0 +1,83 @@
+name: "Rocketmq Message Gray Common operations"
+description: "Do something common for rocketmq message gray"
+runs:
+ using: "composite"
+ steps:
+ - name: Set up JDK ${{ env.javaVersion }}
+ uses: actions/setup-java@v4
+ with:
+ java-version: ${{ env.javaVersion }}
+ distribution: 'adopt'
+ cache: maven
+ - name: download agent
+ uses: actions/cache@v4
+ with:
+ path: sermant-agent-*/
+ key: ${{ runner.os }}-agent-${{ github.run_id }}
+ - name: get cse from cache
+ uses: actions/cache@v4
+ with:
+ path: Local-CSE-2.1.3-linux-amd64.zip
+ key: ${{ runner.os }}-local-cse
+ restore-keys: |
+ ${{ runner.os }}-local-cse
+ - name: start cse
+ shell: bash
+ run: |
+ export ROOT_PATH=$(pwd)
+ bash ./sermant-integration-tests/scripts/startCse.sh
+ - name: get rocketmq from cache
+ uses: actions/cache@v4
+ with:
+ path: rocketmq-all-4.8.0-bin-release.zip
+ key: ${{ runner.os }}-rocketmq-all-4.8.0-bin-release
+ restore-keys: |
+ ${{ runner.os }}-rocketmq-all-4.8.0-bin-release
+ - name: get rocketmq514 from cache
+ uses: actions/cache@v4
+ with:
+ path: rocketmq-all-5.1.4-bin-release.zip
+ key: ${{ runner.os }}-rocketmq-all-5.1.4-bin-release
+ restore-keys: |
+ ${{ runner.os }}-rocketmq-all-5.1.4-bin-release
+ - name: start rocketmq server
+ shell: bash
+ if: matrix.rocketMqClientVersion != '5.0.0' || (matrix.rocketMqClientVersion == '5.0.0' && matrix.test-model != 'PLUGIN_ENABLED_FALSE_LITE_PULL' && matrix.test-model != 'CONSUMER_BASE_ONLY_LITE_PULL' && matrix.test-model != 'CONSUMER_BASE_GRAY_LITE_PULL')
+ run: |
+ unzip rocketmq-all-4.8.0-bin-release.zip
+ sed -i 's/if \[\[ "$JAVA_MAJOR_VERSION" -lt "9" \]\]/if [ "$JAVA_MAJOR_VERSION" -lt "9" ]/g' rocketmq-all-4.8.0-bin-release/bin/runserver.sh
+ sed -i 's/-Xms4g -Xmx4g -Xmn2g/-Xms1g -Xmx1g -Xmn1g/g' rocketmq-all-4.8.0-bin-release/bin/runserver.sh
+ sed -i '22i enablePropertyFilter = true' rocketmq-all-4.8.0-bin-release/conf/broker.conf
+ nohup bash rocketmq-all-4.8.0-bin-release/bin/mqnamesrv &
+ - name: start rocketmq broker
+ shell: bash
+ if: matrix.rocketMqClientVersion != '5.0.0' || (matrix.rocketMqClientVersion == '5.0.0' && matrix.test-model != 'PLUGIN_ENABLED_FALSE_LITE_PULL' && matrix.test-model != 'CONSUMER_BASE_ONLY_LITE_PULL' && matrix.test-model != 'CONSUMER_BASE_GRAY_LITE_PULL')
+ run: |
+ sed -i 's/-Xms8g -Xmx8g -Xmn4g/-Xms1g -Xmx1g -Xmn1g/g' rocketmq-all-4.8.0-bin-release/bin/runbroker.sh
+ nohup bash rocketmq-all-4.8.0-bin-release/bin/mqbroker -n localhost:9876 -c rocketmq-all-4.8.0-bin-release/conf/broker.conf &
+ - name: start rocketmq514 server
+ shell: bash
+ if: (matrix.test-model == 'PLUGIN_ENABLED_FALSE_LITE_PULL' || matrix.test-model == 'CONSUMER_BASE_ONLY_LITE_PULL' || matrix.test-model == 'CONSUMER_BASE_GRAY_LITE_PULL') && matrix.rocketMqClientVersion == '5.0.0'
+ run: |
+ unzip rocketmq-all-5.1.4-bin-release.zip
+ sed -i 's/if \[\[ "$JAVA_MAJOR_VERSION" -lt "9" \]\]/if [ "$JAVA_MAJOR_VERSION" -lt "9" ]/g' rocketmq-all-5.1.4-bin-release/bin/runserver.sh
+ sed -i 's/-Xms4g -Xmx4g -Xmn2g/-Xms1g -Xmx1g -Xmn1g/g' rocketmq-all-5.1.4-bin-release/bin/runserver.sh
+ sed -i '22i enablePropertyFilter = true' rocketmq-all-5.1.4-bin-release/conf/broker.conf
+ nohup bash rocketmq-all-5.1.4-bin-release/bin/mqnamesrv &
+ - name: start rocketmq514 broker
+ shell: bash
+ if: (matrix.test-model == 'PLUGIN_ENABLED_FALSE_LITE_PULL' || matrix.test-model == 'CONSUMER_BASE_ONLY_LITE_PULL' || matrix.test-model == 'CONSUMER_BASE_GRAY_LITE_PULL') && matrix.rocketMqClientVersion == '5.0.0'
+ run: |
+ sed -i 's/-Xms8g -Xmx8g -Xmn4g/-Xms1g -Xmx1g -Xmn1g/g' rocketmq-all-5.1.4-bin-release/bin/runbroker.sh
+ nohup bash rocketmq-all-5.1.4-bin-release/bin/mqbroker -n localhost:9876 -c rocketmq-all-5.1.4-bin-release/conf/broker.conf &
+ - name: cache dependencies
+ uses: actions/cache@v4
+ with:
+ path: ~/.m2/repository
+ key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
+ restore-keys: |
+ ${{ runner.os }}-maven-
+ - name: entry
+ uses: ./.github/actions/common/entry
+ with:
+ log-dir: ./logs/rocketmq-grayscale/common
diff --git a/.github/actions/common/plugin-change-check/action.yml b/.github/actions/common/plugin-change-check/action.yml
index 39cd81b572..cf88e6842f 100644
--- a/.github/actions/common/plugin-change-check/action.yml
+++ b/.github/actions/common/plugin-change-check/action.yml
@@ -494,6 +494,22 @@ runs:
shell: bash
run: |
echo "crossthreadTagTransmissionChanged=${{ steps.changed-tag-transmission-crossthread.outputs.changed }}" >> $GITHUB_ENV
+ - uses: ktamas77/has-changed-path@v1.0.3
+ id: changed-mq-grayscale-rocketmq
+ with:
+ paths: sermant-plugins/sermant-mq-grayscale/mq-config-common
+ sermant-plugins/sermant-mq-grayscale/mq-config-service
+ sermant-plugins/sermant-mq-grayscale/mq-grayscale-rocketmq-plugin
+ sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo
+ sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo
+ sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test
+ ./.github/workflows/message_gray_integration_test.yml
+ ./.github/actions/common/mq-grayscale/rocketmq
+ ./.github/actions/scenarios/mq-grayscale/rocketmq
+ - name: env mq-grayscale-rocketmq
+ shell: bash
+ run: |
+ echo "mqGrayscaleRocketMqChanged=${{ steps.changed-mq-grayscale-rocketmq.outputs.changed }}" >> $GITHUB_ENV
- uses: ktamas77/has-changed-path@v1.0.3
id: changed-dubbo-router-action
with:
@@ -924,3 +940,10 @@ runs:
${{ steps.changed-common-action.outputs.changed }} == 'true' -o ${{ env.triggerPushEvent }} == 'true' ];then
echo "enableXdsServicAction=true" >> $GITHUB_ENV
fi
+
+ # ==========mq grayscale rocketmq is needed to test?==========
+ if [ ${{ env.mqGrayscaleRocketMqChanged }} == 'true' -o \
+ ${{ env.sermantAgentCoreChanged }} == 'true' -o \
+ ${{ steps.changed-common-action.outputs.changed }} == 'true' -o ${{ env.triggerPushEvent }} == 'true' ];then
+ echo "enableMqGrayscaleRocketMqAction=true" >> $GITHUB_ENV
+ fi
diff --git a/.github/actions/scenarios/mq-grayscale/rocketmq/action.yml b/.github/actions/scenarios/mq-grayscale/rocketmq/action.yml
new file mode 100644
index 0000000000..f9f099c92d
--- /dev/null
+++ b/.github/actions/scenarios/mq-grayscale/rocketmq/action.yml
@@ -0,0 +1,86 @@
+name: "RocketMq Grayscale Test"
+description: "Auto test for rocketMq grayscale"
+runs:
+ using: "composite"
+ steps:
+ - name: package rocketmq grayscale tests
+ shell: bash
+ run: mvn package -Drocketmq-client.version=${{ matrix.rocketMqClientVersion }} -DskipTests --file sermant-integration-tests/mq-grayscale-rocketmq-test/pom.xml
+ - name: echo test model
+ shell: bash
+ run: |
+ echo "=======test-model======"-${{ matrix.test-model }}
+ - name: start base producer service
+ shell: bash
+ env:
+ dynamic.config.dynamicConfigType: KIE
+ dynamic.config.serverAddress: 127.0.0.1:30110
+ SERVICE_META_ENVIRONMENT: development
+ run: |
+ nohup java -javaagent:sermant-agent-${{ env.sermantVersion }}/agent/sermant-agent.jar=appName=grayscale-rocketmq-producer -jar \
+ sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/target/grayscale-rocketmq-producer-demo.jar > ${{ env.logDir }}/rocketmq-producer-base.log 2>&1 &
+ - name: start gray producer service
+ shell: bash
+ env:
+ dynamic.config.dynamicConfigType: KIE
+ dynamic.config.serverAddress: 127.0.0.1:30110
+ SERVICE_META_ENVIRONMENT: development
+ SERVICE_META_VERSION: 1.0.1
+ run: |
+ nohup java -javaagent:sermant-agent-${{ env.sermantVersion }}/agent/sermant-agent.jar=appName=grayscale-rocketmq-producer -Dserver.port=9040 -jar \
+ sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/target/grayscale-rocketmq-producer-demo.jar > ${{ env.logDir }}/rocketmq-producer-gray.log 2>&1 &
+ - name: start base consumer service
+ shell: bash
+ env:
+ dynamic.config.dynamicConfigType: KIE
+ dynamic.config.serverAddress: 127.0.0.1:30110
+ SERVICE_META_ENVIRONMENT: development
+ run: |
+ nohup java -javaagent:sermant-agent-${{ env.sermantVersion }}/agent/sermant-agent.jar=appName=grayscale-rocketmq-consumer -jar \
+ sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/target/grayscale-rocketmq-consumer-demo.jar > ${{ env.logDir }}/rocketmq-consumer-base.log 2>&1 &
+ - name: start gray consumer service
+ shell: bash
+ if: matrix.test-model != 'CONSUMER_BASE_ONLY_PULL' && matrix.test-model != 'CONSUMER_BASE_ONLY_LITE_PULL' && matrix.test-model != 'CONSUMER_BASE_ONLY_PUSH'
+ env:
+ dynamic.config.dynamicConfigType: KIE
+ dynamic.config.serverAddress: 127.0.0.1:30110
+ SERVICE_META_ENVIRONMENT: development
+ SERVICE_META_VERSION: 1.0.1
+ run: |
+ nohup java -javaagent:sermant-agent-${{ env.sermantVersion }}/agent/sermant-agent.jar=appName=grayscale-rocketmq-consumer -Dserver.port=9010 -jar \
+ sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/target/grayscale-rocketmq-consumer-demo.jar > ${{ env.logDir }}/rocketmq-consumer-gray.log 2>&1 &
+ - name: waiting for service start for only base
+ shell: bash
+ if: matrix.test-model == 'CONSUMER_BASE_ONLY_PULL' || matrix.test-model == 'CONSUMER_BASE_ONLY_LITE_PULL' || matrix.test-model == 'CONSUMER_BASE_ONLY_PUSH'
+ run: |
+ bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:9030/actuator/health 200
+ bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:9040/actuator/health 200
+ bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:9000/actuator/health 200
+ - name: waiting for service start for base and gray
+ shell: bash
+ if: matrix.test-model != 'CONSUMER_BASE_ONLY_PULL' && matrix.test-model != 'CONSUMER_BASE_ONLY_LITE_PULL' && matrix.test-model != 'CONSUMER_BASE_ONLY_PUSH'
+ run: |
+ bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:9030/actuator/health 200
+ bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:9040/actuator/health 200
+ bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:9000/actuator/health 200
+ bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:9010/actuator/health 200
+ - name: test message grayscale rocketmq
+ shell: bash
+ run: |
+ mvn test -Dgrayscale.rocketmq.integration.test.type=${{ matrix.test-model }} -Dtest.version=${{ matrix.rocketMqClientVersion }} --file \
+ sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/pom.xml
+ - name: exit
+ if: always()
+ uses: ./.github/actions/common/exit
+ with:
+ processor-keyword: grayscale-rocketmq
+ - name: if failure then upload error log
+ uses: actions/upload-artifact@v3
+ if: ${{ failure() || cancelled() }}
+ with:
+ name: (test-for-grayscale-rocketmq)-(${{ matrix.test-model }}})-logs
+ path: |
+ ./*.log
+ ./logs/**/*.log
+ if-no-files-found: warn
+ retention-days: 2
diff --git a/.github/workflows/message_gray_integration_test.yml b/.github/workflows/message_gray_integration_test.yml
new file mode 100644
index 0000000000..ff0e19b81b
--- /dev/null
+++ b/.github/workflows/message_gray_integration_test.yml
@@ -0,0 +1,133 @@
+name: Message gray integration test
+env:
+ sermantVersion: 1.0.0
+on:
+ push:
+ pull_request:
+ branches:
+ - '*'
+ paths:
+ - 'sermant-agentcore/**'
+ - 'sermant-integration-tests/mq-grayscale-rocketmq-test/**'
+ - 'sermant-plugins/sermant-mq-grayscale/**'
+ - '.github/workflows/message_gray_integration_test.yml'
+ - '.github/actions/common/mq-grayscale/rocketmq/**'
+ - '.github/actions/scenarios/mq-grayscale/rocketmq/**'
+ - '.github/actions/common/plugin-change-check/action.yml'
+ - '.github/actions/common/entry/action.yml'
+ - '.github/actions/common/exit/action.yml'
+concurrency:
+ group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}-${{ github.head_ref }}
+ cancel-in-progress: true
+jobs:
+ set-execution-conditions:
+ name: set-execution-conditions
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v3
+ with:
+ fetch-depth: 100
+ - name: plugin-change-check
+ id: plugin-change-check
+ uses: ./.github/actions/common/plugin-change-check
+ - name: set-outputs
+ id: set-outputs
+ run: |
+ echo "enableMqGrayscaleRocketMqAction=${{env.enableMqGrayscaleRocketMqAction}}" >> $GITHUB_OUTPUT
+ outputs:
+ enableMqGrayscaleRocketMqAction: ${{ steps.set-outputs.outputs.enableMqGrayscaleRocketMqAction }}
+ download-midwares-and-cache:
+ name: download midwares and cache
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v3
+ - name: cache local cse
+ uses: actions/cache@v3
+ with:
+ path: Local-CSE-2.1.3-linux-amd64.zip
+ key: ${{ runner.os }}-local-cse
+ restore-keys: |
+ ${{ runner.os }}-local-cse
+ - name: download cse
+ run: |
+ export ROOT_PATH=$(pwd)
+ bash ./sermant-integration-tests/scripts/tryDownloadMidware.sh cse
+ - name: cache rocketmq
+ uses: actions/cache@v3
+ with:
+ path: rocketmq-all-4.8.0-bin-release.zip
+ key: ${{ runner.os }}-rocketmq-all-4.8.0-bin-release
+ restore-keys: |
+ ${{ runner.os }}-rocketmq-all-4.8.0-bin-release
+ - name: download rocketmq
+ run: |
+ export ROOT_PATH=$(pwd)
+ bash ./sermant-integration-tests/scripts/tryDownloadMidware.sh rocketmq
+ - name: cache rocketmq514
+ uses: actions/cache@v3
+ with:
+ path: rocketmq-all-5.1.4-bin-release.zip
+ key: ${{ runner.os }}-rocketmq-all-5.1.4-bin-release
+ restore-keys: |
+ ${{ runner.os }}-rocketmq-all-5.1.4-bin-release
+ - name: download rocketmq
+ run: |
+ export ROOT_PATH=$(pwd)
+ bash ./sermant-integration-tests/scripts/tryDownloadMidware.sh rocketmq514
+ build-agent-and-cache:
+ name: build agent and cache
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v3
+ - name: Set up JDK 8
+ uses: actions/setup-java@v3
+ with:
+ java-version: '8'
+ distribution: 'adopt'
+ cache: maven
+ - name: cache agent
+ uses: actions/cache@v3
+ with:
+ path: sermant-agent-*/
+ key: ${{ runner.os }}-agent-${{ github.run_id }}
+ - name: package agent
+ run: |
+ sed -i '/sermant-backend/d' pom.xml
+ sed -i '/sermant-integration-tests/d' pom.xml
+ sed -i '/sermant-injector/d' pom.xml
+ sed -i '/sermant-flowcontrol/d' sermant-plugins/pom.xml
+ sed -i '/sermant-database-write-prohibition/d' sermant-plugins/pom.xml
+ sed -i '/sermant-spring-beans-deal/d' sermant-plugins/pom.xml
+ sed -i '/sermant-service-removal/d' sermant-plugins/pom.xml
+ sed -i '/sermant-service-visibility/d' sermant-plugins/pom.xml
+ sed -i '/sermant-monitor/d' sermant-plugins/pom.xml
+ sed -i '/sermant-mq-consume-prohibition/d' sermant-plugins/pom.xml
+ sed -i '/sermant-springboot-registry/d' sermant-plugins/pom.xml
+ sed -i '/sermant-service-registry/d' sermant-plugins/pom.xml
+ sed -i '/sermant-dynamic-config/d' sermant-plugins/pom.xml
+ sed -i '/sermant-router/d' sermant-plugins/pom.xml
+ sed -i '/sermant-loadbalancer/d' sermant-plugins/pom.xml
+ mvn package -DskipTests -Ptest --file pom.xml
+ test-for-grayscale-rocketmq:
+ name: Test for grayscale rocketmq
+ runs-on: ubuntu-latest
+ if: needs.set-execution-conditions.outputs.enableMqGrayscaleRocketMqAction=='true'
+ needs: [set-execution-conditions, build-agent-and-cache, download-midwares-and-cache]
+ strategy:
+ matrix:
+ test-model: [ 'PLUGIN_ENABLED_FALSE_PULL','PLUGIN_ENABLED_FALSE_LITE_PULL','PLUGIN_ENABLED_FALSE_PUSH',
+ 'CONSUMER_BASE_ONLY_PULL','CONSUMER_BASE_ONLY_LITE_PULL','CONSUMER_BASE_ONLY_PUSH',
+ 'CONSUMER_BASE_GRAY_PULL','CONSUMER_BASE_GRAY_LITE_PULL','CONSUMER_BASE_GRAY_PUSH' ]
+ rocketMqClientVersion: ["4.8.0", "4.9.0", "4.9.1", "4.9.2", "4.9.3", "4.9.4", "4.9.5", "4.9.6", "4.9.7", "4.9.8", "5.0.0"]
+ fail-fast: false
+ steps:
+ - uses: actions/checkout@v3
+ with:
+ fetch-depth: 100
+ - name: set java version to environment
+ run: |
+ echo "javaVersion=8" >> $GITHUB_ENV
+ - name: common operations
+ uses: ./.github/actions/common/mq-grayscale/rocketmq
+ - name: message gray test for test-model=${{ matrix.test-model }} rocketMqClientVersion=${{ matrix.rocketMqClientVersion }}
+ uses: ./.github/actions/scenarios/mq-grayscale/rocketmq
diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/pom.xml b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/pom.xml
new file mode 100644
index 0000000000..0e92b41e45
--- /dev/null
+++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/pom.xml
@@ -0,0 +1,49 @@
+
+
+
+ mq-grayscale-rocketmq-test
+ io.sermant.integration
+ 1.0.0
+
+ 4.0.0
+
+ grayscale-rocketmq-consumer-demo
+ 1.0.0
+
+
+ 8
+ 8
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+ org.apache.rocketmq
+ rocketmq-client
+
+
+ org.apache.rocketmq
+ rocketmq-common
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+
+
+ ${project.artifactId}
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+
+
diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqConsumerApplication.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqConsumerApplication.java
new file mode 100644
index 0000000000..db4b9ae9a9
--- /dev/null
+++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqConsumerApplication.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.demo.grayscale.rocketmq.consumer;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * springboot starter
+ *
+ * @author chengyouling
+ * @since 2024-10-30
+ **/
+@SpringBootApplication
+public class RocketMqConsumerApplication {
+ /**
+ * main
+ *
+ * @param args parameters
+ */
+ public static void main(String[] args) {
+ SpringApplication.run(RocketMqConsumerApplication.class, args);
+ }
+}
diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqConsumerController.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqConsumerController.java
new file mode 100644
index 0000000000..21f0978260
--- /dev/null
+++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqConsumerController.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.demo.grayscale.rocketmq.consumer;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.Map;
+
+/**
+ * consumer controller
+ *
+ * @author chengyouling
+ * @since 2024-10-30
+ **/
+@RestController
+public class RocketMqConsumerController {
+ @Value("${rocketmq.address}")
+ private String nameServer;
+
+ @Value("${rocketmq.topic}")
+ private String topic;
+
+ @Autowired
+ private RocketMqPullConsumer pullConsumer;
+
+ @Autowired
+ private RocketMqPushConsumer pushConsumer;
+
+ @Autowired
+ private RocketMqLitePullConsumer litePullConsumer;
+
+ /**
+ * clear cache count info
+ *
+ * @return is success
+ */
+ @GetMapping("/clearMessageCount")
+ public String clearMessageCount() {
+ RocketMqMessageUtils.clearMessageCount();
+ return "success";
+ }
+
+ /**
+ * init push consumer
+ *
+ * @param consumerType consumerType
+ * @return init status
+ */
+ @GetMapping("/initConsumer")
+ public String initConsumer(@RequestParam("consumerType") String consumerType) {
+ if ("PUSH".equals(consumerType)) {
+ pushConsumer.initPushConsume(topic + "-PUSH", nameServer);
+ } else if ("PULL".equals(consumerType)) {
+ pullConsumer.initPullConsumer(nameServer);
+ } else {
+ litePullConsumer.initLitePullConsumer(topic + "-LITE-PULL", nameServer);
+ }
+ return "success";
+ }
+
+ /**
+ * pull message
+ *
+ * @return message count
+ */
+ @GetMapping("/getPullConsumerMessageCount")
+ public Map getPullConsumerMessageCount() {
+ return pullConsumer.getMessageCount(topic + "-PULL");
+ }
+
+ /**
+ * get push consumer message count
+ *
+ * @return message count
+ */
+ @GetMapping("/getPushConsumerMessageCount")
+ public Map getPushConsumerMessageCount() {
+ return pushConsumer.getMessageCount();
+ }
+
+ /**
+ * get lite pull consumer message count
+ *
+ * @return message count
+ */
+ @GetMapping("/getLitePullConsumerMessageCount")
+ public Map getLitePullConsumerMessageCount() {
+ return litePullConsumer.getMessageCount();
+ }
+}
diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqLitePullConsumer.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqLitePullConsumer.java
new file mode 100644
index 0000000000..884fc43894
--- /dev/null
+++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqLitePullConsumer.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.demo.grayscale.rocketmq.consumer;
+
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * litePull consumer
+ *
+ * @author chengyouling
+ * @since 2024-11-30
+ **/
+@Component
+public class RocketMqLitePullConsumer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqLitePullConsumer.class);
+
+ private DefaultLitePullConsumer litePullConsumer;
+
+ private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+
+ /**
+ * init lite pull consumer
+ *
+ * @param mqTopic topic
+ * @param mqAddress address
+ */
+ public void initLitePullConsumer(String mqTopic, String mqAddress) {
+ if (litePullConsumer == null) {
+ try {
+ litePullConsumer = new DefaultLitePullConsumer("default");
+ litePullConsumer.setNamesrvAddr(mqAddress);
+ litePullConsumer.subscribe(mqTopic, "*");
+ litePullConsumer.start();
+ executorService.scheduleWithFixedDelay(new LitePullRunnable(litePullConsumer), 0, 2,
+ TimeUnit.SECONDS);
+ } catch (MQClientException e) {
+ LOGGER.error("init lite pull consumer error!", e);
+ }
+ }
+ }
+
+ /**
+ * get lite pull message count
+ *
+ * @return message count
+ */
+ public Map getMessageCount() {
+ LOGGER.info(System.currentTimeMillis() + "lite pull getMessageCount");
+ return RocketMqMessageUtils.getMessageCount();
+ }
+
+ static class LitePullRunnable implements Runnable {
+ private final DefaultLitePullConsumer litePullConsumer;
+
+ public LitePullRunnable(DefaultLitePullConsumer litePullConsumer) {
+ this.litePullConsumer = litePullConsumer;
+ }
+
+ @Override
+ public void run() {
+ List messageExts = litePullConsumer.poll();
+ LOGGER.info(System.currentTimeMillis() + "lite pull messages:" + messageExts.size());
+ messageExts.forEach(RocketMqMessageUtils::convertMessageCount);
+ }
+ }
+}
diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqMessageUtils.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqMessageUtils.java
new file mode 100644
index 0000000000..13bc301e54
--- /dev/null
+++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqMessageUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.demo.grayscale.rocketmq.consumer;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * message count utils
+ *
+ * @author chengyouling
+ * @since 2024-10-30
+ */
+public class RocketMqMessageUtils {
+ private static final String GRAY = "gray";
+
+ private static int grayMessageCount = 0;
+
+ private static int baseMessageCount = 0;
+
+ private RocketMqMessageUtils() {
+ }
+
+ /**
+ * convert base/gray message
+ *
+ * @param messageExt MessageExt
+ */
+ public static void convertMessageCount(MessageExt messageExt) {
+ if (messageExt.getProperties() != null && GRAY.equals(messageExt.getProperties().get(
+ "x_lane_canary"))) {
+ grayMessageCount++;
+ return;
+ }
+ baseMessageCount++;
+ }
+
+ /**
+ * get base/gray message count
+ *
+ * @return message count
+ */
+ public static Map getMessageCount() {
+ Map countMap = new HashMap<>();
+ countMap.put("baseMessageCount", baseMessageCount);
+ countMap.put("grayMessageCount", grayMessageCount);
+ return countMap;
+ }
+
+ /**
+ * clear cache count info
+ */
+ public static void clearMessageCount() {
+ baseMessageCount = 0;
+ grayMessageCount = 0;
+ }
+}
diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqPullConsumer.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqPullConsumer.java
new file mode 100644
index 0000000000..8ff1d39a60
--- /dev/null
+++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqPullConsumer.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.demo.grayscale.rocketmq.consumer;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.consumer.PullResultExt;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * pull consumer
+ *
+ * @author chengyouling
+ * @since 2024-11-30
+ **/
+@Component
+public class RocketMqPullConsumer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqPullConsumer.class);
+
+ private DefaultMQPullConsumer pullConsumer;
+
+ private final Map offSetTable = new HashMap<>();
+
+ private final int maxReconsumeTimes = 10000;
+
+ private final int maxNums = 32;
+
+ /**
+ * init pull consumer
+ *
+ * @param mqAddress address
+ */
+ public void initPullConsumer(String mqAddress) {
+ try {
+ if (pullConsumer == null) {
+ pullConsumer = new DefaultMQPullConsumer("default");
+ pullConsumer.setNamesrvAddr(mqAddress);
+ pullConsumer.setMaxReconsumeTimes(maxReconsumeTimes);
+ pullConsumer.start();
+ }
+ } catch (MQClientException e) {
+ LOGGER.error("init pull consumer error!", e);
+ }
+ }
+
+ /**
+ * get pull message count
+ *
+ * @param topic topic
+ * @return message count
+ */
+ public Map getMessageCount(String topic) {
+ try {
+ Set messageQueues = pullConsumer.fetchSubscribeMessageQueues(topic);
+ if (messageQueues.isEmpty()) {
+ return new HashMap<>();
+ }
+ for (MessageQueue mq : messageQueues) {
+ PullResultExt pullResult = (PullResultExt) pullConsumer.pullBlockIfNotFound(mq, null,
+ getMessageQueueOffSet(mq), maxNums);
+ putMessageQueueOffSet(mq, pullResult);
+ if (pullResult.getPullStatus() == PullStatus.FOUND) {
+ List messageExts = pullResult.getMsgFoundList();
+ for (MessageExt messageExt: messageExts) {
+ RocketMqMessageUtils.convertMessageCount(messageExt);
+ }
+ }
+ }
+ return RocketMqMessageUtils.getMessageCount();
+ } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
+ LOGGER.error("consume pull message error!", e);
+ return new HashMap<>();
+ }
+ }
+
+ private void putMessageQueueOffSet(MessageQueue mq, PullResultExt pullResult) {
+ offSetTable.put(mq, pullResult.getNextBeginOffset());
+ }
+
+ private long getMessageQueueOffSet(MessageQueue mq) {
+ return offSetTable.getOrDefault(mq, 0L);
+ }
+}
diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqPushConsumer.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqPushConsumer.java
new file mode 100644
index 0000000000..ebf371855e
--- /dev/null
+++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/consumer/RocketMqPushConsumer.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.demo.grayscale.rocketmq.consumer;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+
+/**
+ * push consumer
+ *
+ * @author chengyouling
+ * @since 2024-11-30
+ **/
+@Component
+public class RocketMqPushConsumer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqPushConsumer.class);
+
+ private DefaultMQPushConsumer pushConsumer;
+
+ /**
+ * init push consumer
+ *
+ * @param mqTopic topic
+ * @param mqAddress address
+ */
+ public void initPushConsume(String mqTopic, String mqAddress) {
+ try {
+ if (pushConsumer == null) {
+ pushConsumer = new DefaultMQPushConsumer("default");
+ pushConsumer.setNamesrvAddr(mqAddress);
+ pushConsumer.subscribe(mqTopic, "*");
+ pushConsumer.registerMessageListener((MessageListenerConcurrently)(messages, context) -> {
+ for (MessageExt messageExt : messages) {
+ RocketMqMessageUtils.convertMessageCount(messageExt);
+ }
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ });
+ pushConsumer.start();
+ }
+ } catch (MQClientException e) {
+ LOGGER.error("init push consumer error!", e);
+ }
+ }
+
+ /**
+ * get push message count
+ *
+ * @return message count
+ */
+ public Map getMessageCount() {
+ return RocketMqMessageUtils.getMessageCount();
+ }
+}
diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/resources/application.yaml b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/resources/application.yaml
new file mode 100644
index 0000000000..2c81926171
--- /dev/null
+++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-consumer-demo/src/main/resources/application.yaml
@@ -0,0 +1,10 @@
+server:
+ port: 9000
+rocketmq:
+ address: 127.0.0.1:9876
+ topic: MESSAGE-GRAY
+management:
+ endpoints:
+ web:
+ exposure:
+ include: "*"
diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/pom.xml b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/pom.xml
new file mode 100644
index 0000000000..17cca01e74
--- /dev/null
+++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/pom.xml
@@ -0,0 +1,40 @@
+
+
+
+ mq-grayscale-rocketmq-test
+ io.sermant.integration
+ 1.0.0
+
+ 4.0.0
+
+ grayscale-rocketmq-integration-test
+ 1.0.0
+
+
+ 8
+ 8
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter
+ test
+
+
+ org.springframework
+ spring-web
+ 5.3.20
+ test
+
+
+ com.alibaba
+ fastjson
+ 2.0.9
+ test
+
+
+
+
diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/GrayscaleRocketmqTest.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/GrayscaleRocketmqTest.java
new file mode 100644
index 0000000000..9cfc8b9278
--- /dev/null
+++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/GrayscaleRocketmqTest.java
@@ -0,0 +1,482 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.demo.grayscale.rocketmq.integration;
+
+import io.sermant.demo.grayscale.rocketmq.integration.support.KieClient;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.concurrent.TimeUnit;
+
+import com.alibaba.fastjson.JSON;
+
+/**
+ * test method class
+ *
+ * @author chengyouling
+ * @since 2024-10-30
+ */
+public class GrayscaleRocketmqTest {
+ private static final RestTemplate restTemplate = new RestTemplate();
+
+ private static final KieClient kieClient = new KieClient(restTemplate);
+
+ private static final String CONFIG_KEY = "grayscale.mq.config";
+
+ private static final String CONSUMER_TYPE_PULL = "PULL";
+
+ private static final String CONSUMER_TYPE_LITE_PULL = "LITE-PULL";
+
+ private static final String CONSUMER_TYPE_PUSH = "PUSH";
+
+ @Test
+ @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "PLUGIN_ENABLED_FALSE_PULL")
+ public void testPluginEnabledFalsePullConsumeMessage() throws InterruptedException {
+ initAndProduceMessage(true, CONSUMER_TYPE_PULL);
+ Thread.sleep(5000);
+ int baseBaseCount = parseBaseMessageCount(getPullBaseResult());
+ int grayBaseCount = parseBaseMessageCount(getPullGrayResult());
+ Assertions.assertTrue(baseBaseCount == 2 && grayBaseCount == 2);
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches =
+ "PLUGIN_ENABLED_FALSE_LITE_PULL")
+ public void testPluginEnabledFalseLitePullConsumeMessage() throws InterruptedException {
+ String clientVersion = System.getProperty("test.version");
+ System.out.println("===========clientVersion========" + clientVersion);
+ if ("5.0.0".equals(clientVersion)) {
+ initAndProduceLitePullMessage(true);
+ } else {
+ initAndProduceMessage(true, CONSUMER_TYPE_LITE_PULL);
+ }
+ Thread.sleep(60000);
+ int baseBaseCount = parseBaseMessageCount(getLitePullBaseResult());
+ int grayBaseCount = parseBaseMessageCount(getLitePullGrayResult());
+ Assertions.assertEquals(2, baseBaseCount + grayBaseCount);
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "PLUGIN_ENABLED_FALSE_PUSH")
+ public void testPluginEnabledFalsePushConsumeMessage() throws InterruptedException {
+ initAndProduceMessage(true, CONSUMER_TYPE_PUSH);
+ Thread.sleep(60000);
+ int baseBaseCount = parseBaseMessageCount(getPushBaseResult());
+ int grayBaseCount = parseBaseMessageCount(getPushGrayResult());
+ Assertions.assertEquals(2, baseBaseCount + grayBaseCount);
+ }
+
+ private int parseBaseMessageCount(String messageCount) {
+ return (int) JSON.parseObject(messageCount).get("baseMessageCount");
+ }
+
+ private int parseGrayMessageCount(String messageCount) {
+ return (int) JSON.parseObject(messageCount).get("grayMessageCount");
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "CONSUMER_BASE_ONLY_PULL")
+ public void testAutoOnlyBasePullConsumeMessage() throws InterruptedException {
+ createGrayscaleConfig("AUTO", "");
+ initAndProduceMessage(false, CONSUMER_TYPE_PULL);
+ Thread.sleep(5000);
+ String baseResult = getPullBaseResult();
+ int baseBaseCount = parseBaseMessageCount(baseResult);
+ int baseGrayCount = parseGrayMessageCount(baseResult);
+ Assertions.assertTrue(baseBaseCount == 1 && baseGrayCount == 1);
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches =
+ "CONSUMER_BASE_ONLY_LITE_PULL")
+ public void testAutoOnlyBaseLitePullConsumeMessage() throws InterruptedException {
+ createGrayscaleConfig("AUTO", "");
+ String clientVersion = System.getProperty("test.version");
+ System.out.println("===========clientVersion========" + clientVersion);
+ if ("5.0.0".equals(clientVersion)) {
+ initAndProduceLitePullMessage(false);
+ } else {
+ initAndProduceMessage(false, CONSUMER_TYPE_LITE_PULL);
+ }
+ Thread.sleep(60000);
+ String litePullBaseResult = getLitePullBaseResult();
+ int baseBaseCount = parseBaseMessageCount(litePullBaseResult);
+ int baseGrayCount = parseGrayMessageCount(litePullBaseResult);
+ Assertions.assertTrue(baseBaseCount == 1 && baseGrayCount == 1);
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "CONSUMER_BASE_ONLY_PUSH")
+ public void testAutoOnlyBasePushConsumeMessage() throws InterruptedException {
+ createGrayscaleConfig("AUTO", "");
+ initAndProduceMessage(false, CONSUMER_TYPE_PUSH);
+ Thread.sleep(60000);
+ String pushBaseResult = getPushBaseResult();
+ int baseBaseCount = parseBaseMessageCount(pushBaseResult);
+ int baseGrayCount = parseGrayMessageCount(pushBaseResult);
+ Assertions.assertTrue(baseBaseCount == 1 && baseGrayCount == 1);
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "CONSUMER_BASE_ONLY_PULL")
+ public void testAutoBaseExcPullConsumeMessage() throws InterruptedException {
+ createGrayscaleConfig("AUTO", "gray");
+ initAndProduceMessage(false, CONSUMER_TYPE_PULL);
+ Thread.sleep(5000);
+ String baseResult = getPullBaseResult();
+ int baseBaseCount = parseBaseMessageCount(baseResult);
+ int baseGrayCount = parseGrayMessageCount(baseResult);
+ Assertions.assertTrue(baseBaseCount == 1 && baseGrayCount == 0);
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches =
+ "CONSUMER_BASE_ONLY_LITE_PULL")
+ public void testAutoBaseExcLitePullConsumeMessage() throws InterruptedException {
+ createGrayscaleConfig("AUTO", "gray");
+ String clientVersion = System.getProperty("test.version");
+ System.out.println("===========clientVersion========" + clientVersion);
+ if ("5.0.0".equals(clientVersion)) {
+ initAndProduceLitePullMessage(false);
+ } else {
+ initAndProduceMessage(false, CONSUMER_TYPE_LITE_PULL);
+ }
+ Thread.sleep(60000);
+ String litePullBaseResult = getLitePullBaseResult();
+ int baseBaseCount = parseBaseMessageCount(litePullBaseResult);
+ int baseGrayCount = parseGrayMessageCount(litePullBaseResult);
+ Assertions.assertTrue(baseBaseCount == 1 && baseGrayCount == 0);
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "CONSUMER_BASE_ONLY_PUSH")
+ public void testAutoBaseExcPushConsumeMessage() throws InterruptedException {
+ createGrayscaleConfig("AUTO", "gray");
+ initAndProduceMessage(false, CONSUMER_TYPE_PUSH);
+ Thread.sleep(50000);
+ String pushBaseResult = getPushBaseResult();
+ int baseBaseCount = parseBaseMessageCount(pushBaseResult);
+ int baseGrayCount = parseGrayMessageCount(pushBaseResult);
+ Assertions.assertTrue(baseBaseCount == 1 && baseGrayCount == 0);
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "CONSUMER_BASE_GRAY_PULL")
+ public void testAutoBaseGrayPullConsumeMessage() throws InterruptedException {
+ createGrayscaleConfig("AUTO", "");
+ initAndProduceMessage(true, CONSUMER_TYPE_PULL);
+ Thread.sleep(5000);
+ int baseBaseCount = parseBaseMessageCount(getPullBaseResult());
+ int grayGrayCount = parseGrayMessageCount(getPullGrayResult());
+ Assertions.assertTrue(baseBaseCount == 1 && grayGrayCount == 1);
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches =
+ "CONSUMER_BASE_GRAY_LITE_PULL")
+ public void testAutoBaseGrayLitePullConsumeMessage() throws InterruptedException {
+ createGrayscaleConfig("AUTO", "");
+ String clientVersion = System.getProperty("test.version");
+ System.out.println("===========clientVersion========" + clientVersion);
+ if ("5.0.0".equals(clientVersion)) {
+ initAndProduceLitePullMessage(true);
+ } else {
+ initAndProduceMessage(true, CONSUMER_TYPE_LITE_PULL);
+ }
+ Thread.sleep(60000);
+ int baseBaseCount = parseBaseMessageCount(getLitePullBaseResult());
+ int grayGrayCount = parseGrayMessageCount(getLitePullGrayResult());
+ Assertions.assertTrue(baseBaseCount == 1 && grayGrayCount == 1);
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "CONSUMER_BASE_GRAY_PUSH")
+ public void testAutoBaseGrayPushConsumeMessage() throws InterruptedException {
+ createGrayscaleConfig("AUTO", "");
+ initAndProduceMessage(true, CONSUMER_TYPE_PUSH);
+ Thread.sleep(50000);
+ int baseBaseCount = parseBaseMessageCount(getPushBaseResult());
+ int grayGrayCount = parseGrayMessageCount(getPushGrayResult());
+ Assertions.assertTrue(baseBaseCount == 1 && grayGrayCount == 1);
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "CONSUMER_BASE_ONLY_PULL")
+ public void testBaseBaseOnlyPullConsumeMessage() throws InterruptedException {
+ createGrayscaleConfig("BASE", "");
+ initAndProduceMessage(false, CONSUMER_TYPE_PULL);
+ Thread.sleep(5000);
+ String baseResult = getPullBaseResult();
+ int baseBaseCount = parseBaseMessageCount(baseResult);
+ int baseGrayCount = parseGrayMessageCount(baseResult);
+ Assertions.assertTrue( baseBaseCount == 1 && baseGrayCount == 1);
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches =
+ "CONSUMER_BASE_ONLY_LITE_PULL")
+ public void testBaseBaseOnlyLitePullConsumeMessage() throws InterruptedException {
+ createGrayscaleConfig("BASE", "");
+ String clientVersion = System.getProperty("test.version");
+ System.out.println("===========clientVersion========" + clientVersion);
+ if ("5.0.0".equals(clientVersion)) {
+ initAndProduceLitePullMessage(false);
+ } else {
+ initAndProduceMessage(false, CONSUMER_TYPE_LITE_PULL);
+ }
+ Thread.sleep(60000);
+ String litePullBaseResult = getLitePullBaseResult();
+ int baseBaseCount = parseBaseMessageCount(litePullBaseResult);
+ int baseGrayCount = parseGrayMessageCount(litePullBaseResult);
+ Assertions.assertTrue( baseBaseCount == 1 && baseGrayCount == 1);
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "CONSUMER_BASE_ONLY_PUSH")
+ public void testBaseBaseOnlyPushConsumeMessage() throws InterruptedException {
+ createGrayscaleConfig("BASE", "");
+ initAndProduceMessage(false, CONSUMER_TYPE_PUSH);
+ Thread.sleep(50000);
+ String pushBaseResult = getPushBaseResult();
+ int baseBaseCount = parseBaseMessageCount(pushBaseResult);
+ int baseGrayCount = parseGrayMessageCount(pushBaseResult);
+ Assertions.assertTrue( baseBaseCount == 1 && baseGrayCount == 1);
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "CONSUMER_BASE_ONLY_PULL")
+ public void testBaseBaseExcGrayPullConsumeMessage() throws InterruptedException {
+ createGrayscaleConfig("BASE", "gray");
+ initAndProduceMessage(false, CONSUMER_TYPE_PULL);
+ Thread.sleep(5000);
+ String baseResult = getPullBaseResult();
+ int baseBaseCount = parseBaseMessageCount(baseResult);
+ int baseGrayCount = parseGrayMessageCount(baseResult);
+ Assertions.assertTrue( baseBaseCount == 1 && baseGrayCount == 0);
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches =
+ "CONSUMER_BASE_ONLY_LITE_PULL")
+ public void testBaseBaseExcGrayLitePullConsumeMessage() throws InterruptedException {
+ createGrayscaleConfig("BASE", "gray");
+ String clientVersion = System.getProperty("test.version");
+ System.out.println("===========clientVersion========" + clientVersion);
+ if ("5.0.0".equals(clientVersion)) {
+ initAndProduceLitePullMessage(false);
+ } else {
+ initAndProduceMessage(false, CONSUMER_TYPE_LITE_PULL);
+ }
+ Thread.sleep(60000);
+ String litePullBaseResult = getLitePullBaseResult();
+ int baseBaseCount = parseBaseMessageCount(litePullBaseResult);
+ int baseGrayCount = parseGrayMessageCount(litePullBaseResult);
+ Assertions.assertTrue( baseBaseCount == 1 && baseGrayCount == 0);
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "CONSUMER_BASE_ONLY_PUSH")
+ public void testBaseBaseExcGrayPushConsumeMessage() throws InterruptedException {
+ createGrayscaleConfig("BASE", "gray");
+ initAndProduceMessage(false, CONSUMER_TYPE_PUSH);
+ Thread.sleep(50000);
+ String pushBaseResult = getPushBaseResult();
+ int baseBaseCount = parseBaseMessageCount(pushBaseResult);
+ int baseGrayCount = parseGrayMessageCount(pushBaseResult);
+ Assertions.assertTrue( baseBaseCount == 1 && baseGrayCount == 0);
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "CONSUMER_BASE_GRAY_PULL")
+ public void testBaseBaseGrayPullConsumeMessage() throws InterruptedException {
+ createGrayscaleConfig("BASE", "");
+ initAndProduceMessage(true, CONSUMER_TYPE_PULL);
+ Thread.sleep(5000);
+ int baseBaseCount = parseBaseMessageCount(getPullBaseResult());
+ int grayGrayCount = parseGrayMessageCount(getPullGrayResult());
+ Assertions.assertTrue(baseBaseCount == 1 && grayGrayCount <= 1);
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches =
+ "CONSUMER_BASE_GRAY_LITE_PULL")
+ public void testBaseBaseGrayLitePullConsumeMessage() throws InterruptedException {
+ createGrayscaleConfig("BASE", "");
+ String clientVersion = System.getProperty("test.version");
+ System.out.println("===========clientVersion========" + clientVersion);
+ if ("5.0.0".equals(clientVersion)) {
+ initAndProduceLitePullMessage(true);
+ } else {
+ initAndProduceMessage(true, CONSUMER_TYPE_LITE_PULL);
+ }
+ Thread.sleep(60000);
+ int baseBaseCount = parseBaseMessageCount(getLitePullBaseResult());
+ int grayGrayCount = parseGrayMessageCount(getLitePullGrayResult());
+ Assertions.assertTrue(baseBaseCount == 1 && grayGrayCount <= 1);
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "CONSUMER_BASE_GRAY_PUSH")
+ public void testBaseBaseGrayPushConsumeMessage() throws InterruptedException {
+ createGrayscaleConfig("BASE", "");
+ initAndProduceMessage(true, CONSUMER_TYPE_PUSH);
+ Thread.sleep(60000);
+ int baseBaseCount = parseBaseMessageCount(getPushBaseResult());
+ int grayGrayCount = parseGrayMessageCount(getPushGrayResult());
+ Assertions.assertTrue(baseBaseCount == 1 && grayGrayCount <= 1);
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "CONSUMER_BASE_GRAY_PULL")
+ public void testBaseBaseGrayExcGrayPullConsumeMessage() throws InterruptedException {
+ createGrayscaleConfig("BASE", "gray");
+ initAndProduceMessage(true, CONSUMER_TYPE_PULL);
+ Thread.sleep(5000);
+ int baseBaseCount = parseBaseMessageCount(getPullBaseResult());
+ int grayGrayCount = parseGrayMessageCount(getPullGrayResult());
+ Assertions.assertTrue(baseBaseCount == 1 && grayGrayCount == 1);
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches =
+ "CONSUMER_BASE_GRAY_LITE_PULL")
+ public void testBaseBaseGrayExcGrayLitePullConsumeMessage() throws InterruptedException {
+ createGrayscaleConfig("BASE", "gray");
+ String clientVersion = System.getProperty("test.version");
+ System.out.println("===========clientVersion========" + clientVersion);
+ if ("5.0.0".equals(clientVersion)) {
+ initAndProduceLitePullMessage(true);
+ } else {
+ initAndProduceMessage(true, CONSUMER_TYPE_LITE_PULL);
+ }
+ Thread.sleep(60000);
+ int baseBaseCount = parseBaseMessageCount(getLitePullBaseResult());
+ int grayGrayCount = parseGrayMessageCount(getLitePullGrayResult());
+ Assertions.assertTrue(baseBaseCount == 1 && grayGrayCount == 1);
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "grayscale.rocketmq.integration.test.type", matches = "CONSUMER_BASE_GRAY_PUSH")
+ public void testBaseBaseGrayExcGrayPushConsumeMessage() throws InterruptedException {
+ createGrayscaleConfig("BASE", "gray");
+ initAndProduceMessage(true, CONSUMER_TYPE_PUSH);
+ Thread.sleep(50000);
+ int baseBaseCount = parseBaseMessageCount(getPushBaseResult());
+ int grayGrayCount = parseGrayMessageCount(getPushGrayResult());
+ Assertions.assertTrue(baseBaseCount == 1 && grayGrayCount == 1);
+ }
+
+ private void initAndProduceMessage(boolean isGrayInstanceInit, String consumerType) throws InterruptedException {
+ clearCacheCount(isGrayInstanceInit);
+ initProducer(consumerType.equals(CONSUMER_TYPE_PULL) ? "initPullProducer" :
+ consumerType.equals(CONSUMER_TYPE_LITE_PULL) ? "initLitePullProducer" : "initPushProducer");
+ initConsumer(isGrayInstanceInit, consumerType);
+ Thread.sleep(20000);
+ produceMessage(consumerType.equals(CONSUMER_TYPE_PULL) ? "producePullMessage" :
+ consumerType.equals(CONSUMER_TYPE_LITE_PULL) ? "produceLitePullMessage" : "producePushMessage");
+ }
+
+ private void initAndProduceLitePullMessage(boolean isGrayInstanceInit) {
+ clearCacheCount(isGrayInstanceInit);
+ initProducer("initLitePullProducer");
+ produceMessage("produceLitePullMessage");
+ initConsumer(isGrayInstanceInit, CONSUMER_TYPE_LITE_PULL);
+ }
+
+ private String getPullBaseResult() {
+ return restTemplate.getForObject("http://127.0.0.1:9000/getPullConsumerMessageCount", String.class);
+ }
+
+ private String getPushBaseResult() {
+ return restTemplate.getForObject("http://127.0.0.1:9000/getPushConsumerMessageCount", String.class);
+ }
+
+ private String getLitePullBaseResult() {
+ return restTemplate.getForObject("http://127.0.0.1:9000/getLitePullConsumerMessageCount", String.class);
+ }
+
+ private String getPullGrayResult() {
+ return restTemplate.getForObject("http://127.0.0.1:9010/getPullConsumerMessageCount", String.class);
+ }
+
+ private String getPushGrayResult() {
+ return restTemplate.getForObject("http://127.0.0.1:9010/getPushConsumerMessageCount", String.class);
+ }
+
+ private String getLitePullGrayResult() {
+ return restTemplate.getForObject("http://127.0.0.1:9010/getLitePullConsumerMessageCount", String.class);
+ }
+
+ private void initProducer(String path) {
+ restTemplate.getForObject("http://127.0.0.1:9030/" + path, String.class);
+ restTemplate.getForObject("http://127.0.0.1:9040/" + path, String.class);
+ }
+
+ private void produceMessage(String path) {
+ restTemplate.getForObject("http://127.0.0.1:9030/" + path + "?message={1}", String.class, "message");
+ restTemplate.getForObject("http://127.0.0.1:9040/" + path + "?message={1}", String.class, "gray-message");
+ }
+
+ private void initConsumer(boolean isGrayInstanceInit, String consumerType) {
+ if (isGrayInstanceInit) {
+ // Trigger start gray consumer.
+ restTemplate.getForObject("http://127.0.0.1:9010/initConsumer?consumerType={1}", String.class,
+ consumerType);
+ }
+
+ // Trigger start base consumer.
+ restTemplate.getForObject("http://127.0.0.1:9000/initConsumer?consumerType={1}", String.class, consumerType);
+ }
+
+ private void clearCacheCount(boolean isGrayInstanceInit) {
+ if (isGrayInstanceInit) {
+ restTemplate.getForObject("http://127.0.0.1:9010/clearMessageCount", String.class);
+ }
+ restTemplate.getForObject("http://127.0.0.1:9000/clearMessageCount", String.class);
+ }
+
+ private void createGrayscaleConfig(String consumeMode, String excludeTag) {
+ String CONTENT = "enabled: true\n"
+ + "grayscale:\n"
+ + " - consumerGroupTag: gray\n"
+ + " serviceMeta:\n"
+ + " version: 1.0.1\n"
+ + " trafficTag:\n"
+ + " x_lane_canary: gray\n"
+ + "base:\n"
+ + " autoCheckDelayTime: 10\n"
+ + " consumeMode: " + consumeMode + "\n"
+ + " excludeGroupTags: [\""+ excludeTag + "\"]\n";
+
+ Assertions.assertTrue(kieClient.publishConfig(CONFIG_KEY, CONTENT));
+ }
+
+ @AfterEach
+ public void deleteGrayscaleConfig() {
+ try {
+ kieClient.deleteKey(CONFIG_KEY);
+ TimeUnit.SECONDS.sleep(2);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/KieClient.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/KieClient.java
new file mode 100644
index 0000000000..76ea118248
--- /dev/null
+++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/KieClient.java
@@ -0,0 +1,251 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.demo.grayscale.rocketmq.integration.support;
+
+import com.alibaba.fastjson.JSONObject;
+
+import io.sermant.demo.grayscale.rocketmq.integration.support.entity.KieConfigEntity;
+import io.sermant.demo.grayscale.rocketmq.integration.support.entity.KieResponse;
+import io.sermant.demo.grayscale.rocketmq.integration.support.utils.LabelGroupUtils;
+
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.HttpClientErrorException;
+import org.springframework.web.client.RestTemplate;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Kie response client
+ *
+ * @author chengyouling
+ * @since 2024-10-30
+ */
+public class KieClient {
+ private final RestTemplate restTemplate;
+
+ private final String url;
+
+ private final Map labels;
+
+ /**
+ * 构造函数
+ *
+ * @param restTemplate 请求器
+ */
+ public KieClient(RestTemplate restTemplate) {
+ this(restTemplate, null);
+ }
+
+ /**
+ * 构造函数
+ *
+ * @param restTemplate 请求器
+ * @param url 地址
+ */
+ public KieClient(RestTemplate restTemplate, String url) {
+ this(restTemplate, url, null);
+ }
+
+ /**
+ * 构造函数
+ *
+ * @param restTemplate 请求器
+ * @param url 地址
+ * @param labels 标签
+ */
+ public KieClient(RestTemplate restTemplate, String url, Map labels) {
+ this.restTemplate = restTemplate;
+ this.url = url == null ? "http://127.0.0.1:30110/v1/default/kie/kv" : url;
+ this.labels = labels;
+ }
+
+ /**
+ * 发布配置
+ *
+ * @param key 键
+ * @param value 值
+ * @return 是否发布成功
+ */
+ public boolean publishConfig(String key, String value) {
+ final KieConfigEntity configEntity = queryTargetKeyId(key);
+ if (configEntity != null) {
+ // 更新操作
+ return updateKey(configEntity, value);
+ } else {
+ // 新增操作
+ return addKey(key, value);
+ }
+ }
+
+ /**
+ * 更新配置
+ *
+ * @param configEntity 查询的config信息
+ * @param value 新的配置内容
+ * @return 是否更新成功
+ */
+ public boolean updateKey(KieConfigEntity configEntity, String value) {
+ HttpHeaders headers = new HttpHeaders();
+ MediaType type = MediaType.parseMediaType("application/json; charset=UTF-8");
+ headers.setContentType(type);
+ headers.add("Accept", MediaType.APPLICATION_JSON.toString());
+
+ HttpEntity entity = new HttpEntity<>(JSONObject.toJSONString(new KieRequest(configEntity.getKey(),
+ value, labels)),
+ headers);
+ String address = this.url + "/" + configEntity.getId();
+ restTemplate.put(address, entity);
+ return true;
+ }
+
+ /**
+ * 根据key删除配置
+ *
+ * @param key 键
+ * @return 是否删除成功
+ */
+ public boolean deleteKey(String key) {
+ final KieConfigEntity configEntity = this.queryTargetKeyId(key);
+ if (configEntity == null) {
+ return false;
+ }
+ String address = this.url + "/" + configEntity.getId();
+ restTemplate.delete(address);
+ return true;
+ }
+
+ /**
+ * 添加新配置
+ *
+ * @param key 键
+ * @param value 值
+ * @return 是否添加成功
+ */
+ public boolean addKey(String key, String value) {
+ HttpHeaders headers = new HttpHeaders();
+ MediaType type = MediaType.parseMediaType("application/json; charset=UTF-8");
+ headers.setContentType(type);
+ headers.add("Accept", MediaType.APPLICATION_JSON.toString());
+
+ HttpEntity entity = new HttpEntity<>(JSONObject.toJSONString(new KieRequest(key, value, labels)), headers);
+
+ try {
+ ResponseEntity responseEntity = restTemplate.postForEntity(url, entity, String.class);
+ return responseEntity.getStatusCode() == HttpStatus.OK;
+ } catch (HttpClientErrorException ex) {
+ // ignored
+ }
+ return false;
+ }
+
+ private KieConfigEntity queryTargetKeyId(String key) {
+ final List query = query(null);
+ return query.stream().filter(kieConfigEntity -> kieConfigEntity.getKey().equals(key))
+ .findFirst().orElse(null);
+ }
+
+ /**
+ * 根据标签查询kie配置
+ *
+ * @param labels 新增标签
+ * @return kie响应
+ */
+ public List query(Map labels) {
+ Map curLabels = labels;
+ if (labels == null) {
+ curLabels = this.labels;
+ }
+ if (curLabels == null) {
+ curLabels = new HashMap<>();
+ curLabels.put("app", "default");
+ curLabels.put("environment", "development");
+ }
+ final String labelGroup = LabelGroupUtils.createLabelGroup(curLabels);
+ final String labelCondition = LabelGroupUtils.getLabelCondition(labelGroup);
+ String address = this.url + "?" + labelCondition + "&match=exact&revision=";
+ final ResponseEntity configs = restTemplate.getForEntity(address, String.class);
+ JSONObject result = JSONObject.parseObject(configs.getBody());
+ if (configs.getStatusCode().value() == 200) {
+ final KieResponse kieResponse = result.toJavaObject(KieResponse.class);
+ return kieResponse.getData();
+ }
+ return Collections.emptyList();
+ }
+
+ /**
+ * kie请求
+ *
+ * @since 2022-07-12
+ */
+ static class KieRequest implements Serializable {
+ private String key;
+ private String value;
+ private Map labels;
+ private String status = "enabled";
+
+ public KieRequest(String key, String value, Map labels) {
+ this.key = key;
+ this.value = value;
+ this.labels = labels;
+ if (this.labels == null) {
+ this.labels = new HashMap<>();
+ this.labels.put("app", "default");
+ this.labels.put("environment", "development");
+ }
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public void setKey(String key) {
+ this.key = key;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ public Map getLabels() {
+ return labels;
+ }
+
+ public void setLabels(Map labels) {
+ this.labels = labels;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+ }
+}
diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/entity/KieConfigEntity.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/entity/KieConfigEntity.java
new file mode 100644
index 0000000000..d5035f3df2
--- /dev/null
+++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/entity/KieConfigEntity.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.sermant.demo.grayscale.rocketmq.integration.support.entity;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Kie response entity
+ *
+ * @author chengyouling
+ * @since 2024-10-30
+ */
+public class KieConfigEntity {
+ private String id;
+ private String key;
+ private Map labels = new HashMap();
+ private String value;
+ private String valueType;
+ private String status;
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public void setKey(String key) {
+ this.key = key;
+ }
+
+ public Map getLabels() {
+ return labels;
+ }
+
+ public void setLabels(Map labels) {
+ this.labels = labels;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ public String getValueType() {
+ return valueType;
+ }
+
+ public void setValueType(String valueType) {
+ this.valueType = valueType;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ @Override
+ public String toString() {
+ return "KieConfigEntity{" +
+ "id='" + id + '\'' +
+ ", key='" + key + '\'' +
+ ", labels=" + labels +
+ ", value='" + value + '\'' +
+ ", valueType='" + valueType + '\'' +
+ ", status='" + status + '\'' +
+ '}';
+ }
+}
diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/entity/KieResponse.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/entity/KieResponse.java
new file mode 100644
index 0000000000..47f437c15c
--- /dev/null
+++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/entity/KieResponse.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.demo.grayscale.rocketmq.integration.support.entity;
+
+import java.util.List;
+
+/**
+ * Kie response
+ *
+ * @author chengyouling
+ * @since 2024-10-30
+ */
+public class KieResponse {
+ private List data;
+
+ public List getData() {
+ return data;
+ }
+
+ public void setData(List data) {
+ this.data = data;
+ }
+}
diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/utils/LabelGroupUtils.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/utils/LabelGroupUtils.java
new file mode 100644
index 0000000000..daabdfd9b2
--- /dev/null
+++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-integration-test/src/test/java/io/sermant/demo/grayscale/rocketmq/integration/support/utils/LabelGroupUtils.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.demo.grayscale.rocketmq.integration.support.utils;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * group生成工具
+ *
+ * @author zhouss
+ * @since 2021-11-23
+ */
+public class LabelGroupUtils {
+ private static final String GROUP_SEPARATOR = "&";
+
+ private static final String KV_SEPARATOR = "=";
+
+ /**
+ * 查询时使用的kv分隔符
+ */
+ private static final String LABEL_QUERY_SEPARATOR = ":";
+
+ /**
+ * 查询标签前缀
+ */
+ private static final String LABEL_PREFIX = "label=";
+
+ /**
+ * 键值对长度
+ */
+ private static final int KV_LEN = 2;
+
+ /**
+ * 默认组
+ */
+ private static final String DEFAULT_GROUP_KEY = "GROUP";
+
+ private LabelGroupUtils() {
+ }
+
+ /**
+ * 创建标签组
+ *
+ * @param labels 标签组
+ * @return labelGroup 例如: app=sc&service=helloService
+ */
+ public static String createLabelGroup(Map labels) {
+ if (labels == null || labels.isEmpty()) {
+ return "";
+ }
+ final StringBuilder group = new StringBuilder();
+ final List keys = new ArrayList<>(labels.keySet());
+
+ // 防止相同map因排序不同而导致最后的label不一致
+ Collections.sort(keys);
+ for (String key : keys) {
+ String value = labels.get(key);
+ if (key == null || value == null) {
+ continue;
+ }
+ group.append(key).append(KV_SEPARATOR).append(value).append(GROUP_SEPARATOR);
+ }
+ if (group.length() == 0) {
+ return "";
+ }
+ return group.deleteCharAt(group.length() - 1).toString();
+ }
+
+ /**
+ * 重组group, 防止因多个标签因顺序问题而导致group不同
+ *
+ * @param group 标签组
+ * @return group
+ */
+ public static String rebuildGroup(String group) {
+ if (isLabelGroup(group)) {
+ return createLabelGroup(resolveGroupLabels(group));
+ }
+ return LabelGroupUtils.createLabelGroup(Collections.singletonMap(DEFAULT_GROUP_KEY, group));
+ }
+
+ /**
+ * 是否为标签组key
+ *
+ * @param group 监听键
+ * @return 是否为标签组
+ */
+ public static boolean isLabelGroup(String group) {
+ return group != null && group.contains(KV_SEPARATOR);
+ }
+
+ /**
+ * 解析标签为map
+ *
+ * @param group 标签组 app=sc&service=helloService
+ * @return 标签键值对, 返回键值将会是有序的
+ */
+ public static Map resolveGroupLabels(String group) {
+ final Map result = new LinkedHashMap<>();
+ if (group == null) {
+ return result;
+ }
+ String curGroup = group;
+ if (!isLabelGroup(curGroup)) {
+ // 如果非group标签(ZK配置中心场景适配),则为该group创建标签
+ curGroup = LabelGroupUtils.createLabelGroup(Collections.singletonMap(DEFAULT_GROUP_KEY, curGroup));
+ }
+ try {
+ final String decode = URLDecoder.decode(curGroup, "UTF-8");
+ final String[] labels = decode.split("&");
+ for (String label : labels) {
+ final String[] labelKv = label.split("=");
+ if (labelKv.length == KV_LEN) {
+ result.put(labelKv[0], labelKv[1]);
+ } else if (labelKv.length == 1) {
+ // 仅配置了KEY的情况, 使用空串代替
+ result.put(labelKv[0], "");
+ }
+ }
+ } catch (UnsupportedEncodingException ignored) {
+ // ignored
+ }
+ return result;
+ }
+
+ /**
+ * 获取标签信息
+ *
+ * @param group 分组 app=sc&service=helloService转换label=app:sc&label=service:helloService
+ * @return 标签组条件
+ */
+ public static String getLabelCondition(String group) {
+ if (group == null || "".equals(group)) {
+ return group;
+ }
+ String curGroup = rebuildGroup(group);
+ final Map labels = resolveGroupLabels(curGroup);
+ final StringBuilder finalGroup = new StringBuilder();
+ for (Map.Entry entry : labels.entrySet()) {
+ finalGroup.append(LABEL_PREFIX)
+ .append(buildSingleLabel(entry.getKey(), entry.getValue()))
+ .append(GROUP_SEPARATOR);
+ }
+ return finalGroup.deleteCharAt(finalGroup.length() - 1).toString();
+ }
+
+ private static String buildSingleLabel(String key, String value) {
+ return key + LABEL_QUERY_SEPARATOR + value;
+ }
+}
diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/pom.xml b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/pom.xml
new file mode 100644
index 0000000000..98a7d47520
--- /dev/null
+++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/pom.xml
@@ -0,0 +1,49 @@
+
+
+
+ mq-grayscale-rocketmq-test
+ io.sermant.integration
+ 1.0.0
+
+ 4.0.0
+
+ grayscale-rocketmq-producer-demo
+ 1.0.0
+
+
+ 8
+ 8
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+ org.apache.rocketmq
+ rocketmq-client
+
+
+ org.apache.rocketmq
+ rocketmq-common
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+
+
+
+
+ ${project.artifactId}
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+
+
diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/producer/MqProducerController.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/producer/MqProducerController.java
new file mode 100644
index 0000000000..41ef2ee299
--- /dev/null
+++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/producer/MqProducerController.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.demo.grayscale.rocketmq.producer;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * producer controller
+ *
+ * @author chengyouling
+ * @since 2024-10-30
+ */
+@RestController
+public class MqProducerController {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MqProducerController.class);
+
+ @Value("${rocketmq.address}")
+ private String mqAddress;
+
+ @Value("${rocketmq.topic}")
+ private String mqTopic;
+
+ private DefaultMQProducer pullMqProducer;
+
+ private DefaultMQProducer litePullMqProducer;
+
+ private DefaultMQProducer pushMqProducer;
+
+ private final String errorMessage = "error";
+
+ private final String successMessage = "success";
+
+ private final int sendMsgTimeout = 60000;
+
+ /**
+ * init pull producer
+ *
+ * @return is success
+ */
+ @GetMapping("/initPullProducer")
+ public String initPullProducer() {
+ try {
+ if (pullMqProducer == null) {
+ pullMqProducer = createProducer();
+ pullMqProducer.start();
+ }
+ } catch (MQClientException e) {
+ LOGGER.error("init pull producer error!", e);
+ return errorMessage;
+ }
+ return successMessage;
+ }
+
+ /**
+ * pull producer produce message
+ *
+ * @param message message
+ * @return is success
+ */
+ @GetMapping("/producePullMessage")
+ public String producePullMessage(@RequestParam("message") String message) {
+ try {
+ if (pullMqProducer == null) {
+ initPullProducer();
+ }
+ Message mqMessage = new Message(mqTopic + "-PULL", message.getBytes(StandardCharsets.UTF_8));
+ pullMqProducer.send(mqMessage);
+ } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
+ LOGGER.error("send pull message error, address={}, message={}", mqAddress, message, e);
+ return errorMessage;
+ }
+ return successMessage;
+ }
+
+ /**
+ * init lite pull producer
+ *
+ * @return is success
+ */
+ @GetMapping("/initLitePullProducer")
+ public String initLitePullProducer() {
+ try {
+ if (litePullMqProducer == null) {
+ litePullMqProducer = createProducer();
+ litePullMqProducer.start();
+ }
+ } catch (MQClientException e) {
+ LOGGER.error("init lite pull producer error!", e);
+ return errorMessage;
+ }
+ return successMessage;
+ }
+
+ /**
+ * lite pull producer produce message
+ *
+ * @param message message
+ * @return is success
+ */
+ @GetMapping("/produceLitePullMessage")
+ public String produceLitePullMessage(@RequestParam("message") String message) {
+ try {
+ if (litePullMqProducer == null) {
+ initLitePullProducer();
+ }
+ Message mqMessage = new Message(mqTopic + "-LITE-PULL", message.getBytes(StandardCharsets.UTF_8));
+ SendResult result = litePullMqProducer.send(mqMessage);
+ LOGGER.info(System.currentTimeMillis() + "produce lite pull message" + message + ",status:" + result.getSendStatus());
+ } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
+ LOGGER.error("send lite pull message error, address={}, message={}", mqAddress, message, e);
+ return errorMessage;
+ }
+ return successMessage;
+ }
+
+ /**
+ * init push producer
+ *
+ * @return is success
+ */
+ @GetMapping("/initPushProducer")
+ public String initPushProducer() {
+ try {
+ if (pushMqProducer == null) {
+ pushMqProducer = createProducer();
+ pushMqProducer.start();
+ }
+ } catch (MQClientException e) {
+ LOGGER.error("init push producer error!", e);
+ return errorMessage;
+ }
+ return successMessage;
+ }
+
+ /**
+ * push producer produce message
+ *
+ * @param message message
+ * @return is success
+ */
+ @GetMapping("/producePushMessage")
+ public String producePushMessage(@RequestParam("message") String message) {
+ try {
+ if (pushMqProducer == null) {
+ initPushProducer();
+ }
+ Message mqMessage = new Message(mqTopic + "-PUSH", message.getBytes(StandardCharsets.UTF_8));
+ pushMqProducer.send(mqMessage);
+ } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
+ LOGGER.error("send push message error, address={}, message={}", mqAddress, message, e);
+ return errorMessage;
+ }
+ return successMessage;
+ }
+
+ private DefaultMQProducer createProducer() throws MQClientException {
+ DefaultMQProducer mqProducer = new DefaultMQProducer("default");
+ mqProducer.setNamesrvAddr(mqAddress);
+ mqProducer.setSendMsgTimeout(sendMsgTimeout);
+ return mqProducer;
+ }
+}
diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/producer/RocketMqProducerApplication.java b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/producer/RocketMqProducerApplication.java
new file mode 100644
index 0000000000..4c9d33b13d
--- /dev/null
+++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/java/io/sermant/demo/grayscale/rocketmq/producer/RocketMqProducerApplication.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright (C) 2024-2024 Sermant Authors. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.sermant.demo.grayscale.rocketmq.producer;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * springboot starer
+ *
+ * @author chengyouling
+ * @since 2024-10-30
+ **/
+@SpringBootApplication
+public class RocketMqProducerApplication {
+ /**
+ * 启动类
+ *
+ * @param args 进程启动入参
+ */
+ public static void main(String[] args) {
+ SpringApplication.run(RocketMqProducerApplication.class, args);
+ }
+}
diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/resources/application.yaml b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/resources/application.yaml
new file mode 100644
index 0000000000..5351a02721
--- /dev/null
+++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/grayscale-rocketmq-producer-demo/src/main/resources/application.yaml
@@ -0,0 +1,10 @@
+server:
+ port: 9030
+rocketmq:
+ address: 127.0.0.1:9876
+ topic: MESSAGE-GRAY
+management:
+ endpoints:
+ web:
+ exposure:
+ include: "*"
diff --git a/sermant-integration-tests/mq-grayscale-rocketmq-test/pom.xml b/sermant-integration-tests/mq-grayscale-rocketmq-test/pom.xml
new file mode 100644
index 0000000000..6f304c1094
--- /dev/null
+++ b/sermant-integration-tests/mq-grayscale-rocketmq-test/pom.xml
@@ -0,0 +1,61 @@
+
+
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.7.15
+
+
+ 4.0.0
+
+ io.sermant.integration
+ mq-grayscale-rocketmq-test
+ 1.0.0
+ pom
+
+
+ 8
+ 8
+ 2.7.15
+ 2021.0.9
+ 5.0.0
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-dependencies
+ ${spring-cloud.version}
+ pom
+ import
+
+
+ org.springframework.boot
+ spring-boot-dependencies
+ ${spring-boot.version}
+ pom
+ import
+
+
+ org.apache.rocketmq
+ rocketmq-client
+ ${rocketmq-client.version}
+
+
+ org.apache.rocketmq
+ rocketmq-common
+ ${rocketmq-client.version}
+
+
+
+
+
+ grayscale-rocketmq-integration-test
+ grayscale-rocketmq-producer-demo
+ grayscale-rocketmq-consumer-demo
+
+
+
diff --git a/sermant-integration-tests/pom.xml b/sermant-integration-tests/pom.xml
index 2883cd1e41..d426e3f621 100644
--- a/sermant-integration-tests/pom.xml
+++ b/sermant-integration-tests/pom.xml
@@ -21,5 +21,6 @@
mq-consume-prohibition-test
database-write-prohibition-test
xds-service-test
+ mq-grayscale-rocketmq-test