From d085f98e73708cb5b3ce2e1124e03995cb6d26ac Mon Sep 17 00:00:00 2001 From: Bazen Date: Thu, 21 Dec 2023 16:34:24 +0200 Subject: [PATCH] Added batch size and wait time validation (#172) --- .../Constants/MemphisConstants.cs | 6 +----- .../Consumer/FetchMessageOptions.cs | 15 ++++++++++++++- src/Memphis.Client/Consumer/MemphisConsumer.cs | 2 ++ .../Consumer/MemphisConsumerOptions.cs | 13 ++++++++++++- src/Memphis.Client/MemphisClient.cs | 18 +++++++++++------- 5 files changed, 40 insertions(+), 14 deletions(-) diff --git a/src/Memphis.Client/Constants/MemphisConstants.cs b/src/Memphis.Client/Constants/MemphisConstants.cs index 02c211a..aaf2924 100644 --- a/src/Memphis.Client/Constants/MemphisConstants.cs +++ b/src/Memphis.Client/Constants/MemphisConstants.cs @@ -4,16 +4,11 @@ internal class MemphisStations public const string MEMPHIS_PRODUCER_CREATIONS = "$memphis_producer_creations"; public const string MEMPHIS_CONSUMER_CREATIONS = "$memphis_consumer_creations"; public const string MEMPHIS_STATION_CREATIONS = "$memphis_station_creations"; - public const string MEMPHIS_PRODUCER_DESTRUCTIONS = "$memphis_producer_destructions"; public const string MEMPHIS_CONSUMER_DESTRUCTIONS = "$memphis_consumer_destructions"; - public const string MEMPHIS_SCHEMA_ATTACHMENTS = "$memphis_schema_attachments"; public const string MEMPHIS_SCHEMA_DETACHMENTS = "$memphis_schema_detachments"; - public const string MEMPHIS_NOTIFICATIONS = "$memphis_notifications"; - - public const string MEMPHIS_STATION_DESTRUCTION = "$memphis_station_destructions"; } @@ -63,6 +58,7 @@ internal static class MemphisGlobalVariables { public const string GLOBAL_ACCOUNT_NAME = "$memphis"; public const uint MURMUR_HASH_SEED = 31; + public const int MAX_BATCH_SIZE = 5000; } internal static class MemphisRequestVersions diff --git a/src/Memphis.Client/Consumer/FetchMessageOptions.cs b/src/Memphis.Client/Consumer/FetchMessageOptions.cs index dc88419..ac7c8f6 100644 --- a/src/Memphis.Client/Consumer/FetchMessageOptions.cs +++ b/src/Memphis.Client/Consumer/FetchMessageOptions.cs @@ -4,11 +4,24 @@ namespace Memphis.Client.Consumer; public sealed class FetchMessageOptions { + private int _batchMaxTimeToWaitMs = 5_000; + public string ConsumerName { get; set; } public string StationName { get; set; } public string ConsumerGroup { get; set; } public int BatchSize { get; set; } = 10; - public int BatchMaxTimeToWaitMs { get; set; } = 5_000; + + /// + /// The maximum time to wait for a batch message to be consumed in milliseconds. + /// The default value is 5000 (5 seconds). + /// The lowest value is 1000 (1 second), and if it is set a value lower than 1 second, it will be ignored. + /// + public int BatchMaxTimeToWaitMs + { + get => _batchMaxTimeToWaitMs; + set =>_batchMaxTimeToWaitMs = (value < 1_000) ? 1_000 : value; + } + public int MaxAckTimeMs { get; set; } = 30_000; public int MaxMsgDeliveries { get; set; } = 2; diff --git a/src/Memphis.Client/Consumer/MemphisConsumer.cs b/src/Memphis.Client/Consumer/MemphisConsumer.cs index 8dca40b..b091ac7 100644 --- a/src/Memphis.Client/Consumer/MemphisConsumer.cs +++ b/src/Memphis.Client/Consumer/MemphisConsumer.cs @@ -188,6 +188,8 @@ public IEnumerable Fetch(int batchSize, bool prefetch) internal IEnumerable Fetch(FetchMessageOptions fetchMessageOptions) { + MemphisClient.EnsureBatchSizeIsValid(fetchMessageOptions.BatchSize); + try { var batchSize = fetchMessageOptions.BatchSize; diff --git a/src/Memphis.Client/Consumer/MemphisConsumerOptions.cs b/src/Memphis.Client/Consumer/MemphisConsumerOptions.cs index 72a2166..3392518 100644 --- a/src/Memphis.Client/Consumer/MemphisConsumerOptions.cs +++ b/src/Memphis.Client/Consumer/MemphisConsumerOptions.cs @@ -4,12 +4,23 @@ namespace Memphis.Client.Consumer; public sealed class MemphisConsumerOptions { + private int _batchMaxTimeToWaitMs = 5_000; + public string StationName { get; set; } public string ConsumerName { get; set; } public string ConsumerGroup { get; set; } = string.Empty; public int PullIntervalMs { get; set; } = 1_000; public int BatchSize { get; set; } = 10; - public int BatchMaxTimeToWaitMs { get; set; } = 5_000; + /// + /// The maximum time to wait for a batch message to be consumed in milliseconds. + /// The default value is 5000 (5 seconds). + /// The lowest value is 1000 (1 second), and if it is set a value lower than 1 second, it will be ignored. + /// + public int BatchMaxTimeToWaitMs + { + get => _batchMaxTimeToWaitMs; + set => _batchMaxTimeToWaitMs = (value < 1_000) ? 1_000 : value; + } public int MaxAckTimeMs { get; set; } = 30_000; public int MaxMsgDeliveries { get; set; } = 2; diff --git a/src/Memphis.Client/MemphisClient.cs b/src/Memphis.Client/MemphisClient.cs index d921c20..e95d07f 100644 --- a/src/Memphis.Client/MemphisClient.cs +++ b/src/Memphis.Client/MemphisClient.cs @@ -38,17 +38,12 @@ public sealed partial class MemphisClient : IMemphisClient /// Schema update listners can be either producers or consumers /// private readonly ConcurrentDictionary _stationSchemaUpdateListeners; - private readonly ConcurrentDictionary _schemaValidators; private readonly ConcurrentDictionary _producerCache; private readonly ConcurrentDictionary _consumerCache; private readonly ConcurrentDictionary _stationSchemaVerseToDlsMap; private readonly ConcurrentDictionary _clusterConfigurations; - - private readonly ConcurrentDictionary _stationPartitions; - - private readonly SemaphoreSlim _schemaUpdateSemaphore = new(1, 1); private readonly SemaphoreSlim _sdkClientUpdateSemaphore = new(1, 1); @@ -160,6 +155,8 @@ internal async Task RequestAsync( /// An object connected to the station from consuming data public async Task CreateConsumer(MemphisConsumerOptions consumerOptions, int timeoutRetry = 5, CancellationToken cancellationToken = default) { + EnsureBatchSizeIsValid(consumerOptions.BatchSize); + if (_brokerConnection.IsClosed()) { throw new MemphisConnectionException("Connection is dead"); @@ -509,6 +506,13 @@ static void HandleSchemaCreationErrorResponse(byte[] responseBytes) } } + internal static void EnsureBatchSizeIsValid(int batchSize) + { + if (batchSize > MemphisGlobalVariables.MAX_BATCH_SIZE || + batchSize < 1) + throw new MemphisException($"Batch size should be between 1 and {MemphisGlobalVariables.MAX_BATCH_SIZE}"); + } + internal string GetStationSchemaType(string internalStationName) { if (_schemaUpdateDictionary.TryGetValue(internalStationName, @@ -880,7 +884,7 @@ private async Task ListenForSchemaUpdate(string stationName) { if (subscription is null || !subscription.IsValid) break; - + var schemaUpdateMsg = subscription.NextMessage(); await ProcessAndStoreSchemaUpdate(internalStationName, schemaUpdateMsg); @@ -926,7 +930,7 @@ private async Task ListenForSchemaUpdate(string internalStationName, SchemaUpdat { if (subscription is null || !subscription.IsValid) break; - + var schemaUpdateMsg = subscription.NextMessage(); await ProcessAndStoreSchemaUpdate(internalStationName, schemaUpdateMsg); }