From f0445e4ada1629e52037420e6dc52c109581397b Mon Sep 17 00:00:00 2001 From: Annie Liang <64233642+xinlian12@users.noreply.github.com> Date: Wed, 21 Oct 2020 22:51:35 +0000 Subject: [PATCH] ConnectionStateListener change (#15991) * ConnectionStateListener change Co-authored-by: Annie Liang --- .../azure/cosmos/DirectConnectionConfig.java | 37 ++++ .../implementation/ConnectionPolicy.java | 24 +++ .../DiagnosticsClientContext.java | 5 +- .../cosmos/implementation/GoneException.java | 11 ++ .../implementation/RxDocumentClientImpl.java | 19 +- .../directconnectivity/AddressResolver.java | 9 +- .../GatewayAddressCache.java | 14 +- .../GlobalAddressResolver.java | 26 ++- .../directconnectivity/IAddressCache.java | 7 + .../directconnectivity/IAddressResolver.java | 6 + .../RntbdTransportClient.java | 54 +++++- .../SharedTransportClient.java | 22 ++- .../StoreClientFactory.java | 13 +- .../rntbd/RntbdClientChannelPool.java | 2 - .../rntbd/RntbdConnectionEvent.java | 9 + .../rntbd/RntbdConnectionStateListener.java | 164 ++++++++++++++++++ .../rntbd/RntbdEndpoint.java | 10 ++ .../rntbd/RntbdServiceEndpoint.java | 52 +++++- .../routing/PartitionKeyRangeIdentity.java | 30 ++-- .../main/java/com/azure/cosmos/util/Beta.java | 14 +- .../CilentConfigDiagnosticsTest.java | 9 +- .../RntbdTransportClientTest.java | 47 +++-- .../SharedTransportClientTest.java | 22 ++- 23 files changed, 525 insertions(+), 81 deletions(-) create mode 100644 sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConnectionEvent.java create mode 100644 sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConnectionStateListener.java diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/DirectConnectionConfig.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/DirectConnectionConfig.java index 54b5427873df3..c225d29dc9d2b 100644 --- 
a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/DirectConnectionConfig.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/DirectConnectionConfig.java @@ -3,6 +3,7 @@ package com.azure.cosmos; +import com.azure.cosmos.util.Beta; import io.netty.channel.ChannelOption; import java.time.Duration; @@ -15,12 +16,14 @@ */ public final class DirectConnectionConfig { // Constants + private static final Boolean DEFAULT_CONNECTION_ENDPOINT_REDISCOVERY_ENABLED = false; private static final Duration DEFAULT_IDLE_ENDPOINT_TIMEOUT = Duration.ofHours(1l); private static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(5L); private static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(5L); private static final int DEFAULT_MAX_CONNECTIONS_PER_ENDPOINT = 130; private static final int DEFAULT_MAX_REQUESTS_PER_CONNECTION = 30; + private boolean connectionEndpointRediscoveryEnabled; private Duration connectTimeout; private Duration idleConnectionTimeout; private Duration idleEndpointTimeout; @@ -32,6 +35,7 @@ public final class DirectConnectionConfig { * Constructor */ public DirectConnectionConfig() { + this.connectionEndpointRediscoveryEnabled = DEFAULT_CONNECTION_ENDPOINT_REDISCOVERY_ENABLED; this.connectTimeout = DEFAULT_CONNECT_TIMEOUT; this.idleConnectionTimeout = Duration.ZERO; this.idleEndpointTimeout = DEFAULT_IDLE_ENDPOINT_TIMEOUT; @@ -40,6 +44,39 @@ public DirectConnectionConfig() { this.requestTimeout = DEFAULT_REQUEST_TIMEOUT; } + /** + * Gets a value indicating whether Direct TCP connection endpoint rediscovery is enabled. + *

+ * The connection endpoint rediscovery feature is designed to reduce and spread-out latency spikes that may occur during maintenance operations. + * + * By default, connection endpoint rediscovery is disabled. + * + * @return {@code true} if Direct TCP connection endpoint rediscovery is enabled; {@code false} otherwise. + */ + @Beta(Beta.SinceVersion.V4_8_0) + public boolean isConnectionEndpointRediscoveryEnabled() { + return this.connectionEndpointRediscoveryEnabled; + } + + /** + * Sets a value indicating whether Direct TCP connection endpoint rediscovery should be enabled. + *

+ * The connection endpoint rediscovery feature is designed to reduce and spread-out latency spikes that may occur during maintenance operations. + * + * By default, connection endpoint rediscovery is disabled. + * + * @param connectionEndpointRediscoveryEnabled {@code true} if connection endpoint rediscovery is enabled; {@code + * false} otherwise. + * + * @return the {@linkplain DirectConnectionConfig}. + */ + @Beta(Beta.SinceVersion.V4_8_0) + public DirectConnectionConfig setConnectionEndpointRediscoveryEnabled(boolean connectionEndpointRediscoveryEnabled) { + this.connectionEndpointRediscoveryEnabled = connectionEndpointRediscoveryEnabled; + return this; + } + + /** * Gets the default DIRECT connection configuration. * diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ConnectionPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ConnectionPolicy.java index ae4645486c693..48ad9cda4ee89 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ConnectionPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ConnectionPolicy.java @@ -42,6 +42,7 @@ public final class ConnectionPolicy { private int maxConnectionsPerEndpoint; private int maxRequestsPerConnection; private Duration idleTcpConnectionTimeout; + private boolean tcpConnectionEndpointRediscoveryEnabled; /** * Constructor. 
@@ -52,6 +53,7 @@ public ConnectionPolicy(GatewayConnectionConfig gatewayConnectionConfig) { this.maxConnectionPoolSize = gatewayConnectionConfig.getMaxConnectionPoolSize(); this.requestTimeout = BridgeInternal.getRequestTimeoutFromGatewayConnectionConfig(gatewayConnectionConfig); this.proxy = gatewayConnectionConfig.getProxy(); + this.tcpConnectionEndpointRediscoveryEnabled = false; } public ConnectionPolicy(DirectConnectionConfig directConnectionConfig) { @@ -62,6 +64,7 @@ public ConnectionPolicy(DirectConnectionConfig directConnectionConfig) { this.maxConnectionsPerEndpoint = directConnectionConfig.getMaxConnectionsPerEndpoint(); this.maxRequestsPerConnection = directConnectionConfig.getMaxRequestsPerConnection(); this.requestTimeout = BridgeInternal.getRequestTimeoutFromDirectConnectionConfig(directConnectionConfig); + this.tcpConnectionEndpointRediscoveryEnabled = directConnectionConfig.isConnectionEndpointRediscoveryEnabled(); } private ConnectionPolicy(ConnectionMode connectionMode) { @@ -75,6 +78,26 @@ private ConnectionPolicy(ConnectionMode connectionMode) { this.userAgentSuffix = ""; } + /** + * Gets a value that indicates whether Direct TCP connection endpoint rediscovery is enabled. + * + * @return {@code true} if Direct TCP connection endpoint rediscovery is enabled; {@code false} otherwise. + */ + public boolean isTcpConnectionEndpointRediscoveryEnabled() { + return this.tcpConnectionEndpointRediscoveryEnabled; + } + + /** + * Sets a value that indicates whether Direct TCP connection endpoint rediscovery is enabled. + * + * @return the {@linkplain ConnectionPolicy}. + */ + public ConnectionPolicy setTcpConnectionEndpointRediscoveryEnabled(boolean tcpConnectionEndpointRediscoveryEnabled) { + this.tcpConnectionEndpointRediscoveryEnabled = tcpConnectionEndpointRediscoveryEnabled; + return this; + } + + /** * Gets the default connection policy. 
* @@ -493,6 +516,7 @@ public String toString() { ", idleEndpointTimeout=" + idleEndpointTimeout + ", maxConnectionsPerEndpoint=" + maxConnectionsPerEndpoint + ", maxRequestsPerConnection=" + maxRequestsPerConnection + + ", tcpConnectionEndpointRediscoveryEnabled=" + tcpConnectionEndpointRediscoveryEnabled + '}'; } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DiagnosticsClientContext.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DiagnosticsClientContext.java index c382117ba2106..e88fdcd4d6589 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DiagnosticsClientContext.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/DiagnosticsClientContext.java @@ -186,13 +186,14 @@ private String rntbdConfigInternal(RntbdTransportClient.Options rntbdOptions) { if (rntbdOptions == null) { return null; } - return Strings.lenientFormat("(cto:%s, rto:%s, icto:%s, ieto:%s, mcpe:%s, mrpc:%s)", + return Strings.lenientFormat("(cto:%s, rto:%s, icto:%s, ieto:%s, mcpe:%s, mrpc:%s, cer:%s)", rntbdOptions.connectTimeout(), rntbdOptions.requestTimeout(), rntbdOptions.idleChannelTimeout(), rntbdOptions.idleEndpointTimeout(), rntbdOptions.maxChannelsPerEndpoint(), - rntbdOptions.maxRequestsPerChannel()); + rntbdOptions.maxRequestsPerChannel(), + rntbdOptions.isConnectionEndpointRediscoveryEnabled()); } private String preferredRegionsInternal() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GoneException.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GoneException.java index ddac13e482053..ea620b1c9c10d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GoneException.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/GoneException.java @@ -75,6 +75,17 @@ public GoneException(String message, String requestUri) { this(message, null, new 
HashMap<>(), requestUri); } + /** + * Instantiates a new {@link GoneException Gone exception}. + * + * @param message the message + * @param requestUri the request uri + * @param cause the cause of this (client-side) {@link GoneException} + */ + public GoneException(String message, URI requestUri, Exception cause) { + this(message, cause, null, requestUri); + } + GoneException(Exception innerException) { this(RMResources.Gone, innerException, new HashMap<>(), null); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index f660042362ef8..7848dc4d5beaf 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -397,15 +397,6 @@ public void init() { private void initializeDirectConnectivity() { - this.storeClientFactory = new StoreClientFactory( - this.diagnosticsClientConfig, - this.configs, - this.connectionPolicy, - // this.maxConcurrentConnectionOpenRequests, - this.userAgentContainer, - this.connectionSharingAcrossClientsEnabled - ); - this.addressResolver = new GlobalAddressResolver(this, this.reactorHttpClient, this.globalEndpointManager, @@ -419,6 +410,16 @@ private void initializeDirectConnectivity() { null, this.connectionPolicy); + this.storeClientFactory = new StoreClientFactory( + this.addressResolver, + this.diagnosticsClientConfig, + this.configs, + this.connectionPolicy, + // this.maxConcurrentConnectionOpenRequests, + this.userAgentContainer, + this.connectionSharingAcrossClientsEnabled + ); + this.createStoreModel(true); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressResolver.java 
b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressResolver.java index 53748ee926d3b..9577f1d3ff42a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressResolver.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/AddressResolver.java @@ -21,16 +21,18 @@ import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.Strings; import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.implementation.apachecommons.lang.NotImplementedException; +import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.implementation.caches.RxCollectionCache; import com.azure.cosmos.implementation.routing.CollectionRoutingMap; import com.azure.cosmos.implementation.routing.PartitionKeyInternal; import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper; import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity; -import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; +import java.util.Set; import java.util.concurrent.Callable; import java.util.function.Function; @@ -62,6 +64,11 @@ public void initializeCaches( this.collectionRoutingMapCache = collectionRoutingMapCache; } + @Override + public void remove(RxDocumentServiceRequest request, Set partitionKeyRangeIdentitySet) { + throw new NotImplementedException("remove() is not supported in AddressResolver"); + } + public Mono resolveAsync( RxDocumentServiceRequest request, boolean forceRefreshPartitionAddresses) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java index 
c75fc5a3912b6..2cd4ff3e5ea62 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java @@ -38,7 +38,6 @@ import com.azure.cosmos.implementation.http.HttpResponse; import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity; import io.netty.handler.codec.http.HttpMethod; -import io.netty.handler.timeout.ReadTimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; @@ -55,6 +54,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -142,6 +142,18 @@ public GatewayAddressCache( DefaultSuboptimalPartitionForceRefreshIntervalInSeconds); } + @Override + public void removeAddress(final PartitionKeyRangeIdentity partitionKeyRangeIdentity) { + + Objects.requireNonNull(partitionKeyRangeIdentity, "expected non-null partitionKeyRangeIdentity"); + + if (partitionKeyRangeIdentity.getPartitionKeyRangeId().equals(PartitionKeyRange.MASTER_PARTITION_KEY_RANGE_ID)) { + this.masterPartitionAddressCache = null; + } else { + this.serverPartitionAddressCache.remove(partitionKeyRangeIdentity); + } + } + @Override public Mono> tryGetAddresses(RxDocumentServiceRequest request, PartitionKeyRangeIdentity partitionKeyRangeIdentity, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java index 58da0cb9b3805..492c35f577305 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java +++ 
b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GlobalAddressResolver.java @@ -27,6 +27,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -43,8 +45,6 @@ public class GlobalAddressResolver implements IAddressResolver { private final GatewayServiceConfigurationReader serviceConfigReader; final Map addressCacheByEndpoint; - private GatewayAddressCache gatewayAddressCache; - private AddressResolver addressResolver; private HttpClient httpClient; public GlobalAddressResolver( @@ -94,12 +94,32 @@ Mono openAsync(DocumentCollection collection) { for (EndpointCache endpointCache : this.addressCacheByEndpoint.values()) { tasks.add(endpointCache.addressCache.openAsync(collection, ranges)); } - @SuppressWarnings({"rawtypes", "unchecked"}) + @SuppressWarnings({ "rawtypes", "unchecked" }) Mono[] array = new Mono[this.addressCacheByEndpoint.values().size()]; return Flux.mergeDelayError(Queues.SMALL_BUFFER_SIZE, tasks.toArray(array)).then(); }); } + @Override + public void remove( + final RxDocumentServiceRequest request, + final Set partitionKeyRangeIdentitySet) { + + Objects.requireNonNull(request, "expected non-null request"); + Objects.requireNonNull(partitionKeyRangeIdentitySet, "expected non-null partitionKeyRangeIdentitySet"); + + if (partitionKeyRangeIdentitySet.size() > 0) { + + URI addressResolverURI = this.endpointManager.resolveServiceEndpoint(request); + + this.addressCacheByEndpoint.computeIfPresent(addressResolverURI, (ignored, endpointCache) -> { + final GatewayAddressCache addressCache = endpointCache.addressCache; + partitionKeyRangeIdentitySet.forEach(partitionKeyRangeIdentity -> addressCache.removeAddress(partitionKeyRangeIdentity)); + return endpointCache; + }); + } + } + @Override public Mono resolveAsync(RxDocumentServiceRequest request, boolean 
forceRefresh) { IAddressResolver resolver = this.getAddressResolver(request); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/IAddressCache.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/IAddressCache.java index 5b01ff483050e..0f58758128420 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/IAddressCache.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/IAddressCache.java @@ -10,6 +10,13 @@ public interface IAddressCache { + /** + * Removes the physical address associated with the given {@link PartitionKeyRangeIdentity partition key range identity} + * + * + */ + void removeAddress(PartitionKeyRangeIdentity partitionKeyRangeIdentity); + /** * Resolves physical addresses by either PartitionKeyRangeIdentity. * diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/IAddressResolver.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/IAddressResolver.java index 122d8f451f166..77d2ef31306fb 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/IAddressResolver.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/IAddressResolver.java @@ -4,9 +4,15 @@ package com.azure.cosmos.implementation.directconnectivity; import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity; import reactor.core.publisher.Mono; +import java.util.Set; + public interface IAddressResolver { + + void remove(RxDocumentServiceRequest request, Set partitionKeyRangeIdentitySet); + Mono resolveAsync( RxDocumentServiceRequest request, boolean forceRefreshPartitionAddresses); diff --git 
a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java index df8b197ade7a8..0e0e09fd15f0b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClient.java @@ -93,14 +93,44 @@ public final class RntbdTransportClient extends TransportClient { // region Constructors + /** + * Initializes a newly created {@linkplain RntbdTransportClient} object. + * + * @param configs A {@link Configs} instance containing the {@link SslContext} to be used. + * @param connectionPolicy The {@linkplain ConnectionPolicy connection policy} to be applied. + * @param userAgent The {@linkplain UserAgentContainer user agent} identifying the client. + * @param addressResolver The address resolver to be used for connection endpoint rediscovery, if connection + * endpoint rediscovery is enabled by {@code connectionPolicy}. 
+ */ + public RntbdTransportClient( + final Configs configs, + final ConnectionPolicy connectionPolicy, + final UserAgentContainer userAgent, + final IAddressResolver addressResolver) { + + this( + new Options.Builder(connectionPolicy).userAgent(userAgent).build(), + configs.getSslContext(), + addressResolver); + } + RntbdTransportClient(final RntbdEndpoint.Provider endpointProvider) { this.endpointProvider = endpointProvider; this.id = instanceCount.incrementAndGet(); this.tag = RntbdTransportClient.tag(this.id); } - RntbdTransportClient(final Options options, final SslContext sslContext) { - this.endpointProvider = new RntbdServiceEndpoint.Provider(this, options, sslContext); + RntbdTransportClient( + final Options options, + final SslContext sslContext, + final IAddressResolver addressResolver) { + + this.endpointProvider = new RntbdServiceEndpoint.Provider( + this, + options, + checkNotNull(sslContext, "expected non-null sslContext"), + addressResolver); + this.id = instanceCount.incrementAndGet(); this.tag = RntbdTransportClient.tag(this.id); } @@ -186,7 +216,8 @@ public Mono invokeStoreAsync(final Uri addressUri, final RxDocume error = new GoneException( lenientFormat("an unexpected %s occurred: %s", unexpectedError), - address.toString()); + address, + error instanceof Exception ? 
(Exception) error : new RuntimeException(error)); } assert error instanceof CosmosException; @@ -300,6 +331,9 @@ public static final class Options { @JsonProperty() private final Duration connectionAcquisitionTimeout; + @JsonProperty() + private final boolean connectionEndpointRediscoveryEnabled; + @JsonProperty() private final Duration connectTimeout; @@ -361,6 +395,7 @@ private Options(final Builder builder) { this.bufferPageSize = builder.bufferPageSize; this.connectionAcquisitionTimeout = builder.connectionAcquisitionTimeout; + this.connectionEndpointRediscoveryEnabled = builder.connectionEndpointRediscoveryEnabled; this.idleChannelTimeout = builder.idleChannelTimeout; this.idleChannelTimerResolution = builder.idleChannelTimerResolution; this.idleEndpointTimeout = builder.idleEndpointTimeout; @@ -385,6 +420,7 @@ private Options(final Builder builder) { private Options(final ConnectionPolicy connectionPolicy) { this.bufferPageSize = 8192; this.connectionAcquisitionTimeout = Duration.ofSeconds(5L); + this.connectionEndpointRediscoveryEnabled = connectionPolicy.isTcpConnectionEndpointRediscoveryEnabled(); this.connectTimeout = connectionPolicy.getConnectTimeout(); this.idleChannelTimeout = connectionPolicy.getIdleTcpConnectionTimeout(); this.idleChannelTimerResolution = Duration.ofMillis(100); @@ -431,6 +467,10 @@ public Duration idleEndpointTimeout() { return this.idleEndpointTimeout; } + public boolean isConnectionEndpointRediscoveryEnabled() { + return this.connectionEndpointRediscoveryEnabled; + } + public int maxBufferCapacity() { return this.maxBufferCapacity; } @@ -517,6 +557,7 @@ public String toString() { *

{@code RntbdTransportClient.class.getClassLoader().getResourceAsStream("azure.cosmos.directTcp.defaultOptions.json")}
*

Example:

{@code {
          *   "bufferPageSize": 8192,
+         *   "connectionEndpointRediscoveryEnabled": true,
          *   "connectTimeout": "PT1M",
          *   "idleChannelTimeout": "PT0S",
          *   "idleEndpointTimeout": "PT1M10S",
@@ -605,6 +646,7 @@ public static class Builder {
 
             private int bufferPageSize;
             private final Duration connectionAcquisitionTimeout;
+            private boolean connectionEndpointRediscoveryEnabled;
             private Duration connectTimeout;
             private Duration idleChannelTimeout;
             private Duration idleChannelTimerResolution;
@@ -630,6 +672,7 @@ public Builder(ConnectionPolicy connectionPolicy) {
 
                 this.bufferPageSize = DEFAULT_OPTIONS.bufferPageSize;
                 this.connectionAcquisitionTimeout = DEFAULT_OPTIONS.connectionAcquisitionTimeout;
+                this.connectionEndpointRediscoveryEnabled = DEFAULT_OPTIONS.connectionEndpointRediscoveryEnabled;
                 this.connectTimeout = connectionPolicy.getConnectTimeout();
                 this.idleChannelTimeout = connectionPolicy.getIdleTcpConnectionTimeout();
                 this.idleChannelTimerResolution = DEFAULT_OPTIONS.idleChannelTimerResolution;
@@ -677,6 +720,11 @@ public Builder connectionAcquisitionTimeout(final Duration value) {
                 return this;
             }
 
+            public Builder connectionEndpointRediscoveryEnabled(final boolean value) { // fluent setter for the connection endpoint rediscovery flag
+                this.connectionEndpointRediscoveryEnabled = value; // replaces the value seeded by the Builder constructor; stored as-is
+                return this; // returns this Builder so calls can be chained
+            }
+
             public Builder connectionTimeout(final Duration value) {
                 checkArgument(value == null || value.compareTo(Duration.ZERO) > 0,
                     "expected positive value, not %s",
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/SharedTransportClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/SharedTransportClient.java
index dd56c4bff6ce9..e9b622e350890 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/SharedTransportClient.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/SharedTransportClient.java
@@ -31,12 +31,19 @@ public class SharedTransportClient extends TransportClient {
     private static SharedTransportClient sharedTransportClient;
     private final RntbdTransportClient.Options rntbdOptions;
 
-    public static TransportClient getOrCreateInstance(Protocol protocol, Configs configs, ConnectionPolicy connectionPolicy, UserAgentContainer userAgent, DiagnosticsClientContext.DiagnosticsClientConfig diagnosticsClientConfig) {
+    public static TransportClient getOrCreateInstance(
+        Protocol protocol,
+        Configs configs,
+        ConnectionPolicy connectionPolicy,
+        UserAgentContainer userAgent,
+        DiagnosticsClientContext.DiagnosticsClientConfig diagnosticsClientConfig,
+        IAddressResolver addressResolver) {
+
         synchronized (SharedTransportClient.class) {
             if (sharedTransportClient == null) {
                 assert counter.get() == 0;
                 logger.info("creating a new shared RntbdTransportClient");
-                sharedTransportClient = new SharedTransportClient(protocol, configs, connectionPolicy, userAgent);
+                sharedTransportClient = new SharedTransportClient(protocol, configs, connectionPolicy, userAgent, addressResolver);
             } else {
                 logger.info("Reusing an instance of RntbdTransportClient");
             }
@@ -50,12 +57,17 @@ public static TransportClient getOrCreateInstance(Protocol protocol, Configs con
 
     private final TransportClient transportClient;
 
-    private SharedTransportClient(Protocol protocol, Configs configs, ConnectionPolicy connectionPolicy, UserAgentContainer userAgent) {
+    private SharedTransportClient(
+        Protocol protocol,
+        Configs configs,
+        ConnectionPolicy connectionPolicy,
+        UserAgentContainer userAgent,
+        IAddressResolver addressResolver) {
         if (protocol == Protocol.TCP) {
             this.rntbdOptions =
                 new RntbdTransportClient.Options.Builder(connectionPolicy).userAgent(userAgent).build();
-            this.transportClient = new RntbdTransportClient(rntbdOptions, configs.getSslContext());
-            
+            this.transportClient = new RntbdTransportClient(rntbdOptions, configs.getSslContext(), addressResolver);
+
         } else if (protocol == Protocol.HTTPS){
             this.rntbdOptions = null;
             this.transportClient = new HttpTransportClient(configs, connectionPolicy, userAgent);
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreClientFactory.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreClientFactory.java
index c9538d2b67b74..b69a87eba3730 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreClientFactory.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreClientFactory.java
@@ -7,7 +7,6 @@
 import com.azure.cosmos.implementation.ConnectionPolicy;
 import com.azure.cosmos.implementation.DiagnosticsClientContext;
 import com.azure.cosmos.implementation.IAuthorizationTokenProvider;
-import com.azure.cosmos.implementation.RxDocumentClientImpl;
 import com.azure.cosmos.implementation.SessionContainer;
 import com.azure.cosmos.implementation.UserAgentContainer;
 
@@ -29,6 +28,7 @@ public class StoreClientFactory implements AutoCloseable {
     private volatile boolean isClosed;
 
     public StoreClientFactory(
+        IAddressResolver addressResolver,
         DiagnosticsClientContext.DiagnosticsClientConfig diagnosticsClientConfig,
         Configs configs,
         ConnectionPolicy connectionPolicy,
@@ -37,8 +37,13 @@ public StoreClientFactory(
         this.configs = configs;
         this.protocol = configs.getProtocol();
         if (enableTransportClientSharing) {
-            this.transportClient = SharedTransportClient.getOrCreateInstance(protocol, configs
-                , connectionPolicy, userAgent, diagnosticsClientConfig);
+            this.transportClient = SharedTransportClient.getOrCreateInstance(
+                protocol,
+                configs,
+                connectionPolicy,
+                userAgent,
+                diagnosticsClientConfig,
+                addressResolver);
         } else {
             if (protocol == Protocol.HTTPS) {
                 this.transportClient = new HttpTransportClient(configs, connectionPolicy, userAgent);
@@ -46,7 +51,7 @@ public StoreClientFactory(
 
                 RntbdTransportClient.Options rntbdOptions =
                     new RntbdTransportClient.Options.Builder(connectionPolicy).userAgent(userAgent).build();
-                this.transportClient = new RntbdTransportClient(rntbdOptions, configs.getSslContext());
+                this.transportClient = new RntbdTransportClient(rntbdOptions, configs.getSslContext(), addressResolver);
                 diagnosticsClientConfig.withRntbdOptions(rntbdOptions);
 
             } else {
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java
index b7f1bbfe6dabe..7f67cf21149ba 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdClientChannelPool.java
@@ -3,7 +3,6 @@
 
 package com.azure.cosmos.implementation.directconnectivity.rntbd;
 
-import com.azure.cosmos.implementation.Utils;
 import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpoint.Config;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.SerializerProvider;
@@ -27,7 +26,6 @@
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
 import io.netty.util.concurrent.Promise;
-import io.netty.util.concurrent.SingleThreadEventExecutor;
 import io.netty.util.internal.ThrowableUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConnectionEvent.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConnectionEvent.java
new file mode 100644
index 0000000000000..28e52ebe8dfd0
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConnectionEvent.java
@@ -0,0 +1,9 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.implementation.directconnectivity.rntbd;
+
+public enum RntbdConnectionEvent {
+    READ_EOF, // raised by RntbdConnectionStateListener.onException for a GoneException caused by ClosedChannelException
+    READ_FAILURE; // NOTE(review): not raised by RntbdConnectionStateListener as written -- confirm intended use
+}
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConnectionStateListener.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConnectionStateListener.java
new file mode 100644
index 0000000000000..942e351c4ebf3
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdConnectionStateListener.java
@@ -0,0 +1,164 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.implementation.directconnectivity.rntbd;
+
+import com.azure.cosmos.implementation.GoneException;
+import com.azure.cosmos.implementation.RxDocumentServiceRequest;
+import com.azure.cosmos.implementation.directconnectivity.IAddressResolver;
+import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.time.Instant;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
+
+public class RntbdConnectionStateListener {
+    // region Fields
+    // Overview: tracks partition key ranges requested through one endpoint; on READ_EOF hands them to the resolver.
+    private static final Logger logger = LoggerFactory.getLogger(RntbdConnectionStateListener.class);
+
+    private final IAddressResolver addressResolver;
+    private final RntbdEndpoint endpoint;
+    private final Set<PartitionKeyRangeIdentity> partitionAddressCache;
+    private final AtomicBoolean updatingAddressCache = new AtomicBoolean(false);
+
+    // endregion
+
+    // region Constructors
+    /** Creates a listener bound to {@code endpoint}; both arguments are null-checked with checkNotNull. */
+    public RntbdConnectionStateListener(final IAddressResolver addressResolver, final RntbdEndpoint endpoint) {
+        this.addressResolver = checkNotNull(addressResolver, "expected non-null addressResolver");
+        this.endpoint = checkNotNull(endpoint, "expected non-null endpoint");
+        this.partitionAddressCache = ConcurrentHashMap.newKeySet();
+    }
+
+    // endregion
+
+    // region Methods
+    /** Raises READ_EOF when {@code exception} is a GoneException whose cause is a ClosedChannelException. */
+    public void onException(final RxDocumentServiceRequest request, Throwable exception) {
+        checkNotNull(request, "expect non-null request");
+        checkNotNull(exception, "expect non-null exception");
+
+        if (exception instanceof GoneException) {
+            final Throwable cause = exception.getCause();
+
+            if (cause != null) {
+
+                // GoneException was produced by the client, not the server
+                //
+                // This could occur for example:
+                //
+                // * an operation fails due to an IOException which indicates a connection reset by the server,
+                // * a channel closes unexpectedly because the server stopped taking requests, or
+                // * an error was detected by the transport client (e.g., IllegalStateException)
+                // * a request timed out in pending acquisition queue
+                // * a request failed fast in admission control layer due to high load
+                // * channel connect timed out
+                //
+                // Currently only ClosedChannelException raises onConnectionEvent, since it is a more reliable signal that the server is going down.
+
+                if (cause instanceof IOException) {
+
+                    if (cause instanceof ClosedChannelException) {
+                        this.onConnectionEvent(RntbdConnectionEvent.READ_EOF, request, exception);
+                    } else {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Will not raise the connection state change event for error", cause);
+                        }
+                    }
+                }
+            }
+        }
+    }
+    /** Records the partition key range identity of {@code request} in this listener's partitionAddressCache. */
+    public void updateConnectionState(final RxDocumentServiceRequest request) {
+
+        checkNotNull(request, "expect non-null request");
+
+        PartitionKeyRangeIdentity partitionKeyRangeIdentity = this.getPartitionKeyRangeIdentity(request);
+        checkNotNull(partitionKeyRangeIdentity, "expected non-null partitionKeyRangeIdentity");
+
+        this.partitionAddressCache.add(partitionKeyRangeIdentity);
+
+        if (logger.isDebugEnabled()) {
+            logger.debug(
+                "updateConnectionState({\"time\":{},\"endpoint\":{},\"partitionKeyRangeIdentity\":{}})",
+                RntbdObjectMapper.toJson(Instant.now()),
+                RntbdObjectMapper.toJson(endpoint),
+                RntbdObjectMapper.toJson(partitionKeyRangeIdentity));
+        }
+    }
+
+    // endregion
+
+    // region Privates
+    /** Returns the request's identity, or builds one from requestContext's resolved range and collection rid. */
+    private PartitionKeyRangeIdentity getPartitionKeyRangeIdentity(final RxDocumentServiceRequest request) {
+        checkNotNull(request, "expect non-null request");
+
+        PartitionKeyRangeIdentity partitionKeyRangeIdentity = request.getPartitionKeyRangeIdentity();
+
+        if (partitionKeyRangeIdentity == null) {
+
+            final String partitionKeyRange = checkNotNull(
+                request.requestContext.resolvedPartitionKeyRange, "expected non-null resolvedPartitionKeyRange").getId();
+
+            final String collectionRid = request.requestContext.resolvedCollectionRid;
+
+            partitionKeyRangeIdentity = collectionRid != null
+                ? new PartitionKeyRangeIdentity(collectionRid, partitionKeyRange)
+                : new PartitionKeyRangeIdentity(partitionKeyRange);
+        }
+
+        return partitionKeyRangeIdentity;
+    }
+    /** On READ_EOF, and only while the endpoint is not closed, logs the event and calls updateAddressCache. */
+    private void onConnectionEvent(final RntbdConnectionEvent event, final RxDocumentServiceRequest request, final Throwable exception) {
+
+        checkNotNull(request, "expected non-null request");
+        checkNotNull(exception, "expected non-null exception");
+
+        if (event == RntbdConnectionEvent.READ_EOF) {
+            if (!this.endpoint.isClosed()) {
+
+                if (logger.isDebugEnabled()) {
+                    logger.debug("onConnectionEvent({\"event\":{},\"time\":{},\"endpoint\":{},\"cause\":{}})",
+                        event,
+                        RntbdObjectMapper.toJson(Instant.now()),
+                        RntbdObjectMapper.toJson(this.endpoint),
+                        RntbdObjectMapper.toJson(exception));
+                }
+
+                this.updateAddressCache(request);
+            }
+        }
+    }
+    /** Passes the recorded identities to addressResolver.remove and clears them; one caller at a time. */
+    private void updateAddressCache(final RxDocumentServiceRequest request) {
+        if (this.updatingAddressCache.compareAndSet(false, true)) {
+            try {
+                if (logger.isDebugEnabled()) {
+                    logger.debug(
+                        "updateAddressCache ({\"time\":{},\"endpoint\":{},\"partitionAddressCache\":{}})",
+                        RntbdObjectMapper.toJson(Instant.now()),
+                        RntbdObjectMapper.toJson(this.endpoint),
+                        RntbdObjectMapper.toJson(this.partitionAddressCache));
+                }
+                // The flag is reset in the finally below, which only the caller that won the compareAndSet reaches.
+                this.addressResolver.remove(request, this.partitionAddressCache);
+                this.partitionAddressCache.clear();
+            } finally {
+                this.updatingAddressCache.set(false);
+            }
+        }
+    }
+    // endregion
+}
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java
index 5157ee0edf4aa..a1ecb71c44758 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdEndpoint.java
@@ -4,6 +4,7 @@
 package com.azure.cosmos.implementation.directconnectivity.rntbd;
 
 import com.azure.cosmos.implementation.UserAgentContainer;
+import com.azure.cosmos.implementation.directconnectivity.IAddressResolver;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import io.micrometer.core.instrument.Tag;
@@ -58,6 +59,8 @@ public interface RntbdEndpoint extends AutoCloseable {
 
     SocketAddress remoteAddress();
 
+    URI remoteURI();
+
     int requestQueueLength();
 
     Tag tag();
@@ -92,6 +95,8 @@ interface Provider extends AutoCloseable {
 
         RntbdEndpoint get(URI physicalAddress);
 
+        IAddressResolver getAddressResolver();
+
         Stream list();
     }
 
@@ -155,6 +160,11 @@ public long idleEndpointTimeoutInNanos() {
             return this.options.idleEndpointTimeout().toNanos();
         }
 
+        @JsonProperty
+        public boolean isConnectionEndpointRediscoveryEnabled() {
+            return this.options.isConnectionEndpointRediscoveryEnabled(); // delegates to the wrapped options
+        }
+
         @JsonProperty
         public int maxBufferCapacity() {
             return this.options.maxBufferCapacity();
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java
index 4c8212ce904b8..99fc273031286 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdServiceEndpoint.java
@@ -7,8 +7,8 @@
 import com.azure.cosmos.CosmosException;
 import com.azure.cosmos.implementation.GoneException;
 import com.azure.cosmos.implementation.HttpConstants;
+import com.azure.cosmos.implementation.directconnectivity.IAddressResolver;
 import com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient;
-import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
 import com.azure.cosmos.implementation.guava25.collect.ImmutableMap;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.SerializerProvider;
@@ -33,6 +33,7 @@
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.UUID;
@@ -73,11 +74,14 @@ public final class RntbdServiceEndpoint implements RntbdEndpoint {
     private final Instant createdTime;
     private final RntbdMetrics metrics;
     private final Provider provider;
+    private final URI remoteURI;
     private final SocketAddress remoteAddress;
     private final RntbdRequestTimer requestTimer;
     private final Tag tag;
     private final int maxConcurrentRequests;
 
+    private final RntbdConnectionStateListener connectionStateListener;
+
     // endregion
 
     // region Constructors
@@ -89,6 +93,21 @@ private RntbdServiceEndpoint(
         final RntbdRequestTimer timer,
         final URI physicalAddress) {
 
+        try {
+            this.remoteURI = new URI(
+                physicalAddress.getScheme(),
+                null,
+                physicalAddress.getHost(),
+                physicalAddress.getPort(),
+                null,
+                null,
+                null);
+        } catch (URISyntaxException error) {
+            throw new IllegalArgumentException(
+                lenientFormat("physicalAddress %s cannot be parsed as a server-based authority", physicalAddress),
+                error);
+        }
+
         final Bootstrap bootstrap = new Bootstrap()
             .channel(NioSocketChannel.class)
             .group(group)
@@ -97,7 +116,7 @@ private RntbdServiceEndpoint(
             .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.connectTimeoutInMillis())
             .option(ChannelOption.RCVBUF_ALLOCATOR, receiveBufferAllocator)
             .option(ChannelOption.SO_KEEPALIVE, true)
-            .remoteAddress(physicalAddress.getHost(), physicalAddress.getPort());
+            .remoteAddress(this.remoteURI.getHost(), this.remoteURI.getPort());
 
         this.createdTime = Instant.now();
         this.channelPool = new RntbdClientChannelPool(this, bootstrap, config);
@@ -119,6 +138,10 @@ private RntbdServiceEndpoint(
 
         this.metrics = new RntbdMetrics(provider.transportClient, this);
         this.maxConcurrentRequests = config.maxConcurrentRequestsPerEndpoint();
+
+        this.connectionStateListener = this.provider.addressResolver != null && config.isConnectionEndpointRediscoveryEnabled()
+            ? new RntbdConnectionStateListener(this.provider.addressResolver, this)
+            : null;
     }
 
     // endregion
@@ -194,6 +217,9 @@ public SocketAddress remoteAddress() {
         return this.remoteAddress;
     }
 
+    @Override
+    public URI remoteURI() { return this.remoteURI; }
+
     @Override
     public int requestQueueLength() {
         return this.channelPool.requestQueueLength();
@@ -249,6 +275,10 @@ public RntbdRequestRecord request(final RntbdRequestArgs args) {
             }
         }
 
+        if (this.connectionStateListener != null) {
+            this.connectionStateListener.updateConnectionState(args.serviceRequest());
+        }
+
         this.lastRequestNanoTime.set(args.nanoTimeCreated());
 
         final RntbdRequestRecord record = this.write(args);
@@ -257,18 +287,22 @@ public RntbdRequestRecord request(final RntbdRequestArgs args) {
         record.whenComplete((response, error) -> {
             this.concurrentRequests.decrementAndGet();
             this.metrics.markComplete(record);
-            onResponse(response, error);
+            onResponse(error, record);
         });
 
         return record;
     }
 
-    private void onResponse(StoreResponse storeResponse, Throwable exception) {
+    private void onResponse(Throwable exception, RntbdRequestRecord record) {
         if (exception == null) {
             this.lastSuccessfulRequestNanoTime.set(System.nanoTime());
             return;
         }
 
+        if (this.connectionStateListener != null) {
+            this.connectionStateListener.onException(record.args().serviceRequest(), exception);
+        }
+
         // exception != null
         if (exception instanceof CosmosException) {
             CosmosException cosmosException = (CosmosException) exception;
@@ -426,11 +460,13 @@ public static final class Provider implements RntbdEndpoint.Provider {
         private final RntbdEndpointMonitoringProvider monitoring;
         private final RntbdRequestTimer requestTimer;
         private final RntbdTransportClient transportClient;
+        private final IAddressResolver addressResolver;
 
         public Provider(
             final RntbdTransportClient transportClient,
             final Options options,
-            final SslContext sslContext) {
+            final SslContext sslContext,
+            final IAddressResolver addressResolver) {
 
             checkNotNull(transportClient, "expected non-null provider");
             checkNotNull(options, "expected non-null options");
@@ -445,6 +481,7 @@ public Provider(
                 wireLogLevel = null;
             }
 
+            this.addressResolver = addressResolver;
             this.transportClient = transportClient;
             this.config = new Config(options, sslContext, wireLogLevel);
 
@@ -514,6 +551,11 @@ public RntbdEndpoint get(final URI physicalAddress) {
                 physicalAddress));
         }
 
+        @Override
+        public IAddressResolver getAddressResolver() {
+            return this.addressResolver;
+        }
+
         @Override
         public Stream list() {
             return this.endpoints.values().stream();
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/PartitionKeyRangeIdentity.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/PartitionKeyRangeIdentity.java
index 83048a762f8be..84624c6d9e7e6 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/PartitionKeyRangeIdentity.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/PartitionKeyRangeIdentity.java
@@ -5,12 +5,14 @@
 
 import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
 
+import java.util.Objects;
+
 /**
  * Used internally to represents the identity of a partition key range in the Azure Cosmos DB database service.
  */
 public final class PartitionKeyRangeIdentity {
-    private String collectionRid;
-    private String partitionKeyRangeId;
+    private final String collectionRid;
+    private final String partitionKeyRangeId;
 
     public PartitionKeyRangeIdentity(String collectionRid, String partitionKeyRangeId) {
         if (collectionRid == null) {
@@ -40,6 +42,7 @@ public PartitionKeyRangeIdentity(String partitionKeyRangeId) {
             throw new IllegalArgumentException("partitionKeyRangeId");
         }
 
+        this.collectionRid = null;
         this.partitionKeyRangeId = partitionKeyRangeId;
     }
 
@@ -72,21 +75,28 @@ public String toString() {
 
     @Override
     public boolean equals(Object other) {
-        if (null == other) {
-            return false;
-        }
         if (this == other) {
             return true;
         }
-        return other instanceof PartitionKeyRangeIdentity
-                && ((PartitionKeyRangeIdentity) other).collectionRid.equals(this.collectionRid)
-                && ((PartitionKeyRangeIdentity) other).partitionKeyRangeId.equals(this.partitionKeyRangeId);
+
+        if (other == null || getClass() != other.getClass()) {
+            return false;
+        }
+
+        PartitionKeyRangeIdentity that = (PartitionKeyRangeIdentity) other;
+
+        if (!Objects.equals(this.collectionRid, that.collectionRid)) {
+            return false;
+        }
+
+        return partitionKeyRangeId.equals(that.partitionKeyRangeId);
     }
 
     @Override
     public int hashCode() {
-        return ((this.collectionRid != null ? this.collectionRid.hashCode() : 0) * 397)
-                ^ (this.partitionKeyRangeId != null ? this.partitionKeyRangeId.hashCode() : 0);
+        int result = collectionRid != null ? collectionRid.hashCode() : 0;
+        result = (397 * result) ^ partitionKeyRangeId.hashCode();
+        return result;
     }
 
     public String getCollectionRid() {
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/Beta.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/Beta.java
index 800578a4e6730..c981a2fe3f113 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/Beta.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/Beta.java
@@ -3,17 +3,17 @@
 
 package com.azure.cosmos.util;
 
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.ElementType.TYPE;
-import static java.lang.annotation.ElementType.CONSTRUCTOR;
-
 import java.lang.annotation.Documented;
 import java.lang.annotation.Inherited;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
+import static java.lang.annotation.ElementType.CONSTRUCTOR;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.ElementType.TYPE;
+
 @Documented
 @Retention(RetentionPolicy.CLASS)
 @Target({ TYPE, METHOD, PARAMETER, CONSTRUCTOR })
@@ -45,6 +45,8 @@ public enum SinceVersion {
         /** v4.6.0 */
         V4_6_0,
         /** v4.7.0 */
-        V4_7_0
+        V4_7_0,
+        /** v4.8.0 */
+        V4_8_0,
     }
 }
diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/CilentConfigDiagnosticsTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/CilentConfigDiagnosticsTest.java
index 3252af650815d..2e297ef68deb4 100644
--- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/CilentConfigDiagnosticsTest.java
+++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/CilentConfigDiagnosticsTest.java
@@ -8,21 +8,14 @@
 import com.azure.cosmos.implementation.http.HttpClientConfig;
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.commons.io.IOUtils;
 import org.mockito.Mockito;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import java.io.StringWriter;
-import java.io.Writer;
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -78,7 +71,7 @@ public void rntbd() throws Exception {
         assertThat(objectNode.get("id").asInt()).isEqualTo(1);
         assertThat(objectNode.get("numberOfClients").asInt()).isEqualTo(2);
         assertThat(objectNode.get("consistencyCfg").asText()).isEqualTo("(consistency: null, mm: false, prgns: [])");
-        assertThat(objectNode.get("connCfg").get("rntbd").asText()).isEqualTo("(cto:PT5S, rto:PT5S, icto:PT0S, ieto:PT1H, mcpe:130, mrpc:30)");
+        assertThat(objectNode.get("connCfg").get("rntbd").asText()).isEqualTo("(cto:PT5S, rto:PT5S, icto:PT0S, ieto:PT1H, mcpe:130, mrpc:30, cer:false)");
         assertThat(objectNode.get("connCfg").get("gw").asText()).isEqualTo("(cps:null, rto:null, icto:null, p:false)");
         assertThat(objectNode.get("connCfg").get("other").asText()).isEqualTo("(ed: false, cs: false)");
 
diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java
index f85e611f643be..cb5ac984f3c9f 100644
--- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java
+++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/RntbdTransportClientTest.java
@@ -4,10 +4,12 @@
 package com.azure.cosmos.implementation.directconnectivity;
 
 import com.azure.core.credential.AzureKeyCredential;
+import com.azure.cosmos.CosmosException;
 import com.azure.cosmos.implementation.BadRequestException;
+import com.azure.cosmos.implementation.BaseAuthorizationTokenProvider;
 import com.azure.cosmos.implementation.ConflictException;
-import com.azure.cosmos.CosmosException;
 import com.azure.cosmos.implementation.ConnectionPolicy;
+import com.azure.cosmos.implementation.FailureValidator;
 import com.azure.cosmos.implementation.ForbiddenException;
 import com.azure.cosmos.implementation.GoneException;
 import com.azure.cosmos.implementation.InternalServerErrorException;
@@ -15,23 +17,21 @@
 import com.azure.cosmos.implementation.LockedException;
 import com.azure.cosmos.implementation.MethodNotAllowedException;
 import com.azure.cosmos.implementation.NotFoundException;
+import com.azure.cosmos.implementation.OperationType;
 import com.azure.cosmos.implementation.PartitionIsMigratingException;
 import com.azure.cosmos.implementation.PartitionKeyRangeGoneException;
 import com.azure.cosmos.implementation.PartitionKeyRangeIsSplittingException;
+import com.azure.cosmos.implementation.Paths;
 import com.azure.cosmos.implementation.PreconditionFailedException;
 import com.azure.cosmos.implementation.RequestEntityTooLargeException;
 import com.azure.cosmos.implementation.RequestRateTooLargeException;
 import com.azure.cosmos.implementation.RequestTimeoutException;
 import com.azure.cosmos.implementation.RequestVerb;
+import com.azure.cosmos.implementation.ResourceType;
 import com.azure.cosmos.implementation.RetryWithException;
+import com.azure.cosmos.implementation.RxDocumentServiceRequest;
 import com.azure.cosmos.implementation.ServiceUnavailableException;
 import com.azure.cosmos.implementation.UnauthorizedException;
-import com.azure.cosmos.implementation.BaseAuthorizationTokenProvider;
-import com.azure.cosmos.implementation.FailureValidator;
-import com.azure.cosmos.implementation.OperationType;
-import com.azure.cosmos.implementation.Paths;
-import com.azure.cosmos.implementation.ResourceType;
-import com.azure.cosmos.implementation.RxDocumentServiceRequest;
 import com.azure.cosmos.implementation.UserAgentContainer;
 import com.azure.cosmos.implementation.Utils;
 import com.azure.cosmos.implementation.directconnectivity.rntbd.AsyncRntbdRequestRecord;
@@ -69,6 +69,7 @@
 import java.net.ConnectException;
 import java.net.SocketAddress;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Arrays;
@@ -80,6 +81,7 @@
 import static com.azure.cosmos.implementation.HttpConstants.HttpHeaders;
 import static com.azure.cosmos.implementation.HttpConstants.SubStatusCodes;
 import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext;
+import static com.azure.cosmos.implementation.guava27.Strings.lenientFormat;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
@@ -617,7 +619,7 @@ public void verifyGoneResponseMapsToGoneException() throws Exception {
         final RntbdTransportClient.Options options = new RntbdTransportClient.Options.Builder(connectionPolicy).build();
         final SslContext sslContext = SslContextBuilder.forClient().build();
 
-        try (final RntbdTransportClient transportClient = new RntbdTransportClient(options, sslContext)) {
+        try (final RntbdTransportClient transportClient = new RntbdTransportClient(options, sslContext, null)) {
 
             final BaseAuthorizationTokenProvider authorizationTokenProvider = new BaseAuthorizationTokenProvider(
                 new AzureKeyCredential(RntbdTestConfiguration.AccountKey)
@@ -735,7 +737,7 @@ private static RntbdTransportClient getRntbdTransportClientUnderTest(
             throw new AssertionError(String.format("%s: %s", error.getClass(), error.getMessage()));
         }
 
-        return new RntbdTransportClient(new FakeEndpoint.Provider(options, sslContext, expected));
+        return new RntbdTransportClient(new FakeEndpoint.Provider(options, sslContext, expected, null));
     }
 
     private void validateFailure(final Mono responseMono, final FailureValidator validator) {
@@ -816,12 +818,27 @@ private static final class FakeEndpoint implements RntbdEndpoint {
         final RntbdRequestTimer requestTimer;
         final FakeChannel fakeChannel;
         final URI physicalAddress;
+        final URI remoteURI;
         final Tag tag;
 
         private FakeEndpoint(
             final Config config, final RntbdRequestTimer timer, final URI physicalAddress,
             final RntbdResponse... expected
         ) {
+            try {
+                this.remoteURI = new URI(
+                    physicalAddress.getScheme(),
+                    null,
+                    physicalAddress.getHost(),
+                    physicalAddress.getPort(),
+                    null,
+                    null,
+                    null);
+            } catch (URISyntaxException error) {
+                throw new IllegalArgumentException(
+                    lenientFormat("physicalAddress %s cannot be parsed as a server-based authority", physicalAddress),
+                    error);
+            }
 
             final ArrayBlockingQueue responses = new ArrayBlockingQueue<>(
                 expected.length, true, Arrays.asList(expected)
@@ -908,6 +925,9 @@ public SocketAddress remoteAddress() {
             return this.fakeChannel.remoteAddress();
         }
 
+        @Override
+        public URI remoteURI() { return this.remoteURI; }
+
         @Override
         public int requestQueueLength() {
             return 0;
@@ -953,13 +973,15 @@ static class Provider implements RntbdEndpoint.Provider {
             final Config config;
             final RntbdResponse expected;
             final RntbdRequestTimer timer;
+            final IAddressResolver addressResolver;
 
-            Provider(RntbdTransportClient.Options options, SslContext sslContext, RntbdResponse expected) {
+            Provider(RntbdTransportClient.Options options, SslContext sslContext, RntbdResponse expected, IAddressResolver addressResolver) {
                 this.config = new Config(options, sslContext, LogLevel.WARN);
                 this.timer = new RntbdRequestTimer(
                     config.requestTimeoutInNanos(),
                     config.requestTimerResolutionInNanos());
                 this.expected = expected;
+                this.addressResolver = addressResolver;
             }
 
             @Override
@@ -987,6 +1009,11 @@ public RntbdEndpoint get(URI physicalAddress) {
                 return new FakeEndpoint(config, timer, physicalAddress, expected);
             }
 
+            @Override
+            public IAddressResolver getAddressResolver() {
+                return this.addressResolver;
+            }
+
             @Override
             public Stream list() {
                 return Stream.empty();
diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/SharedTransportClientTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/SharedTransportClientTest.java
index e8057e93bb048..50498c0c58181 100644
--- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/SharedTransportClientTest.java
+++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/SharedTransportClientTest.java
@@ -9,8 +9,6 @@
 import com.azure.cosmos.implementation.UserAgentContainer;
 import org.testng.annotations.Test;
 
-import java.time.Duration;
-
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class SharedTransportClientTest {
@@ -19,8 +17,8 @@ public void createTwoClient_SharedReference() {
         TransportClient transportClient1 = null;
         TransportClient transportClient2 = null;
         try {
-            transportClient1 = SharedTransportClient.getOrCreateInstance(Protocol.TCP, new Configs(), ConnectionPolicy.getDefaultPolicy(), new UserAgentContainer(), new DiagnosticsClientContext.DiagnosticsClientConfig());
-            transportClient2 = SharedTransportClient.getOrCreateInstance(Protocol.TCP, new Configs(), ConnectionPolicy.getDefaultPolicy(), new UserAgentContainer(), new DiagnosticsClientContext.DiagnosticsClientConfig());
+            transportClient1 = SharedTransportClient.getOrCreateInstance(Protocol.TCP, new Configs(), ConnectionPolicy.getDefaultPolicy(), new UserAgentContainer(), new DiagnosticsClientContext.DiagnosticsClientConfig(), null);
+            transportClient2 = SharedTransportClient.getOrCreateInstance(Protocol.TCP, new Configs(), ConnectionPolicy.getDefaultPolicy(), new UserAgentContainer(), new DiagnosticsClientContext.DiagnosticsClientConfig(), null);
 
             assertThat(transportClient2).isSameAs(transportClient1);
             assertThat(((SharedTransportClient) transportClient1).getReferenceCounter()).isEqualTo(2);
@@ -35,8 +33,8 @@ public void createTwoHttpsClient_SharedReference() {
         TransportClient transportClient1 = null;
         TransportClient transportClient2 = null;
         try {
-            transportClient1 = SharedTransportClient.getOrCreateInstance(Protocol.HTTPS, new Configs(), ConnectionPolicy.getDefaultPolicy(), new UserAgentContainer(), new DiagnosticsClientContext.DiagnosticsClientConfig());
-            transportClient2 = SharedTransportClient.getOrCreateInstance(Protocol.HTTPS, new Configs(), ConnectionPolicy.getDefaultPolicy(), new UserAgentContainer(), new DiagnosticsClientContext.DiagnosticsClientConfig());
+            transportClient1 = SharedTransportClient.getOrCreateInstance(Protocol.HTTPS, new Configs(), ConnectionPolicy.getDefaultPolicy(), new UserAgentContainer(), new DiagnosticsClientContext.DiagnosticsClientConfig(), null);
+            transportClient2 = SharedTransportClient.getOrCreateInstance(Protocol.HTTPS, new Configs(), ConnectionPolicy.getDefaultPolicy(), new UserAgentContainer(), new DiagnosticsClientContext.DiagnosticsClientConfig(), null);
 
             assertThat(transportClient2).isSameAs(transportClient1);
             assertThat(((SharedTransportClient) transportClient1).getReferenceCounter()).isEqualTo(2);
@@ -53,12 +51,12 @@ public void createTwoClient_CloseOne_CreateAnotherClient_SharedReference() throw
         TransportClient transportClient3 = null;
 
         try {
-            transportClient1 = SharedTransportClient.getOrCreateInstance(Protocol.TCP, new Configs(), ConnectionPolicy.getDefaultPolicy(), new UserAgentContainer(), new DiagnosticsClientContext.DiagnosticsClientConfig());
-            transportClient2 = SharedTransportClient.getOrCreateInstance(Protocol.TCP, new Configs(), ConnectionPolicy.getDefaultPolicy(), new UserAgentContainer(), new DiagnosticsClientContext.DiagnosticsClientConfig());
+            transportClient1 = SharedTransportClient.getOrCreateInstance(Protocol.TCP, new Configs(), ConnectionPolicy.getDefaultPolicy(), new UserAgentContainer(), new DiagnosticsClientContext.DiagnosticsClientConfig(), null);
+            transportClient2 = SharedTransportClient.getOrCreateInstance(Protocol.TCP, new Configs(), ConnectionPolicy.getDefaultPolicy(), new UserAgentContainer(), new DiagnosticsClientContext.DiagnosticsClientConfig(), null);
             transportClient2.close();
             assertThat(((SharedTransportClient) transportClient1).getReferenceCounter()).isEqualTo(1);
 
-            transportClient3 = SharedTransportClient.getOrCreateInstance(Protocol.TCP, new Configs(), ConnectionPolicy.getDefaultPolicy(), new UserAgentContainer(), new DiagnosticsClientContext.DiagnosticsClientConfig());
+            transportClient3 = SharedTransportClient.getOrCreateInstance(Protocol.TCP, new Configs(), ConnectionPolicy.getDefaultPolicy(), new UserAgentContainer(), new DiagnosticsClientContext.DiagnosticsClientConfig(), null);
             assertThat(transportClient3).isSameAs(transportClient1);
             assertThat(((SharedTransportClient) transportClient1).getReferenceCounter()).isEqualTo(2);
         } finally {
@@ -74,13 +72,13 @@ public void createTwoClient_CloseBoth_ReCreateClient_NewReference() throws Excep
         TransportClient transportClient3 = null;
 
         try {
-            transportClient1 = SharedTransportClient.getOrCreateInstance(Protocol.TCP, new Configs(), ConnectionPolicy.getDefaultPolicy(), new UserAgentContainer(), new DiagnosticsClientContext.DiagnosticsClientConfig());
-            transportClient2 = SharedTransportClient.getOrCreateInstance(Protocol.TCP, new Configs(), ConnectionPolicy.getDefaultPolicy(), new UserAgentContainer(), new DiagnosticsClientContext.DiagnosticsClientConfig());
+            transportClient1 = SharedTransportClient.getOrCreateInstance(Protocol.TCP, new Configs(), ConnectionPolicy.getDefaultPolicy(), new UserAgentContainer(), new DiagnosticsClientContext.DiagnosticsClientConfig(), null);
+            transportClient2 = SharedTransportClient.getOrCreateInstance(Protocol.TCP, new Configs(), ConnectionPolicy.getDefaultPolicy(), new UserAgentContainer(), new DiagnosticsClientContext.DiagnosticsClientConfig(), null);
             transportClient1.close();
             transportClient2.close();
             assertThat(((SharedTransportClient) transportClient1).getReferenceCounter()).isEqualTo(0);
 
-            transportClient3 = SharedTransportClient.getOrCreateInstance(Protocol.TCP, new Configs(), ConnectionPolicy.getDefaultPolicy(), new UserAgentContainer(), new DiagnosticsClientContext.DiagnosticsClientConfig());
+            transportClient3 = SharedTransportClient.getOrCreateInstance(Protocol.TCP, new Configs(), ConnectionPolicy.getDefaultPolicy(), new UserAgentContainer(), new DiagnosticsClientContext.DiagnosticsClientConfig(), null);
             assertThat(transportClient3).isNotSameAs(transportClient1);
             assertThat(((SharedTransportClient) transportClient3).getReferenceCounter()).isEqualTo(1);
         } finally {