Skip to content

Commit

Permalink
IoTConsensusV2: Fix reqBuffer reset and receive concurrent bug (#14215)
Browse files Browse the repository at this point in the history
* fix reqBuffer reset and receive concurrent bug

* fix review
  • Loading branch information
Pengzna authored Nov 28, 2024
1 parent 0d8a0d1 commit 0bb51c6
Showing 1 changed file with 23 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1391,11 +1391,11 @@ private TPipeConsensusTransferResp onRequest(
// Judge whether connector has rebooted or not, if the rebootTimes increases compared to
// connectorRebootTimes, need to reset receiver because connector has been restarted.
if (tCommitId.getDataNodeRebootTimes() > connectorRebootTimes) {
resetWithNewestRebootTime(tCommitId.getDataNodeRebootTimes());
resetWithNewestRebootTime(tCommitId.getDataNodeRebootTimes(), condition);
}
// Similarly, check pipeTask restartTimes
if (tCommitId.getPipeTaskRestartTimes() > pipeTaskRestartTimes) {
resetWithNewestRestartTime(tCommitId.getPipeTaskRestartTimes());
resetWithNewestRestartTime(tCommitId.getPipeTaskRestartTimes(), condition);
}
// update metric
if (isTransferTsFilePiece && !reqExecutionOrderBuffer.contains(requestMeta)) {
Expand Down Expand Up @@ -1471,6 +1471,21 @@ private TPipeConsensusTransferResp onRequest(
!condition.await(
PIPE_CONSENSUS_RECEIVER_MAX_WAITING_TIME_IN_MS, TimeUnit.MILLISECONDS);

// If some reqs find the buffer no longer contains their requestMeta after jumping out
// from condition.await, it may indicate that during their wait, some reqs with newer
// pipeTaskStartTimes or rebootTimes came in and refreshed the requestBuffer. In that
// cases we need to discard these requests.
if (!reqExecutionOrderBuffer.contains(requestMeta)) {
final TSStatus status =
new TSStatus(
RpcUtils.getStatus(
TSStatusCode.PIPE_CONSENSUS_DEPRECATED_REQUEST,
"PipeConsensus receiver received a deprecated request, which may be sent before the connector restart or pipe task restart. Consider to discard it"));
LOGGER.info(
"PipeConsensus-PipeName-{}: received a deprecated request, which may be sent before the connector restart or pipe task restart. Consider to discard it",
consensusPipeName);
return new TPipeConsensusTransferResp(status);
}
// If the buffer is not full after waiting timeout, we suppose that the sender will
// not send any more events at this time, that is, the sender has sent all events. At
// this point we apply the event at reqBuffer's peek
Expand Down Expand Up @@ -1515,26 +1530,30 @@ private TPipeConsensusTransferResp onRequest(
* Reset all data to initial status and set connectorRebootTimes properly. This method is called
* when receiver identifies connector has rebooted.
*/
private void resetWithNewestRebootTime(int connectorRebootTimes) {
private void resetWithNewestRebootTime(int connectorRebootTimes, Condition condition) {
LOGGER.info(
"PipeConsensus-PipeName-{}: receiver detected an newer rebootTimes, which indicates the leader has rebooted. receiver will reset all its data.",
consensusPipeName);
// since pipe task will resend all data that hasn't synchronized after dataNode reboots, it's
// safe to clear all events in buffer.
clear();
// signal all deprecated requests that may wait on condition to expire them
condition.signalAll();
// sync the follower's connectorRebootTimes with connector's actual rebootTimes.
this.connectorRebootTimes = connectorRebootTimes;
// Note: dataNode rebooting will reset pipeTaskRestartTimes.
this.pipeTaskRestartTimes = 0;
}

private void resetWithNewestRestartTime(int pipeTaskRestartTimes) {
private void resetWithNewestRestartTime(int pipeTaskRestartTimes, Condition condition) {
LOGGER.info(
"PipeConsensus-PipeName-{}: receiver detected an newer pipeTaskRestartTimes, which indicates the pipe task has restarted. receiver will reset all its data.",
consensusPipeName);
// since pipe task will resend all data that hasn't synchronized after restarts, it's safe to
// clear all events in buffer.
clear();
// signal all deprecated requests that may wait on condition to expire them
condition.signalAll();
this.pipeTaskRestartTimes = pipeTaskRestartTimes;
}

Expand Down

0 comments on commit 0bb51c6

Please sign in to comment.