Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add chunked transaction operation documentation #256

Merged
merged 1 commit into from
Sep 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/Akka.Persistence.Azure/CloudTableExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,19 @@ public static class CloudTableExtensions
{
private const int MaxBatchSize = 100;

/// <summary>
/// <para>
/// Execute a batch transaction to the service. This method automatically chunks the batch request into chunks
/// of 100 items if the batch size is greater than 100.
/// </para>
/// <b>NOTE</b>: This does mean that sending more than 100 items will break atomicity, there is no guarantee
/// that all items in the batch will be executed successfully.
/// </summary>
/// <param name="table">The Azure table client</param>
/// <param name="batch">The list of <see cref="TableTransactionAction"/> items to be sent to the service</param>
/// <param name="token">Cancellation token</param>
/// <returns>List of <see cref="Response"/> for each items</returns>
// TODO Replace this with real transactional execution if Azure Table Storage supports it in the future.
public static async Task<IReadOnlyList<Response>> ExecuteBatchAsLimitedBatches(
this TableClient table,
List<TableTransactionAction> batch,
Expand Down
24 changes: 24 additions & 0 deletions src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,12 @@ protected override async Task DeleteMessagesToAsync(string persistenceId, long t
else
nextTask = null;

// ** Intentional behaviour **
// Send the batch as a chunk of 100 items. This is intentional because Azure Table Storage
// does not support transaction with more than 100 entries.
//
// ExecuteBatchAsLimitedBatches breaks atomicity on any transaction/batch write operations with more than
// 100 entries.
var response = await Table.ExecuteBatchAsLimitedBatches(currentPage.Values
.Select(entity => new TableTransactionAction(TableTransactionActionType.Delete, entity)).ToList(), _shutdownCts.Token);

Expand Down Expand Up @@ -370,6 +376,12 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(IEnu
if (_log.IsDebugEnabled && _settings.VerboseLogging)
_log.Debug("Attempting to write batch of {0} messages to Azure storage", batchItems.Count);

// ** Intentional behaviour **
// Send the batch as a chunk of 100 items. This is intentional because Azure Table Storage
// does not support transaction with more than 100 entries.
//
// ExecuteBatchAsLimitedBatches breaks atomicity on any transaction/batch write operations with more than
// 100 entries.
var response = await Table.ExecuteBatchAsLimitedBatches(batchItems, _shutdownCts.Token);
if (_log.IsDebugEnabled && _settings.VerboseLogging)
{
Expand Down Expand Up @@ -400,6 +412,12 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(IEnu
new AllPersistenceIdsEntry(PartitionKeyEscapeHelper.Escape(item.Key)).WriteEntity()));
}

// ** Intentional behaviour **
// Send the batch as a chunk of 100 items. This is intentional because Azure Table Storage
// does not support transaction with more than 100 entries.
//
// ExecuteBatchAsLimitedBatches breaks atomicity on any transaction/batch write operations with more than
// 100 entries.
var allPersistenceResponse = await Table.ExecuteBatchAsLimitedBatches(allPersistenceIdsBatch, _shutdownCts.Token);

if (_log.IsDebugEnabled && _settings.VerboseLogging)
Expand All @@ -422,6 +440,12 @@ protected override async Task<IImmutableList<Exception>> WriteMessagesAsync(IEnu
eventTagsBatch.Add(new TableTransactionAction(TableTransactionActionType.UpsertReplace, item.WriteEntity()));
}

// ** Intentional behaviour **
// Send the batch as a chunk of 100 items. This is intentional because Azure Table Storage
// does not support transaction with more than 100 entries.
//
// ExecuteBatchAsLimitedBatches breaks atomicity on any transaction/batch write operations with more than
// 100 entries.
var eventTagsResponse = await Table.ExecuteBatchAsLimitedBatches(eventTagsBatch, _shutdownCts.Token);

if (_log.IsDebugEnabled && _settings.VerboseLogging)
Expand Down