Skip to content

Commit

Permalink
Preserve Ordering in Bulk (Azure#35892)
Browse files Browse the repository at this point in the history
* preserve ordering and some of the retry logic

* Logic for preserving order and add batch item to fault injection

* Preserve ordering on retry failures

* test for no retry flow of bulk executor

* added comments

* added back window and sort

* refactoring and addressing comments from meeting

* revert some changes

* removed idPartitionKey class and used CosmosItemIdentity instead

* revert some changes and clean up code

* refactor code and tests, add comments, address partition splitting scenario

* Comments and cleaned up code

* fix spacing

* Added generic typing to status queue

* added equals and hashcode to flush and bulk

* added copyright note

* revert bulk executor

* remove typo

* Changed equals method back for bulk item operation because causes spark to fail

* addressed comments on pr

---------

Co-authored-by: Tomas Varon <[email protected]>
  • Loading branch information
tvaron3 and Tomas Varon authored Aug 7, 2023
1 parent f744e7a commit c525c96
Show file tree
Hide file tree
Showing 18 changed files with 1,663 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,33 @@
import com.azure.cosmos.CosmosDatabaseForTest;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.models.CosmosBulkExecutionOptions;
import com.azure.cosmos.models.CosmosBulkItemResponse;
import com.azure.cosmos.models.CosmosItemOperation;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosBulkOperationResponse;
import com.azure.cosmos.models.CosmosBulkOperations;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosItemOperation;
import com.azure.cosmos.models.CosmosPatchOperations;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.CosmosBulkExecutionOptions;
import com.azure.cosmos.models.CosmosBulkOperationResponse;
import com.azure.cosmos.test.faultinjection.CosmosFaultInjectionHelper;
import com.azure.cosmos.test.faultinjection.FaultInjectionCondition;
import com.azure.cosmos.test.faultinjection.FaultInjectionConditionBuilder;
import com.azure.cosmos.test.faultinjection.FaultInjectionConnectionErrorType;
import com.azure.cosmos.test.faultinjection.FaultInjectionRule;
import com.azure.cosmos.test.faultinjection.FaultInjectionConnectionType;
import com.azure.cosmos.test.faultinjection.FaultInjectionOperationType;
import com.azure.cosmos.test.faultinjection.FaultInjectionResultBuilders;
import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType;
import com.azure.cosmos.test.faultinjection.FaultInjectionConditionBuilder;
import com.azure.cosmos.test.faultinjection.FaultInjectionRule;
import com.azure.cosmos.test.faultinjection.FaultInjectionRuleBuilder;
import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorResultBuilder;
import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType;
import com.azure.cosmos.test.faultinjection.IFaultInjectionResult;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
import reactor.core.Disposable;
Expand All @@ -54,7 +60,7 @@ public class BulkExecutorTest extends BatchTestBase {
private CosmosAsyncDatabase database;
private String preExistingDatabaseId = CosmosDatabaseForTest.generateId();

@Factory(dataProvider = "simpleClientBuildersWithJustDirectTcp")
@Factory(dataProvider = "simpleClientBuildersWithDirect")
public BulkExecutorTest(CosmosClientBuilder clientBuilder) {
super(clientBuilder);
}
Expand Down Expand Up @@ -207,17 +213,17 @@ public void executeBulk_OnGoneFailure() throws InterruptedException {
Flux.fromIterable(cosmosItemOperations),
new CosmosBulkExecutionOptions());

try {
CosmosFaultInjectionHelper
.configureFaultInjectionRules(container, Arrays.asList(connectionCloseRule, serverResponseDelayRule))
.block();
CosmosFaultInjectionHelper
.configureFaultInjectionRules(container, Arrays.asList(connectionCloseRule, serverResponseDelayRule))
.block();

List<CosmosBulkOperationResponse<BulkExecutorTest>> bulkResponse =
List<CosmosBulkOperationResponse<BulkExecutorTest>> bulkResponse =
Flux
.deferContextual(context -> executor.execute())
.collectList()
.block();

try {
assertThat(bulkResponse.size()).isEqualTo(1);

CosmosBulkOperationResponse<BulkExecutorTest> operationResponse = bulkResponse.get(0);
Expand All @@ -232,6 +238,190 @@ public void executeBulk_OnGoneFailure() throws InterruptedException {
}
}

@DataProvider()
public Object[][] faultInjectionProvider() {
return new Object[][]{
{null},
{injectBatchFailure("RequestRateTooLarge", FaultInjectionServerErrorType.TOO_MANY_REQUEST, 10)},
{injectBatchFailure("PartitionSplit", FaultInjectionServerErrorType.PARTITION_IS_SPLITTING, 2)}
};
}

// tests preserving order in the regular retry flow and when a partition split happens
@Test(groups = { "emulator" }, timeOut = TIMEOUT * 25, dataProvider = "faultInjectionProvider")
public void executeBulk_preserveOrdering_OnFaults(FaultInjectionRule rule) throws InterruptedException {
int totalRequest = 100;
this.container = createContainer(database);

List<CosmosItemOperation> cosmosItemOperations = new ArrayList<>();
String duplicatePK = UUID.randomUUID().toString();
String id = UUID.randomUUID().toString();
for (int i = 0; i < totalRequest; i++) {
if (i == 0) {
BatchTestBase.EventDoc eventDoc = new BatchTestBase.EventDoc(id, 2, 4, "type1",
duplicatePK);
cosmosItemOperations.add(CosmosBulkOperations.getCreateItemOperation(eventDoc,
new PartitionKey(duplicatePK)));
} else {
cosmosItemOperations.add(CosmosBulkOperations.getPatchItemOperation(id,
new PartitionKey(duplicatePK),
CosmosPatchOperations.create().replace("/type", "updated" + i)));
}
}

CosmosBulkExecutionOptions cosmosBulkExecutionOptions = new CosmosBulkExecutionOptions();
ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
.getCosmosBulkExecutionOptionsAccessor()
.setOrderingPreserved(cosmosBulkExecutionOptions, true);


Flux<CosmosItemOperation> inputFlux = Flux
.fromIterable(cosmosItemOperations)
.delayElements(Duration.ofMillis(100));
final BulkExecutorWithOrderingPreserved<BulkExecutorTest> executor = new BulkExecutorWithOrderingPreserved<>(
this.container,
inputFlux,
cosmosBulkExecutionOptions);

if (rule != null) {
CosmosFaultInjectionHelper
.configureFaultInjectionRules(this.container,
Arrays.asList(rule))
.block();
}


List<CosmosBulkOperationResponse<BulkExecutorTest>> bulkResponse =
Flux.deferContextual(context -> executor.execute()).collect(Collectors.toList()).block();

try {

assertThat(bulkResponse.size()).isEqualTo(totalRequest);

for (int i = 0; i < cosmosItemOperations.size(); i++) {
CosmosBulkOperationResponse<BulkExecutorTest> operationResponse = bulkResponse.get(i);
com.azure.cosmos.models.CosmosBulkItemResponse cosmosBulkItemResponse =
operationResponse.getResponse();

assertThat(operationResponse.getOperation()).isEqualTo(cosmosItemOperations.get(i));
if (i == 0) {
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code());
} else {
assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.OK.code());
}
assertThat(cosmosBulkItemResponse.getRequestCharge()).isNotNull();
assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0);
assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull();
assertThat(cosmosBulkItemResponse.getSessionToken()).isNotNull();
assertThat(cosmosBulkItemResponse.getActivityId()).isNotNull();

}
} finally {
if (executor != null && !executor.isDisposed()) {
executor.dispose();
}
if (rule != null) {
rule.disable();
}
}
}

