From bd8d2cecdfb56b35d41a86697eda279e34f19bcf Mon Sep 17 00:00:00 2001 From: Kristofer Karlsson Date: Tue, 19 Mar 2019 17:26:43 +0100 Subject: [PATCH] Optimize pendingWrites (#4697) Reduce contention on pendingWrites by using a ConcurrentHashMap instead. --- .../com/google/cloud/logging/LoggingImpl.java | 25 ++++++------------- 1 file changed, 7 insertions(+), 18 deletions(-) diff --git a/google-cloud-clients/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingImpl.java b/google-cloud-clients/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingImpl.java index 32cc296f29d9..d10c69012b0f 100644 --- a/google-cloud-clients/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingImpl.java +++ b/google-cloud-clients/google-cloud-logging/src/main/java/com/google/cloud/logging/LoggingImpl.java @@ -70,11 +70,9 @@ import com.google.logging.v2.WriteLogEntriesResponse; import com.google.protobuf.Empty; import java.util.ArrayList; -import java.util.Collections; -import java.util.IdentityHashMap; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -83,9 +81,7 @@ class LoggingImpl extends BaseService implements Logging { private static final int FLUSH_WAIT_TIMEOUT_SECONDS = 6; private final LoggingRpc rpc; - private final Object writeLock = new Object(); - private final Set> pendingWrites = - Collections.newSetFromMap(new IdentityHashMap, Boolean>()); + private final Map> pendingWrites = new ConcurrentHashMap<>(); private volatile Synchronicity writeSynchronicity = Synchronicity.ASYNC; private volatile Severity flushSeverity = null; @@ -575,9 +571,7 @@ public void flush() { // BUG(1795): We should force batcher to issue RPC call for buffered messages, // so the code below doesn't wait uselessly. ArrayList> writesToFlush = new ArrayList<>(); - synchronized (writeLock) { - writesToFlush.addAll(pendingWrites); - } + writesToFlush.addAll(pendingWrites.values()); try { ApiFutures.allAsList(writesToFlush).get(FLUSH_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); @@ -596,16 +590,13 @@ private void writeLogEntries(Iterable logEntries, WriteOption... write case ASYNC: default: final ApiFuture writeFuture = writeAsync(logEntries, writeOptions); - synchronized (writeLock) { - pendingWrites.add(writeFuture); - } + final Object pendingKey = new Object(); + pendingWrites.put(pendingKey, writeFuture); ApiFutures.addCallback( writeFuture, new ApiFutureCallback() { private void removeFromPending() { - synchronized (writeLock) { - pendingWrites.remove(writeFuture); - } + pendingWrites.remove(pendingKey); } @Override @@ -711,8 +702,6 @@ public void close() throws Exception { @VisibleForTesting int getNumPendingWrites() { - synchronized (writeLock) { - return pendingWrites.size(); - } + return pendingWrites.size(); } }