diff --git a/.gitignore b/.gitignore index 4f6922e..3a649ea 100644 --- a/.gitignore +++ b/.gitignore @@ -302,3 +302,5 @@ __pycache__/ *.btm.cs *.odx.cs *.xsd.cs +package-lock.json +azurite/ diff --git a/Akka.Persistence.Azure.sln b/Akka.Persistence.Azure.sln index 4a649bf..bcd72c8 100644 --- a/Akka.Persistence.Azure.sln +++ b/Akka.Persistence.Azure.sln @@ -9,8 +9,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.Persistence.Azure.Test EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.Persistence.Azure.Tests.Performance", "src\Akka.Persistence.Azure.Tests.Performance\Akka.Persistence.Azure.Tests.Performance.csproj", "{CAE7CA7C-0D0C-4FDA-BDE9-BE16A27343EF}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.Persistence.Azure.TestHelpers", "src\Akka.Persistence.Azure.TestHelpers\Akka.Persistence.Azure.TestHelpers.csproj", "{FE4C3232-1DB9-40E5-B0BE-BD70ACB4C1F3}" -EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "build", "build", "{320BFA6C-930A-4B7F-9FA4-76CDD0F0E552}" ProjectSection(SolutionItems) = preProject build.cmd = build.cmd @@ -37,10 +35,6 @@ Global {CAE7CA7C-0D0C-4FDA-BDE9-BE16A27343EF}.Debug|Any CPU.Build.0 = Debug|Any CPU {CAE7CA7C-0D0C-4FDA-BDE9-BE16A27343EF}.Release|Any CPU.ActiveCfg = Release|Any CPU {CAE7CA7C-0D0C-4FDA-BDE9-BE16A27343EF}.Release|Any CPU.Build.0 = Release|Any CPU - {FE4C3232-1DB9-40E5-B0BE-BD70ACB4C1F3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {FE4C3232-1DB9-40E5-B0BE-BD70ACB4C1F3}.Debug|Any CPU.Build.0 = Debug|Any CPU - {FE4C3232-1DB9-40E5-B0BE-BD70ACB4C1F3}.Release|Any CPU.ActiveCfg = Release|Any CPU - {FE4C3232-1DB9-40E5-B0BE-BD70ACB4C1F3}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index c891df9..064b44f 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,3 +1,9 @@ +#### 0.8.4 June 2 2022 #### +* Upgraded to [Akka.NET 1.4.39](https://github.com/akkadotnet/akka.net/releases/tag/1.4.39) +* [Update Azure.Identity to 1.6.0](https://github.com/petabridge/Akka.Persistence.Azure/pull/205) +* [Update System.Linq.Async to 6.0.1](https://github.com/petabridge/Akka.Persistence.Azure/pull/198) +* [Upgrade `Microsoft.Azure.Consmos.Table` to `Azure.Data.Tables` 12.5.0](https://github.com/petabridge/Akka.Persistence.Azure/pull/207) + #### 0.8.3 September 9 2021 #### * Upgraded to [Akka.NET 1.4.25](https://github.com/akkadotnet/akka.net/releases/tag/1.4.25) * [Update Azure.Identity to 1.4.1](https://github.com/petabridge/Akka.Persistence.Azure/pull/176) diff --git a/azurite.cmd b/azurite.cmd new file mode 100644 index 0000000..28e20ab --- /dev/null +++ b/azurite.cmd @@ -0,0 +1 @@ +start /B azurite --silent --location azurite --debug azirute/debug.log \ No newline at end of file diff --git a/build-system/azure-pipeline.template.yaml b/build-system/azure-pipeline.template.yaml index dd59d85..696095d 100644 --- a/build-system/azure-pipeline.template.yaml +++ b/build-system/azure-pipeline.template.yaml @@ -16,6 +16,12 @@ jobs: submodules: recursive # set to 'true' for a single level of submodules or 'recursive' to get submodules of submodules persistCredentials: true # Linux or macOS + - bash: | + sudo npm install -g azurite@3.17.1 + sudo mkdir azurite + sudo azurite --silent --location azurite --debug azirute/debug.log & + displayName: 'Install and Run Azurite (Linux)' + condition: in( variables['Agent.OS'], 'Linux', 'Darwin' ) - task: Bash@3 displayName: Linux / OSX Build inputs: @@ -24,8 +30,14 @@ jobs: continueOnError: true condition: in( variables['Agent.OS'], 'Linux', 'Darwin' ) env: - AZURE_CONNECTION_STR: $(azureConnectionString) + AZURE_CONNECTION_STR: UseDevelopmentStorage=true # Windows + - powershell: | + npm install -g azurite@3.17.1 + mkdir azurite + Start-Process azurite.cmd -PassThru + displayName: 'Install and Run Azurite (Windows)' + condition: eq( variables['Agent.OS'], 'Windows_NT' ) - task: BatchScript@1 displayName: Windows Build inputs: @@ -34,7 +46,7 @@ jobs: continueOnError: true condition: eq( variables['Agent.OS'], 'Windows_NT' ) env: - AZURE_CONNECTION_STR: $(azureConnectionString) + AZURE_CONNECTION_STR: UseDevelopmentStorage=true - task: PublishTestResults@2 inputs: testRunner: VSTest diff --git a/build-system/linux-pr-validation.yaml b/build-system/linux-pr-validation.yaml index 61f748e..d4289e8 100644 --- a/build-system/linux-pr-validation.yaml +++ b/build-system/linux-pr-validation.yaml @@ -17,6 +17,6 @@ jobs: - template: azure-pipeline.template.yaml parameters: name: Ubuntu - vmImage: 'ubuntu-16.04' + vmImage: 'ubuntu-latest' scriptFileName: ./build.sh - scriptArgs: all \ No newline at end of file + scriptArgs: all diff --git a/build-system/windows-pr-validation.yaml b/build-system/windows-pr-validation.yaml index 0b50b03..e7b0b03 100644 --- a/build-system/windows-pr-validation.yaml +++ b/build-system/windows-pr-validation.yaml @@ -17,6 +17,6 @@ jobs: - template: azure-pipeline.template.yaml parameters: name: Windows - vmImage: 'windows-2019' + vmImage: 'windows-latest' scriptFileName: build.cmd - scriptArgs: all \ No newline at end of file + scriptArgs: all diff --git a/build-system/windows-release.yaml b/build-system/windows-release.yaml index 602bee2..185d995 100644 --- a/build-system/windows-release.yaml +++ b/build-system/windows-release.yaml @@ -2,7 +2,7 @@ # See https://docs.microsoft.com/en-us/azure/devops/pipelines/yaml-schema for reference pool: - vmImage: vs2017-win2016 + vmImage: windows-latest demands: Cmd trigger: @@ -35,4 +35,4 @@ steps: title: '$(projectName) v$(Build.SourceBranchName)' releaseNotesFile: 'RELEASE_NOTES.md' assets: | - bin\nuget\*.nupkg \ No newline at end of file + bin\nuget\*.nupkg diff --git a/src/Akka.Persistence.Azure.TestHelpers/Akka.Persistence.Azure.TestHelpers.csproj b/src/Akka.Persistence.Azure.TestHelpers/Akka.Persistence.Azure.TestHelpers.csproj deleted file mode 100644 index f9f6848..0000000 --- a/src/Akka.Persistence.Azure.TestHelpers/Akka.Persistence.Azure.TestHelpers.csproj +++ /dev/null @@ -1,15 +0,0 @@ - - - - $(NetStandardLibVersion) - false - - - - - - - - - - diff --git a/src/Akka.Persistence.Azure.TestHelpers/DbUtils.cs b/src/Akka.Persistence.Azure.TestHelpers/DbUtils.cs deleted file mode 100644 index 93ece8c..0000000 --- a/src/Akka.Persistence.Azure.TestHelpers/DbUtils.cs +++ /dev/null @@ -1,16 +0,0 @@ -using System.Threading.Tasks; -using Akka.Actor; -using Microsoft.Azure.Cosmos.Table; - -namespace Akka.Persistence.Azure.TestHelpers -{ - public class DbUtils - { - public static Task CleanupCloudTable(string connectionString, string tableName) - { - var account = CloudStorageAccount.Parse(connectionString); - var table = account.CreateCloudTableClient().GetTableReference(tableName); - return table.DeleteIfExistsAsync(); - } - } -} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.TestHelpers/IAsyncFixture.cs b/src/Akka.Persistence.Azure.TestHelpers/IAsyncFixture.cs deleted file mode 100644 index 04830a7..0000000 --- a/src/Akka.Persistence.Azure.TestHelpers/IAsyncFixture.cs +++ /dev/null @@ -1,21 +0,0 @@ -// ----------------------------------------------------------------------- -// -// Copyright (C) 2015 - 2018 Petabridge, LLC -// -// ----------------------------------------------------------------------- - -using System.Threading.Tasks; - -namespace Akka.Persistence.Azure.TestHelpers -{ - /// - /// Describes an asynchronous test fixture that we may use in both - /// XUnit and NBench specs for integration testing purposes. - /// - public interface IAsyncFixture - { - Task Initialize(); - - Task CleanUp(); - } -} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.TestHelpers/WindowsAzureStorageEmulatorFixture.cs b/src/Akka.Persistence.Azure.TestHelpers/WindowsAzureStorageEmulatorFixture.cs deleted file mode 100644 index 96599eb..0000000 --- a/src/Akka.Persistence.Azure.TestHelpers/WindowsAzureStorageEmulatorFixture.cs +++ /dev/null @@ -1,132 +0,0 @@ -// ----------------------------------------------------------------------- -// -// Copyright (C) 2015 - 2018 Petabridge, LLC -// -// ----------------------------------------------------------------------- - -using System; -using System.Collections.Generic; -using System.Runtime.InteropServices; -using System.Threading.Tasks; -using Docker.DotNet; -using Docker.DotNet.Models; - -namespace Akka.Persistence.Azure.TestHelpers -{ - /// - /// Integration testing fixture using the Windows Azure Storage Emulator - /// Docker image provided by Microsoft: https://hub.docker.com/r/microsoft/azure-storage-emulator/ - /// - public class WindowsAzureStorageEmulatorFixture : IAsyncFixture - { - private const string AzureStorageImageName = "microsoft/azure-storage-emulator"; - private readonly string _azureStorageContainerName = $"azurestorage-{Guid.NewGuid():N}"; - private DockerClient _client; - - public string ConnectionString { get; private set; } - - public async Task Initialize() - { - DockerClientConfiguration config; - if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux)) - config = new DockerClientConfiguration(new Uri("unix:///var/run/docker.sock")); - else if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows)) - config = new DockerClientConfiguration(new Uri("npipe://./pipe/docker_engine")); - else - throw new NotSupportedException($"Unsupported OS [{RuntimeInformation.OSDescription}]"); - - _client = config.CreateClient(); - - var images = - await _client.Images.ListImagesAsync(new ImagesListParameters - { - Filters = new Dictionary> - { - ["reference"] = new Dictionary - { - [AzureStorageImageName] = true - } - } - }); - if (images.Count == 0) - await _client.Images.CreateImageAsync( - new ImagesCreateParameters {FromImage = AzureStorageImageName, Tag = "latest"}, null, - new Progress()); - - var azureBlobPort = 10000; - var azureQueuePort = 10001; - var azureTablePort = 10002; - - // create the container - await _client.Containers.CreateContainerAsync(new CreateContainerParameters - { - Image = AzureStorageImageName, - Name = _azureStorageContainerName, - Tty = true, - HostConfig = new HostConfig - { - PortBindings = new Dictionary> - { - { - "10000/tcp", - new List - { - new PortBinding - { - HostPort = $"{azureBlobPort}" - } - } - }, - - { - "10001/tcp", - new List - { - new PortBinding - { - HostPort = $"{azureQueuePort}" - } - } - }, - - { - "10002/tcp", - new List - { - new PortBinding - { - HostPort = $"{azureTablePort}" - } - } - } - } - } - }); - - // start the container - await _client.Containers.StartContainerAsync(_azureStorageContainerName, new ContainerStartParameters()); - - ConnectionString = GenerateConnStr(); - - await Task.Delay(TimeSpan.FromSeconds(10)); - } - - public async Task CleanUp() - { - if (_client != null) - { - await _client.Containers.StopContainerAsync(_azureStorageContainerName, new ContainerStopParameters()); - await _client.Containers.RemoveContainerAsync(_azureStorageContainerName, - new ContainerRemoveParameters {Force = true}); - _client.Dispose(); - } - } - - public static string GenerateConnStr(string ip = "127.0.0.1", int blobport = 10000, int queueport = 10001, - int tableport = 10002) - { - return - $"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://{ip}:{blobport}/devstoreaccount1;TableEndpoint=http://{ip}:{tableport}/devstoreaccount1;QueueEndpoint=http://{ip}:{queueport}/devstoreaccount1;"; - } - } -} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests.Performance/Akka.Persistence.Azure.Tests.Performance.csproj b/src/Akka.Persistence.Azure.Tests.Performance/Akka.Persistence.Azure.Tests.Performance.csproj index c85d9b9..1535977 100644 --- a/src/Akka.Persistence.Azure.Tests.Performance/Akka.Persistence.Azure.Tests.Performance.csproj +++ b/src/Akka.Persistence.Azure.Tests.Performance/Akka.Persistence.Azure.Tests.Performance.csproj @@ -13,7 +13,6 @@ - diff --git a/src/Akka.Persistence.Azure.Tests.Performance/AzureJournalPerfSpecs.cs b/src/Akka.Persistence.Azure.Tests.Performance/AzureJournalPerfSpecs.cs index e855e6f..d02aab1 100644 --- a/src/Akka.Persistence.Azure.Tests.Performance/AzureJournalPerfSpecs.cs +++ b/src/Akka.Persistence.Azure.Tests.Performance/AzureJournalPerfSpecs.cs @@ -7,18 +7,20 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Threading; using System.Threading.Tasks; using Akka.Actor; using Akka.Configuration; -using Akka.Persistence.Azure.TestHelpers; using Akka.Util.Internal; +using Azure.Data.Tables; +using Azure.Storage.Blobs; using NBench; namespace Akka.Persistence.Azure.Tests.Performance { public class AzureJournalPerfSpecs { + public const string ConnectionString = "UseDevelopmentStorage=true"; + public const string RecoveryCounterName = "MsgRecovered"; private Counter _recoveryCounter; @@ -35,10 +37,12 @@ public class AzureJournalPerfSpecs public static Config JournalConfig() { - if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR"))) - return JournalConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")); + var connString = Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR"); + if (string.IsNullOrWhiteSpace(connString)) + connString = ConnectionString; - return JournalConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); + CleanupCloudTable(connString).Wait(); + return JournalConfig(connString); } public static Config JournalConfig(string connectionString) @@ -55,6 +59,22 @@ public static Config JournalConfig(string connectionString) .WithFallback("akka.persistence.journal.azure-table.table-name=" + TableName); } + public static async Task CleanupCloudTable(string connectionString) + { + var tableClient = new TableServiceClient(connectionString); + + await foreach(var table in tableClient.QueryAsync()) + { + await tableClient.DeleteTableAsync(table.Name); + } + + var blobClient = new BlobServiceClient(connectionString); + foreach (var blobContainer in blobClient.GetBlobContainers()) + { + await blobClient.DeleteBlobContainerAsync(blobContainer.Name); + } + } + private ActorSystem ActorSystem { get; set; } private List _persistentActors = new List(PersistentActorCount); @@ -65,7 +85,6 @@ public void Setup(BenchmarkContext context) _recoveryCounter = context.GetCounter(RecoveryCounterName); _writeCounter = context.GetCounter(WriteCounterName); - ActorSystem = Actor.ActorSystem.Create(nameof(AzureJournalPerfSpecs) + TableVersionCounter.Current, JournalConfig()); foreach (var i in Enumerable.Range(0, PersistentActorCount)) @@ -121,7 +140,7 @@ public void CleanUp() try { - DbUtils.CleanupCloudTable(AzurePersistence.Get(ActorSystem).TableSettings.ConnectionString, TableName).Wait(TimeSpan.FromSeconds(3)); + CleanupCloudTable(AzurePersistence.Get(ActorSystem).TableSettings.ConnectionString).Wait(TimeSpan.FromSeconds(3)); } catch { } } diff --git a/src/Akka.Persistence.Azure.Tests/Akka.Persistence.Azure.Tests.csproj b/src/Akka.Persistence.Azure.Tests/Akka.Persistence.Azure.Tests.csproj index 2a30455..b03985a 100644 --- a/src/Akka.Persistence.Azure.Tests/Akka.Persistence.Azure.Tests.csproj +++ b/src/Akka.Persistence.Azure.Tests/Akka.Persistence.Azure.Tests.csproj @@ -1,6 +1,5 @@  - $(NetCoreTestVersion) @@ -8,15 +7,15 @@ + - + all runtime; build; native; contentfiles; analyzers; buildtransitive - diff --git a/src/Akka.Persistence.Azure.Tests/AzureBlobSnapshotStoreSerializationSpec.cs b/src/Akka.Persistence.Azure.Tests/AzureBlobSnapshotStoreSerializationSpec.cs index 718fcd2..fd6d9ae 100644 --- a/src/Akka.Persistence.Azure.Tests/AzureBlobSnapshotStoreSerializationSpec.cs +++ b/src/Akka.Persistence.Azure.Tests/AzureBlobSnapshotStoreSerializationSpec.cs @@ -6,29 +6,21 @@ using System; using Akka.Configuration; -using Akka.Persistence.Azure.TestHelpers; using Akka.Persistence.Azure.Tests.Helper; using Akka.Persistence.TCK.Serialization; using Xunit; using Xunit.Abstractions; +using static Akka.Persistence.Azure.Tests.Helper.AzureStorageConfigHelper; namespace Akka.Persistence.Azure.Tests { - [Collection("AzureSnapshot")] + [Collection("AzureSpecs")] public class AzureBlobSnapshotStoreSerializationSpec : SnapshotStoreSerializationSpec { - public AzureBlobSnapshotStoreSerializationSpec(ITestOutputHelper output) : base(Config(), - nameof(AzureTableJournalSpec), output) + public AzureBlobSnapshotStoreSerializationSpec(ITestOutputHelper output) + : base(AzureConfig(), nameof(AzureBlobSnapshotStoreSerializationSpec), output) { AzurePersistence.Get(Sys); } - - public static Config Config() - { - if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR"))) - return AzureStorageConfigHelper.AzureConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")); - - return AzureStorageConfigHelper.AzureConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); - } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests/AzureBlobSnapshotStoreSpec.cs b/src/Akka.Persistence.Azure.Tests/AzureBlobSnapshotStoreSpec.cs index 239fd64..a94842f 100644 --- a/src/Akka.Persistence.Azure.Tests/AzureBlobSnapshotStoreSpec.cs +++ b/src/Akka.Persistence.Azure.Tests/AzureBlobSnapshotStoreSpec.cs @@ -6,7 +6,7 @@ using System; using Akka.Configuration; -using Akka.Persistence.Azure.TestHelpers; +using Akka.Persistence.Azure.Tests.Helper; using Akka.Persistence.TCK.Snapshot; using Xunit; using Xunit.Abstractions; @@ -14,22 +14,14 @@ namespace Akka.Persistence.Azure.Tests { - [Collection("AzureSnapshot")] + [Collection("AzureSpecs")] public class AzureBlobSnapshotStoreSpec : SnapshotStoreSpec { - public AzureBlobSnapshotStoreSpec(ITestOutputHelper output) : base(Config(), - nameof(AzureTableJournalSpec), output) + public AzureBlobSnapshotStoreSpec(ITestOutputHelper output) + : base(AzureConfig(), nameof(AzureBlobSnapshotStoreSpec), output) { AzurePersistence.Get(Sys); Initialize(); } - - public static Config Config() - { - if (!string.IsNullOrEmpty(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR"))) - return AzureConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")); - - return AzureConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); - } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests/AzureTableJournalEscapePersistentIdSpec.cs b/src/Akka.Persistence.Azure.Tests/AzureTableJournalEscapePersistentIdSpec.cs index 705a2a4..6445d96 100644 --- a/src/Akka.Persistence.Azure.Tests/AzureTableJournalEscapePersistentIdSpec.cs +++ b/src/Akka.Persistence.Azure.Tests/AzureTableJournalEscapePersistentIdSpec.cs @@ -2,7 +2,7 @@ using System.Reflection; using Akka.Actor; using Akka.Configuration; -using Akka.Persistence.Azure.TestHelpers; +using Akka.Persistence.Azure.Tests.Helper; using Akka.Persistence.TCK; using Akka.Persistence.TCK.Journal; using Akka.TestKit; @@ -12,21 +12,38 @@ namespace Akka.Persistence.Azure.Tests { - public class AzureTableJournalEscapePersistentIdSpec : AzureTableJournalSpec, IClassFixture + [Collection("AzureSpecs")] + public class AzureTableJournalEscapePersistentIdSpec : JournalSpec { - public AzureTableJournalEscapePersistentIdSpec(ITestOutputHelper output) : base(output) + public AzureTableJournalEscapePersistentIdSpec(ITestOutputHelper output) + : base(AzureConfig(), nameof(AzureTableJournalEscapePersistentIdSpec), output) { - } - - /// - protected override void PreparePersistenceId(string pid) - { - base.PreparePersistenceId(pid); + AzurePersistence.Get(Sys); // Before storage is initialized, let's set Pid to the value that needs to be encoded var persistenceIdUsedForTests = typeof(PluginSpec).GetField($"<{nameof(Pid)}>k__BackingField", BindingFlags.Instance | BindingFlags.NonPublic); var currentValue = persistenceIdUsedForTests.GetValue(this).ToString(); persistenceIdUsedForTests.SetValue(this, $"some/path/to/encode/{currentValue}"); + + Initialize(); + } + + [WindowsFact(SkipUnixReason = "Batch delete is not supported by Azurite in Linux")] + public override void Journal_should_not_reset_HighestSequenceNr_after_message_deletion() + { + base.Journal_should_not_reset_HighestSequenceNr_after_message_deletion(); + } + + [WindowsFact(SkipUnixReason = "Batch delete is not supported by Azurite in Linux")] + public override void Journal_should_not_replay_permanently_deleted_messages_on_range_deletion() + { + base.Journal_should_not_replay_permanently_deleted_messages_on_range_deletion(); + } + + [WindowsFact(SkipUnixReason = "Batch delete is not supported by Azurite in Linux")] + public override void Journal_should_not_reset_HighestSequenceNr_after_journal_cleanup() + { + base.Journal_should_not_reset_HighestSequenceNr_after_journal_cleanup(); } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests/AzureTableJournalSerializationSpec.cs b/src/Akka.Persistence.Azure.Tests/AzureTableJournalSerializationSpec.cs index e7b42cc..7aaf2d4 100644 --- a/src/Akka.Persistence.Azure.Tests/AzureTableJournalSerializationSpec.cs +++ b/src/Akka.Persistence.Azure.Tests/AzureTableJournalSerializationSpec.cs @@ -6,22 +6,21 @@ using System; using Akka.Configuration; -using Akka.Persistence.Azure.TestHelpers; using Akka.Persistence.Azure.Tests.Helper; using Akka.Persistence.TCK.Serialization; using Xunit; using Xunit.Abstractions; +using static Akka.Persistence.Azure.Tests.Helper.AzureStorageConfigHelper; namespace Akka.Persistence.Azure.Tests { - [Collection("AzureJournal")] + [Collection("AzureSpecs")] public class AzureTableJournalSerializationSpec : JournalSerializationSpec { public AzureTableJournalSerializationSpec(ITestOutputHelper output) - : base(Config(), nameof(AzureTableJournalSerializationSpec), output) + : base(AzureConfig(), nameof(AzureTableJournalSerializationSpec), output) { AzurePersistence.Get(Sys); - output.WriteLine("Current table: {0}", TableName); } [Fact(Skip= "https://github.com/akkadotnet/akka.net/issues/3965")] @@ -29,32 +28,5 @@ public override void Journal_should_serialize_Persistent_with_EventAdapter_manif { } - - public static string TableName { get; private set; } - - public static Config Config() - { - var azureConfig = - !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) - ? AzureStorageConfigHelper.AzureConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) - : AzureStorageConfigHelper.AzureConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); - - TableName = azureConfig.GetString("akka.persistence.journal.azure-table.table-name"); - - return azureConfig; - } - - protected override void Dispose(bool disposing) - { - base.Dispose(disposing); - if (DbUtils.CleanupCloudTable(AzurePersistence.Get(Sys).TableSettings.ConnectionString, TableName).Wait(TimeSpan.FromSeconds(3))) - { - Log.Info("Successfully deleted table [{0}] after test run.", TableName); - } - else - { - Log.Error("Unable to delete table [{0}] after test run.", TableName); - } - } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests/AzureTableJournalSpec.cs b/src/Akka.Persistence.Azure.Tests/AzureTableJournalSpec.cs index 3dee98c..c28cc74 100644 --- a/src/Akka.Persistence.Azure.Tests/AzureTableJournalSpec.cs +++ b/src/Akka.Persistence.Azure.Tests/AzureTableJournalSpec.cs @@ -5,55 +5,23 @@ // ----------------------------------------------------------------------- using Akka.Configuration; -using Akka.Persistence.Azure.TestHelpers; using Akka.Persistence.TCK.Journal; using System; +using Akka.Persistence.Azure.Tests.Helper; using Xunit; using Xunit.Abstractions; using static Akka.Persistence.Azure.Tests.Helper.AzureStorageConfigHelper; namespace Akka.Persistence.Azure.Tests { - [Collection("AzureJournal")] - public class AzureTableJournalSpec : JournalSpec + [Collection("AzureSpecs")] + public abstract class AzureTableJournalSpec : JournalSpec { - private ITestOutputHelper _output; - public AzureTableJournalSpec(ITestOutputHelper output) - : base(TestConfig(), nameof(AzureTableJournalSpec), output) + : base(AzureConfig(), nameof(AzureTableJournalSpec), output) { AzurePersistence.Get(Sys); - _output = output; Initialize(); - - output.WriteLine("Current table: {0}", TableName); - } - - public static string TableName { get; private set; } - - public static Config TestConfig() - { - var azureConfig = - !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) - ? AzureConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) - : AzureConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); - - TableName = azureConfig.GetString("akka.persistence.journal.azure-table.table-name"); - - return azureConfig; - } - - protected override void Dispose(bool disposing) - { - base.Dispose(disposing); - if (DbUtils.CleanupCloudTable(AzurePersistence.Get(Sys).TableSettings.ConnectionString, TableName).Wait(TimeSpan.FromSeconds(3))) - { - _output.WriteLine("Successfully deleted table [{0}] after test run.", TableName); - } - else - { - _output.WriteLine("Unable to delete table [{0}] after test run.", TableName); - } } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests/Helper/AzureStorageConfigHelper.cs b/src/Akka.Persistence.Azure.Tests/Helper/AzureStorageConfigHelper.cs index 7c9a694..dfcf595 100644 --- a/src/Akka.Persistence.Azure.Tests/Helper/AzureStorageConfigHelper.cs +++ b/src/Akka.Persistence.Azure.Tests/Helper/AzureStorageConfigHelper.cs @@ -5,6 +5,25 @@ namespace Akka.Persistence.Azure.Tests.Helper { public static class AzureStorageConfigHelper { + private const string ConnectionString = "UseDevelopmentStorage=true"; + + public static Config AzureConfig() + { + return AzureConfig(AzureConfig); + } + + public static Config AzureConfig(Func configTemplate) + { + var connString = Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR"); + if (!string.IsNullOrEmpty(connString)) + { + return AzureConfig(connString); + } + + DbUtils.CleanupCloudTable(ConnectionString).Wait(); + return configTemplate(ConnectionString); + } + public static Config AzureConfig(string connectionString) { var tableName = "t" + Guid.NewGuid().ToString().Replace("-", ""); @@ -59,7 +78,8 @@ public static Config AzureConfig(string connectionString) } }") .WithFallback("akka.persistence.journal.azure-table.table-name=" + tableName) - .WithFallback("akka.persistence.snapshot-store.azure-blob-store.container-name=" + containerName); + .WithFallback("akka.persistence.snapshot-store.azure-blob-store.container-name=" + containerName) + .WithFallback(AzurePersistence.DefaultConfig); } } diff --git a/src/Akka.Persistence.Azure.Tests/Helper/DbUtils.cs b/src/Akka.Persistence.Azure.Tests/Helper/DbUtils.cs new file mode 100644 index 0000000..6e49b6f --- /dev/null +++ b/src/Akka.Persistence.Azure.Tests/Helper/DbUtils.cs @@ -0,0 +1,25 @@ +using System.Threading.Tasks; +using Azure.Data.Tables; +using Azure.Storage.Blobs; + +namespace Akka.Persistence.Azure.Tests.Helper +{ + public static class DbUtils + { + public static async Task CleanupCloudTable(string connectionString) + { + var tableClient = new TableServiceClient(connectionString); + + await foreach(var table in tableClient.QueryAsync()) + { + await tableClient.DeleteTableAsync(table.Name); + } + + var blobClient = new BlobServiceClient(connectionString); + foreach (var blobContainer in blobClient.GetBlobContainers()) + { + await blobClient.DeleteBlobContainerAsync(blobContainer.Name); + } + } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests/Helper/WindowsFactAttribute.cs b/src/Akka.Persistence.Azure.Tests/Helper/WindowsFactAttribute.cs new file mode 100644 index 0000000..b58ca99 --- /dev/null +++ b/src/Akka.Persistence.Azure.Tests/Helper/WindowsFactAttribute.cs @@ -0,0 +1,23 @@ +using System; +using Xunit; + +namespace Akka.Persistence.Azure.Tests.Helper +{ + public class WindowsFactAttribute: FactAttribute + { + private string _skip; + + /// + public override string Skip + { + get + { + var isUnix = Environment.OSVersion.Platform == PlatformID.Unix; + return isUnix ? SkipUnixReason ?? "Skipped under Unix platforms" : _skip; + } + set => _skip = value; + } + + public string SkipUnixReason { get; set; } + } +} \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests/Issue159Spec.cs b/src/Akka.Persistence.Azure.Tests/Issue159Spec.cs index 1fbe926..5d37d5c 100644 --- a/src/Akka.Persistence.Azure.Tests/Issue159Spec.cs +++ b/src/Akka.Persistence.Azure.Tests/Issue159Spec.cs @@ -1,20 +1,18 @@ -using System; -using Akka.Configuration; -using Akka.Persistence.Azure.TestHelpers; +using Akka.Configuration; +using Akka.Persistence.Azure.Tests.Helper; using Akka.Persistence.TCK.Snapshot; using Azure.Storage.Blobs; +using Xunit; using Xunit.Abstractions; +using static Akka.Persistence.Azure.Tests.Helper.AzureStorageConfigHelper; namespace Akka.Persistence.Azure.Tests { + [Collection("AzureSpecs")] public class Issue159Spec: SnapshotStoreSpec { - private static Config Config() + private static Config Config(string connectionString) { - var connectionString = Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR"); - if(string.IsNullOrEmpty(connectionString)) - connectionString = WindowsAzureStorageEmulatorFixture.GenerateConnStr(); - return ConfigurationFactory.ParseString(@" akka { loglevel = DEBUG @@ -47,23 +45,22 @@ private static Config Config() snapshot-store { plugin = ""akka.persistence.snapshot-store.azure-blob-store"" + azure-blob-store { + class = ""Akka.Persistence.Azure.Snapshot.AzureBlobSnapshotStore, Akka.Persistence.Azure"" + connection-string = """ + connectionString + @""" + container-name = ""default"" + connect-timeout = 3s + request-timeout = 3s + verbose-logging = on + plugin-dispatcher = ""akka.actor.default-dispatcher"" + } } } -} - -akka.persistence.snapshot-store.azure-blob-store { - class = ""Akka.Persistence.Azure.Snapshot.AzureBlobSnapshotStore, Akka.Persistence.Azure"" - connection-string = """ + connectionString + @""" - container-name = ""default"" - connect-timeout = 3s - request-timeout = 3s - verbose-logging = on - plugin-dispatcher = ""akka.actor.default-dispatcher"" }"); } public Issue159Spec(ITestOutputHelper output) - : base(Config(), nameof(Issue159Spec), output) + : base(AzureConfig(Config), nameof(Issue159Spec), output) { var extension = AzurePersistence.Get(Sys); diff --git a/src/Akka.Persistence.Azure.Tests/Query/AzureTableCurrentEventsByPersistenceIdSpec.cs b/src/Akka.Persistence.Azure.Tests/Query/AzureTableCurrentEventsByPersistenceIdSpec.cs index 6597b5c..0a9857d 100644 --- a/src/Akka.Persistence.Azure.Tests/Query/AzureTableCurrentEventsByPersistenceIdSpec.cs +++ b/src/Akka.Persistence.Azure.Tests/Query/AzureTableCurrentEventsByPersistenceIdSpec.cs @@ -1,43 +1,44 @@ using Akka.Configuration; using Akka.Persistence.Azure.Query; -using Akka.Persistence.Azure.TestHelpers; using Akka.Persistence.Azure.Tests.Helper; using Akka.Persistence.Query; using Akka.Persistence.TCK.Query; using System; using Xunit; using Xunit.Abstractions; +using static Akka.Persistence.Azure.Tests.Helper.AzureStorageConfigHelper; namespace Akka.Persistence.Azure.Tests.Query { - [Collection("AzureQuery")] - public sealed class AzureTableCurrentEventsByPersistenceIdSpec - : CurrentEventsByPersistenceIdSpec + [Collection("AzureSpecs")] + public sealed class AzureTableCurrentEventsByPersistenceIdSpec : CurrentEventsByPersistenceIdSpec { public AzureTableCurrentEventsByPersistenceIdSpec(ITestOutputHelper output) - : base(Config(), nameof(AzureTablePersistenceIdsSpec), output) + : base(AzureConfig(), nameof(AzureTableCurrentEventsByPersistenceIdSpec), output) { AzurePersistence.Get(Sys); ReadJournal = Sys.ReadJournalFor( AzureTableStorageReadJournal.Identifier); - - output.WriteLine("Current table: {0}", TableName); } - public static string TableName { get; private set; } - - public static Config Config() + [WindowsFact(SkipUnixReason = "Batch delete is not supported by Azurite in Linux")] + public override void ReadJournal_CurrentEventsByPersistenceId_should_return_empty_stream_for_cleaned_journal_from_0_to_MaxLong() { - var azureConfig = - !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) - ? AzureStorageConfigHelper.AzureConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) - : AzureStorageConfigHelper.AzureConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); + base.ReadJournal_CurrentEventsByPersistenceId_should_return_empty_stream_for_cleaned_journal_from_0_to_MaxLong(); + } - TableName = azureConfig.GetString("akka.persistence.journal.azure-table.table-name"); + [WindowsFact(SkipUnixReason = "Batch delete is not supported by Azurite in Linux")] + public override void ReadJournal_CurrentEventsByPersistenceId_should_return_remaining_values_after_partial_journal_cleanup() + { + base.ReadJournal_CurrentEventsByPersistenceId_should_return_remaining_values_after_partial_journal_cleanup(); + } - return azureConfig; + [WindowsFact(SkipUnixReason = "Batch delete is not supported by Azurite in Linux")] + public override void ReadJournal_CurrentEventsByPersistenceId_should_return_empty_stream_for_cleaned_journal_from_0_to_0() + { + base.ReadJournal_CurrentEventsByPersistenceId_should_return_empty_stream_for_cleaned_journal_from_0_to_0(); } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests/Query/AzureTableCurrentEventsByTagSpec.cs b/src/Akka.Persistence.Azure.Tests/Query/AzureTableCurrentEventsByTagSpec.cs index 9cf18cf..4bab9dc 100644 --- a/src/Akka.Persistence.Azure.Tests/Query/AzureTableCurrentEventsByTagSpec.cs +++ b/src/Akka.Persistence.Azure.Tests/Query/AzureTableCurrentEventsByTagSpec.cs @@ -2,21 +2,20 @@ using Akka.Actor; using Akka.Configuration; using Akka.Persistence.Azure.Query; -using Akka.Persistence.Azure.TestHelpers; using Akka.Persistence.Azure.Tests.Helper; using Akka.Persistence.Query; using Akka.Persistence.TCK.Query; using Xunit; using Xunit.Abstractions; +using static Akka.Persistence.Azure.Tests.Helper.AzureStorageConfigHelper; namespace Akka.Persistence.Azure.Tests.Query { - [Collection("AzureQuery")] - public sealed class AzureTableCurrentEventsByTagSpec - : CurrentEventsByTagSpec + [Collection("AzureSpecs")] + public sealed class AzureTableCurrentEventsByTagSpec : CurrentEventsByTagSpec { public AzureTableCurrentEventsByTagSpec(ITestOutputHelper output) - : base(Config(), nameof(AzureTablePersistenceIdsSpec), output) + : base(AzureConfig(), nameof(AzureTableCurrentEventsByTagSpec), output) { AzurePersistence.Get(Sys); @@ -33,17 +32,5 @@ public AzureTableCurrentEventsByTagSpec(ITestOutputHelper output) } public static string TableName { get; private set; } - - public static Config Config() - { - var azureConfig = - !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) - ? AzureStorageConfigHelper.AzureConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) - : AzureStorageConfigHelper.AzureConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); - - TableName = azureConfig.GetString("akka.persistence.journal.azure-table.table-name"); - - return azureConfig; - } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests/Query/AzureTableCurrentPersistenceIdsSpec.cs b/src/Akka.Persistence.Azure.Tests/Query/AzureTableCurrentPersistenceIdsSpec.cs index 8cb3eb3..dea844d 100644 --- a/src/Akka.Persistence.Azure.Tests/Query/AzureTableCurrentPersistenceIdsSpec.cs +++ b/src/Akka.Persistence.Azure.Tests/Query/AzureTableCurrentPersistenceIdsSpec.cs @@ -4,7 +4,6 @@ using Akka.Actor; using Akka.Configuration; using Akka.Persistence.Azure.Query; -using Akka.Persistence.Azure.TestHelpers; using Akka.Persistence.Azure.Tests.Helper; using Akka.Persistence.Query; using Akka.Persistence.TCK.Query; @@ -12,17 +11,17 @@ using Akka.Util.Internal; using Xunit; using Xunit.Abstractions; +using static Akka.Persistence.Azure.Tests.Helper.AzureStorageConfigHelper; namespace Akka.Persistence.Azure.Tests.Query { - [Collection("AzureQuery")] - public sealed class AzureTableCurrentPersistenceIdsSpec - : CurrentPersistenceIdsSpec + [Collection("AzureSpecs")] + public sealed class AzureTableCurrentPersistenceIdsSpec : CurrentPersistenceIdsSpec { private readonly ITestOutputHelper _output; public AzureTableCurrentPersistenceIdsSpec(ITestOutputHelper output) - : base(Config(), nameof(AzureTablePersistenceIdsSpec), output) + : base(AzureConfig(), nameof(AzureTableCurrentPersistenceIdsSpec), output) { _output = output; AzurePersistence.Get(Sys); @@ -30,22 +29,6 @@ public AzureTableCurrentPersistenceIdsSpec(ITestOutputHelper output) ReadJournal = Sys.ReadJournalFor( AzureTableStorageReadJournal.Identifier); - - output.WriteLine("Current table: {0}", TableName); - } - - public static string TableName { get; private set; } - - public static Config Config() - { - var azureConfig = - !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) - ? AzureStorageConfigHelper.AzureConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) - : AzureStorageConfigHelper.AzureConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); - - TableName = azureConfig.GetString("akka.persistence.journal.azure-table.table-name"); - - return azureConfig; } public override void ReadJournal_query_CurrentPersistenceIds_should_not_see_new_events_after_complete() diff --git a/src/Akka.Persistence.Azure.Tests/Query/AzureTableEventsByPersistenceIdSpec.cs b/src/Akka.Persistence.Azure.Tests/Query/AzureTableEventsByPersistenceIdSpec.cs index f860a9a..3e26f2e 100644 --- a/src/Akka.Persistence.Azure.Tests/Query/AzureTableEventsByPersistenceIdSpec.cs +++ b/src/Akka.Persistence.Azure.Tests/Query/AzureTableEventsByPersistenceIdSpec.cs @@ -1,43 +1,26 @@ using System; using Akka.Configuration; using Akka.Persistence.Azure.Query; -using Akka.Persistence.Azure.TestHelpers; using Akka.Persistence.Azure.Tests.Helper; using Akka.Persistence.Query; using Akka.Persistence.TCK.Query; using Xunit; using Xunit.Abstractions; +using static Akka.Persistence.Azure.Tests.Helper.AzureStorageConfigHelper; namespace Akka.Persistence.Azure.Tests.Query { - [Collection("AzureQuery")] - public sealed class AzureTableEventsByPersistenceIdSpec - : EventsByPersistenceIdSpec + [Collection("AzureSpecs")] + public sealed class AzureTableEventsByPersistenceIdSpec : EventsByPersistenceIdSpec { public AzureTableEventsByPersistenceIdSpec(ITestOutputHelper output) - : base(Config(), nameof(AzureTablePersistenceIdsSpec), output) + : base(AzureConfig(), nameof(AzureTableEventsByPersistenceIdSpec), output) { AzurePersistence.Get(Sys); ReadJournal = Sys.ReadJournalFor( AzureTableStorageReadJournal.Identifier); - - output.WriteLine("Current table: {0}", TableName); - } - - public static string TableName { get; private set; } - - public static Config Config() - { - var azureConfig = - !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) - ? AzureStorageConfigHelper.AzureConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) - : AzureStorageConfigHelper.AzureConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); - - TableName = azureConfig.GetString("akka.persistence.journal.azure-table.table-name"); - - return azureConfig; } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests/Query/AzureTableEventsByTagSpec.cs b/src/Akka.Persistence.Azure.Tests/Query/AzureTableEventsByTagSpec.cs index b0cfb65..5959d16 100644 --- a/src/Akka.Persistence.Azure.Tests/Query/AzureTableEventsByTagSpec.cs +++ b/src/Akka.Persistence.Azure.Tests/Query/AzureTableEventsByTagSpec.cs @@ -3,7 +3,6 @@ using Akka.Actor; using Akka.Configuration; using Akka.Persistence.Azure.Query; -using Akka.Persistence.Azure.TestHelpers; using Akka.Persistence.Azure.Tests.Helper; using Akka.Persistence.Journal; using Akka.Persistence.Query; @@ -11,15 +10,15 @@ using Akka.Streams.TestKit; using Xunit; using Xunit.Abstractions; +using static Akka.Persistence.Azure.Tests.Helper.AzureStorageConfigHelper; namespace Akka.Persistence.Azure.Tests.Query { - [Collection("AzureQuery")] - public sealed class AzureTableEventsByTagSpec - : EventsByTagSpec + [Collection("AzureSpecs")] + public sealed class AzureTableEventsByTagSpec : EventsByTagSpec { public AzureTableEventsByTagSpec(ITestOutputHelper output) - : base(Config(), nameof(AzureTableEventsByTagSpec), output) + : base(AzureConfig(), nameof(AzureTableEventsByTagSpec), output) { AzurePersistence.Get(Sys); @@ -32,20 +31,6 @@ public AzureTableEventsByTagSpec(ITestOutputHelper output) ExpectMsg("warm-up-done", TimeSpan.FromSeconds(60)); } - public static string TableName { get; private set; } - - public static Config Config() - { - var azureConfig = - !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) - ? AzureStorageConfigHelper.AzureConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) - : AzureStorageConfigHelper.AzureConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); - - TableName = azureConfig.GetString("akka.persistence.journal.azure-table.table-name"); - - return azureConfig; - } - [Fact] public void ReadJournal_should_delete_EventTags_index_items() { diff --git a/src/Akka.Persistence.Azure.Tests/Query/AzureTablePersistenceIdsSpec.cs b/src/Akka.Persistence.Azure.Tests/Query/AzureTablePersistenceIdsSpec.cs index 62323dd..10ddd3f 100644 --- a/src/Akka.Persistence.Azure.Tests/Query/AzureTablePersistenceIdsSpec.cs +++ b/src/Akka.Persistence.Azure.Tests/Query/AzureTablePersistenceIdsSpec.cs @@ -1,21 +1,20 @@ using Akka.Configuration; using Akka.Persistence.Azure.Query; -using Akka.Persistence.Azure.TestHelpers; using Akka.Persistence.Query; using Akka.Persistence.TCK.Query; using System; +using Akka.Persistence.Azure.Tests.Helper; using Xunit; using Xunit.Abstractions; using static Akka.Persistence.Azure.Tests.Helper.AzureStorageConfigHelper; namespace Akka.Persistence.Azure.Tests.Query { - [Collection("AzureQuery")] - public sealed class AzureTablePersistenceIdsSpec - : PersistenceIdsSpec + [Collection("AzureSpecs")] + public sealed class AzureTablePersistenceIdsSpec : PersistenceIdsSpec { public AzureTablePersistenceIdsSpec(ITestOutputHelper output) - : base(Config(), nameof(AzureTablePersistenceIdsSpec), output) + : base(AzureConfig(), nameof(AzureTablePersistenceIdsSpec), output) { AzurePersistence.Get(Sys); @@ -24,21 +23,6 @@ public AzureTablePersistenceIdsSpec(ITestOutputHelper output) AzureTableStorageReadJournal.Identifier); } - public static string TableName { get; private set; } - protected override bool AllocatesAllPersistenceIDsPublisher => false; - - - public static Config Config() - { - var azureConfig = - !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) - ? AzureConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) - : AzureConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); - - TableName = azureConfig.GetString("akka.persistence.journal.azure-table.table-name"); - - return azureConfig; - } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Azure.Tests/Query/AzureTableQueryEdgeCaseSpecs.cs b/src/Akka.Persistence.Azure.Tests/Query/AzureTableQueryEdgeCaseSpecs.cs index 4d788f4..779a97c 100644 --- a/src/Akka.Persistence.Azure.Tests/Query/AzureTableQueryEdgeCaseSpecs.cs +++ b/src/Akka.Persistence.Azure.Tests/Query/AzureTableQueryEdgeCaseSpecs.cs @@ -7,7 +7,6 @@ using Akka.Actor; using Akka.Configuration; using Akka.Persistence.Azure.Query; -using Akka.Persistence.Azure.TestHelpers; using Akka.Persistence.Azure.Tests.Helper; using Akka.Persistence.Journal; using Akka.Persistence.Query; @@ -16,10 +15,11 @@ using FluentAssertions; using Xunit; using Xunit.Abstractions; +using static Akka.Persistence.Azure.Tests.Helper.AzureStorageConfigHelper; namespace Akka.Persistence.Azure.Tests.Query { - [Collection("AzureQuery")] + [Collection("AzureSpecs")] public class AzureTableQueryEdgeCaseSpecs : Akka.TestKit.Xunit2.TestKit { public static readonly AtomicCounter Counter = new AtomicCounter(0); @@ -41,7 +41,7 @@ public RealMsg(string msg) public const int MessageCount = 20; public AzureTableQueryEdgeCaseSpecs(ITestOutputHelper output) - : base(Config(), nameof(AzureTableQueryEdgeCaseSpecs), output) + : base(AzureConfig(), nameof(AzureTableQueryEdgeCaseSpecs), output) { _output = output; Materializer = Sys.Materializer(); @@ -172,19 +172,5 @@ public object ToJournal(object evt) return new Tagged(evt, ImmutableHashSet.Empty.Add(DefaultTag).Add(evt.GetType().Name)); } } - - public static string TableName { get; private set; } - - public static Config Config() - { - var azureConfig = - !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) - ? AzureStorageConfigHelper.AzureConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) - : AzureStorageConfigHelper.AzureConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); - - TableName = azureConfig.GetString("akka.persistence.journal.azure-table.table-name"); - - return azureConfig; - } } } diff --git a/src/Akka.Persistence.Azure.Tests/SerializerHelperSpecs.cs b/src/Akka.Persistence.Azure.Tests/SerializerHelperSpecs.cs index c202886..1159276 100644 --- a/src/Akka.Persistence.Azure.Tests/SerializerHelperSpecs.cs +++ b/src/Akka.Persistence.Azure.Tests/SerializerHelperSpecs.cs @@ -3,37 +3,27 @@ using System.Text; using Akka.Actor; using Akka.Configuration; -using Akka.Persistence.Azure.TestHelpers; using Akka.Persistence.Azure.Tests.Helper; using FluentAssertions; using Xunit; using Xunit.Abstractions; +using static Akka.Persistence.Azure.Tests.Helper.AzureStorageConfigHelper; namespace Akka.Persistence.Azure.Tests { + [Collection("AzureSpecs")] public class SerializerHelperSpecs : Akka.TestKit.Xunit2.TestKit { private readonly SerializationHelper _helper; public SerializerHelperSpecs(ITestOutputHelper helper) - : base(Config(), nameof(SerializerHelperSpecs), output: helper) + : base(AzureConfig(), nameof(SerializerHelperSpecs), output: helper) { // force Akka.Persistence serializers to be loaded AzurePersistence.Get(Sys); _helper = new SerializationHelper(Sys); } - public static Config Config() - { - var azureConfig = - !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) - ? AzureStorageConfigHelper.AzureConfig(Environment.GetEnvironmentVariable("AZURE_CONNECTION_STR")) - : AzureStorageConfigHelper.AzureConfig(WindowsAzureStorageEmulatorFixture.GenerateConnStr()); - - return azureConfig; - } - - [Fact] public void ShouldSerializeAndDeserializePersistentRepresentation() { diff --git a/src/Akka.Persistence.Azure/Akka.Persistence.Azure.csproj b/src/Akka.Persistence.Azure/Akka.Persistence.Azure.csproj index 42bf27b..8a21670 100644 --- a/src/Akka.Persistence.Azure/Akka.Persistence.Azure.csproj +++ b/src/Akka.Persistence.Azure/Akka.Persistence.Azure.csproj @@ -15,10 +15,10 @@ - - - - + + + + \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/CloudTableExtensions.cs b/src/Akka.Persistence.Azure/CloudTableExtensions.cs index deae510..b4e83bb 100644 --- a/src/Akka.Persistence.Azure/CloudTableExtensions.cs +++ b/src/Akka.Persistence.Azure/CloudTableExtensions.cs @@ -1,6 +1,8 @@ using System.Collections.Generic; +using System.Collections.Immutable; using System.Threading.Tasks; -using Microsoft.Azure.Cosmos.Table; +using Azure; +using Azure.Data.Tables; namespace Akka.Persistence.Azure { @@ -8,39 +10,26 @@ public static class CloudTableExtensions { private const int MaxBatchSize = 100; - public static async Task> ExecuteBatchAsLimitedBatches( - this CloudTable table, - TableBatchOperation batch) + public static async Task> ExecuteBatchAsLimitedBatches( + this TableClient table, + List batch) { if (batch.Count < 1) - return new List(); + return ImmutableList.Empty; if (batch.Count <= MaxBatchSize) - return await table.ExecuteBatchAsync(batch); + return (await table.SubmitTransactionAsync(batch)).Value; - var result = new List(); + var result = new List(); var limitedBatchOperationLists = batch.ChunkBy(MaxBatchSize); foreach (var limitedBatchOperationList in limitedBatchOperationLists) { - var limitedBatch = CreateLimitedTableBatchOperation(limitedBatchOperationList); - var limitedBatchResult = await table.ExecuteBatchAsync(limitedBatch); - result.AddRange(limitedBatchResult); + var limitedBatchResponse = await table.SubmitTransactionAsync(limitedBatchOperationList); + result.AddRange(limitedBatchResponse.Value); } return result; } - - private static TableBatchOperation CreateLimitedTableBatchOperation( - IEnumerable limitedBatchOperationList) - { - var limitedBatch = new TableBatchOperation(); - foreach (var limitedBatchOperation in limitedBatchOperationList) - { - limitedBatch.Add(limitedBatchOperation); - } - - return limitedBatch; - } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs index 4e1cbbf..bb69159 100644 --- a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs +++ b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournal.cs @@ -18,7 +18,8 @@ using System.Threading; using System.Threading.Tasks; using Akka.Configuration; -using Microsoft.Azure.Cosmos.Table; +using Azure; +using Azure.Data.Tables; using Debug = System.Diagnostics.Debug; namespace Akka.Persistence.Azure.Journal @@ -50,8 +51,8 @@ public class AzureTableStorageJournal : AsyncWriteJournal private readonly Dictionary> _persistenceIdSubscribers = new Dictionary>(); private readonly SerializationHelper _serialization; private readonly AzureTableStorageJournalSettings _settings; - private readonly CloudStorageAccount _storageAccount; - private readonly Lazy _tableStorage; + private readonly TableServiceClient _tableServiceClient; + private TableClient _tableStorage_DoNotUseDirectly; private readonly Dictionary> _tagSubscribers = new Dictionary>(); public AzureTableStorageJournal(Config config = null) @@ -61,23 +62,24 @@ public AzureTableStorageJournal(Config config = null) AzureTableStorageJournalSettings.Create(config); _serialization = new SerializationHelper(Context.System); - _storageAccount = _settings.Development ? - CloudStorageAccount.DevelopmentStorageAccount : - CloudStorageAccount.Parse(_settings.ConnectionString); - - _tableStorage = new Lazy(() => InitCloudStorage(5).Result); + _tableServiceClient = new TableServiceClient(_settings.ConnectionString); } - public CloudTable Table => _tableStorage.Value; + public TableClient Table + { + get + { + if (_tableStorage_DoNotUseDirectly == null) + throw new Exception("Table storage has not been initialized yet. PreStart() has not been invoked"); + return _tableStorage_DoNotUseDirectly; + } + } - protected bool HasAllPersistenceIdSubscribers => - _allPersistenceIdSubscribers.Count != 0; + protected bool HasAllPersistenceIdSubscribers => _allPersistenceIdSubscribers.Count != 0; - protected bool HasPersistenceIdSubscribers => - _persistenceIdSubscribers.Count != 0; + protected bool HasPersistenceIdSubscribers => _persistenceIdSubscribers.Count != 0; - protected bool HasTagSubscribers => - _tagSubscribers.Count != 0; + protected bool HasTagSubscribers => _tagSubscribers.Count != 0; public override async Task ReadHighestSequenceNrAsync( string persistenceId, @@ -87,20 +89,10 @@ public override async Task ReadHighestSequenceNrAsync( _log.Debug("Entering method ReadHighestSequenceNrAsync"); - var sequenceNumberQuery = GenerateHighestSequenceNumberQuery(persistenceId); - TableQuerySegment result = null; - long seqNo = 0L; - - do - { - result = await Table.ExecuteQuerySegmentedAsync(sequenceNumberQuery, result?.ContinuationToken); - - if (result.Results.Count > 0) - { - seqNo = Math.Max(seqNo, result.Results.Max(x => x.HighestSequenceNr)); - } - } while (result.ContinuationToken != null); - + var seqNo = await HighestSequenceNumberQuery(persistenceId) + .Select(entity => entity.GetInt64(HighestSequenceNrEntry.HighestSequenceNrKey).Value) + .AggregateAsync(0L, Math.Max); + _log.Debug("Leaving method ReadHighestSequenceNrAsync with SeqNo [{0}] for PersistentId [{1}]", seqNo, persistenceId); return seqNo; @@ -116,32 +108,34 @@ public override async Task ReplayMessagesAsync( { NotifyNewPersistenceIdAdded(persistenceId); - _log.Debug("Entering method ReplayMessagesAsync for persistentId [{0}] from seqNo range [{1}, {2}] and taking up to max [{3}]", persistenceId, fromSequenceNr, toSequenceNr, max); + _log.Debug("Entering method ReplayMessagesAsync for persistentId [{0}] from seqNo range [{1}, {2}] and taking up to max [{3}]", + persistenceId, fromSequenceNr, toSequenceNr, max); if (max == 0) return; - var replayQuery = GeneratePersistentJournalEntryReplayQuery(persistenceId, fromSequenceNr, toSequenceNr); + var pages = PersistentJournalEntryReplayQuery(persistenceId, fromSequenceNr, toSequenceNr).AsPages().GetAsyncEnumerator(); - var nextTask = Table.ExecuteQuerySegmentedAsync(replayQuery, null); + ValueTask? nextTask = pages.MoveNextAsync(); var count = 0L; - while (nextTask != null) + while(nextTask.HasValue) { - var tableQueryResult = await nextTask; - + await nextTask.Value; + var currentPage = pages.Current; + if (_log.IsDebugEnabled && _settings.VerboseLogging) { - _log.Debug("Recovered [{0}] messages for entity [{1}]", tableQueryResult.Results.Count, persistenceId); + _log.Debug("Recovered [{0}] messages for entity [{1}]", currentPage.Values.Count, persistenceId); } - if (tableQueryResult.ContinuationToken != null) + if (currentPage.ContinuationToken != null) { if (_log.IsDebugEnabled && _settings.VerboseLogging) { _log.Debug("Have additional messages to download for entity [{0}]", persistenceId); } // start the next query while we process the results of this one - nextTask = Table.ExecuteQuerySegmentedAsync(replayQuery, tableQueryResult.ContinuationToken); + nextTask = pages.MoveNextAsync(); } else { @@ -154,13 +148,14 @@ public override async Task ReplayMessagesAsync( nextTask = null; } - foreach (var savedEvent in tableQueryResult.Results) + foreach (var entity in currentPage.Values) { // check if we've hit max recovery if (count >= max) return; ++count; + var savedEvent = new PersistentJournalEntry(entity); var deserialized = _serialization.PersistentFromBytes(savedEvent.Payload); // Write the new persistent because it sets the sender as deadLetters which is not correct @@ -187,44 +182,34 @@ public override async Task ReplayMessagesAsync( _log.Debug("Leaving method ReplayMessagesAsync"); } - protected override async Task DeleteMessagesToAsync( - string persistenceId, - long toSequenceNr) + protected override async Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr) { NotifyNewPersistenceIdAdded(persistenceId); _log.Debug("Entering method DeleteMessagesToAsync for persistentId [{0}] and up to seqNo [{1}]", persistenceId, toSequenceNr); - var query = GeneratePersistentJournalEntryDeleteQuery(persistenceId, toSequenceNr); + var pages = PersistentJournalEntryDeleteQuery(persistenceId, toSequenceNr).AsPages().GetAsyncEnumerator(); - var nextSegment = Table.ExecuteQuerySegmentedAsync(query, null); - - while (nextSegment != null) + ValueTask? nextTask = pages.MoveNextAsync(); + while (nextTask.HasValue) { - var queryResults = await nextSegment; + await nextTask.Value; + var currentPage = pages.Current; if (_log.IsDebugEnabled && _settings.VerboseLogging) { - _log.Debug("Have [{0}] messages to delete for entity [{1}]", queryResults.Results.Count, persistenceId); + _log.Debug("Have [{0}] messages to delete for entity [{1}]", currentPage.Values.Count, persistenceId); } - nextSegment = - queryResults.ContinuationToken != null - ? Table.ExecuteQuerySegmentedAsync(query, queryResults.ContinuationToken) - : null; + if (currentPage.ContinuationToken != null) + nextTask = pages.MoveNextAsync(); + else + nextTask = null; - if (queryResults.Results.Count > 0) + if (currentPage.Values.Count > 0) { - var tableBatchOperation = new TableBatchOperation(); - - foreach (var toBeDeleted in queryResults.Results) - { - tableBatchOperation.Delete(toBeDeleted); - } - - var deleteTask = Table.ExecuteBatchAsLimitedBatches(tableBatchOperation); - - await deleteTask; + await Table.SubmitTransactionAsync(currentPage.Values + .Select(entity => new TableTransactionAction(TableTransactionActionType.Delete, entity))); } } @@ -235,8 +220,8 @@ protected override void PreStart() { _log.Debug("Initializing Azure Table Storage..."); - // forces loading of the value - var name = Table.Name; + InitCloudStorage(5) + .ConfigureAwait(false).GetAwaiter().GetResult(); _log.Debug("Successfully started Azure Table Storage!"); @@ -244,8 +229,7 @@ protected override void PreStart() base.PreStart(); } - protected override bool ReceivePluginInternal( - object message) + protected override bool ReceivePluginInternal(object message) { switch (message) { @@ -275,16 +259,13 @@ protected override bool ReceivePluginInternal( return true; } - protected override async Task> WriteMessagesAsync( - IEnumerable atomicWrites) + protected override async Task> WriteMessagesAsync(IEnumerable atomicWrites) { try { - var taggedEntries = ImmutableDictionary>.Empty; - - var exceptions = ImmutableList.Empty; - - var highSequenceNumbers = ImmutableDictionary.Empty; + var taggedEntries = new Dictionary>(); + var exceptions = new List(); + var highSequenceNumbers = new Dictionary(); using (var currentWrites = atomicWrites.GetEnumerator()) { @@ -292,123 +273,103 @@ protected override async Task> WriteMessagesAsync( { Debug.Assert(currentWrites.Current != null, "atomicWrites.Current != null"); - var list = currentWrites.Current.Payload.AsInstanceOf>(); - - var batchItems = ImmutableList.Empty; - + var list = (IImmutableList) currentWrites.Current.Payload; + var batchItems = new List(); foreach (var t in list) { var item = t; - Debug.Assert(item != null, nameof(item) + " != null"); - byte[] payloadBytes = null; - string[] tags = {}; - // If the payload is a tagged payload, reset to a non-tagged payload if (item.Payload is Tagged tagged) { item = item.WithPayload(tagged.Payload); - - payloadBytes = _serialization.PersistentToBytes(item); - if (tagged.Tags.Count > 0) - { tags = tagged.Tags.ToArray(); - - } } - if (payloadBytes == null) - { - payloadBytes = _serialization.PersistentToBytes(item); - } - - var newItem = - new PersistentJournalEntry( - item.PersistenceId, - item.SequenceNr, - payloadBytes, - item.Manifest, - tags); + var payloadBytes = _serialization.PersistentToBytes(item); + + var newItem = new PersistentJournalEntry( + persistentId: item.PersistenceId, + seqNo: item.SequenceNr, + payload: payloadBytes, + manifest: item.Manifest, + tags: tags); - batchItems = batchItems.Add(newItem); + batchItems.Add(new TableTransactionAction(TableTransactionActionType.Add, newItem.WriteEntity())); foreach (var tag in tags) { if (!taggedEntries.ContainsKey(tag)) - { - taggedEntries = taggedEntries.SetItem(tag, new List()); - } - - taggedEntries[tag].Add( - new EventTagEntry( - newItem.PartitionKey, - tag, - newItem.SeqNo, - newItem.Payload, - newItem.Manifest, - newItem.UtcTicks)); + taggedEntries[tag] = new List(); + + taggedEntries[tag].Add(new EventTagEntry( + persistenceId: newItem.PartitionKey, + tag: tag, + seqNo: newItem.SeqNo, + payload: newItem.Payload, + manifest: newItem.Manifest, + utcTicks: newItem.UtcTicks)); } - highSequenceNumbers = - highSequenceNumbers.SetItem( - item.PersistenceId, - item.SequenceNr); + highSequenceNumbers[item.PersistenceId] = item.SequenceNr; } try { - var persistenceBatch = new TableBatchOperation(); - - highSequenceNumbers.ForEach( - x => batchItems = batchItems.Add( - new HighestSequenceNrEntry(x.Key, x.Value))); + foreach (var entry in highSequenceNumbers) + { + batchItems.Add(new TableTransactionAction( + TableTransactionActionType.UpsertReplace, + new HighestSequenceNrEntry(entry.Key, entry.Value).WriteEntity())); + } // Encode partition keys for writing - foreach (var tableEntity in batchItems) + foreach (var action in batchItems) { - tableEntity.PartitionKey = PartitionKeyEscapeHelper.Escape(tableEntity.PartitionKey); + action.Entity.PartitionKey = PartitionKeyEscapeHelper.Escape(action.Entity.PartitionKey); } - - batchItems.ForEach(x => persistenceBatch.InsertOrReplace(x)); if (_log.IsDebugEnabled && _settings.VerboseLogging) - _log.Debug("Attempting to write batch of {0} messages to Azure storage", persistenceBatch.Count); - - var persistenceResults = await Table.ExecuteBatchAsLimitedBatches(persistenceBatch); + _log.Debug("Attempting to write batch of {0} messages to Azure storage", batchItems.Count); + var response = await Table.SubmitTransactionAsync(batchItems); if (_log.IsDebugEnabled && _settings.VerboseLogging) - foreach (var r in persistenceResults) - _log.Debug("Azure table storage wrote entity [{0}] with status code [{1}]", r.Etag, r.HttpStatusCode); + { + foreach (var r in response.Value) + { + _log.Debug("Azure table storage wrote entities with status code [{0}]", r.Status); + } + } - exceptions = exceptions.Add(null); + exceptions.Add(null); } catch (Exception ex) { _log.Warning(ex, "Failure while writing messages to Azure table storage"); - - exceptions = exceptions.Add(ex); + exceptions.Add(ex); } } } if (exceptions.All(ex => ex == null)) { - var allPersistenceIdsBatch = new TableBatchOperation(); + var allPersistenceIdsBatch = new List(); - highSequenceNumbers.ForEach(x => + foreach (var item in highSequenceNumbers) { - var encodedKey = PartitionKeyEscapeHelper.Escape(x.Key); - allPersistenceIdsBatch.InsertOrReplace(new AllPersistenceIdsEntry(encodedKey)); - }); + allPersistenceIdsBatch.Add(new TableTransactionAction( + TableTransactionActionType.UpdateReplace, + new AllPersistenceIdsEntry(PartitionKeyEscapeHelper.Escape(item.Key)).WriteEntity())); + } - var allPersistenceResults = await Table.ExecuteBatchAsLimitedBatches(allPersistenceIdsBatch); + var allPersistenceResponse = await Table.SubmitTransactionAsync(allPersistenceIdsBatch); if (_log.IsDebugEnabled && _settings.VerboseLogging) - foreach (var r in allPersistenceResults) - _log.Debug("Azure table storage wrote entity [{0}] with status code [{1}]", r.Etag, r.HttpStatusCode); + foreach (var r in allPersistenceResponse.Value) + _log.Debug("Azure table storage wrote entity with status code [{1}]", r.Status); if (HasPersistenceIdSubscribers || HasAllPersistenceIdSubscribers) { @@ -417,23 +378,20 @@ protected override async Task> WriteMessagesAsync( if (taggedEntries.Count > 0) { - var eventTagsBatch = new TableBatchOperation(); - foreach (var kvp in taggedEntries) { - eventTagsBatch.Clear(); + var eventTagsBatch = new List(); foreach (var item in kvp.Value) { - item.PartitionKey = PartitionKeyEscapeHelper.Escape(item.PartitionKey); - eventTagsBatch.InsertOrReplace(item); + eventTagsBatch.Add(new TableTransactionAction(TableTransactionActionType.UpsertReplace, item.WriteEntity())); } - var eventTagsResults = await Table.ExecuteBatchAsLimitedBatches(eventTagsBatch); + var eventTagsResponse = await Table.SubmitTransactionAsync(eventTagsBatch); if (_log.IsDebugEnabled && _settings.VerboseLogging) - foreach (var r in eventTagsResults) - _log.Debug("Azure table storage wrote entity [{0}] with status code [{1}]", r.Etag, r.HttpStatusCode); + foreach (var r in eventTagsResponse.Value) + _log.Debug("Azure table storage wrote entity with status code [{1}]", r.Status); if (HasTagSubscribers && taggedEntries.Count != 0) { @@ -453,7 +411,7 @@ protected override async Task> WriteMessagesAsync( * * Either everything fails or everything succeeds is the idea I guess. */ - return exceptions.Any(ex => ex != null) ? exceptions : null; + return exceptions.Any(ex => ex != null) ? exceptions.ToImmutableList() : null; } catch (Exception ex) { @@ -462,217 +420,98 @@ protected override async Task> WriteMessagesAsync( } } - private static TableQuery GenerateAllPersistenceIdsQuery() + private AsyncPageable GenerateAllPersistenceIdsQuery( + int? maxPerPage = null, + CancellationToken cancellationToken = default) { - var filter = - TableQuery.GenerateFilterCondition( - "PartitionKey", - QueryComparisons.Equal, - AllPersistenceIdsEntry.PartitionKeyValue); - - var returnValue = new TableQuery().Where(filter).Select(new[] { "RowKey" }); - - return returnValue; + return Table.QueryAsync( + filter: $"PartitionKey eq '{AllPersistenceIdsEntry.PartitionKeyValue}'", + maxPerPage: maxPerPage, + @select: new[] { "RowKey" }, + cancellationToken: cancellationToken + ); } - private static TableQuery GenerateHighestSequenceNumberQuery( - string persistenceId) + private AsyncPageable HighestSequenceNumberQuery( + string persistenceId, + int? maxPerPage = null, + CancellationToken cancellationToken = default) { - var filter = - TableQuery.CombineFilters( - TableQuery.GenerateFilterCondition( - "PartitionKey", - QueryComparisons.Equal, - PartitionKeyEscapeHelper.Escape(persistenceId)), - TableOperators.And, - TableQuery.GenerateFilterCondition( - "RowKey", - QueryComparisons.Equal, - HighestSequenceNrEntry.RowKeyValue)); - - var returnValue = new TableQuery().Where(filter); - - return returnValue; + return Table.QueryAsync( + filter: $"PartitionKey eq '{PartitionKeyEscapeHelper.Escape(persistenceId)}' and " + + $"RowKey eq '{HighestSequenceNrEntry.RowKeyValue}'", + maxPerPage: maxPerPage, + @select: null, + cancellationToken: cancellationToken + ); } - //private static TableQuery GeneratePersistentJournalEntryDeleteQuery( - private static TableQuery GeneratePersistentJournalEntryDeleteQuery( + private AsyncPageable PersistentJournalEntryDeleteQuery( string persistenceId, - long toSequenceNr) + long toSequenceNr, + int? maxPerPage = null, + CancellationToken cancellationToken = default) { - var persistenceIdFilter = - TableQuery.GenerateFilterCondition( - "PartitionKey", - QueryComparisons.Equal, - PartitionKeyEscapeHelper.Escape(persistenceId)); - - var highestSequenceNrFilter = - TableQuery.GenerateFilterCondition( - "RowKey", - QueryComparisons.NotEqual, - HighestSequenceNrEntry.RowKeyValue); - - var rowKeyLessThanFilter = - TableQuery.GenerateFilterCondition( - "RowKey", - QueryComparisons.LessThanOrEqual, - toSequenceNr.ToJournalRowKey()); - - var rowKeyFilter = - TableQuery.CombineFilters( - highestSequenceNrFilter, - TableOperators.And, - rowKeyLessThanFilter); - - var filter = - TableQuery.CombineFilters( - persistenceIdFilter, - TableOperators.And, - rowKeyFilter); - - var returnValue = new TableQuery().Where(filter); - - return returnValue; + return Table.QueryAsync( + filter: $"PartitionKey eq '{PartitionKeyEscapeHelper.Escape(persistenceId)}' and " + + $"RowKey ne '{HighestSequenceNrEntry.RowKeyValue}' and " + + $"RowKey le '{toSequenceNr.ToJournalRowKey()}'", + maxPerPage: maxPerPage, + @select: null, + cancellationToken: cancellationToken); } - private static TableQuery GenerateEventTagEntryDeleteQuery( + private AsyncPageable EventTagEntryDeleteQuery( string persistenceId, long fromSequenceNr, - long toSequenceNr) + long toSequenceNr, + int? maxPerPage = null, + CancellationToken cancellationToken = default) { - var persistenceIdFilter = - TableQuery.GenerateFilterCondition( - "PartitionKey", - QueryComparisons.Equal, - EventTagEntry.PartitionKeyValue); - - var idxPartitionKeyFilter = - TableQuery.GenerateFilterCondition( - EventTagEntry.IdxPartitionKeyKeyName, - QueryComparisons.Equal, - persistenceId); - - var idxRowKeyGreaterThanFilter = - TableQuery.GenerateFilterCondition( - EventTagEntry.IdxRowKeyKeyName, - QueryComparisons.GreaterThanOrEqual, - fromSequenceNr.ToJournalRowKey()); - - var idxRowKeyLessThanFilter = - TableQuery.GenerateFilterCondition( - EventTagEntry.IdxRowKeyKeyName, - QueryComparisons.LessThanOrEqual, - toSequenceNr.ToJournalRowKey()); - - var partitionKeyFilter = - TableQuery.CombineFilters( - persistenceIdFilter, - TableOperators.And, - idxPartitionKeyFilter); - - var idxRowKeyFilter = - TableQuery.CombineFilters( - idxRowKeyGreaterThanFilter, - TableOperators.And, - idxRowKeyLessThanFilter); - - var filter = - TableQuery.CombineFilters( - partitionKeyFilter, - TableOperators.And, - idxRowKeyFilter); - - var returnValue = new TableQuery().Where(filter); - - return returnValue; + return Table.QueryAsync( + filter: $"PartitionKey eq '{EventTagEntry.PartitionKeyValue}' and " + + $"{EventTagEntry.IdxPartitionKeyKeyName} eq '{persistenceId}' and " + + $"{EventTagEntry.IdxRowKeyKeyName} ge '{fromSequenceNr.ToJournalRowKey()}' and " + + $"{EventTagEntry.IdxRowKeyKeyName} le '{toSequenceNr.ToJournalRowKey()}'", + maxPerPage: maxPerPage, + @select: null, + cancellationToken: cancellationToken); } - private static TableQuery GeneratePersistentJournalEntryReplayQuery( + private AsyncPageable PersistentJournalEntryReplayQuery( string persistentId, long fromSequenceNumber, - long toSequenceNumber) + long toSequenceNumber, + int? maxPerPage = null, + CancellationToken cancellationToken = default) { - var persistenceIdFilter = - TableQuery.GenerateFilterCondition( - "PartitionKey", - QueryComparisons.Equal, - PartitionKeyEscapeHelper.Escape(persistentId)); - - var highestSequenceNrFilter = - TableQuery.GenerateFilterCondition( - "RowKey", - QueryComparisons.NotEqual, - HighestSequenceNrEntry.RowKeyValue); - - var filter = - TableQuery.CombineFilters( - persistenceIdFilter, - TableOperators.And, - highestSequenceNrFilter); - + var filter = $"PartitionKey eq '{PartitionKeyEscapeHelper.Escape(persistentId)}' and " + + $"RowKey ne '{HighestSequenceNrEntry.RowKeyValue}'"; + if (fromSequenceNumber > 0) - { - filter = - TableQuery.CombineFilters( - filter, - TableOperators.And, - TableQuery.GenerateFilterCondition( - "RowKey", - QueryComparisons.GreaterThanOrEqual, - fromSequenceNumber.ToJournalRowKey())); - } - - if (toSequenceNumber != long.MaxValue) - { - filter = - TableQuery.CombineFilters( - filter, - TableOperators.And, - TableQuery.GenerateFilterCondition( - "RowKey", - QueryComparisons.LessThanOrEqual, - toSequenceNumber.ToJournalRowKey())); - } + filter += $" and RowKey ge '{fromSequenceNumber.ToJournalRowKey()}'"; - var returnValue = new TableQuery().Where(filter); + if (toSequenceNumber < long.MaxValue) + filter += $" and RowKey le '{toSequenceNumber.ToJournalRowKey()}'"; - return returnValue; + return Table.QueryAsync(filter, maxPerPage, null, cancellationToken); } - private static TableQuery GenerateTaggedMessageQuery( - ReplayTaggedMessages replay) + private AsyncPageable TaggedMessageQuery( + ReplayTaggedMessages replay, + int? maxPerPage = null, + CancellationToken cancellationToken = default) { - var partitionKeyFilter = - TableQuery.GenerateFilterCondition( - "PartitionKey", - QueryComparisons.Equal, - PartitionKeyEscapeHelper.Escape(EventTagEntry.GetPartitionKey(replay.Tag))); - - var utcTicksTRowKeyFilter = - TableQuery.CombineFilters( - TableQuery.GenerateFilterCondition( - "RowKey", - QueryComparisons.GreaterThan, - $"{replay.FromOffset.ToJournalRowKey()}{EventTagEntry.AsciiIncrementedDelimiter}"), - TableOperators.And, - TableQuery.GenerateFilterCondition( - "RowKey", - QueryComparisons.LessThanOrEqual, - $"{replay.ToOffset.ToJournalRowKey()}{EventTagEntry.Delimiter}")); - - var filter = - TableQuery.CombineFilters( - partitionKeyFilter, - TableOperators.And, - utcTicksTRowKeyFilter); - - var returnValue = new TableQuery().Where(filter); - - return returnValue; + return Table.QueryAsync( + filter: $"PartitionKey eq '{PartitionKeyEscapeHelper.Escape(EventTagEntry.GetPartitionKey(replay.Tag))}' and " + + $"RowKey gt '{replay.FromOffset.ToJournalRowKey()}{EventTagEntry.AsciiIncrementedDelimiter}' and " + + $"RowKey le '{replay.ToOffset.ToJournalRowKey()}{EventTagEntry.Delimiter}'", + maxPerPage: maxPerPage, + @select: null, + cancellationToken: cancellationToken); } - private async Task AddAllPersistenceIdSubscriber( - IActorRef subscriber) + private async Task AddAllPersistenceIdSubscriber(IActorRef subscriber) { lock (_allPersistenceIdSubscribers) { @@ -681,9 +520,7 @@ private async Task AddAllPersistenceIdSubscriber( subscriber.Tell(new CurrentPersistenceIds(await GetAllPersistenceIds())); } - private void AddPersistenceIdSubscriber( - IActorRef subscriber, - string persistenceId) + private void AddPersistenceIdSubscriber(IActorRef subscriber, string persistenceId) { if (!_persistenceIdSubscribers.TryGetValue(persistenceId, out var subscriptions)) { @@ -694,9 +531,7 @@ private void AddPersistenceIdSubscriber( subscriptions.Add(subscriber); } - private void AddTagSubscriber( - IActorRef subscriber, - string tag) + private void AddTagSubscriber(IActorRef subscriber, string tag) { if (!_tagSubscribers.TryGetValue(tag, out var subscriptions)) { @@ -709,39 +544,20 @@ private void AddTagSubscriber( private async Task> GetAllPersistenceIds() { - var query = GenerateAllPersistenceIdsQuery(); - - TableQuerySegment result = null; - - var returnValue = ImmutableList.Empty; - - do - { - result = await Table.ExecuteQuerySegmentedAsync(query, result?.ContinuationToken); - - if (result.Results.Count > 0) - { - returnValue = returnValue.AddRange(result.Results.Select(x => x.RowKey)); - } - } while (result.ContinuationToken != null); - - return returnValue; + return await GenerateAllPersistenceIdsQuery().Select(item => item.RowKey).ToListAsync(); } - private async Task InitCloudStorage( - int remainingTries) + private async Task InitCloudStorage(int remainingTries) { try { - var tableClient = _storageAccount.CreateCloudTableClient(); - var tableRef = tableClient.GetTableReference(_settings.TableName); - var op = new OperationContext(); + var tableClient = _tableServiceClient.GetTableClient(_settings.TableName); using (var cts = new CancellationTokenSource(_settings.ConnectTimeout)) { if (!_settings.AutoInitialize) { - var exists = await tableRef.ExistsAsync(null, null, cts.Token); + var exists = await IsTableExist(_settings.TableName, cts.Token); if (!exists) { @@ -752,17 +568,17 @@ private async Task InitCloudStorage( } _log.Info("Successfully connected to existing table", _settings.TableName); - - return tableRef; + + _tableStorage_DoNotUseDirectly = tableClient; + return; } - if (await tableRef.CreateIfNotExistsAsync(new TableRequestOptions(), op, cts.Token)) + if (await tableClient.CreateIfNotExistsAsync(cts.Token) != null) _log.Info("Created Azure Cloud Table", _settings.TableName); else _log.Info("Successfully connected to existing table", _settings.TableName); + _tableStorage_DoNotUseDirectly = tableClient; } - - return tableRef; } catch (Exception ex) { @@ -770,10 +586,18 @@ private async Task InitCloudStorage( if (remainingTries == 0) throw; await Task.Delay(RetryInterval[remainingTries]); - return await InitCloudStorage(remainingTries - 1); + await InitCloudStorage(remainingTries - 1); } } + private async Task IsTableExist(string name, CancellationToken token) + { + var tables = await _tableServiceClient.QueryAsync(t => t.Name == name, cancellationToken: token) + .ToListAsync(token) + .ConfigureAwait(false); + return tables.Count > 0; + } + private void NotifyNewPersistenceIdAdded( string persistenceId) { @@ -816,31 +640,27 @@ private void RemoveSubscriber( /// /// TBD /// TBD - private async Task ReplayTaggedMessagesAsync( - ReplayTaggedMessages replay) + private async Task ReplayTaggedMessagesAsync(ReplayTaggedMessages replay) { - var query = GenerateTaggedMessageQuery(replay); - - // While we can specify the TakeCount, the CloudTable client does - // not really respect this fact and will keep pulling results. - query.TakeCount = - replay.Max > int.MaxValue - ? int.MaxValue - : (int)replay.Max; - // In order to actually break at the limit we ask for we have to // keep a separate counter and track it ourselves. var counter = 0; - - TableQuerySegment result = null; - var maxOrderingId = 0L; - do + var pages = TaggedMessageQuery(replay).AsPages().GetAsyncEnumerator(); + ValueTask? nextTask = pages.MoveNextAsync(); + + while (nextTask != null) { - result = await Table.ExecuteQuerySegmentedAsync(query, result?.ContinuationToken); + await nextTask.Value; + var currentPage = pages.Current; - foreach (var entry in result.Results.OrderBy(x => x.UtcTicks)) + if (currentPage.ContinuationToken != null) + nextTask = pages.MoveNextAsync(); + else + nextTask = null; + + foreach (var entry in currentPage.Values.Select(entity => new EventTagEntry(entity)).OrderBy(x => x.UtcTicks)) { var deserialized = _serialization.PersistentFromBytes(entry.Payload); @@ -872,13 +692,12 @@ private async Task ReplayTaggedMessagesAsync( { break; } - } while (result.ContinuationToken != null); + } return maxOrderingId; } - private bool TryAddPersistenceId( - string persistenceId) + private bool TryAddPersistenceId(string persistenceId) { lock (_allPersistenceIds) { diff --git a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSettings.cs b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSettings.cs index 41c8a6b..8ac6f0b 100644 --- a/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSettings.cs +++ b/src/Akka.Persistence.Azure/Journal/AzureTableStorageJournalSettings.cs @@ -7,7 +7,7 @@ using System; using System.Linq; using Akka.Configuration; -using Microsoft.Azure.Cosmos.Table; +using Akka.Persistence.Azure.Util; namespace Akka.Persistence.Azure.Journal { diff --git a/src/Akka.Persistence.Azure/Journal/HighestSequenceNrEntry.cs b/src/Akka.Persistence.Azure/Journal/HighestSequenceNrEntry.cs deleted file mode 100644 index b8f6816..0000000 --- a/src/Akka.Persistence.Azure/Journal/HighestSequenceNrEntry.cs +++ /dev/null @@ -1,71 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using Microsoft.Azure.Cosmos.Table; - -namespace Akka.Persistence.Azure.Journal -{ - internal sealed class HighestSequenceNrEntry - : ITableEntity - { - public const string HighestSequenceNrKey = "highestSequenceNr"; - private const string ManifestKeyName = "manifest"; - public const string RowKeyValue = "highestSequenceNr"; - - // In order to use this in a TableQuery a parameterless constructor is required - public HighestSequenceNrEntry() - { - } - - public HighestSequenceNrEntry( - string persistenceId, - long highestSequenceNr, - string manifest = "") - { - PartitionKey = persistenceId; - - RowKey = RowKeyValue; - - HighestSequenceNr = highestSequenceNr; - - Manifest = manifest; - } - - public string ETag { get; set; } - - public long HighestSequenceNr { get; set; } - - public string Manifest { get; set; } - - public string PartitionKey { get; set; } - - public string RowKey { get; set; } - - public DateTimeOffset Timestamp { get; set; } - - public void ReadEntity( - IDictionary properties, - OperationContext operationContext) - { - Manifest = - properties.ContainsKey(ManifestKeyName) - ? properties[ManifestKeyName].StringValue - : string.Empty; - - HighestSequenceNr = properties[HighestSequenceNrKey].Int64Value.Value; - } - - public IDictionary WriteEntity( - OperationContext operationContext) - { - var dict = - new Dictionary - { - [HighestSequenceNrKey] = EntityProperty.GeneratePropertyForLong(HighestSequenceNr), - [ManifestKeyName] = EntityProperty.GeneratePropertyForString(Manifest), - }; - - return dict; - } - } -} diff --git a/src/Akka.Persistence.Azure/TableEntities/AllPersistenceIdsEntry.cs b/src/Akka.Persistence.Azure/TableEntities/AllPersistenceIdsEntry.cs index 5c329ba..5bb2dfd 100644 --- a/src/Akka.Persistence.Azure/TableEntities/AllPersistenceIdsEntry.cs +++ b/src/Akka.Persistence.Azure/TableEntities/AllPersistenceIdsEntry.cs @@ -1,18 +1,25 @@ using System; -using System.Collections.Generic; -using Microsoft.Azure.Cosmos.Table; +using Azure; +using Azure.Data.Tables; namespace Akka.Persistence.Azure.TableEntities { - internal sealed class AllPersistenceIdsEntry - : ITableEntity + internal sealed class AllPersistenceIdsEntry { private const string ManifestKeyName = "manifest"; public const string PartitionKeyValue = "allPersistenceIdsIdx"; // In order to use this in a TableQuery a parameterless constructor is required - public AllPersistenceIdsEntry() + public AllPersistenceIdsEntry(TableEntity entity) { + PartitionKey = entity.PartitionKey; + ETag = entity.ETag; + RowKey = entity.RowKey; + Timestamp = entity.Timestamp; + + Manifest = entity.ContainsKey(ManifestKeyName) + ? entity.GetString(ManifestKeyName) + : string.Empty; } public AllPersistenceIdsEntry( @@ -20,42 +27,32 @@ public AllPersistenceIdsEntry( string manifest = "") { PartitionKey = PartitionKeyValue; - RowKey = persistenceId; - Manifest = manifest; } - public string ETag { get; set; } - - public string Manifest { get; set; } - - public string PartitionKey { get; set; } + public string PartitionKey { get; } + public ETag ETag { get; } + public string RowKey { get; } + public DateTimeOffset? Timestamp { get; } - public string RowKey { get; set; } + public string Manifest { get; } - public DateTimeOffset Timestamp { get; set; } - public void ReadEntity( - IDictionary properties, - OperationContext operationContext) + public TableEntity WriteEntity() { - Manifest = - properties.ContainsKey(ManifestKeyName) - ? properties[ManifestKeyName].StringValue - : string.Empty; - } - - public IDictionary WriteEntity( - OperationContext operationContext) - { - var dict = - new Dictionary - { - [ManifestKeyName] = EntityProperty.GeneratePropertyForString(Manifest), - }; - - return dict; + var entity = new TableEntity + { + PartitionKey = PartitionKey, + ETag = ETag, + RowKey = RowKey, + Timestamp = Timestamp, + }; + + if (!string.IsNullOrWhiteSpace(Manifest)) + entity[ManifestKeyName] = Manifest; + + return entity; } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/TableEntities/EventTagEntry.cs b/src/Akka.Persistence.Azure/TableEntities/EventTagEntry.cs index 831ebbb..9bb5df1 100644 --- a/src/Akka.Persistence.Azure/TableEntities/EventTagEntry.cs +++ b/src/Akka.Persistence.Azure/TableEntities/EventTagEntry.cs @@ -1,13 +1,12 @@ using System; -using System.Collections.Generic; using System.Linq; using Akka.Persistence.Azure.Util; -using Microsoft.Azure.Cosmos.Table; +using Azure; +using Azure.Data.Tables; namespace Akka.Persistence.Azure.TableEntities { internal sealed class EventTagEntry - : ITableEntity { public const char Delimiter = ':'; public const char AsciiIncrementedDelimiter = ';'; //(char)((byte)Delimiter + 1) @@ -20,8 +19,22 @@ internal sealed class EventTagEntry public const string IdxTagKeyName = "idxTag"; // In order to use this in a TableQuery a parameterless constructor is required - public EventTagEntry() + public EventTagEntry(TableEntity entity) { + PartitionKey = entity.PartitionKey; + ETag = entity.ETag; + RowKey = entity.RowKey; + Timestamp = entity.Timestamp; + + IdxPartitionKey = entity.GetString(IdxPartitionKeyKeyName); + IdxRowKey = entity.GetString(IdxRowKeyKeyName); + IdxTag = entity.GetString(IdxTagKeyName); + UtcTicks = entity.GetInt64(UtcTicksKeyName).Value; + Payload = entity.GetBinary(PayloadKeyName); + + Manifest = entity.ContainsKey(ManifestKeyName) + ? entity.GetString(ManifestKeyName) + : string.Empty; } public EventTagEntry( @@ -33,89 +46,62 @@ public EventTagEntry( long? utcTicks = null) { if (persistenceId.Any(x => x == Delimiter)) - { throw new ArgumentException($"Must not contain {Delimiter}.", nameof(persistenceId)); - } if (tag.Any(x => x == Delimiter)) - { throw new ArgumentException($"Must not contain {Delimiter}.", nameof(tag)); - } PartitionKey = GetPartitionKey(tag); - Manifest = manifest; - IdxPartitionKey = persistenceId; - IdxRowKey = seqNo.ToJournalRowKey(); - IdxTag = tag; - UtcTicks = utcTicks ?? DateTime.UtcNow.Ticks; - RowKey = GetRowKey(UtcTicks, IdxPartitionKey, IdxRowKey); - Payload = payload; } - public string ETag { get; set; } - - public string IdxPartitionKey { get; set; } - - public string IdxRowKey { get; set; } + public string PartitionKey { get; } + public ETag ETag { get; } + public string RowKey { get; } + public DateTimeOffset? Timestamp { get; } - public string IdxTag { get; set; } + public string IdxPartitionKey { get; } - public string Manifest { get; set; } + public string IdxRowKey { get; } - public string PartitionKey { get; set; } + public string IdxTag { get; } - public byte[] Payload { get; set; } + public string Manifest { get; } - public string RowKey { get; set; } + public byte[] Payload { get; } - public DateTimeOffset Timestamp { get; set; } + public long UtcTicks { get; } - public long UtcTicks { get; set; } - - public void ReadEntity(IDictionary properties, OperationContext operationContext) - { - Manifest = - properties.ContainsKey(ManifestKeyName) - ? properties[ManifestKeyName].StringValue - : string.Empty; - - IdxPartitionKey = properties[IdxPartitionKeyKeyName].StringValue; - - IdxRowKey = properties[IdxRowKeyKeyName].StringValue; - - IdxTag = properties[IdxTagKeyName].StringValue; - - UtcTicks = properties[UtcTicksKeyName].Int64Value.Value; - - Payload = properties[PayloadKeyName].BinaryValue; - } - - public IDictionary WriteEntity(OperationContext operationContext) + public TableEntity WriteEntity() { - var dict = - new Dictionary - { - [ManifestKeyName] = EntityProperty.GeneratePropertyForString(Manifest), - [PayloadKeyName] = EntityProperty.GeneratePropertyForByteArray(Payload), - [UtcTicksKeyName] = EntityProperty.GeneratePropertyForLong(UtcTicks), - [IdxPartitionKeyKeyName] = EntityProperty.GeneratePropertyForString(IdxPartitionKey), - [IdxRowKeyKeyName] = EntityProperty.GeneratePropertyForString(IdxRowKey), - [IdxTagKeyName] = EntityProperty.GeneratePropertyForString(IdxTag) - }; - - return dict; + var entity = new TableEntity + { + PartitionKey = PartitionKey, + ETag = ETag, + RowKey = RowKey, + Timestamp = Timestamp, + [PayloadKeyName] = Payload, + [UtcTicksKeyName] = UtcTicks, + [IdxPartitionKeyKeyName] = IdxPartitionKey, + [IdxRowKeyKeyName] = IdxRowKey, + [IdxTagKeyName] = IdxTag + }; + + if (!string.IsNullOrWhiteSpace(Manifest)) + entity[ManifestKeyName] = Manifest; + + return entity; } public static string GetPartitionKey(string tag) { - return $"{PartitionKeyValue}-{tag}"; + return PartitionKeyEscapeHelper.Escape($"{PartitionKeyValue}-{tag}"); } public static string GetRowKey( diff --git a/src/Akka.Persistence.Azure/TableEntities/HighestSequenceNrEntry.cs b/src/Akka.Persistence.Azure/TableEntities/HighestSequenceNrEntry.cs index 5c12e33..e4c34f0 100644 --- a/src/Akka.Persistence.Azure/TableEntities/HighestSequenceNrEntry.cs +++ b/src/Akka.Persistence.Azure/TableEntities/HighestSequenceNrEntry.cs @@ -1,19 +1,28 @@ using System; -using System.Collections.Generic; -using Microsoft.Azure.Cosmos.Table; +using Azure; +using Azure.Data.Tables; namespace Akka.Persistence.Azure.TableEntities { - internal sealed class HighestSequenceNrEntry - : ITableEntity + internal sealed class HighestSequenceNrEntry { public const string HighestSequenceNrKey = "highestSequenceNr"; private const string ManifestKeyName = "manifest"; public const string RowKeyValue = "highestSequenceNr"; - // In order to use this in a TableQuery a parameterless constructor is required - public HighestSequenceNrEntry() + public HighestSequenceNrEntry(TableEntity entity) { + PartitionKey = entity.PartitionKey; + ETag = entity.ETag; + RowKey = entity.RowKey; + Timestamp = entity.Timestamp; + + HighestSequenceNr = entity.GetInt64(HighestSequenceNrKey).Value; + + Manifest = + entity.ContainsKey(ManifestKeyName) + ? entity.GetString(ManifestKeyName) + : string.Empty; } public HighestSequenceNrEntry( @@ -22,49 +31,37 @@ public HighestSequenceNrEntry( string manifest = "") { PartitionKey = persistenceId; - RowKey = RowKeyValue; - HighestSequenceNr = highestSequenceNr; - Manifest = manifest; } - public string ETag { get; set; } - - public long HighestSequenceNr { get; set; } - - public string Manifest { get; set; } - - public string PartitionKey { get; set; } + public string PartitionKey { get; } + + public ETag ETag { get; } + public string RowKey { get; } + + public DateTimeOffset? Timestamp { get; } - public string RowKey { get; set; } + public long HighestSequenceNr { get; } - public DateTimeOffset Timestamp { get; set; } + public string Manifest { get; } - public void ReadEntity( - IDictionary properties, - OperationContext operationContext) + public TableEntity WriteEntity() { - Manifest = - properties.ContainsKey(ManifestKeyName) - ? properties[ManifestKeyName].StringValue - : string.Empty; - - HighestSequenceNr = properties[HighestSequenceNrKey].Int64Value.Value; - } - - public IDictionary WriteEntity( - OperationContext operationContext) - { - var dict = - new Dictionary - { - [HighestSequenceNrKey] = EntityProperty.GeneratePropertyForLong(HighestSequenceNr), - [ManifestKeyName] = EntityProperty.GeneratePropertyForString(Manifest), - }; - - return dict; + var entity = new TableEntity + { + PartitionKey = PartitionKey, + ETag = ETag, + RowKey = RowKey, + Timestamp = Timestamp, + [HighestSequenceNrKey] = HighestSequenceNr, + }; + + if (!string.IsNullOrWhiteSpace(Manifest)) + entity[ManifestKeyName] = Manifest; + + return entity; } } } \ No newline at end of file diff --git a/src/Akka.Persistence.Azure/TableEntities/PersistentJournalEntry.cs b/src/Akka.Persistence.Azure/TableEntities/PersistentJournalEntry.cs index 8ba2aa6..7e64b91 100644 --- a/src/Akka.Persistence.Azure/TableEntities/PersistentJournalEntry.cs +++ b/src/Akka.Persistence.Azure/TableEntities/PersistentJournalEntry.cs @@ -7,9 +7,8 @@ using Akka.Persistence.Azure.Journal; using Akka.Persistence.Azure.Util; using System; -using System.Collections.Generic; -using System.Linq; -using Microsoft.Azure.Cosmos.Table; +using Azure; +using Azure.Data.Tables; namespace Akka.Persistence.Azure.TableEntities { @@ -26,7 +25,7 @@ namespace Akka.Persistence.Azure.TableEntities /// those expensive parts at run-time. /// Also, due to the way this API is designed... We have to be ok with mutability. /// - internal sealed class PersistentJournalEntry : ITableEntity + internal sealed class PersistentJournalEntry { public const string TagsKeyName = "tags"; public const string UtcTicksKeyName = "utcTicks"; @@ -34,8 +33,22 @@ internal sealed class PersistentJournalEntry : ITableEntity private const string PayloadKeyName = "payload"; private const string SeqNoKeyName = "seqno"; - public PersistentJournalEntry() + public PersistentJournalEntry(TableEntity entity) { + PartitionKey = entity.PartitionKey; + ETag = entity.ETag; + RowKey = entity.RowKey; + Timestamp = entity.Timestamp; + + Manifest = entity.ContainsKey(ManifestKeyName) + ? entity.GetString(ManifestKeyName) + : string.Empty; + + // an exception is fine here - means the data is corrupted anyway + SeqNo = entity.GetInt64(SeqNoKeyName).Value; + Payload = entity.GetBinary(PayloadKeyName); + Tags = entity.GetString(TagsKeyName).Split(new []{';'}, StringSplitOptions.RemoveEmptyEntries); + UtcTicks = entity.GetInt64(UtcTicksKeyName).Value; } public PersistentJournalEntry( @@ -54,21 +67,23 @@ public PersistentJournalEntry( UtcTicks = DateTime.UtcNow.Ticks; } - public string ETag { get; set; } + public string PartitionKey { get; } + + public ETag ETag { get; } + + public string RowKey { get; } + + public DateTimeOffset? Timestamp { get; } /// /// The serialization manifest. /// - public string Manifest { get; set; } - - public string PartitionKey { get; set; } + public string Manifest { get; } /// /// The persistent payload /// - public byte[] Payload { get; set; } - - public string RowKey { get; set; } + public byte[] Payload { get; } /// /// The sequence number. @@ -77,14 +92,12 @@ public PersistentJournalEntry( /// We store this as a separate field since we may pad the /// in order to get the rows for each partition to sort in descending order. /// - public long SeqNo { get; set; } + public long SeqNo { get; } /// /// Tags associated with this entry, if any /// - public string[] Tags { get; set; } - - public DateTimeOffset Timestamp { get; set; } + public string[] Tags { get; } /// /// Ticks of current UTC at the time the entry was created @@ -93,35 +106,26 @@ public PersistentJournalEntry( /// Azure Table Storage does not index the Timestamp value so performing /// any query against it will be extremely slow /// - public long UtcTicks { get; set; } - - public void ReadEntity(IDictionary properties, OperationContext operationContext) - { - Manifest = - properties.ContainsKey(ManifestKeyName) - ? properties[ManifestKeyName].StringValue - : string.Empty; - - // an exception is fine here - means the data is corrupted anyway - SeqNo = properties[SeqNoKeyName].Int64Value.Value; - Payload = properties[PayloadKeyName].BinaryValue; - Tags = properties[TagsKeyName].StringValue.Split(new []{';'}, StringSplitOptions.RemoveEmptyEntries); - UtcTicks = properties[UtcTicksKeyName].Int64Value.Value; - } + public long UtcTicks { get; } - public IDictionary WriteEntity(OperationContext operationContext) + public TableEntity WriteEntity() { - var dict = - new Dictionary - { - [PayloadKeyName] = EntityProperty.GeneratePropertyForByteArray(Payload), - [ManifestKeyName] = EntityProperty.GeneratePropertyForString(Manifest), - [SeqNoKeyName] = EntityProperty.GeneratePropertyForLong(SeqNo), - [TagsKeyName] = EntityProperty.GeneratePropertyForString(string.Join(";", Tags)), - [UtcTicksKeyName] = EntityProperty.GeneratePropertyForLong(UtcTicks) - }; - - return dict; + var entity = new TableEntity + { + PartitionKey = PartitionKey, + ETag = ETag, + RowKey = RowKey, + Timestamp = Timestamp, + [PayloadKeyName] = Payload, + [SeqNoKeyName] = SeqNo, + [TagsKeyName] = string.Join(";", Tags), + [UtcTicksKeyName] = UtcTicks + }; + + if (!string.IsNullOrWhiteSpace(Manifest)) + entity[ManifestKeyName] = Manifest; + + return entity; } } } \ No newline at end of file diff --git a/src/common.props b/src/common.props index c2e797e..0894d8f 100644 --- a/src/common.props +++ b/src/common.props @@ -16,9 +16,9 @@ 2.4.1 - 1.4.25 + 1.4.39 5.10.3 - 16.11.0 + 17.2.0 netcoreapp3.1 net461 netstandard2.0 @@ -33,6 +33,6 @@ snupkg - + \ No newline at end of file