// Tests No Retry Exception flow
@Test(groups = { "emulator" }, timeOut = TIMEOUT)
public void executeBulk_preserveOrdering_OnServiceUnAvailable() throws InterruptedException {
int totalRequest = 100;
this.container = createContainer(database);

List<CosmosItemOperation> cosmosItemOperations = new ArrayList<>();
String duplicatePK = UUID.randomUUID().toString();
String duplicateId = UUID.randomUUID().toString();
PartitionKey duplicatePartitionKey = new PartitionKey(duplicatePK);
for (int i = 0; i < totalRequest; i++) {
if (i == 0) {
BatchTestBase.EventDoc eventDoc = new BatchTestBase.EventDoc(duplicateId, 2, 4, "type1",
duplicatePK);
cosmosItemOperations.add(CosmosBulkOperations.getCreateItemOperation(eventDoc,
duplicatePartitionKey));
} else {
cosmosItemOperations.add(CosmosBulkOperations.getPatchItemOperation(duplicateId,
new PartitionKey(duplicatePK),
CosmosPatchOperations.create().replace("/type", "updated" + i)));
}
}

CosmosBulkExecutionOptions cosmosBulkExecutionOptions = new CosmosBulkExecutionOptions();
ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
.getCosmosBulkExecutionOptionsAccessor()
.setOrderingPreserved(cosmosBulkExecutionOptions, true);

FaultInjectionRule rule = injectBatchFailure("ServiceUnavailable", FaultInjectionServerErrorType.SERVICE_UNAVAILABLE, 1);

Flux<CosmosItemOperation> inputFlux = Flux
.fromIterable(cosmosItemOperations)
.delayElements(Duration.ofMillis(100));
final BulkExecutorWithOrderingPreserved<BulkExecutorTest> executor = new BulkExecutorWithOrderingPreserved<>(
this.container,
inputFlux,
cosmosBulkExecutionOptions);

CosmosFaultInjectionHelper
.configureFaultInjectionRules(this.container,
Arrays.asList(rule))
.block();
List<CosmosBulkOperationResponse<BulkExecutorTest>> bulkResponse =
Flux.deferContextual(context -> executor.execute()).collect(Collectors.toList()).block();

try {
assertThat(bulkResponse.size()).isEqualTo(totalRequest);


for (int i = 0; i < cosmosItemOperations.size(); i++) {
CosmosBulkOperationResponse<BulkExecutorTest> operationResponse = bulkResponse.get(i);
com.azure.cosmos.models.CosmosBulkItemResponse cosmosBulkItemResponse =
operationResponse.getResponse();
assertThat(cosmosBulkItemResponse).isNull();

}

} finally {
if (executor != null && !executor.isDisposed()) {
executor.dispose();
}
rule.disable();
}
}

