Skip to content

Commit

Permalink
Refactor transaction server pinning (#1211)
Browse files Browse the repository at this point in the history
* Determine whether TransactionContext is required based on ConnectionSource's ServerDescription
instead of the ClusterDescription
* Remove Cluster#getDescription and update tests that relied on it

JAVA-5186
  • Loading branch information
jyemin authored Oct 4, 2023
1 parent 6a20fb6 commit 92c90c6
Show file tree
Hide file tree
Showing 15 changed files with 144 additions and 323 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,56 +165,6 @@ public void selectServerAsync(final ServerSelector serverSelector, final Operati
}
}

@Override
public ClusterDescription getDescription() {
isTrue("open", !isClosed());

try {
CountDownLatch currentPhase = phase.get();
ClusterDescription curDescription = description;

boolean selectionFailureLogged = false;

long startTimeNanos = System.nanoTime();
long curTimeNanos = startTimeNanos;
long maxWaitTimeNanos = getMaxWaitTimeNanos();

while (curDescription.getType() == ClusterType.UNKNOWN) {

if (curTimeNanos - startTimeNanos > maxWaitTimeNanos) {
throw new MongoTimeoutException(format("Timed out after %d ms while waiting to connect. Client view of cluster state "
+ "is %s",
settings.getServerSelectionTimeout(MILLISECONDS),
curDescription.getShortDescription()));
}

if (!selectionFailureLogged) {
if (LOGGER.isInfoEnabled()) {
if (settings.getServerSelectionTimeout(MILLISECONDS) < 0) {
LOGGER.info("Cluster description not yet available. Waiting indefinitely.");
} else {
LOGGER.info(format("Cluster description not yet available. Waiting for %d ms before timing out",
settings.getServerSelectionTimeout(MILLISECONDS)));
}
}
selectionFailureLogged = true;
}

connect();

currentPhase.await(Math.min(maxWaitTimeNanos - (curTimeNanos - startTimeNanos), getMinWaitTimeNanos()), NANOSECONDS);

curTimeNanos = System.nanoTime();

currentPhase = phase.get();
curDescription = description;
}
return curDescription;
} catch (InterruptedException e) {
throw interruptAndCreateMongoInterruptedException("Interrupted while waiting to connect", e);
}
}

