Skip to content

Commit

Permalink
Reduced resource consumption in classes referring final fields (#379)
Browse files Browse the repository at this point in the history
* Update on classes to use immutable fields of JVM classes, added same serialization optimization of KNet Streams SDK

* Prefetch available only on supported platform
  • Loading branch information
masesdevelopers authored Jan 24, 2024
1 parent d4a41cb commit cc2b630
Show file tree
Hide file tree
Showing 20 changed files with 428 additions and 212 deletions.
7 changes: 2 additions & 5 deletions .github/workflows/generateclasses.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,9 @@ jobs:
with:
branch-suffix: short-commit-hash
add-paths: src/*.*
commit-message: |
Update generated classes after commit ${{ env.GITHUB_COMMIT_MESSAGE }}
fix #92
commit-message: Update generated classes after commit ${{ env.GITHUB_COMMIT_MESSAGE }}
title: Changes by GitHub action
body: Automated changes by GitHub action
body: Automated changes by GitHub action, fix #92
reviewers: masesdevelopers
assignees: masesdevelopers
labels: java, .NET, enhancement
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,27 @@

using MASES.JCOBridge.C2JBridge.JVMInterop;
using Org.Apache.Kafka.Common.Record;
using System;

namespace Org.Apache.Kafka.Clients.Consumer
{
public partial class ConsumerRecord
{
DateTime? _dateTime = null;
/// <summary>
/// <see cref="System.DateTime"/> of <see cref="Timestamp"/>
/// </summary>
public System.DateTime DateTime => System.DateTimeOffset.FromUnixTimeMilliseconds(Timestamp()).DateTime;
public System.DateTime DateTime => _dateTime ??= System.DateTimeOffset.FromUnixTimeMilliseconds(Timestamp()).DateTime;

// public TimestampType TimestampType => (TimestampType)System.Enum.Parse(typeof(TimestampType), IExecute<IJavaObject>("timestampType").Invoke<string>("name")); // (TimestampType)(int)IExecute<IJavaObject>("timestampType").GetField("id");
}

public partial class ConsumerRecord<K, V>
{
DateTime? _dateTime = null;
/// <summary>
/// <see cref="System.DateTime"/> of <see cref="Timestamp"/>
/// </summary>
public System.DateTime DateTime => System.DateTimeOffset.FromUnixTimeMilliseconds(Timestamp()).DateTime;
public System.DateTime DateTime => _dateTime ??= System.DateTimeOffset.FromUnixTimeMilliseconds(Timestamp()).DateTime;
}
}
105 changes: 58 additions & 47 deletions src/net/KNet/Specific/Consumer/KNetConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,31 @@

using Java.Time;
using Java.Util;
using MASES.JCOBridge.C2JBridge;
using System;
using System.Collections.Concurrent;
using Org.Apache.Kafka.Clients.Consumer;
using MASES.KNet.Serialization;
using Org.Apache.Kafka.Common.Header;
using Org.Apache.Kafka.Common.Record;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace MASES.KNet.Consumer
{
/// <summary>
/// KNet extension of <see cref="IConsumer{K, V}"/>
/// KNet extension of <see cref="Org.Apache.Kafka.Clients.Consumer.IConsumer{K, V}"/>
/// </summary>
/// <typeparam name="K">The key type</typeparam>
/// <typeparam name="V">The value type</typeparam>
public interface IKNetConsumer<K, V> : IConsumer<byte[], byte[]>
public interface IKNetConsumer<K, V> : Org.Apache.Kafka.Clients.Consumer.IConsumer<byte[], byte[]>
{
#if NET7_0_OR_GREATER
/// <summary>
/// <see langword="true"/> if enumeration will use prefetch and the number of records is more than <see cref="PrefetchThreshold"/>, i.e. the preparation of <see cref="KNetConsumerRecord{K, V}"/> happens in an external thread
/// </summary>
/// <remarks>It is <see langword="true"/> by default if one of <typeparamref name="K"/> or <typeparamref name="V"/> are not <see cref="ValueType"/>, override the value using <see cref="ApplyPrefetch(bool, int)"/></remarks>
bool IsPrefecth { get; }
/// <summary>
/// The minimum threshold to activate pretech, i.e. the preparation of <see cref="KNetConsumerRecord{K, V}"/> happens in external thread if <see cref="Org.Apache.Kafka.Clients.Consumer.ConsumerRecords{K, V}"/> contains more than <see cref="PrefetchThreshold"/> elements
/// </summary>
/// <remarks>The default value is 10, however it shall be chosen by the developer and in the decision shall be verified if external thread activation costs more than inline execution</remarks>
int PrefetchThreshold { get; }
#endif
/// <summary>
/// <see langword="true"/> if the <see cref="IKNetConsumer{K, V}"/> instance is completing async operation
/// </summary>
Expand All @@ -50,6 +55,15 @@ public interface IKNetConsumer<K, V> : IConsumer<byte[], byte[]>
/// Number of messages in the <see cref="IKNetConsumer{K, V}"/> instance waiting to be processed in async operation
/// </summary>
int WaitingMessages { get; }
#if NET7_0_OR_GREATER
/// <summary>
/// Set to <see langword="true"/> to enable enumeration with prefetch over <paramref name="prefetchThreshold"/> threshold, i.e. preparation of <see cref="KNetConsumerRecord{K, V}"/> in external thread
/// </summary>
/// <param name="enablePrefetch"><see langword="true"/> to enable prefetch. See <see cref="IsPrefecth"/></param>
/// <param name="prefetchThreshold">The minimum threshold to activate pretech, default is 10. See <see cref="PrefetchThreshold"/></param>
/// <remarks>Setting <paramref name="prefetchThreshold"/> to a value less, or equal, to 0 and <paramref name="enablePrefetch"/> to <see langword="true"/>, the prefetch is always actived</remarks>
void ApplyPrefetch(bool enablePrefetch = true, int prefetchThreshold = 10);
#endif
/// <summary>
/// Sets the <see cref="Action{T}"/> to use to receive <see cref="KNetConsumerRecord{K, V}"/>
/// </summary>
Expand Down Expand Up @@ -81,65 +95,44 @@ public interface IKNetConsumer<K, V> : IConsumer<byte[], byte[]>
void Consume(long timeoutMs, Action<KNetConsumerRecord<K, V>> callback);
}
/// <summary>
/// KNet extension of <see cref="KafkaConsumer{K, V}"/>
/// KNet extension of <see cref="Org.Apache.Kafka.Clients.Consumer.KafkaConsumer{K, V}"/>
/// </summary>
/// <typeparam name="K">The key type</typeparam>
/// <typeparam name="V">The value type</typeparam>
public class KNetConsumer<K, V> : KafkaConsumer<byte[], byte[]>, IKNetConsumer<K, V>
public class KNetConsumer<K, V> : Org.Apache.Kafka.Clients.Consumer.KafkaConsumer<byte[], byte[]>, IKNetConsumer<K, V>
{
readonly bool _autoCreateSerDes = false;
bool _threadRunning = false;
long _dequeing = 0;
readonly System.Threading.Thread _consumeThread = null;
readonly ConcurrentQueue<KNetConsumerRecords<K, V>> _consumedRecords = null;
readonly KNetConsumerCallback<K, V> _consumerCallback = null;
readonly IKNetDeserializer<K> _keyDeserializer;
readonly IKNetDeserializer<V> _valueDeserializer;
readonly IKNetSerDes<K> _keyDeserializer;
readonly IKNetSerDes<V> _valueDeserializer;
/// <summary>
/// <see href="https://www.jcobridge.com/api-clr/html/P_MASES_JCOBridge_C2JBridge_JVMBridgeBase_BridgeClassName.htm"/>
/// </summary>
public override string BridgeClassName => "org.mases.knet.clients.consumer.KNetConsumer";
/// <summary>
/// Initialize a new instance of <see cref="KNetConsumer{K, V}"/>
/// </summary>
/// <param name="props">The properties to use, see <see cref="ConsumerConfig"/> and <see cref="ConsumerConfigBuilder"/></param>
/// <param name="useJVMCallback"><see langword="true"/> to active callback based mode</param>
public KNetConsumer(Properties props, bool useJVMCallback = false)
: this(props, new KNetSerDes<K>(), new KNetSerDes<V>(), useJVMCallback)
{
_autoCreateSerDes = true;

if (useJVMCallback)
{
_consumerCallback = new KNetConsumerCallback<K, V>(CallbackMessage, _keyDeserializer, _valueDeserializer);
IExecute("setCallback", _consumerCallback);
}
else
{
_consumedRecords = new();
_threadRunning = true;
_consumeThread = new(ConsumeHandler);
_consumeThread.Start();
}
}
internal KNetConsumer(Properties props) : base(props) { }

/// <summary>
/// Initialize a new instance of <see cref="KNetConsumer{K, V}"/>
/// </summary>
/// <param name="configBuilder">An instance of <see cref="ConsumerConfigBuilder"/> </param>
/// <param name="useJVMCallback"><see langword="true"/> to active callback based mode</param>
public KNetConsumer(ConsumerConfigBuilder configBuilder, bool useJVMCallback = false)
: this(configBuilder, configBuilder.BuildKeySerDes<K>(), configBuilder.BuildValueSerDes<V>())
: this(configBuilder, configBuilder.BuildKeySerDes<K>(), configBuilder.BuildValueSerDes<V>(), useJVMCallback)
{
_autoCreateSerDes = true;
}
/// <summary>
/// Initialize a new instance of <see cref="KNetConsumer{K, V}"/>
/// </summary>
/// <param name="props">The properties to use, see <see cref="ConsumerConfig"/> and <see cref="ConsumerConfigBuilder"/></param>
/// <param name="props">The properties to use, see <see cref="ConsumerConfigBuilder"/></param>
/// <param name="keyDeserializer">Key serializer base on <see cref="KNetSerDes{K}"/></param>
/// <param name="valueDeserializer">Value serializer base on <see cref="KNetSerDes{K}"/></param>
/// <param name="useJVMCallback"><see langword="true"/> to active callback based mode</param>
public KNetConsumer(Properties props, IKNetDeserializer<K> keyDeserializer, IKNetDeserializer<V> valueDeserializer, bool useJVMCallback = false)
public KNetConsumer(ConsumerConfigBuilder props, IKNetSerDes<K> keyDeserializer, IKNetSerDes<V> valueDeserializer, bool useJVMCallback = false)
: base(CheckProperties(props), keyDeserializer.KafkaDeserializer, valueDeserializer.KafkaDeserializer)
{
_keyDeserializer = keyDeserializer;
Expand All @@ -161,17 +154,17 @@ public KNetConsumer(Properties props, IKNetDeserializer<K> keyDeserializer, IKNe

static Properties CheckProperties(Properties props)
{
if (!props.ContainsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
if (!props.ContainsKey(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
{
props.Put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.Put(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
}
else throw new InvalidOperationException($"KNetConsumer auto manages configuration property {ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG}, remove from configuration.");
else throw new InvalidOperationException($"KNetConsumer auto manages configuration property {Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG}, remove from configuration.");

if (!props.ContainsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
if (!props.ContainsKey(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
{
props.Put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.Put(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
}
else throw new InvalidOperationException($"KNetConsumer auto manages configuration property {ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG}, remove from configuration.");
else throw new InvalidOperationException($"KNetConsumer auto manages configuration property {Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG}, remove from configuration.");

return props;
}
Expand All @@ -186,13 +179,13 @@ static Properties CheckProperties(Properties props)
/// <inheritdoc cref="IKNetConsumer{K, V}.Poll(long)"/>
public new KNetConsumerRecords<K, V> Poll(long timeoutMs)
{
var records = IExecute<ConsumerRecords<byte[], byte[]>>("poll", timeoutMs);
var records = IExecute<Org.Apache.Kafka.Clients.Consumer.ConsumerRecords<byte[], byte[]>>("poll", timeoutMs);
return new KNetConsumerRecords<K, V>(records, _keyDeserializer, _valueDeserializer);
}
/// <inheritdoc cref="IKNetConsumer{K, V}.Poll(Duration)"/>
public new KNetConsumerRecords<K, V> Poll(Duration timeout)
{
var records = IExecute<ConsumerRecords<byte[], byte[]>>("poll", timeout);
var records = IExecute<Org.Apache.Kafka.Clients.Consumer.ConsumerRecords<byte[], byte[]>>("poll", timeout);
return new KNetConsumerRecords<K, V>(records, _keyDeserializer, _valueDeserializer);
}

Expand Down Expand Up @@ -227,6 +220,14 @@ public override void Dispose()
}
base.Dispose();
}
#if NET7_0_OR_GREATER
/// <inheritdoc cref="IKNetConsumer{K, V}.ApplyPrefetch(bool, int)"/>
public void ApplyPrefetch(bool enablePrefetch = true, int prefetchThreshold = 10)
{
IsPrefecth = enablePrefetch;
PrefetchThreshold = IsPrefecth ? prefetchThreshold : 10;
}
#endif
/// <inheritdoc cref="IKNetConsumer{K, V}.SetCallback(Action{KNetConsumerRecord{K, V}})"/>
public void SetCallback(Action<KNetConsumerRecord<K, V>> cb)
{
Expand Down Expand Up @@ -266,6 +267,12 @@ void ConsumeHandler(object o)
}
catch { }
}
#if NET7_0_OR_GREATER
/// <inheritdoc cref="IKNetConsumer{K, V}.IsPrefecth"/>
public bool IsPrefecth { get; private set; } = !(typeof(K).IsValueType && typeof(V).IsValueType);
/// <inheritdoc cref="IKNetConsumer{K, V}.PrefetchThreshold"/>
public int PrefetchThreshold { get; private set; } = 10;
#endif
/// <inheritdoc cref="IKNetConsumer{K, V}.IsCompleting"/>
public bool IsCompleting => !_consumedRecords.IsEmpty || System.Threading.Interlocked.Read(ref _dequeing) != 0;
/// <inheritdoc cref="IKNetConsumer{K, V}.IsEmpty"/>
Expand All @@ -284,7 +291,11 @@ public bool ConsumeAsync(long timeoutMs)
bool isEmpty = results.IsEmpty;
if (!isEmpty)
{
#if NET7_0_OR_GREATER
_consumedRecords.Enqueue(results.ApplyPrefetch(IsPrefecth, PrefetchThreshold));
#else
_consumedRecords.Enqueue(results);
#endif
lock (_consumedRecords)
{
System.Threading.Monitor.Pulse(_consumedRecords);
Expand Down
2 changes: 1 addition & 1 deletion src/net/KNet/Specific/Consumer/KNetConsumerCallback.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public KNetConsumerCallback(Action<KNetConsumerRecord<K, V>> recordReady, IKNetD
void OnRecordReadyEventHandler(object sender, CLRListenerEventArgs<CLREventData> data)
{
var record = this.BridgeInstance.Invoke<ConsumerRecord<byte[], byte[]>>("getRecord");
recordReadyFunction(new KNetConsumerRecord<K, V>(record, _keyDeserializer, _valueDeserializer));
recordReadyFunction(new KNetConsumerRecord<K, V>(record, _keyDeserializer, _valueDeserializer, false));
}

public virtual void RecordReady(KNetConsumerRecord<K, V> message) { }
Expand Down
Loading

0 comments on commit cc2b630

Please sign in to comment.