Skip to content

Commit

Permalink
Added batch size and wait time validation (superstreamlabs#172)
Browse files Browse the repository at this point in the history
  • Loading branch information
tbazen authored Dec 21, 2023
1 parent b89fd40 commit d085f98
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 14 deletions.
6 changes: 1 addition & 5 deletions src/Memphis.Client/Constants/MemphisConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,11 @@ internal class MemphisStations
public const string MEMPHIS_PRODUCER_CREATIONS = "$memphis_producer_creations";
public const string MEMPHIS_CONSUMER_CREATIONS = "$memphis_consumer_creations";
public const string MEMPHIS_STATION_CREATIONS = "$memphis_station_creations";

public const string MEMPHIS_PRODUCER_DESTRUCTIONS = "$memphis_producer_destructions";
public const string MEMPHIS_CONSUMER_DESTRUCTIONS = "$memphis_consumer_destructions";

public const string MEMPHIS_SCHEMA_ATTACHMENTS = "$memphis_schema_attachments";
public const string MEMPHIS_SCHEMA_DETACHMENTS = "$memphis_schema_detachments";

public const string MEMPHIS_NOTIFICATIONS = "$memphis_notifications";


public const string MEMPHIS_STATION_DESTRUCTION = "$memphis_station_destructions";
}

Expand Down Expand Up @@ -63,6 +58,7 @@ internal static class MemphisGlobalVariables
{
public const string GLOBAL_ACCOUNT_NAME = "$memphis";
public const uint MURMUR_HASH_SEED = 31;
public const int MAX_BATCH_SIZE = 5000;
}

internal static class MemphisRequestVersions
Expand Down
15 changes: 14 additions & 1 deletion src/Memphis.Client/Consumer/FetchMessageOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,24 @@ namespace Memphis.Client.Consumer;

public sealed class FetchMessageOptions
{
private int _batchMaxTimeToWaitMs = 5_000;

public string ConsumerName { get; set; }
public string StationName { get; set; }
public string ConsumerGroup { get; set; }
public int BatchSize { get; set; } = 10;
public int BatchMaxTimeToWaitMs { get; set; } = 5_000;

/// <summary>
/// The maximum time to wait for a batch message to be consumed in milliseconds.
/// The default value is 5000 (5 seconds).
/// The lowest value is 1000 (1 second), and if it is set a value lower than 1 second, it will be ignored.
/// </summary>
public int BatchMaxTimeToWaitMs
{
get => _batchMaxTimeToWaitMs;
set =>_batchMaxTimeToWaitMs = (value < 1_000) ? 1_000 : value;
}

public int MaxAckTimeMs { get; set; } = 30_000;
public int MaxMsgDeliveries { get; set; } = 2;

Expand Down
2 changes: 2 additions & 0 deletions src/Memphis.Client/Consumer/MemphisConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ public IEnumerable<MemphisMessage> Fetch(int batchSize, bool prefetch)

internal IEnumerable<MemphisMessage> Fetch(FetchMessageOptions fetchMessageOptions)
{
MemphisClient.EnsureBatchSizeIsValid(fetchMessageOptions.BatchSize);

try
{
var batchSize = fetchMessageOptions.BatchSize;
Expand Down
13 changes: 12 additions & 1 deletion src/Memphis.Client/Consumer/MemphisConsumerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,23 @@ namespace Memphis.Client.Consumer;

public sealed class MemphisConsumerOptions
{
private int _batchMaxTimeToWaitMs = 5_000;

public string StationName { get; set; }
public string ConsumerName { get; set; }
public string ConsumerGroup { get; set; } = string.Empty;
public int PullIntervalMs { get; set; } = 1_000;
public int BatchSize { get; set; } = 10;
public int BatchMaxTimeToWaitMs { get; set; } = 5_000;
/// <summary>
/// The maximum time to wait for a batch message to be consumed in milliseconds.
/// The default value is 5000 (5 seconds).
/// The lowest value is 1000 (1 second), and if it is set a value lower than 1 second, it will be ignored.
/// </summary>
public int BatchMaxTimeToWaitMs
{
get => _batchMaxTimeToWaitMs;
set => _batchMaxTimeToWaitMs = (value < 1_000) ? 1_000 : value;
}
public int MaxAckTimeMs { get; set; } = 30_000;
public int MaxMsgDeliveries { get; set; } = 2;

Expand Down
18 changes: 11 additions & 7 deletions src/Memphis.Client/MemphisClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,12 @@ public sealed partial class MemphisClient : IMemphisClient
/// Schema update listners can be either producers or consumers
/// </summary>
private readonly ConcurrentDictionary<string, int> _stationSchemaUpdateListeners;

private readonly ConcurrentDictionary<ValidatorType, ISchemaValidator> _schemaValidators;
private readonly ConcurrentDictionary<string, MemphisProducer> _producerCache;
private readonly ConcurrentDictionary<string, MemphisConsumer> _consumerCache;
private readonly ConcurrentDictionary<string, bool> _stationSchemaVerseToDlsMap;
private readonly ConcurrentDictionary<string, bool> _clusterConfigurations;


private readonly ConcurrentDictionary<string, PartitionsUpdate> _stationPartitions;


private readonly SemaphoreSlim _schemaUpdateSemaphore = new(1, 1);
private readonly SemaphoreSlim _sdkClientUpdateSemaphore = new(1, 1);

Expand Down Expand Up @@ -160,6 +155,8 @@ internal async Task<Msg> RequestAsync(
/// <returns>An <see cref="MemphisConsumer"/> object connected to the station from consuming data</returns>
public async Task<MemphisConsumer> CreateConsumer(MemphisConsumerOptions consumerOptions, int timeoutRetry = 5, CancellationToken cancellationToken = default)
{
EnsureBatchSizeIsValid(consumerOptions.BatchSize);

if (_brokerConnection.IsClosed())
{
throw new MemphisConnectionException("Connection is dead");
Expand Down Expand Up @@ -509,6 +506,13 @@ static void HandleSchemaCreationErrorResponse(byte[] responseBytes)
}
}

internal static void EnsureBatchSizeIsValid(int batchSize)
{
if (batchSize > MemphisGlobalVariables.MAX_BATCH_SIZE ||
batchSize < 1)
throw new MemphisException($"Batch size should be between 1 and {MemphisGlobalVariables.MAX_BATCH_SIZE}");
}

internal string GetStationSchemaType(string internalStationName)
{
if (_schemaUpdateDictionary.TryGetValue(internalStationName,
Expand Down Expand Up @@ -880,7 +884,7 @@ private async Task ListenForSchemaUpdate(string stationName)
{
if (subscription is null || !subscription.IsValid)
break;

var schemaUpdateMsg = subscription.NextMessage();
await ProcessAndStoreSchemaUpdate(internalStationName, schemaUpdateMsg);

Expand Down Expand Up @@ -926,7 +930,7 @@ private async Task ListenForSchemaUpdate(string internalStationName, SchemaUpdat
{
if (subscription is null || !subscription.IsValid)
break;

var schemaUpdateMsg = subscription.NextMessage();
await ProcessAndStoreSchemaUpdate(internalStationName, schemaUpdateMsg);
}
Expand Down

0 comments on commit d085f98

Please sign in to comment.