Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race conditions on SlidingExpiration + AbsoluteExpiration #80

Merged
merged 3 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## Unreleased

## <a name="1.6.1"/> 1.6.1 - 2024-03-27

### Fixed

- [#80](https://github.com/Azure/Microsoft.Extensions.Caching.Cosmos/pull/80) Fixed race conditions on SlidingExpiration + AbsoluteExpiration

## <a name="1.6.0"/> 1.6.0 - 2023-11-01

### Added

- [#72](https://github.com/Azure/Microsoft.Extensions.Caching.Cosmos/pull/72) Increased SDK dependency version for critical fixes


## <a name="1.5.0"/> 1.5.0 - 2023-06-22

### Added
Expand Down
35 changes: 34 additions & 1 deletion src/CosmosCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ public byte[] Get(string key)
DateTimeOffset absoluteExpiration = DateTimeOffset.FromUnixTimeSeconds(cosmosCacheSessionResponse.Resource.AbsoluteSlidingExpiration.GetValueOrDefault());
if (absoluteExpiration < DateTimeOffset.UtcNow)
{
cosmosCacheSessionResponse.Resource.TimeToLive = 0;
// At this point the cache item we just read expired, in which case, we should treat it as not found.
// The TTL will clean it up on the container.
return null;
}
else
{
Expand All @@ -148,6 +150,12 @@ public byte[] Get(string key)

this.options.DiagnosticsHandler?.Invoke(replaceCacheSessionResponse.Diagnostics);
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
{
// The cache item has expired in-between the read and replace operations.
this.options.DiagnosticsHandler?.Invoke(ex.Diagnostics);
return null;
}
catch (CosmosException cosmosException) when (cosmosException.StatusCode == HttpStatusCode.PreconditionFailed)
{
this.options.DiagnosticsHandler?.Invoke(cosmosException.Diagnostics);
Expand Down Expand Up @@ -203,6 +211,26 @@ public void Refresh(string key)
{
try
{
if (cosmosCacheSessionResponse.Resource.AbsoluteSlidingExpiration.GetValueOrDefault() > 0)
{
long ttl = cosmosCacheSessionResponse.Resource.TimeToLive.Value;
DateTimeOffset absoluteExpiration = DateTimeOffset.FromUnixTimeSeconds(cosmosCacheSessionResponse.Resource.AbsoluteSlidingExpiration.GetValueOrDefault());
if (absoluteExpiration < DateTimeOffset.UtcNow)
{
// At this point the cache item we just read expired, in which case, we should treat it as not found.
// The TTL will clean it up on the container.
return;
}
else
{
double pendingSeconds = (absoluteExpiration - DateTimeOffset.UtcNow).TotalSeconds;
if (pendingSeconds < ttl)
{
cosmosCacheSessionResponse.Resource.TimeToLive = (long)pendingSeconds;
}
}
}

cosmosCacheSessionResponse.Resource.PartitionKeyAttribute = this.options.ContainerPartitionKeyAttribute;
ItemResponse<CosmosCacheSession> replaceCacheSessionResponse = await this.cosmosContainer.ReplaceItemAsync(
partitionKey: new PartitionKey(key),
Expand All @@ -217,6 +245,11 @@ public void Refresh(string key)

this.options.DiagnosticsHandler?.Invoke(replaceCacheSessionResponse.Diagnostics);
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
{
// The cache item has expired in-between the read and replace operations.
this.options.DiagnosticsHandler?.Invoke(ex.Diagnostics);
}
catch (CosmosException cosmosException) when (cosmosException.StatusCode == HttpStatusCode.PreconditionFailed)
{
// Race condition on replace, we need do not need to refresh it
Expand Down
2 changes: 1 addition & 1 deletion src/CosmosDistributedCache.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<Copyright>© Microsoft Corporation. All rights reserved.</Copyright>
<CurrentDate>$([System.DateTime]::Now.ToString(yyyyMMdd))</CurrentDate>
<NeutralLanguage>en-US</NeutralLanguage>
<ClientVersion>1.6.0</ClientVersion>
<ClientVersion>1.6.1</ClientVersion>
<VersionSuffix Condition=" '$(IsPreview)' == 'true' ">preview</VersionSuffix>
<Version Condition=" '$(VersionSuffix)' == '' ">$(ClientVersion)</Version>
<Version Condition=" '$(VersionSuffix)' != '' ">$(ClientVersion)-$(VersionSuffix)</Version>
Expand Down
80 changes: 80 additions & 0 deletions tests/unit/CosmosCacheTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,86 @@ public async Task ValidatesNoExpirationUsesNullTtl()
mockedContainer.Verify(c => c.UpsertItemAsync<CosmosCacheSession>(It.Is<CosmosCacheSession>(item => item.SessionKey == existingSession.SessionKey && item.TimeToLive == ttl), It.IsAny<PartitionKey?>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>()), Times.Once);
}

[Fact]
public async Task SlidingExpirationWithAbsoluteExpirationOnExpiredRead()
{
const int ttlSliding = 20;
const int ttlAbsolute = 50;
string etag = "etag";
CosmosCacheSession existingSession = new CosmosCacheSession();
existingSession.SessionKey = "key";
existingSession.Content = new byte[0];
existingSession.IsSlidingExpiration = true;
existingSession.TimeToLive = ttlSliding;
existingSession.AbsoluteSlidingExpiration = DateTimeOffset.UtcNow.AddSeconds(-ttlAbsolute).ToUnixTimeSeconds();
Mock<ItemResponse<CosmosCacheSession>> mockedItemResponse = new Mock<ItemResponse<CosmosCacheSession>>();
Mock<CosmosClient> mockedClient = new Mock<CosmosClient>();
Mock<Container> mockedContainer = new Mock<Container>();
Mock<Database> mockedDatabase = new Mock<Database>();
Mock<ContainerResponse> mockedResponse = new Mock<ContainerResponse>();
mockedItemResponse.Setup(c => c.Resource).Returns(existingSession);
mockedItemResponse.Setup(c => c.ETag).Returns(etag);
mockedResponse.Setup(c => c.StatusCode).Returns(HttpStatusCode.OK);
mockedContainer.Setup(c => c.ReadContainerAsync(It.IsAny<ContainerRequestOptions>(), It.IsAny<CancellationToken>())).ReturnsAsync(mockedResponse.Object);
mockedContainer.Setup(c => c.ReadItemAsync<CosmosCacheSession>(It.Is<string>(id => id == "key"), It.IsAny<PartitionKey>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>())).ReturnsAsync(mockedItemResponse.Object);
mockedClient.Setup(c => c.GetContainer(It.IsAny<string>(), It.IsAny<string>())).Returns(mockedContainer.Object);
mockedClient.Setup(c => c.GetDatabase(It.IsAny<string>())).Returns(mockedDatabase.Object);
mockedClient.Setup(x => x.Endpoint).Returns(new Uri("http://localhost"));
CosmosCache cache = new CosmosCache(Options.Create(new CosmosCacheOptions(){
DatabaseName = "something",
ContainerName = "something",
CreateIfNotExists = true,
CosmosClient = mockedClient.Object
}));

Assert.Null(await cache.GetAsync("key"));
// Checks for Db existence due to CreateIfNotExists
mockedClient.Verify(c => c.CreateDatabaseIfNotExistsAsync(It.IsAny<string>(), It.IsAny<int?>(), It.IsAny<RequestOptions>(), It.IsAny<CancellationToken>()), Times.Once);
mockedContainer.Verify(c => c.ReadItemAsync<CosmosCacheSession>(It.Is<string>(id => id == "key"), It.IsAny<PartitionKey>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>()), Times.Once);
mockedContainer.Verify(c => c.ReplaceItemAsync<CosmosCacheSession>(It.Is<CosmosCacheSession>(item => item.TimeToLive == ttlSliding), It.Is<string>(id => id == "key"), It.IsAny<PartitionKey?>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>()), Times.Never);
}

[Fact]
public async Task SlidingExpirationWithAbsoluteExpirationOnReplaceNotFound()
{
const int ttlSliding = 20;
const int ttlAbsolute = 50;
string etag = "etag";
CosmosCacheSession existingSession = new CosmosCacheSession();
existingSession.SessionKey = "key";
existingSession.Content = new byte[0];
existingSession.IsSlidingExpiration = true;
existingSession.TimeToLive = ttlSliding;
existingSession.AbsoluteSlidingExpiration = DateTimeOffset.UtcNow.AddSeconds(ttlAbsolute).ToUnixTimeSeconds();
Mock<ItemResponse<CosmosCacheSession>> mockedItemResponse = new Mock<ItemResponse<CosmosCacheSession>>();
Mock<CosmosClient> mockedClient = new Mock<CosmosClient>();
Mock<Container> mockedContainer = new Mock<Container>();
Mock<Database> mockedDatabase = new Mock<Database>();
Mock<ContainerResponse> mockedResponse = new Mock<ContainerResponse>();
mockedItemResponse.Setup(c => c.Resource).Returns(existingSession);
mockedItemResponse.Setup(c => c.ETag).Returns(etag);
mockedResponse.Setup(c => c.StatusCode).Returns(HttpStatusCode.OK);
mockedContainer.Setup(c => c.ReadContainerAsync(It.IsAny<ContainerRequestOptions>(), It.IsAny<CancellationToken>())).ReturnsAsync(mockedResponse.Object);
mockedContainer.Setup(c => c.ReadItemAsync<CosmosCacheSession>(It.Is<string>(id => id == "key"), It.IsAny<PartitionKey>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>())).ReturnsAsync(mockedItemResponse.Object);
MockedException exception = new MockedException("test", HttpStatusCode.NotFound, 0, "", 0);
mockedContainer.Setup(c => c.ReplaceItemAsync<CosmosCacheSession>(It.Is<CosmosCacheSession>(item => item == existingSession), It.Is<string>(id => id == "key"), It.IsAny<PartitionKey?>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>())).ThrowsAsync(exception);
mockedClient.Setup(c => c.GetContainer(It.IsAny<string>(), It.IsAny<string>())).Returns(mockedContainer.Object);
mockedClient.Setup(c => c.GetDatabase(It.IsAny<string>())).Returns(mockedDatabase.Object);
mockedClient.Setup(x => x.Endpoint).Returns(new Uri("http://localhost"));
CosmosCache cache = new CosmosCache(Options.Create(new CosmosCacheOptions(){
DatabaseName = "something",
ContainerName = "something",
CreateIfNotExists = true,
CosmosClient = mockedClient.Object
}));

Assert.Null(await cache.GetAsync("key"));
// Checks for Db existence due to CreateIfNotExists
mockedClient.Verify(c => c.CreateDatabaseIfNotExistsAsync(It.IsAny<string>(), It.IsAny<int?>(), It.IsAny<RequestOptions>(), It.IsAny<CancellationToken>()), Times.Once);
mockedContainer.Verify(c => c.ReadItemAsync<CosmosCacheSession>(It.Is<string>(id => id == "key"), It.IsAny<PartitionKey>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>()), Times.Once);
mockedContainer.Verify(c => c.ReplaceItemAsync<CosmosCacheSession>(It.Is<CosmosCacheSession>(item => item.TimeToLive == ttlSliding), It.Is<string>(id => id == "key"), It.IsAny<PartitionKey?>(), It.IsAny<ItemRequestOptions>(), It.IsAny<CancellationToken>()), Times.Once);
}

private class DiagnosticsSink
{
private List<CosmosDiagnostics> capturedDiagnostics = new List<CosmosDiagnostics>();
Expand Down