diff --git a/src/Akka.Persistence.Azure/CloudTableExtensions.cs b/src/Akka.Persistence.Azure/CloudTableExtensions.cs index 6f36dd2..64d852b 100644 --- a/src/Akka.Persistence.Azure/CloudTableExtensions.cs +++ b/src/Akka.Persistence.Azure/CloudTableExtensions.cs @@ -17,6 +17,19 @@ public static class CloudTableExtensions { private const int MaxBatchSize = 100; + /// + /// + /// Execute a batch transaction to the service. This method automatically chunks the batch request into chunks + /// of 100 items if the batch size is greater than 100. + /// + /// NOTE: This does mean that sending more than 100 items will break atomicity, there is no guarantee + /// that all items in the batch will be executed successfully. + /// + /// The Azure table client + /// The list of items to be sent to the service + /// Cancellation token + /// List of for each items + // TODO Replace this with real transactional execution if Azure Table Storage supports it in the future. public static async Task> ExecuteBatchAsLimitedBatches( this TableClient table, List batch, diff --git a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs index c1191a2..a119106 100644 --- a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs +++ b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs @@ -229,6 +229,12 @@ protected override async Task DeleteMessagesToAsync(string persistenceId, long t else nextTask = null; + // ** Intentional behaviour ** + // Send the batch as a chunk of 100 items. This is intentional because Azure Table Storage + // does not support transaction with more than 100 entries. + // + // ExecuteBatchAsLimitedBatches breaks atomicity on any transaction/batch write operations with more than + // 100 entries. var response = await Table.ExecuteBatchAsLimitedBatches(currentPage.Values .Select(entity => new TableTransactionAction(TableTransactionActionType.Delete, entity)).ToList(), _shutdownCts.Token); @@ -370,6 +376,12 @@ protected override async Task> WriteMessagesAsync(IEnu if (_log.IsDebugEnabled && _settings.VerboseLogging) _log.Debug("Attempting to write batch of {0} messages to Azure storage", batchItems.Count); + // ** Intentional behaviour ** + // Send the batch as a chunk of 100 items. This is intentional because Azure Table Storage + // does not support transaction with more than 100 entries. + // + // ExecuteBatchAsLimitedBatches breaks atomicity on any transaction/batch write operations with more than + // 100 entries. var response = await Table.ExecuteBatchAsLimitedBatches(batchItems, _shutdownCts.Token); if (_log.IsDebugEnabled && _settings.VerboseLogging) { @@ -400,6 +412,12 @@ protected override async Task> WriteMessagesAsync(IEnu new AllPersistenceIdsEntry(PartitionKeyEscapeHelper.Escape(item.Key)).WriteEntity())); } + // ** Intentional behaviour ** + // Send the batch as a chunk of 100 items. This is intentional because Azure Table Storage + // does not support transaction with more than 100 entries. + // + // ExecuteBatchAsLimitedBatches breaks atomicity on any transaction/batch write operations with more than + // 100 entries. var allPersistenceResponse = await Table.ExecuteBatchAsLimitedBatches(allPersistenceIdsBatch, _shutdownCts.Token); if (_log.IsDebugEnabled && _settings.VerboseLogging) @@ -422,6 +440,12 @@ protected override async Task> WriteMessagesAsync(IEnu eventTagsBatch.Add(new TableTransactionAction(TableTransactionActionType.UpsertReplace, item.WriteEntity())); } + // ** Intentional behaviour ** + // Send the batch as a chunk of 100 items. This is intentional because Azure Table Storage + // does not support transaction with more than 100 entries. + // + // ExecuteBatchAsLimitedBatches breaks atomicity on any transaction/batch write operations with more than + // 100 entries. var eventTagsResponse = await Table.ExecuteBatchAsLimitedBatches(eventTagsBatch, _shutdownCts.Token); if (_log.IsDebugEnabled && _settings.VerboseLogging)