Skip to content

Commit

Permalink
Fix ServerDiscoveryAndMonitoringProseTests.testConnectionPoolManageme…
Browse files Browse the repository at this point in the history
…nt and disable it (#873)

JAVA-4483
  • Loading branch information
stIncMale authored Feb 8, 2022
1 parent a9a15b4 commit e557fc4
Showing 1 changed file with 30 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,17 @@
import org.bson.BsonNull;
import org.bson.BsonString;
import org.bson.Document;
import org.junit.Ignore;
import org.junit.Test;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import static com.mongodb.ClusterFixture.configureFailPoint;
Expand All @@ -57,6 +61,7 @@
import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder;
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static java.util.Collections.synchronizedList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
Expand Down Expand Up @@ -168,11 +173,12 @@ public void serverDescriptionChanged(final ServerDescriptionChangedEvent event)
* <a href="https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring-tests.rst#connection-pool-management">Connection Pool Management</a>.
*/
@Test
@Ignore
@SuppressWarnings("try")
public void testConnectionPoolManagement() throws InterruptedException {
assumeTrue(serverVersionAtLeast(4, 3));
assumeFalse(isServerlessTest());
BlockingQueue<Object> events = new SynchronousQueue<>(true);
BlockingQueue<Object> events = new LinkedBlockingQueue<>();
ServerMonitorListener serverMonitorListener = new ServerMonitorListener() {
@Override
public void serverHeartbeatSucceeded(final ServerHeartbeatSucceededEvent event) {
Expand Down Expand Up @@ -209,7 +215,7 @@ public void connectionPoolCleared(final ConnectionPoolClearedEvent event) {
/* Note that ServerHeartbeatSucceededEvent type is sometimes allowed but never required.
* This is because DefaultServerMonitor does not send such events in situations when a server check happens as part
* of a connection handshake. */
assertPoll(events, ServerHeartbeatSucceededEvent.class, ConnectionPoolReadyEvent.class);
assertPoll(events, ServerHeartbeatSucceededEvent.class, singleton(ConnectionPoolReadyEvent.class));
configureFailPoint(new BsonDocument()
.append("configureFailPoint", new BsonString("failCommand"))
.append("mode", new BsonDocument()
Expand All @@ -218,9 +224,9 @@ public void connectionPoolCleared(final ConnectionPoolClearedEvent event) {
.append("failCommands", new BsonArray(asList(new BsonString("isMaster"), new BsonString("hello"))))
.append("errorCode", new BsonInt32(1234))
.append("appName", new BsonString(appName))));
assertPoll(events, ServerHeartbeatSucceededEvent.class, ServerHeartbeatFailedEvent.class);
assertPoll(events, null, ConnectionPoolClearedEvent.class);
assertPoll(events, ServerHeartbeatSucceededEvent.class, ConnectionPoolReadyEvent.class);
assertPoll(events, ServerHeartbeatSucceededEvent.class,
new HashSet<>(asList(ServerHeartbeatFailedEvent.class, ConnectionPoolClearedEvent.class)));
assertPoll(events, null, new HashSet<>(asList(ServerHeartbeatSucceededEvent.class, ConnectionPoolReadyEvent.class)));
} finally {
disableFailPoint("failCommand");
}
Expand Down Expand Up @@ -268,13 +274,14 @@ public void monitorsSleepAtLeastMinHeartbeatFreqencyMSBetweenChecks() {
}
}

private static void assertPoll(final BlockingQueue<?> queue, @Nullable final Class<?> allowed, final Class<?> required)
private static void assertPoll(final BlockingQueue<?> queue, @Nullable final Class<?> allowed, final Set<Class<?>> required)
throws InterruptedException {
assertPoll(queue, allowed, required, Timeout.startNow(TEST_WAIT_TIMEOUT_MILLIS, MILLISECONDS));
}

private static void assertPoll(final BlockingQueue<?> queue, @Nullable final Class<?> allowed, final Class<?> required,
private static void assertPoll(final BlockingQueue<?> queue, @Nullable final Class<?> allowed, final Set<Class<?>> required,
final Timeout timeout) throws InterruptedException {
Set<Class<?>> encountered = new HashSet<>();
while (true) {
Object element;
if (timeout.isImmediate()) {
Expand All @@ -286,22 +293,31 @@ private static void assertPoll(final BlockingQueue<?> queue, @Nullable final Cla
}
if (element != null) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Polled " + element.toString());
LOGGER.info("Polled " + element);
}
Class<?> elementClass = element.getClass();
if (required.isAssignableFrom(elementClass)) {
if (findAssignable(elementClass, required)
.map(found -> {
encountered.add(found);
return encountered.equals(required);
}).orElseGet(() -> {
assertTrue(String.format("allowed %s, required %s, actual %s", allowed, required, elementClass),
allowed != null && allowed.isAssignableFrom(elementClass));
return false;
})) {
return;
} else {
assertTrue(String.format("allowed %s, required %s, actual %s", allowed, required, elementClass),
allowed != null && allowed.isAssignableFrom(elementClass));
}
}
if (timeout.expired()) {
fail("required " + required);
fail(String.format("encountered %s, required %s", encountered, required));
}
}
}

private static Optional<Class<?>> findAssignable(final Class<?> from, final Set<Class<?>> toAnyOf) {
return toAnyOf.stream().filter(to -> to.isAssignableFrom(from)).findAny();
}

private static <E> void put(final BlockingQueue<E> q, final E e) {
try {
q.put(e);
Expand Down

0 comments on commit e557fc4

Please sign in to comment.