Skip to content

Commit

Permalink
ConnectionStateListener change (Azure#15991)
Browse files Browse the repository at this point in the history
* ConnectionStateListener change
Co-authored-by: Annie Liang <[email protected]>
  • Loading branch information
xinlian12 authored Oct 21, 2020
1 parent 8f0d1b2 commit f0445e4
Show file tree
Hide file tree
Showing 23 changed files with 525 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.cosmos;

import com.azure.cosmos.util.Beta;
import io.netty.channel.ChannelOption;

import java.time.Duration;
Expand All @@ -15,12 +16,14 @@
*/
public final class DirectConnectionConfig {
// Constants
private static final Boolean DEFAULT_CONNECTION_ENDPOINT_REDISCOVERY_ENABLED = false;
private static final Duration DEFAULT_IDLE_ENDPOINT_TIMEOUT = Duration.ofHours(1l);
private static final Duration DEFAULT_CONNECT_TIMEOUT = Duration.ofSeconds(5L);
private static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(5L);
private static final int DEFAULT_MAX_CONNECTIONS_PER_ENDPOINT = 130;
private static final int DEFAULT_MAX_REQUESTS_PER_CONNECTION = 30;

private boolean connectionEndpointRediscoveryEnabled;
private Duration connectTimeout;
private Duration idleConnectionTimeout;
private Duration idleEndpointTimeout;
Expand All @@ -32,6 +35,7 @@ public final class DirectConnectionConfig {
* Constructor
*/
public DirectConnectionConfig() {
this.connectionEndpointRediscoveryEnabled = DEFAULT_CONNECTION_ENDPOINT_REDISCOVERY_ENABLED;
this.connectTimeout = DEFAULT_CONNECT_TIMEOUT;
this.idleConnectionTimeout = Duration.ZERO;
this.idleEndpointTimeout = DEFAULT_IDLE_ENDPOINT_TIMEOUT;
Expand All @@ -40,6 +44,39 @@ public DirectConnectionConfig() {
this.requestTimeout = DEFAULT_REQUEST_TIMEOUT;
}

/**
* Gets a value indicating whether Direct TCP connection endpoint rediscovery is enabled.
* <p>
* The connection endpoint rediscovery feature is designed to reduce and spread-out latency spikes that may occur during maintenance operations.
*
* By default, connection endpoint rediscovery is disabled.
*
* @return {@code true} if Direct TCP connection endpoint rediscovery is enabled; {@code false} otherwise.
*/
@Beta(Beta.SinceVersion.V4_8_0)
public boolean isConnectionEndpointRediscoveryEnabled() {
return this.connectionEndpointRediscoveryEnabled;
}

/**
* Sets a value indicating whether Direct TCP connection endpoint rediscovery should be enabled.
* <p>
* The connection endpoint rediscovery feature is designed to reduce and spread-out latency spikes that may occur during maintenance operations.
*
* By default, connection endpoint rediscovery is disabled.
*
* @param connectionEndpointRediscoveryEnabled {@code true} if connection endpoint rediscovery is enabled; {@code
* false} otherwise.
*
* @return the {@linkplain DirectConnectionConfig}.
*/
@Beta(Beta.SinceVersion.V4_8_0)
public DirectConnectionConfig setConnectionEndpointRediscoveryEnabled(boolean connectionEndpointRediscoveryEnabled) {
this.connectionEndpointRediscoveryEnabled = connectionEndpointRediscoveryEnabled;
return this;
}


/**
* Gets the default DIRECT connection configuration.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public final class ConnectionPolicy {
private int maxConnectionsPerEndpoint;
private int maxRequestsPerConnection;
private Duration idleTcpConnectionTimeout;
private boolean tcpConnectionEndpointRediscoveryEnabled;

/**
* Constructor.
Expand All @@ -52,6 +53,7 @@ public ConnectionPolicy(GatewayConnectionConfig gatewayConnectionConfig) {
this.maxConnectionPoolSize = gatewayConnectionConfig.getMaxConnectionPoolSize();
this.requestTimeout = BridgeInternal.getRequestTimeoutFromGatewayConnectionConfig(gatewayConnectionConfig);
this.proxy = gatewayConnectionConfig.getProxy();
this.tcpConnectionEndpointRediscoveryEnabled = false;
}

public ConnectionPolicy(DirectConnectionConfig directConnectionConfig) {
Expand All @@ -62,6 +64,7 @@ public ConnectionPolicy(DirectConnectionConfig directConnectionConfig) {
this.maxConnectionsPerEndpoint = directConnectionConfig.getMaxConnectionsPerEndpoint();
this.maxRequestsPerConnection = directConnectionConfig.getMaxRequestsPerConnection();
this.requestTimeout = BridgeInternal.getRequestTimeoutFromDirectConnectionConfig(directConnectionConfig);
this.tcpConnectionEndpointRediscoveryEnabled = directConnectionConfig.isConnectionEndpointRediscoveryEnabled();
}

private ConnectionPolicy(ConnectionMode connectionMode) {
Expand All @@ -75,6 +78,26 @@ private ConnectionPolicy(ConnectionMode connectionMode) {
this.userAgentSuffix = "";
}

/**
* Gets a value that indicates whether Direct TCP connection endpoint rediscovery is enabled.
*
* @return {@code true} if Direct TCP connection endpoint rediscovery should is enabled; {@code false} otherwise.
*/
public boolean isTcpConnectionEndpointRediscoveryEnabled() {
return this.tcpConnectionEndpointRediscoveryEnabled;
}

/**
* Sets a value that indicates whether Direct TCP connection endpoint rediscovery is enabled.
*
* @return the {@linkplain ConnectionPolicy}.
*/
public ConnectionPolicy setTcpConnectionEndpointRediscoveryEnabled(boolean tcpConnectionEndpointRediscoveryEnabled) {
this.tcpConnectionEndpointRediscoveryEnabled = tcpConnectionEndpointRediscoveryEnabled;
return this;
}


/**
* Gets the default connection policy.
*
Expand Down Expand Up @@ -493,6 +516,7 @@ public String toString() {
", idleEndpointTimeout=" + idleEndpointTimeout +
", maxConnectionsPerEndpoint=" + maxConnectionsPerEndpoint +
", maxRequestsPerConnection=" + maxRequestsPerConnection +
", tcpConnectionEndpointRediscoveryEnabled=" + tcpConnectionEndpointRediscoveryEnabled +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,14 @@ private String rntbdConfigInternal(RntbdTransportClient.Options rntbdOptions) {
if (rntbdOptions == null) {
return null;
}
return Strings.lenientFormat("(cto:%s, rto:%s, icto:%s, ieto:%s, mcpe:%s, mrpc:%s)",
return Strings.lenientFormat("(cto:%s, rto:%s, icto:%s, ieto:%s, mcpe:%s, mrpc:%s, cer:%s)",
rntbdOptions.connectTimeout(),
rntbdOptions.requestTimeout(),
rntbdOptions.idleChannelTimeout(),
rntbdOptions.idleEndpointTimeout(),
rntbdOptions.maxChannelsPerEndpoint(),
rntbdOptions.maxRequestsPerChannel());
rntbdOptions.maxRequestsPerChannel(),
rntbdOptions.isConnectionEndpointRediscoveryEnabled());
}

private String preferredRegionsInternal() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,17 @@ public GoneException(String message, String requestUri) {
this(message, null, new HashMap<>(), requestUri);
}

/**
* Instantiates a new {@link GoneException Gone exception}.
*
* @param message the message
* @param requestUri the request uri
* @param cause the cause of this (client-side) {@link GoneException}
*/
public GoneException(String message, URI requestUri, Exception cause) {
this(message, cause, null, requestUri);
}

GoneException(Exception innerException) {
this(RMResources.Gone, innerException, new HashMap<>(), null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,15 +397,6 @@ public void init() {

private void initializeDirectConnectivity() {

this.storeClientFactory = new StoreClientFactory(
this.diagnosticsClientConfig,
this.configs,
this.connectionPolicy,
// this.maxConcurrentConnectionOpenRequests,
this.userAgentContainer,
this.connectionSharingAcrossClientsEnabled
);

this.addressResolver = new GlobalAddressResolver(this,
this.reactorHttpClient,
this.globalEndpointManager,
Expand All @@ -419,6 +410,16 @@ private void initializeDirectConnectivity() {
null,
this.connectionPolicy);

this.storeClientFactory = new StoreClientFactory(
this.addressResolver,
this.diagnosticsClientConfig,
this.configs,
this.connectionPolicy,
// this.maxConcurrentConnectionOpenRequests,
this.userAgentContainer,
this.connectionSharingAcrossClientsEnabled
);

this.createStoreModel(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.NotImplementedException;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.caches.RxCollectionCache;
import com.azure.cosmos.implementation.routing.CollectionRoutingMap;
import com.azure.cosmos.implementation.routing.PartitionKeyInternal;
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.util.Set;
import java.util.concurrent.Callable;
import java.util.function.Function;

Expand Down Expand Up @@ -62,6 +64,11 @@ public void initializeCaches(
this.collectionRoutingMapCache = collectionRoutingMapCache;
}

@Override
public void remove(RxDocumentServiceRequest request, Set<PartitionKeyRangeIdentity> partitionKeyRangeIdentitySet) {
throw new NotImplementedException("remove() is not supported in AddressResolver");
}

public Mono<AddressInformation[]> resolveAsync(
RxDocumentServiceRequest request,
boolean forceRefreshPartitionAddresses) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import com.azure.cosmos.implementation.http.HttpResponse;
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.timeout.ReadTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
Expand All @@ -55,6 +54,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -142,6 +142,18 @@ public GatewayAddressCache(
DefaultSuboptimalPartitionForceRefreshIntervalInSeconds);
}

@Override
public void removeAddress(final PartitionKeyRangeIdentity partitionKeyRangeIdentity) {

Objects.requireNonNull(partitionKeyRangeIdentity, "expected non-null partitionKeyRangeIdentity");

if (partitionKeyRangeIdentity.getPartitionKeyRangeId().equals(PartitionKeyRange.MASTER_PARTITION_KEY_RANGE_ID)) {
this.masterPartitionAddressCache = null;
} else {
this.serverPartitionAddressCache.remove(partitionKeyRangeIdentity);
}
}

@Override
public Mono<Utils.ValueHolder<AddressInformation[]>> tryGetAddresses(RxDocumentServiceRequest request,
PartitionKeyRangeIdentity partitionKeyRangeIdentity,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

Expand All @@ -43,8 +45,6 @@ public class GlobalAddressResolver implements IAddressResolver {
private final GatewayServiceConfigurationReader serviceConfigReader;
final Map<URI, EndpointCache> addressCacheByEndpoint;

private GatewayAddressCache gatewayAddressCache;
private AddressResolver addressResolver;
private HttpClient httpClient;

public GlobalAddressResolver(
Expand Down Expand Up @@ -94,12 +94,32 @@ Mono<Void> openAsync(DocumentCollection collection) {
for (EndpointCache endpointCache : this.addressCacheByEndpoint.values()) {
tasks.add(endpointCache.addressCache.openAsync(collection, ranges));
}
@SuppressWarnings({"rawtypes", "unchecked"})
@SuppressWarnings({ "rawtypes", "unchecked" })
Mono<Void>[] array = new Mono[this.addressCacheByEndpoint.values().size()];
return Flux.mergeDelayError(Queues.SMALL_BUFFER_SIZE, tasks.toArray(array)).then();
});
}

@Override
public void remove(
final RxDocumentServiceRequest request,
final Set<PartitionKeyRangeIdentity> partitionKeyRangeIdentitySet) {

Objects.requireNonNull(request, "expected non-null request");
Objects.requireNonNull(partitionKeyRangeIdentitySet, "expected non-null partitionKeyRangeIdentitySet");

if (partitionKeyRangeIdentitySet.size() > 0) {

URI addressResolverURI = this.endpointManager.resolveServiceEndpoint(request);

this.addressCacheByEndpoint.computeIfPresent(addressResolverURI, (ignored, endpointCache) -> {
final GatewayAddressCache addressCache = endpointCache.addressCache;
partitionKeyRangeIdentitySet.forEach(partitionKeyRangeIdentity -> addressCache.removeAddress(partitionKeyRangeIdentity));
return endpointCache;
});
}
}

@Override
public Mono<AddressInformation[]> resolveAsync(RxDocumentServiceRequest request, boolean forceRefresh) {
IAddressResolver resolver = this.getAddressResolver(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@

public interface IAddressCache {

/**
* Removes the physical address associated with the given {@link PartitionKeyRangeIdentity partition key range identity}
*
*
*/
void removeAddress(PartitionKeyRangeIdentity partitionKeyRangeIdentity);

/**
* Resolves physical addresses by either PartitionKeyRangeIdentity.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,15 @@
package com.azure.cosmos.implementation.directconnectivity;

import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
import reactor.core.publisher.Mono;

import java.util.Set;

public interface IAddressResolver {

void remove(RxDocumentServiceRequest request, Set<PartitionKeyRangeIdentity> partitionKeyRangeIdentitySet);

Mono<AddressInformation[]> resolveAsync(
RxDocumentServiceRequest request,
boolean forceRefreshPartitionAddresses);
Expand Down
Loading

0 comments on commit f0445e4

Please sign in to comment.