diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java index 20164ff2293c..32ead7587e37 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java @@ -1391,11 +1391,11 @@ private TPipeConsensusTransferResp onRequest( // Judge whether connector has rebooted or not, if the rebootTimes increases compared to // connectorRebootTimes, need to reset receiver because connector has been restarted. if (tCommitId.getDataNodeRebootTimes() > connectorRebootTimes) { - resetWithNewestRebootTime(tCommitId.getDataNodeRebootTimes()); + resetWithNewestRebootTime(tCommitId.getDataNodeRebootTimes(), condition); } // Similarly, check pipeTask restartTimes if (tCommitId.getPipeTaskRestartTimes() > pipeTaskRestartTimes) { - resetWithNewestRestartTime(tCommitId.getPipeTaskRestartTimes()); + resetWithNewestRestartTime(tCommitId.getPipeTaskRestartTimes(), condition); } // update metric if (isTransferTsFilePiece && !reqExecutionOrderBuffer.contains(requestMeta)) { @@ -1471,6 +1471,21 @@ private TPipeConsensusTransferResp onRequest( !condition.await( PIPE_CONSENSUS_RECEIVER_MAX_WAITING_TIME_IN_MS, TimeUnit.MILLISECONDS); + // If some reqs find the buffer no longer contains their requestMeta after jumping out + // from condition.await, it may indicate that during their wait, some reqs with newer + // pipeTaskStartTimes or rebootTimes came in and refreshed the requestBuffer. In that + // cases we need to discard these requests. + if (!reqExecutionOrderBuffer.contains(requestMeta)) { + final TSStatus status = + new TSStatus( + RpcUtils.getStatus( + TSStatusCode.PIPE_CONSENSUS_DEPRECATED_REQUEST, + "PipeConsensus receiver received a deprecated request, which may be sent before the connector restart or pipe task restart. Consider to discard it")); + LOGGER.info( + "PipeConsensus-PipeName-{}: received a deprecated request, which may be sent before the connector restart or pipe task restart. Consider to discard it", + consensusPipeName); + return new TPipeConsensusTransferResp(status); + } // If the buffer is not full after waiting timeout, we suppose that the sender will // not send any more events at this time, that is, the sender has sent all events. At // this point we apply the event at reqBuffer's peek @@ -1515,26 +1530,30 @@ private TPipeConsensusTransferResp onRequest( * Reset all data to initial status and set connectorRebootTimes properly. This method is called * when receiver identifies connector has rebooted. */ - private void resetWithNewestRebootTime(int connectorRebootTimes) { + private void resetWithNewestRebootTime(int connectorRebootTimes, Condition condition) { LOGGER.info( "PipeConsensus-PipeName-{}: receiver detected an newer rebootTimes, which indicates the leader has rebooted. receiver will reset all its data.", consensusPipeName); // since pipe task will resend all data that hasn't synchronized after dataNode reboots, it's // safe to clear all events in buffer. clear(); + // signal all deprecated requests that may wait on condition to expire them + condition.signalAll(); // sync the follower's connectorRebootTimes with connector's actual rebootTimes. this.connectorRebootTimes = connectorRebootTimes; // Note: dataNode rebooting will reset pipeTaskRestartTimes. this.pipeTaskRestartTimes = 0; } - private void resetWithNewestRestartTime(int pipeTaskRestartTimes) { + private void resetWithNewestRestartTime(int pipeTaskRestartTimes, Condition condition) { LOGGER.info( "PipeConsensus-PipeName-{}: receiver detected an newer pipeTaskRestartTimes, which indicates the pipe task has restarted. receiver will reset all its data.", consensusPipeName); // since pipe task will resend all data that hasn't synchronized after restarts, it's safe to // clear all events in buffer. clear(); + // signal all deprecated requests that may wait on condition to expire them + condition.signalAll(); this.pipeTaskRestartTimes = pipeTaskRestartTimes; }