From 07015fccd0f5776df4e2c9e2f7f0d5a054e7e5ff Mon Sep 17 00:00:00 2001 From: lilai Date: Thu, 11 Jan 2024 14:30:57 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B6=88=E6=81=AF=E9=98=9F=E5=88=97=E7=A6=81?= =?UTF-8?q?=E6=AD=A2=E6=B6=88=E8=B4=B9Kafka=E9=9B=86=E6=88=90=E6=B5=8B?= =?UTF-8?q?=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: lilai --- .../common/mq-consume-prohibition/action.yml | 37 ++ .../mq-consume-prohibition/kafka/action.yml | 66 +++ ...q_consume_prohibition_integration_test.yml | 114 +++++ .../kafka-consumer-demo/pom.xml | 42 ++ .../kafka/KafkaConsumerApplication.java | 38 ++ .../controller/KafkaConsumerController.java | 206 +++++++++ .../src/main/resources/application.properties | 2 + .../pom.xml | 6 + .../common/utils/HttpRequestUtils.java | 5 +- .../pom.xml | 28 +- .../kafka/KafkaProhibitionTest.java | 402 ++++++++++++++++++ .../integration/utils/HttpRequestUtils.java | 68 +++ .../mq-consume-prohibition-test/pom.xml | 22 +- 13 files changed, 1027 insertions(+), 9 deletions(-) create mode 100644 .github/actions/common/mq-consume-prohibition/action.yml create mode 100644 .github/actions/scenarios/mq-consume-prohibition/kafka/action.yml create mode 100644 .github/workflows/mq_consume_prohibition_integration_test.yml create mode 100644 sermant-integration-tests/mq-consume-prohibition-test/kafka-consumer-demo/pom.xml create mode 100644 sermant-integration-tests/mq-consume-prohibition-test/kafka-consumer-demo/src/main/java/com/huaweicloud/sermant/test/mq/prohibition/kafka/KafkaConsumerApplication.java create mode 100644 sermant-integration-tests/mq-consume-prohibition-test/kafka-consumer-demo/src/main/java/com/huaweicloud/sermant/test/mq/prohibition/kafka/controller/KafkaConsumerController.java create mode 100644 sermant-integration-tests/mq-consume-prohibition-test/kafka-consumer-demo/src/main/resources/application.properties create mode 100644 
sermant-integration-tests/mq-consume-prohibition-test/mq-consume-prohibition-integration-test/src/test/java/com/huaweicloud/sermant/mq/prohibition/integration/kafka/KafkaProhibitionTest.java create mode 100644 sermant-integration-tests/mq-consume-prohibition-test/mq-consume-prohibition-integration-test/src/test/java/com/huaweicloud/sermant/mq/prohibition/integration/utils/HttpRequestUtils.java diff --git a/.github/actions/common/mq-consume-prohibition/action.yml b/.github/actions/common/mq-consume-prohibition/action.yml new file mode 100644 index 0000000000..eb81278dc5 --- /dev/null +++ b/.github/actions/common/mq-consume-prohibition/action.yml @@ -0,0 +1,37 @@ +name: "Common operations" +description: "do something common for mq-consume-prohibition plugin test" +runs: + using: "composite" + steps: + - name: Set up JDK ${{ env.javaVersion }} + uses: actions/setup-java@v3 + with: + java-version: ${{ env.javaVersion }} + distribution: 'adopt' + cache: maven + - name: get zookeeper from cache + uses: actions/cache@v3 + with: + path: apache-zookeeper-3.6.3-bin.tar.gz + key: ${{ runner.os }}-apache-zookeeper-3.6.3 + - name: run zookeeper + shell: bash + run: | + tar -zxf apache-zookeeper-3.6.3-bin.tar.gz + bash apache-zookeeper-3.6.3-bin/bin/zkServer.sh start apache-zookeeper-3.6.3-bin/conf/zoo_sample.cfg + - name: cache dependencies + uses: actions/cache@v3 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} + restore-keys: | + ${{ runner.os }}-maven- + - name: cache agent + uses: actions/cache@v3 + with: + path: sermant-agent-*/ + key: ${{ runner.os }}-agent-${{ github.run_id }} + - name: entry + uses: ./.github/actions/common/entry + with: + log-dir: ./logs/mq-consume-prohibition/common \ No newline at end of file diff --git a/.github/actions/scenarios/mq-consume-prohibition/kafka/action.yml b/.github/actions/scenarios/mq-consume-prohibition/kafka/action.yml new file mode 100644 index 0000000000..2ec3f96011 --- /dev/null +++ 
b/.github/actions/scenarios/mq-consume-prohibition/kafka/action.yml @@ -0,0 +1,66 @@ +name: "Mq-consume-prohibition Plugin kafka Test" +description: "Auto test for mq-consume-prohibition by kafka" +runs: + using: composite + steps: + - name: entry + uses: ./.github/actions/common/entry + with: + log-dir: ./logs/mq-consume-prohibition/kafka + - name: get kafka from cache + uses: actions/cache@v3 + with: + path: kafka_2.13-2.7.0.tgz + key: ${{ runner.os }}-kafka_2.13-2.7.0 + restore-keys: | + ${{ runner.os }}-kafka_2.13-2.7.0 + - name: start kafka + shell: bash + run: | + tar -zxf kafka_2.13-2.7.0.tgz + nohup bash kafka_2.13-2.7.0/bin/kafka-server-start.sh kafka_2.13-2.7.0/config/server.properties & + - name: package demo + shell: bash + run: | + mvn package -Dkafka-client.version=${{ matrix.kafkaVersion }} -DskipTests -Pkafka-consumer-test --file \ + sermant-integration-tests/mq-consume-prohibition-test/pom.xml + - name: start first kafka consumer demo + shell: bash + run: | + nohup java -javaagent:sermant-agent-${{ env.sermantVersion }}/agent/sermant-agent.jar=appName=default -jar \ + -Dsermant_log_dir=${{ env.logDir }}/kafka-consumer -Dservice_meta_zone=hangzhou -Dserver.port=7070 \ + -Dclient_id=test-client-1 \ + sermant-integration-tests/mq-consume-prohibition-test/kafka-consumer-demo/target/kafka-consumer-demo.jar > ${{ env.logDir }}/kafka-consumer-1.log 2>&1 & + - name: start second kafka consumer demo + shell: bash + run: | + nohup java -javaagent:sermant-agent-${{ env.sermantVersion }}/agent/sermant-agent.jar=appName=default -jar \ + -Dsermant_log_dir=${{ env.logDir }}/kafka-consumer -Dservice_meta_zone=shanghai -Dserver.port=7071 \ + -Dclient_id=test-client-2 \ + sermant-integration-tests/mq-consume-prohibition-test/kafka-consumer-demo/target/kafka-consumer-demo.jar > ${{ env.logDir }}/kafka-consumer-2.log 2>&1 & + - name: waiting for services start + shell: bash + run: | + ps -ef | grep java + bash ./sermant-integration-tests/scripts/checkService.sh 
http://127.0.0.1:7070/healthCheck 120 + bash ./sermant-integration-tests/scripts/checkService.sh http://127.0.0.1:7071/healthCheck 120 + - name: test kafka + shell: bash + run: | + mvn test -Dmq.consume.prohibition.test.type=KAFKA --file \ + sermant-integration-tests/mq-consume-prohibition-test/mq-consume-prohibition-integration-test/pom.xml + - name: exit + if: always() + uses: ./.github/actions/common/exit + with: + processor-keyword: mq-consume-prohibition|kafka + - name: if failure then upload error log + uses: actions/upload-artifact@v3 + if: ${{ failure() || cancelled() }} + with: + name: (${{ github.job }})-mq-consume-prohibition-kafka-(${{ matrix.kafkaVersion }})-logs + path: | + ./*.log + ./logs/** + if-no-files-found: warn + retention-days: 2 \ No newline at end of file diff --git a/.github/workflows/mq_consume_prohibition_integration_test.yml b/.github/workflows/mq_consume_prohibition_integration_test.yml new file mode 100644 index 0000000000..679ce5cca8 --- /dev/null +++ b/.github/workflows/mq_consume_prohibition_integration_test.yml @@ -0,0 +1,114 @@ +name: mq-consume-prohibition integration test +env: + sermantVersion: 1.0.0 +on: + push: + pull_request: + branches: + - '*' + paths: + - 'sermant-agentcore/**' + - 'sermant-integration-tests/mq-consume-prohibition-test/**' + - 'sermant-plugins/sermant-mq-consume-prohibition/**' + - '.github/workflows/mq_consume_prohibition_integration_test.yml' + - '.github/actions/scenarios/mq-consume-prohibition/**' + - '.github/actions/common/mq-consume-prohibition/**' +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}-${{ github.head_ref }} + cancel-in-progress: true +jobs: + download-midwares-and-cache: + name: download midwares and cache + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: cache zookeeper + uses: actions/cache@v3 + with: + path: apache-zookeeper-3.6.3-bin.tar.gz + key: ${{ runner.os }}-apache-zookeeper-3.6.3 + restore-keys: | + 
${{ runner.os }}-apache-zookeeper-3.6.3 + - name: download zookeeper + run: | + export ROOT_PATH=$(pwd) + bash ./sermant-integration-tests/scripts/tryDownloadMidware.sh zk + - name: cache rocketmq + uses: actions/cache@v3 + with: + path: rocketmq-all-4.8.0-bin-release.zip + key: ${{ runner.os }}-rocketmq-all-4.8.0-bin-release + restore-keys: | + ${{ runner.os }}-rocketmq-all-4.8.0-bin-release + - name: download rocketmq + run: | + export ROOT_PATH=$(pwd) + bash ./sermant-integration-tests/scripts/tryDownloadMidware.sh rocketmq + - name: cache kafka + uses: actions/cache@v3 + with: + path: kafka_2.13-2.7.0.tgz + key: ${{ runner.os }}-kafka_2.13-2.7.0 + restore-keys: | + ${{ runner.os }}-kafka_2.13-2.7.0 + - name: download kafka + run: | + export ROOT_PATH=$(pwd) + bash ./sermant-integration-tests/scripts/tryDownloadMidware.sh kafka + build-agent-and-cache: + name: build agent and cache + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Set up JDK 8 + uses: actions/setup-java@v3 + with: + java-version: '8' + distribution: 'adopt' + cache: maven + - name: cache agent + uses: actions/cache@v3 + with: + path: sermant-agent-*/ + key: ${{ runner.os }}-agent-${{ github.run_id }} + - name: package agent + run: | + sed -i '/sermant-backend/d' pom.xml + sed -i '/sermant-integration-tests/d' pom.xml + sed -i '/sermant-injector/d' pom.xml + mvn package -DskipTests -Ptest --file pom.xml + test-for-mq-consume-prohibition-kafka: + name: Test for mq-consume-prohibition kafka + runs-on: ubuntu-latest + needs: [build-agent-and-cache, download-midwares-and-cache] + strategy: + matrix: + include: + - kafkaVersion: "2.0.1" + - kafkaVersion: "2.1.1" + - kafkaVersion: "2.2.2" + - kafkaVersion: "2.3.1" + - kafkaVersion: "2.4.0" + - kafkaVersion: "2.5.1" + - kafkaVersion: "2.6.3" + - kafkaVersion: "2.7.2" + - kafkaVersion: "2.8.2" + - kafkaVersion: "3.0.2" + - kafkaVersion: "3.1.2" + - kafkaVersion: "3.2.2" + - kafkaVersion: "3.3.2" + - kafkaVersion: "3.4.1" + - 
kafkaVersion: "3.5.2" + - kafkaVersion: "3.6.1" + fail-fast: false + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 100 + - name: set java version to environment + run: | + echo "javaVersion=8" >> $GITHUB_ENV + - name: common operations + uses: ./.github/actions/common/mq-consume-prohibition + - name: mq-consume-prohibition test for kafkaVersion=${{ matrix.kafkaVersion }} + uses: ./.github/actions/scenarios/mq-consume-prohibition/kafka \ No newline at end of file diff --git a/sermant-integration-tests/mq-consume-prohibition-test/kafka-consumer-demo/pom.xml b/sermant-integration-tests/mq-consume-prohibition-test/kafka-consumer-demo/pom.xml new file mode 100644 index 0000000000..4fdbf2c802 --- /dev/null +++ b/sermant-integration-tests/mq-consume-prohibition-test/kafka-consumer-demo/pom.xml @@ -0,0 +1,42 @@ + + + + mq-consume-prohibition-test + com.huaweicloud.sermant + 1.0.0 + + 4.0.0 + + kafka-consumer-demo + + + 8 + 8 + 2.7.0 + + + + + org.apache.kafka + kafka-clients + ${kafka-client.version} + + + org.springframework.boot + spring-boot-starter-web + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + + + + \ No newline at end of file diff --git a/sermant-integration-tests/mq-consume-prohibition-test/kafka-consumer-demo/src/main/java/com/huaweicloud/sermant/test/mq/prohibition/kafka/KafkaConsumerApplication.java b/sermant-integration-tests/mq-consume-prohibition-test/kafka-consumer-demo/src/main/java/com/huaweicloud/sermant/test/mq/prohibition/kafka/KafkaConsumerApplication.java new file mode 100644 index 0000000000..54bb810657 --- /dev/null +++ b/sermant-integration-tests/mq-consume-prohibition-test/kafka-consumer-demo/src/main/java/com/huaweicloud/sermant/test/mq/prohibition/kafka/KafkaConsumerApplication.java @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2024-2024 Huawei Technologies Co., Ltd. All rights reserved. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.test.mq.prohibition.kafka; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * KafkaConsumer demo启动类 + * + * @author lilai + * @since 2024-01-09 + **/ +@SpringBootApplication +public class KafkaConsumerApplication { + /** + * 启动类 + * + * @param args 进程启动入参 + */ + public static void main(String[] args) { + SpringApplication.run(KafkaConsumerApplication.class, args); + } +} diff --git a/sermant-integration-tests/mq-consume-prohibition-test/kafka-consumer-demo/src/main/java/com/huaweicloud/sermant/test/mq/prohibition/kafka/controller/KafkaConsumerController.java b/sermant-integration-tests/mq-consume-prohibition-test/kafka-consumer-demo/src/main/java/com/huaweicloud/sermant/test/mq/prohibition/kafka/controller/KafkaConsumerController.java new file mode 100644 index 0000000000..7a9953bf2b --- /dev/null +++ b/sermant-integration-tests/mq-consume-prohibition-test/kafka-consumer-demo/src/main/java/com/huaweicloud/sermant/test/mq/prohibition/kafka/controller/KafkaConsumerController.java @@ -0,0 +1,206 @@ +/* + * Copyright (C) 2024-2024 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.test.mq.prohibition.kafka.controller; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult; +import org.apache.kafka.clients.admin.KafkaAdminClient; +import org.apache.kafka.clients.admin.MemberDescription; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.WakeupException; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.annotation.PostConstruct; + +/** + * KafkaConsumer接口 + * + * @author lilai + * @since 2024-01-09 + */ +@RestController +public class KafkaConsumerController { + private static final int POLL_DURATION = 2000; + + private static final int SLEEP_TIME = 3000; + + private static final String GROUP = "test"; + + private static final String TOPIC = "test-topic"; + + private static final int PARTITION_SUM = 2; + + private final AtomicBoolean closed = new AtomicBoolean(false); + + private 
KafkaAdminClient adminClient; + + private final Properties properties = new Properties(); + + private KafkaConsumer consumer; + + @Value("${client.id}") + private String clientId; + + /** + * 初始化 + */ + @PostConstruct + public void init() { + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP); + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); + properties.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); + adminClient = (KafkaAdminClient) AdminClient.create(properties); + } + + /** + * 创建消费者实例,订阅多个Topic + */ + @GetMapping("/createConsumerWithMultiTopics") + public void createConsumerWithMultiTopics() { + createConsumer(Arrays.asList("test-topic-1", "test-topic-2")); + } + + /** + * 创建消费者实例,订阅1个Topic + */ + @GetMapping("/createConsumerWithOneTopic") + public void createConsumerWithOneTopic() { + createConsumer(Collections.singletonList(TOPIC)); + } + + /** + * 关闭消费者实例 + * + * @throws InterruptedException 中断异常 + */ + @GetMapping("/closeConsumer") + public void closeConsumer() throws InterruptedException { + closed.set(true); + Thread.sleep(SLEEP_TIME); + closed.set(false); + } + + /** + * 获取消费者组中消费者数量 + * + * @return 消费者数量 + * @throws InterruptedException 中断异常 + * @throws ExecutionException 执行异常 + */ + @GetMapping(value = "/getConsumerGroupSize") + public int getConsumerGroupSize() throws InterruptedException, + ExecutionException { + DescribeConsumerGroupsResult describeConsumerGroupsResult = adminClient.describeConsumerGroups( + Collections.singleton(GROUP)); + Map descriptions = describeConsumerGroupsResult.all() + .get(); + ConsumerGroupDescription 
groupDescription = descriptions.get(GROUP); + if (groupDescription == null) { + return 0; + } + return groupDescription.members().size(); + } + + /** + * 获取消费者订阅的topic + * + * @return 消费者订阅的topic + * @throws InterruptedException 中断异常 + * @throws ExecutionException 执行异常 + */ + @GetMapping(value = "/getConsumerTopic") + public Set getConsumerGroupTopic() throws InterruptedException, + ExecutionException { + DescribeConsumerGroupsResult describeConsumerGroupsResult = adminClient.describeConsumerGroups( + Collections.singleton(GROUP)); + Map descriptions = describeConsumerGroupsResult.all() + .get(); + ConsumerGroupDescription groupDescription = descriptions.get(GROUP); + if (groupDescription == null) { + return Collections.emptySet(); + } + for (MemberDescription description : groupDescription.members()) { + if (description.clientId().equals(clientId)) { + Set topics = new HashSet<>(); + description.assignment().topicPartitions() + .forEach(topicPartition -> topics.add(topicPartition.topic())); + return topics; + } + } + return Collections.emptySet(); + } + + /** + * 创建测试Topic + */ + @GetMapping(value = "/createTopics") + public void createTopic() { + NewTopic firstTopic = new NewTopic(TOPIC, PARTITION_SUM, (short) 1); + NewTopic secondTopic = new NewTopic("test-topic-1", 1, (short) 1); + NewTopic thirdTopic = new NewTopic("test-topic-2", 1, (short) 1); + adminClient.createTopics(Arrays.asList(firstTopic, secondTopic, thirdTopic)); + } + + /** + * 测试服务是否正常启动 + * + * @return String 响应 + */ + @GetMapping(value = "/healthCheck") + public String healthCheck() { + return "ok"; + } + + private void createConsumer(List topics) { + consumer = new KafkaConsumer(properties); + new Thread(() -> { + try { + consumer.subscribe(topics); + while (!closed.get()) { + consumer.poll(POLL_DURATION); + } + } catch (WakeupException exception) { + if (!closed.get()) { + throw exception; + } + } finally { + consumer.close(); + } + }).start(); + } +} diff --git 
a/sermant-integration-tests/mq-consume-prohibition-test/kafka-consumer-demo/src/main/resources/application.properties b/sermant-integration-tests/mq-consume-prohibition-test/kafka-consumer-demo/src/main/resources/application.properties new file mode 100644 index 0000000000..ef9792ed7f --- /dev/null +++ b/sermant-integration-tests/mq-consume-prohibition-test/kafka-consumer-demo/src/main/resources/application.properties @@ -0,0 +1,2 @@ +server.port=7070 +client.id=test-client-1 \ No newline at end of file diff --git a/sermant-integration-tests/mq-consume-prohibition-test/mq-consume-prohibition-common-demo/pom.xml b/sermant-integration-tests/mq-consume-prohibition-test/mq-consume-prohibition-common-demo/pom.xml index a64e968b4c..57457d7092 100644 --- a/sermant-integration-tests/mq-consume-prohibition-test/mq-consume-prohibition-common-demo/pom.xml +++ b/sermant-integration-tests/mq-consume-prohibition-test/mq-consume-prohibition-common-demo/pom.xml @@ -15,6 +15,7 @@ 8 8 4.5.13 + 1.2 @@ -35,6 +36,11 @@ org.slf4j slf4j-api + + commons-logging + commons-logging + ${common.log.version} + \ No newline at end of file diff --git a/sermant-integration-tests/mq-consume-prohibition-test/mq-consume-prohibition-common-demo/src/main/java/com/huaweicloud/sermant/mq/prohibition/common/utils/HttpRequestUtils.java b/sermant-integration-tests/mq-consume-prohibition-test/mq-consume-prohibition-common-demo/src/main/java/com/huaweicloud/sermant/mq/prohibition/common/utils/HttpRequestUtils.java index 0ac5c86448..de5ffd4743 100644 --- a/sermant-integration-tests/mq-consume-prohibition-test/mq-consume-prohibition-common-demo/src/main/java/com/huaweicloud/sermant/mq/prohibition/common/utils/HttpRequestUtils.java +++ b/sermant-integration-tests/mq-consume-prohibition-test/mq-consume-prohibition-common-demo/src/main/java/com/huaweicloud/sermant/mq/prohibition/common/utils/HttpRequestUtils.java @@ -38,13 +38,16 @@ public class HttpRequestUtils { private static final int SUCCESS_CODE = 200; + 
private HttpRequestUtils() { + } + /** * http的get请求 * * @param url http请求url * @return 响应体body */ - private static String doGet(String url) { + public static String doGet(String url) { try (CloseableHttpClient httpClient = HttpClients.createDefault()) { RequestConfig requestConfig = RequestConfig.custom() .build(); diff --git a/sermant-integration-tests/mq-consume-prohibition-test/mq-consume-prohibition-integration-test/pom.xml b/sermant-integration-tests/mq-consume-prohibition-test/mq-consume-prohibition-integration-test/pom.xml index 787fa4593c..5eba0d8975 100644 --- a/sermant-integration-tests/mq-consume-prohibition-test/mq-consume-prohibition-integration-test/pom.xml +++ b/sermant-integration-tests/mq-consume-prohibition-test/mq-consume-prohibition-integration-test/pom.xml @@ -15,6 +15,8 @@ 8 8 4.3.0 + 4.5.13 + 1.2 @@ -23,15 +25,33 @@ junit-jupiter test - - com.huaweicloud.sermant - mq-consume-prohibition-common-demo - org.apache.curator curator-recipes ${curator.version} + + org.apache.httpcomponents + httpclient + ${httpclient4x.version} + + + ch.qos.logback + logback-core + + + ch.qos.logback + logback-classic + + + org.slf4j + slf4j-api + + + commons-logging + commons-logging + ${common.log.version} + \ No newline at end of file diff --git a/sermant-integration-tests/mq-consume-prohibition-test/mq-consume-prohibition-integration-test/src/test/java/com/huaweicloud/sermant/mq/prohibition/integration/kafka/KafkaProhibitionTest.java b/sermant-integration-tests/mq-consume-prohibition-test/mq-consume-prohibition-integration-test/src/test/java/com/huaweicloud/sermant/mq/prohibition/integration/kafka/KafkaProhibitionTest.java new file mode 100644 index 0000000000..b87ff0a105 --- /dev/null +++ b/sermant-integration-tests/mq-consume-prohibition-test/mq-consume-prohibition-integration-test/src/test/java/com/huaweicloud/sermant/mq/prohibition/integration/kafka/KafkaProhibitionTest.java @@ -0,0 +1,402 @@ +/* + * Copyright (C) 2024-2024 Huawei Technologies Co., Ltd. 
All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.huaweicloud.sermant.mq.prohibition.integration.kafka; + +import com.huaweicloud.sermant.mq.prohibition.integration.utils.DynamicConfigUtils; +import com.huaweicloud.sermant.mq.prohibition.integration.utils.HttpRequestUtils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; + +/** + * Kafka禁消费集成测试 + * + * @author lilai + * @since 2024-01-09 + */ +@EnabledIfSystemProperty(named = "mq.consume.prohibition.test.type", matches = "KAFKA") +public class KafkaProhibitionTest { + private static final String CONFIG_PATH = "/app=default&environment=&zone=hangzhou/sermant.mq.consume.globalConfig"; + + private static final String TEST_CLIENT_1_ADDRESS = "http://127.0.0.1:7070"; + + private static final String TEST_CLIENT_2_ADDRESS = "http://127.0.0.1:7071"; + + private static final String CREATE_TOPICS_API = "/createTopics"; + + private static final String CREATE_CONSUMER_WITH_ONE_TOPIC_API = "/createConsumerWithOneTopic"; + + private static final String CREATE_CONSUMER_WITH_MULTI_TOPIC_API = "/createConsumerWithMultiTopics"; + + private static final String GET_CONSUMER_GROUP_SIZE_API = "/getConsumerGroupSize"; + + private static final String GET_CONSUMER_TOPIC_API = "/getConsumerTopic"; + + private static final String CLOSE_CONSUMER_API = 
"/closeConsumer"; + + private static final long WAIT_TIME = 2000; + + private static final long LONG_WAIT_TIME = 5000; + + private static final String lINE_SEPARATOR = System.getProperty("line.separator"); + + /** + * 创建Topic + * + * @throws InterruptedException 中断异常 + */ + @BeforeAll + public static void setUp() throws InterruptedException { + HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + CREATE_TOPICS_API); + Thread.sleep(WAIT_TIME); + } + + /** + * 测试单个消费者订阅单个Topic,启动前关闭禁消费开关 + * + * @throws Exception 异常 + */ + @Test + public void testSingleConsumerSingleTopicStartUpWithSwitchOff() throws Exception { + // Consumer启动前关闭禁止消费配置 + String config = "enableKafkaProhibition: false" + lINE_SEPARATOR + + "kafkaTopics:" + lINE_SEPARATOR + + " - test-topic"; + DynamicConfigUtils.updateConfig(CONFIG_PATH, config); + // 创建单Topic的消费者test-client-1 + HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + CREATE_CONSUMER_WITH_ONE_TOPIC_API); + Thread.sleep(WAIT_TIME); + int groupSize = Integer.parseInt(HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_GROUP_SIZE_API)); + String topics = HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_TOPIC_API); + try { + // 启动前关闭禁止消费的测试 + Assertions.assertEquals(1, groupSize); + Assertions.assertTrue(topics.contains("test-topic")); + } finally { + // 清理资源,关闭消费者test-client-1以及删除配置 + HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + CLOSE_CONSUMER_API); + clearConfig(); + Thread.sleep(WAIT_TIME); + } + } + + /** + * 测试单个消费者订阅多个Topic,启动前关闭禁消费开关 + * + * @throws Exception 异常 + */ + @Test + public void testSingleConsumerMultiTopicStartUpWithSwitchOff() throws Exception { + // Consumer启动前关闭禁止消费配置 + String config = "enableKafkaProhibition: false" + lINE_SEPARATOR + + "kafkaTopics:" + lINE_SEPARATOR + + " - test-topic-1" + lINE_SEPARATOR + + " - test-topic-2"; + DynamicConfigUtils.updateConfig(CONFIG_PATH, config); + // 创建单Topic的消费者test-client-1 + HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + CREATE_CONSUMER_WITH_MULTI_TOPIC_API); + 
Thread.sleep(WAIT_TIME); + int groupSize = Integer.parseInt(HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_GROUP_SIZE_API)); + String topics = HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_TOPIC_API); + try { + // 启动前关闭禁止消费的测试 + Assertions.assertEquals(1, groupSize); + Assertions.assertTrue(topics.contains("test-topic-1")); + Assertions.assertTrue(topics.contains("test-topic-2")); + } finally { + // 清理资源,关闭消费者test-client-1以及删除配置 + HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + CLOSE_CONSUMER_API); + clearConfig(); + Thread.sleep(WAIT_TIME); + } + } + + /** + * 测试单个消费者订阅单个Topic,启动前开启禁消费开关,运行时恢复消费,运行时开启禁消费开关 + * + * @throws Exception 异常 + */ + @Test + public void testSingleConsumerSingleTopic() throws Exception { + // Consumer启动前开启禁止消费配置 + String configOn = "enableKafkaProhibition: true" + lINE_SEPARATOR + + "kafkaTopics:" + lINE_SEPARATOR + + " - test-topic"; + DynamicConfigUtils.updateConfig(CONFIG_PATH, configOn); + // 创建单Topic的消费者test-client-1 + HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + CREATE_CONSUMER_WITH_ONE_TOPIC_API); + Thread.sleep(WAIT_TIME); + int groupSize = Integer.parseInt(HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_GROUP_SIZE_API)); + String topics = HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_TOPIC_API); + try { + // 启动前开启禁止消费的测试 + Assertions.assertEquals(0, groupSize); + Assertions.assertEquals("[]", topics); + // 运行时下发恢复消费的配置 + String configOff = "enableKafkaProhibition: false" + lINE_SEPARATOR + + "kafkaTopics:" + lINE_SEPARATOR + + " - test-topic"; + DynamicConfigUtils.updateConfig(CONFIG_PATH, configOff); + Thread.sleep(WAIT_TIME); + groupSize = Integer.parseInt(HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_GROUP_SIZE_API)); + topics = HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_TOPIC_API); + // 运行时恢复消费的测试 + Assertions.assertEquals(1, groupSize); + Assertions.assertTrue(topics.contains("test-topic")); + // 运行时下发开启禁止消费的配置 + 
DynamicConfigUtils.updateConfig(CONFIG_PATH, configOn); + Thread.sleep(WAIT_TIME); + groupSize = Integer.parseInt(HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_GROUP_SIZE_API)); + topics = HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_TOPIC_API); + Assertions.assertEquals(0, groupSize); + Assertions.assertEquals("[]", topics); + } finally { + // 清理资源,关闭消费者test-client-1以及删除配置 + HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + CLOSE_CONSUMER_API); + Thread.sleep(WAIT_TIME); + clearConfig(); + } + } + + /** + * 测试单个消费者订阅多个Topic,启动前开启全部Topic的禁消费开关,运行时恢复消费,运行时开启禁消费开关 + * + * @throws Exception 异常 + */ + @Test + public void testSingleConsumerMultiTopicsWithAllTopicsToProhibit() throws Exception { + // Consumer启动前开启禁止消费所有Topic的配置 + String configOn = "enableKafkaProhibition: true" + lINE_SEPARATOR + + "kafkaTopics:" + lINE_SEPARATOR + + " - test-topic-1" + lINE_SEPARATOR + + " - test-topic-2"; + DynamicConfigUtils.updateConfig(CONFIG_PATH, configOn); + // 创建多Topic的消费者test-client-1 + HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + CREATE_CONSUMER_WITH_MULTI_TOPIC_API); + Thread.sleep(WAIT_TIME); + int groupSize = Integer.parseInt(HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_GROUP_SIZE_API)); + String topics = HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_TOPIC_API); + try { + // 启动前开启禁止消费的测试 + Assertions.assertEquals(0, groupSize); + Assertions.assertEquals("[]", topics); + // 运行时下发恢复消费所有Topic的配置 + String configOff = "enableKafkaProhibition: false" + lINE_SEPARATOR + + "kafkaTopics:" + lINE_SEPARATOR + + " - test-topic-1" + lINE_SEPARATOR + + " - test-topic-2"; + DynamicConfigUtils.updateConfig(CONFIG_PATH, configOff); + Thread.sleep(WAIT_TIME); + groupSize = Integer.parseInt(HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_GROUP_SIZE_API)); + topics = HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_TOPIC_API); + // 运行时恢复消费所有Topic的测试 + Assertions.assertEquals(1, groupSize); + 
Assertions.assertTrue(topics.contains("test-topic-1")); + Assertions.assertTrue(topics.contains("test-topic-2")); + // 运行时下发开启禁止消费所有Topic的配置 + DynamicConfigUtils.updateConfig(CONFIG_PATH, configOn); + Thread.sleep(WAIT_TIME); + groupSize = Integer.parseInt(HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_GROUP_SIZE_API)); + topics = HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_TOPIC_API); + Assertions.assertEquals(0, groupSize); + Assertions.assertEquals("[]", topics); + } finally { + // 清理资源,关闭消费者test-client-1以及删除配置 + HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + CLOSE_CONSUMER_API); + Thread.sleep(WAIT_TIME); + clearConfig(); + } + } + + /** + * 测试单个消费者订阅多个Topic,启动前开启单个Topic的禁消费开关,运行时恢复消费,运行时开启禁消费开关 + * + * @throws Exception 异常 + */ + @Test + public void testSingleConsumerMultiTopicsWithOneTopicToProhibit() throws Exception { + // Consumer启动前开启禁止消费所有Topic的配置 + String configOn = "enableKafkaProhibition: true" + lINE_SEPARATOR + + "kafkaTopics:" + lINE_SEPARATOR + + " - test-topic-1"; + DynamicConfigUtils.updateConfig(CONFIG_PATH, configOn); + // 创建多Topic的消费者test-client-1 + HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + CREATE_CONSUMER_WITH_MULTI_TOPIC_API); + Thread.sleep(WAIT_TIME); + int groupSize = Integer.parseInt(HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_GROUP_SIZE_API)); + String topics = HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_TOPIC_API); + try { + // 启动前开启禁止消费的测试 + Assertions.assertEquals(1, groupSize); + Assertions.assertFalse(topics.contains("test-topic-1")); + Assertions.assertTrue(topics.contains("test-topic-2")); + // 运行时下发恢复消费所有Topic的配置 + String configOff = "enableKafkaProhibition: false" + lINE_SEPARATOR + + "kafkaTopics:" + lINE_SEPARATOR + + " - test-topic-1"; + DynamicConfigUtils.updateConfig(CONFIG_PATH, configOff); + Thread.sleep(LONG_WAIT_TIME); + groupSize = Integer.parseInt(HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_GROUP_SIZE_API)); + topics = 
HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_TOPIC_API); + // 运行时恢复消费所有Topic的测试 + Assertions.assertEquals(1, groupSize); + Assertions.assertTrue(topics.contains("test-topic-1")); + Assertions.assertTrue(topics.contains("test-topic-2")); + // 运行时下发开启禁止消费所有Topic的配置 + DynamicConfigUtils.updateConfig(CONFIG_PATH, configOn); + Thread.sleep(LONG_WAIT_TIME); + groupSize = Integer.parseInt(HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_GROUP_SIZE_API)); + topics = HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_TOPIC_API); + Assertions.assertEquals(1, groupSize); + Assertions.assertFalse(topics.contains("test-topic-1")); + Assertions.assertTrue(topics.contains("test-topic-2")); + } finally { + // 清理资源,关闭消费者test-client-1以及删除配置 + HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + CLOSE_CONSUMER_API); + Thread.sleep(WAIT_TIME); + clearConfig(); + } + } + + /** + * 测试两个消费者订阅同一个Topic,启动前开启其中一个消费者的禁消费开关,运行时恢复消费,运行时开启禁消费开关 + * + * @throws Exception 异常 + */ + @Test + public void testMultiConsumersOneTopicWithOneConsumerToProhibit() throws Exception { + // Consumer启动前开启禁止消费的配置 + String configOn = "enableKafkaProhibition: true" + lINE_SEPARATOR + + "kafkaTopics:" + lINE_SEPARATOR + + " - test-topic"; + DynamicConfigUtils.updateConfig(CONFIG_PATH, configOn); + // 创建单Topic的消费者test-client-1,test-client-2 + HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + CREATE_CONSUMER_WITH_ONE_TOPIC_API); + HttpRequestUtils.doGet(TEST_CLIENT_2_ADDRESS + CREATE_CONSUMER_WITH_ONE_TOPIC_API); + Thread.sleep(WAIT_TIME); + int groupSize = Integer.parseInt(HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_GROUP_SIZE_API)); + String topicsForFirstClient; + String topicsForSecondClient = HttpRequestUtils.doGet(TEST_CLIENT_2_ADDRESS + GET_CONSUMER_TOPIC_API); + try { + // 启动前开启禁止消费的测试 + Assertions.assertEquals(1, groupSize); + Assertions.assertTrue(topicsForSecondClient.contains("test-topic")); + // 运行时下发恢复消费的配置 + String configOff = "enableKafkaProhibition: false" + 
lINE_SEPARATOR + + "kafkaTopics:" + lINE_SEPARATOR + + " - test-topic"; + DynamicConfigUtils.updateConfig(CONFIG_PATH, configOff); + Thread.sleep(LONG_WAIT_TIME); + groupSize = Integer.parseInt(HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_GROUP_SIZE_API)); + topicsForFirstClient = HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_TOPIC_API); + topicsForSecondClient = HttpRequestUtils.doGet(TEST_CLIENT_2_ADDRESS + GET_CONSUMER_TOPIC_API); + // 运行时恢复消费的测试 + Assertions.assertEquals(2, groupSize); + Assertions.assertTrue(topicsForFirstClient.contains("test-topic")); + Assertions.assertTrue(topicsForSecondClient.contains("test-topic")); + // 运行时下发开启禁止消费所有Topic的配置 + DynamicConfigUtils.updateConfig(CONFIG_PATH, configOn); + Thread.sleep(LONG_WAIT_TIME); + groupSize = Integer.parseInt(HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_GROUP_SIZE_API)); + topicsForSecondClient = HttpRequestUtils.doGet(TEST_CLIENT_2_ADDRESS + GET_CONSUMER_TOPIC_API); + Assertions.assertEquals(1, groupSize); + Assertions.assertTrue(topicsForSecondClient.contains("test-topic")); + } finally { + // 清理资源,关闭消费者test-client-1、test-client-2以及删除配置 + HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + CLOSE_CONSUMER_API); + HttpRequestUtils.doGet(TEST_CLIENT_2_ADDRESS + CLOSE_CONSUMER_API); + Thread.sleep(WAIT_TIME); + clearConfig(); + } + } + + /** + * 测试单个消费者订阅单个Topic,开启禁消费前消费者shutdown + * + * @throws Exception 异常 + */ + @Test + public void testSingleConsumerShutDownBeforeSwitchOn() throws Exception { + // 创建单Topic的消费者test-client-1 + HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + CREATE_CONSUMER_WITH_ONE_TOPIC_API); + Thread.sleep(WAIT_TIME); + // 关闭消费者test-client-1 + HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + CLOSE_CONSUMER_API); + // 开启禁止消费配置 + String configOn = "enableKafkaProhibition: true" + lINE_SEPARATOR + + "kafkaTopics:" + lINE_SEPARATOR + + " - test-topic"; + DynamicConfigUtils.updateConfig(CONFIG_PATH, configOn); + int groupSize = 
Integer.parseInt(HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_GROUP_SIZE_API)); + String topics = HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_TOPIC_API); + try { + Assertions.assertEquals(0, groupSize); + Assertions.assertEquals("[]", topics); + } finally { + // 清理资源 + Thread.sleep(WAIT_TIME); + clearConfig(); + } + } + + /** + * 测试单个消费者订阅单个Topic,恢复禁消费前消费者shutdown + * + * @throws Exception 异常 + */ + @Test + public void testSingleConsumerShutDownBeforeSwitchOff() throws Exception { + // 创建单Topic的消费者test-client-1 + HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + CREATE_CONSUMER_WITH_ONE_TOPIC_API); + Thread.sleep(WAIT_TIME); + // 开启禁止消费配置 + String configOn = "enableKafkaProhibition: true" + lINE_SEPARATOR + + "kafkaTopics:" + lINE_SEPARATOR + + " - test-topic"; + DynamicConfigUtils.updateConfig(CONFIG_PATH, configOn); + // 关闭消费者test-client-1 + HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + CLOSE_CONSUMER_API); + // 关闭禁止消费配置 + String configOff = "enableKafkaProhibition: false" + lINE_SEPARATOR + + "kafkaTopics:" + lINE_SEPARATOR + + " - test-topic"; + int groupSize = Integer.parseInt(HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_GROUP_SIZE_API)); + String topics = HttpRequestUtils.doGet(TEST_CLIENT_1_ADDRESS + GET_CONSUMER_TOPIC_API); + DynamicConfigUtils.updateConfig(CONFIG_PATH, configOff); + try { + Assertions.assertEquals(0, groupSize); + Assertions.assertEquals("[]", topics); + } finally { + // 清理资源 + Thread.sleep(WAIT_TIME); + clearConfig(); + } + } + + /** + * 清除禁消费配置 + * + * @throws Exception 异常 + */ + private void clearConfig() throws Exception { + String config = "enableKafkaProhibition: false"; + DynamicConfigUtils.updateConfig("/app=default&environment=&zone=hangzhou/sermant.mq.consume.globalConfig", + config); + } +} diff --git 
a/sermant-integration-tests/mq-consume-prohibition-test/mq-consume-prohibition-integration-test/src/test/java/com/huaweicloud/sermant/mq/prohibition/integration/utils/HttpRequestUtils.java b/sermant-integration-tests/mq-consume-prohibition-test/mq-consume-prohibition-integration-test/src/test/java/com/huaweicloud/sermant/mq/prohibition/integration/utils/HttpRequestUtils.java new file mode 100644 index 0000000000..71d116f719 --- /dev/null +++ b/sermant-integration-tests/mq-consume-prohibition-test/mq-consume-prohibition-integration-test/src/test/java/com/huaweicloud/sermant/mq/prohibition/integration/utils/HttpRequestUtils.java @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2024-2024 Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.huaweicloud.sermant.mq.prohibition.integration.utils; + +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * http请求工具类 + * + * @author daizhenyu + * @since 2024-01-08 + **/ +public class HttpRequestUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(HttpRequestUtils.class); + + private static final int SUCCESS_CODE = 200; + + private HttpRequestUtils() { + } + + /** + * http的get请求 + * + * @param url http请求url + * @return 响应体body + */ + public static String doGet(String url) { + try (CloseableHttpClient httpClient = HttpClients.createDefault()) { + RequestConfig requestConfig = RequestConfig.custom() + .build(); + HttpGet httpGet = new HttpGet(url); + httpGet.setConfig(requestConfig); + try (CloseableHttpResponse response = httpClient.execute(httpGet)) { + if (response.getStatusLine().getStatusCode() == SUCCESS_CODE) { + return EntityUtils.toString(response.getEntity()); + } + LOGGER.info("Request error, the message is: {}", EntityUtils.toString(response.getEntity())); + return ""; + } + } catch (IOException e) { + LOGGER.info("Request exception, the message is: {}", e.getMessage()); + return ""; + } + } +} diff --git a/sermant-integration-tests/mq-consume-prohibition-test/pom.xml b/sermant-integration-tests/mq-consume-prohibition-test/pom.xml index d688dc31ee..73494bd385 100644 --- a/sermant-integration-tests/mq-consume-prohibition-test/pom.xml +++ b/sermant-integration-tests/mq-consume-prohibition-test/pom.xml @@ -15,11 +15,7 @@ mq-consume-prohibition-test pom - rocketmq-consumer-push-demo mq-consume-prohibition-common-demo - rocketmq-consumer-pull-subscribe-demo - 
rocketmq-consumer-pull-assign-demo - rocketmq-producer-demo mq-consume-prohibition-integration-test @@ -39,4 +35,22 @@ + + + kafka-consumer-test + + kafka-consumer-demo + + + + rocketmq-consume-test + + rocketmq-consumer-push-demo + rocketmq-consumer-pull-subscribe-demo + rocketmq-consumer-pull-assign-demo + rocketmq-producer-demo + + + + \ No newline at end of file