From 94877b4a47d13fe3aeba3185316ab9fa464d3d7b Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Mon, 2 May 2022 19:40:06 +0200 Subject: [PATCH 1/7] Updates on KNetConsumer --- .../Clients/Consumer/KNetConsumer.cs | 133 ++++++++++++++++-- tests/KNetBenchmark/ProgramKNet.cs | 21 +-- 2 files changed, 134 insertions(+), 20 deletions(-) diff --git a/src/net/KNet/ClientSide/BridgedClasses/Clients/Consumer/KNetConsumer.cs b/src/net/KNet/ClientSide/BridgedClasses/Clients/Consumer/KNetConsumer.cs index a91c8c2f7f..7d46484a82 100644 --- a/src/net/KNet/ClientSide/BridgedClasses/Clients/Consumer/KNetConsumer.cs +++ b/src/net/KNet/ClientSide/BridgedClasses/Clients/Consumer/KNetConsumer.cs @@ -23,24 +23,30 @@ using Java.Util.Regex; using MASES.JCOBridge.C2JBridge; using System; +using System.Collections.Concurrent; namespace MASES.KNet.Clients.Consumer { public class Message { + readonly ConsumerRecord record; readonly KNetConsumerCallback obj; internal Message(KNetConsumerCallback obj) { this.obj = obj; } + internal Message(ConsumerRecord record) + { + this.record = record; + } - //public string Topic => data.TypedEventData; + public string Topic => record != null ? record.Topic : obj.Instance.Invoke("getTopic"); - //public int Partition => data.GetAt(0); + public int Partition => record != null ? record.Partition : obj.Instance.Invoke("getPartition"); - public K Key => obj.Instance.Invoke("getKey"); + public K Key => record != null ? record.Key : obj.Instance.Invoke("getKey"); - public V Value => obj.Instance.Invoke("getValue"); + public V Value => record != null ? record.Value : obj.Instance.Invoke("getValue"); } public interface IKNetConsumerCallback : IJVMBridgeBase @@ -73,11 +79,26 @@ public virtual void RecordReady(Message message) { } public interface IKNetConsumer : IConsumer { + bool IsCompleting { get; } + + bool IsEmpty { get; } + + int WaitingMessages { get; } + + void SetCallback(Action> cb); + + void ConsumeAsync(long timeoutMs); + void Consume(long timeoutMs, Action> callback); } public class KNetConsumer : KafkaConsumer, IKNetConsumer { + bool threadRunning = false; + long dequeing = 0; + readonly System.Threading.Thread consumeThread = null; + readonly System.Threading.ManualResetEvent threadExited = null; + readonly ConcurrentQueue> consumedRecords = null; readonly KNetConsumerCallback consumerCallback = null; public override string ClassName => "org.mases.knet.clients.consumer.KNetConsumer"; @@ -86,18 +107,40 @@ public KNetConsumer() { } - public KNetConsumer(Properties props) + public KNetConsumer(Properties props, bool useJVMCallback = false) : base(props) { - consumerCallback = new KNetConsumerCallback(CallbackMessage); - IExecute("setCallback", consumerCallback); + if (useJVMCallback) + { + consumerCallback = new KNetConsumerCallback(CallbackMessage); + IExecute("setCallback", consumerCallback); + } + else + { + consumedRecords = new(); + threadRunning = true; + threadExited = new(false); + consumeThread = new(ConsumeHandler); + consumeThread.Start(); + } } - public KNetConsumer(Properties props, Deserializer keyDeserializer, Deserializer valueDeserializer) + public KNetConsumer(Properties props, Deserializer keyDeserializer, Deserializer valueDeserializer, bool useJVMCallback = false) : base(props, keyDeserializer, valueDeserializer) { - consumerCallback = new KNetConsumerCallback(CallbackMessage); - IExecute("setCallback", consumerCallback); + if (useJVMCallback) + { + consumerCallback = new KNetConsumerCallback(CallbackMessage); + IExecute("setCallback", consumerCallback); + } + else + { + consumedRecords = new(); + threadRunning = true; + threadExited = new(false); + consumeThread = new(ConsumeHandler); + consumeThread.Start(); + } } Action> actionCallback = null; @@ -112,11 +155,81 @@ public override void Dispose() base.Dispose(); IExecute("setCallback", null); consumerCallback?.Dispose(); + threadRunning = false; + if (consumedRecords != null) + { + lock (consumedRecords) + { + System.Threading.Monitor.Pulse(consumedRecords); + } + while (IsCompleting) { threadExited.WaitOne(100); }; + actionCallback = null; + } + } + + public void SetCallback(Action> cb) + { + actionCallback = cb; + } + + void ConsumeHandler(object o) + { + try + { + while (threadRunning) + { + if (consumedRecords.TryDequeue(out ConsumerRecords records)) + { + System.Threading.Interlocked.Increment(ref dequeing); + try + { + foreach (var item in records) + { + actionCallback?.Invoke(new Message(item)); + } + } + catch { } + finally + { + System.Threading.Interlocked.Decrement(ref dequeing); + } + } + else + { + lock (consumedRecords) + { + System.Threading.Monitor.Wait(consumedRecords); + } + } + } + } + catch { } + finally { threadExited.Set(); threadRunning = false; } + } + + public bool IsCompleting => !consumedRecords.IsEmpty || System.Threading.Interlocked.Read(ref dequeing) != 0; + + public bool IsEmpty => consumedRecords.IsEmpty; + + public int WaitingMessages => consumedRecords.Count; + + public void ConsumeAsync(long timeoutMs) + { + Duration duration = TimeSpan.FromMilliseconds(timeoutMs); + if (consumedRecords == null) throw new ArgumentException("Cannot be used since constructor was called with useJVMCallback set to true."); + if (!threadRunning) throw new InvalidOperationException("Dispatching thread is not running."); + var results = this.Poll(duration); + consumedRecords.Enqueue(results); + lock (consumedRecords) + { + System.Threading.Monitor.Pulse(consumedRecords); + } } public void Consume(long timeoutMs, Action> callback) { Duration duration = TimeSpan.FromMilliseconds(timeoutMs); + if (consumerCallback == null) throw new ArgumentException("Cannot be used since constructor was called with useJVMCallback set to false."); try { actionCallback = callback; diff --git a/tests/KNetBenchmark/ProgramKNet.cs b/tests/KNetBenchmark/ProgramKNet.cs index da06968b53..a35497ca4a 100644 --- a/tests/KNetBenchmark/ProgramKNet.cs +++ b/tests/KNetBenchmark/ProgramKNet.cs @@ -472,20 +472,21 @@ static Stopwatch ConsumeKNet(string topicName, int length, int numpacket, byte[] { int counter = 0; consumer.Subscribe(Collections.Singleton(topicName), rebalanceListener); - while (true) + consumer.SetCallback((message) => { - consumer.Consume((long)TimeSpan.FromMilliseconds(100).TotalMilliseconds, (message) => + if (CheckOnConsume) { - if (CheckOnConsume) + if (!message.Value.SequenceEqual(data) + || (!SinglePacket && message.Key != counter)) { - if (!message.Value.SequenceEqual(data) - || (!SinglePacket && message.Key != counter)) - { - throw new InvalidOperationException($"Incorrect data counter {counter} item.Key {message.Key}"); - } + throw new InvalidOperationException($"Incorrect data counter {counter} item.Key {message.Key}"); } - counter++; - }); + } + counter++; + }); + while (true) + { + consumer.ConsumeAsync((long)TimeSpan.FromMilliseconds(100).TotalMilliseconds); if (AlwaysCommit) consumer.CommitSync(); if (counter >= numpacket) From 4aa196a5f2c9fe99c8698c5ebc9df536d3522b1e Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Mon, 2 May 2022 19:40:46 +0200 Subject: [PATCH 2/7] https://github.com/masesgroup/KNet/issues/53#issuecomment-1115114120 --- tests/KNetBenchmark/Program.cs | 15 ++++++++++++--- tests/KNetBenchmark/ProgramSupport.cs | 18 ++++++++++++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/tests/KNetBenchmark/Program.cs b/tests/KNetBenchmark/Program.cs index d2eeb649e1..fb88a9db3d 100644 --- a/tests/KNetBenchmark/Program.cs +++ b/tests/KNetBenchmark/Program.cs @@ -145,20 +145,29 @@ static void Main(string[] args) if (ProduceOnly) { Console.WriteLine("Produce"); - Console.WriteLine($"KNet microseconds - Max {kNetProduceTimes.Max():####.##} Min {kNetProduceTimes.Min():####.##} Avg {kNetProduceTimes.Average():####.##} stdev {kNetProduceTimes.StandardDeviation():####.##} CV {100 * kNetProduceTimes.CoefficientOfVariation():####.##} %"); - Console.WriteLine($"Confluent microseconds - Max {confluentProduceTimes.Max():####.##} Min {confluentProduceTimes.Min():####.##} Avg {confluentProduceTimes.Average():####.##} stdev {confluentProduceTimes.StandardDeviation():####.##} CV {100 * confluentProduceTimes.CoefficientOfVariation():####.##} %"); - Console.WriteLine($"KNet/Confluent ratio(%) - Max {100 * (double)kNetProduceTimes.Max() / confluentProduceTimes.Max():####.##} Min {100 * (double)kNetProduceTimes.Min() / confluentProduceTimes.Min():####.##} Avg {100 * (double)kNetProduceTimes.Average() / confluentProduceTimes.Average():####.##} stdev {100 * (double)kNetProduceTimes.StandardDeviation() / confluentProduceTimes.StandardDeviation():####.##} CV {100 * kNetProduceTimes.CoefficientOfVariation() / confluentProduceTimes.CoefficientOfVariation():####.##} %"); + Console.WriteLine($"KNet microseconds - Max {kNetProduceTimes.Max():####.##} Min {kNetProduceTimes.Min():####.##} Avg {kNetProduceTimes.Average():####.##} stdev {kNetProduceTimes.StandardDeviation():####.##} CV {100 * kNetProduceTimes.StandardDeviation() / kNetProduceTimes.Average():####.##} %"); + Console.WriteLine($"KNet microseconds - Avg Filtered {kNetProduceTimes.FilterMinMax().Average():####.##} stdev Filtered {kNetProduceTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * kNetProduceTimes.FilterMinMax().StandardDeviation() / kNetProduceTimes.FilterMinMax().Average():####.##} %"); + Console.WriteLine($"Confluent microseconds - Max {confluentProduceTimes.Max():####.##} Min {confluentProduceTimes.Min():####.##} Avg {confluentProduceTimes.Average():####.##} stdev {confluentProduceTimes.StandardDeviation():####.##} CV {100 * confluentProduceTimes.StandardDeviation() / confluentProduceTimes.Average():####.##} %"); + Console.WriteLine($"Confluent microseconds - Avg Filtered {confluentProduceTimes.FilterMinMax().Average():####.##} stdev Filtered {confluentProduceTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * confluentProduceTimes.FilterMinMax().StandardDeviation() / confluentProduceTimes.FilterMinMax().Average():####.##} %"); + Console.WriteLine($"KNet/Confluent ratio(%) - Max {100 * (double)kNetProduceTimes.Max() / confluentProduceTimes.Max():####.##} Min {100 * (double)kNetProduceTimes.Min() / confluentProduceTimes.Min():####.##} Avg {100 * (double)kNetProduceTimes.Average() / confluentProduceTimes.Average():####.##} stdev {100 * (double)kNetProduceTimes.StandardDeviation() / confluentProduceTimes.StandardDeviation():####.##} CV {100 * kNetProduceTimes.CoefficientOfVariation() / confluentProduceTimes.CoefficientOfVariation():####.##}"); + Console.WriteLine($"KNet/Confluent ratio(%) - Avg Filtered {100 * (double)kNetProduceTimes.FilterMinMax().Average() / confluentProduceTimes.FilterMinMax().Average():####.##} stdev Filtered {100 * (double)kNetProduceTimes.FilterMinMax().StandardDeviation() / confluentProduceTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * kNetProduceTimes.FilterMinMax().CoefficientOfVariation() / confluentProduceTimes.FilterMinMax().CoefficientOfVariation():####.##}"); } else { Console.WriteLine("Produce"); Console.WriteLine($"KNet microseconds - Max {kNetProduceTimes.Max():####.##} Min {kNetProduceTimes.Min():####.##} Avg {kNetProduceTimes.Average():####.##} stdev {kNetProduceTimes.StandardDeviation():####.##} CV {100 * kNetProduceTimes.StandardDeviation() / kNetProduceTimes.Average():####.##} %"); + Console.WriteLine($"KNet microseconds - Avg Filtered {kNetProduceTimes.FilterMinMax().Average():####.##} stdev Filtered {kNetProduceTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * kNetProduceTimes.FilterMinMax().StandardDeviation() / kNetProduceTimes.FilterMinMax().Average():####.##} %"); Console.WriteLine($"Confluent microseconds - Max {confluentProduceTimes.Max():####.##} Min {confluentProduceTimes.Min():####.##} Avg {confluentProduceTimes.Average():####.##} stdev {confluentProduceTimes.StandardDeviation():####.##} CV {100 * confluentProduceTimes.StandardDeviation() / confluentProduceTimes.Average():####.##} %"); + Console.WriteLine($"Confluent microseconds - Avg Filtered {confluentProduceTimes.FilterMinMax().Average():####.##} stdev Filtered {confluentProduceTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * confluentProduceTimes.FilterMinMax().StandardDeviation() / confluentProduceTimes.FilterMinMax().Average():####.##} %"); Console.WriteLine($"KNet/Confluent ratio(%) - Max {100 * (double)kNetProduceTimes.Max() / confluentProduceTimes.Max():####.##} Min {100 * (double)kNetProduceTimes.Min() / confluentProduceTimes.Min():####.##} Avg {100 * (double)kNetProduceTimes.Average() / confluentProduceTimes.Average():####.##} stdev {100 * (double)kNetProduceTimes.StandardDeviation() / confluentProduceTimes.StandardDeviation():####.##} CV {100 * kNetProduceTimes.CoefficientOfVariation() / confluentProduceTimes.CoefficientOfVariation():####.##}"); + Console.WriteLine($"KNet/Confluent ratio(%) - Avg Filtered {100 * (double)kNetProduceTimes.FilterMinMax().Average() / confluentProduceTimes.FilterMinMax().Average():####.##} stdev Filtered {100 * (double)kNetProduceTimes.FilterMinMax().StandardDeviation() / confluentProduceTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * kNetProduceTimes.FilterMinMax().CoefficientOfVariation() / confluentProduceTimes.FilterMinMax().CoefficientOfVariation():####.##}"); Console.WriteLine("Consume"); Console.WriteLine($"KNet microseconds - Max {kNetConsumeTimes.Max():####.##} Min {kNetConsumeTimes.Min():####.##} Avg {kNetConsumeTimes.Average():####.##} stdev {kNetConsumeTimes.StandardDeviation():####.##} CV {100 * kNetConsumeTimes.CoefficientOfVariation():####.##} %"); + Console.WriteLine($"KNet microseconds - Avg Filtered {kNetConsumeTimes.FilterMinMax().Average():####.##} stdev Filtered {kNetConsumeTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * kNetConsumeTimes.FilterMinMax().CoefficientOfVariation():####.##} %"); Console.WriteLine($"Confluent microseconds - Max {confluentConsumeTimes.Max():####.##} Min {confluentConsumeTimes.Min():####.##} Avg {confluentConsumeTimes.Average():####.##} stdev {confluentConsumeTimes.StandardDeviation():####.##} CV {100 * confluentConsumeTimes.CoefficientOfVariation():####.##} %"); + Console.WriteLine($"Confluent microseconds - Avg Filtered {confluentConsumeTimes.FilterMinMax().Average():####.##} stdev Filtered {confluentConsumeTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * confluentConsumeTimes.FilterMinMax().CoefficientOfVariation():####.##} %"); Console.WriteLine($"KNet/Confluent ratio(%) - Max {100 * (double)kNetConsumeTimes.Max() / confluentConsumeTimes.Max():########.##} Min {100 * (double)kNetConsumeTimes.Min() / confluentConsumeTimes.Min():####.##} Avg {100 * (double)kNetConsumeTimes.Average() / confluentConsumeTimes.Average():####.##} stdev {100 * (double)kNetConsumeTimes.StandardDeviation() / confluentConsumeTimes.StandardDeviation():####.##} CV {100 * kNetConsumeTimes.CoefficientOfVariation() / confluentConsumeTimes.CoefficientOfVariation():####.##}"); + Console.WriteLine($"KNet/Confluent ratio(%) - Avg Filtered {100 * (double)kNetConsumeTimes.FilterMinMax().Average() / confluentConsumeTimes.FilterMinMax().Average():####.##} stdev Filtered {100 * (double)kNetConsumeTimes.FilterMinMax().StandardDeviation() / confluentConsumeTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * kNetConsumeTimes.FilterMinMax().CoefficientOfVariation() / confluentConsumeTimes.FilterMinMax().CoefficientOfVariation():####.##}"); } File.WriteAllText(Path.Combine(ResultsPath, $"results_{DateTime.Now:yyyyMMdd_HHmmss}.csv"), sb.ToString()); diff --git a/tests/KNetBenchmark/ProgramSupport.cs b/tests/KNetBenchmark/ProgramSupport.cs index 69ead12267..ac4e024171 100644 --- a/tests/KNetBenchmark/ProgramSupport.cs +++ b/tests/KNetBenchmark/ProgramSupport.cs @@ -54,6 +54,24 @@ public static double KbPerSecond(this Stopwatch watch, int numPacket, int length return (double)(length * numPacket) / 1024 / ((double)watch.ElapsedMicroSeconds() / 1000000); } + public static IEnumerable FilterMinMax(this IEnumerable values) + { + return FilterMinMax(values.Select(p => (double)p)); + } + + public static IEnumerable FilterMinMax(this IEnumerable values) + { + System.Collections.Generic.List result = new(values); + if (result.Count > 2) + { + var min = result.Min(); + var max = result.Max(); + result.Remove(min); + result.Remove(max); + } + return result; + } + public static double StandardDeviation(this IEnumerable values) { return StandardDeviation(values.Select(p => (double)p)); From 6d5d3d29cf5d65fc2ce8936b396d7490e7254360 Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Mon, 2 May 2022 19:58:46 +0200 Subject: [PATCH 3/7] Fixed some numbers --- src/net/Documentation/articles/performance.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/net/Documentation/articles/performance.md b/src/net/Documentation/articles/performance.md index 17bc0660d9..bbcca35d75 100644 --- a/src/net/Documentation/articles/performance.md +++ b/src/net/Documentation/articles/performance.md @@ -102,7 +102,7 @@ Here below a set of results, in bold the results which are better using KNet: | | 10 bytes | 100 bytes | 1000 bytes | 10000 bytes | 100000 bytes | |--- |--- |--- |--- |--- |--- | | 10 messages | **9,04 (4,34)** | **5,47 (3,1)** | **15,45 (5,29)** | **7,54 (4,38)** | **19,73 (4,23)** | -| 100 messages | **18,9 (18,9)** | **38,1 (8,1)** | **30,34 (5,44)** | **26 (3,04)** | **69,4 (13,09)** | +| 100 messages | **18,9 (6,29)** | **38,1 (8,1)** | **30,34 (5,44)** | **26 (3,04)** | **69,4 (13,09)** | | 1000 messages | 197,73 (10,54) | 109,92 (6,13) | **57,6 (7,32)** | **52,71 (8,17)** | **75,76 (43,7)** | | 10000 messages | 2102,28 (736,54) | 796,84 (514,28) | 173,39 (401,76) | 123,62 (620,46) | **99,5 (108,3)** | @@ -111,7 +111,7 @@ Here below a set of results, in bold the results which are better using KNet: | | 10 bytes | 100 bytes | 1000 bytes | 10000 bytes | 100000 bytes | |--- |--- |--- |--- |--- |--- | | 10 messages | **85,93 (399,84)**| **85,41 (282,85)** | **85,14 (297,98)** | **24,07 (229,23)** | **36,23 (285,77)** | -| 100 messages | **94,54 (94,54)** | **94,7 (287,78)** | **68,49 (389,25)** | **71,97 (276,56)** | 108,57 (89,45) | +| 100 messages | **94,54 (479,13)** | **94,7 (287,78)** | **68,49 (389,25)** | **71,97 (276,56)** | 108,57 (89,45) | | 1000 messages | 192,27 (906,94) | 521,86 (867,93) | 103,62 (1854,84) | 255,52 (287,33) | 163,24 (124,23) | | 10000 messages | 9153,56 (77543,04) | 7948,76 (69701,75) | 3848,12 (23910,64) | 706,83 (3905,89) | 213,46 (1013,16) | @@ -123,7 +123,7 @@ The best consume performance was obtained with 10 messages of 10000 bytes: KNet ### Stdev ratio percentage -Looking at value within the brackets that represents the ratio of the stdev it is possible to highlight that: +Looking at value within the brackets, that represents the ratio of the stdev, it is possible to highlight that: - in produce KNet has more stable measures except when the number of messages is high (10000 messages); - in consume KNet has less stable measures. @@ -135,4 +135,4 @@ Using KNetProducer the numbers of JNI invocation are less than using KafkaProduc The same consideration can be applied on the consume side: KNetConsumer does not reduce the impact of JNI interface and it does not give any great improvement. The JNI interface has an impact even when the number of messages is high because during processing the Garbage Collector is activated many times increasing the JNI overhead. -With the upcoming JCOBridge major version the JNI impact will be reduced and KNet will get extra performance both in produce and in consume. +**With the upcoming JCOBridge major version the JNI impact will be reduced and KNet will get extra performance both in produce and in consume.** From bf80641dc934fad8fe90bffaa84e17c4283e8490 Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Tue, 3 May 2022 08:32:13 +0200 Subject: [PATCH 4/7] #53: stdev becomes SD --- src/net/Documentation/articles/performance.md | 8 ++--- tests/KNetBenchmark/Program.cs | 36 +++++++++---------- tests/KNetBenchmark/ProgramSupport.cs | 2 +- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/src/net/Documentation/articles/performance.md b/src/net/Documentation/articles/performance.md index bbcca35d75..880d0d261b 100644 --- a/src/net/Documentation/articles/performance.md +++ b/src/net/Documentation/articles/performance.md @@ -97,7 +97,7 @@ The configuration is: Here below a set of results, in bold the results which are better using KNet: -- KNet/Confluent.Kafka Produce Average ratio percentage (stdev ratio percentage): +- KNet/Confluent.Kafka Produce Average ratio percentage (SD ratio percentage): | | 10 bytes | 100 bytes | 1000 bytes | 10000 bytes | 100000 bytes | |--- |--- |--- |--- |--- |--- | @@ -106,7 +106,7 @@ Here below a set of results, in bold the results which are better using KNet: | 1000 messages | 197,73 (10,54) | 109,92 (6,13) | **57,6 (7,32)** | **52,71 (8,17)** | **75,76 (43,7)** | | 10000 messages | 2102,28 (736,54) | 796,84 (514,28) | 173,39 (401,76) | 123,62 (620,46) | **99,5 (108,3)** | -- KNet/Confluent.Kafka Consume Average ratio percentage (stdev ratio percentage): +- KNet/Confluent.Kafka Consume Average ratio percentage (SD ratio percentage): | | 10 bytes | 100 bytes | 1000 bytes | 10000 bytes | 100000 bytes | |--- |--- |--- |--- |--- |--- | @@ -121,9 +121,9 @@ Looking at the above table KNet performs better than Confluent.Kafka with burst The best produce performance was obtained with 10 messages of 100, or 10000, bytes: KNet is 20 times fast than Confluent.Kafka. The best consume performance was obtained with 10 messages of 10000 bytes: KNet is 4 times fast than Confluent.Kafka. -### Stdev ratio percentage +### SD ratio percentage -Looking at value within the brackets, that represents the ratio of the stdev, it is possible to highlight that: +Looking at value within the brackets, that represents the ratio of the SD, it is possible to highlight that: - in produce KNet has more stable measures except when the number of messages is high (10000 messages); - in consume KNet has less stable measures. diff --git a/tests/KNetBenchmark/Program.cs b/tests/KNetBenchmark/Program.cs index fb88a9db3d..9458a4b21f 100644 --- a/tests/KNetBenchmark/Program.cs +++ b/tests/KNetBenchmark/Program.cs @@ -145,29 +145,29 @@ static void Main(string[] args) if (ProduceOnly) { Console.WriteLine("Produce"); - Console.WriteLine($"KNet microseconds - Max {kNetProduceTimes.Max():####.##} Min {kNetProduceTimes.Min():####.##} Avg {kNetProduceTimes.Average():####.##} stdev {kNetProduceTimes.StandardDeviation():####.##} CV {100 * kNetProduceTimes.StandardDeviation() / kNetProduceTimes.Average():####.##} %"); - Console.WriteLine($"KNet microseconds - Avg Filtered {kNetProduceTimes.FilterMinMax().Average():####.##} stdev Filtered {kNetProduceTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * kNetProduceTimes.FilterMinMax().StandardDeviation() / kNetProduceTimes.FilterMinMax().Average():####.##} %"); - Console.WriteLine($"Confluent microseconds - Max {confluentProduceTimes.Max():####.##} Min {confluentProduceTimes.Min():####.##} Avg {confluentProduceTimes.Average():####.##} stdev {confluentProduceTimes.StandardDeviation():####.##} CV {100 * confluentProduceTimes.StandardDeviation() / confluentProduceTimes.Average():####.##} %"); - Console.WriteLine($"Confluent microseconds - Avg Filtered {confluentProduceTimes.FilterMinMax().Average():####.##} stdev Filtered {confluentProduceTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * confluentProduceTimes.FilterMinMax().StandardDeviation() / confluentProduceTimes.FilterMinMax().Average():####.##} %"); - Console.WriteLine($"KNet/Confluent ratio(%) - Max {100 * (double)kNetProduceTimes.Max() / confluentProduceTimes.Max():####.##} Min {100 * (double)kNetProduceTimes.Min() / confluentProduceTimes.Min():####.##} Avg {100 * (double)kNetProduceTimes.Average() / confluentProduceTimes.Average():####.##} stdev {100 * (double)kNetProduceTimes.StandardDeviation() / confluentProduceTimes.StandardDeviation():####.##} CV {100 * kNetProduceTimes.CoefficientOfVariation() / confluentProduceTimes.CoefficientOfVariation():####.##}"); - Console.WriteLine($"KNet/Confluent ratio(%) - Avg Filtered {100 * (double)kNetProduceTimes.FilterMinMax().Average() / confluentProduceTimes.FilterMinMax().Average():####.##} stdev Filtered {100 * (double)kNetProduceTimes.FilterMinMax().StandardDeviation() / confluentProduceTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * kNetProduceTimes.FilterMinMax().CoefficientOfVariation() / confluentProduceTimes.FilterMinMax().CoefficientOfVariation():####.##}"); + Console.WriteLine($"KNet microseconds - Max {kNetProduceTimes.Max():####.##} Min {kNetProduceTimes.Min():####.##} Avg {kNetProduceTimes.Average():####.##} SD {kNetProduceTimes.StandardDeviation():####.##} CV {100 * kNetProduceTimes.StandardDeviation() / kNetProduceTimes.Average():####.##} %"); + Console.WriteLine($"KNet microseconds - Avg Filtered {kNetProduceTimes.FilterMinMax().Average():####.##} SD Filtered {kNetProduceTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * kNetProduceTimes.FilterMinMax().StandardDeviation() / kNetProduceTimes.FilterMinMax().Average():####.##} %"); + Console.WriteLine($"Confluent microseconds - Max {confluentProduceTimes.Max():####.##} Min {confluentProduceTimes.Min():####.##} Avg {confluentProduceTimes.Average():####.##} SD {confluentProduceTimes.StandardDeviation():####.##} CV {100 * confluentProduceTimes.StandardDeviation() / confluentProduceTimes.Average():####.##} %"); + Console.WriteLine($"Confluent microseconds - Avg Filtered {confluentProduceTimes.FilterMinMax().Average():####.##} SD Filtered {confluentProduceTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * confluentProduceTimes.FilterMinMax().StandardDeviation() / confluentProduceTimes.FilterMinMax().Average():####.##} %"); + Console.WriteLine($"KNet/Confluent ratio(%) - Max {100 * (double)kNetProduceTimes.Max() / confluentProduceTimes.Max():####.##} Min {100 * (double)kNetProduceTimes.Min() / confluentProduceTimes.Min():####.##} Avg {100 * (double)kNetProduceTimes.Average() / confluentProduceTimes.Average():####.##} SD {100 * (double)kNetProduceTimes.StandardDeviation() / confluentProduceTimes.StandardDeviation():####.##} CV {100 * kNetProduceTimes.CoefficientOfVariation() / confluentProduceTimes.CoefficientOfVariation():####.##}"); + Console.WriteLine($"KNet/Confluent ratio(%) - Avg Filtered {100 * (double)kNetProduceTimes.FilterMinMax().Average() / confluentProduceTimes.FilterMinMax().Average():####.##} SD Filtered {100 * (double)kNetProduceTimes.FilterMinMax().StandardDeviation() / confluentProduceTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * kNetProduceTimes.FilterMinMax().CoefficientOfVariation() / confluentProduceTimes.FilterMinMax().CoefficientOfVariation():####.##}"); } else { Console.WriteLine("Produce"); - Console.WriteLine($"KNet microseconds - Max {kNetProduceTimes.Max():####.##} Min {kNetProduceTimes.Min():####.##} Avg {kNetProduceTimes.Average():####.##} stdev {kNetProduceTimes.StandardDeviation():####.##} CV {100 * kNetProduceTimes.StandardDeviation() / kNetProduceTimes.Average():####.##} %"); - Console.WriteLine($"KNet microseconds - Avg Filtered {kNetProduceTimes.FilterMinMax().Average():####.##} stdev Filtered {kNetProduceTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * kNetProduceTimes.FilterMinMax().StandardDeviation() / kNetProduceTimes.FilterMinMax().Average():####.##} %"); - Console.WriteLine($"Confluent microseconds - Max {confluentProduceTimes.Max():####.##} Min {confluentProduceTimes.Min():####.##} Avg {confluentProduceTimes.Average():####.##} stdev {confluentProduceTimes.StandardDeviation():####.##} CV {100 * confluentProduceTimes.StandardDeviation() / confluentProduceTimes.Average():####.##} %"); - Console.WriteLine($"Confluent microseconds - Avg Filtered {confluentProduceTimes.FilterMinMax().Average():####.##} stdev Filtered {confluentProduceTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * confluentProduceTimes.FilterMinMax().StandardDeviation() / confluentProduceTimes.FilterMinMax().Average():####.##} %"); - Console.WriteLine($"KNet/Confluent ratio(%) - Max {100 * (double)kNetProduceTimes.Max() / confluentProduceTimes.Max():####.##} Min {100 * (double)kNetProduceTimes.Min() / confluentProduceTimes.Min():####.##} Avg {100 * (double)kNetProduceTimes.Average() / confluentProduceTimes.Average():####.##} stdev {100 * (double)kNetProduceTimes.StandardDeviation() / confluentProduceTimes.StandardDeviation():####.##} CV {100 * kNetProduceTimes.CoefficientOfVariation() / confluentProduceTimes.CoefficientOfVariation():####.##}"); - Console.WriteLine($"KNet/Confluent ratio(%) - Avg Filtered {100 * (double)kNetProduceTimes.FilterMinMax().Average() / confluentProduceTimes.FilterMinMax().Average():####.##} stdev Filtered {100 * (double)kNetProduceTimes.FilterMinMax().StandardDeviation() / confluentProduceTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * kNetProduceTimes.FilterMinMax().CoefficientOfVariation() / confluentProduceTimes.FilterMinMax().CoefficientOfVariation():####.##}"); + Console.WriteLine($"KNet microseconds - Max {kNetProduceTimes.Max():####.##} Min {kNetProduceTimes.Min():####.##} Avg {kNetProduceTimes.Average():####.##} SD {kNetProduceTimes.StandardDeviation():####.##} CV {100 * kNetProduceTimes.StandardDeviation() / kNetProduceTimes.Average():####.##} %"); + Console.WriteLine($"KNet microseconds - Avg Filtered {kNetProduceTimes.FilterMinMax().Average():####.##} SD Filtered {kNetProduceTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * kNetProduceTimes.FilterMinMax().StandardDeviation() / kNetProduceTimes.FilterMinMax().Average():####.##} %"); + Console.WriteLine($"Confluent microseconds - Max {confluentProduceTimes.Max():####.##} Min {confluentProduceTimes.Min():####.##} Avg {confluentProduceTimes.Average():####.##} SD {confluentProduceTimes.StandardDeviation():####.##} CV {100 * confluentProduceTimes.StandardDeviation() / confluentProduceTimes.Average():####.##} %"); + Console.WriteLine($"Confluent microseconds - Avg Filtered {confluentProduceTimes.FilterMinMax().Average():####.##} SD Filtered {confluentProduceTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * confluentProduceTimes.FilterMinMax().StandardDeviation() / confluentProduceTimes.FilterMinMax().Average():####.##} %"); + Console.WriteLine($"KNet/Confluent ratio(%) - Max {100 * (double)kNetProduceTimes.Max() / confluentProduceTimes.Max():####.##} Min {100 * (double)kNetProduceTimes.Min() / confluentProduceTimes.Min():####.##} Avg {100 * (double)kNetProduceTimes.Average() / confluentProduceTimes.Average():####.##} SD {100 * (double)kNetProduceTimes.StandardDeviation() / confluentProduceTimes.StandardDeviation():####.##} CV {100 * kNetProduceTimes.CoefficientOfVariation() / confluentProduceTimes.CoefficientOfVariation():####.##}"); + Console.WriteLine($"KNet/Confluent ratio(%) - Avg Filtered {100 * (double)kNetProduceTimes.FilterMinMax().Average() / confluentProduceTimes.FilterMinMax().Average():####.##} SD Filtered {100 * (double)kNetProduceTimes.FilterMinMax().StandardDeviation() / confluentProduceTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * kNetProduceTimes.FilterMinMax().CoefficientOfVariation() / confluentProduceTimes.FilterMinMax().CoefficientOfVariation():####.##}"); Console.WriteLine("Consume"); - Console.WriteLine($"KNet microseconds - Max {kNetConsumeTimes.Max():####.##} Min {kNetConsumeTimes.Min():####.##} Avg {kNetConsumeTimes.Average():####.##} stdev {kNetConsumeTimes.StandardDeviation():####.##} CV {100 * kNetConsumeTimes.CoefficientOfVariation():####.##} %"); - Console.WriteLine($"KNet microseconds - Avg Filtered {kNetConsumeTimes.FilterMinMax().Average():####.##} stdev Filtered {kNetConsumeTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * kNetConsumeTimes.FilterMinMax().CoefficientOfVariation():####.##} %"); - Console.WriteLine($"Confluent microseconds - Max {confluentConsumeTimes.Max():####.##} Min {confluentConsumeTimes.Min():####.##} Avg {confluentConsumeTimes.Average():####.##} stdev {confluentConsumeTimes.StandardDeviation():####.##} CV {100 * confluentConsumeTimes.CoefficientOfVariation():####.##} %"); - Console.WriteLine($"Confluent microseconds - Avg Filtered {confluentConsumeTimes.FilterMinMax().Average():####.##} stdev Filtered {confluentConsumeTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * confluentConsumeTimes.FilterMinMax().CoefficientOfVariation():####.##} %"); - Console.WriteLine($"KNet/Confluent ratio(%) - Max {100 * (double)kNetConsumeTimes.Max() / confluentConsumeTimes.Max():########.##} Min {100 * (double)kNetConsumeTimes.Min() / confluentConsumeTimes.Min():####.##} Avg {100 * (double)kNetConsumeTimes.Average() / confluentConsumeTimes.Average():####.##} stdev {100 * (double)kNetConsumeTimes.StandardDeviation() / confluentConsumeTimes.StandardDeviation():####.##} CV {100 * kNetConsumeTimes.CoefficientOfVariation() / confluentConsumeTimes.CoefficientOfVariation():####.##}"); - Console.WriteLine($"KNet/Confluent ratio(%) - Avg Filtered {100 * (double)kNetConsumeTimes.FilterMinMax().Average() / confluentConsumeTimes.FilterMinMax().Average():####.##} stdev Filtered {100 * (double)kNetConsumeTimes.FilterMinMax().StandardDeviation() / confluentConsumeTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * kNetConsumeTimes.FilterMinMax().CoefficientOfVariation() / confluentConsumeTimes.FilterMinMax().CoefficientOfVariation():####.##}"); + Console.WriteLine($"KNet microseconds - Max {kNetConsumeTimes.Max():####.##} Min {kNetConsumeTimes.Min():####.##} Avg {kNetConsumeTimes.Average():####.##} SD {kNetConsumeTimes.StandardDeviation():####.##} CV {100 * kNetConsumeTimes.CoefficientOfVariation():####.##} %"); + Console.WriteLine($"KNet microseconds - Avg Filtered {kNetConsumeTimes.FilterMinMax().Average():####.##} SD Filtered {kNetConsumeTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * kNetConsumeTimes.FilterMinMax().CoefficientOfVariation():####.##} %"); + Console.WriteLine($"Confluent microseconds - Max {confluentConsumeTimes.Max():####.##} Min {confluentConsumeTimes.Min():####.##} Avg {confluentConsumeTimes.Average():####.##} SD {confluentConsumeTimes.StandardDeviation():####.##} CV {100 * confluentConsumeTimes.CoefficientOfVariation():####.##} %"); + Console.WriteLine($"Confluent microseconds - Avg Filtered {confluentConsumeTimes.FilterMinMax().Average():####.##} SD Filtered {confluentConsumeTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * confluentConsumeTimes.FilterMinMax().CoefficientOfVariation():####.##} %"); + Console.WriteLine($"KNet/Confluent ratio(%) - Max {100 * (double)kNetConsumeTimes.Max() / confluentConsumeTimes.Max():########.##} Min {100 * (double)kNetConsumeTimes.Min() / confluentConsumeTimes.Min():####.##} Avg {100 * (double)kNetConsumeTimes.Average() / confluentConsumeTimes.Average():####.##} SD {100 * (double)kNetConsumeTimes.StandardDeviation() / confluentConsumeTimes.StandardDeviation():####.##} CV {100 * kNetConsumeTimes.CoefficientOfVariation() / confluentConsumeTimes.CoefficientOfVariation():####.##}"); + Console.WriteLine($"KNet/Confluent ratio(%) - Avg Filtered {100 * (double)kNetConsumeTimes.FilterMinMax().Average() / confluentConsumeTimes.FilterMinMax().Average():####.##} SD Filtered {100 * (double)kNetConsumeTimes.FilterMinMax().StandardDeviation() / confluentConsumeTimes.FilterMinMax().StandardDeviation():####.##} CV Filtered {100 * kNetConsumeTimes.FilterMinMax().CoefficientOfVariation() / confluentConsumeTimes.FilterMinMax().CoefficientOfVariation():####.##}"); } File.WriteAllText(Path.Combine(ResultsPath, $"results_{DateTime.Now:yyyyMMdd_HHmmss}.csv"), sb.ToString()); diff --git a/tests/KNetBenchmark/ProgramSupport.cs b/tests/KNetBenchmark/ProgramSupport.cs index ac4e024171..2eb1fe73fa 100644 --- a/tests/KNetBenchmark/ProgramSupport.cs +++ b/tests/KNetBenchmark/ProgramSupport.cs @@ -77,7 +77,7 @@ public static double StandardDeviation(this IEnumerable values) return StandardDeviation(values.Select(p => (double)p)); } - public static double StandardDeviation(this IEnumerable values) + public static double StandardDeviation(this IEnumerable values) // from stackoverflow { double standardDeviation = 0; From 819d81aeb0b29f018dbf3e3424b20f8a77e8fd99 Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Wed, 4 May 2022 00:41:26 +0200 Subject: [PATCH 5/7] #53: updates on report --- src/net/Documentation/articles/performance.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/net/Documentation/articles/performance.md b/src/net/Documentation/articles/performance.md index 880d0d261b..3c6c0f65dd 100644 --- a/src/net/Documentation/articles/performance.md +++ b/src/net/Documentation/articles/performance.md @@ -136,3 +136,16 @@ The same consideration can be applied on the consume side: KNetConsumer does not The JNI interface has an impact even when the number of messages is high because during processing the Garbage Collector is activated many times increasing the JNI overhead. **With the upcoming JCOBridge major version the JNI impact will be reduced and KNet will get extra performance both in produce and in consume.** + +The first measures done with an internal alfa release reports big improvements. Here some cases: +- 1000 messages: + - 10 bytes: Produce Average ratio percentage from 197 to 111, Consume Average ratio percentage from 192 to 137 + - 100 bytes: Produce Average ratio percentage from 109 to 86, Consume Average ratio percentage from 521 to 276 +- 10000 messages: + - 10 bytes: Produce Average ratio percentage from 2102 to 656, Consume Average ratio percentage from 9153 to 4139 + - 100 bytes: Produce Average ratio percentage from 796 to 410, Consume Average ratio percentage from 7948 to 3378 + - 1000 bytes: Produce Average ratio percentage from 173 to 81, Consume Average ratio percentage from 3848 to 1668 + - 10000 bytes: Produce Average ratio percentage from 123 to 80, Consume Average ratio percentage from 706 to 390 + - 100000 bytes: Produce Average ratio percentage from 100 to 98, Consume Average ratio percentage from 213 to 172 + + From cf6783c372c66283b8941c6819b5ef6b681913a9 Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Wed, 4 May 2022 00:41:26 +0200 Subject: [PATCH 6/7] #53: updates on report --- src/net/Documentation/articles/performance.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/net/Documentation/articles/performance.md b/src/net/Documentation/articles/performance.md index 880d0d261b..0b00ef8e79 100644 --- a/src/net/Documentation/articles/performance.md +++ b/src/net/Documentation/articles/performance.md @@ -136,3 +136,16 @@ The same consideration can be applied on the consume side: KNetConsumer does not The JNI interface has an impact even when the number of messages is high because during processing the Garbage Collector is activated many times increasing the JNI overhead. **With the upcoming JCOBridge major version the JNI impact will be reduced and KNet will get extra performance both in produce and in consume.** + +The first measures done with an internal alfa release reports big improvements. Here some cases: +- 1000 messages: + - 10 bytes: Produce Average ratio percentage from 197 to 111, Consume Average ratio percentage from 192 to 137 + - 100 bytes: Produce Average ratio percentage from 109 to 86, Consume Average ratio percentage from 521 to 276 +- 10000 messages: + - 10 bytes: Produce Average ratio percentage from 2102 to 656, Consume Average ratio percentage from 9153 to 4139 + - 100 bytes: Produce Average ratio percentage from 796 to 410, Consume Average ratio percentage from 7948 to 3378 + - 1000 bytes: Produce Average ratio percentage from 173 to 81, Consume Average ratio percentage from 3848 to 1668 + - 10000 bytes: Produce Average ratio percentage from 123 to 80, Consume Average ratio percentage from 706 to 390 + - 100000 bytes: Produce Average ratio percentage from 100 to 96, Consume Average ratio percentage from 213 to 172 + + From b532ed831c50ecef6e3d250e95035014c7c8fcc7 Mon Sep 17 00:00:00 2001 From: masesdevelopers <94312179+masesdevelopers@users.noreply.github.com> Date: Wed, 4 May 2022 02:04:55 +0200 Subject: [PATCH 7/7] #53: extra tests --- src/net/Documentation/articles/performance.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/net/Documentation/articles/performance.md b/src/net/Documentation/articles/performance.md index 0b00ef8e79..fe9ab99138 100644 --- a/src/net/Documentation/articles/performance.md +++ b/src/net/Documentation/articles/performance.md @@ -138,9 +138,18 @@ The JNI interface has an impact even when the number of messages is high because **With the upcoming JCOBridge major version the JNI impact will be reduced and KNet will get extra performance both in produce and in consume.** The first measures done with an internal alfa release reports big improvements. Here some cases: +- 100 messages: + - 10 bytes: Produce Average ratio percentage from 18 to 11, Consume Average ratio percentage from 94 to 88 + - 100 bytes: Produce Average ratio percentage from 38 to 17, Consume Average ratio percentage from 94 to 88 + - 1000 bytes: Produce Average ratio percentage from 30 to 16, Consume Average ratio percentage from 68 to 38 + - 10000 bytes: Produce Average ratio percentage from 26 to 24, Consume Average ratio percentage from 71 to 47 + - 100000 bytes: Produce Average ratio percentage from 69 to 43, Consume Average ratio percentage from 108 to 101 - 1000 messages: - 10 bytes: Produce Average ratio percentage from 197 to 111, Consume Average ratio percentage from 192 to 137 - 100 bytes: Produce Average ratio percentage from 109 to 86, Consume Average ratio percentage from 521 to 276 + - 1000 bytes: Produce Average ratio percentage from 57 to 53, Consume Average ratio percentage from 103 to 56 + - 10000 bytes: Produce Average ratio percentage from 52 to 48, Consume Average ratio percentage from 255 to 177 + - 100000 bytes: Produce Average ratio percentage from 75 to 71, Consume Average ratio percentage from 163 to 146 - 10000 messages: - 10 bytes: Produce Average ratio percentage from 2102 to 656, Consume Average ratio percentage from 9153 to 4139 - 100 bytes: Produce Average ratio percentage from 796 to 410, Consume Average ratio percentage from 7948 to 3378