Skip to content

Commit

Permalink
Added produce to a specific subject & refactored (superstreamlabs#156)
Browse files Browse the repository at this point in the history
  • Loading branch information
tbazen authored Nov 10, 2023
1 parent bb86eec commit 794f773
Show file tree
Hide file tree
Showing 54 changed files with 740 additions and 773 deletions.
2 changes: 2 additions & 0 deletions src/Memphis.Client/Constants/MemphisConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ internal class MemphisSubjects

// not available yes
public const string SCHEMA_DESTRUCTION = "";

public const string FUNCTIONS_UPDATE = "$memphis_functions_updates_";
}

public static class MemphisSchemaTypes
Expand Down
2 changes: 0 additions & 2 deletions src/Memphis.Client/Consumer/FetchMessageOptions.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#nullable disable

using System;

namespace Memphis.Client.Consumer;

public sealed class FetchMessageOptions
Expand Down
6 changes: 1 addition & 5 deletions src/Memphis.Client/Consumer/IMemphisConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Memphis.Client.Core;
using Memphis.Client.Core;

namespace Memphis.Client.Consumer;

Expand Down
10 changes: 0 additions & 10 deletions src/Memphis.Client/Consumer/MemphisConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,8 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Memphis.Client.Constants;
using Memphis.Client.Core;
using Memphis.Client.Exception;
using Memphis.Client.Helper;
using Memphis.Client.Models.Request;
using Memphis.Client.Station;
using NATS.Client;
using NATS.Client.JetStream;

namespace Memphis.Client.Consumer;

Expand Down
2 changes: 0 additions & 2 deletions src/Memphis.Client/Consumer/MemphisConsumerOptions.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#nullable disable

using System;

namespace Memphis.Client.Consumer;

public sealed class MemphisConsumerOptions
Expand Down
6 changes: 1 addition & 5 deletions src/Memphis.Client/Core/MemphisConnectionEventArgs.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
using System;
using Memphis.Client.Exception;
using NATS.Client;

namespace Memphis.Client;
namespace Memphis.Client;

public class MemphisConnectionEventArgs
{
Expand Down
7 changes: 1 addition & 6 deletions src/Memphis.Client/Core/MemphisMessage.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
using System;
using System.Text;
using Memphis.Client.Constants;
using Memphis.Client.Exception;
using Memphis.Client.Helper;
using Memphis.Client.Helper;
using Memphis.Client.Models.Request;
using NATS.Client;

namespace Memphis.Client.Core;

Expand Down
31 changes: 13 additions & 18 deletions src/Memphis.Client/Core/MemphisMessageHandler.cs
Original file line number Diff line number Diff line change
@@ -1,26 +1,21 @@
using System;
using System.Collections.Generic;
using NATS.Client.JetStream;
namespace Memphis.Client.Core;

namespace Memphis.Client.Core
public class MemphisMessageHandlerEventArgs : EventArgs
{
public class MemphisMessageHandlerEventArgs : EventArgs
public MemphisMessageHandlerEventArgs(List<MemphisMessage> messageList, IJetStream? context, System.Exception? ex)
{
public MemphisMessageHandlerEventArgs(List<MemphisMessage> messageList, IJetStream? context, System.Exception? ex)
{
MessageList = messageList;
Context = context;
Exception = ex;
}
MessageList = messageList;
Context = context;
Exception = ex;
}


/// <summary>
/// Retrieves the message.
/// </summary>
public List<MemphisMessage> MessageList { get; }
/// <summary>
/// Retrieves the message.
/// </summary>
public List<MemphisMessage> MessageList { get; }

public System.Exception? Exception { get; }
public System.Exception? Exception { get; }

public IJetStream? Context { get; set; }
}
public IJetStream? Context { get; set; }
}
11 changes: 5 additions & 6 deletions src/Memphis.Client/Exception/MemphisConnectionException.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
namespace Memphis.Client.Exception
namespace Memphis.Client.Exception;

public class MemphisConnectionException : MemphisException
{
public class MemphisConnectionException : MemphisException
{
public MemphisConnectionException(string err, System.Exception ex) : base(err, ex) { }
public MemphisConnectionException(string err) : base(err) { }
}
public MemphisConnectionException(string err, System.Exception ex) : base(err, ex) { }
public MemphisConnectionException(string err) : base(err) { }
}
16 changes: 6 additions & 10 deletions src/Memphis.Client/Exception/MemphisException.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
using System;
using System.Text.RegularExpressions;
namespace Memphis.Client.Exception;