public ClusterId getClusterId() {
return clusterId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,6 @@ public interface Cluster extends Closeable {

ClusterSettings getSettings();

/**
* Get the description of this cluster. This method will not return normally until the cluster type is known.
*
* @return a ClusterDescription representing the current state of the cluster
* @throws com.mongodb.MongoTimeoutException if the timeout has been reached before the cluster type is known
* @throws com.mongodb.MongoInterruptedException if interrupted when getting the cluster description
*/
ClusterDescription getDescription();


ClusterId getClusterId();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,6 @@ public ClusterSettings getSettings() {
return settings;
}

@Override
public ClusterDescription getDescription() {
isTrue("open", !isClosed());
waitForSrv();
return description;
}

@Override
public ClusterId getClusterId() {
return clusterId;
Expand Down
34 changes: 24 additions & 10 deletions driver-core/src/test/functional/com/mongodb/ClusterFixture.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.mongodb.async.FutureResultCallback;
import com.mongodb.connection.AsynchronousSocketChannelStreamFactory;
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.ClusterSettings;
import com.mongodb.connection.ClusterType;
import com.mongodb.connection.ConnectionPoolSettings;
Expand Down Expand Up @@ -85,6 +86,7 @@
import static com.mongodb.connection.ClusterType.REPLICA_SET;
import static com.mongodb.connection.ClusterType.SHARDED;
import static com.mongodb.connection.ClusterType.STANDALONE;
import static com.mongodb.connection.ClusterType.UNKNOWN;
import static com.mongodb.internal.connection.ClusterDescriptionHelper.getPrimaries;
import static com.mongodb.internal.connection.ClusterDescriptionHelper.getSecondaries;
import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;
Expand Down Expand Up @@ -140,7 +142,20 @@ public static String getDefaultDatabaseName() {
}

public static boolean clusterIsType(final ClusterType clusterType) {
return getCluster().getDescription().getType() == clusterType;
return getClusterDescription(getCluster()).getType() == clusterType;
}

public static ClusterDescription getClusterDescription(final Cluster cluster) {
try {
ClusterDescription clusterDescription = cluster.getCurrentDescription();
while (clusterDescription.getType() == UNKNOWN) {
Thread.sleep(10);
clusterDescription = cluster.getCurrentDescription();
}
return clusterDescription;
} catch (InterruptedException e) {
throw interruptAndCreateMongoInterruptedException("Interrupted", e);
}
}

public static ServerVersion getServerVersion() {
Expand Down Expand Up @@ -449,27 +464,27 @@ public static SslSettings getSslSettings(final ConnectionString connectionString
}

public static ServerAddress getPrimary() {
List<ServerDescription> serverDescriptions = getPrimaries(getCluster().getDescription());
List<ServerDescription> serverDescriptions = getPrimaries(getClusterDescription(getCluster()));
while (serverDescriptions.isEmpty()) {
try {
sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
serverDescriptions = getPrimaries(getCluster().getDescription());
serverDescriptions = getPrimaries(getClusterDescription(getCluster()));
}
return serverDescriptions.get(0).getAddress();
}

public static ServerAddress getSecondary() {
List<ServerDescription> serverDescriptions = getSecondaries(getCluster().getDescription());
List<ServerDescription> serverDescriptions = getSecondaries(getClusterDescription(getCluster()));
while (serverDescriptions.isEmpty()) {
try {
sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
serverDescriptions = getSecondaries(getCluster().getDescription());
serverDescriptions = getSecondaries(getClusterDescription(getCluster()));
}
return serverDescriptions.get(0).getAddress();
}
Expand Down Expand Up @@ -499,20 +514,19 @@ public static BsonDocument getServerParameters() {
}

public static boolean isDiscoverableReplicaSet() {
return getCluster().getDescription().getType() == REPLICA_SET
&& getCluster().getDescription().getConnectionMode() == MULTIPLE;
return clusterIsType(REPLICA_SET) && getClusterConnectionMode() == MULTIPLE;
}

public static boolean isSharded() {
return getCluster().getDescription().getType() == SHARDED;
return clusterIsType(SHARDED);
}

public static boolean isStandalone() {
return getCluster().getDescription().getType() == STANDALONE;
return clusterIsType(STANDALONE);
}

public static boolean isLoadBalanced() {
return getCluster().getSettings().getMode() == LOAD_BALANCED;
return getClusterConnectionMode() == LOAD_BALANCED;
}

public static boolean isAuthenticated() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,23 +79,14 @@ public void tearDown() {
cluster.close();
}

@Test
public void shouldGetDescription() {
// given
setUpCluster(getPrimary());

// expect
assertNotNull(cluster.getDescription());
}

@Test
public void descriptionShouldIncludeSettings() {
// given
setUpCluster(getPrimary());

// expect
assertNotNull(cluster.getDescription().getClusterSettings());
assertNotNull(cluster.getDescription().getServerSettings());
assertNotNull(cluster.getCurrentDescription().getClusterSettings());
assertNotNull(cluster.getCurrentDescription().getServerSettings());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,6 @@ class BaseClusterSpecification extends Specification {
cluster.getCurrentDescription() == new ClusterDescription(clusterSettings.getMode(), ClusterType.UNKNOWN, [], clusterSettings,
factory.getSettings())

when: 'the description is accessed before initialization'
cluster.getDescription()

then: 'a MongoTimeoutException is thrown'
thrown(MongoTimeoutException)

when: 'a server is selected before initialization'
cluster.selectServer({ def clusterDescription -> [] }, new OperationContext())

Expand Down Expand Up @@ -193,21 +187,10 @@ class BaseClusterSpecification extends Specification {
.exception(new MongoInternalException('oops'))
.build())

cluster.getDescription()

then:
def e = thrown(MongoTimeoutException)
e.getMessage().startsWith("Timed out after ${serverSelectionTimeoutMS} ms while waiting to connect. " +
'Client view of cluster state is {type=UNKNOWN')
e.getMessage().contains('{address=localhost:27017, type=UNKNOWN, state=CONNECTING, ' +
'exception={com.mongodb.MongoInternalException: oops}}')
e.getMessage().contains('{address=localhost:27018, type=UNKNOWN, state=CONNECTING}')

when:
cluster.selectServer(new WritableServerSelector(), new OperationContext())

then:
e = thrown(MongoTimeoutException)
def e = thrown(MongoTimeoutException)
e.getMessage().startsWith("Timed out after ${serverSelectionTimeoutMS} ms while waiting for a server " +
'that matches WritableServerSelector. Client view of cluster state is {type=UNKNOWN')
e.getMessage().contains('{address=localhost:27017, type=UNKNOWN, state=CONNECTING, ' +
Expand Down Expand Up @@ -272,37 +255,6 @@ class BaseClusterSpecification extends Specification {
cluster?.close()
}

@Slow
def 'should wait indefinitely for a cluster description until interrupted'() {
given:
def cluster = new MultiServerCluster(new ClusterId(),
builder().mode(MULTIPLE)
.hosts([firstServer, secondServer, thirdServer])
.serverSelectionTimeout(-1, SECONDS)
.build(),
factory)

when:
def latch = new CountDownLatch(1)
def thread = new Thread({
try {
cluster.getDescription()
} catch (MongoInterruptedException e) {
latch.countDown()
}
})
thread.start()
sleep(1000)
thread.interrupt()
def interrupted = latch.await(ClusterFixture.TIMEOUT, SECONDS)

then:
interrupted

cleanup:
cluster?.close()
}

def 'should select server asynchronously when server is already available'() {
given:
def cluster = new MultiServerCluster(new ClusterId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class DnsMultiServerClusterSpecification extends Specification {
factory.sendNotification(secondServer, SHARD_ROUTER)
def firstTestServer = factory.getServer(firstServer)
def secondTestServer = factory.getServer(secondServer)
def clusterDescription = cluster.getDescription()
def clusterDescription = cluster.getCurrentDescription()

then: 'events are generated, description includes hosts, exception is cleared, and servers are open'
2 * clusterListener.clusterDescriptionChanged(_)
Expand All @@ -112,7 +112,7 @@ class DnsMultiServerClusterSpecification extends Specification {
initializer.initialize([secondServer, thirdServer])
factory.sendNotification(secondServer, SHARD_ROUTER)
def thirdTestServer = factory.getServer(thirdServer)
clusterDescription = cluster.getDescription()
clusterDescription = cluster.getCurrentDescription()

then: 'events are generated, description is updated, and the removed server is closed'
1 * clusterListener.clusterDescriptionChanged(_)
Expand All @@ -125,7 +125,7 @@ class DnsMultiServerClusterSpecification extends Specification {

when: 'the listener is initialized with another exception'
initializer.initialize(exception)
clusterDescription = cluster.getDescription()
clusterDescription = cluster.getCurrentDescription()

then: 'the exception is ignored'
0 * clusterListener.clusterDescriptionChanged(_)
Expand Down
Loading

0 comments on commit 92c90c6

Please sign in to comment.