Skip to content

Commit

Permalink
Checkpoint records at an interval for TPS case when AckSet is enabled (
Browse files Browse the repository at this point in the history
…#4526)

* Checkpoint records at an interval for TPS case when AckSet is enabled

Signed-off-by: Dinu John <[email protected]>

* Fix regular checkpoint internal with ack received

Signed-off-by: Dinu John <[email protected]>

---------

Signed-off-by: Dinu John <[email protected]>
  • Loading branch information
dinujoh authored May 11, 2024
1 parent 310e8be commit a217361
Showing 1 changed file with 19 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

public class StreamAcknowledgementManager {
private static final Logger LOG = LoggerFactory.getLogger(StreamAcknowledgementManager.class);
private static final int CHECKPOINT_RECORD_INTERVAL = 50;
private final ConcurrentLinkedQueue<CheckpointStatus> checkpoints = new ConcurrentLinkedQueue<>();
private final ConcurrentHashMap<String, CheckpointStatus> ackStatus = new ConcurrentHashMap<>();
private final AcknowledgementSetManager acknowledgementSetManager;
Expand Down Expand Up @@ -54,14 +55,22 @@ private void monitorAcknowledgment(final ExecutorService executorService, final
CheckpointStatus lastCheckpointStatus = null;
while (!Thread.currentThread().isInterrupted()) {
try {
final CheckpointStatus checkpointStatus = checkpoints.peek();
CheckpointStatus checkpointStatus = checkpoints.peek();
if (checkpointStatus != null) {
if (checkpointStatus.isAcknowledged()) {
lastCheckpointStatus = checkpoints.poll();
ackStatus.remove(checkpointStatus.getResumeToken());
if (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs) {
LOG.debug("Perform regular checkpointing for resume token {} at record count {}", checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount());
partitionCheckpoint.checkpoint(checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount());
long ackCount = 0;
do {
lastCheckpointStatus = checkpoints.poll();
ackStatus.remove(checkpointStatus.getResumeToken());
checkpointStatus = checkpoints.peek();
ackCount++;
// at high TPS each ack contains 100 records. This should checkpoint every 100*50 = 5000 records.
if (ackCount % CHECKPOINT_RECORD_INTERVAL == 0) {
checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount());
}
} while (checkpointStatus != null && checkpointStatus.isAcknowledged());
checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount());
lastCheckpointTime = System.currentTimeMillis();
}
} else {
Expand Down Expand Up @@ -100,6 +109,11 @@ private void monitorAcknowledgment(final ExecutorService executorService, final
executorService.shutdown();
}

private void checkpoint(final String resumeToken, final long recordCount) {
LOG.debug("Perform regular checkpointing for resume token {} at record count {}", resumeToken, recordCount);
partitionCheckpoint.checkpoint(resumeToken, recordCount);
}

Optional<AcknowledgementSet> createAcknowledgementSet(final String resumeToken, final long recordNumber) {
if (!enableAcknowledgement) {
return Optional.empty();
Expand Down

0 comments on commit a217361

Please sign in to comment.