From a217361aa6a442117ccb3583b9754a20223a7b3d Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Sat, 11 May 2024 08:27:29 -0500 Subject: [PATCH] Checkpoint records at an interval for TPS case when AckSet is enabled (#4526) * Checkpoint records at an interval for TPS case when AckSet is enabled Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> * Fix regular checkpoint interval with ack received Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --------- Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../stream/StreamAcknowledgementManager.java | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java index 8e572cae7d..ca8c51be0c 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java @@ -19,6 +19,7 @@ public class StreamAcknowledgementManager { private static final Logger LOG = LoggerFactory.getLogger(StreamAcknowledgementManager.class); + private static final int CHECKPOINT_RECORD_INTERVAL = 50; private final ConcurrentLinkedQueue checkpoints = new ConcurrentLinkedQueue<>(); private final ConcurrentHashMap ackStatus = new ConcurrentHashMap<>(); private final AcknowledgementSetManager acknowledgementSetManager; @@ -54,14 +55,22 @@ private void monitorAcknowledgment(final ExecutorService executorService, final CheckpointStatus lastCheckpointStatus = null; while (!Thread.currentThread().isInterrupted()) { try { - final CheckpointStatus checkpointStatus = checkpoints.peek(); + 
CheckpointStatus checkpointStatus = checkpoints.peek(); if (checkpointStatus != null) { if (checkpointStatus.isAcknowledged()) { - lastCheckpointStatus = checkpoints.poll(); - ackStatus.remove(checkpointStatus.getResumeToken()); if (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs) { - LOG.debug("Perform regular checkpointing for resume token {} at record count {}", checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount()); - partitionCheckpoint.checkpoint(checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount()); + long ackCount = 0; + do { + lastCheckpointStatus = checkpoints.poll(); + ackStatus.remove(checkpointStatus.getResumeToken()); + checkpointStatus = checkpoints.peek(); + ackCount++; + // at high TPS each ack contains 100 records. This should checkpoint every 100*50 = 5000 records. + if (ackCount % CHECKPOINT_RECORD_INTERVAL == 0) { + checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount()); + } + } while (checkpointStatus != null && checkpointStatus.isAcknowledged()); + checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount()); lastCheckpointTime = System.currentTimeMillis(); } } else { @@ -100,6 +109,11 @@ private void monitorAcknowledgment(final ExecutorService executorService, final executorService.shutdown(); } + private void checkpoint(final String resumeToken, final long recordCount) { + LOG.debug("Perform regular checkpointing for resume token {} at record count {}", resumeToken, recordCount); + partitionCheckpoint.checkpoint(resumeToken, recordCount); + } + Optional createAcknowledgementSet(final String resumeToken, final long recordNumber) { if (!enableAcknowledgement) { return Optional.empty();