Skip to content

Commit

Permalink
IoTConsensusV2: Fix syncLag error when using stream mode and 2+ replic…
Browse files Browse the repository at this point in the history
…as (#14219)
  • Loading branch information
Pengzna authored Nov 28, 2024
1 parent f15c30b commit 0d8a0d1
Showing 1 changed file with 39 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@

import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeConnector;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;

/**
* This class is used to aggregate the write progress of all Connectors to calculate the minimum
Expand All @@ -33,66 +34,55 @@
* <p>Note: every consensusGroup/dataRegion has and only has 1 instance of this class.
*/
public class PipeConsensusSyncLagManager {
long userWriteProgress = 0;
long minReplicateProgress = Long.MAX_VALUE;
List<ConsensusPipeConnector> consensusPipeConnectorList = new CopyOnWriteArrayList<>();

private void updateReplicateProgress() {
minReplicateProgress = Long.MAX_VALUE;
// if there isn't a consensus pipe task, replicate progress is Long.MAX_VALUE.
if (consensusPipeConnectorList.isEmpty()) {
return;
}
// else we find the minimum progress in all consensus pipe task.
consensusPipeConnectorList.forEach(
consensusPipeConnector ->
minReplicateProgress =
Math.min(
minReplicateProgress,
consensusPipeConnector.getConsensusPipeReplicateProgress()));
long syncLag = Long.MIN_VALUE;
ReentrantLock lock = new ReentrantLock();
List<ConsensusPipeConnector> consensusPipeConnectorList = new ArrayList<>();

/**
* Computes the sync lag of one consensus pipe from the two progress values its connector reports.
*
* @param consensusPipeConnector connector whose commit progress and replicate progress are read
* @return commit progress minus replicate progress, or 0 when that difference is negative
*/
private long getSyncLagForSpecificConsensusPipe(ConsensusPipeConnector consensusPipeConnector) {
long userWriteProgress = consensusPipeConnector.getConsensusPipeCommitProgress();
long replicateProgress = consensusPipeConnector.getConsensusPipeReplicateProgress();
// Clamp at 0 so the lag is never reported as negative.
return Math.max(userWriteProgress - replicateProgress, 0);
}

private void updateUserWriteProgress() {
// if there isn't a consensus pipe task, user write progress is 0.
if (consensusPipeConnectorList.isEmpty()) {
userWriteProgress = 0;
return;
}
// since the user write progress of different consensus pipes on the same DataRegion is the
// same, we only need to take out one Connector to calculate
public void addConsensusPipeConnector(ConsensusPipeConnector consensusPipeConnector) {
try {
ConsensusPipeConnector connector = consensusPipeConnectorList.get(0);
userWriteProgress = connector.getConsensusPipeCommitProgress();
} catch (Exception e) {
// if removing the last connector happens after empty check, we may encounter
// OutOfBoundsException, in this case, we set userWriteProgress to 0.
userWriteProgress = 0;
lock.lock();
consensusPipeConnectorList.add(consensusPipeConnector);
} finally {
lock.unlock();
}
}

public void addConsensusPipeConnector(ConsensusPipeConnector consensusPipeConnector) {
consensusPipeConnectorList.add(consensusPipeConnector);
}

public void removeConsensusPipeConnector(ConsensusPipeConnector connector) {
consensusPipeConnectorList.remove(connector);
try {
lock.lock();
consensusPipeConnectorList.remove(connector);
} finally {
lock.unlock();
}
}

/**
* SyncLag represents the difference between the current replica users' write progress and the
* minimum synchronization progress of all other replicas. The semantics is how much data the
* leader has left to synchronize.
* SyncLag represents the biggest difference between the current replica users' write progress and
* the synchronization progress of all other replicas. The semantics is how much data the leader
* has left to synchronize.
*/
public long calculateSyncLag() {
updateUserWriteProgress();
updateReplicateProgress();
// if there isn't a consensus pipe task, the syncLag is userWriteProgress - 0
if (minReplicateProgress == Long.MAX_VALUE) {
return userWriteProgress;
} else {
// since we first update userWriteProgress then update replicateProgress, there may have some
// cases that userWriteProgress is less than replicateProgress. In these cases, we return 0.
return Math.max(userWriteProgress - minReplicateProgress, 0);
try {
lock.lock();
// if there isn't a consensus pipe task, the syncLag is 0
if (consensusPipeConnectorList.isEmpty()) {
return 0;
}
// else we find the biggest gap between leader and replicas in all consensus pipe task.
syncLag = Long.MIN_VALUE;
consensusPipeConnectorList.forEach(
consensusPipeConnector ->
syncLag =
Math.max(syncLag, getSyncLagForSpecificConsensusPipe(consensusPipeConnector)));
return syncLag;
} finally {
lock.unlock();
}
}

Expand Down

0 comments on commit 0d8a0d1

Please sign in to comment.