namespace Memphis.Client.Exception
public class MemphisException : System.Exception
{
public class MemphisException : System.Exception
public MemphisException(String err) : base(Regex.Replace(err, @"[nN][aA][Tt][Ss]", "memphis"))
{
public MemphisException(String err) : base(Regex.Replace(err, @"[nN][aA][Tt][Ss]", "memphis"))
{

}
}

public MemphisException(String err, System.Exception innerEx) : base(Regex.Replace(err, @"[nN][aA][Tt][Ss]", "memphis"), innerEx)
{
public MemphisException(String err, System.Exception innerEx) : base(Regex.Replace(err, @"[nN][aA][Tt][Ss]", "memphis"), innerEx)
{

}
}
}
11 changes: 5 additions & 6 deletions src/Memphis.Client/Exception/MemphisMessageIdException.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
namespace Memphis.Client.Exception
namespace Memphis.Client.Exception;

public class MemphisMessageIdException : MemphisException
{
public class MemphisMessageIdException : MemphisException
{
public MemphisMessageIdException(string err, System.Exception ex) : base(err, ex) { }
public MemphisMessageIdException(string err) : base(err) { }
}
public MemphisMessageIdException(string err, System.Exception ex) : base(err, ex) { }
public MemphisMessageIdException(string err) : base(err) { }
}
11 changes: 5 additions & 6 deletions src/Memphis.Client/Exception/MemphisSchemaValidationException.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
namespace Memphis.Client.Exception
namespace Memphis.Client.Exception;

public class MemphisSchemaValidationException : MemphisException
{
public class MemphisSchemaValidationException : MemphisException
{
public MemphisSchemaValidationException(string err, System.Exception ex) : base(err, ex) { }
public MemphisSchemaValidationException(string err) : base(err) { }
}
public MemphisSchemaValidationException(string err, System.Exception ex) : base(err, ex) { }
public MemphisSchemaValidationException(string err) : base(err) { }
}
16 changes: 16 additions & 0 deletions src/Memphis.Client/GlobalUsings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
global using System;
global using System.Collections.Concurrent;
global using System.Collections.Generic;
global using System.Collections.Specialized;
global using System.IO;
global using System.Text;
global using System.Text.RegularExpressions;
global using System.Threading;
global using System.Threading.Tasks;
global using System.Runtime.Serialization;
global using Memphis.Client.Constants;
global using Memphis.Client.Exception;
global using NATS.Client;
global using NATS.Client.Internals;
global using NATS.Client.JetStream;
global using Newtonsoft.Json;
2 changes: 0 additions & 2 deletions src/Memphis.Client/Helper/JsonSerDes.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using System.IO;
using System.Runtime.Serialization.Json;
using System.Text;

namespace Memphis.Client.Helper;

Expand Down
57 changes: 27 additions & 30 deletions src/Memphis.Client/Helper/MemphisUtil.cs
Original file line number Diff line number Diff line change
@@ -1,41 +1,38 @@
using System;
using System.Security.Cryptography;
using System.Text;

namespace Memphis.Client.Helper
namespace Memphis.Client.Helper;

internal static class MemphisUtil
{
internal static class MemphisUtil
internal static string GetInternalName(string name)
{
return name.ToLower().Replace(".", "#");
}

internal static string GetStationName(string internalStationName)
{
internal static string GetInternalName(string name)
{
return name.ToLower().Replace(".", "#");
}

internal static string GetStationName(string internalStationName)
return internalStationName.Replace("#", ".");
}

internal static readonly char[] chars =
"0123456789abcdef".ToCharArray();

internal static string GetUniqueKey(int size)
{
byte[] data = new byte[4*size];
using (var crypto = RandomNumberGenerator.Create())
{
return internalStationName.Replace("#", ".");
crypto.GetBytes(data);
}

internal static readonly char[] chars =
"0123456789abcdef".ToCharArray();

internal static string GetUniqueKey(int size)
StringBuilder result = new StringBuilder(size);
for (int i = 0; i < size; i++)
{
byte[] data = new byte[4*size];
using (var crypto = RandomNumberGenerator.Create())
{
crypto.GetBytes(data);
}
StringBuilder result = new StringBuilder(size);
for (int i = 0; i < size; i++)
{
var rnd = BitConverter.ToUInt32(data, i * 4);
var idx = rnd % chars.Length;
var rnd = BitConverter.ToUInt32(data, i * 4);
var idx = rnd % chars.Length;

result.Append(chars[idx]);
}

return result.ToString();
result.Append(chars[idx]);
}

return result.ToString();
}
}
8 changes: 1 addition & 7 deletions src/Memphis.Client/Helper/MessageSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
using System;
using System.IO;
using System.Text;
using Memphis.Client.Constants;
using Memphis.Client.Exception;
using Newtonsoft.Json;
using SolTechnology.Avro;
using SolTechnology.Avro;

