From 40e402cefc18e20e31bf2598eb04f5fb3aad9fe6 Mon Sep 17 00:00:00 2001 From: MASES Public Developers Team <94312179+masesdevelopers@users.noreply.github.com> Date: Wed, 4 May 2022 02:26:55 +0200 Subject: [PATCH] Benchmark program update (#59) * Updates on KNetConsumer * https://github.com/masesgroup/KNet/issues/53#issuecomment-1115114120 * Fixed some numbers * #53: stdev becomes SD * #53: updates on report * #53: updates on report * #53: extra tests --- src/net/Documentation/articles/performance.md | 36 ++++- .../Clients/Consumer/KNetConsumer.cs | 133 ++++++++++++++++-- tests/KNetBenchmark/Program.cs | 27 ++-- tests/KNetBenchmark/ProgramKNet.cs | 21 +-- tests/KNetBenchmark/ProgramSupport.cs | 20 ++- 5 files changed, 200 insertions(+), 37 deletions(-) diff --git a/src/net/Documentation/articles/performance.md b/src/net/Documentation/articles/performance.md index 17bc0660d9..fe9ab99138 100644 --- a/src/net/Documentation/articles/performance.md +++ b/src/net/Documentation/articles/performance.md @@ -97,21 +97,21 @@ The configuration is: Here below a set of results, in bold the results which are better using KNet: -- KNet/Confluent.Kafka Produce Average ratio percentage (stdev ratio percentage): +- KNet/Confluent.Kafka Produce Average ratio percentage (SD ratio percentage): | | 10 bytes | 100 bytes | 1000 bytes | 10000 bytes | 100000 bytes | |--- |--- |--- |--- |--- |--- | | 10 messages | **9,04 (4,34)** | **5,47 (3,1)** | **15,45 (5,29)** | **7,54 (4,38)** | **19,73 (4,23)** | -| 100 messages | **18,9 (18,9)** | **38,1 (8,1)** | **30,34 (5,44)** | **26 (3,04)** | **69,4 (13,09)** | +| 100 messages | **18,9 (6,29)** | **38,1 (8,1)** | **30,34 (5,44)** | **26 (3,04)** | **69,4 (13,09)** | | 1000 messages | 197,73 (10,54) | 109,92 (6,13) | **57,6 (7,32)** | **52,71 (8,17)** | **75,76 (43,7)** | | 10000 messages | 2102,28 (736,54) | 796,84 (514,28) | 173,39 (401,76) | 123,62 (620,46) | **99,5 (108,3)** | -- KNet/Confluent.Kafka Consume Average ratio percentage (stdev ratio percentage): +- KNet/Confluent.Kafka Consume Average ratio percentage (SD ratio percentage): | | 10 bytes | 100 bytes | 1000 bytes | 10000 bytes | 100000 bytes | |--- |--- |--- |--- |--- |--- | | 10 messages | **85,93 (399,84)**| **85,41 (282,85)** | **85,14 (297,98)** | **24,07 (229,23)** | **36,23 (285,77)** | -| 100 messages | **94,54 (94,54)** | **94,7 (287,78)** | **68,49 (389,25)** | **71,97 (276,56)** | 108,57 (89,45) | +| 100 messages | **94,54 (479,13)** | **94,7 (287,78)** | **68,49 (389,25)** | **71,97 (276,56)** | 108,57 (89,45) | | 1000 messages | 192,27 (906,94) | 521,86 (867,93) | 103,62 (1854,84) | 255,52 (287,33) | 163,24 (124,23) | | 10000 messages | 9153,56 (77543,04) | 7948,76 (69701,75) | 3848,12 (23910,64) | 706,83 (3905,89) | 213,46 (1013,16) | @@ -121,9 +121,9 @@ Looking at the above table KNet performs better than Confluent.Kafka with burst The best produce performance was obtained with 10 messages of 100, or 10000, bytes: KNet is 20 times fast than Confluent.Kafka. The best consume performance was obtained with 10 messages of 10000 bytes: KNet is 4 times fast than Confluent.Kafka. -### Stdev ratio percentage +### SD ratio percentage -Looking at value within the brackets that represents the ratio of the stdev it is possible to highlight that: +Looking at value within the brackets, that represents the ratio of the SD, it is possible to highlight that: - in produce KNet has more stable measures except when the number of messages is high (10000 messages); - in consume KNet has less stable measures. @@ -135,4 +135,26 @@ Using KNetProducer the numbers of JNI invocation are less than using KafkaProduc The same consideration can be applied on the consume side: KNetConsumer does not reduce the impact of JNI interface and it does not give any great improvement. The JNI interface has an impact even when the number of messages is high because during processing the Garbage Collector is activated many times increasing the JNI overhead. -With the upcoming JCOBridge major version the JNI impact will be reduced and KNet will get extra performance both in produce and in consume. +**With the upcoming JCOBridge major version the JNI impact will be reduced and KNet will get extra performance both in produce and in consume.** + +The first measures done with an internal alfa release reports big improvements. Here some cases: +- 100 messages: + - 10 bytes: Produce Average ratio percentage from 18 to 11, Consume Average ratio percentage from 94 to 88 + - 100 bytes: Produce Average ratio percentage from 38 to 17, Consume Average ratio percentage from 94 to 88 + - 1000 bytes: Produce Average ratio percentage from 30 to 16, Consume Average ratio percentage from 68 to 38 + - 10000 bytes: Produce Average ratio percentage from 26 to 24, Consume Average ratio percentage from 71 to 47 + - 100000 bytes: Produce Average ratio percentage from 69 to 43, Consume Average ratio percentage from 108 to 101 +- 1000 messages: + - 10 bytes: Produce Average ratio percentage from 197 to 111, Consume Average ratio percentage from 192 to 137 + - 100 bytes: Produce Average ratio percentage from 109 to 86, Consume Average ratio percentage from 521 to 276 + - 1000 bytes: Produce Average ratio percentage from 57 to 53, Consume Average ratio percentage from 103 to 56 + - 10000 bytes: Produce Average ratio percentage from 52 to 48, Consume Average ratio percentage from 255 to 177 + - 100000 bytes: Produce Average ratio percentage from 75 to 71, Consume Average ratio percentage from 163 to 146 +- 10000 messages: + - 10 bytes: Produce Average ratio percentage from 2102 to 656, Consume Average ratio percentage from 9153 to 4139 + - 100 bytes: Produce Average ratio percentage from 796 to 410, Consume Average ratio percentage from 7948 to 3378 + - 1000 bytes: Produce Average ratio percentage from 173 to 81, Consume Average ratio percentage from 3848 to 1668 + - 10000 bytes: Produce Average ratio percentage from 123 to 80, Consume Average ratio percentage from 706 to 390 + - 100000 bytes: Produce Average ratio percentage from 100 to 96, Consume Average ratio percentage from 213 to 172 + + diff --git a/src/net/KNet/ClientSide/BridgedClasses/Clients/Consumer/KNetConsumer.cs b/src/net/KNet/ClientSide/BridgedClasses/Clients/Consumer/KNetConsumer.cs index a91c8c2f7f..7d46484a82 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Clients/Consumer/KNetConsumer.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Clients/Consumer/KNetConsumer.cs @@ -23,24 +23,30 @@ using Java.Util.Regex; using MASES.JCOBridge.C2JBridge; using System; +using System.Collections.Concurrent; namespace MASES.KNet.Clients.Consumer { public class Message { + readonly ConsumerRecord record; readonly KNetConsumerCallback obj; internal Message(KNetConsumerCallback obj) { this.obj = obj; } + internal Message(ConsumerRecord record) + { + this.record = record; + } - //public string Topic => data.TypedEventData; + public string Topic => record != null ? record.Topic : obj.Instance.Invoke("getTopic"); - //public int Partition => data.GetAt(0); + public int Partition => record != null ? record.Partition : obj.Instance.Invoke("getPartition"); - public K Key => obj.Instance.Invoke("getKey"); + public K Key => record != null ? record.Key : obj.Instance.Invoke("getKey"); - public V Value => obj.Instance.Invoke("getValue"); + public V Value => record != null ? record.Value : obj.Instance.Invoke("getValue"); } public interface IKNetConsumerCallback : IJVMBridgeBase @@ -73,11 +79,26 @@ public virtual void RecordReady(Message message) { } public interface IKNetConsumer : IConsumer { + bool IsCompleting { get; } + + bool IsEmpty { get; } + + int WaitingMessages { get; } + + void SetCallback(Action> cb); + + void ConsumeAsync(long timeoutMs); + void Consume(long timeoutMs, Action> callback); } public class KNetConsumer : KafkaConsumer, IKNetConsumer { + bool threadRunning = false; + long dequeing = 0; + readonly System.Threading.Thread consumeThread = null; + readonly System.Threading.ManualResetEvent threadExited = null; + readonly ConcurrentQueue> consumedRecords = null; readonly KNetConsumerCallback consumerCallback = null; public override string ClassName => "org.mases.knet.clients.consumer.KNetConsumer"; @@ -86,18 +107,40 @@ public KNetConsumer() { } - public KNetConsumer(Properties props) + public KNetConsumer(Properties props, bool useJVMCallback = false) : base(props) { - consumerCallback = new KNetConsumerCallback(CallbackMessage); - IExecute("setCallback", consumerCallback); + if (useJVMCallback) + { + consumerCallback = new KNetConsumerCallback(CallbackMessage); + IExecute("setCallback", consumerCallback); + } + else + { + consumedRecords = new(); + threadRunning = true; + threadExited = new(false); + consumeThread = new(ConsumeHandler); + consumeThread.Start(); + } } - public KNetConsumer(Properties props, Deserializer keyDeserializer, Deserializer valueDeserializer) + public KNetConsumer(Properties props, Deserializer keyDeserializer, Deserializer valueDeserializer, bool useJVMCallback = false) : base(props, keyDeserializer, valueDeserializer) { - consumerCallback = new KNetConsumerCallback(CallbackMessage); - IExecute("setCallback", consumerCallback); + if (useJVMCallback) + { + consumerCallback = new KNetConsumerCallback(CallbackMessage); + IExecute("setCallback", consumerCallback); + } + else + { + consumedRecords = new(); + threadRunning = true; + threadExited = new(false); + consumeThread = new(ConsumeHandler); + consumeThread.Start(); + } } Action> actionCallback = null; @@ -112,11 +155,81 @@ public override void Dispose() base.Dispose(); IExecute("setCallback", null); consumerCallback?.Dispose(); + threadRunning = false; + if (consumedRecords != null) + { + lock (consumedRecords) + { + System.Threading.Monitor.Pulse(consumedRecords); + } + while (IsCompleting) { threadExited.WaitOne(100); }; + actionCallback = null; + } + } + + public void SetCallback(Action> cb) + { + actionCallback = cb; + } + + void ConsumeHandler(object o) + { + try + { + while (threadRunning) + { + if (consumedRecords.TryDequeue(out ConsumerRecords records)) + { + System.Threading.Interlocked.Increment(ref dequeing); + try + { + foreach (var item in records) + { + actionCallback?.Invoke(new Message(item)); + } + } + catch { } + finally + { + System.Threading.Interlocked.Decrement(ref dequeing); + } + } + else + { + lock (consumedRecords) + { + System.Threading.Monitor.Wait(consumedRecords); + } + } + } + } + catch { } + finally { threadExited.Set(); threadRunning = false; } + } + + public bool IsCompleting => !consumedRecords.IsEmpty || System.Threading.Interlocked.Read(ref dequeing) != 0; + + public bool IsEmpty => consumedRecords.IsEmpty; + + public int WaitingMessages => consumedRecords.Count; + + public void ConsumeAsync(long timeoutMs) + { + Duration duration = TimeSpan.FromMilliseconds(timeoutMs); + if (consumedRecords == null) throw new ArgumentException("Cannot be used since constructor was called with useJVMCallback set to true."); + if (!threadRunning) throw new InvalidOperationException("Dispatching thread is not running."); + var results = this.Poll(duration); + consumedRecords.Enqueue(results); + lock (consumedRecords) + { + System.Threading.Monitor.Pulse(consumedRecords); + } } public void Consume(long timeoutMs, Action> callback) { Duration duration = TimeSpan.FromMilliseconds(timeoutMs); + if (consumerCallback == null) throw new ArgumentException("Cannot be used since constructor was called with useJVMCallback set to false."); try { actionCallback = callback; diff --git a/tests/KNetBenchmark/Program.cs b/tests/KNetBenchmark/Program.cs index d2eeb649e1..9458a4b21f 100644 --- a/tests/KNetBenchmark/Program.cs +++ b/tests/KNetBenchmark/Program.cs @@ -145,20 +145,29 @@ static void Main(string[] args) if (ProduceOnly) { Console.WriteLine("Produce"); - Console.WriteLine($"KNet microseconds - Max {kNetProduceTimes.Max():####.##} Min {kNetProduceTimes.Min():####.##} Avg {kNetProduceTimes.Average():####.##} stdev {kNetProduceTimes.StandardDeviation():####.##} CV {100 * kNetProduceTimes.CoefficientOfVariation():####.##} %"); - Console.WriteLine($"Confluent microseconds - Max {confluentProduceTimes.Max():####.##} Min {confluentProduceTimes.Min():####.##} Avg {confluentProduceTimes.Average():####.##} stdev {confluentProduceTimes.StandardDeviation():####.##} CV {100 * confluentProduceTimes.CoefficientOfVariation():####.##} %"); - Console.WriteLine($"KNet/Confluent ratio(%) - Max {100 * (double)kNetProduceTimes.Max() / confluentProduceTimes.Max():####.##} Min {100 * (double)kNetProduceTimes.Min() / confluentProduceTimes.Min():####.##} Avg {100 * (double)kNetProduceTimes.Average() / confluentProduceTimes.Average():####.##} stdev {100 * (double)kNetProduceTimes.StandardDeviation() / confluentProduceTimes.StandardDeviation():####.##} CV {100 * kNetProduceTimes.CoefficientOfVariation() / confluentProduceTimes.CoefficientOfVariation():####.##} %"); + Console.WriteLine($"KNet microseconds - Max {kNetProduceTimes.Max():####.##} Min {kNetProduceTimes.Min():####.##} Avg {kNetProduceTimes.Average():####.##} SD {kNetProduceTimes.StandardDeviation():####.##} CV {100 * kNetProduceTimes.StandardDeviation() / kNetProduceTimes.Average():####.##} %"); + Console.WriteLine($"KNet microseconds - Avg Filtered {kNetProduceTimes.FilterMinMax().Average():####.##} SD Filtered {kNetProduceTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * kNetProduceTimes.FilterMinMax().StandardDeviation() / kNetProduceTimes.FilterMinMax().Average():####.##} %"); + Console.WriteLine($"Confluent microseconds - Max {confluentProduceTimes.Max():####.##} Min {confluentProduceTimes.Min():####.##} Avg {confluentProduceTimes.Average():####.##} SD {confluentProduceTimes.StandardDeviation():####.##} CV {100 * confluentProduceTimes.StandardDeviation() / confluentProduceTimes.Average():####.##} %"); + Console.WriteLine($"Confluent microseconds - Avg Filtered {confluentProduceTimes.FilterMinMax().Average():####.##} SD Filtered {confluentProduceTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * confluentProduceTimes.FilterMinMax().StandardDeviation() / confluentProduceTimes.FilterMinMax().Average():####.##} %"); + Console.WriteLine($"KNet/Confluent ratio(%) - Max {100 * (double)kNetProduceTimes.Max() / confluentProduceTimes.Max():####.##} Min {100 * (double)kNetProduceTimes.Min() / confluentProduceTimes.Min():####.##} Avg {100 * (double)kNetProduceTimes.Average() / confluentProduceTimes.Average():####.##} SD {100 * (double)kNetProduceTimes.StandardDeviation() / confluentProduceTimes.StandardDeviation():####.##} CV {100 * kNetProduceTimes.CoefficientOfVariation() / confluentProduceTimes.CoefficientOfVariation():####.##}"); + Console.WriteLine($"KNet/Confluent ratio(%) - Avg Filtered {100 * (double)kNetProduceTimes.FilterMinMax().Average() / confluentProduceTimes.FilterMinMax().Average():####.##} SD Filtered {100 * (double)kNetProduceTimes.FilterMinMax().StandardDeviation() / confluentProduceTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * kNetProduceTimes.FilterMinMax().CoefficientOfVariation() / confluentProduceTimes.FilterMinMax().CoefficientOfVariation():####.##}"); } else { Console.WriteLine("Produce"); - Console.WriteLine($"KNet microseconds - Max {kNetProduceTimes.Max():####.##} Min {kNetProduceTimes.Min():####.##} Avg {kNetProduceTimes.Average():####.##} stdev {kNetProduceTimes.StandardDeviation():####.##} CV {100 * kNetProduceTimes.StandardDeviation() / kNetProduceTimes.Average():####.##} %"); - Console.WriteLine($"Confluent microseconds - Max {confluentProduceTimes.Max():####.##} Min {confluentProduceTimes.Min():####.##} Avg {confluentProduceTimes.Average():####.##} stdev {confluentProduceTimes.StandardDeviation():####.##} CV {100 * confluentProduceTimes.StandardDeviation() / confluentProduceTimes.Average():####.##} %"); - Console.WriteLine($"KNet/Confluent ratio(%) - Max {100 * (double)kNetProduceTimes.Max() / confluentProduceTimes.Max():####.##} Min {100 * (double)kNetProduceTimes.Min() / confluentProduceTimes.Min():####.##} Avg {100 * (double)kNetProduceTimes.Average() / confluentProduceTimes.Average():####.##} stdev {100 * (double)kNetProduceTimes.StandardDeviation() / confluentProduceTimes.StandardDeviation():####.##} CV {100 * kNetProduceTimes.CoefficientOfVariation() / confluentProduceTimes.CoefficientOfVariation():####.##}"); + Console.WriteLine($"KNet microseconds - Max {kNetProduceTimes.Max():####.##} Min {kNetProduceTimes.Min():####.##} Avg {kNetProduceTimes.Average():####.##} SD {kNetProduceTimes.StandardDeviation():####.##} CV {100 * kNetProduceTimes.StandardDeviation() / kNetProduceTimes.Average():####.##} %"); + Console.WriteLine($"KNet microseconds - Avg Filtered {kNetProduceTimes.FilterMinMax().Average():####.##} SD Filtered {kNetProduceTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * kNetProduceTimes.FilterMinMax().StandardDeviation() / kNetProduceTimes.FilterMinMax().Average():####.##} %"); + Console.WriteLine($"Confluent microseconds - Max {confluentProduceTimes.Max():####.##} Min {confluentProduceTimes.Min():####.##} Avg {confluentProduceTimes.Average():####.##} SD {confluentProduceTimes.StandardDeviation():####.##} CV {100 * confluentProduceTimes.StandardDeviation() / confluentProduceTimes.Average():####.##} %"); + Console.WriteLine($"Confluent microseconds - Avg Filtered {confluentProduceTimes.FilterMinMax().Average():####.##} SD Filtered {confluentProduceTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * confluentProduceTimes.FilterMinMax().StandardDeviation() / confluentProduceTimes.FilterMinMax().Average():####.##} %"); + Console.WriteLine($"KNet/Confluent ratio(%) - Max {100 * (double)kNetProduceTimes.Max() / confluentProduceTimes.Max():####.##} Min {100 * (double)kNetProduceTimes.Min() / confluentProduceTimes.Min():####.##} Avg {100 * (double)kNetProduceTimes.Average() / confluentProduceTimes.Average():####.##} SD {100 * (double)kNetProduceTimes.StandardDeviation() / confluentProduceTimes.StandardDeviation():####.##} CV {100 * kNetProduceTimes.CoefficientOfVariation() / confluentProduceTimes.CoefficientOfVariation():####.##}"); + Console.WriteLine($"KNet/Confluent ratio(%) - Avg Filtered {100 * (double)kNetProduceTimes.FilterMinMax().Average() / confluentProduceTimes.FilterMinMax().Average():####.##} SD Filtered {100 * (double)kNetProduceTimes.FilterMinMax().StandardDeviation() / confluentProduceTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * kNetProduceTimes.FilterMinMax().CoefficientOfVariation() / confluentProduceTimes.FilterMinMax().CoefficientOfVariation():####.##}"); Console.WriteLine("Consume"); - Console.WriteLine($"KNet microseconds - Max {kNetConsumeTimes.Max():####.##} Min {kNetConsumeTimes.Min():####.##} Avg {kNetConsumeTimes.Average():####.##} stdev {kNetConsumeTimes.StandardDeviation():####.##} CV {100 * kNetConsumeTimes.CoefficientOfVariation():####.##} %"); - Console.WriteLine($"Confluent microseconds - Max {confluentConsumeTimes.Max():####.##} Min {confluentConsumeTimes.Min():####.##} Avg {confluentConsumeTimes.Average():####.##} stdev {confluentConsumeTimes.StandardDeviation():####.##} CV {100 * confluentConsumeTimes.CoefficientOfVariation():####.##} %"); - Console.WriteLine($"KNet/Confluent ratio(%) - Max {100 * (double)kNetConsumeTimes.Max() / confluentConsumeTimes.Max():########.##} Min {100 * (double)kNetConsumeTimes.Min() / confluentConsumeTimes.Min():####.##} Avg {100 * (double)kNetConsumeTimes.Average() / confluentConsumeTimes.Average():####.##} stdev {100 * (double)kNetConsumeTimes.StandardDeviation() / confluentConsumeTimes.StandardDeviation():####.##} CV {100 * kNetConsumeTimes.CoefficientOfVariation() / confluentConsumeTimes.CoefficientOfVariation():####.##}"); + Console.WriteLine($"KNet microseconds - Max {kNetConsumeTimes.Max():####.##} Min {kNetConsumeTimes.Min():####.##} Avg {kNetConsumeTimes.Average():####.##} SD {kNetConsumeTimes.StandardDeviation():####.##} CV {100 * kNetConsumeTimes.CoefficientOfVariation():####.##} %"); + Console.WriteLine($"KNet microseconds - Avg Filtered {kNetConsumeTimes.FilterMinMax().Average():####.##} SD Filtered {kNetConsumeTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * kNetConsumeTimes.FilterMinMax().CoefficientOfVariation():####.##} %"); + Console.WriteLine($"Confluent microseconds - Max {confluentConsumeTimes.Max():####.##} Min {confluentConsumeTimes.Min():####.##} Avg {confluentConsumeTimes.Average():####.##} SD {confluentConsumeTimes.StandardDeviation():####.##} CV {100 * confluentConsumeTimes.CoefficientOfVariation():####.##} %"); + Console.WriteLine($"Confluent microseconds - Avg Filtered {confluentConsumeTimes.FilterMinMax().Average():####.##} SD Filtered {confluentConsumeTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * confluentConsumeTimes.FilterMinMax().CoefficientOfVariation():####.##} %"); + Console.WriteLine($"KNet/Confluent ratio(%) - Max {100 * (double)kNetConsumeTimes.Max() / confluentConsumeTimes.Max():########.##} Min {100 * (double)kNetConsumeTimes.Min() / confluentConsumeTimes.Min():####.##} Avg {100 * (double)kNetConsumeTimes.Average() / confluentConsumeTimes.Average():####.##} SD {100 * (double)kNetConsumeTimes.StandardDeviation() / confluentConsumeTimes.StandardDeviation():####.##} CV {100 * kNetConsumeTimes.CoefficientOfVariation() / confluentConsumeTimes.CoefficientOfVariation():####.##}"); + Console.WriteLine($"KNet/Confluent ratio(%) - Avg Filtered {100 * (double)kNetConsumeTimes.FilterMinMax().Average() / confluentConsumeTimes.FilterMinMax().Average():####.##} SD Filtered {100 * (double)kNetConsumeTimes.FilterMinMax().StandardDeviation() / confluentConsumeTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * kNetConsumeTimes.FilterMinMax().CoefficientOfVariation() / confluentConsumeTimes.FilterMinMax().CoefficientOfVariation():####.##}"); } File.WriteAllText(Path.Combine(ResultsPath, $"results_{DateTime.Now:yyyyMMdd_HHmmss}.csv"), sb.ToString()); diff --git a/tests/KNetBenchmark/ProgramKNet.cs b/tests/KNetBenchmark/ProgramKNet.cs index da06968b53..a35497ca4a 100644 --- a/tests/KNetBenchmark/ProgramKNet.cs +++ b/tests/KNetBenchmark/ProgramKNet.cs @@ -472,20 +472,21 @@ static Stopwatch ConsumeKNet(string topicName, int length, int numpacket, byte[] { int counter = 0; consumer.Subscribe(Collections.Singleton(topicName), rebalanceListener); - while (true) + consumer.SetCallback((message) => { - consumer.Consume((long)TimeSpan.FromMilliseconds(100).TotalMilliseconds, (message) => + if (CheckOnConsume) { - if (CheckOnConsume) + if (!message.Value.SequenceEqual(data) + || (!SinglePacket && message.Key != counter)) { - if (!message.Value.SequenceEqual(data) - || (!SinglePacket && message.Key != counter)) - { - throw new InvalidOperationException($"Incorrect data counter {counter} item.Key {message.Key}"); - } + throw new InvalidOperationException($"Incorrect data counter {counter} item.Key {message.Key}"); } - counter++; - }); + } + counter++; + }); + while (true) + { + consumer.ConsumeAsync((long)TimeSpan.FromMilliseconds(100).TotalMilliseconds); if (AlwaysCommit) consumer.CommitSync(); if (counter >= numpacket) diff --git a/tests/KNetBenchmark/ProgramSupport.cs b/tests/KNetBenchmark/ProgramSupport.cs index 69ead12267..2eb1fe73fa 100644 --- a/tests/KNetBenchmark/ProgramSupport.cs +++ b/tests/KNetBenchmark/ProgramSupport.cs @@ -54,12 +54,30 @@ public static double KbPerSecond(this Stopwatch watch, int numPacket, int length return (double)(length * numPacket) / 1024 / ((double)watch.ElapsedMicroSeconds() / 1000000); } + public static IEnumerable FilterMinMax(this IEnumerable values) + { + return FilterMinMax(values.Select(p => (double)p)); + } + + public static IEnumerable FilterMinMax(this IEnumerable values) + { + System.Collections.Generic.List result = new(values); + if (result.Count > 2) + { + var min = result.Min(); + var max = result.Max(); + result.Remove(min); + result.Remove(max); + } + return result; + } + public static double StandardDeviation(this IEnumerable values) { return StandardDeviation(values.Select(p => (double)p)); } - public static double StandardDeviation(this IEnumerable values) + public static double StandardDeviation(this IEnumerable values) // from stackoverflow { double standardDeviation = 0;