private FaultInjectionRule injectBatchFailure(String id, FaultInjectionServerErrorType serverErrorType, int hitLimit) {


FaultInjectionServerErrorResultBuilder faultInjectionResultBuilder = FaultInjectionResultBuilders
.getResultBuilder(serverErrorType)
.delay(Duration.ofMillis(1500));


IFaultInjectionResult result = faultInjectionResultBuilder.build();

FaultInjectionConnectionType connectionType = FaultInjectionConnectionType.GATEWAY;
if (ImplementationBridgeHelpers
.CosmosAsyncClientHelper
.getCosmosAsyncClientAccessor()
.getConnectionMode(this.client)
.equals(ConnectionMode.DIRECT.toString())) {
connectionType = FaultInjectionConnectionType.DIRECT;
}

FaultInjectionCondition condition = new FaultInjectionConditionBuilder()
.operationType(FaultInjectionOperationType.BATCH_ITEM)
.connectionType(connectionType)
.build();

return new FaultInjectionRuleBuilder(id)
.condition(condition)
.result(result)
.startDelay(Duration.ofSeconds(1))
.hitLimit(hitLimit)
.build();
}


@Test(groups = { "emulator" }, timeOut = TIMEOUT)
Expand Down
2 changes: 1 addition & 1 deletion sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
### 4.49.0-beta.1 (Unreleased)

#### Features Added

