Skip to content

Commit

Permalink
Add client-level excluded regions and make it mutable. (Azure#36616)
Browse files Browse the repository at this point in the history
* Preliminary changes to make certain properties dynamically configurable.

* Preliminary changes to make certain properties dynamically configurable.

* Preliminary changes to make certain properties dynamically configurable.

* Preliminary changes to make certain properties dynamically configurable.

* Preliminary changes to make certain properties dynamically configurable.

* Preliminary changes to make certain properties dynamically configurable.

* Preliminary changes to make certain properties dynamically configurable.

* Preliminary changes to make certain properties dynamically configurable.

* Added javadoc.

* Added javadoc.

* Added javadoc.

* Added javadoc.

* Attempt at fixing spot bugs.

* Attempt at fixing CI pipeline.

* Added mutability test for end-to-end timeout.

* Adding test coverage.

* Adding tests for excludeRegions.

* Adding tests for excludeRegions.

* Adding tests for excludeRegions.

* Adding tests for excludeRegions.

* Adding tests for excludeRegions.

* Adding tests for excludeRegions.

* Adding tests for excludeRegions.

* Adding tests for SessionRetryOptions.

* Adding tests for end-to-end operation timeout mutation.

* Adding tests for threshold and threshold step mutation.

* Adding tests for threshold and threshold step mutation.

* Adding tests for exclude regions mutability.

* Adding validations.

* Refactorings.

* Refactorings.

* Refactorings.

* Refactorings.

* Wired excludeRegions to diagnostic string.

* Fixed tests.

* Used monitor object to synchronize update of excludeRegions.

* Added test titles.

* Refactorings.

* Refactorings.

* Refactorings.

* Refactorings.

* Adding / Fixing tests.

* Wiring diagnostics.

* Locking changes.

* Diagnostic updated for wiring excluded regions.

* Refactorings.

* Fixing diagnostic issues.

* Refactorings.

* Refactorings.

* Fixing spot bugs.

* Adding 503 retry tests for point write operations.

* Adding code comments.

* Adding code comments.

* Adding code comments.

* Adding code comments.

* Fixing concurrent access to excludedRegions.

* Fixing NPE issues.

* Adding tests for E2E timeout mutation.

* Made regionSwitchHint volatile.

* Reverting changes.

* Refactorings.

* Refactorings.

* Refactorings.

* Updated CHANGELOG.md.

* Updated CHANGELOG.md.

* Refactorings.

* Refactorings.

* Adding tests.

* Adding tests.

* Added tests for bulk, batch and LocationCache.

* Added tests for bulk, batch and LocationCache.

* Added tests for bulk, batch and LocationCache.

* Added tests for bulk, batch and LocationCache.

* Test fixes.

* Test fixes.

* Addressing tests.

* Addressing tests.

* Fixed faulty merge.

* Addressing review comments.

* Preliminary changes to inject excluded regions through a Supplier.

* Preliminary changes to inject excluded regions through a Supplier.

* Preliminary changes to inject excluded regions through a Supplier.

* Fix CI issues.

* Clean up code.

* Clean up code.

* Fix tests.

* Clean up code.

* Updated CHANGELOG.md.

* Updated CHANGELOG.md.

* Refactorings.

* Reacting to review comments.

* Fixing CI pipeline failures.

* Refactorings.

* Fixing CI issues.

* Fixing CI/live-test issues.
  • Loading branch information
jeet1995 authored Sep 29, 2023
1 parent f76d34b commit 8142343
Show file tree
Hide file tree
Showing 16 changed files with 3,174 additions and 34 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import com.azure.cosmos.ConnectionMode;
import com.azure.cosmos.CosmosContainerProactiveInitConfig;
import com.azure.cosmos.CosmosContainerProactiveInitConfigBuilder;
import com.azure.cosmos.CosmosExcludedRegions;
import com.azure.cosmos.DirectConnectionConfig;
import com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient;
import com.azure.cosmos.implementation.guava25.collect.ImmutableList;
import com.azure.cosmos.implementation.http.HttpClientConfig;
Expand All @@ -23,9 +25,12 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -190,6 +195,9 @@ public void full(

DiagnosticsClientContext clientContext = Mockito.mock(DiagnosticsClientContext.class);
System.setProperty("COSMOS.REPLICA_ADDRESS_VALIDATION_ENABLED", "false");
AtomicReference<CosmosExcludedRegions> cosmosExcludedRegionsAtomicReference = new AtomicReference<>(
new CosmosExcludedRegions(new HashSet<>(Arrays.asList("west us 2"))));
Supplier<CosmosExcludedRegions> excludedRegionsSupplier = () -> cosmosExcludedRegionsAtomicReference.get();

DiagnosticsClientContext.DiagnosticsClientConfig diagnosticsClientConfig = new DiagnosticsClientContext.DiagnosticsClientConfig();
String machineId = "vmId:" + UUID.randomUUID();
Expand All @@ -203,6 +211,8 @@ public void full(
httpConfig.withNetworkRequestTimeout(Duration.ofSeconds(18));
diagnosticsClientConfig.withGatewayHttpClientConfig(httpConfig.toDiagnosticsString());
diagnosticsClientConfig.withPreferredRegions(ImmutableList.of("west us 1", "west us 2"));
diagnosticsClientConfig.withConnectionPolicy(
new ConnectionPolicy(DirectConnectionConfig.getDefaultConfig()).setExcludedRegionsSupplier(excludedRegionsSupplier));
diagnosticsClientConfig.withConnectionSharingAcrossClientsEnabled(true);
diagnosticsClientConfig.withEndpointDiscoveryEnabled(true);
diagnosticsClientConfig.withClientMap(new HashMap<>());
Expand All @@ -224,6 +234,7 @@ public void full(
assertThat(objectNode.get("connCfg").get("rntbd").asText()).isEqualTo("null");
assertThat(objectNode.get("connCfg").get("gw").asText()).isEqualTo("(cps:500, nrto:PT18S, icto:PT17S, p:false)");
assertThat(objectNode.get("connCfg").get("other").asText()).isEqualTo("(ed: true, cs: true, rv: false)");
assertThat(objectNode.get("excrgns").asText()).isEqualTo("[westus2]");

String expectedProactiveInitConfigString = reconstructProactiveInitConfigString(cosmosContainerIdentities, aggressiveWarmupDuration, proactiveConnectionRegionCount);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,10 @@ public static LocationCache getLocationCache(GlobalEndpointManager globalEndpoin
return get(LocationCache.class, globalEndpointManager, "locationCache");
}

/**
 * Returns the {@link ConnectionPolicy} associated with the given {@link LocationCache}.
 * Delegates to the {@code get} helper, asking for the member named {@code "connectionPolicy"}.
 * NOTE(review): {@code get} is presumably a reflective read of a private field - confirm against its definition.
 *
 * @param locationCache the cache instance to read the connection policy from.
 * @return whatever the {@code get} helper yields for {@code "connectionPolicy"}, typed as {@link ConnectionPolicy}.
 */
public static ConnectionPolicy getConnectionPolicy(LocationCache locationCache) {
return get(ConnectionPolicy.class, locationCache, "connectionPolicy");
}

/**
 * Returns the {@link HttpClient} associated with the given {@link ClientTelemetry}.
 * Delegates to the {@code get} helper, asking for the member named {@code "httpClient"}.
 * NOTE(review): the method name looks like a typo for "Client"; it is left untouched because callers reference it.
 *
 * @param clientTelemetry the client telemetry instance to read the HTTP client from.
 * @return whatever the {@code get} helper yields for {@code "httpClient"}, typed as {@link HttpClient}.
 */
public static HttpClient getClientTelemetryHttpClint(ClientTelemetry clientTelemetry) {
return get(HttpClient.class, clientTelemetry, "httpClient");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.cosmos.implementation.routing;

import com.azure.cosmos.CosmosExcludedRegions;
import com.azure.cosmos.DirectConnectionConfig;
import com.azure.cosmos.implementation.ConnectionPolicy;
import com.azure.cosmos.implementation.LifeCycleUtils;
Expand All @@ -16,6 +17,7 @@
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
import com.azure.cosmos.models.ModelBridgeUtils;
import com.azure.cosmos.implementation.guava25.collect.ImmutableList;
import com.azure.cosmos.implementation.guava25.collect.Iterables;
Expand All @@ -27,20 +29,25 @@

import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/**
* Tests for {@link LocationCache}
Expand Down Expand Up @@ -89,6 +96,88 @@ public Object[][] paramsProvider() {
return list.toArray(new Object[][]{});
}

// The purpose of validateExcludedRegions is the following:
//  1. Test with various combinations of excludedRegions such as:
//      a) list with excludedRegions which is a sub-list of preferredRegions
//      b) list with excludedRegions which is a sub-list of preferredRegions and has duplicates
//      c) list with excludedRegions which is not a sub-list of preferredRegions and has no duplicates
//      d) list with excludedRegions which is not a sub-list of preferredRegions and has duplicates
//      e) list which is null
//      f) list which is empty
//  2. todo: the dataProvider hard codes the list of available read and available write regions - this can be avoided
//      a) according to the hardcoding - available read regions: location1, location2; available write regions: location1, location2, location3
//
// Every region name is passed to Arrays.asList as its own argument. A single comma-joined literal such as
// "location1, location2" would yield a one-element collection holding a region name that matches nothing.
@DataProvider(name = "excludedRegionsTestConfigs")
public Object[][] excludedRegionsTestConfigs() {
    // Parameters passed:
    // 1. Set of regions to exclude on the client / ConnectionPolicy
    // 2. List of regions to exclude on the request
    // 3. The request itself
    // 4. Expected applicable read regions
    // 5. Expected applicable write regions
    return new Object[][] {
        {
            // Client excludes every hard-coded read region.
            // NOTE(review): one read endpoint is still expected - presumably a fallback inside LocationCache; confirm.
            new HashSet<>(Arrays.asList("location1", "location2")),
            null,
            RxDocumentServiceRequest.create(
                mockDiagnosticsClientContext(),
                OperationType.Read,
                ResourceType.Document),
            new HashSet<>(Arrays.asList(Location1Endpoint)),
            new HashSet<>(Arrays.asList(Location3Endpoint))
        },
        {
            // Same as the previous row plus a region name outside the preferred regions.
            new HashSet<>(Arrays.asList("location1", "location2", "location7")),
            null,
            RxDocumentServiceRequest.create(
                mockDiagnosticsClientContext(),
                OperationType.Read,
                ResourceType.Document),
            new HashSet<>(Arrays.asList(Location1Endpoint)),
            new HashSet<>(Arrays.asList(Location3Endpoint))
        },
        {
            // Null client-level set: the test expects CosmosExcludedRegions to reject it.
            null,
            null,
            RxDocumentServiceRequest.create(
                mockDiagnosticsClientContext(),
                OperationType.Read,
                ResourceType.Document),
            new HashSet<>(Arrays.asList(Location1Endpoint, Location2Endpoint)),
            new HashSet<>(Arrays.asList(Location1Endpoint, Location2Endpoint, Location3Endpoint))
        },
        {
            // Empty client-level set: nothing is excluded.
            new HashSet<>(),
            null,
            RxDocumentServiceRequest.create(
                mockDiagnosticsClientContext(),
                OperationType.Read,
                ResourceType.Document),
            new HashSet<>(Arrays.asList(Location1Endpoint, Location2Endpoint)),
            new HashSet<>(Arrays.asList(Location1Endpoint, Location2Endpoint, Location3Endpoint))
        },
        {
            // Only region names outside the preferred regions, with a duplicate entry (collapsed by the HashSet).
            new HashSet<>(Arrays.asList("location5", "location10", "location10")),
            null,
            RxDocumentServiceRequest.create(
                mockDiagnosticsClientContext(),
                OperationType.Read,
                ResourceType.Document),
            new HashSet<>(Arrays.asList(Location1Endpoint, Location2Endpoint)),
            new HashSet<>(Arrays.asList(Location1Endpoint, Location2Endpoint, Location3Endpoint))
        },
        {
            // Client-level and request-level excluded regions are both set.
            // NOTE(review): the expected sets equal what excluding only location2 yields, so the request-level
            // list presumably takes precedence over the client-level set - confirm against LocationCache.
            new HashSet<>(Arrays.asList("location1", "location2", "location10")),
            Arrays.asList("location2"),
            RxDocumentServiceRequest.create(
                mockDiagnosticsClientContext(),
                OperationType.Read,
                ResourceType.Document),
            new HashSet<>(Arrays.asList(Location1Endpoint)),
            new HashSet<>(Arrays.asList(Location1Endpoint, Location3Endpoint))
        }
    };
}

@Test(groups = "long", dataProvider = "paramsProvider")
public void validateAsync(boolean useMultipleWriteEndpoints,
boolean endpointDiscoveryEnabled,
Expand All @@ -106,6 +195,40 @@ public void validateWriteEndpointOrderWithClientSideDisableMultipleWriteLocation
assertThat(this.cache.getWriteEndpoints().get(2)).isEqualTo(LocationCacheTest.Location3Endpoint);
}

/**
 * Feeds client-level and request-level excluded regions into the {@link LocationCache} under test and
 * compares the number of applicable read / write endpoints with the expected counts.
 * Only the sizes of the expected sets are compared; the endpoint identities themselves are not asserted.
 *
 * @param excludedRegionsOnClient regions published through the client-level supplier; {@code null} is expected
 *                                to be rejected by the {@link CosmosExcludedRegions} constructor.
 * @param excludedRegionsOnRequest regions set on the request context, may be {@code null}.
 * @param request the request whose applicable endpoints are resolved.
 * @param expectedApplicableReadEndpoints set whose size is the expected read endpoint count.
 * @param expectedApplicableWriteEndpoints set whose size is the expected write endpoint count.
 */
@Test(groups = "unit", dataProvider = "excludedRegionsTestConfigs")
public void validateExcludedRegions(
    Set<String> excludedRegionsOnClient,
    List<String> excludedRegionsOnRequest,
    RxDocumentServiceRequest request,
    Set<URI> expectedApplicableReadEndpoints,
    Set<URI> expectedApplicableWriteEndpoints) throws Exception {

    this.initialize(true, true, false);

    // The supplier handed to the connection policy reads through this holder,
    // so the excluded regions can be swapped after the supplier has been installed.
    AtomicReference<CosmosExcludedRegions> excludedRegionsHolder =
        new AtomicReference<>(new CosmosExcludedRegions(new HashSet<>()));
    Supplier<CosmosExcludedRegions> holderBackedSupplier = excludedRegionsHolder::get;

    ConnectionPolicy policyOfCache = ReflectionUtils.getConnectionPolicy(this.cache);
    policyOfCache.setExcludedRegionsSupplier(holderBackedSupplier);

    // A null set never reaches the cache: constructing CosmosExcludedRegions is expected to throw.
    if (excludedRegionsOnClient == null) {
        assertThatThrownBy(() -> excludedRegionsHolder.set(new CosmosExcludedRegions(excludedRegionsOnClient)))
            .isInstanceOf(IllegalArgumentException.class);
        return;
    }

    excludedRegionsHolder.set(new CosmosExcludedRegions(excludedRegionsOnClient));
    request.requestContext.setExcludeRegions(excludedRegionsOnRequest);

    List<URI> readEndpoints = this.cache.getApplicableReadEndpoints(request);
    List<URI> writeEndpoints = this.cache.getApplicableWriteEndpoints(request);

    int expectedReadCount = expectedApplicableReadEndpoints.size();
    int expectedWriteCount = expectedApplicableWriteEndpoints.size();

    assertThat(readEndpoints.size()).isEqualTo(expectedReadCount);
    assertThat(writeEndpoints.size()).isEqualTo(expectedWriteCount);
}

@AfterClass()
public void afterClass() {
LifeCycleUtils.closeQuietly(this.endpointManager);
Expand Down Expand Up @@ -135,27 +258,26 @@ private void initialize(
boolean enableEndpointDiscovery,
boolean isPreferredLocationsListEmpty) throws Exception {

ConnectionPolicy connectionPolicy = new ConnectionPolicy(DirectConnectionConfig.getDefaultConfig());
connectionPolicy.setEndpointDiscoveryEnabled(enableEndpointDiscovery);
connectionPolicy.setMultipleWriteRegionsEnabled(useMultipleWriteLocations);

this.mockedClient = new DatabaseAccountManagerInternalMock();
this.databaseAccount = LocationCacheTest.createDatabaseAccount(useMultipleWriteLocations);

this.preferredLocations = isPreferredLocationsListEmpty ?
new UnmodifiableList<>(Collections.emptyList()) :
new UnmodifiableList<>(ImmutableList.of("location1", "location2", "location3"));

connectionPolicy.setPreferredRegions(this.preferredLocations);

this.cache = new LocationCache(
this.preferredLocations,
connectionPolicy,
LocationCacheTest.DefaultEndpoint,
enableEndpointDiscovery,
useMultipleWriteLocations,
configs);

this.cache.onDatabaseAccountRead(this.databaseAccount);

ConnectionPolicy connectionPolicy = new ConnectionPolicy(DirectConnectionConfig.getDefaultConfig());
connectionPolicy.setEndpointDiscoveryEnabled(enableEndpointDiscovery);
connectionPolicy.setMultipleWriteRegionsEnabled(useMultipleWriteLocations);
connectionPolicy.setPreferredRegions(this.preferredLocations);

this.endpointManager = new GlobalEndpointManager(mockedClient, connectionPolicy, configs);
}

Expand Down
2 changes: 2 additions & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

#### Features Added
* Added a preview API to `ChangeFeedProcessorBuilder` to process an additional `ChangeFeedProcessorContext` for handling all versions and deletes changes. - See [PR 36715](https://github.com/Azure/azure-sdk-for-java/pull/36715)
* Added public APIs to configure a `Supplier<CosmosExcludedRegions>` through `CosmosClientBuilder#excludedRegionsSupplier` and `CosmosExcludedRegions` - a type which encapsulates a set of excluded regions. See [PR 36616](https://github.com/Azure/azure-sdk-for-java/pull/36616)

#### Breaking Changes

#### Bugs Fixed
* Fixed an issue with the threshold based availability strategy, which could result in missing diagnostics and unnecessarily high tail latency - See [PR 36508](https://github.com/Azure/azure-sdk-for-java/pull/36508) and [PR 36786](https://github.com/Azure/azure-sdk-for-java/pull/36786).
* Fixed the issue of `excludeRegions` not being honored for `CosmosBulkExecutionOptions`. - See [PR 36616](https://github.com/Azure/azure-sdk-for-java/pull/36616)

#### Other Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.WriteRetryPolicy;
import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList;
import com.azure.cosmos.implementation.clienttelemetry.ClientMetricsDiagnosticsHandler;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetry;
import com.azure.cosmos.implementation.clienttelemetry.ClientTelemetryDiagnosticsHandler;
Expand Down Expand Up @@ -57,6 +58,7 @@
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import reactor.core.publisher.Mono;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;

import static com.azure.cosmos.implementation.ImplementationBridgeHelpers.CosmosClientBuilderHelper;

Expand Down Expand Up @@ -139,6 +142,7 @@ public class CosmosClientBuilder implements
private CosmosContainerProactiveInitConfig proactiveContainerInitConfig;
private CosmosEndToEndOperationLatencyPolicyConfig cosmosEndToEndOperationLatencyPolicyConfig;
private SessionRetryOptions sessionRetryOptions;
private Supplier<CosmosExcludedRegions> cosmosExcludedRegionsSupplier;

/**
* Instantiates a new Cosmos client builder.
Expand Down Expand Up @@ -877,6 +881,34 @@ public CosmosClientBuilder sessionRetryOptions(SessionRetryOptions sessionRetryO
return this;
}

/**
 * Sets a {@link Supplier} of {@link CosmosExcludedRegions}, i.e. a supplier which returns a
 * {@link CosmosExcludedRegions} instance when {@link Supplier#get()} is invoked.
 * The request will not be routed to regions present in {@link CosmosExcludedRegions#getExcludedRegions()}
 * for hedging scenarios and retry scenarios for the workload executed through this instance
 * of {@link CosmosClient} / {@link CosmosAsyncClient}.
 * <p>
 * The supplier reference is stored as given; this method performs no validation of it.
 *
 * @param excludedRegionsSupplier the supplier which returns a {@code CosmosExcludedRegions} instance.
 * @return current CosmosClientBuilder.
 * */
public CosmosClientBuilder excludedRegionsSupplier(Supplier<CosmosExcludedRegions> excludedRegionsSupplier) {
this.cosmosExcludedRegionsSupplier = excludedRegionsSupplier;
return this;
}

/**
 * Gets the regions to exclude from the list of preferred regions. A request will not be
 * routed to these excluded regions for non-retry and retry scenarios
 * for the workload executed through this instance of {@link CosmosClient} / {@link CosmosAsyncClient}.
 * <p>
 * The configured supplier is invoked exactly once per call, so the value that is null-checked
 * is the same value that is returned even if the supplier's result changes concurrently.
 *
 * @return the set of regions to exclude as reported by the supplier; a new empty set when no supplier
 * is configured or the supplier returns {@code null}.
 * */
Set<String> getExcludedRegions() {
    Supplier<CosmosExcludedRegions> supplier = this.cosmosExcludedRegionsSupplier;
    if (supplier == null) {
        return new HashSet<>();
    }

    // Snapshot the supplied value: a second get() could return a different instance (or null),
    // because the supplier exists precisely to let callers change excluded regions at runtime.
    CosmosExcludedRegions excludedRegions = supplier.get();
    if (excludedRegions == null) {
        return new HashSet<>();
    }

    return excludedRegions.getExcludedRegions();
}

/**
 * Gets the {@link SessionRetryOptions} currently held by this builder.
 *
 * @return the value of the {@code sessionRetryOptions} field, returned as-is.
 */
SessionRetryOptions getSessionRetryOptions() {
return this.sessionRetryOptions;
}
Expand Down Expand Up @@ -1114,6 +1146,7 @@ ConnectionPolicy buildConnectionPolicy() {
this.connectionPolicy = new ConnectionPolicy(gatewayConnectionConfig);
}
this.connectionPolicy.setPreferredRegions(this.preferredRegions);
this.connectionPolicy.setExcludedRegionsSupplier(this.cosmosExcludedRegionsSupplier);
this.connectionPolicy.setUserAgentSuffix(this.userAgentSuffix);
this.connectionPolicy.setThrottlingRetryOptions(this.throttlingRetryOptions);
this.connectionPolicy.setEndpointDiscoveryEnabled(this.endpointDiscoveryEnabled);
Expand Down Expand Up @@ -1200,10 +1233,10 @@ private void logStartupInfo(StopWatch stopwatch, CosmosAsyncClient client) {

// NOTE: if changing the logging below - do not log any confidential info like master key credentials etc.
logger.info("Cosmos Client with (Correlation) ID [{}] started up in [{}] ms with the following " +
"configuration: serviceEndpoint [{}], preferredRegions [{}], connectionPolicy [{}], " +
"configuration: serviceEndpoint [{}], preferredRegions [{}], excludedRegions [{}], connectionPolicy [{}], " +
"consistencyLevel [{}], contentResponseOnWriteEnabled [{}], sessionCapturingOverride [{}], " +
"connectionSharingAcrossClients [{}], clientTelemetryEnabled [{}], proactiveContainerInit [{}], diagnostics [{}], tracing [{}]",
client.getContextClient().getClientCorrelationId(), time, getEndpoint(), getPreferredRegions(),
client.getContextClient().getClientCorrelationId(), time, getEndpoint(), getPreferredRegions(), getExcludedRegions(),
getConnectionPolicy(), getConsistencyLevel(), isContentResponseOnWriteEnabled(),
isSessionCapturingOverrideEnabled(), isConnectionSharingAcrossClientsEnabled(),
isClientTelemetryEnabled(), getProactiveContainerInitConfig(), diagnosticsCfg, tracingCfg);
Expand Down
Loading

0 comments on commit 8142343

Please sign in to comment.