Skip to content

Commit

Permalink
fix: concurrent modification of processing receievd messages (#1807)
Browse files Browse the repository at this point in the history
* fix: concurrent modification of processing receievd messages

* Removing synchronized keyword, and making outstandingReceipts into a concurrentMap

* Removing synchronized keyword for notifyAckSuccess and failure as well

* fixing lint

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
maitrimangal and gcf-owl-bot[bot] authored Nov 6, 2023
1 parent e179b94 commit d162126
Showing 1 changed file with 4 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -92,8 +91,8 @@ class MessageDispatcher {
private final LinkedBlockingQueue<AckRequestData> pendingAcks = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<AckRequestData> pendingNacks = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<AckRequestData> pendingReceipts = new LinkedBlockingQueue<>();
private final LinkedHashMap<String, ReceiptCompleteData> outstandingReceipts =
new LinkedHashMap<String, ReceiptCompleteData>();
private final ConcurrentMap<String, ReceiptCompleteData> outstandingReceipts =
new ConcurrentHashMap<String, ReceiptCompleteData>();
private final AtomicInteger messageDeadlineSeconds = new AtomicInteger();
private final AtomicBoolean extendDeadline = new AtomicBoolean(true);
private final Lock jobLock;
Expand Down Expand Up @@ -411,7 +410,7 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
processBatch(outstandingBatch);
}

synchronized void notifyAckSuccess(AckRequestData ackRequestData) {
void notifyAckSuccess(AckRequestData ackRequestData) {

if (outstandingReceipts.containsKey(ackRequestData.getAckId())) {
outstandingReceipts.get(ackRequestData.getAckId()).notifyReceiptComplete();
Expand All @@ -437,7 +436,7 @@ synchronized void notifyAckSuccess(AckRequestData ackRequestData) {
}
}

synchronized void notifyAckFailed(AckRequestData ackRequestData) {
void notifyAckFailed(AckRequestData ackRequestData) {
outstandingReceipts.remove(ackRequestData.getAckId());
}

Expand Down

0 comments on commit d162126

Please sign in to comment.