Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MVP proposal #26

Closed
wants to merge 67 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
ab10cc1
FS clean up
lsfera Jul 4, 2024
977f203
suppress CS1591 (Missing XML comment for publicly visible type or me…
lsfera Jul 4, 2024
ad74d3c
made UseTable() optional
lsfera Jul 4, 2024
b59d386
enforce AOT
lsfera Jul 4, 2024
71c1f14
rename IConsumes to IHandles
lsfera Jul 4, 2024
86656ec
Use double quote
lsfera Jul 4, 2024
438cf21
use ILKE. Replication slot name is forced to lcase
lsfera Jul 4, 2024
b24c01f
unused directive
lsfera Jul 4, 2024
b024c6f
remove static variables to enable multiple instances on same process
lsfera Jul 4, 2024
8cc1fed
Added DependencyIbjection project
lsfera Jul 4, 2024
380d06a
Added demo projrct for DI
lsfera Jul 4, 2024
acf5f4d
Enhanced logging
lsfera Jul 5, 2024
e918dfa
bumped version to 0.1.1
lsfera Jul 5, 2024
2055fed
allow tableDescriptor access
lsfera Jul 5, 2024
eadba73
renamed vars
lsfera Jul 5, 2024
c097047
expose NpgsqlDataSource along with connection string - close #16
lsfera Jul 15, 2024
ef9ac95
rename extension method
lsfera Jul 15, 2024
345cad6
move to files
lsfera Jul 15, 2024
645ec1a
simplified di registration
lsfera Jul 16, 2024
ca28054
collapse project
lsfera Jul 16, 2024
f4cfe0c
rename folder
lsfera Jul 16, 2024
f9bf48f
mark as implicit usage
lsfera Jul 17, 2024
7fff1b4
switch to prepared statement
lsfera Jul 17, 2024
f0c4160
dispose resources
lsfera Jul 17, 2024
8db4187
explicit defaults
lsfera Jul 17, 2024
c446721
add PublisherOptions
lsfera Jul 17, 2024
e71999b
Expose shortcut for table validation when bootstrapping publisher
lsfera Jul 17, 2024
2351af1
explain different available to publisher for validating table
lsfera Jul 17, 2024
9d3249a
file renamed
lsfera Jul 6, 2024
ea6f7fe
renamed IHandler to IMessageHandler
lsfera Jul 7, 2024
3887f14
first working version
lsfera Jul 17, 2024
0cf7595
removed unused
lsfera Jul 18, 2024
6809b02
files reorg
lsfera Jul 18, 2024
e20bfa9
marked classes as sealed
lsfera Jul 18, 2024
9656b3b
Reviewed Atrtibutes class allowed only, not inherit
lsfera Jul 18, 2024
06bd0a1
unused file
lsfera Jul 18, 2024
41a96de
unused
lsfera Jul 18, 2024
52b5567
expose singleton
lsfera Jul 18, 2024
253d4d4
reify memoization
lsfera Jul 19, 2024
678b2c8
formatting stuff
lsfera Jul 19, 2024
c60bc89
class renamed to PublisherOptions and SubscriberOptions. Move classes…
lsfera Jul 19, 2024
7729fcd
Ensure subscriber default options
lsfera Jul 20, 2024
5ae673e
rename `PublicationSetupOptions` to `PublicationOptions` and `Replica…
lsfera Jul 20, 2024
85f520b
move classes to files
lsfera Jul 20, 2024
20bc581
renamed files
lsfera Jul 21, 2024
a95cf77
table creation can be enforced when bootstrapping in both processes(p…
lsfera Jul 21, 2024
d3c5825
typo
lsfera Jul 21, 2024
f1d0961
Enforcing invariants on publisher/subscriber options builder
lsfera Jul 21, 2024
910902a
updated satellite packages to latest
lsfera Jul 22, 2024
5126bb9
simplify test
lsfera Jul 22, 2024
b236c91
additional examples
lsfera Jul 22, 2024
49408fa
consumer lookup logic must fallback on willdcard
lsfera Jul 22, 2024
376d748
use select pg_advisory_xact_lock to serialize access to message table…
lsfera Jul 23, 2024
8794c4f
mime type is internally exposed for future extension towards binary d…
lsfera Jul 23, 2024
fa05588
renamed nethod Name => Named for table name
lsfera Jul 23, 2024
7205446
tested table creation
lsfera Jul 23, 2024
ddbd7f6
simplified raw urn
lsfera Jul 24, 2024
964b883
Provide untyped append method
lsfera Jul 24, 2024
1de0a77
avoid usage checking on public method
lsfera Jul 24, 2024
575c2b5
provide additional usage patterns
lsfera Jul 24, 2024
44953bb
provide minimal dsl on typed consumer
lsfera Jul 27, 2024
2726be9
move methodInfo registration at configuration time
lsfera Jul 27, 2024
c777721
enable processed data trace only on trace enabled logging level
lsfera Jul 27, 2024
b3d4d17
corrected IErrorProcessor signature to accept KoEnvelope Id
lsfera Jul 30, 2024
8fd8041
added more publishing options
lsfera Jul 31, 2024
e56c3c1
embed ConsumeOptions with typed Consumes
lsfera Jul 31, 2024
28745e8
Added EnableSubscriptionAutoHeal - see https://github.com/event-drive…
lsfera Aug 1, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions Blumchen.sln
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,12 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "pgAdmin", "pgAdmin", "{C050
docker\pgAdmin\servers.json = docker\pgAdmin\servers.json
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "postgres", "postgres", "{8AAAA344-B5FD-48D9-B2BA-379E374448D4}"
ProjectSection(SolutionItems) = preProject
docker\postgres\init.sql = docker\postgres\init.sql
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "demo", "demo", "{A4044484-FE08-4399-8239-14AABFA30AD7}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SubscriberWorker", "src\SubscriberWorker\SubscriberWorker.csproj", "{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "UnitTests", "src\UnitTests\UnitTests.csproj", "{B16305B4-8AC3-4435-AADB-D9E2ACAA1C13}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -69,6 +68,14 @@ Global
{2BBDA071-FB1C-4D62-A954-B22EA6B1C738}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2BBDA071-FB1C-4D62-A954-B22EA6B1C738}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2BBDA071-FB1C-4D62-A954-B22EA6B1C738}.Release|Any CPU.Build.0 = Release|Any CPU
{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92}.Release|Any CPU.Build.0 = Release|Any CPU
{B16305B4-8AC3-4435-AADB-D9E2ACAA1C13}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B16305B4-8AC3-4435-AADB-D9E2ACAA1C13}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B16305B4-8AC3-4435-AADB-D9E2ACAA1C13}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B16305B4-8AC3-4435-AADB-D9E2ACAA1C13}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -78,7 +85,7 @@ Global
{F2878625-0919-4C26-8DC9-58CD8FA34050} = {A4044484-FE08-4399-8239-14AABFA30AD7}
{F81E2D5B-FC59-4396-A911-56BE65E4FE80} = {A4044484-FE08-4399-8239-14AABFA30AD7}
{C050E9E8-3FB6-4581-953F-31826E385FB4} = {CD59A1A0-F40D-4047-87A3-66C0F1519FA5}
{8AAAA344-B5FD-48D9-B2BA-379E374448D4} = {CD59A1A0-F40D-4047-87A3-66C0F1519FA5}
{DB58DB36-0366-4ABA-BC06-FCA9BB10EB92} = {A4044484-FE08-4399-8239-14AABFA30AD7}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {9A868C51-0460-4700-AF33-E1A921192614}
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Main logic is placed in [EventsSubscription](./src/Blumchen/Subscriptions/Subscr
```shell
docker-compose up
```
2. Run(order doesn't matter) Publisher and Subscriber apps, under 'demo' folder, from vs-studio, and follow Publisher instructions.
2. Run(order doesn't matter) Publisher and (Subscriber or SubscriberWorker) apps, under 'demo' folder, from vs-studio, and follow Publisher instructions.

## Testing (against default docker instance)

Expand Down
6 changes: 2 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ services:
- "wal_level=logical"
- "-c"
- "wal_compression=on"
volumes:
- ./docker/postgres/init.sql:/docker-entrypoint-initdb.d/init.sql

- "-c"
- "max_slot_wal_keep_size=1"
pgadmin:
container_name: pgadmin_container
image: dpage/pgadmin4
Expand All @@ -36,4 +35,3 @@ services:
depends_on:
- postgres
restart: unless-stopped

6 changes: 0 additions & 6 deletions docker/postgres/init.sql

This file was deleted.

24 changes: 21 additions & 3 deletions src/Blumchen/Blumchen.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<VersionPrefix>0.1.0</VersionPrefix>
<VersionPrefix>0.1.1</VersionPrefix>
<TargetFramework>net8.0</TargetFramework>
<GenerateAssemblyTitleAttribute>true</GenerateAssemblyTitleAttribute>
<GenerateAssemblyDescriptionAttribute>true</GenerateAssemblyDescriptionAttribute>
Expand All @@ -25,20 +25,38 @@
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
<RootNamespace>Blumchen</RootNamespace>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<NoWarn>1591</NoWarn>
<WarningsNotAsErrors></WarningsNotAsErrors>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
<NoWarn>1591</NoWarn>
<WarningsNotAsErrors></WarningsNotAsErrors>
</PropertyGroup>
<ItemGroup>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>Tests</_Parameter1>
</AssemblyAttribute>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>UnitTests</_Parameter1>
</AssemblyAttribute>
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleToAttribute">
<_Parameter1>Blumchen.DependencyInjection</_Parameter1>
</AssemblyAttribute>
</ItemGroup>

<ItemGroup>
<PackageReference Include="JetBrains.Annotations" Version="2023.3.0" >
<PackageReference Include="JetBrains.Annotations" Version="2024.2.0">
<PrivateAssets>all</PrivateAssets>
<ExcludeAssets>none</ExcludeAssets>
<IncludeAssets>all</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.1" />
<PackageReference Include="Npgsql" Version="8.0.3" />
<PackageReference Include="Polly.Core" Version="8.4.1" />
</ItemGroup>

</Project>
3 changes: 3 additions & 0 deletions src/Blumchen/ConfigurationException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace Blumchen;

public class ConfigurationException(string message): Exception(message);
7 changes: 2 additions & 5 deletions src/Blumchen/Database/Run.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
using System.Data;
using System.Runtime.CompilerServices;
using Blumchen.Subscriptions.Replication;
using Blumchen.Subscriptions.ReplicationMessageHandlers;
using Npgsql;

#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member

namespace Blumchen.Database;

public static class Run
Expand All @@ -16,11 +13,11 @@ private static async Task Execute(
CancellationToken ct)
{
await using var command = dataSource.CreateCommand(sql);
await command.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
await command.ExecuteNonQueryAsync(ct);
}

public static async Task EnsureTableExists(this NpgsqlDataSource dataSource, TableDescriptorBuilder.MessageTable tableDescriptor, CancellationToken ct)
=> await dataSource.Execute(tableDescriptor.ToString(), ct).ConfigureAwait(false);
=> await dataSource.Execute(string.Concat("select pg_advisory_xact_lock(12345);", tableDescriptor), ct).ConfigureAwait(false);

public static async Task<bool> Exists(
this NpgsqlDataSource dataSource,
Expand Down
23 changes: 23 additions & 0 deletions src/Blumchen/DependencyInjection/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Blumchen.Subscriptions.Replication;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

#pragma warning disable IL2091

namespace Blumchen.DependencyInjection;

public static class ServiceCollectionExtensions
{

public static IServiceCollection AddBlumchen<T>(
this IServiceCollection service,
Func<IServiceProvider, IWorkerOptionsBuilder, IWorkerOptionsBuilder> workerOptions)
where T : class, IMessageHandler =>
service
.AddKeyedSingleton(typeof(T), (provider, _) => workerOptions(provider, new WorkerOptionsBuilder()).Build())
.AddHostedService(provider =>
new Worker<T>(workerOptions(provider, new WorkerOptionsBuilder()).Build(),
provider.GetRequiredService<ILogger<Worker<T>>>()));


}
48 changes: 48 additions & 0 deletions src/Blumchen/DependencyInjection/Worker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using System.Collections.Concurrent;
using Blumchen.Subscriptions;
using Blumchen.Subscriptions.Replication;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace Blumchen.DependencyInjection;

public class Worker<T>(
WorkerOptions options,
ILogger<Worker<T>> logger): BackgroundService where T : class, IMessageHandler
{
private string WorkerName { get; } = $"{nameof(Worker<T>)}<{typeof(T).Name}>";
private static readonly ConcurrentDictionary<string, Action<ILogger, string, object[]>> LoggingActions = new(StringComparer.OrdinalIgnoreCase);
private static void Notify(ILogger logger, LogLevel level, string template, params object[] parameters)
{
LoggingActions.GetOrAdd(template,_ => LoggerAction(level, logger.IsEnabled(level)))(logger, template, parameters);
return;

static Action<ILogger, string, object[]> LoggerAction(LogLevel ll, bool enabled) =>
(ll, enabled) switch
{
(LogLevel.Information, true) => (logger, template, parameters) => logger.LogInformation(template, parameters),
(LogLevel.Debug, true) => (logger, template, parameters) => logger.LogDebug(template, parameters),
(LogLevel.Trace, true) => (logger, template, parameters) => logger.LogTrace(template, parameters),
(LogLevel.Warning, true) => (logger, template, parameters) => logger.LogWarning(template, parameters),
(LogLevel.Error, true) => (logger, template, parameters) => logger.LogError(template, parameters),
(LogLevel.Critical, true) => (logger, template, parameters) => logger.LogCritical(template, parameters),
(_, _) => (_, _, _) => { }
};
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await options.OuterPipeline.ExecuteAsync(async token =>
await options.InnerPipeline.ExecuteAsync(async ct =>
{
await using var subscription = new Subscription();
await using var cursor = subscription.Subscribe(options.SubscriberOptions, ct)
.GetAsyncEnumerator(ct);
Notify(logger, LogLevel.Information, "{WorkerName} started", WorkerName);
while (await cursor.MoveNextAsync().ConfigureAwait(false) && !ct.IsCancellationRequested)
Notify(logger, LogLevel.Trace, "{cursor.Current} processed", cursor.Current);
}, token).ConfigureAwait(false), stoppingToken).ConfigureAwait(false);
Notify(logger, LogLevel.Information, "{WorkerName} stopped", WorkerName);
}

}
67 changes: 67 additions & 0 deletions src/Blumchen/DependencyInjection/WorkerOptionsBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
using Blumchen.Subscriber;
using Blumchen.Subscriptions.Management;
using Npgsql;
using Npgsql.Replication;
using Polly;

namespace Blumchen.DependencyInjection;

public record WorkerOptions(
ISubscriberOptions SubscriberOptions,
ResiliencePipeline OuterPipeline,
ResiliencePipeline InnerPipeline);

public interface IWorkerOptionsBuilder
{
IWorkerOptionsBuilder ResiliencyPipeline(ResiliencePipeline resiliencePipeline);
IWorkerOptionsBuilder Subscription(Func<OptionsBuilder, OptionsBuilder>? builder);
WorkerOptions Build();
IWorkerOptionsBuilder EnableSubscriptionAutoHeal();
}

internal sealed class WorkerOptionsBuilder: IWorkerOptionsBuilder
{
private ResiliencePipeline? _outerPipeline = default;
private Func<string, string, ResiliencePipeline>? _innerPipelineFn = default;
private Func<OptionsBuilder, OptionsBuilder>? _builder;

public IWorkerOptionsBuilder ResiliencyPipeline(ResiliencePipeline resiliencePipeline)
{
_outerPipeline = resiliencePipeline;
return this;
}public IWorkerOptionsBuilder Subscription(Func<OptionsBuilder, OptionsBuilder>? builder)
{
_builder = builder;
return this;
}

public WorkerOptions Build()
{
ArgumentNullException.ThrowIfNull(_outerPipeline);
ArgumentNullException.ThrowIfNull(_builder);
var subscriberOptions = _builder(new OptionsBuilder()).Build();
return new(subscriberOptions, _outerPipeline,
_innerPipelineFn?.Invoke(subscriberOptions.ReplicationOptions.SlotName,subscriberOptions.ConnectionStringBuilder.ConnectionString) ??
ResiliencePipeline.Empty
);
}

public IWorkerOptionsBuilder EnableSubscriptionAutoHeal()
{
_innerPipelineFn = (replicationSlotName, connectionString) => new ResiliencePipelineBuilder().AddRetry(new()
{
ShouldHandle =
new PredicateBuilder().Handle<PostgresException>(exception =>
exception.SqlState.Equals("55000", StringComparison.OrdinalIgnoreCase)),
MaxRetryAttempts = 1,
OnRetry = async args =>
{
await using var conn = new LogicalReplicationConnection(connectionString);
await conn.Open(args.Context.CancellationToken);
await conn.ReCreate(replicationSlotName, args.Context.CancellationToken).ConfigureAwait(false);
},
}).Build();
return this;
}
}

32 changes: 32 additions & 0 deletions src/Blumchen/Ensure.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System.Collections;
using Blumchen.Serialization;
using Blumchen.Subscriber;

namespace Blumchen;

internal static class Ensure
{
public static void RawUrn<T,TR>(T value, string parameters) => new RawUrnTrait<T,TR>().IsValid(value, parameters);
public static void Null<T>(T value, string parameters) => new NullTrait<T>().IsValid(value, parameters);
public static void NotNull<T>(T value, string parameters) => new NotNullTrait<T>().IsValid(value, parameters);
public static void NotEmpty<T>(T value, string parameters) => new NotEmptyTrait<T>().IsValid(value, parameters);
public static void Empty<T>(T value, string parameters) => new EmptyTrait<T>().IsValid(value, parameters);
public static bool Empty<T, TU>(T value1, TU value2, params string[] parameters) =>
new EmptyTrait<T>().IsValid(value1, parameters) && new EmptyTrait<TU>().IsValid(value2, parameters);
}

internal abstract class Validable<T>(Func<T, bool> condition, string errorFormat)
{
public bool IsValid(T value, params string[] parameters)
{
if (!condition(value))
throw new ConfigurationException(string.Format(errorFormat, parameters));
return true;
}
}

internal class RawUrnTrait<T,TR>(): Validable<T>(v => v is ICollection { Count: > 0 }, $"`{nameof(RawUrnAttribute)}` missing on `{typeof(TR).Name}` message type");
internal class NullTrait<T>(): Validable<T>(v => v is null, $"`{{0}}` method on {nameof(OptionsBuilder)} called more then once");
internal class NotNullTrait<T>(): Validable<T>(v => v is not null, $"`{{0}}` method not called on {nameof(OptionsBuilder)}");
internal class NotEmptyTrait<T>(): Validable<T>(v => v is ICollection { Count: > 0 }, $"No `{{0}}` method called on {nameof(OptionsBuilder)}");
internal class EmptyTrait<T>(): Validable<T>(v => v is ICollection { Count: 0 }, $"`{{0}}` cannot be mixed with other consuming strategies");
8 changes: 8 additions & 0 deletions src/Blumchen/IDictionaryExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace Blumchen;

internal static class IDictionaryExtensions
{
public static TR FindByMultiKey<T, TR>(this IDictionary<T, TR> registry, params T[] parameters)
where T : class =>
!registry.TryGetValue(parameters[0], out var value) ? registry.FindByMultiKey(parameters[1..parameters.Length]) : value;
}
Loading
Loading