From 794f773254bd275e3cda48cf51129b3a00981fe6 Mon Sep 17 00:00:00 2001 From: Bazen Date: Sat, 11 Nov 2023 01:14:27 +0200 Subject: [PATCH] Added produce to a specific subject & refactored (#156) --- .../Constants/MemphisConstants.cs | 2 + .../Consumer/FetchMessageOptions.cs | 2 - .../Consumer/IMemphisConsumer.cs | 6 +- .../Consumer/MemphisConsumer.cs | 10 - .../Consumer/MemphisConsumerOptions.cs | 2 - .../Core/MemphisConnectionEventArgs.cs | 6 +- src/Memphis.Client/Core/MemphisMessage.cs | 7 +- .../Core/MemphisMessageHandler.cs | 31 +- .../Exception/MemphisConnectionException.cs | 11 +- .../Exception/MemphisException.cs | 16 +- .../Exception/MemphisMessageIdException.cs | 11 +- .../MemphisSchemaValidationException.cs | 11 +- src/Memphis.Client/GlobalUsings.cs | 16 + src/Memphis.Client/Helper/JsonSerDes.cs | 2 - src/Memphis.Client/Helper/MemphisUtil.cs | 57 ++- .../Helper/MessageSerializer.cs | 8 +- src/Memphis.Client/IMemphisClient.cs | 7 +- src/Memphis.Client/MemphisClient.Station.cs | 85 +++++ src/Memphis.Client/MemphisClient.cs | 23 +- src/Memphis.Client/MemphisClientFactory.cs | 351 +++++++++--------- .../Models/Request/AttachSchemaRequest.cs | 23 +- .../Models/Request/CreateConsumerRequest.cs | 77 ++-- .../Models/Request/CreateProducerRequest.cs | 41 +- .../Models/Request/CreateSchemaRequest.cs | 4 +- .../Models/Request/CreateStationRequest.cs | 2 - .../Models/Request/DetachSchemaRequest.cs | 19 +- .../Models/Request/DlsMessage.cs | 64 ++-- .../Models/Request/NotificationRequest.cs | 31 +- src/Memphis.Client/Models/Request/PmAckMsg.cs | 21 +- .../Models/Request/RemoveConsumerRequest.cs | 31 +- .../Models/Request/RemoveProducerRequest.cs | 31 +- .../Models/Request/RemoveStationRequest.cs | 19 +- .../Models/Response/CreateConsumerResponse.cs | 4 +- .../Models/Response/CreateProducerResponse.cs | 14 +- .../Models/Response/CreateSchemaResponse.cs | 4 +- .../Models/Response/FunctionsDetails.cs | 25 ++ .../Models/Response/ProducerSchemaUpdate.cs | 29 +- .../Response/ProducerSchemaUpdateVersion.cs | 33 +- .../Models/Response/SchemaUpdateInit.cs | 2 - .../Models/Response/SdkClientsUpdate.cs | 24 +- src/Memphis.Client/Options.cs | 1 - .../Producer/IMemphisProducer.cs | 6 +- .../Producer/MemphisProducer.cs | 71 ++-- .../Producer/MemphisProducerOptions.cs | 4 +- src/Memphis.Client/Station/IMemphisStation.cs | 4 +- src/Memphis.Client/Station/MemphisStation.cs | 4 +- .../Station/StationPartitionResolver.cs | 5 +- .../Validators/AvroValidator.cs | 7 +- .../Validators/GraphqlValidator.cs | 104 +++--- .../Validators/ISchemaValidator.cs | 13 +- .../Validators/JsonValidator.cs | 50 ++- .../Validators/ProtoBufValidator.cs | 7 +- .../Validators/SchemaValidator.cs | 60 ++- .../Validators/ValidatorType.cs | 15 +- 54 files changed, 740 insertions(+), 773 deletions(-) create mode 100644 src/Memphis.Client/GlobalUsings.cs create mode 100644 src/Memphis.Client/MemphisClient.Station.cs create mode 100644 src/Memphis.Client/Models/Response/FunctionsDetails.cs diff --git a/src/Memphis.Client/Constants/MemphisConstants.cs b/src/Memphis.Client/Constants/MemphisConstants.cs index 7dfda69..02c211a 100644 --- a/src/Memphis.Client/Constants/MemphisConstants.cs +++ b/src/Memphis.Client/Constants/MemphisConstants.cs @@ -39,6 +39,8 @@ internal class MemphisSubjects // not available yes public const string SCHEMA_DESTRUCTION = ""; + + public const string FUNCTIONS_UPDATE = "$memphis_functions_updates_"; } public static class MemphisSchemaTypes diff --git a/src/Memphis.Client/Consumer/FetchMessageOptions.cs b/src/Memphis.Client/Consumer/FetchMessageOptions.cs index 21bd65b..dc88419 100644 --- a/src/Memphis.Client/Consumer/FetchMessageOptions.cs +++ b/src/Memphis.Client/Consumer/FetchMessageOptions.cs @@ -1,7 +1,5 @@ #nullable disable -using System; - namespace Memphis.Client.Consumer; public sealed class FetchMessageOptions diff --git a/src/Memphis.Client/Consumer/IMemphisConsumer.cs b/src/Memphis.Client/Consumer/IMemphisConsumer.cs index 4e3f8fd..d5aa174 100644 --- a/src/Memphis.Client/Consumer/IMemphisConsumer.cs +++ b/src/Memphis.Client/Consumer/IMemphisConsumer.cs @@ -1,8 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using Memphis.Client.Core; +using Memphis.Client.Core; namespace Memphis.Client.Consumer; diff --git a/src/Memphis.Client/Consumer/MemphisConsumer.cs b/src/Memphis.Client/Consumer/MemphisConsumer.cs index 19468af..8dca40b 100644 --- a/src/Memphis.Client/Consumer/MemphisConsumer.cs +++ b/src/Memphis.Client/Consumer/MemphisConsumer.cs @@ -1,18 +1,8 @@ -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; using System.Linq; -using System.Text; -using System.Threading; -using System.Threading.Tasks; -using Memphis.Client.Constants; using Memphis.Client.Core; -using Memphis.Client.Exception; using Memphis.Client.Helper; using Memphis.Client.Models.Request; using Memphis.Client.Station; -using NATS.Client; -using NATS.Client.JetStream; namespace Memphis.Client.Consumer; diff --git a/src/Memphis.Client/Consumer/MemphisConsumerOptions.cs b/src/Memphis.Client/Consumer/MemphisConsumerOptions.cs index 40dbfe4..72a2166 100644 --- a/src/Memphis.Client/Consumer/MemphisConsumerOptions.cs +++ b/src/Memphis.Client/Consumer/MemphisConsumerOptions.cs @@ -1,7 +1,5 @@ #nullable disable -using System; - namespace Memphis.Client.Consumer; public sealed class MemphisConsumerOptions diff --git a/src/Memphis.Client/Core/MemphisConnectionEventArgs.cs b/src/Memphis.Client/Core/MemphisConnectionEventArgs.cs index c4d2a7f..a6c9fe8 100644 --- a/src/Memphis.Client/Core/MemphisConnectionEventArgs.cs +++ b/src/Memphis.Client/Core/MemphisConnectionEventArgs.cs @@ -1,8 +1,4 @@ -using System; -using Memphis.Client.Exception; -using NATS.Client; - -namespace Memphis.Client; +namespace Memphis.Client; public class MemphisConnectionEventArgs { diff --git a/src/Memphis.Client/Core/MemphisMessage.cs b/src/Memphis.Client/Core/MemphisMessage.cs index abac686..c0d956c 100644 --- a/src/Memphis.Client/Core/MemphisMessage.cs +++ b/src/Memphis.Client/Core/MemphisMessage.cs @@ -1,10 +1,5 @@ -using System; -using System.Text; -using Memphis.Client.Constants; -using Memphis.Client.Exception; -using Memphis.Client.Helper; +using Memphis.Client.Helper; using Memphis.Client.Models.Request; -using NATS.Client; namespace Memphis.Client.Core; diff --git a/src/Memphis.Client/Core/MemphisMessageHandler.cs b/src/Memphis.Client/Core/MemphisMessageHandler.cs index d03fe1c..3e01b41 100644 --- a/src/Memphis.Client/Core/MemphisMessageHandler.cs +++ b/src/Memphis.Client/Core/MemphisMessageHandler.cs @@ -1,26 +1,21 @@ -using System; -using System.Collections.Generic; -using NATS.Client.JetStream; +namespace Memphis.Client.Core; -namespace Memphis.Client.Core +public class MemphisMessageHandlerEventArgs : EventArgs { - public class MemphisMessageHandlerEventArgs : EventArgs + public MemphisMessageHandlerEventArgs(List messageList, IJetStream? context, System.Exception? ex) { - public MemphisMessageHandlerEventArgs(List messageList, IJetStream? context, System.Exception? ex) - { - MessageList = messageList; - Context = context; - Exception = ex; - } + MessageList = messageList; + Context = context; + Exception = ex; + } - /// - /// Retrieves the message. - /// - public List MessageList { get; } + /// + /// Retrieves the message. + /// + public List MessageList { get; } - public System.Exception? Exception { get; } + public System.Exception? Exception { get; } - public IJetStream? Context { get; set; } - } + public IJetStream? Context { get; set; } } \ No newline at end of file diff --git a/src/Memphis.Client/Exception/MemphisConnectionException.cs b/src/Memphis.Client/Exception/MemphisConnectionException.cs index 98ad0ce..2d3e381 100644 --- a/src/Memphis.Client/Exception/MemphisConnectionException.cs +++ b/src/Memphis.Client/Exception/MemphisConnectionException.cs @@ -1,8 +1,7 @@ -namespace Memphis.Client.Exception +namespace Memphis.Client.Exception; + +public class MemphisConnectionException : MemphisException { - public class MemphisConnectionException : MemphisException - { - public MemphisConnectionException(string err, System.Exception ex) : base(err, ex) { } - public MemphisConnectionException(string err) : base(err) { } - } + public MemphisConnectionException(string err, System.Exception ex) : base(err, ex) { } + public MemphisConnectionException(string err) : base(err) { } } \ No newline at end of file diff --git a/src/Memphis.Client/Exception/MemphisException.cs b/src/Memphis.Client/Exception/MemphisException.cs index d5da957..b13ff0a 100644 --- a/src/Memphis.Client/Exception/MemphisException.cs +++ b/src/Memphis.Client/Exception/MemphisException.cs @@ -1,18 +1,14 @@ -using System; -using System.Text.RegularExpressions; +namespace Memphis.Client.Exception; -namespace Memphis.Client.Exception +public class MemphisException : System.Exception { - public class MemphisException : System.Exception + public MemphisException(String err) : base(Regex.Replace(err, @"[nN][aA][Tt][Ss]", "memphis")) { - public MemphisException(String err) : base(Regex.Replace(err, @"[nN][aA][Tt][Ss]", "memphis")) - { - } + } - public MemphisException(String err, System.Exception innerEx) : base(Regex.Replace(err, @"[nN][aA][Tt][Ss]", "memphis"), innerEx) - { + public MemphisException(String err, System.Exception innerEx) : base(Regex.Replace(err, @"[nN][aA][Tt][Ss]", "memphis"), innerEx) + { - } } } \ No newline at end of file diff --git a/src/Memphis.Client/Exception/MemphisMessageIdException.cs b/src/Memphis.Client/Exception/MemphisMessageIdException.cs index 76aedbd..b16835b 100644 --- a/src/Memphis.Client/Exception/MemphisMessageIdException.cs +++ b/src/Memphis.Client/Exception/MemphisMessageIdException.cs @@ -1,8 +1,7 @@ -namespace Memphis.Client.Exception +namespace Memphis.Client.Exception; + +public class MemphisMessageIdException : MemphisException { - public class MemphisMessageIdException : MemphisException - { - public MemphisMessageIdException(string err, System.Exception ex) : base(err, ex) { } - public MemphisMessageIdException(string err) : base(err) { } - } + public MemphisMessageIdException(string err, System.Exception ex) : base(err, ex) { } + public MemphisMessageIdException(string err) : base(err) { } } \ No newline at end of file diff --git a/src/Memphis.Client/Exception/MemphisSchemaValidationException.cs b/src/Memphis.Client/Exception/MemphisSchemaValidationException.cs index 6b43046..abffee4 100644 --- a/src/Memphis.Client/Exception/MemphisSchemaValidationException.cs +++ b/src/Memphis.Client/Exception/MemphisSchemaValidationException.cs @@ -1,8 +1,7 @@ -namespace Memphis.Client.Exception +namespace Memphis.Client.Exception; + +public class MemphisSchemaValidationException : MemphisException { - public class MemphisSchemaValidationException : MemphisException - { - public MemphisSchemaValidationException(string err, System.Exception ex) : base(err, ex) { } - public MemphisSchemaValidationException(string err) : base(err) { } - } + public MemphisSchemaValidationException(string err, System.Exception ex) : base(err, ex) { } + public MemphisSchemaValidationException(string err) : base(err) { } } \ No newline at end of file diff --git a/src/Memphis.Client/GlobalUsings.cs b/src/Memphis.Client/GlobalUsings.cs new file mode 100644 index 0000000..6391261 --- /dev/null +++ b/src/Memphis.Client/GlobalUsings.cs @@ -0,0 +1,16 @@ +global using System; +global using System.Collections.Concurrent; +global using System.Collections.Generic; +global using System.Collections.Specialized; +global using System.IO; +global using System.Text; +global using System.Text.RegularExpressions; +global using System.Threading; +global using System.Threading.Tasks; +global using System.Runtime.Serialization; +global using Memphis.Client.Constants; +global using Memphis.Client.Exception; +global using NATS.Client; +global using NATS.Client.Internals; +global using NATS.Client.JetStream; +global using Newtonsoft.Json; diff --git a/src/Memphis.Client/Helper/JsonSerDes.cs b/src/Memphis.Client/Helper/JsonSerDes.cs index 3400b48..59e4010 100644 --- a/src/Memphis.Client/Helper/JsonSerDes.cs +++ b/src/Memphis.Client/Helper/JsonSerDes.cs @@ -1,6 +1,4 @@ -using System.IO; using System.Runtime.Serialization.Json; -using System.Text; namespace Memphis.Client.Helper; diff --git a/src/Memphis.Client/Helper/MemphisUtil.cs b/src/Memphis.Client/Helper/MemphisUtil.cs index d39ab6d..836f471 100644 --- a/src/Memphis.Client/Helper/MemphisUtil.cs +++ b/src/Memphis.Client/Helper/MemphisUtil.cs @@ -1,41 +1,38 @@ -using System; using System.Security.Cryptography; -using System.Text; -namespace Memphis.Client.Helper +namespace Memphis.Client.Helper; + +internal static class MemphisUtil { - internal static class MemphisUtil + internal static string GetInternalName(string name) + { + return name.ToLower().Replace(".", "#"); + } + + internal static string GetStationName(string internalStationName) { - internal static string GetInternalName(string name) - { - return name.ToLower().Replace(".", "#"); - } - - internal static string GetStationName(string internalStationName) + return internalStationName.Replace("#", "."); + } + + internal static readonly char[] chars = + "0123456789abcdef".ToCharArray(); + + internal static string GetUniqueKey(int size) + { + byte[] data = new byte[4*size]; + using (var crypto = RandomNumberGenerator.Create()) { - return internalStationName.Replace("#", "."); + crypto.GetBytes(data); } - - internal static readonly char[] chars = - "0123456789abcdef".ToCharArray(); - - internal static string GetUniqueKey(int size) + StringBuilder result = new StringBuilder(size); + for (int i = 0; i < size; i++) { - byte[] data = new byte[4*size]; - using (var crypto = RandomNumberGenerator.Create()) - { - crypto.GetBytes(data); - } - StringBuilder result = new StringBuilder(size); - for (int i = 0; i < size; i++) - { - var rnd = BitConverter.ToUInt32(data, i * 4); - var idx = rnd % chars.Length; + var rnd = BitConverter.ToUInt32(data, i * 4); + var idx = rnd % chars.Length; - result.Append(chars[idx]); - } - - return result.ToString(); + result.Append(chars[idx]); } + + return result.ToString(); } } \ No newline at end of file diff --git a/src/Memphis.Client/Helper/MessageSerializer.cs b/src/Memphis.Client/Helper/MessageSerializer.cs index 72390bd..8fc7cda 100644 --- a/src/Memphis.Client/Helper/MessageSerializer.cs +++ b/src/Memphis.Client/Helper/MessageSerializer.cs @@ -1,10 +1,4 @@ -using System; -using System.IO; -using System.Text; -using Memphis.Client.Constants; -using Memphis.Client.Exception; -using Newtonsoft.Json; -using SolTechnology.Avro; +using SolTechnology.Avro; namespace Memphis.Client; diff --git a/src/Memphis.Client/IMemphisClient.cs b/src/Memphis.Client/IMemphisClient.cs index e9932ed..b54c940 100644 --- a/src/Memphis.Client/IMemphisClient.cs +++ b/src/Memphis.Client/IMemphisClient.cs @@ -1,9 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Collections.Specialized; -using System.Threading; -using System.Threading.Tasks; -using Memphis.Client.Consumer; +using Memphis.Client.Consumer; using Memphis.Client.Core; using Memphis.Client.Producer; using Memphis.Client.Station; diff --git a/src/Memphis.Client/MemphisClient.Station.cs b/src/Memphis.Client/MemphisClient.Station.cs new file mode 100644 index 0000000..c99baff --- /dev/null +++ b/src/Memphis.Client/MemphisClient.Station.cs @@ -0,0 +1,85 @@ +using Memphis.Client.Helper; + +namespace Memphis.Client; + +public partial class MemphisClient +{ + private readonly ConcurrentDictionary _functionDetails = new(); + private readonly ConcurrentDictionary _functionDetailSubscriptions = new(); + private readonly ConcurrentDictionary _functionDetailSubscriptionCounter = new(); + + internal ConcurrentDictionary FunctionDetails { get => _functionDetails; } + + private Task ListenForFunctionUpdate(string stationName, int stationVersion, CancellationToken cancellationToken) + { + if (string.IsNullOrWhiteSpace(stationName) || + stationVersion <= 0) + return Task.CompletedTask; + + try + { + var internalStationName = MemphisUtil.GetInternalName(stationName); + if (_functionDetailSubscriptions.TryGetValue(internalStationName, out _)) + { + _functionDetailSubscriptionCounter.AddOrUpdate(internalStationName, 1, (_, count) => count + 1); + return Task.CompletedTask; + } + var functionUpdateSubject = $"{MemphisSubjects.FUNCTIONS_UPDATE}{internalStationName}"; + var subscription = _brokerConnection.SubscribeAsync(functionUpdateSubject, FunctionUpdateEventHandler); + if (!_functionDetailSubscriptions.TryAdd(internalStationName, subscription)) + throw new MemphisException($"Could not add subscription for {functionUpdateSubject}."); + _functionDetailSubscriptionCounter.AddOrUpdate(internalStationName, 1, (_, count) => count + 1); + return Task.CompletedTask; + } + catch (System.Exception e) + { + throw new MemphisException(e.Message, e); + } + + void FunctionUpdateEventHandler(object sender, MsgHandlerEventArgs e) + { + if (e is null || e.Message is null) + return; + + var jsonData = Encoding.UTF8.GetString(e.Message.Data); + var functionsUpdate = JsonConvert.DeserializeObject(jsonData); + if (functionsUpdate is null) + return; + + _functionDetails.AddOrUpdate( + e.Message.Subject, + new FunctionsDetails { PartitionsFunctions = functionsUpdate.Functions }, + (_, _) => new FunctionsDetails { PartitionsFunctions = functionsUpdate.Functions }); + } + } + + private async Task RemoveFunctionUpdateListenerAsync(string stationName) + { + try + { + if (string.IsNullOrWhiteSpace(stationName)) + return; + + var internalStationName = MemphisUtil.GetInternalName(stationName); + if (!_functionDetailSubscriptionCounter.TryGetValue(internalStationName, out var count) || + count <= 0) + return; + + int countAfterRemoval = count - 1; + _functionDetailSubscriptionCounter.TryUpdate(internalStationName, countAfterRemoval, count); + if (countAfterRemoval <= 0) + { + if (_functionDetailSubscriptions.TryGetValue(internalStationName, out var subscriptionToRemove)) + { + await subscriptionToRemove.DrainAsync(); + _functionDetailSubscriptions.TryRemove(internalStationName, out _); + } + _functionDetails.TryRemove(internalStationName, out _); + } + } + catch (System.Exception e) + { + throw new MemphisException(e.Message, e); + } + } +} diff --git a/src/Memphis.Client/MemphisClient.cs b/src/Memphis.Client/MemphisClient.cs index d502e4d..cba542a 100644 --- a/src/Memphis.Client/MemphisClient.cs +++ b/src/Memphis.Client/MemphisClient.cs @@ -1,17 +1,5 @@ -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Collections.Specialized; -using System.IO; -using System.Security.Cryptography; -using System.Text; -using System.Text.RegularExpressions; -using System.Threading; -using System.Threading.Tasks; -using Memphis.Client.Constants; using Memphis.Client.Consumer; using Memphis.Client.Core; -using Memphis.Client.Exception; using Memphis.Client.Helper; using Memphis.Client.Models.Request; using Memphis.Client.Models.Response; @@ -19,9 +7,6 @@ using Memphis.Client.Station; using Memphis.Client.Validators; using Murmur; -using NATS.Client; -using NATS.Client.JetStream; -using Newtonsoft.Json; #pragma warning disable CS8625 // Cannot convert null literal to non-nullable reference type. #pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously @@ -31,7 +16,7 @@ namespace Memphis.Client; -public sealed class MemphisClient : IMemphisClient +public sealed partial class MemphisClient : IMemphisClient { private bool _desposed; private readonly Options _brokerConnOptions; @@ -184,6 +169,7 @@ public async Task CreateProducer(MemphisProducerOptions produce _clusterConfigurations.AddOrUpdate(MemphisSdkClientUpdateTypes.SEND_NOTIFICATION, createProducerResponse.SendNotification, (_, _) => createProducerResponse.SendNotification); await ListenForSchemaUpdate(internalStationName, createProducerResponse.SchemaUpdate); + await ListenForFunctionUpdate(internalStationName, createProducerResponse.StationVersion, cancellationToken); if (createProducerResponse.PartitionsUpdate is not null) { @@ -216,6 +202,8 @@ public async Task CreateProducer(MemphisProducerOptions produce } } + + public async Task> FetchMessages( FetchMessageOptions options, CancellationToken cancellationToken = default @@ -784,6 +772,7 @@ await SendNotificationAsync(title: "Schema validation has failed", internal async Task NotifyRemoveProducer(string stationName) { await RemoveFromSchemaUpdateListener(stationName); + await RemoveFunctionUpdateListenerAsync(stationName); } internal async Task NotifyRemoveConsumer(string stationName) @@ -1177,7 +1166,6 @@ void SyncSdkClientUpdate() { throw new MemphisException($"Unable to deserialize sdk client update: {respAsJson}", exc); } - try { _sdkClientUpdateSemaphore.WaitAsync(); @@ -1194,6 +1182,7 @@ void SyncSdkClientUpdate() RemoveStationProducers(sdkClientUpdate.StationName); RemoveStationConsumers(sdkClientUpdate.StationName); RemoveSchemaUpdateListener(sdkClientUpdate.StationName); + RemoveFunctionUpdateListenerAsync(sdkClientUpdate.StationName); break; default: break; diff --git a/src/Memphis.Client/MemphisClientFactory.cs b/src/Memphis.Client/MemphisClientFactory.cs index af1e19d..a5e722a 100644 --- a/src/Memphis.Client/MemphisClientFactory.cs +++ b/src/Memphis.Client/MemphisClientFactory.cs @@ -1,222 +1,213 @@ -using System; -using System.Runtime.CompilerServices; using System.Security.Cryptography.X509Certificates; -using System.Text.RegularExpressions; -using System.Threading; -using System.Threading.Tasks; -using Memphis.Client.Exception; -using NATS.Client; -using NATS.Client.JetStream; - -namespace Memphis.Client + +namespace Memphis.Client; + +public static class MemphisClientFactory { - public static class MemphisClientFactory + public static ClientOptions GetDefaultOptions() { - public static ClientOptions GetDefaultOptions() + return new ClientOptions { - return new ClientOptions - { - Port = 6666, - Reconnect = true, - MaxReconnect = 10, - MaxReconnectIntervalMs = 1_500, - TimeoutMs = (int)TimeSpan.FromSeconds(2).TotalMilliseconds, - AccountId = 1 - }; - } + Port = 6666, + Reconnect = true, + MaxReconnect = 10, + MaxReconnectIntervalMs = 1_500, + TimeoutMs = (int)TimeSpan.FromSeconds(2).TotalMilliseconds, + AccountId = 1 + }; + } - /// - /// Create Memphis Client - /// - /// Client Options used to customize behavior of client used to connect Memphis - /// An object connected to the Memphis server. - public static async Task CreateClient(ClientOptions opts, - CancellationToken cancellationToken = default) - { - if (XNOR(string.IsNullOrWhiteSpace(opts.ConnectionToken), - string.IsNullOrWhiteSpace(opts.Password))) - throw new MemphisException("You have to connect with one of the following methods: connection token / password"); + /// + /// Create Memphis Client + /// + /// Client Options used to customize behavior of client used to connect Memphis + /// An object connected to the Memphis server. + public static async Task CreateClient(ClientOptions opts, + CancellationToken cancellationToken = default) + { + if (XNOR(string.IsNullOrWhiteSpace(opts.ConnectionToken), + string.IsNullOrWhiteSpace(opts.Password))) + throw new MemphisException("You have to connect with one of the following methods: connection token / password"); - var connectionId = Guid.NewGuid().ToString(); + var connectionId = Guid.NewGuid().ToString(); - var brokerConnOptions = ConnectionFactory.GetDefaultOptions(); - brokerConnOptions.Servers = new[] { $"{NormalizeHost(opts.Host)}:{opts.Port}" }; - brokerConnOptions.AllowReconnect = opts.Reconnect; - brokerConnOptions.ReconnectWait = opts.MaxReconnectIntervalMs; - brokerConnOptions.Name = $"{connectionId}::{opts.Username}"; - brokerConnOptions.Timeout = opts.TimeoutMs; - brokerConnOptions.Verbose = true; + var brokerConnOptions = ConnectionFactory.GetDefaultOptions(); + brokerConnOptions.Servers = new[] { $"{NormalizeHost(opts.Host)}:{opts.Port}" }; + brokerConnOptions.AllowReconnect = opts.Reconnect; + brokerConnOptions.ReconnectWait = opts.MaxReconnectIntervalMs; + brokerConnOptions.Name = $"{connectionId}::{opts.Username}"; + brokerConnOptions.Timeout = opts.TimeoutMs; + brokerConnOptions.Verbose = true; - if (!string.IsNullOrWhiteSpace(opts.ConnectionToken)) - { - brokerConnOptions.Token = opts.ConnectionToken; - } - else - { - brokerConnOptions.User = $"{opts.Username}${opts.AccountId}"; - brokerConnOptions.Password = opts.Password; - } + if (!string.IsNullOrWhiteSpace(opts.ConnectionToken)) + { + brokerConnOptions.Token = opts.ConnectionToken; + } + else + { + brokerConnOptions.User = $"{opts.Username}${opts.AccountId}"; + brokerConnOptions.Password = opts.Password; + } - if (opts.Tls != null) + if (opts.Tls != null) + { + brokerConnOptions.Secure = true; + brokerConnOptions.CheckCertificateRevocation = true; + if (opts.Tls.Certificate is not null) { - brokerConnOptions.Secure = true; - brokerConnOptions.CheckCertificateRevocation = true; - if (opts.Tls.Certificate is not null) - { - brokerConnOptions.AddCertificate(opts.Tls.Certificate); - } - else if (!string.IsNullOrWhiteSpace(opts.Tls.FileName)) + brokerConnOptions.AddCertificate(opts.Tls.Certificate); + } + else if (!string.IsNullOrWhiteSpace(opts.Tls.FileName)) + { + if (!string.IsNullOrWhiteSpace(opts.Tls.Password)) { - if (!string.IsNullOrWhiteSpace(opts.Tls.Password)) - { - brokerConnOptions.AddCertificate(new X509Certificate2(opts.Tls.FileName, opts.Tls.Password)); - } - else - { - brokerConnOptions.AddCertificate(opts.Tls.FileName); - } + brokerConnOptions.AddCertificate(new X509Certificate2(opts.Tls.FileName, opts.Tls.Password)); } - - if (opts.Tls.RemoteCertificateValidationCallback is not null) + else { - brokerConnOptions.TLSRemoteCertificationValidationCallback = opts.Tls.RemoteCertificateValidationCallback; + brokerConnOptions.AddCertificate(opts.Tls.FileName); } } - try - { - - SuppressDefaultNatsEventHandlerLogs(brokerConnOptions); - ConfigureEventHandlers(brokerConnOptions, opts); - - - IConnection brokerConnection = await EstablishBrokerManagerConnection(brokerConnOptions, cancellationToken); - IJetStream jetStreamContext = brokerConnection.CreateJetStreamContext(); - MemphisClient client = new( - brokerConnOptions, brokerConnection, - jetStreamContext, connectionId); - await client.ListenForSdkClientUpdate(); - return client; - } - catch (System.Exception e) + if (opts.Tls.RemoteCertificateValidationCallback is not null) { - throw new MemphisConnectionException("error occurred, when connecting memphis", e); + brokerConnOptions.TLSRemoteCertificationValidationCallback = opts.Tls.RemoteCertificateValidationCallback; } + } - static void ConfigureEventHandlers(Options options, ClientOptions clientOptions) - { - if (clientOptions.ClosedEventHandler is null) - { - options.ClosedEventHandler += DefaultErrorHandler; - return; - } - options.ClosedEventHandler += (sender, args) - => clientOptions.ClosedEventHandler.Invoke(sender, args); - } + try + { - static void DefaultErrorHandler(object sender, ConnEventArgs args) - { - if (args is { Error: { } }) - { - Console.WriteLine(new MemphisException(args.Error.ToString()).ToString()); - } - } + SuppressDefaultNatsEventHandlerLogs(brokerConnOptions); + ConfigureEventHandlers(brokerConnOptions, opts); - static void SuppressDefaultNatsEventHandlerLogs(Options options) - { - options.ClosedEventHandler += (_, _) => { }; - options.ServerDiscoveredEventHandler += (_, _) => { }; - options.DisconnectedEventHandler += (_, _) => { }; - options.ReconnectedEventHandler += (_, _) => { }; - options.LameDuckModeEventHandler += (_, _) => { }; - options.AsyncErrorEventHandler += (_, _) => { }; - options.HeartbeatAlarmEventHandler += (_, _) => { }; - options.UnhandledStatusEventHandler += (_, _) => { }; - options.FlowControlProcessedEventHandler += (_, _) => { }; - } + + IConnection brokerConnection = await EstablishBrokerManagerConnection(brokerConnOptions, cancellationToken); + IJetStream jetStreamContext = brokerConnection.CreateJetStreamContext(); + MemphisClient client = new( + brokerConnOptions, brokerConnection, + jetStreamContext, connectionId); + await client.ListenForSdkClientUpdate(); + return client; + } + catch (System.Exception e) + { + throw new MemphisConnectionException("error occurred, when connecting memphis", e); } - /// - /// This method is used to connect to Memphis Broker. - /// It attempts to establish connection using accountId, and if it fails, it tries to connect using username(with out accountId). - /// - /// Broker Options used to customize behavior of client used to connect Memphis - /// An object connected to the Memphis server. - private static async Task EstablishBrokerManagerConnection(Options brokerOptions, CancellationToken cancellationToken = default) + static void ConfigureEventHandlers(Options options, ClientOptions clientOptions) { - if (string.IsNullOrWhiteSpace(brokerOptions.User)) + if (clientOptions.ClosedEventHandler is null) { - await DelayLocalConnection(brokerOptions.Servers); - return new ConnectionFactory() - .CreateConnection(brokerOptions); + options.ClosedEventHandler += DefaultErrorHandler; + return; } + options.ClosedEventHandler += (sender, args) + => clientOptions.ClosedEventHandler.Invoke(sender, args); + } - try - { - return new ConnectionFactory() - .CreateConnection(brokerOptions); - } - catch (System.Exception ex) + static void DefaultErrorHandler(object sender, ConnEventArgs args) + { + if (args is { Error: { } }) { - if (ex.Message.IndexOf("Authorization Violation", StringComparison.OrdinalIgnoreCase) >= 0) - { - var pattern = @"(?[^$]*)(?\$)(?.+)"; - if (Regex.Match(brokerOptions.User, pattern) is { Success: true } match) - { - await DelayLocalConnection(brokerOptions.Servers); - brokerOptions.User = match.Groups["username"].Value; - return new ConnectionFactory() - .CreateConnection(brokerOptions); - } - } - throw new MemphisException(ex.ToString()); + Console.WriteLine(new MemphisException(args.Error.ToString()).ToString()); } } - /// - /// Delay local connection for handling bad quality networks like port fwd - /// - /// List of servers - /// Task - private static async Task DelayLocalConnection(string[] servers) + static void SuppressDefaultNatsEventHandlerLogs(Options options) { - if (servers is { Length: > 0 } && IsLocalConnection(servers[0])) - await Task.Delay((int)TimeSpan.FromSeconds(1).TotalMilliseconds); + options.ClosedEventHandler += (_, _) => { }; + options.ServerDiscoveredEventHandler += (_, _) => { }; + options.DisconnectedEventHandler += (_, _) => { }; + options.ReconnectedEventHandler += (_, _) => { }; + options.LameDuckModeEventHandler += (_, _) => { }; + options.AsyncErrorEventHandler += (_, _) => { }; + options.HeartbeatAlarmEventHandler += (_, _) => { }; + options.UnhandledStatusEventHandler += (_, _) => { }; + options.FlowControlProcessedEventHandler += (_, _) => { }; } + } - /// - /// Check if connection is local - /// - /// Host - /// True if connection is local, otherwise false - private static bool IsLocalConnection(string host) + /// + /// This method is used to connect to Memphis Broker. + /// It attempts to establish connection using accountId, and if it fails, it tries to connect using username(with out accountId). + /// + /// Broker Options used to customize behavior of client used to connect Memphis + /// An object connected to the Memphis server. + private static async Task EstablishBrokerManagerConnection(Options brokerOptions, CancellationToken cancellationToken = default) + { + if (string.IsNullOrWhiteSpace(brokerOptions.User)) { - return - !string.IsNullOrWhiteSpace(host) && - host.Contains("localhost"); + await DelayLocalConnection(brokerOptions.Servers); + return new ConnectionFactory() + .CreateConnection(brokerOptions); } - /// - /// XNOR operator - /// - /// First boolean value - /// Second boolean value - /// True if both values are equal, otherwise false - private static bool XNOR(bool a, bool b) - => a == b; - - /// - /// Normalize host - /// - /// - /// Remove http:// or https:// from host - /// - /// Host - /// Normalized host - internal static string NormalizeHost(string host) + try + { + return new ConnectionFactory() + .CreateConnection(brokerOptions); + } + catch (System.Exception ex) { - return Regex.Replace(host, "^http(s?)://", string.Empty); + if (ex.Message.IndexOf("Authorization Violation", StringComparison.OrdinalIgnoreCase) >= 0) + { + var pattern = @"(?[^$]*)(?\$)(?.+)"; + if (Regex.Match(brokerOptions.User, pattern) is { Success: true } match) + { + await DelayLocalConnection(brokerOptions.Servers); + brokerOptions.User = match.Groups["username"].Value; + return new ConnectionFactory() + .CreateConnection(brokerOptions); + } + } + throw new MemphisException(ex.ToString()); } } + + /// + /// Delay local connection for handling bad quality networks like port fwd + /// + /// List of servers + /// Task + private static async Task DelayLocalConnection(string[] servers) + { + if (servers is { Length: > 0 } && IsLocalConnection(servers[0])) + await Task.Delay((int)TimeSpan.FromSeconds(1).TotalMilliseconds); + } + + /// + /// Check if connection is local + /// + /// Host + /// True if connection is local, otherwise false + private static bool IsLocalConnection(string host) + { + return + !string.IsNullOrWhiteSpace(host) && + host.Contains("localhost"); + } + + /// + /// XNOR operator + /// + /// First boolean value + /// Second boolean value + /// True if both values are equal, otherwise false + private static bool XNOR(bool a, bool b) + => a == b; + + /// + /// Normalize host + /// + /// + /// Remove http:// or https:// from host + /// + /// Host + /// Normalized host + internal static string NormalizeHost(string host) + { + return Regex.Replace(host, "^http(s?)://", string.Empty); + } } \ No newline at end of file diff --git a/src/Memphis.Client/Models/Request/AttachSchemaRequest.cs b/src/Memphis.Client/Models/Request/AttachSchemaRequest.cs index 2752753..df0c65d 100644 --- a/src/Memphis.Client/Models/Request/AttachSchemaRequest.cs +++ b/src/Memphis.Client/Models/Request/AttachSchemaRequest.cs @@ -1,17 +1,14 @@ -using System.Runtime.Serialization; - #nullable disable -namespace Memphis.Client.Models.Request +namespace Memphis.Client.Models.Request; + +[DataContract] +internal sealed class AttachSchemaRequest { - [DataContract] - internal sealed class AttachSchemaRequest - { - [DataMember(Name = "name")] - public string SchemaName { get; set; } + [DataMember(Name = "name")] + public string SchemaName { get; set; } - [DataMember(Name = "station_name")] - public string StationName { get; set; } - [DataMember(Name = "username")] - public string UserName { get; set; } - } + [DataMember(Name = "station_name")] + public string StationName { get; set; } + [DataMember(Name = "username")] + public string UserName { get; set; } } \ No newline at end of file diff --git a/src/Memphis.Client/Models/Request/CreateConsumerRequest.cs b/src/Memphis.Client/Models/Request/CreateConsumerRequest.cs index f5b5d64..f2bd65d 100644 --- a/src/Memphis.Client/Models/Request/CreateConsumerRequest.cs +++ b/src/Memphis.Client/Models/Request/CreateConsumerRequest.cs @@ -1,46 +1,43 @@ -using System.Runtime.Serialization; - #nullable disable -namespace Memphis.Client.Models.Request +namespace Memphis.Client.Models.Request; + +[DataContract] +internal sealed class CreateConsumerRequest { - [DataContract] - internal sealed class CreateConsumerRequest - { - [DataMember(Name = "name")] - public string ConsumerName { get; set; } - - [DataMember(Name = "station_name")] - public string StationName { get; set; } - - [DataMember(Name = "connection_id")] - public string ConnectionId { get; set; } - - [DataMember(Name = "consumer_type")] - public string ConsumerType { get; set; } - - [DataMember(Name = "consumers_group")] - public string ConsumerGroup { get; set; } - - [DataMember(Name = "max_ack_time_ms")] - public int MaxAckTimeMs { get; set; } - - [DataMember(Name = "max_msg_deliveries")] - public int MaxMsgCountForDelivery { get; set; } - - [DataMember(Name = "username")] - public string UserName { get; set; } - - [DataMember(Name = "start_consume_from_sequence")] - public int StartConsumeFromSequence { get; set; } - - [DataMember(Name = "last_messages")] - public int LastMessages { get; set; } + [DataMember(Name = "name")] + public string ConsumerName { get; set; } + + [DataMember(Name = "station_name")] + public string StationName { get; set; } + + [DataMember(Name = "connection_id")] + public string ConnectionId { get; set; } + + [DataMember(Name = "consumer_type")] + public string ConsumerType { get; set; } + + [DataMember(Name = "consumers_group")] + public string ConsumerGroup { get; set; } + + [DataMember(Name = "max_ack_time_ms")] + public int MaxAckTimeMs { get; set; } + + [DataMember(Name = "max_msg_deliveries")] + public int MaxMsgCountForDelivery { get; set; } + + [DataMember(Name = "username")] + public string UserName { get; set; } + + [DataMember(Name = "start_consume_from_sequence")] + public int StartConsumeFromSequence { get; set; } + + [DataMember(Name = "last_messages")] + public int LastMessages { get; set; } - [DataMember(Name = "req_version")] - public int RequestVersion { get; set; } + [DataMember(Name = "req_version")] + public int RequestVersion { get; set; } - [DataMember(Name = "app_id")] - public string ApplicationId { get; set; } - } + [DataMember(Name = "app_id")] + public string ApplicationId { get; set; } } \ No newline at end of file diff --git a/src/Memphis.Client/Models/Request/CreateProducerRequest.cs b/src/Memphis.Client/Models/Request/CreateProducerRequest.cs index 60a8091..ac2b6d7 100644 --- a/src/Memphis.Client/Models/Request/CreateProducerRequest.cs +++ b/src/Memphis.Client/Models/Request/CreateProducerRequest.cs @@ -1,33 +1,30 @@ -using System.Runtime.Serialization; - #nullable disable -namespace Memphis.Client.Models.Request +namespace Memphis.Client.Models.Request; + +[DataContract] +internal sealed class CreateProducerRequest { - [DataContract] - internal sealed class CreateProducerRequest - { - [DataMember(Name = "name")] - public string ProducerName { get; set; } + [DataMember(Name = "name")] + public string ProducerName { get; set; } - [DataMember(Name = "station_name")] - public string StationName { get; set; } + [DataMember(Name = "station_name")] + public string StationName { get; set; } - [DataMember(Name = "connection_id")] - public string ConnectionId { get; set; } + [DataMember(Name = "connection_id")] + public string ConnectionId { get; set; } - [DataMember(Name = "producer_type")] - public string ProducerType { get; set; } + [DataMember(Name = "producer_type")] + public string ProducerType { get; set; } - [DataMember(Name = "req_version")] - public int RequestVersion { get; set; } + [DataMember(Name = "req_version")] + public int RequestVersion { get; set; } - [DataMember(Name = "username")] - public string UserName { get; set; } + [DataMember(Name = "username")] + public string UserName { get; set; } - [DataMember(Name = "app_id")] - public string ApplicationId { get; set; } + [DataMember(Name = "app_id")] + public string ApplicationId { get; set; } - - } + } \ No newline at end of file diff --git a/src/Memphis.Client/Models/Request/CreateSchemaRequest.cs b/src/Memphis.Client/Models/Request/CreateSchemaRequest.cs index be16f41..66b0f3c 100644 --- a/src/Memphis.Client/Models/Request/CreateSchemaRequest.cs +++ b/src/Memphis.Client/Models/Request/CreateSchemaRequest.cs @@ -1,6 +1,4 @@ -using System.Runtime.Serialization; - -namespace Memphis.Client.Models.Request; +namespace Memphis.Client.Models.Request; #nullable disable diff --git a/src/Memphis.Client/Models/Request/CreateStationRequest.cs b/src/Memphis.Client/Models/Request/CreateStationRequest.cs index 1dba0a6..b87891a 100644 --- a/src/Memphis.Client/Models/Request/CreateStationRequest.cs +++ b/src/Memphis.Client/Models/Request/CreateStationRequest.cs @@ -1,5 +1,3 @@ -using System.Runtime.Serialization; - #nullable disable namespace Memphis.Client.Models.Request; diff --git a/src/Memphis.Client/Models/Request/DetachSchemaRequest.cs b/src/Memphis.Client/Models/Request/DetachSchemaRequest.cs index de07588..de65dba 100644 --- a/src/Memphis.Client/Models/Request/DetachSchemaRequest.cs +++ b/src/Memphis.Client/Models/Request/DetachSchemaRequest.cs @@ -1,15 +1,12 @@ -using System.Runtime.Serialization; - #nullable disable -namespace Memphis.Client.Models.Request +namespace Memphis.Client.Models.Request; + +[DataContract] +internal sealed class DetachSchemaRequest { - [DataContract] - internal sealed class DetachSchemaRequest - { - [DataMember(Name = "station_name")] - public string StationName { get; set; } - [DataMember(Name = "username")] - public string UserName { get; set; } - } + [DataMember(Name = "station_name")] + public string StationName { get; set; } + [DataMember(Name = "username")] + public string UserName { get; set; } } \ No newline at end of file diff --git a/src/Memphis.Client/Models/Request/DlsMessage.cs b/src/Memphis.Client/Models/Request/DlsMessage.cs index 551be41..b0cbae3 100644 --- a/src/Memphis.Client/Models/Request/DlsMessage.cs +++ b/src/Memphis.Client/Models/Request/DlsMessage.cs @@ -1,41 +1,37 @@ -using System.Collections.Generic; -using System.Runtime.Serialization; - #nullable disable -namespace Memphis.Client.Models.Request +namespace Memphis.Client.Models.Request; + +[DataContract] +internal class ProducerDetails { - [DataContract] - internal class ProducerDetails - { - [DataMember(Name = "name")] - public string Name { get; set; } - [DataMember(Name = "connection_id")] - public string ConnectionId { get; set; } - } + [DataMember(Name = "name")] + public string Name { get; set; } + [DataMember(Name = "connection_id")] + public string ConnectionId { get; set; } +} - [DataContract] - internal class MessagePayloadDls - { - [DataMember(Name = "size")] - public int Size { get; set; } - [DataMember(Name = "data")] - public string Data { get; set; } - [DataMember(Name = "headers")] - public Dictionary Headers { get; set; } - } +[DataContract] +internal class MessagePayloadDls +{ + [DataMember(Name = "size")] + public int Size { get; set; } + [DataMember(Name = "data")] + public string Data { get; set; } + [DataMember(Name = "headers")] + public Dictionary Headers { get; set; } +} - [DataContract] - internal class DlsMessage - { - [DataMember(Name = "station_name")] - public string StationName { get; set; } - [DataMember(Name = "producer")] - public ProducerDetails Producer { get; set; } - [DataMember(Name = "message")] - public MessagePayloadDls Message { get; set; } +[DataContract] +internal class DlsMessage +{ + [DataMember(Name = "station_name")] + public string StationName { get; set; } + [DataMember(Name = "producer")] + public ProducerDetails Producer { get; set; } + [DataMember(Name = "message")] + public MessagePayloadDls Message { get; set; } - [DataMember(Name = "validation_error")] - public string ValidationError { get; set; } - } + [DataMember(Name = "validation_error")] + public string ValidationError { get; set; } } \ No newline at end of file diff --git a/src/Memphis.Client/Models/Request/NotificationRequest.cs b/src/Memphis.Client/Models/Request/NotificationRequest.cs index ba2901c..431146d 100644 --- a/src/Memphis.Client/Models/Request/NotificationRequest.cs +++ b/src/Memphis.Client/Models/Request/NotificationRequest.cs @@ -1,23 +1,20 @@ -using System.Runtime.Serialization; - #nullable disable -namespace Memphis.Client.Models.Request +namespace Memphis.Client.Models.Request; + +[DataContract] +internal sealed class NotificationRequest { - [DataContract] - internal sealed class NotificationRequest - { - [DataMember(Name = "title", IsRequired = true)] - public string Title { get; set; } - - [DataMember(Name = "msg", IsRequired = true)] - public string Message { get; set; } - - [DataMember(Name = "type", IsRequired = true)] - public string Type { get; set; } + [DataMember(Name = "title", IsRequired = true)] + public string Title { get; set; } + + [DataMember(Name = "msg", IsRequired = true)] + public string Message { get; set; } + + [DataMember(Name = "type", IsRequired = true)] + public string Type { get; set; } - [DataMember(Name = "code")] - public string Code { get; set; } + [DataMember(Name = "code")] + public string Code { get; set; } - } } \ No newline at end of file diff --git a/src/Memphis.Client/Models/Request/PmAckMsg.cs b/src/Memphis.Client/Models/Request/PmAckMsg.cs index c2b4f85..c113842 100644 --- a/src/Memphis.Client/Models/Request/PmAckMsg.cs +++ b/src/Memphis.Client/Models/Request/PmAckMsg.cs @@ -1,16 +1,13 @@ -using System.Runtime.Serialization; +#nullable disable -#nullable disable +namespace Memphis.Client.Models.Request; -namespace Memphis.Client.Models.Request +[DataContract] +internal sealed class PmAckMsg { - [DataContract] - internal sealed class PmAckMsg - { - [DataMember(Name = "id")] - public string Id { get; set; } - - [DataMember(Name = "cg_name")] - public string ConsumerGroupName { get; set; } - } + [DataMember(Name = "id")] + public string Id { get; set; } + + [DataMember(Name = "cg_name")] + public string ConsumerGroupName { get; set; } } \ No newline at end of file diff --git a/src/Memphis.Client/Models/Request/RemoveConsumerRequest.cs b/src/Memphis.Client/Models/Request/RemoveConsumerRequest.cs index 083897b..0fc0c85 100644 --- a/src/Memphis.Client/Models/Request/RemoveConsumerRequest.cs +++ b/src/Memphis.Client/Models/Request/RemoveConsumerRequest.cs @@ -1,26 +1,23 @@ -using System.Runtime.Serialization; - #nullable disable -namespace Memphis.Client.Models.Request +namespace Memphis.Client.Models.Request; + +[DataContract] +internal sealed class RemoveConsumerRequest { - [DataContract] - internal sealed class RemoveConsumerRequest - { - [DataMember(Name = "name")] - public string ConsumerName { get; set; } + [DataMember(Name = "name")] + public string ConsumerName { get; set; } - [DataMember(Name = "station_name")] - public string StationName { get; set; } + [DataMember(Name = "station_name")] + public string StationName { get; set; } - [DataMember(Name = "connection_id")] - public string ConnectionId { get; set; } + [DataMember(Name = "connection_id")] + public string ConnectionId { get; set; } - [DataMember(Name = "username")] - public string Username { get; set; } + [DataMember(Name = "username")] + public string Username { get; set; } - [DataMember(Name = "req_version")] - public int RequestVersion { get; set; } + [DataMember(Name = "req_version")] + public int RequestVersion { get; set; } - } } \ No newline at end of file diff --git a/src/Memphis.Client/Models/Request/RemoveProducerRequest.cs b/src/Memphis.Client/Models/Request/RemoveProducerRequest.cs index 54bad68..20328a6 100644 --- a/src/Memphis.Client/Models/Request/RemoveProducerRequest.cs +++ b/src/Memphis.Client/Models/Request/RemoveProducerRequest.cs @@ -1,25 +1,22 @@ -using System.Runtime.Serialization; - #nullable disable -namespace Memphis.Client.Models.Request +namespace Memphis.Client.Models.Request; + +[DataContract] +internal sealed class RemoveProducerRequest { - [DataContract] - internal sealed class RemoveProducerRequest - { - [DataMember(Name = "name")] - public string ProducerName { get; set; } + [DataMember(Name = "name")] + public string ProducerName { get; set; } - [DataMember(Name = "station_name")] - public string StationName { get; set; } + [DataMember(Name = "station_name")] + public string StationName { get; set; } - [DataMember(Name = "connection_id")] - public string ConnectionId { get; set; } + [DataMember(Name = "connection_id")] + public string ConnectionId { get; set; } - [DataMember(Name = "username")] - public string Username { get; set; } + [DataMember(Name = "username")] + public string Username { get; set; } - [DataMember(Name = "req_version")] - public int RequestVersion { get; set; } - } + [DataMember(Name = "req_version")] + public int RequestVersion { get; set; } } \ No newline at end of file diff --git a/src/Memphis.Client/Models/Request/RemoveStationRequest.cs b/src/Memphis.Client/Models/Request/RemoveStationRequest.cs index c8e4a0a..d079f09 100644 --- a/src/Memphis.Client/Models/Request/RemoveStationRequest.cs +++ b/src/Memphis.Client/Models/Request/RemoveStationRequest.cs @@ -1,15 +1,12 @@ -using System.Runtime.Serialization; - #nullable disable -namespace Memphis.Client.Models.Request +namespace Memphis.Client.Models.Request; + +[DataContract] +internal class RemoveStationRequest { - [DataContract] - internal class RemoveStationRequest - { - [DataMember(Name = "station_name")] - public string StationName { get; set; } - [DataMember(Name = "username")] - public string Username { get; set; } - } + [DataMember(Name = "station_name")] + public string StationName { get; set; } + [DataMember(Name = "username")] + public string Username { get; set; } } \ No newline at end of file diff --git a/src/Memphis.Client/Models/Response/CreateConsumerResponse.cs b/src/Memphis.Client/Models/Response/CreateConsumerResponse.cs index 12e80d5..7d3d9f5 100644 --- a/src/Memphis.Client/Models/Response/CreateConsumerResponse.cs +++ b/src/Memphis.Client/Models/Response/CreateConsumerResponse.cs @@ -1,6 +1,4 @@ -using System.Runtime.Serialization; - -namespace Memphis.Client.Models.Response; +namespace Memphis.Client.Models.Response; #nullable disable diff --git a/src/Memphis.Client/Models/Response/CreateProducerResponse.cs b/src/Memphis.Client/Models/Response/CreateProducerResponse.cs index 4ac561b..e4ae75f 100644 --- a/src/Memphis.Client/Models/Response/CreateProducerResponse.cs +++ b/src/Memphis.Client/Models/Response/CreateProducerResponse.cs @@ -1,4 +1,3 @@ -using System.Runtime.Serialization; #nullable disable namespace Memphis.Client.Models.Response; @@ -20,10 +19,21 @@ internal sealed class CreateProducerResponse [DataMember(Name = "send_notification")] public bool SendNotification { get; set; } - + [DataMember(Name = "schemaverse_to_dls")] public bool SchemaVerseToDls { get; set; } [DataMember(Name = "partitions_update")] public PartitionsUpdate PartitionsUpdate { get; set; } + + [DataMember(Name = "station_version")] + public int StationVersion { get; set; } + + [DataMember(Name = "station_partitions_first_functions")] + public Dictionary StationPartitionsFirstFunctions { get; set; } + + public CreateProducerResponse() + { + StationPartitionsFirstFunctions = new(); + } } \ No newline at end of file diff --git a/src/Memphis.Client/Models/Response/CreateSchemaResponse.cs b/src/Memphis.Client/Models/Response/CreateSchemaResponse.cs index 1c9644c..26a3c89 100644 --- a/src/Memphis.Client/Models/Response/CreateSchemaResponse.cs +++ b/src/Memphis.Client/Models/Response/CreateSchemaResponse.cs @@ -1,6 +1,4 @@ -using System.Runtime.Serialization; - -namespace Memphis.Client.Models.Response; +namespace Memphis.Client.Models.Response; #nullable disable [DataContract] diff --git a/src/Memphis.Client/Models/Response/FunctionsDetails.cs b/src/Memphis.Client/Models/Response/FunctionsDetails.cs new file mode 100644 index 0000000..bb554b1 --- /dev/null +++ b/src/Memphis.Client/Models/Response/FunctionsDetails.cs @@ -0,0 +1,25 @@ +namespace Memphis.Client; + +[DataContract] +internal class FunctionsUpdate +{ + [DataMember(Name = "functions")] + public Dictionary Functions { get; set; } + public FunctionsUpdate() + { + Functions = new(); + } +} + + +[DataContract] +internal class FunctionsDetails +{ + [DataMember(Name = "partitions_functions")] + public Dictionary PartitionsFunctions { get; set; } + + public FunctionsDetails() + { + PartitionsFunctions = new(); + } +} diff --git a/src/Memphis.Client/Models/Response/ProducerSchemaUpdate.cs b/src/Memphis.Client/Models/Response/ProducerSchemaUpdate.cs index da25767..72ceb27 100644 --- a/src/Memphis.Client/Models/Response/ProducerSchemaUpdate.cs +++ b/src/Memphis.Client/Models/Response/ProducerSchemaUpdate.cs @@ -1,22 +1,19 @@ -using System.Runtime.Serialization; +#nullable disable -#nullable disable +namespace Memphis.Client.Models.Response; -namespace Memphis.Client.Models.Response +[DataContract] +internal sealed class ProducerSchemaUpdate { - [DataContract] - internal sealed class ProducerSchemaUpdate - { - [DataMember(Name = "UpdateType")] - public string UpdateType { get; set; } + [DataMember(Name = "UpdateType")] + public string UpdateType { get; set; } - [DataMember(Name = "init")] - public SchemaUpdateInit Init { get; set; } - } + [DataMember(Name = "init")] + public SchemaUpdateInit Init { get; set; } +} - internal static class ProducerSchemaUpdateType - { - public const int SCHEMA_UPDATE_TYPE_INIT = 1; - public const int SCHEMA_UPDATE_TYPE_DROP = 2; - } +internal static class ProducerSchemaUpdateType +{ + public const int SCHEMA_UPDATE_TYPE_INIT = 1; + public const int SCHEMA_UPDATE_TYPE_DROP = 2; } \ No newline at end of file diff --git a/src/Memphis.Client/Models/Response/ProducerSchemaUpdateVersion.cs b/src/Memphis.Client/Models/Response/ProducerSchemaUpdateVersion.cs index 4278b60..fa4b283 100644 --- a/src/Memphis.Client/Models/Response/ProducerSchemaUpdateVersion.cs +++ b/src/Memphis.Client/Models/Response/ProducerSchemaUpdateVersion.cs @@ -1,22 +1,19 @@ -using System.Runtime.Serialization; - #nullable disable -namespace Memphis.Client.Models.Response +namespace Memphis.Client.Models.Response; + +[DataContract] +internal sealed class ProducerSchemaUpdateVersion { - [DataContract] - internal sealed class ProducerSchemaUpdateVersion - { - [DataMember(Name = "version_number")] - public string VersionNumber { get; set; } - - [DataMember(Name = "descriptor")] - public string Descriptor { get; set; } - - [DataMember(Name = "schema_content")] - public string Content { get; set; } - - [DataMember(Name = "message_struct_name")] - public string MessageStructName { get; set; } - } + [DataMember(Name = "version_number")] + public string VersionNumber { get; set; } + + [DataMember(Name = "descriptor")] + public string Descriptor { get; set; } + + [DataMember(Name = "schema_content")] + public string Content { get; set; } + + [DataMember(Name = "message_struct_name")] + public string MessageStructName { get; set; } } \ No newline at end of file diff --git a/src/Memphis.Client/Models/Response/SchemaUpdateInit.cs b/src/Memphis.Client/Models/Response/SchemaUpdateInit.cs index d215616..6756edd 100644 --- a/src/Memphis.Client/Models/Response/SchemaUpdateInit.cs +++ b/src/Memphis.Client/Models/Response/SchemaUpdateInit.cs @@ -1,5 +1,3 @@ -using System.Runtime.Serialization; - #nullable disable namespace Memphis.Client.Models.Response; diff --git a/src/Memphis.Client/Models/Response/SdkClientsUpdate.cs b/src/Memphis.Client/Models/Response/SdkClientsUpdate.cs index de0a3cc..32b8dad 100644 --- a/src/Memphis.Client/Models/Response/SdkClientsUpdate.cs +++ b/src/Memphis.Client/Models/Response/SdkClientsUpdate.cs @@ -1,20 +1,16 @@ - -using System.Runtime.Serialization; - #nullable disable -namespace Memphis.Client.Models.Response +namespace Memphis.Client.Models.Response; + +[DataContract] +internal class SdkClientsUpdate { - [DataContract] - internal class SdkClientsUpdate - { - [DataMember(Name = "station_name")] - public string StationName { get; set; } + [DataMember(Name = "station_name")] + public string StationName { get; set; } - [DataMember(Name = "type")] - public string Type { get; set; } + [DataMember(Name = "type")] + public string Type { get; set; } - [DataMember(Name = "update")] - public bool? Update { get; set; } - } + [DataMember(Name = "update")] + public bool? Update { get; set; } } \ No newline at end of file diff --git a/src/Memphis.Client/Options.cs b/src/Memphis.Client/Options.cs index 64c1175..d086358 100644 --- a/src/Memphis.Client/Options.cs +++ b/src/Memphis.Client/Options.cs @@ -1,4 +1,3 @@ -using System; using System.Net.Security; using System.Security.Cryptography.X509Certificates; diff --git a/src/Memphis.Client/Producer/IMemphisProducer.cs b/src/Memphis.Client/Producer/IMemphisProducer.cs index 768ed03..8510c1b 100644 --- a/src/Memphis.Client/Producer/IMemphisProducer.cs +++ b/src/Memphis.Client/Producer/IMemphisProducer.cs @@ -1,8 +1,4 @@ -using System; -using System.Collections.Specialized; -using System.Threading.Tasks; - -namespace Memphis.Client.Producer; +namespace Memphis.Client.Producer; public interface IMemphisProducer { diff --git a/src/Memphis.Client/Producer/MemphisProducer.cs b/src/Memphis.Client/Producer/MemphisProducer.cs index 2a582d5..918718c 100644 --- a/src/Memphis.Client/Producer/MemphisProducer.cs +++ b/src/Memphis.Client/Producer/MemphisProducer.cs @@ -1,18 +1,6 @@ -using System; -using System.Collections.Generic; -using System.Collections.Specialized; -using System.Text; -using System.Threading; -using System.Threading.Tasks; -using Memphis.Client.Constants; -using Memphis.Client.Exception; using Memphis.Client.Helper; using Memphis.Client.Models.Request; using Memphis.Client.Station; -using NATS.Client; -using NATS.Client.Internals; -using NATS.Client.JetStream; -using Newtonsoft.Json; #pragma warning disable CS8602 // Possible null reference argument. @@ -26,7 +14,6 @@ public sealed class MemphisProducer : IMemphisProducer internal string StationName { get => _stationName; } internal string ProducerName { get => _producerName; } - private readonly string _realName; private readonly string _producerName; private readonly string _stationName; @@ -83,38 +70,36 @@ internal async Task ProduceToBrokerAsync( await EnsureMessageIsValid(message, headers); string streamName = _internalStationName; - if (_memphisClient.StationPartitions.TryGetValue(_internalStationName, out var partitions)) + if (_memphisClient.StationPartitions.TryGetValue(_internalStationName, out var partitions) && + partitions != null && partitions.PartitionsList != null) { - if (partitions != null && partitions.PartitionsList != null) + if (partitions.PartitionsList.Length == 1) { - if (partitions.PartitionsList.Length == 1) - { - streamName = $"{_internalStationName}${partitions.PartitionsList[0]}"; - } - else if (partitions.PartitionsList.Length > 1) + streamName = $"{_internalStationName}${partitions.PartitionsList[0]}"; + } + else if (partitions.PartitionsList.Length > 1) + { + if (partitionNumber > 0 && !string.IsNullOrWhiteSpace(partitionKey)) + throw new MemphisException("Can not use both partition number and partition key"); + + int partition = PartitionResolver.Resolve(); + + if (!string.IsNullOrWhiteSpace(partitionKey)) + partition = _memphisClient.GetPartitionFromKey(partitionKey, _internalStationName); + + else if (partitionNumber > 0) { - if(partitionNumber > 0 && !string.IsNullOrWhiteSpace(partitionKey)) - throw new MemphisException("Can not use both partition number and partition key"); - - int partition = PartitionResolver.Resolve(); - - if(!string.IsNullOrWhiteSpace(partitionKey)) - partition = _memphisClient.GetPartitionFromKey(partitionKey, _internalStationName); - - else if(partitionNumber > 0) - { - _memphisClient.EnsurePartitionNumberIsValid(partitionNumber, _internalStationName); - partition = partitionNumber; - } - - streamName = $"{_internalStationName}${partition}"; + _memphisClient.EnsurePartitionNumberIsValid(partitionNumber, _internalStationName); + partition = partitionNumber; } + + streamName = $"{_internalStationName}${partition}"; } } var msg = new Msg { - Subject = $"{streamName}.final", + Subject = FullSubjectName(), Data = message, Header = new MsgHeader { @@ -152,7 +137,7 @@ internal async Task ProduceToBrokerAsync( throw new MemphisException(publishAck.ErrorDescription); } } - catch (NATS.Client.NATSNoRespondersException) + catch (NATSNoRespondersException) { /// /// This exception is thrown when there are no station available to produce the message. @@ -174,6 +159,18 @@ async Task ReInitializeProducerAndRetry(byte[] message, NameValueCollection head { await _memphisClient.ProduceAsync(this, message, headers, ackWaitMs, asyncProduceAck, messageId, partitionKey); } + + string FullSubjectName() + { + string partitionString = streamName.Split('$')[1]; + int partitionNumber = Convert.ToInt32(partitionString); + if (_memphisClient.FunctionDetails.TryGetValue(_internalStationName, out var functionDetails) && + functionDetails.PartitionsFunctions.TryGetValue(partitionNumber, out var functionId)) + { + return $"{streamName}.functions.{functionId}"; + } + return $"{streamName}.final"; + } } diff --git a/src/Memphis.Client/Producer/MemphisProducerOptions.cs b/src/Memphis.Client/Producer/MemphisProducerOptions.cs index 4e08bcd..6e4ff03 100644 --- a/src/Memphis.Client/Producer/MemphisProducerOptions.cs +++ b/src/Memphis.Client/Producer/MemphisProducerOptions.cs @@ -1,6 +1,4 @@ -using System; - -#nullable disable +#nullable disable namespace Memphis.Client.Producer; diff --git a/src/Memphis.Client/Station/IMemphisStation.cs b/src/Memphis.Client/Station/IMemphisStation.cs index eb1e1f9..9acb84d 100644 --- a/src/Memphis.Client/Station/IMemphisStation.cs +++ b/src/Memphis.Client/Station/IMemphisStation.cs @@ -1,6 +1,4 @@ -using System.Threading.Tasks; - -namespace Memphis.Client.Station; +namespace Memphis.Client.Station; public interface IMemphisStation { diff --git a/src/Memphis.Client/Station/MemphisStation.cs b/src/Memphis.Client/Station/MemphisStation.cs index 579a950..d6d9977 100644 --- a/src/Memphis.Client/Station/MemphisStation.cs +++ b/src/Memphis.Client/Station/MemphisStation.cs @@ -1,6 +1,4 @@ -using System; -using System.Threading.Tasks; -using Memphis.Client.Helper; +using Memphis.Client.Helper; namespace Memphis.Client.Station; diff --git a/src/Memphis.Client/Station/StationPartitionResolver.cs b/src/Memphis.Client/Station/StationPartitionResolver.cs index c762727..d310197 100644 --- a/src/Memphis.Client/Station/StationPartitionResolver.cs +++ b/src/Memphis.Client/Station/StationPartitionResolver.cs @@ -1,7 +1,4 @@ -using System; -using System.Threading; - -namespace Memphis.Client.Station; +namespace Memphis.Client.Station; internal enum StationPartitionResolverType { diff --git a/src/Memphis.Client/Validators/AvroValidator.cs b/src/Memphis.Client/Validators/AvroValidator.cs index 2fc23a1..ef5d931 100644 --- a/src/Memphis.Client/Validators/AvroValidator.cs +++ b/src/Memphis.Client/Validators/AvroValidator.cs @@ -1,9 +1,4 @@ -using System.Threading.Tasks; -using SolTechnology.Avro; - -using Memphis.Client.Exception; -using System.Runtime.Serialization; -using Newtonsoft.Json; +using SolTechnology.Avro; namespace Memphis.Client.Validators; diff --git a/src/Memphis.Client/Validators/GraphqlValidator.cs b/src/Memphis.Client/Validators/GraphqlValidator.cs index 37e0c34..791d8b5 100644 --- a/src/Memphis.Client/Validators/GraphqlValidator.cs +++ b/src/Memphis.Client/Validators/GraphqlValidator.cs @@ -1,80 +1,74 @@ -using System; -using System.Collections.Concurrent; -using System.Linq; -using System.Text; -using System.Threading.Tasks; +using System.Linq; using GraphQL; using GraphQL.Types; -using Memphis.Client.Exception; -namespace Memphis.Client.Validators +namespace Memphis.Client.Validators; + +internal class GraphqlValidator : ISchemaValidator { - internal class GraphqlValidator : ISchemaValidator - { - private readonly IDocumentExecuter _documentExecutor; - private readonly ConcurrentDictionary _schemaCache; + private readonly IDocumentExecuter _documentExecutor; + private readonly ConcurrentDictionary _schemaCache; - public GraphqlValidator() - { - this._documentExecutor = new DocumentExecuter(); - this._schemaCache = new ConcurrentDictionary(); - } + public GraphqlValidator() + { + this._documentExecutor = new DocumentExecuter(); + this._schemaCache = new ConcurrentDictionary(); + } - public async Task ValidateAsync(byte[] messageToValidate, string schemaName) + public async Task ValidateAsync(byte[] messageToValidate, string schemaName) + { + if (_schemaCache.TryGetValue(schemaName, out ISchema schemaObj)) { - if (_schemaCache.TryGetValue(schemaName, out ISchema schemaObj)) + var queryToValidate = Encoding.UTF8.GetString(messageToValidate); + var execResult = await _documentExecutor.ExecuteAsync(_ => { - var queryToValidate = Encoding.UTF8.GetString(messageToValidate); - var execResult = await _documentExecutor.ExecuteAsync(_ => - { - _.Schema = schemaObj; - _.Query = queryToValidate; - }); + _.Schema = schemaObj; + _.Query = queryToValidate; + }); - if (execResult.Errors?.Count > 0) - { - var errMsg = string.Join("; ", execResult.Errors? - .Select(err => $"Code: {err.Code}, Message: {err.Message}")); - - throw new MemphisSchemaValidationException($"Schema validation has failed: \n {errMsg}"); - } + if (execResult.Errors?.Count > 0) + { + var errMsg = string.Join("; ", execResult.Errors? + .Select(err => $"Code: {err.Code}, Message: {err.Message}")); - return; + throw new MemphisSchemaValidationException($"Schema validation has failed: \n {errMsg}"); } - throw new MemphisSchemaValidationException($"Schema: {schemaName} not found in local cache"); + return; } - public bool ParseAndStore(string schemeName, string schemaData) + throw new MemphisSchemaValidationException($"Schema: {schemaName} not found in local cache"); + } + + public bool ParseAndStore(string schemeName, string schemaData) + { + if (string.IsNullOrEmpty(schemeName)) { - if (string.IsNullOrEmpty(schemeName)) - { - throw new ArgumentException($"Invalid value provided for {schemeName}"); - } + throw new ArgumentException($"Invalid value provided for {schemeName}"); + } - if (string.IsNullOrEmpty(schemaData)) - { - throw new ArgumentException($"Invalid value provided for {schemaData}"); - } + if (string.IsNullOrEmpty(schemaData)) + { + throw new ArgumentException($"Invalid value provided for {schemaData}"); + } - try - { - var newSchema = Schema.For(schemaData); - _schemaCache.AddOrUpdate(schemeName, newSchema, (key, oldVal) => newSchema); + try + { + var newSchema = Schema.For(schemaData); + _schemaCache.AddOrUpdate(schemeName, newSchema, (key, oldVal) => newSchema); - return true; - } - catch (System.Exception) - { - return false; - } + return true; } - - public void RemoveSchema(string schemaName) + catch (System.Exception) { - _schemaCache.TryRemove(schemaName, out ISchema schemaObj); + return false; } } + + public void RemoveSchema(string schemaName) + { + _schemaCache.TryRemove(schemaName, out ISchema schemaObj); + } } \ No newline at end of file diff --git a/src/Memphis.Client/Validators/ISchemaValidator.cs b/src/Memphis.Client/Validators/ISchemaValidator.cs index b062820..806c47d 100644 --- a/src/Memphis.Client/Validators/ISchemaValidator.cs +++ b/src/Memphis.Client/Validators/ISchemaValidator.cs @@ -1,13 +1,10 @@ -using System.Threading.Tasks; +namespace Memphis.Client.Validators; -namespace Memphis.Client.Validators +internal interface ISchemaValidator { - internal interface ISchemaValidator - { - Task ValidateAsync(byte[] messageToValidate, string schemaAsStr); + Task ValidateAsync(byte[] messageToValidate, string schemaAsStr); - bool ParseAndStore(string schemeName, string schemaData); + bool ParseAndStore(string schemeName, string schemaData); - void RemoveSchema(string schemaName); - } + void RemoveSchema(string schemaName); } \ No newline at end of file diff --git a/src/Memphis.Client/Validators/JsonValidator.cs b/src/Memphis.Client/Validators/JsonValidator.cs index c82ff40..4df571f 100644 --- a/src/Memphis.Client/Validators/JsonValidator.cs +++ b/src/Memphis.Client/Validators/JsonValidator.cs @@ -1,40 +1,36 @@ using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Memphis.Client.Exception; using NJsonSchema; -namespace Memphis.Client.Validators +namespace Memphis.Client.Validators; + +internal class JsonValidator : SchemaValidator, ISchemaValidator { - internal class JsonValidator : SchemaValidator, ISchemaValidator + protected override JsonSchema Parse(string schemaData, string _) { - protected override JsonSchema Parse(string schemaData, string _) - { - return JsonSchema.FromJsonAsync(schemaData).GetAwaiter().GetResult(); - } + return JsonSchema.FromJsonAsync(schemaData).GetAwaiter().GetResult(); + } - public Task ValidateAsync(byte[] messageToValidate, string schemaAsStr) + public Task ValidateAsync(byte[] messageToValidate, string schemaAsStr) + { + if (!_schemaCache.TryGetValue(schemaAsStr, out var schemaObj)) + throw new MemphisSchemaValidationException($"Schema: {schemaAsStr} not found in local cache"); + try { - if (!_schemaCache.TryGetValue(schemaAsStr, out var schemaObj)) - throw new MemphisSchemaValidationException($"Schema: {schemaAsStr} not found in local cache"); - try - { - var jsonMsg = Encoding.UTF8.GetString(messageToValidate); - var errors = schemaObj.Validate(jsonMsg); + var jsonMsg = Encoding.UTF8.GetString(messageToValidate); + var errors = schemaObj.Validate(jsonMsg); - if (!errors.Any()) return Task.CompletedTask; - var sb = new StringBuilder(); - foreach (var error in errors) - { - sb.AppendLine(error.ToString()); - } - - throw new MemphisSchemaValidationException($"Schema validation has failed: \n {sb.ToString()}"); - } - catch (System.Exception ex) + if (!errors.Any()) return Task.CompletedTask; + var sb = new StringBuilder(); + foreach (var error in errors) { - throw new MemphisSchemaValidationException($"Schema validation has failed: \n {ex.Message}", ex); + sb.AppendLine(error.ToString()); } + + throw new MemphisSchemaValidationException($"Schema validation has failed: \n {sb.ToString()}"); + } + catch (System.Exception ex) + { + throw new MemphisSchemaValidationException($"Schema validation has failed: \n {ex.Message}", ex); } } } \ No newline at end of file diff --git a/src/Memphis.Client/Validators/ProtoBufValidator.cs b/src/Memphis.Client/Validators/ProtoBufValidator.cs index d4ca043..156034c 100644 --- a/src/Memphis.Client/Validators/ProtoBufValidator.cs +++ b/src/Memphis.Client/Validators/ProtoBufValidator.cs @@ -1,9 +1,4 @@ -using System; -using System.Text; -using System.Threading.Tasks; -using Memphis.Client.Exception; - -namespace Memphis.Client.Validators; +namespace Memphis.Client.Validators; #nullable disable internal class ProtoBufSchema diff --git a/src/Memphis.Client/Validators/SchemaValidator.cs b/src/Memphis.Client/Validators/SchemaValidator.cs index bc955ba..c58c2b5 100644 --- a/src/Memphis.Client/Validators/SchemaValidator.cs +++ b/src/Memphis.Client/Validators/SchemaValidator.cs @@ -1,47 +1,43 @@ -using System; -using System.Collections.Concurrent; +namespace Memphis.Client.Validators; -namespace Memphis.Client.Validators +internal abstract class SchemaValidator { - internal abstract class SchemaValidator + protected readonly ConcurrentDictionary _schemaCache; + + public SchemaValidator() { - protected readonly ConcurrentDictionary _schemaCache; + this._schemaCache = new ConcurrentDictionary(); + } - public SchemaValidator() + public bool ParseAndStore(string schemeName, string schemaData) + { + if (string.IsNullOrEmpty(schemeName)) { - this._schemaCache = new ConcurrentDictionary(); + throw new ArgumentException($"Invalid value provided for {schemeName}"); } - public bool ParseAndStore(string schemeName, string schemaData) + if (string.IsNullOrEmpty(schemaData)) { - if (string.IsNullOrEmpty(schemeName)) - { - throw new ArgumentException($"Invalid value provided for {schemeName}"); - } - - if (string.IsNullOrEmpty(schemaData)) - { - throw new ArgumentException($"Invalid value provided for {schemaData}"); - } - - try - { - var newSchema = Parse(schemaData, schemeName); - _schemaCache.AddOrUpdate(schemeName, newSchema, (key, oldVal) => newSchema); - - return true; - } - catch (System.Exception) - { - return false; - } + throw new ArgumentException($"Invalid value provided for {schemaData}"); } - public void RemoveSchema(string schemaName) + try { - _schemaCache.TryRemove(schemaName, out TSchema _); + var newSchema = Parse(schemaData, schemeName); + _schemaCache.AddOrUpdate(schemeName, newSchema, (key, oldVal) => newSchema); + + return true; } + catch (System.Exception) + { + return false; + } + } - protected abstract TSchema Parse(string schemaData, string schemaName); + public void RemoveSchema(string schemaName) + { + _schemaCache.TryRemove(schemaName, out TSchema _); } + + protected abstract TSchema Parse(string schemaData, string schemaName); } \ No newline at end of file diff --git a/src/Memphis.Client/Validators/ValidatorType.cs b/src/Memphis.Client/Validators/ValidatorType.cs index 8d71ada..d93b617 100644 --- a/src/Memphis.Client/Validators/ValidatorType.cs +++ b/src/Memphis.Client/Validators/ValidatorType.cs @@ -1,10 +1,9 @@ -namespace Memphis.Client.Validators +namespace Memphis.Client.Validators; + +internal enum ValidatorType { - internal enum ValidatorType - { - GRAPHQL = 1, - JSON = 2, - PROTOBUF = 3, - AVRO = 4 - } + GRAPHQL = 1, + JSON = 2, + PROTOBUF = 3, + AVRO = 4 } \ No newline at end of file