diff --git a/src/jvm/knet/src/main/java/org/mases/knet/streams/kstream/KNetPredicateTester.java b/src/jvm/knet/src/main/java/org/mases/knet/streams/kstream/KNetPredicateEqualityTest.java similarity index 79% rename from src/jvm/knet/src/main/java/org/mases/knet/streams/kstream/KNetPredicateTester.java rename to src/jvm/knet/src/main/java/org/mases/knet/streams/kstream/KNetPredicateEqualityTest.java index 13003a1a12..8e5713ce5b 100644 --- a/src/jvm/knet/src/main/java/org/mases/knet/streams/kstream/KNetPredicateTester.java +++ b/src/jvm/knet/src/main/java/org/mases/knet/streams/kstream/KNetPredicateEqualityTest.java @@ -23,23 +23,19 @@ import java.util.Arrays; -public final class KNetPredicateTester implements Predicate { +public final class KNetPredicateEqualityTest implements Predicate { Boolean _isKey = null; byte[] _key = null; byte[] _value = null; - public KNetPredicateTester(byte[] keyOrValue, boolean isKey) { + public synchronized void setWorkingState(byte[] key, byte[] value, Boolean isKey) { _isKey = isKey; - _key = keyOrValue; - } - - public KNetPredicateTester(byte[] keyOrValue, byte[] value) { - _key = keyOrValue; + _key = key; _value = value; } @Override - public boolean test(byte[] bytes, byte[] bytes2) { + public synchronized boolean test(byte[] bytes, byte[] bytes2) { if (_isKey != null) { if (_isKey == true) { return Arrays.equals(bytes, _key); diff --git a/src/net/KNet/Specific/Streams/Kstream/KNetPredicateEqualityTest.cs b/src/net/KNet/Specific/Streams/Kstream/KNetPredicateEqualityTest.cs new file mode 100644 index 0000000000..6ba131c2ea --- /dev/null +++ b/src/net/KNet/Specific/Streams/Kstream/KNetPredicateEqualityTest.cs @@ -0,0 +1,109 @@ +/* +* Copyright 2023 MASES s.r.l. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +* Refer to LICENSE for more information. +*/ + +using MASES.JCOBridge.C2JBridge; +using MASES.KNet.Serialization; + +namespace MASES.KNet.Streams.Kstream +{ + /// + /// KNet extension of to execute directly in the JVM + /// + public class KNetPredicateEqualityTest : JVMBridgeBase>, IGenericSerDesFactoryApplier + { + TKey _key; + TValue _value; + bool? _isKeyCheck; + IGenericSerDesFactory _factory; + IGenericSerDesFactory IGenericSerDesFactoryApplier.Factory { get => _factory; set { _factory = value; } } + + /// + /// + /// + public override string BridgeClassName => "org.mases.knet.streams.kstream.KNetPredicateEqualityTest"; + /// + /// Converter from to + /// + /// This cast is useful when an API needs in input a type like , however the behavior of the in output is different from the same class allocated directly + public static implicit operator Org.Apache.Kafka.Streams.Kstream.Predicate(KNetPredicateEqualityTest t) + { + if (t._factory == null) throw new System.InvalidOperationException("The operator shall be invoked within a function which set the IGenericSerDesFactory instance."); + t.updateRemote(); + return t.Cast>(); + } + + void updateRemote() + { + if (_factory == null) return; + var keySerDes = _factory.BuildKeySerDes(); + var valueSerDes = _factory.BuildValueSerDes(); + byte[] key = !_isKeyCheck.HasValue || _isKeyCheck.Value ? keySerDes.Serialize(null, _key): null; + byte[] value = !_isKeyCheck.HasValue || !_isKeyCheck.Value ? valueSerDes.Serialize(null, _value) : null; + + IExecute("setWorkingState", key, value, _isKeyCheck.HasValue ? _isKeyCheck.Value : null); + } + + /// + /// Default constructor: even if the corresponding Java class does not have one, it is mandatory for JCOBridge + /// + public KNetPredicateEqualityTest() { } + /// + /// Initialize a new only for key comparison + /// + /// The key to use in comparison + public KNetPredicateEqualityTest(TKey key) + { + _key = key; + _isKeyCheck = true; + } + /// + /// Initialize a new only for value comparison + /// + /// The value to use in comparison + public KNetPredicateEqualityTest(TValue value) + { + _value = value; + _isKeyCheck = false; + } + /// + /// Initialize a new for both key and value comparison + /// + /// The key to use in comparison + /// The value to use in comparison + /// Set to to check the , set to to check the or leave undefined to check both and + /// Both and shall be equal to input parameters of to return + public KNetPredicateEqualityTest(TKey key, TValue value, bool? isKeyCheck = null) + { + _key = key; + _value = value; + _isKeyCheck = isKeyCheck; + } + /// + /// The to check + /// + public TKey Key { get { return _key; } set { _key = value; updateRemote(); } } + /// + /// The to check + /// + public TValue Value { get { return _value; } set { _value = value; updateRemote(); } } + /// + /// Set to to check the , set to to check the or leave undefined to check both and + /// + public bool? IsKeyCheck { get { return _isKeyCheck; } set { _isKeyCheck = value; updateRemote(); } } + } +} diff --git a/src/net/KNet/Specific/Streams/Kstream/KNetPredicateTester.cs b/src/net/KNet/Specific/Streams/Kstream/KNetPredicateTester.cs deleted file mode 100644 index 6945681f98..0000000000 --- a/src/net/KNet/Specific/Streams/Kstream/KNetPredicateTester.cs +++ /dev/null @@ -1,68 +0,0 @@ -/* -* Copyright 2023 MASES s.r.l. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* -* Refer to LICENSE for more information. -*/ - -using MASES.JCOBridge.C2JBridge; - -namespace MASES.KNet.Streams.Kstream -{ - /// - /// KNet extension of - /// - public class KNetPredicateTester : JVMBridgeBase - { - /// - /// - /// - public override string BridgeClassName => "org.mases.knet.streams.kstream.KNetPredicateTester"; - /// - /// Converter from to - /// - public static implicit operator Org.Apache.Kafka.Streams.Kstream.Predicate(KNetPredicateTester t) => t.Cast(); - /// - /// Converter from to - /// - /// This cast is useful when an API needs in input a type like , however the behavior of the in output is different from the same class allocated directly - public static implicit operator Org.Apache.Kafka.Streams.Kstream.Predicate(KNetPredicateTester t) => t.Cast>(); - /// - /// Default constructor: even if the corresponding Java class does not have one, it is mandatory for JCOBridge - /// - public KNetPredicateTester() { } - /// - /// Generic constructor: it is useful for JCOBridge when there is a derived class which needs to pass arguments to the highest JVMBridgeBase class - /// - public KNetPredicateTester(params object[] args) : base(args) { } - /// - /// Initialize a new for key or value comparison - /// - /// The key, or value, to use in comparison - /// if represent a key, otherwise - public KNetPredicateTester(byte[] keyOrValue, bool isKey) - : base(keyOrValue, isKey) - { - } - /// - /// Initialize a new for both key and value comparison - /// - /// The key to use in comparison - /// The value to use in comparison - /// Both and shall be equal to input parameters of to return - public KNetPredicateTester(byte[] key, byte[] value) : base(key, value) - { - } - } -}