From ffe64ecee1f1699c10e0e92d381da0a2b6cd563d Mon Sep 17 00:00:00 2001 From: Valentin Kovalenko Date: Mon, 7 Feb 2022 11:25:53 -0700 Subject: [PATCH] Refactor internal APIs to avoid requiring duplicate data in method/constructor params (#871) JAVA-4471 --- .../AbstractMultiServerCluster.java | 13 ++--- .../internal/connection/BaseCluster.java | 10 ++-- .../mongodb/internal/connection/Cluster.java | 17 +++--- .../connection/ClusterableServerFactory.java | 3 +- .../connection/DefaultClusterFactory.java | 4 +- .../DefaultClusterableServerFactory.java | 30 +++++------ .../DefaultSdamServerDescriptionManager.java | 5 +- .../connection/LoadBalancedCluster.java | 13 +++-- .../LoadBalancedClusterableServerFactory.java | 15 ++---- .../ServerDescriptionChangedListener.java | 28 ---------- .../connection/SingleServerCluster.java | 52 +++++++++---------- .../connection/SingleServerClusterTest.java | 2 +- .../AbstractConnectionPoolTest.java | 5 +- ...tractServerDiscoveryAndMonitoringTest.java | 2 +- .../BaseClusterSpecification.groovy | 5 ++ .../DefaultServerSpecification.groovy | 14 +++-- .../DefaultTestClusterableServerFactory.java | 20 +++---- .../connection/LoadBalancedClusterTest.java | 8 +-- .../connection/SrvPollingProseTests.java | 2 +- .../TestClusterableServerFactory.java | 6 +-- .../internal/connection/TestServer.java | 11 ++-- 21 files changed, 109 insertions(+), 156 deletions(-) delete mode 100644 driver-core/src/main/com/mongodb/internal/connection/ServerDescriptionChangedListener.java diff --git a/driver-core/src/main/com/mongodb/internal/connection/AbstractMultiServerCluster.java b/driver-core/src/main/com/mongodb/internal/connection/AbstractMultiServerCluster.java index 2307118bcf9..17942807a3d 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AbstractMultiServerCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AbstractMultiServerCluster.java @@ -131,14 +131,6 @@ public ClusterableServer getServer(final ServerAddress serverAddress) { return serverTuple.server; } - - private final class DefaultServerDescriptionChangedListener implements ServerDescriptionChangedListener { - @Override - public void serverDescriptionChanged(final ServerDescriptionChangedEvent event) { - onChange(event); - } - } - void onChange(final Collection newHosts) { withLock(() -> { if (isClosed()) { @@ -167,7 +159,8 @@ void onChange(final Collection newHosts) { }); } - private void onChange(final ServerDescriptionChangedEvent event) { + @Override + public void onChange(final ServerDescriptionChangedEvent event) { withLock(() -> { if (isClosed()) { return; @@ -347,7 +340,7 @@ private void addServer(final ServerAddress serverAddress) { if (LOGGER.isInfoEnabled()) { LOGGER.info(format("Adding discovered server %s to client view of cluster", serverAddress)); } - ClusterableServer server = createServer(serverAddress, new DefaultServerDescriptionChangedListener()); + ClusterableServer server = createServer(serverAddress); addressToServerTupleMap.put(serverAddress, new ServerTuple(server, getConnectingServerDescription(serverAddress))); } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java index 69b7496aa06..b8c685b707e 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java @@ -38,7 +38,6 @@ import com.mongodb.lang.Nullable; import com.mongodb.selector.CompositeServerSelector; import com.mongodb.selector.ServerSelector; -import org.bson.BsonTimestamp; import java.util.ArrayList; import java.util.Collections; @@ -91,8 +90,8 @@ abstract class BaseCluster implements Cluster { } @Override - public BsonTimestamp getClusterTime() { - return clusterClock.getClusterTime(); + public ClusterClock getClock() { + return clusterClock; } @Override @@ -392,9 +391,8 @@ private ServerSelector getCompositeServerSelector(final ServerSelector serverSel } } - protected ClusterableServer createServer(final ServerAddress serverAddress, - final ServerDescriptionChangedListener serverDescriptionChangedListener) { - return serverFactory.create(this, serverAddress, serverDescriptionChangedListener, clusterClock); + protected ClusterableServer createServer(final ServerAddress serverAddress) { + return serverFactory.create(this, serverAddress); } private void throwIfIncompatible(final ClusterDescription curDescription) { diff --git a/driver-core/src/main/com/mongodb/internal/connection/Cluster.java b/driver-core/src/main/com/mongodb/internal/connection/Cluster.java index 13cdb0d7915..dbf854ba2ab 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/Cluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/Cluster.java @@ -19,13 +19,13 @@ import com.mongodb.ServerAddress; import com.mongodb.connection.ClusterId; +import com.mongodb.event.ServerDescriptionChangedEvent; import com.mongodb.internal.VisibleForTesting; import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.connection.ClusterDescription; import com.mongodb.connection.ClusterSettings; import com.mongodb.lang.Nullable; import com.mongodb.selector.ServerSelector; -import org.bson.BsonTimestamp; import java.io.Closeable; @@ -69,13 +69,9 @@ public interface Cluster extends Closeable { ClusterDescription getCurrentDescription(); /** - * Get the last seen cluster time - * - * @since 3.8 - * @return the last seen cluster time or null if not set + * Get the {@link ClusterClock} from which one may get the last seen cluster time. */ - @Nullable - BsonTimestamp getClusterTime(); + ClusterClock getClock(); ServerTuple selectServer(ServerSelector serverSelector); @@ -99,4 +95,11 @@ public interface Cluster extends Closeable { * @param action The action to {@linkplain Runnable#run() do}. */ void withLock(Runnable action); + + /** + * This method allows {@link Server}s to notify the {@link Cluster} about changes in their state as per the + * + * Server Discovery And Monitoring specification. + */ + void onChange(ServerDescriptionChangedEvent event); } diff --git a/driver-core/src/main/com/mongodb/internal/connection/ClusterableServerFactory.java b/driver-core/src/main/com/mongodb/internal/connection/ClusterableServerFactory.java index 335ba68056f..27820fb5da1 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/ClusterableServerFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/ClusterableServerFactory.java @@ -20,8 +20,7 @@ import com.mongodb.connection.ServerSettings; public interface ClusterableServerFactory { - ClusterableServer create(Cluster cluster, ServerAddress serverAddress, - ServerDescriptionChangedListener serverDescriptionChangedListener, ClusterClock clusterClock); + ClusterableServer create(Cluster cluster, ServerAddress serverAddress); ServerSettings getSettings(); } diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterFactory.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterFactory.java index ab598ee77b5..089111145c6 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterFactory.java @@ -106,13 +106,13 @@ public Cluster createCluster(final ClusterSettings originalClusterSettings, fina DnsSrvRecordMonitorFactory dnsSrvRecordMonitorFactory = new DefaultDnsSrvRecordMonitorFactory(clusterId, serverSettings); if (clusterSettings.getMode() == ClusterConnectionMode.LOAD_BALANCED) { - ClusterableServerFactory serverFactory = new LoadBalancedClusterableServerFactory(clusterId, serverSettings, + ClusterableServerFactory serverFactory = new LoadBalancedClusterableServerFactory(serverSettings, connectionPoolSettings, internalConnectionPoolSettings, streamFactory, credential, commandListener, applicationName, mongoDriverInformation != null ? mongoDriverInformation : MongoDriverInformation.builder().build(), compressorList, serverApi); return new LoadBalancedCluster(clusterId, clusterSettings, serverFactory, dnsSrvRecordMonitorFactory); } else { - ClusterableServerFactory serverFactory = new DefaultClusterableServerFactory(clusterId, clusterSettings, serverSettings, + ClusterableServerFactory serverFactory = new DefaultClusterableServerFactory(serverSettings, connectionPoolSettings, internalConnectionPoolSettings, streamFactory, heartbeatStreamFactory, credential, commandListener, applicationName, mongoDriverInformation != null ? mongoDriverInformation : MongoDriverInformation.builder().build(), compressorList, diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterableServerFactory.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterableServerFactory.java index 44b72cee5ff..f26939e0115 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterableServerFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterableServerFactory.java @@ -21,8 +21,7 @@ import com.mongodb.MongoDriverInformation; import com.mongodb.ServerAddress; import com.mongodb.ServerApi; -import com.mongodb.connection.ClusterId; -import com.mongodb.connection.ClusterSettings; +import com.mongodb.connection.ClusterConnectionMode; import com.mongodb.connection.ConnectionPoolSettings; import com.mongodb.connection.ServerId; import com.mongodb.connection.ServerSettings; @@ -38,8 +37,6 @@ import static java.util.Collections.emptyList; public class DefaultClusterableServerFactory implements ClusterableServerFactory { - private final ClusterId clusterId; - private final ClusterSettings clusterSettings; private final ServerSettings serverSettings; private final ConnectionPoolSettings connectionPoolSettings; private final InternalConnectionPoolSettings internalConnectionPoolSettings; @@ -53,15 +50,13 @@ public class DefaultClusterableServerFactory implements ClusterableServerFactory @Nullable private final ServerApi serverApi; - public DefaultClusterableServerFactory(final ClusterId clusterId, final ClusterSettings clusterSettings, + public DefaultClusterableServerFactory( final ServerSettings serverSettings, final ConnectionPoolSettings connectionPoolSettings, final InternalConnectionPoolSettings internalConnectionPoolSettings, final StreamFactory streamFactory, final StreamFactory heartbeatStreamFactory, final MongoCredential credential, final CommandListener commandListener, final String applicationName, final MongoDriverInformation mongoDriverInformation, final List compressorList, final @Nullable ServerApi serverApi) { - this.clusterId = clusterId; - this.clusterSettings = clusterSettings; this.serverSettings = serverSettings; this.connectionPoolSettings = connectionPoolSettings; this.internalConnectionPoolSettings = internalConnectionPoolSettings; @@ -76,27 +71,26 @@ public DefaultClusterableServerFactory(final ClusterId clusterId, final ClusterS } @Override - public ClusterableServer create(final Cluster cluster, final ServerAddress serverAddress, - final ServerDescriptionChangedListener serverDescriptionChangedListener, - final ClusterClock clusterClock) { - ServerId serverId = new ServerId(clusterId, serverAddress); + public ClusterableServer create(final Cluster cluster, final ServerAddress serverAddress) { + ServerId serverId = new ServerId(cluster.getClusterId(), serverAddress); + ClusterConnectionMode clusterMode = cluster.getSettings().getMode(); SameObjectProvider sdamProvider = SameObjectProvider.uninitialized(); - ServerMonitor serverMonitor = new DefaultServerMonitor(serverId, serverSettings, clusterClock, + ServerMonitor serverMonitor = new DefaultServerMonitor(serverId, serverSettings, cluster.getClock(), // no credentials, compressor list, or command listener for the server monitor factory - new InternalStreamConnectionFactory(clusterSettings.getMode(), heartbeatStreamFactory, null, applicationName, + new InternalStreamConnectionFactory(clusterMode, heartbeatStreamFactory, null, applicationName, mongoDriverInformation, emptyList(), null, serverApi), serverApi, sdamProvider); ConnectionPool connectionPool = new DefaultConnectionPool(serverId, - new InternalStreamConnectionFactory(clusterSettings.getMode(), streamFactory, credential, applicationName, + new InternalStreamConnectionFactory(clusterMode, streamFactory, credential, applicationName, mongoDriverInformation, compressorList, commandListener, serverApi), connectionPoolSettings, internalConnectionPoolSettings, sdamProvider); ServerListener serverListener = singleServerListener(serverSettings); - SdamServerDescriptionManager sdam = new DefaultSdamServerDescriptionManager(cluster, serverId, serverDescriptionChangedListener, - serverListener, serverMonitor, connectionPool, clusterSettings.getMode()); + SdamServerDescriptionManager sdam = new DefaultSdamServerDescriptionManager(cluster, serverId, serverListener, serverMonitor, + connectionPool, clusterMode); sdamProvider.initialize(sdam); serverMonitor.start(); - return new DefaultServer(serverId, clusterSettings.getMode(), connectionPool, new DefaultConnectionFactory(), serverMonitor, - sdam, serverListener, commandListener, clusterClock, true); + return new DefaultServer(serverId, clusterMode, connectionPool, new DefaultConnectionFactory(), serverMonitor, + sdam, serverListener, commandListener, cluster.getClock(), true); } @Override diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultSdamServerDescriptionManager.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultSdamServerDescriptionManager.java index 38295b2cd05..3e32d505641 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultSdamServerDescriptionManager.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultSdamServerDescriptionManager.java @@ -35,7 +35,6 @@ final class DefaultSdamServerDescriptionManager implements SdamServerDescriptionManager { private final Cluster cluster; private final ServerId serverId; - private final ServerDescriptionChangedListener serverDescriptionChangedListener; private final ServerListener serverListener; private final ServerMonitor serverMonitor; private final ConnectionPool connectionPool; @@ -44,13 +43,11 @@ final class DefaultSdamServerDescriptionManager implements SdamServerDescription DefaultSdamServerDescriptionManager(final Cluster cluster, final ServerId serverId, - final ServerDescriptionChangedListener serverDescriptionChangedListener, final ServerListener serverListener, final ServerMonitor serverMonitor, final ConnectionPool connectionPool, final ClusterConnectionMode connectionMode) { this.cluster = cluster; this.serverId = assertNotNull(serverId); - this.serverDescriptionChangedListener = assertNotNull(serverDescriptionChangedListener); this.serverListener = assertNotNull(serverListener); this.serverMonitor = assertNotNull(serverMonitor); this.connectionPool = assertNotNull(connectionPool); @@ -113,7 +110,7 @@ private void updateDescription(final ServerDescription newDescription) { if (!wouldDescriptionsGenerateEquivalentEvents(newDescription, previousDescription)) { serverListener.serverDescriptionChanged(serverDescriptionChangedEvent); } - serverDescriptionChangedListener.serverDescriptionChanged(serverDescriptionChangedEvent); + cluster.onChange(serverDescriptionChangedEvent); } private void handleException(final SdamIssue sdamIssue, final boolean beforeHandshake) { diff --git a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java index 173e9f27e17..de19c41dbc0 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java @@ -36,10 +36,10 @@ import com.mongodb.event.ClusterDescriptionChangedEvent; import com.mongodb.event.ClusterListener; import com.mongodb.event.ClusterOpeningEvent; +import com.mongodb.event.ServerDescriptionChangedEvent; import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.lang.Nullable; import com.mongodb.selector.ServerSelector; -import org.bson.BsonTimestamp; import java.util.ArrayList; import java.util.Collection; @@ -159,7 +159,7 @@ private void init(final ClusterId clusterId, final ClusterableServerFactory serv .address(host) .build()), settings, serverFactory.getSettings()); - server = serverFactory.create(this, host, event -> { }, clusterClock); + server = serverFactory.create(this, host); clusterListener.clusterDescriptionChanged(new ClusterDescriptionChangedEvent(clusterId, description, initialDescription)); } @@ -196,9 +196,9 @@ public ClusterDescription getCurrentDescription() { } @Override - public BsonTimestamp getClusterTime() { + public ClusterClock getClock() { isTrue("open", !isClosed()); - return clusterClock.getClusterTime(); + return clusterClock; } @Override @@ -287,6 +287,11 @@ public void withLock(final Runnable action) { fail(); } + @Override + public void onChange(final ServerDescriptionChangedEvent event) { + fail(); + } + private void handleServerSelectionRequest(final ServerSelectionRequest serverSelectionRequest) { assertTrue(initializationCompleted); if (srvRecordResolvedToMultipleHosts) { diff --git a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedClusterableServerFactory.java b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedClusterableServerFactory.java index 03afee1f617..06ab77c7004 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedClusterableServerFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedClusterableServerFactory.java @@ -23,7 +23,6 @@ import com.mongodb.ServerApi; import com.mongodb.annotations.ThreadSafe; import com.mongodb.connection.ClusterConnectionMode; -import com.mongodb.connection.ClusterId; import com.mongodb.connection.ConnectionPoolSettings; import com.mongodb.connection.ServerId; import com.mongodb.connection.ServerSettings; @@ -37,7 +36,6 @@ @ThreadSafe public class LoadBalancedClusterableServerFactory implements ClusterableServerFactory { - private final ClusterId clusterId; private final ServerSettings serverSettings; private final ConnectionPoolSettings connectionPoolSettings; private final InternalConnectionPoolSettings internalConnectionPoolSettings; @@ -49,14 +47,13 @@ public class LoadBalancedClusterableServerFactory implements ClusterableServerFa private final List compressorList; private final ServerApi serverApi; - public LoadBalancedClusterableServerFactory(final ClusterId clusterId, final ServerSettings serverSettings, + public LoadBalancedClusterableServerFactory(final ServerSettings serverSettings, final ConnectionPoolSettings connectionPoolSettings, final InternalConnectionPoolSettings internalConnectionPoolSettings, final StreamFactory streamFactory, final MongoCredential credential, final CommandListener commandListener, final String applicationName, final MongoDriverInformation mongoDriverInformation, final List compressorList, final ServerApi serverApi) { - this.clusterId = clusterId; this.serverSettings = serverSettings; this.connectionPoolSettings = connectionPoolSettings; this.internalConnectionPoolSettings = internalConnectionPoolSettings; @@ -70,17 +67,15 @@ public LoadBalancedClusterableServerFactory(final ClusterId clusterId, final Ser } @Override - public ClusterableServer create(final Cluster cluster, final ServerAddress serverAddress, - final ServerDescriptionChangedListener serverDescriptionChangedListener, - final ClusterClock clusterClock) { - ConnectionPool connectionPool = new DefaultConnectionPool(new ServerId(clusterId, serverAddress), + public ClusterableServer create(final Cluster cluster, final ServerAddress serverAddress) { + ConnectionPool connectionPool = new DefaultConnectionPool(new ServerId(cluster.getClusterId(), serverAddress), new InternalStreamConnectionFactory(ClusterConnectionMode.LOAD_BALANCED, streamFactory, credential, applicationName, mongoDriverInformation, compressorList, commandListener, serverApi), connectionPoolSettings, internalConnectionPoolSettings, EmptyProvider.instance()); connectionPool.ready(); - return new LoadBalancedServer(new ServerId(clusterId, serverAddress), connectionPool, new DefaultConnectionFactory(), - singleServerListener(serverSettings), clusterClock); + return new LoadBalancedServer(new ServerId(cluster.getClusterId(), serverAddress), connectionPool, new DefaultConnectionFactory(), + singleServerListener(serverSettings), cluster.getClock()); } @Override diff --git a/driver-core/src/main/com/mongodb/internal/connection/ServerDescriptionChangedListener.java b/driver-core/src/main/com/mongodb/internal/connection/ServerDescriptionChangedListener.java deleted file mode 100644 index bc21190dc0d..00000000000 --- a/driver-core/src/main/com/mongodb/internal/connection/ServerDescriptionChangedListener.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2008-present MongoDB, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.mongodb.internal.connection; - -import com.mongodb.event.ServerDescriptionChangedEvent; - -/* - internal interface that Cluster implementations register with Server implementations in order be notified of changes in - server state. Server implementations should fire this event even if the state has not changed according to the rules of - topology change event notification in the SDAM specification. -*/ -interface ServerDescriptionChangedListener { - void serverDescriptionChanged(ServerDescriptionChangedEvent event); -} diff --git a/driver-core/src/main/com/mongodb/internal/connection/SingleServerCluster.java b/driver-core/src/main/com/mongodb/internal/connection/SingleServerCluster.java index ec58d701249..c0cf459bb99 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/SingleServerCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/SingleServerCluster.java @@ -59,7 +59,7 @@ public SingleServerCluster(final ClusterId clusterId, final ClusterSettings sett // synchronized in the constructor because the change listener is re-entrant to this instance. // In other words, we are leaking a reference to "this" from the constructor. withLock(() -> { - server.set(createServer(settings.getHosts().get(0), new DefaultServerDescriptionChangedListener())); + server.set(createServer(settings.getHosts().get(0))); publishDescription(ServerDescription.builder().state(CONNECTING).address(settings.getHosts().get(0)) .build()); }); @@ -84,34 +84,32 @@ public void close() { } } - private class DefaultServerDescriptionChangedListener implements ServerDescriptionChangedListener { - @Override - public void serverDescriptionChanged(final ServerDescriptionChangedEvent event) { - withLock(() -> { - ServerDescription newDescription = event.getNewDescription(); - if (newDescription.isOk()) { - if (getSettings().getRequiredClusterType() != ClusterType.UNKNOWN - && getSettings().getRequiredClusterType() != newDescription.getClusterType()) { - newDescription = null; - } else if (getSettings().getRequiredClusterType() == ClusterType.REPLICA_SET - && getSettings().getRequiredReplicaSetName() != null) { - if (!getSettings().getRequiredReplicaSetName().equals(newDescription.getSetName())) { - newDescription = ServerDescription.builder(newDescription) - .exception(new MongoConfigurationException( - format("Replica set name '%s' does not match required replica set name of '%s'", - newDescription.getSetName(), getSettings().getRequiredReplicaSetName()))) - .type(ServerType.UNKNOWN) - .setName(null) - .ok(false) - .build(); - publishDescription(ClusterType.UNKNOWN, newDescription); - return; - } + @Override + public void onChange(final ServerDescriptionChangedEvent event) { + withLock(() -> { + ServerDescription newDescription = event.getNewDescription(); + if (newDescription.isOk()) { + if (getSettings().getRequiredClusterType() != ClusterType.UNKNOWN + && getSettings().getRequiredClusterType() != newDescription.getClusterType()) { + newDescription = null; + } else if (getSettings().getRequiredClusterType() == ClusterType.REPLICA_SET + && getSettings().getRequiredReplicaSetName() != null) { + if (!getSettings().getRequiredReplicaSetName().equals(newDescription.getSetName())) { + newDescription = ServerDescription.builder(newDescription) + .exception(new MongoConfigurationException( + format("Replica set name '%s' does not match required replica set name of '%s'", + newDescription.getSetName(), getSettings().getRequiredReplicaSetName()))) + .type(ServerType.UNKNOWN) + .setName(null) + .ok(false) + .build(); + publishDescription(ClusterType.UNKNOWN, newDescription); + return; } } - publishDescription(newDescription); - }); - } + } + publishDescription(newDescription); + }); } private void publishDescription(final ServerDescription serverDescription) { diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/SingleServerClusterTest.java b/driver-core/src/test/functional/com/mongodb/internal/connection/SingleServerClusterTest.java index 9350c0b3f2b..977ebad08a1 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/SingleServerClusterTest.java +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/SingleServerClusterTest.java @@ -68,7 +68,7 @@ private void setUpCluster(final ServerAddress serverAddress) { .build(); cluster = new SingleServerCluster(clusterId, clusterSettings, - new DefaultClusterableServerFactory(clusterId, clusterSettings, ServerSettings.builder().build(), + new DefaultClusterableServerFactory(ServerSettings.builder().build(), ConnectionPoolSettings.builder().maxSize(1).build(), InternalConnectionPoolSettings.builder().build(), streamFactory, streamFactory, getCredential(), diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractConnectionPoolTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractConnectionPoolTest.java index bb141bf958b..2ea4957158b 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractConnectionPoolTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractConnectionPoolTest.java @@ -186,9 +186,8 @@ public void setUp() { new TestCommandListener(), ClusterFixture.getServerApi()), settings, internalSettings, sdamProvider)); - sdamProvider.initialize(new DefaultSdamServerDescriptionManager(mockedCluster(), serverId, - mock(ServerDescriptionChangedListener.class), mock(ServerListener.class), mock(ServerMonitor.class), pool, - connectionMode)); + sdamProvider.initialize(new DefaultSdamServerDescriptionManager(mockedCluster(), serverId, mock(ServerListener.class), + mock(ServerMonitor.class), pool, connectionMode)); setFailPoint(); break; } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractServerDiscoveryAndMonitoringTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractServerDiscoveryAndMonitoringTest.java index f4d862aa413..bdca46d2c5e 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractServerDiscoveryAndMonitoringTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractServerDiscoveryAndMonitoringTest.java @@ -175,7 +175,7 @@ protected void init(final ServerListenerFactory serverListenerFactory, final Clu ClusterId clusterId = new ClusterId(); - factory = new DefaultTestClusterableServerFactory(clusterId, settings.getMode(), serverListenerFactory); + factory = new DefaultTestClusterableServerFactory(settings.getMode(), serverListenerFactory); ClusterSettings clusterSettings = settings.getClusterListeners().contains(clusterListener) ? settings : ClusterSettings.builder(settings).addClusterListener(clusterListener).build(); diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy index 9b6421fc074..a62f6f925b3 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy @@ -16,6 +16,7 @@ package com.mongodb.internal.connection +import com.mongodb.event.ServerDescriptionChangedEvent import util.spock.annotations.Slow import com.mongodb.ClusterFixture import com.mongodb.MongoClientException @@ -69,6 +70,10 @@ class BaseClusterSpecification extends Specification { ClusterableServer getServer(final ServerAddress serverAddress) { throw new UnsupportedOperationException() } + + @Override + void onChange(final ServerDescriptionChangedEvent event) { + } } expect: 'the description is initialized after construction' diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy index 9a19b09dbf6..1bd5364b28b 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy @@ -42,6 +42,7 @@ import com.mongodb.connection.ServerDescription import com.mongodb.connection.ServerId import com.mongodb.connection.ServerType import com.mongodb.event.CommandListener +import com.mongodb.event.ServerDescriptionChangedEvent import com.mongodb.event.ServerListener import com.mongodb.internal.IgnorableRequestContext import com.mongodb.internal.async.SingleResultCallback @@ -147,8 +148,8 @@ class DefaultServerSpecification extends Specification { def connectionPool = Mock(ConnectionPool) def sdamProvider = SameObjectProvider.uninitialized() def serverMonitor = new TestServerMonitor(sdamProvider) - sdamProvider.initialize(new DefaultSdamServerDescriptionManager(mockCluster(), serverId, Mock(ServerDescriptionChangedListener), - serverListener, serverMonitor, connectionPool, ClusterConnectionMode.MULTIPLE)) + sdamProvider.initialize(new DefaultSdamServerDescriptionManager(mockCluster(), serverId, serverListener, serverMonitor, + connectionPool, ClusterConnectionMode.MULTIPLE)) def server = defaultServer(Mock(ConnectionPool), serverMonitor, serverListener, sdamProvider.get(), Mock(CommandListener)) serverMonitor.updateServerDescription(ServerDescription.builder() .address(serverId.getAddress()) @@ -499,8 +500,7 @@ class DefaultServerSpecification extends Specification { private DefaultServer defaultServer(final ConnectionPool connectionPool, final ServerMonitor serverMonitor) { def serverListener = Mock(ServerListener) defaultServer(connectionPool, serverMonitor, serverListener, - new DefaultSdamServerDescriptionManager(mockCluster(), - serverId, Mock(ServerDescriptionChangedListener), serverListener, serverMonitor, connectionPool, + new DefaultSdamServerDescriptionManager(mockCluster(), serverId, serverListener, serverMonitor, connectionPool, ClusterConnectionMode.MULTIPLE), Mock(CommandListener)) } @@ -581,7 +581,11 @@ class DefaultServerSpecification extends Specification { @Override ClusterableServer getServer(final ServerAddress serverAddress) { - null + throw new UnsupportedOperationException() + } + + @Override + void onChange(final ServerDescriptionChangedEvent event) { } } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultTestClusterableServerFactory.java b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultTestClusterableServerFactory.java index 050bf0c740e..27a3f8b7a73 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultTestClusterableServerFactory.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultTestClusterableServerFactory.java @@ -18,7 +18,6 @@ import com.mongodb.ServerAddress; import com.mongodb.connection.ClusterConnectionMode; -import com.mongodb.connection.ClusterId; import com.mongodb.connection.ServerDescription; import com.mongodb.connection.ServerId; import com.mongodb.connection.ServerSettings; @@ -30,39 +29,34 @@ public class DefaultTestClusterableServerFactory implements ClusterableServerFactory { private final ServerSettings settings = ServerSettings.builder().build(); - private final ClusterId clusterId; private final ClusterConnectionMode clusterConnectionMode; private final ServerListenerFactory serverListenerFactory; private final Map serverAddressToServerMonitorMap = new HashMap<>(); - public DefaultTestClusterableServerFactory(final ClusterId clusterId, final ClusterConnectionMode clusterConnectionMode, + public DefaultTestClusterableServerFactory(final ClusterConnectionMode clusterConnectionMode, final ServerListenerFactory serverListenerFactory) { - this.clusterId = clusterId; this.clusterConnectionMode = clusterConnectionMode; this.serverListenerFactory = serverListenerFactory; } @Override - public ClusterableServer create(final Cluster cluster, - final ServerAddress serverAddress, - final ServerDescriptionChangedListener serverDescriptionChangedListener, - final ClusterClock clusterClock) { - ServerId serverId = new ServerId(clusterId, serverAddress); + public ClusterableServer create(final Cluster cluster, final ServerAddress serverAddress) { + ServerId serverId = new ServerId(cluster.getClusterId(), serverAddress); if (clusterConnectionMode == ClusterConnectionMode.LOAD_BALANCED) { return new LoadBalancedServer(serverId, new TestConnectionPool(), - new TestConnectionFactory(), serverListenerFactory.create(serverAddress), clusterClock); + new TestConnectionFactory(), serverListenerFactory.create(serverAddress), cluster.getClock()); } else { SameObjectProvider sdamProvider = SameObjectProvider.uninitialized(); TestServerMonitor serverMonitor = new TestServerMonitor(sdamProvider); serverAddressToServerMonitorMap.put(serverAddress, serverMonitor); ConnectionPool connectionPool = new TestConnectionPool(); ServerListener serverListener = serverListenerFactory.create(serverAddress); - SdamServerDescriptionManager sdam = new DefaultSdamServerDescriptionManager(cluster, serverId, serverDescriptionChangedListener, - serverListener, serverMonitor, connectionPool, clusterConnectionMode); + SdamServerDescriptionManager sdam = new DefaultSdamServerDescriptionManager(cluster, serverId, serverListener, serverMonitor, + connectionPool, clusterConnectionMode); sdamProvider.initialize(sdam); serverMonitor.start(); return new DefaultServer(serverId, clusterConnectionMode, connectionPool, new TestConnectionFactory(), serverMonitor, sdam, - serverListener, null, clusterClock, true); + serverListener, null, cluster.getClock(), true); } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/LoadBalancedClusterTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/LoadBalancedClusterTest.java index 7c8f74752a6..765d0c3b126 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/LoadBalancedClusterTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/LoadBalancedClusterTest.java @@ -337,7 +337,7 @@ void shouldNotInitServerAfterClosing() { // close `cluster`, call `DnsSrvRecordInitializer.initialize` and check that it does not result in creating a `ClusterableServer` cluster.close(); serverInitializerCaptor.getValue().initialize(Collections.singleton(new ServerAddress())); - verify(serverFactory, never()).create(any(), any(), any(), any()); + verify(serverFactory, never()).create(any(), any()); } @Test @@ -346,12 +346,12 @@ void shouldCloseServerWhenClosing() { ClusterableServerFactory serverFactory = mock(ClusterableServerFactory.class); when(serverFactory.getSettings()).thenReturn(mock(ServerSettings.class)); ClusterableServer server = mock(ClusterableServer.class); - when(serverFactory.create(any(), any(), any(), any())).thenReturn(server); + when(serverFactory.create(any(), any())).thenReturn(server); // create `cluster` and check that it creates a `ClusterableServer` LoadBalancedCluster cluster = new LoadBalancedCluster(new ClusterId(), ClusterSettings.builder().mode(ClusterConnectionMode.LOAD_BALANCED).build(), serverFactory, mock(DnsSrvRecordMonitorFactory.class)); - verify(serverFactory, times(1)).create(any(), any(), any(), any()); + verify(serverFactory, times(1)).create(any(), any()); // close `cluster` and check that it closes `server` cluster.close(); verify(server, atLeastOnce()).close(); @@ -491,7 +491,7 @@ private void assertServerTupleExpectations(final ServerAddress serverAddress, fi private ClusterableServerFactory mockServerFactory(final ServerAddress serverAddress, final ClusterableServer expectedServer) { ClusterableServerFactory serverFactory = mock(ClusterableServerFactory.class); when(serverFactory.getSettings()).thenReturn(ServerSettings.builder().build()); - when(serverFactory.create(any(), eq(serverAddress), any(), any())).thenReturn(expectedServer); + when(serverFactory.create(any(), eq(serverAddress))).thenReturn(expectedServer); return serverFactory; } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/SrvPollingProseTests.java b/driver-core/src/test/unit/com/mongodb/internal/connection/SrvPollingProseTests.java index 87dfdc9c730..a6605725cf8 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/SrvPollingProseTests.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/SrvPollingProseTests.java @@ -66,7 +66,7 @@ public class SrvPollingProseTests { @BeforeEach public void beforeEach() { when(serverFactory.getSettings()).thenReturn(ServerSettings.builder().build()); - when(serverFactory.create(any(), any(), any(), any())).thenReturn(mock(ClusterableServer.class)); + when(serverFactory.create(any(), any())).thenReturn(mock(ClusterableServer.class)); } @AfterEach diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/TestClusterableServerFactory.java b/driver-core/src/test/unit/com/mongodb/internal/connection/TestClusterableServerFactory.java index b48d68246f4..77256086589 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/TestClusterableServerFactory.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/TestClusterableServerFactory.java @@ -38,10 +38,8 @@ public class TestClusterableServerFactory implements ClusterableServerFactory { private final Map addressToServerMap = new HashMap(); @Override - public ClusterableServer create(final Cluster cluster, final ServerAddress serverAddress, - final ServerDescriptionChangedListener serverDescriptionChangedListener, - final ClusterClock clusterClock) { - addressToServerMap.put(serverAddress, new TestServer(serverAddress, serverDescriptionChangedListener, NO_OP_SERVER_LISTENER)); + public ClusterableServer create(final Cluster cluster, final ServerAddress serverAddress) { + addressToServerMap.put(serverAddress, new TestServer(serverAddress, cluster, NO_OP_SERVER_LISTENER)); return addressToServerMap.get(serverAddress); } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/TestServer.java b/driver-core/src/test/unit/com/mongodb/internal/connection/TestServer.java index 2dde3224020..609512148b7 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/TestServer.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/TestServer.java @@ -27,17 +27,16 @@ import static com.mongodb.connection.ServerConnectionState.CONNECTING; public class TestServer implements ClusterableServer { - private final ServerDescriptionChangedListener serverDescriptionChangedListener; + private final Cluster cluster; private final ServerListener serverListener; private ServerDescription description; private boolean isClosed; private final ServerId serverId; private int connectCount; - public TestServer(final ServerAddress serverAddress, final ServerDescriptionChangedListener serverDescriptionChangedListener, - final ServerListener serverListener) { + public TestServer(final ServerAddress serverAddress, final Cluster cluster, final ServerListener serverListener) { this.serverId = new ServerId(new ClusterId(), serverAddress); - this.serverDescriptionChangedListener = serverDescriptionChangedListener; + this.cluster = cluster; this.serverListener = serverListener; this.description = ServerDescription.builder().state(CONNECTING).address(serverId.getAddress()).build(); invalidate(); @@ -47,8 +46,8 @@ public void sendNotification(final ServerDescription newDescription) { ServerDescription currentDescription = description; description = newDescription; ServerDescriptionChangedEvent event = new ServerDescriptionChangedEvent(serverId, newDescription, currentDescription); - if (serverDescriptionChangedListener != null) { - serverDescriptionChangedListener.serverDescriptionChanged(event); + if (cluster != null) { + cluster.onChange(event); } if (serverListener != null) { serverListener.serverDescriptionChanged(event);