Skip to content

Commit

Permalink
Introduce and use Cluster.withLock to achieve cluster-wide locking (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
stIncMale authored Feb 7, 2022
1 parent 8854c2c commit 47c99b5
Show file tree
Hide file tree
Showing 15 changed files with 126 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,16 @@ MongoException getSrvResolutionException() {

protected void initialize(final Collection<ServerAddress> serverAddresses) {
ClusterDescription currentDescription = getCurrentDescription();
ClusterDescription newDescription;

// synchronizing this code because addServer registers a callback which is re-entrant to this instance.
// In other words, we are leaking a reference to "this" from the constructor.
synchronized (this) {
withLock(() -> {
for (final ServerAddress serverAddress : serverAddresses) {
addServer(serverAddress);
}
newDescription = updateDescription();
ClusterDescription newDescription = updateDescription();
fireChangeEvent(newDescription, currentDescription);
}
});
}

@Override
Expand All @@ -111,14 +110,14 @@ protected void connect() {

@Override
public void close() {
synchronized (this) {
withLock(() -> {
if (!isClosed()) {
for (final ServerTuple serverTuple : addressToServerTupleMap.values()) {
serverTuple.server.close();
}
}
super.close();
}
});
}

@Override
Expand All @@ -141,7 +140,7 @@ public void serverDescriptionChanged(final ServerDescriptionChangedEvent event)
}

void onChange(final Collection<ServerAddress> newHosts) {
synchronized (this) {
withLock(() -> {
if (isClosed()) {
return;
}
Expand All @@ -165,14 +164,11 @@ void onChange(final Collection<ServerAddress> newHosts) {
ClusterDescription newClusterDescription = updateDescription();

fireChangeEvent(newClusterDescription, oldClusterDescription);
}
});
}

private void onChange(final ServerDescriptionChangedEvent event) {
ClusterDescription oldClusterDescription = null;
ClusterDescription newClusterDescription = null;
boolean shouldUpdateDescription = true;
synchronized (this) {
withLock(() -> {
if (isClosed()) {
return;
}
Expand All @@ -193,6 +189,7 @@ private void onChange(final ServerDescriptionChangedEvent event) {
return;
}

boolean shouldUpdateDescription = true;
if (newDescription.isOk()) {
if (clusterType == UNKNOWN && newDescription.getType() != REPLICA_SET_GHOST) {
clusterType = newDescription.getClusterType();
Expand All @@ -216,6 +213,8 @@ private void onChange(final ServerDescriptionChangedEvent event) {
}
}

ClusterDescription oldClusterDescription = null;
ClusterDescription newClusterDescription = null;
if (shouldUpdateDescription) {
serverTuple.description = newDescription;
oldClusterDescription = getCurrentDescription();
Expand All @@ -224,7 +223,7 @@ private void onChange(final ServerDescriptionChangedEvent event) {
if (shouldUpdateDescription) {
fireChangeEvent(newClusterDescription, oldClusterDescription);
}
}
});
}

private boolean handleReplicaSetMemberChanged(final ServerDescription newDescription) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,15 @@ public boolean isClosed() {
return isClosed;
}

protected synchronized void updateDescription(final ClusterDescription newDescription) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(format("Updating cluster description to %s", newDescription.getShortDescription()));
}
protected void updateDescription(final ClusterDescription newDescription) {
withLock(() -> {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(format("Updating cluster description to %s", newDescription.getShortDescription()));
}

description = newDescription;
updatePhase();
description = newDescription;
updatePhase();
});
}

/**
Expand All @@ -265,8 +267,13 @@ public ClusterDescription getCurrentDescription() {
return description;
}

private synchronized void updatePhase() {
phase.getAndSet(new CountDownLatch(1)).countDown();
@Override
public synchronized void withLock(final Runnable action) {
action.run();
}

private void updatePhase() {
withLock(() -> phase.getAndSet(new CountDownLatch(1)).countDown());
}

private long getMaxWaitTimeNanos() {
Expand Down Expand Up @@ -387,7 +394,7 @@ private ServerSelector getCompositeServerSelector(final ServerSelector serverSel

protected ClusterableServer createServer(final ServerAddress serverAddress,
final ServerDescriptionChangedListener serverDescriptionChangedListener) {
return serverFactory.create(serverAddress, serverDescriptionChangedListener, clusterClock);
return serverFactory.create(this, serverAddress, serverDescriptionChangedListener, clusterClock);
}

private void throwIfIncompatible(final ClusterDescription curDescription) {
Expand Down Expand Up @@ -456,26 +463,30 @@ long getRemainingTime() {
}
}

private synchronized void notifyWaitQueueHandler(final ServerSelectionRequest request) {
if (isClosed) {
return;
}
private void notifyWaitQueueHandler(final ServerSelectionRequest request) {
withLock(() -> {
if (isClosed) {
return;
}

waitQueue.add(request);
waitQueue.add(request);

if (waitQueueHandler == null) {
waitQueueHandler = new Thread(new WaitQueueHandler(), "cluster-" + clusterId.getValue());
waitQueueHandler.setDaemon(true);
waitQueueHandler.start();
} else {
updatePhase();
}
if (waitQueueHandler == null) {
waitQueueHandler = new Thread(new WaitQueueHandler(), "cluster-" + clusterId.getValue());
waitQueueHandler.setDaemon(true);
waitQueueHandler.start();
} else {
updatePhase();
}
});
}

private synchronized void stopWaitQueueHandler() {
if (waitQueueHandler != null) {
waitQueueHandler.interrupt();
}
private void stopWaitQueueHandler() {
withLock(() -> {
if (waitQueueHandler != null) {
waitQueueHandler.interrupt();
}
});
}

private final class WaitQueueHandler implements Runnable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,11 @@ public interface Cluster extends Closeable {
* @return true if all the servers in this cluster have been closed
*/
boolean isClosed();

/**
* Does the supplied {@code action} while holding a reentrant cluster-wide lock.
*
* @param action The action to {@linkplain Runnable#run() do}.
*/
void withLock(Runnable action);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import com.mongodb.connection.ServerSettings;

public interface ClusterableServerFactory {
ClusterableServer create(ServerAddress serverAddress, ServerDescriptionChangedListener serverDescriptionChangedListener,
ClusterClock clusterClock);
ClusterableServer create(Cluster cluster, ServerAddress serverAddress,
ServerDescriptionChangedListener serverDescriptionChangedListener, ClusterClock clusterClock);

ServerSettings getSettings();
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public DefaultClusterableServerFactory(final ClusterId clusterId, final ClusterS
}

@Override
public ClusterableServer create(final ServerAddress serverAddress,
public ClusterableServer create(final Cluster cluster, final ServerAddress serverAddress,
final ServerDescriptionChangedListener serverDescriptionChangedListener,
final ClusterClock clusterClock) {
ServerId serverId = new ServerId(clusterId, serverAddress);
Expand All @@ -91,7 +91,7 @@ mongoDriverInformation, emptyList(), null, serverApi),
mongoDriverInformation, compressorList, commandListener, serverApi),
connectionPoolSettings, internalConnectionPoolSettings, sdamProvider);
ServerListener serverListener = singleServerListener(serverSettings);
SdamServerDescriptionManager sdam = new DefaultSdamServerDescriptionManager(serverId, serverDescriptionChangedListener,
SdamServerDescriptionManager sdam = new DefaultSdamServerDescriptionManager(cluster, serverId, serverDescriptionChangedListener,
serverListener, serverMonitor, connectionPool, clusterSettings.getMode());
sdamProvider.initialize(sdam);
serverMonitor.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@
import com.mongodb.event.ServerDescriptionChangedEvent;
import com.mongodb.event.ServerListener;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static com.mongodb.assertions.Assertions.assertFalse;
import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.assertTrue;
Expand All @@ -36,34 +33,34 @@

@ThreadSafe
final class DefaultSdamServerDescriptionManager implements SdamServerDescriptionManager {
private final Cluster cluster;
private final ServerId serverId;
private final ServerDescriptionChangedListener serverDescriptionChangedListener;
private final ServerListener serverListener;
private final ServerMonitor serverMonitor;
private final ConnectionPool connectionPool;
private final ClusterConnectionMode connectionMode;
private final Lock lock;
private volatile ServerDescription description;

DefaultSdamServerDescriptionManager(final ServerId serverId,
DefaultSdamServerDescriptionManager(final Cluster cluster,
final ServerId serverId,
final ServerDescriptionChangedListener serverDescriptionChangedListener,
final ServerListener serverListener, final ServerMonitor serverMonitor,
final ConnectionPool connectionPool,
final ClusterConnectionMode connectionMode) {
this.cluster = cluster;
this.serverId = assertNotNull(serverId);
this.serverDescriptionChangedListener = assertNotNull(serverDescriptionChangedListener);
this.serverListener = assertNotNull(serverListener);
this.serverMonitor = assertNotNull(serverMonitor);
this.connectionPool = assertNotNull(connectionPool);
this.connectionMode = assertNotNull(connectionMode);
description = unknownConnectingServerDescription(serverId, null);
lock = new ReentrantLock();
}

@Override
public void update(final ServerDescription candidateDescription) {
lock.lock();
try {
cluster.withLock(() -> {
if (TopologyVersionHelper.newer(description.getTopologyVersion(), candidateDescription.getTopologyVersion())) {
return;
}
Expand All @@ -85,29 +82,17 @@ public void update(final ServerDescription candidateDescription) {
assertFalse(markedPoolReady);
connectionPool.invalidate(candidateDescriptionException);
}
} finally {
lock.unlock();
}
});
}

@Override
public void handleExceptionBeforeHandshake(final SdamIssue sdamIssue) {
lock.lock();
try {
handleException(sdamIssue, true);
} finally {
lock.unlock();
}
cluster.withLock(() -> handleException(sdamIssue, true));
}

@Override
public void handleExceptionAfterHandshake(final SdamIssue sdamIssue) {
lock.lock();
try {
handleException(sdamIssue, false);
} finally {
lock.unlock();
}
cluster.withLock(() -> handleException(sdamIssue, false));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ private void init(final ClusterId clusterId, final ClusterableServerFactory serv
.address(host)
.build()),
settings, serverFactory.getSettings());
server = serverFactory.create(host, event -> { }, clusterClock);
server = serverFactory.create(this, host, event -> { }, clusterClock);

clusterListener.clusterDescriptionChanged(new ClusterDescriptionChangedEvent(clusterId, description, initialDescription));
}
Expand Down Expand Up @@ -282,6 +282,11 @@ public boolean isClosed() {
return closed.get();
}

@Override
public void withLock(final Runnable action) {
fail();
}

private void handleServerSelectionRequest(final ServerSelectionRequest serverSelectionRequest) {
assertTrue(initializationCompleted);
if (srvRecordResolvedToMultipleHosts) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public LoadBalancedClusterableServerFactory(final ClusterId clusterId, final Ser
}

@Override
public ClusterableServer create(final ServerAddress serverAddress,
public ClusterableServer create(final Cluster cluster, final ServerAddress serverAddress,
final ServerDescriptionChangedListener serverDescriptionChangedListener,
final ClusterClock clusterClock) {
ConnectionPool connectionPool = new DefaultConnectionPool(new ServerId(clusterId, serverAddress),
Expand Down
Loading

0 comments on commit 47c99b5

Please sign in to comment.