diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionContainer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionContainer.java index b329bec6200a5..51439b6afeea4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionContainer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionContainer.java @@ -109,6 +109,13 @@ private ConcurrentHashMap getPartitionKeyRangeIdToTokenMa public String resolveGlobalSessionToken(RxDocumentServiceRequest request) { ConcurrentHashMap partitionKeyRangeIdToTokenMap = this.getPartitionKeyRangeIdToTokenMap(request); if (partitionKeyRangeIdToTokenMap != null) { + String partitionKeyRangeId = request.getHeaders().get(HttpConstants.HttpHeaders.PARTITION_KEY_RANGE_ID); + if (StringUtils.isNotEmpty(partitionKeyRangeId) && partitionKeyRangeIdToTokenMap.get(partitionKeyRangeId) != null) { + StringBuilder result = new StringBuilder() + .append(partitionKeyRangeId).append(":").append(partitionKeyRangeIdToTokenMap.get(partitionKeyRangeId).convertToString()); + return result.toString(); + } + return SessionContainer.getCombinedSessionToken(partitionKeyRangeIdToTokenMap); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseCheckpointer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseCheckpointer.java index cd16a7da0aae0..4c7f9c5f3e2ed 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseCheckpointer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseCheckpointer.java @@ -15,7 +15,8 @@ public interface LeaseCheckpointer { * * @param lease the lease to renew. * @param continuationToken the continuation token. + * @param cancellationToken the cancellation token. * @return the updated renewed lease. */ - Mono checkpoint(Lease lease, String continuationToken); + Mono checkpoint(Lease lease, String continuationToken, CancellationToken cancellationToken); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseStoreManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseStoreManager.java index f5cba3a4d9e9a..509d2fa09785f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseStoreManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseStoreManager.java @@ -100,9 +100,10 @@ static LeaseStoreManagerBuilderDefinition builder() { * * @param lease the Lease to renew. * @param continuationToken the continuation token. + * @param cancellationToken the cancellation token. * @return the updated renewed lease. */ - Mono checkpoint(Lease lease, String continuationToken); + Mono checkpoint(Lease lease, String continuationToken, CancellationToken cancellationToken); /** * @return true if the lease store is initialized. diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/PartitionCheckpointer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/PartitionCheckpointer.java index 8b16bae7d896e..f4d1befef5055 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/PartitionCheckpointer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/PartitionCheckpointer.java @@ -16,4 +16,12 @@ public interface PartitionCheckpointer { * @return a deferred operation of this call. */ Mono checkpointPartition(ChangeFeedState continuationState); + + /** + * Sets the cancellation token in case we need to bail out before check-pointing. + * + * @param cancellationToken the cancellation token. + * @return this instance. + */ + PartitionCheckpointer setCancellationToken(CancellationToken cancellationToken); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/EqualPartitionsBalancingStrategy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/EqualPartitionsBalancingStrategy.java index 5b3bec04545e6..9f0ab9288195f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/EqualPartitionsBalancingStrategy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/EqualPartitionsBalancingStrategy.java @@ -24,6 +24,7 @@ class EqualPartitionsBalancingStrategy implements PartitionLoadBalancingStrategy private final int minPartitionCount; private final int maxPartitionCount; private final Duration leaseExpirationInterval; + private volatile int countAssignedLeases; public EqualPartitionsBalancingStrategy(String hostName, int minPartitionCount, int maxPartitionCount, Duration leaseExpirationInterval) { if (hostName == null) { @@ -34,6 +35,7 @@ public EqualPartitionsBalancingStrategy(String hostName, int minPartitionCount, this.minPartitionCount = minPartitionCount; this.maxPartitionCount = maxPartitionCount; this.leaseExpirationInterval = leaseExpirationInterval; + this.countAssignedLeases = 0; } @Override @@ -53,6 +55,7 @@ public List selectLeasesToTake(List allLeases) { int target = this.calculateTargetPartitionCount(partitionCount, workerCount); int myCount = workerToPartitionCount.get(this.hostName); + this.countAssignedLeases = myCount; int partitionsNeededForMe = target - myCount; /* @@ -147,7 +150,8 @@ private void categorizeLeases( allPartitions.put(lease.getLeaseToken(), lease); if (lease.getOwner() == null || lease.getOwner().isEmpty() || this.isExpired(lease)) { - this.logger.info("Found unused or expired lease {}", lease.getLeaseToken()); + this.logger.info("Found unused or expired lease {}; current lease count for instance owner {} is {} and maxScaleCount {} ", + lease.getLeaseToken(), this.hostName, this.countAssignedLeases, this.maxPartitionCount); expiredLeases.add(lease); } else { String assignedTo = lease.getOwner(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/LeaseStoreManagerImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/LeaseStoreManagerImpl.java index c092f786a3d64..23d59eab77d12 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/LeaseStoreManagerImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/LeaseStoreManagerImpl.java @@ -6,12 +6,9 @@ import com.azure.cosmos.CosmosAsyncContainer; import com.azure.cosmos.CosmosException; import com.azure.cosmos.implementation.InternalObjectNode; -import com.azure.cosmos.models.FeedResponse; -import com.azure.cosmos.models.ModelBridgeInternal; -import com.azure.cosmos.models.PartitionKey; -import com.azure.cosmos.models.SqlParameter; -import com.azure.cosmos.models.SqlQuerySpec; +import com.azure.cosmos.implementation.changefeed.CancellationToken; import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient; +import com.azure.cosmos.implementation.changefeed.exceptions.TaskCancelledException; import com.azure.cosmos.implementation.changefeed.Lease; import com.azure.cosmos.implementation.changefeed.LeaseStore; import com.azure.cosmos.implementation.changefeed.LeaseStoreManager; @@ -20,6 +17,11 @@ import com.azure.cosmos.implementation.changefeed.ServiceItemLease; import com.azure.cosmos.implementation.changefeed.ServiceItemLeaseUpdater; import com.azure.cosmos.implementation.changefeed.exceptions.LeaseLostException; +import com.azure.cosmos.models.FeedResponse; +import com.azure.cosmos.models.ModelBridgeInternal; +import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.models.SqlParameter; +import com.azure.cosmos.models.SqlQuerySpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; @@ -355,7 +357,7 @@ public Mono updateProperties(Lease lease) { } @Override - public Mono checkpoint(Lease lease, String continuationToken) { + public Mono checkpoint(Lease lease, String continuationToken, CancellationToken cancellationToken) { if (lease == null) { throw new IllegalArgumentException("lease"); } @@ -364,28 +366,33 @@ public Mono checkpoint(Lease lease, String continuationToken) { throw new IllegalArgumentException("continuationToken must be a non-empty string"); } + if (cancellationToken.isCancellationRequested()) return Mono.error(new TaskCancelledException()); + return this.leaseDocumentClient.readItem(lease.getId(), new PartitionKey(lease.getId()), this.requestOptionsFactory.createItemRequestOptions(lease), InternalObjectNode.class) .map( documentResourceResponse -> ServiceItemLease.fromDocument(BridgeInternal.getProperties(documentResourceResponse))) - .flatMap( refreshedLease -> this.leaseUpdater.updateLease( - refreshedLease, - lease.getId(), new PartitionKey(lease.getId()), - this.requestOptionsFactory.createItemRequestOptions(lease), - serverLease -> { - if (serverLease.getOwner() == null) { - logger.info("Partition {} lease was taken over and released by a different owner", lease.getLeaseToken()); - throw new LeaseLostException(lease); - } - else if (!serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) { - logger.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner()); - throw new LeaseLostException(lease); - } - serverLease.setContinuationToken(continuationToken); - - return serverLease; - })) + .flatMap( refreshedLease -> { + if (cancellationToken.isCancellationRequested()) return Mono.error(new TaskCancelledException()); + + return this.leaseUpdater.updateLease( + refreshedLease, + lease.getId(), new PartitionKey(lease.getId()), + this.requestOptionsFactory.createItemRequestOptions(lease), + serverLease -> { + if (serverLease.getOwner() == null) { + logger.info("Partition {} lease was taken over and released by a different owner", lease.getLeaseToken()); + throw new LeaseLostException(lease); + } else if (!serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) { + logger.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner()); + throw new LeaseLostException(lease); + } + serverLease.setContinuationToken(continuationToken); + + return serverLease; + }); + }) .doOnError(throwable -> { logger.info("Partition {} lease with token '{}' failed to checkpoint for owner '{}' with continuation token '{}'", lease.getLeaseToken(), lease.getConcurrencyToken(), lease.getOwner(), lease.getContinuationToken()); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionCheckpointerImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionCheckpointerImpl.java index b6ef9f4858ad4..dce2d084fe22c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionCheckpointerImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionCheckpointerImpl.java @@ -2,11 +2,14 @@ // Licensed under the MIT License. package com.azure.cosmos.implementation.changefeed.implementation; +import com.azure.cosmos.implementation.changefeed.CancellationToken; import com.azure.cosmos.implementation.changefeed.Lease; import com.azure.cosmos.implementation.changefeed.LeaseCheckpointer; import com.azure.cosmos.implementation.changefeed.PartitionCheckpointer; +import com.azure.cosmos.implementation.changefeed.exceptions.TaskCancelledException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument; @@ -19,6 +22,7 @@ class PartitionCheckpointerImpl implements PartitionCheckpointer { private final Logger logger = LoggerFactory.getLogger(PartitionCheckpointerImpl.class); private final LeaseCheckpointer leaseCheckpointer; private Lease lease; + private CancellationToken cancellationToken; public PartitionCheckpointerImpl(LeaseCheckpointer leaseCheckpointer, Lease lease) { this.leaseCheckpointer = leaseCheckpointer; @@ -31,13 +35,23 @@ public Mono checkpointPartition(ChangeFeedState continuationState) { checkArgument( continuationState.getContinuation().getContinuationTokenCount() == 1, "For ChangeFeedProcessor the continuation state should always have one range/continuation"); + + if (cancellationToken.isCancellationRequested()) return Mono.error(new TaskCancelledException()); + return this.leaseCheckpointer.checkpoint( this.lease, - continuationState.getContinuation().getCurrentContinuationToken().getToken()) + continuationState.getContinuation().getCurrentContinuationToken().getToken(), + cancellationToken) .map(lease1 -> { this.lease = lease1; logger.info("Checkpoint: partition {}, new continuation {}", this.lease.getLeaseToken(), this.lease.getContinuationToken()); return lease1; }); } + + @Override + public PartitionCheckpointer setCancellationToken(CancellationToken cancellationToken) { + this.cancellationToken = cancellationToken; + return this; + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorFactoryImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorFactoryImpl.java index 1366651d57433..0c6f82270c408 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorFactoryImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorFactoryImpl.java @@ -113,6 +113,6 @@ public PartitionProcessor create(Lease lease, ChangeFeedObserver observer) { .withMaxItemCount(this.changeFeedProcessorOptions.getMaxItemCount()); PartitionCheckpointer checkpointer = new PartitionCheckpointerImpl(this.leaseCheckpointer, lease); - return new PartitionProcessorImpl(observer, this.documentClient, settings, checkpointer); + return new PartitionProcessorImpl(observer, this.documentClient, settings, checkpointer, lease); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl.java index b13dbdaff1bed..04d973181256f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl.java @@ -4,6 +4,7 @@ import com.azure.cosmos.CosmosException; import com.azure.cosmos.implementation.CosmosSchedulers; +import com.azure.cosmos.implementation.changefeed.Lease; import com.azure.cosmos.implementation.feedranges.FeedRangeInternal; import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl; import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; @@ -45,6 +46,7 @@ class PartitionProcessorImpl implements PartitionProcessor { private final ChangeFeedObserver observer; private volatile CosmosChangeFeedRequestOptions options; private final ChangeFeedContextClient documentClient; + private final Lease lease; private volatile RuntimeException resultException; private volatile String lastServerContinuationToken; @@ -53,11 +55,13 @@ class PartitionProcessorImpl implements PartitionProcessor { public PartitionProcessorImpl(ChangeFeedObserver observer, ChangeFeedContextClient documentClient, ProcessorSettings settings, - PartitionCheckpointer checkpointer) { + PartitionCheckpointer checkpointer, + Lease lease) { this.observer = observer; this.documentClient = documentClient; this.settings = settings; this.checkpointer = checkpointer; + this.lease = lease; ChangeFeedState state = settings.getStartState(); this.options = ModelBridgeInternal.createChangeFeedRequestOptionsForChangeFeedState(state); @@ -66,7 +70,9 @@ public PartitionProcessorImpl(ChangeFeedObserver observer, @Override public Mono run(CancellationToken cancellationToken) { + logger.info("Partition {}: processing task started with owner {}.", this.lease.getLeaseToken(), this.lease.getOwner()); this.isFirstQueryForChangeFeeds = true; + this.checkpointer.setCancellationToken(cancellationToken); return Flux.just(this) .flatMap( value -> { @@ -109,6 +115,7 @@ public Mono run(CancellationToken cancellationToken) { .getToken(); if (documentFeedResponse.getResults() != null && documentFeedResponse.getResults().size() > 0) { + logger.info("Partition {}: processing {} feeds with owner {}.", this.lease.getLeaseToken(), documentFeedResponse.getResults().size(), this.lease.getOwner()); return this.dispatchChanges(documentFeedResponse, continuationState) .doOnError(throwable -> logger.debug( "Exception was thrown from thread {}", @@ -145,8 +152,8 @@ public Mono run(CancellationToken cancellationToken) { // we know it is a terminal event. CosmosException clientException = (CosmosException) throwable; - logger.warn("CosmosException: FeedRange {} from thread {}", - this.settings.getStartState().getFeedRange().toString(), Thread.currentThread().getId(), clientException); + logger.warn("CosmosException: Partition {} from thread {} with owner {}", + this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner(), clientException); StatusCodeErrorType docDbError = ExceptionClassifier.classifyClientException(clientException); switch (docDbError) { @@ -199,15 +206,16 @@ public Mono run(CancellationToken cancellationToken) { } } } else if (throwable instanceof LeaseLostException) { - logger.info("LeaseLoseException with FeedRange {} from thread {}", - this.settings.getStartState().getFeedRange().toString(), Thread.currentThread().getId()); - this.resultException = (LeaseLostException) throwable; + logger.info("LeaseLoseException with Partition {} from thread {} with owner {}", + this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner()); + this.resultException = (LeaseLostException) throwable; } else if (throwable instanceof TaskCancelledException) { - logger.debug("Task cancelled exception: FeedRange {} from {}", - this.settings.getStartState().getFeedRange().toString(), Thread.currentThread().getId(), throwable); + logger.debug("Task cancelled exception: Partition {} from thread {} with owner {}", + this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner(), throwable); this.resultException = (TaskCancelledException) throwable; } else { - logger.warn("Unexpected exception from thread {}", Thread.currentThread().getId(), throwable); + logger.warn("Unexpected exception: Partition {} from thread {} with owner {}", + this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner(), throwable); this.resultException = new RuntimeException(throwable); } return Flux.error(throwable); @@ -226,7 +234,11 @@ public Mono run(CancellationToken cancellationToken) { } return Flux.empty(); - }).then(); + }) + .then() + .doFinally( any -> { + logger.info("Partition {}: processing task exited with owner {}.", this.lease.getLeaseToken(), this.lease.getOwner()); + }); } private FeedRangePartitionKeyRangeImpl getPkRangeFeedRangeFromStartState() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionSupervisorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionSupervisorImpl.java index 31827d6dc0b4d..3f65384ca451f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionSupervisorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionSupervisorImpl.java @@ -78,18 +78,22 @@ private Mono afterRun(ChangeFeedObserverContext context, CancellationToken this.childShutdownCts.cancel(); - if (this.processor.getResultException() != null) { - throw this.processor.getResultException(); - } - - if (this.renewer.getResultException() != null) { - throw this.renewer.getResultException(); - } - closeReason = shutdownToken.isCancellationRequested() ? ChangeFeedObserverCloseReason.SHUTDOWN : ChangeFeedObserverCloseReason.UNKNOWN; + RuntimeException workerException = this.processor.getResultException(); + + // Priority must be given to any exception from the processor worker unless it is a task being cancelled. + if (workerException == null || workerException instanceof TaskCancelledException) { + if (this.renewer.getResultException() != null) { + workerException = this.renewer.getResultException(); + } + } + + if (workerException != null) { + throw workerException; + } } catch (LeaseLostException llex) { closeReason = ChangeFeedObserverCloseReason.LEASE_LOST; this.resultException = llex; diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/SessionContainerTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/SessionContainerTest.java index 2edbbf69fe94c..e2308513ecd9a 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/SessionContainerTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/SessionContainerTest.java @@ -68,6 +68,17 @@ public void sessionContainer() throws Exception { sessionToken = sessionContainer.resolvePartitionLocalSessionToken(request, resolvedPKRange.getId()); assertThat(sessionToken.getLSN()).isEqualTo(2); + + String globalSessionToken = sessionContainer.resolveGlobalSessionToken(request); + assertThat(globalSessionToken).isEqualTo("range_0:1#0#4=90#5=2,range_1:1#1#4=90#5=2,range_4:1#4#4=90#5=2,range_2:1#2#4=90#5=2,range_3:1#3#4=90#5=2"); + + request.getHeaders().put(HttpConstants.HttpHeaders.PARTITION_KEY_RANGE_ID, "not_found"); + globalSessionToken = sessionContainer.resolveGlobalSessionToken(request); + assertThat(globalSessionToken).isEqualTo("range_0:1#0#4=90#5=2,range_1:1#1#4=90#5=2,range_4:1#4#4=90#5=2,range_2:1#2#4=90#5=2,range_3:1#3#4=90#5=2"); + + request.getHeaders().put(HttpConstants.HttpHeaders.PARTITION_KEY_RANGE_ID, "range_1"); + globalSessionToken = sessionContainer.resolveGlobalSessionToken(request); + assertThat(globalSessionToken).isEqualTo("range_1:1#1#4=90#5=2"); } @Test(groups = "unit") diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java index aeb4444743634..b014dfcc83762 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java @@ -2,7 +2,6 @@ // Licensed under the MIT License. package com.azure.cosmos.rx; -import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.ChangeFeedProcessor; import com.azure.cosmos.ChangeFeedProcessorBuilder; import com.azure.cosmos.implementation.AsyncDocumentClient; @@ -15,6 +14,7 @@ import com.azure.cosmos.models.CosmosContainerProperties; import com.azure.cosmos.models.CosmosContainerRequestOptions; import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.CosmosQueryRequestOptions; import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.models.SqlParameter; @@ -449,11 +449,11 @@ public void staledLeaseAcquiring() throws InterruptedException { ServiceItemLease leaseDocument = ServiceItemLease.fromDocument(doc); leaseDocument.setOwner("TEMP_OWNER"); CosmosItemRequestOptions options = new CosmosItemRequestOptions(); - return createdLeaseCollection.replaceItem(doc, doc.getId(), new PartitionKey(doc.getId()), options) - .map(itemResponse -> BridgeInternal.getProperties(itemResponse)); + + return createdLeaseCollection.replaceItem(leaseDocument, leaseDocument.getId(), new PartitionKey(leaseDocument.getId()), options) + .map(CosmosItemResponse::getItem); }) - .map(ServiceItemLease::fromDocument) - .map(leaseDocument -> { + .map(leaseDocument -> { ChangeFeedProcessorTest.log.info("QueryItems after Change feed processor processing; found host {}", leaseDocument.getOwner()); return leaseDocument; }) @@ -511,7 +511,6 @@ public void staledLeaseAcquiring() throws InterruptedException { @Test(groups = { "emulator" }, timeOut = 50 * CHANGE_FEED_PROCESSOR_TIMEOUT) public void ownerNullAcquiring() throws InterruptedException { final String ownerFirst = "Owner_First"; - final String ownerSecond = "Owner_Second"; final String leasePrefix = "TEST"; CosmosAsyncContainer createdFeedCollection = createFeedCollection(FEED_COLLECTION_THROUGHPUT); CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT); @@ -519,55 +518,34 @@ public void ownerNullAcquiring() throws InterruptedException { try { List createdDocuments = new ArrayList<>(); Map receivedDocuments = new ConcurrentHashMap<>(); - setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollection, FEED_COUNT); ChangeFeedProcessor changeFeedProcessorFirst = new ChangeFeedProcessorBuilder() .hostName(ownerFirst) .handleChanges(docs -> { ChangeFeedProcessorTest.log.info("START processing from thread {} using host {}", Thread.currentThread().getId(), ownerFirst); - try { - Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted exception", e); - } - ChangeFeedProcessorTest.log.info("END processing from thread {} using host {}", Thread.currentThread().getId(), ownerFirst); - }) - .feedContainer(createdFeedCollection) - .leaseContainer(createdLeaseCollection) - .options(new ChangeFeedProcessorOptions() - .setLeasePrefix(leasePrefix) - .setLeaseRenewInterval(Duration.ofSeconds(1)) - .setLeaseAcquireInterval(Duration.ofSeconds(2)) - .setLeaseExpirationInterval(Duration.ofSeconds(20)) - .setFeedPollDelay(Duration.ofSeconds(1)) - ) - .buildChangeFeedProcessor(); - - ChangeFeedProcessor changeFeedProcessorSecond = new ChangeFeedProcessorBuilder() - .hostName(ownerSecond) - .handleChanges((List docs) -> { - ChangeFeedProcessorTest.log.info("START processing from thread {} using host {}", Thread.currentThread().getId(), ownerSecond); for (JsonNode item : docs) { + try { + Thread.sleep(1000); + } catch (InterruptedException ignored) { + } processItem(item, receivedDocuments); } - ChangeFeedProcessorTest.log.info("END processing from thread {} using host {}", Thread.currentThread().getId(), ownerSecond); + ChangeFeedProcessorTest.log.info("END processing from thread {} using host {}", Thread.currentThread().getId(), ownerFirst); }) .feedContainer(createdFeedCollection) .leaseContainer(createdLeaseCollection) .options(new ChangeFeedProcessorOptions() - .setLeaseRenewInterval(Duration.ofSeconds(10)) - .setLeaseAcquireInterval(Duration.ofSeconds(5)) - .setLeaseExpirationInterval(Duration.ofSeconds(20)) - .setFeedPollDelay(Duration.ofSeconds(2)) - .setLeasePrefix(leasePrefix) - .setMaxItemCount(10) .setStartFromBeginning(true) - .setMaxScaleCount(0) // unlimited + .setLeasePrefix(leasePrefix) + .setLeaseRenewInterval(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .setLeaseAcquireInterval(Duration.ofMillis(5 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .setLeaseExpirationInterval(Duration.ofMillis(6 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .setFeedPollDelay(Duration.ofSeconds(5)) ) .buildChangeFeedProcessor(); try { - ChangeFeedProcessorTest.log.info("Start creating documents"); + ChangeFeedProcessorTest.log.info("Start more creating documents"); List docDefList = new ArrayList<>(); for (int i = 0; i < FEED_COUNT; i++) { @@ -583,59 +561,53 @@ public void ownerNullAcquiring() throws InterruptedException { }) .then( Mono.just(changeFeedProcessorFirst) - .flatMap( value -> { - try { - Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted exception", e); - } - ChangeFeedProcessorTest.log.info("Update leases for Change feed processor in thread {} using host {}", Thread.currentThread().getId(), "Owner_first"); - - SqlParameter param1 = new SqlParameter(); - param1.setName("@PartitionLeasePrefix"); - param1.setValue(leasePrefix); - SqlParameter param2 = new SqlParameter(); - param2.setName("@Owner"); - param2.setValue(ownerFirst); - - SqlQuerySpec querySpec = new SqlQuerySpec( - "SELECT * FROM c WHERE STARTSWITH(c.id, @PartitionLeasePrefix) AND c.Owner=@Owner", Arrays.asList(param1, param2)); - - CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions(); - - return createdLeaseCollection.queryItems(querySpec, cosmosQueryRequestOptions, InternalObjectNode.class).byPage() - .flatMap(documentFeedResponse -> reactor.core.publisher.Flux.fromIterable(documentFeedResponse.getResults())) - .flatMap(doc -> { - ServiceItemLease leaseDocument = ServiceItemLease.fromDocument(doc); - leaseDocument.setOwner(null); - CosmosItemRequestOptions options = new CosmosItemRequestOptions(); - return createdLeaseCollection.replaceItem(doc, doc.getId(), new PartitionKey(doc.getId()), options) - .map(itemResponse -> BridgeInternal.getProperties(itemResponse)); - }) - .map(ServiceItemLease::fromDocument) - .map(leaseDocument -> { - ChangeFeedProcessorTest.log.info("QueryItems after Change feed processor processing; setting host to '{}'", leaseDocument.getOwner()); - return leaseDocument; - }) - .last() - .flatMap(leaseDocument -> { - ChangeFeedProcessorTest.log.info("Start creating documents"); - List docDefList1 = new ArrayList<>(); - - for (int i = 0; i < FEED_COUNT; i++) { - docDefList1.add(getDocumentDefinition()); - } - - return bulkInsert(createdFeedCollection, docDefList1, FEED_COUNT) - .last() - .delayElement(Duration.ofMillis(1000)) - .flatMap(cosmosItemResponse -> { - ChangeFeedProcessorTest.log.info("Start second Change feed processor"); - return changeFeedProcessorSecond.start().subscribeOn(Schedulers.elastic()) - .timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)); - }); - }); - })) + .flatMap( value -> { + ChangeFeedProcessorTest.log.info("Update leases for Change feed processor in thread {} using host {}", Thread.currentThread().getId(), "Owner_first"); + try { + Thread.sleep(CHANGE_FEED_PROCESSOR_TIMEOUT); + } catch (InterruptedException ignored) { + } + + ChangeFeedProcessorTest.log.info("QueryItems before Change feed processor processing"); + + SqlParameter param1 = new SqlParameter(); + param1.setName("@PartitionLeasePrefix"); + param1.setValue(leasePrefix); + SqlParameter param2 = new SqlParameter(); + param2.setName("@Owner"); + param2.setValue(ownerFirst); + + SqlQuerySpec querySpec = new SqlQuerySpec( + "SELECT * FROM c WHERE STARTSWITH(c.id, @PartitionLeasePrefix) AND c.Owner=@Owner", Arrays.asList(param1, param2)); + + CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions(); + + return createdLeaseCollection.queryItems(querySpec, cosmosQueryRequestOptions, InternalObjectNode.class).byPage() + .flatMap(documentFeedResponse -> reactor.core.publisher.Flux.fromIterable(documentFeedResponse.getResults())) + .flatMap(doc -> { + ServiceItemLease leaseDocument = ServiceItemLease.fromDocument(doc); + leaseDocument.setOwner(null); + CosmosItemRequestOptions options = new CosmosItemRequestOptions(); + return createdLeaseCollection.replaceItem(leaseDocument, leaseDocument.getId(), new PartitionKey(leaseDocument.getId()), options) + .map(CosmosItemResponse::getItem); + }) + .map(leaseDocument -> { + ChangeFeedProcessorTest.log.info("QueryItems after Change feed processor processing; current Owner is'{}'", leaseDocument.getOwner()); + return leaseDocument; + }) + .last() + .flatMap(leaseDocument -> { + ChangeFeedProcessorTest.log.info("Start creating more documents"); + List docDefList1 = new ArrayList<>(); + + for (int i = 0; i < FEED_COUNT; i++) { + docDefList1.add(getDocumentDefinition()); + } + + return bulkInsert(createdFeedCollection, docDefList1, FEED_COUNT) + .last(); + }); + })) .subscribe(); } catch (Exception ex) { log.error("First change feed processor did not start in the expected time", ex); @@ -643,17 +615,18 @@ public void ownerNullAcquiring() throws InterruptedException { } long remainingWork = 20 * CHANGE_FEED_PROCESSOR_TIMEOUT; - while (remainingWork > 0 && changeFeedProcessorFirst.isStarted() && changeFeedProcessorSecond.isStarted()) { + while (remainingWork > 0 && !changeFeedProcessorFirst.isStarted()) { remainingWork -= 100; Thread.sleep(100); } // Wait for the feed processor to receive and process the documents. - waitToReceiveDocuments(receivedDocuments, 10 * CHANGE_FEED_PROCESSOR_TIMEOUT, 2 * FEED_COUNT); + waitToReceiveDocuments(receivedDocuments, 30 * CHANGE_FEED_PROCESSOR_TIMEOUT, 2 * FEED_COUNT); - assertThat(changeFeedProcessorSecond.isStarted()).as("Change Feed Processor instance is running").isTrue(); + assertThat(changeFeedProcessorFirst.isStarted()).as("Change Feed Processor instance is running").isTrue(); + + Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT); - changeFeedProcessorSecond.stop().subscribeOn(Schedulers.elastic()).timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)).subscribe(); changeFeedProcessorFirst.stop().subscribeOn(Schedulers.elastic()).timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)).subscribe(); // Wait for the feed processor to shutdown. @@ -870,7 +843,7 @@ private void waitToReceiveDocuments(Map receivedDocuments, lon Thread.sleep(100); } - assertThat(remainingWork >= 0).as("Failed to receive all the feed documents").isTrue(); + assertThat(remainingWork > 0).as("Failed to receive all the feed documents").isTrue(); } private Consumer> leasesChangeFeedProcessorHandler(LeaseStateMonitor leaseStateMonitor) {