From d16212627b0d9b6616e0a9b20af2c430e2f6b36f Mon Sep 17 00:00:00 2001 From: maitrimangal <121899734+maitrimangal@users.noreply.github.com> Date: Mon, 6 Nov 2023 15:13:47 -0500 Subject: [PATCH] fix: concurrent modification of processing receievd messages (#1807) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: concurrent modification of processing receievd messages * Removing synchronized keyword, and making outstandingReceipts into a concurrentMap * Removing synchronized keyword for notifyAckSuccess and failure as well * fixing lint * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot --- .../com/google/cloud/pubsub/v1/MessageDispatcher.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 9556849bb..b257594ea 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -31,7 +31,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -92,8 +91,8 @@ class MessageDispatcher { private final LinkedBlockingQueue pendingAcks = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue pendingNacks = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue pendingReceipts = new LinkedBlockingQueue<>(); - private final LinkedHashMap outstandingReceipts = - new LinkedHashMap(); + private final ConcurrentMap outstandingReceipts = + new ConcurrentHashMap(); private final AtomicInteger messageDeadlineSeconds = new AtomicInteger(); private final AtomicBoolean extendDeadline = new AtomicBoolean(true); private final Lock jobLock; @@ -411,7 +410,7 @@ void processReceivedMessages(List messages) { processBatch(outstandingBatch); } - synchronized void notifyAckSuccess(AckRequestData ackRequestData) { + void notifyAckSuccess(AckRequestData ackRequestData) { if (outstandingReceipts.containsKey(ackRequestData.getAckId())) { outstandingReceipts.get(ackRequestData.getAckId()).notifyReceiptComplete(); @@ -437,7 +436,7 @@ synchronized void notifyAckSuccess(AckRequestData ackRequestData) { } } - synchronized void notifyAckFailed(AckRequestData ackRequestData) { + void notifyAckFailed(AckRequestData ackRequestData) { outstandingReceipts.remove(ackRequestData.getAckId()); }