Skip to content

Commit

Permalink
Fix visibility timeout errors (#4812) (#4831) (#4866)
Browse files Browse the repository at this point in the history
Fix visibility timeout errors (#4812)

Signed-off-by: Daniel Li <[email protected]>
(cherry picked from commit 910533a)

Co-authored-by: Daniel Li <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] and danhli authored Aug 23, 2024
1 parent 476b9a8 commit 9a8d663
Showing 1 changed file with 16 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ private ReceiveMessageRequest createReceiveMessageRequest() {
private List<DeleteMessageBatchRequestEntry> processS3EventNotificationRecords(final Collection<ParsedMessage> s3EventNotificationRecords) {
final List<DeleteMessageBatchRequestEntry> deleteMessageBatchRequestEntryCollection = new ArrayList<>();
final List<ParsedMessage> parsedMessagesToRead = new ArrayList<>();
final Map<ParsedMessage, AcknowledgementSet> messageAcknowledgementSetMap = new HashMap<>();
final Map<ParsedMessage, List<DeleteMessageBatchRequestEntry>> messageWaitingForAcknowledgementsMap = new HashMap<>();

for (ParsedMessage parsedMessage : s3EventNotificationRecords) {
if (parsedMessage.isFailedParsing()) {
Expand Down Expand Up @@ -224,7 +226,7 @@ && isEventBridgeEventTypeCreated(parsedMessage)) {
}

LOG.info("Received {} messages from SQS. Processing {} messages.", s3EventNotificationRecords.size(), parsedMessagesToRead.size());

for (ParsedMessage parsedMessage : parsedMessagesToRead) {
List<DeleteMessageBatchRequestEntry> waitingForAcknowledgements = new ArrayList<>();
AcknowledgementSet acknowledgementSet = null;
Expand Down Expand Up @@ -262,7 +264,19 @@ && isEventBridgeEventTypeCreated(parsedMessage)) {
},
Duration.ofSeconds(progressCheckInterval));
}
messageAcknowledgementSetMap.put(parsedMessage, acknowledgementSet);
messageWaitingForAcknowledgementsMap.put(parsedMessage, waitingForAcknowledgements);
}
}

if (endToEndAcknowledgementsEnabled) {
LOG.debug("Created acknowledgement sets for {} messages.", parsedMessagesToRead.size());
}

// Use a separate loop for processing the S3 objects
for (ParsedMessage parsedMessage : parsedMessagesToRead) {
final AcknowledgementSet acknowledgementSet = messageAcknowledgementSetMap.get(parsedMessage);
final List<DeleteMessageBatchRequestEntry> waitingForAcknowledgements = messageWaitingForAcknowledgementsMap.get(parsedMessage);
final S3ObjectReference s3ObjectReference = populateS3Reference(parsedMessage.getBucketName(), parsedMessage.getObjectKey());
final Optional<DeleteMessageBatchRequestEntry> deleteMessageBatchRequestEntry = processS3Object(parsedMessage, s3ObjectReference, acknowledgementSet);
if (endToEndAcknowledgementsEnabled) {
Expand All @@ -271,7 +285,7 @@ && isEventBridgeEventTypeCreated(parsedMessage)) {
} else {
deleteMessageBatchRequestEntry.ifPresent(deleteMessageBatchRequestEntryCollection::add);
}
}
}

return deleteMessageBatchRequestEntryCollection;
}
Expand Down

0 comments on commit 9a8d663

Please sign in to comment.