diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 9556849bb..b257594ea 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -31,7 +31,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -92,8 +91,8 @@ class MessageDispatcher { private final LinkedBlockingQueue pendingAcks = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue pendingNacks = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue pendingReceipts = new LinkedBlockingQueue<>(); - private final LinkedHashMap outstandingReceipts = - new LinkedHashMap(); + private final ConcurrentMap outstandingReceipts = + new ConcurrentHashMap(); private final AtomicInteger messageDeadlineSeconds = new AtomicInteger(); private final AtomicBoolean extendDeadline = new AtomicBoolean(true); private final Lock jobLock; @@ -411,7 +410,7 @@ void processReceivedMessages(List messages) { processBatch(outstandingBatch); } - synchronized void notifyAckSuccess(AckRequestData ackRequestData) { + void notifyAckSuccess(AckRequestData ackRequestData) { if (outstandingReceipts.containsKey(ackRequestData.getAckId())) { outstandingReceipts.get(ackRequestData.getAckId()).notifyReceiptComplete(); @@ -437,7 +436,7 @@ synchronized void notifyAckSuccess(AckRequestData ackRequestData) { } } - synchronized void notifyAckFailed(AckRequestData ackRequestData) { + void notifyAckFailed(AckRequestData ackRequestData) { outstandingReceipts.remove(ackRequestData.getAckId()); }