Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduced resource consumption in classes referring final fields #379

Merged
merged 5 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions .github/workflows/generateclasses.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,9 @@ jobs:
with:
branch-suffix: short-commit-hash
add-paths: src/*.*
commit-message: |
Update generated classes after commit ${{ env.GITHUB_COMMIT_MESSAGE }}

fix #92
commit-message: Update generated classes after commit ${{ env.GITHUB_COMMIT_MESSAGE }}
title: Changes by GitHub action
body: Automated changes by GitHub action
body: Automated changes by GitHub action, fix #92
reviewers: masesdevelopers
assignees: masesdevelopers
labels: java, .NET, enhancement
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,27 @@

using MASES.JCOBridge.C2JBridge.JVMInterop;
using Org.Apache.Kafka.Common.Record;
using System;

namespace Org.Apache.Kafka.Clients.Consumer
{
public partial class ConsumerRecord
{
DateTime? _dateTime = null;
/// <summary>
/// <see cref="System.DateTime"/> of <see cref="Timestamp"/>
/// </summary>
public System.DateTime DateTime => System.DateTimeOffset.FromUnixTimeMilliseconds(Timestamp()).DateTime;
public System.DateTime DateTime => _dateTime ??= System.DateTimeOffset.FromUnixTimeMilliseconds(Timestamp()).DateTime;

// public TimestampType TimestampType => (TimestampType)System.Enum.Parse(typeof(TimestampType), IExecute<IJavaObject>("timestampType").Invoke<string>("name")); // (TimestampType)(int)IExecute<IJavaObject>("timestampType").GetField("id");
}

public partial class ConsumerRecord<K, V>
{
DateTime? _dateTime = null;
/// <summary>
/// <see cref="System.DateTime"/> of <see cref="Timestamp"/>
/// </summary>
public System.DateTime DateTime => System.DateTimeOffset.FromUnixTimeMilliseconds(Timestamp()).DateTime;
public System.DateTime DateTime => _dateTime ??= System.DateTimeOffset.FromUnixTimeMilliseconds(Timestamp()).DateTime;
}
}
105 changes: 58 additions & 47 deletions src/net/KNet/Specific/Consumer/KNetConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,31 @@

using Java.Time;
using Java.Util;
using MASES.JCOBridge.C2JBridge;
using System;
using System.Collections.Concurrent;
using Org.Apache.Kafka.Clients.Consumer;
using MASES.KNet.Serialization;
using Org.Apache.Kafka.Common.Header;
using Org.Apache.Kafka.Common.Record;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace MASES.KNet.Consumer
{
/// <summary>
/// KNet extension of <see cref="IConsumer{K, V}"/>
/// KNet extension of <see cref="Org.Apache.Kafka.Clients.Consumer.IConsumer{K, V}"/>
/// </summary>
/// <typeparam name="K">The key type</typeparam>
/// <typeparam name="V">The value type</typeparam>
public interface IKNetConsumer<K, V> : IConsumer<byte[], byte[]>
public interface IKNetConsumer<K, V> : Org.Apache.Kafka.Clients.Consumer.IConsumer<byte[], byte[]>
{
#if NET7_0_OR_GREATER
/// <summary>
/// <see langword="true"/> if enumeration will use prefetch and the number of records is more than <see cref="PrefetchThreshold"/>, i.e. the preparation of <see cref="KNetConsumerRecord{K, V}"/> happens in an external thread
/// </summary>
/// <remarks>It is <see langword="true"/> by default if one of <typeparamref name="K"/> or <typeparamref name="V"/> are not <see cref="ValueType"/>, override the value using <see cref="ApplyPrefetch(bool, int)"/></remarks>
bool IsPrefecth { get; }
/// <summary>
/// The minimum threshold to activate pretech, i.e. the preparation of <see cref="KNetConsumerRecord{K, V}"/> happens in external thread if <see cref="Org.Apache.Kafka.Clients.Consumer.ConsumerRecords{K, V}"/> contains more than <see cref="PrefetchThreshold"/> elements
/// </summary>
/// <remarks>The default value is 10, however it shall be chosen by the developer and in the decision shall be verified if external thread activation costs more than inline execution</remarks>
int PrefetchThreshold { get; }
#endif
/// <summary>
/// <see langword="true"/> if the <see cref="IKNetConsumer{K, V}"/> instance is completing async operation
/// </summary>
Expand All @@ -50,6 +55,15 @@ public interface IKNetConsumer<K, V> : IConsumer<byte[], byte[]>
/// Number of messages in the <see cref="IKNetConsumer{K, V}"/> instance waiting to be processed in async operation
/// </summary>
int WaitingMessages { get; }
#if NET7_0_OR_GREATER
/// <summary>
/// Set to <see langword="true"/> to enable enumeration with prefetch over <paramref name="prefetchThreshold"/> threshold, i.e. preparation of <see cref="KNetConsumerRecord{K, V}"/> in external thread
/// </summary>
/// <param name="enablePrefetch"><see langword="true"/> to enable prefetch. See <see cref="IsPrefecth"/></param>
/// <param name="prefetchThreshold">The minimum threshold to activate pretech, default is 10. See <see cref="PrefetchThreshold"/></param>
/// <remarks>Setting <paramref name="prefetchThreshold"/> to a value less, or equal, to 0 and <paramref name="enablePrefetch"/> to <see langword="true"/>, the prefetch is always actived</remarks>
void ApplyPrefetch(bool enablePrefetch = true, int prefetchThreshold = 10);
#endif
/// <summary>
/// Sets the <see cref="Action{T}"/> to use to receive <see cref="KNetConsumerRecord{K, V}"/>
/// </summary>
Expand Down Expand Up @@ -81,65 +95,44 @@ public interface IKNetConsumer<K, V> : IConsumer<byte[], byte[]>
void Consume(long timeoutMs, Action<KNetConsumerRecord<K, V>> callback);
}
/// <summary>
/// KNet extension of <see cref="KafkaConsumer{K, V}"/>
/// KNet extension of <see cref="Org.Apache.Kafka.Clients.Consumer.KafkaConsumer{K, V}"/>
/// </summary>
/// <typeparam name="K">The key type</typeparam>
/// <typeparam name="V">The value type</typeparam>
public class KNetConsumer<K, V> : KafkaConsumer<byte[], byte[]>, IKNetConsumer<K, V>
public class KNetConsumer<K, V> : Org.Apache.Kafka.Clients.Consumer.KafkaConsumer<byte[], byte[]>, IKNetConsumer<K, V>
{
readonly bool _autoCreateSerDes = false;
bool _threadRunning = false;
long _dequeing = 0;
readonly System.Threading.Thread _consumeThread = null;
readonly ConcurrentQueue<KNetConsumerRecords<K, V>> _consumedRecords = null;
readonly KNetConsumerCallback<K, V> _consumerCallback = null;
readonly IKNetDeserializer<K> _keyDeserializer;
readonly IKNetDeserializer<V> _valueDeserializer;
readonly IKNetSerDes<K> _keyDeserializer;
readonly IKNetSerDes<V> _valueDeserializer;
/// <summary>
/// <see href="https://www.jcobridge.com/api-clr/html/P_MASES_JCOBridge_C2JBridge_JVMBridgeBase_BridgeClassName.htm"/>
/// </summary>
public override string BridgeClassName => "org.mases.knet.clients.consumer.KNetConsumer";
/// <summary>
/// Initialize a new instance of <see cref="KNetConsumer{K, V}"/>
/// </summary>
/// <param name="props">The properties to use, see <see cref="ConsumerConfig"/> and <see cref="ConsumerConfigBuilder"/></param>
/// <param name="useJVMCallback"><see langword="true"/> to active callback based mode</param>
public KNetConsumer(Properties props, bool useJVMCallback = false)
: this(props, new KNetSerDes<K>(), new KNetSerDes<V>(), useJVMCallback)
{
_autoCreateSerDes = true;

if (useJVMCallback)
{
_consumerCallback = new KNetConsumerCallback<K, V>(CallbackMessage, _keyDeserializer, _valueDeserializer);
IExecute("setCallback", _consumerCallback);
}
else
{
_consumedRecords = new();
_threadRunning = true;
_consumeThread = new(ConsumeHandler);
_consumeThread.Start();
}
}
internal KNetConsumer(Properties props) : base(props) { }

/// <summary>
/// Initialize a new instance of <see cref="KNetConsumer{K, V}"/>
/// </summary>
/// <param name="configBuilder">An instance of <see cref="ConsumerConfigBuilder"/> </param>
/// <param name="useJVMCallback"><see langword="true"/> to active callback based mode</param>
public KNetConsumer(ConsumerConfigBuilder configBuilder, bool useJVMCallback = false)
: this(configBuilder, configBuilder.BuildKeySerDes<K>(), configBuilder.BuildValueSerDes<V>())
: this(configBuilder, configBuilder.BuildKeySerDes<K>(), configBuilder.BuildValueSerDes<V>(), useJVMCallback)
{
_autoCreateSerDes = true;
}
/// <summary>
/// Initialize a new instance of <see cref="KNetConsumer{K, V}"/>
/// </summary>
/// <param name="props">The properties to use, see <see cref="ConsumerConfig"/> and <see cref="ConsumerConfigBuilder"/></param>
/// <param name="props">The properties to use, see <see cref="ConsumerConfigBuilder"/></param>
/// <param name="keyDeserializer">Key serializer base on <see cref="KNetSerDes{K}"/></param>
/// <param name="valueDeserializer">Value serializer base on <see cref="KNetSerDes{K}"/></param>
/// <param name="useJVMCallback"><see langword="true"/> to active callback based mode</param>
public KNetConsumer(Properties props, IKNetDeserializer<K> keyDeserializer, IKNetDeserializer<V> valueDeserializer, bool useJVMCallback = false)
public KNetConsumer(ConsumerConfigBuilder props, IKNetSerDes<K> keyDeserializer, IKNetSerDes<V> valueDeserializer, bool useJVMCallback = false)
: base(CheckProperties(props), keyDeserializer.KafkaDeserializer, valueDeserializer.KafkaDeserializer)
{
_keyDeserializer = keyDeserializer;
Expand All @@ -161,17 +154,17 @@ public KNetConsumer(Properties props, IKNetDeserializer<K> keyDeserializer, IKNe

static Properties CheckProperties(Properties props)
{
if (!props.ContainsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
if (!props.ContainsKey(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
{
props.Put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.Put(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
}
else throw new InvalidOperationException($"KNetConsumer auto manages configuration property {ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG}, remove from configuration.");
else throw new InvalidOperationException($"KNetConsumer auto manages configuration property {Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG}, remove from configuration.");

if (!props.ContainsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
if (!props.ContainsKey(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
{
props.Put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.Put(Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
}
else throw new InvalidOperationException($"KNetConsumer auto manages configuration property {ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG}, remove from configuration.");
else throw new InvalidOperationException($"KNetConsumer auto manages configuration property {Org.Apache.Kafka.Clients.Consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG}, remove from configuration.");

return props;
}
Expand All @@ -186,13 +179,13 @@ static Properties CheckProperties(Properties props)
/// <inheritdoc cref="IKNetConsumer{K, V}.Poll(long)"/>
public new KNetConsumerRecords<K, V> Poll(long timeoutMs)
{
var records = IExecute<ConsumerRecords<byte[], byte[]>>("poll", timeoutMs);
var records = IExecute<Org.Apache.Kafka.Clients.Consumer.ConsumerRecords<byte[], byte[]>>("poll", timeoutMs);
return new KNetConsumerRecords<K, V>(records, _keyDeserializer, _valueDeserializer);
}
/// <inheritdoc cref="IKNetConsumer{K, V}.Poll(Duration)"/>
public new KNetConsumerRecords<K, V> Poll(Duration timeout)
{
var records = IExecute<ConsumerRecords<byte[], byte[]>>("poll", timeout);
var records = IExecute<Org.Apache.Kafka.Clients.Consumer.ConsumerRecords<byte[], byte[]>>("poll", timeout);
return new KNetConsumerRecords<K, V>(records, _keyDeserializer, _valueDeserializer);
}

Expand Down Expand Up @@ -227,6 +220,14 @@ public override void Dispose()
}
base.Dispose();
}
#if NET7_0_OR_GREATER
/// <inheritdoc cref="IKNetConsumer{K, V}.ApplyPrefetch(bool, int)"/>
public void ApplyPrefetch(bool enablePrefetch = true, int prefetchThreshold = 10)
{
IsPrefecth = enablePrefetch;
PrefetchThreshold = IsPrefecth ? prefetchThreshold : 10;
}
#endif
/// <inheritdoc cref="IKNetConsumer{K, V}.SetCallback(Action{KNetConsumerRecord{K, V}})"/>
public void SetCallback(Action<KNetConsumerRecord<K, V>> cb)
{
Expand Down Expand Up @@ -266,6 +267,12 @@ void ConsumeHandler(object o)
}
catch { }
}
#if NET7_0_OR_GREATER
/// <inheritdoc cref="IKNetConsumer{K, V}.IsPrefecth"/>
public bool IsPrefecth { get; private set; } = !(typeof(K).IsValueType && typeof(V).IsValueType);
/// <inheritdoc cref="IKNetConsumer{K, V}.PrefetchThreshold"/>
public int PrefetchThreshold { get; private set; } = 10;
#endif
/// <inheritdoc cref="IKNetConsumer{K, V}.IsCompleting"/>
public bool IsCompleting => !_consumedRecords.IsEmpty || System.Threading.Interlocked.Read(ref _dequeing) != 0;
/// <inheritdoc cref="IKNetConsumer{K, V}.IsEmpty"/>
Expand All @@ -284,7 +291,11 @@ public bool ConsumeAsync(long timeoutMs)
bool isEmpty = results.IsEmpty;
if (!isEmpty)
{
#if NET7_0_OR_GREATER
_consumedRecords.Enqueue(results.ApplyPrefetch(IsPrefecth, PrefetchThreshold));
#else
_consumedRecords.Enqueue(results);
#endif
lock (_consumedRecords)
{
System.Threading.Monitor.Pulse(_consumedRecords);
Expand Down
2 changes: 1 addition & 1 deletion src/net/KNet/Specific/Consumer/KNetConsumerCallback.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public KNetConsumerCallback(Action<KNetConsumerRecord<K, V>> recordReady, IKNetD
void OnRecordReadyEventHandler(object sender, CLRListenerEventArgs<CLREventData> data)
{
var record = this.BridgeInstance.Invoke<ConsumerRecord<byte[], byte[]>>("getRecord");
recordReadyFunction(new KNetConsumerRecord<K, V>(record, _keyDeserializer, _valueDeserializer));
recordReadyFunction(new KNetConsumerRecord<K, V>(record, _keyDeserializer, _valueDeserializer, false));
}

public virtual void RecordReady(KNetConsumerRecord<K, V> message) { }
Expand Down
Loading
Loading