Skip to content

Commit

Permalink
Optimize pendingWrites
Browse files Browse the repository at this point in the history
Reduce contention on pendingWrites by using a ConcurrentHashMap instead.
  • Loading branch information
spkrka committed Mar 19, 2019
1 parent a083e13 commit edad24e
Showing 1 changed file with 7 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,9 @@
import com.google.logging.v2.WriteLogEntriesResponse;
import com.google.protobuf.Empty;
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -83,9 +81,7 @@ class LoggingImpl extends BaseService<LoggingOptions> implements Logging {

private static final int FLUSH_WAIT_TIMEOUT_SECONDS = 6;
private final LoggingRpc rpc;
private final Object writeLock = new Object();
private final Set<ApiFuture<Void>> pendingWrites =
Collections.newSetFromMap(new IdentityHashMap<ApiFuture<Void>, Boolean>());
private final Map<Object, ApiFuture<Void>> pendingWrites = new ConcurrentHashMap<>();

private volatile Synchronicity writeSynchronicity = Synchronicity.ASYNC;
private volatile Severity flushSeverity = null;
Expand Down Expand Up @@ -575,9 +571,7 @@ public void flush() {
// BUG(1795): We should force batcher to issue RPC call for buffered messages,
// so the code below doesn't wait uselessly.
ArrayList<ApiFuture<Void>> writesToFlush = new ArrayList<>();
synchronized (writeLock) {
writesToFlush.addAll(pendingWrites);
}
writesToFlush.addAll(pendingWrites.values());

try {
ApiFutures.allAsList(writesToFlush).get(FLUSH_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
Expand All @@ -596,16 +590,13 @@ private void writeLogEntries(Iterable<LogEntry> logEntries, WriteOption... write
case ASYNC:
default:
final ApiFuture<Void> writeFuture = writeAsync(logEntries, writeOptions);
synchronized (writeLock) {
pendingWrites.add(writeFuture);
}
final Object pendingKey = new Object();
pendingWrites.put(pendingKey, writeFuture);
ApiFutures.addCallback(
writeFuture,
new ApiFutureCallback<Void>() {
private void removeFromPending() {
synchronized (writeLock) {
pendingWrites.remove(writeFuture);
}
pendingWrites.remove(pendingKey);
}

@Override
Expand Down Expand Up @@ -711,8 +702,6 @@ public void close() throws Exception {

@VisibleForTesting
int getNumPendingWrites() {
synchronized (writeLock) {
return pendingWrites.size();
}
return pendingWrites.size();
}
}

0 comments on commit edad24e

Please sign in to comment.