From 393327b9309df4ee0accab842bc99dd16eebfac0 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Mon, 18 Nov 2024 10:06:54 +0800 Subject: [PATCH 01/14] RUN CI --- .github/workflows/pipe-it-2cluster.yml | 139 +------------------------ 1 file changed, 3 insertions(+), 136 deletions(-) diff --git a/.github/workflows/pipe-it-2cluster.yml b/.github/workflows/pipe-it-2cluster.yml index 7c2143d6d2c9..825556b1896d 100644 --- a/.github/workflows/pipe-it-2cluster.yml +++ b/.github/workflows/pipe-it-2cluster.yml @@ -32,102 +32,6 @@ env: DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} jobs: - auto-create-schema: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode, PipeConsensusBatchMode, PipeConsensusStreamMode] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \ - -pl integration-test \ - -am -PMultiClusterIT2AutoCreateSchema \ - -ntp - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-auto-create-schema-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }} - path: integration-test/target/cluster-logs - retention-days: 30 - manual-create-schema: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode, PipeConsensusBatchMode, PipeConsensusStreamMode] - cluster2: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode] - os: [ ubuntu-latest ] - exclude: - - cluster1: LightWeightStandaloneMode - cluster2: LightWeightStandaloneMode - - cluster1: LightWeightStandaloneMode - cluster2: ScalableSingleNodeMode - - cluster1: ScalableSingleNodeMode - cluster2: LightWeightStandaloneMode - - cluster1: ScalableSingleNodeMode - cluster2: HighPerformanceMode - - cluster1: HighPerformanceMode - cluster2: LightWeightStandaloneMode - - cluster1: HighPerformanceMode - cluster2: HighPerformanceMode - - cluster1: PipeConsensusBatchMode - cluster2: LightWeightStandaloneMode - - cluster1: PipeConsensusBatchMode - cluster2: HighPerformanceMode - - cluster1: PipeConsensusStreamMode - cluster2: LightWeightStandaloneMode - - cluster1: PipeConsensusStreamMode - cluster2: HighPerformanceMode - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2ManualCreateSchema \ - -ntp - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-manual-create-schema-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 subscription-arch-verification: strategy: fail-fast: false @@ -135,7 +39,7 @@ jobs: matrix: java: [ 17 ] # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [ ScalableSingleNodeMode ] + cluster1: [ PipeConsensusBatchMode, PipeConsensusStreamMode, ScalableSingleNodeMode ] cluster2: [ ScalableSingleNodeMode ] os: [ ubuntu-latest ] runs-on: ${{ matrix.os }} @@ -173,7 +77,7 @@ jobs: matrix: java: [ 17 ] # do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal - cluster1: [ ScalableSingleNodeMode ] + cluster1: [ PipeConsensusBatchMode, PipeConsensusStreamMode, ScalableSingleNodeMode ] cluster2: [ ScalableSingleNodeMode ] os: [ ubuntu-latest ] runs-on: ${{ matrix.os }} @@ -211,7 +115,7 @@ jobs: matrix: java: [ 17 ] # do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal - cluster1: [ ScalableSingleNodeMode ] + cluster1: [ PipeConsensusBatchMode, PipeConsensusStreamMode, ScalableSingleNodeMode ] cluster2: [ ScalableSingleNodeMode ] os: [ ubuntu-latest ] runs-on: ${{ matrix.os }} @@ -242,40 +146,3 @@ jobs: name: cluster-log-subscription-regression-misc-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} path: integration-test/target/cluster-logs retention-days: 30 - table-model: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [17] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster: [LightWeightStandaloneMode, ScalableSingleNodeMode, HighPerformanceMode] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster }} \ - -pl integration-test \ - -am -PMultiClusterIT2TableModel \ - -ntp - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-table-model-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster }}-${{ matrix.cluster }} - path: integration-test/target/cluster-logs - retention-days: 30 From afee1ef967a95f62b24cbe5858567374bd7b68dc Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Tue, 19 Nov 2024 00:05:17 +0800 Subject: [PATCH 02/14] setup --- ...HistoricalDataRegionTsFileAndDeletionExtractor.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java index ee44ac465bec..9fdfad891e08 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment; @@ -506,7 +507,14 @@ private void flushTsFilesForExtraction( final long lastFlushedByPipeTime = DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId); if (System.currentTimeMillis() - lastFlushedByPipeTime >= PIPE_MIN_FLUSH_INTERVAL_IN_MS) { dataRegion.syncCloseAllWorkingTsFileProcessors(); - DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId, System.currentTimeMillis()); + // Consider the scenario: a consensus pipe comes to the same region, followed by a user pipe + // **immediately**. + // Since a large number of consensus pipes are not created at the same time, resulting in no + // serious waiting for locks, the lastFlushedByPipeTime timestamp is not updated for the + // consensus pipe. + if (!pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) { + DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId, System.currentTimeMillis()); + } LOGGER.info( "Pipe {}@{}: finish to flush data region, took {} ms", pipeName, From 370beb80dba3af50e19ed0f8c4ae7c1b4f5e797a Mon Sep 17 00:00:00 2001 From: V_Galaxy Date: Thu, 21 Nov 2024 11:25:20 +0800 Subject: [PATCH 03/14] try fix --- .../regression/AbstractSubscriptionRegressionIT.java | 12 +++++++++--- .../consumer/SubscriptionPullConsumer.java | 4 ++++ .../task/subtask/processor/PipeProcessorSubtask.java | 4 ++-- .../task/subtask/SubscriptionConnectorSubtask.java | 12 ++---------- .../agent/task/execution/PipeSubtaskExecutor.java | 6 +++--- .../task/subtask/PipeAbstractConnectorSubtask.java | 6 +++--- .../commons/pipe/agent/task/subtask/PipeSubtask.java | 4 ++-- 7 files changed, 25 insertions(+), 23 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java index fe32817ad007..bd9ac1c2479d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java @@ -299,7 +299,9 @@ public void consume_data(SubscriptionPullConsumer consumer, Session session) session.insertTablet(tablet); } } - consumer.commitSync(messages); + if (!consumer.isAutoCommit()) { + consumer.commitSync(messages); + } } } @@ -349,7 +351,9 @@ public List consume_tsfile(SubscriptionPullConsumer consumer, List results = new ArrayList<>(devices.size()); for (AtomicInteger rowCount : rowCounts) { @@ -375,7 +379,9 @@ public static void consume_data_long( session.insertTablet(tablet); } } - consumer.commitSync(messages); + if (!consumer.isAutoCommit()) { + consumer.commitSync(messages); + } } } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java index ae9f99b969d9..79acdf6bd2cd 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java @@ -70,6 +70,10 @@ boolean isClosed() { return isClosed.get(); } + public boolean isAutoCommit() { + return autoCommit; + } + /////////////////////////////// ctor /////////////////////////////// protected SubscriptionPullConsumer(final SubscriptionPullConsumer.Builder builder) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java index e5383bfaae41..7f29b3c55f99 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java @@ -47,7 +47,7 @@ import org.slf4j.LoggerFactory; import java.util.Objects; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; public class PipeProcessorSubtask extends PipeReportableSubtask { @@ -96,7 +96,7 @@ public PipeProcessorSubtask( @Override public void bindExecutors( final ListeningExecutorService subtaskWorkerThreadPoolExecutor, - final ScheduledExecutorService ignored, + final ExecutorService ignored, final PipeSubtaskScheduler subtaskScheduler) { this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor; this.subtaskScheduler = subtaskScheduler; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java index c5b841a8c66f..9b7587381d99 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java @@ -20,7 +20,6 @@ package org.apache.iotdb.db.subscription.task.subtask; import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; -import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; import org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtask; import org.apache.iotdb.db.subscription.agent.SubscriptionAgent; import org.apache.iotdb.pipe.api.PipeConnector; @@ -31,8 +30,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.time.Duration; - public class SubscriptionConnectorSubtask extends PipeConnectorSubtask { private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionConnectorSubtask.class); @@ -76,13 +73,8 @@ public UnboundedBlockingPendingQueue getInputPendingQueue() { @Override protected void registerCallbackHookAfterSubmit(final ListenableFuture future) { - final ListenableFuture nextFuture = - Futures.withTimeout( - future, - Duration.ofSeconds( - SubscriptionConfig.getInstance().getSubscriptionDefaultTimeoutInMs()), - subtaskCallbackListeningExecutor); - Futures.addCallback(nextFuture, this, subtaskCallbackListeningExecutor); + // TODO: Futures.withTimeout + Futures.addCallback(future, this, subtaskCallbackListeningExecutor); } @Override diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java index c4c96dad3e77..4ea7714962bf 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java @@ -32,14 +32,14 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ExecutorService; public abstract class PipeSubtaskExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(PipeSubtaskExecutor.class); - private static final ScheduledExecutorService subtaskCallbackListeningExecutor = - IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( + private static final ExecutorService subtaskCallbackListeningExecutor = + IoTDBThreadPoolFactory.newSingleThreadExecutor( ThreadName.PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL.getName()); protected final WrappedThreadPoolExecutor underlyingThreadPool; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java index 58cc142713d0..cfd987758e46 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java @@ -33,7 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ExecutorService; public abstract class PipeAbstractConnectorSubtask extends PipeReportableSubtask { @@ -43,7 +43,7 @@ public abstract class PipeAbstractConnectorSubtask extends PipeReportableSubtask protected PipeConnector outputPipeConnector; // For thread pool to execute callbacks - protected ScheduledExecutorService subtaskCallbackListeningExecutor; + protected ExecutorService subtaskCallbackListeningExecutor; // For controlling subtask submitting, making sure that // a subtask is submitted to only one thread at a time @@ -62,7 +62,7 @@ protected PipeAbstractConnectorSubtask( @Override public void bindExecutors( final ListeningExecutorService subtaskWorkerThreadPoolExecutor, - final ScheduledExecutorService subtaskCallbackListeningExecutor, + final ExecutorService subtaskCallbackListeningExecutor, final PipeSubtaskScheduler subtaskScheduler) { this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor; this.subtaskCallbackListeningExecutor = subtaskCallbackListeningExecutor; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java index 1169711b46d2..2da797c2b3b5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java @@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.Callable; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -65,7 +65,7 @@ protected PipeSubtask(final String taskID, final long creationTime) { public abstract void bindExecutors( ListeningExecutorService subtaskWorkerThreadPoolExecutor, - ScheduledExecutorService subtaskCallbackListeningExecutor, + ExecutorService subtaskCallbackListeningExecutor, PipeSubtaskScheduler subtaskScheduler); @Override From 79cc30c85525f1c4418b01a0ead3f8948cee7151 Mon Sep 17 00:00:00 2001 From: V_Galaxy Date: Thu, 21 Nov 2024 11:29:24 +0800 Subject: [PATCH 04/14] Revert "setup" This reverts commit afee1ef967a95f62b24cbe5858567374bd7b68dc. --- ...HistoricalDataRegionTsFileAndDeletionExtractor.java | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java index 21b46faa24fb..9932ba42ed98 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java @@ -24,7 +24,6 @@ import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex; import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment; @@ -523,14 +522,7 @@ private void flushTsFilesForExtraction( final long lastFlushedByPipeTime = DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId); if (System.currentTimeMillis() - lastFlushedByPipeTime >= PIPE_MIN_FLUSH_INTERVAL_IN_MS) { dataRegion.syncCloseAllWorkingTsFileProcessors(); - // Consider the scenario: a consensus pipe comes to the same region, followed by a user pipe - // **immediately**. - // Since a large number of consensus pipes are not created at the same time, resulting in no - // serious waiting for locks, the lastFlushedByPipeTime timestamp is not updated for the - // consensus pipe. - if (!pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) { - DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId, System.currentTimeMillis()); - } + DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId, System.currentTimeMillis()); LOGGER.info( "Pipe {}@{}: finish to flush data region, took {} ms", pipeName, From ecf7e435aec968b5b654e2e48dc2754d7ac73a2c Mon Sep 17 00:00:00 2001 From: V_Galaxy Date: Thu, 21 Nov 2024 11:30:25 +0800 Subject: [PATCH 05/14] fixup --- .../PipeHistoricalDataRegionTsFileAndDeletionExtractor.java | 1 + 1 file changed, 1 insertion(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java index 9932ba42ed98..61b721f88777 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment; From 5b352dbbb15c35c45d6221bcc438ab7564343da4 Mon Sep 17 00:00:00 2001 From: V_Galaxy Date: Thu, 21 Nov 2024 15:41:15 +0800 Subject: [PATCH 06/14] try fix --- .github/workflows/pipe-it-2cluster.yml | 6 +-- .../it/IoTDBSubscriptionITConstant.java | 1 + .../AbstractSubscriptionRegressionIT.java | 38 +++++++------------ 3 files changed, 17 insertions(+), 28 deletions(-) diff --git a/.github/workflows/pipe-it-2cluster.yml b/.github/workflows/pipe-it-2cluster.yml index 825556b1896d..ba36f00ceb29 100644 --- a/.github/workflows/pipe-it-2cluster.yml +++ b/.github/workflows/pipe-it-2cluster.yml @@ -39,7 +39,7 @@ jobs: matrix: java: [ 17 ] # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [ PipeConsensusBatchMode, PipeConsensusStreamMode, ScalableSingleNodeMode ] + cluster1: [ PipeConsensusBatchMode, PipeConsensusStreamMode, PipeConsensusBatchMode, PipeConsensusStreamMode ] cluster2: [ ScalableSingleNodeMode ] os: [ ubuntu-latest ] runs-on: ${{ matrix.os }} @@ -77,7 +77,7 @@ jobs: matrix: java: [ 17 ] # do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal - cluster1: [ PipeConsensusBatchMode, PipeConsensusStreamMode, ScalableSingleNodeMode ] + cluster1: [ PipeConsensusBatchMode, PipeConsensusStreamMode, PipeConsensusBatchMode, PipeConsensusStreamMode ] cluster2: [ ScalableSingleNodeMode ] os: [ ubuntu-latest ] runs-on: ${{ matrix.os }} @@ -115,7 +115,7 @@ jobs: matrix: java: [ 17 ] # do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal - cluster1: [ PipeConsensusBatchMode, PipeConsensusStreamMode, ScalableSingleNodeMode ] + cluster1: [ PipeConsensusBatchMode, PipeConsensusStreamMode, PipeConsensusBatchMode, PipeConsensusStreamMode ] cluster2: [ ScalableSingleNodeMode ] os: [ ubuntu-latest ] runs-on: ${{ matrix.os }} diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java index 5b8ec3932741..8fb2872bfea8 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java @@ -40,4 +40,5 @@ public class IoTDBSubscriptionITConstant { public static final long SLEEP_NS = 1_000_000_000L; public static final long POLL_TIMEOUT_MS = 10_000L; + public static final int MAX_RETRY_TIMES = 3; } diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java index bd9ac1c2479d..239c30834c57 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java @@ -57,6 +57,7 @@ import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.MAX_RETRY_TIMES; import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS; public abstract class AbstractSubscriptionRegressionIT extends AbstractSubscriptionTripleIT { @@ -285,12 +286,17 @@ public void consume_data(SubscriptionPullConsumer consumer, Session session) StatementExecutionException, InterruptedException, IoTDBConnectionException { + int retryCount = 0; while (true) { Thread.sleep(1000); - + // That is, the consumer poll will keep pulling if no messages are fetched within the timeout, + // until a message is fetched or the time exceeds the timeout. List messages = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)); if (messages.isEmpty()) { - break; + retryCount++; + if (retryCount >= MAX_RETRY_TIMES) { + break; + } } for (final SubscriptionMessage message : messages) { for (final Iterator it = message.getSessionDataSetsHandler().tabletIterator(); @@ -321,6 +327,7 @@ public List consume_tsfile(SubscriptionPullConsumer consumer, List consume_tsfile(SubscriptionPullConsumer consumer, List messages = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)); if (messages.isEmpty()) { - break; + retryCount++; + if (retryCount >= MAX_RETRY_TIMES) { + break; + } } for (final SubscriptionMessage message : messages) { onReceived.incrementAndGet(); @@ -363,28 +373,6 @@ public List consume_tsfile(SubscriptionPullConsumer consumer, List messages = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)); - if (messages.isEmpty()) { - Thread.sleep(1000); - } - for (final SubscriptionMessage message : messages) { - for (final Iterator it = message.getSessionDataSetsHandler().tabletIterator(); - it.hasNext(); ) { - final Tablet tablet = it.next(); - session.insertTablet(tablet); - } - } - if (!consumer.isAutoCommit()) { - consumer.commitSync(messages); - } - } - } - public void consume_data(SubscriptionPullConsumer consumer) throws TException, IOException, From 92a54e6930c22a2455453162dbe3984df07c5d15 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Thu, 21 Nov 2024 19:31:42 +0800 Subject: [PATCH 07/14] flush & enlarge timeout --- .github/workflows/pipe-it-2cluster.yml | 42 +------------------ .../it/IoTDBSubscriptionITConstant.java | 2 +- .../IoTDBDefaultPullConsumerDataSetIT.java | 1 + .../IoTDBDefaultTsfilePushConsumerIT.java | 1 + ...llConsumerWith1TopicShareProcessMixIT.java | 1 + .../param/IoTDBTestParamPullConsumerIT.java | 1 + ...tAutoCommitFalseDataSetPullConsumerIT.java | 1 + ...stAutoCommitTrueDataSetPullConsumerIT.java | 1 + .../IoTDBAllTsDatasetPullConsumerIT.java | 1 + .../IoTDBPathDeviceDataSetPullConsumerIT.java | 1 + .../IoTDBTimeTsDatasetPullConsumerIT.java | 1 + ...DBSnapshotDevicePullConsumerDataSetIT.java | 1 + ...sumer2With1TopicShareProcessDataSetIT.java | 1 + .../IoTDBOneConsumerMultiTopicsDatasetIT.java | 1 + .../IoTDBDBPatternPullConsumerDataSetIT.java | 1 + ...DBDefaultPatternPullConsumerDataSetIT.java | 1 + ...TDBDevicePatternPullConsumerDataSetIT.java | 1 + ...dleMatch2PatternPullConsumerDataSetIT.java | 1 + ...ddleMatchPatternPullConsumerDataSetIT.java | 1 + .../IoTDBTSPatternPullConsumerDataSetIT.java | 1 + .../time/IoTDBAllPullConsumerDataSetIT.java | 1 + .../IoTDBHistoryPullConsumerDataSetIT.java | 1 + .../IoTDBRealTimePullConsumerDataSetIT.java | 1 + ...imeRangeAccuratePullConsumerDataSetIT.java | 1 + .../IoTDBTimeRangePullConsumerDataSetIT.java | 1 + ...IoTDBTimeLooseTsDatasetPushConsumerIT.java | 1 + ...IoTDBTimeTsLooseDatasetPushConsumerIT.java | 1 + ...sumer2With1TopicShareProcessDataSetIT.java | 1 + .../IoTDBOneConsumerMultiTopicsDatasetIT.java | 1 + .../IoTDBRealTimeDBDatasetPushConsumerIT.java | 1 + ...eRangeAccurateDBDataSetPushConsumerIT.java | 1 + ...IoTDBTimeRangeDBDataSetPushConsumerIT.java | 1 + .../IoTDBDataSet1TopicConsumerSpecialIT.java | 1 + .../topic/IoTDBTestTopicNameIT.java | 1 + .../user/IoTDBOtherUserConsumerIT.java | 1 + 35 files changed, 36 insertions(+), 41 deletions(-) diff --git a/.github/workflows/pipe-it-2cluster.yml b/.github/workflows/pipe-it-2cluster.yml index ba36f00ceb29..4fa77bc40e5c 100644 --- a/.github/workflows/pipe-it-2cluster.yml +++ b/.github/workflows/pipe-it-2cluster.yml @@ -32,44 +32,6 @@ env: DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} jobs: - subscription-arch-verification: - strategy: - fail-fast: false - max-parallel: 15 - matrix: - java: [ 17 ] - # StrongConsistencyClusterMode is ignored now because RatisConsensus has not been supported yet. - cluster1: [ PipeConsensusBatchMode, PipeConsensusStreamMode, PipeConsensusBatchMode, PipeConsensusStreamMode ] - cluster2: [ ScalableSingleNodeMode ] - os: [ ubuntu-latest ] - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v4 - - name: Set up JDK ${{ matrix.java }} - uses: actions/setup-java@v4 - with: - distribution: liberica - java-version: ${{ matrix.java }} - - name: IT Test - shell: bash - # we do not compile client-cpp for saving time, it is tested in client.yml - # we can skip influxdb-protocol because it has been tested separately in influxdb-protocol.yml - run: | - mvn clean verify \ - -P with-integration-tests \ - -DskipUTs \ - -DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256 -DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \ - -DClusterConfigurations=${{ matrix.cluster1 }},${{ matrix.cluster2 }} \ - -pl integration-test \ - -am -PMultiClusterIT2SubscriptionArchVerification \ - -ntp - - name: Upload Artifact - if: failure() - uses: actions/upload-artifact@v4 - with: - name: cluster-log-subscription-java${{ matrix.java }}-${{ runner.os }}-${{ matrix.cluster1 }}-${{ matrix.cluster2 }} - path: integration-test/target/cluster-logs - retention-days: 30 subscription-regression-consumer: strategy: fail-fast: false @@ -77,7 +39,7 @@ jobs: matrix: java: [ 17 ] # do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal - cluster1: [ PipeConsensusBatchMode, PipeConsensusStreamMode, PipeConsensusBatchMode, PipeConsensusStreamMode ] + cluster1: [ PipeConsensusBatchMode, PipeConsensusStreamMode, PipeConsensusBatchMode, PipeConsensusStreamMode, PipeConsensusBatchMode, PipeConsensusStreamMode ] cluster2: [ ScalableSingleNodeMode ] os: [ ubuntu-latest ] runs-on: ${{ matrix.os }} @@ -115,7 +77,7 @@ jobs: matrix: java: [ 17 ] # do not use HighPerformanceMode here, otherwise some tests will cause the GH runner to receive a shutdown signal - cluster1: [ PipeConsensusBatchMode, PipeConsensusStreamMode, PipeConsensusBatchMode, PipeConsensusStreamMode ] + cluster1: [ PipeConsensusBatchMode, PipeConsensusStreamMode, PipeConsensusBatchMode, PipeConsensusStreamMode, PipeConsensusBatchMode, PipeConsensusStreamMode ] cluster2: [ ScalableSingleNodeMode ] os: [ ubuntu-latest ] runs-on: ${{ matrix.os }} diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java index 8fb2872bfea8..1a7bf4b80014 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java @@ -28,7 +28,7 @@ public class IoTDBSubscriptionITConstant { private static final long AWAITILITY_POLL_DELAY_SECOND = 1L; private static final long AWAITILITY_POLL_INTERVAL_SECOND = 1L; - private static final long AWAITILITY_AT_MOST_SECOND = 600L; + private static final long AWAITILITY_AT_MOST_SECOND = 900L; public static final ConditionFactory AWAIT = Awaitility.await() diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultPullConsumerDataSetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultPullConsumerDataSetIT.java index 38f500d8249a..7be1b37b5c4c 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultPullConsumerDataSetIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultPullConsumerDataSetIT.java @@ -90,6 +90,7 @@ private void insert_data(long timestamp, String device) timestamp += 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java index 781670cd32c4..66be3a5d0f8e 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java @@ -107,6 +107,7 @@ private void insert_data(long timestamp, String device) timestamp += 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/mix/IoTDBPushConsumerPullConsumerWith1TopicShareProcessMixIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/mix/IoTDBPushConsumerPullConsumerWith1TopicShareProcessMixIT.java index a78034853804..585daf1b89c5 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/mix/IoTDBPushConsumerPullConsumerWith1TopicShareProcessMixIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/mix/IoTDBPushConsumerPullConsumerWith1TopicShareProcessMixIT.java @@ -112,6 +112,7 @@ private void insert_data(long timestamp) timestamp += 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamPullConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamPullConsumerIT.java index 8aa334727e72..b1fc2cc6d383 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamPullConsumerIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBTestParamPullConsumerIT.java @@ -127,6 +127,7 @@ private void insert_data(long timestamp) timestamp += row * 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } private void run_single(SubscriptionPullConsumer consumer, int index) diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/autocommit/IoTDBTestAutoCommitFalseDataSetPullConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/autocommit/IoTDBTestAutoCommitFalseDataSetPullConsumerIT.java index 4a0b889dd85e..09f6edf66f9b 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/autocommit/IoTDBTestAutoCommitFalseDataSetPullConsumerIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/autocommit/IoTDBTestAutoCommitFalseDataSetPullConsumerIT.java @@ -117,6 +117,7 @@ private void insert_data(long timestamp) timestamp += row * 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } private void consume_data_noCommit(SubscriptionPullConsumer consumer, Session session) diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/autocommit/IoTDBTestAutoCommitTrueDataSetPullConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/autocommit/IoTDBTestAutoCommitTrueDataSetPullConsumerIT.java index 83b7112d795c..75cec56ce3a5 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/autocommit/IoTDBTestAutoCommitTrueDataSetPullConsumerIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/autocommit/IoTDBTestAutoCommitTrueDataSetPullConsumerIT.java @@ -111,6 +111,7 @@ private void insert_data(long timestamp) timestamp += row * 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } private void consume_data_noCommit(SubscriptionPullConsumer consumer, Session session) diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBAllTsDatasetPullConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBAllTsDatasetPullConsumerIT.java index 28ee1a7fb56a..18e332a6cdaf 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBAllTsDatasetPullConsumerIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBAllTsDatasetPullConsumerIT.java @@ -122,6 +122,7 @@ private void insert_data(long timestamp, String device) timestamp += 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBPathDeviceDataSetPullConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBPathDeviceDataSetPullConsumerIT.java index a2df21515189..87b1ed2cc787 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBPathDeviceDataSetPullConsumerIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBPathDeviceDataSetPullConsumerIT.java @@ -123,6 +123,7 @@ private void insert_data(long timestamp, String device) timestamp += 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBTimeTsDatasetPullConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBTimeTsDatasetPullConsumerIT.java index 8fc036774306..487eb300f7ec 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBTimeTsDatasetPullConsumerIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/loose_range/IoTDBTimeTsDatasetPullConsumerIT.java @@ -123,6 +123,7 @@ private void insert_data(long timestamp, String device) timestamp += 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerDataSetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerDataSetIT.java index 90f7667556ec..aa69b2350d30 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerDataSetIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerDataSetIT.java @@ -111,6 +111,7 @@ private void insert_data(long timestamp) timestamp += row * 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBConsumer2With1TopicShareProcessDataSetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBConsumer2With1TopicShareProcessDataSetIT.java index 2eb292cd5ee0..0dcb2e3d154e 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBConsumer2With1TopicShareProcessDataSetIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBConsumer2With1TopicShareProcessDataSetIT.java @@ -104,6 +104,7 @@ private void insert_data(long timestamp) timestamp += 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsDatasetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsDatasetIT.java index 3f5d9ee0d73d..9236fe001f02 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsDatasetIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsDatasetIT.java @@ -106,6 +106,7 @@ private void insert_data(long timestamp) timestamp += 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDBPatternPullConsumerDataSetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDBPatternPullConsumerDataSetIT.java index e4dc6ecfd99a..8c18ffa64e29 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDBPatternPullConsumerDataSetIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDBPatternPullConsumerDataSetIT.java @@ -113,6 +113,7 @@ private void insert_data(long timestamp) timestamp += row * 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDefaultPatternPullConsumerDataSetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDefaultPatternPullConsumerDataSetIT.java index 8ca5ca9f4512..4a967039477c 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDefaultPatternPullConsumerDataSetIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDefaultPatternPullConsumerDataSetIT.java @@ -108,6 +108,7 @@ private void insert_data(long timestamp) timestamp += row * 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java index 855be595bfa9..f5af56f4f207 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java @@ -115,6 +115,7 @@ private void insert_data(long timestamp) timestamp += row * 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBMiddleMatch2PatternPullConsumerDataSetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBMiddleMatch2PatternPullConsumerDataSetIT.java index 4c00ccd19f08..3ac7152f164f 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBMiddleMatch2PatternPullConsumerDataSetIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBMiddleMatch2PatternPullConsumerDataSetIT.java @@ -118,6 +118,7 @@ private void insert_data(long timestamp, String device) timestamp += 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumerDataSetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumerDataSetIT.java index f4f1c5dd84fd..5de6dbdc5287 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumerDataSetIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumerDataSetIT.java @@ -111,6 +111,7 @@ private void insert_data(long timestamp) timestamp += 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumerDataSetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumerDataSetIT.java index 4cd80a64fb4c..adc8307697be 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumerDataSetIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumerDataSetIT.java @@ -95,6 +95,7 @@ private void insert_data(long timestamp) timestamp += row * 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); Thread.sleep(1000); } diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/time/IoTDBAllPullConsumerDataSetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/time/IoTDBAllPullConsumerDataSetIT.java index 66c08d5ad3f7..f6cb23666e78 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/time/IoTDBAllPullConsumerDataSetIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/time/IoTDBAllPullConsumerDataSetIT.java @@ -95,6 +95,7 @@ private void insert_data(long timestamp) timestamp += row * 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/time/IoTDBHistoryPullConsumerDataSetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/time/IoTDBHistoryPullConsumerDataSetIT.java index 6b4df3216112..6b286695ea79 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/time/IoTDBHistoryPullConsumerDataSetIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/time/IoTDBHistoryPullConsumerDataSetIT.java @@ -99,6 +99,7 @@ private void insert_data(long timestamp) timestamp += row * 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/time/IoTDBRealTimePullConsumerDataSetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/time/IoTDBRealTimePullConsumerDataSetIT.java index 3a419f684022..fb6cd2a2518d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/time/IoTDBRealTimePullConsumerDataSetIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/time/IoTDBRealTimePullConsumerDataSetIT.java @@ -99,6 +99,7 @@ private void insert_data(long timestamp) timestamp += row * 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/time/IoTDBTimeRangeAccuratePullConsumerDataSetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/time/IoTDBTimeRangeAccuratePullConsumerDataSetIT.java index 4ced5edb360d..81418adb93fd 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/time/IoTDBTimeRangeAccuratePullConsumerDataSetIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/time/IoTDBTimeRangeAccuratePullConsumerDataSetIT.java @@ -95,6 +95,7 @@ private void insert_data(long timestamp) timestamp += row * 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/time/IoTDBTimeRangePullConsumerDataSetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/time/IoTDBTimeRangePullConsumerDataSetIT.java index 91bc8bc1c39a..870dc0fa2d73 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/time/IoTDBTimeRangePullConsumerDataSetIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/time/IoTDBTimeRangePullConsumerDataSetIT.java @@ -98,6 +98,7 @@ private void insert_data(long timestamp) timestamp += row * 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/loose_range/IoTDBTimeLooseTsDatasetPushConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/loose_range/IoTDBTimeLooseTsDatasetPushConsumerIT.java index d166a44d7bbf..7e8aca80cb7d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/loose_range/IoTDBTimeLooseTsDatasetPushConsumerIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/loose_range/IoTDBTimeLooseTsDatasetPushConsumerIT.java @@ -135,6 +135,7 @@ private void insert_data(long timestamp, String device) timestamp += 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/loose_range/IoTDBTimeTsLooseDatasetPushConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/loose_range/IoTDBTimeTsLooseDatasetPushConsumerIT.java index 9e7d4a4ef505..09e024df931e 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/loose_range/IoTDBTimeTsLooseDatasetPushConsumerIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/loose_range/IoTDBTimeTsLooseDatasetPushConsumerIT.java @@ -135,6 +135,7 @@ private void insert_data(long timestamp, String device) timestamp += 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBConsumer2With1TopicShareProcessDataSetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBConsumer2With1TopicShareProcessDataSetIT.java index 0e9ccaab0ad1..26cccb358083 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBConsumer2With1TopicShareProcessDataSetIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBConsumer2With1TopicShareProcessDataSetIT.java @@ -116,6 +116,7 @@ private void insert_data(long timestamp) timestamp += 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBOneConsumerMultiTopicsDatasetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBOneConsumerMultiTopicsDatasetIT.java index 2936b7e4bd0a..0ef39e64f394 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBOneConsumerMultiTopicsDatasetIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBOneConsumerMultiTopicsDatasetIT.java @@ -125,6 +125,7 @@ private void insert_data(long timestamp, String device) timestamp += 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBRealTimeDBDatasetPushConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBRealTimeDBDatasetPushConsumerIT.java index ec4ad033733d..e3cb2c4d4337 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBRealTimeDBDatasetPushConsumerIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBRealTimeDBDatasetPushConsumerIT.java @@ -102,6 +102,7 @@ private void insert_data(long timestamp) timestamp += row * 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBTimeRangeAccurateDBDataSetPushConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBTimeRangeAccurateDBDataSetPushConsumerIT.java index d7aea65ae189..9b6a2c358725 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBTimeRangeAccurateDBDataSetPushConsumerIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBTimeRangeAccurateDBDataSetPushConsumerIT.java @@ -104,6 +104,7 @@ private void insert_data(long timestamp) timestamp += row * 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBTimeRangeDBDataSetPushConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBTimeRangeDBDataSetPushConsumerIT.java index 4c9f26c3892c..f83f157c9e95 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBTimeRangeDBDataSetPushConsumerIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/time/IoTDBTimeRangeDBDataSetPushConsumerIT.java @@ -106,6 +106,7 @@ private void insert_data(long timestamp) timestamp += row * 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/topic/IoTDBDataSet1TopicConsumerSpecialIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/topic/IoTDBDataSet1TopicConsumerSpecialIT.java index 5526b40835f9..8bfe9d408253 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/topic/IoTDBDataSet1TopicConsumerSpecialIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/topic/IoTDBDataSet1TopicConsumerSpecialIT.java @@ -97,6 +97,7 @@ private void insert_data(long timestamp) timestamp += row * 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/topic/IoTDBTestTopicNameIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/topic/IoTDBTestTopicNameIT.java index bf57794a6c32..50ad8a6152cb 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/topic/IoTDBTestTopicNameIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/topic/IoTDBTestTopicNameIT.java @@ -100,6 +100,7 @@ private void insert_data(long timestamp) timestamp += 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); Thread.sleep(1000); } diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/user/IoTDBOtherUserConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/user/IoTDBOtherUserConsumerIT.java index 220bff22e220..b440b41c0025 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/user/IoTDBOtherUserConsumerIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/user/IoTDBOtherUserConsumerIT.java @@ -102,6 +102,7 @@ private void insert_data(long timestamp) timestamp += row * 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); Thread.sleep(1000); } From 64102f7b5360b5b65895e1c055982495b5674307 Mon Sep 17 00:00:00 2001 From: V_Galaxy Date: Fri, 22 Nov 2024 01:19:52 +0800 Subject: [PATCH 08/14] changes by admin --- .../subscription/it/IoTDBSubscriptionITConstant.java | 2 +- .../subtask/connector/PipeConnectorSubtaskLifeCycle.java | 4 ++-- .../subtask/SubscriptionConnectorSubtaskLifeCycle.java | 8 +------- 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java index 1a7bf4b80014..03cb5f145752 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java @@ -40,5 +40,5 @@ public class IoTDBSubscriptionITConstant { public static final long SLEEP_NS = 1_000_000_000L; public static final long POLL_TIMEOUT_MS = 10_000L; - public static final int MAX_RETRY_TIMES = 3; + public static final int MAX_RETRY_TIMES = 60; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java index 14d4f604c22c..ecbbc641e4b9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java @@ -34,8 +34,8 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable { protected final PipeConnectorSubtask subtask; private final UnboundedBlockingPendingQueue pendingQueue; - private int runningTaskCount; - private int registeredTaskCount; + protected int runningTaskCount; + protected int registeredTaskCount; public PipeConnectorSubtaskLifeCycle( PipeConnectorSubtaskExecutor executor, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java index 57fb2c7004de..359690fa2729 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java @@ -34,17 +34,11 @@ public class SubscriptionConnectorSubtaskLifeCycle extends PipeConnectorSubtaskL private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionConnectorSubtaskLifeCycle.class); - private int runningTaskCount; - private int registeredTaskCount; - public SubscriptionConnectorSubtaskLifeCycle( final PipeConnectorSubtaskExecutor executor, // SubscriptionSubtaskExecutor final PipeConnectorSubtask subtask, // SubscriptionConnectorSubtask final UnboundedBlockingPendingQueue pendingQueue) { super(executor, subtask, pendingQueue); - - runningTaskCount = 0; - registeredTaskCount = 0; } @Override @@ -69,7 +63,7 @@ public synchronized void register() { } @Override - public synchronized boolean deregister(final String ignored, int regionId) { + public synchronized boolean deregister(final String pipeNameToDeregister, int regionId) { if (registeredTaskCount <= 0) { throw new IllegalStateException("registeredTaskCount <= 0"); } From a96c67f64ab318670182ea5514bb907604c41e92 Mon Sep 17 00:00:00 2001 From: V_Galaxy Date: Fri, 22 Nov 2024 13:31:12 +0800 Subject: [PATCH 09/14] fixup --- .../it/IoTDBSubscriptionITConstant.java | 4 ++-- .../IoTDBDefaultTsfilePushConsumerIT.java | 18 ++++++++---------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java index 03cb5f145752..8fb2872bfea8 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java @@ -28,7 +28,7 @@ public class IoTDBSubscriptionITConstant { private static final long AWAITILITY_POLL_DELAY_SECOND = 1L; private static final long AWAITILITY_POLL_INTERVAL_SECOND = 1L; - private static final long AWAITILITY_AT_MOST_SECOND = 900L; + private static final long AWAITILITY_AT_MOST_SECOND = 600L; public static final ConditionFactory AWAIT = Awaitility.await() @@ -40,5 +40,5 @@ public class IoTDBSubscriptionITConstant { public static final long SLEEP_NS = 1_000_000_000L; public static final long POLL_TIMEOUT_MS = 10_000L; - public static final int MAX_RETRY_TIMES = 60; + public static final int MAX_RETRY_TIMES = 3; } diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java index 66be3a5d0f8e..9794ea255e0a 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java @@ -32,7 +32,6 @@ import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.TsFileReader; import org.apache.tsfile.read.common.Path; -import org.apache.tsfile.read.common.RowRecord; import org.apache.tsfile.read.expression.QueryExpression; import org.apache.tsfile.read.query.dataset.QueryDataSet; import org.apache.tsfile.write.record.Tablet; @@ -48,7 +47,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Date; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT; @@ -129,9 +130,9 @@ public void do_test() } session_src.executeNonQueryStatement("flush"); final AtomicInteger onReceiveCount = new AtomicInteger(0); - List rowCounts = new ArrayList<>(deviceCount); + List> rowCounts = new ArrayList<>(deviceCount); for (int i = 0; i < deviceCount; i++) { - rowCounts.add(new AtomicInteger(0)); + rowCounts.add(new HashSet<>()); } consumer = new SubscriptionPushConsumer.Builder() @@ -154,17 +155,14 @@ public void do_test() QueryExpression.create( Collections.singletonList(paths.get(i)), null)); while (dataset.hasNext()) { - rowCounts.get(i).addAndGet(1); - RowRecord next = dataset.next(); - // System.out.println(format.format(new - // Date())+" "+next.getTimestamp()+","+next.getFields()); + rowCounts.get(i).add(dataset.next().getTimestamp()); } System.out.println( FORMAT.format(new Date()) + " rowCounts_" + i + ":" - + rowCounts.get(i).get()); + + rowCounts.get(i).size()); } } catch (IOException e) { throw new RuntimeException(e); @@ -192,7 +190,7 @@ public void do_test() AWAIT.untilAsserted( () -> { for (int i = 0; i < deviceCount; i++) { - assertEquals(rowCounts.get(i).get(), 10, devices.get(i) + ".s_0"); + assertEquals(rowCounts.get(i).size(), 10, devices.get(i) + ".s_0"); } }); // Unsubscribe @@ -217,7 +215,7 @@ public void do_test() AWAIT.untilAsserted( () -> { for (int i = 0; i < deviceCount; i++) { - assertEquals(rowCounts.get(i).get(), 25, devices.get(i) + ".s_0"); + assertEquals(rowCounts.get(i).size(), 25, devices.get(i) + ".s_0"); } }); System.out.println(FORMAT.format(new Date()) + " onReceived: " + onReceiveCount.get()); From f413b39fefef75acf413322988fb26f980ec43c1 Mon Sep 17 00:00:00 2001 From: V_Galaxy Date: Fri, 22 Nov 2024 13:38:03 +0800 Subject: [PATCH 10/14] revert --- .../IoTDBDefaultTsfilePushConsumerIT.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java index 9794ea255e0a..66be3a5d0f8e 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java @@ -32,6 +32,7 @@ import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.TsFileReader; import org.apache.tsfile.read.common.Path; +import org.apache.tsfile.read.common.RowRecord; import org.apache.tsfile.read.expression.QueryExpression; import org.apache.tsfile.read.query.dataset.QueryDataSet; import org.apache.tsfile.write.record.Tablet; @@ -47,9 +48,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Date; -import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT; @@ -130,9 +129,9 @@ public void do_test() } session_src.executeNonQueryStatement("flush"); final AtomicInteger onReceiveCount = new AtomicInteger(0); - List> rowCounts = new ArrayList<>(deviceCount); + List rowCounts = new ArrayList<>(deviceCount); for (int i = 0; i < deviceCount; i++) { - rowCounts.add(new HashSet<>()); + rowCounts.add(new AtomicInteger(0)); } consumer = new SubscriptionPushConsumer.Builder() @@ -155,14 +154,17 @@ public void do_test() QueryExpression.create( Collections.singletonList(paths.get(i)), null)); while (dataset.hasNext()) { - rowCounts.get(i).add(dataset.next().getTimestamp()); + rowCounts.get(i).addAndGet(1); + RowRecord next = dataset.next(); + // System.out.println(format.format(new + // Date())+" "+next.getTimestamp()+","+next.getFields()); } System.out.println( FORMAT.format(new Date()) + " rowCounts_" + i + ":" - + rowCounts.get(i).size()); + + rowCounts.get(i).get()); } } catch (IOException e) { throw new RuntimeException(e); @@ -190,7 +192,7 @@ public void do_test() AWAIT.untilAsserted( () -> { for (int i = 0; i < deviceCount; i++) { - assertEquals(rowCounts.get(i).size(), 10, devices.get(i) + ".s_0"); + assertEquals(rowCounts.get(i).get(), 10, devices.get(i) + ".s_0"); } }); // Unsubscribe @@ -215,7 +217,7 @@ public void do_test() AWAIT.untilAsserted( () -> { for (int i = 0; i < deviceCount; i++) { - assertEquals(rowCounts.get(i).size(), 25, devices.get(i) + ".s_0"); + assertEquals(rowCounts.get(i).get(), 25, devices.get(i) + ".s_0"); } }); System.out.println(FORMAT.format(new Date()) + " onReceived: " + onReceiveCount.get()); From 26f7c7a08d31e16150edf38e04b514b41a43fde7 Mon Sep 17 00:00:00 2001 From: V_Galaxy Date: Fri, 22 Nov 2024 14:15:16 +0800 Subject: [PATCH 11/14] changes by admin --- .../it/IoTDBSubscriptionITConstant.java | 2 +- .../AbstractSubscriptionRegressionIT.java | 9 ++++++--- .../IoTDBDefaultTsfilePushConsumerIT.java | 6 ++++-- ...BConsumer2With1TopicShareProcessTsfileIT.java | 16 +++++++++------- .../IoTDBOneConsumerMultiTopicsTsfileIT.java | 4 +++- 5 files changed, 23 insertions(+), 14 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java index 8fb2872bfea8..030dbf22232c 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java @@ -40,5 +40,5 @@ public class IoTDBSubscriptionITConstant { public static final long SLEEP_NS = 1_000_000_000L; public static final long POLL_TIMEOUT_MS = 10_000L; - public static final int MAX_RETRY_TIMES = 3; + public static final int MAX_RETRY_TIMES = 6; } diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java index 239c30834c57..1d0ecb61569c 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java @@ -294,6 +294,7 @@ public void consume_data(SubscriptionPullConsumer consumer, Session session) List messages = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)); if (messages.isEmpty()) { retryCount++; + session_src.executeNonQueryStatement("flush"); if (retryCount >= MAX_RETRY_TIMES) { break; } @@ -312,17 +313,18 @@ public void consume_data(SubscriptionPullConsumer consumer, Session session) } public List consume_tsfile_withFileCount( - SubscriptionPullConsumer consumer, String device) throws InterruptedException { + SubscriptionPullConsumer consumer, String device) + throws InterruptedException, IoTDBConnectionException, StatementExecutionException { return consume_tsfile(consumer, Collections.singletonList(device)); } public int consume_tsfile(SubscriptionPullConsumer consumer, String device) - throws InterruptedException { + throws InterruptedException, IoTDBConnectionException, StatementExecutionException { return consume_tsfile(consumer, Collections.singletonList(device)).get(0); } public List consume_tsfile(SubscriptionPullConsumer consumer, List devices) - throws InterruptedException { + throws InterruptedException, IoTDBConnectionException, StatementExecutionException { List rowCounts = new ArrayList<>(devices.size()); for (int i = 0; i < devices.size(); i++) { rowCounts.add(new AtomicInteger(0)); @@ -336,6 +338,7 @@ public List consume_tsfile(SubscriptionPullConsumer consumer, List messages = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)); if (messages.isEmpty()) { retryCount++; + session_src.executeNonQueryStatement("flush"); if (retryCount >= MAX_RETRY_TIMES) { break; } diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java index 66be3a5d0f8e..916db8bdbe89 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java @@ -192,7 +192,8 @@ public void do_test() AWAIT.untilAsserted( () -> { for (int i = 0; i < deviceCount; i++) { - assertEquals(rowCounts.get(i).get(), 10, devices.get(i) + ".s_0"); + // NOTE: consider leader change + assertGte(rowCounts.get(i).get(), 10, devices.get(i) + ".s_0"); } }); // Unsubscribe @@ -217,7 +218,8 @@ public void do_test() AWAIT.untilAsserted( () -> { for (int i = 0; i < deviceCount; i++) { - assertEquals(rowCounts.get(i).get(), 25, devices.get(i) + ".s_0"); + // NOTE: consider leader change + assertGte(rowCounts.get(i).get(), 25, devices.get(i) + ".s_0"); } }); System.out.println(FORMAT.format(new Date()) + " onReceived: " + onReceiveCount.get()); diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBConsumer2With1TopicShareProcessTsfileIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBConsumer2With1TopicShareProcessTsfileIT.java index 11f36939f8f4..fab4f43b1381 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBConsumer2With1TopicShareProcessTsfileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBConsumer2With1TopicShareProcessTsfileIT.java @@ -140,11 +140,9 @@ public void do_test() try { insert_data(1706659200000L); Thread.sleep(1000); - } catch (IoTDBConnectionException e) { - throw new RuntimeException(e); - } catch (StatementExecutionException e) { - throw new RuntimeException(e); - } catch (InterruptedException e) { + } catch (IoTDBConnectionException + | StatementExecutionException + | InterruptedException e) { throw new RuntimeException(e); } } @@ -156,7 +154,9 @@ public void do_test() () -> { try { rowCount1.addAndGet(consume_tsfile(consumer, device)); - } catch (InterruptedException e) { + } catch (InterruptedException + | IoTDBConnectionException + | StatementExecutionException e) { throw new RuntimeException(e); } }); @@ -165,7 +165,9 @@ public void do_test() () -> { try { rowCount2.addAndGet(consume_tsfile(consumer2, device)); - } catch (InterruptedException e) { + } catch (InterruptedException + | IoTDBConnectionException + | StatementExecutionException e) { throw new RuntimeException(e); } }); diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java index 8bda5795c97b..9b8c74ebfaff 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java @@ -150,7 +150,9 @@ public void do_test() System.out.println(results); rowCount.addAndGet(results.get(0)); rowCount.addAndGet(results.get(1)); - } catch (InterruptedException e) { + } catch (InterruptedException + | IoTDBConnectionException + | StatementExecutionException e) { throw new RuntimeException(e); } }); From 18dfe6dbf421b1042fc41816f14ff6167a720370 Mon Sep 17 00:00:00 2001 From: V_Galaxy Date: Fri, 22 Nov 2024 22:18:40 +0800 Subject: [PATCH 12/14] intro consume_xxx_await --- .../it/IoTDBSubscriptionITConstant.java | 2 +- .../AbstractSubscriptionRegressionIT.java | 68 +++++++++++++++++++ ...TDBDevicePatternPullConsumerDataSetIT.java | 31 ++++++--- ...MiddleMatchPatternPullConsumeTsfileIT.java | 18 ++--- 4 files changed, 95 insertions(+), 24 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java index 030dbf22232c..8fb2872bfea8 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java @@ -40,5 +40,5 @@ public class IoTDBSubscriptionITConstant { public static final long SLEEP_NS = 1_000_000_000L; public static final long POLL_TIMEOUT_MS = 10_000L; - public static final int MAX_RETRY_TIMES = 6; + public static final int MAX_RETRY_TIMES = 3; } diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java index 1d0ecb61569c..4a4c4b665d03 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java @@ -57,6 +57,7 @@ import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT; import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.MAX_RETRY_TIMES; import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS; @@ -385,6 +386,73 @@ public void consume_data(SubscriptionPullConsumer consumer) consume_data(consumer, session_dest); } + @FunctionalInterface + public interface WrappedVoidSupplier { + void get() throws Throwable; + } + + public void consume_data_await( + SubscriptionPullConsumer consumer, Session session, List assertions) { + AWAIT.untilAsserted( + () -> { + List messages = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)); + if (messages.isEmpty()) { + session_src.executeNonQueryStatement("flush"); + } + for (final SubscriptionMessage message : messages) { + for (final Iterator it = message.getSessionDataSetsHandler().tabletIterator(); + it.hasNext(); ) { + final Tablet tablet = it.next(); + session.insertTablet(tablet); + } + } + if (!consumer.isAutoCommit()) { + consumer.commitSync(messages); + } + for (final WrappedVoidSupplier assertion : assertions) { + assertion.get(); + } + }); + } + + public void consume_tsfile_await( + SubscriptionPullConsumer consumer, List devices, List expected) { + final List counters = new ArrayList<>(devices.size()); + for (int i = 0; i < devices.size(); i++) { + counters.add(new AtomicInteger(0)); + } + AWAIT.untilAsserted( + () -> { + List messages = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)); + if (messages.isEmpty()) { + session_src.executeNonQueryStatement("flush"); + } + for (final SubscriptionMessage message : messages) { + final SubscriptionTsFileHandler tsFileHandler = message.getTsFileHandler(); + try (final TsFileReader tsFileReader = tsFileHandler.openReader()) { + for (int i = 0; i < devices.size(); i++) { + final Path path = new Path(devices.get(i), "s_0", true); + final QueryDataSet dataSet = + tsFileReader.query( + QueryExpression.create(Collections.singletonList(path), null)); + while (dataSet.hasNext()) { + dataSet.next(); + counters.get(i).addAndGet(1); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + if (!consumer.isAutoCommit()) { + consumer.commitSync(messages); + } + for (int i = 0; i < devices.size(); i++) { + assertEquals(counters.get(i).get(), expected.get(i)); + } + }); + } + //////////////////////////// strict assertions //////////////////////////// public static void assertEquals(int actual, int expected) { diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java index f5af56f4f207..14e0c9ffcae4 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java @@ -41,6 +41,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; @RunWith(IoTDBTestRunner.class) @@ -133,13 +134,18 @@ public void do_test() assertEquals(subs.getSubscriptions().size(), 1, "show subscriptions after subscription"); insert_data(System.currentTimeMillis() - 30000L); // Consumption data - consume_data(consumer, session_dest); String sql = "select count(s_0) from " + device; - System.out.println("src: " + getCount(session_src, sql)); - check_count(8, sql, "Consumption data:" + pattern); - check_count(8, "select count(s_1) from " + device, "Consumption data: s_1"); - check_count(0, "select count(s_0) from " + database + ".d_1", "Consumption data:d_1"); - check_count(0, "select count(s_0) from " + device2, "Consumption data:d_2"); + consume_data_await( + consumer, + session_dest, + Collections.singletonList( + () -> { + System.out.println("src: " + getCount(session_src, sql)); + check_count(8, sql, "Consumption data:" + pattern); + check_count(8, "select count(s_1) from " + device, "Consumption data: s_1"); + check_count(0, "select count(s_0) from " + database + ".d_1", "Consumption data:d_1"); + check_count(0, "select count(s_0) from " + device2, "Consumption data:d_2"); + })); insert_data(System.currentTimeMillis()); // Unsubscribe consumer.unsubscribe(topicName); @@ -147,11 +153,16 @@ public void do_test() consumer.subscribe(topicName); assertEquals(subs.getSubscriptions().size(), 1, "show subscriptions after re-subscribing"); insert_data(1707782400000L); // 2024-02-13 08:00:00+08:00 - System.out.println("src: " + getCount(session_src, sql)); // Consumption data: Progress is not retained after unsubscribing and then re-subscribing. Full // synchronization. - consume_data(consumer, session_dest); - check_count(12, "select count(s_0) from " + device, "consume data again:s_0"); - check_count(12, "select count(s_1) from " + device, "Consumption data: s_1"); + consume_data_await( + consumer, + session_dest, + Collections.singletonList( + () -> { + System.out.println("src: " + getCount(session_src, sql)); + check_count(12, "select count(s_0) from " + device, "consume data again:s_0"); + check_count(12, "select count(s_1) from " + device, "Consumption data: s_1"); + })); } } diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java index 6351641276f5..5c9045ac0fc9 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java @@ -41,6 +41,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; /*** @@ -150,28 +151,19 @@ public void do_test() devices.add(device); devices.add(device2); devices.add(database2 + ".d_2"); + consume_tsfile_await(consumer, devices, Arrays.asList(10, 1, 1)); - List rowCounts = consume_tsfile(consumer, devices); - assertEquals(rowCounts.get(0), 10); - assertEquals(rowCounts.get(1), 1); - assertEquals(rowCounts.get(2), 1); // Unsubscribe consumer.unsubscribe(topicName); assertEquals(subs.getSubscriptions().size(), 0, "Show subscriptions after cancellation"); + // Subscribe and then write data consumer.subscribe(topicName); assertEquals(subs.getSubscriptions().size(), 1, "show subscriptions after re-subscribing"); insert_data(1707782400000L); // 2024-02-13 08:00:00+08:00 + // Consumption data: Progress is not retained after canceling and re-subscribing. Full // synchronization. - rowCounts = consume_tsfile(consumer, devices); - - assertEquals( - rowCounts.get(0), - 15, - "Unsubscribe and resubscribe, progress is not retained. Full synchronization."); - assertEquals( - rowCounts.get(1), 1, "Cancel subscription and subscribe again," + database + ".d_1"); - assertEquals(rowCounts.get(2), 1, "Unsubscribe and resubscribe," + database2 + ".d_2"); + consume_tsfile_await(consumer, devices, Arrays.asList(15, 1, 1)); } } From 8c2218e692ce8c38b5fa88d9a57b1c1c0ac07aec Mon Sep 17 00:00:00 2001 From: V_Galaxy Date: Sat, 23 Nov 2024 01:17:58 +0800 Subject: [PATCH 13/14] improve --- ...DBSnapshotDevicePullConsumerDataSetIT.java | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerDataSetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerDataSetIT.java index aa69b2350d30..56d326731121 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerDataSetIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerDataSetIT.java @@ -19,6 +19,7 @@ package org.apache.iotdb.subscription.it.triple.regression.pullconsumer.mode; +import java.util.Collections; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionRegressionConsumer; import org.apache.iotdb.rpc.IoTDBConnectionException; @@ -131,13 +132,14 @@ public void do_test() Thread.sleep(1000); insert_data(System.currentTimeMillis() - 30000L); // Consumption data - consume_data(consumer, session_dest); - String sql = "select count(s_0) from " + device; - System.out.println("src: " + getCount(session_src, sql)); - check_count(4, sql, "Consumption data:" + pattern); - check_count(4, "select count(s_1) from " + device, "Consumption data: s_1"); - check_count(0, "select count(s_0) from " + database + ".d_1", "Consumption Data: d_1"); - check_count(0, "select count(s_0) from " + database2 + ".d_2", "Consumption data:d_2"); + consume_data_await(consumer, session_dest, Collections.singletonList(() -> { + String sql = "select count(s_0) from " + device; + System.out.println("src: " + getCount(session_src, sql)); + check_count(4, sql, "Consumption data:" + pattern); + check_count(4, "select count(s_1) from " + device, "Consumption data: s_1"); + check_count(0, "select count(s_0) from " + database + ".d_1", "Consumption Data: d_1"); + check_count(0, "select count(s_0) from " + database2 + ".d_2", "Consumption data:d_2"); + })); insert_data(System.currentTimeMillis()); // Unsubscribe consumer.unsubscribe(topicName); @@ -146,8 +148,11 @@ public void do_test() assertEquals(subs.getSubscriptions().size(), 1, "show subscriptions after re-subscribing"); // Consumption data: Progress is not retained after unsubscribing and re-subscribing. Full // synchronization. - consume_data(consumer, session_dest); - check_count(8, "select count(s_0) from " + device, "Consume data again:" + pattern); - check_count(8, "select count(s_1) from " + device, "Consumption data: s_1"); + consume_data_await(consumer, session_dest, Collections.singletonList(() -> { + String sql = "select count(s_0) from " + device; + System.out.println("src: " + getCount(session_src, sql)); + check_count(8, "select count(s_0) from " + device, "Consume data again:" + pattern); + check_count(8, "select count(s_1) from " + device, "Consumption data: s_1"); + })); } } From f1c7466269d339c44e25b87cfa7f38252b88d2eb Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Sat, 23 Nov 2024 01:47:42 +0800 Subject: [PATCH 14/14] spotless --- ...DBSnapshotDevicePullConsumerDataSetIT.java | 40 ++++++++++++------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerDataSetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerDataSetIT.java index 56d326731121..49b7fb7c7c16 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerDataSetIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerDataSetIT.java @@ -19,7 +19,6 @@ package org.apache.iotdb.subscription.it.triple.regression.pullconsumer.mode; -import java.util.Collections; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionRegressionConsumer; import org.apache.iotdb.rpc.IoTDBConnectionException; @@ -43,6 +42,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; @RunWith(IoTDBTestRunner.class) @@ -132,14 +132,20 @@ public void do_test() Thread.sleep(1000); insert_data(System.currentTimeMillis() - 30000L); // Consumption data - consume_data_await(consumer, session_dest, Collections.singletonList(() -> { - String sql = "select count(s_0) from " + device; - System.out.println("src: " + getCount(session_src, sql)); - check_count(4, sql, "Consumption data:" + pattern); - check_count(4, "select count(s_1) from " + device, "Consumption data: s_1"); - check_count(0, "select count(s_0) from " + database + ".d_1", "Consumption Data: d_1"); - check_count(0, "select count(s_0) from " + database2 + ".d_2", "Consumption data:d_2"); - })); + consume_data_await( + consumer, + session_dest, + Collections.singletonList( + () -> { + String sql = "select count(s_0) from " + device; + System.out.println("src: " + getCount(session_src, sql)); + check_count(4, sql, "Consumption data:" + pattern); + check_count(4, "select count(s_1) from " + device, "Consumption data: s_1"); + check_count( + 0, "select count(s_0) from " + database + ".d_1", "Consumption Data: d_1"); + check_count( + 0, "select count(s_0) from " + database2 + ".d_2", "Consumption data:d_2"); + })); insert_data(System.currentTimeMillis()); // Unsubscribe consumer.unsubscribe(topicName); @@ -148,11 +154,15 @@ public void do_test() assertEquals(subs.getSubscriptions().size(), 1, "show subscriptions after re-subscribing"); // Consumption data: Progress is not retained after unsubscribing and re-subscribing. Full // synchronization. - consume_data_await(consumer, session_dest, Collections.singletonList(() -> { - String sql = "select count(s_0) from " + device; - System.out.println("src: " + getCount(session_src, sql)); - check_count(8, "select count(s_0) from " + device, "Consume data again:" + pattern); - check_count(8, "select count(s_1) from " + device, "Consumption data: s_1"); - })); + consume_data_await( + consumer, + session_dest, + Collections.singletonList( + () -> { + String sql = "select count(s_0) from " + device; + System.out.println("src: " + getCount(session_src, sql)); + check_count(8, "select count(s_0) from " + device, "Consume data again:" + pattern); + check_count(8, "select count(s_1) from " + device, "Consumption data: s_1"); + })); } }