namespace Memphis.Client;

Expand Down
7 changes: 1 addition & 6 deletions src/Memphis.Client/IMemphisClient.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Threading;
using System.Threading.Tasks;
using Memphis.Client.Consumer;
using Memphis.Client.Consumer;
using Memphis.Client.Core;
using Memphis.Client.Producer;
using Memphis.Client.Station;
Expand Down
85 changes: 85 additions & 0 deletions src/Memphis.Client/MemphisClient.Station.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
using Memphis.Client.Helper;

namespace Memphis.Client;

public partial class MemphisClient
{
private readonly ConcurrentDictionary<string, FunctionsDetails> _functionDetails = new();
private readonly ConcurrentDictionary<string, IAsyncSubscription> _functionDetailSubscriptions = new();
private readonly ConcurrentDictionary<string, int> _functionDetailSubscriptionCounter = new();

internal ConcurrentDictionary<string, FunctionsDetails> FunctionDetails { get => _functionDetails; }

private Task ListenForFunctionUpdate(string stationName, int stationVersion, CancellationToken cancellationToken)
{
if (string.IsNullOrWhiteSpace(stationName) ||
stationVersion <= 0)
return Task.CompletedTask;

try
{
var internalStationName = MemphisUtil.GetInternalName(stationName);
if (_functionDetailSubscriptions.TryGetValue(internalStationName, out _))
{
_functionDetailSubscriptionCounter.AddOrUpdate(internalStationName, 1, (_, count) => count + 1);
return Task.CompletedTask;
}
var functionUpdateSubject = $"{MemphisSubjects.FUNCTIONS_UPDATE}{internalStationName}";
var subscription = _brokerConnection.SubscribeAsync(functionUpdateSubject, FunctionUpdateEventHandler);
if (!_functionDetailSubscriptions.TryAdd(internalStationName, subscription))
throw new MemphisException($"Could not add subscription for {functionUpdateSubject}.");
_functionDetailSubscriptionCounter.AddOrUpdate(internalStationName, 1, (_, count) => count + 1);
return Task.CompletedTask;
}
catch (System.Exception e)
{
throw new MemphisException(e.Message, e);
}

void FunctionUpdateEventHandler(object sender, MsgHandlerEventArgs e)
{
if (e is null || e.Message is null)
return;

var jsonData = Encoding.UTF8.GetString(e.Message.Data);
var functionsUpdate = JsonConvert.DeserializeObject<FunctionsUpdate>(jsonData);
if (functionsUpdate is null)
return;

_functionDetails.AddOrUpdate(
e.Message.Subject,
new FunctionsDetails { PartitionsFunctions = functionsUpdate.Functions },
(_, _) => new FunctionsDetails { PartitionsFunctions = functionsUpdate.Functions });
}
}

private async Task RemoveFunctionUpdateListenerAsync(string stationName)
{
try
{
if (string.IsNullOrWhiteSpace(stationName))
return;

var internalStationName = MemphisUtil.GetInternalName(stationName);
if (!_functionDetailSubscriptionCounter.TryGetValue(internalStationName, out var count) ||
count <= 0)
return;

int countAfterRemoval = count - 1;
_functionDetailSubscriptionCounter.TryUpdate(internalStationName, countAfterRemoval, count);
if (countAfterRemoval <= 0)
{
if (_functionDetailSubscriptions.TryGetValue(internalStationName, out var subscriptionToRemove))
{
await subscriptionToRemove.DrainAsync();
_functionDetailSubscriptions.TryRemove(internalStationName, out _);
}
_functionDetails.TryRemove(internalStationName, out _);
}
}
catch (System.Exception e)
{
throw new MemphisException(e.Message, e);
}
}
}
Loading

0 comments on commit 794f773

Please sign in to comment.