From 795cf738e00058a938a26e9ce1dd6c630e74c51b Mon Sep 17 00:00:00 2001 From: milismsft-mac Date: Wed, 17 Nov 2021 20:37:30 -0800 Subject: [PATCH 1/5] Session token optimization and ChangeFeedProcessor bug fixes By default we don't scope the global session token in Gateway when partition key ID is part of the request. For containers with a very large number of partitions and multi-region account setting, this can result in "Request headers too long" errors. The change is to look first at the current request's headers and if the request is scoped to one specific partition, we should also scope the session token to that specific partition. ChangeFeedProcessor related changes: when the renewer task is cancelled, we also need to close the processor task even if we are about to check-point the current state. The cange also ensures that any exceptions are prioritized accordinally. Added more logs to CFP to allow for better monitoring of the current processing. Added test case for the scoped session token and simplified the existing CFP test. --- .../implementation/SessionContainer.java | 7 + .../changefeed/LeaseCheckpointer.java | 3 +- .../changefeed/LeaseStoreManager.java | 3 +- .../changefeed/PartitionCheckpointer.java | 8 + .../implementation/LeaseStoreManagerImpl.java | 43 +++-- .../PartitionCheckpointerImpl.java | 16 +- .../PartitionProcessorFactoryImpl.java | 2 +- .../PartitionProcessorImpl.java | 30 +++- .../PartitionSupervisorImpl.java | 20 ++- .../implementation/SessionContainerTest.java | 11 ++ .../cosmos/rx/ChangeFeedProcessorTest.java | 164 ++++++++---------- 11 files changed, 172 insertions(+), 135 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionContainer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionContainer.java index b329bec6200a5..51439b6afeea4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionContainer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionContainer.java @@ -109,6 +109,13 @@ private ConcurrentHashMap getPartitionKeyRangeIdToTokenMa public String resolveGlobalSessionToken(RxDocumentServiceRequest request) { ConcurrentHashMap partitionKeyRangeIdToTokenMap = this.getPartitionKeyRangeIdToTokenMap(request); if (partitionKeyRangeIdToTokenMap != null) { + String partitionKeyRangeId = request.getHeaders().get(HttpConstants.HttpHeaders.PARTITION_KEY_RANGE_ID); + if (StringUtils.isNotEmpty(partitionKeyRangeId) && partitionKeyRangeIdToTokenMap.get(partitionKeyRangeId) != null) { + StringBuilder result = new StringBuilder() + .append(partitionKeyRangeId).append(":").append(partitionKeyRangeIdToTokenMap.get(partitionKeyRangeId).convertToString()); + return result.toString(); + } + return SessionContainer.getCombinedSessionToken(partitionKeyRangeIdToTokenMap); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseCheckpointer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseCheckpointer.java index cd16a7da0aae0..4c7f9c5f3e2ed 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseCheckpointer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseCheckpointer.java @@ -15,7 +15,8 @@ public interface LeaseCheckpointer { * * @param lease the lease to renew. * @param continuationToken the continuation token. + * @param cancellationToken the cancellation token. * @return the updated renewed lease. */ - Mono checkpoint(Lease lease, String continuationToken); + Mono checkpoint(Lease lease, String continuationToken, CancellationToken cancellationToken); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseStoreManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseStoreManager.java index f5cba3a4d9e9a..509d2fa09785f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseStoreManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseStoreManager.java @@ -100,9 +100,10 @@ static LeaseStoreManagerBuilderDefinition builder() { * * @param lease the Lease to renew. * @param continuationToken the continuation token. + * @param cancellationToken the cancellation token. * @return the updated renewed lease. */ - Mono checkpoint(Lease lease, String continuationToken); + Mono checkpoint(Lease lease, String continuationToken, CancellationToken cancellationToken); /** * @return true if the lease store is initialized. diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/PartitionCheckpointer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/PartitionCheckpointer.java index 8b16bae7d896e..4662a78814609 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/PartitionCheckpointer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/PartitionCheckpointer.java @@ -16,4 +16,12 @@ public interface PartitionCheckpointer { * @return a deferred operation of this call. */ Mono checkpointPartition(ChangeFeedState continuationState); + + /** + * Sets the cancelation token in case we need to bail out before check-pointing. + * + * @param cancellationToken the cancellation token. + * @return this instance. + */ + PartitionCheckpointer setCancellationToken(CancellationToken cancellationToken); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/LeaseStoreManagerImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/LeaseStoreManagerImpl.java index c092f786a3d64..f1f0dcb6029e0 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/LeaseStoreManagerImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/LeaseStoreManagerImpl.java @@ -6,6 +6,8 @@ import com.azure.cosmos.CosmosAsyncContainer; import com.azure.cosmos.CosmosException; import com.azure.cosmos.implementation.InternalObjectNode; +import com.azure.cosmos.implementation.changefeed.CancellationToken; +import com.azure.cosmos.implementation.changefeed.exceptions.TaskCancelledException; import com.azure.cosmos.models.FeedResponse; import com.azure.cosmos.models.ModelBridgeInternal; import com.azure.cosmos.models.PartitionKey; @@ -355,7 +357,7 @@ public Mono updateProperties(Lease lease) { } @Override - public Mono checkpoint(Lease lease, String continuationToken) { + public Mono checkpoint(Lease lease, String continuationToken, CancellationToken cancellationToken) { if (lease == null) { throw new IllegalArgumentException("lease"); } @@ -364,28 +366,33 @@ public Mono checkpoint(Lease lease, String continuationToken) { throw new IllegalArgumentException("continuationToken must be a non-empty string"); } + if (cancellationToken.isCancellationRequested()) return Mono.error(new TaskCancelledException()); + return this.leaseDocumentClient.readItem(lease.getId(), new PartitionKey(lease.getId()), this.requestOptionsFactory.createItemRequestOptions(lease), InternalObjectNode.class) .map( documentResourceResponse -> ServiceItemLease.fromDocument(BridgeInternal.getProperties(documentResourceResponse))) - .flatMap( refreshedLease -> this.leaseUpdater.updateLease( - refreshedLease, - lease.getId(), new PartitionKey(lease.getId()), - this.requestOptionsFactory.createItemRequestOptions(lease), - serverLease -> { - if (serverLease.getOwner() == null) { - logger.info("Partition {} lease was taken over and released by a different owner", lease.getLeaseToken()); - throw new LeaseLostException(lease); - } - else if (!serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) { - logger.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner()); - throw new LeaseLostException(lease); - } - serverLease.setContinuationToken(continuationToken); - - return serverLease; - })) + .flatMap( refreshedLease -> { + if (cancellationToken.isCancellationRequested()) return Mono.error(new TaskCancelledException()); + + return this.leaseUpdater.updateLease( + refreshedLease, + lease.getId(), new PartitionKey(lease.getId()), + this.requestOptionsFactory.createItemRequestOptions(lease), + serverLease -> { + if (serverLease.getOwner() == null) { + logger.info("Partition {} lease was taken over and released by a different owner", lease.getLeaseToken()); + throw new LeaseLostException(lease); + } else if (!serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) { + logger.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner()); + throw new LeaseLostException(lease); + } + serverLease.setContinuationToken(continuationToken); + + return serverLease; + }); + }) .doOnError(throwable -> { logger.info("Partition {} lease with token '{}' failed to checkpoint for owner '{}' with continuation token '{}'", lease.getLeaseToken(), lease.getConcurrencyToken(), lease.getOwner(), lease.getContinuationToken()); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionCheckpointerImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionCheckpointerImpl.java index b6ef9f4858ad4..dce2d084fe22c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionCheckpointerImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionCheckpointerImpl.java @@ -2,11 +2,14 @@ // Licensed under the MIT License. package com.azure.cosmos.implementation.changefeed.implementation; +import com.azure.cosmos.implementation.changefeed.CancellationToken; import com.azure.cosmos.implementation.changefeed.Lease; import com.azure.cosmos.implementation.changefeed.LeaseCheckpointer; import com.azure.cosmos.implementation.changefeed.PartitionCheckpointer; +import com.azure.cosmos.implementation.changefeed.exceptions.TaskCancelledException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument; @@ -19,6 +22,7 @@ class PartitionCheckpointerImpl implements PartitionCheckpointer { private final Logger logger = LoggerFactory.getLogger(PartitionCheckpointerImpl.class); private final LeaseCheckpointer leaseCheckpointer; private Lease lease; + private CancellationToken cancellationToken; public PartitionCheckpointerImpl(LeaseCheckpointer leaseCheckpointer, Lease lease) { this.leaseCheckpointer = leaseCheckpointer; @@ -31,13 +35,23 @@ public Mono checkpointPartition(ChangeFeedState continuationState) { checkArgument( continuationState.getContinuation().getContinuationTokenCount() == 1, "For ChangeFeedProcessor the continuation state should always have one range/continuation"); + + if (cancellationToken.isCancellationRequested()) return Mono.error(new TaskCancelledException()); + return this.leaseCheckpointer.checkpoint( this.lease, - continuationState.getContinuation().getCurrentContinuationToken().getToken()) + continuationState.getContinuation().getCurrentContinuationToken().getToken(), + cancellationToken) .map(lease1 -> { this.lease = lease1; logger.info("Checkpoint: partition {}, new continuation {}", this.lease.getLeaseToken(), this.lease.getContinuationToken()); return lease1; }); } + + @Override + public PartitionCheckpointer setCancellationToken(CancellationToken cancellationToken) { + this.cancellationToken = cancellationToken; + return this; + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorFactoryImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorFactoryImpl.java index 1366651d57433..0c6f82270c408 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorFactoryImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorFactoryImpl.java @@ -113,6 +113,6 @@ public PartitionProcessor create(Lease lease, ChangeFeedObserver observer) { .withMaxItemCount(this.changeFeedProcessorOptions.getMaxItemCount()); PartitionCheckpointer checkpointer = new PartitionCheckpointerImpl(this.leaseCheckpointer, lease); - return new PartitionProcessorImpl(observer, this.documentClient, settings, checkpointer); + return new PartitionProcessorImpl(observer, this.documentClient, settings, checkpointer, lease); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl.java index b13dbdaff1bed..45e0488554c06 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl.java @@ -4,6 +4,7 @@ import com.azure.cosmos.CosmosException; import com.azure.cosmos.implementation.CosmosSchedulers; +import com.azure.cosmos.implementation.changefeed.Lease; import com.azure.cosmos.implementation.feedranges.FeedRangeInternal; import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl; import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; @@ -45,6 +46,7 @@ class PartitionProcessorImpl implements PartitionProcessor { private final ChangeFeedObserver observer; private volatile CosmosChangeFeedRequestOptions options; private final ChangeFeedContextClient documentClient; + private final Lease lease; private volatile RuntimeException resultException; private volatile String lastServerContinuationToken; @@ -53,11 +55,13 @@ class PartitionProcessorImpl implements PartitionProcessor { public PartitionProcessorImpl(ChangeFeedObserver observer, ChangeFeedContextClient documentClient, ProcessorSettings settings, - PartitionCheckpointer checkpointer) { + PartitionCheckpointer checkpointer, + Lease lease) { this.observer = observer; this.documentClient = documentClient; this.settings = settings; this.checkpointer = checkpointer; + this.lease = lease; ChangeFeedState state = settings.getStartState(); this.options = ModelBridgeInternal.createChangeFeedRequestOptionsForChangeFeedState(state); @@ -66,7 +70,9 @@ public PartitionProcessorImpl(ChangeFeedObserver observer, @Override public Mono run(CancellationToken cancellationToken) { + logger.info("Partition {}: processing task started with owner {}.", this.lease.getLeaseToken(), this.lease.getOwner()); this.isFirstQueryForChangeFeeds = true; + this.checkpointer.setCancellationToken(cancellationToken); return Flux.just(this) .flatMap( value -> { @@ -109,6 +115,7 @@ public Mono run(CancellationToken cancellationToken) { .getToken(); if (documentFeedResponse.getResults() != null && documentFeedResponse.getResults().size() > 0) { + logger.info("Partition {}: processing {} feeds with owner {}.", this.lease.getLeaseToken(), documentFeedResponse.getResults().size(), this.lease.getOwner()); return this.dispatchChanges(documentFeedResponse, continuationState) .doOnError(throwable -> logger.debug( "Exception was thrown from thread {}", @@ -145,8 +152,8 @@ public Mono run(CancellationToken cancellationToken) { // we know it is a terminal event. CosmosException clientException = (CosmosException) throwable; - logger.warn("CosmosException: FeedRange {} from thread {}", - this.settings.getStartState().getFeedRange().toString(), Thread.currentThread().getId(), clientException); + logger.warn("CosmosException: Partition {} from thread {} with owner {}", + this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner(), clientException); StatusCodeErrorType docDbError = ExceptionClassifier.classifyClientException(clientException); switch (docDbError) { @@ -199,15 +206,16 @@ public Mono run(CancellationToken cancellationToken) { } } } else if (throwable instanceof LeaseLostException) { - logger.info("LeaseLoseException with FeedRange {} from thread {}", - this.settings.getStartState().getFeedRange().toString(), Thread.currentThread().getId()); + logger.info("LeaseLoseException with Partition {} from thread {} with owner {}", + this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner()); this.resultException = (LeaseLostException) throwable; } else if (throwable instanceof TaskCancelledException) { - logger.debug("Task cancelled exception: FeedRange {} from {}", - this.settings.getStartState().getFeedRange().toString(), Thread.currentThread().getId(), throwable); + logger.debug("Task cancelled exception: Partition {} from thread {} with owner {}", + this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner(), throwable); this.resultException = (TaskCancelledException) throwable; } else { - logger.warn("Unexpected exception from thread {}", Thread.currentThread().getId(), throwable); + logger.warn("Unexpected exception: Partition {} from thread {} with owner {}", + this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner(), throwable); this.resultException = new RuntimeException(throwable); } return Flux.error(throwable); @@ -226,7 +234,11 @@ public Mono run(CancellationToken cancellationToken) { } return Flux.empty(); - }).then(); + }) + .then() + .doFinally( any -> { + logger.info("Partition {}: processing task exited with owner {}.", this.lease.getLeaseToken(), this.lease.getOwner()); + }); } private FeedRangePartitionKeyRangeImpl getPkRangeFeedRangeFromStartState() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionSupervisorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionSupervisorImpl.java index 31827d6dc0b4d..3f65384ca451f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionSupervisorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionSupervisorImpl.java @@ -78,18 +78,22 @@ private Mono afterRun(ChangeFeedObserverContext context, CancellationToken this.childShutdownCts.cancel(); - if (this.processor.getResultException() != null) { - throw this.processor.getResultException(); - } - - if (this.renewer.getResultException() != null) { - throw this.renewer.getResultException(); - } - closeReason = shutdownToken.isCancellationRequested() ? ChangeFeedObserverCloseReason.SHUTDOWN : ChangeFeedObserverCloseReason.UNKNOWN; + RuntimeException workerException = this.processor.getResultException(); + + // Priority must be given to any exception from the processor worker unless it is a task being cancelled. + if (workerException == null || workerException instanceof TaskCancelledException) { + if (this.renewer.getResultException() != null) { + workerException = this.renewer.getResultException(); + } + } + + if (workerException != null) { + throw workerException; + } } catch (LeaseLostException llex) { closeReason = ChangeFeedObserverCloseReason.LEASE_LOST; this.resultException = llex; diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/SessionContainerTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/SessionContainerTest.java index 2edbbf69fe94c..e2308513ecd9a 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/SessionContainerTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/SessionContainerTest.java @@ -68,6 +68,17 @@ public void sessionContainer() throws Exception { sessionToken = sessionContainer.resolvePartitionLocalSessionToken(request, resolvedPKRange.getId()); assertThat(sessionToken.getLSN()).isEqualTo(2); + + String globalSessionToken = sessionContainer.resolveGlobalSessionToken(request); + assertThat(globalSessionToken).isEqualTo("range_0:1#0#4=90#5=2,range_1:1#1#4=90#5=2,range_4:1#4#4=90#5=2,range_2:1#2#4=90#5=2,range_3:1#3#4=90#5=2"); + + request.getHeaders().put(HttpConstants.HttpHeaders.PARTITION_KEY_RANGE_ID, "not_found"); + globalSessionToken = sessionContainer.resolveGlobalSessionToken(request); + assertThat(globalSessionToken).isEqualTo("range_0:1#0#4=90#5=2,range_1:1#1#4=90#5=2,range_4:1#4#4=90#5=2,range_2:1#2#4=90#5=2,range_3:1#3#4=90#5=2"); + + request.getHeaders().put(HttpConstants.HttpHeaders.PARTITION_KEY_RANGE_ID, "range_1"); + globalSessionToken = sessionContainer.resolveGlobalSessionToken(request); + assertThat(globalSessionToken).isEqualTo("range_1:1#1#4=90#5=2"); } @Test(groups = "unit") diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java index aeb4444743634..fccb84e29ae2e 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java @@ -2,7 +2,6 @@ // Licensed under the MIT License. package com.azure.cosmos.rx; -import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.ChangeFeedProcessor; import com.azure.cosmos.ChangeFeedProcessorBuilder; import com.azure.cosmos.implementation.AsyncDocumentClient; @@ -15,6 +14,7 @@ import com.azure.cosmos.models.CosmosContainerProperties; import com.azure.cosmos.models.CosmosContainerRequestOptions; import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.CosmosQueryRequestOptions; import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.models.SqlParameter; @@ -449,11 +449,10 @@ public void staledLeaseAcquiring() throws InterruptedException { ServiceItemLease leaseDocument = ServiceItemLease.fromDocument(doc); leaseDocument.setOwner("TEMP_OWNER"); CosmosItemRequestOptions options = new CosmosItemRequestOptions(); - return createdLeaseCollection.replaceItem(doc, doc.getId(), new PartitionKey(doc.getId()), options) - .map(itemResponse -> BridgeInternal.getProperties(itemResponse)); + return createdLeaseCollection.replaceItem(leaseDocument, leaseDocument.getId(), new PartitionKey(leaseDocument.getId()), options) + .map(CosmosItemResponse::getItem); }) - .map(ServiceItemLease::fromDocument) - .map(leaseDocument -> { + .map(leaseDocument -> { ChangeFeedProcessorTest.log.info("QueryItems after Change feed processor processing; found host {}", leaseDocument.getOwner()); return leaseDocument; }) @@ -511,7 +510,6 @@ public void staledLeaseAcquiring() throws InterruptedException { @Test(groups = { "emulator" }, timeOut = 50 * CHANGE_FEED_PROCESSOR_TIMEOUT) public void ownerNullAcquiring() throws InterruptedException { final String ownerFirst = "Owner_First"; - final String ownerSecond = "Owner_Second"; final String leasePrefix = "TEST"; CosmosAsyncContainer createdFeedCollection = createFeedCollection(FEED_COLLECTION_THROUGHPUT); CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT); @@ -519,55 +517,34 @@ public void ownerNullAcquiring() throws InterruptedException { try { List createdDocuments = new ArrayList<>(); Map receivedDocuments = new ConcurrentHashMap<>(); - setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollection, FEED_COUNT); ChangeFeedProcessor changeFeedProcessorFirst = new ChangeFeedProcessorBuilder() .hostName(ownerFirst) .handleChanges(docs -> { ChangeFeedProcessorTest.log.info("START processing from thread {} using host {}", Thread.currentThread().getId(), ownerFirst); - try { - Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted exception", e); - } - ChangeFeedProcessorTest.log.info("END processing from thread {} using host {}", Thread.currentThread().getId(), ownerFirst); - }) - .feedContainer(createdFeedCollection) - .leaseContainer(createdLeaseCollection) - .options(new ChangeFeedProcessorOptions() - .setLeasePrefix(leasePrefix) - .setLeaseRenewInterval(Duration.ofSeconds(1)) - .setLeaseAcquireInterval(Duration.ofSeconds(2)) - .setLeaseExpirationInterval(Duration.ofSeconds(20)) - .setFeedPollDelay(Duration.ofSeconds(1)) - ) - .buildChangeFeedProcessor(); - - ChangeFeedProcessor changeFeedProcessorSecond = new ChangeFeedProcessorBuilder() - .hostName(ownerSecond) - .handleChanges((List docs) -> { - ChangeFeedProcessorTest.log.info("START processing from thread {} using host {}", Thread.currentThread().getId(), ownerSecond); for (JsonNode item : docs) { + try { + Thread.sleep(1000); + } catch (InterruptedException ignored) { + } processItem(item, receivedDocuments); } - ChangeFeedProcessorTest.log.info("END processing from thread {} using host {}", Thread.currentThread().getId(), ownerSecond); + ChangeFeedProcessorTest.log.info("END processing from thread {} using host {}", Thread.currentThread().getId(), ownerFirst); }) .feedContainer(createdFeedCollection) .leaseContainer(createdLeaseCollection) .options(new ChangeFeedProcessorOptions() - .setLeaseRenewInterval(Duration.ofSeconds(10)) - .setLeaseAcquireInterval(Duration.ofSeconds(5)) - .setLeaseExpirationInterval(Duration.ofSeconds(20)) - .setFeedPollDelay(Duration.ofSeconds(2)) - .setLeasePrefix(leasePrefix) - .setMaxItemCount(10) .setStartFromBeginning(true) - .setMaxScaleCount(0) // unlimited + .setLeasePrefix(leasePrefix) + .setLeaseRenewInterval(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .setLeaseAcquireInterval(Duration.ofMillis(5 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .setLeaseExpirationInterval(Duration.ofMillis(6 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .setFeedPollDelay(Duration.ofSeconds(5)) ) .buildChangeFeedProcessor(); try { - ChangeFeedProcessorTest.log.info("Start creating documents"); + ChangeFeedProcessorTest.log.info("Start more creating documents"); List docDefList = new ArrayList<>(); for (int i = 0; i < FEED_COUNT; i++) { @@ -583,59 +560,53 @@ public void ownerNullAcquiring() throws InterruptedException { }) .then( Mono.just(changeFeedProcessorFirst) - .flatMap( value -> { - try { - Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted exception", e); - } - ChangeFeedProcessorTest.log.info("Update leases for Change feed processor in thread {} using host {}", Thread.currentThread().getId(), "Owner_first"); - - SqlParameter param1 = new SqlParameter(); - param1.setName("@PartitionLeasePrefix"); - param1.setValue(leasePrefix); - SqlParameter param2 = new SqlParameter(); - param2.setName("@Owner"); - param2.setValue(ownerFirst); - - SqlQuerySpec querySpec = new SqlQuerySpec( - "SELECT * FROM c WHERE STARTSWITH(c.id, @PartitionLeasePrefix) AND c.Owner=@Owner", Arrays.asList(param1, param2)); - - CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions(); - - return createdLeaseCollection.queryItems(querySpec, cosmosQueryRequestOptions, InternalObjectNode.class).byPage() - .flatMap(documentFeedResponse -> reactor.core.publisher.Flux.fromIterable(documentFeedResponse.getResults())) - .flatMap(doc -> { - ServiceItemLease leaseDocument = ServiceItemLease.fromDocument(doc); - leaseDocument.setOwner(null); - CosmosItemRequestOptions options = new CosmosItemRequestOptions(); - return createdLeaseCollection.replaceItem(doc, doc.getId(), new PartitionKey(doc.getId()), options) - .map(itemResponse -> BridgeInternal.getProperties(itemResponse)); - }) - .map(ServiceItemLease::fromDocument) - .map(leaseDocument -> { - ChangeFeedProcessorTest.log.info("QueryItems after Change feed processor processing; setting host to '{}'", leaseDocument.getOwner()); - return leaseDocument; - }) - .last() - .flatMap(leaseDocument -> { - ChangeFeedProcessorTest.log.info("Start creating documents"); - List docDefList1 = new ArrayList<>(); - - for (int i = 0; i < FEED_COUNT; i++) { - docDefList1.add(getDocumentDefinition()); - } - - return bulkInsert(createdFeedCollection, docDefList1, FEED_COUNT) - .last() - .delayElement(Duration.ofMillis(1000)) - .flatMap(cosmosItemResponse -> { - ChangeFeedProcessorTest.log.info("Start second Change feed processor"); - return changeFeedProcessorSecond.start().subscribeOn(Schedulers.elastic()) - .timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)); - }); - }); - })) + .flatMap( value -> { + ChangeFeedProcessorTest.log.info("Update leases for Change feed processor in thread {} using host {}", Thread.currentThread().getId(), "Owner_first"); + try { + Thread.sleep(CHANGE_FEED_PROCESSOR_TIMEOUT); + } catch (InterruptedException ignored) { + } + + ChangeFeedProcessorTest.log.info("QueryItems before Change feed processor processing"); + + SqlParameter param1 = new SqlParameter(); + param1.setName("@PartitionLeasePrefix"); + param1.setValue(leasePrefix); + SqlParameter param2 = new SqlParameter(); + param2.setName("@Owner"); + param2.setValue(ownerFirst); + + SqlQuerySpec querySpec = new SqlQuerySpec( + "SELECT * FROM c WHERE STARTSWITH(c.id, @PartitionLeasePrefix) AND c.Owner=@Owner", Arrays.asList(param1, param2)); + + CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions(); + + return createdLeaseCollection.queryItems(querySpec, cosmosQueryRequestOptions, InternalObjectNode.class).byPage() + .flatMap(documentFeedResponse -> reactor.core.publisher.Flux.fromIterable(documentFeedResponse.getResults())) + .flatMap(doc -> { + ServiceItemLease leaseDocument = ServiceItemLease.fromDocument(doc); + leaseDocument.setOwner(null); + CosmosItemRequestOptions options = new CosmosItemRequestOptions(); + return createdLeaseCollection.replaceItem(leaseDocument, leaseDocument.getId(), new PartitionKey(leaseDocument.getId()), options) + .map(CosmosItemResponse::getItem); + }) + .map(leaseDocument -> { + ChangeFeedProcessorTest.log.info("QueryItems after Change feed processor processing; current Owner is'{}'", leaseDocument.getOwner()); + return leaseDocument; + }) + .last() + .flatMap(leaseDocument -> { + ChangeFeedProcessorTest.log.info("Start creating more documents"); + List docDefList1 = new ArrayList<>(); + + for (int i = 0; i < FEED_COUNT; i++) { + docDefList1.add(getDocumentDefinition()); + } + + return bulkInsert(createdFeedCollection, docDefList1, FEED_COUNT) + .last(); + }); + })) .subscribe(); } catch (Exception ex) { log.error("First change feed processor did not start in the expected time", ex); @@ -643,17 +614,18 @@ public void ownerNullAcquiring() throws InterruptedException { } long remainingWork = 20 * CHANGE_FEED_PROCESSOR_TIMEOUT; - while (remainingWork > 0 && changeFeedProcessorFirst.isStarted() && changeFeedProcessorSecond.isStarted()) { + while (remainingWork > 0 && !changeFeedProcessorFirst.isStarted()) { remainingWork -= 100; Thread.sleep(100); } // Wait for the feed processor to receive and process the documents. - waitToReceiveDocuments(receivedDocuments, 10 * CHANGE_FEED_PROCESSOR_TIMEOUT, 2 * FEED_COUNT); + waitToReceiveDocuments(receivedDocuments, 30 * CHANGE_FEED_PROCESSOR_TIMEOUT, 2 * FEED_COUNT); - assertThat(changeFeedProcessorSecond.isStarted()).as("Change Feed Processor instance is running").isTrue(); + assertThat(changeFeedProcessorFirst.isStarted()).as("Change Feed Processor instance is running").isTrue(); + + Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT); - changeFeedProcessorSecond.stop().subscribeOn(Schedulers.elastic()).timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)).subscribe(); changeFeedProcessorFirst.stop().subscribeOn(Schedulers.elastic()).timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)).subscribe(); // Wait for the feed processor to shutdown. @@ -870,7 +842,7 @@ private void waitToReceiveDocuments(Map receivedDocuments, lon Thread.sleep(100); } - assertThat(remainingWork >= 0).as("Failed to receive all the feed documents").isTrue(); + assertThat(remainingWork > 0).as("Failed to receive all the feed documents").isTrue(); } private Consumer> leasesChangeFeedProcessorHandler(LeaseStateMonitor leaseStateMonitor) { From 5d744efe46319495f1dc291fc48c15ad591275b0 Mon Sep 17 00:00:00 2001 From: milismsft-mac Date: Tue, 21 Dec 2021 12:20:38 -0800 Subject: [PATCH 2/5] ChangeFeedProcessor related changes When the renewer task is cancelled, we also need to close the processor task even if we are about to check-point the current state. The change also ensures that any exceptions are prioritized accordingly. Added more logs to CFP to allow for better monitoring of the current processing. Update CFP test in order to simplify the logic. --- .../changefeed/LeaseCheckpointer.java | 2 +- .../changefeed/LeaseStoreManager.java | 3 +- .../changefeed/PartitionCheckpointer.java | 8 + .../implementation/LeaseStoreManagerImpl.java | 53 +++--- .../PartitionCheckpointerImpl.java | 16 +- .../PartitionProcessorFactoryImpl.java | 2 +- .../PartitionProcessorImpl.java | 30 +++- .../PartitionSupervisorImpl.java | 20 ++- .../cosmos/rx/ChangeFeedProcessorTest.java | 166 ++++++++---------- 9 files changed, 159 insertions(+), 141 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseCheckpointer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseCheckpointer.java index cd16a7da0aae0..bb3c32e2e18fc 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseCheckpointer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseCheckpointer.java @@ -17,5 +17,5 @@ public interface LeaseCheckpointer { * @param continuationToken the continuation token. * @return the updated renewed lease. */ - Mono checkpoint(Lease lease, String continuationToken); + Mono checkpoint(Lease lease, String continuationToken, CancellationToken cancellationToken); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseStoreManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseStoreManager.java index f5cba3a4d9e9a..509d2fa09785f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseStoreManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseStoreManager.java @@ -100,9 +100,10 @@ static LeaseStoreManagerBuilderDefinition builder() { * * @param lease the Lease to renew. * @param continuationToken the continuation token. + * @param cancellationToken the cancellation token. * @return the updated renewed lease. */ - Mono checkpoint(Lease lease, String continuationToken); + Mono checkpoint(Lease lease, String continuationToken, CancellationToken cancellationToken); /** * @return true if the lease store is initialized. diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/PartitionCheckpointer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/PartitionCheckpointer.java index 8b16bae7d896e..f4d1befef5055 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/PartitionCheckpointer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/PartitionCheckpointer.java @@ -16,4 +16,12 @@ public interface PartitionCheckpointer { * @return a deferred operation of this call. */ Mono checkpointPartition(ChangeFeedState continuationState); + + /** + * Sets the cancellation token in case we need to bail out before check-pointing. + * + * @param cancellationToken the cancellation token. + * @return this instance. + */ + PartitionCheckpointer setCancellationToken(CancellationToken cancellationToken); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/LeaseStoreManagerImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/LeaseStoreManagerImpl.java index c092f786a3d64..23d59eab77d12 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/LeaseStoreManagerImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/LeaseStoreManagerImpl.java @@ -6,12 +6,9 @@ import com.azure.cosmos.CosmosAsyncContainer; import com.azure.cosmos.CosmosException; import com.azure.cosmos.implementation.InternalObjectNode; -import com.azure.cosmos.models.FeedResponse; -import com.azure.cosmos.models.ModelBridgeInternal; -import com.azure.cosmos.models.PartitionKey; -import com.azure.cosmos.models.SqlParameter; -import com.azure.cosmos.models.SqlQuerySpec; +import com.azure.cosmos.implementation.changefeed.CancellationToken; import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient; +import com.azure.cosmos.implementation.changefeed.exceptions.TaskCancelledException; import com.azure.cosmos.implementation.changefeed.Lease; import com.azure.cosmos.implementation.changefeed.LeaseStore; import com.azure.cosmos.implementation.changefeed.LeaseStoreManager; @@ -20,6 +17,11 @@ import com.azure.cosmos.implementation.changefeed.ServiceItemLease; import com.azure.cosmos.implementation.changefeed.ServiceItemLeaseUpdater; import com.azure.cosmos.implementation.changefeed.exceptions.LeaseLostException; +import com.azure.cosmos.models.FeedResponse; +import com.azure.cosmos.models.ModelBridgeInternal; +import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.models.SqlParameter; +import com.azure.cosmos.models.SqlQuerySpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; @@ -355,7 +357,7 @@ public Mono updateProperties(Lease lease) { } @Override - public Mono checkpoint(Lease lease, String continuationToken) { + public Mono checkpoint(Lease lease, String continuationToken, CancellationToken cancellationToken) { if (lease == null) { throw new IllegalArgumentException("lease"); } @@ -364,28 +366,33 @@ public Mono checkpoint(Lease lease, String continuationToken) { throw new IllegalArgumentException("continuationToken must be a non-empty string"); } + if (cancellationToken.isCancellationRequested()) return Mono.error(new TaskCancelledException()); + return this.leaseDocumentClient.readItem(lease.getId(), new PartitionKey(lease.getId()), this.requestOptionsFactory.createItemRequestOptions(lease), InternalObjectNode.class) .map( documentResourceResponse -> ServiceItemLease.fromDocument(BridgeInternal.getProperties(documentResourceResponse))) - .flatMap( refreshedLease -> this.leaseUpdater.updateLease( - refreshedLease, - lease.getId(), new PartitionKey(lease.getId()), - this.requestOptionsFactory.createItemRequestOptions(lease), - serverLease -> { - if (serverLease.getOwner() == null) { - logger.info("Partition {} lease was taken over and released by a different owner", lease.getLeaseToken()); - throw new LeaseLostException(lease); - } - else if (!serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) { - logger.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner()); - throw new LeaseLostException(lease); - } - serverLease.setContinuationToken(continuationToken); - - return serverLease; - })) + .flatMap( refreshedLease -> { + if (cancellationToken.isCancellationRequested()) return Mono.error(new TaskCancelledException()); + + return this.leaseUpdater.updateLease( + refreshedLease, + lease.getId(), new PartitionKey(lease.getId()), + this.requestOptionsFactory.createItemRequestOptions(lease), + serverLease -> { + if (serverLease.getOwner() == null) { + logger.info("Partition {} lease was taken over and released by a different owner", lease.getLeaseToken()); + throw new LeaseLostException(lease); + } else if (!serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) { + logger.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner()); + throw new LeaseLostException(lease); + } + serverLease.setContinuationToken(continuationToken); + + return serverLease; + }); + }) .doOnError(throwable -> { logger.info("Partition {} lease with token '{}' failed to checkpoint for owner '{}' with continuation token '{}'", lease.getLeaseToken(), lease.getConcurrencyToken(), lease.getOwner(), lease.getContinuationToken()); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionCheckpointerImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionCheckpointerImpl.java index b6ef9f4858ad4..dce2d084fe22c 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionCheckpointerImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionCheckpointerImpl.java @@ -2,11 +2,14 @@ // Licensed under the MIT License. package com.azure.cosmos.implementation.changefeed.implementation; +import com.azure.cosmos.implementation.changefeed.CancellationToken; import com.azure.cosmos.implementation.changefeed.Lease; import com.azure.cosmos.implementation.changefeed.LeaseCheckpointer; import com.azure.cosmos.implementation.changefeed.PartitionCheckpointer; +import com.azure.cosmos.implementation.changefeed.exceptions.TaskCancelledException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument; @@ -19,6 +22,7 @@ class PartitionCheckpointerImpl implements PartitionCheckpointer { private final Logger logger = LoggerFactory.getLogger(PartitionCheckpointerImpl.class); private final LeaseCheckpointer leaseCheckpointer; private Lease lease; + private CancellationToken cancellationToken; public PartitionCheckpointerImpl(LeaseCheckpointer leaseCheckpointer, Lease lease) { this.leaseCheckpointer = leaseCheckpointer; @@ -31,13 +35,23 @@ public Mono checkpointPartition(ChangeFeedState continuationState) { checkArgument( continuationState.getContinuation().getContinuationTokenCount() == 1, "For ChangeFeedProcessor the continuation state should always have one range/continuation"); + + if (cancellationToken.isCancellationRequested()) return Mono.error(new TaskCancelledException()); + return this.leaseCheckpointer.checkpoint( this.lease, - continuationState.getContinuation().getCurrentContinuationToken().getToken()) + continuationState.getContinuation().getCurrentContinuationToken().getToken(), + cancellationToken) .map(lease1 -> { this.lease = lease1; logger.info("Checkpoint: partition {}, new continuation {}", this.lease.getLeaseToken(), this.lease.getContinuationToken()); return lease1; }); } + + @Override + public PartitionCheckpointer setCancellationToken(CancellationToken cancellationToken) { + this.cancellationToken = cancellationToken; + return this; + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorFactoryImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorFactoryImpl.java index 1366651d57433..0c6f82270c408 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorFactoryImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorFactoryImpl.java @@ -113,6 +113,6 @@ public PartitionProcessor create(Lease lease, ChangeFeedObserver observer) { .withMaxItemCount(this.changeFeedProcessorOptions.getMaxItemCount()); PartitionCheckpointer checkpointer = new PartitionCheckpointerImpl(this.leaseCheckpointer, lease); - return new PartitionProcessorImpl(observer, this.documentClient, settings, checkpointer); + return new PartitionProcessorImpl(observer, this.documentClient, settings, checkpointer, lease); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl.java index b13dbdaff1bed..7b9bb47cdc3ed 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl.java @@ -4,6 +4,7 @@ import com.azure.cosmos.CosmosException; import com.azure.cosmos.implementation.CosmosSchedulers; +import com.azure.cosmos.implementation.changefeed.Lease; import com.azure.cosmos.implementation.feedranges.FeedRangeInternal; import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl; import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; @@ -45,6 +46,7 @@ class PartitionProcessorImpl implements PartitionProcessor { private final ChangeFeedObserver observer; private volatile CosmosChangeFeedRequestOptions options; private final ChangeFeedContextClient documentClient; + private final Lease lease; private volatile RuntimeException resultException; private volatile String lastServerContinuationToken; @@ -53,11 +55,13 @@ class PartitionProcessorImpl implements PartitionProcessor { public PartitionProcessorImpl(ChangeFeedObserver observer, ChangeFeedContextClient documentClient, ProcessorSettings settings, - PartitionCheckpointer checkpointer) { + PartitionCheckpointer checkpointer, + Lease lease) { this.observer = observer; this.documentClient = documentClient; this.settings = settings; this.checkpointer = checkpointer; + this.lease = lease; ChangeFeedState state = settings.getStartState(); this.options = ModelBridgeInternal.createChangeFeedRequestOptionsForChangeFeedState(state); @@ -66,7 +70,9 @@ public PartitionProcessorImpl(ChangeFeedObserver observer, @Override public Mono run(CancellationToken cancellationToken) { + logger.info("Partition {}: processing task started with owner {}.", this.lease.getLeaseToken(), this.lease.getOwner()); this.isFirstQueryForChangeFeeds = true; + this.checkpointer.setCancellationToken(cancellationToken); return Flux.just(this) .flatMap( value -> { @@ -109,6 +115,7 @@ public Mono run(CancellationToken cancellationToken) { .getToken(); if (documentFeedResponse.getResults() != null && documentFeedResponse.getResults().size() > 0) { + logger.info("Partition {}: processing {} feeds with owner {}.", this.lease.getLeaseToken(), documentFeedResponse.getResults().size(), this.lease.getOwner()); return this.dispatchChanges(documentFeedResponse, continuationState) .doOnError(throwable -> logger.debug( "Exception was thrown from thread {}", @@ -145,8 +152,8 @@ public Mono run(CancellationToken cancellationToken) { // we know it is a terminal event. CosmosException clientException = (CosmosException) throwable; - logger.warn("CosmosException: FeedRange {} from thread {}", - this.settings.getStartState().getFeedRange().toString(), Thread.currentThread().getId(), clientException); + logger.warn("CosmosException: Partition {} from thread {} with owner {}", + this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner(), clientException); StatusCodeErrorType docDbError = ExceptionClassifier.classifyClientException(clientException); switch (docDbError) { @@ -199,15 +206,16 @@ public Mono run(CancellationToken cancellationToken) { } } } else if (throwable instanceof LeaseLostException) { - logger.info("LeaseLoseException with FeedRange {} from thread {}", - this.settings.getStartState().getFeedRange().toString(), Thread.currentThread().getId()); + logger.info("LeaseLoseException with Partition {} from thread {} with owner {}", + this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner()); this.resultException = (LeaseLostException) throwable; } else if (throwable instanceof TaskCancelledException) { - logger.debug("Task cancelled exception: FeedRange {} from {}", - this.settings.getStartState().getFeedRange().toString(), Thread.currentThread().getId(), throwable); + logger.debug("Task cancelled exception: Partition {} from thread {} with owner {}", + this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner(), throwable); this.resultException = (TaskCancelledException) throwable; } else { - logger.warn("Unexpected exception from thread {}", Thread.currentThread().getId(), throwable); + logger.warn("Unexpected exception: Partition {} from thread {} with owner {}", + this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner(), throwable); this.resultException = new RuntimeException(throwable); } return Flux.error(throwable); @@ -226,7 +234,11 @@ public Mono run(CancellationToken cancellationToken) { } return Flux.empty(); - }).then(); + }) + .then() + .doFinally( any -> { + logger.info("Partition {}: processing task exited with owner {}.", this.lease.getLeaseToken(), this.lease.getOwner()); + }); } private FeedRangePartitionKeyRangeImpl getPkRangeFeedRangeFromStartState() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionSupervisorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionSupervisorImpl.java index 31827d6dc0b4d..3f65384ca451f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionSupervisorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionSupervisorImpl.java @@ -78,18 +78,22 @@ private Mono afterRun(ChangeFeedObserverContext context, CancellationToken this.childShutdownCts.cancel(); - if (this.processor.getResultException() != null) { - throw this.processor.getResultException(); - } - - if (this.renewer.getResultException() != null) { - throw this.renewer.getResultException(); - } - closeReason = shutdownToken.isCancellationRequested() ? ChangeFeedObserverCloseReason.SHUTDOWN : ChangeFeedObserverCloseReason.UNKNOWN; + RuntimeException workerException = this.processor.getResultException(); + + // Priority must be given to any exception from the processor worker unless it is a task being cancelled. + if (workerException == null || workerException instanceof TaskCancelledException) { + if (this.renewer.getResultException() != null) { + workerException = this.renewer.getResultException(); + } + } + + if (workerException != null) { + throw workerException; + } } catch (LeaseLostException llex) { closeReason = ChangeFeedObserverCloseReason.LEASE_LOST; this.resultException = llex; diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java index aeb4444743634..8ac5ed32daff0 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java @@ -2,7 +2,6 @@ // Licensed under the MIT License. package com.azure.cosmos.rx; -import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.ChangeFeedProcessor; import com.azure.cosmos.ChangeFeedProcessorBuilder; import com.azure.cosmos.implementation.AsyncDocumentClient; @@ -15,6 +14,7 @@ import com.azure.cosmos.models.CosmosContainerProperties; import com.azure.cosmos.models.CosmosContainerRequestOptions; import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.CosmosQueryRequestOptions; import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.models.SqlParameter; @@ -449,11 +449,10 @@ public void staledLeaseAcquiring() throws InterruptedException { ServiceItemLease leaseDocument = ServiceItemLease.fromDocument(doc); leaseDocument.setOwner("TEMP_OWNER"); CosmosItemRequestOptions options = new CosmosItemRequestOptions(); - return createdLeaseCollection.replaceItem(doc, doc.getId(), new PartitionKey(doc.getId()), options) - .map(itemResponse -> BridgeInternal.getProperties(itemResponse)); - }) - .map(ServiceItemLease::fromDocument) - .map(leaseDocument -> { + return createdLeaseCollection.replaceItem(leaseDocument, leaseDocument.getId(), new PartitionKey(leaseDocument.getId()), options) + .map(CosmosItemResponse::getItem); + }) + .map(leaseDocument -> { ChangeFeedProcessorTest.log.info("QueryItems after Change feed processor processing; found host {}", leaseDocument.getOwner()); return leaseDocument; }) @@ -511,7 +510,6 @@ public void staledLeaseAcquiring() throws InterruptedException { @Test(groups = { "emulator" }, timeOut = 50 * CHANGE_FEED_PROCESSOR_TIMEOUT) public void ownerNullAcquiring() throws InterruptedException { final String ownerFirst = "Owner_First"; - final String ownerSecond = "Owner_Second"; final String leasePrefix = "TEST"; CosmosAsyncContainer createdFeedCollection = createFeedCollection(FEED_COLLECTION_THROUGHPUT); CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT); @@ -519,55 +517,34 @@ public void ownerNullAcquiring() throws InterruptedException { try { List createdDocuments = new ArrayList<>(); Map receivedDocuments = new ConcurrentHashMap<>(); - setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollection, FEED_COUNT); ChangeFeedProcessor changeFeedProcessorFirst = new ChangeFeedProcessorBuilder() .hostName(ownerFirst) .handleChanges(docs -> { ChangeFeedProcessorTest.log.info("START processing from thread {} using host {}", Thread.currentThread().getId(), ownerFirst); - try { - Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted exception", e); - } - ChangeFeedProcessorTest.log.info("END processing from thread {} using host {}", Thread.currentThread().getId(), ownerFirst); - }) - .feedContainer(createdFeedCollection) - .leaseContainer(createdLeaseCollection) - .options(new ChangeFeedProcessorOptions() - .setLeasePrefix(leasePrefix) - .setLeaseRenewInterval(Duration.ofSeconds(1)) - .setLeaseAcquireInterval(Duration.ofSeconds(2)) - .setLeaseExpirationInterval(Duration.ofSeconds(20)) - .setFeedPollDelay(Duration.ofSeconds(1)) - ) - .buildChangeFeedProcessor(); - - ChangeFeedProcessor changeFeedProcessorSecond = new ChangeFeedProcessorBuilder() - .hostName(ownerSecond) - .handleChanges((List docs) -> { - ChangeFeedProcessorTest.log.info("START processing from thread {} using host {}", Thread.currentThread().getId(), ownerSecond); for (JsonNode item : docs) { + try { + Thread.sleep(1000); + } catch (InterruptedException ignored) { + } processItem(item, receivedDocuments); } - ChangeFeedProcessorTest.log.info("END processing from thread {} using host {}", Thread.currentThread().getId(), ownerSecond); + ChangeFeedProcessorTest.log.info("END processing from thread {} using host {}", Thread.currentThread().getId(), ownerFirst); }) .feedContainer(createdFeedCollection) .leaseContainer(createdLeaseCollection) .options(new ChangeFeedProcessorOptions() - .setLeaseRenewInterval(Duration.ofSeconds(10)) - .setLeaseAcquireInterval(Duration.ofSeconds(5)) - .setLeaseExpirationInterval(Duration.ofSeconds(20)) - .setFeedPollDelay(Duration.ofSeconds(2)) - .setLeasePrefix(leasePrefix) - .setMaxItemCount(10) .setStartFromBeginning(true) - .setMaxScaleCount(0) // unlimited + .setLeasePrefix(leasePrefix) + .setLeaseRenewInterval(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .setLeaseAcquireInterval(Duration.ofMillis(5 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .setLeaseExpirationInterval(Duration.ofMillis(6 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .setFeedPollDelay(Duration.ofSeconds(5)) ) .buildChangeFeedProcessor(); try { - ChangeFeedProcessorTest.log.info("Start creating documents"); + ChangeFeedProcessorTest.log.info("Start more creating documents"); List docDefList = new ArrayList<>(); for (int i = 0; i < FEED_COUNT; i++) { @@ -583,59 +560,53 @@ public void ownerNullAcquiring() throws InterruptedException { }) .then( Mono.just(changeFeedProcessorFirst) - .flatMap( value -> { - try { - Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted exception", e); - } - ChangeFeedProcessorTest.log.info("Update leases for Change feed processor in thread {} using host {}", Thread.currentThread().getId(), "Owner_first"); - - SqlParameter param1 = new SqlParameter(); - param1.setName("@PartitionLeasePrefix"); - param1.setValue(leasePrefix); - SqlParameter param2 = new SqlParameter(); - param2.setName("@Owner"); - param2.setValue(ownerFirst); - - SqlQuerySpec querySpec = new SqlQuerySpec( - "SELECT * FROM c WHERE STARTSWITH(c.id, @PartitionLeasePrefix) AND c.Owner=@Owner", Arrays.asList(param1, param2)); - - CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions(); - - return createdLeaseCollection.queryItems(querySpec, cosmosQueryRequestOptions, InternalObjectNode.class).byPage() - .flatMap(documentFeedResponse -> reactor.core.publisher.Flux.fromIterable(documentFeedResponse.getResults())) - .flatMap(doc -> { - ServiceItemLease leaseDocument = ServiceItemLease.fromDocument(doc); - leaseDocument.setOwner(null); - CosmosItemRequestOptions options = new CosmosItemRequestOptions(); - return createdLeaseCollection.replaceItem(doc, doc.getId(), new PartitionKey(doc.getId()), options) - .map(itemResponse -> BridgeInternal.getProperties(itemResponse)); - }) - .map(ServiceItemLease::fromDocument) - .map(leaseDocument -> { - ChangeFeedProcessorTest.log.info("QueryItems after Change feed processor processing; setting host to '{}'", leaseDocument.getOwner()); - return leaseDocument; - }) - .last() - .flatMap(leaseDocument -> { - ChangeFeedProcessorTest.log.info("Start creating documents"); - List docDefList1 = new ArrayList<>(); - - for (int i = 0; i < FEED_COUNT; i++) { - docDefList1.add(getDocumentDefinition()); - } - - return bulkInsert(createdFeedCollection, docDefList1, FEED_COUNT) - .last() - .delayElement(Duration.ofMillis(1000)) - .flatMap(cosmosItemResponse -> { - ChangeFeedProcessorTest.log.info("Start second Change feed processor"); - return changeFeedProcessorSecond.start().subscribeOn(Schedulers.elastic()) - .timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)); - }); - }); - })) + .flatMap( value -> { + ChangeFeedProcessorTest.log.info("Update leases for Change feed processor in thread {} using host {}", Thread.currentThread().getId(), "Owner_first"); + try { + Thread.sleep(CHANGE_FEED_PROCESSOR_TIMEOUT); + } catch (InterruptedException ignored) { + } + + ChangeFeedProcessorTest.log.info("QueryItems before Change feed processor processing"); + + SqlParameter param1 = new SqlParameter(); + param1.setName("@PartitionLeasePrefix"); + param1.setValue(leasePrefix); + SqlParameter param2 = new SqlParameter(); + param2.setName("@Owner"); + param2.setValue(ownerFirst); + + SqlQuerySpec querySpec = new SqlQuerySpec( + "SELECT * FROM c WHERE STARTSWITH(c.id, @PartitionLeasePrefix) AND c.Owner=@Owner", Arrays.asList(param1, param2)); + + CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions(); + + return createdLeaseCollection.queryItems(querySpec, cosmosQueryRequestOptions, InternalObjectNode.class).byPage() + .flatMap(documentFeedResponse -> reactor.core.publisher.Flux.fromIterable(documentFeedResponse.getResults())) + .flatMap(doc -> { + ServiceItemLease leaseDocument = ServiceItemLease.fromDocument(doc); + leaseDocument.setOwner(null); + CosmosItemRequestOptions options = new CosmosItemRequestOptions(); + return createdLeaseCollection.replaceItem(leaseDocument, leaseDocument.getId(), new PartitionKey(leaseDocument.getId()), options) + .map(CosmosItemResponse::getItem); + }) + .map(leaseDocument -> { + ChangeFeedProcessorTest.log.info("QueryItems after Change feed processor processing; current Owner is'{}'", leaseDocument.getOwner()); + return leaseDocument; + }) + .last() + .flatMap(leaseDocument -> { + ChangeFeedProcessorTest.log.info("Start creating more documents"); + List docDefList1 = new ArrayList<>(); + + for (int i = 0; i < FEED_COUNT; i++) { + docDefList1.add(getDocumentDefinition()); + } + + return bulkInsert(createdFeedCollection, docDefList1, FEED_COUNT) + .last(); + }); + })) .subscribe(); } catch (Exception ex) { log.error("First change feed processor did not start in the expected time", ex); @@ -643,17 +614,18 @@ public void ownerNullAcquiring() throws InterruptedException { } long remainingWork = 20 * CHANGE_FEED_PROCESSOR_TIMEOUT; - while (remainingWork > 0 && changeFeedProcessorFirst.isStarted() && changeFeedProcessorSecond.isStarted()) { + while (remainingWork > 0 && !changeFeedProcessorFirst.isStarted()) { remainingWork -= 100; Thread.sleep(100); } // Wait for the feed processor to receive and process the documents. - waitToReceiveDocuments(receivedDocuments, 10 * CHANGE_FEED_PROCESSOR_TIMEOUT, 2 * FEED_COUNT); + waitToReceiveDocuments(receivedDocuments, 30 * CHANGE_FEED_PROCESSOR_TIMEOUT, 2 * FEED_COUNT); - assertThat(changeFeedProcessorSecond.isStarted()).as("Change Feed Processor instance is running").isTrue(); + assertThat(changeFeedProcessorFirst.isStarted()).as("Change Feed Processor instance is running").isTrue(); + + Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT); - changeFeedProcessorSecond.stop().subscribeOn(Schedulers.elastic()).timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)).subscribe(); changeFeedProcessorFirst.stop().subscribeOn(Schedulers.elastic()).timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)).subscribe(); // Wait for the feed processor to shutdown. @@ -870,7 +842,7 @@ private void waitToReceiveDocuments(Map receivedDocuments, lon Thread.sleep(100); } - assertThat(remainingWork >= 0).as("Failed to receive all the feed documents").isTrue(); + assertThat(remainingWork > 0).as("Failed to receive all the feed documents").isTrue(); } private Consumer> leasesChangeFeedProcessorHandler(LeaseStateMonitor leaseStateMonitor) { From 59a29b8d2ddb1e89913df6627d98deb2ae86fa86 Mon Sep 17 00:00:00 2001 From: milismsft-mac Date: Tue, 21 Dec 2021 12:23:36 -0800 Subject: [PATCH 3/5] Update --- .../java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java index 8ac5ed32daff0..b014dfcc83762 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java @@ -449,9 +449,10 @@ public void staledLeaseAcquiring() throws InterruptedException { ServiceItemLease leaseDocument = ServiceItemLease.fromDocument(doc); leaseDocument.setOwner("TEMP_OWNER"); CosmosItemRequestOptions options = new CosmosItemRequestOptions(); - return createdLeaseCollection.replaceItem(leaseDocument, leaseDocument.getId(), new PartitionKey(leaseDocument.getId()), options) - .map(CosmosItemResponse::getItem); - }) + + return createdLeaseCollection.replaceItem(leaseDocument, leaseDocument.getId(), new PartitionKey(leaseDocument.getId()), options) + .map(CosmosItemResponse::getItem); + }) .map(leaseDocument -> { ChangeFeedProcessorTest.log.info("QueryItems after Change feed processor processing; found host {}", leaseDocument.getOwner()); return leaseDocument; From 9b24da78568097b39ce0826a910ab948785d410d Mon Sep 17 00:00:00 2001 From: milismsft-mac Date: Tue, 21 Dec 2021 12:50:49 -0800 Subject: [PATCH 4/5] Improve logging when unused or expired leases are detected. --- .../implementation/EqualPartitionsBalancingStrategy.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/EqualPartitionsBalancingStrategy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/EqualPartitionsBalancingStrategy.java index 5b3bec04545e6..9f0ab9288195f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/EqualPartitionsBalancingStrategy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/EqualPartitionsBalancingStrategy.java @@ -24,6 +24,7 @@ class EqualPartitionsBalancingStrategy implements PartitionLoadBalancingStrategy private final int minPartitionCount; private final int maxPartitionCount; private final Duration leaseExpirationInterval; + private volatile int countAssignedLeases; public EqualPartitionsBalancingStrategy(String hostName, int minPartitionCount, int maxPartitionCount, Duration leaseExpirationInterval) { if (hostName == null) { @@ -34,6 +35,7 @@ public EqualPartitionsBalancingStrategy(String hostName, int minPartitionCount, this.minPartitionCount = minPartitionCount; this.maxPartitionCount = maxPartitionCount; this.leaseExpirationInterval = leaseExpirationInterval; + this.countAssignedLeases = 0; } @Override @@ -53,6 +55,7 @@ public List selectLeasesToTake(List allLeases) { int target = this.calculateTargetPartitionCount(partitionCount, workerCount); int myCount = workerToPartitionCount.get(this.hostName); + this.countAssignedLeases = myCount; int partitionsNeededForMe = target - myCount; /* @@ -147,7 +150,8 @@ private void categorizeLeases( allPartitions.put(lease.getLeaseToken(), lease); if (lease.getOwner() == null || lease.getOwner().isEmpty() || this.isExpired(lease)) { - this.logger.info("Found unused or expired lease {}", lease.getLeaseToken()); + this.logger.info("Found unused or expired lease {}; current lease count for instance owner {} is {} and maxScaleCount {} ", + lease.getLeaseToken(), this.hostName, this.countAssignedLeases, this.maxPartitionCount); expiredLeases.add(lease); } else { String assignedTo = lease.getOwner(); From fec2ca61dc523acc2b656cc3d472ccbc33884970 Mon Sep 17 00:00:00 2001 From: milismsft-mac Date: Tue, 21 Dec 2021 13:19:34 -0800 Subject: [PATCH 5/5] updates --- .../cosmos/implementation/changefeed/LeaseCheckpointer.java | 1 + .../changefeed/implementation/PartitionProcessorImpl.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseCheckpointer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseCheckpointer.java index bb3c32e2e18fc..4c7f9c5f3e2ed 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseCheckpointer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/LeaseCheckpointer.java @@ -15,6 +15,7 @@ public interface LeaseCheckpointer { * * @param lease the lease to renew. * @param continuationToken the continuation token. + * @param cancellationToken the cancellation token. * @return the updated renewed lease. */ Mono checkpoint(Lease lease, String continuationToken, CancellationToken cancellationToken); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl.java index 7b9bb47cdc3ed..04d973181256f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl.java @@ -208,7 +208,7 @@ public Mono run(CancellationToken cancellationToken) { } else if (throwable instanceof LeaseLostException) { logger.info("LeaseLoseException with Partition {} from thread {} with owner {}", this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner()); - this.resultException = (LeaseLostException) throwable; + this.resultException = (LeaseLostException) throwable; } else if (throwable instanceof TaskCancelledException) { logger.debug("Task cancelled exception: Partition {} from thread {} with owner {}", this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner(), throwable);