diff --git a/README.md b/README.md index d1df18a..918827e 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,49 @@ akka.persistence.snapshot-store.azure-blob-store.connection-string = "Your Azure akka.persistence.snapshot-store.azure-blob-store.container-name = "Your container name" ``` +### Local development mode +You can turn local development mode by changing these two settings: +``` +akka.persistence.journal.azure-table.development = on +akka.persistence.snapshot-store.azure-blob-store.development = on +``` +When set, the plugin will ignore the `connection-string` setting and uses the Azure Storage Emulator default connection string of "UseDevelopmentStorage=true" instead. + +### Configuring snapshots Blob Storage + +#### Auto-initialize blob container + +Blob container auto-initialize behaviour can be changed by changing this flag setting: +``` +# Creates the required container if set +akka.persistence.snapshot-store.azure-blob-store.auto-initialize = on +``` + +#### Container public access type + +Auto-initialized blob container public access type can be controlled by changing this setting: +``` +# Public access level for the auto-initialized storage container. +# Valid values are "None", "BlobContainer" or "Blob" +akka.persistence.snapshot-store.azure-blob-store.container-public-access-type = "None" +``` + +#### DefaultAzureCredential + +`Azure.Identity` `DefaultAzureCredential` can be used to configure the resource by using `AzureBlobSnapshotSetup`. When using `DefaultAzureCredential`, the HOCON 'connection-string' setting is ignored. + +Example: +``` +var blobStorageSetup = AzureBlobSnapshotSetup.Create( + new Uri("https://{account_name}.blob.core.windows.net"), // This is the blob service URI + new DefaultAzureCredential() // You can pass a DefaultAzureCredentialOption here. + // https://docs.microsoft.com/en-us/dotnet/api/azure.identity.defaultazurecredential?view=azure-dotnet +); + +var bootstrap = BootstrapSetup.Create().And(blobStorageSetup); +var system = ActorSystem.Create("actorSystem", bootstrap); +``` + ## Using the plugin in local development environment You can use this plugin with Azure Storage Emulator in a local development environment by setting the development flag in the configuration file: diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index e85efae..068bc7f 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,4 +1,13 @@ -#### 0.7.2 January 08 2021 #### +#### 0.8.0 April 16 2021 #### **Release of Akka.Persistence.Azure** -- Upgraded to [Akka.NET v1.4.14](https://github.com/akkadotnet/akka.net/releases/tag/1.4.14) +- [Problem with implimentation of akka persistence contract](https://github.com/petabridge/Akka.Persistence.Azure/pull/143) +- [Limiting batch size for table storage](https://github.com/petabridge/Akka.Persistence.Azure/pull/145) +- [Bump AkkaVersion from 1.4.14 to 1.4.18](https://github.com/petabridge/Akka.Persistence.Azure/pull/148) +- [Added settings for auto-initialize](https://github.com/petabridge/Akka.Persistence.Azure/pull/150) +- [Upgrade WindowsAzure.Storage to Microsoft.Azure.Cosmos.Table and Azure.Storage.Blobs](https://github.com/petabridge/Akka.Persistence.Azure/pull/151) +- [Added support to configure blob container public access level](https://github.com/petabridge/Akka.Persistence.Azure/pull/152) +- [Change the default public access type of auto-init containers to None](https://github.com/petabridge/Akka.Persistence.Azure/pull/154) +- [Add DefaultAzureIdentity support for snapshot Azure Blob Storage](https://github.com/petabridge/Akka.Persistence.Azure/pull/155) + +See the README for documentation on how to use the latest features in Akka.Persistence.Azure: https://github.com/petabridge/Akka.Persistence.Azure#akkapersistenceazure diff --git a/src/Akka.Persistence.Azure.TestHelpers/DbUtils.cs b/src/Akka.Persistence.Azure.TestHelpers/DbUtils.cs index 531ace1..93ece8c 100644 --- a/src/Akka.Persistence.Azure.TestHelpers/DbUtils.cs +++ b/src/Akka.Persistence.Azure.TestHelpers/DbUtils.cs @@ -1,6 +1,6 @@ using System.Threading.Tasks; using Akka.Actor; -using Microsoft.WindowsAzure.Storage; +using Microsoft.Azure.Cosmos.Table; namespace Akka.Persistence.Azure.TestHelpers { diff --git a/src/Akka.Persistence.Azure.Tests/Akka.Persistence.Azure.Tests.csproj b/src/Akka.Persistence.Azure.Tests/Akka.Persistence.Azure.Tests.csproj index bf8250e..2a30455 100644 --- a/src/Akka.Persistence.Azure.Tests/Akka.Persistence.Azure.Tests.csproj +++ b/src/Akka.Persistence.Azure.Tests/Akka.Persistence.Azure.Tests.csproj @@ -7,7 +7,6 @@ - diff --git a/src/Akka.Persistence.Azure.Tests/AzurePersistenceConfigSpec.cs b/src/Akka.Persistence.Azure.Tests/AzurePersistenceConfigSpec.cs index bd48dc6..2bdc59b 100644 --- a/src/Akka.Persistence.Azure.Tests/AzurePersistenceConfigSpec.cs +++ b/src/Akka.Persistence.Azure.Tests/AzurePersistenceConfigSpec.cs @@ -9,6 +9,7 @@ using Akka.Persistence.Azure.Journal; using Akka.Persistence.Azure.Query; using Akka.Persistence.Azure.Snapshot; +using Azure.Storage.Blobs.Models; using FluentAssertions; using Xunit; @@ -40,6 +41,7 @@ public void ShouldParseDefaultSnapshotConfig() blobSettings.ConnectTimeout.Should().Be(TimeSpan.FromSeconds(3)); blobSettings.RequestTimeout.Should().Be(TimeSpan.FromSeconds(3)); blobSettings.VerboseLogging.Should().BeFalse(); + blobSettings.ContainerPublicAccessType.Should().Be(PublicAccessType.None); } [Fact] @@ -97,7 +99,7 @@ public void ShouldThrowArgumentExceptionForIllegalTableNames(string tableName, s table-name = " + tableName + @" }").WithFallback(AzurePersistence.DefaultConfig) .GetConfig("akka.persistence.journal.azure-table")); - createJournalSettings.ShouldThrow(reason); + createJournalSettings.Should().Throw(reason); } [Theory] @@ -113,7 +115,7 @@ public void ShouldThrowArgumentExceptionForIllegalContainerNames(string containe }").WithFallback(AzurePersistence.DefaultConfig) .GetConfig("akka.persistence.snapshot-store.azure-blob-store")); - createSnapshotSettings.ShouldThrow(reason); + createSnapshotSettings.Should().Throw(reason); } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Akka.Persistence.Azure.csproj b/src/Akka.Persistence.Azure/Akka.Persistence.Azure.csproj index 6cd9c03..461da09 100644 --- a/src/Akka.Persistence.Azure/Akka.Persistence.Azure.csproj +++ b/src/Akka.Persistence.Azure/Akka.Persistence.Azure.csproj @@ -5,6 +5,7 @@ $(NetStandardLibVersion) Akka.Persistence support for Windows Azure Table storage and Azure blob storage. + 8.0 @@ -14,7 +15,10 @@ - + + + + \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/CloudTableExtensions.cs b/src/Akka.Persistence.Azure/CloudTableExtensions.cs new file mode 100644 index 0000000..deae510 --- /dev/null +++ b/src/Akka.Persistence.Azure/CloudTableExtensions.cs @@ -0,0 +1,46 @@ +using System.Collections.Generic; +using System.Threading.Tasks; +using Microsoft.Azure.Cosmos.Table; + +namespace Akka.Persistence.Azure +{ + public static class CloudTableExtensions + { + private const int MaxBatchSize = 100; + + public static async Task> ExecuteBatchAsLimitedBatches( + this CloudTable table, + TableBatchOperation batch) + { + if (batch.Count < 1) + return new List(); + + if (batch.Count <= MaxBatchSize) + return await table.ExecuteBatchAsync(batch); + + var result = new List(); + var limitedBatchOperationLists = batch.ChunkBy(MaxBatchSize); + + foreach (var limitedBatchOperationList in limitedBatchOperationLists) + { + var limitedBatch = CreateLimitedTableBatchOperation(limitedBatchOperationList); + var limitedBatchResult = await table.ExecuteBatchAsync(limitedBatch); + result.AddRange(limitedBatchResult); + } + + return result; + } + + private static TableBatchOperation CreateLimitedTableBatchOperation( + IEnumerable limitedBatchOperationList) + { + var limitedBatch = new TableBatchOperation(); + foreach (var limitedBatchOperation in limitedBatchOperationList) + { + limitedBatch.Add(limitedBatchOperation); + } + + return limitedBatch; + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs index 18fbc97..4e1cbbf 100644 --- a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs +++ b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs @@ -11,8 +11,6 @@ using Akka.Persistence.Azure.Util; using Akka.Persistence.Journal; using Akka.Util.Internal; -using Microsoft.WindowsAzure.Storage; -using Microsoft.WindowsAzure.Storage.Table; using System; using System.Collections.Generic; using System.Collections.Immutable; @@ -20,6 +18,7 @@ using System.Threading; using System.Threading.Tasks; using Akka.Configuration; +using Microsoft.Azure.Cosmos.Table; using Debug = System.Diagnostics.Debug; namespace Akka.Persistence.Azure.Journal @@ -223,7 +222,7 @@ protected override async Task DeleteMessagesToAsync( tableBatchOperation.Delete(toBeDeleted); } - var deleteTask = Table.ExecuteBatchAsync(tableBatchOperation); + var deleteTask = Table.ExecuteBatchAsLimitedBatches(tableBatchOperation); await deleteTask; } @@ -378,20 +377,24 @@ protected override async Task> WriteMessagesAsync( if (_log.IsDebugEnabled && _settings.VerboseLogging) _log.Debug("Attempting to write batch of {0} messages to Azure storage", persistenceBatch.Count); - var persistenceResults = await Table.ExecuteBatchAsync(persistenceBatch); + var persistenceResults = await Table.ExecuteBatchAsLimitedBatches(persistenceBatch); if (_log.IsDebugEnabled && _settings.VerboseLogging) foreach (var r in persistenceResults) _log.Debug("Azure table storage wrote entity [{0}] with status code [{1}]", r.Etag, r.HttpStatusCode); + + exceptions = exceptions.Add(null); } catch (Exception ex) { + _log.Warning(ex, "Failure while writing messages to Azure table storage"); + exceptions = exceptions.Add(ex); } } } - if (exceptions.IsEmpty) + if (exceptions.All(ex => ex == null)) { var allPersistenceIdsBatch = new TableBatchOperation(); @@ -401,7 +404,7 @@ protected override async Task> WriteMessagesAsync( allPersistenceIdsBatch.InsertOrReplace(new AllPersistenceIdsEntry(encodedKey)); }); - var allPersistenceResults = await Table.ExecuteBatchAsync(allPersistenceIdsBatch); + var allPersistenceResults = await Table.ExecuteBatchAsLimitedBatches(allPersistenceIdsBatch); if (_log.IsDebugEnabled && _settings.VerboseLogging) foreach (var r in allPersistenceResults) @@ -426,7 +429,7 @@ protected override async Task> WriteMessagesAsync( eventTagsBatch.InsertOrReplace(item); } - var eventTagsResults = await Table.ExecuteBatchAsync(eventTagsBatch); + var eventTagsResults = await Table.ExecuteBatchAsLimitedBatches(eventTagsBatch); if (_log.IsDebugEnabled && _settings.VerboseLogging) foreach (var r in eventTagsResults) @@ -439,7 +442,6 @@ protected override async Task> WriteMessagesAsync( NotifyTagChange(tag); } } - } } } @@ -451,7 +453,7 @@ protected override async Task> WriteMessagesAsync( * * Either everything fails or everything succeeds is the idea I guess. */ - return exceptions.IsEmpty ? null : exceptions; + return exceptions.Any(ex => ex != null) ? exceptions : null; } catch (Exception ex) { @@ -709,7 +711,7 @@ private async Task> GetAllPersistenceIds() { var query = GenerateAllPersistenceIdsQuery(); - TableQuerySegment result = null; + TableQuerySegment result = null; var returnValue = ImmutableList.Empty; @@ -734,8 +736,26 @@ private async Task InitCloudStorage( var tableClient = _storageAccount.CreateCloudTableClient(); var tableRef = tableClient.GetTableReference(_settings.TableName); var op = new OperationContext(); + using (var cts = new CancellationTokenSource(_settings.ConnectTimeout)) { + if (!_settings.AutoInitialize) + { + var exists = await tableRef.ExistsAsync(null, null, cts.Token); + + if (!exists) + { + remainingTries = 0; + + throw new Exception( + $"Table {_settings.TableName} doesn't exist. Either create it or turn auto-initialize on"); + } + + _log.Info("Successfully connected to existing table", _settings.TableName); + + return tableRef; + } + if (await tableRef.CreateIfNotExistsAsync(new TableRequestOptions(), op, cts.Token)) _log.Info("Created Azure Cloud Table", _settings.TableName); else diff --git a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSettings.cs b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSettings.cs index 7bc239b..41c8a6b 100644 --- a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSettings.cs +++ b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSettings.cs @@ -7,7 +7,7 @@ using System; using System.Linq; using Akka.Configuration; -using Microsoft.WindowsAzure.Storage; +using Microsoft.Azure.Cosmos.Table; namespace Akka.Persistence.Azure.Journal { @@ -24,7 +24,8 @@ public AzureTableStorageJournalSettings( TimeSpan connectTimeout, TimeSpan requestTimeout, bool verboseLogging, - bool development) + bool development, + bool autoInitialize) { if(string.IsNullOrWhiteSpace(tableName)) throw new ConfigurationException("[AzureTableStorageJournal] Table name is null or empty."); @@ -43,6 +44,7 @@ public AzureTableStorageJournalSettings( RequestTimeout = requestTimeout; VerboseLogging = verboseLogging; Development = development; + AutoInitialize = autoInitialize; } /// @@ -71,6 +73,8 @@ public AzureTableStorageJournalSettings( public bool VerboseLogging { get; } public bool Development { get; } + + public bool AutoInitialize { get; } /// /// Creates an instance using the @@ -86,6 +90,7 @@ public static AzureTableStorageJournalSettings Create(Config config) var requestTimeout = config.GetTimeSpan("request-timeout", TimeSpan.FromSeconds(3)); var verbose = config.GetBoolean("verbose-logging", false); var development = config.GetBoolean("development", false); + var autoInitialize = config.GetBoolean("auto-initialize", true); return new AzureTableStorageJournalSettings( connectionString, @@ -93,7 +98,8 @@ public static AzureTableStorageJournalSettings Create(Config config) connectTimeout, requestTimeout, verbose, - development); + development, + autoInitialize); } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Journal/HighestSequenceNrEntry.cs b/src/Akka.Persistence.Azure/Journal/HighestSequenceNrEntry.cs index 05ff7de..b8f6816 100644 --- a/src/Akka.Persistence.Azure/Journal/HighestSequenceNrEntry.cs +++ b/src/Akka.Persistence.Azure/Journal/HighestSequenceNrEntry.cs @@ -1,8 +1,7 @@ using System; using System.Collections.Generic; using System.Text; -using Microsoft.WindowsAzure.Storage; -using Microsoft.WindowsAzure.Storage.Table; +using Microsoft.Azure.Cosmos.Table; namespace Akka.Persistence.Azure.Journal { diff --git a/src/Akka.Persistence.Azure/ListExtensions.cs b/src/Akka.Persistence.Azure/ListExtensions.cs new file mode 100644 index 0000000..50807ca --- /dev/null +++ b/src/Akka.Persistence.Azure/ListExtensions.cs @@ -0,0 +1,18 @@ +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Linq; + +namespace Akka.Persistence.Azure +{ + public static class ListExtensions + { + public static IImmutableList> ChunkBy(this IEnumerable source, int chunkSize) + { + return source + .Select((x, i) => new { Index = i, Value = x }) + .GroupBy(x => x.Index / chunkSize) + .Select(x => x.Select(v => v.Value).ToList().AsEnumerable()) + .ToImmutableList(); + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotSetup.cs b/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotSetup.cs new file mode 100644 index 0000000..c7c26a9 --- /dev/null +++ b/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotSetup.cs @@ -0,0 +1,49 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Akka.Actor.Setup; +using Azure.Identity; +using Azure.Storage.Blobs; + +namespace Akka.Persistence.Azure.Snapshot +{ + public class AzureBlobSnapshotSetup : Setup + { + /// + /// Create a new + /// + /// + /// A referencing the blob service. + /// This is likely to be similar to "https://{account_name}.blob.core.windows.net". + /// + /// + /// The used to sign requests. + /// + /// + /// Optional client options that define the transport pipeline policies for authentication, + /// retries, etc., that are applied to every request. + /// + /// A new instance + public static AzureBlobSnapshotSetup Create( + Uri serviceUri, + DefaultAzureCredential defaultAzureCredential, + BlobClientOptions blobClientOptions = default) + => new AzureBlobSnapshotSetup(serviceUri, defaultAzureCredential, blobClientOptions); + + private AzureBlobSnapshotSetup( + Uri serviceUri, + DefaultAzureCredential azureCredential, + BlobClientOptions blobClientOptions) + { + ServiceUri = serviceUri; + DefaultAzureCredential = azureCredential; + BlobClientOptions = blobClientOptions; + } + + public Uri ServiceUri { get; } + + public DefaultAzureCredential DefaultAzureCredential { get; } + + public BlobClientOptions BlobClientOptions { get; } + } +} diff --git a/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStore.cs b/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStore.cs index 9e82b8b..a18c295 100644 --- a/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStore.cs +++ b/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStore.cs @@ -8,14 +8,17 @@ using System.Collections.Generic; using System.IO; using System.Linq; +using System.Net; using System.Threading; using System.Threading.Tasks; using Akka.Configuration; using Akka.Event; using Akka.Persistence.Azure.Util; using Akka.Persistence.Snapshot; -using Microsoft.WindowsAzure.Storage; -using Microsoft.WindowsAzure.Storage.Blob; +using Azure; +using Azure.Storage.Blobs; +using Azure.Storage.Blobs.Models; +using Azure.Storage.Blobs.Specialized; namespace Akka.Persistence.Azure.Snapshot { @@ -38,11 +41,11 @@ public class AzureBlobSnapshotStore : SnapshotStore private const string TimeStampMetaDataKey = "Timestamp"; private const string SeqNoMetaDataKey = "SeqNo"; - private readonly Lazy _container; + private readonly Lazy _containerClient; private readonly ILoggingAdapter _log = Context.GetLogger(); private readonly SerializationHelper _serialization; private readonly AzureBlobSnapshotStoreSettings _settings; - private readonly CloudStorageAccount _storageAccount; + private readonly BlobServiceClient _serviceClient; public AzureBlobSnapshotStore(Config config = null) { @@ -51,33 +54,65 @@ public AzureBlobSnapshotStore(Config config = null) ? AzurePersistence.Get(Context.System).BlobSettings : AzureBlobSnapshotStoreSettings.Create(config); - _storageAccount = _settings.Development ? - CloudStorageAccount.DevelopmentStorageAccount : - CloudStorageAccount.Parse(_settings.ConnectionString); + if (_settings.Development) + { + _serviceClient = new BlobServiceClient("UseDevelopmentStorage=true"); + } + else + { + var credentialSetup = Context.System.Settings.Setup.Get(); + if (credentialSetup.HasValue) + { + _serviceClient = new BlobServiceClient( + credentialSetup.Value.ServiceUri, + credentialSetup.Value.DefaultAzureCredential, + credentialSetup.Value.BlobClientOptions); + } + else + { + _serviceClient = new BlobServiceClient(_settings.ConnectionString); + } + } - _container = new Lazy(() => InitCloudStorage(5).Result); + _containerClient = new Lazy(() => InitCloudStorage(5).Result); } - public CloudBlobContainer Container => _container.Value; + public BlobContainerClient Container => _containerClient.Value; - private async Task InitCloudStorage(int remainingTries) + private async Task InitCloudStorage(int remainingTries) { try { - var blobClient = _storageAccount.CreateCloudBlobClient(); - var containerRef = blobClient.GetContainerReference(_settings.ContainerName); - var op = new OperationContext(); + var blobClient = _serviceClient.GetBlobContainerClient(_settings.ContainerName); - using (var cts = new CancellationTokenSource(_settings.ConnectTimeout)) + using var cts = new CancellationTokenSource(_settings.ConnectTimeout); + if (!_settings.AutoInitialize) { - if (await containerRef.CreateIfNotExistsAsync(BlobContainerPublicAccessType.Container, - new BlobRequestOptions(), op, cts.Token)) - _log.Info("Created Azure Blob Container", _settings.ContainerName); - else - _log.Info("Successfully connected to existing container", _settings.ContainerName); + var exists = await blobClient.ExistsAsync(cts.Token); + + if (!exists) + { + remainingTries = 0; + + throw new Exception( + $"Container {_settings.ContainerName} doesn't exist. Either create it or turn auto-initialize on"); + } + + _log.Info("Successfully connected to existing container {0}", _settings.ContainerName); + + return blobClient; } - return containerRef; + var response = await blobClient.CreateIfNotExistsAsync( + _settings.ContainerPublicAccessType, + cancellationToken: cts.Token); + + if (response.GetRawResponse().Status == (int)HttpStatusCode.Created) + _log.Info("Created Azure Blob Container {0}", _settings.ContainerName); + else + _log.Info("Successfully connected to existing container {0}", _settings.ContainerName); + + return blobClient; } catch (Exception ex) { @@ -106,29 +141,25 @@ protected override void PreStart() protected override async Task LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria) { - var requestOptions = GenerateOptions(); - BlobResultSegment results = null; - using (var cts = new CancellationTokenSource(_settings.RequestTimeout)) + using var cts = new CancellationTokenSource(_settings.RequestTimeout); { - results = await Container.ListBlobsSegmentedAsync(SeqNoHelper.ToSnapshotSearchQuery(persistenceId), - true, - BlobListingDetails.Metadata, null, null, requestOptions, new OperationContext(), cts.Token); - } + var results = Container.GetBlobsAsync( + prefix: SeqNoHelper.ToSnapshotSearchQuery(persistenceId), + traits: BlobTraits.Metadata, + cancellationToken: cts.Token); - // if we made it down here, the initial request succeeded. + var pageEnumerator = results.AsPages().GetAsyncEnumerator(cts.Token); - async Task FilterAndFetch(BlobResultSegment segment) - { + if (!await pageEnumerator.MoveNextAsync()) + return null; + + // TODO: see if there's ever a scenario where the most recent snapshots aren't in the first page of the pagination list. // apply filter criteria - var filtered = segment.Results - .Where(x => x is CloudBlockBlob) - .Cast() + var filtered = pageEnumerator.Current.Values .Where(x => FilterBlobSeqNo(criteria, x)) .Where(x => FilterBlobTimestamp(criteria, x)) - .OrderByDescending(x => FetchBlobSeqNo(x)) // ordering matters - get highest seqNo item - .ThenByDescending(x => - FetchBlobTimestamp( - x)) // if there are multiple snapshots taken at same SeqNo, need latest timestamp + .OrderByDescending(FetchBlobSeqNo) // ordering matters - get highest seqNo item + .ThenByDescending(FetchBlobTimestamp) // if there are multiple snapshots taken at same SeqNo, need latest timestamp .FirstOrDefault(); // couldn't find what we were looking for. Onto the next part of the query @@ -136,160 +167,101 @@ async Task FilterAndFetch(BlobResultSegment segment) if (filtered == null) return null; - using (var cts = new CancellationTokenSource(_settings.RequestTimeout)) - using (var memoryStream = new MemoryStream()) - { - await filtered.DownloadToStreamAsync(memoryStream, AccessCondition.GenerateEmptyCondition(), - GenerateOptions(), new OperationContext(), cts.Token); + using var memoryStream = new MemoryStream(); + var blobClient = Container.GetBlockBlobClient(filtered.Name); + var downloadInfo = await blobClient.DownloadAsync(cts.Token); + await downloadInfo.Value.Content.CopyToAsync(memoryStream); - var snapshot = _serialization.SnapshotFromBytes(memoryStream.ToArray()); + var snapshot = _serialization.SnapshotFromBytes(memoryStream.ToArray()); - var returnValue = - new SelectedSnapshot( - new SnapshotMetadata( - persistenceId, - FetchBlobSeqNo(filtered), - new DateTime(FetchBlobTimestamp(filtered))), - snapshot.Data); + var result = + new SelectedSnapshot( + new SnapshotMetadata( + persistenceId, + FetchBlobSeqNo(filtered), + new DateTime(FetchBlobTimestamp(filtered))), + snapshot.Data); - return returnValue; - } + return result; } - - // TODO: see if there's ever a scenario where the most recent snapshots aren't in the beginning of the pagination list. - var result = await FilterAndFetch(results); - return result; } protected override async Task SaveAsync(SnapshotMetadata metadata, object snapshot) { - var blob = Container.GetBlockBlobReference(metadata.ToSnapshotBlobId()); + var blobClient = Container.GetBlockBlobClient(metadata.ToSnapshotBlobId()); var snapshotData = _serialization.SnapshotToBytes(new Serialization.Snapshot(snapshot)); - using (var cts = new CancellationTokenSource(_settings.RequestTimeout)) + using var cts = new CancellationTokenSource(_settings.RequestTimeout); + var blobMetadata = new Dictionary { - blob.Metadata.Add(TimeStampMetaDataKey, metadata.Timestamp.Ticks.ToString()); - + [TimeStampMetaDataKey] = metadata.Timestamp.Ticks.ToString(), /* * N.B. No need to convert the key into the Journal format we use here. * The blobs themselves don't have their sort order affected by * the presence of this metadata, so we should just save the SeqNo * in a format that can be easily deserialized later. */ - blob.Metadata.Add(SeqNoMetaDataKey, metadata.SequenceNr.ToString()); - - await blob.UploadFromByteArrayAsync( - snapshotData, - 0, - snapshotData.Length, - AccessCondition.GenerateEmptyCondition(), - GenerateOptions(), - new OperationContext(), - cts.Token); - } + [SeqNoMetaDataKey] = metadata.SequenceNr.ToString() + }; + + using var stream = new MemoryStream(snapshotData); + await blobClient.UploadAsync( + stream, + metadata: blobMetadata, + cancellationToken: cts.Token); } protected override async Task DeleteAsync(SnapshotMetadata metadata) { - var blob = Container.GetBlockBlobReference(metadata.ToSnapshotBlobId()); - using (var cts = new CancellationTokenSource(_settings.RequestTimeout)) - { - await blob.DeleteIfExistsAsync(DeleteSnapshotsOption.None, AccessCondition.GenerateEmptyCondition(), - GenerateOptions(), new OperationContext(), - cts.Token); - } + var blobClient = Container.GetBlobClient(metadata.ToSnapshotBlobId()); + + using var cts = new CancellationTokenSource(_settings.RequestTimeout); + await blobClient.DeleteIfExistsAsync(cancellationToken: cts.Token); } protected override async Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria) { - var requestOptions = GenerateOptions(); - BlobResultSegment results = null; - using (var cts = new CancellationTokenSource(_settings.RequestTimeout)) + using var cts = new CancellationTokenSource(_settings.RequestTimeout); + var items = Container.GetBlobsAsync( + prefix: SeqNoHelper.ToSnapshotSearchQuery(persistenceId), + traits: BlobTraits.Metadata, + cancellationToken: cts.Token); + + var filtered = items + .Where(x => FilterBlobSeqNo(criteria, x)) + .Where(x => FilterBlobTimestamp(criteria, x)); + + var deleteTasks = new List(); + await foreach (var blob in filtered.WithCancellation(cts.Token)) { - /* - * Query only the metadata - don't need to stream the entire blob back to us - * in order to delete it from storage in the next request. - */ - results = await Container.ListBlobsSegmentedAsync(SeqNoHelper.ToSnapshotSearchQuery(persistenceId), - true, - BlobListingDetails.Metadata, null, null, requestOptions, new OperationContext(), cts.Token); - } - - // if we made it down here, the initial request succeeded. - - async Task FilterAndDelete(BlobResultSegment segment) - { - // apply filter criteria - var filtered = segment.Results.Where(x => x is CloudBlockBlob) - .Cast() - .Where(x => FilterBlobSeqNo(criteria, x)) - .Where(x => FilterBlobTimestamp(criteria, x)); - - var deleteTasks = new List(); - using (var cts = new CancellationTokenSource(_settings.RequestTimeout)) - { - foreach (var blob in filtered) - deleteTasks.Add(blob.DeleteIfExistsAsync(DeleteSnapshotsOption.None, - AccessCondition.GenerateEmptyCondition(), - GenerateOptions(), new OperationContext(), cts.Token)); - - await Task.WhenAll(deleteTasks); - } - } - - var continuationToken = results.ContinuationToken; - var deleteTask = FilterAndDelete(results); - - while (continuationToken != null) - { - // get the next round of results in parallel with the deletion of the previous - var nextResults = await Container.ListBlobsSegmentedAsync(continuationToken); - - // finish our previous delete tasks - await deleteTask; - - // start next round of deletes - deleteTask = FilterAndDelete(nextResults); - - // move the loop forward if there are more results to be processed still - continuationToken = nextResults.ContinuationToken; + var blobClient = Container.GetBlobClient(blob.Name); + deleteTasks.Add(blobClient.DeleteIfExistsAsync(cancellationToken: cts.Token)); } - // wait for the final delete operation to complete - await deleteTask; + await Task.WhenAll(deleteTasks); } - private static bool FilterBlobSeqNo(SnapshotSelectionCriteria criteria, CloudBlob x) + private static bool FilterBlobSeqNo(SnapshotSelectionCriteria criteria, BlobItem x) { var seqNo = FetchBlobSeqNo(x); return seqNo <= criteria.MaxSequenceNr && seqNo >= criteria.MinSequenceNr; } - private static long FetchBlobSeqNo(CloudBlob x) + private static long FetchBlobSeqNo(BlobItem x) { return long.Parse(x.Metadata[SeqNoMetaDataKey]); } - private static bool FilterBlobTimestamp(SnapshotSelectionCriteria criteria, CloudBlob x) + private static bool FilterBlobTimestamp(SnapshotSelectionCriteria criteria, BlobItem x) { var ticks = FetchBlobTimestamp(x); return ticks <= criteria.MaxTimeStamp.Ticks && (!criteria.MinTimestamp.HasValue || ticks >= criteria.MinTimestamp.Value.Ticks); } - private static long FetchBlobTimestamp(CloudBlob x) + private static long FetchBlobTimestamp(BlobItem x) { return long.Parse(x.Metadata[TimeStampMetaDataKey]); } - - private BlobRequestOptions GenerateOptions() - { - return GenerateOptions(_settings); - } - - private static BlobRequestOptions GenerateOptions(AzureBlobSnapshotStoreSettings settings) - { - return new BlobRequestOptions { MaximumExecutionTime = settings.RequestTimeout }; - } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStoreSettings.cs b/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStoreSettings.cs index d40abd7..fe294e4 100644 --- a/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStoreSettings.cs +++ b/src/Akka.Persistence.Azure/Snapshot/AzureBlobSnapshotStoreSettings.cs @@ -6,7 +6,8 @@ using System; using Akka.Configuration; -using Microsoft.WindowsAzure.Storage; +using Akka.Persistence.Azure.Util; +using Azure.Storage.Blobs.Models; namespace Akka.Persistence.Azure.Snapshot { @@ -16,12 +17,35 @@ namespace Akka.Persistence.Azure.Snapshot /// public sealed class AzureBlobSnapshotStoreSettings { - public AzureBlobSnapshotStoreSettings(string connectionString, string containerName, - TimeSpan connectTimeout, TimeSpan requestTimeout, bool verboseLogging, bool development) + [Obsolete] + public AzureBlobSnapshotStoreSettings( + string connectionString, + string containerName, + TimeSpan connectTimeout, + TimeSpan requestTimeout, + bool verboseLogging, + bool development, + bool autoInitialize) + : this(connectionString, containerName, connectTimeout, requestTimeout, verboseLogging, development, autoInitialize, PublicAccessType.BlobContainer) + { } + + public AzureBlobSnapshotStoreSettings( + string connectionString, + string containerName, + TimeSpan connectTimeout, + TimeSpan requestTimeout, + bool verboseLogging, + bool development, + bool autoInitialize, + PublicAccessType containerPublicAccessType) { if (string.IsNullOrWhiteSpace(containerName)) throw new ConfigurationException("[AzureBlobSnapshotStore] Container name is null or empty."); + if (string.IsNullOrWhiteSpace(connectionString) && !development) + throw new ConfigurationException( + "Invalid [connection-string] value. Connection string must not be null or empty when development mode is not set."); + NameValidator.ValidateContainerName(containerName); ConnectionString = connectionString; ContainerName = containerName; @@ -29,6 +53,8 @@ public AzureBlobSnapshotStoreSettings(string connectionString, string containerN ConnectTimeout = connectTimeout; VerboseLogging = verboseLogging; Development = development; + AutoInitialize = autoInitialize; + ContainerPublicAccessType = containerPublicAccessType; } /// @@ -58,6 +84,10 @@ public AzureBlobSnapshotStoreSettings(string connectionString, string containerN public bool Development { get; } + public bool AutoInitialize { get; } + + public PublicAccessType ContainerPublicAccessType { get; } + /// /// Creates an instance using the /// `akka.persistence.snapshot-store.azure-blob-store` HOCON configuration section. @@ -72,6 +102,13 @@ public static AzureBlobSnapshotStoreSettings Create(Config config) var requestTimeout = config.GetTimeSpan("request-timeout", TimeSpan.FromSeconds(3)); var verbose = config.GetBoolean("verbose-logging", false); var development = config.GetBoolean("development", false); + var autoInitialize = config.GetBoolean("auto-initialize", true); + + var accessType = config.GetString("container-public-access-type", PublicAccessType.BlobContainer.ToString()); + + if (!Enum.TryParse(accessType, true, out var containerPublicAccessType)) + throw new ConfigurationException( + "Invalid [container-public-access-type] value. Valid values are 'None', 'Blob', and 'BlobContainer'"); return new AzureBlobSnapshotStoreSettings( connectionString, @@ -79,7 +116,9 @@ public static AzureBlobSnapshotStoreSettings Create(Config config) connectTimeout, requestTimeout, verbose, - development); + development, + autoInitialize, + containerPublicAccessType); } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/TableEntities/AllPersistenceIdsEntry.cs b/src/Akka.Persistence.Azure/TableEntities/AllPersistenceIdsEntry.cs index f2f5005..5c329ba 100644 --- a/src/Akka.Persistence.Azure/TableEntities/AllPersistenceIdsEntry.cs +++ b/src/Akka.Persistence.Azure/TableEntities/AllPersistenceIdsEntry.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Generic; -using Microsoft.WindowsAzure.Storage; -using Microsoft.WindowsAzure.Storage.Table; +using Microsoft.Azure.Cosmos.Table; namespace Akka.Persistence.Azure.TableEntities { diff --git a/src/Akka.Persistence.Azure/TableEntities/EventTagEntry.cs b/src/Akka.Persistence.Azure/TableEntities/EventTagEntry.cs index 02b3c31..831ebbb 100644 --- a/src/Akka.Persistence.Azure/TableEntities/EventTagEntry.cs +++ b/src/Akka.Persistence.Azure/TableEntities/EventTagEntry.cs @@ -2,8 +2,7 @@ using System.Collections.Generic; using System.Linq; using Akka.Persistence.Azure.Util; -using Microsoft.WindowsAzure.Storage; -using Microsoft.WindowsAzure.Storage.Table; +using Microsoft.Azure.Cosmos.Table; namespace Akka.Persistence.Azure.TableEntities { diff --git a/src/Akka.Persistence.Azure/TableEntities/HighestSequenceNrEntry.cs b/src/Akka.Persistence.Azure/TableEntities/HighestSequenceNrEntry.cs index 6c3f848..5c12e33 100644 --- a/src/Akka.Persistence.Azure/TableEntities/HighestSequenceNrEntry.cs +++ b/src/Akka.Persistence.Azure/TableEntities/HighestSequenceNrEntry.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Generic; -using Microsoft.WindowsAzure.Storage; -using Microsoft.WindowsAzure.Storage.Table; +using Microsoft.Azure.Cosmos.Table; namespace Akka.Persistence.Azure.TableEntities { diff --git a/src/Akka.Persistence.Azure/TableEntities/PersistentJournalEntry.cs b/src/Akka.Persistence.Azure/TableEntities/PersistentJournalEntry.cs index c6bec5d..8ba2aa6 100644 --- a/src/Akka.Persistence.Azure/TableEntities/PersistentJournalEntry.cs +++ b/src/Akka.Persistence.Azure/TableEntities/PersistentJournalEntry.cs @@ -6,11 +6,10 @@ using Akka.Persistence.Azure.Journal; using Akka.Persistence.Azure.Util; -using Microsoft.WindowsAzure.Storage; -using Microsoft.WindowsAzure.Storage.Table; using System; using System.Collections.Generic; using System.Linq; +using Microsoft.Azure.Cosmos.Table; namespace Akka.Persistence.Azure.TableEntities { diff --git a/src/Akka.Persistence.Azure/Util/NameValidator.cs b/src/Akka.Persistence.Azure/Util/NameValidator.cs new file mode 100644 index 0000000..ca1dd43 --- /dev/null +++ b/src/Akka.Persistence.Azure/Util/NameValidator.cs @@ -0,0 +1,192 @@ +// ----------------------------------------------------------------------------------------- +// +// Copyright 2013 Microsoft Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// ---------------------------------------------------------------------------------------- + +using System; +using System.Globalization; +using System.Text.RegularExpressions; + +namespace Akka.Persistence.Azure.Util +{ + /// + /// Provides helpers to validate resource names across the Microsoft Azure Storage Services. + /// + public static class NameValidator + { + private const int BlobFileDirectoryMinLength = 1; + private const int ContainerShareQueueTableMinLength = 3; + private const int ContainerShareQueueTableMaxLength = 63; + private const int FileDirectoryMaxLength = 255; + private const int BlobMaxLength = 1024; + private static readonly string[] ReservedFileNames = { ".", "..", "LPT1", "LPT2", "LPT3", "LPT4", "LPT5", "LPT6", "LPT7", "LPT8", "LPT9", "COM1", "COM2", "COM3", "COM4", "COM5", "COM6", "COM7", "COM8", "COM9", "PRN", "AUX", "NUL", "CON", "CLOCK$" }; + private static readonly RegexOptions RegexOptions = RegexOptions.Singleline | RegexOptions.ExplicitCapture | RegexOptions.CultureInvariant; + private static readonly Regex FileDirectoryRegex = new Regex(@"^[^""\\/:|<>*?]*\/{0,1}$", RegexOptions); + private static readonly Regex ShareContainerQueueRegex = new Regex("^[a-z0-9]+(-[a-z0-9]+)*$", RegexOptions); + private static readonly Regex TableRegex = new Regex("^[A-Za-z][A-Za-z0-9]*$", RegexOptions); + private static readonly Regex MetricsTableRegex = new Regex(@"^\$Metrics(HourPrimary|MinutePrimary|HourSecondary|MinuteSecondary)?(Transactions)(Blob|Queue|Table)$", RegexOptions); + + /// + /// Checks if a container name is valid. + /// + /// A string representing the container name to validate. + public static void ValidateContainerName(string containerName) + { + if (!("$root".Equals(containerName, StringComparison.Ordinal) || "$logs".Equals(containerName, StringComparison.Ordinal))) + { + ValidateShareContainerQueueHelper(containerName, "container"); + } + } + + /// + /// Checks if a queue name is valid. + /// + /// A string representing the queue name to validate. + public static void ValidateQueueName(string queueName) + { + ValidateShareContainerQueueHelper(queueName, "queue"); + } + + /// + /// Checks if a share name is valid. + /// + /// A string representing the share name to validate. + public static void ValidateShareName(string shareName) + { + ValidateShareContainerQueueHelper(shareName, "share"); + } + + private static void ValidateShareContainerQueueHelper(string resourceName, string resourceType) + { + if (string.IsNullOrWhiteSpace(resourceName)) + { + throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "Invalid {0} name. The {0} name may not be null, empty, or whitespace only.", resourceType)); + } + + if (resourceName.Length < ContainerShareQueueTableMinLength || resourceName.Length > ContainerShareQueueTableMaxLength) + { + throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "Invalid {0} name length. The {0} name must be between {1} and {2} characters long.", resourceType, ContainerShareQueueTableMinLength, ContainerShareQueueTableMaxLength)); + } + + if (!ShareContainerQueueRegex.IsMatch(resourceName)) + { + throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "Invalid {0} name. Check MSDN for more information about valid {0} naming.", resourceType)); + } + } + + /// + /// Checks if a blob name is valid. + /// + /// A string representing the blob name to validate. + public static void ValidateBlobName(string blobName) + { + if (string.IsNullOrWhiteSpace(blobName)) + { + throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "Invalid {0} name. The {0} name may not be null, empty, or whitespace only.", "blob")); + } + + if (blobName.Length < BlobFileDirectoryMinLength || blobName.Length > BlobMaxLength) + { + throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "Invalid {0} name length. The {0} name must be between {1} and {2} characters long.", "blob", BlobFileDirectoryMinLength, BlobMaxLength)); + } + + int slashCount = 0; + foreach (char c in blobName) + { + if (c == '/') + { + slashCount++; + } + } + + // 254 slashes means 255 path segments; max 254 segments for blobs, 255 includes container. + if (slashCount >= 254) + { + throw new ArgumentException("The count of URL path segments (strings between '/' characters) as part of the blob name cannot exceed 254."); + } + } + + /// + /// Checks if a file name is valid. + /// + /// A string representing the file name to validate. + public static void ValidateFileName(string fileName) + { + ValidateFileDirectoryHelper(fileName, "file"); + + if (fileName.EndsWith("/", StringComparison.Ordinal)) + { + throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "Invalid {0} name. Check MSDN for more information about valid {0} naming.", "file")); + } + + foreach (string s in ReservedFileNames) + { + if (s.Equals(fileName, StringComparison.OrdinalIgnoreCase)) + { + throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "Invalid {0} name. This {0} name is reserved.", "file")); + } + } + } + + /// + /// Checks if a directory name is valid. + /// + /// A string representing the directory name to validate. + public static void ValidateDirectoryName(string directoryName) + { + ValidateFileDirectoryHelper(directoryName, "directory"); + } + + private static void ValidateFileDirectoryHelper(string resourceName, string resourceType) + { + if (string.IsNullOrWhiteSpace(resourceName)) + { + throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "Invalid {0} name. The {0} name may not be null, empty, or whitespace only.", resourceType)); + } + + if (resourceName.Length < BlobFileDirectoryMinLength || resourceName.Length > FileDirectoryMaxLength) + { + throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "Invalid {0} name length. The {0} name must be between {1} and {2} characters long.", resourceType, BlobFileDirectoryMinLength, FileDirectoryMaxLength)); + } + + if (!FileDirectoryRegex.IsMatch(resourceName)) + { + throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "Invalid {0} name. Check MSDN for more information about valid {0} naming.", resourceType)); + } + } + + /// + /// Checks if a table name is valid. + /// + /// A string representing the table name to validate. + public static void ValidateTableName(string tableName) + { + if (string.IsNullOrWhiteSpace(tableName)) + { + throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "Invalid {0} name. The {0} name may not be null, empty, or whitespace only.", "table")); + } + + if (tableName.Length < ContainerShareQueueTableMinLength || tableName.Length > ContainerShareQueueTableMaxLength) + { + throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "Invalid {0} name length. The {0} name must be between {1} and {2} characters long.", "table", ContainerShareQueueTableMinLength, ContainerShareQueueTableMaxLength)); + } + + if (!(TableRegex.IsMatch(tableName) || MetricsTableRegex.IsMatch(tableName) || tableName.Equals("$MetricsCapacityBlob", StringComparison.OrdinalIgnoreCase))) + { + throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "Invalid {0} name. Check MSDN for more information about valid {0} naming.", "table")); + } + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/reference.conf b/src/Akka.Persistence.Azure/reference.conf index be83a76..acb95a4 100644 --- a/src/Akka.Persistence.Azure/reference.conf +++ b/src/Akka.Persistence.Azure/reference.conf @@ -31,6 +31,9 @@ akka.persistence { # Support for Azure Storage Emulator for local development. # Will ignore connection string settings if turned on. development = off + + # Creates the required table if set + auto-initialize = on } } @@ -86,6 +89,13 @@ akka.persistence { # Support for Azure Storage Emulator for local development. # Will ignore connection string settings if turned on. development = off + + # Creates the required container if set + auto-initialize = on + + # Public access level for the auto-initialized storage container. + # Valid values are "None", "BlobContainer" or "Blob" + container-public-access-type = "None" } } } \ No newline at end of file diff --git a/src/common.props b/src/common.props index 0e535ea..1b8ec8c 100644 --- a/src/common.props +++ b/src/common.props @@ -16,9 +16,9 @@ 2.4.1 - 1.4.14 - 4.14.0 - 16.8.3 + 1.4.18 + 5.10.3 + 16.9.4 netcoreapp3.1 net461 netstandard2.0