Skip to content

Commit

Permalink
Re-add chunked transaction batching support (#254)
Browse files Browse the repository at this point in the history
* Re-add chunked transaction batching support

* Add unit tests

* Fix unit test
  • Loading branch information
Arkatufus authored Sep 27, 2022
1 parent d1baecf commit ac47852
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 12 deletions.
93 changes: 93 additions & 0 deletions src/Akka.Persistence.Azure.Tests/LimitedBatchSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// -----------------------------------------------------------------------
// <copyright file="LimitedBatchSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Persistence.Azure.Tests.Helper;
using Azure.Data.Tables;
using FluentAssertions;
using FluentAssertions.Extensions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Persistence.Azure.Tests
{
public class LimitedBatchSpec: IAsyncLifetime
{
private readonly TableClient _tableClient;

public LimitedBatchSpec(ITestOutputHelper output)
{
_tableClient = new TableClient("UseDevelopmentStorage=true", "testtable");
}

public async Task InitializeAsync()
{
await DbUtils.CleanupCloudTable("UseDevelopmentStorage=true");
await _tableClient.CreateAsync();
}

public Task DisposeAsync()
{
return Task.CompletedTask;
}

[Fact(DisplayName = "Limited batch with 0 entries should return empty list")]
public async Task ZeroEntriesTest()
{
using var cts = new CancellationTokenSource(3.Seconds());
var result = await _tableClient.ExecuteBatchAsLimitedBatches(new List<TableTransactionAction>(), cts.Token);
result.Count.Should().Be(0);

var entities = await _tableClient.QueryAsync<TableEntity>("PartitionKey eq 'test'", null, null, cts.Token)
.ToListAsync(cts.Token);
entities.Count.Should().Be(0);
}

[Fact(DisplayName = "Limited batch with less than 100 entries should work")]
public async Task FewEntriesTest()
{
var entries = Enumerable.Range(1, 50)
.Select(i => new TableTransactionAction(TableTransactionActionType.Add, new TableEntity
{
PartitionKey = "test",
RowKey = i.ToString("D8")
})).ToList();

using var cts = new CancellationTokenSource(3.Seconds());
var result = await _tableClient.ExecuteBatchAsLimitedBatches(entries, cts.Token);
result.Count.Should().Be(50);

var entities = await _tableClient.QueryAsync<TableEntity>("PartitionKey eq 'test'", null, null, cts.Token)
.ToListAsync(cts.Token);
entities.Count.Should().Be(50);
entities.Select(e => int.Parse(e.RowKey.TrimStart('0'))).Should().BeEquivalentTo(Enumerable.Range(1, 50));
}

[Fact(DisplayName = "Limited batch with more than 100 entries should work")]
public async Task LotsEntriesTest()
{
var entries = Enumerable.Range(1, 505)
.Select(i => new TableTransactionAction(TableTransactionActionType.Add, new TableEntity
{
PartitionKey = "test",
RowKey = i.ToString("D8")
})).ToList();

using var cts = new CancellationTokenSource(3.Seconds());
var result = await _tableClient.ExecuteBatchAsLimitedBatches(entries, cts.Token);
result.Count.Should().Be(505);

var entities = await _tableClient.QueryAsync<TableEntity>("PartitionKey eq 'test'", null, null, cts.Token)
.ToListAsync(cts.Token);
entities.Count.Should().Be(505);
entities.Select(e => int.Parse(e.RowKey.TrimStart('0'))).Should().BeEquivalentTo(Enumerable.Range(1, 505));
}
}
}
8 changes: 5 additions & 3 deletions src/Akka.Persistence.Azure/CloudTableExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading;
using System.Threading.Tasks;
using Azure;
using Azure.Data.Tables;
Expand All @@ -18,20 +19,21 @@ public static class CloudTableExtensions

public static async Task<IReadOnlyList<Response>> ExecuteBatchAsLimitedBatches(
this TableClient table,
List<TableTransactionAction> batch)
List<TableTransactionAction> batch,
CancellationToken token)
{
if (batch.Count < 1)
return ImmutableList<Response>.Empty;

if (batch.Count <= MaxBatchSize)
return (await table.SubmitTransactionAsync(batch)).Value;
return (await table.SubmitTransactionAsync(batch, token)).Value;

var result = new List<Response>();
var limitedBatchOperationLists = batch.ChunkBy(MaxBatchSize);

foreach (var limitedBatchOperationList in limitedBatchOperationLists)
{
var limitedBatchResponse = await table.SubmitTransactionAsync(limitedBatchOperationList);
var limitedBatchResponse = await table.SubmitTransactionAsync(limitedBatchOperationList, token);
result.AddRange(limitedBatchResponse.Value);
}

Expand Down
23 changes: 14 additions & 9 deletions src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,15 @@ protected override async Task DeleteMessagesToAsync(string persistenceId, long t
else
nextTask = null;

if (currentPage.Values.Count > 0)
var response = await Table.ExecuteBatchAsLimitedBatches(currentPage.Values
.Select(entity => new TableTransactionAction(TableTransactionActionType.Delete, entity)).ToList(), _shutdownCts.Token);

if (_log.IsDebugEnabled && _settings.VerboseLogging)
{
await Table.SubmitTransactionAsync(currentPage.Values
.Select(entity => new TableTransactionAction(TableTransactionActionType.Delete, entity)), _shutdownCts.Token);
foreach (var r in response)
{
_log.Debug("Azure table storage wrote entities with status code [{0}]", r.Status);
}
}
}

Expand Down Expand Up @@ -365,10 +370,10 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(IEnu
if (_log.IsDebugEnabled && _settings.VerboseLogging)
_log.Debug("Attempting to write batch of {0} messages to Azure storage", batchItems.Count);

var response = await Table.SubmitTransactionAsync(batchItems, _shutdownCts.Token);
var response = await Table.ExecuteBatchAsLimitedBatches(batchItems, _shutdownCts.Token);
if (_log.IsDebugEnabled && _settings.VerboseLogging)
{
foreach (var r in response.Value)
foreach (var r in response)
{
_log.Debug("Azure table storage wrote entities with status code [{0}]", r.Status);
}
Expand All @@ -395,10 +400,10 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(IEnu
new AllPersistenceIdsEntry(PartitionKeyEscapeHelper.Escape(item.Key)).WriteEntity()));
}

var allPersistenceResponse = await Table.SubmitTransactionAsync(allPersistenceIdsBatch, _shutdownCts.Token);
var allPersistenceResponse = await Table.ExecuteBatchAsLimitedBatches(allPersistenceIdsBatch, _shutdownCts.Token);

if (_log.IsDebugEnabled && _settings.VerboseLogging)
foreach (var r in allPersistenceResponse.Value)
foreach (var r in allPersistenceResponse)
_log.Debug("Azure table storage wrote entity with status code [{0}]", r.Status);

if (HasPersistenceIdSubscribers || HasAllPersistenceIdSubscribers)
Expand All @@ -417,10 +422,10 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(IEnu
eventTagsBatch.Add(new TableTransactionAction(TableTransactionActionType.UpsertReplace, item.WriteEntity()));
}

var eventTagsResponse = await Table.SubmitTransactionAsync(eventTagsBatch, _shutdownCts.Token);
var eventTagsResponse = await Table.ExecuteBatchAsLimitedBatches(eventTagsBatch, _shutdownCts.Token);

if (_log.IsDebugEnabled && _settings.VerboseLogging)
foreach (var r in eventTagsResponse.Value)
foreach (var r in eventTagsResponse)
_log.Debug("Azure table storage wrote entity with status code [{0}]", r.Status);

if (HasTagSubscribers && taggedEntries.Count != 0)
Expand Down

0 comments on commit ac47852

Please sign in to comment.