* Added a flag for allowing customers to preserve ordering in bulk mode. See [PR 35892](https://github.com/Azure/azure-sdk-for-java/pull/35892)
#### Breaking Changes
* Gone exceptions that are not idempotent should not be retried because it is not known if they succeeded for sure. The handling of the exception in this case is left to the user. Fixed retrying write operations when a gone exception occurs in bulk mode. - See [PR 35838](https://github.com/Azure/azure-sdk-for-java/pull/35838)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.batch.BatchExecutor;
import com.azure.cosmos.implementation.batch.BulkExecutor;
import com.azure.cosmos.implementation.batch.BulkExecutorWithOrderingPreserved;
import com.azure.cosmos.implementation.faultinjection.IFaultInjectorProvider;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
Expand Down Expand Up @@ -1305,9 +1306,16 @@ public <TContext> Flux<CosmosBulkOperationResponse<TContext>> executeBulkOperati
final CosmosBulkExecutionOptions cosmosBulkExecutionOptions = bulkOptions;

return Flux.deferContextual(context -> {
final BulkExecutor<TContext> executor = new BulkExecutor<>(this, operations, cosmosBulkExecutionOptions);

return executor.execute().publishOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC);
// revert easily back to original bulk executor
if (ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper
.getCosmosBulkExecutionOptionsAccessor()
.isOrderingPreserved(cosmosBulkExecutionOptions)) {
final BulkExecutorWithOrderingPreserved<TContext> executor = new BulkExecutorWithOrderingPreserved<>(this, operations, cosmosBulkExecutionOptions);
return executor.execute().publishOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC);
} else {
final BulkExecutor<TContext> executor = new BulkExecutor<>(this, operations, cosmosBulkExecutionOptions);
return executor.execute().publishOn(CosmosSchedulers.BULK_EXECUTOR_BOUNDED_ELASTIC);
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,14 @@ public CosmosException createCosmosException(int statusCode, Exception innerExce
return new CosmosException(statusCode, innerException);
}

@Override
public CosmosException createCosmosException(int statusCode,
String message,
Map<String, String> responseHeaders,
Exception exception) {
return new CosmosException(statusCode, message, responseHeaders, exception);
}

@Override
public List<String> getReplicaStatusList(CosmosException cosmosException) {
return cosmosException.getReplicaStatusList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,11 @@ void setOperationContext(

OperationContextAndListenerTuple getOperationContext(CosmosBulkExecutionOptions options);

void setOrderingPreserved(CosmosBulkExecutionOptions options,
boolean preserveOrdering);

boolean isOrderingPreserved(CosmosBulkExecutionOptions options);

<T> T getLegacyBatchScopedContext(CosmosBulkExecutionOptions options);

double getMinTargetedMicroBatchRetryRate(CosmosBulkExecutionOptions options);
Expand Down Expand Up @@ -1333,6 +1338,8 @@ public static void setCosmosExceptionAccessor(final CosmosExceptionAccessor newA

public interface CosmosExceptionAccessor {
CosmosException createCosmosException(int statusCode, Exception innerException);
CosmosException createCosmosException(int statusCode, String message, Map<String, String> responseHeaders,
Exception exception);
List<String> getReplicaStatusList(CosmosException cosmosException);
CosmosException setRntbdChannelStatistics(CosmosException cosmosException, RntbdChannelStatistics rntbdChannelStatistics);
RntbdChannelStatistics getRntbdChannelStatistics(CosmosException cosmosException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1679,6 +1679,11 @@ private Mono<RxDocumentServiceRequest> getBatchDocumentRequest(DocumentClientRet
options,
content);

if (serverBatchRequest instanceof PartitionKeyRangeServerBatchRequest) {
PartitionKeyRangeServerBatchRequest partitionKeyRangeServerBatchRequest = (PartitionKeyRangeServerBatchRequest) serverBatchRequest;
request.setPartitionBasedGoneNotifier(partitionKeyRangeServerBatchRequest.getPartitionBasedGoneNotifier());
}

if (options != null) {
request.requestContext.setExcludeRegions(options.getExcludeRegions());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.azure.cosmos.CosmosDiagnostics;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.batch.PartitionBasedGoneNotifier;
import com.azure.cosmos.implementation.directconnectivity.WFConstants;
import com.azure.cosmos.implementation.faultinjection.FaultInjectionRequestContext;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
Expand Down Expand Up @@ -84,6 +85,8 @@ public class RxDocumentServiceRequest implements Cloneable {

private volatile boolean nonIdempotentWriteRetriesEnabled = false;

private PartitionBasedGoneNotifier partitionBasedGoneNotifier;

public boolean isReadOnlyRequest() {
return this.operationType == OperationType.Read
|| this.operationType == OperationType.ReadFeed
Expand Down Expand Up @@ -1184,4 +1187,12 @@ public Duration getResponseTimeout() {
public void setResponseTimeout(Duration responseTimeout) {
this.responseTimeout = responseTimeout;
}

public void setPartitionBasedGoneNotifier(PartitionBasedGoneNotifier partitionBasedGoneNotifier) {
this.partitionBasedGoneNotifier = partitionBasedGoneNotifier;
}

public PartitionBasedGoneNotifier getPartitionBasedGoneNotifier() {
return partitionBasedGoneNotifier;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,5 @@ public final class BatchRequestResponseConstants {

public static final int MAX_COLLECTION_RECREATION_RETRY_COUNT = 10;
public static final int MAX_COLLECTION_RECREATION_REFRESH_INTERVAL_IN_SECONDS = 1;
public static final String PRESERVE_ORDERING_EXCEPTION_MESSAGE = "Failed due to a previous operation failing with an exception that should not be retried with the same id and partition key. This is unique to the preserve ordering flag.";
}
Loading

0 comments on commit c525c96

Please sign in to comment.