Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CosmosDB: Session token optimization and ChangeFeedProcessor bug fixes #25492

Closed
wants to merge 9 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ private ConcurrentHashMap<String, ISessionToken> getPartitionKeyRangeIdToTokenMa
public String resolveGlobalSessionToken(RxDocumentServiceRequest request) {
ConcurrentHashMap<String, ISessionToken> partitionKeyRangeIdToTokenMap = this.getPartitionKeyRangeIdToTokenMap(request);
if (partitionKeyRangeIdToTokenMap != null) {
String partitionKeyRangeId = request.getHeaders().get(HttpConstants.HttpHeaders.PARTITION_KEY_RANGE_ID);
milismsft marked this conversation as resolved.
Show resolved Hide resolved
if (StringUtils.isNotEmpty(partitionKeyRangeId) && partitionKeyRangeIdToTokenMap.get(partitionKeyRangeId) != null) {
StringBuilder result = new StringBuilder()
.append(partitionKeyRangeId).append(":").append(partitionKeyRangeIdToTokenMap.get(partitionKeyRangeId).convertToString());
return result.toString();
}

return SessionContainer.getCombinedSessionToken(partitionKeyRangeIdToTokenMap);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ public interface LeaseCheckpointer {
*
* @param lease the lease to checkpoint.
* @param continuationToken the continuation token.
* @param cancellationToken the cancellation token.
* @return the updated lease.
*/
Mono<Lease> checkpoint(Lease lease, String continuationToken);
Mono<Lease> checkpoint(Lease lease, String continuationToken, CancellationToken cancellationToken);
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ static LeaseStoreManagerBuilderDefinition builder() {
*
* @param lease the lease to checkpoint.
* @param continuationToken the continuation token.
* @param cancellationToken the cancellation token.
* @return the updated lease.
*/
Mono<Lease> checkpoint(Lease lease, String continuationToken);
Mono<Lease> checkpoint(Lease lease, String continuationToken, CancellationToken cancellationToken);

/**
* @return true if the lease store is initialized.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,12 @@ public interface PartitionCheckpointer {
* @return a deferred operation of this call.
*/
Mono<Lease> checkpointPartition(ChangeFeedState continuationState);

/**
 * Sets the cancellation token that allows this checkpointer to bail out before check-pointing.
 * <p>
 * The method returns a {@link PartitionCheckpointer} so the call can be chained.
 *
 * @param cancellationToken the cancellation token to use for subsequent check-pointing.
 * @return this instance.
 */
PartitionCheckpointer setCancellationToken(CancellationToken cancellationToken);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class EqualPartitionsBalancingStrategy implements PartitionLoadBalancingStrategy
private final int minPartitionCount;
private final int maxPartitionCount;
private final Duration leaseExpirationInterval;
private volatile int countAssignedLeases;

public EqualPartitionsBalancingStrategy(String hostName, int minPartitionCount, int maxPartitionCount, Duration leaseExpirationInterval) {
if (hostName == null) {
Expand All @@ -34,6 +35,7 @@ public EqualPartitionsBalancingStrategy(String hostName, int minPartitionCount,
this.minPartitionCount = minPartitionCount;
this.maxPartitionCount = maxPartitionCount;
this.leaseExpirationInterval = leaseExpirationInterval;
this.countAssignedLeases = 0;
}

@Override
Expand All @@ -53,6 +55,7 @@ public List<Lease> selectLeasesToTake(List<Lease> allLeases) {

int target = this.calculateTargetPartitionCount(partitionCount, workerCount);
int myCount = workerToPartitionCount.get(this.hostName);
this.countAssignedLeases = myCount;
int partitionsNeededForMe = target - myCount;

/*
Expand Down Expand Up @@ -147,7 +150,8 @@ private void categorizeLeases(
allPartitions.put(lease.getLeaseToken(), lease);

if (lease.getOwner() == null || lease.getOwner().isEmpty() || this.isExpired(lease)) {
this.logger.info("Found unused or expired lease {}", lease.getLeaseToken());
this.logger.info("Found unused or expired lease {}; current lease count for instance owner {} is {} and maxScaleCount {} ",
lease.getLeaseToken(), this.hostName, this.countAssignedLeases, this.maxPartitionCount);
expiredLeases.add(lease);
} else {
String assignedTo = lease.getOwner();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.InternalObjectNode;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlParameter;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.implementation.changefeed.CancellationToken;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.changefeed.exceptions.TaskCancelledException;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.LeaseStore;
import com.azure.cosmos.implementation.changefeed.LeaseStoreManager;
Expand All @@ -20,6 +17,11 @@
import com.azure.cosmos.implementation.changefeed.ServiceItemLease;
import com.azure.cosmos.implementation.changefeed.ServiceItemLeaseUpdater;
import com.azure.cosmos.implementation.changefeed.exceptions.LeaseLostException;
import com.azure.cosmos.models.FeedResponse;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlParameter;
import com.azure.cosmos.models.SqlQuerySpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -355,7 +357,7 @@ public Mono<Lease> updateProperties(Lease lease) {
}

@Override
public Mono<Lease> checkpoint(Lease lease, String continuationToken) {
public Mono<Lease> checkpoint(Lease lease, String continuationToken, CancellationToken cancellationToken) {
if (lease == null) {
throw new IllegalArgumentException("lease");
}
Expand All @@ -364,28 +366,33 @@ public Mono<Lease> checkpoint(Lease lease, String continuationToken) {
throw new IllegalArgumentException("continuationToken must be a non-empty string");
}

if (cancellationToken.isCancellationRequested()) return Mono.error(new TaskCancelledException());

return this.leaseDocumentClient.readItem(lease.getId(),
new PartitionKey(lease.getId()),
this.requestOptionsFactory.createItemRequestOptions(lease),
InternalObjectNode.class)
.map( documentResourceResponse -> ServiceItemLease.fromDocument(BridgeInternal.getProperties(documentResourceResponse)))
.flatMap( refreshedLease -> this.leaseUpdater.updateLease(
refreshedLease,
lease.getId(), new PartitionKey(lease.getId()),
this.requestOptionsFactory.createItemRequestOptions(lease),
serverLease -> {
if (serverLease.getOwner() == null) {
logger.info("Partition {} lease was taken over and released by a different owner", lease.getLeaseToken());
throw new LeaseLostException(lease);
}
else if (!serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) {
logger.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner());
throw new LeaseLostException(lease);
}
serverLease.setContinuationToken(continuationToken);

return serverLease;
}))
.flatMap( refreshedLease -> {
if (cancellationToken.isCancellationRequested()) return Mono.error(new TaskCancelledException());

return this.leaseUpdater.updateLease(
refreshedLease,
lease.getId(), new PartitionKey(lease.getId()),
this.requestOptionsFactory.createItemRequestOptions(lease),
serverLease -> {
if (serverLease.getOwner() == null) {
logger.info("Partition {} lease was taken over and released by a different owner", lease.getLeaseToken());
throw new LeaseLostException(lease);
} else if (!serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) {
logger.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner());
throw new LeaseLostException(lease);
}
serverLease.setContinuationToken(continuationToken);

return serverLease;
});
})
.doOnError(throwable -> {
logger.info("Partition {} lease with token '{}' failed to checkpoint for owner '{}' with continuation token '{}'",
lease.getLeaseToken(), lease.getConcurrencyToken(), lease.getOwner(), lease.getContinuationToken());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
// Licensed under the MIT License.
package com.azure.cosmos.implementation.changefeed.implementation;

import com.azure.cosmos.implementation.changefeed.CancellationToken;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.LeaseCheckpointer;
import com.azure.cosmos.implementation.changefeed.PartitionCheckpointer;
import com.azure.cosmos.implementation.changefeed.exceptions.TaskCancelledException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
Expand All @@ -19,6 +22,7 @@ class PartitionCheckpointerImpl implements PartitionCheckpointer {
private final Logger logger = LoggerFactory.getLogger(PartitionCheckpointerImpl.class);
private final LeaseCheckpointer leaseCheckpointer;
private Lease lease;
private CancellationToken cancellationToken;

public PartitionCheckpointerImpl(LeaseCheckpointer leaseCheckpointer, Lease lease) {
this.leaseCheckpointer = leaseCheckpointer;
Expand All @@ -31,13 +35,23 @@ public Mono<Lease> checkpointPartition(ChangeFeedState continuationState) {
checkArgument(
continuationState.getContinuation().getContinuationTokenCount() == 1,
"For ChangeFeedProcessor the continuation state should always have one range/continuation");

if (cancellationToken.isCancellationRequested()) return Mono.error(new TaskCancelledException());

return this.leaseCheckpointer.checkpoint(
this.lease,
continuationState.getContinuation().getCurrentContinuationToken().getToken())
continuationState.getContinuation().getCurrentContinuationToken().getToken(),
cancellationToken)
.map(lease1 -> {
this.lease = lease1;
logger.info("Checkpoint: partition {}, new continuation {}", this.lease.getLeaseToken(), this.lease.getContinuationToken());
return lease1;
});
}

/**
 * Stores the given cancellation token on this checkpointer and returns this same instance.
 * <p>
 * NOTE(review): checkpointPartition calls isCancellationRequested() on this field without a
 * null check, so the token presumably has to be set before the first checkpoint - confirm
 * against the callers.
 *
 * @param cancellationToken the cancellation token to keep for later checkpoint calls.
 * @return this instance.
 */
@Override
public PartitionCheckpointer setCancellationToken(CancellationToken cancellationToken) {
this.cancellationToken = cancellationToken;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,6 @@ public PartitionProcessor create(Lease lease, ChangeFeedObserver observer) {
.withMaxItemCount(this.changeFeedProcessorOptions.getMaxItemCount());

PartitionCheckpointer checkpointer = new PartitionCheckpointerImpl(this.leaseCheckpointer, lease);
return new PartitionProcessorImpl(observer, this.documentClient, settings, checkpointer);
return new PartitionProcessorImpl(observer, this.documentClient, settings, checkpointer, lease);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
Expand Down Expand Up @@ -45,6 +46,7 @@ class PartitionProcessorImpl implements PartitionProcessor {
private final ChangeFeedObserver observer;
private volatile CosmosChangeFeedRequestOptions options;
private final ChangeFeedContextClient documentClient;
private final Lease lease;
private volatile RuntimeException resultException;

private volatile String lastServerContinuationToken;
Expand All @@ -53,11 +55,13 @@ class PartitionProcessorImpl implements PartitionProcessor {
public PartitionProcessorImpl(ChangeFeedObserver observer,
ChangeFeedContextClient documentClient,
ProcessorSettings settings,
PartitionCheckpointer checkpointer) {
PartitionCheckpointer checkpointer,
Lease lease) {
this.observer = observer;
this.documentClient = documentClient;
this.settings = settings;
this.checkpointer = checkpointer;
this.lease = lease;

ChangeFeedState state = settings.getStartState();
this.options = ModelBridgeInternal.createChangeFeedRequestOptionsForChangeFeedState(state);
Expand All @@ -66,7 +70,9 @@ public PartitionProcessorImpl(ChangeFeedObserver observer,

@Override
public Mono<Void> run(CancellationToken cancellationToken) {
logger.info("Partition {}: processing task started with owner {}.", this.lease.getLeaseToken(), this.lease.getOwner());
this.isFirstQueryForChangeFeeds = true;
this.checkpointer.setCancellationToken(cancellationToken);

return Flux.just(this)
.flatMap( value -> {
Expand Down Expand Up @@ -109,6 +115,7 @@ public Mono<Void> run(CancellationToken cancellationToken) {
.getToken();

if (documentFeedResponse.getResults() != null && documentFeedResponse.getResults().size() > 0) {
logger.info("Partition {}: processing {} feeds with owner {}.", this.lease.getLeaseToken(), documentFeedResponse.getResults().size(), this.lease.getOwner());
return this.dispatchChanges(documentFeedResponse, continuationState)
.doOnError(throwable -> logger.debug(
"Exception was thrown from thread {}",
Expand Down Expand Up @@ -145,8 +152,8 @@ public Mono<Void> run(CancellationToken cancellationToken) {
// we know it is a terminal event.

CosmosException clientException = (CosmosException) throwable;
logger.warn("CosmosException: FeedRange {} from thread {}",
this.settings.getStartState().getFeedRange().toString(), Thread.currentThread().getId(), clientException);
logger.warn("CosmosException: Partition {} from thread {} with owner {}",
this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner(), clientException);
StatusCodeErrorType docDbError = ExceptionClassifier.classifyClientException(clientException);

switch (docDbError) {
Expand Down Expand Up @@ -199,15 +206,16 @@ public Mono<Void> run(CancellationToken cancellationToken) {
}
}
} else if (throwable instanceof LeaseLostException) {
logger.info("LeaseLoseException with FeedRange {} from thread {}",
this.settings.getStartState().getFeedRange().toString(), Thread.currentThread().getId());
this.resultException = (LeaseLostException) throwable;
logger.info("LeaseLoseException with Partition {} from thread {} with owner {}",
this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner());
this.resultException = (LeaseLostException) throwable;
} else if (throwable instanceof TaskCancelledException) {
logger.debug("Task cancelled exception: FeedRange {} from {}",
this.settings.getStartState().getFeedRange().toString(), Thread.currentThread().getId(), throwable);
logger.debug("Task cancelled exception: Partition {} from thread {} with owner {}",
this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner(), throwable);
this.resultException = (TaskCancelledException) throwable;
} else {
logger.warn("Unexpected exception from thread {}", Thread.currentThread().getId(), throwable);
logger.warn("Unexpected exception: Partition {} from thread {} with owner {}",
this.lease.getLeaseToken(), Thread.currentThread().getId(), this.lease.getOwner(), throwable);
this.resultException = new RuntimeException(throwable);
}
return Flux.error(throwable);
Expand All @@ -226,7 +234,11 @@ public Mono<Void> run(CancellationToken cancellationToken) {
}

return Flux.empty();
}).then();
})
.then()
.doFinally( any -> {
logger.info("Partition {}: processing task exited with owner {}.", this.lease.getLeaseToken(), this.lease.getOwner());
});
}

private FeedRangePartitionKeyRangeImpl getPkRangeFeedRangeFromStartState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,22 @@ private Mono<Void> afterRun(ChangeFeedObserverContext context, CancellationToken

this.childShutdownCts.cancel();

if (this.processor.getResultException() != null) {
throw this.processor.getResultException();
}

if (this.renewer.getResultException() != null) {
throw this.renewer.getResultException();
}

closeReason = shutdownToken.isCancellationRequested() ?
ChangeFeedObserverCloseReason.SHUTDOWN :
ChangeFeedObserverCloseReason.UNKNOWN;

RuntimeException workerException = this.processor.getResultException();

// Priority must be given to any exception from the processor worker unless it is a task being cancelled.
if (workerException == null || workerException instanceof TaskCancelledException) {
if (this.renewer.getResultException() != null) {
workerException = this.renewer.getResultException();
}
}

if (workerException != null) {
throw workerException;
}
} catch (LeaseLostException llex) {
closeReason = ChangeFeedObserverCloseReason.LEASE_LOST;
this.resultException = llex;
Expand Down
Loading